/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.scheduler;

import java.io.File;
import java.io.IOException;
import java.security.ProtectionDomain;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.util.IOHelper;
import org.junit.After;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SchedulerDBVersionTest {
    static String basedir;
    static final Logger LOG;
    static final File VERSION_LEGACY_JMS;
    BrokerService broker = null;

    protected BrokerService createBroker(JobSchedulerStoreImpl scheduler) throws Exception {
        BrokerService answer = new BrokerService();
        answer.setJobSchedulerStore((JobSchedulerStore)scheduler);
        answer.setPersistent(true);
        answer.setDataDirectory("target");
        answer.setSchedulerSupport(true);
        answer.setUseJmx(false);
        return answer;
    }

    @After
    public void tearDown() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    @Ignore(value="Used only when a new version of the store needs to archive it's test data.")
    @Test
    public void testCreateStore() throws Exception {
        JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl();
        File dir = new File("src/test/resources/org/apache/activemq/store/schedulerDB/legacy");
        IOHelper.deleteFile((File)dir);
        scheduler.setDirectory(dir);
        scheduler.setJournalMaxFileLength(0x100000);
        this.broker = this.createBroker(scheduler);
        this.broker.start();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
        Connection connection = cf.createConnection();
        connection.start();
        this.scheduleRepeating(connection);
        connection.close();
        this.broker.stop();
    }

    private void scheduleRepeating(Connection connection) throws Exception {
        Session session = connection.createSession(false, 1);
        Queue queue = session.createQueue("test.queue");
        MessageProducer producer = session.createProducer((Destination)queue);
        TextMessage message = session.createTextMessage("test msg");
        long time = 1000L;
        message.setLongProperty("AMQ_SCHEDULED_DELAY", time);
        message.setLongProperty("AMQ_SCHEDULED_PERIOD", 500L);
        message.setIntProperty("AMQ_SCHEDULED_REPEAT", -1);
        producer.send((Message)message);
        producer.close();
    }

    @Test
    public void testLegacyStoreConversion() throws Exception {
        this.doTestScheduleRepeated(VERSION_LEGACY_JMS);
    }

    public void doTestScheduleRepeated(File existingStore) throws Exception {
        File testDir = new File("target/activemq-data/store/scheduler/versionDB");
        IOHelper.deleteFile((File)testDir);
        IOHelper.copyFile((File)existingStore, (File)testDir);
        int NUMBER = 10;
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
        for (int i = 0; i < 3; ++i) {
            JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl();
            scheduler.setDirectory(testDir);
            scheduler.setJournalMaxFileLength(0x100000);
            BrokerService broker = this.createBroker(scheduler);
            broker.start();
            broker.waitUntilStarted();
            final AtomicInteger count = new AtomicInteger();
            Connection connection = cf.createConnection();
            Session session = connection.createSession(false, 1);
            Queue queue = session.createQueue("test.queue");
            MessageConsumer consumer = session.createConsumer((Destination)queue);
            final CountDownLatch latch = new CountDownLatch(10);
            consumer.setMessageListener(new MessageListener(){

                public void onMessage(Message message) {
                    LOG.info("Received scheduled message: {}", (Object)message);
                    latch.countDown();
                    count.incrementAndGet();
                }
            });
            connection.start();
            Assert.assertEquals((long)latch.getCount(), (long)10L);
            latch.await(30L, TimeUnit.SECONDS);
            connection.close();
            broker.stop();
            broker.waitUntilStopped();
            Assert.assertEquals((long)0L, (long)latch.getCount());
        }
    }

    static {
        try {
            ProtectionDomain protectionDomain = SchedulerDBVersionTest.class.getProtectionDomain();
            basedir = new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalPath();
        }
        catch (IOException e) {
            basedir = ".";
        }
        LOG = LoggerFactory.getLogger(SchedulerDBVersionTest.class);
        VERSION_LEGACY_JMS = new File(basedir + "/src/test/resources/org/apache/activemq/store/schedulerDB/legacy");
    }
}

