package org.infinispan.statetransfer;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.Cache;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.interceptors.BaseAsyncInterceptor;
import org.infinispan.interceptors.impl.InvocationContextInterceptor;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.transaction.lookup.EmbeddedTransactionManagerLookup;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@CleanupAfterMethod
@Test(groups = {"functional"}, testName = "statetransfer.InitialStateTransferCompletionTest")
/* loaded from: input_file:org/infinispan/statetransfer/InitialStateTransferCompletionTest.class */
public class InitialStateTransferCompletionTest extends MultipleCacheManagersTest {
    private ConfigurationBuilder cacheConfigBuilder;

    /* loaded from: input_file:org/infinispan/statetransfer/InitialStateTransferCompletionTest$CountInterceptor.class */
    static class CountInterceptor extends BaseAsyncInterceptor {
        private final AtomicBoolean ignoreFurtherStateTransfer;
        private final AtomicInteger transferredKeys;

        public CountInterceptor(AtomicBoolean atomicBoolean, AtomicInteger atomicInteger) {
            this.ignoreFurtherStateTransfer = atomicBoolean;
            this.transferredKeys = atomicInteger;
        }

        public Object visitCommand(InvocationContext invocationContext, VisitableCommand visitableCommand) throws Throwable {
            if (!(visitableCommand instanceof PutKeyValueCommand) || !((PutKeyValueCommand) visitableCommand).hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) {
                return invokeNext(invocationContext, visitableCommand);
            }
            if (this.ignoreFurtherStateTransfer.get()) {
                return null;
            }
            return invokeNextThenAccept(invocationContext, visitableCommand, (invocationContext2, visitableCommand2, obj) -> {
                this.transferredKeys.incrementAndGet();
            });
        }
    }

    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        this.cacheConfigBuilder = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, true, true);
        this.cacheConfigBuilder.transaction().transactionMode(TransactionMode.TRANSACTIONAL).transactionManagerLookup(new EmbeddedTransactionManagerLookup()).lockingMode(LockingMode.PESSIMISTIC).clustering().hash().numOwners(10).stateTransfer().fetchInMemoryState(true).awaitInitialTransfer(true);
        createCluster(this.cacheConfigBuilder, 2);
        waitForClusterToForm();
    }

    public void testStateTransferCompletion() throws Exception {
        Cache cache = cache(0);
        for (int i = 0; i < 100; i++) {
            cache.put("k" + i, "v" + i);
        }
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        AtomicInteger atomicInteger = new AtomicInteger();
        this.cacheConfigBuilder.customInterceptors().addInterceptor().before(InvocationContextInterceptor.class).interceptor(new CountInterceptor(atomicBoolean, atomicInteger));
        log.trace("Adding new member ...");
        addClusterEnabledCacheManager(this.cacheConfigBuilder);
        Cache cache2 = cache(2);
        atomicBoolean.set(true);
        log.trace("Successfully added a new member");
        AssertJUnit.assertEquals(100, atomicInteger.get());
        LocalizedCacheTopology cacheTopology = cache2.getAdvancedCache().getDistributionManager().getCacheTopology();
        AssertJUnit.assertNull(cacheTopology.getPendingCH());
        AssertJUnit.assertTrue(cacheTopology.getReadConsistentHash().getMembers().contains(address(2)));
        DataContainer dataContainer = cache(2).getAdvancedCache().getDataContainer();
        AssertJUnit.assertEquals(100, dataContainer.size());
        for (int i2 = 0; i2 < 100; i2++) {
            String str = "k" + i2;
            AssertJUnit.assertTrue(cacheTopology.isReadOwner(str));
            InternalCacheEntry internalCacheEntry = dataContainer.get(str);
            AssertJUnit.assertNotNull(internalCacheEntry);
            AssertJUnit.assertEquals("v" + i2, internalCacheEntry.getValue());
        }
    }
}
