package org.apache.activemq.artemis.tests.integration.mqtt5.spec;

import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPubReplyMessageVariableHeader;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.class */
public class QoSTests extends MQTT5TestSupport {
    @Timeout(60)
    @Test
    public void testQoS1andDupFlag() throws Exception {
        String randomString = RandomUtil.randomString();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        MqttClient createPahoClient = createPahoClient("consumer");
        createPahoClient.connect();
        createPahoClient.setCallback(new MQTT5TestSupport.DefaultMqttCallback() { // from class: org.apache.activemq.artemis.tests.integration.mqtt5.spec.QoSTests.1
            @Override // org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport.DefaultMqttCallback
            public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
                Assertions.assertEquals(1, mqttMessage.getQos());
                Assertions.assertFalse(mqttMessage.isDuplicate());
                countDownLatch.countDown();
            }
        });
        createPahoClient.subscribe(randomString, 1);
        MqttClient createPahoClient2 = createPahoClient("producer");
        createPahoClient2.connect();
        createPahoClient2.publish(randomString, RandomUtil.randomString().getBytes(), 1, false);
        createPahoClient2.disconnect();
        createPahoClient2.close();
        Assertions.assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS));
        createPahoClient.disconnect();
        createPahoClient.close();
    }

    @Timeout(60)
    @Test
    public void testQoS1PubAck() throws Exception {
        String randomString = RandomUtil.randomString();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicInteger atomicInteger = new AtomicInteger();
        MQTTInterceptor mQTTInterceptor = (mqttMessage, remotingConnection) -> {
            if (mqttMessage.fixedHeader().messageType() != MqttMessageType.PUBACK) {
                return true;
            }
            Assertions.assertEquals(1L, getSubscriptionQueue(randomString, "consumer").getMessageCount());
            Assertions.assertEquals(1, getSubscriptionQueue(randomString, "consumer").getDeliveringCount());
            Assertions.assertEquals(atomicInteger.get(), ((MqttPubReplyMessageVariableHeader) mqttMessage.variableHeader()).messageId());
            countDownLatch.countDown();
            return true;
        };
        MQTTInterceptor mQTTInterceptor2 = (mqttMessage2, remotingConnection2) -> {
            if (mqttMessage2.fixedHeader().messageType() != MqttMessageType.PUBLISH) {
                return true;
            }
            atomicInteger.set(((MqttPublishMessage) mqttMessage2).variableHeader().packetId());
            return true;
        };
        this.server.getRemotingService().addIncomingInterceptor(mQTTInterceptor);
        this.server.getRemotingService().addOutgoingInterceptor(mQTTInterceptor2);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        MqttClient createPahoClient = createPahoClient("consumer");
        createPahoClient.connect();
        createPahoClient.setCallback(new MQTT5TestSupport.LatchedMqttCallback(countDownLatch2));
        createPahoClient.subscribe(randomString, 1);
        MqttClient createPahoClient2 = createPahoClient("producer");
        createPahoClient2.connect();
        createPahoClient2.publish(randomString, RandomUtil.randomString().getBytes(), 1, false);
        createPahoClient2.disconnect();
        createPahoClient2.close();
        Assertions.assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS));
        Assertions.assertTrue(countDownLatch2.await(2L, TimeUnit.SECONDS));
        Assertions.assertEquals(0L, getSubscriptionQueue(randomString, "consumer").getMessageCount());
        Assertions.assertEquals(0, getSubscriptionQueue(randomString, "consumer").getDeliveringCount());
        createPahoClient.disconnect();
        createPahoClient.close();
    }

    @Timeout(60)
    @Test
    public void testQoS1PubAckId() throws Exception {
        String randomString = RandomUtil.randomString();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicInteger atomicInteger = new AtomicInteger();
        MQTTInterceptor mQTTInterceptor = (mqttMessage, remotingConnection) -> {
            if (mqttMessage.fixedHeader().messageType() != MqttMessageType.PUBLISH) {
                return true;
            }
            atomicInteger.set(((MqttPublishMessage) mqttMessage).variableHeader().packetId());
            return true;
        };
        MQTTInterceptor mQTTInterceptor2 = (mqttMessage2, remotingConnection2) -> {
            if (mqttMessage2.fixedHeader().messageType() != MqttMessageType.PUBACK) {
                return true;
            }
            Assertions.assertEquals(atomicInteger.get(), ((MqttPubAckMessage) mqttMessage2).variableHeader().messageId());
            countDownLatch.countDown();
            return true;
        };
        this.server.getRemotingService().addIncomingInterceptor(mQTTInterceptor);
        this.server.getRemotingService().addOutgoingInterceptor(mQTTInterceptor2);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        MqttClient createPahoClient = createPahoClient("consumer");
        createPahoClient.connect();
        createPahoClient.setCallback(new MQTT5TestSupport.LatchedMqttCallback(countDownLatch2));
        createPahoClient.subscribe(randomString, 1);
        MqttClient createPahoClient2 = createPahoClient("producer");
        createPahoClient2.connect();
        createPahoClient2.publish(randomString, RandomUtil.randomString().getBytes(), 1, false);
        createPahoClient2.disconnect();
        createPahoClient2.close();
        Assertions.assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS));
        Assertions.assertTrue(countDownLatch2.await(2L, TimeUnit.SECONDS));
        createPahoClient.disconnect();
        createPahoClient.close();
    }

    @Timeout(60)
    @Test
    public void testQoS2PubRec() throws Exception {
        String randomString = RandomUtil.randomString();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicInteger atomicInteger = new AtomicInteger();
        MQTTInterceptor mQTTInterceptor = (mqttMessage, remotingConnection) -> {
            if (mqttMessage.fixedHeader().messageType() != MqttMessageType.PUBREC) {
                return true;
            }
            Assertions.assertEquals(1L, getSubscriptionQueue(randomString, "consumer").getMessageCount());
            Assertions.assertEquals(1, getSubscriptionQueue(randomString, "consumer").getDeliveringCount());
            Assertions.assertEquals(atomicInteger.get(), ((MqttPubReplyMessageVariableHeader) mqttMessage.variableHeader()).messageId());
            countDownLatch.countDown();
            return true;
        };
        MQTTInterceptor mQTTInterceptor2 = (mqttMessage2, remotingConnection2) -> {
            if (mqttMessage2.fixedHeader().messageType() != MqttMessageType.PUBLISH) {
                return true;
            }
            atomicInteger.set(((MqttPublishMessage) mqttMessage2).variableHeader().packetId());
            return true;
        };
        this.server.getRemotingService().addIncomingInterceptor(mQTTInterceptor);
        this.server.getRemotingService().addOutgoingInterceptor(mQTTInterceptor2);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        MqttClient createPahoClient = createPahoClient("consumer");
        createPahoClient.connect();
        createPahoClient.setCallback(new MQTT5TestSupport.LatchedMqttCallback(countDownLatch2));
        createPahoClient.subscribe(randomString, 2);
        MqttClient createPahoClient2 = createPahoClient("producer");
        createPahoClient2.connect();
        createPahoClient2.publish(randomString, RandomUtil.randomString().getBytes(), 2, false);
        createPahoClient2.disconnect();
        createPahoClient2.close();
        Assertions.assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS));
        Assertions.assertTrue(countDownLatch2.await(2L, TimeUnit.SECONDS));
        Assertions.assertEquals(0L, getSubscriptionQueue(randomString, "consumer").getMessageCount());
        Assertions.assertEquals(0, getSubscriptionQueue(randomString, "consumer").getDeliveringCount());
        createPahoClient.disconnect();
        createPahoClient.close();
    }

    @Timeout(60)
    @Test
    public void testQoS2PubRelId() throws Exception {
        String randomString = RandomUtil.randomString();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        MQTTInterceptor mQTTInterceptor = (mqttMessage, remotingConnection) -> {
            if (mqttMessage.fixedHeader().messageType() == MqttMessageType.PUBLISH) {
                atomicInteger.set(((MqttPublishMessage) mqttMessage).variableHeader().packetId());
            }
            if (mqttMessage.fixedHeader().messageType() != MqttMessageType.PUBREC) {
                return true;
            }
            Assertions.assertEquals(atomicInteger.get(), ((MqttPubReplyMessageVariableHeader) mqttMessage.variableHeader()).messageId());
            Assertions.assertEquals((byte) 0, ((MqttPubReplyMessageVariableHeader) mqttMessage.variableHeader()).reasonCode());
            atomicBoolean.set(true);
            return true;
        };
        MQTTInterceptor mQTTInterceptor2 = (mqttMessage2, remotingConnection2) -> {
            if (mqttMessage2.fixedHeader().messageType() != MqttMessageType.PUBREL) {
                return true;
            }
            Assertions.assertTrue(atomicBoolean.get());
            Assertions.assertEquals(atomicInteger.get(), ((MqttPubAckMessage) mqttMessage2).variableHeader().messageId());
            countDownLatch.countDown();
            return true;
        };
        this.server.getRemotingService().addIncomingInterceptor(mQTTInterceptor);
        this.server.getRemotingService().addOutgoingInterceptor(mQTTInterceptor2);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        MqttClient createPahoClient = createPahoClient("consumer");
        createPahoClient.connect();
        createPahoClient.setCallback(new MQTT5TestSupport.LatchedMqttCallback(countDownLatch2));
        createPahoClient.subscribe(randomString, 2);
        MqttClient createPahoClient2 = createPahoClient("producer");
        createPahoClient2.connect();
        createPahoClient2.publish(randomString, RandomUtil.randomString().getBytes(), 2, false);
        createPahoClient2.disconnect();
        createPahoClient2.close();
        Assertions.assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS));
        Assertions.assertTrue(countDownLatch2.await(2L, TimeUnit.SECONDS));
        createPahoClient.disconnect();
        createPahoClient.close();
    }

    @Timeout(60)
    @Test
    public void testQoS2PubRel() throws Exception {
        String randomString = RandomUtil.randomString();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicInteger atomicInteger = new AtomicInteger();
        MQTTInterceptor mQTTInterceptor = (mqttMessage, remotingConnection) -> {
            if (mqttMessage.fixedHeader().messageType() != MqttMessageType.PUBCOMP) {
                return true;
            }
            try {
                Wait.assertEquals(1L, () -> {
                    return this.server.locateQueue("$sys.mqtt.queue.qos2.consumer").getMessageCount();
                }, 2000L, 100L);
                Wait.assertEquals(1L, () -> {
                    return this.server.locateQueue("$sys.mqtt.queue.qos2.consumer").getDeliveringCount();
                }, 2000L, 100L);
                Assertions.assertEquals(atomicInteger.get(), ((MqttPubReplyMessageVariableHeader) mqttMessage.variableHeader()).messageId());
                countDownLatch.countDown();
                return true;
            } catch (Exception e) {
                return false;
            }
        };
        MQTTInterceptor mQTTInterceptor2 = (mqttMessage2, remotingConnection2) -> {
            if (mqttMessage2.fixedHeader().messageType() != MqttMessageType.PUBLISH) {
                return true;
            }
            atomicInteger.set(((MqttPublishMessage) mqttMessage2).variableHeader().packetId());
            return true;
        };
        this.server.getRemotingService().addIncomingInterceptor(mQTTInterceptor);
        this.server.getRemotingService().addOutgoingInterceptor(mQTTInterceptor2);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        MqttClient createPahoClient = createPahoClient("consumer");
        createPahoClient.connect();
        createPahoClient.setCallback(new MQTT5TestSupport.LatchedMqttCallback(countDownLatch2));
        createPahoClient.subscribe(randomString, 2);
        MqttClient createPahoClient2 = createPahoClient("producer");
        createPahoClient2.connect();
        createPahoClient2.publish(randomString, RandomUtil.randomString().getBytes(), 2, false);
        createPahoClient2.disconnect();
        createPahoClient2.close();
        Assertions.assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS));
        Assertions.assertTrue(countDownLatch2.await(2L, TimeUnit.SECONDS));
        Assertions.assertEquals(0L, getSubscriptionQueue(randomString, "consumer").getMessageCount());
        Assertions.assertEquals(0, getSubscriptionQueue(randomString, "consumer").getDeliveringCount());
        createPahoClient.disconnect();
        createPahoClient.close();
    }

    @Timeout(60)
    @Test
    public void testQoS2WithExpiration() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        this.server.getRemotingService().addIncomingInterceptor((mqttMessage, remotingConnection) -> {
            if (mqttMessage.fixedHeader().messageType() != MqttMessageType.PUBREC) {
                return true;
            }
            Assertions.assertEquals(1L, getSubscriptionQueue("myTopic", "consumer").getMessageCount());
            Assertions.assertEquals(1, getSubscriptionQueue("myTopic", "consumer").getDeliveringCount());
            try {
                Thread.sleep(3000L);
                Queue subscriptionQueue = getSubscriptionQueue("myTopic", "consumer");
                Objects.requireNonNull(countDownLatch2);
                subscriptionQueue.expireReferences(countDownLatch2::countDown);
                Assertions.assertTrue(countDownLatch2.await(2L, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                e.printStackTrace();
                Assertions.fail();
            }
            countDownLatch.countDown();
            return true;
        });
        final CountDownLatch countDownLatch3 = new CountDownLatch(1);
        MqttClient createPahoClient = createPahoClient("consumer");
        createPahoClient.connect();
        createPahoClient.setCallback(new MQTT5TestSupport.DefaultMqttCallback() { // from class: org.apache.activemq.artemis.tests.integration.mqtt5.spec.QoSTests.2
            @Override // org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport.DefaultMqttCallback
            public void messageArrived(String str, MqttMessage mqttMessage2) throws Exception {
                countDownLatch3.countDown();
            }
        });
        createPahoClient.subscribe("myTopic", 2);
        MqttClient createPahoClient2 = createPahoClient("producer");
        createPahoClient2.connect();
        MqttMessage mqttMessage2 = new MqttMessage();
        MqttProperties mqttProperties = new MqttProperties();
        mqttProperties.setMessageExpiryInterval(2L);
        mqttMessage2.setProperties(mqttProperties);
        mqttMessage2.setQos(2);
        mqttMessage2.setPayload("foo".getBytes(StandardCharsets.UTF_8));
        createPahoClient2.publish("myTopic", mqttMessage2);
        createPahoClient2.disconnect();
        createPahoClient2.close();
        Assertions.assertTrue(countDownLatch.await(4L, TimeUnit.SECONDS));
        Assertions.assertTrue(countDownLatch3.await(4L, TimeUnit.SECONDS));
        Wait.assertEquals(0L, () -> {
            return getSubscriptionQueue("myTopic", "consumer").getMessageCount();
        });
        Wait.assertEquals(0, () -> {
            return getSubscriptionQueue("myTopic", "consumer").getDeliveringCount();
        });
        Wait.assertEquals(0L, () -> {
            return getSubscriptionQueue("myTopic", "consumer").getMessagesExpired();
        });
        createPahoClient.disconnect();
        createPahoClient.close();
    }

    @Timeout(60)
    @Test
    public void testQoS2PubRecId() throws Exception {
        String randomString = RandomUtil.randomString();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicInteger atomicInteger = new AtomicInteger();
        MQTTInterceptor mQTTInterceptor = (mqttMessage, remotingConnection) -> {
            if (mqttMessage.fixedHeader().messageType() != MqttMessageType.PUBLISH) {
                return true;
            }
            atomicInteger.set(((MqttPublishMessage) mqttMessage).variableHeader().packetId());
            return true;
        };
        MQTTInterceptor mQTTInterceptor2 = (mqttMessage2, remotingConnection2) -> {
            if (mqttMessage2.fixedHeader().messageType() != MqttMessageType.PUBREC) {
                return true;
            }
            Assertions.assertEquals(atomicInteger.get(), ((MqttPubAckMessage) mqttMessage2).variableHeader().messageId());
            countDownLatch.countDown();
            return true;
        };
        this.server.getRemotingService().addIncomingInterceptor(mQTTInterceptor);
        this.server.getRemotingService().addOutgoingInterceptor(mQTTInterceptor2);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        MqttClient createPahoClient = createPahoClient("consumer");
        createPahoClient.connect();
        createPahoClient.setCallback(new MQTT5TestSupport.LatchedMqttCallback(countDownLatch2));
        createPahoClient.subscribe(randomString, 2);
        MqttClient createPahoClient2 = createPahoClient("producer");
        createPahoClient2.connect();
        createPahoClient2.publish(randomString, RandomUtil.randomString().getBytes(), 2, false);
        createPahoClient2.disconnect();
        createPahoClient2.close();
        Assertions.assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS));
        Assertions.assertTrue(countDownLatch2.await(2L, TimeUnit.SECONDS));
        createPahoClient.disconnect();
        createPahoClient.close();
    }

    @Timeout(60)
    @Test
    public void testQoS2DuplicatePub() throws Exception {
        String randomString = RandomUtil.randomString();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        byte[] bytes = RandomUtil.randomString().getBytes();
        MQTTInterceptor mQTTInterceptor = (mqttMessage, remotingConnection) -> {
            if (mqttMessage.fixedHeader().messageType() != MqttMessageType.PUBLISH) {
                return true;
            }
            atomicInteger.set(((MqttPublishMessage) mqttMessage).variableHeader().packetId());
            return true;
        };
        MQTTInterceptor mQTTInterceptor2 = (mqttMessage2, remotingConnection2) -> {
            if (atomicInteger2.get() != 0 || mqttMessage2.fixedHeader().messageType() != MqttMessageType.PUBREC) {
                return true;
            }
            countDownLatch.countDown();
            atomicInteger2.incrementAndGet();
            remotingConnection2.disconnect(true);
            return false;
        };
        this.server.getRemotingService().addIncomingInterceptor(mQTTInterceptor);
        this.server.getRemotingService().addOutgoingInterceptor(mQTTInterceptor2);
        Queue createQueue = this.server.createQueue(QueueConfiguration.of(randomString));
        MqttClient createPahoClient = createPahoClient("producer");
        MqttConnectionOptions mqttConnectionOptions = new MqttConnectionOptions();
        mqttConnectionOptions.setCleanStart(false);
        mqttConnectionOptions.setSessionExpiryInterval(999L);
        createPahoClient.connect(mqttConnectionOptions);
        try {
            createPahoClient.publish(randomString, bytes, 2, false);
        } catch (Exception e) {
        }
        Assertions.assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS));
        createPahoClient.disconnectForcibly(1L, 1L, false);
        createPahoClient.close();
        MqttClient createPahoClient2 = createPahoClient("producer");
        createPahoClient2.connect(mqttConnectionOptions);
        createPahoClient2.publish(randomString, bytes, 2, false);
        createPahoClient2.disconnect();
        createPahoClient2.close();
        Assertions.assertEquals(1L, createQueue.getMessageCount());
    }

    @Timeout(60)
    @Test
    public void testQoS2PubCompId() throws Exception {
        String randomString = RandomUtil.randomString();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicInteger atomicInteger = new AtomicInteger();
        MQTTInterceptor mQTTInterceptor = (mqttMessage, remotingConnection) -> {
            if (mqttMessage.fixedHeader().messageType() != MqttMessageType.PUBREL) {
                return true;
            }
            atomicInteger.set(((MqttPubReplyMessageVariableHeader) mqttMessage.variableHeader()).messageId());
            return true;
        };
        MQTTInterceptor mQTTInterceptor2 = (mqttMessage2, remotingConnection2) -> {
            if (mqttMessage2.fixedHeader().messageType() != MqttMessageType.PUBCOMP) {
                return true;
            }
            Assertions.assertEquals(atomicInteger.get(), ((MqttPubAckMessage) mqttMessage2).variableHeader().messageId());
            countDownLatch.countDown();
            return true;
        };
        this.server.getRemotingService().addIncomingInterceptor(mQTTInterceptor);
        this.server.getRemotingService().addOutgoingInterceptor(mQTTInterceptor2);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        MqttClient createPahoClient = createPahoClient("consumer");
        createPahoClient.connect();
        createPahoClient.setCallback(new MQTT5TestSupport.LatchedMqttCallback(countDownLatch2));
        createPahoClient.subscribe(randomString, 2);
        MqttClient createPahoClient2 = createPahoClient("producer");
        createPahoClient2.connect();
        createPahoClient2.publish(randomString, RandomUtil.randomString().getBytes(), 2, false);
        createPahoClient2.disconnect();
        createPahoClient2.close();
        Assertions.assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS));
        Assertions.assertTrue(countDownLatch2.await(2L, TimeUnit.SECONDS));
        createPahoClient.disconnect();
        createPahoClient.close();
    }

    @Timeout(60)
    @Test
    public void testQoS2WithExpiration2() throws Exception {
        this.server.createQueue(QueueConfiguration.of(MQTTUtil.getCoreQueueFromMqttTopic("myTopic", "consumer", this.server.getConfiguration().getWildcardConfiguration())).setAddress(MQTTUtil.getCoreAddressFromMqttTopic("myTopic", this.server.getConfiguration().getWildcardConfiguration())).setRoutingType(RoutingType.MULTICAST));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        this.server.getRemotingService().addOutgoingInterceptor((mqttMessage, remotingConnection) -> {
            if (mqttMessage.fixedHeader().messageType() != MqttMessageType.PUBREC) {
                return true;
            }
            Wait.assertTrue(() -> {
                return getSubscriptionQueue("myTopic", "consumer").getMessageCount() == 1;
            }, 2000L, 100L);
            try {
                Thread.sleep(1500L);
                Queue subscriptionQueue = getSubscriptionQueue("myTopic", "consumer");
                Objects.requireNonNull(countDownLatch2);
                subscriptionQueue.expireReferences(countDownLatch2::countDown);
                Assertions.assertTrue(countDownLatch2.await(2L, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                e.printStackTrace();
                Assertions.fail();
            }
            countDownLatch.countDown();
            return true;
        });
        MqttClient createPahoClient = createPahoClient("producer");
        createPahoClient.connect();
        MqttMessage mqttMessage2 = new MqttMessage();
        MqttProperties mqttProperties = new MqttProperties();
        mqttProperties.setMessageExpiryInterval(1L);
        mqttMessage2.setProperties(mqttProperties);
        mqttMessage2.setQos(2);
        mqttMessage2.setPayload("foo".getBytes(StandardCharsets.UTF_8));
        createPahoClient.publish("myTopic", mqttMessage2);
        createPahoClient.disconnect();
        createPahoClient.close();
        Assertions.assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS));
        Wait.assertEquals(1L, () -> {
            return getSubscriptionQueue("myTopic", "consumer").getMessagesExpired();
        });
    }
}
