package org.apache.activemq.artemis.tests.integration.amqp.connect;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.ActiveMQServerAMQPFederationPlugin;
import org.apache.activemq.artemis.protocol.amqp.federation.Federation;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationBrokerPliuginTest.class */
public class AMQPFederationBrokerPliuginTest extends AmqpClientTestSupport {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final int SERVER_PORT = 5672;
    private static final int SERVER_PORT_REMOTE = 5673;
    protected ActiveMQServer remoteServer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationBrokerPliuginTest$AMQPTestFederationBrokerPlugin.class */
    /**
     * Test double for {@link ActiveMQServerAMQPFederationPlugin}.
     *
     * <p>Every plugin callback is forwarded to a replaceable public functional field so an
     * individual test can swap in its own behavior. The default handlers record that federation
     * started/stopped and capture the argument handed to the consumer create/close hooks; the
     * test methods poll those atomic holders through {@code Wait.assertTrue}. The default
     * message handlers do nothing and the default {@code shouldCreate...} decisions all answer
     * {@code true}.
     */
    public class AMQPTestFederationBrokerPlugin implements ActiveMQServerAMQPFederationPlugin {

        /** Flipped to {@code true} when {@link #federationStarted(Federation)} is invoked. */
        public final AtomicBoolean started = new AtomicBoolean();

        /** Flipped to {@code true} when {@link #federationStopped(Federation)} is invoked. */
        public final AtomicBoolean stopped = new AtomicBoolean();

        /** Last value seen by the default {@link #beforeCreateConsumer} handler. */
        public final AtomicReference<FederationConsumerInfo> beforeCreateConsumerCapture = new AtomicReference<>();

        /** Last value seen by the default {@link #afterCreateConsumer} handler. */
        public final AtomicReference<FederationConsumer> afterCreateConsumerCapture = new AtomicReference<>();

        /** Last value seen by the default {@link #beforeCloseConsumer} handler. */
        public final AtomicReference<FederationConsumer> beforeCloseConsumerCapture = new AtomicReference<>();

        /** Last value seen by the default {@link #afterCloseConsumer} handler. */
        public final AtomicReference<FederationConsumer> afterCloseConsumerCapture = new AtomicReference<>();

        // Replaceable handlers; the defaults store their argument in the matching capture holder.
        public Consumer<FederationConsumerInfo> beforeCreateConsumer = beforeCreateConsumerCapture::set;
        public Consumer<FederationConsumer> afterCreateConsumer = afterCreateConsumerCapture::set;
        public Consumer<FederationConsumer> beforeCloseConsumer = beforeCloseConsumerCapture::set;
        public Consumer<FederationConsumer> afterCloseConsumer = afterCloseConsumerCapture::set;

        // Replaceable message hooks; the defaults intentionally do nothing.
        public BiConsumer<FederationConsumer, Message> beforeMessageHandled = (federationConsumer, message) -> { };
        public BiConsumer<FederationConsumer, Message> afterMessageHandled = (federationConsumer, message) -> { };

        // Replaceable decisions; the defaults always allow consumer creation.
        public Function<AddressInfo, Boolean> shouldCreateConsumerForAddress = addressInfo -> true;
        public Function<Queue, Boolean> shouldCreateConsumerForQueue = queue -> true;
        public BiFunction<Divert, Queue, Boolean> shouldCreateConsumerForDivert = (divert, queue) -> true;

        private AMQPTestFederationBrokerPlugin() {
        }

        /** Records that federation was started. */
        @Override
        public void federationStarted(Federation federation) throws ActiveMQException {
            started.set(true);
        }

        /** Records that federation was stopped. */
        @Override
        public void federationStopped(Federation federation) throws ActiveMQException {
            stopped.set(true);
        }

        /** Forwards to {@link #beforeCreateConsumer}. */
        @Override
        public void beforeCreateFederationConsumer(FederationConsumerInfo federationConsumerInfo) throws ActiveMQException {
            beforeCreateConsumer.accept(federationConsumerInfo);
        }

        /** Forwards to {@link #afterCreateConsumer}. */
        @Override
        public void afterCreateFederationConsumer(FederationConsumer federationConsumer) throws ActiveMQException {
            afterCreateConsumer.accept(federationConsumer);
        }

        /** Forwards to {@link #beforeCloseConsumer}. */
        @Override
        public void beforeCloseFederationConsumer(FederationConsumer federationConsumer) throws ActiveMQException {
            beforeCloseConsumer.accept(federationConsumer);
        }

        /** Forwards to {@link #afterCloseConsumer}. */
        @Override
        public void afterCloseFederationConsumer(FederationConsumer federationConsumer) throws ActiveMQException {
            afterCloseConsumer.accept(federationConsumer);
        }

        /** Forwards to {@link #beforeMessageHandled}. */
        @Override
        public void beforeFederationConsumerMessageHandled(FederationConsumer federationConsumer, Message message) throws ActiveMQException {
            beforeMessageHandled.accept(federationConsumer, message);
        }

        /** Forwards to {@link #afterMessageHandled}. */
        @Override
        public void afterFederationConsumerMessageHandled(FederationConsumer federationConsumer, Message message) throws ActiveMQException {
            afterMessageHandled.accept(federationConsumer, message);
        }

        /** Returns the (unboxed) answer of {@link #shouldCreateConsumerForAddress}. */
        @Override
        public boolean shouldCreateFederationConsumerForAddress(AddressInfo addressInfo) throws ActiveMQException {
            return shouldCreateConsumerForAddress.apply(addressInfo);
        }

        /** Returns the (unboxed) answer of {@link #shouldCreateConsumerForQueue}. */
        @Override
        public boolean shouldCreateFederationConsumerForQueue(Queue queue) throws ActiveMQException {
            return shouldCreateConsumerForQueue.apply(queue);
        }

        /** Returns the (unboxed) answer of {@link #shouldCreateConsumerForDivert}. */
        @Override
        public boolean shouldCreateFederationConsumerForDivert(Divert divert, Queue queue) throws ActiveMQException {
            return shouldCreateConsumerForDivert.apply(divert, queue);
        }
    }

    /**
     * Supplies the protocol list used for the brokers in this test.
     *
     * @return the comma separated protocol names {@code AMQP} and {@code CORE}
     */
    @Override
    protected String getConfiguredProtocols() {
        return "AMQP,CORE";
    }

    /**
     * Creates the two brokers used by every test: the remote one (kept in {@link #remoteServer})
     * on {@code SERVER_PORT_REMOTE} and the local one, which is returned, on {@code SERVER_PORT}.
     *
     * @return the local broker instance
     * @throws Exception if either broker cannot be created
     */
    @Override
    protected ActiveMQServer createServer() throws Exception {
        // The remote broker is built first, exactly as before. The 'false' flag presumably
        // defers start-up (each test calls start() itself) - TODO confirm against the base class.
        final ActiveMQServer remote = createServer(SERVER_PORT_REMOTE, false);
        this.remoteServer = remote;

        final ActiveMQServer local = createServer(SERVER_PORT, false);
        return local;
    }

    /**
     * Runs the inherited tear-down and then stops the remote broker on a best-effort basis.
     *
     * <p>The remote broker is stopped in a {@code finally} block so that it is shut down even
     * when the inherited tear-down fails; otherwise it could keep running into later tests.
     *
     * @throws Exception if the inherited tear-down fails
     */
    @Override
    @AfterEach
    public void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            if (this.remoteServer != null) {
                try {
                    this.remoteServer.stop();
                } catch (Exception e) {
                    // Deliberately not rethrown: a failed stop of the remote broker must not mask
                    // the test outcome, but it is logged so the failure is not lost.
                    logger.warn("Ignoring failure while stopping the remote server during tearDown", e);
                }
            }
        }
    }

    /**
     * Checks that the plugin callbacks fire for address federation: federation started, consumer
     * create (before/after), message handled (before/after, same message instance), consumer
     * close (before/after) and finally federation stopped once the local broker is stopped.
     *
     * @throws Exception on any broker or client failure
     */
    @Timeout(20)
    @Test
    public void testFederationBrokerPluginWithAddressPolicyConfigured() throws Exception {
        logger.info("Test started: {}", getTestName());

        final AMQPFederationAddressPolicyElement localAddressPolicy = new AMQPFederationAddressPolicyElement();
        localAddressPolicy.setName("test-policy");
        localAddressPolicy.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
        localAddressPolicy.setAutoDelete(false);
        localAddressPolicy.setAutoDeleteDelay(-1L);
        localAddressPolicy.setAutoDeleteMessageCount(-1L);

        final AMQPFederatedBrokerConnectionElement federationElement = new AMQPFederatedBrokerConnectionElement();
        federationElement.setName(getTestName());
        federationElement.addLocalAddressPolicy(localAddressPolicy);

        final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:5673");
        amqpConnection.setReconnectAttempts(10);
        amqpConnection.addElement(federationElement);

        final AMQPTestFederationBrokerPlugin plugin = new AMQPTestFederationBrokerPlugin();

        this.server.getConfiguration().addAMQPConnection(amqpConnection);
        this.remoteServer.start();
        this.server.registerBrokerPlugin(plugin);
        this.server.start();

        Wait.assertTrue(() -> plugin.started.get());

        final ConnectionFactory localFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
        final ConnectionFactory remoteFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5673");

        // Resources are closed in reverse order (remote first, then local) before the broker stops.
        try (Connection localConnection = localFactory.createConnection();
             Connection remoteConnection = remoteFactory.createConnection()) {

            final Session localSession = localConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final Session remoteSession = remoteConnection.createSession(Session.AUTO_ACKNOWLEDGE);

            final Topic topic = localSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME);
            final MessageConsumer localConsumer = localSession.createConsumer(topic);

            localConnection.start();
            remoteConnection.start();

            // Local demand should make the address appear on both brokers.
            Wait.assertTrue(() -> this.server.addressQuery(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME)).isExists());
            Wait.assertTrue(() -> this.remoteServer.addressQuery(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME)).isExists());
            Wait.assertTrue(() -> plugin.beforeCreateConsumerCapture.get() != null);
            Wait.assertTrue(() -> plugin.afterCreateConsumerCapture.get() != null);

            final MessageProducer remoteProducer = remoteSession.createProducer(topic);
            final TextMessage textMessage = remoteSession.createTextMessage("Hello World");

            final AtomicReference<Message> beforeCapture = new AtomicReference<>();
            final AtomicReference<Message> afterCapture = new AtomicReference<>();
            plugin.beforeMessageHandled = (fedConsumer, handled) -> beforeCapture.set(handled);
            plugin.afterMessageHandled = (fedConsumer, handled) -> afterCapture.set(handled);

            remoteProducer.send(textMessage);

            Wait.assertTrue(() -> beforeCapture.get() != null);
            Wait.assertTrue(() -> afterCapture.get() != null);
            Assertions.assertSame(beforeCapture.get(), afterCapture.get());

            final jakarta.jms.Message received = localConsumer.receive(5000L);

            // Removing the local demand should trigger the consumer close hooks.
            localConsumer.close();

            Wait.assertTrue(() -> plugin.beforeCloseConsumerCapture.get() != null);
            Wait.assertTrue(() -> plugin.afterCloseConsumerCapture.get() != null);

            Assertions.assertNotNull(received);
        }

        this.server.stop();

        Wait.assertTrue(() -> plugin.stopped.get());
    }

    /**
     * Checks that the plugin callbacks fire for queue federation: federation started, consumer
     * create (before/after), message handled (before/after, same message instance), consumer
     * close (before/after) and finally federation stopped once the local broker is stopped.
     *
     * @throws Exception on any broker or client failure
     */
    @Timeout(20)
    @Test
    public void testFederationBrokerPluginWithQueuePolicyConfigured() throws Exception {
        logger.info("Test started: {}", getTestName());

        final AMQPFederationQueuePolicyElement localQueuePolicy = new AMQPFederationQueuePolicyElement();
        localQueuePolicy.setName("test-policy");
        localQueuePolicy.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME, AutoCreateJmsDestinationTest.QUEUE_NAME);

        final AMQPFederatedBrokerConnectionElement federationElement = new AMQPFederatedBrokerConnectionElement();
        federationElement.setName(getTestName());
        federationElement.addLocalQueuePolicy(localQueuePolicy);

        final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:5673");
        amqpConnection.setReconnectAttempts(10);
        amqpConnection.addElement(federationElement);

        final AMQPTestFederationBrokerPlugin plugin = new AMQPTestFederationBrokerPlugin();

        this.server.getConfiguration().addAMQPConnection(amqpConnection);
        this.remoteServer.start();
        // The anycast queue is created explicitly on the remote broker before the local one starts.
        this.remoteServer.createQueue(QueueConfiguration.of(AutoCreateJmsDestinationTest.QUEUE_NAME)
                                                        .setRoutingType(RoutingType.ANYCAST)
                                                        .setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME)
                                                        .setAutoCreated(false));
        this.server.registerBrokerPlugin(plugin);
        this.server.start();

        Wait.assertTrue(() -> plugin.started.get());

        final ConnectionFactory localFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
        final ConnectionFactory remoteFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5673");

        // Resources are closed in reverse order (remote first, then local) before the broker stops.
        try (Connection localConnection = localFactory.createConnection();
             Connection remoteConnection = remoteFactory.createConnection()) {

            final Session localSession = localConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final Session remoteSession = remoteConnection.createSession(Session.AUTO_ACKNOWLEDGE);

            final jakarta.jms.Queue jmsQueue = localSession.createQueue(AutoCreateJmsDestinationTest.QUEUE_NAME);
            final MessageConsumer localConsumer = localSession.createConsumer(jmsQueue);

            localConnection.start();
            remoteConnection.start();

            Wait.assertTrue(() -> this.server.queueQuery(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME)).isExists());
            Wait.assertTrue(() -> plugin.beforeCreateConsumerCapture.get() != null);
            Wait.assertTrue(() -> plugin.afterCreateConsumerCapture.get() != null);

            final MessageProducer remoteProducer = remoteSession.createProducer(jmsQueue);
            final TextMessage textMessage = remoteSession.createTextMessage("Hello World");

            final AtomicReference<Message> beforeCapture = new AtomicReference<>();
            final AtomicReference<Message> afterCapture = new AtomicReference<>();
            plugin.beforeMessageHandled = (fedConsumer, handled) -> beforeCapture.set(handled);
            plugin.afterMessageHandled = (fedConsumer, handled) -> afterCapture.set(handled);

            remoteProducer.send(textMessage);

            Wait.assertTrue(() -> beforeCapture.get() != null);
            Wait.assertTrue(() -> afterCapture.get() != null);
            Assertions.assertSame(beforeCapture.get(), afterCapture.get());

            final jakarta.jms.Message received = localConsumer.receive(5000L);

            // Removing the local demand should trigger the consumer close hooks.
            localConsumer.close();

            Wait.assertTrue(() -> plugin.beforeCloseConsumerCapture.get() != null);
            Wait.assertTrue(() -> plugin.afterCloseConsumerCapture.get() != null);

            Assertions.assertNotNull(received);
        }

        this.server.stop();

        Wait.assertTrue(() -> plugin.stopped.get());
    }

    /**
     * Checks that a plugin answering {@code false} from the address decision hook prevents
     * messages sent to the remote broker from reaching a local consumer on a federated address.
     *
     * @throws Exception on any broker or client failure
     */
    @Timeout(20)
    @Test
    public void testPluginCanBlockAddressFederationConsumerCreate() throws Exception {
        logger.info("Test started: {}", getTestName());

        final AMQPFederationAddressPolicyElement localAddressPolicy = new AMQPFederationAddressPolicyElement();
        localAddressPolicy.setName("test-policy");
        // The policy must include the address the consumer below attaches to; with a
        // non-matching include the policy would never apply to that address and the test would
        // pass without the plugin decision playing any part.
        localAddressPolicy.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME);
        localAddressPolicy.setAutoDelete(false);
        localAddressPolicy.setAutoDeleteDelay(-1L);
        localAddressPolicy.setAutoDeleteMessageCount(-1L);

        final AMQPFederatedBrokerConnectionElement federationElement = new AMQPFederatedBrokerConnectionElement();
        federationElement.setName(getTestName());
        federationElement.addLocalAddressPolicy(localAddressPolicy);

        final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:5673");
        amqpConnection.setReconnectAttempts(10);
        amqpConnection.addElement(federationElement);

        final AMQPTestFederationBrokerPlugin plugin = new AMQPTestFederationBrokerPlugin();
        // Veto every address federation consumer.
        plugin.shouldCreateConsumerForAddress = addressInfo -> false;

        this.server.getConfiguration().addAMQPConnection(amqpConnection);
        this.remoteServer.start();
        this.server.registerBrokerPlugin(plugin);
        this.server.start();

        Wait.assertTrue(() -> plugin.started.get());

        final ConnectionFactory localFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
        final ConnectionFactory remoteFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5673");

        // Resources are closed in reverse order (remote first, then local) before the broker stops.
        try (Connection localConnection = localFactory.createConnection();
             Connection remoteConnection = remoteFactory.createConnection()) {

            final Session localSession = localConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final Session remoteSession = remoteConnection.createSession(Session.AUTO_ACKNOWLEDGE);

            final Topic topic = localSession.createTopic(AutoCreateJmsDestinationTest.QUEUE_NAME);
            final MessageConsumer localConsumer = localSession.createConsumer(topic);

            localConnection.start();
            remoteConnection.start();

            Wait.assertTrue(() -> this.server.addressQuery(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME)).isExists());

            final MessageProducer remoteProducer = remoteSession.createProducer(topic);
            final TextMessage textMessage = remoteSession.createTextMessage("Hello World");

            final AtomicReference<Message> handledCapture = new AtomicReference<>();
            plugin.beforeMessageHandled = (fedConsumer, handled) -> handledCapture.set(handled);

            remoteProducer.send(textMessage);

            // Nothing should have crossed over and no message hook should have fired.
            Assertions.assertNull(localConsumer.receiveNoWait());
            Assertions.assertNull(handledCapture.get());

            localConsumer.close();
        }

        this.server.stop();

        Wait.assertTrue(() -> plugin.stopped.get());
    }

    /**
     * Checks that a plugin answering {@code false} from the queue decision hook prevents
     * messages sent to the remote queue from reaching a local consumer on the federated queue.
     *
     * @throws Exception on any broker or client failure
     */
    @Timeout(20)
    @Test
    public void testPluginCanBlockQueueFederationConsumerCreate() throws Exception {
        logger.info("Test started: {}", getTestName());

        final AMQPFederationQueuePolicyElement localQueuePolicy = new AMQPFederationQueuePolicyElement();
        localQueuePolicy.setName("test-policy");
        localQueuePolicy.addToIncludes(AutoCreateJmsDestinationTest.QUEUE_NAME, AutoCreateJmsDestinationTest.QUEUE_NAME);

        final AMQPFederatedBrokerConnectionElement federationElement = new AMQPFederatedBrokerConnectionElement();
        federationElement.setName(getTestName());
        federationElement.addLocalQueuePolicy(localQueuePolicy);

        final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:5673");
        amqpConnection.setReconnectAttempts(10);
        amqpConnection.addElement(federationElement);

        final AMQPTestFederationBrokerPlugin plugin = new AMQPTestFederationBrokerPlugin();
        // Veto every queue federation consumer.
        plugin.shouldCreateConsumerForQueue = candidate -> false;

        this.server.getConfiguration().addAMQPConnection(amqpConnection);
        this.remoteServer.start();
        // The anycast queue is created explicitly on the remote broker before the local one starts.
        this.remoteServer.createQueue(QueueConfiguration.of(AutoCreateJmsDestinationTest.QUEUE_NAME)
                                                        .setRoutingType(RoutingType.ANYCAST)
                                                        .setAddress(AutoCreateJmsDestinationTest.QUEUE_NAME)
                                                        .setAutoCreated(false));
        this.server.registerBrokerPlugin(plugin);
        this.server.start();

        Wait.assertTrue(() -> plugin.started.get());

        final ConnectionFactory localFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
        final ConnectionFactory remoteFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5673");

        // Resources are closed in reverse order (remote first, then local) before the broker stops.
        try (Connection localConnection = localFactory.createConnection();
             Connection remoteConnection = remoteFactory.createConnection()) {

            final Session localSession = localConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final Session remoteSession = remoteConnection.createSession(Session.AUTO_ACKNOWLEDGE);

            final jakarta.jms.Queue jmsQueue = localSession.createQueue(AutoCreateJmsDestinationTest.QUEUE_NAME);
            final MessageConsumer localConsumer = localSession.createConsumer(jmsQueue);

            localConnection.start();
            remoteConnection.start();

            Wait.assertTrue(() -> this.server.queueQuery(SimpleString.of(AutoCreateJmsDestinationTest.QUEUE_NAME)).isExists());

            final MessageProducer remoteProducer = remoteSession.createProducer(jmsQueue);
            final TextMessage textMessage = remoteSession.createTextMessage("Hello World");

            final AtomicReference<Message> handledCapture = new AtomicReference<>();
            plugin.beforeMessageHandled = (fedConsumer, handled) -> handledCapture.set(handled);

            remoteProducer.send(textMessage);

            // Nothing should have crossed over and no message hook should have fired.
            Assertions.assertNull(localConsumer.receiveNoWait());
            Assertions.assertNull(handledCapture.get());

            localConsumer.close();
        }

        this.server.stop();

        Wait.assertTrue(() -> plugin.stopped.get());
    }

    /**
     * Checks that a plugin answering {@code false} from the divert decision hook prevents
     * messages sent to the remote {@code source} address from reaching a local consumer that is
     * attached to {@code target}, the forwarding address of a divert deployed on {@code source}.
     *
     * @throws Exception on any broker or client failure
     */
    @Timeout(20)
    @Test
    public void testPluginCanBlockAddressFederationWhenDemandOnDivertIsAdded() throws Exception {
        logger.info("Test started: {}", getTestName());

        final AMQPFederationAddressPolicyElement localAddressPolicy = new AMQPFederationAddressPolicyElement();
        localAddressPolicy.setName("test-policy");
        localAddressPolicy.addToIncludes("source");
        localAddressPolicy.setAutoDelete(false);
        localAddressPolicy.setAutoDeleteDelay(-1L);
        localAddressPolicy.setAutoDeleteMessageCount(-1L);
        localAddressPolicy.setEnableDivertBindings(true);

        final AMQPFederatedBrokerConnectionElement federationElement = new AMQPFederatedBrokerConnectionElement();
        federationElement.setName(getTestName());
        federationElement.addLocalAddressPolicy(localAddressPolicy);

        final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:5673");
        amqpConnection.setReconnectAttempts(10);
        amqpConnection.addElement(federationElement);

        final DivertConfiguration divertConfiguration = new DivertConfiguration();
        divertConfiguration.setName("test-divert");
        divertConfiguration.setAddress("source");
        divertConfiguration.setForwardingAddress("target");
        divertConfiguration.setRoutingType(ComponentConfigurationRoutingType.MULTICAST);

        final AMQPTestFederationBrokerPlugin plugin = new AMQPTestFederationBrokerPlugin();
        // Veto federation consumers that would be created because of demand on a divert.
        plugin.shouldCreateConsumerForDivert = (candidateDivert, candidateQueue) -> false;

        this.server.getConfiguration().addAMQPConnection(amqpConnection);
        this.remoteServer.start();
        this.server.registerBrokerPlugin(plugin);
        this.server.start();
        // The divert and its source address are added only after the local broker is running.
        this.server.deployDivert(divertConfiguration);
        this.server.addAddressInfo(new AddressInfo(SimpleString.of("source"), RoutingType.MULTICAST));

        final ConnectionFactory localFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
        final ConnectionFactory remoteFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5673");

        // Resources are closed in reverse order (remote first, then local) before the broker stops.
        try (Connection localConnection = localFactory.createConnection();
             Connection remoteConnection = remoteFactory.createConnection()) {

            final Session localSession = localConnection.createSession(Session.AUTO_ACKNOWLEDGE);
            final Session remoteSession = remoteConnection.createSession(Session.AUTO_ACKNOWLEDGE);

            final Topic targetTopic = localSession.createTopic("target");
            final Topic sourceTopic = localSession.createTopic("source");

            // Demand is placed on the divert's forwarding address, not on the federated address.
            final MessageConsumer localConsumer = localSession.createConsumer(targetTopic);

            localConnection.start();
            remoteConnection.start();

            final MessageProducer remoteProducer = remoteSession.createProducer(sourceTopic);
            final TextMessage textMessage = remoteSession.createTextMessage("Hello World");

            final AtomicReference<Message> handledCapture = new AtomicReference<>();
            plugin.beforeMessageHandled = (fedConsumer, handled) -> handledCapture.set(handled);

            remoteProducer.send(textMessage);

            // Nothing should have crossed over and no message hook should have fired.
            Assertions.assertNull(localConsumer.receiveNoWait());
            Assertions.assertNull(handledCapture.get());
        }

        this.server.stop();

        Wait.assertTrue(() -> plugin.stopped.get());
    }
}
