package org.apache.activemq.broker.interceptor;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.inteceptor.MessageInterceptor;
import org.apache.activemq.broker.inteceptor.MessageInterceptorRegistry;
import org.apache.activemq.command.ActiveMQQueue;

/* loaded from: input_file:org/apache/activemq/broker/interceptor/MessageInterceptorTest.class */
public class MessageInterceptorTest extends TestCase {
    protected BrokerService brokerService;
    protected ActiveMQConnectionFactory factory;
    protected Connection producerConnection;
    protected Connection consumerConnection;
    protected Session consumerSession;
    protected Session producerSession;
    protected MessageConsumer consumer;
    protected MessageProducer producer;
    protected Topic topic;
    protected int messageCount = 10000;
    protected int timeOutInSeconds = 10;

    protected void setUp() throws Exception {
        this.brokerService = new BrokerService();
        this.brokerService.setPersistent(false);
        this.brokerService.start();
        this.factory = new ActiveMQConnectionFactory(BrokerRegistry.getInstance().findFirst().getVmConnectorURI());
        this.consumerConnection = this.factory.createConnection();
        this.consumerConnection.start();
        this.producerConnection = this.factory.createConnection();
        this.producerConnection.start();
        this.consumerSession = this.consumerConnection.createSession(false, 1);
        this.topic = this.consumerSession.createTopic(getName());
        this.producerSession = this.producerConnection.createSession(false, 1);
        this.consumer = this.consumerSession.createConsumer(this.topic);
        this.producer = this.producerSession.createProducer(this.topic);
    }

    protected void tearDown() throws Exception {
        if (this.producerConnection != null) {
            this.producerConnection.close();
        }
        if (this.consumerConnection != null) {
            this.consumerConnection.close();
        }
        if (this.brokerService != null) {
            this.brokerService.stop();
        }
    }

    public void testNoIntercept() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(this.messageCount);
        this.consumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.interceptor.MessageInterceptorTest.1
            public void onMessage(Message message) {
                countDownLatch.countDown();
            }
        });
        for (int i = 0; i < this.messageCount; i++) {
            this.producer.send(this.producerSession.createTextMessage("test: " + i));
        }
        countDownLatch.await(this.timeOutInSeconds, TimeUnit.SECONDS);
        assertEquals(0L, countDownLatch.getCount());
    }

    public void testNoStackOverFlow() throws Exception {
        final MessageInterceptorRegistry messageInterceptorRegistry = MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst());
        messageInterceptorRegistry.addMessageInterceptorForTopic(this.topic.getTopicName(), new MessageInterceptor() { // from class: org.apache.activemq.broker.interceptor.MessageInterceptorTest.2
            public void intercept(ProducerBrokerExchange producerBrokerExchange, org.apache.activemq.command.Message message) {
                try {
                    messageInterceptorRegistry.injectMessage(producerBrokerExchange, message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        final CountDownLatch countDownLatch = new CountDownLatch(this.messageCount);
        this.consumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.interceptor.MessageInterceptorTest.3
            public void onMessage(Message message) {
                countDownLatch.countDown();
            }
        });
        for (int i = 0; i < this.messageCount; i++) {
            this.producer.send(this.producerSession.createTextMessage("test: " + i));
        }
        countDownLatch.await(this.timeOutInSeconds, TimeUnit.SECONDS);
        assertEquals(0L, countDownLatch.getCount());
    }

    public void testInterceptorAll() throws Exception {
        MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst()).addMessageInterceptorForTopic(this.topic.getTopicName(), new MessageInterceptor() { // from class: org.apache.activemq.broker.interceptor.MessageInterceptorTest.4
            public void intercept(ProducerBrokerExchange producerBrokerExchange, org.apache.activemq.command.Message message) {
            }
        });
        final CountDownLatch countDownLatch = new CountDownLatch(this.messageCount);
        this.consumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.interceptor.MessageInterceptorTest.5
            public void onMessage(Message message) {
                countDownLatch.countDown();
            }
        });
        for (int i = 0; i < this.messageCount; i++) {
            this.producer.send(this.producerSession.createTextMessage("test: " + i));
        }
        countDownLatch.await(this.timeOutInSeconds, TimeUnit.SECONDS);
        assertEquals(this.messageCount, countDownLatch.getCount());
    }

    public void testReRouteAll() throws Exception {
        final ActiveMQQueue activeMQQueue = new ActiveMQQueue("Reroute.From." + this.topic.getTopicName());
        final MessageInterceptorRegistry messageInterceptorRegistry = MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst());
        messageInterceptorRegistry.addMessageInterceptorForTopic(this.topic.getTopicName(), new MessageInterceptor() { // from class: org.apache.activemq.broker.interceptor.MessageInterceptorTest.6
            public void intercept(ProducerBrokerExchange producerBrokerExchange, org.apache.activemq.command.Message message) {
                message.setDestination(activeMQQueue);
                try {
                    messageInterceptorRegistry.injectMessage(producerBrokerExchange, message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        final CountDownLatch countDownLatch = new CountDownLatch(this.messageCount);
        this.consumer = this.consumerSession.createConsumer(activeMQQueue);
        this.consumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.interceptor.MessageInterceptorTest.7
            public void onMessage(Message message) {
                countDownLatch.countDown();
            }
        });
        for (int i = 0; i < this.messageCount; i++) {
            this.producer.send(this.producerSession.createTextMessage("test: " + i));
        }
        countDownLatch.await(this.timeOutInSeconds, TimeUnit.SECONDS);
        assertEquals(0L, countDownLatch.getCount());
    }

    public void testReRouteAllWithNullProducerExchange() throws Exception {
        final ActiveMQQueue activeMQQueue = new ActiveMQQueue("Reroute.From." + this.topic.getTopicName());
        final MessageInterceptorRegistry messageInterceptorRegistry = MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst());
        messageInterceptorRegistry.addMessageInterceptorForTopic(this.topic.getTopicName(), new MessageInterceptor() { // from class: org.apache.activemq.broker.interceptor.MessageInterceptorTest.8
            public void intercept(ProducerBrokerExchange producerBrokerExchange, org.apache.activemq.command.Message message) {
                message.setDestination(activeMQQueue);
                try {
                    messageInterceptorRegistry.injectMessage(producerBrokerExchange, message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        final CountDownLatch countDownLatch = new CountDownLatch(this.messageCount);
        this.consumer = this.consumerSession.createConsumer(activeMQQueue);
        this.consumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.interceptor.MessageInterceptorTest.9
            public void onMessage(Message message) {
                countDownLatch.countDown();
            }
        });
        for (int i = 0; i < this.messageCount; i++) {
            this.producer.send(this.producerSession.createTextMessage("test: " + i));
        }
        countDownLatch.await(this.timeOutInSeconds, TimeUnit.SECONDS);
        assertEquals(0L, countDownLatch.getCount());
    }

    public void testReRouteAllowWildCards() throws Exception {
        final ActiveMQQueue activeMQQueue = new ActiveMQQueue("testQueueFor." + getName());
        final MessageInterceptorRegistry messageInterceptorRegistry = MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst());
        messageInterceptorRegistry.addMessageInterceptorForTopic(">", new MessageInterceptor() { // from class: org.apache.activemq.broker.interceptor.MessageInterceptorTest.10
            public void intercept(ProducerBrokerExchange producerBrokerExchange, org.apache.activemq.command.Message message) {
                try {
                    message.setDestination(activeMQQueue);
                    messageInterceptorRegistry.injectMessage(producerBrokerExchange, message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        final CountDownLatch countDownLatch = new CountDownLatch(this.messageCount);
        this.consumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.interceptor.MessageInterceptorTest.11
            public void onMessage(Message message) {
                countDownLatch.countDown();
            }
        });
        this.consumerSession.createConsumer(activeMQQueue).setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.interceptor.MessageInterceptorTest.12
            public void onMessage(Message message) {
                countDownLatch.countDown();
            }
        });
        for (int i = 0; i < this.messageCount; i++) {
            this.producer.send(this.producerSession.createTextMessage("test: " + i));
        }
        countDownLatch.await(this.timeOutInSeconds, TimeUnit.SECONDS);
        assertEquals(0L, countDownLatch.getCount());
    }
}
