/*
 * Decompiled with CFR 0.152.
 */
package org.jbpm.executor.impl.jms;

import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple;
import com.arjuna.ats.jta.TransactionManager;
import com.arjuna.ats.jta.UserTransaction;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.XAConnectionFactory;
import javax.naming.InitialContext;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
import org.hornetq.jms.server.embedded.EmbeddedJMS;
import org.jboss.narayana.jta.jms.ConnectionFactoryProxy;
import org.jboss.narayana.jta.jms.TransactionHelper;
import org.jboss.narayana.jta.jms.TransactionHelperImpl;
import org.jbpm.executor.AsynchronousJobEvent;
import org.jbpm.executor.AsynchronousJobListener;
import org.jbpm.executor.ExecutorServiceFactory;
import org.jbpm.executor.impl.ClassCacheManager;
import org.jbpm.executor.impl.ExecutorImpl;
import org.jbpm.executor.impl.ExecutorServiceImpl;
import org.jbpm.executor.impl.jms.JmsAvailableJobsExecutor;
import org.jbpm.executor.test.CountDownAsyncJobListener;
import org.jbpm.test.util.ExecutorTestUtil;
import org.jbpm.test.util.PoolingDataSource;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.kie.api.executor.CommandContext;
import org.kie.api.executor.ExecutorService;
import org.kie.api.runtime.query.QueryContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that schedules executor requests inside a JTA transaction and then
 * processes them through a {@link JmsAvailableJobsExecutor} attached as a JMS
 * {@link javax.jms.MessageListener} to a queue hosted by an embedded HornetQ server.
 *
 * <p>The executor service is initialised with a thread pool size of {@code 0} and the JMS
 * connection factory and queue are injected into the underlying {@link ExecutorImpl};
 * each test then drives the actual execution itself via {@link MessageReceiver}.
 */
public class JmsAvaiableJobExecutorTest {
    private static final Logger logger = LoggerFactory.getLogger(JmsAvaiableJobExecutorTest.class);

    /** Fully qualified name of the command scheduled by every test. */
    private static final String PRINT_OUT_COMMAND = "org.jbpm.executor.commands.PrintOutCommand";

    /** JNDI name under which {@link #startHornetQServer()} binds the user transaction. */
    private static final String USER_TRANSACTION_JNDI_NAME = "java:comp/UserTransaction";

    /**
     * Default value handed to {@code CountDownAsyncJobListener.waitTillCompleted(long)}.
     * NOTE(review): presumably milliseconds - confirm against the listener implementation.
     */
    private static final long DEFAULT_WAIT_TIMEOUT = 100000L;

    private ConnectionFactory factory;
    private Queue queue;
    private EmbeddedJMS jmsServer;
    protected ExecutorService executorService;
    protected PoolingDataSource pds;
    protected EntityManagerFactory emf = null;

    /**
     * Starts the embedded JMS server, sets up the data source and persistence unit, and
     * initialises an executor service that is wired to the JMS connection factory and queue.
     *
     * @throws Exception if any part of the environment fails to start
     */
    @Before
    public void setUp() throws Exception {
        this.startHornetQServer();
        this.pds = ExecutorTestUtil.setupPoolingDataSource();
        this.emf = Persistence.createEntityManagerFactory("org.jbpm.executor");
        this.executorService = ExecutorServiceFactory.newExecutorService(this.emf);
        ExecutorImpl executor = this.executorImpl();
        executor.setConnectionFactory(this.factory);
        executor.setQueue(this.queue);
        this.executorService.setThreadPoolSize(0);
        this.executorService.setInterval(10000);
        this.executorService.init();
    }

    /**
     * Releases everything {@link #setUp()} created.
     *
     * <p>Every step is null-guarded and chained with {@code finally} so that a partially
     * failed {@code setUp()} or a failing cleanup step neither hides the original test
     * failure behind a {@link NullPointerException} nor leaves the data source or the
     * embedded JMS server running.
     *
     * @throws Exception if a cleanup step fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            this.destroyExecutorService();
        } finally {
            try {
                this.closePersistence();
            } finally {
                System.clearProperty("org.kie.executor.msg.length");
                System.clearProperty("org.kie.executor.stacktrace.length");
                this.stopHornetQServer();
            }
        }
    }

    /**
     * Registers a new {@link CountDownAsyncJobListener} on the executor service.
     *
     * @param threads value passed to the {@link CountDownAsyncJobListener} constructor
     * @return the listener that was registered
     */
    protected CountDownAsyncJobListener configureListener(int threads) {
        CountDownAsyncJobListener countDownListener = new CountDownAsyncJobListener(threads);
        ((ExecutorServiceImpl) this.executorService).addAsyncJobListener(countDownListener);
        return countDownListener;
    }

    /**
     * Schedules a single request and expects it to end up completed, with nothing left
     * queued or in error.
     */
    @Test
    public void testAsyncAuditProducer() throws Exception {
        CountDownAsyncJobListener countDownListener = this.configureListener(1);
        CommandContext ctxCMD = new CommandContext();
        ctxCMD.setData("businessKey", UUID.randomUUID().toString());

        this.scheduleInTransaction(ctxCMD);

        new MessageReceiver().receiveAndProcess(this.queue, countDownListener);

        this.assertRequestCounts(0, 0, 1);
    }

    /**
     * Schedules two requests with priorities 2 and 8 in one transaction and expects the
     * one with priority 8 to be reported as executed first.
     */
    @Test
    public void testAsyncAuditProducerPrioritizedJobs() throws Exception {
        CountDownAsyncJobListener countDownListener = this.configureListener(2);
        // Business keys of executed jobs, in the order afterJobExecuted reported them.
        final List<String> executedJobs = new ArrayList<String>();
        ((ExecutorServiceImpl) this.executorService).addAsyncJobListener(new AsynchronousJobListener() {

            @Override
            public void beforeJobScheduled(AsynchronousJobEvent event) {
            }

            @Override
            public void beforeJobExecuted(AsynchronousJobEvent event) {
            }

            @Override
            public void beforeJobCancelled(AsynchronousJobEvent event) {
            }

            @Override
            public void afterJobScheduled(AsynchronousJobEvent event) {
            }

            @Override
            public void afterJobExecuted(AsynchronousJobEvent event) {
                executedJobs.add(event.getJob().getKey());
            }

            @Override
            public void afterJobCancelled(AsynchronousJobEvent event) {
            }
        });
        CommandContext ctxCMD = new CommandContext();
        ctxCMD.setData("businessKey", "low priority");
        ctxCMD.setData("priority", 2);
        CommandContext ctxCMD2 = new CommandContext();
        ctxCMD2.setData("businessKey", "high priority");
        ctxCMD2.setData("priority", 8);

        // The low priority request is deliberately scheduled first.
        this.scheduleInTransaction(ctxCMD, ctxCMD2);

        new MessageReceiver().receiveAndProcess(this.queue, countDownListener);

        this.assertRequestCounts(0, 0, 2);
        Assert.assertEquals(2, executedJobs.size());
        Assert.assertEquals("high priority", executedJobs.get(0));
        Assert.assertEquals("low priority", executedJobs.get(1));
    }

    /**
     * Schedules a request whose {@code deploymentId} is {@code "not-existing"} and expects
     * it to still be queued (neither completed nor in error) after a 3000 unit wait.
     */
    @Test
    public void testAsyncAuditProducerNotExistingDeployment() throws Exception {
        CountDownAsyncJobListener countDownListener = this.configureListener(1);
        CommandContext ctxCMD = new CommandContext();
        ctxCMD.setData("businessKey", UUID.randomUUID().toString());
        ctxCMD.setData("deploymentId", "not-existing");

        this.scheduleInTransaction(ctxCMD);

        new MessageReceiver().receiveAndProcess(this.queue, countDownListener, 3000L);

        this.assertRequestCounts(0, 1, 0);
    }

    /** Returns the {@link ExecutorImpl} that backs {@link #executorService}. */
    private ExecutorImpl executorImpl() {
        return (ExecutorImpl) ((ExecutorServiceImpl) this.executorService).getExecutor();
    }

    /**
     * Schedules one {@code PrintOutCommand} request per context inside a single user
     * transaction looked up from JNDI. The transaction is rolled back if scheduling fails
     * so it does not stay open after the failing test.
     *
     * @param contexts command contexts to schedule, in order
     * @throws Exception the scheduling failure, or any failure of begin/commit
     */
    private void scheduleInTransaction(CommandContext... contexts) throws Exception {
        javax.transaction.UserTransaction ut =
                (javax.transaction.UserTransaction) InitialContext.doLookup(USER_TRANSACTION_JNDI_NAME);
        ut.begin();
        try {
            for (CommandContext context : contexts) {
                this.executorService.scheduleRequest(PRINT_OUT_COMMAND, context);
            }
        } catch (Exception schedulingFailure) {
            try {
                ut.rollback();
            } catch (Exception rollbackFailure) {
                // Keep the scheduling failure as the reported cause; only log the rollback problem.
                logger.warn("Unable to roll back transaction after scheduling failure", rollbackFailure);
            }
            throw schedulingFailure;
        }
        ut.commit();
    }

    /**
     * Asserts how many requests the executor service reports per state.
     *
     * @param inError expected number of requests in error
     * @param queued expected number of queued requests
     * @param completed expected number of completed requests
     */
    private void assertRequestCounts(int inError, int queued, int completed) {
        List<?> inErrorRequests = this.executorService.getInErrorRequests(new QueryContext());
        Assert.assertEquals("requests in error", inError, inErrorRequests.size());
        List<?> queuedRequests = this.executorService.getQueuedRequests(new QueryContext());
        Assert.assertEquals("queued requests", queued, queuedRequests.size());
        List<?> executedRequests = this.executorService.getCompletedRequests(new QueryContext());
        Assert.assertEquals("completed requests", completed, executedRequests.size());
    }

    /** Clears stored requests and errors, then destroys the executor service, if one was created. */
    private void destroyExecutorService() {
        if (this.executorService == null) {
            return;
        }
        try {
            this.executorService.clearAllRequests();
            this.executorService.clearAllErrors();
        } finally {
            this.executorService.destroy();
        }
    }

    /** Closes the entity manager factory and the data source, whichever of them exist. */
    private void closePersistence() {
        try {
            if (this.emf != null) {
                this.emf.close();
            }
        } finally {
            if (this.pds != null) {
                this.pds.close();
            }
        }
    }

    /**
     * Starts the embedded JMS server, binds the Narayana user transaction, transaction
     * manager and synchronization registry into JNDI, and resolves the connection factory
     * (wrapped in a {@link ConnectionFactoryProxy}) and the test queue.
     *
     * @throws Exception if the server cannot be started or a lookup/bind fails
     */
    private void startHornetQServer() throws Exception {
        EmbeddedJMS server = new EmbeddedJMS();
        server.start();
        // Only publish the server once it has started so tearDown never stops a half-built one.
        this.jmsServer = server;
        logger.debug("Started Embedded JMS Server");
        XAConnectionFactory connectionFactory = (XAConnectionFactory) server.lookup("ConnectionFactory");
        InitialContext context = new InitialContext();
        context.rebind(USER_TRANSACTION_JNDI_NAME, UserTransaction.userTransaction());
        context.rebind("java:comp/TransactionManager", TransactionManager.transactionManager());
        context.rebind("java:comp/TransactionSynchronizationRegistry", new TransactionSynchronizationRegistryImple());
        this.factory = new ConnectionFactoryProxy(connectionFactory, new TransactionHelperImpl(TransactionManager.transactionManager()));
        this.queue = (Queue) server.lookup("/queue/exampleQueue");
    }

    /**
     * Stops the embedded JMS server if it was started and clears the reference to it.
     *
     * @throws Exception if the server fails to stop
     */
    private void stopHornetQServer() throws Exception {
        if (this.jmsServer == null) {
            return;
        }
        try {
            this.jmsServer.stop();
        } finally {
            this.jmsServer = null;
        }
    }

    /**
     * Test-side JMS consumer. It must stay a (non-static) inner class because it reads the
     * enclosing test's connection factory and executor service.
     */
    private class MessageReceiver {
        private MessageReceiver() {
        }

        /**
         * Same as {@link #receiveAndProcess(Queue, CountDownAsyncJobListener, long)} using
         * {@link #DEFAULT_WAIT_TIMEOUT}.
         */
        void receiveAndProcess(Queue queue, CountDownAsyncJobListener countDownListener) throws Exception {
            this.receiveAndProcess(queue, countDownListener, DEFAULT_WAIT_TIMEOUT);
        }

        /**
         * Attaches a freshly configured {@link JmsAvailableJobsExecutor} as message listener
         * on {@code queue} and blocks on {@code countDownListener.waitTillCompleted(waitTill)}.
         * Consumer, session and connection are always closed, even when waiting fails.
         *
         * @param queue destination to consume from
         * @param countDownListener listener used to wait for job completion
         * @param waitTill value passed to {@code waitTillCompleted}
         * @throws Exception on any JMS or wait failure
         */
        void receiveAndProcess(Queue queue, CountDownAsyncJobListener countDownListener, long waitTill) throws Exception {
            Connection connection = JmsAvaiableJobExecutorTest.this.factory.createConnection();
            try {
                Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                try {
                    MessageConsumer consumer = session.createConsumer(queue);
                    try {
                        connection.start();
                        consumer.setMessageListener(this.newJobsExecutor());
                        countDownListener.waitTillCompleted(waitTill);
                    } finally {
                        consumer.close();
                    }
                } finally {
                    session.close();
                }
            } finally {
                connection.close();
            }
        }

        /**
         * Drains the messages currently available on {@code queue} without blocking.
         *
         * <p>NOTE(review): the transacted session is closed without a commit, so the
         * received messages are presumably returned to the queue - confirm before relying
         * on this method to consume messages.
         *
         * @param queue destination to read from
         * @return the messages received until {@code receiveNoWait()} returned {@code null}
         * @throws Exception on any JMS failure
         */
        public List<Message> receive(Queue queue) throws Exception {
            List<Message> messages = new ArrayList<Message>();
            Connection connection = JmsAvaiableJobExecutorTest.this.factory.createConnection();
            try {
                Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                try {
                    MessageConsumer consumer = session.createConsumer(queue);
                    try {
                        connection.start();
                        Message message;
                        while ((message = consumer.receiveNoWait()) != null) {
                            messages.add(message);
                        }
                    } finally {
                        consumer.close();
                    }
                } finally {
                    session.close();
                }
            } finally {
                connection.close();
            }
            return messages;
        }

        /** Builds a jobs executor that shares the store, query and event services of the test's executor service. */
        private JmsAvailableJobsExecutor newJobsExecutor() {
            ExecutorServiceImpl service = (ExecutorServiceImpl) JmsAvaiableJobExecutorTest.this.executorService;
            JmsAvailableJobsExecutor jmsExecutor = new JmsAvailableJobsExecutor();
            jmsExecutor.setClassCacheManager(new ClassCacheManager());
            jmsExecutor.setExecutorStoreService(((ExecutorImpl) service.getExecutor()).getExecutorStoreService());
            jmsExecutor.setQueryService(service.getQueryService());
            jmsExecutor.setEventSupport(service.getEventSupport());
            return jmsExecutor;
        }
    }
}

