/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.LinkedList;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.MessageIdList;

public class TwoBrokerVirtualTopicForwardingTest
extends JmsMultipleBrokersTestSupport {
    public void testBridgeVirtualTopicQueues() throws Exception {
        this.bridgeAndConfigureBrokers("BrokerA", "BrokerB");
        this.startAllBrokers();
        this.waitForBridgeFormation();
        MessageConsumer clientA = this.createConsumer("BrokerA", (Destination)this.createDestination("Consumer.A.VirtualTopic.tempTopic", false));
        MessageConsumer clientB = this.createConsumer("BrokerB", (Destination)this.createDestination("Consumer.B.VirtualTopic.tempTopic", false));
        Thread.sleep(500L);
        ActiveMQQueue queueA = new ActiveMQQueue("Consumer.A.VirtualTopic.tempTopic");
        org.apache.activemq.broker.region.Destination destination = TestSupport.getDestination(((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerA")).broker, (ActiveMQDestination)queueA);
        TwoBrokerVirtualTopicForwardingTest.assertEquals((int)1, (int)destination.getConsumers().size());
        ActiveMQQueue queueB = new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic");
        destination = TestSupport.getDestination(((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerA")).broker, (ActiveMQDestination)queueB);
        TwoBrokerVirtualTopicForwardingTest.assertEquals((int)1, (int)destination.getConsumers().size());
        ActiveMQTopic virtualTopic = new ActiveMQTopic("VirtualTopic.tempTopic");
        TwoBrokerVirtualTopicForwardingTest.assertNull((Object)TestSupport.getDestination(((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerA")).broker, (ActiveMQDestination)virtualTopic));
        TwoBrokerVirtualTopicForwardingTest.assertNull((Object)TestSupport.getDestination(((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerB")).broker, (ActiveMQDestination)virtualTopic));
        this.sendMessages("BrokerA", (Destination)virtualTopic, 1);
        MessageIdList msgsA = this.getConsumerMessages("BrokerA", clientA);
        MessageIdList msgsB = this.getConsumerMessages("BrokerB", clientB);
        msgsA.waitForMessagesToArrive(1);
        msgsB.waitForMessagesToArrive(1);
        Thread.sleep(2000L);
        TwoBrokerVirtualTopicForwardingTest.assertEquals((int)1, (int)msgsA.getMessageCount());
        TwoBrokerVirtualTopicForwardingTest.assertEquals((int)1, (int)msgsB.getMessageCount());
    }

    public void testDontBridgeQueuesWithOnlyQueueConsumers() throws Exception {
        this.dontBridgeVirtualTopicConsumerQueues("BrokerA", "BrokerB");
        this.startAllBrokers();
        this.waitForBridgeFormation();
        MessageConsumer clientA = this.createConsumer("BrokerA", (Destination)this.createDestination("Consumer.A.VirtualTopic.tempTopic", false));
        MessageConsumer clientB = this.createConsumer("BrokerB", (Destination)this.createDestination("Consumer.B.VirtualTopic.tempTopic", false));
        Thread.sleep(500L);
        ActiveMQQueue queueA = new ActiveMQQueue("Consumer.A.VirtualTopic.tempTopic");
        org.apache.activemq.broker.region.Destination destination = TestSupport.getDestination(((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerA")).broker, (ActiveMQDestination)queueA);
        TwoBrokerVirtualTopicForwardingTest.assertEquals((int)1, (int)destination.getConsumers().size());
        ActiveMQQueue queueB = new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic");
        destination = TestSupport.getDestination(((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerA")).broker, (ActiveMQDestination)queueB);
        TwoBrokerVirtualTopicForwardingTest.assertNull((Object)destination);
        ActiveMQTopic virtualTopic = new ActiveMQTopic("VirtualTopic.tempTopic");
        TwoBrokerVirtualTopicForwardingTest.assertNull((Object)TestSupport.getDestination(((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerA")).broker, (ActiveMQDestination)virtualTopic));
        TwoBrokerVirtualTopicForwardingTest.assertNull((Object)TestSupport.getDestination(((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerB")).broker, (ActiveMQDestination)virtualTopic));
        this.sendMessages("BrokerA", (Destination)virtualTopic, 1);
        MessageIdList msgsA = this.getConsumerMessages("BrokerA", clientA);
        MessageIdList msgsB = this.getConsumerMessages("BrokerB", clientB);
        msgsA.waitForMessagesToArrive(1);
        msgsB.waitForMessagesToArrive(0);
        Thread.sleep(2000L);
        TwoBrokerVirtualTopicForwardingTest.assertEquals((int)1, (int)msgsA.getMessageCount());
        TwoBrokerVirtualTopicForwardingTest.assertEquals((int)0, (int)msgsB.getMessageCount());
    }

    public void testDontBridgeQueuesWithBothTypesConsumers() throws Exception {
        this.dontBridgeVirtualTopicConsumerQueues("BrokerA", "BrokerB");
        this.startAllBrokers();
        this.waitForBridgeFormation();
        MessageConsumer clientA = this.createConsumer("BrokerA", (Destination)this.createDestination("Consumer.A.VirtualTopic.tempTopic", false));
        MessageConsumer clientB = this.createConsumer("BrokerB", (Destination)this.createDestination("Consumer.B.VirtualTopic.tempTopic", false));
        MessageConsumer clientC = this.createConsumer("BrokerB", (Destination)this.createDestination("VirtualTopic.tempTopic", true));
        Thread.sleep(500L);
        ActiveMQQueue queueA = new ActiveMQQueue("Consumer.A.VirtualTopic.tempTopic");
        org.apache.activemq.broker.region.Destination destination = TestSupport.getDestination(((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerA")).broker, (ActiveMQDestination)queueA);
        TwoBrokerVirtualTopicForwardingTest.assertEquals((int)1, (int)destination.getConsumers().size());
        ActiveMQQueue queueB = new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic");
        destination = TestSupport.getDestination(((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerA")).broker, (ActiveMQDestination)queueB);
        TwoBrokerVirtualTopicForwardingTest.assertNull((Object)destination);
        ActiveMQTopic virtualTopic = new ActiveMQTopic("VirtualTopic.tempTopic");
        TwoBrokerVirtualTopicForwardingTest.assertNotNull((Object)TestSupport.getDestination(((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerA")).broker, (ActiveMQDestination)virtualTopic));
        TwoBrokerVirtualTopicForwardingTest.assertNotNull((Object)TestSupport.getDestination(((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)"BrokerB")).broker, (ActiveMQDestination)virtualTopic));
        this.sendMessages("BrokerA", (Destination)virtualTopic, 1);
        MessageIdList msgsA = this.getConsumerMessages("BrokerA", clientA);
        MessageIdList msgsB = this.getConsumerMessages("BrokerB", clientB);
        msgsA.waitForMessagesToArrive(1);
        msgsB.waitForMessagesToArrive(1);
        Thread.sleep(2000L);
        TwoBrokerVirtualTopicForwardingTest.assertEquals((int)1, (int)msgsA.getMessageCount());
        TwoBrokerVirtualTopicForwardingTest.assertEquals((int)1, (int)msgsB.getMessageCount());
    }

    private void bridgeAndConfigureBrokers(String local, String remote) throws Exception {
        NetworkConnector bridge = this.bridgeBrokers(local, remote);
        bridge.setDecreaseNetworkConsumerPriority(true);
    }

    private void dontBridgeVirtualTopicConsumerQueues(String local, String remote) throws Exception {
        NetworkConnector bridge = this.bridgeBrokers(local, remote);
        bridge.setDecreaseNetworkConsumerPriority(true);
        LinkedList<ActiveMQQueue> excludedDestinations = new LinkedList<ActiveMQQueue>();
        excludedDestinations.add(new ActiveMQQueue("Consumer.*.VirtualTopic.>"));
        bridge.setExcludedDestinations(excludedDestinations);
    }

    @Override
    public void setUp() throws Exception {
        super.setAutoFail(true);
        super.setUp();
        String options = new String("?useJmx=false&deleteAllMessagesOnStartup=true");
        this.createAndConfigureBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
        this.createAndConfigureBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
    }

    private BrokerService createAndConfigureBroker(URI uri) throws Exception {
        BrokerService broker = this.createBroker(uri);
        this.configurePersistenceAdapter(broker);
        return broker;
    }

    protected void configurePersistenceAdapter(BrokerService broker) throws IOException {
        File dataFileDir = new File("target/test-amq-data/kahadb/" + broker.getBrokerName());
        KahaDBStore kaha = new KahaDBStore();
        kaha.setDirectory(dataFileDir);
        broker.setPersistenceAdapter((PersistenceAdapter)kaha);
    }
}

