package org.apache.activemq.network.jms;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.perf.NetworkedSyncTest;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/network/jms/TopicOutboundBridgeReconnectTest.class */
/**
 * Tests an outbound JMS topic bridge between two in-process brokers.
 *
 * <p>"broker1" (the producer side) is configured with a {@link JmsTopicConnector} that bridges
 * {@code RECONNECT.TEST.TOPIC} outbound to {@code tcp://localhost:61617}, which is the transport
 * connector opened by "broker2" (the consumer side). Each test sends text messages through
 * {@code vm://broker1} and expects to receive them from a consumer attached to
 * {@code vm://broker2}, under different start orders, start delays and broker restarts.
 */
public class TopicOutboundBridgeReconnectTest {
    private static final Logger LOG = LoggerFactory.getLogger(TopicOutboundBridgeReconnectTest.class);

    /** Name of the topic that is bridged from broker1 to broker2. */
    private static final String TOPIC_NAME = "RECONNECT.TEST.TOPIC";

    /** Transport URL of broker2; also the target of broker1's outbound bridge. */
    private static final String CONSUMER_BROKER_URL = "tcp://localhost:61617";

    private BrokerService producerBroker;
    private BrokerService consumerBroker;
    private ActiveMQConnectionFactory producerConnectionFactory;
    private ActiveMQConnectionFactory consumerConnectionFactory;
    private Destination destination;

    /** Consumer-side connections opened by {@link #createConsumer()}, closed on dispose. */
    private final ArrayList<Connection> connections = new ArrayList<>();

    /** Repeats the producer-broker restart scenario ten times, dropping consumer connections in between. */
    @Test
    public void testMultipleProducerBrokerRestarts() throws Exception {
        for (int i = 0; i < 10; i++) {
            testWithProducerBrokerRestart();
            disposeConsumerConnections();
        }
    }

    /** Consumer broker is started before the producer broker; both messages must arrive in order. */
    @Test
    public void testWithoutRestartsConsumerFirst() throws Exception {
        startConsumerBroker();
        startProducerBroker();
        MessageConsumer consumer = createConsumer();
        sendMessage("test123");
        sendMessage("test456");
        assertNextText(consumer, 2000L, "test123");
        assertNextText(consumer, 5000L, "test456");
        Assert.assertNull(consumer.receiveNoWait());
    }

    /** Producer broker is started (and sent to) before the consumer broker exists. */
    @Test
    public void testWithoutRestartsProducerFirst() throws Exception {
        startProducerBroker();
        sendMessage("test123");
        startConsumerBroker();
        sendMessage("test456");
        MessageConsumer consumer = createConsumer();
        assertNextText(consumer, 5000L, "test123");
        assertNextText(consumer, 5000L, "test456");
        Assert.assertNull(consumer.receiveNoWait());
    }

    /** The producer broker is stopped and started again; the same consumer must keep receiving. */
    @Test
    public void testWithProducerBrokerRestart() throws Exception {
        startProducerBroker();
        startConsumerBroker();
        MessageConsumer consumer = createConsumer();
        sendMessage("test123");
        assertNextText(consumer, 5000L, "test123");
        Assert.assertNull(consumer.receiveNoWait());

        stopProducerBroker();
        startProducerBroker();

        sendMessage("test123");
        assertNextText(consumer, 5000L, "test123");
        Assert.assertNull(consumer.receiveNoWait());
    }

    /** The consumer broker is stopped and started again; a new consumer must eventually receive. */
    @Test
    public void testWithConsumerBrokerRestart() throws Exception {
        startProducerBroker();
        startConsumerBroker();
        MessageConsumer consumer = createConsumer();
        sendMessage("test123");
        assertNextText(consumer, 5000L, "test123");
        Assert.assertNull(consumer.receiveNoWait());
        consumer.close();

        stopConsumerBroker();
        startConsumerBroker();

        sendMessage("test123");
        MessageConsumer secondConsumer = createConsumer();
        assertEventuallyReceives(secondConsumer, "test123");
        Assert.assertNull(secondConsumer.receiveNoWait());
    }

    /** The producer broker is started five seconds after the consumer broker and its consumer. */
    @Test
    public void testWithConsumerBrokerStartDelay() throws Exception {
        startConsumerBroker();
        MessageConsumer consumer = createConsumer();
        TimeUnit.SECONDS.sleep(5L);
        startProducerBroker();
        sendMessage("test123");
        assertEventuallyReceives(consumer, "test123");
        Assert.assertNull(consumer.receiveNoWait());
    }

    /** The consumer broker is started five seconds after the producer broker. */
    @Test
    public void testWithProducerBrokerStartDelay() throws Exception {
        startProducerBroker();
        TimeUnit.SECONDS.sleep(5L);
        startConsumerBroker();
        MessageConsumer consumer = createConsumer();
        sendMessage("test123");
        assertNextText(consumer, 2000L, "test123");
        Assert.assertNull(consumer.receiveNoWait());
    }

    /** Creates the connection factories and the topic; brokers are started by the tests themselves. */
    @Before
    public void setUp() throws Exception {
        this.producerConnectionFactory = createProducerConnectionFactory();
        this.consumerConnectionFactory = createConsumerConnectionFactory();
        this.destination = new ActiveMQTopic(TOPIC_NAME);
    }

    /**
     * Closes consumer connections and stops both brokers. Cleanup is best-effort: a failure to
     * stop one broker is logged and must not prevent the other broker from being stopped.
     */
    @After
    public void tearDown() throws Exception {
        disposeConsumerConnections();
        try {
            stopProducerBroker();
        } catch (Throwable t) {
            LOG.debug("Ignoring failure while stopping producer broker", t);
        }
        try {
            stopConsumerBroker();
        } catch (Throwable t) {
            LOG.debug("Ignoring failure while stopping consumer broker", t);
        }
    }

    /**
     * Closes every tracked consumer connection and forgets it. Close failures are logged and
     * otherwise ignored so that the remaining connections are still closed.
     */
    protected void disposeConsumerConnections() {
        for (Connection connection : this.connections) {
            try {
                connection.close();
            } catch (Throwable t) {
                LOG.debug("Ignoring failure while closing consumer connection", t);
            }
        }
        // Drop the closed connections so repeated disposals do not re-close them or grow the list.
        this.connections.clear();
    }

    /** Creates and starts the producer broker unless one is already held. */
    protected void startProducerBroker() throws Exception {
        if (this.producerBroker == null) {
            this.producerBroker = createFirstBroker();
            this.producerBroker.start();
        }
    }

    /** Stops the producer broker, if any, and clears the reference even when stopping fails. */
    protected void stopProducerBroker() throws Exception {
        if (this.producerBroker != null) {
            try {
                this.producerBroker.stop();
            } finally {
                // Always forget the broker so a later startProducerBroker() creates a fresh one.
                this.producerBroker = null;
            }
        }
    }

    /** Creates and starts the consumer broker unless one is already held. */
    protected void startConsumerBroker() throws Exception {
        if (this.consumerBroker == null) {
            this.consumerBroker = createSecondBroker();
            this.consumerBroker.start();
        }
    }

    /** Stops the consumer broker, if any, and clears the reference even when stopping fails. */
    protected void stopConsumerBroker() throws Exception {
        if (this.consumerBroker != null) {
            try {
                this.consumerBroker.stop();
            } finally {
                // Always forget the broker so a later startConsumerBroker() creates a fresh one.
                this.consumerBroker = null;
            }
        }
    }

    /**
     * Builds (without starting) the non-persistent, JMX-less "broker1" with its transport
     * connectors and an outbound topic bridge for the test topic pointing at the consumer broker.
     *
     * @return the configured, not yet started broker
     */
    protected BrokerService createFirstBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName("broker1");
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.addConnector(NetworkedSyncTest.broker1URL);
        broker.addConnector("vm://broker1");

        // Declared as JmsTopicConnector because the outbound-topic setters are topic-specific.
        JmsTopicConnector topicConnector = new JmsTopicConnector();
        topicConnector.setOutboundTopicBridges(
                new OutboundTopicBridge[]{new OutboundTopicBridge(TOPIC_NAME)});
        topicConnector.setOutboundTopicConnectionFactory(
                new ActiveMQConnectionFactory(CONSUMER_BROKER_URL));
        broker.setJmsBridgeConnectors(new JmsConnector[]{topicConnector});
        return broker;
    }

    /**
     * Builds (without starting) the non-persistent, JMX-less "broker2" that the bridge targets.
     *
     * @return the configured, not yet started broker
     */
    protected BrokerService createSecondBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName("broker2");
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.addConnector(CONSUMER_BROKER_URL);
        broker.addConnector("vm://broker2");
        return broker;
    }

    /** @return a connection factory for the in-VM transport of broker1 */
    protected ActiveMQConnectionFactory createProducerConnectionFactory() {
        return new ActiveMQConnectionFactory("vm://broker1");
    }

    /** @return a connection factory for the in-VM transport of broker2 */
    protected ActiveMQConnectionFactory createConsumerConnectionFactory() {
        return new ActiveMQConnectionFactory("vm://broker2");
    }

    /**
     * Sends one text message to the test topic over a short-lived producer connection.
     *
     * @param str the message text
     * @throws JMSException if connecting or sending fails
     */
    protected void sendMessage(String str) throws JMSException {
        Connection connection = this.producerConnectionFactory.createConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(this.destination);
            TextMessage message = session.createTextMessage();
            message.setText(str);
            producer.send(message);
        } finally {
            try {
                connection.close();
            } catch (Throwable t) {
                // A close failure must not mask the outcome of the send itself.
                LOG.debug("Ignoring failure while closing producer connection", t);
            }
        }
    }

    /**
     * Opens and starts a new consumer-side connection, tracks it for later disposal, and
     * creates a consumer on the test topic.
     *
     * @return a consumer on the test topic
     * @throws JMSException if the connection, session or consumer cannot be created
     */
    protected MessageConsumer createConsumer() throws JMSException {
        Connection connection = this.consumerConnectionFactory.createConnection();
        this.connections.add(connection);
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        return session.createConsumer(this.destination);
    }

    /** Asserts that the next message arrives within the timeout and carries the expected text. */
    private static void assertNextText(MessageConsumer consumer, long timeoutMillis, String expected)
            throws JMSException {
        TextMessage message = (TextMessage) consumer.receive(timeoutMillis);
        Assert.assertNotNull(message);
        Assert.assertEquals(expected, message.getText());
    }

    /** Polls the consumer via {@link Wait#waitFor} until a message with the expected text is received. */
    private static void assertEventuallyReceives(final MessageConsumer consumer, final String expected)
            throws Exception {
        boolean received = Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                TextMessage message = (TextMessage) consumer.receiveNoWait();
                return message != null && expected.equals(message.getText());
            }
        });
        Assert.assertTrue("Expected recover and delivery failed", received);
    }
}
