/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.region.AbstractRegion;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThreeBrokerTempDestDemandSubscriptionCleanupTest
extends JmsMultipleBrokersTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(ThreeBrokerTempDestDemandSubscriptionCleanupTest.class);
    boolean enableTempDestinationBridging = true;
    private static final String BROKER_A = "BrokerA";
    private static final String BROKER_B = "BrokerB";
    private static final String BROKER_C = "BrokerC";
    private static final String ECHO_QUEUE_NAME = "echo";
    private static final int NUM_ITER = 100;
    private static final long CONSUME_TIMEOUT = 500L;

    public void testSubscriptionsCleanedUpRace() throws Exception {
        final JmsMultipleBrokersTestSupport.BrokerItem brokerA = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get(BROKER_A);
        Runnable tester = new Runnable(){

            @Override
            public void run() {
                for (int i = 0; i < 100; ++i) {
                    Connection conn = null;
                    try {
                        conn = brokerA.createConnection();
                        conn.start();
                        Session sess = conn.createSession(false, 1);
                        Queue destination = sess.createQueue(ThreeBrokerTempDestDemandSubscriptionCleanupTest.ECHO_QUEUE_NAME);
                        MessageProducer producer = sess.createProducer((Destination)destination);
                        LOG.info("Starting iter: " + i);
                        TemporaryQueue replyTo = sess.createTemporaryQueue();
                        MessageConsumer responseConsumer = sess.createConsumer((Destination)replyTo);
                        TextMessage message = sess.createTextMessage("Iteration: " + i);
                        message.setJMSReplyTo((Destination)replyTo);
                        producer.send((Message)message);
                        TextMessage response = (TextMessage)responseConsumer.receive(500L);
                        TestCase.assertNotNull((String)("We should have gotten a response, but didn't for iter: " + i), (Object)response);
                        TestCase.assertEquals((String)"We got the wrong response from the echo service", (String)("Iteration: " + i), (String)response.getText());
                        responseConsumer.close();
                        conn.close();
                        continue;
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                        TestCase.fail();
                    }
                }
            }
        };
        ExecutorService threadService = Executors.newFixedThreadPool(2);
        threadService.submit(tester);
        threadService.submit(tester);
        threadService.shutdown();
        ThreeBrokerTempDestDemandSubscriptionCleanupTest.assertTrue((String)"executor done on time", (boolean)threadService.awaitTermination(30L, TimeUnit.SECONDS));
        JmsMultipleBrokersTestSupport.BrokerItem brokerC = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get(BROKER_C);
        RegionBroker regionBroker = (RegionBroker)brokerC.broker.getRegionBroker();
        final AbstractRegion region = (AbstractRegion)regionBroker.getTempQueueRegion();
        ThreeBrokerTempDestDemandSubscriptionCleanupTest.assertTrue((String)"There were no lingering temp-queue destinations", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                LOG.info("Lingering temps: " + region.getSubscriptions().size());
                return 0 == region.getSubscriptions().size();
            }
        }));
    }

    public void testSubscriptionsCleanedUpAfterConnectionClose() throws Exception {
        JmsMultipleBrokersTestSupport.BrokerItem brokerA = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get(BROKER_A);
        for (int i = 0; i < 100; ++i) {
            Connection conn = null;
            try {
                conn = brokerA.createConnection();
                conn.start();
                Session sess = conn.createSession(false, 1);
                Queue destination = sess.createQueue(ECHO_QUEUE_NAME);
                MessageProducer producer = sess.createProducer((Destination)destination);
                LOG.info("Starting iter: " + i);
                TemporaryQueue replyTo = sess.createTemporaryQueue();
                MessageConsumer responseConsumer = sess.createConsumer((Destination)replyTo);
                TextMessage message = sess.createTextMessage("Iteration: " + i);
                message.setJMSReplyTo((Destination)replyTo);
                producer.send((Message)message);
                TextMessage response = (TextMessage)responseConsumer.receive(500L);
                ThreeBrokerTempDestDemandSubscriptionCleanupTest.assertNotNull((String)("We should have gotten a response, but didn't for iter: " + i), (Object)response);
                ThreeBrokerTempDestDemandSubscriptionCleanupTest.assertEquals((String)"We got the wrong response from the echo service", (String)("Iteration: " + i), (String)response.getText());
                conn.close();
                continue;
            }
            catch (Exception e) {
                e.printStackTrace();
                ThreeBrokerTempDestDemandSubscriptionCleanupTest.fail();
            }
        }
        JmsMultipleBrokersTestSupport.BrokerItem brokerC = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get(BROKER_C);
        RegionBroker regionBroker = (RegionBroker)brokerC.broker.getRegionBroker();
        final AbstractRegion region = (AbstractRegion)regionBroker.getTempQueueRegion();
        ThreeBrokerTempDestDemandSubscriptionCleanupTest.assertTrue((String)"There were no lingering temp-queue destinations", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                LOG.info("Lingering temps: " + region.getSubscriptions().size());
                return 0 == region.getSubscriptions().size();
            }
        }));
    }

    private void installEchoClientOnBrokerC() throws Exception {
        JmsMultipleBrokersTestSupport.BrokerItem brokerC = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get(BROKER_C);
        Connection conn = brokerC.createConnection();
        conn.start();
        final Session sess = conn.createSession(false, 1);
        Queue destination = sess.createQueue(ECHO_QUEUE_NAME);
        MessageConsumer consumer = sess.createConsumer((Destination)destination);
        consumer.setMessageListener(new MessageListener(){

            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage)message;
                try {
                    Destination replyTo = message.getJMSReplyTo();
                    MessageProducer producer = sess.createProducer(replyTo);
                    TextMessage response = sess.createTextMessage(textMessage.getText());
                    LOG.info("Replying to this request: " + textMessage.getText());
                    producer.send((Message)response);
                    producer.close();
                }
                catch (JMSException e) {
                    e.printStackTrace();
                    TestCase.fail((String)"Could not respond to an echo request");
                }
            }
        });
    }

    @Override
    protected void setUp() throws Exception {
        super.setUp();
        this.createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
        this.createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
        this.createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false"));
        this.bridgeBrokers(BROKER_A, BROKER_B, false, 3);
        this.bridgeBrokers(BROKER_B, BROKER_C, false, 3);
        this.startAllBrokers();
        this.installEchoClientOnBrokerC();
    }

    protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly, int networkTTL) throws Exception {
        NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName, dynamicOnly, networkTTL, true);
        connector.setBridgeTempDestinations(this.enableTempDestinationBridging);
        connector.setDuplex(true);
        return connector;
    }
}

