package org.infinispan.persistence.manager;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.commons.test.Exceptions;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.marshall.persistence.impl.MarshalledEntryUtil;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.support.DelayStore;
import org.infinispan.persistence.support.FailStore;
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.TestException;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.util.concurrent.CompletionStages;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/**
 * Tests for {@link PersistenceManager} publishing, stopping and store-failure behavior.
 *
 * <p>The cache built by {@link #createCacheManager()} is configured with three stores, in order:
 * a {@link DelayStore} (index 0) followed by two {@link FailStore}s (indexes 1 and 2).
 */
@CleanupAfterMethod
@Test(groups = {"unit"}, testName = "persistence.PersistenceManagerTest")
public class PersistenceManagerTest extends SingleCacheManagerTest {

    /**
     * Publishing entries after the manager has been stopped must not deliver any entry to the
     * subscriber, even though an entry was written before the stop.
     */
    public void testPublishAfterStop() {
        PersistenceManager persistenceManager = TestingUtil.extractComponent(this.cache, PersistenceManager.class);
        KeyPartitioner keyPartitioner = TestingUtil.extractComponent(this.cache, KeyPartitioner.class);
        insertEntry(persistenceManager, keyPartitioner, "k", "v");
        persistenceManager.stop();
        Flowable.fromPublisher(persistenceManager.publishEntries(true, true))
                .blockingSubscribe(entry -> AssertJUnit.fail("shouldn't run"));
    }

    /**
     * Starts a publish with zero initial demand, invokes {@code stop()} concurrently, and checks
     * that neither the stop nor the publish completes until the {@link DelayStore} delay is
     * ended; afterwards the subscriber must have received all three inserted entries.
     *
     * @throws ExecutionException   if either forked task fails
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws TimeoutException     if a forked task does not finish within 30 seconds
     */
    public void testStopDuringPublish() throws ExecutionException, InterruptedException, TimeoutException {
        PersistenceManager persistenceManager = TestingUtil.extractComponent(this.cache, PersistenceManager.class);
        KeyPartitioner keyPartitioner = TestingUtil.extractComponent(this.cache, KeyPartitioner.class);
        insertEntry(persistenceManager, keyPartitioner, "k1", "v1");
        insertEntry(persistenceManager, keyPartitioner, "k2", "v2");
        insertEntry(persistenceManager, keyPartitioner, "k3", "v3");

        DelayStore delayStore = (DelayStore) TestingUtil.getFirstStore(this.cache);
        // NOTE(review): presumably makes the store hold back emissions until endDelay() is
        // called - confirm against DelayStore.
        delayStore.delayBeforeEmit(1);

        CountDownLatch subscribed = new CountDownLatch(1);
        CountDownLatch stopRequested = new CountDownLatch(1);

        Future<Integer> publishFuture = fork(() -> {
            // Subscribe with no initial demand so nothing is requested before stop() is invoked.
            TestSubscriber<Object> subscriber = TestSubscriber.create(0L);
            Flowable.fromPublisher(persistenceManager.publishEntries(true, true)).subscribe(subscriber);
            subscribed.countDown();
            AssertJUnit.assertTrue(stopRequested.await(10L, TimeUnit.SECONDS));
            // Request every element only after the stop has been initiated.
            subscriber.request(Long.MAX_VALUE);
            subscriber.await(10L, TimeUnit.SECONDS);
            subscriber.assertNoErrors();
            subscriber.assertComplete();
            return Integer.valueOf(subscriber.values().size());
        });

        AssertJUnit.assertTrue(subscribed.await(30L, TimeUnit.SECONDS));
        Future<Void> stopFuture = fork(persistenceManager::stop);

        // Neither the stop nor the publish may finish while the subscription is still pending.
        Thread.sleep(50L);
        AssertJUnit.assertFalse(stopFuture.isDone());
        AssertJUnit.assertFalse(publishFuture.isDone());

        // Let the subscriber request its elements; the store delay must still hold both back.
        stopRequested.countDown();
        Thread.sleep(50L);
        AssertJUnit.assertFalse(stopFuture.isDone());
        AssertJUnit.assertFalse(publishFuture.isDone());

        delayStore.endDelay();
        Integer published = publishFuture.get(30L, TimeUnit.SECONDS);
        stopFuture.get(30L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals(3, published.intValue());
    }

    /**
     * Checks that {@code cache.isEmpty()} on a populated cache does not leave any lock held in
     * the {@link PersistenceManagerImpl}, both before and after the call.
     */
    public void testEarlyTerminatedPublish() {
        PersistenceManager persistenceManager = TestingUtil.extractComponent(this.cache, PersistenceManager.class);
        KeyPartitioner keyPartitioner = TestingUtil.extractComponent(this.cache, KeyPartitioner.class);
        // NOTE(review): 140 presumably chosen to exceed RxJava's default Flowable buffer size
        // (128) so that more than one chunk would be needed to drain the store - confirm.
        for (int i = 0; i < 140; i++) {
            insertEntry(persistenceManager, keyPartitioner, "k" + i, "v");
        }
        DelayStore delayStore = (DelayStore) TestingUtil.getFirstStore(this.cache);
        delayStore.delayBeforeEmit(1);

        // anyLocksHeld() is only reachable through the implementation type, hence the downcast.
        PersistenceManagerImpl persistenceManagerImpl = (PersistenceManagerImpl) persistenceManager;
        AssertJUnit.assertFalse(persistenceManagerImpl.anyLocksHeld());
        AssertJUnit.assertFalse(this.cache.isEmpty());
        AssertJUnit.assertFalse(persistenceManagerImpl.anyLocksHeld());
        delayStore.endDelay();
    }

    /**
     * With the store at index 1 set up to fail modifications, a write and a delete through the
     * manager must both complete exceptionally with {@link TestException}, while the store at
     * index 0 must still have had the write and the delete applied.
     */
    public void testStoreExceptionInWrite() {
        PersistenceManager persistenceManager = TestingUtil.extractComponent(this.cache, PersistenceManager.class);
        KeyPartitioner keyPartitioner = TestingUtil.extractComponent(this.cache, KeyPartitioner.class);
        DelayStore delayStore = (DelayStore) TestingUtil.getStore(this.cache, 0, true);
        FailStore failStore = (FailStore) TestingUtil.getStore(this.cache, 1, true);
        // NOTE(review): the argument is presumably the number of modifications to fail (one
        // write plus one delete below) - confirm against FailStore.
        failStore.failModification(2);

        int segment = keyPartitioner.getSegment("k");
        Exceptions.expectCompletionException(TestException.class,
                persistenceManager.writeToAllNonTxStores(MarshalledEntryUtil.create("k", "v", this.cache), segment, PersistenceManager.AccessMode.BOTH));
        AssertJUnit.assertTrue(delayStore.contains("k"));

        Exceptions.expectCompletionException(TestException.class,
                persistenceManager.deleteFromAllStores("k", segment, PersistenceManager.AccessMode.BOTH));
        AssertJUnit.assertFalse(delayStore.contains("k"));
    }

    /**
     * Builds a standalone cache manager whose default cache has one {@link DelayStore} followed
     * by two {@link FailStore}s. The tests above rely on this store order.
     *
     * @return the cache manager used by every test in this class
     */
    @Override // org.infinispan.test.SingleCacheManagerTest
    protected EmbeddedCacheManager createCacheManager() {
        ConfigurationBuilder config = getDefaultStandaloneCacheConfig(true);
        config.persistence().addStore(DelayStore.ConfigurationBuilder.class);
        config.persistence().addStore(FailStore.ConfigurationBuilder.class);
        config.persistence().addStore(FailStore.ConfigurationBuilder.class);
        return TestCacheManagerFactory.createCacheManager(config);
    }

    /**
     * Writes a single entry to all non-transactional stores and blocks until the write finishes.
     *
     * @param persistenceManager manager to write through
     * @param keyPartitioner     used to compute the segment of {@code str}
     * @param str                entry key
     * @param str2               entry value
     */
    private void insertEntry(PersistenceManager persistenceManager, KeyPartitioner keyPartitioner, String str, String str2) {
        int segment = keyPartitioner.getSegment(str);
        CompletionStages.join(persistenceManager.writeToAllNonTxStores(
                MarshalledEntryUtil.create(str, str2, this.cache), segment, PersistenceManager.AccessMode.BOTH));
    }
}
