/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.jdbc;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.sql.DataSource;
import junit.framework.Test;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.MessagePriorityTest;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JDBCMessagePriorityTest
extends MessagePriorityTest {
    /** Logger used by the tests in this class. */
    private static final Logger LOG = LoggerFactory.getLogger(JDBCMessagePriorityTest.class);
    /** Embedded Derby data source; assigned in {@link #createPersistenceAdapter(boolean)}. */
    EmbeddedDataSource dataSource;
    /** JDBC persistence adapter; assigned in {@link #createPersistenceAdapter(boolean)}. */
    JDBCPersistenceAdapter jdbc;

    /**
     * Builds a {@link JDBCPersistenceAdapter} backed by an embedded Derby data
     * source named {@code derbyDb} and remembers both in {@link #jdbc} and
     * {@link #dataSource}.
     *
     * <p>{@code deleteAllMessages()} is called unconditionally; the
     * {@code delete} argument is not consulted.
     *
     * @param delete not used by this implementation
     * @return the configured adapter (the same instance stored in {@link #jdbc})
     * @throws Exception if configuring the adapter fails
     */
    @Override
    protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception {
        JDBCPersistenceAdapter adapter = new JDBCPersistenceAdapter();
        jdbc = adapter;

        EmbeddedDataSource derby = new EmbeddedDataSource();
        dataSource = derby;
        derby.setDatabaseName("derbyDb");
        derby.setCreateDatabase("create");
        derby.setShutdownDatabase(null);

        adapter.setDataSource(derby);
        adapter.deleteAllMessages();
        adapter.setCleanupPeriod(2000);
        return adapter;
    }

    /**
     * Runs the inherited tear-down and then shuts the embedded Derby database down.
     *
     * <p>The shutdown is requested by flagging the data source with
     * {@code "shutdown"} and asking it for a connection; the flag is reset to
     * {@code null} afterwards. Nothing is attempted when {@link #dataSource} was
     * never created, and the shutdown is still attempted when the inherited
     * tear-down throws.
     *
     * @throws Exception if the inherited tear-down fails
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            // Guard the whole sequence: dataSource stays null when set-up failed early.
            if (dataSource != null) {
                try {
                    dataSource.setShutdownDatabase("shutdown");
                    dataSource.getConnection();
                } catch (Exception expected) {
                    // Derby reports a completed shutdown by throwing from getConnection(),
                    // so this is best-effort and must not fail the test.
                    LOG.debug("Derby shutdown reported: " + expected);
                } finally {
                    dataSource.setShutdownDatabase(null);
                }
            }
        }
    }

    /**
     * Publishes {@code MSG_NUM} messages at each of four priorities while the
     * durable subscription is offline, then consumes them and asserts that they
     * arrive in descending priority bands (high, medium-high, medium, low).
     *
     * <p>The subscriber is closed and re-created every {@code MSG_NUM} messages
     * during consumption.
     *
     * @throws Exception on any JMS failure or interrupted join
     */
    public void testDurableSubsReconnectWithFourLevels() throws Exception {
        final String subName = "priorityDisconnect";
        ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");

        // Register the durable subscription, then go offline before producing.
        TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
        sub.close();

        final int MED_PRI = LOW_PRI + 1;
        final int MED_HIGH_PRI = HIGH_PRI - 1;
        MessagePriorityTest.ProducerThread lowPri = new MessagePriorityTest.ProducerThread(this, topic, MSG_NUM, LOW_PRI);
        MessagePriorityTest.ProducerThread medPri = new MessagePriorityTest.ProducerThread(this, topic, MSG_NUM, MED_PRI);
        MessagePriorityTest.ProducerThread medHighPri = new MessagePriorityTest.ProducerThread(this, topic, MSG_NUM, MED_HIGH_PRI);
        MessagePriorityTest.ProducerThread highPri = new MessagePriorityTest.ProducerThread(this, topic, MSG_NUM, HIGH_PRI);

        // Start and join in the same order: low, high, medium, medium-high.
        MessagePriorityTest.ProducerThread[] inStartOrder = new MessagePriorityTest.ProducerThread[]{lowPri, highPri, medPri, medHighPri};
        for (MessagePriorityTest.ProducerThread producer : inStartOrder) {
            producer.start();
        }
        for (MessagePriorityTest.ProducerThread producer : inStartOrder) {
            producer.join();
        }

        final int closeFrequency = MSG_NUM;
        final int[] priorities = new int[]{HIGH_PRI, MED_HIGH_PRI, MED_PRI, LOW_PRI};
        sub = sess.createDurableSubscriber(topic, subName);
        for (int i = 0; i < MSG_NUM * 4; i++) {
            Message msg = sub.receive(10000L);
            String described = msg == null ? null : msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority();
            LOG.debug("received i=" + i + ", m=" + described);
            assertNotNull("Message " + i + " was null", msg);
            assertEquals("Message " + i + " has wrong priority", priorities[i / MSG_NUM], msg.getJMSPriority());
            if (i > 0 && i % closeFrequency == 0) {
                LOG.info("Closing durable sub.. on: " + i);
                sub.close();
                sub = sess.createDurableSubscriber(topic, subName);
            }
        }
        LOG.info("closing on done!");
        sub.close();
    }

    /**
     * Registers {@code true} and {@code false} as the combination values for
     * {@code prioritizeMessages}.
     *
     * <p>NOTE(review): presumably picked up by the combination test support for
     * {@code testConcurrentDurableSubsReconnectWithXLevels} based on the method
     * name - confirm against the superclass.
     */
    public void initCombosForTestConcurrentDurableSubsReconnectWithXLevels() {
        Object[] bothSettings = new Object[]{Boolean.TRUE, Boolean.FALSE};
        addCombinationValues("prioritizeMessages", bothSettings);
    }

    /**
     * Consumes from a durable subscription while five producers (priorities 0-4)
     * are publishing concurrently, closing and re-creating the subscriber every
     * {@code MSG_NUM / 2} messages.
     *
     * <p>For every received message the test asserts that it is not a duplicate
     * (by JMS message id) and that, within its priority, the producer sequence
     * id is the next expected one (starting at 1).
     *
     * <p>The consumer connection is closed in a {@code finally} so a failed
     * assertion does not leave it open.
     *
     * @throws Exception on any JMS failure or interrupted join
     */
    public void testConcurrentDurableSubsReconnectWithXLevels() throws Exception {
        final String subName = "priorityDisconnect";
        final int maxPriority = 5;
        ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");

        Vector<MessagePriorityTest.ProducerThread> producers = new Vector<MessagePriorityTest.ProducerThread>();
        Connection consumerConn = factory.createConnection();
        try {
            consumerConn.setClientID(subName);
            consumerConn.start();
            Session consumerSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Register the durable subscription, then go offline before producing.
            TopicSubscriber sub = consumerSession.createDurableSubscriber(topic, subName);
            sub.close();

            // Per-priority bookkeeping: how many were seen and which sequence id is next.
            AtomicInteger[] messageCounts = new AtomicInteger[maxPriority];
            long[] messageIds = new long[maxPriority];
            for (int priority = 0; priority < maxPriority; priority++) {
                producers.add(new MessagePriorityTest.ProducerThread(this, topic, MSG_NUM, priority));
                messageCounts[priority] = new AtomicInteger(0);
                messageIds[priority] = 1L;
            }
            for (MessagePriorityTest.ProducerThread producer : producers) {
                producer.start();
            }

            final int closeFrequency = MSG_NUM / 2;
            HashMap<String, String> dups = new HashMap<String, String>();
            sub = consumerSession.createDurableSubscriber(topic, subName);
            for (int i = 0; i < MSG_NUM * maxPriority; i++) {
                Message msg = sub.receive(10000L);
                LOG.debug("received i=" + i + ", m=" + (msg != null ? msg.getJMSMessageID() + ", priority: " + msg.getJMSPriority() : null));
                assertNotNull("Message " + i + " was null, counts: " + Arrays.toString(messageCounts), msg);
                assertNull("no duplicate message failed on : " + msg.getJMSMessageID(), dups.put(msg.getJMSMessageID(), subName));

                int priority = msg.getJMSPriority();
                messageCounts[priority].incrementAndGet();
                assertEquals("message is in order : " + msg, messageIds[priority], ((ActiveMQMessage)msg).getMessageId().getProducerSequenceId());
                messageIds[priority]++;

                if (i > 0 && i % closeFrequency == 0) {
                    LOG.info("Closing durable sub.. on: " + i + ", counts: " + Arrays.toString(messageCounts));
                    sub.close();
                    sub = consumerSession.createDurableSubscriber(topic, subName);
                }
            }
            LOG.info("closing on done!");
            sub.close();
            consumerSession.close();
        } finally {
            // Closing the connection also releases any session/subscriber still open.
            consumerConn.close();
        }
        for (MessagePriorityTest.ProducerThread producer : producers) {
            producer.join();
        }
    }

    /**
     * Registers {@code 1} and {@code 500} as the combination values for
     * {@code prefetchVal}.
     *
     * <p>NOTE(review): presumably picked up by the combination test support for
     * {@code testConcurrentRate} based on the method name - confirm against the
     * superclass.
     */
    public void initCombosForTestConcurrentRate() {
        // Integer.valueOf replaces the deprecated Integer(int) constructor.
        addCombinationValues("prefetchVal", new Object[]{Integer.valueOf(1), Integer.valueOf(500)});
    }

    /**
     * Compares send times with and without an active durable consumer and checks
     * that no message is delivered twice.
     *
     * <p>Steps: register a durable subscription and close it, send
     * {@code TO_SEND} messages, attach a listener, send another {@code TO_SEND},
     * then assert that the summed send time with the consumer is less than
     * {@code reasonableMultiplier} times the one without, that all
     * {@code 2 * TO_SEND} messages arrive within 60 seconds, and that no
     * {@code seq} property value was seen twice.
     *
     * <p>The consumer connection is closed in a {@code finally}.
     *
     * @throws Exception on any JMS failure
     */
    public void testConcurrentRate() throws Exception {
        final String subName = "priorityConcurrent";
        final int TO_SEND = 2000;
        final int reasonableMultiplier = 4;
        ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");

        Connection consumerConn = factory.createConnection();
        try {
            consumerConn.setClientID("subName");
            consumerConn.start();
            Session consumerSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber sub = consumerSession.createDurableSubscriber(topic, subName);
            sub.close();

            final Vector<Message> duplicates = new Vector<Message>();
            // Indexed by the "seq" property; 1 marks a sequence number already seen.
            final int[] dups = new int[TO_SEND * 4];
            MessageProducer messageProducer = sess.createProducer(topic);
            TextMessage message = sess.createTextMessage();

            double noConsumerAve = sendTimedBatch(messageProducer, message, 0, TO_SEND, "Sent: ") * 100 / TO_SEND;

            sub = consumerSession.createDurableSubscriber(topic, subName);
            final AtomicInteger count = new AtomicInteger();
            sub.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message received) {
                    try {
                        int seen = count.incrementAndGet();
                        if (seen % 100 == 0) {
                            LOG.info("onMessage: count: " + seen + ", " + ((TextMessage)received).getText() + ", seqNo " + received.getIntProperty("seq") + ", " + received.getJMSMessageID());
                        }
                        int seqNo = received.getIntProperty("seq");
                        if (dups[seqNo] == 0) {
                            dups[seqNo] = 1;
                        } else {
                            LOG.error("Duplicate: " + ((TextMessage)received).getText() + ", " + received.getJMSMessageID());
                            duplicates.add(received);
                        }
                    } catch (Exception e) {
                        LOG.error("Failed to process received message", e);
                    }
                }
            });
            LOG.info("Activated consumer");

            double withConsumerAve = sendTimedBatch(messageProducer, message, TO_SEND, TO_SEND, "Sent another: ") * 100 / TO_SEND;

            assertTrue("max X times as slow with consumer:" + withConsumerAve + " , noConsumerAve:" + noConsumerAve,
                    withConsumerAve < noConsumerAve * reasonableMultiplier);

            Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    LOG.info("count: " + count.get());
                    return TO_SEND * 2 == count.get();
                }
            }, 60000L);

            assertTrue("No duplicates : " + duplicates, duplicates.isEmpty());
            assertEquals("got all messages", TO_SEND * 2, count.get());
        } finally {
            // Closing the connection also releases the session and its subscriber.
            consumerConn.close();
        }
    }

    /**
     * Sends {@code howMany} persistent messages with sequence numbers starting at
     * {@code firstSeq}, cycling the priority through 0-9, and times each send.
     *
     * @param producer  producer to send with
     * @param message   message object reused for every send
     * @param firstSeq  first value of the {@code seq} property
     * @param howMany   number of messages to send
     * @param sentLabel prefix of the summary log line written after the batch
     * @return the sum of the individual send durations in milliseconds
     * @throws Exception on any JMS failure
     */
    private double sendTimedBatch(MessageProducer producer, TextMessage message, int firstSeq, int howMany, String sentLabel) throws Exception {
        double max = 0.0;
        double sum = 0.0;
        for (int i = firstSeq; i < firstSeq + howMany; i++) {
            int priority = i % 10;
            message.setText(i + "-" + priority);
            message.setIntProperty("seq", i);
            message.setJMSPriority(priority);
            if (i > 0 && i % 1000 == 0) {
                LOG.info("Max send time: " + max + ". Sending message: " + message.getText());
            }
            long start = System.currentTimeMillis();
            // Delivery mode 2 is persistent; time-to-live 0 means no expiry.
            producer.send(message, 2, message.getJMSPriority(), 0L);
            long duration = System.currentTimeMillis() - start;
            // Only report a strictly larger duration, not every tie with the current max.
            if (duration > max) {
                max = duration;
                LOG.info("new max: " + max + " on i=" + i + ", " + message.getText());
            }
            sum += duration;
        }
        LOG.info(sentLabel + howMany + ", max send time: " + max);
        return sum;
    }

    /**
     * Checks that {@code jdbc.cleanup()} removes rows from the message table as
     * a durable subscriber acknowledges messages of different priorities.
     *
     * <p>Two persistent messages (priority 2 and 5) are sent while the
     * subscription is offline. After each receive the test waits for the ack row
     * of that priority, runs cleanup ten times, and asserts the remaining row
     * count (1, then 0).
     *
     * @throws Exception on any JMS or JDBC failure
     */
    public void testCleanupPriorityDestination() throws Exception {
        assertEquals("no messages pending", 0, messageTableCount());

        final String subName = "priorityConcurrent";
        ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST");
        Connection consumerConn = factory.createConnection();
        try {
            consumerConn.setClientID("subName");
            consumerConn.start();
            Session consumerSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber sub = consumerSession.createDurableSubscriber(topic, subName);
            sub.close();

            MessageProducer messageProducer = sess.createProducer(topic);
            // Declared as Message because the same variable later holds receive() results.
            Message message = sess.createTextMessage();
            message.setJMSPriority(2);
            messageProducer.send(message, 2, message.getJMSPriority(), 0L);
            message.setJMSPriority(5);
            messageProducer.send(message, 2, message.getJMSPriority(), 0L);
            assertEquals("two messages pending", 2, messageTableCount());

            sub = consumerSession.createDurableSubscriber(topic, subName);

            message = sub.receive(5000L);
            assertNotNull("got first message", message);
            assertEquals("got high priority", 5, message.getJMSPriority());
            waitForAck(5);
            for (int i = 0; i < 10; i++) {
                jdbc.cleanup();
            }
            assertEquals("one messages pending", 1, messageTableCount());

            message = sub.receive(5000L);
            assertNotNull("got second message", message);
            assertEquals("got low priority", 2, message.getJMSPriority());
            waitForAck(2);
            for (int i = 0; i < 10; i++) {
                jdbc.cleanup();
            }
            assertEquals("no messages pending", 0, messageTableCount());
        } finally {
            // Closing the connection also releases the session and its subscriber.
            consumerConn.close();
        }
    }

    /**
     * Checks that {@code jdbc.cleanup()} removes rows from the message table for
     * the topic {@code TEST_CLEANUP_NO_PRIORITY}.
     *
     * <p>Three messages are sent while the subscription is offline: one with a
     * 4000 ms time-to-live and two with the producer defaults. The test then
     * sleeps five seconds, receives two messages, and after each one waits for
     * the priority-0 ack row to pass the given id, runs cleanup ten times, and
     * asserts the remaining row count (1, then 0).
     *
     * @throws Exception on any JMS or JDBC failure
     */
    public void testCleanupNonPriorityDestination() throws Exception {
        assertEquals("no messages pending", 0, messageTableCount());

        final String subName = "subName";
        ActiveMQTopic topic = (ActiveMQTopic)sess.createTopic("TEST_CLEANUP_NO_PRIORITY");
        Connection consumerConn = factory.createConnection();
        try {
            consumerConn.setClientID(subName);
            consumerConn.start();
            Session consumerSession = consumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber sub = consumerSession.createDurableSubscriber(topic, subName);
            sub.close();

            MessageProducer messageProducer = sess.createProducer(topic);
            // Persistent (2), priority 4, time-to-live 4000 ms.
            messageProducer.send(sess.createTextMessage("ToExpire"), 2, 4, 4000L);
            messageProducer.send(sess.createTextMessage("A"));
            messageProducer.send(sess.createTextMessage("B"));
            assertEquals("three messages pending", 3, messageTableCount());

            // Sleep past the 4000 ms time-to-live of the first message.
            TimeUnit.SECONDS.sleep(5L);

            sub = consumerSession.createDurableSubscriber(topic, subName);

            Message message = sub.receive(5000L);
            assertNotNull("got message", message);
            LOG.info("Got: " + message);
            waitForAck(0, 1);
            for (int i = 0; i < 10; i++) {
                jdbc.cleanup();
            }
            assertEquals("one messages pending", 1, messageTableCount());

            message = sub.receive(5000L);
            assertNotNull("got message two", message);
            LOG.info("Got: " + message);
            waitForAck(0, 2);
            for (int i = 0; i < 10; i++) {
                jdbc.cleanup();
            }
            assertEquals("no messages pending", 0, messageTableCount());
        } finally {
            // Closing the connection also releases the session and its subscriber.
            consumerConn.close();
        }
    }

    /**
     * Counts the rows currently in {@code ACTIVEMQ_MSGS}.
     *
     * @return the row count, or {@code -1} if the query produced no row
     * @throws Exception if the connection or query fails
     */
    private int messageTableCount() throws Exception {
        java.sql.Connection c = dataSource.getConnection();
        try {
            PreparedStatement s = c.prepareStatement("SELECT COUNT(*) FROM ACTIVEMQ_MSGS");
            try {
                ResultSet rs = s.executeQuery();
                try {
                    return rs.next() ? rs.getInt(1) : -1;
                } finally {
                    rs.close();
                }
            } finally {
                s.close();
            }
        } finally {
            c.close();
        }
    }

    /**
     * Waits until an ack with an id greater than zero is recorded for
     * {@code priority}; delegates to {@link #waitForAck(int, int)}.
     *
     * @param priority value matched against the {@code PRIORITY} column
     * @throws Exception if the wait fails or the assertion does not hold
     */
    private void waitForAck(int priority) throws Exception {
        final int noMinimum = 0;
        waitForAck(priority, noMinimum);
    }

    /**
     * Polls {@code ACTIVEMQ_ACKS} via {@code Wait.waitFor} until the
     * {@code LAST_ACKED_ID} of the first row with the given priority exceeds
     * {@code minId}, and asserts that this happened.
     *
     * @param priority value matched against the {@code PRIORITY} column
     * @param minId    id that {@code LAST_ACKED_ID} must exceed
     * @throws Exception if polling fails or the condition is never satisfied
     */
    private void waitForAck(final int priority, final int minId) throws Exception {
        boolean acked = Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                // Stays 0 when no row matches, so the condition is false for minId >= 0.
                int id = 0;
                java.sql.Connection c = JDBCMessagePriorityTest.this.dataSource.getConnection();
                try {
                    PreparedStatement s = c.prepareStatement("SELECT LAST_ACKED_ID FROM ACTIVEMQ_ACKS WHERE PRIORITY=?");
                    try {
                        s.setInt(1, priority);
                        ResultSet rs = s.executeQuery();
                        try {
                            if (rs.next()) {
                                id = rs.getInt(1);
                            }
                        } finally {
                            rs.close();
                        }
                    } finally {
                        s.close();
                    }
                } finally {
                    c.close();
                }
                return id > minId;
            }
        });
        assertTrue("got ack for " + priority, acked);
    }

    /**
     * Runs {@code SELECT * FROM ACTIVEMQ_MSGS} and returns the first column of
     * the first row read as an {@code int}.
     *
     * <p>NOTE(review): despite the name nothing is printed or logged, and only
     * the first row is read - confirm the intended purpose before relying on it.
     *
     * @return the first column of the first row, or {@code -1} if the table is empty
     * @throws Exception if the connection or query fails
     */
    private int messageTableDump() throws Exception {
        java.sql.Connection c = dataSource.getConnection();
        try {
            PreparedStatement s = c.prepareStatement("SELECT * FROM ACTIVEMQ_MSGS");
            try {
                ResultSet rs = s.executeQuery();
                try {
                    return rs.next() ? rs.getInt(1) : -1;
                } finally {
                    rs.close();
                }
            } finally {
                s.close();
            }
        } finally {
            c.close();
        }
    }

    /**
     * Builds the JUnit suite for this class through the inherited
     * {@code suite(Class)} factory.
     *
     * @return the suite covering {@code JDBCMessagePriorityTest}
     */
    public static Test suite() {
        Class<JDBCMessagePriorityTest> testClass = JDBCMessagePriorityTest.class;
        return suite(testClass);
    }
}

