/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.network.jms;

import java.util.ArrayList;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.jms.InboundTopicBridge;
import org.apache.activemq.network.jms.OutboundTopicBridge;
import org.apache.activemq.network.jms.SimpleJmsTopicConnector;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TopicBridgeStandaloneReconnectTest {
    private SimpleJmsTopicConnector jmsTopicConnector;
    private BrokerService localBroker;
    private BrokerService foreignBroker;
    private ActiveMQConnectionFactory localConnectionFactory;
    private ActiveMQConnectionFactory foreignConnectionFactory;
    private Destination outbound;
    private Destination inbound;
    private final ArrayList<Connection> connections = new ArrayList();

    @Test
    public void testSendAndReceiveOverConnectedBridges() throws Exception {
        this.startLocalBroker();
        this.startForeignBroker();
        this.jmsTopicConnector.start();
        final MessageConsumer local = this.createConsumerForLocalBroker();
        final MessageConsumer foreign = this.createConsumerForForeignBroker();
        this.sendMessageToForeignBroker("to.foreign.broker");
        this.sendMessageToLocalBroker("to.local.broker");
        Assert.assertTrue((String)"Should have received a Message.", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                Message message = local.receive(100L);
                return message != null && ((TextMessage)message).getText().equals("to.local.broker");
            }
        }));
        Assert.assertTrue((String)"Should have received a Message.", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                Message message = foreign.receive(100L);
                return message != null && ((TextMessage)message).getText().equals("to.foreign.broker");
            }
        }));
    }

    @Test
    public void testSendAndReceiveOverBridgeWhenStartedBeforeBrokers() throws Exception {
        this.jmsTopicConnector.start();
        this.startLocalBroker();
        this.startForeignBroker();
        Assert.assertTrue((String)"Should have Connected.", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return TopicBridgeStandaloneReconnectTest.this.jmsTopicConnector.isConnected();
            }
        }));
        final MessageConsumer local = this.createConsumerForLocalBroker();
        final MessageConsumer foreign = this.createConsumerForForeignBroker();
        this.sendMessageToForeignBroker("to.foreign.broker");
        this.sendMessageToLocalBroker("to.local.broker");
        Assert.assertTrue((String)"Should have received a Message.", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                Message message = local.receive(100L);
                return message != null && ((TextMessage)message).getText().equals("to.local.broker");
            }
        }));
        Assert.assertTrue((String)"Should have received a Message.", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                Message message = foreign.receive(100L);
                return message != null && ((TextMessage)message).getText().equals("to.foreign.broker");
            }
        }));
    }

    @Test
    public void testSendAndReceiveOverBridgeWithRestart() throws Exception {
        this.startLocalBroker();
        this.startForeignBroker();
        this.jmsTopicConnector.start();
        Assert.assertTrue((String)"Should have Connected.", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return TopicBridgeStandaloneReconnectTest.this.jmsTopicConnector.isConnected();
            }
        }));
        this.stopLocalBroker();
        this.stopForeignBroker();
        Assert.assertTrue((String)"Should have detected connection drop.", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return !TopicBridgeStandaloneReconnectTest.this.jmsTopicConnector.isConnected();
            }
        }));
        this.startLocalBroker();
        this.startForeignBroker();
        Assert.assertTrue((String)"Should have Re-Connected.", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return TopicBridgeStandaloneReconnectTest.this.jmsTopicConnector.isConnected();
            }
        }));
        final MessageConsumer local = this.createConsumerForLocalBroker();
        final MessageConsumer foreign = this.createConsumerForForeignBroker();
        this.sendMessageToForeignBroker("to.foreign.broker");
        this.sendMessageToLocalBroker("to.local.broker");
        Assert.assertTrue((String)"Should have received a Message.", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                Message message = local.receive(100L);
                return message != null && ((TextMessage)message).getText().equals("to.local.broker");
            }
        }));
        Assert.assertTrue((String)"Should have received a Message.", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                Message message = foreign.receive(100L);
                return message != null && ((TextMessage)message).getText().equals("to.foreign.broker");
            }
        }));
    }

    @Before
    public void setUp() throws Exception {
        this.localConnectionFactory = this.createLocalConnectionFactory();
        this.foreignConnectionFactory = this.createForeignConnectionFactory();
        this.outbound = new ActiveMQTopic("RECONNECT.TEST.OUT.TOPIC");
        this.inbound = new ActiveMQTopic("RECONNECT.TEST.IN.TOPIC");
        this.jmsTopicConnector = new SimpleJmsTopicConnector();
        this.jmsTopicConnector.setOutboundTopicBridges(new OutboundTopicBridge[]{new OutboundTopicBridge("RECONNECT.TEST.OUT.TOPIC")});
        this.jmsTopicConnector.setInboundTopicBridges(new InboundTopicBridge[]{new InboundTopicBridge("RECONNECT.TEST.IN.TOPIC")});
        this.jmsTopicConnector.setOutboundTopicConnectionFactory((TopicConnectionFactory)new ActiveMQConnectionFactory("tcp://localhost:61617"));
        this.jmsTopicConnector.setLocalTopicConnectionFactory((TopicConnectionFactory)new ActiveMQConnectionFactory("tcp://localhost:61616"));
    }

    @After
    public void tearDown() throws Exception {
        this.disposeConsumerConnections();
        try {
            this.jmsTopicConnector.stop();
            this.jmsTopicConnector = null;
        }
        catch (Exception exception) {
            // empty catch block
        }
        try {
            this.stopLocalBroker();
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        try {
            this.stopForeignBroker();
        }
        catch (Throwable throwable) {
            // empty catch block
        }
    }

    protected void disposeConsumerConnections() {
        for (Connection connection : this.connections) {
            try {
                connection.close();
            }
            catch (Throwable throwable) {}
        }
    }

    protected void startLocalBroker() throws Exception {
        if (this.localBroker == null) {
            this.localBroker = this.createFirstBroker();
            this.localBroker.start();
            this.localBroker.waitUntilStarted();
        }
    }

    protected void stopLocalBroker() throws Exception {
        if (this.localBroker != null) {
            this.localBroker.stop();
            this.localBroker.waitUntilStopped();
            this.localBroker = null;
        }
    }

    protected void startForeignBroker() throws Exception {
        if (this.foreignBroker == null) {
            this.foreignBroker = this.createSecondBroker();
            this.foreignBroker.start();
            this.foreignBroker.waitUntilStarted();
        }
    }

    protected void stopForeignBroker() throws Exception {
        if (this.foreignBroker != null) {
            this.foreignBroker.stop();
            this.foreignBroker.waitUntilStopped();
            this.foreignBroker = null;
        }
    }

    protected BrokerService createFirstBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName("broker1");
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.addConnector("tcp://localhost:61616");
        return broker;
    }

    protected BrokerService createSecondBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName("broker2");
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.addConnector("tcp://localhost:61617");
        return broker;
    }

    protected ActiveMQConnectionFactory createLocalConnectionFactory() {
        return new ActiveMQConnectionFactory("tcp://localhost:61616");
    }

    protected ActiveMQConnectionFactory createForeignConnectionFactory() {
        return new ActiveMQConnectionFactory("tcp://localhost:61617");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void sendMessageToForeignBroker(String text) throws JMSException {
        Connection connection = null;
        try {
            connection = this.localConnectionFactory.createConnection();
            Session session = connection.createSession(false, 1);
            MessageProducer producer = session.createProducer(this.outbound);
            TextMessage message = session.createTextMessage();
            message.setText(text);
            producer.send((Message)message);
        }
        finally {
            try {
                connection.close();
            }
            catch (Throwable throwable) {}
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void sendMessageToLocalBroker(String text) throws JMSException {
        Connection connection = null;
        try {
            connection = this.foreignConnectionFactory.createConnection();
            Session session = connection.createSession(false, 1);
            MessageProducer producer = session.createProducer(this.inbound);
            TextMessage message = session.createTextMessage();
            message.setText(text);
            producer.send((Message)message);
        }
        finally {
            try {
                connection.close();
            }
            catch (Throwable throwable) {}
        }
    }

    protected MessageConsumer createConsumerForLocalBroker() throws JMSException {
        Connection connection = this.localConnectionFactory.createConnection();
        this.connections.add(connection);
        connection.start();
        Session session = connection.createSession(false, 1);
        return session.createConsumer(this.inbound);
    }

    protected MessageConsumer createConsumerForForeignBroker() throws JMSException {
        Connection connection = this.foreignConnectionFactory.createConnection();
        this.connections.add(connection);
        connection.start();
        Session session = connection.createSession(false, 1);
        return session.createConsumer(this.outbound);
    }
}

