package org.apache.activemq.artemis.tests.integration.mqtt;

import java.util.LinkedList;
import org.apache.activemq.artemis.tests.util.Wait;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.jgroups.util.UUID;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/mqtt/MqttAcknowledgementTest.class */
public class MqttAcknowledgementTest extends MQTTTestSupport {
    private volatile LinkedList<Integer> messageIds = new LinkedList<>();
    private volatile boolean messageArrived = false;
    private MqttClient subscriber;
    private MqttClient sender;

    @After
    public void clean() throws MqttException {
        this.messageArrived = false;
        this.messageIds.clear();
        if (this.subscriber.isConnected()) {
            this.subscriber.disconnect();
        }
        if (this.sender.isConnected()) {
            this.sender.disconnect();
        }
        this.subscriber.close();
        this.sender.close();
    }

    @Test(timeout = 300000)
    public void testAcknowledgementQOS1() throws Exception {
        test(1);
    }

    @Test(timeout = 300000, expected = AssertionError.class)
    public void testAcknowledgementQOS0() throws Exception {
        test(0);
    }

    private void test(int i) throws Exception {
        String uuid = UUID.randomUUID().toString();
        String uuid2 = UUID.randomUUID().toString();
        String uuid3 = UUID.randomUUID().toString();
        this.subscriber = createMqttClient(uuid);
        this.subscriber.subscribe(uuid3, i);
        this.sender = createMqttClient(uuid2);
        this.sender.publish(uuid3, UUID.randomUUID().toString().getBytes(), i, false);
        this.sender.publish(uuid3, UUID.randomUUID().toString().getBytes(), i, false);
        if (!Wait.waitFor(() -> {
            return this.messageIds.size() == 2;
        }, 5000L)) {
            Assert.fail();
        }
        this.subscriber.messageArrivedComplete(this.messageIds.getLast().intValue(), i);
        this.subscriber.disconnect();
        this.subscriber.close();
        this.messageArrived = false;
        if (!Wait.waitFor(() -> {
            try {
                this.subscriber = createMqttClient(uuid);
                return true;
            } catch (MqttException e) {
                return false;
            }
        }, 60000L)) {
            Assert.fail();
        }
        if (Wait.waitFor(() -> {
            return this.messageArrived;
        }, 5000L)) {
            return;
        }
        Assert.fail();
    }

    private MqttClient createMqttClient(String str) throws MqttException {
        MqttClient mqttClient = new MqttClient("tcp://localhost:" + getPort(), str, new MemoryPersistence());
        mqttClient.setCallback(createCallback());
        mqttClient.setManualAcks(true);
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(false);
        mqttClient.connect(mqttConnectOptions);
        return mqttClient;
    }

    private MqttCallback createCallback() {
        return new MqttCallback() { // from class: org.apache.activemq.artemis.tests.integration.mqtt.MqttAcknowledgementTest.1
            public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
                MqttAcknowledgementTest.this.messageIds.add(Integer.valueOf(mqttMessage.getId()));
                MqttAcknowledgementTest.this.messageArrived = true;
            }

            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            }

            public void connectionLost(Throwable th) {
            }
        };
    }
}
