/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.query.distributed;

import java.util.List;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commons.util.concurrent.FutureListener;
import org.infinispan.commons.util.concurrent.NotifyingFuture;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.Index;
import org.infinispan.context.Flag;
import org.infinispan.query.CacheQuery;
import org.infinispan.query.MassIndexer;
import org.infinispan.query.Search;
import org.infinispan.query.SearchManager;
import org.infinispan.query.distributed.Transaction;
import org.infinispan.query.impl.massindex.IndexUpdater;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.MultipleCacheManagersTest;

public class AsyncMassIndexPerfTest
extends MultipleCacheManagersTest {
    private static final int OBJECT_COUNT = 1000000;
    private static final int WRITING_THREADS = 5;
    private static final boolean DISABLE_INDEX_WHEN_INSERTING = true;
    private static final boolean TX_ENABLED = false;
    private static final String MERGE_FACTOR = "30";
    private static final CacheMode CACHE_MODE = CacheMode.DIST_SYNC;
    private static final IndexManager INDEX_MANAGER = IndexManager.INFINISPAN;
    private static final Provider DIRECTORY_PROVIDER = Provider.INFINISPAN;
    private static final WorkerMode WORKER_MODE = WorkerMode.sync;
    private static final int PRINT_EACH = 10;
    private Cache<Integer, Transaction> cache1;
    private Cache<Integer, Transaction> cache2;
    private MassIndexer massIndexer;

    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder cacheCfg = AsyncMassIndexPerfTest.getDefaultClusteredCacheConfig((CacheMode)CACHE_MODE, (boolean)false);
        cacheCfg.clustering().sync().replTimeout(120000L).indexing().index(Index.LOCAL).addIndexedEntity(Transaction.class).addProperty("default.directory_provider", DIRECTORY_PROVIDER.toString()).addProperty("default.indexmanager", INDEX_MANAGER.toString()).addProperty("default.indexwriter.merge_factor", MERGE_FACTOR).addProperty("hibernate.search.default.worker.execution", WORKER_MODE.toString()).addProperty("error_handler", "org.infinispan.query.helper.StaticTestingErrorHandler").addProperty("lucene_version", "LUCENE_CURRENT");
        List caches = this.createClusteredCaches(2, cacheCfg);
        this.cache1 = (Cache)caches.get(0);
        this.cache2 = (Cache)caches.get(1);
        this.massIndexer = Search.getSearchManager(this.cache1).getMassIndexer();
    }

    private void writeData() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5, this.getTestThreadFactory("Worker"));
        final AtomicInteger counter = new AtomicInteger(0);
        for (int i = 0; i < 1000000; ++i) {
            executorService.submit(new Runnable(){

                @Override
                public void run() {
                    int key = counter.incrementAndGet();
                    AdvancedCache insertCache = AsyncMassIndexPerfTest.this.cache1.getAdvancedCache().withFlags(new Flag[]{Flag.SKIP_INDEXING});
                    insertCache.put((Object)key, (Object)new Transaction(key * 100, "0eab" + key));
                    if (key != 0 && key % 10 == 0) {
                        System.out.printf("\rInserted %d", key);
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(3L, TimeUnit.MINUTES);
        System.out.println();
    }

    private void waitForIndexSize(final int expected) {
        this.eventually(new AbstractInfinispanTest.Condition(){

            public boolean isSatisfied() throws Exception {
                int idxCount = AsyncMassIndexPerfTest.this.countIndex();
                System.out.printf("\rWaiting for indexing completion: %d indexed so far", idxCount);
                return idxCount == expected;
            }
        });
        System.out.println("\nIndexing done.");
    }

    public static void main(String[] args) throws Throwable {
        AsyncMassIndexPerfTest test = new AsyncMassIndexPerfTest();
        test.createBeforeClass();
        test.createBeforeMethod();
        test.populate();
    }

    public void populate() throws Exception {
        StopTimer stopTimer = new StopTimer();
        this.writeData();
        stopTimer.stop();
        System.out.printf("\rData inserted in %d seconds.", stopTimer.getElapsedIn(TimeUnit.SECONDS));
        this.info();
        new Thread(new EventLoop(this.massIndexer)).start();
    }

    private void info() {
        System.out.println("\rTo run MassIndexer, press 'r'. To cancel press 'c'. To put new entry, press 'i'. For index size, type 's', 'p' to purge it, and 'f' to flush");
    }

    protected int countIndex() {
        SearchManager searchManager = Search.getSearchManager(this.cache1);
        CacheQuery q = searchManager.getQuery((Query)new MatchAllDocsQuery(), new Class[]{Transaction.class});
        return q.getResultSize();
    }

    protected void clearIndex() {
        SearchManager searchManager = Search.getSearchManager(this.cache1);
        searchManager.purge(Transaction.class);
    }

    private void flushIndex() {
        new IndexUpdater(this.cache1).flush(Transaction.class);
    }

    class StopTimer {
        private long start = this.currentTime();
        private long elapsed;

        private long currentTime() {
            return System.currentTimeMillis();
        }

        public void reset() {
            this.start = this.currentTime();
        }

        public void stop() {
            this.elapsed = this.currentTime() - this.start;
        }

        public long getElapsedIn(TimeUnit unit) {
            return unit.convert(this.elapsed, TimeUnit.MILLISECONDS);
        }
    }

    class EventLoop
    implements Runnable {
        private MassIndexer massIndexer;
        private NotifyingFuture<Void> future;
        private AtomicInteger nexIndex = new AtomicInteger(1000000);

        public EventLoop(MassIndexer massIndexer) {
            this.massIndexer = massIndexer;
        }

        void startMassIndexer() {
            System.out.println("Running MassIndexer");
            final StopTimer stopTimer = new StopTimer();
            this.future = this.massIndexer.startAsync();
            this.future.attachListener((FutureListener)new FutureListener<Void>(){

                public void futureDone(Future<Void> future) {
                    stopTimer.stop();
                    try {
                        future.get();
                        System.out.printf("\nMass indexer run in %d seconds", stopTimer.getElapsedIn(TimeUnit.SECONDS));
                        System.out.println();
                        AsyncMassIndexPerfTest.this.waitForIndexSize(EventLoop.this.nexIndex.get());
                        System.out.println("Mass Indexing complete.");
                    }
                    catch (Exception e) {
                        System.out.println("Error executing massindexer");
                        e.printStackTrace();
                    }
                }
            });
        }

        @Override
        public void run() {
            Scanner scanner = new Scanner(System.in);
            while (!Thread.interrupted()) {
                String next = scanner.next();
                if ("c".equals(next)) {
                    if (this.future == null) {
                        System.out.println("\rMassIndexer not started");
                        continue;
                    }
                    this.future.cancel(true);
                    System.out.println("Mass Indexer cancelled");
                }
                if ("r".equals(next)) {
                    this.startMassIndexer();
                }
                if ("f".equals(next)) {
                    AsyncMassIndexPerfTest.this.flushIndex();
                    System.out.println("Index flushed.");
                }
                if ("s".equals(next)) {
                    System.out.printf("Index size is %d\n", AsyncMassIndexPerfTest.this.countIndex());
                }
                if ("p".equals(next)) {
                    AsyncMassIndexPerfTest.this.clearIndex();
                    System.out.println("Index cleared.");
                }
                if ("i".equals(next)) {
                    int nextIndex = this.nexIndex.incrementAndGet();
                    AsyncMassIndexPerfTest.this.cache2.put((Object)nextIndex, (Object)new Transaction(nextIndex, "0" + nextIndex));
                    System.out.println("New entry inserted");
                }
                AsyncMassIndexPerfTest.this.info();
            }
        }
    }

    private static enum IndexManager {
        NRT("near-real-time"),
        INFINISPAN("org.infinispan.query.indexmanager.InfinispanIndexManager"),
        DIRECTORY("directory-based");

        private final String cfg;

        private IndexManager(String cfg) {
            this.cfg = cfg;
        }

        public String toString() {
            return this.cfg;
        }
    }

    private static enum WorkerMode {
        async,
        sync;

    }

    private static enum Provider {
        RAM("ram"),
        FILESYSTEM("filesystem"),
        INFINISPAN("infinispan");

        private final String cfg;

        private Provider(String cfg) {
            this.cfg = cfg;
        }

        public String toString() {
            return this.cfg;
        }
    }
}

