package org.infinispan.topology;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.infinispan.CacheException;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.topology.CacheTopologyControlCommand;
import org.infinispan.util.concurrent.ConcurrentMapFactory;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/* loaded from: input_file:infinispan-core-5.2.6.Final.jar:org/infinispan/topology/LocalTopologyManagerImpl.class */
public class LocalTopologyManagerImpl implements LocalTopologyManager {
    /** Class logger; declared final because it is never reassigned. */
    private static final Log log = LogFactory.getLog(LocalTopologyManagerImpl.class);
    /** Snapshot of the trace level taken once at class initialization. */
    private static final boolean trace = log.isTraceEnabled();
    /** Transport used to read the local address/view id and to reach the coordinator. */
    private Transport transport;
    /** Executor used to run coordinator commands asynchronously when this node is the coordinator. */
    private ExecutorService asyncTransportExecutor;
    /** Registry used to wire dependencies into commands before they are performed locally. */
    private GlobalComponentRegistry gcr;
    /** Per-cache local status (join info, handler, last topology), keyed by cache name. */
    private ConcurrentMap<String, LocalCacheStatus> runningCaches = ConcurrentMapFactory.makeConcurrentMap();
    /** Set by {@link #start()} and cleared by {@link #stop()}; topology updates are ignored while false. */
    private volatile boolean running;

    /**
     * Receives the collaborators this manager depends on.
     *
     * @param transport               transport used to read the local address and view id and to
     *                                send commands to the coordinator
     * @param executorService         executor registered under
     *                                {@code org.infinispan.executors.transport}; used to run
     *                                coordinator commands asynchronously on the local node
     * @param globalComponentRegistry registry used to wire dependencies into commands that are
     *                                performed locally
     */
    @Inject
    public void inject(Transport transport, @ComponentName("org.infinispan.executors.transport") ExecutorService executorService, GlobalComponentRegistry globalComponentRegistry) {
        // The three assignments are independent of each other.
        this.gcr = globalComponentRegistry;
        this.asyncTransportExecutor = executorService;
        this.transport = transport;
    }

    /**
     * Marks this manager as running. While the flag is false, incoming consistent hash updates
     * and rebalance requests are ignored.
     */
    @Start(priority = 100)
    public void start() {
        running = true;
    }

    /**
     * Marks this manager as stopped. From this point on, incoming consistent hash updates and
     * rebalance requests are ignored.
     */
    @Stop(priority = 9)
    public void stop() {
        running = false;
    }

    /**
     * Registers the cache locally and asks the coordinator for the initial topology.
     *
     * <p>The join command is retried until the coordinator returns a non-null topology. A failed
     * attempt is retried after a one second pause for as long as the join timeout (measured from
     * the start of this call) has not elapsed; once it has, the last failure is rethrown. A
     * {@code null} reply is retried immediately.
     *
     * @param str                  name of the cache being joined
     * @param cacheJoinInfo        join parameters; its timeout bounds both each request and the
     *                             overall retry window
     * @param cacheTopologyHandler handler that will receive topology updates for this cache
     * @return the initial topology received from the coordinator
     * @throws InterruptedException if the thread is interrupted while sending the request,
     *                              sleeping between retries, or installing the topology
     * @throws Exception            the last request failure once the timeout has elapsed
     */
    @Override // org.infinispan.topology.LocalTopologyManager
    public CacheTopology join(String str, CacheJoinInfo cacheJoinInfo, CacheTopologyHandler cacheTopologyHandler) throws Exception {
        log.debugf("Node %s joining cache %s", this.transport.getAddress(), str);
        this.runningCaches.put(str, new LocalCacheStatus(cacheJoinInfo, cacheTopologyHandler));
        int viewId = this.transport.getViewId();
        CacheTopologyControlCommand joinCommand = new CacheTopologyControlCommand(str, CacheTopologyControlCommand.Type.JOIN, this.transport.getAddress(), cacheJoinInfo, viewId);
        long timeout = cacheJoinInfo.getTimeout();
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout);
        while (true) {
            // Reset on every attempt so a failed attempt can never observe a stale value.
            CacheTopology initialTopology = null;
            try {
                initialTopology = (CacheTopology) executeOnCoordinator(joinCommand, timeout);
            } catch (InterruptedException e) {
                // An interrupt is a request to stop, not a transient failure: do not retry.
                throw e;
            } catch (Exception e) {
                log.debugf(e, "Error sending join request for cache %s to coordinator", str);
                // nanoTime values may only be compared through their difference (overflow-safe).
                if (deadlineNanos - System.nanoTime() <= 0) {
                    throw e;
                }
                Thread.sleep(1000L);
            }
            if (initialTopology != null) {
                handleConsistentHashUpdate(str, initialTopology, viewId);
                return initialTopology;
            }
            // A null topology is not treated as an error: ask again.
        }
    }

    /**
     * Removes the cache from the local registry and notifies the coordinator that this node is
     * leaving it. The notification is best-effort: failures are logged at debug level and not
     * propagated.
     *
     * <p>If the cache has no local status (never joined, or already left) no request is sent.
     *
     * @param str name of the cache being left
     */
    @Override // org.infinispan.topology.LocalTopologyManager
    public void leave(String str) {
        log.debugf("Node %s leaving cache %s", this.transport.getAddress(), str);
        LocalCacheStatus removedStatus = this.runningCaches.remove(str);
        if (removedStatus == null) {
            // Without the join info there is no timeout to use for the request. Previously this
            // case surfaced as a NullPointerException that was caught and logged as a send error.
            log.debugf("Not sending a leave request for cache %s, it is not running locally", str);
            return;
        }
        try {
            CacheTopologyControlCommand leaveCommand = new CacheTopologyControlCommand(str, CacheTopologyControlCommand.Type.LEAVE, this.transport.getAddress(), this.transport.getViewId());
            executeOnCoordinator(leaveCommand, removedStatus.getJoinInfo().getTimeout());
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the caller even though the failure is swallowed.
                Thread.currentThread().interrupt();
            }
            log.debugf(e, "Error sending the leave request for cache %s to coordinator", str);
        }
    }

    /**
     * Asynchronously sends a {@code REBALANCE_CONFIRM} command for the given cache to the
     * coordinator. Failures while sending are logged at debug level and not propagated.
     *
     * @param str name of the cache
     * @param i   topology id passed through to the confirmation command
     * @param th  throwable passed through to the confirmation command; may be {@code null}
     *            (presumably when the rebalance succeeded — confirm against the command class)
     */
    @Override // org.infinispan.topology.LocalTopologyManager
    public void confirmRebalance(String str, int i, Throwable th) {
        try {
            CacheTopologyControlCommand confirmation = new CacheTopologyControlCommand(
                    str,
                    CacheTopologyControlCommand.Type.REBALANCE_CONFIRM,
                    this.transport.getAddress(),
                    i,
                    th,
                    this.transport.getViewId());
            executeOnCoordinatorAsync(confirmation);
        } catch (Exception sendFailure) {
            log.debugf(sendFailure, "Error sending the rebalance completed notification for cache %s to the coordinator", str);
        }
    }

    /**
     * Builds a snapshot of the caches running on this node.
     *
     * @param i not used by this implementation
     * @return a new map from cache name to a two-element array holding the cache's join info
     *         (index 0) and its last installed topology (index 1)
     */
    @Override // org.infinispan.topology.LocalTopologyManager
    public Map<String, Object[]> handleStatusRequest(int i) {
        Map<String, Object[]> statusByCache = new HashMap<String, Object[]>();
        for (Map.Entry<String, LocalCacheStatus> entry : this.runningCaches.entrySet()) {
            // Use the value carried by the entry: looking the key up again in the concurrent map
            // can return null if the cache is removed while we iterate.
            LocalCacheStatus cacheStatus = entry.getValue();
            statusByCache.put(entry.getKey(), new Object[]{cacheStatus.getJoinInfo(), cacheStatus.getTopology()});
        }
        return statusByCache;
    }

    /**
     * Installs a topology received for a cache and notifies the cache's handler.
     *
     * <p>The update is dropped when the manager is not running, when the cache has no local
     * status, or when its topology id is lower than the one already installed. Otherwise the
     * topology is stored and the handler receives a new topology whose pending consistent hash is
     * the union of the current and pending hashes (or {@code null} when there is no pending
     * hash). The handler is asked to rebalance when the update has a pending hash and the
     * previously installed topology had none; in every other case it gets a plain consistent hash
     * update.
     *
     * @param str           name of the cache
     * @param cacheTopology the topology to install
     * @param i             view id the local transport must have reached before the update is
     *                      processed
     * @throws InterruptedException if interrupted while waiting for the view
     */
    @Override // org.infinispan.topology.LocalTopologyManager
    public void handleConsistentHashUpdate(String str, CacheTopology cacheTopology, int i) throws InterruptedException {
        if (!this.running) {
            log.debugf("Ignoring consistent hash update %s for cache %s, the local cache manager is not running", Integer.valueOf(cacheTopology.getTopologyId()), str);
            return;
        }

        waitForView(i);

        LocalCacheStatus status = this.runningCaches.get(str);
        if (status == null) {
            log.tracef("Ignoring consistent hash update %s for cache %s that doesn't exist locally", Integer.valueOf(cacheTopology.getTopologyId()), str);
            return;
        }

        // Topology changes for one cache are serialized on its status object.
        synchronized (status) {
            CacheTopology previous = status.getTopology();
            boolean outdated = previous != null && cacheTopology.getTopologyId() < previous.getTopologyId();
            if (outdated) {
                log.tracef("Ignoring consistent hash update %s for cache %s, we have already received a newer topology %s", Integer.valueOf(cacheTopology.getTopologyId()), str, Integer.valueOf(previous.getTopologyId()));
                return;
            }

            log.debugf("Updating local consistent hash(es) for cache %s: new topology = %s", str, cacheTopology);
            status.setTopology(cacheTopology);

            ConsistentHash unionCH = cacheTopology.getPendingCH() == null
                    ? null
                    : status.getJoinInfo().getConsistentHashFactory().union(cacheTopology.getCurrentCH(), cacheTopology.getPendingCH());

            CacheTopologyHandler topologyHandler = status.getHandler();
            CacheTopology unionTopology = new CacheTopology(cacheTopology.getTopologyId(), cacheTopology.getCurrentCH(), unionCH);
            unionTopology.logRoutingTableInformation();

            boolean previousHadPendingCH = previous != null && previous.getPendingCH() != null;
            if (unionCH != null && !previousHadPendingCH) {
                log.tracef("This topology update has a pending CH, starting the rebalance now");
                topologyHandler.rebalance(unionTopology);
            } else {
                topologyHandler.updateConsistentHash(unionTopology);
            }
        }
    }

    /**
     * Starts a local rebalance for a cache.
     *
     * <p>The request is dropped when the manager is not running, when the cache has no local
     * status, or when its topology id is lower than the one already installed. Otherwise the
     * topology is stored and the cache's handler is asked to rebalance towards a topology whose
     * pending consistent hash is the union of the received current and pending hashes.
     *
     * @param str           name of the cache
     * @param cacheTopology the rebalance topology
     * @param i             view id the local transport must have reached before the request is
     *                      processed
     * @throws InterruptedException if interrupted while waiting for the view
     */
    @Override // org.infinispan.topology.LocalTopologyManager
    public void handleRebalance(String str, CacheTopology cacheTopology, int i) throws InterruptedException {
        if (!this.running) {
            log.debugf("Ignoring rebalance request %s for cache %s, the local cache manager is not running", Integer.valueOf(cacheTopology.getTopologyId()), str);
            return;
        }

        waitForView(i);

        LocalCacheStatus status = this.runningCaches.get(str);
        if (status == null) {
            log.tracef("Ignoring rebalance %s for cache %s that doesn't exist locally", Integer.valueOf(cacheTopology.getTopologyId()), str);
            return;
        }

        // Topology changes for one cache are serialized on its status object.
        synchronized (status) {
            CacheTopology installed = status.getTopology();
            boolean outdated = installed != null && cacheTopology.getTopologyId() < installed.getTopologyId();
            if (outdated) {
                log.debugf("Ignoring old rebalance for cache %s: %s", str, Integer.valueOf(cacheTopology.getTopologyId()));
                return;
            }

            log.debugf("Starting local rebalance for cache %s, topology = %s", str, cacheTopology);
            cacheTopology.logRoutingTableInformation();
            status.setTopology(cacheTopology);

            CacheTopologyHandler topologyHandler = status.getHandler();
            ConsistentHash unionCH = status.getJoinInfo().getConsistentHashFactory().union(cacheTopology.getCurrentCH(), cacheTopology.getPendingCH());
            CacheTopology unionTopology = new CacheTopology(cacheTopology.getTopologyId(), cacheTopology.getCurrentCH(), unionCH);
            topologyHandler.rebalance(unionTopology);
        }
    }

    /**
     * Returns the last topology installed for a cache.
     *
     * @param str name of the cache
     * @return the cache's current topology, or {@code null} if the cache is not running locally
     *         or no topology has been installed for it yet
     */
    @Override // org.infinispan.topology.LocalTopologyManager
    public CacheTopology getCacheTopology(String str) {
        LocalCacheStatus cacheStatus = this.runningCaches.get(str);
        // A cache that never joined (or already left) has no status; report "no topology"
        // instead of failing with a NullPointerException.
        return cacheStatus != null ? cacheStatus.getTopology() : null;
    }

    /**
     * Blocks until the transport's view id is at least {@code i}, polling every 100 milliseconds.
     * A trace message is logged once if the local view is behind when the method is entered.
     *
     * @param i the view id to wait for
     * @throws InterruptedException if interrupted while sleeping between polls
     */
    private void waitForView(int i) throws InterruptedException {
        boolean localViewIsBehind = this.transport.getViewId() < i;
        if (localViewIsBehind) {
            log.tracef("Received a cache topology command with a higher view id: %s, our view id is %s", Integer.valueOf(i), Integer.valueOf(this.transport.getViewId()));
        }
        for (; this.transport.getViewId() < i; ) {
            TimeUnit.MILLISECONDS.sleep(100L);
        }
    }

    /**
     * Executes a command on the coordinator and returns the value of its successful response.
     *
     * <p>When this node is the coordinator the command is wired and performed locally; otherwise
     * it is sent synchronously to the coordinator.
     *
     * @param replicableCommand the command to execute
     * @param j                 timeout passed to the remote invocation (not used for local
     *                          execution)
     * @return the response value carried by the successful response
     * @throws CacheException if local execution fails, or if the response is missing or not
     *                        successful (the remote exception, when available, is the cause)
     * @throws Exception      any failure raised by the remote invocation itself
     */
    private Object executeOnCoordinator(ReplicableCommand replicableCommand, long j) throws Exception {
        Response response;
        if (this.transport.isCoordinator()) {
            try {
                this.gcr.wireDependencies(replicableCommand);
                response = (Response) replicableCommand.perform(null);
            } catch (Throwable th) {
                // This path serves every coordinator command (join, leave, ...), so name the
                // actual command instead of always blaming a join request.
                throw new CacheException("Error executing command " + replicableCommand + " on the local coordinator", th);
            }
        } else {
            Address coordinator = this.transport.getCoordinator();
            Map<Address, Response> responses = this.transport.invokeRemotely(Collections.singleton(coordinator), replicableCommand, ResponseMode.SYNCHRONOUS, j, true, null);
            // Treat a missing response map like a missing response so that it is reported as a
            // bad response below rather than as a NullPointerException.
            response = responses != null ? responses.get(coordinator) : null;
        }
        if (response != null && response.isSuccessful()) {
            return ((SuccessfulResponse) response).getResponseValue();
        }
        Throwable remoteCause = response instanceof ExceptionResponse ? ((ExceptionResponse) response).getException() : null;
        throw new CacheException("Bad response received from coordinator: " + response, remoteCause);
    }

    /**
     * Executes a command on the coordinator without waiting for the result.
     *
     * <p>When this node is the coordinator the command is wired and performed on the async
     * transport executor; otherwise it is sent to the coordinator asynchronously. The future
     * returned by the executor is discarded, so every failure of the local task is logged inside
     * the task itself.
     *
     * @param replicableCommand the command to execute
     * @throws Exception if submitting the task or sending the command fails
     */
    private void executeOnCoordinatorAsync(final ReplicableCommand replicableCommand) throws Exception {
        if (this.transport.isCoordinator()) {
            this.asyncTransportExecutor.submit(new Callable<Object>() { // from class: org.infinispan.topology.LocalTopologyManagerImpl.1
                @Override // java.util.concurrent.Callable
                public Object call() throws Exception {
                    try {
                        // Wiring happens inside the try: nobody inspects the returned Future, so
                        // a wiring failure outside of it would disappear without a trace.
                        LocalTopologyManagerImpl.this.gcr.wireDependencies(replicableCommand);
                        return replicableCommand.perform(null);
                    } catch (Throwable th) {
                        LocalTopologyManagerImpl.log.errorf(th, "Failed to execute ReplicableCommand %s on coordinator async: %s", replicableCommand, th.getMessage());
                        // Preserve the original failure type where the signature allows it.
                        if (th instanceof Exception) {
                            throw (Exception) th;
                        }
                        if (th instanceof Error) {
                            throw (Error) th;
                        }
                        throw new Exception(th);
                    }
                }
            });
        } else {
            this.transport.invokeRemotely(Collections.singleton(this.transport.getCoordinator()), replicableCommand, ResponseMode.ASYNCHRONOUS_WITH_SYNC_MARSHALLING, 0L, true, null);
        }
    }
}
