package org.infinispan.scattered.statetransfer;

import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.configuration.cache.BiasAcquisition;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.distribution.BlockingInterceptor;
import org.infinispan.distribution.MagicKey;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ControlledTransport;
import org.infinispan.statetransfer.StateRequestCommand;
import org.infinispan.statetransfer.StateResponseCommand;
import org.infinispan.statetransfer.StateTransferInterceptor;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TopologyChangeListener;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.transport.DelayedViewJGroupsTransport;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.CacheTopologyControlCommand;
import org.infinispan.util.BlockingLocalTopologyManager;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.infinispan.util.ControlledRpcManager;
import org.junit.Assert;
import org.testng.annotations.Test;

@CleanupAfterMethod
@Test(groups = {"functional"}, testName = "scattered.statetransfer.CoordinatorStopTest")
/* loaded from: input_file:org/infinispan/scattered/statetransfer/CoordinatorStopTest.class */
public class CoordinatorStopTest extends MultipleCacheManagersTest {
    private CountDownLatch viewLatch;
    private ControlledConsistentHashFactory.Scattered chf;

    @Override // org.infinispan.test.MultipleCacheManagersTest
    public Object[] factory() {
        return new Object[]{new CoordinatorStopTest().biasAcquisition(BiasAcquisition.NEVER), new CoordinatorStopTest().biasAcquisition(BiasAcquisition.ON_WRITE)};
    }

    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
        this.chf = new ControlledConsistentHashFactory.Scattered(new int[]{0, 1, 2});
        configurationBuilder.clustering().cacheMode(CacheMode.SCATTERED_SYNC).hash().numSegments(3).consistentHashFactory(this.chf);
        if (this.biasAcquisition != null) {
            configurationBuilder.clustering().biasAcquisition(this.biasAcquisition);
        }
        addClusterEnabledCacheManager(configurationBuilder);
        this.viewLatch = new CountDownLatch(1);
        GlobalConfigurationBuilder globalConfigurationBuilder = new GlobalConfigurationBuilder();
        globalConfigurationBuilder.transport().transport(new DelayedViewJGroupsTransport(this.viewLatch));
        addClusterEnabledCacheManager(globalConfigurationBuilder, configurationBuilder);
        globalConfigurationBuilder.transport().transport(new DelayedViewJGroupsTransport(this.viewLatch));
        addClusterEnabledCacheManager(globalConfigurationBuilder, configurationBuilder);
        Assert.assertTrue(cache(0).getCacheManager().isCoordinator());
        cache(1);
        cache(2);
        waitForClusterToForm();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r1v17, types: [int[], int[][]] */
    public void testCoordinatorLeaves() throws InterruptedException, ExecutionException, TimeoutException, BrokenBarrierException {
        String name = cache(1).getName();
        MagicKey magicKey = new MagicKey(cache(0));
        cache(1).put(magicKey, "value");
        int topologyId = cache(1).getAdvancedCache().getDistributionManager().getCacheTopology().getTopologyId();
        BlockingLocalTopologyManager replaceTopologyManager = BlockingLocalTopologyManager.replaceTopologyManager(manager(2), name);
        ControlledTransport replace = ControlledTransport.replace(cache(0));
        ControlledTransport replace2 = ControlledTransport.replace(cache(1));
        replace.blockBefore(replicableCommand -> {
            if (!(replicableCommand instanceof CacheTopologyControlCommand)) {
                return false;
            }
            CacheTopologyControlCommand cacheTopologyControlCommand = (CacheTopologyControlCommand) replicableCommand;
            return cacheTopologyControlCommand.getCacheName().equals(name) && cacheTopologyControlCommand.getTopologyId() == topologyId + 2 && cacheTopologyControlCommand.getType() == CacheTopologyControlCommand.Type.REBALANCE_START;
        });
        replace2.blockBefore(replicableCommand2 -> {
            if (!(replicableCommand2 instanceof CacheTopologyControlCommand)) {
                return false;
            }
            CacheTopologyControlCommand cacheTopologyControlCommand = (CacheTopologyControlCommand) replicableCommand2;
            return cacheTopologyControlCommand.getCacheName().equals(name) && cacheTopologyControlCommand.getTopologyId() == topologyId + 4 && cacheTopologyControlCommand.getType() == CacheTopologyControlCommand.Type.REBALANCE_START;
        });
        ControlledRpcManager replaceRpcManager = ControlledRpcManager.replaceRpcManager(cache(2));
        replaceRpcManager.excludeCommands(StateResponseCommand.class, ClusteredGetCommand.class);
        this.chf.setOwnerIndexes(new int[]{new int[]{1}, new int[]{0}, new int[]{1}});
        this.log.infof("Stopping coordinator %s, last stable topology is %d", manager(0), Integer.valueOf(topologyId));
        Future<Void> fork = fork(() -> {
            manager(0).stop();
        });
        BlockingLocalTopologyManager.BlockedTopology expectTopologyUpdate = replaceTopologyManager.expectTopologyUpdate(CacheTopology.Phase.NO_REBALANCE, topologyId + 1);
        if (expectTopologyUpdate.getCacheTopology().getTopologyId() == topologyId + 1) {
            Assert.assertEquals(CacheTopology.Phase.NO_REBALANCE, expectTopologyUpdate.getPhase());
        }
        Assert.assertEquals(2L, expectTopologyUpdate.getCacheTopology().getActualMembers().size());
        Assert.assertEquals((Object) null, expectTopologyUpdate.getCacheTopology().getPendingCH());
        assertOwners(expectTopologyUpdate, true, 0, new Address[0]);
        assertOwners(expectTopologyUpdate, true, 1, address(1));
        assertOwners(expectTopologyUpdate, true, 2, address(2));
        expectTopologyUpdate.unblock();
        replace.stopBlocking();
        fork.get(10L, TimeUnit.SECONDS);
        BlockingLocalTopologyManager.BlockedTopology expectTopologyUpdate2 = replaceTopologyManager.expectTopologyUpdate(CacheTopology.Phase.TRANSITORY, topologyId + 2);
        Assert.assertEquals(CacheTopology.Phase.TRANSITORY, expectTopologyUpdate2.getPhase());
        Assert.assertEquals(2L, expectTopologyUpdate2.getCacheTopology().getActualMembers().size());
        Assert.assertNotNull(expectTopologyUpdate2.getCacheTopology().getPendingCH());
        assertOwners(expectTopologyUpdate2, false, 0, address(2));
        assertOwners(expectTopologyUpdate2, false, 1, address(1));
        assertOwners(expectTopologyUpdate2, false, 2, address(2));
        expectTopologyUpdate2.unblock();
        replaceRpcManager.expectCommand(StateRequestCommand.class, stateRequestCommand -> {
            Assert.assertEquals(StateRequestCommand.Type.CONFIRM_REVOKED_SEGMENTS, stateRequestCommand.getType());
        }).send().receiveAll();
        this.viewLatch.countDown();
        ControlledRpcManager.BlockedRequest expectCommand = replaceRpcManager.expectCommand(StateRequestCommand.class, stateRequestCommand2 -> {
            Assert.assertEquals(StateRequestCommand.Type.START_KEYS_TRANSFER, stateRequestCommand2.getType());
        });
        BlockingLocalTopologyManager.BlockedTopology expectTopologyUpdate3 = replaceTopologyManager.expectTopologyUpdate(CacheTopology.Phase.NO_REBALANCE, topologyId + 3);
        Assert.assertEquals(2L, expectTopologyUpdate3.getCacheTopology().getActualMembers().size());
        Assert.assertEquals((Object) null, expectTopologyUpdate3.getCacheTopology().getPendingCH());
        TopologyChangeListener install = TopologyChangeListener.install(cache(2));
        replaceTopologyManager.stopBlocking();
        expectTopologyUpdate3.unblock();
        if (expectTopologyUpdate3.getCacheTopology().getCurrentCH().locatePrimaryOwnerForSegment(0) == null) {
            replaceRpcManager.expectCommand(StateRequestCommand.class, stateRequestCommand3 -> {
                Assert.assertEquals(StateRequestCommand.Type.CANCEL_STATE_TRANSFER, stateRequestCommand3.getType());
            }).send();
        }
        install.await(10L, TimeUnit.SECONDS);
        expectCommand.send().receiveAll();
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        BlockingInterceptor blockingInterceptor = new BlockingInterceptor(cyclicBarrier, GetKeyValueCommand.class, true, true);
        cache(2).getAdvancedCache().getAsyncInterceptorChain().addInterceptorAfter(blockingInterceptor, StateTransferInterceptor.class);
        Future fork2 = fork(() -> {
            return cache(2).get(magicKey);
        });
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        blockingInterceptor.suspend(true);
        replaceRpcManager.stopBlocking();
        replace2.stopBlocking();
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals("value", fork2.get());
    }

    private void assertOwners(BlockingLocalTopologyManager.BlockedTopology blockedTopology, boolean z, int i, Address... addressArr) {
        Assert.assertEquals("Topology: " + blockedTopology.getCacheTopology(), Arrays.asList(addressArr), (z ? blockedTopology.getCacheTopology().getCurrentCH() : blockedTopology.getCacheTopology().getPendingCH()).locateOwnersForSegment(i));
    }
}
