package org.apache.activemq.transport.failover;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.Stack;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TransactionRolledBackException;
import junit.framework.Test;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.util.DestinationPathSeparatorBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.SocketProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/failover/FailoverTransactionTest.class */
public class FailoverTransactionTest extends TestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(FailoverTransactionTest.class);
    private static final String QUEUE_NAME = "Failover.WithTx";
    private static final String TRANSPORT_URI = "tcp://localhost:0";
    private String url;
    BrokerService broker;
    final Random random = new Random();

    public static Test suite() {
        return suite(FailoverTransactionTest.class);
    }

    public void setUp() throws Exception {
        super.setMaxTestTime(120000L);
        super.setAutoFail(true);
        super.setUp();
    }

    public void tearDown() throws Exception {
        super.tearDown();
        stopBroker();
    }

    public void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    private void startCleanBroker() throws Exception {
        startBroker(true);
    }

    public void startBroker(boolean z) throws Exception {
        this.broker = createBroker(z);
        this.broker.start();
    }

    public void startBroker(boolean z, String str) throws Exception {
        this.broker = createBroker(z, str);
        this.broker.start();
    }

    public BrokerService createBroker(boolean z) throws Exception {
        return createBroker(z, "tcp://localhost:0");
    }

    public BrokerService createBroker(boolean z, String str) throws Exception {
        this.broker = new BrokerService();
        this.broker.setUseJmx(false);
        this.broker.setAdvisorySupport(false);
        this.broker.addConnector(str);
        this.broker.setDeleteAllMessagesOnStartup(z);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setUsePrefetchExtension(false);
        policyMap.setDefaultEntry(policyEntry);
        this.broker.setDestinationPolicy(policyMap);
        this.url = ((TransportConnector) this.broker.getTransportConnectors().get(0)).getConnectUri().toString();
        return this.broker;
    }

    public void configureConnectionFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
    }

    public void testFailoverProducerCloseBeforeTransaction() throws Exception {
        startCleanBroker();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(activeMQConnectionFactory);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(true, 1);
        Queue createQueue = createSession.createQueue(QUEUE_NAME);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        produceMessage(createSession, createQueue);
        this.broker.stop();
        startBroker(false, this.url);
        createSession.commit();
        assertNotNull("we got the message", createConsumer.receive(20000L));
        createSession.commit();
        createConnection.close();
    }

    public void initCombosForTestFailoverCommitReplyLost() {
        String property = System.getProperty("os.name");
        addCombinationValues("defaultPersistenceAdapter", (property.equalsIgnoreCase("AIX") || property.equalsIgnoreCase("SunOS")) ? new Object[]{TestSupport.PersistenceAdapterChoice.KahaDB, TestSupport.PersistenceAdapterChoice.JDBC} : new Object[]{TestSupport.PersistenceAdapterChoice.KahaDB, TestSupport.PersistenceAdapterChoice.LevelDB, TestSupport.PersistenceAdapterChoice.JDBC});
    }

    public void testFailoverCommitReplyLost() throws Exception {
        this.broker = createBroker(true);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.1
            public void commitTransaction(ConnectionContext connectionContext, TransactionId transactionId, boolean z) throws Exception {
                super.commitTransaction(connectionContext, transactionId, z);
                connectionContext.setDontSendReponse(true);
                Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.1.1
                    @Override // java.lang.Runnable
                    public void run() {
                        FailoverTransactionTest.LOG.info("Stopping broker post commit...");
                        try {
                            FailoverTransactionTest.this.broker.stop();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }});
        this.broker.start();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(activeMQConnectionFactory);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        final Session createSession = createConnection.createSession(true, 1);
        Queue createQueue = createSession.createQueue(QUEUE_NAME);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        produceMessage(createSession, createQueue);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.2
            @Override // java.lang.Runnable
            public void run() {
                FailoverTransactionTest.LOG.info("doing async commit...");
                try {
                    createSession.commit();
                } catch (JMSException e) {
                    TestCase.assertTrue(e instanceof TransactionRolledBackException);
                    FailoverTransactionTest.LOG.info("got commit exception: ", e);
                }
                countDownLatch.countDown();
                FailoverTransactionTest.LOG.info("done async commit");
            }
        });
        this.broker.waitUntilStopped();
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
        assertTrue("tx committed through failover", countDownLatch.await(30L, TimeUnit.SECONDS));
        Message receive = createConsumer.receive(20000L);
        LOG.info("Received: " + receive);
        assertNotNull("we got the message", receive);
        assertNull("we got just one message", createConsumer.receive(2000L));
        createSession.commit();
        createConsumer.close();
        createConnection.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
        LOG.info("Checking for remaining/hung messages..");
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
        ActiveMQConnectionFactory activeMQConnectionFactory2 = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(activeMQConnectionFactory2);
        Connection createConnection2 = activeMQConnectionFactory2.createConnection();
        createConnection2.start();
        Message receive2 = createConnection2.createSession(false, 1).createConsumer(createQueue).receive(1000L);
        LOG.info("Received: " + receive2);
        assertNull("no messges left dangling but got: " + receive2, receive2);
        createConnection2.close();
    }

    public void testFailoverCommitReplyLostWithDestinationPathSeparator() throws Exception {
        this.broker = createBroker(true);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker(), new BrokerPluginSupport() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.3
            public void commitTransaction(ConnectionContext connectionContext, TransactionId transactionId, boolean z) throws Exception {
                super.commitTransaction(connectionContext, transactionId, z);
                connectionContext.setDontSendReponse(true);
                Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.3.1
                    @Override // java.lang.Runnable
                    public void run() {
                        FailoverTransactionTest.LOG.info("Stopping broker post commit...");
                        try {
                            FailoverTransactionTest.this.broker.stop();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }});
        this.broker.start();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(activeMQConnectionFactory);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        final Session createSession = createConnection.createSession(true, 1);
        Queue createQueue = createSession.createQueue(QUEUE_NAME.replace('.', '/') + "?consumer.prefetchSize=0");
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        produceMessage(createSession, createQueue);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.4
            @Override // java.lang.Runnable
            public void run() {
                FailoverTransactionTest.LOG.info("doing async commit...");
                try {
                    createSession.commit();
                } catch (JMSException e) {
                    TestCase.assertTrue(e instanceof TransactionRolledBackException);
                    FailoverTransactionTest.LOG.info("got commit exception: ", e);
                }
                countDownLatch.countDown();
                FailoverTransactionTest.LOG.info("done async commit");
            }
        });
        this.broker.waitUntilStopped();
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker()});
        this.broker.start();
        assertTrue("tx committed trough failover", countDownLatch.await(30L, TimeUnit.SECONDS));
        Message receive = createConsumer.receive(20000L);
        LOG.info("Received: " + receive);
        assertNotNull("we got the message", receive);
        assertNull("we got just one message", createConsumer.receive(2000L));
        createSession.commit();
        createConsumer.close();
        createConnection.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
        LOG.info("Checking for remaining/hung messages..");
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker()});
        this.broker.start();
        ActiveMQConnectionFactory activeMQConnectionFactory2 = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(activeMQConnectionFactory2);
        Connection createConnection2 = activeMQConnectionFactory2.createConnection();
        createConnection2.start();
        Message receive2 = createConnection2.createSession(false, 1).createConsumer(createQueue).receive(1000L);
        LOG.info("Received: " + receive2);
        assertNull("no messges left dangling but got: " + receive2, receive2);
        createConnection2.close();
        for (ActiveMQDestination activeMQDestination : this.broker.getRegionBroker().getDestinations()) {
            LOG.info("Destinations list: " + activeMQDestination);
        }
        assertEquals("Only one destination", 1, this.broker.getRegionBroker().getDestinations().length);
    }

    public void initCombosForTestFailoverSendReplyLost() {
        addCombinationValues("defaultPersistenceAdapter", new Object[]{TestSupport.PersistenceAdapterChoice.KahaDB, TestSupport.PersistenceAdapterChoice.JDBC});
    }

    public void testFailoverSendReplyLost() throws Exception {
        this.broker = createBroker(true);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.5
            public void send(ProducerBrokerExchange producerBrokerExchange, org.apache.activemq.command.Message message) throws Exception {
                super.send(producerBrokerExchange, message);
                producerBrokerExchange.getConnectionContext().setDontSendReponse(true);
                Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.5.1
                    @Override // java.lang.Runnable
                    public void run() {
                        FailoverTransactionTest.LOG.info("Stopping broker post send...");
                        try {
                            FailoverTransactionTest.this.broker.stop();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }});
        this.broker.start();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")?jms.watchTopicAdvisories=false");
        configureConnectionFactory(activeMQConnectionFactory);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        final Session createSession = createConnection.createSession(false, 1);
        final Queue createQueue = createSession.createQueue(QUEUE_NAME);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.6
            @Override // java.lang.Runnable
            public void run() {
                FailoverTransactionTest.LOG.info("doing async send...");
                try {
                    FailoverTransactionTest.this.produceMessage(createSession, createQueue);
                } catch (JMSException e) {
                    FailoverTransactionTest.LOG.error("got send exception: ", e);
                    TestCase.fail("got unexpected send exception" + e);
                }
                countDownLatch.countDown();
                FailoverTransactionTest.LOG.info("done async send");
            }
        });
        this.broker.waitUntilStopped();
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        LOG.info("restarting....");
        this.broker.start();
        assertTrue("message sent through failover", countDownLatch.await(30L, TimeUnit.SECONDS));
        Message receive = createConsumer.receive(20000L);
        LOG.info("Received: " + receive);
        assertNotNull("we got the message", receive);
        assertNull("we got just one message", createConsumer.receive(2000L));
        createConsumer.close();
        createConnection.close();
        assertEquals("no newly queued messages", 0L, this.broker.getRegionBroker().getDestinationStatistics().getEnqueues().getCount());
        assertEquals("1 dequeue", 1L, this.broker.getRegionBroker().getDestinationStatistics().getDequeues().getCount());
        this.broker.stop();
        this.broker.waitUntilStopped();
        LOG.info("Checking for remaining/hung messages with second restart..");
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
        ActiveMQConnectionFactory activeMQConnectionFactory2 = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(activeMQConnectionFactory2);
        Connection createConnection2 = activeMQConnectionFactory2.createConnection();
        createConnection2.start();
        Message receive2 = createConnection2.createSession(false, 1).createConsumer(createQueue).receive(1000L);
        LOG.info("Received: " + receive2);
        assertNull("no messges left dangling but got: " + receive2, receive2);
        createConnection2.close();
    }

    public void initCombosForTestFailoverConnectionSendReplyLost() {
        addCombinationValues("defaultPersistenceAdapter", new Object[]{TestSupport.PersistenceAdapterChoice.KahaDB, TestSupport.PersistenceAdapterChoice.JDBC});
    }

    public void testFailoverConnectionSendReplyLost() throws Exception {
        this.broker = createBroker(true);
        KahaDBPersistenceAdapter defaultPersistenceAdapter = setDefaultPersistenceAdapter(this.broker);
        if (defaultPersistenceAdapter instanceof KahaDBPersistenceAdapter) {
            defaultPersistenceAdapter.setConcurrentStoreAndDispatchQueues(false);
        }
        final SocketProxy socketProxy = new SocketProxy();
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.7
            private boolean firstSend = true;

            public void send(ProducerBrokerExchange producerBrokerExchange, org.apache.activemq.command.Message message) throws Exception {
                super.send(producerBrokerExchange, message);
                if (this.firstSend) {
                    this.firstSend = false;
                    producerBrokerExchange.getConnectionContext().setDontSendReponse(true);
                    Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.7.1
                        @Override // java.lang.Runnable
                        public void run() {
                            FailoverTransactionTest.LOG.info("Stopping connection post send...");
                            try {
                                socketProxy.close();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }
        }});
        this.broker.start();
        socketProxy.setTarget(new URI(this.url));
        socketProxy.open();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + socketProxy.getUrl().toASCIIString() + ")?jms.watchTopicAdvisories=false");
        configureConnectionFactory(activeMQConnectionFactory);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        final Session createSession = createConnection.createSession(false, 1);
        final Queue createQueue = createSession.createQueue(QUEUE_NAME);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.8
            @Override // java.lang.Runnable
            public void run() {
                FailoverTransactionTest.LOG.info("doing async send...");
                try {
                    FailoverTransactionTest.this.produceMessage(createSession, createQueue);
                } catch (JMSException e) {
                    FailoverTransactionTest.LOG.info("got send exception: ", e);
                }
                countDownLatch.countDown();
                FailoverTransactionTest.LOG.info("done async send");
            }
        });
        assertTrue("proxy was closed", socketProxy.waitUntilClosed(30L));
        LOG.info("restarting proxy");
        socketProxy.open();
        assertTrue("message sent through failover", countDownLatch.await(30L, TimeUnit.SECONDS));
        Message receive = createConsumer.receive(20000L);
        LOG.info("Received: " + receive);
        assertNotNull("we got the message", receive);
        assertNull("we got just one message", createConsumer.receive(2000L));
        createConsumer.close();
        createConnection.close();
        assertEquals("one queued message", 1L, this.broker.getRegionBroker().getDestinationStatistics().getEnqueues().getCount());
        this.broker.stop();
        this.broker.waitUntilStopped();
        LOG.info("Checking for remaining/hung messages with restart..");
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
        ActiveMQConnectionFactory activeMQConnectionFactory2 = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(activeMQConnectionFactory2);
        Connection createConnection2 = activeMQConnectionFactory2.createConnection();
        createConnection2.start();
        Message receive2 = createConnection2.createSession(false, 1).createConsumer(createQueue).receive(1000L);
        LOG.info("Received: " + receive2);
        assertNull("no messges left dangling but got: " + receive2, receive2);
        createConnection2.close();
    }

    public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
        startCleanBroker();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")?trackTransactionProducers=false");
        configureConnectionFactory(activeMQConnectionFactory);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(true, 1);
        Queue createQueue = createSession.createQueue(QUEUE_NAME);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        produceMessage(createSession, createQueue);
        this.broker.stop();
        startBroker(false, this.url);
        try {
            createSession.commit();
            fail("expect ex for rollback only on async exc");
        } catch (JMSException e) {
        }
        assertNull("we got the message", createConsumer.receive(5000L));
        createSession.commit();
        createConnection.close();
    }

    public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
        startCleanBroker();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(activeMQConnectionFactory);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(true, 1);
        Queue createQueue = createSession.createQueue(QUEUE_NAME);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        for (int i = 0; i < 10; i++) {
            MessageProducer createProducer = createSession.createProducer(createQueue);
            createProducer.send(createSession.createTextMessage("Test message: 10"));
            createProducer.close();
        }
        this.broker.stop();
        startBroker(false, this.url);
        createSession.commit();
        for (int i2 = 0; i2 < 10; i2++) {
            assertNotNull("we got all the message: 10", createConsumer.receive(20000L));
        }
        createSession.commit();
        createConnection.close();
    }

    public void testFailoverWithConnectionConsumer() throws Exception {
        startCleanBroker();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(activeMQConnectionFactory);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(true, 1);
        Queue createQueue = createSession.createQueue("testFailoverWithConnectionConsumer");
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final Session createSession2 = createConnection.createSession(false, 1);
        createConnection.createConnectionConsumer(createQueue, (String) null, new ServerSessionPool() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.9
            public ServerSession getServerSession() throws JMSException {
                return new ServerSession() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.9.1
                    public Session getSession() throws JMSException {
                        return createSession2;
                    }

                    public void start() throws JMSException {
                        countDownLatch.countDown();
                        createSession2.run();
                    }
                };
            }
        }, 1);
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        for (int i = 0; i < 10; i++) {
            MessageProducer createProducer = createSession.createProducer(createQueue);
            createProducer.send(createSession.createTextMessage("Test message: 10"));
            createProducer.close();
        }
        this.broker.stop();
        startBroker(false, this.url);
        createSession.commit();
        for (int i2 = 0; i2 < 9; i2++) {
            assertNotNull("Failed to get message: 10", createConsumer.receive(20000L));
        }
        createSession.commit();
        createConnection.close();
        assertTrue("connectionconsumer did not get a message", countDownLatch.await(10L, TimeUnit.SECONDS));
    }

    public void testFailoverConsumerAckLost() throws Exception {
        for (int i = 0; i < 3; i++) {
            try {
                LOG.info("Iteration: " + i);
                doTestFailoverConsumerAckLost(i);
                stopBroker();
            } catch (Throwable th) {
                stopBroker();
                throw th;
            }
        }
    }

    public void doTestFailoverConsumerAckLost(int i) throws Exception {
        this.broker = createBroker(true);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.10
            public void acknowledge(ConsumerBrokerExchange consumerBrokerExchange, final MessageAck messageAck) throws Exception {
                consumerBrokerExchange.getConnectionContext().setDontSendReponse(true);
                Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.10.1
                    @Override // java.lang.Runnable
                    public void run() {
                        FailoverTransactionTest.LOG.info("Stopping broker on ack: " + messageAck);
                        try {
                            FailoverTransactionTest.this.broker.stop();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }});
        this.broker.start();
        Vector vector = new Vector();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(activeMQConnectionFactory);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        vector.add(createConnection);
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue("Failover.WithTx?consumer.prefetchSize=1");
        Connection createConnection2 = activeMQConnectionFactory.createConnection();
        createConnection2.start();
        vector.add(createConnection2);
        final Session createSession2 = createConnection2.createSession(true, 0);
        Connection createConnection3 = activeMQConnectionFactory.createConnection();
        createConnection3.start();
        vector.add(createConnection3);
        Session createSession3 = createConnection3.createSession(true, 0);
        final MessageConsumer createConsumer = createSession2.createConsumer(createQueue);
        MessageConsumer createConsumer2 = createSession3.createConsumer(createQueue);
        produceMessage(createSession, createQueue);
        produceMessage(createSession, createQueue);
        final Vector vector2 = new Vector();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.11
            @Override // java.lang.Runnable
            public void run() {
                FailoverTransactionTest.LOG.info("doing async commit after consume...");
                try {
                    Message receive = createConsumer.receive(20000L);
                    FailoverTransactionTest.LOG.info("consumer1 first attempt got message: " + receive);
                    vector2.add(receive);
                    TimeUnit.SECONDS.sleep(FailoverTransactionTest.this.random.nextInt(5));
                    Message receive2 = createConsumer.receive(5000L);
                    FailoverTransactionTest.LOG.info("consumer1 second attempt got message: " + receive2);
                    if (receive2 != null) {
                        vector2.add(receive2);
                    }
                    FailoverTransactionTest.LOG.info("committing consumer1 session: " + vector2.size() + " messsage(s)");
                    try {
                        createSession2.commit();
                    } catch (TransactionRolledBackException e) {
                        FailoverTransactionTest.LOG.info("got exception ex on commit", e);
                        atomicBoolean.set(true);
                    }
                    countDownLatch.countDown();
                    FailoverTransactionTest.LOG.info("done async commit");
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            }
        });
        this.broker.waitUntilStopped();
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
        assertTrue("tx committed through failover", countDownLatch.await(30L, TimeUnit.SECONDS));
        LOG.info("received message count: " + vector2.size());
        for (int i2 = 0; i2 < 2; i2++) {
            Message receive = createConsumer.receive(5000L);
            LOG.info("post: from consumer1 received: " + receive);
            createSession2.commit();
            if (receive == null) {
                receive = createConsumer2.receive(DurableSubProcessWithRestartTest.BROKER_RESTART);
                LOG.info("post: from consumer2 received: " + receive);
                createSession3.commit();
            }
            assertNotNull("got message [" + i2 + "]", receive);
        }
        Iterator it = vector.iterator();
        while (it.hasNext()) {
            ((Connection) it.next()).close();
        }
        this.broker.stop();
        this.broker.waitUntilStopped();
        LOG.info("Checking for remaining/hung messages..");
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
        ActiveMQConnectionFactory activeMQConnectionFactory2 = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(activeMQConnectionFactory2);
        Connection createConnection4 = activeMQConnectionFactory2.createConnection();
        createConnection4.start();
        Message receive2 = createConnection4.createSession(false, 1).createConsumer(createQueue).receive(1000L);
        LOG.info("Sweep received: " + receive2);
        assertNull("no messges left dangling but got: " + receive2, receive2);
        createConnection4.close();
    }

    public void testPoolingNConsumesAfterReconnect() throws Exception {
        ActiveMQMessageConsumer activeMQMessageConsumer;
        this.broker = createBroker(true);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.12
            int count = 0;

            public void removeConsumer(ConnectionContext connectionContext, final ConsumerInfo consumerInfo) throws Exception {
                int i = this.count;
                this.count = i + 1;
                if (i == 1) {
                    Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.12.1
                        @Override // java.lang.Runnable
                        public void run() {
                            FailoverTransactionTest.LOG.info("Stopping broker on removeConsumer: " + consumerInfo);
                            try {
                                FailoverTransactionTest.this.broker.stop();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }
        }});
        this.broker.start();
        Vector vector = new Vector();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(activeMQConnectionFactory);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        vector.add(createConnection);
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue("Failover.WithTx?consumer.prefetchSize=1");
        produceMessage(createSession, createQueue);
        createConnection.close();
        ActiveMQConnection createConnection2 = activeMQConnectionFactory.createConnection();
        createConnection2.start();
        vector.add(createConnection2);
        Session createSession2 = createConnection2.createSession(false, 2);
        Stack stack = new Stack();
        for (int i = 0; i < 10; i++) {
            stack.push(createConnection2.createSession(false, 1));
        }
        final ArrayDeque arrayDeque = new ArrayDeque();
        for (int i2 = 0; i2 < 1000; i2++) {
            arrayDeque.push(createSession2.createConsumer(createQueue));
        }
        final ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        final FailoverTransport failoverTransport = (FailoverTransport) createConnection2.getTransport().narrow(FailoverTransport.class);
        final TransportListener transportListener = failoverTransport.getTransportListener();
        failoverTransport.setTransportListener(new TransportListener() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.13
            public void onCommand(Object obj) {
                transportListener.onCommand(obj);
            }

            public void onException(IOException iOException) {
                transportListener.onException(iOException);
            }

            public void transportInterupted() {
                FailoverTransactionTest.LOG.error("Transport interrupted: " + failoverTransport, new RuntimeException("HERE"));
                for (int i3 = 0; i3 < 1000 && !arrayDeque.isEmpty(); i3++) {
                    newCachedThreadPool.execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.13.1
                        @Override // java.lang.Runnable
                        public void run() {
                            ActiveMQMessageConsumer activeMQMessageConsumer2;
                            try {
                                synchronized (transportListener) {
                                    activeMQMessageConsumer2 = (MessageConsumer) arrayDeque.pop();
                                }
                                activeMQMessageConsumer2.receive(1L);
                                FailoverTransactionTest.LOG.info("calling close() " + activeMQMessageConsumer2.getConsumerId());
                                activeMQMessageConsumer2.close();
                            } catch (NoSuchElementException e) {
                            } catch (Exception e2) {
                                FailoverTransactionTest.LOG.error("Ex on: " + ((ActiveMQMessageConsumer) null).getConsumerId(), e2);
                            }
                        }
                    });
                }
                transportListener.transportInterupted();
            }

            public void transportResumed() {
                transportListener.transportResumed();
            }
        });
        synchronized (transportListener) {
            activeMQMessageConsumer = (MessageConsumer) arrayDeque.pop();
        }
        LOG.info("calling close to trigger broker stop " + activeMQMessageConsumer.getConsumerId());
        activeMQMessageConsumer.close();
        this.broker.waitUntilStopped();
        this.broker = createBroker(false, this.url);
        setDefaultPersistenceAdapter(this.broker);
        this.broker.start();
        ActiveMQMessageConsumer createConsumer = createSession2.createConsumer(createQueue);
        LOG.info("finally consuming message: " + createConsumer.getConsumerId());
        Message message = null;
        for (int i3 = 0; i3 < 4 && message == null; i3++) {
            message = createConsumer.receive(1000L);
        }
        LOG.info("post: from consumer1 received: " + message);
        assertNotNull("got message after failover", message);
        message.acknowledge();
        Iterator it = vector.iterator();
        while (it.hasNext()) {
            ((Connection) it.next()).close();
        }
    }

    public void testAutoRollbackWithMissingRedeliveries() throws Exception {
        this.broker = createBroker(true);
        this.broker.start();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")");
        configureConnectionFactory(activeMQConnectionFactory);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue("Failover.WithTx?consumer.prefetchSize=1");
        Session createSession2 = createConnection.createSession(true, 0);
        MessageConsumer createConsumer = createSession2.createConsumer(createQueue);
        produceMessage(createSession, createQueue);
        assertNotNull(createConsumer.receive(20000L));
        this.broker.stop();
        this.broker = createBroker(false, this.url);
        setPersistenceAdapter(this.broker, TestSupport.PersistenceAdapterChoice.JDBC);
        this.broker.start();
        try {
            createSession2.commit();
            fail("expected transaciton rolledback ex");
        } catch (TransactionRolledBackException e) {
        }
        this.broker.stop();
        this.broker = createBroker(false, this.url);
        this.broker.start();
        assertNotNull("should get rolledback message from original restarted broker", createConsumer.receive(20000L));
        createConnection.close();
    }

    public void testWaitForMissingRedeliveries() throws Exception {
        LOG.info("testWaitForMissingRedeliveries()");
        this.broker = createBroker(true);
        this.broker.start();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")?jms.consumerFailoverRedeliveryWaitPeriod=30000");
        configureConnectionFactory(activeMQConnectionFactory);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue(QUEUE_NAME);
        final Session createSession2 = createConnection.createSession(true, 0);
        MessageConsumer createConsumer = createSession2.createConsumer(createQueue);
        produceMessage(createSession, createQueue);
        Message receive = createConsumer.receive(20000L);
        if (receive == null) {
            AutoFailTestSupport.dumpAllThreads("missing-");
        }
        assertNotNull("got message just produced", receive);
        this.broker.stop();
        this.broker = createBroker(false, this.url);
        setPersistenceAdapter(this.broker, TestSupport.PersistenceAdapterChoice.JDBC);
        this.broker.start();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.14
            @Override // java.lang.Runnable
            public void run() {
                FailoverTransactionTest.LOG.info("doing async commit...");
                try {
                    try {
                        createSession2.commit();
                        countDownLatch.countDown();
                    } catch (JMSException e) {
                        e.printStackTrace();
                        countDownLatch2.countDown();
                        countDownLatch.countDown();
                    }
                } catch (Throwable th) {
                    countDownLatch.countDown();
                    throw th;
                }
            }
        });
        this.broker.stop();
        this.broker = createBroker(false, this.url);
        this.broker.start();
        assertTrue("commit was successful", countDownLatch.await(30L, TimeUnit.SECONDS));
        assertTrue("got exception on commit", countDownLatch2.await(30L, TimeUnit.SECONDS));
        assertNotNull("should get failed committed message", createConsumer.receive(5000L));
        createConnection.close();
    }

    public void testReDeliveryWhilePending() throws Exception {
        LOG.info("testReDeliveryWhilePending()");
        this.broker = createBroker(true);
        this.broker.start();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("failover:(" + this.url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000");
        configureConnectionFactory(activeMQConnectionFactory);
        Connection createConnection = activeMQConnectionFactory.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = createSession.createQueue("Failover.WithTx?consumer.prefetchSize=0");
        final Session createSession2 = createConnection.createSession(true, 0);
        Session createSession3 = createConnection.createSession(true, 0);
        MessageConsumer createConsumer = createSession2.createConsumer(createQueue);
        produceMessage(createSession, createQueue);
        Message receive = createConsumer.receive(20000L);
        if (receive == null) {
            AutoFailTestSupport.dumpAllThreads("missing-");
        }
        assertNotNull("got message just produced", receive);
        MessageConsumer createConsumer2 = createSession3.createConsumer(createSession2.createQueue("Failover.WithTx?consumer.prefetchSize=1"));
        this.broker.stop();
        this.broker = createBroker(false, this.url);
        this.broker.start();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        final Vector vector = new Vector();
        Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.transport.failover.FailoverTransactionTest.15
            @Override // java.lang.Runnable
            public void run() {
                FailoverTransactionTest.LOG.info("doing async commit...");
                try {
                    try {
                        try {
                            createSession2.commit();
                            countDownLatch.countDown();
                        } catch (JMSException e) {
                            vector.add(e);
                            countDownLatch.countDown();
                        }
                    } catch (TransactionRolledBackException e2) {
                        countDownLatch2.countDown();
                        countDownLatch.countDown();
                    }
                } catch (Throwable th) {
                    countDownLatch.countDown();
                    throw th;
                }
            }
        });
        assertTrue("commit completed ", countDownLatch.await(15L, TimeUnit.SECONDS));
        assertTrue("got Rollback", countDownLatch2.await(15L, TimeUnit.SECONDS));
        assertTrue("no other exceptions", vector.isEmpty());
        Message receive2 = createConsumer2.receive(2000L);
        if (receive2 == null) {
            receive2 = createConsumer.receive(2000L);
        }
        createSession2.commit();
        createSession3.commit();
        assertNotNull("got message after rollback", receive2);
        assertNull("nothing in the dlq", createSession2.createConsumer(createSession2.createQueue("ActiveMQ.DLQ")).receive(2000L));
        createConnection.close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void produceMessage(Session session, Queue queue) throws JMSException {
        MessageProducer createProducer = session.createProducer(queue);
        createProducer.send(session.createTextMessage("Test message"));
        createProducer.close();
    }
}
