package org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets;

import io.netty.handler.codec.mqtt.MqttMessageType;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttActionListener;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.class */
public class PublishTests extends MQTT5TestSupport {
    // Class-wide SLF4J logger; MethodHandles.lookup().lookupClass() yields the class performing the lookup.
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests$TestCallback.class */
    /**
     * Callback that checks the QoS of each received message against the QoS it was published with
     * and the QoS of the receiving subscription, then counts down a latch.
     *
     * <p>The payload of every message is expected to be the decimal QoS (0, 1 or 2) used by the
     * publisher, encoded as UTF-8 text.
     */
    protected class TestCallback implements MQTT5TestSupport.DefaultMqttCallback {
        private final CountDownLatch latch;
        private final int qosOfSubscription;

        /**
         * @param countDownLatch latch counted down once per successfully verified message
         * @param i QoS of the subscription this callback is attached to
         */
        public TestCallback(CountDownLatch countDownLatch, int i) {
            this.latch = countDownLatch;
            this.qosOfSubscription = i;
        }

        /**
         * Verifies the received QoS and counts down the latch. The latch is only reached when the
         * assertion for the message passes.
         *
         * @throws NumberFormatException if the payload is not a decimal integer
         */
        @Override
        public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
            final int qosOfPublish = Integer.parseInt(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
            final int qosOfReceive = mqttMessage.getQos();
            PublishTests.logger.info("QoS of publish: {}; QoS of subscription: {}; QoS of receive: {}", qosOfPublish, this.qosOfSubscription, qosOfReceive);
            switch (qosOfPublish) {
                case 0:
                    // A QoS 0 publish is always received at QoS 0.
                    Assert.assertEquals(0, qosOfReceive);
                    break;
                case 1:
                    // A QoS 1 publish is received at QoS 0 on a QoS 0 subscription, otherwise at QoS 1.
                    Assert.assertEquals(this.qosOfSubscription == 0 ? 0 : 1, qosOfReceive);
                    break;
                case 2:
                    // A QoS 2 publish is received at the QoS of the subscription.
                    Assert.assertEquals(this.qosOfSubscription, qosOfReceive);
                    break;
                default:
                    Assert.fail("invalid qos");
            }
            this.latch.countDown();
        }
    }

    /**
     * Creates the test instance.
     *
     * @param str value forwarded unchanged to the {@link MQTT5TestSupport} constructor
     *            (NOTE(review): its meaning is defined by the superclass, which is not visible here)
     */
    public PublishTests(String str) {
        super(str);
    }

    /**
     * Exercises redelivery to a persistent-session consumer after the first PUBACK coming into the
     * broker has been rejected by an incoming interceptor.
     *
     * <p>Flow: the consumer subscribes at QoS 1 and disconnects; a producer publishes one QoS 1
     * message; the consumer then reconnects twice with the same session options and must receive a
     * message each time. The boolean passed to {@code LatchedMqttCallback} differs between the two
     * reconnects (NOTE(review): its meaning is defined by that class, which is not visible here).
     */
    @Test(timeout = 300000)
    public void testDupFlag() throws Exception {
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();

        // Reject exactly one PUBACK; compareAndSet makes the "first one only" decision atomic.
        final AtomicBoolean intercepted = new AtomicBoolean(false);
        final MQTTInterceptor rejectFirstPubAck = (packet, connection) -> {
            if (packet.fixedHeader().messageType() == MqttMessageType.PUBACK && intercepted.compareAndSet(false, true)) {
                return false;
            }
            return true;
        };
        this.server.getRemotingService().addIncomingInterceptor(rejectFirstPubAck);

        final CountDownLatch firstDelivery = new CountDownLatch(1);
        MqttClient producer = createPahoClient("producer");
        MqttClient consumer = createPahoClient(consumerId);
        MqttConnectionOptions sessionOptions = new MqttConnectionOptionsBuilder().cleanStart(false).sessionExpiryInterval(300L).build();

        // Establish the subscription, then go offline so the message is queued for the session.
        consumer.connect(sessionOptions);
        consumer.subscribe(topic, 1);
        consumer.disconnect();
        assertEquals(1L, getSessionStates().size());
        assertNotNull(getSessionStates().get(consumerId));

        producer.connect();
        producer.publish(topic, "hello".getBytes(StandardCharsets.UTF_8), 1, false);
        producer.disconnect();
        producer.close();
        assertEquals(1L, getSessionStates().size());
        assertNotNull(getSessionStates().get(consumerId));

        // First reconnect: the queued message must arrive.
        consumer.setCallback(new MQTT5TestSupport.LatchedMqttCallback(firstDelivery, true));
        consumer.connect(sessionOptions);
        waitForLatch(firstDelivery);
        consumer.disconnect();
        assertEquals(1L, getSessionStates().size());
        assertNotNull(getSessionStates().get(consumerId));

        // Second reconnect: a message must arrive again.
        final CountDownLatch secondDelivery = new CountDownLatch(1);
        consumer.setCallback(new MQTT5TestSupport.LatchedMqttCallback(secondDelivery, false));
        consumer.connect(sessionOptions);
        waitForLatch(secondDelivery);
        consumer.disconnect();
        consumer.close();
    }

    /**
     * Publishes an empty QoS 0 message and checks that the copy received by a QoS 2 subscriber
     * does not have the duplicate flag set. The message must arrive within 3 seconds.
     */
    @Test(timeout = 300000)
    public void testDupFlagQoSZero() throws Exception {
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();
        final CountDownLatch received = new CountDownLatch(1);

        MqttClient consumer = createPahoClient(consumerId);
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String incomingTopic, MqttMessage message) throws Exception {
                // The latch is only released when the flag check passes.
                Assert.assertFalse(message.isDuplicate());
                received.countDown();
            }
        });
        consumer.connect();
        consumer.subscribe(topic, 2);

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        final byte[] emptyPayload = new byte[0];
        producer.publish(topic, emptyPayload, 0, false);
        producer.disconnect();
        producer.close();

        final boolean arrived = received.await(3L, TimeUnit.SECONDS);
        assertTrue(arrived);
        consumer.disconnect();
        consumer.close();
    }

    /**
     * Publishes a QoS 2 message on which the producer has set the duplicate flag and checks that
     * the subscriber receives it, within 3 seconds, without the duplicate flag.
     */
    @Test(timeout = 300000)
    public void testDupFlagNotPropagated() throws Exception {
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();
        final CountDownLatch received = new CountDownLatch(1);

        MqttClient producer = createPahoClient("producer");
        MqttClient consumer = createPahoClient(consumerId);
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String incomingTopic, MqttMessage incoming) throws Exception {
                Assert.assertFalse(incoming.isDuplicate());
                received.countDown();
            }
        });
        consumer.connect();
        consumer.subscribe(topic, 2);

        producer.connect();
        // Build the outgoing message with the duplicate flag set on the producer side.
        MqttMessage outgoing = new MqttMessage();
        outgoing.setDuplicate(true);
        outgoing.setPayload("hello".getBytes());
        outgoing.setQos(2);
        outgoing.setRetained(false);
        producer.publish(topic, outgoing);
        producer.disconnect();
        producer.close();

        final boolean arrived = received.await(3L, TimeUnit.SECONDS);
        assertTrue(arrived);
        consumer.disconnect();
        consumer.close();
    }

    /**
     * Publishes two retained QoS 2 messages ("retain1" then "retain2") to the same topic and
     * checks that a consumer subscribing afterwards receives "retain2" within 3 seconds.
     */
    @Test(timeout = 300000)
    public void testRetainFlag() throws Exception {
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        for (String payload : new String[] {"retain1", "retain2"}) {
            producer.publish(topic, payload.getBytes(), 2, true);
        }
        producer.disconnect();
        producer.close();

        final CountDownLatch received = new CountDownLatch(1);
        MqttClient consumer = createPahoClient(consumerId);
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String incomingTopic, MqttMessage message) throws Exception {
                final byte[] expected = "retain2".getBytes(StandardCharsets.UTF_8);
                ActiveMQTestBase.assertEqualsByteArrays(expected, message.getPayload());
                received.countDown();
            }
        });
        consumer.connect();
        consumer.subscribe(topic, 2);

        final boolean arrived = received.await(3L, TimeUnit.SECONDS);
        assertTrue(arrived);
        consumer.disconnect();
        consumer.close();
    }

    /**
     * Publishes a retained message, waits for the retain queue to hold one message, then publishes
     * a retained message with an empty payload and waits for the retain queue to become empty.
     * A consumer subscribing afterwards must receive nothing for 2 seconds.
     */
    @Test(timeout = 300000)
    public void testRetainFlagWithEmptyMessage() throws Exception {
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        producer.publish(topic, "retain1".getBytes(), 2, true);
        Wait.assertTrue(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress("$sys.mqtt.retain.", topic, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000L, 100L);

        // Retained publish with a zero-length payload.
        producer.publish(topic, new byte[0], 2, true);
        producer.disconnect();
        producer.close();
        Wait.assertTrue(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress("$sys.mqtt.retain.", topic, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 0, 2000L, 100L);

        final CountDownLatch received = new CountDownLatch(1);
        MqttClient consumer = createPahoClient(consumerId);
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String incomingTopic, MqttMessage message) throws Exception {
                received.countDown();
            }
        });
        consumer.connect();
        consumer.subscribe(topic, 2);

        // Any delivery inside the window is a failure.
        final boolean arrived = received.await(2L, TimeUnit.SECONDS);
        assertFalse(arrived);
        consumer.disconnect();
        consumer.close();
    }

    /**
     * Publishes a retained message followed by a non-retained message on the same topic, checks
     * that the retain queue never holds more than one message, and checks that a consumer
     * subscribing afterwards receives the payload of the retained message within 2 seconds.
     */
    @Test(timeout = 300000)
    public void testRetainFlagFalse() throws Exception {
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();
        final String retainedPayload = RandomUtil.randomString();
        final String unretainedPayload = RandomUtil.randomString();

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        producer.publish(topic, retainedPayload.getBytes(), 2, true);
        Wait.assertTrue(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress("$sys.mqtt.retain.", topic, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 1000L, 100L);

        producer.publish(topic, unretainedPayload.getBytes(), 2, false);
        producer.disconnect();
        producer.close();
        Wait.assertFalse(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress("$sys.mqtt.retain.", topic, MQTTUtil.MQTT_WILDCARD)).getMessageCount() > 1, 1000L, 100L);

        final CountDownLatch received = new CountDownLatch(1);
        MqttClient consumer = createPahoClient(consumerId);
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String incomingTopic, MqttMessage message) throws Exception {
                final byte[] expected = retainedPayload.getBytes(StandardCharsets.UTF_8);
                ActiveMQTestBase.assertEqualsByteArrays(expected, message.getPayload());
                received.countDown();
            }
        });
        consumer.connect();
        consumer.subscribe(topic, 2);

        final boolean arrived = received.await(2L, TimeUnit.SECONDS);
        assertTrue(arrived);
        consumer.disconnect();
        consumer.close();
    }

    /** Retain handling 0 with one topic, subscribed to by its exact name. */
    @Test(timeout = 300000)
    public void testRetainHandlingZeroWithOneSubscription() throws Exception {
        final boolean filterSubscription = false;
        final int topicCount = 1;
        internalTestRetainHandlingZero(filterSubscription, topicCount);
    }

    /** Retain handling 0 with 25 topics, each subscribed to by its exact name. */
    @Test(timeout = 300000)
    public void testRetainHandlingZeroWithMultipleSubscriptions() throws Exception {
        final boolean filterSubscription = false;
        final int topicCount = 25;
        internalTestRetainHandlingZero(filterSubscription, topicCount);
    }

    /** Retain handling 0 with 25 topics covered by a single wildcard topic-filter subscription. */
    @Test(timeout = 300000)
    public void testRetainHandlingZeroWithTopicFilterSubscription() throws Exception {
        final boolean filterSubscription = true;
        final int topicCount = 25;
        internalTestRetainHandlingZero(filterSubscription, topicCount);
    }

    /**
     * Publishes one retained QoS 2 message to each of {@code i} topics under {@code myTopic/},
     * then subscribes with retain handling 0 and expects {@code i} retained messages, each
     * carrying one of the published payloads, within 2 seconds.
     *
     * @param z when {@code true} a single subscription on {@code myTopic/#} is used; otherwise one
     *          subscription per topic is created and all are sent in one subscribe call
     * @param i number of topics (and retained messages) to create
     */
    public void internalTestRetainHandlingZero(boolean z, final int i) throws Exception {
        final String consumerId = RandomUtil.randomString();

        final String[] topics = new String[i];
        for (int index = 0; index < i; index++) {
            topics[index] = "myTopic/" + getTopicName() + index;
        }
        final String[] payloads = new String[i];
        for (int index = 0; index < i; index++) {
            payloads[index] = RandomUtil.randomString();
        }
        // Fixed-size view over the payload array, used for membership checks in the callback.
        final List<String> expectedPayloads = Arrays.asList(payloads);

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        for (int index = 0; index < i; index++) {
            final String topic = topics[index];
            producer.publish(topic, payloads[index].getBytes(StandardCharsets.UTF_8), 2, true);
            // Wait until the retain queue for this topic holds the message before moving on.
            Wait.assertTrue(() -> this.server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress("$sys.mqtt.retain.", topic, this.server.getConfiguration().getWildcardConfiguration())).getMessageCount() == 1, 2000L, 100L);
        }
        producer.disconnect();
        producer.close();

        final CountDownLatch received = new CountDownLatch(i);
        MqttClient consumer = createPahoClient(consumerId);
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String incomingTopic, MqttMessage message) throws Exception {
                // Decode once with the same charset used for publishing.
                final String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
                Assert.assertTrue("unexpected payload: " + payload, expectedPayloads.contains(payload));
                Assert.assertTrue(message.isRetained());
                received.countDown();
            }
        });
        consumer.connect();
        if (z) {
            MqttSubscription filterSubscription = new MqttSubscription("myTopic/#", 2);
            filterSubscription.setRetainHandling(0);
            consumer.subscribe(new MqttSubscription[]{filterSubscription});
        } else {
            MqttSubscription[] subscriptions = new MqttSubscription[i];
            for (int index = 0; index < i; index++) {
                MqttSubscription subscription = new MqttSubscription(topics[index], 2);
                subscription.setRetainHandling(0);
                subscriptions[index] = subscription;
            }
            consumer.subscribe(subscriptions);
        }
        assertTrue(received.await(2L, TimeUnit.SECONDS));
        consumer.disconnect();
        consumer.close();
    }

    /**
     * Retain handling 1: after a retained message has been published, a consumer with a persistent
     * session subscribes with retain handling 1 and must receive the retained message within
     * 2 seconds. After the subscription queue has drained, the consumer disconnects and reconnects
     * with the same session options and must receive nothing for 2 seconds.
     */
    @Test(timeout = 300000)
    public void testRetainHandlingOne() throws Exception {
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        producer.publish(topic, "retained".getBytes(), 2, true);
        Wait.assertTrue(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress("$sys.mqtt.retain.", topic, server.getConfiguration().getWildcardConfiguration())).getMessageCount() == 1, 2000L, 100L);
        producer.disconnect();
        producer.close();

        MqttConnectionOptions sessionOptions = new MqttConnectionOptionsBuilder().cleanStart(false).sessionExpiryInterval(300L).build();
        MqttClient consumer = createPahoClient(consumerId);
        consumer.connect(sessionOptions);

        final CountDownLatch onSubscribe = new CountDownLatch(1);
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String incomingTopic, MqttMessage message) {
                final byte[] expected = "retained".getBytes(StandardCharsets.UTF_8);
                ActiveMQTestBase.assertEqualsByteArrays(expected, message.getPayload());
                Assert.assertTrue(message.isRetained());
                onSubscribe.countDown();
            }
        });
        MqttSubscription subscription = new MqttSubscription(topic, 2);
        subscription.setRetainHandling(1);
        consumer.subscribe(new MqttSubscription[]{subscription});
        assertTrue(onSubscribe.await(2L, TimeUnit.SECONDS));
        Wait.assertTrue(() -> getSubscriptionQueue(topic).getMessageCount() == 0, 2000L, 100L);
        consumer.disconnect();

        // Reconnect on the existing session: no message may arrive this time.
        final CountDownLatch onReconnect = new CountDownLatch(1);
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String incomingTopic, MqttMessage message) throws Exception {
                onReconnect.countDown();
            }
        });
        consumer.connect(sessionOptions);
        assertFalse(onReconnect.await(2L, TimeUnit.SECONDS));
        consumer.disconnect();
        consumer.close();
    }

    /**
     * Retain handling 2: after a retained message has been published, a consumer subscribing with
     * retain handling 2 must receive nothing for 2 seconds.
     */
    @Test(timeout = 300000)
    public void testRetainHandlingTwo() throws Exception {
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        producer.publish(topic, "retained".getBytes(), 2, true);
        Wait.assertTrue(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress("$sys.mqtt.retain.", topic, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000L, 100L);
        producer.disconnect();
        producer.close();

        final CountDownLatch received = new CountDownLatch(1);
        MqttClient consumer = createPahoClient(consumerId);
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String incomingTopic, MqttMessage message) throws Exception {
                received.countDown();
            }
        });
        consumer.connect();

        MqttSubscription subscription = new MqttSubscription(topic, 2);
        subscription.setRetainHandling(2);
        consumer.subscribe(new MqttSubscription[]{subscription});

        // Any delivery inside the window is a failure.
        final boolean arrived = received.await(2L, TimeUnit.SECONDS);
        assertFalse(arrived);
        consumer.disconnect();
        consumer.close();
    }

    /**
     * With retain-as-published disabled on an already established subscription, a message
     * published with the retain flag must arrive within 2 seconds with the retain flag cleared.
     */
    @Test(timeout = 300000)
    public void testRetainAsPublishedZeroOnEstablishedSubscription() throws Exception {
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();
        final CountDownLatch received = new CountDownLatch(1);

        MqttClient consumer = createPahoClient(consumerId);
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String incomingTopic, MqttMessage message) throws Exception {
                final byte[] expected = "retained".getBytes(StandardCharsets.UTF_8);
                ActiveMQTestBase.assertEqualsByteArrays(expected, message.getPayload());
                Assert.assertFalse(message.isRetained());
                received.countDown();
            }
        });
        consumer.connect();
        MqttSubscription subscription = new MqttSubscription(topic, 2);
        subscription.setRetainAsPublished(false);
        consumer.subscribe(new MqttSubscription[]{subscription});

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        producer.publish(topic, "retained".getBytes(), 2, true);
        Wait.assertTrue(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress("$sys.mqtt.retain.", topic, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000L, 100L);
        producer.disconnect();
        producer.close();

        final boolean arrived = received.await(2L, TimeUnit.SECONDS);
        assertTrue(arrived);
        consumer.disconnect();
        consumer.close();
    }

    /**
     * With retain-as-published enabled on an already established subscription, the retain flag of
     * each received message must match how it was published: the first message ("retained") must
     * arrive with the flag set, the second ("unretained") with the flag cleared. Each must arrive
     * within 2 seconds of being published.
     */
    @Test(timeout = 300000)
    public void testRetainAsPublishedOneOnEstablishedSubscription() throws Exception {
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();
        final CountDownLatch retainedReceived = new CountDownLatch(1);
        final CountDownLatch unretainedReceived = new CountDownLatch(1);
        MqttClient consumer = createPahoClient(consumerId);

        // True until the first message has been handled; tells the callback which message to expect.
        final AtomicBoolean expectingFirst = new AtomicBoolean(true);
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String incomingTopic, MqttMessage message) throws Exception {
                final boolean first = expectingFirst.getAndSet(false);
                if (!first) {
                    ActiveMQTestBase.assertEqualsByteArrays("unretained".getBytes(StandardCharsets.UTF_8), message.getPayload());
                    Assert.assertFalse(message.isRetained());
                    unretainedReceived.countDown();
                    return;
                }
                ActiveMQTestBase.assertEqualsByteArrays("retained".getBytes(StandardCharsets.UTF_8), message.getPayload());
                Assert.assertTrue(message.isRetained());
                retainedReceived.countDown();
            }
        });
        consumer.connect();
        MqttSubscription subscription = new MqttSubscription(topic, 2);
        subscription.setRetainAsPublished(true);
        consumer.subscribe(new MqttSubscription[]{subscription});

        MqttClient retainingProducer = createPahoClient("producer");
        retainingProducer.connect();
        retainingProducer.publish(topic, "retained".getBytes(), 2, true);
        Wait.assertTrue(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCoreAddress("$sys.mqtt.retain.", topic, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000L, 100L);
        retainingProducer.disconnect();
        retainingProducer.close();
        assertTrue(retainedReceived.await(2L, TimeUnit.SECONDS));

        MqttClient plainProducer = createPahoClient("producer");
        plainProducer.connect();
        plainProducer.publish(topic, "unretained".getBytes(), 2, false);
        plainProducer.disconnect();
        plainProducer.close();
        assertTrue(unretainedReceived.await(2L, TimeUnit.SECONDS));

        consumer.disconnect();
        consumer.close();
    }

    /**
     * Subscribes with the wildcard filter {@code myTopic/#}, publishes to a random topic below
     * {@code myTopic/}, and checks that the topic name reported to the consumer starts with
     * {@code myTopic/}. The message must arrive within 2 seconds.
     */
    @Test(timeout = 300000)
    public void testTopicFilter() throws Exception {
        final String prefix = "myTopic/";
        final String topic = prefix + RandomUtil.randomString();
        final CountDownLatch received = new CountDownLatch(1);

        MqttClient consumer = createPahoClient("consumer");
        consumer.connect(new MqttConnectionOptionsBuilder().cleanStart(false).sessionExpiryInterval(300L).build());
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String incomingTopic, MqttMessage message) throws Exception {
                // Diagnostics go through the class logger rather than stdout.
                PublishTests.logger.info("Received message on topic {}; expected prefix {}", incomingTopic, prefix);
                Assert.assertTrue(incomingTopic.startsWith(prefix));
                received.countDown();
            }
        });
        consumer.subscribe(prefix + "#", 1);

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        producer.publish(topic, "hello".getBytes(StandardCharsets.UTF_8), 1, false);
        producer.disconnect();
        producer.close();

        assertTrue(received.await(2L, TimeUnit.SECONDS));
        consumer.disconnect();
        consumer.close();
    }

    /**
     * Subscribes to a random topic by its exact name, publishes one QoS 1 message to it, and
     * checks that the topic name reported to the consumer equals the published topic. The message
     * must arrive within 2 seconds.
     */
    @Test(timeout = 300000)
    public void testX() throws Exception {
        final String topic = RandomUtil.randomString();
        final CountDownLatch received = new CountDownLatch(1);

        MqttClient consumer = createPahoClient("consumer");
        consumer.connect(new MqttConnectionOptionsBuilder().cleanStart(false).sessionExpiryInterval(300L).build());
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String incomingTopic, MqttMessage message) throws Exception {
                PublishTests.logger.info("Received message on topic {}; expected topic {}", incomingTopic, topic);
                // startsWith("") held for every string; compare against the published topic instead.
                Assert.assertEquals(topic, incomingTopic);
                received.countDown();
            }
        });
        consumer.subscribe(topic, 1);

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        producer.publish(topic, "hello".getBytes(StandardCharsets.UTF_8), 1, false);
        producer.disconnect();
        producer.close();

        assertTrue(received.await(2L, TimeUnit.SECONDS));
        consumer.disconnect();
        consumer.close();
    }

    /** Payload format indicator set to {@code true} by the publisher must reach the subscriber as {@code true}. */
    @Test(timeout = 300000)
    public void testPayloadFormatIndicatorTrue() throws Exception {
        final boolean payloadFormat = true;
        internalTestPayloadFormatIndicator(payloadFormat);
    }

    /** Payload format indicator set to {@code false} by the publisher must reach the subscriber as {@code false}. */
    @Test(timeout = 300000)
    public void testPayloadFormatIndicatorFalse() throws Exception {
        final boolean payloadFormat = false;
        internalTestPayloadFormatIndicator(payloadFormat);
    }

    /**
     * Publishes a QoS 2 message whose payload-format property is set to {@code z} and checks that
     * the subscriber receives the same payload-format value within 2 seconds.
     *
     * @param z payload-format value to set on the published message and to expect on receipt
     */
    private void internalTestPayloadFormatIndicator(final boolean z) throws Exception {
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();
        final CountDownLatch received = new CountDownLatch(1);

        MqttClient consumer = createPahoClient(consumerId);
        MqttConnectionOptions sessionOptions = new MqttConnectionOptionsBuilder().cleanStart(false).sessionExpiryInterval(300L).build();
        consumer.connect(sessionOptions);
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String incomingTopic, MqttMessage incoming) throws Exception {
                final boolean receivedFormat = incoming.getProperties().getPayloadFormat();
                if (!z) {
                    Assert.assertFalse(receivedFormat);
                } else {
                    Assert.assertTrue(receivedFormat);
                }
                received.countDown();
            }
        });
        consumer.subscribe(topic, 2);

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        MqttProperties properties = new MqttProperties();
        properties.setPayloadFormat(z);
        MqttMessage outgoing = new MqttMessage();
        outgoing.setProperties(properties);
        outgoing.setQos(2);
        outgoing.setPayload("foo".getBytes(StandardCharsets.UTF_8));
        producer.publish(topic, outgoing);
        producer.disconnect();
        producer.close();

        final boolean arrived = received.await(2L, TimeUnit.SECONDS);
        assertTrue(arrived);
        consumer.disconnect();
        consumer.close();
    }

    /**
     * Publishes a QoS 2 message with a message expiry interval of 2 to the topic of an offline
     * persistent-session consumer and checks that the message first sits in the subscription
     * queue, then appears in the expiry queue and leaves the subscription queue. When the consumer
     * reconnects it must receive nothing for 1 second.
     */
    @Test(timeout = 300000)
    public void testMessageExpiryIntervalElapsed() throws Exception {
        this.server.createQueue(new QueueConfiguration(EXPIRY_ADDRESS).setRoutingType(RoutingType.ANYCAST));
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();
        final CountDownLatch received = new CountDownLatch(1);

        MqttClient consumer = createPahoClient(consumerId);
        MqttConnectionOptions sessionOptions = new MqttConnectionOptionsBuilder().cleanStart(false).sessionExpiryInterval(300L).build();
        consumer.connect(sessionOptions);
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String incomingTopic, MqttMessage incoming) throws Exception {
                received.countDown();
            }
        });
        consumer.subscribe(topic, 2);
        // Go offline so the published message stays in the subscription queue.
        consumer.disconnect();

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        MqttProperties properties = new MqttProperties();
        properties.setMessageExpiryInterval(2L);
        MqttMessage outgoing = new MqttMessage();
        outgoing.setProperties(properties);
        outgoing.setQos(2);
        outgoing.setPayload("foo".getBytes(StandardCharsets.UTF_8));
        producer.publish(topic, outgoing);
        producer.disconnect();
        producer.close();

        Wait.assertEquals(1L, () -> getSubscriptionQueue(topic).getMessageCount(), 1000L, 100L);
        // Look the expiry queue up through the same constant it was created with.
        Wait.assertEquals(1L, () -> this.server.locateQueue(EXPIRY_ADDRESS).getMessageCount(), 3000L, 100L);
        Wait.assertEquals(0L, () -> getSubscriptionQueue(topic).getMessageCount(), 1000L, 100L);

        consumer.connect(sessionOptions);
        assertFalse(received.await(1L, TimeUnit.SECONDS));
        consumer.disconnect();
        // Release the client's resources, as every other test in this class does.
        consumer.close();
    }

    /**
     * Checks the message expiry interval reported to a subscriber that receives a message after
     * it has waited on the broker.
     *
     * <p>The subscriber creates a session (clean start disabled, 300 second session expiry),
     * subscribes at QoS 2 and disconnects. A QoS 2 message with a 5 second message expiry
     * interval is published and the test waits for it to reach the subscription queue. After a
     * further one second sleep the subscriber reconnects; the callback asserts that the received
     * expiry interval is greater than 0 and at most 6.
     *
     * @throws Exception if any broker or client operation fails
     */
    @Test(timeout = 300000)
    public void testMessageExpiryIntervalReturnValue() throws Exception {
        this.server.createQueue(new QueueConfiguration(EXPIRY_ADDRESS).setRoutingType(RoutingType.ANYCAST));
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();
        final CountDownLatch arrived = new CountDownLatch(1);

        MqttClient consumer = createPahoClient(consumerId);
        MqttConnectionOptions sessionOptions = new MqttConnectionOptionsBuilder()
            .cleanStart(false)
            .sessionExpiryInterval(300L)
            .build();
        consumer.connect(sessionOptions);
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String arrivedTopic, MqttMessage incoming) throws Exception {
                long remaining = incoming.getProperties().getMessageExpiryInterval().longValue();
                // Use the class logger rather than stdout for diagnostic output.
                logger.info("Received message expiry interval: {}", remaining);
                Assert.assertTrue(remaining <= 6);
                Assert.assertTrue(remaining > 0);
                // Only reached when both assertions pass; a failure shows up as a latch timeout.
                arrived.countDown();
            }
        });
        consumer.subscribe(topic, 2);
        consumer.disconnect();

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        MqttMessage outgoing = new MqttMessage();
        MqttProperties publishProperties = new MqttProperties();
        publishProperties.setMessageExpiryInterval(5L);
        outgoing.setProperties(publishProperties);
        outgoing.setQos(2);
        outgoing.setPayload("foo".getBytes(StandardCharsets.UTF_8));
        producer.publish(topic, outgoing);
        producer.disconnect();
        producer.close();

        Wait.assertEquals(1L, () -> getSubscriptionQueue(topic).getMessageCount(), 500L, 100L);
        // Let the message sit in the queue for a while before the subscriber returns.
        Thread.sleep(1000L);

        consumer.connect(sessionOptions);
        assertTrue(arrived.await(1L, TimeUnit.SECONDS));
        consumer.disconnect();
        // Close after disconnecting so the client's resources are released, as the other tests in this class do.
        consumer.close();
    }

    /**
     * Checks that topic aliases on messages received by a client stay within the topic alias
     * maximum that client requested.
     *
     * <p>The consumer connects with a topic alias maximum of 5 and subscribes to
     * {@code <topic>/#} at QoS 2. A producer publishes 25 QoS 2 messages, each to a distinct
     * sub-topic. For every received message that carries a topic alias, the callback asserts the
     * alias is at most 5. All 25 messages must arrive within two seconds.
     *
     * @throws Exception if any broker or client operation fails
     */
    @Test(timeout = 300000)
    public void testClientTopicAliasMaxFromServer() throws Exception {
        final int messageCount = 25;
        final int aliasMax = 5;
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();

        MqttClient consumer = createPahoClient(consumerId);
        consumer.connect(new MqttConnectionOptionsBuilder().topicAliasMaximum(aliasMax).build());
        final CountDownLatch arrived = new CountDownLatch(messageCount);
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String arrivedTopic, MqttMessage incoming) throws Exception {
                Integer alias = incoming.getProperties().getTopicAlias();
                // Messages without an alias are accepted as-is.
                if (alias != null) {
                    Assert.assertTrue(alias.intValue() <= aliasMax);
                }
                arrived.countDown();
            }
        });
        consumer.subscribe(topic + "/#", 2);

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        for (int i = 0; i < messageCount; i++) {
            producer.publish(topic + "/" + i, ("foo" + i).getBytes(StandardCharsets.UTF_8), 2, false);
        }
        producer.disconnect();
        producer.close();

        assertTrue(arrived.await(2L, TimeUnit.SECONDS));
        consumer.disconnect();
        // Close after disconnecting so the client's resources are released, as the other tests in this class do.
        consumer.close();
    }

    /**
     * Currently disabled through {@code @Ignore}.
     *
     * <p>Connects with a topic alias maximum of 2 and a non-clean session, publishes to
     * "myTopicName" with topic alias 1, then publishes again with an empty topic name and the
     * same alias. After disconnecting and reconnecting with the same options, a further publish
     * with an empty topic name and alias 1 is expected to throw, and the client is expected to
     * end up disconnected.
     *
     * @throws Exception if any client operation outside the expected failure fails
     */
    @Test(timeout = 300000)
    @Ignore
    public void testTopicAliasesNotCarriedForward() throws Exception {
        MqttProperties aliasProperties = new MqttProperties();
        aliasProperties.setTopicAlias(1);

        MqttClient producer = createPahoClient("producer");
        MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
            .topicAliasMaximum(2)
            .sessionExpiryInterval(999L)
            .cleanStart(false)
            .build();
        producer.connect(options);

        // First publish carries both the topic name and the alias.
        MqttMessage withTopicName = new MqttMessage();
        withTopicName.setProperties(aliasProperties);
        producer.publish("myTopicName", withTopicName);

        // Second publish carries only the alias (empty topic name).
        MqttMessage aliasOnly = new MqttMessage();
        aliasOnly.setProperties(aliasProperties);
        producer.publish("", aliasOnly);

        producer.disconnect();
        producer.connect(options);

        MqttMessage afterReconnect = new MqttMessage();
        afterReconnect.setProperties(aliasProperties);
        try {
            producer.publish("", afterReconnect);
            fail("Publishing should fail here due to an invalid topic alias");
        } catch (Exception ignored) {
            // Expected: the publish after reconnecting is supposed to be rejected.
        }
        assertFalse(producer.isConnected());
        producer.close();
    }

    /**
     * Same scenario as a 25-message fan-out over distinct sub-topics, run after passing
     * {@code "topicAliasMaximum=5"} to {@code setAcceptorProperty}.
     *
     * <p>The consumer connects with a topic alias maximum of 5 and subscribes to
     * {@code <topic>/#} at QoS 2. A producer publishes 25 QoS 2 messages to distinct sub-topics.
     * For every received message that carries a topic alias, the callback asserts the alias is
     * at most 5. All 25 messages must arrive within two seconds.
     *
     * @throws Exception if any broker or client operation fails
     */
    @Test(timeout = 300000)
    public void testServerTopicAliasMax() throws Exception {
        final int messageCount = 25;
        final int aliasMax = 5;
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();
        setAcceptorProperty("topicAliasMaximum=5");

        MqttClient consumer = createPahoClient(consumerId);
        consumer.connect(new MqttConnectionOptionsBuilder().topicAliasMaximum(aliasMax).build());
        final CountDownLatch arrived = new CountDownLatch(messageCount);
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String arrivedTopic, MqttMessage incoming) throws Exception {
                Integer alias = incoming.getProperties().getTopicAlias();
                // Messages without an alias are accepted as-is.
                if (alias != null) {
                    Assert.assertTrue(alias.intValue() <= aliasMax);
                }
                arrived.countDown();
            }
        });
        consumer.subscribe(topic + "/#", 2);

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        for (int i = 0; i < messageCount; i++) {
            producer.publish(topic + "/" + i, ("foo" + i).getBytes(StandardCharsets.UTF_8), 2, false);
        }
        producer.disconnect();
        producer.close();

        assertTrue(arrived.await(2L, TimeUnit.SECONDS));
        consumer.disconnect();
        // Close after disconnecting so the client's resources are released, as the other tests in this class do.
        consumer.close();
    }

    /**
     * Publishes a QoS 2 message whose response topic is "myResponseTopic/a" and asserts that a
     * QoS 2 subscriber receives the message with that same response topic within two seconds.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 300000)
    public void testResponseTopicUnaltered() throws Exception {
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();
        final CountDownLatch received = new CountDownLatch(1);

        // Subscriber side: verify the response topic on arrival.
        MqttClient consumer = createPahoClient(consumerId);
        consumer.connect();
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String arrivedTopic, MqttMessage incoming) throws Exception {
                String responseTopic = incoming.getProperties().getResponseTopic();
                Assert.assertEquals("myResponseTopic/a", responseTopic);
                received.countDown();
            }
        });
        consumer.subscribe(topic, 2);

        // Publisher side: send one message carrying the response topic.
        MqttClient producer = createPahoClient("producer");
        producer.connect();
        MqttProperties outgoingProperties = new MqttProperties();
        outgoingProperties.setResponseTopic("myResponseTopic/a");
        MqttMessage outgoing = new MqttMessage();
        outgoing.setQos(2);
        outgoing.setProperties(outgoingProperties);
        producer.publish(topic, outgoing);
        producer.disconnect();
        producer.close();

        assertTrue(received.await(2L, TimeUnit.SECONDS));
        consumer.disconnect();
        consumer.close();
    }

    /**
     * Publishes a QoS 2 message with the UTF-8 bytes of "myCorrelationData" as correlation data
     * and asserts that a QoS 2 subscriber receives byte-for-byte equal correlation data within
     * two seconds.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 300000)
    public void testCorrelationDataUnaltered() throws Exception {
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();
        final byte[] correlationData = "myCorrelationData".getBytes(StandardCharsets.UTF_8);
        final CountDownLatch received = new CountDownLatch(1);

        // Subscriber side: compare the received correlation data with what was sent.
        MqttClient consumer = createPahoClient(consumerId);
        consumer.connect();
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String arrivedTopic, MqttMessage incoming) throws Exception {
                byte[] actual = incoming.getProperties().getCorrelationData();
                ActiveMQTestBase.assertEqualsByteArrays(correlationData, actual);
                received.countDown();
            }
        });
        consumer.subscribe(topic, 2);

        // Publisher side: send one message carrying the correlation data.
        MqttClient producer = createPahoClient("producer");
        producer.connect();
        MqttProperties outgoingProperties = new MqttProperties();
        outgoingProperties.setCorrelationData(correlationData);
        MqttMessage outgoing = new MqttMessage();
        outgoing.setQos(2);
        outgoing.setProperties(outgoingProperties);
        producer.publish(topic, outgoing);
        producer.disconnect();
        producer.close();

        assertTrue(received.await(2L, TimeUnit.SECONDS));
        consumer.disconnect();
        consumer.close();
    }

    /**
     * Publishes a QoS 2 message carrying ten user properties with random keys and values, and
     * asserts that a QoS 2 subscriber receives the same keys and values at the same positions.
     * The message must arrive and pass the checks within two seconds.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 300000)
    public void testUserProperties() throws Exception {
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();

        // Typed list instead of a raw ArrayList so no casts are needed when comparing.
        final List<UserProperty> expected = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            expected.add(new UserProperty(RandomUtil.randomString(), RandomUtil.randomString()));
        }

        final CountDownLatch received = new CountDownLatch(1);
        MqttClient consumer = createPahoClient(consumerId);
        consumer.connect();
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String arrivedTopic, MqttMessage incoming) throws Exception {
                List<UserProperty> actual = incoming.getProperties().getUserProperties();
                // Position-by-position comparison: both key and value must match.
                for (int i = 0; i < expected.size(); i++) {
                    Assert.assertEquals(expected.get(i).getKey(), actual.get(i).getKey());
                    Assert.assertEquals(expected.get(i).getValue(), actual.get(i).getValue());
                }
                // Only reached when every comparison passed; a failure shows up as a latch timeout.
                received.countDown();
            }
        });
        consumer.subscribe(topic, 2);

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        MqttMessage outgoing = new MqttMessage();
        outgoing.setQos(2);
        MqttProperties outgoingProperties = new MqttProperties();
        outgoingProperties.setUserProperties(expected);
        outgoing.setProperties(outgoingProperties);
        producer.publish(topic, outgoing);
        producer.disconnect();
        producer.close();

        assertTrue(received.await(2L, TimeUnit.SECONDS));
        consumer.disconnect();
        consumer.close();
    }

    /**
     * Publishes a QoS 2 message whose content type is "myContentType" and asserts that a QoS 2
     * subscriber receives the message with that same content type within two seconds.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 300000)
    public void testContentTypeUnaltered() throws Exception {
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();
        final CountDownLatch received = new CountDownLatch(1);

        // Subscriber side: verify the content type on arrival.
        MqttClient consumer = createPahoClient(consumerId);
        consumer.connect();
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String arrivedTopic, MqttMessage incoming) throws Exception {
                String contentType = incoming.getProperties().getContentType();
                Assert.assertEquals("myContentType", contentType);
                received.countDown();
            }
        });
        consumer.subscribe(topic, 2);

        // Publisher side: send one message carrying the content type.
        MqttClient producer = createPahoClient("producer");
        producer.connect();
        MqttProperties outgoingProperties = new MqttProperties();
        outgoingProperties.setContentType("myContentType");
        MqttMessage outgoing = new MqttMessage();
        outgoing.setQos(2);
        outgoing.setProperties(outgoingProperties);
        producer.publish(topic, outgoing);
        producer.disconnect();
        producer.close();

        assertTrue(received.await(2L, TimeUnit.SECONDS));
        consumer.disconnect();
        consumer.close();
    }

    /**
     * Runs the shared publish check in {@code internalTestQoS} with QoS 2.
     *
     * @throws Exception if the shared check fails with an exception
     */
    @Test(timeout = 300000)
    public void testQoS2() throws Exception {
        internalTestQoS(2);
    }

    /**
     * Runs the shared publish check in {@code internalTestQoS} with QoS 1.
     *
     * @throws Exception if the shared check fails with an exception
     */
    @Test(timeout = 300000)
    public void testQoS1() throws Exception {
        internalTestQoS(1);
    }

    /**
     * Runs the shared publish check in {@code internalTestQoS} with QoS 0.
     *
     * @throws Exception if the shared check fails with an exception
     */
    @Test(timeout = 300000)
    public void testQoS0() throws Exception {
        internalTestQoS(0);
    }

    /**
     * Publishes one empty-payload message at the given QoS and checks, in the client's
     * {@code deliveryComplete} callback, the response attached to the delivery token:
     * no response for QoS 0, a response of type 4 for QoS 1 and of type 7 for QoS 2.
     * The callback must complete within two seconds.
     *
     * @param i the QoS level to publish with (0, 1 or 2)
     * @throws Exception if any client operation fails
     */
    private void internalTestQoS(int i) throws Exception {
        final String topic = getTopicName();
        final CountDownLatch completed = new CountDownLatch(1);

        MqttClient publisher = createPahoClient("producer");
        publisher.connect();
        publisher.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void deliveryComplete(IMqttToken token) {
                switch (token.getRequestMessage().getQoS()) {
                    case 0:
                        // QoS 0 is fire-and-forget, so no response is expected on the token.
                        Assert.assertEquals((Object) null, token.getResponse());
                        break;
                    case 1:
                        // 4 is the PUBACK control packet type in the MQTT specification.
                        Assert.assertEquals(4L, token.getResponse().getType());
                        break;
                    case 2:
                        // 7 is the PUBCOMP control packet type in the MQTT specification.
                        Assert.assertEquals(7L, token.getResponse().getType());
                        break;
                    default:
                        Assert.fail("unrecognized qos");
                }
                completed.countDown();
            }
        });

        publisher.publish(topic, new byte[0], i, false);
        assertTrue(completed.await(2L, TimeUnit.SECONDS));
        publisher.disconnect();
        publisher.close();
    }

    /**
     * Three clients subscribe to overlapping filters with different QoS values
     * ("foo/a/b/#" at 2, "foo/a/#" at 1, "foo/#" at 0), each with a {@code TestCallback}
     * built from the shared latch and that client's QoS. A producer then publishes three
     * messages to "foo/a/b/c" at QoS 0, 1 and 2. The shared latch of 9 must reach zero within
     * two seconds.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 300000)
    public void testOverlappingSubscriptionsWithDifferentQoSMaximums() throws Exception {
        CountDownLatch deliveries = new CountDownLatch(9);

        MqttClient qos2Consumer = createPahoClient(RandomUtil.randomString());
        qos2Consumer.connect();
        qos2Consumer.setCallback(new TestCallback(deliveries, 2));
        qos2Consumer.subscribe("foo/a/b/#", 2);

        MqttClient qos1Consumer = createPahoClient(RandomUtil.randomString());
        qos1Consumer.connect();
        qos1Consumer.setCallback(new TestCallback(deliveries, 1));
        qos1Consumer.subscribe("foo/a/#", 1);

        MqttClient qos0Consumer = createPahoClient(RandomUtil.randomString());
        qos0Consumer.connect();
        qos0Consumer.setCallback(new TestCallback(deliveries, 0));
        qos0Consumer.subscribe("foo/#", 0);

        // One message per QoS level; the payload is the QoS value as text.
        MqttClient producer = createPahoClient("producer");
        producer.connect();
        for (int qos = 0; qos <= 2; qos++) {
            byte[] payload = Integer.toString(qos).getBytes(StandardCharsets.UTF_8);
            producer.publish("foo/a/b/c", payload, qos, false);
        }
        producer.disconnect();
        producer.close();

        assertTrue(deliveries.await(2L, TimeUnit.SECONDS));

        qos2Consumer.disconnect();
        qos2Consumer.close();
        qos1Consumer.disconnect();
        qos1Consumer.close();
        qos0Consumer.disconnect();
        qos0Consumer.close();
    }

    /**
     * Checks the subscription identifiers attached to messages received through overlapping
     * multi-level wildcard subscriptions.
     *
     * <p>One async client subscribes at QoS 2 to "foo/a/b/#" (identifier 1), "foo/a/#"
     * (identifier 2) and "foo/#" (identifier 3). A producer publishes to "foo/a", "foo/a/b" and
     * "foo/a/b/c", using the topic name as payload. The callback expects identifiers {3} for
     * "foo/a", {2, 3} for "foo/a/b" and {1, 2, 3} for "foo/a/b/c". The latch of 6 is counted
     * down only for messages that pass these checks and must reach zero within one second.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 300000)
    public void testSubscriptionIdentifierMultiLevel() throws Exception {
        final CountDownLatch verified = new CountDownLatch(6);
        MqttAsyncClient consumer = createAsyncPahoClient(RandomUtil.randomString());
        consumer.connect().waitForCompletion();
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String arrivedTopic, MqttMessage incoming) throws Exception {
                try {
                    MqttProperties incomingProperties = incoming.getProperties();
                    List<Integer> ids = incomingProperties != null ? incomingProperties.getSubscriptionIdentifiers() : null;
                    logger.info("subscriptionIdentifers: {}; message: {}", ids, incoming);
                    byte[] payload = incoming.getPayload();
                    if (Arrays.equals(payload, "foo/a".getBytes(StandardCharsets.UTF_8))) {
                        Assert.assertTrue(ids.contains(3));
                        Assert.assertEquals(1L, ids.size());
                    } else if (Arrays.equals(payload, "foo/a/b".getBytes(StandardCharsets.UTF_8))) {
                        Assert.assertTrue(ids.contains(2));
                        Assert.assertTrue(ids.contains(3));
                        Assert.assertEquals(2L, ids.size());
                    } else if (Arrays.equals(payload, "foo/a/b/c".getBytes(StandardCharsets.UTF_8))) {
                        Assert.assertTrue(ids.contains(1));
                        Assert.assertTrue(ids.contains(2));
                        Assert.assertTrue(ids.contains(3));
                        Assert.assertEquals(3L, ids.size());
                    } else {
                        Assert.fail("invalid subscription identifer");
                    }
                    verified.countDown();
                } catch (Throwable th) {
                    // A failed check is kept off the client's callback thread; the missing
                    // count-down makes the latch assertion in the test thread fail instead.
                    logger.error("Subscription identifier verification failed", th);
                }
            }
        });

        MqttProperties firstSubscription = new MqttProperties();
        firstSubscription.setSubscriptionIdentifier(1);
        consumer.subscribe(new MqttSubscription[]{new MqttSubscription("foo/a/b/#", 2)}, (Object) null, (MqttActionListener) null, firstSubscription).waitForCompletion();

        MqttProperties secondSubscription = new MqttProperties();
        secondSubscription.setSubscriptionIdentifier(2);
        consumer.subscribe(new MqttSubscription[]{new MqttSubscription("foo/a/#", 2)}, (Object) null, (MqttActionListener) null, secondSubscription).waitForCompletion();

        MqttProperties thirdSubscription = new MqttProperties();
        thirdSubscription.setSubscriptionIdentifier(3);
        consumer.subscribe(new MqttSubscription[]{new MqttSubscription("foo/#", 2)}, (Object) null, (MqttActionListener) null, thirdSubscription).waitForCompletion();

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        producer.publish("foo/a", "foo/a".getBytes(StandardCharsets.UTF_8), 2, false);
        producer.publish("foo/a/b", "foo/a/b".getBytes(StandardCharsets.UTF_8), 2, false);
        producer.publish("foo/a/b/c", "foo/a/b/c".getBytes(StandardCharsets.UTF_8), 2, false);
        producer.disconnect();
        producer.close();

        assertTrue(verified.await(1L, TimeUnit.SECONDS));
        consumer.disconnect().waitForCompletion();
        consumer.close();
    }

    /**
     * Checks the subscription identifiers attached to messages received through single-level
     * wildcard subscriptions.
     *
     * <p>One async client subscribes at QoS 2 to "foo/a/b/+" (identifier 1), "foo/a/+"
     * (identifier 2) and "foo/+" (identifier 3). A producer publishes to "foo/a", "foo/a/b" and
     * "foo/a/b/c", using the topic name as payload. The callback expects exactly one identifier
     * per message: 3 for "foo/a", 2 for "foo/a/b" and 1 for "foo/a/b/c". The latch of 3 is
     * counted down only for messages that pass these checks and must reach zero within one
     * second.
     *
     * @throws Exception if any client operation fails
     */
    @Test(timeout = 300000)
    public void testSubscriptionIdentifierSingleLevel() throws Exception {
        final CountDownLatch verified = new CountDownLatch(3);
        MqttAsyncClient consumer = createAsyncPahoClient(RandomUtil.randomString());
        consumer.connect().waitForCompletion();
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String arrivedTopic, MqttMessage incoming) throws Exception {
                try {
                    MqttProperties incomingProperties = incoming.getProperties();
                    List<Integer> ids = incomingProperties != null ? incomingProperties.getSubscriptionIdentifiers() : null;
                    logger.info("subscriptionIdentifers: {}; message: {}", ids, incoming);
                    byte[] payload = incoming.getPayload();
                    if (Arrays.equals(payload, "foo/a".getBytes(StandardCharsets.UTF_8))) {
                        Assert.assertTrue(ids.contains(3));
                        Assert.assertEquals(1L, ids.size());
                    } else if (Arrays.equals(payload, "foo/a/b".getBytes(StandardCharsets.UTF_8))) {
                        Assert.assertTrue(ids.contains(2));
                        Assert.assertEquals(1L, ids.size());
                    } else if (Arrays.equals(payload, "foo/a/b/c".getBytes(StandardCharsets.UTF_8))) {
                        Assert.assertTrue(ids.contains(1));
                        Assert.assertEquals(1L, ids.size());
                    } else {
                        Assert.fail("invalid subscription identifer");
                    }
                    verified.countDown();
                } catch (Throwable th) {
                    // A failed check is kept off the client's callback thread; the missing
                    // count-down makes the latch assertion in the test thread fail instead.
                    logger.error("Subscription identifier verification failed", th);
                }
            }
        });

        MqttProperties firstSubscription = new MqttProperties();
        firstSubscription.setSubscriptionIdentifier(1);
        consumer.subscribe(new MqttSubscription[]{new MqttSubscription("foo/a/b/+", 2)}, (Object) null, (MqttActionListener) null, firstSubscription).waitForCompletion();

        MqttProperties secondSubscription = new MqttProperties();
        secondSubscription.setSubscriptionIdentifier(2);
        consumer.subscribe(new MqttSubscription[]{new MqttSubscription("foo/a/+", 2)}, (Object) null, (MqttActionListener) null, secondSubscription).waitForCompletion();

        MqttProperties thirdSubscription = new MqttProperties();
        thirdSubscription.setSubscriptionIdentifier(3);
        consumer.subscribe(new MqttSubscription[]{new MqttSubscription("foo/+", 2)}, (Object) null, (MqttActionListener) null, thirdSubscription).waitForCompletion();

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        producer.publish("foo/a", "foo/a".getBytes(StandardCharsets.UTF_8), 2, false);
        producer.publish("foo/a/b", "foo/a/b".getBytes(StandardCharsets.UTF_8), 2, false);
        producer.publish("foo/a/b/c", "foo/a/b/c".getBytes(StandardCharsets.UTF_8), 2, false);
        producer.disconnect();
        producer.close();

        assertTrue(verified.await(1L, TimeUnit.SECONDS));
        consumer.disconnect().waitForCompletion();
        consumer.close();
    }

    /**
     * Checks that the number of PUBLISH packets sent to a client and not yet answered by a
     * PUBACK or PUBREC never exceeds the receive maximum of 10 that the client requested.
     *
     * <p>An outgoing interceptor increments a counter for every PUBLISH packet and raises a
     * flag if the counter goes above 10; an incoming interceptor decrements the counter for
     * every PUBACK or PUBREC. The consumer subscribes at QoS 2 and sleeps 250 ms per received
     * message. A producer publishes 50 messages at a randomly chosen QoS of 1 or 2. The test
     * waits for all 50 to be added to the subscription queue, for the queue to drain, and for
     * all 50 to reach the callback, then asserts the flag was never raised.
     *
     * @throws Exception if any broker or client operation fails
     */
    @Test(timeout = 300000)
    public void testReceiveMaximum() throws Exception {
        AtomicInteger unacknowledged = new AtomicInteger(0);
        AtomicBoolean limitExceeded = new AtomicBoolean(false);

        // Incoming side: every PUBACK / PUBREC frees one slot.
        MQTTInterceptor ackCounter = (packet, connection) -> {
            MqttMessageType type = packet.fixedHeader().messageType();
            if (type == MqttMessageType.PUBACK || type == MqttMessageType.PUBREC) {
                unacknowledged.decrementAndGet();
            }
            return true;
        };
        // Outgoing side: every PUBLISH takes one slot; more than 10 taken at once trips the flag.
        MQTTInterceptor publishCounter = (packet, connection) -> {
            boolean isPublish = packet.fixedHeader().messageType() == MqttMessageType.PUBLISH;
            if (isPublish && unacknowledged.incrementAndGet() > 10) {
                limitExceeded.set(true);
            }
            return true;
        };
        this.server.getRemotingService().addIncomingInterceptor(ackCounter);
        this.server.getRemotingService().addOutgoingInterceptor(publishCounter);

        String topic = getTopicName();
        final CountDownLatch received = new CountDownLatch(50);
        MqttAsyncClient consumer = createAsyncPahoClient(RandomUtil.randomString());
        MqttConnectionOptions consumerOptions = new MqttConnectionOptions();
        consumerOptions.setReceiveMaximum(10);
        consumer.connect(consumerOptions).waitForCompletion();
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String arrivedTopic, MqttMessage incoming) throws Exception {
                // Slow consumer: delay each message before counting it.
                Thread.sleep(250L);
                received.countDown();
            }
        });
        consumer.subscribe(topic, 2).waitForCompletion();

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        for (int sent = 0; sent < 50; sent++) {
            // Randomly QoS 1 or QoS 2.
            int qos = (RandomUtil.randomPositiveInt() % 2) + 1;
            producer.publish(topic, "foo".getBytes(StandardCharsets.UTF_8), qos, false);
        }
        Wait.assertEquals(50L, () -> getSubscriptionQueue(topic).getMessagesAdded(), 2000L, 100L);
        producer.disconnect();
        producer.close();

        Wait.assertEquals(0L, () -> getSubscriptionQueue(topic).getMessageCount(), 15000L, 100L);
        assertTrue(received.await(15L, TimeUnit.SECONDS));
        assertFalse(limitExceeded.get());
        consumer.disconnect();
        consumer.close();
    }

    /**
     * Counterpart of the receive-maximum check for QoS 0 traffic: here the test expects the
     * count of PUBLISH packets sent without a matching PUBACK or PUBREC to go above the
     * requested receive maximum of 10.
     *
     * <p>An outgoing interceptor increments a counter for every PUBLISH packet and raises a
     * flag if the counter goes above 10; an incoming interceptor decrements the counter for
     * every PUBACK or PUBREC. The consumer subscribes at QoS 0 and sleeps 100 ms per received
     * message. A producer publishes 50 QoS 0 messages. The test waits for all 50 to be added to
     * the subscription queue, for the queue to drain, and for all 50 to reach the callback,
     * then asserts the flag was raised.
     *
     * @throws Exception if any broker or client operation fails
     */
    @Test(timeout = 300000)
    public void testReceiveMaximumQoS0() throws Exception {
        AtomicInteger unacknowledged = new AtomicInteger(0);
        AtomicBoolean limitExceeded = new AtomicBoolean(false);
        String topic = getTopicName();

        // Incoming side: every PUBACK / PUBREC decrements the counter.
        MQTTInterceptor ackCounter = (packet, connection) -> {
            MqttMessageType type = packet.fixedHeader().messageType();
            if (type == MqttMessageType.PUBACK || type == MqttMessageType.PUBREC) {
                unacknowledged.decrementAndGet();
            }
            return true;
        };
        // Outgoing side: every PUBLISH increments the counter; going past 10 trips the flag.
        MQTTInterceptor publishCounter = (packet, connection) -> {
            boolean isPublish = packet.fixedHeader().messageType() == MqttMessageType.PUBLISH;
            if (isPublish && unacknowledged.incrementAndGet() > 10) {
                limitExceeded.set(true);
            }
            return true;
        };
        this.server.getRemotingService().addIncomingInterceptor(ackCounter);
        this.server.getRemotingService().addOutgoingInterceptor(publishCounter);

        final CountDownLatch received = new CountDownLatch(50);
        MqttAsyncClient consumer = createAsyncPahoClient(RandomUtil.randomString());
        MqttConnectionOptions consumerOptions = new MqttConnectionOptions();
        consumerOptions.setReceiveMaximum(10);
        consumer.connect(consumerOptions).waitForCompletion();
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String arrivedTopic, MqttMessage incoming) throws Exception {
                // Slow consumer: delay each message before counting it.
                Thread.sleep(100L);
                received.countDown();
            }
        });
        consumer.subscribe(topic, 0).waitForCompletion();

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        for (int sent = 0; sent < 50; sent++) {
            byte[] payload = ("foo" + sent).getBytes(StandardCharsets.UTF_8);
            producer.publish(topic, payload, 0, false);
        }
        Wait.assertEquals(50L, () -> getSubscriptionQueue(topic).getMessagesAdded(), 2000L, 100L);
        producer.disconnect();
        producer.close();

        Wait.assertEquals(0L, () -> getSubscriptionQueue(topic).getMessageCount(), 8000L, 100L);
        assertTrue(received.await(8L, TimeUnit.SECONDS));
        assertTrue(limitExceeded.get());
        consumer.disconnect();
        consumer.close();
    }

    /**
     * Verifies that the broker still sends a PINGRESP to a consumer whose message callback is
     * blocked while further messages are queued for it.
     *
     * <p>The consumer connects with a receive maximum of 1 and a keep-alive interval of 2, and its
     * callback parks on a latch as soon as the first message arrives. A producer then publishes two
     * QoS 2 messages. The test passes once an outgoing PINGRESP is observed after the first message
     * has reached the callback.
     *
     * <p>NOTE(review): presumably this covers the MQTT 5 rule that packets other than PUBLISH must
     * not be delayed once the receive-maximum quota is exhausted - confirm against the spec section
     * this test is meant to map to.
     *
     * @throws Exception if any client operation fails or a wait is interrupted
     */
    @Test(timeout = 300000)
    public void testPacketDelayReceiveMaximum() throws Exception {
        final String topicName = getTopicName();
        final AtomicBoolean pingRespSeen = new AtomicBoolean(false);
        final AtomicBoolean firstMessageArrived = new AtomicBoolean(false);

        // Only count a PINGRESP that leaves the broker after the consumer callback has started
        // blocking; earlier ones say nothing about the blocked state. The packet is never vetoed.
        MQTTInterceptor pingRespInterceptor = (packet, connection) -> {
            if (firstMessageArrived.get() && packet.fixedHeader().messageType() == MqttMessageType.PINGRESP) {
                pingRespSeen.set(true);
            }
            return true;
        };
        this.server.getRemotingService().addOutgoingInterceptor(pingRespInterceptor);

        final CountDownLatch releaseCallback = new CountDownLatch(1);
        MqttClient consumer = createPahoClient("consumer");
        MqttConnectionOptions consumerOptions = new MqttConnectionOptions();
        consumerOptions.setReceiveMaximum(1);
        consumerOptions.setKeepAliveInterval(2);
        consumer.connect(consumerOptions);
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                // Flag the arrival, then hold the callback until the test releases the latch.
                firstMessageArrived.set(true);
                releaseCallback.await();
            }
        });

        try {
            consumer.subscribe(topicName, 2);

            MqttClient producer = createPahoClient("producer");
            producer.connect();
            for (int sent = 0; sent < 2; sent++) {
                producer.publish(topicName, "foo".getBytes(StandardCharsets.UTF_8), 2, false);
            }
            Wait.assertEquals(2L, () -> getSubscriptionQueue(topicName).getMessagesAdded(), 2000L, 100L);
            producer.disconnect();
            producer.close();

            Wait.assertTrue(() -> pingRespSeen.get(), 40000L, 100L);
        } finally {
            // Always release the callback, even when an assertion above fails; otherwise the
            // thread parked in messageArrived would stay blocked after the test has ended.
            releaseCallback.countDown();
        }

        consumer.disconnect();
        consumer.close();
    }
}
