/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.scheduler;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.broker.scheduler.JobSchedulerTestSupport;
import org.apache.activemq.util.IdGenerator;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests that drive the scheduler through JMS messages sent to the
 * {@code ActiveMQ.Scheduler.Management} topic: browsing scheduled messages,
 * removing single scheduled messages, and removing all scheduled messages
 * (optionally restricted to a time window).
 *
 * <p>Every test opens its own connection via {@code createConnection()} and
 * closes it in a {@code finally} block so a failing assertion does not leave
 * the connection open.
 */
public class JobSchedulerManagementTest
extends JobSchedulerTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerManagementTest.class);

    /** Name of the topic that management requests are sent to. */
    private static final String MANAGEMENT_DESTINATION = "ActiveMQ.Scheduler.Management";
    /** Message property carrying the requested management action. */
    private static final String ACTION = "AMQ_SCHEDULER_ACTION";
    private static final String ACTION_BROWSE = "BROWSE";
    private static final String ACTION_REMOVE = "REMOVE";
    private static final String ACTION_REMOVEALL = "REMOVEALL";
    /** Message properties bounding the time window of a management action. */
    private static final String ACTION_START_TIME = "ACTION_START_TIME";
    private static final String ACTION_END_TIME = "ACTION_END_TIME";
    /** Message property carrying the id of a scheduled job. */
    private static final String SCHEDULED_ID = "scheduledJobId";
    /** Message property carrying the delivery delay in milliseconds. */
    private static final String SCHEDULED_DELAY = "AMQ_SCHEDULED_DELAY";

    /**
     * Schedules five messages, sends a REMOVEALL request and expects that none
     * of them has been delivered once their delay has passed.
     */
    @Test
    public void testRemoveAllScheduled() throws Exception {
        final int count = 5;
        Connection connection = createConnection();
        try {
            scheduleMessage(connection, TimeUnit.SECONDS.toMillis(6L), count);

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic management = session.createTopic(MANAGEMENT_DESTINATION);
            MessageConsumer consumer = session.createConsumer((Destination) this.destination);
            CountDownLatch latch = new CountDownLatch(count);
            consumer.setMessageListener(countingListener(latch));
            connection.start();

            MessageProducer producer = session.createProducer(management);
            Message request = session.createMessage();
            request.setStringProperty(ACTION, ACTION_REMOVEALL);
            producer.send(request);

            // Wait beyond the 6s delay; the wait is expected to time out
            // because no message should arrive.
            latch.await(10L, TimeUnit.SECONDS);
            Assert.assertEquals(count, latch.getCount());
        } finally {
            connection.close();
        }
    }

    /**
     * Schedules messages at 6s, 15s and 20s, sends a REMOVEALL request for the
     * window 10s..30s from now and expects that exactly one message is still
     * returned by a browse and exactly one message is delivered.
     */
    @Test
    public void testRemoveAllScheduledAtTime() throws Exception {
        final int count = 3;
        Connection connection = createConnection();
        try {
            scheduleMessage(connection, TimeUnit.SECONDS.toMillis(6L));
            scheduleMessage(connection, TimeUnit.SECONDS.toMillis(15L));
            scheduleMessage(connection, TimeUnit.SECONDS.toMillis(20L));

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic management = session.createTopic(MANAGEMENT_DESTINATION);
            TemporaryQueue browseDest = session.createTemporaryQueue();

            MessageConsumer consumer = session.createConsumer((Destination) this.destination);
            CountDownLatch latch = new CountDownLatch(count);
            consumer.setMessageListener(countingListener(latch));

            MessageConsumer browser = session.createConsumer(browseDest);
            CountDownLatch browsedLatch = new CountDownLatch(count);
            browser.setMessageListener(loggingCountingListener(browsedLatch));
            connection.start();

            long start = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10L);
            long end = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30L);

            MessageProducer producer = session.createProducer(management);
            Message request = session.createMessage();
            request.setStringProperty(ACTION, ACTION_REMOVEALL);
            request.setStringProperty(ACTION_START_TIME, Long.toString(start));
            request.setStringProperty(ACTION_END_TIME, Long.toString(end));
            producer.send(request);

            // Browse what is left after the removal.
            request = session.createMessage();
            request.setStringProperty(ACTION, ACTION_BROWSE);
            request.setJMSReplyTo(browseDest);
            producer.send(request);

            // Only one browse reply is expected, so this wait times out by design.
            browsedLatch.await(10L, TimeUnit.SECONDS);
            Assert.assertEquals(2L, browsedLatch.getCount());

            // Keep waiting past the 15s and 20s delays; only the 6s message
            // should ever have been delivered.
            latch.await(10L, TimeUnit.SECONDS);
            Assert.assertEquals(2L, latch.getCount());
        } finally {
            connection.close();
        }
    }

    /**
     * Schedules ten messages, browses them and expects that browsing returns
     * all ten without triggering early delivery, and that all ten are
     * delivered afterwards.
     */
    @Test
    public void testBrowseAllScheduled() throws Exception {
        final int count = 10;
        Connection connection = createConnection();
        try {
            scheduleMessage(connection, TimeUnit.SECONDS.toMillis(9L), count);

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic requestBrowse = session.createTopic(MANAGEMENT_DESTINATION);
            TemporaryQueue browseDest = session.createTemporaryQueue();

            MessageConsumer consumer = session.createConsumer((Destination) this.destination);
            CountDownLatch latch = new CountDownLatch(count);
            consumer.setMessageListener(countingListener(latch));

            MessageConsumer browser = session.createConsumer(browseDest);
            CountDownLatch browsedLatch = new CountDownLatch(count);
            browser.setMessageListener(loggingCountingListener(browsedLatch));
            connection.start();

            MessageProducer producer = session.createProducer(requestBrowse);
            Message request = session.createMessage();
            request.setStringProperty(ACTION, ACTION_BROWSE);
            request.setJMSReplyTo(browseDest);
            producer.send(request);

            // Browsing must not cause the scheduled messages to be delivered early.
            Thread.sleep(2000L);
            Assert.assertEquals(count, latch.getCount());

            // All scheduled messages should show up on the browse destination.
            browsedLatch.await(10L, TimeUnit.SECONDS);
            Assert.assertEquals(0L, browsedLatch.getCount());

            // Finally every message should be delivered; the wait returns as
            // soon as the last one arrives, the timeout is only headroom.
            latch.await(20L, TimeUnit.SECONDS);
            Assert.assertEquals(0L, latch.getCount());
        } finally {
            connection.close();
        }
    }

    /**
     * Schedules one message at 5s, ten at 10s and one at 20s, browses the
     * window 6s..15s from now and expects ten browse replies, no early
     * delivery, and eventual delivery of all twelve messages.
     */
    @Test
    public void testBrowseWindowlScheduled() throws Exception {
        final int count = 10;
        Connection connection = createConnection();
        try {
            scheduleMessage(connection, TimeUnit.SECONDS.toMillis(5L));
            scheduleMessage(connection, TimeUnit.SECONDS.toMillis(10L), count);
            scheduleMessage(connection, TimeUnit.SECONDS.toMillis(20L));

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic requestBrowse = session.createTopic(MANAGEMENT_DESTINATION);
            TemporaryQueue browseDest = session.createTemporaryQueue();

            MessageConsumer consumer = session.createConsumer((Destination) this.destination);
            // The in-window batch plus the two messages outside the window.
            CountDownLatch latch = new CountDownLatch(count + 2);
            consumer.setMessageListener(countingListener(latch));

            MessageConsumer browser = session.createConsumer(browseDest);
            CountDownLatch browsedLatch = new CountDownLatch(count);
            browser.setMessageListener(loggingCountingListener(browsedLatch));
            connection.start();

            long start = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(6L);
            long end = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(15L);

            MessageProducer producer = session.createProducer(requestBrowse);
            Message request = session.createMessage();
            request.setStringProperty(ACTION, ACTION_BROWSE);
            request.setStringProperty(ACTION_START_TIME, Long.toString(start));
            request.setStringProperty(ACTION_END_TIME, Long.toString(end));
            request.setJMSReplyTo(browseDest);
            producer.send(request);

            // Browsing must not cause the scheduled messages to be delivered early.
            Thread.sleep(2000L);
            Assert.assertEquals(count + 2, latch.getCount());

            // Only the messages inside the window should be browsed.
            browsedLatch.await(15L, TimeUnit.SECONDS);
            Assert.assertEquals(0L, browsedLatch.getCount());

            // The last message is due 20s after scheduling; the wait returns as
            // soon as it arrives, the timeout is only headroom.
            latch.await(35L, TimeUnit.SECONDS);
            Assert.assertEquals(0L, latch.getCount());
        } finally {
            connection.close();
        }
    }

    /**
     * Schedules ten messages, browses them, sends a REMOVE request for the job
     * id of each browse reply and expects that none of the messages is
     * delivered.
     */
    @Test
    public void testRemoveScheduled() throws Exception {
        final int count = 10;
        Connection connection = createConnection();
        try {
            scheduleMessage(connection, TimeUnit.SECONDS.toMillis(9L), count);

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic management = session.createTopic(MANAGEMENT_DESTINATION);
            TemporaryQueue browseDest = session.createTemporaryQueue();
            MessageConsumer consumer = session.createConsumer((Destination) this.destination);
            MessageProducer producer = session.createProducer(management);

            CountDownLatch latch = new CountDownLatch(count);
            consumer.setMessageListener(countingListener(latch));

            // The browse replies are read synchronously, on a session of their
            // own (the first session already has a message listener attached).
            Session browseSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer browser = browseSession.createConsumer(browseDest);
            connection.start();

            Message request = session.createMessage();
            request.setStringProperty(ACTION, ACTION_BROWSE);
            request.setJMSReplyTo(browseDest);
            producer.send(request);

            for (int i = 0; i < count; ++i) {
                Message browsed = browser.receive(2000L);
                Assert.assertNotNull(browsed);

                // A failure to send the remove request is a test failure and
                // is allowed to propagate rather than being ignored.
                Message remove = session.createMessage();
                remove.setStringProperty(ACTION, ACTION_REMOVE);
                remove.setStringProperty(SCHEDULED_ID, browsed.getStringProperty(SCHEDULED_ID));
                producer.send(remove);
            }

            // Wait beyond the 9s delay; the wait is expected to time out
            // because every job was removed.
            latch.await(11L, TimeUnit.SECONDS);
            Assert.assertEquals(count, latch.getCount());
        } finally {
            connection.close();
        }
    }

    /**
     * Sends a removal request carrying a freshly generated job id and expects
     * that sending it does not raise an exception.
     */
    @Test
    public void testRemoveNotScheduled() throws Exception {
        Connection connection = createConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic management = session.createTopic(MANAGEMENT_DESTINATION);
            MessageProducer producer = session.createProducer(management);
            try {
                Message remove = session.createMessage();
                // NOTE(review): the action is REMOVEALL although a job id is
                // supplied; the test name suggests REMOVE was intended - confirm.
                remove.setStringProperty(ACTION, ACTION_REMOVEALL);
                remove.setStringProperty(SCHEDULED_ID, new IdGenerator().generateId());
                producer.send(remove);
            } catch (Exception e) {
                // Fail with the original message but keep the cause visible.
                AssertionError failure =
                    new AssertionError("Caught unexpected exception during remove of unscheduled message.");
                failure.initCause(e);
                throw failure;
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Schedules four messages with different delays and browses them through
     * a consumer whose selector matches only the 45s delay; expects exactly
     * one browse reply, carrying that delay.
     */
    @Test
    public void testBrowseWithSelector() throws Exception {
        Connection connection = createConnection();
        try {
            scheduleMessage(connection, TimeUnit.SECONDS.toMillis(9L));
            scheduleMessage(connection, TimeUnit.SECONDS.toMillis(10L));
            scheduleMessage(connection, TimeUnit.SECONDS.toMillis(5L));
            scheduleMessage(connection, TimeUnit.SECONDS.toMillis(45L));

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic requestBrowse = session.createTopic(MANAGEMENT_DESTINATION);
            TemporaryTopic browseDest = session.createTemporaryTopic();
            MessageConsumer browser = session.createConsumer(browseDest, SCHEDULED_DELAY + " = 45000");
            connection.start();

            MessageProducer producer = session.createProducer(requestBrowse);
            Message request = session.createMessage();
            request.setStringProperty(ACTION, ACTION_BROWSE);
            request.setJMSReplyTo(browseDest);
            producer.send(request);

            Message message = browser.receive(5000L);
            Assert.assertNotNull(message);
            Assert.assertEquals(45000L, message.getLongProperty(SCHEDULED_DELAY));

            // No second reply may pass the selector.
            message = browser.receive(5000L);
            Assert.assertNull(message);
        } finally {
            connection.close();
        }
    }

    /**
     * Schedules a single message on the test destination.
     *
     * @param connection connection used to send the message
     * @param delay delivery delay in milliseconds
     * @throws Exception if the message cannot be sent
     */
    protected void scheduleMessage(Connection connection, long delay) throws Exception {
        scheduleMessage(connection, delay, 1);
    }

    /**
     * Sends the same text message {@code count} times to the test destination
     * with the {@code AMQ_SCHEDULED_DELAY} property set to {@code delay}.
     *
     * @param connection connection used to send the messages
     * @param delay delivery delay in milliseconds
     * @param count number of times the message is sent
     * @throws Exception if a message cannot be sent
     */
    protected void scheduleMessage(Connection connection, long delay, int count) throws Exception {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            MessageProducer producer = session.createProducer((Destination) this.destination);
            TextMessage message = session.createTextMessage("test msg");
            message.setLongProperty(SCHEDULED_DELAY, delay);
            for (int i = 0; i < count; ++i) {
                producer.send(message);
            }
            producer.close();
        } finally {
            // Release the per-call session even when sending fails.
            session.close();
        }
    }

    /** Returns a listener that counts the latch down once per received message. */
    private static MessageListener countingListener(final CountDownLatch latch) {
        return new MessageListener() {
            @Override
            public void onMessage(Message message) {
                latch.countDown();
            }
        };
    }

    /** Returns a listener that counts the latch down and logs each received message at debug level. */
    private static MessageListener loggingCountingListener(final CountDownLatch latch) {
        return new MessageListener() {
            @Override
            public void onMessage(Message message) {
                latch.countDown();
                LOG.debug("Scheduled Message Browser got Message: {}", message);
            }
        };
    }
}

