/*
 * Decompiled with CFR 0.152.
 */
package org.jbpm.process.audit.jms;

import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple;
import com.arjuna.ats.jta.TransactionManager;
import com.arjuna.ats.jta.UserTransaction;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.XAConnectionFactory;
import javax.naming.InitialContext;
import javax.persistence.EntityManagerFactory;
import org.assertj.core.api.Assertions;
import org.hornetq.jms.server.embedded.EmbeddedJMS;
import org.jboss.narayana.jta.jms.ConnectionFactoryProxy;
import org.jboss.narayana.jta.jms.TransactionHelper;
import org.jboss.narayana.jta.jms.TransactionHelperImpl;
import org.jbpm.persistence.util.PersistenceUtil;
import org.jbpm.process.audit.AbstractAuditLogServiceTest;
import org.jbpm.process.audit.AbstractAuditLogger;
import org.jbpm.process.audit.AuditLoggerFactory;
import org.jbpm.process.audit.JPAAuditLogService;
import org.jbpm.process.audit.NodeInstanceLog;
import org.jbpm.process.audit.VariableInstanceLog;
import org.jbpm.process.audit.jms.AsyncAuditLogProducer;
import org.jbpm.process.audit.jms.AsyncAuditLogReceiver;
import org.jbpm.process.instance.impl.demo.SystemOutWorkItemHandler;
import org.jbpm.test.util.AbstractBaseTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.kie.api.KieBase;
import org.kie.api.event.process.ProcessEventListener;
import org.kie.api.runtime.Environment;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.process.ProcessInstance;
import org.kie.api.runtime.process.WorkItemHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncAuditLogProducerTest
extends AbstractBaseTest {
    private static final Logger logger = LoggerFactory.getLogger(AsyncAuditLogProducerTest.class);
    private HashMap<String, Object> context;
    private ConnectionFactory factory;
    private Queue queue;
    private EmbeddedJMS jmsServer;

    @Before
    public void setup() throws Exception {
        this.startHornetQServer();
        this.context = PersistenceUtil.setupWithPoolingDataSource((String)"org.jbpm.persistence.jpa");
    }

    @After
    public void tearDown() throws Exception {
        PersistenceUtil.cleanUp(this.context);
        this.stopHornetQServer();
    }

    @Test
    public void testAsyncAuditProducer() throws Exception {
        Environment env = PersistenceUtil.createEnvironment(this.context);
        KieBase kbase = AbstractAuditLogServiceTest.createKnowledgeBase();
        KieSession session = this.createSession(kbase, env);
        HashMap<String, Boolean> jmsProps = new HashMap<String, Boolean>();
        jmsProps.put("jbpm.audit.jms.transacted", false);
        jmsProps.put("jbpm.audit.jms.connection.factory", (Boolean)this.factory);
        jmsProps.put("jbpm.audit.jms.queue", (Boolean)this.queue);
        AbstractAuditLogger logger = AuditLoggerFactory.newInstance((AuditLoggerFactory.Type)AuditLoggerFactory.Type.JMS, (KieSession)session, jmsProps);
        Assertions.assertThat((Object)logger).isNotNull();
        Assertions.assertThat((boolean)(logger instanceof AsyncAuditLogProducer)).isTrue();
        long processInstanceId = session.startProcess("com.sample.ruleflow").getId();
        MessageReceiver receiver = new MessageReceiver();
        List<Message> messages = receiver.receive(this.queue);
        Assertions.assertThat(messages).isNotNull();
        Assertions.assertThat((int)messages.size()).isEqualTo(11);
    }

    @Test
    public void testAsyncAuditProducerTransactional() throws Exception {
        javax.transaction.UserTransaction ut = (javax.transaction.UserTransaction)InitialContext.doLookup("java:comp/UserTransaction");
        ut.begin();
        Environment env = PersistenceUtil.createEnvironment(this.context);
        KieBase kbase = AbstractAuditLogServiceTest.createKnowledgeBase();
        KieSession session = this.createSession(kbase, env);
        HashMap<String, Boolean> jmsProps = new HashMap<String, Boolean>();
        jmsProps.put("jbpm.audit.jms.transacted", true);
        jmsProps.put("jbpm.audit.jms.connection.factory", (Boolean)this.factory);
        jmsProps.put("jbpm.audit.jms.queue", (Boolean)this.queue);
        AbstractAuditLogger logger = AuditLoggerFactory.newInstance((AuditLoggerFactory.Type)AuditLoggerFactory.Type.JMS, (KieSession)session, jmsProps);
        Assertions.assertThat((Object)logger).isNotNull();
        Assertions.assertThat((boolean)(logger instanceof AsyncAuditLogProducer)).isTrue();
        long processInstanceId = session.startProcess("com.sample.ruleflow").getId();
        ut.commit();
        MessageReceiver receiver = new MessageReceiver();
        List<Message> messages = receiver.receive(this.queue);
        Assertions.assertThat(messages).isNotNull();
        Assertions.assertThat((int)messages.size()).isEqualTo(11);
    }

    @Test
    public void testAsyncAuditProducerTransactionalWithRollback() throws Exception {
        javax.transaction.UserTransaction ut = (javax.transaction.UserTransaction)InitialContext.doLookup("java:comp/UserTransaction");
        ut.begin();
        Environment env = PersistenceUtil.createEnvironment(this.context);
        KieBase kbase = AbstractAuditLogServiceTest.createKnowledgeBase();
        KieSession session = this.createSession(kbase, env);
        HashMap<String, Boolean> jmsProps = new HashMap<String, Boolean>();
        jmsProps.put("jbpm.audit.jms.transacted", true);
        jmsProps.put("jbpm.audit.jms.connection.factory", (Boolean)this.factory);
        jmsProps.put("jbpm.audit.jms.queue", (Boolean)this.queue);
        AbstractAuditLogger logger = AuditLoggerFactory.newInstance((AuditLoggerFactory.Type)AuditLoggerFactory.Type.JMS, (KieSession)session, jmsProps);
        Assertions.assertThat((Object)logger).isNotNull();
        Assertions.assertThat((boolean)(logger instanceof AsyncAuditLogProducer)).isTrue();
        long processInstanceId = session.startProcess("com.sample.ruleflow").getId();
        ut.rollback();
        MessageReceiver receiver = new MessageReceiver();
        List<Message> messages = receiver.receive(this.queue);
        Assertions.assertThat(messages).isNotNull();
        Assertions.assertThat((int)messages.size()).isEqualTo(0);
    }

    @Test
    public void testAsyncAuditProducerNonTransactionalWithRollback() throws Exception {
        javax.transaction.UserTransaction ut = (javax.transaction.UserTransaction)InitialContext.doLookup("java:comp/UserTransaction");
        ut.begin();
        Environment env = PersistenceUtil.createEnvironment(this.context);
        KieBase kbase = AbstractAuditLogServiceTest.createKnowledgeBase();
        KieSession session = this.createSession(kbase, env);
        HashMap<String, Object> jmsProps = new HashMap<String, Object>();
        jmsProps.put("jbpm.audit.jms.transacted", false);
        jmsProps.put("jbpm.audit.jms.connection.factory", this.jmsServer.lookup("ConnectionFactory"));
        jmsProps.put("jbpm.audit.jms.queue", this.queue);
        AbstractAuditLogger logger = AuditLoggerFactory.newInstance((AuditLoggerFactory.Type)AuditLoggerFactory.Type.JMS, (KieSession)session, jmsProps);
        Assertions.assertThat((Object)logger).isNotNull();
        Assertions.assertThat((boolean)(logger instanceof AsyncAuditLogProducer)).isTrue();
        long processInstanceId = session.startProcess("com.sample.ruleflow").getId();
        ut.rollback();
        MessageReceiver receiver = new MessageReceiver();
        List<Message> messages = receiver.receive(this.queue);
        Assertions.assertThat(messages).isNotNull();
        Assertions.assertThat((int)messages.size()).isEqualTo(11);
    }

    @Test
    public void testAsyncAuditLoggerComplete() throws Exception {
        Environment env = PersistenceUtil.createEnvironment(this.context);
        KieBase kbase = AbstractAuditLogServiceTest.createKnowledgeBase();
        KieSession session = this.createSession(kbase, env);
        HashMap<String, Boolean> jmsProps = new HashMap<String, Boolean>();
        jmsProps.put("jbpm.audit.jms.transacted", false);
        jmsProps.put("jbpm.audit.jms.connection.factory", (Boolean)this.factory);
        jmsProps.put("jbpm.audit.jms.queue", (Boolean)this.queue);
        AbstractAuditLogger logger = AuditLoggerFactory.newInstance((AuditLoggerFactory.Type)AuditLoggerFactory.Type.JMS, (KieSession)session, jmsProps);
        Assertions.assertThat((Object)logger).isNotNull();
        Assertions.assertThat((boolean)(logger instanceof AsyncAuditLogProducer)).isTrue();
        ProcessInstance processInstance = session.startProcess("com.sample.ruleflow");
        MessageReceiver receiver = new MessageReceiver();
        receiver.receiveAndProcess(this.queue, (EntityManagerFactory)env.get("org.kie.api.persistence.jpa.EntityManagerFactory"), 2000L, 11);
        JPAAuditLogService logService = new JPAAuditLogService(env);
        List processInstances = logService.findProcessInstances("com.sample.ruleflow");
        Assertions.assertThat((int)processInstances.size()).isEqualTo(1);
        List nodeInstances = logService.findNodeInstances(processInstance.getId());
        Assertions.assertThat((int)nodeInstances.size()).isEqualTo(6);
        for (NodeInstanceLog nodeInstance : nodeInstances) {
            Assertions.assertThat((long)processInstance.getId()).isEqualTo(nodeInstance.getProcessInstanceId().longValue());
            Assertions.assertThat((String)nodeInstance.getProcessId()).isEqualTo((Object)"com.sample.ruleflow");
            Assertions.assertThat((Date)nodeInstance.getDate()).isNotNull();
        }
        logService.clear();
        processInstances = logService.findProcessInstances("com.sample.ruleflow");
        logService.dispose();
        Assertions.assertThat((List)processInstances).isEmpty();
    }

    @Test
    public void testAsyncAuditLoggerCompleteDirectCreation() throws Exception {
        Environment env = PersistenceUtil.createEnvironment(this.context);
        KieBase kbase = AbstractAuditLogServiceTest.createKnowledgeBase();
        KieSession session = this.createSession(kbase, env);
        AbstractAuditLogger logger = AuditLoggerFactory.newJMSInstance((boolean)true, (ConnectionFactory)this.factory, (Queue)this.queue);
        Assertions.assertThat((Object)logger).isNotNull();
        Assertions.assertThat((boolean)(logger instanceof AsyncAuditLogProducer)).isTrue();
        session.addEventListener((ProcessEventListener)logger);
        ProcessInstance processInstance = session.startProcess("com.sample.ruleflow");
        MessageReceiver receiver = new MessageReceiver();
        receiver.receiveAndProcess(this.queue, (EntityManagerFactory)env.get("org.kie.api.persistence.jpa.EntityManagerFactory"), 6000L, 11);
        JPAAuditLogService logService = new JPAAuditLogService(env);
        List processInstances = logService.findProcessInstances("com.sample.ruleflow");
        Assertions.assertThat((int)processInstances.size()).isEqualTo(1);
        List nodeInstances = logService.findNodeInstances(processInstance.getId());
        Assertions.assertThat((int)nodeInstances.size()).isEqualTo(6);
        for (NodeInstanceLog nodeInstance : nodeInstances) {
            Assertions.assertThat((long)nodeInstance.getProcessInstanceId()).isEqualTo(processInstance.getId());
            Assertions.assertThat((String)nodeInstance.getProcessId()).isEqualTo((Object)"com.sample.ruleflow");
            Assertions.assertThat((Date)nodeInstance.getDate()).isNotNull();
        }
        logService.clear();
        processInstances = logService.findProcessInstances("com.sample.ruleflow");
        logService.dispose();
        Assertions.assertThat((List)processInstances).isEmpty();
    }

    @Test
    public void testAsyncAuditLoggerCompleteWithVariables() throws Exception {
        Environment env = PersistenceUtil.createEnvironment(this.context);
        KieBase kbase = AbstractAuditLogServiceTest.createKnowledgeBase();
        KieSession session = this.createSession(kbase, env);
        HashMap<String, Boolean> jmsProps = new HashMap<String, Boolean>();
        jmsProps.put("jbpm.audit.jms.transacted", false);
        jmsProps.put("jbpm.audit.jms.connection.factory", (Boolean)this.factory);
        jmsProps.put("jbpm.audit.jms.queue", (Boolean)this.queue);
        AbstractAuditLogger logger = AuditLoggerFactory.newInstance((AuditLoggerFactory.Type)AuditLoggerFactory.Type.JMS, (KieSession)session, jmsProps);
        Assertions.assertThat((Object)logger).isNotNull();
        Assertions.assertThat((boolean)(logger instanceof AsyncAuditLogProducer)).isTrue();
        HashMap<String, String> params = new HashMap<String, String>();
        params.put("s", "test value");
        ProcessInstance processInstance = session.startProcess("com.sample.ruleflow3", params);
        MessageReceiver receiver = new MessageReceiver();
        receiver.receiveAndProcess(this.queue, (EntityManagerFactory)env.get("org.kie.api.persistence.jpa.EntityManagerFactory"), 3000L, 13);
        JPAAuditLogService logService = new JPAAuditLogService(env);
        List processInstances = logService.findProcessInstances("com.sample.ruleflow3");
        Assertions.assertThat((int)processInstances.size()).isEqualTo(1);
        List nodeInstances = logService.findNodeInstances(processInstance.getId());
        Assertions.assertThat((int)nodeInstances.size()).isEqualTo(6);
        for (NodeInstanceLog nodeInstance : nodeInstances) {
            Assertions.assertThat((long)nodeInstance.getProcessInstanceId()).isEqualTo(processInstance.getId());
            Assertions.assertThat((String)nodeInstance.getProcessId()).isEqualTo((Object)"com.sample.ruleflow3");
            Assertions.assertThat((Date)nodeInstance.getDate()).isNotNull();
        }
        List variables = logService.findVariableInstances(processInstance.getId());
        Assertions.assertThat((List)variables).isNotNull();
        Assertions.assertThat((List)variables).hasSize(2);
        VariableInstanceLog var = (VariableInstanceLog)variables.get(0);
        Assertions.assertThat((String)var.getValue()).isEqualTo((Object)"InitialValue");
        Assertions.assertThat((String)var.getOldValue()).isIn(new Object[]{"", " ", null});
        Assertions.assertThat((long)var.getProcessInstanceId()).isEqualTo(processInstance.getId());
        Assertions.assertThat((String)var.getProcessId()).isEqualTo((Object)processInstance.getProcessId());
        Assertions.assertThat((String)var.getVariableId()).isEqualTo((Object)"s");
        Assertions.assertThat((String)var.getVariableInstanceId()).isEqualTo((Object)"s");
        var = (VariableInstanceLog)variables.get(1);
        Assertions.assertThat((String)var.getValue()).isEqualTo((Object)"test value");
        Assertions.assertThat((String)var.getOldValue()).isEqualTo((Object)"InitialValue");
        Assertions.assertThat((long)var.getProcessInstanceId()).isEqualTo(processInstance.getId());
        Assertions.assertThat((String)var.getProcessId()).isEqualTo((Object)processInstance.getProcessId());
        Assertions.assertThat((String)var.getVariableId()).isEqualTo((Object)"s");
        Assertions.assertThat((String)var.getVariableInstanceId()).isEqualTo((Object)"s");
        logService.clear();
        processInstances = logService.findProcessInstances("com.sample.ruleflow3");
        logService.dispose();
        Assertions.assertThat((List)processInstances).isNullOrEmpty();
    }

    @Test
    public void testAsyncAuditLoggerCompleteWithVariablesCustomIndexer() throws Exception {
        Environment env = PersistenceUtil.createEnvironment(this.context);
        KieBase kbase = AbstractAuditLogServiceTest.createKnowledgeBase();
        KieSession session = this.createSession(kbase, env);
        HashMap<String, Boolean> jmsProps = new HashMap<String, Boolean>();
        jmsProps.put("jbpm.audit.jms.transacted", false);
        jmsProps.put("jbpm.audit.jms.connection.factory", (Boolean)this.factory);
        jmsProps.put("jbpm.audit.jms.queue", (Boolean)this.queue);
        AbstractAuditLogger logger = AuditLoggerFactory.newInstance((AuditLoggerFactory.Type)AuditLoggerFactory.Type.JMS, (KieSession)session, jmsProps);
        Assertions.assertThat((Object)logger).isNotNull();
        Assertions.assertThat((boolean)(logger instanceof AsyncAuditLogProducer)).isTrue();
        LinkedList<String> names = new LinkedList<String>();
        names.add("john");
        names.add("mary");
        names.add("peter");
        HashMap<String, LinkedList<String>> params = new HashMap<String, LinkedList<String>>();
        params.put("list", names);
        ProcessInstance processInstance = session.startProcess("com.sample.ruleflow3", params);
        MessageReceiver receiver = new MessageReceiver();
        receiver.receiveAndProcess(this.queue, (EntityManagerFactory)env.get("org.kie.api.persistence.jpa.EntityManagerFactory"), 6000L, 28);
        JPAAuditLogService logService = new JPAAuditLogService(env);
        List processInstances = logService.findProcessInstances("com.sample.ruleflow3");
        Assertions.assertThat((int)processInstances.size()).isEqualTo(1);
        List nodeInstances = logService.findNodeInstances(processInstance.getId());
        Assertions.assertThat((int)nodeInstances.size()).isEqualTo(12);
        for (NodeInstanceLog nodeInstance : nodeInstances) {
            Assertions.assertThat((long)nodeInstance.getProcessInstanceId()).isEqualTo(processInstance.getId());
            Assertions.assertThat((String)nodeInstance.getProcessId()).isEqualTo((Object)"com.sample.ruleflow3");
            Assertions.assertThat((Date)nodeInstance.getDate()).isNotNull();
        }
        List variables = logService.findVariableInstances(processInstance.getId());
        Assertions.assertThat((List)variables).isNotNull();
        Assertions.assertThat((int)variables.size()).isEqualTo(8);
        ArrayList<VariableInstanceLog> listVariables = new ArrayList<VariableInstanceLog>();
        for (VariableInstanceLog v : variables) {
            if (!v.getVariableInstanceId().equals("list")) continue;
            listVariables.add(v);
        }
        Assertions.assertThat((int)listVariables.size()).isEqualTo(3);
        ArrayList<String> variableValues = new ArrayList<String>();
        ArrayList<String> variableIds = new ArrayList<String>();
        for (VariableInstanceLog var : listVariables) {
            variableValues.add(var.getValue());
            variableIds.add(var.getVariableId());
            Assertions.assertThat((String)var.getOldValue()).isIn(new Object[]{"", " ", null});
            Assertions.assertThat((long)var.getProcessInstanceId()).isEqualTo(processInstance.getId());
            Assertions.assertThat((String)var.getProcessId()).isEqualTo((Object)processInstance.getProcessId());
            Assertions.assertThat((String)var.getVariableInstanceId()).isEqualTo((Object)"list");
        }
        Assertions.assertThat(variableValues).contains((Object[])new String[]{"john", "mary", "peter"});
        Assertions.assertThat(variableIds).contains((Object[])new String[]{"list[0]", "list[1]", "list[2]"});
        logService.clear();
        processInstances = logService.findProcessInstances("com.sample.ruleflow3");
        logService.dispose();
        Assertions.assertThat((List)processInstances).isNullOrEmpty();
    }

    public KieSession createSession(KieBase kbase, Environment env) {
        KieSession session = AbstractAuditLogServiceTest.createKieSession(kbase, env);
        session.getWorkItemManager().registerWorkItemHandler("Human Task", (WorkItemHandler)new SystemOutWorkItemHandler());
        return session;
    }

    private void startHornetQServer() throws Exception {
        this.jmsServer = new EmbeddedJMS();
        this.jmsServer.start();
        logger.debug("Started Embedded JMS Server");
        XAConnectionFactory connectionFactory = (XAConnectionFactory)this.jmsServer.lookup("ConnectionFactory");
        new InitialContext().rebind("java:comp/UserTransaction", (Object)UserTransaction.userTransaction());
        new InitialContext().rebind("java:comp/TransactionManager", (Object)TransactionManager.transactionManager());
        new InitialContext().rebind("java:comp/TransactionSynchronizationRegistry", (Object)new TransactionSynchronizationRegistryImple());
        this.factory = new ConnectionFactoryProxy(connectionFactory, (TransactionHelper)new TransactionHelperImpl(TransactionManager.transactionManager()));
        this.queue = (Queue)this.jmsServer.lookup("/queue/exampleQueue");
    }

    private void stopHornetQServer() throws Exception {
        this.jmsServer.stop();
        this.jmsServer = null;
    }

    private class MessageReceiver {
        private MessageReceiver() {
        }

        void receiveAndProcess(Queue queue, EntityManagerFactory entityManagerFactory, long waitTime, int countDown) throws Exception {
            Connection qconnetion = AsyncAuditLogProducerTest.this.factory.createConnection();
            Session qsession = qconnetion.createSession(true, 1);
            MessageConsumer consumer = qsession.createConsumer((Destination)queue);
            qconnetion.start();
            final CountDownLatch latch = new CountDownLatch(countDown);
            AsyncAuditLogReceiver rec = new AsyncAuditLogReceiver(entityManagerFactory){

                public void onMessage(Message message) {
                    try {
                        javax.transaction.UserTransaction ut = (javax.transaction.UserTransaction)InitialContext.doLookup("java:comp/UserTransaction");
                        ut.begin();
                        super.onMessage(message);
                        ut.commit();
                        latch.countDown();
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            consumer.setMessageListener((MessageListener)rec);
            Assertions.assertThat((boolean)latch.await(waitTime, TimeUnit.MILLISECONDS)).isTrue();
            consumer.close();
            qsession.close();
            qconnetion.close();
        }

        public List<Message> receive(Queue queue) throws Exception {
            ArrayList<Message> messages = new ArrayList<Message>();
            Connection qconnetion = AsyncAuditLogProducerTest.this.factory.createConnection();
            Session qsession = qconnetion.createSession(true, 1);
            MessageConsumer consumer = qsession.createConsumer((Destination)queue);
            qconnetion.start();
            Message m = null;
            while ((m = consumer.receiveNoWait()) != null) {
                messages.add(m);
            }
            consumer.close();
            qsession.close();
            qconnetion.close();
            return messages;
        }
    }
}

