/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker;

import java.lang.reflect.UndeclaredThrowableException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.transaction.xa.Xid;
import junit.framework.Test;
import junit.textui.TestRunner;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.TransactionContext;
import org.apache.activemq.broker.BrokerRestartTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.jmx.PersistenceAdapterViewMBean;
import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataArrayResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.JMXSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class XARecoveryBrokerTest
extends BrokerRestartTestSupport {
    // SLF4J logger shared by the tests in this class.
    protected static final Logger LOG = LoggerFactory.getLogger(XARecoveryBrokerTest.class);
    // Combination parameter: initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() registers the
    // values FALSE/TRUE under the name "prioritySupport".
    // NOTE(review): where this flag is applied to the broker is not visible in this part of the file - confirm.
    public boolean prioritySupport = true;
    // NOTE(review): not referenced in the visible part of this file; presumably another combination
    // parameter read elsewhere - confirm.
    public boolean keepDurableSubsActive = false;

    /**
     * Leaves four XA transactions in the prepared state, restarts the broker and checks that they are
     * reported by the recover command, by an XA recovery scan and through JMX. The transactions are then
     * resolved heuristically through their MBeans (even indexes committed, odd indexes rolled back) and
     * the destination queue size is expected to equal the number of commits.
     *
     * @throws Exception if any broker, transport or JMX interaction fails
     */
    public void testPreparedJmxView() throws Exception {
        ActiveMQDestination destination = this.createDestination();
        StubConnection connection = this.createConnection();
        ConnectionInfo connectionInfo = this.createConnectionInfo();
        SessionInfo sessionInfo = this.createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = this.createProducerInfo(sessionInfo);
        connection.send((Command)connectionInfo);
        connection.send((Command)sessionInfo);
        connection.send((Command)producerInfo);
        ConsumerInfo consumerInfo = this.createConsumerInfo(sessionInfo, destination);
        connection.send((Command)consumerInfo);

        // One persistent message per transaction; each transaction is prepared but never completed.
        for (int i = 0; i < 4; ++i) {
            XATransactionId txid = this.createXATransaction(sessionInfo);
            connection.send((Command)this.createBeginTransaction(connectionInfo, (TransactionId)txid));
            Message message = this.createMessage(producerInfo, destination);
            message.setPersistent(true);
            message.setTransactionId((TransactionId)txid);
            connection.send((Command)message);
            connection.send((Command)this.createPrepareTransaction(connectionInfo, (TransactionId)txid));
        }

        // Transaction command type 5 is answered with the broker's prepared transactions.
        // NOTE(review): 5 is presumably TransactionInfo.RECOVER - confirm against TransactionInfo.
        Response response = connection.request((Command)new TransactionInfo(connectionInfo.getConnectionId(), null, 5));
        XARecoveryBrokerTest.assertNotNull((Object)response);
        DataArrayResponse dar = (DataArrayResponse)response;
        XARecoveryBrokerTest.assertEquals((int)4, (int)dar.getData().length);

        // The transaction listing of the persistence adapter MBean is only checked for KahaDB.
        if (this.broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
            PersistenceAdapterViewMBean kahadbView = this.getProxyToPersistenceAdapter(this.broker.getPersistenceAdapter().toString());
            String txFromView = kahadbView.getTransactions();
            LOG.info("Tx view from PA:" + txFromView);
            XARecoveryBrokerTest.assertTrue((String)("xid with our dud format in transaction string " + txFromView), (boolean)txFromView.contains("XID:[55,"));
        }

        this.restartBroker();
        connection = this.createConnection();
        connectionInfo = this.createConnectionInfo();
        connection.send((Command)connectionInfo);
        response = connection.request((Command)new TransactionInfo(connectionInfo.getConnectionId(), null, 5));
        XARecoveryBrokerTest.assertNotNull((Object)response);
        dar = (DataArrayResponse)response;
        XARecoveryBrokerTest.assertEquals((int)4, (int)dar.getData().length);

        // Run an XA recovery scan over a dedicated client connection. The connection is closed once the
        // scan is done so that it is not leaked for the remainder of the test.
        ActiveMQConnection recoveryConnection = ActiveMQConnection.makeConnection((String)this.broker.getVmConnectorURI().toString());
        LinkedList<Xid> tracked = new LinkedList<Xid>();
        try {
            TransactionContext transactionContextXAResource = new TransactionContext(recoveryConnection);
            // 0x1000000 == XAResource.TMSTARTRSCAN, 0 == XAResource.TMNOFLAGS
            Xid[] recoveryXids = transactionContextXAResource.recover(0x1000000);
            while (recoveryXids.length > 0) {
                tracked.addAll(Arrays.asList(recoveryXids));
                recoveryXids = transactionContextXAResource.recover(0);
            }
        }
        finally {
            recoveryConnection.close();
        }
        XARecoveryBrokerTest.assertEquals((String)"got 4 via scan loop", (int)4, (int)tracked.size());

        DestinationViewMBean destinationView = this.getProxyToDestination(this.destinationList(destination)[0]);
        XARecoveryBrokerTest.assertEquals((String)"enqueue count does not see prepared", (long)0L, (long)destinationView.getQueueSize());

        // Remember the first transaction so its MBean can be looked up again after completion.
        TransactionId first = (TransactionId)dar.getData()[0];
        int commitCount = 0;
        for (int i = 0; i < 4; ++i) {
            RecoveredXATransactionViewMBean mbean = this.getProxyToPreparedTransactionViewMBean((TransactionId)dar.getData()[i]);
            if (i % 2 == 0) {
                mbean.heuristicCommit();
                ++commitCount;
            } else {
                mbean.heuristicRollback();
            }
        }

        response = connection.request((Command)new TransactionInfo(connectionInfo.getConnectionId(), null, 5));
        XARecoveryBrokerTest.assertNotNull((Object)response);
        dar = (DataArrayResponse)response;
        XARecoveryBrokerTest.assertEquals((int)0, (int)dar.getData().length);
        XARecoveryBrokerTest.assertEquals((String)"enqueue count reflects outcome", (long)commitCount, (long)destinationView.getQueueSize());

        try {
            RecoveredXATransactionViewMBean gone = this.getProxyToPreparedTransactionViewMBean(first);
            gone.heuristicRollback();
            XARecoveryBrokerTest.fail((String)"Expected not found");
        }
        catch (InstanceNotFoundException expected) {
            // Expected: invoking the MBean of an already completed transaction must fail this way.
        }
    }

    /**
     * Creates a JMX proxy for the persistence adapter MBean registered under the given name.
     *
     * @param name adapter name used to build the MBean object name
     * @return proxy implementing {@link PersistenceAdapterViewMBean}
     * @throws MalformedObjectNameException if the object name cannot be built
     * @throws JMSException declared for callers; propagated from the underlying calls
     */
    private PersistenceAdapterViewMBean getProxyToPersistenceAdapter(String name) throws MalformedObjectNameException, JMSException {
        String brokerObjectName = this.broker.getBrokerObjectName().toString();
        ObjectName adapterObjectName = BrokerMBeanSupport.createPersistenceAdapterName((String)brokerObjectName, (String)name);
        Object proxy = this.broker.getManagementContext().newProxyInstance(adapterObjectName, PersistenceAdapterViewMBean.class, true);
        return (PersistenceAdapterViewMBean)proxy;
    }

    /**
     * Creates a JMX proxy for the recovered XA transaction MBean of the given transaction.
     * The broker name is taken from the running broker instead of being hard-coded, matching
     * {@code getProxyToDestination}.
     *
     * @param xid transaction whose string form (encoded for JMX) identifies the MBean
     * @return proxy implementing {@link RecoveredXATransactionViewMBean}
     * @throws MalformedObjectNameException if the object name cannot be built
     * @throws JMSException declared for callers; propagated from the underlying calls
     */
    private RecoveredXATransactionViewMBean getProxyToPreparedTransactionViewMBean(TransactionId xid) throws MalformedObjectNameException, JMSException {
        ObjectName objectName = new ObjectName("org.apache.activemq:type=Broker,brokerName=" + this.broker.getBrokerName() + ",transactionType=RecoveredXaTransaction,xid=" + JMXSupport.encodeObjectNamePart((String)xid.toString()));
        RecoveredXATransactionViewMBean proxy = (RecoveredXATransactionViewMBean)this.broker.getManagementContext().newProxyInstance(objectName, RecoveredXATransactionViewMBean.class, true);
        return proxy;
    }

    /**
     * Creates a JMX proxy for the destination MBean of the given destination on this broker.
     *
     * @param destination destination whose type and physical name (both encoded for JMX) form the object name
     * @return proxy implementing {@link DestinationViewMBean}
     * @throws MalformedObjectNameException if the object name cannot be built
     * @throws JMSException declared for callers; propagated from the underlying calls
     */
    private DestinationViewMBean getProxyToDestination(ActiveMQDestination destination) throws MalformedObjectNameException, JMSException {
        // Assemble the name piece by piece, in the same order the parts are evaluated in a single expression.
        StringBuilder name = new StringBuilder("org.apache.activemq:type=Broker,brokerName=");
        name.append(this.broker.getBrokerName());
        name.append(",destinationType=");
        name.append(JMXSupport.encodeObjectNamePart((String)destination.getDestinationTypeAsString()));
        name.append(",destinationName=");
        name.append(JMXSupport.encodeObjectNamePart((String)destination.getPhysicalName()));
        ObjectName destinationObjectName = new ObjectName(name.toString());
        return (DestinationViewMBean)this.broker.getManagementContext().newProxyInstance(destinationObjectName, DestinationViewMBean.class, true);
    }

    /**
     * Prepares four XA transactions carrying one persistent message each, restarts the broker and
     * checks that nothing is delivered until the recovered transactions are committed, after which
     * the expected number of messages arrives and the DLQ is empty.
     *
     * @throws Exception if any broker or transport interaction fails
     */
    public void testPreparedTransactionRecoveredOnRestart() throws Exception {
        ActiveMQDestination target = this.createDestination();

        // Phase 1: producer and consumer on one connection, four prepared transactions.
        StubConnection link = this.createConnection();
        ConnectionInfo linkInfo = this.createConnectionInfo();
        SessionInfo session = this.createSessionInfo(linkInfo);
        ProducerInfo producer = this.createProducerInfo(session);
        link.send((Command)linkInfo);
        link.send((Command)session);
        link.send((Command)producer);
        ConsumerInfo consumer = this.createConsumerInfo(session, target);
        link.send((Command)consumer);
        int preparedSoFar = 0;
        while (preparedSoFar < 4) {
            XATransactionId xaId = this.createXATransaction(session);
            link.send((Command)this.createBeginTransaction(linkInfo, (TransactionId)xaId));
            Message payload = this.createMessage(producer, target);
            payload.setPersistent(true);
            payload.setTransactionId((TransactionId)xaId);
            link.send((Command)payload);
            link.send((Command)this.createPrepareTransaction(linkInfo, (TransactionId)xaId));
            ++preparedSoFar;
        }
        // Prepared but uncommitted work must not be visible to the consumer.
        XARecoveryBrokerTest.assertNull((Object)this.receiveMessage(link));
        this.assertNoMessagesLeft(link);
        link.request((Command)this.closeConnectionInfo(linkInfo));

        // Phase 2: after the restart still nothing is delivered, but four transactions are recoverable.
        this.restartBroker();
        link = this.createConnection();
        linkInfo = this.createConnectionInfo();
        session = this.createSessionInfo(linkInfo);
        link.send((Command)linkInfo);
        link.send((Command)session);
        consumer = this.createConsumerInfo(session, target);
        link.send((Command)consumer);
        XARecoveryBrokerTest.assertNull((Object)this.receiveMessage(link));
        this.assertNoMessagesLeft(link);
        Response recoverReply = link.request((Command)new TransactionInfo(linkInfo.getConnectionId(), null, 5));
        XARecoveryBrokerTest.assertNotNull((Object)recoverReply);
        DataArrayResponse recovered = (DataArrayResponse)recoverReply;
        XARecoveryBrokerTest.assertEquals((int)4, (int)recovered.getData().length);
        link.request((Command)this.closeConnectionInfo(linkInfo));

        // Phase 3: a fresh connection commits every recovered transaction and then drains the messages.
        link = this.createConnection();
        linkInfo = this.createConnectionInfo();
        session = this.createSessionInfo(linkInfo);
        link.send((Command)linkInfo);
        link.send((Command)session);
        consumer = this.createConsumerInfo(session, target);
        link.send((Command)consumer);
        for (Object entry : recovered.getData()) {
            TransactionId recoveredId = (TransactionId)entry;
            LOG.info("commit: " + recoveredId);
            link.request((Command)this.createCommitTransaction2Phase(linkInfo, recoveredId));
        }
        int expected = this.expectedMessageCount(4, target);
        int index = 0;
        while (index < expected) {
            Message delivered = this.receiveMessage(link, TimeUnit.SECONDS.toMillis(10L));
            LOG.info("received: " + delivered);
            XARecoveryBrokerTest.assertNotNull((String)("Got non null message: " + index), (Object)delivered);
            ++index;
        }
        this.assertNoMessagesLeft(link);
        this.assertEmptyDLQ();
    }

    /**
     * Asserts through JMX that the "ActiveMQ.DLQ" queue holds no messages and never had any enqueued.
     * An {@link UndeclaredThrowableException} wrapping an {@link InstanceNotFoundException} is tolerated;
     * any other wrapped failure is rethrown.
     *
     * @throws Exception if the JMX lookup or one of the assertions fails
     */
    private void assertEmptyDLQ() throws Exception {
        try {
            DestinationViewMBean dlqView = this.getProxyToDestination((ActiveMQDestination)new ActiveMQQueue("ActiveMQ.DLQ"));
            XARecoveryBrokerTest.assertEquals((String)"nothing on dlq", (long)0L, (long)dlqView.getQueueSize());
            XARecoveryBrokerTest.assertEquals((String)"nothing added to dlq", (long)0L, (long)dlqView.getEnqueueCount());
        }
        catch (UndeclaredThrowableException maybeOk) {
            // A missing DLQ MBean is acceptable; everything else is a real failure.
            boolean mbeanMissing = maybeOk.getUndeclaredThrowable() instanceof InstanceNotFoundException;
            if (!mbeanMissing) {
                throw maybeOk;
            }
        }
    }

    /**
     * Prepares four XA transactions, then sends one additional non-transactional persistent message.
     * After a broker restart only the non-transactional message is expected to be delivered; once the
     * four recovered transactions are committed a new consumer is expected to receive the count
     * computed for five messages, and the DLQ must be empty.
     *
     * @throws Exception if any broker or transport interaction fails
     */
    public void testPreparedInterleavedTransactionRecoveredOnRestart() throws Exception {
        ActiveMQDestination target = this.createDestination();

        // Phase 1: four prepared transactions followed by a plain persistent send.
        StubConnection link = this.createConnection();
        ConnectionInfo linkInfo = this.createConnectionInfo();
        SessionInfo session = this.createSessionInfo(linkInfo);
        ProducerInfo producer = this.createProducerInfo(session);
        link.send((Command)linkInfo);
        link.send((Command)session);
        link.send((Command)producer);
        ConsumerInfo consumer = this.createConsumerInfo(session, target);
        link.send((Command)consumer);
        int preparedSoFar = 0;
        while (preparedSoFar < 4) {
            XATransactionId xaId = this.createXATransaction(session);
            link.send((Command)this.createBeginTransaction(linkInfo, (TransactionId)xaId));
            Message txPayload = this.createMessage(producer, target);
            txPayload.setPersistent(true);
            txPayload.setTransactionId((TransactionId)xaId);
            link.send((Command)txPayload);
            link.send((Command)this.createPrepareTransaction(linkInfo, (TransactionId)xaId));
            ++preparedSoFar;
        }
        XARecoveryBrokerTest.assertNull((Object)this.receiveMessage(link));
        this.assertNoMessagesLeft(link);
        Message plainPayload = this.createMessage(producer, target);
        plainPayload.setPersistent(true);
        link.request((Command)plainPayload);
        link.request((Command)this.closeConnectionInfo(linkInfo));

        // Phase 2: after the restart only the non-transactional message is consumable.
        this.restartBroker();
        link = this.createConnection();
        linkInfo = this.createConnectionInfo();
        session = this.createSessionInfo(linkInfo);
        link.send((Command)linkInfo);
        link.send((Command)session);
        consumer = this.createConsumerInfo(session, target);
        link.send((Command)consumer);
        int expected = this.expectedMessageCount(1, target);
        int index = 0;
        while (index < expected) {
            Message delivered = this.receiveMessage(link, TimeUnit.SECONDS.toMillis(10L));
            LOG.info("received: " + delivered);
            XARecoveryBrokerTest.assertNotNull((String)"got non tx message after prepared", (Object)delivered);
            ++index;
        }
        XARecoveryBrokerTest.assertNull((Object)this.receiveMessage(link));
        this.assertNoMessagesLeft(link);
        Response recoverReply = link.request((Command)new TransactionInfo(linkInfo.getConnectionId(), null, 5));
        XARecoveryBrokerTest.assertNotNull((Object)recoverReply);
        DataArrayResponse recovered = (DataArrayResponse)recoverReply;
        XARecoveryBrokerTest.assertEquals((int)4, (int)recovered.getData().length);
        link.request((Command)this.closeConnectionInfo(linkInfo));

        // Phase 3: commit the recovered transactions first, only then attach a consumer.
        link = this.createConnection();
        linkInfo = this.createConnectionInfo();
        session = this.createSessionInfo(linkInfo);
        link.send((Command)linkInfo);
        link.send((Command)session);
        for (Object entry : recovered.getData()) {
            TransactionId recoveredId = (TransactionId)entry;
            LOG.info("commit: " + recoveredId);
            link.request((Command)this.createCommitTransaction2Phase(linkInfo, recoveredId));
        }
        consumer = this.createConsumerInfo(session, target);
        link.send((Command)consumer);
        expected = this.expectedMessageCount(5, target);
        index = 0;
        while (index < expected) {
            Message delivered = this.receiveMessage(link, TimeUnit.SECONDS.toMillis(10L));
            LOG.info("received: " + delivered);
            XARecoveryBrokerTest.assertNotNull((String)("Got non null message: " + index), (Object)delivered);
            ++index;
        }
        this.assertNoMessagesLeft(link);
        this.assertEmptyDLQ();
    }

    /**
     * Topic variant of the prepared-transaction recovery test: a durable subscriber ("durable") on
     * topic "TryTopic" must not see the messages of four prepared XA transactions before or after a
     * broker restart, and must receive them once the recovered transactions are committed.
     *
     * @throws Exception if any broker or transport interaction fails
     */
    public void testTopicPreparedTransactionRecoveredOnRestart() throws Exception {
        ActiveMQTopic destination = new ActiveMQTopic("TryTopic");
        StubConnection connection = this.createConnection();
        ConnectionInfo connectionInfo = this.createConnectionInfo();
        connectionInfo.setClientId("durable");
        SessionInfo sessionInfo = this.createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = this.createProducerInfo(sessionInfo);
        connection.send((Command)connectionInfo);
        connection.send((Command)sessionInfo);
        connection.send((Command)producerInfo);
        ConsumerInfo consumerInfo = this.createConsumerInfo(sessionInfo, (ActiveMQDestination)destination);
        consumerInfo.setSubscriptionName("durable");
        connection.send((Command)consumerInfo);

        // One persistent message per transaction; each transaction is prepared but not committed.
        for (int i = 0; i < 4; ++i) {
            XATransactionId txid = this.createXATransaction(sessionInfo);
            connection.send((Command)this.createBeginTransaction(connectionInfo, (TransactionId)txid));
            Message message = this.createMessage(producerInfo, (ActiveMQDestination)destination);
            message.setPersistent(true);
            message.setTransactionId((TransactionId)txid);
            connection.send((Command)message);
            connection.send((Command)this.createPrepareTransaction(connectionInfo, (TransactionId)txid));
        }
        XARecoveryBrokerTest.assertNull((Object)this.receiveMessage(connection));
        this.assertNoMessagesLeft(connection);
        connection.request((Command)this.closeConnectionInfo(connectionInfo));

        this.restartBroker();

        // Re-attach the durable subscriber: still nothing to receive, four transactions to recover.
        connection = this.createConnection();
        connectionInfo = this.createConnectionInfo();
        connectionInfo.setClientId("durable");
        sessionInfo = this.createSessionInfo(connectionInfo);
        connection.send((Command)connectionInfo);
        connection.send((Command)sessionInfo);
        consumerInfo = this.createConsumerInfo(sessionInfo, (ActiveMQDestination)destination);
        consumerInfo.setSubscriptionName("durable");
        connection.send((Command)consumerInfo);
        XARecoveryBrokerTest.assertNull((Object)this.receiveMessage(connection));
        this.assertNoMessagesLeft(connection);
        Response response = connection.request((Command)new TransactionInfo(connectionInfo.getConnectionId(), null, 5));
        XARecoveryBrokerTest.assertNotNull((Object)response);
        DataArrayResponse dar = (DataArrayResponse)response;
        XARecoveryBrokerTest.assertEquals((int)4, (int)dar.getData().length);
        connection.request((Command)this.closeConnectionInfo(connectionInfo));

        // Commit the recovered transactions on a fresh connection and drain the subscription.
        connection = this.createConnection();
        connectionInfo = this.createConnectionInfo();
        connectionInfo.setClientId("durable");
        sessionInfo = this.createSessionInfo(connectionInfo);
        connection.send((Command)connectionInfo);
        connection.send((Command)sessionInfo);
        consumerInfo = this.createConsumerInfo(sessionInfo, (ActiveMQDestination)destination);
        consumerInfo.setSubscriptionName("durable");
        connection.send((Command)consumerInfo);
        for (int i = 0; i < dar.getData().length; ++i) {
            connection.request((Command)this.createCommitTransaction2Phase(connectionInfo, (TransactionId)dar.getData()[i]));
        }
        // Compute the expected count once, as the sibling tests do, instead of on every loop iteration.
        int countToReceive = this.expectedMessageCount(4, (ActiveMQDestination)destination);
        for (int i = 0; i < countToReceive; ++i) {
            Message m = this.receiveMessage(connection, TimeUnit.SECONDS.toMillis(10L));
            XARecoveryBrokerTest.assertNotNull((String)("Got non null message: " + i), (Object)m);
        }
        this.assertNoMessagesLeft(connection);
    }

    /**
     * Sends four persistent messages inside a single XA transaction, commits it one-phase, restarts
     * the broker and checks that the committed messages are still delivered afterwards.
     *
     * @throws Exception if any broker or transport interaction fails
     */
    public void testQueuePersistentCommitedMessagesNotLostOnRestart() throws Exception {
        ActiveMQDestination destination = this.createDestination();
        StubConnection connection = this.createConnection();
        ConnectionInfo connectionInfo = this.createConnectionInfo();
        SessionInfo sessionInfo = this.createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = this.createProducerInfo(sessionInfo);
        connection.send((Command)connectionInfo);
        connection.send((Command)sessionInfo);
        connection.send((Command)producerInfo);

        // All four messages belong to the same transaction.
        XATransactionId txid = this.createXATransaction(sessionInfo);
        connection.send((Command)this.createBeginTransaction(connectionInfo, (TransactionId)txid));
        for (int i = 0; i < 4; ++i) {
            Message message = this.createMessage(producerInfo, destination);
            message.setPersistent(true);
            message.setTransactionId((TransactionId)txid);
            connection.send((Command)message);
        }
        connection.send((Command)this.createCommitTransaction1Phase(connectionInfo, (TransactionId)txid));
        connection.request((Command)this.closeConnectionInfo(connectionInfo));

        this.restartBroker();

        connection = this.createConnection();
        connectionInfo = this.createConnectionInfo();
        sessionInfo = this.createSessionInfo(connectionInfo);
        connection.send((Command)connectionInfo);
        connection.send((Command)sessionInfo);
        ConsumerInfo consumerInfo = this.createConsumerInfo(sessionInfo, destination);
        connection.send((Command)consumerInfo);
        // Compute the expected count once, as the sibling tests do, instead of on every loop iteration.
        int countToReceive = this.expectedMessageCount(4, destination);
        for (int i = 0; i < countToReceive; ++i) {
            Message m = this.receiveMessage(connection);
            XARecoveryBrokerTest.assertNotNull((String)("Got non null message: " + i), (Object)m);
        }
        this.assertNoMessagesLeft(connection);
    }

    /**
     * Sends four persistent messages inside a single XA transaction, prepares and commits it
     * two-phase, restarts the broker and checks that the committed messages are still delivered.
     *
     * @throws Exception if any broker or transport interaction fails
     */
    public void testQueuePersistentCommited2PhaseMessagesNotLostOnRestart() throws Exception {
        ActiveMQDestination destination = this.createDestination();
        StubConnection connection = this.createConnection();
        ConnectionInfo connectionInfo = this.createConnectionInfo();
        SessionInfo sessionInfo = this.createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = this.createProducerInfo(sessionInfo);
        connection.send((Command)connectionInfo);
        connection.send((Command)sessionInfo);
        connection.send((Command)producerInfo);

        // All four messages belong to the same transaction.
        XATransactionId txid = this.createXATransaction(sessionInfo);
        connection.send((Command)this.createBeginTransaction(connectionInfo, (TransactionId)txid));
        for (int i = 0; i < 4; ++i) {
            Message message = this.createMessage(producerInfo, destination);
            message.setPersistent(true);
            message.setTransactionId((TransactionId)txid);
            connection.send((Command)message);
        }
        // The prepare is issued as a request (a response is awaited) before the commit is sent.
        connection.request((Command)this.createPrepareTransaction(connectionInfo, (TransactionId)txid));
        connection.send((Command)this.createCommitTransaction2Phase(connectionInfo, (TransactionId)txid));
        connection.request((Command)this.closeConnectionInfo(connectionInfo));

        this.restartBroker();

        connection = this.createConnection();
        connectionInfo = this.createConnectionInfo();
        sessionInfo = this.createSessionInfo(connectionInfo);
        connection.send((Command)connectionInfo);
        connection.send((Command)sessionInfo);
        ConsumerInfo consumerInfo = this.createConsumerInfo(sessionInfo, destination);
        connection.send((Command)consumerInfo);
        // Compute the expected count once, as the sibling tests do, instead of on every loop iteration.
        int countToReceive = this.expectedMessageCount(4, destination);
        for (int i = 0; i < countToReceive; ++i) {
            Message m = this.receiveMessage(connection);
            XARecoveryBrokerTest.assertNotNull((String)("Got non null message: " + i), (Object)m);
        }
        this.assertNoMessagesLeft(connection);
    }

    /**
     * Sends four persistent messages outside any transaction, consumes and acknowledges them inside
     * an XA transaction that is committed one-phase, restarts the broker and checks that no message
     * is redelivered.
     *
     * @throws Exception if any broker or transport interaction fails
     */
    public void testQueuePersistentCommitedAcksNotLostOnRestart() throws Exception {
        ActiveMQDestination target = this.createDestination();
        StubConnection link = this.createConnection();
        ConnectionInfo linkInfo = this.createConnectionInfo();
        SessionInfo session = this.createSessionInfo(linkInfo);
        ProducerInfo producer = this.createProducerInfo(session);
        link.send((Command)linkInfo);
        link.send((Command)session);
        link.send((Command)producer);
        int sent = 0;
        while (sent < 4) {
            Message payload = this.createMessage(producer, target);
            payload.setPersistent(true);
            link.send((Command)payload);
            ++sent;
        }

        // Consume within a transaction: one consumer and one ack (covering 4 messages) per destination.
        XATransactionId xaId = this.createXATransaction(session);
        link.send((Command)this.createBeginTransaction(linkInfo, (TransactionId)xaId));
        Message lastReceived = null;
        for (ActiveMQDestination dest : this.destinationList(target)) {
            ConsumerInfo destConsumer = this.createConsumerInfo(session, dest);
            link.send((Command)destConsumer);
            int received = 0;
            while (received < 4) {
                lastReceived = this.receiveMessage(link);
                XARecoveryBrokerTest.assertNotNull((Object)lastReceived);
                ++received;
            }
            // NOTE(review): ack type 2 is presumably the standard ack type - confirm against MessageAck.
            MessageAck txAck = this.createAck(destConsumer, lastReceived, 4, (byte)2);
            txAck.setTransactionId((TransactionId)xaId);
            link.send((Command)txAck);
        }
        link.request((Command)this.createCommitTransaction1Phase(linkInfo, (TransactionId)xaId));

        this.restartBroker();

        // After the restart a new consumer must find the destination empty.
        link = this.createConnection();
        linkInfo = this.createConnectionInfo();
        session = this.createSessionInfo(linkInfo);
        link.send((Command)linkInfo);
        link.send((Command)session);
        ConsumerInfo consumerAfterRestart = this.createConsumerInfo(session, target);
        link.send((Command)consumerAfterRestart);
        this.assertNoMessagesLeft(link);
        lastReceived = this.receiveMessage(link);
        XARecoveryBrokerTest.assertNull((Object)lastReceived);
    }

    /**
     * Sends four persistent messages, acknowledges them inside an XA transaction that is only
     * prepared, and restarts the broker. The prepared transaction must be recoverable, the messages
     * must not be redelivered, and the destination's dequeue count must move from 0 to 4 only once
     * the recovered transaction is committed.
     *
     * @throws Exception if any broker, transport or JMX interaction fails
     */
    public void testQueuePersistentPreparedAcksNotLostOnRestart() throws Exception {
        ActiveMQDestination target = this.createDestination();
        StubConnection link = this.createConnection();
        ConnectionInfo linkInfo = this.createConnectionInfo();
        SessionInfo session = this.createSessionInfo(linkInfo);
        ProducerInfo producer = this.createProducerInfo(session);
        link.send((Command)linkInfo);
        link.send((Command)session);
        link.send((Command)producer);
        int sent = 0;
        while (sent < 4) {
            Message payload = this.createMessage(producer, target);
            payload.setPersistent(true);
            link.send((Command)payload);
            ++sent;
        }

        // Consume within a transaction: one consumer and one ack (covering 4 messages) per destination.
        XATransactionId xaId = this.createXATransaction(session);
        link.send((Command)this.createBeginTransaction(linkInfo, (TransactionId)xaId));
        Message lastReceived = null;
        for (ActiveMQDestination dest : this.destinationList(target)) {
            ConsumerInfo destConsumer = this.createConsumerInfo(session, dest);
            link.send((Command)destConsumer);
            int received = 0;
            while (received < 4) {
                lastReceived = this.receiveMessage(link);
                XARecoveryBrokerTest.assertNotNull((Object)lastReceived);
                ++received;
            }
            // NOTE(review): ack type 2 is presumably the standard ack type - confirm against MessageAck.
            MessageAck txAck = this.createAck(destConsumer, lastReceived, 4, (byte)2);
            txAck.setTransactionId((TransactionId)xaId);
            link.send((Command)txAck);
        }
        // Prepare only; the commit happens after the restart.
        link.request((Command)this.createPrepareTransaction(linkInfo, (TransactionId)xaId));

        this.restartBroker();

        // The single prepared transaction must be reported back with the same id.
        link = this.createConnection();
        linkInfo = this.createConnectionInfo();
        link.send((Command)linkInfo);
        TransactionInfo recoverCommand = new TransactionInfo(linkInfo.getConnectionId(), null, 5);
        DataArrayResponse recovered = (DataArrayResponse)link.request((Command)recoverCommand);
        XARecoveryBrokerTest.assertEquals((String)"there is a prepared tx", (int)1, (int)recovered.getData().length);
        XARecoveryBrokerTest.assertEquals((String)"it matches", (Object)xaId, (Object)recovered.getData()[0]);

        // A new consumer must not get the messages whose acks are prepared.
        session = this.createSessionInfo(linkInfo);
        link.send((Command)session);
        ConsumerInfo consumerAfterRestart = this.createConsumerInfo(session, target);
        link.send((Command)consumerAfterRestart);
        lastReceived = this.receiveMessage(link);
        XARecoveryBrokerTest.assertNull((Object)lastReceived);
        this.assertNoMessagesLeft(link);

        DestinationViewMBean view = this.getProxyToDestination(this.destinationList(target)[0]);
        XARecoveryBrokerTest.assertEquals((String)"enqueue count does not see prepared acks", (long)0L, (long)view.getQueueSize());
        XARecoveryBrokerTest.assertEquals((String)"dequeue count does not see prepared acks", (long)0L, (long)view.getDequeueCount());

        // Committing the recovered transaction clears it and is reflected in the dequeue count.
        link.request((Command)this.createCommitTransaction2Phase(linkInfo, (TransactionId)xaId));
        recovered = (DataArrayResponse)link.request((Command)recoverCommand);
        XARecoveryBrokerTest.assertEquals((String)"there are no prepared tx", (int)0, (int)recovered.getData().length);
        XARecoveryBrokerTest.assertEquals((String)"enqueue count does not see commited acks", (long)0L, (long)view.getQueueSize());
        XARecoveryBrokerTest.assertEquals((String)"dequeue count does not see commited acks", (long)4L, (long)view.getDequeueCount());
    }

    /**
     * Registers the values {@code false} and {@code true} for the "prioritySupport" combination used by
     * {@code testTopicPersistentPreparedAcksNotLostOnRestart}.
     */
    public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() {
        Boolean[] prioritySupportValues = new Boolean[]{Boolean.FALSE, Boolean.TRUE};
        this.addCombinationValues("prioritySupport", prioritySupportValues);
    }

    /**
     * Topic variant of the prepared-ack recovery test: a durable subscriber ("durable") on topic
     * "TryTopic" acknowledges the published messages inside an XA transaction that is only prepared.
     * After a broker restart the transaction must be recoverable, the messages must not be
     * redelivered to the subscriber, and committing the transaction must leave nothing to recover.
     *
     * @throws Exception if any broker or transport interaction fails
     */
    public void testTopicPersistentPreparedAcksNotLostOnRestart() throws Exception {
        ActiveMQTopic destination = new ActiveMQTopic("TryTopic");
        StubConnection connection = this.createConnection();
        ConnectionInfo connectionInfo = this.createConnectionInfo();
        connectionInfo.setClientId("durable");
        SessionInfo sessionInfo = this.createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = this.createProducerInfo(sessionInfo);
        connection.send((Command)connectionInfo);
        connection.send((Command)sessionInfo);
        connection.send((Command)producerInfo);
        ConsumerInfo consumerInfo = this.createConsumerInfo(sessionInfo, (ActiveMQDestination)destination);
        consumerInfo.setSubscriptionName("durable");
        connection.send((Command)consumerInfo);

        // Single source of truth for the number of published messages.
        final int numMessages = 4;
        for (int i = 0; i < numMessages; ++i) {
            Message message = this.createMessage(producerInfo, (ActiveMQDestination)destination);
            message.setPersistent(true);
            connection.send((Command)message);
        }

        // Receive everything inside a transaction and acknowledge it with one ack on the last message.
        XATransactionId txid = this.createXATransaction(sessionInfo);
        connection.send((Command)this.createBeginTransaction(connectionInfo, (TransactionId)txid));
        int messageCount = this.expectedMessageCount(numMessages, (ActiveMQDestination)destination);
        Message m = null;
        for (int i = 0; i < messageCount; ++i) {
            m = this.receiveMessage(connection);
            XARecoveryBrokerTest.assertNotNull((String)("unexpected null on: " + i), (Object)m);
        }
        // NOTE(review): ack type 2 is presumably the standard ack type - confirm against MessageAck.
        MessageAck ack = this.createAck(consumerInfo, m, messageCount, (byte)2);
        ack.setTransactionId((TransactionId)txid);
        connection.send((Command)ack);
        // Prepare only; the commit happens after the restart.
        connection.request((Command)this.createPrepareTransaction(connectionInfo, (TransactionId)txid));

        this.restartBroker();

        // The single prepared transaction must be reported back with the same id.
        connection = this.createConnection();
        connectionInfo = this.createConnectionInfo();
        connectionInfo.setClientId("durable");
        connection.send((Command)connectionInfo);
        TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, 5);
        DataArrayResponse dataArrayResponse = (DataArrayResponse)connection.request((Command)recoverInfo);
        XARecoveryBrokerTest.assertEquals((String)"there is a prepared tx", (int)1, (int)dataArrayResponse.getData().length);
        XARecoveryBrokerTest.assertEquals((String)"it matches", (Object)txid, (Object)dataArrayResponse.getData()[0]);

        // The re-attached durable subscriber must not get the messages whose acks are prepared.
        sessionInfo = this.createSessionInfo(connectionInfo);
        connection.send((Command)sessionInfo);
        consumerInfo = this.createConsumerInfo(sessionInfo, (ActiveMQDestination)destination);
        consumerInfo.setSubscriptionName("durable");
        connection.send((Command)consumerInfo);
        m = this.receiveMessage(connection);
        XARecoveryBrokerTest.assertNull((Object)m);
        this.assertNoMessagesLeft(connection);

        // Committing the recovered transaction leaves nothing to recover.
        connection.request((Command)this.createCommitTransaction2Phase(connectionInfo, (TransactionId)txid));
        dataArrayResponse = (DataArrayResponse)connection.request((Command)recoverInfo);
        XARecoveryBrokerTest.assertEquals((String)"there are no prepared tx", (int)0, (int)dataArrayResponse.getData().length);
    }

    /**
     * Variant of the prepared-ack restart test with two durable subscriptions on the same topic.
     * Subscription {@code "sub"} acks all messages in an XA transaction that is prepared before
     * the broker restarts; subscription {@code "subX"} is registered and removed again before
     * any message is sent. After the restart the test expects the prepared transaction to be
     * recoverable, {@code "sub"} to receive nothing, and {@code "subX"} to receive every message.
     *
     * @throws Exception if any broker interaction or assertion fails
     */
    public void testTopicPersistentPreparedAcksNotLostOnRestartForNSubs() throws Exception {
        // Typed as the base destination so the helper calls need no casts.
        ActiveMQDestination destination = new ActiveMQTopic("TryTopic");

        // Register connection, session and producer.
        StubConnection connection = createConnection();
        ConnectionInfo connectionInfo = createConnectionInfo();
        connectionInfo.setClientId("durable");
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        connection.send(producerInfo);

        // First durable subscription stays attached.
        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
        consumerInfo.setSubscriptionName("sub");
        connection.send(consumerInfo);

        // Second durable subscription is created and its consumer removed before publishing.
        ConsumerInfo consumerInfoX = createConsumerInfo(sessionInfo, destination);
        consumerInfoX.setSubscriptionName("subX");
        connection.send(consumerInfoX);
        connection.send(consumerInfoX.createRemoveCommand());

        // Publish the persistent messages.
        int numMessages = 4;
        for (int i = 0; i < numMessages; i++) {
            Message outbound = createMessage(producerInfo, destination);
            outbound.setPersistent(true);
            connection.send(outbound);
        }

        // Consume everything on "sub" inside an XA transaction and leave it prepared.
        XATransactionId txid = createXATransaction(sessionInfo);
        connection.send(createBeginTransaction(connectionInfo, txid));

        int messageCount = expectedMessageCount(numMessages, destination);
        Message m = null;
        for (int i = 0; i < messageCount; i++) {
            m = receiveMessage(connection);
            XARecoveryBrokerTest.assertNotNull("unexpected null on: " + i, m);
        }
        MessageAck ack = createAck(consumerInfo, m, messageCount, MessageAck.STANDARD_ACK_TYPE);
        ack.setTransactionId(txid);
        connection.send(ack);
        connection.request(createPrepareTransaction(connectionInfo, txid));

        restartBroker();

        // Reconnect with the same client id and ask the broker for prepared transactions.
        connection = createConnection();
        connectionInfo = createConnectionInfo();
        connectionInfo.setClientId("durable");
        connection.send(connectionInfo);

        TransactionInfo recoverInfo =
                new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER);
        DataArrayResponse dataArrayResponse = (DataArrayResponse) connection.request(recoverInfo);
        XARecoveryBrokerTest.assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length);
        XARecoveryBrokerTest.assertEquals("it matches", txid, dataArrayResponse.getData()[0]);

        // "sub" must see nothing while its acks are prepared.
        sessionInfo = createSessionInfo(connectionInfo);
        connection.send(sessionInfo);
        consumerInfo = createConsumerInfo(sessionInfo, destination);
        consumerInfo.setSubscriptionName("sub");
        connection.send(consumerInfo);

        m = receiveMessage(connection);
        XARecoveryBrokerTest.assertNull(m);
        assertNoMessagesLeft(connection);

        // "subX" never acked anything, so every message is expected for it.
        consumerInfoX = createConsumerInfo(sessionInfo, destination);
        consumerInfoX.setSubscriptionName("subX");
        connection.send(consumerInfoX);
        for (int i = 0; i < messageCount; i++) {
            m = receiveMessage(connection);
            XARecoveryBrokerTest.assertNotNull("unexpected null for subX on: " + i, m);
        }

        // Complete the transaction and confirm it is no longer reported as prepared.
        connection.request(createCommitTransaction2Phase(connectionInfo, txid));
        dataArrayResponse = (DataArrayResponse) connection.request(recoverInfo);
        XARecoveryBrokerTest.assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
    }

    /**
     * Prepares an XA transaction that acks a batch of persistent messages, restarts the broker,
     * and then checks that:
     * <ul>
     *   <li>the prepared transaction is returned by a recover request,</li>
     *   <li>no message is delivered while the transaction is prepared,</li>
     *   <li>after a rollback the original batch can be received and acked in a new
     *       transaction, and</li>
     *   <li>the messages sent just before the rollback can be received afterwards.</li>
     * </ul>
     *
     * @throws Exception if any broker interaction or assertion fails
     */
    public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
        ActiveMQDestination destination = createDestination();

        // Register connection, session and producer, then send the first batch.
        StubConnection connection = createConnection();
        ConnectionInfo connectionInfo = createConnectionInfo();
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        connection.send(producerInfo);

        int numMessages = 4;
        for (int i = 0; i < numMessages; i++) {
            Message outbound = createMessage(producerInfo, destination);
            outbound.setPersistent(true);
            connection.send(outbound);
        }

        // Ack the batch on every destination inside one XA transaction, then prepare it.
        XATransactionId txid = createXATransaction(sessionInfo);
        connection.send(createBeginTransaction(connectionInfo, txid));

        ConsumerInfo consumerInfo;
        Message message = null;
        for (ActiveMQDestination dest : destinationList(destination)) {
            consumerInfo = createConsumerInfo(sessionInfo, dest);
            connection.send(consumerInfo);
            for (int i = 0; i < numMessages; i++) {
                message = receiveMessage(connection);
                XARecoveryBrokerTest.assertNotNull(message);
            }
            MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
            ack.setTransactionId(txid);
            connection.send(ack);
        }
        connection.request(createPrepareTransaction(connectionInfo, txid));

        restartBroker();

        // Reconnect and ask the broker for prepared transactions.
        connection = createConnection();
        connectionInfo = createConnectionInfo();
        connection.send(connectionInfo);

        TransactionInfo recoverInfo =
                new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER);
        DataArrayResponse dataArrayResponse = (DataArrayResponse) connection.request(recoverInfo);
        XARecoveryBrokerTest.assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length);
        XARecoveryBrokerTest.assertEquals("it matches", txid, dataArrayResponse.getData()[0]);

        // A fresh consumer must see nothing while the acks are still prepared.
        sessionInfo = createSessionInfo(connectionInfo);
        connection.send(sessionInfo);
        consumerInfo = createConsumerInfo(sessionInfo, destination);
        connection.send(consumerInfo);

        message = receiveMessage(connection);
        XARecoveryBrokerTest.assertNull(message);
        assertNoMessagesLeft(connection);
        connection.request(consumerInfo.createRemoveCommand());

        // Send twice as many additional messages before rolling back.
        LOG.info("Send some more before the rollback");
        producerInfo = createProducerInfo(sessionInfo);
        connection.send(producerInfo);
        for (int i = 0; i < numMessages * 2; i++) {
            message = createMessage(producerInfo, destination);
            message.setPersistent(true);
            connection.send(message);
        }

        // Same log text as above; left unchanged since it is runtime output.
        LOG.info("Send some more before the rollback");
        connection.request(createRollbackTransaction(connectionInfo, txid));

        // Consume the first batch in a new transaction.
        LOG.info("new tx for redelivery");
        txid = createXATransaction(sessionInfo);
        connection.send(createBeginTransaction(connectionInfo, txid));

        HashSet<ConsumerInfo> consumerInfoSet = new HashSet<ConsumerInfo>();
        for (ActiveMQDestination dest : destinationList(destination)) {
            consumerInfo = createConsumerInfo(sessionInfo, dest);
            connection.send(consumerInfo);
            consumerInfoSet.add(consumerInfo);

            LOG.info("consume messages for: " + dest.getPhysicalName() + " " + consumerInfo.getConsumerId());
            for (int i = 0; i < numMessages; i++) {
                message = receiveMessage(connection);
                XARecoveryBrokerTest.assertNotNull("unexpected null on:" + i, message);
                LOG.info(dest.getPhysicalName() + " ID: " + message.getMessageId());
            }
            MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
            ack.setTransactionId(txid);
            connection.request(ack);

            // Drain anything else already dispatched to this connection; it is only logged.
            while ((message = receiveMessage(connection)) != null) {
                LOG.info("Pre fetched and unwanted: " + message.getMessageId() + " on " + message.getDestination().getPhysicalName());
            }
        }

        LOG.info("commit..");
        connection.request(createCommitTransaction1Phase(connectionInfo, txid));

        // Remove the consumers used for the redelivery transaction.
        for (ConsumerInfo info : consumerInfoSet) {
            connection.request(info.createRemoveCommand());
        }
        consumerInfoSet.clear();

        dataArrayResponse = (DataArrayResponse) connection.request(recoverInfo);
        XARecoveryBrokerTest.assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);

        LOG.info("consume additional messages");
        while ((message = receiveMessage(connection)) != null) {
            LOG.info("Pre fetched and unwanted: " + message.getMessageId() + " on " + message.getDestination().getPhysicalName());
        }

        // Receive the additional messages, acking each one individually outside any transaction.
        for (ActiveMQDestination dest : destinationList(destination)) {
            consumerInfo = createConsumerInfo(sessionInfo, dest);
            connection.request(consumerInfo);

            LOG.info("consume additional messages for: " + dest.getPhysicalName() + " " + consumerInfo.getConsumerId());
            for (int i = 0; i < numMessages * 2; i++) {
                message = receiveMessage(connection);
                XARecoveryBrokerTest.assertNotNull("unexpected null on:" + i, message);
                LOG.info(dest.getPhysicalName() + " ID: " + message.getMessageId());
                MessageAck ack = createAck(consumerInfo, message, 1, MessageAck.STANDARD_ACK_TYPE);
                connection.request(ack);
            }
            connection.request(consumerInfo.createRemoveCommand());
        }
        assertNoMessagesLeft(connection);
    }

    /**
     * Prepares an XA transaction that acks a single persistent message per consumer, with each
     * consumer's prefetch size set to that same count, then drops and re-creates the connection
     * without restarting the broker. The test expects the prepared transaction to be
     * recoverable, no delivery while it is prepared, and redelivery after a rollback so the
     * message can be acked and committed in a new transaction.
     *
     * @throws Exception if any broker interaction or assertion fails
     */
    public void testQueuePersistentPreparedAcksAvailableAfterRollbackPrefetchOne() throws Exception {
        ActiveMQDestination destination = createDestination();

        // Register connection, session and producer, then send the message(s).
        StubConnection connection = createConnection();
        ConnectionInfo connectionInfo = createConnectionInfo();
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        connection.send(producerInfo);

        int numMessages = 1;
        for (int i = 0; i < numMessages; i++) {
            Message outbound = createMessage(producerInfo, destination);
            outbound.setPersistent(true);
            connection.send(outbound);
        }

        XATransactionId txid = createXATransaction(sessionInfo);
        connection.send(createBeginTransaction(connectionInfo, txid));

        // One consumer per destination, prefetch limited to numMessages.
        HashSet<ConsumerInfo> consumerInfos = new HashSet<ConsumerInfo>();
        for (ActiveMQDestination dest : destinationList(destination)) {
            ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, dest);
            consumerInfo.setPrefetchSize(numMessages);
            consumerInfos.add(consumerInfo);
        }
        for (ConsumerInfo info : consumerInfos) {
            connection.send(info);
        }

        // Send a delivered ack per message, then one transactional standard ack per consumer.
        Message message = null;
        for (ConsumerInfo info : consumerInfos) {
            for (int i = 0; i < numMessages; i++) {
                message = receiveMessage(connection);
                XARecoveryBrokerTest.assertNotNull(message);
                connection.send(createAck(info, message, 1, MessageAck.DELIVERED_ACK_TYPE));
            }
            MessageAck ack = createAck(info, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
            ack.setTransactionId(txid);
            connection.send(ack);
        }
        connection.request(createPrepareTransaction(connectionInfo, txid));

        // Remove the connection and register the same ConnectionInfo on a new StubConnection.
        connection.send(connectionInfo.createRemoveCommand());
        connection = createConnection();
        connection.send(connectionInfo);

        TransactionInfo recoverInfo =
                new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER);
        DataArrayResponse dataArrayResponse = (DataArrayResponse) connection.request(recoverInfo);
        XARecoveryBrokerTest.assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length);
        XARecoveryBrokerTest.assertEquals("it matches", txid, dataArrayResponse.getData()[0]);

        // Re-register the session and consumers; nothing may arrive while the tx is prepared.
        connection.send(sessionInfo);
        for (ConsumerInfo info : consumerInfos) {
            connection.send(info);
        }
        message = receiveMessage(connection);
        XARecoveryBrokerTest.assertNull(message);
        assertNoMessagesLeft(connection);

        connection.request(createRollbackTransaction(connectionInfo, txid));

        // After the rollback the message(s) must be receivable in a new transaction.
        LOG.info("new tx for redelivery");
        txid = createXATransaction(sessionInfo);
        connection.send(createBeginTransaction(connectionInfo, txid));
        for (ConsumerInfo info : consumerInfos) {
            for (int i = 0; i < numMessages; i++) {
                message = receiveMessage(connection);
                XARecoveryBrokerTest.assertNotNull("unexpected null on:" + i, message);
                MessageAck ack = createAck(info, message, 1, MessageAck.STANDARD_ACK_TYPE);
                ack.setTransactionId(txid);
                connection.send(ack);
            }
        }
        connection.request(createCommitTransaction1Phase(connectionInfo, txid));

        dataArrayResponse = (DataArrayResponse) connection.request(recoverInfo);
        XARecoveryBrokerTest.assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
    }

    /**
     * Prepares an XA transaction that acks a batch of messages, drops and re-creates the
     * connection without restarting the broker, and expects the prepared transaction to be
     * recoverable, no delivery while it is prepared, and redelivery of the whole batch after a
     * rollback so it can be acked and committed in a new transaction.
     *
     * @throws Exception if any broker interaction or assertion fails
     */
    public void testQueuePersistentPreparedAcksAvailableAfterRollback() throws Exception {
        ActiveMQDestination destination = createDestination();

        // Register connection, session and producer, then send the batch.
        StubConnection connection = createConnection();
        ConnectionInfo connectionInfo = createConnectionInfo();
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        connection.send(producerInfo);

        int numMessages = 4;
        for (int i = 0; i < numMessages; i++) {
            // NOTE(review): unlike the sibling tests, setPersistent(true) is not called here
            // even though the test name says "Persistent" - confirm this is intentional.
            Message outbound = createMessage(producerInfo, destination);
            connection.send(outbound);
        }

        XATransactionId txid = createXATransaction(sessionInfo);
        connection.send(createBeginTransaction(connectionInfo, txid));

        // One consumer per destination.
        HashSet<ConsumerInfo> consumerInfos = new HashSet<ConsumerInfo>();
        for (ActiveMQDestination dest : destinationList(destination)) {
            ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, dest);
            consumerInfos.add(consumerInfo);
        }

        // Register each consumer, send a delivered ack per message, then one transactional
        // standard ack for the batch.
        Message message = null;
        for (ConsumerInfo info : consumerInfos) {
            connection.request(info);
            for (int i = 0; i < numMessages; i++) {
                message = receiveMessage(connection);
                XARecoveryBrokerTest.assertNotNull(message);
                LOG.info("ORIG " + message.getMessageId());
                connection.send(createAck(info, message, 1, MessageAck.DELIVERED_ACK_TYPE));
            }
            MessageAck ack = createAck(info, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
            ack.setTransactionId(txid);
            connection.send(ack);
        }
        connection.request(createPrepareTransaction(connectionInfo, txid));

        // Remove the connection and register the same ConnectionInfo on a new StubConnection.
        connection.send(connectionInfo.createRemoveCommand());
        connection = createConnection();
        connection.send(connectionInfo);

        TransactionInfo recoverInfo =
                new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER);
        DataArrayResponse dataArrayResponse = (DataArrayResponse) connection.request(recoverInfo);
        XARecoveryBrokerTest.assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length);
        XARecoveryBrokerTest.assertEquals("it matches", txid, dataArrayResponse.getData()[0]);

        // Re-register the session and consumers; nothing may arrive while the tx is prepared.
        connection.send(sessionInfo);
        LOG.info("add consumers..");
        for (ConsumerInfo info : consumerInfos) {
            connection.send(info);
        }
        message = receiveMessage(connection);
        XARecoveryBrokerTest.assertNull(message);
        assertNoMessagesLeft(connection);

        connection.request(createRollbackTransaction(connectionInfo, txid));

        // After the rollback the batch must be receivable in a new transaction.
        LOG.info("new tx for redelivery");
        txid = createXATransaction(sessionInfo);
        connection.send(createBeginTransaction(connectionInfo, txid));
        for (ConsumerInfo info : consumerInfos) {
            for (int i = 0; i < numMessages; i++) {
                message = receiveMessage(connection);
                XARecoveryBrokerTest.assertNotNull("unexpected null on:" + i, message);
                LOG.info("REC " + message.getMessageId());
                MessageAck ack = createAck(info, message, 1, MessageAck.STANDARD_ACK_TYPE);
                ack.setTransactionId(txid);
                connection.send(ack);
            }
        }
        connection.request(createCommitTransaction1Phase(connectionInfo, txid));

        dataArrayResponse = (DataArrayResponse) connection.request(recoverInfo);
        XARecoveryBrokerTest.assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
    }

    /**
     * Registers {@code FALSE} and {@code TRUE} as the values to combine for the
     * {@code "prioritySupport"} attribute.
     * <p>
     * NOTE(review): presumably picked up by the combination-test harness for
     * {@code testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback} via the method
     * name - confirm.
     */
    public void initCombosForTestTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() {
        Boolean[] prioritySupportValues = {Boolean.FALSE, Boolean.TRUE};
        addCombinationValues("prioritySupport", prioritySupportValues);
    }

    /**
     * Prepares an XA transaction that acks all messages of a durable topic subscription,
     * restarts the broker, and expects the prepared transaction to be recoverable, no delivery
     * to the re-attached subscriber while it is prepared, and redelivery of every message after
     * a rollback so they can be acked and committed in a new transaction.
     *
     * @throws Exception if any broker interaction or assertion fails
     */
    public void testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception {
        // Typed as the base destination so the helper calls need no casts.
        ActiveMQDestination destination = new ActiveMQTopic("TryTopic");

        // Register connection, session, producer and the durable subscriber.
        StubConnection connection = createConnection();
        ConnectionInfo connectionInfo = createConnectionInfo();
        connectionInfo.setClientId("durable");
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
        connection.send(connectionInfo);
        connection.send(sessionInfo);
        connection.send(producerInfo);

        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
        consumerInfo.setSubscriptionName("durable");
        connection.send(consumerInfo);

        // Publish the persistent messages.
        int numMessages = 4;
        for (int i = 0; i < numMessages; i++) {
            Message outbound = createMessage(producerInfo, destination);
            outbound.setPersistent(true);
            connection.send(outbound);
        }

        // Consume everything inside an XA transaction and leave it prepared.
        XATransactionId txid = createXATransaction(sessionInfo);
        connection.send(createBeginTransaction(connectionInfo, txid));

        Message message = null;
        for (int i = 0; i < numMessages; i++) {
            message = receiveMessage(connection);
            XARecoveryBrokerTest.assertNotNull(message);
        }
        MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
        ack.setTransactionId(txid);
        connection.send(ack);
        connection.request(createPrepareTransaction(connectionInfo, txid));

        restartBroker();

        // Reconnect with the same client id and ask the broker for prepared transactions.
        connection = createConnection();
        connectionInfo = createConnectionInfo();
        connectionInfo.setClientId("durable");
        connection.send(connectionInfo);

        TransactionInfo recoverInfo =
                new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER);
        DataArrayResponse dataArrayResponse = (DataArrayResponse) connection.request(recoverInfo);
        XARecoveryBrokerTest.assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length);
        XARecoveryBrokerTest.assertEquals("it matches", txid, dataArrayResponse.getData()[0]);

        // Re-attach the durable subscriber: nothing may be delivered while the tx is prepared.
        sessionInfo = createSessionInfo(connectionInfo);
        connection.send(sessionInfo);
        consumerInfo = createConsumerInfo(sessionInfo, destination);
        consumerInfo.setSubscriptionName("durable");
        connection.send(consumerInfo);

        message = receiveMessage(connection);
        XARecoveryBrokerTest.assertNull(message);
        assertNoMessagesLeft(connection);

        connection.request(createRollbackTransaction(connectionInfo, txid));

        // After the rollback every message must be receivable in a new transaction.
        LOG.info("new tx for redelivery");
        txid = createXATransaction(sessionInfo);
        connection.send(createBeginTransaction(connectionInfo, txid));
        for (int i = 0; i < numMessages; i++) {
            message = receiveMessage(connection);
            XARecoveryBrokerTest.assertNotNull("unexpected null on:" + i, message);
        }
        ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
        ack.setTransactionId(txid);
        connection.send(ack);
        connection.request(createCommitTransaction1Phase(connectionInfo, txid));

        dataArrayResponse = (DataArrayResponse) connection.request(recoverInfo);
        XARecoveryBrokerTest.assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
    }

    /**
     * Registers {@code FALSE} and {@code TRUE} as the values to combine for the
     * {@code "prioritySupport"} attribute.
     * <p>
     * NOTE(review): presumably picked up by the combination-test harness for
     * {@code testTopicPersistentPreparedAcksAvailableAfterRollback} via the method name -
     * confirm.
     */
    public void initCombosForTestTopicPersistentPreparedAcksAvailableAfterRollback() {
        Boolean[] prioritySupportValues = {Boolean.FALSE, Boolean.TRUE};
        addCombinationValues("prioritySupport", prioritySupportValues);
    }

    /**
     * Acks all messages of a durable topic subscription inside an XA transaction, prepares and
     * immediately rolls the transaction back, then closes the connection info, registers a new
     * one with the same client id on the same StubConnection, and expects every message to be
     * receivable again so it can be acked and committed in a new transaction.
     *
     * @throws Exception if any broker interaction or assertion fails
     */
    public void testTopicPersistentPreparedAcksAvailableAfterRollback() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic("TryTopic");
        // NOTE(review): ack type 2 is presumably MessageAck.STANDARD_ACK_TYPE - confirm.
        final byte ackType = (byte) 2;

        // Register connection, session, producer and the durable subscriber.
        StubConnection connection = createConnection();
        ConnectionInfo connectionInfo = createConnectionInfo();
        connectionInfo.setClientId("durable");
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
        connection.send((Command) connectionInfo);
        connection.send((Command) sessionInfo);
        connection.send((Command) producerInfo);

        ConsumerInfo subscriber = createConsumerInfo(sessionInfo, (ActiveMQDestination) topic);
        subscriber.setSubscriptionName("durable");
        connection.send((Command) subscriber);

        // Publish the persistent messages.
        int numMessages = 4;
        for (int sent = 0; sent < numMessages; sent++) {
            Message outbound = createMessage(producerInfo, (ActiveMQDestination) topic);
            outbound.setPersistent(true);
            connection.send((Command) outbound);
        }

        // Consume everything in an XA transaction, prepare it, then roll it back.
        XATransactionId txid = createXATransaction(sessionInfo);
        connection.send((Command) createBeginTransaction(connectionInfo, (TransactionId) txid));

        Message lastReceived = null;
        for (int received = 0; received < numMessages; received++) {
            lastReceived = receiveMessage(connection);
            XARecoveryBrokerTest.assertNotNull((Object) lastReceived);
        }
        MessageAck ack = createAck(subscriber, lastReceived, numMessages, ackType);
        ack.setTransactionId((TransactionId) txid);
        connection.send((Command) ack);
        connection.request((Command) createPrepareTransaction(connectionInfo, (TransactionId) txid));
        connection.request((Command) createRollbackTransaction(connectionInfo, (TransactionId) txid));

        // Close the connection info and register a fresh one with the same client id.
        LOG.info("new consumer/tx for redelivery");
        connection.request((Command) closeConnectionInfo(connectionInfo));
        connectionInfo = createConnectionInfo();
        connectionInfo.setClientId("durable");
        sessionInfo = createSessionInfo(connectionInfo);
        connection.send((Command) connectionInfo);
        connection.send((Command) sessionInfo);

        subscriber = createConsumerInfo(sessionInfo, (ActiveMQDestination) topic);
        subscriber.setSubscriptionName("durable");
        connection.send((Command) subscriber);

        // Every message must be receivable again in a new transaction.
        txid = createXATransaction(sessionInfo);
        connection.send((Command) createBeginTransaction(connectionInfo, (TransactionId) txid));
        for (int received = 0; received < numMessages; received++) {
            lastReceived = receiveMessage(connection);
            XARecoveryBrokerTest.assertNotNull((String) ("unexpected null on:" + received), (Object) lastReceived);
        }
        ack = createAck(subscriber, lastReceived, numMessages, ackType);
        ack.setTransactionId((TransactionId) txid);
        connection.send((Command) ack);
        connection.request((Command) createCommitTransaction1Phase(connectionInfo, (TransactionId) txid));
    }

    /**
     * Registers {@code FALSE} and {@code TRUE} as the values to combine for the
     * {@code "keepDurableSubsActive"} attribute.
     * <p>
     * NOTE(review): presumably picked up by the combination-test harness for
     * {@code testTopicPersistentPreparedAcksUnavailableTillRollback} via the method name -
     * confirm.
     */
    public void initCombosForTestTopicPersistentPreparedAcksUnavailableTillRollback() {
        Boolean[] keepDurableSubsActiveValues = {Boolean.FALSE, Boolean.TRUE};
        addCombinationValues("keepDurableSubsActive", keepDurableSubsActiveValues);
    }

    /**
     * Acks all messages of a durable topic subscription inside an XA transaction, ends and
     * prepares the transaction, then closes the connection info and re-attaches the durable
     * subscriber under a new connection info with the same client id. The test expects no
     * delivery within a 2000 ms receive while the transaction is prepared, and delivery of
     * every message once the transaction has been rolled back. Finally the consumer and its
     * durable subscription are removed.
     *
     * @throws Exception if any broker interaction or assertion fails
     */
    public void testTopicPersistentPreparedAcksUnavailableTillRollback() throws Exception {
        ActiveMQTopic topic = new ActiveMQTopic("TryTopic");
        // NOTE(review): ack type 2 is presumably MessageAck.STANDARD_ACK_TYPE - confirm.
        final byte ackType = (byte) 2;

        // Register connection, session, producer and the durable subscriber.
        StubConnection connection = createConnection();
        ConnectionInfo connectionInfo = createConnectionInfo();
        connectionInfo.setClientId("durable");
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
        connection.send((Command) connectionInfo);
        connection.send((Command) sessionInfo);
        connection.send((Command) producerInfo);

        ConsumerInfo subscriber = createConsumerInfo(sessionInfo, (ActiveMQDestination) topic);
        subscriber.setSubscriptionName("durable");
        connection.send((Command) subscriber);

        // Publish the persistent messages.
        int numMessages = 4;
        for (int sent = 0; sent < numMessages; sent++) {
            Message outbound = createMessage(producerInfo, (ActiveMQDestination) topic);
            outbound.setPersistent(true);
            connection.send((Command) outbound);
        }

        // Consume everything in an XA transaction, then end and prepare it.
        XATransactionId txid = createXATransaction(sessionInfo);
        connection.send((Command) createBeginTransaction(connectionInfo, (TransactionId) txid));

        Message lastReceived = null;
        for (int received = 0; received < numMessages; received++) {
            lastReceived = receiveMessage(connection);
            XARecoveryBrokerTest.assertNotNull((Object) lastReceived);
        }
        MessageAck ack = createAck(subscriber, lastReceived, numMessages, ackType);
        ack.setTransactionId((TransactionId) txid);
        connection.send((Command) ack);
        connection.request((Command) createEndTransaction(connectionInfo, (TransactionId) txid));
        connection.request((Command) createPrepareTransaction(connectionInfo, (TransactionId) txid));
        connection.request((Command) closeConnectionInfo(connectionInfo));

        // Re-attach the durable subscriber under a new connection info.
        LOG.info("new consumer for *no* redelivery");
        connectionInfo = createConnectionInfo();
        connectionInfo.setClientId("durable");
        sessionInfo = createSessionInfo(connectionInfo);
        connection.send((Command) connectionInfo);
        connection.send((Command) sessionInfo);

        subscriber = createConsumerInfo(sessionInfo, (ActiveMQDestination) topic);
        subscriber.setSubscriptionName("durable");
        connection.send((Command) subscriber);

        // Nothing may arrive while the transaction is still prepared.
        lastReceived = receiveMessage(connection, 2000L);
        XARecoveryBrokerTest.assertNull((String) "unexpected non null", (Object) lastReceived);

        // After the rollback every message must be delivered.
        connection.request((Command) createRollbackTransaction(connectionInfo, (TransactionId) txid));
        for (int received = 0; received < numMessages; received++) {
            lastReceived = receiveMessage(connection);
            XARecoveryBrokerTest.assertNotNull((String) ("unexpected null on:" + received), (Object) lastReceived);
        }

        // Clean up: remove the consumer, then the durable subscription itself.
        connection.request((Command) subscriber.createRemoveCommand());
        RemoveSubscriptionInfo unsubscribe = new RemoveSubscriptionInfo();
        unsubscribe.setClientId(connectionInfo.getClientId());
        unsubscribe.setSubscriptionName(subscriber.getSubscriptionName());
        connection.request((Command) unsubscribe);
    }

    /**
     * Registers {@code FALSE} and {@code TRUE} as the values to combine for the
     * {@code "keepDurableSubsActive"} attribute.
     * <p>
     * NOTE(review): presumably picked up by the combination-test harness for
     * {@code testTopicPersistentPreparedAcksUnavailableTillComplete} via the method name -
     * confirm.
     */
    public void initCombosForTestTopicPersistentPreparedAcksUnavailableTillComplete() {
        Boolean[] keepDurableSubsActiveValues = {Boolean.FALSE, Boolean.TRUE};
        addCombinationValues("keepDurableSubsActive", keepDurableSubsActiveValues);
    }

    /**
     * Consumes four persistent topic messages on the "durable" subscription inside an XA
     * transaction, acks them, ends and prepares the transaction, and then verifies that a
     * fresh consumer on the same subscription receives nothing, both before and after the
     * transaction is committed with the two-phase commit command.
     *
     * @throws Exception if any interaction with the broker fails
     */
    public void testTopicPersistentPreparedAcksUnavailableTillComplete() throws Exception {
        final int messageCount = 4;
        ActiveMQDestination topic = new ActiveMQTopic("TryTopic");

        // Register connection, session, producer and a consumer named "durable".
        StubConnection connection = createConnection();
        ConnectionInfo connInfo = createConnectionInfo();
        connInfo.setClientId("durable");
        SessionInfo session = createSessionInfo(connInfo);
        ProducerInfo producer = createProducerInfo(session);
        connection.send((Command)connInfo);
        connection.send((Command)session);
        connection.send((Command)producer);
        ConsumerInfo consumer = createConsumerInfo(session, topic);
        consumer.setSubscriptionName("durable");
        connection.send((Command)consumer);

        // Publish the persistent messages.
        for (int sent = 0; sent < messageCount; sent++) {
            Message outbound = createMessage(producer, topic);
            outbound.setPersistent(true);
            connection.send((Command)outbound);
        }

        // Begin the XA transaction and receive every message.
        TransactionId txid = createXATransaction(session);
        connection.send((Command)createBeginTransaction(connInfo, txid));
        Message lastReceived = null;
        for (int received = 0; received < messageCount; received++) {
            lastReceived = receiveMessage(connection);
            assertNotNull(lastReceived);
        }

        // One ack built from the last received message, covering messageCount messages.
        MessageAck ack = createAck(consumer, lastReceived, messageCount, (byte)2);
        ack.setTransactionId(txid);
        connection.send((Command)ack);

        // End and prepare the transaction (no commit yet), then close the connection info.
        connection.request((Command)createEndTransaction(connInfo, txid));
        connection.request((Command)createPrepareTransaction(connInfo, txid));
        connection.request((Command)closeConnectionInfo(connInfo));

        // Re-register under the same client id and subscription name.
        LOG.info("new consumer for *no* redelivery");
        connInfo = createConnectionInfo();
        connInfo.setClientId("durable");
        session = createSessionInfo(connInfo);
        connection.send((Command)connInfo);
        connection.send((Command)session);
        consumer = createConsumerInfo(session, topic);
        consumer.setSubscriptionName("durable");
        connection.send((Command)consumer);

        // Nothing may arrive while the transaction is still only prepared.
        Message beforeCommit = receiveMessage(connection, 2000L);
        assertNull("unexpected non null", beforeCommit);

        // Complete the transaction; still nothing may arrive afterwards.
        connection.request((Command)createCommitTransaction2Phase(connInfo, txid));
        Message afterCommit = receiveMessage(connection, 2000L);
        assertNull("unexpected non null: " + afterCommit, afterCommit);

        // Clean up: remove the consumer, then the subscription itself.
        connection.request((Command)consumer.createRemoveCommand());
        RemoveSubscriptionInfo unsubscribe = new RemoveSubscriptionInfo();
        unsubscribe.setClientId(connInfo.getClientId());
        unsubscribe.setSubscriptionName(consumer.getSubscriptionName());
        connection.request((Command)unsubscribe);
    }

    /**
     * Registers {@code FALSE} and {@code TRUE} as the combination values for the
     * {@code keepDurableSubsActive} option.
     *
     * <p>NOTE(review): judging by the method name, these values are presumably applied to
     * {@code testNoDupOnRollbackRedelivery} by the combination test support class -
     * confirm against the base class.
     */
    public void initCombosForTestNoDupOnRollbackRedelivery() {
        final Boolean[] durableSubModes = {Boolean.FALSE, Boolean.TRUE};
        addCombinationValues("keepDurableSubsActive", durableSubModes);
    }

    /**
     * Receives and acks one persistent topic message on the "durable" subscription inside
     * an XA transaction, rolls the transaction back, and then verifies that a re-registered
     * consumer gets the message exactly once: the first receive must return a message and
     * the second must return {@code null}. The message is finally acked in a second XA
     * transaction that is committed with the one-phase commit command.
     *
     * @throws Exception if any interaction with the broker fails
     */
    public void testNoDupOnRollbackRedelivery() throws Exception {
        final int messageCount = 1;
        ActiveMQDestination topic = new ActiveMQTopic("TryTopic");

        // Register connection, session, producer and a consumer named "durable".
        StubConnection connection = createConnection();
        ConnectionInfo connInfo = createConnectionInfo();
        connInfo.setClientId("durable");
        SessionInfo session = createSessionInfo(connInfo);
        ProducerInfo producer = createProducerInfo(session);
        connection.send((Command)connInfo);
        connection.send((Command)session);
        connection.send((Command)producer);
        ConsumerInfo consumer = createConsumerInfo(session, topic);
        consumer.setSubscriptionName("durable");
        connection.send((Command)consumer);

        // Publish the persistent message(s).
        for (int sent = 0; sent < messageCount; sent++) {
            Message outbound = createMessage(producer, topic);
            outbound.setPersistent(true);
            connection.send((Command)outbound);
        }

        // Begin the XA transaction, receive and ack within it.
        TransactionId txid = createXATransaction(session);
        connection.send((Command)createBeginTransaction(connInfo, txid));
        Message delivered = null;
        for (int received = 0; received < messageCount; received++) {
            delivered = receiveMessage(connection);
            assertNotNull(delivered);
        }
        MessageAck ack = createAck(consumer, delivered, messageCount, (byte)2);
        ack.setTransactionId(txid);
        connection.send((Command)ack);

        // End the transaction and roll it back instead of committing.
        connection.request((Command)createEndTransaction(connInfo, txid));
        connection.request((Command)createRollbackTransaction(connInfo, txid));

        // Tear down consumer, session and connection registrations, in that order.
        connection.send((Command)consumer.createRemoveCommand());
        connection.send((Command)session.createRemoveCommand());
        connection.send((Command)connInfo.createRemoveCommand());
        LOG.info("new connection/consumer for redelivery");
        connection.request((Command)closeConnectionInfo(connInfo));

        // Re-register under the same client id and subscription name.
        connInfo = createConnectionInfo();
        connInfo.setClientId("durable");
        session = createSessionInfo(connInfo);
        connection.send((Command)connInfo);
        connection.send((Command)session);
        consumer = createConsumerInfo(session, topic);
        consumer.setSubscriptionName("durable");
        connection.send((Command)consumer);

        // Exactly one delivery is expected; a second receive must come back empty.
        delivered = receiveMessage(connection);
        assertNotNull(delivered);
        Message duplicate = receiveMessage(connection);
        assertNull("no duplicate send: " + duplicate, duplicate);

        // Ack the redelivered message in a new XA transaction and commit it (one phase).
        txid = createXATransaction(session);
        connection.send((Command)createBeginTransaction(connInfo, txid));
        ack = createAck(consumer, delivered, messageCount, (byte)2);
        ack.setTransactionId(txid);
        connection.send((Command)ack);
        connection.request((Command)createEndTransaction(connInfo, txid));
        connection.request((Command)createCommitTransaction1Phase(connInfo, txid));

        // Clean up: remove the consumer, then the subscription itself.
        connection.request((Command)consumer.createRemoveCommand());
        RemoveSubscriptionInfo unsubscribe = new RemoveSubscriptionInfo();
        unsubscribe.setClientId(connInfo.getClientId());
        unsubscribe.setSubscriptionName(consumer.getSubscriptionName());
        connection.request((Command)unsubscribe);
    }

    /**
     * Expands a destination into the array of destinations it stands for.
     *
     * @param dest the destination to expand
     * @return the composite members when {@code dest} is composite, otherwise a
     *         one-element array holding {@code dest} itself
     */
    private ActiveMQDestination[] destinationList(ActiveMQDestination dest) {
        return dest.isComposite()
            ? dest.getCompositeDestinations()
            : new ActiveMQDestination[] {dest};
    }

    /**
     * Scales a message count by the number of destinations behind {@code destination}.
     *
     * @param i the base message count
     * @param destination the destination; a composite counts once per member, any other
     *        destination counts once
     * @return {@code i} multiplied by the number of counted destinations
     */
    private int expectedMessageCount(int i, ActiveMQDestination destination) {
        int targets = 1;
        if (destination.isComposite()) {
            targets = destination.getCompositeDestinations().length;
        }
        return i * targets;
    }

    /**
     * Sends four persistent messages, receives and acks them inside an XA transaction that
     * is never ended or committed, restarts the broker, and then verifies that all four
     * messages can be received again from every destination and that nothing else is left.
     *
     * @throws Exception if any interaction with the broker fails
     */
    public void testQueuePersistentUncommittedAcksLostOnRestart() throws Exception {
        final int messageCount = 4;
        ActiveMQDestination destination = createDestination();

        // Register connection, session and producer, then send the persistent messages.
        StubConnection connection = createConnection();
        ConnectionInfo connInfo = createConnectionInfo();
        SessionInfo session = createSessionInfo(connInfo);
        ProducerInfo producer = createProducerInfo(session);
        connection.send((Command)connInfo);
        connection.send((Command)session);
        connection.send((Command)producer);
        for (int sent = 0; sent < messageCount; sent++) {
            Message outbound = createMessage(producer, destination);
            outbound.setPersistent(true);
            connection.send((Command)outbound);
        }

        // Begin the XA transaction.
        TransactionId txid = createXATransaction(session);
        connection.send((Command)createBeginTransaction(connInfo, txid));

        // For each destination: consume everything and ack it under the open transaction.
        Message lastReceived = null;
        for (ActiveMQDestination target : destinationList(destination)) {
            ConsumerInfo consumer = createConsumerInfo(session, target);
            connection.send((Command)consumer);
            for (int received = 0; received < messageCount; received++) {
                lastReceived = receiveMessage(connection);
                assertNotNull(lastReceived);
            }
            MessageAck ack = createAck(consumer, lastReceived, messageCount, (byte)2);
            ack.setTransactionId(txid);
            connection.request((Command)ack);
        }

        // The transaction is deliberately left open across the restart.
        restartBroker();

        // Fresh connection and session after the restart.
        connection = createConnection();
        connInfo = createConnectionInfo();
        session = createSessionInfo(connInfo);
        connection.send((Command)connInfo);
        connection.send((Command)session);

        // Every message must be receivable again from every destination.
        for (ActiveMQDestination target : destinationList(destination)) {
            ConsumerInfo consumer = createConsumerInfo(session, target);
            connection.send((Command)consumer);
            for (int received = 0; received < messageCount; received++) {
                lastReceived = receiveMessage(connection);
                assertNotNull(lastReceived);
            }
        }
        assertNoMessagesLeft(connection);
    }

    /**
     * Takes the default policy produced by the superclass and sets its prioritized-messages
     * flag from this test's {@code prioritySupport} field.
     *
     * @return the superclass policy entry with the prioritized-messages flag applied
     */
    @Override
    protected PolicyEntry getDefaultPolicy() {
        final PolicyEntry policy = super.getDefaultPolicy();
        policy.setPrioritizedMessages(prioritySupport);
        return policy;
    }

    /**
     * Applies the superclass broker configuration, then sets the broker's
     * keep-durable-subs-active flag from this test's {@code keepDurableSubsActive} field
     * and sets {@code maxWait} to 2000.
     *
     * @param broker the broker service being configured
     * @throws Exception if the superclass configuration fails
     */
    @Override
    protected void configureBroker(BrokerService broker) throws Exception {
        super.configureBroker(broker);
        broker.setKeepDurableSubsActive(keepDurableSubsActive);
        maxWait = 2000;
    }

    /**
     * Builds the test suite for this class through the inherited {@code suite(Class)}
     * factory.
     *
     * @return the suite created for {@code XARecoveryBrokerTest}
     */
    public static Test suite() {
        return suite(XARecoveryBrokerTest.class);
    }

    /**
     * Command-line entry point: runs this class's suite with the JUnit text runner.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final Test allTests = suite();
        TestRunner.run(allTests);
    }

    /**
     * Creates the queue used by the current test, named
     * {@code <runtime class name>.<test name>}.
     *
     * @return a new queue destination carrying that name
     */
    protected ActiveMQDestination createDestination() {
        final String queueName = getClass().getName() + "." + getName();
        return new ActiveMQQueue(queueName);
    }
}

