package org.apache.activemq.bugs;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.NamingException;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ1936Test.class */
public class AMQ1936Test extends TestCase {
    private static final String TEST_QUEUE_NAME = "dynamicQueues/duplicate.message.test.queue";
    private static final long TEST_MESSAGE_COUNT = 6000;
    private static final int CONSUMER_COUNT = 2;
    private static final boolean TRANSACTED_RECEIVE = true;
    private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 2, Long.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue());
    private ThreadedMessageReceiver[] receivers = new ThreadedMessageReceiver[2];
    private BrokerService broker = null;
    private static final Logger logger = Logger.getLogger(AMQ1936Test.class);
    static QueueConnectionFactory connectionFactory = null;

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ1936Test$IMessageHandler.class */
    public interface IMessageHandler {
        void onMessage(Message message) throws Exception;
    }

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ1936Test$ThreadedMessageReceiver.class */
    private static final class ThreadedMessageReceiver implements Runnable {
        private String queueName;
        private IMessageHandler handler;
        private AtomicBoolean shouldStop = new AtomicBoolean(false);

        public ThreadedMessageReceiver(String str, IMessageHandler iMessageHandler) {
            this.queueName = null;
            this.handler = null;
            this.queueName = str;
            this.handler = iMessageHandler;
        }

        @Override // java.lang.Runnable
        public void run() {
            QueueConnection queueConnection = null;
            QueueSession queueSession = null;
            QueueReceiver queueReceiver = null;
            Message message = null;
            try {
                try {
                    QueueConnection createQueueConnection = AMQ1936Test.connectionFactory.createQueueConnection();
                    QueueSession createQueueSession = createQueueConnection.createQueueSession(true, 1);
                    QueueReceiver createReceiver = createQueueSession.createReceiver(createQueueSession.createQueue(AMQ1936Test.TEST_QUEUE_NAME));
                    createQueueConnection.start();
                    AMQ1936Test.logger.info("Receiver " + Thread.currentThread().getName() + " connected.");
                    while (!this.shouldStop.get() && !Thread.currentThread().isInterrupted()) {
                        try {
                            message = createReceiver.receive(200L);
                        } catch (Exception e) {
                            if (!(e instanceof InterruptedException) && !(e.getCause() instanceof InterruptedException)) {
                                throw e;
                            }
                        }
                        if (message != null && this.handler != null) {
                            this.handler.onMessage(message);
                        }
                        if (createQueueSession.getTransacted()) {
                            createQueueSession.commit();
                        }
                    }
                    AMQ1936Test.logger.info("Receiver " + Thread.currentThread().getName() + " shutting down.");
                    if (createReceiver != null) {
                        try {
                            createReceiver.close();
                        } catch (JMSException e2) {
                            AMQ1936Test.logger.warn(e2);
                        }
                    }
                    if (createQueueSession != null) {
                        try {
                            createQueueSession.close();
                        } catch (JMSException e3) {
                            AMQ1936Test.logger.warn(e3);
                        }
                    }
                    if (createQueueConnection != null) {
                        createQueueConnection.close();
                    }
                } catch (Throwable th) {
                    if (0 != 0) {
                        try {
                            queueReceiver.close();
                        } catch (JMSException e4) {
                            AMQ1936Test.logger.warn(e4);
                        }
                    }
                    if (0 != 0) {
                        try {
                            queueSession.close();
                        } catch (JMSException e5) {
                            AMQ1936Test.logger.warn(e5);
                        }
                    }
                    if (0 != 0) {
                        queueConnection.close();
                    }
                    throw th;
                }
            } catch (Exception e6) {
                AMQ1936Test.logger.error(e6);
                e6.printStackTrace();
            } catch (JMSException e7) {
                AMQ1936Test.logger.error(e7);
                e7.printStackTrace();
            } catch (NamingException e8) {
                AMQ1936Test.logger.error(e8);
            }
        }

        public Boolean getShouldStop() {
            return Boolean.valueOf(this.shouldStop.get());
        }

        public void setShouldStop(Boolean bool) {
            this.shouldStop.set(bool.booleanValue());
        }
    }

    protected void setUp() throws Exception {
        super.setUp();
        this.broker = new BrokerService();
        this.broker.getSystemUsage().getMemoryUsage().setLimit(5242880L);
        this.broker.setBrokerName("test");
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.start();
        connectionFactory = new ActiveMQConnectionFactory("vm://test");
    }

    protected void tearDown() throws Exception {
        super.tearDown();
        if (this.threadPool != null) {
            for (ThreadedMessageReceiver threadedMessageReceiver : this.receivers) {
                threadedMessageReceiver.setShouldStop(true);
            }
            logger.info("Waiting for receivers to shutdown..");
            if (this.threadPool.awaitTermination(10L, TimeUnit.SECONDS)) {
                logger.info("All receivers shutdown successfully..");
            } else {
                logger.warn("Not all receivers completed shutdown.");
            }
        }
        logger.debug("Stoping the broker.");
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    private void sendTextMessage(String str, int i) throws JMSException, NamingException {
        QueueConnection queueConnection = null;
        QueueSession queueSession = null;
        QueueSender queueSender = null;
        try {
            queueConnection = new ActiveMQConnectionFactory("vm://test").createQueueConnection();
            queueSession = queueConnection.createQueueSession(false, 1);
            Queue createQueue = queueSession.createQueue(TEST_QUEUE_NAME);
            queueSender = queueSession.createSender(createQueue);
            queueSender.setDeliveryMode(2);
            TextMessage createTextMessage = queueSession.createTextMessage(String.valueOf(i));
            queueSender.send(createTextMessage);
            if (queueSession.getTransacted()) {
                queueSession.commit();
            }
            if (i % 1000 == 0) {
                logger.info("Message successfully sent to : " + createQueue.getQueueName() + " messageid: " + createTextMessage.getJMSMessageID() + " content:" + createTextMessage.getText());
            }
            if (queueSender != null) {
                queueSender.close();
            }
            if (queueSession != null) {
                queueSession.close();
            }
            if (queueConnection != null) {
                queueConnection.close();
            }
        } catch (Throwable th) {
            if (queueSender != null) {
                queueSender.close();
            }
            if (queueSession != null) {
                queueSession.close();
            }
            if (queueConnection != null) {
                queueConnection.close();
            }
            throw th;
        }
    }

    public void testForDuplicateMessages() throws Exception {
        final ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        final Object obj = new Object();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        for (int i = 0; i < 3000; i++) {
            if (countDownLatch.getCount() == 0) {
                fail("Duplicate message id detected");
            }
            sendTextMessage(TEST_QUEUE_NAME, i);
        }
        for (int i2 = 0; i2 < 2; i2++) {
            this.receivers[i2] = new ThreadedMessageReceiver(TEST_QUEUE_NAME, new IMessageHandler() { // from class: org.apache.activemq.bugs.AMQ1936Test.1
                @Override // org.apache.activemq.bugs.AMQ1936Test.IMessageHandler
                public void onMessage(Message message) throws Exception {
                    synchronized (obj) {
                        if (atomicInteger.incrementAndGet() % 1000 == 0) {
                            AMQ1936Test.logger.info("Received message:" + message.getJMSMessageID() + " with content: " + ((TextMessage) message).getText());
                        }
                        if (concurrentHashMap.containsKey(message.getJMSMessageID())) {
                            countDownLatch.countDown();
                            AMQ1936Test.logger.fatal("duplicate message id detected:" + message.getJMSMessageID());
                            Assert.fail("Duplicate message id detected:" + message.getJMSMessageID());
                        } else {
                            concurrentHashMap.put(message.getJMSMessageID(), message.getJMSMessageID());
                        }
                    }
                }
            });
            this.threadPool.submit(this.receivers[i2]);
        }
        for (int i3 = 0; i3 < 3000; i3++) {
            if (countDownLatch.getCount() == 0) {
                fail("Duplicate message id detected");
            }
            sendTextMessage(TEST_QUEUE_NAME, i3);
        }
        logger.info("sent all 6000 messages");
        if (!Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ1936Test.2
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                return AMQ1936Test.TEST_MESSAGE_COUNT == ((long) concurrentHashMap.size());
            }
        }, 60000L)) {
            AutoFailTestSupport.dumpAllThreads("--STUCK?--");
        }
        assertEquals("Number of messages received does not match the number sent", TEST_MESSAGE_COUNT, concurrentHashMap.size());
        assertEquals(TEST_MESSAGE_COUNT, atomicInteger.get());
    }
}
