/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class XAConsumerTest
extends TestCase {
    static final Logger LOG = LoggerFactory.getLogger(XAConsumerTest.class);
    private static final String TEST_AMQ_BROKER_URI = "tcp://localhost:0";
    private String brokerUri;
    private static long txGenerator = 21L;
    private BrokerService broker;

    protected void setUp() throws Exception {
        this.broker = this.createBroker();
        this.broker.start();
        this.broker.waitUntilStarted();
        this.brokerUri = this.broker.getTransportConnectorByScheme("tcp").getPublishableConnectString();
    }

    protected void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testPullRequestXAConsumer() throws Exception {
        ActiveMQXAConnectionFactory activeMQConnectionFactory = new ActiveMQXAConnectionFactory("admin", "admin", this.brokerUri + "?trace=true&jms.prefetchPolicy.all=0");
        XAConnection connection = activeMQConnectionFactory.createXAConnection();
        connection.start();
        ActiveMQXAConnectionFactory activeMQConnectionFactoryAutoAck = new ActiveMQXAConnectionFactory("admin", "admin", this.brokerUri + "?trace=true&jms.prefetchPolicy.all=0");
        activeMQConnectionFactoryAutoAck.setXaAckMode(1);
        Connection autoAckConnection = activeMQConnectionFactoryAutoAck.createConnection();
        autoAckConnection.start();
        try {
            LOG.info(">>>INVOKE XA receive with PullRequest Consumer...");
            XASession xaSession = connection.createXASession();
            XAResource xaResource = xaSession.getXAResource();
            Xid xid = this.createXid();
            xaResource.start(xid, 0);
            Queue destination = xaSession.createQueue("TEST.T2");
            final MessageConsumer messageConsumer = xaSession.createConsumer((Destination)destination);
            final CountDownLatch receiveThreadDone = new CountDownLatch(1);
            final CountDownLatch receiveLatch = new CountDownLatch(1);
            Thread receiveThread = new Thread(new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    try {
                        messageConsumer.receive(600000L);
                    }
                    catch (JMSException expected) {
                        receiveLatch.countDown();
                        LOG.info("got expected ex: ", (Throwable)expected);
                    }
                    finally {
                        receiveThreadDone.countDown();
                    }
                }
            });
            receiveThread.start();
            LOG.info(">>>simulate Transaction Rollback");
            xaResource.end(xid, 0x20000000);
            xaResource.rollback(xid);
            LOG.info(">>>Sending message...");
            Session session = autoAckConnection.createSession(false, 1);
            Message messageToSend = session.createMessage();
            MessageProducer messageProducer = session.createProducer((Destination)destination);
            messageProducer.send(messageToSend);
            receiveThreadDone.await(30L, TimeUnit.SECONDS);
            receiveLatch.await(5L, TimeUnit.SECONDS);
            messageConsumer.close();
            xaSession.close();
            MessageConsumer messageConsumer1 = session.createConsumer((Destination)destination);
            Message message = messageConsumer1.receive(5000L);
            XAConsumerTest.assertNotNull((String)"Got message", (Object)message);
            LOG.info("Got message on new session", (Object)message);
            message.acknowledge();
        }
        finally {
            LOG.info(">>>Closing Connection");
            if (connection != null) {
                connection.close();
            }
            if (autoAckConnection != null) {
                autoAckConnection.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testPullRequestXAConsumerSingleConsumer() throws Exception {
        ActiveMQXAConnectionFactory activeMQConnectionFactory = new ActiveMQXAConnectionFactory("admin", "admin", this.brokerUri + "?trace=true&jms.prefetchPolicy.all=0");
        XAConnection connection = activeMQConnectionFactory.createXAConnection();
        connection.start();
        try {
            LOG.info(">>>INVOKE XA receive with PullRequest Consumer...");
            XASession xaSession = connection.createXASession();
            XAResource xaResource = xaSession.getXAResource();
            Xid xid = this.createXid();
            xaResource.start(xid, 0);
            Queue destination = xaSession.createQueue("TEST.T2");
            final MessageConsumer messageConsumer = xaSession.createConsumer((Destination)destination);
            final CountDownLatch receiveThreadDone = new CountDownLatch(1);
            final CountDownLatch receiveLatch = new CountDownLatch(1);
            Thread receiveThread = new Thread(new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    try {
                        messageConsumer.receive(600000L);
                    }
                    catch (JMSException expected) {
                        receiveLatch.countDown();
                        LOG.info("got expected ex: ", (Throwable)expected);
                    }
                    finally {
                        receiveThreadDone.countDown();
                    }
                }
            });
            receiveThread.start();
            LOG.info(">>>simulate Transaction Rollback");
            xaResource.end(xid, 0x20000000);
            xaResource.rollback(xid);
            XASession xaSessionSend = connection.createXASession();
            XAResource xaResourceSend = xaSessionSend.getXAResource();
            Xid xidSend = this.createXid();
            xaResourceSend.start(xidSend, 0);
            LOG.info(">>>Sending message...");
            ActiveMQMessage messageToSend = (ActiveMQMessage)xaSessionSend.createMessage();
            messageToSend.setTransactionId((TransactionId)new XATransactionId(xidSend));
            MessageProducer messageProducer = xaSessionSend.createProducer((Destination)destination);
            messageProducer.send((Message)messageToSend);
            xaResourceSend.end(xidSend, 0x4000000);
            xaResourceSend.commit(xidSend, true);
            receiveThreadDone.await(30L, TimeUnit.SECONDS);
            receiveLatch.await(5L, TimeUnit.SECONDS);
            messageConsumer.close();
            MessageConsumer messageConsumerTwo = xaSession.createConsumer((Destination)destination);
            Xid xidReceiveOk = this.createXid();
            xaResource.start(xidReceiveOk, 0);
            Message message = messageConsumerTwo.receive(10000L);
            XAConsumerTest.assertNotNull((String)"Got message", (Object)message);
            LOG.info("Got message on new session", (Object)message);
            xaResource.end(xidReceiveOk, 0x4000000);
            xaResource.commit(xidReceiveOk, true);
        }
        finally {
            LOG.info(">>>Closing Connection");
            if (connection != null) {
                connection.close();
            }
        }
    }

    public Xid createXid() throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream os = new DataOutputStream(baos);
        os.writeLong(++txGenerator);
        os.close();
        final byte[] bs = baos.toByteArray();
        return new Xid(){

            @Override
            public int getFormatId() {
                return 86;
            }

            @Override
            public byte[] getGlobalTransactionId() {
                return bs;
            }

            @Override
            public byte[] getBranchQualifier() {
                return bs;
            }
        };
    }

    private BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        PolicyMap policyMap = new PolicyMap();
        ArrayList<PolicyEntry> entries = new ArrayList<PolicyEntry>();
        PolicyEntry pe = new PolicyEntry();
        pe.setProducerFlowControl(true);
        pe.setUseCache(true);
        pe.setPrioritizedMessages(false);
        pe.setExpireMessagesPeriod(0L);
        pe.setQueuePrefetch(0);
        pe.setQueue(">");
        entries.add(pe);
        policyMap.setPolicyEntries(entries);
        broker.setDestinationPolicy(policyMap);
        broker.addConnector(TEST_AMQ_BROKER_URI);
        broker.deleteAllMessages();
        return broker;
    }
}

