package org.apache.activemq.artemis.tests.integration.mqtt;

import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.nio.charset.StandardCharsets;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.util.ByteSequence;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/mqtt/MQTTOpenwireTest.class */
/**
 * Verifies that messages flow in both directions between an MQTT client and an
 * OpenWire JMS client when one side subscribes with a wildcard filter.
 *
 * <p>The JMS side connects through {@link ActiveMQConnectionFactory}; the MQTT side
 * uses the {@code MQTTClientProvider} supplied by {@code MQTTTestSupport}.
 */
public class MQTTOpenwireTest extends MQTTTestSupport {
    /** Number of follow-up messages exchanged per scenario after the first (retained) one. */
    protected static final int NUM_MESSAGES = 1;

    /** Payload of the first message sent in every scenario. */
    private static final String RETAINED_PAYLOAD = "RETAINED";

    /**
     * Applies the parent broker configuration, then installs a wildcard configuration that
     * uses {@code '.'} as delimiter, {@code '*'} as the single-word wildcard and {@code '>'}
     * as the any-words wildcard. This is the syntax of the JMS-side filters used in
     * {@link #testWildcards()} (for example {@code foo.*} and {@code foo.>}).
     *
     * @throws Exception if the parent configuration step fails
     */
    @Override
    public void configureBroker() throws Exception {
        super.configureBroker();
        WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
        wildcardConfiguration.setDelimiter('.');
        wildcardConfiguration.setSingleWord('*');
        wildcardConfiguration.setAnyWords('>');
        this.server.getConfiguration().setWildCardConfiguration(wildcardConfiguration);
    }

    /**
     * Replaces the inherited connection factory with an OpenWire
     * {@link ActiveMQConnectionFactory} pointing at {@code tcp://localhost:61616}
     * with wire-format caching enabled.
     *
     * @throws Exception declared by the overridden method
     */
    @Override
    public void createJMSConnection() throws Exception {
        this.cf = new ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.cacheEnabled=true");
    }

    /**
     * Runs every wildcard scenario: four with a JMS publisher and an MQTT wildcard
     * subscriber, followed by four with an MQTT publisher and a JMS wildcard consumer.
     *
     * @throws Exception if any scenario fails
     */
    @Test
    public void testWildcards() throws Exception {
        doTestSendJMSReceiveMQTT("foo.bar", "foo/+");
        doTestSendJMSReceiveMQTT("foo.bar", "foo/#");
        doTestSendJMSReceiveMQTT("foo.bar.har", "foo/#");
        doTestSendJMSReceiveMQTT("foo.bar.har", "foo/+/+");
        doTestSendMQTTReceiveJMS("foo/bah", "foo.*");
        doTestSendMQTTReceiveJMS("foo/bah", "foo.>");
        doTestSendMQTTReceiveJMS("foo/bah/hah", "foo.*.*");
        doTestSendMQTTReceiveJMS("foo/bah/har", "foo.>");
    }

    /**
     * Publishes over MQTT and asserts that a JMS topic consumer receives every payload.
     *
     * @param str  MQTT topic the messages are published to
     * @param str2 JMS topic name (possibly a wildcard filter) the consumer is created on
     * @throws Exception if connecting, publishing, receiving or cleanup fails
     */
    public void doTestSendMQTTReceiveJMS(String str, String str2) throws Exception {
        MQTTClientProvider provider = getMQTTClientProvider();
        initializeConnection(provider);
        try {
            // The factory returns the JMS interface type; the OpenWire-specific
            // retroactive-consumer switch needs the concrete connection class.
            ActiveMQConnection connection = (ActiveMQConnection) this.cf.createConnection();
            try {
                // NOTE(review): presumably required so the consumer can see retained
                // messages -- confirm against the OpenWire client documentation.
                connection.setUseRetroactiveConsumer(true);
                connection.start();
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageConsumer consumer = session.createConsumer(session.createTopic(str2));

                // NOTE(review): arguments 3 and 4 look like QoS 1 and the retain flag -- confirm
                // against MQTTClientProvider.
                provider.publish(str, RETAINED_PAYLOAD.getBytes(StandardCharsets.UTF_8), 1, true);
                ActiveMQMessage retained = (ActiveMQMessage) consumer.receive(2000L);
                assertNotNull("Should get retained message " + str + "->" + str2, retained);
                assertEquals(RETAINED_PAYLOAD, contentAsString(retained));

                for (int i = 0; i < NUM_MESSAGES; i++) {
                    String payload = "Test Message: " + i;
                    provider.publish(str, payload.getBytes(StandardCharsets.UTF_8), 1);
                    ActiveMQMessage message = (ActiveMQMessage) consumer.receive(1000L);
                    assertNotNull("Should get a message " + str + "->" + str2, message);
                    assertEquals(payload, contentAsString(message));
                }
            } finally {
                connection.close();
            }
        } finally {
            // Runs even when creating or closing the JMS connection throws.
            provider.disconnect();
        }
    }

    /**
     * Sends JMS text messages and asserts that an MQTT subscriber receives every payload.
     *
     * @param str  JMS topic name the producer sends to
     * @param str2 MQTT topic filter the MQTT client subscribes to
     * @throws Exception if connecting, sending, receiving or cleanup fails
     */
    public void doTestSendJMSReceiveMQTT(String str, String str2) throws Exception {
        MQTTClientProvider provider = getMQTTClientProvider();
        initializeConnection(provider);
        try {
            ActiveMQConnection connection = (ActiveMQConnection) this.cf.createConnection();
            try {
                connection.setUseRetroactiveConsumer(true);
                connection.start();
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageProducer producer = session.createProducer(session.createTopic(str));
                provider.subscribe(str2, 0);

                // First message carries the retain and QoS properties explicitly.
                TextMessage retained = session.createTextMessage(RETAINED_PAYLOAD);
                retained.setBooleanProperty("ActiveMQ.Retain", true);
                retained.setIntProperty("ActiveMQ.MQTT.QoS", 0);
                producer.send(retained);
                byte[] retainedBytes = provider.receive(2000);
                assertNotNull("Should get retained message " + str + "->" + str2, retainedBytes);
                assertEquals(RETAINED_PAYLOAD, new String(retainedBytes, StandardCharsets.UTF_8));

                for (int i = 0; i < NUM_MESSAGES; i++) {
                    String payload = "This is Test Message: " + i;
                    producer.send(session.createTextMessage(payload));
                    byte[] received = provider.receive(1000);
                    assertNotNull("Should get a message " + str + "->" + str2, received);
                    assertEquals(payload, new String(received, StandardCharsets.UTF_8));
                }
            } finally {
                connection.close();
            }
        } finally {
            // Runs even when creating or closing the JMS connection throws.
            provider.disconnect();
        }
    }

    /** Decodes the raw content bytes of an OpenWire message as a UTF-8 string. */
    private static String contentAsString(ActiveMQMessage message) {
        ByteSequence content = message.getContent();
        return new String(content.data, content.offset, content.length, StandardCharsets.UTF_8);
    }
}
