package org.infinispan.client.hotrod.event;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.Search;
import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller;
import org.infinispan.client.hotrod.query.testdomain.protobuf.UserPB;
import org.infinispan.client.hotrod.query.testdomain.protobuf.marshallers.MarshallerRegistration;
import org.infinispan.client.hotrod.test.InternalRemoteCacheManager;
import org.infinispan.client.hotrod.test.MultiHotRodServersTest;
import org.infinispan.commons.time.TimeService;
import org.infinispan.commons.util.Util;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.Index;
import org.infinispan.query.api.continuous.ContinuousQuery;
import org.infinispan.query.api.continuous.ContinuousQueryListener;
import org.infinispan.query.dsl.Expression;
import org.infinispan.query.dsl.Query;
import org.infinispan.query.dsl.embedded.testdomain.User;
import org.infinispan.query.remote.impl.filter.IckleContinuousQueryProtobufCacheEventFilterConverterFactory;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.infinispan.test.TestingUtil;
import org.infinispan.util.ControlledTimeService;
import org.infinispan.util.KeyValuePair;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "client.hotrod.event.RemoteContinuousQueryLeavingRemoteCacheManagerTest")
/* loaded from: input_file:org/infinispan/client/hotrod/event/RemoteContinuousQueryLeavingRemoteCacheManagerTest.class */
public class RemoteContinuousQueryLeavingRemoteCacheManagerTest extends MultiHotRodServersTest {
    private RemoteCache<String, User> remoteCache;
    private final int NUM_NODES = 1;
    private ControlledTimeService timeService = new ControlledTimeService();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/infinispan/client/hotrod/event/RemoteContinuousQueryLeavingRemoteCacheManagerTest$Listener.class */
    public static class Listener implements ContinuousQueryListener<String, User> {
        final BlockingQueue<KeyValuePair<String, User>> joined = new LinkedBlockingQueue();
        final BlockingQueue<KeyValuePair<String, User>> updated = new LinkedBlockingQueue();
        final BlockingQueue<String> left = new LinkedBlockingQueue();

        Listener() {
        }

        public void resultJoining(String str, User user) {
            this.joined.add(new KeyValuePair<>(str, user));
        }

        public void resultUpdated(String str, User user) {
            this.updated.add(new KeyValuePair<>(str, user));
        }

        public void resultLeaving(String str) {
            this.left.add(str);
        }
    }

    protected void createCacheManagers() throws Throwable {
        createHotRodServers(1, getConfigurationBuilder());
        waitForClusterToForm();
        IckleContinuousQueryProtobufCacheEventFilterConverterFactory ickleContinuousQueryProtobufCacheEventFilterConverterFactory = new IckleContinuousQueryProtobufCacheEventFilterConverterFactory();
        for (int i = 0; i < 1; i++) {
            server(i).addCacheEventFilterConverterFactory("continuous-query-filter-converter-factory", ickleContinuousQueryProtobufCacheEventFilterConverterFactory);
            TestingUtil.replaceComponent(server(i).getCacheManager(), TimeService.class, this.timeService, true);
        }
        this.remoteCache = client(0).getCache();
        RemoteCache cache = client(0).getCache("___protobuf_metadata");
        cache.put("sample_bank_account/bank.proto", Util.getResourceAsString(MarshallerRegistration.PROTOBUF_RES, getClass().getClassLoader()));
        AssertJUnit.assertFalse(cache.containsKey(".errors"));
        MarshallerRegistration.registerMarshallers(ProtoStreamMarshaller.getSerializationContext(client(0)));
    }

    protected ConfigurationBuilder getConfigurationBuilder() {
        ConfigurationBuilder hotRodCacheConfiguration = HotRodTestingUtil.hotRodCacheConfiguration(getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false));
        hotRodCacheConfiguration.indexing().index(Index.ALL).addProperty("default.directory_provider", "local-heap").addProperty("lucene_version", "LUCENE_CURRENT");
        hotRodCacheConfiguration.expiration().disableReaper();
        return hotRodCacheConfiguration;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.infinispan.client.hotrod.test.MultiHotRodServersTest
    public org.infinispan.client.hotrod.configuration.ConfigurationBuilder createHotRodClientConfigurationBuilder(int i) {
        return super.createHotRodClientConfigurationBuilder(i).marshaller(new ProtoStreamMarshaller());
    }

    private Listener applyContinuousQuery(RemoteCache<String, User> remoteCache) {
        Query parameter = Search.getQueryFactory(remoteCache).from(UserPB.class).having("age").lte(Expression.param("ageParam")).build().setParameter("ageParam", 32);
        ContinuousQuery continuousQuery = Search.getContinuousQuery(remoteCache);
        Listener listener = new Listener();
        continuousQuery.addContinuousQueryListener(parameter, listener);
        return listener;
    }

    public void testContinuousQueryRemoveRCM() throws IOException {
        InternalRemoteCacheManager internalRemoteCacheManager = new InternalRemoteCacheManager(createHotRodClientConfigurationBuilder(server(0).getPort().intValue()).build());
        MarshallerRegistration.registerMarshallers(ProtoStreamMarshaller.getSerializationContext(internalRemoteCacheManager));
        RemoteCache<String, User> cache = internalRemoteCacheManager.getCache();
        UserPB userPB = new UserPB();
        userPB.setId(1);
        userPB.setName("John");
        userPB.setSurname("Doe");
        userPB.setGender(User.Gender.MALE);
        userPB.setAge(22);
        userPB.setAccountIds(new HashSet(Arrays.asList(1, 2)));
        userPB.setNotes("Lorem ipsum dolor sit amet");
        this.remoteCache.put("user" + userPB.getId(), userPB);
        Listener applyContinuousQuery = applyContinuousQuery(this.remoteCache);
        Listener applyContinuousQuery2 = applyContinuousQuery(cache);
        expectElementsInQueue(applyContinuousQuery.joined, 1, keyValuePair -> {
            return ((User) keyValuePair.getValue()).getAge();
        }, 22);
        expectElementsInQueue(applyContinuousQuery2.joined, 1, keyValuePair2 -> {
            return ((User) keyValuePair2.getValue()).getAge();
        }, 22);
        expectElementsInQueue(applyContinuousQuery.updated, 0);
        expectElementsInQueue(applyContinuousQuery2.updated, 0);
        expectElementsInQueue(applyContinuousQuery.left, 0);
        expectElementsInQueue(applyContinuousQuery2.left, 0);
        internalRemoteCacheManager.stop();
        userPB.setAge(23);
        this.remoteCache.put("user" + userPB.getId(), userPB);
        expectElementsInQueue(applyContinuousQuery.joined, 0);
        expectElementsInQueue(applyContinuousQuery.updated, 1, keyValuePair3 -> {
            return ((User) keyValuePair3.getValue()).getAge();
        }, 23);
        expectElementsInQueue(applyContinuousQuery.left, 0);
    }

    private <T> void expectElementsInQueue(BlockingQueue<T> blockingQueue, int i) {
        expectElementsInQueue(blockingQueue, i, null, new Object[0]);
    }

    private <T, R> void expectElementsInQueue(BlockingQueue<T> blockingQueue, int i, Function<T, R> function, Object... objArr) {
        ArrayList arrayList;
        if (objArr.length == 0) {
            arrayList = null;
        } else {
            if (objArr.length != i) {
                throw new IllegalArgumentException("The number of expected values must either match the number of expected elements or no expected values should be specified.");
            }
            arrayList = new ArrayList(objArr.length);
            Collections.addAll(arrayList, objArr);
        }
        for (int i2 = 0; i2 < i; i2++) {
            try {
                T poll = blockingQueue.poll(5L, TimeUnit.SECONDS);
                AssertJUnit.assertNotNull("Queue was empty after reading " + i2 + " elements!", poll);
                if (arrayList != null) {
                    Object apply = function != null ? function.apply(poll) : poll;
                    AssertJUnit.assertTrue("Expectation failed on element number " + i2 + ", unexpected value: " + apply, arrayList.remove(apply));
                }
            } catch (InterruptedException e) {
                throw new AssertionError("Interrupted while waiting for condition", e);
            }
        }
        try {
            AssertJUnit.assertNull("No more elements expected in queue!", blockingQueue.poll(100L, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e2) {
            throw new AssertionError("Interrupted while waiting for condition", e2);
        }
    }
}
