package org.apache.activemq.artemis.tests.integration.amqp.connect;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Map;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.class */
public class AMQPFederationServerToServerTest extends AmqpClientTestSupport {
    // Class logger; the owning class is resolved through MethodHandles rather than a class literal.
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    // Port of the local broker: createServer() builds it on 5672 and test clients connect to tcp://localhost:5672.
    private static final int SERVER_PORT = 5672;
    // Port of the first remote broker; also the target of the broker connection URI "tcp://localhost:5673".
    private static final int SERVER_PORT_REMOTE = 5673;
    // NOTE(review): not referenced in the visible part of this file - presumably the port for remoteServer2; confirm.
    private static final int SERVER2_PORT_REMOTE = 5674;
    // Value registered under the "amqpMinLargeMessageSize" acceptor parameter (see configureAMQPAcceptorParameters).
    private static final int MIN_LARGE_MESSAGE_SIZE = 10240;
    // Remote broker created in createServer(); each test starts it and tearDown() stops it.
    protected ActiveMQServer remoteServer;
    // Optional second remote broker; never assigned in the visible part of this file, but stopped in tearDown() when set.
    protected ActiveMQServer remoteServer2;

    /**
     * Names the protocols this test wants configured on the brokers.
     *
     * <p>Both AMQP and CORE are listed because every scenario in this class is run once with an
     * "AMQP" client and once with a "CORE" client.
     *
     * @return the comma separated protocol list {@code "AMQP,CORE"}
     */
    @Override
    protected String getConfiguredProtocols() {
        final String protocolList = "AMQP,CORE";
        return protocolList;
    }

    /**
     * Creates the two brokers used by the tests: the remote broker, which is stored in
     * {@link #remoteServer}, and the local broker, which is returned to the base class.
     *
     * <p>The remote broker is created first. Neither broker is started by this method; each test
     * calls {@code start()} itself. NOTE(review): the {@code false} argument is interpreted by the
     * inherited {@code createServer(int, boolean)} - presumably "do not start"; confirm.
     *
     * @return the local broker listening on {@code SERVER_PORT}
     * @throws Exception if either broker cannot be created
     */
    @Override
    protected ActiveMQServer createServer() throws Exception {
        remoteServer = createServer(SERVER_PORT_REMOTE, false);

        final ActiveMQServer localServer = createServer(SERVER_PORT, false);
        return localServer;
    }

    /**
     * Adds the AMQP large-message threshold to the acceptor parameters.
     *
     * @param map the acceptor parameter map to add the {@code amqpMinLargeMessageSize} entry to
     */
    @Override
    protected void configureAMQPAcceptorParameters(Map<String, Object> map) {
        final String minLargeMessageSizeKey = "amqpMinLargeMessageSize";
        // The int constant is autoboxed to an Integer when stored in the map.
        map.put(minLargeMessageSizeKey, MIN_LARGE_MESSAGE_SIZE);
    }

    /**
     * Runs the inherited tear down and then stops any remote brokers created by this test.
     *
     * <p>The remote brokers are stopped in a {@code finally} block so they are shut down even when
     * the inherited tear down throws; otherwise they would keep their ports bound for the
     * following tests. Stopping is best effort: a failure is logged rather than rethrown so that it
     * cannot mask the outcome of the inherited tear down or of the test itself.
     *
     * @throws Exception if the inherited tear down fails
     */
    @Override
    @AfterEach
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            try {
                if (remoteServer != null) {
                    remoteServer.stop();
                }
            } catch (Exception e) {
                logger.warn("Failed to stop remote server during tear down", e);
            } finally {
                // Drop the reference even when stop() failed so a broken broker is never reused.
                remoteServer = null;
            }

            try {
                if (remoteServer2 != null) {
                    remoteServer2.stop();
                }
            } catch (Exception e) {
                logger.warn("Failed to stop second remote server during tear down", e);
            } finally {
                remoteServer2 = null;
            }
        }
    }

    /**
     * Runs the "address demand on the local broker" federation scenario with AMQP clients.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(20)
    public void testAddresDemandOnLocalBrokerFederatesMessagesFromRemoteAMQP() throws Exception {
        final String clientProtocol = "AMQP";
        testAddresDemandOnLocalBrokerFederatesMessagesFromRemote(clientProtocol);
    }

    /**
     * Runs the "address demand on the local broker" federation scenario with CORE clients.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(20)
    public void testAddresDemandOnLocalBrokerFederatesMessagesFromRemoteCORE() throws Exception {
        final String clientProtocol = "CORE";
        testAddresDemandOnLocalBrokerFederatesMessagesFromRemote(clientProtocol);
    }

    /**
     * Scenario: the local broker is configured with a local federation address policy, a topic
     * consumer attaches to the local broker, and a message published on the remote broker must be
     * delivered to that consumer.
     *
     * @param protocol client protocol name passed to {@code CFUtil.createConnectionFactory}
     *                 (the callers use "AMQP" and "CORE")
     * @throws Exception if a broker fails to start, a wait condition times out or a JMS call fails
     */
    private void testAddresDemandOnLocalBrokerFederatesMessagesFromRemote(String protocol) throws Exception {
        logger.info("Test started: {}", getTestName());

        final String addressName = AutoCreateJmsDestinationTest.QUEUE_NAME;

        final AMQPFederationAddressPolicyElement localAddressPolicy = new AMQPFederationAddressPolicyElement();
        localAddressPolicy.setName("test-policy");
        localAddressPolicy.addToIncludes(addressName);
        localAddressPolicy.setAutoDelete(false);
        localAddressPolicy.setAutoDeleteDelay(-1L);
        localAddressPolicy.setAutoDeleteMessageCount(-1L);

        final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement();
        federation.setName(getTestName());
        federation.addLocalAddressPolicy(localAddressPolicy);

        final AMQPBrokerConnectConfiguration brokerConnection =
            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + SERVER_PORT_REMOTE);
        brokerConnection.setReconnectAttempts(10);
        brokerConnection.addElement(federation);
        server.getConfiguration().addAMQPConnection(brokerConnection);

        remoteServer.start();
        server.start();

        final ConnectionFactory localFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + SERVER_PORT);
        final ConnectionFactory remoteFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + SERVER_PORT_REMOTE);

        // Resources close in reverse order: the remote connection first, then the local one.
        try (Connection localConnection = localFactory.createConnection();
             Connection remoteConnection = remoteFactory.createConnection()) {

            final Session localSession = localConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final Session remoteSession = remoteConnection.createSession(Session.AUTO_ACKNOWLEDGE);

            final Topic topic = localSession.createTopic(addressName);
            final MessageConsumer localConsumer = localSession.createConsumer(topic);

            localConnection.start();
            remoteConnection.start();

            // Wait until the address and at least one binding exist on both brokers before sending.
            Wait.assertTrue(() -> server.addressQuery(SimpleString.of(addressName)).isExists());
            Wait.assertTrue(() -> remoteServer.addressQuery(SimpleString.of(addressName)).isExists());
            Wait.assertTrue(() -> server.bindingQuery(SimpleString.of(addressName), false).getQueueNames().size() >= 1);
            Wait.assertTrue(() -> remoteServer.bindingQuery(SimpleString.of(addressName), false).getQueueNames().size() >= 1);

            final MessageProducer remoteProducer = remoteSession.createProducer(topic);
            final TextMessage sent = remoteSession.createTextMessage("Hello World");
            sent.setStringProperty("testProperty", "testValue");
            remoteProducer.send(sent);

            final Message received = localConsumer.receive(5000L);
            Assertions.assertNotNull(received);
            Assertions.assertTrue(received instanceof TextMessage);
            Assertions.assertEquals("Hello World", ((TextMessage) received).getText());
            // Check the received message: the property must have survived federation.
            Assertions.assertTrue(received.propertyExists("testProperty"));
            Assertions.assertEquals("testValue", received.getStringProperty("testProperty"));
        }
    }

    /**
     * Runs the "divert demand on the local broker" federation scenario with AMQP clients.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(20)
    public void testDivertAddressDemandOnLocalBrokerFederatesMessagesFromRemoteAMQP() throws Exception {
        final String clientProtocol = "AMQP";
        testDivertAddresDemandOnLocalBrokerFederatesMessagesFromRemote(clientProtocol);
    }

    /**
     * Runs the "divert demand on the local broker" federation scenario with CORE clients.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(20)
    public void testDivertAddresDemandOnLocalBrokerFederatesMessagesFromRemoteCORE() throws Exception {
        final String clientProtocol = "CORE";
        testDivertAddresDemandOnLocalBrokerFederatesMessagesFromRemote(clientProtocol);
    }

    /**
     * Scenario: the local broker federates the "source" address with divert bindings enabled and
     * has a divert forwarding "source" to "target". A consumer attaches to "target" on the local
     * broker, and a message published to "source" on the remote broker must reach that consumer.
     *
     * @param protocol client protocol name passed to {@code CFUtil.createConnectionFactory}
     *                 (the callers use "AMQP" and "CORE")
     * @throws Exception if a broker fails to start, a wait condition times out or a JMS call fails
     */
    private void testDivertAddresDemandOnLocalBrokerFederatesMessagesFromRemote(String protocol) throws Exception {
        logger.info("Test started: {}", getTestName());

        final String sourceAddress = "source";
        final String targetAddress = "target";

        final AMQPFederationAddressPolicyElement localAddressPolicy = new AMQPFederationAddressPolicyElement();
        localAddressPolicy.setName("test-policy");
        localAddressPolicy.addToIncludes(sourceAddress);
        localAddressPolicy.setAutoDelete(false);
        localAddressPolicy.setAutoDeleteDelay(-1L);
        localAddressPolicy.setAutoDeleteMessageCount(-1L);
        localAddressPolicy.setEnableDivertBindings(true);

        final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement();
        federation.setName(getTestName());
        federation.addLocalAddressPolicy(localAddressPolicy);

        final AMQPBrokerConnectConfiguration brokerConnection =
            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + SERVER_PORT_REMOTE);
        brokerConnection.setReconnectAttempts(10);
        brokerConnection.addElement(federation);

        final DivertConfiguration divert = new DivertConfiguration();
        divert.setName("test-divert");
        divert.setAddress(sourceAddress);
        divert.setForwardingAddress(targetAddress);
        divert.setRoutingType(ComponentConfigurationRoutingType.MULTICAST);

        server.getConfiguration().addAMQPConnection(brokerConnection);

        remoteServer.start();
        server.start();

        // The divert and its source address are added on the local broker only after it has started.
        server.deployDivert(divert);
        server.addAddressInfo(new AddressInfo(SimpleString.of(sourceAddress), RoutingType.MULTICAST));

        final ConnectionFactory localFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + SERVER_PORT);
        final ConnectionFactory remoteFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + SERVER_PORT_REMOTE);

        // Resources close in reverse order: the remote connection first, then the local one.
        try (Connection localConnection = localFactory.createConnection();
             Connection remoteConnection = remoteFactory.createConnection()) {

            final Session localSession = localConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final Session remoteSession = remoteConnection.createSession(Session.AUTO_ACKNOWLEDGE);

            final Topic targetTopic = localSession.createTopic(targetAddress);
            final Topic sourceTopic = localSession.createTopic(sourceAddress);

            final MessageConsumer localConsumer = localSession.createConsumer(targetTopic);

            localConnection.start();
            remoteConnection.start();

            // Wait for the remote "source" address and for bindings on both sides before sending.
            Wait.assertTrue(() -> remoteServer.addressQuery(SimpleString.of(sourceAddress)).isExists());
            Wait.assertTrue(() -> server.bindingQuery(SimpleString.of(targetAddress), false).getQueueNames().size() >= 1);
            Wait.assertTrue(() -> remoteServer.bindingQuery(SimpleString.of(sourceAddress), false).getQueueNames().size() >= 1);

            final MessageProducer remoteProducer = remoteSession.createProducer(sourceTopic);
            final TextMessage sent = remoteSession.createTextMessage("Hello World");
            sent.setStringProperty("testProperty", "testValue");
            remoteProducer.send(sent);

            final Message received = localConsumer.receive(5000L);
            Assertions.assertNotNull(received);
            Assertions.assertTrue(received instanceof TextMessage);
            Assertions.assertEquals("Hello World", ((TextMessage) received).getText());
            // Check the received message: the property must have survived federation and the divert.
            Assertions.assertTrue(received.propertyExists("testProperty"));
            Assertions.assertEquals("testValue", received.getStringProperty("testProperty"));
        }
    }

    /**
     * Runs the "queue demand on the local broker" federation scenario with AMQP clients.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(20)
    public void testQueueDemandOnLocalBrokerFederatesMessagesFromRemoteAMQP() throws Exception {
        final String clientProtocol = "AMQP";
        testQueueDemandOnLocalBrokerFederatesMessagesFromRemote(clientProtocol);
    }

    /**
     * Runs the "queue demand on the local broker" federation scenario with CORE clients.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(20)
    public void testQueueDemandOnLocalBrokerFederatesMessagesFromRemoteCORE() throws Exception {
        final String clientProtocol = "CORE";
        testQueueDemandOnLocalBrokerFederatesMessagesFromRemote(clientProtocol);
    }

    /**
     * Scenario: the local broker is configured with a local federation queue policy and the remote
     * broker owns a pre-created anycast queue. A consumer attaches to that queue on the local
     * broker, and a message sent to the queue on the remote broker must reach that consumer.
     *
     * @param protocol client protocol name passed to {@code CFUtil.createConnectionFactory}
     *                 (the callers use "AMQP" and "CORE")
     * @throws Exception if a broker fails to start, a wait condition times out or a JMS call fails
     */
    private void testQueueDemandOnLocalBrokerFederatesMessagesFromRemote(String protocol) throws Exception {
        logger.info("Test started: {}", getTestName());

        final String queueName = AutoCreateJmsDestinationTest.QUEUE_NAME;

        final AMQPFederationQueuePolicyElement localQueuePolicy = new AMQPFederationQueuePolicyElement();
        localQueuePolicy.setName("test-policy");
        localQueuePolicy.addToIncludes("#", queueName);

        final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement();
        federation.setName(getTestName());
        federation.addLocalQueuePolicy(localQueuePolicy);

        final AMQPBrokerConnectConfiguration brokerConnection =
            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + SERVER_PORT_REMOTE);
        brokerConnection.setReconnectAttempts(10);
        brokerConnection.addElement(federation);
        server.getConfiguration().addAMQPConnection(brokerConnection);

        // The remote queue is created explicitly (not auto-created) before the local broker starts.
        remoteServer.start();
        remoteServer.createQueue(QueueConfiguration.of(queueName)
                                                   .setRoutingType(RoutingType.ANYCAST)
                                                   .setAddress(queueName)
                                                   .setAutoCreated(false));
        server.start();

        final ConnectionFactory localFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + SERVER_PORT);
        final ConnectionFactory remoteFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + SERVER_PORT_REMOTE);

        // Resources close in reverse order: the remote connection first, then the local one.
        try (Connection localConnection = localFactory.createConnection();
             Connection remoteConnection = remoteFactory.createConnection()) {

            final Session localSession = localConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final Session remoteSession = remoteConnection.createSession(Session.AUTO_ACKNOWLEDGE);

            final Queue queue = localSession.createQueue(queueName);
            final MessageConsumer localConsumer = localSession.createConsumer(queue);

            localConnection.start();
            remoteConnection.start();

            // The local queue must exist before the message is sent on the remote side.
            Wait.assertTrue(() -> server.queueQuery(SimpleString.of(queueName)).isExists());

            final MessageProducer remoteProducer = remoteSession.createProducer(queue);
            final TextMessage sent = remoteSession.createTextMessage("Hello World");
            sent.setStringProperty("testProperty", "testValue");
            remoteProducer.send(sent);

            final Message received = localConsumer.receive(5000L);
            Assertions.assertNotNull(received);
            Assertions.assertTrue(received instanceof TextMessage);
            Assertions.assertEquals("Hello World", ((TextMessage) received).getText());
            // Check the received message: the property must have survived federation.
            Assertions.assertTrue(received.propertyExists("testProperty"));
            Assertions.assertEquals("testValue", received.getStringProperty("testProperty"));
        }
    }

    /**
     * Runs the "address demand on the remote broker" federation scenario with AMQP clients.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(20)
    public void testAddresDemandOnRemoteBrokerFederatesMessagesFromLocalAMQP() throws Exception {
        final String clientProtocol = "AMQP";
        testAddresDemandOnRemoteBrokerFederatesMessagesFromLocal(clientProtocol);
    }

    /**
     * Runs the "address demand on the remote broker" federation scenario with CORE clients.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(20)
    public void testAddresDemandOnRemoteBrokerFederatesMessagesFromLocalCORE() throws Exception {
        final String clientProtocol = "CORE";
        testAddresDemandOnRemoteBrokerFederatesMessagesFromLocal(clientProtocol);
    }

    /**
     * Scenario: the local broker's federation element carries a remote address policy, a topic
     * consumer attaches to the remote broker, and a message published on the local broker must be
     * delivered to that remote consumer.
     *
     * @param protocol client protocol name passed to {@code CFUtil.createConnectionFactory}
     *                 (the callers use "AMQP" and "CORE")
     * @throws Exception if a broker fails to start, a wait condition times out or a JMS call fails
     */
    private void testAddresDemandOnRemoteBrokerFederatesMessagesFromLocal(String protocol) throws Exception {
        logger.info("Test started: {}", getTestName());

        final String addressName = AutoCreateJmsDestinationTest.QUEUE_NAME;

        final AMQPFederationAddressPolicyElement remoteAddressPolicy = new AMQPFederationAddressPolicyElement();
        remoteAddressPolicy.setName("test-policy");
        remoteAddressPolicy.addToIncludes(addressName);
        remoteAddressPolicy.setAutoDelete(false);
        remoteAddressPolicy.setAutoDeleteDelay(-1L);
        remoteAddressPolicy.setAutoDeleteMessageCount(-1L);

        final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement();
        federation.setName(getTestName());
        federation.addRemoteAddressPolicy(remoteAddressPolicy);

        final AMQPBrokerConnectConfiguration brokerConnection =
            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + SERVER_PORT_REMOTE);
        brokerConnection.setReconnectAttempts(10);
        brokerConnection.addElement(federation);
        server.getConfiguration().addAMQPConnection(brokerConnection);

        remoteServer.start();
        server.start();

        final ConnectionFactory localFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + SERVER_PORT);
        final ConnectionFactory remoteFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + SERVER_PORT_REMOTE);

        // Resources close in reverse order: the remote connection first, then the local one.
        try (Connection localConnection = localFactory.createConnection();
             Connection remoteConnection = remoteFactory.createConnection()) {

            final Session localSession = localConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final Session remoteSession = remoteConnection.createSession(Session.AUTO_ACKNOWLEDGE);

            final Topic topic = localSession.createTopic(addressName);
            final MessageConsumer remoteConsumer = remoteSession.createConsumer(topic);

            localConnection.start();
            remoteConnection.start();

            // Wait until the address and at least one binding exist on both brokers before sending.
            Wait.assertTrue(() -> server.addressQuery(SimpleString.of(addressName)).isExists());
            Wait.assertTrue(() -> remoteServer.addressQuery(SimpleString.of(addressName)).isExists());
            Wait.assertTrue(() -> server.bindingQuery(SimpleString.of(addressName), false).getQueueNames().size() >= 1);
            Wait.assertTrue(() -> remoteServer.bindingQuery(SimpleString.of(addressName), false).getQueueNames().size() >= 1);

            final MessageProducer localProducer = localSession.createProducer(topic);
            final TextMessage sent = localSession.createTextMessage("Hello World");
            sent.setStringProperty("testProperty", "testValue");
            localProducer.send(sent);

            final Message received = remoteConsumer.receive(5000L);
            Assertions.assertNotNull(received);
            Assertions.assertTrue(received instanceof TextMessage);
            Assertions.assertEquals("Hello World", ((TextMessage) received).getText());
            // Check the received message: the property must have survived federation.
            Assertions.assertTrue(received.propertyExists("testProperty"));
            Assertions.assertEquals("testValue", received.getStringProperty("testProperty"));
        }
    }

    /**
     * Runs the "queue demand on the remote broker" federation scenario with AMQP clients.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(20)
    public void testQueueDemandOnRemoteWithRemoteConfigrationLeadsToMessageBeingFederatedAMQP() throws Exception {
        final String clientProtocol = "AMQP";
        testQueueDemandOnRemoteWithRemoteConfigrationLeadsToMessageBeingFederated(clientProtocol);
    }

    /**
     * Runs the "queue demand on the remote broker" federation scenario with CORE clients.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(20)
    public void testQueueDemandOnRemoteWithRemoteConfigrationLeadsToMessageBeingFederatedCORE() throws Exception {
        final String clientProtocol = "CORE";
        testQueueDemandOnRemoteWithRemoteConfigrationLeadsToMessageBeingFederated(clientProtocol);
    }

    /**
     * Scenario: the local broker's federation element carries a remote queue policy and the local
     * broker owns a pre-created anycast queue. A consumer attaches to that queue on the remote
     * broker, and a message sent to the queue on the local broker must reach that remote consumer.
     *
     * @param str client protocol name passed to {@code CFUtil.createConnectionFactory}
     *            (the callers use "AMQP" and "CORE")
     * @throws Exception if a broker fails to start, a wait condition times out or a JMS call fails
     */
    public void testQueueDemandOnRemoteWithRemoteConfigrationLeadsToMessageBeingFederated(String str) throws Exception {
        logger.info("Test started: {}", getTestName());

        final String queueName = AutoCreateJmsDestinationTest.QUEUE_NAME;

        final AMQPFederationQueuePolicyElement remoteQueuePolicy = new AMQPFederationQueuePolicyElement();
        remoteQueuePolicy.setName("test-policy");
        remoteQueuePolicy.addToIncludes("#", queueName);

        final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement();
        federation.setName(getTestName());
        federation.addRemoteQueuePolicy(remoteQueuePolicy);

        final AMQPBrokerConnectConfiguration brokerConnection =
            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + SERVER_PORT_REMOTE);
        brokerConnection.setReconnectAttempts(10);
        brokerConnection.addElement(federation);
        server.getConfiguration().addAMQPConnection(brokerConnection);

        remoteServer.start();
        server.start();

        // The local queue is created explicitly (not auto-created) once the local broker is running.
        server.createQueue(QueueConfiguration.of(queueName)
                                             .setRoutingType(RoutingType.ANYCAST)
                                             .setAddress(queueName)
                                             .setAutoCreated(false));

        final ConnectionFactory localFactory = CFUtil.createConnectionFactory(str, "tcp://localhost:" + SERVER_PORT);
        final ConnectionFactory remoteFactory = CFUtil.createConnectionFactory(str, "tcp://localhost:" + SERVER_PORT_REMOTE);

        // Resources close in reverse order: the remote connection first, then the local one.
        try (Connection localConnection = localFactory.createConnection();
             Connection remoteConnection = remoteFactory.createConnection()) {

            final Session localSession = localConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final Session remoteSession = remoteConnection.createSession(Session.AUTO_ACKNOWLEDGE);

            final Queue queue = localSession.createQueue(queueName);
            final MessageConsumer remoteConsumer = remoteSession.createConsumer(queue);

            localConnection.start();
            remoteConnection.start();

            // The remote queue must exist before the message is sent on the local side.
            Wait.assertTrue(() -> remoteServer.queueQuery(SimpleString.of(queueName)).isExists());

            final MessageProducer localProducer = localSession.createProducer(queue);
            final TextMessage sent = localSession.createTextMessage("Hello World");
            sent.setStringProperty("testProperty", "testValue");
            localProducer.send(sent);

            final Message received = remoteConsumer.receive(5000L);
            Assertions.assertNotNull(received);
            Assertions.assertTrue(received instanceof TextMessage);
            Assertions.assertEquals("Hello World", ((TextMessage) received).getText());
            // Check the received message: the property must have survived federation.
            Assertions.assertTrue(received.propertyExists("testProperty"));
            Assertions.assertEquals("testValue", received.getStringProperty("testProperty"));
        }
    }

    /**
     * Runs the "divert demand on the remote broker" federation scenario with AMQP clients.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(20)
    public void testDivertAddresDemandOnRemoteBrokerFederatesMessagesFromLocalAMQP() throws Exception {
        final String clientProtocol = "AMQP";
        testDivertAddresDemandOnRemoteBrokerFederatesMessagesFromLocal(clientProtocol);
    }

    /**
     * Runs the "divert demand on the remote broker" federation scenario with CORE clients.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Timeout(20)
    public void testDivertAddresDemandOnRemoteBrokerFederatesMessagesFromLocalCORE() throws Exception {
        final String clientProtocol = "CORE";
        testDivertAddresDemandOnRemoteBrokerFederatesMessagesFromLocal(clientProtocol);
    }

    /**
     * Scenario: the local broker's federation element carries a remote address policy for
     * "source" with divert bindings enabled, and the remote broker has a divert forwarding
     * "source" to "target". A consumer attaches to "target" on the remote broker, and a message
     * published to "source" on the local broker must reach that remote consumer.
     *
     * @param protocol client protocol name passed to {@code CFUtil.createConnectionFactory}
     *                 (the callers use "AMQP" and "CORE")
     * @throws Exception if a broker fails to start, a wait condition times out or a JMS call fails
     */
    private void testDivertAddresDemandOnRemoteBrokerFederatesMessagesFromLocal(String protocol) throws Exception {
        logger.info("Test started: {}", getTestName());

        final String sourceAddress = "source";
        final String targetAddress = "target";

        final AMQPFederationAddressPolicyElement remoteAddressPolicy = new AMQPFederationAddressPolicyElement();
        remoteAddressPolicy.setName("test-policy");
        remoteAddressPolicy.addToIncludes(sourceAddress);
        remoteAddressPolicy.setAutoDelete(false);
        remoteAddressPolicy.setAutoDeleteDelay(-1L);
        remoteAddressPolicy.setAutoDeleteMessageCount(-1L);
        remoteAddressPolicy.setEnableDivertBindings(true);

        final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement();
        federation.setName(getTestName());
        federation.addRemoteAddressPolicy(remoteAddressPolicy);

        final AMQPBrokerConnectConfiguration brokerConnection =
            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + SERVER_PORT_REMOTE);
        brokerConnection.setReconnectAttempts(10);
        brokerConnection.addElement(federation);

        final DivertConfiguration divert = new DivertConfiguration();
        divert.setName("test-divert");
        divert.setAddress(sourceAddress);
        divert.setForwardingAddress(targetAddress);
        divert.setRoutingType(ComponentConfigurationRoutingType.MULTICAST);

        // The remote broker gets its divert and source address before the local broker starts.
        remoteServer.start();
        remoteServer.deployDivert(divert);
        remoteServer.addAddressInfo(new AddressInfo(SimpleString.of(sourceAddress), RoutingType.MULTICAST));

        server.getConfiguration().addAMQPConnection(brokerConnection);
        server.start();

        final ConnectionFactory localFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + SERVER_PORT);
        final ConnectionFactory remoteFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + SERVER_PORT_REMOTE);

        // Resources close in reverse order: the remote connection first, then the local one.
        try (Connection localConnection = localFactory.createConnection();
             Connection remoteConnection = remoteFactory.createConnection()) {

            final Session localSession = localConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final Session remoteSession = remoteConnection.createSession(Session.AUTO_ACKNOWLEDGE);

            final Topic targetTopic = localSession.createTopic(targetAddress);
            final Topic sourceTopic = localSession.createTopic(sourceAddress);

            final MessageConsumer remoteConsumer = remoteSession.createConsumer(targetTopic);

            localConnection.start();
            remoteConnection.start();

            // Wait for the local "source" address and for bindings on both sides before sending.
            Wait.assertTrue(() -> server.addressQuery(SimpleString.of(sourceAddress)).isExists());
            Wait.assertTrue(() -> server.bindingQuery(SimpleString.of(sourceAddress), false).getQueueNames().size() >= 1);
            Wait.assertTrue(() -> remoteServer.bindingQuery(SimpleString.of(targetAddress), false).getQueueNames().size() >= 1);

            final MessageProducer localProducer = localSession.createProducer(sourceTopic);
            final TextMessage sent = localSession.createTextMessage("Hello World");
            sent.setStringProperty("testProperty", "testValue");
            localProducer.send(sent);

            final Message received = remoteConsumer.receive(5000L);
            // Explicit null check so a receive timeout is reported as such, as in the sibling scenarios.
            Assertions.assertNotNull(received);
            Assertions.assertTrue(received instanceof TextMessage);
            Assertions.assertEquals("Hello World", ((TextMessage) received).getText());
            // Check the received message: the property must have survived federation and the divert.
            Assertions.assertTrue(received.propertyExists("testProperty"));
            Assertions.assertEquals("testValue", received.getStringProperty("testProperty"));
        }
    }

    /**
     * Runs the large-message address federation scenario with {@code "AMQP"} clients and the
     * policy's {@code tunnel-core-messages} property set to {@code true}.
     */
    @Test
    @Timeout(20)
    public void testAddresDemandOnLocalBrokerFederatesLargeMessagesFromRemoteAMQP() throws Exception {
        final String clientProtocol = "AMQP";
        testAddresDemandOnLocalBrokerFederatesLargeMessagesFromRemote(clientProtocol, true);
    }

    /**
     * Runs the large-message address federation scenario with {@code "CORE"} clients and the
     * policy's {@code tunnel-core-messages} property set to {@code false}.
     */
    @Test
    @Timeout(20)
    public void testAddresDemandOnLocalBrokerFederatesLargeMessagesFromRemoteCORENoTunneling() throws Exception {
        final String clientProtocol = "CORE";
        testAddresDemandOnLocalBrokerFederatesLargeMessagesFromRemote(clientProtocol, false);
    }

    /**
     * Runs the large-message address federation scenario with {@code "CORE"} clients and the
     * policy's {@code tunnel-core-messages} property set to {@code true}.
     */
    @Test
    @Timeout(20)
    public void testAddresDemandOnLocalBrokerFederatesLargeMessagesFromRemoteCOREWithTunneling() throws Exception {
        final String clientProtocol = "CORE";
        testAddresDemandOnLocalBrokerFederatesLargeMessagesFromRemote(clientProtocol, true);
    }

    /**
     * Shared scenario: a topic consumer attached to the local broker (port 5672) must receive a
     * 15360 byte message that is published to the same address on the remote broker (port 5673),
     * with the local broker configured to federate that address from the remote.
     *
     * @param str protocol name handed to {@code CFUtil.createConnectionFactory} for both clients;
     *            when it equals {@code "CORE"} the remote URI also carries
     *            {@code minLargeMessageSize=10240} so the payload exceeds that threshold
     * @param z   value written to the address policy's {@code tunnel-core-messages} property
     * @throws Exception if a broker fails to start, a JMS call fails or an assertion fails
     */
    private void testAddresDemandOnLocalBrokerFederatesLargeMessagesFromRemote(String str, boolean z) throws Exception {
        logger.info("Test started: {}", getTestName());

        final AMQPFederationAddressPolicyElement addressPolicy = new AMQPFederationAddressPolicyElement();
        addressPolicy.setName("test-policy");
        addressPolicy.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
        addressPolicy.setAutoDelete(false);
        addressPolicy.setAutoDeleteDelay(-1L);
        addressPolicy.setAutoDeleteMessageCount(-1L);
        addressPolicy.addProperty("tunnel-core-messages", Boolean.toString(z));

        final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement();
        federation.setName(getTestName());
        federation.addLocalAddressPolicy(addressPolicy);

        final AMQPBrokerConnectConfiguration brokerConnection = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:5673");
        brokerConnection.setReconnectAttempts(10);
        brokerConnection.addElement(federation);

        // The broker connection is registered before the local broker starts; the remote is started first.
        this.server.getConfiguration().addAMQPConnection(brokerConnection);
        this.remoteServer.start();
        this.server.start();

        final String remoteUri = str.equals("CORE") ? "tcp://localhost:5673?minLargeMessageSize=10240" : "tcp://localhost:5673";
        final ConnectionFactory localFactory = CFUtil.createConnectionFactory(str, "tcp://localhost:5672");
        final ConnectionFactory remoteFactory = CFUtil.createConnectionFactory(str, remoteUri);

        // Resources are closed in reverse order: the remote connection first, then the local one.
        try (Connection localConnection = localFactory.createConnection();
             Connection remoteConnection = remoteFactory.createConnection()) {

            final Session localSession = localConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final Session remoteSession = remoteConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final Topic topic = localSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME);
            final MessageConsumer localConsumer = localSession.createConsumer(topic);

            localConnection.start();
            remoteConnection.start();

            // Wait until the address and at least one binding are present on both brokers before sending.
            Wait.assertTrue(() -> this.server.addressQuery(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME)).isExists());
            Wait.assertTrue(() -> this.remoteServer.addressQuery(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME)).isExists());
            Wait.assertTrue(() -> this.server.bindingQuery(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), false).getQueueNames().size() >= 1);
            Wait.assertTrue(() -> this.remoteServer.bindingQuery(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), false).getQueueNames().size() >= 1);

            final MessageProducer remoteProducer = remoteSession.createProducer(topic);
            final BytesMessage sent = remoteSession.createBytesMessage();
            final byte[] payload = new byte[15360];
            Arrays.fill(payload, (byte) 1);
            sent.writeBytes(payload);
            sent.setStringProperty("testProperty", "testValue");
            remoteProducer.send(sent);

            // receive() returns the generic message type, so check and cast explicitly.
            final Object received = localConsumer.receive(5000L);
            Assertions.assertNotNull(received);
            Assertions.assertTrue(received instanceof BytesMessage);
            final BytesMessage receivedBytes = (BytesMessage) received;

            final byte[] receivedPayload = new byte[payload.length];
            receivedBytes.readBytes(receivedPayload);
            Assertions.assertArrayEquals(payload, receivedPayload);

            // Check the property on the message that crossed the federation link, not on the one we sent.
            Assertions.assertTrue(receivedBytes.propertyExists("testProperty"));
            Assertions.assertEquals("testValue", receivedBytes.getStringProperty("testProperty"));
        }
    }

    /**
     * Runs the large-message queue federation scenario with {@code "AMQP"} clients and the
     * policy's {@code tunnel-core-messages} property set to {@code true}.
     */
    @Test
    @Timeout(20)
    public void testQueueDemandOnLocalBrokerFederatesLargeMessagesFromRemoteAMQP() throws Exception {
        final String clientProtocol = "AMQP";
        testQueueDemandOnLocalBrokerFederatesLargeMessagesFromRemote(clientProtocol, true);
    }

    /**
     * Runs the large-message queue federation scenario with {@code "CORE"} clients and the
     * policy's {@code tunnel-core-messages} property set to {@code false}.
     */
    @Test
    @Timeout(20)
    public void testQueueDemandOnLocalBrokerFederatesLargeMessagesFromRemoteCORENoTunneling() throws Exception {
        final String clientProtocol = "CORE";
        testQueueDemandOnLocalBrokerFederatesLargeMessagesFromRemote(clientProtocol, false);
    }

    /**
     * Runs the large-message queue federation scenario with {@code "CORE"} clients and the
     * policy's {@code tunnel-core-messages} property set to {@code true}.
     *
     * <p>Carries the same {@code @Timeout(20)} as the sibling tests so a stalled federation link
     * fails the test instead of blocking the run.
     */
    @Timeout(20)
    @Test
    public void testQueueDemandOnLocalBrokerFederatesLargeMessagesFromRemoteCOREWithTunneling() throws Exception {
        testQueueDemandOnLocalBrokerFederatesLargeMessagesFromRemote("CORE", true);
    }

    /**
     * Shared scenario: a queue consumer attached to the local broker (port 5672) must receive a
     * 15360 byte message that is sent to the same-named anycast queue on the remote broker
     * (port 5673), with the local broker configured to federate that queue from the remote.
     *
     * @param str protocol name handed to {@code CFUtil.createConnectionFactory} for both clients;
     *            when it equals {@code "CORE"} the remote URI also carries
     *            {@code minLargeMessageSize=10240} so the payload exceeds that threshold
     * @param z   value written to the queue policy's {@code tunnel-core-messages} property
     * @throws Exception if a broker fails to start, a JMS call fails or an assertion fails
     */
    private void testQueueDemandOnLocalBrokerFederatesLargeMessagesFromRemote(String str, boolean z) throws Exception {
        logger.info("Test started: {}", getTestName());

        final AMQPFederationQueuePolicyElement queuePolicy = new AMQPFederationQueuePolicyElement();
        queuePolicy.setName("test-policy");
        queuePolicy.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME, AutoCreateJmsDestinationTest.QUEUE_NAME);
        queuePolicy.addProperty("tunnel-core-messages", Boolean.toString(z));

        final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement();
        federation.setName(getTestName());
        federation.addLocalQueuePolicy(queuePolicy);

        final AMQPBrokerConnectConfiguration brokerConnection = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:5673");
        brokerConnection.setReconnectAttempts(10);
        brokerConnection.addElement(federation);
        this.server.getConfiguration().addAMQPConnection(brokerConnection);

        // The remote queue is created explicitly (not auto-created) before the local broker starts.
        this.remoteServer.start();
        this.remoteServer.createQueue(QueueConfiguration.of(AutoCreateJmsDestinationTest.QUEUE_NAME)
                                                        .setRoutingType(RoutingType.ANYCAST)
                                                        .setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME)
                                                        .setAutoCreated(false));
        this.server.start();

        final String remoteUri = str.equals("CORE") ? "tcp://localhost:5673?minLargeMessageSize=10240" : "tcp://localhost:5673";
        final ConnectionFactory localFactory = CFUtil.createConnectionFactory(str, "tcp://localhost:5672");
        final ConnectionFactory remoteFactory = CFUtil.createConnectionFactory(str, remoteUri);

        // Resources are closed in reverse order: the remote connection first, then the local one.
        try (Connection localConnection = localFactory.createConnection();
             Connection remoteConnection = remoteFactory.createConnection()) {

            final Session localSession = localConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final Session remoteSession = remoteConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final Queue queue = localSession.createQueue(AutoCreateJmsDestinationTest.QUEUE_NAME);
            final MessageConsumer localConsumer = localSession.createConsumer(queue);

            localConnection.start();
            remoteConnection.start();

            // Wait until the queue is visible on both brokers before sending.
            Wait.assertTrue(() -> this.server.queueQuery(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME)).isExists());
            Wait.assertTrue(() -> this.remoteServer.queueQuery(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME)).isExists());

            final MessageProducer remoteProducer = remoteSession.createProducer(queue);
            final BytesMessage sent = remoteSession.createBytesMessage();
            final byte[] payload = new byte[15360];
            Arrays.fill(payload, (byte) 1);
            sent.writeBytes(payload);
            sent.setStringProperty("testProperty", "testValue");
            remoteProducer.send(sent);

            // 5 s matches the sibling scenarios; the previous 500 s wait exceeded the 20 s test timeout.
            final Object received = localConsumer.receive(5000L);
            Assertions.assertNotNull(received);
            Assertions.assertTrue(received instanceof BytesMessage);
            final BytesMessage receivedBytes = (BytesMessage) received;

            final byte[] receivedPayload = new byte[payload.length];
            receivedBytes.readBytes(receivedPayload);
            Assertions.assertArrayEquals(payload, receivedPayload);

            // Check the property on the message that crossed the federation link, not on the one we sent.
            Assertions.assertTrue(receivedBytes.propertyExists("testProperty"));
            Assertions.assertEquals("testValue", receivedBytes.getStringProperty("testProperty"));
        }
    }

    /**
     * Runs the three-broker address crossing scenario with the policy's
     * {@code tunnel-core-messages} property set to {@code false}.
     */
    @Test
    @Timeout(20)
    public void testCoreMessageCrossingAddressWithThreeBrokersWithoutTunneling() throws Exception {
        final boolean tunnelCoreMessages = false;
        doTestCoreMessageCrossingAddressWithThreeBrokers(tunnelCoreMessages);
    }

    /**
     * Runs the three-broker address crossing scenario with the policy's
     * {@code tunnel-core-messages} property set to {@code true}.
     */
    @Test
    @Timeout(20)
    public void testCoreMessageCrossingAddressWithThreeBrokersWithTunneling() throws Exception {
        final boolean tunnelCoreMessages = true;
        doTestCoreMessageCrossingAddressWithThreeBrokers(tunnelCoreMessages);
    }

    /**
     * Shared scenario: three brokers each get an outgoing federation connection to the next one
     * ({@code server} to localhost:5673, {@code remoteServer} to localhost:5674,
     * {@code remoteServer2} to localhost:5672), all sharing one address policy for
     * {@code "target"} with {@code maxHops = 2}. A CORE client on each port subscribes to the
     * topic; one message is then published per port and every consumer must receive exactly
     * one copy of it.
     *
     * @param z value written to the address policy's {@code tunnel-core-messages} property
     * @throws Exception if a broker fails to start, a JMS call fails or an assertion fails
     */
    private void doTestCoreMessageCrossingAddressWithThreeBrokers(boolean z) throws Exception {
        logger.info("Test started: {}", getTestName());
        this.remoteServer2 = createServer(SERVER2_PORT_REMOTE, false);
        final SimpleString targetAddress = SimpleString.of("target");

        final AMQPFederationAddressPolicyElement addressPolicy = new AMQPFederationAddressPolicyElement();
        addressPolicy.setName("two-hop-policy");
        addressPolicy.addToIncludes("target");
        addressPolicy.setAutoDelete(false);
        addressPolicy.setAutoDeleteDelay(-1L);
        addressPolicy.setAutoDeleteMessageCount(-1L);
        addressPolicy.setMaxHops(2);
        addressPolicy.addProperty("tunnel-core-messages", Boolean.toString(z));

        final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement();
        federation.setName(getTestName());
        federation.addLocalAddressPolicy(addressPolicy);

        // Each configuration gets its own retry interval; previously only the first one was
        // updated (three times), leaving the other two on their default interval.
        final AMQPBrokerConnectConfiguration serverToRemote = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:5673");
        serverToRemote.setReconnectAttempts(10);
        serverToRemote.setRetryInterval(100);
        serverToRemote.addElement(federation);

        final AMQPBrokerConnectConfiguration remoteToRemote2 = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:5674");
        remoteToRemote2.setReconnectAttempts(10);
        remoteToRemote2.setRetryInterval(100);
        remoteToRemote2.addElement(federation);

        final AMQPBrokerConnectConfiguration remote2ToServer = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:5672");
        remote2ToServer.setReconnectAttempts(10);
        remote2ToServer.setRetryInterval(100);
        remote2ToServer.addElement(federation);

        this.server.getConfiguration().addAMQPConnection(serverToRemote);
        this.remoteServer.getConfiguration().addAMQPConnection(remoteToRemote2);
        this.remoteServer2.getConfiguration().addAMQPConnection(remote2ToServer);

        this.server.start();
        this.remoteServer.start();
        this.remoteServer2.start();

        final ConnectionFactory factory1 = CFUtil.createConnectionFactory("CORE", "tcp://localhost:5672");
        final ConnectionFactory factory2 = CFUtil.createConnectionFactory("CORE", "tcp://localhost:5673");
        final ConnectionFactory factory3 = CFUtil.createConnectionFactory("CORE", "tcp://localhost:5674");

        // Resources are closed in reverse declaration order.
        try (Connection connection1 = factory1.createConnection();
             Connection connection2 = factory2.createConnection();
             Connection connection3 = factory3.createConnection()) {

            final Session session1 = connection1.createSession(Session.AUTO_ACKNOWLEDGE);
            final Session session2 = connection2.createSession(Session.AUTO_ACKNOWLEDGE);
            final Session session3 = connection3.createSession(Session.AUTO_ACKNOWLEDGE);

            final Topic topic = session1.createTopic("target");

            final MessageConsumer consumer1 = session1.createConsumer(topic);
            final MessageConsumer consumer2 = session2.createConsumer(topic);
            final MessageConsumer consumer3 = session3.createConsumer(topic);

            final MessageProducer producer1 = session1.createProducer(topic);
            final MessageProducer producer2 = session2.createProducer(topic);
            final MessageProducer producer3 = session3.createProducer(topic);

            final TextMessage message1 = session1.createTextMessage("Message1");
            message1.setStringProperty(AutoCreateJmsDestinationTest.QUEUE_NAME, "1");
            final TextMessage message2 = session2.createTextMessage("Message2");
            message2.setStringProperty(AutoCreateJmsDestinationTest.QUEUE_NAME, "2");
            final TextMessage message3 = session3.createTextMessage("Message3");
            message3.setStringProperty(AutoCreateJmsDestinationTest.QUEUE_NAME, "3");

            connection1.start();
            connection2.start();
            connection3.start();

            // Every broker must report exactly two queues bound to the address before sending
            // (presumably the client subscription plus one federation subscription - confirm).
            Wait.assertTrue(() -> this.server.bindingQuery(targetAddress).getQueueNames().size() == 2);
            Wait.assertTrue(() -> this.remoteServer.bindingQuery(targetAddress).getQueueNames().size() == 2);
            Wait.assertTrue(() -> this.remoteServer2.bindingQuery(targetAddress).getQueueNames().size() == 2);

            // Round 1: publish on port 5672; each consumer gets one copy and no duplicates.
            producer1.send(message1);
            assertCrossingTextMessageReceived(consumer1, "Message1", "1");
            assertCrossingTextMessageReceived(consumer2, "Message1", "1");
            assertCrossingTextMessageReceived(consumer3, "Message1", "1");
            Assertions.assertNull(consumer1.receive(100L));
            Assertions.assertNull(consumer2.receive(100L));
            Assertions.assertNull(consumer3.receive(100L));

            // Round 2: publish on port 5673.
            producer2.send(message2);
            assertCrossingTextMessageReceived(consumer1, "Message2", "2");
            assertCrossingTextMessageReceived(consumer2, "Message2", "2");
            assertCrossingTextMessageReceived(consumer3, "Message2", "2");
            Assertions.assertNull(consumer1.receiveNoWait());
            Assertions.assertNull(consumer2.receiveNoWait());
            Assertions.assertNull(consumer3.receiveNoWait());

            // Round 3: publish on port 5674.
            producer3.send(message3);
            assertCrossingTextMessageReceived(consumer1, "Message3", "3");
            assertCrossingTextMessageReceived(consumer2, "Message3", "3");
            assertCrossingTextMessageReceived(consumer3, "Message3", "3");
            Assertions.assertNull(consumer1.receiveNoWait());
            Assertions.assertNull(consumer2.receiveNoWait());
            Assertions.assertNull(consumer3.receiveNoWait());
        }
    }

    /**
     * Waits up to two seconds for a message on {@code consumer} and asserts that it is a
     * {@link TextMessage} with the expected body and marker property value.
     *
     * @param consumer       consumer to receive from
     * @param expectedText   expected message body
     * @param expectedMarker expected value of the string property keyed by
     *                       {@code AutoCreateJmsDestinationTest.QUEUE_NAME}
     * @throws Exception if the JMS receive or a property accessor fails
     */
    private static void assertCrossingTextMessageReceived(MessageConsumer consumer, String expectedText, String expectedMarker) throws Exception {
        // receive() returns the generic message type, so check and cast explicitly.
        final Object received = consumer.receive(2000L);
        Assertions.assertNotNull(received);
        Assertions.assertTrue(received instanceof TextMessage);
        final TextMessage text = (TextMessage) received;
        Assertions.assertEquals(expectedText, text.getText());
        Assertions.assertTrue(text.propertyExists(AutoCreateJmsDestinationTest.QUEUE_NAME));
        Assertions.assertEquals(expectedMarker, text.getStringProperty(AutoCreateJmsDestinationTest.QUEUE_NAME));
    }

    /**
     * Queue federation with a {@code "CORE"} consumer on the local broker, an {@code "AMQP"}
     * producer on the remote broker and {@code tunnel-core-messages} set to {@code false}.
     */
    @Test
    @Timeout(20)
    public void testCoreConsumerDemandOnLocalBrokerFederatesMessageFromAMQPClient() throws Exception {
        final String localProtocol = "CORE";
        final String remoteProtocol = "AMQP";
        testCoreConsumerDemandOnLocalBrokerFederatesMessageFromAMQPClient(localProtocol, remoteProtocol, false);
    }

    /**
     * Queue federation with a {@code "CORE"} consumer on the local broker, a {@code "CORE"}
     * producer on the remote broker and {@code tunnel-core-messages} set to {@code true}.
     */
    @Test
    @Timeout(20)
    public void testCoreConsumerDemandOnLocalBrokerFederatesMessageFromCoreClientTunneled() throws Exception {
        final String localProtocol = "CORE";
        final String remoteProtocol = "CORE";
        testCoreConsumerDemandOnLocalBrokerFederatesMessageFromAMQPClient(localProtocol, remoteProtocol, true);
    }

    /**
     * Queue federation with a {@code "CORE"} consumer on the local broker, a {@code "CORE"}
     * producer on the remote broker and {@code tunnel-core-messages} set to {@code false}.
     */
    @Test
    @Timeout(20)
    public void testCoreConsumerDemandOnLocalBrokerFederatesMessageFromCoreClientUnTunneled() throws Exception {
        final String localProtocol = "CORE";
        final String remoteProtocol = "CORE";
        testCoreConsumerDemandOnLocalBrokerFederatesMessageFromAMQPClient(localProtocol, remoteProtocol, false);
    }

    /**
     * Queue federation with an {@code "AMQP"} consumer on the local broker, a {@code "CORE"}
     * producer on the remote broker and {@code tunnel-core-messages} set to {@code true}.
     */
    @Test
    @Timeout(20)
    public void testAMQPConsumerDemandOnLocalBrokerFederatesMessageFromCoreClientTunneled() throws Exception {
        final String localProtocol = "AMQP";
        final String remoteProtocol = "CORE";
        testCoreConsumerDemandOnLocalBrokerFederatesMessageFromAMQPClient(localProtocol, remoteProtocol, true);
    }

    /**
     * Queue federation with an {@code "AMQP"} consumer on the local broker, a {@code "CORE"}
     * producer on the remote broker and {@code tunnel-core-messages} set to {@code false}.
     */
    @Test
    @Timeout(20)
    public void testAMQPConsumerDemandOnLocalBrokerFederatesMessageFromCoreClientNotTunneled() throws Exception {
        final String localProtocol = "AMQP";
        final String remoteProtocol = "CORE";
        testCoreConsumerDemandOnLocalBrokerFederatesMessageFromAMQPClient(localProtocol, remoteProtocol, false);
    }

    /**
     * Shared scenario: a queue consumer on the local broker (port 5672) must receive a
     * 15360 byte message sent to the same-named anycast queue on the remote broker (port 5673),
     * with its body, application properties, correlation id, reply-to topic and delivery mode
     * intact. The two clients may use different protocols.
     *
     * @param str  protocol name for the consumer's connection factory (local broker)
     * @param str2 protocol name for the producer's connection factory (remote broker)
     * @param z    value written to the queue policy's {@code tunnel-core-messages} property
     * @throws Exception if a broker fails to start, a JMS call fails or an assertion fails
     */
    private void testCoreConsumerDemandOnLocalBrokerFederatesMessageFromAMQPClient(String str, String str2, boolean z) throws Exception {
        logger.info("Test started: {}", getTestName());

        final AMQPFederationQueuePolicyElement queuePolicy = new AMQPFederationQueuePolicyElement();
        queuePolicy.setName("test-policy");
        queuePolicy.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME, AutoCreateJmsDestinationTest.QUEUE_NAME);
        queuePolicy.addProperty("tunnel-core-messages", Boolean.toString(z));

        final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement();
        federation.setName(getTestName());
        federation.addLocalQueuePolicy(queuePolicy);

        final AMQPBrokerConnectConfiguration brokerConnection = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:5673");
        brokerConnection.setReconnectAttempts(10);
        brokerConnection.addElement(federation);
        this.server.getConfiguration().addAMQPConnection(brokerConnection);

        // The remote queue is created explicitly (not auto-created) before the local broker starts.
        this.remoteServer.start();
        this.remoteServer.createQueue(QueueConfiguration.of(AutoCreateJmsDestinationTest.QUEUE_NAME)
                                                        .setRoutingType(RoutingType.ANYCAST)
                                                        .setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME)
                                                        .setAutoCreated(false));
        this.server.start();

        final ConnectionFactory localFactory = CFUtil.createConnectionFactory(str, "tcp://localhost:5672");
        final ConnectionFactory remoteFactory = CFUtil.createConnectionFactory(str2, "tcp://localhost:5673");

        // Resources are closed in reverse order: the remote connection first, then the local one.
        try (Connection localConnection = localFactory.createConnection();
             Connection remoteConnection = remoteFactory.createConnection()) {

            final Session localSession = localConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final Session remoteSession = remoteConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final MessageConsumer localConsumer = localSession.createConsumer(localSession.createQueue(AutoCreateJmsDestinationTest.QUEUE_NAME));

            localConnection.start();
            remoteConnection.start();

            // Wait until the queue is visible on both brokers before sending.
            Wait.assertTrue(() -> this.server.queueQuery(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME)).isExists());
            Wait.assertTrue(() -> this.remoteServer.queueQuery(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME)).isExists());

            final MessageProducer remoteProducer = remoteSession.createProducer(remoteSession.createQueue(AutoCreateJmsDestinationTest.QUEUE_NAME));
            final BytesMessage sent = remoteSession.createBytesMessage();
            final byte[] payload = new byte[15360];
            Arrays.fill(payload, (byte) 1);
            sent.writeBytes(payload);
            sent.setStringProperty("testProperty", "testValue");
            sent.setIntProperty("testIntProperty", 42);
            sent.setJMSCorrelationID("myCorrelationId");
            sent.setJMSReplyTo(remoteSession.createTopic("reply-topic"));
            // 2 is the JMS DeliveryMode.PERSISTENT constant (DeliveryMode is not imported in this file).
            remoteProducer.setDeliveryMode(2);
            remoteProducer.send(sent);

            // receive() returns the generic message type, so check and cast explicitly.
            final Object received = localConsumer.receive(5000L);
            Assertions.assertNotNull(received);
            Assertions.assertTrue(received instanceof BytesMessage);
            final BytesMessage receivedBytes = (BytesMessage) received;

            final byte[] receivedPayload = new byte[payload.length];
            receivedBytes.readBytes(receivedPayload);
            Assertions.assertArrayEquals(payload, receivedPayload);

            // Check properties on the message that crossed the federation link, not on the one we sent.
            Assertions.assertTrue(receivedBytes.propertyExists("testProperty"));
            Assertions.assertEquals("testValue", receivedBytes.getStringProperty("testProperty"));
            Assertions.assertTrue(receivedBytes.propertyExists("testIntProperty"));
            Assertions.assertEquals(42, receivedBytes.getIntProperty("testIntProperty"));
            Assertions.assertEquals("myCorrelationId", receivedBytes.getJMSCorrelationID());
            // getJMSReplyTo() returns a generic destination; the topic name needs a Topic cast.
            Assertions.assertEquals("reply-topic", ((Topic) receivedBytes.getJMSReplyTo()).getTopicName());
            Assertions.assertEquals(2, receivedBytes.getJMSDeliveryMode());
        }
    }

    /**
     * Runs the filtered queue federation scenario with {@code "AMQP"} as the client
     * protocol handed to the shared scenario method.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    @Timeout(20)
    public void testQueueDemandOnLocalBrokerFederatesMatchingFilteredMessagesFromRemoteAMQP() throws Exception {
        final String clientProtocol = "AMQP";
        testQueueDemandOnLocalBrokerFederatesMatchingFilteredMessagesFromRemote(clientProtocol);
    }

    /**
     * Runs the filtered queue federation scenario with {@code "CORE"} as the client
     * protocol handed to the shared scenario method.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    @Timeout(20)
    public void testQueueDemandOnLocalBrokerFederatesMatchingFilteredMessagesFromRemoteCORE() throws Exception {
        final String clientProtocol = "CORE";
        testQueueDemandOnLocalBrokerFederatesMatchingFilteredMessagesFromRemote(clientProtocol);
    }

    /**
     * Shared scenario: queue federation with selector-based consumers.
     * <p>
     * A local queue federation policy is registered on {@code server} for a broker
     * connection to {@code tcp://localhost:5673}. A filtered anycast queue is created on
     * {@code remoteServer}. Two selector consumers ({@code color='red'} and
     * {@code color='blue'}) are attached through the {@code tcp://localhost:5672}
     * connection, and three messages (green, red, blue) are produced through the
     * {@code tcp://localhost:5673} connection. The red and blue messages must arrive at the
     * matching selector consumers, and the green message must still be consumable through
     * the {@code tcp://localhost:5673} connection.
     *
     * @param str client protocol name passed to {@link CFUtil#createConnectionFactory}
     *            (the callers in this class pass {@code "AMQP"} or {@code "CORE"})
     * @throws Exception if broker startup, JMS operations or any assertion fails
     */
    private void testQueueDemandOnLocalBrokerFederatesMatchingFilteredMessagesFromRemote(String str) throws Exception {
        logger.info("Test started: {}", getTestName());

        final AMQPFederationQueuePolicyElement queuePolicy = new AMQPFederationQueuePolicyElement();
        queuePolicy.setName("test-policy");
        queuePolicy.addToIncludes("#", AutoCreateJmsDestinationTest.QUEUE_NAME);

        final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement();
        federation.setName(getTestName());
        federation.addLocalQueuePolicy(queuePolicy);

        final AMQPBrokerConnectConfiguration brokerConnection =
            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:5673");
        brokerConnection.setReconnectAttempts(10);
        brokerConnection.addElement(federation);
        this.server.getConfiguration().addAMQPConnection(brokerConnection);

        // The filtered queue is created on the remote broker before the federating broker starts.
        this.remoteServer.start();
        this.remoteServer.createQueue(
            QueueConfiguration.of(AutoCreateJmsDestinationTest.QUEUE_NAME)
                              .setRoutingType(RoutingType.ANYCAST)
                              .setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME)
                              .setFilterString("color='red' OR color='green' OR color='blue'")
                              .setAutoCreated(false));
        this.server.start();

        final ConnectionFactory localFactory = CFUtil.createConnectionFactory(str, "tcp://localhost:5672");
        final ConnectionFactory remoteFactory = CFUtil.createConnectionFactory(str, "tcp://localhost:5673");

        // try-with-resources closes the connections exactly once, in reverse declaration
        // order (5673 connection first, then 5672), even when an assertion fails.
        try (Connection localConnection = localFactory.createConnection();
             Connection remoteConnection = remoteFactory.createConnection()) {

            final Session localSession = localConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final Session remoteSession = remoteConnection.createSession(Session.AUTO_ACKNOWLEDGE);

            final Queue queue = localSession.createQueue(AutoCreateJmsDestinationTest.QUEUE_NAME);
            final MessageConsumer redConsumer = localSession.createConsumer(queue, "color='red'");
            final MessageConsumer blueConsumer = localSession.createConsumer(queue, "color='blue'");

            localConnection.start();
            remoteConnection.start();

            // Do not produce until the queue is visible on the federating broker.
            Wait.assertTrue(() -> this.server.queueQuery(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME)).isExists());

            final MessageProducer remoteProducer = remoteSession.createProducer(queue);

            final TextMessage greenMessage = remoteSession.createTextMessage("Hello World 1");
            greenMessage.setStringProperty("color", "green");
            final TextMessage redMessage = remoteSession.createTextMessage("Hello World 2");
            redMessage.setStringProperty("color", "red");
            final TextMessage blueMessage = remoteSession.createTextMessage("Hello World 3");
            blueMessage.setStringProperty("color", "blue");

            remoteProducer.send(greenMessage);
            remoteProducer.send(redMessage);
            remoteProducer.send(blueMessage);

            // assertInstanceOf fails on null as well as on a wrong type and returns the typed value.
            final TextMessage receivedRed =
                Assertions.assertInstanceOf(TextMessage.class, redConsumer.receive(5000L));
            Assertions.assertEquals("Hello World 2", receivedRed.getText());
            Assertions.assertTrue(receivedRed.propertyExists("color"));
            Assertions.assertEquals("red", receivedRed.getStringProperty("color"));

            final TextMessage receivedBlue =
                Assertions.assertInstanceOf(TextMessage.class, blueConsumer.receive(5000L));
            Assertions.assertEquals("Hello World 3", receivedBlue.getText());
            Assertions.assertTrue(receivedBlue.propertyExists("color"));
            Assertions.assertEquals("blue", receivedBlue.getStringProperty("color"));

            // No consumer on the 5672 connection selects green, so it is read back
            // through the 5673 connection instead.
            final MessageConsumer greenConsumer = remoteSession.createConsumer(queue, "color='green'");
            final TextMessage receivedGreen =
                Assertions.assertInstanceOf(TextMessage.class, greenConsumer.receive(5000L));
            Assertions.assertEquals("Hello World 1", receivedGreen.getText());
            Assertions.assertTrue(receivedGreen.propertyExists("color"));
            Assertions.assertEquals("green", receivedGreen.getStringProperty("color"));
        }
    }

    /**
     * Runs the single-connection address federation scenario with {@code "AMQP"} as the
     * client protocol handed to the shared scenario method.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    @Timeout(20)
    public void testAddressFederatedOverSingleConnectionNotReflectedBackToSendingNodeAMQP() throws Exception {
        final String clientProtocol = "AMQP";
        doTestAddressFederatedOverSingleConnectionNotReflectedBackToSendingNode(clientProtocol);
    }

    /**
     * Runs the single-connection address federation scenario with {@code "CORE"} as the
     * client protocol handed to the shared scenario method.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    @Timeout(20)
    public void testAddressFederatedOverSingleConnectionNotReflectedBackToSendingNodeCore() throws Exception {
        final String clientProtocol = "CORE";
        doTestAddressFederatedOverSingleConnectionNotReflectedBackToSendingNode(clientProtocol);
    }

    /**
     * Shared scenario: bidirectional address federation configured on a single broker
     * connection.
     * <p>
     * One federation element carrying both a local and a remote address policy is added to
     * {@code server}'s broker connection to {@code tcp://localhost:5673}. A topic consumer
     * and producer are attached on each of the two client connections
     * ({@code tcp://localhost:5672} and {@code tcp://localhost:5673}). A message sent from
     * either side must be received exactly once by both consumers, with nothing left over
     * afterwards.
     *
     * @param str client protocol name passed to {@link CFUtil#createConnectionFactory}
     *            (the callers in this class pass {@code "AMQP"} or {@code "CORE"})
     * @throws Exception if broker startup, JMS operations or any assertion fails
     */
    private void doTestAddressFederatedOverSingleConnectionNotReflectedBackToSendingNode(String str) throws Exception {
        logger.info("Test started: {}", getTestName());

        final AMQPFederationAddressPolicyElement localPolicy = new AMQPFederationAddressPolicyElement();
        localPolicy.setName("local-test-policy");
        localPolicy.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
        localPolicy.setAutoDelete(false);
        localPolicy.setAutoDeleteDelay(-1L);
        localPolicy.setAutoDeleteMessageCount(-1L);
        localPolicy.setMaxHops(0);

        final AMQPFederationAddressPolicyElement remotePolicy = new AMQPFederationAddressPolicyElement();
        remotePolicy.setName("remote-test-policy");
        remotePolicy.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
        remotePolicy.setAutoDelete(false);
        remotePolicy.setAutoDeleteDelay(-1L);
        remotePolicy.setAutoDeleteMessageCount(-1L);
        remotePolicy.setMaxHops(0);

        // Both directions are configured on the same federation element / broker connection.
        final AMQPFederatedBrokerConnectionElement federation = new AMQPFederatedBrokerConnectionElement();
        federation.setName(getTestName());
        federation.addLocalAddressPolicy(localPolicy);
        federation.addRemoteAddressPolicy(remotePolicy);

        final AMQPBrokerConnectConfiguration brokerConnection =
            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:5673");
        brokerConnection.setReconnectAttempts(10);
        brokerConnection.addElement(federation);
        this.server.getConfiguration().addAMQPConnection(brokerConnection);

        this.remoteServer.start();
        this.server.start();

        final ConnectionFactory localFactory = CFUtil.createConnectionFactory(str, "tcp://localhost:5672");
        final ConnectionFactory remoteFactory = CFUtil.createConnectionFactory(str, "tcp://localhost:5673");

        // try-with-resources closes the connections exactly once, in reverse declaration
        // order (5673 connection first, then 5672), even when an assertion fails.
        try (Connection localConnection = localFactory.createConnection();
             Connection remoteConnection = remoteFactory.createConnection()) {

            final Session localSession = localConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final Session remoteSession = remoteConnection.createSession(Session.AUTO_ACKNOWLEDGE);

            final Topic topic = localSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME);

            final MessageConsumer localConsumer = localSession.createConsumer(topic);
            final MessageConsumer remoteConsumer = remoteSession.createConsumer(topic);
            final MessageProducer localProducer = localSession.createProducer(topic);
            final MessageProducer remoteProducer = remoteSession.createProducer(topic);

            final TextMessage fromLocal = localSession.createTextMessage("local");
            final TextMessage fromRemote = remoteSession.createTextMessage("remote");

            localConnection.start();
            remoteConnection.start();

            final SimpleString address = SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME);

            Wait.assertTrue(() -> this.server.addressQuery(address).isExists());
            Wait.assertTrue(() -> this.remoteServer.addressQuery(address).isExists());

            // Nothing has been sent yet, so neither consumer may see a message.
            Assertions.assertNull(localConsumer.receiveNoWait());
            Assertions.assertNull(remoteConsumer.receiveNoWait());

            // Wait until each broker reports at least two queue bindings on the address
            // (presumably the client subscription plus the federation binding) before sending.
            Wait.assertTrue(() -> this.server.bindingQuery(address, false).getQueueNames().size() >= 2);
            Wait.assertTrue(() -> this.remoteServer.bindingQuery(address, false).getQueueNames().size() >= 2);

            // receive() has no timeout of its own; the callers' @Timeout bounds the wait.
            // assertInstanceOf fails on null as well as on a wrong type and returns the typed value.
            localProducer.send(fromLocal);
            final TextMessage localSeenLocally =
                Assertions.assertInstanceOf(TextMessage.class, localConsumer.receive());
            final TextMessage localSeenRemotely =
                Assertions.assertInstanceOf(TextMessage.class, remoteConsumer.receive());
            Assertions.assertEquals("local", localSeenLocally.getText());
            Assertions.assertEquals("local", localSeenRemotely.getText());

            remoteProducer.send(fromRemote);
            final TextMessage remoteSeenLocally =
                Assertions.assertInstanceOf(TextMessage.class, localConsumer.receive());
            final TextMessage remoteSeenRemotely =
                Assertions.assertInstanceOf(TextMessage.class, remoteConsumer.receive());
            Assertions.assertEquals("remote", remoteSeenLocally.getText());
            Assertions.assertEquals("remote", remoteSeenRemotely.getText());

            // Each message was delivered once per consumer; no duplicate may be pending.
            Assertions.assertNull(localConsumer.receiveNoWait());
            Assertions.assertNull(remoteConsumer.receiveNoWait());
        }
    }

    /**
     * Runs the two-connection address federation scenario with {@code "AMQP"} as the
     * client protocol handed to the shared scenario method.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    @Timeout(20)
    public void testAddressFederatedOnTwoConnectionsNotReflectedBackToSendingNodeAMQP() throws Exception {
        final String clientProtocol = "AMQP";
        doTestAddressFederatedOverTwoConnectionNotReflectedBackToSendingNode(clientProtocol);
    }

    /**
     * Runs the two-connection address federation scenario with {@code "CORE"} as the
     * client protocol handed to the shared scenario method.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    @Timeout(20)
    public void testAddressFederatedOnTwoConnectionsNotReflectedBackToSendingNodeCore() throws Exception {
        final String clientProtocol = "CORE";
        doTestAddressFederatedOverTwoConnectionNotReflectedBackToSendingNode(clientProtocol);
    }

    /**
     * Shared scenario: bidirectional address federation built from two independent broker
     * connections.
     * <p>
     * {@code server} is given a broker connection to {@code tcp://localhost:5673} and
     * {@code remoteServer} a broker connection to {@code tcp://localhost:5672}. Each
     * connection carries its own federation element with a local address policy. A topic
     * consumer and producer are attached on each of the two client connections. A message
     * sent from either side must be received exactly once by both consumers, with nothing
     * left over afterwards.
     *
     * @param str client protocol name passed to {@link CFUtil#createConnectionFactory}
     *            (the callers in this class pass {@code "AMQP"} or {@code "CORE"})
     * @throws Exception if broker startup, JMS operations or any assertion fails
     */
    private void doTestAddressFederatedOverTwoConnectionNotReflectedBackToSendingNode(String str) throws Exception {
        logger.info("Test started: {}", getTestName());

        final AMQPFederationAddressPolicyElement localPolicy = new AMQPFederationAddressPolicyElement();
        localPolicy.setName("local-test-policy");
        localPolicy.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
        localPolicy.setAutoDelete(false);
        localPolicy.setAutoDeleteDelay(-1L);
        localPolicy.setAutoDeleteMessageCount(-1L);
        localPolicy.setMaxHops(0);

        final AMQPFederationAddressPolicyElement remotePolicy = new AMQPFederationAddressPolicyElement();
        remotePolicy.setName("remote-test-policy");
        remotePolicy.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
        remotePolicy.setAutoDelete(false);
        remotePolicy.setAutoDeleteDelay(-1L);
        remotePolicy.setAutoDeleteMessageCount(-1L);
        remotePolicy.setMaxHops(0);

        final AMQPFederatedBrokerConnectionElement localFederation = new AMQPFederatedBrokerConnectionElement();
        localFederation.setName(getTestName() + ":1");
        localFederation.addLocalAddressPolicy(localPolicy);

        // Suffix uses the same ":<n>" form as the first element so the two names are consistent.
        final AMQPFederatedBrokerConnectionElement remoteFederation = new AMQPFederatedBrokerConnectionElement();
        remoteFederation.setName(getTestName() + ":2");
        remoteFederation.addLocalAddressPolicy(remotePolicy);

        final AMQPBrokerConnectConfiguration localBrokerConnection =
            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:5673");
        localBrokerConnection.setReconnectAttempts(10);
        localBrokerConnection.addElement(localFederation);

        final AMQPBrokerConnectConfiguration remoteBrokerConnection =
            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:5672");
        remoteBrokerConnection.setReconnectAttempts(10);
        remoteBrokerConnection.addElement(remoteFederation);

        // Each broker gets its own outgoing connection towards the other one.
        this.server.getConfiguration().addAMQPConnection(localBrokerConnection);
        this.remoteServer.getConfiguration().addAMQPConnection(remoteBrokerConnection);

        this.remoteServer.start();
        this.server.start();

        final ConnectionFactory localFactory = CFUtil.createConnectionFactory(str, "tcp://localhost:5672");
        final ConnectionFactory remoteFactory = CFUtil.createConnectionFactory(str, "tcp://localhost:5673");

        // try-with-resources closes the connections exactly once, in reverse declaration
        // order (5673 connection first, then 5672), even when an assertion fails.
        try (Connection localConnection = localFactory.createConnection();
             Connection remoteConnection = remoteFactory.createConnection()) {

            final Session localSession = localConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final Session remoteSession = remoteConnection.createSession(Session.AUTO_ACKNOWLEDGE);

            final Topic topic = localSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME);

            final MessageConsumer localConsumer = localSession.createConsumer(topic);
            final MessageConsumer remoteConsumer = remoteSession.createConsumer(topic);
            final MessageProducer localProducer = localSession.createProducer(topic);
            final MessageProducer remoteProducer = remoteSession.createProducer(topic);

            final TextMessage fromLocal = localSession.createTextMessage("local");
            final TextMessage fromRemote = remoteSession.createTextMessage("remote");

            localConnection.start();
            remoteConnection.start();

            final SimpleString address = SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME);

            Wait.assertTrue(() -> this.server.addressQuery(address).isExists());
            Wait.assertTrue(() -> this.remoteServer.addressQuery(address).isExists());

            // Nothing has been sent yet, so neither consumer may see a message.
            Assertions.assertNull(localConsumer.receiveNoWait());
            Assertions.assertNull(remoteConsumer.receiveNoWait());

            // Wait until each broker reports at least two queue bindings on the address
            // (presumably the client subscription plus the federation binding) before sending.
            Wait.assertTrue(() -> this.server.bindingQuery(address, false).getQueueNames().size() >= 2);
            Wait.assertTrue(() -> this.remoteServer.bindingQuery(address, false).getQueueNames().size() >= 2);

            // receive() has no timeout of its own; the callers' @Timeout bounds the wait.
            // assertInstanceOf fails on null as well as on a wrong type and returns the typed value.
            localProducer.send(fromLocal);
            final TextMessage localSeenLocally =
                Assertions.assertInstanceOf(TextMessage.class, localConsumer.receive());
            final TextMessage localSeenRemotely =
                Assertions.assertInstanceOf(TextMessage.class, remoteConsumer.receive());
            Assertions.assertEquals("local", localSeenLocally.getText());
            Assertions.assertEquals("local", localSeenRemotely.getText());

            remoteProducer.send(fromRemote);
            final TextMessage remoteSeenLocally =
                Assertions.assertInstanceOf(TextMessage.class, localConsumer.receive());
            final TextMessage remoteSeenRemotely =
                Assertions.assertInstanceOf(TextMessage.class, remoteConsumer.receive());
            Assertions.assertEquals("remote", remoteSeenLocally.getText());
            Assertions.assertEquals("remote", remoteSeenRemotely.getText());

            // Each message was delivered once per consumer; no duplicate may be pending.
            Assertions.assertNull(localConsumer.receiveNoWait());
            Assertions.assertNull(remoteConsumer.receiveNoWait());
        }
    }
}
