package org.apache.activemq.bugs;

import java.lang.Thread;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ4092Test.class */
public class AMQ4092Test extends TestCase {
    static final String QUEUE_NAME = "TEST";
    static final int NUM_TO_SEND_PER_PRODUCER = 1000;
    static final int NUM_PRODUCERS = 5;
    static final boolean debug = false;
    private BrokerService brokerService;
    private ActiveMQQueue destination;
    private HashMap<Thread, Throwable> exceptions = new HashMap<>();
    private ExceptionListener exceptionListener = new ExceptionListener() { // from class: org.apache.activemq.bugs.AMQ4092Test.1
        public void onException(JMSException jMSException) {
            jMSException.printStackTrace();
            AMQ4092Test.this.exceptions.put(Thread.currentThread(), jMSException);
        }
    };
    private static final Logger log = LoggerFactory.getLogger(AMQ4092Test.class);
    static final ActiveMQQueue[] DESTINATIONS = {new ActiveMQQueue("A"), new ActiveMQQueue("B")};

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ4092Test$TestConsumer.class */
    class TestConsumer implements Runnable {
        private CountDownLatch finishLatch = new CountDownLatch(1);

        TestConsumer() {
        }

        public void consume() throws Exception {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(((TransportConnector) AMQ4092Test.this.brokerService.getTransportConnectors().get(0)).getConnectUri().toString());
            activeMQConnectionFactory.setExceptionListener(AMQ4092Test.this.exceptionListener);
            final int length = 1000 * AMQ4092Test.DESTINATIONS.length * 5;
            final AtomicInteger atomicInteger = new AtomicInteger();
            MessageListener messageListener = new MessageListener() { // from class: org.apache.activemq.bugs.AMQ4092Test.TestConsumer.1
                public void onMessage(Message message) {
                    boolean z = false;
                    try {
                        z = message.getBooleanProperty("JMSXGroupFirstForConsumer");
                    } catch (JMSException e) {
                        e.printStackTrace();
                        AMQ4092Test.this.exceptions.put(Thread.currentThread(), e);
                    }
                    TestCase.assertTrue("Always is first message", z);
                    if (atomicInteger.incrementAndGet() == length) {
                        AMQ4092Test.log.info("Got all:" + atomicInteger.get());
                        TestConsumer.this.finishLatch.countDown();
                    }
                }
            };
            int length2 = AMQ4092Test.DESTINATIONS.length * 100;
            Connection[] connectionArr = new Connection[length2];
            Session[] sessionArr = new Session[length2];
            MessageConsumer[] messageConsumerArr = new MessageConsumer[length2];
            for (int i = 0; i < length2; i++) {
                connectionArr[i] = activeMQConnectionFactory.createConnection();
                connectionArr[i].start();
                sessionArr[i] = connectionArr[i].createSession(false, 1);
                messageConsumerArr[i] = sessionArr[i].createConsumer(AMQ4092Test.DESTINATIONS[i % AMQ4092Test.DESTINATIONS.length], (String) null);
                messageConsumerArr[i].setMessageListener(messageListener);
            }
            AMQ4092Test.log.info("received " + atomicInteger.get() + " messages");
            TestCase.assertTrue("got all messages in time", this.finishLatch.await(4L, TimeUnit.MINUTES));
            AMQ4092Test.log.info("received " + atomicInteger.get() + " messages");
            for (MessageConsumer messageConsumer : messageConsumerArr) {
                messageConsumer.close();
            }
            for (Session session : sessionArr) {
                session.close();
            }
            for (Connection connection : connectionArr) {
                connection.close();
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                consume();
            } catch (Exception e) {
                e.printStackTrace();
                AMQ4092Test.this.exceptions.put(Thread.currentThread(), e);
            }
        }
    }

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ4092Test$TestProducer.class */
    class TestProducer implements Runnable {
        TestProducer() {
        }

        public void produceMessages() throws Exception {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(((TransportConnector) AMQ4092Test.this.brokerService.getTransportConnectors().get(0)).getConnectUri().toString());
            activeMQConnectionFactory.setExceptionListener(AMQ4092Test.this.exceptionListener);
            activeMQConnectionFactory.setUseAsyncSend(true);
            Connection createConnection = activeMQConnectionFactory.createConnection();
            Session createSession = createConnection.createSession(false, 1);
            MessageProducer createProducer = createSession.createProducer(AMQ4092Test.this.destination);
            createProducer.setDeliveryMode(2);
            String str = new String(new byte[2048]);
            for (int i = 1; i <= 1000; i++) {
                TextMessage createTextMessage = createSession.createTextMessage(str + "_" + i);
                for (int i2 = 0; i2 < 100; i2++) {
                    createTextMessage.setStringProperty("Prop" + i2, "" + i2);
                }
                createTextMessage.setStringProperty("JMSXGroupID", Thread.currentThread().getName() + i);
                createTextMessage.setIntProperty("JMSXGroupSeq", 1);
                createProducer.send(createTextMessage);
            }
            createProducer.close();
            createSession.close();
            createConnection.close();
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                produceMessages();
            } catch (Exception e) {
                e.printStackTrace();
                AMQ4092Test.this.exceptions.put(Thread.currentThread(), e);
            }
        }
    }

    protected void setUp() throws Exception {
        this.brokerService = new BrokerService();
        this.brokerService.setDeleteAllMessagesOnStartup(true);
        this.brokerService.getPersistenceAdapter().setConcurrentStoreAndDispatchQueues(false);
        this.brokerService.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.brokerService.start();
        this.destination = new ActiveMQQueue();
        this.destination.setCompositeDestinations(DESTINATIONS);
        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: org.apache.activemq.bugs.AMQ4092Test.2
            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                AMQ4092Test.this.exceptions.put(thread, th);
            }
        });
    }

    protected void tearDown() throws Exception {
        this.brokerService.stop();
    }

    public void testConcurrentGroups() throws Exception {
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        newCachedThreadPool.submit(new TestConsumer());
        for (int i = 0; i < 5; i++) {
            newCachedThreadPool.submit(new TestProducer());
        }
        newCachedThreadPool.shutdown();
        newCachedThreadPool.awaitTermination(5L, TimeUnit.MINUTES);
        assertTrue("no exceptions: " + this.exceptions, this.exceptions.isEmpty());
    }
}
