/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.query.affinity;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.container.DataContainer;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.lucene.IndexScopedKey;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.query.CacheQuery;
import org.infinispan.query.Search;
import org.infinispan.query.affinity.BaseAffinityTest;
import org.infinispan.query.affinity.Entity;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

/**
 * Functional test for the index caches named {@code LuceneIndexesData},
 * {@code LuceneIndexesMetadata} and {@code LuceneIndexesLocking}.
 *
 * <p>Two scenarios are covered: entries written from several threads all become visible
 * through every cache and through a match-all query, and every index entry held in a
 * node's data container has an affinity segment that the node owns according to the
 * consistent hash.
 */
@Test(groups={"functional"}, testName="query.AffinityTest")
@CleanupAfterMethod
public class AffinityTest
extends BaseAffinityTest {
    /** Pool running the writer tasks of {@link #testConcurrentWrites()}; {@code null} when none is active. */
    private ExecutorService executorService;

    /**
     * Sets up the cluster used by each test by asking the base class for three clustered
     * caches built from its default configuration builder.
     */
    protected void createCacheManagers() throws Throwable {
        this.createClusteredCaches(3, this.getDefaultCacheConfigBuilder());
    }

    /**
     * Stops any writer pool left behind by a test, then delegates to the inherited cleanup.
     * Runs even when the test method failed ({@code alwaysRun=true}), so a failing test does
     * not leave writer threads behind.
     */
    @Override
    @AfterMethod(alwaysRun=true)
    protected void clearContent() throws Throwable {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
            this.executorService = null;
        }
        super.clearContent();
    }

    /**
     * Writes entities from two threads at once and verifies that every cache reports the
     * full entry count and that a match-all query on every cache eventually returns it.
     *
     * @throws Exception if waiting for a writer is interrupted or a writer task failed
     */
    public void testConcurrentWrites() throws Exception {
        int numThreads = 2;
        int entriesPerThread = this.getNumEntries();
        int expectedEntries = numThreads * entriesPerThread;
        AtomicInteger counter = new AtomicInteger(0);
        this.executorService = Executors.newFixedThreadPool(numThreads);
        List<Cache<String, Entity>> cacheList = this.caches();

        // Every writer stores entriesPerThread entities; the shared counter hands out ids
        // so that no two writers ever use the same key.
        Runnable writer = () -> IntStream.rangeClosed(1, entriesPerThread).forEach(entry -> {
            int id = counter.incrementAndGet();
            this.pickCache().put(String.valueOf(id), new Entity(id));
        });

        this.log.info("Starting threads");
        List<Future<?>> futures = IntStream.rangeClosed(1, numThreads)
                .mapToObj(tid -> this.executorService.submit(writer))
                .collect(Collectors.toList());

        this.log.info("Waiting for threads");
        for (Future<?> future : futures) {
            // get() rethrows any failure of the writer task, failing the test.
            future.get();
        }

        this.log.info("Checking cache size");
        cacheList.forEach(cache -> {
            Assert.assertEquals(cache.size(), expectedEntries);
            CacheQuery<Entity> query = Search.getSearchManager(cache)
                    .getQuery(new MatchAllDocsQuery(), Entity.class);
            // The query result is polled rather than asserted once.
            this.eventuallyEquals(expectedEntries, () -> query.list().size());
        });
    }

    /**
     * Interleaves populating the cache with calls to {@code addNode()} and re-checks index
     * affinity, cache size and match-all query size after each step.
     */
    public void shouldHaveIndexAffinity() {
        int numEntries = this.getNumEntries();

        this.populate(1, numEntries / 2);
        this.checkAffinity();

        this.addNode();
        this.populate(numEntries / 2 + 1, numEntries);
        this.checkAffinity();

        CacheQuery<Entity> query = Search.getSearchManager(this.pickCache())
                .getQuery(new MatchAllDocsQuery(), Entity.class);
        // TestNG argument order is (actual, expected).
        Assert.assertEquals(this.pickCache().size(), numEntries);
        Assert.assertEquals(query.list().size(), numEntries);

        this.addNode();
        this.checkAffinity();
        Assert.assertEquals(this.pickCache().size(), numEntries);

        this.populate(numEntries + 1, numEntries * 2);
        this.checkAffinity();
        Assert.assertEquals(query.list().size(), numEntries * 2);
    }

    /** Verifies index affinity of the three Lucene index caches on every cache manager of the cluster. */
    void checkAffinity() {
        for (EmbeddedCacheManager clusterMember : this.cacheManagers) {
            this.checkAffinity(clusterMember.getCache("LuceneIndexesData"));
            this.checkAffinity(clusterMember.getCache("LuceneIndexesMetadata"));
            this.checkAffinity(clusterMember.getCache("LuceneIndexesLocking"));
        }
    }

    /**
     * Asserts that every entry in the data container of {@code indexCache} has a key whose
     * affinity segment is among the segments the consistent hash assigns to this cache's
     * own address.
     *
     * @param indexCache the index cache instance whose data container is inspected
     */
    private void checkAffinity(Cache<IndexScopedKey, ?> indexCache) {
        AdvancedCache<IndexScopedKey, ?> advancedCache = indexCache.getAdvancedCache();
        DataContainer<IndexScopedKey, ?> dataContainer = advancedCache.getDataContainer();
        ConsistentHash consistentHash = advancedCache.getDistributionManager().getConsistentHash();
        Address address = advancedCache.getRpcManager().getAddress();
        Set<Integer> ownedSegments = consistentHash.getSegmentsForOwner(address);
        dataContainer.forEach(entry -> {
            IndexScopedKey key = entry.getKey();
            int segmentAffinity = key.getAffinitySegmentId();
            Assert.assertTrue(ownedSegments.contains(segmentAffinity),
                    "Key " + key + " has affinity segment " + segmentAffinity
                            + " which is not owned by " + address + " (owned: " + ownedSegments + ")");
        });
    }
}

