package org.infinispan.notifications.cachelistener.cluster;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.Cache;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.distribution.BlockingInterceptor;
import org.infinispan.distribution.MagicKey;
import org.infinispan.interceptors.impl.EntryWrappingInterceptor;
import org.infinispan.notifications.cachelistener.cluster.AbstractClusterListenerUtilTest;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.test.TestingUtil;
import org.infinispan.tx.dld.ControlledRpcManager;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/**
 * Cluster listener tests for a non-transactional {@link CacheMode#REPL_SYNC} cache.
 *
 * <p>Both tests run against three nodes of the {@code "cluster-listener"} cache, register a
 * cluster listener on node 0, start a {@code put} from node 0 in a forked thread and then kill
 * node 1's cache manager at a controlled point of the write. They then verify which events the
 * listener on node 0 observed.
 */
@Test(groups = {"functional"}, testName = "notifications.cachelistener.cluster.ClusterListenerReplTest")
public class ClusterListenerReplTest extends AbstractClusterListenerNonTxTest {

    public ClusterListenerReplTest() {
        super(false, CacheMode.REPL_SYNC);
    }

    /**
     * Kills node 1 while its outgoing {@link PutKeyValueCommand} is still blocked in the RPC
     * layer, then checks that the forked {@code put} returns {@code null} and that the listener
     * saw one or two events.
     */
    public void testPrimaryOwnerGoesDownBeforeBackupRaisesEvent() throws InterruptedException, TimeoutException, ExecutionException, BrokenBarrierException {
        Cache<Object, String> cache0 = cache(0, "cluster-listener");
        Cache<Object, String> cache1 = cache(1, "cluster-listener");
        Cache<Object, String> cache2 = cache(2, "cluster-listener");

        AbstractClusterListenerUtilTest.ClusterListener clusterListener = new AbstractClusterListenerUtilTest.ClusterListener();
        cache0.addListener(clusterListener);

        // Wrap node 1's RpcManager so that PutKeyValueCommand is held back before it is sent.
        RpcManager originalRpcManager = TestingUtil.extractComponent(cache1, RpcManager.class);
        ControlledRpcManager controlledRpcManager = new ControlledRpcManager(originalRpcManager);
        controlledRpcManager.blockBefore(PutKeyValueCommand.class);
        // The component is registered under the RpcManager type; the controlled wrapper replaces it.
        TestingUtil.replaceComponent(cache1, RpcManager.class, controlledRpcManager, true);

        // NOTE(review): presumably a key whose primary owner is node 1 with node 2 as backup;
        // confirm against MagicKey.
        MagicKey key = new MagicKey(cache1, cache2);
        Callable<String> putTask = () -> cache0.put(key, "first-value");
        Future<String> putFuture = fork(putTask);

        // Do not kill node 1 until the put has actually reached the blocked RPC call.
        controlledRpcManager.waitForCommandToBlock(10L, TimeUnit.SECONDS);
        TestingUtil.killCacheManagers(cache1.getCacheManager());

        AssertJUnit.assertNull(putFuture.get(10L, TimeUnit.SECONDS));

        AssertJUnit.assertTrue("Expected at least 1 event but got " + clusterListener.events.size(),
                clusterListener.events.size() >= 1);
        AssertJUnit.assertTrue("Expected at most 2 events but got " + clusterListener.events.size(),
                clusterListener.events.size() <= 2);
        // NOTE(review): the boolean flags are presumably (created, retried); confirm against
        // checkEvent in the base class.
        checkEvent(clusterListener.events.get(0), key, true, true);
    }

    /**
     * Holds the {@link PutKeyValueCommand} on nodes 0 and 2 with a shared barrier, kills node 1
     * while they are held, releases them, and then checks that the forked {@code put} returns
     * {@code "first-value"} and that the listener saw two or three events.
     */
    public void testPrimaryOwnerGoesDownAfterBackupRaisesEvent() throws InterruptedException, TimeoutException, ExecutionException, BrokenBarrierException {
        Cache<Object, String> cache0 = cache(0, "cluster-listener");
        Cache<Object, String> cache1 = cache(1, "cluster-listener");
        Cache<Object, String> cache2 = cache(2, "cluster-listener");

        AbstractClusterListenerUtilTest.ClusterListener clusterListener = new AbstractClusterListenerUtilTest.ClusterListener();
        cache0.addListener(clusterListener);

        // Three parties: the interceptor on node 0, the interceptor on node 2 and this test thread.
        CyclicBarrier barrier = new CyclicBarrier(3);
        BlockingInterceptor blockingInterceptor0 = new BlockingInterceptor(barrier, PutKeyValueCommand.class, true, false);
        cache0.getAdvancedCache().getAsyncInterceptorChain().addInterceptorBefore(blockingInterceptor0, EntryWrappingInterceptor.class);
        BlockingInterceptor blockingInterceptor2 = new BlockingInterceptor(barrier, PutKeyValueCommand.class, true, false);
        cache2.getAdvancedCache().getAsyncInterceptorChain().addInterceptorBefore(blockingInterceptor2, EntryWrappingInterceptor.class);

        // NOTE(review): presumably a key whose primary owner is node 1; confirm against MagicKey.
        MagicKey key = new MagicKey(cache1);
        Callable<String> putTask = () -> cache0.put(key, "first-value");
        Future<String> putFuture = fork(putTask);

        // First rendezvous: both interceptors have reached the barrier with the put command.
        barrier.await(10L, TimeUnit.SECONDS);

        // Take the interceptors out of the chains and suspend them so that later commands are
        // not held at the barrier again.
        cache0.getAdvancedCache().getAsyncInterceptorChain().removeInterceptor(BlockingInterceptor.class);
        cache2.getAdvancedCache().getAsyncInterceptorChain().removeInterceptor(BlockingInterceptor.class);
        blockingInterceptor0.suspend(true);
        blockingInterceptor2.suspend(true);

        TestingUtil.killCacheManagers(cache1.getCacheManager());

        // Second rendezvous: release the commands that are still waiting on the barrier.
        barrier.await(10L, TimeUnit.SECONDS);

        AssertJUnit.assertEquals("first-value", putFuture.get(10L, TimeUnit.SECONDS));

        AssertJUnit.assertTrue("Expected at least 2 events but got " + clusterListener.events.size(),
                clusterListener.events.size() >= 2);
        AssertJUnit.assertTrue("Expected at most 3 events but got " + clusterListener.events.size(),
                clusterListener.events.size() <= 3);
        // NOTE(review): the boolean flags are presumably (created, retried); confirm against
        // checkEvent in the base class.
        checkEvent(clusterListener.events.get(0), key, true, false);
        checkEvent(clusterListener.events.get(1), key, false, true);
    }
}
