/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.scheduler;

import java.io.File;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.IOHelper;
import org.apache.log4j.BasicConfigurator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class LostScheduledMessagesTest {
    private BrokerService broker;
    private static final File schedulerDirectory = new File("target/test/ScheduledDB");
    private static final File messageDirectory = new File("target/test/MessageDB");
    private static final String QUEUE_NAME = "test";

    @Before
    public void setup() throws Exception {
        IOHelper.mkdirs((File)schedulerDirectory);
        IOHelper.deleteChildren((File)schedulerDirectory);
        IOHelper.mkdirs((File)messageDirectory);
        IOHelper.deleteChildren((File)messageDirectory);
    }

    private void startBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setSchedulerSupport(true);
        this.broker.setPersistent(true);
        this.broker.setDeleteAllMessagesOnStartup(false);
        this.broker.setDataDirectory("target");
        this.broker.setSchedulerDirectoryFile(schedulerDirectory);
        this.broker.setDataDirectoryFile(messageDirectory);
        this.broker.setUseJmx(false);
        this.broker.addConnector("vm://localhost");
        this.broker.start();
    }

    @After
    public void tearDown() throws Exception {
        this.broker.stop();
        BasicConfigurator.resetConfiguration();
    }

    @Test
    public void MessagePassedNotUsingScheduling() throws Exception {
        this.doTest(false);
    }

    @Test
    public void MessageLostWhenUsingScheduling() throws Exception {
        this.doTest(true);
    }

    private void doTest(boolean useScheduling) throws Exception {
        int DELIVERY_DELAY_MS = 5000;
        this.startBroker();
        long startTime = System.currentTimeMillis();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
        Connection connection = cf.createConnection();
        connection.start();
        Session session = connection.createSession(false, 1);
        MessageProducer producer = session.createProducer((Destination)session.createQueue(QUEUE_NAME));
        TextMessage message = session.createTextMessage(QUEUE_NAME);
        if (useScheduling) {
            message.setLongProperty("AMQ_SCHEDULED_DELAY", (long)DELIVERY_DELAY_MS);
        }
        producer.send((Message)message);
        session.close();
        connection.close();
        this.broker.getServices();
        this.broker.stop();
        this.broker.waitUntilStopped();
        long shutdownTime = System.currentTimeMillis();
        Assert.assertTrue((String)"Failed to shut down broker in expected time. Test results inconclusive", (shutdownTime - startTime < (long)DELIVERY_DELAY_MS ? 1 : 0) != 0);
        TimeUnit.MILLISECONDS.sleep(DELIVERY_DELAY_MS);
        this.startBroker();
        final AtomicLong receiveCounter = new AtomicLong();
        cf = new ActiveMQConnectionFactory("vm://localhost");
        connection = cf.createConnection();
        connection.start();
        session = connection.createSession(false, 1);
        MessageConsumer consumer = session.createConsumer((Destination)session.createQueue(QUEUE_NAME));
        consumer.setMessageListener(new MessageListener(){

            public void onMessage(Message message) {
                receiveCounter.incrementAndGet();
            }
        });
        TimeUnit.MILLISECONDS.sleep(DELIVERY_DELAY_MS * 2);
        session.close();
        connection.close();
        Assert.assertEquals((long)1L, (long)receiveCounter.get());
    }
}

