package org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets;

import io.netty.handler.codec.mqtt.MqttMessageIdAndPropertiesVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/UnsubscribeTests.class */
/**
 * Integration tests exercising how the broker handles MQTT 5 UNSUBSCRIBE packets and the
 * UNSUBACK packets it sends in response.
 *
 * <p>Every test creates its own Paho clients and always disconnects and closes them, even when
 * an assertion fails part-way through.
 */
public class UnsubscribeTests extends MQTT5TestSupport {

    /** Number of topics subscribed to in {@link #testUnsubscribe()}. */
    private static final int SUBSCRIPTION_COUNT = 30;

    /** Number of leading topics that {@link #testUnsubscribe()} unsubscribes from again. */
    private static final int UNSUBSCRIBE_COUNT = 15;

    /** Maximum time, in seconds, to wait for an UNSUBACK to pass the outgoing interceptor. */
    private static final long UNSUBACK_TIMEOUT_SECONDS = 2L;

    /**
     * Subscribes to {@value #SUBSCRIPTION_COUNT} topics with one call, unsubscribes from the first
     * {@value #UNSUBSCRIBE_COUNT} with one call, and verifies that only the removed topics lose
     * their address binding and that exactly one UNSUBACK passed the outgoing interceptor.
     *
     * @throws Exception if any client or broker operation fails
     */
    @Timeout(60)
    @Test
    public void testUnsubscribe() throws Exception {
        final String topicSuffix = RandomUtil.randomString();
        final AtomicInteger unsubAckCount = new AtomicInteger(0);

        final SimpleString[] topics = new SimpleString[SUBSCRIPTION_COUNT];
        for (int i = 0; i < SUBSCRIPTION_COUNT; i++) {
            topics[i] = SimpleString.of(i + "-" + topicSuffix);
        }

        // Count every UNSUBACK the broker sends; all other packets pass through untouched.
        final MQTTInterceptor unsubAckCounter = (packet, connection) -> {
            if (packet.fixedHeader().messageType() == MqttMessageType.UNSUBACK) {
                unsubAckCount.incrementAndGet();
            }
            return true;
        };
        server.getRemotingService().addOutgoingInterceptor(unsubAckCounter);

        final MqttClient consumer = createPahoClient("consumer");
        try {
            consumer.connect();

            final MqttSubscription[] subscriptions = new MqttSubscription[SUBSCRIPTION_COUNT];
            for (int i = 0; i < SUBSCRIPTION_COUNT; i++) {
                subscriptions[i] = new MqttSubscription(topics[i].toString(), 0);
            }
            consumer.subscribe(subscriptions);
            for (SimpleString topic : topics) {
                Assertions.assertTrue(server.getPostOffice().isAddressBound(topic));
            }

            final String[] removedTopics = new String[UNSUBSCRIBE_COUNT];
            for (int i = 0; i < UNSUBSCRIBE_COUNT; i++) {
                removedTopics[i] = topics[i].toString();
            }
            consumer.unsubscribe(removedTopics);

            // The removed topics must be gone, the remaining ones must be untouched.
            for (int i = 0; i < UNSUBSCRIBE_COUNT; i++) {
                Assertions.assertFalse(server.getPostOffice().isAddressBound(topics[i]));
            }
            for (int i = UNSUBSCRIBE_COUNT; i < SUBSCRIPTION_COUNT; i++) {
                Assertions.assertTrue(server.getPostOffice().isAddressBound(topics[i]));
            }
            Assertions.assertEquals(1, unsubAckCount.get());
        } finally {
            disconnectAndClose(consumer);
        }
    }

    /**
     * Verifies that once a consumer unsubscribes, later publications are only added to the
     * subscription queue of the consumer that is still subscribed, and that the unsubscribed
     * consumer's subscription queue can no longer be looked up.
     *
     * @throws Exception if any client or broker operation fails
     */
    @Timeout(60)
    @Test
    public void testStopAddingMessagesOnUnsubscribe() throws Exception {
        final String topic = RandomUtil.randomString();
        MqttClient consumer1 = null;
        MqttClient consumer2 = null;
        try {
            consumer1 = createPahoClient("consumer1");
            consumer1.connect();
            consumer1.subscribe(topic, 0);

            consumer2 = createPahoClient("consumer2");
            consumer2.connect();
            consumer2.subscribe(topic, 0);

            final Queue consumer1Queue = getSubscriptionQueue(topic, "consumer1");
            final Queue consumer2Queue = getSubscriptionQueue(topic, "consumer2");

            final MqttClient producer = createPahoClient("producer");
            try {
                producer.connect();

                // While both consumers are subscribed, each queue receives the message.
                producer.publish(topic, new byte[0], 0, false);
                Wait.assertEquals(1L, () -> consumer1Queue.getMessagesAdded(), 2000L, 100L);
                Wait.assertEquals(1L, () -> consumer2Queue.getMessagesAdded(), 2000L, 100L);

                consumer2.unsubscribe(topic);
                producer.publish(topic, new byte[0], 0, false);
            } finally {
                // The producer is shut down before the final checks, as it is no longer needed.
                disconnectAndClose(producer);
            }

            Wait.assertEquals(2L, () -> consumer1Queue.getMessagesAdded(), 2000L, 100L);
            Wait.assertTrue(() -> getSubscriptionQueue(topic, "consumer2") == null);
        } finally {
            try {
                disconnectAndClose(consumer1);
            } finally {
                disconnectAndClose(consumer2);
            }
        }
    }

    /**
     * Verifies that after an UNSUBSCRIBE has passed the incoming interceptor, an UNSUBACK passes
     * the outgoing interceptor within {@value #UNSUBACK_TIMEOUT_SECONDS} seconds.
     *
     * @throws Exception if any client or broker operation fails
     */
    @Timeout(60)
    @Test
    public void testUnsubAck() throws Exception {
        final String topic = RandomUtil.randomString();
        final AtomicBoolean unsubscribeSeen = new AtomicBoolean(false);
        final CountDownLatch unsubAckLatch = new CountDownLatch(1);

        final MQTTInterceptor incoming = (packet, connection) -> {
            if (packet.fixedHeader().messageType() == MqttMessageType.UNSUBSCRIBE) {
                unsubscribeSeen.set(true);
            }
            return true;
        };
        // Only an UNSUBACK observed after the UNSUBSCRIBE releases the latch.
        final MQTTInterceptor outgoing = (packet, connection) -> {
            if (unsubscribeSeen.get() && packet.fixedHeader().messageType() == MqttMessageType.UNSUBACK) {
                unsubAckLatch.countDown();
            }
            return true;
        };
        server.getRemotingService().addIncomingInterceptor(incoming);
        server.getRemotingService().addOutgoingInterceptor(outgoing);

        final MqttClient consumer = createPahoClient("consumer");
        try {
            consumer.connect();
            consumer.subscribe(topic, 0);
            consumer.unsubscribe(topic);
            Assertions.assertTrue(unsubAckLatch.await(UNSUBACK_TIMEOUT_SECONDS, TimeUnit.SECONDS));
        } finally {
            disconnectAndClose(consumer);
        }
    }

    /**
     * Verifies that the UNSUBACK sent by the broker carries the same packet identifier as the
     * UNSUBSCRIBE that preceded it.
     *
     * <p>The interceptors only record the identifiers; the comparison happens on the test thread.
     * Asserting inside the interceptor would throw on whichever thread invokes the interceptor and
     * skip the latch count-down, so a mismatch would surface only as a latch timeout without the
     * offending values.
     *
     * @throws Exception if any client or broker operation fails
     */
    @Timeout(60)
    @Test
    public void testUnsubAckPacketId() throws Exception {
        final String topic = RandomUtil.randomString();
        final AtomicBoolean unsubscribeSeen = new AtomicBoolean(false);
        final AtomicInteger unsubscribePacketId = new AtomicInteger(0);
        final AtomicInteger unsubAckPacketId = new AtomicInteger(0);
        final CountDownLatch unsubAckLatch = new CountDownLatch(1);

        final MQTTInterceptor incoming = (packet, connection) -> {
            if (packet.fixedHeader().messageType() == MqttMessageType.UNSUBSCRIBE) {
                // Store the id before raising the flag so that whoever sees the flag sees the id.
                unsubscribePacketId.set(((MqttMessageIdAndPropertiesVariableHeader) packet.variableHeader()).messageId());
                unsubscribeSeen.set(true);
            }
            return true;
        };
        final MQTTInterceptor outgoing = (packet, connection) -> {
            if (unsubscribeSeen.get() && packet.fixedHeader().messageType() == MqttMessageType.UNSUBACK) {
                unsubAckPacketId.set(((MqttMessageIdAndPropertiesVariableHeader) packet.variableHeader()).messageId());
                unsubAckLatch.countDown();
            }
            return true;
        };
        server.getRemotingService().addIncomingInterceptor(incoming);
        server.getRemotingService().addOutgoingInterceptor(outgoing);

        final MqttClient consumer = createPahoClient("consumer");
        try {
            consumer.connect();
            consumer.subscribe(topic, 0);
            consumer.unsubscribe(topic);
            Assertions.assertTrue(unsubAckLatch.await(UNSUBACK_TIMEOUT_SECONDS, TimeUnit.SECONDS));
            Assertions.assertEquals(unsubscribePacketId.get(), unsubAckPacketId.get());
        } finally {
            disconnectAndClose(consumer);
        }
    }

    /**
     * Disconnects the given client if it is still connected and then closes it.
     *
     * @param client the client to shut down; {@code null} is ignored so the method can be called
     *     from a {@code finally} block before the client has been created
     * @throws Exception if disconnecting or closing the client fails
     */
    private static void disconnectAndClose(MqttClient client) throws Exception {
        if (client == null) {
            return;
        }
        // Skip the disconnect when the connection was never established or is already gone.
        if (client.isConnected()) {
            client.disconnect();
        }
        client.close();
    }
}
