package org.apache.activemq.artemis.tests.integration.openwire;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/openwire/OpenWireLargeMessageTest.class */
public class OpenWireLargeMessageTest extends BasicOpenWireTest {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    public SimpleString lmAddress = SimpleString.of("LargeMessageAddress");
    public SimpleString lmDropAddress = SimpleString.of("LargeMessageDropAddress");

    @Override // org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest, org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase, org.apache.activemq.artemis.tests.util.ActiveMQTestBase
    @BeforeEach
    public void setUp() throws Exception {
        this.realStore = true;
        super.setUp();
        this.server.createQueue(QueueConfiguration.of(this.lmAddress).setRoutingType(RoutingType.ANYCAST));
        this.server.createQueue(QueueConfiguration.of(this.lmDropAddress).setRoutingType(RoutingType.ANYCAST));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase
    public void configureAddressSettings(Map<String, AddressSettings> map) {
        map.put("#", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(SimpleString.of("ActiveMQ.DLQ")).setAutoCreateAddresses(true));
        map.put(this.lmDropAddress.toString(), new AddressSettings().setMaxSizeBytes(102400L).setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP).setMaxSizeMessages(2L).setMessageCounterHistoryDayLimit(10).setRedeliveryDelay(0L).setMaxDeliveryAttempts(0));
    }

    @Test
    public void testSendReceiveLargeMessageRestart() throws Exception {
        internalSendReceiveLargeMessage(this.factory, true);
        internalSendReceiveLargeMessage(CFUtil.createConnectionFactory("openwire", "tcp://localhost:61618"), true);
    }

    @Test
    public void testSendReceiveLargeMessage() throws Exception {
        internalSendReceiveLargeMessage(this.factory, false);
        internalSendReceiveLargeMessage(CFUtil.createConnectionFactory("openwire", "tcp://localhost:61618"), false);
    }

    private void internalSendReceiveLargeMessage(ConnectionFactory connectionFactory, boolean z) throws Exception {
        String str = "This is a random String " + RandomUtil.randomString();
        StringBuffer stringBuffer = new StringBuffer();
        while (stringBuffer.length() < 1048576) {
            stringBuffer.append(str);
        }
        String stringBuffer2 = stringBuffer.toString();
        Connection createConnection = connectionFactory.createConnection();
        try {
            createConnection.start();
            Session createSession = createConnection.createSession(false, 1);
            MessageProducer createProducer = createSession.createProducer(createSession.createQueue(this.lmAddress.toString()));
            createProducer.setDeliveryMode(2);
            createProducer.send(createSession.createTextMessage(stringBuffer2));
            if (createConnection != null) {
                createConnection.close();
            }
            if (z) {
                this.server.stop();
                this.server.start();
            }
            createConnection = connectionFactory.createConnection();
            try {
                createConnection.start();
                Session createSession2 = createConnection.createSession(false, 1);
                Assertions.assertEquals(stringBuffer2, createSession2.createConsumer(createSession2.createQueue(this.lmAddress.toString())).receive(5000L).getText());
                if (createConnection != null) {
                    createConnection.close();
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testFastLargeMessageProducerDropOnPaging() throws Exception {
        AssertionLoggerHandler assertionLoggerHandler = new AssertionLoggerHandler(true);
        try {
            byte[] bArr = new byte[204800];
            Connection createConnection = this.factory.createConnection();
            try {
                createConnection.start();
                Session createSession = createConnection.createSession(false, 1);
                try {
                    Queue createQueue = createSession.createQueue(this.lmDropAddress.toString());
                    MessageProducer createProducer = createSession.createProducer(createQueue);
                    try {
                        createProducer.setDeliveryMode(1);
                        bArr[0] = 1;
                        BytesMessage createBytesMessage = createSession.createBytesMessage();
                        createBytesMessage.writeBytes(bArr);
                        PagingStore pageStore = this.server.getPagingManager().getPageStore(this.lmDropAddress);
                        while (!pageStore.isPaging()) {
                            createProducer.send(createBytesMessage);
                        }
                        for (int i = 0; i < 10; i++) {
                            createProducer.send(createBytesMessage);
                        }
                        long messageCount = this.server.locateQueue(this.lmDropAddress).getMessageCount();
                        Assertions.assertTrue(messageCount > 0, "The queue cannot be empty");
                        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
                        for (long j = 0; j < messageCount; j++) {
                            try {
                                if (createConsumer.receive(2000L) == null) {
                                    Assertions.fail("The messages are not finished yet");
                                }
                            } catch (Throwable th) {
                                if (createConsumer != null) {
                                    try {
                                        createConsumer.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                }
                                throw th;
                            }
                        }
                        if (createConsumer != null) {
                            createConsumer.close();
                        }
                        if (createProducer != null) {
                            createProducer.close();
                        }
                        if (createSession != null) {
                            createSession.close();
                        }
                        if (createConnection != null) {
                            createConnection.close();
                        }
                        this.server.stop();
                        Assertions.assertFalse(assertionLoggerHandler.findTrace("NullPointerException"));
                        Assertions.assertFalse(assertionLoggerHandler.findText(new String[]{"It was not possible to delete message"}));
                        assertionLoggerHandler.close();
                    } catch (Throwable th3) {
                        if (createProducer != null) {
                            try {
                                createProducer.close();
                            } catch (Throwable th4) {
                                th3.addSuppressed(th4);
                            }
                        }
                        throw th3;
                    }
                } catch (Throwable th5) {
                    if (createSession != null) {
                        try {
                            createSession.close();
                        } catch (Throwable th6) {
                            th5.addSuppressed(th6);
                        }
                    }
                    throw th5;
                }
            } finally {
            }
        } catch (Throwable th7) {
            try {
                assertionLoggerHandler.close();
            } catch (Throwable th8) {
                th7.addSuppressed(th8);
            }
            throw th7;
        }
    }

    @Test
    public void testSendReceiveLargeMessageTX() throws Exception {
        TextMessage textMessage;
        int i = 1000;
        int i2 = 100;
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        Objects.requireNonNull(newFixedThreadPool);
        runAfter(newFixedThreadPool::shutdownNow);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        String str = "This is a random String " + RandomUtil.randomString();
        StringBuffer stringBuffer = new StringBuffer();
        while (stringBuffer.length() < 1048576) {
            stringBuffer.append(str);
        }
        String stringBuffer2 = stringBuffer.toString();
        newFixedThreadPool.execute(() -> {
            try {
                try {
                    Connection createConnection = this.factory.createConnection();
                    try {
                        createConnection.start();
                        Session createSession = createConnection.createSession(true, 0);
                        MessageConsumer createConsumer = createSession.createConsumer(createSession.createQueue(this.lmAddress.toString()));
                        for (int i3 = 0; i3 < i; i3++) {
                            TextMessage receive = createConsumer.receive(5000L);
                            Assertions.assertNotNull(receive);
                            if (receive instanceof TextMessage) {
                                Assertions.assertEquals(stringBuffer2, receive.getText());
                            }
                            if (i3 > 0 && i3 % i2 == 0) {
                                logger.info("Received {} messages", Integer.valueOf(i3));
                                createSession.commit();
                            }
                        }
                        createSession.commit();
                        if (createConnection != null) {
                            createConnection.close();
                        }
                        countDownLatch.countDown();
                    } catch (Throwable th) {
                        if (createConnection != null) {
                            try {
                                createConnection.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    logger.warn(th3.getMessage(), th3);
                    atomicInteger.incrementAndGet();
                    countDownLatch.countDown();
                }
            } catch (Throwable th4) {
                countDownLatch.countDown();
                throw th4;
            }
        });
        Connection createConnection = this.factory.createConnection();
        try {
            createConnection.start();
            Session createSession = createConnection.createSession(true, 0);
            MessageProducer createProducer = createSession.createProducer(createSession.createQueue(this.lmAddress.toString()));
            createProducer.setDeliveryMode(2);
            for (int i3 = 0; i3 < 1000; i3++) {
                if (i3 % 2 == 0) {
                    textMessage = createSession.createTextMessage(stringBuffer2);
                } else {
                    TextMessage createBytesMessage = createSession.createBytesMessage();
                    createBytesMessage.writeBytes(stringBuffer2.getBytes(StandardCharsets.UTF_8));
                    textMessage = createBytesMessage;
                }
                createProducer.send(textMessage);
                if (i3 > 0 && i3 % 100 == 0) {
                    logger.info("Sent {} messages", Integer.valueOf(i3));
                    createSession.commit();
                }
            }
            createSession.commit();
            if (createConnection != null) {
                createConnection.close();
            }
            countDownLatch.await(1L, TimeUnit.MINUTES);
            Assertions.assertEquals(0, atomicInteger.get());
        } catch (Throwable th) {
            if (createConnection != null) {
                try {
                    createConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase
    public void extraServerConfig(Configuration configuration) {
        try {
            configuration.addAcceptorConfiguration("openwire", "tcp://0.0.0.0:61618?OPENWIRE;openwireMaxPacketSize=10 * 1024");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
