package org.apache.activemq.usecases;

import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.commons.lang.RandomStringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/usecases/DurableSubscriptionHangTestCase.class */
/**
 * Checks that a durable topic subscriber can still receive a live message
 * after a large number of short-lived messages were published to the same
 * topic while the subscriber was offline.
 *
 * <p>Each test runs against its own embedded broker, reached through the
 * {@code vm://} transport, whose default destination policy sets an
 * expire-messages period of {@value #EXPIRE_MESSAGES_PERIOD_MS} ms.
 */
public class DurableSubscriptionHangTestCase {
    private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionHangTestCase.class);
    static final String brokerName = "DurableSubscriptionHangTestCase";
    static final String clientID = "myId";
    private static final String topicName = "myTopic";
    private static final String durableSubName = "mySub";

    /** URL of the embedded broker; derived from {@link #brokerName} so the two cannot drift apart. */
    private static final String BROKER_URL = "vm://" + brokerName;
    /** Value passed to {@code PolicyEntry.setExpireMessagesPeriod}. */
    private static final long EXPIRE_MESSAGES_PERIOD_MS = 5000L;
    /** Number of messages published with the short time-to-live. */
    private static final int SHORT_LIVED_MESSAGE_COUNT = 40000;
    /** Length, in characters, of every generated message body. */
    private static final int MESSAGE_BODY_LENGTH = 500;
    /** Characters the random message bodies are drawn from. */
    private static final String MESSAGE_BODY_ALPHABET = "abcdefghijklmnopqrstuvwxyz";
    /** Upper bound, in milliseconds, on how long the subscriber waits for a message. */
    private static final long RECEIVE_TIMEOUT_MS = 120000L;

    BrokerService brokerService;

    /**
     * Creates and starts the embedded broker, wiping any previously stored
     * messages and installing the default expiry policy.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Before
    public void startBroker() throws Exception {
        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setExpireMessagesPeriod(EXPIRE_MESSAGES_PERIOD_MS);
        PolicyMap policies = new PolicyMap();
        policies.setDefaultEntry(defaultPolicy);

        this.brokerService = new BrokerService();
        this.brokerService.setDeleteAllMessagesOnStartup(true);
        this.brokerService.setBrokerName(brokerName);
        this.brokerService.setDestinationPolicy(policies);
        this.brokerService.start();
    }

    /**
     * Stops the embedded broker and waits for the shutdown to finish.
     *
     * <p>Does nothing when the broker was never created, so that a failure in
     * {@link #startBroker()} is not hidden behind a {@code NullPointerException}
     * thrown from teardown.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void brokerStop() throws Exception {
        if (this.brokerService == null) {
            return;
        }
        try {
            this.brokerService.stop();
            this.brokerService.waitUntilStopped();
        } finally {
            this.brokerService = null;
        }
    }

    /**
     * Registers the durable subscription, publishes the short-lived messages
     * followed by one long-lived message, pauses for ten seconds, and then
     * asserts that the durable subscriber receives a message.
     *
     * @throws Exception on any broker or JMS failure
     */
    @Test
    public void testHanging() throws Exception {
        registerDurableSubscription();
        produceExpiredAndOneNonExpiredMessages();
        // Longer than both the 1 s time-to-live of the bulk messages and the
        // broker's configured expire-messages period.
        TimeUnit.SECONDS.sleep(10L);
        Message received = collectMessagesFromDurableSubscriptionForOneMinute();
        LOG.info("got message:{}", received);
        Assert.assertNotNull("Unable to read unexpired message", received);
    }

    /**
     * Publishes {@value #SHORT_LIVED_MESSAGE_COUNT} messages with a one-second
     * time-to-live, followed by a single message with a one-day time-to-live.
     *
     * @throws JMSException if connecting or sending fails
     */
    private void produceExpiredAndOneNonExpiredMessages() throws JMSException {
        TopicConnection connection = new ActiveMQConnectionFactory(BROKER_URL).createTopicConnection();
        try {
            TopicSession session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createTopic(topicName));

            producer.setTimeToLive(TimeUnit.SECONDS.toMillis(1L));
            for (int sent = 0; sent < SHORT_LIVED_MESSAGE_COUNT; sent++) {
                sendRandomMessage(session, producer);
            }

            producer.setTimeToLive(TimeUnit.DAYS.toMillis(1L));
            sendRandomMessage(session, producer);
        } finally {
            // Release the connection even when sending fails part-way through.
            connection.close();
        }
        LOG.info("produceExpiredAndOneNonExpiredMessages done");
    }

    /**
     * Creates the durable subscription for {@link #clientID} on the test topic
     * and immediately disconnects again.
     *
     * @throws JMSException if connecting or subscribing fails
     */
    private void registerDurableSubscription() throws JMSException {
        TopicConnection connection = new ActiveMQConnectionFactory(BROKER_URL).createTopicConnection();
        try {
            connection.setClientID(clientID);
            TopicSession session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            TopicSubscriber subscriber = session.createDurableSubscriber(session.createTopic(topicName), durableSubName);
            connection.start();
            subscriber.close();
        } finally {
            connection.close();
        }
        LOG.info("Durable Sub Registered");
    }

    /**
     * Re-attaches to the durable subscription and waits for one message.
     *
     * <p>Despite the method name, the wait is bounded by
     * {@value #RECEIVE_TIMEOUT_MS} ms.
     *
     * @return the received message, or {@code null} if none arrived before the timeout
     * @throws Exception if connecting, subscribing or receiving fails
     */
    private Message collectMessagesFromDurableSubscriptionForOneMinute() throws Exception {
        TopicConnection connection = new ActiveMQConnectionFactory(BROKER_URL).createTopicConnection();
        Message received;
        try {
            connection.setClientID(clientID);
            TopicSession session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(topicName);
            connection.start();
            TopicSubscriber subscriber = session.createDurableSubscriber(topic, durableSubName);
            LOG.info("About to receive messages");
            received = subscriber.receive(RECEIVE_TIMEOUT_MS);
            subscriber.close();
        } finally {
            connection.close();
        }
        LOG.info("collectMessagesFromDurableSubscriptionForOneMinute done");
        return received;
    }

    /**
     * Sends one text message whose body is {@value #MESSAGE_BODY_LENGTH}
     * random lowercase ASCII letters.
     *
     * @param topicSession session used to create the message
     * @param messageProducer producer used to send it; its current time-to-live applies
     * @throws JMSException if the message cannot be created or sent
     */
    private void sendRandomMessage(TopicSession topicSession, MessageProducer messageProducer) throws JMSException {
        TextMessage message = topicSession.createTextMessage();
        message.setText(RandomStringUtils.random(MESSAGE_BODY_LENGTH, MESSAGE_BODY_ALPHABET));
        messageProducer.send(message);
    }
}
