package org.apache.activemq.network;

import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.virtual.CompositeTopic;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker;
import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.usecases.DurableSubProcessConcurrentCommitActivateNoDuplicateTest;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
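/*
 * Exercises durable-subscription synchronisation (setSyncDurableSubs(true)) across a duplex
 * network bridge between "localBroker" and "remoteBroker"; every test runs once per FLOW value.
 * Decompiled from: org/apache/activemq/network/DurableSyncNetworkBridgeTest.class
 */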
public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
    /** Class logger. */
    protected static final Logger LOG = LoggerFactory.getLogger(DurableSyncNetworkBridgeTest.class);
    /** Runtime-configuration broker of the remote broker; only assigned in doSetUpRemoteBroker when flow is FORWARD. */
    protected JavaRuntimeConfigurationBroker remoteRuntimeBroker;
    // broker1/session1 and broker2/session2 are direction-neutral aliases:
    // FORWARD -> broker1 = local broker, broker2 = remote broker; REVERSE -> the opposite
    // (see doSetUpLocalBroker / doSetUpRemoteBroker).
    private BrokerService broker1;
    private BrokerService broker2;
    private Session session1;
    private Session session2;
    /** Direction supplied by the Parameterized runner. */
    private final FLOW flow;
    // SSL store settings; the static initializer installs the same values as javax.net.ssl.* properties.
    public static final String KEYSTORE_TYPE = "jks";
    public static final String PASSWORD = "password";
    public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
    public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";
    /** Advisory broker looked up from the remote BrokerService in createRemoteBroker. */
    protected AdvisoryBroker remoteAdvisoryBroker;
    /** Topic used for the network connector's statically included destinations. */
    protected String staticIncludeTopics = "include.static.test";
    /** Topic pattern used for the network connector's dynamically included destinations. */
    protected String includedTopics = "include.test.>";
    /** A second topic name, used by the multi-topic tests. */
    protected String testTopicName2 = "include.test.bar2";
    // Per-test knobs; setUp() resets them and individual tests flip them before restarting brokers.
    /** Passed to NetworkConnector.setDynamicOnly. */
    private boolean dynamicOnly = false;
    /** Appended as the "forceDurable" option on the included destinations. */
    private boolean forceDurable = false;
    /** Passed to the useVirtualDestSubs setters on both brokers and the network connector. */
    private boolean useVirtualDestSubs = false;
    /** Appended as wireFormat.version on the remote broker's transport connector URI. */
    private byte remoteBrokerWireFormatVersion = 12;

    /** Fails any test that runs longer than 30 seconds. */
    @Rule
    public Timeout globalTimeout = new Timeout(30, TimeUnit.SECONDS);

    /**
     * Direction in which a test is exercised; decides which physical broker backs
     * broker1/session1 and which backs broker2/session2.
     */
    public enum FLOW {
        /** broker1/session1 refer to the local broker, broker2/session2 to the remote broker. */
        FORWARD,
        /** broker1/session1 refer to the remote broker, broker2/session2 to the local broker. */
        REVERSE
    }

    /**
     * Supplies the constructor arguments for the Parameterized runner.
     *
     * @return two single-element rows, one for each FLOW value (FORWARD first, then REVERSE)
     */
    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        Object[] forwardRow = {FLOW.FORWARD};
        Object[] reverseRow = {FLOW.REVERSE};
        return Arrays.asList(forwardRow, reverseRow);
    }

    /**
     * Creates a test instance for one bridge direction.
     *
     * @param flow direction to run the tests in; stored and read by the broker set-up methods
     */
    public DurableSyncNetworkBridgeTest(FLOW flow) {
        this.flow = flow;
    }

    /**
     * Resets every per-test knob to its default and starts both brokers on fresh data
     * directories with the network connector enabled.
     *
     * @throws Exception if a temporary folder cannot be created or a broker fails to start
     */
    @Before
    public void setUp() throws Exception {
        // Defaults; individual tests override these before restarting brokers.
        includedTopics = "include.test.>";
        staticIncludeTopics = "include.static.test";
        dynamicOnly = false;
        forceDurable = false;
        useVirtualDestSubs = false;
        remoteBrokerWireFormatVersion = 12;

        File localDataDir = tempFolder.newFolder();
        File remoteDataDir = tempFolder.newFolder();
        doSetUp(true, true, localDataDir, remoteDataDir);
    }

    /**
     * Runs after every test and delegates to doTearDown(), which this class does not define.
     *
     * @throws Exception propagated from doTearDown()
     */
    @After
    public void tearDown() throws Exception {
        doTearDown();
    }

    /**
     * With the bridge online, removing a durable subscription on broker1 must also bring the
     * count checked by assertNCDurableSubsCount on broker2 down to zero.
     */
    @Test
    public void testRemoveSubscriptionPropagate() throws Exception {
        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);

        // Create the durable subscription and immediately close its consumer.
        session1.createDurableSubscriber(topic, subName).close();
        assertSubscriptionsCount(broker1, topic, 1);
        assertNCDurableSubsCount(broker2, topic, 1);

        // Remove it on broker1 and expect both sides to report zero.
        removeSubscription(broker1, topic, subName);
        assertSubscriptionsCount(broker1, topic, 0);
        assertNCDurableSubsCount(broker2, topic, 0);
    }

    /**
     * Same as the plain propagate test, but both brokers are restarted (bridge enabled) between
     * creating and removing the subscription; the counts must survive the restart.
     */
    @Test
    public void testRemoveSubscriptionPropegateAfterRestart() throws Exception {
        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);

        session1.createDurableSubscriber(topic, subName).close();
        assertSubscriptionsCount(broker1, topic, 1);
        assertNCDurableSubsCount(broker2, topic, 1);

        // Restart both brokers with the network connector and wait for the bridge.
        restartBrokers(true);
        assertBridgeStarted();
        assertSubscriptionsCount(broker1, topic, 1);
        assertNCDurableSubsCount(broker2, topic, 1);

        removeSubscription(broker1, topic, subName);
        assertSubscriptionsCount(broker1, topic, 0);
        assertNCDurableSubsCount(broker2, topic, 0);
    }

    /**
     * Removes a durable subscription on broker1 while the brokers run without the bridge, then
     * restarts with the bridge and expects broker2's count to drop from 1 to 0 once it is up.
     */
    @Test
    public void testRemoveSubscriptionWithBridgeOffline() throws Exception {
        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);

        session1.createDurableSubscriber(topic, subName).close();
        assertSubscriptionsCount(broker1, topic, 1);
        assertNCDurableSubsCount(broker2, topic, 1);

        // Restart both brokers with "false" so a local broker gets no network connector.
        doTearDown();
        restartBroker(broker1, false);
        restartBroker(broker2, false);

        // Publish on broker2 while the bridge is down.
        MessageProducer producer = session2.createProducer(topic);
        for (int sent = 0; sent < 10; sent++) {
            TextMessage message = session2.createTextMessage("test");
            producer.send(message);
        }

        assertSubscriptionsCount(broker1, topic, 1);
        removeSubscription(broker1, topic, subName);
        assertSubscriptionsCount(broker1, topic, 0);

        // broker2 still reports the subscription until broker1 is back and the bridge has started.
        doTearDown();
        restartBroker(broker2, true);
        assertNCDurableSubsCount(broker2, topic, 1);
        restartBroker(broker1, true);
        assertBridgeStarted();
        assertNCDurableSubsCount(broker2, topic, 0);
    }

    /**
     * Removes the subscription on broker1 with the bridge down and the dynamically included
     * topics changed to "different.topic"; after the bridge restarts broker2's count must be 0.
     */
    @Test
    public void testRemoveSubscriptionWithBridgeOfflineIncludedChanged() throws Exception {
        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);

        session1.createDurableSubscriber(topic, subName).close();
        assertSubscriptionsCount(broker1, topic, 1);
        assertNCDurableSubsCount(broker2, topic, 1);

        doTearDown();
        // Connectors configured from here on no longer include the test topic.
        includedTopics = "different.topic";
        restartBroker(broker1, false);
        assertSubscriptionsCount(broker1, topic, 1);
        removeSubscription(broker1, topic, subName);
        assertSubscriptionsCount(broker1, topic, 0);

        restartBroker(broker2, true);
        assertNCDurableSubsCount(broker2, topic, 1);
        restartBroker(broker1, true);
        assertBridgeStarted();
        assertNCDurableSubsCount(broker2, topic, 0);
    }

    /**
     * Changes the dynamically included topics to "different.topic" while the bridge is down and
     * leaves the subscription on broker1 in place; after the bridge restarts broker2's count
     * must be 0 while broker1 still has its subscription.
     */
    @Test
    public void testSubscriptionRemovedAfterIncludedChanged() throws Exception {
        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);

        session1.createDurableSubscriber(topic, subName).close();
        assertSubscriptionsCount(broker1, topic, 1);
        assertNCDurableSubsCount(broker2, topic, 1);

        doTearDown();
        includedTopics = "different.topic";
        restartBroker(broker1, false);
        assertSubscriptionsCount(broker1, topic, 1);

        restartBroker(broker2, true);
        assertNCDurableSubsCount(broker2, topic, 1);
        restartBroker(broker1, true);
        assertBridgeStarted();

        assertNCDurableSubsCount(broker2, topic, 0);
        assertSubscriptionsCount(broker1, topic, 1);
    }

    /**
     * Uses forceDurable on the statically included topic, then changes the static include to
     * "different.topic" while the bridge is down; after the bridge restarts broker2's count for
     * the old topic must be 0 while broker1 keeps its own subscription.
     */
    @Test
    public void testSubscriptionRemovedAfterStaticChanged() throws Exception {
        forceDurable = true;
        restartBrokers(true);

        // Captured before staticIncludeTopics is changed below, so it names the old topic.
        final ActiveMQTopic staticTopic = new ActiveMQTopic(staticIncludeTopics);
        session1.createDurableSubscriber(staticTopic, subName).close();
        assertSubscriptionsCount(broker1, staticTopic, 1);
        assertNCDurableSubsCount(broker2, staticTopic, 1);

        doTearDown();
        staticIncludeTopics = "different.topic";
        restartBrokers(false);
        assertSubscriptionsCount(broker1, staticTopic, 1);
        assertNCDurableSubsCount(broker2, staticTopic, 1);

        // Publish on broker2 while no bridge is running.
        MessageProducer producer = session2.createProducer(staticTopic);
        for (int sent = 0; sent < 10; sent++) {
            TextMessage message = session2.createTextMessage("test");
            producer.send(message);
        }

        restartBroker(broker2, true);
        assertNCDurableSubsCount(broker2, staticTopic, 1);
        restartBroker(broker1, true);
        assertBridgeStarted();

        assertNCDurableSubsCount(broker2, staticTopic, 0);
        assertSubscriptionsCount(broker1, staticTopic, 1);
    }

    /**
     * While the bridge is down, removes the subscription on one topic and adds a new one on a
     * second topic; after the bridge restarts broker2 must report 0 for the first topic and 1
     * for the second.
     */
    @Test
    public void testAddAndRemoveSubscriptionWithBridgeOfflineMultiTopics() throws Exception {
        final ActiveMQTopic firstTopic = new ActiveMQTopic(testTopicName);
        final ActiveMQTopic secondTopic = new ActiveMQTopic(testTopicName2);

        session1.createDurableSubscriber(firstTopic, subName).close();
        assertSubscriptionsCount(broker1, firstTopic, 1);
        assertNCDurableSubsCount(broker2, firstTopic, 1);

        doTearDown();
        restartBroker(broker1, false);
        assertSubscriptionsCount(broker1, firstTopic, 1);

        // The subscriber for "sub2" is left open on purpose (no close()).
        session1.createDurableSubscriber(secondTopic, "sub2");
        removeSubscription(broker1, firstTopic, subName);
        assertSubscriptionsCount(broker1, firstTopic, 0);
        assertSubscriptionsCount(broker1, secondTopic, 1);

        // broker2 still has the pre-outage view until broker1 reconnects.
        restartBroker(broker2, true);
        assertNCDurableSubsCount(broker2, firstTopic, 1);
        assertNCDurableSubsCount(broker2, secondTopic, 0);

        restartBroker(broker1, true);
        assertBridgeStarted();
        assertNCDurableSubsCount(broker2, firstTopic, 0);
        assertNCDurableSubsCount(broker2, secondTopic, 1);
    }

    /**
     * Adds durable subscriptions on broker1 while the bridge is down (two on the first topic,
     * one on the second); after restarting with the bridge, broker2 must report exactly one per
     * included topic and none for the excluded topic.
     */
    @Test
    public void testAddSubscriptionsWithBridgeOffline() throws Exception {
        final ActiveMQTopic firstTopic = new ActiveMQTopic(testTopicName);
        final ActiveMQTopic secondTopic = new ActiveMQTopic(testTopicName2);
        final ActiveMQTopic excluded = new ActiveMQTopic(excludeTopicName);

        assertSubscriptionsCount(broker1, firstTopic, 0);
        assertNCDurableSubsCount(broker2, firstTopic, 0);

        doTearDown();
        restartBroker(broker1, false);
        assertSubscriptionsCount(broker1, firstTopic, 0);

        session1.createDurableSubscriber(firstTopic, subName).close();
        session1.createDurableSubscriber(firstTopic, "sub2").close();
        session1.createDurableSubscriber(secondTopic, "sub3").close();
        assertSubscriptionsCount(broker1, firstTopic, 2);
        assertSubscriptionsCount(broker1, secondTopic, 1);

        restartBrokers(true);
        assertBridgeStarted();
        // Two subscriptions on firstTopic still yield a count of 1 on broker2.
        assertNCDurableSubsCount(broker2, firstTopic, 1);
        assertNCDurableSubsCount(broker2, secondTopic, 1);
        assertNCDurableSubsCount(broker2, excluded, 0);
    }

    /**
     * Load variant: creates 10 durable subscriptions on each of 100 topics, removes all
     * subscriptions of the first 10 topics while the bridge is down, and checks after a restart
     * that broker2 reports 0 for those 10 topics and 1 for the remaining 90.
     */
    @Test
    public void testSyncLoadTest() throws Exception {
        final String topicPrefix = "include.test.";
        final String subPrefix = subName;
        final int topicCount = 100;
        final int subsPerTopic = 10;
        final int removedTopicCount = 10;

        for (int t = 0; t < topicCount; t++) {
            for (int s = 0; s < subsPerTopic; s++) {
                ActiveMQTopic topic = new ActiveMQTopic(topicPrefix + t);
                session1.createDurableSubscriber(topic, subPrefix + t + s).close();
            }
        }
        for (int t = 0; t < topicCount; t++) {
            assertNCDurableSubsCount(broker2, new ActiveMQTopic(topicPrefix + t), 1);
        }

        doTearDown();
        restartBroker(broker1, false);

        // Drop every subscription on the first removedTopicCount topics while offline.
        for (int t = 0; t < removedTopicCount; t++) {
            for (int s = 0; s < subsPerTopic; s++) {
                ActiveMQTopic topic = new ActiveMQTopic(topicPrefix + t);
                removeSubscription(broker1, topic, subPrefix + t + s);
            }
        }

        restartBrokers(true);
        for (int t = 0; t < removedTopicCount; t++) {
            assertNCDurableSubsCount(broker2, new ActiveMQTopic(topicPrefix + t), 0);
        }
        for (int t = removedTopicCount; t < topicCount; t++) {
            assertNCDurableSubsCount(broker2, new ActiveMQTopic(topicPrefix + t), 1);
        }
        assertBridgeStarted();
    }

    /**
     * With the remote connector's wireFormat.version set to 11, a subscription added while the
     * bridge is down is expected NOT to show up on broker2 after the restart (count stays 0),
     * although the bridge itself must still start.
     */
    @Test
    public void testAddSubscriptionsWithBridgeOfflineOpenWire11() throws Exception {
        // Takes effect the next time the remote broker is created.
        remoteBrokerWireFormatVersion = 11;
        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);

        assertSubscriptionsCount(broker1, topic, 0);
        assertNCDurableSubsCount(broker2, topic, 0);

        doTearDown();
        restartBroker(broker1, false);
        assertSubscriptionsCount(broker1, topic, 0);

        session1.createDurableSubscriber(topic, subName).close();
        assertSubscriptionsCount(broker1, topic, 1);

        restartBrokers(true);
        assertNCDurableSubsCount(broker2, topic, 0);
        assertBridgeStarted();
    }

    /**
     * With dynamicOnly enabled, a durable subscription whose subscriber was closed while the
     * bridge was down must not be reported on broker2 after the restart (count stays 0).
     */
    @Test
    public void testAddOfflineSubscriptionWithBridgeOfflineDynamicTrue() throws Exception {
        // Picked up by the next network connector that is configured.
        dynamicOnly = true;
        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);

        assertSubscriptionsCount(broker1, topic, 0);
        assertNCDurableSubsCount(broker2, topic, 0);

        doTearDown();
        restartBroker(broker1, false);
        assertSubscriptionsCount(broker1, topic, 0);

        session1.createDurableSubscriber(topic, subName).close();
        assertSubscriptionsCount(broker1, topic, 1);

        restartBrokers(true);
        assertNCDurableSubsCount(broker2, topic, 0);
        assertBridgeStarted();
    }

    /**
     * With dynamicOnly enabled, the closed subscription is not reported on broker2 after the
     * restart, but opening a subscriber for it again (left open) must raise the count to 1.
     */
    @Test
    public void testAddOnlineSubscriptionWithBridgeOfflineDynamicTrue() throws Exception {
        dynamicOnly = true;
        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);

        assertSubscriptionsCount(broker1, topic, 0);
        assertNCDurableSubsCount(broker2, topic, 0);

        doTearDown();
        restartBroker(broker1, false);
        assertSubscriptionsCount(broker1, topic, 0);

        session1.createDurableSubscriber(topic, subName).close();
        assertSubscriptionsCount(broker1, topic, 1);

        restartBrokers(true);
        assertNCDurableSubsCount(broker2, topic, 0);

        // Re-open the subscriber and keep it open.
        session1.createDurableSubscriber(topic, subName);
        assertNCDurableSubsCount(broker2, topic, 1);
        assertBridgeStarted();
    }

    /**
     * While the bridge is down, replaces the original subscription with "sub2" on the same
     * topic; after the restart broker2 must still report 1 for the topic and 0 for the excluded
     * topic.
     */
    @Test
    public void testAddAndRemoveSubscriptionsWithBridgeOffline() throws Exception {
        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
        final ActiveMQTopic excluded = new ActiveMQTopic(excludeTopicName);

        session1.createDurableSubscriber(topic, subName).close();
        assertSubscriptionsCount(broker1, topic, 1);
        assertNCDurableSubsCount(broker2, topic, 1);

        doTearDown();
        restartBroker(broker1, false);
        assertSubscriptionsCount(broker1, topic, 1);

        // Swap one subscription for another; the total on broker1 stays at 1.
        removeSubscription(broker1, topic, subName);
        session1.createDurableSubscriber(topic, "sub2").close();
        assertSubscriptionsCount(broker1, topic, 1);

        restartBrokers(true);
        assertNCDurableSubsCount(broker2, topic, 1);
        assertNCDurableSubsCount(broker2, excluded, 0);
        assertBridgeStarted();
    }

    /**
     * FORWARD only. Creates open subscribers while no bridge is running, restarts with the
     * bridge, re-opens the same subscribers and expects broker2 to report 1 for the included
     * topic and 0 for the excluded one.
     */
    @Test
    public void testAddOnlineSubscriptionsWithBridgeOffline() throws Exception {
        Assume.assumeTrue(flow == FLOW.FORWARD);

        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
        final ActiveMQTopic excluded = new ActiveMQTopic(excludeTopicName);

        assertSubscriptionsCount(broker1, topic, 0);
        assertNCDurableSubsCount(broker2, topic, 0);

        doTearDown();
        restartBrokers(false);
        assertSubscriptionsCount(broker1, topic, 0);

        // Subscribers are left open (no close()) so they are online subscriptions.
        session1.createDurableSubscriber(excluded, "sub-exclude");
        session1.createDurableSubscriber(topic, subName);
        session1.createDurableSubscriber(topic, "sub2");
        session1.createDurableSubscriber(topic, "sub3");
        assertSubscriptionsCount(broker1, topic, 3);

        restartBrokers(true);
        assertBridgeStarted();

        // Bring the same subscriptions online again on the new session.
        session1.createDurableSubscriber(topic, subName);
        session1.createDurableSubscriber(topic, "sub2");
        session1.createDurableSubscriber(topic, "sub3");
        session1.createDurableSubscriber(excluded, "sub-exclude");

        // Fixed one-second pause before checking the remote side.
        Thread.sleep(1000L);
        assertNCDurableSubsCount(broker2, topic, 1);
        assertNCDurableSubsCount(broker2, excluded, 0);
    }

    /**
     * Adds a second network connector ("networkConnector2") that dynamically includes only
     * "include.new.topic", waits for both connectors to have exactly one active bridge, and then
     * checks broker2's counts per topic and that one message sent on broker2 is dispatched.
     *
     * @throws Exception on any broker, JMS or connector failure
     */
    @Test
    public void testAddOnlineSubscriptionsTwoBridges() throws Exception {
        final String secondTopicName = "include.new.topic";
        ActiveMQTopic topic = new ActiveMQTopic(this.testTopicName);
        ActiveMQTopic excludeTopic = new ActiveMQTopic(this.excludeTopicName);
        ActiveMQTopic secondTopic = new ActiveMQTopic(secondTopicName);

        assertSubscriptionsCount(this.broker1, topic, 0);
        assertNCDurableSubsCount(this.broker2, topic, 0);

        // Subscribers are left open on purpose so they stay online for the whole test.
        this.session1.createDurableSubscriber(excludeTopic, "sub-exclude");
        this.session1.createDurableSubscriber(topic, this.subName);
        this.session1.createDurableSubscriber(topic, "sub2");
        this.session1.createDurableSubscriber(topic, "sub3");
        this.session1.createDurableSubscriber(secondTopic, "secondTopicSubName");
        assertSubscriptionsCount(this.broker1, topic, 3);
        assertSubscriptionsCount(this.broker1, secondTopic, 1);

        // Second connector: same base configuration, but its own name and dynamic include list.
        NetworkConnector secondConnector = configureLocalNetworkConnector();
        secondConnector.setName("networkConnector2");
        ArrayList<ActiveMQDestination> secondIncludes = new ArrayList<ActiveMQDestination>();
        secondIncludes.add(new ActiveMQTopic(secondTopicName + "?forceDurable=" + this.forceDurable));
        secondConnector.setDynamicallyIncludedDestinations(secondIncludes);
        this.localBroker.addNetworkConnector(secondConnector);
        secondConnector.start();

        // NOTE(review): the timeout is borrowed from an unrelated test class
        // (DurableSubProcessWithRestartTest.BROKER_RESTART); presumably a decompiler substitution
        // for a literal - confirm the value and inline it.
        Assert.assertTrue("both network connectors should have one active bridge", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return ((NetworkConnector) DurableSyncNetworkBridgeTest.this.localBroker.getNetworkConnectors().get(0)).activeBridges().size() == 1
                        && ((NetworkConnector) DurableSyncNetworkBridgeTest.this.localBroker.getNetworkConnectors().get(1)).activeBridges().size() == 1;
            }
        }, DurableSubProcessWithRestartTest.BROKER_RESTART, 500L));

        assertNCDurableSubsCount(this.broker2, secondTopic, 1);
        assertNCDurableSubsCount(this.broker2, topic, 1);
        assertNCDurableSubsCount(this.broker2, excludeTopic, 0);

        // One message published on broker2 must be dispatched for the second topic.
        this.session2.createProducer(secondTopic).send(this.session2.createTextMessage("test"));
        waitForDispatchFromLocalBroker(this.broker2.getDestination(secondTopic).getDestinationStatistics(), 1);
        assertLocalBrokerStatistics(this.broker2.getDestination(secondTopic).getDestinationStatistics(), 1);
    }

    /**
     * FORWARD only. With forceDurable and virtual-destination subscriptions enabled, a composite
     * topic on the remote broker forwards to the queue "include.test.bar.bridge". One message is
     * delivered while everything is up, 500 more are sent while the remote broker is stopped,
     * and after both brokers restart the remote queue must reach a message count of 501.
     */
    @Test(timeout = 60000)
    public void testVirtualDestSubForceDurableSync() throws Exception {
        Assume.assumeTrue(flow == FLOW.FORWARD);
        final String bridgeQueueName = "include.test.bar.bridge";

        forceDurable = true;
        useVirtualDestSubs = true;
        restartBrokers(true);

        // Install the composite topic on the remote broker through its runtime-configuration broker.
        VirtualDestination composite = createCompositeTopic(testTopicName, new ActiveMQQueue(bridgeQueueName));
        remoteRuntimeBroker.setVirtualDestinations(new VirtualDestination[]{composite}, true);

        MessageProducer producer = localSession.createProducer(included);
        TextMessage message = localSession.createTextMessage("test");
        DestinationStatistics localStats = localBroker.getDestination(included).getDestinationStatistics();
        DestinationStatistics remoteQueueStats =
                remoteBroker.getDestination(new ActiveMQQueue(bridgeQueueName)).getDestinationStatistics();

        waitForConsumerCount(localStats, 1);
        assertNCDurableSubsCount(localBroker, included, 1);

        // First message goes through while the bridge is up.
        producer.send(message);
        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);
        Assert.assertEquals("remote dest messages", 1L, remoteQueueStats.getMessages().getCount());

        // Send 500 more while the remote broker is down.
        stopRemoteBroker();
        for (int sent = 0; sent < 500; sent++) {
            producer.send(message);
        }

        // Restart remote first (re-installing the composite topic), then local with the bridge.
        stopLocalBroker();
        restartRemoteBroker();
        remoteRuntimeBroker.setVirtualDestinations(new VirtualDestination[]{composite}, true);
        restartLocalBroker(true);

        final DestinationStatistics restartedRemoteQueueStats =
                remoteBroker.getDestination(new ActiveMQQueue(bridgeQueueName)).getDestinationStatistics();
        Assert.assertTrue(Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return restartedRemoteQueueStats.getMessages().getCount() == 501;
            }
        }));
    }

    /**
     * FORWARD only. With forceDurable enabled, a plain (non-durable) consumer on the remote
     * broker leads to a count of 1 in assertNCDurableSubsCount on the local broker. Messages sent
     * while the network connector is stopped must be dispatched from the local broker after it is
     * restarted with the bridge.
     *
     * @throws Exception on any broker or JMS failure
     */
    @Test(timeout = 60000)
    public void testForceDurableTopicSubSync() throws Exception {
        Assume.assumeTrue(this.flow == FLOW.FORWARD);

        // Number of messages published while the bridge is down, and therefore the number
        // expected to be dispatched after the restart. The decompiled source took the expected
        // count from DurableSubProcessConcurrentCommitActivateNoDuplicateTest.SERVER_SLEEP, an
        // unrelated timing constant; this assumes that constant equals the 500 messages sent
        // below - TODO confirm.
        final int offlineMessageCount = 500;

        this.forceDurable = true;
        restartBrokers(true);

        // Consumer is left open so the remote side has demand for the included topic.
        this.remoteSession.createConsumer(this.included);
        MessageProducer producer = this.localSession.createProducer(this.included);
        TextMessage message = this.localSession.createTextMessage("test");
        DestinationStatistics localStats = this.localBroker.getDestination(this.included).getDestinationStatistics();
        waitForConsumerCount(localStats, 1);
        assertNCDurableSubsCount(this.localBroker, this.included, 1);

        // One message flows while the bridge is up.
        producer.send(message);
        waitForDispatchFromLocalBroker(localStats, 1);
        assertLocalBrokerStatistics(localStats, 1);

        // Stop the bridge and publish the backlog.
        this.localBroker.getNetworkConnectorByName("networkConnector").stop();
        for (int sent = 0; sent < offlineMessageCount; sent++) {
            producer.send(message);
        }

        stopLocalBroker();
        restartLocalBroker(true);

        // Statistics are read from the restarted broker instance.
        DestinationStatistics restartedLocalStats = this.localBroker.getDestination(this.included).getDestinationStatistics();
        waitForDispatchFromLocalBroker(restartedLocalStats, offlineMessageCount);
        assertLocalBrokerStatistics(restartedLocalStats, offlineMessageCount);
    }

    /**
     * Builds a forward-only composite topic.
     *
     * @param str                   name of the composite topic
     * @param activeMQDestinationArr destinations that messages are forwarded to
     * @return a new CompositeTopic with forwardOnly set and a mutable copy of the given destinations
     */
    protected CompositeTopic createCompositeTopic(String str, ActiveMQDestination... activeMQDestinationArr) {
        CompositeTopic compositeTopic = new CompositeTopic();
        compositeTopic.setName(str);
        compositeTopic.setForwardOnly(true);
        // Typed, mutable copy of the varargs array (replaces a raw ArrayList filled by a loop).
        compositeTopic.setForwardTo(new ArrayList<ActiveMQDestination>(Arrays.asList(activeMQDestinationArr)));
        return compositeTopic;
    }

    /**
     * Restarts the given broker, choosing the local or remote path by broker name.
     *
     * @param brokerService broker to restart; treated as the local broker when its name is
     *                      "localBroker", otherwise as the remote broker
     * @param z             forwarded to restartLocalBroker; ignored for the remote broker
     * @throws Exception if stopping or starting the broker fails
     */
    protected void restartBroker(BrokerService brokerService, boolean z) throws Exception {
        boolean isLocal = brokerService.getBrokerName().equals("localBroker");
        if (!isLocal) {
            restartRemoteBroker();
            return;
        }
        restartLocalBroker(z);
    }

    /**
     * Tears everything down and sets both brokers up again on their existing data directories,
     * without deleting stored messages.
     *
     * @param z forwarded to doSetUp as its second flag (whether the local broker gets a network connector)
     * @throws Exception if tear-down or set-up fails
     */
    protected void restartBrokers(boolean z) throws Exception {
        doTearDown();
        // Data directories are read from the broker objects that were just torn down.
        File localDataDir = localBroker.getDataDirectoryFile();
        File remoteDataDir = remoteBroker.getDataDirectoryFile();
        doSetUp(false, z, localDataDir, remoteDataDir);
    }

    /**
     * Starts the remote broker, then the local broker, and pauses for one second.
     *
     * @param z     passed to both brokers' setDeleteAllMessagesOnStartup
     * @param z2    whether the local broker is created with a network connector
     * @param file  data directory of the local broker
     * @param file2 data directory of the remote broker
     * @throws Exception if either broker fails to start
     */
    protected void doSetUp(boolean z, boolean z2, File file, File file2) throws Exception {
        included = new ActiveMQTopic(testTopicName);

        // Remote first: the local network connector is built from the remote connector's URI.
        // Port 0 is passed through to the remote connector URI - presumably "pick a free port"; confirm.
        final int remotePort = 0;
        doSetUpRemoteBroker(z, file2, remotePort);
        doSetUpLocalBroker(z, z2, file);

        Thread.sleep(1000L);
    }

    /**
     * Stops the local broker and starts a new one on the same data directory, keeping stored
     * messages.
     *
     * @param z whether the new local broker is created with a network connector
     * @throws Exception if stopping or starting fails
     */
    protected void restartLocalBroker(boolean z) throws Exception {
        stopLocalBroker();
        File dataDir = localBroker.getDataDirectoryFile();
        doSetUpLocalBroker(false, z, dataDir);
    }

    /**
     * Stops the remote broker and starts a new one on the same data directory and, when a
     * remote broker existed, on the same transport port.
     *
     * @throws Exception if stopping or starting fails
     */
    protected void restartRemoteBroker() throws Exception {
        // Remember the current port so the restarted broker listens on it again.
        int port = 0;
        if (remoteBroker != null) {
            TransportConnector firstConnector = (TransportConnector) remoteBroker.getTransportConnectors().get(0);
            port = firstConnector.getConnectUri().getPort();
        }

        stopRemoteBroker();
        // NOTE(review): remoteBroker is dereferenced here without the null guard used above.
        File dataDir = remoteBroker.getDataDirectoryFile();
        doSetUpRemoteBroker(false, dataDir, port);
    }

    /**
     * Creates and starts the local broker, opens a VM connection and session to it, and
     * publishes them as broker1/session1 (FORWARD) or broker2/session2 (otherwise).
     *
     * @param z    passed to setDeleteAllMessagesOnStartup
     * @param z2   whether the broker is created with a network connector; when true, waits up
     *             to 5 seconds for the first connector to have exactly one active bridge
     * @param file data directory of the broker
     * @throws Exception if the broker or the connection cannot be started
     */
    protected void doSetUpLocalBroker(boolean z, boolean z2, File file) throws Exception {
        localBroker = createLocalBroker(file, z2);
        localBroker.setDeleteAllMessagesOnStartup(z);
        localBroker.start();
        localBroker.waitUntilStarted();

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(localBroker.getVmConnectorURI());
        factory.setAlwaysSyncSend(true);
        factory.setDispatchAsync(false);
        localConnection = factory.createConnection();
        localConnection.setClientID("clientId");
        localConnection.start();

        if (z2) {
            // Best effort: the result of the wait is not checked here.
            Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    NetworkConnector firstConnector =
                            (NetworkConnector) DurableSyncNetworkBridgeTest.this.localBroker.getNetworkConnectors().get(0);
                    return firstConnector.activeBridges().size() == 1;
                }
            }, 5000L, 500L);
        }

        localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Map the local broker onto the direction-neutral aliases.
        boolean forward = flow.equals(FLOW.FORWARD);
        if (forward) {
            broker1 = localBroker;
            session1 = localSession;
        } else {
            broker2 = localBroker;
            session2 = localSession;
        }
    }

    /**
     * Creates and starts the remote broker, opens a VM connection and session to it, and
     * publishes them as broker2/session2 (FORWARD) or broker1/session1 (otherwise).
     *
     * @param z    passed to setDeleteAllMessagesOnStartup
     * @param file data directory of the broker
     * @param i    port for the broker's transport connector
     * @throws Exception if the broker or the connection cannot be started
     */
    protected void doSetUpRemoteBroker(boolean z, File file, int i) throws Exception {
        this.remoteBroker = createRemoteBroker(file, i);
        this.remoteBroker.setDeleteAllMessagesOnStartup(z);
        this.remoteBroker.start();
        this.remoteBroker.waitUntilStarted();
        this.remoteConnection = new ActiveMQConnectionFactory(this.remoteBroker.getVmConnectorURI()).createConnection();
        this.remoteConnection.setClientID("clientId");
        this.remoteConnection.start();
        this.remoteSession = this.remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        if (!this.flow.equals(FLOW.FORWARD)) {
            this.broker1 = this.remoteBroker;
            this.session1 = this.remoteSession;
        } else {
            this.broker2 = this.remoteBroker;
            this.session2 = this.remoteSession;
            // Explicit downcast: Broker.getAdaptor is presumably declared to return Broker
            // (confirm against the ActiveMQ version in use), so the assignment needs it.
            this.remoteRuntimeBroker = (JavaRuntimeConfigurationBroker) this.remoteBroker.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class);
        }
    }

    /**
     * Builds (but does not start) the broker named "localBroker" with a KahaDB store in the
     * given directory and an "auto+nio+ssl" transport connector on port 0.
     *
     * @param file data and KahaDB directory
     * @param z    whether to add the network connector from configureLocalNetworkConnector()
     * @return the configured, not yet started broker
     * @throws Exception if a connector cannot be added
     */
    protected BrokerService createLocalBroker(File file, boolean z) throws Exception {
        BrokerService local = new BrokerService();
        local.setMonitorConnectionSplits(true);
        local.setBrokerName("localBroker");
        local.setDataDirectoryFile(file);

        // KahaDB persistence with the PERIODIC journal disk-sync strategy.
        KahaDBPersistenceAdapter store = new KahaDBPersistenceAdapter();
        store.setDirectory(file);
        store.setJournalDiskSyncStrategy(Journal.JournalDiskSyncStrategy.PERIODIC.name());
        local.setPersistenceAdapter(store);

        local.setUseVirtualDestSubs(useVirtualDestSubs);
        local.setUseVirtualDestSubsOnCreation(useVirtualDestSubs);

        if (z) {
            NetworkConnector bridge = configureLocalNetworkConnector();
            local.addNetworkConnector(bridge);
        }
        local.addConnector("auto+nio+ssl://localhost:0");
        return local;
    }

    /**
     * Builds the duplex network connector ("networkConnector") that points at the remote
     * broker's first transport connector, with durable-subscription sync enabled.
     *
     * The static and dynamic include lists each hold one topic built from staticIncludeTopics /
     * includedTopics with the current forceDurable value appended as an option; the exclude
     * list holds excludeTopicName.
     *
     * @return the configured connector, not yet added to a broker or started
     * @throws Exception if the remote connect URI cannot be obtained or parsed
     */
    protected NetworkConnector configureLocalNetworkConnector() throws Exception {
        DiscoveryNetworkConnector discoveryNetworkConnector = new DiscoveryNetworkConnector(new URI("static:(" + ((TransportConnector) this.remoteBroker.getTransportConnectors().get(0)).getConnectUri() + ")"));
        discoveryNetworkConnector.setName("networkConnector");
        discoveryNetworkConnector.setDynamicOnly(this.dynamicOnly);
        discoveryNetworkConnector.setDecreaseNetworkConsumerPriority(false);
        discoveryNetworkConnector.setConduitSubscriptions(true);
        discoveryNetworkConnector.setDuplex(true);
        discoveryNetworkConnector.setStaticBridge(false);
        discoveryNetworkConnector.setSyncDurableSubs(true);
        discoveryNetworkConnector.setUseVirtualDestSubs(this.useVirtualDestSubs);

        // Typed lists instead of raw ArrayList so the element type is checked at compile time.
        ArrayList<ActiveMQDestination> staticIncludes = new ArrayList<ActiveMQDestination>();
        staticIncludes.add(new ActiveMQTopic(this.staticIncludeTopics + "?forceDurable=" + this.forceDurable));
        discoveryNetworkConnector.setStaticallyIncludedDestinations(staticIncludes);

        ArrayList<ActiveMQDestination> dynamicIncludes = new ArrayList<ActiveMQDestination>();
        dynamicIncludes.add(new ActiveMQTopic(this.includedTopics + "?forceDurable=" + this.forceDurable));
        discoveryNetworkConnector.setDynamicallyIncludedDestinations(dynamicIncludes);

        ArrayList<ActiveMQDestination> excludes = new ArrayList<ActiveMQDestination>();
        excludes.add(new ActiveMQTopic(this.excludeTopicName));
        discoveryNetworkConnector.setExcludedDestinations(excludes);
        return discoveryNetworkConnector;
    }

    /**
     * Builds (but does not start) the broker named "remoteBroker" with JMX disabled, a KahaDB
     * store in the given directory and an "auto+nio+ssl" transport connector on the given port.
     * As a side effect, remoteAdvisoryBroker is set from the new broker.
     *
     * @param file data and KahaDB directory
     * @param i    port for the transport connector
     * @return the configured, not yet started broker
     * @throws Exception if the broker stack or the connector cannot be created
     */
    protected BrokerService createRemoteBroker(File file, int i) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName("remoteBroker");
        brokerService.setUseJmx(false);
        brokerService.setDataDirectoryFile(file);
        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
        kahaDBPersistenceAdapter.setDirectory(file);
        kahaDBPersistenceAdapter.setJournalDiskSyncStrategy(Journal.JournalDiskSyncStrategy.PERIODIC.name());
        brokerService.setPersistenceAdapter(kahaDBPersistenceAdapter);
        brokerService.setUseVirtualDestSubs(this.useVirtualDestSubs);
        brokerService.setUseVirtualDestSubsOnCreation(this.useVirtualDestSubs);
        if (this.useVirtualDestSubs) {
            // The runtime-configuration plugin is only installed for virtual-destination tests.
            brokerService.setPlugins(new BrokerPlugin[]{new JavaRuntimeConfigurationPlugin()});
        }
        // Explicit downcast: Broker.getAdaptor is presumably declared to return Broker
        // (confirm against the ActiveMQ version in use), so the assignment needs it.
        this.remoteAdvisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
        // The wire-format version is configurable so tests can run the remote side on an older version.
        brokerService.addConnector("auto+nio+ssl://localhost:" + i + "?wireFormat.cacheSize=2048&wireFormat.version=" + this.remoteBrokerWireFormatVersion);
        return brokerService;
    }

    // Installs the JSSE trust/key store system properties once per class load, using this
    // class's own constants instead of repeating their literal values.
    static {
        System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
        System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
        System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
        System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
    }
}
