/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.query.affinity;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.hibernate.search.annotations.Field;
import org.hibernate.search.annotations.Indexed;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.Index;
import org.infinispan.container.DataContainer;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.distribution.ch.impl.AffinityPartitioner;
import org.infinispan.lucene.IndexScopedKey;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.query.CacheQuery;
import org.infinispan.query.Search;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.MultipleCacheManagersTest;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

/**
 * Functional test for the affinity-based index manager.
 *
 * <p>The cluster starts with three {@code DIST_SYNC} nodes configured with 60 segments,
 * two owners and an {@link AffinityPartitioner}. The {@code entity} index is stored in
 * Infinispan (directory provider {@code infinispan}) and managed by
 * {@code org.infinispan.query.affinity.ShardIndexManager}. The tests verify that index
 * cache entries live only on nodes owning the segment encoded in their key, and that
 * queries see every written entity.
 */
@Test(groups={"functional"}, testName="query.AffinityTest")
public class AffinityTest
extends MultipleCacheManagersTest {
    /** Number of entries each writer thread (or each population phase pair) stores. */
    private static final int ENTRIES = 50;
    /** Configuration shared by the initial nodes and by nodes added during a test. */
    private ConfigurationBuilder cacheCfg;
    /** Source of randomness used to spread writes and reads across cluster members. */
    private final Random random = new Random();

    /**
     * Builds the shared cache configuration and starts the initial three-node cluster.
     *
     * @throws Throwable if the cluster cannot be created
     */
    protected void createCacheManagers() throws Throwable {
        this.cacheCfg = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false);
        this.cacheCfg.clustering().hash()
            .numSegments(60)
            .numOwners(2)
            .keyPartitioner(new AffinityPartitioner());
        this.cacheCfg.indexing()
            .index(Index.ALL)
            .addIndexedEntity(Entity.class)
            .addProperty("hibernate.search.default.directory_provider", "infinispan")
            .addProperty("hibernate.search.lucene_version", "LUCENE_CURRENT")
            .addProperty("entity.indexmanager", "org.infinispan.query.affinity.ShardIndexManager");
        this.createClusteredCaches(3, this.cacheCfg);
    }

    /**
     * Writes {@link #ENTRIES} entities from each of two threads through randomly chosen
     * nodes, then checks the cache size and that a match-all query on every node
     * eventually returns all of them.
     *
     * @throws InterruptedException if interrupted while waiting for the writers
     */
    public void testConcurrentWrites() throws InterruptedException {
        int numThreads = 2;
        int expectedEntries = numThreads * ENTRIES;
        AtomicInteger counter = new AtomicInteger(0);
        List<Cache<String, Entity>> cacheList = this.caches();

        // Each writer draws globally unique ids from the shared counter.
        Runnable writer = () -> {
            for (int i = 0; i < ENTRIES; i++) {
                int id = counter.incrementAndGet();
                this.pickCache(cacheList).put(String.valueOf(id), new Entity(id));
            }
        };

        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
        try {
            List<Future<?>> futures = new ArrayList<>(numThreads);
            for (int t = 0; t < numThreads; t++) {
                futures.add(executorService.submit(writer));
            }
            for (Future<?> future : futures) {
                try {
                    future.get();
                }
                catch (ExecutionException e) {
                    // Fail with the writer's own error instead of a confusing size mismatch later.
                    throw new AssertionError("Concurrent writer failed", e.getCause());
                }
            }
        }
        finally {
            // Always release the worker threads, including on failure or interruption.
            executorService.shutdownNow();
        }

        Assert.assertEquals(this.pickCache(cacheList).size(), expectedEntries);
        for (Cache<String, Entity> cache : cacheList) {
            CacheQuery q = Search.getSearchManager(cache).getQuery(new MatchAllDocsQuery(), Entity.class);
            this.eventually(() -> q.list().size() == expectedEntries);
        }
    }

    /**
     * Returns a uniformly random cache from the given list; every element, including
     * the last one, can be selected.
     *
     * @param caches candidate caches; must not be empty
     * @return one of the supplied caches
     * @throws IllegalArgumentException if {@code caches} is empty
     */
    public Cache<String, Entity> pickCache(List<Cache<String, Entity>> caches) {
        // nextInt(bound) is exclusive of bound, so size() (not size() - 1) covers all indices.
        return caches.get(this.random.nextInt(caches.size()));
    }

    /**
     * Stores one entity per id in {@code [initialId, finalId]}, each through a randomly
     * chosen cache, keyed by the decimal string of its id.
     *
     * @param initialId first id to store (inclusive)
     * @param finalId last id to store (inclusive)
     * @param caches caches to write through
     */
    private void populate(int initialId, int finalId, List<Cache<String, Entity>> caches) {
        for (int id = initialId; id <= finalId; id++) {
            this.pickCache(caches).put(String.valueOf(id), new Entity(id));
        }
    }

    /** Adds one more node using the shared configuration and waits for the cluster to form. */
    private void addNode() {
        this.addClusterEnabledCacheManager(this.cacheCfg);
        this.waitForClusterToForm();
    }

    /**
     * Populates the cache in phases while growing the cluster by two nodes, checking
     * after every step that index entries are stored only on owners of their affinity
     * segment and that cache size and query results match the number of entities written.
     *
     * @throws Exception if any cluster or query operation fails
     */
    public void shouldHaveIndexAffinity() throws Exception {
        List<Cache<String, Entity>> initialCaches = this.caches();
        this.populate(1, ENTRIES / 2, initialCaches);
        this.checkAffinity();

        this.addNode();
        List<Cache<String, Entity>> currentCaches = this.caches();
        this.populate(ENTRIES / 2 + 1, ENTRIES, currentCaches);
        this.checkAffinity();

        CacheQuery q = Search.getSearchManager(this.pickCache(currentCaches)).getQuery(new MatchAllDocsQuery(), Entity.class);
        // TestNG's assertEquals takes (actual, expected).
        Assert.assertEquals(this.pickCache(currentCaches).size(), ENTRIES);
        Assert.assertEquals(q.list().size(), ENTRIES);

        this.addNode();
        this.checkAffinity();
        Assert.assertEquals(this.pickCache(this.caches()).size(), ENTRIES);

        // Writes go through the caches captured before the last node joined, as in the original test.
        this.populate(ENTRIES + 1, 2 * ENTRIES, currentCaches);
        this.checkAffinity();
        Assert.assertEquals(q.list().size(), 2 * ENTRIES);
    }

    /** Checks segment affinity of the three Lucene index caches on every cluster member. */
    private void checkAffinity() {
        for (EmbeddedCacheManager clusterMember : this.cacheManagers) {
            this.checkAffinity(clusterMember.getCache("LuceneIndexesData"));
            this.checkAffinity(clusterMember.getCache("LuceneIndexesMetadata"));
            this.checkAffinity(clusterMember.getCache("LuceneIndexesLocking"));
        }
    }

    /**
     * Asserts that every entry held in the local data container of {@code indexCache}
     * has a key whose affinity segment is owned by the local node according to the
     * cache's current consistent hash.
     *
     * @param indexCache index cache whose local entries are inspected
     */
    private void checkAffinity(Cache<IndexScopedKey, ?> indexCache) {
        AdvancedCache<IndexScopedKey, ?> advancedCache = indexCache.getAdvancedCache();
        DataContainer<IndexScopedKey, ?> dataContainer = advancedCache.getDataContainer();
        ConsistentHash consistentHash = advancedCache.getDistributionManager().getConsistentHash();
        Address address = advancedCache.getRpcManager().getAddress();
        Set<Integer> ownedSegments = consistentHash.getSegmentsForOwner(address);
        dataContainer.forEach(entry -> {
            int segmentAffinity = entry.getKey().getAffinitySegmentId();
            Assert.assertTrue(ownedSegments.contains(segmentAffinity),
                "Key " + entry.getKey() + " has affinity segment " + segmentAffinity
                    + " which is not owned by " + address);
        });
    }

    /**
     * Clears the default cache through the first cache manager after each test method.
     *
     * @throws Throwable if clearing fails
     */
    @AfterMethod
    protected void clearContent() throws Throwable {
        this.cacheManagers.get(0).getCache().clear();
    }

    /** Minimal indexed value type stored in the cache; indexed under the {@code entity} index. */
    @Indexed(index="entity")
    static class Entity
    implements Serializable {
        @Field
        private final int val;

        public Entity(int val) {
            this.val = val;
        }
    }
}

