package org.apache.activemq.artemis.tests.integration.federation;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.Collections;
import java.util.Objects;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationQueuePolicyConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationUpstreamConfiguration;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest.class */
public class FederatedQueueTest extends FederatedTestBase {

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/federation/FederatedQueueTest$TestTransformer.class */
    public static class TestTransformer implements Transformer {
        static String TEST_PROPERTY = "transformed";

        public Message transform(Message message) {
            message.putBooleanProperty(TEST_PROPERTY, true);
            return message;
        }
    }

    @Override // org.apache.activemq.artemis.tests.integration.federation.FederatedTestBase, org.apache.activemq.artemis.tests.util.ActiveMQTestBase
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
    }

    @Override // org.apache.activemq.artemis.tests.integration.federation.FederatedTestBase
    protected void configureQueues(ActiveMQServer activeMQServer) throws Exception {
        activeMQServer.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false).setDefaultConsumerWindowSize(-1));
        createSimpleQueue(activeMQServer, getName());
    }

    protected ConnectionFactory getCF(int i) throws Exception {
        return new ActiveMQConnectionFactory("vm://" + i);
    }

    @Test
    public void testFederatedQueueRemoteConsumeUpstream() throws Exception {
        String name = getName();
        getServer(0).getConfiguration().getFederationConfigurations().add(FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", name));
        getServer(0).getFederationManager().deploy();
        testFederatedQueueRemoteConsume(name);
    }

    @Test
    public void testMultipleFederatedQueueRemoteConsumersUpstream() throws Exception {
        ((AddressSettings) getServer(0).getAddressSettingsRepository().getMatch("#")).setAutoCreateAddresses(true).setAutoCreateQueues(true);
        ((AddressSettings) getServer(1).getAddressSettingsRepository().getMatch("#")).setAutoCreateAddresses(true).setAutoCreateQueues(true);
        getServer(1).createQueue(QueueConfiguration.of("Test.Q.1").setRoutingType(RoutingType.ANYCAST));
        getServer(1).createQueue(QueueConfiguration.of("Test.Q.2").setRoutingType(RoutingType.ANYCAST));
        getServer(0).getConfiguration().getFederationConfigurations().add(new FederationConfiguration().setName("default").addFederationPolicy(new FederationQueuePolicyConfiguration().setName("myQueuePolicy").addInclude(new FederationQueuePolicyConfiguration.Matcher().setQueueMatch("#").setAddressMatch("Test.#"))).addUpstreamConfiguration(new FederationUpstreamConfiguration().setName("server1-upstream").addPolicyRef("myQueuePolicy").setStaticConnectors(Collections.singletonList("server1"))));
        getServer(0).getFederationManager().deploy();
        ConnectionFactory cf = getCF(0);
        ConnectionFactory cf2 = getCF(0);
        ConnectionFactory cf3 = getCF(1);
        Connection createConnection = cf.createConnection();
        try {
            Connection createConnection2 = cf2.createConnection();
            try {
                Connection createConnection3 = cf3.createConnection();
                try {
                    createConnection.start();
                    Session createSession = createConnection.createSession();
                    createSession.createConsumer(createSession.createQueue("Test.Q.1"));
                    createConnection2.start();
                    Session createSession2 = createConnection2.createSession();
                    Queue createQueue = createSession2.createQueue("Test.Q.2");
                    MessageConsumer createConsumer = createSession2.createConsumer(createQueue);
                    Session createSession3 = createConnection3.createSession();
                    MessageProducer createProducer = createSession3.createProducer(createQueue);
                    createProducer.send(createSession3.createTextMessage("hello"));
                    Assertions.assertNotNull(createConsumer.receive(1000L));
                    createConnection.close();
                    createProducer.send(createSession3.createTextMessage("hello"));
                    Assertions.assertNotNull(createConsumer.receive(1000L));
                    if (createConnection3 != null) {
                        createConnection3.close();
                    }
                    if (createConnection2 != null) {
                        createConnection2.close();
                    }
                    if (createConnection != null) {
                        createConnection.close();
                    }
                } catch (Throwable th) {
                    if (createConnection3 != null) {
                        try {
                            createConnection3.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } catch (Throwable th3) {
                if (createConnection2 != null) {
                    try {
                        createConnection2.close();
                    } catch (Throwable th4) {
                        th3.addSuppressed(th4);
                    }
                }
                throw th3;
            }
        } catch (Throwable th5) {
            if (createConnection != null) {
                try {
                    createConnection.close();
                } catch (Throwable th6) {
                    th5.addSuppressed(th6);
                }
            }
            throw th5;
        }
    }

    @Test
    public void testFederatedQueueRemoteConsumeUpstreamPriorityAdjustment() throws Exception {
        String name = getName();
        FederationConfiguration createQueueUpstreamFederationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", name);
        ((FederationQueuePolicyConfiguration) createQueueUpstreamFederationConfiguration.getFederationPolicyMap().get("QueuePolicy" + name)).setPriorityAdjustment(1);
        getServer(0).getConfiguration().getFederationConfigurations().add(createQueueUpstreamFederationConfiguration);
        getServer(0).getFederationManager().deploy();
        testFederatedQueueRemoteConsumeUpstreamPriorityAdjustment(name);
    }

    @Test
    public void testFederatedQueueRemoteConsumeDownstreamPriorityAdjustment() throws Exception {
        String name = getName();
        FederationConfiguration createQueueDownstreamFederationConfiguration = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server0", name, "server1");
        ((FederationQueuePolicyConfiguration) createQueueDownstreamFederationConfiguration.getFederationPolicyMap().get("QueuePolicy" + name)).setPriorityAdjustment(1);
        getServer(1).getConfiguration().getFederationConfigurations().add(createQueueDownstreamFederationConfiguration);
        getServer(1).getFederationManager().deploy();
        testFederatedQueueRemoteConsumeUpstreamPriorityAdjustment(name);
    }

    private void testFederatedQueueRemoteConsumeUpstreamPriorityAdjustment(String str) throws Exception {
        ConnectionFactory cf = getCF(1);
        ConnectionFactory cf2 = getCF(0);
        Connection createConnection = cf.createConnection();
        try {
            Connection createConnection2 = cf2.createConnection();
            try {
                createConnection2.start();
                createConnection.start();
                Session createSession = createConnection2.createSession();
                Session createSession2 = createConnection.createSession();
                Queue createQueue = createSession.createQueue(str);
                Queue createQueue2 = createSession2.createQueue(str);
                MessageConsumer createConsumer = createSession2.createConsumer(createQueue2);
                Wait.waitFor(() -> {
                    return getConsumerCount(getServer(1), str, 1);
                });
                MessageConsumer createConsumer2 = createSession.createConsumer(createQueue);
                Wait.waitFor(() -> {
                    return getConsumerCount(getServer(1), str, 2);
                });
                createSession2.createProducer(createQueue2).send(createSession2.createTextMessage("hello"));
                Assertions.assertNull(createConsumer.receiveNoWait());
                Assertions.assertNotNull(createConsumer2.receive(1000L));
                createConsumer2.close();
                createConsumer.close();
                if (createConnection2 != null) {
                    createConnection2.close();
                }
                if (createConnection != null) {
                    createConnection.close();
                }
            } catch (Throwable th) {
                if (createConnection2 != null) {
                    try {
                        createConnection2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (createConnection != null) {
                try {
                    createConnection.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    private void verifyTransformer(String str) throws Exception {
        ConnectionFactory cf = getCF(1);
        ConnectionFactory cf2 = getCF(0);
        Connection createConnection = cf.createConnection();
        try {
            Connection createConnection2 = cf2.createConnection();
            try {
                createConnection.start();
                Session createSession = createConnection.createSession();
                createSession.createProducer(createSession.createQueue(str)).send(createSession.createTextMessage("hello"));
                createConnection2.start();
                Session createSession2 = createConnection2.createSession();
                jakarta.jms.Message receive = createSession2.createConsumer(createSession2.createQueue(str)).receive(1000L);
                Assertions.assertNotNull(receive);
                Assertions.assertEquals(Boolean.valueOf(receive.getBooleanProperty(TestTransformer.TEST_PROPERTY)), true);
                if (createConnection2 != null) {
                    createConnection2.close();
                }
                if (createConnection != null) {
                    createConnection.close();
                }
            } catch (Throwable th) {
                if (createConnection2 != null) {
                    try {
                        createConnection2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (createConnection != null) {
                try {
                    createConnection.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void testFederatedQueueRemoteConsumeUpstreamTransformer() throws Exception {
        String name = getName();
        FederationConfiguration createQueueUpstreamFederationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", name);
        FederatedTestUtil.addQueueTransformerConfiguration(createQueueUpstreamFederationConfiguration, name);
        getServer(0).getConfiguration().getFederationConfigurations().add(createQueueUpstreamFederationConfiguration);
        getServer(0).getFederationManager().deploy();
        verifyTransformer(name);
    }

    @Test
    public void testFederatedQueueRemoteConsumeDownstream() throws Exception {
        String name = getName();
        getServer(1).getConfiguration().getFederationConfigurations().add(FederatedTestUtil.createQueueDownstreamFederationConfiguration("server0", name, "server1"));
        getServer(1).getFederationManager().deploy();
        testFederatedQueueRemoteConsume(name);
    }

    @Test
    public void testFederatedQueueRemoteConsumeDownstreamTransformer() throws Exception {
        String name = getName();
        FederationConfiguration createQueueDownstreamFederationConfiguration = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server0", name, "server1");
        FederatedTestUtil.addQueueTransformerConfiguration(createQueueDownstreamFederationConfiguration, name);
        getServer(1).getConfiguration().getFederationConfigurations().add(createQueueDownstreamFederationConfiguration);
        getServer(1).getFederationManager().deploy();
        verifyTransformer(name);
    }

    private void testFederatedQueueRemoteConsume(String str) throws Exception {
        ConnectionFactory cf = getCF(1);
        ConnectionFactory cf2 = getCF(0);
        Connection createConnection = cf.createConnection();
        try {
            Connection createConnection2 = cf2.createConnection();
            try {
                createConnection.start();
                Session createSession = createConnection.createSession();
                Queue createQueue = createSession.createQueue(str);
                MessageProducer createProducer = createSession.createProducer(createQueue);
                createProducer.send(createSession.createTextMessage("hello"));
                createConnection2.start();
                Session createSession2 = createConnection2.createSession();
                MessageConsumer createConsumer = createSession2.createConsumer(createSession2.createQueue(str));
                Assertions.assertNotNull(createConsumer.receive(1000L));
                createProducer.send(createSession.createTextMessage("hello"));
                Assertions.assertNotNull(createConsumer.receive(1000L));
                MessageConsumer createConsumer2 = createSession.createConsumer(createQueue);
                createProducer.send(createSession.createTextMessage("hello"));
                Assertions.assertNotNull(createConsumer2.receive(1000L));
                Assertions.assertNull(createConsumer.receiveNoWait());
                createConsumer2.close();
                createProducer.send(createSession.createTextMessage("hello"));
                Assertions.assertNotNull(createConsumer.receive(1000L));
                createProducer.send(createTextMessage(createSession, "groupA"));
                Assertions.assertNotNull(createConsumer.receive(1000L));
                MessageConsumer createConsumer3 = createSession.createConsumer(createQueue);
                createProducer.send(createTextMessage(createSession, "groupA"));
                Assertions.assertNull(createConsumer3.receiveNoWait());
                Assertions.assertNotNull(createConsumer.receive(1000L));
                if (createConnection2 != null) {
                    createConnection2.close();
                }
                if (createConnection != null) {
                    createConnection.close();
                }
            } catch (Throwable th) {
                if (createConnection2 != null) {
                    try {
                        createConnection2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (createConnection != null) {
                try {
                    createConnection.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void testWithLargeMessage() throws Exception {
        internalTestWithLargeMessages(1);
    }

    @Test
    public void testWithMultipleLargeMessages() throws Exception {
        internalTestWithLargeMessages(5);
    }

    private void internalTestWithLargeMessages(int i) throws Exception {
        String name = getName();
        getServer(0).getConfiguration().getFederationConfigurations().add(FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", name));
        getServer(0).getFederationManager().deploy();
        ConnectionFactory cf = getCF(1);
        ConnectionFactory cf2 = getCF(0);
        String replace = new String(new byte[1048576]).replace((char) 0, '+');
        Connection createConnection = cf.createConnection();
        try {
            Connection createConnection2 = cf2.createConnection();
            try {
                createConnection.start();
                Session createSession = createConnection.createSession();
                MessageProducer createProducer = createSession.createProducer(createSession.createQueue(name));
                for (int i2 = 0; i2 < i; i2++) {
                    createProducer.send(createSession.createTextMessage(replace));
                }
                createConnection2.start();
                Session createSession2 = createConnection2.createSession();
                MessageConsumer createConsumer = createSession2.createConsumer(createSession2.createQueue(name));
                for (int i3 = 0; i3 < i; i3++) {
                    Assertions.assertNotNull(createConsumer.receive(1000L));
                }
                if (createConnection2 != null) {
                    createConnection2.close();
                }
                if (createConnection != null) {
                    createConnection.close();
                }
            } catch (Throwable th) {
                if (createConnection2 != null) {
                    try {
                        createConnection2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (createConnection != null) {
                try {
                    createConnection.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void testFederatedQueueRemoteConsumeDeployAfterConsumersExist() throws Exception {
        String name = getName();
        ConnectionFactory cf = getCF(0);
        ConnectionFactory cf2 = getCF(1);
        Connection createConnection = cf.createConnection();
        try {
            Connection createConnection2 = cf2.createConnection();
            try {
                createConnection2.start();
                Session createSession = createConnection2.createSession();
                MessageProducer createProducer = createSession.createProducer(createSession.createQueue(name));
                createProducer.send(createSession.createTextMessage("hello"));
                createConnection.start();
                Session createSession2 = createConnection.createSession();
                MessageConsumer createConsumer = createSession2.createConsumer(createSession2.createQueue(name));
                Assertions.assertNull(createConsumer.receiveNoWait());
                getServer(0).getConfiguration().getFederationConfigurations().add(FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", name));
                getServer(0).getFederationManager().deploy();
                createProducer.send(createSession.createTextMessage("hello"));
                Assertions.assertNotNull(createConsumer.receive(10000L));
                if (createConnection2 != null) {
                    createConnection2.close();
                }
                if (createConnection != null) {
                    createConnection.close();
                }
            } catch (Throwable th) {
                if (createConnection2 != null) {
                    try {
                        createConnection2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (createConnection != null) {
                try {
                    createConnection.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void testFederatedQueueBiDirectionalUpstream() throws Exception {
        String name = getName();
        getServer(0).getConfiguration().getFederationConfigurations().add(FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", name));
        getServer(0).getFederationManager().deploy();
        getServer(1).getConfiguration().getFederationConfigurations().add(FederatedTestUtil.createQueueUpstreamFederationConfiguration("server0", name));
        getServer(1).getFederationManager().deploy();
        testFederatedQueueBiDirectional(name, false);
    }

    @Test
    public void testFederatedQueueBiDirectionalDownstream() throws Exception {
        String name = getName();
        getServer(0).getConfiguration().getFederationConfigurations().add(FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1", name, "server0"));
        getServer(0).getFederationManager().deploy();
        getServer(1).getConfiguration().getFederationConfigurations().add(FederatedTestUtil.createQueueDownstreamFederationConfiguration("server0", name, "server1"));
        getServer(1).getFederationManager().deploy();
        testFederatedQueueBiDirectional(name, false);
    }

    @Test
    public void testFederatedQueueBiDirectionalDownstreamUpstream() throws Exception {
        String name = getName();
        FederationConfiguration createQueueDownstreamFederationConfiguration = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream", "server1", name, null, false, "server0");
        createQueueDownstreamFederationConfiguration.addUpstreamConfiguration(FederatedTestUtil.createQueueFederationUpstream("server1", name));
        getServer(0).getConfiguration().getFederationConfigurations().add(createQueueDownstreamFederationConfiguration);
        getServer(0).getFederationManager().deploy();
        testFederatedQueueBiDirectional(name, false);
    }

    @Test
    public void testFederatedQueueBiDirectionalDownstreamUpstreamSharedConnection() throws Exception {
        String name = getName();
        FederationConfiguration createQueueDownstreamFederationConfiguration = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream", "server1", name, null, true, "server0");
        FederationUpstreamConfiguration createQueueFederationUpstream = FederatedTestUtil.createQueueFederationUpstream("server1", name);
        createQueueFederationUpstream.getConnectionConfiguration().setShareConnection(true);
        createQueueDownstreamFederationConfiguration.addUpstreamConfiguration(createQueueFederationUpstream);
        getServer(0).getConfiguration().getFederationConfigurations().add(createQueueDownstreamFederationConfiguration);
        getServer(0).getFederationManager().deploy();
        testFederatedQueueBiDirectional(name, true);
    }

    @Test
    public void testFederatedQueueShareUpstreamConnectionFalse() throws Exception {
        String name = getName();
        FederationConfiguration createQueueDownstreamFederationConfiguration = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream", "server1", name, null, false, "server0");
        createQueueDownstreamFederationConfiguration.addUpstreamConfiguration(FederatedTestUtil.createQueueFederationUpstream("server1", name));
        getServer(0).getConfiguration().getFederationConfigurations().add(createQueueDownstreamFederationConfiguration);
        getServer(0).getFederationManager().deploy();
        testFederatedQueueShareUpstreamConnection(name, 2, 3);
    }

    @Test
    public void testFederatedQueueShareUpstreamConnectionTrue() throws Exception {
        String name = getName();
        FederationConfiguration createQueueDownstreamFederationConfiguration = FederatedTestUtil.createQueueDownstreamFederationConfiguration("server1-downstream", "server1", name, null, true, "server0");
        FederationUpstreamConfiguration createQueueFederationUpstream = FederatedTestUtil.createQueueFederationUpstream("server1", name);
        createQueueFederationUpstream.getConnectionConfiguration().setShareConnection(true);
        createQueueDownstreamFederationConfiguration.addUpstreamConfiguration(createQueueFederationUpstream);
        getServer(0).getConfiguration().getFederationConfigurations().add(createQueueDownstreamFederationConfiguration);
        getServer(0).getFederationManager().deploy();
        testFederatedQueueShareUpstreamConnection(name, 2, 2);
    }

    private void testFederatedQueueShareUpstreamConnection(String str, int i, int i2) throws Exception {
        ConnectionFactory cf = getCF(1);
        ConnectionFactory cf2 = getCF(0);
        Connection createConnection = cf.createConnection();
        try {
            Connection createConnection2 = cf2.createConnection();
            try {
                createConnection2.start();
                createConnection.start();
                Session createSession = createConnection2.createSession();
                Session createSession2 = createConnection.createSession();
                createSession.createConsumer(createSession.createQueue(str));
                createSession2.createConsumer(createSession2.createQueue(str));
                Assertions.assertTrue(Wait.waitFor(() -> {
                    return getServer(0).getConnectionCount() == i;
                }, 500L, 100L));
                Assertions.assertTrue(Wait.waitFor(() -> {
                    return getServer(1).getConnectionCount() == i2;
                }, 500L, 100L));
                Assertions.assertFalse(Wait.waitFor(() -> {
                    return getServer(0).getConnectionCount() > i;
                }, 500L, 100L));
                Assertions.assertFalse(Wait.waitFor(() -> {
                    return getServer(1).getConnectionCount() > i2;
                }, 500L, 100L));
                if (createConnection2 != null) {
                    createConnection2.close();
                }
                if (createConnection != null) {
                    createConnection.close();
                }
            } catch (Throwable th) {
                if (createConnection2 != null) {
                    try {
                        createConnection2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (createConnection != null) {
                try {
                    createConnection.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    private void testFederatedQueueBiDirectional(String str, boolean z) throws Exception {
        ConnectionFactory cf = getCF(1);
        ConnectionFactory cf2 = getCF(0);
        Connection createConnection = cf.createConnection();
        try {
            Connection createConnection2 = cf2.createConnection();
            try {
                createConnection2.start();
                Session createSession = createConnection2.createSession();
                Queue createQueue = createSession.createQueue(str);
                MessageProducer createProducer = createSession.createProducer(createQueue);
                createConnection.start();
                Session createSession2 = createConnection.createSession();
                Queue createQueue2 = createSession2.createQueue(str);
                MessageProducer createProducer2 = createSession2.createProducer(createQueue2);
                MessageConsumer createConsumer = createSession.createConsumer(createQueue);
                createProducer.send(createSession2.createTextMessage("hello"));
                Assertions.assertNotNull(createConsumer.receive(1000L));
                createProducer2.send(createSession2.createTextMessage("hello"));
                Assertions.assertNotNull(createConsumer.receive(1000L));
                Wait.assertTrue(() -> {
                    return getServer(0).getPostOffice().getBinding(SimpleString.of(str)) != null;
                });
                Wait.assertTrue(() -> {
                    return getServer(1).getPostOffice().getBinding(SimpleString.of(str)) != null;
                });
                Assertions.assertFalse(Wait.waitFor(() -> {
                    return getServer(0).locateQueue(SimpleString.of(str)).getConsumerCount() > 1;
                }, 500L, 100L));
                Assertions.assertFalse(Wait.waitFor(() -> {
                    return getServer(1).locateQueue(SimpleString.of(str)).getConsumerCount() > 1;
                }, 500L, 100L));
                int connectionCount = getServer(1).getConnectionCount();
                createConsumer.close();
                Wait.waitFor(() -> {
                    return getServer(0).getPostOffice().getBinding(SimpleString.of(str)).consumerCount() == 0;
                }, 1000L);
                if (z) {
                    Assertions.assertFalse(Wait.waitFor(() -> {
                        return getServer(1).getConnectionCount() == connectionCount - 1;
                    }, 500L, 100L));
                    Assertions.assertTrue(connectionCount == getServer(1).getConnectionCount());
                }
                MessageConsumer createConsumer2 = createSession2.createConsumer(createQueue2);
                createProducer.send(createSession2.createTextMessage("hello"));
                Assertions.assertNotNull(createConsumer2.receive(1000L));
                createProducer2.send(createSession2.createTextMessage("hello"));
                Assertions.assertNotNull(createConsumer2.receive(1000L));
                MessageConsumer createConsumer3 = createSession.createConsumer(createQueue);
                createProducer.send(createSession2.createTextMessage("produce0"));
                createProducer2.send(createSession2.createTextMessage("produce1"));
                TextMessage receive = createConsumer3.receive(1000L);
                Assertions.assertNotNull(receive);
                Assertions.assertEquals("produce0", receive.getText());
                TextMessage receive2 = createConsumer2.receive(1000L);
                Assertions.assertNotNull(receive2);
                Assertions.assertEquals("produce1", receive2.getText());
                if (createConnection2 != null) {
                    createConnection2.close();
                }
                if (createConnection != null) {
                    createConnection.close();
                }
            } catch (Throwable th) {
                if (createConnection2 != null) {
                    try {
                        createConnection2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (createConnection != null) {
                try {
                    createConnection.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void testFederatedQueueChainOfBrokers() throws Exception {
        String name = getName();
        getServer(0).getConfiguration().getFederationConfigurations().add(FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", name, true));
        getServer(0).getFederationManager().deploy();
        getServer(1).getConfiguration().getFederationConfigurations().add(FederatedTestUtil.createQueueUpstreamFederationConfiguration("server2", name, true));
        getServer(1).getFederationManager().deploy();
        ConnectionFactory cf = getCF(2);
        ConnectionFactory cf2 = getCF(0);
        Connection createConnection = cf.createConnection();
        try {
            Connection createConnection2 = cf2.createConnection();
            try {
                createConnection2.start();
                Session createSession = createConnection2.createSession();
                Queue createQueue = createSession.createQueue(name);
                createConnection.start();
                Session createSession2 = createConnection.createSession();
                MessageProducer createProducer = createSession2.createProducer(createSession2.createQueue(name));
                MessageConsumer createConsumer = createSession.createConsumer(createQueue);
                createProducer.send(createSession2.createTextMessage("hello"));
                Assertions.assertNotNull(createConsumer.receive(1000L));
                if (createConnection2 != null) {
                    createConnection2.close();
                }
                if (createConnection != null) {
                    createConnection.close();
                }
            } catch (Throwable th) {
                if (createConnection2 != null) {
                    try {
                        createConnection2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (createConnection != null) {
                try {
                    createConnection.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void testFederatedQueueRemoteBrokerRestart() throws Exception {
        String name = getName();
        getServer(0).getConfiguration().getFederationConfigurations().add(FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", name));
        getServer(0).getFederationManager().deploy();
        ConnectionFactory cf = getCF(1);
        Connection createConnection = cf.createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession();
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue(name));
        createProducer.send(createSession.createTextMessage("hello"));
        Connection createConnection2 = getCF(0).createConnection();
        createConnection2.start();
        Session createSession2 = createConnection2.createSession();
        MessageConsumer createConsumer = createSession2.createConsumer(createSession2.createQueue(name));
        Assertions.assertNotNull(createConsumer.receive(1000L));
        createProducer.send(createSession.createTextMessage("hello"));
        Assertions.assertNotNull(createConsumer.receive(1000L));
        createConnection.close();
        getServer(1).stop();
        Assertions.assertNull(createConsumer.receiveNoWait());
        getServer(1).start();
        ActiveMQServer server = getServer(1);
        Objects.requireNonNull(server);
        Wait.assertTrue(server::isActive);
        createSimpleQueue(getServer(1), getName());
        Connection createConnection3 = cf.createConnection();
        createConnection3.start();
        Session createSession3 = createConnection3.createSession();
        createSession3.createProducer(createSession3.createQueue(name)).send(createSession3.createTextMessage("hello"));
        Wait.waitFor(() -> {
            return getConsumerCount(getServer(1), name, 1);
        });
        Assertions.assertNotNull(createConsumer.receive(1000L));
    }

    private boolean getConsumerCount(ActiveMQServer activeMQServer, String str, int i) {
        QueueBinding binding = activeMQServer.getPostOffice().getBinding(SimpleString.of(str));
        return binding != null && binding.consumerCount() == i;
    }

    @Test
    public void testFederatedQueueLocalBrokerRestart() throws Exception {
        String name = getName();
        getServer(0).getConfiguration().getFederationConfigurations().add(FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", name));
        getServer(0).getFederationManager().deploy();
        Connection createConnection = getCF(1).createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession();
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue(name));
        createProducer.send(createSession.createTextMessage("hello"));
        Connection createConnection2 = getCF(0).createConnection();
        createConnection2.start();
        Session createSession2 = createConnection2.createSession();
        MessageConsumer createConsumer = createSession2.createConsumer(createSession2.createQueue(name));
        Assertions.assertNotNull(createConsumer.receive(1000L));
        createProducer.send(createSession.createTextMessage("hello"));
        Assertions.assertNotNull(createConsumer.receive(1000L));
        createConnection2.close();
        getServer(0).stop();
        createProducer.send(createSession.createTextMessage("hello"));
        getServer(0).start();
        Wait.waitFor(() -> {
            return getServer(0).isActive();
        });
        createSimpleQueue(getServer(0), getName());
        Connection createConnection3 = getCF(0).createConnection();
        createConnection3.start();
        Session createSession3 = createConnection3.createSession();
        MessageConsumer createConsumer2 = createSession3.createConsumer(createSession3.createQueue(name));
        createProducer.send(createSession.createTextMessage("hello"));
        Wait.assertTrue(() -> {
            return getServer(1).getPostOffice().getBinding(SimpleString.of(name)) != null;
        });
        Wait.waitFor(() -> {
            return getServer(1).getPostOffice().getBinding(SimpleString.of(name)).consumerCount() == 1;
        });
        Assertions.assertNotNull(createConsumer2.receive(5000L));
    }

    private jakarta.jms.Message createTextMessage(Session session, String str) throws JMSException {
        TextMessage createTextMessage = session.createTextMessage("hello");
        createTextMessage.setStringProperty("JMSXGroupID", str);
        return createTextMessage;
    }
}
