/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueueZeroPrefetchLazyDispatchPriorityTest
extends TestCase {
    static final Logger LOG = LoggerFactory.getLogger(QueueZeroPrefetchLazyDispatchPriorityTest.class);
    private BrokerService broker;
    public static final byte[] PAYLOAD = new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};

    protected void setUp() throws Exception {
        this.broker = this.createBroker();
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    protected void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    public void testPriorityMessages() throws Exception {
        for (int i = 0; i < 5; ++i) {
            this.produceMessages(4, 4, "TestQ");
            this.produceMessages(1, 5, "TestQ");
            LOG.info("On iteration " + i);
            Thread.sleep(500L);
            ArrayList<Message> consumeList = this.consumeMessages("TestQ");
            LOG.info("Consumed list " + consumeList.size());
            QueueZeroPrefetchLazyDispatchPriorityTest.assertEquals((String)"message 1 should be priority high", (int)5, (int)consumeList.get(0).getJMSPriority());
            QueueZeroPrefetchLazyDispatchPriorityTest.assertEquals((String)"message 2 should be priority medium", (int)4, (int)consumeList.get(1).getJMSPriority());
            QueueZeroPrefetchLazyDispatchPriorityTest.assertEquals((String)"message 3 should be priority medium", (int)4, (int)consumeList.get(2).getJMSPriority());
            QueueZeroPrefetchLazyDispatchPriorityTest.assertEquals((String)"message 4 should be priority medium", (int)4, (int)consumeList.get(3).getJMSPriority());
            QueueZeroPrefetchLazyDispatchPriorityTest.assertEquals((String)"message 5 should be priority medium", (int)4, (int)consumeList.get(4).getJMSPriority());
        }
    }

    public void testPriorityMessagesMoreThanPageSize() throws Exception {
        int numToSend = 450;
        for (int i = 0; i < 5; ++i) {
            this.produceMessages(449, 4, "TestQ");
            Thread.sleep(700L);
            this.produceMessages(1, 5, "TestQ");
            Thread.sleep(500L);
            LOG.info("On iteration " + i);
            ArrayList<Message> consumeList = this.consumeMessages("TestQ");
            LOG.info("Consumed list " + consumeList.size());
            QueueZeroPrefetchLazyDispatchPriorityTest.assertEquals((String)"message 1 should be priority high", (int)5, (int)consumeList.get(0).getJMSPriority());
            for (int j = 1; j < 449; ++j) {
                QueueZeroPrefetchLazyDispatchPriorityTest.assertEquals((String)("message " + j + " should be priority medium"), (int)4, (int)consumeList.get(j).getJMSPriority());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testLongLivedPriorityConsumer() throws Exception {
        int numToSend = 150;
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
        try (Connection connection = connectionFactory.createConnection();){
            Session session = connection.createSession(false, 1);
            MessageConsumer consumer = session.createConsumer((Destination)new ActiveMQQueue("TestQ"));
            connection.start();
            for (int i = 0; i < 5; ++i) {
                this.produceMessages(149, 4, "TestQ");
                this.produceMessages(1, 5, "TestQ");
                Message message = consumer.receive(4000L);
                QueueZeroPrefetchLazyDispatchPriorityTest.assertEquals((String)"message should be priority high", (int)5, (int)message.getJMSPriority());
            }
        }
        ArrayList<Message> consumeList = this.consumeMessages("TestQ");
        LOG.info("Consumed list " + consumeList.size());
        for (Message message : consumeList) {
            QueueZeroPrefetchLazyDispatchPriorityTest.assertEquals((String)"should be priority medium", (int)4, (int)message.getJMSPriority());
        }
    }

    public void testPriorityMessagesWithJmsBrowser() throws Exception {
        int numToSend = 250;
        for (int i = 0; i < 5; ++i) {
            this.produceMessages(249, 4, "TestQ");
            ArrayList<Message> browsed = this.browseMessages("TestQ");
            LOG.info("Browsed: " + browsed.size());
            this.produceMessages(1, 5, "TestQ");
            Thread.sleep(500L);
            LOG.info("On iteration " + i);
            Message message = this.consumeOneMessage("TestQ");
            QueueZeroPrefetchLazyDispatchPriorityTest.assertNotNull((Object)message);
            QueueZeroPrefetchLazyDispatchPriorityTest.assertEquals((int)5, (int)message.getJMSPriority());
            ArrayList<Message> consumeList = this.consumeMessages("TestQ");
            LOG.info("Consumed list " + consumeList.size());
            for (int j = 1; j < 249; ++j) {
                QueueZeroPrefetchLazyDispatchPriorityTest.assertEquals((String)("Iteration: " + i + ", message " + j + " should be priority medium"), (int)4, (int)consumeList.get(j).getJMSPriority());
            }
        }
    }

    public void testJmsBrowserGetsPagedIn() throws Exception {
        int numToSend = 10;
        for (int i = 0; i < 10; ++i) {
            this.produceMessages(10, 4, "TestQ");
            ArrayList<Message> browsed = this.browseMessages("TestQ");
            LOG.info("Browsed: " + browsed.size());
            QueueZeroPrefetchLazyDispatchPriorityTest.assertEquals((int)0, (int)browsed.size());
            Message message = this.consumeOneMessage("TestQ", 2);
            QueueZeroPrefetchLazyDispatchPriorityTest.assertNotNull((Object)message);
            browsed = this.browseMessages("TestQ");
            LOG.info("Browsed: " + browsed.size());
            QueueZeroPrefetchLazyDispatchPriorityTest.assertEquals((String)"see only the paged in for pull", (int)1, (int)browsed.size());
            ArrayList<Message> consumeList = this.consumeMessages("TestQ");
            LOG.info("Consumed list " + consumeList.size());
            QueueZeroPrefetchLazyDispatchPriorityTest.assertEquals((int)10, (int)consumeList.size());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void produceMessages(int numberOfMessages, int priority, String queueName) throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
        connectionFactory.setConnectionIDPrefix("pri-" + priority);
        try (Connection connection = connectionFactory.createConnection();){
            Session session = connection.createSession(false, 1);
            MessageProducer producer = session.createProducer((Destination)new ActiveMQQueue(queueName));
            connection.start();
            for (int i = 0; i < numberOfMessages; ++i) {
                BytesMessage m = session.createBytesMessage();
                m.writeBytes(PAYLOAD);
                m.setJMSPriority(priority);
                producer.send((Message)m, 2, m.getJMSPriority(), 0L);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ArrayList<Message> consumeMessages(String queueName) throws Exception {
        ArrayList<Message> returnedMessages = new ArrayList<Message>();
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
        try (Connection connection = connectionFactory.createConnection();){
            Session session = connection.createSession(false, 1);
            MessageConsumer consumer = session.createConsumer((Destination)new ActiveMQQueue(queueName));
            connection.start();
            boolean finished = false;
            while (!finished) {
                Message message = consumer.receive(1000L);
                if (message == null) {
                    finished = true;
                }
                if (message == null) continue;
                returnedMessages.add(message);
            }
            consumer.close();
            ArrayList<Message> arrayList = returnedMessages;
            return arrayList;
        }
    }

    private Message consumeOneMessage(String queueName) throws Exception {
        return this.consumeOneMessage(queueName, 1);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Message consumeOneMessage(String queueName, int ackMode) throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
        try (Connection connection = connectionFactory.createConnection();){
            Session session = connection.createSession(false, ackMode);
            MessageConsumer consumer = session.createConsumer((Destination)new ActiveMQQueue(queueName));
            connection.start();
            Message message = consumer.receive(1000L);
            return message;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ArrayList<Message> browseMessages(String queueName) throws Exception {
        ArrayList<Message> returnedMessages = new ArrayList<Message>();
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.broker.getTransportConnectorByScheme("tcp").getPublishableConnectString());
        try (Connection connection = connectionFactory.createConnection();){
            Session session = connection.createSession(false, 1);
            QueueBrowser consumer = session.createBrowser((Queue)new ActiveMQQueue(queueName));
            connection.start();
            Enumeration enumeration = consumer.getEnumeration();
            while (enumeration.hasMoreElements()) {
                Message message = (Message)enumeration.nextElement();
                returnedMessages.add(message);
            }
            ArrayList<Message> arrayList = returnedMessages;
            return arrayList;
        }
    }

    private BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(true);
        PolicyMap policyMap = new PolicyMap();
        ArrayList<PolicyEntry> entries = new ArrayList<PolicyEntry>();
        PolicyEntry pe = new PolicyEntry();
        pe.setPrioritizedMessages(true);
        pe.setExpireMessagesPeriod(500L);
        pe.setMaxPageSize(100);
        pe.setMaxExpirePageSize(0);
        pe.setMaxBrowsePageSize(0);
        pe.setQueuePrefetch(0);
        pe.setLazyDispatch(true);
        pe.setOptimizedDispatch(true);
        pe.setUseCache(false);
        pe.setQueue(">");
        entries.add(pe);
        policyMap.setPolicyEntries(entries);
        broker.setDestinationPolicy(policyMap);
        broker.addConnector("tcp://0.0.0.0:0");
        return broker;
    }
}

