package org.apache.activemq.artemis.tests.integration.mqtt5.spec;

import io.netty.handler.codec.mqtt.MqttMessageIdAndPropertiesVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPubReplyMessageVariableHeader;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/mqtt5/spec/ControlPacketFormatTests.class */
public class ControlPacketFormatTests extends MQTT5TestSupport {
    @Timeout(60)
    @Test
    public void testPacketIdQoSZero() throws Exception {
        String topicName = getTopicName();
        final CountDownLatch countDownLatch = new CountDownLatch(100);
        MqttClient createPahoClient = createPahoClient("consumer");
        createPahoClient.setCallback(new MQTT5TestSupport.DefaultMqttCallback() { // from class: org.apache.activemq.artemis.tests.integration.mqtt5.spec.ControlPacketFormatTests.1
            @Override // org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport.DefaultMqttCallback
            public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
                Assertions.assertEquals(0, mqttMessage.getId());
                Assertions.assertEquals(0, mqttMessage.getQos());
                countDownLatch.countDown();
            }
        });
        createPahoClient.connect();
        createPahoClient.subscribe(topicName, 0);
        MqttClient createPahoClient2 = createPahoClient("producer");
        createPahoClient2.connect();
        for (int i = 0; i < 100; i++) {
            createPahoClient2.publish(topicName, ("foo" + i).getBytes(), 0, false);
        }
        Wait.assertEquals(100L, () -> {
            return getSubscriptionQueue(topicName, "consumer").getMessagesAdded();
        });
        createPahoClient2.disconnect();
        createPahoClient2.close();
        Assertions.assertTrue(countDownLatch.await(3L, TimeUnit.SECONDS));
        createPahoClient.disconnect();
        createPahoClient.close();
    }

    @Timeout(60)
    @Test
    public void testPacketIdQoSGreaterThanZero() throws Exception {
        String randomString = RandomUtil.randomString();
        String topicName = getTopicName();
        final ArrayList arrayList = new ArrayList();
        final Object obj = new Object();
        final CountDownLatch countDownLatch = new CountDownLatch(10);
        MqttClient createPahoClient = createPahoClient(randomString);
        createPahoClient.setCallback(new MQTT5TestSupport.DefaultMqttCallback() { // from class: org.apache.activemq.artemis.tests.integration.mqtt5.spec.ControlPacketFormatTests.2
            @Override // org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport.DefaultMqttCallback
            public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
                synchronized (obj) {
                    System.out.println(mqttMessage.getId());
                    Assertions.assertFalse(arrayList.contains(Integer.valueOf(mqttMessage.getId())));
                    arrayList.add(Integer.valueOf(mqttMessage.getId()));
                    countDownLatch.countDown();
                }
            }
        });
        createPahoClient.connect();
        createPahoClient.subscribe(topicName, 2);
        Wait.assertTrue(() -> {
            return getSubscriptionQueue(topicName, randomString) != null;
        });
        Wait.assertEquals(1, () -> {
            return getSubscriptionQueue(topicName, randomString).getConsumerCount();
        });
        MqttClient createPahoClient2 = createPahoClient("producer");
        createPahoClient2.connect();
        for (int i = 0; i < 10; i++) {
            createPahoClient2.publish(topicName, ("foo" + i).getBytes(), (RandomUtil.randomPositiveInt() % 2) + 1, false);
        }
        Wait.assertEquals(10L, () -> {
            return getSubscriptionQueue(topicName, randomString).getMessagesAdded();
        });
        createPahoClient2.disconnect();
        createPahoClient2.close();
        Assertions.assertTrue(countDownLatch.await(3L, TimeUnit.SECONDS));
        createPahoClient.disconnect();
        createPahoClient.close();
    }

    @Timeout(60)
    @Test
    public void testPacketIdPubAckQoS2() throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        MQTTInterceptor mQTTInterceptor = (mqttMessage, remotingConnection) -> {
            if (mqttMessage.fixedHeader().messageType() == MqttMessageType.PUBLISH) {
                atomicInteger.set(((MqttPublishMessage) mqttMessage).variableHeader().packetId());
                atomicInteger2.incrementAndGet();
            }
            if (mqttMessage.fixedHeader().messageType() != MqttMessageType.PUBREL && mqttMessage.fixedHeader().messageType() != MqttMessageType.PUBREC && mqttMessage.fixedHeader().messageType() != MqttMessageType.PUBCOMP) {
                return true;
            }
            if (((MqttPubReplyMessageVariableHeader) mqttMessage.variableHeader()).messageId() != atomicInteger.get()) {
                atomicBoolean.set(true);
            }
            atomicInteger2.incrementAndGet();
            return true;
        };
        MQTTInterceptor mQTTInterceptor2 = (mqttMessage2, remotingConnection2) -> {
            if (mqttMessage2.fixedHeader().messageType() == MqttMessageType.PUBLISH) {
                if (((MqttPublishMessage) mqttMessage2).variableHeader().packetId() != atomicInteger.get()) {
                    atomicBoolean.set(true);
                }
                atomicInteger2.incrementAndGet();
            }
            if (mqttMessage2.fixedHeader().messageType() != MqttMessageType.PUBREL && mqttMessage2.fixedHeader().messageType() != MqttMessageType.PUBREC && mqttMessage2.fixedHeader().messageType() != MqttMessageType.PUBCOMP) {
                return true;
            }
            if (((MqttPubAckMessage) mqttMessage2).variableHeader().messageId() != atomicInteger.get()) {
                atomicBoolean.set(true);
            }
            atomicInteger2.incrementAndGet();
            return true;
        };
        this.server.getRemotingService().addIncomingInterceptor(mQTTInterceptor);
        this.server.getRemotingService().addOutgoingInterceptor(mQTTInterceptor2);
        String topicName = getTopicName();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        MqttClient createPahoClient = createPahoClient("consumer");
        createPahoClient.setCallback(new MQTT5TestSupport.DefaultMqttCallback() { // from class: org.apache.activemq.artemis.tests.integration.mqtt5.spec.ControlPacketFormatTests.3
            @Override // org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport.DefaultMqttCallback
            public void messageArrived(String str, MqttMessage mqttMessage3) throws Exception {
                countDownLatch.countDown();
            }
        });
        createPahoClient.connect();
        createPahoClient.subscribe(topicName, 2);
        MqttClient createPahoClient2 = createPahoClient("producer");
        createPahoClient2.connect();
        createPahoClient2.publish(topicName, "foo".getBytes(StandardCharsets.UTF_8), 2, false);
        Wait.assertEquals(1L, () -> {
            return getSubscriptionQueue(topicName, "consumer").getMessagesAdded();
        }, 2000L, 100L);
        createPahoClient2.disconnect();
        createPahoClient2.close();
        Wait.assertEquals(1L, () -> {
            return getSubscriptionQueue(topicName, "consumer").getMessagesAcknowledged();
        }, AmqpSender.DEFAULT_SEND_TIMEOUT, 100L);
        Assertions.assertTrue(countDownLatch.await(15L, TimeUnit.SECONDS));
        Wait.assertFalse(() -> {
            return atomicBoolean.get();
        }, 2000L, 100L);
        Wait.assertEquals(8, () -> {
            return atomicInteger2.get();
        });
        createPahoClient.disconnect();
        createPahoClient.close();
    }

    @Timeout(60)
    @Test
    public void testPacketIdPubAckQoS1() throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        MQTTInterceptor mQTTInterceptor = (mqttMessage, remotingConnection) -> {
            if (mqttMessage.fixedHeader().messageType() == MqttMessageType.PUBLISH) {
                atomicInteger.set(((MqttPublishMessage) mqttMessage).variableHeader().packetId());
                atomicInteger2.incrementAndGet();
            }
            if (mqttMessage.fixedHeader().messageType() != MqttMessageType.PUBACK) {
                return true;
            }
            if (((MqttPubReplyMessageVariableHeader) mqttMessage.variableHeader()).messageId() != atomicInteger.get()) {
                atomicBoolean.set(true);
            }
            atomicInteger2.incrementAndGet();
            return true;
        };
        MQTTInterceptor mQTTInterceptor2 = (mqttMessage2, remotingConnection2) -> {
            if (mqttMessage2.fixedHeader().messageType() == MqttMessageType.PUBLISH) {
                if (((MqttPublishMessage) mqttMessage2).variableHeader().packetId() != atomicInteger.get()) {
                    atomicBoolean.set(true);
                }
                atomicInteger2.incrementAndGet();
            }
            if (mqttMessage2.fixedHeader().messageType() != MqttMessageType.PUBACK) {
                return true;
            }
            if (((MqttPubAckMessage) mqttMessage2).variableHeader().messageId() != atomicInteger.get()) {
                atomicBoolean.set(true);
            }
            atomicInteger2.incrementAndGet();
            return true;
        };
        this.server.getRemotingService().addIncomingInterceptor(mQTTInterceptor);
        this.server.getRemotingService().addOutgoingInterceptor(mQTTInterceptor2);
        String topicName = getTopicName();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        MqttClient createPahoClient = createPahoClient("consumer");
        createPahoClient.setCallback(new MQTT5TestSupport.DefaultMqttCallback() { // from class: org.apache.activemq.artemis.tests.integration.mqtt5.spec.ControlPacketFormatTests.4
            @Override // org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport.DefaultMqttCallback
            public void messageArrived(String str, MqttMessage mqttMessage3) throws Exception {
                countDownLatch.countDown();
            }
        });
        createPahoClient.connect();
        createPahoClient.subscribe(topicName, 1);
        MqttClient createPahoClient2 = createPahoClient("producer");
        createPahoClient2.connect();
        createPahoClient2.publish(topicName, "foo".getBytes(StandardCharsets.UTF_8), 1, false);
        Wait.assertEquals(1L, () -> {
            return getSubscriptionQueue(topicName, "consumer").getMessagesAdded();
        }, 2000L, 100L);
        createPahoClient2.disconnect();
        createPahoClient2.close();
        Wait.assertEquals(1L, () -> {
            return getSubscriptionQueue(topicName, "consumer").getMessagesAcknowledged();
        }, AmqpSender.DEFAULT_SEND_TIMEOUT, 100L);
        Assertions.assertTrue(countDownLatch.await(15L, TimeUnit.SECONDS));
        Wait.assertFalse(() -> {
            return atomicBoolean.get();
        }, 2000L, 100L);
        Wait.assertEquals(4, () -> {
            return atomicInteger2.get();
        });
        createPahoClient.disconnect();
        createPahoClient.close();
    }

    @Timeout(60)
    @Test
    public void testPacketIdSubAckAndUnsubAck() throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        AtomicInteger atomicInteger3 = new AtomicInteger(0);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        MQTTInterceptor mQTTInterceptor = (mqttMessage, remotingConnection) -> {
            if (mqttMessage.fixedHeader().messageType() == MqttMessageType.SUBSCRIBE) {
                atomicInteger.set(((MqttSubscribeMessage) mqttMessage).variableHeader().messageId());
                atomicInteger3.incrementAndGet();
            }
            if (mqttMessage.fixedHeader().messageType() != MqttMessageType.UNSUBSCRIBE) {
                return true;
            }
            atomicInteger2.set(((MqttUnsubscribeMessage) mqttMessage).variableHeader().messageId());
            atomicInteger3.incrementAndGet();
            return true;
        };
        MQTTInterceptor mQTTInterceptor2 = (mqttMessage2, remotingConnection2) -> {
            if (mqttMessage2.fixedHeader().messageType() == MqttMessageType.SUBACK) {
                if (((MqttMessageIdAndPropertiesVariableHeader) mqttMessage2.variableHeader()).messageId() != atomicInteger.get()) {
                    atomicBoolean.set(true);
                }
                atomicInteger3.incrementAndGet();
                return true;
            }
            if (mqttMessage2.fixedHeader().messageType() != MqttMessageType.UNSUBACK) {
                return true;
            }
            if (((MqttMessageIdAndPropertiesVariableHeader) mqttMessage2.variableHeader()).messageId() != atomicInteger2.get()) {
                atomicBoolean.set(true);
            }
            atomicInteger3.incrementAndGet();
            return true;
        };
        this.server.getRemotingService().addIncomingInterceptor(mQTTInterceptor);
        this.server.getRemotingService().addOutgoingInterceptor(mQTTInterceptor2);
        String topicName = getTopicName();
        MqttClient createPahoClient = createPahoClient("consumer");
        createPahoClient.connect();
        createPahoClient.subscribe(topicName, 1);
        createPahoClient.unsubscribe(topicName);
        Wait.assertFalse(() -> {
            return atomicBoolean.get();
        }, 2000L, 100L);
        Wait.assertEquals(4, () -> {
            return atomicInteger3.get();
        });
        createPahoClient.disconnect();
        createPahoClient.close();
    }
}
