package org.apache.activemq.usecases;

import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Assert;

/* loaded from: input_file:org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.class */
public class TopicSubscriptionSlowConsumerTest extends TestCase {
    private static final String TOPIC_NAME = "slow.consumer";
    Connection connection;
    private Session session;
    private ActiveMQTopic destination;
    private MessageProducer producer;
    private MessageConsumer consumer;
    private BrokerService brokerService;

    public void setUp() throws Exception {
        this.brokerService = createBroker();
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
        activeMQConnectionFactory.setWatchTopicAdvisories(true);
        this.connection = activeMQConnectionFactory.createConnection();
        this.session = this.connection.createSession(false, 1);
        this.destination = new ActiveMQTopic(TOPIC_NAME);
        this.producer = this.session.createProducer(this.destination);
        this.connection.start();
    }

    public void testPrefetchValueOne() throws Exception {
        this.consumer = this.session.createConsumer(new ActiveMQTopic("slow.consumer?consumer.prefetchSize=1"));
        MessageConsumer createConsumer = this.session.createConsumer(AdvisorySupport.getSlowConsumerAdvisoryTopic(this.destination));
        TextMessage createTextMessage = this.session.createTextMessage("Sample Text Message");
        for (int i = 0; i < 2; i++) {
            this.producer.send(createTextMessage);
        }
        for (int i2 = 0; i2 < 2; i2++) {
            Assert.assertNotNull("received msg " + i2 + " should not be null", this.consumer.receive(100L));
        }
        Assert.assertNull("should not have received a slow consumer advisory message", createConsumer.receive(100L));
    }

    public void tearDown() throws Exception {
        this.consumer.close();
        this.producer.close();
        this.session.close();
        this.connection.close();
        this.brokerService.stop();
    }

    private BrokerService createBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName("localhost");
        brokerService.setUseJmx(true);
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.addConnector("vm://localhost");
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setAdvisoryForSlowConsumers(true);
        policyMap.setDefaultEntry(policyEntry);
        brokerService.setDestinationPolicy(policyMap);
        brokerService.start();
        brokerService.waitUntilStarted();
        return brokerService;
    }
}
