package org.infinispan.scattered.statetransfer;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.infinispan.Cache;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.configuration.cache.BiasAcquisition;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.MagicKey;
import org.infinispan.factories.impl.BasicComponentRegistry;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.inboundhandler.AbstractDelegatingHandler;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.inboundhandler.Reply;
import org.infinispan.statetransfer.ConcurrentStartTest;
import org.infinispan.test.TestDataSCI;
import org.infinispan.test.TestingUtil;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.BlockingLocalTopologyManager;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/**
 * Functional test of a node joining a running scattered-cache cluster.
 *
 * <p>The joiner's topology manager is replaced with a {@link BlockingLocalTopologyManager} and its
 * inbound invocation handler is wrapped, so the test can observe when a
 * {@link StateResponseCommand} reaches the joiner and when a reply is produced for a handled
 * command, relative to the topology phases the test confirms by hand.
 *
 * <p>The test is instantiated once per {@link BiasAcquisition} mode, see {@link #factory()}.
 */
@Test(groups = {"functional"}, testName = "scattered.statetransfer.PushTransferTest")
public class PushTransferTest extends AbstractStateTransferTest {

    /**
     * Creates the test instances to run.
     *
     * @return one instance configured with {@link BiasAcquisition#NEVER} and one configured with
     *         {@link BiasAcquisition#ON_WRITE}
     */
    @Override
    public Object[] factory() {
        return new Object[]{
            new PushTransferTest().biasAcquisition(BiasAcquisition.NEVER),
            new PushTransferTest().biasAcquisition(BiasAcquisition.ON_WRITE)
        };
    }

    /**
     * Starts a fourth cache manager, lets it join the scattered cache while its topology updates
     * are blocked, and checks the ordering of state arrival, state application and topology
     * confirmation. Finally verifies that every key returned by {@code init()} is stored in
     * exactly two data containers across the four caches.
     *
     * @throws Exception if waiting on a latch or on the forked cache start fails or times out
     */
    public void testNodeJoin() throws Exception {
        List<MagicKey> keys = init();

        EmbeddedCacheManager joiner =
            addClusterEnabledCacheManager(TestDataSCI.INSTANCE, (ConfigurationBuilder) null, TRANSPORT_FLAGS);
        joiner.defineConfiguration(ConcurrentStartTest.SCATTERED_CACHE_NAME, this.defaultConfig.build());

        // Topology updates on the joiner now wait for an explicit confirmTopologyUpdate(...) call.
        BlockingLocalTopologyManager blockedTopology =
            BlockingLocalTopologyManager.replaceTopologyManager(joiner, ConcurrentStartTest.SCATTERED_CACHE_NAME);

        CountDownLatch stateReceived = new CountDownLatch(1);
        CountDownLatch stateApplied = new CountDownLatch(1);

        TestingUtil.addCacheStartingHook(joiner, (cacheName, componentRegistry) -> {
            final PerCacheInboundInvocationHandler delegate =
                (PerCacheInboundInvocationHandler) componentRegistry.getComponent(PerCacheInboundInvocationHandler.class);

            AbstractDelegatingHandler trackingHandler = new AbstractDelegatingHandler(delegate) {
                @Override
                public void handle(CacheRpcCommand command, Reply reply, DeliverOrder order) {
                    if (command instanceof StateResponseCommand) {
                        log.tracef("State received on %s", joiner.getAddress());
                        stateReceived.countDown();
                    }
                    // The reply callback is wrapped for every command, not only for
                    // StateResponseCommand: the first reply of any kind releases stateApplied.
                    delegate.handle(command, response -> {
                        log.tracef("State applied on %s", joiner.getAddress());
                        stateApplied.countDown();
                        reply.reply(response);
                    }, order);
                }
            };

            BasicComponentRegistry basicRegistry =
                (BasicComponentRegistry) componentRegistry.getComponent(BasicComponentRegistry.class);
            basicRegistry.replaceComponent(PerCacheInboundInvocationHandler.class.getName(), trackingHandler, false);
            componentRegistry.rewire();
            componentRegistry.cacheComponents();
        });

        // Start the cache on another thread; the test thread keeps control of the topology phases.
        Future<Cache<?, ?>> joinerStart = fork(() -> joiner.getCache(ConcurrentStartTest.SCATTERED_CACHE_NAME));

        // State must arrive, but no reply may be produced before TRANSITORY is confirmed.
        AssertJUnit.assertTrue(stateReceived.await(10L, TimeUnit.SECONDS));
        AssertJUnit.assertFalse(stateApplied.await(100L, TimeUnit.MILLISECONDS));

        blockedTopology.confirmTopologyUpdate(CacheTopology.Phase.TRANSITORY);
        AssertJUnit.assertEquals(0L, stateApplied.getCount());
        blockedTopology.confirmTopologyUpdate(CacheTopology.Phase.NO_REBALANCE);

        Cache<?, ?> joinerCache = joinerStart.get(30L, TimeUnit.SECONDS);
        TestingUtil.blockUntilViewsReceived(30000L, false, this.c1, this.c2, this.c3, joinerCache);
        TestingUtil.waitForNoRebalance(this.c1, this.c2, this.c3, joinerCache);

        for (MagicKey key : keys) {
            // Count the data containers that hold the key.
            int copies = Stream.<Cache<?, ?>>of(this.c1, this.c2, this.c3, joinerCache)
                .mapToInt(cache -> cache.getAdvancedCache().getDataContainer().containsKey(key) ? 1 : 0)
                .sum();
            AssertJUnit.assertEquals("Key " + key + " has incorrect number of copies", 2, copies);
        }
    }
}
