package org.apache.activemq.artemis.tests.integration.amqp.connect;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.net.URL;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationPolicySupport;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.integration.jms.RedeployTest;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.transport.netty.NettyTransportOptions;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConfigurationReloadTest.class */
public class AMQPFederationConfigurationReloadTest extends AmqpClientTestSupport {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport
    protected String getConfiguredProtocols() {
        return "AMQP,CORE";
    }

    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport
    protected ActiveMQServer createServer() throws Exception {
        return createServer(NettyTransportOptions.DEFAULT_TCP_PORT, false);
    }

    @Timeout(20)
    @Test
    public void testFederationConfigurationWithoutChangesIsIgnoredOnUpdate() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(true);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(10000L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            HashMap hashMap = new HashMap();
            hashMap.put("auto-delete", true);
            hashMap.put("auto-delete-delay", 10000L);
            hashMap.put("auto-delete-msg-count", -1L);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                MessageConsumer createConsumer = createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                ProtonProtocolManagerFactory protonProtocolManagerFactory = (ProtonProtocolManagerFactory) this.server.getRemotingService().getProtocolFactoryMap().get("AMQP");
                Assertions.assertNotNull(protonProtocolManagerFactory);
                AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement2 = new AMQPFederationAddressPolicyElement();
                aMQPFederationAddressPolicyElement2.setName("address-policy");
                aMQPFederationAddressPolicyElement2.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
                aMQPFederationAddressPolicyElement2.setAutoDelete(true);
                aMQPFederationAddressPolicyElement2.setAutoDeleteDelay(10000L);
                aMQPFederationAddressPolicyElement2.setAutoDeleteMessageCount(-1L);
                AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement2 = new AMQPFederatedBrokerConnectionElement();
                aMQPFederatedBrokerConnectionElement2.setName(getTestName());
                aMQPFederatedBrokerConnectionElement2.addLocalAddressPolicy(aMQPFederationAddressPolicyElement2);
                aMQPBrokerConnectConfiguration.getConnectionElements().clear();
                aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement2);
                this.server.getConfiguration().getAMQPConnection().clear();
                this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
                protonProtocolManagerFactory.updateProtocolServices(this.server, Collections.emptyList());
                createConnection.start();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectDetach().respond();
                createConsumer.close();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationConnectsToSecondPeerWhenConfigurationUpdatedWithNewConnection() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(true);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(10000L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName() + ":1");
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName() + ":1", "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            HashMap hashMap = new HashMap();
            hashMap.put("auto-delete", true);
            hashMap.put("auto-delete-delay", 10000L);
            hashMap.put("auto-delete-msg-count", -1L);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName() + ":1"), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                createConnection.start();
                protonTestServer = new ProtonTestServer();
                try {
                    protonTestServer.expectSASLAnonymousConnect();
                    protonTestServer.expectOpen().respond();
                    protonTestServer.expectBegin().respond();
                    protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
                    protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
                    protonTestServer.expectFlow().withLinkCredit(10);
                    protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName() + ":2"), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
                    protonTestServer.expectFlow().withLinkCredit(1000);
                    protonTestServer.start();
                    URI serverURI2 = protonTestServer.getServerURI();
                    logger.info("Test peer 2 started, peer listening on: {}", serverURI2);
                    ProtonProtocolManagerFactory protonProtocolManagerFactory = (ProtonProtocolManagerFactory) this.server.getRemotingService().getProtocolFactoryMap().get("AMQP");
                    Assertions.assertNotNull(protonProtocolManagerFactory);
                    AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement2 = new AMQPFederationAddressPolicyElement();
                    aMQPFederationAddressPolicyElement2.setName("address-policy");
                    aMQPFederationAddressPolicyElement2.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
                    aMQPFederationAddressPolicyElement2.setAutoDelete(true);
                    aMQPFederationAddressPolicyElement2.setAutoDeleteDelay(10000L);
                    aMQPFederationAddressPolicyElement2.setAutoDeleteMessageCount(-1L);
                    AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement2 = new AMQPFederatedBrokerConnectionElement();
                    aMQPFederatedBrokerConnectionElement2.setName(getTestName() + ":2");
                    aMQPFederatedBrokerConnectionElement2.addLocalAddressPolicy(aMQPFederationAddressPolicyElement2);
                    AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration2 = new AMQPBrokerConnectConfiguration(getTestName() + ":2", "tcp://" + serverURI2.getHost() + ":" + serverURI2.getPort());
                    aMQPBrokerConnectConfiguration2.setReconnectAttempts(0);
                    aMQPBrokerConnectConfiguration2.addElement(aMQPFederatedBrokerConnectionElement2);
                    this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration2);
                    protonProtocolManagerFactory.updateProtocolServices(this.server, Collections.emptyList());
                    protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                    protonTestServer.close();
                    protonTestServer.close();
                    protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                    protonTestServer.close();
                    if (createConnection != null) {
                        createConnection.close();
                    }
                    protonTestServer.close();
                } finally {
                    try {
                        protonTestServer.close();
                    } catch (Throwable th) {
                        th.addSuppressed(th);
                    }
                }
            } finally {
            }
        } catch (Throwable th2) {
            throw th2;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationDisconnectsFromExistingPeerIfConfigurationRemoved() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(true);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(10000L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            HashMap hashMap = new HashMap();
            hashMap.put("auto-delete", true);
            hashMap.put("auto-delete-delay", 10000L);
            hashMap.put("auto-delete-msg-count", -1L);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                createConnection.start();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectDetach().optional();
                protonTestServer.expectClose().optional();
                protonTestServer.expectConnectionToDrop();
                ProtonProtocolManagerFactory protonProtocolManagerFactory = (ProtonProtocolManagerFactory) this.server.getRemotingService().getProtocolFactoryMap().get("AMQP");
                Assertions.assertNotNull(protonProtocolManagerFactory);
                this.server.getConfiguration().clearAMQPConnectionConfigurations();
                protonProtocolManagerFactory.updateProtocolServices(this.server, Collections.emptyList());
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testFederationUpdatesPolicyAndFederatesQueueInsteadOfAddress() throws Exception {
        ProtonTestServer protonTestServer = new ProtonTestServer();
        try {
            protonTestServer.expectSASLAnonymousConnect();
            protonTestServer.expectOpen().respond();
            protonTestServer.expectBegin().respond();
            protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
            protonTestServer.expectFlow().withLinkCredit(10);
            protonTestServer.start();
            URI serverURI = protonTestServer.getServerURI();
            logger.info("Test started, peer listening on: {}", serverURI);
            AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement = new AMQPFederationAddressPolicyElement();
            aMQPFederationAddressPolicyElement.setName("address-policy");
            aMQPFederationAddressPolicyElement.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
            aMQPFederationAddressPolicyElement.setAutoDelete(true);
            aMQPFederationAddressPolicyElement.setAutoDeleteDelay(10000L);
            aMQPFederationAddressPolicyElement.setAutoDeleteMessageCount(-1L);
            AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement = new AMQPFederatedBrokerConnectionElement();
            aMQPFederatedBrokerConnectionElement.setName(getTestName());
            aMQPFederatedBrokerConnectionElement.addLocalAddressPolicy(aMQPFederationAddressPolicyElement);
            AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
            aMQPBrokerConnectConfiguration.setReconnectAttempts(0);
            aMQPBrokerConnectConfiguration.addElement(aMQPFederatedBrokerConnectionElement);
            this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration);
            this.server.start();
            this.server.addAddressInfo(new AddressInfo(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME), RoutingType.MULTICAST));
            HashMap hashMap = new HashMap();
            hashMap.put("auto-delete", true);
            hashMap.put("auto-delete-delay", 10000L);
            hashMap.put("auto-delete-msg-count", -1L);
            protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
            protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString(AutoCreateJmsDestinationTest.QUEUE_NAME), CoreMatchers.containsString("address-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), hashMap).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER.toString()});
            protonTestServer.expectFlow().withLinkCredit(1000);
            Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
            try {
                Session createSession = createConnection.createSession(1);
                createSession.createConsumer(createSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME));
                createSession.createConsumer(createSession.createQueue("queue"));
                createConnection.start();
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.expectDetach().optional();
                protonTestServer.expectClose().optional();
                protonTestServer.expectConnectionToDrop();
                protonTestServer.expectSASLAnonymousConnect();
                protonTestServer.expectOpen().respond();
                protonTestServer.expectBegin().respond();
                protonTestServer.expectAttach().ofSender().withDesiredCapability(AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_CONTROL_LINK.toString()});
                protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_EVENT_LINK.toString()).respondInKind();
                protonTestServer.expectFlow().withLinkCredit(10);
                protonTestServer.expectAttach().ofReceiver().withDesiredCapability(AMQPFederationConstants.FEDERATION_QUEUE_RECEIVER.toString()).withName(CoreMatchers.allOf(new Matcher[]{CoreMatchers.containsString(getTestName()), CoreMatchers.containsString("queue::queue"), CoreMatchers.containsString("queue-receiver"), CoreMatchers.containsString(this.server.getNodeID().toString())})).withProperty(AMQPFederationConstants.FEDERATION_RECEIVER_PRIORITY.toString(), -1).respond().withOfferedCapabilities(new String[]{AMQPFederationConstants.FEDERATION_QUEUE_RECEIVER.toString()});
                protonTestServer.expectFlow().withLinkCredit(1000);
                ProtonProtocolManagerFactory protonProtocolManagerFactory = (ProtonProtocolManagerFactory) this.server.getRemotingService().getProtocolFactoryMap().get("AMQP");
                Assertions.assertNotNull(protonProtocolManagerFactory);
                AMQPFederationAddressPolicyElement aMQPFederationAddressPolicyElement2 = new AMQPFederationAddressPolicyElement();
                aMQPFederationAddressPolicyElement2.setName("address-policy");
                aMQPFederationAddressPolicyElement2.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
                aMQPFederationAddressPolicyElement2.setAutoDelete(true);
                aMQPFederationAddressPolicyElement2.setAutoDeleteDelay(10000L);
                aMQPFederationAddressPolicyElement2.setAutoDeleteMessageCount(-1L);
                AMQPFederationQueuePolicyElement aMQPFederationQueuePolicyElement = new AMQPFederationQueuePolicyElement();
                aMQPFederationQueuePolicyElement.setName("queue-policy");
                aMQPFederationQueuePolicyElement.addToIncludes("*", "queue");
                AMQPFederatedBrokerConnectionElement aMQPFederatedBrokerConnectionElement2 = new AMQPFederatedBrokerConnectionElement();
                aMQPFederatedBrokerConnectionElement2.setName(getTestName());
                aMQPFederatedBrokerConnectionElement2.addLocalQueuePolicy(aMQPFederationQueuePolicyElement);
                AMQPBrokerConnectConfiguration aMQPBrokerConnectConfiguration2 = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + serverURI.getHost() + ":" + serverURI.getPort());
                aMQPBrokerConnectConfiguration2.setReconnectAttempts(0);
                aMQPBrokerConnectConfiguration2.addElement(aMQPFederatedBrokerConnectionElement2);
                this.server.getConfiguration().getAMQPConnection().clear();
                this.server.getConfiguration().addAMQPConnection(aMQPBrokerConnectConfiguration2);
                protonProtocolManagerFactory.updateProtocolServices(this.server, Collections.emptyList());
                protonTestServer.waitForScriptToComplete(5L, TimeUnit.SECONDS);
                protonTestServer.close();
                if (createConnection != null) {
                    createConnection.close();
                }
                protonTestServer.close();
            } finally {
            }
        } catch (Throwable th) {
            try {
                protonTestServer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Timeout(20)
    @Test
    public void testReloadAmqpConnectionAddressPolicyMatches() throws Exception {
        this.server.start();
        Path resolve = getTestDirfile().toPath().resolve("broker.xml");
        URL resource = RedeployTest.class.getClassLoader().getResource("reload-amqp-federated-addresses.xml");
        URL resource2 = RedeployTest.class.getClassLoader().getResource("reload-amqp-federated-addresses-reload.xml");
        Files.copy(resource.openStream(), resolve, new CopyOption[0]);
        EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
        embeddedActiveMQ.setConfigResourcePath(resolve.toUri().toString());
        embeddedActiveMQ.start();
        ReusableLatch reusableLatch = new ReusableLatch(1);
        Objects.requireNonNull(reusableLatch);
        Runnable runnable = reusableLatch::countDown;
        embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(runnable);
        ConnectionFactory createConnectionFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61617");
        ConnectionFactory createConnectionFactory2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
        try {
            Connection createConnection = createConnectionFactory.createConnection();
            try {
                Connection createConnection2 = createConnectionFactory2.createConnection();
                try {
                    Session createSession = createConnection.createSession(1);
                    Topic createTopic = createSession.createTopic("address1");
                    Topic createTopic2 = createSession.createTopic("address2");
                    MessageConsumer createConsumer = createSession.createConsumer(createTopic);
                    MessageConsumer createConsumer2 = createSession.createConsumer(createTopic2);
                    createConnection.start();
                    Session createSession2 = createConnection2.createSession(1);
                    MessageProducer createProducer = createSession2.createProducer(createTopic);
                    MessageProducer createProducer2 = createSession2.createProducer(createTopic2);
                    reusableLatch.await(10L, TimeUnit.SECONDS);
                    Wait.assertTrue(() -> {
                        return this.server.addressQuery(SimpleString.of("address1")).isExists();
                    });
                    TextMessage createTextMessage = createSession.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME);
                    createProducer.send(createTextMessage);
                    createProducer2.send(createTextMessage);
                    Assertions.assertNotNull(createConsumer.receive(5000L));
                    Assertions.assertNull(createConsumer2.receiveNoWait());
                    Files.copy(resource2.openStream(), resolve, StandardCopyOption.REPLACE_EXISTING);
                    resolve.toFile().setLastModified(System.currentTimeMillis() + 1000);
                    reusableLatch.setCount(1);
                    embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(runnable);
                    reusableLatch.await(10L, TimeUnit.SECONDS);
                    Wait.assertTrue(() -> {
                        return this.server.addressQuery(SimpleString.of("address2")).isExists();
                    });
                    Wait.assertTrue(() -> {
                        return this.server.bindingQuery(SimpleString.of("address2")).getQueueNames().size() > 0;
                    });
                    createProducer.send(createTextMessage);
                    createProducer2.send(createTextMessage);
                    Assertions.assertNotNull(createConsumer.receive(5000L));
                    Assertions.assertNotNull(createConsumer2.receive(5000L));
                    if (createConnection2 != null) {
                        createConnection2.close();
                    }
                    if (createConnection != null) {
                        createConnection.close();
                    }
                } catch (Throwable th) {
                    if (createConnection2 != null) {
                        try {
                            createConnection2.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } finally {
            embeddedActiveMQ.stop();
        }
    }

    @Timeout(20)
    @Test
    public void testReloadAmqpConnectionQueuePolicyMatches() throws Exception {
        this.server.start();
        this.server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.ANYCAST).setAddress("queue1").setAutoCreated(false));
        this.server.createQueue(QueueConfiguration.of("queue2").setRoutingType(RoutingType.ANYCAST).setAddress("queue2").setAutoCreated(false));
        Path resolve = getTestDirfile().toPath().resolve("broker.xml");
        URL resource = RedeployTest.class.getClassLoader().getResource("reload-amqp-federated-queues.xml");
        URL resource2 = RedeployTest.class.getClassLoader().getResource("reload-amqp-federated-queues-reload.xml");
        Files.copy(resource.openStream(), resolve, new CopyOption[0]);
        EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
        embeddedActiveMQ.setConfigResourcePath(resolve.toUri().toString());
        embeddedActiveMQ.start();
        ReusableLatch reusableLatch = new ReusableLatch(1);
        Objects.requireNonNull(reusableLatch);
        Runnable runnable = reusableLatch::countDown;
        embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(runnable);
        ConnectionFactory createConnectionFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61617");
        ConnectionFactory createConnectionFactory2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
        reusableLatch.await(10L, TimeUnit.SECONDS);
        try {
            Connection createConnection = createConnectionFactory.createConnection();
            try {
                Connection createConnection2 = createConnectionFactory2.createConnection();
                try {
                    Session createSession = createConnection.createSession(1);
                    Queue createQueue = createSession.createQueue("queue1");
                    Queue createQueue2 = createSession.createQueue("queue2");
                    MessageConsumer createConsumer = createSession.createConsumer(createQueue);
                    MessageConsumer createConsumer2 = createSession.createConsumer(createQueue2);
                    createConnection.start();
                    Session createSession2 = createConnection2.createSession(1);
                    MessageProducer createProducer = createSession2.createProducer(createQueue);
                    MessageProducer createProducer2 = createSession2.createProducer(createQueue2);
                    Wait.assertTrue(() -> {
                        return this.server.queueQuery(SimpleString.of("queue1")).isExists();
                    });
                    TextMessage createTextMessage = createSession.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME);
                    createProducer.send(createTextMessage);
                    createProducer2.send(createTextMessage);
                    Wait.assertTrue(() -> {
                        return embeddedActiveMQ.getActiveMQServer().queueQuery(SimpleString.of("queue1")).getMessageCount() == 1;
                    });
                    Assertions.assertNotNull(createConsumer.receiveNoWait());
                    Assertions.assertNull(createConsumer2.receiveNoWait());
                    Files.copy(resource2.openStream(), resolve, StandardCopyOption.REPLACE_EXISTING);
                    resolve.toFile().setLastModified(System.currentTimeMillis() + 1000);
                    reusableLatch.setCount(1);
                    embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(runnable);
                    reusableLatch.await(10L, TimeUnit.SECONDS);
                    Wait.assertTrue(() -> {
                        return this.server.queueQuery(SimpleString.of("queue2")).isExists();
                    });
                    createProducer.send(createTextMessage);
                    Wait.assertTrue(() -> {
                        return embeddedActiveMQ.getActiveMQServer().queueQuery(SimpleString.of("queue1")).getMessageCount() == 1;
                    });
                    Wait.assertTrue(() -> {
                        return embeddedActiveMQ.getActiveMQServer().queueQuery(SimpleString.of("queue2")).getMessageCount() == 1;
                    });
                    Assertions.assertNotNull(createConsumer.receiveNoWait());
                    Assertions.assertNotNull(createConsumer2.receiveNoWait());
                    if (createConnection2 != null) {
                        createConnection2.close();
                    }
                    if (createConnection != null) {
                        createConnection.close();
                    }
                } catch (Throwable th) {
                    if (createConnection2 != null) {
                        try {
                            createConnection2.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } finally {
            embeddedActiveMQ.stop();
        }
    }

    @Timeout(20)
    @Test
    public void testReloadAmqpConnectionAddressPolicyReplacedWithQueuePolicy() throws Exception {
        this.server.start();
        this.server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.ANYCAST).setAddress("queue1").setAutoCreated(false));
        Path resolve = getTestDirfile().toPath().resolve("broker.xml");
        URL resource = RedeployTest.class.getClassLoader().getResource("reload-amqp-federated-addresses.xml");
        URL resource2 = RedeployTest.class.getClassLoader().getResource("reload-amqp-federated-queues.xml");
        Files.copy(resource.openStream(), resolve, new CopyOption[0]);
        EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
        embeddedActiveMQ.setConfigResourcePath(resolve.toUri().toString());
        embeddedActiveMQ.start();
        ReusableLatch reusableLatch = new ReusableLatch(1);
        Objects.requireNonNull(reusableLatch);
        Runnable runnable = reusableLatch::countDown;
        embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(runnable);
        ConnectionFactory createConnectionFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61617");
        ConnectionFactory createConnectionFactory2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
        try {
            Connection createConnection = createConnectionFactory.createConnection();
            try {
                Connection createConnection2 = createConnectionFactory2.createConnection();
                try {
                    Session createSession = createConnection.createSession(1);
                    Topic createTopic = createSession.createTopic("address1");
                    Queue createQueue = createSession.createQueue("queue1");
                    MessageConsumer createConsumer = createSession.createConsumer(createTopic);
                    MessageConsumer createConsumer2 = createSession.createConsumer(createQueue);
                    createConnection.start();
                    Session createSession2 = createConnection2.createSession(1);
                    MessageProducer createProducer = createSession2.createProducer(createTopic);
                    MessageProducer createProducer2 = createSession2.createProducer(createQueue);
                    reusableLatch.await(10L, TimeUnit.SECONDS);
                    Wait.assertTrue(() -> {
                        return this.server.addressQuery(SimpleString.of("address1")).isExists();
                    });
                    Wait.assertTrue(() -> {
                        return this.server.queueQuery(SimpleString.of("queue1")).isExists();
                    });
                    TextMessage createTextMessage = createSession.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME);
                    createProducer.send(createTextMessage);
                    createProducer2.send(createTextMessage);
                    Assertions.assertNotNull(createConsumer.receive(5000L));
                    Assertions.assertNull(createConsumer2.receiveNoWait());
                    Files.copy(resource2.openStream(), resolve, StandardCopyOption.REPLACE_EXISTING);
                    resolve.toFile().setLastModified(System.currentTimeMillis() + 1000);
                    reusableLatch.setCount(1);
                    embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(runnable);
                    reusableLatch.await(10L, TimeUnit.SECONDS);
                    Assertions.assertNotNull(createConsumer2.receive(5000L));
                    Wait.assertTrue(() -> {
                        return this.server.bindingQuery(SimpleString.of("address2")).getQueueNames().size() == 0;
                    });
                    if (createConnection2 != null) {
                        createConnection2.close();
                    }
                    if (createConnection != null) {
                        createConnection.close();
                    }
                } catch (Throwable th) {
                    if (createConnection2 != null) {
                        try {
                            createConnection2.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } finally {
            embeddedActiveMQ.stop();
        }
    }

    @Timeout(20)
    @Test
    public void testReloadAmqpConnectionQueuePolicyMatchesFromBrokerProperties() throws Exception {
        this.server.start();
        this.server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.ANYCAST).setAddress("queue1").setAutoCreated(false));
        this.server.createQueue(QueueConfiguration.of("queue2").setRoutingType(RoutingType.ANYCAST).setAddress("queue2").setAutoCreated(false));
        Path resolve = getTestDirfile().toPath().resolve("broker.xml");
        Path resolve2 = getTestDirfile().toPath().resolve("broker.properties");
        URL resource = RedeployTest.class.getClassLoader().getResource("reload-amqp-federated-basic.xml");
        URL resource2 = RedeployTest.class.getClassLoader().getResource("reload-amqp-federated-queues.properties");
        URL resource3 = RedeployTest.class.getClassLoader().getResource("reload-amqp-federated-queues-reload.properties");
        Files.copy(resource.openStream(), resolve, new CopyOption[0]);
        Files.copy(resource2.openStream(), resolve2, new CopyOption[0]);
        EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
        embeddedActiveMQ.setConfigResourcePath(resolve.toUri().toString());
        embeddedActiveMQ.setPropertiesResourcePath(resolve2.toString());
        embeddedActiveMQ.start();
        ReusableLatch reusableLatch = new ReusableLatch(1);
        Objects.requireNonNull(reusableLatch);
        Runnable runnable = reusableLatch::countDown;
        embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(runnable);
        ConnectionFactory createConnectionFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61617");
        ConnectionFactory createConnectionFactory2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
        reusableLatch.await(10L, TimeUnit.SECONDS);
        try {
            Connection createConnection = createConnectionFactory.createConnection();
            try {
                Connection createConnection2 = createConnectionFactory2.createConnection();
                try {
                    Session createSession = createConnection.createSession(1);
                    Queue createQueue = createSession.createQueue("queue1");
                    Queue createQueue2 = createSession.createQueue("queue2");
                    MessageConsumer createConsumer = createSession.createConsumer(createQueue);
                    MessageConsumer createConsumer2 = createSession.createConsumer(createQueue2);
                    createConnection.start();
                    Session createSession2 = createConnection2.createSession(1);
                    MessageProducer createProducer = createSession2.createProducer(createQueue);
                    MessageProducer createProducer2 = createSession2.createProducer(createQueue2);
                    Wait.assertTrue(() -> {
                        return this.server.queueQuery(SimpleString.of("queue1")).isExists();
                    });
                    Wait.assertTrue(() -> {
                        return this.server.queueQuery(SimpleString.of("queue2")).isExists();
                    });
                    TextMessage createTextMessage = createSession.createTextMessage(AutoCreateJmsDestinationTest.QUEUE_NAME);
                    createProducer.send(createTextMessage);
                    createProducer2.send(createTextMessage);
                    Wait.assertTrue(() -> {
                        return embeddedActiveMQ.getActiveMQServer().queueQuery(SimpleString.of("queue1")).getMessageCount() == 1;
                    });
                    Assertions.assertNotNull(createConsumer.receiveNoWait());
                    Assertions.assertNull(createConsumer2.receiveNoWait());
                    Files.copy(resource3.openStream(), resolve2, StandardCopyOption.REPLACE_EXISTING);
                    resolve.toFile().setLastModified(System.currentTimeMillis() + 1000);
                    reusableLatch.setCount(1);
                    embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(runnable);
                    reusableLatch.await(10L, TimeUnit.SECONDS);
                    Wait.assertTrue(() -> {
                        return this.server.queueQuery(SimpleString.of("queue1")).isExists();
                    });
                    Wait.assertTrue(() -> {
                        return this.server.queueQuery(SimpleString.of("queue2")).isExists();
                    });
                    createProducer.send(createTextMessage);
                    Wait.assertTrue(() -> {
                        return embeddedActiveMQ.getActiveMQServer().queueQuery(SimpleString.of("queue1")).getMessageCount() == 1;
                    });
                    Wait.assertTrue(() -> {
                        return embeddedActiveMQ.getActiveMQServer().queueQuery(SimpleString.of("queue2")).getMessageCount() == 1;
                    });
                    Assertions.assertNotNull(createConsumer.receiveNoWait());
                    Assertions.assertNotNull(createConsumer2.receiveNoWait());
                    if (createConnection2 != null) {
                        createConnection2.close();
                    }
                    if (createConnection != null) {
                        createConnection.close();
                    }
                } catch (Throwable th) {
                    if (createConnection2 != null) {
                        try {
                            createConnection2.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } finally {
            embeddedActiveMQ.stop();
        }
    }
}
