/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.network;

import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.management.ObjectName;
import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.tcp.SslBrokerServiceTest;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises a network connector configured as
 * {@code static:(failover:(...)?maxReconnectAttempts=0)?useExponentialBackOff=false}
 * between locally started brokers, including stopping and replacing the
 * remote end of the bridge.
 *
 * <p>Each test starts the brokers it needs on fixed localhost ports and
 * {@link #tearDown()} stops whichever of them were created.
 */
public class FailoverStaticNetworkTest {
    protected static final Logger LOG = LoggerFactory.getLogger(FailoverStaticNetworkTest.class);
    private static final String DESTINATION_NAME = "testQ";

    // brokerA and brokerA1 are (re)assigned from executor threads in
    // doTestRepeatedSendReceiveWithMasterSlaveAlternate and read from the test
    // thread, hence volatile.
    protected volatile BrokerService brokerA;
    protected volatile BrokerService brokerA1;
    protected BrokerService brokerB;
    protected BrokerService brokerC;
    private SslContext sslContext;

    /**
     * Creates (but does not start) a broker without extra network connector
     * properties.
     *
     * @param scheme transport scheme used for the listener and the bridge URIs
     * @param listenPort localhost port the broker listens on
     * @param networkToPorts localhost ports to bridge to, or {@code null} for none
     * @return the configured, not yet started broker
     * @throws Exception if the broker cannot be configured
     */
    protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts) throws Exception {
        return this.createBroker(scheme, listenPort, networkToPorts, null);
    }

    /**
     * Creates (but does not start) a broker named {@code Broker_<listenPort>}
     * with one transport connector and, when ports are given, one network
     * connector that fails over between those ports.
     *
     * @param scheme transport scheme used for the listener and the bridge URIs
     * @param listenPort localhost port the broker listens on
     * @param networkToPorts localhost ports to bridge to, or {@code null}/empty for none
     * @param networkProps bean properties applied to the network connector, or
     *        {@code null}; the tests below expect applied entries to be removed
     *        from this map
     * @return the configured, not yet started broker
     * @throws Exception if the broker cannot be configured
     */
    protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts, HashMap<String, String> networkProps) throws Exception {
        BrokerService broker = new BrokerService();
        broker.getManagementContext().setCreateConnector(false);
        broker.setSslContext(this.sslContext);
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setBrokerName("Broker_" + listenPort);

        TransportConnector transportConnector = new TransportConnector();
        transportConnector.setUri(new URI(scheme + "://localhost:" + listenPort));
        ArrayList<TransportConnector> transportConnectors = new ArrayList<TransportConnector>();
        transportConnectors.add(transportConnector);
        broker.setTransportConnectors(transportConnectors);

        if (networkToPorts != null && networkToPorts.length > 0) {
            StringBuilder builder = new StringBuilder("static:(failover:(");
            for (int i = 0; i < networkToPorts.length; ++i) {
                if (i > 0) {
                    builder.append(',');
                }
                builder.append(scheme).append("://localhost:").append(networkToPorts[i]);
            }
            builder.append(")?maxReconnectAttempts=0)?useExponentialBackOff=false");
            NetworkConnector nc = broker.addNetworkConnector(builder.toString());
            if (networkProps != null) {
                IntrospectionSupport.setProperties(nc, networkProps);
            }
        }
        return broker;
    }

    /**
     * Creates (but does not start) a JMX-less broker named
     * {@code Broker_Shared} that listens on tcp and stores its data under
     * {@code dataDir}. Two brokers created with the same directory are used
     * by the duplex tests as a pair where only one is active at a time.
     *
     * @param listenPort localhost tcp port the broker listens on
     * @param dataDir data directory handed to the broker
     * @return the configured, not yet started broker
     * @throws Exception if the broker cannot be configured
     */
    private BrokerService createBroker(String listenPort, String dataDir) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setUseJmx(false);
        broker.getManagementContext().setCreateConnector(false);
        broker.setBrokerName("Broker_Shared");
        TransportConnector connector = new TransportConnector();
        connector.setUri(new URI("tcp://localhost:" + listenPort));
        broker.addConnector(connector);
        broker.setDataDirectory(dataDir);
        return broker;
    }

    /**
     * Creates (but does not start) one member of the alternating "Pair" used
     * by {@link #doTestRepeatedSendReceiveWithMasterSlaveAlternate(HashMap)}.
     * The broker is fully configured before it is returned so that it can be
     * published to other threads safely.
     *
     * @param listenPort localhost tcp port the broker listens on
     * @param jmxBrokerName value of the {@code brokerName} key in the broker's JMX object name
     * @return the configured, not yet started broker
     * @throws Exception if the broker cannot be configured
     */
    private BrokerService createPairMember(String listenPort, String jmxBrokerName) throws Exception {
        BrokerService broker = this.createBroker("tcp", listenPort, null);
        broker.setBrokerName("Pair");
        // Derive the JMX domain from the broker being configured rather than
        // from a field that another thread may be replacing concurrently.
        broker.setBrokerObjectName(new ObjectName(broker.getManagementContext().getJmxDomainName()
                + ":brokerName=" + JMXSupport.encodeObjectNamePart(jmxBrokerName) + ",Type=Broker"));
        ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getLocker().setLockAcquireSleepInterval(1000L);
        return broker;
    }

    /**
     * Starts {@code broker} on a background thread because {@code start()} is
     * expected to block while its partner is active (TODO confirm against the
     * store locker in use). Any exception from {@code start()} is logged and
     * recorded in {@code errors}.
     *
     * @param broker broker to start asynchronously
     * @param errors thread-safe sink for start failures
     */
    private void startInBackground(final BrokerService broker, final Vector<Exception> errors) {
        ExecutorService executor = Executors.newCachedThreadPool();
        executor.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    broker.start();
                } catch (Exception e) {
                    LOG.error("Failed to start " + broker, e);
                    errors.add(e);
                }
            }
        });
        // No further tasks; the submitted start still runs to completion.
        executor.shutdown();
    }

    /** Builds the SSL context shared by every broker and client in a test. */
    @Before
    public void setUp() throws Exception {
        KeyManager[] km = SslBrokerServiceTest.getKeyManager();
        TrustManager[] tm = SslBrokerServiceTest.getTrustManager();
        this.sslContext = new SslContext(km, tm, null);
    }

    /**
     * Stops every broker the test created. Brokers that were never assigned
     * are skipped, and a failure to stop one broker does not prevent the
     * others from being stopped; the first failure is rethrown afterwards.
     */
    @After
    public void tearDown() throws Exception {
        Exception firstFailure = null;
        BrokerService[] brokers = new BrokerService[]{this.brokerB, this.brokerA, this.brokerA1, this.brokerC};
        for (BrokerService broker : brokers) {
            if (broker == null) {
                continue;
            }
            try {
                broker.stop();
                broker.waitUntilStopped();
            } catch (Exception e) {
                LOG.warn("Failed to stop " + broker, e);
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** The bridge from B must work again after A is stopped and recreated on the same port. */
    @Test
    public void testSendReceiveAfterReconnect() throws Exception {
        this.brokerA = this.createBroker("tcp", "61617", null);
        this.brokerA.start();
        this.brokerB = this.createBroker("tcp", "62617", new String[]{"61617"});
        this.brokerB.start();
        this.doTestNetworkSendReceive();

        LOG.info("stopping brokerA");
        this.brokerA.stop();
        this.brokerA.waitUntilStopped();

        LOG.info("restarting brokerA");
        this.brokerA = this.createBroker("tcp", "61617", null);
        this.brokerA.start();
        this.doTestNetworkSendReceive();
    }

    /**
     * The bridge from B must fail over to the second configured port when A
     * is replaced by a broker listening there, and the bridge MBean name must
     * change as a result.
     */
    @Test
    public void testSendReceiveFailover() throws Exception {
        this.brokerA = this.createBroker("tcp", "61617", null);
        this.brokerA.start();
        this.brokerB = this.createBroker("tcp", "62617", new String[]{"61617", "63617"});
        this.brokerB.start();
        this.doTestNetworkSendReceive();

        Set<String> bridgeNames = this.getNetworkBridgeMBeanName(this.brokerB);
        Assert.assertEquals("only one bridgeName: " + bridgeNames, 1, bridgeNames.size());

        LOG.info("stopping brokerA");
        this.brokerA.stop();
        this.brokerA.waitUntilStopped();

        LOG.info("restarting brokerA");
        this.brokerA = this.createBroker("tcp", "63617", null);
        this.brokerA.start();
        this.doTestNetworkSendReceive();

        Set<String> otherBridgeNames = this.getNetworkBridgeMBeanName(this.brokerB);
        Assert.assertEquals("only one bridgeName: " + otherBridgeNames, 1, otherBridgeNames.size());
        // addAll returns true only if the second name was not already present.
        Assert.assertTrue("there was an addition", bridgeNames.addAll(otherBridgeNames));
    }

    /**
     * Collects the {@code networkBridge} key of every MBean registered in the
     * given broker's management context that has one.
     *
     * @param brokerB broker whose management context is queried
     * @return the distinct {@code networkBridge} key values found
     * @throws Exception if the MBean query fails
     */
    private Set<String> getNetworkBridgeMBeanName(BrokerService brokerB) throws Exception {
        HashSet<String> names = new HashSet<String>();
        for (ObjectName objectName : brokerB.getManagementContext().queryNames(null, null)) {
            String bridgeName = objectName.getKeyProperty("networkBridge");
            if (bridgeName != null) {
                names.add(bridgeName);
            }
        }
        return names;
    }

    /**
     * A duplex bridge from B must carry messages in both directions to the
     * active member of a pair sharing one data directory, before and after
     * the first member is stopped.
     */
    @Test
    public void testSendReceiveFailoverDuplex() throws Exception {
        final Vector<Exception> errors = new Vector<Exception>();
        String dataDir = "target/data/shared";
        this.brokerA = this.createBroker("61617", dataDir);
        this.brokerA.start();

        final BrokerService slave = this.createBroker("63617", dataDir);
        this.brokerA1 = slave;
        this.startInBackground(slave, errors);

        HashMap<String, String> networkConnectorProps = new HashMap<String, String>();
        networkConnectorProps.put("duplex", "true");
        this.brokerB = this.createBroker("tcp", "62617", new String[]{"61617", "63617"}, networkConnectorProps);
        this.brokerB.start();

        this.doTestNetworkSendReceive(this.brokerA, this.brokerB);
        this.doTestNetworkSendReceive(this.brokerB, this.brokerA);

        LOG.info("stopping brokerA (master shared_broker)");
        this.brokerA.stop();
        this.brokerA.waitUntilStopped();
        this.brokerA1.waitUntilStarted();

        this.doTestNetworkSendReceive(this.brokerA1, this.brokerB);
        this.doTestNetworkSendReceive(this.brokerB, this.brokerA1);
        Assert.assertTrue("No unexpected exceptions " + errors, errors.isEmpty());
    }

    /**
     * Two brokers (B and C) each hold a duplex bridge with networkTTL 2 to a
     * pair sharing one data directory; messages must flow between B and C
     * through the pair before and after its first member is stopped.
     */
    @Test
    public void testSendReceiveFailoverDuplexWithPIM() throws Exception {
        final Vector<Exception> errors = new Vector<Exception>();
        String dataDir = "target/data/shared/pim";
        this.brokerA = this.createBroker("61617", dataDir);
        this.brokerA.start();

        final BrokerService slave = this.createBroker("63617", dataDir);
        this.brokerA1 = slave;
        this.startInBackground(slave, errors);

        HashMap<String, String> networkConnectorProps = new HashMap<String, String>();
        networkConnectorProps.put("duplex", "true");
        networkConnectorProps.put("networkTTL", "2");
        this.brokerB = this.createBroker("tcp", "62617", new String[]{"61617", "63617"}, networkConnectorProps);
        this.brokerB.start();
        Assert.assertTrue("all props applied", networkConnectorProps.isEmpty());

        networkConnectorProps.put("duplex", "true");
        networkConnectorProps.put("networkTTL", "2");
        this.brokerC = this.createBroker("tcp", "64617", new String[]{"61617", "63617"}, networkConnectorProps);
        this.brokerC.start();
        Assert.assertTrue("all props applied a second time", networkConnectorProps.isEmpty());

        this.doTestNetworkSendReceive(this.brokerC, this.brokerB);
        this.doTestNetworkSendReceive(this.brokerB, this.brokerC);

        LOG.info("stopping brokerA (master shared_broker)");
        this.brokerA.stop();
        this.brokerA.waitUntilStopped();

        this.doTestNetworkSendReceive(this.brokerC, this.brokerB);
        this.doTestNetworkSendReceive(this.brokerB, this.brokerC);

        this.brokerC.stop();
        this.brokerC.waitUntilStopped();
        // Surface a failed background start instead of only printing it.
        Assert.assertTrue("No unexpected exceptions " + errors, errors.isEmpty());
    }

    /** The bridge must work when the second failover port (1111) has no listener. */
    @Test
    public void testSendReceive() throws Exception {
        this.brokerA = this.createBroker("tcp", "61617", null);
        this.brokerA.start();
        this.brokerB = this.createBroker("tcp", "62617", new String[]{"61617", "1111"});
        this.brokerB.start();
        this.doTestNetworkSendReceive();
    }

    /** Same as {@link #testSendReceive()} but over the ssl transport. */
    @Test
    public void testSendReceiveSsl() throws Exception {
        this.brokerA = this.createBroker("ssl", "61617", null);
        this.brokerA.start();
        this.brokerB = this.createBroker("ssl", "62617", new String[]{"61617", "1111"});
        this.brokerB.start();
        this.doTestNetworkSendReceive();
    }

    @Test
    public void testRepeatedSendReceiveWithMasterSlaveAlternate() throws Exception {
        this.doTestRepeatedSendReceiveWithMasterSlaveAlternate(null);
    }

    @Test
    public void testRepeatedSendReceiveWithMasterSlaveAlternateDuplex() throws Exception {
        HashMap<String, String> networkConnectorProps = new HashMap<String, String>();
        networkConnectorProps.put("duplex", "true");
        this.doTestRepeatedSendReceiveWithMasterSlaveAlternate(networkConnectorProps);
    }

    /**
     * Runs two background loops that keep recreating brokers A (port 61610)
     * and A1 (port 61611), then four times sends from B to whichever of the
     * two is selected for the iteration and stops it so the other takes over.
     *
     * <p>The background loops are always told to finish, even when an
     * assertion fails, so they cannot keep creating brokers after the test.
     *
     * @param networkConnectorProps properties for B's network connector, or {@code null}
     * @throws Exception if a broker operation or the send/receive check fails
     */
    public void doTestRepeatedSendReceiveWithMasterSlaveAlternate(HashMap<String, String> networkConnectorProps) throws Exception {
        this.brokerB = this.createBroker("tcp", "62617", new String[]{"61610", "61611"}, networkConnectorProps);
        this.brokerB.start();

        final AtomicBoolean done = new AtomicBoolean(false);
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            executorService.execute(new Runnable() {

                @Override
                public void run() {
                    try {
                        while (!done.get()) {
                            BrokerService broker = FailoverStaticNetworkTest.this.createPairMember("61610", "A");
                            // Publish only once fully configured.
                            FailoverStaticNetworkTest.this.brokerA = broker;
                            broker.start();
                            broker.waitUntilStopped();
                            FailoverStaticNetworkTest.this.brokerA1.waitUntilStarted();
                        }
                    } catch (Exception e) {
                        LOG.info("A create/start, unexpected: " + e, e);
                    }
                }
            });
            boolean aStarted = Wait.waitFor(new Wait.Condition() {

                @Override
                public boolean isSatisified() throws Exception {
                    BrokerService broker = FailoverStaticNetworkTest.this.brokerA;
                    return broker != null && broker.waitUntilStarted();
                }
            });
            Assert.assertTrue("brokerA started", aStarted);

            executorService.execute(new Runnable() {

                @Override
                public void run() {
                    try {
                        while (!done.get()) {
                            BrokerService broker = FailoverStaticNetworkTest.this.createPairMember("61611", "A1");
                            // Publish only once fully configured.
                            FailoverStaticNetworkTest.this.brokerA1 = broker;
                            broker.start();
                            broker.waitUntilStopped();
                            FailoverStaticNetworkTest.this.brokerA.waitUntilStarted();
                        }
                    } catch (Exception e) {
                        LOG.info("A1 create/start, unexpected: " + e, e);
                    }
                }
            });
            // The loop below dereferences brokerA1 on its second iteration.
            boolean a1Created = Wait.waitFor(new Wait.Condition() {

                @Override
                public boolean isSatisified() throws Exception {
                    return FailoverStaticNetworkTest.this.brokerA1 != null;
                }
            });
            Assert.assertTrue("brokerA1 created", a1Created);

            for (int i = 0; i < 4; ++i) {
                BrokerService currentMaster = i % 2 == 0 ? this.brokerA : this.brokerA1;
                LOG.info("iteration: " + i + ", using: " + currentMaster.getBrokerObjectName().getKeyProperty("brokerName"));
                currentMaster.waitUntilStarted();
                this.doTestNetworkSendReceive(this.brokerB, currentMaster);
                LOG.info("Stopping " + currentMaster.getBrokerObjectName().getKeyProperty("brokerName"));
                currentMaster.stop();
                currentMaster.waitUntilStopped();
            }
        } finally {
            done.set(true);
            LOG.info("all done");
            executorService.shutdownNow();
        }
    }

    /** Sends to broker B and expects the message on a consumer attached to broker A. */
    private void doTestNetworkSendReceive() throws Exception {
        this.doTestNetworkSendReceive(this.brokerB, this.brokerA);
    }

    /**
     * Attaches a queue consumer to {@code from}, sends one message to
     * {@code to}, and asserts that the consumer receives a message. The
     * consumer connection is closed whether or not the check succeeds.
     *
     * @param to broker the message is published to
     * @param from broker the consumer is connected to
     * @throws Exception if a JMS operation fails
     */
    private void doTestNetworkSendReceive(BrokerService to, final BrokerService from) throws Exception {
        LOG.info("Creating Consumer on the networked broker ..." + from);
        SslContext.setCurrentSslContext(this.sslContext);
        ConnectionFactory consFactory = this.createConnectionFactory(from);
        Connection consConn = consFactory.createConnection();
        boolean gotMessage;
        try {
            consConn.start();
            Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME);
            final MessageConsumer consumer = consSession.createConsumer(destination);

            LOG.info("publishing to " + to);
            this.sendMessageTo(destination, to);

            gotMessage = Wait.waitFor(new Wait.Condition() {

                @Override
                public boolean isSatisified() throws Exception {
                    Message message = consumer.receive(5000L);
                    LOG.info("from:  " + from.getBrokerObjectName().getKeyProperty("brokerName") + ", received: " + message);
                    return message != null;
                }
            });
            if (!gotMessage) {
                // Capture where every thread is stuck before failing the test.
                AutoFailTestSupport.dumpAllThreads("noMessage");
            }
        } finally {
            closeQuietly(consConn);
        }
        Assert.assertTrue("consumer on A got message", gotMessage);
    }

    /**
     * Publishes a single text message {@code "Hi"} to {@code destination} via
     * a fresh connection to {@code brokerService}; the connection is always closed.
     *
     * @param destination queue to publish to
     * @param brokerService broker to connect to
     * @throws Exception if a JMS operation fails
     */
    private void sendMessageTo(ActiveMQDestination destination, BrokerService brokerService) throws Exception {
        ConnectionFactory factory = this.createConnectionFactory(brokerService);
        Connection conn = factory.createConnection();
        try {
            conn.start();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            session.createProducer(destination).send(session.createTextMessage("Hi"));
        } finally {
            conn.close();
        }
    }

    /** Closes a connection, logging rather than propagating a close failure. */
    private static void closeQuietly(Connection connection) {
        try {
            connection.close();
        } catch (JMSException e) {
            // Best effort: a close failure must not hide the test's own outcome.
            LOG.debug("Ignoring failure while closing connection: " + e, e);
        }
    }

    /**
     * Builds a connection factory pointed at the connect URI of the broker's
     * first transport connector, configured for synchronous sends and
     * synchronous dispatch.
     *
     * @param broker started broker to connect to
     * @return a factory for connections to {@code broker}
     * @throws Exception if the connect URI cannot be obtained
     */
    protected ConnectionFactory createConnectionFactory(BrokerService broker) throws Exception {
        String url = broker.getTransportConnectors().get(0).getServer().getConnectURI().toString();
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        connectionFactory.setOptimizedMessageDispatch(true);
        connectionFactory.setDispatchAsync(false);
        connectionFactory.setUseAsyncSend(false);
        connectionFactory.setOptimizeAcknowledge(false);
        connectionFactory.setAlwaysSyncSend(true);
        return connectionFactory;
    }
}

