package org.apache.activemq.artemis.tests.integration.amqp.connect;

import io.netty.util.collection.LongObjectHashMap;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.TopicSubscriber;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.journal.collections.JournalHashMap;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManagerProvider;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceIDSupplier;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.tests.integration.management.SimpleManagementTest;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

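/**
 * Integration tests for the mirror {@link AckManager}: retries are registered directly against
 * an embedded broker whose multicast address is paging, and the tests then check how those
 * retries are applied to the durable subscription queues, including across broker restarts.
 */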
public class AckManagerTest extends ActiveMQTestBase {
    /** Address name for which {@link #setUp()} registers dedicated address settings. */
    private static final String SNF_NAME = "$ACTIVEMQ_ARTEMIS_MIRROR_other";

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Broker under test; created and started by {@link #setUp()} before each test. */
    ActiveMQServer server1;

    /**
     * Creates and starts {@link #server1}.
     *
     * <p>The broker gets address settings for {@link #SNF_NAME} with no byte or message size
     * limit and a max-read-page-messages value of 20, and its acceptors are replaced by a
     * single one named {@code "server"} bound to {@code SimpleManagementTest.LOCALHOST}.
     *
     * @throws Exception if the base class set-up or the broker start-up fails
     */
    @Override
    @BeforeEach
    public void setUp() throws Exception {
        super.setUp();

        server1 = createServer(true, createDefaultConfig(0, true), 100024, -1L, -1, -1);

        AddressSettings snfSettings = new AddressSettings();
        snfSettings.setMaxSizeBytes(-1L)
                   .setMaxSizeMessages(-1L)
                   .setMaxReadPageMessages(20);
        server1.getConfiguration().addAddressSetting(SNF_NAME, snfSettings);

        // Only one acceptor should be left: drop whatever the default configuration declared.
        server1.getConfiguration().getAcceptorConfigurations().clear();
        server1.getConfiguration().addAcceptorConfiguration("server", SimpleManagementTest.LOCALHOST);

        server1.start();
    }

    /**
     * Registers retries directly on the {@link AckManager} for a paged multicast address and
     * verifies how they are handled.
     *
     * <p>Flow of the test:
     * <ol>
     *   <li>create five durable subscriptions ({@code c0.s0} .. {@code c4.s4}) and send 500
     *       messages into a paging address, pausing {@code c1.s1} and {@code c2.s2};</li>
     *   <li>add retries for the first 100 messages on {@code c1.s1} and the first 200 on
     *       {@code c2.s2};</li>
     *   <li>twice over: check the sorted retries, check that both queues still report 500
     *       messages, restart the broker and check that a new manager instance is provided;</li>
     *   <li>start the manager, wait until the retries are drained, and verify that re-adding
     *       the same retries five more times drains again;</li>
     *   <li>consume from every subscription and verify that {@code c1.s1} / {@code c2.s2}
     *       start at message 100 / 200 while the others receive everything;</li>
     *   <li>verify that no record of type 53 is left in the journal after compacting, also
     *       after adding two retries for message ids that were never sent.</li>
     * </ol>
     *
     * @throws Throwable on any failure of the broker, the JMS client or an assertion
     */
    @Test
    public void testDirectACK() throws Throwable {
        final int numberOfSubscribers = 5;
        final int numberOfMessages = 500;
        final int numberOfAcksC1 = 100;
        final int numberOfAcksC2 = 200;

        SimpleString topicName = SimpleString.of("tp" + RandomUtil.randomString());
        server1.addAddressInfo(new AddressInfo(topicName).addRoutingType(RoutingType.MULTICAST));

        ConnectionFactory connectionFactory = CFUtil.createConnectionFactory("AMQP", SimpleManagementTest.LOCALHOST);

        // Create the durable subscriptions; try-with-resources closes the connection even if
        // the subscription cannot be created.
        for (int i = 0; i < numberOfSubscribers; i++) {
            try (Connection connection = connectionFactory.createConnection()) {
                connection.setClientID("c" + i);
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                session.createDurableSubscriber(session.createTopic(topicName.toString()), "s" + i);
            }
        }

        Queue c1s1 = server1.locateQueue("c1.s1");
        Assertions.assertNotNull(c1s1);
        Queue c2s2 = server1.locateQueue("c2.s2");
        Assertions.assertNotNull(c2s2);

        PagingStore store = server1.getPagingManager().getPageStore(topicName);
        store.startPaging();

        try (Connection connection = connectionFactory.createConnection()) {
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageProducer producer = session.createProducer(session.createTopic(topicName.toString()));
            for (int i = 0; i < numberOfMessages; i++) {
                TextMessage message = session.createTextMessage("hello " + i);
                message.setIntProperty("i", i);
                producer.send(message);
                if ((i + 1) % 100 == 0) {
                    // the two queues under test are paused ahead of every batch commit
                    c1s1.pause();
                    c2s2.pause();
                    session.commit();
                }
            }
            session.commit();
        }

        ReferenceIDSupplier referenceIDSupplier = new ReferenceIDSupplier(server1);
        addRetriesFromPages(store, AckManagerProvider.getManager(server1), referenceIDSupplier, c1s1, numberOfAcksC1, c2s2, numberOfAcksC2);

        for (int repeat = 0; repeat < 2; repeat++) {
            logger.info("Repeating {}", repeat);

            AckManager ackManager = AckManagerProvider.getManager(server1);
            ackManager.start();

            HashMap<?, ?> sortedRetries = ackManager.sortRetries();
            Assertions.assertEquals(1, sortedRetries.size());

            LongObjectHashMap<?> acksOnAddress = (LongObjectHashMap<?>) sortedRetries.get(c1s1.getAddress());
            Assertions.assertEquals(2, acksOnAddress.size());

            JournalHashMap<?, ?, ?> c1s1Retries = (JournalHashMap<?, ?, ?>) acksOnAddress.get(c1s1.getID());
            JournalHashMap<?, ?, ?> c2s2Retries = (JournalHashMap<?, ?, ?>) acksOnAddress.get(c2s2.getID());
            Assertions.assertEquals(numberOfAcksC1, c1s1Retries.size());
            Assertions.assertEquals(numberOfAcksC2, c2s2Retries.size());

            Wait.assertEquals(numberOfMessages, c1s1::getMessageCount);
            Wait.assertEquals(numberOfMessages, c2s2::getMessageCount);

            // Restarting the broker must discard the manager and hand out a fresh one.
            AckManager originalManager = AckManagerProvider.getManager(server1);
            server1.stop();
            Assertions.assertEquals(0, AckManagerProvider.getSize());
            server1.start();
            AckManager newManager = AckManagerProvider.getManager(server1);
            Assertions.assertEquals(1, AckManagerProvider.getSize());
            Assertions.assertNotSame(originalManager, AckManagerProvider.getManager(server1));

            AckManager restartedManager = AckManagerProvider.getManager(server1);
            Wait.assertTrue(restartedManager::isStarted, 5000L);

            Assertions.assertEquals(1, AckManagerProvider.getSize());
            Assertions.assertNotSame(newManager, ackManager);
        }

        AckManager ackManager = AckManagerProvider.getManager(server1);
        ackManager.start();

        HashMap<?, ?> sortedRetries = ackManager.sortRetries();
        Assertions.assertEquals(1, sortedRetries.size());
        LongObjectHashMap<?> acksOnAddress = (LongObjectHashMap<?>) sortedRetries.get(c1s1.getAddress());
        JournalHashMap<?, ?, ?> c1s1Retries = (JournalHashMap<?, ?, ?>) acksOnAddress.get(c1s1.getID());
        JournalHashMap<?, ?, ?> c2s2Retries = (JournalHashMap<?, ?, ?>) acksOnAddress.get(c2s2.getID());
        Wait.assertEquals(0, () -> c1s1Retries.size());
        Wait.assertEquals(0, () -> c2s2Retries.size());

        // Adding the very same retries again must drain again, every time.
        for (int retry = 0; retry < 5; retry++) {
            addRetriesFromPages(store, ackManager, referenceIDSupplier, c1s1, numberOfAcksC1, c2s2, numberOfAcksC2);
            Wait.assertEquals(0, () -> c1s1Retries.size());
            Wait.assertEquals(0, () -> c2s2Retries.size());
        }

        c1s1.resume();
        c2s2.resume();

        for (int i = 0; i < numberOfSubscribers; i++) {
            try (Connection connection = connectionFactory.createConnection()) {
                connection.setClientID("c" + i);
                connection.start();
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                TopicSubscriber subscriber = session.createDurableSubscriber(session.createTopic(topicName.toString()), "s" + i);

                // Subscriptions 1 and 2 had their first messages acked through the retries.
                int start;
                switch (i) {
                    case 1:
                        start = numberOfAcksC1;
                        break;
                    case 2:
                        start = numberOfAcksC2;
                        break;
                    default:
                        start = 0;
                        break;
                }

                logger.debug("receiving messages for {}", i);
                for (int m = start; m < numberOfMessages; m++) {
                    logger.debug("Receiving i={}, m={}", i, m);
                    TextMessage message = (TextMessage) subscriber.receive(5000L);
                    Assertions.assertNotNull(message);
                    Assertions.assertEquals("hello " + m, message.getText());
                    Assertions.assertEquals(m, message.getIntProperty("i"));
                }
                Assertions.assertNull(subscriber.receiveNoWait());
            }
        }

        server1.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10000);
        // NOTE(review): 53 is presumably the journal record type used for ack retries - confirm.
        Assertions.assertEquals(0, getCounter((byte) 53, countJournal(server1.getConfiguration())));

        Assertions.assertEquals(1, AckManagerProvider.getSize());
        Assertions.assertNotNull(server1.locateQueue("c1.s1"));

        // Retries for message ids that were never sent must eventually be dropped as well.
        ackManager.addRetry(referenceIDSupplier.getDefaultNodeID(), c1s1, 10000000L, AckReason.NORMAL);
        ackManager.addRetry(referenceIDSupplier.getDefaultNodeID(), c1s1, 10000001L, AckReason.NORMAL);
        Wait.assertTrue(() -> ackManager.sortRetries().isEmpty(), 5000L);

        server1.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10000);
        Assertions.assertEquals(0, getCounter((byte) 53, countJournal(server1.getConfiguration())));

        server1.stop();
        Assertions.assertEquals(0, AckManagerProvider.getSize());
    }

    /**
     * Walks every page from the store's first page to its current writing page and registers
     * an ack retry for the leading messages, counted across pages in iteration order.
     *
     * @param store               paging store whose pages are read
     * @param ackManager          manager receiving the retries
     * @param referenceIDSupplier resolves the server id and message id of each paged message
     * @param firstQueue          queue that gets a retry for the first {@code firstQueueAcks} messages
     * @param firstQueueAcks      number of leading messages to retry on {@code firstQueue}
     * @param secondQueue         queue that gets a retry for the first {@code secondQueueAcks} messages
     * @param secondQueueAcks     number of leading messages to retry on {@code secondQueue}
     * @throws Exception if a page cannot be opened or read
     */
    private static void addRetriesFromPages(PagingStore store,
                                            AckManager ackManager,
                                            ReferenceIDSupplier referenceIDSupplier,
                                            Queue firstQueue,
                                            int firstQueueAcks,
                                            Queue secondQueue,
                                            int secondQueueAcks) throws Exception {
        AtomicInteger counter = new AtomicInteger(0);
        for (long pageID = store.getFirstPage(); pageID <= store.getCurrentWritingPage(); pageID++) {
            Page page = store.usePage(pageID);
            try {
                page.getMessages().forEach(pagedMessage -> {
                    int position = counter.incrementAndGet();
                    if (position <= firstQueueAcks) {
                        ackManager.addRetry(referenceIDSupplier.getServerID(pagedMessage.getMessage()), firstQueue, referenceIDSupplier.getID(pagedMessage.getMessage()), AckReason.NORMAL);
                    }
                    if (position <= secondQueueAcks) {
                        ackManager.addRetry(referenceIDSupplier.getServerID(pagedMessage.getMessage()), secondQueue, referenceIDSupplier.getID(pagedMessage.getMessage()), AckReason.NORMAL);
                    }
                });
            } finally {
                // usePage() must always be balanced, even when reading the page fails
                page.usageDown();
            }
        }
    }

    /**
     * Verifies that retries registered while the {@link AckManager} is stopped are applied to
     * paged messages once the broker has been restarted.
     *
     * <p>Two durable subscriptions ({@code c0.s0}, {@code c1.s1}) receive 15000 messages on a
     * paging address. With the manager stopped, retries are added for the first 100 messages
     * on {@code c0.s0} and the first 14999 on {@code c1.s1}. After a restart the queues must
     * report the matching message and acknowledged counts.
     *
     * @throws Throwable on any failure of the broker, the JMS client or an assertion
     */
    @Test
    public void testRetryFromPaging() throws Throwable {
        final int numberOfMessages = 15000;
        final int numberOfAcksC0 = 100;
        final int numberOfAcksC1 = 14999;

        SimpleString topicName = SimpleString.of("tp" + RandomUtil.randomString());
        server1.addAddressInfo(new AddressInfo(topicName).addRoutingType(RoutingType.MULTICAST));

        ConnectionFactory connectionFactory = CFUtil.createConnectionFactory("AMQP", SimpleManagementTest.LOCALHOST);

        // try-with-resources closes the connection even when creating the subscription fails
        for (int i = 0; i < 2; i++) {
            try (Connection connection = connectionFactory.createConnection()) {
                connection.setClientID("c" + i);
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                session.createDurableSubscriber(session.createTopic(topicName.toString()), "s" + i);
            }
        }

        Queue c0s0 = server1.locateQueue("c0.s0");
        Assertions.assertNotNull(c0s0);
        Queue c1s1 = server1.locateQueue("c1.s1");
        Assertions.assertNotNull(c1s1);

        PagingStore store = server1.getPagingManager().getPageStore(topicName);
        store.startPaging();

        try (Connection connection = connectionFactory.createConnection()) {
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageProducer producer = session.createProducer(session.createTopic(topicName.toString()));
            for (int i = 0; i < numberOfMessages; i++) {
                TextMessage message = session.createTextMessage("hello " + i);
                message.setIntProperty("i", i);
                producer.send(message);
                if ((i + 1) % 100 == 0) {
                    // both queues are paused ahead of every batch commit
                    c1s1.pause();
                    c0s0.pause();
                    session.commit();
                }
            }
            session.commit();
        }

        ReferenceIDSupplier referenceIDSupplier = new ReferenceIDSupplier(server1);
        AckManager ackManager = AckManagerProvider.getManager(server1);
        // The retries are added while the manager is stopped; they are checked after the restart.
        ackManager.stop();

        AtomicInteger counter = new AtomicInteger(0);
        for (long pageID = store.getFirstPage(); pageID <= store.getCurrentWritingPage(); pageID++) {
            Page page = store.usePage(pageID);
            try {
                page.getMessages().forEach(pagedMessage -> {
                    int position = counter.incrementAndGet();
                    if (position <= numberOfAcksC0) {
                        ackManager.addRetry(referenceIDSupplier.getServerID(pagedMessage.getMessage()), c0s0, referenceIDSupplier.getID(pagedMessage.getMessage()), AckReason.NORMAL);
                    }
                    if (position <= numberOfAcksC1) {
                        ackManager.addRetry(referenceIDSupplier.getServerID(pagedMessage.getMessage()), c1s1, referenceIDSupplier.getID(pagedMessage.getMessage()), AckReason.NORMAL);
                    }
                });
            } finally {
                // usePage() must always be balanced, even when reading the page fails
                page.usageDown();
            }
        }

        server1.stop();
        server1.start();

        // The queue objects located before the restart belong to the previous broker run.
        Queue c0s0AfterRestart = server1.locateQueue("c0.s0");
        Assertions.assertNotNull(c0s0AfterRestart);
        Queue c1s1AfterRestart = server1.locateQueue("c1.s1");
        Assertions.assertNotNull(c1s1AfterRestart);

        Wait.assertEquals(numberOfMessages - numberOfAcksC1, c1s1AfterRestart::getMessageCount, 10000L);
        Wait.assertEquals(numberOfAcksC1, c1s1AfterRestart::getMessagesAcknowledged, 10000L);
        Wait.assertEquals(numberOfMessages - numberOfAcksC0, c0s0AfterRestart::getMessageCount, 10000L);
        Wait.assertEquals(numberOfAcksC0, c0s0AfterRestart::getMessagesAcknowledged, 10000L);

        server1.stop();
        Assertions.assertEquals(0, AckManagerProvider.getSize());
    }

    /**
     * Looks up how many times a record type was counted.
     *
     * @param b       record type to look up; widened to {@code int} to form the map key
     * @param hashMap counters keyed by record type
     * @return the counter value for {@code b}, or {@code 0} when the map has no entry for it
     */
    private int getCounter(byte b, HashMap<Integer, AtomicInteger> hashMap) {
        AtomicInteger occurrences = hashMap.get((int) b);
        return occurrences == null ? 0 : occurrences.get();
    }

    /**
     * Searches the remoting connections of a broker for a mirror target receiver.
     *
     * <p>Only connections that are {@link ActiveMQProtonRemotingConnection} instances are
     * inspected; within them, every receiver of every AMQP session is checked.
     *
     * @param activeMQServer broker to inspect; must be an {@link ActiveMQServerImpl}
     * @return the first receiver found that is an {@link AMQPMirrorControllerTarget}, or
     *         {@code null} when there is none
     */
    protected static AMQPMirrorControllerTarget locateMirrorTarget(ActiveMQServer activeMQServer) {
        ActiveMQServerImpl serverImpl = (ActiveMQServerImpl) activeMQServer;
        // Connections and receivers are iterated as Object on purpose: the collections may hold
        // other implementations, so the narrowing must happen after the instanceof checks.
        for (Object connection : serverImpl.getRemotingService().getConnections()) {
            if (!(connection instanceof ActiveMQProtonRemotingConnection)) {
                continue;
            }
            ActiveMQProtonRemotingConnection protonConnection = (ActiveMQProtonRemotingConnection) connection;
            for (AMQPSessionContext sessionContext : protonConnection.getAmqpConnection().getSessions().values()) {
                for (Object receiver : sessionContext.getReceivers().values()) {
                    if (receiver instanceof AMQPMirrorControllerTarget) {
                        return (AMQPMirrorControllerTarget) receiver;
                    }
                }
            }
        }
        return null;
    }

    /**
     * Counts the journal records of type 39 found at the given location.
     *
     * @param file journal location handed to {@code countJournal}
     * @return the number of records counted under key 39, or {@code 0} when none were counted
     * @throws Exception if the journal cannot be read
     */
    private int acksCount(File file) throws Exception {
        // NOTE(review): 39 is presumably the "acknowledge reference" journal record id - confirm.
        AtomicInteger ackRecords = countJournal(file, 10485760, 2, 2).get(39);
        return ackRecords == null ? 0 : ackRecords.get();
    }
}
