package org.apache.activemq.artemis.tests.integration.amqp;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.ObjectMessage;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.message.LargeBodyReader;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
import org.apache.activemq.artemis.tests.integration.client.AutoCreateJmsDestinationTest;
import org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.class */
public class AmqpLargeMessageTest extends AmqpClientTestSupport {
    protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final Random rand = new Random(System.currentTimeMillis());

    @Parameterized.Parameter(0)
    public int frameSize = 32767;

    @Parameterized.Parameter(1)
    public int payload = 112640;

    @Parameterized.Parameter(2)
    public int amqpMinLargeMessageSize = 102400;

    @Parameterized.Parameter(NodeManagerAction.PAUSE_LIVE)
    public boolean jdbc = false;
    String testQueueName = "ConnectionFrameSize";

    @Parameterized.Parameters(name = "frameSize={0}, payload={1}, amqpMinLargeMessageSize={2}, jdbc={3}")
    public static Collection<Object[]> parameters() {
        return Arrays.asList(new Object[]{32767, 112640, 102400, false}, new Object[]{204800, 1126400, 450560, false}, new Object[]{10240, 102400, 20480, true});
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport
    public void addConfiguration(ActiveMQServer activeMQServer) {
        activeMQServer.getConfiguration().setJournalFileSize(5242880);
        if (this.jdbc) {
            setDBStoreType(activeMQServer.getConfiguration());
        }
    }

    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport
    protected void configureAMQPAcceptorParameters(Map<String, Object> map) {
        map.put("maxFrameSize", Integer.valueOf(this.frameSize));
        map.put("amqpMinLargeMessageSize", Integer.valueOf(this.amqpMinLargeMessageSize));
    }

    @Override // org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport
    protected void addAdditionalAcceptors(ActiveMQServer activeMQServer) throws Exception {
        activeMQServer.getConfiguration().addAcceptorConfiguration("tcp", "tcp://localhost:61616");
    }

    @Test(timeout = 60000)
    public void testSendAMQPReceiveCore() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        try {
            sendMessages(200, addConnection);
            assertEquals(200, getMessageCount(this.server.getPostOffice(), this.testQueueName));
            receiveJMS(200, new ActiveMQConnectionFactory());
            addConnection.close();
        } catch (Throwable th) {
            addConnection.close();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testSendAndGetData() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        try {
            sendMessages(1, addConnection);
            assertEquals(1, getMessageCount(this.server.getPostOffice(), this.testQueueName));
            this.server.locateQueue(this.testQueueName).forEach(messageReference -> {
                try {
                    AMQPLargeMessage message = messageReference.getMessage();
                    Assert.assertFalse(message.hasScheduledDeliveryTime());
                    ReadableBuffer data = message.getData();
                    LargeBodyReader largeBodyReader = message.getLargeBodyReader();
                    try {
                        Assert.assertEquals(largeBodyReader.getSize(), data.remaining());
                        largeBodyReader.open();
                        ByteBuffer allocate = ByteBuffer.allocate(data.remaining());
                        largeBodyReader.readInto(allocate);
                        ByteUtil.equals(allocate.array(), data.array());
                        largeBodyReader.close();
                    } catch (Throwable th) {
                        largeBodyReader.close();
                        throw th;
                    }
                } catch (AssertionError e) {
                    throw e;
                } catch (Throwable th2) {
                    throw new RuntimeException(th2.getMessage(), th2);
                }
            });
            addConnection.close();
        } catch (Throwable th) {
            addConnection.close();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testSendAMQPMessageWithComplexAnnotationsReceiveCore() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        try {
            addConnection.connect();
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            linkedHashMap.put("test-key-1", "value-1");
            linkedHashMap.put("test-key-2", "value-2");
            linkedHashMap.put("test-key-3", "value-3");
            AmqpSession createSession = addConnection.createSession();
            AmqpSender createSender = createSession.createSender(this.testQueueName);
            AmqpMessage createAmqpMessage = createAmqpMessage((byte) 65, this.payload);
            createAmqpMessage.setApplicationProperty("IntProperty", 42);
            createAmqpMessage.setDurable(true);
            createAmqpMessage.setMessageAnnotation("x-opt-embedded-map", linkedHashMap);
            createSender.send(createAmqpMessage);
            createSession.close();
            Wait.assertEquals(1, () -> {
                return getMessageCount(this.server.getPostOffice(), this.testQueueName);
            });
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
            try {
                Connection createConnection = activeMQConnectionFactory.createConnection();
                try {
                    Session createSession2 = createConnection.createSession(false, 1);
                    createConnection.start();
                    Assert.assertNotNull(createSession2.createConsumer(createSession2.createQueue(this.testQueueName)).receive(5000L));
                    Assert.assertEquals(42L, r0.getIntProperty("IntProperty"));
                    createConnection.close();
                    if (createConnection != null) {
                        createConnection.close();
                    }
                    activeMQConnectionFactory.close();
                } catch (Throwable th) {
                    if (createConnection != null) {
                        try {
                            createConnection.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } finally {
            addConnection.close();
        }
    }

    @Test(timeout = 60000)
    public void testSendAMQPReceiveOpenWire() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        try {
            sendMessages(200, addConnection);
            assertEquals(200, getMessageCount(this.server.getPostOffice(), this.testQueueName));
            receiveJMS(200, new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616"));
            addConnection.close();
        } catch (Throwable th) {
            addConnection.close();
            throw th;
        }
    }

    private void sendMessages(int i, AmqpConnection amqpConnection) throws Exception {
        amqpConnection.connect();
        AmqpSession createSession = amqpConnection.createSession();
        AmqpSender createSender = createSession.createSender(this.testQueueName);
        for (int i2 = 0; i2 < i; i2++) {
            AmqpMessage createAmqpMessage = createAmqpMessage((byte) 65, this.payload);
            createAmqpMessage.setApplicationProperty("i", Integer.valueOf(i2));
            createAmqpMessage.setDurable(true);
            createSender.send(createAmqpMessage);
        }
        createSession.close();
    }

    private void receiveJMS(int i, ConnectionFactory connectionFactory) throws JMSException {
        Connection createConnection = connectionFactory.createConnection();
        Session createSession = createConnection.createSession(false, 1);
        createConnection.start();
        MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue(this.testQueueName));
        for (int i2 = 0; i2 < i; i2++) {
            Assert.assertNotNull(createConsumer.receive(5000L));
            Assert.assertEquals(i2, r0.getIntProperty("i"));
        }
        createConnection.close();
    }

    @Test(timeout = 60000)
    public void testSendAMQPReceiveAMQP() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        try {
            sendMessages(200, addConnection);
            assertEquals(200, getMessageCount(this.server.getPostOffice(), "ConnectionFrameSize"));
            AmqpSession createSession = addConnection.createSession();
            AmqpReceiver createReceiver = createSession.createReceiver("ConnectionFrameSize");
            createReceiver.flow(200);
            for (int i = 0; i < 200; i++) {
                AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
                assertNotNull("failed at " + i, receive);
                MessageImpl wrappedMessage = receive.getWrappedMessage();
                if (wrappedMessage.getBody() instanceof Data) {
                    logger.debug("received : message: {}", Integer.valueOf(wrappedMessage.getBody().getValue().getLength()));
                    assertEquals(this.payload, r0.getValue().getLength());
                }
                receive.accept();
            }
            createSession.close();
            addConnection.close();
        } catch (Throwable th) {
            addConnection.close();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testSendAMQPMessageWithComplexAnnotationsReceiveAMQP() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
        String str = "ConnectionFrameSize";
        AmqpClient createAmqpClient = createAmqpClient();
        Symbol valueOf = Symbol.valueOf("x-opt-embedded-map");
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("test-key-1", "value-1");
        linkedHashMap.put("test-key-2", "value-2");
        linkedHashMap.put("test-key-3", "value-3");
        AmqpConnection addConnection = addConnection(createAmqpClient.connect());
        AmqpSession createSession = addConnection.createSession();
        AmqpSender createSender = createSession.createSender("ConnectionFrameSize");
        AmqpMessage createAmqpMessage = createAmqpMessage((byte) 65, this.payload);
        createAmqpMessage.setApplicationProperty("IntProperty", 42);
        createAmqpMessage.setDurable(true);
        createAmqpMessage.setMessageAnnotation(valueOf.toString(), linkedHashMap);
        createSender.send(createAmqpMessage);
        createSession.close();
        addConnection.close();
        Wait.assertEquals(1, () -> {
            return getMessageCount(this.server.getPostOffice(), str);
        });
        AmqpConnection addConnection2 = addConnection(createAmqpClient.connect());
        AmqpSession createSession2 = addConnection2.createSession();
        AmqpReceiver createReceiver = createSession2.createReceiver("ConnectionFrameSize");
        createReceiver.flow(200);
        AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
        assertNotNull("Failed to read message with embedded map in annotations", receive);
        MessageImpl wrappedMessage = receive.getWrappedMessage();
        if (wrappedMessage.getBody() instanceof Data) {
            logger.debug("received : message: {}", Integer.valueOf(wrappedMessage.getBody().getValue().getLength()));
            assertEquals(this.payload, r0.getValue().getLength());
        }
        assertNotNull(receive.getWrappedMessage().getMessageAnnotations());
        assertNotNull(receive.getWrappedMessage().getMessageAnnotations().getValue());
        assertEquals(linkedHashMap, receive.getWrappedMessage().getMessageAnnotations().getValue().get(valueOf));
        receive.accept();
        createSession2.close();
        addConnection2.close();
    }

    @Test(timeout = 60000)
    public void testHugeString() throws Exception {
        Connection createConnection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672").createConnection();
        Session createSession = createConnection.createSession(true, 0);
        Queue createQueue = createSession.createQueue(getQueueName());
        MessageProducer createProducer = createSession.createProducer(createQueue);
        StringBuilder sb = new StringBuilder();
        char c = 1000;
        while (true) {
            char c2 = c;
            if (c2 >= 11000) {
                break;
            }
            sb.append(c2);
            c = (char) (c2 + 1);
        }
        String sb2 = sb.toString();
        StringBuilder sb3 = new StringBuilder();
        while (sb3.length() < 1048576) {
            sb3.append("hello " + sb2);
        }
        createProducer.send(createSession.createTextMessage(sb3.toString()));
        createSession.commit();
        createConnection.start();
        TextMessage receive = createSession.createConsumer(createQueue).receive(50000L);
        Assert.assertNotNull(receive);
        createSession.commit();
        Assert.assertEquals(sb3.toString(), receive.getText());
    }

    @Test(timeout = 60000)
    public void testSendAMQPReceiveAMQPViaJMSObjectMessage() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
        JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory("amqp://localhost:61616");
        sendObjectMessages(1, new JmsConnectionFactory("amqp://localhost:61616"));
        assertEquals(1, getMessageCount(this.server.getPostOffice(), "ConnectionFrameSize"));
        receiveJMS(1, jmsConnectionFactory);
    }

    @Test(timeout = 60000)
    public void testSendAMQPReceiveAMQPViaJMSText() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
        JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory("amqp://localhost:61616");
        sendTextMessages(1, new JmsConnectionFactory("amqp://localhost:61616"));
        assertEquals(1, getMessageCount(this.server.getPostOffice(), "ConnectionFrameSize"));
        receiveJMS(1, jmsConnectionFactory);
    }

    @Test(timeout = 60000)
    public void testSendAMQPReceiveAMQPViaJMSBytes() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
        JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory("amqp://localhost:61616");
        sendBytesMessages(1, new JmsConnectionFactory("amqp://localhost:61616"));
        assertEquals(1, getMessageCount(this.server.getPostOffice(), "ConnectionFrameSize"));
        receiveJMS(1, jmsConnectionFactory);
    }

    private byte[] createLargePayload(int i) {
        byte[] bArr = new byte[i];
        for (int i2 = 0; i2 < i; i2++) {
            bArr[i2] = (byte) this.rand.nextInt(256);
        }
        logger.debug("Created buffer with size : {} bytes", Integer.valueOf(i));
        return bArr;
    }

    @Test(timeout = 60000)
    public void testSendHugeHeader() throws Exception {
        Assume.assumeFalse(this.jdbc);
        doTestSendHugeHeader(this.payload);
    }

    @Test(timeout = 60000)
    public void testSendLargeMessageWithHugeHeader() throws Exception {
        Assume.assumeFalse(this.jdbc);
        doTestSendHugeHeader(1048576);
    }

    public void doTestSendHugeHeader(int i) throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        try {
            addConnection.connect();
            AmqpSession createSession = addConnection.createSession();
            AmqpSender createSender = createSession.createSender(this.testQueueName);
            AmqpMessage createAmqpMessage = createAmqpMessage((byte) 65, i);
            StringBuffer stringBuffer = new StringBuffer();
            for (int i2 = 0; i2 < 524288; i2++) {
                stringBuffer.append(" ");
            }
            createAmqpMessage.setApplicationProperty("str", stringBuffer.toString());
            createAmqpMessage.setDurable(true);
            try {
                createSender.send(createAmqpMessage);
                fail();
            } catch (IOException e) {
                Assert.assertTrue(e.getCause() instanceof JMSException);
                Assert.assertTrue(e.getMessage().contains("AMQ149005"));
            }
            createSession.close();
            addConnection.close();
        } catch (Throwable th) {
            addConnection.close();
            throw th;
        }
    }

    @Test
    public void testLargeHeaderTXLargeBody() throws Exception {
        Assume.assumeFalse(this.jdbc);
        testLargeHeaderTX(true);
    }

    @Test
    public void testLargeHeaderTXSmallBody() throws Exception {
        Assume.assumeFalse(this.jdbc);
        testLargeHeaderTX(false);
    }

    private void testLargeHeaderTX(boolean z) throws Exception {
        String randomString = RandomUtil.randomString();
        this.server.createQueue(new QueueConfiguration(randomString).setRoutingType(RoutingType.ANYCAST));
        ConnectionFactory createConnectionFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
        StringBuffer stringBuffer = new StringBuffer();
        while (stringBuffer.length() < 1048576) {
            stringBuffer.append("This is a large string ");
        }
        String stringBuffer2 = stringBuffer.toString();
        String str = z ? stringBuffer2 : "small string";
        Connection createConnection = createConnectionFactory.createConnection();
        try {
            Session createSession = createConnection.createSession(true, 0);
            MessageProducer createProducer = createSession.createProducer(createSession.createQueue(randomString));
            createProducer.setDeliveryMode(2);
            TextMessage createTextMessage = createSession.createTextMessage(str);
            createTextMessage.setStringProperty(AutoCreateJmsDestinationTest.QUEUE_NAME, stringBuffer2);
            boolean z2 = false;
            try {
                createProducer.send(createTextMessage);
                createSession.commit();
            } catch (Exception e) {
                z2 = true;
            }
            Assert.assertTrue(z2);
            if (createConnection != null) {
                createConnection.close();
            }
            createConnection = createConnectionFactory.createConnection();
            try {
                Session createSession2 = createConnection.createSession(true, 0);
                MessageProducer createProducer2 = createSession2.createProducer(createSession2.createQueue(randomString));
                TextMessage createTextMessage2 = createSession2.createTextMessage(str);
                createTextMessage2.setStringProperty(AutoCreateJmsDestinationTest.QUEUE_NAME, "small string");
                createProducer2.send(createTextMessage2);
                createSession2.commit();
                createConnection.start();
                MessageConsumer createConsumer = createSession2.createConsumer(createSession2.createQueue(randomString));
                TextMessage receive = createConsumer.receive(5000L);
                Assert.assertEquals("small string", receive.getStringProperty(AutoCreateJmsDestinationTest.QUEUE_NAME));
                Assert.assertEquals(str, receive.getText());
                createSession2.commit();
                Assert.assertNull(createConsumer.receiveNoWait());
                if (createConnection != null) {
                    createConnection.close();
                }
                org.apache.activemq.artemis.core.server.Queue locateQueue = this.server.locateQueue(randomString);
                Objects.requireNonNull(locateQueue);
                Wait.assertEquals(0L, locateQueue::getMessageCount);
                File[] listFiles = this.server.getConfiguration().getLargeMessagesLocation().listFiles();
                Assert.assertTrue(listFiles == null ? "Null Files" : "There are " + listFiles.length + " files in the large message folder", listFiles == null || listFiles.length == 0);
            } finally {
            }
        } finally {
        }
    }

    @Test(timeout = 60000)
    public void testSendSmallerMessages() throws Exception {
        for (int i = 512; i <= 8192; i += 512) {
            doTestSendLargeMessage(i);
        }
    }

    @Test(timeout = 120000)
    public void testSendFixedSizedMessages() throws Exception {
        doTestSendLargeMessage(65536);
        doTestSendLargeMessage(131072);
        doTestSendLargeMessage(262144);
    }

    @Test(timeout = 120000)
    public void testSend1MBMessage() throws Exception {
        doTestSendLargeMessage(1048576);
    }

    @Test(timeout = 120000)
    @Ignore("Useful for performance testing")
    public void testSend10MBMessage() throws Exception {
        doTestSendLargeMessage(10485760);
    }

    @Test(timeout = 120000)
    @Ignore("Useful for performance testing")
    public void testSend100MBMessage() throws Exception {
        doTestSendLargeMessage(104857600);
    }

    public void doTestSendLargeMessage(int i) throws Exception {
        logger.debug("doTestSendLargeMessage called with expectedSize {}", Integer.valueOf(i));
        byte[] createLargePayload = createLargePayload(i);
        assertEquals(i, createLargePayload.length);
        Connection createConnection = new JmsConnectionFactory("amqp://localhost:61616").createConnection();
        try {
            long currentTimeMillis = System.currentTimeMillis();
            Session createSession = createConnection.createSession(false, 1);
            Queue createQueue = createSession.createQueue(this.name.getMethodName());
            MessageProducer createProducer = createSession.createProducer(createQueue);
            BytesMessage createBytesMessage = createSession.createBytesMessage();
            createBytesMessage.writeBytes(createLargePayload);
            createProducer.setDeliveryMode(1);
            createProducer.setPriority(4);
            createProducer.send(createBytesMessage);
            logger.debug("Returned from send after {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            long currentTimeMillis2 = System.currentTimeMillis();
            MessageConsumer createConsumer = createSession.createConsumer(createQueue);
            createConnection.start();
            logger.debug("Calling receive");
            BytesMessage receive = createConsumer.receive();
            assertNotNull(receive);
            assertTrue(receive instanceof BytesMessage);
            assertNotNull(receive);
            logger.debug("Returned from receive after {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis2));
            assertEquals(i, r0.readBytes(r0, i));
            assertTrue(Arrays.equals(createLargePayload, new byte[i]));
            createConnection.close();
            if (createConnection != null) {
                createConnection.close();
            }
        } catch (Throwable th) {
            if (createConnection != null) {
                try {
                    createConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testReceiveRedeliveredLargeMessagesWithSessionFlowControl() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
        int i = this.frameSize;
        byte[] createLargePayload = createLargePayload(2000000);
        assertEquals(2000000, createLargePayload.length);
        AmqpConnection createConnection = createAmqpClient().createConnection();
        createConnection.setMaxFrameSize(i);
        createConnection.setSessionIncomingCapacity(2500000);
        createConnection.connect();
        addConnection(createConnection);
        try {
            String testName = getTestName();
            AmqpSession createSession = createConnection.createSession();
            AmqpSender createSender = createSession.createSender(testName);
            for (int i2 = 0; i2 < 10; i2++) {
                AmqpMessage amqpMessage = new AmqpMessage();
                amqpMessage.setBytes(createLargePayload);
                createSender.send(amqpMessage);
            }
            Wait.assertEquals(10, () -> {
                return getMessageCount(this.server.getPostOffice(), testName);
            }, 5000L, 10L);
            AmqpReceiver createReceiver = createSession.createReceiver(testName);
            createReceiver.flow(10);
            ArrayList arrayList = new ArrayList();
            for (int i3 = 0; i3 < 10; i3++) {
                AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
                assertNotNull("failed at " + i3, receive);
                arrayList.add(receive);
            }
            for (int i4 = 0; i4 < 10; i4++) {
                ((AmqpMessage) arrayList.get(i4)).modified(true, false);
            }
            createReceiver.close();
            AmqpReceiver createReceiver2 = createSession.createReceiver(testName);
            createReceiver2.flow(10);
            for (int i5 = 0; i5 < 10; i5++) {
                AmqpMessage receive2 = createReceiver2.receive(5L, TimeUnit.SECONDS);
                validateMessage(createLargePayload, i5, receive2);
                receive2.accept();
            }
            createSession.close();
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testReceiveLargeMessagesMultiplexedOnSameSession() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
        int i = this.frameSize;
        int i2 = this.frameSize * 4;
        int i3 = i / 2;
        int i4 = i2 + i;
        byte[] createLargePayload = createLargePayload(i2);
        assertEquals(i2, createLargePayload.length);
        byte[] createLargePayload2 = createLargePayload(i3);
        assertEquals(i3, createLargePayload2.length);
        String str = getTestName() + "A";
        String str2 = getTestName() + "B";
        AmqpConnection createConnection = createAmqpClient().createConnection();
        createConnection.setMaxFrameSize(i);
        createConnection.setSessionIncomingCapacity(i4);
        createConnection.connect();
        addConnection(createConnection);
        try {
            AmqpSession createSession = createConnection.createSession();
            AmqpSender createSender = createSession.createSender(str);
            AmqpSender createSender2 = createSession.createSender(str2);
            for (int i5 = 0; i5 < 10; i5++) {
                AmqpMessage amqpMessage = new AmqpMessage();
                amqpMessage.setBytes(createLargePayload);
                createSender.send(amqpMessage);
                AmqpMessage amqpMessage2 = new AmqpMessage();
                amqpMessage2.setBytes(createLargePayload2);
                createSender2.send(amqpMessage2);
            }
            Wait.assertEquals(10, () -> {
                return getMessageCount(this.server.getPostOffice(), str);
            }, 5000L, 10L);
            Wait.assertEquals(10, () -> {
                return getMessageCount(this.server.getPostOffice(), str2);
            }, 5000L, 10L);
            AmqpReceiver createReceiver = createSession.createReceiver(str);
            AmqpReceiver createReceiver2 = createSession.createReceiver(str2);
            createReceiver.flow(10 / 2, true);
            createReceiver2.flow(10 / 2);
            createReceiver.flow(10 / 2, true);
            createReceiver2.flow(10 / 2);
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            long nanoTime = System.nanoTime();
            boolean z = true;
            while (true) {
                if (!z) {
                    break;
                }
                if (arrayList.size() < 10) {
                    logger.debug("Attempting to receive message for receiver A");
                    AmqpMessage receive = createReceiver.receive(20L, TimeUnit.MILLISECONDS);
                    if (receive != null) {
                        logger.debug("Got message for receiver A");
                        arrayList.add(receive);
                        receive.accept();
                    }
                }
                if (arrayList2.size() < 10) {
                    logger.debug("Attempting to receive message for receiver B");
                    AmqpMessage receive2 = createReceiver2.receive(20L, TimeUnit.MILLISECONDS);
                    if (receive2 != null) {
                        logger.debug("Got message for receiver B");
                        arrayList2.add(receive2);
                        receive2.accept();
                    }
                }
                if (arrayList.size() == 10 && arrayList2.size() == 10) {
                    logger.debug("Received expected messages");
                    break;
                }
                z = System.nanoTime() - nanoTime < TimeUnit.MILLISECONDS.toNanos(6000L);
            }
            assertTrue("Failed to receive all messages in expected time: A=" + arrayList.size() + ", B=" + arrayList2.size(), z);
            assertNull("Unexpected additional message present for A", createReceiver.receiveNoWait());
            assertNull("Unexpected additional message present for B", createReceiver2.receiveNoWait());
            for (int i6 = 0; i6 < 10; i6++) {
                validateMessage(createLargePayload, i6, (AmqpMessage) arrayList.get(i6));
                validateMessage(createLargePayload2, i6, (AmqpMessage) arrayList2.get(i6));
            }
            createReceiver.close();
            createReceiver2.close();
            createSession.close();
            createConnection.close();
        } catch (Throwable th) {
            createConnection.close();
            throw th;
        }
    }

    private void validateMessage(byte[] bArr, int i, AmqpMessage amqpMessage) {
        assertNotNull("failed at " + i, amqpMessage);
        Data body = amqpMessage.getWrappedMessage().getBody();
        assertNotNull("No message body for msg " + i, body);
        assertTrue("Unexpected message body type for msg " + body.getClass(), body instanceof Data);
        assertEquals("Unexpected body content for msg", new Binary(bArr, 0, bArr.length), body.getValue());
    }

    @Test(timeout = 60000)
    public void testMessageWithAmqpValueAndEmptyBinaryPreservesBody() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        try {
            AmqpSession createSession = addConnection.createSession();
            AmqpSender createSender = createSession.createSender(getTestName());
            AmqpMessage createAmqpLargeMessageWithNoBody = createAmqpLargeMessageWithNoBody();
            createAmqpLargeMessageWithNoBody.getWrappedMessage().setBody(new AmqpValue(new Binary(new byte[0])));
            createSender.send(createAmqpLargeMessageWithNoBody);
            createSender.close();
            AmqpReceiver createReceiver = createSession.createReceiver(getTestName());
            createReceiver.flow(1);
            AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
            assertNotNull("failed to read large AMQP message", receive);
            MessageImpl wrappedMessage = receive.getWrappedMessage();
            assertTrue(wrappedMessage.getBody() instanceof AmqpValue);
            assertTrue(wrappedMessage.getBody().getValue() instanceof Binary);
            assertEquals(0L, ((Binary) r0.getValue()).getLength());
            receive.accept();
            createSession.close();
            addConnection.close();
        } catch (Throwable th) {
            addConnection.close();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testMessageWithDataAndEmptyBinaryPreservesBody() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        try {
            AmqpSession createSession = addConnection.createSession();
            AmqpSender createSender = createSession.createSender(getTestName());
            AmqpMessage createAmqpLargeMessageWithNoBody = createAmqpLargeMessageWithNoBody();
            createAmqpLargeMessageWithNoBody.getWrappedMessage().setBody(new Data(new Binary(new byte[0])));
            createSender.send(createAmqpLargeMessageWithNoBody);
            createSender.close();
            AmqpReceiver createReceiver = createSession.createReceiver(getTestName());
            createReceiver.flow(1);
            AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
            assertNotNull("failed to read large AMQP message", receive);
            MessageImpl wrappedMessage = receive.getWrappedMessage();
            assertTrue(wrappedMessage.getBody() instanceof Data);
            assertTrue(wrappedMessage.getBody().getValue() instanceof Binary);
            assertEquals(0L, r0.getValue().getLength());
            receive.accept();
            createSession.close();
            addConnection.close();
        } catch (Throwable th) {
            addConnection.close();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testMessageWithDataAndContentTypeOfTextPreservesBodyType() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        try {
            AmqpSession createSession = addConnection.createSession();
            AmqpSender createSender = createSession.createSender(getTestName());
            AmqpMessage createAmqpLargeMessageWithNoBody = createAmqpLargeMessageWithNoBody();
            createAmqpLargeMessageWithNoBody.getWrappedMessage().setContentType("text/plain");
            createAmqpLargeMessageWithNoBody.getWrappedMessage().setBody(new Data(new Binary("This text will be in a Data Section".getBytes(StandardCharsets.UTF_8))));
            createSender.send(createAmqpLargeMessageWithNoBody);
            createSender.close();
            AmqpReceiver createReceiver = createSession.createReceiver(getTestName());
            createReceiver.flow(1);
            AmqpMessage receive = createReceiver.receive(10L, TimeUnit.SECONDS);
            assertNotNull("failed to read large AMQP message", receive);
            MessageImpl wrappedMessage = receive.getWrappedMessage();
            assertTrue(wrappedMessage.getBody() instanceof Data);
            Data body = wrappedMessage.getBody();
            assertTrue(body.getValue() instanceof Binary);
            Binary value = body.getValue();
            assertEquals("This text will be in a Data Section", new String(value.getArray(), value.getArrayOffset(), value.getLength(), StandardCharsets.UTF_8));
            receive.accept();
            createSession.close();
            addConnection.close();
        } catch (Throwable th) {
            addConnection.close();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testMessageWithAmqpValueListPreservesBodyType() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        try {
            AmqpSession createSession = addConnection.createSession();
            AmqpSender createSender = createSession.createSender(getTestName());
            AmqpMessage createAmqpLargeMessageWithNoBody = createAmqpLargeMessageWithNoBody();
            ArrayList arrayList = new ArrayList();
            arrayList.add("1");
            arrayList.add("2");
            arrayList.add("3");
            createAmqpLargeMessageWithNoBody.getWrappedMessage().setBody(new AmqpValue(arrayList));
            createSender.send(createAmqpLargeMessageWithNoBody);
            createSender.close();
            AmqpReceiver createReceiver = createSession.createReceiver(getTestName());
            createReceiver.flow(1);
            AmqpMessage receive = createReceiver.receive(10L, TimeUnit.SECONDS);
            assertNotNull("failed to read large AMQP message", receive);
            MessageImpl wrappedMessage = receive.getWrappedMessage();
            assertTrue(wrappedMessage.getBody() instanceof AmqpValue);
            assertTrue(wrappedMessage.getBody().getValue() instanceof List);
            assertEquals(3L, ((List) r0.getValue()).size());
            receive.accept();
            createSession.close();
            addConnection.close();
        } catch (Throwable th) {
            addConnection.close();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testMessageWithAmqpSequencePreservesBodyType() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        try {
            AmqpSession createSession = addConnection.createSession();
            AmqpSender createSender = createSession.createSender(getTestName());
            AmqpMessage createAmqpLargeMessageWithNoBody = createAmqpLargeMessageWithNoBody();
            ArrayList arrayList = new ArrayList();
            arrayList.add("1");
            arrayList.add("2");
            arrayList.add("3");
            createAmqpLargeMessageWithNoBody.getWrappedMessage().setBody(new AmqpSequence(arrayList));
            createSender.send(createAmqpLargeMessageWithNoBody);
            createSender.close();
            AmqpReceiver createReceiver = createSession.createReceiver(getTestName());
            createReceiver.flow(1);
            AmqpMessage receive = createReceiver.receive(10L, TimeUnit.SECONDS);
            assertNotNull("failed to read large AMQP message", receive);
            MessageImpl wrappedMessage = receive.getWrappedMessage();
            assertTrue(wrappedMessage.getBody() instanceof AmqpSequence);
            assertTrue(wrappedMessage.getBody().getValue() instanceof List);
            assertEquals(3L, r0.getValue().size());
            receive.accept();
            createSession.close();
            addConnection.close();
        } catch (Throwable th) {
            addConnection.close();
            throw th;
        }
    }

    @Test
    public void testDeleteUnreferencedMessage() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
        AmqpConnection addConnection = addConnection(createAmqpClient().connect());
        try {
            AmqpSender createSender = addConnection.createSession().createSender(getTestName());
            AmqpMessage createAmqpMessage = createAmqpMessage((byte) 65, this.payload);
            createAmqpMessage.setDurable(true);
            createSender.send(createAmqpMessage);
            createSender.close();
            addConnection.close();
            org.apache.activemq.artemis.core.server.Queue locateQueue = this.server.locateQueue(getTestName());
            locateQueue.forEach(messageReference -> {
                if (messageReference.getMessage().isLargeMessage()) {
                    try {
                        this.server.getStorageManager().storeAcknowledge(locateQueue.getID().longValue(), messageReference.getMessageID());
                    } catch (Exception e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            });
            this.server.stop();
            AssertionLoggerHandler.startCapture();
            runAfter(AssertionLoggerHandler::stopCapture);
            this.server.start();
            Assert.assertTrue(AssertionLoggerHandler.findText(new String[]{"AMQ221019"}));
            validateNoFilesOnLargeDir();
            ActiveMQServer activeMQServer = this.server;
            Objects.requireNonNull(activeMQServer);
            runAfter(activeMQServer::stop);
        } catch (Throwable th) {
            addConnection.close();
            throw th;
        }
    }

    @Test
    public void testSimpleLargeMessageRestart() throws Exception {
        this.server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setDefaultAddressRoutingType(RoutingType.ANYCAST));
        AmqpClient createAmqpClient = createAmqpClient();
        AmqpConnection addConnection = addConnection(createAmqpClient.connect());
        try {
            AmqpSender createSender = addConnection.createSession().createSender(getTestName());
            AmqpMessage createAmqpMessage = createAmqpMessage((byte) 65, this.payload);
            createAmqpMessage.setDurable(true);
            createSender.send(createAmqpMessage);
            createSender.close();
            addConnection.close();
            this.server.stop();
            AssertionLoggerHandler.startCapture();
            this.server.start();
            Assert.assertFalse(AssertionLoggerHandler.findText(new String[]{"AMQ221019"}));
            Assert.assertFalse(AssertionLoggerHandler.findText(new String[]{"AMQ221018"}));
            addConnection = addConnection(createAmqpClient.connect());
            try {
                AmqpSession createSession = addConnection.createSession();
                AmqpReceiver createReceiver = createSession.createReceiver(getTestName());
                createReceiver.flow(1);
                AmqpMessage receive = createReceiver.receive(5L, TimeUnit.SECONDS);
                Assert.assertNotNull(receive);
                receive.accept();
                createReceiver.close();
                createSession.close();
                addConnection.close();
                validateNoFilesOnLargeDir();
                ActiveMQServer activeMQServer = this.server;
                Objects.requireNonNull(activeMQServer);
                runAfter(activeMQServer::stop);
            } finally {
            }
        } finally {
        }
    }

    private void sendObjectMessages(int i, ConnectionFactory connectionFactory) throws Exception {
        Connection createConnection = connectionFactory.createConnection();
        try {
            Session createSession = createConnection.createSession();
            MessageProducer createProducer = createSession.createProducer(createSession.createQueue(this.testQueueName));
            ObjectMessage createObjectMessage = createSession.createObjectMessage();
            StringBuilder sb = new StringBuilder();
            for (int i2 = 0; i2 < this.payload; i2++) {
                sb.append("A");
            }
            createObjectMessage.setObject(sb.toString());
            for (int i3 = 0; i3 < i; i3++) {
                createObjectMessage.setIntProperty("i", Integer.valueOf(i3).intValue());
                createProducer.send(createObjectMessage);
            }
            if (createConnection != null) {
                createConnection.close();
            }
        } catch (Throwable th) {
            if (createConnection != null) {
                try {
                    createConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void sendTextMessages(int i, ConnectionFactory connectionFactory) throws Exception {
        Connection createConnection = connectionFactory.createConnection();
        try {
            Session createSession = createConnection.createSession();
            MessageProducer createProducer = createSession.createProducer(createSession.createQueue(this.testQueueName));
            TextMessage createTextMessage = createSession.createTextMessage();
            StringBuilder sb = new StringBuilder();
            for (int i2 = 0; i2 < this.payload; i2++) {
                sb.append("A");
            }
            createTextMessage.setText(sb.toString());
            for (int i3 = 0; i3 < i; i3++) {
                createTextMessage.setIntProperty("i", Integer.valueOf(i3).intValue());
                createProducer.send(createTextMessage);
            }
            if (createConnection != null) {
                createConnection.close();
            }
        } catch (Throwable th) {
            if (createConnection != null) {
                try {
                    createConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void sendBytesMessages(int i, ConnectionFactory connectionFactory) throws Exception {
        Connection createConnection = connectionFactory.createConnection();
        try {
            Session createSession = createConnection.createSession();
            MessageProducer createProducer = createSession.createProducer(createSession.createQueue(this.testQueueName));
            BytesMessage createBytesMessage = createSession.createBytesMessage();
            StringBuilder sb = new StringBuilder();
            for (int i2 = 0; i2 < this.payload; i2++) {
                sb.append("A");
            }
            createBytesMessage.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
            for (int i3 = 0; i3 < i; i3++) {
                createBytesMessage.setIntProperty("i", Integer.valueOf(i3).intValue());
                createProducer.send(createBytesMessage);
            }
            if (createConnection != null) {
                createConnection.close();
            }
        } catch (Throwable th) {
            if (createConnection != null) {
                try {
                    createConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private AmqpMessage createAmqpMessage(byte b, int i) {
        AmqpMessage amqpMessage = new AmqpMessage();
        byte[] bArr = new byte[i];
        for (int i2 = 0; i2 < bArr.length; i2++) {
            bArr[i2] = b;
        }
        amqpMessage.setBytes(bArr);
        return amqpMessage;
    }

    private AmqpMessage createAmqpLargeMessageWithNoBody() {
        AmqpMessage amqpMessage = new AmqpMessage();
        byte[] bArr = new byte[524288];
        for (int i = 0; i < bArr.length; i++) {
            bArr[i] = 65;
        }
        amqpMessage.setMessageAnnotation("x-opt-big-blob", new String(bArr, StandardCharsets.UTF_8));
        return amqpMessage;
    }
}
