package org.infinispan.distribution;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.Cache;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.write.InvalidateL1Command;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.interceptors.base.CommandInterceptor;
import org.infinispan.interceptors.distribution.L1WriteSynchronizer;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.util.concurrent.NotifyingFuture;
import org.junit.Assert;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "distribution.BaseDistSyncL1Test")
/* loaded from: input_file:org/infinispan/distribution/BaseDistSyncL1Test.class */
public abstract class BaseDistSyncL1Test extends BaseDistFunctionalTest<Object, String> {
    /** Cache key shared by the tests in this class. */
    protected static final String key = "key-to-the-cache";
    /** Value the tests write first for {@link #key}. */
    protected static final String firstValue = "first-put";
    /** Value the tests write second (the concurrent or follow-up update) for {@link #key}. */
    protected static final String secondValue = "second-put";
    /** Isolation level applied to the locking configuration by {@link #buildConfiguration()}; READ_COMMITTED unless reassigned. */
    protected IsolationLevel isolationLevel = IsolationLevel.READ_COMMITTED;

    /**
     * Extends the parent configuration by setting the locking isolation level to {@link #isolationLevel}.
     *
     * @return the parent's configuration builder with the isolation level applied
     */
    @Override
    public ConfigurationBuilder buildConfiguration() {
        final ConfigurationBuilder builder = super.buildConfiguration();
        builder.locking().isolationLevel(isolationLevel);
        return builder;
    }

    /**
     * Convenience overload of
     * {@link #addBlockingInterceptorBeforeTx(Cache, CyclicBarrier, Class, boolean)} that passes {@code true}
     * for the boolean flag.
     *
     * @param cache         cache whose interceptor chain is modified
     * @param cyclicBarrier barrier handed to the blocking interceptor
     * @param cls           command type handed to the blocking interceptor
     */
    public void addBlockingInterceptorBeforeTx(Cache<?, ?> cache, CyclicBarrier cyclicBarrier, Class<? extends VisitableCommand> cls) {
        addBlockingInterceptorBeforeTx(cache, cyclicBarrier, cls, true);
    }

    /**
     * Installs a blocking interceptor on {@code cache}, positioned before the interceptor class returned by
     * {@link #getDistributionInterceptorClass()}.
     *
     * @param cache         cache whose interceptor chain is modified
     * @param cyclicBarrier barrier handed to the blocking interceptor
     * @param cls           command type handed to the blocking interceptor
     * @param z             flag forwarded unchanged to the {@code BlockingInterceptor} constructor
     *                      (NOTE(review): presumably "block after the command ran" - confirm against BlockingInterceptor)
     */
    public void addBlockingInterceptorBeforeTx(Cache<?, ?> cache, CyclicBarrier cyclicBarrier, Class<? extends VisitableCommand> cls, boolean z) {
        final Class<? extends CommandInterceptor> anchor = getDistributionInterceptorClass();
        addBlockingInterceptor(cache, cyclicBarrier, cls, anchor, z);
    }

    /**
     * Creates a {@code BlockingInterceptor} from the given barrier, command type and flag, and adds it to
     * {@code cache}'s interceptor chain before the interceptor of type {@code cls2}.
     *
     * @param cache         cache whose interceptor chain is modified
     * @param cyclicBarrier barrier handed to the blocking interceptor
     * @param cls           command type handed to the blocking interceptor
     * @param cls2          interceptor class the new interceptor is placed before
     * @param z             flag forwarded unchanged to the {@code BlockingInterceptor} constructor
     */
    public void addBlockingInterceptor(Cache<?, ?> cache, CyclicBarrier cyclicBarrier, Class<? extends VisitableCommand> cls, Class<? extends CommandInterceptor> cls2, boolean z) {
        final BlockingInterceptor blocker = new BlockingInterceptor(cyclicBarrier, cls, z);
        cache.getAdvancedCache().addInterceptorBefore(blocker, cls2);
    }

    /**
     * Supplies the interceptor class used as the insertion anchor by the
     * {@code addBlockingInterceptorBeforeTx} helpers: blocking interceptors are added before it.
     *
     * @return the anchor interceptor class chosen by the concrete test
     */
    protected abstract Class<? extends CommandInterceptor> getDistributionInterceptorClass();

    /**
     * Supplies the interceptor class that tests in this class use as the anchor when adding blocking
     * interceptors directly before or after it in a cache's interceptor chain.
     *
     * @return the anchor interceptor class chosen by the concrete test
     */
    protected abstract Class<? extends CommandInterceptor> getL1InterceptorClass();

    /**
     * Asserts the L1 state of {@code cache} for {@code obj} after the concurrent update performed in
     * {@link #assertL1GetWithConcurrentUpdate}. This implementation asserts that the key is NOT in L1 and
     * ignores {@code cache2} and {@code obj2}.
     * NOTE(review): the unused parameters suggest this is an override hook for subclasses - confirm.
     *
     * @param cache  cache whose L1 state is checked
     * @param cache2 cache that performed the update (unused here)
     * @param obj    key to check
     * @param obj2   value that was written (unused here)
     */
    public void assertL1StateOnLocalWrite(Cache<?, ?> cache, Cache<?, ?> cache2, Object obj, Object obj2) {
        assertIsNotInL1(cache, obj);
    }

    /**
     * Starts an asynchronous get on {@code cache}, overwrites the key through {@code cache2} while that get
     * is held at a barrier, and then checks the values seen and the resulting L1 state.
     *
     * <p>Expectations asserted, in order: the put returns {@code str}; the held get returns {@code str};
     * {@link #assertL1StateOnLocalWrite} passes; a fresh get on {@code cache} returns {@code str2}; and the
     * key is then in L1 on {@code cache}.
     *
     * @param cache  cache performing the (blocked) get
     * @param cache2 cache performing the concurrent put
     * @param obj    key under test
     * @param str    value expected before the update
     * @param str2   value written by the update
     * @throws TimeoutException       if a barrier rendezvous or the future does not complete within 5 seconds
     * @throws BrokenBarrierException if the barrier is broken while waiting
     * @throws InterruptedException   if the calling thread is interrupted while waiting
     * @throws ExecutionException     if the asynchronous get fails
     */
    protected void assertL1GetWithConcurrentUpdate(Cache<Object, String> cache, Cache<Object, String> cache2, Object obj, String str, String str2) throws InterruptedException, ExecutionException, TimeoutException, BrokenBarrierException {
        CyclicBarrier getBarrier = new CyclicBarrier(2);
        addBlockingInterceptorBeforeTx(cache, getBarrier, GetKeyValueCommand.class);
        try {
            Future<String> pendingGet = cache.getAsync(obj);
            // First rendezvous with the blocking interceptor: the get is now parked.
            getBarrier.await(5L, TimeUnit.SECONDS);
            AssertJUnit.assertEquals(str, cache2.put(obj, str2));
            // Second rendezvous: let the parked get continue.
            getBarrier.await(5L, TimeUnit.SECONDS);
            AssertJUnit.assertEquals(str, pendingGet.get(5L, TimeUnit.SECONDS));
            // Drop the blocking interceptor before the synchronous get below; nobody else would
            // arrive at the two-party barrier to release it.
            removeAllBlockingInterceptorsFromCache(cache);
            assertL1StateOnLocalWrite(cache, cache2, obj, str2);
            AssertJUnit.assertEquals(str2, cache.get(obj));
            assertIsInL1(cache, obj);
        } finally {
            // Always leave the cache without blocking interceptors, whether the test passed or failed.
            removeAllBlockingInterceptorsFromCache(cache);
        }
    }

    /**
     * Runs {@link #assertL1GetWithConcurrentUpdate} with the get issued on the first non-owner (without
     * reading the key there beforehand) and the concurrent put issued on the first owner.
     */
    @Test
    public void testNoEntryInL1GetWithConcurrentInvalidation() throws InterruptedException, ExecutionException, TimeoutException, BrokenBarrierException {
        final Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
        final Cache<Object, String> ownerCache = getFirstOwner(key);

        // Seed the key on the owner only.
        ownerCache.put(key, firstValue);

        assertL1GetWithConcurrentUpdate(nonOwnerCache, ownerCache, key, firstValue, secondValue);
    }

    /**
     * Runs {@link #assertL1GetWithConcurrentUpdate} after first reading the key on the non-owner so that it
     * is already in L1 there; the concurrent put is issued on the first owner.
     */
    @Test
    public void testEntryInL1GetWithConcurrentInvalidation() throws InterruptedException, ExecutionException, TimeoutException, BrokenBarrierException {
        // Typed as Cache<Object, String> (not Cache<?, ?>) so it can be passed to assertL1GetWithConcurrentUpdate.
        Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
        Cache<Object, String> ownerCache = getFirstOwner(key);
        ownerCache.put(key, firstValue);
        // Read once on the non-owner and verify the entry landed in L1 before the concurrent scenario.
        nonOwnerCache.get(key);
        assertIsInL1(nonOwnerCache, key);
        assertL1GetWithConcurrentUpdate(nonOwnerCache, ownerCache, key, firstValue, secondValue);
    }

    /**
     * Runs {@link #assertL1GetWithConcurrentUpdate} with both the get and the concurrent put issued on the
     * non-owner, after first reading the key there so that it is already in L1.
     */
    @Test
    public void testEntryInL1GetWithConcurrentPut() throws InterruptedException, ExecutionException, TimeoutException, BrokenBarrierException {
        // Typed as Cache<Object, String> (not Cache<?, ?>) so it can be passed to assertL1GetWithConcurrentUpdate.
        Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
        getFirstOwner(key).put(key, firstValue);
        // Read once on the non-owner and verify the entry landed in L1 before the concurrent scenario.
        nonOwnerCache.get(key);
        assertIsInL1(nonOwnerCache, key);
        assertL1GetWithConcurrentUpdate(nonOwnerCache, nonOwnerCache, key, firstValue, secondValue);
    }

    /**
     * Runs {@link #assertL1GetWithConcurrentUpdate} with both the get and the concurrent put issued on the
     * non-owner, without reading the key there beforehand.
     */
    @Test
    public void testNoEntryInL1GetWithConcurrentPut() throws InterruptedException, ExecutionException, TimeoutException, BrokenBarrierException {
        final Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
        final Cache<Object, String> ownerCache = getFirstOwner(key);

        // Seed the key on the owner only.
        ownerCache.put(key, firstValue);

        assertL1GetWithConcurrentUpdate(nonOwnerCache, nonOwnerCache, key, firstValue, secondValue);
    }

    /**
     * Interleaves an L1 invalidation (triggered by a put on the owner) with a second get on the non-owner.
     *
     * <p>Sequence: the non-owner reads the key; the owner starts an asynchronous put whose
     * {@link InvalidateL1Command} is held at a barrier on the non-owner; the entry is removed from the
     * non-owner's data container by hand; a new asynchronous get is started and held at its own barrier;
     * the invalidation is released and the put is expected to return {@link #firstValue}; the get is then
     * released and expected to return a non-null value. Afterwards the key must not be in L1, and a later
     * get must return {@link #secondValue} and eventually place the key in L1.
     */
    @Test
    public void testNoEntryInL1MultipleConcurrentGetsWithInvalidation() throws TimeoutException, InterruptedException, ExecutionException, BrokenBarrierException {
        // Typed as Cache<Object, String> (not Cache<?, ?>) so getAsync(key) type-checks.
        final Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
        Cache<Object, String> ownerCache = getFirstOwner(key);
        ownerCache.put(key, firstValue);

        CyclicBarrier invalidationBarrier = new CyclicBarrier(2);
        nonOwnerCache.getAdvancedCache().addInterceptorBefore(new BlockingInterceptor(invalidationBarrier, InvalidateL1Command.class, false), getL1InterceptorClass());
        try {
            AssertJUnit.assertEquals(firstValue, nonOwnerCache.get(key));

            Future<String> pendingPut = ownerCache.putAsync(key, secondValue);
            // First rendezvous: the invalidation triggered by the put has reached the non-owner.
            invalidationBarrier.await(5L, TimeUnit.SECONDS);

            // Drop the entry by hand and swap the invalidation blocker for a get blocker.
            nonOwnerCache.getAdvancedCache().getDataContainer().remove(key);
            removeAllBlockingInterceptorsFromCache(nonOwnerCache);
            CyclicBarrier getBarrier = new CyclicBarrier(2);
            addBlockingInterceptorBeforeTx(nonOwnerCache, getBarrier, GetKeyValueCommand.class);

            Future<String> pendingGet = nonOwnerCache.getAsync(key);
            // The get is parked; now release the invalidation so the put can finish.
            getBarrier.await(5L, TimeUnit.SECONDS);
            invalidationBarrier.await(5L, TimeUnit.SECONDS);
            AssertJUnit.assertEquals(firstValue, pendingPut.get(5L, TimeUnit.SECONDS));

            // Release the get; only non-null is asserted for its result.
            getBarrier.await(5L, TimeUnit.SECONDS);
            AssertJUnit.assertNotNull(pendingGet.get(5L, TimeUnit.SECONDS));

            // Remove the get blocker so the synchronous gets below are not held at the barrier.
            removeAllBlockingInterceptorsFromCache(nonOwnerCache);
            assertIsNotInL1(nonOwnerCache, key);
            eventually(new AbstractInfinispanTest.Condition() {
                @Override
                public boolean isSatisfied() throws Exception {
                    AssertJUnit.assertEquals(secondValue, nonOwnerCache.get(key));
                    return BaseDistSyncL1Test.this.isInL1(nonOwnerCache, key);
                }
            });
        } finally {
            // Always leave the cache without blocking interceptors, whether the test passed or failed.
            removeAllBlockingInterceptorsFromCache(nonOwnerCache);
        }
    }

    /**
     * Checks that a get which was already in flight when a write happened does not overwrite the newer L1
     * entry produced by a later get.
     *
     * <p>Sequence: a forked get on the non-owner is held at a barrier; the owner writes
     * {@link #secondValue}; a second get on the non-owner returns {@link #secondValue} and stores it in L1;
     * the first get is released and is expected to return {@link #firstValue}, while the data container on
     * the non-owner must still hold {@link #secondValue}.
     */
    @Test
    public void testGetAfterWriteAlreadyInvalidatedCurrentGet() throws InterruptedException, TimeoutException, BrokenBarrierException, ExecutionException {
        final Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
        Cache<Object, String> ownerCache = getFirstOwner(key);
        ownerCache.put(key, firstValue);

        CyclicBarrier getBarrier = new CyclicBarrier(2);
        addBlockingInterceptor(nonOwnerCache, getBarrier, GetKeyValueCommand.class, getDistributionInterceptorClass(), true);
        try {
            Future<String> firstGet = fork(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    return nonOwnerCache.get(key);
                }
            });
            // First rendezvous: the forked get is parked at the blocking interceptor.
            getBarrier.await(10L, TimeUnit.SECONDS);

            ownerCache.put(key, secondValue);

            // A get issued after the write must see the new value and cache it in L1.
            AssertJUnit.assertEquals(secondValue, nonOwnerCache.get(key));
            assertIsInL1(nonOwnerCache, key);
            AssertJUnit.assertEquals(secondValue, nonOwnerCache.getAdvancedCache().getDataContainer().get(key).getValue());

            // Release the first get: it returns the old value but must not clobber the L1 entry.
            getBarrier.await(10L, TimeUnit.SECONDS);
            AssertJUnit.assertEquals(firstValue, firstGet.get(10L, TimeUnit.SECONDS));
            assertIsInL1(nonOwnerCache, key);
            AssertJUnit.assertEquals(secondValue, nonOwnerCache.getAdvancedCache().getDataContainer().get(key).getValue());
        } finally {
            // Always leave the cache without blocking interceptors, whether the test passed or failed.
            removeAllBlockingInterceptorsFromCache(nonOwnerCache);
        }
    }

    /**
     * Holds the remote get on both owners while a write goes through, then checks the value returned to
     * the non-owner and that the key did not end up in its L1.
     *
     * <p>The barrier has three parties: one blocking interceptor on each of the two owners (added after the
     * interceptor returned by {@link #getL1InterceptorClass()}) plus the test thread. The value expected
     * from the get is {@link #firstValue} under REPEATABLE_READ and {@link #secondValue} otherwise.
     */
    @Test
    public void testRemoteGetArrivesButWriteOccursBeforeRegistration() throws Throwable {
        Cache<Object, String>[] owners = getOwners(key, 2);
        Cache<Object, String> primaryOwner = owners[0];
        Cache<Object, String> backupOwner = owners[1];
        // Typed as Cache<Object, String> (not Cache<?, ?>) so getAsync(key) type-checks.
        Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
        primaryOwner.put(key, firstValue);
        assertIsNotInL1(nonOwnerCache, key);

        CyclicBarrier getBarrier = new CyclicBarrier(3);
        primaryOwner.getAdvancedCache().addInterceptorAfter(new BlockingInterceptor(getBarrier, GetKeyValueCommand.class, true), getL1InterceptorClass());
        backupOwner.getAdvancedCache().addInterceptorAfter(new BlockingInterceptor(getBarrier, GetKeyValueCommand.class, true), getL1InterceptorClass());
        try {
            Future<String> pendingGet = nonOwnerCache.getAsync(key);
            // First rendezvous: both owners have the remote get parked.
            getBarrier.await(10L, TimeUnit.SECONDS);
            Assert.assertEquals(firstValue, primaryOwner.put(key, secondValue));
            // Second rendezvous: release the parked gets.
            getBarrier.await(10L, TimeUnit.SECONDS);
            String expected = this.isolationLevel == IsolationLevel.REPEATABLE_READ ? firstValue : secondValue;
            Assert.assertEquals(expected, pendingGet.get(10L, TimeUnit.SECONDS));
            assertIsNotInL1(nonOwnerCache, key);
        } finally {
            // Always leave both owners without blocking interceptors, whether the test passed or failed.
            removeAllBlockingInterceptorsFromCache(primaryOwner);
            removeAllBlockingInterceptorsFromCache(backupOwner);
        }
    }

    /**
     * Checks that a put on the owner cannot complete while a get on the non-owner is parked just before
     * acquiring the shared topology lock, and that both complete once the get is released.
     *
     * <p>After release, both the get and the put are expected to return {@link #firstValue}, and the key
     * must not be in L1 on the non-owner.
     */
    @Test
    public void testGetBlockedInvalidation() throws Throwable {
        // Typed as Cache<Object, String> (not Cache<?, ?>) so getAsync(key) type-checks.
        Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
        Cache<Object, String> ownerCache = getFirstOwner(key);
        ownerCache.put(key, firstValue);
        assertIsNotInL1(nonOwnerCache, key);

        CheckPoint checkPoint = new CheckPoint();
        StateTransferLock originalLock = waitUntilAboutToAcquireLock(nonOwnerCache, checkPoint);
        try {
            this.log.warn("Doing get here - ignore all previous");
            Future<String> pendingGet = nonOwnerCache.getAsync(key);
            checkPoint.awaitStrict("pre_acquire_shared_topology_lock_invoked", 10L, TimeUnit.SECONDS);

            Future<String> pendingPut = ownerCache.putAsync(key, secondValue);
            try {
                pendingPut.get(1L, TimeUnit.SECONDS);
                Assert.fail("Should have thrown a TimeoutException");
            } catch (TimeoutException expected) {
                // Expected: the put must not complete while the get is still parked.
            }

            checkPoint.triggerForever("pre_acquire_shared_topology_lock_released");
            Assert.assertEquals(firstValue, pendingGet.get(10L, TimeUnit.SECONDS));
            Assert.assertEquals(firstValue, pendingPut.get(10L, TimeUnit.SECONDS));
            assertIsNotInL1(nonOwnerCache, key);
        } finally {
            // Put the real StateTransferLock back so the blocking mock does not leak into later tests,
            // matching what testGetBlockingAnotherGet does.
            TestingUtil.replaceComponent(nonOwnerCache, StateTransferLock.class, originalLock, true);
        }
    }

    /**
     * Checks that a second get on the non-owner cannot complete while the first get is parked just before
     * acquiring the shared topology lock, and that both return {@link #firstValue} once released, leaving
     * the key in L1.
     */
    @Test
    public void testGetBlockingAnotherGet() throws Throwable {
        // Typed as Cache<Object, String> (not Cache<?, ?>) so getAsync(key) type-checks.
        Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
        getFirstOwner(key).put(key, firstValue);
        assertIsNotInL1(nonOwnerCache, key);

        CheckPoint checkPoint = new CheckPoint();
        StateTransferLock originalLock = waitUntilAboutToAcquireLock(nonOwnerCache, checkPoint);
        try {
            this.log.warn("Doing get here - ignore all previous");
            Future<String> firstGet = nonOwnerCache.getAsync(key);
            checkPoint.awaitStrict("pre_acquire_shared_topology_lock_invoked", 10L, TimeUnit.SECONDS);

            Future<String> secondGet = nonOwnerCache.getAsync(key);
            try {
                secondGet.get(1L, TimeUnit.SECONDS);
                Assert.fail("Should have thrown a TimeoutException");
            } catch (TimeoutException expected) {
                // Expected: the second get must not complete while the first one is still parked.
            }

            checkPoint.triggerForever("pre_acquire_shared_topology_lock_released");
            Assert.assertEquals(firstValue, firstGet.get(10L, TimeUnit.SECONDS));
            Assert.assertEquals(firstValue, secondGet.get(10L, TimeUnit.SECONDS));
            assertIsInL1(nonOwnerCache, key);
        } finally {
            // Put the real StateTransferLock back, whether the test passed or failed.
            TestingUtil.replaceComponent(nonOwnerCache, StateTransferLock.class, originalLock, true);
        }
    }

    /**
     * Same shape as {@code testGetBlockingAnotherGet}, but no value is written for the key beforehand and
     * the first get is parked inside the L1 write-synchronizer registration (see
     * {@link #waitUntilL1Registration}). Both gets are expected to return {@code null} once released.
     */
    @Test
    public void testGetBlockingAnotherGetWithMiss() throws Throwable {
        // Typed as Cache<Object, String> (not Cache<?, ?>) so getAsync(key) type-checks.
        Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
        // Result intentionally unused; call kept as in the original.
        // NOTE(review): presumably side-effect free and removable - confirm against BaseDistFunctionalTest.
        getFirstOwner(key);
        assertIsNotInL1(nonOwnerCache, key);

        CheckPoint checkPoint = new CheckPoint();
        L1Manager originalManager = waitUntilL1Registration(nonOwnerCache, checkPoint);
        try {
            this.log.warn("Doing get here - ignore all previous");
            Future<String> firstGet = nonOwnerCache.getAsync(key);
            checkPoint.awaitStrict("pre_acquire_shared_topology_lock_invoked", 10L, TimeUnit.SECONDS);

            Future<String> secondGet = nonOwnerCache.getAsync(key);
            try {
                secondGet.get(1L, TimeUnit.SECONDS);
                Assert.fail("Should have thrown a TimeoutException");
            } catch (TimeoutException expected) {
                // Expected: the second get must not complete while the first one is still parked.
            }

            checkPoint.triggerForever("pre_acquire_shared_topology_lock_released");
            Assert.assertNull(firstGet.get(10L, TimeUnit.SECONDS));
            Assert.assertNull(secondGet.get(10L, TimeUnit.SECONDS));
        } finally {
            // Put the real L1Manager back, whether the test passed or failed.
            TestingUtil.replaceComponent(nonOwnerCache, L1Manager.class, originalManager, true);
        }
    }

    /**
     * Checks that a put issued on the non-owner cannot complete while a get on the same cache is parked
     * just before acquiring the shared topology lock, and that both return {@link #firstValue} once the
     * get is released.
     *
     * <p>The final L1 expectation depends on the cache's transaction mode: the key is expected in L1 for
     * transactional caches and absent otherwise.
     */
    @Test
    public void testGetBlockingLocalPut() throws Throwable {
        // Typed as Cache<Object, String> (not Cache<?, ?>) so getAsync/putAsync type-check.
        Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
        getFirstOwner(key).put(key, firstValue);
        assertIsNotInL1(nonOwnerCache, key);

        CheckPoint checkPoint = new CheckPoint();
        StateTransferLock originalLock = waitUntilAboutToAcquireLock(nonOwnerCache, checkPoint);
        try {
            this.log.warn("Doing get here - ignore all previous");
            Future<String> pendingGet = nonOwnerCache.getAsync(key);
            checkPoint.awaitStrict("pre_acquire_shared_topology_lock_invoked", 10L, TimeUnit.SECONDS);

            Future<String> pendingPut = nonOwnerCache.putAsync(key, secondValue);
            try {
                pendingPut.get(1L, TimeUnit.SECONDS);
                Assert.fail("Should have thrown a TimeoutException");
            } catch (TimeoutException expected) {
                // Expected: the put must not complete while the get is still parked.
            }

            checkPoint.triggerForever("pre_acquire_shared_topology_lock_released");
            Assert.assertEquals(firstValue, pendingGet.get(10L, TimeUnit.SECONDS));
            Assert.assertEquals(firstValue, pendingPut.get(10L, TimeUnit.SECONDS));

            boolean transactional = nonOwnerCache.getCacheConfiguration().transaction().transactionMode() == TransactionMode.TRANSACTIONAL;
            if (transactional) {
                assertIsInL1(nonOwnerCache, key);
            } else {
                assertIsNotInL1(nonOwnerCache, key);
            }
        } finally {
            // Put the real StateTransferLock back so the blocking mock does not leak into later tests,
            // matching what testGetBlockingAnotherGet does.
            TestingUtil.replaceComponent(nonOwnerCache, StateTransferLock.class, originalLock, true);
        }
    }

    /**
     * Replaces {@code cache}'s {@link StateTransferLock} with a Mockito mock that delegates every call to
     * the real lock, except that {@code acquireSharedTopologyLock()} first signals
     * {@code "pre_acquire_shared_topology_lock_invoked"} on {@code checkPoint} and then waits up to 10
     * seconds for {@code "pre_acquire_shared_topology_lock_released"} before delegating.
     *
     * @param cache      cache whose {@link StateTransferLock} component is replaced
     * @param checkPoint check point used to signal and to be released
     * @return the original lock, so the caller can restore it afterwards
     */
    protected StateTransferLock waitUntilAboutToAcquireLock(Cache<?, ?> cache, final CheckPoint checkPoint) {
        StateTransferLock realLock = TestingUtil.extractComponent(cache, StateTransferLock.class);
        final Answer<Object> forwardToReal = AdditionalAnswers.delegatesTo(realLock);
        StateTransferLock mockLock = Mockito.mock(StateTransferLock.class, Mockito.withSettings().defaultAnswer(forwardToReal));

        // Announce arrival, wait to be released, then carry out the real call.
        Answer<Object> parkThenForward = new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                checkPoint.trigger("pre_acquire_shared_topology_lock_invoked");
                checkPoint.awaitStrict("pre_acquire_shared_topology_lock_released", 10L, TimeUnit.SECONDS);
                return forwardToReal.answer(invocationOnMock);
            }
        };
        Mockito.doAnswer(parkThenForward).when(mockLock).acquireSharedTopologyLock();

        TestingUtil.replaceComponent(cache, StateTransferLock.class, mockLock, true);
        return realLock;
    }

    /**
     * Replaces {@code cache}'s {@link L1Manager} with a Mockito mock that delegates every call to the real
     * manager, except that {@code registerL1WriteSynchronizer(..)} first signals
     * {@code "pre_acquire_shared_topology_lock_invoked"} on {@code checkPoint} and then waits up to 10
     * seconds for {@code "pre_acquire_shared_topology_lock_released"} before delegating.
     *
     * @param cache      cache whose {@link L1Manager} component is replaced
     * @param checkPoint check point used to signal and to be released
     * @return the original manager, so the caller can restore it afterwards
     */
    protected L1Manager waitUntilL1Registration(Cache<?, ?> cache, final CheckPoint checkPoint) {
        L1Manager realManager = TestingUtil.extractComponent(cache, L1Manager.class);
        final Answer<Object> forwardToReal = AdditionalAnswers.delegatesTo(realManager);
        L1Manager mockManager = Mockito.mock(L1Manager.class, Mockito.withSettings().defaultAnswer(forwardToReal));

        // Announce arrival, wait to be released, then carry out the real call.
        Answer<Object> parkThenForward = new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                checkPoint.trigger("pre_acquire_shared_topology_lock_invoked");
                checkPoint.awaitStrict("pre_acquire_shared_topology_lock_released", 10L, TimeUnit.SECONDS);
                return forwardToReal.answer(invocationOnMock);
            }
        };
        Mockito.doAnswer(parkThenForward).when(mockManager).registerL1WriteSynchronizer(Mockito.anyObject(), Mockito.any(L1WriteSynchronizer.class));

        TestingUtil.replaceComponent(cache, L1Manager.class, mockManager, true);
        return realManager;
    }
}
