package org.apache.activemq.artemis.tests.integration.stomp;

import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collection;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration tests that exchange large message bodies over STOMP.
 *
 * <p>Each test template is run once per value returned by {@link #data()}. The class is currently
 * annotated {@code @Disabled}; the reason is not recorded in this file.
 *
 * <p>Every test releases its STOMP connection in a {@code finally} block so that a failed
 * assertion does not leave a connection open for subsequent tests.
 */
@Disabled
@ExtendWith({ParameterizedTestExtension.class})
public class StompWithLargeMessagesTest extends StompTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Number of messages sent and then received by each multi-message test. */
    private static final int MESSAGE_COUNT = 10;

    /** Body length used by {@link #testSendReceiveLargeMessage()} (10 * 1024 * 1024 chars). */
    private static final int TEN_MEBI_CHARS = 10 * 1024 * 1024;

    /** Body length used by {@link #testSendReceiveLargePersistentMessages()} (1024 * 1024 chars). */
    private static final int ONE_MEBI_CHARS = 1024 * 1024;

    /** Body length used by {@link #testReceiveLargePersistentMessagesFromCore()} (300 * 1024 chars). */
    private static final int CORE_BODY_CHARS = 300 * 1024;

    /** Initial stream size used by the compressed-message tests (100 * 1024). */
    private static final int COMPRESSED_STREAM_SIZE = 100 * 1024;

    /** Stream size set by {@link #testReceiveLargeCompressedToLargePersistentMessagesFromCore()}. */
    private static final int COMPRESSED_LARGE_STREAM_SIZE = 1024000;

    /** How many leading characters of a received body are written to the debug log. */
    private static final int DEBUG_BODY_PREVIEW_CHARS = 250;

    // NOTE(review): these timeouts are taken from the AMQP test client, which is unrelated to
    // STOMP; this looks like a decompiler substituting a matching constant for a numeric literal.
    // The references are kept so the effective values do not change - confirm the intended
    // values (presumably milliseconds) and replace with local literals.
    private static final long RECEIVE_TIMEOUT = AmqpConnection.DEFAULT_DRAIN_TIMEOUT;
    private static final long SHORT_RECEIVE_TIMEOUT = AmqpConnection.DEFAULT_CLOSE_TIMEOUT;

    /**
     * Supplies the parameter sets for the test templates.
     *
     * @return one single-element array per run: {@code "tcp+v10.stomp"} and {@code "tcp+v12.stomp"}
     */
    @Parameters(name = "{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[]{"tcp+v10.stomp"}, new Object[]{"tcp+v12.stomp"});
    }

    /**
     * Delegates to the base-class set-up.
     *
     * @throws Exception if the base-class set-up fails
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
    }

    /** @return {@code true}, overriding the base-class value */
    @Override
    public boolean isCompressLargeMessages() {
        return true;
    }

    /** @return {@code true}, overriding the base-class value */
    @Override
    public boolean isPersistenceEnabled() {
        return true;
    }

    /** @return {@code 2048}, overriding the base-class value */
    @Override
    public Integer getStompMinLargeMessageSize() {
        return 2048;
    }

    /**
     * Subscribes to a freshly created anycast queue, sends one body of {@code 't'} characters and
     * checks that the received body is identical.
     *
     * @throws Exception on any connection or broker failure
     */
    @TestTemplate
    public void testSendReceiveLargeMessage() throws Exception {
        final String address = "testLargeMessageAddress";
        StompClientConnection conn = StompClientConnectionFactory.createClientConnection(this.uri);
        try {
            this.server.createQueue(QueueConfiguration.of(address).setRoutingType(RoutingType.ANYCAST));
            String payload = filledString('t', TEN_MEBI_CHARS);

            conn.connect(this.defUser, this.defPass);
            subscribe(conn, null, "auto", null, null, address, true);
            send(conn, address, null, payload);

            ClientStompFrame frame = conn.receiveFrame();
            Assertions.assertNotNull(frame, "no frame received");
            // Deliberately not assertEquals: on failure it would embed both multi-megabyte
            // strings in the assertion message.
            Assertions.assertTrue(payload.equals(frame.getBody()), "received body differs from the body sent");
        } finally {
            conn.disconnect();
        }
    }

    /**
     * Sends {@link #MESSAGE_COUNT} persistent messages over STOMP, then subscribes and checks
     * each delivered MESSAGE frame before unsubscribing with a receipt.
     *
     * @throws Exception on any connection or broker failure
     */
    @TestTemplate
    public void testSendReceiveLargePersistentMessages() throws Exception {
        StompClientConnection conn = StompClientConnectionFactory.createClientConnection(this.uri);
        conn.connect(this.defUser, this.defPass);
        try {
            String destination = getQueuePrefix() + getQueueName();
            String payload = filledString('A', ONE_MEBI_CHARS);

            for (int sent = 0; sent < MESSAGE_COUNT; sent++) {
                ClientStompFrame sendFrame = conn.createFrame("SEND");
                sendFrame.addHeader("destination", destination);
                sendFrame.addHeader("persistent", "true");
                sendFrame.setBody(payload);
                conn.sendFrame(sendFrame);
            }

            subscribeAnycastAutoAck(conn, destination);
            for (int received = 0; received < MESSAGE_COUNT; received++) {
                assertMessageFrame(conn.receiveFrame(RECEIVE_TIMEOUT), destination, ONE_MEBI_CHARS, "AAAA");
            }
            assertReceipt(unsubscribeWithReceipt(conn, destination));
        } finally {
            conn.disconnect();
        }
    }

    /**
     * Sends {@link #MESSAGE_COUNT} messages through {@code sendJmsMessage}, then receives them
     * over a STOMP subscription and checks each delivered MESSAGE frame.
     *
     * @throws Exception on any connection or broker failure
     */
    @TestTemplate
    public void testReceiveLargePersistentMessagesFromCore() throws Exception {
        StompClientConnection conn = StompClientConnectionFactory.createClientConnection(this.uri);
        conn.connect(this.defUser, this.defPass);
        try {
            String destination = getQueuePrefix() + getQueueName();
            String payload = filledString('B', CORE_BODY_CHARS);

            for (int sent = 0; sent < MESSAGE_COUNT; sent++) {
                sendJmsMessage(payload);
            }

            subscribeAnycastAutoAck(conn, destination);
            for (int received = 0; received < MESSAGE_COUNT; received++) {
                assertMessageFrame(conn.receiveFrame(RECEIVE_TIMEOUT), destination, CORE_BODY_CHARS, "BBB");
            }
            assertReceipt(unsubscribeWithReceipt(conn, destination));
        } finally {
            conn.disconnect();
        }
    }

    /**
     * Builds a body from a {@code TestLargeMessageInputStream} adjusted with
     * {@code adjustLargeCompression(true, ...)}, sends it {@link #MESSAGE_COUNT} times through
     * {@code sendJmsMessage} and checks each frame received over STOMP.
     *
     * @throws Exception on any connection or broker failure
     */
    @TestTemplate
    public void testReceiveLargeCompressedToRegularPersistentMessagesFromCore() throws Exception {
        StompClientConnection conn = StompClientConnectionFactory.createClientConnection(this.uri);
        conn.connect(this.defUser, this.defPass);
        try {
            LargeMessageTestBase.TestLargeMessageInputStream input =
                new LargeMessageTestBase.TestLargeMessageInputStream(COMPRESSED_STREAM_SIZE, true);
            LargeMessageTestBase.adjustLargeCompression(true, input, COMPRESSED_STREAM_SIZE);

            String destination = getQueuePrefix() + getQueueName();
            String payload = new String(input.toArray());
            // The first 100 characters locate the start of the payload inside the received body.
            String payloadStart = payload.substring(0, 100);

            for (int sent = 0; sent < MESSAGE_COUNT; sent++) {
                sendJmsMessage(payload);
            }

            subscribeAnycastAutoAck(conn, destination);
            for (int received = 0; received < MESSAGE_COUNT; received++) {
                assertMessageFrame(conn.receiveFrame(SHORT_RECEIVE_TIMEOUT), destination, payload.length(), payloadStart);
            }
            assertReceipt(unsubscribeWithReceipt(conn, destination));
        } finally {
            conn.disconnect();
        }
    }

    /**
     * Builds a body from a {@code TestLargeMessageInputStream} resized to
     * {@link #COMPRESSED_LARGE_STREAM_SIZE} and adjusted with
     * {@code adjustLargeCompression(false, ...)}, sends it {@link #MESSAGE_COUNT} times through
     * {@code sendJmsMessage} and checks each frame received over STOMP.
     *
     * <p>Unlike the other tests, the connection is opened only after the messages have been sent,
     * the UNSUBSCRIBE response is not inspected, and the transport is closed explicitly.
     *
     * @throws Exception on any connection or broker failure
     */
    @TestTemplate
    public void testReceiveLargeCompressedToLargePersistentMessagesFromCore() throws Exception {
        StompClientConnection conn = StompClientConnectionFactory.createClientConnection(this.uri);
        try {
            LargeMessageTestBase.TestLargeMessageInputStream input =
                new LargeMessageTestBase.TestLargeMessageInputStream(COMPRESSED_STREAM_SIZE, true);
            input.setSize(COMPRESSED_LARGE_STREAM_SIZE);
            LargeMessageTestBase.adjustLargeCompression(false, input, COMPRESSED_LARGE_STREAM_SIZE);

            String payload = new String(input.toArray());
            // The first 100 characters locate the start of the payload inside the received body.
            String payloadStart = payload.substring(0, 100);

            for (int sent = 0; sent < MESSAGE_COUNT; sent++) {
                sendJmsMessage(payload);
            }

            conn.connect(this.defUser, this.defPass);
            String destination = getQueuePrefix() + getQueueName();
            subscribeAnycastAutoAck(conn, destination);

            for (int received = 0; received < MESSAGE_COUNT; received++) {
                ClientStompFrame frame = conn.receiveFrame(RECEIVE_TIMEOUT);
                Assertions.assertNotNull(frame);
                if (logger.isDebugEnabled()) {
                    // Guarded so the large frame is only rendered when debug output is wanted.
                    String body = frame.getBody();
                    logger.debug("{}", frame);
                    logger.debug("part of frame: {}", body.substring(0, Math.min(DEBUG_BODY_PREVIEW_CHARS, body.length())));
                }
                assertMessageFrame(frame, destination, payload.length(), payloadStart);
            }

            unsubscribeWithReceipt(conn, destination);
        } finally {
            try {
                conn.disconnect();
            } finally {
                // Close the transport even when disconnect() itself fails.
                conn.closeTransport();
            }
        }
    }

    /**
     * Creates a string consisting of {@code length} copies of {@code fill}.
     *
     * @param fill   the character to repeat
     * @param length the number of characters in the result
     * @return the filled string
     */
    private static String filledString(char fill, int length) {
        char[] chars = new char[length];
        Arrays.fill(chars, fill);
        return new String(chars);
    }

    /**
     * Sends a SUBSCRIBE frame with {@code subscription-type: ANYCAST} and {@code ack: auto}.
     *
     * @param conn        the connection to send the frame on
     * @param destination the value of the {@code destination} header
     * @throws Exception if the frame cannot be sent
     */
    private static void subscribeAnycastAutoAck(StompClientConnection conn, String destination) throws Exception {
        ClientStompFrame subscribeFrame = conn.createFrame("SUBSCRIBE");
        subscribeFrame.addHeader("subscription-type", "ANYCAST");
        subscribeFrame.addHeader("destination", destination);
        subscribeFrame.addHeader("ack", "auto");
        conn.sendFrame(subscribeFrame);
    }

    /**
     * Sends an UNSUBSCRIBE frame carrying {@code receipt: 567}.
     *
     * @param conn        the connection to send the frame on
     * @param destination the value of the {@code destination} header
     * @return whatever {@code sendFrame} returned for the UNSUBSCRIBE frame
     * @throws Exception if the frame cannot be sent
     */
    private static ClientStompFrame unsubscribeWithReceipt(StompClientConnection conn, String destination) throws Exception {
        ClientStompFrame unsubscribeFrame = conn.createFrame("UNSUBSCRIBE");
        unsubscribeFrame.addHeader("destination", destination);
        unsubscribeFrame.addHeader("receipt", "567");
        return conn.sendFrame(unsubscribeFrame);
    }

    /**
     * Asserts that {@code response} is a non-null RECEIPT frame.
     *
     * @param response the frame returned for a request that carried a {@code receipt} header
     */
    private static void assertReceipt(ClientStompFrame response) {
        Assertions.assertNotNull(response);
        Assertions.assertEquals("RECEIPT", response.getCommand());
    }

    /**
     * Asserts that {@code frame} is a MESSAGE frame for {@code destination} whose body, measured
     * from the first occurrence of {@code bodyMarker} to its end, has {@code expectedLength}
     * characters.
     *
     * @param frame          the received frame; must not be {@code null}
     * @param destination    the expected value of the {@code destination} header
     * @param expectedLength the expected number of body characters from the marker onwards
     * @param bodyMarker     text marking where the payload starts inside the body
     */
    private static void assertMessageFrame(ClientStompFrame frame, String destination, int expectedLength, String bodyMarker) {
        Assertions.assertNotNull(frame);
        Assertions.assertEquals("MESSAGE", frame.getCommand());
        Assertions.assertEquals(destination, frame.getHeader("destination"));
        String body = frame.getBody();
        Assertions.assertEquals(expectedLength, body.length() - body.indexOf(bodyMarker));
    }
}
