package org.apache.activemq.artemis.tests.integration.openwire.amq;

import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.TopicSubscriber;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerAccessor;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.class */
public class RedeliveryPolicyTest extends BasicOpenWireTest {
    @Override // org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest, org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase
    public void setUp() throws Exception {
        this.realStore = true;
        super.setUp();
    }

    @Test
    public void testGetNext() throws Exception {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0L);
        redeliveryPolicy.setRedeliveryDelay(500L);
        redeliveryPolicy.setBackOffMultiplier(2.0d);
        redeliveryPolicy.setUseExponentialBackOff(true);
        long nextRedeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(0L);
        assertEquals(500L, nextRedeliveryDelay);
        long nextRedeliveryDelay2 = redeliveryPolicy.getNextRedeliveryDelay(nextRedeliveryDelay);
        assertEquals(1000L, nextRedeliveryDelay2);
        long nextRedeliveryDelay3 = redeliveryPolicy.getNextRedeliveryDelay(nextRedeliveryDelay2);
        assertEquals(2000L, nextRedeliveryDelay3);
        redeliveryPolicy.setUseExponentialBackOff(false);
        assertEquals(500L, redeliveryPolicy.getNextRedeliveryDelay(nextRedeliveryDelay3));
    }

    @Test
    public void testExponentialRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception {
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0L);
        redeliveryPolicy.setRedeliveryDelay(500L);
        redeliveryPolicy.setBackOffMultiplier(2.0d);
        redeliveryPolicy.setUseExponentialBackOff(true);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("testExponentialRedeliveryPolicyDelaysDeliveryOnRollback");
        makeSureCoreQueueExist("testExponentialRedeliveryPolicyDelaysDeliveryOnRollback");
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        createProducer.send(createSession.createTextMessage("1st"));
        createProducer.send(createSession.createTextMessage("2nd"));
        createSession.commit();
        TextMessage receive = createConsumer.receive(1000L);
        assertNotNull(receive);
        assertEquals("1st", receive.getText());
        createSession.rollback();
        assertNotNull(createConsumer.receive(100L));
        createSession.rollback();
        assertNull(createConsumer.receive(100L));
        TextMessage receive2 = createConsumer.receive(700L);
        assertNotNull(receive2);
        assertEquals("1st", receive2.getText());
        createSession.rollback();
        assertNull(createConsumer.receive(100L));
        assertNull(createConsumer.receive(500L));
        TextMessage receive3 = createConsumer.receive(700L);
        assertNotNull(receive3);
        assertEquals("1st", receive3.getText());
    }

    @Test
    public void testNornalRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception {
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0L);
        redeliveryPolicy.setRedeliveryDelay(500L);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("testNornalRedeliveryPolicyDelaysDeliveryOnRollback");
        makeSureCoreQueueExist("testNornalRedeliveryPolicyDelaysDeliveryOnRollback");
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        createProducer.send(createSession.createTextMessage("1st"));
        createProducer.send(createSession.createTextMessage("2nd"));
        createSession.commit();
        TextMessage receive = createConsumer.receive(1000L);
        assertNotNull(receive);
        assertEquals("1st", receive.getText());
        createSession.rollback();
        assertNotNull(createConsumer.receive(100L));
        createSession.rollback();
        assertNull(createConsumer.receive(100L));
        TextMessage receive2 = createConsumer.receive(700L);
        assertNotNull(receive2);
        assertEquals("1st", receive2.getText());
        createSession.rollback();
        assertNull(createConsumer.receive(100L));
        TextMessage receive3 = createConsumer.receive(700L);
        assertNotNull(receive3);
        assertEquals("1st", receive3.getText());
    }

    @Test
    public void testDLQHandling() throws Exception {
        makeSureCoreQueueExist("ActiveMQ.DLQ");
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(100L);
        redeliveryPolicy.setUseExponentialBackOff(false);
        redeliveryPolicy.setMaximumRedeliveries(2);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 2);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        makeSureCoreQueueExist("TEST");
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        MessageConsumer createConsumer2 = createSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
        createProducer.send(createSession.createTextMessage("1st"));
        createProducer.send(createSession.createTextMessage("2nd"));
        createSession.commit();
        TextMessage receive = createConsumer.receive(1000L);
        assertNotNull(receive);
        assertEquals("1st", receive.getText());
        createSession.rollback();
        TextMessage receive2 = createConsumer.receive(1000L);
        assertNotNull(receive2);
        assertEquals("1st", receive2.getText());
        createSession.rollback();
        TextMessage receive3 = createConsumer.receive(2000L);
        assertNotNull(receive3);
        assertEquals("1st", receive3.getText());
        createSession.rollback();
        TextMessage receive4 = createConsumer.receive(1000L);
        assertNotNull(receive4);
        assertEquals("2nd", receive4.getText());
        createSession.commit();
        TextMessage receive5 = createConsumer2.receive(1000L);
        assertNotNull(receive5);
        assertEquals("1st", receive5.getText());
        assertTrue("cause exception has policy ref", receive5.getStringProperty("dlqDeliveryFailureCause").contains("RedeliveryPolicy"));
        createSession.commit();
    }

    @Test
    public void testInfiniteMaximumNumberOfRedeliveries() throws Exception {
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(100L);
        redeliveryPolicy.setUseExponentialBackOff(false);
        redeliveryPolicy.setMaximumRedeliveries(-1);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        makeSureCoreQueueExist("TEST");
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        createProducer.send(createSession.createTextMessage("1st"));
        createProducer.send(createSession.createTextMessage("2nd"));
        createSession.commit();
        TextMessage receive = createConsumer.receive(1000L);
        assertNotNull(receive);
        assertEquals("1st", receive.getText());
        createSession.rollback();
        TextMessage receive2 = createConsumer.receive(1000L);
        assertNotNull(receive2);
        assertEquals("1st", receive2.getText());
        createSession.rollback();
        TextMessage receive3 = createConsumer.receive(2000L);
        assertNotNull(receive3);
        assertEquals("1st", receive3.getText());
        createSession.rollback();
        TextMessage receive4 = createConsumer.receive(2000L);
        assertNotNull(receive4);
        assertEquals("1st", receive4.getText());
        createSession.rollback();
        TextMessage receive5 = createConsumer.receive(2000L);
        assertNotNull(receive5);
        assertEquals("1st", receive5.getText());
        createSession.rollback();
        TextMessage receive6 = createConsumer.receive(2000L);
        assertNotNull(receive6);
        assertEquals("1st", receive6.getText());
        createSession.commit();
        TextMessage receive7 = createConsumer.receive(2000L);
        assertNotNull(receive7);
        assertEquals("2nd", receive7.getText());
        createSession.commit();
    }

    @Test
    public void testMaximumRedeliveryDelay() throws Exception {
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(10L);
        redeliveryPolicy.setUseExponentialBackOff(true);
        redeliveryPolicy.setMaximumRedeliveries(-1);
        redeliveryPolicy.setRedeliveryDelay(50L);
        redeliveryPolicy.setMaximumRedeliveryDelay(1000L);
        redeliveryPolicy.setBackOffMultiplier(2.0d);
        redeliveryPolicy.setUseExponentialBackOff(true);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        makeSureCoreQueueExist("TEST");
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        createProducer.send(createSession.createTextMessage("1st"));
        createProducer.send(createSession.createTextMessage("2nd"));
        createSession.commit();
        for (int i = 0; i < 10; i++) {
            TextMessage receive = createConsumer.receive(2000L);
            assertNotNull(receive);
            assertEquals("1st", receive.getText());
            createSession.rollback();
        }
        TextMessage receive2 = createConsumer.receive(2000L);
        assertNotNull(receive2);
        assertEquals("1st", receive2.getText());
        createSession.commit();
        TextMessage receive3 = createConsumer.receive(2000L);
        assertNotNull(receive3);
        assertEquals("2nd", receive3.getText());
        createSession.commit();
        assertTrue(redeliveryPolicy.getNextRedeliveryDelay(Long.MAX_VALUE) == 1000);
    }

    @Test
    public void testZeroMaximumNumberOfRedeliveries() throws Exception {
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(100L);
        redeliveryPolicy.setUseExponentialBackOff(false);
        redeliveryPolicy.setMaximumRedeliveries(0);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        makeSureCoreQueueExist("TEST");
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        createProducer.send(createSession.createTextMessage("1st"));
        createProducer.send(createSession.createTextMessage("2nd"));
        createSession.commit();
        TextMessage receive = createConsumer.receive(1000L);
        assertNotNull(receive);
        assertEquals("1st", receive.getText());
        createSession.rollback();
        TextMessage receive2 = createConsumer.receive(1000L);
        assertNotNull(receive2);
        assertEquals("2nd", receive2.getText());
        createSession.commit();
    }

    @Test
    public void testRedeliveredMessageNotOverflowingPrefetch() throws Exception {
        this.connection.getPrefetchPolicy().setAll(10);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        makeSureCoreQueueExist("TEST");
        QueueControl queueControl = (QueueControl) this.server.getManagementService().getResource("queue.TEST");
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        for (int i = 0; i < 20; i++) {
            createProducer.send(createSession.createTextMessage("MSG" + i));
            createSession.commit();
        }
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        Wait.assertEquals(10, () -> {
            return queueControl.getDeliveringCount();
        }, 3000L, 100L);
        for (int i2 = 0; i2 < 20; i2++) {
            assertNotNull(createConsumer.receive(2000L));
            if (i2 == 3) {
                createSession.rollback();
            } else {
                createSession.commit();
                assertTrue(queueControl.getDeliveringCount() <= 10);
            }
        }
        assertNotNull(createConsumer.receive(2000L));
        createSession.commit();
    }

    @Test
    public void testCountersAreCorrectAfterSendToDLQ() throws Exception {
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(0);
        redeliveryPolicy.setInitialRedeliveryDelay(0L);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        makeSureCoreQueueExist("TEST");
        QueueControl queueControl = (QueueControl) this.server.getManagementService().getResource("queue.TEST");
        createSession.createProducer(activeMQQueue).send(createSession.createTextMessage("The Message"));
        createSession.commit();
        assertNotNull(createSession.createConsumer(activeMQQueue).receive(2000L));
        createSession.rollback();
        Wait.assertEquals(0L, () -> {
            return queueControl.getMessageCount();
        });
        createSession.close();
        assertEquals(0L, queueControl.getPersistentSize());
    }

    @Test
    public void testRedeliveryRefCleanup() throws Exception {
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setUseExponentialBackOff(false);
        redeliveryPolicy.setMaximumRedeliveries(-1);
        redeliveryPolicy.setRedeliveryDelay(50L);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        Session createSession2 = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        makeSureCoreQueueExist("TEST");
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        MessageConsumer createConsumer = createSession2.createConsumer(activeMQQueue);
        for (int i = 0; i < 5; i++) {
            createProducer.send(createSession.createTextMessage("MessageText"));
            createSession.commit();
            assertNotNull(createConsumer.receive(2000L));
            createSession2.rollback();
            assertNotNull(createConsumer.receive(2000L));
            createSession2.commit();
        }
        ServerConsumer serverConsumer = null;
        Iterator it = this.server.getSessions().iterator();
        while (it.hasNext()) {
            for (ServerConsumer serverConsumer2 : ((ServerSession) it.next()).getServerConsumers()) {
                if (serverConsumer2.getQueue().getName().toString() == "TEST") {
                    serverConsumer = serverConsumer2;
                }
            }
        }
        assertTrue(AMQConsumerAccessor.getRolledbackMessageRefs((AMQConsumer) serverConsumer.getProtocolData()).isEmpty());
    }

    @Test
    public void testInitialRedeliveryDelayZero() throws Exception {
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0L);
        redeliveryPolicy.setUseExponentialBackOff(false);
        redeliveryPolicy.setMaximumRedeliveries(1);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        makeSureCoreQueueExist("TEST");
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        createProducer.send(createSession.createTextMessage("1st"));
        createProducer.send(createSession.createTextMessage("2nd"));
        createSession.commit();
        TextMessage receive = createConsumer.receive(100L);
        assertNotNull(receive);
        assertEquals("1st", receive.getText());
        createSession.rollback();
        TextMessage receive2 = createConsumer.receive(100L);
        assertNotNull(receive2);
        assertEquals("1st", receive2.getText());
        TextMessage receive3 = createConsumer.receive(100L);
        assertNotNull(receive3);
        assertEquals("2nd", receive3.getText());
        createSession.commit();
        createSession.commit();
    }

    @Test
    public void testInitialRedeliveryDelayOne() throws Exception {
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(1000L);
        redeliveryPolicy.setUseExponentialBackOff(false);
        redeliveryPolicy.setMaximumRedeliveries(1);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        makeSureCoreQueueExist("TEST");
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        createProducer.send(createSession.createTextMessage("1st"));
        createProducer.send(createSession.createTextMessage("2nd"));
        createSession.commit();
        TextMessage receive = createConsumer.receive(100L);
        assertNotNull(receive);
        assertEquals("1st", receive.getText());
        createSession.rollback();
        assertNull(createConsumer.receive(100L));
        TextMessage receive2 = createConsumer.receive(2000L);
        assertNotNull(receive2);
        assertEquals("1st", receive2.getText());
        TextMessage receive3 = createConsumer.receive(100L);
        assertNotNull(receive3);
        assertEquals("2nd", receive3.getText());
        createSession.commit();
    }

    @Test
    public void testRedeliveryDelayOne() throws Exception {
        RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0L);
        redeliveryPolicy.setRedeliveryDelay(1000L);
        redeliveryPolicy.setUseExponentialBackOff(false);
        redeliveryPolicy.setMaximumRedeliveries(2);
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        makeSureCoreQueueExist("TEST");
        MessageProducer createProducer = createSession.createProducer(activeMQQueue);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        createProducer.send(createSession.createTextMessage("1st"));
        createProducer.send(createSession.createTextMessage("2nd"));
        createSession.commit();
        TextMessage receive = createConsumer.receive(100L);
        assertNotNull(receive);
        assertEquals("1st", receive.getText());
        createSession.rollback();
        assertNotNull("first immediate redelivery", createConsumer.receive(100L));
        createSession.rollback();
        TextMessage receive2 = createConsumer.receive(100L);
        assertNull("second delivery delayed: " + receive2, receive2);
        TextMessage receive3 = createConsumer.receive(2000L);
        assertNotNull(receive3);
        assertEquals("1st", receive3.getText());
        TextMessage receive4 = createConsumer.receive(100L);
        assertNotNull(receive4);
        assertEquals("2nd", receive4.getText());
        createSession.commit();
    }

    private void send(Session session, MessageProducer messageProducer, Destination destination, String str) throws Exception {
        TextMessage createTextMessage = session.createTextMessage(str);
        createTextMessage.setStringProperty("texto", str);
        messageProducer.send(destination, createTextMessage);
    }

    @Test
    public void testRedeliveryPolicyPerDestination() throws Exception {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0L);
        redeliveryPolicy.setRedeliveryDelay(1000L);
        redeliveryPolicy.setUseExponentialBackOff(false);
        redeliveryPolicy.setMaximumRedeliveries(2);
        RedeliveryPolicy redeliveryPolicy2 = new RedeliveryPolicy();
        redeliveryPolicy2.setInitialRedeliveryDelay(0L);
        redeliveryPolicy2.setRedeliveryDelay(1000L);
        redeliveryPolicy2.setUseExponentialBackOff(false);
        redeliveryPolicy2.setMaximumRedeliveries(3);
        RedeliveryPolicyMap redeliveryPolicyMap = this.connection.getRedeliveryPolicyMap();
        redeliveryPolicyMap.put(new ActiveMQTopic(">"), redeliveryPolicy2);
        redeliveryPolicyMap.put(new ActiveMQQueue(">"), redeliveryPolicy);
        this.connection.setClientID("id1");
        this.connection.start();
        Session createSession = this.connection.createSession(true, 1);
        ActiveMQQueue activeMQQueue = new ActiveMQQueue("TEST");
        ActiveMQTopic activeMQTopic = new ActiveMQTopic("TESTTOPIC");
        makeSureCoreQueueExist("TEST");
        MessageProducer createProducer = createSession.createProducer((Destination) null);
        createProducer.setDeliveryMode(2);
        MessageConsumer createConsumer = createSession.createConsumer(activeMQQueue);
        TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(activeMQTopic, "tp1");
        send(createSession, createProducer, activeMQQueue, "1st");
        send(createSession, createProducer, activeMQQueue, "2nd");
        send(createSession, createProducer, activeMQTopic, "1st");
        send(createSession, createProducer, activeMQTopic, "2nd");
        createSession.commit();
        TextMessage receive = createConsumer.receive(100L);
        assertNotNull(receive);
        assertEquals("1st", receive.getText());
        TextMessage receive2 = createDurableSubscriber.receive(100L);
        assertNotNull(receive2);
        assertEquals("1st", receive2.getText());
        TextMessage receive3 = createConsumer.receive(100L);
        assertNotNull(receive3);
        assertEquals("2nd", receive3.getText());
        TextMessage receive4 = createDurableSubscriber.receive(100L);
        assertNotNull(receive4);
        assertEquals("2nd", receive4.getText());
        createSession.rollback();
        assertNotNull("first immediate redelivery", createConsumer.receive(100L));
        assertNotNull("first immediate redelivery", createDurableSubscriber.receive(100L));
        createSession.rollback();
        TextMessage receive5 = createConsumer.receive(100L);
        assertNull("second delivery delayed: " + receive5, receive5);
        TextMessage receive6 = createDurableSubscriber.receive(100L);
        assertNull("second delivery delayed: " + receive6, receive6);
        TextMessage receive7 = createConsumer.receive(2000L);
        assertNotNull(receive7);
        assertEquals("1st", receive7.getText());
        TextMessage receive8 = createDurableSubscriber.receive(2000L);
        assertNotNull(receive8);
        assertEquals("1st", receive8.getText());
        TextMessage receive9 = createConsumer.receive(100L);
        assertNotNull(receive9);
        assertEquals("2nd", receive9.getText());
        TextMessage receive10 = createDurableSubscriber.receive(100L);
        assertNotNull(receive10);
        assertEquals("2nd", receive10.getText());
        createSession.rollback();
        TextMessage receive11 = createConsumer.receive(2000L);
        assertNotNull(receive11);
        assertEquals("1st", receive11.getText());
        TextMessage receive12 = createDurableSubscriber.receive(2000L);
        assertNotNull(receive12);
        assertEquals("1st", receive12.getText());
        TextMessage receive13 = createConsumer.receive(100L);
        assertNotNull(receive13);
        assertEquals("2nd", receive13.getText());
        TextMessage receive14 = createDurableSubscriber.receive(100L);
        assertNotNull(receive14);
        assertEquals("2nd", receive14.getText());
        createSession.rollback();
        assertNull(createConsumer.receive(2000L));
        TextMessage receive15 = createDurableSubscriber.receive(2000L);
        assertNotNull(receive15);
        assertEquals("1st", receive15.getText());
        assertNull(createConsumer.receive(100L));
        TextMessage receive16 = createDurableSubscriber.receive(100L);
        assertNotNull(receive16);
        assertEquals("2nd", receive16.getText());
        createSession.commit();
    }

    @Test
    public void testClientRedlivery() throws Exception {
        try {
            Session createSession = this.connection.createSession(false, 2);
            makeSureCoreQueueExist("TEST");
            createSession.createProducer(createSession.createQueue("TEST")).send(createSession.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME));
            this.connection.close();
            for (int i = 0; i < 10; i++) {
                this.connection = this.factory.createConnection();
                this.connection.start();
                try {
                    Session createSession2 = this.connection.createSession(false, 2);
                    Message receive = createSession2.createConsumer(createSession2.createQueue("TEST")).receive(1000L);
                    assertNotNull("Message null on iteration " + i, receive);
                    if (i > 0) {
                        assertTrue(receive.getJMSRedelivered());
                    }
                    this.connection.close();
                } finally {
                }
            }
        } finally {
        }
    }

    @Test
    public void verifyNoRedeliveryFlagAfterCloseNoReceive() throws Exception {
        try {
            Session createSession = this.connection.createSession(false, 2);
            makeSureCoreQueueExist("TEST");
            createSession.createProducer(createSession.createQueue("TEST")).send(createSession.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME));
            this.connection.close();
            this.connection = this.factory.createConnection();
            this.connection.start();
            try {
                Session createSession2 = this.connection.createSession(false, 2);
                Queue createQueue = createSession2.createQueue("TEST");
                MessageConsumer createConsumer = createSession2.createConsumer(createQueue);
                TimeUnit.MILLISECONDS.sleep(500L);
                createConsumer.close();
                Message receive = createSession2.createConsumer(createQueue).receive(1000L);
                assertNotNull("Message null", receive);
                assertFalse(receive.getJMSRedelivered());
                this.connection.close();
            } finally {
            }
        } finally {
        }
    }
}
