package org.infinispan.xsite;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.concurrent.AbstractInProcessFuture;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.remoting.LocalInvocation;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.statetransfer.XSiteState;
import org.infinispan.xsite.statetransfer.XSiteStatePushCommand;
import org.infinispan.xsite.statetransfer.XSiteStateTransferControlCommand;

/* loaded from: input_file:WEB-INF/lib/infinispan-core-8.0.2-SNAPSHOT.jar:org/infinispan/xsite/ClusteredCacheBackupReceiver.class */
public class ClusteredCacheBackupReceiver extends BaseBackupReceiver {
    private static final Log log = LogFactory.getLog(ClusteredCacheBackupReceiver.class);
    private static final boolean trace = log.isDebugEnabled();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-core-8.0.2-SNAPSHOT.jar:org/infinispan/xsite/ClusteredCacheBackupReceiver$StatePushTask.class */
    public static class StatePushTask {
        private final List<XSiteState> chunk;
        private final Address address;
        private final Cache<?, ?> cache;
        private volatile Future<Map<Address, Response>> remoteFuture;

        private StatePushTask(List<XSiteState> list, Address address, Cache<?, ?> cache) {
            this.chunk = list;
            this.address = address;
            this.cache = cache;
        }

        public void executeRemote() {
            RpcManager rpcManager = this.cache.getAdvancedCache().getRpcManager();
            this.remoteFuture = rpcManager.invokeRemotelyAsync(Collections.singletonList(this.address), BaseBackupReceiver.newStatePushCommand(this.cache, this.chunk), rpcManager.getDefaultRpcOptions(true));
        }

        public void executeLocal() {
            try {
                final Response call = LocalInvocation.newInstanceFromCache(this.cache, BaseBackupReceiver.newStatePushCommand(this.cache, this.chunk)).call();
                this.remoteFuture = new AbstractInProcessFuture<Map<Address, Response>>() { // from class: org.infinispan.xsite.ClusteredCacheBackupReceiver.StatePushTask.1
                    @Override // java.util.concurrent.Future
                    public Map<Address, Response> get() throws InterruptedException, ExecutionException {
                        return Collections.singletonMap(StatePushTask.this.address, call);
                    }
                };
            } catch (Exception e) {
                this.remoteFuture = new AbstractInProcessFuture<Map<Address, Response>>() { // from class: org.infinispan.xsite.ClusteredCacheBackupReceiver.StatePushTask.2
                    @Override // java.util.concurrent.Future
                    public Map<Address, Response> get() throws InterruptedException, ExecutionException {
                        throw new ExecutionException(e);
                    }
                };
            }
        }

        public Response awaitResponse() throws Exception {
            Future<Map<Address, Response>> future = this.remoteFuture;
            if (future == null) {
                throw new NullPointerException("Should not happen!");
            }
            Map<Address, Response> map = future.get();
            if (map.size() == 1 && map.containsKey(this.address)) {
                return map.values().iterator().next();
            }
            throw new IllegalStateException("Shouldn't happen. Map is " + map);
        }
    }

    public ClusteredCacheBackupReceiver(Cache<Object, Object> cache) {
        super(cache);
    }

    private static boolean awaitRemoteTask(Cache<?, ?> cache, StatePushTask statePushTask) throws Exception {
        try {
            if (trace) {
                log.tracef("Waiting reply from %s", statePushTask.address);
            }
            Response awaitResponse = statePushTask.awaitResponse();
            if (trace) {
                log.tracef("Response received is %s", awaitResponse);
            }
            if (awaitResponse == CacheNotFoundResponse.INSTANCE) {
                if (trace) {
                    log.tracef("Cache not found in node '%s'. Retrying locally!", statePushTask.address);
                }
                if (!cache.getStatus().allowInvocations()) {
                    throw new CacheException("Cache is stopping or terminated: " + cache.getStatus());
                }
                statePushTask.executeLocal();
            }
            return true;
        } catch (Exception e) {
            if (!cache.getStatus().allowInvocations()) {
                throw new CacheException("Cache is stopping or terminated: " + cache.getStatus());
            }
            if (!cache.getAdvancedCache().getRpcManager().getMembers().contains(statePushTask.address) || cache.getAdvancedCache().getRpcManager().getAddress().equals(statePushTask.address)) {
                if (trace) {
                    log.tracef(e, "An exception was sent by %s. Retrying locally!", statePushTask.address);
                }
                statePushTask.executeLocal();
                return false;
            }
            if (trace) {
                log.tracef(e, "An exception was sent by %s. Retrying!", statePushTask.address);
            }
            statePushTask.executeRemote();
            return false;
        }
    }

    @Override // org.infinispan.xsite.BackupReceiver
    public void handleStateTransferControl(XSiteStateTransferControlCommand xSiteStateTransferControlCommand) throws Exception {
        XSiteStateTransferControlCommand xSiteStateTransferControlCommand2 = xSiteStateTransferControlCommand;
        if (!xSiteStateTransferControlCommand.getCacheName().equals(this.cache.getName())) {
            xSiteStateTransferControlCommand2 = xSiteStateTransferControlCommand.copyForCache(this.cache.getName());
        }
        xSiteStateTransferControlCommand2.setSiteName(xSiteStateTransferControlCommand.getOriginSite());
        invokeRemotelyInLocalSite(xSiteStateTransferControlCommand2);
    }

    @Override // org.infinispan.xsite.BackupReceiver
    public void handleStateTransferState(XSiteStatePushCommand xSiteStatePushCommand) throws Exception {
        if (!this.cache.getStatus().allowInvocations()) {
            throw new CacheException("Cache is stopping or terminated: " + this.cache.getStatus());
        }
        long expectedEndTime = this.timeService.expectedEndTime(xSiteStatePushCommand.getTimeout(), TimeUnit.MILLISECONDS);
        ClusteringDependentLogic clusteringDependentLogic = (ClusteringDependentLogic) this.cache.getAdvancedCache().getComponentRegistry().getComponent(ClusteringDependentLogic.class);
        HashMap hashMap = new HashMap();
        Address address = clusteringDependentLogic.getAddress();
        if (trace) {
            log.tracef("Received X-Site state transfer '%s'. Splitting by primary owner.", xSiteStatePushCommand);
        }
        for (XSiteState xSiteState : xSiteStatePushCommand.getChunk()) {
            Address primaryOwner = clusteringDependentLogic.getPrimaryOwner(xSiteState.key());
            List list = (List) hashMap.get(primaryOwner);
            if (list == null) {
                list = new LinkedList();
                hashMap.put(primaryOwner, list);
            }
            list.add(xSiteState);
        }
        List list2 = (List) hashMap.remove(address);
        ArrayList arrayList = new ArrayList(hashMap.size());
        for (Map.Entry entry : hashMap.entrySet()) {
            if (entry.getValue() != null && !((List) entry.getValue()).isEmpty()) {
                if (trace) {
                    log.tracef("Node '%s' will apply %s", entry.getKey(), entry.getValue());
                }
                StatePushTask statePushTask = new StatePushTask((List) entry.getValue(), (Address) entry.getKey(), this.cache);
                arrayList.add(statePushTask);
                statePushTask.executeRemote();
            }
        }
        hashMap.clear();
        if (trace) {
            log.tracef("Local node '%s' will apply %s", address, list2);
        }
        if (list2 != null) {
            StatePushTask statePushTask2 = new StatePushTask(list2, address, this.cache);
            arrayList.add(statePushTask2);
            statePushTask2.executeLocal();
        }
        if (trace) {
            log.tracef("Waiting for the remote tasks...", new Object[0]);
        }
        while (!arrayList.isEmpty() && !this.timeService.isTimeExpired(expectedEndTime)) {
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                if (awaitRemoteTask(this.cache, (StatePushTask) it.next())) {
                    it.remove();
                }
            }
        }
        if (!this.cache.getStatus().allowInvocations()) {
            throw new CacheException("Cache is stopping or terminated: " + this.cache.getStatus());
        }
        if (!arrayList.isEmpty()) {
            throw new TimeoutException("Unable to apply state in the time limit.");
        }
    }

    private Map<Address, Response> invokeRemotelyInLocalSite(CacheRpcCommand cacheRpcCommand) throws Exception {
        RpcManager rpcManager = this.cache.getAdvancedCache().getRpcManager();
        CompletableFuture<Map<Address, Response>> invokeRemotelyAsync = rpcManager.invokeRemotelyAsync(null, cacheRpcCommand, rpcManager.getDefaultRpcOptions(true, DeliverOrder.NONE));
        HashMap hashMap = new HashMap();
        hashMap.put(rpcManager.getAddress(), LocalInvocation.newInstanceFromCache(this.cache, cacheRpcCommand).call());
        hashMap.putAll(invokeRemotelyAsync.get());
        return hashMap;
    }
}
