package org.infinispan.query.affinity;

import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.query.CacheQuery;
import org.infinispan.query.Search;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.test.fwk.TransportFlags;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Stress test that indexes and queries a cache concurrently while further nodes are
 * joining the cluster.
 *
 * <p>Three {@link IndexingNode}s share {@link #globalCounter} to write entities with ids
 * {@code 1..ENTRIES}, while one {@link QueryingNode} periodically runs a match-all query.
 * Once every task has finished, the test waits until a projection query on the
 * {@code "val"} field returns exactly {@link #ENTRIES} distinct values.
 */
@Test(groups = {"stress"}, testName = "query.AffinityTopologyChangeTest")
public class AffinityTopologyChangeTest extends BaseAffinityTest {
    /** Number of tasks each {@link TaskNode} submits, and the default size of its thread pool. */
    private static final int THREADS_PER_NODE = 3;

    /** Total number of entities written by all indexing nodes together. */
    private static final int ENTRIES = 500;

    /** Extracts the first projected column of a query result row as an {@code Integer}. */
    private static final Function<Object, Integer> EXTRACT_PROJECTION = row -> (Integer) ((Object[]) row)[0];

    /** Id generator shared by every indexing thread; also read by the querying node to detect completion. */
    private final AtomicInteger globalCounter = new AtomicInteger(0);

    private Node indexing1;
    private Node indexing2;
    private Node indexing3;
    private Node querying;

    /** Node whose tasks claim ids from {@link #globalCounter} and put one entity per id. */
    private class IndexingNode extends TaskNode {
        private IndexingNode() {
            super();
        }

        /**
         * Claims ids until the shared counter exceeds {@link #ENTRIES}, storing
         * {@code new Entity(id)} under the key {@code String.valueOf(id)} for every id in range.
         */
        @Override
        void executeTask() {
            int id;
            // Each thread overshoots by exactly one increment: the value that ends its loop.
            while ((id = globalCounter.incrementAndGet()) <= ENTRIES) {
                this.cache.put(String.valueOf(id), new Entity(id));
                System.out.printf("Put %d\n", id);
            }
        }
    }

    /** A cluster member owning its own cache manager and default cache. */
    abstract class Node {
        /** {@code null} until {@link #addToCluster()} has been called. */
        protected EmbeddedCacheManager cacheManager;
        /** {@code null} until {@link #addToCluster()} has been called. */
        protected Cache<String, Entity> cache;

        Node() {
        }

        /**
         * Creates a cache manager for this node through the enclosing test and fetches its
         * default cache.
         *
         * @return this node, to allow chaining with {@link #run()}
         */
        Node addToCluster() {
            this.cacheManager = addClusterEnabledCacheManager(getCacheConfig());
            this.cache = this.cacheManager.getCache();
            return this;
        }

        /** Kills this node's cache manager and removes it from the test's manager list. */
        void kill() {
            if (this.cacheManager == null) {
                // The node never joined the cluster (e.g. the test failed early): nothing to stop.
                return;
            }
            TestingUtil.killCacheManagers(new EmbeddedCacheManager[]{this.cacheManager});
            AffinityTopologyChangeTest.this.cacheManagers.remove(this.cacheManager);
        }

        /**
         * Starts this node's workload.
         *
         * @return a future completing when the whole workload has finished
         */
        abstract CompletableFuture<Void> run();
    }

    /** Node that polls the index with a match-all query while the indexing nodes are writing. */
    private class QueryingNode extends TaskNode {
        /** Pause between two consecutive queries, in milliseconds. */
        static final int QUERY_INTERVAL_MS = 500;

        QueryingNode() {
            super(1);
        }

        /**
         * Runs the query every {@link #QUERY_INTERVAL_MS} ms until the shared counter exceeds
         * {@link #ENTRIES}, asserting each time that the result size is in {@code (0, ENTRIES]}.
         * Returns early, with the interrupt flag restored, if the thread is interrupted.
         */
        @Override
        void executeTask() {
            int claimed = globalCounter.get();
            CacheQuery query = Search.getSearchManager(this.cache)
                    .getQuery(new MatchAllDocsQuery(), new Class<?>[]{Entity.class});
            while (claimed <= ENTRIES) {
                int size = query.list().size();
                AssertJUnit.assertTrue("Unexpected query result size: " + size, size > 0 && size <= ENTRIES);
                claimed = globalCounter.get();
                try {
                    Thread.sleep(QUERY_INTERVAL_MS);
                } catch (InterruptedException e) {
                    // kill() interrupts the pool via shutdownNow(): stop polling instead of
                    // silently carrying on against a cache that is being torn down.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /** Node that runs {@link #executeTask()} on a private fixed-size thread pool. */
    abstract class TaskNode extends Node {
        private final ExecutorService executorService;

        /** Creates a node backed by a pool of {@link #THREADS_PER_NODE} threads. */
        TaskNode() {
            this(THREADS_PER_NODE);
        }

        /**
         * @param nThreads size of the fixed thread pool that executes this node's tasks
         */
        TaskNode(int nThreads) {
            super();
            this.executorService = Executors.newFixedThreadPool(nThreads);
        }

        /** Shuts the thread pool down (interrupting running tasks), then kills the cache manager. */
        @Override
        void kill() {
            this.executorService.shutdownNow();
            super.kill();
        }

        /** The unit of work executed by each submitted task. */
        abstract void executeTask();

        /**
         * Submits {@link #THREADS_PER_NODE} executions of {@link #executeTask()} to this node's pool.
         * The count does not depend on the pool size, so on a smaller pool the surplus tasks queue up.
         *
         * @return a future completing once every submitted task has completed
         */
        @Override
        CompletableFuture<Void> run() {
            CompletableFuture<?>[] tasks = IntStream.range(0, THREADS_PER_NODE)
                    .mapToObj(t -> CompletableFuture.runAsync(this::executeTask, this.executorService))
                    .toArray(CompletableFuture[]::new);
            return CompletableFuture.allOf(tasks);
        }
    }

    /** Creates fresh, not yet clustered nodes and rewinds the shared id counter. */
    @BeforeMethod
    public void prepare() {
        // Without the reset a repeated invocation would start with an exhausted counter
        // and the indexing nodes would write nothing.
        this.globalCounter.set(0);
        this.indexing1 = new IndexingNode();
        this.indexing2 = new IndexingNode();
        this.indexing3 = new IndexingNode();
        this.querying = new QueryingNode();
    }

    /** Kills every node created by {@link #prepare()}. */
    @AfterMethod
    public void after() {
        this.indexing3.kill();
        this.querying.kill();
        this.indexing2.kill();
        this.indexing1.kill();
    }

    /**
     * Starts two indexing nodes, waits until the second one sees two members, then starts the
     * third indexing node and the querying node while writes are already in flight. After all
     * tasks finish, waits until the index holds exactly {@link #ENTRIES} distinct {@code "val"}
     * values, printing any id that is still missing on each attempt.
     */
    @Test
    public void testReadWriteUnderTopologyChanges() throws Exception {
        CompletableFuture<Void> firstIndexer = this.indexing1.addToCluster().run();
        CompletableFuture<Void> secondIndexer = this.indexing2.addToCluster().run();
        eventuallyEquals(2, () -> this.indexing2.cacheManager.getMembers().size());

        CompletableFuture<Void> thirdIndexer = this.indexing3.addToCluster().run();
        CompletableFuture<Void> querier = this.querying.addToCluster().run();
        CompletableFuture.allOf(firstIndexer, secondIndexer, thirdIndexer, querier).join();

        eventually(() -> {
            List<?> rows = Search.getSearchManager(pickCache())
                    .getQuery(new MatchAllDocsQuery(), new Class<?>[0])
                    .projection(new String[]{"val"})
                    .list();
            Set<Integer> indexedIds = rows.stream().map(EXTRACT_PROJECTION).collect(Collectors.toSet());
            int resultSize = indexedIds.size();
            IntStream.rangeClosed(1, ENTRIES)
                    .filter(id -> !indexedIds.contains(id))
                    .forEach(id -> System.out.println("Missing id: " + id));
            System.out.println("resultSize=" + resultSize + ", ENTRIES=" + ENTRIES);
            return resultSize == ENTRIES;
        });
    }

    /**
     * Creates a clustered cache manager, defines the three Lucene index caches on it
     * ({@code LuceneIndexesLocking}, {@code LuceneIndexesMetadata}, {@code LuceneIndexesData})
     * and registers it in {@code cacheManagers}.
     *
     * @param configurationBuilder default cache configuration for the new manager
     * @param transportFlags       not consulted by this implementation
     * @return the newly created cache manager
     */
    // NOTE(review): presumably overrides a superclass method of the same signature; the
    // superclass is not visible here, so @Override is left off - confirm and add it.
    protected EmbeddedCacheManager addClusterEnabledCacheManager(ConfigurationBuilder configurationBuilder, TransportFlags transportFlags) {
        GlobalConfigurationBuilder globalConfig = GlobalConfigurationBuilder.defaultClusteredBuilder();
        TestCacheManagerFactory.amendTransport(globalConfig);
        EmbeddedCacheManager manager = TestCacheManagerFactory.newDefaultCacheManager(true, globalConfig, configurationBuilder, false);
        // The metadata and data index caches share one configuration; locking has its own.
        Configuration dataCacheConfig = getDataCacheConfig();
        manager.defineConfiguration("LuceneIndexesLocking", getLockCacheConfig());
        manager.defineConfiguration("LuceneIndexesMetadata", dataCacheConfig);
        manager.defineConfiguration("LuceneIndexesData", dataCacheConfig);
        this.cacheManagers.add(manager);
        return manager;
    }

    /** Intentionally empty: cache managers are created by the test itself via {@link Node#addToCluster()}. */
    @Override // org.infinispan.query.affinity.BaseAffinityTest
    protected void createCacheManagers() throws Throwable {
    }
}
