package org.infinispan.topology;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Priority;
import org.infinispan.CacheException;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.notifications.cachemanagerlistener.annotation.Merged;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.topology.CacheTopologyControlCommand;
import org.infinispan.util.concurrent.ConcurrentMapFactory;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/* loaded from: input_file:WEB-INF/lib/infinispan-core-5.2.0.Alpha4.jar:org/infinispan/topology/ClusterTopologyManagerImpl.class */
public class ClusterTopologyManagerImpl implements ClusterTopologyManager {
    /** Logger for this class; the nested {@code RebalanceInfo} class logs through it as well. */
    private static final Log log = LogFactory.getLog(ClusterTopologyManagerImpl.class);

    /** Injected transport: source of the local address, members, view id and remote invocation. */
    private Transport transport;

    /** Injected policy that is told about cache initialization, joiners, leavers and completed rebalances. */
    private RebalancePolicy rebalancePolicy;

    /** Injected global configuration; read for the transport's distributed sync timeout. */
    private GlobalConfiguration globalConfiguration;

    /** Injected registry used to wire dependencies into commands before they are performed locally. */
    private GlobalComponentRegistry gcr;

    /** Injected notifier on which the cluster view listener is registered in {@code start()}. */
    private CacheManagerNotifier cacheManagerNotifier;

    /** Injected executor that runs the remote invocation and the local execution of topology commands. */
    private ExecutorService asyncTransportExecutor;

    /** Value of {@code transport.isCoordinator()} as last observed by {@code start()} or {@code handleNewView()}. */
    private volatile boolean isCoordinator;

    /** Set by {@code stop()}; while true, join and leave requests are ignored. */
    private volatile boolean isShuttingDown;

    /**
     * Id of the last cluster view that was processed, or -1 before the first one.
     * Written while holding {@link #viewUpdateLock}; {@code stop()} raises it so that waiters are released.
     */
    private volatile int viewId = -1;

    /** Monitor used to wait for, and to signal, changes of {@link #viewId}. */
    private final Object viewUpdateLock = new Object();

    /** Rebalance confirmation collectors currently in progress, keyed by cache name. */
    private final ConcurrentMap<String, RebalanceInfo> rebalanceStatusMap = ConcurrentMapFactory.makeConcurrentMap();

    /** View listener instance created in {@code start()} and unregistered in {@code stop()}. */
    private ClusterViewListener listener;

    /**
     * Cache manager listener that forwards view-changed and merged events to
     * {@code handleNewView} of the enclosing manager. It is registered as an
     * asynchronous listener ({@code sync = false}).
     */
    @Listener(sync = false)
    public class ClusterViewListener {

        /**
         * Extracts the new member list, the merge flag and the view id from the
         * event and hands them to the enclosing manager.
         *
         * @param viewChangedEvent the view change (or merge) notification
         */
        @Merged
        @ViewChanged
        public void handleViewChange(ViewChangedEvent viewChangedEvent) {
            List<Address> newMembers = viewChangedEvent.getNewMembers();
            boolean mergeView = viewChangedEvent.isMergeView();
            int newViewId = viewChangedEvent.getViewId();
            handleNewView(newMembers, mergeView, newViewId);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/infinispan-core-5.2.0.Alpha4.jar:org/infinispan/topology/ClusterTopologyManagerImpl$RebalanceInfo.class */
    /**
     * Collects the rebalance confirmations still outstanding for one cache and
     * one topology id. All access to the pending set is guarded by this
     * object's monitor.
     */
    public static class RebalanceInfo {
        private final String cacheName;
        private final int topologyId;
        private final Set<Address> confirmationsNeeded;

        /**
         * @param str        name of the cache being rebalanced (used in error messages)
         * @param i          topology id the confirmations must carry
         * @param collection addresses that have to confirm; copied, so later changes
         *                   to the argument do not affect this collector
         */
        public RebalanceInfo(String str, int i, Collection<Address> collection) {
            this.cacheName = str;
            this.topologyId = i;
            this.confirmationsNeeded = new HashSet<Address>(collection);
            ClusterTopologyManagerImpl.log.tracef("Initialized rebalance confirmation collector %d, initial list is %s", Integer.valueOf(i), this.confirmationsNeeded);
        }

        /**
         * Records a confirmation from one node.
         *
         * @param address the confirming node
         * @param i       topology id reported by the node
         * @return {@code true} if this confirmation was outstanding and it was the last one;
         *         {@code false} if confirmations remain or the node was not in the pending set
         * @throws CacheException if {@code i} differs from the expected topology id
         */
        public boolean confirmRebalance(Address address, int i) {
            synchronized (this) {
                if (this.topologyId != i) {
                    throw new CacheException(String.format("Received invalid rebalance confirmation from %s for cache %s, expecting topology id %d but got %d", address, this.cacheName, Integer.valueOf(this.topologyId), Integer.valueOf(i)));
                }
                if (!this.confirmationsNeeded.remove(address)) {
                    ClusterTopologyManagerImpl.log.tracef("Rebalance confirmation collector %d ignored confirmation for %s, which is not a member", Integer.valueOf(this.topologyId), address);
                    return false;
                }
                ClusterTopologyManagerImpl.log.tracef("Rebalance confirmation collector %d received confirmation for %s, remaining list is %s", Integer.valueOf(this.topologyId), address, this.confirmationsNeeded);
                return this.confirmationsNeeded.isEmpty();
            }
        }

        /**
         * Drops pending confirmations from nodes that are not in {@code collection}.
         *
         * @param collection the addresses that may remain in the pending set
         * @return {@code true} only if this call removed at least one address and
         *         no confirmations remain afterwards
         */
        public boolean updateMembers(Collection<Address> collection) {
            synchronized (this) {
                return this.confirmationsNeeded.retainAll(collection) && this.confirmationsNeeded.isEmpty();
            }
        }

        @Override
        public String toString() {
            // The pending set is mutated under this monitor by confirmRebalance/updateMembers;
            // formatting it without the lock could fail with ConcurrentModificationException.
            synchronized (this) {
                return "RebalanceInfo{topologyId=" + this.topologyId + ", confirmationsNeeded=" + this.confirmationsNeeded + '}';
            }
        }
    }

    /**
     * Dependency-injection entry point: stores the collaborators this manager works with.
     *
     * @param transport               cluster transport
     * @param rebalancePolicy         policy notified about membership and rebalance events
     * @param executorService         executor registered under the transport executor component name
     * @param globalConfiguration     global configuration
     * @param globalComponentRegistry registry used to wire dependencies into commands
     * @param cacheManagerNotifier    notifier on which the view listener is registered
     */
    @Inject
    public void inject(Transport transport,
                       RebalancePolicy rebalancePolicy,
                       @ComponentName("org.infinispan.executors.transport") ExecutorService executorService,
                       GlobalConfiguration globalConfiguration,
                       GlobalComponentRegistry globalComponentRegistry,
                       CacheManagerNotifier cacheManagerNotifier) {
        // Plain field assignments; they are independent of one another.
        this.cacheManagerNotifier = cacheManagerNotifier;
        this.gcr = globalComponentRegistry;
        this.globalConfiguration = globalConfiguration;
        this.asyncTransportExecutor = executorService;
        this.rebalancePolicy = rebalancePolicy;
        this.transport = transport;
    }

    /**
     * Lifecycle start: clears the shutdown flag, caches the coordinator status,
     * registers a fresh view listener and then processes the transport's
     * current view as if it had just been installed.
     */
    @Start(priority = 100)
    public void start() {
        this.isShuttingDown = false;
        this.isCoordinator = this.transport.isCoordinator();

        ClusterViewListener viewListener = new ClusterViewListener();
        this.listener = viewListener;
        this.cacheManagerNotifier.addListener(viewListener);

        // Feed the current view through the regular view-handling path (never flagged as a merge).
        List<Address> currentMembers = this.transport.getMembers();
        int currentViewId = this.transport.getViewId();
        handleNewView(currentMembers, false, currentViewId);
    }

    /**
     * Lifecycle stop: marks the manager as shutting down, unregisters the view
     * listener and releases every thread blocked in {@code waitForView}.
     */
    @Stop(priority = 100)
    public void stop() {
        this.isShuttingDown = true;
        this.cacheManagerNotifier.removeListener(this.listener);
        synchronized (this.viewUpdateLock) {
            // Largest possible view id: no requested view id can exceed it, so all waiters
            // fall out of their wait loop. (Same value as the log4j constant the decompiled
            // code referenced, without borrowing a logging-level constant for it.)
            this.viewId = Integer.MAX_VALUE;
            this.viewUpdateLock.notifyAll();
        }
    }

    /**
     * Broadcasts a consistent hash update for a cache and waits for the cluster to apply it.
     * If a rebalance is being tracked for the cache and the new member list leaves it with
     * no outstanding confirmations, the rebalance is completed.
     *
     * @param str           cache name
     * @param cacheTopology topology to distribute
     * @throws Exception if the cluster-wide command fails or times out
     */
    @Override
    public void updateConsistentHash(String str, CacheTopology cacheTopology) throws Exception {
        log.debugf("Updating cluster-wide consistent hash for cache %s, topology = %s", str, cacheTopology);

        CacheTopologyControlCommand updateCommand = new CacheTopologyControlCommand(str, CacheTopologyControlCommand.Type.CH_UPDATE, this.transport.getAddress(), cacheTopology, this.transport.getViewId());
        executeOnClusterSync(updateCommand, getGlobalTimeout());

        RebalanceInfo pendingRebalance = this.rebalanceStatusMap.get(str);
        if (pendingRebalance != null && pendingRebalance.updateMembers(cacheTopology.getMembers())) {
            onClusterRebalanceCompleted(str, cacheTopology.getTopologyId(), pendingRebalance);
        }
    }

    /**
     * Forgets the rebalance tracked for a cache and then informs the rebalance policy.
     *
     * @param str           cache name
     * @param i             topology id of the finished rebalance
     * @param rebalanceInfo the collector that completed (not read by this method)
     * @throws Exception propagated from the rebalance policy
     */
    private void onClusterRebalanceCompleted(String str, int i, RebalanceInfo rebalanceInfo) throws Exception {
        final Integer finishedTopologyId = Integer.valueOf(i);
        log.debugf("Removing rebalance information for topology id %d", finishedTopologyId);

        // The tracking entry is removed before the policy is called.
        this.rebalanceStatusMap.remove(str);
        this.rebalancePolicy.onRebalanceCompleted(str, i);
    }

    /**
     * Starts a cluster-wide rebalance for a cache: registers a confirmation collector for the
     * members of the pending consistent hash and broadcasts a rebalance-start command.
     *
     * @param str           cache name
     * @param cacheTopology topology carrying the pending consistent hash
     * @throws IllegalStateException if a rebalance is already being tracked for the cache
     * @throws Exception             if the command cannot be sent
     */
    @Override
    public void rebalance(String str, CacheTopology cacheTopology) throws Exception {
        log.debugf("Starting cluster-wide rebalance for cache %s, topology = %s", str, cacheTopology);

        RebalanceInfo collector = new RebalanceInfo(str, cacheTopology.getTopologyId(), cacheTopology.getPendingCH().getMembers());
        RebalanceInfo alreadyRunning = this.rebalanceStatusMap.putIfAbsent(str, collector);
        if (alreadyRunning != null) {
            throw new IllegalStateException("Aborting the current rebalance, there is another operation in progress: " + alreadyRunning);
        }

        CacheTopologyControlCommand startCommand = new CacheTopologyControlCommand(str, CacheTopologyControlCommand.Type.REBALANCE_START, this.transport.getAddress(), cacheTopology, this.viewId);
        executeOnClusterAsync(startCommand);
    }

    /**
     * Handles a request from a node to join a cache. Waits until the local view id has
     * reached the one in the request, then initializes the cache in the rebalance policy
     * and adds the node as a joiner.
     *
     * @param str           cache name
     * @param address       the joining node
     * @param cacheJoinInfo join information passed on to the rebalance policy
     * @param i             view id carried by the request
     * @return the topology returned by the rebalance policy, or {@code null} when the
     *         request is ignored because this manager is shutting down
     * @throws Exception if the wait is interrupted or the policy fails
     */
    @Override
    public CacheTopology handleJoin(String str, Address address, CacheJoinInfo cacheJoinInfo, int i) throws Exception {
        waitForView(i);

        if (!this.isShuttingDown) {
            this.rebalancePolicy.initCache(str, cacheJoinInfo);
            List<Address> joiners = Collections.singletonList(address);
            return this.rebalancePolicy.addJoiners(str, joiners);
        }

        log.debugf("Ignoring join request from %s for cache %s, the local cache manager is shutting down", address, str);
        return null;
    }

    /**
     * Handles a request from a node to leave a cache by removing it through the rebalance
     * policy. The request is only logged when this manager is shutting down.
     *
     * @param str     cache name
     * @param address the leaving node
     * @param i       view id carried by the request (not used here)
     * @throws Exception propagated from the rebalance policy
     */
    @Override
    public void handleLeave(String str, Address address, int i) throws Exception {
        if (this.isShuttingDown) {
            log.debugf("Ignoring leave request from %s for cache %s, the local cache manager is shutting down", address, str);
            return;
        }

        List<Address> leavers = Collections.singletonList(address);
        this.rebalancePolicy.removeLeavers(str, leavers);
    }

    /**
     * Handles a node's notification that it finished its local part of a rebalance.
     * A reported error is logged, and the confirmation is still recorded afterwards.
     * When the last outstanding confirmation arrives the cluster-wide rebalance is completed.
     *
     * @param str     cache name
     * @param address the confirming node
     * @param i       topology id the node rebalanced to
     * @param th      error reported by the node, or {@code null}
     * @param i2      view id carried by the request (not used here)
     * @throws CacheException if no rebalance is tracked for the cache, or the topology id
     *                        does not match the tracked one
     * @throws Exception      propagated from the rebalance policy
     */
    @Override
    public void handleRebalanceCompleted(String str, Address address, int i, Throwable th, int i2) throws Exception {
        if (th != null) {
            log.rebalanceError(str, address, th);
        }
        log.debugf("Finished local rebalance for cache %s on node %s, topology id = %d", str, address, Integer.valueOf(i));

        RebalanceInfo collector = this.rebalanceStatusMap.get(str);
        if (collector == null) {
            String message = String.format("Received invalid rebalance confirmation from %s for cache %s, we don't have a rebalance in progress", address, str);
            throw new CacheException(message);
        }

        boolean lastConfirmation = collector.confirmRebalance(address, i);
        if (lastConfirmation) {
            onClusterRebalanceCompleted(str, i, collector);
        }
    }

    /**
     * Blocks until the locally processed view id is at least {@code i}.
     * The condition is re-checked after every notification on the view lock and at
     * least once per second.
     *
     * @param i the view id to wait for
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void waitForView(int i) throws InterruptedException {
        // Unlocked peek, used only to decide whether to trace.
        if (i > this.viewId) {
            log.tracef("Received a cache topology command with a higher view id: %s, our view id is %s", Integer.valueOf(i), Integer.valueOf(this.viewId));
        }

        final Object lock = this.viewUpdateLock;
        synchronized (lock) {
            while (i > this.viewId) {
                lock.wait(1000L);
            }
        }
    }

    /**
     * Runs a command on the other cluster members and on the local node, and collects the
     * response values.
     * <p>
     * Both executions are submitted to the transport executor, so they run concurrently.
     * The remote responses are awaited first, then the local one; each wait is bounded by
     * {@code i} milliseconds.
     *
     * @param replicableCommand the command to execute
     * @param i                 timeout in milliseconds, used for the remote invocation and
     *                          for each wait on a result
     * @return the response value of every node that answered, keyed by address, including
     *         the local node
     * @throws CacheException if any remote response or the local response is unsuccessful;
     *                        when the response carries an exception it is set as the cause
     * @throws Exception      if waiting for a result fails, times out or is interrupted
     */
    private Map<Address, Object> executeOnClusterSync(final ReplicableCommand replicableCommand, final int i) throws Exception {
        Future<Map<Address, Response>> remoteFuture = this.asyncTransportExecutor.submit(new Callable<Map<Address, Response>>() {
            @Override
            public Map<Address, Response> call() throws Exception {
                return ClusterTopologyManagerImpl.this.transport.invokeRemotely(null, replicableCommand, ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, i, true, null);
            }
        });
        Future<Object> localFuture = this.asyncTransportExecutor.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                ClusterTopologyManagerImpl.this.gcr.wireDependencies(replicableCommand);
                try {
                    return replicableCommand.perform(null);
                } catch (Throwable th) {
                    throw new Exception(th);
                }
            }
        });

        Map<Address, Response> remoteResponses = remoteFuture.get(i, TimeUnit.MILLISECONDS);
        Map<Address, Object> responseValues = new HashMap<Address, Object>(this.transport.getMembers().size());
        for (Map.Entry<Address, Response> entry : remoteResponses.entrySet()) {
            Address address = entry.getKey();
            Response response = entry.getValue();
            if (!response.isSuccessful()) {
                Throwable remoteCause = response instanceof ExceptionResponse ? ((ExceptionResponse) response).getException() : null;
                throw new CacheException("Unsuccessful response received from node " + address + ": " + response, remoteCause);
            }
            responseValues.put(address, ((SuccessfulResponse) response).getResponseValue());
        }

        Response localResponse = (Response) localFuture.get(i, TimeUnit.MILLISECONDS);
        if (!localResponse.isSuccessful()) {
            // Report the local failure with the same detail as a remote one: include the
            // response and keep the underlying exception as the cause instead of dropping it.
            Throwable localCause = localResponse instanceof ExceptionResponse ? ((ExceptionResponse) localResponse).getException() : null;
            throw new CacheException("Unsuccessful local response: " + localResponse, localCause);
        }
        responseValues.put(this.transport.getAddress(), ((SuccessfulResponse) localResponse).getResponseValue());
        return responseValues;
    }

    /**
     * Sends a command to the cluster asynchronously and schedules its execution on the
     * local node. The result of the local execution is not awaited or inspected.
     *
     * @param replicableCommand the command to execute
     * @throws Exception if the remote invocation fails to be issued
     */
    private void executeOnClusterAsync(final ReplicableCommand replicableCommand) throws Exception {
        this.transport.invokeRemotely(null, replicableCommand, ResponseMode.ASYNCHRONOUS_WITH_SYNC_MARSHALLING, -1L, true, null);

        // Local execution runs on the transport executor; its Future is intentionally discarded.
        Callable<Object> localExecution = new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                ClusterTopologyManagerImpl.this.gcr.wireDependencies(replicableCommand);
                try {
                    return replicableCommand.perform(null);
                } catch (Throwable failure) {
                    throw new Exception(failure);
                }
            }
        };
        this.asyncTransportExecutor.submit(localExecution);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Processes a cluster view.
     * <ul>
     *   <li>Views whose id is not greater than the last processed one are ignored.</li>
     *   <li>On a merge view, or when this node has just become coordinator, the cluster
     *       status is recovered from all nodes and handed to the rebalance policy.</li>
     *   <li>Otherwise, if this node is the coordinator, the policy receives the new
     *       member list.</li>
     * </ul>
     * Finally the view id is published and threads waiting for it are woken up.
     *
     * @param list the members of the new view
     * @param z    whether the view is the result of a merge
     * @param i    id of the new view
     */
    public void handleNewView(List<Address> list, boolean z, int i) {
        if (i <= this.viewId) {
            log.tracef("Ignoring old cluster view notification: %s", Integer.valueOf(i));
            return;
        }
        log.tracef("Received new cluster view: %s", Integer.valueOf(i));

        boolean becameCoordinator = !this.isCoordinator && this.transport.isCoordinator();
        this.isCoordinator = this.transport.isCoordinator();

        if (z || becameCoordinator) {
            try {
                for (Map.Entry<String, List<CacheTopology>> entry : recoverClusterStatus().entrySet()) {
                    this.rebalancePolicy.initCache(entry.getKey(), entry.getValue());
                }
            } catch (InterruptedException e) {
                log.tracef("Cluster state recovery interrupted because the coordinator is shutting down");
                // Restore the interrupt status so the owner of this thread can still observe
                // the interruption; the view id is deliberately left unchanged.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                // Recovery failure is logged; the view id is still published below.
                log.failedToRecoverClusterState(e);
            }
        } else if (this.isCoordinator) {
            try {
                this.rebalancePolicy.updateMembersList(list);
            } catch (Exception e) {
                log.errorUpdatingMembersList(e);
            }
        }

        synchronized (this.viewUpdateLock) {
            this.viewId = i;
            this.viewUpdateLock.notifyAll();
        }
    }

    /**
     * Asks every node (including the local one) for the status of its running caches and
     * groups the reported topologies by cache name. The rebalance policy's
     * {@code initCache} is called with the join info the first time a cache name is seen.
     *
     * @return for each cache name, the topologies reported by the nodes
     * @throws Exception if the cluster-wide status command fails, times out or is interrupted
     */
    private HashMap<String, List<CacheTopology>> recoverClusterStatus() throws Exception {
        log.debugf("Recovering running caches in the cluster");
        CacheTopologyControlCommand statusCommand = new CacheTopologyControlCommand(null, CacheTopologyControlCommand.Type.GET_STATUS, this.transport.getAddress(), this.transport.getViewId());
        Map<Address, Object> statusResponses = executeOnClusterSync(statusCommand, getGlobalTimeout());

        HashMap<String, List<CacheTopology>> topologiesByCache = new HashMap<String, List<CacheTopology>>();
        for (Object responseValue : statusResponses.values()) {
            // Each node's response value is used as a map from cache name to a two-element
            // array: [0] = CacheJoinInfo, [1] = CacheTopology.
            @SuppressWarnings("unchecked")
            Map<String, Object[]> nodeStatus = (Map<String, Object[]>) responseValue;
            for (Map.Entry<String, Object[]> entry : nodeStatus.entrySet()) {
                String cacheName = entry.getKey();
                Object[] cacheStatus = entry.getValue();
                CacheJoinInfo joinInfo = (CacheJoinInfo) cacheStatus[0];
                CacheTopology topology = (CacheTopology) cacheStatus[1];

                List<CacheTopology> topologies = topologiesByCache.get(cacheName);
                if (topologies == null) {
                    // First report for this cache: initialize it in the policy.
                    this.rebalancePolicy.initCache(cacheName, joinInfo);
                    topologies = new ArrayList<CacheTopology>();
                    topologiesByCache.put(cacheName, topologies);
                }
                topologies.add(topology);
            }
        }
        return topologiesByCache;
    }

    /**
     * @return the transport's distributed sync timeout from the global configuration,
     *         narrowed to an {@code int}
     */
    private int getGlobalTimeout() {
        long distributedSyncTimeout = this.globalConfiguration.transport().distributedSyncTimeout();
        return (int) distributedSyncTimeout;
    }
}
