/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.MessageIdList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThreeBrokerVirtualTopicNetworkTest
extends JmsMultipleBrokersTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(ThreeBrokerVirtualTopicNetworkTest.class);
    protected static final int MESSAGE_COUNT = 1;
    public boolean dynamicOnly = false;

    public void testNetworkVirtualTopic() throws Exception {
        int networkTTL = 6;
        boolean conduitSubs = true;
        this.bridgeAndConfigureBrokers("BrokerA", "BrokerB", this.dynamicOnly, networkTTL, conduitSubs);
        this.bridgeAndConfigureBrokers("BrokerA", "BrokerC", this.dynamicOnly, networkTTL, conduitSubs);
        this.bridgeAndConfigureBrokers("BrokerB", "BrokerA", this.dynamicOnly, networkTTL, conduitSubs);
        this.bridgeAndConfigureBrokers("BrokerB", "BrokerC", this.dynamicOnly, networkTTL, conduitSubs);
        this.bridgeAndConfigureBrokers("BrokerC", "BrokerA", this.dynamicOnly, networkTTL, conduitSubs);
        this.bridgeAndConfigureBrokers("BrokerC", "BrokerB", this.dynamicOnly, networkTTL, conduitSubs);
        this.startAllBrokers();
        this.waitForBridgeFormation();
        ActiveMQDestination dest = this.createDestination("TEST.FOO", true);
        MessageConsumer clientA = this.createConsumer("BrokerA", (Destination)this.createDestination("Consumer.A.TEST.FOO", false));
        MessageConsumer clientB = this.createConsumer("BrokerB", (Destination)this.createDestination("Consumer.B.TEST.FOO", false));
        MessageConsumer clientC = this.createConsumer("BrokerC", (Destination)this.createDestination("Consumer.C.TEST.FOO", false));
        Thread.sleep(2000L);
        this.sendMessages("BrokerA", (Destination)dest, 1);
        MessageIdList msgsA = this.getConsumerMessages("BrokerA", clientA);
        MessageIdList msgsB = this.getConsumerMessages("BrokerB", clientB);
        MessageIdList msgsC = this.getConsumerMessages("BrokerC", clientC);
        msgsA.waitForMessagesToArrive(1);
        msgsB.waitForMessagesToArrive(1);
        msgsC.waitForMessagesToArrive(1);
        Thread.sleep(2000L);
        ThreeBrokerVirtualTopicNetworkTest.assertEquals((int)1, (int)msgsA.getMessageCount());
        ThreeBrokerVirtualTopicNetworkTest.assertEquals((int)1, (int)msgsB.getMessageCount());
        ThreeBrokerVirtualTopicNetworkTest.assertEquals((int)1, (int)msgsC.getMessageCount());
        LOG.info("Restarting brokerA");
        JmsMultipleBrokersTestSupport.BrokerItem brokerItem = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.remove("BrokerA");
        if (brokerItem != null) {
            brokerItem.destroy();
        }
        BrokerService restartedBroker = this.createAndConfigureBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?useJmx=false"));
        this.bridgeAndConfigureBrokers("BrokerA", "BrokerB", this.dynamicOnly, networkTTL, conduitSubs);
        this.bridgeAndConfigureBrokers("BrokerA", "BrokerC", this.dynamicOnly, networkTTL, conduitSubs);
        restartedBroker.start();
        this.waitForBridgeFormation();
        clientA = this.createConsumer("BrokerA", (Destination)this.createDestination("Consumer.A.TEST.FOO", false));
        LOG.info("recreated clientA");
        Thread.sleep(2000L);
        this.sendMessages("BrokerA", (Destination)dest, 10);
        msgsA = this.getConsumerMessages("BrokerA", clientA);
        msgsA.waitForMessagesToArrive(10);
        msgsB.waitForMessagesToArrive(11);
        msgsC.waitForMessagesToArrive(11);
        Thread.sleep(2000L);
        LOG.info("MessagesA: " + msgsA.getMessageIds());
        ThreeBrokerVirtualTopicNetworkTest.assertEquals((int)10, (int)msgsA.getMessageCount());
        ThreeBrokerVirtualTopicNetworkTest.assertEquals((int)11, (int)msgsB.getMessageCount());
        ThreeBrokerVirtualTopicNetworkTest.assertEquals((int)11, (int)msgsC.getMessageCount());
        LOG.info("Restarting brokerA again");
        brokerItem = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.remove("BrokerA");
        if (brokerItem != null) {
            brokerItem.destroy();
        }
        restartedBroker = this.createAndConfigureBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?useJmx=false"));
        this.bridgeAndConfigureBrokers("BrokerA", "BrokerB", this.dynamicOnly, networkTTL, conduitSubs);
        this.bridgeAndConfigureBrokers("BrokerA", "BrokerC", this.dynamicOnly, networkTTL, conduitSubs);
        restartedBroker.start();
        this.waitForBridgeFormation();
        clientA = this.createConsumer("BrokerA", (Destination)this.createDestination("Consumer.A.TEST.FOO", false));
        LOG.info("recreated clientA again");
        Thread.sleep(2000L);
        msgsA = this.getConsumerMessages("BrokerA", clientA);
        Thread.sleep(5000L);
        LOG.info("Extra MessagesA: " + msgsA.getMessageIds());
        ThreeBrokerVirtualTopicNetworkTest.assertEquals((int)0, (int)msgsA.getMessageCount());
        ThreeBrokerVirtualTopicNetworkTest.assertEquals((int)11, (int)msgsB.getMessageCount());
        ThreeBrokerVirtualTopicNetworkTest.assertEquals((int)11, (int)msgsC.getMessageCount());
    }

    private void bridgeAndConfigureBrokers(String local, String remote, boolean dynamicOnly, int networkTTL, boolean conduitSubs) throws Exception {
        NetworkConnector bridge = this.bridgeBrokers(local, remote, dynamicOnly, networkTTL, conduitSubs);
        bridge.setDecreaseNetworkConsumerPriority(true);
    }

    @Override
    public void setUp() throws Exception {
        super.setAutoFail(true);
        super.setUp();
        String options = new String("?useJmx=false&deleteAllMessagesOnStartup=true");
        this.createAndConfigureBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
        this.createAndConfigureBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
        this.createAndConfigureBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options));
    }

    private BrokerService createAndConfigureBroker(URI uri) throws Exception {
        BrokerService broker = this.createBroker(uri);
        this.configurePersistenceAdapter(broker);
        VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor();
        virtualDestinationInterceptor.setVirtualDestinations(new VirtualDestination[]{new VirtualTopic()});
        DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[]{virtualDestinationInterceptor};
        broker.setDestinationInterceptors(destinationInterceptors);
        return broker;
    }

    protected void configurePersistenceAdapter(BrokerService broker) throws IOException {
        File dataFileDir = new File("target/test-amq-data/kahadb/" + broker.getBrokerName());
        KahaDBStore kaha = new KahaDBStore();
        kaha.setDirectory(dataFileDir);
        broker.setPersistenceAdapter((PersistenceAdapter)kaha);
    }
}

