package org.apache.activemq.artemis.tests.integration.mqtt;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.integration.management.SimpleManagementTest;
import org.apache.activemq.artemis.tests.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Integration test that connects one MQTT client to each node of a two-node cluster, subscribes
 * both clients to the same topic, publishes one message from each client and then receives and
 * acknowledges messages on both connections.
 *
 * <p>Both cluster connections are configured with {@link MessageLoadBalancingType#OFF}.
 */
public class MqttClusterRemoteSubscribeLoadBalanceOffTest extends ClusterTestBase {

    /** Topic (and pre-configured multicast address) used by the test. */
    private static final String TOPIC = "test/1";

    /** MQTT endpoint of the second cluster node. */
    private static final String SERVER_1_URI = "tcp://localhost:61617";

    /** Maximum time, in milliseconds, to wait for a client connection or the cluster topology. */
    private static final long WAIT_TIMEOUT_MILLIS = 5000L;

    /** Polling interval, in milliseconds, used while waiting for a client connection. */
    private static final long WAIT_INTERVAL_MILLIS = 100L;

    /** Maximum time, in seconds, to wait for a single message to arrive. */
    private static final long RECEIVE_TIMEOUT_SECONDS = 5L;

    @Override
    protected boolean isResolveProtocols() {
        return true;
    }

    /**
     * Value passed as the netty flag to {@code setupServer} and {@code setupClusterConnection}.
     *
     * @return always {@code true}
     */
    public boolean isNetty() {
        return true;
    }

    /**
     * Subscribes one client per node to {@link #TOPIC}, publishes one message from each client and
     * expects two messages to be received and acknowledged on each connection.
     *
     * @throws Exception if server setup, connecting, publishing or receiving fails
     */
    @Test
    public void testPub0Sub1() throws Exception {
        final Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)};
        final String[] topicNames = {TOPIC};

        setupServers(TOPIC);
        startServers(0, 1);

        // Nested try/finally blocks guarantee that every connection that was opened is closed
        // again, even when an assertion or a broker call fails half-way through the test.
        // NOTE(review): SimpleManagementTest.LOCALHOST is presumably the URI of node 0 - confirm.
        final BlockingConnection connection1 = retrieveMQTTConnection(SimpleManagementTest.LOCALHOST, "clientId1");
        try {
            final BlockingConnection connection2 = retrieveMQTTConnection(SERVER_1_URI, "clientId2");
            try {
                Assertions.assertTrue(
                    Wait.waitFor(() -> connection1.isConnected(), WAIT_TIMEOUT_MILLIS, WAIT_INTERVAL_MILLIS),
                    "Should be connected");
                Assertions.assertTrue(
                    Wait.waitFor(() -> connection2.isConnected(), WAIT_TIMEOUT_MILLIS, WAIT_INTERVAL_MILLIS),
                    "Should be connected");

                waitForTopology(this.servers[0], "cluster0", 2, WAIT_TIMEOUT_MILLIS);
                waitForTopology(this.servers[1], "cluster1", 2, WAIT_TIMEOUT_MILLIS);

                connection1.subscribe(topics);
                connection2.subscribe(topics);

                waitForBindings(0, TOPIC, 1, 1, false);
                waitForBindings(1, TOPIC, 1, 1, false);

                connection1.publish(TOPIC, "This is message 1".getBytes(StandardCharsets.UTF_8), QoS.AT_LEAST_ONCE, false);
                connection2.publish(TOPIC, "This is message 2".getBytes(StandardCharsets.UTF_8), QoS.AT_MOST_ONCE, false);

                receiveAndAck(connection1, "first message on connection 1");
                receiveAndAck(connection1, "second message on connection 1");
                receiveAndAck(connection2, "first message on connection 2");
                receiveAndAck(connection2, "second message on connection 2");
            } finally {
                unsubscribeAndDisconnect(connection2, topicNames);
            }
        } finally {
            unsubscribeAndDisconnect(connection1, topicNames);
        }
    }

    /**
     * Receives one message and acknowledges it, failing the test with a descriptive message
     * (instead of a {@link NullPointerException}) when nothing arrives before the timeout.
     *
     * @param connection the connection to receive from
     * @param description text identifying the expected message in the failure output
     * @throws Exception if receiving fails
     */
    private static void receiveAndAck(BlockingConnection connection, String description) throws Exception {
        Message message = connection.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        Assertions.assertNotNull(message, "Timed out waiting for " + description);
        message.ack();
    }

    /**
     * Unsubscribes and disconnects the given connection if it is still connected.
     *
     * @param connection the connection to close; may be {@code null}
     * @param topicNames the topics to unsubscribe from before disconnecting
     * @throws Exception if unsubscribing or disconnecting fails
     */
    private static void unsubscribeAndDisconnect(BlockingConnection connection, String[] topicNames) throws Exception {
        if (connection == null || !connection.isConnected()) {
            return;
        }
        try {
            connection.unsubscribe(topicNames);
        } finally {
            // Disconnect even when the unsubscribe call fails so the socket is not leaked.
            connection.disconnect();
        }
    }

    /**
     * Creates and connects a blocking MQTT connection with both the connect and reconnect attempt
     * limits set to {@code 0}.
     *
     * @param str the broker URI to connect to
     * @param str2 the MQTT client id
     * @return the connection, after {@code connect()} has returned
     * @throws Exception if the URI is invalid or the connection cannot be established
     */
    private static BlockingConnection retrieveMQTTConnection(String str, String str2) throws Exception {
        MQTT mqtt = new MQTT();
        mqtt.setHost(str);
        mqtt.setClientId(str2);
        mqtt.setConnectAttemptsMax(0L);
        mqtt.setReconnectAttemptsMax(0L);
        BlockingConnection connection = mqtt.blockingConnection();
        connection.connect();
        return connection;
    }

    /**
     * Configures servers 0 and 1 with the given multicast address, shared address settings and
     * MQTT-style wildcards, then links them with one cluster connection in each direction.
     *
     * @param str the name of the address to pre-configure on both servers
     * @throws Exception if a server or cluster connection cannot be set up
     */
    private void setupServers(String str) throws Exception {
        WildcardConfiguration wildcardConfiguration = createWildCardConfiguration();
        CoreAddressConfiguration addressConfiguration = createAddressConfiguration(str);
        AddressSettings addressSettings = createAddressSettings();

        for (int node = 0; node < 2; node++) {
            setupServer(node, false, isNetty());
            this.servers[node].getConfiguration().addAddressConfiguration(addressConfiguration);
            this.servers[node].getConfiguration().addAddressSetting("#", addressSettings);
            this.servers[node].getConfiguration().setWildCardConfiguration(wildcardConfiguration);
        }

        // Both directions use OFF so the cluster is configured symmetrically, as the test name
        // implies; previously cluster0 used OFF_WITH_REDISTRIBUTION while cluster1 used OFF.
        setupClusterConnection("cluster0", "", MessageLoadBalancingType.OFF, 1, isNetty(), 0, 1);
        setupClusterConnection("cluster1", "", MessageLoadBalancingType.OFF, 1, isNetty(), 1, 0);
    }

    /**
     * Builds the address settings registered on both servers under the {@code "#"} match.
     *
     * @return settings with a redistribution delay of {@code 0} and multicast as the default
     *     address routing type
     */
    private AddressSettings createAddressSettings() {
        AddressSettings addressSettings = new AddressSettings();
        addressSettings.setRedistributionDelay(0L);
        addressSettings.setDefaultAddressRoutingType(RoutingType.MULTICAST);
        return addressSettings;
    }

    /**
     * Builds a multicast address configuration with the given name.
     *
     * @param str the address name
     * @return the address configuration
     */
    private CoreAddressConfiguration createAddressConfiguration(String str) {
        CoreAddressConfiguration addressConfiguration = new CoreAddressConfiguration();
        addressConfiguration.addRoutingType(RoutingType.MULTICAST);
        addressConfiguration.setName(str);
        return addressConfiguration;
    }

    /**
     * Builds a wildcard configuration with routing enabled that uses {@code '/'} as delimiter,
     * {@code '#'} for any words and {@code '+'} for a single word.
     *
     * @return the wildcard configuration
     */
    private WildcardConfiguration createWildCardConfiguration() {
        WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
        wildcardConfiguration.setAnyWords('#');
        wildcardConfiguration.setDelimiter('/');
        wildcardConfiguration.setRoutingEnabled(true);
        wildcardConfiguration.setSingleWord('+');
        return wildcardConfiguration;
    }
}
