/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.net.MalformedURLException;
import java.net.URI;
import java.util.Set;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Assume;

public class DurableSubscriberWithNetworkRestartTest
extends JmsMultipleBrokersTestSupport {
    private static final Log LOG = LogFactory.getLog(DurableSubscriberWithNetworkRestartTest.class);
    private static final String HUB = "HubBroker";
    private static final String SPOKE = "SpokeBroker";
    protected static final int MESSAGE_COUNT = 10;
    public boolean dynamicOnly = false;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testSendOnAReceiveOnBWithTransportDisconnectDynamicOnly() throws Exception {
        this.dynamicOnly = true;
        try {
            this.testSendOnAReceiveOnBWithTransportDisconnect();
        }
        finally {
            this.dynamicOnly = false;
        }
    }

    public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception {
        this.bridge(SPOKE, HUB);
        this.startAllBrokers();
        this.verifyDuplexBridgeMbean();
        URI hubURI = ((TransportConnector)((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)HUB)).broker.getTransportConnectors().get(0)).getPublishableConnectURI();
        URI spokeURI = ((TransportConnector)((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)SPOKE)).broker.getTransportConnectors().get(0)).getPublishableConnectURI();
        ActiveMQConnectionFactory facHub = new ActiveMQConnectionFactory(hubURI);
        ActiveMQConnectionFactory facSpoke = new ActiveMQConnectionFactory(spokeURI);
        Connection conHub = facHub.createConnection();
        Connection conSpoke = facSpoke.createConnection();
        conHub.setClientID("clientHUB");
        conSpoke.setClientID("clientSPOKE");
        conHub.start();
        conSpoke.start();
        Session sesHub = conHub.createSession(false, 1);
        Session sesSpoke = conSpoke.createSession(false, 1);
        ActiveMQTopic topic = new ActiveMQTopic("TEST.FOO");
        String consumerName = "consumerName";
        TopicSubscriber remoteConsumer = sesHub.createDurableSubscriber((Topic)topic, consumerName);
        this.sleep(1000);
        remoteConsumer.close();
        MessageProducer localProducer = sesSpoke.createProducer((Destination)topic);
        localProducer.setDeliveryMode(2);
        String payloadString = new String(new byte[10240]);
        for (int i = 0; i < 10; ++i) {
            TextMessage test = sesSpoke.createTextMessage("test-" + i);
            test.setStringProperty("payload", payloadString);
            localProducer.send((Message)test);
        }
        localProducer.close();
        String options = "?persistent=true&useJmx=true&deleteAllMessagesOnStartup=false";
        for (int i = 0; i < 2; ++i) {
            ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)SPOKE)).broker.stop();
            this.sleep(1000);
            this.createBroker(new URI("broker:(tcp://localhost:61616)/SpokeBroker?persistent=true&useJmx=true&deleteAllMessagesOnStartup=false"));
            this.bridge(SPOKE, HUB);
            ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)SPOKE)).broker.start();
            LOG.info((Object)("restarted spoke..:" + i));
            DurableSubscriberWithNetworkRestartTest.assertTrue((String)"got mbeans on restart", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

                public boolean isSatisified() throws Exception {
                    return DurableSubscriberWithNetworkRestartTest.this.countMbeans(((JmsMultipleBrokersTestSupport.BrokerItem)((DurableSubscriberWithNetworkRestartTest)DurableSubscriberWithNetworkRestartTest.this).brokers.get((Object)DurableSubscriberWithNetworkRestartTest.HUB)).broker, "networkBridge", 20000) == (DurableSubscriberWithNetworkRestartTest.this.dynamicOnly ? 1 : 2);
                }
            }));
        }
    }

    private void verifyDuplexBridgeMbean() throws Exception {
        DurableSubscriberWithNetworkRestartTest.assertEquals((int)1, (int)this.countMbeans(((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)HUB)).broker, "networkBridge", 5000));
    }

    private int countMbeans(BrokerService broker, String type, int timeout) throws Exception {
        long expiryTime = System.currentTimeMillis() + (long)timeout;
        if (!type.contains("=")) {
            type = type + "=*";
        }
        ObjectName beanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=" + broker.getBrokerName() + "," + type + ",*");
        Set mbeans = null;
        int count = 0;
        do {
            if (timeout > 0) {
                Thread.sleep(100L);
            }
            if ((mbeans = broker.getManagementContext().queryNames(beanName, null)) == null) continue;
            count = mbeans.size();
            LOG.info((Object)("Found: " + count + ", matching type: " + type));
            for (ObjectName objectName : mbeans) {
                LOG.info((Object)("" + objectName));
            }
        } while ((mbeans == null || mbeans.isEmpty()) && expiryTime > System.currentTimeMillis());
        if (timeout > 0) {
            Assume.assumeNotNull((Object[])new Object[]{mbeans});
        }
        return count;
    }

    private void logAllMbeans(BrokerService broker) throws MalformedURLException {
        try {
            Set all = broker.getManagementContext().queryNames(null, null);
            LOG.info((Object)("Total MBean count=" + all.size()));
            for (Object o : all) {
                LOG.info(o);
            }
        }
        catch (Exception ignored) {
            LOG.warn((Object)("getMBeanServer ex: " + ignored));
        }
    }

    public NetworkConnector bridge(String from, String to) throws Exception {
        NetworkConnector networkConnector = this.bridgeBrokers(from, to, this.dynamicOnly, -1, true);
        networkConnector.setSuppressDuplicateQueueSubscriptions(true);
        networkConnector.setDecreaseNetworkConsumerPriority(true);
        networkConnector.setConsumerTTL(1);
        networkConnector.setDuplex(true);
        return networkConnector;
    }

    @Override
    protected void startAllBrokers() throws Exception {
        JmsMultipleBrokersTestSupport.BrokerItem brokerItem = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get(HUB);
        brokerItem.broker.start();
        brokerItem = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get(SPOKE);
        brokerItem.broker.start();
        this.sleep(600);
    }

    @Override
    public void setUp() throws Exception {
        super.setAutoFail(false);
        super.setUp();
        this.createBrokers(true);
    }

    private void createBrokers(boolean del) throws Exception {
        String options = "?persistent=true&useJmx=true&deleteAllMessagesOnStartup=" + del;
        this.createBroker(new URI("broker:(tcp://localhost:61617)/HubBroker" + options));
        this.createBroker(new URI("broker:(tcp://localhost:61616)/SpokeBroker" + options));
    }

    @Override
    protected void configureBroker(BrokerService broker) {
        broker.setKeepDurableSubsActive(false);
        broker.getManagementContext().setCreateConnector(false);
        PolicyMap defaultPolcyMap = new PolicyMap();
        PolicyEntry defaultPolicy = new PolicyEntry();
        if (broker.getBrokerName().equals(HUB)) {
            defaultPolicy.setStoreUsageHighWaterMark(2);
            broker.getSystemUsage().getStoreUsage().setLimit(0x100000L);
        }
        defaultPolcyMap.setDefaultEntry(defaultPolicy);
        broker.setDestinationPolicy(defaultPolcyMap);
        broker.getSystemUsage().getMemoryUsage().setLimit(0x6400000L);
    }

    @Override
    public void tearDown() throws Exception {
        super.tearDown();
    }

    private void sleep(int milliSecondTime) {
        try {
            Thread.sleep(milliSecondTime);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }
}

