package org.infinispan.persistence;

import io.reactivex.Flowable;
import io.reactivex.internal.functions.Functions;
import io.reactivex.subscribers.TestSubscriber;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.marshall.persistence.impl.MarshalledEntryUtil;
import org.infinispan.persistence.dummy.DummyInMemoryStoreConfigurationBuilder;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.manager.PersistenceManagerImpl;
import org.infinispan.test.Exceptions;
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.concurrent.WithinThreadExecutor;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@CleanupAfterMethod
@Test(groups = {"unit"}, testName = "persistence.PersistenceManagerTest")
/* loaded from: input_file:org/infinispan/persistence/PersistenceManagerTest.class */
public class PersistenceManagerTest extends SingleCacheManagerTest {
    public void testProcessAfterStop() {
        PersistenceManager persistenceManager = (PersistenceManager) TestingUtil.extractComponent(this.cache, PersistenceManager.class);
        persistenceManager.writeToAllNonTxStores(MarshalledEntryUtil.create("k", "v", this.cache), ((KeyPartitioner) TestingUtil.extractComponent(this.cache, KeyPartitioner.class)).getSegment("k"), PersistenceManager.AccessMode.BOTH);
        persistenceManager.stop();
        Flowable.fromPublisher(persistenceManager.publishEntries(true, true)).subscribe(marshallableEntry -> {
            AssertJUnit.fail("shouldn't run");
        });
    }

    public void testStopDuringProcess() throws ExecutionException, InterruptedException, TimeoutException {
        PersistenceManager persistenceManager = (PersistenceManager) TestingUtil.extractComponent(this.cache, PersistenceManager.class);
        KeyPartitioner keyPartitioner = (KeyPartitioner) TestingUtil.extractComponent(this.cache, KeyPartitioner.class);
        CompletionStages.join(persistenceManager.writeToAllNonTxStores(MarshalledEntryUtil.create("k1", "v1", this.cache), keyPartitioner.getSegment("k1"), PersistenceManager.AccessMode.BOTH));
        CompletionStages.join(persistenceManager.writeToAllNonTxStores(MarshalledEntryUtil.create("k2", "v2", this.cache), keyPartitioner.getSegment("k2"), PersistenceManager.AccessMode.BOTH));
        CompletionStages.join(persistenceManager.writeToAllNonTxStores(MarshalledEntryUtil.create("k3", "v3", this.cache), keyPartitioner.getSegment("k3"), PersistenceManager.AccessMode.BOTH));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        Future fork = fork(() -> {
            TestSubscriber create = TestSubscriber.create(0L);
            Flowable.fromPublisher(persistenceManager.publishEntries(true, true)).subscribe(create);
            countDownLatch.countDown();
            AssertJUnit.assertTrue(countDownLatch2.await(10L, TimeUnit.SECONDS));
            create.request(Long.MAX_VALUE);
            create.await(10L, TimeUnit.SECONDS);
            create.assertNoErrors();
            create.assertComplete();
            return Integer.valueOf(create.valueCount());
        });
        AssertJUnit.assertTrue(countDownLatch.await(30L, TimeUnit.SECONDS));
        Objects.requireNonNull(persistenceManager);
        Future<Void> fork2 = fork(persistenceManager::stop);
        Exceptions.expectException(TimeoutException.class, () -> {
            fork2.get(50L, TimeUnit.MILLISECONDS);
        });
        countDownLatch2.countDown();
        Integer num = (Integer) fork.get(30L, TimeUnit.SECONDS);
        fork2.get(30L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals(3, num.intValue());
    }

    public void testEarlyTerminatedOperation() {
        PersistenceManagerImpl persistenceManagerImpl = (PersistenceManager) TestingUtil.extractComponent(this.cache, PersistenceManager.class);
        KeyPartitioner keyPartitioner = (KeyPartitioner) TestingUtil.extractComponent(this.cache, KeyPartitioner.class);
        for (int i = 0; i < 140; i++) {
            String str = "k" + i;
            persistenceManagerImpl.writeToAllNonTxStores(MarshalledEntryUtil.create(str, "v", this.cache), keyPartitioner.getSegment(str), PersistenceManager.AccessMode.BOTH);
        }
        PersistenceManagerImpl persistenceManagerImpl2 = persistenceManagerImpl;
        AssertJUnit.assertEquals(0, persistenceManagerImpl2.activePublisherInvocations());
        AssertJUnit.assertFalse(this.cache.isEmpty());
        AssertJUnit.assertEquals(0, persistenceManagerImpl2.activePublisherInvocations());
    }

    public void testPublishWithInterrupt() throws InterruptedException {
        ExecutorService executorService = (ExecutorService) TestingUtil.extractGlobalComponent(this.cache.getCacheManager(), ExecutorService.class, "org.infinispan.executors.blocking");
        TestingUtil.replaceComponent((CacheContainer) this.cache.getCacheManager(), (Class<WithinThreadExecutor>) ExecutorService.class, "org.infinispan.executors.blocking", new WithinThreadExecutor(), true);
        try {
            PersistenceManager persistenceManager = (PersistenceManager) TestingUtil.extractComponent(this.cache, PersistenceManager.class);
            persistenceManager.stop();
            persistenceManager.start();
            Thread.currentThread().interrupt();
            CountDownLatch countDownLatch = new CountDownLatch(1);
            AtomicReference atomicReference = new AtomicReference();
            Flowable.fromPublisher(persistenceManager.publishEntries(true, true)).subscribe(Functions.emptyConsumer(), th -> {
                atomicReference.set(th);
                countDownLatch.countDown();
            });
            countDownLatch.await(10L, TimeUnit.SECONDS);
            Exceptions.assertException(InterruptedException.class, (Throwable) atomicReference.get());
            TestingUtil.replaceComponent((CacheContainer) this.cache.getCacheManager(), (Class<ExecutorService>) ExecutorService.class, "org.infinispan.executors.blocking", executorService, true);
        } catch (Throwable th2) {
            TestingUtil.replaceComponent((CacheContainer) this.cache.getCacheManager(), (Class<ExecutorService>) ExecutorService.class, "org.infinispan.executors.blocking", executorService, true);
            throw th2;
        }
    }

    @Override // org.infinispan.test.SingleCacheManagerTest
    protected EmbeddedCacheManager createCacheManager() {
        ConfigurationBuilder defaultStandaloneCacheConfig = getDefaultStandaloneCacheConfig(true);
        defaultStandaloneCacheConfig.persistence().addStore(DummyInMemoryStoreConfigurationBuilder.class).slow(true);
        return TestCacheManagerFactory.createCacheManager(defaultStandaloneCacheConfig);
    }
}
