/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.management.ObjectName;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicProducerDurableSubFlowControlTest
extends TestCase
implements MessageListener {
    private static final Logger LOG = LoggerFactory.getLogger(TopicProducerDurableSubFlowControlTest.class);
    private static final String brokerName = "testBroker";
    private static final String brokerUrl = "vm://testBroker";
    protected static final int destinationMemLimit = 0x200000;
    private static final AtomicLong produced = new AtomicLong();
    private static final AtomicLong consumed = new AtomicLong();
    private static final int numMessagesToSend = 10000;
    private BrokerService broker;

    protected void setUp() throws Exception {
        this.doSetup(true);
    }

    private void doSetup(boolean deleteAll) throws Exception {
        this.broker = new BrokerService();
        this.broker.setBrokerName(brokerName);
        this.broker.setSchedulerSupport(false);
        this.broker.setUseJmx(true);
        this.broker.setUseShutdownHook(false);
        this.broker.addConnector(brokerUrl);
        this.broker.setAdvisorySupport(false);
        this.broker.getSystemUsage().getMemoryUsage().setLimit(0x1400000L);
        this.broker.setDeleteAllMessagesOnStartup(deleteAll);
        PolicyMap pm = new PolicyMap();
        PolicyEntry tpe = new PolicyEntry();
        tpe.setTopic(">");
        tpe.setMemoryLimit(0x200000L);
        tpe.setCursorMemoryHighWaterMark(10);
        tpe.setProducerFlowControl(true);
        tpe.setAdvisoryWhenFull(true);
        tpe.setExpireMessagesPeriod(0L);
        pm.setPolicyEntries(Arrays.asList(tpe));
        this.setDestinationPolicy(this.broker, pm);
        this.broker.start();
        this.broker.waitUntilStarted();
    }

    protected void setDestinationPolicy(BrokerService broker, PolicyMap pm) {
        broker.setDestinationPolicy(pm);
    }

    protected void tearDown() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    public void testTopicProducerFlowControl() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        connectionFactory.setAlwaysSyncSend(true);
        Connection c = connectionFactory.createConnection();
        c.setClientID("cliId1");
        c.start();
        Session listenerSession = c.createSession(false, 1);
        TopicSubscriber durable = listenerSession.createDurableSubscriber((Topic)this.createDestination(), "DurableSub-0");
        durable.close();
        durable = listenerSession.createDurableSubscriber((Topic)this.createDestination(), "DurableSub-1");
        durable.setMessageListener((MessageListener)this);
        final Session session = connectionFactory.createConnection().createSession(false, 1);
        final MessageProducer producer = session.createProducer((Destination)this.createDestination());
        final Thread producingThread = new Thread("Producing Thread"){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    for (long i = 0L; i < 10000L; ++i) {
                        producer.send((Message)session.createTextMessage("test"));
                        long count = produced.incrementAndGet();
                        if (count % 10000L != 0L) continue;
                        LOG.info("Produced " + count + " messages");
                    }
                }
                catch (Throwable ex) {
                    ex.printStackTrace();
                }
                finally {
                    try {
                        producer.close();
                        session.close();
                    }
                    catch (Exception e) {}
                }
            }
        };
        producingThread.start();
        ArrayList<ObjectName> subON = new ArrayList<ObjectName>();
        final ArrayList<DurableSubscriptionViewMBean> subViews = new ArrayList<DurableSubscriptionViewMBean>();
        subON.addAll(Arrays.asList(this.broker.getAdminView().getInactiveDurableTopicSubscribers()));
        subON.addAll(Arrays.asList(this.broker.getAdminView().getDurableTopicSubscribers()));
        TopicProducerDurableSubFlowControlTest.assertTrue((String)"have a sub", (!subON.isEmpty() ? 1 : 0) != 0);
        for (ObjectName subName : subON) {
            subViews.add((DurableSubscriptionViewMBean)this.broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true));
        }
        LOG.info("Wait for producer to stop");
        TopicProducerDurableSubFlowControlTest.assertTrue((String)"producer thread is done", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                for (DurableSubscriptionViewMBean sub : subViews) {
                    LOG.info("name: " + sub.getSubscriptionName());
                    LOG.info("cursor size: " + sub.cursorSize());
                    LOG.info("mem usage: " + sub.getCursorMemoryUsage());
                    LOG.info("mem % usage: " + sub.getCursorPercentUsage());
                }
                return !producingThread.isAlive();
            }
        }, (long)300000L));
        for (DurableSubscriptionViewMBean sub : subViews) {
            LOG.info("name: " + sub.getSubscriptionName());
            LOG.info("cursor size: " + sub.cursorSize());
            LOG.info("mem usage: " + sub.getCursorMemoryUsage());
            LOG.info("mem % usage: " + sub.getCursorPercentUsage());
            if (sub.cursorSize() <= 0) continue;
            TopicProducerDurableSubFlowControlTest.assertTrue((String)"Has a decent usage", (sub.getCursorPercentUsage() > 5 ? 1 : 0) != 0);
        }
    }

    protected ActiveMQTopic createDestination() throws Exception {
        return new ActiveMQTopic("test");
    }

    public void onMessage(Message message) {
        long count = consumed.incrementAndGet();
        if (count % 10000L == 0L) {
            LOG.info("\tConsumed " + count + " messages");
        }
    }
}

