package org.infinispan.conflict.impl;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.commands.topology.TopologyUpdateCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.distribution.MagicKey;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.inboundhandler.AbstractDelegatingHandler;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.InboundInvocationHandler;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.inboundhandler.Reply;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestInternalCacheEntryFactory;
import org.infinispan.topology.CacheTopology;
import org.infinispan.xsite.XSiteReplicateCommand;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "conflict.impl.OperationsDuringMergeConflictTest")
/* loaded from: input_file:org/infinispan/conflict/impl/OperationsDuringMergeConflictTest.class */
public class OperationsDuringMergeConflictTest extends BaseMergePolicyTest {
    // Value written to the conflict key via the first node of partition 0 during the split.
    private static final String PARTITION_0_VAL = "A";
    // Value written to the conflict key via the first node of partition 1 during the split.
    private static final String PARTITION_1_VAL = "B";
    // Value of the entry produced by this test's merge policy.
    private static final String MERGE_RESULT = "C";
    // Value written by the PUT merge action while the merge is in progress.
    private static final String PUT_RESULT = "D";
    // Operation performed during the merge; only assigned by the MergeAction constructor,
    // so it stays null for instances created through the no-arg constructor.
    private MergeAction mergeAction;

    /* loaded from: input_file:org/infinispan/conflict/impl/OperationsDuringMergeConflictTest$BlockStateResponseCommandHandler.class */
    private class BlockStateResponseCommandHandler extends AbstractDelegatingHandler {
        final CountDownLatch latch;

        BlockStateResponseCommandHandler(PerCacheInboundInvocationHandler perCacheInboundInvocationHandler, CountDownLatch countDownLatch) {
            super(perCacheInboundInvocationHandler);
            this.latch = countDownLatch;
        }

        public void handle(CacheRpcCommand cacheRpcCommand, Reply reply, DeliverOrder deliverOrder) {
            if (cacheRpcCommand instanceof StateResponseCommand) {
                OperationsDuringMergeConflictTest.this.awaitLatch(this.latch);
            }
            this.delegate.handle(cacheRpcCommand, reply, deliverOrder);
        }
    }

    /* loaded from: input_file:org/infinispan/conflict/impl/OperationsDuringMergeConflictTest$BlockingInboundInvocationHandler.class */
    private class BlockingInboundInvocationHandler implements InboundInvocationHandler {
        final InboundInvocationHandler delegate;
        final CountDownLatch latch;

        BlockingInboundInvocationHandler(InboundInvocationHandler inboundInvocationHandler, CountDownLatch countDownLatch) {
            this.delegate = inboundInvocationHandler;
            this.latch = countDownLatch;
        }

        public void handleFromCluster(Address address, ReplicableCommand replicableCommand, Reply reply, DeliverOrder deliverOrder) {
            if ((replicableCommand instanceof TopologyUpdateCommand) && ((TopologyUpdateCommand) replicableCommand).getPhase() == CacheTopology.Phase.READ_OLD_WRITE_ALL) {
                OperationsDuringMergeConflictTest.this.awaitLatch(this.latch);
            }
            this.delegate.handleFromCluster(address, replicableCommand, reply, deliverOrder);
        }

        public void handleFromRemoteSite(String str, XSiteReplicateCommand xSiteReplicateCommand, Reply reply, DeliverOrder deliverOrder) {
            this.delegate.handleFromRemoteSite(str, xSiteReplicateCommand, reply, deliverOrder);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/conflict/impl/OperationsDuringMergeConflictTest$MergeAction.class */
    /**
     * The operation applied to the conflict key while the merge is in progress, paired with the
     * value every cache is expected to return for that key once the merge has completed.
     */
    public enum MergeAction {
        /** A put of {@code PUT_RESULT} is issued during the merge. */
        PUT(OperationsDuringMergeConflictTest.PUT_RESULT),
        /** The key is removed during the merge, so no value ({@code null}) is expected. */
        REMOVE(null),
        /** Nothing is written during the merge; the merge policy's result is expected. */
        NONE(OperationsDuringMergeConflictTest.MERGE_RESULT);

        /**
         * Expected value of the conflict key after the merge; {@code null} for {@link #REMOVE}.
         * Final because enum constants are shared singletons and must not be mutated.
         */
        final String value;

        MergeAction(String str) {
            this.value = str;
        }
    }

    /**
     * Creates one test instance per {@link MergeAction}, in the order NONE, PUT, REMOVE.
     *
     * @return the three configured test instances
     */
    @Override // org.infinispan.test.MultipleCacheManagersTest
    public Object[] factory() {
        Object[] instances = {
            new OperationsDuringMergeConflictTest(MergeAction.NONE),
            new OperationsDuringMergeConflictTest(MergeAction.PUT),
            new OperationsDuringMergeConflictTest(MergeAction.REMOVE),
        };
        return instances;
    }

    /**
     * No-argument constructor; leaves {@code mergeAction} unset. The configured instances are
     * created by {@link #factory()} through the {@link MergeAction} constructor.
     */
    public OperationsDuringMergeConflictTest() {
        super();
    }

    /**
     * Creates a DIST_SYNC test instance that performs the given action while the merge is running.
     * The two int arrays passed to the superclass are presumably the node indexes of the two
     * partitions (confirm against {@code BaseMergePolicyTest}).
     *
     * @param mergeAction the operation to perform during the merge; also supplies the
     *                    description and the value expected after the merge
     */
    public OperationsDuringMergeConflictTest(MergeAction mergeAction) {
        super(CacheMode.DIST_SYNC, null, new int[]{0, 1}, new int[]{2, 3});
        // The merge policy always resolves the conflict to MERGE_RESULT; conflictKey is read
        // when the policy runs, not here, because it is only assigned in beforeSplit().
        this.mergePolicy = (preferredEntry, otherEntries) ->
                TestInternalCacheEntryFactory.create(this.conflictKey, MERGE_RESULT);

        this.description = mergeAction.toString();
        this.mergeAction = mergeAction;
        this.valueAfterMerge = mergeAction.value;
    }

    /**
     * Picks the conflict key before the cluster is split: a {@link MagicKey} built from the first
     * node of partition 0 and the first node of partition 1, so that both partitions hold it.
     */
    @Override // org.infinispan.conflict.impl.BaseMergePolicyTest
    protected void beforeSplit() {
        Cache<?, ?> firstOwner = cache(p0.node(0));
        Cache<?, ?> secondOwner = cache(p1.node(0));
        conflictKey = new MagicKey(firstOwner, secondOwner);
    }

    /**
     * Creates the conflict while the cluster is split: each partition writes its own value for
     * the conflict key, then every node is checked to see its own partition's value.
     * The two cache parameters are not used by this implementation.
     */
    @Override // org.infinispan.conflict.impl.BaseMergePolicyTest
    protected void duringSplit(AdvancedCache advancedCache, AdvancedCache advancedCache2) {
        // Diverging writes, one per partition.
        cache(p0.node(0)).put(conflictKey, PARTITION_0_VAL);
        cache(p1.node(0)).put(conflictKey, PARTITION_1_VAL);

        // Each side must only observe its own write.
        assertCacheGet(conflictKey, PARTITION_0_VAL, p0.getNodes());
        assertCacheGet(conflictKey, PARTITION_1_VAL, p1.getNodes());
    }

    /**
     * Merges the two partitions while state responses and READ_OLD_WRITE_ALL topology updates are
     * held back, optionally modifies the conflict key during conflict resolution, then releases
     * the blocked commands and verifies the final value on every cache.
     *
     * <p>Both latches are always released on exit so that no handler thread stays blocked when an
     * assertion or wait fails part-way through.
     */
    @Override // org.infinispan.conflict.impl.BaseMergePolicyTest
    protected void performMerge() {
        boolean modifyDuringMerge = mergeAction != MergeAction.NONE;
        // Gates StateResponseCommand delivery on every cache.
        CountDownLatch stateResponseLatch = new CountDownLatch(1);
        // Gates READ_OLD_WRITE_ALL topology updates on every cache manager.
        CountDownLatch topologyUpdateLatch = new CountDownLatch(1);
        try {
            IntStream.range(0, numMembersInCluster).forEach(i -> {
                TestingUtil.wrapInboundInvocationHandler(cache(i),
                        handler -> new BlockStateResponseCommandHandler(handler, stateResponseLatch));
                EmbeddedCacheManager manager = mo192manager(i);
                InboundInvocationHandler globalHandler =
                        TestingUtil.extractGlobalComponent(manager, InboundInvocationHandler.class);
                // Register under the interface type; the replacement is a subtype, so no cast is needed.
                TestingUtil.replaceComponent(manager, InboundInvocationHandler.class,
                        new BlockingInboundInvocationHandler(globalHandler, topologyUpdateLatch), true);
            });

            // Each partition still sees its own value immediately before and after the merge starts.
            assertCacheGet(conflictKey, PARTITION_0_VAL, p0.getNodes());
            assertCacheGet(conflictKey, PARTITION_1_VAL, p1.getNodes());
            partition(0).merge(partition(1), false);
            assertCacheGet(conflictKey, PARTITION_0_VAL, p0.getNodes());
            assertCacheGet(conflictKey, PARTITION_1_VAL, p1.getNodes());

            if (modifyDuringMerge) {
                // Only touch the key once every member has reached the CONFLICT_RESOLUTION phase.
                List<Address> members = caches().stream()
                        .map(c -> c.getCacheManager().getAddress())
                        .collect(Collectors.toList());
                TestingUtil.waitForTopologyPhase(members, CacheTopology.Phase.CONFLICT_RESOLUTION,
                        caches().toArray(new Cache[numMembersInCluster]));
                if (mergeAction == MergeAction.PUT) {
                    cache(0).put(conflictKey, mergeAction.value);
                } else {
                    cache(0).remove(conflictKey);
                }
            }

            // Let the blocked commands through, then wait for the rebalance to finish.
            stateResponseLatch.countDown();
            topologyUpdateLatch.countDown();
            TestingUtil.waitForNoRebalance(caches());
            assertCacheGetValAllCaches(mergeAction);
        } finally {
            // No-ops on the success path (already at zero); unblock handlers on failure.
            stateResponseLatch.countDown();
            topologyUpdateLatch.countDown();
        }
    }

    /**
     * Asserts that every cache index returned by {@code cacheIndexes()} yields the value
     * associated with the given merge action for the conflict key.
     *
     * @param mergeAction the action whose {@code value} is the expected result
     */
    private void assertCacheGetValAllCaches(MergeAction mergeAction) {
        String expectedValue = mergeAction.value;
        assertCacheGet(conflictKey, expectedValue, cacheIndexes());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Blocks the calling thread until the latch reaches zero, failing the test if that does not
     * happen within 120 seconds or if the thread is interrupted while waiting.
     *
     * @param countDownLatch the latch to wait on
     */
    public void awaitLatch(CountDownLatch countDownLatch) {
        try {
            if (!countDownLatch.await(120L, TimeUnit.SECONDS)) {
                AssertJUnit.fail("CountDownLatch await timedout");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so the owning thread/executor can still observe it
            // after the assertion failure propagates.
            Thread.currentThread().interrupt();
            AssertJUnit.fail("CountDownLatch Interrupted");
        }
    }
}
