package org.infinispan.topology;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import net.jcip.annotations.GuardedBy;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.topology.CacheAvailabilityUpdateCommand;
import org.infinispan.commands.topology.CacheJoinCommand;
import org.infinispan.commands.topology.CacheLeaveCommand;
import org.infinispan.commands.topology.CacheShutdownRequestCommand;
import org.infinispan.commands.topology.RebalancePhaseConfirmCommand;
import org.infinispan.commands.topology.RebalancePolicyUpdateCommand;
import org.infinispan.commands.topology.RebalanceStatusRequestCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.IllegalLifecycleStateException;
import org.infinispan.commons.time.TimeService;
import org.infinispan.commons.util.Version;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.globalstate.GlobalStateManager;
import org.infinispan.globalstate.GlobalStateProvider;
import org.infinispan.globalstate.ScopedPersistentState;
import org.infinispan.globalstate.impl.GlobalStateManagerImpl;
import org.infinispan.globalstate.impl.ScopedPersistentStateImpl;
import org.infinispan.jmx.annotations.DataType;
import org.infinispan.jmx.annotations.MBean;
import org.infinispan.jmx.annotations.ManagedAttribute;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.partitionhandling.AvailabilityMode;
import org.infinispan.partitionhandling.impl.PartitionHandlingManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.impl.VoidResponseCollector;
import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.concurrent.ActionSequencer;
import org.infinispan.util.concurrent.BlockingManager;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.util.logging.events.EventLogCategory;
import org.infinispan.util.logging.events.EventLogManager;
import org.infinispan.util.logging.events.EventLogger;
import org.infinispan.util.logging.events.Messages;
import org.infinispan.xsite.commands.XSiteStateTransferRestartSendingCommand;
import org.infinispan.xsite.commands.XSiteStatusCommand;

@Scope(Scopes.GLOBAL)
@MBean(objectName = "LocalTopologyManager", description = "Controls the cache membership and state transfer")
/* loaded from: input_file:org/infinispan/topology/LocalTopologyManagerImpl.class */
public class LocalTopologyManagerImpl implements LocalTopologyManager, GlobalStateProvider {
    private static final Log log = LogFactory.getLog(LocalTopologyManagerImpl.class);

    @Inject
    Transport transport;

    @ComponentName(KnownComponentNames.NON_BLOCKING_EXECUTOR)
    @Inject
    ExecutorService nonBlockingExecutor;

    @Inject
    BlockingManager blockingManager;

    @ComponentName(KnownComponentNames.TIMEOUT_SCHEDULE_EXECUTOR)
    @Inject
    ScheduledExecutorService timeoutExecutor;

    @Inject
    GlobalComponentRegistry gcr;

    @Inject
    TimeService timeService;

    @Inject
    GlobalStateManager globalStateManager;

    @Inject
    PersistentUUIDManager persistentUUIDManager;

    @Inject
    EventLogManager eventLogManager;

    @Inject
    CacheManagerNotifier cacheManagerNotifier;

    @Inject
    ClusterTopologyManager clusterTopologyManager;
    private TopologyManagementHelper helper;
    private ActionSequencer actionSequencer;
    private EventLogger eventLogger;
    private final Map<String, LocalCacheStatus> runningCaches = Collections.synchronizedMap(new HashMap());
    private volatile boolean running;

    @GuardedBy("runningCaches")
    private int latestStatusResponseViewId;
    private PersistentUUID persistentUUID;
    private EventLoggerViewListener viewListener;

    @Start(priority = 0)
    public void preStart() {
        this.helper = new TopologyManagementHelper(this.gcr);
        this.actionSequencer = new ActionSequencer(this.nonBlockingExecutor, true, this.timeService);
        if (this.globalStateManager != null) {
            this.globalStateManager.registerStateProvider(this);
        }
    }

    @Start(priority = XSiteStatusCommand.COMMAND_ID)
    public void start() {
        if (log.isTraceEnabled()) {
            log.tracef("Starting LocalTopologyManager on %s", this.transport.getAddress());
        }
        if (this.persistentUUID == null) {
            this.persistentUUID = PersistentUUID.randomUUID();
            this.globalStateManager.writeGlobalState();
        }
        this.persistentUUIDManager.addPersistentAddressMapping(this.transport.getAddress(), this.persistentUUID);
        this.eventLogger = this.eventLogManager.getEventLogger().scope(this.transport.getAddress()).context(getClass().getName());
        this.viewListener = new EventLoggerViewListener(this.eventLogManager);
        this.cacheManagerNotifier.addListener(this.viewListener);
        synchronized (this.runningCaches) {
            this.latestStatusResponseViewId = this.transport.getViewId();
        }
        this.running = true;
    }

    @Stop(priority = XSiteStateTransferRestartSendingCommand.COMMAND_ID)
    public void stop() {
        if (log.isTraceEnabled()) {
            log.tracef("Stopping LocalTopologyManager on %s", this.transport.getAddress());
        }
        this.cacheManagerNotifier.removeListener(this.viewListener);
        this.running = false;
    }

    @Override // org.infinispan.topology.LocalTopologyManager
    public CompletionStage<CacheTopology> join(String str, CacheJoinInfo cacheJoinInfo, CacheTopologyHandler cacheTopologyHandler, PartitionHandlingManager partitionHandlingManager) {
        return orderOnCache(str, () -> {
            log.debugf("Node %s joining cache %s", this.transport.getAddress(), str);
            LocalCacheStatus localCacheStatus = new LocalCacheStatus(cacheJoinInfo, cacheTopologyHandler, partitionHandlingManager);
            if (this.runningCaches.put(str, localCacheStatus) != null) {
                throw new IllegalStateException("A cache can only join once");
            }
            long timeout = cacheJoinInfo.getTimeout();
            return sendJoinRequest(str, cacheJoinInfo, timeout, this.timeService.expectedEndTime(timeout, TimeUnit.MILLISECONDS)).thenCompose(cacheStatusResponse -> {
                return handleJoinResponse(str, localCacheStatus, cacheStatusResponse);
            });
        });
    }

    public CompletionStage<CacheStatusResponse> sendJoinRequest(String str, CacheJoinInfo cacheJoinInfo, long j, long j2) {
        int viewId = this.transport.getViewId();
        return CompletionStages.handleAndCompose(this.helper.executeOnCoordinator(this.transport, new CacheJoinCommand(str, this.transport.getAddress(), cacheJoinInfo, viewId), j), (obj, th) -> {
            int viewId2 = this.transport.getViewId();
            if (viewId != viewId2) {
                log.tracef("Received new view %d before join response for cache %s, retrying", viewId2, str);
                return sendJoinRequest(str, cacheJoinInfo, j, j2);
            }
            if (th == null) {
                if (obj != null) {
                    return CompletableFuture.completedFuture((CacheStatusResponse) obj);
                }
                log.debugf("Coordinator sent a null join response, retrying in view %d", viewId + 1);
                return retryJoinInView(str, cacheJoinInfo, j, j2, viewId + 1);
            }
            CacheException extractException = CompletableFutures.extractException(th);
            if (extractException instanceof SuspectException) {
                log.debugf("Join request received CacheNotFoundResponse for cache %s, retrying", str);
                return CompletionStages.scheduleNonBlocking(() -> {
                    return sendJoinRequest(str, cacheJoinInfo, j, j2);
                }, this.timeoutExecutor, 100L, TimeUnit.MILLISECONDS);
            }
            log.debugf(extractException, "Join request failed for cache %s", str);
            if (extractException instanceof TimeoutException) {
                throw ((TimeoutException) extractException);
            }
            throw ((CacheJoinException) extractException.getCause());
        });
    }

    private CompletionStage<CacheStatusResponse> retryJoinInView(String str, CacheJoinInfo cacheJoinInfo, long j, long j2, int i) {
        return withView(i, j, TimeUnit.MILLISECONDS).thenCompose(r15 -> {
            return sendJoinRequest(str, cacheJoinInfo, j, j2);
        });
    }

    public CompletionStage<CacheTopology> handleJoinResponse(String str, LocalCacheStatus localCacheStatus, CacheStatusResponse cacheStatusResponse) {
        int viewId = this.transport.getViewId();
        return doHandleTopologyUpdate(str, cacheStatusResponse.getCacheTopology(), cacheStatusResponse.getAvailabilityMode(), viewId, this.transport.getCoordinator(), localCacheStatus).thenCompose(bool -> {
            if (!bool.booleanValue()) {
                throw new IllegalStateException("We already had a newer topology by the time we received the join response");
            }
            if (localCacheStatus.getJoinInfo().getPersistentUUID() != null) {
                deleteCHState(str);
            }
            return doHandleStableTopologyUpdate(str, cacheStatusResponse.getStableTopology(), viewId, this.transport.getCoordinator(), localCacheStatus);
        }).thenApply(r3 -> {
            return cacheStatusResponse.getCacheTopology();
        });
    }

    @Override // org.infinispan.topology.LocalTopologyManager
    public void leave(String str, long j) {
        log.debugf("Node %s leaving cache %s", this.transport.getAddress(), str);
        this.runningCaches.remove(str);
        try {
            CompletionStages.join(this.helper.executeOnCoordinator(this.transport, new CacheLeaveCommand(str, this.transport.getAddress(), this.transport.getViewId()), j));
        } catch (Exception e) {
            log.debugf(e, "Error sending the leave request for cache %s to coordinator", str);
        }
    }

    @Override // org.infinispan.topology.LocalTopologyManager
    public void confirmRebalancePhase(String str, int i, int i2, Throwable th) {
        try {
            this.helper.executeOnCoordinatorAsync(this.transport, new RebalancePhaseConfirmCommand(str, this.transport.getAddress(), th, i, this.transport.getViewId()));
        } catch (Exception e) {
            log.debugf(e, "Error sending the rebalance completed notification for cache %s to the coordinator", str);
        }
    }

    @Override // org.infinispan.topology.LocalTopologyManager
    public CompletionStage<ManagerStatusResponse> handleStatusRequest(int i) {
        return withView(i, getGlobalTimeout(), TimeUnit.MILLISECONDS).thenApply(r11 -> {
            HashMap hashMap = new HashMap();
            synchronized (this.runningCaches) {
                this.latestStatusResponseViewId = i;
                for (Map.Entry<String, LocalCacheStatus> entry : this.runningCaches.entrySet()) {
                    LocalCacheStatus localCacheStatus = this.runningCaches.get(entry.getKey());
                    if (localCacheStatus.getCurrentTopology() != null) {
                        hashMap.put(entry.getKey(), new CacheStatusResponse(localCacheStatus.getJoinInfo(), localCacheStatus.getCurrentTopology(), localCacheStatus.getStableTopology(), localCacheStatus.getPartitionHandlingManager().getAvailabilityMode()));
                    }
                }
            }
            log.debugf("Sending cluster status response for view %d", i);
            return new ManagerStatusResponse(hashMap, this.gcr.getClusterTopologyManager().isRebalancingEnabled());
        });
    }

    @Override // org.infinispan.topology.LocalTopologyManager
    public CompletionStage<Void> handleTopologyUpdate(String str, CacheTopology cacheTopology, AvailabilityMode availabilityMode, int i, Address address) {
        if (!this.running) {
            log.tracef("Ignoring consistent hash update %s for cache %s, the local cache manager is not running", cacheTopology.getTopologyId(), str);
            return CompletableFutures.completedNull();
        }
        LocalCacheStatus localCacheStatus = this.runningCaches.get(str);
        if (localCacheStatus != null) {
            return withView(i, localCacheStatus.getJoinInfo().getTimeout(), TimeUnit.MILLISECONDS).thenCompose(r17 -> {
                return orderOnCache(str, () -> {
                    return doHandleTopologyUpdate(str, cacheTopology, availabilityMode, i, address, localCacheStatus);
                });
            }).handle((bool, th) -> {
                if (th == null || (th instanceof IllegalLifecycleStateException)) {
                    return null;
                }
                log.topologyUpdateError(str, th);
                return null;
            });
        }
        log.tracef("Ignoring consistent hash update %s for cache %s that doesn't exist locally", cacheTopology.getTopologyId(), str);
        return CompletableFutures.completedNull();
    }

    private CompletionStage<Boolean> doHandleTopologyUpdate(String str, CacheTopology cacheTopology, AvailabilityMode availabilityMode, int i, Address address, LocalCacheStatus localCacheStatus) {
        ConsistentHash consistentHash;
        synchronized (localCacheStatus) {
            if (cacheTopology == null) {
                return CompletableFutures.completedTrue();
            }
            registerPersistentUUID(cacheTopology);
            CacheTopology currentTopology = localCacheStatus.getCurrentTopology();
            if (currentTopology != null && cacheTopology.getTopologyId() <= currentTopology.getTopologyId()) {
                log.debugf("Ignoring late consistent hash update for cache %s, current topology is %s: %s", str, Integer.valueOf(currentTopology.getTopologyId()), cacheTopology);
                return CompletableFutures.completedFalse();
            }
            if (!updateCacheTopology(str, cacheTopology, i, address, localCacheStatus)) {
                return CompletableFutures.completedFalse();
            }
            CacheTopologyHandler handler = localCacheStatus.getHandler();
            ConsistentHash currentCH = cacheTopology.getCurrentCH();
            ConsistentHash pendingCH = cacheTopology.getPendingCH();
            if (pendingCH != null) {
                ConsistentHashFactory consistentHashFactory = localCacheStatus.getJoinInfo().getConsistentHashFactory();
                switch (cacheTopology.getPhase()) {
                    case READ_NEW_WRITE_ALL:
                        consistentHash = consistentHashFactory.union(pendingCH, currentCH);
                        break;
                    default:
                        consistentHash = consistentHashFactory.union(currentCH, pendingCH);
                        break;
                }
            } else {
                consistentHash = null;
            }
            CacheTopology cacheTopology2 = new CacheTopology(cacheTopology.getTopologyId(), cacheTopology.getRebalanceId(), currentCH, pendingCH, consistentHash, cacheTopology.getPhase(), cacheTopology.getActualMembers(), this.persistentUUIDManager.mapAddresses(cacheTopology.getActualMembers()));
            boolean z = availabilityMode != AvailabilityMode.AVAILABLE;
            ConsistentHash consistentHash2 = consistentHash;
            CompletionStage thenCompose = resetLocalTopologyBeforeRebalance(str, cacheTopology, currentTopology, handler).thenCompose(r8 -> {
                cacheTopology2.logRoutingTableInformation(str);
                return (!z || availabilityMode == null) ? CompletableFutures.completedNull() : localCacheStatus.getPartitionHandlingManager().setAvailabilityMode(availabilityMode);
            }).thenCompose(r9 -> {
                if ((cacheTopology.getPhase() == CacheTopology.Phase.CONFLICT_RESOLUTION) || consistentHash2 == null || (currentTopology != null && currentTopology.getRebalanceId() == cacheTopology.getRebalanceId())) {
                    return handler.updateConsistentHash(cacheTopology2);
                }
                log.tracef("This topology update has a pending CH, starting the rebalance now", new Object[0]);
                return handler.rebalance(cacheTopology2);
            });
            if (!z) {
                thenCompose = thenCompose.thenCompose(r5 -> {
                    return localCacheStatus.getPartitionHandlingManager().setAvailabilityMode(availabilityMode);
                });
            }
            return thenCompose.thenApply(r2 -> {
                return true;
            });
        }
    }

    private void registerPersistentUUID(CacheTopology cacheTopology) {
        int size = cacheTopology.getActualMembers().size();
        for (int i = 0; i < size; i++) {
            this.persistentUUIDManager.addPersistentAddressMapping(cacheTopology.getActualMembers().get(i), cacheTopology.getMembersPersistentUUIDs().get(i));
        }
    }

    private boolean updateCacheTopology(String str, CacheTopology cacheTopology, int i, Address address, LocalCacheStatus localCacheStatus) {
        synchronized (this.runningCaches) {
            if (!validateCommandViewId(cacheTopology, i, address, str)) {
                return false;
            }
            log.debugf("Updating local topology for cache %s: %s", str, cacheTopology);
            localCacheStatus.setCurrentTopology(cacheTopology);
            return true;
        }
    }

    @GuardedBy("runningCaches")
    private boolean validateCommandViewId(CacheTopology cacheTopology, int i, Address address, String str) {
        if (!address.equals(this.transport.getCoordinator())) {
            log.debugf("Ignoring topology %d for cache %s from old coordinator %s", cacheTopology.getTopologyId(), str, address);
            return false;
        }
        if (i >= this.latestStatusResponseViewId) {
            return true;
        }
        log.debugf("Ignoring topology %d for cache %s from view %d received after status request from view %d", new Object[]{Integer.valueOf(cacheTopology.getTopologyId()), str, Integer.valueOf(i), Integer.valueOf(this.latestStatusResponseViewId)});
        return false;
    }

    private CompletionStage<Void> resetLocalTopologyBeforeRebalance(String str, CacheTopology cacheTopology, CacheTopology cacheTopology2, CacheTopologyHandler cacheTopologyHandler) {
        if ((cacheTopology.getPhase() == CacheTopology.Phase.NO_REBALANCE || cacheTopology.getPhase() == CacheTopology.Phase.CONFLICT_RESOLUTION) ? false : true) {
            if (cacheTopology2 != null && cacheTopology.getTopologyId() > cacheTopology2.getTopologyId() + 1) {
                if (cacheTopology.getRebalanceId() != cacheTopology2.getRebalanceId()) {
                    registerPersistentUUID(cacheTopology);
                    CacheTopology cacheTopology3 = new CacheTopology(cacheTopology.getTopologyId() - 1, cacheTopology.getRebalanceId() - 1, cacheTopology.getCurrentCH(), null, CacheTopology.Phase.NO_REBALANCE, cacheTopology.getActualMembers(), this.persistentUUIDManager.mapAddresses(cacheTopology.getActualMembers()));
                    log.debugf("Installing fake cache topology %s for cache %s", cacheTopology3, str);
                    return cacheTopologyHandler.updateConsistentHash(cacheTopology3);
                }
            }
            return CompletableFutures.completedNull();
        }
        return CompletableFutures.completedNull();
    }

    @Override // org.infinispan.topology.LocalTopologyManager
    public CompletionStage<Void> handleStableTopologyUpdate(String str, CacheTopology cacheTopology, Address address, int i) {
        LocalCacheStatus localCacheStatus = this.runningCaches.get(str);
        return localCacheStatus != null ? orderOnCache(str, () -> {
            return doHandleStableTopologyUpdate(str, cacheTopology, i, address, localCacheStatus);
        }) : CompletableFutures.completedNull();
    }

    private CompletionStage<Void> doHandleStableTopologyUpdate(String str, CacheTopology cacheTopology, int i, Address address, LocalCacheStatus localCacheStatus) {
        synchronized (this.runningCaches) {
            if (!validateCommandViewId(cacheTopology, i, address, str)) {
                return CompletableFutures.completedNull();
            }
            CacheTopology stableTopology = localCacheStatus.getStableTopology();
            if (stableTopology == null || stableTopology.getTopologyId() < cacheTopology.getTopologyId()) {
                log.tracef("Updating stable topology for cache %s: %s", str, cacheTopology);
                localCacheStatus.setStableTopology(cacheTopology);
            }
            return CompletableFutures.completedNull();
        }
    }

    @Override // org.infinispan.topology.LocalTopologyManager
    public CompletionStage<Void> handleRebalance(String str, CacheTopology cacheTopology, int i, Address address) {
        if (!this.running) {
            log.debugf("Ignoring rebalance request %s for cache %s, the local cache manager is not running", cacheTopology.getTopologyId(), str);
            return CompletableFutures.completedNull();
        }
        LocalCacheStatus localCacheStatus = this.runningCaches.get(str);
        if (localCacheStatus == null) {
            log.tracef("Ignoring rebalance %s for cache %s that doesn't exist locally", cacheTopology.getTopologyId(), str);
            return CompletableFutures.completedNull();
        }
        this.eventLogger.info(EventLogCategory.LIFECYCLE, Messages.MESSAGES.cacheRebalanceStart(cacheTopology.getMembers(), cacheTopology.getPhase(), cacheTopology.getTopologyId()));
        return withView(i, localCacheStatus.getJoinInfo().getTimeout(), TimeUnit.MILLISECONDS).thenCompose(r15 -> {
            return orderOnCache(str, () -> {
                return doHandleRebalance(i, localCacheStatus, cacheTopology, str, address);
            });
        }).handle((r10, th) -> {
            List<Address> members = cacheTopology.getMembers();
            int topologyId = cacheTopology.getTopologyId();
            if (th == null) {
                this.eventLogger.info(EventLogCategory.LIFECYCLE, Messages.MESSAGES.rebalanceFinished(members, topologyId));
                return null;
            }
            Throwable extractException = CompletableFutures.extractException(th);
            if (extractException instanceof IllegalLifecycleStateException) {
                return null;
            }
            log.rebalanceStartError(str, th);
            this.eventLogger.error(EventLogCategory.LIFECYCLE, Messages.MESSAGES.rebalanceFinishedWithFailure(members, topologyId, extractException));
            return null;
        });
    }

    private CompletionStage<Void> doHandleRebalance(int i, LocalCacheStatus localCacheStatus, CacheTopology cacheTopology, String str, Address address) {
        synchronized (localCacheStatus) {
            CacheTopology currentTopology = localCacheStatus.getCurrentTopology();
            if (currentTopology != null && cacheTopology.getTopologyId() <= currentTopology.getTopologyId()) {
                log.debugf("Ignoring old rebalance for cache %s, current topology is %s: %s", str, Integer.valueOf(currentTopology.getTopologyId()), cacheTopology);
                return CompletableFutures.completedNull();
            }
            if (!updateCacheTopology(str, cacheTopology, i, address, localCacheStatus)) {
                return CompletableFutures.completedNull();
            }
            CacheTopologyHandler handler = localCacheStatus.getHandler();
            CacheTopology cacheTopology2 = new CacheTopology(cacheTopology.getTopologyId(), cacheTopology.getRebalanceId(), cacheTopology.getCurrentCH(), cacheTopology.getPendingCH(), localCacheStatus.getJoinInfo().getConsistentHashFactory().union(cacheTopology.getCurrentCH(), cacheTopology.getPendingCH()), cacheTopology.getPhase(), cacheTopology.getActualMembers(), cacheTopology.getMembersPersistentUUIDs());
            return resetLocalTopologyBeforeRebalance(str, cacheTopology, currentTopology, handler).thenCompose(r9 -> {
                log.debugf("Starting local rebalance for cache %s, topology = %s", str, cacheTopology);
                cacheTopology.logRoutingTableInformation(str);
                return handler.rebalance(cacheTopology2);
            });
        }
    }

    @Override // org.infinispan.topology.LocalTopologyManager
    public CacheTopology getCacheTopology(String str) {
        LocalCacheStatus localCacheStatus = this.runningCaches.get(str);
        if (localCacheStatus != null) {
            return localCacheStatus.getCurrentTopology();
        }
        return null;
    }

    @Override // org.infinispan.topology.LocalTopologyManager
    public CacheTopology getStableCacheTopology(String str) {
        LocalCacheStatus localCacheStatus = this.runningCaches.get(str);
        if (localCacheStatus != null) {
            return localCacheStatus.getStableTopology();
        }
        return null;
    }

    private CompletionStage<Void> withView(int i, long j, TimeUnit timeUnit) {
        CompletableFuture<Void> withView = this.transport.withView(i);
        ScheduledFuture schedule = this.timeoutExecutor.schedule(() -> {
            return Boolean.valueOf(withView.completeExceptionally(Log.CLUSTER.timeoutWaitingForView(i, this.transport.getViewId())));
        }, j, timeUnit);
        withView.whenComplete((r4, th) -> {
            schedule.cancel(false);
        });
        return withView;
    }

    @Override // org.infinispan.topology.LocalTopologyManager
    @ManagedAttribute(description = "Rebalancing enabled", displayName = "Rebalancing enabled", dataType = DataType.TRAIT, writable = true)
    public boolean isRebalancingEnabled() {
        return isCacheRebalancingEnabled(null);
    }

    @Override // org.infinispan.topology.LocalTopologyManager
    public void setRebalancingEnabled(boolean z) {
        setCacheRebalancingEnabled(null, z);
    }

    @Override // org.infinispan.topology.LocalTopologyManager
    public boolean isCacheRebalancingEnabled(String str) {
        return ((RebalancingStatus) CompletionStages.join(executeOnCoordinatorRetry(new RebalanceStatusRequestCommand(str), this.transport.getViewId(), this.timeService.expectedEndTime((long) getGlobalTimeout(), TimeUnit.MILLISECONDS)))) != RebalancingStatus.SUSPENDED;
    }

    public CompletionStage<Object> executeOnCoordinatorRetry(ReplicableCommand replicableCommand, int i, long j) {
        return CompletionStages.handleAndCompose(this.helper.executeOnCoordinator(this.transport, replicableCommand, this.timeService.remainingTime(j, TimeUnit.MILLISECONDS)), (obj, th) -> {
            if (th == null) {
                return CompletableFuture.completedFuture(obj);
            }
            Throwable extractException = CompletableFutures.extractException(th);
            if (!(extractException instanceof SuspectException)) {
                return CompletableFutures.completedExceptionFuture(extractException);
            }
            if (log.isTraceEnabled()) {
                log.tracef("Coordinator left the cluster while querying rebalancing status, retrying", new Object[0]);
            }
            return executeOnCoordinatorRetry(replicableCommand, Math.max(i + 1, this.transport.getViewId()), j);
        });
    }

    @Override // org.infinispan.topology.LocalTopologyManager
    public void setCacheRebalancingEnabled(String str, boolean z) {
        CompletionStages.join(this.helper.executeOnClusterSync(this.transport, new RebalancePolicyUpdateCommand(str, z), getGlobalTimeout(), VoidResponseCollector.ignoreLeavers()));
    }

    @Override // org.infinispan.topology.LocalTopologyManager
    public RebalancingStatus getRebalancingStatus(String str) {
        return (RebalancingStatus) CompletionStages.join(executeOnCoordinatorRetry(new RebalanceStatusRequestCommand(str), this.transport.getViewId(), this.timeService.expectedEndTime(getGlobalTimeout(), TimeUnit.MILLISECONDS)));
    }

    @ManagedAttribute(description = "Cluster availability", displayName = "Cluster availability", dataType = DataType.TRAIT, writable = false)
    public String getClusterAvailability() {
        AvailabilityMode availabilityMode = AvailabilityMode.AVAILABLE;
        synchronized (this.runningCaches) {
            Iterator<LocalCacheStatus> it = this.runningCaches.values().iterator();
            while (it.hasNext()) {
                availabilityMode = availabilityMode.min(it.next().getPartitionHandlingManager().getAvailabilityMode());
            }
        }
        return availabilityMode.toString();
    }

    @Override // org.infinispan.topology.LocalTopologyManager
    public AvailabilityMode getCacheAvailability(String str) {
        return this.runningCaches.get(str).getPartitionHandlingManager().getAvailabilityMode();
    }

    @Override // org.infinispan.topology.LocalTopologyManager
    public void setCacheAvailability(String str, AvailabilityMode availabilityMode) {
        CompletionStages.join(this.helper.executeOnCoordinator(this.transport, new CacheAvailabilityUpdateCommand(str, availabilityMode), getGlobalTimeout()));
    }

    @Override // org.infinispan.topology.LocalTopologyManager
    public void cacheShutdown(String str) {
        CompletionStages.join(this.helper.executeOnCoordinator(this.transport, new CacheShutdownRequestCommand(str), getGlobalTimeout()));
    }

    @Override // org.infinispan.topology.LocalTopologyManager
    public CompletionStage<Void> handleCacheShutdown(String str) {
        writeCHState(str);
        return CompletableFutures.completedNull();
    }

    private void writeCHState(String str) {
        ScopedPersistentStateImpl scopedPersistentStateImpl = new ScopedPersistentStateImpl(str);
        scopedPersistentStateImpl.setProperty(GlobalStateManagerImpl.VERSION, Version.getVersion());
        scopedPersistentStateImpl.setProperty(GlobalStateManagerImpl.TIMESTAMP, this.timeService.instant().toString());
        scopedPersistentStateImpl.setProperty(GlobalStateManagerImpl.VERSION_MAJOR, Version.getMajor());
        ConsistentHash remapAddresses = this.runningCaches.get(str).getCurrentTopology().getCurrentCH().remapAddresses(this.persistentUUIDManager.addressToPersistentUUID());
        remapAddresses.toScopedState(scopedPersistentStateImpl);
        this.globalStateManager.writeScopedState(scopedPersistentStateImpl);
        if (log.isTraceEnabled()) {
            log.tracef("Written CH state for cache %s, checksum=%s: %s", str, Integer.valueOf(scopedPersistentStateImpl.getChecksum()), remapAddresses);
        }
    }

    private void deleteCHState(String str) {
        this.globalStateManager.deleteScopedState(str);
        if (log.isTraceEnabled()) {
            log.tracef("Removed CH state for cache %s", str);
        }
    }

    private int getGlobalTimeout() {
        return (int) this.gcr.getGlobalConfiguration().transport().distributedSyncTimeout();
    }

    @Override // org.infinispan.globalstate.GlobalStateProvider
    public void prepareForPersist(ScopedPersistentState scopedPersistentState) {
        if (this.persistentUUID != null) {
            scopedPersistentState.setProperty("uuid", this.persistentUUID.toString());
        }
    }

    @Override // org.infinispan.globalstate.GlobalStateProvider
    public void prepareForRestore(ScopedPersistentState scopedPersistentState) {
        if (!scopedPersistentState.containsProperty("uuid")) {
            throw Log.CONFIG.invalidPersistentState(ScopedPersistentState.GLOBAL_SCOPE);
        }
        this.persistentUUID = PersistentUUID.fromString(scopedPersistentState.getProperty("uuid"));
    }

    @Override // org.infinispan.topology.LocalTopologyManager
    public PersistentUUID getPersistentUUID() {
        return this.persistentUUID;
    }

    private <T> CompletionStage<T> orderOnCache(String str, Callable<CompletionStage<T>> callable) {
        return this.actionSequencer.orderOnKey(str, () -> {
            log.tracef("Acquired cache status %s", str);
            return ((CompletionStage) callable.call()).whenComplete((obj, th) -> {
                log.tracef("Released cache status %s", str);
            });
        });
    }
}
