package org.apache.activemq.artemis.tests.integration.paging;

import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImplAccessor;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for the page subscription counter: concurrent increments on a
 * {@link PageSubscriptionCounterImpl}, resetting a counter against a mocked
 * {@link StorageManager}, and the message counts reported by queues of a paging
 * address across counter rebuilds and server restarts.
 */
public class PageCounterRebuildTest extends ActiveMQTestBase {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Protocol name handed to {@link CFUtil} for every JMS client of this test. */
    private static final String PROTOCOL = "core";

    /** URI every JMS client of this test connects to. */
    private static final String BROKER_URI = "tcp://localhost:61616";

    /**
     * Increments one counter from many threads at once, then applies the exact
     * opposite increments, and verifies that no update was lost in either phase.
     */
    @Test
    public void testUnitSize() throws Exception {
        final int threads = 33;
        final int repeat = 777;
        final int addValue = 7;
        final long sizeValue = 17L;

        AtomicInteger errors = new AtomicInteger(0);
        StorageManager storage = Mockito.mock(StorageManager.class);
        PageSubscriptionCounterImpl counter = new PageSubscriptionCounterImpl(storage, -1L);

        ExecutorService executor = Executors.newFixedThreadPool(threads);
        runAfter(executor::shutdownNow);

        incrementConcurrently(executor, threads, repeat, counter, addValue, sizeValue, errors);
        Assert.assertEquals((long) threads * repeat * addValue, counter.getValue());
        Assert.assertEquals((long) threads * repeat * sizeValue, counter.getPersistentSize());

        // Undo everything: the counter has to land exactly on zero again.
        incrementConcurrently(executor, threads, repeat, counter, -addValue, -sizeValue, errors);
        Assert.assertEquals(0L, counter.getValue());
        Assert.assertEquals(0L, counter.getPersistentSize());
        Assert.assertEquals(0L, errors.get());
    }

    /**
     * Runs {@code threads} workers that all start together and each call
     * {@code counter.increment(null, add, size)} {@code repeat} times, then waits
     * up to ten seconds for them and asserts that none of them failed.
     *
     * @param executor pool with at least {@code threads} threads, otherwise the start barrier cannot trip
     * @param threads  number of concurrent workers
     * @param repeat   number of increments issued by each worker
     * @param counter  counter under test
     * @param add      message-count delta of every increment
     * @param size     persistent-size delta of every increment
     * @param errors   incremented once for every worker that fails
     * @throws InterruptedException if interrupted while waiting for the workers
     */
    private static void incrementConcurrently(ExecutorService executor,
                                              int threads,
                                              int repeat,
                                              PageSubscriptionCounterImpl counter,
                                              int add,
                                              long size,
                                              AtomicInteger errors) throws InterruptedException {
        CyclicBarrier startFlag = new CyclicBarrier(threads);
        ReusableLatch done = new ReusableLatch(threads);
        for (int t = 0; t < threads; t++) {
            executor.execute(() -> {
                try {
                    startFlag.await(10L, TimeUnit.SECONDS);
                    for (int i = 0; i < repeat; i++) {
                        counter.increment((Transaction) null, add, size);
                    }
                } catch (Throwable e) {
                    logger.warn(e.getMessage(), e);
                    errors.incrementAndGet();
                } finally {
                    // Exactly one count-down per worker, whether it succeeded or failed.
                    done.countDown();
                }
            });
        }
        Assert.assertTrue(done.await(10L, TimeUnit.SECONDS));
        Assert.assertEquals(0L, errors.get());
    }

    /**
     * Resets a counter backed by a mocked storage manager and verifies that the
     * reset results in exactly one {@code commit} call on the storage manager.
     */
    @Test
    public void testResetSubscriptionCounter() throws Exception {
        StorageManager storage = Mockito.mock(StorageManager.class);
        PageSubscriptionCounterImpl counter = new PageSubscriptionCounterImpl(storage, 33L);

        AtomicInteger commits = new AtomicInteger(0);
        AtomicLong idSequence = new AtomicLong(1L);

        // Every generateID() call hands out the next value of the sequence.
        Mockito.doAnswer(invocation -> Long.valueOf(idSequence.incrementAndGet())).when(storage).generateID();
        // Count the commits issued against the storage manager.
        Mockito.doAnswer(invocation -> {
            commits.incrementAndGet();
            return null;
        }).when(storage).commit(Mockito.anyLong());

        PageSubscriptionCounterImplAccessor.reset(counter);
        Assert.assertEquals(1L, commits.get());
    }

    /**
     * Fills two multicast queues of a paging address from several concurrent
     * senders, consumes part of one queue, and then checks the reported message
     * counts after restarts, after an empty rebuild cycle
     * ({@code markRebuilding} immediately followed by {@code finishRebuild}),
     * and after everything has been consumed.
     */
    @Test
    public void testRebuildCounter() throws Exception {
        final int senders = 4;
        final int nonTxMessages = 200;
        final int txMessages = 2000;
        final int consumedUpFront = 200;
        final long totalSent = (long) senders * (nonTxMessages + txMessages);
        final long remaining = totalSent - consumedUpFront;

        ActiveMQServer server = createServer(true, true);
        server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setPageSizeBytes(102400).setMaxReadPageMessages(1));
        server.start();

        String address = getName();
        String nonConsumedName = getName() + "_nonConsumed";
        server.addAddressInfo(new AddressInfo(address).addRoutingType(RoutingType.MULTICAST));
        server.createQueue(new QueueConfiguration(nonConsumedName).setAddress(address).setRoutingType(RoutingType.MULTICAST));
        server.createQueue(new QueueConfiguration(address).setRoutingType(RoutingType.MULTICAST));

        Queue consumedQueue = locateExisting(server, address);
        Queue nonConsumedQueue = locateExisting(server, nonConsumedName);
        consumedQueue.getPagingStore().startPaging();

        AtomicInteger errors = new AtomicInteger(0);
        ExecutorService executor = Executors.newFixedThreadPool(senders);
        runAfter(executor::shutdownNow);
        CyclicBarrier startFlag = new CyclicBarrier(senders);
        ReusableLatch done = new ReusableLatch(senders);

        for (int s = 0; s < senders; s++) {
            final int threadIndex = s;
            executor.execute(() -> {
                try {
                    startFlag.await(10L, TimeUnit.SECONDS);
                    try (Connection connection = CFUtil.createConnectionFactory(PROTOCOL, BROKER_URI).createConnection();
                         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                         Session txSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE)) {
                        logger.info("sending thread {}", Integer.valueOf(threadIndex));
                        Topic topic = session.createTopic(address);
                        MessageProducer producer = session.createProducer(topic);
                        MessageProducer txProducer = txSession.createProducer(topic);
                        for (int i = 0; i < nonTxMessages; i++) {
                            // Send the message that carries the property, not a fresh untagged copy.
                            TextMessage message = session.createTextMessage("hello" + i);
                            message.setBooleanProperty("first", false);
                            producer.send(message);
                        }
                        for (int i = 0; i < txMessages; i++) {
                            txProducer.send(session.createTextMessage("helloTX" + i));
                        }
                        txSession.commit();
                    }
                } catch (Throwable e) {
                    // Report the failure instead of hiding it behind a later count mismatch.
                    logger.warn(e.getMessage(), e);
                    errors.incrementAndGet();
                } finally {
                    done.countDown();
                }
            });
        }

        Assert.assertTrue(done.await(1L, TimeUnit.MINUTES));
        Assert.assertEquals(0L, errors.get());
        Wait.assertEquals(totalSent, consumedQueue::getMessageCount);

        // Consume a first batch from the queue named after the address only.
        try (Connection connection = CFUtil.createConnectionFactory(PROTOCOL, BROKER_URI).createConnection();
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
            MessageConsumer consumer = session.createConsumer(session.createQueue(address + "::" + address));
            connection.start();
            for (int i = 0; i < consumedUpFront; i++) {
                Assert.assertNotNull(consumer.receive(5000L));
            }
        }

        Wait.assertEquals(remaining, consumedQueue::getMessageCount);
        Wait.assertEquals(totalSent, nonConsumedQueue::getMessageCount);

        restart(server);

        consumedQueue = locateExisting(server, address);
        nonConsumedQueue = locateExisting(server, nonConsumedName);
        Wait.assertEquals(remaining, consumedQueue::getMessageCount);
        Wait.assertEquals(totalSent, nonConsumedQueue::getMessageCount);

        // While marked as rebuilding, the queues still report the previous counts ...
        consumedQueue.getPageSubscription().getCounter().markRebuilding();
        nonConsumedQueue.getPageSubscription().getCounter().markRebuilding();
        Assert.assertEquals(remaining, consumedQueue.getMessageCount());
        Assert.assertEquals(totalSent, nonConsumedQueue.getMessageCount());

        // ... and finishing a rebuild to which nothing was added leaves them at zero.
        consumedQueue.getPageSubscription().getCounter().finishRebuild();
        nonConsumedQueue.getPageSubscription().getCounter().finishRebuild();
        Assert.assertEquals(0L, consumedQueue.getMessageCount());
        Assert.assertEquals(0L, nonConsumedQueue.getMessageCount());

        // After a restart the real counts must be reported again, and stay stable
        // over one more restart.
        restart(server);
        assertMessageCounts(server, address, remaining, nonConsumedName, totalSent);

        restart(server);
        assertMessageCounts(server, address, remaining, nonConsumedName, totalSent);

        restart(server);

        logger.info("Consuming messages");
        try (Connection connection = CFUtil.createConnectionFactory(PROTOCOL, BROKER_URI).createConnection();
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
            MessageConsumer consumer = session.createConsumer(session.createQueue(address + "::" + address));
            connection.start();
            for (int i = 0; i < remaining; i++) {
                Assert.assertNotNull(consumer.receive(5000L));
                if (i % 100 == 0) {
                    logger.info("Received {} messages", Integer.valueOf(i));
                }
            }
            Assert.assertNull(consumer.receiveNoWait());
            consumer.close();

            MessageConsumer nonConsumedConsumer = session.createConsumer(session.createQueue(address + "::" + nonConsumedName));
            connection.start();
            for (int i = 0; i < totalSent; i++) {
                Assert.assertNotNull(nonConsumedConsumer.receive(5000L));
            }
            Assert.assertNull(nonConsumedConsumer.receiveNoWait());
            nonConsumedConsumer.close();
        }

        Queue drainedQueue = locateExisting(server, address);
        Queue drainedNonConsumedQueue = locateExisting(server, nonConsumedName);
        Wait.assertEquals(0L, drainedQueue::getMessageCount, 1000L, 100L);
        Wait.assertEquals(0L, drainedNonConsumedQueue::getMessageCount, 1000L, 100L);
    }

    /** Stops and immediately starts the given server again. */
    private static void restart(ActiveMQServer server) throws Exception {
        server.stop();
        server.start();
    }

    /** Looks up a queue by name on the server and fails the test if it does not exist. */
    private static Queue locateExisting(ActiveMQServer server, String queueName) {
        Queue queue = server.locateQueue(queueName);
        Assert.assertNotNull(queue);
        return queue;
    }

    /** Waits until both queues report their expected message count. */
    private static void assertMessageCounts(ActiveMQServer server,
                                            String firstQueueName,
                                            long firstExpected,
                                            String secondQueueName,
                                            long secondExpected) throws Exception {
        Queue first = locateExisting(server, firstQueueName);
        Queue second = locateExisting(server, secondQueueName);
        Wait.assertEquals(firstExpected, first::getMessageCount);
        Wait.assertEquals(secondExpected, second::getMessageCount);
    }
}
