package org.infinispan.notifications.cachelistener.cluster;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.conflict.EntryMergePolicy;
import org.infinispan.distribution.MagicKey;
import org.infinispan.notifications.cachelistener.cluster.AbstractClusterListenerUtilTest;
import org.infinispan.statetransfer.StateProvider;
import org.infinispan.test.TestDataSCI;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.util.concurrent.IsolationLevel;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = {"functional"})
/* loaded from: input_file:org/infinispan/notifications/cachelistener/cluster/AbstractClusterListenerDistAddListenerTest.class */
public abstract class AbstractClusterListenerDistAddListenerTest extends AbstractClusterListenerUtilTest {
    /**
     * Creates the test for caches running in {@link CacheMode#DIST_SYNC}.
     *
     * <p>NOTE(review): the decompiler reported that this constructor's access modifier was
     * widened from {@code protected}; it is left {@code public} here to match the file as found.
     *
     * @param z forwarded unchanged to the superclass constructor; presumably the transactional
     *          flag later read as {@code this.tx} in {@code createCacheManagers()} — TODO confirm
     */
    public AbstractClusterListenerDistAddListenerTest(final boolean z) {
        super(z, CacheMode.DIST_SYNC);
    }

    /**
     * Builds the cache configuration used by every node and starts a three-node cluster hosting
     * the {@code "cluster-listener"} cache.
     *
     * <p>The configuration is stored in {@code builderUsed} before it is populated, uses the
     * cache mode chosen by the constructor, has its expiration reaper disabled and — when
     * {@code tx} is set — is transactional with {@code READ_COMMITTED} isolation.
     *
     * @throws Throwable declared by the overridden method; this body does not throw directly
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        this.builderUsed = new ConfigurationBuilder();

        // The merge policy is explicitly set to null. Presumably this keeps conflict resolution
        // from interacting with the StateProvider mock installed by waitUntilRequestingListeners
        // — TODO confirm.
        this.builderUsed.clustering()
                .cacheMode(this.cacheMode)
                .partitionHandling()
                .mergePolicy((EntryMergePolicy) null);

        if (this.tx) {
            this.builderUsed.transaction()
                    .transactionMode(TransactionMode.TRANSACTIONAL)
                    .locking()
                    .isolationLevel(IsolationLevel.READ_COMMITTED);
        }

        this.builderUsed.expiration().disableReaper();

        createClusteredCaches(3, "cluster-listener", TestDataSCI.INSTANCE, this.builderUsed);
        injectTimeServices();
    }

    /**
     * A fourth node joins while a cluster-listener registration issued on node 1 is held at the
     * {@code pre_add_listener} checkpoint keyed by node 0's cache; after the registration is
     * released, an insert through the new node must still be reported to the listener.
     *
     * <p>The checkpoint keys are presumably produced by {@code waitUntilListenerInstalled} in the
     * superclass — verify there.
     */
    @Test
    public void testMemberJoinsWhileClusterListenerInstalled() throws TimeoutException, InterruptedException, ExecutionException {
        final String cacheName = "cluster-listener";
        Cache<?, ?> gatedCache = cache(0, cacheName);
        Cache<Object, String> registeringCache = cache(1, cacheName);

        CheckPoint gate = new CheckPoint();
        waitUntilListenerInstalled(gatedCache, gate);
        // The "post" phase is not under test here, so it is opened permanently up front.
        gate.triggerForever("post_add_listener_release_" + gatedCache);

        AbstractClusterListenerUtilTest.ClusterListener listener = new AbstractClusterListenerUtilTest.ClusterListener();
        Future<?> registration = fork(() -> {
            registeringCache.addListener(listener);
            return null;
        });

        // Wait until the registration has reached the "pre" checkpoint, then grow the cluster.
        gate.awaitStrict("pre_add_listener_invoked_" + gatedCache, 10L, TimeUnit.SECONDS);
        addClusteredCacheManager();
        waitForClusterToForm(cacheName);
        Cache<Object, String> joinedCache = cache(3, cacheName);

        // Only now let the held registration proceed and wait for it to finish.
        gate.triggerForever("pre_add_listener_release_" + gatedCache);
        registration.get(10L, TimeUnit.SECONDS);

        MagicKey key = new MagicKey(joinedCache);
        verifySimpleInsertion(joinedCache, key, "first-value", null, listener, "first-value");
    }

    /**
     * Counterpart of {@code testMemberJoinsWhileClusterListenerInstalled} that holds the
     * registration at the {@code post_add_listener} checkpoint instead: the "pre" phase is
     * opened immediately, a fourth node joins while the "post" phase is still held, and an
     * insert through the new node must be reported to the listener once the registration is
     * released.
     *
     * <p>The checkpoint keys are presumably produced by {@code waitUntilListenerInstalled} in the
     * superclass — verify there.
     */
    @Test
    public void testMemberJoinsWhileClusterListenerInstalledDuplicate() throws TimeoutException, InterruptedException, ExecutionException {
        final String cacheName = "cluster-listener";
        Cache<?, ?> gatedCache = cache(0, cacheName);
        Cache<Object, String> registeringCache = cache(1, cacheName);

        CheckPoint gate = new CheckPoint();
        waitUntilListenerInstalled(gatedCache, gate);
        // Let the registration run through its "pre" phase without blocking.
        gate.triggerForever("pre_add_listener_release_" + gatedCache);

        AbstractClusterListenerUtilTest.ClusterListener listener = new AbstractClusterListenerUtilTest.ClusterListener(this);
        Future<?> registration = fork(() -> {
            registeringCache.addListener(listener);
            return null;
        });

        // Wait until the registration has reached the "post" checkpoint, then grow the cluster.
        gate.awaitStrict("post_add_listener_invoked_" + gatedCache, 10L, TimeUnit.SECONDS);
        addClusteredCacheManager();
        waitForClusterToForm(cacheName);
        Cache<Object, String> joinedCache = cache(3, cacheName);

        // Release the held "post" phase and wait for the registration to complete.
        gate.triggerForever("post_add_listener_release_" + gatedCache);
        registration.get(10L, TimeUnit.SECONDS);

        MagicKey key = new MagicKey(joinedCache);
        verifySimpleInsertion(joinedCache, key, "first-value", null, listener, "first-value");
    }

    /**
     * A joiner asks node 0 for the cluster listeners to install, and the node that registered
     * the listener (node 1) is killed while node 0's answer is held at the
     * {@code post_cluster_listeners} checkpoint. Once the cluster has settled, the joiner's cache
     * must not contain any {@link RemoteClusterListener}.
     */
    @Test
    public void testMemberJoinsAndRetrievesClusterListenersButMainListenerNodeDiesBeforeInstalled() throws TimeoutException, InterruptedException, ExecutionException {
        final String cacheName = "cluster-listener";
        Cache<?, ?> stateSourceCache = cache(0, cacheName);
        cache(1, cacheName).addListener(new AbstractClusterListenerUtilTest.ClusterListener(this));

        // Manager 0 must be first in the member list; presumably that makes it the node the
        // joiner asks for cluster listeners — TODO confirm.
        Assert.assertEquals(manager(0).getAddress(), manager(0).getMembers().get(0));

        CheckPoint gate = new CheckPoint();
        waitUntilRequestingListeners(stateSourceCache, gate);
        // The request may start immediately; only its completion is held back.
        gate.triggerForever("pre_cluster_listeners_release_" + stateSourceCache);

        addClusteredCacheManager();
        Future<Cache<Object, String>> joining = fork(() -> {
            return cache(3, cacheName);
        });

        // Node 0 has computed its answer but has not handed it back yet.
        gate.awaitStrict("post_cluster_listeners_invoked_" + stateSourceCache, 10L, TimeUnit.SECONDS);

        log.info("Killing node 1 ..");
        TestingUtil.killCacheManagers(manager(1));
        this.cacheManagers.remove(1);
        log.info("Node 1 killed");

        gate.triggerForever("post_cluster_listeners_release_" + stateSourceCache);

        TestingUtil.blockUntilViewsReceived(10000, false, (Collection<?>) this.cacheManagers);
        TestingUtil.waitForNoRebalance(caches(cacheName));

        Cache<Object, String> joinedCache = joining.get(10L, TimeUnit.SECONDS);
        for (Object installed : joinedCache.getAdvancedCache().getListeners()) {
            Assert.assertFalse(installed instanceof RemoteClusterListener);
        }
    }

    /**
     * A cluster listener is registered on node 2, then a joiner asks node 0 for the cluster
     * listeners to install and node 0 is killed while that request is still held at the
     * {@code pre_cluster_listeners} checkpoint. The joiner must nevertheless end up with the
     * listener installed: an insert through the joiner's cache has to be reported to it.
     */
    @Test
    public void testNodeJoiningAndStateNodeDiesWithExistingClusterListener() throws TimeoutException, InterruptedException, ExecutionException {
        final String cacheName = "cluster-listener";
        Cache<?, ?> stateSourceCache = cache(0, cacheName);
        Cache<Object, String> bystanderCache = cache(1, cacheName);
        Cache<Object, String> listenerOwnerCache = cache(2, cacheName);

        int sourceListenersBefore = stateSourceCache.getAdvancedCache().getListeners().size();
        int bystanderListenersBefore = bystanderCache.getAdvancedCache().getListeners().size();
        int ownerListenersBefore = listenerOwnerCache.getAdvancedCache().getListeners().size();

        AbstractClusterListenerUtilTest.ClusterListener clusterListener = new AbstractClusterListenerUtilTest.ClusterListener(this);
        listenerOwnerCache.addListener(clusterListener);

        // In distributed mode the other nodes are expected to gain one listener as well.
        int remoteDelta = this.cacheMode.isDistributed() ? 1 : 0;
        Assert.assertEquals(stateSourceCache.getAdvancedCache().getListeners().size(), sourceListenersBefore + remoteDelta);
        Assert.assertEquals(bystanderCache.getAdvancedCache().getListeners().size(), bystanderListenersBefore + remoteDelta);
        Assert.assertEquals(listenerOwnerCache.getAdvancedCache().getListeners().size(), ownerListenersBefore + 1);

        // Manager 0 must be first in the member list; presumably that makes it the node the
        // joiner asks for cluster listeners — TODO confirm.
        Assert.assertEquals(manager(0).getAddress(), manager(0).getMembers().get(0));

        CheckPoint gate = new CheckPoint();
        waitUntilRequestingListeners(stateSourceCache, gate);
        // Only the "pre" phase is used to hold the request; the "post" phase is left open.
        gate.triggerForever("post_cluster_listeners_release_" + stateSourceCache);

        addClusteredCacheManager();
        Future<Cache<Object, String>> joining = fork(() -> {
            return cache(3, cacheName);
        });

        // The joiner's request has arrived on node 0 and is now parked before being answered.
        gate.awaitStrict("pre_cluster_listeners_invoked_" + stateSourceCache, 10L, TimeUnit.SECONDS);

        log.info("Killing node 0 ..");
        TestingUtil.killCacheManagers(manager(0));
        this.cacheManagers.remove(0);
        log.info("Node 0 killed");

        TestingUtil.blockUntilViewsReceived(10000, false, (Collection<?>) this.cacheManagers);
        TestingUtil.waitForNoRebalance(caches(cacheName));

        // Release the request still parked in the mock on the killed node. The mock waits on the
        // "_release_" key; the "_invoked_" key is only ever triggered by the mock itself, so
        // triggering it here would leave that thread blocked until its own timeout.
        gate.triggerForever("pre_cluster_listeners_release_" + stateSourceCache);

        Cache<Object, String> joinedCache = joining.get(10L, TimeUnit.SECONDS);
        MagicKey key = new MagicKey(joinedCache);
        verifySimpleInsertion(joinedCache, key, "first-value", null, clusterListener, "first-value");
    }

    /**
     * Disabled scenario: the cluster listener is registered on node 0 itself, a joiner asks
     * node 0 for the cluster listeners to install, and node 0 is killed while that request is
     * held. Afterwards the surviving nodes must be back at their original listener counts and
     * the joiner must not hold any {@link RemoteClusterListener}.
     *
     * <p>NOTE(review): the annotation's description points at a TODO inside the test, but no
     * such comment is present in this file. Presumably it concerns the ordering between the
     * view change on manager 1 and the joiner obtaining its listeners — confirm against the
     * upstream source before re-enabling.
     */
    @Test(enabled = false, description = "Test may not be doable, check TODO in test")
    public void testNodeJoiningAndStateNodeDiesWhichHasClusterListener() throws TimeoutException, InterruptedException, ExecutionException {
        final String cacheName = "cluster-listener";
        Cache<?, ?> listenerOwnerCache = cache(0, cacheName);
        Cache<Object, String> survivorA = cache(1, cacheName);
        Cache<Object, String> survivorB = cache(2, cacheName);

        int ownerListenersBefore = listenerOwnerCache.getAdvancedCache().getListeners().size();
        int survivorAListenersBefore = survivorA.getAdvancedCache().getListeners().size();
        int survivorBListenersBefore = survivorB.getAdvancedCache().getListeners().size();

        listenerOwnerCache.addListener(new AbstractClusterListenerUtilTest.ClusterListener(this));

        // Every node is expected to have gained exactly one listener.
        Assert.assertEquals(listenerOwnerCache.getAdvancedCache().getListeners().size(), ownerListenersBefore + 1);
        Assert.assertEquals(survivorA.getAdvancedCache().getListeners().size(), survivorAListenersBefore + 1);
        Assert.assertEquals(survivorB.getAdvancedCache().getListeners().size(), survivorBListenersBefore + 1);
        Assert.assertEquals(manager(0).getAddress(), manager(0).getMembers().get(0));

        CheckPoint gate = new CheckPoint();
        waitUntilRequestingListeners(listenerOwnerCache, gate);
        gate.triggerForever("post_cluster_listeners_release_" + listenerOwnerCache);

        // A single (non-forever) trigger: one view change on manager 1 is let through, any
        // later one stays held until the triggerForever call further down.
        waitUntilViewChangeOccurs(manager(1), "manager1", gate);
        gate.trigger("pre_view_listener_release_manager1");

        addClusteredCacheManager();
        waitUntilViewChangeOccurs(manager(3), "manager3", gate);
        gate.trigger("pre_view_listener_release_manager3");

        Future<Cache<Object, String>> joining = fork(() -> {
            return cache(3, cacheName);
        });

        gate.awaitStrict("post_view_listener_invoked_manager1", 10L, TimeUnit.SECONDS);
        gate.awaitStrict("pre_cluster_listeners_invoked_" + listenerOwnerCache, 10L, TimeUnit.SECONDS);

        log.info("Killing node 0 ..");
        TestingUtil.killCacheManagers(manager(0));
        this.cacheManagers.remove(0);
        log.info("Node 0 killed");

        Cache<Object, String> joinedCache = joining.get(10L, TimeUnit.SECONDS);

        // Now allow all remaining view changes on manager 1 to proceed.
        gate.triggerForever("pre_view_listener_release_manager1");

        TestingUtil.blockUntilViewsReceived(60000L, false, (Cache<?, ?>[]) new Cache[]{survivorA, survivorB});
        TestingUtil.waitForNoRebalance(survivorA, survivorB);

        joinedCache.put(new MagicKey(joinedCache), "first-value");

        Assert.assertEquals(survivorA.getAdvancedCache().getListeners().size(), survivorAListenersBefore);
        Assert.assertEquals(survivorB.getAdvancedCache().getListeners().size(), survivorBListenersBefore);
        for (Object installed : joinedCache.getAdvancedCache().getListeners()) {
            Assert.assertFalse(installed instanceof RemoteClusterListener);
        }
    }

    /**
     * Replaces the {@link StateProvider} of {@code cache} with a Mockito mock that forwards
     * every call to the real provider but brackets {@code getClusterListenersToInstall()} with
     * two checkpoint handshakes so a test can pause it before and after the real call.
     *
     * <p>For each invocation the mock, in order:
     * <ol>
     *   <li>triggers {@code "pre_cluster_listeners_invoked_" + cache};</li>
     *   <li>awaits {@code "pre_cluster_listeners_release_" + cache} (10 seconds);</li>
     *   <li>delegates to the real provider;</li>
     *   <li>triggers {@code "post_cluster_listeners_invoked_" + cache};</li>
     *   <li>awaits {@code "post_cluster_listeners_release_" + cache} (10 seconds);</li>
     *   <li>returns the real result, or rethrows the real call's exception.</li>
     * </ol>
     * The post-call handshake runs exactly once whether the delegate returns or throws. An
     * exception raised by the handshake itself propagates to the caller.
     *
     * @param cache      cache whose {@code StateProvider} component is replaced; its string form
     *                   is part of every checkpoint key
     * @param checkPoint checkpoint used to coordinate with the test thread
     */
    protected void waitUntilRequestingListeners(Cache<?, ?> cache, CheckPoint checkPoint) {
        StateProvider realProvider = TestingUtil.extractComponent(cache, StateProvider.class);
        Answer<Object> forwardToReal = AdditionalAnswers.delegatesTo(realProvider);
        StateProvider gatedProvider = Mockito.mock(StateProvider.class, Mockito.withSettings().defaultAnswer(forwardToReal));

        Mockito.doAnswer(invocation -> {
            checkPoint.trigger("pre_cluster_listeners_invoked_" + cache);
            checkPoint.awaitStrict("pre_cluster_listeners_release_" + cache, 10L, TimeUnit.SECONDS);
            try {
                return forwardToReal.answer(invocation);
            } finally {
                // Kept in a finally block so that a failure of this handshake is not caught
                // and the handshake repeated (double trigger, second 10-second wait).
                checkPoint.trigger("post_cluster_listeners_invoked_" + cache);
                checkPoint.awaitStrict("post_cluster_listeners_release_" + cache, 10L, TimeUnit.SECONDS);
            }
        }).when(gatedProvider).getClusterListenersToInstall();

        TestingUtil.replaceComponent(cache, StateProvider.class, gatedProvider, true);
    }
}
