/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.transport.failover;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.Stack;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
import junit.framework.Test;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.util.DestinationPathSeparatorBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.util.SocketProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FailoverTransactionTest
extends TestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(FailoverTransactionTest.class);
    private static final String QUEUE_NAME = "Failover.WithTx";
    private static final String TRANSPORT_URI = "tcp://localhost:0";
    private String url;
    BrokerService broker;
    final Random random = new Random();

    public static Test suite() {
        return FailoverTransactionTest.suite(FailoverTransactionTest.class);
    }

    public void setUp() throws Exception {
        super.setMaxTestTime(120000L);
        super.setAutoFail(true);
        super.setUp();
    }

    public void tearDown() throws Exception {
        super.tearDown();
        this.stopBroker();
    }

    public void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    private void startCleanBroker() throws Exception {
        this.startBroker(true);
    }

    public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
        this.broker = this.createBroker(deleteAllMessagesOnStartup);
        this.broker.start();
    }

    public void startBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
        this.broker = this.createBroker(deleteAllMessagesOnStartup, bindAddress);
        this.broker.start();
    }

    public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
        return this.createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
    }

    public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
        this.broker = new BrokerService();
        this.broker.setUseJmx(false);
        this.broker.setAdvisorySupport(false);
        this.broker.addConnector(bindAddress);
        this.broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry defaultEntry = new PolicyEntry();
        defaultEntry.setUsePrefetchExtension(false);
        policyMap.setDefaultEntry(defaultEntry);
        this.broker.setDestinationPolicy(policyMap);
        this.url = ((TransportConnector)this.broker.getTransportConnectors().get(0)).getConnectUri().toString();
        return this.broker;
    }

    public void configureConnectionFactory(ActiveMQConnectionFactory factory) {
    }

    public void testFailoverProducerCloseBeforeTransaction() throws Exception {
        this.startCleanBroker();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        this.configureConnectionFactory(cf);
        Connection connection = cf.createConnection();
        connection.start();
        Session session = connection.createSession(true, 1);
        Queue destination = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer((Destination)destination);
        this.produceMessage(session, destination);
        this.broker.stop();
        this.startBroker(false, this.url);
        session.commit();
        FailoverTransactionTest.assertNotNull((String)"we got the message", (Object)consumer.receive(20000L));
        session.commit();
        connection.close();
    }

    public void initCombosForTestFailoverCommitReplyLost() {
        String osName = System.getProperty("os.name");
        Object[] persistenceAdapters = !osName.equalsIgnoreCase("AIX") && !osName.equalsIgnoreCase("SunOS") ? new Object[]{TestSupport.PersistenceAdapterChoice.KahaDB, TestSupport.PersistenceAdapterChoice.LevelDB, TestSupport.PersistenceAdapterChoice.JDBC} : new Object[]{TestSupport.PersistenceAdapterChoice.KahaDB, TestSupport.PersistenceAdapterChoice.JDBC};
        this.addCombinationValues("defaultPersistenceAdapter", persistenceAdapters);
    }

    public void testFailoverCommitReplyLost() throws Exception {
        this.broker = this.createBroker(true);
        this.setDefaultPersistenceAdapter(this.broker);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport(){

            public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
                super.commitTransaction(context, xid, onePhase);
                context.setDontSendReponse(true);
                Executors.newSingleThreadExecutor().execute(new Runnable(){

                    @Override
                    public void run() {
                        LOG.info("Stopping broker post commit...");
                        try {
                            FailoverTransactionTest.this.broker.stop();
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }});
        this.broker.start();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        this.configureConnectionFactory(cf);
        Connection connection = cf.createConnection();
        connection.start();
        final Session session = connection.createSession(true, 1);
        Queue destination = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer((Destination)destination);
        this.produceMessage(session, destination);
        final CountDownLatch commitDoneLatch = new CountDownLatch(1);
        Executors.newSingleThreadExecutor().execute(new Runnable(){

            @Override
            public void run() {
                LOG.info("doing async commit...");
                try {
                    session.commit();
                }
                catch (JMSException e) {
                    TestCase.assertTrue((boolean)(e instanceof TransactionRolledBackException));
                    LOG.info("got commit exception: ", (Throwable)e);
                }
                commitDoneLatch.countDown();
                LOG.info("done async commit");
            }
        });
        this.broker.waitUntilStopped();
        this.broker = this.createBroker(false, this.url);
        this.setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
        FailoverTransactionTest.assertTrue((String)"tx committed through failover", (boolean)commitDoneLatch.await(30L, TimeUnit.SECONDS));
        javax.jms.Message msg = consumer.receive(20000L);
        LOG.info("Received: " + msg);
        FailoverTransactionTest.assertNotNull((String)"we got the message", (Object)msg);
        FailoverTransactionTest.assertNull((String)"we got just one message", (Object)consumer.receive(2000L));
        session.commit();
        consumer.close();
        connection.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
        LOG.info("Checking for remaining/hung messages..");
        this.broker = this.createBroker(false, this.url);
        this.setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
        cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        this.configureConnectionFactory(cf);
        connection = cf.createConnection();
        connection.start();
        Session session2 = connection.createSession(false, 1);
        consumer = session2.createConsumer((Destination)destination);
        msg = consumer.receive(1000L);
        LOG.info("Received: " + msg);
        FailoverTransactionTest.assertNull((String)("no messges left dangling but got: " + msg), (Object)msg);
        connection.close();
    }

    public void testFailoverCommitReplyLostWithDestinationPathSeparator() throws Exception {
        ActiveMQDestination[] destinations;
        this.broker = this.createBroker(true);
        this.setDefaultPersistenceAdapter(this.broker);
        this.broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker(), new BrokerPluginSupport(){

            public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
                super.commitTransaction(context, xid, onePhase);
                context.setDontSendReponse(true);
                Executors.newSingleThreadExecutor().execute(new Runnable(){

                    @Override
                    public void run() {
                        LOG.info("Stopping broker post commit...");
                        try {
                            FailoverTransactionTest.this.broker.stop();
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }});
        this.broker.start();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        this.configureConnectionFactory(cf);
        Connection connection = cf.createConnection();
        connection.start();
        final Session session = connection.createSession(true, 1);
        Queue destination = session.createQueue(QUEUE_NAME.replace('.', '/') + "?consumer.prefetchSize=0");
        MessageConsumer consumer = session.createConsumer((Destination)destination);
        this.produceMessage(session, destination);
        final CountDownLatch commitDoneLatch = new CountDownLatch(1);
        Executors.newSingleThreadExecutor().execute(new Runnable(){

            @Override
            public void run() {
                LOG.info("doing async commit...");
                try {
                    session.commit();
                }
                catch (JMSException e) {
                    TestCase.assertTrue((boolean)(e instanceof TransactionRolledBackException));
                    LOG.info("got commit exception: ", (Throwable)e);
                }
                commitDoneLatch.countDown();
                LOG.info("done async commit");
            }
        });
        this.broker.waitUntilStopped();
        this.broker = this.createBroker(false, this.url);
        this.setDefaultPersistenceAdapter(this.broker);
        this.broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker()});
        this.broker.start();
        FailoverTransactionTest.assertTrue((String)"tx committed trough failover", (boolean)commitDoneLatch.await(30L, TimeUnit.SECONDS));
        javax.jms.Message msg = consumer.receive(20000L);
        LOG.info("Received: " + msg);
        FailoverTransactionTest.assertNotNull((String)"we got the message", (Object)msg);
        FailoverTransactionTest.assertNull((String)"we got just one message", (Object)consumer.receive(2000L));
        session.commit();
        consumer.close();
        connection.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
        LOG.info("Checking for remaining/hung messages..");
        this.broker = this.createBroker(false, this.url);
        this.setDefaultPersistenceAdapter(this.broker);
        this.broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker()});
        this.broker.start();
        cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        this.configureConnectionFactory(cf);
        connection = cf.createConnection();
        connection.start();
        Session session2 = connection.createSession(false, 1);
        consumer = session2.createConsumer((Destination)destination);
        msg = consumer.receive(1000L);
        LOG.info("Received: " + msg);
        FailoverTransactionTest.assertNull((String)("no messges left dangling but got: " + msg), (Object)msg);
        connection.close();
        for (ActiveMQDestination dest : destinations = this.broker.getRegionBroker().getDestinations()) {
            LOG.info("Destinations list: " + dest);
        }
        FailoverTransactionTest.assertEquals((String)"Only one destination", (int)1, (int)this.broker.getRegionBroker().getDestinations().length);
    }

    public void initCombosForTestFailoverSendReplyLost() {
        this.addCombinationValues("defaultPersistenceAdapter", new Object[]{TestSupport.PersistenceAdapterChoice.KahaDB, TestSupport.PersistenceAdapterChoice.JDBC});
    }

    public void testFailoverSendReplyLost() throws Exception {
        this.broker = this.createBroker(true);
        this.setDefaultPersistenceAdapter(this.broker);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport(){

            public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
                super.send(producerExchange, messageSend);
                producerExchange.getConnectionContext().setDontSendReponse(true);
                Executors.newSingleThreadExecutor().execute(new Runnable(){

                    @Override
                    public void run() {
                        LOG.info("Stopping broker post send...");
                        try {
                            FailoverTransactionTest.this.broker.stop();
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }});
        this.broker.start();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")?jms.watchTopicAdvisories=false");
        this.configureConnectionFactory(cf);
        Connection connection = cf.createConnection();
        connection.start();
        final Session session = connection.createSession(false, 1);
        final Queue destination = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer((Destination)destination);
        final CountDownLatch sendDoneLatch = new CountDownLatch(1);
        Executors.newSingleThreadExecutor().execute(new Runnable(){

            @Override
            public void run() {
                LOG.info("doing async send...");
                try {
                    FailoverTransactionTest.this.produceMessage(session, destination);
                }
                catch (JMSException e) {
                    LOG.error("got send exception: ", (Throwable)e);
                    TestCase.fail((String)("got unexpected send exception" + (Object)((Object)e)));
                }
                sendDoneLatch.countDown();
                LOG.info("done async send");
            }
        });
        this.broker.waitUntilStopped();
        this.broker = this.createBroker(false, this.url);
        this.setDefaultPersistenceAdapter(this.broker);
        LOG.info("restarting....");
        this.broker.start();
        FailoverTransactionTest.assertTrue((String)"message sent through failover", (boolean)sendDoneLatch.await(30L, TimeUnit.SECONDS));
        javax.jms.Message msg = consumer.receive(20000L);
        LOG.info("Received: " + msg);
        FailoverTransactionTest.assertNotNull((String)"we got the message", (Object)msg);
        FailoverTransactionTest.assertNull((String)"we got just one message", (Object)consumer.receive(2000L));
        consumer.close();
        connection.close();
        FailoverTransactionTest.assertEquals((String)"no newly queued messages", (long)0L, (long)((RegionBroker)this.broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
        FailoverTransactionTest.assertEquals((String)"1 dequeue", (long)1L, (long)((RegionBroker)this.broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
        this.broker.stop();
        this.broker.waitUntilStopped();
        LOG.info("Checking for remaining/hung messages with second restart..");
        this.broker = this.createBroker(false, this.url);
        this.setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
        cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        this.configureConnectionFactory(cf);
        connection = cf.createConnection();
        connection.start();
        Session session2 = connection.createSession(false, 1);
        consumer = session2.createConsumer((Destination)destination);
        msg = consumer.receive(1000L);
        LOG.info("Received: " + msg);
        FailoverTransactionTest.assertNull((String)("no messges left dangling but got: " + msg), (Object)msg);
        connection.close();
    }

    public void initCombosForTestFailoverConnectionSendReplyLost() {
        this.addCombinationValues("defaultPersistenceAdapter", new Object[]{TestSupport.PersistenceAdapterChoice.KahaDB, TestSupport.PersistenceAdapterChoice.JDBC});
    }

    public void testFailoverConnectionSendReplyLost() throws Exception {
        this.broker = this.createBroker(true);
        PersistenceAdapter store = this.setDefaultPersistenceAdapter(this.broker);
        if (store instanceof KahaDBPersistenceAdapter) {
            ((KahaDBPersistenceAdapter)store).setConcurrentStoreAndDispatchQueues(false);
        }
        final SocketProxy proxy = new SocketProxy();
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport(){
            private boolean firstSend = true;

            public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
                super.send(producerExchange, messageSend);
                if (this.firstSend) {
                    this.firstSend = false;
                    producerExchange.getConnectionContext().setDontSendReponse(true);
                    Executors.newSingleThreadExecutor().execute(new Runnable(){

                        @Override
                        public void run() {
                            LOG.info("Stopping connection post send...");
                            try {
                                proxy.close();
                            }
                            catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }
        }});
        this.broker.start();
        proxy.setTarget(new URI(this.url));
        proxy.open();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl().toASCIIString() + ")?jms.watchTopicAdvisories=false");
        this.configureConnectionFactory(cf);
        Connection connection = cf.createConnection();
        connection.start();
        final Session session = connection.createSession(false, 1);
        final Queue destination = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer((Destination)destination);
        final CountDownLatch sendDoneLatch = new CountDownLatch(1);
        Executors.newSingleThreadExecutor().execute(new Runnable(){

            @Override
            public void run() {
                LOG.info("doing async send...");
                try {
                    FailoverTransactionTest.this.produceMessage(session, destination);
                }
                catch (JMSException e) {
                    LOG.info("got send exception: ", (Throwable)e);
                }
                sendDoneLatch.countDown();
                LOG.info("done async send");
            }
        });
        FailoverTransactionTest.assertTrue((String)"proxy was closed", (boolean)proxy.waitUntilClosed(30L));
        LOG.info("restarting proxy");
        proxy.open();
        FailoverTransactionTest.assertTrue((String)"message sent through failover", (boolean)sendDoneLatch.await(30L, TimeUnit.SECONDS));
        javax.jms.Message msg = consumer.receive(20000L);
        LOG.info("Received: " + msg);
        FailoverTransactionTest.assertNotNull((String)"we got the message", (Object)msg);
        FailoverTransactionTest.assertNull((String)"we got just one message", (Object)consumer.receive(2000L));
        consumer.close();
        connection.close();
        FailoverTransactionTest.assertEquals((String)"one queued message", (long)1L, (long)((RegionBroker)this.broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
        this.broker.stop();
        this.broker.waitUntilStopped();
        LOG.info("Checking for remaining/hung messages with restart..");
        this.broker = this.createBroker(false, this.url);
        this.setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
        cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        this.configureConnectionFactory(cf);
        connection = cf.createConnection();
        connection.start();
        Session session2 = connection.createSession(false, 1);
        consumer = session2.createConsumer((Destination)destination);
        msg = consumer.receive(1000L);
        LOG.info("Received: " + msg);
        FailoverTransactionTest.assertNull((String)("no messges left dangling but got: " + msg), (Object)msg);
        connection.close();
    }

    public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
        this.startCleanBroker();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")?trackTransactionProducers=false");
        this.configureConnectionFactory(cf);
        Connection connection = cf.createConnection();
        connection.start();
        Session session = connection.createSession(true, 1);
        Queue destination = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer((Destination)destination);
        this.produceMessage(session, destination);
        this.broker.stop();
        this.startBroker(false, this.url);
        try {
            session.commit();
            FailoverTransactionTest.fail((String)"expect ex for rollback only on async exc");
        }
        catch (JMSException jMSException) {
            // empty catch block
        }
        FailoverTransactionTest.assertNull((String)"we got the message", (Object)consumer.receive(5000L));
        session.commit();
        connection.close();
    }

    public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
        int i;
        this.startCleanBroker();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        this.configureConnectionFactory(cf);
        Connection connection = cf.createConnection();
        connection.start();
        Session session = connection.createSession(true, 1);
        Queue destination = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer((Destination)destination);
        int count = 10;
        for (i = 0; i < 10; ++i) {
            MessageProducer producer = session.createProducer((Destination)destination);
            TextMessage message = session.createTextMessage("Test message: 10");
            producer.send((javax.jms.Message)message);
            producer.close();
        }
        this.broker.stop();
        this.startBroker(false, this.url);
        session.commit();
        for (i = 0; i < 10; ++i) {
            FailoverTransactionTest.assertNotNull((String)"we got all the message: 10", (Object)consumer.receive(20000L));
        }
        session.commit();
        connection.close();
    }

    public void testFailoverWithConnectionConsumer() throws Exception {
        int i;
        this.startCleanBroker();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        this.configureConnectionFactory(cf);
        Connection connection = cf.createConnection();
        connection.start();
        Session session = connection.createSession(true, 1);
        Queue destination = session.createQueue("testFailoverWithConnectionConsumer");
        final CountDownLatch connectionConsumerGotOne = new CountDownLatch(1);
        final Session poolSession = connection.createSession(false, 1);
        connection.createConnectionConsumer((Destination)destination, null, new ServerSessionPool(){

            public ServerSession getServerSession() throws JMSException {
                return new ServerSession(){

                    public Session getSession() throws JMSException {
                        return poolSession;
                    }

                    public void start() throws JMSException {
                        connectionConsumerGotOne.countDown();
                        poolSession.run();
                    }
                };
            }
        }, 1);
        MessageConsumer consumer = session.createConsumer((Destination)destination);
        int count = 10;
        for (i = 0; i < 10; ++i) {
            MessageProducer producer = session.createProducer((Destination)destination);
            TextMessage message = session.createTextMessage("Test message: 10");
            producer.send((javax.jms.Message)message);
            producer.close();
        }
        this.broker.stop();
        this.startBroker(false, this.url);
        session.commit();
        for (i = 0; i < 9; ++i) {
            FailoverTransactionTest.assertNotNull((String)"Failed to get message: 10", (Object)consumer.receive(20000L));
        }
        session.commit();
        connection.close();
        FailoverTransactionTest.assertTrue((String)"connectionconsumer did not get a message", (boolean)connectionConsumerGotOne.await(10L, TimeUnit.SECONDS));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testFailoverConsumerAckLost() throws Exception {
        for (int i = 0; i < 3; ++i) {
            try {
                LOG.info("Iteration: " + i);
                this.doTestFailoverConsumerAckLost(i);
                continue;
            }
            finally {
                this.stopBroker();
            }
        }
    }

    public void doTestFailoverConsumerAckLost(int pauseSeconds) throws Exception {
        this.broker = this.createBroker(true);
        this.setDefaultPersistenceAdapter(this.broker);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport(){

            public void acknowledge(ConsumerBrokerExchange consumerExchange, final MessageAck ack) throws Exception {
                consumerExchange.getConnectionContext().setDontSendReponse(true);
                Executors.newSingleThreadExecutor().execute(new Runnable(){

                    @Override
                    public void run() {
                        LOG.info("Stopping broker on ack: " + ack);
                        try {
                            FailoverTransactionTest.this.broker.stop();
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }});
        this.broker.start();
        Vector<Connection> connections = new Vector<Connection>();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        this.configureConnectionFactory(cf);
        Connection connection = cf.createConnection();
        connection.start();
        connections.add(connection);
        Session producerSession = connection.createSession(false, 1);
        Queue destination = producerSession.createQueue("Failover.WithTx?consumer.prefetchSize=1");
        connection = cf.createConnection();
        connection.start();
        connections.add(connection);
        final Session consumerSession1 = connection.createSession(true, 0);
        connection = cf.createConnection();
        connection.start();
        connections.add(connection);
        Session consumerSession2 = connection.createSession(true, 0);
        final MessageConsumer consumer1 = consumerSession1.createConsumer((Destination)destination);
        MessageConsumer consumer2 = consumerSession2.createConsumer((Destination)destination);
        this.produceMessage(producerSession, destination);
        this.produceMessage(producerSession, destination);
        final Vector receivedMessages = new Vector();
        final CountDownLatch commitDoneLatch = new CountDownLatch(1);
        final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
        Executors.newSingleThreadExecutor().execute(new Runnable(){

            @Override
            public void run() {
                LOG.info("doing async commit after consume...");
                try {
                    javax.jms.Message msg = consumer1.receive(20000L);
                    LOG.info("consumer1 first attempt got message: " + msg);
                    receivedMessages.add(msg);
                    TimeUnit.SECONDS.sleep(FailoverTransactionTest.this.random.nextInt(5));
                    msg = consumer1.receive(5000L);
                    LOG.info("consumer1 second attempt got message: " + msg);
                    if (msg != null) {
                        receivedMessages.add(msg);
                    }
                    LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)");
                    try {
                        consumerSession1.commit();
                    }
                    catch (TransactionRolledBackException expected) {
                        LOG.info("got exception ex on commit", (Throwable)expected);
                        gotTransactionRolledBackException.set(true);
                    }
                    commitDoneLatch.countDown();
                    LOG.info("done async commit");
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        this.broker.waitUntilStopped();
        this.broker = this.createBroker(false, this.url);
        this.setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
        FailoverTransactionTest.assertTrue((String)"tx committed through failover", (boolean)commitDoneLatch.await(30L, TimeUnit.SECONDS));
        LOG.info("received message count: " + receivedMessages.size());
        for (int i = 0; i < 2; ++i) {
            javax.jms.Message msg = consumer1.receive(5000L);
            LOG.info("post: from consumer1 received: " + msg);
            consumerSession1.commit();
            if (msg == null) {
                msg = consumer2.receive(10000L);
                LOG.info("post: from consumer2 received: " + msg);
                consumerSession2.commit();
            }
            FailoverTransactionTest.assertNotNull((String)("got message [" + i + "]"), (Object)msg);
        }
        for (Connection c : connections) {
            c.close();
        }
        this.broker.stop();
        this.broker.waitUntilStopped();
        LOG.info("Checking for remaining/hung messages..");
        this.broker = this.createBroker(false, this.url);
        this.setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
        cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        this.configureConnectionFactory(cf);
        connection = cf.createConnection();
        connection.start();
        Session sweeperSession = connection.createSession(false, 1);
        MessageConsumer sweeper = sweeperSession.createConsumer((Destination)destination);
        javax.jms.Message msg = sweeper.receive(1000L);
        LOG.info("Sweep received: " + msg);
        FailoverTransactionTest.assertNull((String)("no messges left dangling but got: " + msg), (Object)msg);
        connection.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testPoolingNConsumesAfterReconnect() throws Exception {
        this.broker = this.createBroker(true);
        this.setDefaultPersistenceAdapter(this.broker);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport(){
            int count = 0;

            public void removeConsumer(ConnectionContext context, final ConsumerInfo info) throws Exception {
                if (this.count++ == 1) {
                    Executors.newSingleThreadExecutor().execute(new Runnable(){

                        @Override
                        public void run() {
                            LOG.info("Stopping broker on removeConsumer: " + info);
                            try {
                                FailoverTransactionTest.this.broker.stop();
                            }
                            catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }
        }});
        this.broker.start();
        Vector<Connection> connections = new Vector<Connection>();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        this.configureConnectionFactory(cf);
        Connection connection = cf.createConnection();
        connection.start();
        connections.add(connection);
        Session producerSession = connection.createSession(false, 1);
        Queue destination = producerSession.createQueue("Failover.WithTx?consumer.prefetchSize=1");
        this.produceMessage(producerSession, destination);
        connection.close();
        connection = cf.createConnection();
        connection.start();
        connections.add(connection);
        Session consumerSession = connection.createSession(false, 2);
        int sessionCount = 10;
        Stack<Session> sessions = new Stack<Session>();
        for (int i = 0; i < 10; ++i) {
            sessions.push(connection.createSession(false, 1));
        }
        int consumerCount = 1000;
        final ArrayDeque<MessageConsumer> consumers = new ArrayDeque<MessageConsumer>();
        for (int i = 0; i < 1000; ++i) {
            consumers.push(consumerSession.createConsumer((Destination)destination));
        }
        final ExecutorService executorService = Executors.newCachedThreadPool();
        final FailoverTransport failoverTransport = (FailoverTransport)((ActiveMQConnection)connection).getTransport().narrow(FailoverTransport.class);
        final TransportListener delegate = failoverTransport.getTransportListener();
        failoverTransport.setTransportListener(new TransportListener(){

            public void onCommand(Object command) {
                delegate.onCommand(command);
            }

            public void onException(IOException error) {
                delegate.onException(error);
            }

            public void transportInterupted() {
                LOG.error("Transport interrupted: " + failoverTransport, (Throwable)new RuntimeException("HERE"));
                for (int i = 0; i < 1000 && !consumers.isEmpty(); ++i) {
                    executorService.execute(new Runnable(){

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        @Override
                        public void run() {
                            MessageConsumer localConsumer = null;
                            try {
                                TransportListener transportListener = delegate;
                                synchronized (transportListener) {
                                    localConsumer = (MessageConsumer)consumers.pop();
                                }
                                localConsumer.receive(1L);
                                LOG.info("calling close() " + ((ActiveMQMessageConsumer)localConsumer).getConsumerId());
                                localConsumer.close();
                            }
                            catch (NoSuchElementException noSuchElementException) {
                            }
                            catch (Exception ignored) {
                                LOG.error("Ex on: " + ((ActiveMQMessageConsumer)localConsumer).getConsumerId(), (Throwable)ignored);
                            }
                        }
                    });
                }
                delegate.transportInterupted();
            }

            public void transportResumed() {
                delegate.transportResumed();
            }
        });
        MessageConsumer consumer = null;
        TransportListener transportListener = delegate;
        synchronized (transportListener) {
            consumer = (MessageConsumer)consumers.pop();
        }
        LOG.info("calling close to trigger broker stop " + ((ActiveMQMessageConsumer)consumer).getConsumerId());
        consumer.close();
        this.broker.waitUntilStopped();
        this.broker = this.createBroker(false, this.url);
        this.setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
        consumer = consumerSession.createConsumer((Destination)destination);
        LOG.info("finally consuming message: " + ((ActiveMQMessageConsumer)consumer).getConsumerId());
        javax.jms.Message msg = null;
        for (int i = 0; i < 4 && msg == null; ++i) {
            msg = consumer.receive(1000L);
        }
        LOG.info("post: from consumer1 received: " + msg);
        FailoverTransactionTest.assertNotNull((String)"got message after failover", (Object)msg);
        msg.acknowledge();
        for (Connection c : connections) {
            c.close();
        }
    }

    public void testAutoRollbackWithMissingRedeliveries() throws Exception {
        this.broker = this.createBroker(true);
        this.broker.start();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        this.configureConnectionFactory(cf);
        Connection connection = cf.createConnection();
        connection.start();
        Session producerSession = connection.createSession(false, 1);
        Queue destination = producerSession.createQueue("Failover.WithTx?consumer.prefetchSize=1");
        Session consumerSession = connection.createSession(true, 0);
        MessageConsumer consumer = consumerSession.createConsumer((Destination)destination);
        this.produceMessage(producerSession, destination);
        javax.jms.Message msg = consumer.receive(20000L);
        FailoverTransactionTest.assertNotNull((Object)msg);
        this.broker.stop();
        this.broker = this.createBroker(false, this.url);
        FailoverTransactionTest.setPersistenceAdapter(this.broker, TestSupport.PersistenceAdapterChoice.JDBC);
        this.broker.start();
        try {
            consumerSession.commit();
            FailoverTransactionTest.fail((String)"expected transaciton rolledback ex");
        }
        catch (TransactionRolledBackException transactionRolledBackException) {
            // empty catch block
        }
        this.broker.stop();
        this.broker = this.createBroker(false, this.url);
        this.broker.start();
        FailoverTransactionTest.assertNotNull((String)"should get rolledback message from original restarted broker", (Object)consumer.receive(20000L));
        connection.close();
    }

    public void testWaitForMissingRedeliveries() throws Exception {
        LOG.info("testWaitForMissingRedeliveries()");
        this.broker = this.createBroker(true);
        this.broker.start();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")?jms.consumerFailoverRedeliveryWaitPeriod=30000");
        this.configureConnectionFactory(cf);
        Connection connection = cf.createConnection();
        connection.start();
        Session producerSession = connection.createSession(false, 1);
        Queue destination = producerSession.createQueue(QUEUE_NAME);
        final Session consumerSession = connection.createSession(true, 0);
        MessageConsumer consumer = consumerSession.createConsumer((Destination)destination);
        this.produceMessage(producerSession, destination);
        javax.jms.Message msg = consumer.receive(20000L);
        if (msg == null) {
            AutoFailTestSupport.dumpAllThreads((String)"missing-");
        }
        FailoverTransactionTest.assertNotNull((String)"got message just produced", (Object)msg);
        this.broker.stop();
        this.broker = this.createBroker(false, this.url);
        FailoverTransactionTest.setPersistenceAdapter(this.broker, TestSupport.PersistenceAdapterChoice.JDBC);
        this.broker.start();
        final CountDownLatch commitDone = new CountDownLatch(1);
        final CountDownLatch gotException = new CountDownLatch(1);
        Executors.newSingleThreadExecutor().execute(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                LOG.info("doing async commit...");
                try {
                    consumerSession.commit();
                }
                catch (JMSException ignored) {
                    ignored.printStackTrace();
                    gotException.countDown();
                }
                finally {
                    commitDone.countDown();
                }
            }
        });
        this.broker.stop();
        this.broker = this.createBroker(false, this.url);
        this.broker.start();
        FailoverTransactionTest.assertTrue((String)"commit was successful", (boolean)commitDone.await(30L, TimeUnit.SECONDS));
        FailoverTransactionTest.assertTrue((String)"got exception on commit", (boolean)gotException.await(30L, TimeUnit.SECONDS));
        FailoverTransactionTest.assertNotNull((String)"should get failed committed message", (Object)consumer.receive(5000L));
        connection.close();
    }

    public void testReDeliveryWhilePending() throws Exception {
        LOG.info("testReDeliveryWhilePending()");
        this.broker = this.createBroker(true);
        this.broker.start();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + this.url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000");
        this.configureConnectionFactory(cf);
        Connection connection = cf.createConnection();
        connection.start();
        Session producerSession = connection.createSession(false, 1);
        Queue destination = producerSession.createQueue("Failover.WithTx?consumer.prefetchSize=0");
        final Session consumerSession = connection.createSession(true, 0);
        Session secondConsumerSession = connection.createSession(true, 0);
        MessageConsumer consumer = consumerSession.createConsumer((Destination)destination);
        this.produceMessage(producerSession, destination);
        javax.jms.Message msg = consumer.receive(20000L);
        if (msg == null) {
            AutoFailTestSupport.dumpAllThreads((String)"missing-");
        }
        FailoverTransactionTest.assertNotNull((String)"got message just produced", (Object)msg);
        MessageConsumer consumer2 = secondConsumerSession.createConsumer((Destination)consumerSession.createQueue("Failover.WithTx?consumer.prefetchSize=1"));
        this.broker.stop();
        this.broker = this.createBroker(false, this.url);
        this.broker.start();
        final CountDownLatch commitDone = new CountDownLatch(1);
        final CountDownLatch gotRollback = new CountDownLatch(1);
        final Vector exceptions = new Vector();
        Executors.newSingleThreadExecutor().execute(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                LOG.info("doing async commit...");
                try {
                    consumerSession.commit();
                }
                catch (TransactionRolledBackException ex) {
                    gotRollback.countDown();
                }
                catch (JMSException ex) {
                    exceptions.add(ex);
                }
                finally {
                    commitDone.countDown();
                }
            }
        });
        FailoverTransactionTest.assertTrue((String)"commit completed ", (boolean)commitDone.await(15L, TimeUnit.SECONDS));
        FailoverTransactionTest.assertTrue((String)"got Rollback", (boolean)gotRollback.await(15L, TimeUnit.SECONDS));
        FailoverTransactionTest.assertTrue((String)"no other exceptions", (boolean)exceptions.isEmpty());
        javax.jms.Message message = consumer2.receive(2000L);
        if (message == null) {
            message = consumer.receive(2000L);
        }
        consumerSession.commit();
        secondConsumerSession.commit();
        FailoverTransactionTest.assertNotNull((String)"got message after rollback", (Object)message);
        MessageConsumer dlqConsumer = consumerSession.createConsumer((Destination)consumerSession.createQueue("ActiveMQ.DLQ"));
        FailoverTransactionTest.assertNull((String)"nothing in the dlq", (Object)dlqConsumer.receive(2000L));
        connection.close();
    }

    private void produceMessage(Session producerSession, Queue destination) throws JMSException {
        MessageProducer producer = producerSession.createProducer((Destination)destination);
        TextMessage message = producerSession.createTextMessage("Test message");
        producer.send((javax.jms.Message)message);
        producer.close();
    }
}

