package org.infinispan.statetransfer;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.transaction.TransactionManager;
import org.infinispan.Cache;
import org.infinispan.config.Configuration;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachemanagerlistener.annotation.Merged;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.MergeEvent;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.profiling.DeadlockDetectionPerformanceTest;
import org.infinispan.test.AbstractCacheTest;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TransportFlags;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "statetransfer.StateTransferFunctionalTest", enabled = true)
/* loaded from: input_file:modeshape-unit-test/lib/infinispan-core-5.1.2.FINAL-tests.jar:org/infinispan/statetransfer/StateTransferFunctionalTest.class */
public class StateTransferFunctionalTest extends MultipleCacheManagersTest {
    /** Cache key under which {@link #JOE} is stored by {@code writeInitialData}. */
    public static final String A_B_NAME = "a_b_name";
    /** Cache key under which {@link #BOB} is stored by {@code writeInitialData}. */
    public static final String A_C_NAME = "a_c_name";
    /**
     * Name key of the third ("d") entry; not referenced in this class.
     * Previously held {@code "a_d_age"}, colliding with {@link #A_D_AGE}.
     */
    public static final String A_D_NAME = "a_d_name";
    /** Cache key under which {@link #TWENTY} is stored by {@code writeInitialData}. */
    public static final String A_B_AGE = "a_b_age";
    /** Cache key under which {@link #FORTY} is stored by {@code writeInitialData}. */
    public static final String A_C_AGE = "a_c_age";
    /** Age key of the third ("d") entry; not referenced in this class. */
    public static final String A_D_AGE = "a_d_age";
    /** Value stored under {@link #A_B_NAME}. */
    public static final String JOE = "JOE";
    /** Value stored under {@link #A_C_NAME}. */
    public static final String BOB = "BOB";
    /** Name value that is not referenced in this class. */
    public static final String JANE = "JANE";
    /** Value stored under {@link #A_B_AGE}; assigned 20 in the static initializer. */
    public static final Integer TWENTY;
    /** Value stored under {@link #A_C_AGE}; assigned 40 in the static initializer. */
    public static final Integer FORTY;
    /** Cache configuration built in {@code createCacheManagers} and cloned for every cache definition. */
    Configuration config;
    /** Name of the cache that each cache manager created by this test defines. */
    protected final String cacheName;
    /** Incremented at the start of every test method and reported in the lifecycle log lines. */
    private volatile int testCount;
    /** Logger for this test class; assigned in the static initializer. */
    private static final Log log;
    /**
     * True when assertions are disabled for this class (see the static initializer).
     * It guards the explicit {@code AssertionError} checks in this file.
     */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:modeshape-unit-test/lib/infinispan-core-5.1.2.FINAL-tests.jar:org/infinispan/statetransfer/StateTransferFunctionalTest$DelayTransfer.class */
    /**
     * Serializable value whose serialization is deliberately slow: the first time an
     * instance is written nothing special happens, and every later write sleeps for two
     * seconds. The tests put one of these under the key {@code "delay"} — presumably to
     * stretch the time a state transfer takes (confirm against the test intent).
     */
    public static class DelayTransfer implements Serializable {
        private static final long serialVersionUID = 6361429803359702822L;

        /** Number of times this instance has been serialized; transient, so never written out. */
        private transient int count;

        /** Reads the default serialized form; {@link #count} keeps its default value of zero. */
        private void readObject(ObjectInputStream objectInputStream) throws IOException, ClassNotFoundException {
            objectInputStream.defaultReadObject();
        }

        /**
         * Writes the default serialized form, then sleeps for two seconds on every call
         * except the first one made on this instance.
         *
         * @param objectOutputStream stream the object is being written to
         * @throws IOException if writing the default form fails
         */
        private void writeObject(ObjectOutputStream objectOutputStream) throws IOException {
            objectOutputStream.defaultWriteObject();
            // The very first serialization is not delayed.
            if (this.count++ == 0) {
                return;
            }
            try {
                Thread.sleep(2000L);
            } catch (InterruptedException e) {
                // Cut the delay short, but keep the interrupt visible to the calling thread
                // instead of silently discarding it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:modeshape-unit-test/lib/infinispan-core-5.1.2.FINAL-tests.jar:org/infinispan/statetransfer/StateTransferFunctionalTest$JoiningNode.class */
    /**
     * A node joining the cluster under test. Construction creates a new cache manager via
     * the enclosing test and registers a {@link MergeOrViewChangeListener} on it, so the
     * test can wait for a merge or view-change event and tell the two apart.
     */
    public class JoiningNode {
        private final EmbeddedCacheManager cm;
        private final CountDownLatch latch;
        private final MergeOrViewChangeListener listener;

        /** Creates the cache manager first, then wires a one-shot latch to a listener on it. */
        private JoiningNode() {
            EmbeddedCacheManager manager = createCacheManager();
            CountDownLatch viewEvent = new CountDownLatch(1);
            MergeOrViewChangeListener viewListener = new MergeOrViewChangeListener(viewEvent);
            cm = manager;
            latch = viewEvent;
            listener = viewListener;
            manager.addListener(viewListener);
        }

        /** Returns the cache with the given name from this node's cache manager. */
        Cache getCache(String name) {
            return cm.getCache(name);
        }

        /**
         * Blocks until the given caches have received their views, then waits up to
         * {@code timeoutMillis} for the listener's latch. The outcome of the latch wait is
         * not checked.
         *
         * @param timeoutMillis timeout, in milliseconds, applied to both waits
         * @param caches caches whose views are awaited
         * @throws InterruptedException if the latch wait is interrupted
         */
        void waitForJoin(long timeoutMillis, Cache... caches) throws InterruptedException {
            TestingUtil.blockUntilViewsReceived(timeoutMillis, caches);
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }

        /** State is treated as transferred unless the listener recorded a merge event. */
        private boolean isStateTransferred() {
            boolean mergeSeen = listener.merged;
            return !mergeSeen;
        }

        /** Verifies the initial data on {@code cache}, but only when no merge event was recorded. */
        void verifyStateTransfer(Cache cache) {
            if (!isStateTransferred()) {
                return;
            }
            verifyInitialData(cache);
        }
    }

    /**
     * Cache-manager listener that records whether a merge and/or a view change was
     * observed, and counts down the supplied latch on either event.
     */
    @Listener
    /* loaded from: input_file:modeshape-unit-test/lib/infinispan-core-5.1.2.FINAL-tests.jar:org/infinispan/statetransfer/StateTransferFunctionalTest$MergeOrViewChangeListener.class */
    public static class MergeOrViewChangeListener {
        /** Set to true once a merge event has been delivered. */
        public boolean merged;
        /** Set to true once a view-change event has been delivered. */
        public boolean viewChanged;
        private final CountDownLatch latch;

        /**
         * @param countDownLatch latch counted down on every merge or view-change event
         */
        public MergeOrViewChangeListener(CountDownLatch countDownLatch) {
            latch = countDownLatch;
        }

        /** Logs the merge event, records it, then releases the latch. */
        @Merged
        public void mergedView(MergeEvent event) {
            log.infof("View merged received %s", event);
            merged = true;
            latch.countDown();
        }

        /** Logs the view-change event, records it, then releases the latch. */
        @ViewChanged
        public void viewChanged(ViewChangedEvent event) {
            log.infof("View change received %s", event);
            this.viewChanged = true;
            latch.countDown();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:modeshape-unit-test/lib/infinispan-core-5.1.2.FINAL-tests.jar:org/infinispan/statetransfer/StateTransferFunctionalTest$WritingThread.class */
    public static class WritingThread extends Thread {
        private final Cache<Object, Object> cache;
        private final boolean tx;
        private volatile boolean stop;
        private volatile int result;
        private TransactionManager tm;

        WritingThread(Cache<Object, Object> cache, boolean z) {
            super("WriterThread,StateTransferFunctionalTest");
            this.cache = cache;
            this.tx = z;
            if (z) {
                this.tm = TestingUtil.getTransactionManager(cache);
            }
            setDaemon(true);
        }

        public int result() {
            return this.result;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            int i = 0;
            while (!this.stop) {
                try {
                    if (this.tx) {
                        this.tm.begin();
                    }
                    String str = "test" + i;
                    int i2 = i;
                    i++;
                    this.cache.put(str, Integer.valueOf(i2));
                    if (this.tx) {
                        this.tm.commit();
                    }
                } catch (Exception e) {
                    i--;
                    stopThread();
                }
            }
            this.result = i;
        }

        public void stopThread() {
            this.stop = true;
        }
    }

    /** Creates the test with the cache name {@code "nbst"}. */
    public StateTransferFunctionalTest() {
        this("nbst");
    }

    /**
     * Creates the test for the given cache name, resets the test counter, and selects
     * cleanup after every test method.
     *
     * @param cacheName name of the cache each cache manager will define
     */
    public StateTransferFunctionalTest(String cacheName) {
        this.testCount = 0;
        this.cacheName = cacheName;
        cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
    }

    /**
     * Builds the shared cache configuration only; no cache manager is created here.
     * The configuration is the default clustered REPL_SYNC one with a 30 second sync
     * replication timeout, in-memory state fetching enabled, and lock striping disabled.
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        Configuration clustered = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC, true);
        config = clustered;
        clustered.setSyncReplTimeout(30000L);
        clustered.setFetchInMemoryState(true);
        clustered.setUseLockStriping(false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Adds a new cluster-enabled cache manager (merge flag on) and defines
     * {@link #cacheName} on it with a private copy of the shared configuration.
     *
     * @return the newly added cache manager
     */
    public EmbeddedCacheManager createCacheManager() {
        EmbeddedCacheManager addClusterEnabledCacheManager = addClusterEnabledCacheManager(new TransportFlags().withMerge(true));
        // "mo684clone" was a decompiler-generated alias; the library method is clone().
        addClusterEnabledCacheManager.defineConfiguration(this.cacheName, this.config.clone());
        return addClusterEnabledCacheManager;
    }

    /**
     * Writes the initial data on a first node, lets a second node join, and checks that
     * the joiner sees the initial data (skipped when the joiner recorded a merge).
     */
    public void testInitialStateTransfer(Method method) throws Exception {
        testCount++;
        logTestStart(method);

        final Cache<Object, Object> seedCache = createCacheManager().getCache(cacheName);
        writeInitialData(seedCache);

        // NOTE(review): the timeout constant comes from an unrelated profiling test; it
        // looks like a decompiler substitution for a numeric literal — confirm.
        final JoiningNode joiner = new JoiningNode();
        final Cache joinerCache = joiner.getCache(cacheName);
        joiner.waitForJoin(DeadlockDetectionPerformanceTest.BENCHMARK_DURATION, seedCache, joinerCache);
        joiner.verifyStateTransfer(joinerCache);

        logTestEnd(method);
    }

    /**
     * Same scenario as the initial state transfer test, followed by defining and starting
     * an additional cache ({@code "otherCache"}) on the first cache manager only.
     */
    public void testInitialStateTransferCacheNotPresent(Method method) throws Exception {
        this.testCount++;
        logTestStart(method);
        EmbeddedCacheManager createCacheManager = createCacheManager();
        Cache<Object, Object> cache = createCacheManager.getCache(this.cacheName);
        writeInitialData(cache);
        JoiningNode joiningNode = new JoiningNode();
        Cache cache2 = joiningNode.getCache(this.cacheName);
        joiningNode.waitForJoin(DeadlockDetectionPerformanceTest.BENCHMARK_DURATION, cache, cache2);
        joiningNode.verifyStateTransfer(cache2);
        // "mo684clone" was a decompiler-generated alias; the library method is clone().
        createCacheManager.defineConfiguration("otherCache", this.config.clone());
        // Start a cache on the first manager that the joiner has not defined.
        createCacheManager.getCache("otherCache");
        logTestEnd(method);
    }

    /**
     * Joins a second node, then starts the caches of a third and a fourth node from two
     * separate threads at the same time, and verifies the initial data on every joiner.
     */
    public void testConcurrentStateTransfer(Method method) throws Exception {
        testCount++;
        logTestStart(method);

        final Cache<Object, Object> seedCache = createCacheManager().getCache(cacheName);
        writeInitialData(seedCache);

        final JoiningNode secondNode = new JoiningNode();
        final Cache secondCache = secondNode.getCache(cacheName);
        seedCache.put("delay", new DelayTransfer());
        secondNode.waitForJoin(DeadlockDetectionPerformanceTest.BENCHMARK_DURATION, seedCache, secondCache);
        secondNode.verifyStateTransfer(secondCache);

        final JoiningNode thirdNode = new JoiningNode();
        final JoiningNode fourthNode = new JoiningNode();

        // Each starter thread only triggers the cache start on its node.
        Thread thirdStarter = new Thread(new Runnable() {
            @Override
            public void run() {
                thirdNode.getCache(StateTransferFunctionalTest.this.cacheName);
            }
        }, "CacheStarter-Cache3");
        thirdStarter.start();

        Thread fourthStarter = new Thread(new Runnable() {
            @Override
            public void run() {
                fourthNode.getCache(StateTransferFunctionalTest.this.cacheName);
            }
        }, "CacheStarter-Cache4");
        fourthStarter.start();

        thirdStarter.join();
        fourthStarter.join();

        final Cache thirdCache = thirdNode.getCache(cacheName);
        final Cache fourthCache = fourthNode.getCache(cacheName);
        thirdNode.waitForJoin(120000L, seedCache, secondCache, thirdCache, fourthCache);
        fourthNode.waitForJoin(120000L, seedCache, secondCache, thirdCache, fourthCache);
        thirdNode.verifyStateTransfer(thirdCache);
        fourthNode.verifyStateTransfer(fourthCache);

        logTestEnd(method);
    }

    /** Runs the third-writing-cache scenario with a non-transactional writer. */
    public void testSTWithThirdWritingNonTxCache(Method method) throws Exception {
        testCount++;
        logTestStart(method);

        final boolean transactional = false;
        thirdWritingCacheTest(transactional);

        logTestEnd(method);
    }

    /** Runs the third-writing-cache scenario with a transactional writer. */
    public void testSTWithThirdWritingTxCache(Method method) throws Exception {
        testCount++;
        logTestStart(method);

        final boolean transactional = true;
        thirdWritingCacheTest(transactional);

        logTestEnd(method);
    }

    /** Runs the writing-thread scenario with a non-transactional writer. */
    @Test(timeOut = 120000)
    public void testSTWithWritingNonTxThread(Method method) throws Exception {
        testCount++;
        logTestStart(method);

        final boolean transactional = false;
        writingThreadTest(transactional);

        logTestEnd(method);
    }

    /** Runs the writing-thread scenario with a transactional writer. */
    @Test(timeOut = 120000)
    public void testSTWithWritingTxThread(Method method) throws Exception {
        testCount++;
        logTestStart(method);

        final boolean transactional = true;
        writingThreadTest(transactional);

        logTestEnd(method);
    }

    /**
     * Disabled scenario: after a normal join and verification, the joiner's cache is
     * stopped and started again, and the initial data is verified once more.
     */
    @Test(enabled = false, description = "The new state transfer doesn't work with cache or cache manager restarts (yet)")
    public void testInitialStateTransferAfterRestart(Method method) throws Exception {
        testCount++;
        logTestStart(method);

        final Cache seedCache = createCacheManager().getCache(cacheName);
        writeInitialData(seedCache);

        final JoiningNode joiner = new JoiningNode();
        final Cache joinerCache = joiner.getCache(cacheName);
        joiner.waitForJoin(DeadlockDetectionPerformanceTest.BENCHMARK_DURATION, seedCache, joinerCache);
        joiner.verifyStateTransfer(joinerCache);

        // Restart the joiner's cache and check the data unconditionally.
        joinerCache.stop();
        joinerCache.start();
        verifyInitialData(joinerCache);

        logTestEnd(method);
    }

    /** Logs the "start" lifecycle line for the given test method. */
    private void logTestStart(Method method) {
        logTestLifecycle(method, "start");
    }

    /** Logs the "end" lifecycle line for the given test method. */
    private void logTestEnd(Method method) {
        logTestLifecycle(method, "end");
    }

    /**
     * Logs one line of the form {@code <method name> <phase> - <test count>} at info level.
     *
     * @param method test method being reported
     * @param phase lifecycle phase label
     */
    private void logTestLifecycle(Method method, String phase) {
        String methodName = method.getName();
        Integer currentCount = Integer.valueOf(testCount);
        log.infof("%s %s - %s", methodName, phase, currentCount);
    }

    /**
     * Scenario with three nodes: two initial nodes form the cluster, a writer thread keeps
     * writing through the second node's cache, and a third node joins meanwhile. After the
     * join, the third node must hold the initial data (unless it recorded a merge) and
     * every entry the writer reported as written.
     *
     * @param z whether the writer thread wraps each write in a transaction
     * @throws InterruptedException if waiting for the join or for the writer is interrupted
     */
    private void thirdWritingCacheTest(boolean z) throws InterruptedException {
        Cache<Object, Object> cache = createCacheManager().getCache(this.cacheName);
        Cache cache2 = createCacheManager().getCache(this.cacheName);
        TestingUtil.blockUntilViewsReceived(DeadlockDetectionPerformanceTest.BENCHMARK_DURATION, cache, cache2);
        writeInitialData(cache);
        cache.put("delay", new DelayTransfer());
        WritingThread writingThread = new WritingThread(cache2, z);
        writingThread.start();
        JoiningNode joiningNode;
        Cache cache3;
        try {
            joiningNode = new JoiningNode();
            cache3 = joiningNode.getCache(this.cacheName);
            joiningNode.waitForJoin(DeadlockDetectionPerformanceTest.BENCHMARK_DURATION, cache, cache3, cache2);
        } finally {
            // Always stop the writer, even when joining fails, so it does not keep
            // writing in the background after this test method has ended.
            writingThread.stopThread();
            writingThread.join();
        }
        joiningNode.verifyStateTransfer(cache3);
        int result = writingThread.result();
        for (int i = 0; i < result; i++) {
            Object obj = cache3.get("test" + i);
            if (!$assertionsDisabled && !Integer.valueOf(i).equals(obj)) {
                // Report the value that was actually compared rather than reading the cache again.
                throw new AssertionError("Entry under key [test" + i + "] was [" + obj + "] but expected [" + i + "]");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Checks that {@code cache} holds the four entries written by
     * {@link #writeInitialData(Cache)}. The checks only run when assertions are enabled
     * for this class.
     *
     * @param cache cache to inspect
     * @throws AssertionError if any of the four keys maps to an unexpected value
     */
    public void verifyInitialData(Cache<Object, Object> cache) {
        // Pass the address as a format argument instead of concatenating it into the
        // format string, so a '%' in the address text cannot corrupt the message.
        log.debugf("Checking values on cache %s", cache.getAdvancedCache().getRpcManager().getAddress());
        if (!$assertionsDisabled && !JOE.equals(cache.get(A_B_NAME))) {
            throw new AssertionError("Incorrect value for key " + A_B_NAME);
        }
        if (!$assertionsDisabled && !TWENTY.equals(cache.get(A_B_AGE))) {
            throw new AssertionError("Incorrect value for key " + A_B_AGE);
        }
        if (!$assertionsDisabled && !BOB.equals(cache.get(A_C_NAME))) {
            throw new AssertionError("Incorrect value for key " + A_C_NAME);
        }
        if (!$assertionsDisabled && !FORTY.equals(cache.get(A_C_AGE))) {
            throw new AssertionError("Incorrect value for key " + A_C_AGE);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stores the four initial entries: the names under the "b" and "c" name keys and the
     * ages under the "b" and "c" age keys.
     *
     * @param cache cache to populate
     */
    public void writeInitialData(Cache<Object, Object> cache) {
        cache.put(A_B_NAME, JOE);
        cache.put(A_B_AGE, TWENTY);
        cache.put(A_C_NAME, BOB);
        cache.put(A_C_AGE, FORTY);
    }

    /**
     * Scenario with two nodes: a writer thread keeps writing through the first node's
     * cache while a second node joins. After the join, the first node must still hold the
     * initial data, and the joiner must hold the initial data (unless it recorded a merge)
     * plus every entry the writer reported as written.
     *
     * @param z whether the writer thread wraps each write in a transaction
     * @throws InterruptedException if waiting for the join or for the writer is interrupted
     */
    private void writingThreadTest(boolean z) throws InterruptedException {
        Cache<Object, Object> cache = createCacheManager().getCache(this.cacheName);
        writeInitialData(cache);
        cache.put("delay", new DelayTransfer());
        WritingThread writingThread = new WritingThread(cache, z);
        writingThread.start();
        JoiningNode joiningNode;
        Cache cache2;
        try {
            verifyInitialData(cache);
            joiningNode = new JoiningNode();
            cache2 = joiningNode.getCache(this.cacheName);
            joiningNode.waitForJoin(DeadlockDetectionPerformanceTest.BENCHMARK_DURATION, cache, cache2);
        } finally {
            // Always stop the writer, even when verification or joining fails, so it does
            // not keep writing in the background after this test method has ended.
            writingThread.stopThread();
            writingThread.join();
        }
        verifyInitialData(cache);
        joiningNode.verifyStateTransfer(cache2);
        int result = writingThread.result();
        for (int i = 0; i < result; i++) {
            if (!$assertionsDisabled) {
                // Read once, so the reported value is the one that was compared.
                Object actual = cache2.get("test" + i);
                if (!Integer.valueOf(i).equals(actual)) {
                    throw new AssertionError("Entry under key [test" + i + "] was [" + actual + "] but expected [" + i + "]");
                }
            }
        }
    }

    static {
        $assertionsDisabled = !StateTransferFunctionalTest.class.desiredAssertionStatus();
        TWENTY = 20;
        FORTY = 40;
        log = LogFactory.getLog(StateTransferFunctionalTest.class);
    }
}
