/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.io.File;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Broker test that fills one queue with large messages, starts producers on a
 * second queue, and then verifies that those producers finish once the large
 * messages have been consumed.
 *
 * <p>Scenario driven by {@link #testBlockByOtherResumeNoException()}:
 * <ol>
 *   <li>{@value #BIG_MESSAGE_COUNT} messages of roughly 48 KiB each are sent to
 *       {@code FooTwoBig}; the broker memory limit configured in {@link #setUp()}
 *       equals ten times that payload size.</li>
 *   <li>Two threads each send {@value #LITTLE_MESSAGES_PER_PRODUCER} messages of
 *       roughly 4 KiB to {@code FooTwo}.</li>
 *   <li>Once more than {@value #MIN_SENT_BEFORE_CONSUMING} messages have been sent
 *       in total, the big queue is drained, the producers are joined, and all
 *       little messages are consumed.</li>
 * </ol>
 *
 * <p>The instance registers itself as the JVM default uncaught-exception handler
 * for the duration of the test; every throwable seen there, or thrown inside a
 * producer thread, is collected and makes the test fail.
 */
public class MemoryUsageBlockResumeTest extends TestSupport implements Thread.UncaughtExceptionHandler {
    private static final Logger LOG = LoggerFactory.getLogger(MemoryUsageBlockResumeTest.class);

    /** Payload size (bytes) of a message sent to {@link #destination}. */
    private static final int LITTLE_MESSAGE_SIZE = 4 * 1024;
    /** Payload size (bytes) of a message sent to {@link #bigDestination}; also used as producer window size. */
    private static final int BIG_MESSAGE_SIZE = 48 * 1024;
    /** Number of big messages sent before the little producers are started. */
    private static final int BIG_MESSAGE_COUNT = 10;
    /** Number of little messages each producing thread sends. */
    private static final int LITTLE_MESSAGES_PER_PRODUCER = 20;
    /** Number of concurrently running little-message producers. */
    private static final int LITTLE_PRODUCER_COUNT = 2;
    /** Total sent count that must be exceeded before the big queue is drained. */
    private static final int MIN_SENT_BEFORE_CONSUMING = 20;
    /** Broker memory limit in bytes: exactly ten big payloads (491520). */
    private static final long BROKER_MEMORY_LIMIT = 10L * BIG_MESSAGE_SIZE;

    private static final byte[] buf = new byte[LITTLE_MESSAGE_SIZE];
    private static final byte[] bigBuf = new byte[BIG_MESSAGE_SIZE];

    public int deliveryMode = DeliveryMode.PERSISTENT;
    protected long messageReceiveTimeout = 10000L;

    AtomicInteger messagesSent = new AtomicInteger(0);
    AtomicInteger messagesConsumed = new AtomicInteger(0);
    Destination destination = new ActiveMQQueue("FooTwo");
    Destination bigDestination = new ActiveMQQueue("FooTwoBig");

    private BrokerService broker;
    private String connectionUri;
    /** Throwables captured from producer threads and from the default uncaught-exception handler. */
    private final Vector<Throwable> exceptions = new Vector<Throwable>();
    /** Handler that was installed before {@link #setUp()} ran; restored in {@link #tearDown()}. */
    private Thread.UncaughtExceptionHandler previousUncaughtExceptionHandler;

    /**
     * Sends big messages to one queue, starts two producers on another queue,
     * drains the big queue, and checks that every message is eventually sent
     * and consumed without any recorded exception.
     *
     * @throws Exception on any JMS or broker failure
     */
    public void testBlockByOtherResumeNoException() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(this.connectionUri);
        factory.setProducerWindowSize(BIG_MESSAGE_SIZE);
        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
        // NOTE(review): both destinations used below are queues, so a topic prefetch
        // presumably has no effect here - confirm whether queue prefetch was intended.
        prefetch.setTopicPrefetch(10);
        factory.setPrefetchPolicy(prefetch);

        Connection consumerConnection = null;
        Connection producerConnection = null;
        try {
            consumerConnection = factory.createConnection();
            consumerConnection.start();
            Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = consumerSession.createConsumer(this.bigDestination);

            producerConnection = factory.createConnection();
            producerConnection.start();
            sendBigMessages(producerConnection);

            Thread producingThread = createProducingThread("Producing thread one", producerConnection);
            producingThread.start();
            Thread producingThreadTwo = createProducingThread("Producing thread two", producerConnection);
            producingThreadTwo.start();

            assertTrue("producer has sent x in a reasonable time", Wait.waitFor(new Wait.Condition() {
                public boolean isSatisified() throws Exception {
                    LOG.info("Checking for : X sent, " + memoryUsage() + ", sent:  " + messagesSent);
                    return messagesSent.get() > MIN_SENT_BEFORE_CONSUMING;
                }
            }));

            LOG.info("Consuming from big q to allow delivery to smaller q from pending");
            receiveMessages(consumer, BIG_MESSAGE_COUNT);
            consumer.close();

            producingThread.join();
            producingThreadTwo.join();

            int littleMessageTotal = LITTLE_PRODUCER_COUNT * LITTLE_MESSAGES_PER_PRODUCER;
            // JUnit convention: expected value first, actual value second.
            assertEquals("Incorrect number of Messages Sent: " + this.messagesSent.get(),
                    BIG_MESSAGE_COUNT + littleMessageTotal, this.messagesSent.get());

            consumer = consumerSession.createConsumer(this.destination);
            receiveMessages(consumer, littleMessageTotal);

            assertEquals("Incorrect number of Messages consumed: " + this.messagesConsumed.get(),
                    this.messagesSent.get(), this.messagesConsumed.get());
            assertTrue("Unexpected exceptions: " + this.exceptions, this.exceptions.isEmpty());
        } finally {
            // Closing the producer connection also releases producer threads that are
            // still inside send() when an assertion above has failed.
            closeQuietly(producerConnection);
            closeQuietly(consumerConnection);
        }
    }

    /**
     * Starts a persistent broker with a {@value #BROKER_MEMORY_LIMIT}-byte memory
     * limit on an ephemeral TCP port and installs this instance as the default
     * uncaught-exception handler.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    public void setUp() throws Exception {
        this.previousUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler();
        Thread.setDefaultUncaughtExceptionHandler(this);

        this.broker = new BrokerService();
        this.broker.setDataDirectory("target" + File.separator + "activemq-data");
        this.broker.setPersistent(true);
        this.broker.setUseJmx(false);
        this.broker.setAdvisorySupport(false);
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.setDefaultPersistenceAdapter(this.broker);
        this.broker.getSystemUsage().getMemoryUsage().setLimit(BROKER_MEMORY_LIMIT);

        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setOptimizedDispatch(true);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultPolicy);
        this.broker.setDestinationPolicy(policyMap);

        // Port 0: the actual connect string is read back from the connector after start.
        this.broker.addConnector("tcp://localhost:0");
        this.broker.start();
        this.connectionUri = ((TransportConnector) this.broker.getTransportConnectors().get(0)).getPublishableConnectString();
    }

    /**
     * Stops the broker and restores the uncaught-exception handler that was
     * active before {@link #setUp()}.
     *
     * @throws Exception if stopping the broker fails
     */
    public void tearDown() throws Exception {
        try {
            if (this.broker != null) {
                this.broker.stop();
            }
        } finally {
            // Do not leave this test instance registered as a JVM-wide handler.
            Thread.setDefaultUncaughtExceptionHandler(this.previousUncaughtExceptionHandler);
        }
    }

    /**
     * Logs and records an exception that escaped any thread while this test was
     * the default uncaught-exception handler.
     *
     * @param t thread on which the exception escaped
     * @param e the uncaught throwable
     */
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        LOG.error("Unexpected Unhandeled ex on: " + t, e);
        this.exceptions.add(e);
    }

    /** Sends {@link #BIG_MESSAGE_COUNT} big text messages to {@link #bigDestination} on a new session. */
    private void sendBigMessages(Connection connection) throws Exception {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);
        producer.setDeliveryMode(this.deliveryMode);
        String payload = new String(bigBuf);
        for (int idx = 0; idx < BIG_MESSAGE_COUNT; ++idx) {
            TextMessage message = session.createTextMessage(payload + idx);
            producer.send(this.bigDestination, (Message) message);
            this.messagesSent.incrementAndGet();
            LOG.info("After big: " + idx + ", " + memoryUsage());
        }
    }

    /**
     * Builds (without starting) a thread that sends
     * {@link #LITTLE_MESSAGES_PER_PRODUCER} little messages to {@link #destination}
     * on its own session. Any failure is logged and added to {@link #exceptions}.
     */
    private Thread createProducingThread(String name, final Connection connection) {
        return new Thread(name) {
            @Override
            public void run() {
                try {
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    MessageProducer producer = session.createProducer(destination);
                    producer.setDeliveryMode(deliveryMode);
                    String payload = new String(buf);
                    for (int idx = 0; idx < LITTLE_MESSAGES_PER_PRODUCER; ++idx) {
                        TextMessage message = session.createTextMessage(payload + idx);
                        producer.send(destination, (Message) message);
                        messagesSent.incrementAndGet();
                        LOG.info("After little:" + idx + ", " + memoryUsage());
                    }
                } catch (Throwable ex) {
                    LOG.error("Producer failed on: " + getName(), ex);
                    exceptions.add(ex);
                }
            }
        };
    }

    /**
     * Receives exactly {@code expected} messages from {@code consumer}, failing if
     * any single receive returns nothing within {@link #messageReceiveTimeout}.
     */
    private void receiveMessages(MessageConsumer consumer, int expected) throws Exception {
        for (int count = 0; count < expected; ++count) {
            Message m = consumer.receive(this.messageReceiveTimeout);
            assertNotNull("No message received within " + this.messageReceiveTimeout + "ms, index " + count, m);
            LOG.info("Recieved Message (" + count + "):" + m + ", " + memoryUsage());
            this.messagesConsumed.incrementAndGet();
        }
    }

    /** Returns the log fragment describing the broker's current memory usage percentage. */
    private String memoryUsage() {
        return "System Memory Usage " + this.broker.getSystemUsage().getMemoryUsage().getPercentUsage();
    }

    /** Closes {@code connection} if non-null; a close failure is logged so it cannot mask a test failure. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception e) {
            LOG.warn("Failed to close connection", e);
        }
    }
}

