package org.apache.activemq.artemis.tests.integration.amqp.largemessages;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.net.URI;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import javax.management.openmbean.CompositeData;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsInstanceOf;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Sends 100 KiB AMQP messages through a dedicated acceptor and checks that they can be stored,
 * optionally survive a broker restart, and be consumed again with an intact payload.
 *
 * <p>The test is parameterized over the acceptor's {@code maxFrameSize} and
 * {@code amqpMinLargeMessageSize}; see {@link #testParameters()}. The last parameter set raises the
 * large-message threshold far above the payload size used here.
 */
@ExtendWith({ParameterizedTestExtension.class})
public class SimpleStreamingLargeMessageTest extends AmqpClientTestSupport {

    /** Size in bytes of every message body sent by these tests. */
    private static final int PAYLOAD_SIZE = 102400;

    /** Byte value (ASCII 'z') every AMQP-client payload is filled with. */
    private static final byte PAYLOAD_FILL = 122;

    /** Number of messages sent by the multi-message tests. */
    private static final int MESSAGE_COUNT = 10;

    /** Max frame size requested by the test client, forcing bodies to span many frames. */
    private static final int CLIENT_MAX_FRAME_SIZE = 2048;

    /** Broker URI used by the JMS based tests. */
    private static final String JMS_BROKER_URI = "amqp://localhost:5672";

    /** URI of the extra acceptor registered in {@link #addAdditionalAcceptors(ActiveMQServer)}. */
    private final String smallFrameAcceptor = "tcp://localhost:5680";

    int frameSize;
    int minLargeMessageSize;

    /**
     * Supplies the {@code (frameSize, minLargeMessageSize)} pairs each test template runs with.
     *
     * @return the parameter sets, one {@code Object[]} per run
     */
    @Parameters(name = "frameSize = {0}, minLargeMessage = {1}")
    public static Iterable<? extends Object> testParameters() {
        return Arrays.asList(
            new Object[]{512, 50000},
            new Object[]{1048576, 50000},
            new Object[]{1048576, 50000000});
    }

    /**
     * @param frameSize           value used for the acceptor's {@code maxFrameSize}
     * @param minLargeMessageSize value used for the acceptor's {@code amqpMinLargeMessageSize}
     */
    public SimpleStreamingLargeMessageTest(int frameSize, int minLargeMessageSize) {
        this.frameSize = frameSize;
        this.minLargeMessageSize = minLargeMessageSize;
    }

    /**
     * Registers the AMQP acceptor named {@code flow}, configured from the test parameters.
     *
     * @param activeMQServer the server whose configuration receives the acceptor
     * @throws Exception if the acceptor configuration cannot be added
     */
    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport
    protected void addAdditionalAcceptors(ActiveMQServer activeMQServer) throws Exception {
        String acceptorUri = this.smallFrameAcceptor
            + "?protocols=AMQP;useEpoll=false;maxFrameSize=" + this.frameSize
            + ";amqpMinLargeMessageSize=" + this.minLargeMessageSize;
        activeMQServer.getConfiguration().addAcceptorConfiguration("flow", acceptorUri);
    }

    @Timeout(60)
    @TestTemplate
    public void testSendNonPersistent() throws Exception {
        testSend(false, false);
    }

    @Timeout(60)
    @TestTemplate
    public void testSendPersistent() throws Exception {
        testSend(true, false);
    }

    @Timeout(60)
    @TestTemplate
    public void testSendPersistentRestartServer() throws Exception {
        testSend(true, true);
    }

    /**
     * Sends {@value #MESSAGE_COUNT} messages in one transaction, optionally restarts the broker,
     * then consumes all of them and checks the queue and the large-messages directory are empty.
     *
     * @param persistent    whether the messages are marked durable
     * @param restartServer whether the broker is stopped and started between send and receive
     * @throws Exception on any client, broker or assertion failure
     */
    public void testSend(boolean persistent, boolean restartServer) throws Exception {
        AmqpClient client = createAmqpClient(new URI(this.smallFrameAcceptor));
        AmqpConnection connection = openConnection(client);
        AmqpSession session = connection.createSession();
        AmqpSender sender = session.createSender(getQueueName());

        Queue queueBeforeSend = getProxyToQueue(getQueueName());
        Assertions.assertNotNull(queueBeforeSend);
        Assertions.assertEquals(0L, queueBeforeSend.getMessageCount());

        session.begin();
        for (int m = 0; m < MESSAGE_COUNT; m++) {
            AmqpMessage message = new AmqpMessage();
            message.setDurable(persistent);
            message.setBytes(newPayload());
            sender.send(message);
        }
        session.commit();

        AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(this.server);

        if (restartServer) {
            connection.close();
            this.server.stop();
            this.server.start();
            connection = openConnection(client);
            session = connection.createSession();
        }

        // Looked up again because a restart invalidates the proxy obtained before sending.
        Queue queueView = getProxyToQueue(getQueueName());
        Assertions.assertNotNull(queueView);
        Wait.assertEquals((long) MESSAGE_COUNT, queueView::getMessageCount);

        AmqpReceiver receiver = session.createReceiver(getQueueName());
        receiver.flow(MESSAGE_COUNT);
        for (int m = 0; m < MESSAGE_COUNT; m++) {
            AmqpMessage received = receiver.receive(10L, TimeUnit.SECONDS);
            Assertions.assertNotNull(received);
            assertPayload(received);
            received.accept(true);
        }

        // Grant one more credit to prove nothing else is left on the queue.
        receiver.flow(1);
        Assertions.assertNull(receiver.receiveNoWait());

        receiver.close();
        connection.close();

        Wait.assertEquals(0L, queueView::getMessageCount);
        validateNoFilesOnLargeDir();
    }

    @TestTemplate
    public void testSendWithPropertiesAndFilterPersistentRestart() throws Exception {
        testSendWithPropertiesAndFilter(true, true);
    }

    @TestTemplate
    public void testSendWithPropertiesAndFilterPersistentNoRestart() throws Exception {
        testSendWithPropertiesAndFilter(true, false);
    }

    @TestTemplate
    public void testSendWithPropertiesNonPersistent() throws Exception {
        testSendWithPropertiesAndFilter(false, false);
    }

    /**
     * Sends {@value #MESSAGE_COUNT} messages carrying application properties, then consumes only
     * those matching the selector {@code odd=true} and verifies their properties and payload.
     *
     * <p>The unmatched half of the messages is left on the queue; the final call passes {@code 5}
     * to {@code validateNoFilesOnLargeDir}, presumably the number of files expected to remain.
     *
     * @param persistent    whether the messages are marked durable
     * @param restartServer whether the broker is stopped and started between send and receive
     * @throws Exception on any client, broker or assertion failure
     */
    public void testSendWithPropertiesAndFilter(boolean persistent, boolean restartServer) throws Exception {
        AmqpClient client = createAmqpClient(new URI(this.smallFrameAcceptor));
        AmqpConnection connection = openConnection(client);
        AmqpSession session = connection.createSession();
        AmqpSender sender = session.createSender(getQueueName());

        Queue queueBeforeSend = getProxyToQueue(getQueueName());
        Assertions.assertNotNull(queueBeforeSend);
        Assertions.assertEquals(0L, queueBeforeSend.getMessageCount());

        session.begin();
        int nextOddId = 0;
        for (int m = 0; m < MESSAGE_COUNT; m++) {
            AmqpMessage message = new AmqpMessage();
            message.setDurable(persistent);

            // NOTE: the "odd" properties are set on the even indexes (0, 2, 4, ...). The property
            // names are runtime values the selector depends on, so they are kept as they are.
            boolean evenIndex = m % 2 == 0;
            message.setApplicationProperty("i", Integer.valueOf(m));
            message.setApplicationProperty("oddString", evenIndex ? "odd" : "even");
            message.setApplicationProperty("odd", Boolean.valueOf(evenIndex));
            if (evenIndex) {
                message.setApplicationProperty("oddID", Integer.valueOf(nextOddId));
                nextOddId++;
            }

            message.setBytes(newPayload());
            sender.send(message);

            // The transaction is committed part way through; the remaining sends follow it.
            if (m == 5) {
                session.commit();
            }
        }

        AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(this.server);

        if (restartServer) {
            connection.close();
            this.server.stop();
            this.server.start();
            connection = openConnection(client);
            session = connection.createSession();
        }

        Queue queueView = getProxyToQueue(getQueueName());
        Assertions.assertNotNull(queueView);
        Wait.assertEquals((long) MESSAGE_COUNT, queueView::getMessageCount);

        AmqpReceiver receiver = session.createReceiver(getQueueName(), "odd=true");
        receiver.flow(MESSAGE_COUNT);

        int expectedMatches = MESSAGE_COUNT / 2;
        for (int m = 0; m < expectedMatches; m++) {
            AmqpMessage received = receiver.receive(10L, TimeUnit.SECONDS);
            Assertions.assertNotNull(received);
            Assertions.assertTrue(((Boolean) received.getApplicationProperty("odd")).booleanValue());
            Assertions.assertEquals(m, ((Integer) received.getApplicationProperty("oddID")).intValue());
            assertPayload(received);
            received.accept(true);
        }

        // Grant one more credit to prove no further message matches the selector.
        receiver.flow(1);
        Assertions.assertNull(receiver.receiveNoWait());

        receiver.close();
        connection.close();

        validateNoFilesOnLargeDir(getLargeMessagesDir(), MESSAGE_COUNT - expectedMatches);
    }

    /**
     * Sends a single durable message, checks the broker holds it as a {@link LargeServerMessage},
     * restarts the broker, browses the queue through its management control and finally consumes
     * the message.
     *
     * @throws Exception on any client, broker or assertion failure
     */
    @TestTemplate
    public void testSingleMessage() throws Exception {
        AmqpClient client = createAmqpClient(new URI(this.smallFrameAcceptor));
        AmqpConnection connection = openConnection(client);
        AmqpSession session = connection.createSession();
        AmqpSender sender = session.createSender(getQueueName());

        Queue queueBeforeSend = getProxyToQueue(getQueueName());
        Assertions.assertNotNull(queueBeforeSend);
        Assertions.assertEquals(0L, queueBeforeSend.getMessageCount());

        session.begin();
        AmqpMessage message = new AmqpMessage();
        message.setDurable(true);
        message.setApplicationProperty("i", Integer.valueOf(0));
        message.setApplicationProperty("oddString", "odd");
        message.setApplicationProperty("odd", Boolean.TRUE);
        message.setApplicationProperty("oddID", Integer.valueOf(0));
        message.setBytes(newPayload());
        sender.send(message);
        session.commit();

        Queue queue = Objects.requireNonNull(
            this.server.locateQueue(SimpleString.of(getQueueName())), "queue must exist after sending");
        Wait.assertEquals(1L, queue::getMessageCount);

        LinkedListIterator<MessageReference> browserIterator = queue.browserIterator();
        try {
            while (browserIterator.hasNext()) {
                Message serverMessage = browserIterator.next().getMessage();
                Assertions.assertNotNull(serverMessage);
                Assertions.assertTrue(serverMessage instanceof LargeServerMessage);
            }
        } finally {
            // Closed even when an assertion above fails, so the iterator is never left open.
            browserIterator.close();
        }

        connection.close();
        this.server.stop();
        this.server.start();

        CompositeData[] browsed = ManagementControlHelper
            .createQueueControl(queue.getAddress(), queue.getName(), RoutingType.ANYCAST, this.mBeanServer)
            .browse();
        Assertions.assertEquals(1, browsed.length);
        if (((Boolean) browsed[0].get("largeMessage")).booleanValue()) {
            Assertions.assertTrue(browsed[0].containsKey("text") || browsed[0].containsKey("BodyPreview"));
        }

        AmqpConnection consumerConnection = openConnection(client);
        AmqpReceiver receiver = consumerConnection.createSession().createReceiver(getQueueName());
        receiver.flow(1);

        AmqpMessage received = receiver.receive(10L, TimeUnit.SECONDS);
        Assertions.assertNotNull(received);
        Assertions.assertTrue(((Boolean) received.getApplicationProperty("odd")).booleanValue());
        Assertions.assertEquals(0, ((Integer) received.getApplicationProperty("oddID")).intValue());
        assertPayload(received);
        received.accept(true);

        // Grant one more credit to prove nothing else is left on the queue.
        receiver.flow(1);
        Assertions.assertNull(receiver.receiveNoWait());

        receiver.close();
        consumerConnection.close();

        validateNoFilesOnLargeDir(getLargeMessagesDir(), 0);
    }

    @TestTemplate
    public void testJMSPersistentTX() throws Exception {
        jmsTest(true, true);
    }

    @TestTemplate
    public void testJMSPersistentNonTX() throws Exception {
        jmsTest(true, false);
    }

    @TestTemplate
    public void testJMSNonPersistentTX() throws Exception {
        jmsTest(false, true);
    }

    @TestTemplate
    public void testJMSNonPersistentNonTX() throws Exception {
        jmsTest(false, false);
    }

    /**
     * Round-trips {@value #MESSAGE_COUNT} random {@link BytesMessage}s, one at a time, between a
     * JMS producer and a JMS consumer that use separate connections.
     *
     * @param persistent whether the producer uses persistent delivery
     * @param tx         whether both sessions are transacted (each send and receive is committed)
     * @throws JMSException on any JMS failure
     */
    private void jmsTest(boolean persistent, boolean tx) throws JMSException {
        JmsConnectionFactory factory = new JmsConnectionFactory(JMS_BROKER_URI);
        int acknowledgeMode = tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE;

        try (Connection producerConnection = factory.createConnection();
             Session producerSession = producerConnection.createSession(tx, acknowledgeMode)) {
            producerConnection.start();
            jakarta.jms.Queue queue = producerSession.createQueue(getQueueName());

            try (Connection consumerConnection = factory.createConnection();
                 Session consumerSession = consumerConnection.createSession(tx, acknowledgeMode);
                 MessageConsumer consumer = consumerSession.createConsumer(queue);
                 MessageProducer producer = producerSession.createProducer(queue)) {

                producer.setDeliveryMode(persistent
                    ? jakarta.jms.DeliveryMode.PERSISTENT
                    : jakarta.jms.DeliveryMode.NON_PERSISTENT);
                consumerConnection.start();

                byte[] payload = new byte[PAYLOAD_SIZE];
                byte[] response = new byte[payload.length];
                ThreadLocalRandom.current().nextBytes(payload);

                for (int m = 0; m < MESSAGE_COUNT; m++) {
                    BytesMessage sent = producerSession.createBytesMessage();
                    sent.writeBytes(payload);
                    producer.send(sent);
                    if (tx) {
                        producerSession.commit();
                    }

                    jakarta.jms.Message received = consumer.receive(5000L);
                    Assertions.assertNotNull(received, "A message should be received in 5000 ms");
                    if (tx) {
                        consumerSession.commit();
                    }

                    // The type is asserted first so a wrong message type fails with a clear message.
                    MatcherAssert.assertThat(received, IsInstanceOf.instanceOf(sent.getClass()));
                    Assertions.assertEquals(payload.length, ((BytesMessage) received).readBytes(response));
                    Assertions.assertArrayEquals(payload, response);
                }
            }
        }
    }

    /**
     * Creates, registers and connects a connection limited to {@value #CLIENT_MAX_FRAME_SIZE} byte
     * frames.
     *
     * @param client the client to create the connection from
     * @return the connected connection
     * @throws Exception if the connection cannot be created or established
     */
    private AmqpConnection openConnection(AmqpClient client) throws Exception {
        AmqpConnection connection = client.createConnection();
        addConnection(connection);
        connection.setMaxFrameSize(CLIENT_MAX_FRAME_SIZE);
        connection.connect();
        return connection;
    }

    /**
     * Builds a fresh {@value #PAYLOAD_SIZE} byte body filled with {@link #PAYLOAD_FILL}.
     *
     * @return a new payload array
     */
    private static byte[] newPayload() {
        byte[] payload = new byte[PAYLOAD_SIZE];
        Arrays.fill(payload, PAYLOAD_FILL);
        return payload;
    }

    /**
     * Asserts that the first {@value #PAYLOAD_SIZE} bytes of the received body all equal
     * {@link #PAYLOAD_FILL}.
     *
     * @param received the message whose body is checked; its body must be an AMQP data section
     */
    private static void assertPayload(AmqpMessage received) {
        org.apache.qpid.proton.amqp.messaging.Data body =
            (org.apache.qpid.proton.amqp.messaging.Data) received.getWrappedMessage().getBody();
        byte[] bytes = body.getValue().getArray();
        for (int i = 0; i < PAYLOAD_SIZE; i++) {
            Assertions.assertEquals(PAYLOAD_FILL, bytes[i]);
        }
    }
}
