package org.apache.activemq.advisory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.transport.nio.NIOSSLLoadTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/advisory/ProducerListenerTest.class */
/**
 * Verifies that a {@link ProducerEventSource} reports producer start/stop events,
 * together with the current producer count, to a registered {@link ProducerListener}.
 *
 * <p>The test class registers itself as the listener and funnels every event it receives
 * into {@link #eventQueue}, from which the test methods take events one at a time.
 *
 * <p>NOTE(review): several member names ({@code consumerSession1}, {@code consumerCounter},
 * {@code assertConsumerEvent}) say "consumer" although they deal with producers. They are
 * non-private, so they are left unchanged here to avoid breaking subclasses.
 */
public class ProducerListenerTest extends EmbeddedBrokerTestSupport implements ProducerListener {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerListenerTest.class);

    /** Maximum time, in milliseconds, to wait for a single producer event to arrive. */
    private static final long EVENT_TIMEOUT_MILLIS = 100000L;

    /** Session owning the first producer created by a test; {@code null} once closed. */
    protected Session consumerSession1;
    /** Session owning the second producer created by a test; {@code null} once closed. */
    protected Session consumerSession2;
    /** Number of producers created so far by {@link #createProducer()}; used for log labels only. */
    protected int consumerCounter;
    /** Event source under test; created in {@link #setUp()}, started by the individual tests. */
    protected ProducerEventSource producerEventSource;
    /** Events delivered to {@link #onProducerEvent(ProducerEvent)}, in arrival order. */
    protected BlockingQueue<ProducerEvent> eventQueue = new ArrayBlockingQueue<>(NIOSSLLoadTest.MESSAGE_COUNT);
    private Connection connection;

    /**
     * Starts listening first, then creates and closes two producers, checking the event
     * (count and started flag) reported after each step.
     */
    public void testProducerEvents() throws Exception {
        this.producerEventSource.start();

        this.consumerSession1 = createProducer();
        assertConsumerEvent(1, true);

        this.consumerSession2 = createProducer();
        assertConsumerEvent(2, true);

        this.consumerSession1.close();
        this.consumerSession1 = null;
        assertConsumerEvent(1, false);

        this.consumerSession2.close();
        this.consumerSession2 = null;
        assertConsumerEvent(0, false);
    }

    /**
     * Creates two producers before the event source is started, then checks that two
     * "started" events with a count of 2 are reported, followed by the usual stop events
     * as each session is closed.
     */
    public void testListenWhileAlreadyConsumersActive() throws Exception {
        this.consumerSession1 = createProducer();
        this.consumerSession2 = createProducer();

        this.producerEventSource.start();
        // One event is expected per already-active producer, each carrying the full count.
        assertConsumerEvent(2, true);
        assertConsumerEvent(2, true);

        this.consumerSession1.close();
        this.consumerSession1 = null;
        assertConsumerEvent(1, false);

        this.consumerSession2.close();
        this.consumerSession2 = null;
        assertConsumerEvent(0, false);
    }

    /**
     * Listener callback: records the event so the test thread can pick it up.
     *
     * @param producerEvent the event delivered by the event source
     * @throws IllegalStateException if the bounded queue is already full
     */
    @Override
    public void onProducerEvent(ProducerEvent producerEvent) {
        this.eventQueue.add(producerEvent);
    }

    /**
     * Runs the base-class set-up, opens and starts a connection, and wires this test in as
     * the listener of a new (not yet started) {@link ProducerEventSource}.
     */
    @Override // org.apache.activemq.EmbeddedBrokerTestSupport, org.apache.activemq.AutoFailTestSupport
    public void setUp() throws Exception {
        super.setUp();
        this.connection = createConnection();
        this.connection.start();
        this.producerEventSource = new ProducerEventSource(this.connection, this.destination);
        this.producerEventSource.setProducerListener(this);
    }

    /**
     * Releases everything created by {@link #setUp()} and the tests.
     *
     * <p>The clean-up is layered with {@code try/finally} so that a failure while stopping
     * the event source or closing a session can no longer skip closing the connection or
     * the base-class tear-down, which would otherwise leave resources open for later tests.
     */
    @Override // org.apache.activemq.EmbeddedBrokerTestSupport, org.apache.activemq.AutoFailTestSupport
    public void tearDown() throws Exception {
        try {
            if (this.producerEventSource != null) {
                this.producerEventSource.stop();
            }
            if (this.consumerSession2 != null) {
                this.consumerSession2.close();
            }
            if (this.consumerSession1 != null) {
                this.consumerSession1.close();
            }
        } finally {
            try {
                // Per the JMS contract, closing the connection also closes any session
                // that an earlier failure left open.
                if (this.connection != null) {
                    this.connection.close();
                }
            } finally {
                super.tearDown();
            }
        }
    }

    /**
     * Waits for the next producer event and checks its contents.
     *
     * @param i the expected producer count carried by the event
     * @param z the expected value of the event's started flag
     * @throws InterruptedException if interrupted while waiting for the event
     */
    protected void assertConsumerEvent(int i, boolean z) throws InterruptedException {
        ProducerEvent event = waitForProducerEvent();
        assertEquals("Producer count", i, event.getProducerCount());
        assertEquals("started", z, event.isStarted());
    }

    /**
     * Creates a new non-transacted, auto-acknowledge session on the shared connection and
     * opens a producer on the test destination within it.
     *
     * @return the session that owns the new producer; closing it closes the producer
     * @throws JMSException if the session or producer cannot be created
     */
    protected Session createProducer() throws JMSException {
        final String producerText = "Producer: " + (++this.consumerCounter);
        LOG.info("Creating producer: {} on destination: {}", producerText, this.destination);

        Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createProducer(this.destination);
        return session;
    }

    /**
     * Blocks until a producer event has been queued by the listener callback, failing the
     * test if none arrives within {@link #EVENT_TIMEOUT_MILLIS}.
     *
     * @return the next queued event, never {@code null}
     * @throws InterruptedException if interrupted while waiting
     */
    protected ProducerEvent waitForProducerEvent() throws InterruptedException {
        ProducerEvent event = this.eventQueue.poll(EVENT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        assertTrue("Should have received a producer event!", event != null);
        return event;
    }
}
