package org.infinispan.distribution.rehash;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.infinispan.AdvancedCache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.MagicKey;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.TopologyChanged;
import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.statetransfer.StateRequestCommand;
import org.infinispan.statetransfer.StateResponseCommand;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.util.ControlledRpcManager;
import org.infinispan.util.concurrent.ReclosableLatch;
import org.infinispan.util.concurrent.TimeoutException;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@CleanupAfterMethod
@Test(groups = {"functional"}, testName = "distribution.rehash.NonTxPrimaryOwnerLeavingTest")
/* loaded from: input_file:org/infinispan/distribution/rehash/NonTxPrimaryOwnerLeavingTest.class */
public class NonTxPrimaryOwnerLeavingTest extends MultipleCacheManagersTest {

    @Listener
    /* loaded from: input_file:org/infinispan/distribution/rehash/NonTxPrimaryOwnerLeavingTest$TopologyUpdateListener.class */
    public class TopologyUpdateListener {
        private final ReclosableLatch preLatch = new ReclosableLatch();
        private final ReclosableLatch postLatch = new ReclosableLatch();
        private volatile boolean broken = false;

        public TopologyUpdateListener() {
        }

        @TopologyChanged
        public void onTopologyChange(TopologyChangedEvent topologyChangedEvent) throws InterruptedException {
            if (!topologyChangedEvent.isPre()) {
                NonTxPrimaryOwnerLeavingTest.this.log.tracef("Signalling topology %d finished installing", topologyChangedEvent.getNewTopologyId());
                this.postLatch.open();
            } else {
                NonTxPrimaryOwnerLeavingTest.this.log.tracef("Blocking topology %d", topologyChangedEvent.getNewTopologyId());
                this.broken = !this.preLatch.await(10L, TimeUnit.SECONDS);
                this.preLatch.close();
            }
        }

        void unblockOnce() {
            this.preLatch.open();
            AssertJUnit.assertFalse(this.broken);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void waitForTopologyToFinish() throws InterruptedException {
            if (!this.postLatch.await(10L, TimeUnit.SECONDS)) {
                throw new TimeoutException();
            }
            this.postLatch.close();
        }
    }

    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
        configurationBuilder.clustering().cacheMode(CacheMode.DIST_SYNC);
        configurationBuilder.transaction().transactionMode(TransactionMode.NON_TRANSACTIONAL);
        addClusterEnabledCacheManager(configurationBuilder);
        addClusterEnabledCacheManager(configurationBuilder);
        addClusterEnabledCacheManager(configurationBuilder);
        waitForClusterToForm();
    }

    @Test(groups = {"unstable"})
    public void testPrimaryOwnerLeavingDuringPut() throws Exception {
        doTest(TestWriteOperation.PUT_CREATE, false);
    }

    public void testPrimaryOwnerLeavingDuringPutIfAbsent() throws Exception {
        doTest(TestWriteOperation.PUT_IF_ABSENT, false);
    }

    public void testPrimaryOwnerLeaveDuringPutAll() throws Exception {
        doTest(TestWriteOperation.PUT_MAP_CREATE, false);
    }

    public void testPrimaryOwnerLeaveDuringPutAll2() throws Exception {
        doTest(TestWriteOperation.PUT_MAP_CREATE, true);
    }

    private void doTest(TestWriteOperation testWriteOperation, boolean z) throws Exception {
        AdvancedCache advancedCache = advancedCache(0);
        AdvancedCache advancedCache2 = advancedCache(1);
        AdvancedCache advancedCache3 = advancedCache(2);
        TopologyUpdateListener topologyUpdateListener = new TopologyUpdateListener();
        advancedCache.addListener(topologyUpdateListener);
        TopologyUpdateListener topologyUpdateListener2 = new TopologyUpdateListener();
        advancedCache3.addListener(topologyUpdateListener2);
        ControlledRpcManager replaceRpcManager = ControlledRpcManager.replaceRpcManager(advancedCache);
        replaceRpcManager.excludeCommands(StateRequestCommand.class, StateResponseCommand.class);
        MagicKey magicKey = new MagicKey(advancedCache2);
        Future fork = fork(() -> {
            return testWriteOperation.perform(advancedCache, magicKey);
        });
        ControlledRpcManager.BlockedRequest expectCommand = replaceRpcManager.expectCommand(testWriteOperation.getCommandClass());
        advancedCache2.stop();
        if (!z) {
            topologyUpdateListener.unblockOnce();
            topologyUpdateListener.waitForTopologyToFinish();
        }
        expectCommand.send().expectResponse(address(1), (Response) CacheNotFoundResponse.INSTANCE).receive();
        if (z) {
            replaceRpcManager.expectNoCommand(100L, TimeUnit.MILLISECONDS);
            topologyUpdateListener.unblockOnce();
            topologyUpdateListener.waitForTopologyToFinish();
        }
        topologyUpdateListener2.unblockOnce();
        topologyUpdateListener2.waitForTopologyToFinish();
        if (!advancedCache.getDistributionManager().getCacheTopology().getDistribution(magicKey).isPrimary()) {
            replaceRpcManager.expectCommand(testWriteOperation.getCommandClass()).send().receiveAll();
        }
        AssertJUnit.assertNull(fork.get(10L, TimeUnit.SECONDS));
        this.log.tracef("Write operation is done", new Object[0]);
        advancedCache.removeListener(topologyUpdateListener);
        advancedCache3.removeListener(topologyUpdateListener2);
        topologyUpdateListener.unblockOnce();
        topologyUpdateListener.unblockOnce();
        replaceRpcManager.stopBlocking();
        AssertJUnit.assertEquals(testWriteOperation.getValue(), advancedCache.get(magicKey));
        AssertJUnit.assertEquals(testWriteOperation.getValue(), advancedCache3.get(magicKey));
    }
}
