package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSFQQNConsumerTest.class */
public class JMSFQQNConsumerTest extends MultiprotocolJMSClientTestSupport {
    // Class-wide SLF4J logger; the owning class is resolved through MethodHandles.lookup().
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.activemq.artemis.tests.integration.jms.multiprotocol.JMSFQQNConsumerTest$1RunnableConsumer, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSFQQNConsumerTest$1RunnableConsumer.class */
    /**
     * Consumer task used by the multi-consumer selector test.
     *
     * <p>The constructor opens its own session on the supplied connection and
     * creates a consumer with the given selector on the topic named by
     * {@code queueName}. {@link #run()} then receives {@code expected} messages,
     * checks that their {@code "i"} property counts up from zero, and verifies
     * that nothing else is pending.
     *
     * <p>Any failure is recorded in {@link #errors} instead of being thrown, and
     * the shared latch is counted down exactly once per task, after the
     * error count has been updated.
     */
    public class C1RunnableConsumer implements Runnable {
        /** Number of failures observed by {@link #run()}; read by the test after the latch opens. */
        int errors = 0;
        /** Number of messages this consumer must receive. */
        final int expected;
        final Connection c;
        final Session session;
        final Topic topic;
        final MessageConsumer consumer;
        /** Name passed to {@code createTopic}; also used in log and assertion messages. */
        final String queueName;
        /** Selector the consumer was created with. */
        final String filter;
        /** Latch shared by all consumer tasks of one test run. */
        final CountDownLatch done;
        /** Protocol name; selects how the final "no more messages" check is performed. */
        final String protocol;

        /**
         * @param connection connection on which a dedicated session is created
         * @param queueName  destination name handed to {@code Session.createTopic}
         * @param expected   number of messages {@link #run()} must receive
         * @param filter     selector used when creating the consumer
         * @param done       latch counted down once when {@link #run()} finishes
         * @param protocol   protocol name, compared against {@code "OPENWIRE"}
         * @throws Exception if the session, topic or consumer cannot be created
         */
        C1RunnableConsumer(Connection connection, String queueName, int expected, String filter, CountDownLatch done, String protocol) throws Exception {
            this.protocol = protocol;
            this.c = connection;
            this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            this.queueName = queueName;
            this.expected = expected;
            this.topic = this.session.createTopic(queueName);
            this.consumer = this.session.createConsumer(this.topic, filter);
            this.done = done;
            this.filter = filter;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                for (int i = 0; i < expected; i++) {
                    TextMessage message = (TextMessage) consumer.receive(5000L);
                    // Check for a receive timeout before touching the message.
                    Assert.assertNotNull(message);
                    JMSFQQNConsumerTest.logger.debug("Queue {} received message {}", queueName, message.getText());
                    Assert.assertEquals(i, message.getIntProperty("i"));
                }
                // After the expected messages, nothing else may be delivered.
                if (protocol.equals("OPENWIRE")) {
                    Assert.assertNull(consumer.receive(500L));
                } else {
                    Assert.assertNull(consumer.receiveNoWait());
                }
            } catch (Throwable t) {
                // Record the failure for the test thread; it inspects 'errors' after the latch opens.
                errors++;
                JMSFQQNConsumerTest.logger.warn(t.getMessage(), t);
            } finally {
                // Exactly one count per task, and only after 'errors' is final.
                done.countDown();
            }
        }
    }

    /**
     * Runs the FQQN topic consumer-with-selector scenario over AMQP. The second
     * argument is {@code true}, so the helper also executes its follow-up section
     * that is guarded by that flag.
     */
    @Test
    public void testFQQNTopicConsumerWithSelectorAMQP() throws Exception {
        testFQQNTopicConsumerWithSelector("AMQP", true);
    }

    /**
     * Runs the FQQN topic consumer-with-selector scenario over OpenWire. The
     * second argument is {@code false}, so the helper's flag-guarded follow-up
     * section is skipped.
     */
    @Test
    public void testFQQNTopicConsumerWithSelectorOpenWire() throws Exception {
        testFQQNTopicConsumerWithSelector("OPENWIRE", false);
    }

    /**
     * Runs the FQQN topic consumer-with-selector scenario over the CORE protocol.
     * The second argument is {@code true}, so the helper also executes its
     * follow-up section that is guarded by that flag.
     */
    @Test
    public void testFQQNTopicConsumerWithSelectorCore() throws Exception {
        testFQQNTopicConsumerWithSelector("CORE", true);
    }

    /**
     * Creates a selector-based consumer on the fully qualified name
     * {@code address::queue} (as a topic) and checks both the broker-side queue
     * and message delivery.
     *
     * <p>Phase 1 asserts that the queue located on the server is MULTICAST and
     * carries the selector {@code prop='match'} as its filter, that a matching
     * message is received and that a non-matching one is not.
     *
     * <p>When {@code validateFilterChange} is {@code true}, two more phases run:
     * creating a consumer on the same FQQN with a different selector must throw,
     * and a consumer created without a selector must have no consumer-level
     * filter while still only receiving the matching message.
     *
     * @param protocol             protocol name passed to {@code CFUtil.createConnectionFactory}
     * @param validateFilterChange whether to run the two additional phases
     * @throws Exception on any JMS or broker failure
     */
    private void testFQQNTopicConsumerWithSelector(String protocol, boolean validateFilterChange) throws Exception {
        final String address = "address";
        final String queue = "queue";
        final String filter = "prop='match'";
        ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672");

        // Phase 1: the consumer is created with a selector on the FQQN.
        try (Connection connection = factory.createConnection()) {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic fqqnTopic = session.createTopic(CompositeAddress.toFullyQualified(address, queue));
            MessageConsumer messageConsumer = session.createConsumer(fqqnTopic, filter);

            Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000L, 100L);
            Queue serverQueue = server.locateQueue(SimpleString.toSimpleString(queue));
            Assert.assertEquals(RoutingType.MULTICAST, serverQueue.getRoutingType());
            Assert.assertNotNull(serverQueue.getFilter());
            Assert.assertEquals(filter, serverQueue.getFilter().getFilterString().toString());
            Assert.assertEquals(filter, server.locateQueue(queue).getFilter().getFilterString().toString());

            MessageProducer producer = session.createProducer(session.createTopic(address));

            TextMessage matching = session.createTextMessage("hello");
            matching.setStringProperty("prop", "match");
            producer.send(matching);
            Assert.assertNotNull(messageConsumer.receive(5000L));

            TextMessage nonMatching = session.createTextMessage("hello");
            nonMatching.setStringProperty("prop", "nomatch");
            producer.send(nonMatching);
            if (protocol.equals("OPENWIRE")) {
                Assert.assertNull(messageConsumer.receive(500L));
            } else {
                Assert.assertNull(messageConsumer.receiveNoWait());
            }
        }

        if (!validateFilterChange) {
            return;
        }

        // Phase 2: a different selector on the same FQQN is expected to be rejected.
        boolean thrownException = false;
        try (Connection connection = factory.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic fqqnTopic = session.createTopic(CompositeAddress.toFullyQualified(address, queue));
            session.createConsumer(fqqnTopic, "shouldThrowException=true");
        } catch (Exception e) {
            logger.debug(e.getMessage(), e);
            thrownException = true;
        }
        Assert.assertTrue(thrownException);

        // Phase 3: a consumer without a selector has no consumer-level filter,
        // yet the non-matching message is still not delivered.
        try (Connection connection = factory.createConnection()) {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic fqqnTopic = session.createTopic(CompositeAddress.toFullyQualified(address, queue));
            MessageConsumer messageConsumer = session.createConsumer(fqqnTopic);

            Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000L, 100L);
            Queue serverQueue = server.locateQueue(SimpleString.toSimpleString(queue));
            Wait.assertEquals(1, () -> {
                return serverQueue.getConsumers().size();
            });
            serverQueue.getConsumers().forEach(serverConsumer -> {
                Assert.assertNull(serverConsumer.getFilter());
            });

            MessageProducer producer = session.createProducer(session.createTopic(address));

            TextMessage matching = session.createTextMessage("hello");
            matching.setStringProperty("prop", "match");
            producer.send(matching);
            Assert.assertNotNull(messageConsumer.receive(5000L));

            TextMessage nonMatching = session.createTextMessage("hello");
            nonMatching.setStringProperty("prop", "nomatch");
            producer.send(nonMatching);
            if (protocol.equals("OPENWIRE")) {
                Assert.assertNull(messageConsumer.receive(500L));
            } else {
                Assert.assertNull(messageConsumer.receiveNoWait());
            }
        }
    }

    /**
     * Runs the "filter on the consumer only" FQQN topic scenario over AMQP.
     */
    @Test
    public void testFQQNTopicFilterConsumerOnlyAMQP() throws Exception {
        testFQQNTopicFilterConsumerOnly("AMQP");
    }

    /**
     * Runs the "filter on the consumer only" FQQN topic scenario over OpenWire.
     */
    @Test
    public void testFQQNTopicFilterConsumerOnlyOPENWIRE() throws Exception {
        testFQQNTopicFilterConsumerOnly("OPENWIRE");
    }

    /**
     * Runs the "filter on the consumer only" FQQN topic scenario over the CORE protocol.
     */
    @Test
    public void testFQQNTopicFilterConsumerOnlyCORE() throws Exception {
        testFQQNTopicFilterConsumerOnly("CORE");
    }

    /**
     * Pre-creates the MULTICAST address {@code address} and queue {@code queue}
     * on the server (the queue has no filter), then attaches a consumer with the
     * selector {@code prop='match'} through the fully qualified name.
     *
     * <p>Asserts that the queue itself still has no filter, that its single
     * server-side consumer carries the selector, that a matching message is
     * delivered and that a non-matching message is not.
     *
     * @param protocol protocol name passed to {@code CFUtil.createConnectionFactory}
     * @throws Exception on any JMS or broker failure
     */
    private void testFQQNTopicFilterConsumerOnly(String protocol) throws Exception {
        final String address = "address";
        final String queue = "queue";
        final String filter = "prop='match'";
        ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672");

        server.addAddressInfo(new AddressInfo(address).addRoutingType(RoutingType.MULTICAST));
        server.createQueue(new QueueConfiguration().setAddress(address).setName(queue).setRoutingType(RoutingType.MULTICAST));

        // try-with-resources closes the connection exactly once, on success or failure.
        try (Connection connection = factory.createConnection()) {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic fqqnTopic = session.createTopic(CompositeAddress.toFullyQualified(address, queue));
            MessageConsumer messageConsumer = session.createConsumer(fqqnTopic, filter);

            Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000L, 100L);
            Queue serverQueue = server.locateQueue(SimpleString.toSimpleString(queue));
            Assert.assertEquals(RoutingType.MULTICAST, serverQueue.getRoutingType());
            // The queue was created without a filter, so the selector must live on the consumer.
            Assert.assertNull(serverQueue.getFilter());
            Wait.assertEquals(1, () -> {
                return serverQueue.getConsumers().size();
            });
            serverQueue.getConsumers().forEach(serverConsumer -> {
                Assert.assertNotNull(serverConsumer.getFilter());
                Assert.assertEquals(filter, serverConsumer.getFilter().getFilterString().toString());
            });

            MessageProducer producer = session.createProducer(session.createTopic(address));

            TextMessage matching = session.createTextMessage("hello");
            matching.setStringProperty("prop", "match");
            producer.send(matching);
            Assert.assertNotNull(messageConsumer.receive(5000L));

            TextMessage nonMatching = session.createTextMessage("hello");
            nonMatching.setStringProperty("prop", "nomatch");
            producer.send(nonMatching);
            if (protocol.equals("OPENWIRE")) {
                Assert.assertNull(messageConsumer.receive(100L));
            } else {
                Assert.assertNull(messageConsumer.receiveNoWait());
            }
        }
    }

    /**
     * Runs the "FQQN does not exist" scenario over AMQP; the helper asserts that
     * an exception is raised.
     */
    @Test
    public void testFQQNTopicConsumerDontExistAMQP() throws Exception {
        testFQQNTopicConsumerDontExist("AMQP");
    }

    /**
     * Runs the "FQQN does not exist" scenario over the CORE protocol; the helper
     * asserts that an exception is raised.
     */
    @Test
    public void testFQQNTopicConsumerDontExistCORE() throws Exception {
        testFQQNTopicConsumerDontExist("CORE");
    }

    /**
     * Replaces the server's address settings with a single {@code "#"} match
     * that disables auto-creation of addresses and queues, then tries to create
     * a consumer on the fully qualified name {@code address::queue}.
     *
     * <p>The test passes only if an exception is raised while connecting or
     * creating the consumer. The connection is always closed, including on the
     * expected failure path.
     *
     * @param protocol protocol name passed to {@code CFUtil.createConnectionFactory}
     * @throws Exception on unexpected broker failures while changing the settings
     */
    private void testFQQNTopicConsumerDontExist(String protocol) throws Exception {
        AddressSettings noAutoCreate = new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false);
        server.getAddressSettingsRepository().clear();
        server.getAddressSettingsRepository().addMatch("#", noAutoCreate);

        boolean thrownException = false;
        // The factory is created inside the resource specification so that, as before,
        // a failure at any step (factory, connection, session, consumer) is caught below.
        try (Connection connection = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672").createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic fqqnTopic = session.createTopic(CompositeAddress.toFullyQualified("address", "queue"));
            session.createConsumer(fqqnTopic);
        } catch (Exception e) {
            logger.debug(e.getMessage(), e);
            thrownException = true;
        }
        Assert.assertTrue(thrownException);
    }

    /**
     * Runs the FQQN queue consumer-with-selector scenario over AMQP.
     */
    @Test
    public void testFQQNQueueConsumerWithSelectorAMQP() throws Exception {
        testFQQNQueueConsumerWithSelector("AMQP");
    }

    /**
     * Runs the FQQN queue consumer-with-selector scenario over OpenWire.
     */
    @Test
    public void testFQQNQueueConsumerWithSelectorOpenWire() throws Exception {
        testFQQNQueueConsumerWithSelector("OPENWIRE");
    }

    /**
     * Runs the FQQN queue consumer-with-selector scenario over the CORE protocol.
     */
    @Test
    public void testFQQNQueueConsumerWithSelectorCore() throws Exception {
        testFQQNQueueConsumerWithSelector("CORE");
    }

    /**
     * Sets ANYCAST as the default address and queue routing type, then consumes
     * from the fully qualified name {@code address::myQueue} as a JMS queue with
     * the selector {@code prop='match'}. For OpenWire the destination name gets
     * the suffix {@code ?selectorAware=true}.
     *
     * <p>Asserts that the server-side queue is ANYCAST and has no filter of its
     * own, that only the matching message is delivered, and that the single
     * server-side consumer carries the selector. After that consumer is closed,
     * a new consumer with a different selector is created on the same queue and
     * must show up with the new selector.
     *
     * @param protocol protocol name passed to {@code CFUtil.createConnectionFactory}
     * @throws Exception on any JMS or broker failure
     */
    private void testFQQNQueueConsumerWithSelector(String protocol) throws Exception {
        final String queueName = "myQueue";
        final String filter = "prop='match'";
        final String replacementFilter = "notHappening=true";

        server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultQueueRoutingType(RoutingType.ANYCAST).setDefaultAddressRoutingType(RoutingType.ANYCAST));
        ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672");

        // try-with-resources closes the connection exactly once, on success or failure.
        try (Connection connection = factory.createConnection()) {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            String destinationName = CompositeAddress.toFullyQualified("address", queueName) + (protocol.equals("OPENWIRE") ? "?selectorAware=true" : "");
            jakarta.jms.Queue jmsQueue = session.createQueue(destinationName);
            MessageConsumer messageConsumer = session.createConsumer(jmsQueue, filter);

            Wait.assertTrue(() -> server.locateQueue(queueName) != null, 2000L, 100L);
            Queue serverQueue = server.locateQueue(SimpleString.toSimpleString(queueName));
            Assert.assertEquals(RoutingType.ANYCAST, serverQueue.getRoutingType());
            // For an ANYCAST queue the selector stays on the consumer, not on the queue.
            Assert.assertNull(serverQueue.getFilter());

            MessageProducer producer = session.createProducer(jmsQueue);

            Message matching = session.createMessage();
            matching.setStringProperty("prop", "match");
            producer.send(matching);
            Assert.assertNotNull(messageConsumer.receive(1000L));

            Message nonMatching = session.createMessage();
            nonMatching.setStringProperty("prop", "no-match");
            producer.send(nonMatching);
            if (protocol.equals("OPENWIRE")) {
                Assert.assertNull(messageConsumer.receive(100L));
            } else {
                Assert.assertNull(messageConsumer.receiveNoWait());
            }

            Wait.assertEquals(1, () -> {
                return serverQueue.getConsumers().size();
            });
            serverQueue.getConsumers().forEach(serverConsumer -> {
                Assert.assertNotNull(serverConsumer.getFilter());
                Assert.assertEquals(filter, serverConsumer.getFilter().getFilterString().toString());
            });

            messageConsumer.close();
            Wait.assertEquals(0, () -> {
                return serverQueue.getConsumers().size();
            });

            // A new consumer on the same queue may use a different selector.
            session.createConsumer(jmsQueue, replacementFilter);
            Wait.assertEquals(1, () -> {
                return serverQueue.getConsumers().size();
            });
            serverQueue.getConsumers().forEach(serverConsumer -> {
                Assert.assertNotNull(serverConsumer.getFilter());
                Assert.assertEquals(replacementFilter, serverConsumer.getFilter().getFilterString().toString());
            });
        }
    }

    /**
     * Runs the multi-consumer FQQN topic selector scenario over AMQP.
     */
    @Test
    public void testFQQNTopicMultiConsumerWithSelectorAMQP() throws Exception {
        testFQQNTopicMultiConsumerWithSelector("AMQP", true);
    }

    /**
     * Runs the multi-consumer FQQN topic selector scenario over OpenWire.
     */
    @Test
    public void testFQQNTopicMultiConsumerWithSelectorOpenWire() throws Exception {
        testFQQNTopicMultiConsumerWithSelector("OPENWIRE", false);
    }

    /**
     * Runs the multi-consumer FQQN topic selector scenario over the CORE protocol.
     */
    @Test
    public void testFQQNTopicMultiConsumerWithSelectorCORE() throws Exception {
        testFQQNTopicMultiConsumerWithSelector("CORE", true);
    }

    /**
     * Creates ten consumers on {@code address::queue0} .. {@code address::queue9},
     * where consumer {@code i} uses the selector {@code "i < " + (i * 10)} and
     * expects {@code i * 10} messages. One hundred text messages with the int
     * property {@code "i"} set to 0..99 are then sent to {@code address}.
     *
     * <p>The consumers run on a fixed thread pool. The test waits up to ten
     * seconds on the shared latch and then requires every consumer to report
     * zero errors.
     *
     * @param protocol             protocol name passed to {@code CFUtil.createConnectionFactory}
     *                             and to each consumer task
     * @param validateFilterChange not read by this method; kept so existing callers compile
     * @throws Exception on any JMS failure or if the latch wait is interrupted
     */
    private void testFQQNTopicMultiConsumerWithSelector(String protocol, boolean validateFilterChange) throws Exception {
        final int numberOfQueues = 10;
        ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672");

        ExecutorService executor = Executors.newFixedThreadPool(numberOfQueues);
        Objects.requireNonNull(executor);
        // Registered with the test support so the pool is shut down after the test.
        runAfter(executor::shutdownNow);

        // try-with-resources closes the connection exactly once, on success or failure.
        try (Connection connection = factory.createConnection()) {
            connection.start();

            CountDownLatch done = new CountDownLatch(numberOfQueues);
            C1RunnableConsumer[] consumers = new C1RunnableConsumer[numberOfQueues];
            for (int i = 0; i < numberOfQueues; i++) {
                consumers[i] = new C1RunnableConsumer(connection, "address::queue" + i, i * 10, "i < " + (i * 10), done, protocol);
            }

            // All consumers exist before anything is sent.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createTopic("address"));
            for (int i = 0; i < numberOfQueues * 10; i++) {
                TextMessage message = session.createTextMessage("i=" + i);
                message.setIntProperty("i", i);
                producer.send(message);
            }

            for (C1RunnableConsumer consumer : consumers) {
                executor.execute(consumer);
            }

            Assert.assertTrue(done.await(10L, TimeUnit.SECONDS));

            for (C1RunnableConsumer consumer : consumers) {
                Assert.assertEquals("Error on consumer for queue " + consumer.queueName, 0L, consumer.errors);
            }
        }
    }
}
