package org.infinispan.statetransfer;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.lang.reflect.Method;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.transaction.TransactionManager;
import org.infinispan.Cache;
import org.infinispan.config.Configuration;
import org.infinispan.config.GlobalConfiguration;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.profiling.DeadlockDetectionPerformanceTest;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.annotations.Test;

/**
 * Functional test that starts and stops clustered cache managers while a background thread keeps
 * writing to the first cache, with the cache configured for asynchronous replication through a
 * replication queue and in-memory state fetching enabled.
 *
 * <p>The scenario: two managers are started, initial data is written, a writer thread begins
 * putting and removing keys, the second manager is stopped, and a third manager is started. The
 * test then checks that the third cache sees the initial data and none of the writer's transient
 * keys.
 */
@Test(groups = {"functional"}, testName = "statetransfer.StateTransferReplicationQueueTest")
public class StateTransferReplicationQueueTest extends MultipleCacheManagersTest {
    public static final String A_B_NAME = "a_b_name";
    public static final String A_C_NAME = "a_c_name";
    public static final String A_B_AGE = "a_b_age";
    public static final String A_C_AGE = "a_c_age";
    public static final String JOE = "JOE";
    public static final String BOB = "BOB";
    public static final Integer TWENTY = 20;
    public static final Integer FORTY = 40;

    /** Name under which every cache manager in this test defines and looks up its cache. */
    private final String cacheName = "nbst-replqueue";

    /** Cache configuration built in {@link #createCacheManagers()} and cloned for each manager. */
    Configuration config;

    /**
     * Externalizable value whose serialization stalls once when it is performed by a thread whose
     * name contains {@code STREAMING_STATE_TRANSFER-sender}, giving the writer thread time to
     * modify the cache in the meantime.
     */
    public static class PojoValue implements Externalizable {
        private static final Log log = LogFactory.getLog(PojoValue.class);

        /** Set to {@code true} by the first matching thread that stalls; never reset here. */
        static final AtomicBoolean holdUp = new AtomicBoolean();

        volatile int value;

        /** No-arg constructor required by {@link Externalizable}. */
        public PojoValue() {
        }

        public PojoValue(int i) {
            this.value = i;
        }

        /**
         * Writes {@link #value} as a single int. The first call made from a thread whose name
         * contains {@code STREAMING_STATE_TRANSFER-sender} sleeps for two seconds beforehand.
         */
        @Override
        public void writeExternal(ObjectOutput objectOutput) throws IOException {
            String threadName = Thread.currentThread().getName();
            // The compareAndSet is the guard: it claims the flag atomically, so at most one
            // thread ever stalls even if several matching threads serialize concurrently.
            if (threadName.contains("STREAMING_STATE_TRANSFER-sender") && holdUp.compareAndSet(false, true)) {
                log.debug("In streaming...");
                log.debug("Holding up...");
                TestingUtil.sleepThread(2000L);
            }
            objectOutput.writeInt(value);
        }

        /** Reads back the single int written by {@link #writeExternal(ObjectOutput)}. */
        @Override
        public void readExternal(ObjectInput objectInput) throws IOException, ClassNotFoundException {
            value = objectInput.readInt();
        }

        @Override
        public int hashCode() {
            return value + 31;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return value == ((PojoValue) obj).value;
        }
    }

    /**
     * Daemon thread that repeatedly puts and then removes the key {@code "test" + n} on a cache,
     * optionally wrapping each put/remove pair in a transaction, until it is told to stop or an
     * operation throws.
     */
    public static class WritingThread extends Thread {
        private static final Log log = LogFactory.getLog(WritingThread.class);

        private final Cache<Object, Object> cache;
        private final boolean tx;
        /** Transaction manager of {@link #cache}; {@code null} when {@link #tx} is false. */
        private final TransactionManager tm;
        private volatile boolean stop;
        private volatile int result;

        WritingThread(Cache<Object, Object> cache, boolean tx) {
            super("WriterThread");
            this.cache = cache;
            this.tx = tx;
            this.tm = tx ? TestingUtil.getTransactionManager(cache) : null;
            setDaemon(true);
        }

        /**
         * Returns the number of put/remove iterations performed. The value is only published when
         * {@link #run()} finishes, so it is 0 while the thread is still running.
         */
        public int result() {
            return result;
        }

        @Override
        public void run() {
            int iterations = 0;
            while (!stop) {
                try {
                    if (tx) {
                        tm.begin();
                    }
                    cache.put("test" + iterations, new PojoValue(iterations));
                    cache.remove("test" + iterations);
                    iterations++;
                    if (tx) {
                        tm.commit();
                    }
                    // Brief pause every 1000 iterations.
                    if (iterations % 1000 == 0) {
                        TestingUtil.sleepThread(1L);
                    }
                } catch (Exception e) {
                    // Any failure ends the writer, but leave a trace of why it stopped.
                    log.warn("Writer thread stopping after failure at iteration " + iterations, e);
                    stopThread();
                }
            }
            result = iterations;
        }

        /** Asks the loop in {@link #run()} to exit after the current iteration. */
        public void stopThread() {
            stop = true;
        }
    }

    /**
     * Only prepares the shared {@link #config}; the cache managers themselves are created on
     * demand by {@link #createCacheManager()} from within the test.
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_ASYNC, true);
        config.setUseReplQueue(true);
        config.setReplQueueInterval(100L, TimeUnit.MILLISECONDS);
        config.setReplQueueMaxElements(100);
        config.setUseAsyncMarshalling(false);
        config.setFetchInMemoryState(true);
        config.setUseLockStriping(false);
    }

    /**
     * Adds a cluster-enabled cache manager, sets {@code maxThreads=25} on its async transport
     * executor properties, and defines the test cache on it from a clone of {@link #config}.
     *
     * @return the newly added cache manager
     */
    protected EmbeddedCacheManager createCacheManager() {
        EmbeddedCacheManager manager = addClusterEnabledCacheManager();
        GlobalConfiguration globalConfiguration = manager.getGlobalConfiguration();
        Properties executorProperties = new Properties();
        executorProperties.setProperty("maxThreads", "25");
        globalConfiguration.setAsyncTransportExecutorProperties(executorProperties);
        manager.defineConfiguration(cacheName, config.clone());
        return manager;
    }

    /** Stores the four name/age entries that {@link #verifyInitialData(Cache)} later checks. */
    protected void writeInitialData(Cache<Object, Object> cache) {
        cache.put(A_B_NAME, JOE);
        cache.put(A_B_AGE, TWENTY);
        cache.put(A_C_NAME, BOB);
        cache.put(A_C_AGE, FORTY);
    }

    /** Asserts that the given cache holds the entries written by {@link #writeInitialData(Cache)}. */
    protected void verifyInitialData(Cache<Object, Object> cache) {
        assert JOE.equals(cache.get(A_B_NAME)) : "Incorrect value for key " + A_B_NAME;
        assert TWENTY.equals(cache.get(A_B_AGE)) : "Incorrect value for key " + A_B_AGE;
        assert BOB.equals(cache.get(A_C_NAME)) : "Incorrect value for key " + A_C_NAME;
        assert FORTY.equals(cache.get(A_C_AGE)) : "Incorrect value for key " + A_C_AGE;
    }

    public void testStateTransferWithNodeRestartedAndBusy(Method method) throws Exception {
        log.info(method.getName() + " start");
        thirdWritingCacheTest(false);
        log.info(method.getName() + " end");
    }

    /**
     * Runs the stop-one-node / start-another scenario while a {@link WritingThread} is busy on
     * the first cache, then verifies the contents of the cache on the newly started node.
     *
     * @param tx whether the writer thread wraps each put/remove pair in a transaction
     */
    private void thirdWritingCacheTest(boolean tx) throws InterruptedException {
        // NOTE(review): the timeout constant below comes from an unrelated profiling test; it is
        // presumably a decompiler substitution for an inlined literal - confirm and replace.
        Cache<Object, Object> firstCache = createCacheManager().getCache(cacheName);
        EmbeddedCacheManager secondManager = createCacheManager();
        secondManager.getCache(cacheName);

        writeInitialData(firstCache);

        WritingThread writer = new WritingThread(firstCache, tx);
        writer.start();

        secondManager.stop();

        // Wait for the first cache's view to reflect the stopped node.
        TestingUtil.blockUntilViewsReceived(DeadlockDetectionPerformanceTest.BENCHMARK_DURATION, false, firstCache);

        Cache<Object, Object> thirdCache = createCacheManager().getCache(cacheName);

        // Wait for the two remaining caches to see each other.
        TestingUtil.blockUntilViewsReceived(DeadlockDetectionPerformanceTest.BENCHMARK_DURATION, firstCache, thirdCache);

        writer.stopThread();
        writer.join(DeadlockDetectionPerformanceTest.BENCHMARK_DURATION);
        // join(timeout) returns silently on timeout; without this check result() would still be 0
        // and the verification loop below would pass without checking anything.
        assert !writer.isAlive() : "Writer thread did not terminate within the join timeout";

        verifyInitialData(thirdCache);

        int iterations = writer.result();

        // Fixed pause before checking; presumably lets queued replication drain - TODO confirm.
        TestingUtil.sleepThread(5000L);

        // Every key was put and then removed, so none should remain on the new node.
        for (int i = 0; i < iterations; i++) {
            Object leftover = thirdCache.get("test" + i);
            assert leftover == null : "Unexpected value left for key test" + i;
        }
    }
}
