package org.apache.activemq.broker.region.cursors;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.IndirectMessageReference;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.kahadb.plist.PList;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.kahadb.util.ByteSequence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/activemq-core-5.5.1-fuse-03-06.jar:org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.class */
public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
    static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursor.class);
    private static final AtomicLong NAME_COUNT = new AtomicLong();
    protected Broker broker;
    private final PListStore store;
    private final String name;
    private PendingList memoryList;
    private PList diskList;
    private Iterator<MessageReference> iter;
    private Destination regionDestination;
    private boolean iterating;
    private boolean flushRequired;
    private final AtomicBoolean started;
    private final WireFormat wireFormat;

    /* loaded from: input_file:WEB-INF/lib/activemq-core-5.5.1-fuse-03-06.jar:org/apache/activemq/broker/region/cursors/FilePendingMessageCursor$DiskIterator.class */
    final class DiskIterator implements Iterator<MessageReference> {
        private final PList.PListIterator iterator;

        DiskIterator() {
            try {
                this.iterator = FilePendingMessageCursor.this.getDiskList().iterator();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.iterator.hasNext();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public MessageReference next() {
            try {
                return FilePendingMessageCursor.this.getMessage(this.iterator.next().getByteSequence());
            } catch (IOException e) {
                FilePendingMessageCursor.LOG.error("I/O error", (Throwable) e);
                throw new RuntimeException(e);
            }
        }

        @Override // java.util.Iterator
        public void remove() {
            this.iterator.remove();
        }

        public void release() {
            this.iterator.release();
        }
    }

    public FilePendingMessageCursor(Broker broker, String str, boolean z) {
        super(z);
        this.started = new AtomicBoolean();
        this.wireFormat = new OpenWireFormat();
        if (this.prioritizedMessages) {
            this.memoryList = new PrioritizedPendingList();
        } else {
            this.memoryList = new OrderedPendingList();
        }
        this.broker = broker;
        this.store = broker.getTempDataStore();
        this.name = NAME_COUNT.incrementAndGet() + "_" + str;
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.Service
    public void start() throws Exception {
        if (this.started.compareAndSet(false, true)) {
            super.start();
            if (this.systemUsage != null) {
                this.systemUsage.getMemoryUsage().addUsageListener(this);
            }
        }
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.Service
    public void stop() throws Exception {
        if (this.started.compareAndSet(true, false)) {
            super.stop();
            if (this.systemUsage != null) {
                this.systemUsage.getMemoryUsage().removeUsageListener(this);
            }
        }
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.broker.region.cursors.PendingMessageCursor
    public synchronized boolean isEmpty() {
        if (this.memoryList.isEmpty() && isDiskListEmpty()) {
            return true;
        }
        Iterator<MessageReference> it = this.memoryList.iterator();
        while (it.hasNext()) {
            MessageReference next = it.next();
            if (next != QueueMessageReference.NULL_MESSAGE) {
                if (!next.isDropped()) {
                    return false;
                }
                it.remove();
            }
        }
        return isDiskListEmpty();
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.broker.region.cursors.PendingMessageCursor
    public synchronized void reset() {
        this.iterating = true;
        this.last = null;
        if (isDiskListEmpty()) {
            this.iter = this.memoryList.iterator();
        } else {
            this.iter = new DiskIterator();
        }
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.broker.region.cursors.PendingMessageCursor
    public synchronized void release() {
        this.iterating = false;
        if (this.iter instanceof DiskIterator) {
            ((DiskIterator) this.iter).release();
        }
        if (this.flushRequired) {
            this.flushRequired = false;
            if (hasSpace()) {
                return;
            }
            flushToDisk();
        }
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.broker.region.cursors.PendingMessageCursor
    public synchronized void destroy() throws Exception {
        stop();
        Iterator<MessageReference> it = this.memoryList.iterator();
        while (it.hasNext()) {
            ((Message) it.next()).decrementReferenceCount();
        }
        this.memoryList.clear();
        destroyDiskList();
    }

    private void destroyDiskList() throws Exception {
        if (this.diskList != null) {
            this.store.removePList(this.name);
            this.diskList = null;
        }
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.broker.region.cursors.PendingMessageCursor
    public synchronized LinkedList<MessageReference> pageInList(int i) {
        LinkedList<MessageReference> linkedList = new LinkedList<>();
        int i2 = 0;
        Iterator<MessageReference> it = this.memoryList.iterator();
        while (it.hasNext() && i2 < i) {
            MessageReference next = it.next();
            next.incrementReferenceCount();
            linkedList.add(next);
            i2++;
        }
        if (i2 < i && !isDiskListEmpty()) {
            DiskIterator diskIterator = new DiskIterator();
            while (diskIterator.hasNext() && i2 < i) {
                Message message = (Message) diskIterator.next();
                message.setRegionDestination(this.regionDestination);
                message.setMemoryUsage(getSystemUsage().getMemoryUsage());
                message.incrementReferenceCount();
                linkedList.add(message);
                i2++;
            }
        }
        return linkedList;
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.broker.region.cursors.PendingMessageCursor
    public synchronized void addMessageLast(MessageReference messageReference) throws Exception {
        tryAddMessageLast(messageReference, 0L);
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.broker.region.cursors.PendingMessageCursor
    public synchronized boolean tryAddMessageLast(MessageReference messageReference, long j) throws Exception {
        if (messageReference.isExpired()) {
            discardExpiredMessage(messageReference);
            return true;
        }
        try {
            this.regionDestination = messageReference.getMessage().getRegionDestination();
            if (isDiskListEmpty() && (hasSpace() || this.store == null)) {
                this.memoryList.addMessageLast(messageReference);
                messageReference.incrementReferenceCount();
                setCacheEnabled(true);
                return true;
            }
            if (!hasSpace() && isDiskListEmpty()) {
                expireOldMessages();
                if (hasSpace()) {
                    this.memoryList.addMessageLast(messageReference);
                    messageReference.incrementReferenceCount();
                    return true;
                }
                flushToDisk();
            }
            if (!this.systemUsage.getTempUsage().waitForSpace(j)) {
                return false;
            }
            getDiskList().addLast(messageReference.getMessageId().toString(), getByteSequence(messageReference.getMessage()));
            return true;
        } catch (Exception e) {
            LOG.error("Caught an Exception adding a message: " + messageReference + " first to FilePendingMessageCursor ", (Throwable) e);
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.broker.region.cursors.PendingMessageCursor
    public synchronized void addMessageFirst(MessageReference messageReference) {
        if (messageReference.isExpired()) {
            discardExpiredMessage(messageReference);
            return;
        }
        try {
            this.regionDestination = messageReference.getMessage().getRegionDestination();
            if (isDiskListEmpty() && hasSpace()) {
                this.memoryList.addMessageFirst(messageReference);
                messageReference.incrementReferenceCount();
                setCacheEnabled(true);
                return;
            }
            if (!hasSpace() && isDiskListEmpty()) {
                expireOldMessages();
                if (hasSpace()) {
                    this.memoryList.addMessageFirst(messageReference);
                    messageReference.incrementReferenceCount();
                    return;
                }
                flushToDisk();
            }
            this.systemUsage.getTempUsage().waitForSpace();
            messageReference.decrementReferenceCount();
            getDiskList().addFirst(messageReference.getMessageId().toString(), getByteSequence(messageReference.getMessage()));
        } catch (Exception e) {
            LOG.error("Caught an Exception adding a message: " + messageReference + " first to FilePendingMessageCursor ", (Throwable) e);
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.broker.region.cursors.PendingMessageCursor
    public synchronized boolean hasNext() {
        return this.iter.hasNext();
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.broker.region.cursors.PendingMessageCursor
    public synchronized MessageReference next() {
        MessageReference next = this.iter.next();
        this.last = next;
        if (!isDiskListEmpty()) {
            next.getMessage().setRegionDestination(this.regionDestination);
            next.getMessage().setMemoryUsage(getSystemUsage().getMemoryUsage());
        }
        next.incrementReferenceCount();
        return next;
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.broker.region.cursors.PendingMessageCursor
    public synchronized void remove() {
        this.iter.remove();
        if (this.last != null) {
            this.last.decrementReferenceCount();
        }
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.broker.region.cursors.PendingMessageCursor
    public synchronized void remove(MessageReference messageReference) {
        if (this.memoryList.remove(messageReference) != null) {
            messageReference.decrementReferenceCount();
        }
        if (isDiskListEmpty()) {
            return;
        }
        try {
            getDiskList().remove(messageReference.getMessageId().toString());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.broker.region.cursors.PendingMessageCursor
    public synchronized int size() {
        return this.memoryList.size() + (isDiskListEmpty() ? 0 : (int) getDiskList().size());
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.broker.region.cursors.PendingMessageCursor
    public synchronized void clear() {
        this.memoryList.clear();
        if (!isDiskListEmpty()) {
            try {
                getDiskList().destroy();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        this.last = null;
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.broker.region.cursors.PendingMessageCursor
    public synchronized boolean isFull() {
        return super.isFull() || !(isDiskListEmpty() || this.systemUsage == null || !this.systemUsage.getTempUsage().isFull());
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.broker.region.cursors.PendingMessageCursor
    public boolean hasMessagesBufferedToDeliver() {
        return !isEmpty();
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.broker.region.cursors.PendingMessageCursor
    public void setSystemUsage(SystemUsage systemUsage) {
        super.setSystemUsage(systemUsage);
    }

    @Override // org.apache.activemq.usage.UsageListener
    public void onUsageChanged(Usage usage, int i, int i2) {
        if (i2 >= getMemoryUsageHighWaterMark()) {
            synchronized (this) {
                if (!this.flushRequired && size() != 0) {
                    this.flushRequired = true;
                    if (!this.iterating) {
                        expireOldMessages();
                        if (!hasSpace()) {
                            flushToDisk();
                            this.flushRequired = false;
                        }
                    }
                }
            }
        }
    }

    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor, org.apache.activemq.broker.region.cursors.PendingMessageCursor
    public boolean isTransient() {
        return true;
    }

    protected boolean isSpaceInMemoryList() {
        return hasSpace() && isDiskListEmpty();
    }

    protected synchronized void expireOldMessages() {
        if (this.memoryList.isEmpty()) {
            return;
        }
        Iterator<MessageReference> it = this.memoryList.iterator();
        while (it.hasNext()) {
            MessageReference next = it.next();
            if (next.isExpired()) {
                next.decrementReferenceCount();
                discardExpiredMessage(next);
                it.remove();
            }
        }
    }

    protected synchronized void flushToDisk() {
        if (this.memoryList.isEmpty() || this.store == null) {
            return;
        }
        long j = 0;
        if (LOG.isTraceEnabled()) {
            j = System.currentTimeMillis();
            LOG.trace("" + this.name + ", flushToDisk() mem list size: " + this.memoryList.size() + " " + (this.systemUsage != null ? this.systemUsage.getMemoryUsage() : ""));
        }
        for (MessageReference messageReference : this.memoryList) {
            messageReference.decrementReferenceCount();
            try {
                getDiskList().addLast(messageReference.getMessageId().toString(), getByteSequence(messageReference.getMessage()));
            } catch (IOException e) {
                LOG.error("Failed to write to disk list", (Throwable) e);
                throw new RuntimeException(e);
            }
        }
        this.memoryList.clear();
        setCacheEnabled(false);
        if (LOG.isTraceEnabled()) {
            LOG.trace("" + this.name + ", flushToDisk() done - " + (System.currentTimeMillis() - j) + "ms " + (this.systemUsage != null ? this.systemUsage.getMemoryUsage() : ""));
        }
    }

    protected boolean isDiskListEmpty() {
        return this.diskList == null || this.diskList.isEmpty();
    }

    protected PList getDiskList() {
        if (this.diskList == null) {
            try {
                this.diskList = this.store.getPList(this.name);
            } catch (Exception e) {
                LOG.error("Caught an IO Exception getting the DiskList " + this.name, (Throwable) e);
                throw new RuntimeException(e);
            }
        }
        return this.diskList;
    }

    private void discardExpiredMessage(MessageReference messageReference) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Discarding expired message " + messageReference);
        }
        if (this.broker.isExpired(messageReference)) {
            ConnectionContext connectionContext = new ConnectionContext(new NonCachedMessageEvaluationContext());
            connectionContext.setBroker(this.broker);
            messageReference.getRegionDestination().messageExpired(connectionContext, null, new IndirectMessageReference(messageReference.getMessage()));
        }
    }

    protected ByteSequence getByteSequence(Message message) throws IOException {
        org.apache.activemq.util.ByteSequence marshal = this.wireFormat.marshal(message);
        return new ByteSequence(marshal.data, marshal.offset, marshal.length);
    }

    protected Message getMessage(ByteSequence byteSequence) throws IOException {
        return (Message) this.wireFormat.unmarshal(new org.apache.activemq.util.ByteSequence(byteSequence.getData(), byteSequence.getOffset(), byteSequence.getLength()));
    }
}
