package org.infinispan.client.hotrod.event;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.Search;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.client.hotrod.query.testdomain.protobuf.UserPB;
import org.infinispan.client.hotrod.query.testdomain.protobuf.marshallers.TestDomainSCI;
import org.infinispan.client.hotrod.test.MultiHotRodServersTest;
import org.infinispan.commons.time.TimeService;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.IndexStorage;
import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.query.api.continuous.ContinuousQuery;
import org.infinispan.query.api.continuous.ContinuousQueryListener;
import org.infinispan.query.dsl.Query;
import org.infinispan.query.dsl.embedded.testdomain.User;
import org.infinispan.query.dsl.embedded.testdomain.hsearch.UserHS;
import org.infinispan.query.remote.impl.filter.IckleContinuousQueryProtobufCacheEventFilterConverterFactory;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.infinispan.test.TestingUtil;
import org.infinispan.util.ControlledTimeService;
import org.infinispan.util.KeyValuePair;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "client.hotrod.event.ContinuousQueryObjectStorageTest")
/* loaded from: input_file:org/infinispan/client/hotrod/event/ContinuousQueryObjectStorageTest.class */
public class ContinuousQueryObjectStorageTest extends MultiHotRodServersTest {
    // Number of Hot Rod servers started for this test; passed to createHotRodServers and used to iterate over the servers.
    private static final int NUM_NODES = 5;
    // Remote cache handle obtained from client(0) in createCacheManagers; shared by all test methods.
    private RemoteCache<String, User> remoteCache;
    // Controlled clock installed as the TimeService on every server's cache manager so tests can advance time explicitly.
    private ControlledTimeService timeService = new ControlledTimeService();

    /**
     * Starts {@code NUM_NODES} Hot Rod servers, waits for the cluster to form, and then on each server
     * registers the continuous-query filter/converter factory and swaps in the controlled time service.
     * Finally obtains the remote cache from the first client.
     *
     * @throws Throwable if any part of the server setup fails
     */
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder cacheConfig = getConfigurationBuilder();
        createHotRodServers(NUM_NODES, cacheConfig);
        waitForClusterToForm();

        // The same factory instance is registered manually on every server under this name.
        final String factoryName = "continuous-query-filter-converter-factory";
        IckleContinuousQueryProtobufCacheEventFilterConverterFactory cqFactory =
                new IckleContinuousQueryProtobufCacheEventFilterConverterFactory();
        for (int node = 0; node < NUM_NODES; node++) {
            server(node).addCacheEventFilterConverterFactory(factoryName, cqFactory);
            // Replace the real clock so the tests control the passage of time on the server side.
            TestingUtil.replaceComponent(server(node).getCacheManager(), TimeService.class, this.timeService, true);
        }

        this.remoteCache = client(0).getCache();
    }

    /**
     * Supplies the serialization context initializers used by this test: the test domain model
     * and the client event types.
     *
     * @return a fixed-size list holding both initializers
     */
    @Override
    protected List<SerializationContextInitializer> contextInitializers() {
        List<SerializationContextInitializer> initializers =
                Arrays.<SerializationContextInitializer>asList(TestDomainSCI.INSTANCE, ClientEventSCI.INSTANCE);
        return initializers;
    }

    /**
     * Builds the cache configuration shared by all servers: a synchronous distributed cache with
     * Java-object key and value encoding, heap-based indexing of {@code UserHS}, and the expiration
     * reaper disabled.
     *
     * @return the configuration builder to use for every Hot Rod server in this test
     */
    protected ConfigurationBuilder getConfigurationBuilder() {
        final String objectMediaType = "application/x-java-object";

        ConfigurationBuilder clusteredConfig = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false);
        ConfigurationBuilder builder = HotRodTestingUtil.hotRodCacheConfiguration(clusteredConfig);

        // Keys and values are both stored with the same object media type.
        builder.encoding().key().mediaType(objectMediaType);
        builder.encoding().value().mediaType(objectMediaType);

        builder.indexing()
                .enable()
                .storage(IndexStorage.LOCAL_HEAP)
                .addIndexedEntities(new Class[]{UserHS.class});

        // With the reaper off, nothing expires entries in the background.
        builder.expiration().disableReaper();
        return builder;
    }

    /**
     * Registers a continuous query listener for a query that uses an aggregate ({@code MAX(age)}) and
     * expects the registration to fail with a {@link HotRodClientException} whose message contains
     * {@code ISPN028509:}.
     */
    @Test(expectedExceptions = {HotRodClientException.class}, expectedExceptionsMessageRegExp = ".*ISPN028509:.*")
    public void testDisallowGroupingAndAggregation() {
        ContinuousQuery<String, User> continuousQuery = Search.getContinuousQuery(this.remoteCache);
        Query<?> aggregationQuery = Search.getQueryFactory(this.remoteCache)
                .create("SELECT MAX(age) FROM sample_bank_account.User WHERE age >= 20");
        // The listener has no callbacks of its own; registration is expected to throw before any event arrives.
        ContinuousQueryListener<String, Object[]> noOpListener = new ContinuousQueryListener<String, Object[]>() {
        };
        continuousQuery.addContinuousQueryListener(aggregationQuery, noOpListener);
    }

    /**
     * Exercises a continuous query ({@code age <= :ageParam}, with {@code ageParam = 32}) over whole
     * {@code User} entities and checks the join/leave notifications through these stages:
     * <ol>
     *   <li>initial registration: the two users aged 22 and 32 join;</li>
     *   <li>a user whose age drops to 30 joins;</li>
     *   <li>all three users set to age 40 leave;</li>
     *   <li>two entries stored with a 5 ms lifespan join, then leave after the controlled clock is
     *       advanced past the lifespan and the entries are read;</li>
     *   <li>after the listener is removed, further writes produce no notifications.</li>
     * </ol>
     */
    public void testContinuousQuery() {
        UserPB user1 = new UserPB();
        user1.setId(1);
        user1.setName("John");
        user1.setSurname("Doe");
        user1.setGender(User.Gender.MALE);
        user1.setAge(22);
        user1.setAccountIds(new HashSet<>(Arrays.asList(1, 2)));
        user1.setNotes("Lorem ipsum dolor sit amet");

        UserPB user2 = new UserPB();
        user2.setId(2);
        user2.setName("Spider");
        user2.setSurname("Man");
        user2.setGender(User.Gender.MALE);
        user2.setAge(32);
        user2.setAccountIds(Collections.singleton(3));

        UserPB user3 = new UserPB();
        user3.setId(3);
        user3.setName("Spider");
        user3.setSurname("Woman");
        user3.setGender(User.Gender.FEMALE);
        user3.setAge(40);

        this.remoteCache.clear();
        this.remoteCache.put("user" + user1.getId(), user1);
        this.remoteCache.put("user" + user2.getId(), user2);
        this.remoteCache.put("user" + user3.getId(), user3);
        AssertJUnit.assertEquals(3, this.remoteCache.size());

        Query<?> query = Search.getQueryFactory(this.remoteCache)
                .create("FROM sample_bank_account.User WHERE age <= :ageParam")
                .setParameter("ageParam", 32);

        // Typed queues let the value-extracting lambdas below type-check without casts.
        final BlockingQueue<KeyValuePair<String, User>> joined = new LinkedBlockingQueue<>();
        final BlockingQueue<String> left = new LinkedBlockingQueue<>();

        ContinuousQueryListener<String, User> listener = new ContinuousQueryListener<String, User>() {
            @Override
            public void resultJoining(String key, User value) {
                joined.add(new KeyValuePair<>(key, value));
            }

            @Override
            public void resultLeaving(String key) {
                left.add(key);
            }
        };

        ContinuousQuery<String, User> continuousQuery = Search.getContinuousQuery(this.remoteCache);
        continuousQuery.addContinuousQueryListener(query, listener);

        // Initial matches: the users aged 32 and 22.
        expectElementsInQueue(joined, 2, kv -> kv.getValue().getAge(), 32, 22);
        expectElementsInQueue(left, 0);
        expectNoMoreElementsInQueues(joined, left);

        // user3 now satisfies the predicate and must join.
        user3.setAge(30);
        this.remoteCache.put("user" + user3.getId(), user3);
        expectElementsInQueue(joined, 1, kv -> kv.getValue().getAge(), 30);
        expectElementsInQueue(left, 0);
        expectNoMoreElementsInQueues(joined, left);

        // Nobody satisfies the predicate any more: all three must leave.
        user1.setAge(40);
        user2.setAge(40);
        user3.setAge(40);
        this.remoteCache.put("user" + user1.getId(), user1);
        this.remoteCache.put("user" + user2.getId(), user2);
        this.remoteCache.put("user" + user3.getId(), user3);
        expectElementsInQueue(joined, 0);
        expectElementsInQueue(left, 3);
        expectNoMoreElementsInQueues(joined, left);

        // Matching entries with a short lifespan join first...
        this.remoteCache.clear();
        user1.setAge(21);
        user2.setAge(22);
        this.remoteCache.put("expiredUser1", user1, 5L, TimeUnit.MILLISECONDS);
        this.remoteCache.put("expiredUser2", user2, 5L, TimeUnit.MILLISECONDS);
        expectElementsInQueue(joined, 2);
        expectElementsInQueue(left, 0);
        expectNoMoreElementsInQueues(joined, left);

        // ...and leave once the controlled clock has passed the lifespan. The reads below presumably
        // trigger the expiration, since the expiration reaper is disabled in the cache configuration.
        this.timeService.advance(6L);
        AssertJUnit.assertNull(this.remoteCache.get("expiredUser1"));
        AssertJUnit.assertNull(this.remoteCache.get("expiredUser2"));
        expectElementsInQueue(joined, 0);
        expectElementsInQueue(left, 2);
        expectNoMoreElementsInQueues(joined, left);

        // With the listener removed, a matching write must not produce any notification.
        continuousQuery.removeContinuousQueryListener(listener);
        user2.setAge(22);
        this.remoteCache.put("user" + user2.getId(), user2);
        expectElementsInQueue(joined, 0);
        expectElementsInQueue(left, 0);
        expectNoMoreElementsInQueues(joined, left);
    }

    /**
     * Same scenario as the whole-entity continuous query test, but with a projection
     * ({@code SELECT age ... WHERE age <= :ageParam}, {@code ageParam = 32}). Joining results are
     * delivered as {@code Object[]} rows whose first element is the projected age.
     *
     * @throws InterruptedException declared for signature compatibility; the helpers used here convert
     *                              interruption into an {@link AssertionError}
     */
    public void testContinuousQueryWithProjections() throws InterruptedException {
        UserPB user1 = new UserPB();
        user1.setId(1);
        user1.setName("John");
        user1.setSurname("Doe");
        user1.setGender(User.Gender.MALE);
        user1.setAge(22);
        user1.setAccountIds(new HashSet<>(Arrays.asList(1, 2)));
        user1.setNotes("Lorem ipsum dolor sit amet");

        UserPB user2 = new UserPB();
        user2.setId(2);
        user2.setName("Spider");
        user2.setSurname("Man");
        user2.setGender(User.Gender.MALE);
        user2.setAge(32);
        user2.setAccountIds(Collections.singleton(3));

        UserPB user3 = new UserPB();
        user3.setId(3);
        user3.setName("Spider");
        user3.setSurname("Woman");
        user3.setGender(User.Gender.FEMALE);
        user3.setAge(40);

        this.remoteCache.clear();
        this.remoteCache.put("user" + user1.getId(), user1);
        this.remoteCache.put("user" + user2.getId(), user2);
        this.remoteCache.put("user" + user3.getId(), user3);
        AssertJUnit.assertEquals(3, this.remoteCache.size());

        Query<?> query = Search.getQueryFactory(this.remoteCache)
                .create("SELECT age FROM sample_bank_account.User WHERE age <= :ageParam")
                .setParameter("ageParam", 32);

        // Typed queues let the row-extracting lambdas below type-check without casts.
        final BlockingQueue<KeyValuePair<String, Object[]>> joined = new LinkedBlockingQueue<>();
        final BlockingQueue<String> left = new LinkedBlockingQueue<>();

        ContinuousQueryListener<String, Object[]> listener = new ContinuousQueryListener<String, Object[]>() {
            @Override
            public void resultJoining(String key, Object[] row) {
                joined.add(new KeyValuePair<>(key, row));
            }

            @Override
            public void resultLeaving(String key) {
                left.add(key);
            }
        };

        ContinuousQuery<String, User> continuousQuery = Search.getContinuousQuery(this.remoteCache);
        continuousQuery.addContinuousQueryListener(query, listener);

        // Initial matches: projected ages 32 and 22.
        expectElementsInQueue(joined, 2, kv -> kv.getValue()[0], 32, 22);
        expectElementsInQueue(left, 0);
        expectNoMoreElementsInQueues(joined, left);

        // user3 now satisfies the predicate and must join.
        user3.setAge(30);
        this.remoteCache.put("user" + user3.getId(), user3);
        expectElementsInQueue(joined, 1, kv -> kv.getValue()[0], 30);
        expectElementsInQueue(left, 0);
        expectNoMoreElementsInQueues(joined, left);

        // Nobody satisfies the predicate any more: all three must leave.
        user1.setAge(40);
        user2.setAge(40);
        user3.setAge(40);
        this.remoteCache.put("user" + user1.getId(), user1);
        this.remoteCache.put("user" + user2.getId(), user2);
        this.remoteCache.put("user" + user3.getId(), user3);
        expectElementsInQueue(joined, 0);
        expectElementsInQueue(left, 3);

        // Matching entries with a short lifespan join first...
        this.remoteCache.clear();
        user1.setAge(21);
        user2.setAge(22);
        this.remoteCache.put("expiredUser1", user1, 5L, TimeUnit.MILLISECONDS);
        this.remoteCache.put("expiredUser2", user2, 5L, TimeUnit.MILLISECONDS);
        expectElementsInQueue(joined, 2);
        expectElementsInQueue(left, 0);

        // ...and leave once the controlled clock has passed the lifespan and the entries are read.
        this.timeService.advance(6L);
        AssertJUnit.assertNull(this.remoteCache.get("expiredUser1"));
        AssertJUnit.assertNull(this.remoteCache.get("expiredUser2"));
        expectElementsInQueue(joined, 0);
        expectElementsInQueue(left, 2);

        // With the listener removed, a matching write is not expected to be observed.
        continuousQuery.removeContinuousQueryListener(listener);
        user2.setAge(22);
        this.remoteCache.put("user" + user2.getId(), user2);
        expectElementsInQueue(joined, 0);
        expectElementsInQueue(left, 0);
    }

    /**
     * Registers a projection continuous query with {@code ageParam = 32}, verifies the two initial
     * matches, then removes the listener, changes the parameter on the same query object to 40 and
     * registers a fresh listener, expecting all three users to join this time.
     *
     * @throws InterruptedException declared for signature compatibility; the helpers used here convert
     *                              interruption into an {@link AssertionError}
     */
    public void testContinuousQueryChangingParameter() throws InterruptedException {
        UserPB user1 = new UserPB();
        user1.setId(1);
        user1.setName("John");
        user1.setSurname("Doe");
        user1.setGender(User.Gender.MALE);
        user1.setAge(22);
        user1.setAccountIds(new HashSet<>(Arrays.asList(1, 2)));
        user1.setNotes("Lorem ipsum dolor sit amet");

        UserPB user2 = new UserPB();
        user2.setId(2);
        user2.setName("Spider");
        user2.setSurname("Man");
        user2.setGender(User.Gender.MALE);
        user2.setAge(32);
        user2.setAccountIds(Collections.singleton(3));

        UserPB user3 = new UserPB();
        user3.setId(3);
        user3.setName("Spider");
        user3.setSurname("Woman");
        user3.setGender(User.Gender.FEMALE);
        user3.setAge(40);

        this.remoteCache.clear();
        this.remoteCache.put("user" + user1.getId(), user1);
        this.remoteCache.put("user" + user2.getId(), user2);
        this.remoteCache.put("user" + user3.getId(), user3);
        AssertJUnit.assertEquals(3, this.remoteCache.size());

        Query<?> query = Search.getQueryFactory(this.remoteCache)
                .create("SELECT age FROM sample_bank_account.User WHERE age <= :ageParam")
                .setParameter("ageParam", 32);

        // Typed queues let the row-extracting lambda below type-check without casts.
        final BlockingQueue<KeyValuePair<String, Object[]>> joined = new LinkedBlockingQueue<>();
        final BlockingQueue<String> left = new LinkedBlockingQueue<>();

        ContinuousQueryListener<String, Object[]> firstListener = new ContinuousQueryListener<String, Object[]>() {
            @Override
            public void resultJoining(String key, Object[] row) {
                joined.add(new KeyValuePair<>(key, row));
            }

            @Override
            public void resultLeaving(String key) {
                left.add(key);
            }
        };

        ContinuousQuery<String, User> continuousQuery = Search.getContinuousQuery(this.remoteCache);
        continuousQuery.addContinuousQueryListener(query, firstListener);

        // With ageParam = 32 only the users aged 32 and 22 match.
        expectElementsInQueue(joined, 2, kv -> kv.getValue()[0], 32, 22);
        expectElementsInQueue(left, 0);
        expectNoMoreElementsInQueues(joined, left);

        // Start the second phase from empty queues and without the first listener.
        joined.clear();
        left.clear();
        continuousQuery.removeContinuousQueryListener(firstListener);

        // Widen the predicate on the same query object and register a separate listener instance.
        query.setParameter("ageParam", 40);
        ContinuousQueryListener<String, Object[]> secondListener = new ContinuousQueryListener<String, Object[]>() {
            @Override
            public void resultJoining(String key, Object[] row) {
                joined.add(new KeyValuePair<>(key, row));
            }

            @Override
            public void resultLeaving(String key) {
                left.add(key);
            }
        };
        continuousQuery.addContinuousQueryListener(query, secondListener);

        // With ageParam = 40 all three users match.
        expectElementsInQueue(joined, 3);
        expectElementsInQueue(left, 0);
        continuousQuery.removeContinuousQueryListener(secondListener);
    }

    /**
     * Waits for and removes {@code i} elements from the queue without checking their values.
     *
     * @param blockingQueue the queue to read from
     * @param i             the number of elements that must become available
     */
    private <T> void expectElementsInQueue(BlockingQueue<T> blockingQueue, int i) {
        expectElementsInQueue(blockingQueue, i, null);
    }

    /**
     * Waits for and removes {@code i} elements from the queue, waiting up to 5 seconds for each, and
     * optionally checks them against a set of expected values in any order.
     *
     * @param blockingQueue the queue to read from
     * @param i             the number of elements that must become available
     * @param function      maps each polled element to the value compared against {@code objArr};
     *                      when {@code null} the element itself is compared
     * @param objArr        the expected values; either empty (no value check) or exactly {@code i} values
     * @throws IllegalArgumentException if expected values are given but their count differs from {@code i}
     * @throws AssertionError           if an element does not arrive in time, a value is unexpected, or
     *                                  the thread is interrupted while waiting
     */
    private <T, R> void expectElementsInQueue(BlockingQueue<T> blockingQueue, int i, Function<T, R> function, Object... objArr) {
        List<Object> remainingExpected;
        if (objArr.length == 0) {
            remainingExpected = null;
        } else {
            if (objArr.length != i) {
                throw new IllegalArgumentException("The number of expected values must either match the number of expected elements or no expected values should be specified.");
            }
            remainingExpected = new ArrayList<>(objArr.length);
            Collections.addAll(remainingExpected, objArr);
        }

        for (int index = 0; index < i; index++) {
            final T element;
            try {
                element = blockingQueue.poll(5L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for condition", e);
            }
            AssertJUnit.assertNotNull("Queue was empty after reading " + index + " elements!", element);
            if (remainingExpected != null) {
                Object actual = function != null ? function.apply(element) : element;
                // Each expected value may be matched once; removal makes the check order-independent.
                AssertJUnit.assertTrue("Expectation failed on element number " + index + ", unexpected value: " + actual, remainingExpected.remove(actual));
            }
        }
    }

    /**
     * Asserts that none of the given queues has an element available after a short (100 ms) pause.
     *
     * @param blockingQueueArr the queues that are expected to be empty
     */
    private void expectNoMoreElementsInQueues(BlockingQueue<?>... blockingQueueArr) {
        // Pause once up front so that late notifications have a chance to show up before checking.
        TestingUtil.sleepThread(100L);
        for (int idx = 0; idx < blockingQueueArr.length; idx++) {
            Object unexpected = blockingQueueArr[idx].poll();
            AssertJUnit.assertNull("No more elements expected in queue!", unexpected);
        }
    }
}
