package org.infinispan.topology;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.infinispan.commands.topology.CacheStatusRequestCommand;
import org.infinispan.commands.topology.RebalanceStartCommand;
import org.infinispan.commands.topology.RebalanceStatusRequestCommand;
import org.infinispan.commands.topology.TopologyUpdateCommand;
import org.infinispan.commands.topology.TopologyUpdateStableCommand;
import org.infinispan.configuration.ConfigurationManager;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.distribution.TestAddress;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.distribution.ch.impl.ReplicatedConsistentHashFactory;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.factories.impl.BasicComponentRegistry;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.manager.TestModuleRepository;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifierImpl;
import org.infinispan.partitionhandling.AvailabilityMode;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.MockTransport;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.ClusterTopologyManager;
import org.infinispan.util.concurrent.CompletionStages;
import org.mockito.Mockito;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

/**
 * Unit test for {@link ClusterTopologyManagerImpl} that wires the manager into a
 * {@link GlobalComponentRegistry} together with a {@link MockTransport} and a
 * {@link MockLocalTopologyManager}, then drives it through join, view-change and
 * rebalance scenarios while checking the commands and topologies it produces.
 *
 * <p>Both executors are shared by all test methods and are shut down once, after the
 * class has finished, in {@link #shutdownExecutors()}.
 */
@Test(groups = {"unit"}, testName = "topology.ClusterTopologyManagerImplTest")
public class ClusterTopologyManagerImplTest extends AbstractInfinispanTest {
    private static final String CACHE_NAME = "testCache";
    // Component names under which the shared executors are registered.
    private static final String NON_BLOCKING_EXECUTOR = "org.infinispan.executors.non-blocking";
    private static final String TIMEOUT_EXECUTOR = "org.infinispan.executors.timeout";
    private static final Address A = new TestAddress(0, "A");
    private static final Address B = new TestAddress(1, "B");

    private final ExecutorService executor = Executors.newFixedThreadPool(2, getTestThreadFactory("Executor"));
    private final ExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(getTestThreadFactory("Executor"));
    // Must be declared before the join infos: makeJoinInfo() reads it during field initialization.
    private final ConsistentHashFactory<?> replicatedChf = new ReplicatedConsistentHashFactory();
    private final CacheJoinInfo joinInfoA = makeJoinInfo();
    private final CacheJoinInfo joinInfoB = makeJoinInfo();

    /** Creates a REPL_SYNC join info with a fresh random persistent UUID. */
    private CacheJoinInfo makeJoinInfo() {
        return new CacheJoinInfo(replicatedChf, 16, 1, 1000L, CacheMode.REPL_SYNC, 1.0f, PersistentUUID.randomUUID(), Optional.empty());
    }

    /** Builds a clustered global component registry backed by Mockito mocks for the cache manager and configuration manager. */
    private static GlobalComponentRegistry createGlobalComponentRegistry() {
        return new GlobalComponentRegistry(
                GlobalConfigurationBuilder.defaultClusteredBuilder().build(),
                Mockito.mock(EmbeddedCacheManager.class),
                Collections.emptySet(),
                TestModuleRepository.defaultModuleRepository(),
                Mockito.mock(ConfigurationManager.class));
    }

    /** Registers the shared non-blocking and timeout executors in the given registry. */
    private void registerExecutors(BasicComponentRegistry gbcr) {
        gbcr.replaceComponent(NON_BLOCKING_EXECUTOR, executor, false);
        gbcr.replaceComponent(TIMEOUT_EXECUTOR, scheduledExecutor, false);
    }

    /**
     * Starts the manager on node A alone, joins A, then adds B to the view, joins B and
     * verifies the full rebalance from {@code [A]} to {@code [A, B]}.
     */
    public void testClusterStartupWith2Nodes() throws Exception {
        GlobalComponentRegistry gcr = createGlobalComponentRegistry();
        try {
            BasicComponentRegistry gbcr = gcr.getComponent(BasicComponentRegistry.class);

            CacheManagerNotifierImpl notifier = new CacheManagerNotifierImpl();
            gbcr.replaceComponent(CacheManagerNotifier.class.getName(), notifier, false);
            notifier.start();

            MockTransport transport = new MockTransport(A);
            gbcr.replaceComponent(Transport.class.getName(), transport, false);
            gbcr.replaceComponent(PersistentUUIDManager.class.getName(), new PersistentUUIDManagerImpl(), false);
            registerExecutors(gbcr);

            MockLocalTopologyManager ltm = new MockLocalTopologyManager(CACHE_NAME);
            gbcr.replaceComponent(LocalTopologyManager.class.getName(), ltm, false);

            // Initial state: view 1 contains only A and the local manager has no topology yet.
            transport.init(1, Collections.singletonList(A));
            ltm.init(null, null, null, null);

            ClusterTopologyManagerImpl ctm = new ClusterTopologyManagerImpl();
            gbcr.replaceComponent(ClusterTopologyManager.class.getName(), ctm, false);
            gcr.rewire();
            ctm.start();

            // Starting the manager is expected to send a cache status request.
            transport.expectCommand(CacheStatusRequestCommand.class).finish();
            Thread.sleep(1L);
            transport.verifyNoErrors();

            // A joins: topology 1 with A as the only member and no pending CH.
            CacheStatusResponse joinResponseA = CompletionStages.join(ctm.handleJoin(CACHE_NAME, A, joinInfoA, 1));
            CacheTopology topologyA = joinResponseA.getCacheTopology();
            AssertJUnit.assertEquals(1, topologyA.getTopologyId());
            assertCHMembers(topologyA.getCurrentCH(), A);
            AssertJUnit.assertNull(topologyA.getPendingCH());

            // Feed the join response into the local manager, as the joiner would.
            ltm.handleTopologyUpdate(CACHE_NAME, topologyA, joinResponseA.getAvailabilityMode(), 1, A);
            ltm.expectTopology(1, Collections.singletonList(A), null, CacheTopology.Phase.NO_REBALANCE);
            transport.expectCommand(TopologyUpdateStableCommand.class, c -> {
                assertCHMembers(c.getCurrentCH(), A);
                AssertJUnit.assertNull(c.getPendingCH());
            }).finish();

            // B enters the cluster view (view 2).
            transport.updateView(2, Arrays.asList(A, B));
            notifier.notifyViewChange(Arrays.asList(A, B), Collections.singletonList(A), A, 2);
            transport.expectHeartBeatCommand().finish();

            // A join request that still carries view id 1 yields a null response ...
            AssertJUnit.assertNull(CompletionStages.join(ctm.handleJoin(CACHE_NAME, B, joinInfoB, 1)));

            // ... while the same request with view id 2 returns the current topology (still only A).
            CacheStatusResponse joinResponseB = CompletionStages.join(ctm.handleJoin(CACHE_NAME, B, joinInfoB, 2));
            CacheTopology topologyB = joinResponseB.getCacheTopology();
            AssertJUnit.assertEquals(1, topologyB.getTopologyId());
            assertCHMembers(topologyB.getCurrentCH(), A);
            AssertJUnit.assertNull(topologyB.getPendingCH());

            verifyRebalance(transport, ltm, ctm, 2, 1, Collections.singletonList(A), Arrays.asList(A, B));
            transport.verifyNoErrors();
        } finally {
            // Stop the registry even when an assertion above fails.
            gcr.stop();
        }
    }

    /**
     * Starts the manager on node B as a regular member while a rebalance from {@code [A]} to
     * {@code [A, B]} is in its READ_NEW_WRITE_ALL phase, then removes A from the view and
     * checks the topology updates that follow. Finally A rejoins and a full rebalance from
     * {@code [B]} to {@code [B, A]} is verified.
     */
    public void testCoordinatorLostDuringRebalance() throws Exception {
        GlobalComponentRegistry gcr = createGlobalComponentRegistry();
        try {
            BasicComponentRegistry gbcr = gcr.getComponent(BasicComponentRegistry.class);

            CacheManagerNotifierImpl notifier = new CacheManagerNotifierImpl();
            gbcr.replaceComponent(CacheManagerNotifier.class.getName(), notifier, false);
            notifier.start();

            MockTransport transport = new MockTransport(B);
            gbcr.replaceComponent(Transport.class.getName(), transport, false);
            PersistentUUIDManagerImpl persistentUUIDManager = new PersistentUUIDManagerImpl();
            gbcr.replaceComponent(PersistentUUIDManager.class.getName(), persistentUUIDManager, false);
            registerExecutors(gbcr);

            MockLocalTopologyManager ltm = new MockLocalTopologyManager(CACHE_NAME);
            gbcr.replaceComponent(LocalTopologyManager.class.getName(), ltm, false);

            // Initial state: view 2 = [A, B]; current topology 4 is mid-rebalance, stable topology 1 has only A.
            transport.init(2, Arrays.asList(A, B));
            ConsistentHash stableCH = replicatedChf.create(joinInfoA.getNumOwners(), joinInfoA.getNumSegments(), Collections.singletonList(A), (Map) null);
            ConsistentHash pendingCH = replicatedChf.create(joinInfoA.getNumOwners(), joinInfoA.getNumSegments(), Arrays.asList(A, B), (Map) null);
            CacheTopology initialTopology = new CacheTopology(4, 2, stableCH, pendingCH,
                    CacheTopology.Phase.READ_NEW_WRITE_ALL, Arrays.asList(A, B),
                    Arrays.asList(joinInfoA.getPersistentUUID(), joinInfoB.getPersistentUUID()));
            CacheTopology stableTopology = new CacheTopology(1, 1, stableCH, (ConsistentHash) null,
                    CacheTopology.Phase.NO_REBALANCE, Collections.singletonList(A),
                    Collections.singletonList(joinInfoA.getPersistentUUID()));
            ltm.init(joinInfoA, initialTopology, stableTopology, AvailabilityMode.AVAILABLE);
            persistentUUIDManager.addPersistentAddressMapping(A, joinInfoA.getPersistentUUID());
            persistentUUIDManager.addPersistentAddressMapping(B, joinInfoB.getPersistentUUID());

            ClusterTopologyManagerImpl ctm = new ClusterTopologyManagerImpl();
            gbcr.replaceComponent(ClusterTopologyManager.class.getName(), ctm, false);
            gcr.rewire();

            // start() runs concurrently with the mock answering the rebalance status request on behalf of A.
            runConcurrently(ctm::start, () -> {
                transport.expectCommand(RebalanceStatusRequestCommand.class).singleResponse(A, SuccessfulResponse.create(true));
            });
            eventuallyEquals(ClusterTopologyManager.ClusterManagerStatus.REGULAR_MEMBER, ctm::getStatus);

            // A leaves the view (view 3 = [B]).
            transport.updateView(3, Collections.singletonList(B));
            notifier.notifyViewChange(Collections.singletonList(B), Arrays.asList(A, B), B, 3);
            transport.expectCommand(CacheStatusRequestCommand.class).finish();

            // Topology 5: the rebalance is dropped, current CH still lists A and B.
            ltm.expectTopology(5, Arrays.asList(A, B), null, CacheTopology.Phase.NO_REBALANCE);
            transport.expectCommand(TopologyUpdateCommand.class, c -> {
                AssertJUnit.assertEquals(5, c.getTopologyId());
                assertCHMembers(c.getCurrentCH(), A, B);
                AssertJUnit.assertNull(c.getPendingCH());
            });
            transport.expectCommand(TopologyUpdateStableCommand.class, c -> {
                AssertJUnit.assertEquals(1, c.getTopologyId());
                assertCHMembers(c.getCurrentCH(), A);
                AssertJUnit.assertNull(c.getPendingCH());
            });

            // Topology 6: A is removed, B is the only member, and this becomes the stable topology.
            ltm.expectTopology(6, Collections.singletonList(B), null, CacheTopology.Phase.NO_REBALANCE);
            transport.expectCommand(TopologyUpdateCommand.class, c -> {
                AssertJUnit.assertEquals(6, c.getTopologyId());
                assertCHMembers(c.getCurrentCH(), B);
                AssertJUnit.assertNull(c.getPendingCH());
            });
            transport.expectCommand(TopologyUpdateStableCommand.class, c -> {
                AssertJUnit.assertEquals(6, c.getTopologyId());
                assertCHMembers(c.getCurrentCH(), B);
                AssertJUnit.assertNull(c.getPendingCH());
            });
            Thread.sleep(1L);
            transport.verifyNoErrors();

            // A comes back (view 4 = [B, A]) and joins again; the join response itself is not inspected here.
            transport.updateView(4, Arrays.asList(B, A));
            notifier.notifyViewChange(Arrays.asList(B, A), Collections.singletonList(B), A, 4);
            transport.expectHeartBeatCommand().finish();
            ctm.handleJoin(CACHE_NAME, A, joinInfoA, 4);

            verifyRebalance(transport, ltm, ctm, 7, 4, Collections.singletonList(B), Arrays.asList(B, A));
            transport.verifyNoErrors();
        } finally {
            // Stop the registry even when an assertion above fails.
            gcr.stop();
        }
    }

    /**
     * Walks the manager through a complete rebalance and checks every step on both the
     * local topology manager and the transport: READ_OLD_WRITE_ALL, READ_ALL_WRITE_ALL,
     * READ_NEW_WRITE_ALL and finally NO_REBALANCE, using four consecutive topology ids.
     *
     * @param transport        mock transport on which the outgoing commands are expected
     * @param ltm              mock local topology manager on which the topologies are expected
     * @param ctm              manager under test
     * @param firstTopologyId  topology id of the READ_OLD_WRITE_ALL step
     * @param confirmationId   value passed as the last argument of
     *                         {@code handleRebalancePhaseConfirm}
     *                         (NOTE(review): presumably a view or rebalance id - confirm against the manager)
     * @param initialMembers   expected current CH members while the rebalance is running
     * @param finalMembers     expected pending CH members, and current CH members once it completes
     */
    private void verifyRebalance(MockTransport transport, MockLocalTopologyManager ltm, ClusterTopologyManagerImpl ctm, int firstTopologyId, int confirmationId, List<Address> initialMembers, List<Address> finalMembers) throws Exception {
        final int readAllId = firstTopologyId + 1;
        final int readNewId = firstTopologyId + 2;
        final int finalId = firstTopologyId + 3;

        // Phase 1: the rebalance starts.
        ltm.expectTopology(firstTopologyId, initialMembers, finalMembers, CacheTopology.Phase.READ_OLD_WRITE_ALL);
        transport.expectCommand(RebalanceStartCommand.class, c -> {
            AssertJUnit.assertEquals(firstTopologyId, c.getTopologyId());
            AssertJUnit.assertEquals(CacheTopology.Phase.READ_OLD_WRITE_ALL, c.getPhase());
            AssertJUnit.assertEquals(initialMembers, c.getCurrentCH().getMembers());
            AssertJUnit.assertEquals(finalMembers, c.getPendingCH().getMembers());
        }).finish();
        confirmPhaseOnAllNodes(ctm, firstTopologyId, confirmationId);

        // Phase 2
        ltm.expectTopology(readAllId, initialMembers, finalMembers, CacheTopology.Phase.READ_ALL_WRITE_ALL);
        transport.expectCommand(TopologyUpdateCommand.class, c -> {
            AssertJUnit.assertEquals(readAllId, c.getTopologyId());
            AssertJUnit.assertEquals(CacheTopology.Phase.READ_ALL_WRITE_ALL, c.getPhase());
            AssertJUnit.assertEquals(initialMembers, c.getCurrentCH().getMembers());
            AssertJUnit.assertEquals(finalMembers, c.getPendingCH().getMembers());
        }).finish();
        confirmPhaseOnAllNodes(ctm, readAllId, confirmationId);

        // Phase 3
        ltm.expectTopology(readNewId, initialMembers, finalMembers, CacheTopology.Phase.READ_NEW_WRITE_ALL);
        transport.expectCommand(TopologyUpdateCommand.class, c -> {
            AssertJUnit.assertEquals(readNewId, c.getTopologyId());
            AssertJUnit.assertEquals(CacheTopology.Phase.READ_NEW_WRITE_ALL, c.getPhase());
            AssertJUnit.assertEquals(initialMembers, c.getCurrentCH().getMembers());
            AssertJUnit.assertEquals(finalMembers, c.getPendingCH().getMembers());
        }).finish();
        confirmPhaseOnAllNodes(ctm, readNewId, confirmationId);

        // Rebalance finished: the final members form the current CH and the topology becomes stable.
        ltm.expectTopology(finalId, finalMembers, null, CacheTopology.Phase.NO_REBALANCE);
        transport.expectCommand(TopologyUpdateCommand.class, c -> {
            AssertJUnit.assertEquals(finalId, c.getTopologyId());
            AssertJUnit.assertEquals(CacheTopology.Phase.NO_REBALANCE, c.getPhase());
            AssertJUnit.assertEquals(finalMembers, c.getCurrentCH().getMembers());
            AssertJUnit.assertNull(c.getPendingCH());
        }).finish();
        transport.expectCommand(TopologyUpdateStableCommand.class, c -> {
            AssertJUnit.assertEquals(finalId, c.getTopologyId());
            AssertJUnit.assertEquals(finalMembers, c.getCurrentCH().getMembers());
            AssertJUnit.assertNull(c.getPendingCH());
        }).finish();
    }

    /** Confirms the rebalance phase for the given topology id on behalf of A and then B. */
    private void confirmPhaseOnAllNodes(ClusterTopologyManagerImpl ctm, int topologyId, int confirmationId) throws Exception {
        ctm.handleRebalancePhaseConfirm(CACHE_NAME, A, topologyId, (Throwable) null, confirmationId);
        ctm.handleRebalancePhaseConfirm(CACHE_NAME, B, topologyId, (Throwable) null, confirmationId);
    }

    /** Asserts that the consistent hash has exactly the given members, in order. */
    private void assertCHMembers(ConsistentHash consistentHash, Address... addressArr) {
        AssertJUnit.assertEquals(Arrays.asList(addressArr), consistentHash.getMembers());
    }

    /**
     * Shuts down both shared executors and asserts that each terminates within 10 seconds.
     * Both executors are asked to stop before any assertion runs, so a termination failure
     * on the first one cannot leave the second one running.
     */
    @AfterClass(alwaysRun = true)
    public void shutdownExecutors() throws InterruptedException {
        executor.shutdownNow();
        scheduledExecutor.shutdownNow();
        AssertJUnit.assertTrue(executor.awaitTermination(10L, TimeUnit.SECONDS));
        AssertJUnit.assertTrue(scheduledExecutor.awaitTermination(10L, TimeUnit.SECONDS));
    }
}
