package org.apache.activemq.artemis.tests.integration.amqp.interop;

import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.tests.integration.amqp.JMSClientTestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsTopic;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Interoperability tests between AMQP (Qpid JMS) clients and Artemis Core clients
 * running against the same broker.
 */
public class AmqpCoreTest extends JMSClientTestSupport {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** URI used by the Qpid JMS client to reach the broker's AMQP acceptor. */
    private static final String AMQP_URI = "amqp://127.0.0.1:5672";

    /**
     * Core message handler that counts the messages delivered to it and records
     * whether any of them had an empty body.
     */
    private static class CoreMessageHandler implements MessageHandler {
        /** Identifier used only to tell the receivers apart in the log output. */
        private final int id;
        /** Number of messages delivered to this handler so far. */
        private final AtomicInteger numMsg = new AtomicInteger();
        /** Set once a message with a body size of zero has been observed. */
        private final AtomicBoolean zeroLen = new AtomicBoolean();

        CoreMessageHandler(int i) {
            this.id = i;
        }

        /**
         * Counts the delivered message and flags it when its body size is zero.
         *
         * @param clientMessage the message delivered by the core consumer
         */
        @Override
        public void onMessage(ClientMessage clientMessage) {
            final int bodySize = clientMessage.getBodySize();
            logger.debug("received: {}", bodySize);
            if (bodySize == 0) {
                logger.debug("xxx found zero len message!");
                zeroLen.set(true);
            }
            logger.debug("[receiver {}] recieved: {}", id, numMsg.incrementAndGet());
        }

        /**
         * Waits (up to 30 seconds) for the delivery count to equal {@code i} and then
         * asserts that no zero-length message was seen.
         *
         * @param i the expected number of delivered messages
         * @throws Exception if the wait utility fails
         */
        public void assertMessagesReceived(int i) throws Exception {
            Wait.assertEquals(i, numMsg::get, 30000L);
            Assert.assertFalse(zeroLen.get());
        }
    }

    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport
    protected String getConfiguredProtocols() {
        return "AMQP,OPENWIRE,CORE";
    }

    @Override // org.apache.activemq.artemis.tests.integration.amqp.JMSClientTestSupport, org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport
    public void setUp() throws Exception {
        super.setUp();
    }

    /**
     * Binds three core queues to one address, attaches a {@link CoreMessageHandler} to
     * each, sends 100 AMQP text messages to the address and asserts that every handler
     * receives all 100 messages with a non-empty body.
     */
    @Test(timeout = 60000)
    public void testMultipleCoreReceiving() throws Exception {
        final String address = "exampleQueueAddress";
        final int total = 100;

        // try-with-resources closes the connection exactly once, on success and on failure.
        try (Connection coreConnection = createCoreConnection()) {
            // NOTE(review): jakarta.jms.Session itself declares no getCoreSession(); this call
            // presumably relies on a cast to the Artemis core JMS session type — confirm.
            ClientSession coreSession = coreConnection.createSession(false, Session.AUTO_ACKNOWLEDGE).getCoreSession();
            coreSession.createQueue(new QueueConfiguration("exampleQueue1").setAddress(address));
            coreSession.createQueue(new QueueConfiguration("exampleQueue2").setAddress(address));
            coreSession.createQueue(new QueueConfiguration("exampleQueue3").setAddress(address));

            CoreMessageHandler handler1 = new CoreMessageHandler(1);
            ClientConsumer consumer1 = coreSession.createConsumer("exampleQueue1");
            consumer1.setMessageHandler(handler1);

            CoreMessageHandler handler2 = new CoreMessageHandler(2);
            ClientConsumer consumer2 = coreSession.createConsumer("exampleQueue2");
            consumer2.setMessageHandler(handler2);

            CoreMessageHandler handler3 = new CoreMessageHandler(3);
            ClientConsumer consumer3 = coreSession.createConsumer("exampleQueue3");
            consumer3.setMessageHandler(handler3);

            sendAmqpMessages(address, total);

            handler1.assertMessagesReceived(total);
            handler2.assertMessagesReceived(total);
            handler3.assertMessagesReceived(total);
        }
    }

    /**
     * Sends two core messages of type 3 to a queue that has an AMQP consumer attached.
     * The first carries the raw bytes of a random string as its body and is expected to
     * show up on the dead letter address (presumably because it cannot be converted for
     * the AMQP consumer — confirm). The second has its body written with
     * {@code writeNullableSimpleString} and is expected to reach the AMQP consumer.
     */
    @Test(timeout = 60000)
    public void testAmqpFailedConversionFromCore() throws Exception {
        final SimpleString text = RandomUtil.randomSimpleString();

        // Both connections are managed by try-with-resources so that neither leaks when
        // creating or closing the other one fails. They are closed in reverse order.
        try (Connection coreConnection = createCoreConnection();
             Connection amqpConnection = new JmsConnectionFactory(AMQP_URI).createConnection()) {
            amqpConnection.start();
            Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer amqpConsumer = amqpSession.createConsumer(amqpSession.createQueue(getQueueName()));

            // NOTE(review): see testMultipleCoreReceiving regarding getCoreSession().
            ClientSession coreSession = coreConnection.createSession(false, Session.AUTO_ACKNOWLEDGE).getCoreSession();
            ClientProducer coreProducer = coreSession.createProducer(getQueueName());

            // Type 3 is presumably the core "text" message type — TODO confirm against
            // the core Message constants.
            ClientMessage rawBytesMessage = coreSession.createMessage(true);
            rawBytesMessage.setType((byte) 3);
            rawBytesMessage.getBodyBuffer().writeBytes(text.getData());
            coreProducer.send(rawBytesMessage);

            // The first message must end up on the dead letter address within 2 seconds.
            Wait.assertEquals(1L, () -> this.server.locateQueue(getDeadLetterAddress()).getMessageCount(), 2000L, 100L);

            ClientMessage encodedMessage = coreSession.createMessage(true);
            encodedMessage.setType((byte) 3);
            encodedMessage.getBodyBuffer().writeNullableSimpleString(text);
            coreProducer.send(encodedMessage);

            assertNotNull(amqpConsumer.receive(500L));
        }
    }

    /**
     * Opens an AMQP connection, sends {@code i} text messages with the body
     * {@code "hello"} to the topic named {@code str}, and closes the connection.
     *
     * @param str the name of the destination topic
     * @param i   the number of messages to send
     * @throws Exception if connecting or sending fails
     */
    private void sendAmqpMessages(String str, int i) throws Exception {
        try (Connection amqpConnection = new JmsConnectionFactory(AMQP_URI).createConnection()) {
            amqpConnection.start();
            Session session = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(new JmsTopic(str));
            for (int sent = 0; sent < i; sent++) {
                producer.send(session.createTextMessage("hello"));
            }
        }
    }
}
