package org.apache.activemq.artemis.tests.integration.mqtt5;

import io.netty.handler.codec.mqtt.MqttPublishMessage;
import jakarta.jms.ConnectionFactory;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState;
import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ClassloadingUtil;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.class */
public class MQTT5TestSupport extends ActiveMQTestBase {
    // Protocol scheme names; TCP and WS are the two values returned by getParams().
    protected static final String TCP = "tcp";
    protected static final String WS = "ws";
    protected static final String SSL = "ssl";
    protected static final String WSS = "wss";
    // Scheme prefix used when building client server URIs; assigned only by the single-argument constructor.
    protected String protocol;
    // NOTE(review): the unit is not stated in this file — presumably milliseconds (5 minutes); confirm against callers.
    protected static final long DEFAULT_TIMEOUT = 300000;
    // Broker under test; created by configureBroker() and reset to null by stopBroker().
    protected ActiveMQServer server;
    // Port used for the MQTT acceptor and client URIs when isUseSsl() is false (constructors set 1883).
    protected int port;
    // Port used for the MQTT acceptor and client URIs when isUseSsl() is true (constructors set 8883).
    protected int sslPort;
    // Connection factory created by createJMSConnection().
    protected ConnectionFactory cf;
    // Cleared at the start of setUp(); nothing else in this class writes to it.
    protected LinkedList<Throwable> exceptions;
    // Returned by isPersistent(); never assigned in this class.
    protected boolean persistent;
    // Assigned "mqtt" by the no-argument constructor only; not read in this class.
    protected String protocolScheme;
    // Not referenced in this class — presumably a default message count for subclasses; confirm against callers.
    protected static final int NUM_MESSAGES = 250;
    // Delivery levels 0/1/2 — presumably the MQTT QoS values; confirm against callers.
    public static final int AT_MOST_ONCE = 0;
    public static final int AT_LEAST_ONCE = 1;
    public static final int EXACTLY_ONCE = 2;
    // User/password pairs registered by configureBrokerSecurity() when isSecurityEnabled() returns true.
    protected String noprivUser;
    protected String noprivPass;
    protected String createAddressUser;
    protected String createAddressPass;
    protected String browseUser;
    protected String browsePass;
    protected String guestUser;
    protected String guestPass;
    protected String fullUser;
    protected String fullPass;
    protected String noDeleteUser;
    protected String noDeletePass;

    // JUnit rule whose method name feeds getName(), getQueueName() and getTopicName().
    @Rule
    public TestName name;
    // Addresses applied to the "#" address settings in createServerForMQTT().
    protected static final SimpleString DEAD_LETTER_ADDRESS = new SimpleString("DLA");
    protected static final SimpleString EXPIRY_ADDRESS = new SimpleString("EXPIRY");
    // Logger named after the class returned by MethodHandles.lookup().lookupClass().
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /**
     * {@link MqttCallback} variant that supplies an empty default body for six
     * callback methods, so an implementation only overrides the events it needs.
     */
    protected interface DefaultMqttCallback extends MqttCallback {

        /** Does nothing by default. */
        default void disconnected(MqttDisconnectResponse disconnectResponse) {
            // intentionally empty
        }

        /** Does nothing by default. */
        default void mqttErrorOccurred(MqttException exception) {
            // intentionally empty
        }

        /** Does nothing by default. */
        default void messageArrived(String topic, MqttMessage message) throws Exception {
            // intentionally empty
        }

        /** Does nothing by default. */
        default void deliveryComplete(IMqttToken token) {
            // intentionally empty
        }

        /** Does nothing by default. */
        default void connectComplete(boolean reconnect, String serverURI) {
            // intentionally empty
        }

        /** Does nothing by default. */
        default void authPacketArrived(int reasonCode, MqttProperties properties) {
            // intentionally empty
        }
    }

    /**
     * Callback that counts a latch down once per arriving message and can be
     * told to throw from {@code messageArrived} after doing so.
     */
    protected class LatchedMqttCallback implements DefaultMqttCallback {
        CountDownLatch latch;
        boolean fail;

        /**
         * Creates a callback that never throws from {@code messageArrived}.
         *
         * @param latch latch counted down for every arriving message
         */
        public LatchedMqttCallback(CountDownLatch latch) {
            this(latch, false);
        }

        /**
         * @param latch latch counted down for every arriving message
         * @param fail  when {@code true}, {@code messageArrived} throws after counting down
         */
        public LatchedMqttCallback(CountDownLatch latch, boolean fail) {
            this.latch = latch;
            this.fail = fail;
        }

        /**
         * Prints the message to standard output, counts the latch down, and then
         * throws a plain {@link Exception} if this callback was created with
         * {@code fail == true}.
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            System.out.println("Message arrived: " + message);
            latch.countDown();
            if (!fail) {
                return;
            }
            // The latch has already been counted down at this point.
            throw new Exception();
        }
    }

    /**
     * Interceptor registered as the incoming interceptor in
     * {@code createServerForMQTT()}. It counts packets whose runtime class is
     * exactly {@link MqttPublishMessage} and never blocks a packet.
     *
     * <p>NOTE(review): the counter is a plain static {@code int} with no
     * synchronization; confirm whether {@code intercept} can run on several threads.
     */
    public static class MQTTIncomingInterceptor implements MQTTInterceptor {
        private static int messageCount = 0;

        /**
         * Increments the counter for PUBLISH packets.
         *
         * @return always {@code true}
         */
        public boolean intercept(io.netty.handler.codec.mqtt.MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
            if (packet.getClass() == MqttPublishMessage.class) {
                messageCount += 1;
            }
            return true;
        }

        /** Resets the counter to zero. */
        public static void clear() {
            messageCount = 0;
        }

        /** @return the number of PUBLISH packets counted since the last {@link #clear()} */
        public static int getMessageCount() {
            return messageCount;
        }
    }

    /**
     * Interceptor registered as the outgoing interceptor in
     * {@code createServerForMQTT()}. It counts packets whose runtime class is
     * exactly {@link MqttPublishMessage} and never blocks a packet.
     *
     * <p>NOTE(review): the counter is a plain static {@code int} with no
     * synchronization; confirm whether {@code intercept} can run on several threads.
     */
    public static class MQTTOutoingInterceptor implements MQTTInterceptor {
        private static int messageCount = 0;

        /**
         * Increments the counter for PUBLISH packets.
         *
         * @return always {@code true}
         */
        public boolean intercept(io.netty.handler.codec.mqtt.MqttMessage packet, RemotingConnection connection) throws ActiveMQException {
            if (packet.getClass() == MqttPublishMessage.class) {
                messageCount += 1;
            }
            return true;
        }

        /** Resets the counter to zero. */
        public static void clear() {
            messageCount = 0;
        }

        /** @return the number of PUBLISH packets counted since the last {@link #clear()} */
        public static int getMessageCount() {
            return messageCount;
        }
    }

    /**
     * A unit of work with no arguments and no result that is allowed to throw
     * any checked exception.
     */
    protected interface Task {
        /** Executes the work; any exception propagates to the caller. */
        void run() throws Exception;
    }

    /**
     * Supplies the parameter sets for the parameterized runner: one run with
     * {@link #TCP} and one with {@link #WS}, in that order.
     *
     * @return a fixed-size list of single-element argument arrays
     */
    @Parameterized.Parameters(name = "protocol={0}")
    public static Collection<Object[]> getParams() {
        final Object[] tcpCase = {TCP};
        final Object[] wsCase = {WS};
        return Arrays.asList(tcpCase, wsCase);
    }

    /**
     * Creates the test support with the given client protocol scheme and the
     * default ports and credentials. {@code protocolScheme} is left unset by
     * this constructor.
     *
     * @param str scheme placed in front of {@code ://localhost:<port>} when clients are created
     */
    public MQTT5TestSupport(String str) {
        this.protocol = str;

        // Ports for the plain and SSL acceptors.
        this.port = 1883;
        this.sslPort = 8883;

        // Per-instance helpers.
        this.exceptions = new LinkedList<>();
        this.name = new TestName();

        // Credentials; each pair is registered by configureBrokerSecurity().
        this.noprivUser = "noprivs";
        this.noprivPass = "noprivs";
        this.createAddressUser = "createAddress";
        this.createAddressPass = "createAddress";
        this.browseUser = "browser";
        this.browsePass = "browser";
        this.guestUser = "guest";
        this.guestPass = "guest";
        this.fullUser = "user";
        this.fullPass = "pass";
        this.noDeleteUser = "noDelete";
        this.noDeletePass = "noDelete";
    }

    /**
     * Creates a synchronous Paho MQTT 5 client with in-memory persistence,
     * pointed at {@code <protocol>://localhost:<port>}. The SSL port is used
     * when {@link #isUseSsl()} returns {@code true}, the plain port otherwise.
     *
     * @param str client identifier passed to the Paho client
     * @return the new, not yet connected client
     * @throws MqttException if the Paho client cannot be constructed
     */
    public MqttClient createPahoClient(String str) throws MqttException {
        final int brokerPort = isUseSsl() ? getSslPort() : getPort();
        final String serverUri = this.protocol + "://localhost:" + brokerPort;
        return new MqttClient(serverUri, str, new MemoryPersistence());
    }

    /**
     * Creates a synchronous Paho MQTT v3 client with in-memory persistence,
     * pointed at {@code <protocol>://localhost:<port>}. The SSL port is used
     * when {@link #isUseSsl()} returns {@code true}, the plain port otherwise.
     *
     * @param str client identifier passed to the Paho client
     * @return the new, not yet connected client
     * @throws org.eclipse.paho.client.mqttv3.MqttException if the Paho client cannot be constructed
     */
    public org.eclipse.paho.client.mqttv3.MqttClient createPaho3_1_1Client(String str) throws org.eclipse.paho.client.mqttv3.MqttException {
        final int brokerPort = isUseSsl() ? getSslPort() : getPort();
        final String serverUri = this.protocol + "://localhost:" + brokerPort;
        return new org.eclipse.paho.client.mqttv3.MqttClient(
            serverUri, str, new org.eclipse.paho.client.mqttv3.persist.MemoryPersistence());
    }

    /**
     * Creates an asynchronous Paho MQTT 5 client with in-memory persistence,
     * pointed at {@code <protocol>://localhost:<port>}. The SSL port is used
     * when {@link #isUseSsl()} returns {@code true}, the plain port otherwise.
     *
     * @param str client identifier passed to the Paho client
     * @return the new, not yet connected client
     * @throws MqttException if the Paho client cannot be constructed
     */
    public MqttAsyncClient createAsyncPahoClient(String str) throws MqttException {
        final int brokerPort = isUseSsl() ? getSslPort() : getPort();
        final String serverUri = this.protocol + "://localhost:" + brokerPort;
        return new MqttAsyncClient(serverUri, str, new MemoryPersistence());
    }

    /**
     * Creates the test support with the default ports and credentials and
     * {@code protocolScheme} set to {@code "mqtt"}. {@code protocol} is left
     * unset by this constructor.
     */
    public MQTT5TestSupport() {
        this.protocolScheme = "mqtt";

        // Ports for the plain and SSL acceptors.
        this.port = 1883;
        this.sslPort = 8883;

        // Per-instance helpers.
        this.exceptions = new LinkedList<>();
        this.name = new TestName();

        // Credentials; each pair is registered by configureBrokerSecurity().
        this.noprivUser = "noprivs";
        this.noprivPass = "noprivs";
        this.createAddressUser = "createAddress";
        this.createAddressPass = "createAddress";
        this.browseUser = "browser";
        this.browsePass = "browser";
        this.guestUser = "guest";
        this.guestPass = "guest";
        this.fullUser = "user";
        this.fullPass = "pass";
        this.noDeleteUser = "noDelete";
        this.noDeletePass = "noDelete";
    }

    /**
     * Resolves the directory two levels above the location this class was
     * loaded from.
     *
     * @return the canonical form of {@code <code-source location>/../..}
     * @throws IOException if the canonical path cannot be resolved
     */
    public File basedir() throws IOException {
        final String codeLocation = getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
        final File twoLevelsUp = new File(new File(codeLocation), "../..");
        return twoLevelsUp.getCanonicalFile();
    }

    /**
     * @return the method name reported by the {@code name} test rule
     */
    public String getName() {
        return this.name.getMethodName();
    }

    /**
     * @return the broker held in {@code server}; {@code null} before
     *         {@code configureBroker()} has run or after {@code stopBroker()} cleared it
     */
    public ActiveMQServer getServer() {
        return this.server;
    }

    /**
     * Per-test setup: empties the recorded exceptions, starts the broker and
     * creates the connection factory, in that order.
     *
     * <p>{@code super.setUp()} is not called here; it is invoked from
     * {@code configureBroker()}, which {@code startBroker()} calls first.
     *
     * @throws Exception if the broker or the connection factory cannot be set up
     */
    @Before
    public void setUp() throws Exception {
        this.exceptions.clear();
        startBroker();
        createJMSConnection();
    }

    /**
     * Per-test cleanup: stops the broker and then runs the superclass teardown.
     *
     * <p>The superclass teardown runs in a {@code finally} block so that a
     * failure while stopping the broker cannot skip it.
     *
     * @throws Exception if stopping the broker or the superclass teardown fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            stopBroker();
        } finally {
            super.tearDown();
        }
    }

    /**
     * Builds and configures the broker without starting it: runs the superclass
     * setup, creates the server, registers the CORE and MQTT acceptors, applies
     * the security configuration, installs catch-all ({@code "#"}) address
     * settings and sets the message expiry scan period to 500.
     *
     * @throws Exception if any configuration step fails
     */
    public void configureBroker() throws Exception {
        super.setUp();
        this.server = createServerForMQTT();

        addCoreConnector();
        addMQTTConnector();
        configureBrokerSecurity(this.server);

        // Catch-all settings: large size limit plus auto-creation of queues and addresses.
        final AddressSettings catchAll = new AddressSettings();
        catchAll.setMaxSizeBytes(999999999L);
        catchAll.setAutoCreateQueues(true);
        catchAll.setAutoCreateAddresses(true);
        this.server.getAddressSettingsRepository().addMatch("#", catchAll);

        this.server.getConfiguration().setMessageExpiryScanPeriod(500L);
    }

    /**
     * Applies the security configuration to the given broker.
     *
     * <p>When {@link #isSecurityEnabled()} returns {@code false}, security is
     * switched off and nothing else happens. Otherwise six users are registered,
     * each mapped to one role, the role set is installed for the catch-all match
     * {@code "#"}, and security is switched on.
     *
     * @param activeMQServer broker whose configuration, security manager and
     *                       security repository are updated
     */
    protected void configureBrokerSecurity(ActiveMQServer activeMQServer) {
        if (!isSecurityEnabled()) {
            activeMQServer.getConfiguration().setSecurityEnabled(false);
            return;
        }

        // NOTE(review): the explicit cast covers getSecurityManager() being declared with a
        // broader return type; it is redundant but harmless if the type is already exact.
        ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) activeMQServer.getSecurityManager();
        securityManager.getConfiguration().addUser(this.noprivUser, this.noprivPass);
        securityManager.getConfiguration().addRole(this.noprivUser, "nothing");
        securityManager.getConfiguration().addUser(this.createAddressUser, this.createAddressPass);
        securityManager.getConfiguration().addRole(this.createAddressUser, "createAddress");
        securityManager.getConfiguration().addUser(this.browseUser, this.browsePass);
        securityManager.getConfiguration().addRole(this.browseUser, "browser");
        securityManager.getConfiguration().addUser(this.guestUser, this.guestPass);
        securityManager.getConfiguration().addRole(this.guestUser, "guest");
        securityManager.getConfiguration().addUser(this.fullUser, this.fullPass);
        securityManager.getConfiguration().addRole(this.fullUser, "full");
        // Register the dedicated password field rather than reusing the user name as the password.
        securityManager.getConfiguration().addUser(this.noDeleteUser, this.noDeletePass);
        securityManager.getConfiguration().addRole(this.noDeleteUser, "noDelete");

        // NOTE(review): the meaning of the ten boolean flags depends on the Role constructor's
        // parameter order, which is not visible in this file — verify before editing any row.
        HashSet<Role> roles = new HashSet<>();
        roles.add(new Role("nothing", false, false, false, false, false, false, false, false, false, false));
        roles.add(new Role("browser", false, false, false, false, false, false, false, true, false, false));
        roles.add(new Role("guest", false, true, false, false, false, false, false, true, false, false));
        roles.add(new Role("full", true, true, true, true, true, true, true, true, true, true));
        roles.add(new Role("createAddress", false, false, false, false, false, false, false, false, true, false));
        roles.add(new Role("noDelete", true, true, true, false, true, false, true, true, true, true));
        activeMQServer.getSecurityRepository().addMatch("#", roles);

        activeMQServer.getConfiguration().setSecurityEnabled(true);
    }

    /**
     * Configures the broker, starts it and waits up to 10 seconds for activation.
     * The result of {@code waitForActivation} is not checked.
     *
     * @throws Exception if configuration or startup fails
     */
    public void startBroker() throws Exception {
        configureBroker();
        this.server.start();
        this.server.waitForActivation(10L, TimeUnit.SECONDS);
    }

    /**
     * Creates the connection factory stored in {@code cf}, built from a single
     * Netty connector transport configuration with the first constructor
     * argument set to {@code false}. No connection is opened here.
     *
     * @throws Exception declared for subclasses; the visible body performs only construction
     */
    public void createJMSConnection() throws Exception {
        final TransportConfiguration nettyConnector = new TransportConfiguration(ActiveMQTestBase.NETTY_CONNECTOR_FACTORY);
        final TransportConfiguration[] connectors = new TransportConfiguration[]{nettyConnector};
        this.cf = new ActiveMQConnectionFactory(false, connectors);
    }

    /**
     * Creates the broker from a default configuration extended with the two
     * counting interceptors, catch-all ({@code "#"}) dead-letter and expiry
     * addresses, and an MQTT session scan interval of 200.
     *
     * @return the created, not yet started server
     * @throws Exception if the configuration or the server cannot be created
     */
    private ActiveMQServer createServerForMQTT() throws Exception {
        final String incomingInterceptor = MQTTIncomingInterceptor.class.getName();
        final String outgoingInterceptor = MQTTOutoingInterceptor.class.getName();

        Configuration config = createDefaultConfig(true)
            .setIncomingInterceptorClassNames(Collections.singletonList(incomingInterceptor))
            .setOutgoingInterceptorClassNames(Collections.singletonList(outgoingInterceptor));

        // Route dead and expired messages for every address to the shared test addresses.
        AddressSettings catchAll = new AddressSettings();
        catchAll.setDeadLetterAddress(DEAD_LETTER_ADDRESS);
        catchAll.setExpiryAddress(EXPIRY_ADDRESS);
        config.getAddressSettings().put("#", catchAll);

        config.setMqttSessionScanInterval(200L);
        return createServer(true, config);
    }

    /**
     * Registers a Netty acceptor on port 5445, restricted to the CORE protocol,
     * in the broker configuration.
     *
     * @throws Exception declared for subclasses; the visible body only edits the configuration
     */
    protected void addCoreConnector() throws Exception {
        // Typed map instead of a raw HashMap so the parameter types are checked by the compiler.
        Map<String, Object> params = new HashMap<>();
        params.put("port", "5445");
        params.put("protocols", "CORE");
        this.server.getConfiguration().getAcceptorConfigurations().add(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params));
        logger.debug("Added CORE connector to broker");
    }

    /**
     * Registers the acceptor named {@code "MQTT"} in the broker configuration
     * and sets the connection TTL check interval to 100.
     *
     * <p>The acceptor URI uses the SSL port and key-store parameters when
     * {@link #isUseSsl()} returns {@code true}, and additionally the
     * client-auth and trust-store parameters when {@link #isMutualSsl()}
     * returns {@code true}.
     *
     * @throws Exception if the acceptor configuration cannot be added
     */
    protected void addMQTTConnector() throws Exception {
        final boolean ssl = isUseSsl();

        StringBuilder uri = new StringBuilder("tcp://localhost:");
        uri.append(ssl ? this.sslPort : this.port);
        uri.append("?protocols=MQTT;anycastPrefix=anycast:;multicastPrefix=multicast:");
        if (ssl) {
            uri.append("&sslEnabled=true&keyStorePath=server-keystore.p12&keyStorePassword=securepass");
        }
        if (isMutualSsl()) {
            uri.append("&needClientAuth=true&trustStorePath=client-ca-truststore.p12&trustStorePassword=securepass");
        }

        this.server.getConfiguration().addAcceptorConfiguration("MQTT", uri.toString());
        this.server.getConfiguration().setConnectionTtlCheckInterval(100L);
        logger.debug("Added MQTT connector to broker");
    }

    /**
     * Stops the broker if it is running and clears the {@code server} field.
     *
     * <p>Safe to call when no broker exists (never created, or already stopped
     * by an earlier call): in that case the method does nothing. A broker that
     * exists but is not started is left untouched.
     *
     * @throws Exception if the broker fails to stop
     */
    public void stopBroker() throws Exception {
        if (this.server == null) {
            // Nothing to stop; avoids a NullPointerException on a repeated call.
            return;
        }
        if (this.server.isStarted()) {
            this.server.stop();
            this.server = null;
        }
    }

    /**
     * @return the runtime class name and the current test method name joined by a dot
     */
    protected String getQueueName() {
        final String owner = getClass().getName();
        final String testMethod = this.name.getMethodName();
        return owner + "." + testMethod;
    }

    /**
     * @return the runtime class name and the current test method name joined by a dot
     */
    public String getTopicName() {
        final String owner = getClass().getName();
        final String testMethod = this.name.getMethodName();
        return owner + "." + testMethod;
    }

    /**
     * @return the value of the {@code persistent} field, which this class never assigns
     */
    public boolean isPersistent() {
        return this.persistent;
    }

    /**
     * @return the port used when {@code isUseSsl()} is {@code false}
     */
    public int getPort() {
        return this.port;
    }

    /**
     * @return the port used when {@code isUseSsl()} is {@code true}
     */
    public int getSslPort() {
        return this.sslPort;
    }

    /**
     * Decides whether {@code configureBrokerSecurity} registers users and roles
     * or switches security off.
     *
     * @return {@code false} in this class
     */
    public boolean isSecurityEnabled() {
        return false;
    }

    /**
     * Decides whether the SSL port and key-store parameters are used for the
     * MQTT acceptor and whether clients are pointed at the SSL port.
     *
     * @return {@code false} in this class
     */
    public boolean isUseSsl() {
        return false;
    }

    /**
     * Decides whether client-auth parameters are added to the MQTT acceptor and
     * a client key store is added to the SSL connect options.
     *
     * @return {@code false} in this class
     */
    public boolean isMutualSsl() {
        return false;
    }

    /**
     * @return the session states reported by the MQTT protocol manager, or an
     *         empty map when no MQTT protocol manager is available
     */
    public Map<String, MQTTSessionState> getSessionStates() {
        final MQTTProtocolManager manager = getProtocolManager();
        if (manager != null) {
            return manager.getSessionStates();
        }
        return Collections.emptyMap();
    }

    /**
     * Asks the MQTT protocol manager to scan its sessions.
     *
     * <p>The result of {@code getProtocolManager()} is dereferenced without a
     * check, so this throws {@link NullPointerException} when that method
     * returns {@code null}.
     */
    public void scanSessions() {
        getProtocolManager().scanSessions();
    }

    /**
     * Looks up the MQTT protocol manager of the acceptor named {@code "MQTT"}.
     *
     * @return the protocol manager, or {@code null} when the acceptor is
     *         missing, is not an {@link AbstractAcceptor}, or has no
     *         {@link MQTTProtocolManager} under the key {@code "MQTT"}
     */
    public MQTTProtocolManager getProtocolManager() {
        // Held as Object so the instanceof checks below do the narrowing, rather than
        // assignments to already-narrowed variables that make the checks vacuous.
        Object acceptor = this.server.getRemotingService().getAcceptor("MQTT");
        if (!(acceptor instanceof AbstractAcceptor)) {
            return null;
        }
        Object manager = ((AbstractAcceptor) acceptor).getProtocolMap().get("MQTT");
        if (manager instanceof MQTTProtocolManager) {
            return (MQTTProtocolManager) manager;
        }
        return null;
    }

    /**
     * Returns the queue of the first binding registered for the given address.
     *
     * <p>Lookup failures are logged and reported as {@code null}; they are not
     * propagated to the caller.
     *
     * @param str address whose bindings are inspected
     * @return the first binding's queue, or {@code null} when the address has
     *         no bindings or the lookup fails (including when the first binding
     *         is not a {@link LocalQueueBinding})
     */
    public Queue getSubscriptionQueue(String str) {
        try {
            Object[] bindings = this.server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(str)).getBindings().toArray();
            if (bindings.length == 0) {
                return null;
            }
            return ((LocalQueueBinding) bindings[0]).getQueue();
        } catch (Exception e) {
            // Best-effort lookup: record the failure through the class logger instead of stderr.
            logger.warn("Failed to look up subscription queue for address {}", str, e);
            return null;
        }
    }

    /**
     * Delegates to the three-argument overload with a {@code null} third
     * argument, so the queue-name prefix used for matching is {@code str2}.
     *
     * @param str  address passed to the binding lookup
     * @param str2 prefix the queue name must start with
     * @return the result of the three-argument overload
     */
    public Queue getSubscriptionQueue(String str, String str2) {
        return getSubscriptionQueue(str, str2, null);
    }

    /**
     * Finds the first queue among the bindings matching the given address whose
     * name starts with a prefix: {@code str3} when it is non-null, otherwise
     * {@code str2}.
     *
     * <p>Bindings that are not {@link LocalQueueBinding}s carry no queue and are
     * skipped. Lookup failures are logged and reported as {@code null}.
     *
     * @param str  address passed to the matching-bindings lookup
     * @param str2 queue-name prefix used when {@code str3} is {@code null}
     * @param str3 queue-name prefix that takes precedence when non-null
     * @return the first matching queue, or {@code null} when none matches or the lookup fails
     */
    public Queue getSubscriptionQueue(String str, String str2, String str3) {
        try {
            // The prefix does not change between bindings, so convert it once.
            SimpleString prefix = SimpleString.toSimpleString(str3 != null ? str3 : str2);
            for (Object candidate : this.server.getPostOffice().getMatchingBindings(SimpleString.toSimpleString(str))) {
                if (!(candidate instanceof LocalQueueBinding)) {
                    continue;
                }
                Queue queue = ((LocalQueueBinding) candidate).getQueue();
                if (queue.getName().startsWith(prefix)) {
                    return queue;
                }
            }
            return null;
        } catch (Exception e) {
            // Best-effort lookup: record the failure through the class logger instead of stderr.
            logger.warn("Failed to look up subscription queue for address {}", str, e);
            return null;
        }
    }

    /**
     * Stops the acceptor named {@code "MQTT"} and then creates and starts a new
     * acceptor under the same name, whose URI carries the given parameter
     * string after {@code protocols=MQTT;}. The plain {@code port} is always
     * used, regardless of {@code isUseSsl()}.
     *
     * @param str text appended to the acceptor URI after {@code "?protocols=MQTT;"}
     * @throws Exception if the acceptor cannot be stopped, created or started
     */
    public void setAcceptorProperty(String str) throws Exception {
        final String acceptorUri = "tcp://localhost:" + this.port + "?protocols=MQTT;" + str;

        this.server.getRemotingService().getAcceptor("MQTT").stop();
        this.server.getRemotingService().createAcceptor("MQTT", acceptorUri).start();
    }

    /**
     * Builds Paho connection options carrying SSL properties: always the trust
     * store and its password, plus the client key store and its password when
     * {@link #isMutualSsl()} returns {@code true}.
     *
     * @return new connection options with the SSL properties set
     */
    public MqttConnectionOptions getSslMqttConnectOptions() {
        final Properties sslProperties = new Properties();
        final String trustStorePath = ClassloadingUtil.findResource("server-ca-truststore.p12").getPath();
        sslProperties.setProperty("com.ibm.ssl.trustStore", trustStorePath);
        sslProperties.setProperty("com.ibm.ssl.trustStorePassword", "securepass");

        if (isMutualSsl()) {
            // Mutual TLS: the client presents its own key store as well.
            final String keyStorePath = ClassloadingUtil.findResource("client-keystore.p12").getPath();
            sslProperties.setProperty("com.ibm.ssl.keyStore", keyStorePath);
            sslProperties.setProperty("com.ibm.ssl.keyStorePassword", "securepass");
        }

        final MqttConnectionOptions options = new MqttConnectionOptions();
        options.setSSLProperties(sslProperties);
        return options;
    }
}
