package org.apache.activemq.artemis.tests.integration.amqp.connect;

import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.transport.amqp.client.sasl.PlainMechanism;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.class */
public class AMQPMirrorConnectionTest extends AmqpClientTestSupport {
    // Class logger; the owning class is resolved through the method-handle lookup instead of a class literal.
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    // Port passed to createServer(); the JMS clients in this class connect to "tcp://localhost:5673", which must stay in sync with it.
    private static final int BROKER_PORT_NUM = 5673;

    /**
     * Creates the broker under test on the fixed test port {@code BROKER_PORT_NUM}.
     *
     * @return the server produced by the two-argument {@code createServer} overload
     * @throws Exception if that overload fails
     */
    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport
    protected ActiveMQServer createServer() throws Exception {
        // NOTE(review): the 'false' flag presumably defers startup, since every test calls
        // server.start() itself - confirm against the base class.
        final ActiveMQServer broker = createServer(BROKER_PORT_NUM, false);
        return broker;
    }

    /**
     * Names the protocols the test broker is configured with.
     *
     * @return the comma-separated protocol list {@code "AMQP,CORE"}
     */
    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport
    protected String getConfiguredProtocols() {
        // Both protocols are needed: tests in this class open JMS clients with "AMQP" and with "CORE".
        final String protocols = "AMQP,CORE";
        return protocols;
    }

    /**
     * Verifies that a mirror broker connection configured without credentials performs an
     * anonymous SASL connect against the scripted peer and attaches the mirror sender link
     * with the mirror and core-tunneling desired capabilities.
     *
     * @throws Exception if the broker fails to start or the peer script does not complete
     */
    @Timeout(20)
    @Test
    public void testBrokerMirrorConnectsWithAnonymous() throws Exception {
        final Map<String, Object> brokerProperties = new HashMap<>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");

        // try-with-resources closes the scripted peer on every exit path and attaches any
        // close() failure to the primary exception as suppressed.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            final String[] mirrorCapabilities = {"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()};

            // Script the remote side: SASL handshake, Open, Begin, then the mirror sender attach.
            peer.expectSASLAnonymousConnect(new String[]{PlainMechanism.MECH_NAME, "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender()
                .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
                .withDesiredCapabilities(mirrorCapabilities)
                .respond()
                .withOfferedCapabilities(mirrorCapabilities)
                .withPropertiesMap(brokerProperties);
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPBrokerConnectConfiguration amqpConnection =
                new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.stop();
        }
    }

    /**
     * Verifies that a mirror broker connection configured with user "user" and password "pass"
     * performs a SASL PLAIN connect with those credentials against the scripted peer and then
     * attaches the mirror sender link.
     *
     * @throws Exception if the broker fails to start or the peer script does not complete
     */
    @Timeout(20)
    @Test
    public void testBrokerMirrorConnectsWithPlain() throws Exception {
        final Map<String, Object> brokerProperties = new HashMap<>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");

        // try-with-resources closes the scripted peer on every exit path and attaches any
        // close() failure to the primary exception as suppressed.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            final String[] mirrorCapabilities = {"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()};

            // Script the remote side: SASL PLAIN handshake, Open, Begin, then the mirror sender attach.
            peer.expectSASLPlainConnect("user", "pass", new String[]{PlainMechanism.MECH_NAME, "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender()
                .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
                .withDesiredCapabilities(mirrorCapabilities)
                .respond()
                .withOfferedCapabilities(mirrorCapabilities)
                .withPropertiesMap(brokerProperties);
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPBrokerConnectConfiguration amqpConnection =
                new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.setUser("user");
            amqpConnection.setPassword("pass");
            amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.stop();
        }
    }

    /**
     * Verifies the broker's reaction when the remote peer answers the mirror sender attach
     * without offering any capabilities: the script allows an optional Close carrying the
     * connection-forced error and then requires the connection to drop.
     *
     * @throws Exception if the broker fails to start or the peer script does not complete
     */
    @Timeout(20)
    @Test
    public void testBrokerHandlesSenderLinkOmitsMirrorCapability() throws Exception {
        // try-with-resources closes the scripted peer on every exit path and attaches any
        // close() failure to the primary exception as suppressed.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect(new String[]{PlainMechanism.MECH_NAME, "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            // The attach response deliberately carries no offered capabilities.
            peer.expectAttach().ofSender()
                .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
                .withDesiredCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()})
                .respond();
            peer.expectClose().withError(AmqpSupport.CONNECTION_FORCED.toString()).optional();
            peer.expectConnectionToDrop();
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPBrokerConnectConfiguration amqpConnection =
                new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.stop();
        }
    }

    /**
     * Verifies that adding an address and a durable queue named "sometest" on a broker whose
     * mirror element has queue creation enabled (address filter "sometest") results in two
     * transfers arriving at the scripted peer.
     *
     * @throws Exception if the broker fails to start or the peer script does not complete
     */
    @Timeout(20)
    @Test
    public void testBrokerAddsAddressAndQueue() throws Exception {
        final Map<String, Object> brokerProperties = new HashMap<>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");

        // try-with-resources closes the scripted peer on every exit path and attaches any
        // close() failure to the primary exception as suppressed.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            final String[] mirrorCapabilities = {"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()};

            peer.expectSASLPlainConnect("user", "pass", new String[]{PlainMechanism.MECH_NAME, "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender()
                .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
                .withDesiredCapabilities(mirrorCapabilities)
                .respond()
                .withOfferedCapabilities(mirrorCapabilities)
                .withPropertiesMap(brokerProperties);
            // Grant link credit so the broker is allowed to send.
            peer.remoteFlow().withLinkCredit(10L).queue();
            // NOTE(review): presumably one transfer for the address and one for the queue - confirm.
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPBrokerConnectConfiguration amqpConnection =
                new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.setUser("user");
            amqpConnection.setPassword("pass");
            amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setQueueCreation(true).setAddressFilter("sometest"));
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            // Create the address and queue only after the broker (and its mirror) is running.
            this.server.addAddressInfo(new AddressInfo("sometest").setAutoCreated(false));
            this.server.createQueue(QueueConfiguration.of("sometest").setDurable(true));

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.stop();
        }
    }

    /**
     * Verifies that creating (and immediately closing) a durable topic subscription through an
     * AMQP JMS client on a broker with mirror queue creation enabled results in three transfers
     * arriving at the scripted peer.
     *
     * @throws Exception if the broker fails to start, the JMS client fails, or the peer script
     *                   does not complete
     */
    @Timeout(20)
    @Test
    public void testCreateDurableConsumerReplicatesAddressAndQueue() throws Exception {
        final Map<String, Object> brokerProperties = new HashMap<>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");

        // try-with-resources closes the scripted peer on every exit path and attaches any
        // close() failure to the primary exception as suppressed.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            final String[] mirrorCapabilities = {"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()};

            peer.expectSASLPlainConnect("user", "pass", new String[]{PlainMechanism.MECH_NAME, "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender()
                .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
                .withDesiredCapabilities(mirrorCapabilities)
                .respond()
                .withOfferedCapabilities(mirrorCapabilities)
                .withPropertiesMap(brokerProperties);
            // Grant link credit so the broker is allowed to send.
            peer.remoteFlow().withLinkCredit(10L).queue();
            // NOTE(review): presumably mirror control messages caused by the subscription's
            // address and queue being created - confirm what each of the three carries.
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPBrokerConnectConfiguration amqpConnection =
                new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.setUser("user");
            amqpConnection.setPassword("pass");
            amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setQueueCreation(true));
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            // The client is now closed even when the subscription setup throws; previously a
            // failure here leaked the connection. The port must match BROKER_PORT_NUM.
            try (Connection connection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5673").createConnection()) {
                connection.setClientID("test-client-id");
                // Non-transacted session; 2 is the value of Session.CLIENT_ACKNOWLEDGE.
                final Session session = connection.createSession(false, 2);
                session.createDurableConsumer(session.createTopic("test-topic"), "subscription").close();
            }

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.stop();
        }
    }

    /**
     * Runs the shared core-tunneling capability scenario with tunneling switched on.
     *
     * @throws Exception if the shared scenario fails
     */
    @Timeout(20)
    @Test
    public void testBrokerMirrorHonorsCoreTunnelingEnable() throws Exception {
        final boolean tunnelingEnabled = true;
        testBrokerMirrorHonorsCoreTunnelingEnableOrDisable(tunnelingEnabled);
    }

    /**
     * Runs the shared core-tunneling capability scenario with tunneling switched off.
     *
     * @throws Exception if the shared scenario fails
     */
    @Timeout(20)
    @Test
    public void testBrokerMirrorHonorsCoreTunnelingDisable() throws Exception {
        final boolean tunnelingEnabled = false;
        testBrokerMirrorHonorsCoreTunnelingEnableOrDisable(tunnelingEnabled);
    }

    /**
     * Shared scenario: configures the mirror element's "tunnel-core-messages" property from the
     * flag and checks that the desired capabilities on the mirror sender attach match it. The
     * core-tunneling capability is expected only when the flag is {@code true}.
     *
     * @param z {@code true} to enable core message tunneling on the mirror, {@code false} to disable it
     * @throws Exception if the broker fails to start or the peer script does not complete
     */
    public void testBrokerMirrorHonorsCoreTunnelingEnableOrDisable(boolean z) throws Exception {
        final boolean tunneling = z;

        final Map<String, Object> brokerProperties = new HashMap<>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");

        final String[] expectedCapabilities;
        if (tunneling) {
            expectedCapabilities = new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()};
        } else {
            expectedCapabilities = new String[]{"amq.mirror"};
        }

        // try-with-resources closes the scripted peer on every exit path and attaches any
        // close() failure to the primary exception as suppressed.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLPlainConnect("user", "pass", new String[]{PlainMechanism.MECH_NAME, "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender()
                .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
                .withDesiredCapabilities(expectedCapabilities)
                .respond()
                .withOfferedCapabilities(expectedCapabilities)
                .withPropertiesMap(brokerProperties);
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPMirrorBrokerConnectionElement mirror = new AMQPMirrorBrokerConnectionElement();
            mirror.addProperty("tunnel-core-messages", Boolean.toString(tunneling));

            final AMQPBrokerConnectConfiguration amqpConnection =
                new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.setUser("user");
            amqpConnection.setPassword("pass");
            amqpConnection.addElement(mirror);
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.stop();
        }
    }

    /**
     * Runs the shared producer-mirroring scenario with core message tunneling enabled.
     *
     * @throws Exception if the shared scenario fails
     */
    @Timeout(20)
    @Test
    public void testProducerMessageIsMirroredWithCoreTunnelingUsesCoreMessageFormat() throws Exception {
        final boolean tunnelingEnabled = true;
        doTestProducerMessageIsMirroredWithCorrectMessageFormat(tunnelingEnabled);
    }

    /**
     * Runs the shared producer-mirroring scenario with core message tunneling disabled.
     *
     * @throws Exception if the shared scenario fails
     */
    @Timeout(20)
    @Test
    public void testProducerMessageIsMirroredWithoutCoreTunnelingUsesDefaultMessageFormat() throws Exception {
        final boolean tunnelingEnabled = false;
        doTestProducerMessageIsMirroredWithCorrectMessageFormat(tunnelingEnabled);
    }

    /**
     * Shared scenario: sends one persistent text message to "myQueue" through a CORE JMS client
     * and checks the message format of the fourth transfer that reaches the scripted peer.
     * With tunneling enabled the expected format is 1183580416, otherwise 0.
     *
     * @param z {@code true} to enable core message tunneling on the mirror, {@code false} to disable it
     * @throws Exception if the broker fails to start, the JMS client fails, or the peer script
     *                   does not complete
     */
    private void doTestProducerMessageIsMirroredWithCorrectMessageFormat(boolean z) throws Exception {
        final boolean tunneling = z;

        final Map<String, Object> brokerProperties = new HashMap<>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");

        final String[] expectedCapabilities;
        final int expectedMessageFormat;
        if (tunneling) {
            expectedCapabilities = new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()};
            // NOTE(review): presumably the AMQP message-format value used for tunneled Core
            // messages - confirm against the constant in AmqpSupport.
            expectedMessageFormat = 1183580416;
        } else {
            expectedCapabilities = new String[]{"amq.mirror"};
            expectedMessageFormat = 0;
        }

        // try-with-resources closes the scripted peer on every exit path and attaches any
        // close() failure to the primary exception as suppressed.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLPlainConnect("user", "pass", new String[]{PlainMechanism.MECH_NAME, "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender()
                .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
                .withDesiredCapabilities(expectedCapabilities)
                .respond()
                .withOfferedCapabilities(expectedCapabilities)
                .withPropertiesMap(brokerProperties);
            // Grant link credit so the broker is allowed to send.
            peer.remoteFlow().withLinkCredit(10L).queue();
            // NOTE(review): the first three transfers are accepted without inspection; presumably
            // mirror control messages preceding the produced message - confirm.
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            // The fourth transfer is the one whose message format is checked.
            peer.expectTransfer().withMessageFormat(expectedMessageFormat).accept();
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPMirrorBrokerConnectionElement mirror = new AMQPMirrorBrokerConnectionElement();
            mirror.addProperty("tunnel-core-messages", Boolean.toString(tunneling));
            mirror.setQueueCreation(true);

            final AMQPBrokerConnectConfiguration amqpConnection =
                new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.setUser("user");
            amqpConnection.setPassword("pass");
            amqpConnection.addElement(mirror);

            // Statement order kept from the original: queue creation is requested before start().
            this.server.createQueue(QueueConfiguration.of("myQueue").setDurable(true));
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            // The client is now closed even when producing throws; previously a failure here
            // leaked the connection. The port must match BROKER_PORT_NUM.
            try (Connection connection = CFUtil.createConnectionFactory("CORE", "tcp://localhost:5673").createConnection()) {
                // Non-transacted session; 2 is the value of Session.CLIENT_ACKNOWLEDGE.
                final Session session = connection.createSession(false, 2);
                final Queue queue = session.createQueue("myQueue");
                final MessageConsumer consumer = session.createConsumer(queue);
                final MessageProducer producer = session.createProducer(queue);
                final TextMessage message = session.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME);

                connection.start();
                // 2 is the value of DeliveryMode.PERSISTENT.
                producer.setDeliveryMode(2);
                producer.send(message);
                consumer.close();
            }

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.stop();
        }
    }

    /**
     * Verifies that when the mirror is configured with "tunnel-core-messages" = true but the
     * remote peer offers only the "amq.mirror" capability, a message produced through a CORE
     * JMS client reaches the peer with message format 0.
     *
     * @throws Exception if the broker fails to start, the JMS client fails, or the peer script
     *                   does not complete
     */
    @Timeout(20)
    @Test
    public void testRemoteDoesNotOfferTunnelingResultsInDefaultAMQPFormattedMessages() throws Exception {
        final Map<String, Object> brokerProperties = new HashMap<>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");

        // try-with-resources closes the scripted peer on every exit path and attaches any
        // close() failure to the primary exception as suppressed.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLPlainConnect("user", "pass", new String[]{PlainMechanism.MECH_NAME, "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            // The broker desires tunneling, but the peer's answer offers the mirror capability only.
            peer.expectAttach().ofSender()
                .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
                .withDesiredCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()})
                .respond()
                .withOfferedCapabilities(new String[]{"amq.mirror"})
                .withPropertiesMap(brokerProperties);
            // Grant link credit so the broker is allowed to send.
            peer.remoteFlow().withLinkCredit(10L).queue();
            // NOTE(review): the first three transfers are accepted without inspection; presumably
            // mirror control messages preceding the produced message - confirm.
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            // The fourth transfer is the one whose message format is checked.
            peer.expectTransfer().withMessageFormat(0).accept();
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPMirrorBrokerConnectionElement mirror = new AMQPMirrorBrokerConnectionElement();
            mirror.addProperty("tunnel-core-messages", Boolean.toString(true));
            mirror.setQueueCreation(true);

            final AMQPBrokerConnectConfiguration amqpConnection =
                new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.setUser("user");
            amqpConnection.setPassword("pass");
            amqpConnection.addElement(mirror);

            // Statement order kept from the original: queue creation is requested before start().
            this.server.createQueue(QueueConfiguration.of("myQueue").setDurable(true));
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            // The client is now closed even when producing throws; previously a failure here
            // leaked the connection. The port must match BROKER_PORT_NUM.
            try (Connection connection = CFUtil.createConnectionFactory("CORE", "tcp://localhost:5673").createConnection()) {
                // Non-transacted session; 2 is the value of Session.CLIENT_ACKNOWLEDGE.
                final Session session = connection.createSession(false, 2);
                final Queue queue = session.createQueue("myQueue");
                final MessageConsumer consumer = session.createConsumer(queue);
                final MessageProducer producer = session.createProducer(queue);
                final TextMessage message = session.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME);

                connection.start();
                // 2 is the value of DeliveryMode.PERSISTENT.
                producer.setDeliveryMode(2);
                producer.send(message);
                consumer.close();
            }

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.stop();
        }
    }

    /**
     * Verifies that when the mirror is configured with "tunnel-core-messages" = false, the
     * broker desires only the "amq.mirror" capability and, even though the remote peer offers
     * core tunneling, a message produced through a CORE JMS client reaches the peer with
     * message format 0.
     *
     * @throws Exception if the broker fails to start, the JMS client fails, or the peer script
     *                   does not complete
     */
    @Timeout(20)
    @Test
    public void testTunnelingDisabledButRemoteOffersDoesNotUseTunneling() throws Exception {
        final Map<String, Object> brokerProperties = new HashMap<>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");

        // try-with-resources closes the scripted peer on every exit path and attaches any
        // close() failure to the primary exception as suppressed.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLPlainConnect("user", "pass", new String[]{PlainMechanism.MECH_NAME, "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            // The broker desires the mirror capability only; the peer nevertheless offers tunneling.
            peer.expectAttach().ofSender()
                .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
                .withDesiredCapabilities(new String[]{"amq.mirror"})
                .respond()
                .withOfferedCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()})
                .withPropertiesMap(brokerProperties);
            // Grant link credit so the broker is allowed to send.
            peer.remoteFlow().withLinkCredit(10L).queue();
            // NOTE(review): the first three transfers are accepted without inspection; presumably
            // mirror control messages preceding the produced message - confirm.
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            // The fourth transfer is the one whose message format is checked.
            peer.expectTransfer().withMessageFormat(0).accept();
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPMirrorBrokerConnectionElement mirror = new AMQPMirrorBrokerConnectionElement();
            mirror.addProperty("tunnel-core-messages", Boolean.toString(false));
            mirror.setQueueCreation(true);

            final AMQPBrokerConnectConfiguration amqpConnection =
                new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.setUser("user");
            amqpConnection.setPassword("pass");
            amqpConnection.addElement(mirror);

            // Statement order kept from the original: queue creation is requested before start().
            this.server.createQueue(QueueConfiguration.of("myQueue").setDurable(true));
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            // The client is now closed even when producing throws; previously a failure here
            // leaked the connection. The port must match BROKER_PORT_NUM.
            try (Connection connection = CFUtil.createConnectionFactory("CORE", "tcp://localhost:5673").createConnection()) {
                // Non-transacted session; 2 is the value of Session.CLIENT_ACKNOWLEDGE.
                final Session session = connection.createSession(false, 2);
                final Queue queue = session.createQueue("myQueue");
                final MessageConsumer consumer = session.createConsumer(queue);
                final MessageProducer producer = session.createProducer(queue);
                final TextMessage message = session.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME);

                connection.start();
                // 2 is the value of DeliveryMode.PERSISTENT.
                producer.setDeliveryMode(2);
                producer.send(message);
                consumer.close();
            }

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.stop();
        }
    }

    /**
     * Verifies that replacing the broker's AMQP connection configuration at runtime (same name,
     * different credentials and a non-durable mirror element) and calling
     * {@code updateProtocolServices} does not require a new connection to the scripted peer:
     * no second connect is scripted, and a message produced afterwards is still expected as a
     * transfer with message format 0.
     *
     * @throws Exception if the broker fails to start, the JMS client fails, or the peer script
     *                   does not complete
     */
    @Timeout(20)
    @Test
    public void testMirrorConnectionRemainsUnchangedAfterConfigurationUpdate() throws Exception {
        final Map<String, Object> brokerProperties = new HashMap<>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");

        // try-with-resources closes the scripted peer on every exit path and attaches any
        // close() failure to the primary exception as suppressed.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            final String[] mirrorCapabilities = {"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()};

            peer.expectSASLPlainConnect("user", "pass", new String[]{PlainMechanism.MECH_NAME, "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender()
                .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
                .withDesiredCapabilities(mirrorCapabilities)
                .respond()
                .withOfferedCapabilities(mirrorCapabilities)
                .withPropertiesMap(brokerProperties);
            // Grant link credit so the broker is allowed to send.
            peer.remoteFlow().withLinkCredit(10L).queue();
            // NOTE(review): presumably mirror control messages caused by creating "myQueue" below - confirm.
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPMirrorBrokerConnectionElement mirror = new AMQPMirrorBrokerConnectionElement();
            mirror.setQueueCreation(true);
            mirror.setDurable(true);
            mirror.setName(AutoCreateJmsDestinationTest.QUEUE_NAME);

            final AMQPBrokerConnectConfiguration amqpConnection =
                new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.setUser("user");
            amqpConnection.setPassword("pass");
            amqpConnection.addElement(mirror);
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            // The client is now closed even when a later step throws; previously a failure here
            // leaked the connection. The port must match BROKER_PORT_NUM.
            try (Connection connection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5673").createConnection()) {
                // Non-transacted session; 2 is the value of Session.CLIENT_ACKNOWLEDGE.
                final Session session = connection.createSession(false, 2);
                final Queue queue = session.createQueue("myQueue");
                final MessageConsumer consumer = session.createConsumer(queue);
                final MessageProducer producer = session.createProducer(queue);
                final TextMessage message = session.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME);

                // Wait for the initial script (connect, attach, three transfers) before changing the configuration.
                peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

                // Replacement configuration: same connection and element names, but a
                // non-durable mirror and different credentials.
                final AMQPMirrorBrokerConnectionElement updatedMirror = new AMQPMirrorBrokerConnectionElement();
                updatedMirror.setQueueCreation(true);
                updatedMirror.setDurable(false);
                updatedMirror.setName(AutoCreateJmsDestinationTest.QUEUE_NAME);

                final AMQPBrokerConnectConfiguration updatedConnection =
                    new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
                updatedConnection.setReconnectAttempts(0);
                updatedConnection.setUser("user1");
                updatedConnection.setPassword("pass1");
                updatedConnection.addElement(updatedMirror);

                final ProtonProtocolManagerFactory amqpFactory =
                    (ProtonProtocolManagerFactory) this.server.getRemotingService().getProtocolFactoryMap().get("AMQP");
                Assertions.assertNotNull(amqpFactory);

                // Swap the configuration and ask the AMQP protocol factory to apply it.
                this.server.getConfiguration().clearAMQPConnectionConfigurations();
                this.server.getConfiguration().addAMQPConnection(updatedConnection);
                amqpFactory.updateProtocolServices(this.server, Collections.emptyList());

                peer.waitForScriptToComplete();
                // Only now script the transfer for the message produced below.
                peer.expectTransfer().withMessageFormat(0).accept();

                // 2 is the value of DeliveryMode.PERSISTENT.
                producer.setDeliveryMode(2);
                producer.send(message);
                consumer.close();

                peer.waitForScriptToComplete();
            }

            this.server.stop();
        }
    }

    /**
     * Starts the broker with a mirror broker connection pointing at a scripted AMQP test peer,
     * then clears the broker's AMQP connection configurations, asks the AMQP protocol factory to
     * update its protocol services, and finally sends a message to {@code myQueue} while the peer
     * is scripted to expect one more transfer on the link that was attached earlier.
     *
     * <p>Both the test peer and the JMS connection are managed with try-with-resources so they are
     * released even when an expectation or a send fails part-way through the test.
     *
     * @throws Exception if the broker, the JMS client, or the scripted peer reports a failure
     */
    @Timeout(20)
    @Test
    public void testMirrorConnectionRemainsUnchangedAfterConfigurationRemoved() throws Exception {
        final HashMap<String, Object> brokerProperties = new HashMap<>();
        brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker");

        try (ProtonTestServer peer = new ProtonTestServer()) {
            // Script the handshake and the mirror sender attach the broker is expected to perform.
            peer.expectSASLPlainConnect("user", "pass", new String[]{PlainMechanism.MECH_NAME, "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender()
                .withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
                .withDesiredCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()})
                .respond()
                .withOfferedCapabilities(new String[]{"amq.mirror", AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()})
                .withPropertiesMap(brokerProperties);
            peer.remoteFlow().withLinkCredit(10L).queue();
            // NOTE(review): presumably the mirror control messages produced while the address,
            // queue and consumer below are created - confirm against the mirror source.
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.expectTransfer().accept();
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPMirrorBrokerConnectionElement mirror = new AMQPMirrorBrokerConnectionElement();
            mirror.setQueueCreation(true);
            mirror.setDurable(true);
            mirror.setName(AutoCreateJmsDestinationTest.QUEUE_NAME);

            final AMQPBrokerConnectConfiguration amqpConnection =
                new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.setUser("user");
            amqpConnection.setPassword("pass");
            amqpConnection.addElement(mirror);

            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            try (Connection connection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5673").createConnection()) {
                final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                final Queue queue = session.createQueue("myQueue");
                final MessageConsumer consumer = session.createConsumer(queue);
                final MessageProducer producer = session.createProducer(queue);
                final TextMessage message = session.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME);

                peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

                final ProtonProtocolManagerFactory amqpFactory =
                    (ProtonProtocolManagerFactory) this.server.getRemotingService().getProtocolFactoryMap().get("AMQP");
                Assertions.assertNotNull(amqpFactory);

                // Remove every AMQP broker connection from the configuration and apply the update.
                this.server.getConfiguration().clearAMQPConnectionConfigurations();
                amqpFactory.updateProtocolServices(this.server, Collections.emptyList());

                peer.waitForScriptToComplete();
                // Script the next transfer before sending, so the peer is ready for the message.
                peer.expectTransfer().withMessageFormat(0).accept();

                // 2 is jakarta.jms.DeliveryMode.PERSISTENT; the literal is kept because
                // DeliveryMode is not imported by this file.
                producer.setDeliveryMode(2);
                producer.send(message);
                consumer.close();

                peer.waitForScriptToComplete();
            }

            this.server.stop();
        }
    }
}
