/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.PendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Assert;
import org.junit.Test;

/**
 * Manual test that publishes numbered text messages to a topic with two
 * subscribers: a "slow" one whose listener parks on a latch after its first
 * message, and a "fast" one that only sleeps briefly per message. The test
 * asserts that the fast subscriber still receives every message that was sent
 * while the slow subscriber was blocked.
 *
 * <p>Each run starts its own broker on a fixed local port and tears it down
 * again, together with every JMS connection it opened, even when the run
 * fails part-way.
 */
public class ActiveMQSlowConsumerManualTest {
    private static final int PORT = 12345;
    private static final ActiveMQTopic TOPIC = new ActiveMQTopic("TOPIC");
    private static final String URL = "nio://localhost:" + PORT + "?socket.tcpNoDelay=true";

    /** Payloads are only recorded individually up to this many messages. */
    private static final int MAX_TRACKED_MESSAGES = 1000;
    /** A progress line is printed for every payload that is a multiple of this. */
    private static final int PROGRESS_INTERVAL = 10000;
    /** Pause used to let subscriptions register and deliveries drain. */
    private static final long SETTLE_MILLIS = 500L;
    /** Artificial per-message processing time of the fast consumer. */
    private static final long FAST_CONSUMER_DELAY_MILLIS = 5L;

    @Test(timeout=60000L)
    public void testDefaultSettings() throws Exception {
        runTest("testDefaultSettings", 30, -1, -1, false, false, false, false);
    }

    @Test(timeout=60000L)
    public void testDefaultSettingsWithOptimiseAcknowledge() throws Exception {
        runTest("testDefaultSettingsWithOptimiseAcknowledge", 30, -1, -1, false, false, true, false);
    }

    @Test(timeout=60000L)
    public void testBounded() throws Exception {
        runTest("testBounded", 30, 5, 25, false, false, false, false);
    }

    @Test(timeout=60000L)
    public void testBoundedWithOptimiseAcknowledge() throws Exception {
        runTest("testBoundedWithOptimiseAcknowledge", 30, 5, 25, false, false, true, false);
    }

    /**
     * Runs one slow-consumer scenario against a freshly started broker.
     *
     * <p>Note: when {@code sendMessageCount} exceeds {@value #MAX_TRACKED_MESSAGES}
     * no payloads are recorded, so the final assertion compares {@code null}
     * with {@code null} and only the printed counters are informative.
     *
     * @param name                label used in the printed report
     * @param sendMessageCount    number of messages to publish (payloads are {@code 0..n-1})
     * @param prefetchLimit       topic prefetch to configure; ignored unless positive
     * @param messageLimit        constant pending-message limit; ignored unless positive
     * @param evictOldestMessage  whether to install an oldest-message eviction strategy
     * @param disableFlowControl  whether to switch producer flow control off
     * @param optimizeAcknowledge whether connections use optimized acknowledgement
     * @param persistent          whether the broker is persistent
     * @throws Exception if the broker or any JMS operation fails
     */
    public void runTest(String name, int sendMessageCount, int prefetchLimit, int messageLimit, boolean evictOldestMessage, boolean disableFlowControl, boolean optimizeAcknowledge, boolean persistent) throws Exception {
        final boolean trackPayloads = sendMessageCount <= MAX_TRACKED_MESSAGES;
        // Only touched by the test thread, so a plain list is sufficient.
        final List<Integer> sent = trackPayloads ? new ArrayList<Integer>() : null;
        // Written by listener threads and read by the test thread: must be thread-safe.
        final List<Integer> slowConsumerReceived = newReceivedList(trackPayloads);
        final List<Integer> fastConsumerReceived = newReceivedList(trackPayloads);
        final AtomicInteger slowConsumerReceiveCount = new AtomicInteger();
        final AtomicInteger fastConsumerReceiveCount = new AtomicInteger();
        final CountDownLatch blockSlowConsumer = new CountDownLatch(1);
        // Every connection opened by this run, so that all of them get closed.
        final List<Connection> connections = new ArrayList<Connection>();

        BrokerService broker = createBroker(persistent);
        broker.setDestinationPolicy(buildPolicy(TOPIC, prefetchLimit, messageLimit, evictOldestMessage, disableFlowControl));
        try {
            broker.start();

            Session slowConsumerSession = buildSession("SlowConsumer", URL, optimizeAcknowledge, connections);
            MessageConsumer slowConsumer = createSubscriber(slowConsumerSession, new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        slowConsumerReceiveCount.incrementAndGet();
                        recordPayload("SlowConsumer", message, slowConsumerReceived);
                        // Park here until the test releases the latch.
                        blockSlowConsumer.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (Exception e) {
                        System.err.println("SlowConsumer: failed to process message: " + e);
                    }
                }
            });

            Session fastConsumerSession = buildSession("FastConsumer", URL, optimizeAcknowledge, connections);
            MessageConsumer fastConsumer = createSubscriber(fastConsumerSession, new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        fastConsumerReceiveCount.incrementAndGet();
                        TimeUnit.MILLISECONDS.sleep(FAST_CONSUMER_DELAY_MILLIS);
                        recordPayload("FastConsumer", message, fastConsumerReceived);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (Exception e) {
                        System.err.println("FastConsumer: failed to process message: " + e);
                    }
                }
            });

            // Give both subscriptions time to register before publishing.
            Thread.sleep(SETTLE_MILLIS);

            Session publisherSession = buildSession("Publisher", URL, optimizeAcknowledge, connections);
            MessageProducer publisher = createPublisher(publisherSession);
            int sentCount = 0;
            for (int i = 0; i < sendMessageCount; ++i) {
                ++sentCount;
                if (sent != null) {
                    sent.add(i);
                }
                if (i % PROGRESS_INTERVAL == 0) {
                    System.out.println("Publisher: Send " + i);
                }
                publisher.send(publisherSession.createTextMessage(Integer.toString(i)));
            }

            Thread.sleep(SETTLE_MILLIS);
            System.out.println(name + ": Publisher Sent: " + sentCount + " " + sent);
            System.out.println(name + ": Whilst slow consumer blocked:");
            System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived);
            System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived);

            blockSlowConsumer.countDown();
            Thread.sleep(SETTLE_MILLIS);
            System.out.println(name + ": After slow consumer unblocked:");
            System.out.println("\t\t- SlowConsumer Received: " + slowConsumerReceiveCount + " " + slowConsumerReceived);
            System.out.println("\t\t- FastConsumer Received: " + fastConsumerReceiveCount + " " + fastConsumerReceived);
            System.out.println();

            // Orderly shutdown; failures here still fail the test.
            publisher.close();
            publisherSession.close();
            slowConsumer.close();
            slowConsumerSession.close();
            fastConsumer.close();
            fastConsumerSession.close();
        } finally {
            // Release the slow listener first: closing a JMS connection waits for
            // in-progress listeners, and this one would otherwise stay parked.
            blockSlowConsumer.countDown();
            closeQuietly(connections);
            broker.stop();
        }
        Assert.assertEquals("Fast consumer missed messages whilst slow consumer was blocking", sent, fastConsumerReceived);
    }

    /**
     * Creates the list a consumer records payloads into.
     *
     * @param trackPayloads whether payloads are recorded at all
     * @return a synchronized list, or {@code null} when payloads are not tracked
     */
    private static List<Integer> newReceivedList(boolean trackPayloads) {
        return trackPayloads ? Collections.synchronizedList(new ArrayList<Integer>()) : null;
    }

    /**
     * Parses the integer payload of a text message, records it and prints a
     * progress line for every {@value #PROGRESS_INTERVAL}th payload.
     *
     * @param label    consumer name used as the prefix of the progress line
     * @param message  received message; expected to be a {@link TextMessage}
     * @param received list to append the payload to, or {@code null} to skip recording
     * @throws JMSException if the message text cannot be read
     */
    private static void recordPayload(String label, Message message, List<Integer> received) throws JMSException {
        int count = Integer.parseInt(((TextMessage) message).getText());
        if (received != null) {
            received.add(count);
        }
        if (count % PROGRESS_INTERVAL == 0) {
            System.out.println(label + ": Receive " + count);
        }
    }

    /**
     * Closes every given connection, reporting rather than propagating
     * failures so that one bad connection does not prevent the remaining
     * cleanup from running.
     *
     * @param connections connections to close
     */
    private static void closeQuietly(List<Connection> connections) {
        for (Connection connection : connections) {
            try {
                connection.close();
            } catch (JMSException e) {
                System.err.println("Failed to close connection: " + e);
            }
        }
    }

    /**
     * Builds (but does not start) a broker named "TestBroker" listening on {@link #URL}.
     *
     * @param persistent whether the broker is persistent
     * @return the configured broker
     * @throws Exception if the connector cannot be added
     */
    private static BrokerService createBroker(boolean persistent) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName("TestBroker");
        broker.setPersistent(persistent);
        broker.addConnector(URL);
        return broker;
    }

    /**
     * Subscribes to {@link #TOPIC} on the given session and attaches the listener.
     *
     * @param session         session to create the consumer on
     * @param messageListener listener receiving the messages
     * @return the created consumer
     * @throws JMSException if the consumer cannot be created
     */
    private static MessageConsumer createSubscriber(Session session, MessageListener messageListener) throws JMSException {
        MessageConsumer consumer = session.createConsumer(TOPIC);
        consumer.setMessageListener(messageListener);
        return consumer;
    }

    /**
     * Creates a non-persistent producer for {@link #TOPIC}.
     *
     * @param session session to create the producer on
     * @return the created producer
     * @throws JMSException if the producer cannot be created
     */
    private static MessageProducer createPublisher(Session session) throws JMSException {
        MessageProducer producer = session.createProducer(TOPIC);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        return producer;
    }

    /**
     * Opens a started connection with the given client id and returns a
     * non-transacted, auto-acknowledge session on it.
     *
     * @param clientId            JMS client id for the connection
     * @param url                 broker URL to connect to
     * @param optimizeAcknowledge whether to enable optimized acknowledgement
     * @param connections         registry the new connection is added to, so the
     *                            caller can close it later
     * @return the created session
     * @throws JMSException if the connection or session cannot be created
     */
    private static Session buildSession(String clientId, String url, boolean optimizeAcknowledge, List<Connection> connections) throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        connectionFactory.setCopyMessageOnSend(false);
        connectionFactory.setDisableTimeStampsByDefault(true);
        connectionFactory.setOptimizeAcknowledge(optimizeAcknowledge);
        if (optimizeAcknowledge) {
            connectionFactory.setOptimizeAcknowledgeTimeOut(1L);
        }
        Connection connection = connectionFactory.createConnection();
        // Register immediately so the connection is closed even if setup below fails.
        connections.add(connection);
        connection.setClientID(clientId);
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        return session;
    }

    /**
     * Builds a destination policy map containing a single entry for the given topic.
     *
     * @param topic              topic the policy entry is registered under
     * @param prefetchLimit      topic prefetch; applied only when positive
     * @param messageLimit       constant pending-message limit; applied only when positive
     * @param evictOldestMessage whether to install an oldest-message eviction strategy
     * @param disableFlowControl whether to switch producer flow control off
     * @return the populated policy map
     */
    private static PolicyMap buildPolicy(ActiveMQTopic topic, int prefetchLimit, int messageLimit, boolean evictOldestMessage, boolean disableFlowControl) {
        PolicyEntry policyEntry = new PolicyEntry();
        if (evictOldestMessage) {
            policyEntry.setMessageEvictionStrategy(new OldestMessageEvictionStrategy());
        }
        if (disableFlowControl) {
            policyEntry.setProducerFlowControl(false);
        }
        if (prefetchLimit > 0) {
            policyEntry.setTopicPrefetch(prefetchLimit);
        }
        if (messageLimit > 0) {
            ConstantPendingMessageLimitStrategy messageLimitStrategy = new ConstantPendingMessageLimitStrategy();
            messageLimitStrategy.setLimit(messageLimit);
            policyEntry.setPendingMessageLimitStrategy(messageLimitStrategy);
        }
        PolicyMap policyMap = new PolicyMap();
        policyMap.put(topic, policyEntry);
        return policyMap;
    }
}

