package org.apache.activemq.bugs;

import java.io.IOException;
import java.lang.Thread;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ2183Test.class */
public class AMQ2183Test extends AutoFailTestSupport implements Thread.UncaughtExceptionHandler, MessageListener {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ2183Test.class);
    private static final int maxSent = 2000;
    private final Map<Thread, Throwable> exceptions = new ConcurrentHashMap();
    BrokerService master = new BrokerService();
    BrokerService slave = new BrokerService();
    URI masterUrl;
    URI slaveUrl;

    /* loaded from: input_file:org/apache/activemq/bugs/AMQ2183Test$MessageCounter.class */
    class MessageCounter implements MessageListener {
        int count = 0;

        MessageCounter() {
        }

        public void onMessage(Message message) {
            this.count++;
        }

        int getCount() {
            return this.count;
        }
    }

    public void onException(JMSException jMSException) {
        this.exceptions.put(Thread.currentThread(), jMSException);
    }

    @Override // org.apache.activemq.AutoFailTestSupport
    public void setUp() throws Exception {
        setAutoFail(true);
        super.setUp();
        this.master = new BrokerService();
        this.slave = new BrokerService();
        this.master.setBrokerName("Master");
        this.master.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.master.deleteAllMessages();
        this.master.setWaitForSlave(true);
        new Thread() { // from class: org.apache.activemq.bugs.AMQ2183Test.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    AMQ2183Test.this.master.start();
                } catch (Exception e) {
                    e.printStackTrace();
                    AMQ2183Test.this.exceptions.put(Thread.currentThread(), e);
                }
            }
        }.start();
        Thread.sleep(2000L);
        this.masterUrl = ((TransportConnector) this.master.getTransportConnectors().get(0)).getConnectUri();
    }

    private void startSlave() throws IOException, Exception, URISyntaxException {
        this.slave.setBrokerName("Slave");
        this.slave.deleteAllMessages();
        this.slave.addConnector(JmsMultipleBrokersTestSupport.AUTO_ASSIGN_TRANSPORT);
        this.slave.setMasterConnectorURI(this.masterUrl.toString());
        this.slave.start();
        this.slaveUrl = ((TransportConnector) this.slave.getTransportConnectors().get(0)).getConnectUri();
    }

    @Override // org.apache.activemq.AutoFailTestSupport
    public void tearDown() throws Exception {
        this.master.stop();
        this.slave.stop();
        this.exceptions.clear();
    }

    public void testMasterSlaveBugWithStopStartConsumers() throws Exception {
        Thread.setDefaultUncaughtExceptionHandler(this);
        final Connection createConnection = new ActiveMQConnectionFactory("failover:(" + this.masterUrl + ")?randomize=false").createConnection();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        Executors.newSingleThreadExecutor().execute(new Runnable() { // from class: org.apache.activemq.bugs.AMQ2183Test.2
            @Override // java.lang.Runnable
            public void run() {
                countDownLatch.countDown();
                try {
                    createConnection.start();
                    countDownLatch2.countDown();
                } catch (Exception e) {
                    AMQ2183Test.this.exceptions.put(Thread.currentThread(), e);
                }
            }
        });
        assertTrue("connection.start has commenced", countDownLatch.await(10L, TimeUnit.SECONDS));
        startSlave();
        assertTrue("connection.start done", countDownLatch2.await(70L, TimeUnit.SECONDS));
        final MessageCounter messageCounter = new MessageCounter();
        createConnection.createSession(false, 1).createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.T")).setMessageListener(messageCounter);
        final MessageCounter messageCounter2 = new MessageCounter();
        createConnection.createSession(false, 1).createConsumer(new ActiveMQQueue("Consumer.B.VirtualTopic.T")).setMessageListener(messageCounter2);
        Thread.sleep(2000L);
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(new ActiveMQTopic("VirtualTopic.T"));
        for (int i = 0; i < 2000; i++) {
            createProducer.send(createSession.createTextMessage("Hi" + i));
        }
        Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.bugs.AMQ2183Test.3
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                return 2000 == messageCounter.getCount() && 2000 == messageCounter2.getCount();
            }
        });
        assertEquals(2000, messageCounter.getCount());
        assertEquals(2000, messageCounter2.getCount());
        assertTrue(this.exceptions.isEmpty());
    }

    @Override // java.lang.Thread.UncaughtExceptionHandler
    public void uncaughtException(Thread thread, Throwable th) {
        this.exceptions.put(thread, th);
    }

    public void onMessage(Message message) {
        LOG.info("message received: " + message);
    }
}
