package org.apache.helix.manager.zk;

import java.io.StringReader;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.Criteria;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.StatusUpdate;
import org.apache.helix.util.StatusUpdateUtil;
import org.apache.http.HttpHeaders;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.shingle.ShingleFilter;
import org.codehaus.jackson.map.ObjectMapper;

/* loaded from: input_file:WEB-INF/lib/helix-core-0.6.5.jar:org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.class */
public class DefaultSchedulerMessageHandlerFactory implements MessageHandlerFactory {
    public static final String WAIT_ALL = "WAIT_ALL";
    public static final String SCHEDULER_MSG_ID = "SchedulerMessageId";
    public static final String SCHEDULER_TASK_QUEUE = "SchedulerTaskQueue";
    public static final String CONTROLLER_MSG_ID = "controllerMsgId";
    public static final int TASKQUEUE_BUCKET_NUM = 10;
    private static Logger _logger = Logger.getLogger(DefaultSchedulerMessageHandlerFactory.class);
    HelixManager _manager;

    /* loaded from: input_file:WEB-INF/lib/helix-core-0.6.5.jar:org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory$DefaultSchedulerMessageHandler.class */
    public static class DefaultSchedulerMessageHandler extends MessageHandler {
        HelixManager _manager;

        public DefaultSchedulerMessageHandler(Message message, NotificationContext notificationContext, HelixManager helixManager) {
            super(message, notificationContext);
            this._manager = helixManager;
        }

        void handleMessageUsingScheduledTaskQueue(Criteria criteria, Message message, String str) {
            HelixDataAccessor helixDataAccessor = this._manager.getHelixDataAccessor();
            PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
            HashMap hashMap = new HashMap();
            hashMap.put("MessageCount", "0");
            Map<InstanceType, List<Message>> generateMessage = this._manager.getMessagingService().generateMessage(criteria, message);
            if (generateMessage.size() > 0) {
                String simpleField = this._message.getRecord().getSimpleField("SchedulerTaskQueue");
                if (simpleField == null) {
                    throw new HelixException("SchedulerTaskMessage need to have SchedulerTaskQueue specified.");
                }
                IdealState idealState = new IdealState(simpleField);
                idealState.setBucketSize(10);
                idealState.setStateModelDefRef("SchedulerTaskQueue");
                synchronized (this._manager) {
                    IdealState idealState2 = (IdealState) this._manager.getHelixDataAccessor().getProperty(helixDataAccessor.keyBuilder().idealStates(idealState.getId()));
                    int findTopPartitionId = idealState2 != null ? findTopPartitionId(idealState2) + 1 : 0;
                    List<Message> list = (List) generateMessage.values().toArray()[0];
                    for (Message message2 : list) {
                        String str2 = simpleField + ShingleFilter.DEFAULT_FILLER_TOKEN + findTopPartitionId;
                        findTopPartitionId++;
                        String tgtName = message2.getTgtName();
                        idealState.setPartitionState(str2, tgtName, "COMPLETED");
                        message2.getRecord().setSimpleField(tgtName, "COMPLETED");
                        message2.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID, str);
                        LinkedList linkedList = new LinkedList();
                        linkedList.add(tgtName);
                        idealState.getRecord().setListField(str2, linkedList);
                        idealState.getRecord().setMapField(str2, message2.getRecord().getSimpleFields());
                        DefaultSchedulerMessageHandlerFactory._logger.info("Scheduling for controllerMsg " + str + " , sending task " + str2 + " " + message2.getMsgId() + " to " + tgtName);
                        if (DefaultSchedulerMessageHandlerFactory._logger.isDebugEnabled()) {
                            DefaultSchedulerMessageHandlerFactory._logger.debug(message2.getRecord().getSimpleFields());
                        }
                    }
                    this._manager.getHelixDataAccessor().updateProperty(helixDataAccessor.keyBuilder().idealStates(idealState.getId()), idealState);
                    hashMap.put("MessageCount", "" + list.size());
                }
            }
            ZNRecord record = helixDataAccessor.getProperty(keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), this._message.getMsgId())).getRecord();
            record.getMapFields().put("SentMessageCount", hashMap);
            helixDataAccessor.updateProperty(keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), this._message.getMsgId()), new StatusUpdate(record));
        }

        private int findTopPartitionId(IdealState idealState) {
            int i = 0;
            for (String str : idealState.getPartitionSet()) {
                try {
                    int parseInt = Integer.parseInt(str.substring(str.lastIndexOf(95) + 1));
                    if (i < parseInt) {
                        i = parseInt;
                    }
                } catch (Exception e) {
                    DefaultSchedulerMessageHandlerFactory._logger.error("", e);
                }
            }
            return i;
        }

        @Override // org.apache.helix.messaging.handling.MessageHandler
        public HelixTaskResult handleMessage() throws InterruptedException {
            String msgType = this._message.getMsgType();
            HelixTaskResult helixTaskResult = new HelixTaskResult();
            if (!msgType.equals(Message.MessageType.SCHEDULER_MSG.toString())) {
                throw new HelixException("Unexpected msg type for message " + this._message.getMsgId() + " type:" + this._message.getMsgType());
            }
            int i = -1;
            if (this._message.getRecord().getSimpleFields().containsKey("TIMEOUT")) {
                try {
                    i = Integer.parseInt(this._message.getRecord().getSimpleFields().get("TIMEOUT"));
                } catch (Exception e) {
                }
            }
            ZNRecord zNRecord = new ZNRecord("templateMessage");
            zNRecord.getSimpleFields().putAll(this._message.getRecord().getMapField("MessageTemplate"));
            Message message = new Message(zNRecord);
            try {
                Criteria criteria = (Criteria) new ObjectMapper().readValue(new StringReader(this._message.getRecord().getSimpleField("Criteria")), Criteria.class);
                DefaultSchedulerMessageHandlerFactory._logger.info("Scheduler sending message, criteria:" + criteria);
                boolean z = false;
                if (this._message.getRecord().getSimpleField(DefaultSchedulerMessageHandlerFactory.WAIT_ALL) != null) {
                    try {
                        z = Boolean.parseBoolean(this._message.getRecord().getSimpleField(DefaultSchedulerMessageHandlerFactory.WAIT_ALL));
                    } catch (Exception e2) {
                        DefaultSchedulerMessageHandlerFactory._logger.warn("", e2);
                    }
                }
                boolean containsKey = this._message.getRecord().getSimpleFields().containsKey("SchedulerTaskQueue");
                if (InstanceType.PARTICIPANT == criteria.getRecipientInstanceType() && containsKey) {
                    handleMessageUsingScheduledTaskQueue(criteria, message, this._message.getMsgId());
                    helixTaskResult.setSuccess(true);
                    helixTaskResult.getTaskResultMap().put(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID, this._message.getMsgId());
                    helixTaskResult.getTaskResultMap().put("ControllerResult", "msg " + this._message.getMsgId() + " from " + this._message.getMsgSrc() + " processed");
                    return helixTaskResult;
                }
                DefaultSchedulerMessageHandlerFactory._logger.info("Scheduler sending message to Controller");
                SchedulerAsyncCallback schedulerAsyncCallback = new SchedulerAsyncCallback(this._message, this._manager);
                int sendAndWait = z ? this._manager.getMessagingService().sendAndWait(criteria, message, schedulerAsyncCallback, i) : this._manager.getMessagingService().send(criteria, message, schedulerAsyncCallback, i);
                HelixDataAccessor helixDataAccessor = this._manager.getHelixDataAccessor();
                PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
                HashMap hashMap = new HashMap();
                hashMap.put("MessageCount", "" + sendAndWait);
                ZNRecord record = helixDataAccessor.getProperty(keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), this._message.getMsgId())).getRecord();
                record.getMapFields().put("SentMessageCount", hashMap);
                helixDataAccessor.setProperty(keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), this._message.getMsgId()), new StatusUpdate(record));
                helixTaskResult.getTaskResultMap().put("ControllerResult", "msg " + this._message.getMsgId() + " from " + this._message.getMsgSrc() + " processed");
                helixTaskResult.getTaskResultMap().put(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID, this._message.getMsgId());
                helixTaskResult.setSuccess(true);
                return helixTaskResult;
            } catch (Exception e3) {
                DefaultSchedulerMessageHandlerFactory._logger.error("", e3);
                helixTaskResult.setException(e3);
                helixTaskResult.setSuccess(false);
                return helixTaskResult;
            }
        }

        @Override // org.apache.helix.messaging.handling.MessageHandler
        public void onError(Exception exc, MessageHandler.ErrorCode errorCode, MessageHandler.ErrorType errorType) {
            DefaultSchedulerMessageHandlerFactory._logger.error("Message handling pipeline get an exception. MsgId:" + this._message.getMsgId(), exc);
        }
    }

    /* loaded from: input_file:WEB-INF/lib/helix-core-0.6.5.jar:org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory$SchedulerAsyncCallback.class */
    public static class SchedulerAsyncCallback extends AsyncCallback {
        Message _originalMessage;
        HelixManager _manager;
        StatusUpdateUtil _statusUpdateUtil = new StatusUpdateUtil();
        final Map<String, Map<String, String>> _resultSummaryMap = new ConcurrentHashMap();

        public SchedulerAsyncCallback(Message message, HelixManager helixManager) {
            this._originalMessage = message;
            this._manager = helixManager;
        }

        @Override // org.apache.helix.messaging.AsyncCallback
        public void onTimeOut() {
            DefaultSchedulerMessageHandlerFactory._logger.info("Scheduler msg timeout " + this._originalMessage.getMsgId() + " timout with " + this._timeout + " Ms");
            this._statusUpdateUtil.logError(this._originalMessage, SchedulerAsyncCallback.class, "Task timeout", this._manager.getHelixDataAccessor());
            addSummary(this._resultSummaryMap, this._originalMessage, this._manager, true);
        }

        @Override // org.apache.helix.messaging.AsyncCallback
        public void onReplyMessage(Message message) {
            DefaultSchedulerMessageHandlerFactory._logger.info("Update for scheduler msg " + this._originalMessage.getMsgId() + " Message " + message.getMsgSrc() + " id " + message.getCorrelationId() + " completed");
            this._resultSummaryMap.put("MessageResult " + message.getMsgSrc() + " " + UUID.randomUUID(), message.getResultMap());
            if (isDone()) {
                DefaultSchedulerMessageHandlerFactory._logger.info("Scheduler msg " + this._originalMessage.getMsgId() + " completed");
                this._statusUpdateUtil.logInfo(this._originalMessage, SchedulerAsyncCallback.class, "Scheduler task completed", this._manager.getHelixDataAccessor());
                addSummary(this._resultSummaryMap, this._originalMessage, this._manager, false);
            }
        }

        private void addSummary(Map<String, Map<String, String>> map, Message message, HelixManager helixManager, boolean z) {
            TreeMap treeMap = new TreeMap();
            treeMap.put("TotalMessages:", "" + map.size());
            treeMap.put(HttpHeaders.TIMEOUT, "" + z);
            map.put("Summary", treeMap);
            HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();
            PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
            ZNRecord record = helixDataAccessor.getProperty(keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), message.getMsgId())).getRecord();
            record.getMapFields().putAll(map);
            helixDataAccessor.setProperty(keyBuilder.controllerTaskStatus(Message.MessageType.SCHEDULER_MSG.toString(), message.getMsgId()), new StatusUpdate(record));
        }
    }

    public DefaultSchedulerMessageHandlerFactory(HelixManager helixManager) {
        this._manager = helixManager;
    }

    @Override // org.apache.helix.messaging.handling.MessageHandlerFactory
    public MessageHandler createHandler(Message message, NotificationContext notificationContext) {
        if (message.getMsgType().equals(getMessageType())) {
            return new DefaultSchedulerMessageHandler(message, notificationContext, this._manager);
        }
        throw new HelixException("Unexpected msg type for message " + message.getMsgId() + " type:" + message.getMsgType());
    }

    @Override // org.apache.helix.messaging.handling.MessageHandlerFactory
    public String getMessageType() {
        return Message.MessageType.SCHEDULER_MSG.toString();
    }

    @Override // org.apache.helix.messaging.handling.MessageHandlerFactory
    public void reset() {
    }
}
