package org.apache.activemq.network;

import java.net.URI;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.management.ObjectName;
import javax.management.QueryExp;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.transport.tcp.SslBrokerServiceTest;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/network/FailoverStaticNetworkTest.class */
public class FailoverStaticNetworkTest {
    protected static final Logger LOG = LoggerFactory.getLogger(FailoverStaticNetworkTest.class);
    private static final String DESTINATION_NAME = "testQ";
    protected BrokerService brokerA;
    protected BrokerService brokerA1;
    protected BrokerService brokerB;
    protected BrokerService brokerC;
    private SslContext sslContext;

    protected BrokerService createBroker(String str, String str2, String[] strArr) throws Exception {
        return createBroker(str, str2, strArr, null);
    }

    protected BrokerService createBroker(String str, String str2, String[] strArr, HashMap<String, String> hashMap) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.getManagementContext().setCreateConnector(false);
        brokerService.setSslContext(this.sslContext);
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.setBrokerName("Broker_" + str2);
        TransportConnector transportConnector = new TransportConnector();
        transportConnector.setUri(new URI(str + "://localhost:" + str2));
        ArrayList arrayList = new ArrayList();
        arrayList.add(transportConnector);
        brokerService.setTransportConnectors(arrayList);
        if (strArr != null && strArr.length > 0) {
            StringBuilder sb = new StringBuilder("static:(failover:(" + str + "://localhost:");
            sb.append(strArr[0]);
            for (int i = 1; i < strArr.length; i++) {
                sb.append("," + str + "://localhost:" + strArr[i]);
            }
            sb.append(")?maxReconnectAttempts=0)?useExponentialBackOff=false");
            NetworkConnector addNetworkConnector = brokerService.addNetworkConnector(sb.toString());
            if (hashMap != null) {
                IntrospectionSupport.setProperties(addNetworkConnector, hashMap);
            }
        }
        return brokerService;
    }

    private BrokerService createBroker(String str, String str2) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setUseJmx(false);
        brokerService.getManagementContext().setCreateConnector(false);
        brokerService.setBrokerName("Broker_Shared");
        TransportConnector transportConnector = new TransportConnector();
        transportConnector.setUri(new URI("tcp://localhost:" + str));
        brokerService.addConnector(transportConnector);
        brokerService.setDataDirectory(str2);
        return brokerService;
    }

    @Before
    public void setUp() throws Exception {
        this.sslContext = new SslContext(SslBrokerServiceTest.getKeyManager(), SslBrokerServiceTest.getTrustManager(), (SecureRandom) null);
    }

    @After
    public void tearDown() throws Exception {
        this.brokerB.stop();
        this.brokerB.waitUntilStopped();
        this.brokerA.stop();
        this.brokerA.waitUntilStopped();
        if (this.brokerA1 != null) {
            this.brokerA1.stop();
            this.brokerA1.waitUntilStopped();
        }
        if (this.brokerC != null) {
            this.brokerC.stop();
            this.brokerC.waitUntilStopped();
        }
    }

    @Test
    public void testSendReceiveAfterReconnect() throws Exception {
        this.brokerA = createBroker("tcp", "61617", null);
        this.brokerA.start();
        this.brokerB = createBroker("tcp", "62617", new String[]{"61617"});
        this.brokerB.start();
        doTestNetworkSendReceive();
        LOG.info("stopping brokerA");
        this.brokerA.stop();
        this.brokerA.waitUntilStopped();
        LOG.info("restarting brokerA");
        this.brokerA = createBroker("tcp", "61617", null);
        this.brokerA.start();
        doTestNetworkSendReceive();
    }

    @Test
    public void testSendReceiveFailover() throws Exception {
        this.brokerA = createBroker("tcp", "61617", null);
        this.brokerA.start();
        this.brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"});
        this.brokerB.start();
        doTestNetworkSendReceive();
        Set<String> networkBridgeMBeanName = getNetworkBridgeMBeanName(this.brokerB);
        Assert.assertEquals("only one bridgeName: " + networkBridgeMBeanName, 1L, networkBridgeMBeanName.size());
        LOG.info("stopping brokerA");
        this.brokerA.stop();
        this.brokerA.waitUntilStopped();
        LOG.info("restarting brokerA");
        this.brokerA = createBroker("tcp", "63617", null);
        this.brokerA.start();
        doTestNetworkSendReceive();
        Set<String> networkBridgeMBeanName2 = getNetworkBridgeMBeanName(this.brokerB);
        Assert.assertEquals("only one bridgeName: " + networkBridgeMBeanName2, 1L, networkBridgeMBeanName2.size());
        Assert.assertTrue("there was an addition", networkBridgeMBeanName.addAll(networkBridgeMBeanName2));
    }

    private Set<String> getNetworkBridgeMBeanName(BrokerService brokerService) throws Exception {
        HashSet hashSet = new HashSet();
        for (ObjectName objectName : brokerService.getManagementContext().queryNames((ObjectName) null, (QueryExp) null)) {
            if ("NetworkBridge".equals(objectName.getKeyProperty("Type"))) {
                hashSet.add(objectName.getKeyProperty("Name"));
            }
        }
        return hashSet;
    }

    @Test
    public void testSendReceiveFailoverDuplex() throws Exception {
        final Vector vector = new Vector();
        this.brokerA = createBroker("61617", "target/data/shared");
        this.brokerA.start();
        final BrokerService createBroker = createBroker("63617", "target/data/shared");
        this.brokerA1 = createBroker;
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        newCachedThreadPool.execute(new Runnable() { // from class: org.apache.activemq.network.FailoverStaticNetworkTest.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    createBroker.start();
                } catch (Exception e) {
                    e.printStackTrace();
                    vector.add(e);
                }
            }
        });
        newCachedThreadPool.shutdown();
        HashMap<String, String> hashMap = new HashMap<>();
        hashMap.put("duplex", "true");
        this.brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"}, hashMap);
        this.brokerB.start();
        doTestNetworkSendReceive(this.brokerA, this.brokerB);
        doTestNetworkSendReceive(this.brokerB, this.brokerA);
        LOG.info("stopping brokerA (master shared_broker)");
        this.brokerA.stop();
        this.brokerA.waitUntilStopped();
        this.brokerA1.waitUntilStarted();
        doTestNetworkSendReceive(this.brokerA1, this.brokerB);
        doTestNetworkSendReceive(this.brokerB, this.brokerA1);
        Assert.assertTrue("No unexpected exceptions " + vector, vector.isEmpty());
    }

    @Test
    public void testSendReceiveFailoverDuplexWithPIM() throws Exception {
        this.brokerA = createBroker("61617", "target/data/shared/pim");
        this.brokerA.start();
        final BrokerService createBroker = createBroker("63617", "target/data/shared/pim");
        this.brokerA1 = createBroker;
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        newCachedThreadPool.execute(new Runnable() { // from class: org.apache.activemq.network.FailoverStaticNetworkTest.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    createBroker.start();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        newCachedThreadPool.shutdown();
        HashMap<String, String> hashMap = new HashMap<>();
        hashMap.put("duplex", "true");
        hashMap.put("networkTTL", "2");
        this.brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"}, hashMap);
        this.brokerB.start();
        Assert.assertTrue("all props applied", hashMap.isEmpty());
        hashMap.put("duplex", "true");
        hashMap.put("networkTTL", "2");
        this.brokerC = createBroker("tcp", "64617", new String[]{"61617", "63617"}, hashMap);
        this.brokerC.start();
        Assert.assertTrue("all props applied a second time", hashMap.isEmpty());
        doTestNetworkSendReceive(this.brokerC, this.brokerB);
        doTestNetworkSendReceive(this.brokerB, this.brokerC);
        LOG.info("stopping brokerA (master shared_broker)");
        this.brokerA.stop();
        this.brokerA.waitUntilStopped();
        doTestNetworkSendReceive(this.brokerC, this.brokerB);
        doTestNetworkSendReceive(this.brokerB, this.brokerC);
        this.brokerC.stop();
        this.brokerC.waitUntilStopped();
    }

    @Test
    public void testSendReceive() throws Exception {
        this.brokerA = createBroker("tcp", "61617", null);
        this.brokerA.start();
        this.brokerB = createBroker("tcp", "62617", new String[]{"61617", "1111"});
        this.brokerB.start();
        doTestNetworkSendReceive();
    }

    @Test
    public void testSendReceiveSsl() throws Exception {
        this.brokerA = createBroker("ssl", "61617", null);
        this.brokerA.start();
        this.brokerB = createBroker("ssl", "62617", new String[]{"61617", "1111"});
        this.brokerB.start();
        doTestNetworkSendReceive();
    }

    @Test
    public void testRepeatedSendReceiveWithMasterSlaveAlternate() throws Exception {
        doTestRepeatedSendReceiveWithMasterSlaveAlternate(null);
    }

    @Test
    public void testRepeatedSendReceiveWithMasterSlaveAlternateDuplex() throws Exception {
        HashMap<String, String> hashMap = new HashMap<>();
        hashMap.put("duplex", "true");
        doTestRepeatedSendReceiveWithMasterSlaveAlternate(hashMap);
    }

    public void doTestRepeatedSendReceiveWithMasterSlaveAlternate(HashMap<String, String> hashMap) throws Exception {
        this.brokerB = createBroker("tcp", "62617", new String[]{"61610", "61611"}, hashMap);
        this.brokerB.start();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        newCachedThreadPool.execute(new Runnable() { // from class: org.apache.activemq.network.FailoverStaticNetworkTest.3
            @Override // java.lang.Runnable
            public void run() {
                while (!atomicBoolean.get()) {
                    try {
                        FailoverStaticNetworkTest.this.brokerA = FailoverStaticNetworkTest.this.createBroker("tcp", "61610", null);
                        FailoverStaticNetworkTest.this.brokerA.setBrokerName("Pair");
                        FailoverStaticNetworkTest.this.brokerA.setBrokerObjectName(new ObjectName(FailoverStaticNetworkTest.this.brokerA.getManagementContext().getJmxDomainName() + ":BrokerName=" + JMXSupport.encodeObjectNamePart("A") + ",Type=Broker"));
                        FailoverStaticNetworkTest.this.brokerA.getPersistenceAdapter().getLocker().setLockAcquireSleepInterval(1000L);
                        FailoverStaticNetworkTest.this.brokerA.start();
                        FailoverStaticNetworkTest.this.brokerA.waitUntilStopped();
                        FailoverStaticNetworkTest.this.brokerA1.waitUntilStarted();
                    } catch (Exception e) {
                        FailoverStaticNetworkTest.LOG.info("A create/start, unexpected: " + e, e);
                        return;
                    }
                }
            }
        });
        Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.network.FailoverStaticNetworkTest.4
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                return FailoverStaticNetworkTest.this.brokerA != null && FailoverStaticNetworkTest.this.brokerA.waitUntilStarted();
            }
        });
        newCachedThreadPool.execute(new Runnable() { // from class: org.apache.activemq.network.FailoverStaticNetworkTest.5
            @Override // java.lang.Runnable
            public void run() {
                while (!atomicBoolean.get()) {
                    try {
                        FailoverStaticNetworkTest.this.brokerA1 = FailoverStaticNetworkTest.this.createBroker("tcp", "61611", null);
                        FailoverStaticNetworkTest.this.brokerA1.setBrokerName("Pair");
                        FailoverStaticNetworkTest.this.brokerA1.setBrokerObjectName(new ObjectName(FailoverStaticNetworkTest.this.brokerA.getManagementContext().getJmxDomainName() + ":BrokerName=" + JMXSupport.encodeObjectNamePart("A1") + ",Type=Broker"));
                        FailoverStaticNetworkTest.this.brokerA1.getPersistenceAdapter().getLocker().setLockAcquireSleepInterval(1000L);
                        FailoverStaticNetworkTest.this.brokerA1.start();
                        FailoverStaticNetworkTest.this.brokerA1.waitUntilStopped();
                        FailoverStaticNetworkTest.this.brokerA.waitUntilStarted();
                    } catch (Exception e) {
                        FailoverStaticNetworkTest.LOG.info("A1 create/start, unexpected: " + e, e);
                        return;
                    }
                }
            }
        });
        for (int i = 0; i < 4; i++) {
            BrokerService brokerService = i % 2 == 0 ? this.brokerA : this.brokerA1;
            LOG.info("iteration: " + i + ", using: " + brokerService.getBrokerObjectName().getKeyProperty("BrokerName"));
            brokerService.waitUntilStarted();
            doTestNetworkSendReceive(this.brokerB, brokerService);
            LOG.info("Stopping " + brokerService.getBrokerObjectName().getKeyProperty("BrokerName"));
            brokerService.stop();
            brokerService.waitUntilStopped();
        }
        atomicBoolean.set(true);
        LOG.info("all done");
        newCachedThreadPool.shutdownNow();
    }

    private void doTestNetworkSendReceive() throws Exception, JMSException {
        doTestNetworkSendReceive(this.brokerB, this.brokerA);
    }

    private void doTestNetworkSendReceive(BrokerService brokerService, final BrokerService brokerService2) throws Exception, JMSException {
        LOG.info("Creating Consumer on the networked broker ..." + brokerService2);
        SslContext.setCurrentSslContext(this.sslContext);
        Connection createConnection = createConnectionFactory(brokerService2).createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        ActiveMQDestination activeMQDestination = (ActiveMQDestination) createSession.createQueue(DESTINATION_NAME);
        final MessageConsumer createConsumer = createSession.createConsumer(activeMQDestination);
        LOG.info("publishing to " + brokerService);
        sendMessageTo(activeMQDestination, brokerService);
        boolean waitFor = Wait.waitFor(new Wait.Condition() { // from class: org.apache.activemq.network.FailoverStaticNetworkTest.6
            @Override // org.apache.activemq.util.Wait.Condition
            public boolean isSatisified() throws Exception {
                Message receive = createConsumer.receive(5000L);
                FailoverStaticNetworkTest.LOG.info("from:  " + brokerService2.getBrokerObjectName().getKeyProperty("BrokerName") + ", received: " + receive);
                return receive != null;
            }
        });
        try {
            createConnection.close();
        } catch (JMSException e) {
        }
        Assert.assertTrue("consumer on A got message", waitFor);
    }

    private void sendMessageTo(ActiveMQDestination activeMQDestination, BrokerService brokerService) throws Exception {
        Connection createConnection = createConnectionFactory(brokerService).createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 1);
        createSession.createProducer(activeMQDestination).send(createSession.createTextMessage("Hi"));
        createConnection.close();
    }

    protected ConnectionFactory createConnectionFactory(BrokerService brokerService) throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(((TransportConnector) brokerService.getTransportConnectors().get(0)).getServer().getConnectURI().toString());
        activeMQConnectionFactory.setOptimizedMessageDispatch(true);
        activeMQConnectionFactory.setDispatchAsync(false);
        activeMQConnectionFactory.setUseAsyncSend(false);
        activeMQConnectionFactory.setOptimizeAcknowledge(false);
        activeMQConnectionFactory.setAlwaysSyncSend(true);
        return activeMQConnectionFactory;
    }
}
