package org.apache.activemq.artemis.tests.integration.amqp.connect;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Session;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.ActiveMQServerAMQPFederationPlugin;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.sasl.PlainMechanism;
import org.apache.activemq.transport.netty.NettyTransportOptions;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotationsMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.transport.TransferPayloadCompositeMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpValueMatcher;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.jgroups.util.UUID;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for AMQP federation broker connections, driven by scripted Proton test peers.
 *
 * <p>Decompiled from
 * {@code org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.class}.
 */
public class AMQPFederationConnectTest extends AmqpClientTestSupport {
    // Class logger; the owning class is resolved through MethodHandles.lookup() instead of a
    // hard-coded class literal.
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /**
     * Creates the broker used by every test in this class by delegating to the two-argument
     * {@code createServer} overload with the default TCP port constant.
     *
     * @return the server instance produced by the delegate
     * @throws Exception if the delegate fails to create the server
     */
    @Override
    protected ActiveMQServer createServer() throws Exception {
        // NOTE(review): the second argument is presumably a "start immediately" flag (the tests
        // below call server.start() themselves) -- confirm against AmqpClientTestSupport.
        final boolean secondArgument = false;

        return createServer(NettyTransportOptions.DEFAULT_TCP_PORT, secondArgument);
    }

    /**
     * Scripts a test peer that expects an anonymous SASL connect followed by Open and Begin,
     * points a broker connection (with zero reconnect attempts) at that peer, starts the
     * broker, and waits up to five seconds for the scripted exchange to complete.
     *
     * @throws Exception if the peer or broker fails, or the script does not complete
     */
    @Timeout(20)
    @Test
    public void testBrokerConnectsWithAnonymous() throws Exception {
        // try-with-resources closes the peer exactly once on every exit path.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect(PlainMechanism.MECH_NAME, "ANONYMOUS");
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(
                getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);

            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
        }
    }

    /**
     * Scripts a test peer that expects a SASL PLAIN connect with the credentials
     * {@code user}/{@code pass} followed by Open and Begin, configures a broker connection
     * with those credentials and zero reconnect attempts, starts the broker, and waits up to
     * five seconds for the scripted exchange to complete.
     *
     * @throws Exception if the peer or broker fails, or the script does not complete
     */
    @Timeout(20)
    @Test
    public void testFederatedBrokerConnectsWithPlain() throws Exception {
        // try-with-resources closes the peer exactly once on every exit path.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLPlainConnect("user", "pass", PlainMechanism.MECH_NAME, "ANONYMOUS");
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(
                getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            // Credentials are set after registration, as in the original; the broker has not
            // been started yet at this point.
            amqpConnection.setUser("user");
            amqpConnection.setPassword("pass");

            this.server.start();

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
        }
    }

    /**
     * Verifies that a broker connection carrying a federation element attaches a control
     * link sender to the remote peer.
     *
     * <p>The scripted peer expects the attach to desire the federation control link
     * capability, to use a link name containing both {@code "federation-"} and
     * {@code "myFederation"}, to carry the expected federation configuration map as a link
     * property, and to request a dynamic target with the {@code "temporary-topic"}
     * capability. After the script completes the test waits for the queue
     * {@code $ACTIVEMQ_ARTEMIS_FEDERATION.control.test-control-address} to be locatable on
     * the broker.
     *
     * @throws Exception if the peer or broker fails, or an expectation is not met
     */
    @Timeout(20)
    @Test
    public void testFederationConfiguredCreatesControlLink() throws Exception {
        // Configuration map the control link attach is expected to carry as a property.
        final HashMap<String, Object> federationConfiguration = new HashMap<>();
        federationConfiguration.put("amqpCredits", 100);
        federationConfiguration.put("amqpLowCredits", 50);
        federationConfiguration.put("amqpPullConsumerCredits", 50);
        federationConfiguration.put("minLargeMessageSize", 10000);
        federationConfiguration.put("attach-timeout", 60);
        federationConfiguration.put("ignoreQueueConsumerFilters", false);
        federationConfiguration.put("ignoreQueueConsumerPriorities", false);
        federationConfiguration.put("tunnel-core-messages", false);

        // try-with-resources closes the peer exactly once on every exit path.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect(PlainMechanism.MECH_NAME, "ANONYMOUS");
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender()
                .withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString())
                .withName(CoreMatchers.allOf(
                    CoreMatchers.containsString("federation-"),
                    CoreMatchers.containsString("myFederation")))
                .withProperty(AMQPFederationConstants.FEDERATION_CONFIGURATION.toString(), federationConfiguration)
                .withTarget().withDynamic(true)
                             .withCapabilities("temporary-topic")
                .and()
                .respond()
                .withTarget().withAddress("test-control-address")
                .and()
                .withOfferedCapabilities(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString());
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            // Credit and large-message settings are supplied as URI options; the remaining
            // values are supplied as properties on the federation element below.
            final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(
                getTestName(),
                "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()
                    + "?amqpCredits=100&amqpLowCredits=50&amqpMinLargeMessageSize=10000");
            amqpConnection.setReconnectAttempts(0);

            final AMQPFederatedBrokerConnectionElement federation =
                new AMQPFederatedBrokerConnectionElement("myFederation");
            federation.addProperty("attach-timeout", 60);
            federation.addProperty("tunnel-core-messages", Boolean.toString(false));
            federation.addProperty("amqpPullConsumerCredits", 50);
            amqpConnection.addElement(federation);

            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            Wait.assertTrue(() ->
                this.server.locateQueue("$ACTIVEMQ_ARTEMIS_FEDERATION.control.test-control-address") != null);
        }
    }

    /**
     * Scripts a peer that answers the federation control link attach with a plain response
     * (no offered capabilities are added to it), then expects an optional Close and the
     * connection to drop. The broker connection is configured with zero reconnect attempts
     * and a federation element named after the test.
     *
     * @throws Exception if the peer or broker fails, or the script does not complete
     */
    @Timeout(20)
    @Test
    public void testFederationCreatesControlLinkAndClosesConnectionIfCapabilityIsAbsent() throws Exception {
        // try-with-resources closes the peer exactly once on every exit path.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect(PlainMechanism.MECH_NAME, "ANONYMOUS");
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender()
                .withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString())
                .respond();
            peer.expectClose().optional();
            peer.expectConnectionToDrop();
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(
                getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.addElement(new AMQPFederatedBrokerConnectionElement(getTestName()));

            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
        }
    }

    /**
     * Scripts two consecutive connections on the same peer.
     *
     * <p>On the first, the peer answers the control link attach with a null target and then
     * sends a Detach carrying an {@code amqp:unauthorized-access} error; an optional Detach,
     * an optional Close and a connection drop are expected afterwards. On the second, the
     * control link attach is answered in kind with a target address. The broker connection
     * is configured with one reconnect attempt and a retry interval of 200, and the test
     * waits up to ten seconds for the whole script before stopping the broker.
     *
     * @throws Exception if the peer or broker fails, or the script does not complete
     */
    @Timeout(20)
    @Test
    public void testFederationCreatesControlLinkAndClosesConnectionDetachIndicatesNotAuthorized() throws Exception {
        // try-with-resources closes the peer exactly once on every exit path.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            // First connection: control link is refused as unauthorized.
            peer.expectSASLAnonymousConnect(PlainMechanism.MECH_NAME, "ANONYMOUS");
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender()
                .withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString())
                .respond()
                .withOfferedCapabilities(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString())
                .withSource().also()
                .withNullTarget();
            peer.remoteDetach().withErrorCondition("amqp:unauthorized-access", "Not authroized").queue();
            peer.expectDetach().optional();
            peer.expectClose().optional();
            peer.expectConnectionToDrop();

            // Second connection: control link attach succeeds.
            peer.expectSASLAnonymousConnect(PlainMechanism.MECH_NAME, "ANONYMOUS");
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender()
                .withTarget().withDynamic(true).and()
                .withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString())
                .respondInKind()
                .withTarget().withAddress("dynamic-name");
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(
                getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(1);
            amqpConnection.setRetryInterval(200);
            amqpConnection.addElement(new AMQPFederatedBrokerConnectionElement(getTestName()));

            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            peer.waitForScriptToComplete(10L, TimeUnit.SECONDS);

            // The broker is stopped before the peer is closed, as in the original ordering.
            this.server.stop();
        }
    }

    /**
     * Verifies that a remote queue policy configured on the federation element is sent to
     * the peer over the control link.
     *
     * <p>The scripted peer accepts the control link and event link attaches, grants ten
     * credits after each, and then expects a transfer whose message annotations carry the
     * {@code ADD_QUEUE_POLICY} operation type and whose body is an AMQP value equal to the
     * expected policy map.
     *
     * @throws Exception if the peer or broker fails, or the script does not complete
     */
    @Timeout(20)
    @Test
    public void testFederationSendsReceiveFromQueuePolicyToRemoteWhenSendToIsConfigured() throws Exception {
        final MessageAnnotationsMatcher annotationsMatcher = new MessageAnnotationsMatcher(true);
        annotationsMatcher.withEntry(
            AMQPFederationConstants.OPERATION_TYPE.toString(), Matchers.is("ADD_QUEUE_POLICY"));

        final ArrayList<String> includes = new ArrayList<>();
        includes.add("a");
        includes.add("b");
        includes.add("c");
        includes.add("d");

        final ArrayList<String> excludes = new ArrayList<>();
        excludes.add("e");
        excludes.add("f");
        excludes.add("g");
        excludes.add("h");

        // Expected body of the policy message; mirrors the policy element configured below.
        final LinkedHashMap<String, Object> policyMap = new LinkedHashMap<>();
        policyMap.put("policy-name", "test-policy");
        policyMap.put("include-federated", true);
        policyMap.put("priority-adjustment", 42);
        policyMap.put("queue-includes", includes);
        policyMap.put("queue-excludes", excludes);

        final EncodedAmqpValueMatcher bodyMatcher = new EncodedAmqpValueMatcher(policyMap);
        final TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
        payloadMatcher.setMessageAnnotationsMatcher(annotationsMatcher);
        payloadMatcher.addMessageContentMatcher(bodyMatcher);

        // try-with-resources closes the peer once; the original closed it twice on success.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender()
                .withTarget().withDynamic(true).and()
                .withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString())
                .respondInKind()
                .withTarget().withAddress("test-dynamic");
            peer.remoteFlow().withLinkCredit(10L).queue();
            peer.expectAttach().ofSender()
                .withTarget().withDynamic(true).and()
                .withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString())
                .respondInKind()
                .withTarget().withAddress("test-dynamic-events");
            peer.remoteFlow().withLinkCredit(10L).queue();
            peer.expectTransfer().withPayload(payloadMatcher);
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPFederationQueuePolicyElement remoteQueuePolicy = new AMQPFederationQueuePolicyElement();
            remoteQueuePolicy.setName("test-policy");
            remoteQueuePolicy.setIncludeFederated(true);
            remoteQueuePolicy.setPriorityAdjustment(42);
            remoteQueuePolicy.addToIncludes("a", "b");
            remoteQueuePolicy.addToIncludes("c", "d");
            remoteQueuePolicy.addToExcludes("e", "f");
            remoteQueuePolicy.addToExcludes("g", "h");

            final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement();
            federation.setName(getTestName());
            federation.addRemoteQueuePolicy(remoteQueuePolicy);

            final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(
                getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.addElement(federation);

            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
        }
    }

    /**
     * Verifies that the remote queue policy is still sent over the control link when the
     * peer does not accept the event link.
     *
     * <p>The scripted peer accepts the control link attach, answers the event link attach
     * with a null source, expects and answers a Detach, then grants ten credits on handle 0
     * and expects a transfer whose message annotations carry the {@code ADD_QUEUE_POLICY}
     * operation type and whose body is an AMQP value equal to the expected policy map.
     *
     * @throws Exception if the peer or broker fails, or the script does not complete
     */
    @Timeout(20)
    @Test
    public void testFederationSendsReceiveFromQueuePolicyToRemoteWhenSendToIsConfiguredAndEventSenderRejected() throws Exception {
        final MessageAnnotationsMatcher annotationsMatcher = new MessageAnnotationsMatcher(true);
        annotationsMatcher.withEntry(
            AMQPFederationConstants.OPERATION_TYPE.toString(), Matchers.is("ADD_QUEUE_POLICY"));

        final ArrayList<String> includes = new ArrayList<>();
        includes.add("a");
        includes.add("b");
        includes.add("c");
        includes.add("d");

        final ArrayList<String> excludes = new ArrayList<>();
        excludes.add("e");
        excludes.add("f");
        excludes.add("g");
        excludes.add("h");

        // Expected body of the policy message; mirrors the policy element configured below.
        final LinkedHashMap<String, Object> policyMap = new LinkedHashMap<>();
        policyMap.put("policy-name", "test-policy");
        policyMap.put("include-federated", true);
        policyMap.put("priority-adjustment", 42);
        policyMap.put("queue-includes", includes);
        policyMap.put("queue-excludes", excludes);

        final EncodedAmqpValueMatcher bodyMatcher = new EncodedAmqpValueMatcher(policyMap);
        final TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
        payloadMatcher.setMessageAnnotationsMatcher(annotationsMatcher);
        payloadMatcher.addMessageContentMatcher(bodyMatcher);

        // try-with-resources closes the peer once; the original closed it twice on success.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender()
                .withTarget().withDynamic(true).and()
                .withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString())
                .respondInKind()
                .withTarget().withAddress("test-dynamic");
            // Event link attach is answered with a null source, followed by a detach exchange.
            peer.expectAttach().ofSender()
                .withTarget().withDynamic(true).and()
                .withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString())
                .respond()
                .withNullSource();
            peer.expectDetach().respond();
            // Credit is granted explicitly on handle 0.
            peer.remoteFlow().withHandle(0L).withLinkCredit(10L).queue();
            peer.expectTransfer().withPayload(payloadMatcher);
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPFederationQueuePolicyElement remoteQueuePolicy = new AMQPFederationQueuePolicyElement();
            remoteQueuePolicy.setName("test-policy");
            remoteQueuePolicy.setIncludeFederated(true);
            remoteQueuePolicy.setPriorityAdjustment(42);
            remoteQueuePolicy.addToIncludes("a", "b");
            remoteQueuePolicy.addToIncludes("c", "d");
            remoteQueuePolicy.addToExcludes("e", "f");
            remoteQueuePolicy.addToExcludes("g", "h");

            final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement();
            federation.setName(getTestName());
            federation.addRemoteQueuePolicy(remoteQueuePolicy);

            final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(
                getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.addElement(federation);

            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
        }
    }

    /**
     * Verifies that a remote address policy configured on the federation element is sent to
     * the peer over the control link.
     *
     * <p>The scripted peer accepts the control link and event link attaches, grants ten
     * credits after each, and then expects a transfer whose message annotations carry the
     * {@code ADD_ADDRESS_POLICY} operation type and whose body is an AMQP value equal to the
     * expected policy map.
     *
     * @throws Exception if the peer or broker fails, or the script does not complete
     */
    @Timeout(20)
    @Test
    public void testFederationSendsReceiveFromAddressPolicyToRemoteWhenSendToIsConfigured() throws Exception {
        final MessageAnnotationsMatcher annotationsMatcher = new MessageAnnotationsMatcher(true);
        annotationsMatcher.withEntry(
            AMQPFederationConstants.OPERATION_TYPE.toString(), Matchers.is("ADD_ADDRESS_POLICY"));

        final ArrayList<String> includes = new ArrayList<>();
        includes.add("include");

        final ArrayList<String> excludes = new ArrayList<>();
        excludes.add("exclude");

        // Expected body of the policy message; mirrors the policy element configured below.
        final LinkedHashMap<String, Object> policyMap = new LinkedHashMap<>();
        policyMap.put("policy-name", "test-policy");
        policyMap.put("auto-delete", true);
        policyMap.put("auto-delete-delay", 42L);
        policyMap.put("auto-delete-msg-count", 314L);
        policyMap.put("max-hops", 5);
        policyMap.put("enable-divert-bindings", false);
        policyMap.put("address-includes", includes);
        policyMap.put("address-excludes", excludes);

        final EncodedAmqpValueMatcher bodyMatcher = new EncodedAmqpValueMatcher(policyMap);
        final TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
        payloadMatcher.setMessageAnnotationsMatcher(annotationsMatcher);
        payloadMatcher.addMessageContentMatcher(bodyMatcher);

        // try-with-resources closes the peer once; the original closed it twice on success.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender()
                .withTarget().withDynamic(true).and()
                .withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString())
                .respondInKind()
                .withTarget().withAddress("test-dynamic");
            peer.remoteFlow().withLinkCredit(10L).queue();
            peer.expectAttach().ofSender()
                .withTarget().withDynamic(true).and()
                .withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString())
                .respondInKind()
                .withTarget().withAddress("test-dynamic-events");
            peer.remoteFlow().withLinkCredit(10L).queue();
            peer.expectTransfer().withPayload(payloadMatcher);
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPFederationAddressPolicyElement remoteAddressPolicy = new AMQPFederationAddressPolicyElement();
            remoteAddressPolicy.setName("test-policy");
            remoteAddressPolicy.setAutoDelete(true);
            remoteAddressPolicy.setAutoDeleteDelay(42L);
            remoteAddressPolicy.setAutoDeleteMessageCount(314L);
            remoteAddressPolicy.setMaxHops(5);
            remoteAddressPolicy.setEnableDivertBindings(false);
            remoteAddressPolicy.addToIncludes("include");
            remoteAddressPolicy.addToExcludes("exclude");

            final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement();
            federation.setName(getTestName());
            federation.addRemoteAddressPolicy(remoteAddressPolicy);

            final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(
                getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.addElement(federation);

            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
        }
    }

    /**
     * Verifies that the remote address policy is still sent over the control link when the
     * peer does not accept the event link.
     *
     * <p>The scripted peer accepts the control link attach, answers the event link attach
     * with a null target, expects and answers a Detach, then grants ten credits on handle 0
     * and expects a transfer whose message annotations carry the {@code ADD_ADDRESS_POLICY}
     * operation type and whose body is an AMQP value equal to the expected policy map.
     *
     * @throws Exception if the peer or broker fails, or the script does not complete
     */
    @Timeout(20)
    @Test
    public void testFederationSendsReceiveFromAddressPolicyToRemoteWhenSendToIsConfiguredAndEventSenderRejected() throws Exception {
        final MessageAnnotationsMatcher annotationsMatcher = new MessageAnnotationsMatcher(true);
        annotationsMatcher.withEntry(
            AMQPFederationConstants.OPERATION_TYPE.toString(), Matchers.is("ADD_ADDRESS_POLICY"));

        final ArrayList<String> includes = new ArrayList<>();
        includes.add("include");

        final ArrayList<String> excludes = new ArrayList<>();
        excludes.add("exclude");

        // Expected body of the policy message; mirrors the policy element configured below.
        final LinkedHashMap<String, Object> policyMap = new LinkedHashMap<>();
        policyMap.put("policy-name", "test-policy");
        policyMap.put("auto-delete", true);
        policyMap.put("auto-delete-delay", 42L);
        policyMap.put("auto-delete-msg-count", 314L);
        policyMap.put("max-hops", 5);
        policyMap.put("enable-divert-bindings", false);
        policyMap.put("address-includes", includes);
        policyMap.put("address-excludes", excludes);

        final EncodedAmqpValueMatcher bodyMatcher = new EncodedAmqpValueMatcher(policyMap);
        final TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
        payloadMatcher.setMessageAnnotationsMatcher(annotationsMatcher);
        payloadMatcher.addMessageContentMatcher(bodyMatcher);

        // try-with-resources closes the peer once; the original closed it twice on success.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender()
                .withTarget().withDynamic(true).and()
                .withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString())
                .respondInKind()
                .withTarget().withAddress("test-dynamic");
            // Event link attach is answered with a null target, followed by a detach exchange.
            peer.expectAttach().ofSender()
                .withTarget().withDynamic(true).and()
                .withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString())
                .respond()
                .withNullTarget();
            peer.expectDetach().respond();
            // Credit is granted explicitly on handle 0.
            peer.remoteFlow().withHandle(0L).withLinkCredit(10L).queue();
            peer.expectTransfer().withPayload(payloadMatcher);
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPFederationAddressPolicyElement remoteAddressPolicy = new AMQPFederationAddressPolicyElement();
            remoteAddressPolicy.setName("test-policy");
            remoteAddressPolicy.setAutoDelete(true);
            remoteAddressPolicy.setAutoDeleteDelay(42L);
            remoteAddressPolicy.setAutoDeleteMessageCount(314L);
            remoteAddressPolicy.setMaxHops(5);
            remoteAddressPolicy.setEnableDivertBindings(false);
            remoteAddressPolicy.addToIncludes("include");
            remoteAddressPolicy.addToExcludes("exclude");

            final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement();
            federation.setName(getTestName());
            federation.addRemoteAddressPolicy(remoteAddressPolicy);

            final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(
                getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.addElement(federation);

            this.server.getConfiguration().addAMQPConnection(amqpConnection);
            this.server.start();

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
        }
    }

    /**
     * Starts the broker and connects a scripted test client to it on {@code localhost} at
     * the default TCP port constant, using the script installed by the two-argument
     * {@code scriptFederationConnectToRemote}. Once that script completes, the client sends
     * a Close, expects one back, and is closed before the broker is stopped.
     *
     * @throws Exception if the client or broker fails, or a script does not complete
     */
    @Timeout(20)
    @Test
    public void testConnectToBrokerFromRemoteAsFederatedSourceAndCreateControlLink() throws Exception {
        this.server.start();

        // try-with-resources guarantees the client is closed on every exit path.
        try (ProtonTestClient peer = new ProtonTestClient()) {
            scriptFederationConnectToRemote(peer, getTestName());
            peer.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            peer.expectClose();
            peer.remoteClose().now();
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            // Explicit close retained so the client is shut down before the broker stops.
            peer.close();

            this.server.stop();

            logger.info("Test stopped");
        }
    }

    /**
     * Starts the broker and connects a scripted test client to it on {@code localhost} at
     * the default TCP port constant, using the script installed by
     * {@code scriptFederationConnectToRemote(peer, name, false, null, null, true, false)}.
     * Once that script completes, the client sends a Close, expects one back, and is closed
     * before the broker is stopped.
     *
     * @throws Exception if the client or broker fails, or a script does not complete
     */
    @Timeout(20)
    @Test
    public void testConnectToBrokerFromRemoteAsFederatedSourceAndCreateEventsSenderLink() throws Exception {
        this.server.start();

        // try-with-resources guarantees the client is closed on every exit path.
        try (ProtonTestClient peer = new ProtonTestClient()) {
            // NOTE(review): the trailing (true, false) flags presumably select the events
            // sender link only -- confirm against scriptFederationConnectToRemote.
            scriptFederationConnectToRemote(peer, getTestName(), false, null, null, true, false);
            peer.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            peer.expectClose();
            peer.remoteClose().now();
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            // Explicit close retained so the client is shut down before the broker stops.
            peer.close();

            this.server.stop();

            logger.info("Test stopped");
        }
    }

    /**
     * Connects a scripted test client to the broker anonymously and runs the federation connect
     * script with the events receiver link enabled (and the events sender link disabled). Once the
     * script has completed the client closes the connection and the broker is stopped.
     *
     * @throws Exception if any broker or test peer operation fails
     */
    @Timeout(20)
    @Test
    public void testConnectToBrokerFromRemoteAsFederatedSourceAndCreateEventsReceiverLink() throws Exception {
        server.start();

        // try-with-resources closes the peer on every exit path, including a failed expectation.
        try (ProtonTestClient peer = new ProtonTestClient()) {
            scriptFederationConnectToRemote(peer, getTestName(), false, null, null, false, true);
            peer.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            peer.expectClose();
            peer.remoteClose().now();
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            // Closed explicitly so the peer is shut down before the broker is stopped.
            peer.close();

            server.stop();
            logger.info("Test stopped");
        }
    }

    /**
     * Connects a scripted test client to the broker anonymously and runs the federation connect
     * script with both the events sender link and the events receiver link enabled. Once the
     * script has completed the client closes the connection and the broker is stopped.
     *
     * @throws Exception if any broker or test peer operation fails
     */
    @Timeout(20)
    @Test
    public void testConnectToBrokerFromRemoteAsFederatedSourceAndCreateEventsLinks() throws Exception {
        server.start();

        // try-with-resources closes the peer on every exit path, including a failed expectation.
        try (ProtonTestClient peer = new ProtonTestClient()) {
            scriptFederationConnectToRemote(peer, getTestName(), false, null, null, true, true);
            peer.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            peer.expectClose();
            peer.remoteClose().now();
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            // Closed explicitly so the peer is shut down before the broker is stopped.
            peer.close();

            server.stop();
            logger.info("Test stopped");
        }
    }

    /**
     * Enables security on the broker for the {@code $ACTIVEMQ_ARTEMIS_FEDERATION} address, then
     * connects a scripted test client that authenticates with SASL PLAIN using the
     * {@code fullUser}/{@code fullPass} credentials and attaches a federation control link.
     * Once the script has completed the client closes the connection and the broker is stopped.
     *
     * @throws Exception if any broker or test peer operation fails
     */
    @Timeout(20)
    @Test
    public void testControlLinkPassesConnectAttemptWhenUserHasPrivledges() throws Exception {
        enableSecurity(server, "$ACTIVEMQ_ARTEMIS_FEDERATION");
        server.start();

        // try-with-resources closes the peer on every exit path, including a failed expectation.
        try (ProtonTestClient peer = new ProtonTestClient()) {
            scriptFederationConnectToRemote(peer, getTestName(), fullUser, fullPass);
            peer.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            peer.expectClose();
            peer.remoteClose().now();
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            // Closed explicitly so the peer is shut down before the broker is stopped.
            peer.close();

            server.stop();
            logger.info("Test stopped");
        }
    }

    /**
     * Enables security on the broker for the {@code $ACTIVEMQ_ARTEMIS_FEDERATION.#} address match,
     * then connects a scripted test client that authenticates with SASL PLAIN using the
     * {@code fullUser}/{@code fullPass} credentials and attaches the federation control link plus
     * both events links. Once the script has completed the client closes the connection and the
     * broker is stopped.
     *
     * @throws Exception if any broker or test peer operation fails
     */
    @Timeout(20)
    @Test
    public void testControlAndEventsLinksPassesConnectAttemptWhenUserHasPrivledges() throws Exception {
        enableSecurity(server, "$ACTIVEMQ_ARTEMIS_FEDERATION.#");
        server.start();

        // try-with-resources closes the peer on every exit path, including a failed expectation.
        try (ProtonTestClient peer = new ProtonTestClient()) {
            scriptFederationConnectToRemote(peer, getTestName(), true, fullUser, fullPass, true, true);
            peer.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            peer.expectClose();
            peer.remoteClose().now();
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            // Closed explicitly so the peer is shut down before the broker is stopped.
            peer.close();

            server.stop();
            logger.info("Test stopped");
        }
    }

    /**
     * Enables security on the broker for the {@code $ACTIVEMQ_ARTEMIS_FEDERATION} address, then
     * connects a scripted test client with the {@code guestUser}/{@code guestPass} credentials.
     * The script used here expects the control link attach to be answered with a null target and
     * a detach carrying an unauthorized-access error, and it queues the connection close itself.
     *
     * @throws Exception if any broker or test peer operation fails
     */
    @Timeout(20)
    @Test
    public void testControlLinkRefusesConnectAttemptWhenUseDoesNotHavePrivledgesForControlAddress() throws Exception {
        enableSecurity(server, "$ACTIVEMQ_ARTEMIS_FEDERATION");
        server.start();

        // try-with-resources closes the peer on every exit path, including a failed expectation.
        try (ProtonTestClient peer = new ProtonTestClient()) {
            scriptFederationConnectToRemoteNotAuthorizedForControlAddress(peer, getTestName(), guestUser, guestPass);
            peer.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            // Closed explicitly so the peer is shut down before the broker is stopped.
            peer.close();

            server.stop();
            logger.info("Test stopped");
        }
    }

    /**
     * Opens an anonymous connection and session without attaching a federation control link and
     * then attaches a receiver that desires the federation event link capability. The script
     * expects the broker to answer with an attach that has a null source followed by a detach
     * carrying {@link AmqpError#ILLEGAL_STATE}.
     *
     * @throws Exception if any broker or test peer operation fails
     */
    @Timeout(20)
    @Test
    public void testRemoteConnectionCannotAttachEventReceiverLinkWithoutControlLink() throws Exception {
        server.start();

        // try-with-resources closes the peer on every exit path, including a failed expectation.
        try (ProtonTestClient peer = new ProtonTestClient()) {
            // Connection and session only; no control link is attached.
            peer.queueClientSaslAnonymousConnect();
            peer.remoteOpen().queue();
            peer.expectOpen();
            peer.remoteBegin().queue();
            peer.expectBegin();
            peer.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            // Expected refusal of the events link attach that is sent next.
            peer.expectAttach().ofSender()
                .withName("federation-event-receiver")
                .withNullSource()
                .withTarget();
            peer.expectDetach().withError(AmqpError.ILLEGAL_STATE.toString()).respond();
            peer.remoteAttach().ofReceiver()
                .withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()})
                .withName("federation-event-receiver")
                .withSenderSettleModeSettled()
                .withReceivervSettlesFirst()
                .withSource()
                    .withDurabilityOfNone()
                    .withExpiryPolicyOnLinkDetach()
                    .withCapabilities(new String[]{"temporary-topic"})
                    .withDynamic(true)
                    .and()
                .withTarget().and()
                .now();
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            peer.expectClose();
            peer.remoteClose().now();
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            // Closed explicitly so the peer is shut down before the broker is stopped.
            peer.close();

            server.stop();
        }
    }

    /**
     * Opens an anonymous connection and session without attaching a federation control link and
     * then attaches a sender that desires the federation event link capability. The script
     * expects the broker to answer with an attach that has a null target followed by a detach
     * carrying {@link AmqpError#ILLEGAL_STATE}.
     *
     * @throws Exception if any broker or test peer operation fails
     */
    @Timeout(20)
    @Test
    public void testRemoteConnectionCannotAttachEventSenderLinkWithoutControlLink() throws Exception {
        server.start();

        // try-with-resources closes the peer on every exit path, including a failed expectation.
        try (ProtonTestClient peer = new ProtonTestClient()) {
            // Connection and session only; no control link is attached.
            peer.queueClientSaslAnonymousConnect();
            peer.remoteOpen().queue();
            peer.expectOpen();
            peer.remoteBegin().queue();
            peer.expectBegin();
            peer.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            // Expected refusal of the events link attach that is sent next.
            peer.expectAttach().ofReceiver()
                .withName("federation-event-sender")
                .withSource().also()
                .withNullTarget();
            peer.expectDetach().withError(AmqpError.ILLEGAL_STATE.toString()).respond();
            peer.remoteAttach().ofSender()
                .withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()})
                .withName("federation-event-sender")
                .withSenderSettleModeSettled()
                .withReceivervSettlesFirst()
                .withTarget()
                    .withDurabilityOfNone()
                    .withExpiryPolicyOnLinkDetach()
                    .withCapabilities(new String[]{"temporary-topic"})
                    .withDynamic(true)
                    .and()
                .withSource().and()
                .now();
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            peer.expectClose();
            peer.remoteClose().now();
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            // Closed explicitly so the peer is shut down before the broker is stopped.
            peer.close();

            server.stop();
        }
    }

    /**
     * Starts a scripted test server, configures the broker with a federation broker connection
     * to it and waits for the broker to attach its control link, which the peer answers with the
     * target address {@code test-control-address}. After the queue
     * {@code $ACTIVEMQ_ARTEMIS_FEDERATION.control.test-control-address} is found on the broker,
     * the peer attaches a second receiver to that address and expects an attach with a null
     * source followed by a closing detach carrying {@link AmqpError#INTERNAL_ERROR}.
     *
     * @throws Exception if any broker or test peer operation fails
     */
    @Timeout(20)
    @Test
    public void testControlLinkSenderQueueCreatedWithMaxConsumersOfOne() throws Exception {
        // try-with-resources closes the peer on every exit path, including a failed expectation.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect(new String[]{PlainMechanism.MECH_NAME, "ANONYMOUS"});
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender()
                .withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString())
                .withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString("federation-"), CoreMatchers.containsString("myFederation")}))
                .withTarget()
                    .withDynamic(true)
                    .withCapabilities(new String[]{"temporary-topic"})
                    .and()
                .respond()
                .withTarget()
                    .withAddress("test-control-address")
                    .and()
                .withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPBrokerConnectConfiguration amqpConnection =
                new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.addElement(new AMQPFederatedBrokerConnectionElement("myFederation"));

            server.getConfiguration().addAMQPConnection(amqpConnection);
            server.start();

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            Wait.assertTrue(() -> server.locateQueue("$ACTIVEMQ_ARTEMIS_FEDERATION.control.test-control-address") != null);

            // A second consumer on the control address is expected to be refused.
            peer.expectAttach().ofSender().withName("test-control-link-suspect").withNullSource();
            peer.expectDetach().withClosed(true).withError(AmqpError.INTERNAL_ERROR.toString());
            peer.remoteAttach().ofReceiver()
                .withName("test-control-link-suspect")
                .withSenderSettleModeUnsettled()
                .withReceivervSettlesFirst()
                .withTarget().also()
                .withSource()
                    .withAddress("$ACTIVEMQ_ARTEMIS_FEDERATION.control.test-control-address")
                    .also()
                .now();
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
        }
    }

    /**
     * Connects a scripted test client that attaches a federation control link named
     * {@code federation-test} and an events receiver link named {@code events-receiver-test}
     * with a dynamic source, then waits until the broker has a queue named
     * {@code $ACTIVEMQ_ARTEMIS_FEDERATION.events.events-receiver-test}.
     *
     * @throws Exception if any broker or test peer operation fails
     */
    @Timeout(20)
    @Test
    public void testEventSenderLinkFromTargetUsesNamespacedDynamicQueue() throws Exception {
        server.start();

        // try-with-resources closes the peer on every exit path, including a failed expectation.
        try (ProtonTestClient peer = new ProtonTestClient()) {
            peer.queueClientSaslAnonymousConnect();
            peer.remoteOpen().queue();
            peer.expectOpen();
            peer.remoteBegin().queue();
            peer.expectBegin();

            // Federation control link.
            peer.remoteAttach().ofSender()
                .withName("federation-test")
                .withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()})
                .withSenderSettleModeUnsettled()
                .withReceivervSettlesFirst()
                .withSource().also()
                .withTarget()
                    .withDynamic(true)
                    .withDurabilityOfNone()
                    .withExpiryPolicyOnLinkDetach()
                    .withLifetimePolicyOfDeleteOnClose()
                    .withCapabilities(new String[]{"temporary-topic"})
                    .also()
                .queue();
            peer.expectAttach().ofReceiver()
                .withName("federation-test")
                .withTarget()
                    .withAddress(CoreMatchers.notNullValue())
                    .also()
                .withOfferedCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString());
            peer.expectFlow();

            // Federation events link on which this peer is the receiver.
            peer.remoteAttach().ofReceiver()
                .withName("events-receiver-test")
                .withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()})
                .withSenderSettleModeUnsettled()
                .withReceivervSettlesFirst()
                .withTarget().also()
                .withSource()
                    .withDynamic(true)
                    .withDurabilityOfNone()
                    .withExpiryPolicyOnLinkDetach()
                    .withLifetimePolicyOfDeleteOnClose()
                    .withCapabilities(new String[]{"temporary-topic"})
                    .also()
                .queue();
            peer.remoteFlow().withLinkCredit(10L).queue();
            peer.expectAttach().ofSender()
                .withName("events-receiver-test")
                .withSource()
                    .withAddress(CoreMatchers.notNullValue())
                    .also()
                .withOfferedCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString());

            peer.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            Wait.assertTrue(() -> server.locateQueue("$ACTIVEMQ_ARTEMIS_FEDERATION.events.events-receiver-test") != null);

            server.stop();
        }
    }

    /**
     * Connects a scripted test client that attaches a federation control link and an events
     * receiver link, waits until the broker has a queue named
     * {@code $ACTIVEMQ_ARTEMIS_FEDERATION.events.events-receiver-test}, and then attaches a
     * second receiver to that address. The script expects that second attach to be answered with
     * a null source followed by a closing detach carrying {@link AmqpError#INTERNAL_ERROR}.
     *
     * @throws Exception if any broker or test peer operation fails
     */
    @Timeout(20)
    @Test
    public void testEventsLinkAtTargetIsCreatedWithMaxConsumersOfOne() throws Exception {
        server.start();

        // try-with-resources closes the peer on every exit path, including a failed expectation.
        try (ProtonTestClient peer = new ProtonTestClient()) {
            peer.queueClientSaslAnonymousConnect();
            peer.remoteOpen().queue();
            peer.expectOpen();
            peer.remoteBegin().queue();
            peer.expectBegin();

            // Federation control link.
            peer.remoteAttach().ofSender()
                .withName("federation-test")
                .withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()})
                .withSenderSettleModeUnsettled()
                .withReceivervSettlesFirst()
                .withSource().also()
                .withTarget()
                    .withDynamic(true)
                    .withDurabilityOfNone()
                    .withExpiryPolicyOnLinkDetach()
                    .withLifetimePolicyOfDeleteOnClose()
                    .withCapabilities(new String[]{"temporary-topic"})
                    .also()
                .queue();
            peer.expectAttach().ofReceiver()
                .withName("federation-test")
                .withTarget()
                    .withAddress(CoreMatchers.notNullValue())
                    .also()
                .withOfferedCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString());
            peer.expectFlow();

            // Federation events link on which this peer is the receiver.
            peer.remoteAttach().ofReceiver()
                .withName("events-receiver-test")
                .withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()})
                .withSenderSettleModeUnsettled()
                .withReceivervSettlesFirst()
                .withTarget().also()
                .withSource()
                    .withDynamic(true)
                    .withDurabilityOfNone()
                    .withExpiryPolicyOnLinkDetach()
                    .withLifetimePolicyOfDeleteOnClose()
                    .withCapabilities(new String[]{"temporary-topic"})
                    .also()
                .queue();
            peer.remoteFlow().withLinkCredit(10L).queue();
            peer.expectAttach().ofSender()
                .withName("events-receiver-test")
                .withSource()
                    .withAddress(CoreMatchers.notNullValue())
                    .also()
                .withOfferedCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString());

            peer.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            Wait.assertTrue(() -> server.locateQueue("$ACTIVEMQ_ARTEMIS_FEDERATION.events.events-receiver-test") != null);

            // A second consumer on the events address is expected to be refused.
            peer.expectAttach().ofSender().withName("test-events-link-suspect").withNullSource();
            peer.expectDetach().withClosed(true).withError(AmqpError.INTERNAL_ERROR.toString());
            peer.remoteAttach().ofReceiver()
                .withName("test-events-link-suspect")
                .withSenderSettleModeUnsettled()
                .withReceivervSettlesFirst()
                .withTarget().also()
                .withSource()
                    .withAddress("$ACTIVEMQ_ARTEMIS_FEDERATION.events.events-receiver-test")
                    .also()
                .now();
            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

            server.stop();
        }
    }

    /**
     * Runs the demand-added-then-immediate-shutdown scenario twice in a row.
     *
     * @throws Exception if either run of the scenario fails
     */
    @Timeout(30)
    @Test
    public void testFederationDemandAddedAndImmediateBrokerShutdownOverlaps() throws Exception {
        final int iterations = 2;
        int completed = 0;

        while (completed < iterations) {
            doTestFederationDemandAddedAndImmediateBrokerShutdown();
            completed++;
        }
    }

    /**
     * Runs one pass of the scenario in which local queue demand is added while the broker is
     * being stopped.
     * <p>
     * A scripted test server accepts the broker's federation control and events links. A broker
     * plugin is registered whose {@code beforeCreateFederationConsumer} callback stops the broker
     * on another thread and then sleeps for a few random milliseconds. A JMS consumer is then
     * created on the federated queue, and the peer tolerates an optional attach, flow, detach and
     * close before the connection drops.
     *
     * @throws Exception if any broker, JMS or test peer operation fails
     */
    private void doTestFederationDemandAddedAndImmediateBrokerShutdown() throws Exception {
        // Build a broker instance when none exists or the current one is not started.
        if (server == null || !server.isStarted()) {
            server = createServer();
        }

        // try-with-resources closes the peer on every exit path, including a failed expectation.
        try (ProtonTestServer peer = new ProtonTestServer()) {
            peer.expectSASLAnonymousConnect();
            peer.expectOpen().respond();
            peer.expectBegin().respond();
            peer.expectAttach().ofSender()
                .withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString())
                .respond()
                .withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            peer.expectAttach().ofReceiver()
                .withSenderSettleModeSettled()
                .withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString())
                .respondInKind();
            peer.expectFlow().withLinkCredit(10);
            peer.start();

            final URI remoteURI = peer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", remoteURI);

            final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement();
            receiveFromQueue.setName("queue-policy");
            receiveFromQueue.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME, AutoCreateJmsDestinationTest.QUEUE_NAME);

            final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
            element.setName(getTestName());
            element.addLocalQueuePolicy(receiveFromQueue);

            final AMQPBrokerConnectConfiguration amqpConnection =
                new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
            amqpConnection.setReconnectAttempts(0);
            amqpConnection.addElement(element);

            server.getConfiguration().addAMQPConnection(amqpConnection);
            server.start();
            server.registerBrokerPlugin(new ActiveMQServerAMQPFederationPlugin() {
                public void beforeCreateFederationConsumer(FederationConsumerInfo federationConsumerInfo) throws ActiveMQException {
                    AMQPFederationConnectTest.logger.debug("Delaying attach of outgoing federation receiver");
                    ForkJoinPool.commonPool().execute(() -> {
                        try {
                            AMQPFederationConnectTest.this.server.stop();
                        } catch (Exception ignored) {
                            // Best effort: the stop only exists to race with the consumer creation.
                        }
                    });
                    try {
                        // Short random delay so the asynchronous stop can overlap with the attach.
                        Thread.sleep(new Random(System.currentTimeMillis()).nextInt(8));
                    } catch (InterruptedException ignored) {
                        // Keep the interrupt visible to the calling thread instead of dropping it.
                        Thread.currentThread().interrupt();
                    }
                }
            });

            peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            peer.expectAttach().optional();
            peer.expectFlow().optional();
            peer.expectDetach().optional();
            peer.expectClose().optional();
            peer.expectConnectionToDrop();

            // The connection is a managed resource so it is also closed when an assertion below fails.
            try (Connection connection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection()) {
                // 1 is the value of Session.AUTO_ACKNOWLEDGE.
                final Session session = connection.createSession(1);
                connection.start();

                try {
                    session.createConsumer(session.createQueue(AutoCreateJmsDestinationTest.QUEUE_NAME));
                } catch (JMSException ignored) {
                    // Tolerated: the broker is being stopped concurrently by the plugin above.
                }

                peer.waitForScriptToComplete(5L, TimeUnit.SECONDS);

                // Closed explicitly so the peer goes away before the JMS connection is closed.
                peer.close();
            }
        }
    }

    /**
     * Scripts a federation connect without SASL PLAIN credentials by delegating to the
     * five-argument overload.
     *
     * @param protonTestClient the test client to script
     * @param str              value forwarded unchanged to the delegate
     */
    private void scriptFederationConnectToRemote(ProtonTestClient protonTestClient, String str) {
        final boolean usePlainAuth = false;

        scriptFederationConnectToRemote(protonTestClient, str, usePlainAuth, null, null);
    }

    /**
     * Scripts a federation connect that authenticates with SASL PLAIN by delegating to the
     * five-argument overload.
     *
     * @param protonTestClient the test client to script
     * @param str              value forwarded unchanged to the delegate
     * @param str2             user name forwarded to the delegate
     * @param str3             password forwarded to the delegate
     */
    private void scriptFederationConnectToRemote(ProtonTestClient protonTestClient, String str, String str2, String str3) {
        final boolean usePlainAuth = true;

        scriptFederationConnectToRemote(protonTestClient, str, usePlainAuth, str2, str3);
    }

    /**
     * Scripts a federation connect with neither events link enabled by delegating to the
     * seven-argument overload.
     *
     * @param protonTestClient the test client to script
     * @param str              value forwarded unchanged to the delegate
     * @param z                forwarded unchanged; selects SASL PLAIN over anonymous in the delegate
     * @param str2             user name forwarded to the delegate
     * @param str3             password forwarded to the delegate
     */
    private void scriptFederationConnectToRemote(ProtonTestClient protonTestClient, String str, boolean z, String str2, String str3) {
        final boolean attachEventsSender = false;
        final boolean attachEventsReceiver = false;

        scriptFederationConnectToRemote(protonTestClient, str, z, str2, str3, attachEventsSender, attachEventsReceiver);
    }

    /**
     * Queues on the given test client the frames and expectations for a federation connect:
     * SASL, open, begin and the attach of a federation control link, optionally followed by an
     * events sender link and/or an events receiver link. Every link gets a randomly generated
     * name.
     *
     * @param protonTestClient the test client to script
     * @param str              not used by this script
     * @param z                when {@code true} SASL PLAIN is used with {@code str2}/{@code str3},
     *                         otherwise SASL ANONYMOUS
     * @param str2             user name for SASL PLAIN
     * @param str3             password for SASL PLAIN
     * @param z2               when {@code true} an events link is attached with this peer as sender
     * @param z3               when {@code true} an events link is attached with this peer as receiver
     */
    private void scriptFederationConnectToRemote(ProtonTestClient protonTestClient, String str, boolean z, String str2, String str3, boolean z2, boolean z3) {
        // Fully qualified so this method does not depend on an import of java.util.UUID.
        final String controlLinkName = "Federation:test:" + java.util.UUID.randomUUID();

        if (z) {
            protonTestClient.queueClientSaslPlainConnect(str2, str3);
        } else {
            protonTestClient.queueClientSaslAnonymousConnect();
        }

        protonTestClient.remoteOpen().queue();
        protonTestClient.expectOpen();
        protonTestClient.remoteBegin().queue();
        protonTestClient.expectBegin();

        // Federation control link.
        protonTestClient.remoteAttach().ofSender()
            .withName(controlLinkName)
            .withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()})
            .withSenderSettleModeUnsettled()
            .withReceivervSettlesFirst()
            .withSource().also()
            .withTarget()
                .withDynamic(true)
                .withDurabilityOfNone()
                .withExpiryPolicyOnLinkDetach()
                .withLifetimePolicyOfDeleteOnClose()
                .withCapabilities(new String[]{"temporary-topic"})
                .also()
            .queue();
        protonTestClient.expectAttach().ofReceiver()
            .withName(controlLinkName)
            .withTarget()
                .withAddress(CoreMatchers.notNullValue())
                .also()
            .withOfferedCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString());
        protonTestClient.expectFlow();

        if (z2) {
            final String eventsSenderLinkName = "Federation:events-sender:test:" + java.util.UUID.randomUUID();

            protonTestClient.remoteAttach().ofSender()
                .withName(eventsSenderLinkName)
                .withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()})
                .withSenderSettleModeUnsettled()
                .withReceivervSettlesFirst()
                .withSource().also()
                .withTarget()
                    .withDynamic(true)
                    .withDurabilityOfNone()
                    .withExpiryPolicyOnLinkDetach()
                    .withLifetimePolicyOfDeleteOnClose()
                    .withCapabilities(new String[]{"temporary-topic"})
                    .also()
                .queue();
            protonTestClient.expectAttach().ofReceiver()
                .withName(eventsSenderLinkName)
                .withTarget()
                    .withAddress(CoreMatchers.notNullValue())
                    .also()
                .withOfferedCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString());
            protonTestClient.expectFlow();
        }

        if (z3) {
            final String eventsReceiverLinkName = "Federation:events-receiver:test:" + java.util.UUID.randomUUID();

            protonTestClient.remoteAttach().ofReceiver()
                .withName(eventsReceiverLinkName)
                .withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()})
                .withSenderSettleModeUnsettled()
                .withReceivervSettlesFirst()
                .withTarget().also()
                .withSource()
                    .withDynamic(true)
                    .withDurabilityOfNone()
                    .withExpiryPolicyOnLinkDetach()
                    .withLifetimePolicyOfDeleteOnClose()
                    .withCapabilities(new String[]{"temporary-topic"})
                    .also()
                .queue();
            protonTestClient.remoteFlow().withLinkCredit(10L).queue();
            protonTestClient.expectAttach().ofSender()
                .withName(eventsReceiverLinkName)
                .withSource()
                    .withAddress(CoreMatchers.notNullValue())
                    .also()
                .withOfferedCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString());
        }
    }

    /**
     * Queues on the given test client a SASL PLAIN connect followed by the attach of a federation
     * control link, and expects that attach to be answered with a null target and a detach
     * carrying {@code amqp:unauthorized-access}. The connection close exchange is queued as well.
     *
     * @param protonTestClient the test client to script
     * @param str              not used by this script
     * @param str2             user name for SASL PLAIN
     * @param str3             password for SASL PLAIN
     */
    private void scriptFederationConnectToRemoteNotAuthorizedForControlAddress(ProtonTestClient protonTestClient, String str, String str2, String str3) {
        // Fully qualified so this method does not depend on an import of java.util.UUID.
        final String controlLinkName = "Federation:test:" + java.util.UUID.randomUUID();

        protonTestClient.queueClientSaslPlainConnect(str2, str3);
        protonTestClient.remoteOpen().queue();
        protonTestClient.expectOpen();
        protonTestClient.remoteBegin().queue();
        protonTestClient.expectBegin();

        protonTestClient.remoteAttach().ofSender()
            .withName(controlLinkName)
            .withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()})
            .withSenderSettleModeUnsettled()
            .withReceivervSettlesFirst()
            .withSource().also()
            .withTarget()
                .withDynamic(true)
                .withDurabilityOfNone()
                .withExpiryPolicyOnLinkDetach()
                .withLifetimePolicyOfDeleteOnClose()
                .withCapabilities(new String[]{"temporary-topic"})
                .also()
            .queue();

        // Expected refusal of the control link.
        protonTestClient.expectAttach().ofReceiver().withTarget(CoreMatchers.nullValue());
        protonTestClient.expectDetach()
            .withError("amqp:unauthorized-access", "User does not have permission to attach to the federation control address")
            .respond();

        protonTestClient.remoteClose().queue();
        protonTestClient.expectClose();
    }
}
