package org.apache.activemq.artemis.tests.integration.amqp;

import java.util.Map;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/ProtonPubSubTest.class */
public class ProtonPubSubTest extends ProtonTestBase {
    private final String prefix = "foo.bar.";
    private final String pubAddress = "pubAddress";
    private final String prefixedPubAddress = "foo.bar.pubAddress";
    private final SimpleString ssPubAddress = new SimpleString("pubAddress");
    private final SimpleString ssprefixedPubAddress = new SimpleString("foo.bar.pubAddress");
    private Connection connection;
    private JmsConnectionFactory factory;

    @Override // org.apache.activemq.artemis.tests.integration.amqp.ProtonTestBase
    protected void configureAmqp(Map<String, Object> map) {
        map.put("pubSubPrefix", "foo.bar.");
    }

    @Override // org.apache.activemq.artemis.tests.integration.amqp.ProtonTestBase
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.server.createQueue(this.ssPubAddress, this.ssPubAddress, new SimpleString("foo=bar"), false, true);
        this.server.createQueue(this.ssprefixedPubAddress, this.ssprefixedPubAddress, new SimpleString("foo=bar"), false, true);
        this.factory = new JmsConnectionFactory("amqp://localhost:5672");
        this.factory.setClientID("myClientID");
        this.connection = this.factory.createConnection();
        this.connection.setExceptionListener(new ExceptionListener() { // from class: org.apache.activemq.artemis.tests.integration.amqp.ProtonPubSubTest.1
            public void onException(JMSException jMSException) {
                jMSException.printStackTrace();
            }
        });
    }

    @Override // org.apache.activemq.artemis.tests.integration.amqp.ProtonTestBase
    @After
    public void tearDown() throws Exception {
        try {
            Thread.sleep(250L);
            if (this.connection != null) {
                this.connection.close();
            }
        } finally {
            super.tearDown();
        }
    }

    @Test
    public void testNonDurablePubSub() throws Exception {
        Topic createTopic = createTopic("pubAddress");
        TopicSubscriber createSubscriber = this.connection.createTopicSession(false, 1).createSubscriber(createTopic);
        Session createSession = this.connection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createTopic);
        this.connection.start();
        for (int i = 0; i < 100; i++) {
            createProducer.send(createSession.createTextMessage("message:" + i));
        }
        for (int i2 = 0; i2 < 100; i2++) {
            TextMessage receive = createSubscriber.receive(5000L);
            Assert.assertNotNull(receive);
            Assert.assertEquals(receive.getText(), "message:" + i2);
        }
    }

    @Test
    public void testNonDurableMultiplePubSub() throws Exception {
        Topic createTopic = createTopic("pubAddress");
        TopicSession createTopicSession = this.connection.createTopicSession(false, 1);
        TopicSubscriber createSubscriber = createTopicSession.createSubscriber(createTopic);
        TopicSubscriber createSubscriber2 = createTopicSession.createSubscriber(createTopic);
        TopicSubscriber createSubscriber3 = createTopicSession.createSubscriber(createTopic);
        Session createSession = this.connection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createTopic);
        this.connection.start();
        for (int i = 0; i < 100; i++) {
            createProducer.send(createSession.createTextMessage("message:" + i));
        }
        for (int i2 = 0; i2 < 100; i2++) {
            TextMessage receive = createSubscriber.receive(5000L);
            Assert.assertNotNull(receive);
            Assert.assertEquals(receive.getText(), "message:" + i2);
            TextMessage receive2 = createSubscriber2.receive(5000L);
            Assert.assertNotNull(receive2);
            Assert.assertEquals(receive2.getText(), "message:" + i2);
            TextMessage receive3 = createSubscriber3.receive(5000L);
            Assert.assertNotNull(receive3);
            Assert.assertEquals(receive3.getText(), "message:" + i2);
        }
    }

    @Test
    public void testDurablePubSub() throws Exception {
        Topic createTopic = createTopic("pubAddress");
        TopicSubscriber createDurableSubscriber = this.connection.createSession(false, 1).createDurableSubscriber(createTopic, "myPubId");
        Session createSession = this.connection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createTopic);
        this.connection.start();
        for (int i = 0; i < 100; i++) {
            createProducer.send(createSession.createTextMessage("message:" + i));
        }
        for (int i2 = 0; i2 < 100; i2++) {
            TextMessage receive = createDurableSubscriber.receive(5000L);
            Assert.assertNotNull(receive);
            Assert.assertEquals(receive.getText(), "message:" + i2);
        }
    }

    @Test
    public void testDurableMultiplePubSub() throws Exception {
        Topic createTopic = createTopic("pubAddress");
        Session createSession = this.connection.createSession(false, 1);
        TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(createTopic, "myPubId");
        TopicSubscriber createDurableSubscriber2 = createSession.createDurableSubscriber(createTopic, "myPubId2");
        TopicSubscriber createDurableSubscriber3 = createSession.createDurableSubscriber(createTopic, "myPubId3");
        Session createSession2 = this.connection.createSession(false, 1);
        MessageProducer createProducer = createSession2.createProducer(createTopic);
        this.connection.start();
        for (int i = 0; i < 100; i++) {
            createProducer.send(createSession2.createTextMessage("message:" + i));
        }
        for (int i2 = 0; i2 < 100; i2++) {
            TextMessage receive = createDurableSubscriber.receive(5000L);
            Assert.assertNotNull(receive);
            Assert.assertEquals(receive.getText(), "message:" + i2);
            TextMessage receive2 = createDurableSubscriber2.receive(5000L);
            Assert.assertNotNull(receive2);
            Assert.assertEquals(receive2.getText(), "message:" + i2);
            TextMessage receive3 = createDurableSubscriber3.receive(5000L);
            Assert.assertNotNull(receive3);
            Assert.assertEquals(receive3.getText(), "message:" + i2);
        }
    }

    @Test
    public void testDurablePubSubReconnect() throws Exception {
        Topic createTopic = createTopic("pubAddress");
        TopicSubscriber createDurableSubscriber = this.connection.createSession(false, 1).createDurableSubscriber(createTopic, "myPubId");
        Session createSession = this.connection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(createTopic);
        this.connection.start();
        for (int i = 0; i < 100; i++) {
            createProducer.send(createSession.createTextMessage("message:" + i));
        }
        for (int i2 = 0; i2 < 100; i2++) {
            TextMessage receive = createDurableSubscriber.receive(5000L);
            Assert.assertNotNull(receive);
            Assert.assertEquals(receive.getText(), "message:" + i2);
        }
        this.connection.close();
        this.connection = this.factory.createConnection();
        this.connection.setExceptionListener(new ExceptionListener() { // from class: org.apache.activemq.artemis.tests.integration.amqp.ProtonPubSubTest.2
            public void onException(JMSException jMSException) {
                jMSException.printStackTrace();
            }
        });
        TopicSubscriber createDurableSubscriber2 = this.connection.createSession(false, 1).createDurableSubscriber(createTopic, "myPubId");
        Session createSession2 = this.connection.createSession(false, 1);
        MessageProducer createProducer2 = createSession2.createProducer(createTopic);
        this.connection.start();
        for (int i3 = 0; i3 < 100; i3++) {
            createProducer2.send(createSession2.createTextMessage("message:" + i3));
        }
        for (int i4 = 0; i4 < 100; i4++) {
            TextMessage receive2 = createDurableSubscriber2.receive(5000L);
            Assert.assertNotNull(receive2);
            Assert.assertEquals(receive2.getText(), "message:" + i4);
        }
    }

    @Test
    public void testDurablePubSubUnsubscribe() throws Exception {
        Topic createTopic = createTopic("pubAddress");
        Session createSession = this.connection.createSession(false, 1);
        TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(createTopic, "myPubId");
        Session createSession2 = this.connection.createSession(false, 1);
        MessageProducer createProducer = createSession2.createProducer(createTopic);
        this.connection.start();
        for (int i = 0; i < 100; i++) {
            createProducer.send(createSession2.createTextMessage("message:" + i));
        }
        for (int i2 = 0; i2 < 100; i2++) {
            TextMessage receive = createDurableSubscriber.receive(5000L);
            Assert.assertNotNull(receive);
            Assert.assertEquals(receive.getText(), "message:" + i2);
        }
        createDurableSubscriber.close();
        createSession.unsubscribe("myPubId");
    }

    private Topic createTopic(String str) throws Exception {
        Session createSession = this.connection.createSession(false, 1);
        try {
            Topic createTopic = createSession.createTopic(str);
            createSession.close();
            return createTopic;
        } catch (Throwable th) {
            createSession.close();
            throw th;
        }
    }
}
