package org.infinispan.statetransfer;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.interceptors.BaseCustomAsyncInterceptor;
import org.infinispan.interceptors.impl.EntryWrappingInterceptor;
import org.infinispan.interceptors.locking.PessimisticLockingInterceptor;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.transaction.LockingMode;
import org.infinispan.util.ReplicatedControlledConsistentHashFactory;
import org.infinispan.util.concurrent.IsolationLevel;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@CleanupAfterMethod
@Test(groups = {"functional"}, testName = "statetransfer.ReplCommandRetryTest")
/* loaded from: input_file:org/infinispan/statetransfer/ReplCommandRetryTest.class */
public class ReplCommandRetryTest extends MultipleCacheManagersTest {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/infinispan/statetransfer/ReplCommandRetryTest$DelayInterceptor.class */
    public class DelayInterceptor extends BaseCustomAsyncInterceptor {
        private final AtomicInteger counter = new AtomicInteger(0);
        private final CheckPoint checkPoint = new CheckPoint();
        private final Class<?> commandToBlock;

        public DelayInterceptor(Class<?> cls) {
            this.commandToBlock = cls;
        }

        public int getCounter() {
            return this.counter.get();
        }

        public void waitUntilBlocked(int i) throws TimeoutException, InterruptedException {
            AssertJUnit.assertEquals("blocked_" + i + "_on_" + this.cache, this.checkPoint.peek(5L, TimeUnit.SECONDS, "blocked_" + i + "_on_" + this.cache));
        }

        public void unblock(int i) throws InterruptedException, TimeoutException, BrokenBarrierException {
            ReplCommandRetryTest.log.tracef("Unblocking command on cache %s", this.cache);
            this.checkPoint.awaitStrict("blocked_" + i + "_on_" + this.cache, 5L, TimeUnit.SECONDS);
            this.checkPoint.trigger("resume_" + i + "_on_" + this.cache);
        }

        public Object visitPutKeyValueCommand(InvocationContext invocationContext, PutKeyValueCommand putKeyValueCommand) throws Throwable {
            return invokeNextThenAccept(invocationContext, putKeyValueCommand, (invocationContext2, putKeyValueCommand2, obj) -> {
                if (invocationContext.isInTxScope() || putKeyValueCommand.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) {
                    return;
                }
                doBlock(invocationContext, putKeyValueCommand);
            });
        }

        public Object visitLockControlCommand(TxInvocationContext txInvocationContext, LockControlCommand lockControlCommand) throws Throwable {
            return invokeNextThenAccept(txInvocationContext, lockControlCommand, (invocationContext, lockControlCommand2, obj) -> {
                if (txInvocationContext.getCacheTransaction().isFromStateTransfer()) {
                    return;
                }
                doBlock(txInvocationContext, lockControlCommand);
            });
        }

        public Object visitPrepareCommand(TxInvocationContext txInvocationContext, PrepareCommand prepareCommand) throws Throwable {
            return invokeNextThenAccept(txInvocationContext, prepareCommand, (invocationContext, prepareCommand2, obj) -> {
                if (txInvocationContext.getCacheTransaction().isFromStateTransfer()) {
                    return;
                }
                doBlock(txInvocationContext, prepareCommand);
            });
        }

        public Object visitCommitCommand(TxInvocationContext txInvocationContext, CommitCommand commitCommand) throws Throwable {
            return invokeNextThenAccept(txInvocationContext, commitCommand, (invocationContext, commitCommand2, obj) -> {
                if (txInvocationContext.getCacheTransaction().isFromStateTransfer()) {
                    return;
                }
                doBlock(txInvocationContext, commitCommand);
            });
        }

        private void doBlock(InvocationContext invocationContext, ReplicableCommand replicableCommand) throws InterruptedException, TimeoutException {
            if (this.commandToBlock != replicableCommand.getClass()) {
                return;
            }
            ReplCommandRetryTest.log.tracef("Delaying command %s originating from %s", replicableCommand, invocationContext.getOrigin());
            Integer valueOf = Integer.valueOf(this.counter.incrementAndGet());
            this.checkPoint.trigger("blocked_" + valueOf + "_on_" + this.cache);
            this.checkPoint.awaitStrict("resume_" + valueOf + "_on_" + this.cache, 15L, TimeUnit.SECONDS);
            ReplCommandRetryTest.log.tracef("Command unblocked: %s", replicableCommand);
        }

        public String toString() {
            return "DelayInterceptor{counter=" + this.counter + "}";
        }
    }

    /**
     * Intentionally empty: every test in this class adds its own cache managers one at a time
     * through {@code addClusterEnabledCacheManager}, so nothing is created up front.
     */
    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() {
        // Nothing to do here; the cluster is built incrementally inside each test method.
    }

    /**
     * Creates the REPL_SYNC configuration shared by all tests, with a {@link DelayInterceptor}
     * installed for the given command class.
     *
     * @param lockingMode locking mode to configure; {@code null} requests a non-transactional cache
     * @param cls         command class the {@link DelayInterceptor} should block
     * @param z           passed as {@code true} by the tests for the cache that issues the write;
     *                    only matters for {@link LockControlCommand}, where it selects the
     *                    interceptor position
     * @return the populated configuration builder
     */
    private ConfigurationBuilder buildConfig(LockingMode lockingMode, Class<?> cls, boolean z) {
        boolean transactional = lockingMode != null;
        ConfigurationBuilder builder = getDefaultClusteredCacheConfig(CacheMode.REPL_SYNC, transactional);
        builder.transaction().lockingMode(lockingMode);
        builder.clustering().hash().numSegments(1).consistentHashFactory(new ReplicatedControlledConsistentHashFactory(0, new int[0]));
        builder.clustering().remoteTimeout(15000L);
        builder.clustering().stateTransfer().fetchInMemoryState(true);

        DelayInterceptor delayInterceptor = new DelayInterceptor(cls);
        // Lock control commands are intercepted ahead of the pessimistic locking interceptor,
        // except when z is set; everything else is intercepted after entry wrapping.
        boolean placeBeforeLocking = cls == LockControlCommand.class && !z;
        if (placeBeforeLocking) {
            builder.customInterceptors().addInterceptor().before(PessimisticLockingInterceptor.class).interceptor(delayInterceptor);
        } else {
            builder.customInterceptors().addInterceptor().after(EntryWrappingInterceptor.class).interceptor(delayInterceptor);
        }

        builder.locking().isolationLevel(IsolationLevel.READ_COMMITTED);
        return builder;
    }

    /**
     * Starts a non-transactional put on the first cache, holds it blocked on other nodes while
     * further nodes join, and checks how many times each node ended up seeing the command.
     *
     * <p>Each call to {@code waitForStateTransfer} asserts that a join advanced the topology id
     * by exactly 4 relative to the previous stable topology.
     */
    public void testRetryAfterJoinNonTransactional() throws Exception {
        Cache<Object, Object> c1 = addClusterEnabledCacheManager(buildConfig(null, PutKeyValueCommand.class, true)).getCache();
        DelayInterceptor di1 = TestingUtil.findInterceptor(c1, DelayInterceptor.class);
        int initialTopologyId = c1.getAdvancedCache().getDistributionManager().getCacheTopology().getTopologyId();

        Cache<Object, Object> c2 = addClusterEnabledCacheManager(buildConfig(null, PutKeyValueCommand.class, false)).getCache();
        DelayInterceptor di2 = TestingUtil.findInterceptor(c2, DelayInterceptor.class);
        waitForStateTransfer(initialTopologyId + 4, c1, c2);

        Future<Object> putFuture = fork(() -> {
            log.tracef("Initiating a put command on %s", c1);
            c1.put("k", "v");
            return null;
        });

        // The put must be parked on c2 before the next node is allowed to join.
        di2.waitUntilBlocked(1);

        Cache<Object, Object> c3 = addClusterEnabledCacheManager(buildConfig(null, PutKeyValueCommand.class, false)).getCache();
        DelayInterceptor di3 = TestingUtil.findInterceptor(c3, DelayInterceptor.class);
        waitForStateTransfer(initialTopologyId + 8, c1, c2, c3);

        log.tracef("Triggering retry 1");
        di2.unblock(1);

        // After the first release the command shows up again on c2 and for the first time on c3.
        di2.waitUntilBlocked(2);
        di3.waitUntilBlocked(1);
        di2.unblock(2);

        Cache<Object, Object> c4 = addClusterEnabledCacheManager(buildConfig(null, PutKeyValueCommand.class, false)).getCache();
        DelayInterceptor di4 = TestingUtil.findInterceptor(c4, DelayInterceptor.class);
        waitForStateTransfer(initialTopologyId + 12, c1, c2, c3, c4);

        log.tracef("Triggering retry 2");
        di3.unblock(1);

        // Release every remaining invocation so the put can complete on all four nodes.
        di2.unblock(3);
        di3.unblock(2);
        di4.unblock(1);
        di1.unblock(1);

        log.tracef("Waiting for the put command to finish on %s", c1);
        Object result = putFuture.get(10L, TimeUnit.SECONDS);
        log.tracef("Put command finished on %s", c1);

        AssertJUnit.assertNull(result);
        AssertJUnit.assertEquals(1, di1.getCounter());
        AssertJUnit.assertEquals(3, di2.getCounter());
        AssertJUnit.assertEquals(2, di3.getCounter());
        AssertJUnit.assertEquals(1, di4.getCounter());
    }

    /**
     * Runs the transactional retry scenario with pessimistic locking, blocking
     * {@link LockControlCommand}.
     */
    public void testRetryAfterJoinLockControlCommand() throws Exception {
        testRetryAfterJoinTransactional(LockingMode.PESSIMISTIC, LockControlCommand.class);
    }

    /**
     * Runs the transactional retry scenario with pessimistic locking, blocking
     * {@link PrepareCommand}.
     */
    public void testRetryAfterJoinOnePhasePrepareCommand() throws Exception {
        testRetryAfterJoinTransactional(LockingMode.PESSIMISTIC, PrepareCommand.class);
    }

    /**
     * Runs the transactional retry scenario with optimistic locking, blocking
     * {@link PrepareCommand}.
     */
    public void testRetryAfterJoinTwoPhasePrepareCommand() throws Exception {
        testRetryAfterJoinTransactional(LockingMode.OPTIMISTIC, PrepareCommand.class);
    }

    /**
     * Runs the transactional retry scenario with optimistic locking, blocking
     * {@link CommitCommand}.
     */
    public void testRetryAfterJoinCommitCommand() throws Exception {
        testRetryAfterJoinTransactional(LockingMode.OPTIMISTIC, CommitCommand.class);
    }

    /**
     * Shared scenario for the transactional tests: a write is issued on the second cache, the
     * chosen command is held blocked on other nodes while two more nodes join, and the number of
     * times each node saw the command is verified at the end.
     *
     * <p>Each call to {@code waitForStateTransfer} asserts that a join advanced the topology id
     * by exactly 4 relative to the previous stable topology.
     *
     * @param lockingMode locking mode for all caches in the cluster
     * @param cls         command class that the {@link DelayInterceptor}s block
     */
    private void testRetryAfterJoinTransactional(LockingMode lockingMode, Class<?> cls) throws Exception {
        Cache<Object, Object> c1 = addClusterEnabledCacheManager(buildConfig(lockingMode, cls, false)).getCache();
        DelayInterceptor di1 = TestingUtil.findInterceptor(c1, DelayInterceptor.class);
        int initialTopologyId = c1.getAdvancedCache().getDistributionManager().getCacheTopology().getTopologyId();

        Cache<Object, Object> c2 = addClusterEnabledCacheManager(buildConfig(lockingMode, cls, true)).getCache();
        DelayInterceptor di2 = TestingUtil.findInterceptor(c2, DelayInterceptor.class);
        waitForStateTransfer(initialTopologyId + 4, c1, c2);

        Future<Object> txFuture = fork(() -> {
            log.tracef("Initiating a transaction on backup owner %s", c2);
            c2.put("k", "v");
            return null;
        });

        // The command must be parked on c1 before the next node is allowed to join.
        di1.waitUntilBlocked(1);

        Cache<Object, Object> c3 = addClusterEnabledCacheManager(buildConfig(lockingMode, cls, false)).getCache();
        DelayInterceptor di3 = TestingUtil.findInterceptor(c3, DelayInterceptor.class);
        waitForStateTransfer(initialTopologyId + 8, c1, c2, c3);

        log.tracef("Triggering retry 1 from node %s", c1);
        di1.unblock(1);

        // After the first release the command shows up again on c1 and for the first time on c3.
        di1.waitUntilBlocked(2);
        di3.waitUntilBlocked(1);

        Cache<Object, Object> c4 = addClusterEnabledCacheManager(buildConfig(lockingMode, cls, false)).getCache();
        DelayInterceptor di4 = TestingUtil.findInterceptor(c4, DelayInterceptor.class);
        waitForStateTransfer(initialTopologyId + 12, c1, c2, c3, c4);

        di1.unblock(2);

        log.tracef("Triggering retry 2 from %s", c3);
        di3.unblock(1);

        // Release every remaining invocation on the non-originating nodes.
        di1.unblock(3);
        di3.unblock(2);
        di4.unblock(1);

        log.tracef("Finishing tx on %s", c2);
        di2.unblock(1);

        log.tracef("Waiting for the transaction to finish on %s", c2);
        txFuture.get(10L, TimeUnit.SECONDS);
        log.tracef("Transaction finished on %s", c2);

        // AssertJUnit takes (expected, actual); keep that order so failure messages read correctly.
        AssertJUnit.assertEquals(3, di1.getCounter());
        AssertJUnit.assertEquals(1, di2.getCounter());
        AssertJUnit.assertEquals(2, di3.getCounter());
        AssertJUnit.assertEquals(1, di4.getCounter());
    }

    /**
     * Waits until no rebalance is in progress on the given caches, then asserts that each of
     * them reports the expected topology id.
     *
     * @param i        topology id every cache must be at
     * @param cacheArr caches to check
     */
    private void waitForStateTransfer(int i, Cache... cacheArr) {
        TestingUtil.waitForNoRebalance(cacheArr);
        for (int idx = 0; idx < cacheArr.length; idx++) {
            Cache current = cacheArr[idx];
            LocalizedCacheTopology topology = current.getAdvancedCache().getDistributionManager().getCacheTopology();
            String failureMessage = String.format("Wrong topology on cache %s, expected %d and got %s", current, i, topology);
            AssertJUnit.assertEquals(failureMessage, i, topology.getTopologyId());
        }
    }
}
