/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.network;

import com.google.common.collect.Lists;
import java.io.File;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.virtual.CompositeQueue;
import org.apache.activemq.broker.region.virtual.CompositeTopic;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class VirtualConsumerDemandTest {
    // Shared message-count constant; not referenced by the code visible in this part of the file.
    protected static final int MESSAGE_COUNT = 10;
    private static final Logger LOG = LoggerFactory.getLogger(VirtualConsumerDemandTest.class);
    // JMS connections to the two brokers; they are not assigned in the visible part of the file
    // (presumably initialised by doSetUp -- confirm).
    protected Connection localConnection;
    protected Connection remoteConnection;
    // The two brokers under test: messages are produced via localSession and the tests read
    // destination statistics from both brokers.
    protected BrokerService localBroker;
    protected BrokerService remoteBroker;
    // Used by the tests to install or replace virtual destinations at runtime via setVirtualDestinations(...).
    protected JavaRuntimeConfigurationBroker runtimeBroker;
    // Sessions used by the tests to create producers (local) and consumers (remote).
    protected Session localSession;
    protected Session remoteSession;
    // Topics the tests publish to; NOTE(review): names suggest they match the network connector's
    // included / excluded destination filters -- confirm against doSetUp.
    protected ActiveMQTopic included;
    protected ActiveMQTopic excluded;
    protected String consumerName = "durableSubs";
    // Names of the source topic / queue used when building composite destinations in the tests.
    protected String testTopicName = "include.test.bar";
    protected String testQueueName = "include.test.foo";
    // Test parameters supplied by data() through the constructor.
    private final boolean isDuplex;
    private final boolean isUseVirtualDestSubsOnCreation;
    // JUnit-managed temporary folder created under the "target" directory.
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder(new File("target"));
    protected NetworkConnector connector;
    protected AdvisoryBroker remoteAdvisoryBroker;

    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        return Arrays.asList({false, true}, {true, false}, {true, true}, {false, false});
    }

    /**
     * Creates a test instance for one parameter row produced by {@code data()}.
     *
     * @param isDuplex value stored in the {@code isDuplex} field
     * @param isUseVirtualDestSubsOnCreation value stored in the
     *        {@code isUseVirtualDestSubsOnCreation} field; several tests are skipped
     *        depending on it
     */
    public VirtualConsumerDemandTest(boolean isDuplex, boolean isUseVirtualDestSubsOnCreation) {
        this.isUseVirtualDestSubsOnCreation = isUseVirtualDestSubsOnCreation;
        this.isDuplex = isDuplex;
    }

    /**
     * Sends one message to the {@code VirtualTopic.include.test.bar} topic through the local
     * session and checks that the remote {@code Consumer.cons1.VirtualTopic.include.test.bar}
     * queue reports one message, followed by the advisory count checks.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is true.
     *
     * @throws Exception if set-up, messaging or one of the wait/assert helpers fails
     */
    @Test(timeout=60000L)
    public void testVirtualTopic() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        final String virtualTopicName = "VirtualTopic.include.test.bar";

        MessageConsumer advisories = getVirtualDestinationAdvisoryConsumer("VirtualTopic.>");
        MessageProducer producer = localSession.createProducer(new ActiveMQTopic(virtualTopicName));
        // Fixed pause before the destination statistics are looked up.
        Thread.sleep(2000L);
        TextMessage payload = localSession.createTextMessage("test");

        DestinationStatistics localStats =
                localBroker.getDestination(new ActiveMQTopic(virtualTopicName)).getDestinationStatistics();
        DestinationStatistics remoteQueueStats =
                remoteBroker.getDestination(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar")).getDestinationStatistics();

        waitForConsumerCount(localStats, 1);
        producer.send(payload);

        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("remote dest messages", 1L, remoteQueueStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisories, 1);
        assertAdvisoryBrokerCounts(1, 1, 1);
    }

    /**
     * Sends one message to the {@code VirtualTopic.include.test.bar} topic through the local
     * session while a consumer is attached to the remote
     * {@code Consumer.cons1.VirtualTopic.include.test.bar} queue, and checks that the consumer
     * receives it.
     *
     * <p>Runs for every parameter row; the final advisory broker counts expected depend on
     * {@code isUseVirtualDestSubsOnCreation}.
     *
     * @throws Exception if set-up, messaging or one of the wait/assert helpers fails
     */
    @Test(timeout=60000L)
    public void testVirtualTopicWithConsumer() throws Exception {
        doSetUp(true, null);
        final String virtualTopicName = "VirtualTopic.include.test.bar";

        MessageConsumer advisories = getVirtualDestinationAdvisoryConsumer("VirtualTopic.>");
        MessageProducer producer = localSession.createProducer(new ActiveMQTopic(virtualTopicName));
        // Fixed pause before the destination statistics are looked up.
        Thread.sleep(2000L);
        TextMessage payload = localSession.createTextMessage("test");

        DestinationStatistics localStats =
                localBroker.getDestination(new ActiveMQTopic(virtualTopicName)).getDestinationStatistics();
        MessageConsumer remoteConsumer =
                remoteSession.createConsumer(new ActiveMQQueue("Consumer.cons1.VirtualTopic.include.test.bar"));

        waitForConsumerCount(localStats, 1);
        producer.send(payload);
        Assert.assertNotNull(remoteConsumer.receive(5000L));

        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        assertRemoteAdvisoryCount(advisories, 2, 1);
        // Expected counts differ by parameter: (1, 2, 1) when the flag is set, (1, 1, 0) otherwise.
        assertAdvisoryBrokerCounts(1, isUseVirtualDestSubsOnCreation ? 2 : 1, isUseVirtualDestSubsOnCreation ? 1 : 0);
    }

    /**
     * Checks that a message sent to the {@code VirtualTopic.include.test.bar} topic after the
     * remote queue consumer has been closed is still dispatched from the local broker and can
     * be received by a second consumer created afterwards on the same queue.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is true.
     *
     * @throws Exception if set-up, messaging or one of the wait/assert helpers fails
     */
    @Test(timeout=60000L)
    public void testVirtualTopicWithConsumerGoOffline() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        final String virtualTopicName = "VirtualTopic.include.test.bar";
        final String consumerQueueName = "Consumer.cons1.VirtualTopic.include.test.bar";

        MessageConsumer advisories = getVirtualDestinationAdvisoryConsumer("VirtualTopic.>");
        MessageProducer producer = localSession.createProducer(new ActiveMQTopic(virtualTopicName));
        Thread.sleep(2000L);
        TextMessage payload = localSession.createTextMessage("test");

        DestinationStatistics localStats =
                localBroker.getDestination(new ActiveMQTopic(virtualTopicName)).getDestinationStatistics();
        MessageConsumer firstConsumer = remoteSession.createConsumer(new ActiveMQQueue(consumerQueueName));

        // First message: delivered while the remote consumer is attached.
        waitForConsumerCount(localStats, 1);
        producer.send(payload);
        Assert.assertNotNull(firstConsumer.receive(5000L));
        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);

        // Second message: sent after the remote consumer has been closed.
        firstConsumer.close();
        Thread.sleep(2000L);
        producer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 2);
        assertLocalBrokerStatistics(localStats, 2);

        // A fresh consumer on the same queue must still get the second message.
        MessageConsumer secondConsumer = remoteSession.createConsumer(new ActiveMQQueue(consumerQueueName));
        Assert.assertNotNull(secondConsumer.receive(5000L));
        assertRemoteAdvisoryCount(advisories, 4);
        assertAdvisoryBrokerCounts(1, 2, 1);
    }

    /**
     * Installs, at runtime, a composite topic that forwards {@code testTopicName} to the
     * {@code include.test.bar.bridge} queue, sends one message to the {@code included} topic
     * and checks that the remote bridge queue reports one message.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is true.
     *
     * @throws Exception if set-up, messaging or one of the wait/assert helpers fails
     */
    @Test(timeout=60000L)
    public void testDynamicFlow() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        final String bridgeQueueName = "include.test.bar.bridge";

        MessageConsumer advisories = getVirtualDestinationAdvisoryConsumer(testTopicName);
        CompositeTopic forwarding =
                createCompositeTopic(testTopicName, new ActiveMQDestination[]{new ActiveMQQueue(bridgeQueueName)});
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{forwarding}, true);

        MessageProducer producer = localSession.createProducer(included);
        Thread.sleep(2000L);
        TextMessage payload = localSession.createTextMessage("test");

        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        DestinationStatistics remoteBridgeStats =
                remoteBroker.getDestination(new ActiveMQQueue(bridgeQueueName)).getDestinationStatistics();

        waitForConsumerCount(localStats, 1);
        producer.send(payload);

        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("remote dest messages", 1L, remoteBridgeStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisories, 1);
        assertAdvisoryBrokerCounts(1, 1, 1);
    }

    /**
     * Installs two composite topics in two steps, {@code include.test.bar2} first and then
     * {@code testTopicName} alongside it, both forwarding to the same
     * {@code include.test.bar.bridge} queue. It then sends one message to the
     * {@code included} topic and checks the remote bridge queue and the advisory counts.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is true.
     *
     * @throws Exception if set-up, messaging or one of the wait/assert helpers fails
     */
    @Test(timeout=60000L)
    public void testSecondNonIncludedCompositeTopicForwardSameQueue() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        final String bridgeQueueName = "include.test.bar.bridge";

        MessageConsumer advisories = getVirtualDestinationAdvisoryConsumer(testTopicName);

        // Step 1: only the "include.test.bar2" composite topic is configured.
        CompositeTopic otherTopic =
                createCompositeTopic("include.test.bar2", new ActiveMQDestination[]{new ActiveMQQueue(bridgeQueueName)});
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{otherTopic}, true);
        Thread.sleep(2000L);

        // Step 2: the composite topic for testTopicName is added next to it.
        CompositeTopic testedTopic =
                createCompositeTopic(testTopicName, new ActiveMQDestination[]{new ActiveMQQueue(bridgeQueueName)});
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{otherTopic, testedTopic}, true);
        Thread.sleep(2000L);

        MessageProducer producer = localSession.createProducer(included);
        TextMessage payload = localSession.createTextMessage("test");
        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        DestinationStatistics remoteBridgeStats =
                remoteBroker.getDestination(new ActiveMQQueue(bridgeQueueName)).getDestinationStatistics();

        waitForConsumerCount(localStats, 1);
        producer.send(payload);

        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("remote dest messages", 1L, remoteBridgeStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisories, 1);
        assertAdvisoryBrokerCounts(2, 2, 2);
    }

    /**
     * Installs two composite topics in two steps: {@code include.test.bar2} forwarding to
     * {@code include.test.bar.bridge2}, then {@code testTopicName} forwarding to
     * {@code include.test.bar.bridge}. It then sends one message to the {@code included}
     * topic and checks the remote {@code include.test.bar.bridge} queue and the advisory
     * counts.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is true.
     *
     * @throws Exception if set-up, messaging or one of the wait/assert helpers fails
     */
    @Test(timeout=60000L)
    public void testSecondNonIncludedCompositeTopic() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        final String bridgeQueueName = "include.test.bar.bridge";

        MessageConsumer advisories = getVirtualDestinationAdvisoryConsumer(testTopicName);

        // Step 1: only the "include.test.bar2" composite topic (own target queue) is configured.
        CompositeTopic otherTopic =
                createCompositeTopic("include.test.bar2", new ActiveMQDestination[]{new ActiveMQQueue("include.test.bar.bridge2")});
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{otherTopic}, true);
        Thread.sleep(2000L);

        // Step 2: the composite topic for testTopicName is added next to it.
        CompositeTopic testedTopic =
                createCompositeTopic(testTopicName, new ActiveMQDestination[]{new ActiveMQQueue(bridgeQueueName)});
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{otherTopic, testedTopic}, true);
        Thread.sleep(2000L);

        MessageProducer producer = localSession.createProducer(included);
        TextMessage payload = localSession.createTextMessage("test");
        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        DestinationStatistics remoteBridgeStats =
                remoteBroker.getDestination(new ActiveMQQueue(bridgeQueueName)).getDestinationStatistics();

        waitForConsumerCount(localStats, 1);
        producer.send(payload);

        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("remote dest messages", 1L, remoteBridgeStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisories, 1);
        assertAdvisoryBrokerCounts(2, 1, 1);
    }

    /**
     * Installs a composite topic forwarding {@code testTopicName} to
     * {@code include.test.bar.bridge}, sends one message to the {@code included} topic and
     * expects zero dispatches, zero messages on the remote bridge queue and zero advisories.
     *
     * <p>Skipped when {@code isUseVirtualDestSubsOnCreation} is true.
     *
     * @throws Exception if set-up, messaging or one of the wait/assert helpers fails
     */
    @Test(timeout=60000L)
    public void testNoUseVirtualDestinationSubscriptionsOnCreation() throws Exception {
        Assume.assumeTrue(!isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        final String bridgeQueueName = "include.test.bar.bridge";

        MessageConsumer advisories = getVirtualDestinationAdvisoryConsumer(testTopicName);
        CompositeTopic forwarding =
                createCompositeTopic(testTopicName, new ActiveMQDestination[]{new ActiveMQQueue(bridgeQueueName)});
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{forwarding}, true);

        MessageProducer producer = localSession.createProducer(included);
        Thread.sleep(2000L);
        TextMessage payload = localSession.createTextMessage("test");

        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        DestinationStatistics remoteBridgeStats =
                remoteBroker.getDestination(new ActiveMQQueue(bridgeQueueName)).getDestinationStatistics();

        producer.send(payload);
        // Fixed pause after sending, before the zero-count checks below.
        Thread.sleep(2000L);

        waitForDispatchFromLocalBroker(localStats, 0);
        assertLocalBrokerStatistics(localStats, 0);
        Assert.assertEquals("remote dest messages", 0L, remoteBridgeStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisories, 0);
        assertAdvisoryBrokerCounts(1, 0, 0);
    }

    /**
     * Installs a composite topic with two target queues ({@code include.test.bar.bridge} and
     * {@code include.test.bar.bridge2}), sends a message, then replaces the configuration with
     * a composite topic that only targets {@code include.test.bar.bridge} and sends a second
     * message. The first queue is expected to report two messages and the second to stay at
     * one.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is true.
     *
     * @throws Exception if set-up, messaging or one of the wait/assert helpers fails
     */
    @Test(timeout=60000L)
    public void testTwoTargetsRemove1() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        final String firstQueueName = "include.test.bar.bridge";
        final String secondQueueName = "include.test.bar.bridge2";

        MessageConsumer advisories = getVirtualDestinationAdvisoryConsumer(testTopicName);
        CompositeTopic forwarding = createCompositeTopic(testTopicName,
                new ActiveMQDestination[]{new ActiveMQQueue(firstQueueName), new ActiveMQQueue(secondQueueName)});
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{forwarding}, true);

        MessageProducer producer = localSession.createProducer(included);
        Thread.sleep(2000L);
        TextMessage payload = localSession.createTextMessage("test");

        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        DestinationStatistics firstQueueStats =
                remoteBroker.getDestination(new ActiveMQQueue(firstQueueName)).getDestinationStatistics();
        DestinationStatistics secondQueueStats =
                remoteBroker.getDestination(new ActiveMQQueue(secondQueueName)).getDestinationStatistics();
        Thread.sleep(2000L);

        // Advisory state with both targets configured.
        assertRemoteAdvisoryCount(advisories, 2);
        assertAdvisoryBrokerCounts(1, 2, 2);

        waitForConsumerCount(localStats, 1);
        producer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("remote dest messages", 1L, firstQueueStats.getMessages().getCount());
        Assert.assertEquals("remote2 dest messages", 1L, secondQueueStats.getMessages().getCount());

        // Reconfigure with only the first queue as target, then send again.
        forwarding = createCompositeTopic(testTopicName, new ActiveMQDestination[]{new ActiveMQQueue(firstQueueName)});
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{forwarding}, true);
        Thread.sleep(2000L);

        producer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 2);
        assertLocalBrokerStatistics(localStats, 2);
        Assert.assertEquals("remote dest messages", 2L, firstQueueStats.getMessages().getCount());
        Assert.assertEquals("remote2 dest messages", 1L, secondQueueStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisories, 3);
        assertAdvisoryBrokerCounts(1, 1, 1);
    }

    /**
     * Installs a composite topic with two target queues, sends a message, removes the
     * {@code include.test.bar.bridge2} destination from the remote broker and sends a second
     * message. The {@code include.test.bar.bridge} queue is expected to report two messages;
     * advisory counts are checked both after the removal and after the second send.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is true.
     *
     * @throws Exception if set-up, messaging or one of the wait/assert helpers fails
     */
    @Test(timeout=60000L)
    public void testTwoTargetsRemove1Destination() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        final String firstQueueName = "include.test.bar.bridge";
        final String secondQueueName = "include.test.bar.bridge2";

        MessageConsumer advisories = getVirtualDestinationAdvisoryConsumer(testTopicName);
        CompositeTopic forwarding = createCompositeTopic(testTopicName,
                new ActiveMQDestination[]{new ActiveMQQueue(firstQueueName), new ActiveMQQueue(secondQueueName)});
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{forwarding}, true);

        MessageProducer producer = localSession.createProducer(included);
        TextMessage payload = localSession.createTextMessage("test");
        Thread.sleep(1000L);

        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        DestinationStatistics firstQueueStats =
                remoteBroker.getDestination(new ActiveMQQueue(firstQueueName)).getDestinationStatistics();
        DestinationStatistics secondQueueStats =
                remoteBroker.getDestination(new ActiveMQQueue(secondQueueName)).getDestinationStatistics();

        waitForConsumerCount(localStats, 1);
        producer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("remote dest messages", 1L, firstQueueStats.getMessages().getCount());
        Assert.assertEquals("remote2 dest messages", 1L, secondQueueStats.getMessages().getCount());

        // Remove the second target queue on the remote broker and check the advisory state.
        remoteBroker.removeDestination(new ActiveMQQueue(secondQueueName));
        Thread.sleep(2000L);
        assertRemoteAdvisoryCount(advisories, 3);
        assertAdvisoryBrokerCounts(1, 1, 1);

        // Messages must still reach the first queue after the removal.
        producer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 2);
        assertLocalBrokerStatistics(localStats, 2);
        Assert.assertEquals("remote dest messages", 2L, firstQueueStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisories, 1);
        assertAdvisoryBrokerCounts(1, 2, 2);
    }

    /**
     * Installs a composite topic with two target queues, sends a message, then clears all
     * virtual destinations and sends a second message. The local dispatch statistics and both
     * remote queue message counts are expected to stay at one.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is true.
     *
     * @throws Exception if set-up, messaging or one of the wait/assert helpers fails
     */
    @Test(timeout=60000L)
    public void testTwoTargetsRemoveBoth() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        final String firstQueueName = "include.test.bar.bridge";
        final String secondQueueName = "include.test.bar.bridge2";

        MessageConsumer advisories = getVirtualDestinationAdvisoryConsumer(testTopicName);
        CompositeTopic forwarding = createCompositeTopic(testTopicName,
                new ActiveMQDestination[]{new ActiveMQQueue(firstQueueName), new ActiveMQQueue(secondQueueName)});
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{forwarding}, true);

        MessageProducer producer = localSession.createProducer(included);
        TextMessage payload = localSession.createTextMessage("test");
        Thread.sleep(1000L);

        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        DestinationStatistics firstQueueStats =
                remoteBroker.getDestination(new ActiveMQQueue(firstQueueName)).getDestinationStatistics();
        DestinationStatistics secondQueueStats =
                remoteBroker.getDestination(new ActiveMQQueue(secondQueueName)).getDestinationStatistics();

        waitForConsumerCount(localStats, 1);
        producer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("remote dest messages", 1L, firstQueueStats.getMessages().getCount());
        Assert.assertEquals("remote2 dest messages", 1L, secondQueueStats.getMessages().getCount());

        // Drop every virtual destination, then send again; nothing further should be counted.
        runtimeBroker.setVirtualDestinations(new VirtualDestination[0], true);
        Thread.sleep(2000L);
        producer.send(payload);
        Thread.sleep(2000L);

        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("remote dest messages", 1L, firstQueueStats.getMessages().getCount());
        Assert.assertEquals("remote2 dest messages", 1L, secondQueueStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisories, 4);
        assertAdvisoryBrokerCounts(0, 0, 0);
    }

    /**
     * Adds the {@code include.test.bar.bridge} queue to the remote broker before the
     * composite topic that forwards to it is installed, then sends one message to the
     * {@code included} topic and checks that the remote queue reports one message.
     *
     * <p>Skipped unless {@code isUseVirtualDestSubsOnCreation} is true.
     *
     * @throws Exception if set-up, messaging or one of the wait/assert helpers fails
     */
    @Test(timeout=60000L)
    public void testDestinationAddedFirst() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        final String bridgeQueueName = "include.test.bar.bridge";

        MessageConsumer advisories = getVirtualDestinationAdvisoryConsumer(testTopicName);

        // The target queue exists on the remote broker before the virtual destination does.
        remoteBroker.getBroker().addDestination(remoteBroker.getAdminConnectionContext(), new ActiveMQQueue(bridgeQueueName), false);
        Thread.sleep(2000L);

        CompositeTopic forwarding =
                createCompositeTopic(testTopicName, new ActiveMQDestination[]{new ActiveMQQueue(bridgeQueueName)});
        DestinationStatistics remoteBridgeStats =
                remoteBroker.getDestination(new ActiveMQQueue(bridgeQueueName)).getDestinationStatistics();
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{forwarding}, true);

        MessageProducer producer = localSession.createProducer(included);
        TextMessage payload = localSession.createTextMessage("test");
        Thread.sleep(1000L);
        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();

        waitForConsumerCount(localStats, 1);
        producer.send(payload);

        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("remote dest messages", 1L, remoteBridgeStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisories, 1);
        assertAdvisoryBrokerCounts(1, 1, 1);
    }

    /**
     * Installs a composite topic forwarding {@code testTopicName} to
     * {@code include.test.bar.bridge}, attaches a consumer to that queue through the remote
     * session, sends one message to the {@code included} topic and checks that the consumer
     * receives it.
     *
     * <p>Runs for every parameter row; the final advisory broker counts expected depend on
     * {@code isUseVirtualDestSubsOnCreation}.
     *
     * @throws Exception if set-up, messaging or one of the wait/assert helpers fails
     */
    @Test(timeout=60000L)
    public void testWithConsumer() throws Exception {
        doSetUp(true, null);
        final String bridgeQueueName = "include.test.bar.bridge";

        MessageConsumer advisories = getVirtualDestinationAdvisoryConsumer(testTopicName);
        CompositeTopic forwarding =
                createCompositeTopic(testTopicName, new ActiveMQDestination[]{new ActiveMQQueue(bridgeQueueName)});
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{forwarding}, true);

        MessageProducer producer = localSession.createProducer(included);
        TextMessage payload = localSession.createTextMessage("test");
        Thread.sleep(1000L);

        final DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        MessageConsumer remoteConsumer = remoteSession.createConsumer(new ActiveMQQueue(bridgeQueueName));

        // Poll until the local destination reports exactly one consumer; the result of the
        // wait is not checked here.
        Wait.waitFor(new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return localStats.getConsumers().getCount() == 1L;
            }
        });

        producer.send(payload);
        Assert.assertNotNull(remoteConsumer.receive(5000L));

        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        assertRemoteAdvisoryCount(advisories, 2, 1);
        // Expected counts differ by parameter: (1, 2, 1) when the flag is set, (1, 1, 0) otherwise.
        assertAdvisoryBrokerCounts(1, isUseVirtualDestSubsOnCreation ? 2 : 1, isUseVirtualDestSubsOnCreation ? 1 : 0);
    }

    /**
     * Attaches two consumers to the {@code include.test.bar.bridge} queue through the remote
     * session, sends a message, closes the second consumer and sends another message. The
     * remaining consumer is expected to receive the second message.
     *
     * <p>Runs for every parameter row; the final advisory broker counts expected depend on
     * {@code isUseVirtualDestSubsOnCreation}.
     *
     * @throws Exception if set-up, messaging or one of the wait/assert helpers fails
     */
    @Test(timeout=60000L)
    public void testWith2ConsumersRemove1() throws Exception {
        doSetUp(true, null);
        final String bridgeQueueName = "include.test.bar.bridge";

        MessageConsumer advisories = getVirtualDestinationAdvisoryConsumer(testTopicName);
        CompositeTopic forwarding =
                createCompositeTopic(testTopicName, new ActiveMQDestination[]{new ActiveMQQueue(bridgeQueueName)});
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{forwarding}, true);

        MessageProducer producer = localSession.createProducer(included);
        TextMessage payload = localSession.createTextMessage("test");
        Thread.sleep(1000L);

        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        MessageConsumer firstConsumer = remoteSession.createConsumer(new ActiveMQQueue(bridgeQueueName));
        MessageConsumer secondConsumer = remoteSession.createConsumer(new ActiveMQQueue(bridgeQueueName));

        waitForConsumerCount(localStats, 1);
        producer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 1);
        // Either consumer may get the message; the second is only polled if the first got nothing.
        Assert.assertTrue(firstConsumer.receive(5000L) != null || secondConsumer.receive(5000L) != null);
        assertLocalBrokerStatistics(localStats, 1);

        // With one consumer closed, the second message must still be dispatched and received.
        secondConsumer.close();
        producer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 2);
        assertLocalBrokerStatistics(localStats, 2);
        Assert.assertNotNull(firstConsumer.receive(5000L));

        assertRemoteAdvisoryCount(advisories, 4, 3);
        // Expected counts differ by parameter: (1, 2, 1) when the flag is set, (1, 1, 0) otherwise.
        assertAdvisoryBrokerCounts(1, isUseVirtualDestSubsOnCreation ? 2 : 1, isUseVirtualDestSubsOnCreation ? 1 : 0);
    }

    /**
     * Attaches two consumers to the {@code include.test.bar.bridge} queue through the remote
     * session, sends a message, closes both consumers and sends another message. The local
     * broker statistics are expected to stay at one after the second send.
     *
     * <p>Skipped when {@code isUseVirtualDestSubsOnCreation} is true.
     *
     * @throws Exception if set-up, messaging or one of the wait/assert helpers fails
     */
    @Test(timeout=60000L)
    public void testWith2ConsumersRemoveBoth() throws Exception {
        Assume.assumeTrue(!isUseVirtualDestSubsOnCreation);
        doSetUp(true, null);
        final String bridgeQueueName = "include.test.bar.bridge";

        MessageConsumer advisories = getVirtualDestinationAdvisoryConsumer(testTopicName);
        CompositeTopic forwarding =
                createCompositeTopic(testTopicName, new ActiveMQDestination[]{new ActiveMQQueue(bridgeQueueName)});
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{forwarding}, true);

        MessageProducer producer = localSession.createProducer(included);
        TextMessage payload = localSession.createTextMessage("test");
        Thread.sleep(1000L);

        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        MessageConsumer firstConsumer = remoteSession.createConsumer(new ActiveMQQueue(bridgeQueueName));
        MessageConsumer secondConsumer = remoteSession.createConsumer(new ActiveMQQueue(bridgeQueueName));

        waitForConsumerCount(localStats, 1);
        assertAdvisoryBrokerCounts(1, 2, 0);

        producer.send(payload);
        waitForDispatchFromLocalBroker(localStats, 1);
        // Either consumer may get the message; the second is only polled if the first got nothing.
        Assert.assertTrue(firstConsumer.receive(5000L) != null || secondConsumer.receive(5000L) != null);
        assertLocalBrokerStatistics(localStats, 1);

        // After both consumers are closed the second message must not change the local statistics.
        firstConsumer.close();
        secondConsumer.close();
        Thread.sleep(2000L);
        producer.send(payload);
        Thread.sleep(2000L);

        assertLocalBrokerStatistics(localStats, 1);
        assertRemoteAdvisoryCount(advisories, 4);
        assertAdvisoryBrokerCounts(1, 0, 0);
    }

    /**
     * Installs a composite topic forwarding {@code excluded.test.bar} to
     * {@code excluded.test.bar.bridge}, attaches a consumer to that queue through the remote
     * session and sends one message to the {@code excluded} topic. The consumer is expected
     * to receive nothing, and the local destination to report no consumers.
     *
     * <p>Runs for every parameter row; the final advisory broker counts expected depend on
     * {@code isUseVirtualDestSubsOnCreation}.
     *
     * @throws Exception if set-up, messaging or one of the wait/assert helpers fails
     */
    @Test(timeout=60000L)
    public void testExcluded() throws Exception {
        doSetUp(true, null);
        final String excludedBridgeQueueName = "excluded.test.bar.bridge";

        MessageConsumer advisories = getVirtualDestinationAdvisoryConsumer(testTopicName);
        CompositeTopic forwarding =
                createCompositeTopic("excluded.test.bar", new ActiveMQDestination[]{new ActiveMQQueue(excludedBridgeQueueName)});
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{forwarding}, true);

        MessageProducer producer = localSession.createProducer(excluded);
        TextMessage payload = localSession.createTextMessage("test");
        Thread.sleep(1000L);
        MessageConsumer remoteConsumer = remoteSession.createConsumer(new ActiveMQQueue(excludedBridgeQueueName));
        Thread.sleep(2000L);

        producer.send(payload);
        // Nothing is expected to arrive at the remote consumer.
        Assert.assertNull(remoteConsumer.receive(5000L));

        DestinationStatistics localStats = localBroker.getDestination(excluded).getDestinationStatistics();
        Assert.assertEquals("broker consumer count", 0L, localStats.getConsumers().getCount());
        assertLocalBrokerStatistics(localStats, 0);
        assertRemoteAdvisoryCount(advisories, 0);
        // Expected counts differ by parameter: (1, 2, 1) when the flag is set, (1, 1, 0) otherwise.
        assertAdvisoryBrokerCounts(1, isUseVirtualDestSubsOnCreation ? 2 : 1, isUseVirtualDestSubsOnCreation ? 1 : 0);
    }

    /**
     * Installs a composite queue forwarding {@code testQueueName} to
     * {@code include.test.foo.bridge}, attaches a consumer to that queue through the remote
     * session and sends one message to the {@code testQueueName} queue. The consumer is
     * expected to receive it, and the remote {@code testQueueName} queue to report zero
     * messages.
     *
     * <p>Runs for every parameter row; the final advisory broker counts expected depend on
     * {@code isUseVirtualDestSubsOnCreation}.
     *
     * @throws Exception if set-up, messaging or one of the wait/assert helpers fails
     */
    @Test(timeout=60000L)
    public void testSourceQueue() throws Exception {
        doSetUp(true, null);
        final String bridgeQueueName = "include.test.foo.bridge";

        MessageConsumer advisories = getQueueVirtualDestinationAdvisoryConsumer(testQueueName);
        CompositeQueue forwarding =
                createCompositeQueue(testQueueName, new ActiveMQDestination[]{new ActiveMQQueue(bridgeQueueName)});
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{forwarding}, true);

        MessageProducer producer = localSession.createProducer(new ActiveMQQueue(testQueueName));
        Thread.sleep(2000L);
        TextMessage payload = localSession.createTextMessage("test");

        DestinationStatistics localStats =
                localBroker.getDestination(new ActiveMQQueue(testQueueName)).getDestinationStatistics();
        MessageConsumer remoteConsumer = remoteSession.createConsumer(new ActiveMQQueue(bridgeQueueName));

        waitForConsumerCount(localStats, 1);
        producer.send(payload);
        Assert.assertNotNull(remoteConsumer.receive(5000L));

        // Statistics of the source queue as seen on the remote broker.
        DestinationStatistics remoteSourceStats =
                remoteBroker.getDestination(new ActiveMQQueue(testQueueName)).getDestinationStatistics();

        waitForDispatchFromLocalBroker(localStats, 1);
        Assert.assertEquals("broker consumer count", 1L, localStats.getConsumers().getCount());
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("message count", 0L, remoteSourceStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisories, 2, 1);
        // Expected counts differ by parameter: (1, 2, 1) when the flag is set, (1, 1, 0) otherwise.
        assertAdvisoryBrokerCounts(1, isUseVirtualDestSubsOnCreation ? 2 : 1, isUseVirtualDestSubsOnCreation ? 1 : 0);
    }

    /**
     * Starts the remote broker with a composite topic forwarding to queue
     * {@code include.test.bar.bridge}, then replaces the virtual destinations with an empty
     * set and checks that a message sent to the {@code included} topic is no longer received
     * by a remote consumer on that queue.
     *
     * <p>Finally asserts the advisory count for the current
     * {@code isUseVirtualDestSubsOnCreation} setting and that all AdvisoryBroker collections
     * checked by {@code assertAdvisoryBrokerCounts} are empty.
     */
    @Test(timeout=60000L)
    public void testFlowRemoved() throws Exception {
        CompositeTopic compositeTopic = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge"));
        doSetUp(true, new VirtualDestination[]{compositeTopic});
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
        Thread.sleep(2000L);

        // Explicitly create the forward queue on the remote broker.
        remoteBroker.getBroker().addDestination(remoteBroker.getAdminConnectionContext(), new ActiveMQQueue("include.test.bar.bridge"), false);
        Thread.sleep(2000L);

        // Drop every virtual destination again.
        runtimeBroker.setVirtualDestinations(new VirtualDestination[0], true);

        MessageProducer includedProducer = localSession.createProducer(included);
        Thread.sleep(2000L);
        TextMessage message = localSession.createTextMessage("test");
        MessageConsumer bridgeConsumer = remoteSession.createConsumer(new ActiveMQQueue("include.test.bar.bridge"));
        Thread.sleep(2000L);

        includedProducer.send(message);
        Assert.assertNull(bridgeConsumer.receive(5000L));
        assertRemoteAdvisoryCount(advisoryConsumer, 2, 0);
        assertAdvisoryBrokerCounts(0, 0, 0);
    }

    /**
     * Runs only when {@code isUseVirtualDestSubsOnCreation} is true. Both brokers are started
     * without the local network connector, the forward queue is created on the remote broker,
     * and only then is the connector added to the local broker and started.
     *
     * <p>Asserts that two virtual destination consumer advisories are received afterwards and
     * that the AdvisoryBroker collections each hold one entry.
     */
    @Test(timeout=60000L)
    public void testReplay() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);

        CompositeTopic compositeTopic = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge"));
        doSetUp(true, new VirtualDestination[]{compositeTopic}, false);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
        Thread.sleep(2000L);

        remoteBroker.getBroker().addDestination(remoteBroker.getAdminConnectionContext(), new ActiveMQQueue("include.test.bar.bridge"), false);
        Thread.sleep(2000L);

        // Bring the network connector up only after the remote destination exists.
        localBroker.addNetworkConnector(connector);
        connector.start();
        Thread.sleep(2000L);

        assertRemoteAdvisoryCount(advisoryConsumer, 2);
        assertAdvisoryBrokerCounts(1, 1, 1);
    }

    /**
     * Starts both brokers without the local network connector, creates the forward queue and
     * a consumer on it at the remote broker, and only then adds and starts the connector.
     *
     * <p>Asserts that a message sent to the {@code included} topic afterwards is received by
     * the remote consumer, and checks the advisory and AdvisoryBroker counts expected for the
     * current {@code isUseVirtualDestSubsOnCreation} setting.
     */
    @Test(timeout=60000L)
    public void testReplayWithConsumer() throws Exception {
        CompositeTopic compositeTopic = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge"));
        doSetUp(true, new VirtualDestination[]{compositeTopic}, false);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
        Thread.sleep(2000L);

        remoteBroker.getBroker().addDestination(remoteBroker.getAdminConnectionContext(), new ActiveMQQueue("include.test.bar.bridge"), false);
        Thread.sleep(2000L);

        MessageProducer includedProducer = localSession.createProducer(included);
        TextMessage message = localSession.createTextMessage("test");
        MessageConsumer bridgeConsumer = remoteSession.createConsumer(new ActiveMQQueue("include.test.bar.bridge"));
        Thread.sleep(2000L);

        // Bring the network connector up only after the remote consumer exists.
        localBroker.addNetworkConnector(connector);
        connector.start();
        Thread.sleep(2000L);

        includedProducer.send(message);
        Assert.assertNotNull(bridgeConsumer.receive(5000L));
        assertRemoteAdvisoryCount(advisoryConsumer, 4, 2);

        // The expected AdvisoryBroker bookkeeping differs when subscribe-on-creation is enabled.
        int expectedVirtualConsumers = isUseVirtualDestSubsOnCreation ? 2 : 1;
        int expectedBrokerConsumerDests = isUseVirtualDestSubsOnCreation ? 1 : 0;
        assertAdvisoryBrokerCounts(1, expectedVirtualConsumers, expectedBrokerConsumerDests);
    }

    /**
     * Runs only when {@code isUseVirtualDestSubsOnCreation} is true. Starts with a composite
     * topic on the remote broker, verifies the AdvisoryBroker collections each hold one entry,
     * then removes all virtual destinations and sends a message to the {@code included} topic.
     *
     * <p>Asserts zero consumers and zero dispatched/dequeued/forwarded messages on the local
     * statistics, zero messages on the remote forward queue, two advisories, and empty
     * AdvisoryBroker collections.
     *
     * <p>NOTE(review): the local statistics are read from the queue named
     * {@code testQueueName}, while the message is sent to the {@code included} topic; confirm
     * this is the destination the test means to inspect.
     */
    @Test(timeout=60000L)
    public void testRemovedIfNoConsumer() throws Exception {
        Assume.assumeTrue(isUseVirtualDestSubsOnCreation);

        CompositeTopic compositeTopic = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge"));
        doSetUp(true, new VirtualDestination[]{compositeTopic});
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
        Thread.sleep(2000L);

        DestinationStatistics localStats = localBroker.getDestination(new ActiveMQQueue(testQueueName)).getDestinationStatistics();
        DestinationStatistics remoteBridgeStats = remoteBroker.getDestination(new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
        Thread.sleep(2000L);
        assertAdvisoryBrokerCounts(1, 1, 1);

        // Drop every virtual destination again.
        runtimeBroker.setVirtualDestinations(new VirtualDestination[0], true);

        MessageProducer includedProducer = localSession.createProducer(included);
        Thread.sleep(2000L);
        TextMessage message = localSession.createTextMessage("test");
        includedProducer.send(message);

        Assert.assertEquals("broker consumer count", 0L, localStats.getConsumers().getCount());
        assertLocalBrokerStatistics(localStats, 0);
        Assert.assertEquals("remote dest messages", 0L, remoteBridgeStats.getMessages().getCount());
        assertRemoteAdvisoryCount(advisoryConsumer, 2);
        assertAdvisoryBrokerCounts(0, 0, 0);
    }

    /**
     * Configures a composite topic on the remote broker that forwards to topic
     * {@code include.test.bar.bridge} and checks that a message sent to the local
     * {@code included} topic is received by a remote consumer on the forward topic.
     *
     * <p>Also asserts one virtual destination consumer advisory and AdvisoryBroker counts of
     * (1, 1, 0).
     */
    @Test(timeout=60000L)
    public void testToTopic() throws Exception {
        doSetUp(true, null);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
        CompositeTopic compositeTopic = createCompositeTopic(testTopicName, new ActiveMQTopic("include.test.bar.bridge"));
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{compositeTopic}, true);

        MessageProducer includedProducer = localSession.createProducer(included);
        Thread.sleep(2000L);
        TextMessage message = localSession.createTextMessage("test");
        MessageConsumer bridgeConsumer = remoteSession.createConsumer(new ActiveMQTopic("include.test.bar.bridge"));
        Thread.sleep(2000L);

        includedProducer.send(message);
        Assert.assertNotNull(bridgeConsumer.receive(5000L));
        assertRemoteAdvisoryCount(advisoryConsumer, 1);
        assertAdvisoryBrokerCounts(1, 1, 0);
    }

    /**
     * Configures a composite topic forwarding to topic {@code include.test.bar.bridge} but
     * creates no consumer on the forward topic, then sends a message to the local
     * {@code included} topic.
     *
     * <p>Asserts zero consumers and zero dispatched/dequeued/forwarded messages on the
     * inspected local statistics, no advisories, and AdvisoryBroker counts of (1, 0, 0).
     *
     * <p>NOTE(review): the statistics are read from the {@code excluded} destination although
     * the message is sent to {@code included}; confirm this is intentional.
     */
    @Test(timeout=60000L)
    public void testToTopicNoConsumer() throws Exception {
        doSetUp(true, null);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
        CompositeTopic compositeTopic = createCompositeTopic(testTopicName, new ActiveMQTopic("include.test.bar.bridge"));
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{compositeTopic}, true);

        MessageProducer includedProducer = localSession.createProducer(included);
        Thread.sleep(2000L);
        TextMessage message = localSession.createTextMessage("test");
        includedProducer.send(message);

        DestinationStatistics localStats = localBroker.getDestination(excluded).getDestinationStatistics();
        Assert.assertEquals("broker consumer count", 0L, localStats.getConsumers().getCount());
        assertLocalBrokerStatistics(localStats, 0);
        assertRemoteAdvisoryCount(advisoryConsumer, 0);
        assertAdvisoryBrokerCounts(1, 0, 0);
    }

    /**
     * Same flow as forwarding to a topic, but the remote consumer is a durable subscriber
     * ({@code sub1}) on topic {@code include.test.bar.bridge}.
     *
     * <p>Asserts the message is received, waits for the local dequeue count to reach one,
     * then checks dispatched and dequeue counts of one, a single advisory, and AdvisoryBroker
     * counts of (1, 1, 0).
     */
    @Test(timeout=60000L)
    public void testToTopicWithDurable() throws Exception {
        doSetUp(true, null);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
        CompositeTopic compositeTopic = createCompositeTopic(testTopicName, new ActiveMQTopic("include.test.bar.bridge"));
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{compositeTopic}, true);

        MessageProducer includedProducer = localSession.createProducer(included);
        Thread.sleep(2000L);
        TextMessage message = localSession.createTextMessage("test");
        final DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        TopicSubscriber durableSubscriber = remoteSession.createDurableSubscriber(new ActiveMQTopic("include.test.bar.bridge"), "sub1");
        Thread.sleep(2000L);

        includedProducer.send(message);
        Assert.assertNotNull(durableSubscriber.receive(5000L));

        // Give the local broker time to record the dequeue before asserting on the stats.
        Wait.Condition dequeued = new Wait.Condition(){

            @Override
            public boolean isSatisified() throws Exception {
                return 1L == localStats.getDequeues().getCount();
            }
        };
        Wait.waitFor(dequeued);

        Assert.assertEquals("broker dest stat dispatched", 1L, localStats.getDispatched().getCount());
        Assert.assertEquals("broker dest stat dequeues", 1L, localStats.getDequeues().getCount());
        assertRemoteAdvisoryCount(advisoryConsumer, 1);
        assertAdvisoryBrokerCounts(1, 1, 0);
    }

    /**
     * Creates a durable subscriber ({@code sub1}) on remote topic
     * {@code include.test.bar.bridge}, closes it, and then sends a message to the local
     * {@code included} topic.
     *
     * <p>Asserts that the local broker still records one dispatched and one dequeued message,
     * that a re-created subscriber with the same name receives the message, that the local
     * counts remain at one afterwards, and that three advisories and AdvisoryBroker counts of
     * (1, 1, 0) are observed.
     */
    @Test(timeout=60000L)
    public void testToTopicWithDurableOffline() throws Exception {
        doSetUp(true, null);
        MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
        CompositeTopic compositeTopic = createCompositeTopic(testTopicName, new ActiveMQTopic("include.test.bar.bridge"));
        runtimeBroker.setVirtualDestinations(new VirtualDestination[]{compositeTopic}, true);

        MessageProducer includedProducer = localSession.createProducer(included);
        Thread.sleep(2000L);
        TextMessage message = localSession.createTextMessage("test");
        final DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();

        // Register the durable subscription, then close the subscriber before sending.
        TopicSubscriber offlineSubscriber = remoteSession.createDurableSubscriber(new ActiveMQTopic("include.test.bar.bridge"), "sub1");
        offlineSubscriber.close();
        Thread.sleep(2000L);

        includedProducer.send(message);
        Wait.Condition forwarded = new Wait.Condition(){

            @Override
            public boolean isSatisified() throws Exception {
                return 1L == localStats.getDequeues().getCount() && localStats.getDispatched().getCount() == 1L;
            }
        };
        Wait.waitFor(forwarded);
        Assert.assertEquals("broker dest stat dispatched", 1L, localStats.getDispatched().getCount());
        Assert.assertEquals("broker dest stat dequeues", 1L, localStats.getDequeues().getCount());

        // Re-create the subscriber under the same name and expect the message.
        TopicSubscriber resumedSubscriber = remoteSession.createDurableSubscriber(new ActiveMQTopic("include.test.bar.bridge"), "sub1");
        Assert.assertNotNull(resumedSubscriber.receive(5000L));
        Thread.sleep(2000L);

        Assert.assertEquals("broker dest stat dispatched", 1L, localStats.getDispatched().getCount());
        Assert.assertEquals("broker dest stat dequeues", 1L, localStats.getDequeues().getCount());
        assertRemoteAdvisoryCount(advisoryConsumer, 3);
        assertAdvisoryBrokerCounts(1, 1, 0);
    }

    /**
     * JUnit {@code @Before} hook; intentionally empty. No shared fixture is created here:
     * the test methods below call {@code doSetUp(...)} themselves with their own arguments.
     */
    @Before
    public void setUp() throws Exception {
    }

    /**
     * JUnit {@code @After} hook; delegates all cleanup to {@link #doTearDown()}.
     *
     * @throws Exception if the delegated cleanup fails
     */
    @After
    public void tearDown() throws Exception {
        doTearDown();
    }

    /**
     * Closes both JMS connections and stops both brokers, skipping any that were never
     * created.
     *
     * <p>Each step runs in its own {@code finally} so that a failure while closing a
     * connection or stopping one broker does not prevent the remaining resources from being
     * released. The brokers listen on fixed TCP ports, so a broker left running would break
     * every later test that tries to bind the same port. If several steps fail, the exception
     * from the last failing step is the one that propagates.
     *
     * @throws Exception if closing a connection or stopping a broker fails
     */
    protected void doTearDown() throws Exception {
        try {
            if (this.localConnection != null) {
                this.localConnection.close();
            }
        } finally {
            try {
                if (this.remoteConnection != null) {
                    this.remoteConnection.close();
                }
            } finally {
                try {
                    if (this.localBroker != null) {
                        this.localBroker.stop();
                    }
                } finally {
                    if (this.remoteBroker != null) {
                        this.remoteBroker.stop();
                    }
                }
            }
        }
    }

    /**
     * Convenience overload that sets up both brokers with the local network connector
     * registered ({@code startNetworkConnector = true}).
     *
     * @param deleteAllMessages  passed to {@code setDeleteAllMessagesOnStartup} on both brokers
     * @param remoteVirtualDests virtual destinations for the remote broker, or {@code null}
     * @throws Exception if broker or connection setup fails
     */
    protected void doSetUp(boolean deleteAllMessages, VirtualDestination[] remoteVirtualDests) throws Exception {
        doSetUp(deleteAllMessages, remoteVirtualDests, true);
    }

    /**
     * Creates and starts the remote broker, then the local broker, opens one JMS connection
     * to each over its VM connector URI, and initializes the {@code included}/{@code excluded}
     * topics and the two auto-acknowledge sessions used by the tests.
     *
     * @param deleteAllMessages     passed to {@code setDeleteAllMessagesOnStartup} on both brokers
     * @param remoteVirtualDests    virtual destinations for the remote broker, or {@code null}
     * @param startNetworkConnector whether the local broker gets its network connector
     *                              registered before it is started
     * @throws Exception if broker or connection setup fails
     */
    protected void doSetUp(boolean deleteAllMessages, VirtualDestination[] remoteVirtualDests, boolean startNetworkConnector) throws Exception {
        // The remote broker is brought up first.
        remoteBroker = createRemoteBroker(isUseVirtualDestSubsOnCreation, remoteVirtualDests);
        remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        remoteBroker.start();
        remoteBroker.waitUntilStarted();

        localBroker = createLocalBroker(startNetworkConnector);
        localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        localBroker.start();
        localBroker.waitUntilStarted();

        // Local connection: synchronous sends, synchronous dispatch.
        ActiveMQConnectionFactory localFactory = new ActiveMQConnectionFactory(localBroker.getVmConnectorURI());
        localFactory.setAlwaysSyncSend(true);
        localFactory.setDispatchAsync(false);
        localConnection = localFactory.createConnection();
        localConnection.setClientID("clientId");
        localConnection.start();

        // Remote connection uses the factory defaults.
        ActiveMQConnectionFactory remoteFactory = new ActiveMQConnectionFactory(remoteBroker.getVmConnectorURI());
        remoteConnection = remoteFactory.createConnection();
        remoteConnection.setClientID("clientId");
        remoteConnection.start();

        included = new ActiveMQTopic(testTopicName);
        excluded = new ActiveMQTopic("exclude.test.bar");
        localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Builds (but does not start) the local broker, listening on {@code tcp://localhost:61616},
     * and configures the {@code connector} field as a network connector pointing at
     * {@code tcp://localhost:61617} with virtual destination subscriptions enabled.
     *
     * @param startNetworkConnector when {@code true} the connector is registered with the
     *                              broker; otherwise it is only stored in the {@code connector}
     *                              field so a test can add it later
     * @return the configured, not yet started, broker
     * @throws Exception if the temporary data folder or a connector cannot be created
     */
    protected BrokerService createLocalBroker(boolean startNetworkConnector) throws Exception {
        BrokerService local = new BrokerService();
        local.setMonitorConnectionSplits(true);
        local.setDataDirectoryFile(tempFolder.newFolder());
        local.setBrokerName("localBroker");

        connector = new DiscoveryNetworkConnector(new URI("static:(tcp://localhost:61617)"));
        connector.setName("networkConnector");
        connector.setDynamicOnly(false);
        connector.setDecreaseNetworkConsumerPriority(false);
        connector.setConduitSubscriptions(true);
        connector.setDuplex(isDuplex);
        connector.setUseVirtualDestSubs(true);

        List<ActiveMQDestination> dynamicallyIncluded = Lists.<ActiveMQDestination>newArrayList(
                new ActiveMQQueue(testQueueName),
                new ActiveMQTopic(testTopicName),
                new ActiveMQTopic("VirtualTopic.>"));
        connector.setDynamicallyIncludedDestinations(dynamicallyIncluded);

        List<ActiveMQDestination> excludedDestinations = Lists.<ActiveMQDestination>newArrayList(
                new ActiveMQQueue("exclude.test.foo"),
                new ActiveMQTopic("exclude.test.bar"));
        connector.setExcludedDestinations(excludedDestinations);

        if (startNetworkConnector) {
            local.addNetworkConnector(connector);
        }
        local.addConnector("tcp://localhost:61616");
        return local;
    }

    /**
     * Builds (but does not start) the remote broker, listening on {@code tcp://localhost:61617}
     * with JMX disabled, the Java runtime configuration plugin installed, and a network
     * connector pointing back at {@code tcp://localhost:61616}.
     *
     * <p>As a side effect the {@code runtimeBroker} and {@code remoteAdvisoryBroker} fields
     * are populated from the broker's adaptor chain.
     *
     * @param isUsevirtualDestinationSubscriptionsOnCreation value for
     *        {@code setUseVirtualDestSubsOnCreation}
     * @param remoteVirtualDests virtual destinations to install through a
     *        {@link VirtualDestinationInterceptor}, or {@code null} for none
     * @return the configured, not yet started, broker
     * @throws Exception if the temporary data folder or a connector cannot be created
     */
    protected BrokerService createRemoteBroker(boolean isUsevirtualDestinationSubscriptionsOnCreation, VirtualDestination[] remoteVirtualDests) throws Exception {
        BrokerService remote = new BrokerService();
        remote.setBrokerName("remoteBroker");
        remote.setUseJmx(false);
        remote.setDataDirectoryFile(tempFolder.newFolder());
        remote.setPlugins(new BrokerPlugin[]{new JavaRuntimeConfigurationPlugin()});
        remote.setUseVirtualDestSubs(true);
        remote.setUseVirtualDestSubsOnCreation(isUsevirtualDestinationSubscriptionsOnCreation);

        if (remoteVirtualDests != null) {
            VirtualDestinationInterceptor virtualInterceptor = new VirtualDestinationInterceptor();
            virtualInterceptor.setVirtualDestinations(remoteVirtualDests);
            remote.setDestinationInterceptors(new DestinationInterceptor[]{virtualInterceptor});
        }

        // Keep handles on the brokers the tests reconfigure and inspect.
        runtimeBroker = (JavaRuntimeConfigurationBroker)remote.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class);
        remoteAdvisoryBroker = (AdvisoryBroker)remote.getBroker().getAdaptor(AdvisoryBroker.class);

        NetworkConnector remoteConnector = new DiscoveryNetworkConnector(new URI("static:(tcp://localhost:61616)"));
        remote.addNetworkConnector(remoteConnector);
        remote.addConnector("tcp://localhost:61617");
        return remote;
    }

    /**
     * Creates a forward-only {@link CompositeTopic}.
     *
     * @param name      name of the composite topic
     * @param forwardTo destinations the composite topic forwards to
     * @return the configured composite topic
     */
    protected CompositeTopic createCompositeTopic(String name, ActiveMQDestination ... forwardTo) {
        CompositeTopic topic = new CompositeTopic();
        topic.setName(name);
        topic.setForwardOnly(true);
        topic.setForwardTo(Lists.newArrayList(forwardTo));
        return topic;
    }

    /**
     * Creates a forward-only {@link CompositeQueue}.
     *
     * @param name      name of the composite queue
     * @param forwardTo destinations the composite queue forwards to
     * @return the configured composite queue
     */
    protected CompositeQueue createCompositeQueue(String name, ActiveMQDestination ... forwardTo) {
        CompositeQueue queue = new CompositeQueue();
        queue.setName(name);
        queue.setForwardOnly(true);
        queue.setForwardTo(Lists.newArrayList(forwardTo));
        return queue;
    }

    /**
     * Waits, using {@code Wait.waitFor}, for the destination's consumer count to equal
     * {@code count}. The outcome of the wait is not checked here; callers assert afterwards.
     *
     * @param destinationStatistics statistics of the destination to poll
     * @param count                 consumer count to wait for
     * @throws Exception if the wait condition throws
     */
    protected void waitForConsumerCount(final DestinationStatistics destinationStatistics, final int count) throws Exception {
        Wait.Condition consumerCountReached = new Wait.Condition(){

            @Override
            public boolean isSatisified() throws Exception {
                return destinationStatistics.getConsumers().getCount() == (long)count;
            }
        };
        Wait.waitFor(consumerCountReached);
    }

    /**
     * Waits, using {@code Wait.waitFor}, until the destination's dequeue, dispatched and
     * forward counters all equal {@code count}. The outcome of the wait is not checked here;
     * callers assert afterwards.
     *
     * @param destinationStatistics statistics of the destination to poll
     * @param count                 value all three counters must reach
     * @throws Exception if the wait condition throws
     */
    protected void waitForDispatchFromLocalBroker(final DestinationStatistics destinationStatistics, final int count) throws Exception {
        Wait.Condition allCountersReached = new Wait.Condition(){

            @Override
            public boolean isSatisified() throws Exception {
                // Checked in the same order as before: dequeues, dispatched, forwards.
                if ((long)count != destinationStatistics.getDequeues().getCount()) {
                    return false;
                }
                if ((long)count != destinationStatistics.getDispatched().getCount()) {
                    return false;
                }
                return (long)count == destinationStatistics.getForwards().getCount();
            }
        };
        Wait.waitFor(allCountersReached);
    }

    /**
     * Creates a consumer on the remote session for the virtual destination consumer advisory
     * topic that belongs to the given topic.
     *
     * @param topic name of the topic whose advisory topic is subscribed to
     * @return a consumer on the advisory topic
     * @throws JMSException if the consumer cannot be created
     */
    protected MessageConsumer getVirtualDestinationAdvisoryConsumer(String topic) throws JMSException {
        Destination advisoryTopic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(new ActiveMQTopic(topic));
        return remoteSession.createConsumer(advisoryTopic);
    }

    /**
     * Creates a consumer on the remote session for the virtual destination consumer advisory
     * topic that belongs to the given queue.
     *
     * @param queue name of the queue whose advisory topic is subscribed to
     * @return a consumer on the advisory topic
     * @throws JMSException if the consumer cannot be created
     */
    protected MessageConsumer getQueueVirtualDestinationAdvisoryConsumer(String queue) throws JMSException {
        Destination advisoryTopic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(new ActiveMQQueue(queue));
        return remoteSession.createConsumer(advisoryTopic);
    }

    /**
     * Asserts that the dispatched, dequeue and forward counters of the given statistics all
     * equal {@code count}.
     *
     * @param localStatistics statistics to check
     * @param count           expected value of each counter
     */
    protected void assertLocalBrokerStatistics(DestinationStatistics localStatistics, int count) {
        long expected = count;
        Assert.assertEquals("local broker dest stat dispatched", expected, localStatistics.getDispatched().getCount());
        Assert.assertEquals("local broker dest stat dequeues", expected, localStatistics.getDequeues().getCount());
        Assert.assertEquals("local broker dest stat forwards", expected, localStatistics.getForwards().getCount());
    }

    /**
     * Drains the advisory consumer, receiving with a one second timeout until no further
     * message arrives, and asserts that exactly {@code count} messages were received. Each
     * message's data structure is logged at debug level.
     *
     * @param advisoryConsumer consumer to drain
     * @param count            expected number of advisory messages
     * @throws JMSException if receiving fails
     */
    protected void assertRemoteAdvisoryCount(MessageConsumer advisoryConsumer, int count) throws JMSException {
        int received = 0;
        while (true) {
            ActiveMQMessage advisory = (ActiveMQMessage)advisoryConsumer.receive(1000L);
            if (advisory == null) {
                break;
            }
            received++;
            LOG.debug("advisory data structure: {}", advisory.getDataStructure());
        }
        Assert.assertEquals((long)count, (long)received);
    }

    /**
     * Asserts the number of received advisories, choosing the expected value according to
     * {@code isUseVirtualDestSubsOnCreation}.
     *
     * @param advisoryConsumer        consumer to drain
     * @param isSubOnCreationCount    expected count when the flag is {@code true}
     * @param isNotSubOnCreationCount expected count when the flag is {@code false}
     * @throws JMSException if receiving fails
     */
    protected void assertRemoteAdvisoryCount(MessageConsumer advisoryConsumer, int isSubOnCreationCount, int isNotSubOnCreationCount) throws JMSException {
        int expected = isUseVirtualDestSubsOnCreation ? isSubOnCreationCount : isNotSubOnCreationCount;
        assertRemoteAdvisoryCount(advisoryConsumer, expected);
    }

    /**
     * Reads three private collections of the remote {@link AdvisoryBroker} via reflection and
     * asserts their sizes. Each assertion carries a message naming the field, so a failure
     * identifies which collection had the unexpected size.
     *
     * @param virtualDestinationsCount         expected size of {@code virtualDestinations}
     * @param virtualDestinationConsumersCount expected size of {@code virtualDestinationConsumers}
     * @param brokerConsumerDestsCount         expected size of {@code brokerConsumerDests}
     * @throws Exception if a field cannot be found or accessed reflectively
     */
    protected void assertAdvisoryBrokerCounts(int virtualDestinationsCount, int virtualDestinationConsumersCount, int brokerConsumerDestsCount) throws Exception {
        Field virtualDestinationsField = AdvisoryBroker.class.getDeclaredField("virtualDestinations");
        Field virtualDestinationConsumersField = AdvisoryBroker.class.getDeclaredField("virtualDestinationConsumers");
        Field brokerConsumerDestsField = AdvisoryBroker.class.getDeclaredField("brokerConsumerDests");
        // The fields are private to AdvisoryBroker; open them up for inspection.
        virtualDestinationsField.setAccessible(true);
        virtualDestinationConsumersField.setAccessible(true);
        brokerConsumerDestsField.setAccessible(true);
        Set<?> virtualDestinations = (Set<?>)virtualDestinationsField.get(this.remoteAdvisoryBroker);
        ConcurrentMap<?, ?> virtualDestinationConsumers = (ConcurrentMap<?, ?>)virtualDestinationConsumersField.get(this.remoteAdvisoryBroker);
        ConcurrentMap<?, ?> brokerConsumerDests = (ConcurrentMap<?, ?>)brokerConsumerDestsField.get(this.remoteAdvisoryBroker);
        Assert.assertEquals("advisory broker virtualDestinations size", (long)virtualDestinationsCount, (long)virtualDestinations.size());
        Assert.assertEquals("advisory broker virtualDestinationConsumers size", (long)virtualDestinationConsumersCount, (long)virtualDestinationConsumers.size());
        Assert.assertEquals("advisory broker brokerConsumerDests size", (long)brokerConsumerDestsCount, (long)brokerConsumerDests.size());
    }
}

