package org.apache.activemq.store.jdbc;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Arrays;
import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.XASession;
import javax.management.ObjectName;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQXAConnection;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.TestUtils;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/store/jdbc/XACompletionTest.class */
public class XACompletionTest extends TestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(XACompletionTest.class);
    protected ActiveMQXAConnectionFactory factory;
    protected static final int messagesExpected = 1;
    protected BrokerService broker;
    protected String connectionUri;

    @Parameterized.Parameter
    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;

    /**
     * Supplies the argument sets for the parameterized runner: one run with the
     * KahaDB persistence adapter choice and one with the JDBC choice.
     *
     * @return one single-element argument array per persistence adapter choice
     */
    @Parameterized.Parameters(name = "store={0}")
    public static Iterable<Object[]> getTestParameters() {
        Object[] kahaDbRun = {TestSupport.PersistenceAdapterChoice.KahaDB};
        Object[] jdbcRun = {TestSupport.PersistenceAdapterChoice.JDBC};
        return Arrays.asList(kahaDbRun, jdbcRun);
    }

    /**
     * Creates the broker used by the test and keeps it in {@link #broker} so
     * that {@link #stopAll()} can shut it down afterwards.
     *
     * @throws Exception if the broker cannot be created
     */
    @Before
    public void setUp() throws Exception {
        BrokerService freshBroker = createBroker();
        this.broker = freshBroker;
    }

    /**
     * Stops the broker, waits until it has stopped, and clears the field.
     * Does nothing when no broker is set.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void stopAll() throws Exception {
        if (broker == null) {
            return;
        }
        broker.stop();
        broker.waitUntilStopped();
        broker = null;
    }

    /**
     * Receives one message inside an XA transaction branch that is ended but
     * never prepared, closes the XA connection, and then checks that the queue
     * still reports one message and that a plain consumer can receive it.
     *
     * @throws Exception on any JMS, XA or JMX failure
     */
    @Test
    public void testStatsAndRedispatchAfterAckPreparedClosed() throws Exception {
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=0");
        this.factory.setWatchTopicAdvisories(false);
        sendMessages(1);

        // Cast required: the factory method is declared with the javax.jms.XAConnection return type.
        ActiveMQXAConnection xaConnection = (ActiveMQXAConnection) this.factory.createXAConnection();
        try {
            xaConnection.start();
            XASession xaSession = xaConnection.createXASession();
            MessageConsumer consumer = xaSession.createConsumer(xaSession.createQueue("TEST"));
            XAResource resource = xaSession.getXAResource();
            resource.recover(XAResource.TMSTARTRSCAN);
            resource.recover(XAResource.TMNOFLAGS);

            Xid tid = TestUtils.createXid();
            resource.start(tid, XAResource.TMNOFLAGS);
            LOG.info("Received : " + consumer.receive(2000L));
            resource.end(tid, XAResource.TMSUCCESS);
        } finally {
            // Closing here is part of the scenario on the success path; the
            // finally also releases the connection when a step above throws.
            xaConnection.close();
        }

        dumpMessages();
        LOG.info("Try jmx browse... after commit");
        ObjectName queueName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
        QueueViewMBean queueView = (QueueViewMBean) this.broker.getManagementContext().newProxyInstance(queueName, QueueViewMBean.class, true);
        assertEquals("size", 1L, queueView.getQueueSize());
        LOG.info("Try receive... after rollback");
        assertNotNull("message gone", regularReceive("TEST"));
    }

    /**
     * Receives the single queued message inside an XA transaction, prepares
     * and commits (two-phase) that transaction, and then checks through JMX,
     * a queue browser and a plain receive that the message is gone.
     *
     * @throws Exception on any JMS, XA or JMX failure
     */
    @Test
    public void testStatsAndBrowseAfterAckPreparedCommitted() throws Exception {
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=1");
        this.factory.setWatchTopicAdvisories(false);
        sendMessages(1);

        // Cast required: the factory method is declared with the javax.jms.XAConnection return type.
        ActiveMQXAConnection xaConnection = (ActiveMQXAConnection) this.factory.createXAConnection();
        try {
            xaConnection.start();
            XASession xaSession = xaConnection.createXASession();
            MessageConsumer consumer = xaSession.createConsumer(xaSession.createQueue("TEST"));
            XAResource resource = xaSession.getXAResource();
            resource.recover(XAResource.TMSTARTRSCAN);
            resource.recover(XAResource.TMNOFLAGS);

            Xid tid = TestUtils.createXid();
            resource.start(tid, XAResource.TMNOFLAGS);
            int received = 0;
            for (int i = 0; i < messagesExpected; i++) {
                try {
                    LOG.debug("Receiving message " + (received + 1) + " of " + messagesExpected);
                    LOG.info("Received : " + consumer.receive(2000L));
                    received++;
                } catch (Exception e) {
                    // Best effort, as before: a failed receive is only logged;
                    // the assertions below decide the outcome.
                    LOG.debug("Caught exception:", e);
                }
            }
            resource.end(tid, XAResource.TMSUCCESS);
            resource.prepare(tid);
            consumer.close();
            dumpMessages();

            // onePhase == false: complete the already prepared transaction.
            resource.commit(tid, false);
            dumpMessages();

            LOG.info("Try jmx browse... after commit");
            ObjectName queueName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
            QueueViewMBean queueView = (QueueViewMBean) this.broker.getManagementContext().newProxyInstance(queueName, QueueViewMBean.class, true);
            assertTrue(queueView.browseMessages().isEmpty());
            assertEquals("prefetch 0", 0L, queueView.getInFlightCount());
            assertEquals("size 0", 0L, queueView.getQueueSize());

            LOG.info("Try browse... after commit");
            assertNull("message gone", regularBrowseFirst());
            LOG.info("Try receive... after commit");
            assertNull("message gone", regularReceive("TEST"));
        } finally {
            // Previously the XA connection was never closed.
            xaConnection.close();
        }
    }

    /**
     * Sends ten messages, receives five of them inside an XA transaction that
     * is prepared and then rolled back, and checks the queue statistics before
     * and after the rollback. Finally all ten messages must be browsable and
     * receivable by a plain consumer.
     *
     * @throws Exception on any JMS, XA or JMX failure
     */
    @Test
    public void testStatsAndBrowseAfterAckPreparedRolledback() throws Exception {
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0");
        this.factory.setWatchTopicAdvisories(false);
        sendMessages(10);

        ObjectName queueName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
        final QueueViewMBean queueView = (QueueViewMBean) this.broker.getManagementContext().newProxyInstance(queueName, QueueViewMBean.class, true);

        // Cast required: the factory method is declared with the javax.jms.XAConnection return type.
        ActiveMQXAConnection xaConnection = (ActiveMQXAConnection) this.factory.createXAConnection();
        try {
            xaConnection.start();
            XASession xaSession = xaConnection.createXASession();
            MessageConsumer consumer = xaSession.createConsumer(xaSession.createQueue("TEST"));
            XAResource resource = xaSession.getXAResource();
            resource.recover(XAResource.TMSTARTRSCAN);
            resource.recover(XAResource.TMNOFLAGS);

            assertEquals("prefetch 0", 0L, queueView.getInFlightCount());
            // Failure messages used to read "size 0" although 10 is expected.
            assertEquals("size", 10L, queueView.getQueueSize());
            assertEquals("cursor size", 0, queueView.cursorSize());

            Xid tid = TestUtils.createXid();
            resource.start(tid, XAResource.TMNOFLAGS);
            for (int i = 0; i < 5; i++) {
                try {
                    LOG.info("Received : " + consumer.receive(2000L));
                } catch (Exception e) {
                    // Best effort, as before: the assertions below decide the outcome.
                    LOG.debug("Caught exception:", e);
                }
            }
            resource.end(tid, XAResource.TMSUCCESS);
            resource.prepare(tid);
            consumer.close();
            dumpMessages();

            // Give the broker time to bring the in-flight count back to zero;
            // the result is checked by the assertion that follows.
            Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    return queueView.getInFlightCount() == 0;
                }
            });
            assertEquals("prefetch", 0L, queueView.getInFlightCount());
            assertEquals("size", 10L, queueView.getQueueSize());
            assertEquals("cursor size", 0, queueView.cursorSize());

            resource.rollback(tid);
            dumpMessages();

            LOG.info("Try jmx browse... after rollback");
            assertEquals(10, queueView.browseMessages().size());
            assertEquals("prefetch", 0L, queueView.getInFlightCount());
            assertEquals("size", 10L, queueView.getQueueSize());
            assertEquals("cursor size", 0, queueView.cursorSize());

            LOG.info("Try browse... after");
            assertNotNull("message gone", regularBrowseFirst());
            LOG.info("Try receive... after");
            for (int i = 0; i < 10; i++) {
                assertNotNull("message gone", regularReceive("TEST"));
            }
        } finally {
            // Previously the XA connection was never closed.
            xaConnection.close();
        }
    }

    /**
     * Sends ten messages, receives five of them inside an XA transaction that
     * is prepared, drains the other five without acknowledging them, restarts
     * the broker, recovers and rolls back the prepared transaction, and then
     * expects all ten messages to be receivable and the queue statistics to
     * end at zero size with a dequeue count of ten.
     *
     * @throws Exception on any JMS, XA or JMX failure
     */
    @Test
    public void testStatsAndConsumeAfterAckPreparedRolledback() throws Exception {
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0");
        this.factory.setWatchTopicAdvisories(false);
        sendMessages(10);

        // Cast required: the factory method is declared with the javax.jms.XAConnection return type.
        // NOTE(review): this connection is not closed before the broker restart in
        // the original either; it is left that way so the scenario is unchanged.
        ActiveMQXAConnection xaConnection = (ActiveMQXAConnection) this.factory.createXAConnection();
        xaConnection.start();
        XASession xaSession = xaConnection.createXASession();
        MessageConsumer consumer = xaSession.createConsumer(xaSession.createQueue("TEST"));
        XAResource resource = xaSession.getXAResource();
        resource.recover(XAResource.TMSTARTRSCAN);
        resource.recover(XAResource.TMNOFLAGS);
        dumpMessages();

        Xid tid = TestUtils.createXid();
        resource.start(tid, XAResource.TMNOFLAGS);
        final int toAck = 5;
        int received = 0;
        for (int i = 0; i < toAck; i++) {
            try {
                // The log used to say "of 1" although five messages are received.
                LOG.debug("Receiving message " + (received + 1) + " of " + toAck);
                LOG.info("Received : " + consumer.receive(2000L));
                received++;
            } catch (Exception e) {
                // Best effort, as before: the assertions below decide the outcome.
                LOG.debug("Caught exception:", e);
            }
        }
        resource.end(tid, XAResource.TMSUCCESS);
        resource.prepare(tid);
        consumer.close();
        LOG.info("after close");
        dumpMessages();

        assertEquals("drain", 5, drainUnack(5, "TEST"));
        dumpMessages();

        this.broker = restartBroker();
        assertEquals("redrain", 5, drainUnack(5, "TEST"));
        LOG.info("Try consume... after restart");
        dumpMessages();

        ObjectName queueName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
        QueueViewMBean queueView = (QueueViewMBean) this.broker.getManagementContext().newProxyInstance(queueName, QueueViewMBean.class, true);
        assertEquals("prefetch", 0L, queueView.getInFlightCount());
        assertEquals("size", 5L, queueView.getQueueSize());
        assertEquals("cursor size 0", 0, queueView.cursorSize());

        // connectionUri was reassigned by the restart, so a new factory is needed.
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0");
        this.factory.setWatchTopicAdvisories(false);
        ActiveMQXAConnection recoveryConnection = (ActiveMQXAConnection) this.factory.createXAConnection();
        try {
            recoveryConnection.start();
            XAResource recoveryResource = recoveryConnection.createXASession().getXAResource();
            Xid[] recovered = recoveryResource.recover(XAResource.TMSTARTRSCAN);
            recoveryResource.recover(XAResource.TMNOFLAGS);
            // Fail with a clear message instead of an ArrayIndexOutOfBoundsException.
            assertTrue("prepared transaction recovered", recovered.length > 0);

            LOG.info("Rollback outcome for ack");
            recoveryResource.rollback(recovered[0]);

            LOG.info("Try receive... after rollback");
            for (int i = 0; i < 10; i++) {
                assertNotNull("message gone: " + i, regularReceive("TEST"));
            }
            dumpMessages();
            assertNull("none left", regularReceive("TEST"));
            assertEquals("prefetch", 0L, queueView.getInFlightCount());
            assertEquals("size", 0L, queueView.getQueueSize());
            assertEquals("cursor size", 0, queueView.cursorSize());
            assertEquals("dq", 10L, queueView.getDequeueCount());
        } finally {
            // Previously the recovery connection was never closed.
            recoveryConnection.close();
        }
    }

    /**
     * Sends twenty messages and prepares ten XA transactions, each on its own
     * connection and each receiving one message. Two of the recovered
     * transactions are rolled back before a broker restart and the remaining
     * ones after it; two plain receives must then both return a message while
     * a second consumer with a prefetch of two is open on the same queue.
     *
     * @throws Exception on any JMS or XA failure
     */
    @Test
    public void testStatsAndConsumeAfterAckPreparedRolledbackOutOfOrderRecovery() throws Exception {
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0");
        this.factory.setWatchTopicAdvisories(false);
        sendMessages(20);

        for (int i = 0; i < 10; i++) {
            // Cast required: the factory method is declared with the javax.jms.XAConnection return type.
            // NOTE(review): these connections stay open until the broker restart,
            // as in the original; closing them here would alter the scenario.
            ActiveMQXAConnection xaConnection = (ActiveMQXAConnection) this.factory.createXAConnection();
            xaConnection.start();
            XASession xaSession = xaConnection.createXASession();
            MessageConsumer consumer = xaSession.createConsumer(xaSession.createQueue("TEST"));
            XAResource resource = xaSession.getXAResource();
            Xid tid = TestUtils.createXid();
            resource.start(tid, XAResource.TMNOFLAGS);
            try {
                LOG.info("Received (" + i + ") : ," + consumer.receive(2000L));
            } catch (Exception e) {
                // Best effort, as before: the assertions below decide the outcome.
                LOG.debug("Caught exception:", e);
            }
            resource.end(tid, XAResource.TMSUCCESS);
            resource.prepare(tid);
        }

        // Roll back the first two recovered transactions before the restart.
        ActiveMQXAConnection preRestartRecovery = (ActiveMQXAConnection) this.factory.createXAConnection();
        try {
            preRestartRecovery.start();
            XAResource resource = preRestartRecovery.createXASession().getXAResource();
            Xid[] recovered = resource.recover(XAResource.TMSTARTRSCAN);
            resource.recover(XAResource.TMNOFLAGS);
            // Fail with a clear message instead of an ArrayIndexOutOfBoundsException.
            assertTrue("at least two prepared transactions recovered", recovered.length >= 2);
            resource.rollback(recovered[0]);
            resource.rollback(recovered[1]);
        } finally {
            preRestartRecovery.close();
        }

        LOG.info("RESTART");
        this.broker = restartBroker();
        dumpMessages();

        Connection plainConnection = new ActiveMQConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=1").createConnection();
        try {
            plainConnection.start();
            Session plainSession = plainConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            // Open and immediately close a consumer on the queue, as the original does.
            plainSession.createConsumer(plainSession.createQueue("TEST")).close();

            ActiveMQConnectionFactory receiveFactory = new ActiveMQConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0");
            ActiveMQXAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0");
            xaFactory.setWatchTopicAdvisories(false);

            ActiveMQXAConnection postRestartRecovery = (ActiveMQXAConnection) xaFactory.createXAConnection();
            try {
                postRestartRecovery.start();
                XAResource resource = postRestartRecovery.createXASession().getXAResource();
                Xid[] stillPrepared = resource.recover(XAResource.TMSTARTRSCAN);
                resource.recover(XAResource.TMNOFLAGS);
                for (Xid tid : stillPrepared) {
                    resource.rollback(tid);
                }

                // This consumer is never read from; it stays open, with a prefetch
                // of two, while the receives below run.
                plainSession.createConsumer(new ActiveMQQueue("TEST?consumer.prefetchSize=2"));

                LOG.info("Try receive... after rollback");
                Message first = regularReceiveWith(receiveFactory, "TEST");
                assertNotNull("message 1: ", first);
                LOG.info("Received : " + first);
                dumpMessages();

                Message second = regularReceiveWith(receiveFactory, "TEST");
                assertNotNull("last message", second);
                LOG.info("Received : " + second);
            } finally {
                // Previously never closed.
                postRestartRecovery.close();
            }
        } finally {
            // Previously never closed; this also closes the open prefetch consumer.
            plainConnection.close();
        }
    }

    /**
     * Moves one message from queue TEST to queue TEST_MOVE using two branches
     * of the same global transaction: one branch acknowledges the received
     * message, the other sends a new message. Both branches are prepared and
     * then committed, and the queue statistics of both queues are checked.
     *
     * @throws Exception on any JMS, XA or JMX failure
     */
    @Test
    public void testMoveInTwoBranches() throws Exception {
        runMoveInTwoBranches();
    }

    /**
     * Runs the same scenario as {@link #testMoveInTwoBranches()}.
     * NOTE(review): despite the name, the body was a line-for-line copy of
     * testMoveInTwoBranches and involves only the single test broker; confirm
     * whether a second broker was intended.
     *
     * @throws Exception on any JMS, XA or JMX failure
     */
    @Test
    public void testMoveInTwoBranchesTwoBrokers() throws Exception {
        runMoveInTwoBranches();
    }

    /** Shared body of the two "move in two branches" tests. */
    private void runMoveInTwoBranches() throws Exception {
        this.factory = new ActiveMQXAConnectionFactory(this.connectionUri + "?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries=1");
        this.factory.setWatchTopicAdvisories(false);
        sendMessages(1);

        // Casts required: the factory method is declared with the javax.jms.XAConnection return type.
        ActiveMQXAConnection ackConnection = (ActiveMQXAConnection) this.factory.createXAConnection();
        ActiveMQXAConnection sendConnection = null;
        try {
            ackConnection.start();
            XASession ackSession = ackConnection.createXASession();
            MessageConsumer consumer = ackSession.createConsumer(ackSession.createQueue("TEST"));
            XAResource ackResource = ackSession.getXAResource();

            final Xid ackBranch = TestUtils.createXid();
            final Xid sendBranch = siblingBranchOf(ackBranch);

            ackResource.start(ackBranch, XAResource.TMNOFLAGS);
            int received = 0;
            for (int i = 0; i < messagesExpected; i++) {
                try {
                    LOG.debug("Receiving message " + (received + 1) + " of " + messagesExpected);
                    LOG.info("Received : " + consumer.receive(2000L));
                    received++;
                } catch (Exception e) {
                    // Best effort, as before: the assertions below decide the outcome.
                    LOG.debug("Caught exception:", e);
                }
            }
            ackResource.end(ackBranch, XAResource.TMSUCCESS);

            // The original created and started this second connection and then
            // dropped the reference; it is kept here only so it can be closed.
            sendConnection = (ActiveMQXAConnection) this.factory.createXAConnection();
            sendConnection.start();
            // NOTE(review): the send session is created on the FIRST connection,
            // exactly as before. Confirm whether sendConnection was meant here.
            XASession sendSession = ackConnection.createXASession();
            MessageProducer producer = sendSession.createProducer(sendSession.createQueue("TEST_MOVE"));
            XAResource sendResource = sendSession.getXAResource();
            sendResource.start(sendBranch, XAResource.TMNOFLAGS);
            // Cast required: createTextMessage() is declared to return javax.jms.TextMessage.
            ActiveMQMessage toSend = (ActiveMQMessage) sendSession.createTextMessage();
            toSend.setTransactionId(new XATransactionId(sendBranch));
            producer.send(toSend);
            sendResource.end(sendBranch, XAResource.TMSUCCESS);

            sendResource.prepare(sendBranch);
            ackResource.prepare(ackBranch);
            consumer.close();
            LOG.info("Prepared");
            dumpMessages();

            // onePhase == false: complete the already prepared branches.
            LOG.info("Commit Ack");
            ackResource.commit(ackBranch, false);
            dumpMessages();
            LOG.info("Commit Send");
            sendResource.commit(sendBranch, false);
            dumpMessages();

            LOG.info("Try jmx browse... after commit");
            ObjectName sourceName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST");
            QueueViewMBean sourceView = (QueueViewMBean) this.broker.getManagementContext().newProxyInstance(sourceName, QueueViewMBean.class, true);
            assertTrue(sourceView.browseMessages().isEmpty());
            assertEquals("dq ", 1L, sourceView.getDequeueCount());
            assertEquals("size 0", 0L, sourceView.getQueueSize());

            ObjectName targetName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=TEST_MOVE");
            QueueViewMBean targetView = (QueueViewMBean) this.broker.getManagementContext().newProxyInstance(targetName, QueueViewMBean.class, true);
            assertEquals("enq", 1L, targetView.getEnqueueCount());
            assertEquals("size 1", 1L, targetView.getQueueSize());
            assertNotNull(regularReceive("TEST_MOVE"));
            assertEquals("size 0", 0L, targetView.getQueueSize());
        } finally {
            // Neither connection was closed before.
            try {
                if (sendConnection != null) {
                    sendConnection.close();
                }
            } finally {
                ackConnection.close();
            }
        }
    }

    /**
     * Builds an Xid with the same format id and global transaction id as
     * {@code base}, and a copy of its branch qualifier whose first byte is
     * overwritten with 33.
     */
    private static Xid siblingBranchOf(final Xid base) {
        byte[] baseQualifier = base.getBranchQualifier();
        final byte[] siblingQualifier = Arrays.copyOf(baseQualifier, baseQualifier.length);
        siblingQualifier[0] = 33;
        return new Xid() {
            @Override
            public int getFormatId() {
                return base.getFormatId();
            }

            @Override
            public byte[] getGlobalTransactionId() {
                return base.getGlobalTransactionId();
            }

            @Override
            public byte[] getBranchQualifier() {
                return siblingQualifier;
            }
        };
    }

    /**
     * Receives one message from the named queue over a plain connection made
     * from a new factory for the current {@link #connectionUri}.
     *
     * @param str name of the queue to receive from
     * @return whatever {@link #regularReceiveWith} returns for that queue
     */
    private Message regularReceive(String str) throws Exception {
        ActiveMQConnectionFactory plainFactory = new ActiveMQConnectionFactory(connectionUri);
        return regularReceiveWith(plainFactory, str);
    }

    /**
     * Opens a connection from the given factory, receives at most one message
     * from the named queue with a two second timeout on a non-transacted,
     * auto-acknowledge session, and closes the connection again.
     *
     * @param activeMQConnectionFactory factory used to create the connection
     * @param str name of the queue to receive from
     * @return the result of the timed receive
     */
    private Message regularReceiveWith(ActiveMQConnectionFactory activeMQConnectionFactory, String str) throws Exception {
        final Connection connection = activeMQConnectionFactory.createConnection();
        try {
            connection.start();
            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final MessageConsumer consumer = session.createConsumer(session.createQueue(str));
            return consumer.receive(2000L);
        } finally {
            connection.close();
        }
    }

    /**
     * Receives up to {@code i} messages from the named queue on a
     * client-acknowledge session without acknowledging any of them, using a
     * connection whose prefetch is set to {@code i}.
     *
     * @param i maximum number of messages to receive, also used as prefetch
     * @param str name of the queue to drain
     * @return number of messages received before the limit was reached or a
     *         two second receive returned nothing
     */
    private int drainUnack(int i, String str) throws Exception {
        final String uri = this.connectionUri + "?jms.prefetchPolicy.all=" + i;
        final Connection connection = new ActiveMQConnectionFactory(uri).createConnection();
        try {
            connection.start();
            final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            final MessageConsumer consumer = session.createConsumer(session.createQueue(str));
            int drained = 0;
            while (drained < i) {
                if (consumer.receive(2000L) == null) {
                    break;
                }
                drained++;
            }
            consumer.close();
            return drained;
        } finally {
            connection.close();
        }
    }

    /**
     * Browses queue TEST over a connection from {@link #factory} and returns
     * the first message of the browser's enumeration.
     *
     * @return the first browsed message, or {@code null} when the enumeration
     *         has no elements
     */
    private Message regularBrowseFirst() throws Exception {
        Connection connection = this.factory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Enumeration<?> browsed = session.createBrowser(session.createQueue("TEST")).getEnumeration();
            return browsed.hasMoreElements() ? (Message) browsed.nextElement() : null;
        } finally {
            // Single close for every path; the connection used to be closed
            // twice when a message was found.
            connection.close();
        }
    }

    /**
     * Sends {@code i} messages to queue TEST using {@link #factory}.
     *
     * @param i number of messages to send
     */
    protected void sendMessages(int i) throws Exception {
        final ConnectionFactory sender = this.factory;
        sendMessagesWith(sender, i);
    }

    /**
     * Sends {@code i} persistent text messages ("test message 1" ..
     * "test message i") to queue TEST on a non-transacted, auto-acknowledge
     * session.
     *
     * @param connectionFactory factory used to create the sending connection
     * @param i number of messages to send
     * @throws Exception if connecting or sending fails
     */
    protected void sendMessagesWith(ConnectionFactory connectionFactory, int i) throws Exception {
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue("TEST"));
            producer.setDeliveryMode(javax.jms.DeliveryMode.PERSISTENT);
            for (int sent = 1; sent <= i; sent++) {
                LOG.debug("Sending message " + sent + " of " + i);
                producer.send(session.createTextMessage("test message " + sent));
            }
        } finally {
            // Close even when a send fails, so the connection is not leaked.
            connection.close();
        }
    }

    /**
     * Logs every row of the ACTIVEMQ_MSGS table (id, broker sequence id, XID
     * and the unmarshalled message). Does nothing unless the test runs with
     * the JDBC persistence adapter choice.
     *
     * @throws Exception if the query or the unmarshalling fails
     */
    protected void dumpMessages() throws Exception {
        if (this.persistenceAdapterChoice != TestSupport.PersistenceAdapterChoice.JDBC) {
            return;
        }
        OpenWireFormat wireFormat = new OpenWireFormat();
        // getDataSource() is not part of the PersistenceAdapter interface, so the
        // adapter has to be narrowed to the JDBC implementation (same package).
        JDBCPersistenceAdapter jdbcAdapter = (JDBCPersistenceAdapter) this.broker.getPersistenceAdapter();
        // try-with-resources releases result set, statement and connection even
        // when reading a row throws; the result set was never closed before.
        try (java.sql.Connection connection = jdbcAdapter.getDataSource().getConnection();
             PreparedStatement statement = connection.prepareStatement("SELECT ID, MSG, XID FROM ACTIVEMQ_MSGS");
             ResultSet rows = statement.executeQuery()) {
            LOG.info("Messages in broker db...");
            while (rows.next()) {
                long id = rows.getLong(1);
                org.apache.activemq.command.Message message = (org.apache.activemq.command.Message) wireFormat.unmarshal(new ByteSequence(rows.getBytes(2)));
                LOG.info("id: " + id + ", message SeqId: " + message.getMessageId().getBrokerSequenceId() + ", XID:" + rows.getString(3) + ", MSG: " + message);
            }
        }
    }

    /**
     * Creates and starts a broker that deletes all messages on startup.
     *
     * @return the started broker
     */
    protected BrokerService createBroker() throws Exception {
        final boolean deleteAllMessagesOnStartup = true;
        return createBroker(deleteAllMessagesOnStartup);
    }

    /**
     * Stops the current broker, waits until it has stopped, and creates a new
     * one that does not delete messages on startup. The {@link #broker} field
     * is not reassigned here; callers store the returned instance.
     *
     * @return the newly started broker
     */
    protected BrokerService restartBroker() throws Exception {
        final BrokerService running = this.broker;
        running.stop();
        running.waitUntilStopped();
        final boolean deleteAllMessagesOnStartup = false;
        return createBroker(deleteAllMessagesOnStartup);
    }

    /**
     * Builds and starts a persistent broker without advisory support, with a
     * default destination policy whose expire-messages period is 0, using the
     * persistence adapter selected by {@link #persistenceAdapterChoice}. The
     * publishable connect string of the added connector is stored in
     * {@link #connectionUri}.
     *
     * @param z value passed to {@code setDeleteAllMessagesOnStartup}
     * @return the started broker
     */
    protected BrokerService createBroker(boolean z) throws Exception {
        final BrokerService service = new BrokerService();
        service.setAdvisorySupport(false);

        final PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setExpireMessagesPeriod(0L);
        final PolicyMap policies = new PolicyMap();
        policies.setDefaultEntry(defaultPolicy);
        service.setDestinationPolicy(policies);

        service.setDeleteAllMessagesOnStartup(z);
        setPersistenceAdapter(service, this.persistenceAdapterChoice);
        service.setPersistent(true);

        // The connector is added before start(), and its connect string is read right away.
        final String publishedUri = service.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT).getPublishableConnectString();
        this.connectionUri = publishedUri;

        service.start();
        return service;
    }
}
