package org.apache.activemq.broker.scheduler;

import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ProducerThread;
import org.apache.activemq.util.Wait;

/* loaded from: input_file:org/apache/activemq/broker/scheduler/JmsSchedulerTest.class */
public class JmsSchedulerTest extends EmbeddedBrokerTestSupport {
    public void testCron() throws Exception {
        final AtomicInteger atomicInteger = new AtomicInteger();
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(false, 1);
        MessageConsumer createConsumer = createSession.createConsumer(this.destination);
        final CountDownLatch countDownLatch = new CountDownLatch(10);
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.scheduler.JmsSchedulerTest.1
            public void onMessage(Message message) {
                countDownLatch.countDown();
                atomicInteger.incrementAndGet();
            }
        });
        createConnection.start();
        MessageProducer createProducer = createSession.createProducer(this.destination);
        TextMessage createTextMessage = createSession.createTextMessage("test msg");
        createTextMessage.setStringProperty("AMQ_SCHEDULED_CRON", "* * * * *");
        createTextMessage.setLongProperty("AMQ_SCHEDULED_DELAY", 1000L);
        createTextMessage.setLongProperty("AMQ_SCHEDULED_PERIOD", 500L);
        createTextMessage.setIntProperty("AMQ_SCHEDULED_REPEAT", 9);
        createProducer.send(createTextMessage);
        createProducer.close();
        Thread.sleep(500L);
        assertEquals(1, this.broker.getBroker().getAdaptor(SchedulerBroker.class).getJobScheduler().getAllJobs().size());
        countDownLatch.await(240L, TimeUnit.SECONDS);
        assertEquals(10, atomicInteger.get());
    }

    public void testSchedule() throws Exception {
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(false, 1);
        MessageConsumer createConsumer = createSession.createConsumer(this.destination);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.scheduler.JmsSchedulerTest.2
            public void onMessage(Message message) {
                countDownLatch.countDown();
            }
        });
        createConnection.start();
        MessageProducer createProducer = createSession.createProducer(this.destination);
        TextMessage createTextMessage = createSession.createTextMessage("test msg");
        createTextMessage.setLongProperty("AMQ_SCHEDULED_DELAY", 5000L);
        createProducer.send(createTextMessage);
        createProducer.close();
        Thread.sleep(2000L);
        assertEquals(countDownLatch.getCount(), 1L);
        countDownLatch.await(5L, TimeUnit.SECONDS);
        assertEquals(countDownLatch.getCount(), 0L);
    }

    public void testTransactedSchedule() throws Exception {
        Connection createConnection = createConnection();
        final Session createSession = createConnection.createSession(true, 0);
        MessageConsumer createConsumer = createSession.createConsumer(this.destination);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.scheduler.JmsSchedulerTest.3
            public void onMessage(Message message) {
                try {
                    createSession.commit();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            }
        });
        createConnection.start();
        MessageProducer createProducer = createSession.createProducer(this.destination);
        TextMessage createTextMessage = createSession.createTextMessage("test msg");
        createTextMessage.setLongProperty("AMQ_SCHEDULED_DELAY", 5000L);
        createProducer.send(createTextMessage);
        createSession.commit();
        createProducer.close();
        Thread.sleep(2000L);
        assertEquals(countDownLatch.getCount(), 1L);
        countDownLatch.await(5L, TimeUnit.SECONDS);
        assertEquals(countDownLatch.getCount(), 0L);
    }

    public void testScheduleRepeated() throws Exception {
        final AtomicInteger atomicInteger = new AtomicInteger();
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(false, 1);
        MessageConsumer createConsumer = createSession.createConsumer(this.destination);
        final CountDownLatch countDownLatch = new CountDownLatch(10);
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.scheduler.JmsSchedulerTest.4
            public void onMessage(Message message) {
                countDownLatch.countDown();
                atomicInteger.incrementAndGet();
            }
        });
        createConnection.start();
        MessageProducer createProducer = createSession.createProducer(this.destination);
        TextMessage createTextMessage = createSession.createTextMessage("test msg");
        createTextMessage.setLongProperty("AMQ_SCHEDULED_DELAY", 1000L);
        createTextMessage.setLongProperty("AMQ_SCHEDULED_PERIOD", 500L);
        createTextMessage.setIntProperty("AMQ_SCHEDULED_REPEAT", 9);
        createProducer.send(createTextMessage);
        createProducer.close();
        assertEquals(countDownLatch.getCount(), 10L);
        countDownLatch.await(10L, TimeUnit.SECONDS);
        assertEquals(0L, countDownLatch.getCount());
        Thread.sleep(1000L);
        assertEquals(10, atomicInteger.get());
    }

    public void testScheduleRestart() throws Exception {
        Connection createConnection = createConnection();
        Session createSession = createConnection.createSession(false, 1);
        createConnection.start();
        MessageProducer createProducer = createSession.createProducer(this.destination);
        TextMessage createTextMessage = createSession.createTextMessage("test msg");
        createTextMessage.setLongProperty("AMQ_SCHEDULED_DELAY", 5000L);
        createProducer.send(createTextMessage);
        createProducer.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
        this.broker = createBroker(false);
        this.broker.start();
        this.broker.waitUntilStarted();
        Connection createConnection2 = createConnection();
        createConnection2.start();
        Session createSession2 = createConnection2.createSession(false, 1);
        assertNotNull("Didn't receive the message", createSession2.createConsumer(this.destination).receive(5000L));
        MessageProducer createProducer2 = createSession2.createProducer(this.destination);
        createTextMessage.setLongProperty("AMQ_SCHEDULED_DELAY", 5000L);
        createProducer2.send(createTextMessage);
        createProducer2.close();
    }

    public void testJobSchedulerStoreUsage() throws Exception {
        this.broker.getSystemUsage().getJobSchedulerUsage().setLimit(10240L);
        Connection createConnection = new ActiveMQConnectionFactory("vm://localhost").createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        final ProducerThread producerThread = new ProducerThread(createSession, this.destination) { // from class: org.apache.activemq.broker.scheduler.JmsSchedulerTest.5
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.apache.activemq.util.ProducerThread
            public Message createMessage(int i) throws Exception {
                Message createMessage = super.createMessage(i);
                createMessage.setLongProperty("AMQ_SCHEDULED_DELAY", 5000L);
                return createMessage;
            }
        };
        producerThread.setMessageCount(100);
        producerThread.start();
        MessageConsumer createConsumer = createSession.createConsumer(this.destination);
        final CountDownLatch countDownLatch = new CountDownLatch(100);
        createConsumer.setMessageListener(new MessageListener() { // from class: org.apache.activemq.broker.scheduler.JmsSchedulerTest.6
            public void onMessage(Message message) {
                countDownLatch.countDown();
            }
        });
        Thread.sleep(DurableSubProcessWithRestartTest.BROKER_RESTART);
        assertEquals(100L, countDownLatch.getCount());
        this.broker.getSystemUsage().getJobSchedulerUsage().setLimit(34603008L);
        Thread.sleep(DurableSubProcessWithRestartTest.BROKER_RESTART);
        Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.broker.scheduler.JmsSchedulerTest.7
            public boolean isSatisified() throws Exception {
                return producerThread.getSentCount() == producerThread.getMessageCount();
            }
        }, 20000L);
        assertEquals("Producer didn't send all messages", producerThread.getMessageCount(), producerThread.getSentCount());
        countDownLatch.await(20000L, TimeUnit.MILLISECONDS);
        assertEquals("Consumer did not receive all messages.", 0L, countDownLatch.getCount());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.EmbeddedBrokerTestSupport
    public void setUp() throws Exception {
        this.bindAddress = "vm://localhost";
        super.setUp();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.EmbeddedBrokerTestSupport
    public BrokerService createBroker() throws Exception {
        return createBroker(true);
    }

    protected BrokerService createBroker(boolean z) throws Exception {
        File file = new File("target/scheduler");
        if (z) {
            IOHelper.mkdirs(file);
            IOHelper.deleteChildren(file);
        }
        BrokerService brokerService = new BrokerService();
        brokerService.setPersistent(true);
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.setDataDirectory("target");
        brokerService.setSchedulerDirectoryFile(file);
        brokerService.setSchedulerSupport(true);
        brokerService.setUseJmx(false);
        brokerService.addConnector(this.bindAddress);
        return brokerService;
    }
}
