package org.apache.activemq.artemis.tests.integration.paging;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/activemq/artemis/tests/integration/paging/PagingReceiveTest.class */
public class PagingReceiveTest extends ActiveMQTestBase {
    private static final SimpleString ADDRESS = new SimpleString("catalog-service.price.change.bm");
    private ActiveMQServer server;
    private ServerLocator locator;
    private int numMsgs = 100;

    protected boolean isNetty() {
        return false;
    }

    @Test
    public void testReceive() throws Exception {
        assertNotNull("Message not found.", receiveMessage());
    }

    @Test
    public void testReceiveThenCheckCounter() throws Exception {
        Queue locateQueue = this.server.locateQueue(ADDRESS);
        assertEquals(this.numMsgs, locateQueue.getMessagesAdded());
        receiveAllMessages();
        locateQueue.getPageSubscription().scheduleCleanupCheck();
        AtomicInteger scheduledCleanupCount = locateQueue.getPageSubscription().getScheduledCleanupCount();
        Objects.requireNonNull(scheduledCleanupCount);
        Wait.assertEquals(0, scheduledCleanupCount::get);
        assertEquals(this.numMsgs, locateQueue.getMessagesAdded());
    }

    @Test
    public void testReceiveTx() throws Exception {
        receiveAllMessagesTxAndPageCheckPendingTx();
    }

    private void receiveAllMessagesTxAndPageCheckPendingTx() throws Exception {
        ClientSession createSession = createSessionFactory(this.locator).createSession((String) null, (String) null, false, true, false, false, 0);
        createSession.start();
        ClientConsumer createConsumer = createSession.createConsumer(ADDRESS);
        for (int i = 0; i < this.numMsgs; i++) {
            ClientMessage receive = createConsumer.receive(2000L);
            assertNotNull(receive);
            receive.acknowledge();
        }
        PagingStore pageStore = this.server.getPagingManager().getPageStore(ADDRESS);
        PageSubscriptionImpl subscription = pageStore.getCursorProvider().getSubscription(this.server.locateQueue(ADDRESS).getID().longValue());
        long currentWritingPage = pageStore.getCurrentWritingPage();
        Wait.assertTrue(() -> {
            return ((PageSubscriptionImpl) subscription).getPageInfo(currentWritingPage).getPendingTx() > 0;
        });
        subscription.getPageInfo(currentWritingPage);
        createSession.commit();
        createSession.close();
    }

    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.server = internalCreateServer();
        this.server.addAddressInfo(new AddressInfo(ADDRESS, RoutingType.ANYCAST));
        Queue createQueue = this.server.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
        createQueue.getPageSubscription().getPagingStore().startPaging();
        for (int i = 0; i < 10; i++) {
            createQueue.getPageSubscription().getPagingStore().forceAnotherPage();
        }
        this.locator.setBlockOnNonDurableSend(false).setBlockOnAcknowledge(false);
        ClientSession createSession = createSessionFactory(this.locator).createSession((String) null, (String) null, false, true, true, false, 0);
        ClientProducer createProducer = createSession.createProducer(ADDRESS);
        for (int i2 = 0; i2 < this.numMsgs; i2++) {
            ClientMessage createMessage = createSession.createMessage(true);
            createMessage.putIntProperty("key", i2);
            createProducer.send(createMessage);
            if (i2 > 0 && i2 % 10 == 0) {
                createSession.commit();
            }
        }
        createSession.close();
        this.locator.close();
        this.server.stop();
        this.server = internalCreateServer();
    }

    private ActiveMQServer internalCreateServer() throws Exception {
        ActiveMQServer newActiveMQServer = newActiveMQServer();
        newActiveMQServer.start();
        waitForServerToStart(newActiveMQServer);
        this.locator = createFactory(isNetty());
        return newActiveMQServer;
    }

    private void receiveAllMessages() throws Exception {
        ClientSession createSession = createSessionFactory(this.locator).createSession((String) null, (String) null, false, true, true, false, 0);
        createSession.start();
        ClientConsumer createConsumer = createSession.createConsumer(ADDRESS);
        for (int i = 0; i < this.numMsgs; i++) {
            ClientMessage receive = createConsumer.receive(2000L);
            assertNotNull(receive);
            receive.acknowledge();
        }
        createSession.commit();
        createSession.close();
    }

    private ClientMessage receiveMessage() throws Exception {
        ClientSession createSession = createSessionFactory(this.locator).createSession((String) null, (String) null, false, true, true, false, 0);
        createSession.start();
        ClientConsumer createConsumer = createSession.createConsumer(ADDRESS);
        ClientMessage receive = createConsumer.receive(1000L);
        createSession.commit();
        if (receive != null) {
            receive.acknowledge();
        }
        createConsumer.close();
        createSession.close();
        return receive;
    }

    private ActiveMQServer newActiveMQServer() throws Exception {
        ActiveMQServer createServer = createServer(true, isNetty());
        createServer.getAddressSettingsRepository().addMatch("#", new AddressSettings().setMaxSizeBytes(67108864L).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxRedeliveryDelay(3600000L).setRedeliveryMultiplier(2.0d).setRedeliveryDelay(500L));
        return createServer;
    }
}
