package org.apache.activemq.artemis.tests.integration.mqtt;

import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.integration.management.SimpleManagementTest;
import org.apache.activemq.artemis.tests.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/mqtt/MqttClusterRemoteSubscribeTest.class */
public class MqttClusterRemoteSubscribeTest extends ClusterTestBase {
    @Override // org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase
    protected boolean isResolveProtocols() {
        return true;
    }

    public boolean isNetty() {
        return true;
    }

    @Test
    public void useSameClientIdAndAnycastSubscribeRemoteQueue() throws Exception {
        setupServers("anycast/test/1/some/la");
        startServers(0, 1);
        BlockingConnection blockingConnection = null;
        BlockingConnection blockingConnection2 = null;
        BlockingConnection blockingConnection3 = null;
        try {
            Thread.sleep(1000L);
            Topic[] topicArr = {new Topic("anycast/test/1/some/la", QoS.AT_MOST_ONCE)};
            BlockingConnection retrieveMQTTConnection = retrieveMQTTConnection(SimpleManagementTest.LOCALHOST, "subClientId");
            Map connectedClients = locateMQTTPM(this.servers[0]).getStateManager().getConnectedClients();
            Objects.requireNonNull(connectedClients);
            Wait.assertEquals(1, connectedClients::size);
            blockingConnection2 = retrieveMQTTConnection("tcp://localhost:61617", "subClientId");
            blockingConnection3 = retrieveMQTTConnection(SimpleManagementTest.LOCALHOST, "pubClientId");
            Assertions.assertTrue(waitConnectionClosed(retrieveMQTTConnection));
            Map connectedClients2 = locateMQTTPM(this.servers[1]).getStateManager().getConnectedClients();
            Objects.requireNonNull(connectedClients2);
            Wait.assertEquals(1, connectedClients2::size);
            blockingConnection = null;
            blockingConnection2.subscribe(topicArr);
            waitForBindings(0, "anycast/test/1/some/la", 1, 0, true);
            waitForBindings(1, "anycast/test/1/some/la", 1, 1, true);
            waitForBindings(0, "anycast/test/1/some/la", 1, 1, false);
            waitForBindings(1, "anycast/test/1/some/la", 1, 0, false);
            blockingConnection3.publish("anycast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection3.publish("anycast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection3.publish("anycast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            Message receive = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive.ack();
            Message receive2 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive2.ack();
            Message receive3 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive3.ack();
            Assertions.assertEquals("This is message 1", new String(receive.getPayload()));
            Assertions.assertEquals("This is message 2", new String(receive2.getPayload()));
            Assertions.assertEquals("This is message 3", new String(receive3.getPayload()));
            blockingConnection2.unsubscribe(new String[]{"anycast/test/1/some/la"});
            waitForBindings(0, "anycast/test/1/some/la", 1, 0, true);
            waitForBindings(1, "anycast/test/1/some/la", 1, 0, true);
            waitForBindings(0, "anycast/test/1/some/la", 1, 0, false);
            waitForBindings(1, "anycast/test/1/some/la", 1, 0, false);
            blockingConnection3.publish("anycast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection3.publish("anycast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection3.publish("anycast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            Assertions.assertNull(blockingConnection2.receive(100L, TimeUnit.MILLISECONDS));
            Assertions.assertNull(blockingConnection2.receive(100L, TimeUnit.MILLISECONDS));
            Assertions.assertNull(blockingConnection2.receive(100L, TimeUnit.MILLISECONDS));
            String[] strArr = {"anycast/test/1/some/la"};
            if (0 != 0 && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 != null && blockingConnection2.isConnected()) {
                blockingConnection2.unsubscribe(strArr);
                blockingConnection2.disconnect();
            }
            if (blockingConnection3 == null || !blockingConnection3.isConnected()) {
                return;
            }
            blockingConnection3.disconnect();
        } catch (Throwable th) {
            String[] strArr2 = {"anycast/test/1/some/la"};
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr2);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 != null && blockingConnection2.isConnected()) {
                blockingConnection2.unsubscribe(strArr2);
                blockingConnection2.disconnect();
            }
            if (blockingConnection3 != null && blockingConnection3.isConnected()) {
                blockingConnection3.disconnect();
            }
            throw th;
        }
    }

    @Test
    public void useDiffClientIdAndAnycastSubscribeRemoteQueue() throws Exception {
        setupServers("anycast/test/1/some/la");
        startServers(0, 1);
        BlockingConnection blockingConnection = null;
        BlockingConnection blockingConnection2 = null;
        try {
            Thread.sleep(1000L);
            Topic[] topicArr = {new Topic("anycast/test/1/some/la", QoS.AT_MOST_ONCE)};
            blockingConnection = retrieveMQTTConnection(SimpleManagementTest.LOCALHOST, "clientId1");
            blockingConnection2 = retrieveMQTTConnection("tcp://localhost:61617", "clientId2");
            blockingConnection.subscribe(topicArr);
            waitForBindings(0, "anycast/test/1/some/la", 1, 1, true);
            waitForBindings(1, "anycast/test/1/some/la", 1, 0, true);
            waitForBindings(0, "anycast/test/1/some/la", 1, 0, false);
            waitForBindings(1, "anycast/test/1/some/la", 1, 1, false);
            blockingConnection2.subscribe(topicArr);
            waitForBindings(0, "anycast/test/1/some/la", 1, 1, true);
            waitForBindings(1, "anycast/test/1/some/la", 1, 1, true);
            waitForBindings(0, "anycast/test/1/some/la", 1, 1, false);
            waitForBindings(1, "anycast/test/1/some/la", 1, 1, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            Message receive = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive.ack();
            Message receive2 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive2.ack();
            Message receive3 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive3.ack();
            Assertions.assertEquals("This is message 1", new String(receive.getPayload()));
            Assertions.assertEquals("This is message 2", new String(receive2.getPayload()));
            Assertions.assertEquals("This is message 3", new String(receive3.getPayload()));
            blockingConnection2.unsubscribe(new String[]{"anycast/test/1/some/la"});
            waitForBindings(0, "anycast/test/1/some/la", 1, 1, true);
            waitForBindings(1, "anycast/test/1/some/la", 1, 0, true);
            waitForBindings(0, "anycast/test/1/some/la", 1, 0, false);
            waitForBindings(1, "anycast/test/1/some/la", 1, 1, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            Message receive4 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive4.ack();
            Message receive5 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive5.ack();
            Message receive6 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive6.ack();
            String str = new String(receive4.getPayload());
            String str2 = new String(receive5.getPayload());
            String str3 = new String(receive6.getPayload());
            Assertions.assertTrue("This is message 1".equals(str) || "This is message 1".equals(str2) || "This is message 1".equals(str3));
            Assertions.assertTrue("This is message 2".equals(str) || "This is message 2".equals(str2) || "This is message 2".equals(str3));
            Assertions.assertTrue("This is message 3".equals(str) || "This is message 3".equals(str2) || "This is message 3".equals(str3));
            String[] strArr = {"anycast/test/1/some/la"};
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 == null || !blockingConnection2.isConnected()) {
                return;
            }
            blockingConnection2.unsubscribe(strArr);
            blockingConnection2.disconnect();
        } catch (Throwable th) {
            String[] strArr2 = {"anycast/test/1/some/la"};
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr2);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 != null && blockingConnection2.isConnected()) {
                blockingConnection2.unsubscribe(strArr2);
                blockingConnection2.disconnect();
            }
            throw th;
        }
    }

    @Test
    public void useSameClientIdAndMulticastSubscribeRemoteQueue() throws Exception {
        setupServers("anycast/test/1/some/la");
        startServers(0, 1);
        BlockingConnection blockingConnection = null;
        BlockingConnection blockingConnection2 = null;
        BlockingConnection blockingConnection3 = null;
        try {
            Thread.sleep(1000L);
            Topic[] topicArr = {new Topic("multicast/test/1/some/la", QoS.AT_MOST_ONCE)};
            BlockingConnection retrieveMQTTConnection = retrieveMQTTConnection(SimpleManagementTest.LOCALHOST, "subClientId");
            Map connectedClients = locateMQTTPM(this.servers[0]).getStateManager().getConnectedClients();
            Objects.requireNonNull(connectedClients);
            Wait.assertEquals(1, connectedClients::size);
            blockingConnection2 = retrieveMQTTConnection("tcp://localhost:61617", "subClientId");
            blockingConnection3 = retrieveMQTTConnection(SimpleManagementTest.LOCALHOST, "pubClientId");
            Assertions.assertTrue(waitConnectionClosed(retrieveMQTTConnection));
            Map connectedClients2 = locateMQTTPM(this.servers[1]).getStateManager().getConnectedClients();
            Objects.requireNonNull(connectedClients2);
            Wait.assertEquals(1, connectedClients2::size);
            blockingConnection = null;
            blockingConnection2.subscribe(topicArr);
            waitForBindings(0, "multicast/test/1/some/la", 0, 0, true);
            waitForBindings(1, "multicast/test/1/some/la", 1, 1, true);
            waitForBindings(0, "multicast/test/1/some/la", 1, 1, false);
            waitForBindings(1, "multicast/test/1/some/la", 0, 0, false);
            blockingConnection3.publish("multicast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection3.publish("multicast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection3.publish("multicast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            Message receive = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive.ack();
            Message receive2 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive2.ack();
            Message receive3 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive3.ack();
            Assertions.assertEquals("This is message 1", new String(receive.getPayload()));
            Assertions.assertEquals("This is message 2", new String(receive2.getPayload()));
            Assertions.assertEquals("This is message 3", new String(receive3.getPayload()));
            blockingConnection2.unsubscribe(new String[]{"multicast/test/1/some/la"});
            waitForBindings(0, "multicast/test/1/some/la", 0, 0, true);
            waitForBindings(1, "multicast/test/1/some/la", 0, 0, true);
            waitForBindings(0, "multicast/test/1/some/la", 0, 0, false);
            waitForBindings(1, "multicast/test/1/some/la", 0, 0, false);
            blockingConnection3.publish("multicast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection3.publish("multicast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection3.publish("multicast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            Assertions.assertNull(blockingConnection2.receive(100L, TimeUnit.MILLISECONDS));
            Assertions.assertNull(blockingConnection2.receive(100L, TimeUnit.MILLISECONDS));
            Assertions.assertNull(blockingConnection2.receive(100L, TimeUnit.MILLISECONDS));
            String[] strArr = {"multicast/test/1/some/la"};
            if (0 != 0 && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 != null && blockingConnection2.isConnected()) {
                blockingConnection2.unsubscribe(strArr);
                blockingConnection2.disconnect();
            }
            if (blockingConnection3 == null || !blockingConnection3.isConnected()) {
                return;
            }
            blockingConnection3.disconnect();
        } catch (Throwable th) {
            String[] strArr2 = {"multicast/test/1/some/la"};
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr2);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 != null && blockingConnection2.isConnected()) {
                blockingConnection2.unsubscribe(strArr2);
                blockingConnection2.disconnect();
            }
            if (blockingConnection3 != null && blockingConnection3.isConnected()) {
                blockingConnection3.disconnect();
            }
            throw th;
        }
    }

    @Test
    public void useDiffClientIdAndMulticastSubscribeRemoteQueue() throws Exception {
        setupServers("anycast/test/1/some/la");
        startServers(0, 1);
        BlockingConnection blockingConnection = null;
        BlockingConnection blockingConnection2 = null;
        try {
            Thread.sleep(1000L);
            Topic[] topicArr = {new Topic("multicast/test/1/some/la", QoS.AT_MOST_ONCE)};
            blockingConnection = retrieveMQTTConnection(SimpleManagementTest.LOCALHOST, "clientId1");
            blockingConnection2 = retrieveMQTTConnection("tcp://localhost:61617", "clientId2");
            blockingConnection.subscribe(topicArr);
            waitForBindings(0, "multicast/test/1/some/la", 1, 1, true);
            waitForBindings(1, "multicast/test/1/some/la", 0, 0, true);
            waitForBindings(0, "multicast/test/1/some/la", 0, 0, false);
            waitForBindings(1, "multicast/test/1/some/la", 1, 1, false);
            blockingConnection2.subscribe(topicArr);
            waitForBindings(0, "multicast/test/1/some/la", 1, 1, true);
            waitForBindings(1, "multicast/test/1/some/la", 1, 1, true);
            waitForBindings(0, "multicast/test/1/some/la", 1, 1, false);
            waitForBindings(1, "multicast/test/1/some/la", 1, 1, false);
            blockingConnection.publish("multicast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection.publish("multicast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection.publish("multicast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            Message receive = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive.ack();
            Message receive2 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive2.ack();
            Message receive3 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive3.ack();
            Assertions.assertEquals("This is message 1", new String(receive.getPayload()));
            Assertions.assertEquals("This is message 2", new String(receive2.getPayload()));
            Assertions.assertEquals("This is message 3", new String(receive3.getPayload()));
            Message receive4 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive4.ack();
            Message receive5 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive5.ack();
            Message receive6 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive6.ack();
            Assertions.assertEquals("This is message 1", new String(receive4.getPayload()));
            Assertions.assertEquals("This is message 2", new String(receive5.getPayload()));
            Assertions.assertEquals("This is message 3", new String(receive6.getPayload()));
            blockingConnection2.unsubscribe(new String[]{"multicast/test/1/some/la"});
            waitForBindings(0, "multicast/test/1/some/la", 1, 1, true);
            waitForBindings(1, "multicast/test/1/some/la", 0, 0, true);
            waitForBindings(0, "multicast/test/1/some/la", 0, 0, false);
            waitForBindings(1, "multicast/test/1/some/la", 1, 1, false);
            blockingConnection.publish("multicast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection.publish("multicast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection.publish("multicast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            Message receive7 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive7.ack();
            Message receive8 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive8.ack();
            Message receive9 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive9.ack();
            Assertions.assertEquals("This is message 1", new String(receive7.getPayload()));
            Assertions.assertEquals("This is message 2", new String(receive8.getPayload()));
            Assertions.assertEquals("This is message 3", new String(receive9.getPayload()));
            String[] strArr = {"multicast/test/1/some/la"};
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 == null || !blockingConnection2.isConnected()) {
                return;
            }
            blockingConnection2.unsubscribe(strArr);
            blockingConnection2.disconnect();
        } catch (Throwable th) {
            String[] strArr2 = {"multicast/test/1/some/la"};
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr2);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 != null && blockingConnection2.isConnected()) {
                blockingConnection2.unsubscribe(strArr2);
                blockingConnection2.disconnect();
            }
            throw th;
        }
    }

    @Test
    public void useSameClientIdAndAnycastSubscribeRemoteQueueWildCard() throws Exception {
        setupServers("anycast/test/+/some/#");
        startServers(0, 1);
        BlockingConnection blockingConnection = null;
        BlockingConnection blockingConnection2 = null;
        BlockingConnection blockingConnection3 = null;
        try {
            Thread.sleep(1000L);
            Topic[] topicArr = {new Topic("anycast/test/+/some/#", QoS.AT_MOST_ONCE)};
            BlockingConnection retrieveMQTTConnection = retrieveMQTTConnection(SimpleManagementTest.LOCALHOST, "subClientId");
            Map connectedClients = locateMQTTPM(this.servers[0]).getStateManager().getConnectedClients();
            Objects.requireNonNull(connectedClients);
            Wait.assertEquals(1, connectedClients::size);
            blockingConnection2 = retrieveMQTTConnection("tcp://localhost:61617", "subClientId");
            blockingConnection3 = retrieveMQTTConnection(SimpleManagementTest.LOCALHOST, "pubClientId");
            Assertions.assertTrue(waitConnectionClosed(retrieveMQTTConnection));
            Map connectedClients2 = locateMQTTPM(this.servers[1]).getStateManager().getConnectedClients();
            Objects.requireNonNull(connectedClients2);
            Wait.assertEquals(1, connectedClients2::size);
            blockingConnection = null;
            blockingConnection2.subscribe(topicArr);
            waitForBindings(0, "anycast/test/+/some/#", 1, 0, true);
            waitForBindings(1, "anycast/test/+/some/#", 1, 1, true);
            waitForBindings(0, "anycast/test/+/some/#", 1, 1, false);
            waitForBindings(1, "anycast/test/+/some/#", 1, 0, false);
            blockingConnection3.publish("anycast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection3.publish("anycast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection3.publish("anycast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection2.unsubscribe(new String[]{"anycast/test/+/some/#"});
            waitForBindings(0, "anycast/test/+/some/#", 1, 0, true);
            waitForBindings(1, "anycast/test/+/some/#", 1, 0, true);
            waitForBindings(0, "anycast/test/+/some/#", 1, 0, false);
            waitForBindings(1, "anycast/test/+/some/#", 1, 0, false);
            blockingConnection3.publish("anycast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection3.publish("anycast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection3.publish("anycast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            String[] strArr = {"anycast/test/+/some/#"};
            if (0 != 0 && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 != null && blockingConnection2.isConnected()) {
                blockingConnection2.unsubscribe(strArr);
                blockingConnection2.disconnect();
            }
            if (blockingConnection3 == null || !blockingConnection3.isConnected()) {
                return;
            }
            blockingConnection3.disconnect();
        } catch (Throwable th) {
            String[] strArr2 = {"anycast/test/+/some/#"};
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr2);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 != null && blockingConnection2.isConnected()) {
                blockingConnection2.unsubscribe(strArr2);
                blockingConnection2.disconnect();
            }
            if (blockingConnection3 != null && blockingConnection3.isConnected()) {
                blockingConnection3.disconnect();
            }
            throw th;
        }
    }

    @Test
    public void useDiffClientIdAndAnycastSubscribeRemoteQueueWildCard() throws Exception {
        setupServers("anycast/test/+/some/#");
        startServers(0, 1);
        BlockingConnection blockingConnection = null;
        BlockingConnection blockingConnection2 = null;
        try {
            Thread.sleep(1000L);
            Topic[] topicArr = {new Topic("anycast/test/+/some/#", QoS.AT_MOST_ONCE)};
            blockingConnection = retrieveMQTTConnection(SimpleManagementTest.LOCALHOST, "clientId1");
            blockingConnection2 = retrieveMQTTConnection("tcp://localhost:61617", "clientId2");
            blockingConnection.subscribe(topicArr);
            waitForBindings(0, "anycast/test/+/some/#", 1, 1, true);
            waitForBindings(1, "anycast/test/+/some/#", 1, 0, true);
            waitForBindings(0, "anycast/test/+/some/#", 1, 0, false);
            waitForBindings(1, "anycast/test/+/some/#", 1, 1, false);
            blockingConnection2.subscribe(topicArr);
            waitForBindings(0, "anycast/test/+/some/#", 1, 1, true);
            waitForBindings(1, "anycast/test/+/some/#", 1, 1, true);
            waitForBindings(0, "anycast/test/+/some/#", 1, 1, false);
            waitForBindings(1, "anycast/test/+/some/#", 1, 1, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection2.unsubscribe(new String[]{"anycast/test/+/some/#"});
            waitForBindings(0, "anycast/test/+/some/#", 1, 1, true);
            waitForBindings(1, "anycast/test/+/some/#", 1, 0, true);
            waitForBindings(0, "anycast/test/+/some/#", 1, 0, false);
            waitForBindings(1, "anycast/test/+/some/#", 1, 1, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            String[] strArr = {"anycast/test/+/some/#"};
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 == null || !blockingConnection2.isConnected()) {
                return;
            }
            blockingConnection2.unsubscribe(strArr);
            blockingConnection2.disconnect();
        } catch (Throwable th) {
            String[] strArr2 = {"anycast/test/+/some/#"};
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr2);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 != null && blockingConnection2.isConnected()) {
                blockingConnection2.unsubscribe(strArr2);
                blockingConnection2.disconnect();
            }
            throw th;
        }
    }

    MQTTProtocolManager locateMQTTPM(ActiveMQServer activeMQServer) {
        Iterator it = activeMQServer.getRemotingService().getAcceptors().values().iterator();
        while (it.hasNext()) {
            for (MQTTProtocolManager mQTTProtocolManager : ((Acceptor) it.next()).getProtocolMap().values()) {
                if (mQTTProtocolManager instanceof MQTTProtocolManager) {
                    return mQTTProtocolManager;
                }
            }
        }
        return null;
    }

    @Test
    public void useSameClientIdAndMulticastSubscribeRemoteQueueWildCard() throws Exception {
        setupServers("anycast/test/+/some/#");
        startServers(0, 1);
        BlockingConnection blockingConnection = null;
        BlockingConnection blockingConnection2 = null;
        BlockingConnection blockingConnection3 = null;
        try {
            Thread.sleep(1000L);
            Topic[] topicArr = {new Topic("multicast/test/+/some/#", QoS.AT_MOST_ONCE)};
            blockingConnection = retrieveMQTTConnection(SimpleManagementTest.LOCALHOST, "subClientId");
            Map connectedClients = locateMQTTPM(this.servers[0]).getStateManager().getConnectedClients();
            Objects.requireNonNull(connectedClients);
            Wait.assertEquals(1, connectedClients::size);
            blockingConnection2 = retrieveMQTTConnection("tcp://localhost:61617", "subClientId");
            blockingConnection3 = retrieveMQTTConnection(SimpleManagementTest.LOCALHOST, "pubClientId");
            Assertions.assertTrue(waitConnectionClosed(blockingConnection));
            blockingConnection2.subscribe(topicArr);
            waitForBindings(0, "multicast/test/+/some/#", 0, 0, true);
            waitForBindings(1, "multicast/test/+/some/#", 1, 1, true);
            waitForBindings(0, "multicast/test/+/some/#", 1, 1, false);
            waitForBindings(1, "multicast/test/+/some/#", 0, 0, false);
            blockingConnection3.publish("multicast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection3.publish("multicast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection3.publish("multicast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            Message receive = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive.ack();
            Message receive2 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive2.ack();
            Message receive3 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive3.ack();
            Assertions.assertEquals("This is message 1", new String(receive.getPayload()));
            Assertions.assertEquals("This is message 2", new String(receive2.getPayload()));
            Assertions.assertEquals("This is message 3", new String(receive3.getPayload()));
            blockingConnection2.unsubscribe(new String[]{"multicast/test/+/some/#"});
            waitForBindings(0, "multicast/test/+/some/#", 0, 0, true);
            waitForBindings(1, "multicast/test/+/some/#", 0, 0, true);
            waitForBindings(0, "multicast/test/+/some/#", 0, 0, false);
            waitForBindings(1, "multicast/test/+/some/#", 0, 0, false);
            blockingConnection3.publish("multicast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection3.publish("multicast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection3.publish("multicast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            Assertions.assertNull(blockingConnection2.receive(100L, TimeUnit.MILLISECONDS));
            Assertions.assertNull(blockingConnection2.receive(100L, TimeUnit.MILLISECONDS));
            Assertions.assertNull(blockingConnection2.receive(100L, TimeUnit.MILLISECONDS));
            String[] strArr = {"multicast/test/+/some/#"};
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 != null && blockingConnection2.isConnected()) {
                blockingConnection2.unsubscribe(strArr);
                blockingConnection2.disconnect();
            }
            if (blockingConnection3 == null || !blockingConnection3.isConnected()) {
                return;
            }
            blockingConnection3.disconnect();
        } catch (Throwable th) {
            String[] strArr2 = {"multicast/test/+/some/#"};
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr2);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 != null && blockingConnection2.isConnected()) {
                blockingConnection2.unsubscribe(strArr2);
                blockingConnection2.disconnect();
            }
            if (blockingConnection3 != null && blockingConnection3.isConnected()) {
                blockingConnection3.disconnect();
            }
            throw th;
        }
    }

    @Test
    public void useDiffClientIdAndMulticastSubscribeRemoteQueueWildCard() throws Exception {
        setupServers("anycast/test/+/some/#");
        startServers(0, 1);
        BlockingConnection blockingConnection = null;
        BlockingConnection blockingConnection2 = null;
        try {
            Thread.sleep(1000L);
            Topic[] topicArr = {new Topic("multicast/test/+/some/#", QoS.AT_MOST_ONCE)};
            blockingConnection = retrieveMQTTConnection(SimpleManagementTest.LOCALHOST, "clientId1");
            Map connectedClients = locateMQTTPM(this.servers[0]).getStateManager().getConnectedClients();
            Objects.requireNonNull(connectedClients);
            Wait.assertEquals(1, connectedClients::size);
            blockingConnection2 = retrieveMQTTConnection("tcp://localhost:61617", "clientId2");
            Map connectedClients2 = locateMQTTPM(this.servers[1]).getStateManager().getConnectedClients();
            Objects.requireNonNull(connectedClients2);
            Wait.assertEquals(1, connectedClients2::size);
            blockingConnection.subscribe(topicArr);
            waitForBindings(0, "multicast/test/+/some/#", 1, 1, true);
            waitForBindings(1, "multicast/test/+/some/#", 0, 0, true);
            waitForBindings(0, "multicast/test/+/some/#", 0, 0, false);
            waitForBindings(1, "multicast/test/+/some/#", 1, 1, false);
            blockingConnection2.subscribe(topicArr);
            waitForBindings(0, "multicast/test/+/some/#", 1, 1, true);
            waitForBindings(1, "multicast/test/+/some/#", 1, 1, true);
            waitForBindings(0, "multicast/test/+/some/#", 1, 1, false);
            waitForBindings(1, "multicast/test/+/some/#", 1, 1, false);
            blockingConnection.publish("multicast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection.publish("multicast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection.publish("multicast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            Message receive = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive.ack();
            Message receive2 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive2.ack();
            Message receive3 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive3.ack();
            Assertions.assertEquals("This is message 1", new String(receive.getPayload()));
            Assertions.assertEquals("This is message 2", new String(receive2.getPayload()));
            Assertions.assertEquals("This is message 3", new String(receive3.getPayload()));
            Message receive4 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive4.ack();
            Message receive5 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive5.ack();
            Message receive6 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive6.ack();
            Assertions.assertEquals("This is message 1", new String(receive4.getPayload()));
            Assertions.assertEquals("This is message 2", new String(receive5.getPayload()));
            Assertions.assertEquals("This is message 3", new String(receive6.getPayload()));
            blockingConnection2.unsubscribe(new String[]{"multicast/test/+/some/#"});
            waitForBindings(0, "multicast/test/+/some/#", 1, 1, true);
            waitForBindings(1, "multicast/test/+/some/#", 0, 0, true);
            waitForBindings(0, "multicast/test/+/some/#", 0, 0, false);
            waitForBindings(1, "multicast/test/+/some/#", 1, 1, false);
            blockingConnection.publish("multicast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection.publish("multicast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection.publish("multicast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            Message receive7 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive7.ack();
            Message receive8 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive8.ack();
            Message receive9 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive9.ack();
            Assertions.assertEquals("This is message 1", new String(receive7.getPayload()));
            Assertions.assertEquals("This is message 2", new String(receive8.getPayload()));
            Assertions.assertEquals("This is message 3", new String(receive9.getPayload()));
            String[] strArr = {"multicast/test/+/some/#"};
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 == null || !blockingConnection2.isConnected()) {
                return;
            }
            blockingConnection2.unsubscribe(strArr);
            blockingConnection2.disconnect();
        } catch (Throwable th) {
            String[] strArr2 = {"multicast/test/+/some/#"};
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr2);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 != null && blockingConnection2.isConnected()) {
                blockingConnection2.unsubscribe(strArr2);
                blockingConnection2.disconnect();
            }
            throw th;
        }
    }

    @Test
    public void useDiffClientIdSubscribeRemoteQueueMultipleSubscriptions() throws Exception {
        setupServers("anycast/test/1/some/la");
        startServers(0, 1);
        BlockingConnection blockingConnection = null;
        BlockingConnection blockingConnection2 = null;
        try {
            Thread.sleep(1000L);
            blockingConnection = retrieveMQTTConnection(SimpleManagementTest.LOCALHOST, "clientId1");
            blockingConnection2 = retrieveMQTTConnection("tcp://localhost:61617", "clientId2");
            blockingConnection.subscribe(new Topic[]{new Topic("anycast/test/1/some/la", QoS.AT_MOST_ONCE)});
            waitForBindings(0, "anycast/test/1/some/la", 1, 1, true);
            waitForBindings(1, "anycast/test/1/some/la", 1, 0, true);
            waitForBindings(0, "anycast/test/1/some/la", 1, 0, false);
            waitForBindings(1, "anycast/test/1/some/la", 1, 1, false);
            blockingConnection2.subscribe(new Topic[]{new Topic("anycast/test/1/some/la", QoS.AT_MOST_ONCE), new Topic("sample", QoS.AT_MOST_ONCE)});
            waitForBindings(0, "anycast/test/1/some/la", 1, 1, true);
            waitForBindings(1, "anycast/test/1/some/la", 1, 1, true);
            waitForBindings(0, "anycast/test/1/some/la", 1, 1, false);
            waitForBindings(1, "anycast/test/1/some/la", 1, 1, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection.publish("sample", "This is message 4".getBytes(), QoS.AT_MOST_ONCE, false);
            Message receive = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive.ack();
            Message receive2 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive2.ack();
            Message receive3 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive3.ack();
            Message receive4 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive4.ack();
            Assertions.assertEquals("This is message 1", new String(receive.getPayload()));
            Assertions.assertEquals("This is message 2", new String(receive2.getPayload()));
            Assertions.assertEquals("This is message 3", new String(receive3.getPayload()));
            Assertions.assertEquals("This is message 4", new String(receive4.getPayload()));
            blockingConnection2.unsubscribe(new String[]{"anycast/test/1/some/la"});
            waitForBindings(0, "anycast/test/1/some/la", 1, 1, true);
            waitForBindings(1, "anycast/test/1/some/la", 1, 0, true);
            waitForBindings(0, "anycast/test/1/some/la", 1, 0, false);
            waitForBindings(1, "anycast/test/1/some/la", 1, 1, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection.publish("sample", "This is message 4".getBytes(), QoS.AT_MOST_ONCE, false);
            Message receive5 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive5.ack();
            Message receive6 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive6.ack();
            Message receive7 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive7.ack();
            Message receive8 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive8.ack();
            String str = new String(receive7.getPayload());
            String str2 = new String(receive6.getPayload());
            String str3 = new String(receive5.getPayload());
            Assertions.assertTrue("This is message 1".equals(str) || "This is message 1".equals(str2) || "This is message 1".equals(str3));
            Assertions.assertTrue("This is message 2".equals(str) || "This is message 2".equals(str2) || "This is message 2".equals(str3));
            Assertions.assertTrue("This is message 3".equals(str) || "This is message 3".equals(str2) || "This is message 3".equals(str3));
            Assertions.assertEquals("This is message 4", new String(receive8.getPayload()));
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(new String[]{"anycast/test/1/some/la"});
                blockingConnection.disconnect();
            }
            if (blockingConnection2 == null || !blockingConnection2.isConnected()) {
                return;
            }
            blockingConnection2.unsubscribe(new String[]{"anycast/test/1/some/la", "sample"});
            blockingConnection2.disconnect();
        } catch (Throwable th) {
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(new String[]{"anycast/test/1/some/la"});
                blockingConnection.disconnect();
            }
            if (blockingConnection2 != null && blockingConnection2.isConnected()) {
                blockingConnection2.unsubscribe(new String[]{"anycast/test/1/some/la", "sample"});
                blockingConnection2.disconnect();
            }
            throw th;
        }
    }

    @Test
    public void useSameClientIdSubscribeRemoteQueueMultipleSubscriptions() throws Exception {
        setupServers("anycast/test/1/some/la");
        startServers(0, 1);
        BlockingConnection blockingConnection = null;
        BlockingConnection blockingConnection2 = null;
        BlockingConnection blockingConnection3 = null;
        try {
            Thread.sleep(1000L);
            blockingConnection = retrieveMQTTConnection(SimpleManagementTest.LOCALHOST, "subClientId");
            Map connectedClients = locateMQTTPM(this.servers[0]).getStateManager().getConnectedClients();
            Objects.requireNonNull(connectedClients);
            Wait.assertEquals(1, connectedClients::size);
            blockingConnection2 = retrieveMQTTConnection("tcp://localhost:61617", "subClientId");
            blockingConnection3 = retrieveMQTTConnection(SimpleManagementTest.LOCALHOST, "pubClientId");
            Assertions.assertTrue(waitConnectionClosed(blockingConnection));
            blockingConnection2.subscribe(new Topic[]{new Topic("anycast/test/1/some/la", QoS.AT_MOST_ONCE), new Topic("sample", QoS.AT_MOST_ONCE)});
            waitForBindings(0, "anycast/test/1/some/la", 1, 0, true);
            waitForBindings(1, "anycast/test/1/some/la", 1, 1, true);
            waitForBindings(0, "anycast/test/1/some/la", 1, 1, false);
            waitForBindings(1, "anycast/test/1/some/la", 1, 0, false);
            blockingConnection3.publish("anycast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection3.publish("anycast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection3.publish("anycast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection3.publish("sample", "This is message 4".getBytes(), QoS.AT_MOST_ONCE, false);
            Message receive = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive.ack();
            Message receive2 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive2.ack();
            Message receive3 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive3.ack();
            Message receive4 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive4.ack();
            String str = new String(receive.getPayload());
            String str2 = new String(receive2.getPayload());
            String str3 = new String(receive3.getPayload());
            String str4 = new String(receive4.getPayload());
            Assertions.assertTrue("This is message 1".equals(str) || "This is message 1".equals(str2) || "This is message 1".equals(str3) || "This is message 1".equals(str4));
            Assertions.assertTrue("This is message 2".equals(str) || "This is message 2".equals(str2) || "This is message 2".equals(str3) || "This is message 2".equals(str4));
            Assertions.assertTrue("This is message 3".equals(str) || "This is message 3".equals(str2) || "This is message 3".equals(str3) || "This is message 3".equals(str4));
            Assertions.assertTrue("This is message 4".equals(str) || "This is message 4".equals(str2) || "This is message 4".equals(str3) || "This is message 4".equals(str4));
            blockingConnection2.unsubscribe(new String[]{"anycast/test/1/some/la"});
            waitForBindings(0, "anycast/test/1/some/la", 1, 0, true);
            waitForBindings(1, "anycast/test/1/some/la", 1, 0, true);
            waitForBindings(0, "anycast/test/1/some/la", 1, 0, false);
            waitForBindings(1, "anycast/test/1/some/la", 1, 0, false);
            blockingConnection3.publish("anycast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection3.publish("anycast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection3.publish("anycast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection3.publish("sample", "This is message 4".getBytes(), QoS.AT_MOST_ONCE, false);
            Message receive5 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive5.ack();
            Assertions.assertEquals("This is message 4", new String(receive5.getPayload()));
            Assertions.assertNull(blockingConnection2.receive(100L, TimeUnit.MILLISECONDS));
            Assertions.assertNull(blockingConnection2.receive(100L, TimeUnit.MILLISECONDS));
            Assertions.assertNull(blockingConnection2.receive(100L, TimeUnit.MILLISECONDS));
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(new String[]{"anycast/test/1/some/la"});
                blockingConnection.disconnect();
            }
            if (blockingConnection2 != null && blockingConnection2.isConnected()) {
                blockingConnection2.unsubscribe(new String[]{"anycast/test/1/some/la", "sample"});
                blockingConnection2.disconnect();
            }
            if (blockingConnection3 == null || !blockingConnection3.isConnected()) {
                return;
            }
            blockingConnection3.disconnect();
        } catch (Throwable th) {
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(new String[]{"anycast/test/1/some/la"});
                blockingConnection.disconnect();
            }
            if (blockingConnection2 != null && blockingConnection2.isConnected()) {
                blockingConnection2.unsubscribe(new String[]{"anycast/test/1/some/la", "sample"});
                blockingConnection2.disconnect();
            }
            if (blockingConnection3 != null && blockingConnection3.isConnected()) {
                blockingConnection3.disconnect();
            }
            throw th;
        }
    }

    @Test
    public void useSameClientIdSubscribeExistingQueue() throws Exception {
        setupServers("anycast/test/1/some/la");
        startServers(0, 1);
        BlockingConnection blockingConnection = null;
        BlockingConnection blockingConnection2 = null;
        BlockingConnection blockingConnection3 = null;
        BlockingConnection blockingConnection4 = null;
        try {
            Thread.sleep(1000L);
            blockingConnection4 = retrieveMQTTConnection(SimpleManagementTest.LOCALHOST, "pubClientId");
            blockingConnection = retrieveMQTTConnection(SimpleManagementTest.LOCALHOST, "subClientId");
            Map connectedClients = locateMQTTPM(this.servers[0]).getStateManager().getConnectedClients();
            Objects.requireNonNull(connectedClients);
            Wait.assertEquals(2, connectedClients::size);
            blockingConnection2 = retrieveMQTTConnection("tcp://localhost:61617", "subClientId");
            Assertions.assertTrue(waitConnectionClosed(blockingConnection));
            blockingConnection3 = retrieveMQTTConnection("tcp://localhost:61617", "subClientId");
            Assertions.assertTrue(waitConnectionClosed(blockingConnection));
            blockingConnection3.subscribe(new Topic[]{new Topic("anycast/test/1/some/la", QoS.AT_MOST_ONCE)});
            waitForBindings(0, "anycast/test/1/some/la", 1, 0, true);
            waitForBindings(1, "anycast/test/1/some/la", 1, 1, true);
            waitForBindings(0, "anycast/test/1/some/la", 1, 1, false);
            waitForBindings(1, "anycast/test/1/some/la", 1, 0, false);
            blockingConnection4.publish("anycast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection4.publish("anycast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection4.publish("anycast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection4.publish("anycast/test/1/some/la", "This is message 4".getBytes(), QoS.AT_MOST_ONCE, false);
            Message receive = blockingConnection3.receive(5L, TimeUnit.SECONDS);
            receive.ack();
            Message receive2 = blockingConnection3.receive(5L, TimeUnit.SECONDS);
            receive2.ack();
            Message receive3 = blockingConnection3.receive(5L, TimeUnit.SECONDS);
            receive3.ack();
            Message receive4 = blockingConnection3.receive(5L, TimeUnit.SECONDS);
            receive4.ack();
            Assertions.assertEquals("This is message 1", new String(receive.getPayload()));
            Assertions.assertEquals("This is message 2", new String(receive2.getPayload()));
            Assertions.assertEquals("This is message 3", new String(receive3.getPayload()));
            Assertions.assertEquals("This is message 4", new String(receive4.getPayload()));
            blockingConnection3.unsubscribe(new String[]{"anycast/test/1/some/la"});
            waitForBindings(0, "anycast/test/1/some/la", 1, 0, true);
            waitForBindings(1, "anycast/test/1/some/la", 1, 0, true);
            waitForBindings(0, "anycast/test/1/some/la", 1, 0, false);
            waitForBindings(1, "anycast/test/1/some/la", 1, 0, false);
            blockingConnection4.publish("anycast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection4.publish("anycast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection4.publish("anycast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            Assertions.assertNull(blockingConnection3.receive(100L, TimeUnit.MILLISECONDS));
            Assertions.assertNull(blockingConnection3.receive(100L, TimeUnit.MILLISECONDS));
            Assertions.assertNull(blockingConnection3.receive(100L, TimeUnit.MILLISECONDS));
            String[] strArr = {"anycast/test/1/some/la"};
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 != null && blockingConnection2.isConnected()) {
                blockingConnection2.unsubscribe(strArr);
                blockingConnection2.disconnect();
            }
            if (blockingConnection3 != null && blockingConnection3.isConnected()) {
                blockingConnection3.unsubscribe(strArr);
                blockingConnection3.disconnect();
            }
            if (blockingConnection4 == null || !blockingConnection4.isConnected()) {
                return;
            }
            blockingConnection4.unsubscribe(strArr);
            blockingConnection4.disconnect();
        } catch (Throwable th) {
            String[] strArr2 = {"anycast/test/1/some/la"};
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr2);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 != null && blockingConnection2.isConnected()) {
                blockingConnection2.unsubscribe(strArr2);
                blockingConnection2.disconnect();
            }
            if (blockingConnection3 != null && blockingConnection3.isConnected()) {
                blockingConnection3.unsubscribe(strArr2);
                blockingConnection3.disconnect();
            }
            if (blockingConnection4 != null && blockingConnection4.isConnected()) {
                blockingConnection4.unsubscribe(strArr2);
                blockingConnection4.disconnect();
            }
            throw th;
        }
    }

    @Test
    public void useDiffClientIdSubscribeExistingQueue() throws Exception {
        setupServers("anycast/test/1/some/la");
        startServers(0, 1);
        BlockingConnection blockingConnection = null;
        BlockingConnection blockingConnection2 = null;
        BlockingConnection blockingConnection3 = null;
        try {
            Thread.sleep(1000L);
            blockingConnection = retrieveMQTTConnection(SimpleManagementTest.LOCALHOST, "clientId1");
            blockingConnection2 = retrieveMQTTConnection("tcp://localhost:61617", "clientId2");
            blockingConnection3 = retrieveMQTTConnection("tcp://localhost:61617", "clientId3");
            Topic[] topicArr = {new Topic("anycast/test/1/some/la", QoS.AT_MOST_ONCE)};
            blockingConnection.subscribe(topicArr);
            waitForBindings(0, "anycast/test/1/some/la", 1, 1, true);
            waitForBindings(1, "anycast/test/1/some/la", 1, 0, true);
            waitForBindings(0, "anycast/test/1/some/la", 1, 0, false);
            waitForBindings(1, "anycast/test/1/some/la", 1, 1, false);
            blockingConnection2.subscribe(topicArr);
            waitForBindings(0, "anycast/test/1/some/la", 1, 1, true);
            waitForBindings(1, "anycast/test/1/some/la", 1, 1, true);
            waitForBindings(0, "anycast/test/1/some/la", 1, 1, false);
            waitForBindings(1, "anycast/test/1/some/la", 1, 1, false);
            blockingConnection3.subscribe(topicArr);
            waitForBindings(0, "anycast/test/1/some/la", 1, 1, true);
            waitForBindings(1, "anycast/test/1/some/la", 1, 2, true);
            waitForBindings(0, "anycast/test/1/some/la", 1, 2, false);
            waitForBindings(1, "anycast/test/1/some/la", 1, 1, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 4".getBytes(), QoS.AT_MOST_ONCE, false);
            Message receive = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive.ack();
            Message receive2 = blockingConnection2.receive(5L, TimeUnit.SECONDS);
            receive2.ack();
            Message receive3 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive3.ack();
            Message receive4 = blockingConnection3.receive(5L, TimeUnit.SECONDS);
            receive4.ack();
            Assertions.assertEquals("This is message 1", new String(receive.getPayload()));
            Assertions.assertEquals("This is message 2", new String(receive2.getPayload()));
            Assertions.assertEquals("This is message 3", new String(receive3.getPayload()));
            Assertions.assertEquals("This is message 4", new String(receive4.getPayload()));
            blockingConnection2.unsubscribe(new String[]{"anycast/test/1/some/la"});
            waitForBindings(0, "anycast/test/1/some/la", 1, 1, true);
            waitForBindings(1, "anycast/test/1/some/la", 1, 1, true);
            waitForBindings(0, "anycast/test/1/some/la", 1, 1, false);
            waitForBindings(1, "anycast/test/1/some/la", 1, 1, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 1".getBytes(), QoS.AT_LEAST_ONCE, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 2".getBytes(), QoS.AT_MOST_ONCE, false);
            blockingConnection.publish("anycast/test/1/some/la", "This is message 3".getBytes(), QoS.AT_MOST_ONCE, false);
            Message receive5 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive5.ack();
            Message receive6 = blockingConnection3.receive(5L, TimeUnit.SECONDS);
            receive6.ack();
            Message receive7 = blockingConnection.receive(5L, TimeUnit.SECONDS);
            receive7.ack();
            String str = new String(receive5.getPayload());
            String str2 = new String(receive6.getPayload());
            String str3 = new String(receive7.getPayload());
            Assertions.assertTrue("This is message 1".equals(str) || "This is message 1".equals(str2) || "This is message 1".equals(str3));
            Assertions.assertTrue("This is message 2".equals(str) || "This is message 2".equals(str2) || "This is message 2".equals(str3));
            Assertions.assertTrue("This is message 3".equals(str) || "This is message 3".equals(str2) || "This is message 3".equals(str3));
            String[] strArr = {"anycast/test/1/some/la"};
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 != null && blockingConnection2.isConnected()) {
                blockingConnection2.unsubscribe(strArr);
                blockingConnection2.disconnect();
            }
            if (blockingConnection3 == null || !blockingConnection3.isConnected()) {
                return;
            }
            blockingConnection3.unsubscribe(strArr);
            blockingConnection3.disconnect();
        } catch (Throwable th) {
            String[] strArr2 = {"anycast/test/1/some/la"};
            if (blockingConnection != null && blockingConnection.isConnected()) {
                blockingConnection.unsubscribe(strArr2);
                blockingConnection.disconnect();
            }
            if (blockingConnection2 != null && blockingConnection2.isConnected()) {
                blockingConnection2.unsubscribe(strArr2);
                blockingConnection2.disconnect();
            }
            if (blockingConnection3 != null && blockingConnection3.isConnected()) {
                blockingConnection3.unsubscribe(strArr2);
                blockingConnection3.disconnect();
            }
            throw th;
        }
    }

    private static BlockingConnection retrieveMQTTConnection(String str, String str2) throws Exception {
        MQTT mqtt = new MQTT();
        mqtt.setHost(str);
        mqtt.setClientId(str2);
        mqtt.setConnectAttemptsMax(0L);
        mqtt.setReconnectAttemptsMax(0L);
        BlockingConnection blockingConnection = mqtt.blockingConnection();
        blockingConnection.connect();
        return blockingConnection;
    }

    private void setupServers(String str) throws Exception {
        WildcardConfiguration createWildCardConfiguration = createWildCardConfiguration();
        CoreAddressConfiguration createAddressConfiguration = createAddressConfiguration(str);
        AddressSettings createAddressSettings = createAddressSettings();
        setupServer(0, false, isNetty());
        this.servers[0].getConfiguration().setWildCardConfiguration(createWildCardConfiguration);
        this.servers[0].getConfiguration().addAddressConfiguration(createAddressConfiguration);
        this.servers[0].getConfiguration().addAddressSetting("#", createAddressSettings);
        setupServer(1, false, isNetty());
        this.servers[1].getConfiguration().setWildCardConfiguration(createWildCardConfiguration);
        this.servers[1].getConfiguration().addAddressConfiguration(createAddressConfiguration);
        this.servers[1].getConfiguration().addAddressSetting("#", createAddressSettings);
        setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
        setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
    }

    private AddressSettings createAddressSettings() {
        AddressSettings addressSettings = new AddressSettings();
        addressSettings.setRedistributionDelay(0L);
        addressSettings.setDefaultAddressRoutingType(RoutingType.ANYCAST);
        return addressSettings;
    }

    private CoreAddressConfiguration createAddressConfiguration(String str) {
        CoreAddressConfiguration coreAddressConfiguration = new CoreAddressConfiguration();
        coreAddressConfiguration.addRoutingType(RoutingType.ANYCAST);
        coreAddressConfiguration.setName(str);
        coreAddressConfiguration.addQueueConfiguration(QueueConfiguration.of(str).setRoutingType(RoutingType.ANYCAST));
        return coreAddressConfiguration;
    }

    private WildcardConfiguration createWildCardConfiguration() {
        WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
        wildcardConfiguration.setAnyWords('#');
        wildcardConfiguration.setDelimiter('/');
        wildcardConfiguration.setRoutingEnabled(true);
        wildcardConfiguration.setSingleWord('+');
        return wildcardConfiguration;
    }

    private boolean waitConnectionClosed(BlockingConnection blockingConnection) throws Exception {
        return Wait.waitFor(() -> {
            return !blockingConnection.isConnected();
        });
    }
}
