package org.apache.activemq.artemis.tests.integration.mqtt5;

import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
import jakarta.jms.Message;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImplAccessor;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeTestAccessor;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.Wait;
import org.eclipse.paho.mqttv5.client.MqttActionListener;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.class */
public class MQTT5Test extends MQTT5TestSupport {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    @Timeout(60)
    @Test
    public void testSimpleSendReceive() throws Exception {
        String randomString = RandomUtil.randomString();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        MqttClient createPahoClient = createPahoClient("subscriber");
        createPahoClient.connect();
        createPahoClient.setCallback(new MQTT5TestSupport.DefaultMqttCallback() { // from class: org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5Test.1
            @Override // org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport.DefaultMqttCallback
            public void messageArrived(String str, MqttMessage mqttMessage) {
                MQTT5Test.logger.info("Message received from topic {}, message={}", str, mqttMessage);
                countDownLatch.countDown();
            }
        });
        createPahoClient.subscribe(randomString, 1);
        MqttClient createPahoClient2 = createPahoClient("producer");
        createPahoClient2.connect();
        createPahoClient2.publish(randomString, "myMessage".getBytes(StandardCharsets.UTF_8), 1, false);
        Assertions.assertTrue(countDownLatch.await(500L, TimeUnit.MILLISECONDS));
    }

    @Timeout(60)
    @Test
    public void testTopicNameEscape() throws Exception {
        final AtomicReference atomicReference = new AtomicReference();
        MqttClient createPahoClient = createPahoClient("subscriber");
        createPahoClient.connect();
        createPahoClient.setCallback(new MQTT5TestSupport.DefaultMqttCallback() { // from class: org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5Test.2
            @Override // org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport.DefaultMqttCallback
            public void messageArrived(String str, MqttMessage mqttMessage) {
                atomicReference.set(str);
            }
        });
        createPahoClient.subscribe("foo1.0/bar/baz", 1);
        MqttClient createPahoClient2 = createPahoClient("producer");
        createPahoClient2.connect();
        createPahoClient2.publish("foo1.0/bar/baz", "myMessage".getBytes(StandardCharsets.UTF_8), 1, false);
        Objects.requireNonNull(atomicReference);
        Wait.assertEquals("foo1.0/bar/baz", atomicReference::get, 500L, 50L);
    }

    @Timeout(60)
    @Test
    public void testTimestamp() throws Exception {
        String randomString = RandomUtil.randomString();
        createJMSConnection();
        JMSContext createContext = this.cf.createContext();
        JMSConsumer createConsumer = createContext.createConsumer(createContext.createQueue(randomString));
        long currentTimeMillis = System.currentTimeMillis();
        MqttClient createPahoClient = createPahoClient(RandomUtil.randomString());
        createPahoClient.connect();
        createPahoClient.publish(randomString, new byte[0], 1, false);
        createPahoClient.disconnect();
        createPahoClient.close();
        Message receive = createConsumer.receive(200L);
        Assertions.assertNotNull(receive);
        Assertions.assertTrue(receive.getJMSTimestamp() > currentTimeMillis);
        createContext.close();
    }

    @Timeout(60)
    @Test
    public void testResumeSubscriptionsAfterRestart() throws Exception {
        final ArrayList arrayList = new ArrayList(100);
        for (int i = 0; i < 100; i++) {
            arrayList.add(getName() + i);
        }
        final CountDownLatch countDownLatch = new CountDownLatch(100);
        MqttClient createPahoClient = createPahoClient("myConsumerID");
        createPahoClient.connect(new MqttConnectionOptionsBuilder().cleanStart(false).sessionExpiryInterval(999L).build());
        ArrayList arrayList2 = new ArrayList(100);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            arrayList2.add(new MqttSubscription((String) it.next(), 1));
        }
        createPahoClient.subscribe((MqttSubscription[]) arrayList2.toArray(new MqttSubscription[0]));
        createPahoClient.disconnect();
        MqttClient createPahoClient2 = createPahoClient("myProducerID");
        createPahoClient2.connect(new MqttConnectionOptionsBuilder().sessionExpiryInterval(0L).build());
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            createPahoClient2.publish((String) it2.next(), new byte[0], 1, false);
        }
        createPahoClient2.disconnect();
        createPahoClient2.close();
        Wait.assertEquals(1L, () -> {
            return this.server.locateQueue("$sys.mqtt.sessions").getMessageCount();
        }, 2000L, 100L);
        this.server.stop();
        this.server.start();
        Wait.assertEquals(1L, () -> {
            return this.server.locateQueue("$sys.mqtt.sessions").getMessageCount();
        }, 2000L, 100L);
        Wait.assertTrue(() -> {
            return getSessionStates().get("myConsumerID") != null;
        }, 2000L, 100L);
        createPahoClient.setCallback(new MQTT5TestSupport.DefaultMqttCallback() { // from class: org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5Test.3
            @Override // org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport.DefaultMqttCallback
            public void messageArrived(String str, MqttMessage mqttMessage) {
                if (arrayList.remove(str)) {
                    countDownLatch.countDown();
                }
            }
        });
        createPahoClient.connect(new MqttConnectionOptionsBuilder().cleanStart(false).sessionExpiryInterval(0L).build());
        Assertions.assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS));
        createPahoClient.unsubscribe((String[]) arrayList.toArray(new String[0]));
        createPahoClient.disconnect();
        createPahoClient.close();
        Wait.assertEquals(0L, () -> {
            return this.server.locateQueue("$sys.mqtt.sessions").getMessageCount();
        }, 5000L, 100L);
    }

    @Timeout(60)
    @Test
    public void testAddressAutoCreation() throws Exception {
        String randomString = RandomUtil.randomString();
        this.server.getAddressSettingsRepository().addMatch(randomString, new AddressSettings().setAutoCreateAddresses(true));
        MqttClient createPahoClient = createPahoClient(RandomUtil.randomString());
        createPahoClient.connect();
        createPahoClient.publish(randomString, new byte[0], 0, false);
        createPahoClient.disconnect();
        createPahoClient.close();
        Wait.assertTrue(() -> {
            return this.server.getAddressInfo(SimpleString.of(randomString)) != null;
        }, 2000L, 100L);
    }

    @Timeout(60)
    @Test
    public void testAddressAutoCreationNegative() throws Exception {
        String randomString = RandomUtil.randomString();
        this.server.getAddressSettingsRepository().addMatch(randomString, new AddressSettings().setAutoCreateAddresses(false));
        MqttClient createPahoClient = createPahoClient(RandomUtil.randomString());
        createPahoClient.connect();
        createPahoClient.publish(randomString, new byte[0], 0, false);
        createPahoClient.disconnect();
        createPahoClient.close();
        Assertions.assertTrue(this.server.getAddressInfo(SimpleString.of(randomString)) == null);
    }

    @Timeout(60)
    @Test
    public void testWillMessageProperties() throws Exception {
        byte[] randomBytes = RandomUtil.randomBytes();
        final String[][] strArr = new String[10][2];
        for (String[] strArr2 : strArr) {
            strArr2[0] = RandomUtil.randomString();
            strArr2[1] = RandomUtil.randomString();
        }
        MqttClient createPahoClient = createPahoClient("willConsumer");
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        createPahoClient.setCallback(new MQTT5TestSupport.DefaultMqttCallback() { // from class: org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5Test.4
            @Override // org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport.DefaultMqttCallback
            public void messageArrived(String str, MqttMessage mqttMessage) {
                int i = 0;
                for (UserProperty userProperty : mqttMessage.getProperties().getUserProperties()) {
                    Assertions.assertEquals(strArr[i][0], userProperty.getKey());
                    Assertions.assertEquals(strArr[i][1], userProperty.getValue());
                    i++;
                }
                countDownLatch.countDown();
            }
        });
        createPahoClient.connect();
        createPahoClient.subscribe("/topic/foo", 1);
        MqttClient createPahoClient2 = createPahoClient("willGenerator");
        MqttProperties mqttProperties = new MqttProperties();
        ArrayList arrayList = new ArrayList();
        for (String[] strArr3 : strArr) {
            arrayList.add(new UserProperty(strArr3[0], strArr3[1]));
        }
        mqttProperties.setUserProperties(arrayList);
        MqttConnectionOptions build = new MqttConnectionOptionsBuilder().will("/topic/foo", new MqttMessage(randomBytes)).build();
        build.setWillMessageProperties(mqttProperties);
        createPahoClient2.connect(build);
        createPahoClient2.disconnectForcibly(0L, 0L, false);
        Assertions.assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS));
    }

    @Timeout(60)
    @Test
    public void testExpiryDelayOnDisconnect() throws Exception {
        MqttAsyncClient createAsyncPahoClient = createAsyncPahoClient(RandomUtil.randomString());
        createAsyncPahoClient.connect(new MqttConnectionOptionsBuilder().sessionExpiryInterval(300L).build()).waitForCompletion();
        MqttProperties mqttProperties = new MqttProperties();
        mqttProperties.setSessionExpiryInterval(0L);
        createAsyncPahoClient.disconnect(0L, (Object) null, (MqttActionListener) null, 0, mqttProperties).waitForCompletion();
        Wait.assertEquals(0, () -> {
            return getSessionStates().size();
        }, 5000L, 10L);
    }

    @Timeout(60)
    @Test
    public void testWillFlagFalseWithSessionExpiryDelay() throws Exception {
        this.server.createQueue(QueueConfiguration.of("activemq.notifications"));
        this.server.createQueue(QueueConfiguration.of("DLA"));
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setSendToDLAOnNoRoute(true).setDeadLetterAddress(SimpleString.of("DLA")));
        MqttClient createPahoClient = createPahoClient("willGenerator");
        createPahoClient.connect(new MqttConnectionOptionsBuilder().sessionExpiryInterval(1L).build());
        createPahoClient.disconnectForcibly(0L, 0L, false);
        scanSessions();
        Assertions.assertEquals(0L, this.server.locateQueue("DLA").getMessageCount());
    }

    @Timeout(60)
    @Test
    public void testQueueCleanOnRestart() throws Exception {
        String randomString = RandomUtil.randomString();
        String randomString2 = RandomUtil.randomString();
        MqttClient createPahoClient = createPahoClient(randomString2);
        createPahoClient.connect(new MqttConnectionOptionsBuilder().sessionExpiryInterval(999L).cleanStart(true).build());
        createPahoClient.subscribe(randomString, 1);
        this.server.stop();
        this.server.start();
        org.apache.activemq.artemis.tests.util.Wait.assertTrue(() -> {
            return getSubscriptionQueue(randomString, randomString2) != null;
        }, 3000L, 10L);
    }

    @Timeout(60)
    @Test
    public void testRecursiveWill() throws Exception {
        AssertionLoggerHandler assertionLoggerHandler = new AssertionLoggerHandler();
        try {
            this.server.createQueue(QueueConfiguration.of("will").setRoutingType(RoutingType.ANYCAST));
            PagingManagerImplAccessor.setDiskFull(this.server.getPagingManager(), true);
            MqttClient createPahoClient = createPahoClient("willGenerator");
            createPahoClient.connect(new MqttConnectionOptionsBuilder().will("will", new MqttMessage(RandomUtil.randomBytes())).build());
            createPahoClient.disconnectForcibly(0L, 0L, false);
            Wait.assertTrue(() -> {
                return assertionLoggerHandler.findText(new String[]{"AMQ229119"});
            }, 2000L, 100L);
            assertionLoggerHandler.close();
        } catch (Throwable th) {
            try {
                assertionLoggerHandler.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(60)
    @Test
    public void testSharedSubscriptionsWithSameName() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        MqttClient createPahoClient = createPahoClient("consumer1");
        createPahoClient.connect();
        createPahoClient.setCallback(new MQTT5TestSupport.LatchedMqttCallback(countDownLatch));
        createPahoClient.subscribe("$share/mySub/myTopic1", 1);
        Assertions.assertNotNull(this.server.getAddressInfo(SimpleString.of("myTopic1")));
        Queue sharedSubscriptionQueue = getSharedSubscriptionQueue("$share/mySub/myTopic1");
        Assertions.assertNotNull(sharedSubscriptionQueue);
        Assertions.assertEquals("myTopic1", sharedSubscriptionQueue.getAddress().toString());
        Assertions.assertEquals(1, sharedSubscriptionQueue.getConsumerCount());
        MqttClient createPahoClient2 = createPahoClient("consumer2");
        createPahoClient2.connect();
        createPahoClient2.setCallback(new MQTT5TestSupport.LatchedMqttCallback(countDownLatch2));
        createPahoClient2.subscribe("$share/mySub/myTopic2", 1);
        Assertions.assertNotNull(this.server.getAddressInfo(SimpleString.of("myTopic2")));
        Queue sharedSubscriptionQueue2 = getSharedSubscriptionQueue("$share/mySub/myTopic2");
        Assertions.assertNotNull(sharedSubscriptionQueue2);
        Assertions.assertEquals("myTopic2", sharedSubscriptionQueue2.getAddress().toString());
        Assertions.assertEquals(1, sharedSubscriptionQueue2.getConsumerCount());
        MqttClient createPahoClient3 = createPahoClient("producer");
        createPahoClient3.connect();
        createPahoClient3.publish("myTopic1", new byte[0], 1, false);
        createPahoClient3.publish("myTopic2", new byte[0], 1, false);
        createPahoClient3.disconnect();
        createPahoClient3.close();
        Assertions.assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS));
        Assertions.assertTrue(countDownLatch2.await(2L, TimeUnit.SECONDS));
        createPahoClient.unsubscribe("$share/mySub/myTopic1");
        Assertions.assertNull(getSharedSubscriptionQueue("$share/mySub/myTopic1"));
        createPahoClient2.unsubscribe("$share/mySub/myTopic2");
        Assertions.assertNull(getSharedSubscriptionQueue("$share/mySub/myTopic2"));
        createPahoClient.disconnect();
        createPahoClient.close();
        createPahoClient2.disconnect();
        createPahoClient2.close();
    }

    @Timeout(60)
    @Test
    public void testSharedSubscriptionsWithSameName2() throws Exception {
        String[] strArr = {"$share/mySub/myTopic1", "$share/mySub/myTopic2"};
        CountDownLatch countDownLatch = new CountDownLatch(2);
        MqttClient createPahoClient = createPahoClient("consumer1");
        createPahoClient.connect();
        createPahoClient.setCallback(new MQTT5TestSupport.LatchedMqttCallback(countDownLatch));
        createPahoClient.subscribe(strArr, new int[]{1, 1});
        Assertions.assertNotNull(this.server.getAddressInfo(SimpleString.of("myTopic1")));
        Queue sharedSubscriptionQueue = getSharedSubscriptionQueue(strArr[0]);
        Assertions.assertNotNull(sharedSubscriptionQueue);
        Assertions.assertEquals("myTopic1", sharedSubscriptionQueue.getAddress().toString());
        Assertions.assertEquals(1, sharedSubscriptionQueue.getConsumerCount());
        Assertions.assertNotNull(this.server.getAddressInfo(SimpleString.of("myTopic2")));
        Queue sharedSubscriptionQueue2 = getSharedSubscriptionQueue(strArr[1]);
        Assertions.assertNotNull(sharedSubscriptionQueue2);
        Assertions.assertEquals("myTopic2", sharedSubscriptionQueue2.getAddress().toString());
        Assertions.assertEquals(1, sharedSubscriptionQueue2.getConsumerCount());
        MqttClient createPahoClient2 = createPahoClient("producer");
        createPahoClient2.connect();
        createPahoClient2.publish("myTopic1", new byte[0], 1, false);
        createPahoClient2.publish("myTopic2", new byte[0], 1, false);
        createPahoClient2.disconnect();
        createPahoClient2.close();
        Assertions.assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS));
        createPahoClient.unsubscribe(strArr);
        Assertions.assertNull(getSharedSubscriptionQueue(strArr[0]));
        Assertions.assertNull(getSharedSubscriptionQueue(strArr[1]));
        createPahoClient.disconnect();
        createPahoClient.close();
    }

    @Timeout(60)
    @Test
    public void testSharedSubscriptionQueueRemoval() throws Exception {
        final ReusableLatch reusableLatch = new ReusableLatch(1);
        MQTT5TestSupport.DefaultMqttCallback defaultMqttCallback = new MQTT5TestSupport.DefaultMqttCallback() { // from class: org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5Test.5
            @Override // org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport.DefaultMqttCallback
            public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
                reusableLatch.countDown();
            }
        };
        MqttClient createPahoClient = createPahoClient("consumer1");
        createPahoClient.connect();
        createPahoClient.setCallback(defaultMqttCallback);
        createPahoClient.subscribe("$share/myShare/myTopic", 1);
        MqttClient createPahoClient2 = createPahoClient("consumer2");
        createPahoClient2.connect();
        createPahoClient2.setCallback(defaultMqttCallback);
        createPahoClient2.subscribe("$share/myShare/myTopic", 1);
        Queue locateQueue = this.server.locateQueue("myShare".concat(".").concat("myTopic"));
        Assertions.assertNotNull(locateQueue);
        Assertions.assertEquals("myTopic", locateQueue.getAddress().toString());
        Assertions.assertEquals(2, locateQueue.getConsumerCount());
        MqttClient createPahoClient3 = createPahoClient("producer");
        createPahoClient3.connect();
        createPahoClient3.publish("myTopic", new byte[0], 1, false);
        Assertions.assertTrue(reusableLatch.await(2L, TimeUnit.SECONDS));
        createPahoClient.disconnect();
        Queue locateQueue2 = this.server.locateQueue("myShare".concat(".").concat("myTopic"));
        Assertions.assertNotNull(locateQueue2);
        Assertions.assertEquals("myTopic", locateQueue2.getAddress().toString());
        Assertions.assertEquals(1, locateQueue2.getConsumerCount());
        reusableLatch.countUp();
        createPahoClient3.publish("myTopic", new byte[0], 1, false);
        Assertions.assertTrue(reusableLatch.await(2L, TimeUnit.SECONDS));
        createPahoClient.connect();
        createPahoClient.setCallback(defaultMqttCallback);
        createPahoClient.subscribe("$share/myShare/myTopic", 1);
        reusableLatch.countUp();
        createPahoClient3.publish("myTopic", new byte[0], 1, false);
        Assertions.assertTrue(reusableLatch.await(2L, TimeUnit.SECONDS));
        createPahoClient3.disconnect();
        createPahoClient3.close();
        createPahoClient.disconnect();
        createPahoClient.close();
        createPahoClient2.disconnect();
        createPahoClient2.close();
        Wait.assertTrue(() -> {
            return this.server.locateQueue("myShare".concat(".").concat("myTopic")) == null;
        }, 2000L, 100L);
    }

    @Timeout(60)
    @Test
    public void testAutoDeleteAddressWithWildcardSubscription() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("topic" + ".#", new AddressSettings().setAutoDeleteAddresses(true).setAutoDeleteAddressesSkipUsageCheck(true));
        CountDownLatch countDownLatch = new CountDownLatch(100);
        MqttClient createPahoClient = createPahoClient("consumer");
        createPahoClient.connect();
        createPahoClient.subscribe("topic" + "/#", 1);
        createPahoClient.setCallback(new MQTT5TestSupport.LatchedMqttCallback(countDownLatch));
        MqttClient createPahoClient2 = createPahoClient("producer");
        createPahoClient2.connect();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            String str = "topic" + "/" + RandomUtil.randomString();
            arrayList.add(str.replace('/', '.'));
            createPahoClient2.publish(str, new MqttMessage());
        }
        createPahoClient2.disconnect();
        createPahoClient2.close();
        Assertions.assertTrue(countDownLatch.await(2L, TimeUnit.SECONDS));
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Assertions.assertNotNull(this.server.getAddressInfo(SimpleString.of((String) it.next())));
        }
        PostOfficeTestAccessor.sweepAndReapAddresses(this.server.getPostOffice());
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            Assertions.assertNull(this.server.getAddressInfo(SimpleString.of((String) it2.next())));
        }
        createPahoClient.disconnect();
        createPahoClient.close();
    }

    @Timeout(60)
    @Test
    public void testConnectionStealingDisabled() throws Exception {
        setAcceptorProperty("allowLinkStealing=false");
        String randomString = RandomUtil.randomString();
        MqttClient createPahoClient = createPahoClient(randomString);
        createPahoClient.connect();
        try {
            createPahoClient(randomString).connect();
            Assertions.fail("Should have thrown an exception on connect due to disabled link stealing");
        } catch (Exception e) {
        }
        Wait.assertEquals(1, () -> {
            return getSessionStates().size();
        }, 2000L, 100L);
        Assertions.assertNotNull(getSessionStates().get(randomString));
        Assertions.assertTrue(createPahoClient.isConnected());
        createPahoClient.disconnect();
        createPahoClient.close();
    }

    @Timeout(60)
    @Test
    public void testConnectionStealingOnMultipleAcceptors() throws Exception {
        String randomString = RandomUtil.randomString();
        this.server.getRemotingService().createAcceptor(RandomUtil.randomString(), "tcp://localhost:" + 1884);
        this.server.getRemotingService().startAcceptors();
        MqttClient createPahoClient = createPahoClient(randomString);
        createPahoClient.connect();
        MqttClient createPahoClient2 = createPahoClient(randomString, 1884);
        createPahoClient2.connect();
        Wait.assertEquals(1, () -> {
            return getSessionStates().size();
        }, 2000L, 100L);
        Assertions.assertNotNull(getSessionStates().get(randomString));
        Assertions.assertFalse(createPahoClient.isConnected());
        createPahoClient.close();
        createPahoClient2.disconnect();
        createPahoClient2.close();
    }

    @Timeout(60)
    @Test
    public void testConnectionStealingDisabledOnMultipleAcceptors() throws Exception {
        String randomString = RandomUtil.randomString();
        this.server.getRemotingService().createAcceptor(RandomUtil.randomString(), "tcp://localhost:" + 1884 + "?allowLinkStealing=false");
        this.server.getRemotingService().startAcceptors();
        MqttClient createPahoClient = createPahoClient(randomString);
        createPahoClient.connect();
        try {
            createPahoClient(randomString, 1884).connect();
            Assertions.fail("Should have thrown an exception on connect due to disabled link stealing");
        } catch (Exception e) {
        }
        Wait.assertEquals(1, () -> {
            return getSessionStates().size();
        }, 2000L, 100L);
        Assertions.assertNotNull(getSessionStates().get(randomString));
        Assertions.assertTrue(createPahoClient.isConnected());
        createPahoClient.disconnect();
        createPahoClient.close();
    }

    @Timeout(60)
    @Test
    public void testQueueCleanedUpOnConsumerFail() throws Exception {
        String name = getName();
        String name2 = getName();
        this.server.getAddressSettingsRepository().addMatch(name, new AddressSettings().setDefaultMaxConsumers(0));
        MqttClient createPahoClient = createPahoClient(name2);
        createPahoClient.connect();
        try {
            createPahoClient.subscribe(name, 1);
        } catch (Exception e) {
        }
        Wait.assertTrue(() -> {
            return getSubscriptionQueue(name, name2) == null;
        }, 2000L, 100L);
        if (createPahoClient.isConnected()) {
            createPahoClient.disconnect();
        }
        createPahoClient.close();
    }

    @Timeout(60)
    @Test
    public void testSubscriptionQueueName() throws Exception {
        MqttClient createPahoClient = createPahoClient("myClientID");
        createPahoClient.connect();
        createPahoClient.subscribe("a/b", 1);
        Wait.assertTrue(() -> {
            return getSubscriptionQueue("a/b", "myClientID") != null;
        }, 2000L, 100L);
        createPahoClient.disconnect();
        createPahoClient.close();
    }

    @Timeout(60)
    @Test
    public void testSubscriptionQueueCreatedWhenAutoCreateDisabled() throws Exception {
        ((AddressSettings) this.server.getAddressSettingsRepository().getMatch("a/b")).setAutoCreateQueues(false);
        MqttClient createPahoClient = createPahoClient("myClientID");
        createPahoClient.connect();
        createPahoClient.subscribe("a/b", 1);
        Wait.assertTrue(() -> {
            return getSubscriptionQueue("a/b", "myClientID") != null;
        }, 2000L, 100L);
        createPahoClient.disconnect();
        createPahoClient.close();
    }
}
