package org.infinispan.distribution.rehash;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.container.DataContainer;
import org.infinispan.distribution.MagicKey;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.InboundInvocationHandler;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.jgroups.CommandAwareRpcDispatcher;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.statetransfer.StateRequestCommand;
import org.infinispan.statetransfer.StateResponseCommand;
import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.tx.dld.ControlledRpcManager;
import org.jgroups.blocks.Response;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.Test;

@CleanupAfterMethod
@Test(groups = {"functional"}, testName = "distribution.rehash.StateResponseOrderingTest")
/* loaded from: input_file:org/infinispan/distribution/rehash/StateResponseOrderingTest.class */
public class StateResponseOrderingTest extends MultipleCacheManagersTest {
    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder defaultCacheConfiguration = TestCacheManagerFactory.getDefaultCacheConfiguration(true);
        defaultCacheConfiguration.clustering().cacheMode(CacheMode.DIST_SYNC).hash().numOwners(3);
        createCluster(defaultCacheConfiguration, 4);
        waitForClusterToForm();
    }

    public void testOldStateResponse() throws Throwable {
        MagicKey magicKey = new MagicKey("k1", (Cache<?, ?>) cache(1));
        MagicKey magicKey2 = new MagicKey("k2", (Cache<?, ?>) cache(2));
        MagicKey magicKey3 = new MagicKey("k3", (Cache<?, ?>) cache(3));
        cache(1).put(magicKey, "v1");
        cache(2).put(magicKey2, "v2");
        cache(3).put(magicKey3, "v3");
        final StateTransferManager stateTransferManager = cache(0).getAdvancedCache().getComponentRegistry().getStateTransferManager();
        int topologyId = stateTransferManager.getCacheTopology().getTopologyId();
        ControlledRpcManager controlledRpcManager = new ControlledRpcManager((RpcManager) TestingUtil.extractComponent(cache(0), RpcManager.class));
        controlledRpcManager.blockBefore(StateRequestCommand.class);
        TestingUtil.replaceComponent((Cache<?, ?>) cache(0), (Class<ControlledRpcManager>) RpcManager.class, controlledRpcManager, true);
        cache(3).stop();
        eventually(new AbstractInfinispanTest.Condition() { // from class: org.infinispan.distribution.rehash.StateResponseOrderingTest.1
            @Override // org.infinispan.test.AbstractInfinispanTest.Condition
            public boolean isSatisfied() throws Exception {
                return stateTransferManager.getCacheTopology().getPendingCH() != null;
            }
        });
        ((InboundInvocationHandler) TestingUtil.extractGlobalComponent(manager(0), InboundInvocationHandler.class)).handle(new StateResponseCommand("___defaultcache", address(3), topologyId, Arrays.asList(new StateChunk(0, Collections.emptyList(), true))), address(3), null, false);
        controlledRpcManager.stopBlocking();
        TestingUtil.waitForRehashToComplete(cache(0), cache(1), cache(2));
        DataContainer dataContainer = (DataContainer) TestingUtil.extractComponent(cache(0), DataContainer.class);
        Assert.assertTrue(dataContainer.containsKey(magicKey));
        Assert.assertTrue(dataContainer.containsKey(magicKey2));
        Assert.assertTrue(dataContainer.containsKey(magicKey3));
    }

    public void testStateResponseWhileRestartingBrokenTransfers() throws Throwable {
        Address address = address(1);
        MagicKey magicKey = new MagicKey("k1", (Cache<?, ?>) cache(1));
        cache(0).put(magicKey, "v1");
        Address address2 = advancedCache(0).getDistributionManager().locate(magicKey).get(1);
        Address address3 = advancedCache(0).getDistributionManager().locate(magicKey).get(2);
        ArrayList arrayList = new ArrayList(advancedCache(0).getRpcManager().getMembers());
        arrayList.removeAll(Arrays.asList(address, address2, address3));
        Address address4 = (Address) arrayList.get(0);
        this.log.debugf("Starting test with key %s, primary owner %s, backup owners %s and %s, non-owner %s", new Object[]{magicKey, address, address2, address3, address4});
        AdvancedCache advancedCache = manager(address4).m494getCache().getAdvancedCache();
        final StateTransferManager stateTransferManager = advancedCache.getComponentRegistry().getStateTransferManager();
        final int topologyId = stateTransferManager.getCacheTopology().getTopologyId();
        CheckPoint checkPoint = new CheckPoint();
        replaceInvocationHandler(checkPoint, manager(address4), address4);
        replaceInvocationHandler(checkPoint, manager(address), address4);
        replaceInvocationHandler(checkPoint, manager(address2), address4);
        this.log.debugf("Killing node %s", address3);
        manager(address3).m494getCache().stop();
        eventually(new AbstractInfinispanTest.Condition() { // from class: org.infinispan.distribution.rehash.StateResponseOrderingTest.2
            @Override // org.infinispan.test.AbstractInfinispanTest.Condition
            public boolean isSatisfied() throws Exception {
                return stateTransferManager.getCacheTopology().getTopologyId() == topologyId + 2;
            }
        });
        checkPoint.trigger("OUT_GET_TRANSACTIONS_" + address);
        checkPoint.trigger("OUT_GET_TRANSACTIONS_" + address2);
        checkPoint.awaitStrict("IN_GET_TRANSACTIONS_" + address, 10L, TimeUnit.SECONDS);
        checkPoint.awaitStrict("IN_GET_TRANSACTIONS_" + address2, 10L, TimeUnit.SECONDS);
        Address address5 = checkPoint.peek(5L, TimeUnit.SECONDS, new StringBuilder().append("IN_START_STATE_TRANSFER_").append(address).toString(), new StringBuilder().append("IN_START_STATE_TRANSFER_").append(address2).toString()).endsWith(address.toString()) ? address : address2;
        Address address6 = address5 == address ? address2 : address;
        this.log.debugf("Killing node %s. Key %s is located on %s", address6, magicKey, advancedCache.getDistributionManager().locate(magicKey));
        this.log.debugf("Data on node %s: %s", address, manager(address).m494getCache().keySet());
        this.log.debugf("Data on node %s: %s", address2, manager(address2).m494getCache().keySet());
        checkPoint.await("IN_START_STATE_TRANSFER_" + address5, 1L, TimeUnit.SECONDS);
        checkPoint.trigger("OUT_START_STATE_TRANSFER_" + address5);
        manager(address6).m494getCache().stop();
        checkPoint.awaitStrict("IN_RESPONSE_" + address5, 10L, TimeUnit.SECONDS);
        checkPoint.trigger("OUT_RESPONSE_" + address5);
        this.log.debugf("Received segments?", new Object[0]);
        Thread.sleep(1000L);
        checkPoint.awaitStrict("IN_GET_TRANSACTIONS_" + address5, 10L, TimeUnit.SECONDS);
        checkPoint.trigger("OUT_GET_TRANSACTIONS_" + address5);
        checkPoint.awaitStrict("IN_START_STATE_TRANSFER_" + address5, 10L, TimeUnit.SECONDS);
        checkPoint.trigger("OUT_START_STATE_TRANSFER_" + address5);
        checkPoint.awaitStrict("IN_RESPONSE_" + address5, 10L, TimeUnit.SECONDS);
        checkPoint.trigger("OUT_RESPONSE_" + address5);
        TestingUtil.waitForRehashToComplete(advancedCache, manager(address5).m494getCache());
        this.log.debugf("Final checkpoint status: %s", checkPoint);
        Assert.assertTrue(((DataContainer) TestingUtil.extractComponent(advancedCache, DataContainer.class)).containsKey(magicKey));
    }

    private void replaceInvocationHandler(final CheckPoint checkPoint, final EmbeddedCacheManager embeddedCacheManager, final Address address) throws Throwable {
        final InboundInvocationHandler inboundInvocationHandler = (InboundInvocationHandler) TestingUtil.extractGlobalComponent(embeddedCacheManager, InboundInvocationHandler.class);
        InboundInvocationHandler inboundInvocationHandler2 = (InboundInvocationHandler) Mockito.mock(InboundInvocationHandler.class);
        ((InboundInvocationHandler) Mockito.doAnswer(new Answer<Object>() { // from class: org.infinispan.distribution.rehash.StateResponseOrderingTest.3
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                CacheRpcCommand cacheRpcCommand = (CacheRpcCommand) invocationOnMock.getArguments()[0];
                Address address2 = (Address) invocationOnMock.getArguments()[1];
                Response response = (Response) invocationOnMock.getArguments()[2];
                boolean booleanValue = ((Boolean) invocationOnMock.getArguments()[3]).booleanValue();
                if ((cacheRpcCommand instanceof StateRequestCommand) && address2.equals(address)) {
                    StateRequestCommand stateRequestCommand = (StateRequestCommand) cacheRpcCommand;
                    checkPoint.trigger("IN_" + stateRequestCommand.getType() + '_' + embeddedCacheManager.getAddress());
                    checkPoint.awaitStrict("OUT_" + stateRequestCommand.getType() + '_' + embeddedCacheManager.getAddress(), 5L, TimeUnit.SECONDS);
                } else if ((cacheRpcCommand instanceof StateResponseCommand) && embeddedCacheManager.getAddress().equals(address)) {
                    checkPoint.trigger("IN_RESPONSE_" + address2);
                    checkPoint.awaitStrict("OUT_RESPONSE_" + address2, 5L, TimeUnit.SECONDS);
                }
                inboundInvocationHandler.handle(cacheRpcCommand, address2, response, booleanValue);
                return null;
            }
        }).when(inboundInvocationHandler2)).handle((CacheRpcCommand) Matchers.any(CacheRpcCommand.class), (Address) Matchers.any(Address.class), (Response) Matchers.any(Response.class), Mockito.anyBoolean());
        TestingUtil.replaceComponent((CacheContainer) embeddedCacheManager, (Class<InboundInvocationHandler>) InboundInvocationHandler.class, inboundInvocationHandler2, true);
        TestingUtil.replaceField(inboundInvocationHandler2, "inboundInvocationHandler", (CommandAwareRpcDispatcher) TestingUtil.extractField((Transport) TestingUtil.extractGlobalComponent(embeddedCacheManager, Transport.class), "dispatcher"), CommandAwareRpcDispatcher.class);
    }
}
