package org.apache.activemq.usecases;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import javax.jms.MessageConsumer;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.MessageIdList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activemq/usecases/ThreeBrokerVirtualTopicNetworkTest.class */
public class ThreeBrokerVirtualTopicNetworkTest extends JmsMultipleBrokersTestSupport {
    private static final Log LOG = LogFactory.getLog(ThreeBrokerVirtualTopicNetworkTest.class);
    protected static final int MESSAGE_COUNT = 1;
    public boolean dynamicOnly = false;

    public void testNetworkVirtualTopic() throws Exception {
        bridgeAndConfigureBrokers("BrokerA", "BrokerB", this.dynamicOnly, 6, true);
        bridgeAndConfigureBrokers("BrokerA", "BrokerC", this.dynamicOnly, 6, true);
        bridgeAndConfigureBrokers("BrokerB", "BrokerA", this.dynamicOnly, 6, true);
        bridgeAndConfigureBrokers("BrokerB", "BrokerC", this.dynamicOnly, 6, true);
        bridgeAndConfigureBrokers("BrokerC", "BrokerA", this.dynamicOnly, 6, true);
        bridgeAndConfigureBrokers("BrokerC", "BrokerB", this.dynamicOnly, 6, true);
        startAllBrokers();
        waitForBridgeFormation();
        ActiveMQDestination createDestination = createDestination("TEST.FOO", true);
        MessageConsumer createConsumer = createConsumer("BrokerA", createDestination("Consumer.A.TEST.FOO", false));
        MessageConsumer createConsumer2 = createConsumer("BrokerB", createDestination("Consumer.B.TEST.FOO", false));
        MessageConsumer createConsumer3 = createConsumer("BrokerC", createDestination("Consumer.C.TEST.FOO", false));
        Thread.sleep(2000L);
        sendMessages("BrokerA", createDestination, 1);
        MessageIdList consumerMessages = getConsumerMessages("BrokerA", createConsumer);
        MessageIdList consumerMessages2 = getConsumerMessages("BrokerB", createConsumer2);
        MessageIdList consumerMessages3 = getConsumerMessages("BrokerC", createConsumer3);
        consumerMessages.waitForMessagesToArrive(1);
        consumerMessages2.waitForMessagesToArrive(1);
        consumerMessages3.waitForMessagesToArrive(1);
        Thread.sleep(2000L);
        assertEquals(1, consumerMessages.getMessageCount());
        assertEquals(1, consumerMessages2.getMessageCount());
        assertEquals(1, consumerMessages3.getMessageCount());
        LOG.info("Restarting brokerA");
        JmsMultipleBrokersTestSupport.BrokerItem remove = this.brokers.remove("BrokerA");
        if (remove != null) {
            remove.destroy();
        }
        BrokerService createAndConfigureBroker = createAndConfigureBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?useJmx=false"));
        bridgeAndConfigureBrokers("BrokerA", "BrokerB", this.dynamicOnly, 6, true);
        bridgeAndConfigureBrokers("BrokerA", "BrokerC", this.dynamicOnly, 6, true);
        createAndConfigureBroker.start();
        waitForBridgeFormation();
        MessageConsumer createConsumer4 = createConsumer("BrokerA", createDestination("Consumer.A.TEST.FOO", false));
        LOG.info("recreated clientA");
        Thread.sleep(2000L);
        sendMessages("BrokerA", createDestination, 10);
        MessageIdList consumerMessages4 = getConsumerMessages("BrokerA", createConsumer4);
        consumerMessages4.waitForMessagesToArrive(10);
        consumerMessages2.waitForMessagesToArrive(11);
        consumerMessages3.waitForMessagesToArrive(11);
        Thread.sleep(2000L);
        LOG.info("MessagesA: " + consumerMessages4.getMessageIds());
        assertEquals(10, consumerMessages4.getMessageCount());
        assertEquals(11, consumerMessages2.getMessageCount());
        assertEquals(11, consumerMessages3.getMessageCount());
        LOG.info("Restarting brokerA again");
        JmsMultipleBrokersTestSupport.BrokerItem remove2 = this.brokers.remove("BrokerA");
        if (remove2 != null) {
            remove2.destroy();
        }
        BrokerService createAndConfigureBroker2 = createAndConfigureBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?useJmx=false"));
        bridgeAndConfigureBrokers("BrokerA", "BrokerB", this.dynamicOnly, 6, true);
        bridgeAndConfigureBrokers("BrokerA", "BrokerC", this.dynamicOnly, 6, true);
        createAndConfigureBroker2.start();
        waitForBridgeFormation();
        MessageConsumer createConsumer5 = createConsumer("BrokerA", createDestination("Consumer.A.TEST.FOO", false));
        LOG.info("recreated clientA again");
        Thread.sleep(2000L);
        MessageIdList consumerMessages5 = getConsumerMessages("BrokerA", createConsumer5);
        Thread.sleep(5000L);
        LOG.info("Extra MessagesA: " + consumerMessages5.getMessageIds());
        assertEquals(0, consumerMessages5.getMessageCount());
        assertEquals(11, consumerMessages2.getMessageCount());
        assertEquals(11, consumerMessages3.getMessageCount());
    }

    private void bridgeAndConfigureBrokers(String str, String str2, boolean z, int i, boolean z2) throws Exception {
        bridgeBrokers(str, str2, z, i, z2).setDecreaseNetworkConsumerPriority(true);
    }

    @Override // org.apache.activemq.JmsMultipleBrokersTestSupport, org.apache.activemq.AutoFailTestSupport
    public void setUp() throws Exception {
        super.setAutoFail(true);
        super.setUp();
        String str = new String("?useJmx=false&deleteAllMessagesOnStartup=true");
        createAndConfigureBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + str));
        createAndConfigureBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + str));
        createAndConfigureBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + str));
    }

    private BrokerService createAndConfigureBroker(URI uri) throws Exception {
        BrokerService createBroker = createBroker(uri);
        configurePersistenceAdapter(createBroker);
        DestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor();
        virtualDestinationInterceptor.setVirtualDestinations(new VirtualDestination[]{new VirtualTopic()});
        createBroker.setDestinationInterceptors(new DestinationInterceptor[]{virtualDestinationInterceptor});
        return createBroker;
    }

    protected void configurePersistenceAdapter(BrokerService brokerService) throws IOException {
        File file = new File("target/test-amq-data/kahadb/" + brokerService.getBrokerName());
        KahaDBStore kahaDBStore = new KahaDBStore();
        kahaDBStore.setDirectory(file);
        brokerService.setPersistenceAdapter(kahaDBStore);
    }
}
