package org.infinispan.xsite.statetransfer;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.configuration.cache.BackupConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.notifications.cachelistener.annotation.TopologyChanged;
import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent;
import org.infinispan.remoting.LocalInvocation;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.ResponseGenerator;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.RetryOnFailureXSiteCommand;
import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.XSiteBackup;
import org.infinispan.xsite.statetransfer.XSiteStateTransferControlCommand;

@Listener
/* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.2.8-SNAPSHOT.jar:org/infinispan/xsite/statetransfer/XSiteStateTransferManagerImpl.class */
public class XSiteStateTransferManagerImpl implements XSiteStateTransferManager {
    /** Logger shared by every instance of this class. */
    private static final Log log = LogFactory.getLog(XSiteStateTransferManagerImpl.class);
    // The two flags below are read once, when the class is initialised; a later change of the
    // logger's level is therefore not reflected in them.
    private static final boolean trace = log.isTraceEnabled();
    private static final boolean debug = log.isDebugEnabled();
    /** Collectors keyed by site name; an entry is present while a push to that site is being tracked here. */
    private final ConcurrentMap<String, XSiteStateTransferCollector> siteCollector = CollectionFactory.makeConcurrentMap();
    /** Last outcome recorded per site name (one of the {@code STATUS_*} constants of {@link XSiteStateTransferManager}). */
    private final ConcurrentMap<String, String> status = CollectionFactory.makeConcurrentMap();
    // Every field below is assigned in inject(...); none of them is final for that reason.
    private RpcManager rpcManager;
    private Configuration configuration;
    private CommandsFactory commandsFactory;
    private ResponseGenerator responseGenerator;
    /** Executor used to run the local half of a site-wide command (see invokeRemotelyInLocalSite). */
    private ExecutorService asyncExecutor;
    private StateTransferManager stateTransferManager;
    private CacheNotifier cacheNotifier;
    private XSiteStateConsumer consumer;
    private XSiteStateProvider provider;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-embedded-8.2.8-SNAPSHOT.jar:org/infinispan/xsite/statetransfer/XSiteStateTransferManagerImpl$BackupRpcConfiguration.class */
    /**
     * Immutable pair of retry settings (wait time and maximum number of retries) used when a
     * control command is sent to a remote site.
     */
    public static class BackupRpcConfiguration {
        /** Wait time handed to the retrying command; the caller passes it as milliseconds. */
        private final long waitTime;
        /** Upper bound handed to the max-retries policy. */
        private final int maxRetries;

        /**
         * @param waitTime   value stored in {@link #waitTime}
         * @param maxRetries value stored in {@link #maxRetries}
         */
        private BackupRpcConfiguration(long waitTime, int maxRetries) {
            this.maxRetries = maxRetries;
            this.waitTime = waitTime;
        }
    }

    /**
     * Dependency-injection entry point: stores every collaborator in the field of the same role.
     *
     * @param rpcManager           used to reach the other members and the transport
     * @param configuration        cache configuration, read for the backup-site definitions
     * @param commandsFactory      builds and initialises the control commands
     * @param responseGenerator    passed to the local invocation of a command
     * @param stateTransferManager queried for the topology id and for in-progress state transfer
     * @param cacheNotifier        where this instance registers itself as a listener
     * @param xSiteStateConsumer   stored as {@code consumer}
     * @param xSiteStateProvider   stored as {@code provider}
     * @param executorService      stored as {@code asyncExecutor}
     */
    @Inject
    public void inject(RpcManager rpcManager, Configuration configuration, CommandsFactory commandsFactory, ResponseGenerator responseGenerator, StateTransferManager stateTransferManager, CacheNotifier cacheNotifier, XSiteStateConsumer xSiteStateConsumer, XSiteStateProvider xSiteStateProvider, @ComponentName("org.infinispan.executors.transport") ExecutorService executorService) {
        // Remoting and command infrastructure.
        this.rpcManager = rpcManager;
        this.commandsFactory = commandsFactory;
        this.responseGenerator = responseGenerator;
        this.asyncExecutor = executorService;

        // Cache-level services.
        this.configuration = configuration;
        this.stateTransferManager = stateTransferManager;
        this.cacheNotifier = cacheNotifier;

        // Cross-site state transfer endpoints.
        this.provider = xSiteStateProvider;
        this.consumer = xSiteStateConsumer;
    }

    /** Start hook: registers this instance as a listener on the cache notifier. */
    @Start
    public void addListener() {
        cacheNotifier.addListener(this);
    }

    /** Stop hook: removes this instance from the cache notifier's listeners. */
    @Stop
    public void removeListener() {
        cacheNotifier.removeListener(this);
    }

    /**
     * Records that {@code address} finished pushing state to site {@code str}.
     * <p>
     * Nothing happens when no collector is tracked for the site, or when the collector reports
     * that the transfer is not yet confirmed. Once it is confirmed, the collector is dropped,
     * the final status is stored, a CANCEL_SEND is issued in the local site and a
     * FINISH_RECEIVE is sent to the remote site using that site's retry configuration.
     *
     * @param str     name of the site the state was pushed to
     * @param address node reporting completion
     * @param z       status flag forwarded to the collector
     * @throws Throwable whatever the local or remote control commands throw
     */
    @Override
    public void notifyStatePushFinished(String str, Address address, boolean z) throws Throwable {
        final XSiteStateTransferCollector collector = siteCollector.get(str);
        if (collector == null) {
            return;
        }
        final XSiteBackup backup = findSite(str);
        if (!collector.confirmStateTransfer(address, z)) {
            return;
        }

        siteCollector.remove(str);
        final String outcome;
        if (collector.isStatusOk()) {
            outcome = XSiteStateTransferManager.STATUS_OK;
        } else {
            outcome = XSiteStateTransferManager.STATUS_ERROR;
        }
        status.put(str, outcome);

        controlStateTransferOnLocalSite(XSiteStateTransferControlCommand.StateTransferControl.CANCEL_SEND, str);
        controlStateTransferOnRemoteSite(backup, XSiteStateTransferControlCommand.StateTransferControl.FINISH_RECEIVE, backupRpcConfiguration(str));
    }

    /**
     * Starts pushing state to the backup site {@code str}.
     * <p>
     * A new collector is registered for the site, its previous status is cleared, the remote
     * site is told to start receiving and, unless a local state transfer is in progress, the
     * local site is told to start sending. Any failure after registration triggers
     * {@code handleFailure} and is rethrown wrapped in an {@link Exception}.
     *
     * @param str name of the target site
     * @throws NullPointerException     if {@code str} is {@code null}
     * @throws IllegalArgumentException if no backup with that name is configured
     * @throws Exception                if a transfer to the site is already registered, or a control command fails
     */
    @Override
    public final void startPushState(String str) throws Throwable {
        if (str == null) {
            throw new NullPointerException("Site name cannot be null!");
        }
        final XSiteBackup backup = findSite(str);
        if (backup == null) {
            throw new IllegalArgumentException("Site " + str + " not found!");
        }

        final XSiteStateTransferCollector freshCollector = new XSiteStateTransferCollector(rpcManager.getMembers());
        final boolean alreadyRunning = siteCollector.putIfAbsent(str, freshCollector) != null;
        if (alreadyRunning) {
            throw new Exception(String.format("X-Site state transfer to '%s' already started!", str));
        }
        status.remove(str);

        try {
            controlStateTransferOnRemoteSite(backup, XSiteStateTransferControlCommand.StateTransferControl.START_RECEIVE, null);
            if (stateTransferManager.isStateTransferInProgress()) {
                // Sending is postponed; only a debug message is emitted here.
                if (debug) {
                    log.debugf("Not start sending keys to site '%s' while rebalance in progress. Wait until it is finished!", str);
                }
            } else {
                controlStateTransferOnLocalSite(XSiteStateTransferControlCommand.StateTransferControl.START_SEND, str);
            }
        } catch (Throwable failure) {
            handleFailure(backup);
            throw new Exception(failure);
        }
    }

    /**
     * @return the names of the sites that currently have a collector registered; an immutable
     *         empty list when there are none, otherwise a fresh copy of the key set
     */
    @Override
    public List<String> getRunningStateTransfers() {
        if (siteCollector.isEmpty()) {
            return Collections.emptyList();
        }
        return new ArrayList<>(siteCollector.keySet());
    }

    /**
     * @return the status recorded per site name; an immutable empty map when nothing is
     *         recorded, otherwise a fresh copy of the internal map
     */
    @Override
    public Map<String, String> getStatus() {
        if (status.isEmpty()) {
            return Collections.emptyMap();
        }
        return new HashMap<>(status);
    }

    /** Forgets every status recorded on this node. */
    @Override
    public void clearStatus() {
        status.clear();
    }

    /**
     * Cancels the state push to site {@code str}, if one is tracked here.
     * <p>
     * When no collector exists for the site the call is a no-op (a trace message is logged if
     * tracing is enabled). Otherwise CANCEL_SEND is issued in the local site, FINISH_RECEIVE is
     * sent to the remote site without retries, and only then is the collector removed and the
     * status set to canceled.
     *
     * @param str name of the target site
     * @throws IllegalArgumentException if a collector exists but no backup with that name is configured
     * @throws Throwable                whatever the control commands throw
     */
    @Override
    public void cancelPushState(String str) throws Throwable {
        if (!siteCollector.containsKey(str)) {
            if (trace) {
                log.tracef("Tried to cancel push state to '%s' but it does not exist.", str);
            }
            return;
        }

        final XSiteBackup backup = findSite(str);
        if (backup == null) {
            throw new IllegalArgumentException("Site " + str + " not found!");
        }

        controlStateTransferOnLocalSite(XSiteStateTransferControlCommand.StateTransferControl.CANCEL_SEND, str);
        controlStateTransferOnRemoteSite(backup, XSiteStateTransferControlCommand.StateTransferControl.FINISH_RECEIVE, null);

        // Bookkeeping is updated last, so it is skipped when either command above throws.
        siteCollector.remove(str);
        status.put(str, XSiteStateTransferManager.STATUS_CANCELED);
    }

    /**
     * Sends a STATUS_REQUEST to every node of the local site and merges the maps carried by the
     * successful responses. Responses of any other kind are ignored.
     *
     * @return the merged map; on duplicate keys the entry merged last wins
     * @throws Exception if the site-wide invocation fails
     */
    @Override
    public Map<String, String> getClusterStatus() throws Exception {
        final XSiteStateTransferControlCommand statusRequest = commandsFactory.buildXSiteStateTransferControlCommand(XSiteStateTransferControlCommand.StateTransferControl.STATUS_REQUEST, null);
        final Map<String, String> merged = new HashMap<>();
        for (Response response : invokeRemotelyInLocalSite(statusRequest).values()) {
            if (!(response instanceof SuccessfulResponse)) {
                continue;
            }
            // The payload is assumed to be a site-name -> status map, as the original raw cast did.
            @SuppressWarnings("unchecked")
            final Map<String, String> nodeStatus = (Map<String, String>) ((SuccessfulResponse) response).getResponseValue();
            merged.putAll(nodeStatus);
        }
        return merged;
    }

    /**
     * Issues a CLEAR_STATUS control command, with no site name, to every node of the local site.
     *
     * @throws Exception if the invocation fails or any node answers with an exception
     */
    @Override
    public void clearClusterStatus() throws Exception {
        final XSiteStateTransferControlCommand.StateTransferControl action = XSiteStateTransferControlCommand.StateTransferControl.CLEAR_STATUS;
        controlStateTransferOnLocalSite(action, null);
    }

    /** @return the sending site name as reported by the state consumer */
    @Override
    public String getSendingSiteName() {
        final String sendingSite = consumer.getSendingSiteName();
        return sendingSite;
    }

    /**
     * Issues a FINISH_RECEIVE control command for site {@code str} to every node of the local site.
     *
     * @param str site name carried by the command
     * @throws Exception if the invocation fails or any node answers with an exception
     */
    @Override
    public void cancelReceive(String str) throws Exception {
        final XSiteStateTransferControlCommand.StateTransferControl action = XSiteStateTransferControlCommand.StateTransferControl.FINISH_RECEIVE;
        controlStateTransferOnLocalSite(action, str);
    }

    /**
     * Makes this node track the state transfer to site {@code str}, then either cancels the
     * sending (when a local state transfer is in progress) or restarts it (otherwise).
     * Failures of the cancel/restart command are logged and not propagated.
     *
     * @param str name of the site to coordinate
     */
    @Override
    public void becomeCoordinator(String str) {
        startCoordinating(Collections.singleton(str), rpcManager.getMembers());

        if (!stateTransferManager.isStateTransferInProgress()) {
            log.debugf("Restarting x-site state transfer for site %s", str);
            try {
                controlStateTransferOnLocalSite(XSiteStateTransferControlCommand.StateTransferControl.RESTART_SEND, str);
            } catch (Exception restartFailure) {
                log.failedToRestartXSiteStateTransfer(str, restartFailure);
            }
            return;
        }

        try {
            log.debugf("Canceling x-site state transfer for site %s", str);
            controlStateTransferOnLocalSite(XSiteStateTransferControlCommand.StateTransferControl.CANCEL_SEND, str);
        } catch (Exception cancelFailure) {
            log.debugf(cancelFailure, "Unable to cancel x-site state transfer for site %s", str);
        }
    }

    /**
     * Topology listener. Pre-events are ignored. For a post-event the new member list is read
     * from the event's final consistent hash; if this node is the first member it starts
     * coordinating the sites the provider reports as missing a coordinator. Afterwards every
     * tracked transfer is either cancelled (local state transfer in progress) or restarted
     * with a fresh collector built from the new members.
     *
     * @param topologyChangedEvent the topology event delivered by the cache notifier
     */
    @TopologyChanged
    public <K, V> void handleTopology(TopologyChangedEvent<K, V> topologyChangedEvent) {
        if (debug) {
            log.debugf("Topology change detected! %s", topologyChangedEvent);
        }
        if (topologyChangedEvent.isPre()) {
            return;
        }

        final List<Address> members = topologyChangedEvent.getConsistentHashAtEnd().getMembers();
        final boolean firstMemberIsLocal = members.get(0).equals(rpcManager.getAddress());
        // The provider is queried on every node, not only on the first member, as in the original.
        final Collection<String> orphanedSites = provider.getSitesMissingCoordinator(new HashSet<Address>(members));
        if (firstMemberIsLocal) {
            startCoordinating(orphanedSites, members);
        }

        if (stateTransferManager.isStateTransferInProgress()) {
            cancelTrackedSends();
        } else {
            restartTrackedSends(members);
        }
    }

    /** Issues CANCEL_SEND for every tracked site; failures are logged at debug level and skipped. */
    private void cancelTrackedSends() {
        for (String siteName : siteCollector.keySet()) {
            try {
                log.debugf("Topology change detected! Canceling x-site state transfer for site %s", siteName);
                controlStateTransferOnLocalSite(XSiteStateTransferControlCommand.StateTransferControl.CANCEL_SEND, siteName);
            } catch (Exception cancelFailure) {
                log.debugf(cancelFailure, "Unable to cancel x-site state transfer for site %s", siteName);
            }
        }
    }

    /** Replaces each tracked collector with one built from {@code members} and issues RESTART_SEND for its site. */
    private void restartTrackedSends(List<Address> members) {
        for (Map.Entry<String, XSiteStateTransferCollector> tracked : siteCollector.entrySet()) {
            tracked.setValue(new XSiteStateTransferCollector(members));
            log.debugf("Topology change detected! Restarting x-site state transfer for site %s", tracked.getKey());
            try {
                controlStateTransferOnLocalSite(XSiteStateTransferControlCommand.StateTransferControl.RESTART_SEND, tracked.getKey());
            } catch (Exception restartFailure) {
                log.failedToRestartXSiteStateTransfer(tracked.getKey(), restartFailure);
            }
        }
    }

    /**
     * Registers a collector for every given site that does not have one yet.
     * Sites are processed in iteration order; validation of a later site can therefore fail
     * after earlier sites were already registered.
     *
     * @param siteNames names of the sites to start tracking
     * @param members   members handed to each newly created collector
     * @throws NullPointerException     if a site name is {@code null}
     * @throws IllegalArgumentException if a site name has no configured backup
     */
    private void startCoordinating(Collection<String> siteNames, Collection<Address> members) {
        if (debug) {
            log.debugf("Becoming the x-site state transfer coordinator for %s", siteNames);
        }
        for (String siteName : siteNames) {
            if (siteName == null) {
                throw new NullPointerException("Site name cannot be null!");
            }
            final boolean unknownSite = findSite(siteName) == null;
            if (unknownSite) {
                throw new IllegalArgumentException("Site " + siteName + " not found!");
            }
            final XSiteStateTransferCollector collector = new XSiteStateTransferCollector(members);
            siteCollector.putIfAbsent(siteName, collector);
        }
    }

    /**
     * Best-effort clean-up after a failed start: drops the site's collector, then tries a
     * CANCEL_SEND in the local site and a FINISH_RECEIVE (no retries) on the remote site.
     * Errors from either command are only logged at debug level.
     *
     * @param xSiteBackup the backup site whose transfer failed to start
     */
    private void handleFailure(XSiteBackup xSiteBackup) {
        final String siteName = xSiteBackup.getSiteName();
        if (debug) {
            log.debugf("Handle start state transfer failure to %s", siteName);
        }
        siteCollector.remove(siteName);

        try {
            controlStateTransferOnLocalSite(XSiteStateTransferControlCommand.StateTransferControl.CANCEL_SEND, siteName);
        } catch (Exception localFailure) {
            if (debug) {
                log.debugf(localFailure, "Exception while cancel sending to remote site %s", siteName);
            }
        }

        try {
            controlStateTransferOnRemoteSite(xSiteBackup, XSiteStateTransferControlCommand.StateTransferControl.FINISH_RECEIVE, null);
        } catch (Throwable remoteFailure) {
            if (debug) {
                log.debugf(remoteFailure, "Exception while cancel receiving in remote site %s", siteName);
            }
        }
    }

    /**
     * Sends a control command (built without a site name) to a remote site.
     *
     * @param xSiteBackup            the remote site to contact
     * @param stateTransferControl   the action carried by the command
     * @param backupRpcConfiguration retry settings; {@code null} means no retries and a wait time of 1 ms
     * @throws Throwable whatever executing the command throws
     */
    private void controlStateTransferOnRemoteSite(XSiteBackup xSiteBackup, XSiteStateTransferControlCommand.StateTransferControl stateTransferControl, BackupRpcConfiguration backupRpcConfiguration) throws Throwable {
        final boolean withoutRetries = backupRpcConfiguration == null;
        final XSiteStateTransferControlCommand command = commandsFactory.buildXSiteStateTransferControlCommand(stateTransferControl, null);
        final RetryOnFailureXSiteCommand retrying = RetryOnFailureXSiteCommand.newInstance(
                xSiteBackup,
                command,
                withoutRetries ? RetryOnFailureXSiteCommand.NO_RETRY : new RetryOnFailureXSiteCommand.MaxRetriesPolicy(backupRpcConfiguration.maxRetries));
        final long waitMillis = withoutRetries ? 1L : backupRpcConfiguration.waitTime;
        retrying.execute(rpcManager.getTransport(), waitMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Builds a control command stamped with the current topology id, runs it on every node of
     * the local site, and rethrows the exception of the first exception response encountered.
     *
     * @param stateTransferControl the action carried by the command
     * @param str                  site name carried by the command (may be {@code null})
     * @throws Exception the exception carried by a node's response, or an invocation failure
     */
    private void controlStateTransferOnLocalSite(XSiteStateTransferControlCommand.StateTransferControl stateTransferControl, String str) throws Exception {
        final XSiteStateTransferControlCommand command = commandsFactory.buildXSiteStateTransferControlCommand(stateTransferControl, str);
        command.setTopologyId(currentTopologyId());

        final Map<Address, Response> responses = invokeRemotelyInLocalSite(command);
        for (Response response : responses.values()) {
            if (response instanceof ExceptionResponse) {
                throw ((ExceptionResponse) response).getException();
            }
        }
    }

    /** @return the topology id of the cache topology currently held by the state transfer manager */
    private int currentTopologyId() {
        final int topologyId = stateTransferManager.getCacheTopology().getTopologyId();
        return topologyId;
    }

    /**
     * Looks up the configured backup named {@code str}.
     *
     * @param str site name to look for
     * @return a new {@link XSiteBackup} built from the first matching backup's name and
     *         state-transfer timeout, or {@code null} when no backup matches
     */
    private XSiteBackup findSite(String str) {
        for (BackupConfiguration candidate : configuration.sites().allBackups()) {
            if (!candidate.site().equals(str)) {
                continue;
            }
            return new XSiteBackup(candidate.site(), true, candidate.stateTransfer().timeout());
        }
        return null;
    }

    /**
     * Reads the retry settings of the configured backup named {@code str}.
     *
     * @param str site name to look for
     * @return the wait time and max retries of the first matching backup's state-transfer
     *         configuration, or {@code null} when no backup matches
     */
    private BackupRpcConfiguration backupRpcConfiguration(String str) {
        for (BackupConfiguration candidate : configuration.sites().allBackups()) {
            if (!candidate.site().equals(str)) {
                continue;
            }
            return new BackupRpcConfiguration(candidate.stateTransfer().waitTime(), candidate.stateTransfer().maxRetries());
        }
        return null;
    }

    /**
     * Runs {@code cacheRpcCommand} on every node of the local site, this node included.
     * <p>
     * The remote invocation is started asynchronously first, then the local invocation is
     * submitted to the async executor; the method blocks on the local result and afterwards on
     * the remote results.
     *
     * @param cacheRpcCommand the command to run; it is initialised here before being sent
     * @return responses keyed by node address; the local response is inserted first and a
     *         remote entry with the same address would overwrite it
     * @throws Exception if waiting for either the local or the remote result fails
     */
    private Map<Address, Response> invokeRemotelyInLocalSite(CacheRpcCommand cacheRpcCommand) throws Exception {
        commandsFactory.initializeReplicableCommand(cacheRpcCommand, false);

        // Kick off the remote part before the local one so that both run concurrently.
        final CompletableFuture<Map<Address, Response>> remoteResponses = rpcManager.invokeRemotelyAsync(
                null,
                cacheRpcCommand,
                rpcManager.getRpcOptionsBuilder(ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, DeliverOrder.NONE).build());
        final Future<?> localResponse = asyncExecutor.submit(
                LocalInvocation.newInstance(responseGenerator, cacheRpcCommand, commandsFactory, rpcManager.getAddress()));

        final Map<Address, Response> responses = new HashMap<>();
        responses.put(rpcManager.getAddress(), (Response) localResponse.get());
        @SuppressWarnings("unchecked")
        final Map<Address, Response> fromRemotes = (Map<Address, Response>) CompletableFutures.await(remoteResponses);
        responses.putAll(fromRemotes);
        return responses;
    }
}
