package org.infinispan.statetransfer;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.MagicKey;
import org.infinispan.remoting.inboundhandler.AbstractDelegatingHandler;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.inboundhandler.Reply;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.UnsureResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.concurrent.StateSequencer;
import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
import org.infinispan.util.AbstractControlledRpcManager;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "statetransfer.TxReplay3Test")
/* loaded from: input_file:org/infinispan/statetransfer/TxReplay3Test.class */
public class TxReplay3Test extends MultipleCacheManagersTest {
    private static final String VALUE_1 = "v1";
    private static final String VALUE_2 = "v2";
    private static final String TX1_LOCKED = "tx1:acquired_lock";
    private static final String TX1_UNSURE = "tx1:unsure_response";
    private static final String TX2_PENDING = "tx2:waiting_tx1";
    private static final String MAIN_ADVANCE = "main:advance";
    private static final String JOIN_NEW_NODE = "join:add_new_node";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/statetransfer/TxReplay3Test$Handler.class */
    public static class Handler extends AbstractDelegatingHandler {
        private static final Log log = LogFactory.getLog(Handler.class);
        private final StateSequencer sequencer;
        private volatile boolean triggered;
        private volatile Address origin;

        public Handler(PerCacheInboundInvocationHandler perCacheInboundInvocationHandler, StateSequencer stateSequencer) {
            super(perCacheInboundInvocationHandler);
            this.triggered = false;
            this.sequencer = stateSequencer;
        }

        public void setOrigin(Address address) {
            this.origin = address;
        }

        protected boolean beforeHandle(CacheRpcCommand cacheRpcCommand, Reply reply, DeliverOrder deliverOrder) {
            log.debugf("Before invoking %s. expected origin=%s", cacheRpcCommand, this.origin);
            return super.beforeHandle(cacheRpcCommand, reply, deliverOrder);
        }

        protected void afterHandle(CacheRpcCommand cacheRpcCommand, DeliverOrder deliverOrder, boolean z) {
            super.afterHandle(cacheRpcCommand, deliverOrder, z);
            log.debugf("After invoking %s. expected origin=%s", cacheRpcCommand, this.origin);
            if (!this.triggered && (cacheRpcCommand instanceof PrepareCommand) && cacheRpcCommand.getOrigin().equals(this.origin)) {
                log.debugf("Triggering %s.", TxReplay3Test.TX2_PENDING);
                this.triggered = true;
                try {
                    this.sequencer.advance(TxReplay3Test.TX2_PENDING);
                } catch (InterruptedException | TimeoutException e) {
                    throw new CacheException(e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/statetransfer/TxReplay3Test$UnsureResponseRpcManager.class */
    public static class UnsureResponseRpcManager extends AbstractControlledRpcManager {
        private static final Log log = LogFactory.getLog(UnsureResponseRpcManager.class);
        private final StateSequencer sequencer;
        private volatile boolean triggered;

        public UnsureResponseRpcManager(RpcManager rpcManager, StateSequencer stateSequencer) {
            super(rpcManager);
            this.triggered = false;
            this.sequencer = stateSequencer;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.infinispan.util.AbstractControlledRpcManager
        public void beforeInvokeRemotely(ReplicableCommand replicableCommand) {
            super.beforeInvokeRemotely(replicableCommand);
            log.debugf("Before invoke remotely %s", replicableCommand);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.infinispan.util.AbstractControlledRpcManager
        public Map<Address, Response> afterInvokeRemotely(ReplicableCommand replicableCommand, Map<Address, Response> map) {
            Map<Address, Response> afterInvokeRemotely = super.afterInvokeRemotely(replicableCommand, map);
            log.debugf("After invoke remotely %s. Responses=%s", replicableCommand, afterInvokeRemotely);
            if (!this.triggered && (replicableCommand instanceof PrepareCommand)) {
                log.debugf("Triggering %s and %s", TxReplay3Test.TX1_LOCKED, TxReplay3Test.TX1_UNSURE);
                this.triggered = true;
                try {
                    this.sequencer.advance(TxReplay3Test.TX1_LOCKED);
                    this.sequencer.advance(TxReplay3Test.TX1_UNSURE);
                    Iterator<Map.Entry<Address, Response>> it = afterInvokeRemotely.entrySet().iterator();
                    while (it.hasNext()) {
                        it.next().setValue(UnsureResponse.INSTANCE);
                    }
                    log.debugf("After invoke remotely %s. New Responses=%s", replicableCommand, afterInvokeRemotely);
                } catch (InterruptedException | TimeoutException e) {
                    throw new CacheException(e);
                }
            }
            return afterInvokeRemotely;
        }
    }

    public void testReplay() throws Exception {
        MagicKey magicKey = new MagicKey("TxReplay3Test", (Cache<?, ?>) cache(0));
        StateSequencer stateSequencer = new StateSequencer();
        stateSequencer.logicalThread("tx1", TX1_LOCKED, TX1_UNSURE);
        stateSequencer.logicalThread("tx2", TX2_PENDING, new String[0]);
        stateSequencer.logicalThread("join", JOIN_NEW_NODE, new String[0]);
        stateSequencer.logicalThread("main", MAIN_ADVANCE, new String[0]);
        stateSequencer.order(TX1_LOCKED, MAIN_ADVANCE, TX2_PENDING, JOIN_NEW_NODE, TX1_UNSURE);
        TestingUtil.wrapComponent(cache(1), RpcManager.class, (cache, rpcManager) -> {
            return new UnsureResponseRpcManager(rpcManager, stateSequencer);
        }, true);
        TestingUtil.wrapPerCacheInboundInvocationHandler(cache(0), (cache2, perCacheInboundInvocationHandler) -> {
            return new Handler(perCacheInboundInvocationHandler, stateSequencer);
        }, true).setOrigin(address(cache(2)));
        Future fork = fork(() -> {
            cache(1).put(magicKey, VALUE_1);
            return null;
        });
        stateSequencer.advance(MAIN_ADVANCE);
        Future fork2 = fork(() -> {
            cache(2).put(magicKey, VALUE_2);
            return null;
        });
        stateSequencer.enter(JOIN_NEW_NODE);
        addClusterEnabledCacheManager(config()).getCache();
        waitForClusterToForm();
        stateSequencer.exit(JOIN_NEW_NODE);
        fork.get(30L, TimeUnit.SECONDS);
        fork2.get(30L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals(VALUE_2, cache(0).get(magicKey));
    }

    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected void createCacheManagers() throws Throwable {
        createClusteredCaches(3, config());
    }

    private static ConfigurationBuilder config() {
        ConfigurationBuilder defaultClusteredCacheConfig = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, true);
        defaultClusteredCacheConfig.transaction().useSynchronization(false).transactionManagerLookup(new DummyTransactionManagerLookup()).recovery().disable();
        defaultClusteredCacheConfig.locking().lockAcquisitionTimeout(1L, TimeUnit.MINUTES);
        defaultClusteredCacheConfig.clustering().sync().replTimeout(1L, TimeUnit.MINUTES).hash().numOwners(1).numSegments(1).consistentHashFactory(new ControlledConsistentHashFactory(0, new int[0])).stateTransfer().fetchInMemoryState(false);
        return defaultClusteredCacheConfig;
    }
}
