package org.apache.activemq.artemis.tests.integration.amqp;

import jakarta.jms.Connection;
import jakarta.jms.DeliveryMode;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.ObjectMessage;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.class */
public class JMSDurableConsumerTest extends JMSClientTestSupport {

    /**
     * Test parameter populated by the {@link Parameterized} runner from index 0 of each array
     * returned by {@link #parameters()}. {@link #addConfiguration(ActiveMQServer)} copies the
     * value into the server configuration before the tests run against it.
     */
    @Parameterized.Parameter(0)
    public boolean amqpUseCoreSubscriptionNaming;

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest$BigObject.class */
    /**
     * Serializable payload of caller-chosen size; every character of the backing array is
     * {@code 'X'}. {@code testDurableConsumerLarge} sends an instance as an object message body.
     */
    public static class BigObject implements Serializable {
        private char[] contents;

        /**
         * Allocates the payload and fills it with {@code 'X'}.
         *
         * @param i number of characters in the payload
         * @throws NegativeArraySizeException if {@code i} is negative
         */
        public BigObject(int i) {
            char[] filler = new char[i];
            Arrays.fill(filler, 'X');
            this.contents = filler;
        }
    }

    /**
     * Supplies the parameter sets for the parameterized runner: one run with
     * {@code amqpUseCoreSubscriptionNaming} set to {@code true} and one with {@code false}.
     *
     * @return a fixed-size list holding the two single-element parameter arrays
     */
    @Parameterized.Parameters(name = "{index}: amqpUseCoreSubscriptionNaming={0}")
    public static Collection<Object[]> parameters() {
        Object[] enabled = {true};
        Object[] disabled = {false};
        return Arrays.asList(enabled, disabled);
    }

    /**
     * Applies this run's {@code amqpUseCoreSubscriptionNaming} parameter to the configuration
     * of the given server.
     * <p>
     * NOTE(review): the decompiler reported that this method's access modifier was changed
     * from {@code protected}; confirm the intended visibility against the superclass.
     *
     * @param activeMQServer server whose configuration is updated
     */
    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport
    public void addConfiguration(ActiveMQServer activeMQServer) {
        final boolean useCoreNaming = this.amqpUseCoreSubscriptionNaming;
        activeMQServer.getConfiguration().setAmqpUseCoreSubscriptionNaming(useCoreNaming);
    }

    /**
     * Verifies that a durable topic subscriber with an asynchronous {@link MessageListener}
     * receives a persistent {@link TextMessage} published to its topic.
     *
     * @throws Exception if the JMS setup fails or the wait for delivery is interrupted
     */
    @Test(timeout = 30000)
    public void testDurableConsumerAsync() throws Exception {
        final CountDownLatch delivered = new CountDownLatch(1);
        final AtomicReference<Message> received = new AtomicReference<>();

        Connection connection = createConnection(getTopicName() + "-ClientId");
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(getTopicName());

            // Record the delivered message first, then release the waiting test thread.
            MessageConsumer subscriber = session.createDurableSubscriber(topic, "DurbaleTopic");
            subscriber.setMessageListener(message -> {
                received.set(message);
                delivered.countDown();
            });

            MessageProducer producer = session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            connection.start();

            TextMessage outgoing = session.createTextMessage();
            outgoing.setText("hello");
            producer.send(outgoing);

            assertTrue(delivered.await(10L, TimeUnit.SECONDS));
            assertNotNull("Should have received a message by now.", received.get());
            assertTrue("Should be an instance of TextMessage", received.get() instanceof TextMessage);
        } finally {
            // Single cleanup path: the connection is closed exactly once, pass or fail.
            connection.close();
        }
    }

    /**
     * Verifies that a durable topic subscriber can synchronously poll a persistent
     * {@link TextMessage} published to its topic.
     * <p>
     * The subscriber is polled with {@code receiveNoWait()} every 200 ms for up to 25 seconds.
     *
     * @throws Exception if the JMS setup fails or polling throws
     */
    @Test(timeout = 30000)
    public void testDurableConsumerSync() throws Exception {
        Connection connection = createConnection(getTopicName() + "-ClientId");
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(getTopicName());
            final TopicSubscriber subscriber = session.createDurableSubscriber(topic, "DurbaleTopic");

            MessageProducer producer = session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            connection.start();

            TextMessage outgoing = session.createTextMessage();
            outgoing.setText("hello");
            producer.send(outgoing);

            final AtomicReference<Message> received = new AtomicReference<>();
            // Fully qualified because this file imports two classes named Wait; this is the
            // same helper the other tests in this class call.
            assertTrue(org.apache.activemq.artemis.tests.util.Wait.waitFor(() -> {
                Message polled = subscriber.receiveNoWait();
                received.set(polled);
                return polled != null;
            }, TimeUnit.SECONDS.toMillis(25L), TimeUnit.MILLISECONDS.toMillis(200L)));

            assertNotNull("Should have received a message by now.", received.get());
            assertTrue("Should be an instance of TextMessage", received.get() instanceof TextMessage);
        } finally {
            // Single cleanup path: the connection is closed exactly once, pass or fail.
            connection.close();
        }
    }

    /**
     * Verifies the server's total consumer count across the life cycle of a durable
     * subscription: one consumer once the subscriber is created, zero after the subscriber is
     * closed, and still zero after the subscription itself is unsubscribed.
     *
     * @throws Exception if the JMS setup fails or a wait condition throws
     */
    @Test(timeout = 30000)
    public void testDurableConsumerUnsubscribe() throws Exception {
        Connection connection = createConnection(getTopicName() + "-ClientId");
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(getTopicName());
            TopicSubscriber subscriber = session.createDurableSubscriber(topic, "DurbaleTopic");

            assertTrue(org.apache.activemq.artemis.tests.util.Wait.waitFor(
                () -> server.getTotalConsumerCount() == 1,
                TimeUnit.SECONDS.toMillis(20L), TimeUnit.MILLISECONDS.toMillis(250L)));

            subscriber.close();

            assertTrue(org.apache.activemq.artemis.tests.util.Wait.waitFor(
                () -> server.getTotalConsumerCount() == 0,
                TimeUnit.SECONDS.toMillis(20L), TimeUnit.MILLISECONDS.toMillis(250L)));

            // The subscriber is closed, so removing the durable subscription is expected to succeed.
            session.unsubscribe("DurbaleTopic");

            assertTrue(org.apache.activemq.artemis.tests.util.Wait.waitFor(
                () -> server.getTotalConsumerCount() == 0,
                TimeUnit.SECONDS.toMillis(20L), TimeUnit.MILLISECONDS.toMillis(250L)));
        } finally {
            // Single cleanup path: the connection is closed exactly once, pass or fail.
            connection.close();
        }
    }

    /**
     * Verifies that unsubscribing a durable subscription name that was never created on this
     * connection is rejected with a {@link JMSException}.
     *
     * @throws Exception if the JMS setup fails or the wait condition throws
     */
    @Test(timeout = 30000)
    public void testDurableConsumerUnsubscribeWhileNoSubscription() throws Exception {
        Connection connection = createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            assertTrue(org.apache.activemq.artemis.tests.util.Wait.waitFor(
                () -> server.getTotalConsumerCount() == 0,
                TimeUnit.SECONDS.toMillis(20L), TimeUnit.MILLISECONDS.toMillis(250L)));

            try {
                session.unsubscribe("DurbaleTopic");
                fail("Should have thrown as the subscription does not exist.");
            } catch (JMSException expected) {
                // Expected: no durable subscription with this name was created by this test.
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Checks that a durable subscription cannot be unsubscribed while its subscriber is still
     * open: the {@code unsubscribe} call must raise a {@link JMSException}.
     *
     * @throws Exception if the JMS setup fails
     */
    @Test(timeout = 30000)
    public void testDurableConsumerUnsubscribeWhileActive() throws Exception {
        Connection connection = createConnection(getTopicName() + "-ClientId");
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(getTopicName());
            TopicSubscriber activeSubscriber = session.createDurableSubscriber(topic, "DurbaleTopic");
            assertNotNull(activeSubscriber);
            // Nothing has been published, so a short receive must come back empty.
            assertNull(activeSubscriber.receive(10L));

            boolean rejected = false;
            try {
                session.unsubscribe("DurbaleTopic");
            } catch (JMSException expected) {
                rejected = true;
            }
            assertTrue("Should have thrown as subscription is in use.", rejected);
        } finally {
            connection.close();
        }
    }

    /**
     * Verifies that a large persistent {@link ObjectMessage} (a {@link BigObject} of 102400
     * characters) published to a topic is delivered to each of two durable subscribers.
     *
     * @throws Exception if the JMS setup, send or receive fails
     */
    @Test(timeout = 30000)
    public void testDurableConsumerLarge() throws Exception {
        Connection connection = createConnection(getTopicName() + "-ClientId");
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(getTopicName());
            TopicSubscriber firstSubscriber = session.createDurableSubscriber(topic, "DurbaleSub1");
            TopicSubscriber secondSubscriber = session.createDurableSubscriber(topic, "DurbaleSub2");

            MessageProducer producer = session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            connection.start();

            ObjectMessage outgoing = session.createObjectMessage();
            outgoing.setObject(new BigObject(102400));
            producer.send(outgoing);

            // Hold the results as Message so the instanceof checks below are meaningful.
            Message firstReceived = firstSubscriber.receive(5000L);
            assertNotNull("Should have received a message by now.", firstReceived);
            assertTrue("Should be an instance of ObjectMessage", firstReceived instanceof ObjectMessage);

            Message secondReceived = secondSubscriber.receive(5000L);
            assertNotNull("Should have received a message by now.", secondReceived);
            assertTrue("Should be an instance of ObjectMessage", secondReceived instanceof ObjectMessage);
        } finally {
            // Single cleanup path: the connection is closed exactly once, pass or fail.
            connection.close();
        }
    }

    /**
     * Verifies what happens when a durable consumer is re-created under the same subscription
     * name with a different selector.
     * <p>
     * The test asserts that the queue bound as {@code foo.SharedConsumer} holds one message
     * under the first selector, is empty once the consumer is re-created with the second
     * selector, and then holds and delivers a message matching the second selector. It also
     * asserts that the queue's max-consumers value is 1 both times.
     *
     * @throws Exception if the JMS setup, send or receive fails
     */
    @Test(timeout = 30000)
    public void testDurableConsumerWithSelectorChange() throws Exception {
        SimpleString bindingName = new SimpleString("foo.SharedConsumer");
        Connection connection = createConnection("foo", true);
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(getTopicName());

            MessageConsumer firstConsumer = session.createDurableConsumer(topic, "SharedConsumer", "a=b", false);
            MessageProducer producer = session.createProducer(session.createTopic(getTopicName()));

            Message matchesFirstSelector = session.createMessage();
            matchesFirstSelector.setStringProperty("a", "1");
            matchesFirstSelector.setStringProperty("b", "1");
            producer.send(matchesFirstSelector);

            QueueImpl queue = (QueueImpl) this.server.getPostOffice().getBinding(bindingName).getBindable();
            assertEquals(1L, queue.getMaxConsumers());
            org.apache.activemq.artemis.tests.util.Wait.assertEquals(1L, queue::getMessageCount);

            firstConsumer.close();

            MessageConsumer secondConsumer = session.createDurableConsumer(topic, "SharedConsumer", "a=b and b=c", false);

            // Look the binding up again: the assertions below apply to the queue that backs
            // the subscription after the selector change.
            QueueImpl queueAfterChange = (QueueImpl) this.server.getPostOffice().getBinding(bindingName).getBindable();
            assertEquals(1L, queueAfterChange.getMaxConsumers());
            org.apache.activemq.artemis.tests.util.Wait.assertEquals(0L, queueAfterChange::getMessageCount);

            Message matchesSecondSelector = session.createMessage();
            matchesSecondSelector.setStringProperty("a", "2");
            matchesSecondSelector.setStringProperty("b", "2");
            matchesSecondSelector.setStringProperty("c", "2");
            producer.send(matchesSecondSelector);

            org.apache.activemq.artemis.tests.util.Wait.assertEquals(1L, queueAfterChange::getMessageCount);

            connection.start();
            Assert.assertNotNull(secondConsumer.receive(5000L));
        } finally {
            // Single cleanup path: the connection is closed exactly once, pass or fail.
            connection.close();
        }
    }
}
