package org.apache.activemq.artemis.tests.integration.federation;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import java.lang.invoke.MethodHandles;
import java.net.URL;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.tests.integration.jms.RedeployTest;
import org.apache.activemq.artemis.tests.integration.management.SimpleManagementTest;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/federation/FederationQueueMatchXMLConfigParsingTest.class */
public class FederationQueueMatchXMLConfigParsingTest extends ActiveMQTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    @Timeout(20)
    @Test
    public void testOpenWireOverCoreFederationDownstream() throws Exception {
        doTestSimpleQueueFederationOverCoreFederationDoownstream("OPENWIRE");
    }

    @Timeout(20)
    @Test
    public void testCoreOverCoreFederationDownstream() throws Exception {
        doTestSimpleQueueFederationOverCoreFederationDoownstream("CORE");
    }

    @Timeout(20)
    @Test
    public void testAMQPOverCoreFederationDownstream() throws Exception {
        doTestSimpleQueueFederationOverCoreFederationDoownstream("AMQP");
    }

    private void doTestSimpleQueueFederationOverCoreFederationDoownstream(String str) throws Exception {
        URL resource = RedeployTest.class.getClassLoader().getResource("core-federated-queue-match-server1.xml");
        URL resource2 = RedeployTest.class.getClassLoader().getResource("core-federated-queue-match-server2.xml");
        CountDownLatch countDownLatch = new CountDownLatch(5);
        EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
        embeddedActiveMQ.setConfigResourcePath(resource.toURI().toString());
        embeddedActiveMQ.start();
        EmbeddedActiveMQ embeddedActiveMQ2 = new EmbeddedActiveMQ();
        embeddedActiveMQ2.setConfigResourcePath(resource2.toURI().toString());
        embeddedActiveMQ2.start();
        ConnectionFactory createConnectionFactory = CFUtil.createConnectionFactory(str, "tcp://localhost:61618");
        ConnectionFactory createConnectionFactory2 = CFUtil.createConnectionFactory(str, SimpleManagementTest.LOCALHOST);
        try {
            Connection createConnection = createConnectionFactory.createConnection();
            try {
                Session createSession = createConnection.createSession(false, 1);
                MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue("exampleQueueTwo"));
                createConnection.start();
                createConsumer.setMessageListener(message -> {
                    logger.info("Received message: {} ", message);
                    countDownLatch.countDown();
                });
                Connection createConnection2 = createConnectionFactory2.createConnection();
                try {
                    createConnection2.start();
                    Session createSession2 = createConnection2.createSession(false, 1);
                    MessageProducer createProducer = createSession2.createProducer(createSession2.createQueue("exampleQueueTwo"));
                    UUID randomUUID = UUID.randomUUID();
                    for (int i = 0; i < 5; i++) {
                        createProducer.send(createSession2.createTextMessage("Test message:" + randomUUID));
                        logger.trace("Sent message: {}", randomUUID);
                    }
                    logger.info("Sent {} messages to queue for federation dispatch.", 5);
                    if (createConnection2 != null) {
                        createConnection2.close();
                    }
                    Assertions.assertTrue(countDownLatch.await(10L, TimeUnit.SECONDS));
                    if (createConnection != null) {
                        createConnection.close();
                    }
                } catch (Throwable th) {
                    if (createConnection2 != null) {
                        try {
                            createConnection2.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } finally {
            try {
                embeddedActiveMQ.stop();
            } catch (Exception e) {
            }
            try {
                embeddedActiveMQ2.stop();
            } catch (Exception e2) {
            }
        }
    }

    @Timeout(20)
    @Test
    public void testQueuePolicyMatchesOnlyIndicatedQueueOpenwire() throws Exception {
        doTestQueueMatchPolicyOnlyMatchesIndicatedQueue("OPENWIRE");
    }

    @Timeout(20)
    @Test
    public void testQueuePolicyMatchesOnlyIndicatedQueueCore() throws Exception {
        doTestQueueMatchPolicyOnlyMatchesIndicatedQueue("CORE");
    }

    @Timeout(20)
    @Test
    public void testQueuePolicyMatchesOnlyIndicatedQueueAMQP() throws Exception {
        doTestQueueMatchPolicyOnlyMatchesIndicatedQueue("AMQP");
    }

    private void doTestQueueMatchPolicyOnlyMatchesIndicatedQueue(String str) throws Exception {
        URL resource = RedeployTest.class.getClassLoader().getResource("core-federated-queue-match-server1.xml");
        URL resource2 = RedeployTest.class.getClassLoader().getResource("core-federated-queue-match-server2.xml");
        AtomicInteger atomicInteger = new AtomicInteger();
        CountDownLatch countDownLatch = new CountDownLatch(5);
        EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
        embeddedActiveMQ.setConfigResourcePath(resource.toURI().toString());
        embeddedActiveMQ.start();
        EmbeddedActiveMQ embeddedActiveMQ2 = new EmbeddedActiveMQ();
        embeddedActiveMQ2.setConfigResourcePath(resource2.toURI().toString());
        embeddedActiveMQ2.start();
        ConnectionFactory createConnectionFactory = CFUtil.createConnectionFactory(str, SimpleManagementTest.LOCALHOST);
        ConnectionFactory createConnectionFactory2 = CFUtil.createConnectionFactory(str, "tcp://localhost:61618");
        try {
            Connection createConnection = createConnectionFactory.createConnection();
            try {
                Session createSession = createConnection.createSession(false, 1);
                Queue createQueue = createSession.createQueue("exampleQueueOne");
                Queue createQueue2 = createSession.createQueue("exampleQueueTwo");
                MessageConsumer createConsumer = createSession.createConsumer(createQueue);
                MessageConsumer createConsumer2 = createSession.createConsumer(createQueue2);
                createConnection.start();
                createConsumer.setMessageListener(message -> {
                    logger.info("Consumer #1 Received message: {} ", message);
                    atomicInteger.incrementAndGet();
                });
                createConsumer2.setMessageListener(message2 -> {
                    logger.info("Consumer #2 Received message: {} ", message2);
                    countDownLatch.countDown();
                });
                Connection createConnection2 = createConnectionFactory2.createConnection();
                try {
                    createConnection2.start();
                    Session createSession2 = createConnection2.createSession(false, 1);
                    Queue createQueue3 = createSession2.createQueue("exampleQueueOne");
                    Queue createQueue4 = createSession2.createQueue("exampleQueueTwo");
                    MessageProducer createProducer = createSession2.createProducer(createQueue3);
                    MessageProducer createProducer2 = createSession2.createProducer(createQueue4);
                    UUID randomUUID = UUID.randomUUID();
                    for (int i = 0; i < 5; i++) {
                        createProducer.send(createSession2.createTextMessage("Test message:" + randomUUID));
                        createProducer2.send(createSession2.createTextMessage("Test message:" + randomUUID));
                        logger.trace("Sent message: {}", randomUUID);
                    }
                    logger.info("Sent {} messages to queues for federation dispatch.", 5);
                    if (createConnection2 != null) {
                        createConnection2.close();
                    }
                    Assertions.assertTrue(countDownLatch.await(10L, TimeUnit.SECONDS));
                    Assertions.assertEquals(0, atomicInteger.get());
                    if (createConnection != null) {
                        createConnection.close();
                    }
                } catch (Throwable th) {
                    if (createConnection2 != null) {
                        try {
                            createConnection2.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } finally {
            try {
                embeddedActiveMQ.stop();
            } catch (Exception e) {
            }
            try {
                embeddedActiveMQ2.stop();
            } catch (Exception e2) {
            }
        }
    }
}
