package org.apache.activemq.artemis.tests.integration.mqtt.imported;

import java.lang.reflect.Field;
import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
import org.apache.activemq.artemis.tests.integration.persistence.XmlImportExportTest;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.MQTTException;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
import org.fusesource.mqtt.codec.PUBLISH;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.class */
public class MQTTTest extends MQTTTestSupport {
    /** Logger used by the tests in this class, including their anonymous tracer callbacks. */
    private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
    /** Number of messages published by the loop-based send/receive tests. */
    private static final int NUM_MESSAGES = 250;

    /**
     * Replaces the static {@code SESSIONS} map of {@link MQTTSession} and the static
     * {@code CONNECTED_CLIENTS} set of {@link MQTTConnectionManager} with fresh, empty
     * collections (via reflection) and then runs the inherited set-up.
     *
     * <p>NOTE(review): presumably this keeps session and client-id state from leaking
     * between test methods; confirm against the broker classes.
     *
     * @throws Exception if a field cannot be found or written, or the inherited set-up fails
     */
    @Override
    @Before
    public void setUp() throws Exception {
        final Field sessionsField = MQTTSession.class.getDeclaredField("SESSIONS");
        sessionsField.setAccessible(true);
        // Static field, hence the null receiver.
        sessionsField.set(null, new ConcurrentHashMap());

        final Field connectedClientsField = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS");
        connectedClientsField.setAccessible(true);
        connectedClientsField.set(null, new ConcurrentHashSet());

        super.setUp();
    }

    /**
     * Publishes {@link #NUM_MESSAGES} messages on {@code foo/bah} from one client and checks
     * that a second client, subscribed to the same topic with QoS 0, receives all of them.
     * Reception runs on a background thread that counts a latch down once per message.
     *
     * @throws Exception if connecting, subscribing, publishing or disconnecting fails
     */
    @Test(timeout = 60000)
    public void testSendAndReceiveMQTT() throws Exception {
        final MQTTClientProvider subscriber = getMQTTClientProvider();
        initializeConnection(subscriber);
        subscriber.subscribe("foo/bah", 0);
        final CountDownLatch pending = new CountDownLatch(NUM_MESSAGES);
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < MQTTTest.NUM_MESSAGES; i++) {
                    try {
                        byte[] payload = subscriber.receive(10000);
                        if (payload == null) {
                            // An assertion failure on this thread would never reach JUnit; log
                            // it and stop so the latch check on the test thread reports the gap.
                            MQTTTest.LOG.error("Should get a message, but got none after {} of {}", i, MQTTTest.NUM_MESSAGES);
                            return;
                        }
                        pending.countDown();
                    } catch (Exception e) {
                        MQTTTest.LOG.error("Receiver thread failed after " + i + " messages", e);
                        return;
                    }
                }
            }
        }).start();
        MQTTClientProvider publisher = getMQTTClientProvider();
        initializeConnection(publisher);
        for (int i = 0; i < NUM_MESSAGES; i++) {
            publisher.publish("foo/bah", ("Message " + i).getBytes(), 1);
        }
        // The await result is intentionally not used; the count check below is the verdict.
        pending.await(10L, TimeUnit.SECONDS);
        assertEquals("Not all published messages were received", 0L, pending.getCount());
        subscriber.disconnect();
        publisher.disconnect();
    }

    /**
     * Publishes {@link #NUM_MESSAGES} messages on {@code foo/bah} and unsubscribes the
     * receiving client right before the second half is published. The test passes when the
     * first half (the messages published while the subscription existed) has been received.
     *
     * <p>NOTE(review): nothing here asserts that no further messages arrive after the
     * unsubscribe; the latch only covers the first half.
     *
     * @throws Exception if connecting, subscribing, publishing or disconnecting fails
     */
    @Test(timeout = 60000)
    public void testUnsubscribeMQTT() throws Exception {
        // Index at which the subscriber unsubscribes; also the number of messages expected.
        final int half = NUM_MESSAGES / 2;
        final MQTTClientProvider subscriber = getMQTTClientProvider();
        initializeConnection(subscriber);
        subscriber.subscribe("foo/bah", 0);
        final CountDownLatch pending = new CountDownLatch(half);
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < MQTTTest.NUM_MESSAGES; i++) {
                    try {
                        byte[] payload = subscriber.receive(10000);
                        if (payload == null) {
                            // Expected once the subscription is gone. Asserting here would only
                            // kill this thread with an error JUnit never sees, so log and stop.
                            MQTTTest.LOG.info("Receiver stopping: no message after {} of {}", i, MQTTTest.NUM_MESSAGES);
                            return;
                        }
                        pending.countDown();
                    } catch (Exception e) {
                        MQTTTest.LOG.error("Receiver thread failed after " + i + " messages", e);
                        return;
                    }
                }
            }
        }).start();
        MQTTClientProvider publisher = getMQTTClientProvider();
        initializeConnection(publisher);
        for (int i = 0; i < NUM_MESSAGES; i++) {
            String body = "Message " + i;
            if (i == half) {
                subscriber.unsubscribe("foo/bah");
            }
            publisher.publish("foo/bah", body.getBytes(), 1);
        }
        pending.await(20L, TimeUnit.SECONDS);
        assertEquals("Not all messages published before the unsubscribe were received", 0L, pending.getCount());
        subscriber.disconnect();
        publisher.disconnect();
    }

    /**
     * A single client subscribes to {@code foo} with QoS 2, then publishes
     * {@link #NUM_MESSAGES} messages with QoS 0 and expects to read each one back, in order,
     * before publishing the next.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 60000)
    public void testSendAtMostOnceReceiveExactlyOnce() throws Exception {
        final MQTTClientProvider client = getMQTTClientProvider();
        initializeConnection(client);
        client.subscribe("foo", 2);

        int sequence = 0;
        while (sequence < NUM_MESSAGES) {
            final String expected = "Test Message: " + sequence;
            client.publish("foo", expected.getBytes(), 0);

            final byte[] payload = client.receive(XmlImportExportTest.CONSUMER_TIMEOUT);
            assertNotNull("Should get a message", payload);
            assertEquals(expected, new String(payload));
            sequence++;
        }

        client.disconnect();
    }

    /**
     * A single client subscribes to {@code foo} with QoS 2, then publishes
     * {@link #NUM_MESSAGES} messages with QoS 1, reading each one back and comparing its
     * text before sending the next.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 120000)
    public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception {
        final MQTTClientProvider client = getMQTTClientProvider();
        initializeConnection(client);
        client.subscribe("foo", 2);

        for (int n = 0; n != NUM_MESSAGES; ++n) {
            final String text = "Test Message: " + n;
            final byte[] outgoing = text.getBytes();
            client.publish("foo", outgoing, 1);

            final byte[] incoming = client.receive(XmlImportExportTest.CONSUMER_TIMEOUT);
            assertNotNull("Should get a message", incoming);
            assertEquals(text, new String(incoming));
        }

        client.disconnect();
    }

    /**
     * A single client subscribes to {@code foo} with QoS 0, then publishes
     * {@link #NUM_MESSAGES} messages with QoS 1 and verifies that every message comes back
     * with the same text, one at a time.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 120000)
    public void testSendAtLeastOnceReceiveAtMostOnce() throws Exception {
        final MQTTClientProvider loopback = getMQTTClientProvider();
        initializeConnection(loopback);
        loopback.subscribe("foo", 0);

        int index = 0;
        do {
            final String message = "Test Message: " + index;
            loopback.publish("foo", message.getBytes(), 1);

            final byte[] echoed = loopback.receive(XmlImportExportTest.CONSUMER_TIMEOUT);
            assertNotNull("Should get a message", echoed);
            assertEquals(message, new String(echoed));
            index++;
        } while (index < NUM_MESSAGES);

        loopback.disconnect();
    }

    /**
     * A single client subscribes to {@code foo} with QoS 0 and publishes
     * {@link #NUM_MESSAGES} messages with QoS 0, checking after each publish that the same
     * text is received.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 60000)
    public void testSendAndReceiveAtMostOnce() throws Exception {
        final MQTTClientProvider provider = getMQTTClientProvider();
        initializeConnection(provider);
        provider.subscribe("foo", 0);

        for (int count = 0; count < NUM_MESSAGES; count++) {
            final String sent = "Test Message: " + count;
            provider.publish("foo", sent.getBytes(), 0);

            final byte[] got = provider.receive(XmlImportExportTest.CONSUMER_TIMEOUT);
            assertNotNull("Should get a message", got);

            final String gotText = new String(got);
            assertEquals(sent, gotText);
        }

        provider.disconnect();
    }

    /**
     * A single client subscribes to {@code foo} with QoS 1 and publishes
     * {@link #NUM_MESSAGES} messages with QoS 1, expecting to receive each message's text
     * before the next one is sent.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 120000)
    public void testSendAndReceiveAtLeastOnce() throws Exception {
        final MQTTClientProvider mqttClient = getMQTTClientProvider();
        initializeConnection(mqttClient);
        mqttClient.subscribe("foo", 1);

        int remaining = NUM_MESSAGES;
        while (remaining > 0) {
            // Message numbers count upwards from 0, exactly as an index loop would produce.
            final String body = "Test Message: " + (NUM_MESSAGES - remaining);
            mqttClient.publish("foo", body.getBytes(), 1);

            final byte[] delivered = mqttClient.receive(XmlImportExportTest.CONSUMER_TIMEOUT);
            assertNotNull("Should get a message", delivered);
            assertEquals(body, new String(delivered));
            remaining--;
        }

        mqttClient.disconnect();
    }

    /**
     * Sends {@link #NUM_MESSAGES} messages with QoS 2 from a publisher client to a subscriber
     * client on topic {@code foo}, then checks that the incoming and outgoing MQTT
     * interceptors of the test support class each report {@link #NUM_MESSAGES} messages.
     *
     * <p>Both interceptor counters are cleared first so earlier traffic is not counted.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 60000)
    public void testSendAndReceiveExactlyOnceWithInterceptors() throws Exception {
        MQTTTestSupport.MQTTIncomingInterceptor.clear();
        MQTTTestSupport.MQTTOutoingInterceptor.clear();
        MQTTClientProvider publisher = getMQTTClientProvider();
        initializeConnection(publisher);
        MQTTClientProvider subscriber = getMQTTClientProvider();
        initializeConnection(subscriber);
        subscriber.subscribe("foo", 2);
        for (int i = 0; i < NUM_MESSAGES; i++) {
            String payload = "Test Message: " + i;
            publisher.publish("foo", payload.getBytes(), 2);
            byte[] received = subscriber.receive(XmlImportExportTest.CONSUMER_TIMEOUT);
            assertNotNull("Should get a message + [" + i + "]", received);
            assertEquals(payload, new String(received));
        }
        subscriber.disconnect();
        publisher.disconnect();
        // Expected counts follow the loop bound instead of repeating the literal 250.
        assertEquals("Incoming interceptor message count", NUM_MESSAGES, MQTTTestSupport.MQTTIncomingInterceptor.getMessageCount());
        assertEquals("Outgoing interceptor message count", NUM_MESSAGES, MQTTTestSupport.MQTTOutoingInterceptor.getMessageCount());
    }

    /**
     * Sends 65535 messages with QoS 2 from one client to another on topic {@code foo},
     * verifying each message as it arrives and counting the deliveries. Currently ignored.
     *
     * <p>NOTE(review): 65535 is the largest 16-bit value, so the test name suggests this
     * exercises packet-id exhaustion; confirm.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 600000)
    @Ignore
    public void testSendMoreThanUniqueId() throws Exception {
        final MQTTClientProvider sender = getMQTTClientProvider();
        initializeConnection(sender);
        final MQTTClientProvider receiver = getMQTTClientProvider();
        initializeConnection(receiver);

        int delivered = 0;
        receiver.subscribe("foo", 2);

        int sequence = 0;
        while (sequence < 65535) {
            final String expected = "Test Message: " + sequence;
            sender.publish("foo", expected.getBytes(), 2);

            final byte[] payload = receiver.receive(XmlImportExportTest.CONSUMER_TIMEOUT);
            assertNotNull("Should get a message + [" + sequence + "]", payload);
            assertEquals(expected, new String(payload));

            delivered++;
            sequence++;
        }

        assertEquals(65535, delivered);
        receiver.disconnect();
        sender.disconnect();
    }

    /**
     * Sends a 32 KiB payload (every byte set to 50) ten times with QoS 1 from one client to
     * another on topic {@code foo} and checks that each received payload equals the original.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 60000)
    public void testSendAndReceiveLargeMessages() throws Exception {
        final byte[] largePayload = new byte[32768];
        Arrays.fill(largePayload, (byte) 50);

        final MQTTClientProvider sender = getMQTTClientProvider();
        initializeConnection(sender);
        final MQTTClientProvider receiver = getMQTTClientProvider();
        initializeConnection(receiver);
        receiver.subscribe("foo", 1);

        int round = 0;
        while (round < 10) {
            sender.publish("foo", largePayload, 1);
            final byte[] echoed = receiver.receive(XmlImportExportTest.CONSUMER_TIMEOUT);
            assertNotNull("Should get a message", echoed);
            assertArrayEquals(largePayload, echoed);
            round++;
        }

        receiver.disconnect();
        sender.disconnect();
    }

    /**
     * Publishes a retained message on {@code foo} before any subscription exists, then
     * subscribes a second client and publishes ten ordinary messages. The subscriber must
     * see the retained message first, followed by the ten messages in publish order.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 60000)
    public void testSendAndReceiveRetainedMessages() throws Exception {
        final MQTTClientProvider publisher = getMQTTClientProvider();
        initializeConnection(publisher);
        final MQTTClientProvider subscriber = getMQTTClientProvider();
        initializeConnection(subscriber);

        final String retainedBody = "retained";
        publisher.publish("foo", retainedBody.getBytes(), 1, true);

        subscriber.subscribe("foo", 1);

        // Remember what was sent so the received order can be checked afterwards.
        final ArrayList<String> sentBodies = new ArrayList<String>();
        for (int n = 0; n < 10; n++) {
            final String body = "TEST MESSAGE:" + n;
            sentBodies.add(body);
            publisher.publish("foo", body.getBytes(), 1);
        }

        final byte[] firstPayload = subscriber.receive(XmlImportExportTest.CONSUMER_TIMEOUT);
        assertNotNull(firstPayload);
        assertEquals(retainedBody, new String(firstPayload));

        for (int n = 0; n < 10; n++) {
            final byte[] payload = subscriber.receive(XmlImportExportTest.CONSUMER_TIMEOUT);
            assertNotNull(payload);
            assertEquals(sentBodies.get(n), new String(payload));
        }

        subscriber.disconnect();
        publisher.disconnect();
    }

    /**
     * Connects and disconnects with an empty client id and the clean-session flag set; the
     * test passes when neither call throws.
     *
     * @throws Exception if the connection cannot be established or closed
     */
    @Test(timeout = 30000)
    public void testValidZeroLengthClientId() throws Exception {
        final MQTT mqtt = createMQTTConnection();
        mqtt.setClientId("");
        mqtt.setCleanSession(true);

        final BlockingConnection connection = mqtt.blockingConnection();
        connection.connect();
        connection.disconnect();
    }

    /**
     * Exercises topic names containing leading, trailing and empty path segments, first with
     * exact-match subscriptions and then with wildcard subscriptions.
     *
     * <p>Phase 1 publishes a retained message on each literal topic, subscribes to it and
     * checks both the retained and a subsequent non-retained delivery. Phase 2 subscribes with
     * each wildcard filter on a fresh connection and checks that every retained and every
     * non-retained message received matches a regular expression derived from the filter.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 120000)
    public void testMQTTPathPatterns() throws Exception {
        MQTT createMQTTConnection = createMQTTConnection();
        createMQTTConnection.setClientId("");
        createMQTTConnection.setCleanSession(true);
        BlockingConnection blockingConnection = createMQTTConnection.blockingConnection();
        blockingConnection.connect();
        // Literal topics under test.
        String[] strArr = {"TopicA", "/TopicA", "/", "TopicA/", "//"};
        for (String str : strArr) {
            // Retained publish happens before the subscription, so the first delivery is the retained copy.
            blockingConnection.publish(str, ("RETAINED" + str).getBytes(), QoS.AT_LEAST_ONCE, true);
            blockingConnection.subscribe(new Topic[]{new Topic(str, QoS.AT_LEAST_ONCE)});
            Message receive = blockingConnection.receive(5L, TimeUnit.SECONDS);
            assertNotNull("No message for " + str, receive);
            assertEquals("RETAINED" + str, new String(receive.getPayload()));
            receive.ack();
            // A normal (non-retained) publish on the same topic must also be delivered.
            blockingConnection.publish(str, str.getBytes(), QoS.AT_LEAST_ONCE, false);
            Message receive2 = blockingConnection.receive(1000L, TimeUnit.MILLISECONDS);
            assertNotNull(receive2);
            assertEquals(str, new String(receive2.getPayload()));
            receive2.ack();
            blockingConnection.unsubscribe(new String[]{str});
        }
        blockingConnection.disconnect();
        // Wildcard filters; the retained messages published above are still expected to be delivered.
        for (String str2 : new String[]{"#", "+", "+/#", "/+", "+/", "+/+", "+/+/", "+/+/+"}) {
            // Translate the MQTT filter into a regex: '#' (with an optional preceding '/')
            // becomes "(/?.*)*" and '+' becomes "[^/]*" (one path segment without '/').
            Pattern compile = Pattern.compile(str2.replaceAll("/?#", "(/?.*)*").replaceAll("\\+", "[^/]*"));
            BlockingConnection blockingConnection2 = createMQTTConnection.blockingConnection();
            blockingConnection2.connect();
            // -128 is the byte 0x80; the assertion message treats that granted value as a failed subscription.
            assertNotEquals("Subscribe failed " + str2, -128L, blockingConnection2.subscribe(new Topic[]{new Topic(str2, QoS.AT_LEAST_ONCE)})[0]);
            Message receive3 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            // Drain retained deliveries: at least one is required, and the loop ends on the first receive timeout.
            do {
                assertNotNull("RETAINED null " + str2, receive3);
                String str3 = new String(receive3.getPayload());
                assertTrue("RETAINED prefix " + str2 + " msg " + str3, str3.startsWith("RETAINED"));
                assertTrue("RETAINED matching " + str2 + " " + receive3.getTopic(), compile.matcher(receive3.getTopic()).matches());
                receive3.ack();
                receive3 = blockingConnection2.receive(5000L, TimeUnit.MILLISECONDS);
            } while (receive3 != null);
            // Publish once to every literal topic; only those matching the filter should come back.
            for (String str4 : strArr) {
                blockingConnection2.publish(str4, str4.getBytes(), QoS.AT_LEAST_ONCE, false);
            }
            Message receive4 = blockingConnection2.receive(1000L, TimeUnit.MILLISECONDS);
            // Drain non-retained deliveries the same way: at least one is required, stop on timeout.
            do {
                assertNotNull("Non-retained Null " + str2, receive4);
                assertTrue("Non-retained matching " + str2 + " " + receive4.getTopic(), compile.matcher(receive4.getTopic()).matches());
                receive4.ack();
                receive4 = blockingConnection2.receive(1000L, TimeUnit.MILLISECONDS);
            } while (receive4 != null);
            blockingConnection2.unsubscribe(new String[]{str2});
            blockingConnection2.disconnect();
        }
    }

    /**
     * For each QoS name, publishes a retained message with EXACTLY_ONCE on a topic named after
     * that QoS, subscribes with that QoS and checks that the PUBLISH frame seen by the client
     * tracer carries the QoS of the subscription.
     *
     * <p>The expected value is the position of the name in the array, which is compared to
     * the ordinal of the frame's QoS.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 60000)
    public void testMQTTRetainQoS() throws Exception {
        final String[] qosNames = {"AT_MOST_ONCE", "AT_LEAST_ONCE", "EXACTLY_ONCE"};
        int expectedOrdinal = 0;
        for (final String qosName : qosNames) {
            final MQTT mqtt = createMQTTConnection();
            mqtt.setClientId("foo");
            mqtt.setKeepAlive((short) 2);

            // Single-element holder so the tracer callback can report the observed QoS; -1 means "none yet".
            final int[] observedQoS = {-1};
            mqtt.setTracer(new Tracer() {
                public void onReceive(MQTTFrame frame) {
                    // Message type 3 is the PUBLISH frame type.
                    if (frame.messageType() != 3) {
                        return;
                    }
                    observedQoS[0] = frame.qos().ordinal();
                }
            });

            final BlockingConnection connection = mqtt.blockingConnection();
            connection.connect();
            connection.publish(qosName, qosName.getBytes(), QoS.EXACTLY_ONCE, true);
            connection.subscribe(new Topic[]{new Topic(qosName, QoS.valueOf(qosName))});

            final Message retained = connection.receive(5000L, TimeUnit.MILLISECONDS);
            assertNotNull(retained);
            assertEquals(qosName, new String(retained.getPayload()));

            // Give the tracer up to ten seconds to record the frame.
            int polls = 0;
            while (observedQoS[0] == -1 && polls < 10) {
                Thread.sleep(1000L);
                polls++;
            }
            assertEquals(expectedOrdinal, observedQoS[0]);

            retained.ack();
            connection.unsubscribe(new String[]{qosName});
            connection.disconnect();
            expectedOrdinal++;
        }
    }

    /**
     * Publishes one retained message on {@code TopicA} and then subscribes to that topic four
     * times in a row on the same connection (QoS 0, 0, 1, 2). Every subscribe must redeliver
     * the retained message, and the PUBLISH frame seen by the tracer must carry the QoS of the
     * subscription just made.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 60000)
    public void testDuplicateSubscriptions() throws Exception {
        final MQTT mqtt = createMQTTConnection();
        mqtt.setClientId("foo");
        mqtt.setKeepAlive((short) 20);

        // Holder written by the tracer callback; -1 means no PUBLISH frame seen since the last reset.
        final int[] lastPublishQoS = {-1};
        mqtt.setTracer(new Tracer() {
            public void onReceive(MQTTFrame frame) {
                // Message type 3 is the PUBLISH frame type.
                if (frame.messageType() != 3) {
                    return;
                }
                lastPublishQoS[0] = frame.qos().ordinal();
            }
        });

        final BlockingConnection connection = mqtt.blockingConnection();
        connection.connect();
        connection.publish("TopicA", "RETAIN".getBytes(), QoS.EXACTLY_ONCE, true);

        final QoS[] subscriptionLevels = {QoS.AT_MOST_ONCE, QoS.AT_MOST_ONCE, QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE};
        for (int s = 0; s < subscriptionLevels.length; s++) {
            final QoS level = subscriptionLevels[s];
            connection.subscribe(new Topic[]{new Topic("TopicA", level)});

            final Message retained = connection.receive(5000L, TimeUnit.MILLISECONDS);
            assertNotNull("No message for " + level, retained);
            assertEquals("RETAIN", new String(retained.getPayload()));
            retained.ack();

            // Wait up to ten seconds for the tracer to record the frame.
            int polls = 0;
            while (lastPublishQoS[0] == -1 && polls < 10) {
                Thread.sleep(1000L);
                polls++;
            }
            assertEquals(level.ordinal(), lastPublishQoS[0]);
            lastPublishQoS[0] = -1;
        }

        connection.unsubscribe(new String[]{"TopicA"});
        connection.disconnect();
    }

    /**
     * Walks through the retained-message life cycle on {@code TopicA} for three client ids
     * ({@code null}, {@code "foo"} and {@code "durable"}; only the last one connects without
     * the clean-session flag):
     * <ol>
     *   <li>publish a retained message, subscribe, expect exactly one delivery;</li>
     *   <li>subscribe again on the same connection, expect the retained message once more;</li>
     *   <li>unsubscribe, publish an empty retained payload, re-subscribe, expect nothing;</li>
     *   <li>unsubscribe, publish a new retained message, re-subscribe, expect one delivery;</li>
     *   <li>reconnect, subscribe, expect the retained message once more.</li>
     * </ol>
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 120000)
    public void testRetainedMessage() throws Exception {
        final MQTT mqtt = createMQTTConnection();
        mqtt.setKeepAlive((short) 60);

        final String topic = "TopicA";
        final String retainedBody = "RETAIN";
        final String[] clientIds = {null, "foo", "durable"};

        for (final String clientId : clientIds) {
            LOG.info("Testing now with Client ID: {}", clientId);
            mqtt.setClientId(clientId);
            mqtt.setCleanSession(!"durable".equals(clientId));

            final BlockingConnection first = mqtt.blockingConnection();
            first.connect();

            // Step 1: retained publish followed by the initial subscription.
            first.publish(topic, retainedBody.getBytes(), QoS.EXACTLY_ONCE, true);
            first.subscribe(new Topic[]{new Topic(topic, QoS.AT_LEAST_ONCE)});
            final Message initial = first.receive(5000L, TimeUnit.MILLISECONDS);
            assertNotNull("No retained message for " + clientId, initial);
            assertEquals(retainedBody, new String(initial.getPayload()));
            initial.ack();
            assertNull(first.receive(500L, TimeUnit.MILLISECONDS));

            // Step 2: duplicate subscription on the same connection.
            first.subscribe(new Topic[]{new Topic(topic, QoS.AT_LEAST_ONCE)});
            final Message onDuplicate = first.receive(15000L, TimeUnit.MILLISECONDS);
            assertNotNull("No retained message on duplicate subscription for " + clientId, onDuplicate);
            assertEquals(retainedBody, new String(onDuplicate.getPayload()));
            onDuplicate.ack();
            assertNull(first.receive(500L, TimeUnit.MILLISECONDS));
            first.unsubscribe(new String[]{topic});

            // Step 3: an empty retained payload is expected to clear the retained message.
            first.publish(topic, "".getBytes(), QoS.AT_MOST_ONCE, true);
            first.subscribe(new Topic[]{new Topic(topic, QoS.AT_LEAST_ONCE)});
            final Message afterClear = first.receive(500L, TimeUnit.MILLISECONDS);
            assertNull("Retained message not cleared for " + clientId, afterClear);
            first.unsubscribe(new String[]{topic});

            // Step 4: set the retained message again.
            first.publish(topic, retainedBody.getBytes(), QoS.EXACTLY_ONCE, true);
            first.subscribe(new Topic[]{new Topic(topic, QoS.AT_LEAST_ONCE)});
            final Message afterReset = first.receive(5000L, TimeUnit.MILLISECONDS);
            assertNotNull("No reset retained message for " + clientId, afterReset);
            assertEquals(retainedBody, new String(afterReset.getPayload()));
            afterReset.ack();
            assertNull(first.receive(500L, TimeUnit.MILLISECONDS));
            first.disconnect();

            // Step 5: a new connection with the same settings still sees the retained message.
            final BlockingConnection second = mqtt.blockingConnection();
            second.connect();
            second.subscribe(new Topic[]{new Topic(topic, QoS.AT_LEAST_ONCE)});
            final Message afterReconnect = second.receive(5000L, TimeUnit.MILLISECONDS);
            assertNotNull("No reset retained message for " + clientId, afterReconnect);
            assertEquals(retainedBody, new String(afterReconnect.getPayload()));
            afterReconnect.ack();
            assertNull(second.receive(500L, TimeUnit.MILLISECONDS));
            second.unsubscribe(new String[]{topic});
            second.disconnect();
        }
    }

    /**
     * Same retained-message scenario as {@code testRetainedMessage}, but on the topic
     * {@code VirtualTopic/TopicA}. Currently ignored.
     *
     * <p>Runs once per client id ({@code null}, {@code "foo"}, {@code "durable"}); only the
     * {@code "durable"} run connects without the clean-session flag.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 120000)
    @Ignore
    public void testRetainedMessageOnVirtualTopics() throws Exception {
        MQTT createMQTTConnection = createMQTTConnection();
        createMQTTConnection.setKeepAlive((short) 60);
        for (String str : new String[]{null, "foo", "durable"}) {
            LOG.info("Testing now with Client ID: {}", str);
            createMQTTConnection.setClientId(str);
            createMQTTConnection.setCleanSession(!"durable".equals(str));
            BlockingConnection blockingConnection = createMQTTConnection.blockingConnection();
            blockingConnection.connect();
            // Step 1: retained publish, then subscribe; exactly one delivery is expected.
            blockingConnection.publish("VirtualTopic/TopicA", "RETAIN".getBytes(), QoS.EXACTLY_ONCE, true);
            blockingConnection.subscribe(new Topic[]{new Topic("VirtualTopic/TopicA", QoS.AT_LEAST_ONCE)});
            Message receive = blockingConnection.receive(5000L, TimeUnit.MILLISECONDS);
            assertNotNull("No retained message for " + str, receive);
            assertEquals("RETAIN", new String(receive.getPayload()));
            receive.ack();
            assertNull(blockingConnection.receive(500L, TimeUnit.MILLISECONDS));
            // Step 2: subscribing again on the same connection must redeliver the retained message once.
            blockingConnection.subscribe(new Topic[]{new Topic("VirtualTopic/TopicA", QoS.AT_LEAST_ONCE)});
            Message receive2 = blockingConnection.receive(15000L, TimeUnit.MILLISECONDS);
            assertNotNull("No retained message on duplicate subscription for " + str, receive2);
            assertEquals("RETAIN", new String(receive2.getPayload()));
            receive2.ack();
            assertNull(blockingConnection.receive(500L, TimeUnit.MILLISECONDS));
            blockingConnection.unsubscribe(new String[]{"VirtualTopic/TopicA"});
            // Step 3: an empty retained payload is expected to clear the retained message.
            blockingConnection.publish("VirtualTopic/TopicA", "".getBytes(), QoS.AT_MOST_ONCE, true);
            blockingConnection.subscribe(new Topic[]{new Topic("VirtualTopic/TopicA", QoS.AT_LEAST_ONCE)});
            assertNull("Retained message not cleared for " + str, blockingConnection.receive(500L, TimeUnit.MILLISECONDS));
            blockingConnection.unsubscribe(new String[]{"VirtualTopic/TopicA"});
            // Step 4: set the retained message again and verify it is delivered on subscribe.
            blockingConnection.publish("VirtualTopic/TopicA", "RETAIN".getBytes(), QoS.EXACTLY_ONCE, true);
            blockingConnection.subscribe(new Topic[]{new Topic("VirtualTopic/TopicA", QoS.AT_LEAST_ONCE)});
            Message receive3 = blockingConnection.receive(5000L, TimeUnit.MILLISECONDS);
            assertNotNull("No reset retained message for " + str, receive3);
            assertEquals("RETAIN", new String(receive3.getPayload()));
            receive3.ack();
            assertNull(blockingConnection.receive(500L, TimeUnit.MILLISECONDS));
            blockingConnection.disconnect();
            // Step 5: a new connection with the same settings must still get the retained message.
            BlockingConnection blockingConnection2 = createMQTTConnection.blockingConnection();
            blockingConnection2.connect();
            blockingConnection2.subscribe(new Topic[]{new Topic("VirtualTopic/TopicA", QoS.AT_LEAST_ONCE)});
            Message receive4 = blockingConnection2.receive(5000L, TimeUnit.MILLISECONDS);
            assertNotNull("No reset retained message for " + str, receive4);
            assertEquals("RETAIN", new String(receive4.getPayload()));
            receive4.ack();
            assertNull(blockingConnection2.receive(500L, TimeUnit.MILLISECONDS));
            LOG.info("Test now unsubscribing from: {} for the last time", "VirtualTopic/TopicA");
            blockingConnection2.unsubscribe(new String[]{"VirtualTopic/TopicA"});
            blockingConnection2.disconnect();
        }
    }

    /**
     * Subscribes one client to three overlapping filters ("TopicA/", "TopicA/#", "TopicA/+")
     * with QoS 0, 1 and 2, publishes one retained and one ordinary message on "TopicA/", and
     * records every PUBLISH frame the client receives through a tracer.
     *
     * <p>It then checks that six messages were delivered (two per subscription) and, pairwise
     * over the recorded frames, that QoS 0 frames carry message id 0 and that frames where
     * neither side of the pair is QoS 0 have different message ids.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 60000)
    public void testUniqueMessageIds() throws Exception {
        final MQTT mqtt = createMQTTConnection();
        mqtt.setClientId("foo");
        mqtt.setKeepAlive((short) 2);
        mqtt.setCleanSession(true);

        final ArrayList<PUBLISH> publishFrames = new ArrayList<PUBLISH>();
        mqtt.setTracer(new Tracer() {
            public void onReceive(MQTTFrame frame) {
                MQTTTest.LOG.info("Client received:\n" + frame);
                // Message type 3 is the PUBLISH frame type; other frames are only logged.
                if (frame.messageType() != 3) {
                    return;
                }
                final PUBLISH decoded = new PUBLISH();
                try {
                    decoded.decode(frame);
                } catch (ProtocolException e) {
                    Assert.fail("Error decoding publish " + e.getMessage());
                }
                publishFrames.add(decoded);
            }

            public void onSend(MQTTFrame frame) {
                MQTTTest.LOG.info("Client sent:\n" + frame);
            }
        });

        final BlockingConnection connection = mqtt.blockingConnection();
        connection.connect();

        final QoS[] levels = {QoS.AT_MOST_ONCE, QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE};
        connection.publish("TopicA/", "TopicA/".getBytes(), QoS.EXACTLY_ONCE, true);
        final String[] filters = {"TopicA/", "TopicA/#", "TopicA/+"};
        int slot = 0;
        while (slot < levels.length) {
            connection.subscribe(new Topic[]{new Topic(filters[slot], levels[slot])});
            slot++;
        }
        connection.publish("TopicA/", "TopicA/".getBytes(), QoS.EXACTLY_ONCE, false);

        // Counts messages after the first one; incremented only when another message arrived.
        int received = 0;
        Message message = connection.receive(5000L, TimeUnit.MILLISECONDS);
        do {
            assertNotNull(message);
            assertEquals("TopicA/", new String(message.getPayload()));
            message.ack();

            // Let the tracer catch up with the messages consumed so far (ten seconds at most).
            int polls = 0;
            while (publishFrames.size() <= received && polls < 10) {
                Thread.sleep(1000L);
                polls++;
            }
            message = connection.receive(5000L, TimeUnit.MILLISECONDS);
        } while (message != null && received++ < filters.length * 2);
        assertEquals("Unexpected number of messages", filters.length * 2, received + 1);

        // Pairwise comparison of all recorded PUBLISH frames.
        for (int a = 0; a < publishFrames.size(); a++) {
            final PUBLISH left = publishFrames.get(a);
            for (int b = a + 1; b < publishFrames.size(); b++) {
                final PUBLISH right = publishFrames.get(b);
                boolean hasQoS0 = false;
                if (left.qos() == QoS.AT_MOST_ONCE) {
                    hasQoS0 = true;
                    assertEquals(0L, left.messageId());
                }
                if (right.qos() == QoS.AT_MOST_ONCE) {
                    hasQoS0 = true;
                    assertEquals(0L, right.messageId());
                }
                if (hasQoS0) {
                    continue;
                }
                assertNotEquals(left.messageId(), right.messageId());
            }
        }

        connection.unsubscribe(filters);
        connection.disconnect();
    }

    /**
     * Connects as client {@code "resend"}, subscribes to two overlapping filters (QoS 1 and
     * QoS 2), publishes one message and waits until the tracer has recorded two PUBLISH
     * frames. It then disconnects, reconnects with the same settings and expects two more
     * PUBLISH frames to arrive, for a total of four.
     *
     * <p>NOTE(review): the {@code false} argument to {@code createMQTTConnection} is
     * presumably the clean-session flag, and the received messages are never acknowledged
     * here, which would explain the redelivery after reconnect; confirm against the helper.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 60000)
    public void testResendMessageId() throws Exception {
        final MQTT mqtt = createMQTTConnection("resend", false);
        mqtt.setKeepAlive((short) 5);

        final ArrayList<PUBLISH> publishFrames = new ArrayList<PUBLISH>();
        mqtt.setTracer(new Tracer() {
            public void onReceive(MQTTFrame frame) {
                MQTTTest.LOG.info("Client received:\n" + frame);
                // Message type 3 is the PUBLISH frame type; other frames are only logged.
                if (frame.messageType() != 3) {
                    return;
                }
                final PUBLISH decoded = new PUBLISH();
                try {
                    decoded.decode(frame);
                } catch (ProtocolException e) {
                    Assert.fail("Error decoding publish " + e.getMessage());
                }
                publishFrames.add(decoded);
            }

            public void onSend(MQTTFrame frame) {
                MQTTTest.LOG.info("Client sent:\n" + frame);
            }
        });

        final BlockingConnection firstConnection = mqtt.blockingConnection();
        firstConnection.connect();

        final String[] filters = {"TopicA/", "TopicA/+"};
        final Topic[] subscriptions = {
            new Topic(filters[0], QoS.AT_LEAST_ONCE),
            new Topic(filters[1], QoS.EXACTLY_ONCE)
        };
        firstConnection.subscribe(subscriptions);
        firstConnection.publish("TopicA/", "TopicA/".getBytes(), QoS.EXACTLY_ONCE, false);

        // One delivery per matching subscription.
        final Wait.Condition twoFramesSeen = new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return publishFrames.size() == 2;
            }
        };
        Wait.waitFor(twoFramesSeen, 5000L);
        assertEquals(2L, publishFrames.size());
        firstConnection.disconnect();

        final BlockingConnection secondConnection = mqtt.blockingConnection();
        secondConnection.connect();

        // Two more frames are expected after the reconnect.
        final Wait.Condition fourFramesSeen = new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return publishFrames.size() == 4;
            }
        };
        Wait.waitFor(fourFramesSeen, 5000L);
        assertEquals(4L, publishFrames.size());

        secondConnection.unsubscribe(filters);
        secondConnection.disconnect();
    }

    /**
     * Publishes ten QoS 2 messages on a non-clean session, acknowledges five of them,
     * reconnects and drains the remainder. Every PUBLISH frame seen by the tracer is recorded
     * by message id; a frame whose id was already recorded must carry the DUP flag. At the end
     * an entry is expected for each id from 1 to 10.
     *
     * @throws Exception if the MQTT client fails or an assertion does not hold
     */
    @Test(timeout = 90000)
    public void testPacketIdGeneratorNonCleanSession() throws Exception {
        final String topic = "TopicA/";
        MQTT mqtt = createMQTTConnection("nonclean-packetid", false);
        mqtt.setKeepAlive((short) 15);

        final ConcurrentHashMap<Short, PUBLISH> publishById = new ConcurrentHashMap<Short, PUBLISH>();
        mqtt.setTracer(new Tracer() {
            @Override
            public void onReceive(MQTTFrame frame) {
                MQTTTest.LOG.info("Client received:\n" + frame);
                // Only PUBLISH frames (MQTT control packet type 3) are of interest.
                if (frame.messageType() != 3) {
                    return;
                }
                PUBLISH publish = new PUBLISH();
                try {
                    publish.decode(frame);
                    MQTTTest.LOG.info("PUBLISH " + publish);
                } catch (ProtocolException e) {
                    Assert.fail("Error decoding publish " + e.getMessage());
                }
                Short id = Short.valueOf(publish.messageId());
                // An id seen before is only acceptable on a re-delivery.
                if (publishById.get(id) != null) {
                    Assert.assertTrue(publish.dup());
                }
                publishById.put(id, publish);
            }

            @Override
            public void onSend(MQTTFrame frame) {
                MQTTTest.LOG.info("Client sent:\n" + frame);
            }
        });

        BlockingConnection firstConnection = mqtt.blockingConnection();
        firstConnection.connect();
        firstConnection.subscribe(new Topic[]{new Topic(topic, QoS.EXACTLY_ONCE)});
        for (int sent = 0; sent < 10; sent++) {
            firstConnection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, false);
        }

        // Acknowledge only half of the messages before dropping the connection.
        for (int acked = 0; acked < 5; acked++) {
            Message message = firstConnection.receive(1000L, TimeUnit.MILLISECONDS);
            assertNotNull(message);
            assertEquals(topic, new String(message.getPayload()));
            message.ack();
        }
        firstConnection.disconnect();

        // Drain whatever is delivered after reconnecting, until a receive times out.
        BlockingConnection secondConnection = mqtt.blockingConnection();
        secondConnection.connect();
        Message pending = secondConnection.receive(1000L, TimeUnit.MILLISECONDS);
        while (pending != null) {
            assertEquals(topic, new String(pending.getPayload()));
            pending.ack();
            pending = secondConnection.receive(1000L, TimeUnit.MILLISECONDS);
        }

        for (short id = 1; id <= 10; id++) {
            assertNotNull("No message for id " + id, publishById.get(Short.valueOf(id)));
        }
        secondConnection.unsubscribe(new String[]{topic});
        secondConnection.disconnect();
    }

    /**
     * Currently disabled. Builds three clean-session clients that share one tracer map, then
     * runs ten rounds: a randomly chosen client connects, subscribes, publishes one QoS 2
     * message and receives it. After each round exactly one PUBLISH frame is expected in the
     * map, stored under the id {@code round + 1}.
     *
     * @throws Exception if the MQTT client fails or an assertion does not hold
     */
    @Test(timeout = 90000)
    @Ignore
    public void testPacketIdGeneratorCleanSession() throws Exception {
        final String topic = "TopicA/";
        // NOTE(review): these ids only determine how many clients are created; every client
        // below is built with "" as its id. Looks unintentional - confirm before enabling.
        final String[] clientIds = {"", "clean-packetid", null};
        final ConcurrentHashMap<Short, PUBLISH> publishById = new ConcurrentHashMap<Short, PUBLISH>();

        final MQTT[] clients = new MQTT[clientIds.length];
        for (int idx = 0; idx < clientIds.length; idx++) {
            MQTT client = createMQTTConnection("", true);
            clients[idx] = client;
            client.setKeepAlive((short) 15);
            client.setTracer(new Tracer() {
                @Override
                public void onReceive(MQTTFrame frame) {
                    MQTTTest.LOG.info("Client received:\n" + frame);
                    // Only PUBLISH frames (MQTT control packet type 3) are of interest.
                    if (frame.messageType() != 3) {
                        return;
                    }
                    PUBLISH publish = new PUBLISH();
                    try {
                        publish.decode(frame);
                        MQTTTest.LOG.info("PUBLISH " + publish);
                    } catch (ProtocolException e) {
                        Assert.fail("Error decoding publish " + e.getMessage());
                    }
                    Short id = Short.valueOf(publish.messageId());
                    // An id seen before is only acceptable on a re-delivery.
                    if (publishById.get(id) != null) {
                        Assert.assertTrue(publish.dup());
                    }
                    publishById.put(id, publish);
                }

                @Override
                public void onSend(MQTTFrame frame) {
                    MQTTTest.LOG.info("Client sent:\n" + frame);
                }
            });
        }

        final Random random = new Random();
        for (short round = 0; round < 10; round++) {
            BlockingConnection connection = clients[random.nextInt(clientIds.length)].blockingConnection();
            connection.connect();
            connection.subscribe(new Topic[]{new Topic(topic, QoS.EXACTLY_ONCE)});
            connection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, false);

            Message message = connection.receive(1000L, TimeUnit.MILLISECONDS);
            assertNotNull(message);
            assertEquals(topic, new String(message.getPayload()));
            message.ack();

            assertEquals(1L, publishById.size());
            final short expectedId = (short) (round + 1);
            assertNotNull("No message for id " + expectedId, publishById.get(Short.valueOf(expectedId)));
            publishById.clear();
            connection.disconnect();
        }
    }

    /**
     * Subscribes and publishes at QoS 2 on a non-clean session ("reconnect"), kills the
     * connection, waits ten seconds and reconnects with the same settings. The message
     * published before the kill is expected to be received on the new connection.
     *
     * @throws Exception if the MQTT client fails or an assertion does not hold
     */
    @Test(timeout = 60000)
    public void testClientConnectionFailure() throws Exception {
        final String topic = "TopicA";
        MQTT mqtt = createMQTTConnection("reconnect", false);
        mqtt.setKeepAlive((short) 1);

        final BlockingConnection original = mqtt.blockingConnection();
        original.connect();
        Wait.Condition originalConnected = new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return original.isConnected();
            }
        };
        Wait.waitFor(originalConnected);

        byte[] grantedQos = original.subscribe(new Topic[]{new Topic(topic, QoS.EXACTLY_ONCE)});
        assertEquals(QoS.EXACTLY_ONCE.ordinal(), grantedQos[0]);
        original.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, false);

        // Drop the connection abruptly, then pause before reconnecting.
        original.kill();
        Thread.sleep(10000L);

        final BlockingConnection replacement = mqtt.blockingConnection();
        replacement.connect();
        Wait.Condition replacementConnected = new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return replacement.isConnected();
            }
        };
        Wait.waitFor(replacementConnected);

        // Re-checks the QoS array returned by the original subscribe call.
        assertEquals(QoS.EXACTLY_ONCE.ordinal(), grantedQos[0]);
        Message message = replacement.receive(1000L, TimeUnit.MILLISECONDS);
        assertNotNull(message);
        assertEquals(topic, new String(message.getPayload()));
        message.ack();
        replacement.disconnect();
    }

    /**
     * Connects client "1" with a will message on topic "will", subscribes client "2" to that
     * topic, kills client "1" and expects client "2" to receive the will payload.
     *
     * @throws Exception if the MQTT client fails or an assertion does not hold
     */
    @Test(timeout = 60000)
    public void testClientConnectionFailureSendsWillMessage() throws Exception {
        getServer().createQueue(SimpleString.toSimpleString("will"), SimpleString.toSimpleString("will"), (SimpleString) null, true, false);

        MQTT willMqtt = createMQTTConnection("1", false);
        willMqtt.setKeepAlive((short) 1);
        willMqtt.setWillMessage("test message");
        willMqtt.setWillTopic("will");
        willMqtt.setWillQos(QoS.AT_LEAST_ONCE);
        final BlockingConnection willConnection = willMqtt.blockingConnection();
        willConnection.connect();

        // The previous hand-rolled loop compared "start + 10000 < now", which is false on the
        // first evaluation, so it never waited. Use the same wait helper as the sibling tests.
        assertTrue("Client with will message never connected", Wait.waitFor(new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return willConnection.isConnected();
            }
        }));

        BlockingConnection subscriber = createMQTTConnection("2", false).blockingConnection();
        subscriber.connect();
        subscriber.subscribe(new Topic[]{new Topic("will", QoS.AT_LEAST_ONCE)});

        // Drop the first client abruptly, then pause before looking for the will message.
        willConnection.kill();
        Thread.sleep(10000L);

        Message will = subscriber.receive(1000L, TimeUnit.MILLISECONDS);
        // Fail with a clear assertion rather than a NullPointerException if nothing arrived.
        assertNotNull("Will message was not delivered", will);
        will.ack();
        assertEquals("test message", new String(will.getPayload()));
        subscriber.disconnect();
    }

    /**
     * Connects client "1" with a retained will message on topic "will", kills it, and only
     * afterwards subscribes client "2" to that topic. The late subscriber is still expected
     * to receive the will payload.
     *
     * @throws Exception if the MQTT client fails or an assertion does not hold
     */
    @Test(timeout = 60000)
    public void testWillMessageIsRetained() throws Exception {
        final SimpleString willName = SimpleString.toSimpleString("will");
        getServer().createQueue(willName, SimpleString.toSimpleString("will"), (SimpleString) null, true, false);

        MQTT willMqtt = createMQTTConnection("1", false);
        willMqtt.setKeepAlive((short) 1);
        willMqtt.setWillMessage("test message");
        willMqtt.setWillTopic("will");
        willMqtt.setWillQos(QoS.AT_LEAST_ONCE);
        willMqtt.setWillRetain(true);

        final BlockingConnection willConnection = willMqtt.blockingConnection();
        willConnection.connect();
        Wait.Condition connected = new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return willConnection.isConnected();
            }
        };
        Wait.waitFor(connected);

        // Drop the client abruptly, then pause before the subscriber shows up.
        willConnection.kill();
        Thread.sleep(10000L);

        MQTT subscriberMqtt = createMQTTConnection("2", false);
        BlockingConnection subscriber = subscriberMqtt.blockingConnection();
        subscriber.connect();
        subscriber.subscribe(new Topic[]{new Topic("will", QoS.AT_LEAST_ONCE)});

        Message will = subscriber.receive(1000L, TimeUnit.MILLISECONDS);
        assertNotNull(will);
        will.ack();
        String body = new String(will.getPayload());
        assertEquals("test message", body);
    }

    /**
     * Walks the "cleansession" client id through four connections: two non-clean ones (the
     * second must receive the message published by the first), one clean one (must receive
     * nothing initially), and a final non-clean one that must also receive nothing.
     *
     * @throws Exception if the MQTT client fails or an assertion does not hold
     */
    @Test(timeout = 60000)
    public void testCleanSession() throws Exception {
        final String clientId = "cleansession";
        final String topic = "TopicA";
        final Topic[] subscription = {new Topic(topic, QoS.EXACTLY_ONCE)};
        MQTT persistentMqtt = createMQTTConnection(clientId, false);

        // 1) Non-clean session: subscribe, publish, and leave without receiving.
        BlockingConnection first = persistentMqtt.blockingConnection();
        first.connect();
        first.subscribe(subscription);
        first.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, false);
        first.disconnect();

        // 2) Same non-clean session: the pending message must arrive; publish another one.
        BlockingConnection second = persistentMqtt.blockingConnection();
        second.connect();
        Message pending = second.receive(10000L, TimeUnit.MILLISECONDS);
        assertNotNull(pending);
        assertEquals(topic, new String(pending.getPayload()));
        pending.ack();
        second.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, false);
        second.disconnect();

        // 3) Clean session with the same client id: nothing may be waiting for it.
        MQTT cleanMqtt = createMQTTConnection(clientId, true);
        BlockingConnection third = cleanMqtt.blockingConnection();
        third.connect();
        assertNull(third.receive(10000L, TimeUnit.MILLISECONDS));
        third.subscribe(new Topic[]{new Topic(topic, QoS.EXACTLY_ONCE)});
        third.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, false);
        third.disconnect();

        // 4) Back to the non-clean settings: still nothing is expected.
        BlockingConnection fourth = persistentMqtt.blockingConnection();
        fourth.connect();
        assertNull(fourth.receive(1000L, TimeUnit.MILLISECONDS));
        fourth.disconnect();
    }

    /**
     * Runs {@link #doTestSendMQTTReceiveJMS(String, String)} with the JMS queue name
     * {@code foo.*} and the MQTT topic suffix {@code foo/bar}.
     *
     * @throws Exception if the delegated scenario fails
     */
    @Test(timeout = 60000)
    public void testSendMQTTReceiveJMS() throws Exception {
        final String jmsQueueName = "foo.*";
        final String mqttTopicSuffix = "foo/bar";
        doTestSendMQTTReceiveJMS(jmsQueueName, mqttTopicSuffix);
    }

    /**
     * Publishes messages through an MQTT client provider and verifies that a JMS consumer
     * receives each of them with an identical payload: first one retained message, then
     * {@code NUM_MESSAGES} regular ones.
     *
     * @param str  name of the JMS queue the consumer is created on
     * @param str2 suffix appended to {@code "jms/queue/"} to form the MQTT topic published to
     * @throws Exception if the MQTT or JMS client fails or an assertion does not hold
     */
    public void doTestSendMQTTReceiveJMS(String str, String str2) throws Exception {
        MQTTClientProvider provider = getMQTTClientProvider();
        initializeConnection(provider);
        String mqttTopic = "jms/queue/" + str2;
        byte[] retainedPayload = "RETAINED".getBytes();

        Connection jmsConnection = this.cf.createConnection();
        try {
            jmsConnection.start();
            Session session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(session.createQueue(str));

            // QoS 1, retained.
            provider.publish(mqttTopic, retainedPayload, 1, true);
            BytesMessage retained = (BytesMessage) consumer.receive(5000L);
            assertNotNull("Should get retained message", retained);
            byte[] retainedBody = new byte[retainedPayload.length];
            retained.readBytes(retainedBody);
            assertArrayEquals(retainedPayload, retainedBody);

            for (int i = 0; i < NUM_MESSAGES; i++) {
                byte[] expected = ("Test Message: " + i).getBytes();
                provider.publish(mqttTopic, expected, 1);
                BytesMessage received = (BytesMessage) consumer.receive(5000L);
                assertNotNull("Should get a message", received);
                // Compare the bytes of THIS message; the loop used to re-assert the retained
                // payload arrays, so per-message content was never actually checked.
                byte[] actual = new byte[expected.length];
                received.readBytes(actual);
                assertArrayEquals(expected, actual);
            }
        } finally {
            // Release both clients even when an assertion above fails.
            try {
                jmsConnection.close();
            } finally {
                provider.disconnect();
            }
        }
    }

    /**
     * Runs {@link #doTestSendJMSReceiveMQTT(String)} against the JMS queue {@code foo.far}.
     *
     * @throws Exception if the delegated scenario fails
     */
    @Test(timeout = 120000)
    public void testSendJMSReceiveMQTT() throws Exception {
        final String jmsQueueName = "foo.far";
        doTestSendJMSReceiveMQTT(jmsQueueName);
    }

    /**
     * Subscribes an MQTT client provider to {@code jms/queue/foo/+}, sends a ten-byte JMS
     * {@code BytesMessage} to the given queue and expects the MQTT side to receive the same
     * bytes within ten seconds.
     *
     * @param str name of the JMS queue the message is sent to
     * @throws Exception if the MQTT or JMS client fails or an assertion does not hold
     */
    public void doTestSendJMSReceiveMQTT(String str) throws Exception {
        final MQTTClientProvider provider = getMQTTClientProvider();
        initializeConnection(provider);
        // QoS 0 subscription on the MQTT side.
        provider.subscribe("jms/queue/foo/+", 0);

        final Connection jmsConnection = this.cf.createConnection();
        jmsConnection.start();
        final Session session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageProducer producer = session.createProducer(session.createQueue(str));

        final byte[] payload = new byte[10];
        for (int idx = 0; idx < payload.length; idx++) {
            payload[idx] = (byte) (idx + 1);
        }
        final BytesMessage outgoing = session.createBytesMessage();
        outgoing.writeBytes(payload);
        producer.send(outgoing);

        final byte[] delivered = provider.receive(10000);
        assertNotNull("Should get retained message", delivered);
        assertArrayEquals(payload, delivered);

        provider.disconnect();
        jmsConnection.close();
    }

    /**
     * Connects client "foo" with a keep-alive of 2 and asserts that the wait helper reports
     * the connection as connected.
     *
     * @throws Exception if the MQTT client fails or the assertion does not hold
     */
    @Test(timeout = 60000)
    public void testPingKeepsInactivityMonitorAlive() throws Exception {
        MQTT mqtt = createMQTTConnection();
        mqtt.setClientId("foo");
        mqtt.setKeepAlive((short) 2);

        final BlockingConnection connection = mqtt.blockingConnection();
        connection.connect();

        Wait.Condition connected = new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return connection.isConnected();
            }
        };
        boolean alive = Wait.waitFor(connected);
        assertTrue("KeepAlive didn't work properly", alive);

        connection.disconnect();
    }

    /**
     * Restarts the broker with {@code transport.useInactivityMonitor=false}, connects client
     * "foo3" with a keep-alive of 2 and asserts that the wait helper reports the connection
     * as connected.
     *
     * @throws Exception if the broker or MQTT client fails or the assertion does not hold
     */
    @Test(timeout = 60000)
    public void testTurnOffInactivityMonitor() throws Exception {
        // Apply the protocol option by bouncing the broker.
        stopBroker();
        this.protocolConfig = "transport.useInactivityMonitor=false";
        startBroker();

        MQTT mqtt = createMQTTConnection();
        mqtt.setClientId("foo3");
        mqtt.setKeepAlive((short) 2);

        final BlockingConnection connection = mqtt.blockingConnection();
        connection.connect();

        Wait.Condition connected = new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return connection.isConnected();
            }
        };
        boolean alive = Wait.waitFor(connected);
        assertTrue("KeepAlive didn't work properly", alive);

        connection.disconnect();
    }

    /**
     * Currently disabled. With the default broker configuration a retained publish to
     * {@code $TopicA} is expected NOT to be delivered to a subscriber; after restarting the
     * broker with {@code transport.publishDollarTopics=true} the same publish is expected to
     * be delivered.
     *
     * @throws Exception if the broker or MQTT client fails or an assertion does not hold
     */
    @Test(timeout = 60000)
    @Ignore
    public void testPublishDollarTopics() throws Exception {
        final String dollarTopic = "$TopicA";

        // Phase 1: default configuration - nothing should come back.
        MQTT defaultMqtt = createMQTTConnection();
        defaultMqtt.setClientId("publishDollar");
        defaultMqtt.setKeepAlive((short) 2);
        BlockingConnection defaultConnection = defaultMqtt.blockingConnection();
        defaultConnection.connect();
        defaultConnection.subscribe(new Topic[]{new Topic(dollarTopic, QoS.EXACTLY_ONCE)});
        defaultConnection.publish(dollarTopic, dollarTopic.getBytes(), QoS.EXACTLY_ONCE, true);
        Message unexpected = defaultConnection.receive(10L, TimeUnit.SECONDS);
        assertNull("Publish enabled for $ Topics by default", unexpected);
        defaultConnection.disconnect();

        // Phase 2: restart with dollar topics enabled - the message should arrive.
        stopBroker();
        this.protocolConfig = "transport.publishDollarTopics=true";
        startBroker();

        MQTT enabledMqtt = createMQTTConnection();
        enabledMqtt.setClientId("publishDollar");
        enabledMqtt.setKeepAlive((short) 2);
        BlockingConnection enabledConnection = enabledMqtt.blockingConnection();
        enabledConnection.connect();
        enabledConnection.subscribe(new Topic[]{new Topic(dollarTopic, QoS.EXACTLY_ONCE)});
        enabledConnection.publish(dollarTopic, dollarTopic.getBytes(), QoS.EXACTLY_ONCE, true);
        Message delivered = enabledConnection.receive(10L, TimeUnit.SECONDS);
        assertNotNull(delivered);
        delivered.ack();
        assertEquals("Message body", dollarTopic, new String(delivered.getPayload()));
        enabledConnection.disconnect();
    }

    /**
     * Currently disabled. Phase 1 (default configuration): a second connection using the
     * client id "duplicateClient" is expected to connect while the first one gets
     * disconnected. Phase 2 (broker restarted with {@code allowLinkStealing=false}): the
     * second connection attempt is expected to fail and the first one to stay connected.
     *
     * @throws Exception if the broker or MQTT client fails or an assertion does not hold
     */
    @Test(timeout = 60000)
    @Ignore
    public void testDuplicateClientId() throws Exception {
        final String clientId = "duplicateClient";
        final String topic = "TopicA";

        // Phase 1: the newcomer takes over the client id.
        MQTT oldMqtt = createMQTTConnection(clientId, false);
        oldMqtt.setKeepAlive((short) 2);
        final BlockingConnection oldConnection = oldMqtt.blockingConnection();
        oldConnection.connect();
        oldConnection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, true);

        MQTT newMqtt = createMQTTConnection(clientId, false);
        newMqtt.setKeepAlive((short) 2);
        final BlockingConnection newConnection = newMqtt.blockingConnection();
        newConnection.connect();

        Wait.Condition newcomerConnected = new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return newConnection.isConnected();
            }
        };
        assertTrue("Duplicate client disconnected", Wait.waitFor(newcomerConnected));

        Wait.Condition oldDisconnected = new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return !oldConnection.isConnected();
            }
        };
        assertTrue("Old client still connected", Wait.waitFor(oldDisconnected));

        newConnection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, true);
        newConnection.disconnect();

        // Phase 2: with link stealing disabled the newcomer must be rejected.
        stopBroker();
        this.protocolConfig = "allowLinkStealing=false";
        startBroker();

        MQTT holderMqtt = createMQTTConnection(clientId, false);
        holderMqtt.setKeepAlive((short) 2);
        BlockingConnection holder = holderMqtt.blockingConnection();
        holder.connect();
        holder.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, true);

        MQTT intruderMqtt = createMQTTConnection(clientId, false);
        intruderMqtt.setKeepAlive((short) 2);
        try {
            BlockingConnection intruder = intruderMqtt.blockingConnection();
            intruder.connect();
            fail("Duplicate client connected");
        } catch (Exception expected) {
            // Intentionally ignored: a failed connect is the expected outcome here.
        }

        assertTrue("Old client disconnected", holder.isConnected());
        holder.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, true);
        holder.disconnect();
    }

    /**
     * Runs {@link #doTestJmsMapping(String)} against the JMS queue {@code test.foo}.
     *
     * @throws Exception if the delegated scenario fails
     */
    @Test(timeout = 300000)
    public void testJmsMapping() throws Exception {
        final String jmsQueueName = "test.foo";
        doTestJmsMapping(jmsQueueName);
    }

    /**
     * Publishes five "hello world" messages at QoS 1 to the MQTT topic
     * {@code jms/queue/test/foo} and expects a JMS consumer on the given queue to receive
     * five {@code BytesMessage}s carrying that text.
     *
     * @param str name of the JMS queue the consumer is created on
     * @throws Exception if the MQTT or JMS client fails or an assertion does not hold
     */
    public void doTestJmsMapping(String str) throws Exception {
        final int messageCount = 5;
        final String text = "hello world";

        // The JMS consumer is set up before anything is published.
        final Connection jmsConnection = this.cf.createConnection();
        final Session session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageConsumer consumer = session.createConsumer(session.createQueue(str));
        jmsConnection.start();

        MQTT mqtt = createMQTTConnection();
        mqtt.setClientId("foo3");
        mqtt.setKeepAlive((short) 2);
        final BlockingConnection mqttConnection = mqtt.blockingConnection();
        mqttConnection.connect();
        int published = 0;
        while (published < messageCount) {
            mqttConnection.publish("jms/queue/test/foo", text.getBytes(), QoS.AT_LEAST_ONCE, false);
            published++;
        }
        mqttConnection.disconnect();

        int consumed = 0;
        while (consumed < messageCount) {
            // Fully qualified: the simple name Message refers to the MQTT client class here.
            javax.jms.Message received = consumer.receive(2000L);
            assertNotNull(received);
            assertTrue(received instanceof BytesMessage);
            BytesMessage bytesMessage = (BytesMessage) received;
            byte[] body = new byte[(int) bytesMessage.getBodyLength()];
            bytesMessage.readBytes(body);
            assertEquals(text, new String(body));
            consumed++;
        }

        jmsConnection.close();
    }

    /**
     * Subscribes to the wildcard filter {@code Topic/#}, publishes one 32 KiB message to each
     * of {@code Topic/A} and {@code Topic/B}, and expects one message back per publish.
     *
     * @throws Exception if the MQTT client fails or an assertion does not hold
     */
    @Test(timeout = 300000)
    public void testSubscribeMultipleTopics() throws Exception {
        byte[] payload = new byte[32768];
        Arrays.fill(payload, (byte) 50);

        MQTT mqtt = createMQTTConnection();
        mqtt.setClientId("MQTT-Client");
        mqtt.setCleanSession(false);
        BlockingConnection connection = mqtt.blockingConnection();
        connection.connect();
        try {
            // These entries only supply the topic names that are published to.
            Topic[] publishTopics = {new Topic("Topic/A", QoS.EXACTLY_ONCE), new Topic("Topic/B", QoS.EXACTLY_ONCE)};
            connection.subscribe(new Topic[]{new Topic("Topic/#", QoS.AT_LEAST_ONCE)});
            for (Topic topic : publishTopics) {
                connection.publish(topic.name().toString(), payload, QoS.AT_LEAST_ONCE, false);
            }

            for (int i = 0; i < publishTopics.length; i++) {
                // Bounded wait: an untimed receive() would block until the whole test timed
                // out, so a missing message could never reach the null check below.
                Message message = connection.receive(5L, TimeUnit.SECONDS);
                assertNotNull("Should have received " + publishTopics.length + " messages, missing message " + (i + 1), message);
                LOG.info("Received message from topic: " + message.getTopic() + " Message content: " + new String(message.getPayload()));
                message.ack();
            }
        } finally {
            // Do not leave the client connected when the test ends or fails.
            connection.disconnect();
        }
    }

    /**
     * A non-clean subscriber first receives two messages while online. Then, for 100 rounds,
     * two messages are published while the subscriber is disconnected and must be received
     * once it reconnects and re-subscribes. The total number of received messages is checked
     * at the end.
     *
     * @throws Exception if the MQTT client fails or an assertion does not hold
     */
    @Test(timeout = 60000)
    public void testReceiveMessageSentWhileOffline() throws Exception {
        final int messagesPerRound = 2;
        final int offlineRounds = 100;
        final byte[] payload = new byte[32768];
        Arrays.fill(payload, (byte) 50);

        MQTT publisherMqtt = createMQTTConnection("MQTT-Pub-Client", true);
        MQTT subscriberMqtt = createMQTTConnection("MQTT-Sub-Client", false);
        BlockingConnection publisher = publisherMqtt.blockingConnection();
        publisher.connect();
        BlockingConnection onlineSubscriber = subscriberMqtt.blockingConnection();
        onlineSubscriber.connect();

        final Topic[] subscription = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
        final Topic publishTopic = subscription[0];
        onlineSubscriber.subscribe(subscription);

        // Warm-up round with the subscriber online.
        for (int sent = 0; sent < messagesPerRound; sent++) {
            publisher.publish(publishTopic.name().toString(), payload, QoS.AT_LEAST_ONCE, false);
        }
        int receivedTotal = 0;
        for (int got = 0; got < messagesPerRound; got++) {
            Message message = onlineSubscriber.receive(5L, TimeUnit.SECONDS);
            assertNotNull(message);
            receivedTotal++;
            assertTrue(Arrays.equals(payload, message.getPayload()));
            message.ack();
        }
        onlineSubscriber.disconnect();

        // Offline rounds: publish first, then reconnect the subscriber to collect.
        for (int round = 0; round < offlineRounds; round++) {
            for (int sent = 0; sent < messagesPerRound; sent++) {
                publisher.publish(publishTopic.name().toString(), payload, QoS.AT_LEAST_ONCE, false);
            }
            BlockingConnection returningSubscriber = subscriberMqtt.blockingConnection();
            returningSubscriber.connect();
            returningSubscriber.subscribe(subscription);
            for (int got = 0; got < messagesPerRound; got++) {
                Message message = returningSubscriber.receive(5L, TimeUnit.SECONDS);
                assertNotNull(message);
                receivedTotal++;
                assertTrue(Arrays.equals(payload, message.getPayload()));
                message.ack();
            }
            returningSubscriber.disconnect();
        }

        final int expectedTotal = messagesPerRound * (offlineRounds + 1);
        assertEquals("Should have received " + expectedTotal + " messages", expectedTotal, receivedTotal);
    }

    /**
     * Restarts the broker with {@code transport.defaultKeepAlive=2000}, connects client "foo"
     * with a keep-alive of 0 and asserts that the wait helper reports the connection as
     * connected.
     *
     * @throws Exception if the broker or MQTT client fails or the assertion does not hold
     */
    @Test(timeout = 30000)
    public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
        // Apply the protocol option by bouncing the broker.
        stopBroker();
        this.protocolConfig = "transport.defaultKeepAlive=2000";
        startBroker();

        MQTT mqtt = createMQTTConnection();
        mqtt.setClientId("foo");
        mqtt.setKeepAlive((short) 0);

        final BlockingConnection connection = mqtt.blockingConnection();
        connection.connect();

        Wait.Condition connected = new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return connection.isConnected();
            }
        };
        boolean alive = Wait.waitFor(connected);
        assertTrue("KeepAlive didn't work properly", alive);
    }

    /**
     * Connects and disconnects twice in a row using the same {@code MQTT} configuration
     * (client id "Test-Client"), pausing one second after each disconnect. The test passes
     * when no exception is thrown.
     *
     * @throws Exception if connecting or disconnecting fails
     */
    @Test(timeout = 60000)
    public void testReuseConnection() throws Exception {
        final long pauseMillis = 1000L;
        MQTT mqtt = createMQTTConnection();
        mqtt.setClientId("Test-Client");

        BlockingConnection firstUse = mqtt.blockingConnection();
        firstUse.connect();
        firstUse.disconnect();
        Thread.sleep(pauseMillis);

        BlockingConnection secondUse = mqtt.blockingConnection();
        secondUse.connect();
        secondUse.disconnect();
        Thread.sleep(pauseMillis);
    }

    /**
     * A non-clean subscriber subscribes and disconnects; five messages published meanwhile
     * must be delivered once it reconnects. After it unsubscribes, five further publishes must
     * not be delivered to it.
     *
     * @throws Exception if the MQTT client fails or an assertion does not hold
     */
    @Test(timeout = 60000)
    public void testNoMessageReceivedAfterUnsubscribeMQTT() throws Exception {
        final int batchSize = 5;
        final Topic[] subscription = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
        final Topic publishTopic = subscription[0];

        MQTT publisherMqtt = createMQTTConnection("MQTTPub-Client", true);
        MQTT subscriberMqtt = createMQTTConnection("MQTTSub-Client", false);
        BlockingConnection publisher = publisherMqtt.blockingConnection();
        publisher.connect();

        // Register the subscription, then go offline.
        BlockingConnection initialSubscriber = subscriberMqtt.blockingConnection();
        initialSubscriber.connect();
        initialSubscriber.subscribe(subscription);
        initialSubscriber.disconnect();

        for (int n = 0; n < batchSize; n++) {
            byte[] body = ("Message " + n).getBytes();
            publisher.publish(publishTopic.name().toString(), body, QoS.EXACTLY_ONCE, false);
        }

        // Back online: everything published while offline must arrive.
        BlockingConnection subscriber = subscriberMqtt.blockingConnection();
        subscriber.connect();
        int receivedCount = 0;
        for (int n = 0; n < batchSize; n++) {
            Message message = subscriber.receive(5L, TimeUnit.SECONDS);
            assertNotNull("Missing message " + n, message);
            LOG.info("Message is " + new String(message.getPayload()));
            receivedCount++;
            message.ack();
        }
        assertEquals(5L, receivedCount);

        // After unsubscribing, further publishes must not reach this client.
        subscriber.unsubscribe(new String[]{"TopicA"});
        for (int n = 0; n < batchSize; n++) {
            byte[] body = ("Message " + n).getBytes();
            publisher.publish(publishTopic.name().toString(), body, QoS.EXACTLY_ONCE, false);
        }
        Message afterUnsubscribe = subscriber.receive(5L, TimeUnit.SECONDS);
        assertNull(afterUnsubscribe);

        subscriber.disconnect();
        publisher.disconnect();
    }

    /**
     * Connects and disconnects client "foo" with the protocol version set to "3.1.1". The
     * test passes when no exception is thrown.
     *
     * @throws Exception if connecting or disconnecting fails
     */
    @Test(timeout = 60000)
    public void testMQTT311Connection() throws Exception {
        final String protocolVersion = "3.1.1";
        MQTT mqtt = createMQTTConnection();
        mqtt.setClientId("foo");
        mqtt.setVersion(protocolVersion);

        BlockingConnection connection = mqtt.blockingConnection();
        connection.connect();
        connection.disconnect();
    }

    /**
     * Restarts the broker with {@code maxInactivityDuration=-1}, connects client "test-mqtt"
     * with a keep-alive of 2 and asserts that the wait helper reports the connection as
     * connected.
     *
     * @throws Exception if the broker or MQTT client fails or the assertion does not hold
     */
    @Test(timeout = 60000)
    public void testPingOnMQTT() throws Exception {
        // Apply the protocol option by bouncing the broker.
        stopBroker();
        this.protocolConfig = "maxInactivityDuration=-1";
        startBroker();

        MQTT mqtt = createMQTTConnection();
        mqtt.setClientId("test-mqtt");
        mqtt.setKeepAlive((short) 2);

        final BlockingConnection connection = mqtt.blockingConnection();
        connection.connect();

        Wait.Condition connected = new Wait.Condition() {
            @Override
            public boolean isSatisified() throws Exception {
                return connection.isConnected();
            }
        };
        boolean alive = Wait.waitFor(connected);
        assertTrue("KeepAlive didn't work properly", alive);

        connection.disconnect();
    }

    /**
     * Publishes a retained QoS 1 message to "testAddress", stops and restarts the server, and
     * expects a fresh subscriber to receive the retained payload unchanged.
     *
     * @throws Exception if the server or MQTT client fails or an assertion does not hold
     */
    @Test
    public void testRetainedMessagesAreCorrectlyFormedAfterRestart() throws Exception {
        String payload = "This is a test message";
        String address = "testAddress";
        Topic[] subscription = {new Topic(address, QoS.AT_LEAST_ONCE)};

        MQTT publisherMqtt = createMQTTConnection();
        publisherMqtt.setClientId("testMqtt");
        BlockingConnection publisher = publisherMqtt.blockingConnection();
        publisher.connect();
        publisher.publish(address, payload.getBytes(), QoS.AT_LEAST_ONCE, true);

        // Bounce the server; the publisher connection is not used afterwards.
        getServer().stop(false);
        getServer().start();
        waitForServerToStart(getServer());

        MQTT subscriberMqtt = createMQTTConnection();
        subscriberMqtt.setClientId("testMqtt2");
        BlockingConnection subscriber = subscriberMqtt.blockingConnection();
        subscriber.connect();
        try {
            subscriber.subscribe(subscription);
            Message retained = subscriber.receive(5000L, TimeUnit.MILLISECONDS);
            // Fail with a clear assertion rather than a NullPointerException if nothing arrived.
            assertNotNull("Retained message was not delivered after restart", retained);
            retained.ack();
            assertEquals(payload, new String(retained.getPayload()));
        } finally {
            subscriber.disconnect();
        }
    }

    /**
     * Connects a client with the id "clientId" and then attempts a second connection with the
     * same id. The second attempt is expected to fail with an {@link MQTTException} whose
     * message contains {@code CONNECTION_REFUSED_IDENTIFIER_REJECTED}.
     *
     * @throws Exception if the first connection cannot be established or an assertion fails
     */
    @Test
    public void testDuplicateIDReturnsError() throws Exception {
        MQTT firstMqtt = createMQTTConnection();
        firstMqtt.setClientId("clientId");
        BlockingConnection firstConnection = firstMqtt.blockingConnection();
        firstConnection.connect();

        BlockingConnection secondConnection = null;
        try {
            MQTTException refusal = null;
            try {
                MQTT secondMqtt = createMQTTConnection();
                secondMqtt.setClientId("clientId");
                secondConnection = secondMqtt.blockingConnection();
                secondConnection.connect();
            } catch (MQTTException e) {
                refusal = e;
            }

            // Without this check an unexpectedly successful connect surfaced as an NPE below.
            assertNotNull("Second connection with a duplicate client id was not refused", refusal);
            assertTrue("Unexpected refusal reason: " + refusal.getMessage(),
                       refusal.getMessage() != null && refusal.getMessage().contains("CONNECTION_REFUSED_IDENTIFIER_REJECTED"));
        } finally {
            // Clean up whichever connections are actually open.
            if (secondConnection != null && secondConnection.isConnected()) {
                secondConnection.disconnect();
            }
            firstConnection.disconnect();
        }
    }
}
