/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.query.distributed;

import java.util.Collections;
import java.util.Scanner;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.hibernate.search.spi.impl.PojoIndexedTypeIdentifier;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commons.test.TestResourceTracker;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.context.Flag;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.query.Indexer;
import org.infinispan.query.Search;
import org.infinispan.query.dsl.Query;
import org.infinispan.query.helper.StaticTestingErrorHandler;
import org.infinispan.query.impl.ComponentRegistryUtils;
import org.infinispan.query.impl.massindex.IndexUpdater;
import org.infinispan.query.test.QueryTestSCI;
import org.infinispan.query.test.Transaction;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.fwk.TestCacheManagerFactory;

public class AsyncMassIndexPerfTest
extends MultipleCacheManagersTest {
    // ---------------------------------------------------------------------------------------
    // Benchmark tuning knobs.
    // NOTE(review): this file is decompiler output (see the header), so javac will have inlined
    // the compile-time constants at their use sites. When changing a value here, verify that no
    // stale literal copy remains elsewhere in the class.
    // ---------------------------------------------------------------------------------------

    // Presumably the number of entries loaded before the interactive console starts - confirm against writeData().
    private static final int OBJECT_COUNT = 1000000;
    // Presumably the size of the thread pool used for the bulk insert - confirm against writeData().
    private static final int WRITING_THREADS = 5;
    // Presumably controls whether the bulk insert bypasses indexing (Flag.SKIP_INDEXING) - TODO confirm.
    private static final boolean DISABLE_INDEX_WHEN_INSERTING = true;
    // Presumably selects a transactional cache configuration - TODO confirm; not referenced by name in the visible code.
    private static final boolean TX_ENABLED = false;
    // Passed as the "default.indexwriter.merge_factor" indexing property.
    private static final String MERGE_FACTOR = "30";
    // LOCAL selects a single stand-alone cache manager; any other mode builds a two-node cluster.
    private static final CacheMode CACHE_MODE = CacheMode.LOCAL;
    // Passed (via toString()) as the "default.indexmanager" indexing property.
    private static final IndexManager INDEX_MANAGER = IndexManager.DIRECTORY;
    // Passed (via toString()) as the "default.directory_provider" indexing property.
    private static final Provider DIRECTORY_PROVIDER = Provider.LOCAL_HEAP;
    // Passed (via toString()) as the "hibernate.search.default.worker.execution" indexing property.
    private static final WorkerMode WORKER_MODE = WorkerMode.sync;
    // Presumably the interval, in entries, between progress messages during the bulk insert - TODO confirm.
    private static final int PRINT_EACH = 10;
    // Cache used for the bulk insert, for index queries and to look up the Indexer.
    private Cache<Integer, Transaction> cache1;
    // Cache used by the console's "put new entry" command; in LOCAL mode it is obtained from the
    // same cache manager as cache1 with the same getCache() call.
    private Cache<Integer, Transaction> cache2;
    // Indexer looked up for cache1 in createCacheManagers() and handed to the console loop.
    private Indexer indexer;

    /**
     * Creates the cache(s) used by the benchmark and looks up the {@link Indexer} of the first one.
     *
     * <p>When {@code CACHE_MODE} is {@link CacheMode#LOCAL} a single stand-alone cache manager is
     * created and both cache fields are obtained from it; otherwise a two-node cluster is created
     * and the fields point at node 0 and node 1 respectively. In both cases indexing is enabled
     * for {@link Transaction} with the properties derived from the class constants.
     *
     * @throws Throwable if the underlying test-framework helpers fail
     */
    protected void createCacheManagers() throws Throwable {
        final boolean clustered = CACHE_MODE != CacheMode.LOCAL;

        final ConfigurationBuilder builder;
        if (clustered) {
            builder = AsyncMassIndexPerfTest.getDefaultClusteredCacheConfig(CACHE_MODE, false);
            builder.clustering().remoteTimeout(120000L);
        } else {
            builder = TestCacheManagerFactory.getDefaultCacheConfiguration(false);
        }

        // One property per line so that individual settings are easy to diff and toggle.
        builder.indexing()
                .enable()
                .addIndexedEntity(Transaction.class)
                .addProperty("default.directory_provider", DIRECTORY_PROVIDER.toString())
                .addProperty("default.indexmanager", INDEX_MANAGER.toString())
                .addProperty("default.indexwriter.merge_factor", MERGE_FACTOR)
                .addProperty("hibernate.search.default.worker.execution", WORKER_MODE.toString())
                .addProperty("error_handler", StaticTestingErrorHandler.class.getName())
                .addProperty("lucene_version", "LUCENE_CURRENT");

        if (clustered) {
            this.createClusteredCaches(2, QueryTestSCI.INSTANCE, builder);
            this.cache2 = this.cache(1);
            this.cache1 = this.cache(0);
        } else {
            EmbeddedCacheManager manager = TestCacheManagerFactory.createCacheManager(builder);
            this.cache1 = manager.getCache();
            this.cache2 = manager.getCache();
        }

        this.indexer = Search.getIndexer(this.cache1);
    }

    /**
     * Bulk-loads {@code OBJECT_COUNT} {@link Transaction} entries into {@code cache1} using a
     * fixed pool of {@code WRITING_THREADS} worker threads.
     *
     * <p>Keys are handed out by a shared counter starting at 1. When
     * {@code DISABLE_INDEX_WHEN_INSERTING} is set, the puts go through a view of the cache that
     * carries {@link Flag#SKIP_INDEXING}. Progress is printed every {@code PRINT_EACH} entries.
     *
     * <p>The method waits up to three minutes for the workers to finish; if that is not enough a
     * warning is printed (instead of silently carrying on) and the remaining tasks keep running.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void writeData() throws InterruptedException {
        // Resolve the target cache once rather than once per entry.
        final Cache<Integer, Transaction> insertCache = DISABLE_INDEX_WHEN_INSERTING
                ? this.cache1.getAdvancedCache().withFlags(Flag.SKIP_INDEXING)
                : this.cache1;

        ExecutorService executorService = Executors.newFixedThreadPool(WRITING_THREADS, this.getTestThreadFactory("Worker"));
        AtomicInteger counter = new AtomicInteger(0);
        for (int i = 0; i < OBJECT_COUNT; ++i) {
            executorService.submit(() -> {
                // incrementAndGet() starts at 1, so the key is never 0.
                int key = counter.incrementAndGet();
                insertCache.put(key, new Transaction(key * 100, "0eab" + key));
                if (key % PRINT_EACH == 0) {
                    System.out.printf("\rInserted %d", key);
                }
            });
        }
        executorService.shutdown();
        if (!executorService.awaitTermination(3L, TimeUnit.MINUTES)) {
            // Do not pretend the load completed: the reported insert time would be misleading.
            System.out.printf("%nTimed out waiting for inserts: %d of %d entries started so far%n", counter.get(), OBJECT_COUNT);
        }
        System.out.println();
    }

    /**
     * Polls the index (through the test framework's {@code eventually}) until it reports exactly
     * {@code expected} documents, printing the current count on every poll.
     *
     * @param expected the number of indexed documents to wait for
     */
    private void waitForIndexSize(int expected) {
        final long target = expected;
        this.eventually(() -> {
            long indexedSoFar = this.countIndex();
            System.out.printf("\rWaiting for indexing completion (%d): %d indexed so far", expected, indexedSoFar);
            return target == indexedSoFar;
        });
        System.out.println("\nIndexing done.");
    }

    /**
     * Stand-alone entry point: runs the test-framework set-up hooks by hand and then starts the
     * benchmark via {@link #populate()}.
     *
     * @param args ignored
     * @throws Throwable if set-up or the data load fails
     */
    public static void main(String[] args) throws Throwable {
        AsyncMassIndexPerfTest perfTest = new AsyncMassIndexPerfTest();

        // Register the current thread with the resource tracker before any cache is created.
        String testName = perfTest.getTestName();
        TestResourceTracker.testThreadStarted(testName);

        perfTest.createBeforeClass();
        perfTest.createBeforeMethod();
        perfTest.populate();
    }

    /**
     * Loads the test data, reports how long the load took, prints the command menu and starts the
     * interactive console loop on a new thread.
     *
     * @throws Exception if the data load is interrupted or fails
     */
    public void populate() throws Exception {
        StopTimer insertTimer = new StopTimer();
        this.writeData();
        insertTimer.stop();

        long elapsedSeconds = insertTimer.getElapsedIn(TimeUnit.SECONDS);
        System.out.printf("\rData inserted in %d seconds.", elapsedSeconds);

        this.info();

        Thread consoleThread = new Thread(new EventLoop(this.indexer));
        consoleThread.start();
    }

    /**
     * Prints the menu of single-letter console commands to standard output.
     */
    private void info() {
        // Compile-time concatenation: the resulting string is identical to a single literal.
        String menu = "\rr: Run MassIndexer\n"
                + "c: Cancel MassIndexer\n"
                + "i: Put new entry\n"
                + "s: Current index size\n"
                + "p: Purge indexes\n"
                + "f: flush\n"
                + "h: This menu\n"
                + "x: Exit";
        System.out.println(menu);
    }

    /**
     * Counts the indexed {@link Transaction} documents by running a {@code FROM <entity>} query
     * against {@code cache1}.
     *
     * @return the hit count reported by the query, or {@code -1} if the result carries none
     */
    protected long countIndex() {
        String queryString = "FROM " + Transaction.class.getName();
        return Search.getQueryFactory(this.cache1)
                .create(queryString)
                .execute()
                .hitCount()
                .orElse(-1L);
    }

    /**
     * Asks the {@link Indexer} of {@code cache1} to remove its index contents.
     */
    protected void clearIndex() {
        Indexer cacheIndexer = Search.getIndexer(this.cache1);
        // NOTE(review): the result of remove() is discarded. If remove() is asynchronous like
        // run() (which yields a CompletionStage), the purge may still be in flight when this
        // method returns - confirm against the Indexer API.
        cacheIndexer.remove();
    }

    /**
     * Flushes the {@link Transaction} index by driving an {@link IndexUpdater} built from the
     * search components registered for {@code cache1}.
     */
    private void flushIndex() {
        PojoIndexedTypeIdentifier transactionType = new PojoIndexedTypeIdentifier(Transaction.class);
        IndexUpdater updater = new IndexUpdater(
                ComponentRegistryUtils.getSearchIntegrator(this.cache1),
                ComponentRegistryUtils.getKeyTransformationHandler(this.cache1),
                ComponentRegistryUtils.getTimeService(this.cache1));
        updater.flush(Collections.singleton(transactionType));
    }

    /**
     * Minimal stopwatch: it starts when constructed (or when {@link #reset()} is called) and
     * records the elapsed time when {@link #stop()} is called.
     *
     * <p>Time is taken from the monotonic {@link System#nanoTime()} clock, so the measurement is
     * not distorted by wall-clock adjustments that happen while the timer is running.
     */
    static class StopTimer {
        /** Monotonic start reading, in nanoseconds. */
        private long start = this.currentTime();
        /** Nanoseconds between the last start and the last {@link #stop()}; 0 until stopped. */
        private long elapsed;

        /** Returns the current monotonic clock reading in nanoseconds. */
        private long currentTime() {
            return System.nanoTime();
        }

        /**
         * Restarts the timer from now. The previously recorded elapsed time is kept until the
         * next {@link #stop()}.
         */
        public void reset() {
            this.start = this.currentTime();
        }

        /** Records the time elapsed since construction or the last {@link #reset()}. */
        public void stop() {
            this.elapsed = this.currentTime() - this.start;
        }

        /**
         * Returns the elapsed time recorded by the last {@link #stop()}.
         *
         * @param unit the unit to express the result in (finer precision is truncated)
         * @return the recorded elapsed time in {@code unit}, or 0 if the timer was never stopped
         */
        public long getElapsedIn(TimeUnit unit) {
            return unit.convert(this.elapsed, TimeUnit.NANOSECONDS);
        }
    }

    /**
     * Interactive console loop: reads whitespace-separated tokens from standard input and maps
     * the single-letter commands listed by {@code info()} onto operations of the enclosing test.
     *
     * <p>The loop ends when the thread is interrupted, when standard input reaches end-of-file,
     * or when the {@code x} command terminates the JVM.
     */
    class EventLoop
    implements Runnable {
        private final Indexer indexer;
        /** Completion handle of the most recently started mass-indexer run; null before the first run. */
        private CompletionStage<Void> future;
        /** Last key handed out; starts at the bulk-load size, so it doubles as the expected index size. */
        private final AtomicInteger nextIndex = new AtomicInteger(OBJECT_COUNT);

        public EventLoop(Indexer indexer) {
            this.indexer = indexer;
        }

        /**
         * Starts the mass indexer and registers a callback that reports the run time. After a
         * successful run the callback also waits until the index holds every entry inserted so far.
         */
        void startMassIndexer() {
            System.out.println("Running MassIndexer");
            StopTimer stopTimer = new StopTimer();
            this.future = this.indexer.run().toCompletableFuture();
            this.future.whenComplete((v, t) -> {
                stopTimer.stop();
                boolean cancelled = t instanceof java.util.concurrent.CancellationException;
                if (t != null && !cancelled) {
                    System.out.println("Error executing massindexer");
                    t.printStackTrace();
                }
                System.out.printf("\nMass indexer run in %d seconds", stopTimer.getElapsedIn(TimeUnit.SECONDS));
                System.out.println();
                if (t != null) {
                    // After a failure or cancellation the index may never reach the expected size,
                    // so do not block this thread polling for it.
                    return;
                }
                AsyncMassIndexPerfTest.this.waitForIndexSize(this.nextIndex.get());
                System.out.println("Mass Indexing complete.");
            });
        }

        /**
         * Cancels the completion handle of the current mass-indexer run, if there is one.
         */
        private void cancelMassIndexer() {
            if (this.future == null) {
                System.out.println("\rMassIndexer not started");
                return;
            }
            // NOTE(review): CompletableFuture.cancel() completes the future exceptionally; whether
            // indexing work already in progress is actually stopped depends on the Indexer
            // implementation - confirm.
            if (this.future.toCompletableFuture().cancel(true)) {
                System.out.println("Mass Indexer cancelled");
            } else {
                System.out.println("Mass Indexer already finished");
            }
        }

        /** Puts one additional entry, keyed by the next free index, into {@code cache2}. */
        private void insertEntry() {
            int id = this.nextIndex.incrementAndGet();
            AsyncMassIndexPerfTest.this.cache2.put(id, new Transaction(id, "0" + id));
            System.out.println("New entry inserted");
        }

        @Override
        public void run() {
            // Deliberately not closed: closing the Scanner would close System.in as well.
            Scanner scanner = new Scanner(System.in);
            // hasNext() blocks for input and returns false at end-of-file, so a closed stdin ends
            // the loop instead of making next() throw NoSuchElementException.
            while (!Thread.interrupted() && scanner.hasNext()) {
                String next = scanner.next();
                switch (next) {
                    case "c":
                        this.cancelMassIndexer();
                        break;
                    case "r":
                        this.startMassIndexer();
                        break;
                    case "f":
                        AsyncMassIndexPerfTest.this.flushIndex();
                        System.out.println("Index flushed.");
                        break;
                    case "s":
                        System.out.printf("Index size is %d\n", AsyncMassIndexPerfTest.this.countIndex());
                        break;
                    case "p":
                        AsyncMassIndexPerfTest.this.clearIndex();
                        System.out.println("Index cleared.");
                        break;
                    case "i":
                        this.insertEntry();
                        break;
                    case "h":
                        AsyncMassIndexPerfTest.this.info();
                        break;
                    case "x":
                        System.exit(0);
                        break;
                    default:
                        // Unknown tokens are ignored.
                        break;
                }
            }
        }
    }

    private static enum IndexManager {
        NRT("near-real-time"),
        DIRECTORY("directory-based");

        private final String cfg;

        private IndexManager(String cfg) {
            this.cfg = cfg;
        }

        public String toString() {
            return this.cfg;
        }
    }

    private static enum WorkerMode {
        async,
        sync;

    }

    private static enum Provider {
        LOCAL_HEAP("local-heap"),
        FILESYSTEM("filesystem");

        private final String cfg;

        private Provider(String cfg) {
            this.cfg = cfg;
        }

        public String toString() {
            return this.cfg;
        }
    }
}

