package org.infinispan.persistence.support;

import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.Cache;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.configuration.BuiltBy;
import org.infinispan.commons.configuration.ConfigurationFor;
import org.infinispan.commons.configuration.attributes.AttributeSet;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.test.Exceptions;
import org.infinispan.commons.test.TestResourceTracker;
import org.infinispan.configuration.cache.AsyncStoreConfiguration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.PersistenceConfigurationBuilder;
import org.infinispan.marshall.TestObjectStreamMarshaller;
import org.infinispan.marshall.persistence.impl.MarshalledEntryUtil;
import org.infinispan.persistence.async.AsyncNonBlockingStore;
import org.infinispan.persistence.dummy.DummyInMemoryStore;
import org.infinispan.persistence.dummy.DummyInMemoryStoreConfiguration;
import org.infinispan.persistence.dummy.DummyInMemoryStoreConfigurationBuilder;
import org.infinispan.persistence.spi.InitializationContext;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.infinispan.persistence.spi.PersistenceException;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.CacheManagerCallable;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.test.fwk.TestInternalCacheEntryFactory;
import org.infinispan.util.PersistenceMockUtil;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"unit"}, testName = "persistence.support.AsyncStoreTest", sequential = true)
/* loaded from: input_file:org/infinispan/persistence/support/AsyncStoreTest.class */
public class AsyncStoreTest extends AbstractInfinispanTest {
    // Async store under test; assigned by createStore(boolean) and testModificationQueueSize(Method),
    // and stopped in tearDown() when non-null.
    private AsyncNonBlockingStore<Object, Object> store;
    // Marshaller recreated before every test method and stopped in tearDown().
    private TestObjectStreamMarshaller marshaller;
    private static final Log log = LogFactory.getLog(AsyncStoreTest.class);
    // Set by the DelayableStore constructor on the constructing thread and read back by
    // OneEntryCacheManagerCallable to get hold of the store instance.
    private static final ThreadLocal<DelayableStore> STORE = new ThreadLocal<>();

    /**
     * {@link DummyInMemoryStore} variant whose write and delete results can be held back by a test.
     * <p>
     * While {@code delayedFuture} is non-null, the stage returned from {@link #write} and
     * {@link #delete} only completes after both the underlying operation and that future have
     * completed. Every instance registers itself in the {@code STORE} thread-local on construction.
     */
    public static class DelayableStore extends DummyInMemoryStore {
        /** Gate for write/delete completion; {@code null} means operations are not delayed. */
        private volatile CompletableFuture<Void> delayedFuture;

        public DelayableStore() {
            AsyncStoreTest.STORE.set(this);
        }

        /**
         * Performs the write on the parent store and, if a delay is installed, chains the delay
         * future behind it.
         */
        @Override
        public CompletionStage<Void> write(int i, MarshallableEntry marshallableEntry) {
            CompletionStage<Void> written = super.write(i, marshallableEntry);
            if (delayedFuture == null) {
                return written;
            }
            // The field is read again when the parent write completes, not captured up front.
            return written.thenCompose(ignored -> delayedFuture);
        }

        /**
         * Performs the delete on the parent store and, if a delay is installed, reports the
         * parent's result only after the delay future has completed.
         */
        @Override
        public CompletionStage<Boolean> delete(int i, Object obj) {
            CompletionStage<Boolean> deleted = super.delete(i, obj);
            if (delayedFuture == null) {
                return deleted;
            }
            return deleted.thenCompose(removed ->
                    delayedFuture.thenCompose(ignored -> CompletableFutures.booleanStage(removed.booleanValue())));
        }
    }

    /**
     * Configuration type that ties {@link DelayableStore} (via {@code @ConfigurationFor}) to
     * {@link LockableStoreConfigurationBuilder} (via {@code @BuiltBy}). It adds no attributes of its
     * own and simply forwards to {@link DummyInMemoryStoreConfiguration}.
     */
    @BuiltBy(LockableStoreConfigurationBuilder.class)
    @ConfigurationFor(DelayableStore.class)
    public static class DelayableStoreConfiguration extends DummyInMemoryStoreConfiguration {
        /** Passes the attribute set and async settings straight through to the parent configuration. */
        public DelayableStoreConfiguration(AttributeSet attributeSet, AsyncStoreConfiguration asyncStoreConfiguration) {
            super(attributeSet, asyncStoreConfiguration);
        }
    }

    /**
     * Builder that produces a {@link DelayableStoreConfiguration} instead of the plain
     * {@link DummyInMemoryStoreConfiguration} built by its parent.
     */
    public static class LockableStoreConfigurationBuilder extends DummyInMemoryStoreConfigurationBuilder {
        public LockableStoreConfigurationBuilder(PersistenceConfigurationBuilder persistenceConfigurationBuilder) {
            super(persistenceConfigurationBuilder);
        }

        /**
         * Builds the configuration with a covariant return type.
         * <p>
         * The method must be named {@code create} to override the parent builder's factory method;
         * the decompiler-mangled name did not override anything, so the parent's configuration
         * type would have been produced instead.
         *
         * @return a {@link DelayableStoreConfiguration} backed by this builder's protected
         *         attributes and async settings
         */
        @Override
        public DelayableStoreConfiguration create() {
            return new DelayableStoreConfiguration(this.attributes.protect(), this.async.create());
        }
    }

    /**
     * {@link CacheManagerCallable} base that starts a cache manager whose default cache holds at
     * most one entry in memory and persists through an async {@link DelayableStore} with a
     * modification queue of size one. Subclasses get the cache and the store instance as fields.
     */
    private static abstract class OneEntryCacheManagerCallable extends CacheManagerCallable {
        protected final Cache<String, String> cache;
        protected final DelayableStore store;

        /** Builds the cache configuration; {@code z} is passed to {@code passivation(...)}. */
        private static ConfigurationBuilder config(boolean z) {
            ConfigurationBuilder builder = new ConfigurationBuilder();
            builder.memory()
                    .maxCount(1L)
                    .persistence()
                    .passivation(z)
                    .addStore(LockableStoreConfigurationBuilder.class)
                    .async()
                    .modificationQueueSize(1)
                    .enable();
            return builder;
        }

        OneEntryCacheManagerCallable(boolean z) {
            super(TestCacheManagerFactory.createCacheManager(config(z)));
            cache = cm.getCache();
            // The DelayableStore constructor published the instance in the thread-local.
            store = AsyncStoreTest.STORE.get();
        }
    }

    /**
     * Creates and starts the async store with the "slow" flag of the dummy store switched off.
     *
     * @return the initialization context the store was started with
     */
    private InitializationContext createStore() throws PersistenceException {
        final boolean slow = false;
        return createStore(slow);
    }

    /**
     * Builds a cache configuration with an async-enabled, non-segmented dummy store, wraps a fresh
     * {@link DummyInMemoryStore} in an {@link AsyncNonBlockingStore}, starts it and keeps it in
     * the {@code store} field.
     *
     * @param z value handed to the dummy store builder's {@code slow(...)} setting
     * @return the initialization context used to start the store, so callers can restart it
     */
    private InitializationContext createStore(boolean z) throws PersistenceException {
        ConfigurationBuilder cacheConfig = TestCacheManagerFactory.getDefaultCacheConfiguration(false);
        DummyInMemoryStoreConfigurationBuilder storeConfig = (DummyInMemoryStoreConfigurationBuilder) cacheConfig
                .persistence()
                .addStore(DummyInMemoryStoreConfigurationBuilder.class)
                .storeName(AsyncStoreTest.class.getName())
                .segmented(false);
        storeConfig.async().enable();
        storeConfig.slow(z);

        InitializationContext context = PersistenceMockUtil.createContext(getClass(), cacheConfig.build(), this.marshaller);
        DummyInMemoryStore delegate = new DummyInMemoryStore();
        this.store = new AsyncNonBlockingStore<>(delegate);
        CompletionStages.join(this.store.start(context));
        return context;
    }

    /** Gives every test method a fresh marshaller; it is stopped again in {@code tearDown()}. */
    @BeforeMethod
    public void createMarshalledEntryFactory() {
        marshaller = new TestObjectStreamMarshaller();
    }

    /**
     * Stops the store created by the test (if any) and then the marshaller.
     * <p>
     * The marshaller is stopped in a {@code finally} block so that a failure while stopping the
     * store does not leak it. The store reference is cleared so that a later test which creates no
     * store of its own does not stop a stale instance again.
     *
     * @throws PersistenceException if stopping the store fails
     */
    @AfterMethod
    public void tearDown() throws PersistenceException {
        try {
            if (this.store != null) {
                CompletionStages.join(this.store.stop());
            }
        } finally {
            this.store = null;
            // Guard against a failed @BeforeMethod that never created the marshaller.
            if (this.marshaller != null) {
                this.marshaller.stop();
            }
        }
    }

    /** Writes 1000 entries, checks they can be loaded, then deletes them and checks they are gone. */
    @Test(timeOut = 30000)
    public void testPutRemove() throws Exception {
        TestResourceTracker.testThreadStarted(getTestName());
        createStore();

        final int entryCount = 1000;
        final String keyPrefix = "testPutRemove-k-";
        doTestPut(entryCount, keyPrefix, "testPutRemove-v-");
        doTestRemove(entryCount, keyPrefix);
    }

    /**
     * Runs 2000 rounds of "put 10 entries, remove 10 entries" against the same store and expects
     * no round to fail.
     * <p>
     * Failing rounds are counted so that the loop keeps going, and each one is logged with its
     * cause; otherwise the final count assertion would be the only trace of what went wrong.
     */
    @Test(timeOut = 30000)
    public void testRepeatedPutRemove() throws Exception {
        TestResourceTracker.testThreadStarted(getTestName());
        createStore();
        int failures = 0;
        for (int round = 0; round < 2000; round++) {
            try {
                doTestPut(10, "testRepeatedPutRemove-k-", "testRepeatedPutRemove-v-");
                doTestRemove(10, "testRepeatedPutRemove-k-");
            } catch (Error e) {
                failures++;
                log.error("testRepeatedPutRemove failed in round " + round, e);
            }
        }
        AssertJUnit.assertEquals(0, failures);
    }

    /**
     * Writes 1000 entries, clears the store, writes the same keys again with different values and
     * finally removes them, verifying the store contents after every step.
     */
    @Test(timeOut = 30000)
    public void testPutClearPut() throws Exception {
        TestResourceTracker.testThreadStarted(getTestName());
        createStore();

        final int entryCount = 1000;
        final String keyPrefix = "testPutClearPut-k-";
        doTestPut(entryCount, keyPrefix, "testPutClearPut-v-");
        doTestClear(entryCount, keyPrefix);
        doTestPut(entryCount, keyPrefix, "testPutClearPut-v[2]-");
        doTestRemove(entryCount, keyPrefix);
    }

    /**
     * Runs 2000 rounds of "put 10 entries, clear, put the same 10 keys with new values" and
     * expects no round to fail.
     * <p>
     * Failing rounds are counted so that the loop keeps going, and each one is logged with its
     * cause; otherwise the final count assertion would be the only trace of what went wrong.
     */
    @Test(timeOut = 30000)
    public void testRepeatedPutClearPut() throws Exception {
        TestResourceTracker.testThreadStarted(getTestName());
        createStore();
        int failures = 0;
        for (int round = 0; round < 2000; round++) {
            try {
                doTestPut(10, "testRepeatedPutClearPut-k-", "testRepeatedPutClearPut-v-");
                doTestClear(10, "testRepeatedPutClearPut-k-");
                doTestPut(10, "testRepeatedPutClearPut-k-", "testRepeatedPutClearPut-v[2]-");
            } catch (Error e) {
                failures++;
                log.error("testRepeatedPutClearPut failed in round " + round, e);
            }
        }
        AssertJUnit.assertEquals(0, failures);
    }

    /** Overwrites a single key 1000 times, expects the last value to win, then removes the key. */
    @Test(timeOut = 30000)
    public void testMultiplePutsOnSameKey() throws Exception {
        TestResourceTracker.testThreadStarted(getTestName());
        createStore();

        final String key = "testMultiplePutsOnSameKey-k";
        doTestSameKeyPut(1000, key, "testMultiplePutsOnSameKey-v-");
        doTestSameKeyRemove(key);
    }

    /**
     * Checks that a stopped store rejects writes with a {@link CacheException} and that it can be
     * restarted with the original context and used again.
     */
    @Test(timeOut = 30000)
    public void testRestrictionOnAddingToAsyncQueue() throws Exception {
        TestResourceTracker.testThreadStarted(getTestName());
        InitializationContext context = createStore();
        final String keyPrefix = "testRestrictionOnAddingToAsyncQueue-k";

        this.store.delete(0, "blah");
        doTestPut(10, keyPrefix, "testRestrictionOnAddingToAsyncQueue-v-");

        CompletionStages.join(this.store.stop());
        try {
            this.store.write(0, MarshalledEntryUtil.create("k", (Marshaller) this.marshaller));
            AssertJUnit.fail("Should have restricted this entry from being made");
        } catch (CacheException expected) {
            // Expected: the write is issued against a store that has been stopped.
        }

        CompletionStages.join(this.store.start(context));
        doTestRemove(10, keyPrefix);
    }

    /**
     * Clears the store in a loop on a forked thread while the test thread stops the store, and
     * expects the forked loop to terminate via a {@link CacheException}.
     * <p>
     * Both the clear calls and the stop are guarded by this test instance's monitor. The final
     * wait is bounded at 1000 seconds, which is longer than the 30 second TestNG timeout on this
     * method.
     */
    @Test(timeOut = 30000)
    public void testConcurrentClearAndStop() throws Exception {
        TestResourceTracker.testThreadStarted(getTestName());
        createStore(true);

        Future<?> clearer = fork(() -> {
            try {
                while (true) {
                    synchronized (this) {
                        this.store.clear();
                    }
                }
            } catch (CacheException stopped) {
                // The loop only ends once clear() throws.
                return null;
            }
        });

        Thread.sleep(500L);
        synchronized (this) {
            CompletionStages.join(this.store.stop());
        }
        clearer.get(1000L, TimeUnit.SECONDS);
    }

    /**
     * Writes {@code i} entries to segment 0 and then verifies that each of them can be loaded with
     * the value that was written.
     *
     * @param i    number of entries
     * @param str  key prefix; the entry index is appended
     * @param str2 value prefix; the entry index is appended
     */
    private void doTestPut(int i, String str, String str2) {
        // Issue all writes first; the returned stages are not awaited here.
        for (int n = 0; n < i; n++) {
            String key = str + n;
            String value = str2 + n;
            this.store.write(0, MarshalledEntryUtil.create(TestInternalCacheEntryFactory.create(key, value), (Marshaller) this.marshaller));
        }
        for (int n = 0; n < i; n++) {
            String expectedValue = str2 + n;
            MarshallableEntry loaded = (MarshallableEntry) CompletionStages.join(this.store.load(0, str + n));
            AssertJUnit.assertNotNull(loaded);
            AssertJUnit.assertEquals(expectedValue, loaded.getValue());
        }
    }

    /**
     * Writes {@code i} different values under the same key and verifies that loading the key
     * yields the value written last.
     *
     * @param i    number of writes
     * @param str  the key
     * @param str2 value prefix; the write index is appended
     */
    private void doTestSameKeyPut(int i, String str, String str2) {
        for (int n = 0; n < i; n++) {
            String value = str2 + n;
            this.store.write(0, MarshalledEntryUtil.create(str, value, (Marshaller) this.marshaller));
        }
        String lastValue = str2 + (i - 1);
        MarshallableEntry loaded = (MarshallableEntry) CompletionStages.join(this.store.load(0, str));
        AssertJUnit.assertNotNull(loaded);
        AssertJUnit.assertEquals(lastValue, loaded.getValue());
    }

    /**
     * Deletes {@code i} keys from segment 0 and then verifies that none of them can be loaded.
     *
     * @param i   number of keys
     * @param str key prefix; the key index is appended
     */
    private void doTestRemove(int i, String str) throws Exception {
        // Issue all deletes first; the returned stages are not awaited here.
        for (int n = 0; n < i; n++) {
            String key = str + n;
            this.store.delete(0, key);
        }
        for (int n = 0; n < i; n++) {
            String key = str + n;
            AssertJUnit.assertNull(CompletionStages.join(this.store.load(0, key)));
        }
    }

    /**
     * Deletes a single key from segment 0 and verifies it can no longer be loaded.
     *
     * @param str the key to remove
     */
    private void doTestSameKeyRemove(String str) {
        final int segment = 0;
        this.store.delete(segment, str);
        Object loaded = CompletionStages.join(this.store.load(segment, str));
        AssertJUnit.assertNull(loaded);
    }

    /**
     * Clears the store, waits for the clear to complete and verifies that none of the {@code i}
     * keys can be loaded any more.
     *
     * @param i   number of keys to check
     * @param str key prefix; the key index is appended
     */
    private void doTestClear(int i, String str) throws Exception {
        CompletionStages.join(this.store.clear());
        for (int n = 0; n < i; n++) {
            String key = str + n;
            AssertJUnit.assertNull(CompletionStages.join(this.store.load(0, key)));
        }
    }

    /**
     * Verifies that a writer gets stuck once the async modification queue is full while the
     * underlying store is delayed.
     * <p>
     * A {@link DelayableStore} with an uncompleted delay future sits behind an async store whose
     * modification queue size is 10. A forked thread then issues writes, joining each one; the
     * test expects that thread not to finish within one second and the underlying store to hold
     * exactly one entry at that point.
     * <p>
     * Cleanup runs in a single {@code finally}: the delay future is always completed before the
     * store is stopped, so a failed assertion does not leave writes parked behind the delay while
     * {@code stop()} is called. Completing an already completed future is a no-op.
     *
     * @param method the running test method, used to derive keys and values
     */
    public void testModificationQueueSize(Method method) {
        final int queueSize = 10;
        DelayableStore delayableStore = new DelayableStore();
        ConfigurationBuilder cacheConfig = TestCacheManagerFactory.getDefaultCacheConfiguration(false);
        cacheConfig.persistence()
                .addStore(new LockableStoreConfigurationBuilder(cacheConfig.persistence()))
                .async()
                .modificationQueueSize(queueSize);
        this.store = new AsyncNonBlockingStore<>(delayableStore);
        CompletionStages.join(this.store.start(PersistenceMockUtil.createContext(getClass(), cacheConfig.build(), this.marshaller)));

        CompletableFuture<Void> delay = new CompletableFuture<>();
        delayableStore.delayedFuture = delay;
        try {
            Future<?> writer = fork(() -> {
                // Keys 0..queueSize, each write joined before the next one is issued.
                for (int n = 0; n < queueSize + 1; n++) {
                    try {
                        CompletionStages.join(this.store.write(0, MarshalledEntryUtil.create(TestingUtil.k(method, n), TestingUtil.v(method, n), (Marshaller) this.marshaller)));
                    } catch (Exception e) {
                        log.error("Error storing entry", e);
                        return null;
                    }
                }
                // One more write after the loop; failures here propagate through the future.
                CompletionStages.join(this.store.write(0, MarshalledEntryUtil.create(TestingUtil.k(method, queueSize + 1), TestingUtil.v(method, queueSize + 1), (Marshaller) this.marshaller)));
                return null;
            });
            Exceptions.expectException(TimeoutException.class, () -> {
                writer.get(1L, TimeUnit.SECONDS);
            });
            AssertJUnit.assertEquals(1L, delayableStore.size());
        } finally {
            delay.complete(null);
            CompletionStages.join(this.store.stop());
        }
    }

    /** Runs the end-to-end put/put scenario with passivation switched on. */
    public void testEndToEndPutPutPassivation() throws Exception {
        final boolean passivation = true;
        doTestEndToEndPutPut(passivation);
    }

    /** Runs the end-to-end put/put scenario with passivation switched off. */
    public void testEndToEndPutPut() throws Exception {
        final boolean passivation = false;
        doTestEndToEndPutPut(passivation);
    }

    /**
     * End-to-end scenario: with a one-entry cache backed by a delayed async store, further puts
     * issued from a forked thread must not complete while the store is delayed, yet the latest
     * value of "X" must still be readable from the cache.
     * <p>
     * The delay future is declared as {@code CompletableFuture<Void>}, so it is released with
     * {@code complete(null)}; completing it with a {@code Boolean} does not type-check against
     * that declaration.
     *
     * @param z whether the cache is configured with passivation
     */
    private void doTestEndToEndPutPut(final boolean z) throws Exception {
        TestingUtil.withCacheManager(new OneEntryCacheManagerCallable(z) {
            @Override
            public void call() throws InterruptedException {
                this.cache.put("X", "1");
                this.cache.put("Y", "1");
                // Wait until "X" has reached the underlying store.
                AsyncStoreTest.this.eventually(() -> {
                    return this.store.loadEntry("X") != null;
                });

                // From here on every store write/delete is held back until the delay completes.
                CompletableFuture<Void> delay = new CompletableFuture<>();
                this.store.delayedFuture = delay;
                try {
                    this.cache.put("X", "2");
                    Future<?> blockedPuts = AsyncStoreTest.this.fork(() -> {
                        this.cache.put("Y", "2");
                        if (!z) {
                            this.cache.put("Z", "1");
                        }
                    });
                    // The forked puts are expected to still be pending after 100 ms.
                    Exceptions.expectException(TimeoutException.class, () -> {
                        blockedPuts.get(100L, TimeUnit.MILLISECONDS);
                    });
                    AssertJUnit.assertEquals("2", this.cache.get("X"));
                    if (!z) {
                        AssertJUnit.assertEquals("1", this.cache.get("Z"));
                    }
                } finally {
                    // Always release the store so the forked thread and shutdown can proceed.
                    delay.complete(null);
                }
            }
        });
    }

    /** Runs the end-to-end put/remove scenario with passivation switched on. */
    public void testEndToEndPutRemovePassivation() throws Exception {
        final boolean passivation = true;
        doTestEndToEndPutRemove(passivation);
    }

    /** Runs the end-to-end put/remove scenario with passivation switched off. */
    public void testEndToEndPutRemove() throws Exception {
        final boolean passivation = false;
        doTestEndToEndPutRemove(passivation);
    }

    /**
     * End-to-end scenario: with a one-entry cache backed by a delayed async store, a remove
     * issued from a forked thread must not complete while the store is delayed.
     * <p>
     * The delay future is declared as {@code CompletableFuture<Void>}, so it is released with
     * {@code complete(null)}; completing it with a {@code Boolean} does not type-check against
     * that declaration.
     *
     * @param z whether the cache is configured with passivation
     */
    private void doTestEndToEndPutRemove(final boolean z) throws Exception {
        TestingUtil.withCacheManager(new OneEntryCacheManagerCallable(z) {
            @Override
            public void call() throws InterruptedException {
                this.cache.put("X", "1");
                this.cache.put("Y", "1");
                // Wait until "X" has reached the underlying store.
                AsyncStoreTest.this.eventually(() -> {
                    return this.store.loadEntry("X") != null;
                });

                // From here on every store write/delete is held back until the delay completes.
                CompletableFuture<Void> delay = new CompletableFuture<>();
                this.store.delayedFuture = delay;
                try {
                    this.cache.put("replicating", "completes, but replication is stuck on delayed Future");
                    if (!z) {
                        this.cache.put("in-queue", "completes, but waiting on previous replication to complete before replicating");
                    }
                    Future<?> blockedRemove = AsyncStoreTest.this.fork(() -> {
                        this.cache.remove("X");
                    });
                    // The forked remove is expected to still be pending after 100 ms.
                    Exceptions.expectException(TimeoutException.class, () -> {
                        blockedRemove.get(100L, TimeUnit.MILLISECONDS);
                    });
                } finally {
                    // Always release the store so the forked thread and shutdown can proceed.
                    delay.complete(null);
                }
            }
        });
    }
}
