/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.sql.DataSource;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ5266Test {
    static Logger LOG = LoggerFactory.getLogger(AMQ5266Test.class);
    String activemqURL = "tcp://localhost:61617";
    BrokerService brokerService;
    private EmbeddedDataSource dataSource;

    @Before
    public void startBroker() throws Exception {
        this.brokerService = new BrokerService();
        this.dataSource = new EmbeddedDataSource();
        this.dataSource.setDatabaseName("target/derbyDb");
        this.dataSource.setCreateDatabase("create");
        JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
        persistenceAdapter.setDataSource((DataSource)this.dataSource);
        this.brokerService.setPersistenceAdapter((PersistenceAdapter)persistenceAdapter);
        this.brokerService.setDeleteAllMessagesOnStartup(true);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry defaultEntry = new PolicyEntry();
        defaultEntry.setEnableAudit(false);
        defaultEntry.setUseCache(false);
        defaultEntry.setMaxPageSize(1000);
        defaultEntry.setOptimizedDispatch(false);
        defaultEntry.setMemoryLimit(0x100000L);
        defaultEntry.setExpireMessagesPeriod(0L);
        policyMap.setDefaultEntry(defaultEntry);
        this.brokerService.setDestinationPolicy(policyMap);
        this.brokerService.getSystemUsage().getMemoryUsage().setLimit(0x20000000L);
        TransportConnector transportConnector = this.brokerService.addConnector("tcp://0.0.0.0:0");
        this.brokerService.start();
        this.activemqURL = transportConnector.getPublishableConnectString();
    }

    @After
    public void stopBroker() throws Exception {
        if (this.brokerService != null) {
            this.brokerService.stop();
        }
        try {
            this.dataSource.setShutdownDatabase("shutdown");
            this.dataSource.getConnection();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void test() throws Exception {
        String activemqQueues = "activemq,activemq2";
        int publisherMessagesPerThread = 1000;
        int publisherThreadCount = 5;
        int consumerThreadsPerQueue = 5;
        int consumerBatchSize = 25;
        int consumerWaitForConsumption = 300000;
        ExportQueuePublisher publisher = null;
        ExportQueueConsumer consumer = null;
        LOG.info("Publisher will publish " + publisherMessagesPerThread * publisherThreadCount + " messages to each queue specified.");
        LOG.info("\nBuilding Publisher...");
        publisher = new ExportQueuePublisher(this.activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount);
        LOG.info("Building Consumer...");
        consumer = new ExportQueueConsumer(this.activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, publisherMessagesPerThread * publisherThreadCount);
        LOG.info("Starting Publisher...");
        publisher.start();
        LOG.info("Starting Consumer...");
        consumer.start();
        int distinctPublishedCount = 0;
        LOG.info("Waiting For Publisher Completion...");
        publisher.waitForCompletion();
        distinctPublishedCount = publisher.getIDs().size();
        LOG.info("Publisher Complete. Distinct IDs Published: " + distinctPublishedCount);
        long endWait = System.currentTimeMillis() + (long)consumerWaitForConsumption;
        while (!consumer.completed() && System.currentTimeMillis() < endWait) {
            try {
                int secs = (int)(endWait - System.currentTimeMillis()) / 1000;
                LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
                DefaultJDBCAdapter.dumpTables((Connection)this.dataSource.getConnection());
                Thread.sleep(10000L);
            }
            catch (Exception e) {}
        }
        LOG.info("\nConsumer Complete. Shutting Down.");
        consumer.shutdown();
        LOG.info("Consumer Stats:");
        for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {
            List<String> idList = entry.getValue();
            int distinctConsumed = new TreeSet<String>(idList).size();
            StringBuilder sb = new StringBuilder();
            sb.append("   Queue: " + entry.getKey() + " -> Total Messages Consumed: " + idList.size() + ", Distinct IDs Consumed: " + distinctConsumed);
            int diff = distinctPublishedCount - distinctConsumed;
            sb.append(" ( " + (diff > 0 ? Integer.valueOf(diff) : "NO") + " STUCK MESSAGES " + " ) ");
            LOG.info(sb.toString());
            Assert.assertEquals((String)"expect to get all messages!", (long)0L, (long)diff);
        }
    }

    public class ExportQueueConsumer {
        private final String amqUser = ActiveMQConnection.DEFAULT_USER;
        private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
        private final int totalToExpect;
        private ActiveMQConnectionFactory connectionFactory = null;
        private String activemqURL = null;
        private String activemqQueues = null;
        private String[] queues = null;
        private Map<String, List<String>> idsByQueue = new HashMap<String, List<String>>();
        private Map<String, List<ConsumerThread>> threads;

        public ExportQueueConsumer(String activemqURL, String activemqQueues, int threadsPerQueue, int batchSize, int totalToExpect) throws Exception {
            this.activemqURL = activemqURL;
            this.activemqQueues = activemqQueues;
            this.totalToExpect = totalToExpect;
            this.queues = this.activemqQueues.split(",");
            for (int i = 0; i < this.queues.length; ++i) {
                this.queues[i] = this.queues[i].trim();
            }
            this.threads = new HashMap<String, List<ConsumerThread>>();
            for (String q : this.queues) {
                ArrayList<ConsumerThread> list = new ArrayList<ConsumerThread>();
                this.idsByQueue.put(q, Collections.synchronizedList(new ArrayList()));
                for (int i = 0; i < threadsPerQueue; ++i) {
                    list.add(new ConsumerThread(q, batchSize));
                }
                this.threads.put(q, list);
            }
        }

        public Map<String, List<String>> getIDs() {
            return this.idsByQueue;
        }

        public void start() throws Exception {
            for (List<ConsumerThread> list : this.threads.values()) {
                for (ConsumerThread ct : list) {
                    ct.start();
                }
            }
        }

        public void shutdown() throws Exception {
            for (List<ConsumerThread> list : this.threads.values()) {
                for (ConsumerThread ct : list) {
                    ct.shutdown();
                }
            }
            for (List<ConsumerThread> list : this.threads.values()) {
                for (ConsumerThread ct : list) {
                    ct.join();
                }
            }
        }

        private Session newSession(QueueConnection queueConnection) throws Exception {
            return queueConnection.createSession(true, 0);
        }

        private QueueConnection newQueueConnection() throws Exception {
            if (this.connectionFactory == null) {
                this.connectionFactory = new ActiveMQConnectionFactory(this.amqUser, this.amqPassword, this.activemqURL);
            }
            RedeliveryPolicy policy = this.connectionFactory.getRedeliveryPolicy();
            policy.setMaximumRedeliveries(-1);
            QueueConnection amqConnection = this.connectionFactory.createQueueConnection();
            amqConnection.start();
            return amqConnection;
        }

        public boolean completed() {
            for (List<ConsumerThread> list : this.threads.values()) {
                for (ConsumerThread ct : list) {
                    if (!ct.isAlive()) continue;
                    LOG.info("thread for {} is still alive.", (Object)ct.qName);
                    return false;
                }
            }
            return true;
        }

        private class ConsumerThread
        extends Thread {
            private int batchSize;
            private QueueConnection qc;
            private Session session;
            private MessageConsumer mc;
            private List<String> idList;
            private boolean shutdown = false;
            private String qName;

            private ConsumerThread(String queueName, int batchSize) throws Exception {
                this.batchSize = batchSize;
                this.qName = queueName;
                this.qc = ExportQueueConsumer.this.newQueueConnection();
                this.session = ExportQueueConsumer.this.newSession(this.qc);
                Queue q = this.session.createQueue(queueName);
                this.mc = this.session.createConsumer((Destination)q);
                this.idList = (List)ExportQueueConsumer.this.idsByQueue.get(queueName);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    int count = 0;
                    while (!this.shutdown) {
                        if (this.idList.size() >= ExportQueueConsumer.this.totalToExpect) {
                            LOG.info("Got {} for q: {}", (Object)this.idList.size(), (Object)this.qName);
                            break;
                        }
                        Message m = this.mc.receive(4000L);
                        if (m != null) {
                            this.idList.add(m.getStringProperty("KEY"));
                            if (++count != this.batchSize) continue;
                            this.session.commit();
                            count = 0;
                            continue;
                        }
                        this.session.commit();
                        count = 0;
                        try {
                            LOG.info("did not receive on {}, current count: {}", (Object)this.qName, (Object)this.idList.size());
                        }
                        catch (Exception exception) {}
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                finally {
                    this.close();
                }
            }

            public void shutdown() {
                this.shutdown = true;
            }

            public void close() {
                try {
                    this.mc.close();
                }
                catch (Exception e) {
                    // empty catch block
                }
                try {
                    this.session.close();
                }
                catch (Exception e) {
                    // empty catch block
                }
                try {
                    this.qc.close();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        }
    }

    public class ExportQueuePublisher {
        private final String amqUser = ActiveMQConnection.DEFAULT_USER;
        private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
        private ActiveMQConnectionFactory connectionFactory = null;
        private String activemqURL = null;
        private String activemqQueues = null;
        private Set<String> ids = Collections.synchronizedSet(new TreeSet());
        private List<PublisherThread> threads;

        public ExportQueuePublisher(String activemqURL, String activemqQueues, int messagesPerThread, int threadCount) throws Exception {
            this.activemqURL = activemqURL;
            this.activemqQueues = activemqQueues;
            this.threads = new ArrayList<PublisherThread>();
            for (int i = 0; i < threadCount; ++i) {
                PublisherThread pt = new PublisherThread(messagesPerThread);
                this.threads.add(pt);
            }
        }

        public Set<String> getIDs() {
            return this.ids;
        }

        public void start() throws Exception {
            for (PublisherThread pt : this.threads) {
                pt.start();
            }
        }

        public void waitForCompletion() throws Exception {
            for (PublisherThread pt : this.threads) {
                pt.join();
                pt.close();
            }
        }

        private Session newSession(QueueConnection queueConnection) throws Exception {
            return queueConnection.createSession(true, 0);
        }

        private QueueConnection newQueueConnection() throws Exception {
            if (this.connectionFactory == null) {
                this.connectionFactory = new ActiveMQConnectionFactory(this.amqUser, this.amqPassword, this.activemqURL);
            }
            RedeliveryPolicy policy = this.connectionFactory.getRedeliveryPolicy();
            policy.setMaximumRedeliveries(-1);
            QueueConnection amqConnection = this.connectionFactory.createQueueConnection();
            amqConnection.start();
            return amqConnection;
        }

        private class PublisherThread
        extends Thread {
            private int count;
            private QueueConnection qc;
            private Session session;
            private MessageProducer mp;

            private PublisherThread(int count) throws Exception {
                this.count = count;
                this.qc = ExportQueuePublisher.this.newQueueConnection();
                this.session = ExportQueuePublisher.this.newSession(this.qc);
                ActiveMQQueue q = new ActiveMQQueue(ExportQueuePublisher.this.activemqQueues);
                this.mp = this.session.createProducer((Destination)q);
            }

            @Override
            public void run() {
                try {
                    while (this.count-- > 0) {
                        TextMessage tm = this.session.createTextMessage("test");
                        String id = UUID.randomUUID().toString();
                        tm.setStringProperty("KEY", id);
                        ExportQueuePublisher.this.ids.add(id);
                        this.mp.send((Message)tm);
                        this.session.commit();
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }

            public void close() {
                try {
                    this.mp.close();
                }
                catch (Exception e) {
                    // empty catch block
                }
                try {
                    this.session.close();
                }
                catch (Exception e) {
                    // empty catch block
                }
                try {
                    this.qc.close();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        }
    }
}

