package org.infinispan.distribution.rehash;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.infinispan.AdvancedCache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.MagicKey;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.TopologyChanged;
import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.tx.dld.ControlledRpcManager;
import org.infinispan.util.concurrent.ReclosableLatch;
import org.infinispan.util.concurrent.TimeoutException;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/**
 * Verifies that a non-transactional write issued from cache 0 still completes, and that its value
 * is readable from caches 0 and 2, when cache 1 is stopped while the write command is being held
 * back by a {@link ControlledRpcManager}.
 *
 * <p>The key is created with {@code new MagicKey(cache1)}; going by the test name, cache 1 is
 * presumably the key's primary owner (confirm against {@link MagicKey}).
 */
@CleanupAfterMethod
@Test(groups = {"functional"}, testName = "distribution.rehash.NonTxPrimaryOwnerLeavingTest")
public class NonTxPrimaryOwnerLeavingTest extends MultipleCacheManagersTest {

    /** Upper bound, in seconds, for every blocking wait performed by this test. */
    private static final long TIMEOUT_SECONDS = 10L;

    /**
     * Cache listener that lets the test gate topology-change notifications.
     *
     * <p>A "pre" notification waits on {@link #preLatch}; a "post" notification opens
     * {@link #postLatch} so the test can wait for it.
     */
    @Listener
    public static class TopologyUpdateListener {
        /** Opened by the test to let a waiting "pre" notification proceed. */
        final ReclosableLatch preLatch = new ReclosableLatch();
        /** Opened by this listener when a "post" notification arrives. */
        final ReclosableLatch postLatch = new ReclosableLatch();
        /** Set to {@code true} when a "pre" notification gave up waiting on {@link #preLatch}. */
        volatile boolean broken = false;

        /**
         * Handles a topology-change notification.
         *
         * @param topologyChangedEvent the notification; only {@code isPre()} is consulted
         * @throws InterruptedException if interrupted while waiting on {@link #preLatch}
         */
        @TopologyChanged
        public void onTopologyChange(TopologyChangedEvent<?, ?> topologyChangedEvent) throws InterruptedException {
            if (topologyChangedEvent.isPre()) {
                // await() returning false means the test never opened the latch in time.
                this.broken = !this.preLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } else {
                this.postLatch.open();
            }
        }
    }

    /** Starts three clustered cache managers using a non-transactional DIST_SYNC configuration. */
    @Override
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
        configurationBuilder.clustering().cacheMode(CacheMode.DIST_SYNC);
        configurationBuilder.transaction().transactionMode(TransactionMode.NON_TRANSACTIONAL);
        addClusterEnabledCacheManager(configurationBuilder);
        addClusterEnabledCacheManager(configurationBuilder);
        addClusterEnabledCacheManager(configurationBuilder);
        waitForClusterToForm();
    }

    @Test(groups = {"unstable"})
    public void testPrimaryOwnerLeavingDuringPut() throws Exception {
        doTest(TestWriteOperation.PUT_CREATE, false);
    }

    public void testPrimaryOwnerLeavingDuringPutIfAbsent() throws Exception {
        doTest(TestWriteOperation.PUT_IF_ABSENT, false);
    }

    public void testPrimaryOwnerLeaveDuringPutAll() throws Exception {
        doTest(TestWriteOperation.PUT_MAP_CREATE, false);
    }

    public void testPrimaryOwnerLeaveDuringPutAll2() throws Exception {
        doTest(TestWriteOperation.PUT_MAP_CREATE, true);
    }

    /**
     * Runs one write scenario: start the write on cache 0, hold its command, stop cache 1, then
     * release the command and check the outcome.
     *
     * @param testWriteOperation the write to perform from cache 0
     * @param blockTopologyOnOriginator when {@code false}, the listener's pre-latch is opened and the
     *        post-change notification is awaited <em>before</em> the held command is released; when
     *        {@code true}, the command is released first and the pre-latch is opened only after a
     *        short pause
     * @throws Exception if the write fails, a wait times out, or an assertion does not hold
     */
    private void doTest(TestWriteOperation testWriteOperation, boolean blockTopologyOnOriginator) throws Exception {
        AdvancedCache<Object, Object> originator = advancedCache(0);
        AdvancedCache<Object, Object> leaver = advancedCache(1);
        AdvancedCache<Object, Object> survivor = advancedCache(2);

        // Swap in a controllable RpcManager on the originator so the write command can be held back.
        ControlledRpcManager controlledRpcManager = new ControlledRpcManager(originator.getRpcManager());
        originator.getComponentRegistry().registerComponent(controlledRpcManager, RpcManager.class);
        originator.getComponentRegistry().rewire();
        controlledRpcManager.blockBefore(testWriteOperation.getCommandClass());

        TopologyUpdateListener topologyListener = new TopologyUpdateListener();
        try {
            MagicKey key = new MagicKey(leaver);
            Future<Object> write = fork(() -> {
                return testWriteOperation.perform(originator, key);
            });

            // Once the command is parked, register the listener and stop cache 1.
            controlledRpcManager.waitForCommandToBlock();
            originator.addListener(topologyListener);
            leaver.stop();

            if (!blockTopologyOnOriginator) {
                topologyListener.preLatch.open();
                AssertJUnit.assertFalse("Topology listener timed out waiting on preLatch", topologyListener.broken);
                if (!topologyListener.postLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    throw new TimeoutException("Timed out waiting for the post topology change notification");
                }
            }

            controlledRpcManager.stopBlocking();

            if (blockTopologyOnOriginator) {
                // Give the released command a head start before letting the pre-change callback continue.
                TestingUtil.sleepThread(500L);
                topologyListener.preLatch.open();
                AssertJUnit.assertFalse("Topology listener timed out waiting on preLatch", topologyListener.broken);
            }

            AssertJUnit.assertNull("Write operation returned a previous value", write.get(TIMEOUT_SECONDS, TimeUnit.SECONDS));
            this.log.tracef("Write operation is done");

            AssertJUnit.assertEquals(testWriteOperation.getValue(), originator.get(key));
            AssertJUnit.assertEquals(testWriteOperation.getValue(), survivor.get(key));
        } finally {
            // On a failure path the command or the listener callback may still be parked; release both
            // so no thread is left waiting after the test method exits. Both calls are repeats on success.
            topologyListener.preLatch.open();
            controlledRpcManager.stopBlocking();
        }
    }
}
