package org.infinispan.statetransfer;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.Cache;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.interceptors.base.BaseCustomInterceptor;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.topology.CacheTopology;
import org.testng.Assert;
import org.testng.annotations.Test;

@CleanupAfterMethod
@Test(groups = {"functional"}, testName = "statetransfer.ReplCommandForwardingTest")
/* loaded from: input_file:org/infinispan/statetransfer/ReplCommandForwardingTest.class */
public class ReplCommandForwardingTest extends MultipleCacheManagersTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/statetransfer/ReplCommandForwardingTest$DelayInterceptor.class */
    public class DelayInterceptor extends BaseCustomInterceptor {
        private final AtomicInteger counter;
        private final SynchronousQueue<Object> barrier;

        private DelayInterceptor() {
            this.counter = new AtomicInteger(0);
            this.barrier = new SynchronousQueue<>(true);
        }

        public int getCounter() {
            return this.counter.get();
        }

        public void unblock(int i) throws InterruptedException, TimeoutException, BrokenBarrierException {
            ReplCommandForwardingTest.this.log.tracef("Unblocking command on cache %s", this.cache);
            Assert.assertTrue(this.barrier.offer(Integer.valueOf(i), 5L, TimeUnit.SECONDS), String.format("There is no DelayInterceptor waiting to be unblocked on cache %s", this.cache));
        }

        public Object visitPutKeyValueCommand(InvocationContext invocationContext, PutKeyValueCommand putKeyValueCommand) throws Throwable {
            Object visitPutKeyValueCommand = super.visitPutKeyValueCommand(invocationContext, putKeyValueCommand);
            if (!invocationContext.isInTxScope() && !putKeyValueCommand.hasFlag(Flag.PUT_FOR_STATE_TRANSFER)) {
                ReplCommandForwardingTest.this.log.tracef("Delaying command %s", putKeyValueCommand);
                Integer valueOf = Integer.valueOf(this.counter.incrementAndGet());
                Assert.assertEquals(this.barrier.poll(15L, TimeUnit.SECONDS), valueOf, String.format("Timed out waiting for unblock(%d) call on cache %s", valueOf, this.cache));
                ReplCommandForwardingTest.this.log.tracef("Command unblocked: %s", putKeyValueCommand);
            }
            return visitPutKeyValueCommand;
        }

        public Object visitPrepareCommand(TxInvocationContext txInvocationContext, PrepareCommand prepareCommand) throws Throwable {
            Object visitPrepareCommand = super.visitPrepareCommand(txInvocationContext, prepareCommand);
            if (!txInvocationContext.isOriginLocal() || !txInvocationContext.getCacheTransaction().isFromStateTransfer()) {
                ReplCommandForwardingTest.this.log.tracef("Delaying command %s", prepareCommand);
                Integer valueOf = Integer.valueOf(this.counter.incrementAndGet());
                Assert.assertEquals(this.barrier.poll(15L, TimeUnit.SECONDS), valueOf, String.format("Timed out waiting for unblock(%d) call on cache %s", valueOf, this.cache));
                ReplCommandForwardingTest.this.log.tracef("Command unblocked: %s", prepareCommand);
            }
            return visitPrepareCommand;
        }

        public String toString() {
            return "DelayInterceptor{counter=" + this.counter + "}";
        }
    }

    /**
     * Creates no cache managers up front: the test methods in this class add their own
     * managers one at a time through {@code addClusterEnabledCacheManager(buildConfig(...))}.
     */
    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        // Nothing to create here; each test builds its cluster incrementally.
    }

    /**
     * Builds the REPL_SYNC cache configuration shared by all nodes in these tests.
     *
     * <p>The configuration sets a 15000 ms replication timeout, enables in-memory state
     * fetching and registers a fresh {@link DelayInterceptor} after
     * {@code StateTransferInterceptor}.
     *
     * @param z forwarded as the second argument of {@code getDefaultClusteredCacheConfig};
     *          the transactional test passes {@code true}, the non-transactional one {@code false}
     * @return the configuration builder, ready to be handed to a new cache manager
     */
    private ConfigurationBuilder buildConfig(boolean z) {
        final ConfigurationBuilder builder = getDefaultClusteredCacheConfig(CacheMode.REPL_SYNC, z);

        builder.clustering().sync().replTimeout(15000L);
        builder.clustering().stateTransfer().fetchInMemoryState(true);

        // Each configuration gets its own interceptor instance, so every cache has its own counter.
        final DelayInterceptor delayInterceptor = new DelayInterceptor();
        builder.customInterceptors()
                .addInterceptor()
                .after(StateTransferInterceptor.class)
                .interceptor(delayInterceptor);

        return builder;
    }

    /**
     * Starts a non-transactional put on the first cache, lets two more nodes join while the
     * command is being delayed, releases the delayed invocations node by node, and finally
     * checks how many commands each node's {@link DelayInterceptor} delayed.
     *
     * <p>The order of the {@code unblock} calls is significant: each call releases one specific
     * delayed invocation on one specific node.
     *
     * @throws Exception if a node fails to start, an unblock is not consumed in time, or the
     *                   put does not finish within 10 seconds
     */
    public void testForwardToJoinerNonTransactional() throws Exception {
        final Cache<Object, Object> c1 = addClusterEnabledCacheManager(buildConfig(false)).getCache();
        DelayInterceptor di1 = TestingUtil.findInterceptor(c1, DelayInterceptor.class);
        Cache<Object, Object> c2 = addClusterEnabledCacheManager(buildConfig(false)).getCache();
        DelayInterceptor di2 = TestingUtil.findInterceptor(c2, DelayInterceptor.class);
        // Two members: the topology id is expected to be 2.
        waitForStateTransfer(2, c1, c2);

        // The put runs on a forked thread; the test only waits for it after all unblock calls below.
        Future<?> putFuture = fork(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                ReplCommandForwardingTest.this.log.tracef("Initiating a put command on %s", c1);
                c1.put("k", "v");
                return null;
            }
        });

        // Third member joins while the put is in flight; the topology id is expected to be 4.
        Cache<Object, Object> c3 = addClusterEnabledCacheManager(buildConfig(false)).getCache();
        DelayInterceptor di3 = TestingUtil.findInterceptor(c3, DelayInterceptor.class);
        waitForStateTransfer(4, c1, c2, c3);

        log.tracef("Forwarding the command from %s", c2);
        di2.unblock(1);
        // NOTE(review): fixed pause between releasing c2 and c1; presumably gives the forwarded
        // command time to arrive - confirm whether it can be replaced by a deterministic wait.
        Thread.sleep(1000L);
        di1.unblock(1);
        di2.unblock(2);

        // Fourth member joins; the topology id is expected to be 6.
        Cache<Object, Object> c4 = addClusterEnabledCacheManager(buildConfig(false)).getCache();
        DelayInterceptor di4 = TestingUtil.findInterceptor(c4, DelayInterceptor.class);
        waitForStateTransfer(6, c1, c2, c3, c4);

        log.tracef("Forwarding the command from %s", c3);
        di3.unblock(1);
        di1.unblock(2);
        di4.unblock(1);

        log.tracef("Forwarding the command from %s for a second time", c3);
        di3.unblock(2);
        di2.unblock(3);
        di4.unblock(2);

        log.tracef("Waiting for the put command to finish on %s", c1);
        putFuture.get(10L, TimeUnit.SECONDS);
        log.tracef("Put command finished on %s", c1);

        // Number of commands each node delayed over the whole scenario.
        Assert.assertEquals(di1.getCounter(), 2);
        Assert.assertEquals(di2.getCounter(), 3);
        Assert.assertEquals(di3.getCounter(), 2);
        Assert.assertEquals(di4.getCounter(), 2);
    }

    /**
     * Transactional counterpart of the non-transactional forwarding test: starts a put on the
     * first cache (built with the transactional configuration), lets two more nodes join while
     * the prepare command is being delayed, releases the delayed invocations node by node, and
     * finally checks how many commands each node's {@link DelayInterceptor} delayed.
     *
     * <p>The order of the {@code unblock} calls is significant: each call releases one specific
     * delayed invocation on one specific node.
     *
     * @throws Exception if a node fails to start, an unblock is not consumed in time, or the
     *                   transaction does not finish within 10 seconds
     */
    public void testForwardToJoinerTransactional() throws Exception {
        final Cache<Object, Object> c1 = addClusterEnabledCacheManager(buildConfig(true)).getCache();
        DelayInterceptor di1 = TestingUtil.findInterceptor(c1, DelayInterceptor.class);
        Cache<Object, Object> c2 = addClusterEnabledCacheManager(buildConfig(true)).getCache();
        DelayInterceptor di2 = TestingUtil.findInterceptor(c2, DelayInterceptor.class);
        // Two members: the topology id is expected to be 2.
        waitForStateTransfer(2, c1, c2);

        // The put runs on a forked thread; the test only waits for it after all unblock calls below.
        Future<?> txFuture = fork(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                ReplCommandForwardingTest.this.log.tracef("Initiating a transaction on %s", c1);
                c1.put("k", "v");
                return null;
            }
        });

        // Third member joins while the transaction is in flight; the topology id is expected to be 4.
        Cache<Object, Object> c3 = addClusterEnabledCacheManager(buildConfig(true)).getCache();
        DelayInterceptor di3 = TestingUtil.findInterceptor(c3, DelayInterceptor.class);
        waitForStateTransfer(4, c1, c2, c3);

        log.tracef("Forwarding the prepare command from %s", c2);
        di2.unblock(1);

        // Fourth member joins; the topology id is expected to be 6.
        Cache<Object, Object> c4 = addClusterEnabledCacheManager(buildConfig(true)).getCache();
        DelayInterceptor di4 = TestingUtil.findInterceptor(c4, DelayInterceptor.class);
        waitForStateTransfer(6, c1, c2, c3, c4);

        log.tracef("Forwarding the prepare command from %s", c3);
        di3.unblock(1);
        di2.unblock(2);
        di4.unblock(1);

        log.tracef("Forwarding the prepare command from %s", c1);
        di1.unblock(1);
        di2.unblock(3);
        di3.unblock(2);
        di4.unblock(2);

        log.tracef("Waiting for the transaction to finish on %s", c1);
        txFuture.get(10L, TimeUnit.SECONDS);
        log.tracef("Transaction finished on %s", c1);

        // Number of commands each node delayed over the whole scenario.
        Assert.assertEquals(di1.getCounter(), 1);
        Assert.assertEquals(di2.getCounter(), 3);
        Assert.assertEquals(di3.getCounter(), 2);
        Assert.assertEquals(di4.getCounter(), 2);
    }

    /**
     * Waits for rehashing to complete on the given caches and then verifies that every one of
     * them reports the expected topology id.
     *
     * @param i        the topology id each cache must report
     * @param cacheArr the caches to wait for and check
     */
    private void waitForStateTransfer(int i, Cache... cacheArr) {
        TestingUtil.waitForRehashToComplete(cacheArr);

        for (int index = 0; index < cacheArr.length; index++) {
            Cache member = cacheArr[index];
            StateTransferManager stateTransferManager =
                    (StateTransferManager) TestingUtil.extractComponent(member, StateTransferManager.class);
            CacheTopology topology = stateTransferManager.getCacheTopology();

            // The failure message carries the whole topology object, not just its id.
            String failureMessage = String.format("Wrong topology on cache %s, expected %d and got %s", member, i, topology);
            Assert.assertEquals(topology.getTopologyId(), i, failureMessage);
        }
    }
}
