package org.apache.helix.messaging.handling;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.MessageListener;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.builder.ConfigScopeBuilder;
import org.apache.helix.monitoring.ParticipantMonitor;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.util.StatusUpdateUtil;
import org.apache.log4j.Logger;

/* loaded from: input_file:WEB-INF/lib/helix-core-0.6.1-incubating.jar:org/apache/helix/messaging/handling/HelixTaskExecutor.class */
public class HelixTaskExecutor implements MessageListener, TaskExecutor {
    public static final int DEFAULT_PARALLEL_TASKS = 40;
    public static final String MAX_THREADS = "maxThreads";
    private static Logger LOG = Logger.getLogger(HelixTaskExecutor.class);
    final ConcurrentHashMap<String, MessageHandlerFactory> _handlerFactoryMap = new ConcurrentHashMap<>();
    Map<String, Integer> _resourceThreadpoolSizeMap = new ConcurrentHashMap();
    protected final Map<String, MessageTaskInfo> _taskMap = new ConcurrentHashMap();
    final ConcurrentHashMap<String, ExecutorService> _executorMap = new ConcurrentHashMap<>();
    private final Object _lock = new Object();
    private final StatusUpdateUtil _statusUpdateUtil = new StatusUpdateUtil();
    private final ParticipantMonitor _monitor = new ParticipantMonitor();
    final Timer _timer = new Timer(true);

    public HelixTaskExecutor() {
        startMonitorThread();
    }

    @Override // org.apache.helix.messaging.handling.TaskExecutor
    public void registerMessageHandlerFactory(String str, MessageHandlerFactory messageHandlerFactory) {
        registerMessageHandlerFactory(str, messageHandlerFactory, 40);
    }

    @Override // org.apache.helix.messaging.handling.TaskExecutor
    public void registerMessageHandlerFactory(String str, MessageHandlerFactory messageHandlerFactory, int i) {
        if (this._handlerFactoryMap.containsKey(str)) {
            LOG.warn("Fail to register msg-handler-factory for type: " + str + ", pool-size: " + i + ", factory: " + messageHandlerFactory);
        } else {
            if (!str.equalsIgnoreCase(messageHandlerFactory.getMessageType())) {
                throw new HelixException("Message factory type mismatch. Type: " + str + " factory : " + messageHandlerFactory.getMessageType());
            }
            this._handlerFactoryMap.put(str, messageHandlerFactory);
            this._executorMap.put(str, Executors.newFixedThreadPool(i));
            LOG.info("Added msg-factory for type: " + str + ", threadpool size " + i);
        }
    }

    public ParticipantMonitor getParticipantMonitor() {
        return this._monitor;
    }

    private void startMonitorThread() {
    }

    void checkResourceConfig(String str, HelixManager helixManager) {
        if (this._resourceThreadpoolSizeMap.containsKey(str)) {
            return;
        }
        int i = -1;
        ConfigAccessor configAccessor = helixManager.getConfigAccessor();
        if (configAccessor != null) {
            String str2 = configAccessor.get(new ConfigScopeBuilder().forCluster(helixManager.getClusterName()).forResource(str).build(), MAX_THREADS);
            if (str2 != null) {
                try {
                    i = Integer.parseInt(str2);
                } catch (Exception e) {
                    LOG.error("", e);
                }
            }
        }
        if (i > 0) {
            this._executorMap.put(Message.MessageType.STATE_TRANSITION.toString() + "." + str, Executors.newFixedThreadPool(i));
            LOG.info("Added per resource threadpool for resource: " + str + " with size: " + i);
        }
        this._resourceThreadpoolSizeMap.put(str, Integer.valueOf(i));
    }

    ExecutorService findExecutorServiceForMsg(Message message) {
        String resourceName;
        ExecutorService executorService = this._executorMap.get(message.getMsgType());
        if (message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString()) && (resourceName = message.getResourceName()) != null) {
            String str = message.getMsgType() + "." + resourceName;
            if (this._executorMap.containsKey(str)) {
                LOG.info("Find per-resource thread pool with key: " + str);
                executorService = this._executorMap.get(str);
            }
        }
        return executorService;
    }

    @Override // org.apache.helix.messaging.handling.TaskExecutor
    public List<Future<HelixTaskResult>> invokeAllTasks(List<MessageTask> list, long j, TimeUnit timeUnit) throws InterruptedException {
        if (list == null || list.size() == 0) {
            return null;
        }
        ExecutorService findExecutorServiceForMsg = findExecutorServiceForMsg(list.get(0).getMessage());
        for (int i = 1; i < list.size(); i++) {
            if (findExecutorServiceForMsg(list.get(i).getMessage()) != findExecutorServiceForMsg) {
                LOG.error("Fail to invoke all tasks because they are not using the same executor-service");
                return null;
            }
        }
        return findExecutorServiceForMsg.invokeAll(list, j, timeUnit);
    }

    @Override // org.apache.helix.messaging.handling.TaskExecutor
    public boolean cancelTimeoutTask(MessageTask messageTask) {
        synchronized (this._lock) {
            String taskId = messageTask.getTaskId();
            if (!this._taskMap.containsKey(taskId)) {
                return false;
            }
            MessageTaskInfo messageTaskInfo = this._taskMap.get(taskId);
            if (messageTaskInfo._timerTask != null) {
                messageTaskInfo._timerTask.cancel();
            }
            return true;
        }
    }

    @Override // org.apache.helix.messaging.handling.TaskExecutor
    public boolean scheduleTask(MessageTask messageTask) {
        String taskId = messageTask.getTaskId();
        Message message = messageTask.getMessage();
        NotificationContext notificationContext = messageTask.getNotificationContext();
        try {
            if (message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString())) {
                checkResourceConfig(message.getResourceName(), notificationContext.getManager());
            }
            LOG.info("Scheduling message: " + taskId);
            this._statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Message handling task scheduled", notificationContext.getManager().getHelixDataAccessor());
            synchronized (this._lock) {
                if (this._taskMap.containsKey(taskId)) {
                    this._statusUpdateUtil.logWarning(message, HelixTaskExecutor.class, "Message handling task already sheduled for " + taskId, notificationContext.getManager().getHelixDataAccessor());
                    return false;
                }
                Future submit = findExecutorServiceForMsg(message).submit(messageTask);
                MessageTimeoutTask messageTimeoutTask = null;
                if (message.getExecutionTimeout() > 0) {
                    messageTimeoutTask = new MessageTimeoutTask(this, messageTask);
                    this._timer.schedule(messageTimeoutTask, message.getExecutionTimeout());
                    LOG.info("Message starts with timeout " + message.getExecutionTimeout() + " MsgId: " + messageTask.getTaskId());
                } else {
                    LOG.debug("Message does not have timeout. MsgId: " + messageTask.getTaskId());
                }
                this._taskMap.put(taskId, new MessageTaskInfo(messageTask, submit, messageTimeoutTask));
                LOG.info("Message: " + taskId + " handling task scheduled");
                return true;
            }
        } catch (Exception e) {
            LOG.error("Error while executing task. " + message, e);
            this._statusUpdateUtil.logError(message, HelixTaskExecutor.class, e, "Error while executing task " + e, notificationContext.getManager().getHelixDataAccessor());
            return false;
        }
    }

    @Override // org.apache.helix.messaging.handling.TaskExecutor
    public boolean cancelTask(MessageTask messageTask) {
        Message message = messageTask.getMessage();
        NotificationContext notificationContext = messageTask.getNotificationContext();
        String taskId = messageTask.getTaskId();
        synchronized (this._lock) {
            if (this._taskMap.containsKey(taskId)) {
                MessageTaskInfo messageTaskInfo = this._taskMap.get(taskId);
                if (messageTaskInfo._timerTask != null) {
                    messageTaskInfo._timerTask.cancel();
                }
                Future<HelixTaskResult> future = messageTaskInfo.getFuture();
                this._statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceling task: " + taskId, notificationContext.getManager().getHelixDataAccessor());
                if (future.cancel(true)) {
                    this._statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceled task: " + taskId, notificationContext.getManager().getHelixDataAccessor());
                    this._taskMap.remove(taskId);
                    return true;
                }
                this._statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "fail to cancel task: " + taskId, notificationContext.getManager().getHelixDataAccessor());
            } else {
                this._statusUpdateUtil.logWarning(message, HelixTaskExecutor.class, "fail to cancel task: " + taskId + ", future not found", notificationContext.getManager().getHelixDataAccessor());
            }
            return false;
        }
    }

    @Override // org.apache.helix.messaging.handling.TaskExecutor
    public void finishTask(MessageTask messageTask) {
        Message message = messageTask.getMessage();
        String taskId = messageTask.getTaskId();
        LOG.info("message finished: " + taskId + ", took " + (new Date().getTime() - message.getExecuteStartTimeStamp()));
        synchronized (this._lock) {
            if (this._taskMap.containsKey(taskId)) {
                MessageTaskInfo remove = this._taskMap.remove(taskId);
                if (remove._timerTask != null) {
                    remove._timerTask.cancel();
                }
            } else {
                LOG.warn("message " + taskId + " not found in task map");
            }
        }
    }

    private void updateMessageState(List<Message> list, HelixDataAccessor helixDataAccessor, String str) {
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        ArrayList arrayList = new ArrayList();
        Iterator<Message> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getKey(keyBuilder, str));
        }
        helixDataAccessor.setChildren(arrayList, list);
    }

    @Override // org.apache.helix.MessageListener
    public void onMessage(String str, List<Message> list, NotificationContext notificationContext) {
        if (notificationContext.getType() == NotificationContext.Type.FINALIZE) {
            LOG.info("Get FINALIZE notification");
            Iterator<MessageHandlerFactory> it = this._handlerFactoryMap.values().iterator();
            while (it.hasNext()) {
                it.next().reset();
            }
            synchronized (this._lock) {
                Iterator<MessageTaskInfo> it2 = this._taskMap.values().iterator();
                while (it2.hasNext()) {
                    cancelTask(it2.next()._task);
                }
                this._taskMap.clear();
            }
            return;
        }
        if (list == null || list.size() == 0) {
            LOG.info("No Messages to process");
            return;
        }
        Collections.sort(list, Message.CREATE_TIME_COMPARATOR);
        HelixManager manager = notificationContext.getManager();
        HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        ArrayList<MessageHandler> arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        String sessionId = manager.getSessionId();
        List<String> childNames = helixDataAccessor.getChildNames(keyBuilder.currentStates(str, sessionId));
        ArrayList arrayList3 = new ArrayList();
        ArrayList arrayList4 = new ArrayList();
        HashSet hashSet = new HashSet();
        for (Message message : list) {
            if (message.getMsgType().equalsIgnoreCase(Message.MessageType.NO_OP.toString())) {
                LOG.info("Dropping NO-OP message. mid: " + message.getId() + ", from: " + message.getMsgSrc());
                helixDataAccessor.removeProperty(message.getKey(keyBuilder, str));
            } else {
                String tgtSessionId = message.getTgtSessionId();
                if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*")) {
                    String str2 = "SessionId does NOT match. expected sessionId: " + sessionId + ", tgtSessionId in message: " + tgtSessionId + ", messageId: " + message.getMsgId();
                    LOG.warn(str2);
                    helixDataAccessor.removeProperty(message.getKey(keyBuilder, str));
                    this._statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, str2, helixDataAccessor);
                } else if (Message.MessageState.NEW == message.getMsgState()) {
                    try {
                        MessageHandler createMessageHandler = createMessageHandler(message, notificationContext);
                        if (createMessageHandler != null) {
                            arrayList.add(createMessageHandler);
                            message.setMsgState(Message.MessageState.READ);
                            message.setReadTimeStamp(new Date().getTime());
                            message.setExecuteSessionId(notificationContext.getManager().getSessionId());
                            this._statusUpdateUtil.logInfo(message, HelixStateMachineEngine.class, "New Message", helixDataAccessor);
                            arrayList2.add(message);
                            if (!message.isControlerMsg() && message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString())) {
                                String resourceName = message.getResourceName();
                                if (!childNames.contains(resourceName) && !hashSet.contains(resourceName)) {
                                    hashSet.add(resourceName);
                                    arrayList3.add(keyBuilder.currentState(str, sessionId, resourceName));
                                    CurrentState currentState = new CurrentState(resourceName);
                                    currentState.setBucketSize(message.getBucketSize());
                                    currentState.setStateModelDefRef(message.getStateModelDef());
                                    currentState.setSessionId(sessionId);
                                    currentState.setBatchMessageMode(message.getBatchMessageMode());
                                    String stateModelFactoryName = message.getStateModelFactoryName();
                                    if (stateModelFactoryName != null) {
                                        currentState.setStateModelFactoryName(stateModelFactoryName);
                                    } else {
                                        currentState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
                                    }
                                    arrayList4.add(currentState);
                                }
                            }
                        }
                    } catch (Exception e) {
                        LOG.error("Failed to create message handler for " + message.getMsgId(), e);
                        this._statusUpdateUtil.logError(message, HelixStateMachineEngine.class, e, "Failed to create message handler for " + message.getMsgId() + ", exception: " + e, helixDataAccessor);
                        message.setMsgState(Message.MessageState.UNPROCESSABLE);
                        helixDataAccessor.removeProperty(message.getKey(keyBuilder, str));
                        LOG.error("Message cannot be proessed: " + message.getRecord(), e);
                    }
                } else if (LOG.isTraceEnabled()) {
                    LOG.trace("Message already read. msgId: " + message.getMsgId());
                }
            }
        }
        if (arrayList3.size() > 0) {
            try {
                helixDataAccessor.createChildren(arrayList3, arrayList4);
            } catch (Exception e2) {
                LOG.error("fail to create cur-state znodes for messages: " + arrayList2, e2);
            }
        }
        if (arrayList2.size() > 0) {
            updateMessageState(arrayList2, helixDataAccessor, str);
            for (MessageHandler messageHandler : arrayList) {
                scheduleTask(new HelixTask(messageHandler._message, notificationContext, messageHandler, this));
            }
        }
    }

    public MessageHandler createMessageHandler(Message message, NotificationContext notificationContext) {
        String str = message.getMsgType().toString();
        MessageHandlerFactory messageHandlerFactory = this._handlerFactoryMap.get(str);
        if (messageHandlerFactory == null) {
            LOG.warn("Fail to find message handler factory for type: " + str + " msgId: " + message.getMsgId());
            return null;
        }
        notificationContext.add(NotificationContext.MapKey.TASK_EXECUTOR.toString(), this);
        return messageHandlerFactory.createHandler(message, notificationContext);
    }

    @Override // org.apache.helix.messaging.handling.TaskExecutor
    public void shutdown() {
        LOG.info("shutting down TaskExecutor");
        this._timer.cancel();
        synchronized (this._lock) {
            for (String str : this._executorMap.keySet()) {
                LOG.info(this._executorMap.get(str).shutdownNow().size() + " tasks are still in the threadpool for msgType " + str);
            }
            for (String str2 : this._executorMap.keySet()) {
                try {
                    if (!this._executorMap.get(str2).awaitTermination(200L, TimeUnit.MILLISECONDS)) {
                        LOG.warn(str2 + " is not fully termimated in 200 MS");
                        System.out.println(str2 + " is not fully termimated in 200 MS");
                    }
                } catch (InterruptedException e) {
                    LOG.error("Interrupted", e);
                }
            }
        }
        this._monitor.shutDown();
        LOG.info("shutdown finished");
    }
}
