package org.apache.activemq.artemis.tests.integration.cluster.failover;

import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Objects;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/failover/PageCleanupWhileReplicaCatchupTest.class */
public class PageCleanupWhileReplicaCatchupTest extends FailoverTestBase {
    private static final int NUMBER_OF_WORKERS = 5;
    private static final int NUMBER_OF_RESTARTS = 5;
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    volatile boolean running = true;

    /* loaded from: input_file:org/apache/activemq/artemis/tests/integration/cluster/failover/PageCleanupWhileReplicaCatchupTest$Worker.class */
    class Worker extends Thread {
        final String queueName;
        final Queue queue;
        volatile Throwable throwable;

        Worker(String str) {
            super("Worker on queue " + str + " for test on PageCleanupWhileReplicaCatchupTest");
            this.queueName = str;
            this.queue = PageCleanupWhileReplicaCatchupTest.this.liveServer.getServer().locateQueue(this.queueName);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                Connection createConnection = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616").createConnection();
                try {
                    Session createSession = createConnection.createSession(false, 1);
                    createConnection.start();
                    jakarta.jms.Queue createQueue = createSession.createQueue(this.queueName);
                    MessageConsumer createConsumer = createSession.createConsumer(createQueue);
                    MessageProducer createProducer = createSession.createProducer(createQueue);
                    while (PageCleanupWhileReplicaCatchupTest.this.running) {
                        this.queue.getPagingStore().startPaging();
                        for (int i = 0; i < 10; i++) {
                            createProducer.send(createSession.createTextMessage("hello " + i));
                        }
                        for (int i2 = 0; i2 < 10; i2++) {
                            Assert.assertNotNull(createConsumer.receive(5000L));
                        }
                        Thread.sleep(500L);
                    }
                    if (createConnection != null) {
                        createConnection.close();
                    }
                } finally {
                }
            } catch (Throwable th) {
                th.printStackTrace(System.out);
                this.throwable = th;
            }
        }
    }

    @Override // org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase
    @Before
    public void setUp() throws Exception {
        this.startBackupServer = false;
        super.setUp();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase
    public void createConfigs() throws Exception {
        createReplicatedConfigs();
    }

    @Override // org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase
    protected TransportConfiguration getAcceptorTransportConfiguration(boolean z) {
        return getNettyAcceptorTransportConfiguration(z);
    }

    @Override // org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase
    protected TransportConfiguration getConnectorTransportConfiguration(boolean z) {
        return getNettyConnectorTransportConfiguration(z);
    }

    protected ActiveMQServer createInVMFailoverServer(boolean z, Configuration configuration, NodeManager nodeManager, int i) {
        HashMap hashMap = new HashMap();
        hashMap.put(ADDRESS.toString(), new AddressSettings().setMaxSizeBytes(2048L).setPageSizeBytes(1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE));
        return createInVMFailoverServer(z, configuration, 1024, 2048, hashMap, nodeManager, i);
    }

    @Test(timeout = 160000)
    public void testPageCleanup() throws Throwable {
        Worker[] workerArr = new Worker[5];
        for (int i = 0; i < 5; i++) {
            this.liveServer.getServer().addAddressInfo(new AddressInfo("WORKER_" + i).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
            this.liveServer.getServer().createQueue(new QueueConfiguration("WORKER_" + i).setRoutingType(RoutingType.ANYCAST).setDurable(true));
            workerArr[i] = new Worker("WORKER_" + i);
            workerArr[i].start();
        }
        for (int i2 = 0; i2 < 5; i2++) {
            logger.debug("Starting replica {}", Integer.valueOf(i2));
            this.backupServer.start();
            ActiveMQServer server = this.backupServer.getServer();
            Objects.requireNonNull(server);
            Wait.assertTrue(server::isReplicaSync);
            this.backupServer.stop();
        }
        this.running = false;
        for (Worker worker : workerArr) {
            worker.join();
        }
        for (Worker worker2 : workerArr) {
            if (worker2.throwable != null) {
                throw new RuntimeException("Worker " + worker2.queueName + " failed", worker2.throwable);
            }
        }
        for (Worker worker3 : workerArr) {
            PagingStoreImpl pagingStore = worker3.queue.getPagingStore();
            Objects.requireNonNull(pagingStore);
            Wait.assertFalse(pagingStore::isPaging, 5000L, 100L);
        }
    }
}
