package org.apache.activemq.artemis.tests.integration.amqp.connect;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationPolicySupport;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.ActiveMQServerAMQPFederationPlugin;
import org.apache.activemq.artemis.protocol.amqp.federation.Federation;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpJmsSelectorFilter;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpNoLocalFilter;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.integration.replication.ReplicationOrderTest;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpTransferTagGenerator;
import org.apache.activemq.transport.netty.NettyTransportOptions;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.LinkError;
import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
import org.apache.qpid.protonj2.test.driver.ProtonTestPeer;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.apache.qpid.protonj2.test.driver.codec.messaging.Source;
import org.apache.qpid.protonj2.test.driver.codec.primitives.DescribedType;
import org.apache.qpid.protonj2.test.driver.codec.primitives.Symbol;
import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong;
import org.apache.qpid.protonj2.test.driver.codec.transport.Attach;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.HeaderMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotationsMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.transport.TransferPayloadCompositeMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpValueMatcher;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.jgroups.util.UUID;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.class */
public class AMQPFederationAddressPolicyTest extends AmqpClientTestSupport {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final WildcardConfiguration DEFAULT_WILDCARD_CONFIGURATION = new WildcardConfiguration();

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest$AMQPTestFederationBrokerPlugin.class */
    private class AMQPTestFederationBrokerPlugin implements ActiveMQServerAMQPFederationPlugin {
        public final AtomicBoolean started = new AtomicBoolean();
        public final AtomicBoolean stopped = new AtomicBoolean();
        public final AtomicReference<FederationConsumerInfo> beforeCreateConsumerCapture = new AtomicReference<>();
        public final AtomicReference<FederationConsumer> afterCreateConsumerCapture = new AtomicReference<>();
        public final AtomicReference<FederationConsumer> beforeCloseConsumerCapture = new AtomicReference<>();
        public final AtomicReference<FederationConsumer> afterCloseConsumerCapture = new AtomicReference<>();
        public Consumer<FederationConsumerInfo> beforeCreateConsumer = federationConsumerInfo -> {
            this.beforeCreateConsumerCapture.set(federationConsumerInfo);
        };
        public Consumer<FederationConsumer> afterCreateConsumer = federationConsumer -> {
            this.afterCreateConsumerCapture.set(federationConsumer);
        };
        public Consumer<FederationConsumer> beforeCloseConsumer = federationConsumer -> {
            this.beforeCloseConsumerCapture.set(federationConsumer);
        };
        public Consumer<FederationConsumer> afterCloseConsumer = federationConsumer -> {
            this.afterCloseConsumerCapture.set(federationConsumer);
        };
        public BiConsumer<FederationConsumer, Message> beforeMessageHandled = (federationConsumer, message) -> {
        };
        public BiConsumer<FederationConsumer, Message> afterMessageHandled = (federationConsumer, message) -> {
        };
        public Function<AddressInfo, Boolean> shouldCreateConsumerForAddress = addressInfo -> {
            return true;
        };
        public Function<Queue, Boolean> shouldCreateConsumerForQueue = queue -> {
            return true;
        };
        public BiFunction<Divert, Queue, Boolean> shouldCreateConsumerForDivert = (divert, queue) -> {
            return true;
        };

        private AMQPTestFederationBrokerPlugin() {
        }

        public void federationStarted(Federation federation) throws ActiveMQException {
            this.started.set(true);
        }

        public void federationStopped(Federation federation) throws ActiveMQException {
            this.stopped.set(true);
        }

        public void beforeCreateFederationConsumer(FederationConsumerInfo federationConsumerInfo) throws ActiveMQException {
            this.beforeCreateConsumer.accept(federationConsumerInfo);
        }

        public void afterCreateFederationConsumer(FederationConsumer federationConsumer) throws ActiveMQException {
            this.afterCreateConsumer.accept(federationConsumer);
        }

        public void beforeCloseFederationConsumer(FederationConsumer federationConsumer) throws ActiveMQException {
            this.beforeCloseConsumer.accept(federationConsumer);
        }

        public void afterCloseFederationConsumer(FederationConsumer federationConsumer) throws ActiveMQException {
            this.afterCloseConsumer.accept(federationConsumer);
        }

        public void beforeFederationConsumerMessageHandled(FederationConsumer federationConsumer, Message message) throws ActiveMQException {
            this.beforeMessageHandled.accept(federationConsumer, message);
        }

        public void afterFederationConsumerMessageHandled(FederationConsumer federationConsumer, Message message) throws ActiveMQException {
            this.afterMessageHandled.accept(federationConsumer, message);
        }

        public boolean shouldCreateFederationConsumerForAddress(AddressInfo addressInfo) throws ActiveMQException {
            return this.shouldCreateConsumerForAddress.apply(addressInfo).booleanValue();
        }

        public boolean shouldCreateFederationConsumerForQueue(Queue queue) throws ActiveMQException {
            return this.shouldCreateConsumerForQueue.apply(queue).booleanValue();
        }

        public boolean shouldCreateFederationConsumerForDivert(Divert divert, Queue queue) throws ActiveMQException {
            return this.shouldCreateConsumerForDivert.apply(divert, queue).booleanValue();
        }
    }

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest$ApplicationPropertiesTransformer.class */
    public static class ApplicationPropertiesTransformer implements Transformer {
        private final Map<String, String> properties = new HashMap();

        public void init(Map<String, String> map) {
            this.properties.putAll(map);
        }

        public Message transform(Message message) {
            if (!(message instanceof AMQPMessage)) {
                return message;
            }
            this.properties.forEach((str, str2) -> {
                message.putStringProperty(str, str2);
            });
            message.reencode();
            return message;
        }
    }

    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport
    protected String getConfiguredProtocols() {
        return "AMQP,CORE";
    }

    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport
    protected ActiveMQServer createServer() throws Exception {
        return createServer(NettyTransportOptions.DEFAULT_TCP_PORT, false);
    }

    @Timeout(20)
    @Test
    public void testFederationCreatesAddressReceiverWhenLocalQueueIsStaticlyDefined() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withSenderSettleModeSettled().withSource().withDynamic(true).and().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind().withTarget().withAddress("test-dynamic-events");
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(false);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(-1L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            HashMap hashMap = new HashMap();
            hashMap.put("auto-delete", false);
            hashMap.put("auto-delete-delay", -1L);
            hashMap.put("auto-delete-msg-count", -1L);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            this.server.createQueue(QueueConfiguration.of(AutoCreateJmsDestinationTest.QUEUE_NAME).setRoutingType(RoutingType.MULTICAST).setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).setAutoCreated(false));
            Wait.assertTrue(() -> {
                return this.server.queueQuery(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME)).isExists();
            });
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                createConnection.start();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectDetach().respond();
                logger.info("Removing Queues from federated address to eliminate demand");
                this.server.destroyQueue(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME));
                Wait.assertFalse(() -> {
                    return this.server.queueQuery(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME)).isExists();
                });
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectClose();
                protonTestServer.remoteClose().now();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationCreatesAddressReceiverLinkForAddressMatch() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withSource().withDynamic(true).and().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind().withTarget().withAddress("test-dynamic-events");
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(true);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(10000L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            HashMap hashMap = new HashMap();
            hashMap.put("auto-delete", true);
            hashMap.put("auto-delete-delay", 10000L);
            hashMap.put("auto-delete-msg-count", -1L);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                createConnection.start();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationCreatesAddressReceiverLinkForAddressMatchUsingPolicyCredit() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withSource().withDynamic(true).and().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind().withTarget().withAddress("test-dynamic-events");
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(true);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(10000L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            aMQPFederationAddressPolicyElement.addProperty("amqpCredits", "25");
            aMQPFederationAddressPolicyElement.addProperty("amqpLowCredits", "5");
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            HashMap hashMap = new HashMap();
            hashMap.put("auto-delete", true);
            hashMap.put("auto-delete-delay", 10000L);
            hashMap.put("auto-delete-msg-count", -1L);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(25);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                createConnection.start();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationCreatesAddressReceiverLinkForAddressMatchWithMaxHopsFilter() throws Exception {
        doTestFederationCreatesAddressReceiverLinkForAddressWithCorrectFilters(true);
    }

    @Timeout(20)
    @Test
    public void testFederationCreatesAddressReceiverLinkForAddressMatchWithoutMaxHopsFilter() throws Exception {
        doTestFederationCreatesAddressReceiverLinkForAddressWithCorrectFilters(false);
    }

    private void doTestFederationCreatesAddressReceiverLinkForAddressWithCorrectFilters(boolean z) throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withSource().withDynamic(true).and().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind().withTarget().withAddress("test-dynamic-events");
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            if (z) {
                aMQPFederationAddressPolicyElement.setMaxHops(1);
            } else {
                aMQPFederationAddressPolicyElement.setMaxHops(0);
            }
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(true);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(10000L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            String generateAddressFilter = AMQPFederationPolicySupport.generateAddressFilter(1);
            Symbol valueOf = Symbol.valueOf("jms-selector");
            Symbol valueOf2 = Symbol.valueOf("apache.org:no-local-filter:list");
            UnsignedLong valueOf3 = UnsignedLong.valueOf(77567109365763L);
            UnsignedLong valueOf4 = UnsignedLong.valueOf(77567109365764L);
            HashMap hashMap = new HashMap();
            hashMap.put(AmqpSupport.JMS_SELECTOR_KEY.toString(), new AmqpJmsSelectorFilter(generateAddressFilter));
            hashMap.put(AmqpSupport.NO_LOCAL_NAME.toString(), new AmqpNoLocalFilter());
            HashMap hashMap2 = new HashMap();
            hashMap2.put("auto-delete", true);
            hashMap2.put("auto-delete-delay", 10000L);
            hashMap2.put("auto-delete-msg-count", -1L);
            AtomicReference atomicReference = new AtomicReference();
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap2).withCapture(attach -> {
                atomicReference.set(attach);
            }).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            ProtonTestClient protonTestClient = new ProtonTestClient();
            try {
                protonTestClient.queueClientSaslAnonymousConnect();
                protonTestClient.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestClient.expectOpen();
                protonTestClient.expectBegin();
                protonTestClient.expectAttach();
                protonTestClient.remoteOpen().withContainerId("test-sender").now();
                protonTestClient.remoteBegin().now();
                protonTestClient.remoteAttach().ofReceiver().withInitialDeliveryCount(0L).withName("sending-peer").withSource().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).withCapabilities(new String[]{"topic"}).also().withTarget().also().now();
                protonTestClient.remoteFlow().withLinkCredit(10L).now();
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                Assertions.assertNotNull(atomicReference.get());
                Source source = ((Attach) atomicReference.get()).getSource();
                Assertions.assertNotNull(source);
                Map filter = source.getFilter();
                Assertions.assertNotNull(filter);
                if (z) {
                    Assertions.assertTrue(filter.containsKey(valueOf));
                    DescribedType describedType = (DescribedType) filter.get(valueOf);
                    Assertions.assertNotNull(describedType);
                    Assertions.assertEquals(describedType.getDescriptor(), valueOf4);
                    Assertions.assertEquals(describedType.getDescribed().toString(), generateAddressFilter);
                } else {
                    Assertions.assertFalse(filter.containsKey(valueOf));
                }
                Assertions.assertTrue(filter.containsKey(valueOf2));
                DescribedType describedType2 = (DescribedType) filter.get(valueOf2);
                Assertions.assertNotNull(describedType2);
                Assertions.assertEquals(describedType2.getDescriptor(), valueOf3);
                HeaderMatcher headerMatcher = new HeaderMatcher(true);
                MessageAnnotationsMatcher messageAnnotationsMatcher = new MessageAnnotationsMatcher(true);
                messageAnnotationsMatcher.withEntry("x-opt-test", Matchers.equalTo(AutoCreateJmsDestinationTest.QUEUE_NAME));
                messageAnnotationsMatcher.withEntry(AMQPFederationPolicySupport.MESSAGE_HOPS_ANNOTATION.toString(), Matchers.equalTo(1));
                EncodedAmqpValueMatcher encodedAmqpValueMatcher = new EncodedAmqpValueMatcher("Hello World");
                TransferPayloadCompositeMatcher transferPayloadCompositeMatcher = new TransferPayloadCompositeMatcher();
                transferPayloadCompositeMatcher.setHeadersMatcher(headerMatcher);
                transferPayloadCompositeMatcher.setMessageAnnotationsMatcher(messageAnnotationsMatcher);
                transferPayloadCompositeMatcher.addMessageContentMatcher(encodedAmqpValueMatcher);
                protonTestClient.expectTransfer().withPayload(transferPayloadCompositeMatcher).accept();
                protonTestServer.expectDisposition().withState().accepted();
                protonTestServer.remoteTransfer().withHeader().withDurability(true).also().withMessageAnnotations().withAnnotation("x-opt-test", AutoCreateJmsDestinationTest.QUEUE_NAME).also().withBody().withString("Hello World").also().withDeliveryId(1).now();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestClient.close();
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationClosesAddressReceiverLinkWhenDemandRemoved() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withSource().withDynamic(true).and().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind().withTarget().withAddress("test-dynamic-events");
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(false);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(-1L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            HashMap hashMap = new HashMap();
            hashMap.put("auto-delete", false);
            hashMap.put("auto-delete-delay", -1L);
            hashMap.put("auto-delete-msg-count", -1L);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                MessageConsumer createConsumer = createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                createConnection.start();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectDetach().respond();
                createConsumer.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationRetainsAddressReceiverLinkWhenDurableSubscriberIsOffline() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withSource().withDynamic(true).and().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind().withTarget().withAddress("test-dynamic-events");
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(false);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(-1L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            HashMap hashMap = new HashMap();
            hashMap.put("auto-delete", false);
            hashMap.put("auto-delete-delay", -1L);
            hashMap.put("auto-delete-msg-count", -1L);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                createConnection.setClientID("test-clientId");
                Session createSession = createConnection.createSession(1);
                MessageConsumer createSharedDurableConsumer = createSession.createSharedDurableConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME), "shared-subscription");
                createConnection.start();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                createSharedDurableConsumer.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectDetach().respond();
                createSession.unsubscribe("shared-subscription");
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationClosesAddressReceiverLinkWaitsForAllDemandToRemoved() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withSource().withDynamic(true).and().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind().withTarget().withAddress("test-dynamic-events");
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(false);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(-1L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            HashMap hashMap = new HashMap();
            hashMap.put("auto-delete", false);
            hashMap.put("auto-delete-delay", -1L);
            hashMap.put("auto-delete-msg-count", -1L);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                MessageConsumer createConsumer = createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                MessageConsumer createConsumer2 = createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                createConnection.start();
                createConsumer.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectDetach().respond();
                createConsumer2.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationHandlesAddressDeletedAndConsumerRecreates() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respondInKind();
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(1000).optional();
            ConnectionFactory createConnectionFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
            Connection createConnection = createConnectionFactory.createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                createConnection.start();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectDetach().respond();
                this.server.removeAddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), (SecurityAuth) null, true);
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).respondInKind();
                protonTestServer.expectFlow().withLinkCredit(1000).optional();
                createConnection = createConnectionFactory.createConnection();
                try {
                    Session createSession2 = createConnection.createSession(1);
                    createSession2.createConsumer(createSession2.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                    createConnection.start();
                    protonTestServer.expectDetach().respond();
                    if (createConnection != null) {
                        createConnection.close();
                    }
                    protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                    protonTestServer.close();
                    protonTestServer.close();
                } finally {
                }
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationConsumerCreatedWhenDemandAddedToDivertAddress() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(false);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(-1L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            aMQPFederationAddressPolicyElement.setEnableDivertBindings(true);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            DivertConfiguration name = new DivertConfiguration().setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).setForwardingAddress("forward").setName("test-divert");
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.deployDivert(name);
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            HashMap hashMap = new HashMap();
            hashMap.put("auto-delete", false);
            hashMap.put("auto-delete-delay", -1L);
            hashMap.put("auto-delete-msg-count", -1L);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                MessageConsumer createConsumer = createSession.createConsumer(createSession.createTopic("forward"));
                createConnection.start();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectDetach().respond();
                createConsumer.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationConsumerCreatedWhenDemandAddedToCompositeDivertAddress() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(false);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(-1L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            aMQPFederationAddressPolicyElement.setEnableDivertBindings(true);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            DivertConfiguration name = new DivertConfiguration().setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).setForwardingAddress("forward1,forward2").setName("test-divert");
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.deployDivert(name);
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            HashMap hashMap = new HashMap();
            hashMap.put("auto-delete", false);
            hashMap.put("auto-delete-delay", -1L);
            hashMap.put("auto-delete-msg-count", -1L);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                MessageConsumer createConsumer = createSession.createConsumer(createSession.createTopic("forward1"));
                MessageConsumer createConsumer2 = createSession.createConsumer(createSession.createTopic("forward2"));
                createConnection.start();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                createConsumer.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectDetach().respond();
                createConsumer2.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationConsumerRemovesDemandFromDivertConsumersOnlyWhenAllDemandIsRemoved() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(false);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(-1L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            aMQPFederationAddressPolicyElement.setEnableDivertBindings(true);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            DivertConfiguration name = new DivertConfiguration().setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).setForwardingAddress("forward").setName("test-divert");
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.deployDivert(name);
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            HashMap hashMap = new HashMap();
            hashMap.put("auto-delete", false);
            hashMap.put("auto-delete-delay", -1L);
            hashMap.put("auto-delete-msg-count", -1L);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                MessageConsumer createConsumer = createSession.createConsumer(createSession.createTopic("forward"));
                MessageConsumer createConsumer2 = createSession.createConsumer(createSession.createTopic("forward"));
                createConnection.start();
                createConsumer.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectDetach().respond();
                createConsumer2.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationConsumerRetainsDemandForDivertBindingWithoutActiveAnycastSubscriptions() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes("source");
            aMQPFederationAddressPolicyElement.setAutoDelete(false);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(-1L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            aMQPFederationAddressPolicyElement.setEnableDivertBindings(true);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            DivertConfiguration name = new DivertConfiguration().setAddress("source").setForwardingAddress("forward").setRoutingType(ComponentConfigurationRoutingType.ANYCAST).setName("test-divert");
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.deployDivert(name);
            this.server.addAddressInfo(new AddressInfo(SimpleString.of("source"), RoutingType.MULTICAST));
            HashMap hashMap = new HashMap();
            hashMap.put("auto-delete", false);
            hashMap.put("auto-delete-delay", -1L);
            hashMap.put("auto-delete-msg-count", -1L);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString("source"), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                jakarta.jms.Queue createQueue = createSession.createQueue("forward");
                MessageConsumer createConsumer = createSession.createConsumer(createQueue);
                MessageConsumer createConsumer2 = createSession.createConsumer(createQueue);
                createConnection.start();
                createConsumer.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                createConsumer2.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationConsumerRemovesDemandForDivertBindingWithoutActiveMulticastSubscriptions() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes("source");
            aMQPFederationAddressPolicyElement.setAutoDelete(false);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(-1L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            aMQPFederationAddressPolicyElement.setEnableDivertBindings(true);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            DivertConfiguration name = new DivertConfiguration().setAddress("source").setForwardingAddress("forward").setRoutingType(ComponentConfigurationRoutingType.MULTICAST).setName("test-divert");
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.deployDivert(name);
            this.server.addAddressInfo(new AddressInfo(SimpleString.of("source"), RoutingType.MULTICAST));
            HashMap hashMap = new HashMap();
            hashMap.put("auto-delete", false);
            hashMap.put("auto-delete-delay", -1L);
            hashMap.put("auto-delete-msg-count", -1L);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString("source"), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                Topic createTopic = createSession.createTopic("forward");
                MessageConsumer createConsumer = createSession.createConsumer(createTopic);
                MessageConsumer createConsumer2 = createSession.createConsumer(createTopic);
                createConnection.start();
                createConsumer.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectDetach().respond();
                createConsumer2.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationRemovesRemoteDemandIfDivertIsRemoved() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes("source");
            aMQPFederationAddressPolicyElement.setAutoDelete(false);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(-1L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            aMQPFederationAddressPolicyElement.setEnableDivertBindings(true);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            DivertConfiguration name = new DivertConfiguration().setAddress("source").setForwardingAddress("forward").setName("test-divert");
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.deployDivert(name);
            this.server.addAddressInfo(new AddressInfo(SimpleString.of("source"), RoutingType.MULTICAST));
            HashMap hashMap = new HashMap();
            hashMap.put("auto-delete", false);
            hashMap.put("auto-delete-delay", -1L);
            hashMap.put("auto-delete-msg-count", -1L);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString("source"), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                createSession.createConsumer(createSession.createTopic("forward"));
                createConnection.start();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectDetach().respond();
                this.server.destroyDivert(SimpleString.of("test-divert"));
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testDivertBindingsDoNotCreateAdditionalDemandIfDemandOnForwardingAddressAlreadyExists() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(false);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(-1L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            aMQPFederationAddressPolicyElement.setEnableDivertBindings(true);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            DivertConfiguration name = new DivertConfiguration().setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).setForwardingAddress("forward").setName("test-divert");
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.deployDivert(name);
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            HashMap hashMap = new HashMap();
            hashMap.put("auto-delete", false);
            hashMap.put("auto-delete-delay", -1L);
            hashMap.put("auto-delete-msg-count", -1L);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                MessageConsumer createConsumer = createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                createConnection.start();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                MessageConsumer createConsumer2 = createSession.createConsumer(createSession.createTopic("forward"));
                MessageConsumer createConsumer3 = createSession.createConsumer(createSession.createTopic("forward"));
                createConsumer2.close();
                createConsumer3.close();
                this.server.destroyDivert(SimpleString.of("test-divert"));
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectDetach().respond();
                createConsumer.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testInboundMessageRoutedToReceiverOnLocalAddress() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respondInKind();
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(1000);
            protonTestServer.remoteTransfer().withBody().withString("test-message").also().withDeliveryId(0).queue();
            protonTestServer.expectDisposition().withSettled(true).withState().accepted();
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                MessageConsumer createConsumer = createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                createConnection.start();
                TextMessage receive = createConsumer.receive(5000L);
                Assertions.assertNotNull(receive);
                Assertions.assertTrue(receive instanceof TextMessage);
                Assertions.assertEquals("test-message", receive.getText());
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectDetach();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testRemoteBrokerAcceptsAddressPolicyFromControlLink() throws Exception {
        this.server.start();
        ArrayList arrayList = new ArrayList();
        arrayList.add("address1");
        arrayList.add("address2");
        ArrayList arrayList2 = new ArrayList();
        arrayList.add("address3");
        FederationReceiveFromAddressPolicy federationReceiveFromAddressPolicy = new FederationReceiveFromAddressPolicy("test-address-policy", true, AmqpConnection.DEFAULT_CLOSE_TIMEOUT, 1000L, 1, true, arrayList, arrayList2, (Map) null, (TransformerConfiguration) null, DEFAULT_WILDCARD_CONFIGURATION);
        ProtonTestClient protonTestClient = new ProtonTestClient();
        try {
            scriptFederationConnectToRemote(protonTestClient, AutoCreateJmsDestinationTest.QUEUE_NAME);
            protonTestClient.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectDisposition().withSettled(true).withState().accepted();
            sendAddresPolicyToRemote(protonTestClient, federationReceiveFromAddressPolicy);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectClose();
            protonTestClient.remoteClose().now();
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.close();
            this.server.stop();
            protonTestClient.close();
        } catch (Throwable th) {
            try {
                protonTestClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testRemoteBrokerAcceptsAddressPolicyFromControlLinkWithTransformerConfiguration() throws Exception {
        this.server.start();
        ArrayList arrayList = new ArrayList();
        arrayList.add("address1");
        arrayList.add("address2");
        ArrayList arrayList2 = new ArrayList();
        arrayList.add("address3");
        HashMap hashMap = new HashMap();
        hashMap.put("key1", "value1");
        hashMap.put("key2", "value2");
        TransformerConfiguration transformerConfiguration = new TransformerConfiguration();
        transformerConfiguration.setClassName(ApplicationPropertiesTransformer.class.getName());
        transformerConfiguration.setProperties(hashMap);
        FederationReceiveFromAddressPolicy federationReceiveFromAddressPolicy = new FederationReceiveFromAddressPolicy("test-address-policy", true, AmqpConnection.DEFAULT_CLOSE_TIMEOUT, 1000L, 1, true, arrayList, arrayList2, (Map) null, transformerConfiguration, DEFAULT_WILDCARD_CONFIGURATION);
        ProtonTestClient protonTestClient = new ProtonTestClient();
        try {
            scriptFederationConnectToRemote(protonTestClient, AutoCreateJmsDestinationTest.QUEUE_NAME);
            protonTestClient.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectDisposition().withSettled(true).withState().accepted();
            sendAddresPolicyToRemote(protonTestClient, federationReceiveFromAddressPolicy);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectClose();
            protonTestClient.remoteClose().now();
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.close();
            this.server.stop();
            protonTestClient.close();
        } catch (Throwable th) {
            try {
                protonTestClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testRemoteFederatesAddressWhenDemandIsApplied() throws Exception {
        this.server.start();
        ArrayList arrayList = new ArrayList();
        arrayList.add("address1");
        FederationReceiveFromAddressPolicy federationReceiveFromAddressPolicy = new FederationReceiveFromAddressPolicy("test-address-policy", true, AmqpConnection.DEFAULT_CLOSE_TIMEOUT, 1000L, 1, true, arrayList, (Collection) null, (Map) null, (TransformerConfiguration) null, DEFAULT_WILDCARD_CONFIGURATION);
        ProtonTestClient protonTestClient = new ProtonTestClient();
        try {
            scriptFederationConnectToRemote(protonTestClient, AutoCreateJmsDestinationTest.QUEUE_NAME);
            protonTestClient.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectDisposition().withSettled(true).withState().accepted();
            sendAddresPolicyToRemote(protonTestClient, federationReceiveFromAddressPolicy);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withSource().withAddress("address1").and().respondInKind();
            protonTestClient.expectFlow().withLinkCredit(1000);
            protonTestClient.remoteTransfer().withBody().withString("test-message").also().withDeliveryId(1).queue();
            protonTestClient.expectDisposition().withSettled(true).withState().accepted();
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                MessageConsumer createConsumer = createSession.createConsumer(createSession.createTopic("address1"));
                createConnection.start();
                TextMessage receive = createConsumer.receive(5000L);
                Assertions.assertNotNull(receive);
                Assertions.assertTrue(receive instanceof TextMessage);
                Assertions.assertEquals("test-message", receive.getText());
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestClient.expectDetach();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestClient.expectClose();
                protonTestClient.remoteClose().now();
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestClient.close();
                this.server.stop();
                protonTestClient.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testRemoteFederatesAddressWhenDemandIsAppliedUsingControllerDefinedLinkCredit() throws Exception {
        this.server.start();
        ArrayList arrayList = new ArrayList();
        arrayList.add("address1");
        FederationReceiveFromAddressPolicy federationReceiveFromAddressPolicy = new FederationReceiveFromAddressPolicy("test-address-policy", true, AmqpConnection.DEFAULT_CLOSE_TIMEOUT, 1000L, 1, true, arrayList, (Collection) null, (Map) null, (TransformerConfiguration) null, DEFAULT_WILDCARD_CONFIGURATION);
        ProtonTestClient protonTestClient = new ProtonTestClient();
        try {
            scriptFederationConnectToRemote(protonTestClient, AutoCreateJmsDestinationTest.QUEUE_NAME, 10, 9);
            protonTestClient.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectDisposition().withSettled(true).withState().accepted();
            sendAddresPolicyToRemote(protonTestClient, federationReceiveFromAddressPolicy);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withSource().withAddress("address1").and().respondInKind();
            protonTestClient.expectFlow().withLinkCredit(10);
            protonTestClient.remoteTransfer().withBody().withString("test-message").also().withDeliveryId(1).queue();
            protonTestClient.expectFlow().withLinkCredit(10);
            protonTestClient.expectDisposition().withSettled(true).withState().accepted();
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                MessageConsumer createConsumer = createSession.createConsumer(createSession.createTopic("address1"));
                createConnection.start();
                TextMessage receive = createConsumer.receive(5000L);
                Assertions.assertNotNull(receive);
                Assertions.assertTrue(receive instanceof TextMessage);
                Assertions.assertEquals("test-message", receive.getText());
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestClient.expectDetach();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestClient.expectClose();
                protonTestClient.remoteClose().now();
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestClient.close();
                this.server.stop();
                protonTestClient.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testRemoteFederatesAddressWhenDemandIsAppliedUsingPolicyDefinedLinkCredit() throws Exception {
        this.server.start();
        ArrayList arrayList = new ArrayList();
        arrayList.add("address1");
        HashMap hashMap = new HashMap();
        hashMap.put("amqpCredits", 40);
        hashMap.put("amqpLowCredits", "39");
        hashMap.put("minLargeMessageSize", Integer.valueOf(AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE));
        FederationReceiveFromAddressPolicy federationReceiveFromAddressPolicy = new FederationReceiveFromAddressPolicy("test-address-policy", true, AmqpConnection.DEFAULT_CLOSE_TIMEOUT, 1000L, 1, true, arrayList, (Collection) null, hashMap, (TransformerConfiguration) null, DEFAULT_WILDCARD_CONFIGURATION);
        ProtonTestClient protonTestClient = new ProtonTestClient();
        try {
            scriptFederationConnectToRemote(protonTestClient, AutoCreateJmsDestinationTest.QUEUE_NAME, 10, 9);
            protonTestClient.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectDisposition().withSettled(true).withState().accepted();
            sendAddresPolicyToRemote(protonTestClient, federationReceiveFromAddressPolicy);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withSource().withAddress("address1").and().respondInKind();
            protonTestClient.expectFlow().withLinkCredit(40);
            protonTestClient.remoteTransfer().withBody().withString("test-message").also().withDeliveryId(1).queue();
            protonTestClient.expectFlow().withLinkCredit(40);
            protonTestClient.expectDisposition().withSettled(true).withState().accepted();
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                MessageConsumer createConsumer = createSession.createConsumer(createSession.createTopic("address1"));
                createConnection.start();
                TextMessage receive = createConsumer.receive(5000L);
                Assertions.assertNotNull(receive);
                Assertions.assertTrue(receive instanceof TextMessage);
                Assertions.assertEquals("test-message", receive.getText());
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestClient.expectDetach();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestClient.expectClose();
                protonTestClient.remoteClose().now();
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestClient.close();
                this.server.stop();
                protonTestClient.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testRemoteFederatesAddressAndAppliesTransformerWhenDemandIsApplied() throws Exception {
        this.server.start();
        ArrayList arrayList = new ArrayList();
        arrayList.add("address1");
        HashMap hashMap = new HashMap();
        hashMap.put("key1", "value1");
        hashMap.put("key2", "value2");
        TransformerConfiguration transformerConfiguration = new TransformerConfiguration();
        transformerConfiguration.setClassName(ApplicationPropertiesTransformer.class.getName());
        transformerConfiguration.setProperties(hashMap);
        FederationReceiveFromAddressPolicy federationReceiveFromAddressPolicy = new FederationReceiveFromAddressPolicy("test-address-policy", true, AmqpConnection.DEFAULT_CLOSE_TIMEOUT, 1000L, 1, true, arrayList, (Collection) null, (Map) null, transformerConfiguration, DEFAULT_WILDCARD_CONFIGURATION);
        ProtonTestClient protonTestClient = new ProtonTestClient();
        try {
            scriptFederationConnectToRemote(protonTestClient, AutoCreateJmsDestinationTest.QUEUE_NAME);
            protonTestClient.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectDisposition().withSettled(true).withState().accepted();
            sendAddresPolicyToRemote(protonTestClient, federationReceiveFromAddressPolicy);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withSource().withAddress("address1").and().respondInKind();
            protonTestClient.expectFlow().withLinkCredit(1000);
            protonTestClient.remoteTransfer().withBody().withString("test-message").also().withDeliveryId(1).queue();
            protonTestClient.expectDisposition().withSettled(true).withState().accepted();
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                MessageConsumer createConsumer = createSession.createConsumer(createSession.createTopic("address1"));
                createConnection.start();
                TextMessage receive = createConsumer.receive(5000L);
                Assertions.assertNotNull(receive);
                Assertions.assertTrue(receive instanceof TextMessage);
                Assertions.assertEquals("test-message", receive.getText());
                Assertions.assertEquals("value1", receive.getStringProperty("key1"));
                Assertions.assertEquals("value2", receive.getStringProperty("key2"));
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestClient.expectDetach();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestClient.expectClose();
                protonTestClient.remoteClose().now();
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestClient.close();
                this.server.stop();
                protonTestClient.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testRemoteBrokerAnswersAttachOfFederationReceiverProperly() throws Exception {
        this.server.start();
        HashMap hashMap = new HashMap();
        hashMap.put("auto-delete", true);
        hashMap.put("auto-delete-delay", 10000L);
        hashMap.put("auto-delete-msg-count", 1L);
        ProtonTestClient protonTestClient = new ProtonTestClient();
        try {
            scriptFederationConnectToRemote(protonTestClient, AutoCreateJmsDestinationTest.QUEUE_NAME);
            protonTestClient.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectAttach().ofSender().withName("federation-address-receiver").withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()}).withTarget().also().withSource().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME);
            protonTestClient.remoteAttach().ofReceiver().withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()}).withName("federation-address-receiver").withSenderSettleModeUnsettled().withReceivervSettlesFirst().withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).withSource().withDurabilityOfNone().withExpiryPolicyOnLinkDetach().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).withCapabilities(new String[]{"topic"}).and().withTarget().and().now();
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectClose();
            protonTestClient.remoteClose().now();
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.close();
            this.server.stop();
            protonTestClient.close();
        } catch (Throwable th) {
            try {
                protonTestClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testReceiverWithMaxHopsFilterAppliesFilterCorrectly() throws Exception {
        this.server.start();
        String str = "\"m." + AMQPFederationPolicySupport.MESSAGE_HOPS_ANNOTATION + "\" IS NULL OR \"m." + AMQPFederationPolicySupport.MESSAGE_HOPS_ANNOTATION + "\"<2";
        ProtonTestClient protonTestClient = new ProtonTestClient();
        try {
            scriptFederationConnectToRemote(protonTestClient, AutoCreateJmsDestinationTest.QUEUE_NAME);
            protonTestClient.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectAttach().ofSender().withName("federation-address-receiver").withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()}).withSource().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).withJMSSelector(str);
            protonTestClient.remoteAttach().ofReceiver().withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()}).withName("federation-address-receiver").withSenderSettleModeUnsettled().withReceivervSettlesFirst().withSource().withDurabilityOfNone().withExpiryPolicyOnLinkDetach().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).withCapabilities(new String[]{"topic"}).withJMSSelector(str).and().withTarget().and().now();
            protonTestClient.remoteFlow().withLinkCredit(10L).now();
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            HeaderMatcher headerMatcher = new HeaderMatcher(true);
            MessageAnnotationsMatcher messageAnnotationsMatcher = new MessageAnnotationsMatcher(true);
            PropertiesMatcher propertiesMatcher = new PropertiesMatcher(true);
            EncodedAmqpValueMatcher encodedAmqpValueMatcher = new EncodedAmqpValueMatcher("Hello World");
            TransferPayloadCompositeMatcher transferPayloadCompositeMatcher = new TransferPayloadCompositeMatcher();
            transferPayloadCompositeMatcher.setHeadersMatcher(headerMatcher);
            transferPayloadCompositeMatcher.setMessageAnnotationsMatcher(messageAnnotationsMatcher);
            transferPayloadCompositeMatcher.setPropertiesMatcher(propertiesMatcher);
            transferPayloadCompositeMatcher.addMessageContentMatcher(encodedAmqpValueMatcher);
            protonTestClient.expectTransfer().withPayload(transferPayloadCompositeMatcher).accept();
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                createSession.createProducer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME)).send(createSession.createTextMessage("Hello World"));
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestClient.expectTransfer().withPayload(transferPayloadCompositeMatcher).accept();
                ProtonTestClient protonTestClient2 = new ProtonTestClient();
                try {
                    protonTestClient2.queueClientSaslAnonymousConnect();
                    protonTestClient2.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
                    protonTestClient2.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                    protonTestClient2.expectOpen();
                    protonTestClient2.expectBegin();
                    protonTestClient2.expectAttach();
                    protonTestClient2.expectFlow();
                    protonTestClient2.remoteOpen().withContainerId("test-sender").now();
                    protonTestClient2.remoteBegin().now();
                    protonTestClient2.remoteAttach().ofSender().withInitialDeliveryCount(0L).withName("sending-peer").withTarget().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).withCapabilities(new String[]{"topic"}).also().withSource().also().now();
                    protonTestClient2.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                    protonTestClient2.expectDisposition().withSettled(true).withState().accepted();
                    protonTestClient2.expectDisposition().withSettled(true).withState().accepted();
                    protonTestClient2.remoteTransfer().withDeliveryId(0).withHeader().withDurability(false).also().withProperties().withMessageId("ID:1").also().withMessageAnnotations().withAnnotation(AMQPFederationPolicySupport.MESSAGE_HOPS_ANNOTATION.toString(), 1).also().withBody().withString("Hello World").also().now();
                    protonTestClient2.remoteTransfer().withDeliveryId(1).withHeader().withDurability(false).also().withProperties().withMessageId("ID:2").also().withMessageAnnotations().withAnnotation(AMQPFederationPolicySupport.MESSAGE_HOPS_ANNOTATION.toString(), 2).also().withBody().withString("Hello World").also().now();
                    protonTestClient2.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                    protonTestClient2.close();
                    protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                    protonTestClient.expectClose();
                    protonTestClient.remoteClose().now();
                    protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                    protonTestClient.close();
                    this.server.stop();
                    protonTestClient.close();
                } finally {
                }
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testRemoteConnectionCannotAttachAddressFederationLinkWithoutControlLink() throws Exception {
        this.server.start();
        ProtonTestClient protonTestClient = new ProtonTestClient();
        try {
            protonTestClient.queueClientSaslAnonymousConnect();
            protonTestClient.remoteOpen().queue();
            protonTestClient.expectOpen();
            protonTestClient.remoteBegin().queue();
            protonTestClient.expectBegin();
            protonTestClient.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectAttach().ofSender().withName("federation-address-receiver").withSource(CoreMatchers.nullValue()).withTarget();
            protonTestClient.expectDetach().respond();
            protonTestClient.remoteAttach().ofReceiver().withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()}).withName("federation-address-receiver").withSenderSettleModeUnsettled().withReceivervSettlesFirst().withSource().withDurabilityOfNone().withExpiryPolicyOnLinkDetach().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).withCapabilities(new String[]{"topic"}).and().withTarget().and().now();
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectClose();
            protonTestClient.remoteClose().now();
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.close();
            this.server.stop();
            protonTestClient.close();
        } catch (Throwable th) {
            try {
                protonTestClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testTransformInboundFederatedMessageBeforeDispatch() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respondInKind();
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            HashMap hashMap = new HashMap();
            hashMap.put("appProperty1", "one");
            hashMap.put("appProperty2", "two");
            TransformerConfiguration transformerConfiguration = new TransformerConfiguration();
            transformerConfiguration.setClassName(ApplicationPropertiesTransformer.class.getName());
            transformerConfiguration.setProperties(hashMap);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.setTransformerConfiguration(transformerConfiguration);
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(1000);
            protonTestServer.remoteTransfer().withBody().withString("test-message").also().withDeliveryId(0).queue();
            protonTestServer.expectDisposition().withSettled(true).withState().accepted();
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                MessageConsumer createConsumer = createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                createConnection.start();
                TextMessage receive = createConsumer.receive(5000L);
                Assertions.assertNotNull(receive);
                Assertions.assertTrue(receive instanceof TextMessage);
                Assertions.assertEquals("test-message", receive.getText());
                Assertions.assertEquals("one", receive.getStringProperty("appProperty1"));
                Assertions.assertEquals("two", receive.getStringProperty("appProperty2"));
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectDetach();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationDoesNotCreateAddressReceiverLinkForAddressMatchWhenLinkCreditIsSetToZero() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(true);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(10000L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort() + "?amqpCredits=0");
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                MessageConsumer createConsumer = createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                createConnection.start();
                Assertions.assertNull(createConsumer.receiveNoWait());
                createConsumer.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testCoreMessageConvertedToAMQPWhenTunnelingDisabled() throws Exception {
        doTestCoreMessageHandlingBasedOnTunnelingState(false);
    }

    @Timeout(20)
    @Test
    public void testCoreMessageNotConvertedToAMQPWhenTunnelingEnabled() throws Exception {
        doTestCoreMessageHandlingBasedOnTunnelingState(true);
    }

    private void doTestCoreMessageHandlingBasedOnTunnelingState(boolean z) throws Exception {
        String[] strArr;
        int i;
        this.server.start();
        if (z) {
            strArr = new String[]{AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()};
            i = 1183580416;
        } else {
            strArr = null;
            i = 0;
        }
        ProtonTestClient protonTestClient = new ProtonTestClient();
        try {
            scriptFederationConnectToRemote(protonTestClient, AutoCreateJmsDestinationTest.QUEUE_NAME);
            protonTestClient.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectAttach().ofSender().withName("federation-address-receiver").withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()}).withDesiredCapabilities(new String[]{AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}).withSource().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME);
            protonTestClient.remoteAttach().ofReceiver().withOfferedCapabilities(strArr).withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()}).withName("federation-address-receiver").withSenderSettleModeUnsettled().withReceivervSettlesFirst().withSource().withDurabilityOfNone().withExpiryPolicyOnLinkDetach().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).withCapabilities(new String[]{"topic"}).and().withTarget().and().now();
            protonTestClient.remoteFlow().withLinkCredit(10L).now();
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectTransfer().withNonNullPayload().withMessageFormat(i).accept();
            Connection createConnection = CFUtil.createConnectionFactory("CORE", "tcp://localhost:5672?minLargeMessageSize=512").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                createSession.createProducer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME)).send(createSession.createTextMessage("Hello World"));
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestClient.expectClose();
                protonTestClient.remoteClose().now();
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestClient.close();
                this.server.stop();
                protonTestClient.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testCoreLargeMessageConvertedToAMQPWhenTunnelingDisabled() throws Exception {
        doTestCoreLargeMessageHandlingBasedOnTunnelingState(false);
    }

    @Timeout(20)
    @Test
    public void testCoreLargeMessageNotConvertedToAMQPWhenTunnelingEnabled() throws Exception {
        doTestCoreLargeMessageHandlingBasedOnTunnelingState(true);
    }

    private void doTestCoreLargeMessageHandlingBasedOnTunnelingState(boolean z) throws Exception {
        String[] strArr;
        int i;
        this.server.start();
        if (z) {
            strArr = new String[]{AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()};
            i = 1183580672;
        } else {
            strArr = null;
            i = 0;
        }
        ProtonTestClient protonTestClient = new ProtonTestClient();
        try {
            scriptFederationConnectToRemote(protonTestClient, AutoCreateJmsDestinationTest.QUEUE_NAME);
            protonTestClient.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectAttach().ofSender().withName("federation-address-receiver").withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()}).withDesiredCapabilities(new String[]{AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString()}).withSource().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME);
            protonTestClient.remoteAttach().ofReceiver().withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()}).withOfferedCapabilities(strArr).withName("federation-address-receiver").withSenderSettleModeUnsettled().withReceivervSettlesFirst().withSource().withDurabilityOfNone().withExpiryPolicyOnLinkDetach().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).withCapabilities(new String[]{"topic"}).and().withTarget().and().now();
            protonTestClient.remoteFlow().withLinkCredit(10L).now();
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectTransfer().withNonNullPayload().withMessageFormat(i).accept();
            Connection createConnection = CFUtil.createConnectionFactory("CORE", "tcp://localhost:5672?minLargeMessageSize=512").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                MessageProducer createProducer = createSession.createProducer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                byte[] bArr = new byte[AmqpTransferTagGenerator.DEFAULT_TAG_POOL_SIZE];
                Arrays.fill(bArr, (byte) 65);
                BytesMessage createBytesMessage = createSession.createBytesMessage();
                createBytesMessage.writeBytes(bArr);
                createProducer.send(createBytesMessage);
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestClient.expectClose();
                protonTestClient.remoteClose().now();
                protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestClient.close();
                this.server.stop();
                protonTestClient.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationStartedTriggersRemoteDemandWithExistingAddressBindings() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.setAutostart(false);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.createQueue(QueueConfiguration.of(AutoCreateJmsDestinationTest.QUEUE_NAME).setRoutingType(RoutingType.MULTICAST).setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).setAutoCreated(false));
            Wait.assertTrue(() -> {
                return this.server.queueQuery(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME)).isExists();
            });
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            Session createSession = createConnection.createSession(1);
            createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
            createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
            createSession.createConsumer(createSession.createTopic("a1"));
            createSession.createConsumer(createSession.createTopic("a2"));
            createConnection.start();
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            this.server.getBrokerConnections().forEach(brokerConnection -> {
                try {
                    brokerConnection.start();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
            createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            createConnection.close();
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectDetach().respond();
            logger.info("Removing Queues from federated address to eliminate demand");
            this.server.destroyQueue(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME));
            Wait.assertFalse(() -> {
                return this.server.queueQuery(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME)).isExists();
            });
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectClose();
            protonTestServer.remoteClose().now();
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.close();
            protonTestServer.close();
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationStartedTriggersRemoteDemandWithExistingAddressAndDivertBindings() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setEnableDivertBindings(true);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.setAutostart(false);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            DivertConfiguration divertConfiguration = new DivertConfiguration();
            divertConfiguration.setName("test-divert");
            divertConfiguration.setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME);
            divertConfiguration.setExclusive(false);
            divertConfiguration.setForwardingAddress("target");
            divertConfiguration.setRoutingType(ComponentConfigurationRoutingType.MULTICAST);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            this.server.addAddressInfo(new AddressInfo(SimpleString.of("target"), RoutingType.MULTICAST));
            this.server.deployDivert(divertConfiguration);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            Session createSession = createConnection.createSession(1);
            Topic createTopic = createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME);
            Topic createTopic2 = createSession.createTopic("target");
            createSession.createConsumer(createTopic);
            createSession.createConsumer(createTopic);
            createSession.createConsumer(createTopic2);
            createSession.createConsumer(createTopic2);
            createSession.createConsumer(createSession.createTopic("a1"));
            createSession.createConsumer(createSession.createTopic("a2"));
            createConnection.start();
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            this.server.getBrokerConnections().forEach(brokerConnection -> {
                try {
                    brokerConnection.start();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            createSession.createConsumer(createTopic);
            createSession.createConsumer(createTopic2);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectDetach().respond();
            createConnection.close();
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectClose();
            protonTestServer.remoteClose().now();
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.close();
            protonTestServer.close();
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationStartTriggersFederationWithMultipleDivertsAndRemainsActiveAfterOneRemoved() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setEnableDivertBindings(true);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.setAutostart(false);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            DivertConfiguration divertConfiguration = new DivertConfiguration();
            divertConfiguration.setName("test-divert-1");
            divertConfiguration.setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME);
            divertConfiguration.setExclusive(false);
            divertConfiguration.setForwardingAddress("target1,target2");
            divertConfiguration.setRoutingType(ComponentConfigurationRoutingType.MULTICAST);
            DivertConfiguration divertConfiguration2 = new DivertConfiguration();
            divertConfiguration2.setName("test-divert-2");
            divertConfiguration2.setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME);
            divertConfiguration2.setExclusive(false);
            divertConfiguration2.setForwardingAddress("target1,target3");
            divertConfiguration2.setRoutingType(ComponentConfigurationRoutingType.MULTICAST);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            this.server.addAddressInfo(new AddressInfo(SimpleString.of("target1"), RoutingType.MULTICAST));
            this.server.addAddressInfo(new AddressInfo(SimpleString.of("target2"), RoutingType.MULTICAST));
            this.server.addAddressInfo(new AddressInfo(SimpleString.of("target3"), RoutingType.MULTICAST));
            this.server.deployDivert(divertConfiguration);
            this.server.deployDivert(divertConfiguration2);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            Session createSession = createConnection.createSession(1);
            Topic createTopic = createSession.createTopic("target1");
            Topic createTopic2 = createSession.createTopic("target2");
            Topic createTopic3 = createSession.createTopic("target2");
            createSession.createConsumer(createTopic);
            createSession.createConsumer(createTopic2);
            createSession.createConsumer(createTopic3);
            createConnection.start();
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            this.server.getBrokerConnections().forEach(brokerConnection -> {
                try {
                    brokerConnection.start();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            createSession.createConsumer(createTopic);
            createSession.createConsumer(createTopic2);
            createSession.createConsumer(createTopic3);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.destroyDivert(SimpleString.of(divertConfiguration.getName()));
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectDetach().respond();
            this.server.destroyDivert(SimpleString.of(divertConfiguration2.getName()));
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            createConnection.close();
            protonTestServer.expectClose();
            protonTestServer.remoteClose().now();
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.close();
            protonTestServer.close();
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationPluginCanLimitDemandToOnlyTheConfiguredDivert() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setEnableDivertBindings(true);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(5);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            DivertConfiguration divertConfiguration = new DivertConfiguration();
            divertConfiguration.setName("test-divert-1");
            divertConfiguration.setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME);
            divertConfiguration.setExclusive(false);
            divertConfiguration.setForwardingAddress("target");
            divertConfiguration.setRoutingType(ComponentConfigurationRoutingType.MULTICAST);
            AMQPTestFederationBrokerPlugin aMQPTestFederationBrokerPlugin = new AMQPTestFederationBrokerPlugin();
            aMQPTestFederationBrokerPlugin.shouldCreateConsumerForDivert = (divert, queue) -> {
                return true;
            };
            aMQPTestFederationBrokerPlugin.shouldCreateConsumerForQueue = queue2 -> {
                return !queue2.getAddress().toString().equals(AutoCreateJmsDestinationTest.QUEUE_NAME);
            };
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            this.server.addAddressInfo(new AddressInfo(SimpleString.of("target"), RoutingType.MULTICAST));
            this.server.deployDivert(divertConfiguration);
            this.server.registerBrokerPlugin(aMQPTestFederationBrokerPlugin);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                MessageConsumer createConsumer = createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                createConnection.start();
                createConsumer.receiveNoWait();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
                protonTestServer.expectFlow().withLinkCredit(1000);
                createSession.createConsumer(createSession.createTopic("target"));
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationCreatesEventSenderAndReceiverWhenLocalAndRemotePoliciesAdded() throws Exception {
        MessageAnnotationsMatcher messageAnnotationsMatcher = new MessageAnnotationsMatcher(true);
        messageAnnotationsMatcher.withEntry(AMQPFederationConstants.OPERATION_TYPE.toString(), Matchers.is("ADD_ADDRESS_POLICY"));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(AutoCreateJmsDestinationTest.QUEUE_NAME);
        linkedHashMap.put("policy-name", "remote-address-policy");
        linkedHashMap.put("auto-delete", false);
        linkedHashMap.put("auto-delete-delay", -1L);
        linkedHashMap.put("auto-delete-msg-count", -1L);
        linkedHashMap.put("max-hops", 5);
        linkedHashMap.put("enable-divert-bindings", false);
        linkedHashMap.put("address-includes", arrayList);
        EncodedAmqpValueMatcher encodedAmqpValueMatcher = new EncodedAmqpValueMatcher(linkedHashMap);
        TransferPayloadCompositeMatcher transferPayloadCompositeMatcher = new TransferPayloadCompositeMatcher();
        transferPayloadCompositeMatcher.setMessageAnnotationsMatcher(messageAnnotationsMatcher);
        transferPayloadCompositeMatcher.addMessageContentMatcher(encodedAmqpValueMatcher);
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withHandle(0).withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofSender().withTarget().withDynamic(true).and().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind().withTarget().withAddress("test-dynamic-events-sender");
            protonTestServer.remoteFlow().withLinkCredit(10L).queue();
            protonTestServer.expectAttach().ofReceiver().withSource().withDynamic(true).and().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind().withSource().withAddress("test-dynamic-events-receiver");
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.remoteFlow().withLinkCredit(10L).withHandle(0L).queue();
            protonTestServer.expectTransfer().withPayload(transferPayloadCompositeMatcher);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(false);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(-1L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement2 = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement2.setName("remote-address-policy");
            aMQPFederationAddressPolicyElement2.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement2.setAutoDelete(false);
            aMQPFederationAddressPolicyElement2.setAutoDeleteDelay(-1L);
            aMQPFederationAddressPolicyElement2.setAutoDeleteMessageCount(-1L);
            aMQPFederationAddressPolicyElement2.setMaxHops(5);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            aMQPFederatedBrokerConnectionElement.addRemoteAddressPolicy(aMQPFederationAddressPolicyElement2);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.close();
            protonTestServer.close();
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationSendsRemotePolicyIfEventsSenderLinkRejected() throws Exception {
        MessageAnnotationsMatcher messageAnnotationsMatcher = new MessageAnnotationsMatcher(true);
        messageAnnotationsMatcher.withEntry(AMQPFederationConstants.OPERATION_TYPE.toString(), Matchers.is("ADD_ADDRESS_POLICY"));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        ArrayList arrayList = new ArrayList();
        arrayList.add(AutoCreateJmsDestinationTest.QUEUE_NAME);
        linkedHashMap.put("policy-name", "remote-address-policy");
        linkedHashMap.put("auto-delete", false);
        linkedHashMap.put("auto-delete-delay", -1L);
        linkedHashMap.put("auto-delete-msg-count", -1L);
        linkedHashMap.put("max-hops", 5);
        linkedHashMap.put("enable-divert-bindings", false);
        linkedHashMap.put("address-includes", arrayList);
        EncodedAmqpValueMatcher encodedAmqpValueMatcher = new EncodedAmqpValueMatcher(linkedHashMap);
        TransferPayloadCompositeMatcher transferPayloadCompositeMatcher = new TransferPayloadCompositeMatcher();
        transferPayloadCompositeMatcher.setMessageAnnotationsMatcher(messageAnnotationsMatcher);
        transferPayloadCompositeMatcher.addMessageContentMatcher(encodedAmqpValueMatcher);
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withHandle(0).withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofSender().withTarget().withDynamic(true).and().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).reject(true, LinkError.DETACH_FORCED.toString(), "Unknown error");
            protonTestServer.expectDetach();
            protonTestServer.remoteFlow().withLinkCredit(10L).withHandle(0L).queue();
            protonTestServer.expectTransfer().withPayload(transferPayloadCompositeMatcher);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("remote-address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(false);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(-1L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            aMQPFederationAddressPolicyElement.setMaxHops(5);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addRemoteAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.close();
            protonTestServer.close();
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testRemoteBrokerSendsAddressAddedEventForInterestedPeer() throws Exception {
        AddressSettings addressSettings = new AddressSettings();
        addressSettings.setAutoCreateQueues(false);
        addressSettings.setAutoCreateAddresses(false);
        this.server.getConfiguration().getAddressSettings().put("#", addressSettings);
        this.server.start();
        HashMap hashMap = new HashMap();
        hashMap.put("auto-delete", true);
        hashMap.put("auto-delete-delay", 10000L);
        hashMap.put("auto-delete-msg-count", 1L);
        MessageAnnotationsMatcher messageAnnotationsMatcher = new MessageAnnotationsMatcher(true);
        messageAnnotationsMatcher.withEntry(AMQPFederationConstants.EVENT_TYPE.toString(), Matchers.is("REQUESTED_ADDRESS_ADDED_EVENT"));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("REQUESTED_ADDRESS_NAME", AutoCreateJmsDestinationTest.QUEUE_NAME);
        EncodedAmqpValueMatcher encodedAmqpValueMatcher = new EncodedAmqpValueMatcher(linkedHashMap);
        TransferPayloadCompositeMatcher transferPayloadCompositeMatcher = new TransferPayloadCompositeMatcher();
        transferPayloadCompositeMatcher.setMessageAnnotationsMatcher(messageAnnotationsMatcher);
        transferPayloadCompositeMatcher.addMessageContentMatcher(encodedAmqpValueMatcher);
        ProtonTestClient protonTestClient = new ProtonTestClient();
        try {
            scriptFederationConnectToRemote(protonTestClient, AutoCreateJmsDestinationTest.QUEUE_NAME, false, true);
            protonTestClient.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectAttach().ofSender().withName("federation-address-receiver").withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()}).withTarget().also().withNullSource();
            protonTestClient.expectDetach().respond();
            protonTestClient.remoteAttach().ofReceiver().withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()}).withName("federation-address-receiver").withSenderSettleModeUnsettled().withReceivervSettlesFirst().withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).withSource().withDurabilityOfNone().withExpiryPolicyOnLinkDetach().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).withCapabilities(new String[]{"topic"}).and().withTarget().and().now();
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectTransfer().withPayload(transferPayloadCompositeMatcher).accept();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            this.server.createQueue(QueueConfiguration.of(AutoCreateJmsDestinationTest.QUEUE_NAME).setRoutingType(RoutingType.MULTICAST).setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).setAutoCreated(false));
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectClose();
            protonTestClient.remoteClose().now();
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.close();
            this.server.stop();
            protonTestClient.close();
        } catch (Throwable th) {
            try {
                protonTestClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationCreatesAddressReceiverInResponseToAddressAddedEvent() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withHandle(0).withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.remoteFlow().withLinkCredit(10L);
            protonTestServer.expectAttach().ofReceiver().withHandle(1).withSenderSettleModeSettled().withSource().withDynamic(true).and().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind().withTarget().withAddress("test-dynamic-events");
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(false);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(-1L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            HashMap hashMap = new HashMap();
            hashMap.put("auto-delete", false);
            hashMap.put("auto-delete-delay", -1L);
            hashMap.put("auto-delete-msg-count", -1L);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withNullSource().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.remoteDetach().withClosed(true).withErrorCondition(AmqpError.NOT_FOUND.toString(), "Address not found").queue();
            protonTestServer.expectFlow().optional();
            protonTestServer.expectDetach();
            this.server.createQueue(QueueConfiguration.of(AutoCreateJmsDestinationTest.QUEUE_NAME).setRoutingType(RoutingType.MULTICAST).setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).setAutoCreated(false));
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            sendAddressAddedEvent(protonTestServer, "target", 1, 0);
            sendAddressAddedEvent(protonTestServer, AutoCreateJmsDestinationTest.QUEUE_NAME, 1, 1);
            sendAddressAddedEvent(protonTestServer, AutoCreateJmsDestinationTest.QUEUE_NAME, 1, 2);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.close();
            protonTestServer.close();
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testAddressAddedEventIgnoredIfFederationConsumerAlreadyCreated() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withHandle(0).withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respondInKind();
            protonTestServer.remoteFlow().withLinkCredit(10L);
            protonTestServer.expectAttach().ofReceiver().withHandle(1).withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).respond().withNullSource().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.remoteDetach().withClosed(true).withErrorCondition(AmqpError.NOT_FOUND.toString(), "Address not found").queue();
            protonTestServer.expectFlow().optional();
            protonTestServer.expectDetach();
            this.server.createQueue(QueueConfiguration.of(AutoCreateJmsDestinationTest.QUEUE_NAME).setRoutingType(RoutingType.MULTICAST).setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).setAutoCreated(false));
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            Session createSession = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection().createSession(1);
            createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            sendAddressAddedEvent(protonTestServer, AutoCreateJmsDestinationTest.QUEUE_NAME, 1, 0);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.close();
            protonTestServer.close();
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testRemoteBrokerClosesFederationReceiverAfterAddressRemoved() throws Exception {
        this.server.start();
        this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
        ProtonTestClient protonTestClient = new ProtonTestClient();
        try {
            scriptFederationConnectToRemote(protonTestClient, AutoCreateJmsDestinationTest.QUEUE_NAME, true, true);
            protonTestClient.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectAttach().ofSender().withName("federation-address-receiver").withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()}).withSource().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME);
            protonTestClient.remoteAttach().ofReceiver().withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()}).withName("federation-address-receiver").withSenderSettleModeUnsettled().withReceivervSettlesFirst().withSource().withDurabilityOfNone().withExpiryPolicyOnLinkDetach().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).withCapabilities(new String[]{"topic"}).and().withTarget().and().now();
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectDetach().withError(AmqpError.RESOURCE_DELETED.toString());
            this.server.removeAddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), (SecurityAuth) null, true);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            MessageAnnotationsMatcher messageAnnotationsMatcher = new MessageAnnotationsMatcher(true);
            messageAnnotationsMatcher.withEntry(AMQPFederationConstants.EVENT_TYPE.toString(), Matchers.is("REQUESTED_ADDRESS_ADDED_EVENT"));
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            linkedHashMap.put("REQUESTED_ADDRESS_NAME", AutoCreateJmsDestinationTest.QUEUE_NAME);
            EncodedAmqpValueMatcher encodedAmqpValueMatcher = new EncodedAmqpValueMatcher(linkedHashMap);
            TransferPayloadCompositeMatcher transferPayloadCompositeMatcher = new TransferPayloadCompositeMatcher();
            transferPayloadCompositeMatcher.setMessageAnnotationsMatcher(messageAnnotationsMatcher);
            transferPayloadCompositeMatcher.addMessageContentMatcher(encodedAmqpValueMatcher);
            protonTestClient.expectTransfer().withPayload(transferPayloadCompositeMatcher).withSettled(true);
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.removeAddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), (SecurityAuth) null, true);
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectClose();
            protonTestClient.remoteClose().now();
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.close();
            this.server.stop();
            protonTestClient.close();
        } catch (Throwable th) {
            try {
                protonTestClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationAddressDemandTrackedWhenRemoteRejectsInitialAttempts() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                Topic createTopic = createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME);
                createConnection.start();
                protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).respondInKind().withNullSource();
                protonTestServer.expectFlow().withLinkCredit(1000);
                protonTestServer.remoteDetach().withErrorCondition("amqp:not-found", "the requested queue was not found").queue().afterDelay(10);
                protonTestServer.expectDetach();
                MessageConsumer createConsumer = createSession.createConsumer(createTopic);
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).respondInKind().withNullSource();
                protonTestServer.expectFlow().withLinkCredit(1000);
                protonTestServer.remoteDetach().withErrorCondition("amqp:not-found", "the requested queue was not found").queue().afterDelay(10);
                protonTestServer.expectDetach();
                MessageConsumer createConsumer2 = createSession.createConsumer(createTopic);
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).respondInKind();
                protonTestServer.expectFlow().withLinkCredit(1000);
                MessageConsumer createConsumer3 = createSession.createConsumer(createTopic);
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                createConsumer3.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                createConsumer2.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectDetach().respond();
                createConsumer.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationAddressDemandTrackedWhenPluginBlocksInitialAttempts() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Connect test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            AtomicInteger atomicInteger = new AtomicInteger(2);
            AMQPTestFederationBrokerPlugin aMQPTestFederationBrokerPlugin = new AMQPTestFederationBrokerPlugin();
            aMQPTestFederationBrokerPlugin.shouldCreateConsumerForDivert = (divert, queue) -> {
                return true;
            };
            aMQPTestFederationBrokerPlugin.shouldCreateConsumerForQueue = queue2 -> {
                return true;
            };
            aMQPTestFederationBrokerPlugin.shouldCreateConsumerForAddress = addressInfo -> {
                return Boolean.valueOf(atomicInteger.getAndDecrement() == 0);
            };
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.registerBrokerPlugin(aMQPTestFederationBrokerPlugin);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                Topic createTopic = createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME);
                createConnection.start();
                MessageConsumer createConsumer = createSession.createConsumer(createTopic);
                MessageConsumer createConsumer2 = createSession.createConsumer(createTopic);
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).respondInKind();
                protonTestServer.expectFlow().withLinkCredit(1000);
                MessageConsumer createConsumer3 = createSession.createConsumer(createTopic);
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                createConsumer3.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                createConsumer2.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectDetach().respond();
                createConsumer.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testBrokerAllowsAttachToPreviouslyNonExistentAddressAfterItIsAdded() throws Exception {
        AddressSettings addressSettings = new AddressSettings();
        addressSettings.setAutoCreateAddresses(false);
        this.server.getConfiguration().getAddressSettings().put("#", addressSettings);
        this.server.start();
        HashMap hashMap = new HashMap();
        hashMap.put("auto-delete", false);
        ProtonTestClient protonTestClient = new ProtonTestClient();
        try {
            scriptFederationConnectToRemote(protonTestClient, AutoCreateJmsDestinationTest.QUEUE_NAME);
            protonTestClient.connect("localhost", NettyTransportOptions.DEFAULT_TCP_PORT);
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectAttach().ofSender().withName("federation-address-receiver").withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()}).withTarget().also().withNullSource();
            protonTestClient.expectDetach().respond();
            protonTestClient.remoteAttach().ofReceiver().withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()}).withName("federation-address-receiver").withSenderSettleModeUnsettled().withReceivervSettlesFirst().withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).withSource().withDurabilityOfNone().withExpiryPolicyOnLinkDetach().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).withCapabilities(new String[]{"topic"}).and().withTarget().and().now();
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            this.server.addAddressInfo(new AddressInfo(AutoCreateJmsDestinationTest.QUEUE_NAME).addRoutingType(RoutingType.MULTICAST));
            protonTestClient.expectAttach().ofSender().withName("federation-address-receiver").withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()}).withTarget().also().withSource().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME);
            protonTestClient.remoteAttach().ofReceiver().withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()}).withName("federation-address-receiver").withSenderSettleModeUnsettled().withReceivervSettlesFirst().withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).withSource().withDurabilityOfNone().withExpiryPolicyOnLinkDetach().withAddress(AutoCreateJmsDestinationTest.QUEUE_NAME).withCapabilities(new String[]{"topic"}).and().withTarget().and().now();
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.expectClose();
            protonTestClient.remoteClose().now();
            protonTestClient.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestClient.close();
            this.server.stop();
            protonTestClient.close();
        } catch (Throwable th) {
            try {
                protonTestClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testAddressPolicyCanOverridesZeroCreditsInFederationConfigurationAndFederateAddress() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withSource().withDynamic(true).and().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind().withTarget().withAddress("test-dynamic-events");
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(true);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(10000L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            aMQPFederationAddressPolicyElement.addProperty("amqpCredits", 10);
            aMQPFederationAddressPolicyElement.addProperty("amqpLowCredits", 3);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            aMQPFederatedBrokerConnectionElement.addProperty("amqpCredits", 0);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            HashMap hashMap = new HashMap();
            hashMap.put("auto-delete", true);
            hashMap.put("auto-delete-delay", 10000L);
            hashMap.put("auto-delete-msg-count", -1L);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(10);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                createConnection.start();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private static void sendAddressAddedEvent(ProtonTestPeer protonTestPeer, String str, int i, int i2) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("REQUESTED_ADDRESS_NAME", str);
        protonTestPeer.remoteTransfer().withHandle(i).withDeliveryId(i2).withSettled(true).withMessageAnnotations().withAnnotation(AMQPFederationConstants.EVENT_TYPE.toString(), "REQUESTED_ADDRESS_ADDED_EVENT").also().withBody().withValue(linkedHashMap).also().now();
    }

    private static void sendAddresPolicyToRemote(ProtonTestClient protonTestClient, FederationReceiveFromAddressPolicy federationReceiveFromAddressPolicy) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("policy-name", federationReceiveFromAddressPolicy.getPolicyName());
        linkedHashMap.put("auto-delete", Boolean.valueOf(federationReceiveFromAddressPolicy.isAutoDelete()));
        linkedHashMap.put("auto-delete-delay", Long.valueOf(federationReceiveFromAddressPolicy.getAutoDeleteDelay()));
        linkedHashMap.put("auto-delete-msg-count", Long.valueOf(federationReceiveFromAddressPolicy.getAutoDeleteMessageCount()));
        linkedHashMap.put("max-hops", Integer.valueOf(federationReceiveFromAddressPolicy.getMaxHops()));
        linkedHashMap.put("enable-divert-bindings", Boolean.valueOf(federationReceiveFromAddressPolicy.isEnableDivertBindings()));
        if (!federationReceiveFromAddressPolicy.getIncludes().isEmpty()) {
            linkedHashMap.put("address-includes", new ArrayList(federationReceiveFromAddressPolicy.getIncludes()));
        }
        if (!federationReceiveFromAddressPolicy.getExcludes().isEmpty()) {
            linkedHashMap.put("address-excludes", new ArrayList(federationReceiveFromAddressPolicy.getExcludes()));
        }
        TransformerConfiguration transformerConfiguration = federationReceiveFromAddressPolicy.getTransformerConfiguration();
        if (transformerConfiguration != null) {
            linkedHashMap.put("transformer-class-name", transformerConfiguration.getClassName());
            if (transformerConfiguration.getProperties() != null && !transformerConfiguration.getProperties().isEmpty()) {
                linkedHashMap.put("transformer-properties-map", transformerConfiguration.getProperties());
            }
        }
        if (!federationReceiveFromAddressPolicy.getProperties().isEmpty()) {
            linkedHashMap.put("policy-properties-map", federationReceiveFromAddressPolicy.getProperties());
        }
        protonTestClient.remoteTransfer().withDeliveryId(0).withMessageAnnotations().withAnnotation(AMQPFederationConstants.OPERATION_TYPE.toString(), "ADD_ADDRESS_POLICY").also().withBody().withValue(linkedHashMap).also().now();
    }

    private static void scriptFederationConnectToRemote(ProtonTestClient protonTestClient, String str) {
        scriptFederationConnectToRemote(protonTestClient, str, 1000, ReplicationOrderTest.NUM);
    }

    private static void scriptFederationConnectToRemote(ProtonTestClient protonTestClient, String str, int i, int i2) {
        scriptFederationConnectToRemote(protonTestClient, str, i, i2, false, false);
    }

    private static void scriptFederationConnectToRemote(ProtonTestClient protonTestClient, String str, boolean z, boolean z2) {
        scriptFederationConnectToRemote(protonTestClient, str, 1000, ReplicationOrderTest.NUM, z, z2);
    }

    private static void scriptFederationConnectToRemote(ProtonTestClient protonTestClient, String str, int i, int i2, boolean z, boolean z2) {
        String str2 = "Federation:control:" + UUID.randomUUID().toString();
        HashMap hashMap = new HashMap();
        hashMap.put("amqpCredits", Integer.valueOf(i));
        hashMap.put("amqpLowCredits", Integer.valueOf(i2));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(AMQPFederationConstants.FEDERATION_CONFIGURATION.toString(), hashMap);
        protonTestClient.queueClientSaslAnonymousConnect();
        protonTestClient.remoteOpen().queue();
        protonTestClient.expectOpen();
        protonTestClient.remoteBegin().queue();
        protonTestClient.expectBegin();
        protonTestClient.remoteAttach().ofSender().withInitialDeliveryCount(0L).withName(str2).withPropertiesMap(hashMap2).withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()}).withSenderSettleModeUnsettled().withReceivervSettlesFirst().withSource().also().withTarget().withDynamic(true).withDurabilityOfNone().withExpiryPolicyOnLinkDetach().withLifetimePolicyOfDeleteOnClose().withCapabilities(new String[]{"temporary-topic"}).also().queue();
        protonTestClient.expectAttach().ofReceiver().withTarget().withAddress(CoreMatchers.notNullValue()).also().withOfferedCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString());
        protonTestClient.expectFlow();
        if (z) {
            String str3 = "Federation:events-sender:test:" + UUID.randomUUID().toString();
            protonTestClient.remoteAttach().ofSender().withName(str3).withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()}).withSenderSettleModeSettled().withReceivervSettlesFirst().withSource().also().withTarget().withDynamic(true).withDurabilityOfNone().withExpiryPolicyOnLinkDetach().withLifetimePolicyOfDeleteOnClose().withCapabilities(new String[]{"temporary-topic"}).also().queue();
            protonTestClient.expectAttach().ofReceiver().withName(str3).withTarget().withAddress(CoreMatchers.notNullValue()).also().withOfferedCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString());
            protonTestClient.expectFlow();
        }
        if (z2) {
            String str4 = "Federation:events-receiver:test:" + UUID.randomUUID().toString();
            protonTestClient.remoteAttach().ofReceiver().withName(str4).withDesiredCapabilities(new String[]{AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()}).withSenderSettleModeSettled().withReceivervSettlesFirst().withTarget().also().withSource().withDynamic(true).withDurabilityOfNone().withExpiryPolicyOnLinkDetach().withLifetimePolicyOfDeleteOnClose().withCapabilities(new String[]{"temporary-topic"}).also().queue();
            protonTestClient.remoteFlow().withLinkCredit(10L).queue();
            protonTestClient.expectAttach().ofSender().withName(str4).withSource().withAddress(CoreMatchers.notNullValue()).also().withOfferedCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString());
        }
    }
}
