/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.query.backend;

import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.query.indexmanager.IndexUpdateCommand;
import org.infinispan.query.indexmanager.InfinispanIndexManager;
import org.infinispan.query.test.Person;
import org.infinispan.query.test.QueryTestSCI;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcManagerImpl;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.impl.VoidResponseCollector;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.MultiCacheManagerCallable;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="query.backend.AsyncBackendTest")
public class AsyncBackendTest
extends AbstractInfinispanTest {
    @Test
    public void testWithEnablingAsync() {
        this.test(this.getBaseConfigPlus("default.worker.execution", "async"), transport -> this.calledIndexAsynchronously(transport, "person"));
    }

    @Test
    public void testWithEnablingAsyncForDifferentIndex() {
        this.test(this.getBaseConfigPlus("cat.worker.execution", "async"), transport -> this.calledIndexSynchronously(transport, "person"));
    }

    @Test
    public void testWithDefaultSettings() {
        this.test(this.getBaseConfig(), transport -> this.calledIndexSynchronously(transport, "person"));
    }

    @Test
    public void testWithShardedIndex() {
        ConfigurationBuilder cfg = this.getBaseConfigPlus("default.sharding_strategy.nbr_of_shards", "2", "person.0.worker.execution", "async");
        this.test(cfg, transport -> {
            this.calledIndexAsynchronously(transport, "person.0");
            this.calledIndexSynchronously(transport, "person.1");
        });
    }

    @Test
    public void testOverridingDefault() {
        ConfigurationBuilder cfg = this.getBaseConfigPlus("default.worker.execution", "async", "person.worker.execution", "sync");
        this.test(cfg, transport -> this.calledIndexSynchronously(transport, "person"));
    }

    @Test
    public void testHierarchy() {
        ConfigurationBuilder cfg = this.getBaseConfigPlus("person.sharding_strategy.nbr_of_shards", "3", "default.worker.execution", "sync", "person.worker.execution", "async", "person.1.worker.execution", "sync");
        this.test(cfg, transport -> {
            this.calledIndexAsynchronously(transport, "person.0");
            this.calledIndexSynchronously(transport, "person.1");
            this.calledIndexAsynchronously(transport, "person.2");
        });
    }

    private ConfigurationBuilder getBaseConfig() {
        ConfigurationBuilder cfg = new ConfigurationBuilder();
        cfg.clustering().cacheMode(CacheMode.DIST_SYNC).indexing().enable().addIndexedEntity(Person.class).addProperty("default.indexmanager", InfinispanIndexManager.class.getName()).addProperty("lucene_version", "LUCENE_CURRENT");
        return cfg;
    }

    private ConfigurationBuilder getBaseConfigPlus(String ... props) {
        assert (props != null && props.length % 2 == 0);
        ConfigurationBuilder cfg = this.getBaseConfig();
        for (int i = 0; i < props.length; i += 2) {
            cfg.indexing().addProperty(props[i], props[i + 1]);
        }
        return cfg;
    }

    private void calledIndexSynchronously(Transport transport, String indexName) throws Exception {
        this.assertIndexCall(transport, indexName, true);
    }

    private void calledIndexAsynchronously(Transport transport, String indexName) throws Exception {
        this.assertIndexCall(transport, indexName, false);
    }

    private void assertIndexCall(Transport transport, String indexName, boolean sync) throws Exception {
        ArgumentCaptor argument = ArgumentCaptor.forClass(IndexUpdateCommand.class);
        if (sync) {
            ((Transport)Mockito.verify((Object)transport, (VerificationMode)Mockito.atLeastOnce())).invokeCommand((Address)ArgumentMatchers.any(Address.class), (ReplicableCommand)argument.capture(), (ResponseCollector)ArgumentMatchers.any(VoidResponseCollector.class), (DeliverOrder)ArgumentMatchers.eq((Object)DeliverOrder.NONE), ArgumentMatchers.anyLong(), (TimeUnit)((Object)ArgumentMatchers.any()));
        } else {
            ((Transport)Mockito.verify((Object)transport, (VerificationMode)Mockito.atLeastOnce())).sendTo((Address)ArgumentMatchers.any(Address.class), (ReplicableCommand)argument.capture(), (DeliverOrder)ArgumentMatchers.eq((Object)DeliverOrder.PER_SENDER));
        }
        boolean indexCalled = false;
        for (IndexUpdateCommand updateCommand : argument.getAllValues()) {
            indexCalled |= updateCommand.getIndexName().equals(indexName);
        }
        Assert.assertTrue((boolean)indexCalled);
    }

    private void test(ConfigurationBuilder cfg, final Assertion assertion) {
        TestingUtil.withCacheManagers((MultiCacheManagerCallable)new MultiCacheManagerCallable(new EmbeddedCacheManager[]{TestCacheManagerFactory.createClusteredCacheManager((SerializationContextInitializer)QueryTestSCI.INSTANCE, (ConfigurationBuilder)cfg), TestCacheManagerFactory.createClusteredCacheManager((SerializationContextInitializer)QueryTestSCI.INSTANCE, (ConfigurationBuilder)cfg)}){

            public void call() throws Exception {
                EmbeddedCacheManager slave = AsyncBackendTest.this.isMaster(this.cms[0].getCache()) ? this.cms[1] : this.cms[0];
                Transport wireTappedTransport = AsyncBackendTest.this.spyOnTransport(slave.getCache());
                for (int i = 0; i < 50; ++i) {
                    slave.getCache().put((Object)i, (Object)new Person("person", "blurb", 20));
                }
                assertion.doAssertion(wireTappedTransport);
            }
        });
    }

    private boolean isMaster(Cache<?, ?> cm) {
        Transport transport = cm.getAdvancedCache().getRpcManager().getTransport();
        return transport.getCoordinator().equals(transport.getAddress());
    }

    private Transport spyOnTransport(Cache<?, ?> cache) {
        RpcManager rpcManager = cache.getAdvancedCache().getRpcManager();
        Transport transport = (Transport)Mockito.spy((Object)rpcManager.getTransport());
        TestingUtil.replaceField((Object)transport, (String)"t", (Object)rpcManager, RpcManagerImpl.class);
        return transport;
    }

    private static interface Assertion {
        public void doAssertion(Transport var1) throws Exception;
    }
}

