/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ1893Test
extends TestCase {
    private static final Logger log = LoggerFactory.getLogger(AMQ1893Test.class);
    static final String QUEUE_NAME = "TEST";
    static final int MESSAGE_COUNT_OF_ONE_GROUP = 10000;
    static final int[] PRIORITIES = new int[]{0, 5, 10};
    static final boolean debug = false;
    private BrokerService brokerService;
    private ActiveMQQueue destination;

    protected void setUp() throws Exception {
        this.brokerService = new BrokerService();
        this.brokerService.setDeleteAllMessagesOnStartup(true);
        this.brokerService.addConnector("tcp://localhost:0");
        this.brokerService.start();
        this.destination = new ActiveMQQueue(QUEUE_NAME);
    }

    protected void tearDown() throws Exception {
        this.brokerService.stop();
    }

    public void testProduceConsumeWithSelector() throws Exception {
        new TestProducer().produceMessages();
        new TestConsumer().consume();
    }

    class TestConsumer {
        private CountDownLatch finishLatch = new CountDownLatch(1);

        TestConsumer() {
        }

        public void consume() throws Exception {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(((TransportConnector)AMQ1893Test.this.brokerService.getTransportConnectors().get(0)).getConnectUri().toString());
            final int totalMessageCount = 10000 * PRIORITIES.length;
            final AtomicInteger counter = new AtomicInteger();
            MessageListener listener = new MessageListener(){

                public void onMessage(Message message) {
                    if (counter.incrementAndGet() == totalMessageCount) {
                        TestConsumer.this.finishLatch.countDown();
                    }
                }
            };
            int consumerCount = PRIORITIES.length;
            Connection[] connections = new Connection[consumerCount];
            Session[] sessions = new Session[consumerCount];
            MessageConsumer[] consumers = new MessageConsumer[consumerCount];
            for (int i = 0; i < consumerCount; ++i) {
                String selector = "priority = " + PRIORITIES[i];
                connections[i] = connectionFactory.createConnection();
                sessions[i] = connections[i].createSession(false, 1);
                consumers[i] = sessions[i].createConsumer((Destination)AMQ1893Test.this.destination, selector);
                consumers[i].setMessageListener(listener);
            }
            for (Connection connection : connections) {
                connection.start();
            }
            log.info("received " + counter.get() + " messages");
            TestCase.assertTrue((String)"got all messages in time", (boolean)this.finishLatch.await(60L, TimeUnit.SECONDS));
            log.info("received " + counter.get() + " messages");
            for (Connection connection : consumers) {
                connection.close();
            }
            for (Connection connection : sessions) {
                connection.close();
            }
            for (Connection connection : connections) {
                connection.close();
            }
        }
    }

    class TestProducer {
        TestProducer() {
        }

        public void produceMessages() throws Exception {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(((TransportConnector)AMQ1893Test.this.brokerService.getTransportConnectors().get(0)).getConnectUri().toString());
            Connection connection = connectionFactory.createConnection();
            Session session = connection.createSession(false, 1);
            Queue destination = session.createQueue(AMQ1893Test.QUEUE_NAME);
            MessageProducer producer = session.createProducer((Destination)destination);
            producer.setDeliveryMode(2);
            long start = System.currentTimeMillis();
            for (int priority : PRIORITIES) {
                String name = null;
                name = priority == 10 ? "high" : (priority == 5 ? "mid" : "low");
                for (int i = 1; i <= 10000; ++i) {
                    TextMessage message = session.createTextMessage(name + "_" + i);
                    message.setIntProperty("priority", priority);
                    producer.send((Message)message);
                }
            }
            long end = System.currentTimeMillis();
            log.info("sent 30000 messages in " + (end - start) + " ms");
            producer.close();
            session.close();
            connection.close();
        }
    }
}

