package org.apache.activemq.bugs;

import java.net.URI;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.transport.nio.NIOSSLLoadTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ1853Test.class */
public class AMQ1853Test {
    private static BrokerService broker;
    private static final Logger LOG = LoggerFactory.getLogger(AMQ1853Test.class);
    static final String jmsConnectionURI = "failover:(vm://localhost)";
    private static final String queueFail = "Queue.BlockingConsumer.QueueFail";
    private final int producerMessages = 5;
    private final int totalNumberMessages = 10;
    private final int maxRedeliveries = 2;
    private final int redeliveryDelay = NIOSSLLoadTest.MESSAGE_COUNT;
    private Map<String, AtomicInteger> messageList = null;

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ1853Test$TestConsumer.class */
    private class TestConsumer implements Runnable, ExceptionListener, MessageListener {
        private CountDownLatch latch;
        private boolean bFakeFail;
        String destinationName;
        private int receivedMessageCounter = 0;
        boolean bMessageReceiptIsOrdered = true;
        boolean bStop = false;
        String previousMessageId = null;
        private ActiveMQConnectionFactory connectionFactory = null;
        private ActiveMQConnection connection = null;
        private Session session = null;
        private MessageConsumer consumer = null;

        public TestConsumer(String str, boolean z) {
            this.latch = null;
            this.bFakeFail = false;
            this.destinationName = null;
            this.bFakeFail = z;
            this.latch = new CountDownLatch(10 * (this.bFakeFail ? 3 : 1));
            this.destinationName = str;
        }

        public CountDownLatch getLatch() {
            return this.latch;
        }

        public boolean messageReceiptIsOrdered() {
            return this.bMessageReceiptIsOrdered;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    AMQ1853Test.LOG.info("Started TestConsumer for destination (" + this.destinationName + ")");
                    this.connectionFactory = new ActiveMQConnectionFactory(AMQ1853Test.jmsConnectionURI);
                    this.connection = this.connectionFactory.createConnection();
                    this.connection.setNonBlockingRedelivery(true);
                    this.session = this.connection.createSession(true, 0);
                    RedeliveryPolicy redeliveryPolicy = this.connection.getRedeliveryPolicy();
                    redeliveryPolicy.setInitialRedeliveryDelay(1000L);
                    redeliveryPolicy.setBackOffMultiplier(-1.0d);
                    redeliveryPolicy.setRedeliveryDelay(1000L);
                    redeliveryPolicy.setMaximumRedeliveryDelay(-1L);
                    redeliveryPolicy.setUseExponentialBackOff(false);
                    redeliveryPolicy.setMaximumRedeliveries(2);
                    this.connection.setExceptionListener(this);
                    this.consumer = this.session.createConsumer(this.session.createQueue(this.destinationName));
                    this.consumer.setMessageListener(this);
                    this.connection.start();
                    while (!this.bStop) {
                        Thread.sleep(100L);
                    }
                    AMQ1853Test.LOG.info("Finished TestConsumer for destination name (" + this.destinationName + ") remaining " + this.latch.getCount() + " messages " + toString());
                    try {
                        if (this.consumer != null) {
                            this.consumer.close();
                        }
                        if (this.session != null) {
                            this.session.close();
                        }
                        if (this.connection != null) {
                            this.connection.close();
                        }
                    } catch (Exception e) {
                        AMQ1853Test.LOG.error("Closing connection/session (" + this.destinationName + ")Caught: " + e);
                    }
                } catch (Exception e2) {
                    AMQ1853Test.LOG.error("Consumer (" + this.destinationName + ") Caught: " + e2);
                    try {
                        if (this.consumer != null) {
                            this.consumer.close();
                        }
                        if (this.session != null) {
                            this.session.close();
                        }
                        if (this.connection != null) {
                            this.connection.close();
                        }
                    } catch (Exception e3) {
                        AMQ1853Test.LOG.error("Closing connection/session (" + this.destinationName + ")Caught: " + e3);
                    }
                }
            } catch (Throwable th) {
                try {
                    if (this.consumer != null) {
                        this.consumer.close();
                    }
                    if (this.session != null) {
                        this.session.close();
                    }
                    if (this.connection != null) {
                        this.connection.close();
                    }
                } catch (Exception e4) {
                    AMQ1853Test.LOG.error("Closing connection/session (" + this.destinationName + ")Caught: " + e4);
                }
                throw th;
            }
        }

        public synchronized void onException(JMSException jMSException) {
            AMQ1853Test.LOG.error("Consumer for destination, (" + this.destinationName + "), JMS Exception occured.  Shutting down client.");
        }

        public synchronized void setStop(boolean z) {
            this.bStop = z;
        }

        public synchronized void onMessage(Message message) {
            this.receivedMessageCounter++;
            this.latch.countDown();
            AMQ1853Test.LOG.info("Consumer for destination (" + this.destinationName + ") latch countdown: " + this.latch.getCount() + " :: Number messages received " + this.receivedMessageCounter);
            try {
                if (this.receivedMessageCounter % 3 == 1) {
                    this.previousMessageId = message.getJMSMessageID();
                }
                if (this.bMessageReceiptIsOrdered) {
                    this.bMessageReceiptIsOrdered = this.previousMessageId.trim().equals(message.getJMSMessageID());
                }
                AtomicInteger atomicInteger = (AtomicInteger) AMQ1853Test.this.messageList.get(message.getJMSMessageID());
                atomicInteger.incrementAndGet();
                AMQ1853Test.LOG.info("Consumer for destination (" + this.destinationName + ")\n" + message.getJMSMessageID() + " = currentMessageId\n" + this.previousMessageId + " = previousMessageId\n" + this.bMessageReceiptIsOrdered + "= bMessageReceiptIsOrdered\n>>LATENCY " + (System.currentTimeMillis() - message.getLongProperty("TestTime")) + "\nmessage counter = " + atomicInteger.get());
                if (this.bFakeFail) {
                    AMQ1853Test.LOG.debug("Consumer on destination " + this.destinationName + " rolling back JMS Session for message: " + message.toString());
                    this.session.rollback();
                } else {
                    AMQ1853Test.LOG.debug("Consumer on destination " + this.destinationName + " committing JMS Session for message: " + message.toString());
                    this.session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace();
                AMQ1853Test.LOG.error("Error reading JMS Message from destination " + this.destinationName + ".");
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ1853Test$TestProducer.class */
    private class TestProducer implements Runnable {
        private CountDownLatch latch;
        private String destinationName;

        public TestProducer(String str) {
            this.latch = null;
            this.destinationName = null;
            this.destinationName = str;
            this.latch = new CountDownLatch(10);
        }

        public CountDownLatch getLatch() {
            return this.latch;
        }

        @Override // java.lang.Runnable
        public void run() {
            ActiveMQConnection activeMQConnection = null;
            ActiveMQSession activeMQSession = null;
            try {
                try {
                    AMQ1853Test.LOG.info("Started TestProducer for destination (" + this.destinationName + ")");
                    activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(AMQ1853Test.jmsConnectionURI).createConnection();
                    activeMQConnection.setCopyMessageOnSend(false);
                    activeMQConnection.start();
                    activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, 1);
                    ActiveMQMessageProducer createProducer = activeMQSession.createProducer(activeMQSession.createQueue(this.destinationName));
                    createProducer.setDeliveryMode(1);
                    for (int i = 0; i < 5; i++) {
                        TextMessage createTextMessage = activeMQSession.createTextMessage();
                        createTextMessage.setLongProperty("TestTime", System.currentTimeMillis());
                        try {
                            createProducer.send(createTextMessage);
                            AMQ1853Test.LOG.info("Producer (" + this.destinationName + ")\n" + createTextMessage.getJMSMessageID() + " = sent messageId\n");
                            this.latch.countDown();
                            AMQ1853Test.LOG.info(" Latch count  " + this.latch.getCount());
                            AMQ1853Test.LOG.info("Producer message list size = " + AMQ1853Test.this.messageList.keySet().size());
                            AMQ1853Test.this.messageList.put(createTextMessage.getJMSMessageID(), new AtomicInteger(0));
                            AMQ1853Test.LOG.info("Producer message list size = " + AMQ1853Test.this.messageList.keySet().size());
                        } catch (Exception e) {
                            AMQ1853Test.LOG.info("Producer for destination (" + this.destinationName + ") Caught: " + e);
                        }
                        Thread.sleep(1000L);
                    }
                    AMQ1853Test.LOG.info("Finished TestProducer for destination (" + this.destinationName + ")");
                    if (activeMQSession != null) {
                        try {
                            activeMQSession.close();
                        } catch (Exception e2) {
                            AMQ1853Test.LOG.error("Closing connection/session (" + this.destinationName + ")Caught: " + e2);
                            return;
                        }
                    }
                    if (activeMQConnection != null) {
                        activeMQConnection.close();
                    }
                } catch (Throwable th) {
                    if (activeMQSession != null) {
                        try {
                            activeMQSession.close();
                        } catch (Exception e3) {
                            AMQ1853Test.LOG.error("Closing connection/session (" + this.destinationName + ")Caught: " + e3);
                            throw th;
                        }
                    }
                    if (activeMQConnection != null) {
                        activeMQConnection.close();
                    }
                    throw th;
                }
            } catch (Exception e4) {
                AMQ1853Test.LOG.error("Terminating TestProducer(" + this.destinationName + ")Caught: " + e4);
                if (activeMQSession != null) {
                    try {
                        activeMQSession.close();
                    } catch (Exception e5) {
                        AMQ1853Test.LOG.error("Closing connection/session (" + this.destinationName + ")Caught: " + e5);
                        return;
                    }
                }
                if (activeMQConnection != null) {
                    activeMQConnection.close();
                }
            }
        }
    }

    @Before
    public void setUp() throws Exception {
        broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false"));
        broker.setUseJmx(false);
        broker.start();
        broker.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        if (broker != null) {
            broker.stop();
            broker.waitUntilStopped();
            broker = null;
        }
    }

    @Test
    public void testConsumerMessagesAreNotOrdered() throws Exception {
        TestConsumer testConsumer = null;
        this.messageList = new Hashtable();
        try {
            TestProducer testProducer = new TestProducer(queueFail);
            thread(testProducer, false);
            testConsumer = new TestConsumer(queueFail, true);
            thread(testConsumer, false);
            Thread.sleep(1000L);
            thread(testProducer, false);
            Thread.sleep(1000L);
            testProducer.getLatch().await();
            LOG.info("producer successful, count = " + testProducer.getLatch().getCount());
            Assert.assertTrue("message list size =  " + this.messageList.size(), 10 == this.messageList.size());
            LOG.info("final message list size =  " + this.messageList.size());
            testConsumer.getLatch().await();
            LOG.info("consumerAllFail successful, count = " + testConsumer.getLatch().getCount());
            Iterator<String> it = this.messageList.keySet().iterator();
            for (AtomicInteger atomicInteger : this.messageList.values()) {
                String next = it.next();
                Assert.assertTrue("for message " + next + " counter =  " + atomicInteger.get(), atomicInteger.get() == 3);
                LOG.info("final count for message " + next + " counter =  " + atomicInteger.get());
            }
            Assert.assertFalse(testConsumer.messageReceiptIsOrdered());
            if (testConsumer != null) {
                testConsumer.setStop(true);
            }
        } catch (Throwable th) {
            if (testConsumer != null) {
                testConsumer.setStop(true);
            }
            throw th;
        }
    }

    private static Thread thread(Runnable runnable, boolean z) {
        Thread thread = new Thread(runnable);
        thread.setDaemon(z);
        thread.start();
        return thread;
    }
}
