package org.infinispan.replication;

import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.config.Configuration;
import org.infinispan.config.GlobalConfiguration;
import org.infinispan.remoting.ReplicationQueueImpl;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.annotations.Test;

/**
 * Functional test in which a replication queue is drained from two different threads: a thread
 * whose name starts with {@code "Scheduled-"} and the thread running the test method.
 *
 * <p>The test installs {@link MockReplQueue} as the replication queue implementation and uses
 * three latches to order the two drains relative to each other, then checks that a key that was
 * put and subsequently removed on the first cache is not present on the second cache.
 */
@Test(groups = {"functional"}, testName = "replication.ConcurrentFlushReplQueueTest")
public class ConcurrentFlushReplQueueTest extends MultipleCacheManagersTest {

    /** Upper bound, in seconds, on how long the test waits for the first drain to be observed. */
    private static final long FIRST_DRAIN_TIMEOUT_SECONDS = 10L;

    /**
     * Replication queue that signals and blocks on the static latches below whenever a non-empty
     * batch of commands is drained, so that the test can interleave two drains deterministically.
     */
    public static class MockReplQueue extends ReplicationQueueImpl {
        private static final Log log = LogFactory.getLog(MockReplQueue.class);

        // The latches are assigned by the test thread and read by whichever thread drains the
        // queue; volatile guarantees that the draining thread sees the assigned instances.

        /** Released by the test to let a drain on a "Scheduled-" thread return. */
        static volatile CountDownLatch intervalFlushLatch;
        /** Counted down when a "Scheduled-" thread has drained a non-empty batch. */
        static volatile CountDownLatch secondPutLatch;
        /** Counted down when any other thread has drained a non-empty batch. */
        static volatile CountDownLatch removeCompletedLatch;

        /**
         * Drains the queue via the superclass and, for a non-empty result, coordinates with the
         * test through the latches.
         *
         * <ul>
         *   <li>On a thread whose name starts with {@code "Scheduled-"}: counts down
         *       {@link #secondPutLatch}, then waits up to five seconds on
         *       {@link #intervalFlushLatch} before returning.</li>
         *   <li>On any other thread: counts down {@link #removeCompletedLatch}.</li>
         * </ul>
         *
         * @return the commands returned by the superclass drain, unmodified
         * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
         *         interrupted while waiting; the interrupt flag is restored first
         */
        @Override // org.infinispan.remoting.ReplicationQueueImpl
        public List<ReplicableCommand> drainReplQueue() {
            log.debugf("drainReplQueue called");
            List<ReplicableCommand> drained = super.drainReplQueue();
            if (drained.isEmpty()) {
                // Nothing was queued: no latch interaction at all.
                return drained;
            }

            if (Thread.currentThread().getName().startsWith("Scheduled-")) {
                log.debugf("Drained the put command on the replication thread: %s", drained);
                secondPutLatch.countDown();
                try {
                    // Hold the drained batch back until the test releases it (or 5s elapse).
                    intervalFlushLatch.await(5L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    // Preserve the interruption for code further up the stack.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            } else {
                log.debugf("Drained the put+remove commands on the main thread: %s", drained);
                removeCompletedLatch.countDown();
            }
            return drained;
        }
    }

    /**
     * Creates and registers two clustered cache managers sharing one configuration:
     * asynchronous replication with a replication queue of class {@link MockReplQueue}, a queue
     * interval of 1000 and a maximum of 2 queued elements.
     *
     * @throws Throwable if creating or registering the cache managers fails
     */
    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        Configuration configuration = new Configuration();
        configuration.setCacheMode(Configuration.CacheMode.REPL_ASYNC);
        configuration.setUseReplQueue(true);
        configuration.setReplQueueInterval(1000L);
        configuration.setReplQueueMaxElements(2);
        configuration.setReplQueueClass(MockReplQueue.class.getName());
        registerCacheManager(
                TestCacheManagerFactory.createCacheManager(
                        GlobalConfiguration.getClusteredDefault(), configuration),
                TestCacheManagerFactory.createCacheManager(
                        GlobalConfiguration.getClusteredDefault(), configuration));
    }

    /**
     * Puts a key on the first cache, waits until that put has been drained on a "Scheduled-"
     * thread, then puts a second key and removes the first one while the earlier drain is still
     * held back. Finally releases the held drain and asserts the removed key is absent from the
     * second cache.
     *
     * @param method the running test method; its name is used to build a unique key and value
     * @throws Exception if the test thread is interrupted while waiting on a latch
     */
    public void testConcurrentFlush(Method method) throws Exception {
        Cache<String, String> cache1 = cache(0);
        Cache<String, String> cache2 = cache(1);

        CountDownLatch intervalFlush = new CountDownLatch(1);
        CountDownLatch secondPut = new CountDownLatch(1);
        CountDownLatch removeCompleted = new CountDownLatch(1);
        MockReplQueue.intervalFlushLatch = intervalFlush;
        MockReplQueue.secondPutLatch = secondPut;
        MockReplQueue.removeCompletedLatch = removeCompleted;

        String key = "k-" + method.getName();
        cache1.put(key, "v-" + method.getName());

        // Wait for the first put to be drained on a "Scheduled-" thread. Bounded so that a
        // drain that never happens fails the test instead of hanging the whole run.
        if (!secondPut.await(FIRST_DRAIN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            intervalFlush.countDown();
            throw new AssertionError("Timed out after " + FIRST_DRAIN_TIMEOUT_SECONDS
                    + "s waiting for the first put to be drained on a Scheduled- thread");
        }

        cache1.put("k-blah", "v-blah");
        cache1.remove(key);

        // Best-effort wait for a drain on a non-"Scheduled-" thread; the timeout result is not
        // checked, so the test proceeds either way.
        removeCompleted.await(1000L, TimeUnit.MILLISECONDS);

        // Let the held-back drain on the "Scheduled-" thread return.
        intervalFlush.countDown();

        // Give the asynchronous replication some time before inspecting the second cache.
        TestingUtil.sleepThread(500L);
        assert !cache2.containsKey(key) : "Removed key " + key + " is still present on the second cache";
    }
}
