package org.apache.activemq.usecases;

import java.util.LinkedHashSet;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.class */
public class NonBlockingConsumerRedeliveryTest {
    private static final Logger LOG = LoggerFactory.getLogger(NonBlockingConsumerRedeliveryTest.class);
    private final String destinationName = "Destination";
    private final int MSG_COUNT = 100;
    private BrokerService broker;
    private String connectionUri;
    private ActiveMQConnectionFactory connectionFactory;

    @Test
    public void testMessageDeleiveredWhenNonBlockingEnabled() throws Exception {
        final LinkedHashSet linkedHashSet = new LinkedHashSet();
        LinkedHashSet linkedHashSet2 = new LinkedHashSet();
        LinkedHashSet linkedHashSet3 = new LinkedHashSet();
        Connection createConnection = this.connectionFactory.createConnection();
        Session createSession = createConnection.createSession(true, 1);
        createSession.createConsumer(createSession.createQueue("Destination")).setMessageListener(new MessageListener() { // from class: org.apache.activemq.usecases.NonBlockingConsumerRedeliveryTest.1
            public void onMessage(Message message) {
                linkedHashSet.add(message);
            }
        });
        sendMessages();
        createSession.commit();
        createConnection.start();
        Assert.assertTrue("Pre-Rollback expects to receive: 100 messages.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.NonBlockingConsumerRedeliveryTest.2
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                NonBlockingConsumerRedeliveryTest.LOG.info("Consumer has received " + linkedHashSet.size() + " messages.");
                return linkedHashSet.size() == 100;
            }
        }));
        linkedHashSet2.addAll(linkedHashSet);
        linkedHashSet.clear();
        createSession.rollback();
        Assert.assertTrue("Post-Rollback expects to receive: 100 messages.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.NonBlockingConsumerRedeliveryTest.3
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                NonBlockingConsumerRedeliveryTest.LOG.info("Consumer has received " + linkedHashSet.size() + " messages since rollback.");
                return linkedHashSet.size() == 100;
            }
        }));
        linkedHashSet3.addAll(linkedHashSet);
        linkedHashSet.clear();
        Assert.assertEquals(linkedHashSet2.size(), linkedHashSet3.size());
        Assert.assertEquals(linkedHashSet2, linkedHashSet3);
        createSession.commit();
    }

    @Test
    public void testMessageDeleiveryDoesntStop() throws Exception {
        final LinkedHashSet linkedHashSet = new LinkedHashSet();
        LinkedHashSet linkedHashSet2 = new LinkedHashSet();
        LinkedHashSet linkedHashSet3 = new LinkedHashSet();
        Connection createConnection = this.connectionFactory.createConnection();
        Session createSession = createConnection.createSession(true, 1);
        createSession.createConsumer(createSession.createQueue("Destination")).setMessageListener(new MessageListener() { // from class: org.apache.activemq.usecases.NonBlockingConsumerRedeliveryTest.4
            public void onMessage(Message message) {
                linkedHashSet.add(message);
            }
        });
        sendMessages();
        createConnection.start();
        Assert.assertTrue("Pre-Rollback expects to receive: 100 messages.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.NonBlockingConsumerRedeliveryTest.5
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                NonBlockingConsumerRedeliveryTest.LOG.info("Consumer has received " + linkedHashSet.size() + " messages.");
                return linkedHashSet.size() == 100;
            }
        }));
        linkedHashSet2.addAll(linkedHashSet);
        linkedHashSet.clear();
        createSession.rollback();
        sendMessages();
        Assert.assertTrue("Post-Rollback expects to receive: 100 messages.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.NonBlockingConsumerRedeliveryTest.6
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                NonBlockingConsumerRedeliveryTest.LOG.info("Consumer has received " + linkedHashSet.size() + " messages since rollback.");
                return linkedHashSet.size() == 200;
            }
        }));
        linkedHashSet3.addAll(linkedHashSet);
        linkedHashSet.clear();
        Assert.assertEquals(linkedHashSet2.size() * 2, linkedHashSet3.size());
        createSession.commit();
    }

    @Test
    public void testNonBlockingMessageDeleiveryIsDelayed() throws Exception {
        final LinkedHashSet linkedHashSet = new LinkedHashSet();
        ActiveMQConnection createConnection = this.connectionFactory.createConnection();
        createConnection.getRedeliveryPolicy().setInitialRedeliveryDelay(TimeUnit.SECONDS.toMillis(6L));
        Session createSession = createConnection.createSession(true, 1);
        createSession.createConsumer(createSession.createQueue("Destination")).setMessageListener(new MessageListener() { // from class: org.apache.activemq.usecases.NonBlockingConsumerRedeliveryTest.7
            public void onMessage(Message message) {
                linkedHashSet.add(message);
            }
        });
        sendMessages();
        createConnection.start();
        Assert.assertTrue("Pre-Rollback expects to receive: 100 messages.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.NonBlockingConsumerRedeliveryTest.8
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                NonBlockingConsumerRedeliveryTest.LOG.info("Consumer has received " + linkedHashSet.size() + " messages.");
                return linkedHashSet.size() == 100;
            }
        }));
        linkedHashSet.clear();
        createSession.rollback();
        Assert.assertFalse("Delayed redelivery test not expecting any messages yet.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.NonBlockingConsumerRedeliveryTest.9
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                return linkedHashSet.size() > 0;
            }
        }, TimeUnit.SECONDS.toMillis(4L)));
        createSession.commit();
        createSession.close();
    }

    @Test
    public void testNonBlockingMessageDeleiveryWithRollbacks() throws Exception {
        final LinkedHashSet linkedHashSet = new LinkedHashSet();
        ActiveMQConnection createConnection = this.connectionFactory.createConnection();
        final Session createSession = createConnection.createSession(true, 1);
        MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue("Destination"));
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.usecases.NonBlockingConsumerRedeliveryTest.10
            public void onMessage(Message message) {
                linkedHashSet.add(message);
            }
        });
        sendMessages();
        createConnection.start();
        Assert.assertTrue("Pre-Rollback expects to receive: 100 messages.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.NonBlockingConsumerRedeliveryTest.11
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                NonBlockingConsumerRedeliveryTest.LOG.info("Consumer has received " + linkedHashSet.size() + " messages.");
                return linkedHashSet.size() == 100;
            }
        }));
        linkedHashSet.clear();
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.usecases.NonBlockingConsumerRedeliveryTest.12
            int count = 0;

            public void onMessage(Message message) {
                int i = this.count + 1;
                this.count = i;
                if (i <= 10) {
                    linkedHashSet.add(message);
                    try {
                        createSession.commit();
                        return;
                    } catch (JMSException e) {
                        NonBlockingConsumerRedeliveryTest.LOG.warn("Caught an unexcepted exception: " + e.getMessage());
                        return;
                    }
                }
                try {
                    createSession.rollback();
                    NonBlockingConsumerRedeliveryTest.LOG.info("Rolling back session.");
                    this.count = 0;
                } catch (JMSException e2) {
                    NonBlockingConsumerRedeliveryTest.LOG.warn("Caught an unexcepted exception: " + e2.getMessage());
                }
            }
        });
        createSession.rollback();
        Assert.assertTrue("Post-Rollback expects to receive: 100 messages.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.NonBlockingConsumerRedeliveryTest.13
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                NonBlockingConsumerRedeliveryTest.LOG.info("Consumer has received " + linkedHashSet.size() + " messages since rollback.");
                return linkedHashSet.size() == 100;
            }
        }));
        Assert.assertEquals(100L, linkedHashSet.size());
        createSession.commit();
    }

    @Test
    public void testNonBlockingMessageDeleiveryWithAllRolledBack() throws Exception {
        final LinkedHashSet linkedHashSet = new LinkedHashSet();
        final LinkedHashSet linkedHashSet2 = new LinkedHashSet();
        ActiveMQConnection createConnection = this.connectionFactory.createConnection();
        createConnection.getRedeliveryPolicy().setMaximumRedeliveries(5);
        final Session createSession = createConnection.createSession(true, 1);
        Queue createQueue = createSession.createQueue("Destination");
        Queue createQueue2 = createSession.createQueue("ActiveMQ.DLQ");
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        createSession.createConsumer(createQueue2).setMessageListener(new MessageListener() { // from class: org.apache.activemq.usecases.NonBlockingConsumerRedeliveryTest.14
            public void onMessage(Message message) {
                linkedHashSet2.add(message);
            }
        });
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.usecases.NonBlockingConsumerRedeliveryTest.15
            public void onMessage(Message message) {
                linkedHashSet.add(message);
            }
        });
        sendMessages();
        createConnection.start();
        Assert.assertTrue("Pre-Rollback expects to receive: 100 messages.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.NonBlockingConsumerRedeliveryTest.16
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                NonBlockingConsumerRedeliveryTest.LOG.info("Consumer has received " + linkedHashSet.size() + " messages.");
                return linkedHashSet.size() == 100;
            }
        }));
        createSession.rollback();
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.usecases.NonBlockingConsumerRedeliveryTest.17
            public void onMessage(Message message) {
                try {
                    createSession.rollback();
                } catch (JMSException e) {
                    NonBlockingConsumerRedeliveryTest.LOG.warn("Caught an unexcepted exception: " + e.getMessage());
                }
            }
        });
        Assert.assertTrue("Post-Rollback expects to DLQ: 100 messages.", Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.usecases.NonBlockingConsumerRedeliveryTest.18
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                NonBlockingConsumerRedeliveryTest.LOG.info("Consumer has received " + linkedHashSet2.size() + " messages in DLQ.");
                return linkedHashSet2.size() == 100;
            }
        }));
        createSession.commit();
    }

    private void sendMessages() throws Exception {
        Session createSession = this.connectionFactory.createConnection().createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue("Destination"));
        for (int i = 0; i < 100; i++) {
            createProducer.send(createSession.createTextMessage("" + i));
        }
    }

    @Before
    public void startBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.setPersistent(false);
        this.broker.setUseJmx(false);
        this.broker.addConnector("tcp://0.0.0.0:0");
        this.broker.start();
        this.broker.waitUntilStarted();
        this.connectionUri = ((TransportConnector) this.broker.getTransportConnectors().get(0)).getPublishableConnectString();
        this.connectionFactory = new ActiveMQConnectionFactory(this.connectionUri);
        this.connectionFactory.setNonBlockingRedelivery(true);
        RedeliveryPolicy redeliveryPolicy = this.connectionFactory.getRedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(TimeUnit.SECONDS.toMillis(2L));
        redeliveryPolicy.setBackOffMultiplier(-1.0d);
        redeliveryPolicy.setRedeliveryDelay(TimeUnit.SECONDS.toMillis(2L));
        redeliveryPolicy.setMaximumRedeliveryDelay(-1L);
        redeliveryPolicy.setUseExponentialBackOff(false);
        redeliveryPolicy.setMaximumRedeliveries(-1);
    }

    @After
    public void stopBroker() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
    }
}
