package org.infinispan.statetransfer;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.transaction.TransactionManager;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.rehash.RehashStressTest;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.partionhandling.impl.AvailabilityMode;
import org.infinispan.profiling.DeadlockDetectionPerformanceTest;
import org.infinispan.remoting.ReplicationQueue;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.topology.CacheJoinInfo;
import org.infinispan.topology.CacheStatusResponse;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.ClusterTopologyManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@CleanupAfterMethod
@Test(groups = {"functional"}, testName = "statetransfer.StateTransferReplicationQueueTest")
/* loaded from: input_file:org/infinispan/statetransfer/StateTransferReplicationQueueTest.class */
public class StateTransferReplicationQueueTest extends MultipleCacheManagersTest {
    // Keys and values of the seed data written by writeInitialData() and checked by
    // verifyInitialData().
    public static final String A_B_NAME = "a_b_name";
    public static final String A_C_NAME = "a_c_name";
    public static final String A_B_AGE = "a_b_age";
    public static final String A_C_AGE = "a_c_age";
    public static final String JOE = "JOE";
    public static final String BOB = "BOB";
    // Blank finals: both are assigned in the static initializer at the bottom of the class.
    public static final Integer TWENTY;
    public static final Integer FORTY;
    // Names of the two cache configurations registered by createCacheManager().
    private final String TX_CACHE = "txCache";
    private final String NONTX_CACHE = "nontxCache";
    // Set in the static initializer to the negation of this class's desired assertion status.
    // NOTE(review): the name and the "synthetic" marker suggest this is the compiler-generated
    // assertion flag surfaced by the decompiler rather than hand-written code - confirm.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/statetransfer/StateTransferReplicationQueueTest$DelayingClusterTopologyManager.class */
    /**
     * {@link ClusterTopologyManager} wrapper that forwards every call to the manager
     * extracted from the supplied cache manager, adding a fixed pause in two places:
     * after a join has been handled (before its response is returned) and before a
     * rebalance start is broadcast. All other operations are forwarded untouched.
     */
    public class DelayingClusterTopologyManager implements ClusterTopologyManager {
        /** Length of the injected pause, in milliseconds. */
        private static final long DELAY_MILLIS = 500L;

        private final EmbeddedCacheManager manager1;
        /** The real topology manager that does the actual work. */
        private final ClusterTopologyManager delegate;

        public DelayingClusterTopologyManager(EmbeddedCacheManager embeddedCacheManager) {
            manager1 = embeddedCacheManager;
            delegate = (ClusterTopologyManager) TestingUtil.extractGlobalComponent(embeddedCacheManager, ClusterTopologyManager.class);
        }

        /**
         * Forwards the join, then sleeps before handing the response back.
         * An interrupt during the pause propagates to the caller.
         */
        public CacheStatusResponse handleJoin(String str, Address address, CacheJoinInfo cacheJoinInfo, int i) throws Exception {
            final CacheStatusResponse response = delegate.handleJoin(str, address, cacheJoinInfo, i);
            StateTransferReplicationQueueTest.this.log.tracef("Delaying join response");
            Thread.sleep(DELAY_MILLIS);
            return response;
        }

        public void handleLeave(String str, Address address, int i) throws Exception {
            delegate.handleLeave(str, address, i);
        }

        public void handleRebalanceCompleted(String str, Address address, int i, Throwable th, int i2) throws Exception {
            delegate.handleRebalanceCompleted(str, address, i, th, i2);
        }

        public void handleClusterView(boolean z, int i) {
            delegate.handleClusterView(z, i);
        }

        /**
         * Sleeps, then forwards the rebalance start. If the pause is interrupted the
         * interrupt flag is restored and the broadcast still goes ahead.
         */
        public void broadcastRebalanceStart(String str, CacheTopology cacheTopology, boolean z, boolean z2) {
            StateTransferReplicationQueueTest.this.log.tracef("Delaying rebalance");
            try {
                Thread.sleep(DELAY_MILLIS);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
            delegate.broadcastRebalanceStart(str, cacheTopology, z, z2);
        }

        public void broadcastTopologyUpdate(String str, CacheTopology cacheTopology, AvailabilityMode availabilityMode, boolean z, boolean z2) {
            delegate.broadcastTopologyUpdate(str, cacheTopology, availabilityMode, z, z2);
        }

        public void broadcastStableTopologyUpdate(String str, CacheTopology cacheTopology, boolean z, boolean z2) {
            delegate.broadcastStableTopologyUpdate(str, cacheTopology, z, z2);
        }

        public boolean isRebalancingEnabled() {
            return delegate.isRebalancingEnabled();
        }

        public void setRebalancingEnabled(boolean z) {
            delegate.setRebalancingEnabled(z);
        }

        public void forceRebalance(String str) {
            delegate.forceRebalance(str);
        }
    }

    /* loaded from: input_file:org/infinispan/statetransfer/StateTransferReplicationQueueTest$PojoValue.class */
    /**
     * Cache value wrapping a single {@code int} with hand-written {@link Externalizable}
     * serialization. The first serialization after class load is deliberately stalled
     * (see {@link #writeExternal(ObjectOutput)}); equality and hashing are based on the
     * wrapped value only.
     */
    public static class PojoValue implements Externalizable {
        /** Shared one-shot flag: flipped to {@code true} by the single call that stalls. */
        static AtomicBoolean holdUp = new AtomicBoolean();
        /** One logger for the class; a per-instance lookup is wasteful for a value created in a tight loop. */
        static final Log log = LogFactory.getLog(PojoValue.class);
        volatile int value;

        /** Public no-arg constructor, as required for {@link Externalizable} deserialization. */
        public PojoValue() {
        }

        public PojoValue(int i) {
            this.value = i;
        }

        /**
         * Writes the wrapped int. The one caller that wins the {@code holdUp} flip first
         * pauses via {@code TestingUtil.sleepThread(1000L)}; every other call writes
         * immediately.
         *
         * @param objectOutput stream to write the value to
         * @throws IOException if writing fails
         */
        @Override // java.io.Externalizable
        public void writeExternal(ObjectOutput objectOutput) throws IOException {
            // compareAndSet performs the check and the flip as one atomic step, so exactly one
            // thread stalls; a separate get() followed by a set lets concurrent writers all
            // pass the check and all sleep.
            if (holdUp.compareAndSet(false, true)) {
                log.debug("In streaming...");
                log.debug("Holding up...");
                TestingUtil.sleepThread(1000L);
            }
            objectOutput.writeInt(this.value);
        }

        /**
         * Restores the wrapped int written by {@link #writeExternal(ObjectOutput)}.
         *
         * @param objectInput stream to read the value from
         * @throws IOException if reading fails
         */
        @Override // java.io.Externalizable
        public void readExternal(ObjectInput objectInput) throws IOException, ClassNotFoundException {
            this.value = objectInput.readInt();
        }

        @Override
        public int hashCode() {
            return this.value + 31;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return obj != null && getClass() == obj.getClass() && this.value == ((PojoValue) obj).value;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/statetransfer/StateTransferReplicationQueueTest$WritingThread.class */
    /**
     * Daemon thread that hammers a cache with put/remove pairs on keys {@code "test" + i}
     * until {@link #stopThread()} is called or an operation fails. When constructed with
     * {@code tx == true} each pair is wrapped in an explicit transaction obtained from the
     * cache's transaction manager. The number of completed iterations is published through
     * {@link #result()} once {@link #run()} has returned.
     */
    public static class WritingThread extends Thread {
        private static final Log log = LogFactory.getLog(WritingThread.class);

        private final Cache<Object, Object> cache;
        private final boolean tx;
        /** Transaction manager of {@link #cache}; {@code null} when {@link #tx} is {@code false}. */
        private final TransactionManager tm;
        private volatile boolean stop;
        private volatile int result;

        /**
         * @param cache cache to write to
         * @param tx    whether each put/remove pair is wrapped in begin()/commit()
         */
        WritingThread(Cache<Object, Object> cache, boolean tx) {
            super("WriterThread");
            this.cache = cache;
            this.tx = tx;
            this.tm = tx ? TestingUtil.getTransactionManager(cache) : null;
            setDaemon(true);
        }

        /**
         * @return the number of completed iterations; stays 0 until {@link #run()} has finished
         */
        public int result() {
            return this.result;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            int i = 0;
            while (!this.stop) {
                try {
                    if (this.tx) {
                        this.tm.begin();
                    }
                    this.cache.put("test" + i, new PojoValue(i));
                    this.cache.remove("test" + i);
                    if (this.tx) {
                        this.tm.commit();
                    }
                    i++;
                    // NOTE(review): the throttle interval is a constant borrowed from an unrelated
                    // test class - this looks like a decompiler substitution for a numeric literal;
                    // confirm the intended value.
                    if (i % RehashStressTest.MAX_INTERVAL_BETWEEN_TASK == 0) {
                        TestingUtil.sleepThread(1L);
                    }
                } catch (Exception e) {
                    // A failure ends the writer early; record why instead of dropping the
                    // exception, otherwise a short run is indistinguishable from a clean stop.
                    log.warn("WriterThread stopping after " + i + " completed iterations", e);
                    stopThread();
                }
            }
            this.result = i;
        }

        /** Asks the loop in {@link #run()} to exit after the current iteration. */
        public void stopThread() {
            this.stop = true;
        }
    }

    /**
     * Creates no cache managers up front. Within this class, cache managers are
     * obtained on demand through createCacheManager().
     */
    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        // Deliberately empty body: nothing is set up here.
    }

    /**
     * Builds a REPL_ASYNC cache configuration with the replication queue switched on
     * (interval 100, at most 3 queued elements).
     *
     * @param transactional forwarded as the second argument of
     *                      getDefaultClusteredCacheConfig(); presumably selects a
     *                      transactional cache - confirm against that helper
     * @return the built configuration
     */
    private Configuration buildConfiguration(boolean transactional) {
        final ConfigurationBuilder builder = getDefaultClusteredCacheConfig(CacheMode.REPL_ASYNC, transactional);
        builder.clustering()
                .async()
                .useReplQueue(true)
                .replQueueInterval(100L)
                .replQueueMaxElements(3);
        final Configuration configuration = builder.build();
        return configuration;
    }

    /**
     * Runs the busy-writer scenario against the "txCache" configuration with the writer
     * thread wrapping every put/remove pair in an explicit transaction.
     *
     * @param method the test method being run; only its name is used, for logging
     */
    public void testStateTransferWithNodeRestartedAndBusyTx(Method method) throws Exception {
        final String testName = method.getName();
        this.log.info(testName + " start");
        doWritingCacheTest(TX_CACHE, true);
        // Leading space so the marker is not glued to the method name, matching " start".
        this.log.info(testName + " end");
    }

    /**
     * Runs the busy-writer scenario against the "txCache" configuration with the writer
     * thread issuing its put/remove pairs without calling begin()/commit() itself.
     *
     * @param method the test method being run; only its name is used, for logging
     */
    public void testStateTransferWithNodeRestartedAndBusyImplicitTx(Method method) throws Exception {
        final String testName = method.getName();
        this.log.info(testName + " start");
        doWritingCacheTest(TX_CACHE, false);
        // Leading space so the marker is not glued to the method name, matching " start".
        this.log.info(testName + " end");
    }

    /**
     * Runs the busy-writer scenario against the "nontxCache" configuration with the writer
     * thread issuing its put/remove pairs without calling begin()/commit() itself.
     *
     * @param method the test method being run; only its name is used, for logging
     */
    public void testStateTransferWithNodeRestartedAndBusyNonTx(Method method) throws Exception {
        final String testName = method.getName();
        this.log.info(testName + " start");
        doWritingCacheTest(NONTX_CACHE, false);
        // Leading space so the marker is not glued to the method name, matching " start".
        this.log.info(testName + " end");
    }

    /**
     * Shared scenario for the three test methods.
     * <ol>
     *   <li>Start a first cache manager, replace its ClusterTopologyManager with a
     *       {@link DelayingClusterTopologyManager}, start a second manager, and seed the
     *       first cache with the initial data.</li>
     *   <li>Start a {@link WritingThread} on the first cache, stop the second manager,
     *       wait for the view and rehash to settle, then bring up a third manager and
     *       wait again.</li>
     *   <li>Stop the writer, check the seed data on the newly joined cache, wait for the
     *       first cache's replication queue to drain, and assert that none of the writer's
     *       keys are present on the new cache.</li>
     * </ol>
     *
     * @param cacheName name of the cache configuration to exercise
     * @param tx        whether the writer thread wraps its operations in explicit transactions
     * @throws InterruptedException if interrupted while waiting for the writer or sleeping
     */
    private void doWritingCacheTest(String cacheName, boolean tx) throws InterruptedException {
        EmbeddedCacheManager manager1 = createCacheManager();
        Cache<Object, Object> cache1 = manager1.getCache(cacheName);
        // The component is registered under the interface type, so the class token must be
        // ClusterTopologyManager.class as-is; casting it to Class<DelayingClusterTopologyManager>
        // is a cast between provably distinct parameterizations and is rejected by the compiler.
        TestingUtil.replaceComponent((CacheContainer) manager1, ClusterTopologyManager.class, new DelayingClusterTopologyManager(manager1), true);
        EmbeddedCacheManager manager2 = createCacheManager();
        manager2.getCache(cacheName);
        writeInitialData(cache1);

        WritingThread writer = new WritingThread(cache1, tx);
        writer.start();
        Cache<Object, Object> joinedCache;
        try {
            manager2.stop();
            // NOTE(review): the timeout is a constant borrowed from an unrelated test class -
            // this looks like a decompiler substitution for a numeric literal; confirm the value.
            TestingUtil.blockUntilViewsReceived(DeadlockDetectionPerformanceTest.BENCHMARK_DURATION, false, cache1);
            TestingUtil.waitForRehashToComplete(cache1);
            joinedCache = createCacheManager().getCache(cacheName);
            TestingUtil.blockUntilViewsReceived(DeadlockDetectionPerformanceTest.BENCHMARK_DURATION, cache1, joinedCache);
            TestingUtil.waitForRehashToComplete(cache1, joinedCache);
        } finally {
            // Always release the writer, even when one of the waits above throws.
            writer.stopThread();
        }
        writer.join(DeadlockDetectionPerformanceTest.BENCHMARK_DURATION);
        // result() is only published when run() returns; if the join timed out it is still 0
        // and the key check below would pass without verifying anything.
        AssertJUnit.assertFalse("Writer thread did not terminate", writer.isAlive());

        verifyInitialData(joinedCache);
        int writes = writer.result();
        final ReplicationQueue replicationQueue = (ReplicationQueue) cache1.getAdvancedCache().getComponentRegistry().getComponent(ReplicationQueue.class);
        eventually(new AbstractInfinispanTest.Condition() {
            @Override // org.infinispan.test.AbstractInfinispanTest.Condition
            public boolean isSatisfied() throws Exception {
                return replicationQueue.getElementsCount() == 0;
            }
        });
        // Extra pause after the queue reports empty, before inspecting the joined cache.
        Thread.sleep(1000L);
        // Every key the writer put was also removed, so none may remain on the joined cache.
        for (int i = 0; i < writes; i++) {
            AssertJUnit.assertNull(joinedCache.get("test" + i));
        }
    }

    /**
     * Adds a new cluster-enabled cache manager and registers the two cache configurations
     * used by the tests on it.
     *
     * @return the newly added cache manager
     */
    protected EmbeddedCacheManager createCacheManager() {
        EmbeddedCacheManager cacheManager = addClusterEnabledCacheManager();
        cacheManager.defineConfiguration(TX_CACHE, buildConfiguration(true));
        // The "nontxCache" configuration must differ from "txCache"; building both with
        // the same flag made the NonTx test a duplicate of the ImplicitTx test.
        // NOTE(review): assumes the flag selects transactional mode, as the cache names suggest.
        cacheManager.defineConfiguration(NONTX_CACHE, buildConfiguration(false));
        return cacheManager;
    }

    /**
     * Seeds the given cache with the four fixed name/age entries.
     *
     * @param cache cache to populate
     */
    protected void writeInitialData(Cache<Object, Object> cache) {
        cache.put(A_B_NAME, JOE);
        cache.put(A_B_AGE, TWENTY);
        cache.put(A_C_NAME, BOB);
        cache.put(A_C_AGE, FORTY);
    }

    /**
     * Checks that the given cache holds the four fixed name/age entries with their
     * expected values.
     *
     * <p>Uses unconditional test assertions, so the check runs whether or not JVM
     * assertions ({@code -ea}) are enabled. Failures are still reported as
     * {@link AssertionError}s.
     *
     * @param cache cache to inspect
     */
    protected void verifyInitialData(Cache<Object, Object> cache) {
        AssertJUnit.assertEquals("Incorrect value for key " + A_B_NAME, JOE, cache.get(A_B_NAME));
        AssertJUnit.assertEquals("Incorrect value for key " + A_B_AGE, TWENTY, cache.get(A_B_AGE));
        AssertJUnit.assertEquals("Incorrect value for key " + A_C_NAME, BOB, cache.get(A_C_NAME));
        AssertJUnit.assertEquals("Incorrect value for key " + A_C_AGE, FORTY, cache.get(A_C_AGE));
    }

    static {
        // Boxed expected ages for the seed data.
        TWENTY = Integer.valueOf(20);
        FORTY = Integer.valueOf(40);
        // True when assertions are NOT enabled for this class.
        $assertionsDisabled = !StateTransferReplicationQueueTest.class.desiredAssertionStatus();
    }
}
