package org.apache.activemq.broker.scheduler;

import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.IdGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.class */
public class JobSchedulerManagementTest extends EmbeddedBrokerTestSupport {
    private static final transient Log LOG = LogFactory.getLog(JobSchedulerManagementTest.class);

    public void testRemoveAllScheduled() throws Exception {
        Connection createConnection = createConnection();
        scheduleMessage(createConnection, TimeUnit.SECONDS.toMillis(6L), 5);
        Session createSession = createConnection.createSession(false, 1);
        Topic createTopic = createSession.createTopic("ActiveMQ.Scheduler.Management");
        MessageConsumer createConsumer = createSession.createConsumer(this.destination);
        final CountDownLatch countDownLatch = new CountDownLatch(5);
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.scheduler.JobSchedulerManagementTest.1
            public void onMessage(Message message) {
                countDownLatch.countDown();
            }
        });
        createConnection.start();
        MessageProducer createProducer = createSession.createProducer(createTopic);
        Message createMessage = createSession.createMessage();
        createMessage.setStringProperty("AMQ_SCHEDULER_ACTION", "REMOVEALL");
        createProducer.send(createMessage);
        countDownLatch.await(10L, TimeUnit.SECONDS);
        assertEquals(countDownLatch.getCount(), 5L);
    }

    public void testRemoveAllScheduledAtTime() throws Exception {
        Connection createConnection = createConnection();
        scheduleMessage(createConnection, TimeUnit.SECONDS.toMillis(6L));
        scheduleMessage(createConnection, TimeUnit.SECONDS.toMillis(15L));
        scheduleMessage(createConnection, TimeUnit.SECONDS.toMillis(20L));
        Session createSession = createConnection.createSession(false, 1);
        Topic createTopic = createSession.createTopic("ActiveMQ.Scheduler.Management");
        TemporaryQueue createTemporaryQueue = createSession.createTemporaryQueue();
        MessageConsumer createConsumer = createSession.createConsumer(this.destination);
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.scheduler.JobSchedulerManagementTest.2
            public void onMessage(Message message) {
                countDownLatch.countDown();
            }
        });
        MessageConsumer createConsumer2 = createSession.createConsumer(createTemporaryQueue);
        final CountDownLatch countDownLatch2 = new CountDownLatch(3);
        createConsumer2.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.scheduler.JobSchedulerManagementTest.3
            public void onMessage(Message message) {
                countDownLatch2.countDown();
                JobSchedulerManagementTest.LOG.debug("Scheduled Message Browser got Message: " + message);
            }
        });
        createConnection.start();
        long currentTimeMillis = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
        long currentTimeMillis2 = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30L);
        MessageProducer createProducer = createSession.createProducer(createTopic);
        Message createMessage = createSession.createMessage();
        createMessage.setStringProperty("AMQ_SCHEDULER_ACTION", "REMOVEALL");
        createMessage.setStringProperty("ACTION_START_TIME", Long.toString(currentTimeMillis));
        createMessage.setStringProperty("ACTION_END_TIME", Long.toString(currentTimeMillis2));
        createProducer.send(createMessage);
        Message createMessage2 = createSession.createMessage();
        createMessage2.setStringProperty("AMQ_SCHEDULER_ACTION", "BROWSE");
        createMessage2.setJMSReplyTo(createTemporaryQueue);
        createProducer.send(createMessage2);
        countDownLatch.await(10L, TimeUnit.SECONDS);
        assertEquals(2L, countDownLatch2.getCount());
        countDownLatch.await(10L, TimeUnit.SECONDS);
        assertEquals(2L, countDownLatch.getCount());
    }

    public void testBrowseAllScheduled() throws Exception {
        Connection createConnection = createConnection();
        scheduleMessage(createConnection, TimeUnit.SECONDS.toMillis(9L), 10);
        Session createSession = createConnection.createSession(false, 1);
        Topic createTopic = createSession.createTopic("ActiveMQ.Scheduler.Management");
        TemporaryQueue createTemporaryQueue = createSession.createTemporaryQueue();
        MessageConsumer createConsumer = createSession.createConsumer(this.destination);
        final CountDownLatch countDownLatch = new CountDownLatch(10);
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.scheduler.JobSchedulerManagementTest.4
            public void onMessage(Message message) {
                countDownLatch.countDown();
            }
        });
        MessageConsumer createConsumer2 = createSession.createConsumer(createTemporaryQueue);
        final CountDownLatch countDownLatch2 = new CountDownLatch(10);
        createConsumer2.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.scheduler.JobSchedulerManagementTest.5
            public void onMessage(Message message) {
                countDownLatch2.countDown();
                JobSchedulerManagementTest.LOG.debug("Scheduled Message Browser got Message: " + message);
            }
        });
        createConnection.start();
        MessageProducer createProducer = createSession.createProducer(createTopic);
        Message createMessage = createSession.createMessage();
        createMessage.setStringProperty("AMQ_SCHEDULER_ACTION", "BROWSE");
        createMessage.setJMSReplyTo(createTemporaryQueue);
        createProducer.send(createMessage);
        Thread.sleep(2000L);
        assertEquals(countDownLatch.getCount(), 10L);
        countDownLatch.await(10L, TimeUnit.SECONDS);
        assertEquals(countDownLatch2.getCount(), 0L);
        countDownLatch.await(10L, TimeUnit.SECONDS);
        assertEquals(countDownLatch.getCount(), 0L);
    }

    public void testBrowseWindowlScheduled() throws Exception {
        Connection createConnection = createConnection();
        scheduleMessage(createConnection, TimeUnit.SECONDS.toMillis(5L));
        scheduleMessage(createConnection, TimeUnit.SECONDS.toMillis(10L), 10);
        scheduleMessage(createConnection, TimeUnit.SECONDS.toMillis(20L));
        Session createSession = createConnection.createSession(false, 1);
        Topic createTopic = createSession.createTopic("ActiveMQ.Scheduler.Management");
        TemporaryQueue createTemporaryQueue = createSession.createTemporaryQueue();
        MessageConsumer createConsumer = createSession.createConsumer(this.destination);
        final CountDownLatch countDownLatch = new CountDownLatch(12);
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.scheduler.JobSchedulerManagementTest.6
            public void onMessage(Message message) {
                countDownLatch.countDown();
            }
        });
        MessageConsumer createConsumer2 = createSession.createConsumer(createTemporaryQueue);
        final CountDownLatch countDownLatch2 = new CountDownLatch(10);
        createConsumer2.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.scheduler.JobSchedulerManagementTest.7
            public void onMessage(Message message) {
                countDownLatch2.countDown();
                JobSchedulerManagementTest.LOG.debug("Scheduled Message Browser got Message: " + message);
            }
        });
        createConnection.start();
        long currentTimeMillis = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(6L);
        long currentTimeMillis2 = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(15L);
        MessageProducer createProducer = createSession.createProducer(createTopic);
        Message createMessage = createSession.createMessage();
        createMessage.setStringProperty("AMQ_SCHEDULER_ACTION", "BROWSE");
        createMessage.setStringProperty("ACTION_START_TIME", Long.toString(currentTimeMillis));
        createMessage.setStringProperty("ACTION_END_TIME", Long.toString(currentTimeMillis2));
        createMessage.setJMSReplyTo(createTemporaryQueue);
        createProducer.send(createMessage);
        Thread.sleep(2000L);
        assertEquals(12L, countDownLatch.getCount());
        countDownLatch.await(15L, TimeUnit.SECONDS);
        assertEquals(0L, countDownLatch2.getCount());
        countDownLatch.await(20L, TimeUnit.SECONDS);
        assertEquals(0L, countDownLatch.getCount());
    }

    public void testRemoveScheduled() throws Exception {
        Connection createConnection = createConnection();
        scheduleMessage(createConnection, TimeUnit.SECONDS.toMillis(9L), 10);
        Session createSession = createConnection.createSession(false, 1);
        Topic createTopic = createSession.createTopic("ActiveMQ.Scheduler.Management");
        TemporaryQueue createTemporaryQueue = createSession.createTemporaryQueue();
        MessageConsumer createConsumer = createSession.createConsumer(this.destination);
        MessageProducer createProducer = createSession.createProducer(createTopic);
        final CountDownLatch countDownLatch = new CountDownLatch(10);
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.scheduler.JobSchedulerManagementTest.8
            public void onMessage(Message message) {
                countDownLatch.countDown();
            }
        });
        MessageConsumer createConsumer2 = createConnection.createSession(false, 1).createConsumer(createTemporaryQueue);
        createConnection.start();
        Message createMessage = createSession.createMessage();
        createMessage.setStringProperty("AMQ_SCHEDULER_ACTION", "BROWSE");
        createMessage.setJMSReplyTo(createTemporaryQueue);
        createProducer.send(createMessage);
        for (int i = 0; i < 10; i++) {
            Message receive = createConsumer2.receive(2000L);
            assertNotNull(receive);
            try {
                Message createMessage2 = createSession.createMessage();
                createMessage2.setStringProperty("AMQ_SCHEDULER_ACTION", "REMOVE");
                createMessage2.setStringProperty("scheduledJobId", receive.getStringProperty("scheduledJobId"));
                createProducer.send(createMessage2);
            } catch (Exception e) {
            }
        }
        countDownLatch.await(11L, TimeUnit.SECONDS);
        assertEquals(10L, countDownLatch.getCount());
    }

    public void testRemoveNotScheduled() throws Exception {
        Session createSession = createConnection().createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createSession.createTopic("ActiveMQ.Scheduler.Management"));
        try {
            Message createMessage = createSession.createMessage();
            createMessage.setStringProperty("AMQ_SCHEDULER_ACTION", "REMOVEALL");
            createMessage.setStringProperty("scheduledJobId", new IdGenerator().generateId());
            createProducer.send(createMessage);
        } catch (Exception e) {
            fail("Caught unexpected exception during remove of unscheduled message.");
        }
    }

    public void testBrowseWithSelector() throws Exception {
        Connection createConnection = createConnection();
        scheduleMessage(createConnection, TimeUnit.SECONDS.toMillis(9L));
        scheduleMessage(createConnection, TimeUnit.SECONDS.toMillis(10L));
        scheduleMessage(createConnection, TimeUnit.SECONDS.toMillis(5L));
        scheduleMessage(createConnection, TimeUnit.SECONDS.toMillis(45L));
        Session createSession = createConnection.createSession(false, 1);
        Topic createTopic = createSession.createTopic("ActiveMQ.Scheduler.Management");
        TemporaryTopic createTemporaryTopic = createSession.createTemporaryTopic();
        MessageConsumer createConsumer = createSession.createConsumer(createTemporaryTopic, "AMQ_SCHEDULED_DELAY = 45000");
        createConnection.start();
        MessageProducer createProducer = createSession.createProducer(createTopic);
        Message createMessage = createSession.createMessage();
        createMessage.setStringProperty("AMQ_SCHEDULER_ACTION", "BROWSE");
        createMessage.setJMSReplyTo(createTemporaryTopic);
        createProducer.send(createMessage);
        Message receive = createConsumer.receive(5000L);
        assertNotNull(receive);
        assertEquals(45000L, receive.getLongProperty("AMQ_SCHEDULED_DELAY"));
        assertNull(createConsumer.receive(5000L));
    }

    protected void scheduleMessage(Connection connection, long j) throws Exception {
        scheduleMessage(connection, j, 1);
    }

    protected void scheduleMessage(Connection connection, long j, int i) throws Exception {
        Session createSession = connection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(this.destination);
        TextMessage createTextMessage = createSession.createTextMessage("test msg");
        createTextMessage.setLongProperty("AMQ_SCHEDULED_DELAY", j);
        for (int i2 = 0; i2 < i; i2++) {
            createProducer.send(createTextMessage);
        }
        createProducer.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.EmbeddedBrokerTestSupport, org.apache.activemq.AutoFailTestSupport
    public void setUp() throws Exception {
        this.bindAddress = "vm://localhost";
        super.setUp();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.EmbeddedBrokerTestSupport
    public BrokerService createBroker() throws Exception {
        return createBroker(true);
    }

    protected BrokerService createBroker(boolean z) throws Exception {
        File file = new File("target/scheduler");
        if (z) {
            IOHelper.mkdirs(file);
            IOHelper.deleteChildren(file);
        }
        BrokerService brokerService = new BrokerService();
        brokerService.setPersistent(isPersistent());
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.setDataDirectory("target");
        brokerService.setSchedulerDirectoryFile(file);
        brokerService.setSchedulerSupport(true);
        brokerService.setUseJmx(false);
        brokerService.addConnector(this.bindAddress);
        return brokerService;
    }
}
