/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class ConcurrentProducerDurableConsumerTest
extends TestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentProducerDurableConsumerTest.class);
    private final int consumerCount = 5;
    BrokerService broker;
    protected List<Connection> connections = Collections.synchronizedList(new ArrayList());
    protected Map<MessageConsumer, TimedMessageListener> consumers = new HashMap<MessageConsumer, TimedMessageListener>();
    protected MessageIdList allMessagesList = new MessageIdList();
    private final int messageSize = 1024;
    private final TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;

    @Parameterized.Parameters
    public static Collection<TestSupport.PersistenceAdapterChoice[]> getTestParameters() {
        TestSupport.PersistenceAdapterChoice[] kahaDb = new TestSupport.PersistenceAdapterChoice[]{TestSupport.PersistenceAdapterChoice.KahaDB};
        TestSupport.PersistenceAdapterChoice[] levelDb = new TestSupport.PersistenceAdapterChoice[]{TestSupport.PersistenceAdapterChoice.LevelDB};
        TestSupport.PersistenceAdapterChoice[] mem = new TestSupport.PersistenceAdapterChoice[]{TestSupport.PersistenceAdapterChoice.MEM};
        ArrayList<TestSupport.PersistenceAdapterChoice[]> choices = new ArrayList<TestSupport.PersistenceAdapterChoice[]>();
        choices.add(kahaDb);
        choices.add(levelDb);
        choices.add(mem);
        return choices;
    }

    public ConcurrentProducerDurableConsumerTest(TestSupport.PersistenceAdapterChoice choice) {
        this.persistenceAdapterChoice = choice;
    }

    @Test(timeout=120000L)
    public void testSendRateWithActivatingConsumers() throws Exception {
        Destination destination = this.createDestination();
        ActiveMQConnectionFactory factory = this.createConnectionFactory();
        this.startInactiveConsumers((ConnectionFactory)factory, destination);
        Connection connection = factory.createConnection();
        Session session = connection.createSession(false, 1);
        MessageProducer producer = this.createMessageProducer(session, destination);
        double[] inactiveConsumerStats = this.produceMessages(destination, 500, 10, session, producer, null);
        LOG.info("With inactive consumers: ave: " + inactiveConsumerStats[1] + ", max: " + inactiveConsumerStats[0] + ", multiplier: " + inactiveConsumerStats[0] / inactiveConsumerStats[1]);
        int consumersToActivate = 5;
        final Object addConsumerSignal = new Object();
        Executors.newCachedThreadPool(new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "ActivateConsumer" + this);
            }
        }).execute(new Runnable((ConnectionFactory)factory, destination){
            final /* synthetic */ ConnectionFactory val$factory;
            final /* synthetic */ Destination val$destination;
            {
                this.val$factory = connectionFactory;
                this.val$destination = destination;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    TopicSubscriber consumer = null;
                    for (int i = 0; i < 5; ++i) {
                        LOG.info("Waiting for add signal from producer...");
                        Object object = addConsumerSignal;
                        synchronized (object) {
                            addConsumerSignal.wait(1800000L);
                        }
                        TimedMessageListener listener = new TimedMessageListener();
                        consumer = ConcurrentProducerDurableConsumerTest.this.createDurableSubscriber(this.val$factory.createConnection(), this.val$destination, "consumer" + (i + 1));
                        LOG.info("Created consumer " + consumer);
                        consumer.setMessageListener((MessageListener)listener);
                        ConcurrentProducerDurableConsumerTest.this.consumers.put((MessageConsumer)consumer, listener);
                    }
                }
                catch (Exception e) {
                    LOG.error("failed to start consumer", (Throwable)e);
                }
            }
        });
        double[] statsWithActive = this.produceMessages(destination, 500, 10, session, producer, addConsumerSignal);
        LOG.info(" with concurrent activate, ave: " + statsWithActive[1] + ", max: " + statsWithActive[0] + ", multiplier: " + statsWithActive[0] / statsWithActive[1]);
        while (this.consumers.size() < 5) {
            TimeUnit.SECONDS.sleep(2L);
        }
        long timeToFirstAccumulator = 0L;
        for (TimedMessageListener listener : this.consumers.values()) {
            long time = listener.getFirstReceipt();
            timeToFirstAccumulator += time;
            LOG.info("Time to first " + time);
        }
        LOG.info("Ave time to first message =" + timeToFirstAccumulator / (long)this.consumers.size());
        for (TimedMessageListener listener : this.consumers.values()) {
            LOG.info("Ave batch receipt time: " + listener.waitForReceivedLimit(10000L) + " max receipt: " + listener.maxReceiptTime);
        }
        LOG.info("Ave send time with active: " + statsWithActive[1] + " as multiplier of ave with none active: " + inactiveConsumerStats[1] + ", multiplier=" + statsWithActive[1] / inactiveConsumerStats[1]);
        ConcurrentProducerDurableConsumerTest.assertTrue((String)("Ave send time with active: " + statsWithActive[1] + " within reasonable multpler of ave with none active: " + inactiveConsumerStats[1] + ", multiplier " + statsWithActive[1] / inactiveConsumerStats[1]), (statsWithActive[1] < 15.0 * inactiveConsumerStats[1] ? 1 : 0) != 0);
    }

    public void x_testSendWithInactiveAndActiveConsumers() throws Exception {
        Destination destination = this.createDestination();
        ActiveMQConnectionFactory factory = this.createConnectionFactory();
        this.startInactiveConsumers((ConnectionFactory)factory, destination);
        Connection connection = factory.createConnection();
        Session session = connection.createSession(false, 1);
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(2);
        int toSend = 100;
        int numIterations = 5;
        double[] noConsumerStats = this.produceMessages(destination, 100, 5, session, producer, null);
        this.startConsumers((ConnectionFactory)factory, destination);
        LOG.info("Activated consumer");
        double[] withConsumerStats = this.produceMessages(destination, 100, 5, session, producer, null);
        LOG.info("With consumer: " + withConsumerStats[1] + " , with noConsumer: " + noConsumerStats[1] + ", multiplier: " + withConsumerStats[1] / noConsumerStats[1]);
        int reasonableMultiplier = 15;
        ConcurrentProducerDurableConsumerTest.assertTrue((String)("max X times as slow with consumer: " + withConsumerStats[1] + ", with no Consumer: " + noConsumerStats[1] + ", multiplier: " + withConsumerStats[1] / noConsumerStats[1]), (withConsumerStats[1] < noConsumerStats[1] * 15.0 ? 1 : 0) != 0);
        int toReceive = 5000;
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                LOG.info("count: " + ConcurrentProducerDurableConsumerTest.this.allMessagesList.getMessageCount());
                return 5000 == ConcurrentProducerDurableConsumerTest.this.allMessagesList.getMessageCount();
            }
        }, (long)60000L);
        ConcurrentProducerDurableConsumerTest.assertEquals((String)"got all messages", (int)5000, (int)this.allMessagesList.getMessageCount());
    }

    private MessageProducer createMessageProducer(Session session, Destination destination) throws JMSException {
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(2);
        return producer;
    }

    private void startInactiveConsumers(ConnectionFactory factory, Destination destination) throws Exception {
        this.startConsumers(factory, destination);
        for (Connection connection : this.connections) {
            connection.close();
        }
        this.connections.clear();
        this.consumers.clear();
    }

    protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception {
        for (int i = 0; i < 5; ++i) {
            TimedMessageListener list = new TimedMessageListener();
            TopicSubscriber consumer = this.createDurableSubscriber(factory.createConnection(), dest, "consumer" + (i + 1));
            consumer.setMessageListener((MessageListener)list);
            this.consumers.put((MessageConsumer)consumer, list);
        }
    }

    protected TopicSubscriber createDurableSubscriber(Connection conn, Destination dest, String name) throws Exception {
        conn.setClientID(name);
        this.connections.add(conn);
        conn.start();
        Session sess = conn.createSession(false, 1);
        TopicSubscriber consumer = sess.createDurableSubscriber((Topic)dest, name);
        return consumer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private double[] produceMessages(Destination destination, int toSend, int numIterations, Session session, MessageProducer producer, Object addConsumerSignal) throws Exception {
        long count = 0L;
        double batchMax = 0.0;
        double max = 0.0;
        double sum = 0.0;
        for (int i = 0; i < numIterations; ++i) {
            long start = System.currentTimeMillis();
            for (int j = 0; j < toSend; ++j) {
                long singleSendstart = System.currentTimeMillis();
                TextMessage msg = this.createTextMessage(session, "" + j);
                int priority = (int)count % 10;
                producer.send((Message)msg, 2, priority, 0L);
                max = Math.max(max, (double)(System.currentTimeMillis() - singleSendstart));
                if (++count % 500L == 0L && addConsumerSignal != null) {
                    Object object = addConsumerSignal;
                    synchronized (object) {
                        addConsumerSignal.notifyAll();
                        LOG.info("Signalled add consumer");
                    }
                }
                if (count % 5000L != 0L) continue;
                LOG.info("Sent " + count + ", singleSendMax:" + max);
            }
            long duration = System.currentTimeMillis() - start;
            batchMax = Math.max(batchMax, (double)duration);
            sum += (double)duration;
            LOG.info("Iteration " + i + ", sent " + toSend + ", time: " + duration + ", batchMax:" + batchMax + ", singleSendMax:" + max);
        }
        LOG.info("Sent: " + toSend * numIterations + ", batchMax: " + batchMax + " singleSendMax: " + max);
        return new double[]{batchMax, sum / (double)numIterations};
    }

    protected TextMessage createTextMessage(Session session, String initText) throws Exception {
        TextMessage msg = session.createTextMessage();
        if (initText.length() < 1024) {
            char[] data = new char[1024 - initText.length()];
            Arrays.fill(data, '*');
            String str = new String(data);
            msg.setText(initText + str);
        } else {
            msg.setText(initText);
        }
        return msg;
    }

    @Before
    public void setUp() throws Exception {
        this.topic = true;
        super.setUp();
        this.broker = this.createBroker();
        this.broker.start();
    }

    @After
    public void tearDown() throws Exception {
        for (Connection conn : this.connections) {
            try {
                conn.close();
            }
            catch (Throwable throwable) {}
        }
        this.broker.stop();
        this.allMessagesList.flushMessages();
        this.consumers.clear();
        super.tearDown();
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setEnableStatistics(false);
        brokerService.addConnector("tcp://0.0.0.0:0");
        brokerService.setDeleteAllMessagesOnStartup(true);
        PolicyEntry policy = new PolicyEntry();
        policy.setPrioritizedMessages(true);
        policy.setMaxPageSize(500);
        StorePendingDurableSubscriberMessageStoragePolicy durableSubPending = new StorePendingDurableSubscriberMessageStoragePolicy();
        durableSubPending.setImmediatePriorityDispatch(true);
        durableSubPending.setUseCache(true);
        policy.setPendingDurableSubscriberPolicy((PendingDurableSubscriberMessageStoragePolicy)durableSubPending);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policy);
        brokerService.setDestinationPolicy(policyMap);
        ConcurrentProducerDurableConsumerTest.setPersistenceAdapter(brokerService, this.persistenceAdapterChoice);
        return brokerService;
    }

    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(((TransportConnector)this.broker.getTransportConnectors().get(0)).getPublishableConnectString());
        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
        prefetchPolicy.setAll(1);
        factory.setPrefetchPolicy(prefetchPolicy);
        factory.setDispatchAsync(true);
        return factory;
    }

    class TimedMessageListener
    implements MessageListener {
        final int batchSize = 1000;
        CountDownLatch firstReceiptLatch = new CountDownLatch(1);
        long mark = System.currentTimeMillis();
        long firstReceipt = 0L;
        long receiptAccumulator = 0L;
        long batchReceiptAccumulator = 0L;
        long maxReceiptTime = 0L;
        AtomicLong count = new AtomicLong(0L);
        Map<Integer, MessageIdList> messageLists = new ConcurrentHashMap<Integer, MessageIdList>(new HashMap());

        TimedMessageListener() {
        }

        public void onMessage(Message message) {
            long current = System.currentTimeMillis();
            long duration = current - this.mark;
            this.receiptAccumulator += duration;
            int priority = 0;
            try {
                priority = message.getJMSPriority();
            }
            catch (JMSException jMSException) {
                // empty catch block
            }
            if (!this.messageLists.containsKey(priority)) {
                MessageIdList perPriorityList = new MessageIdList();
                perPriorityList.setParent(ConcurrentProducerDurableConsumerTest.this.allMessagesList);
                this.messageLists.put(priority, perPriorityList);
            }
            this.messageLists.get(priority).onMessage(message);
            if (this.count.incrementAndGet() == 1L) {
                this.firstReceipt = duration;
                this.firstReceiptLatch.countDown();
                LOG.info("First receipt in " + this.firstReceipt + "ms");
            } else if (this.count.get() % 1000L == 0L) {
                LOG.info("Consumed " + this.count.get() + " in " + this.batchReceiptAccumulator + "ms" + ", priority:" + priority);
                this.batchReceiptAccumulator = 0L;
            }
            this.maxReceiptTime = Math.max(this.maxReceiptTime, duration);
            this.receiptAccumulator += duration;
            this.batchReceiptAccumulator += duration;
            this.mark = current;
        }

        long getMessageCount() {
            return this.count.get();
        }

        long getFirstReceipt() throws Exception {
            this.firstReceiptLatch.await(30L, TimeUnit.SECONDS);
            return this.firstReceipt;
        }

        public long waitForReceivedLimit(long limit) throws Exception {
            long expiry = System.currentTimeMillis() + 1800000L;
            while (this.count.get() < limit) {
                if (System.currentTimeMillis() > expiry) {
                    throw new RuntimeException("Expired waiting for X messages, " + limit);
                }
                TimeUnit.SECONDS.sleep(2L);
                String missing = this.findFirstMissingMessage();
                if (missing == null) continue;
                LOG.info("first missing = " + missing);
                throw new RuntimeException("We have a missing message. " + missing);
            }
            return this.receiptAccumulator / (limit / 1000L);
        }

        private String findFirstMissingMessage() {
            MessageId current = new MessageId();
            for (MessageIdList priorityList : this.messageLists.values()) {
                MessageId previous = null;
                for (String id : priorityList.getMessageIds()) {
                    current.setValue(id);
                    if (previous == null) {
                        previous = current.copy();
                        continue;
                    }
                    if (current.getProducerSequenceId() - 1L != previous.getProducerSequenceId() && current.getProducerSequenceId() - 10L != previous.getProducerSequenceId()) {
                        return "Missing next after: " + previous + ", got: " + current;
                    }
                    previous = current.copy();
                }
            }
            return null;
        }
    }
}

