package org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets;

import io.netty.handler.codec.mqtt.MqttMessageType;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.class */
public class ConnectTests extends MQTT5TestSupport {
    // Class-scoped SLF4J logger; the owning class is resolved through MethodHandles.lookup().
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /**
     * Creates the test instance and forwards the argument to {@link MQTT5TestSupport}.
     *
     * @param str value handed unchanged to the superclass constructor
     *            (NOTE(review): presumably the transport protocol under test, since tests
     *            in this class compare {@code this.protocol} with "tcp" - confirm against
     *            MQTT5TestSupport)
     */
    public ConnectTests(String str) {
        super(str);
    }

    /**
     * Connects a client that declares a will message and verifies that the single broker-side
     * session state is flagged as having a will and stores a payload equal to the bytes sent.
     */
    @Test(timeout = 300000)
    public void testWillFlag() throws Exception {
        final String clientId = RandomUtil.randomString();
        final byte[] willPayload = RandomUtil.randomBytes();

        MqttClient client = createPahoClient(clientId);
        MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
            .will("/topic/foo", new MqttMessage(willPayload))
            .build();
        client.connect(options);

        assertEquals(1L, getSessionStates().size());
        assertNotNull(getSessionStates().get(clientId));
        assertTrue(getSessionStates().get(clientId).isWill());

        // Copy the stored will payload out of the session state so it can be compared byte-for-byte.
        byte[] storedPayload = new byte[getSessionStates().get(clientId).getWillMessage().capacity()];
        getSessionStates().get(clientId).getWillMessage().getBytes(0, storedPayload);
        assertEqualsByteArrays(willPayload, storedPayload);

        client.disconnect();
    }

    /**
     * A subscriber on "/topic/foo" must receive the will message within two seconds after the
     * will-declaring client is forcibly disconnected; that client sets neither a session expiry
     * interval nor a will delay.
     */
    @Test(timeout = 300000)
    public void testWillMessageWithNoSessionExpiryDelayAndNoWillDelay() throws Exception {
        final String consumerId = RandomUtil.randomString();
        final String willSenderId = RandomUtil.randomString();
        final byte[] willPayload = RandomUtil.randomBytes();

        // Subscriber that counts the latch down via LatchedMqttCallback.
        final CountDownLatch willReceived = new CountDownLatch(1);
        MqttClient consumer = createPahoClient(consumerId);
        consumer.setCallback(new MQTT5TestSupport.LatchedMqttCallback(willReceived));
        consumer.connect();
        consumer.subscribe("/topic/foo", 1);

        // Client carrying the will; dropped without a clean DISCONNECT.
        MqttClient willSender = createPahoClient(willSenderId);
        MqttConnectionOptions willOptions = new MqttConnectionOptionsBuilder()
            .will("/topic/foo", new MqttMessage(willPayload))
            .build();
        willSender.connect(willOptions);
        willSender.disconnectForcibly(0L, 0L, false);

        assertTrue(willReceived.await(2L, TimeUnit.SECONDS));
    }

    /**
     * With a will delay of 1 second and a session expiry interval of 4 seconds, the will message
     * must reach the subscriber within 3 seconds of the forced disconnect, but not before at
     * least 1000 ms have elapsed.
     */
    @Test(timeout = 300000)
    public void testWillFlagWithSessionExpiryDelayAndWillDelay() throws Exception {
        final byte[] willPayload = RandomUtil.randomBytes();

        final CountDownLatch willReceived = new CountDownLatch(1);
        MqttClient consumer = createPahoClient("willConsumer");
        consumer.setCallback(new MQTT5TestSupport.LatchedMqttCallback(willReceived));
        consumer.connect();
        consumer.subscribe("/topic/foo", 1);

        MqttClient willSender = createPahoClient("willGenerator");
        MqttProperties willProperties = new MqttProperties();
        willProperties.setWillDelayInterval(1L);
        MqttConnectionOptions willOptions = new MqttConnectionOptionsBuilder()
            .sessionExpiryInterval(4L)
            .will("/topic/foo", new MqttMessage(willPayload))
            .build();
        willOptions.setWillMessageProperties(willProperties);
        willSender.connect(willOptions);
        willSender.disconnectForcibly(0L, 0L, false);

        // Measure from just after the forced disconnect until the will shows up.
        final long disconnectedAt = System.currentTimeMillis();
        assertTrue(willReceived.await(3L, TimeUnit.SECONDS));
        final long elapsedMillis = System.currentTimeMillis() - disconnectedAt;
        assertTrue(elapsedMillis >= 1000);
    }

    /**
     * The will-declaring client sets a 1 second will delay but no session expiry interval; the
     * subscriber must nevertheless receive the will within 500 ms of the forced disconnect.
     */
    @Test(timeout = 300000)
    public void testWillFlagWithNoSessionExpiryDelayAndWillDelay() throws Exception {
        final byte[] willPayload = RandomUtil.randomBytes();

        final CountDownLatch willReceived = new CountDownLatch(1);
        MqttClient consumer = createPahoClient("willConsumer");
        consumer.setCallback(new MQTT5TestSupport.LatchedMqttCallback(willReceived));
        consumer.connect();
        consumer.subscribe("/topic/foo", 1);

        MqttClient willSender = createPahoClient("willGenerator");
        MqttProperties willProperties = new MqttProperties();
        willProperties.setWillDelayInterval(1L);
        MqttConnectionOptions willOptions = new MqttConnectionOptionsBuilder()
            .will("/topic/foo", new MqttMessage(willPayload))
            .build();
        willOptions.setWillMessageProperties(willProperties);
        willSender.connect(willOptions);
        willSender.disconnectForcibly(0L, 0L, false);

        // Shorter than the 1 s will delay: the delay is expected not to apply here.
        assertTrue(willReceived.await(500L, TimeUnit.MILLISECONDS));
    }

    /**
     * Uses a session expiry interval (1 s) that is shorter than the will delay (2 s) and expects
     * the will to reach the subscriber within 1500 ms of the forced disconnect, i.e. before the
     * will delay alone would have elapsed.
     */
    @Test(timeout = 300000)
    public void testWillFlagWithShorterSessionExpiryDelayThanWillDelay() throws Exception {
        final byte[] willPayload = RandomUtil.randomBytes();

        final CountDownLatch willReceived = new CountDownLatch(1);
        MqttClient consumer = createPahoClient("willConsumer");
        consumer.setCallback(new MQTT5TestSupport.LatchedMqttCallback(willReceived));
        consumer.connect();
        consumer.subscribe("/topic/foo", 1);

        MqttClient willSender = createPahoClient("willGenerator");
        MqttProperties willProperties = new MqttProperties();
        willProperties.setWillDelayInterval(2L);
        MqttConnectionOptions willOptions = new MqttConnectionOptionsBuilder()
            .sessionExpiryInterval(1L)
            .will("/topic/foo", new MqttMessage(willPayload))
            .build();
        willOptions.setWillMessageProperties(willProperties);
        willSender.connect(willOptions);
        willSender.disconnectForcibly(0L, 0L, false);

        assertTrue(willReceived.await(1500L, TimeUnit.MILLISECONDS));
    }

    /**
     * The will-declaring client (will delay 2 s, session expiry 2 s) is forcibly disconnected and
     * then immediately reconnects with clean start disabled. The subscriber must not receive any
     * message during the following 2 seconds.
     */
    @Test(timeout = 300000)
    public void testWillFlagWithSessionExpiryDelayAndWillDelayWithReconnect() throws Exception {
        final byte[] willPayload = RandomUtil.randomBytes();

        final CountDownLatch willReceived = new CountDownLatch(1);
        MqttClient consumer = createPahoClient("willConsumer");
        consumer.setCallback(new MQTT5TestSupport.LatchedMqttCallback(willReceived));
        consumer.connect();
        consumer.subscribe("/topic/foo", 1);

        MqttClient willSender = createPahoClient("willGenerator");
        MqttProperties willProperties = new MqttProperties();
        willProperties.setWillDelayInterval(2L);
        MqttConnectionOptions firstOptions = new MqttConnectionOptionsBuilder()
            .sessionExpiryInterval(2L)
            .will("/topic/foo", new MqttMessage(willPayload))
            .build();
        firstOptions.setWillMessageProperties(willProperties);
        willSender.connect(firstOptions);
        willSender.disconnectForcibly(0L, 0L, false);

        // Come back straight away on the same client id, keeping the existing session.
        MqttConnectionOptions reconnectOptions = new MqttConnectionOptionsBuilder()
            .cleanStart(false)
            .will("/topic/foo", new MqttMessage(willPayload))
            .build();
        willSender.connect(reconnectOptions);

        assertFalse(willReceived.await(2L, TimeUnit.SECONDS));
        willSender.disconnect();
    }

    /**
     * A client that declared a will and then disconnects normally must not cause the will to be
     * delivered: the subscriber's latch stays untouched for 2 seconds.
     */
    @Test(timeout = 300000)
    public void testWillFlagWithDisconnect() throws Exception {
        final byte[] willPayload = RandomUtil.randomBytes();

        final CountDownLatch willReceived = new CountDownLatch(1);
        MqttClient consumer = createPahoClient("willConsumer");
        consumer.setCallback(new MQTT5TestSupport.LatchedMqttCallback(willReceived));
        consumer.connect();
        consumer.subscribe("/topic/foo", 1);

        MqttClient willSender = createPahoClient("willGenerator");
        MqttConnectionOptions willOptions = new MqttConnectionOptionsBuilder()
            .will("/topic/foo", new MqttMessage(willPayload))
            .build();
        willSender.connect(willOptions);
        willSender.disconnect();

        assertFalse(willReceived.await(2L, TimeUnit.SECONDS));
    }

    /**
     * Verifies that the session state holds a will message while the client is connected and
     * that the same session state object no longer holds one after the will has been delivered
     * to the subscriber.
     */
    @Test(timeout = 300000)
    public void testWillMessageRemovedOnceSent() throws Exception {
        final String consumerId = RandomUtil.randomString();
        final String willSenderId = RandomUtil.randomString();
        final byte[] willPayload = RandomUtil.randomBytes();

        final CountDownLatch willReceived = new CountDownLatch(1);
        MqttClient consumer = createPahoClient(consumerId);
        consumer.setCallback(new MQTT5TestSupport.LatchedMqttCallback(willReceived));
        consumer.connect();
        consumer.subscribe("/topic/foo", 1);

        MqttClient willSender = createPahoClient(willSenderId);
        MqttProperties willProperties = new MqttProperties();
        willProperties.setWillDelayInterval(1L);
        MqttConnectionOptions willOptions = new MqttConnectionOptionsBuilder()
            .sessionExpiryInterval(5L)
            .will("/topic/foo", new MqttMessage(willPayload))
            .build();
        willOptions.setWillMessageProperties(willProperties);
        willSender.connect(willOptions);

        // Hold on to the state object: it is inspected again after the will has been sent.
        MQTTSessionState senderState = getSessionStates().get(willSenderId);
        assertNotNull(senderState);
        assertNotNull(senderState.getWillMessage());

        willSender.disconnectForcibly(0L, 0L, false);
        assertTrue(willReceived.await(2L, TimeUnit.SECONDS));
        assertNull(senderState.getWillMessage());
    }

    /**
     * After a normal disconnect of a client that declared a will (and set no session expiry
     * interval), no session state may remain registered under its client id.
     */
    @Test(timeout = 300000)
    public void testWillMessageRemovedOnDisconnect() throws Exception {
        final String clientId = RandomUtil.randomString();
        final byte[] willPayload = RandomUtil.randomBytes();

        MqttClient client = createPahoClient(clientId);
        MqttConnectionOptions willOptions = new MqttConnectionOptionsBuilder()
            .will("/topic/foo", new MqttMessage(willPayload))
            .build();
        client.connect(willOptions);
        assertNotNull(getSessionStates().get(clientId).getWillMessage());

        client.disconnect();
        assertNull(getSessionStates().get(clientId));
    }

    /**
     * Connects with a will message whose retained flag is never set and checks that the session
     * state reports a will that is not retained.
     *
     * NOTE(review): the method name suggests a retained will, but the assertions cover the
     * non-retained case only - confirm this is the intended scenario.
     */
    @Test(timeout = 300000)
    public void testWillFlagWithRetain() throws Exception {
        final String clientId = RandomUtil.randomString();
        final byte[] willPayload = RandomUtil.randomBytes();

        MqttClient client = createPahoClient(clientId);
        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setWill("/topic/foo", new MqttMessage(willPayload));
        options.setWillMessageProperties(new MqttProperties());
        client.connect(options);

        assertEquals(1L, getSessionStates().size());
        assertNotNull(getSessionStates().get(clientId));
        assertTrue(getSessionStates().get(clientId).isWill());
        assertFalse(getSessionStates().get(clientId).isWillRetain());

        client.disconnect();
    }

    /**
     * Installs an incoming interceptor that stalls every PINGREQ for 3 seconds, connects a client
     * with a 1 second keep-alive interval, and expects the broker's connection count to drop to
     * zero within 2 seconds.
     */
    @Test(timeout = 300000)
    public void testKeepAlive() throws Exception {
        this.server.getRemotingService().addIncomingInterceptor((mqttMessage, remotingConnection) -> {
            if (mqttMessage.fixedHeader().messageType() == MqttMessageType.PINGREQ) {
                logger.info("Caught PING so sleeping...");
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    // Keep letting the packet through, but do not lose the interrupt request.
                    Thread.currentThread().interrupt();
                }
            }
            // Every packet, delayed or not, is allowed to continue.
            return true;
        });

        MqttClient client = createPahoClient(RandomUtil.randomString());
        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setKeepAliveInterval(1);
        client.connect(options);

        Wait.assertEquals(0, () -> this.server.getConnectionCount(), 2000L, 100L);
    }

    /**
     * A consumer announces a maximum packet size of 1500 bytes; a producer then publishes a
     * payload of twice that size to the subscribed topic. The test expects the message to be
     * added to the subscription queue once, never to reach the consumer's callback within
     * 2 seconds, and to be counted as acknowledged on the queue.
     */
    @Test(timeout = 300000)
    public void testMaxPacketSize() throws Exception {
        final int maxPacketSize = 1500;
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();

        // Twice the advertised limit so the PUBLISH is unambiguously too large. Java
        // zero-initialises arrays, so no explicit fill loop is required.
        final byte[] oversizedPayload = new byte[maxPacketSize * 2];

        final CountDownLatch messageArrived = new CountDownLatch(1);
        MqttClient consumer = createPahoClient(consumerId);
        MqttConnectionOptions consumerOptions = new MqttConnectionOptionsBuilder()
            .maximumPacketSize((long) maxPacketSize)
            .build();
        consumer.setCallback(new MQTT5TestSupport.LatchedMqttCallback(messageArrived));
        consumer.connect(consumerOptions);
        consumer.subscribe(topic, 2);

        MqttClient producer = createPahoClient(RandomUtil.randomString());
        producer.connect();
        producer.publish(topic, oversizedPayload, 2, false);
        producer.disconnect();
        producer.close();

        Wait.assertEquals(1L, () -> getSubscriptionQueue(topic).getMessagesAdded(), 2000L, 100L);

        // The consumer callback must stay silent: the message exceeds what it agreed to accept.
        assertFalse(messageArrived.await(2L, TimeUnit.SECONDS));

        Wait.assertEquals(1L, () -> getSubscriptionQueue(topic).getMessagesAcknowledged(), 2000L, 100L);

        consumer.disconnect();
        consumer.close();
    }

    /**
     * Runs the shared topic-alias scenario with the consumer explicitly connecting with a topic
     * alias maximum of 0.
     */
    @Test(timeout = 300000)
    public void testTopicAliasDisabledOnClient() throws Exception {
        final boolean explicitlyDisabled = true;
        testTopicAliasOnClient(explicitlyDisabled);
    }

    /**
     * Runs the shared topic-alias scenario with the consumer connecting without any connection
     * options, i.e. without specifying a topic alias maximum at all.
     */
    @Test(timeout = 300000)
    public void testTopicAliasAbsentOnClient() throws Exception {
        // false selects the "no options supplied" branch of the helper; passing true here
        // would merely repeat testTopicAliasDisabledOnClient.
        testTopicAliasOnClient(false);
    }

    /**
     * Shared scenario: a consumer subscribes to a wildcard below the test topic, a producer
     * publishes 25 messages to 25 distinct sub-topics, and every message delivered to the
     * consumer must carry no topic alias. All 25 messages must arrive within 2 seconds.
     *
     * @param aliasDisabled when {@code true} the consumer connects with a topic alias maximum of
     *                      0; when {@code false} it connects with {@code null} options
     */
    private void testTopicAliasOnClient(boolean aliasDisabled) throws Exception {
        final int messageCount = 25;
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();

        MqttConnectionOptions consumerOptions = null;
        if (aliasDisabled) {
            consumerOptions = new MqttConnectionOptionsBuilder().topicAliasMaximum(0).build();
        }
        MqttClient consumer = createPahoClient(consumerId);
        consumer.connect(consumerOptions);

        final CountDownLatch allArrived = new CountDownLatch(messageCount);
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String receivedTopic, MqttMessage received) throws Exception {
                Assert.assertNull(received.getProperties().getTopicAlias());
                allArrived.countDown();
            }
        });
        consumer.subscribe(topic + "/#", 2);

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        for (int index = 0; index < messageCount; index++) {
            byte[] body = ("foo" + index).getBytes(StandardCharsets.UTF_8);
            producer.publish(topic + "/" + index, body, 2, false);
        }
        producer.disconnect();
        producer.close();

        assertTrue(allArrived.await(2L, TimeUnit.SECONDS));
        consumer.disconnect();
    }

    /**
     * Publishes one QoS 2 message carrying ten random user properties and verifies, inside the
     * consumer callback, that the properties arrive with the same keys and values in the same
     * order. The message must be received within 2 seconds.
     */
    @Test(timeout = 300000)
    public void testUserPropertiesOrder() throws Exception {
        final int propertyCount = 10;
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();

        // Typed list instead of a raw ArrayList so no element casts are needed below.
        final List<UserProperty> sentProperties = new ArrayList<>();
        for (int i = 0; i < propertyCount; i++) {
            sentProperties.add(new UserProperty(RandomUtil.randomString(), RandomUtil.randomString()));
        }

        final CountDownLatch messageVerified = new CountDownLatch(1);
        MqttClient consumer = createPahoClient(consumerId);
        consumer.connect();
        consumer.setCallback(new MQTT5TestSupport.DefaultMqttCallback() {
            @Override
            public void messageArrived(String receivedTopic, MqttMessage received) throws Exception {
                List<UserProperty> receivedProperties = received.getProperties().getUserProperties();
                for (int i = 0; i < propertyCount; i++) {
                    Assert.assertEquals(sentProperties.get(i).getKey(), receivedProperties.get(i).getKey());
                    Assert.assertEquals(sentProperties.get(i).getValue(), receivedProperties.get(i).getValue());
                }
                // Only reached when every comparison above succeeded.
                messageVerified.countDown();
            }
        });
        consumer.subscribe(topic, 2);

        MqttClient producer = createPahoClient("producer");
        producer.connect();
        MqttMessage outgoing = new MqttMessage();
        outgoing.setQos(2);
        MqttProperties outgoingProperties = new MqttProperties();
        outgoingProperties.setUserProperties(sentProperties);
        outgoing.setProperties(outgoingProperties);
        producer.publish(topic, outgoing);
        producer.disconnect();
        producer.close();

        assertTrue(messageVerified.await(2L, TimeUnit.SECONDS));
        consumer.disconnect();
        consumer.close();
    }

    /**
     * Starting from no session states, connecting with a random client id must create exactly
     * one session state, registered under that id.
     */
    @Test(timeout = 300000)
    public void testClientID() throws Exception {
        final String clientId = RandomUtil.randomString();
        assertEquals(0L, getSessionStates().size());

        MqttClient client = createPahoClient(clientId);
        client.connect();

        assertEquals(1L, getSessionStates().size());
        assertNotNull(getSessionStates().get(clientId));
        client.disconnect();
    }

    /**
     * Connects with an empty client id (only when the protocol is "tcp") and checks that the
     * CONNACK reports no existing session, carries an assigned client identifier, and that the
     * single resulting session state is registered under that assigned identifier.
     */
    @Test(timeout = 300000)
    public void testEmptyClientID() throws Exception {
        Assume.assumeTrue(this.protocol.equals("tcp"));
        assertEquals(0L, getSessionStates().size());

        MqttClient client = createPahoClient("");
        IMqttToken connectResult = client.connectWithResult((MqttConnectionOptions) null);
        assertFalse(connectResult.getSessionPresent());

        String brokerAssignedId = connectResult.getResponseProperties().getAssignedClientIdentifier();
        assertNotNull(brokerAssignedId);
        assertEquals(1L, getSessionStates().size());
        assertNotNull(getSessionStates().get(brokerAssignedId));

        client.disconnect();
    }

    /**
     * Connecting with an empty client id and clean start disabled (only when the protocol is
     * "tcp") must be rejected with an {@link MqttException}; the client stays disconnected and
     * no session state is created.
     */
    @Test(timeout = 300000)
    public void testEmptyClientIDWithoutCleanStart() throws Exception {
        Assume.assumeTrue(this.protocol.equals("tcp"));
        assertEquals(0L, getSessionStates().size());

        MqttClient client = createPahoClient("");
        MqttConnectionOptions options = new MqttConnectionOptionsBuilder().cleanStart(false).build();
        try {
            client.connect(options);
            fail("Should throw exception about invalid client identifier");
        } catch (MqttException rejected) {
            // -123 is 0x85 read as a signed byte (MQTT 5 "Client Identifier not valid").
            assertEquals(-123L, (byte) rejected.getReasonCode());
        }

        assertFalse(client.isConnected());
        assertEquals(0L, getSessionStates().size());
    }

    /**
     * Connects with the user name "bad" and password "bad", then disconnects.
     *
     * NOTE(review): despite its name this test asserts nothing about a failure; it only fails
     * if connect or disconnect throws. Confirm whether a rejection was meant to be asserted.
     */
    @Test(timeout = 300000)
    public void testAuthenticationFailure() throws Exception {
        final String clientId = RandomUtil.randomString();
        MqttClient client = createPahoClient(clientId);

        MqttConnectionOptions badCredentials = new MqttConnectionOptionsBuilder()
            .username("bad")
            .password("bad".getBytes(StandardCharsets.UTF_8))
            .build();
        client.connect(badCredentials);
        client.disconnect();
    }

    /**
     * A second MQTT 5 client connecting with the same client id must cause the first client to
     * be disconnected within 500 ms with return code 0x8E, leaving exactly one session state
     * for that client id.
     */
    @Test(timeout = 300000)
    public void testConnectionStealing() throws Exception {
        final String sharedClientId = RandomUtil.randomString();

        MqttClient original = createPahoClient(sharedClientId);
        original.connect();

        // Single-element array so the anonymous callback can hand the code back to the test.
        final int[] disconnectCode = new int[1];
        final CountDownLatch disconnected = new CountDownLatch(1);
        original.setCallback(new MQTT5TestSupport.LatchedMqttCallback(disconnected) {
            @Override
            public void disconnected(MqttDisconnectResponse response) {
                disconnectCode[0] = response.getReturnCode();
                disconnected.countDown();
            }

            @Override
            public void mqttErrorOccurred(MqttException error) {
                error.printStackTrace();
            }
        });

        MqttClient thief = createPahoClient(sharedClientId);
        thief.connect();

        assertTrue(disconnected.await(500L, TimeUnit.MILLISECONDS));
        // -114 is 0x8E read as a signed byte (MQTT 5 "Session taken over").
        assertEquals(-114L, (byte) disconnectCode[0]);
        assertEquals(1L, getSessionStates().size());
        assertNotNull(getSessionStates().get(sharedClientId));
        assertFalse(original.isConnected());

        thief.disconnect();
    }

    /**
     * Same as the MQTT 5 connection-stealing scenario, except that the second connection with
     * the shared client id is made by a Paho MQTT 3.1.1 client. The MQTT 5 client must be
     * disconnected within 500 ms with return code 0x8E.
     */
    @Test(timeout = 300000)
    public void testConnectionStealingBy3_1_1() throws Exception {
        final String sharedClientId = RandomUtil.randomString();

        MqttClient original = createPahoClient(sharedClientId);
        original.connect();

        // Single-element array so the anonymous callback can hand the code back to the test.
        final int[] disconnectCode = new int[1];
        final CountDownLatch disconnected = new CountDownLatch(1);
        original.setCallback(new MQTT5TestSupport.LatchedMqttCallback(disconnected) {
            @Override
            public void disconnected(MqttDisconnectResponse response) {
                disconnectCode[0] = response.getReturnCode();
                disconnected.countDown();
            }

            @Override
            public void mqttErrorOccurred(MqttException error) {
                error.printStackTrace();
            }
        });

        org.eclipse.paho.client.mqttv3.MqttClient legacyThief = createPaho3_1_1Client(sharedClientId);
        legacyThief.connect();

        assertTrue(disconnected.await(500L, TimeUnit.MILLISECONDS));
        // -114 is 0x8E read as a signed byte (MQTT 5 "Session taken over").
        assertEquals(-114L, (byte) disconnectCode[0]);
        assertEquals(1L, getSessionStates().size());
        assertNotNull(getSessionStates().get(sharedClientId));
        assertFalse(original.isConnected());

        legacyThief.disconnect();
    }

    /**
     * With clean start disabled and the maximum session expiry interval, the first CONNACK must
     * report no existing session and the CONNACK of a subsequent reconnect must report that a
     * session is present. Both responses must contain reason code 0.
     */
    @Test(timeout = 300000)
    public void testConnackWhenCleanStartFalse() throws Exception {
        final String clientId = RandomUtil.randomString();
        assertEquals(0L, getSessionStates().size());

        MqttClient client = createPahoClient(clientId);
        MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
            .cleanStart(false)
            .sessionExpiryInterval(Long.valueOf(MQTTUtil.FOUR_BYTE_INT_MAX))
            .build();

        // First connection: nothing to resume yet.
        IMqttToken firstConnect = client.connectWithResult(options);
        assertFalse(firstConnect.getSessionPresent());
        assertTrue(getListOfCodes(firstConnect.getResponse().getReasonCodes()).contains(0));
        client.disconnect();

        assertEquals(1L, getSessionStates().size());
        assertNotNull(getSessionStates().get(clientId));

        // Second connection with the same options: the stored session must be picked up.
        IMqttToken secondConnect = client.connectWithResult(options);
        assertTrue(secondConnect.getSessionPresent());
        assertTrue(getListOfCodes(secondConnect.getResponse().getReasonCodes()).contains(0));
    }

    /**
     * With clean start enabled the CONNACK must report no existing session and contain reason
     * code 0.
     */
    @Test(timeout = 300000)
    public void testConnackWhenCleanStartTrue() throws Exception {
        final String clientId = RandomUtil.randomString();
        assertEquals(0L, getSessionStates().size());

        MqttClient client = createPahoClient(clientId);
        MqttConnectionOptions options = new MqttConnectionOptionsBuilder().cleanStart(true).build();
        IMqttToken connectResult = client.connectWithResult(options);

        assertFalse(connectResult.getSessionPresent());
        assertTrue(getListOfCodes(connectResult.getResponse().getReasonCodes()).contains(0));
        client.disconnect();
    }

    /**
     * Boxes an array of reason codes into a list so callers can use {@link List#contains}.
     *
     * @param iArr reason codes to convert; element order is preserved
     * @return a list holding the boxed elements of {@code iArr}
     */
    private List<Integer> getListOfCodes(int[] iArr) {
        // No raw (List) cast: the collector already yields List<Integer>.
        return IntStream.of(iArr).boxed().collect(Collectors.toList());
    }

    /**
     * Connecting with clean start disabled, starting from no session states, must create exactly
     * one session state registered under the client id.
     */
    @Test(timeout = 300000)
    public void testCleanStartFalse() throws Exception {
        final String clientId = RandomUtil.randomString();
        assertEquals(0L, getSessionStates().size());

        MqttClient client = createPahoClient(clientId);
        MqttConnectionOptions options = new MqttConnectionOptionsBuilder().cleanStart(false).build();
        client.connect(options);

        assertEquals(1L, getSessionStates().size());
        assertNotNull(getSessionStates().get(clientId));
        client.disconnect();
    }

    /**
     * A consumer connects with clean start disabled and a 300 second session expiry interval,
     * subscribes, and disconnects. A producer then publishes one QoS 2 message. When the consumer
     * reconnects with the same options it must receive a message, and its session state must
     * still exist at each checkpoint.
     */
    @Test(timeout = 300000)
    public void testCleanStartFalseWithReconnect() throws Exception {
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();
        final CountDownLatch messageArrived = new CountDownLatch(1);

        MqttClient producer = createPahoClient(RandomUtil.randomString());
        MqttClient consumer = createPahoClient(consumerId);
        MqttConnectionOptions consumerOptions = new MqttConnectionOptionsBuilder()
            .cleanStart(false)
            .sessionExpiryInterval(300L)
            .build();

        // Establish the session and subscription, then go offline.
        consumer.connect(consumerOptions);
        consumer.subscribe(topic, 2);
        consumer.disconnect();
        assertEquals(1L, getSessionStates().size());
        assertNotNull(getSessionStates().get(consumerId));

        // Publish while the consumer is away; explicit charset instead of the platform default.
        producer.connect();
        producer.publish(topic, "hello".getBytes(StandardCharsets.UTF_8), 2, false);
        producer.disconnect();
        producer.close();
        assertEquals(1L, getSessionStates().size());
        assertNotNull(getSessionStates().get(consumerId));

        // Reconnect with the callback installed first so the delivery is not missed.
        consumer.setCallback(new MQTT5TestSupport.LatchedMqttCallback(messageArrived));
        consumer.connect(consumerOptions);
        waitForLatch(messageArrived);
        consumer.disconnect();
        consumer.close();
        assertEquals(1L, getSessionStates().size());
        assertNotNull(getSessionStates().get(consumerId));
    }

    /**
     * With clean start disabled but a session expiry interval of 0, no session state may remain
     * once the client has disconnected.
     */
    @Test(timeout = 300000)
    public void testCleanStartFalseWith0SessionExpiryInterval() throws Exception {
        final String clientId = RandomUtil.randomString();
        final String topic = getTopicName();

        MqttClient client = createPahoClient(clientId);
        MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
            .cleanStart(false)
            .sessionExpiryInterval(0L)
            .build();
        client.connect(options);
        client.subscribe(topic, 2);
        client.disconnect();

        assertEquals(0L, getSessionStates().size());
        assertNull(getSessionStates().get(clientId));
    }

    /**
     * With clean start disabled and a 2 second session expiry interval, the session state must
     * disappear within 4 seconds of the disconnect, but only after more than 2000 ms have
     * elapsed.
     */
    @Test(timeout = 300000)
    public void testCleanStartFalseWithNon0SessionExpiryInterval() throws Exception {
        final String clientId = RandomUtil.randomString();
        final String topic = getTopicName();

        MqttClient client = createPahoClient(clientId);
        MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
            .cleanStart(false)
            .sessionExpiryInterval(2L)
            .build();
        client.connect(options);
        client.subscribe(topic, 2);
        client.disconnect();

        // Time how long the broker keeps the session after the disconnect.
        final long disconnectedAt = System.currentTimeMillis();
        Wait.assertEquals(0, () -> getSessionStates().size(), 4000L, 100L);
        final long elapsedMillis = System.currentTimeMillis() - disconnectedAt;
        assertTrue(elapsedMillis > 2000);
        assertNull(getSessionStates().get(clientId));
    }

    /**
     * With clean start disabled and no session expiry interval specified, no session state may
     * remain once the client has disconnected.
     */
    @Test(timeout = 300000)
    public void testCleanStartFalseWithAbsentSessionExpiryInterval() throws Exception {
        final String clientId = RandomUtil.randomString();
        final String topic = getTopicName();

        MqttClient client = createPahoClient(clientId);
        MqttConnectionOptions options = new MqttConnectionOptionsBuilder().cleanStart(false).build();
        client.connect(options);
        client.subscribe(topic, 2);
        client.disconnect();

        assertEquals(0L, getSessionStates().size());
        assertNull(getSessionStates().get(clientId));
    }

    /**
     * With clean start disabled and the maximum session expiry interval, the session state must
     * still be present 4 seconds after the client has disconnected.
     */
    @Test(timeout = 300000)
    public void testCleanStartFalseWithMaxSessionExpiryInterval() throws Exception {
        final String clientId = RandomUtil.randomString();
        final String topic = getTopicName();

        MqttClient client = createPahoClient(clientId);
        MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
            .cleanStart(false)
            .sessionExpiryInterval(Long.valueOf(MQTTUtil.FOUR_BYTE_INT_MAX))
            .build();
        client.connect(options);
        client.subscribe(topic, 2);
        client.disconnect();

        // The wait is expected to time out: the session must never vanish in this window.
        assertFalse(Wait.waitFor(() -> getSessionStates().size() == 0, 4000L, 100L));
        assertNotNull(getSessionStates().get(clientId));
    }

    /**
     * A consumer first creates a long-lived session (clean start disabled, maximum session
     * expiry interval) with a subscription and disconnects; a producer then publishes one QoS 2
     * message. When the consumer reconnects with clean start enabled it must not receive any
     * message within 3 seconds.
     */
    @Test(timeout = 300000)
    public void testCleanStartTrue() throws Exception {
        final String consumerId = RandomUtil.randomString();
        final String topic = getTopicName();
        final CountDownLatch messageArrived = new CountDownLatch(1);

        MqttClient producer = createPahoClient(RandomUtil.randomString());
        MqttClient consumer = createPahoClient(consumerId);
        MqttConnectionOptions consumerOptions = new MqttConnectionOptionsBuilder()
            .cleanStart(false)
            .sessionExpiryInterval(Long.valueOf(MQTTUtil.FOUR_BYTE_INT_MAX))
            .build();

        // Establish the session and subscription, then go offline.
        consumer.connect(consumerOptions);
        consumer.subscribe(topic, 2);
        consumer.disconnect();
        assertEquals(1L, getSessionStates().size());
        assertNotNull(getSessionStates().get(consumerId));

        // Publish while the consumer is away; explicit charset instead of the platform default.
        producer.connect();
        producer.publish(topic, "hello".getBytes(StandardCharsets.UTF_8), 2, false);
        producer.disconnect();
        producer.close();

        // Same options object, now requesting a clean start for the reconnect.
        consumerOptions.setCleanStart(true);
        consumer.setCallback(new MQTT5TestSupport.LatchedMqttCallback(messageArrived));
        consumer.connect(consumerOptions);
        assertFalse(messageArrived.await(3L, TimeUnit.SECONDS));

        consumer.disconnect();
        consumer.close();
    }
}
