package org.apache.activemq.store;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.usecases.DurableSubProcessConcurrentCommitActivateNoDuplicateTest;
import org.apache.activemq.usecases.VerifyNetworkConsumersDisconnectTest;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/store/MessagePriorityTest.class */
public abstract class MessagePriorityTest extends CombinationTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(MessagePriorityTest.class);
    // Embedded broker and the persistence adapter installed on it; both are assigned in setUp().
    BrokerService broker;
    PersistenceAdapter adapter;
    // Client-side JMS objects created in setUp(); several tests replace conn/sess after
    // changing the prefetch policy on the factory.
    protected ActiveMQConnectionFactory factory;
    protected Connection conn;
    protected Session sess;
    // The public fields below are test parameters. Several of them are registered by name in the
    // initCombosFor* methods through addCombinationValues(...).
    // Applied in setUp() to the destination policy and to the pending durable subscriber policy.
    public boolean useCache = true;
    // Handed to MessageProducer.setDeliveryMode() by ProducerThread (2 is javax.jms.DeliveryMode.PERSISTENT).
    public int deliveryMode = 2;
    // Handed to ActiveMQConnectionFactory.setDispatchAsync() in setUp().
    public boolean dispatchAsync = true;
    // Handed to PolicyEntry.setPrioritizedMessages() in setUp().
    public boolean prioritizeMessages = true;
    // Handed to StorePendingDurableSubscriberMessageStoragePolicy.setImmediatePriorityDispatch() in setUp().
    public boolean immediatePriorityDispatch = true;
    // Handed to ActiveMQPrefetchPolicy.setAll() in setUp(); the default value comes from a constant
    // declared in another test class and is not visible in this file.
    public int prefetchVal = DurableSubProcessConcurrentCommitActivateNoDuplicateTest.SERVER_SLEEP;
    // Handed to PolicyEntry.setExpireMessagesPeriod() in setUp(); default defined in another test class.
    public int expireMessagePeriod = VerifyNetworkConsumersDisconnectTest.TIMEOUT;
    // Number of messages each producer sends in the queue/durable-sub ordering tests; default
    // defined in another test class.
    public int MSG_NUM = DurableSubProcessConcurrentCommitActivateNoDuplicateTest.CARGO_SIZE;
    // JMS priorities used for the "high" and "low" message streams.
    public int HIGH_PRI = 7;
    public int LOW_PRI = 3;

    /* loaded from: input_file:org/apache/activemq/store/MessagePriorityTest$ProducerThread.class */
    protected class ProducerThread extends Thread {
        int priority;
        int messageCount;
        ActiveMQDestination dest;

        public ProducerThread(ActiveMQDestination activeMQDestination, int i, int i2) {
            this.messageCount = i;
            this.priority = i2;
            this.dest = activeMQDestination;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                MessageProducer createProducer = MessagePriorityTest.this.sess.createProducer(this.dest);
                createProducer.setPriority(this.priority);
                createProducer.setDeliveryMode(MessagePriorityTest.this.deliveryMode);
                for (int i = 0; i < this.messageCount; i++) {
                    createProducer.send(MessagePriorityTest.this.sess.createTextMessage("message priority: " + this.priority));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void setMessagePriority(int i) {
            this.priority = i;
        }

        public void setMessageCount(int i) {
            this.messageCount = i;
        }
    }

    /**
     * Supplies the persistence adapter that {@code setUp()} installs on the embedded broker.
     * Concrete subclasses decide which store implementation is exercised.
     *
     * @param z flag chosen by the caller; {@code setUp()} always passes {@code true}.
     *          NOTE(review): presumably "delete existing messages on start" - confirm against
     *          the subclasses, the meaning is not visible in this file.
     * @return the adapter to hand to {@code BrokerService.setPersistenceAdapter}
     * @throws Exception if the adapter cannot be created
     */
    protected abstract PersistenceAdapter createPersistenceAdapter(boolean z) throws Exception;

    /**
     * Starts the embedded broker "priorityTest" with the destination policies under test and
     * opens a started connection (client id "priority") with one non-transacted session.
     *
     * @throws Exception if the broker or the JMS connection cannot be started
     */
    protected void setUp() throws Exception {
        // Broker with the store supplied by the concrete subclass.
        broker = new BrokerService();
        broker.setBrokerName("priorityTest");
        broker.setAdvisorySupport(false);
        adapter = createPersistenceAdapter(true);
        broker.setPersistenceAdapter(adapter);

        // Policy shared by queue "TEST" and topic "TEST", driven by the test parameters.
        PolicyEntry testPolicy = new PolicyEntry();
        testPolicy.setPrioritizedMessages(prioritizeMessages);
        testPolicy.setUseCache(useCache);
        testPolicy.setExpireMessagesPeriod(expireMessagePeriod);

        StorePendingDurableSubscriberMessageStoragePolicy durablePending =
                new StorePendingDurableSubscriberMessageStoragePolicy();
        durablePending.setImmediatePriorityDispatch(immediatePriorityDispatch);
        durablePending.setUseCache(useCache);
        testPolicy.setPendingDurableSubscriberPolicy(durablePending);

        // Separate policy for topic "TEST_CLEANUP_NO_PRIORITY": dead-letter strategy with
        // expired-message processing switched off.
        SharedDeadLetterStrategy deadLetters = new SharedDeadLetterStrategy();
        deadLetters.setProcessExpired(false);
        PolicyEntry cleanupPolicy = new PolicyEntry();
        cleanupPolicy.setDeadLetterStrategy(deadLetters);

        PolicyMap policies = new PolicyMap();
        policies.put(new ActiveMQQueue("TEST"), testPolicy);
        policies.put(new ActiveMQTopic("TEST"), testPolicy);
        policies.put(new ActiveMQTopic("TEST_CLEANUP_NO_PRIORITY"), cleanupPolicy);
        broker.setDestinationPolicy(policies);

        broker.start();
        broker.waitUntilStarted();

        // Client side: in-VM connection factory configured from the test parameters.
        factory = new ActiveMQConnectionFactory("vm://priorityTest");
        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
        prefetch.setAll(prefetchVal);
        factory.setPrefetchPolicy(prefetch);
        factory.setWatchTopicAdvisories(false);
        factory.setDispatchAsync(dispatchAsync);

        conn = factory.createConnection();
        conn.setClientID("priority");
        conn.start();
        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Closes the JMS session and connection on a best-effort basis, then always stops the broker.
     *
     * <p>Each resource is guarded against {@code null} so that a {@code setUp()} that failed
     * part-way does not turn into a {@code NullPointerException} here, and a failure closing the
     * session no longer prevents the connection from being closed.
     *
     * @throws Exception if stopping the broker fails
     */
    public void tearDown() throws Exception {
        try {
            if (this.sess != null) {
                try {
                    this.sess.close();
                } catch (Exception ignored) {
                    // Best effort: the broker is stopped below regardless.
                }
            }
            if (this.conn != null) {
                try {
                    this.conn.close();
                } catch (Exception ignored) {
                    // Best effort: the broker is stopped below regardless.
                }
            }
        } finally {
            if (this.broker != null) {
                this.broker.stop();
                this.broker.waitUntilStopped();
            }
        }
    }

    /**
     * Verifies that the message stores behind queue "TEST" and topic "TEST" report prioritized
     * messages once the destinations exist on the broker.
     *
     * <p>The result of each wait is asserted, so a destination that never appears fails with a
     * clear message instead of a {@code NullPointerException} on the following line.
     */
    public void testStoreConfigured() throws Exception {
        final Queue queue = this.sess.createQueue("TEST");
        final Topic topic = this.sess.createTopic("TEST");
        // Creating producers is what makes the broker register the destinations.
        MessageProducer queueProducer = this.sess.createProducer(queue);
        try {
            MessageProducer topicProducer = this.sess.createProducer(topic);
            try {
                assertTrue("queue TEST was not registered on the broker", Wait.waitFor(new Wait.Condition() {
                    @Override
                    public boolean isSatisified() throws Exception {
                        return MessagePriorityTest.this.broker.getRegionBroker().getDestinationMap().get(queue) != null;
                    }
                }));
                assertTrue(((Destination) this.broker.getRegionBroker().getDestinationMap().get(queue)).getMessageStore().isPrioritizedMessages());

                assertTrue("topic TEST was not registered on the broker", Wait.waitFor(new Wait.Condition() {
                    @Override
                    public boolean isSatisified() throws Exception {
                        return MessagePriorityTest.this.broker.getRegionBroker().getDestinationMap().get(topic) != null;
                    }
                }));
                assertTrue(((Destination) this.broker.getRegionBroker().getDestinationMap().get(topic)).getMessageStore().isPrioritizedMessages());
            } finally {
                topicProducer.close();
            }
        } finally {
            queueProducer.close();
        }
    }

    /**
     * Registers the parameter combinations for {@code testQueues}: cache on/off crossed with
     * delivery modes 1 and 2 (javax.jms.DeliveryMode NON_PERSISTENT and PERSISTENT).
     */
    public void initCombosForTestQueues() {
        // Boxed constants/valueOf instead of the deprecated wrapper constructors.
        addCombinationValues("useCache", new Object[]{Boolean.TRUE, Boolean.FALSE});
        addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(1), Integer.valueOf(2)});
    }

    /**
     * Produces {@code MSG_NUM} low- and {@code MSG_NUM} high-priority messages to queue "TEST"
     * concurrently, then checks that a consumer created afterwards receives all high-priority
     * messages before any low-priority one.
     */
    public void testQueues() throws Exception {
        // Session.createQueue is declared to return javax.jms.Queue, so the cast is required.
        ActiveMQQueue queue = (ActiveMQQueue) this.sess.createQueue("TEST");

        ProducerThread lowPri = new ProducerThread(queue, this.MSG_NUM, this.LOW_PRI);
        ProducerThread highPri = new ProducerThread(queue, this.MSG_NUM, this.HIGH_PRI);
        lowPri.start();
        highPri.start();
        lowPri.join();
        highPri.join();

        MessageConsumer consumer = this.sess.createConsumer(queue);
        for (int i = 0; i < this.MSG_NUM * 2; i++) {
            Message msg = consumer.receive(5000L);
            LOG.debug("received i={}, {}", i, msg != null ? msg.getJMSMessageID() : null);
            assertNotNull("Message " + i + " was null", msg);
            // The first MSG_NUM deliveries must be the high-priority ones.
            int expectedPriority = i < this.MSG_NUM ? this.HIGH_PRI : this.LOW_PRI;
            assertEquals("Message " + i + " has wrong priority", expectedPriority, msg.getJMSPriority());
        }
    }

    /**
     * Builds a text message whose body is {@code "priority " + i} on the shared session and
     * logs the body at INFO level.
     *
     * @param i value appended to the message body
     * @return the newly created (not yet sent) text message
     * @throws Exception if the session cannot create the message
     */
    protected Message createMessage(int i) throws Exception {
        final String body = "priority " + i;
        final TextMessage message = sess.createTextMessage(body);
        LOG.info("Sending  " + body);
        return message;
    }

    /**
     * Registers the prefetch values for {@code testDurableSubs}: 1000 and a quarter of
     * {@code MSG_NUM}.
     */
    public void initCombosForTestDurableSubs() {
        // Integer.valueOf instead of the deprecated Integer(int) constructor.
        addCombinationValues("prefetchVal", new Object[]{Integer.valueOf(1000), Integer.valueOf(this.MSG_NUM / 4)});
    }

    /**
     * Checks priority ordering for an offline durable subscriber on topic "TEST", then checks
     * that a durable subscriber on "HAS_NO_PRIORITY" (a topic with no entry in the policy map
     * built by {@code setUp()}) still receives every message, in whatever order.
     */
    public void testDurableSubs() throws Exception {
        // Session.createTopic is declared to return javax.jms.Topic, so the cast is required.
        ActiveMQTopic topic = (ActiveMQTopic) this.sess.createTopic("TEST");
        // Register the durable subscription, then close it so the messages below are produced
        // while no subscriber is attached.
        this.sess.createDurableSubscriber(topic, "priority").close();

        ProducerThread lowPri = new ProducerThread(topic, this.MSG_NUM, this.LOW_PRI);
        ProducerThread highPri = new ProducerThread(topic, this.MSG_NUM, this.HIGH_PRI);
        lowPri.start();
        highPri.start();
        lowPri.join();
        highPri.join();

        TopicSubscriber sub = this.sess.createDurableSubscriber(topic, "priority");
        for (int i = 0; i < this.MSG_NUM * 2; i++) {
            Message msg = sub.receive(5000L);
            assertNotNull("Message " + i + " was null", msg);
            int expectedPriority = i < this.MSG_NUM ? this.HIGH_PRI : this.LOW_PRI;
            assertEquals("Message " + i + " has wrong priority", expectedPriority, msg.getJMSPriority());
        }

        // Same scenario on a topic without the priority policy: only delivery is asserted.
        ActiveMQTopic plainTopic = (ActiveMQTopic) this.sess.createTopic("HAS_NO_PRIORITY");
        this.sess.createDurableSubscriber(plainTopic, "no_priority").close();

        ProducerThread plainLowPri = new ProducerThread(plainTopic, this.MSG_NUM, this.LOW_PRI);
        ProducerThread plainHighPri = new ProducerThread(plainTopic, this.MSG_NUM, this.HIGH_PRI);
        plainLowPri.start();
        plainHighPri.start();
        plainLowPri.join();
        plainHighPri.join();

        TopicSubscriber plainSub = this.sess.createDurableSubscriber(plainTopic, "no_priority");
        for (int i = 0; i < this.MSG_NUM * 2; i++) {
            assertNotNull("Message " + i + " was null", plainSub.receive(5000L));
        }
    }

    /**
     * Registers the combinations for {@code testDurableSubsReconnect}: prefetch 1000 or half of
     * {@code MSG_NUM}, synchronous dispatch only, cache on/off.
     */
    public void initCombosForTestDurableSubsReconnect() {
        // Integer.valueOf instead of the deprecated Integer(int) constructor.
        addCombinationValues("prefetchVal", new Object[]{Integer.valueOf(1000), Integer.valueOf(this.MSG_NUM / 2)});
        addCombinationValues("dispatchAsync", new Object[]{Boolean.FALSE});
        addCombinationValues("useCache", new Object[]{Boolean.TRUE, Boolean.FALSE});
    }

    /**
     * Checks that priority ordering for a durable subscriber on topic "TEST" survives the
     * subscriber being closed and re-created every {@code MSG_NUM / 4} received messages.
     */
    public void testDurableSubsReconnect() throws Exception {
        // Session.createTopic is declared to return javax.jms.Topic, so the cast is required.
        ActiveMQTopic topic = (ActiveMQTopic) this.sess.createTopic("TEST");
        final String subName = "priorityDisconnect";
        // Register the subscription, then go offline while the backlog is produced.
        this.sess.createDurableSubscriber(topic, subName).close();

        ProducerThread lowPri = new ProducerThread(topic, this.MSG_NUM, this.LOW_PRI);
        ProducerThread highPri = new ProducerThread(topic, this.MSG_NUM, this.HIGH_PRI);
        lowPri.start();
        highPri.start();
        lowPri.join();
        highPri.join();

        final int closeFrequency = this.MSG_NUM / 4;
        TopicSubscriber sub = this.sess.createDurableSubscriber(topic, subName);
        for (int i = 0; i < this.MSG_NUM * 2; i++) {
            Message msg = sub.receive(15000L);
            LOG.debug("received i={}, {}", i, msg != null ? msg.getJMSMessageID() : null);
            assertNotNull("Message " + i + " was null", msg);
            int expectedPriority = i < this.MSG_NUM ? this.HIGH_PRI : this.LOW_PRI;
            assertEquals("Message " + i + " has wrong priority", expectedPriority, msg.getJMSPriority());
            // Periodically drop and re-create the subscriber; ordering must continue where it left off.
            if (i > 0 && i % closeFrequency == 0) {
                LOG.info("Closing durable sub.. on: " + i);
                sub.close();
                sub = this.sess.createDurableSubscriber(topic, subName);
            }
        }
    }

    /**
     * With a prefetch of 0, consumes a backlog of 2000 messages at priority {@code LOW_PRI + 1}
     * and, every 250 messages, injects one {@code HIGH_PRI}, one {@code HIGH_PRI - 1} and one
     * {@code LOW_PRI} message. The two higher-priority messages must be the next ones received;
     * the {@code LOW_PRI} ones are only expected after the whole backlog has been consumed.
     */
    public void testHighPriorityDelivery() throws Exception {
        // Reconnect so the new connection picks up the prefetch of 0 set on the factory.
        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
        prefetch.setAll(0);
        this.factory.setPrefetchPolicy(prefetch);
        this.conn.close();
        this.conn = this.factory.createConnection();
        this.conn.setClientID("priority");
        this.conn.start();
        this.sess = this.conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Session.createTopic is declared to return javax.jms.Topic, so the cast is required.
        ActiveMQTopic topic = (ActiveMQTopic) this.sess.createTopic("TEST");
        final String subName = "priorityDisconnect";
        this.sess.createDurableSubscriber(topic, subName).close();

        final int numToProduce = 2000;
        final int injectInterval = 250;
        final int backlogPriority = this.LOW_PRI + 1;
        final int[] dups = new int[numToProduce * 2];

        // run() rather than start(): the backlog is produced synchronously on this thread.
        ProducerThread producer = new ProducerThread(topic, numToProduce, backlogPriority);
        producer.run();
        LOG.info("Low priority messages sent");

        TopicSubscriber sub = this.sess.createDurableSubscriber(topic, subName);
        int lowLowCount = 0;
        for (int i = 0; i < numToProduce; i++) {
            Message msg = sub.receive(15000L);
            LOG.info("received i={}, {}", i,
                    msg != null ? msg.getJMSMessageID() + ", priority:" + msg.getJMSPriority() : null);
            assertNotNull("Message " + i + " was null", msg);
            assertEquals("Message " + i + " has wrong priority", backlogPriority, msg.getJMSPriority());
            // NOTE(review): dups is indexed by the loop counter, so this check can never fail;
            // kept unchanged to preserve the test's current behavior.
            assertTrue("not duplicate ", dups[i] == 0);
            dups[i] = 1;

            if (i % injectInterval == 0) {
                producer.setMessagePriority(this.HIGH_PRI);
                producer.setMessageCount(1);
                producer.run();
                LOG.info("High priority message sent, should be able to receive immediately");

                // NOTE(review): (i % injectInterval) is 0 in this branch, so the two nested
                // conditions below (and the matching one further down) are always true. They
                // look like a precedence slip for i % (injectInterval * k); left as-is so the
                // sequence of sent and expected messages does not change.
                if ((i % injectInterval) * 2 == 0) {
                    producer.setMessagePriority(this.HIGH_PRI - 1);
                    producer.setMessageCount(1);
                    producer.run();
                    LOG.info("High -1 priority message sent, should be able to receive immediately");
                }
                if ((i % injectInterval) * 4 == 0) {
                    producer.setMessagePriority(this.LOW_PRI);
                    producer.setMessageCount(1);
                    producer.run();
                    lowLowCount++;
                    LOG.info("Low low priority message sent, should not be able to receive immediately");
                }

                Message high = sub.receive(15000L);
                assertNotNull("Message was null", high);
                LOG.info("received hi? : {}", high);
                assertEquals("high priority", this.HIGH_PRI, high.getJMSPriority());

                if ((i % injectInterval) * 2 == 0) {
                    Message nextHigh = sub.receive(15000L);
                    assertNotNull("Message was null", nextHigh);
                    LOG.info("received hi -1 ? i={}, {}", i, nextHigh);
                    assertEquals("high priority", this.HIGH_PRI - 1, nextHigh.getJMSPriority());
                }
            }
        }

        // The LOW_PRI messages injected along the way are the only ones left.
        for (int i = 0; i < lowLowCount; i++) {
            Message msg = sub.receive(15000L);
            LOG.debug("received i={}, {}", i, msg != null ? msg.getJMSMessageID() : null);
            assertNotNull("Message " + i + " was null", msg);
            assertEquals("Message " + i + " has wrong priority", this.LOW_PRI, msg.getJMSPriority());
        }
    }

    /** Runs {@code testHighPriorityDeliveryInterleaved} once with the cache on and once with it off. */
    public void initCombosForTestHighPriorityDeliveryInterleaved() {
        final Object[] cacheSettings = {Boolean.TRUE, Boolean.FALSE};
        addCombinationValues("useCache", cacheSettings);
    }

    /**
     * With a prefetch of 0, sends one message each at {@code HIGH_PRI}, {@code HIGH_PRI - 1} and
     * {@code LOW_PRI} while the durable subscriber is offline. After the first receive it sends
     * one more at {@code LOW_PRI + 1} and expects the remaining messages in strict priority
     * order, followed by nothing.
     */
    public void testHighPriorityDeliveryInterleaved() throws Exception {
        // Reconnect so the new connection picks up the prefetch of 0 set on the factory.
        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
        prefetch.setAll(0);
        this.factory.setPrefetchPolicy(prefetch);
        this.conn.close();
        this.conn = this.factory.createConnection();
        this.conn.setClientID("priority");
        this.conn.start();
        this.sess = this.conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Session.createTopic is declared to return javax.jms.Topic, so the cast is required.
        ActiveMQTopic topic = (ActiveMQTopic) this.sess.createTopic("TEST");
        final String subName = "priorityDisconnect";
        this.sess.createDurableSubscriber(topic, subName).close();

        // run() rather than start(): each message is produced synchronously on this thread.
        ProducerThread producer = new ProducerThread(topic, 1, this.HIGH_PRI);
        producer.run();
        producer.setMessagePriority(this.HIGH_PRI - 1);
        producer.setMessageCount(1);
        producer.run();
        producer.setMessagePriority(this.LOW_PRI);
        producer.setMessageCount(1);
        producer.run();
        LOG.info("Ordered priority messages sent");

        TopicSubscriber sub = this.sess.createDurableSubscriber(topic, subName);

        Message first = sub.receive(15000L);
        assertNotNull("Message was null", first);
        LOG.info("received " + first.getJMSMessageID() + ", priority:" + first.getJMSPriority());
        assertEquals("Message has wrong priority", this.HIGH_PRI, first.getJMSPriority());

        // Interleave a new message that ranks between the two still pending.
        producer.setMessagePriority(this.LOW_PRI + 1);
        producer.setMessageCount(1);
        producer.run();

        Message second = sub.receive(15000L);
        assertNotNull("Message was null", second);
        LOG.info("received " + second.getJMSMessageID() + ", priority:" + second.getJMSPriority());
        assertEquals("high priority", this.HIGH_PRI - 1, second.getJMSPriority());

        Message third = sub.receive(15000L);
        assertNotNull("Message was null", third);
        LOG.info("received hi? : {}", third);
        assertEquals("interleaved priority", this.LOW_PRI + 1, third.getJMSPriority());

        Message fourth = sub.receive(15000L);
        assertNotNull("Message was null", fourth);
        LOG.info("received hi? : {}", fourth);
        assertEquals("low priority", this.LOW_PRI, fourth.getJMSPriority());

        // All four messages are accounted for; anything further is an error.
        assertNull("Unexpected extra message", sub.receive(4000L));
    }

    /**
     * Pins {@code testHighPriorityDeliveryThroughBackLog} to a single combination: cache off,
     * immediate priority dispatch on.
     */
    public void initCombosForTestHighPriorityDeliveryThroughBackLog() {
        final Object[] cacheOff = {Boolean.FALSE};
        final Object[] immediateDispatchOn = {Boolean.TRUE};
        addCombinationValues("useCache", cacheOff);
        addCombinationValues("immediatePriorityDispatch", immediateDispatchOn);
    }

    /**
     * With a prefetch of 0 and immediate priority dispatch enabled (see the matching
     * {@code initCombosFor...} method), a high-priority message published after 300 low-priority
     * messages have been consumed must be the very next message received, ahead of the rest of
     * the low-priority backlog.
     */
    public void testHighPriorityDeliveryThroughBackLog() throws Exception {
        // Reconnect so the new connection picks up the prefetch of 0 set on the factory.
        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
        prefetch.setAll(0);
        this.factory.setPrefetchPolicy(prefetch);
        this.conn.close();
        this.conn = this.factory.createConnection();
        this.conn.setClientID("priority");
        this.conn.start();
        this.sess = this.conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Session.createTopic is declared to return javax.jms.Topic, so the cast is required.
        ActiveMQTopic topic = (ActiveMQTopic) this.sess.createTopic("TEST");
        final String subName = "priorityDisconnect";
        this.sess.createDurableSubscriber(topic, subName).close();

        // NOTE(review): the backlog size comes from a constant in another test class; the loops
        // below assume it is at least 600 - confirm.
        ProducerThread producer = new ProducerThread(topic,
                DurableSubProcessConcurrentCommitActivateNoDuplicateTest.CARGO_SIZE, this.LOW_PRI);
        producer.run();

        TopicSubscriber sub = this.sess.createDurableSubscriber(topic, subName);
        int received = 0;
        for (; received < 300; received++) {
            Message msg = sub.receive(15000L);
            assertNotNull("Message was null", msg);
            assertEquals("low priority", this.LOW_PRI, msg.getJMSPriority());
        }

        // Publish a single high-priority message into the middle of the backlog.
        producer.setMessagePriority(this.HIGH_PRI);
        producer.setMessageCount(1);
        producer.run();

        Message high = sub.receive(15000L);
        assertNotNull("Message was null", high);
        assertEquals("high priority", this.HIGH_PRI, high.getJMSPriority());

        for (; received < 600; received++) {
            Message msg = sub.receive(15000L);
            assertNotNull("Message was null", msg);
            assertEquals("low priority", this.LOW_PRI, msg.getJMSPriority());
        }
    }

    /**
     * Pins {@code testHighPriorityNonDeliveryThroughBackLog} to a single combination: cache off,
     * immediate priority dispatch off.
     */
    public void initCombosForTestHighPriorityNonDeliveryThroughBackLog() {
        final Object[] cacheOff = {Boolean.FALSE};
        final Object[] immediateDispatchOff = {Boolean.FALSE};
        addCombinationValues("useCache", cacheOff);
        addCombinationValues("immediatePriorityDispatch", immediateDispatchOff);
    }

    /**
     * Counterpart of {@code testHighPriorityDeliveryThroughBackLog} with immediate priority
     * dispatch disabled (see the matching {@code initCombosFor...} method). A high-priority
     * message published after 300 low-priority messages have been consumed is expected only
     * after 100 further low-priority messages, i.e. as message number 401.
     */
    public void testHighPriorityNonDeliveryThroughBackLog() throws Exception {
        // Reconnect so the new connection picks up the prefetch of 0 set on the factory.
        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
        prefetch.setAll(0);
        this.factory.setPrefetchPolicy(prefetch);
        this.conn.close();
        this.conn = this.factory.createConnection();
        this.conn.setClientID("priority");
        this.conn.start();
        this.sess = this.conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Session.createTopic is declared to return javax.jms.Topic, so the cast is required.
        ActiveMQTopic topic = (ActiveMQTopic) this.sess.createTopic("TEST");
        final String subName = "priorityDisconnect";
        this.sess.createDurableSubscriber(topic, subName).close();

        // NOTE(review): the backlog size comes from a constant in another test class; the loops
        // below assume it is at least 600 - confirm.
        ProducerThread producer = new ProducerThread(topic,
                DurableSubProcessConcurrentCommitActivateNoDuplicateTest.CARGO_SIZE, this.LOW_PRI);
        producer.run();

        TopicSubscriber sub = this.sess.createDurableSubscriber(topic, subName);
        int received = 0;
        for (; received < 300; received++) {
            Message msg = sub.receive(15000L);
            assertNotNull("Message was null", msg);
            assertEquals("low priority", this.LOW_PRI, msg.getJMSPriority());
        }

        // Publish a single high-priority message into the middle of the backlog.
        producer.setMessagePriority(this.HIGH_PRI);
        producer.setMessageCount(1);
        producer.run();

        // It must NOT jump the queue: the next 100 deliveries are still low priority.
        for (; received < 400; received++) {
            Message msg = sub.receive(15000L);
            assertNotNull("Message was null", msg);
            assertEquals("low priority", this.LOW_PRI, msg.getJMSPriority());
        }

        Message high = sub.receive(15000L);
        assertNotNull("Message was null", high);
        assertEquals("high priority", this.HIGH_PRI, high.getJMSPriority());

        for (; received < 600; received++) {
            Message msg = sub.receive(15000L);
            assertNotNull("Message was null", msg);
            assertEquals("low priority", this.LOW_PRI, msg.getJMSPriority());
        }
    }

    /**
     * Pins {@code testQueueBacklog} to a single combination: cache off and an
     * expire-messages period of 0.
     */
    public void initCombosForTestQueueBacklog() {
        // Boxed constants/valueOf instead of the deprecated wrapper constructors.
        addCombinationValues("useCache", new Object[]{Boolean.FALSE});
        addCombinationValues("expireMessagePeriod", new Object[]{Integer.valueOf(0)});
    }

    /**
     * Builds a backlog of 180000 low-priority messages on queue "TEST", then adds 10
     * high-priority ones, and checks that a consumer created afterwards receives the 10
     * high-priority messages first. Only the first 500 deliveries are inspected.
     */
    public void testQueueBacklog() throws Exception {
        final int backlog = 180000;
        final int highPriCount = 10;
        final int toInspect = 500;

        // Session.createQueue is declared to return javax.jms.Queue, so the cast is required.
        ActiveMQQueue queue = (ActiveMQQueue) this.sess.createQueue("TEST");

        // Sequential on purpose: the high-priority messages are sent only once the whole
        // low-priority backlog has been produced.
        ProducerThread lowPri = new ProducerThread(queue, backlog, this.LOW_PRI);
        ProducerThread highPri = new ProducerThread(queue, highPriCount, this.HIGH_PRI);
        lowPri.start();
        lowPri.join();
        highPri.start();
        highPri.join();

        LOG.info("Starting consumer...");
        MessageConsumer consumer = this.sess.createConsumer(queue);
        for (int i = 0; i < toInspect; i++) {
            Message msg = consumer.receive(20000L);
            LOG.debug("received i={}, {}", i, msg != null ? msg.getJMSMessageID() : null);
            if (msg == null) {
                // Capture thread state before the assertion below fails the test.
                dumpAllThreads("backlog");
            }
            assertNotNull("Message " + i + " was null", msg);
            int expectedPriority = i < highPriCount ? this.HIGH_PRI : this.LOW_PRI;
            assertEquals("Message " + i + " has wrong priority", expectedPriority, msg.getJMSPriority());
        }
    }
}
