package org.apache.activemq.artemis.tests.integration.mqtt5;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/mqtt5/MQTTRetainMessageManagerTest.class */
public class MQTTRetainMessageManagerTest extends MQTT5TestSupport {
    private MqttClient mqttPublisher;
    private MqttClient mqttConsumerBeforePublish;
    private MqttClient mqttConsumerAfterPublish;
    private MqttClient mqttConsumerAfterPublish2;
    private final AtomicInteger arrivedCountBeforePublish;
    private final AtomicInteger arrivedCountAferPublish;
    private final AtomicInteger arrivedCountAferPublish2;
    private final AtomicReference<MqttMessage> lastMessagePublished;
    private final AtomicReference<MqttMessage> lastMessageArrivedOnConsumerBeforePublish;
    private final AtomicReference<MqttMessage> lastMessageArrivedOnConsumerAfterPublish;
    private final AtomicReference<MqttMessage> lastMessageArrivedOnConsumerAfterPublish2;
    private final String topic = "fact";
    private final int numberOfMessages = 1000;
    private final int numberOfTests = 10;

    public MQTTRetainMessageManagerTest(String str) {
        super(str);
        this.arrivedCountBeforePublish = new AtomicInteger();
        this.arrivedCountAferPublish = new AtomicInteger();
        this.arrivedCountAferPublish2 = new AtomicInteger();
        this.lastMessagePublished = new AtomicReference<>();
        this.lastMessageArrivedOnConsumerBeforePublish = new AtomicReference<>();
        this.lastMessageArrivedOnConsumerAfterPublish = new AtomicReference<>();
        this.lastMessageArrivedOnConsumerAfterPublish2 = new AtomicReference<>();
        this.topic = "fact";
        this.numberOfMessages = 1000;
        this.numberOfTests = 10;
    }

    @Before
    public void beforeEach() throws MqttException {
        this.mqttPublisher = createPahoClient("publisher");
        this.mqttPublisher.connect();
        MqttMessage mqttMessage = new MqttMessage(new byte[0]);
        mqttMessage.setRetained(true);
        mqttMessage.setQos(1);
        this.mqttPublisher.publish("fact", mqttMessage);
        this.arrivedCountBeforePublish.set(0);
        this.mqttConsumerBeforePublish = createPahoClient("consumer-before");
        this.mqttConsumerBeforePublish.setCallback(new MQTT5TestSupport.DefaultMqttCallback() { // from class: org.apache.activemq.artemis.tests.integration.mqtt5.MQTTRetainMessageManagerTest.1
            @Override // org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport.DefaultMqttCallback
            public void messageArrived(String str, MqttMessage mqttMessage2) {
                MQTTRetainMessageManagerTest.this.lastMessageArrivedOnConsumerBeforePublish.set(mqttMessage2);
                MQTTRetainMessageManagerTest.this.arrivedCountBeforePublish.incrementAndGet();
            }
        });
        this.mqttConsumerBeforePublish.connect();
        this.arrivedCountAferPublish.set(0);
        this.mqttConsumerAfterPublish = createPahoClient("consumer-after");
        this.mqttConsumerAfterPublish.setCallback(new MQTT5TestSupport.DefaultMqttCallback() { // from class: org.apache.activemq.artemis.tests.integration.mqtt5.MQTTRetainMessageManagerTest.2
            @Override // org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport.DefaultMqttCallback
            public void messageArrived(String str, MqttMessage mqttMessage2) {
                MQTTRetainMessageManagerTest.this.lastMessageArrivedOnConsumerAfterPublish.set(mqttMessage2);
                MQTTRetainMessageManagerTest.this.arrivedCountAferPublish.incrementAndGet();
            }
        });
        this.mqttConsumerAfterPublish.connect();
        this.arrivedCountAferPublish2.set(0);
        this.mqttConsumerAfterPublish2 = createPahoClient("consumer-after2");
        this.mqttConsumerAfterPublish2.setCallback(new MQTT5TestSupport.DefaultMqttCallback() { // from class: org.apache.activemq.artemis.tests.integration.mqtt5.MQTTRetainMessageManagerTest.3
            @Override // org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport.DefaultMqttCallback
            public void messageArrived(String str, MqttMessage mqttMessage2) {
                MQTTRetainMessageManagerTest.this.lastMessageArrivedOnConsumerAfterPublish2.set(mqttMessage2);
                MQTTRetainMessageManagerTest.this.arrivedCountAferPublish2.incrementAndGet();
            }
        });
        this.mqttConsumerAfterPublish2.connect();
    }

    @After
    public void afterEach() throws MqttException {
        this.mqttPublisher.disconnect();
        this.mqttPublisher.close();
        this.mqttConsumerBeforePublish.unsubscribe("fact");
        this.mqttConsumerBeforePublish.disconnect();
        this.mqttConsumerBeforePublish.close();
        this.mqttConsumerAfterPublish.unsubscribe("fact");
        this.mqttConsumerAfterPublish.disconnect();
        this.mqttConsumerAfterPublish.close();
        this.mqttConsumerAfterPublish2.unsubscribe("fact");
        this.mqttConsumerAfterPublish2.disconnect();
        this.mqttConsumerAfterPublish2.close();
    }

    @Test(timeout = 300000)
    public void testAtMostOnce() {
        IntStream.of(10).forEach(i -> {
            test(0);
        });
    }

    @Test(timeout = 300000)
    public void testAtLeastOnce() {
        IntStream.of(10).forEach(i -> {
            test(1);
        });
    }

    @Test(timeout = 300000)
    public void testExactlyOnce() {
        IntStream.of(10).forEach(i -> {
            test(2);
        });
    }

    private void test(int i) {
        try {
            this.mqttConsumerBeforePublish.subscribe("fact", i);
            for (int i2 = 0; i2 < 1000; i2++) {
                MqttMessage mqttMessage = new MqttMessage();
                mqttMessage.setQos(i);
                mqttMessage.setRetained(true);
                mqttMessage.setPayload(RandomUtil.randomBytes(128));
                this.mqttPublisher.publish("fact", mqttMessage);
                this.lastMessagePublished.set(mqttMessage);
            }
            Wait.waitFor(() -> {
                return this.server.getAddressInfo(SimpleString.toSimpleString("fact")).getRoutedMessageCount() >= 1000;
            }, 5000L, 100L);
            this.mqttConsumerAfterPublish.subscribe("fact", i);
            this.mqttConsumerAfterPublish2.subscribe("fact", i);
            Wait.waitFor(() -> {
                return this.lastMessageArrivedOnConsumerAfterPublish.get() != null;
            }, 5000L, 100L);
            Wait.waitFor(() -> {
                return this.lastMessageArrivedOnConsumerAfterPublish2.get() != null;
            }, 5000L, 100L);
            assertEquals(1L, this.arrivedCountAferPublish.get());
            assertArrayEquals(String.format("\nMessage arrived on consumer subscribed before the publish is different from the last published message!\nPublished: %s\nArrived  : %s\n", new String(this.lastMessagePublished.get().getPayload()), new String(this.lastMessageArrivedOnConsumerAfterPublish.get().getPayload())), this.lastMessagePublished.get().getPayload(), this.lastMessageArrivedOnConsumerBeforePublish.get().getPayload());
            assertArrayEquals(String.format("\nMessage arrived on consumer subscribed after the publish is different from the last published message!\nPublished: %s\nArrived  : %s\n", new String(this.lastMessagePublished.get().getPayload()), new String(this.lastMessageArrivedOnConsumerAfterPublish.get().getPayload())), this.lastMessagePublished.get().getPayload(), this.lastMessageArrivedOnConsumerAfterPublish.get().getPayload());
            assertArrayEquals(String.format("\nMessage arrived on consumer subscribed after the publish (2) is different from the last published message!\nPublished: %s\nArrived  : %s\n", new String(this.lastMessagePublished.get().getPayload()), new String(this.lastMessageArrivedOnConsumerAfterPublish.get().getPayload())), this.lastMessagePublished.get().getPayload(), this.lastMessageArrivedOnConsumerAfterPublish2.get().getPayload());
        } catch (MqttException e) {
            fail(e.getMessage());
        }
    }
}
