/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.net.URI;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.MessageIdList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Assert;

public class NetworkBridgeProducerFlowControlTest
extends JmsMultipleBrokersTestSupport {
    private static final long MAX_TEST_TIME = 120000L;
    private static final Log LOG = LogFactory.getLog(NetworkBridgeProducerFlowControlTest.class);
    public boolean persistentTestMessages;
    public boolean networkIsAlwaysSendSync;
    private Vector<Throwable> exceptions = new Vector();

    public static Test suite() {
        return NetworkBridgeProducerFlowControlTest.suite(NetworkBridgeProducerFlowControlTest.class);
    }

    public void initCombosForTestFastAndSlowRemoteConsumers() {
        this.addCombinationValues("persistentTestMessages", new Object[]{new Boolean(true), new Boolean(false)});
        this.addCombinationValues("networkIsAlwaysSendSync", new Object[]{new Boolean(true), new Boolean(false)});
    }

    @Override
    protected void setUp() throws Exception {
        this.setAutoFail(true);
        this.setMaxTestTime(120000L);
        super.setUp();
    }

    public void testFastAndSlowRemoteConsumers() throws Exception {
        int NUM_MESSAGES = 100;
        long TEST_MESSAGE_SIZE = 1024L;
        long SLOW_CONSUMER_DELAY_MILLIS = 100L;
        ActiveMQQueue SLOW_SHARED_QUEUE = new ActiveMQQueue(NetworkBridgeProducerFlowControlTest.class.getSimpleName() + ".slow.shared?consumer.prefetchSize=1");
        ActiveMQQueue FAST_SHARED_QUEUE = new ActiveMQQueue(NetworkBridgeProducerFlowControlTest.class.getSimpleName() + ".fast.shared?consumer.prefetchSize=1");
        this.createBroker(new URI("broker:(tcp://localhost:0)?brokerName=broker0&persistent=false&useJmx=true"));
        BrokerService remoteBroker = this.createBroker(new URI("broker:(tcp://localhost:0)?brokerName=broker1&persistent=false&useJmx=true"));
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setMemoryLimit(5120L);
        PolicyMap policyMap = new PolicyMap();
        policyMap.put((ActiveMQDestination)SLOW_SHARED_QUEUE, (Object)policyEntry);
        remoteBroker.setDestinationPolicy(policyMap);
        NetworkConnector nc = this.bridgeBrokers("broker0", "broker1");
        nc.setAlwaysSyncSend(this.networkIsAlwaysSendSync);
        nc.setPrefetchSize(1);
        this.startAllBrokers();
        this.waitForBridgeFormation();
        this.persistentDelivery = this.persistentTestMessages;
        this.sendMessages("broker0", (Destination)FAST_SHARED_QUEUE, 100);
        this.sendMessages("broker0", (Destination)SLOW_SHARED_QUEUE, 100);
        final CountDownLatch fastConsumerLatch = new CountDownLatch(100);
        final CountDownLatch slowConsumerLatch = new CountDownLatch(100);
        final long startTimeMillis = System.currentTimeMillis();
        final AtomicLong fastConsumerTime = new AtomicLong();
        final AtomicLong slowConsumerTime = new AtomicLong();
        Thread fastWaitThread = new Thread(){

            @Override
            public void run() {
                try {
                    fastConsumerLatch.await();
                    fastConsumerTime.set(System.currentTimeMillis() - startTimeMillis);
                }
                catch (InterruptedException ex) {
                    NetworkBridgeProducerFlowControlTest.this.exceptions.add(ex);
                    Assert.fail((String)ex.getMessage());
                }
            }
        };
        Thread slowWaitThread = new Thread(){

            @Override
            public void run() {
                try {
                    slowConsumerLatch.await();
                    slowConsumerTime.set(System.currentTimeMillis() - startTimeMillis);
                }
                catch (InterruptedException ex) {
                    NetworkBridgeProducerFlowControlTest.this.exceptions.add(ex);
                    Assert.fail((String)ex.getMessage());
                }
            }
        };
        fastWaitThread.start();
        slowWaitThread.start();
        this.createConsumer("broker1", (Destination)FAST_SHARED_QUEUE, fastConsumerLatch);
        MessageConsumer slowConsumer = this.createConsumer("broker1", (Destination)SLOW_SHARED_QUEUE, slowConsumerLatch);
        MessageIdList messageIdList = ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"broker1")).consumers.get(slowConsumer);
        messageIdList.setProcessingDelay(100L);
        fastWaitThread.join();
        slowWaitThread.join();
        NetworkBridgeProducerFlowControlTest.assertTrue((String)("no exceptions on the wait threads:" + this.exceptions), (boolean)this.exceptions.isEmpty());
        LOG.info((Object)("Fast consumer duration (ms): " + fastConsumerTime.get()));
        LOG.info((Object)("Slow consumer duration (ms): " + slowConsumerTime.get()));
        if (this.networkIsAlwaysSendSync) {
            Assert.assertTrue((fastConsumerTime.get() < slowConsumerTime.get() / 10L ? 1 : 0) != 0);
        } else {
            Assert.assertEquals((Object)this.persistentTestMessages, (Object)(fastConsumerTime.get() < slowConsumerTime.get() / 10L ? 1 : 0));
        }
    }

    public void testSendFailIfNoSpaceDoesNotBlockQueueNetwork() throws Exception {
        ActiveMQQueue SLOW_SHARED_QUEUE = new ActiveMQQueue(NetworkBridgeProducerFlowControlTest.class.getSimpleName() + ".slow.shared?consumer.prefetchSize=1");
        ActiveMQQueue FAST_SHARED_QUEUE = new ActiveMQQueue(NetworkBridgeProducerFlowControlTest.class.getSimpleName() + ".fast.shared?consumer.prefetchSize=1");
        this.doTestSendFailIfNoSpaceDoesNotBlockNetwork((ActiveMQDestination)SLOW_SHARED_QUEUE, (ActiveMQDestination)FAST_SHARED_QUEUE);
    }

    public void testSendFailIfNoSpaceDoesNotBlockTopicNetwork() throws Exception {
        ActiveMQTopic SLOW_SHARED_TOPIC = new ActiveMQTopic(NetworkBridgeProducerFlowControlTest.class.getSimpleName() + ".slow.shared?consumer.prefetchSize=1");
        ActiveMQTopic FAST_SHARED_TOPIC = new ActiveMQTopic(NetworkBridgeProducerFlowControlTest.class.getSimpleName() + ".fast.shared?consumer.prefetchSize=1");
        this.doTestSendFailIfNoSpaceDoesNotBlockNetwork((ActiveMQDestination)SLOW_SHARED_TOPIC, (ActiveMQDestination)FAST_SHARED_TOPIC);
    }

    public void doTestSendFailIfNoSpaceDoesNotBlockNetwork(ActiveMQDestination slowDestination, ActiveMQDestination fastDestination) throws Exception {
        int NUM_MESSAGES = 100;
        long TEST_MESSAGE_SIZE = 1024L;
        long SLOW_CONSUMER_DELAY_MILLIS = 100L;
        this.createBroker(new URI("broker:(tcp://localhost:0)?brokerName=broker0&persistent=false&useJmx=true"));
        BrokerService remoteBroker = this.createBroker(new URI("broker:(tcp://localhost:0)?brokerName=broker1&persistent=false&useJmx=true"));
        remoteBroker.getSystemUsage().setSendFailIfNoSpace(true);
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setMemoryLimit(5120L);
        PolicyMap policyMap = new PolicyMap();
        policyMap.put(slowDestination, (Object)policyEntry);
        remoteBroker.setDestinationPolicy(policyMap);
        NetworkConnector nc = this.bridgeBrokers("broker0", "broker1");
        nc.setAlwaysSyncSend(true);
        nc.setPrefetchSize(1);
        this.startAllBrokers();
        this.waitForBridgeFormation();
        final CountDownLatch fastConsumerLatch = new CountDownLatch(100);
        final CountDownLatch slowConsumerLatch = new CountDownLatch(100);
        final long startTimeMillis = System.currentTimeMillis();
        final AtomicLong fastConsumerTime = new AtomicLong();
        final AtomicLong slowConsumerTime = new AtomicLong();
        Thread fastWaitThread = new Thread(){

            @Override
            public void run() {
                try {
                    fastConsumerLatch.await();
                    fastConsumerTime.set(System.currentTimeMillis() - startTimeMillis);
                }
                catch (InterruptedException ex) {
                    NetworkBridgeProducerFlowControlTest.this.exceptions.add(ex);
                    Assert.fail((String)ex.getMessage());
                }
            }
        };
        Thread slowWaitThread = new Thread(){

            @Override
            public void run() {
                try {
                    slowConsumerLatch.await();
                    slowConsumerTime.set(System.currentTimeMillis() - startTimeMillis);
                }
                catch (InterruptedException ex) {
                    NetworkBridgeProducerFlowControlTest.this.exceptions.add(ex);
                    Assert.fail((String)ex.getMessage());
                }
            }
        };
        fastWaitThread.start();
        slowWaitThread.start();
        this.createConsumer("broker1", (Destination)fastDestination, fastConsumerLatch);
        MessageConsumer slowConsumer = this.createConsumer("broker1", (Destination)slowDestination, slowConsumerLatch);
        MessageIdList messageIdList = ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"broker1")).consumers.get(slowConsumer);
        messageIdList.setProcessingDelay(100L);
        this.persistentDelivery = false;
        this.sendMessages("broker0", (Destination)fastDestination, 100);
        this.sendMessages("broker0", (Destination)slowDestination, 100);
        fastWaitThread.join(TimeUnit.SECONDS.toMillis(60L));
        slowWaitThread.join(TimeUnit.SECONDS.toMillis(60L));
        NetworkBridgeProducerFlowControlTest.assertTrue((String)("no exceptions on the wait threads:" + this.exceptions), (boolean)this.exceptions.isEmpty());
        LOG.info((Object)("Fast consumer duration (ms): " + fastConsumerTime.get()));
        LOG.info((Object)("Slow consumer duration (ms): " + slowConsumerTime.get()));
        NetworkBridgeProducerFlowControlTest.assertTrue((String)"fast time set", (fastConsumerTime.get() > 0L ? 1 : 0) != 0);
        NetworkBridgeProducerFlowControlTest.assertTrue((String)"slow time set", (slowConsumerTime.get() > 0L ? 1 : 0) != 0);
        Assert.assertTrue((fastConsumerTime.get() < slowConsumerTime.get() / 10L ? 1 : 0) != 0);
    }
}

