package org.infinispan.distribution.rehash;

import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.BlockingInterceptor;
import org.infinispan.interceptors.impl.EntryWrappingInterceptor;
import org.infinispan.lock.StripedLockTest;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.partitionhandling.AvailabilityMode;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.LocalTopologyManager;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.util.BaseControlledConsistentHashFactory;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@CleanupAfterMethod
@Test(groups = {"functional"}, testName = "distribution.rehash.NonTxPrimaryOwnerBecomingNonOwnerTest")
/* loaded from: input_file:org/infinispan/distribution/rehash/NonTxPrimaryOwnerBecomingNonOwnerTest.class */
public class NonTxPrimaryOwnerBecomingNonOwnerTest extends MultipleCacheManagersTest {
    // Name of the cache whose topology this test inspects and whose topology updates it intercepts.
    private static final String CACHE_NAME = "___defaultcache";

    /**
     * Consistent hash factory that hard-codes the owner assignment returned for the test's
     * cache, based only on how many members are in the cluster.
     *
     * <p>The value {@code 1} passed to the superclass constructor is presumably the number of
     * segments (the cache configuration in this test also uses a single segment) — confirm
     * against {@link BaseControlledConsistentHashFactory}.
     */
    public static class CustomConsistentHashFactory extends BaseControlledConsistentHashFactory.Default {
        private CustomConsistentHashFactory() {
            super(1);
        }

        /**
         * Returns the fixed owner assignment for the current membership.
         *
         * <p>The returned numbers are presumably indexes into {@code list}, one inner array per
         * segment — confirm against the superclass contract.
         *
         * @param i    not used by this implementation (presumably the number of segments)
         * @param i2   asserted to be {@code 2} (presumably the configured number of owners)
         * @param list the current cluster members
         * @return {@code {{0}}} for one member, {@code {{0, 1}}} for two members, and
         *         {@code {{lastIndex, 0}}} for three or more members
         */
        @Override
        protected int[][] assignOwners(int i, int i2, List<Address> list) {
            AssertJUnit.assertEquals(2, i2);
            switch (list.size()) {
                case 1:
                    return new int[][]{{0}};
                case 2:
                    return new int[][]{{0, 1}};
                default:
                    // With three or more members the last member is listed first, followed by member 0.
                    return new int[][]{{list.size() - 1, 0}};
            }
        }
    }

    /**
     * Creates the initial cluster: two clustered cache managers built from the same
     * configuration builder, then waits for the cluster to form.
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        final ConfigurationBuilder sharedConfig = getConfigurationBuilder();
        for (int node = 0; node < 2; node++) {
            addClusterEnabledCacheManager(sharedConfig);
        }
        waitForClusterToForm();
    }

    /**
     * Builds the cache configuration used by every node in this test: synchronous distributed
     * mode, a single segment whose owners are chosen by {@link CustomConsistentHashFactory},
     * and no transactions.
     *
     * @return a fresh builder; callers may customise it further before use
     */
    private ConfigurationBuilder getConfigurationBuilder() {
        final ConfigurationBuilder builder = new ConfigurationBuilder();
        builder.clustering()
                .cacheMode(CacheMode.DIST_SYNC)
                .hash()
                .numSegments(1)
                .consistentHashFactory(new CustomConsistentHashFactory());
        builder.transaction().transactionMode(TransactionMode.NON_TRANSACTIONAL);
        return builder;
    }

    /** Runs the topology-change scenario with {@code TestWriteOperation.PUT_CREATE}. */
    public void testPrimaryOwnerChangingDuringPut() throws Exception {
        final TestWriteOperation operation = TestWriteOperation.PUT_CREATE;
        doTest(operation);
    }

    /** Runs the topology-change scenario with {@code TestWriteOperation.PUT_IF_ABSENT}. */
    public void testPrimaryOwnerChangingDuringPutIfAbsent() throws Exception {
        final TestWriteOperation operation = TestWriteOperation.PUT_IF_ABSENT;
        doTest(operation);
    }

    /** Runs the topology-change scenario with {@code TestWriteOperation.REPLACE}. */
    public void testPrimaryOwnerChangingDuringReplace() throws Exception {
        final TestWriteOperation operation = TestWriteOperation.REPLACE;
        doTest(operation);
    }

    /** Runs the topology-change scenario with {@code TestWriteOperation.REPLACE_EXACT}. */
    public void testPrimaryOwnerChangingDuringReplaceExact() throws Exception {
        final TestWriteOperation operation = TestWriteOperation.REPLACE_EXACT;
        doTest(operation);
    }

    /** Runs the topology-change scenario with {@code TestWriteOperation.REMOVE}. */
    public void testPrimaryOwnerChangingDuringRemove() throws Exception {
        final TestWriteOperation operation = TestWriteOperation.REMOVE;
        doTest(operation);
    }

    /** Runs the topology-change scenario with {@code TestWriteOperation.REMOVE_EXACT}. */
    public void testPrimaryOwnerChangingDuringRemoveExact() throws Exception {
        final TestWriteOperation operation = TestWriteOperation.REMOVE_EXACT;
        doTest(operation);
    }

    /**
     * Performs one write on node 0 while a third node joins and the topology changes
     * underneath the in-flight command, then checks that all three nodes end up with the
     * operation's expected value and that the key is left unlocked everywhere.
     *
     * <p>Topology updates are gated through a {@link CheckPoint} (see
     * {@link #addBlockingLocalTopologyManager}) and the write command is gated through a
     * {@link BlockingInterceptor} sharing a two-party {@link CyclicBarrier} with this thread.
     *
     * @param testWriteOperation the write to perform; supplies the optional previous value,
     *                           the command class to intercept and the expected final value
     * @throws Exception if any barrier, checkpoint or future wait fails or times out
     */
    private void doTest(TestWriteOperation testWriteOperation) throws Exception {
        final String key = "testkey";

        // Seed the entry when the operation needs an existing value to act on.
        if (testWriteOperation.getPreviousValue() != null) {
            cache(0, CACHE_NAME).put(key, testWriteOperation.getPreviousValue());
        }

        CheckPoint checkPoint = new CheckPoint();
        LocalTopologyManager localTopologyManager = TestingUtil.extractGlobalComponent(manager(0), LocalTopologyManager.class);
        int preJoinTopologyId = localTopologyManager.getCacheTopology(CACHE_NAME).getTopologyId();

        // Gate topology updates on the two existing nodes.
        AdvancedCache<Object, Object> cache0 = advancedCache(0);
        addBlockingLocalTopologyManager(manager(0), checkPoint, preJoinTopologyId);
        AdvancedCache<Object, Object> cache1 = advancedCache(1);
        addBlockingLocalTopologyManager(manager(1), checkPoint, preJoinTopologyId);

        // Add the joiner without waiting for its initial state transfer, and gate it as well.
        ConfigurationBuilder joinerConfig = getConfigurationBuilder();
        joinerConfig.clustering().stateTransfer().awaitInitialTransfer(false);
        addClusterEnabledCacheManager(joinerConfig);
        addBlockingLocalTopologyManager(manager(2), checkPoint, preJoinTopologyId);

        log.tracef("Starting the cache on the joiner");
        AdvancedCache<Object, Object> cache2 = advancedCache(2);

        // Let the first topology after the join through on every node.
        int joinTopologyId = preJoinTopologyId + 1;
        checkPoint.trigger("allow_topology_" + joinTopologyId + "_on_" + address(0));
        checkPoint.trigger("allow_topology_" + joinTopologyId + "_on_" + address(1));
        checkPoint.trigger("allow_topology_" + joinTopologyId + "_on_" + address(2));

        // Wait until every node sees three members.
        Stream.of(cache0, cache1, cache2).forEach(
                cache -> eventuallyEquals(3, () -> cache.getRpcManager().getMembers().size()));

        CacheTopology duringJoinTopology = localTopologyManager.getCacheTopology(CACHE_NAME);
        AssertJUnit.assertEquals(joinTopologyId, duringJoinTopology.getTopologyId());
        // A pending consistent hash must be present at this point.
        AssertJUnit.assertNotNull(duringJoinTopology.getPendingCH());
        log.tracef("Rebalance started. Found key %s with current owners %s and pending owners %s",
                key,
                duringJoinTopology.getCurrentCH().locateOwners(key),
                duringJoinTopology.getPendingCH().locateOwners(key));

        // Install the command gate on node 0, ahead of EntryWrappingInterceptor.
        // NOTE(review): the two boolean flags are passed through unchanged; their meaning is
        // defined by BlockingInterceptor and is not visible here.
        CyclicBarrier barrier = new CyclicBarrier(2);
        BlockingInterceptor blockingInterceptor = new BlockingInterceptor(barrier, testWriteOperation.getCommandClass(), false, true);
        cache0.getAsyncInterceptorChain().addInterceptorBefore(blockingInterceptor, EntryWrappingInterceptor.class);

        // Run the write on another thread; presumably it parks at the interceptor until released.
        Future<?> writeFuture = fork(() -> testWriteOperation.perform(cache0, key));
        barrier.await(10L, TimeUnit.SECONDS);

        // Let node 0 move to the next topology while the write is still held.
        int postJoinTopologyId = joinTopologyId + 1;
        checkPoint.trigger("allow_topology_" + postJoinTopologyId + "_on_" + address(0));
        eventuallyEquals(postJoinTopologyId,
                () -> cache0.getComponentRegistry().getStateTransferManager().getCacheTopology().getTopologyId());

        // The interceptor lives on node 0's cache, so that is the node being unblocked.
        log.tracef("Unblocking the write command on node %s", address(0));
        barrier.await(10L, TimeUnit.SECONDS);

        // Presumably the command is retried and hits the interceptor a second time: meet it,
        // suspend the interceptor, then release it — TODO confirm against BlockingInterceptor.
        barrier.await(10L, TimeUnit.SECONDS);
        blockingInterceptor.suspend(true);
        barrier.await(10L, TimeUnit.SECONDS);

        // Release every remaining topology update and wait for the rebalance to finish.
        checkPoint.triggerAll();
        TestingUtil.waitForNoRebalance(cache0, cache1, cache2);

        // The write must complete without throwing; its return value is not checked.
        writeFuture.get(10L, TimeUnit.SECONDS);
        log.tracef("Write operation is done");

        AssertJUnit.assertEquals(testWriteOperation.getValue(), cache0.get(key));
        AssertJUnit.assertEquals(testWriteOperation.getValue(), cache1.get(key));
        AssertJUnit.assertEquals(testWriteOperation.getValue(), cache2.get(key));

        AssertJUnit.assertFalse(cache0.getAdvancedCache().getLockManager().isLocked(key));
        AssertJUnit.assertFalse(cache1.getAdvancedCache().getLockManager().isLocked(key));
        AssertJUnit.assertFalse(cache2.getAdvancedCache().getLockManager().isLocked(key));
    }

    /**
     * Replaces the manager's {@link LocalTopologyManager} with a Mockito spy that holds back
     * topology updates for {@link #CACHE_NAME} until the test allows them.
     *
     * <p>For every intercepted update whose topology id differs from {@code i}, the spy
     * triggers {@code pre_topology_<id>_on_<address>} and then waits up to 10 seconds for
     * {@code allow_topology_<id>_on_<address>} before delegating to the real method. Updates
     * carrying topology id {@code i} are delegated immediately.
     *
     * @param embeddedCacheManager the cache manager whose topology manager is wrapped
     * @param checkPoint           the checkpoint used to announce and release updates
     * @param i                    the topology id that must pass through without blocking
     */
    private void addBlockingLocalTopologyManager(EmbeddedCacheManager embeddedCacheManager, CheckPoint checkPoint, int i) throws InterruptedException {
        LocalTopologyManager realManager = TestingUtil.extractGlobalComponent(embeddedCacheManager, LocalTopologyManager.class);
        LocalTopologyManager spiedManager = Mockito.spy(realManager);

        Mockito.doAnswer(invocation -> {
            CacheTopology update = (CacheTopology) invocation.getArguments()[1];
            int updateId = update.getTopologyId();
            if (updateId == i) {
                return invocation.callRealMethod();
            }
            String eventSuffix = updateId + "_on_" + embeddedCacheManager.getAddress();
            checkPoint.trigger("pre_topology_" + eventSuffix);
            checkPoint.await("allow_topology_" + eventSuffix, 10L, TimeUnit.SECONDS);
            return invocation.callRealMethod();
        }).when(spiedManager).handleTopologyUpdate(
                Matchers.eq(CACHE_NAME),
                Matchers.any(CacheTopology.class),
                Matchers.any(AvailabilityMode.class),
                Matchers.anyInt(),
                Matchers.any(Address.class));

        // Register the spy so the component registry hands it out instead of the original.
        TestingUtil.extractGlobalComponentRegistry(embeddedCacheManager).registerComponent(spiedManager, LocalTopologyManager.class);
    }
}
