package org.apache.activemq;

import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/MessageListenerRedeliveryTest.class */
public class MessageListenerRedeliveryTest extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(MessageListenerRedeliveryTest.class);
    private Connection connection;

    /* loaded from: input_file:org/apache/activemq/MessageListenerRedeliveryTest$TestMessageListener.class */
    private class TestMessageListener implements MessageListener {
        public int counter;
        private Session session;

        public TestMessageListener(Session session) {
            this.session = session;
        }

        public void onMessage(Message message) {
            try {
                MessageListenerRedeliveryTest.LOG.info("Message Received: " + message);
                this.counter++;
                if (this.counter <= 4) {
                    MessageListenerRedeliveryTest.LOG.info("Message Rollback.");
                    this.session.rollback();
                } else {
                    MessageListenerRedeliveryTest.LOG.info("Message Commit.");
                    message.acknowledge();
                    this.session.commit();
                }
            } catch (JMSException e) {
                MessageListenerRedeliveryTest.LOG.error("Error when rolling back transaction");
            }
        }
    }

    protected void setUp() throws Exception {
        this.connection = createConnection();
    }

    protected void tearDown() throws Exception {
        if (this.connection != null) {
            this.connection.close();
            this.connection = null;
        }
    }

    protected RedeliveryPolicy getRedeliveryPolicy() {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0L);
        redeliveryPolicy.setRedeliveryDelay(1000L);
        redeliveryPolicy.setMaximumRedeliveries(3);
        redeliveryPolicy.setBackOffMultiplier(2.0d);
        redeliveryPolicy.setUseExponentialBackOff(true);
        return redeliveryPolicy;
    }

    protected Connection createConnection() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false&marshal=true");
        activeMQConnectionFactory.setRedeliveryPolicy(getRedeliveryPolicy());
        return activeMQConnectionFactory.createConnection();
    }

    public void testQueueRollbackConsumerListener() throws JMSException {
        this.connection.start();
        Session createSession = this.connection.createSession(true, 2);
        Queue createQueue = createSession.createQueue("queue-" + getName());
        MessageProducer createProducer = createProducer(createSession, createQueue);
        createProducer.send(createTextMessage(createSession));
        createSession.commit();
        ActiveMQMessageConsumer createConsumer = createSession.createConsumer(createQueue);
        createConsumer.setRedeliveryPolicy(getRedeliveryPolicy());
        TestMessageListener testMessageListener = new TestMessageListener(createSession);
        createConsumer.setMessageListener(testMessageListener);
        try {
            Thread.sleep(500L);
        } catch (InterruptedException e) {
        }
        assertEquals(2, testMessageListener.counter);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e2) {
        }
        assertEquals(3, testMessageListener.counter);
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e3) {
        }
        assertEquals(4, testMessageListener.counter);
        createProducer.send(createTextMessage(createSession));
        createSession.commit();
        try {
            Thread.sleep(500L);
        } catch (InterruptedException e4) {
        }
        assertEquals(5, testMessageListener.counter);
        try {
            Thread.sleep(1500L);
        } catch (InterruptedException e5) {
        }
        assertEquals(5, testMessageListener.counter);
        createSession.close();
    }

    public void testQueueRollbackSessionListener() throws JMSException {
        this.connection.start();
        Session createSession = this.connection.createSession(true, 2);
        Queue createQueue = createSession.createQueue("queue-" + getName());
        MessageProducer createProducer = createProducer(createSession, createQueue);
        createProducer.send(createTextMessage(createSession));
        createSession.commit();
        ActiveMQMessageConsumer createConsumer = createSession.createConsumer(createQueue);
        createConsumer.setRedeliveryPolicy(getRedeliveryPolicy());
        TestMessageListener testMessageListener = new TestMessageListener(createSession);
        createConsumer.setMessageListener(testMessageListener);
        try {
            Thread.sleep(500L);
        } catch (InterruptedException e) {
        }
        assertEquals(2, testMessageListener.counter);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e2) {
        }
        assertEquals(3, testMessageListener.counter);
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e3) {
        }
        assertEquals(4, testMessageListener.counter);
        createProducer.send(createTextMessage(createSession));
        createSession.commit();
        try {
            Thread.sleep(500L);
        } catch (InterruptedException e4) {
        }
        assertEquals(5, testMessageListener.counter);
        try {
            Thread.sleep(1500L);
        } catch (InterruptedException e5) {
        }
        assertEquals(5, testMessageListener.counter);
        createSession.close();
    }

    public void testQueueSessionListenerExceptionRetry() throws Exception {
        this.connection.start();
        Session createSession = this.connection.createSession(false, 1);
        Queue createQueue = createSession.createQueue("queue-" + getName());
        MessageProducer createProducer = createProducer(createSession, createQueue);
        createProducer.send(createTextMessage(createSession, "1"));
        createProducer.send(createTextMessage(createSession, "2"));
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final int maximumRedeliveries = getRedeliveryPolicy().getMaximumRedeliveries();
        final ArrayList arrayList = new ArrayList();
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.MessageListenerRedeliveryTest.1
            public void onMessage(Message message) {
                MessageListenerRedeliveryTest.LOG.info("Message Received: " + message);
                try {
                    arrayList.add(((TextMessage) message).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                    Assert.fail(e.toString());
                }
                if (atomicInteger.incrementAndGet() < maximumRedeliveries) {
                    throw new RuntimeException(MessageListenerRedeliveryTest.this.getName() + " force a redelivery");
                }
                atomicInteger.set(0);
                countDownLatch.countDown();
            }
        });
        assertTrue("got message before retry expiry", countDownLatch.await(20L, TimeUnit.SECONDS));
        for (int i = 0; i < maximumRedeliveries; i++) {
            assertEquals("got first redelivered: " + i, "1", (String) arrayList.get(i));
        }
        for (int i2 = maximumRedeliveries; i2 < maximumRedeliveries * 2; i2++) {
            assertEquals("got first redelivered: " + i2, "2", (String) arrayList.get(i2));
        }
        createSession.close();
    }

    public void testQueueSessionListenerExceptionDlq() throws Exception {
        this.connection.start();
        Session createSession = this.connection.createSession(false, 1);
        Queue createQueue = createSession.createQueue("queue-" + getName());
        createProducer(createSession, createQueue).send(createTextMessage(createSession));
        final Message[] messageArr = new Message[1];
        MessageConsumer createConsumer = createSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.MessageListenerRedeliveryTest.2
            public void onMessage(Message message) {
                MessageListenerRedeliveryTest.LOG.info("DLQ Message Received: " + message);
                messageArr[0] = message;
                countDownLatch.countDown();
            }
        });
        MessageConsumer createConsumer2 = createSession.createConsumer(createQueue);
        final CountDownLatch countDownLatch2 = new CountDownLatch(getRedeliveryPolicy().getMaximumRedeliveries());
        createConsumer2.setMessageListener(new MessageListener() { // from class: org.apache.activemq.MessageListenerRedeliveryTest.3
            public void onMessage(Message message) {
                MessageListenerRedeliveryTest.LOG.info("Message Received: " + message);
                countDownLatch2.countDown();
                throw new RuntimeException(MessageListenerRedeliveryTest.this.getName() + " force a redelivery");
            }
        });
        assertTrue("got message before retry expiry", countDownLatch2.await(20L, TimeUnit.SECONDS));
        assertTrue("got dlq message", countDownLatch.await(20L, TimeUnit.SECONDS));
        Message message = messageArr[0];
        assertNotNull("dlq message captured", message);
        String stringProperty = message.getStringProperty("dlqDeliveryFailureCause");
        assertTrue("cause exception is remembered", stringProperty.contains("RuntimeException"));
        assertTrue("is correct exception", stringProperty.contains(getName()));
        createSession.close();
    }

    private TextMessage createTextMessage(Session session, String str) throws JMSException {
        return session.createTextMessage(str);
    }

    private TextMessage createTextMessage(Session session) throws JMSException {
        return session.createTextMessage("Hello");
    }

    private MessageProducer createProducer(Session session, Destination destination) throws JMSException {
        MessageProducer createProducer = session.createProducer(destination);
        createProducer.setDeliveryMode(getDeliveryMode());
        return createProducer;
    }

    protected int getDeliveryMode() {
        return 2;
    }
}
