package org.apache.activemq.artemis.tests.integration.paging;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.management.SimpleManagementTest;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/paging/MessagesExpiredPagingTest.class */
/**
 * Exercises expiry of messages on a multicast address configured with the
 * {@link AddressFullMessagePolicy#PAGE} policy.
 *
 * <p>{@link #setUp()} creates {@link #NUMBER_OF_QUEUES} queues on {@link #ADDRESS} and starts one
 * background task per queue that repeatedly calls {@link Queue#expireReferences()}. Each test then
 * interleaves short-lived messages with long-lived ones and verifies that every queue's
 * {@link Consumer} keeps receiving the long-lived messages without reporting errors.
 */
public class MessagesExpiredPagingTest extends ActiveMQTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final String ADDRESS = "MessagesExpiredPagingTest";
    private static final int NUMBER_OF_QUEUES = 10;
    ActiveMQServer server;
    protected static final int PAGE_MAX = 102400;
    protected static final int PAGE_SIZE = 10240;
    ExecutorService expiresExecutor;
    /** Shared stop flag observed by the expiry tasks and by every {@link Consumer} thread. */
    private final AtomicBoolean running = new AtomicBoolean(true);
    Queue[] queues = new Queue[NUMBER_OF_QUEUES];

    /**
     * Thread that consumes text messages from a single queue of {@link #ADDRESS} until the
     * enclosing test clears its {@code running} flag. The JMS consumer is closed and re-created
     * periodically while the loop runs.
     */
    public class Consumer extends Thread {
        final ConnectionFactory factory;
        /** Every received text must be strictly longer than this many characters. */
        final int minimalSize;
        /** Messages received since the test last reset this counter. */
        final AtomicInteger consumedDelta = new AtomicInteger(0);
        /** Total number of messages received by this thread. */
        final AtomicInteger consumed = new AtomicInteger(0);
        /** Number of failures (exceptions or assertion errors) raised inside {@link #run()}. */
        final AtomicInteger errors = new AtomicInteger(0);
        final String queueName;

        /**
         * @param factory     factory used to open this thread's own connection
         * @param queueName   name of the queue, on {@link #ADDRESS}, to consume from
         * @param minimalSize exclusive lower bound on the length of each received text
         */
        Consumer(ConnectionFactory factory, String queueName, int minimalSize) {
            this.factory = factory;
            this.queueName = queueName;
            this.minimalSize = minimalSize;
        }

        /**
         * Receives messages until the test stops running. Any failure is logged and counted in
         * {@link #errors}; the connection is closed on every exit path.
         */
        @Override
        public void run() {
            // try-with-resources guarantees the connection is released even when an assertion
            // or a JMS call fails inside the loop.
            try (Connection connection = this.factory.createConnection()) {
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                jakarta.jms.Queue jmsQueue = session.createQueue(ADDRESS + "::" + this.queueName);
                MessageConsumer messageConsumer = session.createConsumer(jmsQueue);
                connection.start();

                int iterations = 0;
                while (MessagesExpiredPagingTest.this.running.get()) {
                    TextMessage message = (TextMessage) messageConsumer.receive(500L);
                    if (message != null) {
                        Assertions.assertTrue(message.getText().length() > this.minimalSize);
                        this.consumed.incrementAndGet();
                        this.consumedDelta.incrementAndGet();
                        Thread.sleep(2L);
                    }
                    // Recycle the consumer every so often (timeouts count as iterations too).
                    if (iterations++ > 10) {
                        iterations = 0;
                        messageConsumer.close();
                        messageConsumer = session.createConsumer(jmsQueue);
                    }
                }
            } catch (Throwable th) {
                // Log the cause so a non-zero error count reported by the test can be diagnosed.
                logger.warn(th.getMessage(), th);
                this.errors.incrementAndGet();
            }
        }
    }

    /**
     * Stops the background expiry tasks, waits up to ten seconds for them to finish, and always
     * delegates to the parent tear-down, even when the executor was never created or did not
     * terminate in time.
     */
    @Override
    @AfterEach
    public void tearDown() throws Exception {
        this.running.set(false);
        try {
            // setUp() may have failed before the executor was assigned.
            if (this.expiresExecutor != null) {
                this.expiresExecutor.shutdown();
                Assertions.assertTrue(this.expiresExecutor.awaitTermination(10L, TimeUnit.SECONDS));
            }
        } finally {
            super.tearDown();
        }
    }

    /**
     * Starts a server whose addresses page once they exceed {@link #PAGE_MAX} bytes, creates the
     * multicast address with {@link #NUMBER_OF_QUEUES} queues, and schedules one task per queue
     * that calls {@link Queue#expireReferences()} roughly every ten milliseconds.
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        this.expiresExecutor = Executors.newFixedThreadPool(NUMBER_OF_QUEUES);

        Configuration config = createDefaultConfig(0, true).setJournalSyncNonTransactional(false);
        // NOTE(review): -1 presumably turns off the broker's own expiry scan so that expiry is
        // driven only by the tasks below - confirm against the Configuration documentation.
        config.setMessageExpiryScanPeriod(-1L);

        this.server = createServer(true, config, PAGE_SIZE, PAGE_MAX);
        this.server.getAddressSettingsRepository().clear();
        AddressSettings pagingSettings = new AddressSettings()
            .setPageSizeBytes(PAGE_SIZE)
            .setMaxSizeBytes(PAGE_MAX)
            .setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE)
            .setAutoCreateAddresses(false)
            .setAutoCreateQueues(false);
        this.server.getAddressSettingsRepository().addMatch("#", pagingSettings);
        this.server.start();

        this.server.addAddressInfo(new AddressInfo(ADDRESS).addRoutingType(RoutingType.MULTICAST));
        for (int i = 0; i < NUMBER_OF_QUEUES; i++) {
            Queue queue = this.server.createQueue(
                QueueConfiguration.of("q" + i).setRoutingType(RoutingType.MULTICAST).setAddress(ADDRESS));
            this.queues[i] = queue;
            this.expiresExecutor.execute(() -> {
                Thread current = Thread.currentThread();
                current.setName("Expiry on " + queue.getName() + ".." + current.getName());
                while (this.running.get()) {
                    try {
                        queue.expireReferences();
                        Thread.sleep(10L);
                    } catch (Throwable th) {
                        logger.warn(th.getMessage(), th);
                    }
                }
            });
        }
    }

    @Test
    public void testSendReceiveCORELarge() throws Exception {
        testSendReceive("CORE", 50, 20, 10, 512000);
    }

    @Test
    public void testSendReceiveCORE() throws Exception {
        testSendReceive("CORE", 5000, 1000, 100, 0);
    }

    @Test
    public void testSendReceiveAMQP() throws Exception {
        testSendReceive("AMQP", 5000, 1000, 100, 0);
    }

    @Test
    public void testSendReceiveAMQPLarge() throws Exception {
        testSendReceive("AMQP", 50, 20, 10, 512000);
    }

    @Test
    public void testSendReceiveOpenWire() throws Exception {
        testSendReceive("OPENWIRE", 5000, 1000, 100, 0);
    }

    /**
     * Runs the scenario for one protocol.
     *
     * <ol>
     *   <li>Starts one {@link Consumer} per queue and sends ten messages without a time-to-live,
     *       waiting until every consumer has received all ten.</li>
     *   <li>Sends {@code numberOfMessages} messages with a 10 ms time-to-live. Every
     *       {@code pagingInterval} messages it sends, once per consumer, a message with a one-hour
     *       time-to-live and waits for that consumer to receive something, then forces a new
     *       page.</li>
     *   <li>Sends {@code numberOfMessagesSecondWave} messages with a 300 ms time-to-live, forcing
     *       a new page every {@code pagingInterval} messages.</li>
     *   <li>Sends a final one-hour message, waits for every consumer to receive something, stops
     *       the consumers and asserts that none of them recorded an error.</li>
     * </ol>
     *
     * @param protocol                   protocol name handed to {@link CFUtil#createConnectionFactory}
     * @param numberOfMessages           size of the first (10 ms time-to-live) wave
     * @param numberOfMessagesSecondWave size of the second (300 ms time-to-live) wave
     * @param pagingInterval             number of messages between forced page rollovers
     * @param bodySize                   number of {@code '*'} characters appended to each body
     * @throws Exception if a JMS operation, a wait condition or an assertion fails
     */
    public void testSendReceive(String protocol,
                                int numberOfMessages,
                                int numberOfMessagesSecondWave,
                                int pagingInterval,
                                int bodySize) throws Exception {
        ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, SimpleManagementTest.LOCALHOST);

        StringBuilder padding = new StringBuilder();
        for (int i = 0; i < bodySize; i++) {
            padding.append("*");
        }
        String body = padding.toString();
        String helloText = "hello" + body;

        Consumer[] consumers = new Consumer[NUMBER_OF_QUEUES];
        for (int i = 0; i < NUMBER_OF_QUEUES; i++) {
            consumers[i] = new Consumer(factory, "q" + i, bodySize);
            consumers[i].start();
        }

        final long shortTimeToLive = 10L;
        final long longTimeToLive = TimeUnit.HOURS.toMillis(1L);
        final int warmUpMessages = 10;

        try (Connection connection = factory.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createTopic(ADDRESS));

            // Warm-up: messages sent with the producer's default time-to-live must all arrive.
            for (int i = 0; i < warmUpMessages; i++) {
                producer.send(session.createTextMessage(helloText));
            }
            for (Consumer consumer : consumers) {
                Wait.assertEquals(warmUpMessages, consumer.consumed::get);
            }
            for (Consumer consumer : consumers) {
                consumer.consumedDelta.set(0);
            }

            // First wave: short-lived messages, with long-lived probes at every interval.
            producer.setTimeToLive(shortTimeToLive);
            for (int sent = 0; sent < numberOfMessages; sent++) {
                if (sent > 0 && sent % pagingInterval == 0) {
                    for (Consumer consumer : consumers) {
                        producer.setTimeToLive(longTimeToLive);
                        producer.send(session.createTextMessage(helloText));
                        Wait.assertTrue(() -> consumer.consumedDelta.get() > 0);
                        producer.setTimeToLive(shortTimeToLive);
                        consumer.consumedDelta.set(0);
                    }
                    this.queues[0].getPagingStore().forceAnotherPage();
                }
                producer.send(session.createTextMessage(helloText));
            }

            // Second wave: messages that live slightly longer, still rolling pages.
            producer.setTimeToLive(300L);
            for (int sent = 0; sent < numberOfMessagesSecondWave; sent++) {
                if (sent > 0 && sent % pagingInterval == 0) {
                    this.queues[0].getPagingStore().forceAnotherPage();
                }
                producer.send(session.createTextMessage(helloText));
            }

            // Final long-lived message: every consumer must still be receiving.
            producer.setTimeToLive(longTimeToLive);
            producer.send(session.createTextMessage("hello again" + body));
            for (Consumer consumer : consumers) {
                Wait.assertTrue(() -> consumer.consumedDelta.get() > 0);
            }

            this.running.set(false);
            for (Consumer consumer : consumers) {
                consumer.join(5000L);
                Assertions.assertFalse(consumer.isAlive());
                Assertions.assertEquals(0, consumer.errors.get());
            }
        }
    }
}
