package org.infinispan.xsite.statetransfer.failures;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.BlockingLocalTopologyManager;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.xsite.BackupReceiver;
import org.infinispan.xsite.BackupReceiverDelegator;
import org.infinispan.xsite.statetransfer.XSiteState;
import org.infinispan.xsite.statetransfer.XSiteStatePushCommand;
import org.infinispan.xsite.statetransfer.failures.AbstractTopologyChangeTest;
import org.testng.annotations.Test;

@Test(groups = {"xsite", "unstable"}, testName = "xsite.statetransfer.failures.SiteConsumerTopologyChangeTest")
/* loaded from: input_file:org/infinispan/xsite/statetransfer/failures/SiteConsumerTopologyChangeTest.class */
public class SiteConsumerTopologyChangeTest extends AbstractTopologyChangeTest {

    /**
     * {@link BackupReceiver} wrapper that can stall the handling of incoming cross-site state chunks.
     *
     * <p>While the shared {@code discard} flag is {@code true}, the receiver records the primary owner of
     * every key it sees. Once {@link #PRIMARY_OWNERS_BEFORE_BLOCK} distinct primary owners have been
     * recorded, the next chunk triggers the {@code "before-block"} checkpoint event and waits for the
     * {@code "blocked"} event before it is handed to the delegate. When the flag is {@code false}, every
     * chunk is forwarded to the delegate immediately.
     */
    @Scope(Scopes.NAMED_CACHE)
    public static class BlockingBackupReceiver extends BackupReceiverDelegator {
        // NOTE(review): presumably the number of nodes in the receiving site — confirm against the test setup.
        private static final int PRIMARY_OWNERS_BEFORE_BLOCK = 3;

        // Primary owners seen so far; also used as the monitor guarding the blocking decision.
        private final Set<Address> addressSet;
        private final AtomicBoolean discard;
        private final CheckPoint checkPoint;

        @Inject
        DistributionManager manager;

        BlockingBackupReceiver(BackupReceiver backupReceiver, AtomicBoolean atomicBoolean, CheckPoint checkPoint) {
            super(backupReceiver);
            this.addressSet = new HashSet<>();
            this.discard = atomicBoolean;
            this.checkPoint = checkPoint;
        }

        /**
         * Forwards the state chunk to the delegate, optionally blocking first (see the class description).
         *
         * @param xSiteStatePushCommand the state chunk received from the remote site
         * @return the delegate's completion stage, or an exceptionally completed stage if waiting for the
         *         {@code "blocked"} event timed out or was interrupted
         */
        @Override
        public CompletionStage<Void> handleStateTransferState(XSiteStatePushCommand xSiteStatePushCommand) {
            if (!discard.get()) {
                return delegate.handleStateTransferState(xSiteStatePushCommand);
            }
            // The monitor is held for the whole decision, including the wait below, so concurrent
            // chunks queue up behind the blocked one.
            synchronized (addressSet) {
                if (addressSet.size() == PRIMARY_OWNERS_BEFORE_BLOCK) {
                    checkPoint.trigger("before-block");
                    try {
                        checkPoint.awaitStrict("blocked", 30L, TimeUnit.SECONDS);
                        return delegate.handleStateTransferState(xSiteStatePushCommand);
                    } catch (InterruptedException e) {
                        // Restore the interrupt status so the calling thread can still observe it.
                        Thread.currentThread().interrupt();
                        return CompletableFutures.completedExceptionFuture(e);
                    } catch (TimeoutException e) {
                        return CompletableFutures.completedExceptionFuture(e);
                    }
                }
                for (XSiteState xSiteState : xSiteStatePushCommand.getChunk()) {
                    addressSet.add(manager.getCacheTopology().getDistribution(xSiteState.key()).primary());
                }
                return delegate.handleStateTransferState(xSiteStatePushCommand);
            }
        }
    }

    /**
     * {@link BackupReceiver} wrapper that signals the {@code "before-chunk"} checkpoint event each time a
     * cross-site state chunk arrives, then hands the chunk to the wrapped receiver unchanged.
     */
    public static class NotifierBackupReceiver extends BackupReceiverDelegator {
        private final CheckPoint checkPoint;

        NotifierBackupReceiver(BackupReceiver backupReceiver, CheckPoint checkPoint) {
            super(backupReceiver);
            this.checkPoint = checkPoint;
        }

        /**
         * Triggers {@code "before-chunk"} and delegates the chunk.
         *
         * @param xSiteStatePushCommand the state chunk received from the remote site
         * @return the completion stage returned by the delegate
         */
        @Override
        public CompletionStage<Void> handleStateTransferState(XSiteStatePushCommand xSiteStatePushCommand) {
            // Notify first so a waiting test thread learns about the chunk before it is applied.
            checkPoint.trigger("before-chunk");
            final CompletionStage<Void> delegated = delegate.handleStateTransferState(xSiteStatePushCommand);
            return delegated;
        }
    }

    /**
     * Runs the "topology change during x-site state transfer" scenario with the {@code JOIN} event.
     * Currently disabled; the annotation points at ISPN-6228.
     */
    @Test(enabled = false, description = "Will be fixed by ISPN-6228")
    public void testJoinDuringXSiteST() throws InterruptedException, ExecutionException, TimeoutException {
        final AbstractTopologyChangeTest.TopologyEvent event = AbstractTopologyChangeTest.TopologyEvent.JOIN;
        doTopologyChangeDuringXSiteST(event);
    }

    /** Runs the "topology change during x-site state transfer" scenario with the {@code LEAVE} event. */
    public void testLeaveDuringXSiteST() throws InterruptedException, ExecutionException, TimeoutException {
        final AbstractTopologyChangeTest.TopologyEvent event = AbstractTopologyChangeTest.TopologyEvent.LEAVE;
        doTopologyChangeDuringXSiteST(event);
    }

    /**
     * Runs the "topology change during x-site state transfer" scenario with the
     * {@code SITE_MASTER_LEAVE} event.
     */
    public void testSiteMasterLeaveDuringXSiteST() throws InterruptedException, ExecutionException, TimeoutException {
        final AbstractTopologyChangeTest.TopologyEvent event =
                AbstractTopologyChangeTest.TopologyEvent.SITE_MASTER_LEAVE;
        doTopologyChangeDuringXSiteST(event);
    }

    /** Runs the "x-site state transfer during topology change" scenario with the {@code JOIN} event. */
    public void testXSiteDuringJoin() throws InterruptedException, ExecutionException, TimeoutException {
        final AbstractTopologyChangeTest.TopologyEvent event = AbstractTopologyChangeTest.TopologyEvent.JOIN;
        doXSiteStateTransferDuringTopologyChange(event);
    }

    /** Runs the "x-site state transfer during topology change" scenario with the {@code LEAVE} event. */
    public void testXSiteSTDuringLeave() throws InterruptedException, ExecutionException, TimeoutException {
        final AbstractTopologyChangeTest.TopologyEvent event = AbstractTopologyChangeTest.TopologyEvent.LEAVE;
        doXSiteStateTransferDuringTopologyChange(event);
    }

    /**
     * Starts a cross-site state transfer towards {@code NYC-2}, stalls the receiving side with a
     * {@link BlockingBackupReceiver}, triggers the given topology change while the receiver is stalled,
     * then releases the receiver and verifies that the state transfer finishes and the data is correct.
     *
     * @param topologyEvent the topology change to trigger in the receiving site
     */
    private void doTopologyChangeDuringXSiteST(AbstractTopologyChangeTest.TopologyEvent topologyEvent) throws TimeoutException, InterruptedException, ExecutionException {
        log.debugf("Start topology change during x-site state transfer with %s", topologyEvent);
        initBeforeTest();

        final AbstractTopologyChangeTest.TestCaches<?, ?> testCaches = createTestCache(topologyEvent, "NYC-2");
        printTestCaches(testCaches);

        // Shared between this thread and the wrapped receiver: the flag enables blocking,
        // the checkpoint carries the "before-block" / "blocked" hand-shake.
        final CheckPoint blockingCheckPoint = new CheckPoint();
        final AtomicBoolean blockingEnabled = new AtomicBoolean(true);
        TestingUtil.wrapComponent(cache("NYC-2", 0), BackupReceiver.class,
                backupReceiver -> new BlockingBackupReceiver(backupReceiver, blockingEnabled, blockingCheckPoint));

        log.debug("Start x-site state transfer");
        startStateTransfer(testCaches.coordinator, "NYC-2");
        assertOnline("LON-1", "NYC-2");

        // Wait until the receiver reports it is about to block, then change the topology underneath it.
        blockingCheckPoint.awaitStrict("before-block", 30L, TimeUnit.SECONDS);
        final Future<Void> topologyChange = triggerTopologyChange("NYC-2", testCaches.removeIndex);

        // Let the stalled chunk (and every later one) through.
        blockingEnabled.set(false);
        blockingCheckPoint.triggerForever("blocked");
        topologyChange.get();

        awaitXSiteStateSent("LON-1");
        awaitLocalStateTransfer("NYC-2");
        assertEventuallyNoStateTransferInReceivingSite(null);
        assertData();
    }

    /**
     * Triggers the given topology change in {@code NYC-2} with topology updates held back by a
     * {@link BlockingLocalTopologyManager}, starts a cross-site state transfer while that change is in
     * progress, and lets the remaining topology phases through only after the first state chunk has
     * reached the receiver. Finally verifies that the state transfer finishes and the data is correct.
     *
     * @param topologyEvent the topology change that is in progress while state transfer runs
     */
    private void doXSiteStateTransferDuringTopologyChange(AbstractTopologyChangeTest.TopologyEvent topologyEvent) throws TimeoutException, InterruptedException, ExecutionException {
        // The message names this scenario; it previously duplicated the one used by doTopologyChangeDuringXSiteST.
        log.debugf("Start x-site state transfer during topology change with %s", topologyEvent);
        initBeforeTest();
        AbstractTopologyChangeTest.TestCaches<?, ?> createTestCache = createTestCache(topologyEvent, "NYC-2");
        printTestCaches(createTestCache);
        BlockingLocalTopologyManager replaceTopologyManagerDefaultCache = BlockingLocalTopologyManager.replaceTopologyManagerDefaultCache(createTestCache.controllerCache.getCacheManager());
        CheckPoint checkPoint = new CheckPoint();
        // The wrapped receiver triggers "before-chunk" whenever a state chunk arrives.
        TestingUtil.wrapComponent(cache("NYC-2", 0), BackupReceiver.class, backupReceiver -> {
            return new NotifierBackupReceiver(backupReceiver, checkPoint);
        });
        Future<Void> triggerTopologyChange = triggerTopologyChange("NYC-2", createTestCache.removeIndex);
        if (topologyEvent == AbstractTopologyChangeTest.TopologyEvent.LEAVE) {
            // NOTE(review): presumably a leave first produces a NO_REBALANCE update before the
            // rebalance starts — confirm against BlockingLocalTopologyManager.
            replaceTopologyManagerDefaultCache.confirmTopologyUpdate(CacheTopology.Phase.NO_REBALANCE);
        }
        replaceTopologyManagerDefaultCache.confirmTopologyUpdate(CacheTopology.Phase.READ_OLD_WRITE_ALL);
        log.debug("Start x-site state transfer");
        startStateTransfer(createTestCache.coordinator, "NYC-2");
        assertOnline("LON-1", "NYC-2");
        // Only continue with the remaining phases once a chunk has actually reached the receiver.
        checkPoint.awaitStrict("before-chunk", 30L, TimeUnit.SECONDS);
        replaceTopologyManagerDefaultCache.confirmTopologyUpdate(CacheTopology.Phase.READ_ALL_WRITE_ALL);
        replaceTopologyManagerDefaultCache.confirmTopologyUpdate(CacheTopology.Phase.READ_NEW_WRITE_ALL);
        replaceTopologyManagerDefaultCache.confirmTopologyUpdate(CacheTopology.Phase.NO_REBALANCE);
        triggerTopologyChange.get();
        awaitXSiteStateSent("LON-1");
        awaitLocalStateTransfer("NYC-2");
        assertEventuallyNoStateTransferInReceivingSite(null);
        assertData();
    }
}
