package org.infinispan.scattered.statetransfer;

import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.configuration.cache.BiasAcquisition;
import org.infinispan.distribution.MagicKey;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.statetransfer.ConcurrentStartTest;
import org.infinispan.statetransfer.DelegatingStateTransferLock;
import org.infinispan.statetransfer.StateResponseCommand;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.test.TestingUtil;
import org.infinispan.topology.LocalTopologyManager;
import org.infinispan.util.AbstractControlledRpcManager;
import org.infinispan.util.BlockingLocalTopologyManager;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "scattered.statetransfer.PushTransferTest")
/* loaded from: input_file:org/infinispan/scattered/statetransfer/PushTransferTest.class */
public class PushTransferTest extends AbstractStateTransferTest {
    @Override // org.infinispan.test.MultipleCacheManagersTest
    public Object[] factory() {
        return new Object[]{new PushTransferTest().biasAcquisition(BiasAcquisition.NEVER), new PushTransferTest().biasAcquisition(BiasAcquisition.ON_WRITE)};
    }

    public void testNodeJoin() throws Exception {
        List<MagicKey> init = init();
        EmbeddedCacheManager addClusterEnabledCacheManager = addClusterEnabledCacheManager(this.defaultConfig, TRANSPORT_FLAGS);
        LocalTopologyManager localTopologyManager = (LocalTopologyManager) TestingUtil.extractGlobalComponent(addClusterEnabledCacheManager, LocalTopologyManager.class);
        int topologyId = ((StateTransferManager) TestingUtil.extractComponent(this.c1, StateTransferManager.class)).getCacheTopology().getTopologyId();
        BlockingLocalTopologyManager blockingLocalTopologyManager = new BlockingLocalTopologyManager(localTopologyManager);
        blockingLocalTopologyManager.startBlocking(BlockingLocalTopologyManager.LatchType.CONSISTENT_HASH_UPDATE);
        blockingLocalTopologyManager.startBlocking(BlockingLocalTopologyManager.LatchType.REBALANCE);
        TestingUtil.replaceComponent((CacheContainer) addClusterEnabledCacheManager, (Class<? extends BlockingLocalTopologyManager>) LocalTopologyManager.class, blockingLocalTopologyManager, true);
        Stream.of((Object[]) new Cache[]{this.c1, this.c2, this.c3}).forEach(cache -> {
            TestingUtil.replaceComponent((Cache<?, ?>) cache, (Class<? extends AbstractControlledRpcManager>) RpcManager.class, new AbstractControlledRpcManager((RpcManager) TestingUtil.extractComponent(cache, RpcManager.class)) { // from class: org.infinispan.scattered.statetransfer.PushTransferTest.1
                /* JADX INFO: Access modifiers changed from: protected */
                @Override // org.infinispan.util.AbstractControlledRpcManager
                public <T> T afterInvokeRemotely(ReplicableCommand replicableCommand, T t, Object obj) {
                    if ((replicableCommand instanceof StateResponseCommand) && ((Map) t).keySet().contains(addClusterEnabledCacheManager.getAddress())) {
                        try {
                            Field declaredField = StateResponseCommand.class.getDeclaredField("pushTransfer");
                            declaredField.setAccessible(true);
                            if (declaredField.getBoolean(replicableCommand)) {
                                this.log.info("Push transfer response received, unblocking");
                                blockingLocalTopologyManager.stopBlocking(BlockingLocalTopologyManager.LatchType.CONSISTENT_HASH_UPDATE);
                                blockingLocalTopologyManager.stopBlocking(BlockingLocalTopologyManager.LatchType.REBALANCE);
                            }
                        } catch (Exception e) {
                            throw new IllegalStateException();
                        }
                    }
                    return t;
                }
            }, true);
        });
        TestingUtil.addCacheStartingHook(addClusterEnabledCacheManager, (str, componentRegistry) -> {
            componentRegistry.registerComponent(new DelegatingStateTransferLock((StateTransferLock) componentRegistry.getComponent(StateTransferLock.class)) { // from class: org.infinispan.scattered.statetransfer.PushTransferTest.2
                @Override // org.infinispan.statetransfer.DelegatingStateTransferLock
                public boolean transactionDataReceived(int i) {
                    if (i == topologyId + 1) {
                        blockingLocalTopologyManager.stopBlocking(BlockingLocalTopologyManager.LatchType.CONSISTENT_HASH_UPDATE);
                        blockingLocalTopologyManager.stopBlocking(BlockingLocalTopologyManager.LatchType.REBALANCE);
                    }
                    return super.transactionDataReceived(i);
                }
            }, StateTransferLock.class);
            componentRegistry.rewire();
        });
        Cache cache2 = addClusterEnabledCacheManager.getCache(ConcurrentStartTest.SCATTERED_CACHE_NAME);
        TestingUtil.blockUntilViewsReceived(30000L, false, this.c1, this.c2, this.c3, cache2);
        TestingUtil.waitForNoRebalance(this.c1, this.c2, this.c3, cache2);
        for (MagicKey magicKey : init) {
            AssertJUnit.assertEquals("Key " + magicKey + " has incorrect number of copies", 2, Stream.of((Object[]) new Cache[]{this.c1, this.c2, this.c3, cache2}).mapToInt(cache3 -> {
                return cache3.getAdvancedCache().getDataContainer().containsKey(magicKey) ? 1 : 0;
            }).sum());
        }
    }
}
