package org.infinispan.query.backend;

import java.util.Iterator;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.Index;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.query.indexmanager.IndexUpdateCommand;
import org.infinispan.query.indexmanager.InfinispanIndexManager;
import org.infinispan.query.test.Person;
import org.infinispan.query.test.QueryTestSCI;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.ResponseFilter;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcManagerImpl;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.MultiCacheManagerCallable;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Checks which {@link ResponseMode} is used when index update commands are sent through the
 * {@link Transport}, depending on the {@code worker.execution} indexing properties configured
 * for the default index, a named index, or individual shards.
 *
 * <p>Each scenario starts two clustered cache managers, spies on the transport of one of them,
 * stores three {@link Person} entries and then verifies the captured
 * {@link IndexUpdateCommand}s.
 */
@Test(groups = {"functional"}, testName = "query.backend.AsyncBackendTest")
public class AsyncBackendTest extends AbstractInfinispanTest {

    /**
     * Callback run by {@link #test(ConfigurationBuilder, Assertion)} after the entries were
     * written; receives the spied transport so it can verify the recorded invocations.
     */
    @FunctionalInterface
    public interface Assertion {
        void doAssertion(Transport transport) throws Exception;
    }

    /** {@code default.worker.execution=async}: the "person" index is expected to be updated asynchronously. */
    @Test
    public void testWithEnablingAsync() {
        test(getBaseConfigPlus("default.worker.execution", "async"),
                transport -> calledIndexAsynchronously(transport, "person"));
    }

    /** Async configured only for the "cat" index: the "person" index is expected to stay synchronous. */
    @Test
    public void testWithEnablingAsyncForDifferentIndex() {
        test(getBaseConfigPlus("cat.worker.execution", "async"),
                transport -> calledIndexSynchronously(transport, "person"));
    }

    /** No {@code worker.execution} property at all: the "person" index is expected to be synchronous. */
    @Test
    public void testWithDefaultSettings() {
        test(getBaseConfig(), transport -> calledIndexSynchronously(transport, "person"));
    }

    /** Two shards with async configured for shard 0 only: "person.0" async, "person.1" sync. */
    @Test
    public void testWithShardedIndex() {
        test(getBaseConfigPlus(
                        "default.sharding_strategy.nbr_of_shards", "2",
                        "person.0.worker.execution", "async"),
                transport -> {
                    calledIndexAsynchronously(transport, "person.0");
                    calledIndexSynchronously(transport, "person.1");
                });
    }

    /** An index-specific {@code sync} setting is expected to win over {@code default} being async. */
    @Test
    public void testOverridingDefault() {
        test(getBaseConfigPlus(
                        "default.worker.execution", "async",
                        "person.worker.execution", "sync"),
                transport -> calledIndexSynchronously(transport, "person"));
    }

    /**
     * Three shards with default sync, "person" async and "person.1" sync: the most specific
     * setting is expected to apply to each shard.
     */
    @Test
    public void testHierarchy() {
        test(getBaseConfigPlus(
                        "person.sharding_strategy.nbr_of_shards", "3",
                        "default.worker.execution", "sync",
                        "person.worker.execution", "async",
                        "person.1.worker.execution", "sync"),
                transport -> {
                    calledIndexAsynchronously(transport, "person.0");
                    calledIndexSynchronously(transport, "person.1");
                    calledIndexAsynchronously(transport, "person.2");
                });
    }

    /**
     * Builds the configuration shared by all scenarios: a {@code REPL_SYNC} cache indexing
     * {@link Person} with {@link InfinispanIndexManager} as the default index manager.
     */
    private ConfigurationBuilder getBaseConfig() {
        ConfigurationBuilder builder = new ConfigurationBuilder();
        builder.clustering().cacheMode(CacheMode.REPL_SYNC)
                .indexing()
                .index(Index.ALL)
                .addIndexedEntity(Person.class)
                .addProperty("default.indexmanager", InfinispanIndexManager.class.getName())
                .addProperty("lucene_version", "LUCENE_CURRENT");
        return builder;
    }

    /**
     * Returns the base configuration extended with additional indexing properties.
     *
     * @param properties alternating property names and values; must be non-null and of even
     *                   length (checked with an {@code assert}, so only when assertions are enabled)
     */
    private ConfigurationBuilder getBaseConfigPlus(String... properties) {
        assert properties != null && properties.length % 2 == 0
                : "properties must be non-null name/value pairs";
        ConfigurationBuilder builder = getBaseConfig();
        for (int i = 0; i < properties.length; i += 2) {
            builder.indexing().addProperty(properties[i], properties[i + 1]);
        }
        return builder;
    }

    /** Asserts that an update for {@code indexName} was sent with {@link ResponseMode#SYNCHRONOUS}. */
    private void calledIndexSynchronously(Transport transport, String indexName) throws Exception {
        assertIndexCall(transport, indexName, true);
    }

    /** Asserts that an update for {@code indexName} was sent with {@link ResponseMode#ASYNCHRONOUS}. */
    private void calledIndexAsynchronously(Transport transport, String indexName) throws Exception {
        assertIndexCall(transport, indexName, false);
    }

    /**
     * Verifies that the spied transport was asked at least once to invoke a command remotely
     * with the expected response mode, and that at least one of the commands captured for
     * those invocations targets {@code indexName}.
     *
     * @param transport the Mockito spy returned by {@link #spyOnTransport(Cache)}
     * @param indexName index name expected on one of the captured commands
     * @param sync      {@code true} to expect {@code SYNCHRONOUS}, {@code false} for {@code ASYNCHRONOUS}
     */
    private void assertIndexCall(Transport transport, String indexName, boolean sync) throws Exception {
        ResponseMode expectedMode = sync ? ResponseMode.SYNCHRONOUS : ResponseMode.ASYNCHRONOUS;
        ArgumentCaptor<IndexUpdateCommand> commandCaptor = ArgumentCaptor.forClass(IndexUpdateCommand.class);

        Mockito.verify(transport, Mockito.atLeastOnce()).invokeRemotelyAsync(
                ArgumentMatchers.anyCollection(),
                commandCaptor.capture(),
                ArgumentMatchers.eq(expectedMode),
                ArgumentMatchers.anyLong(),
                (ResponseFilter) ArgumentMatchers.any(),
                (DeliverOrder) ArgumentMatchers.any(),
                ArgumentMatchers.anyBoolean());

        boolean indexCalled = commandCaptor.getAllValues().stream()
                .anyMatch(command -> indexName.equals(command.getIndexName()));
        Assert.assertTrue(indexCalled,
                "No index update for '" + indexName + "' was sent with response mode " + expectedMode);
    }

    /**
     * Runs one scenario: creates two clustered cache managers from {@code configurationBuilder},
     * spies on the transport of the one that is not the coordinator ({@code cms[1]} when
     * {@code cms[0]} is the coordinator, otherwise {@code cms[0]}), writes three entries through
     * it and finally hands the spy to {@code assertion}.
     */
    private void test(ConfigurationBuilder configurationBuilder, final Assertion assertion) {
        EmbeddedCacheManager[] managers = {
                TestCacheManagerFactory.createClusteredCacheManager(QueryTestSCI.INSTANCE, configurationBuilder),
                TestCacheManagerFactory.createClusteredCacheManager(QueryTestSCI.INSTANCE, configurationBuilder)
        };
        TestingUtil.withCacheManagers(new MultiCacheManagerCallable(managers) {
            @Override
            public void call() throws Exception {
                EmbeddedCacheManager nonCoordinator = isMaster(this.cms[0].getCache()) ? this.cms[1] : this.cms[0];
                Cache<Object, Object> cache = nonCoordinator.getCache();
                // The spy must be installed before the writes so their remote calls are recorded.
                Transport spiedTransport = spyOnTransport(cache);
                cache.put(1, new Person("person1", "blurb1", 20));
                cache.put(2, new Person("person2", "blurb2", 27));
                cache.put(3, new Person("person3", "blurb3", 56));
                assertion.doAssertion(spiedTransport);
            }
        });
    }

    /**
     * Tells whether the node owning {@code cache} is the cluster coordinator, i.e. whether the
     * transport's coordinator address equals its own address.
     */
    public boolean isMaster(Cache<?, ?> cache) {
        Transport transport = cache.getAdvancedCache().getRpcManager().getTransport();
        return transport.getCoordinator().equals(transport.getAddress());
    }

    /**
     * Wraps the transport used by {@code cache}'s {@link RpcManager} in a Mockito spy and
     * installs that spy into the {@code "t"} field of the {@link RpcManagerImpl} via
     * {@link TestingUtil#replaceField}.
     *
     * @return the spy, to be passed to Mockito verifications
     */
    public Transport spyOnTransport(Cache<?, ?> cache) {
        RpcManager rpcManager = cache.getAdvancedCache().getRpcManager();
        Transport spy = Mockito.spy(rpcManager.getTransport());
        TestingUtil.replaceField(spy, "t", rpcManager, RpcManagerImpl.class);
        return spy;
    }
}
