package org.apache.activemq.broker.region.cursors;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:activemq-broker-5.11.0.redhat-630450.jar:org/apache/activemq/broker/region/cursors/AbstractStoreCursor.class */
public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener {
    /** Class logger. */
    private static final Logger LOG = LoggerFactory.getLogger(AbstractStoreCursor.class);

    /** Slot in {@link #lastCachedIds} holding the last id tracked from a synchronous add. */
    private static final int SYNC_ADD = 0;

    /** Slot in {@link #lastCachedIds} holding the last id tracked from an asynchronous add. */
    private static final int ASYNC_ADD = 1;

    /**
     * Destination handed to the constructor; it is stamped on every message passed to
     * {@code recoverMessage}. The constructor tolerates {@code null} here.
     */
    protected final Destination regionDestination;

    /** In-memory batch of messages this cursor iterates over (prioritized or ordered, chosen by the constructor). */
    protected final PendingList batchList;

    /** Lazily created iterator over {@link #batchList}; {@code null} when no iteration is in progress. */
    private Iterator<MessageReference> iterator;

    /** Set by {@code gc()}; makes the next {@code fillBatch()} re-read the store size and reset the batch. */
    protected boolean batchResetNeeded;

    /** Cached number of pending messages; a negative value makes {@code size()} re-read it from the store. */
    protected int size;

    /** Ids of asynchronously added messages that have not yet been folded into {@link #lastCachedIds}. */
    private final LinkedList<MessageId> pendingCachedIds;

    /** Two-slot array (indexed by {@link #SYNC_ADD} / {@link #ASYNC_ADD}) of the last cached message ids. */
    final MessageId[] lastCachedIds;

    /**
     * Initialised to {@code false} by the constructor and not otherwise touched in this class.
     * NOTE(review): presumably maintained by subclasses - confirm before relying on it.
     */
    protected boolean hadSpace;

    /** Duplicates collected by {@code duplicate(...)} and drained by {@code dealWithDuplicates()}. */
    final LinkedList<Message> duplicatesFromStore;

    /**
     * Creates a cursor for the given destination.
     * <p>The superclass is told whether messages are prioritized (always {@code false} for a
     * {@code null} destination), and the batch list implementation is picked accordingly.
     *
     * @param destination the destination this cursor serves; may be {@code null}
     */
    public AbstractStoreCursor(Destination destination) {
        super(destination != null && destination.isPrioritizedMessages());
        this.regionDestination = destination;
        this.iterator = null;
        this.batchResetNeeded = false;
        this.hadSpace = false;
        this.pendingCachedIds = new LinkedList<>();
        this.duplicatesFromStore = new LinkedList<>();
        this.lastCachedIds = new MessageId[2];
        this.batchList = this.prioritizedMessages ? new PrioritizedPendingList() : new OrderedPendingList();
    }

    /**
     * Starts the cursor; does nothing when it is already started.
     * <p>On the first start the superclass is started, the batch position and cached size are
     * reset, and the cache is enabled only when the store size is zero and {@code useCache} is set.
     *
     * @throws Exception propagated from the superclass {@code start()}
     */
    @Override
    public final synchronized void start() throws Exception {
        if (!isStarted()) {
            super.start();
            resetBatch();
            resetSize();
            setCacheEnabled(size == 0 && useCache);
        }
    }

    /** Overwrites the cached {@code size} with the value currently reported by {@link #getStoreSize()}. */
    public void resetSize() {
        size = getStoreSize();
    }

    /**
     * Repositions the batch at the last synchronously cached message id, if one is recorded.
     *
     * @throws RuntimeException wrapping any exception raised by {@code setBatch}
     */
    @Override
    public void rebase() {
        MessageId lastSyncId = lastCachedIds[SYNC_ADD];
        if (lastSyncId == null) {
            // Nothing cached synchronously yet, so there is no position to rebase to.
            return;
        }
        try {
            setBatch(lastSyncId);
        } catch (Exception failure) {
            LOG.error("{} - Failed to set batch on rebase", this, failure);
            throw new RuntimeException(failure);
        }
    }

    /**
     * Stops the cursor: resets the batch position, stops the superclass, then releases the
     * in-memory batch via {@link #gc()}.
     *
     * @throws Exception propagated from the superclass {@code stop()}
     */
    @Override
    public final synchronized void stop() throws Exception {
        // Order matters: the batch is reset before the superclass stops, and gc() runs last.
        resetBatch();
        super.stop();
        gc();
    }

    /**
     * {@link MessageRecoveryListener} callback: recovers a message with the {@code z} flag of
     * {@link #recoverMessage(Message, boolean)} set to {@code false}.
     *
     * @param message the message being recovered
     * @return {@code true} if the message was added to the batch, {@code false} if it was a duplicate
     * @throws Exception propagated from {@link #recoverMessage(Message, boolean)}
     */
    @Override
    public final boolean recoverMessage(Message message) throws Exception {
        return recoverMessage(message, false);
    }

    /**
     * Adds a message to the in-memory batch unless its id has already been seen.
     * <p>The region destination is always stamped on the message first. When
     * {@code recordUniqueId} (inherited, not shown here) accepts the id, the message's reference
     * count is incremented, it is appended to the batch and the iterator is refreshed. Otherwise
     * the message is handled as a duplicate and may be queued for {@link #dealWithDuplicates()}.
     *
     * @param message the message to add
     * @param z {@code true} when called from {@code addMessageLast} with the cache enabled;
     *          {@code false} when called through the store recovery callback
     * @return {@code true} if the message was added to the batch, {@code false} for a duplicate
     * @throws Exception propagated from {@link #gotToTheStore(Message)}
     */
    public synchronized boolean recoverMessage(Message message, boolean z) throws Exception {
        message.setRegionDestination(regionDestination);

        if (recordUniqueId(message.getMessageId())) {
            // Only recovery from the store (not a cached add) fills in a missing memory usage.
            if (!z && message.getMemoryUsage() == null) {
                message.setMemoryUsage(getSystemUsage().getMemoryUsage());
            }
            message.incrementReferenceCount();
            batchList.addMessageLast(message);
            clearIterator(true);
            return true;
        }

        // From here on the id was already known: the message is a duplicate.
        if (z) {
            LOG.warn("{} - cursor got duplicate send {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
            if (gotToTheStore(message)) {
                duplicate(message);
            }
        } else if (duplicateFromStoreExcepted(message)) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("{} store replayed pending message due to concurrentStoreAndDispatchQueues {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
            }
        } else {
            LOG.warn("{} - cursor got duplicate from store {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
            duplicate(message);
        }
        return false;
    }

    /**
     * Tells {@link #recoverMessage(Message, boolean)} whether a duplicate coming from the store
     * should be tolerated (only trace-logged) instead of being queued as a duplicate.
     * NOTE(review): "Excepted" in the name is presumably a misspelling of "Expected" - confirm.
     *
     * @param message the duplicate message
     * @return the message's {@code isRecievedByDFBridge()} flag
     */
    protected boolean duplicateFromStoreExcepted(Message message) {
        return message.isRecievedByDFBridge();
    }

    /**
     * Reports whether a duplicate message actually reached the store.
     * <p>Messages whose {@code isRecievedByDFBridge()} flag is not set are reported as stored
     * straight away. Otherwise, if the message id still carries a {@link Future}, this method
     * waits for it to finish and then inspects the id again: the message counts as stored only
     * when the id now holds a {@link Long} other than {@code -1}.
     * <p>Waiting is best effort: a failed, cancelled or interrupted wait never propagates. An
     * interrupt is not lost, though - the thread's interrupt flag is restored for the caller.
     *
     * @param message the duplicate message to check
     * @return {@code true} if the message is considered to have reached the store
     * @throws Exception declared for compatibility with existing callers
     */
    public static boolean gotToTheStore(Message message) throws Exception {
        if (!message.isRecievedByDFBridge()) {
            return true;
        }
        Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
        if (possibleFuture instanceof Future) {
            try {
                ((Future<?>) possibleFuture).get();
            } catch (InterruptedException interrupted) {
                // Stay best-effort (no throw) but keep the interrupt request visible to the caller.
                Thread.currentThread().interrupt();
                LOG.debug("interrupted waiting for async add of {}", message.getMessageId(), interrupted);
            } catch (Exception failure) {
                // The outcome is decided by re-reading the id below, so a failed wait is only logged.
                LOG.debug("exception waiting for async add of {}", message.getMessageId(), failure);
            }
        }
        // Must be read again: the value may have changed while waiting on the future.
        Object sequence = message.getMessageId().getFutureOrSequenceLong();
        return sequence instanceof Long && ((Long) sequence).longValue() != -1L;
    }

    /** Queues a duplicate message so that {@link #dealWithDuplicates()} can hand it to the destination later. */
    private void duplicate(Message message) {
        duplicatesFromStore.add(message);
    }

    /**
     * Hands every queued duplicate to the region destination (together with the current
     * subscription, which may be {@code null}) and then empties the queue.
     */
    public void dealWithDuplicates() {
        for (Message queuedDuplicate : duplicatesFromStore) {
            regionDestination.duplicateFromStore(queuedDuplicate, getSubscription());
        }
        duplicatesFromStore.clear();
    }

    /**
     * Prepares the cursor for a new iteration pass: fills the batch if it is empty, refreshes an
     * existing iterator and re-validates the cached size.
     *
     * @throws RuntimeException wrapping any exception raised while filling the batch
     */
    @Override
    public final synchronized void reset() {
        if (batchList.isEmpty()) {
            try {
                fillBatch();
            } catch (Exception fillFailure) {
                LOG.error("{} - Failed to fill batch", this, fillFailure);
                throw new RuntimeException(fillFailure);
            }
        }
        clearIterator(true);
        // Called for its side effect: size() reloads the count from the store when it is negative.
        size();
    }

    /** Drops the current iterator without creating a replacement. */
    @Override
    public synchronized void release() {
        clearIterator(false);
    }

    /**
     * Discards the current iterator, if there is one.
     *
     * @param recreate when {@code true} and an iterator existed, a fresh iterator over the batch
     *                 is created immediately; an absent iterator is never created here
     */
    private synchronized void clearIterator(boolean recreate) {
        if (iterator == null) {
            return;
        }
        iterator = null;
        if (recreate) {
            ensureIterator();
        }
    }

    /** Creates the iterator over the batch list on first use; an existing iterator is left untouched. */
    private synchronized void ensureIterator() {
        if (iterator != null) {
            return;
        }
        iterator = batchList.iterator();
    }

    /** Intentionally empty: this class performs no work in this callback. */
    public final void finished() {
    }

    /**
     * Reports whether another message is available, filling the batch first when it is empty.
     *
     * @return {@code true} if the batch iterator has a further element
     * @throws RuntimeException wrapping any exception raised while filling the batch
     */
    @Override
    public final synchronized boolean hasNext() {
        if (batchList.isEmpty()) {
            try {
                fillBatch();
            } catch (Exception fillFailure) {
                LOG.error("{} - Failed to fill batch", this, fillFailure);
                throw new RuntimeException(fillFailure);
            }
        }
        ensureIterator();
        return iterator.hasNext();
    }

    /**
     * Returns the next message of the batch and remembers it in {@code last}.
     * <p>The returned reference has its reference count incremented. This method does not create
     * the iterator itself; it relies on one already being in place (as {@link #hasNext()} ensures).
     *
     * @return the next message reference, or {@code null} when the batch is empty or exhausted
     */
    @Override
    public final synchronized MessageReference next() {
        MessageReference upcoming =
                (!batchList.isEmpty() && iterator.hasNext()) ? iterator.next() : null;
        last = upcoming;
        if (upcoming != null) {
            upcoming.incrementReferenceCount();
        }
        return upcoming;
    }

    /**
     * Registers a newly added message with this cursor.
     * <ul>
     *   <li>With space available and the cache enabled, the message is put into the in-memory
     *       batch and its id is tracked; a duplicate is rejected instead.</li>
     *   <li>Without space but with the cache still enabled, the cursor synchronises its position
     *       with the store and switches the cache off.</li>
     * </ul>
     * In every non-rejected case the cached size grows by one.
     *
     * @param messageReference the message being added
     * @return {@code false} if the message was rejected as a duplicate, {@code true} otherwise
     * @throws Exception propagated from message recovery or store synchronisation
     */
    @Override
    public synchronized boolean addMessageLast(MessageReference messageReference) throws Exception {
        if (hasSpace()) {
            if (isCacheEnabled()) {
                if (!recoverMessage(messageReference.getMessage(), true)) {
                    dealWithDuplicates();
                    return false;
                }
                trackLastCached(messageReference);
            }
        } else if (isCacheEnabled()) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("{} - disabling cache on add {} {}", this, messageReference.getMessageId(), messageReference.getMessageId().getFutureOrSequenceLong());
            }
            syncWithStore(messageReference.getMessage());
            setCacheEnabled(false);
        }
        size++;
        return true;
    }

    /**
     * Reports whether the cache is enabled. This is not a pure query: when the superclass says
     * the cache is off, {@link #enableCacheNow()} gets the chance to switch it on.
     *
     * @return {@code true} if the cache was already enabled or has just been enabled
     */
    @Override
    public synchronized boolean isCacheEnabled() {
        if (super.isCacheEnabled()) {
            return true;
        }
        return enableCacheNow();
    }

    /**
     * Switches the cache on if {@link #canEnableCash()} currently allows it.
     *
     * @return {@code true} if the cache was enabled by this call, {@code false} otherwise
     */
    protected boolean enableCacheNow() {
        if (!canEnableCash()) {
            return false;
        }
        setCacheEnabled(true);
        if (LOG.isTraceEnabled()) {
            LOG.trace("{} enabling cache on empty store", this);
        }
        return true;
    }

    /**
     * Checks the preconditions for switching the cache on: caching is configured
     * ({@code useCache}), no messages are pending, there is space, and the cursor is started.
     *
     * @return {@code true} if all four conditions hold
     */
    public boolean canEnableCash() {
        boolean configuredAndDrained = useCache && size == 0;
        return configuredAndDrained && hasSpace() && isStarted();
    }

    /**
     * {@link MessageRecoveryListener} callback deciding whether recovery may deliver another message.
     * NOTE(review): 90 is presumably a usage percentage threshold - confirm against parentHasSpace.
     *
     * @return the result of {@code parentHasSpace(90)}
     */
    @Override
    public boolean canRecoveryNextMessage() {
        return parentHasSpace(90);
    }

    /**
     * Picks the message id the batch should be positioned at and passes it to {@code setBatch},
     * then forgets all tracked cached ids. Called from {@code addMessageLast} just before the
     * cache is disabled.
     *
     * @param message the message currently being added
     * @throws Exception propagated from {@code setBatch}
     */
    private void syncWithStore(Message message) throws Exception {
        // Fold already-completed async adds into lastCachedIds before looking at what is left.
        pruneLastCached();
        // Walk the still-pending async ids from the most recently added towards the oldest.
        ListIterator<MessageId> listIterator = this.pendingCachedIds.listIterator(this.pendingCachedIds.size());
        while (true) {
            if (!listIterator.hasPrevious()) {
                break;
            }
            MessageId previous = listIterator.previous();
            Object futureOrSequenceLong = previous.getFutureOrSequenceLong();
            if (!(futureOrSequenceLong instanceof Future)) {
                // No future attached: use this id as the async candidate and stop searching.
                setLastCachedId(ASYNC_ADD, previous);
                break;
            }
            Future future = (Future) futureOrSequenceLong;
            if (!future.isCancelled()) {
                try {
                    // Wait at most 5 seconds for the async add; on success this id is the candidate.
                    future.get(5L, TimeUnit.SECONDS);
                    setLastCachedId(ASYNC_ADD, previous);
                    break;
                } catch (CancellationException e) {
                    // Cancelled while waiting: skip this id and try the next older one.
                } catch (TimeoutException e2) {
                    LOG.debug("{} timed out waiting for async add", this, e2);
                } catch (Exception e3) {
                    LOG.debug("{} exception waiting for async add", this, e3);
                }
            }
        }
        MessageId messageId = this.lastCachedIds[ASYNC_ADD];
        // Drop the async candidate when the current add is not async and its sequence is lower
        // than the candidate's. The casts assume both ids hold a Long here - TODO confirm.
        if (messageId != null && !isAsync(message) && Long.compare(((Long) message.getMessageId().getFutureOrSequenceLong()).longValue(), ((Long) this.lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong()).longValue()) < 0) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("no set batch from async:" + messageId.getFutureOrSequenceLong() + " >= than current: " + message.getMessageId().getFutureOrSequenceLong() + ", " + this);
            }
            messageId = null;
        }
        // Without a usable async candidate, fall back to the last synchronously cached id.
        if (messageId == null) {
            messageId = this.lastCachedIds[SYNC_ADD];
        }
        if (messageId != null) {
            setBatch(messageId);
        }
        // Reset both lastCachedIds slots and the pending list.
        MessageId[] messageIdArr = this.lastCachedIds;
        int i = SYNC_ADD;
        this.lastCachedIds[ASYNC_ADD] = null;
        messageIdArr[i] = null;
        this.pendingCachedIds.clear();
    }

    /**
     * Remembers the id of a message that was just cached: async messages are appended to the
     * pending list (after pruning it), all others update the synchronous slot directly.
     */
    private void trackLastCached(MessageReference cachedReference) {
        if (isAsync(cachedReference.getMessage())) {
            pruneLastCached();
            pendingCachedIds.add(cachedReference.getMessageId());
        } else {
            setLastCachedId(SYNC_ADD, cachedReference.getMessageId());
        }
    }

    /**
     * Classifies a message as asynchronously added: either its {@code isRecievedByDFBridge()}
     * flag is set or its message id still carries a {@link Future}.
     */
    private static final boolean isAsync(Message message) {
        if (message.isRecievedByDFBridge()) {
            return true;
        }
        return message.getMessageId().getFutureOrSequenceLong() instanceof Future;
    }

    /**
     * Walks the pending async ids from the oldest, removing those whose outcome is known and
     * folding ids without a future into {@code lastCachedIds}. Stops at the first id whose
     * future has not completed yet.
     */
    private void pruneLastCached() {
        Iterator<MessageId> it = this.pendingCachedIds.iterator();
        while (it.hasNext()) {
            MessageId next = it.next();
            Object futureOrSequenceLong = next.getFutureOrSequenceLong();
            if (futureOrSequenceLong instanceof Future) {
                Future future = (Future) futureOrSequenceLong;
                if (!future.isDone()) {
                    // Still in flight: leave this id and everything after it in the list.
                    return;
                }
                if (future.isCancelled()) {
                    it.remove();
                } else {
                    try {
                        // The future is done, so a zero timeout only surfaces its outcome.
                        future.get(0L, TimeUnit.SECONDS);
                    } catch (ExecutionException e) {
                        // The add failed: stop tracking this id.
                        it.remove();
                    } catch (Exception e2) {
                        LOG.debug("{} unexpected exception verifying exception state of future", this, e2);
                    }
                }
            } else {
                // No future attached: record it in the async slot.
                setLastCachedId(ASYNC_ADD, next);
                if (this.lastCachedIds[SYNC_ADD] != null) {
                    // If it directly follows the last sync id (sequence + 1), advance the sync slot too.
                    // The casts assume both values are Long here - TODO confirm.
                    if (Long.compare(((Long) futureOrSequenceLong).longValue(), 1 + ((Long) this.lastCachedIds[SYNC_ADD].getFutureOrSequenceLong()).longValue()) == 0) {
                        setLastCachedId(SYNC_ADD, next);
                    }
                }
                it.remove();
            }
        }
    }

    /**
     * Stores {@code candidate} in the given slot of {@code lastCachedIds} unless the slot already
     * holds an id with a sequence that is not lower than the candidate's.
     * <p>The slot is overwritten when it is empty, when the current id carries no sequence value,
     * or when the candidate's sequence is strictly greater than the current one.
     *
     * @param index {@code SYNC_ADD} or {@code ASYNC_ADD}
     * @param candidate the id to record
     */
    private void setLastCachedId(int index, MessageId candidate) {
        MessageId current = lastCachedIds[index];
        if (current != null) {
            Object currentSeq = current.getFutureOrSequenceLong();
            Object candidateSeq = candidate.getFutureOrSequenceLong();
            if (currentSeq != null) {
                boolean candidateIsNewer = candidateSeq != null
                        && Long.compare(((Long) candidateSeq).longValue(), ((Long) currentSeq).longValue()) > 0;
                if (!candidateIsNewer) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("no set last cached[" + index + "] current:" + currentSeq + " <= than candidate: " + candidateSeq + ", " + this);
                    }
                    return;
                }
            }
        }
        lastCachedIds[index] = candidate;
    }

    /**
     * Hook for positioning the batch at the given message id. This base implementation does
     * nothing; it is invoked from {@code rebase()} and {@code syncWithStore(...)}.
     *
     * @param messageId the id to position the batch at
     * @throws Exception declared so that overriding implementations may fail
     */
    protected void setBatch(MessageId messageId) throws Exception {
    }

    /**
     * Accounts for a message added at the head: only the cached size is incremented, the
     * in-memory batch is not touched.
     *
     * @param messageReference the message being added (not inspected here)
     * @throws Exception declared by the overridden method; not thrown by this implementation
     */
    @Override
    public synchronized void addMessageFirst(MessageReference messageReference) throws Exception {
        size++;
    }

    /**
     * Removes the message most recently returned by the iterator: the cached size is decremented
     * unconditionally, the element is removed through the iterator when one exists, and the
     * reference count of {@code last} is decremented when it is set.
     */
    @Override
    public final synchronized void remove() {
        size--;
        if (iterator != null) {
            iterator.remove();
        }
        MessageReference lastReturned = last;
        if (lastReturned != null) {
            lastReturned.decrementReferenceCount();
        }
    }

    /**
     * Removes a specific message from the in-memory batch. When it was present, the cached size
     * is decremented and the cache is disabled; otherwise nothing changes.
     *
     * @param messageReference the message to remove
     */
    @Override
    public final synchronized void remove(MessageReference messageReference) {
        if (batchList.remove(messageReference) == null) {
            return;
        }
        size--;
        setCacheEnabled(false);
    }

    /** Clears the cursor by delegating to {@link #gc()}. */
    @Override
    public final synchronized void clear() {
        gc();
    }

    /**
     * Releases the in-memory batch: every batched message is rolled back (by id) and has its
     * reference count decremented, the batch and iterator are dropped, a batch reset is requested
     * for the next fill, and the cache is disabled.
     */
    @Override
    public synchronized void gc() {
        for (Iterator<MessageReference> batched = batchList.iterator(); batched.hasNext();) {
            MessageReference reference = batched.next();
            rollback(reference.getMessageId());
            reference.decrementReferenceCount();
        }
        batchList.clear();
        clearIterator(false);
        batchResetNeeded = true;
        setCacheEnabled(false);
    }

    /**
     * Loads the next batch from the store when the in-memory batch is empty and messages are
     * pending.
     * <p>If a batch reset was requested, the size is first re-read from the store, the maximum
     * batch size is capped at the smaller of the destination's page size and that size, and the
     * batch position is reset.
     *
     * @throws RuntimeException wrapping any exception raised by {@link #doFillBatch()}
     */
    @Override
    protected final synchronized void fillBatch() {
        if (LOG.isTraceEnabled()) {
            LOG.trace("{} fillBatch", this);
        }
        if (batchResetNeeded) {
            resetSize();
            setMaxBatchSize(Math.min(regionDestination.getMaxPageSize(), size));
            resetBatch();
            batchResetNeeded = false;
        }
        if (batchList.isEmpty() && size > 0) {
            try {
                doFillBatch();
            } catch (Exception fillFailure) {
                LOG.error("{} - Failed to fill batch", this, fillFailure);
                throw new RuntimeException(fillFailure);
            }
        }
    }

    /**
     * Reports whether the cached size is exactly zero (the in-memory batch is not consulted).
     *
     * @return {@code true} if {@code size == 0}
     */
    @Override
    public final synchronized boolean isEmpty() {
        return 0 == size;
    }

    /**
     * Reports whether the in-memory batch currently holds at least one message.
     *
     * @return {@code true} if the batch list is not empty
     */
    @Override
    public final synchronized boolean hasMessagesBufferedToDeliver() {
        boolean nothingBuffered = batchList.isEmpty();
        return !nothingBuffered;
    }

    /**
     * Returns the cached number of pending messages, reloading it from the store first when the
     * cached value has gone negative.
     *
     * @return the (possibly refreshed) cached size
     */
    @Override
    public final synchronized int size() {
        boolean cachedValueInvalid = size < 0;
        if (cachedValueInvalid) {
            size = getStoreSize();
        }
        return size;
    }

    /**
     * Builds the diagnostic description used in this class's log messages: destination name,
     * batch/size/cache state and the last cached sync/async ids with their sequence values.
     * <p>Safe to call on a cursor created with a {@code null} destination (the name is then
     * rendered as {@code null}), so logging {@code this} cannot fail with a
     * {@link NullPointerException} on that account.
     *
     * @return a single-line description of this cursor
     */
    @Override
    public String toString() {
        Object destinationName = regionDestination != null
                ? regionDestination.getActiveMQDestination().getPhysicalName()
                : "null";
        // Read each slot once so the null check and the dereference see the same value.
        MessageId lastSync = lastCachedIds[SYNC_ADD];
        MessageId lastAsync = lastCachedIds[ASYNC_ADD];
        return super.toString() + ":" + destinationName
                + ",batchResetNeeded=" + batchResetNeeded
                + ",size=" + size
                + ",cacheEnabled=" + cacheEnabled
                + ",maxBatchSize:" + maxBatchSize
                + ",hasSpace:" + hasSpace()
                + ",pendingCachedIds.size:" + pendingCachedIds.size()
                + ",lastSyncCachedId:" + lastSync
                + ",lastSyncCachedId-seq:" + (lastSync != null ? lastSync.getFutureOrSequenceLong() : "null")
                + ",lastAsyncCachedId:" + lastAsync
                + ",lastAsyncCachedId-seq:" + (lastAsync != null ? lastAsync.getFutureOrSequenceLong() : "null");
    }

    /**
     * Loads the next batch; invoked by {@code fillBatch()} only when the in-memory batch is empty
     * and {@code size} is positive.
     *
     * @throws Exception on failure; {@code fillBatch()} logs it and rethrows it wrapped in a RuntimeException
     */
    protected abstract void doFillBatch() throws Exception;

    /** Resets the batch position; invoked from {@code start()}, {@code stop()} and {@code fillBatch()}. */
    protected abstract void resetBatch();

    /** Returns the message count used to (re)initialise the cached {@code size}. */
    protected abstract int getStoreSize();

    /** Reports whether the store is empty. Not called anywhere in this class. */
    protected abstract boolean isStoreEmpty();

    /**
     * Returns the subscription passed along with duplicates in {@code dealWithDuplicates()}.
     *
     * @return always {@code null} in this base implementation
     */
    public Subscription getSubscription() {
        return null;
    }
}
