package org.infinispan.persistence;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.infinispan.commons.test.CommonsTestingUtil;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.marshall.persistence.impl.MarshalledEntryUtil;
import org.infinispan.metadata.Metadata;
import org.infinispan.persistence.support.WaitNonBlockingStore;
import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.TestDataSCI;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.util.concurrent.WithinThreadExecutor;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "persistence.ParallelIterationTest")
/* loaded from: input_file:org/infinispan/persistence/ParallelIterationTest.class */
public abstract class ParallelIterationTest extends SingleCacheManagerTest {
    private static final int NUM_THREADS = 10;
    private static final int NUM_ENTRIES = 200;
    protected WaitNonBlockingStore<Object, Object> store;
    protected ExecutorService executor;
    protected IntSet allSegments;

    @Override // org.infinispan.test.SingleCacheManagerTest
    protected EmbeddedCacheManager createCacheManager() throws Exception {
        ConfigurationBuilder defaultStandaloneCacheConfig = getDefaultStandaloneCacheConfig(false);
        configurePersistence(defaultStandaloneCacheConfig);
        GlobalConfigurationBuilder globalConfigurationBuilder = new GlobalConfigurationBuilder();
        globalConfigurationBuilder.globalState().persistentLocation(CommonsTestingUtil.tmpDirectory(getClass()));
        globalConfigurationBuilder.serialization().addContextInitializer(getSerializationContextInitializer());
        EmbeddedCacheManager createCacheManager = TestCacheManagerFactory.createCacheManager(globalConfigurationBuilder, defaultStandaloneCacheConfig);
        this.store = TestingUtil.getFirstStore(createCacheManager.getCache());
        this.executor = testExecutor();
        this.allSegments = IntSets.immutableRangeSet(createCacheManager.getCache().getCacheConfiguration().clustering().hash().numSegments());
        return createCacheManager;
    }

    protected abstract void configurePersistence(ConfigurationBuilder configurationBuilder);

    protected SerializationContextInitializer getSerializationContextInitializer() {
        return TestDataSCI.INSTANCE;
    }

    public void testParallelIterationWithValue() {
        runIterationTest(this.executor, true);
    }

    public void testSequentialIterationWithValue() {
        runIterationTest(new WithinThreadExecutor(), true);
    }

    public void testParallelIterationWithoutValue() {
        runIterationTest(this.executor, false);
    }

    public void testSequentialIterationWithoutValue() {
        runIterationTest(new WithinThreadExecutor(), false);
    }

    private void runIterationTest(Executor executor, boolean z) {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        ConcurrentHashMap concurrentHashMap2 = new ConcurrentHashMap();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        Assert.assertEquals(this.store.sizeWait(this.allSegments), 0L);
        insertData();
        Flowable doOnNext = Flowable.fromPublisher(this.store.publishEntries(this.allSegments, null, z)).doOnNext(marshallableEntry -> {
            Integer num;
            Integer unwrapKey = unwrapKey(marshallableEntry.getKey());
            if (z && (num = (Integer) concurrentHashMap.put(unwrapKey, unwrapValue(marshallableEntry.getValue()))) != null) {
                this.log.warnf("Already a value present for key %s: %s", unwrapKey, num);
                atomicBoolean.set(true);
            }
            if (marshallableEntry.getMetadata() == null) {
                this.log.tracef("No metadata found for key %d", unwrapKey);
                return;
            }
            this.log.tracef("For key %d found metadata %s", unwrapKey, marshallableEntry.getMetadata());
            Metadata metadata = (Metadata) concurrentHashMap2.put(unwrapKey, marshallableEntry.getMetadata());
            if (metadata != null) {
                this.log.warnf("Already a metadata present for key %s: %s", unwrapKey, metadata);
                atomicBoolean.set(true);
            }
        });
        TestSubscriber create = TestSubscriber.create(0L);
        doOnNext.subscribe(create);
        int i = 10;
        for (int i2 = 0; i2 < (NUM_ENTRIES / 10) - 1; i2++) {
            executor.execute(() -> {
                create.request(i);
            });
        }
        create.awaitCount(NUM_ENTRIES - 10);
        create.request(10 + 1);
        create.awaitDone(10L, TimeUnit.SECONDS);
        create.assertNoErrors();
        Assert.assertEquals(NUM_ENTRIES, create.values().size());
        Assert.assertFalse(atomicBoolean.get());
        for (int i3 = 0; i3 < NUM_ENTRIES; i3++) {
            if (z) {
                Assert.assertEquals(concurrentHashMap.get(Integer.valueOf(i3)), Integer.valueOf(i3), "For key " + i3);
            } else {
                Assert.assertNull(concurrentHashMap.get(Integer.valueOf(i3)), "For key " + i3);
            }
            if (hasMetadata(z, i3)) {
                Assert.assertNotNull(concurrentHashMap2.get(Integer.valueOf(i3)), "For key " + i3);
                Assert.assertEquals(((Metadata) concurrentHashMap2.get(Integer.valueOf(i3))).lifespan(), lifespan(i3), "For key " + i3);
                Assert.assertEquals(((Metadata) concurrentHashMap2.get(Integer.valueOf(i3))).maxIdle(), maxIdle(i3), "For key " + i3);
            } else {
                assertMetadataEmpty((Metadata) concurrentHashMap2.get(Integer.valueOf(i3)));
            }
        }
    }

    private void insertData() {
        for (int i = 0; i < NUM_ENTRIES; i++) {
            long currentTimeMillis = System.currentTimeMillis();
            this.store.write(MarshalledEntryUtil.create(wrapKey(i), wrapValue(i, i), insertMetadata(i) ? TestingUtil.metadata(Long.valueOf(lifespan(i)), Long.valueOf(maxIdle(i))) : null, currentTimeMillis, currentTimeMillis, this.cache));
        }
    }

    protected void assertMetadataEmpty(Metadata metadata) {
        Assert.assertNull(metadata);
    }

    protected boolean insertMetadata(int i) {
        return i % 2 == 0;
    }

    protected boolean hasMetadata(boolean z, int i) {
        return insertMetadata(i);
    }

    protected long lifespan(int i) {
        return 1000 * (i + 1000);
    }

    protected long maxIdle(int i) {
        return 10000 * (i + 1000);
    }

    protected Object wrapKey(int i) {
        return Integer.valueOf(i);
    }

    protected Integer unwrapKey(Object obj) {
        return (Integer) obj;
    }

    protected Object wrapValue(int i, int i2) {
        return Integer.valueOf(i2);
    }

    protected Integer unwrapValue(Object obj) {
        return (Integer) obj;
    }
}
