package org.apache.helix.manager.zk;

import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.ConfigChangeListener;
import org.apache.helix.ControllerChangeListener;
import org.apache.helix.CurrentStateChangeListener;
import org.apache.helix.ExternalViewChangeListener;
import org.apache.helix.HealthStateChangeListener;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
import org.apache.helix.HelixTimerTask;
import org.apache.helix.IdealStateChangeListener;
import org.apache.helix.InstanceConfigChangeListener;
import org.apache.helix.InstanceType;
import org.apache.helix.LiveInstanceChangeListener;
import org.apache.helix.LiveInstanceInfoProvider;
import org.apache.helix.MessageListener;
import org.apache.helix.PreConnectCallback;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathConfig;
import org.apache.helix.PropertyType;
import org.apache.helix.ScopedConfigChangeListener;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
import org.apache.helix.healthcheck.HealthStatsAggregationTask;
import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.monitoring.ZKPathDataDumpTask;
import org.apache.helix.participant.DistClusterControllerElection;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.log4j.Logger;
import org.apache.zookeeper.Watcher;
import org.josql.expressions.BindVariable;

/* loaded from: input_file:WEB-INF/lib/helix-core-0.6.1-incubating.jar:org/apache/helix/manager/zk/ZKHelixManager.class */
public class ZKHelixManager implements HelixManager {
    // Class-wide log4j logger.
    private static Logger logger = Logger.getLogger(ZKHelixManager.class);
    // Number of connection attempts; createClient() uses the literal 3 (constant inlined by the decompiler).
    private static final int RETRY_LIMIT = 3;
    // ZooKeeper connection timeout in ms; appears as the literal 60000 in createClient()/handleNewSession().
    private static final int CONNECTIONTIMEOUT = 60000;
    // Identity of this manager, fixed at construction time.
    private final String _clusterName;
    private final String _instanceName;
    private final String _zkConnectString;
    // Session timeout (ms) the constructor falls back to when "zk.session.timeout" is missing or not positive.
    private static final int DEFAULT_SESSION_TIMEOUT = 30000;
    // Accessors created in createClient(); they are null until connect() has run.
    private ZKHelixDataAccessor _helixAccessor;
    private ConfigAccessor _configAccessor;
    // ZooKeeper client; a new instance is created by every createClient() call.
    protected ZkClient _zkClient;
    // Tracks the connection state; isConnected() delegates to it.
    private final ZkStateChangeListener _zkStateChangeListener;
    private final InstanceType _instanceType;
    // Hex string of the current ZooKeeper session id, set in handleNewSession().
    volatile String _sessionId;
    // Timer for the periodic ZKPathDataDumpTask; created lazily by startStatusUpdatedumpTask().
    private Timer _timer;
    // Callback handler for controller leader election; created on the first controller session.
    private CallbackHandler _leaderElectionHandler;
    // Health report collector; created on the first participant session.
    private ParticipantHealthReportCollectorImpl _participantHealthCheckInfoCollector;
    private final DefaultMessagingService _messagingService;
    // Admin tool handed out by getClusterManagmentTool(); replaced on every call there.
    private ZKHelixAdmin _managementTool;
    // Version string read from "cluster-manager-version.properties" via _properties.
    private final String _version;
    private final HelixManagerProperties _properties;
    private final StateMachineEngine _stateMachEngine;
    // Effective ZooKeeper session timeout in ms (see constructor).
    private int _sessionTimeout;
    // Lazily created by getHelixPropertyStore().
    private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
    // Timer tasks started/stopped by startTimerTasks()/stopTimerTasks(); populated only for CONTROLLER instances.
    private final List<HelixTimerTask> _controllerTimerTasks;
    private BaseDataAccessor<ZNRecord> _baseDataAccessor;
    // Flapping-detection settings passed to ZkStateChangeListener; overridable through system properties.
    int _flappingTimeWindowMs;
    int _maxDisconnectThreshold;
    // Default flapping time window in ms. NOTE(review): the name contains a typo ("WINDIOW") but is public API.
    public static final int FLAPPING_TIME_WINDIOW = 300000;
    // Default value for _maxDisconnectThreshold.
    public static final int MAX_DISCONNECT_THRESHOLD = 5;
    // Cluster-scope config key read in handleNewSession() to decide whether unknown instances may self-register.
    public static final String ALLOW_PARTICIPANT_AUTO_JOIN = "allowParticipantAutoJoin";
    // Registered listener handlers; accesses in this class are guarded by synchronized (this).
    protected final List<CallbackHandler> _handlers = new ArrayList();
    // Callbacks invoked in handleNewSessionAsParticipant() before the live instance node is created.
    List<PreConnectCallback> _preConnectCallbacks = new LinkedList();
    // Not referenced elsewhere in this class as shown.
    ZKPropertyTransferServer _transferServer = null;
    // Optional provider of extra fields merged into the live instance record (see addLiveInstance()).
    LiveInstanceInfoProvider _liveInstanceInfoProvider = null;

    public ZKHelixManager(String str, String str2, InstanceType instanceType, String str3) {
        logger.info("Create a zk-based cluster manager. clusterName:" + str + ", instanceName:" + str2 + ", type:" + instanceType + ", zkSvr:" + str3);
        this._flappingTimeWindowMs = FLAPPING_TIME_WINDIOW;
        try {
            this._flappingTimeWindowMs = Integer.parseInt(System.getProperty("helixmanager.flappingTimeWindow", "300000"));
        } catch (NumberFormatException e) {
            logger.warn("Exception while parsing helixmanager.flappingTimeWindow: " + System.getProperty("helixmanager.flappingTimeWindow", "300000"));
        }
        this._maxDisconnectThreshold = 5;
        try {
            this._maxDisconnectThreshold = Integer.parseInt(System.getProperty("helixmanager.maxDisconnectThreshold", "5"));
        } catch (NumberFormatException e2) {
            logger.warn("Exception while parsing helixmanager.maxDisconnectThreshold: " + System.getProperty("helixmanager.maxDisconnectThreshold", "5"));
        }
        int i = -1;
        try {
            i = Integer.parseInt(System.getProperty("zk.session.timeout", "30000"));
        } catch (NumberFormatException e3) {
            logger.warn("Exception while parsing session timeout: " + System.getProperty("zk.session.timeout", "30000"));
        }
        if (i > 0) {
            this._sessionTimeout = i;
        } else {
            this._sessionTimeout = 30000;
        }
        if (str2 == null) {
            try {
                str2 = InetAddress.getLocalHost().getCanonicalHostName() + "-" + instanceType.toString();
            } catch (UnknownHostException e4) {
                logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e4);
                str2 = "UNKNOWN";
            }
        }
        this._clusterName = str;
        this._instanceName = str2;
        this._instanceType = instanceType;
        this._zkConnectString = str3;
        this._zkStateChangeListener = new ZkStateChangeListener(this, this._flappingTimeWindowMs, this._maxDisconnectThreshold);
        this._timer = null;
        this._messagingService = new DefaultMessagingService(this);
        this._properties = new HelixManagerProperties("cluster-manager-version.properties");
        this._version = this._properties.getVersion();
        this._stateMachEngine = new HelixStateMachineEngine(this);
        this._controllerTimerTasks = new ArrayList();
        if (this._instanceType == InstanceType.CONTROLLER) {
            this._controllerTimerTasks.add(new HealthStatsAggregationTask(this));
        }
    }

    /**
     * Unregisters every handler that was registered for the given key path and listener, and
     * resets each removed handler.
     *
     * @param propertyKey key whose path the handler was registered on
     * @param obj         the listener to remove
     * @return always {@code true}
     */
    @Override
    public boolean removeListener(PropertyKey propertyKey, Object obj) {
        logger.info("Removing listener: " + obj + " on path: " + propertyKey.getPath() + " from cluster: " + _clusterName + " by instance: " + _instanceName);
        synchronized (this) {
            // Collect first, then remove, so _handlers is never modified while being iterated.
            List<CallbackHandler> matches = new ArrayList<CallbackHandler>();
            for (CallbackHandler candidate : _handlers) {
                boolean samePath = candidate.getPath().equals(propertyKey.getPath());
                if (samePath && candidate.getListener().equals(obj)) {
                    matches.add(candidate);
                }
            }
            _handlers.removeAll(matches);
            for (CallbackHandler removed : matches) {
                removed.reset();
            }
        }
        return true;
    }

    /**
     * Registers a listener on the path of the given key unless the same listener is already
     * registered on that path.
     *
     * @param obj          the listener object
     * @param propertyKey  key identifying the path to watch
     * @param changeType   change type passed to the created handler
     * @param eventTypeArr ZooKeeper event types the handler is created with
     * @throws HelixException if the manager is not connected
     */
    private void addListener(Object obj, PropertyKey propertyKey, HelixConstants.ChangeType changeType, Watcher.Event.EventType[] eventTypeArr) {
        checkConnected();
        PropertyType keyType = propertyKey.getType();
        synchronized (this) {
            for (CallbackHandler existing : _handlers) {
                boolean samePath = existing.getPath().equals(propertyKey.getPath());
                if (samePath && existing.getListener().equals(obj)) {
                    logger.info("Listener: " + obj + " on path: " + propertyKey.getPath() + " already exists. skip adding it");
                    return;
                }
            }
            CallbackHandler created = createCallBackHandler(propertyKey, obj, eventTypeArr, changeType);
            _handlers.add(created);
            logger.info("Add listener: " + obj + " for type: " + keyType + " to path: " + created.getPath());
        }
    }

    /**
     * Registers a listener on the cluster's ideal states path.
     *
     * @param idealStateChangeListener listener to register
     */
    @Override
    public void addIdealStateChangeListener(IdealStateChangeListener idealStateChangeListener) throws Exception {
        PropertyKey idealStatesKey = new PropertyKey.Builder(_clusterName).idealStates();
        Watcher.Event.EventType[] watchedEvents = {
            Watcher.Event.EventType.NodeDataChanged,
            Watcher.Event.EventType.NodeDeleted,
            Watcher.Event.EventType.NodeCreated
        };
        addListener(idealStateChangeListener, idealStatesKey, HelixConstants.ChangeType.IDEAL_STATE, watchedEvents);
    }

    /**
     * Registers a listener on the cluster's live instances path.
     *
     * @param liveInstanceChangeListener listener to register
     */
    @Override
    public void addLiveInstanceChangeListener(LiveInstanceChangeListener liveInstanceChangeListener) throws Exception {
        PropertyKey liveInstancesKey = new PropertyKey.Builder(_clusterName).liveInstances();
        Watcher.Event.EventType[] watchedEvents = {
            Watcher.Event.EventType.NodeDataChanged,
            Watcher.Event.EventType.NodeChildrenChanged,
            Watcher.Event.EventType.NodeDeleted,
            Watcher.Event.EventType.NodeCreated
        };
        addListener(liveInstanceChangeListener, liveInstancesKey, HelixConstants.ChangeType.LIVE_INSTANCE, watchedEvents);
    }

    /**
     * Registers a config change listener on the instance configs path.
     *
     * @param configChangeListener listener to register
     */
    @Override
    public void addConfigChangeListener(ConfigChangeListener configChangeListener) {
        PropertyKey instanceConfigsKey = new PropertyKey.Builder(_clusterName).instanceConfigs();
        Watcher.Event.EventType[] watchedEvents = {Watcher.Event.EventType.NodeChildrenChanged};
        addListener(configChangeListener, instanceConfigsKey, HelixConstants.ChangeType.INSTANCE_CONFIG, watchedEvents);
    }

    /**
     * Registers an instance config change listener on the instance configs path.
     *
     * @param instanceConfigChangeListener listener to register
     */
    @Override
    public void addInstanceConfigChangeListener(InstanceConfigChangeListener instanceConfigChangeListener) {
        PropertyKey instanceConfigsKey = new PropertyKey.Builder(_clusterName).instanceConfigs();
        Watcher.Event.EventType[] watchedEvents = {Watcher.Event.EventType.NodeChildrenChanged};
        addListener(instanceConfigChangeListener, instanceConfigsKey, HelixConstants.ChangeType.INSTANCE_CONFIG, watchedEvents);
    }

    /**
     * Registers a scoped config change listener for the CLUSTER, PARTICIPANT or RESOURCE scope.
     * Any other scope is logged as an error and no listener is added.
     *
     * @param scopedConfigChangeListener listener to register
     * @param configScopeProperty        config scope selecting the path to watch
     */
    @Override
    public void addConfigChangeListener(ScopedConfigChangeListener scopedConfigChangeListener, HelixConfigScope.ConfigScopeProperty configScopeProperty) {
        PropertyKey.Builder keys = new PropertyKey.Builder(_clusterName);
        PropertyKey scopeKey;
        switch (configScopeProperty) {
            case CLUSTER:
                scopeKey = keys.clusterConfigs();
                break;
            case PARTICIPANT:
                scopeKey = keys.instanceConfigs();
                break;
            case RESOURCE:
                scopeKey = keys.resourceConfigs();
                break;
            default:
                scopeKey = null;
                break;
        }
        if (scopeKey == null) {
            logger.error("Can't add listener to config scope: " + configScopeProperty);
            return;
        }
        Watcher.Event.EventType[] watchedEvents = {Watcher.Event.EventType.NodeChildrenChanged};
        addListener(scopedConfigChangeListener, scopeKey, HelixConstants.ChangeType.CONFIG, watchedEvents);
    }

    /**
     * Registers a message listener on the messages path of the given instance.
     *
     * @param messageListener listener to register
     * @param str             instance name whose messages path is watched
     */
    @Override
    public void addMessageListener(MessageListener messageListener, String str) {
        PropertyKey messagesKey = new PropertyKey.Builder(_clusterName).messages(str);
        Watcher.Event.EventType[] watchedEvents = {
            Watcher.Event.EventType.NodeChildrenChanged,
            Watcher.Event.EventType.NodeDeleted,
            Watcher.Event.EventType.NodeCreated
        };
        addListener(messageListener, messagesKey, HelixConstants.ChangeType.MESSAGE, watchedEvents);
    }

    /**
     * Registers a message listener on the controller messages path.
     *
     * @param messageListener listener to register
     */
    void addControllerMessageListener(MessageListener messageListener) {
        PropertyKey controllerMessagesKey = new PropertyKey.Builder(_clusterName).controllerMessages();
        Watcher.Event.EventType[] watchedEvents = {
            Watcher.Event.EventType.NodeChildrenChanged,
            Watcher.Event.EventType.NodeDeleted,
            Watcher.Event.EventType.NodeCreated
        };
        addListener(messageListener, controllerMessagesKey, HelixConstants.ChangeType.MESSAGES_CONTROLLER, watchedEvents);
    }

    /**
     * Registers a listener on the current states path built from the two given path components.
     *
     * @param currentStateChangeListener listener to register
     * @param str                        first argument to {@code currentStates} (instance name, per the usage in this class)
     * @param str2                       second argument to {@code currentStates} (session id, per the usage in this class)
     */
    @Override
    public void addCurrentStateChangeListener(CurrentStateChangeListener currentStateChangeListener, String str, String str2) {
        PropertyKey currentStatesKey = new PropertyKey.Builder(_clusterName).currentStates(str, str2);
        Watcher.Event.EventType[] watchedEvents = {
            Watcher.Event.EventType.NodeChildrenChanged,
            Watcher.Event.EventType.NodeDeleted,
            Watcher.Event.EventType.NodeCreated
        };
        addListener(currentStateChangeListener, currentStatesKey, HelixConstants.ChangeType.CURRENT_STATE, watchedEvents);
    }

    /**
     * Registers a listener on the health reports path of the given instance.
     *
     * @param healthStateChangeListener listener to register
     * @param str                       instance name whose health reports are watched
     */
    @Override
    public void addHealthStateChangeListener(HealthStateChangeListener healthStateChangeListener, String str) {
        PropertyKey healthReportsKey = new PropertyKey.Builder(_clusterName).healthReports(str);
        Watcher.Event.EventType[] watchedEvents = {
            Watcher.Event.EventType.NodeChildrenChanged,
            Watcher.Event.EventType.NodeDeleted,
            Watcher.Event.EventType.NodeCreated
        };
        addListener(healthStateChangeListener, healthReportsKey, HelixConstants.ChangeType.HEALTH, watchedEvents);
    }

    /**
     * Registers a listener on the cluster's external views path.
     *
     * @param externalViewChangeListener listener to register
     */
    @Override
    public void addExternalViewChangeListener(ExternalViewChangeListener externalViewChangeListener) {
        PropertyKey externalViewsKey = new PropertyKey.Builder(_clusterName).externalViews();
        Watcher.Event.EventType[] watchedEvents = {
            Watcher.Event.EventType.NodeChildrenChanged,
            Watcher.Event.EventType.NodeDeleted,
            Watcher.Event.EventType.NodeCreated
        };
        addListener(externalViewChangeListener, externalViewsKey, HelixConstants.ChangeType.EXTERNAL_VIEW, watchedEvents);
    }

    /**
     * Registers a listener on the cluster's controller path.
     *
     * @param controllerChangeListener listener to register
     */
    @Override
    public void addControllerListener(ControllerChangeListener controllerChangeListener) {
        PropertyKey controllerKey = new PropertyKey.Builder(_clusterName).controller();
        Watcher.Event.EventType[] watchedEvents = {
            Watcher.Event.EventType.NodeChildrenChanged,
            Watcher.Event.EventType.NodeDeleted,
            Watcher.Event.EventType.NodeCreated
        };
        addListener(controllerChangeListener, controllerKey, HelixConstants.ChangeType.CONTROLLER, watchedEvents);
    }

    /**
     * @return the Helix data accessor created when the client was connected
     * @throws HelixException if the manager is not connected
     */
    @Override
    public HelixDataAccessor getHelixDataAccessor() {
        checkConnected();
        return _helixAccessor;
    }

    /**
     * @return the config accessor created when the client was connected
     * @throws HelixException if the manager is not connected
     */
    @Override
    public ConfigAccessor getConfigAccessor() {
        checkConnected();
        return _configAccessor;
    }

    /** @return the cluster name given at construction time */
    @Override
    public String getClusterName() {
        return _clusterName;
    }

    /** @return the instance name given to, or derived by, the constructor */
    @Override
    public String getInstanceName() {
        return _instanceName;
    }

    /**
     * Connects this manager to ZooKeeper and notifies the messaging service.
     *
     * <p>If the manager is already connected, a warning is logged and nothing else happens. If
     * connecting fails, the manager is disconnected again (best effort) and the original
     * exception is rethrown.
     *
     * @throws Exception the failure raised while creating the client or notifying the messaging service
     */
    @Override
    public void connect() throws Exception {
        logger.info("ClusterManager.connect()");
        if (_zkStateChangeListener.isConnected()) {
            logger.warn("Cluster manager " + _clusterName + " " + _instanceName + " already connected");
            return;
        }
        try {
            createClient(_zkConnectString);
            _messagingService.onConnected();
        } catch (Exception e) {
            // Pass the exception as the Throwable argument so log4j records the stack trace;
            // logger.error(e) would only log e.toString().
            logger.error("Failed to connect instance " + _instanceName + " to cluster " + _clusterName, e);
            try {
                disconnect();
            } catch (RuntimeException cleanupFailure) {
                // A failure during cleanup must not hide the root cause of the connect failure.
                logger.warn("Cleanup after failed connect also failed for instance " + _instanceName, cleanupFailure);
            }
            throw e;
        }
    }

    /**
     * Disconnects the manager if it is currently connected; otherwise only logs an error.
     */
    @Override
    public void disconnect() {
        if (!isConnected()) {
            logger.error("ClusterManager " + _instanceName + " already disconnected");
            return;
        }
        disconnectInternal();
    }

    /**
     * Tears down everything tied to the current connection, without checking the connection state:
     * the messaging executor, listener handlers, data accessor, leader election handler, health
     * collector, timers, and finally the ZooKeeper client itself.
     *
     * <p>The decompiler reported that the original access modifier was package-private.
     */
    public void disconnectInternal() {
        logger.info("disconnect " + _instanceName + "(" + _instanceType + ") from " + _clusterName);

        // Stop message processing and listeners before the client goes away.
        _messagingService.getExecutor().shutdown();
        resetHandlers();
        _helixAccessor.shutdown();

        if (_leaderElectionHandler != null) {
            _leaderElectionHandler.reset();
        }
        if (_participantHealthCheckInfoCollector != null) {
            _participantHealthCheckInfoCollector.stop();
        }

        // Cancel the status dump timer so it can be recreated on a later start.
        if (_timer != null) {
            _timer.cancel();
            _timer = null;
        }
        if (_instanceType == InstanceType.CONTROLLER) {
            stopTimerTasks();
        }

        // Finally release the ZooKeeper client and mark the manager as disconnected.
        _zkClient.unsubscribeAll();
        _zkClient.close();
        _zkStateChangeListener.disconnect();
        logger.info("Cluster manager: " + _instanceName + " disconnected");
    }

    /**
     * @return the hex session id recorded for the current ZooKeeper session
     * @throws HelixException if the manager is not connected
     */
    @Override
    public String getSessionId() {
        checkConnected();
        return _sessionId;
    }

    /** @return the connection state reported by the ZooKeeper state change listener */
    @Override
    public boolean isConnected() {
        return _zkStateChangeListener.isConnected();
    }

    /** @return always {@code -1}; this implementation does not track notification times */
    @Override
    public long getLastNotificationTime() {
        final long notTracked = -1L;
        return notTracked;
    }

    /**
     * Creates the live instance node for this instance and makes sure the current state path for
     * the current session exists.
     *
     * <p>If a {@link LiveInstanceInfoProvider} is set and returns a record, the standard live
     * instance fields are merged into that record and the result is used as the node content.
     *
     * @throws HelixException if the live instance node cannot be created
     */
    private void addLiveInstance() {
        LiveInstance self = new LiveInstance(_instanceName);
        self.setSessionId(_sessionId);
        self.setHelixVersion(_version);
        self.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());

        if (_liveInstanceInfoProvider != null) {
            logger.info("invoking _liveInstanceInfoProvider");
            ZNRecord extraInfo = _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
            if (extraInfo != null) {
                // Standard fields are merged into the provider's record, then re-keyed by instance name.
                extraInfo.merge(self.getRecord());
                ZNRecord combined = new ZNRecord(extraInfo, _instanceName);
                self = new LiveInstance(combined);
                logger.info("liveInstance content :" + _instanceName + " " + self.toString());
            }
        }

        logger.info("Add live instance: InstanceName: " + _instanceName + " Session id:" + _sessionId);
        boolean created = _helixAccessor.createProperty(_helixAccessor.keyBuilder().liveInstance(_instanceName), self);
        if (!created) {
            String failure = "Fail to create live instance node after waiting, so quit. instance:" + _instanceName;
            logger.warn(failure);
            throw new HelixException(failure);
        }

        String currentStatePath = PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, _clusterName, _instanceName, getSessionId());
        if (!_zkClient.exists(currentStatePath)) {
            _zkClient.createPersistent(currentStatePath);
            logger.info("Creating current state path " + currentStatePath);
        }
    }

    /**
     * Starts the periodic {@link ZKPathDataDumpTask} on a daemon timer, unless a timer already exists.
     * The task is first run after 1,800,000 ms and then every 7,200,000 ms.
     */
    private void startStatusUpdatedumpTask() {
        if (_timer != null) {
            return;
        }
        _timer = new Timer(true);
        ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(this, _zkClient, 10800000);
        _timer.scheduleAtFixedRate(dumpTask, 1800000L, 7200000L);
    }

    /**
     * Creates the ZooKeeper client and the accessors built on it, then waits for the connection
     * and triggers the initial session handling.
     *
     * <p>A {@link HelixException} raised during session handling is rethrown immediately; any other
     * exception is retried until the retry limit is reached, after which it is rethrown.
     *
     * @param str ZooKeeper connect string
     * @throws Exception the last failure once retries are exhausted, or a {@link HelixException}
     */
    private void createClient(String str) throws Exception {
        // Property store paths use a byte-array serializer; everything else uses the streaming ZNRecord one.
        String propertyStorePath = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName, new String[0]);
        _zkClient = new ZkClient(str, _sessionTimeout, CONNECTIONTIMEOUT, ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).serialize(propertyStorePath, new ByteArraySerializer()).build());

        ZkBaseDataAccessor plainAccessor = new ZkBaseDataAccessor(_zkClient);
        if (_instanceType == InstanceType.PARTICIPANT) {
            // Participants cache their own current states path.
            _baseDataAccessor = new ZkCacheBaseDataAccessor(plainAccessor, Arrays.asList(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, _clusterName, _instanceName)));
        } else if (_instanceType == InstanceType.CONTROLLER) {
            // Controllers cache the external view path.
            _baseDataAccessor = new ZkCacheBaseDataAccessor(plainAccessor, Arrays.asList(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, _clusterName, new String[0])));
        } else {
            _baseDataAccessor = plainAccessor;
        }

        _helixAccessor = new ZKHelixDataAccessor(_clusterName, _instanceType, _baseDataAccessor);
        _configAccessor = new ConfigAccessor(_zkClient);
        _zkClient.subscribeStateChanges(_zkStateChangeListener);

        int failures = 0;
        while (failures < RETRY_LIMIT) {
            try {
                _zkClient.waitUntilConnected(_sessionTimeout, TimeUnit.MILLISECONDS);
                _zkStateChangeListener.handleStateChanged(Watcher.Event.KeeperState.SyncConnected);
                _zkStateChangeListener.handleNewSession();
                return;
            } catch (HelixException fatal) {
                // Helix-level failures are not retried.
                logger.error("fail to createClient.", fatal);
                throw fatal;
            } catch (Exception retryable) {
                failures++;
                logger.error("fail to createClient. retry " + failures, retryable);
                if (failures == RETRY_LIMIT) {
                    throw retryable;
                }
            }
        }
    }

    /**
     * Builds a {@link CallbackHandler} bound to this manager and its ZooKeeper client.
     *
     * @param propertyKey  key identifying the watched path
     * @param obj          the listener; must not be {@code null}
     * @param eventTypeArr ZooKeeper event types for the handler
     * @param changeType   change type for the handler
     * @return the new handler
     * @throws HelixException if {@code obj} is {@code null}
     */
    private CallbackHandler createCallBackHandler(PropertyKey propertyKey, Object obj, Watcher.Event.EventType[] eventTypeArr, HelixConstants.ChangeType changeType) {
        if (obj != null) {
            return new CallbackHandler(this, _zkClient, propertyKey, obj, eventTypeArr, changeType);
        }
        throw new HelixException("Listener cannot be null");
    }

    /**
     * Re-initializes the manager for a new ZooKeeper session.
     *
     * <p>Steps, in order: wait until the client is connected, record the new session id, reset
     * the data accessor cache and all handlers, verify the cluster (and, unless auto-join is
     * enabled, the instance) is set up, run participant and/or controller specific setup, and
     * finally re-initialize the handlers where applicable.
     *
     * <p>The decompiler reported that the original access modifier was protected.
     *
     * @throws HelixException if the cluster structure is missing, or the instance is not set up
     *                        and auto-join is not enabled
     */
    public void handleNewSession() {
        // Block until connected; keep retrying in 60 s slices and log each timeout.
        boolean connected = _zkClient.waitUntilConnected(60000L, TimeUnit.MILLISECONDS);
        while (!connected) {
            logger.error("Could NOT connect to zk server in 60000ms. zkServer: " + _zkConnectString + ", expiredSessionId: " + _sessionId + ", clusterName: " + _clusterName);
            connected = _zkClient.waitUntilConnected(60000L, TimeUnit.MILLISECONDS);
        }

        ZkConnection zkConnection = (ZkConnection) _zkClient.getConnection();
        synchronized (this) {
            _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId());
        }
        _baseDataAccessor.reset();
        resetHandlers();
        logger.info("Handling new session, session id:" + _sessionId + ", instance:" + _instanceName + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName);
        logger.info(zkConnection.getZookeeper());

        if (!ZKUtil.isClusterSetup(_clusterName, _zkClient)) {
            throw new HelixException("Initial cluster structure is not set up for cluster:" + _clusterName);
        }

        // Read the cluster-level auto-join flag. A read failure leaves auto-join disabled, but is
        // logged instead of being swallowed, and the log line reports the actual flag value.
        boolean autoJoin = false;
        try {
            autoJoin = Boolean.parseBoolean(getConfigAccessor().get(new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getClusterName()).build(), ALLOW_PARTICIPANT_AUTO_JOIN));
            logger.info("Auto joining " + _clusterName + " is " + autoJoin);
        } catch (Exception e) {
            logger.info("Could not read " + ALLOW_PARTICIPANT_AUTO_JOIN + " for cluster " + _clusterName + "; auto joining stays disabled", e);
        }

        if (!ZKUtil.isInstanceSetup(_zkClient, _clusterName, _instanceName, _instanceType)) {
            if (!autoJoin) {
                throw new HelixException("Initial cluster structure is not set up for instance:" + _instanceName + " instanceType:" + _instanceType);
            }
            logger.info("Auto joining instance " + _instanceName);
            InstanceConfig instanceConfig = new InstanceConfig(_instanceName);
            // Split "<host><sep><port>" at the last separator; without one the whole name is the host.
            // NOTE(review): the separator constant comes from an unrelated library and is presumably
            // "_" (decompiler constant resolution) — confirm before replacing it with a literal.
            String hostName = _instanceName;
            String port = "";
            int separatorIndex = _instanceName.lastIndexOf(BindVariable.SPECIAL_NAME_PREFIX);
            if (separatorIndex > 0) {
                hostName = _instanceName.substring(0, separatorIndex);
                port = _instanceName.substring(separatorIndex + 1);
            }
            instanceConfig.setHostName(hostName);
            instanceConfig.setPort(port);
            instanceConfig.setInstanceEnabled(true);
            getClusterManagmentTool().addInstance(_clusterName, instanceConfig);
        }

        if (_instanceType == InstanceType.PARTICIPANT || _instanceType == InstanceType.CONTROLLER_PARTICIPANT) {
            handleNewSessionAsParticipant();
        }

        if (_instanceType == InstanceType.CONTROLLER || _instanceType == InstanceType.CONTROLLER_PARTICIPANT) {
            addControllerMessageListener(_messagingService.getExecutor());
            DefaultControllerMessageHandlerFactory controllerFactory = new DefaultControllerMessageHandlerFactory();
            _messagingService.getExecutor().registerMessageHandlerFactory(controllerFactory.getMessageType(), controllerFactory);
            DefaultSchedulerMessageHandlerFactory schedulerFactory = new DefaultSchedulerMessageHandlerFactory(this);
            _messagingService.getExecutor().registerMessageHandlerFactory(schedulerFactory.getMessageType(), schedulerFactory);
            DefaultParticipantErrorMessageHandlerFactory participantErrorFactory = new DefaultParticipantErrorMessageHandlerFactory(this);
            _messagingService.getExecutor().registerMessageHandlerFactory(participantErrorFactory.getMessageType(), participantErrorFactory);

            // Reuse the leader election handler across sessions; create it on the first one.
            if (_leaderElectionHandler != null) {
                _leaderElectionHandler.reset();
                _leaderElectionHandler.init();
            } else {
                _leaderElectionHandler = createCallBackHandler(new PropertyKey.Builder(_clusterName).controller(), new DistClusterControllerElection(_zkConnectString), new Watcher.Event.EventType[]{Watcher.Event.EventType.NodeChildrenChanged, Watcher.Event.EventType.NodeDeleted, Watcher.Event.EventType.NodeCreated}, HelixConstants.ChangeType.CONTROLLER);
            }
        }

        // A pure controller only re-initializes its handlers while it is the leader.
        if (_instanceType == InstanceType.PARTICIPANT || _instanceType == InstanceType.CONTROLLER_PARTICIPANT || (_instanceType == InstanceType.CONTROLLER && isLeader())) {
            initHandlers();
        }
    }

    /**
     * Participant-side setup for a new session: makes sure no other live instance holds this
     * instance name, runs the pre-connect callbacks, creates the live instance node, carries over
     * current state from earlier sessions, wires up messaging and listeners, and makes sure the
     * health report collector and its path exist.
     *
     * @throws HelixException if a live instance with the same name is still present after waiting
     *                        for the session timeout plus five seconds
     */
    private void handleNewSessionAsParticipant() {
        PropertyKey.Builder keys = _helixAccessor.keyBuilder();

        if (_helixAccessor.getProperty(keys.liveInstance(_instanceName)) != null) {
            logger.warn("Found another instance with same instanceName: " + _instanceName + " in cluster " + _clusterName);
            // Give a previous session's ephemeral node time to expire before giving up.
            try {
                Thread.sleep(_sessionTimeout + 5000);
            } catch (InterruptedException interrupted) {
                logger.warn("Sleep interrupted while waiting for previous liveinstance to go away.", interrupted);
            }
            if (_helixAccessor.getProperty(keys.liveInstance(_instanceName)) != null) {
                String conflict = "instance " + _instanceName + " already has a liveinstance in cluster " + _clusterName;
                logger.error(conflict);
                throw new HelixException(conflict);
            }
        }

        for (PreConnectCallback callback : _preConnectCallbacks) {
            callback.onPreConnect();
        }

        addLiveInstance();
        carryOverPreviousCurrentState();

        _messagingService.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.toString(), _stateMachEngine);
        addMessageListener(_messagingService.getExecutor(), _instanceName);
        addControllerListener(_helixAccessor);
        _stateMachEngine.registerStateModelFactory(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, new ScheduledTaskStateModelFactory(_messagingService.getExecutor()));

        // The health collector is created once and reused across sessions.
        if (_participantHealthCheckInfoCollector == null) {
            _participantHealthCheckInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName);
            _participantHealthCheckInfoCollector.start();
        }

        String healthPath = _helixAccessor.keyBuilder().healthReports(_instanceName).getPath();
        if (!_zkClient.exists(healthPath)) {
            _zkClient.createPersistent(healthPath, true);
            logger.info("Creating healthcheck info path " + healthPath);
        }
    }

    /**
     * Adds a callback that is invoked during participant session setup, before the live instance
     * node is created.
     *
     * @param preConnectCallback callback to add
     */
    @Override
    public void addPreConnectCallback(PreConnectCallback preConnectCallback) {
        logger.info("Adding preconnect callback");
        _preConnectCallbacks.add(preConnectCallback);
    }

    /**
     * Resets every registered handler. Works on a snapshot of the handler list taken under the
     * manager lock.
     */
    private void resetHandlers() {
        synchronized (this) {
            if (_handlers == null) {
                return;
            }
            List<CallbackHandler> snapshot = new ArrayList<CallbackHandler>(_handlers);
            for (CallbackHandler handler : snapshot) {
                handler.reset();
                logger.info("reset handler: " + handler.getPath() + ", " + handler.getListener());
            }
        }
    }

    /**
     * Initializes every registered handler. Works on a snapshot of the handler list taken under
     * the manager lock.
     */
    private void initHandlers() {
        synchronized (this) {
            if (_handlers == null) {
                return;
            }
            List<CallbackHandler> snapshot = new ArrayList<CallbackHandler>(_handlers);
            for (CallbackHandler handler : snapshot) {
                handler.init();
                logger.info("init handler: " + handler.getPath() + ", " + handler.getListener());
            }
        }
    }

    /**
     * Tells whether this manager is the current controller leader.
     *
     * @return {@code true} only when connected, of type CONTROLLER, and the controller leader
     *         record names this instance
     */
    @Override
    public boolean isLeader() {
        if (!isConnected()) {
            return false;
        }
        if (_instanceType != InstanceType.CONTROLLER) {
            return false;
        }
        LiveInstance leader = (LiveInstance) _helixAccessor.getProperty(_helixAccessor.keyBuilder().controllerLeader());
        if (leader == null) {
            return false;
        }
        String leaderName = leader.getInstanceName();
        return leaderName != null && leaderName.equals(_instanceName);
    }

    /**
     * Carries the current states recorded under earlier sessions of this instance over to the
     * current session, then deletes the current state nodes of those earlier sessions.
     *
     * <p>A previous current state is skipped, with an error log, when it has no state model
     * definition reference or when the referenced definition cannot be read. The second case used
     * to end in a {@code NullPointerException}.
     */
    private void carryOverPreviousCurrentState() {
        PropertyKey.Builder keyBuilder = _helixAccessor.keyBuilder();
        List<String> sessionIds = _helixAccessor.getChildNames(keyBuilder.sessions(_instanceName));

        // Pass 1: merge every old session's current states into the current session.
        for (String oldSessionId : sessionIds) {
            if (oldSessionId.equals(_sessionId)) {
                continue;
            }
            for (CurrentState previousState : _helixAccessor.getChildValues(keyBuilder.currentStates(_instanceName, oldSessionId))) {
                logger.info("Carrying over old session: " + oldSessionId + ", resource: " + previousState.getId() + " to current session: " + _sessionId);
                String stateModelDefRef = previousState.getStateModelDefRef();
                if (stateModelDefRef == null) {
                    logger.error("skip carry-over because previous current state doesn't have a state model definition. previous current-state: " + previousState);
                    continue;
                }
                StateModelDefinition stateModelDef = (StateModelDefinition) _helixAccessor.getProperty(keyBuilder.stateModelDef(stateModelDefRef));
                if (stateModelDef == null) {
                    // The referenced definition is missing; treat it like a missing reference
                    // rather than dereferencing null.
                    logger.error("skip carry-over because state model definition " + stateModelDefRef + " could not be read. previous current-state: " + previousState);
                    continue;
                }
                String currentStatePath = keyBuilder.currentState(_instanceName, _sessionId, previousState.getResourceName()).getPath();
                _helixAccessor.getBaseDataAccessor().update(currentStatePath, new CurStateCarryOverUpdater(_sessionId, stateModelDef.getInitialState(), previousState), AccessOption.PERSISTENT);
            }
        }

        // Pass 2: remove the current state nodes of all old sessions.
        for (String oldSessionId : sessionIds) {
            if (oldSessionId.equals(_sessionId)) {
                continue;
            }
            String oldPath = _helixAccessor.keyBuilder().currentStates(_instanceName, oldSessionId).getPath();
            logger.info("Removing current states from previous sessions. path: " + oldPath);
            _zkClient.deleteRecursive(oldPath);
        }
    }

    /**
     * Returns the Helix property store, creating it on first use.
     *
     * @return the property store rooted at the cluster's HELIX_PROPERTYSTORE path
     * @throws HelixException if the manager is not connected
     */
    @Override
    public synchronized ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore() {
        checkConnected();
        if (_helixPropertyStore != null) {
            return _helixPropertyStore;
        }
        String storeRoot = PropertyPathConfig.getPath(PropertyType.HELIX_PROPERTYSTORE, _clusterName, new String[0]);
        _helixPropertyStore = new ZkHelixPropertyStore<>(new ZkBaseDataAccessor(_zkClient), storeRoot, (List<String>) null);
        return _helixPropertyStore;
    }

    /**
     * Returns an admin tool bound to the current ZooKeeper client. A new tool is created on each
     * call while a client exists; otherwise an error is logged and the previously stored tool
     * (possibly {@code null}) is returned.
     *
     * @return the admin tool
     * @throws HelixException if the manager is not connected
     */
    @Override
    public synchronized HelixAdmin getClusterManagmentTool() {
        checkConnected();
        if (_zkClient == null) {
            logger.error("Couldn't get ZKClusterManagementTool because zkClient is null");
        } else {
            _managementTool = new ZKHelixAdmin(_zkClient);
        }
        return _managementTool;
    }

    /** @return the messaging service created with this manager */
    @Override
    public ClusterMessagingService getMessagingService() {
        return _messagingService;
    }

    /**
     * @return the participant health report collector; {@code null} if participant session setup
     *         has not created one
     * @throws HelixException if the manager is not connected
     */
    @Override
    public ParticipantHealthReportCollector getHealthReportCollector() {
        checkConnected();
        return _participantHealthCheckInfoCollector;
    }

    /** @return the instance type given at construction time */
    @Override
    public InstanceType getInstanceType() {
        return _instanceType;
    }

    /**
     * Guard used by methods that need a live connection.
     *
     * @throws HelixException if the manager is not connected
     */
    private void checkConnected() {
        if (isConnected()) {
            return;
        }
        throw new HelixException("ClusterManager not connected. Call clusterManager.connect()");
    }

    /** @return the version string read from the manager properties at construction time */
    @Override
    public String getVersion() {
        return _version;
    }

    /** @return the manager properties loaded at construction time */
    @Override
    public HelixManagerProperties getProperties() {
        return _properties;
    }

    /** @return the state machine engine created with this manager */
    @Override
    public StateMachineEngine getStateMachineEngine() {
        return _stateMachEngine;
    }

    /**
     * Starts every registered controller timer task, then the status update dump task.
     */
    @Override
    public void startTimerTasks() {
        for (HelixTimerTask task : _controllerTimerTasks) {
            task.start();
        }
        startStatusUpdatedumpTask();
    }

    /**
     * Stops every registered controller timer task. The status update dump timer is not touched here.
     */
    @Override
    public void stopTimerTasks() {
        for (HelixTimerTask task : _controllerTimerTasks) {
            task.stop();
        }
    }

    /**
     * Sets the provider whose record is merged into the live instance node when it is created.
     *
     * @param liveInstanceInfoProvider provider to use; {@code null} disables the extra information
     */
    @Override
    public void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider) {
        _liveInstanceInfoProvider = liveInstanceInfoProvider;
    }
}
