package org.apache.activemq;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.textui.TestRunner;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/RedeliveryPolicyTest.class */
public class RedeliveryPolicyTest extends JmsTestSupport {
    static final Logger LOG = LoggerFactory.getLogger(RedeliveryPolicyTest.class);

    public static Test suite() {
        return suite(RedeliveryPolicyTest.class);
    }

    public static void main(String[] strArr) {
        TestRunner.run(suite());
    }

    public void testGetNext() throws Exception {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0L);
        redeliveryPolicy.setRedeliveryDelay(500L);
        redeliveryPolicy.setBackOffMultiplier(2.0d);
        redeliveryPolicy.setUseExponentialBackOff(true);
        long nextRedeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(0L);
        assertEquals(500L, nextRedeliveryDelay);
        long nextRedeliveryDelay2 = redeliveryPolicy.getNextRedeliveryDelay(nextRedeliveryDelay);
        assertEquals(1000L, nextRedeliveryDelay2);
        long nextRedeliveryDelay3 = redeliveryPolicy.getNextRedeliveryDelay(nextRedeliveryDelay2);
        assertEquals(2000L, nextRedeliveryDelay3);
        redeliveryPolicy.setUseExponentialBackOff(false);
        assertEquals(500L, redeliveryPolicy.getNextRedeliveryDelay(nextRedeliveryDelay3));
    }

    public void testExponentialRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception {
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0L);
        redeliveryPolicy.setRedeliveryDelay(500L);
        redeliveryPolicy.setBackOffMultiplier(2.0d);
        redeliveryPolicy.setUseExponentialBackOff(true);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue(getName());
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        createProducer.send(createSession.createTextMessage("1st"));
        createProducer.send(createSession.createTextMessage("2nd"));
        createSession.commit();
        TextMessage receive = createConsumer.receive(1000L);
        assertNotNull(receive);
        assertEquals("1st", receive.getText());
        createSession.rollback();
        assertNotNull(createConsumer.receive(100L));
        createSession.rollback();
        assertNull(createConsumer.receive(100L));
        TextMessage receive2 = createConsumer.receive(700L);
        assertNotNull(receive2);
        assertEquals("1st", receive2.getText());
        createSession.rollback();
        assertNull(createConsumer.receive(100L));
        assertNull(createConsumer.receive(500L));
        TextMessage receive3 = createConsumer.receive(700L);
        assertNotNull(receive3);
        assertEquals("1st", receive3.getText());
    }

    public void testNornalRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception {
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0L);
        redeliveryPolicy.setRedeliveryDelay(500L);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue(getName());
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        createProducer.send(createSession.createTextMessage("1st"));
        createProducer.send(createSession.createTextMessage("2nd"));
        createSession.commit();
        TextMessage receive = createConsumer.receive(1000L);
        assertNotNull(receive);
        assertEquals("1st", receive.getText());
        createSession.rollback();
        assertNotNull(createConsumer.receive(100L));
        createSession.rollback();
        assertNull(createConsumer.receive(100L));
        TextMessage receive2 = createConsumer.receive(700L);
        assertNotNull(receive2);
        assertEquals("1st", receive2.getText());
        createSession.rollback();
        assertNull(createConsumer.receive(100L));
        TextMessage receive3 = createConsumer.receive(700L);
        assertNotNull(receive3);
        assertEquals("1st", receive3.getText());
    }

    public void testDLQHandling() throws Exception {
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(100L);
        redeliveryPolicy.setUseExponentialBackOff(false);
        redeliveryPolicy.setMaximumRedeliveries(2);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        MessageConsumer createConsumer2 = createSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
        createProducer.send(createSession.createTextMessage("1st"));
        createProducer.send(createSession.createTextMessage("2nd"));
        createSession.commit();
        TextMessage receive = createConsumer.receive(1000L);
        assertNotNull(receive);
        assertEquals("1st", receive.getText());
        createSession.rollback();
        TextMessage receive2 = createConsumer.receive(1000L);
        assertNotNull(receive2);
        assertEquals("1st", receive2.getText());
        createSession.rollback();
        TextMessage receive3 = createConsumer.receive(2000L);
        assertNotNull(receive3);
        assertEquals("1st", receive3.getText());
        createSession.rollback();
        TextMessage receive4 = createConsumer.receive(1000L);
        assertNotNull(receive4);
        assertEquals("2nd", receive4.getText());
        createSession.commit();
        TextMessage receive5 = createConsumer2.receive(1000L);
        assertNotNull(receive5);
        assertEquals("1st", receive5.getText());
        createSession.commit();
    }

    public void testInfiniteMaximumNumberOfRedeliveries() throws Exception {
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(100L);
        redeliveryPolicy.setUseExponentialBackOff(false);
        redeliveryPolicy.setMaximumRedeliveries(-1);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        createProducer.send(createSession.createTextMessage("1st"));
        createProducer.send(createSession.createTextMessage("2nd"));
        createSession.commit();
        TextMessage receive = createConsumer.receive(1000L);
        assertNotNull(receive);
        assertEquals("1st", receive.getText());
        createSession.rollback();
        TextMessage receive2 = createConsumer.receive(1000L);
        assertNotNull(receive2);
        assertEquals("1st", receive2.getText());
        createSession.rollback();
        TextMessage receive3 = createConsumer.receive(2000L);
        assertNotNull(receive3);
        assertEquals("1st", receive3.getText());
        createSession.rollback();
        TextMessage receive4 = createConsumer.receive(2000L);
        assertNotNull(receive4);
        assertEquals("1st", receive4.getText());
        createSession.rollback();
        TextMessage receive5 = createConsumer.receive(2000L);
        assertNotNull(receive5);
        assertEquals("1st", receive5.getText());
        createSession.rollback();
        TextMessage receive6 = createConsumer.receive(2000L);
        assertNotNull(receive6);
        assertEquals("1st", receive6.getText());
        createSession.commit();
        TextMessage receive7 = createConsumer.receive(2000L);
        assertNotNull(receive7);
        assertEquals("2nd", receive7.getText());
        createSession.commit();
    }

    public void testMaximumRedeliveryDelay() throws Exception {
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(10L);
        redeliveryPolicy.setUseExponentialBackOff(true);
        redeliveryPolicy.setMaximumRedeliveries(-1);
        redeliveryPolicy.setRedeliveryDelay(50L);
        redeliveryPolicy.setMaximumRedeliveryDelay(1000L);
        redeliveryPolicy.setBackOffMultiplier(2.0d);
        redeliveryPolicy.setUseExponentialBackOff(true);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        createProducer.send(createSession.createTextMessage("1st"));
        createProducer.send(createSession.createTextMessage("2nd"));
        createSession.commit();
        for (int i = 0; i < 10; i++) {
            TextMessage receive = createConsumer.receive(2000L);
            assertNotNull(receive);
            assertEquals("1st", receive.getText());
            createSession.rollback();
        }
        TextMessage receive2 = createConsumer.receive(2000L);
        assertNotNull(receive2);
        assertEquals("1st", receive2.getText());
        createSession.commit();
        TextMessage receive3 = createConsumer.receive(2000L);
        assertNotNull(receive3);
        assertEquals("2nd", receive3.getText());
        createSession.commit();
        assertTrue(redeliveryPolicy.getNextRedeliveryDelay(Long.MAX_VALUE) == 1000);
    }

    public void testZeroMaximumNumberOfRedeliveries() throws Exception {
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(100L);
        redeliveryPolicy.setUseExponentialBackOff(false);
        redeliveryPolicy.setMaximumRedeliveries(0);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        createProducer.send(createSession.createTextMessage("1st"));
        createProducer.send(createSession.createTextMessage("2nd"));
        createSession.commit();
        TextMessage receive = createConsumer.receive(1000L);
        assertNotNull(receive);
        assertEquals("1st", receive.getText());
        createSession.rollback();
        TextMessage receive2 = createConsumer.receive(1000L);
        assertNotNull(receive2);
        assertEquals("2nd", receive2.getText());
        createSession.commit();
    }

    public void testRepeatedRedeliveryReceiveNoCommit() throws Exception {
        this.connection.start();
        Session createSession = this.connection.createSession(true, 0);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        createSession.createProducer(activeMQQueue).send(createSession.createTextMessage("1st"));
        createSession.commit();
        MessageConsumer createConsumer = createSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
        for (int i = 0; i <= 5; i++) {
            this.connection = this.factory.createConnection(this.userName, this.password);
            this.connections.add(this.connection);
            RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
            redeliveryPolicy.setInitialRedeliveryDelay(0L);
            redeliveryPolicy.setUseExponentialBackOff(false);
            redeliveryPolicy.setMaximumRedeliveries(4);
            this.connection.start();
            ActiveMQTextMessage receive = this.connection.createSession(true, 1).createConsumer(activeMQQueue).receive(4000L);
            if (i <= 4) {
                assertEquals("1st", receive.getText());
                assertEquals(i, receive.getRedeliveryCounter());
            } else {
                assertNull("null on exceeding redelivery count", receive);
            }
            this.connection.close();
            this.connections.remove(this.connection);
        }
        TextMessage receive2 = createConsumer.receive(1000L);
        assertNotNull("Got message from DLQ", receive2);
        assertEquals("1st", receive2.getText());
        assertTrue("cause exception has policy ref", receive2.getStringProperty("dlqDeliveryFailureCause").contains("RedeliveryPolicy"));
        createSession.commit();
    }

    public void testRepeatedRedeliveryOnMessageNoCommit() throws Exception {
        this.connection.start();
        Session createSession = this.connection.createSession(true, 0);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        createSession.createProducer(activeMQQueue).send(createSession.createTextMessage("1st"));
        createSession.commit();
        MessageConsumer createConsumer = createSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        for (int i = 0; i <= 5; i++) {
            this.connection = this.factory.createConnection(this.userName, this.password);
            this.connections.add(this.connection);
            RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
            redeliveryPolicy.setInitialRedeliveryDelay(0L);
            redeliveryPolicy.setUseExponentialBackOff(false);
            redeliveryPolicy.setMaximumRedeliveries(4);
            this.connection.start();
            MessageConsumer createConsumer2 = this.connection.createSession(true, 0).createConsumer(activeMQQueue);
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            createConsumer2.setMessageListener(new MessageListener() { // from class: org.apache.activemq.RedeliveryPolicyTest.1
                public void onMessage(Message message) {
                    try {
                        ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage) message;
                        TestCase.assertEquals("1st", activeMQTextMessage.getText());
                        TestCase.assertEquals(atomicInteger.get(), activeMQTextMessage.getRedeliveryCounter());
                        atomicInteger.incrementAndGet();
                        countDownLatch.countDown();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            if (i <= 4) {
                assertTrue("listener done", countDownLatch.await(5L, TimeUnit.SECONDS));
            } else {
                assertFalse("listener done", countDownLatch.await(1L, TimeUnit.SECONDS));
            }
            this.connection.close();
            this.connections.remove(this.connection);
        }
        TextMessage receive = createConsumer.receive(1000L);
        assertNotNull("Got message from DLQ", receive);
        assertEquals("1st", receive.getText());
        assertTrue("cause exception has policy ref", receive.getStringProperty("dlqDeliveryFailureCause").contains("RedeliveryPolicy"));
        createSession.commit();
    }

    public void testRepeatedRedeliveryServerSessionNoCommit() throws Exception {
        this.connection.start();
        Session createSession = this.connection.createSession(true, 0);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        createSession.createProducer(activeMQQueue).send(createSession.createTextMessage("1st"));
        createSession.commit();
        MessageConsumer createConsumer = createSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        for (int i = 0; i <= 5; i++) {
            this.connection = this.factory.createConnection(this.userName, this.password);
            this.connections.add(this.connection);
            RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
            redeliveryPolicy.setInitialRedeliveryDelay(0L);
            redeliveryPolicy.setUseExponentialBackOff(false);
            redeliveryPolicy.setMaximumRedeliveries(4);
            this.connection.start();
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            final ActiveMQSession createSession2 = this.connection.createSession(true, 0);
            createSession2.setMessageListener(new MessageListener() { // from class: org.apache.activemq.RedeliveryPolicyTest.2
                public void onMessage(Message message) {
                    try {
                        ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage) message;
                        RedeliveryPolicyTest.LOG.info("Got: " + ((ActiveMQTextMessage) message).getMessageId() + ", seq:" + ((ActiveMQTextMessage) message).getMessageId().getBrokerSequenceId());
                        TestCase.assertEquals("1st", activeMQTextMessage.getText());
                        TestCase.assertEquals(atomicInteger.get(), activeMQTextMessage.getRedeliveryCounter());
                        atomicInteger.incrementAndGet();
                        countDownLatch.countDown();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            this.connection.createConnectionConsumer(activeMQQueue, (String) null, new ServerSessionPool() { // from class: org.apache.activemq.RedeliveryPolicyTest.3
                public ServerSession getServerSession() throws JMSException {
                    return new ServerSession() { // from class: org.apache.activemq.RedeliveryPolicyTest.3.1
                        public Session getSession() throws JMSException {
                            return createSession2;
                        }

                        public void start() throws JMSException {
                        }
                    };
                }
            }, 100, false);
            Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.RedeliveryPolicyTest.4
                public boolean isSatisified() throws Exception {
                    createSession2.run();
                    return countDownLatch.await(10L, TimeUnit.MILLISECONDS);
                }
            }, 5000L);
            if (i <= 4) {
                assertTrue("listener done @" + i, countDownLatch.await(5L, TimeUnit.SECONDS));
            } else {
                assertFalse("listener not done @" + i, countDownLatch.await(1L, TimeUnit.SECONDS));
            }
            this.connection.close();
            this.connections.remove(this.connection);
        }
        TextMessage receive = createConsumer.receive(1000L);
        assertNotNull("Got message from DLQ", receive);
        assertEquals("1st", receive.getText());
        assertTrue("cause exception has policy ref", receive.getStringProperty("dlqDeliveryFailureCause").contains("RedeliveryPolicy"));
        createSession.commit();
    }

    public void testInitialRedeliveryDelayZero() throws Exception {
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0L);
        redeliveryPolicy.setUseExponentialBackOff(false);
        redeliveryPolicy.setMaximumRedeliveries(1);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        createProducer.send(createSession.createTextMessage("1st"));
        createProducer.send(createSession.createTextMessage("2nd"));
        createSession.commit();
        TextMessage receive = createConsumer.receive(100L);
        assertNotNull(receive);
        assertEquals("1st", receive.getText());
        createSession.rollback();
        TextMessage receive2 = createConsumer.receive(100L);
        assertNotNull(receive2);
        assertEquals("1st", receive2.getText());
        TextMessage receive3 = createConsumer.receive(100L);
        assertNotNull(receive3);
        assertEquals("2nd", receive3.getText());
        createSession.commit();
        createSession.commit();
    }

    public void testInitialRedeliveryDelayOne() throws Exception {
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(1000L);
        redeliveryPolicy.setUseExponentialBackOff(false);
        redeliveryPolicy.setMaximumRedeliveries(1);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        createProducer.send(createSession.createTextMessage("1st"));
        createProducer.send(createSession.createTextMessage("2nd"));
        createSession.commit();
        TextMessage receive = createConsumer.receive(100L);
        assertNotNull(receive);
        assertEquals("1st", receive.getText());
        createSession.rollback();
        assertNull(createConsumer.receive(100L));
        TextMessage receive2 = createConsumer.receive(2000L);
        assertNotNull(receive2);
        assertEquals("1st", receive2.getText());
        TextMessage receive3 = createConsumer.receive(100L);
        assertNotNull(receive3);
        assertEquals("2nd", receive3.getText());
        createSession.commit();
    }

    public void testRedeliveryDelayOne() throws Exception {
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0L);
        redeliveryPolicy.setRedeliveryDelay(1000L);
        redeliveryPolicy.setUseExponentialBackOff(false);
        redeliveryPolicy.setMaximumRedeliveries(2);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        createProducer.send(createSession.createTextMessage("1st"));
        createProducer.send(createSession.createTextMessage("2nd"));
        createSession.commit();
        TextMessage receive = createConsumer.receive(100L);
        assertNotNull(receive);
        assertEquals("1st", receive.getText());
        createSession.rollback();
        assertNotNull("first immediate redelivery", createConsumer.receive(100L));
        createSession.rollback();
        TextMessage receive2 = createConsumer.receive(100L);
        assertNull("second delivery delayed: " + receive2, receive2);
        TextMessage receive3 = createConsumer.receive(2000L);
        assertNotNull(receive3);
        assertEquals("1st", receive3.getText());
        TextMessage receive4 = createConsumer.receive(100L);
        assertNotNull(receive4);
        assertEquals("2nd", receive4.getText());
        createSession.commit();
    }

    public void testRedeliveryPolicyPerDestination() throws Exception {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0L);
        redeliveryPolicy.setRedeliveryDelay(1000L);
        redeliveryPolicy.setUseExponentialBackOff(false);
        redeliveryPolicy.setMaximumRedeliveries(2);
        RedeliveryPolicy redeliveryPolicy2 = new RedeliveryPolicy();
        redeliveryPolicy2.setInitialRedeliveryDelay(0L);
        redeliveryPolicy2.setRedeliveryDelay(1000L);
        redeliveryPolicy2.setUseExponentialBackOff(false);
        redeliveryPolicy2.setMaximumRedeliveries(3);
        RedeliveryPolicyMap redeliveryPolicyMap = this.connection.getRedeliveryPolicyMap();
        redeliveryPolicyMap.put(new ActiveMQTopic(">"), redeliveryPolicy2);
        redeliveryPolicyMap.put(new ActiveMQQueue(">"), redeliveryPolicy);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        ActiveMQTopic activeMQTopic = new ActiveMQTopic("TEST");
        MessageProducer createProducer = createSession.createProducer((Destination) null);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        MessageConsumer createConsumer2 = createSession.createConsumer(activeMQTopic);
        createProducer.send(activeMQQueue, createSession.createTextMessage("1st"));
        createProducer.send(activeMQQueue, createSession.createTextMessage("2nd"));
        createProducer.send(activeMQTopic, createSession.createTextMessage("1st"));
        createProducer.send(activeMQTopic, createSession.createTextMessage("2nd"));
        createSession.commit();
        TextMessage receive = createConsumer.receive(100L);
        assertNotNull(receive);
        assertEquals("1st", receive.getText());
        TextMessage receive2 = createConsumer2.receive(100L);
        assertNotNull(receive2);
        assertEquals("1st", receive2.getText());
        TextMessage receive3 = createConsumer.receive(100L);
        assertNotNull(receive3);
        assertEquals("2nd", receive3.getText());
        TextMessage receive4 = createConsumer2.receive(100L);
        assertNotNull(receive4);
        assertEquals("2nd", receive4.getText());
        createSession.rollback();
        assertNotNull("first immediate redelivery", createConsumer.receive(100L));
        assertNotNull("first immediate redelivery", createConsumer2.receive(100L));
        createSession.rollback();
        TextMessage receive5 = createConsumer.receive(100L);
        assertNull("second delivery delayed: " + receive5, receive5);
        TextMessage receive6 = createConsumer2.receive(100L);
        assertNull("second delivery delayed: " + receive6, receive6);
        TextMessage receive7 = createConsumer.receive(2000L);
        assertNotNull(receive7);
        assertEquals("1st", receive7.getText());
        TextMessage receive8 = createConsumer2.receive(2000L);
        assertNotNull(receive8);
        assertEquals("1st", receive8.getText());
        TextMessage receive9 = createConsumer.receive(100L);
        assertNotNull(receive9);
        assertEquals("2nd", receive9.getText());
        TextMessage receive10 = createConsumer2.receive(100L);
        assertNotNull(receive10);
        assertEquals("2nd", receive10.getText());
        createSession.rollback();
        TextMessage receive11 = createConsumer.receive(2000L);
        assertNotNull(receive11);
        assertEquals("1st", receive11.getText());
        TextMessage receive12 = createConsumer2.receive(2000L);
        assertNotNull(receive12);
        assertEquals("1st", receive12.getText());
        TextMessage receive13 = createConsumer.receive(100L);
        assertNotNull(receive13);
        assertEquals("2nd", receive13.getText());
        TextMessage receive14 = createConsumer2.receive(100L);
        assertNotNull(receive14);
        assertEquals("2nd", receive14.getText());
        createSession.rollback();
        assertNull(createConsumer.receive(2000L));
        TextMessage receive15 = createConsumer2.receive(2000L);
        assertNotNull(receive15);
        assertEquals("1st", receive15.getText());
        assertNull(createConsumer.receive(100L));
        TextMessage receive16 = createConsumer2.receive(100L);
        assertNotNull(receive16);
        assertEquals("2nd", receive16.getText());
        createSession.commit();
    }
}
