package org.apache.activemq.usecases;

import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.class */
/**
 * Exercises topic subscriptions created with the {@code consumer.prefetchSize=0}
 * destination option against an embedded {@code vm://localhost} broker.
 *
 * <p>Each test gets a fresh broker, connection, session and producer from
 * {@link #setUp()}, creates its own consumer, and has everything released again
 * in {@link #tearDown()}.
 */
public class TopicSubscriptionZeroPrefetchTest {
    private static final String TOPIC_NAME = "slow.consumer";
    private Connection connection;
    private Session session;
    private ActiveMQTopic destination;
    private MessageProducer producer;
    private MessageConsumer consumer;
    private BrokerService brokerService;

    /**
     * Starts the embedded broker, then opens a started connection with a
     * non-transacted auto-acknowledge session and a producer on {@link #TOPIC_NAME}.
     *
     * @throws Exception if the broker or any JMS resource cannot be created
     */
    @Before
    public void setUp() throws Exception {
        this.brokerService = createBroker();
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
        connectionFactory.setWatchTopicAdvisories(true);
        this.connection = connectionFactory.createConnection();
        this.connection.setClientID("ClientID-1");
        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.destination = new ActiveMQTopic(TOPIC_NAME);
        this.producer = this.session.createProducer(this.destination);
        this.connection.start();
    }

    /**
     * A retroactive, zero-prefetch topic consumer must be handed a message that
     * was published after it subscribed when polled with {@code receiveNoWait()}.
     *
     * @throws Exception on any JMS failure
     */
    @Test(timeout = 60000)
    public void testTopicConsumerPrefetchZero() throws Exception {
        ActiveMQTopic consumerDestination =
                new ActiveMQTopic(TOPIC_NAME + "?consumer.retroactive=true&consumer.prefetchSize=0");
        this.consumer = this.session.createConsumer(consumerDestination);

        this.producer.send(this.session.createTextMessage("M"));

        Assert.assertNotNull("should have received a message the published message", this.consumer.receiveNoWait());
    }

    /**
     * A zero-prefetch consumer on a client-acknowledge session must be able to
     * receive ten messages in a row even though none of them is acknowledged.
     *
     * @throws Exception on any JMS failure
     */
    @Test(timeout = 60000)
    public void testTopicConsumerPrefetchZeroClientAckLoop() throws Exception {
        ActiveMQTopic consumerDestination =
                new ActiveMQTopic(TOPIC_NAME + "?consumer.retroactive=true&consumer.prefetchSize=0");
        // This extra session is not tracked in a field; it is released when the
        // owning connection is closed in tearDown().
        Session clientAckSession = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        this.consumer = clientAckSession.createConsumer(consumerDestination);

        final int count = 10;
        for (int i = 0; i < count; i++) {
            this.producer.send(this.session.createTextMessage("M:" + i));
        }
        for (int i = 0; i < count; i++) {
            Assert.assertNotNull("should have received message[" + i + "]", this.consumer.receive(2000L));
        }
    }

    /**
     * A durable subscriber ("mysub1") created with zero prefetch must receive a
     * message published after the subscription was created.
     *
     * @throws Exception on any JMS failure
     */
    @Test(timeout = 60000)
    public void testDurableTopicConsumerPrefetchZero() throws Exception {
        ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.prefetchSize=0");
        this.consumer = this.session.createDurableSubscriber(consumerDestination, "mysub1");

        this.producer.send(this.session.createTextMessage("M"));

        Assert.assertNotNull("should have received a message the published message", this.consumer.receive(100L));
    }

    /**
     * Releases the JMS resources and stops the broker.
     *
     * <p>Every field is null-checked because this method also runs when
     * {@link #setUp()} or the test body failed before the field was assigned.
     * The nested {@code finally} blocks ensure the connection is closed and the
     * broker is stopped even if an earlier {@code close()} throws, so a failing
     * test cannot leave a running broker behind for the next one.
     *
     * @throws Exception if closing a resource or stopping the broker fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (this.consumer != null) {
                this.consumer.close();
            }
            if (this.producer != null) {
                this.producer.close();
            }
            if (this.session != null) {
                this.session.close();
            }
        } finally {
            try {
                if (this.connection != null) {
                    this.connection.close();
                }
            } finally {
                if (this.brokerService != null) {
                    this.brokerService.stop();
                }
            }
        }
    }

    /**
     * Builds and starts the embedded broker named "localhost" with JMX disabled,
     * stored messages deleted on startup, and a {@code vm://localhost} connector.
     *
     * @return the broker, after {@code waitUntilStarted()} has returned
     * @throws Exception if the broker fails to start
     */
    private BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName("localhost");
        broker.setUseJmx(false);
        broker.setDeleteAllMessagesOnStartup(true);
        broker.addConnector("vm://localhost");
        broker.start();
        broker.waitUntilStarted();
        return broker;
    }
}
