package org.infinispan.distribution.rehash;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.infinispan.Cache;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.remote.RevokeBiasCommand;
import org.infinispan.commands.remote.recovery.TxCompletionNotificationCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.commands.triangle.BackupWriteCommand;
import org.infinispan.commands.tx.AbstractTransactionBoundaryCommand;
import org.infinispan.commands.write.InvalidateVersionsCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.BlockingInterceptor;
import org.infinispan.globalstate.NoOpGlobalConfigurationManager;
import org.infinispan.interceptors.AsyncInterceptorChain;
import org.infinispan.interceptors.impl.EntryWrappingInterceptor;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.test.AbstractCacheTest;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.op.TestWriteOperation;
import org.infinispan.topology.ClusterTopologyManager;
import org.infinispan.transaction.LockingMode;
import org.infinispan.util.ControlledRpcManager;
import org.infinispan.util.concurrent.IsolationLevel;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "distribution.rehash.StateTransferOverwritingValueTest")
/* loaded from: input_file:org/infinispan/distribution/rehash/StateTransferOverwritingValueTest.class */
public class StateTransferOverwritingValueTest extends MultipleCacheManagersTest {
    public StateTransferOverwritingValueTest() {
        this.cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
    }

    @Override // org.infinispan.test.MultipleCacheManagersTest
    public Object[] factory() {
        return new Object[]{new StateTransferOverwritingValueTest().cacheMode(CacheMode.DIST_SYNC).transactional(false), new StateTransferOverwritingValueTest().cacheMode(CacheMode.DIST_SYNC).transactional(true).lockingMode(LockingMode.OPTIMISTIC), new StateTransferOverwritingValueTest().cacheMode(CacheMode.DIST_SYNC).transactional(true).lockingMode(LockingMode.PESSIMISTIC), new StateTransferOverwritingValueTest().cacheMode(CacheMode.SCATTERED_SYNC).transactional(false)};
    }

    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        addClusterEnabledCacheManager(getConfigurationBuilder());
        waitForClusterToForm();
    }

    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void amendCacheManagerBeforeStart(EmbeddedCacheManager embeddedCacheManager) {
        NoOpGlobalConfigurationManager.amendCacheManager(embeddedCacheManager);
    }

    protected ConfigurationBuilder getConfigurationBuilder() {
        ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
        configurationBuilder.clustering().cacheMode(this.cacheMode);
        configurationBuilder.transaction().transactionMode(transactionMode());
        if (this.lockingMode != null) {
            configurationBuilder.transaction().lockingMode(this.lockingMode);
        }
        configurationBuilder.locking().isolationLevel(IsolationLevel.READ_COMMITTED);
        return configurationBuilder;
    }

    public void testBackupOwnerJoiningDuringPut() throws Exception {
        doTest(TestWriteOperation.PUT_CREATE);
    }

    public void testBackupOwnerJoiningDuringPutOverwrite() throws Exception {
        doTest(TestWriteOperation.PUT_OVERWRITE);
    }

    public void testBackupOwnerJoiningDuringPutIfAbsent() throws Exception {
        doTest(TestWriteOperation.PUT_IF_ABSENT);
    }

    public void testBackupOwnerJoiningDuringReplace() throws Exception {
        doTest(TestWriteOperation.REPLACE);
    }

    public void testBackupOwnerJoiningDuringReplaceWithPreviousValue() throws Exception {
        doTest(TestWriteOperation.REPLACE_EXACT);
    }

    public void testBackupOwnerJoiningDuringRemove() throws Exception {
        doTest(TestWriteOperation.REMOVE);
    }

    public void testBackupOwnerJoiningDuringRemoveWithPreviousValue() throws Exception {
        doTest(TestWriteOperation.REMOVE_EXACT);
    }

    private void doTest(TestWriteOperation testWriteOperation) throws Exception {
        Cache advancedCache = advancedCache(0);
        Object previousValue = testWriteOperation.getPreviousValue();
        if (previousValue != null) {
            advancedCache.put("key", previousValue);
            AssertJUnit.assertEquals(previousValue, advancedCache.get("key"));
            this.log.tracef("Previous value inserted: %s = %s", "key", previousValue);
        }
        int topologyId = advancedCache.getDistributionManager().getCacheTopology().getTopologyId();
        CheckPoint checkPoint = new CheckPoint();
        ControlledRpcManager replaceRpcManager = ControlledRpcManager.replaceRpcManager(advancedCache);
        replaceRpcManager.excludeCommands(WriteCommand.class, BackupWriteCommand.class, RevokeBiasCommand.class, InvalidateVersionsCommand.class, AbstractTransactionBoundaryCommand.class, TxCompletionNotificationCommand.class);
        int i = topologyId + 1;
        blockRebalanceConfirmation(manager(0), checkPoint, i);
        this.log.tracef("Starting the cache on the joiner", new Object[0]);
        ConfigurationBuilder configurationBuilder = getConfigurationBuilder();
        configurationBuilder.clustering().stateTransfer().awaitInitialTransfer(false).timeout(30L, TimeUnit.SECONDS);
        addClusterEnabledCacheManager(configurationBuilder);
        Cache advancedCache2 = advancedCache(1);
        ((StateTransferLock) TestingUtil.extractComponent(advancedCache2, StateTransferLock.class)).transactionDataFuture(i).get(10L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals(2, advancedCache2.getRpcManager().getMembers().size());
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        BlockingInterceptor blockingInterceptor = new BlockingInterceptor(cyclicBarrier, true, false, (Predicate<VisitableCommand>) (this.cacheMode.isScattered() ? visitableCommand -> {
            return (visitableCommand instanceof PutKeyValueCommand) || (visitableCommand instanceof RemoveCommand);
        } : visitableCommand2 -> {
            return visitableCommand2.getClass().equals(testWriteOperation.getCommandClass());
        }));
        AsyncInterceptorChain extractInterceptorChain = TestingUtil.extractInterceptorChain(advancedCache2);
        AssertJUnit.assertTrue(extractInterceptorChain.addInterceptorAfter(blockingInterceptor, extractInterceptorChain.findInterceptorExtending(EntryWrappingInterceptor.class).getClass()));
        ControlledRpcManager.BlockedRequest expectCommand = replaceRpcManager.expectCommand(StateResponseCommand.class);
        Future fork = fork(() -> {
            return testWriteOperation.perform(advancedCache, "key");
        });
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        blockingInterceptor.suspend(true);
        expectCommand.send().receiveAll();
        if (this.cacheMode.isScattered()) {
            replaceRpcManager.expectCommand(StateResponseCommand.class).send().receiveAll();
        }
        checkPoint.awaitStrict("pre_rebalance_confirmation_" + i + "_from_" + address(1), 10L, TimeUnit.SECONDS);
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals(testWriteOperation.getReturnValue(), fork.get(10L, TimeUnit.SECONDS));
        this.log.tracef("%s operation is done", testWriteOperation);
        checkPoint.trigger("resume_rebalance_confirmation_" + i + "_from_" + address(0));
        checkPoint.trigger("resume_rebalance_confirmation_" + i + "_from_" + address(1));
        TestingUtil.waitForNoRebalance(advancedCache, advancedCache2);
        AssertJUnit.assertEquals(testWriteOperation.getValue(), advancedCache.get("key"));
        AssertJUnit.assertEquals(testWriteOperation.getValue(), advancedCache2.get("key"));
        replaceRpcManager.stopBlocking();
    }

    private void blockRebalanceConfirmation(EmbeddedCacheManager embeddedCacheManager, CheckPoint checkPoint, int i) throws Exception {
        Answer delegatesTo = AdditionalAnswers.delegatesTo((ClusterTopologyManager) TestingUtil.extractGlobalComponent(embeddedCacheManager, ClusterTopologyManager.class));
        ClusterTopologyManager clusterTopologyManager = (ClusterTopologyManager) Mockito.mock(ClusterTopologyManager.class, Mockito.withSettings().defaultAnswer(delegatesTo));
        ((ClusterTopologyManager) Mockito.doAnswer(invocationOnMock -> {
            Object[] arguments = invocationOnMock.getArguments();
            Address address = (Address) arguments[1];
            int intValue = ((Integer) arguments[2]).intValue();
            if (intValue == i) {
                checkPoint.trigger("pre_rebalance_confirmation_" + intValue + "_from_" + address);
                checkPoint.awaitStrict("resume_rebalance_confirmation_" + intValue + "_from_" + address, 10L, TimeUnit.SECONDS);
            }
            return delegatesTo.answer(invocationOnMock);
        }).when(clusterTopologyManager)).handleRebalancePhaseConfirm(ArgumentMatchers.anyString(), (Address) ArgumentMatchers.any(Address.class), ArgumentMatchers.anyInt(), (Throwable) ArgumentMatchers.isNull(), ArgumentMatchers.anyInt());
        TestingUtil.replaceComponent((CacheContainer) embeddedCacheManager, (Class<ClusterTopologyManager>) ClusterTopologyManager.class, clusterTopologyManager, true);
    }
}
