package org.apache.activemq.network;

import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.commons.lang.ArrayUtils;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.class */
public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(MQTTNetworkOfBrokersFailoverTest.class);
    private int localBrokerMQTTPort = -1;
    private int remoteBrokerMQTTPort = -1;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.network.NetworkTestSupport, org.apache.activemq.broker.BrokerTestSupport
    public void setUp() throws Exception {
        this.useJmx = true;
        super.setUp();
        DiscoveryNetworkConnector discoveryNetworkConnector = new DiscoveryNetworkConnector(new URI("static:(" + this.connector.getConnectUri().toString() + ")"));
        discoveryNetworkConnector.setDuplex(true);
        this.remoteBroker.addNetworkConnector(discoveryNetworkConnector);
        discoveryNetworkConnector.start();
        assertFalse(this.localBrokerMQTTPort == -1);
        assertFalse(this.remoteBrokerMQTTPort == -1);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.network.NetworkTestSupport, org.apache.activemq.broker.BrokerTestSupport
    public void tearDown() throws Exception {
        if (this.remoteBroker.isStarted()) {
            this.remoteBroker.stop();
            this.remoteBroker.waitUntilStopped();
        }
        if (this.broker.isStarted()) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
        super.tearDown();
    }

    @Test
    public void testNoStaleSubscriptionAcrossNetwork() throws Exception {
        CountDownLatch listenForConsumersOn = listenForConsumersOn(this.broker);
        MQTT createMQTTTcpConnection = createMQTTTcpConnection("foo", false, this.remoteBrokerMQTTPort);
        BlockingConnection blockingConnection = createMQTTTcpConnection.blockingConnection();
        blockingConnection.connect();
        blockingConnection.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
        assertTrue("No destination detected!", listenForConsumersOn.await(1L, TimeUnit.SECONDS));
        assertQueueExistsOn(this.remoteBroker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
        assertQueueExistsOn(this.broker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
        blockingConnection.disconnect();
        BlockingConnection blockingConnection2 = createMQTTTcpConnection("foo", false, this.localBrokerMQTTPort).blockingConnection();
        blockingConnection2.connect();
        blockingConnection2.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
        BlockingConnection blockingConnection3 = createMQTTTcpConnection.blockingConnection();
        blockingConnection3.connect();
        blockingConnection3.publish("foo/bar", "Hello, World!".getBytes(), QoS.AT_LEAST_ONCE, false);
        Message receive = blockingConnection2.receive(100L, TimeUnit.SECONDS);
        assertNotNull(receive);
        receive.ack();
        assertEquals("Hello, World!", new String(receive.getPayload()));
        assertEquals("foo/bar", receive.getTopic());
        blockingConnection3.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
        assertNull("We have duplicate messages across the cluster for a distributed topic", blockingConnection3.receive(500L, TimeUnit.MILLISECONDS));
    }

    private CountDownLatch listenForConsumersOn(BrokerService brokerService) throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final Connection createConnection = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI().toASCIIString()).createConnection();
        createConnection.start();
        final Session createSession = createConnection.createSession(false, 1);
        createSession.createConsumer(createSession.createTopic("ActiveMQ.Advisory.Consumer.Queue.Consumer.foo:AT_LEAST_ONCE.VirtualTopic.foo.bar")).setMessageListener(new MessageListener() { // from class: org.apache.activemq.network.MQTTNetworkOfBrokersFailoverTest.1
            public void onMessage(javax.jms.Message message) {
                countDownLatch.countDown();
                Dispatch.getGlobalQueue().execute(new Runnable() { // from class: org.apache.activemq.network.MQTTNetworkOfBrokersFailoverTest.1.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            createSession.close();
                            createConnection.close();
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        });
        return countDownLatch;
    }

    private void assertQueueExistsOn(BrokerService brokerService, String str) throws Exception {
        ObjectName[] queues = brokerService.getAdminView().getQueues();
        assertEquals(1, queues.length);
        assertTrue(queues[0].toString().contains(str));
    }

    private void assertOneDurableSubOn(BrokerService brokerService, String str) throws Exception {
        BrokerView adminView = brokerService.getAdminView();
        ObjectName[] objectNameArr = (ObjectName[]) ArrayUtils.addAll(adminView.getDurableTopicSubscribers(), adminView.getInactiveDurableTopicSubscribers());
        assertEquals(1, objectNameArr.length);
        assertEquals(str, ((DurableSubscriptionViewMBean) brokerService.getManagementContext().newProxyInstance(objectNameArr[0], DurableSubscriptionViewMBean.class, true)).getClientId());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.network.NetworkTestSupport, org.apache.activemq.broker.BrokerTestSupport
    public BrokerService createBroker() throws Exception {
        BrokerService createBroker = super.createBroker();
        createBroker.setPersistent(true);
        createBroker.setBrokerName("local");
        createBroker.setDataDirectory("target/activemq-data");
        createBroker.setDeleteAllMessagesOnStartup(true);
        this.localBrokerMQTTPort = createBroker.addConnector(getDefaultMQTTTransportConnectorUri()).getConnectUri().getPort();
        return createBroker;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.network.NetworkTestSupport
    public BrokerService createRemoteBroker(PersistenceAdapter persistenceAdapter) throws Exception {
        BrokerService createRemoteBroker = super.createRemoteBroker(persistenceAdapter);
        createRemoteBroker.setPersistent(true);
        createRemoteBroker.setDeleteAllMessagesOnStartup(true);
        createRemoteBroker.setDataDirectory("target/activemq-data");
        this.remoteBrokerMQTTPort = createRemoteBroker.addConnector(getDefaultMQTTTransportConnectorUri()).getConnectUri().getPort();
        return createRemoteBroker;
    }

    private String getDefaultMQTTTransportConnectorUri() {
        return "mqtt://localhost:0?transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
    }

    private MQTT createMQTTTcpConnection(String str, boolean z, int i) throws Exception {
        MQTT mqtt = new MQTT();
        mqtt.setConnectAttemptsMax(1L);
        mqtt.setReconnectAttemptsMax(0L);
        mqtt.setTracer(createTracer());
        if (str != null) {
            mqtt.setClientId(str);
        }
        mqtt.setCleanSession(z);
        mqtt.setHost("localhost", i);
        return mqtt;
    }

    protected Tracer createTracer() {
        return new Tracer() { // from class: org.apache.activemq.network.MQTTNetworkOfBrokersFailoverTest.2
            public void onReceive(MQTTFrame mQTTFrame) {
                MQTTNetworkOfBrokersFailoverTest.LOG.info("Client Received:\n" + mQTTFrame);
            }

            public void onSend(MQTTFrame mQTTFrame) {
                MQTTNetworkOfBrokersFailoverTest.LOG.info("Client Sent:\n" + mQTTFrame);
            }

            public void debug(String str, Object... objArr) {
                MQTTNetworkOfBrokersFailoverTest.LOG.info(String.format(str, objArr));
            }
        };
    }
}
