/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLStreamHandler;
import java.net.URLStreamHandlerFactory;
import java.util.Map;
import java.util.Vector;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.apache.activemq.xbean.XBeanBrokerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Request/reply over a network of brokers that have advisory support disabled.
 *
 * <p>Each scenario sets up brokers {@code a} and {@code b} whose network connectors statically
 * include {@link #sendQ} and the temp-queue wildcard {@link #replyQWildcard}. A request carrying a
 * temporary reply queue is sent via broker {@code a}, consumed via broker {@code b}, and answered
 * on the reply queue. Once both client connections are closed the test waits until every broker in
 * {@link #brokers} reports empty temp-queue and temp-topic regions.
 */
public class RequestReplyNoAdvisoryNetworkTest extends JmsMultipleBrokersTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(RequestReplyNoAdvisoryNetworkTest.class);

    /** Every broker created by a test; stopped and cleared in {@link #tearDown()}. */
    Vector<BrokerService> brokers = new Vector<BrokerService>();
    BrokerService a;
    BrokerService b;

    /** Queue on which the request is sent. */
    ActiveMQQueue sendQ = new ActiveMQQueue("sendQ");

    /** Prefix applied to client connection ids (see {@link #createConnectionFactory(BrokerService)}). */
    static final String connectionIdMarker = "ID:marker.";

    /** Temp-queue wildcard built from {@link #connectionIdMarker}; used for policies and static includes. */
    ActiveMQTempQueue replyQWildcard = new ActiveMQTempQueue(connectionIdMarker + ">");

    /** Maximum time, in milliseconds, to block on each {@code receive} call. */
    private final long receiveTimeout = 30000L;

    /**
     * Runs the request/reply scenario against two brokers created from an in-memory XBean
     * configuration that is served through a custom {@code inline:} URL scheme.
     *
     * <p>Note: {@link URL#setURLStreamHandlerFactory(URLStreamHandlerFactory)} may be called at most
     * once per JVM, so this test cannot run twice in the same JVM.
     *
     * @throws Exception if broker creation or the scenario fails
     */
    public void testNonAdvisoryNetworkRequestReplyXmlConfig() throws Exception {
        // "%HOST%" is substituted with the file part of the requested URL (e.g. "A" for "inline:A").
        final String xmlConfigString =
                "<beans xmlns=\"http://www.springframework.org/schema/beans\""
                + " xmlns:amq=\"http://activemq.apache.org/schema/core\""
                + " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\""
                + " xsi:schemaLocation=\"http://www.springframework.org/schema/beans"
                + " http://www.springframework.org/schema/beans/spring-beans-2.0.xsd"
                + " http://activemq.apache.org/schema/core"
                + " http://activemq.apache.org/schema/core/activemq-core.xsd\">"
                + "  <broker xmlns=\"http://activemq.apache.org/schema/core\" id=\"broker\""
                + "    allowTempAutoCreationOnSend=\"true\" schedulePeriodForDestinationPurge=\"1000\""
                + "    brokerName=\"%HOST%\" persistent=\"false\" advisorySupport=\"false\" useJmx=\"false\" >"
                + "   <destinationPolicy>"
                + "    <policyMap>"
                + "     <policyEntries>"
                + "      <policyEntry optimizedDispatch=\"true\"  gcInactiveDestinations=\"true\" gcWithNetworkConsumers=\"true\" inactiveTimoutBeforeGC=\"1000\">"
                + "       <destination>"
                + "        <tempQueue physicalName=\"" + replyQWildcard.getPhysicalName() + "\"/>"
                + "       </destination>"
                + "      </policyEntry>"
                + "     </policyEntries>"
                + "    </policyMap>"
                + "   </destinationPolicy>"
                + "   <networkConnectors>"
                + "    <networkConnector uri=\"multicast://default\">"
                + "     <staticallyIncludedDestinations>"
                + "      <queue physicalName=\"" + sendQ.getPhysicalName() + "\"/>"
                + "      <tempQueue physicalName=\"" + replyQWildcard.getPhysicalName() + "\"/>"
                + "     </staticallyIncludedDestinations>"
                + "    </networkConnector>"
                + "   </networkConnectors>"
                + "   <transportConnectors>"
                + "     <transportConnector uri=\"tcp://0.0.0.0:0\" discoveryUri=\"multicast://default\" />"
                + "   </transportConnectors>"
                + "  </broker>"
                + "</beans>";

        final String localProtocolScheme = "inline";
        URL.setURLStreamHandlerFactory(new URLStreamHandlerFactory() {

            @Override
            public URLStreamHandler createURLStreamHandler(String protocol) {
                if (!localProtocolScheme.equalsIgnoreCase(protocol)) {
                    // Returning null leaves every other protocol to the default handlers.
                    return null;
                }
                return new URLStreamHandler() {

                    @Override
                    protected URLConnection openConnection(URL u) throws IOException {
                        return new URLConnection(u) {

                            @Override
                            public void connect() throws IOException {
                                // Nothing to connect to: the content is held in memory.
                            }

                            @Override
                            public InputStream getInputStream() throws IOException {
                                String config = xmlConfigString.replace("%HOST%", this.url.getFile());
                                return new ByteArrayInputStream(config.getBytes("UTF-8"));
                            }
                        };
                    }
                };
            }
        });

        a = new XBeanBrokerFactory().createBroker(new URI("xbean:" + localProtocolScheme + ":A"));
        b = new XBeanBrokerFactory().createBroker(new URI("xbean:" + localProtocolScheme + ":B"));
        brokers.add(a);
        brokers.add(b);
        doTestNonAdvisoryNetworkRequestReply();
    }

    /**
     * Runs the request/reply scenario against two programmatically configured brokers that are
     * bridged to each other in both directions.
     *
     * @throws Exception if broker start-up or the scenario fails
     */
    public void testNonAdvisoryNetworkRequestReply() throws Exception {
        createBridgeAndStartBrokers();
        doTestNonAdvisoryNetworkRequestReply();
    }

    /**
     * Runs the request/reply scenario with a third broker "M" in the middle: {@code a} and
     * {@code b} are each bridged to the hub with a duplex connector and a network TTL of 2.
     *
     * @throws Exception if broker start-up or the scenario fails
     */
    public void testNonAdvisoryNetworkRequestReplyWithPIM() throws Exception {
        a = configureBroker("A");
        b = configureBroker("B");
        BrokerService hub = configureBroker("M");
        hub.setAllowTempAutoCreationOnSend(true);
        configureForPiggyInTheMiddle(bridge(a, hub));
        configureForPiggyInTheMiddle(bridge(b, hub));
        startBrokers();
        waitForBridgeFormation(hub, 2, 0);
        doTestNonAdvisoryNetworkRequestReply();
    }

    /** Makes the given connector duplex and sets its network TTL to 2. */
    private void configureForPiggyInTheMiddle(NetworkConnector bridge) {
        bridge.setDuplex(true);
        bridge.setNetworkTTL(2);
    }

    /**
     * Core scenario shared by all tests: sends a request through broker {@code a}, consumes it and
     * replies through broker {@code b}, verifies the reply, and finally checks that no temp
     * destinations remain on any broker.
     *
     * <p>Both client connections are closed even when a receive or an assertion fails, so a failed
     * run does not leave connections (and their temp destinations) behind.
     *
     * @throws Exception if messaging fails or an expectation is not met
     */
    public void doTestNonAdvisoryNetworkRequestReply() throws Exception {
        waitForBridgeFormation(a, 1, 0);
        waitForBridgeFormation(b, 1, 0);

        ActiveMQConnection sendConnection = createConnection(createConnectionFactory(a));
        try {
            // Acknowledge mode 1 is javax.jms.Session.AUTO_ACKNOWLEDGE.
            ActiveMQSession sendSession = (ActiveMQSession) sendConnection.createSession(false, 1);
            MessageProducer producer = sendSession.createProducer(sendQ);
            ActiveMQTempQueue realReplyQ = (ActiveMQTempQueue) sendSession.createTemporaryQueue();
            TextMessage message = sendSession.createTextMessage("1");
            message.setJMSReplyTo(realReplyQ);
            producer.send(message);
            LOG.info("request sent");

            ActiveMQConnection consumerConnection = createConnection(createConnectionFactory(b));
            try {
                ActiveMQSession consumerSession = (ActiveMQSession) consumerConnection.createSession(false, 1);
                MessageConsumer consumer = consumerSession.createConsumer(sendQ);
                TextMessage received = (TextMessage) consumer.receive(receiveTimeout);
                assertNotNull("got request from sender ok", received);

                LOG.info("got request, sending reply");
                MessageProducer consumerProducer = consumerSession.createProducer(received.getJMSReplyTo());
                consumerProducer.send(consumerSession.createTextMessage("got " + received.getText()));
            } finally {
                // The replying side is closed before the requester starts consuming the reply.
                consumerConnection.close();
            }

            MessageConsumer replyConsumer = sendSession.createConsumer(realReplyQ);
            TextMessage reply = (TextMessage) replyConsumer.receive(receiveTimeout);
            assertNotNull("expected reply message", reply);
            assertEquals("text is as expected", "got 1", reply.getText());
        } finally {
            sendConnection.close();
        }

        LOG.info("checking for dangling temp destinations");
        assertNoTempDestinationsRemain();
    }

    /** Waits, per broker in {@link #brokers}, until its temp-queue and temp-topic regions are empty. */
    private void assertNoTempDestinationsRemain() throws Exception {
        for (BrokerService brokerService : brokers) {
            final RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
            boolean allGone = Wait.waitFor(new Wait.Condition() {

                @Override
                public boolean isSatisified() throws Exception {
                    Map<?, ?> tempTopics = regionBroker.getTempTopicRegion().getDestinationMap();
                    LOG.info("temp topics on {}, {}", regionBroker.getBrokerName(), tempTopics);
                    Map<?, ?> tempQ = regionBroker.getTempQueueRegion().getDestinationMap();
                    LOG.info("temp queues on {}, {}", regionBroker.getBrokerName(), tempQ);
                    return tempQ.isEmpty() && tempTopics.isEmpty();
                }
            });
            assertTrue("all temps are gone on " + regionBroker.getBrokerName(), allGone);
        }
    }

    /**
     * Creates and starts a connection from the given factory.
     *
     * @param factory factory to create the connection from
     * @return the started connection; the caller is responsible for closing it
     * @throws Exception if the connection cannot be created or started
     */
    private ActiveMQConnection createConnection(ActiveMQConnectionFactory factory) throws Exception {
        ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
        c.start();
        return c;
    }

    /**
     * Builds a connection factory targeting the first transport connector of the given broker.
     * Topic advisories are not watched, and connection ids are prefixed with
     * {@link #connectionIdMarker} followed by the broker name.
     *
     * @param brokerService broker to connect to
     * @return the configured factory
     * @throws Exception if the connect string cannot be obtained
     */
    private ActiveMQConnectionFactory createConnectionFactory(BrokerService brokerService) throws Exception {
        String target = ((TransportConnector) brokerService.getTransportConnectors().get(0)).getPublishableConnectString();
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(target);
        factory.setWatchTopicAdvisories(false);
        factory.setConnectionIDPrefix(connectionIdMarker + brokerService.getBrokerName());
        return factory;
    }

    /**
     * Configures brokers "A" and "B", bridges them in both directions and starts them.
     *
     * @throws Exception if configuration or start-up fails
     */
    public void createBridgeAndStartBrokers() throws Exception {
        a = configureBroker("A");
        b = configureBroker("B");
        bridge(a, b);
        bridge(b, a);
        startBrokers();
    }

    /** Starts every broker registered in {@link #brokers}, in registration order. */
    private void startBrokers() throws Exception {
        for (BrokerService broker : brokers) {
            broker.start();
        }
    }

    /**
     * Stops every registered broker and clears {@link #brokers}.
     *
     * <p>A failure to stop one broker no longer prevents the remaining brokers from being stopped;
     * the first failure is rethrown after all brokers have been processed.
     *
     * @throws Exception the first exception raised while stopping a broker, if any
     */
    @Override
    public void tearDown() throws Exception {
        // NOTE(review): super.tearDown() is not invoked here (same as before) - confirm that is intended.
        Exception firstFailure = null;
        for (BrokerService broker : brokers) {
            try {
                broker.stop();
            } catch (Exception e) {
                LOG.warn("failed to stop broker " + broker.getBrokerName(), e);
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        brokers.clear();
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Adds a static network connector on {@code from} that points at the first transport connector
     * of {@code to}, statically including {@link #sendQ} and {@link #replyQWildcard}.
     *
     * @param from broker that owns the new network connector
     * @param to   broker the connector points at
     * @return the newly added connector
     * @throws Exception if the connector cannot be created
     */
    private NetworkConnector bridge(BrokerService from, BrokerService to) throws Exception {
        TransportConnector toConnector = (TransportConnector) to.getTransportConnectors().get(0);
        NetworkConnector bridge = from.addNetworkConnector("static://" + toConnector.getPublishableConnectString());
        bridge.addStaticallyIncludedDestination(sendQ);
        bridge.addStaticallyIncludedDestination(replyQWildcard);
        return bridge;
    }

    /**
     * Creates a non-persistent broker without advisories or JMX, with a destination policy for
     * {@link #replyQWildcard} that enables GC of inactive destinations, and registers it in
     * {@link #brokers}. The broker is not started.
     *
     * @param brokerName name to give the broker
     * @return the configured, not yet started broker
     * @throws Exception if the transport connector cannot be added
     */
    private BrokerService configureBroker(String brokerName) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName(brokerName);
        broker.setAdvisorySupport(false);
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.setSchedulePeriodForDestinationPurge(1000);
        broker.setAllowTempAutoCreationOnSend(true);

        PolicyEntry tempReplyQPolicy = new PolicyEntry();
        tempReplyQPolicy.setOptimizedDispatch(true);
        tempReplyQPolicy.setGcInactiveDestinations(true);
        tempReplyQPolicy.setGcWithNetworkConsumers(true);
        tempReplyQPolicy.setInactiveTimoutBeforeGC(1000L);

        PolicyMap map = new PolicyMap();
        map.put(replyQWildcard, tempReplyQPolicy);
        broker.setDestinationPolicy(map);

        broker.addConnector("tcp://localhost:0");
        brokers.add(broker);
        return broker;
    }
}

