/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.topology;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.notifications.cachemanagerlistener.annotation.Merged;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.partionhandling.AvailabilityMode;
import org.infinispan.partionhandling.impl.AvailabilityStrategy;
import org.infinispan.partionhandling.impl.PreferAvailabilityStrategy;
import org.infinispan.partionhandling.impl.PreferConsistencyStrategy;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.topology.CacheJoinInfo;
import org.infinispan.topology.CacheStatusResponse;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.CacheTopologyControlCommand;
import org.infinispan.topology.ClusterCacheStatus;
import org.infinispan.topology.ClusterTopologyManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class ClusterTopologyManagerImpl
implements ClusterTopologyManager {
    private static final Log log = LogFactory.getLog(ClusterTopologyManagerImpl.class);
    private Transport transport;
    private GlobalConfiguration globalConfiguration;
    private GlobalComponentRegistry gcr;
    private CacheManagerNotifier cacheManagerNotifier;
    private EmbeddedCacheManager cacheManager;
    private ExecutorService asyncTransportExecutor;
    private volatile boolean isCoordinator;
    private volatile boolean isShuttingDown;
    private volatile int viewId = -1;
    private final Object viewUpdateLock = new Object();
    private final Object viewHandlingLock = new Object();
    private final ConcurrentMap<String, ClusterCacheStatus> cacheStatusMap = CollectionFactory.makeConcurrentMap();
    private ClusterViewListener viewListener;
    private volatile boolean isRebalancingEnabled = true;

    /**
     * Stores the collaborators handed over by the component registry.
     *
     * @param transport the cluster transport
     * @param asyncTransportExecutor executor registered under {@code org.infinispan.executors.transport}
     * @param globalConfiguration the global configuration
     * @param gcr the global component registry
     * @param cacheManagerNotifier notifier used to register the cluster view listener
     * @param cacheManager the cache manager used to look up cache configurations
     */
    @Inject
    public void inject(Transport transport,
                       @ComponentName("org.infinispan.executors.transport") ExecutorService asyncTransportExecutor,
                       GlobalConfiguration globalConfiguration,
                       GlobalComponentRegistry gcr,
                       CacheManagerNotifier cacheManagerNotifier,
                       EmbeddedCacheManager cacheManager) {
        // Plain field assignments; their order is irrelevant.
        this.cacheManager = cacheManager;
        this.cacheManagerNotifier = cacheManagerNotifier;
        this.gcr = gcr;
        this.globalConfiguration = globalConfiguration;
        this.asyncTransportExecutor = asyncTransportExecutor;
        this.transport = transport;
    }

    /**
     * Starts the manager: clears the shutdown flag, records whether this node is currently the
     * coordinator, registers a fresh {@link ClusterViewListener}, and then processes the view
     * currently installed in the transport as a non-merge view.
     */
    @Start(priority = 100)
    public void start() {
        isShuttingDown = false;
        isCoordinator = transport.isCoordinator();

        ClusterViewListener listener = new ClusterViewListener();
        viewListener = listener;
        cacheManagerNotifier.addListener(listener);

        int currentViewId = transport.getViewId();
        handleClusterView(false, currentViewId);
    }

    /**
     * Stops the manager: raises the shutdown flag, unregisters the view listener, and sets the
     * view id to {@link Integer#MAX_VALUE} while holding {@code viewUpdateLock}. All threads
     * waiting on that lock are notified, so the loop in {@link #waitForView(int)} terminates
     * for any requested view id.
     */
    @Stop(priority = 100)
    public void stop() {
        isShuttingDown = true;
        cacheManagerNotifier.removeListener(viewListener);
        synchronized (viewUpdateLock) {
            viewId = Integer.MAX_VALUE;
            viewUpdateLock.notifyAll();
        }
    }

    /**
     * Handles a join request for a cache.
     * <p>
     * Blocks until the local view id has reached {@code viewId}, then delegates to the cache's
     * {@link ClusterCacheStatus}, creating it if it does not exist yet.
     *
     * @param cacheName name of the cache being joined
     * @param joiner address of the joining node
     * @param joinInfo join information supplied by the joiner
     * @param viewId view id the request was sent in
     * @return the result of {@code doJoin}, or {@code null} if this manager is shutting down
     * @throws Exception if waiting for the view is interrupted or the join fails
     */
    @Override
    public CacheStatusResponse handleJoin(String cacheName, Address joiner, CacheJoinInfo joinInfo, int viewId) throws Exception {
        waitForView(viewId);
        if (!isShuttingDown) {
            return initCacheStatusIfAbsent(cacheName).doJoin(joiner, joinInfo);
        }
        log.debugf("Ignoring join request from %s for cache %s, the local cache manager is shutting down", joiner, cacheName);
        return null;
    }

    /**
     * Handles a leave request for a cache.
     * <p>
     * The request is ignored (and logged) when this manager is shutting down or when no
     * {@link ClusterCacheStatus} exists for the cache. The {@code viewId} argument is not
     * consulted by this implementation.
     *
     * @param cacheName name of the cache being left
     * @param leaver address of the leaving node
     * @param viewId view id the request was sent in (unused here)
     * @throws Exception if {@code doLeave} fails
     */
    @Override
    public void handleLeave(String cacheName, Address leaver, int viewId) throws Exception {
        if (this.isShuttingDown) {
            log.debugf("Ignoring leave request from %s for cache %s, the local cache manager is shutting down", leaver, cacheName);
            return;
        }
        ClusterCacheStatus cacheStatus = this.cacheStatusMap.get(cacheName);
        if (cacheStatus == null) {
            // Supply the arguments for both %s placeholders; previously none were passed,
            // so the message never identified the leaver or the cache.
            log.tracef("Ignoring leave request from %s for cache %s because it doesn't have a cache status entry", leaver, cacheName);
            return;
        }
        cacheStatus.doLeave(leaver);
    }

    /**
     * Handles a node's confirmation that it finished its local rebalance.
     * <p>
     * A non-null {@code throwable} is reported through the logger, but processing continues.
     * The confirmation is forwarded only if the cache has a status entry and that entry reports
     * a rebalance in progress; otherwise it is logged and dropped. The {@code viewId} argument
     * is not consulted by this implementation.
     *
     * @param cacheName name of the cache
     * @param node address of the confirming node
     * @param topologyId topology id being confirmed
     * @param throwable error reported by the node, or {@code null}
     * @param viewId view id the confirmation was sent in (unused here)
     * @throws Exception if {@code doConfirmRebalance} fails
     */
    @Override
    public void handleRebalanceCompleted(String cacheName, Address node, int topologyId, Throwable throwable, int viewId) throws Exception {
        if (throwable != null) {
            log.rebalanceError(cacheName, node, throwable);
        }
        log.debugf("Finished local rebalance for cache %s on node %s, topology id = %d", (Object) cacheName, (Object) node, (Object) topologyId);

        ClusterCacheStatus cacheStatus = cacheStatusMap.get(cacheName);
        boolean awaitingConfirmation = cacheStatus != null && cacheStatus.isRebalanceInProgress();
        if (awaitingConfirmation) {
            cacheStatus.doConfirmRebalance(node, topologyId);
            return;
        }
        log.debugf("Ignoring rebalance confirmation from %s for cache %s because it doesn't have a cache status entry", node, cacheName);
    }

    /**
     * Processes a cluster view notification.
     * <p>
     * Runs entirely under {@code viewHandlingLock}. Views whose id is not greater than the
     * currently recorded one are ignored. The coordinator flag is refreshed from the transport;
     * if this node is not the coordinator nothing else happens (in particular the recorded
     * view id is not advanced). On the coordinator, a merge view or a freshly acquired
     * coordinator role triggers a cluster status recovery, otherwise the member list of every
     * known cache is updated. Finally the new view id is published under
     * {@code viewUpdateLock} and waiters are notified.
     *
     * @param mergeView whether the new view is a merge view
     * @param newViewId id of the new view
     */
    @Override
    public void handleClusterView(boolean mergeView, int newViewId) {
        synchronized (this.viewHandlingLock) {
            if (newViewId <= this.viewId) {
                log.tracef("Ignoring old cluster view notification: %s", (Object) newViewId);
                return;
            }
            boolean becameCoordinator = !this.isCoordinator && this.transport.isCoordinator();
            this.isCoordinator = this.transport.isCoordinator();
            log.tracef("Received new cluster view: %s, isCoordinator = %s, becameCoordinator = %s", (Object) newViewId, (Object) this.isCoordinator, (Object) becameCoordinator);
            if (!this.isCoordinator) {
                return;
            }
            if (mergeView || becameCoordinator) {
                try {
                    this.recoverClusterStatus(newViewId, mergeView, this.transport.getMembers());
                }
                catch (InterruptedException e) {
                    log.tracef("Cluster state recovery interrupted because the coordinator is shutting down");
                    // Restore the interrupt status so the code that owns this thread can still
                    // observe the interruption; it was previously lost here.
                    Thread.currentThread().interrupt();
                    return;
                }
                catch (Exception e) {
                    // Recovery failure is logged; the view id is still advanced below.
                    log.failedToRecoverClusterState(e);
                }
            } else {
                try {
                    this.updateCacheMembers(this.transport.getMembers());
                }
                catch (Exception e) {
                    log.errorUpdatingMembersList(e);
                }
            }
            synchronized (this.viewUpdateLock) {
                this.viewId = newViewId;
                this.viewUpdateLock.notifyAll();
            }
        }
    }

    /**
     * Returns the {@link ClusterCacheStatus} for a cache, creating and registering one if none
     * exists yet.
     * <p>
     * A new status uses {@link PreferConsistencyStrategy} when the cache has a configuration
     * with partition handling enabled, and {@link PreferAvailabilityStrategy} otherwise. If
     * another thread registers a status first, that instance is returned and the one created
     * here is discarded.
     *
     * @param cacheName name of the cache
     * @return the status instance registered in {@code cacheStatusMap} for the cache
     */
    private ClusterCacheStatus initCacheStatusIfAbsent(String cacheName) {
        ClusterCacheStatus existing = cacheStatusMap.get(cacheName);
        if (existing != null) {
            return existing;
        }

        Configuration cacheConfiguration = cacheManager.getCacheConfiguration(cacheName);
        boolean partitionHandlingEnabled = cacheConfiguration != null
                && cacheConfiguration.clustering().partitionHandling().enabled();

        AvailabilityStrategy availabilityStrategy;
        if (partitionHandlingEnabled) {
            availabilityStrategy = new PreferConsistencyStrategy();
        } else {
            availabilityStrategy = new PreferAvailabilityStrategy();
        }

        ClusterCacheStatus created = new ClusterCacheStatus(cacheName, availabilityStrategy, this);
        ClusterCacheStatus winner = cacheStatusMap.putIfAbsent(cacheName, created);
        return winner != null ? winner : created;
    }

    /**
     * Logs the start of a rebalance and asynchronously broadcasts a {@code REBALANCE_START}
     * command carrying the given topology, stamped with this node's address and the
     * transport's current view id.
     *
     * @param cacheName name of the cache being rebalanced
     * @param cacheTopology topology to broadcast
     * @param totalOrder passed through to {@link #executeOnClusterAsync}
     * @param distributed passed through to {@link #executeOnClusterAsync}
     */
    @Override
    public void broadcastRebalanceStart(String cacheName, CacheTopology cacheTopology, boolean totalOrder, boolean distributed) {
        LogFactory.CLUSTER.startRebalance(cacheName, cacheTopology);
        Address sender = transport.getAddress();
        int currentViewId = transport.getViewId();
        CacheTopologyControlCommand rebalanceStart = new CacheTopologyControlCommand(
                cacheName, CacheTopologyControlCommand.Type.REBALANCE_START, sender, cacheTopology, null, currentViewId);
        int timeout = getGlobalTimeout();
        executeOnClusterAsync(rebalanceStart, timeout, totalOrder, distributed);
    }

    /**
     * Collects the status of every cache from every node and merges it per cache.
     * <p>
     * A {@code GET_STATUS} command is executed synchronously on the cluster. Each node's
     * response is expected to be a map keyed by cache name; the responses are regrouped into
     * one map per cache (sender address to that node's status for the cache) and handed to the
     * cache's {@link ClusterCacheStatus}, which is created if absent.
     *
     * @param newViewId id of the view being handled, embedded in the command
     * @param isMergeView whether the view is a merge view
     * @param clusterMembers current cluster members
     * @throws Exception if the cluster-wide command or the merge fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // only for the raw hand-off to doMergePartitions below
    private void recoverClusterStatus(int newViewId, boolean isMergeView, List<Address> clusterMembers) throws Exception {
        CacheTopologyControlCommand command = new CacheTopologyControlCommand(null, CacheTopologyControlCommand.Type.GET_STATUS, this.transport.getAddress(), newViewId);
        Map<Address, Object> statusResponses = this.executeOnClusterSync(command, this.getGlobalTimeout(), false, false);
        log.debugf("Got %d status responses. members are %s", (Object) statusResponses.size(), (Object) clusterMembers);

        // cache name -> (sender -> status reported by that sender for the cache)
        Map<String, Map<Address, Object>> responsesByCache = new HashMap<String, Map<Address, Object>>();
        for (Map.Entry<Address, Object> responseEntry : statusResponses.entrySet()) {
            Address sender = responseEntry.getKey();
            Map<?, ?> nodeStatus = (Map<?, ?>) responseEntry.getValue();
            for (Map.Entry<?, ?> statusEntry : nodeStatus.entrySet()) {
                String cacheName = (String) statusEntry.getKey();
                Map<Address, Object> cacheResponses = responsesByCache.get(cacheName);
                if (cacheResponses == null) {
                    cacheResponses = new HashMap<Address, Object>();
                    responsesByCache.put(cacheName, cacheResponses);
                }
                cacheResponses.put(sender, statusEntry.getValue());
            }
        }

        for (Map.Entry<String, Map<Address, Object>> cacheEntry : responsesByCache.entrySet()) {
            ClusterCacheStatus cacheStatus = this.initCacheStatusIfAbsent(cacheEntry.getKey());
            // NOTE(review): the value type doMergePartitions expects is declared outside this
            // file, so the map is passed raw exactly as before - confirm and tighten the type.
            cacheStatus.doMergePartitions((Map) cacheEntry.getValue(), clusterMembers, isMergeView);
        }
    }

    /**
     * Passes the new cluster member list to every cache status currently registered.
     *
     * @param newClusterMembers the members of the new cluster view
     * @throws Exception if any cache status fails to handle the view; remaining caches are
     *         then not visited
     */
    public void updateCacheMembers(List<Address> newClusterMembers) throws Exception {
        log.tracef("Updating cluster members for all the caches. New list is %s", newClusterMembers);
        for (ClusterCacheStatus status : cacheStatusMap.values()) {
            status.doHandleClusterView(newClusterMembers);
        }
    }

    /**
     * Blocks the caller until the locally recorded view id is at least {@code viewId}.
     * <p>
     * Waits on {@code viewUpdateLock} in slices of one second, re-checking the condition after
     * every wake-up.
     *
     * @param viewId the view id to wait for
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void waitForView(int viewId) throws InterruptedException {
        if (viewId > this.viewId) {
            log.tracef("Received a cache topology command with a higher view id: %s, our view id is %s", (Object) viewId, (Object) this.viewId);
        }
        synchronized (viewUpdateLock) {
            while (viewId > this.viewId) {
                viewUpdateLock.wait(1000L);
            }
        }
    }

    /**
     * Executes a command on the cluster and waits for the responses.
     * <p>
     * With {@code totalOrder} the command is sent to the transport's member list and only the
     * transport's responses are returned. Otherwise the remote invocation is submitted to
     * {@code asyncTransportExecutor}, the command is performed on the calling thread, and the
     * local result is added to the remote ones under this node's address.
     *
     * @param command the command to execute
     * @param timeout timeout in milliseconds for the remote invocation
     * @param totalOrder whether to use the total-order path
     * @param distributed passed to the transport on the total-order path
     * @return response values keyed by node address
     * @throws Exception if the local execution throws (wrapped), any response is unsuccessful
     *         ({@link CacheException}), or the remote invocation fails or times out
     */
    private Map<Address, Object> executeOnClusterSync(final ReplicableCommand command, final int timeout, boolean totalOrder, boolean distributed) throws Exception {
        if (totalOrder) {
            Map<Address, Response> totalOrderResponses = transport.invokeRemotely(transport.getMembers(), command, ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, timeout, false, null, totalOrder, distributed);
            return extractResponseValues(totalOrderResponses);
        }

        // Start the remote invocation first so it overlaps with the local execution below.
        Callable<Map<Address, Response>> remoteInvocation = new Callable<Map<Address, Response>>() {
            @Override
            public Map<Address, Response> call() throws Exception {
                return transport.invokeRemotely(null, command, ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, timeout, true, null, false, false);
            }
        };
        Future<Map<Address, Response>> remoteFuture = asyncTransportExecutor.submit(remoteInvocation);

        gcr.wireDependencies(command);
        Response localResponse;
        try {
            if (log.isTraceEnabled()) {
                log.tracef("Attempting to execute command on self: %s", command);
            }
            localResponse = (Response) command.perform(null);
        } catch (Throwable localFailure) {
            throw new Exception(localFailure);
        }
        if (!localResponse.isSuccessful()) {
            throw new CacheException("Unsuccessful local response: " + localResponse);
        }

        Map<Address, Response> remoteResponses = remoteFuture.get(timeout, TimeUnit.MILLISECONDS);
        Map<Address, Object> allValues = extractResponseValues(remoteResponses);
        allValues.put(transport.getAddress(), ((SuccessfulResponse) localResponse).getResponseValue());
        return allValues;
    }

    /**
     * Unwraps the value of every response into a new map, failing on the first unsuccessful one.
     *
     * @param responseMap responses keyed by node address
     * @return a mutable map from node address to response value
     * @throws CacheException if a response is not successful; the cause is the remote exception
     *         when the response is an {@link ExceptionResponse}, otherwise {@code null}
     */
    private Map<Address, Object> extractResponseValues(Map<Address, Response> responseMap) {
        Map<Address, Object> values = new HashMap<Address, Object>(transport.getMembers().size());
        for (Map.Entry<Address, Response> responseEntry : responseMap.entrySet()) {
            Address node = responseEntry.getKey();
            Response nodeResponse = responseEntry.getValue();
            if (!nodeResponse.isSuccessful()) {
                Exception cause = null;
                if (nodeResponse instanceof ExceptionResponse) {
                    cause = ((ExceptionResponse) nodeResponse).getException();
                }
                throw new CacheException("Unsuccessful response received from node " + node + ": " + nodeResponse, cause);
            }
            values.put(node, ((SuccessfulResponse) nodeResponse).getResponseValue());
        }
        return values;
    }

    /**
     * Returns the transport's distributed sync timeout from the global configuration,
     * narrowed to {@code int}.
     *
     * @return the timeout passed to cluster-wide command invocations
     */
    private int getGlobalTimeout() {
        final int timeout = (int) globalConfiguration.transport().distributedSyncTimeout();
        return timeout;
    }

    /**
     * Broadcasts a command to the cluster without waiting for responses.
     * <p>
     * When {@code totalOrder} is {@code false}, the command is additionally submitted to
     * {@code asyncTransportExecutor} to be performed on this node. The remote invocation is
     * then issued on the calling thread with
     * {@link ResponseMode#ASYNCHRONOUS_WITH_SYNC_MARSHALLING}.
     *
     * @param command the command to broadcast
     * @param timeout timeout passed to the transport
     * @param totalOrder whether to use total order; also disables the explicit local execution
     * @param distributed passed through to the transport
     * @throws CacheException if the transport fails to send the command; the transport's
     *         exception is attached as the cause
     */
    public void executeOnClusterAsync(final ReplicableCommand command, int timeout, boolean totalOrder, boolean distributed) {
        if (!totalOrder) {
            this.asyncTransportExecutor.submit(new Runnable() {

                @Override
                public void run() {
                    ClusterTopologyManagerImpl.this.gcr.wireDependencies(command);
                    try {
                        if (log.isTraceEnabled()) {
                            log.tracef("Attempting to execute command on self: %s", command);
                        }
                        command.perform(null);
                    }
                    catch (Throwable throwable) {
                        // Local execution stays best-effort and must not kill the executor
                        // task, but the failure is no longer dropped silently.
                        // NOTE(review): the command may already log its own failures - confirm.
                        log.debugf(throwable, "Failed to execute command on self: %s", command);
                    }
                }
            });
        }
        try {
            this.transport.invokeRemotely(null, command, ResponseMode.ASYNCHRONOUS_WITH_SYNC_MARSHALLING, timeout, true, null, totalOrder, distributed);
        }
        catch (Exception e) {
            // Keep the original failure as the cause so the root problem stays diagnosable.
            throw new CacheException("Failed to broadcast asynchronous command: " + command, e);
        }
    }

    /**
     * Asynchronously broadcasts a {@code CH_UPDATE} command carrying the cache's current
     * topology and availability mode, stamped with this node's address and the transport's
     * current view id.
     *
     * @param cacheName name of the cache
     * @param cacheTopology topology to broadcast
     * @param availabilityMode availability mode to broadcast
     * @param totalOrder passed through to {@link #executeOnClusterAsync}
     * @param distributed passed through to {@link #executeOnClusterAsync}
     */
    @Override
    public void broadcastTopologyUpdate(String cacheName, CacheTopology cacheTopology, AvailabilityMode availabilityMode, boolean totalOrder, boolean distributed) {
        log.debugf("Updating cluster-wide current topology for cache %s, topology = %s, availability mode = %s", cacheName, cacheTopology, availabilityMode);
        Address sender = transport.getAddress();
        int currentViewId = transport.getViewId();
        CacheTopologyControlCommand topologyUpdate = new CacheTopologyControlCommand(
                cacheName, CacheTopologyControlCommand.Type.CH_UPDATE, sender, cacheTopology, availabilityMode, currentViewId);
        int timeout = getGlobalTimeout();
        executeOnClusterAsync(topologyUpdate, timeout, totalOrder, distributed);
    }

    /**
     * Asynchronously broadcasts a {@code STABLE_TOPOLOGY_UPDATE} command carrying the cache's
     * stable topology, stamped with this node's address and the transport's current view id.
     *
     * @param cacheName name of the cache
     * @param cacheTopology stable topology to broadcast
     * @param totalOrder passed through to {@link #executeOnClusterAsync}
     * @param distributed passed through to {@link #executeOnClusterAsync}
     */
    @Override
    public void broadcastStableTopologyUpdate(String cacheName, CacheTopology cacheTopology, boolean totalOrder, boolean distributed) {
        log.debugf("Updating cluster-wide stable topology for cache %s, topology = %s", cacheName, cacheTopology);
        Address sender = transport.getAddress();
        int currentViewId = transport.getViewId();
        CacheTopologyControlCommand stableUpdate = new CacheTopologyControlCommand(
                cacheName, CacheTopologyControlCommand.Type.STABLE_TOPOLOGY_UPDATE, sender, cacheTopology, null, currentViewId);
        int timeout = getGlobalTimeout();
        executeOnClusterAsync(stableUpdate, timeout, totalOrder, distributed);
    }

    /**
     * Reports the manager-wide rebalancing flag.
     *
     * @return the value last set through {@link #setRebalancingEnabled(boolean)}, initially
     *         {@code true}
     */
    @Override
    public boolean isRebalancingEnabled() {
        return isRebalancingEnabled;
    }

    /**
     * Updates the manager-wide rebalancing flag and forwards the new value to every cache
     * status currently registered.
     *
     * @param enabled the new value of the flag
     */
    @Override
    public void setRebalancingEnabled(boolean enabled) {
        isRebalancingEnabled = enabled;
        for (ClusterCacheStatus status : cacheStatusMap.values()) {
            status.setRebalanceEnabled(enabled);
        }
    }

    /**
     * Asks the named cache's status to force a rebalance; does nothing if the cache has no
     * status entry.
     *
     * @param cacheName name of the cache
     */
    @Override
    public void forceRebalance(String cacheName) {
        ClusterCacheStatus status = cacheStatusMap.get(cacheName);
        if (status == null) {
            return;
        }
        status.forceRebalance();
    }

    /**
     * Asks the named cache's status to switch to the given availability mode; does nothing if
     * the cache has no status entry.
     *
     * @param cacheName name of the cache
     * @param availabilityMode the availability mode to force
     */
    @Override
    public void forceAvailabilityMode(String cacheName, AvailabilityMode availabilityMode) {
        ClusterCacheStatus status = cacheStatusMap.get(cacheName);
        if (status == null) {
            return;
        }
        status.forceAvailabilityMode(availabilityMode);
    }

    /**
     * Cache manager listener that forwards view changes and merges to
     * {@link ClusterTopologyManagerImpl#handleClusterView(boolean, int)}.
     */
    @Listener(sync = true)
    public class ClusterViewListener {
        /**
         * Submits the handling of the view event to {@code asyncTransportExecutor}. The event
         * is read only when the submitted task runs.
         *
         * @param e the view change (or merge) event
         */
        @Merged
        @ViewChanged
        public void handleViewChange(final ViewChangedEvent e) {
            Runnable viewTask = new Runnable() {
                @Override
                public void run() {
                    handleClusterView(e.isMergeView(), e.getViewId());
                }
            };
            asyncTransportExecutor.submit(viewTask);
        }
    }
}

