package org.apache.activemq.usecases;

import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activemq/usecases/DiscriminatingConsumerLoadTest.class */
/**
 * Load test that checks how many messages a single queue consumer receives, with and
 * without a JMS selector, while a producer floods the queue.
 *
 * <p>The producer alternates between messages typed {@link #JMSTYPE_EATME} and
 * {@link #JMSTYPE_IGNOREME} until {@code testSize} messages have been sent. One test
 * consumes without a selector and expects all of them; the other selects only
 * {@link #JMSTYPE_EATME} and expects exactly half, followed by a receive timeout.
 */
public class DiscriminatingConsumerLoadTest extends TestSupport {
    public static final String JMSTYPE_EATME = "DiscriminatingLoadClient.EatMe";
    public static final String JMSTYPE_IGNOREME = "DiscriminatingLoadClient.IgnoreMe";

    private static final Log LOG = LogFactory.getLog(DiscriminatingConsumerLoadTest.class);

    /** Name of the queue shared by the producer and the consumer. */
    private static final String QUEUE_NAME = "test";
    /** How long the consumer waits for a message before deciding delivery has halted. */
    private static final long RECEIVE_TIMEOUT_MS = 30000L;
    /** Time-to-live applied to every message that is sent. */
    private static final long MESSAGE_TTL_MS = 1800000L;
    /** Upper bound on how long a test waits for the producer thread to wind down. */
    private static final long PRODUCER_JOIN_TIMEOUT_MS = 30000L;

    private Connection producerConnection;
    private Connection consumerConnection;
    /** Total number of messages the producer sends; also the consumer's receive cap. */
    private final int testSize = 5000;
    BrokerService broker;

    /**
     * Thread that drains the test queue, optionally filtering on {@code JMSType}, until
     * it has received {@code testSize} messages or a receive call times out.
     *
     * <p>The counters are plain fields: the tests only read them after joining this
     * thread, which is what makes the values visible to the reader.
     */
    private class Consumer extends Thread {
        private final Connection connection;
        private final String jmsSelector;
        protected int counterReceived = 0;
        private boolean deliveryHalted = false;

        /**
         * @param connection connection on which the consuming session is created
         * @param str        JMSType value to select on, or {@code null} for no selector
         */
        public Consumer(Connection connection, String str) {
            this.connection = connection;
            this.jmsSelector = str;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Session session = null;
            try {
                // Non-transacted session; acknowledge mode 1 is AUTO_ACKNOWLEDGE.
                session = this.connection.createSession(false, 1);
                Queue queue = session.createQueue(QUEUE_NAME);
                MessageConsumer messageConsumer = this.jmsSelector != null
                        ? session.createConsumer(queue, "JMSType='" + this.jmsSelector + "'")
                        : session.createConsumer(queue);
                while (!this.deliveryHalted && this.counterReceived < testSize) {
                    // receive() is declared to return Message, so the cast is required.
                    TextMessage message = (TextMessage) messageConsumer.receive(RECEIVE_TIMEOUT_MS);
                    if (message != null) {
                        this.counterReceived++;
                        LOG.info("consuming .... JMSType = " + message.getJMSType() + " received = " + this.counterReceived);
                    } else {
                        LOG.info("consuming .... timeout while waiting for a message ... broker must have stopped delivery ...  received = " + this.counterReceived);
                        this.deliveryHalted = true;
                    }
                }
            } catch (Exception e) {
                // Do not rethrow from a worker thread; the test's assertions on the
                // counters will report the failure.
                LOG.error("consumer thread failed after receiving " + this.counterReceived + " messages", e);
            } finally {
                closeQuietly(session);
            }
        }

        /** @return number of messages received so far */
        public int getCount() {
            return this.counterReceived;
        }

        /** @return {@code true} once a receive call has timed out without a message */
        public boolean deliveryHalted() {
            return this.deliveryHalted;
        }
    }

    /**
     * Thread that sends alternating {@link #JMSTYPE_EATME} / {@link #JMSTYPE_IGNOREME}
     * text messages to the test queue until {@code testSize} messages have been sent or
     * {@link #stop} is set.
     */
    private class Producer extends Thread {
        private final Connection connection;
        private int counterSent = 0;
        /** Set by the test thread to end the send loop; volatile so the write is seen. */
        public volatile boolean stop = false;

        /**
         * @param connection connection on which the producing session is created
         */
        public Producer(Connection connection) {
            this.connection = connection;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Session session = null;
            try {
                // Non-transacted session; acknowledge mode 1 is AUTO_ACKNOWLEDGE.
                session = this.connection.createSession(false, 1);
                Queue queue = session.createQueue(QUEUE_NAME);
                // NOTE(review): this start-up delay is borrowed from an unrelated test
                // class; presumably an inlined literal that the decompiler mapped to a
                // same-valued constant. Confirm the value and replace it with a local one.
                Thread.sleep(DurableSubProcessWithRestartTest.BROKER_RESTART);
                MessageProducer messageProducer = session.createProducer(queue);
                // The stop flag is only checked between pairs, so messages always go
                // out as one EatMe followed by one IgnoreMe.
                while (!this.stop && this.counterSent < testSize) {
                    sendTyped(session, messageProducer, JMSTYPE_EATME);
                    sendTyped(session, messageProducer, JMSTYPE_IGNOREME);
                }
            } catch (Exception e) {
                LOG.error("producer thread failed after sending " + this.counterSent + " messages", e);
            } finally {
                closeQuietly(session);
            }
            LOG.info("producer thread complete ... " + this.counterSent + " messages sent to the queue");
        }

        /** Sends one text message carrying the given JMSType and counts it. */
        private void sendTyped(Session session, MessageProducer messageProducer, String jmsType) throws Exception {
            TextMessage message = session.createTextMessage("*** Ill ....... Ini ***");
            message.setJMSType(jmsType);
            // Delivery mode 1 is NON_PERSISTENT; priority 0.
            messageProducer.send(message, 1, 0, MESSAGE_TTL_MS);
            this.counterSent++;
        }
    }

    /**
     * Starts a non-persistent broker whose default destination policy has its max page
     * size raised to 5000, then creates the producer and consumer connections.
     * NOTE(review): the page size presumably exists so that the whole test load can be
     * paged in for dispatch at once; confirm against the broker documentation.
     */
    protected void setUp() throws Exception {
        this.broker = new BrokerService();
        this.broker.setPersistent(false);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setMaxPageSize(5000);
        policyMap.setDefaultEntry(policyEntry);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.start();
        super.setUp();
        this.producerConnection = createConnection();
        this.consumerConnection = createConnection();
    }

    /**
     * Closes both connections, runs the superclass tear-down and stops the broker.
     * Each step runs even if an earlier one throws, so nothing is left running.
     */
    protected void tearDown() throws Exception {
        try {
            if (this.producerConnection != null) {
                this.producerConnection.close();
                this.producerConnection = null;
            }
        } finally {
            try {
                if (this.consumerConnection != null) {
                    this.consumerConnection.close();
                    this.consumerConnection = null;
                }
            } finally {
                try {
                    super.tearDown();
                } finally {
                    if (this.broker != null) {
                        this.broker.stop();
                    }
                }
            }
        }
    }

    /**
     * A consumer without a selector must receive every message that was sent and must
     * never hit a receive timeout.
     */
    public void testNonDiscriminatingConsumer() throws Exception {
        Consumer consumer = runLoad(null);
        int received = consumer.getCount();
        if (received == testSize) {
            LOG.info("test complete .... all messsages consumed!!");
        } else {
            LOG.info("test failed .... Sent 5000 messages intended to be consumed ( 5000 total), but only consumed " + received);
        }
        assertTrue("Sent 5000 messages intended to be consumed, but only consumed " + received, received == testSize);
        assertFalse("Delivery of messages to consumer was halted during this test", consumer.deliveryHalted());
    }

    /**
     * A consumer selecting only {@link #JMSTYPE_EATME} must receive exactly half of the
     * messages and then time out, because none of the remaining ones match its selector.
     */
    public void testDiscriminatingConsumer() throws Exception {
        Consumer consumer = runLoad(JMSTYPE_EATME);
        int received = consumer.getCount();
        int expected = testSize / 2;
        if (received == expected) {
            LOG.info("test complete .... all messsages consumed!!");
        } else {
            LOG.info("test failed .... Sent 5000 original messages, only half of which (2500) were intended to be consumed: consumer paused at: " + received);
        }
        assertTrue("Sent 5000 original messages, only half of which (2500) were intended to be consumed: consumer paused at: " + received, received == expected);
        assertTrue("Delivery of messages to consumer was halted during this test as it only wants half", consumer.deliveryHalted());
    }

    /**
     * Starts the connections created in {@code setUp}, runs one consumer and one
     * producer thread, and waits for the consumer to finish.
     *
     * <p>The producer is asked to stop and is joined (with a bound) before returning, so
     * that tear-down does not close its connection in the middle of a send.
     *
     * @param selector JMSType the consumer selects on, or {@code null} for no selector
     * @return the finished consumer, whose counters are safe to read after the join
     * @throws Exception if a connection cannot be started or the calling thread is
     *                   interrupted while waiting
     */
    private Consumer runLoad(String selector) throws Exception {
        this.consumerConnection.start();
        LOG.info("consumerConnection = " + this.consumerConnection);
        Thread.sleep(1000L);
        Consumer consumer = new Consumer(this.consumerConnection, selector);
        consumer.start();

        this.producerConnection.start();
        LOG.info("producerConnection = " + this.producerConnection);
        Thread.sleep(3000L);
        Producer producer = new Producer(this.producerConnection);
        producer.start();

        try {
            consumer.join();
        } finally {
            producer.stop = true;
            producer.join(PRODUCER_JOIN_TIMEOUT_MS);
        }
        return consumer;
    }

    /** Closes the session if one was created; a close failure is logged, not thrown. */
    private static void closeQuietly(Session session) {
        if (session == null) {
            return;
        }
        try {
            session.close();
        } catch (Exception e) {
            LOG.warn("failed to close session", e);
        }
    }
}
