package org.apache.activemq.artemis.tests.integration.mqtt5.spec;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.utils.Wait;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/mqtt5/spec/SubscriptionTests.class */
/**
 * Integration tests for MQTT 5 shared subscriptions ({@code $share/<name>/<topic>}).
 *
 * <p>Each test subscribes Paho clients to the same shared subscription, locates the
 * server-side queue named {@code <name>.<topic>}, publishes to the plain topic and then
 * checks which subscriber(s) were notified.
 *
 * <p>All clients created by a test are disconnected and closed in a {@code finally} block so
 * that a failed assertion does not leave connections open for subsequent tests.
 */
public class SubscriptionTests extends MQTT5TestSupport {
    /** Topic the producer publishes to; also the expected address of the shared queue. */
    private static final String TOPIC = "myTopic";

    /** Share name used in the shared-subscription topic filter. */
    private static final String SHARE_NAME = "myShare";

    /** Topic filter every consumer subscribes with. */
    private static final String SHARED_SUBSCRIPTION = "$share/" + SHARE_NAME + "/" + TOPIC;

    /** Name under which the tests look up the server-side queue for the shared subscription. */
    private static final String SHARED_QUEUE_NAME = SHARE_NAME + "." + TOPIC;

    /** Number of messages published by {@link #testSharedSubscriptionRespectQoS()}. */
    private static final int MESSAGE_COUNT = 100;

    // Passed to Wait as its two trailing long arguments — presumably (timeout, poll interval)
    // in milliseconds; confirm against Wait's signature.
    private static final long WAIT_TIMEOUT_MS = 2000L;
    private static final long WAIT_INTERVAL_MS = 100L;

    /**
     * Two consumers share one subscription, the first at QoS 0 and the second at QoS 1. After
     * {@value #MESSAGE_COUNT} QoS 1 messages are published, each consumer must have received at
     * least one message, every message a consumer sees must carry that consumer's subscribed
     * QoS, and the two consumers together must have received exactly {@value #MESSAGE_COUNT}.
     *
     * @throws Exception if any client operation or wait fails
     */
    @Timeout(60)
    @Test
    public void testSharedSubscriptionRespectQoS() throws Exception {
        final AtomicInteger consumer1Received = new AtomicInteger(0);
        final AtomicInteger consumer2Received = new AtomicInteger(0);

        MqttClient consumer1 = createPahoClient("consumer1");
        MqttClient consumer2 = null;
        try {
            consumer1.connect();
            consumer1.setCallback(qosCheckingCallback("consumer 1", 0, consumer1Received));
            consumer1.subscribe(SHARED_SUBSCRIPTION, 0);

            Queue sharedQueue = this.server.locateQueue(SHARED_QUEUE_NAME);
            Assertions.assertNotNull(sharedQueue);
            Assertions.assertEquals(TOPIC, sharedQueue.getAddress().toString());

            consumer2 = createPahoClient("consumer2");
            consumer2.connect();
            consumer2.setCallback(qosCheckingCallback("consumer 2", 1, consumer2Received));
            consumer2.subscribe(SHARED_SUBSCRIPTION, 1);
            Assertions.assertEquals(2, sharedQueue.getConsumerCount());

            publishEmptyMessages(MESSAGE_COUNT);

            Wait.assertTrue(() -> consumer1Received.get() > 0, WAIT_TIMEOUT_MS, WAIT_INTERVAL_MS);
            Wait.assertTrue(() -> consumer2Received.get() > 0, WAIT_TIMEOUT_MS, WAIT_INTERVAL_MS);
            Wait.assertEquals(MESSAGE_COUNT, () -> consumer1Received.get() + consumer2Received.get(), WAIT_TIMEOUT_MS, WAIT_INTERVAL_MS);
        } finally {
            closeClients(consumer1, consumer2);
        }
    }

    /**
     * A message is published while only consumer1 holds the shared subscription; consumer2
     * joins the same shared subscription afterwards. Consumer1's latch must be released within
     * two seconds and consumer2's latch must still be unreleased after a further two seconds.
     *
     * @throws Exception if any client operation fails
     */
    @Timeout(60)
    @Test
    public void testSharedSubscriptionWithAck() throws Exception {
        CountDownLatch consumer1Latch = new CountDownLatch(1);
        CountDownLatch consumer2Latch = new CountDownLatch(1);

        MqttClient consumer1 = createPahoClient("consumer1");
        MqttClient consumer2 = null;
        try {
            consumer1.connect();
            // LatchedMqttCallback presumably counts the latch down when a message arrives —
            // its implementation lives in MQTT5TestSupport; confirm there.
            consumer1.setCallback(new MQTT5TestSupport.LatchedMqttCallback(consumer1Latch));
            consumer1.subscribe(SHARED_SUBSCRIPTION, 1);

            Queue sharedQueue = this.server.locateQueue(SHARED_QUEUE_NAME);
            Assertions.assertNotNull(sharedQueue);
            Assertions.assertEquals(TOPIC, sharedQueue.getAddress().toString());
            Assertions.assertEquals(1, sharedQueue.getConsumerCount());

            // Publish before the second consumer exists so only consumer1 can be the target.
            publishEmptyMessages(1);

            consumer2 = createPahoClient("consumer2");
            consumer2.connect();
            consumer2.setCallback(new MQTT5TestSupport.LatchedMqttCallback(consumer2Latch));
            consumer2.subscribe(SHARED_SUBSCRIPTION, 1);

            Assertions.assertTrue(consumer1Latch.await(2L, TimeUnit.SECONDS));
            Assertions.assertFalse(consumer2Latch.await(2L, TimeUnit.SECONDS));
        } finally {
            closeClients(consumer1, consumer2);
        }
    }

    /**
     * Builds a callback that counts arriving messages whose QoS equals {@code expectedQos} and
     * fails on any other QoS.
     *
     * <p>NOTE(review): the failure is raised from {@code messageArrived}, which the Paho client
     * presumably invokes on its own callback thread rather than the test thread — confirm how
     * such a failure reaches the test result.
     *
     * @param consumerLabel human-readable consumer name used in the failure message
     * @param expectedQos   the only QoS value this consumer may receive
     * @param received      counter incremented for every message with the expected QoS
     * @return the callback to install on the consumer client
     */
    private MQTT5TestSupport.DefaultMqttCallback qosCheckingCallback(final String consumerLabel, final int expectedQos, final AtomicInteger received) {
        return new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                if (message.getQos() != expectedQos) {
                    Assertions.fail("Wrong QoS for " + consumerLabel + ": " + message.getId() + " " + message.getQos());
                }
                received.incrementAndGet();
            }
        };
    }

    /**
     * Connects a client with id {@code "producer"}, publishes {@code count} empty,
     * non-retained QoS 1 messages to {@link #TOPIC}, then disconnects and closes the client
     * even if connecting or publishing fails.
     *
     * @param count number of messages to publish
     * @throws Exception if connecting, publishing or closing fails
     */
    private void publishEmptyMessages(int count) throws Exception {
        MqttClient producer = createPahoClient("producer");
        try {
            producer.connect();
            for (int i = 0; i < count; i++) {
                producer.publish(TOPIC, new byte[0], 1, false);
            }
        } finally {
            closeClients(producer);
        }
    }

    /**
     * Disconnects (when still connected) and closes each non-null client in the given order.
     * Every client is attempted even if an earlier one fails; the first failure is rethrown
     * with later ones attached as suppressed exceptions.
     *
     * @param clients clients to release; {@code null} entries are skipped
     * @throws Exception the first failure encountered while releasing the clients
     */
    private static void closeClients(MqttClient... clients) throws Exception {
        Exception firstFailure = null;
        for (MqttClient client : clients) {
            if (client == null) {
                continue;
            }
            try {
                // disconnect() is only valid on a connected client; a test may fail before
                // connect() or after the connection has already been dropped.
                if (client.isConnected()) {
                    client.disconnect();
                }
                client.close();
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}
