package org.apache.activemq.broker.region.cursors;

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.class */
public class StoreQueueCursorOrderTest {
    private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursorOrderTest.class);
    BrokerService brokerService;
    static final String mesageIdRoot = "11111:22222:0:";
    ActiveMQQueue destination = new ActiveMQQueue("queue-" + StoreQueueCursorOrderTest.class.getSimpleName());
    final int messageBytesSize = 1024;
    final String text = new String(new byte[1024]);

    /* loaded from: input_file:org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest$TestMessageStore.class */
    class TestMessageStore extends AbstractMessageStore {
        final Message[] messages;
        public AtomicLong batch;

        public TestMessageStore(Message[] messageArr, ActiveMQDestination activeMQDestination) {
            super(activeMQDestination);
            this.batch = new AtomicLong();
            this.messages = messageArr;
        }

        public void addMessage(ConnectionContext connectionContext, Message message) throws IOException {
        }

        public Message getMessage(MessageId messageId) throws IOException {
            return null;
        }

        public void removeMessage(ConnectionContext connectionContext, MessageAck messageAck) throws IOException {
        }

        public void removeAllMessages(ConnectionContext connectionContext) throws IOException {
        }

        public void recover(MessageRecoveryListener messageRecoveryListener) throws Exception {
        }

        public int getMessageCount() throws IOException {
            return 0;
        }

        public void resetBatching() {
        }

        public void recoverNextMessages(int i, MessageRecoveryListener messageRecoveryListener) throws Exception {
            for (int intValue = this.batch.intValue(); intValue < this.messages.length; intValue++) {
                StoreQueueCursorOrderTest.LOG.info("recovered index:" + intValue);
                messageRecoveryListener.recoverMessage(this.messages[intValue]);
            }
        }

        public void setBatch(MessageId messageId) {
            this.batch.set(((Long) messageId.getFutureOrSequenceLong()).longValue());
            this.batch.incrementAndGet();
        }
    }

    @Before
    public void setUp() throws Exception {
        this.brokerService = createBroker();
        this.brokerService.setUseJmx(false);
        this.brokerService.deleteAllMessages();
        this.brokerService.start();
    }

    protected BrokerService createBroker() throws Exception {
        return new BrokerService();
    }

    @After
    public void tearDown() throws Exception {
        this.brokerService.stop();
    }

    @Test
    public void tesBlockedFuture() throws Exception {
        Message[] messageArr = new Message[2];
        TestMessageStore testMessageStore = new TestMessageStore(messageArr, this.destination);
        ConsumerInfo consumerInfo = new ConsumerInfo();
        DestinationStatistics destinationStatistics = new DestinationStatistics();
        consumerInfo.setExclusive(true);
        Queue queue = new Queue(this.brokerService, this.destination, testMessageStore, destinationStatistics, (TaskRunnerFactory) null);
        testMessageStore.start();
        testMessageStore.registerIndexListener(null);
        QueueStorePrefetch queueStorePrefetch = new QueueStorePrefetch(queue, this.brokerService.getBroker());
        SystemUsage systemUsage = new SystemUsage();
        systemUsage.getMemoryUsage().setLimit(1024L);
        queueStorePrefetch.setSystemUsage(systemUsage);
        queueStorePrefetch.setEnableAudit(false);
        queueStorePrefetch.start();
        Assert.assertTrue("cache enabled", queueStorePrefetch.isUseCache() && queueStorePrefetch.isCacheEnabled());
        ActiveMQTextMessage message = getMessage(0);
        messageArr[1] = message;
        message.setMemoryUsage(systemUsage.getMemoryUsage());
        message.setRecievedByDFBridge(true);
        message.getMessageId().setFutureOrSequenceLong(new FutureTask<Long>(new Runnable() { // from class: org.apache.activemq.broker.region.cursors.StoreQueueCursorOrderTest.1
            @Override // java.lang.Runnable
            public void run() {
            }
        }, 2L) { // from class: org.apache.activemq.broker.region.cursors.StoreQueueCursorOrderTest.2
        });
        queueStorePrefetch.addMessageLast(message);
        Assert.assertTrue("cache enabled", queueStorePrefetch.isUseCache() && queueStorePrefetch.isCacheEnabled());
        ActiveMQTextMessage message2 = getMessage(1);
        messageArr[0] = message2;
        message2.setMemoryUsage(systemUsage.getMemoryUsage());
        message2.getMessageId().setFutureOrSequenceLong(1L);
        queueStorePrefetch.addMessageLast(message2);
        Assert.assertTrue("cache is disabled as limit reached", !queueStorePrefetch.isCacheEnabled());
        Assert.assertEquals("setBatch unset", 0L, testMessageStore.batch.get());
        int i = 0;
        queueStorePrefetch.setMaxBatchSize(2);
        queueStorePrefetch.reset();
        while (queueStorePrefetch.hasNext() && i < 2) {
            MessageReference next = queueStorePrefetch.next();
            next.decrementReferenceCount();
            queueStorePrefetch.remove();
            LOG.info("Received message: {} with body: {}", next.getMessageId(), next.getMessage().getText());
            int i2 = i;
            i++;
            Assert.assertEquals(i2, next.getMessageId().getProducerSequenceId());
        }
        queueStorePrefetch.release();
        Assert.assertEquals(2L, i);
    }

    @Test
    public void testNoSetBatchWithUnOrderedFutureCurrentSync() throws Exception {
        Message[] messageArr = new Message[2];
        TestMessageStore testMessageStore = new TestMessageStore(messageArr, this.destination);
        ConsumerInfo consumerInfo = new ConsumerInfo();
        DestinationStatistics destinationStatistics = new DestinationStatistics();
        consumerInfo.setExclusive(true);
        Queue queue = new Queue(this.brokerService, this.destination, testMessageStore, destinationStatistics, (TaskRunnerFactory) null);
        testMessageStore.start();
        testMessageStore.registerIndexListener(null);
        QueueStorePrefetch queueStorePrefetch = new QueueStorePrefetch(queue, this.brokerService.getBroker());
        SystemUsage systemUsage = new SystemUsage();
        systemUsage.getMemoryUsage().setLimit(1024L);
        queueStorePrefetch.setSystemUsage(systemUsage);
        queueStorePrefetch.setEnableAudit(false);
        queueStorePrefetch.start();
        Assert.assertTrue("cache enabled", queueStorePrefetch.isUseCache() && queueStorePrefetch.isCacheEnabled());
        final ActiveMQTextMessage message = getMessage(0);
        messageArr[1] = message;
        message.setMemoryUsage(systemUsage.getMemoryUsage());
        message.setRecievedByDFBridge(true);
        FutureTask<Long> futureTask = new FutureTask<Long>(new Runnable() { // from class: org.apache.activemq.broker.region.cursors.StoreQueueCursorOrderTest.3
            @Override // java.lang.Runnable
            public void run() {
                message.getMessageId().setFutureOrSequenceLong(1L);
            }
        }, 1L) { // from class: org.apache.activemq.broker.region.cursors.StoreQueueCursorOrderTest.4
        };
        message.getMessageId().setFutureOrSequenceLong(futureTask);
        Executors.newSingleThreadExecutor().submit(futureTask);
        queueStorePrefetch.addMessageLast(message);
        Assert.assertTrue("cache enabled", queueStorePrefetch.isUseCache() && queueStorePrefetch.isCacheEnabled());
        ActiveMQTextMessage message2 = getMessage(1);
        messageArr[0] = message2;
        message2.setMemoryUsage(systemUsage.getMemoryUsage());
        message2.getMessageId().setFutureOrSequenceLong(0L);
        queueStorePrefetch.addMessageLast(message2);
        Assert.assertTrue("cache is disabled as limit reached", !queueStorePrefetch.isCacheEnabled());
        Assert.assertEquals("setBatch unset", 0L, testMessageStore.batch.get());
        int i = 0;
        queueStorePrefetch.setMaxBatchSize(2);
        queueStorePrefetch.reset();
        while (queueStorePrefetch.hasNext() && i < 2) {
            MessageReference next = queueStorePrefetch.next();
            next.decrementReferenceCount();
            queueStorePrefetch.remove();
            LOG.info("Received message: {} with body: {}", next.getMessageId(), next.getMessage().getText());
            int i2 = i;
            i++;
            Assert.assertEquals(i2, next.getMessageId().getProducerSequenceId());
        }
        queueStorePrefetch.release();
        Assert.assertEquals(2L, i);
    }

    @Test
    public void testSetBatchWithOrderedFutureCurrentFuture() throws Exception {
        Message[] messageArr = new Message[2];
        TestMessageStore testMessageStore = new TestMessageStore(messageArr, this.destination);
        ConsumerInfo consumerInfo = new ConsumerInfo();
        DestinationStatistics destinationStatistics = new DestinationStatistics();
        consumerInfo.setExclusive(true);
        Queue queue = new Queue(this.brokerService, this.destination, testMessageStore, destinationStatistics, (TaskRunnerFactory) null);
        testMessageStore.start();
        testMessageStore.registerIndexListener(null);
        QueueStorePrefetch queueStorePrefetch = new QueueStorePrefetch(queue, this.brokerService.getBroker());
        SystemUsage systemUsage = new SystemUsage();
        systemUsage.getMemoryUsage().setLimit(1024L);
        queueStorePrefetch.setSystemUsage(systemUsage);
        queueStorePrefetch.setEnableAudit(false);
        queueStorePrefetch.start();
        Assert.assertTrue("cache enabled", queueStorePrefetch.isUseCache() && queueStorePrefetch.isCacheEnabled());
        final ActiveMQTextMessage message = getMessage(0);
        messageArr[0] = message;
        message.setMemoryUsage(systemUsage.getMemoryUsage());
        message.setRecievedByDFBridge(true);
        FutureTask<Long> futureTask = new FutureTask<Long>(new Runnable() { // from class: org.apache.activemq.broker.region.cursors.StoreQueueCursorOrderTest.5
            @Override // java.lang.Runnable
            public void run() {
                message.getMessageId().setFutureOrSequenceLong(0L);
            }
        }, 0L) { // from class: org.apache.activemq.broker.region.cursors.StoreQueueCursorOrderTest.6
        };
        message.getMessageId().setFutureOrSequenceLong(futureTask);
        Executors.newSingleThreadExecutor().submit(futureTask);
        queueStorePrefetch.addMessageLast(message);
        Assert.assertTrue("cache enabled", queueStorePrefetch.isUseCache() && queueStorePrefetch.isCacheEnabled());
        final ActiveMQTextMessage message2 = getMessage(1);
        messageArr[1] = message2;
        message2.setMemoryUsage(systemUsage.getMemoryUsage());
        message2.setRecievedByDFBridge(true);
        FutureTask<Long> futureTask2 = new FutureTask<Long>(new Runnable() { // from class: org.apache.activemq.broker.region.cursors.StoreQueueCursorOrderTest.7
            @Override // java.lang.Runnable
            public void run() {
                message2.getMessageId().setFutureOrSequenceLong(1L);
            }
        }, 1L) { // from class: org.apache.activemq.broker.region.cursors.StoreQueueCursorOrderTest.8
        };
        message2.getMessageId().setFutureOrSequenceLong(futureTask2);
        Executors.newSingleThreadExecutor().submit(futureTask2);
        queueStorePrefetch.addMessageLast(message2);
        Assert.assertTrue("cache is disabled as limit reached", !queueStorePrefetch.isCacheEnabled());
        Assert.assertEquals("setBatch set", 1L, testMessageStore.batch.get());
        int i = 0;
        queueStorePrefetch.setMaxBatchSize(2);
        queueStorePrefetch.reset();
        while (queueStorePrefetch.hasNext() && i < 2) {
            MessageReference next = queueStorePrefetch.next();
            next.decrementReferenceCount();
            queueStorePrefetch.remove();
            LOG.info("Received message: {} with body: {}", next.getMessageId(), next.getMessage().getText());
            int i2 = i;
            i++;
            Assert.assertEquals(i2, next.getMessageId().getProducerSequenceId());
        }
        queueStorePrefetch.release();
        Assert.assertEquals(2L, i);
    }

    @Test
    public void testSetBatchWithFuture() throws Exception {
        Message[] messageArr = new Message[4];
        TestMessageStore testMessageStore = new TestMessageStore(messageArr, this.destination);
        ConsumerInfo consumerInfo = new ConsumerInfo();
        DestinationStatistics destinationStatistics = new DestinationStatistics();
        consumerInfo.setExclusive(true);
        Queue queue = new Queue(this.brokerService, this.destination, testMessageStore, destinationStatistics, (TaskRunnerFactory) null);
        testMessageStore.start();
        testMessageStore.registerIndexListener(null);
        QueueStorePrefetch queueStorePrefetch = new QueueStorePrefetch(queue, this.brokerService.getBroker());
        SystemUsage systemUsage = new SystemUsage();
        systemUsage.getMemoryUsage().setLimit(10240L);
        queueStorePrefetch.setSystemUsage(systemUsage);
        queueStorePrefetch.setEnableAudit(false);
        queueStorePrefetch.start();
        Assert.assertTrue("cache enabled", queueStorePrefetch.isUseCache() && queueStorePrefetch.isCacheEnabled());
        final ActiveMQTextMessage message = getMessage(0);
        messageArr[0] = message;
        message.setMemoryUsage(systemUsage.getMemoryUsage());
        message.setRecievedByDFBridge(true);
        FutureTask<Long> futureTask = new FutureTask<Long>(new Runnable() { // from class: org.apache.activemq.broker.region.cursors.StoreQueueCursorOrderTest.9
            @Override // java.lang.Runnable
            public void run() {
                message.getMessageId().setFutureOrSequenceLong(0L);
            }
        }, 0L) { // from class: org.apache.activemq.broker.region.cursors.StoreQueueCursorOrderTest.10
        };
        message.getMessageId().setFutureOrSequenceLong(futureTask);
        queueStorePrefetch.addMessageLast(message);
        Executors.newSingleThreadExecutor().submit(futureTask);
        final ActiveMQTextMessage message2 = getMessage(1);
        messageArr[3] = message2;
        message2.setMemoryUsage(systemUsage.getMemoryUsage());
        message2.setRecievedByDFBridge(true);
        FutureTask<Long> futureTask2 = new FutureTask<Long>(new Runnable() { // from class: org.apache.activemq.broker.region.cursors.StoreQueueCursorOrderTest.11
            @Override // java.lang.Runnable
            public void run() {
                message2.getMessageId().setFutureOrSequenceLong(3L);
            }
        }, 3L) { // from class: org.apache.activemq.broker.region.cursors.StoreQueueCursorOrderTest.12
        };
        message2.getMessageId().setFutureOrSequenceLong(futureTask2);
        queueStorePrefetch.addMessageLast(message2);
        ActiveMQTextMessage message3 = getMessage(2);
        messageArr[1] = message3;
        message3.setMemoryUsage(systemUsage.getMemoryUsage());
        message3.getMessageId().setFutureOrSequenceLong(1L);
        queueStorePrefetch.addMessageLast(message3);
        Assert.assertTrue("cache enabled", queueStorePrefetch.isUseCache() && queueStorePrefetch.isCacheEnabled());
        Executors.newSingleThreadExecutor().submit(futureTask2);
        ActiveMQTextMessage message4 = getMessage(3);
        messageArr[2] = message4;
        message4.setMemoryUsage(systemUsage.getMemoryUsage());
        message4.getMessageId().setFutureOrSequenceLong(2L);
        queueStorePrefetch.addMessageLast(message4);
        Assert.assertTrue("cache is disabled as limit reached", !queueStorePrefetch.isCacheEnabled());
        Assert.assertEquals("setBatch set", 2L, testMessageStore.batch.get());
        int i = 0;
        queueStorePrefetch.setMaxBatchSize(4);
        queueStorePrefetch.reset();
        while (queueStorePrefetch.hasNext() && i < 4) {
            MessageReference next = queueStorePrefetch.next();
            next.decrementReferenceCount();
            queueStorePrefetch.remove();
            LOG.info("Received message: {} with body: {}", next.getMessageId(), next.getMessage().getText());
            int i2 = i;
            i++;
            Assert.assertEquals(i2, next.getMessageId().getProducerSequenceId());
        }
        queueStorePrefetch.release();
        Assert.assertEquals(4L, i);
        ActiveMQTextMessage message5 = getMessage(4);
        message5.setMemoryUsage(systemUsage.getMemoryUsage());
        message5.getMessageId().setFutureOrSequenceLong(4L);
        queueStorePrefetch.addMessageLast(message5);
        Assert.assertTrue("cache enabled on empty store", queueStorePrefetch.isCacheEnabled());
    }

    @Test
    public void testSetBatch() throws Exception {
        Message[] messageArr = new Message[3];
        TestMessageStore testMessageStore = new TestMessageStore(messageArr, this.destination);
        ConsumerInfo consumerInfo = new ConsumerInfo();
        DestinationStatistics destinationStatistics = new DestinationStatistics();
        consumerInfo.setExclusive(true);
        Queue queue = new Queue(this.brokerService, this.destination, testMessageStore, destinationStatistics, (TaskRunnerFactory) null);
        testMessageStore.start();
        testMessageStore.registerIndexListener(null);
        QueueStorePrefetch queueStorePrefetch = new QueueStorePrefetch(queue, this.brokerService.getBroker());
        SystemUsage systemUsage = new SystemUsage();
        systemUsage.getMemoryUsage().setLimit(5120L);
        queueStorePrefetch.setSystemUsage(systemUsage);
        queueStorePrefetch.setEnableAudit(false);
        queueStorePrefetch.start();
        Assert.assertTrue("cache enabled", queueStorePrefetch.isUseCache() && queueStorePrefetch.isCacheEnabled());
        ActiveMQTextMessage message = getMessage(0);
        messageArr[0] = message;
        message.setMemoryUsage(systemUsage.getMemoryUsage());
        message.getMessageId().setFutureOrSequenceLong(0L);
        queueStorePrefetch.addMessageLast(message);
        ActiveMQTextMessage message2 = getMessage(1);
        messageArr[1] = message2;
        message2.setMemoryUsage(systemUsage.getMemoryUsage());
        message2.getMessageId().setFutureOrSequenceLong(1L);
        queueStorePrefetch.addMessageLast(message2);
        Assert.assertTrue("cache enabled", queueStorePrefetch.isUseCache() && queueStorePrefetch.isCacheEnabled());
        ActiveMQTextMessage message3 = getMessage(2);
        messageArr[2] = message3;
        message3.setMemoryUsage(systemUsage.getMemoryUsage());
        message3.getMessageId().setFutureOrSequenceLong(2L);
        queueStorePrefetch.addMessageLast(message3);
        Assert.assertTrue("cache is disabled as limit reached", !queueStorePrefetch.isCacheEnabled());
        Assert.assertEquals("setBatch set", 2L, testMessageStore.batch.get());
        int i = 0;
        queueStorePrefetch.setMaxBatchSize(2);
        queueStorePrefetch.reset();
        while (queueStorePrefetch.hasNext() && i < 3) {
            MessageReference next = queueStorePrefetch.next();
            next.decrementReferenceCount();
            queueStorePrefetch.remove();
            LOG.info("Received message: {} with body: {}", next.getMessageId(), next.getMessage().getText());
            int i2 = i;
            i++;
            Assert.assertEquals(i2, next.getMessageId().getProducerSequenceId());
        }
        queueStorePrefetch.release();
        Assert.assertEquals(3L, i);
    }

    private ActiveMQTextMessage getMessage(int i) throws Exception {
        ActiveMQTextMessage activeMQTextMessage = new ActiveMQTextMessage();
        MessageId messageId = new MessageId(mesageIdRoot + i);
        messageId.setBrokerSequenceId(i);
        messageId.setProducerSequenceId(i);
        activeMQTextMessage.setMessageId(messageId);
        activeMQTextMessage.setDestination(this.destination);
        activeMQTextMessage.setPersistent(true);
        activeMQTextMessage.setResponseRequired(true);
        activeMQTextMessage.setText("Msg:" + i + " " + this.text);
        Assert.assertEquals(activeMQTextMessage.getMessageId().getProducerSequenceId(), i);
        return activeMQTextMessage;
    }
}
