package org.apache.activemq.artemis.tests.integration.client;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.io.File;
import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/client/ExpiryLargeMessageTest.class */
public class ExpiryLargeMessageTest extends ActiveMQTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    final SimpleString EXPIRY = new SimpleString("my-expiry");
    final SimpleString DLQ = new SimpleString("my-DLQ");
    final SimpleString MY_QUEUE = new SimpleString("MY-QUEUE");
    final int messageSize = 10240;
    final int numberOfMessages = 50;

    @Test
    public void testExpiryMessagesThenDLQ() throws Exception {
        ActiveMQServer createServer = createServer(true);
        createServer.getConfiguration().setMessageExpiryScanPeriod(600000L);
        AddressSettings maxReadPageMessages = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxDeliveryAttempts(5).setMaxSizeBytes(51200L).setPageSizeBytes(10240).setExpiryAddress(this.EXPIRY).setDeadLetterAddress(this.DLQ).setMaxReadPageBytes(-1).setMaxReadPageMessages(-1);
        createServer.getAddressSettingsRepository().addMatch(this.MY_QUEUE.toString(), maxReadPageMessages);
        createServer.getAddressSettingsRepository().addMatch(this.EXPIRY.toString(), maxReadPageMessages);
        createServer.start();
        createServer.createQueue(new QueueConfiguration(this.EXPIRY).setRoutingType(RoutingType.ANYCAST));
        createServer.createQueue(new QueueConfiguration(this.DLQ).setRoutingType(RoutingType.ANYCAST));
        createServer.createQueue(new QueueConfiguration(this.MY_QUEUE).setRoutingType(RoutingType.ANYCAST));
        ServerLocator createInVMNonHALocator = createInVMNonHALocator();
        ClientSession createSession = addSessionFactory(createSessionFactory(createInVMNonHALocator)).createSession(true, true, 0);
        byte[] bArr = new byte[10240];
        for (int i = 0; i < bArr.length; i++) {
            bArr[i] = getSamplebyte(i);
        }
        ClientProducer createProducer = createSession.createProducer(this.MY_QUEUE);
        long currentTimeMillis = System.currentTimeMillis() + 1000;
        for (int i2 = 0; i2 < 50; i2++) {
            ClientMessage createMessage = createSession.createMessage(true);
            createMessage.putIntProperty("count", i2);
            if (i2 % 2 == 0) {
                createMessage.putBooleanProperty("tst-large", false);
                createMessage.getBodyBuffer().writeBytes(bArr);
            } else {
                createMessage.putBooleanProperty("tst-large", true);
                createMessage.setBodyInputStream(createFakeLargeStream(10240L));
            }
            createMessage.setExpiration(currentTimeMillis);
            createProducer.send(createMessage);
        }
        createSession.close();
        createServer.stop();
        createServer.start();
        Queue locateQueue = createServer.locateQueue(this.EXPIRY);
        Queue locateQueue2 = createServer.locateQueue(this.MY_QUEUE);
        ClientSessionFactory createSessionFactory = createSessionFactory(createInVMNonHALocator);
        Thread.sleep(1500L);
        Wait.assertEquals(50, () -> {
            locateQueue2.expireReferences();
            return getMessageCount(locateQueue);
        });
        ClientSession createSession2 = createSessionFactory.createSession(false, false);
        ClientConsumer createConsumer = createSession2.createConsumer(this.EXPIRY);
        createSession2.start();
        for (int i3 = 0; i3 < 25; i3++) {
            ClientMessage receive = createConsumer.receive(5000L);
            assertNotNull(receive);
            receive.acknowledge();
        }
        createSession2.commit();
        createConsumer.close();
        for (int i4 = 0; i4 < 6; i4++) {
            ClientConsumer createConsumer2 = createSession2.createConsumer(this.EXPIRY);
            createSession2.start();
            for (int i5 = 0; i5 < 25; i5++) {
                ClientMessage receive2 = createConsumer2.receive(5000L);
                assertNotNull(receive2);
                if (i5 % 10 == 0) {
                    logger.debug("Received {}", Integer.valueOf(i5));
                }
                for (int i6 = 0; i6 < 10240; i6++) {
                    assertEquals(getSamplebyte(i6), receive2.getBodyBuffer().readByte());
                }
                receive2.acknowledge();
            }
            createSession2.rollback();
            createConsumer2.close();
            createSession2.close();
            createSessionFactory.close();
            if (i4 == 0) {
                createServer.stop();
                createServer.start();
            }
            createSessionFactory = createSessionFactory(createInVMNonHALocator);
            createSession2 = createSessionFactory.createSession(false, false);
            createSession2.start();
        }
        ClientConsumer createConsumer3 = createSession2.createConsumer(this.EXPIRY);
        createSession2.start();
        assertNull(createConsumer3.receiveImmediate());
        createConsumer3.close();
        createSession2.close();
        createSessionFactory.close();
        for (int i7 = 0; i7 < 2; i7++) {
            createSessionFactory = createSessionFactory(createInVMNonHALocator);
            createSession2 = createSessionFactory.createSession(false, false);
            createConsumer3 = createSession2.createConsumer(this.DLQ);
            createSession2.start();
            for (int i8 = 0; i8 < 25; i8++) {
                ClientMessage receive3 = createConsumer3.receive(5000L);
                assertNotNull(receive3);
                if (i8 % 10 == 0) {
                    logger.debug("Received {}", Integer.valueOf(i8));
                }
                for (int i9 = 0; i9 < 10240; i9++) {
                    assertEquals(getSamplebyte(i9), receive3.getBodyBuffer().readByte());
                }
                receive3.acknowledge();
            }
            if (i7 == 0) {
                createSession2.rollback();
                createSession2.close();
                createSessionFactory.close();
                createServer.stop();
                createServer.start();
            }
        }
        createSession2.commit();
        assertNull(createConsumer3.receiveImmediate());
        createSession2.close();
        createSessionFactory.close();
        createInVMNonHALocator.close();
        validateNoFilesOnLargeDir();
    }

    @Test
    public void testExpiryMessagesAMQP() throws Exception {
        testExpiryMessagesAMQP(false, 307200);
    }

    @Test
    public void testExpiryMessagesAMQPRestartBeforeExpiry() throws Exception {
        testExpiryMessagesAMQP(true, 307200);
    }

    @Test
    public void testExpiryMessagesAMQPRegularMessageStandardMessage() throws Exception {
        testExpiryMessagesAMQP(false, 30);
    }

    @Test
    public void testExpiryMessagesAMQPRestartBeforeExpiryStandardMessage() throws Exception {
        testExpiryMessagesAMQP(true, 30);
    }

    public void testExpiryMessagesAMQP(boolean z, int i) throws Exception {
        ActiveMQServer createServer = createServer(true, true);
        createServer.getConfiguration().setMessageExpiryScanPeriod(6000L);
        AddressSettings maxReadPageBytes = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxDeliveryAttempts(5).setMaxSizeBytes(51200L).setPageSizeBytes(10240).setExpiryAddress(this.EXPIRY).setDeadLetterAddress(this.DLQ).setMaxReadPageMessages(-1).setMaxReadPageBytes(-1);
        createServer.getAddressSettingsRepository().addMatch(this.MY_QUEUE.toString(), maxReadPageBytes);
        createServer.getAddressSettingsRepository().addMatch(this.EXPIRY.toString(), maxReadPageBytes);
        createServer.start();
        createServer.createQueue(new QueueConfiguration(this.EXPIRY).setRoutingType(RoutingType.ANYCAST));
        createServer.createQueue(new QueueConfiguration(this.DLQ).setRoutingType(RoutingType.ANYCAST));
        createServer.createQueue(new QueueConfiguration(this.MY_QUEUE).setRoutingType(RoutingType.ANYCAST));
        ConnectionFactory createConnectionFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61616");
        Connection createConnection = createConnectionFactory.createConnection();
        Session createSession = createConnection.createSession(false, 1);
        byte[] bArr = new byte[i];
        for (int i2 = 0; i2 < bArr.length; i2++) {
            bArr[i2] = getSamplebyte(i2);
        }
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue(this.MY_QUEUE.toString()));
        createProducer.setTimeToLive(300L);
        for (int i3 = 0; i3 < 50; i3++) {
            BytesMessage createBytesMessage = createSession.createBytesMessage();
            createBytesMessage.writeBytes(bArr);
            createBytesMessage.setIntProperty("count", i3);
            createProducer.send(createBytesMessage);
        }
        createSession.close();
        createConnection.close();
        if (z) {
            createServer.stop();
            createServer.start();
        }
        Queue locateQueue = createServer.locateQueue(this.EXPIRY);
        Queue locateQueue2 = createServer.locateQueue(this.MY_QUEUE);
        Wait.assertEquals(50, () -> {
            locateQueue2.expireReferences();
            return getMessageCount(locateQueue);
        });
        if (!z) {
            createServer.stop();
            createServer.start();
        }
        Connection createConnection2 = createConnectionFactory.createConnection();
        Session createSession2 = createConnection2.createSession(true, 0);
        MessageConsumer createConsumer = createSession2.createConsumer(createSession2.createQueue(this.EXPIRY.toString()));
        createConnection2.start();
        for (int i4 = 0; i4 < 50; i4++) {
            Message receive = createConsumer.receive(5000L);
            assertNotNull(receive);
            receive.acknowledge();
        }
        createSession2.commit();
        createConnection2.close();
    }

    @Test
    public void testCompatilityWithLinks() throws Exception {
        ActiveMQServer createServer = createServer(true);
        createServer.getConfiguration().setMessageExpiryScanPeriod(600000L);
        AddressSettings deadLetterAddress = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxDeliveryAttempts(5).setMaxSizeBytes(-1L).setPageSizeBytes(10240).setExpiryAddress(this.EXPIRY).setDeadLetterAddress(this.DLQ);
        createServer.getAddressSettingsRepository().addMatch(this.MY_QUEUE.toString(), deadLetterAddress);
        createServer.getAddressSettingsRepository().addMatch(this.EXPIRY.toString(), deadLetterAddress);
        createServer.start();
        createServer.createQueue(new QueueConfiguration(this.EXPIRY).setRoutingType(RoutingType.ANYCAST));
        createServer.createQueue(new QueueConfiguration(this.DLQ).setRoutingType(RoutingType.ANYCAST));
        createServer.createQueue(new QueueConfiguration(this.MY_QUEUE).setRoutingType(RoutingType.ANYCAST));
        ServerLocator createInVMNonHALocator = createInVMNonHALocator();
        ClientSession createSession = addSessionFactory(createSessionFactory(createInVMNonHALocator)).createSession(true, true, 0);
        byte[] bArr = new byte[10240];
        for (int i = 0; i < bArr.length; i++) {
            bArr[i] = getSamplebyte(i);
        }
        ClientProducer createProducer = createSession.createProducer(this.MY_QUEUE);
        long currentTimeMillis = System.currentTimeMillis() + 1000;
        for (int i2 = 0; i2 < 50; i2++) {
            ClientMessage createMessage = createSession.createMessage(true);
            createMessage.putIntProperty("count", i2);
            createMessage.putBooleanProperty("tst-large", true);
            createMessage.setBodyInputStream(createFakeLargeStream(10240L));
            createMessage.setExpiration(currentTimeMillis);
            createProducer.send(createMessage);
        }
        createServer.stop();
        createServer.start();
        ClientSessionFactory createSessionFactory = createSessionFactory(createInVMNonHALocator);
        ClientSession createSession2 = createSessionFactory.createSession(true, true, 0);
        createSession2.start();
        Thread.sleep(1500L);
        assertNull(createSession2.createConsumer(this.MY_QUEUE).receive(1000L));
        createSession2.close();
        ClientSession createSession3 = createSessionFactory.createSession(false, false);
        ClientConsumer createConsumer = createSession3.createConsumer(this.EXPIRY);
        createSession3.start();
        ClientMessage receive = createConsumer.receive(5000L);
        assertNotNull(receive);
        receive.acknowledge();
        createSession3.rollback();
        createServer.stop();
        long messageID = receive.getMessageID();
        long longValue = receive.getLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_ORIG_MESSAGE_ID).longValue();
        File file = new File(getLargeMessagesDir());
        new File(file, messageID + ".msg").renameTo(new File(file, longValue + ".msg"));
        createServer.start();
        ClientSession createSession4 = createSessionFactory(createInVMNonHALocator).createSession(true, true, 0);
        createSession4.start();
        ClientConsumer createConsumer2 = createSession4.createConsumer(this.EXPIRY);
        for (int i3 = 0; i3 < 50; i3++) {
            ClientMessage receive2 = createConsumer2.receive(5000L);
            assertNotNull(receive2);
            if (i3 % 10 == 0) {
                logger.debug("Received {}", Integer.valueOf(i3));
            }
            for (int i4 = 0; i4 < 10240; i4++) {
                assertEquals(getSamplebyte(i4), receive2.getBodyBuffer().readByte());
            }
            receive2.acknowledge();
        }
        createSession4.commit();
        createSession4.close();
    }
}
