/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.advisory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryTopic;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.advisory.ProducerEvent;
import org.apache.activemq.advisory.ProducerEventSource;
import org.apache.activemq.advisory.ProducerListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises {@link ProducerEventSource}: the test registers itself as the
 * {@link ProducerListener}, creates and closes producers on a destination, and
 * checks that each received {@link ProducerEvent} carries the expected producer
 * count and started flag.
 *
 * <p>Events delivered to {@link #onProducerEvent(ProducerEvent)} are buffered in
 * {@link #eventQueue} and consumed one at a time by the assertions.
 */
public class ProducerListenerTest
extends EmbeddedBrokerTestSupport
implements ProducerListener {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerListenerTest.class);

    /** Maximum time, in milliseconds, to wait for a single producer event. */
    private static final long EVENT_TIMEOUT_MILLIS = 100000L;

    // The two fields below hold the sessions that own the producers under test;
    // the "consumer" naming is kept so subclasses that reference them still compile.
    protected Session consumerSession1;
    protected Session consumerSession2;
    /** Incremented for every producer created through {@link #createProducer()}. */
    protected int consumerCounter;
    protected ProducerEventSource producerEventSource;
    /** Buffer between the listener callback and the asserting test thread. */
    protected BlockingQueue<ProducerEvent> eventQueue = new ArrayBlockingQueue<ProducerEvent>(1000);
    private Connection connection;

    /**
     * Starts listening first, then creates two producers and closes them again,
     * expecting one event per change with counts 1, 2, 1, 0.
     */
    public void testProducerEvents() throws Exception {
        producerEventSource.start();

        consumerSession1 = createProducer();
        assertProducerEvent(1, true);

        consumerSession2 = createProducer();
        assertProducerEvent(2, true);

        consumerSession1.close();
        consumerSession1 = null;
        assertProducerEvent(1, false);

        consumerSession2.close();
        consumerSession2 = null;
        assertProducerEvent(0, false);
    }

    /**
     * Creates two producers before the event source is started and expects two
     * "started" events that both report a count of 2, followed by the usual
     * stop events as the sessions are closed.
     */
    public void testListenWhileAlreadyConsumersActive() throws Exception {
        consumerSession1 = createProducer();
        consumerSession2 = createProducer();

        producerEventSource.start();
        assertProducerEvent(2, true);
        assertProducerEvent(2, true);

        consumerSession1.close();
        consumerSession1 = null;
        assertProducerEvent(1, false);

        consumerSession2.close();
        consumerSession2 = null;
        assertProducerEvent(0, false);
    }

    /**
     * Repeats the start/stop check against a temporary topic or queue
     * (depending on {@code useTopic}) using a dedicated event source.
     */
    public void testConsumerEventsOnTemporaryDestination() throws Exception {
        // The session is not closed here; tearDown() closes the owning connection.
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

        // Declared as Destination because the two branches yield different
        // temporary destination types (TemporaryTopic vs. TemporaryQueue).
        Destination dest = useTopic ? session.createTemporaryTopic() : session.createTemporaryQueue();

        // Replace the event source built in setUp() with one bound to the temporary destination.
        producerEventSource = new ProducerEventSource(connection, dest);
        producerEventSource.setProducerListener(this);
        producerEventSource.start();

        MessageProducer producer = session.createProducer(dest);
        assertProducerEvent(1, true);

        producer.close();
        assertProducerEvent(0, false);
    }

    /**
     * Listener callback: queues the event for the test thread.
     *
     * @param event the producer event reported by the event source
     */
    @Override
    public void onProducerEvent(ProducerEvent event) {
        eventQueue.add(event);
    }

    /**
     * Opens and starts a connection and prepares (but does not start) an event
     * source for the test destination with this test as its listener.
     */
    @Override
    protected void setUp() throws Exception {
        super.setUp();
        connection = createConnection();
        connection.start();
        producerEventSource = new ProducerEventSource(connection, destination);
        producerEventSource.setProducerListener(this);
    }

    /**
     * Stops the event source and closes any sessions a test left open, then the
     * connection, before delegating to the superclass.
     */
    @Override
    protected void tearDown() throws Exception {
        if (producerEventSource != null) {
            producerEventSource.stop();
        }
        if (consumerSession2 != null) {
            consumerSession2.close();
        }
        if (consumerSession1 != null) {
            consumerSession1.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.tearDown();
    }

    /**
     * Waits for the next queued event and checks its producer count and started flag.
     *
     * @param count   expected value of {@code event.getProducerCount()}
     * @param started expected value of {@code event.isStarted()}
     * @throws InterruptedException if interrupted while waiting for the event
     */
    protected void assertProducerEvent(int count, boolean started) throws InterruptedException {
        ProducerEvent event = waitForProducerEvent();
        assertEquals("Producer count", count, event.getProducerCount());
        assertEquals("started", started, event.isStarted());
    }

    /**
     * Creates a new non-transacted session with a producer on the test destination.
     *
     * @return the session owning the new producer; closing it closes the producer
     * @throws JMSException if the session or producer cannot be created
     */
    protected Session createProducer() throws JMSException {
        String producerText = "Producer: " + ++consumerCounter;
        LOG.info("Creating producer: {} on destination: {}", producerText, destination);

        Session answer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = answer.createProducer(destination);
        assertNotNull(producer);
        return answer;
    }

    /**
     * Blocks until an event is queued or the timeout elapses; fails the test
     * when no event arrives in time.
     *
     * @return the next queued producer event, never {@code null}
     * @throws InterruptedException if interrupted while waiting
     */
    protected ProducerEvent waitForProducerEvent() throws InterruptedException {
        ProducerEvent answer = eventQueue.poll(EVENT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        assertTrue("Should have received a producer event!", answer != null);
        return answer;
    }
}

