package org.apache.activemq.artemis.tests.integration.mqtt;

import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith({ParameterizedTestExtension.class})
/*
 * Integration tests that talk to the broker's MQTT endpoint through the Eclipse Paho v3 client.
 * The templated tests are parameterized by the transport schemes returned by getParams().
 */
public class PahoMQTTTest extends MQTTTestSupport {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    public String protocol;

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTTest$TestMqttClientCallback.class */
    private class TestMqttClientCallback implements MqttCallback {
        private CountDownLatch latch;

        TestMqttClientCallback(CountDownLatch countDownLatch) {
            this.latch = countDownLatch;
        }

        public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
            this.latch.countDown();
        }

        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        }

        public void connectionLost(Throwable th) {
        }
    }

    /**
     * Supplies the parameter sets for the templated tests: one single-element array per transport
     * scheme, {@code "tcp"} and {@code "ws"}.
     *
     * @return fixed-size collection with one {@code Object[]} per scheme
     */
    @Parameters(name = "protocol={0}")
    public static Collection<Object[]> getParams() {
        final Object[][] transports = {{"tcp"}, {"ws"}};
        return Arrays.asList(transports);
    }

    /**
     * Creates a test instance bound to a single transport.
     *
     * @param str URI scheme stored in {@link #protocol}; it is used as the scheme part of the
     *            broker URI when clients are created
     */
    public PahoMQTTTest(String str) {
        protocol = str;
    }

    /**
     * Connects many publishing clients concurrently and verifies that a single subscriber receives
     * every message they send.
     *
     * <p>The number of publishers comes from the {@code PahoMQTTTest.CLIENTS} system property
     * (default 100). Each publisher connects on its own thread, waits on a shared barrier until all
     * publishers have reported in, then sends ten messages one second apart. The test passes when the
     * subscriber has counted {@code clients * 10} messages within 30 seconds and no publisher thread
     * recorded an error.
     *
     * @throws Exception if the consumer cannot connect or subscribe, or the test thread is interrupted
     */
    @Timeout(60)
    @TestTemplate
    public void testLotsOfClients() throws Exception {
        final int clientCount = Integer.getInteger("PahoMQTTTest.CLIENTS", 100);
        final int messagesPerClient = 10;
        logger.debug("Using: {} clients: ", clientCount);

        final AtomicInteger receivedCount = new AtomicInteger();
        final AtomicReference<Throwable> asyncError = new AtomicReference<>();
        final CountDownLatch connectedLatch = new CountDownLatch(clientCount);
        final CountDownLatch finishedLatch = new CountDownLatch(clientCount);
        final CountDownLatch sendBarrier = new CountDownLatch(1);

        final MqttClient consumer = createPahoClient("consumer");
        try {
            consumer.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    receivedCount.incrementAndGet();
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                }
            });
            consumer.connect();
            consumer.subscribe(AutoCreateJmsDestinationTest.QUEUE_NAME);

            for (int i = 0; i < clientCount; i++) {
                // Stagger the connection attempts slightly.
                Thread.sleep(10L);
                final String clientId = "client:" + i;
                new Thread(() -> {
                    boolean connectReported = false;
                    try {
                        MqttClient producer = createPahoClient(clientId);
                        producer.connect();
                        connectReported = true;
                        connectedLatch.countDown();
                        sendBarrier.await();
                        for (int sent = 0; sent < messagesPerClient; sent++) {
                            Thread.sleep(1000L);
                            producer.publish(AutoCreateJmsDestinationTest.QUEUE_NAME, "hello".getBytes(), 1, false);
                        }
                        producer.disconnect();
                        producer.close();
                    } catch (Throwable t) {
                        logger.warn("Client {} failed", clientId, t);
                        asyncError.set(t);
                    } finally {
                        // A client that never connected must still report in; otherwise the main
                        // thread would wait on connectedLatch until the test timeout instead of
                        // failing fast with the recorded error.
                        if (!connectReported) {
                            connectedLatch.countDown();
                        }
                        finishedLatch.countDown();
                    }
                }, clientId).start();
            }

            connectedLatch.await();
            Assertions.assertNull(asyncError.get(), "Async error: " + asyncError.get());
            sendBarrier.countDown();
            logger.debug("All clients connected... waiting to receive sent messages...");
            within(30, TimeUnit.SECONDS, () -> {
                Assertions.assertEquals(clientCount * messagesPerClient, receivedCount.get());
            });
            logger.debug("All messages received.");
            finishedLatch.await();
            Assertions.assertNull(asyncError.get(), "Async error: " + asyncError.get());
        } finally {
            // Never leave publisher threads parked on the barrier when the test bails out early;
            // counting down an already-released latch is a no-op.
            sendBarrier.countDown();
            if (consumer.isConnected()) {
                consumer.disconnect();
            }
            consumer.close();
        }
    }

    /**
     * Publishes one message from a producer client and waits until a subscribed consumer client
     * has been notified of its arrival.
     *
     * <p>The callback is installed before the consumer connects so no delivery can precede it, and
     * both clients are disconnected and closed even when the test fails.
     *
     * @throws Exception if connecting, subscribing or publishing fails, or if {@code waitForLatch}
     *                   (a helper defined outside this file) reports a failure
     */
    @Timeout(60)
    @TestTemplate
    public void testSendAndReceiveMQTT() throws Exception {
        final CountDownLatch messageArrivedLatch = new CountDownLatch(1);
        final MqttClient consumer = createPahoClient("consumerId");
        try {
            final MqttClient producer = createPahoClient("producerId");
            try {
                consumer.setCallback(new MqttCallback() {
                    @Override
                    public void connectionLost(Throwable cause) {
                    }

                    @Override
                    public void messageArrived(String topic, MqttMessage message) throws Exception {
                        messageArrivedLatch.countDown();
                    }

                    @Override
                    public void deliveryComplete(IMqttDeliveryToken token) {
                    }
                });
                consumer.connect();
                consumer.subscribe(AutoCreateJmsDestinationTest.QUEUE_NAME);

                producer.connect();
                producer.publish(AutoCreateJmsDestinationTest.QUEUE_NAME, "hello".getBytes(), 1, false);
                waitForLatch(messageArrivedLatch);
            } finally {
                if (producer.isConnected()) {
                    producer.disconnect();
                }
                producer.close();
            }
        } finally {
            if (consumer.isConnected()) {
                consumer.disconnect();
            }
            consumer.close();
        }
    }

    /**
     * Verifies that a client connecting with the clean-session flag set is told that no session
     * is present.
     *
     * <p>The client is disconnected and closed in all cases, including when the assertion fails.
     *
     * @throws Exception if the client cannot be created, connected or shut down
     */
    @Timeout(60)
    @TestTemplate
    public void testSessionPresentWithCleanSession() throws Exception {
        final MqttClient client = createPahoClient(RandomUtil.randomString());
        try {
            final MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            Assertions.assertFalse(client.connectWithResult(options).getSessionPresent());
        } finally {
            if (client.isConnected()) {
                client.disconnect();
            }
            client.close();
        }
    }

    /**
     * Verifies the session-present flag for a client that connects with clean-session disabled:
     * the first connect of a fresh, randomly named client must report no session, and a reconnect
     * with the same options after a disconnect must report that one is present.
     *
     * <p>The client is disconnected and closed when the test ends, whether it passes or fails.
     *
     * @throws Exception if the client cannot be created, connected or shut down
     */
    @Timeout(60)
    @TestTemplate
    public void testSessionPresent() throws Exception {
        final MqttClient client = createPahoClient(RandomUtil.randomString());
        try {
            final MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(false);
            Assertions.assertFalse(client.connectWithResult(options).getSessionPresent());
            client.disconnect();
            Assertions.assertTrue(client.connectWithResult(options).getSessionPresent());
        } finally {
            if (client.isConnected()) {
                client.disconnect();
            }
            client.close();
        }
    }

    /**
     * Builds an unconnected Paho client aimed at {@code <protocol>://localhost:<port>}, where the
     * port comes from {@code getPort()}, using a fresh in-memory persistence store.
     *
     * @param str client identifier handed to the Paho client
     * @return a new client; the caller is responsible for connecting it
     * @throws MqttException if the Paho client cannot be constructed
     */
    private MqttClient createPahoClient(String str) throws MqttException {
        final String brokerUri = protocol + "://localhost:" + getPort();
        return new MqttClient(brokerUri, str, new MemoryPersistence());
    }

    /**
     * Checks how topics starting with {@code $} interact with a bare {@code #} subscription.
     *
     * <p>Two clients subscribe to their own {@code $dollar/<client-id>/#} subtree and an admin
     * client subscribes to {@code #}. After a series of publishes to {@code $dollar/...} topics the
     * test expects the first client to have seen at least two messages and the second at least one,
     * while the admin client's latch must still be at its initial count of three, i.e. its
     * callback was never invoked.
     *
     * @throws Exception if any client operation fails or the test thread is interrupted
     */
    @Timeout(60)
    @TestTemplate
    public void testDollarAndHashSubscriptions() throws Exception {
        final MqttClient admin = createPahoClient("test-client-admin");
        final MqttClient clientOne = createPahoClient("test-client-1");
        final MqttClient clientTwo = createPahoClient("test-client-2");

        final CountDownLatch adminArrivals = new CountDownLatch(3);
        final CountDownLatch clientOneArrivals = new CountDownLatch(2);
        final CountDownLatch clientTwoArrivals = new CountDownLatch(1);

        admin.setCallback(new TestMqttClientCallback(adminArrivals));
        clientOne.setCallback(new TestMqttClientCallback(clientOneArrivals));
        clientTwo.setCallback(new TestMqttClientCallback(clientTwoArrivals));

        admin.connect();
        clientOne.connect();
        clientTwo.connect();

        clientOne.subscribe("$dollar/test-client-1/#");
        clientTwo.subscribe("$dollar/test-client-2/#");
        admin.subscribe("#");

        // The same message instance is reused for every publish below.
        final MqttMessage payload = new MqttMessage(AutoCreateJmsDestinationTest.QUEUE_NAME.getBytes());

        clientOne.publish("$dollar/test-client-1/foo", payload);
        clientTwo.publish("$dollar/test-client-2/foo", payload);
        admin.publish("$dollar/test-client-1/bar", payload);
        admin.publish("$dollar/test-client-1/bar", payload);
        clientOne.publish("$dollar/test-client-1/baz", payload);
        clientTwo.publish("$dollar/test-client-2/baz", payload);
        admin.publish("$dollar/test-client-1/baz", payload);
        admin.publish("$dollar/test-client-2/baz", payload);

        final boolean clientOneSatisfied = clientOneArrivals.await(2L, TimeUnit.SECONDS);
        Assertions.assertTrue(clientOneSatisfied);
        final boolean clientTwoSatisfied = clientTwoArrivals.await(2L, TimeUnit.SECONDS);
        Assertions.assertTrue(clientTwoSatisfied);

        // The admin latch must time out without ever having been decremented.
        final boolean adminSatisfied = adminArrivals.await(1L, TimeUnit.SECONDS);
        Assertions.assertFalse(adminSatisfied);
        Assertions.assertEquals(3L, adminArrivals.getCount());

        // Tear down in the same order the clients were created.
        for (MqttClient client : Arrays.asList(admin, clientOne, clientTwo)) {
            client.disconnect();
            client.close();
        }
    }
}
