/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcDurableSubDupTest {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcDurableSubDupTest.class);
    final int prefetchVal = 150;
    String urlOptions = "jms.watchTopicAdvisories=false";
    String url = null;
    String queueName = "topicTest?consumer.prefetchSize=150";
    String xmlMessage = "<Example 01234567890123456789012345678901234567890123456789 MessageText>";
    String selector = "";
    String clntVersion = "87";
    String clntId = "timsClntId345" + this.clntVersion;
    String subscriptionName = "subscriptionName-y" + this.clntVersion;
    SimpleDateFormat dtf = new SimpleDateFormat("HH:mm:ss");
    final int TO_RECEIVE = 5000;
    BrokerService broker = null;
    Vector<Throwable> exceptions = new Vector();
    final int MAX_MESSAGES = 100000;
    int[] dupChecker = new int[100000];

    @Before
    public void startBroker() throws Exception {
        this.exceptions.clear();
        for (int i = 0; i < 100000; ++i) {
            this.dupChecker[i] = 0;
        }
        this.broker = new BrokerService();
        this.broker.setAdvisorySupport(false);
        this.broker.setPersistenceAdapter((PersistenceAdapter)new JDBCPersistenceAdapter());
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setMaxAuditDepth(3000);
        policyEntry.setMaxPageSize(150);
        policyEntry.setPrioritizedMessages(true);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.addConnector("tcp://localhost:0");
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.start();
        this.broker.waitUntilStarted();
        this.url = ((TransportConnector)this.broker.getTransportConnectors().get(0)).getConnectUri().toString() + "?" + this.urlOptions;
    }

    @After
    public void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    @Test
    public void testNoDupsOnSlowConsumerReconnect() throws Exception {
        JmsConsumerDup consumer = new JmsConsumerDup();
        consumer.done.set(true);
        consumer.run();
        consumer.done.set(false);
        LOG.info("serial production then consumption");
        JmsProvider provider = new JmsProvider();
        provider.run();
        consumer.run();
        Assert.assertTrue((String)("no exceptions: " + this.exceptions), (boolean)this.exceptions.isEmpty());
        for (int i = 0; i < 5000; ++i) {
            Assert.assertTrue((String)("got message " + i), (this.dupChecker[i] == 1 ? 1 : 0) != 0);
        }
    }

    @Test
    public void testNoDupsOnSlowConsumerLargePriorityGapReconnect() throws Exception {
        JmsConsumerDup consumer = new JmsConsumerDup();
        consumer.done.set(true);
        consumer.run();
        consumer.done.set(false);
        JmsProvider provider = new JmsProvider();
        provider.priorityModulator = 2500;
        provider.run();
        consumer.run();
        Assert.assertTrue((String)("no exceptions: " + this.exceptions), (boolean)this.exceptions.isEmpty());
        for (int i = 0; i < 5000; ++i) {
            Assert.assertTrue((String)("got message " + i), (this.dupChecker[i] == 1 ? 1 : 0) != 0);
        }
    }

    class JmsProvider
    implements Runnable {
        int priorityModulator = 10;

        JmsProvider() {
        }

        @Override
        public void run() {
            long timeToLive = 0L;
            TextMessage message = null;
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(JdbcDurableSubDupTest.this.url);
            try {
                Connection connection = factory.createConnection("MyUserName", "MyPassword");
                Session session = connection.createSession(false, 1);
                Topic topic = session.createTopic(JdbcDurableSubDupTest.this.queueName);
                MessageProducer messageProducer = session.createProducer((Destination)topic);
                messageProducer.setPriority(3);
                messageProducer.setTimeToLive(timeToLive);
                messageProducer.setDeliveryMode(2);
                int msgSeqNo = 0;
                int NUM_MSGS = 1000;
                int NUM_GROUPS = 5000 / NUM_MSGS;
                for (int n = 0; n < NUM_GROUPS; ++n) {
                    message = session.createTextMessage();
                    for (int i = 0; i < NUM_MSGS; ++i) {
                        int priority = 0;
                        priority = this.priorityModulator <= 10 ? msgSeqNo % this.priorityModulator : (msgSeqNo >= this.priorityModulator ? 9 : 0);
                        message.setText(JdbcDurableSubDupTest.this.xmlMessage + msgSeqNo + "-" + priority);
                        message.setJMSPriority(priority);
                        message.setIntProperty("SeqNo", msgSeqNo);
                        if (i > 0 && i % 100 == 0) {
                            LOG.info("Sending message: " + message.getText());
                        }
                        messageProducer.send((Message)message, 2, message.getJMSPriority(), timeToLive);
                        ++msgSeqNo;
                    }
                    try {
                        Thread.sleep(1000L);
                        continue;
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                        JdbcDurableSubDupTest.this.exceptions.add(e);
                    }
                }
            }
            catch (JMSException e) {
                LOG.error("caught ", (Throwable)e);
                e.printStackTrace();
                JdbcDurableSubDupTest.this.exceptions.add(e);
            }
        }
    }

    class JmsConsumerDup
    implements MessageListener {
        long count = 0L;
        AtomicBoolean done = new AtomicBoolean(false);

        JmsConsumerDup() {
        }

        public void run() {
            Connection connection = null;
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(JdbcDurableSubDupTest.this.url);
            try {
                connection = factory.createConnection("MyUsername", "MyPassword");
                connection.setClientID(JdbcDurableSubDupTest.this.clntId);
                connection.start();
                Session session = connection.createSession(false, 1);
                Topic topic = session.createTopic(JdbcDurableSubDupTest.this.queueName);
                TopicSubscriber consumer = session.createDurableSubscriber(topic, JdbcDurableSubDupTest.this.subscriptionName, JdbcDurableSubDupTest.this.selector, false);
                consumer.setMessageListener((MessageListener)this);
                LOG.info("Waiting for messages...");
                while (!this.done.get()) {
                    TimeUnit.SECONDS.sleep(5L);
                    if (this.count != 5000L && JdbcDurableSubDupTest.this.exceptions.isEmpty()) continue;
                    this.done.set(true);
                }
            }
            catch (Exception e) {
                LOG.error("caught", (Throwable)e);
                JdbcDurableSubDupTest.this.exceptions.add(e);
                throw new RuntimeException(e);
            }
            finally {
                if (connection != null) {
                    try {
                        LOG.info("consumer done (" + JdbcDurableSubDupTest.this.exceptions.isEmpty() + "), closing connection");
                        connection.close();
                    }
                    catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        public void onMessage(Message message) {
            ++this.count;
            try {
                Thread.sleep(0L);
            }
            catch (InterruptedException e) {
                // empty catch block
            }
            try {
                int i;
                TextMessage m = (TextMessage)message;
                if (this.count % 100L == 0L) {
                    LOG.info("Rcvd Msg #-" + this.count + " " + m.getText() + " Sent->" + JdbcDurableSubDupTest.this.dtf.format(new Date(m.getJMSTimestamp())) + " Recv->" + JdbcDurableSubDupTest.this.dtf.format(new Date()) + " Expr->" + JdbcDurableSubDupTest.this.dtf.format(new Date(m.getJMSExpiration())) + ", mid: " + m.getJMSMessageID());
                }
                if ((i = m.getIntProperty("SeqNo")) < 100000) {
                    if (JdbcDurableSubDupTest.this.dupChecker[i] == 1) {
                        LOG.error("Duplicate message received at count: " + this.count + ", id: " + m.getJMSMessageID());
                        JdbcDurableSubDupTest.this.exceptions.add(new RuntimeException("Got Duplicate at: " + m.getJMSMessageID()));
                    } else {
                        JdbcDurableSubDupTest.this.dupChecker[i] = 1;
                    }
                }
            }
            catch (JMSException e) {
                LOG.error("caught ", (Throwable)e);
                JdbcDurableSubDupTest.this.exceptions.add(e);
            }
        }
    }
}

