/*
 * Decompiled with CFR 0.152.
 */
package org.jbpm.process.audit.jms;

import bitronix.tm.resource.jms.PoolingConnectionFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.XAConnectionFactory;
import javax.naming.InitialContext;
import javax.persistence.EntityManagerFactory;
import javax.transaction.UserTransaction;
import org.hornetq.jms.server.embedded.EmbeddedJMS;
import org.jbpm.persistence.util.PersistenceUtil;
import org.jbpm.process.audit.AbstractAuditLogServiceTest;
import org.jbpm.process.audit.AbstractAuditLogger;
import org.jbpm.process.audit.AuditLoggerFactory;
import org.jbpm.process.audit.JPAAuditLogService;
import org.jbpm.process.audit.NodeInstanceLog;
import org.jbpm.process.audit.VariableInstanceLog;
import org.jbpm.process.audit.jms.AsyncAuditLogProducer;
import org.jbpm.process.audit.jms.AsyncAuditLogReceiver;
import org.jbpm.process.audit.jms.BitronixHornetQXAConnectionFactory;
import org.jbpm.process.instance.impl.demo.SystemOutWorkItemHandler;
import org.jbpm.test.util.AbstractBaseTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.kie.api.KieBase;
import org.kie.api.event.process.ProcessEventListener;
import org.kie.api.runtime.Environment;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.process.ProcessInstance;
import org.kie.api.runtime.process.WorkItemHandler;
import org.kie.internal.KnowledgeBase;
import org.kie.internal.runtime.StatefulKnowledgeSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncAuditLogProducerTest
extends AbstractBaseTest {
    private static final Logger logger = LoggerFactory.getLogger(AsyncAuditLogProducerTest.class);
    private HashMap<String, Object> context;
    private ConnectionFactory factory;
    private Queue queue;
    private EmbeddedJMS jmsServer;

    @Before
    public void setup() throws Exception {
        this.startHornetQServer();
        this.context = PersistenceUtil.setupWithPoolingDataSource((String)"org.jbpm.persistence.jpa");
    }

    @After
    public void tearDown() throws Exception {
        PersistenceUtil.cleanUp(this.context);
        this.stopHornetQServer();
    }

    @Test
    public void testAsyncAuditProducer() throws Exception {
        Environment env = PersistenceUtil.createEnvironment(this.context);
        KnowledgeBase kbase = AbstractAuditLogServiceTest.createKnowledgeBase();
        StatefulKnowledgeSession session = this.createSession(kbase, env);
        HashMap<String, Boolean> jmsProps = new HashMap<String, Boolean>();
        jmsProps.put("jbpm.audit.jms.transacted", false);
        jmsProps.put("jbpm.audit.jms.connection.factory", (Boolean)this.factory);
        jmsProps.put("jbpm.audit.jms.queue", (Boolean)this.queue);
        AbstractAuditLogger logger = AuditLoggerFactory.newInstance((AuditLoggerFactory.Type)AuditLoggerFactory.Type.JMS, (KieSession)session, jmsProps);
        Assert.assertNotNull((Object)logger);
        Assert.assertTrue((boolean)(logger instanceof AsyncAuditLogProducer));
        long processInstanceId = session.startProcess("com.sample.ruleflow").getId();
        MessageReceiver receiver = new MessageReceiver();
        List<Message> messages = receiver.receive(this.queue);
        Assert.assertNotNull(messages);
        Assert.assertEquals((long)11L, (long)messages.size());
    }

    @Test
    public void testAsyncAuditProducerTransactional() throws Exception {
        UserTransaction ut = (UserTransaction)InitialContext.doLookup("java:comp/UserTransaction");
        ut.begin();
        Environment env = PersistenceUtil.createEnvironment(this.context);
        KnowledgeBase kbase = AbstractAuditLogServiceTest.createKnowledgeBase();
        StatefulKnowledgeSession session = this.createSession(kbase, env);
        HashMap<String, Boolean> jmsProps = new HashMap<String, Boolean>();
        jmsProps.put("jbpm.audit.jms.transacted", true);
        jmsProps.put("jbpm.audit.jms.connection.factory", (Boolean)this.factory);
        jmsProps.put("jbpm.audit.jms.queue", (Boolean)this.queue);
        AbstractAuditLogger logger = AuditLoggerFactory.newInstance((AuditLoggerFactory.Type)AuditLoggerFactory.Type.JMS, (KieSession)session, jmsProps);
        Assert.assertNotNull((Object)logger);
        Assert.assertTrue((boolean)(logger instanceof AsyncAuditLogProducer));
        long processInstanceId = session.startProcess("com.sample.ruleflow").getId();
        ut.commit();
        MessageReceiver receiver = new MessageReceiver();
        List<Message> messages = receiver.receive(this.queue);
        Assert.assertNotNull(messages);
        Assert.assertEquals((long)11L, (long)messages.size());
    }

    @Test
    public void testAsyncAuditProducerTransactionalWithRollback() throws Exception {
        UserTransaction ut = (UserTransaction)InitialContext.doLookup("java:comp/UserTransaction");
        ut.begin();
        Environment env = PersistenceUtil.createEnvironment(this.context);
        KnowledgeBase kbase = AbstractAuditLogServiceTest.createKnowledgeBase();
        StatefulKnowledgeSession session = this.createSession(kbase, env);
        HashMap<String, Boolean> jmsProps = new HashMap<String, Boolean>();
        jmsProps.put("jbpm.audit.jms.transacted", true);
        jmsProps.put("jbpm.audit.jms.connection.factory", (Boolean)this.factory);
        jmsProps.put("jbpm.audit.jms.queue", (Boolean)this.queue);
        AbstractAuditLogger logger = AuditLoggerFactory.newInstance((AuditLoggerFactory.Type)AuditLoggerFactory.Type.JMS, (KieSession)session, jmsProps);
        Assert.assertNotNull((Object)logger);
        Assert.assertTrue((boolean)(logger instanceof AsyncAuditLogProducer));
        long processInstanceId = session.startProcess("com.sample.ruleflow").getId();
        ut.rollback();
        MessageReceiver receiver = new MessageReceiver();
        List<Message> messages = receiver.receive(this.queue);
        Assert.assertNotNull(messages);
        Assert.assertEquals((long)0L, (long)messages.size());
    }

    @Test
    public void testAsyncAuditProducerNonTransactionalWithRollback() throws Exception {
        UserTransaction ut = (UserTransaction)InitialContext.doLookup("java:comp/UserTransaction");
        ut.begin();
        Environment env = PersistenceUtil.createEnvironment(this.context);
        KnowledgeBase kbase = AbstractAuditLogServiceTest.createKnowledgeBase();
        StatefulKnowledgeSession session = this.createSession(kbase, env);
        HashMap<String, Object> jmsProps = new HashMap<String, Object>();
        jmsProps.put("jbpm.audit.jms.transacted", false);
        jmsProps.put("jbpm.audit.jms.connection.factory", this.jmsServer.lookup("ConnectionFactory"));
        jmsProps.put("jbpm.audit.jms.queue", this.queue);
        AbstractAuditLogger logger = AuditLoggerFactory.newInstance((AuditLoggerFactory.Type)AuditLoggerFactory.Type.JMS, (KieSession)session, jmsProps);
        Assert.assertNotNull((Object)logger);
        Assert.assertTrue((boolean)(logger instanceof AsyncAuditLogProducer));
        long processInstanceId = session.startProcess("com.sample.ruleflow").getId();
        ut.rollback();
        MessageReceiver receiver = new MessageReceiver();
        List<Message> messages = receiver.receive(this.queue);
        Assert.assertNotNull(messages);
        Assert.assertEquals((long)11L, (long)messages.size());
    }

    @Test
    public void testAsyncAuditLoggerComplete() throws Exception {
        Environment env = PersistenceUtil.createEnvironment(this.context);
        KnowledgeBase kbase = AbstractAuditLogServiceTest.createKnowledgeBase();
        StatefulKnowledgeSession session = this.createSession(kbase, env);
        HashMap<String, Boolean> jmsProps = new HashMap<String, Boolean>();
        jmsProps.put("jbpm.audit.jms.transacted", false);
        jmsProps.put("jbpm.audit.jms.connection.factory", (Boolean)this.factory);
        jmsProps.put("jbpm.audit.jms.queue", (Boolean)this.queue);
        AbstractAuditLogger logger = AuditLoggerFactory.newInstance((AuditLoggerFactory.Type)AuditLoggerFactory.Type.JMS, (KieSession)session, jmsProps);
        Assert.assertNotNull((Object)logger);
        Assert.assertTrue((boolean)(logger instanceof AsyncAuditLogProducer));
        ProcessInstance processInstance = session.startProcess("com.sample.ruleflow");
        MessageReceiver receiver = new MessageReceiver();
        receiver.receiveAndProcess(this.queue, (EntityManagerFactory)env.get("org.kie.api.persistence.jpa.EntityManagerFactory"));
        JPAAuditLogService logService = new JPAAuditLogService(env);
        List processInstances = logService.findProcessInstances("com.sample.ruleflow");
        Assert.assertEquals((long)1L, (long)processInstances.size());
        List nodeInstances = logService.findNodeInstances(processInstance.getId());
        Assert.assertEquals((long)6L, (long)nodeInstances.size());
        for (NodeInstanceLog nodeInstance : nodeInstances) {
            Assert.assertEquals((long)processInstance.getId(), (long)nodeInstance.getProcessInstanceId());
            Assert.assertEquals((Object)"com.sample.ruleflow", (Object)nodeInstance.getProcessId());
            Assert.assertNotNull((Object)nodeInstance.getDate());
        }
        logService.clear();
        processInstances = logService.findProcessInstances("com.sample.ruleflow");
        logService.dispose();
        Assert.assertTrue((boolean)processInstances.isEmpty());
    }

    @Test
    public void testAsyncAuditLoggerCompleteDirectCreation() throws Exception {
        Environment env = PersistenceUtil.createEnvironment(this.context);
        KnowledgeBase kbase = AbstractAuditLogServiceTest.createKnowledgeBase();
        StatefulKnowledgeSession session = this.createSession(kbase, env);
        AbstractAuditLogger logger = AuditLoggerFactory.newJMSInstance((boolean)true, (ConnectionFactory)this.factory, (Queue)this.queue);
        Assert.assertNotNull((Object)logger);
        Assert.assertTrue((boolean)(logger instanceof AsyncAuditLogProducer));
        session.addEventListener((ProcessEventListener)logger);
        ProcessInstance processInstance = session.startProcess("com.sample.ruleflow");
        MessageReceiver receiver = new MessageReceiver();
        receiver.receiveAndProcess(this.queue, (EntityManagerFactory)env.get("org.kie.api.persistence.jpa.EntityManagerFactory"));
        JPAAuditLogService logService = new JPAAuditLogService(env);
        List processInstances = logService.findProcessInstances("com.sample.ruleflow");
        Assert.assertEquals((long)1L, (long)processInstances.size());
        List nodeInstances = logService.findNodeInstances(processInstance.getId());
        Assert.assertEquals((long)6L, (long)nodeInstances.size());
        for (NodeInstanceLog nodeInstance : nodeInstances) {
            Assert.assertEquals((long)processInstance.getId(), (long)nodeInstance.getProcessInstanceId());
            Assert.assertEquals((Object)"com.sample.ruleflow", (Object)nodeInstance.getProcessId());
            Assert.assertNotNull((Object)nodeInstance.getDate());
        }
        logService.clear();
        processInstances = logService.findProcessInstances("com.sample.ruleflow");
        logService.dispose();
        Assert.assertTrue((boolean)processInstances.isEmpty());
    }

    @Test
    public void testAsyncAuditLoggerCompleteWithVariables() throws Exception {
        Environment env = PersistenceUtil.createEnvironment(this.context);
        KnowledgeBase kbase = AbstractAuditLogServiceTest.createKnowledgeBase();
        StatefulKnowledgeSession session = this.createSession(kbase, env);
        HashMap<String, Boolean> jmsProps = new HashMap<String, Boolean>();
        jmsProps.put("jbpm.audit.jms.transacted", false);
        jmsProps.put("jbpm.audit.jms.connection.factory", (Boolean)this.factory);
        jmsProps.put("jbpm.audit.jms.queue", (Boolean)this.queue);
        AbstractAuditLogger logger = AuditLoggerFactory.newInstance((AuditLoggerFactory.Type)AuditLoggerFactory.Type.JMS, (KieSession)session, jmsProps);
        Assert.assertNotNull((Object)logger);
        Assert.assertTrue((boolean)(logger instanceof AsyncAuditLogProducer));
        HashMap<String, String> params = new HashMap<String, String>();
        params.put("s", "test value");
        ProcessInstance processInstance = session.startProcess("com.sample.ruleflow3", params);
        MessageReceiver receiver = new MessageReceiver();
        receiver.receiveAndProcess(this.queue, (EntityManagerFactory)env.get("org.kie.api.persistence.jpa.EntityManagerFactory"));
        JPAAuditLogService logService = new JPAAuditLogService(env);
        List processInstances = logService.findProcessInstances("com.sample.ruleflow3");
        Assert.assertEquals((long)1L, (long)processInstances.size());
        List nodeInstances = logService.findNodeInstances(processInstance.getId());
        Assert.assertEquals((long)6L, (long)nodeInstances.size());
        for (NodeInstanceLog nodeInstance : nodeInstances) {
            Assert.assertEquals((long)processInstance.getId(), (long)nodeInstance.getProcessInstanceId());
            Assert.assertEquals((Object)"com.sample.ruleflow3", (Object)nodeInstance.getProcessId());
            Assert.assertNotNull((Object)nodeInstance.getDate());
        }
        List variables = logService.findVariableInstances(processInstance.getId());
        Assert.assertNotNull((Object)variables);
        Assert.assertEquals((long)2L, (long)variables.size());
        VariableInstanceLog var = (VariableInstanceLog)variables.get(0);
        Assert.assertEquals((Object)"InitialValue", (Object)var.getValue());
        Assert.assertEquals((Object)"", (Object)var.getOldValue());
        Assert.assertEquals((long)processInstance.getId(), (long)var.getProcessInstanceId());
        Assert.assertEquals((Object)processInstance.getProcessId(), (Object)var.getProcessId());
        Assert.assertEquals((Object)"s", (Object)var.getVariableId());
        Assert.assertEquals((Object)"s", (Object)var.getVariableInstanceId());
        var = (VariableInstanceLog)variables.get(1);
        Assert.assertEquals((Object)"test value", (Object)var.getValue());
        Assert.assertEquals((Object)"InitialValue", (Object)var.getOldValue());
        Assert.assertEquals((long)processInstance.getId(), (long)var.getProcessInstanceId());
        Assert.assertEquals((Object)processInstance.getProcessId(), (Object)var.getProcessId());
        Assert.assertEquals((Object)"s", (Object)var.getVariableId());
        Assert.assertEquals((Object)"s", (Object)var.getVariableInstanceId());
        logService.clear();
        processInstances = logService.findProcessInstances("com.sample.ruleflow3");
        logService.dispose();
        Assert.assertTrue((boolean)processInstances.isEmpty());
    }

    @Test
    public void testAsyncAuditLoggerCompleteWithVariablesCustomIndexer() throws Exception {
        Environment env = PersistenceUtil.createEnvironment(this.context);
        KnowledgeBase kbase = AbstractAuditLogServiceTest.createKnowledgeBase();
        StatefulKnowledgeSession session = this.createSession(kbase, env);
        HashMap<String, Boolean> jmsProps = new HashMap<String, Boolean>();
        jmsProps.put("jbpm.audit.jms.transacted", false);
        jmsProps.put("jbpm.audit.jms.connection.factory", (Boolean)this.factory);
        jmsProps.put("jbpm.audit.jms.queue", (Boolean)this.queue);
        AbstractAuditLogger logger = AuditLoggerFactory.newInstance((AuditLoggerFactory.Type)AuditLoggerFactory.Type.JMS, (KieSession)session, jmsProps);
        Assert.assertNotNull((Object)logger);
        Assert.assertTrue((boolean)(logger instanceof AsyncAuditLogProducer));
        LinkedList<String> names = new LinkedList<String>();
        names.add("john");
        names.add("mary");
        names.add("peter");
        HashMap<String, LinkedList<String>> params = new HashMap<String, LinkedList<String>>();
        params.put("list", names);
        ProcessInstance processInstance = session.startProcess("com.sample.ruleflow3", params);
        MessageReceiver receiver = new MessageReceiver();
        receiver.receiveAndProcess(this.queue, (EntityManagerFactory)env.get("org.kie.api.persistence.jpa.EntityManagerFactory"));
        JPAAuditLogService logService = new JPAAuditLogService(env);
        List processInstances = logService.findProcessInstances("com.sample.ruleflow3");
        Assert.assertEquals((long)1L, (long)processInstances.size());
        List nodeInstances = logService.findNodeInstances(processInstance.getId());
        Assert.assertEquals((long)12L, (long)nodeInstances.size());
        for (NodeInstanceLog nodeInstance : nodeInstances) {
            Assert.assertEquals((long)processInstance.getId(), (long)nodeInstance.getProcessInstanceId());
            Assert.assertEquals((Object)"com.sample.ruleflow3", (Object)nodeInstance.getProcessId());
            Assert.assertNotNull((Object)nodeInstance.getDate());
        }
        List variables = logService.findVariableInstances(processInstance.getId());
        Assert.assertNotNull((Object)variables);
        Assert.assertEquals((long)8L, (long)variables.size());
        ArrayList<VariableInstanceLog> listVariables = new ArrayList<VariableInstanceLog>();
        for (VariableInstanceLog v : variables) {
            if (!v.getVariableInstanceId().equals("list")) continue;
            listVariables.add(v);
        }
        Assert.assertEquals((long)3L, (long)listVariables.size());
        VariableInstanceLog var = (VariableInstanceLog)listVariables.get(0);
        Assert.assertEquals((Object)"john", (Object)var.getValue());
        Assert.assertEquals((Object)"", (Object)var.getOldValue());
        Assert.assertEquals((long)processInstance.getId(), (long)var.getProcessInstanceId());
        Assert.assertEquals((Object)processInstance.getProcessId(), (Object)var.getProcessId());
        Assert.assertEquals((Object)"list[0]", (Object)var.getVariableId());
        Assert.assertEquals((Object)"list", (Object)var.getVariableInstanceId());
        var = (VariableInstanceLog)listVariables.get(1);
        Assert.assertEquals((Object)"mary", (Object)var.getValue());
        Assert.assertEquals((Object)"", (Object)var.getOldValue());
        Assert.assertEquals((long)processInstance.getId(), (long)var.getProcessInstanceId());
        Assert.assertEquals((Object)processInstance.getProcessId(), (Object)var.getProcessId());
        Assert.assertEquals((Object)"list[1]", (Object)var.getVariableId());
        Assert.assertEquals((Object)"list", (Object)var.getVariableInstanceId());
        var = (VariableInstanceLog)listVariables.get(2);
        Assert.assertEquals((Object)"peter", (Object)var.getValue());
        Assert.assertEquals((Object)"", (Object)var.getOldValue());
        Assert.assertEquals((long)processInstance.getId(), (long)var.getProcessInstanceId());
        Assert.assertEquals((Object)processInstance.getProcessId(), (Object)var.getProcessId());
        Assert.assertEquals((Object)"list[2]", (Object)var.getVariableId());
        Assert.assertEquals((Object)"list", (Object)var.getVariableInstanceId());
        logService.clear();
        processInstances = logService.findProcessInstances("com.sample.ruleflow3");
        logService.dispose();
        Assert.assertTrue((boolean)processInstances.isEmpty());
    }

    public StatefulKnowledgeSession createSession(KnowledgeBase kbase, Environment env) {
        StatefulKnowledgeSession session = AbstractAuditLogServiceTest.createKieSession((KieBase)kbase, env);
        session.getWorkItemManager().registerWorkItemHandler("Human Task", (WorkItemHandler)new SystemOutWorkItemHandler());
        return session;
    }

    private void startHornetQServer() throws Exception {
        this.jmsServer = new EmbeddedJMS();
        this.jmsServer.start();
        logger.debug("Started Embedded JMS Server");
        BitronixHornetQXAConnectionFactory.connectionFactory = (XAConnectionFactory)this.jmsServer.lookup("ConnectionFactory");
        PoolingConnectionFactory myConnectionFactory = new PoolingConnectionFactory();
        myConnectionFactory.setClassName("org.jbpm.process.audit.jms.BitronixHornetQXAConnectionFactory");
        myConnectionFactory.setUniqueName("hornet");
        myConnectionFactory.setMaxPoolSize(5);
        myConnectionFactory.setAllowLocalTransactions(true);
        myConnectionFactory.init();
        this.factory = myConnectionFactory;
        this.queue = (Queue)this.jmsServer.lookup("/queue/exampleQueue");
    }

    private void stopHornetQServer() throws Exception {
        ((PoolingConnectionFactory)this.factory).close();
        this.jmsServer.stop();
        this.jmsServer = null;
    }

    private class MessageReceiver {
        private MessageReceiver() {
        }

        void receiveAndProcess(Queue queue, EntityManagerFactory entityManagerFactory) throws Exception {
            Connection qconnetion = AsyncAuditLogProducerTest.this.factory.createConnection();
            Session qsession = qconnetion.createSession(true, 1);
            MessageConsumer consumer = qsession.createConsumer((Destination)queue);
            qconnetion.start();
            AsyncAuditLogReceiver rec = new AsyncAuditLogReceiver(entityManagerFactory){

                public void onMessage(Message message) {
                    try {
                        UserTransaction ut = (UserTransaction)InitialContext.doLookup("java:comp/UserTransaction");
                        ut.begin();
                        super.onMessage(message);
                        ut.commit();
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            consumer.setMessageListener((MessageListener)rec);
            Thread.sleep(2000L);
            consumer.close();
            qsession.close();
            qconnetion.close();
        }

        public List<Message> receive(Queue queue) throws Exception {
            ArrayList<Message> messages = new ArrayList<Message>();
            Connection qconnetion = AsyncAuditLogProducerTest.this.factory.createConnection();
            Session qsession = qconnetion.createSession(true, 1);
            MessageConsumer consumer = qsession.createConsumer((Destination)queue);
            qconnetion.start();
            Message m = null;
            while ((m = consumer.receiveNoWait()) != null) {
                messages.add(m);
            }
            consumer.close();
            qsession.close();
            qconnetion.close();
            return messages;
        }
    }
}

