package org.infinispan.statetransfer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.Cache;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.VersionedPrepareCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.DistributionTestHelper;
import org.infinispan.expiration.impl.ExpirationWithClusteredWriteSkewTest;
import org.infinispan.interceptors.base.BaseCustomInterceptor;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.tx.dld.ControlledRpcManager;
import org.infinispan.util.BaseControlledConsistentHashFactory;
import org.infinispan.util.BlockingLocalTopologyManager;
import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.util.logging.LogFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "statetransfer.WriteSkewDuringStateTransferTest", singleThreaded = true)
/* loaded from: input_file:org/infinispan/statetransfer/WriteSkewDuringStateTransferTest.class */
public class WriteSkewDuringStateTransferTest extends MultipleCacheManagersTest {
    /** Topology managers registered by {@link #replaceTopologyManager}; released and cleared in {@link #unblockAll()}. */
    private final List<BlockingLocalTopologyManager> topologyManagerList =
            Collections.synchronizedList(new ArrayList<>(4));

    /**
     * Callback run by {@link ControlledCommandInterceptor} around the commands that pass through it.
     * An action only takes part in a command's invocation when {@link #isApplicable} returns {@code true}.
     */
    public interface Action {
        /**
         * Decides whether this action should run for the given invocation.
         *
         * @return {@code true} if {@link #before} and {@link #after} must be called for this command
         */
        boolean isApplicable(InvocationContext invocationContext, VisitableCommand visitableCommand);

        /**
         * Called before the command is handed to the next interceptor.
         *
         * @param cache the cache the calling interceptor is attached to
         */
        void before(InvocationContext invocationContext, VisitableCommand visitableCommand, Cache cache);

        /**
         * Called after the next interceptor has returned. The interceptor does not call it when the
         * downstream invocation throws.
         *
         * @param cache the cache the calling interceptor is attached to
         */
        void after(InvocationContext invocationContext, VisitableCommand visitableCommand, Cache cache);
    }

    /**
     * Consistent hash factory that orders owners so the last member of the list comes first,
     * followed by the remaining members in their original order.
     */
    public static class ConsistentHashFactoryImpl extends BaseControlledConsistentHashFactory.Default {
        public ConsistentHashFactoryImpl() {
            // NOTE(review): 1 is presumably the number of segments - confirm against the base class.
            super(1);
        }

        /**
         * Builds the owner list: last member first, then every other member in order.
         *
         * @param list the candidate members
         * @param i    the requested number of owners; asserted to be 3
         * @param i2   not used by this implementation
         * @return a new list with the last member moved to the front
         */
        @Override
        protected final List<Address> createOwnersCollection(List<Address> list, int i, int i2) {
            AssertJUnit.assertEquals("Wrong number of owners", 3, i);
            final int lastIndex = list.size() - 1;
            final List<Address> owners = new ArrayList<>(3);
            owners.add(list.get(lastIndex));
            owners.addAll(list.subList(0, lastIndex));
            return owners;
        }
    }

    /**
     * Custom interceptor that wraps every command it handles with the registered {@link Action}s
     * that declare themselves applicable to that command.
     */
    public static class ControlledCommandInterceptor extends BaseCustomInterceptor {
        private final List<Action> actionList = new ArrayList<>(3);

        /**
         * Creates the interceptor, binds it to {@code cache} and installs it at position 0 of the
         * cache's interceptor chain.
         */
        public ControlledCommandInterceptor(Cache<Object, Object> cache) {
            this.cache = cache;
            this.cacheConfiguration = cache.getCacheConfiguration();
            this.embeddedCacheManager = cache.getCacheManager();
            cache.getAdvancedCache().addInterceptor(this, 0);
        }

        /** Creates an unbound interceptor, to be registered through the cache configuration. */
        public ControlledCommandInterceptor() {
        }

        /** Registers an action to be considered for every subsequent command. */
        public void addAction(Action action) {
            actionList.add(action);
        }

        /**
         * Runs {@code before} on each applicable action, invokes the next interceptor, then runs
         * {@code after} on the same actions. If the next interceptor throws, {@code after} is skipped.
         */
        protected Object handleDefault(InvocationContext invocationContext, VisitableCommand visitableCommand) throws Throwable {
            final List<Action> applicable = extractActions(invocationContext, visitableCommand);
            // With no applicable action both loops are no-ops and the command is simply forwarded.
            for (Action action : applicable) {
                action.before(invocationContext, visitableCommand, cache);
            }
            final Object result = invokeNextInterceptor(invocationContext, visitableCommand);
            for (Action action : applicable) {
                action.after(invocationContext, visitableCommand, cache);
            }
            return result;
        }

        /** Returns the registered actions that accept the given context and command, in registration order. */
        private List<Action> extractActions(InvocationContext invocationContext, VisitableCommand visitableCommand) {
            if (actionList.isEmpty()) {
                return Collections.emptyList();
            }
            final List<Action> matching = new ArrayList<>(actionList.size());
            for (Action candidate : actionList) {
                if (!candidate.isApplicable(invocationContext, visitableCommand)) {
                    continue;
                }
                matching.add(candidate);
            }
            return matching;
        }
    }

    /**
     * Handle for the node added by {@code addNode}: the future of the forked join task plus the
     * controller holding that node's interceptor and topology manager.
     * <p>
     * Decompiler note: the access modifier of this class was widened from {@code private}.
     */
    public static class NewNode {
        /** Completes once the forked task that waits for the cluster to form has finished. */
        Future<Void> joinerFuture;
        /** Interceptor and topology manager installed on the new node. */
        NodeController controller;

        private NewNode() {
        }
    }

    /**
     * Groups the two test hooks installed on a node: the command interceptor and the blocking
     * topology manager.
     * <p>
     * Decompiler note: the access modifier of this class was widened from {@code private}.
     */
    public static class NodeController {
        /** Interceptor through which the test registers {@link Action}s for this node. */
        ControlledCommandInterceptor interceptor;
        /** Topology manager the test uses to block and release topology updates on this node. */
        BlockingLocalTopologyManager topologyManager;

        private NodeController() {
        }
    }

    /**
     * Releases every topology manager registered in {@code topologyManagerList} and empties the list.
     * <p>
     * The list is a {@code Collections.synchronizedList}, whose contract requires holding its monitor
     * while traversing it. The managers are therefore copied and the list cleared under the lock, and
     * the actual release happens outside it so no foreign code runs while the lock is held. Clearing
     * up front also guarantees the list is empty for the next test method even if a release throws.
     */
    @AfterMethod(alwaysRun = true)
    public final void unblockAll() {
        final List<BlockingLocalTopologyManager> toRelease;
        synchronized (this.topologyManagerList) {
            toRelease = new ArrayList<>(this.topologyManagerList);
            this.topologyManagerList.clear();
        }
        for (BlockingLocalTopologyManager manager : toRelease) {
            manager.stopBlockingAll();
        }
    }

    /**
     * Runs a transactional write on node 0 while a third node joins, using the blocking topology
     * managers and the controlled RPC manager to interleave the prepare with the topology updates.
     * Afterwards it checks that nodes 1 and 2 hold a versioned entry for the key and performs one
     * more write.
     */
    public void testVersionsAfterStateTransfer() throws Exception {
        final String key = "key1";
        final BlockingLocalTopologyManager.LatchType chUpdate = BlockingLocalTopologyManager.LatchType.CONSISTENT_HASH_UPDATE;

        assertClusterSize("Wrong cluster size", 2);
        assertKeyOwnership(key, cache(1), cache(0));
        final int initialTopologyId = currentTopologyId(cache(0));
        final int finalTopologyId = initialTopologyId + 4;

        // Install the test hooks on the two existing nodes, then start the joiner.
        final ControlledRpcManager nodeARpcManager = replaceRpcManager(cache(0));
        final NodeController nodeA = setNodeControllerIn(cache(0));
        setInitialPhaseForNodeA(nodeA, initialTopologyId);
        final NodeController nodeB = setNodeControllerIn(cache(1));
        setInitialPhaseForNodeB(nodeB, initialTopologyId);
        final NewNode nodeC = addNode(initialTopologyId);

        // Node 0 was set to block consistent-hash updates; wait for it to reach that point.
        nodeA.topologyManager.waitToBlock(chUpdate);

        // Start the transaction and hold it on node 0 around the versioned prepare.
        nodeARpcManager.blockAfter(VersionedPrepareCommand.class);
        final Future<Object> tx = executeTransaction(cache(0), key);
        nodeARpcManager.waitForCommandToBlock();

        // Let node 0 catch up with the topology before the transaction is allowed to continue.
        nodeA.topologyManager.stopBlocking(chUpdate);
        awaitForTopology(finalTopologyId, cache(0));
        nodeARpcManager.stopBlocking();
        AssertJUnit.assertNull("Wrong put() return value.", tx.get());

        // Release everything and wait for the join to complete on all three nodes.
        nodeA.topologyManager.stopBlockingAll();
        nodeB.topologyManager.stopBlockingAll();
        nodeC.controller.topologyManager.stopBlockingAll();
        nodeC.joinerFuture.get();
        awaitForTopology(finalTopologyId, cache(0));
        awaitForTopology(finalTopologyId, cache(1));
        awaitForTopology(finalTopologyId, cache(2));

        assertKeyVersionInDataContainer(key, cache(1), cache(2));
        cache(0).put(key, "v2");
    }

    /** Starts the initial cluster: two caches built from {@link #configuration()}. */
    @Override
    protected void createCacheManagers() throws Throwable {
        final ConfigurationBuilder builder = configuration();
        createClusteredCaches(2, builder);
    }

    /**
     * Asserts that each given cache's data container holds an entry for {@code obj} and that the
     * entry's metadata carries a non-null version.
     */
    private void assertKeyVersionInDataContainer(Object obj, Cache... cacheArr) {
        for (Cache cache : cacheArr) {
            // The cast is required: with a raw Cache argument the generic return type is erased.
            final DataContainer container = (DataContainer) TestingUtil.extractComponent(cache, DataContainer.class);
            final InternalCacheEntry entry = container.get(obj);
            AssertJUnit.assertNotNull("Entry cannot be null in " + address((Cache<?, ?>) cache) + ".", entry);
            AssertJUnit.assertNotNull("Version cannot be null.", entry.getMetadata().version());
        }
    }

    /**
     * Wraps the cache's current {@link RpcManager} in a {@link ControlledRpcManager} and registers
     * the wrapper in its place (with rewiring enabled).
     *
     * @param cache the cache whose RPC manager is replaced
     * @return the wrapper now installed on the cache
     */
    private ControlledRpcManager replaceRpcManager(Cache cache) {
        // The cast is required: with a raw Cache argument the generic return type is erased.
        final RpcManager delegate = (RpcManager) TestingUtil.extractComponent(cache, RpcManager.class);
        final ControlledRpcManager controlled = new ControlledRpcManager(delegate);
        // The component is registered under RpcManager.class. The decompiler-generated cast of that
        // literal to Class<ControlledRpcManager> is an illegal cast in Java and has been removed.
        TestingUtil.replaceComponent((Cache<?, ?>) cache, RpcManager.class, controlled, true);
        return controlled;
    }

    /** Polls, via {@code eventually}, until {@code cache} reports topology id {@code i}. */
    private void awaitForTopology(int i, Cache cache) {
        eventually(() -> currentTopologyId(cache) == i);
    }

    /** Returns the topology id currently reported by the cache's {@link StateTransferManager}. */
    private int currentTopologyId(Cache cache) {
        // The cast is required: with a raw Cache argument the generic return type is erased.
        final StateTransferManager stateTransferManager =
                (StateTransferManager) TestingUtil.extractComponent(cache, StateTransferManager.class);
        return stateTransferManager.getCacheTopology().getTopologyId();
    }

    /**
     * Forks a task that writes {@code obj} to {@code cache} inside a transaction obtained from the
     * cache's transaction manager.
     *
     * @return a future yielding the value returned by {@code put}
     */
    private Future<Object> executeTransaction(Cache<Object, Object> cache, Object obj) {
        return fork(() -> {
            // The transaction manager is looked up on the forked thread, as part of the task.
            return TestingUtil.withTx(
                    cache.getAdvancedCache().getTransactionManager(),
                    () -> cache.put(obj, ExpirationWithClusteredWriteSkewTest.VALUE));
        });
    }

    /**
     * Adds a new cache manager to the cluster with a {@link ControlledCommandInterceptor} at index 0
     * and a blocking topology manager, and forks a task that waits for the cluster to form.
     * <p>
     * The registered action applies to remote prepare commands on the new node. A prepare that
     * originates from node 1 is held until the new node reaches topology {@code i + 2}. After a
     * prepare from node 0 has been processed, the new node's consistent-hash-update latch is released.
     *
     * @param i the topology id observed before the join started
     * @return the join future together with the new node's controller
     */
    private NewNode addNode(final int i) {
        final int awaitedTopologyId = i + 2;
        final NewNode newNode = new NewNode();
        final NodeController joiner = new NodeController();
        newNode.controller = joiner;
        joiner.interceptor = new ControlledCommandInterceptor();

        final ConfigurationBuilder builder = configuration();
        builder.customInterceptors().addInterceptor().index(0).interceptor(joiner.interceptor);
        final EmbeddedCacheManager manager = addClusterEnabledCacheManager(builder);
        joiner.topologyManager = replaceTopologyManager(manager);

        joiner.interceptor.addAction(new Action() {
            @Override
            public boolean isApplicable(InvocationContext invocationContext, VisitableCommand visitableCommand) {
                if (invocationContext.isOriginLocal()) {
                    return false;
                }
                return visitableCommand instanceof PrepareCommand;
            }

            @Override
            public void before(InvocationContext invocationContext, VisitableCommand visitableCommand, Cache cache) {
                final Address origin = invocationContext.getOrigin();
                log.tracef("Before: command=%s. origin=%s", visitableCommand, origin);
                if (!origin.equals(address((Cache<?, ?>) WriteSkewDuringStateTransferTest.this.cache(1)))) {
                    return;
                }
                try {
                    cache.getAdvancedCache().getComponentRegistry().getStateTransferLock()
                            .waitForTopology(awaitedTopologyId, 10L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and let the command proceed.
                    Thread.currentThread().interrupt();
                } catch (TimeoutException e2) {
                    throw log.failedWaitingForTopology(awaitedTopologyId);
                }
            }

            @Override
            public void after(InvocationContext invocationContext, VisitableCommand visitableCommand, Cache cache) {
                final Address origin = invocationContext.getOrigin();
                log.tracef("After: command=%s. origin=%s", visitableCommand, origin);
                if (origin.equals(address((Cache<?, ?>) WriteSkewDuringStateTransferTest.this.cache(0)))) {
                    joiner.topologyManager.stopBlocking(BlockingLocalTopologyManager.LatchType.CONSISTENT_HASH_UPDATE);
                }
            }
        });

        joiner.topologyManager.startBlocking(BlockingLocalTopologyManager.LatchType.CONSISTENT_HASH_UPDATE);
        newNode.joinerFuture = fork(() -> {
            waitForClusterToForm();
            return null;
        });
        return newNode;
    }

    /**
     * Builds the cache configuration shared by every node: REPL_SYNC, in-memory state transfer
     * enabled, one segment, three owners, the test's {@link ConsistentHashFactoryImpl} and
     * REPEATABLE_READ isolation.
     */
    private ConfigurationBuilder configuration() {
        // NOTE(review): the boolean argument presumably enables transactions - confirm in the base class.
        final ConfigurationBuilder builder = getDefaultClusteredCacheConfig(CacheMode.REPL_SYNC, true);
        builder.clustering()
                .stateTransfer().fetchInMemoryState(true)
                .hash().numSegments(1).numOwners(3)
                .consistentHashFactory(new ConsistentHashFactoryImpl());
        builder.locking().isolationLevel(IsolationLevel.REPEATABLE_READ);
        return builder;
    }

    /**
     * Asserts, through {@code DistributionTestHelper.hasOwners}, that {@code obj} is owned by
     * {@code cache} and {@code cacheArr}.
     */
    private void assertKeyOwnership(Object obj, Cache cache, Cache... cacheArr) {
        final String failureMessage = "Wrong ownership for " + obj + ".";
        final boolean ownersMatch = DistributionTestHelper.hasOwners(obj, cache, cacheArr);
        AssertJUnit.assertTrue(failureMessage, ownersMatch);
    }

    /**
     * Installs a {@link BlockingLocalTopologyManager} on the given container and records it in
     * {@code topologyManagerList} so that {@code unblockAll()} can release it after the test.
     */
    private BlockingLocalTopologyManager replaceTopologyManager(CacheContainer cacheContainer) {
        final BlockingLocalTopologyManager blockingManager =
                BlockingLocalTopologyManager.replaceTopologyManager(cacheContainer);
        topologyManagerList.add(blockingManager);
        return blockingManager;
    }

    /**
     * Installs the test hooks on an already running node: a {@link ControlledCommandInterceptor}
     * at the head of the interceptor chain and a blocking topology manager.
     * <p>
     * The topology manager is installed through the tracking helper {@code replaceTopologyManager}
     * rather than directly, so it is recorded in {@code topologyManagerList} and released by the
     * {@code @AfterMethod} cleanup even when the test fails before its own {@code stopBlockingAll()}
     * calls. For that reason this is an instance method.
     *
     * @param cache the cache of the node to instrument
     * @return the controller holding the installed interceptor and topology manager
     */
    private NodeController setNodeControllerIn(Cache<Object, Object> cache) {
        NodeController nodeController = new NodeController();
        nodeController.interceptor = new ControlledCommandInterceptor(cache);
        nodeController.topologyManager = replaceTopologyManager(cache.getCacheManager());
        return nodeController;
    }

    /**
     * Prepares node A: locally originated prepare commands are held until the node reaches topology
     * {@code i + 1} (waiting at most 10 seconds), and the node's topology manager starts blocking
     * consistent-hash updates.
     *
     * @param nodeController the controller of node A
     * @param i              the topology id observed before the join started
     */
    private static void setInitialPhaseForNodeA(NodeController nodeController, final int i) {
        final int awaitedTopologyId = i + 1;
        final Action holdLocalPrepare = new Action() {
            @Override
            public boolean isApplicable(InvocationContext invocationContext, VisitableCommand visitableCommand) {
                if (!invocationContext.isOriginLocal()) {
                    return false;
                }
                return visitableCommand instanceof PrepareCommand;
            }

            @Override
            public void before(InvocationContext invocationContext, VisitableCommand visitableCommand, Cache cache) {
                try {
                    cache.getAdvancedCache().getComponentRegistry().getStateTransferLock()
                            .waitForTopology(awaitedTopologyId, 10L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and let the command proceed.
                    Thread.currentThread().interrupt();
                } catch (TimeoutException e2) {
                    throw LogFactory.getLog(getClass()).failedWaitingForTopology(awaitedTopologyId);
                }
            }

            @Override
            public void after(InvocationContext invocationContext, VisitableCommand visitableCommand, Cache cache) {
                // Nothing to do once the command has been processed.
            }
        };
        nodeController.interceptor.addAction(holdLocalPrepare);
        nodeController.topologyManager.startBlocking(BlockingLocalTopologyManager.LatchType.CONSISTENT_HASH_UPDATE);
    }

    /**
     * Prepares node B: remotely originated prepare commands are held until the node reaches topology
     * {@code i + 2} (waiting at most 10 seconds). Unlike node A, no topology latch is armed here.
     *
     * @param nodeController the controller of node B
     * @param i              the topology id observed before the join started
     */
    private static void setInitialPhaseForNodeB(NodeController nodeController, final int i) {
        final int awaitedTopologyId = i + 2;
        final Action holdRemotePrepare = new Action() {
            @Override
            public boolean isApplicable(InvocationContext invocationContext, VisitableCommand visitableCommand) {
                if (invocationContext.isOriginLocal()) {
                    return false;
                }
                return visitableCommand instanceof PrepareCommand;
            }

            @Override
            public void before(InvocationContext invocationContext, VisitableCommand visitableCommand, Cache cache) {
                try {
                    cache.getAdvancedCache().getComponentRegistry().getStateTransferLock()
                            .waitForTopology(awaitedTopologyId, 10L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and let the command proceed.
                    Thread.currentThread().interrupt();
                } catch (TimeoutException e2) {
                    throw LogFactory.getLog(getClass()).failedWaitingForTopology(awaitedTopologyId);
                }
            }

            @Override
            public void after(InvocationContext invocationContext, VisitableCommand visitableCommand, Cache cache) {
                // Nothing to do once the command has been processed.
            }
        };
        nodeController.interceptor.addAction(holdRemotePrepare);
    }
}
