package org.apache.activemq.artemis.tests.integration.amqp.connect;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.ha.PrimaryOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.Symbol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.class */
public class AMQPRedistributeClusterTest extends AmqpTestSupport {
    // Name of the durable ANYCAST address/queue that createClusteredServer configures on every broker.
    private static final String QUEUE_NAME = "REDIST_QUEUE";
    // Ports passed to createClusteredServer for the brokers identified as "A_1" and "A_2";
    // the two are each other's static cluster connector.
    protected static final int A_1_PORT = 5673;
    protected static final int A_2_PORT = 5674;
    // The "A" brokers; each is created with an AMQP mirror connection towards one "B" port.
    ActiveMQServer a1;
    ActiveMQServer a2;
    // Ports passed to createClusteredServer for the brokers identified as "B_1" and "B_2";
    // they are also the targets of the mirror connections configured on A_1 and A_2.
    protected static final int B_1_PORT = 5773;
    protected static final int B_2_PORT = 5774;
    // The "B" brokers; created with -1 as mirror port, i.e. without a mirror connection of their own.
    ActiveMQServer b1;
    ActiveMQServer b2;
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    // Name of the MULTICAST address that createClusteredServer configures on every broker.
    private static final String TOPIC_NAME = "REDIST_TOPIC";
    // TOPIC_NAME as a SimpleString, used for post-office binding lookups.
    private static final SimpleString TOPIC_NAME_SIMPLE_STRING = SimpleString.of(TOPIC_NAME);

    /**
     * Creates and starts the four brokers used by every test in this class.
     *
     * <p>A_1 and A_2 are clustered with each other and each carries a mirror connection
     * (towards B_1_PORT and B_2_PORT respectively). B_1 and B_2 are clustered with each
     * other and are created without a mirror connection (mirror port {@code -1}).
     * The A brokers are created and started before the B brokers are created.
     *
     * @throws Exception if a broker cannot be created or started
     */
    @BeforeEach
    public void setCluster() throws Exception {
        // "A" pair: created first, then started, before any "B" broker exists.
        a1 = createClusteredServer("A_1", A_1_PORT, A_2_PORT, B_1_PORT);
        a2 = createClusteredServer("A_2", A_2_PORT, A_1_PORT, B_2_PORT);
        a1.start();
        a2.start();

        // "B" pair: no outgoing mirror connection (-1 disables it in createClusteredServer).
        b1 = createClusteredServer("B_1", B_1_PORT, B_2_PORT, -1);
        b2 = createClusteredServer("B_2", B_2_PORT, B_1_PORT, -1);
        b1.start();
        b2.start();
    }

    /**
     * Creates (but does not start) a broker configured for this test.
     *
     * <p>The broker gets the ANYCAST address/queue {@code QUEUE_NAME}, the MULTICAST address
     * {@code TOPIC_NAME}, a single "#" address setting with a redistribution delay of 0, and one
     * cluster connection using {@code OFF_WITH_REDISTRIBUTION} load balancing with a static
     * connector to {@code clusterPeerPort}. When {@code mirrorPort} is positive, a durable AMQP
     * mirror connection towards that port is added as well.
     *
     * @param identity        identity assigned to the server
     * @param thisPort        port handed to {@code createServer} and used for the "thisNode" connector
     * @param clusterPeerPort port used for the "otherNode" connector (the static cluster peer)
     * @param mirrorPort      port of the mirror target, or a value {@code <= 0} for no mirror
     * @return the configured, not yet started server
     * @throws Exception if the server cannot be created
     */
    private ActiveMQServer createClusteredServer(String identity, int thisPort, int clusterPeerPort, int mirrorPort) throws Exception {
        ActiveMQServer server = createServer(thisPort, false);

        // Addresses used by the tests: one anycast queue and one multicast topic.
        CoreAddressConfiguration queueAddress = new CoreAddressConfiguration()
            .setName(QUEUE_NAME)
            .addRoutingType(RoutingType.ANYCAST)
            .addQueueConfig(QueueConfiguration.of(QUEUE_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST));
        server.getConfiguration().addAddressConfiguration(queueAddress);

        CoreAddressConfiguration topicAddress = new CoreAddressConfiguration()
            .setName(TOPIC_NAME)
            .addRoutingType(RoutingType.MULTICAST);
        server.getConfiguration().addAddressConfiguration(topicAddress);

        // Replace whatever address settings createServer installed with a single catch-all entry.
        server.getConfiguration().clearAddressSettings();
        server.getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(0L));

        server.setIdentity(identity);

        String thisUri = "tcp://localhost:" + thisPort;
        String peerUri = "tcp://localhost:" + clusterPeerPort;
        server.getConfiguration()
            .setName("node")
            .setHAPolicyConfiguration(new PrimaryOnlyPolicyConfiguration())
            .addConnectorConfiguration("thisNode", thisUri)
            .addConnectorConfiguration("otherNode", peerUri);

        server.getConfiguration().addClusterConfiguration(
            new ClusterConnectionConfiguration()
                .setName("cluster")
                .setConnectorName("thisNode")
                .setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)
                .setStaticConnectors(Collections.singletonList("otherNode")));

        if (mirrorPort <= 0) {
            // No mirror requested for this broker.
            return server;
        }

        AMQPMirrorBrokerConnectionElement mirrorElement = new AMQPMirrorBrokerConnectionElement()
            .setDurable(true)
            .setMirrorSNF(SimpleString.of(mirrorName(mirrorPort)));
        server.getConfiguration().addAMQPConnection(
            new AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + mirrorPort)
                .setReconnectAttempts(-1)
                .setRetryInterval(100)
                .addConnectionElement(mirrorElement));
        return server;
    }

    /**
     * Builds the name used for the mirror store-and-forward queue towards the given port.
     *
     * @param port port of the mirror target
     * @return the fixed mirror prefix followed by the decimal port number
     */
    private String mirrorName(int port) {
        final String prefix = "$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_";
        return prefix.concat(String.valueOf(port));
    }

    /** Runs the queue redistribution scenario with clients created for the "AMQP" protocol. */
    @Test
    public void testQueueRedistributionAMQP() throws Exception {
        internalQueueRedistribution("AMQP");
    }

    /** Runs the queue redistribution scenario with clients created for the "CORE" protocol. */
    @Test
    public void testQueueRedistributionCORE() throws Exception {
        internalQueueRedistribution("CORE");
    }

    /**
     * Sends 100 messages to {@code QUEUE_NAME} on A_1 and then consumes them one at a time,
     * alternating between a consumer on A_1 and a consumer on A_2, so that messages have to be
     * redistributed between the two brokers.
     *
     * <p>Afterwards the queue must be empty on all four brokers and the broker log must not
     * contain the codes AMQ222196 or AMQ224037.
     *
     * @param protocol protocol name handed to {@code CFUtil.createConnectionFactory}
     *                 (the tests in this class use "AMQP" and "CORE")
     * @throws Exception on any client or broker failure
     */
    public void internalQueueRedistribution(String protocol) throws Exception {
        final int messageCount = 100;

        AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
        runAfter(() -> loggerHandler.close());

        ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
        ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);

        // Produce everything on A_1.
        try (Connection connection = cfA1.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
            for (int i = 0; i < messageCount; i++) {
                producer.send(session.createTextMessage("Hello" + i));
            }
        }

        // Consume alternately from A_1 and A_2. Each consumer is closed right after its single
        // receive so the next consumer, on the other broker, has to get its message redistributed.
        try (Connection connectionA1 = cfA1.createConnection();
             Connection connectionA2 = cfA2.createConnection()) {
            connectionA1.start();
            connectionA2.start();
            Session sessionA1 = connectionA1.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Session sessionA2 = connectionA2.createSession(false, Session.AUTO_ACKNOWLEDGE);

            for (int i = 0; i < messageCount; i++) {
                boolean useA1 = i % 2 == 0;
                String place = useA1 ? "A1" : "A2";
                Session session = useA1 ? sessionA1 : sessionA2;

                MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
                // receive() is declared to return Message; the producer above only sends text messages.
                TextMessage message = (TextMessage) consumer.receive(5000);
                Assertions.assertNotNull(message);
                logger.debug("Received message {} from {}", message, place);
                consumer.close();
            }
        }

        assertEmptyQueue(a1.locateQueue(QUEUE_NAME));
        assertEmptyQueue(a2.locateQueue(QUEUE_NAME));
        assertEmptyQueue(b1.locateQueue(QUEUE_NAME));
        assertEmptyQueue(b2.locateQueue(QUEUE_NAME));

        // Neither of these broker log codes may have been emitted during the test.
        Assertions.assertFalse(loggerHandler.findText("AMQ222196"));
        Assertions.assertFalse(loggerHandler.findText("AMQ224037"));
    }

    /** Runs the shared-subscription topic redistribution scenario with "AMQP" clients. */
    @Test
    public void testTopicRedistributionAMQP() throws Exception {
        internalTopicRedistribution("AMQP");
    }

    /** Runs the shared-subscription topic redistribution scenario with "CORE" clients. */
    @Test
    public void testTopicRedistributionCORE() throws Exception {
        internalTopicRedistribution("CORE");
    }

    /**
     * Exercises redistribution of a shared durable topic subscription across the cluster.
     *
     * <p>Steps:
     * <ol>
     *   <li>Create (and immediately close) the shared durable subscription
     *       "my-topic-shared-subscription" on A_1 and on A_2.</li>
     *   <li>Wait until every broker reports two bindings for the topic address, and until the
     *       local subscription queue of A_1 / A_2 also exists on B_1 / B_2.</li>
     *   <li>Send 100 messages through A_1 and expect 50 messages in the subscription queue on
     *       each of the four brokers.</li>
     *   <li>Consume the 100 messages alternating between A_1 and A_2 and expect the subscription
     *       queue to end up empty on all four brokers.</li>
     * </ol>
     * The broker log must not contain the codes AMQ222196 or AMQ224037 at any checkpoint.
     *
     * @param protocol protocol name handed to {@code CFUtil.createConnectionFactory}
     *                 (the tests in this class use "AMQP" and "CORE")
     * @throws Exception on any client or broker failure
     */
    public void internalTopicRedistribution(String protocol) throws Exception {
        final String subscriptionName = "my-topic-shared-subscription";
        final int messageCount = 100;

        AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
        runAfter(() -> loggerHandler.close());

        ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
        ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);

        // Create the subscription on A_1; closing the consumer leaves the durable subscription behind.
        try (Connection connection = cfA1.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            session.createSharedDurableConsumer(session.createTopic(TOPIC_NAME), subscriptionName).close();
        }

        // Same on A_2. The Topic object created here is reused as destination for the rest of the test.
        Topic topic;
        try (Connection connection = cfA2.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            topic = session.createTopic(TOPIC_NAME);
            session.createSharedDurableConsumer(topic, subscriptionName).close();
        }

        Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
        Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
        Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
        Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);

        // Collect the name of A_1's local subscription queue and wait for it to show up on B_1.
        HashSet<String> localSubscriptionQueues = new HashSet<>();
        a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((name, binding) -> {
            logger.debug("{} = {}", name, binding);
            if (binding instanceof LocalQueueBinding) {
                QueueBinding queueBinding = (QueueBinding) binding;
                localSubscriptionQueues.add(queueBinding.getUniqueName().toString());
                Wait.assertTrue(() -> b1.locateQueue(queueBinding.getUniqueName()) != null);
            }
        });
        Assertions.assertEquals(1, localSubscriptionQueues.size());
        String subscriptionQueueName = localSubscriptionQueues.iterator().next();

        // Likewise, A_2's local subscription queue has to show up on B_2.
        a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((name, binding) -> {
            logger.debug("{} = {}", name, binding);
            if (binding instanceof LocalQueueBinding) {
                QueueBinding queueBinding = (QueueBinding) binding;
                Wait.assertTrue(() -> b2.locateQueue(queueBinding.getUniqueName()) != null);
            }
        });

        Queue a1Subscription = a1.locateQueue(subscriptionQueueName);
        Assertions.assertNotNull(a1Subscription);
        Queue a2Subscription = a2.locateQueue(subscriptionQueueName);
        Assertions.assertNotNull(a2Subscription);
        Queue b1Subscription = b1.locateQueue(subscriptionQueueName);
        Assertions.assertNotNull(b1Subscription);
        Queue b2Subscription = b2.locateQueue(subscriptionQueueName);
        // Check the queue that was just looked up on b2 (previously the a2 server field was asserted).
        Assertions.assertNotNull(b2Subscription);

        try (Connection connection = cfA1.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(topic);
            for (int i = 0; i < messageCount; i++) {
                producer.send(session.createTextMessage("Hello" + i));
            }
        }

        Assertions.assertFalse(loggerHandler.findText("AMQ222196"));
        Assertions.assertFalse(loggerHandler.findText("AMQ224037"));

        Assertions.assertEquals(0, a1Subscription.getConsumerCount());
        Wait.assertEquals(50L, a1Subscription::getMessageCount);
        Wait.assertEquals(50L, a2Subscription::getMessageCount);
        logger.debug("b1={}. b2={}", b1Subscription.getMessageCount(), b2Subscription.getMessageCount());
        Wait.assertEquals(50L, b1Subscription::getMessageCount);
        Wait.assertEquals(50L, b2Subscription::getMessageCount);

        // Consume alternately from A_1 and A_2, one short-lived shared consumer per message.
        try (Connection connectionA1 = cfA1.createConnection();
             Connection connectionA2 = cfA2.createConnection()) {
            connectionA1.start();
            connectionA2.start();
            Session sessionA1 = connectionA1.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Session sessionA2 = connectionA2.createSession(false, Session.AUTO_ACKNOWLEDGE);

            for (int i = 0; i < messageCount; i++) {
                boolean useA1 = i % 2 == 0;
                String place = useA1 ? "A1" : "A2";
                Session session = useA1 ? sessionA1 : sessionA2;

                MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
                // receive() is declared to return Message; the producer above only sends text messages.
                TextMessage message = (TextMessage) consumer.receive(5000);
                Assertions.assertNotNull(message);
                logger.debug("Received message {} from {}", message, place);
                consumer.close();
            }
        }

        Assertions.assertFalse(loggerHandler.findText("AMQ222196"));
        Assertions.assertFalse(loggerHandler.findText("AMQ224037"));

        assertEmptyQueue(a1Subscription);
        assertEmptyQueue(a2Subscription);
        assertEmptyQueue(b1Subscription);
        assertEmptyQueue(b2Subscription);
    }

    /**
     * Routes a single core message on A_2 through the remote binding of subscription "_0" using
     * {@code MirrorOption.individualRoute}, then verifies where the message ended up.
     *
     * <p>Ten shared durable subscriptions ("my-topic-shared-subscription_0" .. "_9") are created
     * on both A_1 and A_2. After the routing is committed, subscription queue 0 must hold exactly
     * one message on A_1 and on B_1, and every other subscription queue on every broker must stay
     * empty.
     *
     * @throws Exception on any client or broker failure
     */
    @Test
    public void testRemoteBindingRouting() throws Exception {
        final String subscriptionName = "my-topic-shared-subscription";
        final int subscriptionCount = 10;

        ConnectionFactory cfA1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + A_1_PORT);
        ConnectionFactory cfA2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + A_2_PORT);

        // The consumers are not closed individually; closing the connection takes care of them
        // and leaves the durable subscriptions in place.
        try (Connection connection = cfA1.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(TOPIC_NAME);
            for (int i = 0; i < subscriptionCount; i++) {
                session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
            }
        }

        try (Connection connection = cfA2.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(TOPIC_NAME);
            for (int i = 0; i < subscriptionCount; i++) {
                session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
            }
        }

        // Every broker is expected to report 20 bindings for the topic address.
        Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
        Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
        Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
        Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);

        // Pick A_2's remote binding whose cluster name starts with the "_0" subscription.
        ArrayList<RemoteQueueBinding> remoteBindings = new ArrayList<>();
        a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((name, binding) -> {
            if (binding instanceof RemoteQueueBindingImpl
                && binding.getClusterName().toString().startsWith(subscriptionName + "_0")) {
                logger.debug("{} = {}", name, binding);
                remoteBindings.add((RemoteQueueBinding) binding);
            }
        });
        Assertions.assertEquals(1, remoteBindings.size());

        RoutingContextImpl routingContext = new RoutingContextImpl(new TransactionImpl(a2.getStorageManager()))
            .setMirrorOption(RoutingContext.MirrorOption.individualRoute);

        CoreMessage message = new CoreMessage(a2.getStorageManager().generateID(), 512);
        message.setAddress(TOPIC_NAME);
        message.putStringProperty("Test", "t1");

        remoteBindings.get(0).route(message, routingContext);
        a2.getPostOffice().processRoute(message, routingContext, false);
        routingContext.getTransaction().commit();

        for (int i = 0; i < subscriptionCount; i++) {
            String queueName = subscriptionName + "_" + i + ":global";

            if (logger.isDebugEnabled()) {
                logger.debug("a1 queue {} with {} messages", queueName, a1.locateQueue(queueName).getMessageCount());
                logger.debug("b1 queue {} with {} messages", queueName, b1.locateQueue(queueName).getMessageCount());
                logger.debug("a2 queue {} with {} messages", queueName, a2.locateQueue(queueName).getMessageCount());
                logger.debug("b2 queue {} with {} messages", queueName, b2.locateQueue(queueName).getMessageCount());
            }

            // Only subscription 0 receives the message, and only on A_1 and on B_1.
            long expectedOnA1AndB1 = i == 0 ? 1L : 0L;
            Queue a1Queue = a1.locateQueue(queueName);
            Wait.assertEquals(expectedOnA1AndB1, a1Queue::getMessageCount);
            Queue b1Queue = b1.locateQueue(queueName);
            Wait.assertEquals(expectedOnA1AndB1, b1Queue::getMessageCount);
            Queue a2Queue = a2.locateQueue(queueName);
            Wait.assertEquals(0L, a2Queue::getMessageCount);
            Queue b2Queue = b2.locateQueue(queueName);
            Wait.assertEquals(0L, b2Queue::getMessageCount);
        }
    }

    /**
     * Sends hand-built messages directly to A_1 over an AMQP sender opened with the "amq.mirror"
     * capability on the address {@code mirrorName(A_1_PORT)}, and checks how the TARGET_QUEUES
     * delivery annotation affects where they land.
     *
     * <p>Ten shared durable subscriptions ("my-topic-shared-subscription_0" .. "_9") are created
     * on both A_1 and A_2. Then:
     * <ul>
     *   <li>a message with an empty TARGET_QUEUES list must not reach any subscription queue on
     *       any broker;</li>
     *   <li>a message targeting "my-topic-shared-subscription_3:global" plus a non-existent queue
     *       must reach only subscription queue 3 on A_1.</li>
     * </ul>
     *
     * @throws Exception on any client or broker failure
     */
    @Test
    public void testFakeMirrorSend() throws Exception {
        final String subscriptionName = "my-topic-shared-subscription";
        final int subscriptionCount = 10;

        ConnectionFactory cfA1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + A_1_PORT);
        ConnectionFactory cfA2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + A_2_PORT);

        // The consumers are not closed individually; closing the connection takes care of them
        // and leaves the durable subscriptions in place.
        try (Connection connection = cfA1.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(TOPIC_NAME);
            for (int i = 0; i < subscriptionCount; i++) {
                session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
            }
        }

        try (Connection connection = cfA2.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(TOPIC_NAME);
            for (int i = 0; i < subscriptionCount; i++) {
                session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
            }
        }

        // Every broker is expected to report 20 bindings for the topic address.
        Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
        Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
        Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
        Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);

        // Sanity check: A_2 has exactly one remote binding for the "_0" subscription.
        ArrayList<RemoteQueueBinding> remoteBindings = new ArrayList<>();
        a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((name, binding) -> {
            if (binding instanceof RemoteQueueBindingImpl
                && binding.getClusterName().toString().startsWith(subscriptionName + "_0")) {
                logger.debug("{} = {}", name, binding);
                remoteBindings.add((RemoteQueueBinding) binding);
            }
        });
        Assertions.assertEquals(1, remoteBindings.size());

        AmqpConnection amqpConnection = createAmqpConnection(new URI("tcp://localhost:" + A_1_PORT));
        runAfter(amqpConnection::close);
        AmqpSession amqpSession = amqpConnection.createSession();
        AmqpSender sender = amqpSession.createSender(mirrorName(A_1_PORT), new Symbol[]{Symbol.getSymbol("amq.mirror")});

        // First message: empty TARGET_QUEUES list, so nothing may be delivered anywhere.
        AmqpMessage untargeted = new AmqpMessage();
        untargeted.setAddress(TOPIC_NAME);
        untargeted.setDeliveryAnnotation(AMQPMirrorControllerSource.TARGET_QUEUES.toString(), new ArrayList<>());
        untargeted.setDeliveryAnnotation(AMQPMirrorControllerSource.INTERNAL_ID.toString(), a1.getStorageManager().generateID());
        untargeted.setDeliveryAnnotation(AMQPMirrorControllerSource.BROKER_ID.toString(), String.valueOf(b1.getNodeID()));
        sender.send(untargeted);

        for (int i = 0; i < subscriptionCount; i++) {
            String queueName = subscriptionName + "_" + i + ":global";
            Queue a1Queue = a1.locateQueue(queueName);
            Wait.assertEquals(0L, a1Queue::getMessageCount);
            Queue b1Queue = b1.locateQueue(queueName);
            Wait.assertEquals(0L, b1Queue::getMessageCount);
            Queue a2Queue = a2.locateQueue(queueName);
            Wait.assertEquals(0L, a2Queue::getMessageCount);
            Queue b2Queue = b2.locateQueue(queueName);
            Wait.assertEquals(0L, b2Queue::getMessageCount);
        }

        // Second message: targets subscription 3 plus a queue name that does not exist.
        ArrayList<String> targetQueues = new ArrayList<>();
        targetQueues.add(subscriptionName + "_3:global");
        targetQueues.add("IDONTEXIST");

        AmqpMessage targeted = new AmqpMessage();
        targeted.setAddress(TOPIC_NAME);
        targeted.setDeliveryAnnotation(AMQPMirrorControllerSource.TARGET_QUEUES.toString(), targetQueues);
        targeted.setDeliveryAnnotation(AMQPMirrorControllerSource.INTERNAL_ID.toString(), a1.getStorageManager().generateID());
        targeted.setDeliveryAnnotation(AMQPMirrorControllerSource.BROKER_ID.toString(), String.valueOf(b1.getNodeID()));
        sender.send(targeted);

        for (int i = 0; i < subscriptionCount; i++) {
            String queueName = subscriptionName + "_" + i + ":global";
            // Only subscription 3 on A_1 may have received the targeted message.
            long expectedOnA1 = i == 3 ? 1L : 0L;
            Queue a1Queue = a1.locateQueue(queueName);
            Wait.assertEquals(expectedOnA1, a1Queue::getMessageCount);
            Queue b1Queue = b1.locateQueue(queueName);
            Wait.assertEquals(0L, b1Queue::getMessageCount);
            Queue a2Queue = a2.locateQueue(queueName);
            Wait.assertEquals(0L, a2Queue::getMessageCount);
            Queue b2Queue = b2.locateQueue(queueName);
            Wait.assertEquals(0L, b2Queue::getMessageCount);
        }
    }

    /**
     * Creates twenty shared durable subscriptions on {@code TOPIC_NAME}, ten through the
     * connection factory on port 5673 and ten through the one on port 5674, then verifies that
     * every one of the servers {@code a1}, {@code a2}, {@code b1} and {@code b2} ends up with
     * twenty bindings on the topic address.
     *
     * <p>Afterwards a single text message is published through port 5674 and each subscription
     * queue ({@code my-topic-shared-subscription_<n>:global}) is expected to hold exactly one
     * message: subscriptions 0-9 are checked on {@code a1} and {@code b1}, subscriptions 10-19
     * on {@code a2} and {@code b2}.
     *
     * <p>NOTE(review): ports 5673/5674 presumably belong to a1/a2 respectively, given where the
     * queues are looked up afterwards; confirm against the test setup.
     *
     * @throws Exception if any JMS operation or wait condition fails
     */
    @Test
    public void testMultiNodeSubscription() throws Exception {
        ConnectionFactory factory5673 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5673");
        ConnectionFactory factory5674 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5674");

        // try-with-resources guarantees the connection is closed even when a statement throws.
        try (Connection connection = factory5673.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(TOPIC_NAME);
            for (int i = 0; i < 10; i++) {
                session.createSharedDurableConsumer(topic, "my-topic-shared-subscription" + "_" + i);
            }
        }

        try (Connection connection = factory5674.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(TOPIC_NAME);
            for (int i = 10; i < 20; i++) {
                session.createSharedDurableConsumer(topic, "my-topic-shared-subscription" + "_" + i);
            }
        }

        // All four servers must see the full set of twenty bindings on the topic address.
        Wait.assertTrue(() -> this.a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
        Wait.assertTrue(() -> this.a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
        Wait.assertTrue(() -> this.b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
        Wait.assertTrue(() -> this.b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);

        try (Connection connection = factory5674.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createTopic(TOPIC_NAME));
            producer.send(session.createTextMessage("hello"));
        }

        for (int i = 0; i < 10; i++) {
            String queueName = "my-topic-shared-subscription_" + i + ":global";
            Wait.waitFor(() -> this.a1.locateQueue(queueName) != null);
            Wait.waitFor(() -> this.b1.locateQueue(queueName) != null);

            // The waits above do not fail the test by themselves, so name the missing queue explicitly.
            Queue queueOnA1 = Objects.requireNonNull(this.a1.locateQueue(queueName), "queue " + queueName + " not found on a1");
            Wait.assertEquals(1L, queueOnA1::getMessageCount);
            Queue queueOnB1 = Objects.requireNonNull(this.b1.locateQueue(queueName), "queue " + queueName + " not found on b1");
            Wait.assertEquals(1L, queueOnB1::getMessageCount);
        }

        for (int i = 10; i < 20; i++) {
            String queueName = "my-topic-shared-subscription_" + i + ":global";
            Wait.waitFor(() -> this.a2.locateQueue(queueName) != null);
            Wait.waitFor(() -> this.b2.locateQueue(queueName) != null);

            Queue queueOnA2 = Objects.requireNonNull(this.a2.locateQueue(queueName), "queue " + queueName + " not found on a2");
            Wait.assertEquals(1L, queueOnA2::getMessageCount);
            Queue queueOnB2 = Objects.requireNonNull(this.b2.locateQueue(queueName), "queue " + queueName + " not found on b2");
            Wait.assertEquals(1L, queueOnB2::getMessageCount);
        }
    }

    /**
     * Asserts that the given queue exists and that its message count reaches zero.
     *
     * <p>Assertion failures propagate unchanged; any other throwable raised while waiting is
     * wrapped in a {@link RuntimeException} that keeps the original message and cause.
     *
     * @param queue the queue to check; a {@code null} value fails the not-null assertion
     */
    private void assertEmptyQueue(Queue queue) {
        Assertions.assertNotNull(queue);
        try {
            Wait.assertEquals(0L, queue::getMessageCount);
        } catch (AssertionError assertionFailure) {
            // Let genuine assertion failures surface as-is.
            throw assertionFailure;
        } catch (Throwable unexpected) {
            throw new RuntimeException(unexpected.getMessage(), unexpected);
        }
    }

    /**
     * Returns the comma-separated protocol list used by this test: AMQP and CORE.
     *
     * <p>Overrides the method declared in
     * {@code org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport}.
     *
     * @return the literal {@code "AMQP,CORE"}
     */
    @Override
    protected String getConfiguredProtocols() {
        final String protocols = "AMQP,CORE";
        return protocols;
    }
}
