package org.infinispan.statetransfer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.Cache;
import org.infinispan.commons.util.Util;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.partitionhandling.AvailabilityMode;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.InTransactionMode;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.test.fwk.TransportFlags;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.LocalTopologyManager;
import org.infinispan.transaction.TransactionMode;
import org.jgroups.protocols.DISCARD;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.testng.annotations.Test;

@CleanupAfterMethod
@Test(groups = {"functional"}, testName = "statetransfer.ClusterTopologyManagerTest")
/* loaded from: input_file:org/infinispan/statetransfer/ClusterTopologyManagerTest.class */
public class ClusterTopologyManagerTest extends MultipleCacheManagersTest {
    public static final String CACHE_NAME = "testCache";
    private static final String OTHER_CACHE_NAME = "other_cache";
    private ConfigurationBuilder defaultConfig;
    Cache c1;
    Cache c2;
    Cache c3;
    DISCARD d1;
    DISCARD d2;
    DISCARD d3;
    static final /* synthetic */ boolean $assertionsDisabled;

    @Override // org.infinispan.test.MultipleCacheManagersTest
    public Object[] factory() {
        return new Object[]{new ClusterTopologyManagerTest().cacheMode(CacheMode.DIST_SYNC).transactional(true), new ClusterTopologyManagerTest().cacheMode(CacheMode.SCATTERED_SYNC).transactional(false)};
    }

    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        this.defaultConfig = getDefaultClusteredCacheConfig(this.cacheMode, this.transactional.booleanValue());
        createClusteredCaches(3, this.defaultConfig, new TransportFlags().withFD(true).withMerge(true));
        defineConfigurationOnAllManagers("testCache", this.defaultConfig);
        this.c1 = cache(0, "testCache");
        this.c2 = cache(1, "testCache");
        this.c3 = cache(2, "testCache");
        this.d1 = TestingUtil.getDiscardForCache(this.c1);
        this.d2 = TestingUtil.getDiscardForCache(this.c2);
        this.d3 = TestingUtil.getDiscardForCache(this.c3);
    }

    public void testNodeAbruptLeave() throws Exception {
        ConfigurationBuilder read = new ConfigurationBuilder().read(manager(0).getDefaultCacheConfiguration());
        defineConfigurationOnAllManagers("cache2", read);
        defineConfigurationOnAllManagers("cache3", read);
        defineConfigurationOnAllManagers("cache4", read);
        defineConfigurationOnAllManagers("cache5", read);
        cache(0, "cache2");
        cache(1, "cache2");
        cache(0, "cache3");
        cache(2, "cache3");
        cache(1, "cache4");
        cache(2, "cache4");
        cache(0, "cache5");
        cache(1, "cache5");
        this.log.debugf("Killing coordinator via discard", new Object[0]);
        this.d3.setDiscardAll(true);
        long currentTimeMillis = System.currentTimeMillis();
        TestingUtil.blockUntilViewsReceived(30000L, false, this.c1, this.c2);
        TestingUtil.blockUntilViewsReceived(30000L, false, this.c3);
        TestingUtil.waitForNoRebalance(this.c1, this.c2);
        TestingUtil.waitForNoRebalance(this.c3);
        TestingUtil.waitForNoRebalance(cache(0, "cache2"), cache(1, "cache2"));
        TestingUtil.waitForNoRebalance(cache(0, "cache3"));
        TestingUtil.waitForNoRebalance(cache(1, "cache4"));
        TestingUtil.waitForNoRebalance(cache(0, "cache5"), cache(1, "cache5"));
        long currentTimeMillis2 = System.currentTimeMillis();
        this.log.debugf("Recovery took %s", Util.prettyPrintTime(currentTimeMillis2 - currentTimeMillis));
        if (!$assertionsDisabled && currentTimeMillis2 - currentTimeMillis >= 30000) {
            throw new AssertionError("Recovery took too long: " + Util.prettyPrintTime(currentTimeMillis2 - currentTimeMillis));
        }
        EmbeddedCacheManager addClusterEnabledCacheManager = addClusterEnabledCacheManager(this.defaultConfig, new TransportFlags().withFD(true).withMerge(true));
        addClusterEnabledCacheManager.defineConfiguration("testCache", this.defaultConfig.build());
        Cache cache = cache(3, "testCache");
        TestingUtil.blockUntilViewsReceived(30000L, true, this.c1, this.c2, cache);
        TestingUtil.waitForNoRebalance(this.c1, this.c2, cache);
        addClusterEnabledCacheManager.defineConfiguration("cache2", this.defaultConfig.build());
        addClusterEnabledCacheManager.defineConfiguration("cache3", this.defaultConfig.build());
        addClusterEnabledCacheManager.defineConfiguration("cache4", this.defaultConfig.build());
        addClusterEnabledCacheManager.defineConfiguration("cache5", this.defaultConfig.build());
        cache(3, "cache2");
        cache(3, "cache3");
        cache(3, "cache4");
        cache(3, "cache5");
        TestingUtil.waitForNoRebalance(cache(0, "cache2"), cache(1, "cache2"), cache(3, "cache2"));
        TestingUtil.waitForNoRebalance(cache(0, "cache3"), cache(3, "cache3"));
        TestingUtil.waitForNoRebalance(cache(1, "cache4"), cache(3, "cache4"));
        TestingUtil.waitForNoRebalance(cache(0, "cache5"), cache(1, "cache5"), cache(3, "cache5"));
    }

    public void testClusterRecoveryAfterCoordLeave() throws Exception {
        this.log.debugf("Killing coordinator via discard", new Object[0]);
        this.d1.setDiscardAll(true);
        long currentTimeMillis = System.currentTimeMillis();
        TestingUtil.blockUntilViewsReceived(30000L, false, this.c1);
        TestingUtil.blockUntilViewsReceived(30000L, false, this.c2, this.c3);
        TestingUtil.waitForNoRebalance(this.c1);
        TestingUtil.waitForNoRebalance(this.c2, this.c3);
        long currentTimeMillis2 = System.currentTimeMillis();
        this.log.debugf("Recovery took %s", Util.prettyPrintTime(currentTimeMillis2 - currentTimeMillis));
        if (!$assertionsDisabled && currentTimeMillis2 - currentTimeMillis >= 30000) {
            throw new AssertionError("Recovery took too long: " + Util.prettyPrintTime(currentTimeMillis2 - currentTimeMillis));
        }
        addClusterEnabledCacheManager(this.defaultConfig, new TransportFlags().withFD(true).withMerge(true));
        Cache cache = cache(3, "testCache");
        TestingUtil.blockUntilViewsReceived(30000L, true, this.c2, this.c3, cache);
        TestingUtil.waitForNoRebalance(this.c2, this.c3, cache);
    }

    public void testClusterRecoveryAfterThreeWaySplit() throws Exception {
        this.log.debugf("Splitting the cluster in three", new Object[0]);
        this.d1.setDiscardAll(true);
        this.d2.setDiscardAll(true);
        this.d3.setDiscardAll(true);
        TestingUtil.blockUntilViewsReceived(30000L, false, this.c1);
        TestingUtil.blockUntilViewsReceived(30000L, false, this.c2);
        TestingUtil.blockUntilViewsReceived(30000L, false, this.c3);
        TestingUtil.waitForNoRebalance(this.c1);
        TestingUtil.waitForNoRebalance(this.c2);
        TestingUtil.waitForNoRebalance(this.c3);
        this.log.debugf("Merging the cluster partitions", new Object[0]);
        this.d1.setDiscardAll(false);
        this.d2.setDiscardAll(false);
        this.d3.setDiscardAll(false);
        long currentTimeMillis = System.currentTimeMillis();
        TestingUtil.blockUntilViewsReceived(60000L, this.c1, this.c2, this.c3);
        TestingUtil.waitForNoRebalance(this.c1, this.c2, this.c3);
        long currentTimeMillis2 = System.currentTimeMillis();
        this.log.debugf("Merge took %s", Util.prettyPrintTime(currentTimeMillis2 - currentTimeMillis));
        if (!$assertionsDisabled && currentTimeMillis2 - currentTimeMillis >= 30000) {
            throw new AssertionError("Merge took too long: " + Util.prettyPrintTime(currentTimeMillis2 - currentTimeMillis));
        }
        addClusterEnabledCacheManager(this.defaultConfig, new TransportFlags().withFD(true).withMerge(true));
        Cache cache = cache(3, "testCache");
        TestingUtil.blockUntilViewsReceived(30000L, true, this.c1, this.c2, this.c3, cache);
        TestingUtil.waitForNoRebalance(this.c1, this.c2, this.c3, cache);
    }

    public void testClusterRecoveryAfterSplitAndCoordLeave() throws Exception {
        this.log.debugf("Splitting the cluster in three", new Object[0]);
        this.d1.setDiscardAll(true);
        this.d2.setDiscardAll(true);
        this.d3.setDiscardAll(true);
        TestingUtil.blockUntilViewsReceived(30000L, false, this.c1);
        TestingUtil.blockUntilViewsReceived(30000L, false, this.c2);
        TestingUtil.blockUntilViewsReceived(30000L, false, this.c3);
        TestingUtil.waitForNoRebalance(this.c1);
        TestingUtil.waitForNoRebalance(this.c2);
        TestingUtil.waitForNoRebalance(this.c3);
        manager(0).stop();
        this.log.debugf("Merging the cluster partitions", new Object[0]);
        this.d2.setDiscardAll(false);
        this.d3.setDiscardAll(false);
        long currentTimeMillis = System.currentTimeMillis();
        TestingUtil.blockUntilViewsReceived(30000L, this.c2, this.c3);
        TestingUtil.waitForNoRebalance(this.c2, this.c3);
        long currentTimeMillis2 = System.currentTimeMillis();
        this.log.debugf("Merge took %s", Util.prettyPrintTime(currentTimeMillis2 - currentTimeMillis));
        if (!$assertionsDisabled && currentTimeMillis2 - currentTimeMillis >= 30000) {
            throw new AssertionError("Merge took too long: " + Util.prettyPrintTime(currentTimeMillis2 - currentTimeMillis));
        }
        addClusterEnabledCacheManager(this.defaultConfig, new TransportFlags().withFD(true).withMerge(true));
        Cache cache = cache(3, "testCache");
        TestingUtil.blockUntilViewsReceived(30000L, true, this.c2, this.c3, cache);
        TestingUtil.waitForNoRebalance(this.c2, this.c3, cache);
    }

    public void testClusterRecoveryWithRebalance() throws Exception {
        ArrayList arrayList = new ArrayList(manager(0).getMembers());
        Collections.sort(arrayList);
        Address address = (Address) arrayList.get(0);
        this.log.debugf("The merge coordinator will be %s", address);
        EmbeddedCacheManager manager = manager(address);
        int indexOf = this.cacheManagers.indexOf(manager);
        this.log.debugf("Splitting the cluster in three", new Object[0]);
        this.d1.setDiscardAll(true);
        this.d2.setDiscardAll(true);
        this.d3.setDiscardAll(true);
        TestingUtil.blockUntilViewsReceived(30000L, false, this.c1);
        TestingUtil.blockUntilViewsReceived(30000L, false, this.c2);
        TestingUtil.blockUntilViewsReceived(30000L, false, this.c3);
        TestingUtil.waitForNoRebalance(this.c1);
        TestingUtil.waitForNoRebalance(this.c2);
        TestingUtil.waitForNoRebalance(this.c3);
        if (indexOf == 0) {
            this.d1.setDiscardAll(false);
        }
        if (indexOf == 1) {
            this.d2.setDiscardAll(false);
        }
        if (indexOf == 2) {
            this.d3.setDiscardAll(false);
        }
        manager.getTransport().getViewId();
        CheckPoint checkPoint = new CheckPoint();
        blockRebalanceStart(manager, checkPoint, 2);
        EmbeddedCacheManager addClusterEnabledCacheManager = addClusterEnabledCacheManager(this.defaultConfig, new TransportFlags().withFD(true).withMerge(true));
        blockRebalanceStart(addClusterEnabledCacheManager, checkPoint, 2);
        addClusterEnabledCacheManager.defineConfiguration("testCache", this.defaultConfig.build());
        addClusterEnabledCacheManager.defineConfiguration(OTHER_CACHE_NAME, this.defaultConfig.build());
        addClusterEnabledCacheManager.getCache(OTHER_CACHE_NAME);
        Future fork = fork(() -> {
            return addClusterEnabledCacheManager.getCache("testCache");
        });
        this.log.debugf("Waiting for the REBALANCE_START command to reach the merge coordinator", new Object[0]);
        checkPoint.awaitStrict("rebalance_" + Arrays.asList(address, addClusterEnabledCacheManager.getAddress()), 10L, TimeUnit.SECONDS);
        this.log.debugf("Merging the cluster partitions", new Object[0]);
        this.d1.setDiscardAll(false);
        this.d2.setDiscardAll(false);
        this.d3.setDiscardAll(false);
        long currentTimeMillis = System.currentTimeMillis();
        TestingUtil.blockUntilViewsReceived(TestCacheManagerFactory.KEEP_ALIVE, this.cacheManagers);
        TestingUtil.waitForNoRebalance(caches("testCache"));
        this.log.debugf("Unblocking the REBALANCE_START command on the coordinator", new Object[0]);
        checkPoint.triggerForever("merge");
        Cache cache = (Cache) fork.get(30L, TimeUnit.SECONDS);
        TestingUtil.waitForNoRebalance(this.c1, this.c2, this.c3, cache);
        long currentTimeMillis2 = System.currentTimeMillis();
        this.log.debugf("Merge took %s", Util.prettyPrintTime(currentTimeMillis2 - currentTimeMillis));
        if (!$assertionsDisabled && currentTimeMillis2 - currentTimeMillis >= 30000) {
            throw new AssertionError("Merge took too long: " + Util.prettyPrintTime(currentTimeMillis2 - currentTimeMillis));
        }
        EmbeddedCacheManager addClusterEnabledCacheManager2 = addClusterEnabledCacheManager(this.defaultConfig, new TransportFlags().withFD(true).withMerge(true));
        addClusterEnabledCacheManager2.defineConfiguration("testCache", this.defaultConfig.build());
        Cache cache2 = addClusterEnabledCacheManager2.getCache("testCache");
        TestingUtil.blockUntilViewsReceived(30000L, true, this.c1, this.c2, this.c3, cache, cache2);
        TestingUtil.waitForNoRebalance(this.c1, this.c2, this.c3, cache, cache2);
    }

    protected void blockRebalanceStart(EmbeddedCacheManager embeddedCacheManager, CheckPoint checkPoint, int i) throws InterruptedException {
        LocalTopologyManager localTopologyManager = (LocalTopologyManager) Mockito.spy((LocalTopologyManager) TestingUtil.extractGlobalComponent(embeddedCacheManager, LocalTopologyManager.class));
        ((LocalTopologyManager) Mockito.doAnswer(invocationOnMock -> {
            List members = ((CacheTopology) invocationOnMock.getArguments()[1]).getMembers();
            checkPoint.trigger("rebalance_" + members);
            if (members.size() == i) {
                this.log.debugf("Blocking the REBALANCE_START command with members %s on %s", members, embeddedCacheManager.getAddress());
                checkPoint.awaitStrict("merge", 30L, TimeUnit.SECONDS);
            }
            return invocationOnMock.callRealMethod();
        }).when(localTopologyManager)).handleRebalance((String) Matchers.eq("testCache"), (CacheTopology) Matchers.any(CacheTopology.class), Matchers.anyInt(), (Address) Matchers.any(Address.class));
        TestingUtil.replaceComponent((CacheContainer) embeddedCacheManager, (Class<LocalTopologyManager>) LocalTopologyManager.class, localTopologyManager, true);
    }

    public void testAbruptLeaveAfterGetStatus() throws TimeoutException, InterruptedException {
        LocalTopologyManager localTopologyManager = (LocalTopologyManager) TestingUtil.extractGlobalComponent(manager(1), LocalTopologyManager.class);
        CheckPoint checkPoint = new CheckPoint();
        LocalTopologyManager localTopologyManager2 = (LocalTopologyManager) Mockito.spy(localTopologyManager);
        localTopologyManager.getCacheTopology("testCache");
        ((LocalTopologyManager) Mockito.doAnswer(invocationOnMock -> {
            checkPoint.trigger("GET_STATUS_" + ((Integer) invocationOnMock.getArguments()[0]).intValue());
            this.log.debugf("Blocking the GET_STATUS command on the new coordinator", new Object[0]);
            checkPoint.awaitStrict("3 left", 10L, TimeUnit.SECONDS);
            return invocationOnMock.callRealMethod();
        }).when(localTopologyManager2)).handleStatusRequest(Matchers.anyInt());
        TestingUtil.replaceComponent((CacheContainer) manager(1), (Class<LocalTopologyManager>) LocalTopologyManager.class, localTopologyManager2, true);
        this.log.debugf("Killing coordinator", new Object[0]);
        manager(0).stop();
        TestingUtil.blockUntilViewsReceived(30000L, false, manager(1), manager(2));
        checkPoint.awaitStrict("GET_STATUS_" + manager(1).getTransport().getViewId(), 10L, TimeUnit.SECONDS);
        this.d3.setDiscardAll(true);
        manager(2).stop();
        TestingUtil.blockUntilViewsReceived(30000L, false, manager(1));
        checkPoint.triggerForever("3 left");
        TestingUtil.waitForNoRebalance(this.c2);
    }

    public void testAbruptLeaveAfterGetStatus2() throws TimeoutException, InterruptedException {
        LocalTopologyManager localTopologyManager = (LocalTopologyManager) TestingUtil.extractGlobalComponent(manager(1), LocalTopologyManager.class);
        CheckPoint checkPoint = new CheckPoint();
        LocalTopologyManager localTopologyManager2 = (LocalTopologyManager) Mockito.spy(localTopologyManager);
        CacheTopology cacheTopology = localTopologyManager.getCacheTopology("testCache");
        ((LocalTopologyManager) Mockito.doAnswer(invocationOnMock -> {
            checkPoint.trigger("GET_STATUS_" + ((Integer) invocationOnMock.getArguments()[0]).intValue());
            this.log.debugf("Blocking the GET_STATUS command on the new coordinator", new Object[0]);
            checkPoint.awaitStrict("3 left", 10L, TimeUnit.SECONDS);
            return invocationOnMock.callRealMethod();
        }).when(localTopologyManager2)).handleStatusRequest(Matchers.anyInt());
        ((LocalTopologyManager) Mockito.doAnswer(invocationOnMock2 -> {
            CacheTopology cacheTopology2 = (CacheTopology) invocationOnMock2.getArguments()[1];
            if (cacheTopology2.getRebalanceId() != cacheTopology.getRebalanceId() + 1) {
                return invocationOnMock2.callRealMethod();
            }
            this.log.debugf("Discarding CH update command %s", cacheTopology2);
            return null;
        }).when(localTopologyManager2)).handleTopologyUpdate((String) Matchers.eq("testCache"), (CacheTopology) Matchers.any(CacheTopology.class), (AvailabilityMode) Matchers.any(AvailabilityMode.class), Matchers.anyInt(), (Address) Matchers.any(Address.class));
        ((LocalTopologyManager) Mockito.doAnswer(invocationOnMock3 -> {
            CacheTopology cacheTopology2 = (CacheTopology) invocationOnMock3.getArguments()[1];
            if (cacheTopology2.getRebalanceId() != cacheTopology.getRebalanceId() + 2) {
                return invocationOnMock3.callRealMethod();
            }
            this.log.debugf("Discarding rebalance command %s", cacheTopology2);
            return null;
        }).when(localTopologyManager2)).handleRebalance((String) Matchers.eq("testCache"), (CacheTopology) Matchers.any(CacheTopology.class), Matchers.anyInt(), (Address) Matchers.any(Address.class));
        TestingUtil.replaceComponent((CacheContainer) manager(1), (Class<LocalTopologyManager>) LocalTopologyManager.class, localTopologyManager2, true);
        this.log.debugf("Killing coordinator", new Object[0]);
        manager(0).stop();
        TestingUtil.blockUntilViewsReceived(30000L, false, manager(1), manager(2));
        checkPoint.awaitStrict("GET_STATUS_" + manager(1).getTransport().getViewId(), 10L, TimeUnit.SECONDS);
        this.d3.setDiscardAll(true);
        manager(2).stop();
        TestingUtil.blockUntilViewsReceived(30000L, false, manager(1));
        checkPoint.triggerForever("3 left");
        TestingUtil.waitForNoRebalance(this.c2);
    }

    @InTransactionMode({TransactionMode.TRANSACTIONAL})
    public void testLeaveDuringGetTransactions() throws InterruptedException, TimeoutException {
        CheckPoint checkPoint = new CheckPoint();
        StateProvider stateProvider = (StateProvider) Mockito.spy((StateProvider) TestingUtil.extractComponent(this.c2, StateProvider.class));
        ((StateProvider) Mockito.doAnswer(invocationOnMock -> {
            int intValue = ((Integer) invocationOnMock.getArguments()[1]).intValue();
            checkPoint.trigger("GET_TRANSACTIONS");
            this.log.debugf("Blocking the GET_TRANSACTIONS(%d) command on the %s", intValue, this.c2);
            checkPoint.awaitStrict("LEAVE", 10L, TimeUnit.SECONDS);
            return invocationOnMock.callRealMethod();
        }).when(stateProvider)).getTransactionsForSegments((Address) Matchers.any(Address.class), Matchers.anyInt(), Matchers.anySet());
        TestingUtil.replaceComponent((Cache<?, ?>) this.c2, (Class<StateProvider>) StateProvider.class, stateProvider, true);
        long currentTimeMillis = System.currentTimeMillis();
        manager(2).stop();
        checkPoint.awaitStrict("GET_TRANSACTIONS", 10L, TimeUnit.SECONDS);
        manager(1).stop();
        checkPoint.trigger("LEAVE");
        TestingUtil.blockUntilViewsReceived(30000L, false, this.c1);
        TestingUtil.waitForNoRebalance(this.c1);
        long currentTimeMillis2 = System.currentTimeMillis();
        this.log.debugf("Recovery took %s", Util.prettyPrintTime(currentTimeMillis2 - currentTimeMillis));
        if (!$assertionsDisabled && currentTimeMillis2 - currentTimeMillis >= 30000) {
            throw new AssertionError("Recovery took too long: " + Util.prettyPrintTime(currentTimeMillis2 - currentTimeMillis));
        }
    }

    public void testJoinerBecomesOnlyMember() {
        killMember(2, "testCache");
        defineConfigurationOnAllManagers(OTHER_CACHE_NAME, new ConfigurationBuilder().read(manager(0).getDefaultCacheConfiguration()));
        this.d2.setDiscardAll(true);
        fork(() -> {
            return cache(1, OTHER_CACHE_NAME);
        });
        TestingUtil.blockUntilViewsReceived(30000L, false, manager(1));
        TestingUtil.waitForNoRebalance(cache(1, OTHER_CACHE_NAME));
    }

    static {
        $assertionsDisabled = !ClusterTopologyManagerTest.class.desiredAssertionStatus();
    }
}
