/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.leveldb.LevelDBStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ1866
extends TestCase {
    private static final Logger log = LoggerFactory.getLogger(ConsumerThread.class);
    private BrokerService brokerService;
    private ArrayList<Thread> threads = new ArrayList();
    private final String ACTIVEMQ_BROKER_BIND = "tcp://localhost:0";
    private String ACTIVEMQ_BROKER_URI;
    AtomicBoolean shutdown = new AtomicBoolean();
    private ActiveMQQueue destination;

    protected void setUp() throws Exception {
        this.brokerService = new BrokerService();
        LevelDBStore adaptor = new LevelDBStore();
        this.brokerService.setPersistenceAdapter((PersistenceAdapter)adaptor);
        this.brokerService.deleteAllMessages();
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry pe = new PolicyEntry();
        pe.setMaxPageSize(1);
        policyMap.put((ActiveMQDestination)new ActiveMQQueue(">"), (Object)pe);
        this.brokerService.setDestinationPolicy(policyMap);
        this.brokerService.addConnector("tcp://localhost:0");
        this.brokerService.start();
        this.ACTIVEMQ_BROKER_URI = ((TransportConnector)this.brokerService.getTransportConnectors().get(0)).getPublishableConnectString();
        this.destination = new ActiveMQQueue(this.getName());
    }

    protected void tearDown() throws Exception {
        this.shutdown.set(true);
        for (Thread t : this.threads) {
            t.interrupt();
            t.join();
        }
        this.brokerService.stop();
    }

    public void testConsumerSlowDownPrefetch0() throws Exception {
        this.ACTIVEMQ_BROKER_URI = this.ACTIVEMQ_BROKER_URI + "?jms.prefetchPolicy.queuePrefetch=0";
        this.doTestConsumerSlowDown();
    }

    public void testConsumerSlowDownPrefetch10() throws Exception {
        this.ACTIVEMQ_BROKER_URI = this.ACTIVEMQ_BROKER_URI + "?jms.prefetchPolicy.queuePrefetch=10";
        this.doTestConsumerSlowDown();
    }

    public void testConsumerSlowDownDefaultPrefetch() throws Exception {
        this.doTestConsumerSlowDown();
    }

    public void doTestConsumerSlowDown() throws Exception {
        this.produce(20000);
        Thread producer = new Thread(){

            @Override
            public void run() {
                try {
                    while (!AMQ1866.this.shutdown.get()) {
                        AMQ1866.this.produce(1000);
                    }
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        };
        this.threads.add(producer);
        producer.start();
        ConsumerThread c1 = new ConsumerThread("Consumer-1");
        this.threads.add(c1);
        c1.start();
        Thread.sleep(500L);
        ConsumerThread c2 = new ConsumerThread("Consumer-2");
        this.threads.add(c2);
        c2.start();
        int totalReceived = 0;
        for (int i = 0; i < 30; ++i) {
            Thread.sleep(1000L);
            long c1Counter = c1.counter.getAndSet(0L);
            long c2Counter = c2.counter.getAndSet(0L);
            log.debug("c1: " + c1Counter + ", c2: " + c2Counter);
            totalReceived = (int)((long)totalReceived + c1Counter);
            totalReceived = (int)((long)totalReceived + c2Counter);
            if (i <= 10) continue;
            AMQ1866.assertTrue((String)("Total received=" + totalReceived + ", Consumer 2 should be receiving new messages every second."), (c2Counter > 0L ? 1 : 0) != 0);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void produce(int count) throws Exception {
        Connection connection = null;
        try {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.ACTIVEMQ_BROKER_URI);
            factory.setDispatchAsync(true);
            connection = factory.createConnection();
            Session session = connection.createSession(false, 1);
            MessageProducer producer = session.createProducer((Destination)this.destination);
            connection.start();
            for (int i = 0; i < count; ++i) {
                producer.send((Message)session.createTextMessage(this.getName() + " Message " + ++i));
            }
        }
        finally {
            try {
                connection.close();
            }
            catch (Throwable throwable) {}
        }
    }

    public class ConsumerThread
    extends Thread {
        final AtomicLong counter;

        public ConsumerThread(String threadId) {
            super(threadId);
            this.counter = new AtomicLong();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            Connection connection = null;
            try {
                log.debug(this.getName() + ": is running");
                ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(AMQ1866.this.ACTIVEMQ_BROKER_URI);
                factory.setDispatchAsync(true);
                connection = factory.createConnection();
                Session session = connection.createSession(false, 1);
                MessageConsumer consumer = session.createConsumer((Destination)AMQ1866.this.destination);
                connection.start();
                while (!AMQ1866.this.shutdown.get()) {
                    TextMessage msg = (TextMessage)consumer.receive(1000L);
                    if (msg == null) continue;
                    int sleepingTime = this.getName().equals("Consumer-1") ? 1000000 : 1;
                    this.counter.incrementAndGet();
                    Thread.sleep(sleepingTime);
                }
            }
            catch (Exception exception) {
            }
            finally {
                log.debug(this.getName() + ": is stopping");
                try {
                    connection.close();
                }
                catch (Throwable throwable) {}
            }
        }
    }
}

