/*
 * Decompiled with CFR 0.152.
 */
package org.jbpm.executor.impl;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import org.apache.commons.io.input.ClassLoaderObjectInputStream;
import org.drools.core.process.instance.WorkItem;
import org.drools.core.time.TimeUtils;
import org.jbpm.executor.ExecutorNotStartedException;
import org.jbpm.executor.impl.event.ExecutorEventSupport;
import org.kie.api.executor.CommandContext;
import org.kie.api.executor.ExecutorStoreService;
import org.kie.api.executor.RequestInfo;
import org.kie.api.executor.STATUS;
import org.kie.internal.executor.api.Executor;
import org.kie.internal.runtime.manager.InternalRuntimeManager;
import org.kie.internal.runtime.manager.RuntimeManagerRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Executor} implementation that persists job requests through an
 * {@link ExecutorStoreService} and runs polling tasks obtained from
 * {@link ExecutorStoreService#buildExecutorRunnable()} on a scheduled thread pool.
 *
 * <p>When JMS support is enabled (system property {@code org.kie.executor.jms},
 * default {@code true}) and the JMS resources can be resolved, a text message
 * carrying the request id is additionally sent for every request whose
 * scheduled time is not in the future.
 *
 * <p>All tunables are read from system properties when the instance is created
 * and most can be overridden through setters before {@link #init()} is called.
 */
public class ExecutorImpl implements Executor {
    private static final Logger logger = LoggerFactory.getLogger(ExecutorImpl.class);

    /** Priority used when the command context carries no "priority" entry. */
    private static final int DEFAULT_PRIORITY = 5;
    /** Upper bound a requested priority is clamped to. */
    private static final int MAX_PRIORITY = 9;
    /** Lower bound a requested priority is clamped to. */
    private static final int MIN_PRIORITY = 0;
    /** Milliseconds by which the initial delay of each successive polling task grows. */
    private static final int INCREMENT_INITIAL_DELAY = 2000;

    private ExecutorStoreService executorStoreService;
    /** Futures of the polling tasks scheduled by {@code init}; cancelled and cleared in {@link #destroy()}. */
    private final List<ScheduledFuture<?>> handle = new ArrayList<>();
    private int threadPoolSize = Integer.parseInt(System.getProperty("org.kie.executor.pool.size", "1"));
    private int retries = Integer.parseInt(System.getProperty("org.kie.executor.retry.count", "3"));
    private int interval = Integer.parseInt(System.getProperty("org.kie.executor.interval", "3"));
    private int initialDelay = Integer.parseInt(System.getProperty("org.kie.executor.initial.delay", "100"));
    private TimeUnit timeunit = TimeUnit.valueOf(System.getProperty("org.kie.executor.timeunit", "SECONDS"));
    private boolean useJMS = Boolean.parseBoolean(System.getProperty("org.kie.executor.jms", "true"));
    private String connectionFactoryName = System.getProperty("org.kie.executor.jms.cf", "java:/JmsXA");
    private String queueName = System.getProperty("org.kie.executor.jms.queue", "queue/KIE.EXECUTOR");
    private boolean transacted = Boolean.parseBoolean(System.getProperty("org.kie.executor.jms.transacted", "false"));
    private ConnectionFactory connectionFactory;
    private Queue queue;
    private ScheduledExecutorService scheduler;
    private ExecutorEventSupport eventSupport = new ExecutorEventSupport();

    public void setEventSupport(ExecutorEventSupport eventSupport) {
        this.eventSupport = eventSupport;
    }

    public void setExecutorStoreService(ExecutorStoreService executorStoreService) {
        this.executorStoreService = executorStoreService;
    }

    public ExecutorStoreService getExecutorStoreService() {
        return this.executorStoreService;
    }

    public String getConnectionFactoryName() {
        return this.connectionFactoryName;
    }

    public void setConnectionFactoryName(String connectionFactoryName) {
        this.connectionFactoryName = connectionFactoryName;
    }

    public String getQueueName() {
        return this.queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public ConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public Queue getQueue() {
        return this.queue;
    }

    public void setQueue(Queue queue) {
        this.queue = queue;
    }

    public int getInterval() {
        return this.interval;
    }

    public void setInterval(int interval) {
        this.interval = interval;
    }

    public int getRetries() {
        return this.retries;
    }

    public void setRetries(int retries) {
        this.retries = retries;
    }

    public int getThreadPoolSize() {
        return this.threadPoolSize;
    }

    public void setThreadPoolSize(int threadPoolSize) {
        this.threadPoolSize = threadPoolSize;
    }

    public TimeUnit getTimeunit() {
        return this.timeunit;
    }

    public void setTimeunit(TimeUnit timeunit) {
        this.timeunit = timeunit;
    }

    /**
     * Starts the polling tasks and, if enabled, resolves the JMS resources.
     *
     * <p>If the JNDI lookup of the connection factory or queue fails, JMS
     * support is switched off for this instance and start-up continues.
     *
     * @throws ExecutorNotStartedException if the system property
     *         {@code org.kie.executor.disabled} is {@code true} (case-insensitive)
     */
    public void init() {
        if (isDisabled()) {
            throw new ExecutorNotStartedException();
        }
        logger.info("Starting Executor Component ...\n \t - Thread Pool Size: {}\n \t - Interval: {} {} \n \t - Retries per Request: {}\n", new Object[]{this.threadPoolSize, this.interval, this.timeunit.toString(), this.retries});
        scheduleExecutorThreads();
        if (this.useJMS) {
            initJms();
        }
    }

    /**
     * Starts the polling tasks without touching the JMS configuration.
     *
     * <p>NOTE(review): {@code threadFactory} is currently not used; the pool is
     * always obtained from {@link #getScheduledExecutorService()}. Confirm
     * whether the factory was meant to be passed to the pool.
     *
     * @param threadFactory currently ignored, see note above
     * @throws ExecutorNotStartedException if the system property
     *         {@code org.kie.executor.disabled} is {@code true} (case-insensitive)
     */
    public void init(ThreadFactory threadFactory) {
        if (isDisabled()) {
            throw new ExecutorNotStartedException();
        }
        logger.info("Starting Executor Component ...\n \t - Thread Pool Size: {}\n \t - Interval: {} Seconds\n \t - Retries per Request: {}\n", new Object[]{this.threadPoolSize, this.interval, this.retries});
        scheduleExecutorThreads();
    }

    /** Returns whether the executor has been disabled through the {@code org.kie.executor.disabled} system property. */
    private static boolean isDisabled() {
        return "true".equalsIgnoreCase(System.getProperty("org.kie.executor.disabled"));
    }

    /**
     * Creates the scheduler and registers {@code threadPoolSize} fixed-rate polling tasks,
     * each starting {@link #INCREMENT_INITIAL_DELAY} ms later than the previous one.
     */
    private void scheduleExecutorThreads() {
        this.scheduler = getScheduledExecutorService();
        long intervalMillis = TimeUnit.MILLISECONDS.convert(this.interval, this.timeunit);
        int delayIncremental = this.initialDelay;
        for (int i = 0; i < this.threadPoolSize; ++i) {
            long delay = INCREMENT_INITIAL_DELAY + delayIncremental;
            logger.debug("Starting executor thread with initial delay {} interval {} and time unit {}", new Object[]{delay, intervalMillis, TimeUnit.MILLISECONDS});
            this.handle.add(this.scheduler.scheduleAtFixedRate(this.executorStoreService.buildExecutorRunnable(), delay, intervalMillis, TimeUnit.MILLISECONDS));
            delayIncremental += INCREMENT_INITIAL_DELAY;
        }
    }

    /** Looks up any JMS resource that was not injected; disables JMS support on failure. */
    private void initJms() {
        try {
            InitialContext ctx = new InitialContext();
            if (this.connectionFactory == null) {
                this.connectionFactory = (ConnectionFactory) ctx.lookup(this.connectionFactoryName);
            }
            if (this.queue == null) {
                this.queue = (Queue) ctx.lookup(this.queueName);
            }
            logger.info("Executor JMS based support successfully activated on queue {}", this.queue);
        } catch (Exception e) {
            logger.warn("Disabling JMS support in executor because: unable to initialize JMS configuration for executor due to {}", e.getMessage());
            logger.debug("JMS support executor failed due to {}", e.getMessage(), e);
            this.useJMS = false;
        }
    }

    /**
     * Cancels all polling tasks (without interrupting a running one) and shuts
     * the scheduler down, waiting up to 60 seconds for termination before
     * forcing it.
     *
     * <p>If the calling thread is interrupted while waiting, the scheduler is
     * shut down immediately and the thread's interrupt status is restored.
     */
    public void destroy() {
        logger.info(" >>>>> Destroying Executor !!!");
        for (ScheduledFuture<?> h : this.handle) {
            h.cancel(false);
        }
        // Drop the cancelled futures so a later init() does not accumulate stale handles.
        this.handle.clear();
        if (this.scheduler == null) {
            return;
        }
        this.scheduler.shutdown();
        try {
            if (!this.scheduler.awaitTermination(60L, TimeUnit.SECONDS)) {
                logger.warn("Timeout occured while waiting on all jobs to be terminated");
                this.scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            logger.warn("Interrupted while waiting on all jobs to be terminated");
            this.scheduler.shutdownNow();
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Schedules a request for immediate execution.
     *
     * @see #scheduleRequest(String, Date, CommandContext)
     */
    public Long scheduleRequest(String commandId, CommandContext ctx) {
        return this.scheduleRequest(commandId, new Date(), ctx);
    }

    /**
     * Builds a request from the given context, persists it and, when JMS is
     * active and {@code date} is not in the future, sends a JMS message with
     * the request id.
     *
     * <p>Context entries read: {@code businessKey}, {@code deploymentId},
     * {@code processInstanceId}, {@code owner}, {@code retries},
     * {@code priority} and {@code retryDelay}. A {@code retryDelay} given as a
     * comma-separated string is replaced in the context by a list of parsed
     * time values before the context is serialized.
     *
     * <p>Failures while persisting or sending are not rethrown; they are
     * logged and reported to the registered listeners through the
     * after-scheduled event.
     *
     * @param commandId name of the command to execute
     * @param date time at which the request should be executed
     * @param ctx context data of the request, must not be {@code null}
     * @return the id reported by the request after the persist attempt
     * @throws IllegalStateException if {@code ctx} is {@code null}
     */
    public Long scheduleRequest(String commandId, Date date, CommandContext ctx) {
        if (ctx == null) {
            throw new IllegalStateException("A Context Must Be Provided! ");
        }
        org.jbpm.executor.entities.RequestInfo requestInfo = new org.jbpm.executor.entities.RequestInfo();
        requestInfo.setCommandName(commandId);
        requestInfo.setKey((String) ctx.getData("businessKey"));
        requestInfo.setStatus(STATUS.QUEUED);
        requestInfo.setTime(date);
        requestInfo.setMessage("Ready to execute");
        requestInfo.setDeploymentId((String) ctx.getData("deploymentId"));
        Object processInstanceId = ctx.getData("processInstanceId");
        if (processInstanceId != null) {
            requestInfo.setProcessInstanceId(((Number) processInstanceId).longValue());
        }
        requestInfo.setOwner((String) ctx.getData("owner"));
        Object requestedRetries = ctx.getData("retries");
        if (requestedRetries != null) {
            requestInfo.setRetries(Integer.parseInt(String.valueOf(requestedRetries)));
        } else {
            requestInfo.setRetries(this.retries);
        }
        int priority = resolvePriority(ctx);
        requestInfo.setPriority(priority);

        // Only a raw string expression is parsed; a context that already holds the
        // parsed list (e.g. one that was scheduled before) is left untouched.
        Object retryDelayData = ctx.getData("retryDelay");
        if (retryDelayData instanceof String) {
            List<Long> retryDelay = new ArrayList<>();
            for (String timeExpr : ((String) retryDelayData).split(",")) {
                retryDelay.add(TimeUtils.parseTimeString(timeExpr));
            }
            ctx.setData("retryDelay", retryDelay);
        }

        try {
            requestInfo.setRequestData(serializeContext(ctx));
        } catch (IOException e) {
            // Best effort: the request is still scheduled, just without context data.
            logger.warn("Error serializing context data", e);
            requestInfo.setRequestData(null);
        }

        this.eventSupport.fireBeforeJobScheduled(requestInfo, null);
        try {
            this.executorStoreService.persistRequest(requestInfo);
            if (this.useJMS) {
                if (System.currentTimeMillis() >= date.getTime()) {
                    logger.debug("Sending JMS message to trigger job execution for job {}", requestInfo.getId());
                    this.sendMessage(String.valueOf(requestInfo.getId()), priority);
                } else {
                    logger.debug("JMS message not sent for job {} as the job should not be executed immediately but at {}", requestInfo.getId(), date);
                }
            }
            logger.debug("Scheduled request for Command: {} - requestId: {} with {} retries", new Object[]{commandId, requestInfo.getId(), requestInfo.getRetries()});
            this.eventSupport.fireAfterJobScheduled(requestInfo, null);
        } catch (Throwable e) {
            logger.warn("Error while scheduling request for command {}", commandId, e);
            this.eventSupport.fireAfterJobScheduled(requestInfo, e);
        }
        return requestInfo.getId();
    }

    /**
     * Reads the "priority" entry of the context and clamps it to
     * [{@link #MIN_PRIORITY}, {@link #MAX_PRIORITY}]; returns
     * {@link #DEFAULT_PRIORITY} when the entry is absent.
     */
    private static int resolvePriority(CommandContext ctx) {
        Object priorityData = ctx.getData("priority");
        if (priorityData == null) {
            return DEFAULT_PRIORITY;
        }
        int priority = ((Number) priorityData).intValue();
        if (priority < MIN_PRIORITY) {
            logger.warn("Priority {} is not valid (cannot be less than {}) setting it to {}", new Object[]{priority, MIN_PRIORITY, MIN_PRIORITY});
            return MIN_PRIORITY;
        }
        if (priority > MAX_PRIORITY) {
            logger.warn("Priority {} is not valid (cannot be more than {}) setting it to {}", new Object[]{priority, MAX_PRIORITY, MAX_PRIORITY});
            return MAX_PRIORITY;
        }
        return priority;
    }

    /** Serializes the context with Java serialization, closing the object stream before the bytes are read. */
    private static byte[] serializeContext(CommandContext ctx) throws IOException {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        try (ObjectOutputStream oout = new ObjectOutputStream(bout)) {
            oout.writeObject(ctx);
        }
        return bout.toByteArray();
    }

    /**
     * Removes the request with the given id from the store, firing the
     * before/after cancelled events around the removal.
     *
     * <p>A failure while removing is not rethrown; it is logged and passed to
     * the after-cancelled event.
     *
     * @param requestId id of the request to cancel
     */
    public void cancelRequest(Long requestId) {
        logger.debug("Before - Cancelling Request with Id: {}", requestId);
        org.jbpm.executor.entities.RequestInfo job = (org.jbpm.executor.entities.RequestInfo) this.executorStoreService.findRequest(requestId);
        this.eventSupport.fireBeforeJobCancelled(job, null);
        try {
            this.executorStoreService.removeRequest(requestId);
            this.eventSupport.fireAfterJobCancelled(job, null);
        } catch (Throwable e) {
            logger.warn("Error while cancelling request with id {}", requestId, e);
            this.eventSupport.fireAfterJobCancelled(job, e);
        }
        logger.debug("After - Cancelling Request with Id: {}", requestId);
    }

    /**
     * Sends a JMS text message with the given body and priority to the
     * configured queue, closing producer, session and connection afterwards.
     *
     * @param messageBody text of the message
     * @param priority JMS priority set on the producer
     * @throws IllegalStateException if the connection factory or the queue is missing
     * @throws RuntimeException if creating the JMS objects or sending fails
     */
    protected void sendMessage(String messageBody, int priority) {
        // Both resources are required; either one missing makes sending impossible.
        if (this.connectionFactory == null || this.queue == null) {
            throw new IllegalStateException("ConnectionFactory and Queue cannot be null");
        }
        Connection queueConnection = null;
        Session queueSession = null;
        MessageProducer producer = null;
        try {
            queueConnection = this.connectionFactory.createConnection();
            queueSession = queueConnection.createSession(this.transacted, Session.AUTO_ACKNOWLEDGE);
            TextMessage message = queueSession.createTextMessage(messageBody);
            producer = queueSession.createProducer(this.queue);
            producer.setPriority(priority);
            queueConnection.start();
            producer.send(message);
        } catch (Exception e) {
            throw new RuntimeException("Error when sending JMS message with executor job request", e);
        } finally {
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    logger.warn("Error when closing producer", e);
                }
            }
            if (queueSession != null) {
                try {
                    queueSession.close();
                } catch (JMSException e) {
                    logger.warn("Error when closing queue session", e);
                }
            }
            if (queueConnection != null) {
                try {
                    queueConnection.close();
                } catch (JMSException e) {
                    logger.warn("Error when closing queue connection", e);
                }
            }
        }
    }

    /**
     * Merges {@code data} into the stored context of a request and saves it.
     *
     * <p>If the stored context contains a "workItem" entry, the entries of
     * {@code data} are set as parameters of that work item; otherwise they are
     * set directly on the context. A request without stored data gets a fresh
     * context.
     *
     * @param requestId id of the request to update
     * @param data entries to merge into the request data
     * @throws IllegalStateException if the request is CANCELLED, DONE or RUNNING
     * @throws RuntimeException if the stored data cannot be read or the
     *         updated data cannot be serialized
     */
    public void updateRequestData(Long requestId, Map<String, Object> data) {
        logger.debug("About to update request {} data with following {}", requestId, data);
        org.jbpm.executor.entities.RequestInfo request = (org.jbpm.executor.entities.RequestInfo) this.executorStoreService.findRequest(requestId);
        if (request.getStatus().equals(STATUS.CANCELLED) || request.getStatus().equals(STATUS.DONE) || request.getStatus().equals(STATUS.RUNNING)) {
            throw new IllegalStateException("Request data can't be updated when request is in status " + request.getStatus());
        }

        CommandContext ctx = readContext(request);
        if (ctx == null) {
            ctx = new CommandContext();
        }

        WorkItem workItem = (WorkItem) ctx.getData("workItem");
        if (workItem != null) {
            logger.debug("Updating work item {} parameters with data {}", workItem, data);
            for (Map.Entry<String, Object> entry : data.entrySet()) {
                workItem.setParameter(entry.getKey(), entry.getValue());
            }
        } else {
            logger.debug("Updating request context with data {}", data);
            for (Map.Entry<String, Object> entry : data.entrySet()) {
                ctx.setData(entry.getKey(), entry.getValue());
            }
        }

        try {
            request.setRequestData(serializeContext(ctx));
        } catch (IOException e) {
            throw new RuntimeException("Unable to save updated request data", e);
        }
        this.executorStoreService.updateRequest(request);
        logger.debug("Request {} data updated successfully", requestId);
    }

    /**
     * Deserializes the context stored on the request using the class loader
     * resolved for its deployment; returns {@code null} when the request holds no data.
     */
    private CommandContext readContext(org.jbpm.executor.entities.RequestInfo request) {
        ClassLoader cl = this.getClassLoader(request.getDeploymentId());
        try {
            logger.debug("Processing Request Id: {}, status {} command {}", new Object[]{request.getId(), request.getStatus(), request.getCommandName()});
            byte[] reqData = request.getRequestData();
            if (reqData == null) {
                return null;
            }
            try (ObjectInputStream in = new ClassLoaderObjectInputStream(cl, new ByteArrayInputStream(reqData))) {
                return (CommandContext) in.readObject();
            }
        } catch (Exception e) {
            logger.error("Unexpected error when reading request data", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Resolves the class loader used to deserialize request data.
     *
     * @param deploymentId deployment the request belongs to, may be {@code null}
     * @return the class loader of the runtime manager registered under
     *         {@code deploymentId} if there is one and its environment
     *         provides a class loader; otherwise the current thread's context
     *         class loader
     */
    protected ClassLoader getClassLoader(String deploymentId) {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (deploymentId == null) {
            return cl;
        }
        InternalRuntimeManager manager = (InternalRuntimeManager) RuntimeManagerRegistry.get().getManager(deploymentId);
        if (manager != null && manager.getEnvironment().getClassLoader() != null) {
            cl = manager.getEnvironment().getClassLoader();
        }
        return cl;
    }

    /**
     * Creates the scheduler used for the polling tasks; the default is a
     * scheduled thread pool with {@code threadPoolSize} core threads.
     * Subclasses may override this to supply a different scheduler.
     */
    protected ScheduledExecutorService getScheduledExecutorService() {
        return Executors.newScheduledThreadPool(this.threadPoolSize);
    }
}

