package org.apache.activemq.artemis.tests.integration.paging;

import jakarta.jms.Connection;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.integration.management.SimpleManagementTest;
import org.apache.activemq.artemis.tests.integration.replication.ReplicationOrderTest;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration tests that exercise {@link PageSubscriptionCounter} against an {@link ActiveMQServer}:
 * increments with and without a transaction, concurrent increments from several threads, and the
 * values observed after the server is restarted with counter rebuilding switched off.
 *
 * <p>After every test, {@link #checkLoggerEnd()} verifies that the log codes {@code 222214} and
 * {@code 222215} were not captured while the test ran.
 */
public class PagingCounterTest extends ActiveMQTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Number of worker threads used by the multi-threaded tests. */
    private static final int THREADS = 10;

    /** Number of loop iterations each worker thread performs in the multi-threaded tests. */
    private static final int BUMPS = 2000;

    private ActiveMQServer server;
    private ServerLocator sl;
    private AssertionLoggerHandler loggerHandler;

    /**
     * Work executed by every worker thread of {@link #runOnAllThreads(CounterTask)}; unlike
     * {@link Runnable} it may throw checked exceptions.
     */
    @FunctionalInterface
    private interface CounterTask {
        void run() throws Exception;
    }

    /** Starts a fresh server, creates an in-VM locator and begins capturing log output. */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();
        server = newActiveMQServer();
        server.start();
        sl = createInVMNonHALocator();
        loggerHandler = new AssertionLoggerHandler();
    }

    /**
     * Fails the test if either log code 222214 or 222215 was captured, and always releases the
     * log handler afterwards.
     */
    @AfterEach
    public void checkLoggerEnd() throws Exception {
        if (loggerHandler == null) {
            return;
        }
        try {
            Assertions.assertFalse(loggerHandler.findText(new String[]{"222214"}));
            Assertions.assertFalse(loggerHandler.findText(new String[]{"222215"}));
        } finally {
            loggerHandler.close();
        }
    }

    /** A transactional increment must only become visible on the counter after the commit. */
    @Test
    public void testCounter() throws Exception {
        ClientSessionFactory sf = createSessionFactory(sl);
        ClientSession session = sf.createSession();
        try {
            server.addAddressInfo(new AddressInfo(SimpleString.of("A1"), RoutingType.ANYCAST));
            Queue queue = server.createQueue(QueueConfiguration.of("A1").setRoutingType(RoutingType.ANYCAST));
            PageSubscriptionCounter counter = locateCounter(queue);

            TransactionImpl tx = new TransactionImpl(server.getStorageManager());
            counter.increment(tx, 1, 1000L);

            // Nothing is visible before the commit.
            Wait.assertEquals(0L, counter::getValue);
            Wait.assertEquals(0L, counter::getPersistentSize);

            tx.commit();

            Wait.assertEquals(1L, counter::getValue);
            Wait.assertEquals(1000L, counter::getPersistentSize);
        } finally {
            sf.close();
            session.close();
        }
    }

    /**
     * Concurrent mix of transactional and non-transactional increments and decrements. Each loop
     * iteration nets +1, so the final value is {@code THREADS * BUMPS}, both before and after a
     * restart of the same server instance with counter rebuilding disabled.
     */
    @Test
    public void testMultiThreadUpdates() throws Exception {
        ClientSessionFactory sf = createSessionFactory(sl);
        ClientSession session = sf.createSession();
        try {
            server.addAddressInfo(new AddressInfo(SimpleString.of("A1"), RoutingType.ANYCAST));
            Queue queue = server.createQueue(QueueConfiguration.of(SimpleString.of("A1")).setRoutingType(RoutingType.ANYCAST));
            PageSubscriptionCounter counter = locateCounter(queue);
            Assertions.assertEquals(0L, counter.getValue());

            runOnAllThreads(() -> {
                for (int repeat = 0; repeat < BUMPS; repeat++) {
                    // +2 without a transaction, +1 inside a transaction ...
                    counter.increment((Transaction) null, 2, 1L);
                    TransactionImpl txUp = new TransactionImpl(server.getStorageManager());
                    counter.increment(txUp, 1, 1L);
                    txUp.commit();
                    // ... then -1 without a transaction and -1 inside a transaction.
                    counter.increment((Transaction) null, -1, -1L);
                    TransactionImpl txDown = new TransactionImpl(server.getStorageManager());
                    counter.increment(txDown, -1, -1L);
                    txDown.commit();
                }
            });

            final long expected = (long) THREADS * BUMPS;
            Wait.assertEquals(expected, counter::getValue, 5000L, 100L);

            server.stop();
            server.setRebuildCounters(false);
            server.start();

            PageSubscriptionCounter reloaded = locateCounter(server.locateQueue("A1"));
            Wait.assertEquals(expected, reloaded::getValue, 5000L, 100L);
        } finally {
            sf.close();
            session.close();
        }
    }

    /**
     * Concurrent increments only: each loop iteration adds one non-transactional and one
     * transactional increment, so the final value is {@code 2 * THREADS * BUMPS}, both before and
     * after a restart of the same server instance with counter rebuilding disabled.
     */
    @Test
    public void testMultiThreadCounter() throws Exception {
        ClientSessionFactory sf = createSessionFactory(sl);
        ClientSession session = sf.createSession();
        try {
            server.addAddressInfo(new AddressInfo(SimpleString.of("A1"), RoutingType.ANYCAST));
            Queue queue = server.createQueue(QueueConfiguration.of(SimpleString.of("A1")).setRoutingType(RoutingType.ANYCAST));
            PageSubscriptionCounter counter = locateCounter(queue);
            Assertions.assertEquals(0L, counter.getValue());

            runOnAllThreads(() -> {
                for (int repeat = 0; repeat < BUMPS; repeat++) {
                    counter.increment((Transaction) null, 1, 1L);
                    TransactionImpl tx = new TransactionImpl(server.getStorageManager());
                    counter.increment(tx, 1, 1L);
                    tx.commit();
                }
            });

            final long expected = 2L * THREADS * BUMPS;
            Wait.assertEquals(expected, counter::getValue, 5000L, 100L);

            server.stop();
            server.setRebuildCounters(false);
            server.start();

            PageSubscriptionCounter reloaded = locateCounter(server.locateQueue("A1"));
            Wait.assertEquals(expected, reloaded::getValue, 5000L, 100L);
            Assertions.assertEquals(expected, reloaded.getValue());
        } finally {
            sf.close();
            session.close();
        }
    }

    /**
     * Performs 2100 transactional increments, committing whenever {@code i % 200 == 0}, then
     * restarts on a new server with counter rebuilding disabled and expects the stored totals to
     * be read back. An explicit rebuild afterwards is expected to bring the value to 0.
     */
    @Test
    public void testCleanupCounter() throws Exception {
        ClientSessionFactory sf = createSessionFactory(sl);
        ClientSession session = sf.createSession();
        try {
            server.addAddressInfo(new AddressInfo(SimpleString.of("A1"), RoutingType.ANYCAST));
            Queue queue = server.createQueue(QueueConfiguration.of(SimpleString.of("A1")).setRoutingType(RoutingType.ANYCAST));
            PageSubscriptionCounter counter = locateCounter(queue);
            StorageManager storage = server.getStorageManager();

            TransactionImpl tx = new TransactionImpl(server.getStorageManager());
            for (int i = 0; i < 2100; i++) {
                counter.increment(tx, 1, 1000L);
                if (i % 200 == 0) {
                    tx.commit();
                    storage.waitOnOperations();
                    Wait.assertEquals(i + 1, counter::getValue);
                    Wait.assertEquals((i + 1) * 1000, counter::getPersistentSize);
                    // Following increments go into a new transaction.
                    tx = new TransactionImpl(server.getStorageManager());
                }
            }
            tx.commit();

            Wait.assertEquals(2100L, counter::getValue);
            Wait.assertEquals(2100000L, counter::getPersistentSize);

            server.stop();
            server = newActiveMQServer();
            server.setRebuildCounters(false);
            server.start();

            Queue reloadedQueue = server.locateQueue(SimpleString.of("A1"));
            Assertions.assertNotNull(reloadedQueue);
            PageSubscriptionCounter reloaded = locateCounter(reloadedQueue);
            Assertions.assertEquals(2100L, reloaded.getValue());
            Assertions.assertEquals(2100000L, reloaded.getPersistentSize());

            // NOTE(review): the rebuild is expected to end at 0, presumably because no messages
            // were actually paged by this test - confirm.
            server.getPagingManager().rebuildCounters((Set) null);
            Wait.assertEquals(0L, reloaded::getValue);
        } finally {
            sf.close();
            session.close();
        }
    }

    /**
     * Same increment pattern as {@link #testCleanupCounter()}, but the replacement server is
     * started without disabling counter rebuilding; the counter is then expected to read 0.
     */
    @Test
    public void testCleanupCounterNonPersistent() throws Exception {
        ClientSessionFactory sf = createSessionFactory(sl);
        ClientSession session = sf.createSession();
        try {
            server.addAddressInfo(new AddressInfo(SimpleString.of("A1"), RoutingType.ANYCAST));
            Queue queue = server.createQueue(QueueConfiguration.of(SimpleString.of("A1")).setRoutingType(RoutingType.ANYCAST));
            PageSubscriptionCounter counter = locateCounter(queue);
            StorageManager storage = server.getStorageManager();

            TransactionImpl tx = new TransactionImpl(server.getStorageManager());
            for (int i = 0; i < 2100; i++) {
                counter.increment(tx, 1, 1000L);
                if (i % 200 == 0) {
                    tx.commit();
                    storage.waitOnOperations();
                    Assertions.assertEquals(i + 1, counter.getValue());
                    Assertions.assertEquals((i + 1) * 1000, counter.getPersistentSize());
                    tx = new TransactionImpl(server.getStorageManager());
                }
            }
            tx.commit();
            storage.waitOnOperations();

            Assertions.assertEquals(2100L, counter.getValue());
            Assertions.assertEquals(2100000L, counter.getPersistentSize());

            server.stop();
            server = newActiveMQServer();
            server.start();

            Queue reloadedQueue = server.locateQueue(SimpleString.of("A1"));
            Assertions.assertNotNull(reloadedQueue);
            PageSubscriptionCounter reloaded = locateCounter(reloadedQueue);
            Assertions.assertEquals(0L, reloaded.getValue());
            Assertions.assertEquals(0L, reloaded.getPersistentSize());
        } finally {
            sf.close();
            session.close();
        }
    }

    /**
     * Commits one increment, restarts on a new server with counter rebuilding disabled, checks the
     * value was kept, increments again between {@code markRebuilding()} and
     * {@code finishRebuild()}, and finally expects an explicit rebuild to bring both the value and
     * the persistent size to 0.
     */
    @Test
    public void testRestartCounter() throws Exception {
        server.addAddressInfo(new AddressInfo(SimpleString.of("A1"), RoutingType.ANYCAST));
        Queue queue = server.createQueue(QueueConfiguration.of(SimpleString.of("A1")).setRoutingType(RoutingType.ANYCAST));
        PageSubscriptionCounter counter = locateCounter(queue);

        TransactionImpl tx = new TransactionImpl(server.getStorageManager());
        counter.increment(tx, 1, 1000L);

        // Not visible until the transaction commits.
        Assertions.assertEquals(0L, counter.getValue());
        Assertions.assertEquals(0L, counter.getPersistentSize());

        tx.commit();

        Wait.assertEquals(1L, counter::getValue);
        Wait.assertEquals(1000L, counter::getPersistentSize);

        sl.close();
        server.stop();
        server = newActiveMQServer();
        server.setRebuildCounters(false);
        server.start();

        Queue reloadedQueue = server.locateQueue(SimpleString.of("A1"));
        Assertions.assertNotNull(reloadedQueue);
        PageSubscriptionCounter reloaded = locateCounter(reloadedQueue);
        Wait.assertEquals(1L, reloaded::getValue);
        Wait.assertEquals(1000L, reloaded::getPersistentSize);

        reloaded.markRebuilding();
        // The previous value must still be reported while marked as rebuilding.
        Wait.assertEquals(1L, reloaded::getValue);

        TransactionImpl txDuringRebuild = new TransactionImpl(server.getStorageManager());
        reloaded.increment(txDuringRebuild, 10, 10000L);
        txDuringRebuild.commit();

        Wait.assertEquals(11L, reloaded::getValue);
        Wait.assertEquals(11000L, reloaded::getPersistentSize);

        reloaded.finishRebuild();
        server.getPagingManager().rebuildCounters((Set) null);

        Wait.assertEquals(0L, reloaded::getValue);
        Wait.assertEquals(0L, reloaded::getPersistentSize);
    }

    /**
     * Looks up the counter of the given queue's page subscription on address {@code A1}.
     *
     * @param queue queue whose ID selects the subscription
     * @return the counter of that subscription
     * @throws Exception if the page store lookup fails
     */
    private PageSubscriptionCounter locateCounter(Queue queue) throws Exception {
        return this.server.getPagingManager().getPageStore(SimpleString.of("A1")).getCursorProvider().getSubscription(queue.getID().longValue()).getCounter();
    }

    /**
     * Increments the counter 2000 times inside one XID-based transaction; the value must stay at 0
     * until the commit and must read 2000 again after a restart with counter rebuilding disabled.
     */
    @Test
    public void testCommitCounter() throws Exception {
        XidImpl xid = newXID();
        Queue queue = server.createQueue(QueueConfiguration.of(SimpleString.of("A1")).setRoutingType(RoutingType.ANYCAST));
        PageSubscriptionCounter counter = locateCounter(queue);
        StorageManager storage = server.getStorageManager();

        // NOTE(review): the third argument borrows a constant from an unrelated test class;
        // it looks like a plain numeric setting (timeout?) - confirm and consider a literal.
        TransactionImpl tx = new TransactionImpl(xid, server.getStorageManager(), ReplicationOrderTest.NUM);
        for (int i = 0; i < 2000; i++) {
            counter.increment(tx, 1, 1000L);
        }
        Assertions.assertEquals(0L, counter.getValue());

        tx.commit();
        storage.waitOnOperations();
        Assertions.assertEquals(2000L, counter.getValue());

        server.stop();
        server = newActiveMQServer();
        server.setRebuildCounters(false);
        server.start();

        Queue reloadedQueue = server.locateQueue(SimpleString.of("A1"));
        Assertions.assertNotNull(reloadedQueue);
        PageSubscriptionCounter reloaded = locateCounter(reloadedQueue);
        Wait.assertEquals(2000L, reloaded::getValue);
    }

    /**
     * Sends 3000 messages in one transacted JMS session while the address is paging, then restarts
     * with counter rebuilding disabled and expects both the counter and the queue message count to
     * report 3000.
     */
    @Test
    public void testSendNoRebuild() throws Exception {
        Queue queue = server.createQueue(QueueConfiguration.of(SimpleString.of("A1")).setRoutingType(RoutingType.ANYCAST));
        queue.getPagingStore().startPaging();
        // NOTE(review): the returned counter is unused; the call is kept in case the lookup is
        // relied on for its side effects.
        locateCounter(queue);

        // The connection is only needed for sending; it is closed before the server restarts.
        try (Connection connection = CFUtil.createConnectionFactory("core", SimpleManagementTest.LOCALHOST).createConnection()) {
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageProducer producer = session.createProducer(session.createQueue("A1"));
            for (int i = 0; i < 3000; i++) {
                producer.send(session.createTextMessage("i" + i));
            }
            session.commit();
        }

        server.stop();
        server = newActiveMQServer();
        server.setRebuildCounters(false);
        server.start();

        Queue reloadedQueue = server.locateQueue(SimpleString.of("A1"));
        Assertions.assertNotNull(reloadedQueue);
        PageSubscriptionCounter reloaded = locateCounter(reloadedQueue);
        logger.debug("Counter:: {}", reloadedQueue.getMessageCount());
        Wait.assertEquals(3000L, reloaded::getValue);
        Wait.assertEquals(3000L, reloadedQueue::getMessageCount, 1000L, 100L);
    }

    /**
     * Runs {@code task} once on each of {@link #THREADS} pool threads, released together through a
     * barrier, and fails the calling test if a worker throws or the workers do not all finish
     * within one minute.
     *
     * @param task work each worker thread performs after passing the start barrier
     * @throws Exception if waiting for the workers is interrupted
     */
    private void runOnAllThreads(CounterTask task) throws Exception {
        CyclicBarrier startBarrier = new CyclicBarrier(THREADS);
        CountDownLatch done = new CountDownLatch(THREADS);
        AtomicInteger errors = new AtomicInteger(0);

        ExecutorService executor = Executors.newFixedThreadPool(THREADS);
        runAfter(executor::shutdownNow);

        for (int i = 0; i < THREADS; i++) {
            executor.execute(() -> {
                try {
                    startBarrier.await(10L, TimeUnit.SECONDS);
                    task.run();
                } catch (Exception e) {
                    logger.warn(e.getMessage(), e);
                    errors.incrementAndGet();
                } finally {
                    // Exactly one count-down per worker, whether it succeeded or failed.
                    done.countDown();
                }
            });
        }

        Assertions.assertTrue(done.await(1L, TimeUnit.MINUTES), "worker threads did not finish within one minute");
        Assertions.assertEquals(0, errors.get(), "worker threads reported errors; see the log");
    }

    /**
     * Creates (without starting) a server whose settings for every address ({@code #}) use a
     * 10240-byte page size, a 20480-byte max size and a max of 10 read page messages.
     *
     * @return the configured, not yet started server
     * @throws Exception if the server cannot be created
     */
    private ActiveMQServer newActiveMQServer() throws Exception {
        OperationContextImpl.clearContext();
        ActiveMQServer newServer = super.createServer(true, true);
        AddressSettings settings = new AddressSettings().setPageSizeBytes(10240).setMaxSizeBytes(20480L).setMaxReadPageMessages(10);
        newServer.getAddressSettingsRepository().addMatch("#", settings);
        return newServer;
    }
}
