package org.infinispan.remoting;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.commands.remote.SingleRpcCommand;
import org.infinispan.commons.io.ByteBuffer;
import org.infinispan.commons.marshall.StreamingMarshaller;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.jgroups.CommandAwareRpcDispatcher;
import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
import org.infinispan.stream.impl.StreamRequestCommand;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.topology.CacheTopologyControlCommand;
import org.infinispan.util.ByteString;
import org.infinispan.util.concurrent.BlockingTaskAwareExecutorServiceImpl;
import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.blocks.Response;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "remoting.AsynchronousInvocationTest")
/* loaded from: input_file:org/infinispan/remoting/AsynchronousInvocationTest.class */
public class AsynchronousInvocationTest extends AbstractInfinispanTest {
    private EmbeddedCacheManager cacheManager;
    private DummyTaskCountExecutorService executorService;
    private CommandAwareRpcDispatcher commandAwareRpcDispatcher;
    private Address address;
    private StreamingMarshaller marshaller;
    private CommandsFactory commandsFactory;
    private ReplicableCommand blockingCacheRpcCommand;
    private ReplicableCommand nonBlockingCacheRpcCommand;
    private ReplicableCommand blockingNonCacheRpcCommand;
    private ReplicableCommand nonBlockingNonCacheRpcCommand;
    private ReplicableCommand blockingSingleRpcCommand;
    private ReplicableCommand nonBlockingSingleRpcCommand;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/remoting/AsynchronousInvocationTest$CountDownLatchResponse.class */
    public static class CountDownLatchResponse implements Response {
        private final CountDownLatch countDownLatch;

        private CountDownLatchResponse() {
            this.countDownLatch = new CountDownLatch(1);
        }

        public void send(Object obj, boolean z) {
            this.countDownLatch.countDown();
        }

        public void send(Message message, boolean z) {
            this.countDownLatch.countDown();
        }

        public boolean await(long j, TimeUnit timeUnit) throws InterruptedException {
            return this.countDownLatch.await(j, timeUnit);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/remoting/AsynchronousInvocationTest$DummyTaskCountExecutorService.class */
    public class DummyTaskCountExecutorService extends AbstractExecutorService {
        private volatile boolean hasExecutedCommand;

        private DummyTaskCountExecutorService() {
        }

        @Override // java.util.concurrent.Executor
        public void execute(Runnable runnable) {
            this.hasExecutedCommand = true;
            runnable.run();
        }

        public void reset() {
            this.hasExecutedCommand = false;
        }

        @Override // java.util.concurrent.ExecutorService
        public void shutdown() {
        }

        @Override // java.util.concurrent.ExecutorService
        public List<Runnable> shutdownNow() {
            return Collections.emptyList();
        }

        @Override // java.util.concurrent.ExecutorService
        public boolean isShutdown() {
            return false;
        }

        @Override // java.util.concurrent.ExecutorService
        public boolean isTerminated() {
            return false;
        }

        @Override // java.util.concurrent.ExecutorService
        public boolean awaitTermination(long j, TimeUnit timeUnit) throws InterruptedException {
            return false;
        }
    }

    private static ReplicableCommand mockReplicableCommand(boolean z) throws Throwable {
        ReplicableCommand replicableCommand = (ReplicableCommand) Mockito.mock(ReplicableCommand.class);
        Mockito.when(Boolean.valueOf(replicableCommand.canBlock())).thenReturn(Boolean.valueOf(z));
        ((ReplicableCommand) Mockito.doReturn((Object) null).when(replicableCommand)).invokeAsync();
        return replicableCommand;
    }

    @BeforeClass
    public void setUp() throws Throwable {
        this.executorService = new DummyTaskCountExecutorService();
        BlockingTaskAwareExecutorServiceImpl blockingTaskAwareExecutorServiceImpl = new BlockingTaskAwareExecutorServiceImpl("AsynchronousInvocationTest-Controller", this.executorService, TIME_SERVICE);
        ConfigurationBuilder defaultCacheConfiguration = TestCacheManagerFactory.getDefaultCacheConfiguration(false);
        defaultCacheConfiguration.clustering().cacheMode(CacheMode.DIST_SYNC);
        this.cacheManager = TestCacheManagerFactory.createClusteredCacheManager(defaultCacheConfiguration);
        Cache cache = this.cacheManager.getCache();
        ByteString fromString = ByteString.fromString(cache.getName());
        JGroupsTransport jGroupsTransport = (Transport) TestingUtil.extractGlobalComponent(this.cacheManager, Transport.class);
        if (jGroupsTransport instanceof JGroupsTransport) {
            this.commandAwareRpcDispatcher = jGroupsTransport.getCommandAwareRpcDispatcher();
            this.address = jGroupsTransport.getChannel().getAddress();
            this.marshaller = TestingUtil.extractGlobalMarshaller(this.cacheManager);
        } else {
            Assert.fail("Expected a JGroups Transport");
        }
        ComponentRegistry componentRegistry = cache.getAdvancedCache().getComponentRegistry();
        componentRegistry.registerComponent(blockingTaskAwareExecutorServiceImpl, "org.infinispan.executors.remote");
        componentRegistry.rewire();
        GlobalComponentRegistry globalComponentRegistry = cache.getCacheManager().getGlobalComponentRegistry();
        globalComponentRegistry.registerComponent(blockingTaskAwareExecutorServiceImpl, "org.infinispan.executors.remote");
        globalComponentRegistry.rewire();
        this.commandsFactory = TestingUtil.extractCommandsFactory(cache);
        ReplicableCommand mockReplicableCommand = mockReplicableCommand(false);
        ReplicableCommand mockReplicableCommand2 = mockReplicableCommand(true);
        this.blockingCacheRpcCommand = new StreamRequestCommand(fromString);
        this.nonBlockingCacheRpcCommand = new ClusteredGetCommand("key", fromString, 0L);
        this.blockingNonCacheRpcCommand = new CacheTopologyControlCommand((String) null, CacheTopologyControlCommand.Type.POLICY_GET_STATUS, (org.infinispan.remoting.transport.Address) null, 0);
        this.nonBlockingNonCacheRpcCommand = new ClusteredGetCommand("key", fromString, 0L);
        this.blockingSingleRpcCommand = new SingleRpcCommand(fromString, mockReplicableCommand2);
        this.nonBlockingSingleRpcCommand = new SingleRpcCommand(fromString, mockReplicableCommand);
    }

    @AfterClass
    public void tearDown() {
        if (this.cacheManager != null) {
            ((ExecutorService) this.cacheManager.getGlobalComponentRegistry().getComponent(ExecutorService.class, "org.infinispan.executors.remote")).shutdownNow();
            this.cacheManager.stop();
        }
    }

    public void testCommands() {
        Assert.assertTrue(this.blockingCacheRpcCommand.canBlock());
        Assert.assertTrue(this.blockingNonCacheRpcCommand.canBlock());
        Assert.assertTrue(this.blockingSingleRpcCommand.canBlock());
        Assert.assertFalse(this.nonBlockingCacheRpcCommand.canBlock());
        Assert.assertFalse(this.nonBlockingNonCacheRpcCommand.canBlock());
        Assert.assertFalse(this.nonBlockingSingleRpcCommand.canBlock());
    }

    public void testCacheRpcCommands() throws Exception {
        assertDispatchForCommand(this.blockingCacheRpcCommand, true);
        assertDispatchForCommand(this.nonBlockingCacheRpcCommand, false);
    }

    public void testSingleRpcCommand() throws Exception {
        assertDispatchForCommand(this.blockingSingleRpcCommand, true);
        assertDispatchForCommand(this.nonBlockingSingleRpcCommand, false);
    }

    public void testNonCacheRpcCommands() throws Exception {
        assertDispatchForCommand(this.blockingNonCacheRpcCommand, true);
        assertDispatchForCommand(this.nonBlockingNonCacheRpcCommand, false);
    }

    private void assertDispatchForCommand(ReplicableCommand replicableCommand, boolean z) throws Exception {
        this.log.debugf("Testing " + replicableCommand.getClass().getCanonicalName(), new Object[0]);
        this.commandsFactory.initializeReplicableCommand(replicableCommand, true);
        Message serialize = serialize(replicableCommand, true, this.address);
        if (serialize == null) {
            this.log.debugf("Don't test " + replicableCommand.getClass() + ". it is not Serializable", new Object[0]);
            return;
        }
        this.executorService.reset();
        CountDownLatchResponse countDownLatchResponse = new CountDownLatchResponse();
        this.commandAwareRpcDispatcher.handle(serialize, countDownLatchResponse);
        countDownLatchResponse.await(30L, TimeUnit.SECONDS);
        Assert.assertEquals(this.executorService.hasExecutedCommand, z, "Command " + replicableCommand.getClass() + " dispatched wrongly.");
        Message serialize2 = serialize(replicableCommand, false, this.address);
        if (serialize2 == null) {
            this.log.debugf("Don't test " + replicableCommand.getClass() + ". it is not Serializable", new Object[0]);
            return;
        }
        this.executorService.reset();
        CountDownLatchResponse countDownLatchResponse2 = new CountDownLatchResponse();
        this.commandAwareRpcDispatcher.handle(serialize2, countDownLatchResponse2);
        countDownLatchResponse2.await(30L, TimeUnit.SECONDS);
        Assert.assertFalse(this.executorService.hasExecutedCommand, "Command " + replicableCommand.getClass() + " dispatched wrongly.");
    }

    private Message serialize(ReplicableCommand replicableCommand, boolean z, Address address) {
        try {
            ByteBuffer objectToBuffer = this.marshaller.objectToBuffer(replicableCommand);
            Message message = new Message((Address) null, objectToBuffer.getBuf(), objectToBuffer.getOffset(), objectToBuffer.getLength());
            message.setFlag(new Message.Flag[]{Message.Flag.NO_TOTAL_ORDER});
            if (z) {
                message.setFlag(new Message.Flag[]{Message.Flag.OOB});
            }
            message.src(address);
            return message;
        } catch (Exception e) {
            return null;
        }
    }
}
