package org.apache.activemq.artemis.tests.integration.mqtt.imported;

import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterWildcardTest.class */
/**
 * Integration tests that subscribe MQTT clients to a wildcard topic filter on
 * two clustered brokers and verify that messages published through one broker
 * are received by the subscribers on both.
 *
 * <p>Both brokers are configured with a {@link WildcardConfiguration} using
 * {@code '/'} as delimiter, {@code '+'} as the single-word wildcard and
 * {@code '#'} as the any-words wildcard.
 */
public class MqttClusterWildcardTest extends ClusterTestBase {

    /** Wildcard filter every test client subscribes to. */
    private static final String TOPIC_FILTER = "test/+/some/#";

    /** Concrete topic used for publishing. */
    private static final String PUBLISH_TOPIC = "test/1/some/la";

    /** Address the tests use to reach server 0. */
    private static final String BROKER_0_URI = "tcp://localhost:61616";

    /** Address the tests use to reach server 1. */
    private static final String BROKER_1_URI = "tcp://localhost:61617";

    private static final String PAYLOAD_1 = "This is message 1";
    private static final String PAYLOAD_2 = "This is message 2";
    private static final String PAYLOAD_3 = "This is message 3";

    /**
     * Always returns {@code true}.
     * NOTE(review): presumably makes the base class resolve protocol managers
     * so that MQTT is available on the acceptors - confirm against ClusterTestBase.
     */
    @Override
    protected boolean isResolveProtocols() {
        return true;
    }

    /**
     * @return {@code true}; the value is passed to the base-class server and
     *         cluster-connection setup helpers by every test in this class
     */
    public boolean isNetty() {
        return true;
    }

    /**
     * Starts both brokers, subscribes one client on each to the wildcard
     * filter, publishes three messages through the first broker and expects
     * each client to receive all three, in order.
     */
    @Test
    public void loadBalanceRequests() throws Exception {
        WildcardConfiguration wildcardConfiguration = createMqttWildcardConfiguration();
        setupServer(0, false, isNetty());
        this.servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
        setupServer(1, false, isNetty());
        this.servers[1].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
        setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
        setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
        startServers(0, 1);

        BlockingConnection connection0 = null;
        BlockingConnection connection1 = null;
        try {
            connection0 = retrieveMQTTConnection(BROKER_0_URI);
            connection1 = retrieveMQTTConnection(BROKER_1_URI);
            Topic[] subscriptions = {new Topic(TOPIC_FILTER, QoS.AT_MOST_ONCE)};
            connection0.subscribe(subscriptions);
            connection1.subscribe(subscriptions);

            // Wait until both nodes report the expected bindings for the filter.
            // NOTE(review): the trailing flag presumably selects local (true)
            // versus remote (false) bindings - confirm against ClusterTestBase.
            waitForBindings(0, TOPIC_FILTER, 1, 1, true);
            waitForBindings(1, TOPIC_FILTER, 1, 1, true);
            waitForBindings(0, TOPIC_FILTER, 1, 1, false);
            waitForBindings(1, TOPIC_FILTER, 1, 1, false);

            publishThreeMessages(connection0);

            Message first0 = connection0.receive(5L, TimeUnit.SECONDS);
            Message second0 = connection0.receive(5L, TimeUnit.SECONDS);
            Message third0 = connection0.receive(5L, TimeUnit.SECONDS);
            Message first1 = connection1.receive(5L, TimeUnit.SECONDS);
            Message second1 = connection1.receive(5L, TimeUnit.SECONDS);
            Message third1 = connection1.receive(5L, TimeUnit.SECONDS);

            // receive() yields null on timeout; fail with a clear message
            // instead of a NullPointerException on getPayload().
            Assert.assertNotNull("connection 0 did not receive message 1", first0);
            Assert.assertNotNull("connection 0 did not receive message 2", second0);
            Assert.assertNotNull("connection 0 did not receive message 3", third0);
            Assert.assertNotNull("connection 1 did not receive message 1", first1);
            Assert.assertNotNull("connection 1 did not receive message 2", second1);
            Assert.assertNotNull("connection 1 did not receive message 3", third1);

            assertEquals(PAYLOAD_1, new String(first0.getPayload()));
            assertEquals(PAYLOAD_2, new String(second0.getPayload()));
            assertEquals(PAYLOAD_3, new String(third0.getPayload()));
            assertEquals(PAYLOAD_1, new String(first1.getPayload()));
            assertEquals(PAYLOAD_2, new String(second1.getPayload()));
            assertEquals(PAYLOAD_3, new String(third1.getPayload()));
        } finally {
            unsubscribeAndDisconnect(connection0, connection1);
        }
    }

    /**
     * Starts only server 0, subscribes and publishes against it, then brings
     * up server 1, subscribes a second client there, publishes again through
     * server 0 and checks what both clients receive.
     */
    @Test
    public void wildcardsWithBroker1Disconnected() throws Exception {
        BlockingConnection connection0 = null;
        BlockingConnection connection1 = null;
        try {
            WildcardConfiguration wildcardConfiguration = createMqttWildcardConfiguration();
            setupServer(0, false, isNetty());
            this.servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
            setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
            startServers(0);

            connection0 = retrieveMQTTConnection(BROKER_0_URI);
            Topic[] subscriptions = {new Topic(TOPIC_FILTER, QoS.AT_MOST_ONCE)};
            connection0.subscribe(subscriptions);
            waitForBindings(0, TOPIC_FILTER, 1, 1, true);
            waitForBindings(0, TOPIC_FILTER, 0, 0, false);

            publishThreeMessages(connection0);
            // NOTE(review): only one message of this first batch is consumed
            // here, so the two later receives on connection0 may be served by
            // the remainder of this batch rather than by the second batch -
            // confirm this is intended.
            Message first0 = connection0.receive(5L, TimeUnit.SECONDS);
            Assert.assertNotNull("connection 0 did not receive message 1", first0);

            // Bring the second broker into the cluster while the first is running.
            setupServer(1, false, isNetty());
            this.servers[1].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
            setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
            startServers(1);

            connection1 = retrieveMQTTConnection(BROKER_1_URI);
            connection1.subscribe(subscriptions);
            waitForBindings(1, TOPIC_FILTER, 1, 1, false);
            waitForBindings(1, TOPIC_FILTER, 1, 1, true);
            waitForBindings(0, TOPIC_FILTER, 1, 1, false);
            waitForBindings(0, TOPIC_FILTER, 1, 1, true);

            publishThreeMessages(connection0);

            Message second0 = connection0.receive(10L, TimeUnit.SECONDS);
            Assert.assertNotNull(second0);
            Message third0 = connection0.receive(10L, TimeUnit.SECONDS);
            Assert.assertNotNull(third0);
            Message first1 = connection1.receive(10L, TimeUnit.SECONDS);
            Assert.assertNotNull(first1);
            Message second1 = connection1.receive(10L, TimeUnit.SECONDS);
            Assert.assertNotNull(second1);
            Message third1 = connection1.receive(10L, TimeUnit.SECONDS);
            Assert.assertNotNull(third1);

            assertEquals(PAYLOAD_1, new String(first0.getPayload()));
            assertEquals(PAYLOAD_2, new String(second0.getPayload()));
            assertEquals(PAYLOAD_3, new String(third0.getPayload()));
            assertEquals(PAYLOAD_1, new String(first1.getPayload()));
            assertEquals(PAYLOAD_2, new String(second1.getPayload()));
            assertEquals(PAYLOAD_3, new String(third1.getPayload()));
        } finally {
            unsubscribeAndDisconnect(connection0, connection1);
        }
    }

    /** Builds the wildcard configuration shared by both brokers ('/' delimiter, '+' and '#' wildcards). */
    private static WildcardConfiguration createMqttWildcardConfiguration() {
        WildcardConfiguration configuration = new WildcardConfiguration();
        configuration.setAnyWords('#');
        configuration.setDelimiter('/');
        configuration.setRoutingEnabled(true);
        configuration.setSingleWord('+');
        return configuration;
    }

    /** Publishes the three test payloads to {@link #PUBLISH_TOPIC}: the first at-least-once, the others at-most-once. */
    private static void publishThreeMessages(BlockingConnection connection) throws Exception {
        connection.publish(PUBLISH_TOPIC, PAYLOAD_1.getBytes(), QoS.AT_LEAST_ONCE, false);
        connection.publish(PUBLISH_TOPIC, PAYLOAD_2.getBytes(), QoS.AT_MOST_ONCE, false);
        connection.publish(PUBLISH_TOPIC, PAYLOAD_3.getBytes(), QoS.AT_MOST_ONCE, false);
    }

    /**
     * Unsubscribes each non-null connection from {@link #TOPIC_FILTER} and
     * disconnects it. Every connection is processed even if an earlier one
     * fails; the first failure is rethrown with later ones attached as
     * suppressed exceptions.
     *
     * @param connections connections to tear down; {@code null} entries are skipped
     * @throws Exception the first failure raised while tearing down a connection
     */
    private static void unsubscribeAndDisconnect(BlockingConnection... connections) throws Exception {
        String[] filters = {TOPIC_FILTER};
        Exception firstFailure = null;
        for (BlockingConnection connection : connections) {
            if (connection == null) {
                continue;
            }
            try {
                try {
                    connection.unsubscribe(filters);
                } finally {
                    // Always attempt the disconnect, even when unsubscribe failed.
                    connection.disconnect();
                }
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Creates an MQTT client for the given host URI and connects it.
     *
     * @param str host URI, e.g. {@code tcp://localhost:61616}
     * @return a connected blocking connection
     * @throws Exception if the URI is rejected or the connection attempt fails
     */
    private static BlockingConnection retrieveMQTTConnection(String str) throws Exception {
        MQTT mqtt = new MQTT();
        mqtt.setHost(str);
        BlockingConnection blockingConnection = mqtt.blockingConnection();
        blockingConnection.connect();
        return blockingConnection;
    }
}
