package org.infinispan.replication;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.transaction.TransactionManager;
import org.infinispan.Cache;
import org.infinispan.atomic.AtomicHashMapConcurrencyTest;
import org.infinispan.atomic.AtomicMapLookup;
import org.infinispan.config.Configuration;
import org.infinispan.config.GlobalConfiguration;
import org.infinispan.executors.ScheduledExecutorFactory;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.remoting.ReplicationQueue;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "replication.ReplicationQueueTest")
/* loaded from: input_file:org/infinispan/replication/ReplicationQueueTest.class */
public class ReplicationQueueTest extends MultipleCacheManagersTest {
    private static final int REPL_QUEUE_INTERVAL = 5000;
    private static final int REPL_QUEUE_MAX_ELEMENTS = 10;
    long creationTime;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/infinispan/replication/ReplicationQueueTest$ReplQueueTestScheduledExecutorFactory.class */
    public static class ReplQueueTestScheduledExecutorFactory implements ScheduledExecutorFactory {
        static Properties myProps = new Properties();
        static boolean methodCalled = false;
        static Runnable command;
        static long initialDelay;
        static long delay;
        static TimeUnit unit;

        public ScheduledExecutorService getScheduledExecutor(Properties properties) {
            Assert.assertEquals(properties.size(), 5);
            Assert.assertEquals(properties.get("componentName"), "replicationQueue-thread");
            Assert.assertEquals(properties.get("threadPriority"), "" + KnownComponentNames.getDefaultThreadPrio("org.infinispan.executors.replicationQueue"));
            Assert.assertEquals(properties.get("aaa"), "bbb");
            Assert.assertEquals(properties.get("ddd"), "ccc");
            Assert.assertTrue(properties.containsKey("threadNameSuffix"));
            methodCalled = true;
            return new ScheduledThreadPoolExecutor(1) { // from class: org.infinispan.replication.ReplicationQueueTest.ReplQueueTestScheduledExecutorFactory.1
                @Override // java.util.concurrent.ScheduledThreadPoolExecutor, java.util.concurrent.ScheduledExecutorService
                public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable, long j, long j2, TimeUnit timeUnit) {
                    ReplQueueTestScheduledExecutorFactory.command = runnable;
                    ReplQueueTestScheduledExecutorFactory.initialDelay = j;
                    ReplQueueTestScheduledExecutorFactory.delay = j2;
                    ReplQueueTestScheduledExecutorFactory.unit = timeUnit;
                    return null;
                }
            };
        }

        static {
            myProps.put("aaa", "bbb");
            myProps.put("ddd", "ccc");
        }
    }

    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        GlobalConfiguration clusteredDefault = GlobalConfiguration.getClusteredDefault();
        clusteredDefault.setReplicationQueueScheduledExecutorFactoryClass(ReplQueueTestScheduledExecutorFactory.class.getName());
        clusteredDefault.setReplicationQueueScheduledExecutorProperties(ReplQueueTestScheduledExecutorFactory.myProps);
        registerCacheManager(TestCacheManagerFactory.createCacheManager(clusteredDefault), TestCacheManagerFactory.createCacheManager(clusteredDefault));
        Configuration defaultClusteredConfig = getDefaultClusteredConfig(Configuration.CacheMode.REPL_ASYNC, true);
        defaultClusteredConfig.setUseReplQueue(true);
        defaultClusteredConfig.setReplQueueInterval(5000L);
        defaultClusteredConfig.setReplQueueMaxElements(10);
        this.creationTime = System.currentTimeMillis();
        manager(0).defineConfiguration("replQueue", defaultClusteredConfig);
        Configuration clone = defaultClusteredConfig.clone();
        clone.setUseReplQueue(false);
        manager(1).defineConfiguration("replQueue", clone);
    }

    @Test(dependsOnMethods = {"testReplicationBasedOnTime"})
    public void testAppropriateExecutorIsUsed() {
        if (!$assertionsDisabled && !ReplQueueTestScheduledExecutorFactory.methodCalled) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && ReplQueueTestScheduledExecutorFactory.command == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && ReplQueueTestScheduledExecutorFactory.delay != 5000) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && ReplQueueTestScheduledExecutorFactory.initialDelay != 5000) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && ReplQueueTestScheduledExecutorFactory.unit != TimeUnit.MILLISECONDS) {
            throw new AssertionError();
        }
    }

    public void testReplicationBasedOnTime() throws InterruptedException {
        Cache cache = cache(0, "replQueue");
        Cache cache2 = cache(1, "replQueue");
        cache.put(AtomicHashMapConcurrencyTest.KEY, "value");
        ReplicationQueue replicationQueue = (ReplicationQueue) TestingUtil.extractComponent(cache, ReplicationQueue.class);
        if (!$assertionsDisabled && replicationQueue == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && replicationQueue.getElementsCount() != 1) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && cache2.get(AtomicHashMapConcurrencyTest.KEY) != null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && !cache.get(AtomicHashMapConcurrencyTest.KEY).equals("value")) {
            throw new AssertionError();
        }
        ReplQueueTestScheduledExecutorFactory.command.run();
        long currentTimeMillis = System.currentTimeMillis();
        while (System.currentTimeMillis() - currentTimeMillis < 5000 && cache2.get(AtomicHashMapConcurrencyTest.KEY) == null) {
            Thread.sleep(50L);
        }
        if (!$assertionsDisabled && !cache2.get(AtomicHashMapConcurrencyTest.KEY).equals("value")) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && replicationQueue.getElementsCount() != 0) {
            throw new AssertionError();
        }
    }

    public void testReplicationBasedOnTimeWithTx() throws Exception {
        Cache cache = cache(0, "replQueue");
        Cache cache2 = cache(1, "replQueue");
        TransactionManager transactionManager = TestingUtil.getTransactionManager(cache);
        transactionManager.begin();
        cache.put(AtomicHashMapConcurrencyTest.KEY, "value");
        transactionManager.commit();
        ReplicationQueue replicationQueue = (ReplicationQueue) TestingUtil.extractComponent(cache, ReplicationQueue.class);
        if (!$assertionsDisabled && replicationQueue == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && replicationQueue.getElementsCount() != 1) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && cache2.get(AtomicHashMapConcurrencyTest.KEY) != null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && !cache.get(AtomicHashMapConcurrencyTest.KEY).equals("value")) {
            throw new AssertionError();
        }
        ReplQueueTestScheduledExecutorFactory.command.run();
        long currentTimeMillis = System.currentTimeMillis();
        while (System.currentTimeMillis() - currentTimeMillis < 5000 && cache2.get(AtomicHashMapConcurrencyTest.KEY) == null) {
            Thread.sleep(50L);
        }
        if (!$assertionsDisabled && !cache2.get(AtomicHashMapConcurrencyTest.KEY).equals("value")) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && replicationQueue.getElementsCount() != 0) {
            throw new AssertionError();
        }
    }

    public void testReplicationBasedOnSize() throws Exception {
        Cache cache = cache(0, "replQueue");
        Cache cache2 = cache(1, "replQueue");
        for (int i = 0; i < 10; i++) {
            cache.put(AtomicHashMapConcurrencyTest.KEY + i, "value" + i);
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (System.currentTimeMillis() - currentTimeMillis < 3000 && cache2.size() != 10) {
            Thread.sleep(50L);
        }
        for (int i2 = 0; i2 < 10; i2++) {
            if (!$assertionsDisabled && !cache2.get(AtomicHashMapConcurrencyTest.KEY + i2).equals("value" + i2)) {
                throw new AssertionError();
            }
        }
    }

    public void testReplicationBasedOnSizeWithTx() throws Exception {
        Cache cache = cache(0, "replQueue");
        Cache cache2 = cache(1, "replQueue");
        TransactionManager transactionManager = TestingUtil.getTransactionManager(cache);
        for (int i = 0; i < 10; i++) {
            transactionManager.begin();
            cache.put(AtomicHashMapConcurrencyTest.KEY + i, "value" + i);
            transactionManager.commit();
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (System.currentTimeMillis() - currentTimeMillis < 3000 && cache2.size() != 10) {
            Thread.sleep(50L);
        }
        for (int i2 = 0; i2 < 10; i2++) {
            if (!$assertionsDisabled && !cache2.get(AtomicHashMapConcurrencyTest.KEY + i2).equals("value" + i2)) {
                throw new AssertionError();
            }
        }
    }

    public void testReplicationQueueMultipleThreads() throws Exception {
        final Cache cache = cache(0, "replQueue");
        Cache cache2 = cache(1, "replQueue");
        Thread[] threadArr = new Thread[5];
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            final int i2 = i;
            threadArr[i] = new Thread() { // from class: org.infinispan.replication.ReplicationQueueTest.1
                int index;

                {
                    this.index = i2;
                }

                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        countDownLatch.await();
                    } catch (InterruptedException e) {
                    }
                    for (int i3 = 0; i3 < 2; i3++) {
                        cache.put(AtomicHashMapConcurrencyTest.KEY + this.index + "_" + i3, "value");
                    }
                }
            };
            threadArr[i].start();
        }
        countDownLatch.countDown();
        for (Thread thread : threadArr) {
            thread.join();
        }
        long currentTimeMillis = System.currentTimeMillis();
        while (System.currentTimeMillis() - currentTimeMillis < 3000 && cache2.size() != 10) {
            Thread.sleep(50L);
        }
        if (!$assertionsDisabled && cache2.size() != 10) {
            throw new AssertionError();
        }
        ReplicationQueue replicationQueue = (ReplicationQueue) TestingUtil.extractComponent(cache, ReplicationQueue.class);
        if (!$assertionsDisabled && replicationQueue.getElementsCount() != (5 * 2) - 10) {
            throw new AssertionError();
        }
    }

    public void testAtomicHashMap() throws Exception {
        Cache cache = cache(0, "replQueue");
        Cache cache2 = cache(1, "replQueue");
        TransactionManager transactionManager = TestingUtil.getTransactionManager(cache);
        transactionManager.begin();
        AtomicMapLookup.getAtomicMap(cache, "foo").put("sub-key", "sub-value");
        transactionManager.commit();
        ReplQueueTestScheduledExecutorFactory.command.run();
        long currentTimeMillis = System.currentTimeMillis();
        while (System.currentTimeMillis() - currentTimeMillis < 5000 && cache2.get("foo") == null) {
            Thread.sleep(50L);
        }
        if (!$assertionsDisabled && AtomicMapLookup.getAtomicMap(cache2, "foo", false) == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && AtomicMapLookup.getAtomicMap(cache2, "foo").get("sub-key") == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && !AtomicMapLookup.getAtomicMap(cache2, "foo").get("sub-key").equals("sub-value")) {
            throw new AssertionError();
        }
    }

    static {
        $assertionsDisabled = !ReplicationQueueTest.class.desiredAssertionStatus();
    }
}
