package org.apache.activemq.artemis.core.paging.cursor.impl;

import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageCache;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.PageIterator;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.FutureLatch;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:WEB-INF/lib/artemis-server-1.1.0.wildfly-008.jar:org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.class */
public final class PageSubscriptionImpl implements PageSubscription {
    private final StorageManager store;
    private final long cursorId;
    private Queue queue;
    private final boolean persistent;
    private final Filter filter;
    private final PagingStore pageStore;
    private final PageCursorProvider cursorProvider;
    private volatile PagePosition lastAckedPosition;
    private List<PagePosition> recoveredACK;
    private final PageSubscriptionCounter counter;
    private final Executor executor;
    private final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
    private boolean empty = true;
    private final AtomicInteger scheduledCleanupCount = new AtomicInteger(0);
    private volatile boolean autoCleanup = true;
    private final SortedMap<Long, PageCursorInfo> consumedPages = new TreeMap();
    private final AtomicLong deliveredCount = new AtomicLong(0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/artemis-server-1.1.0.wildfly-008.jar:org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl$CursorIterator.class */
    public class CursorIterator implements PageIterator {
        private PagePosition position = null;
        private PagePosition lastOperation = null;
        private volatile boolean isredelivery = false;
        private PagedReference currentDelivery = null;
        private volatile PagedReference lastRedelivery = null;
        private final java.util.Queue<PagePosition> redeliveries = new LinkedList();
        private volatile PagedReference cachedNext;

        public CursorIterator() {
        }

        @Override // org.apache.activemq.artemis.core.paging.cursor.PageIterator
        public void redeliver(PagePosition pagePosition) {
            synchronized (this.redeliveries) {
                this.redeliveries.add(pagePosition);
            }
        }

        @Override // org.apache.activemq.artemis.utils.LinkedListIterator
        public void repeat() {
            if (this.isredelivery) {
                synchronized (this.redeliveries) {
                    this.cachedNext = this.lastRedelivery;
                }
            } else if (this.lastOperation == null) {
                this.position = null;
            } else {
                this.position = this.lastOperation;
            }
        }

        @Override // java.util.Iterator
        public synchronized PagedReference next() {
            if (this.cachedNext != null) {
                this.currentDelivery = this.cachedNext;
                this.cachedNext = null;
                return this.currentDelivery;
            }
            try {
                if (this.position == null) {
                    this.position = PageSubscriptionImpl.this.getStartPosition();
                }
                this.currentDelivery = moveNext();
                return this.currentDelivery;
            } catch (RuntimeException e) {
                e.printStackTrace();
                throw e;
            }
        }

        private PagedReference moveNext() {
            PagedReference internalGetNext;
            synchronized (PageSubscriptionImpl.this) {
                boolean z = false;
                PagePosition pagePosition = this.position;
                PagePosition pagePosition2 = this.position;
                do {
                    synchronized (this.redeliveries) {
                        PagePosition poll = this.redeliveries.poll();
                        if (poll == null) {
                            this.lastRedelivery = null;
                            this.isredelivery = false;
                            internalGetNext = PageSubscriptionImpl.this.internalGetNext(pagePosition2);
                            if (internalGetNext != null) {
                                pagePosition2 = internalGetNext.getPosition();
                                boolean z2 = false;
                                boolean routed = PageSubscriptionImpl.this.routed(internalGetNext.getPagedMessage());
                                if (!routed) {
                                    z2 = true;
                                }
                                PageCursorInfo pageInfo = PageSubscriptionImpl.this.getPageInfo(internalGetNext.getPosition().getPageNr(), false);
                                if (pageInfo == null || (!pageInfo.isRemoved(internalGetNext.getPosition()) && pageInfo.getCompleteInfo() == null)) {
                                    if (routed && internalGetNext.getPagedMessage().getTransactionID() >= 0) {
                                        PageTransactionInfo transaction = PageSubscriptionImpl.this.pageStore.getPagingManager().getTransaction(internalGetNext.getPagedMessage().getTransactionID());
                                        if (transaction == null) {
                                            ActiveMQServerLogger.LOGGER.pageSubscriptionCouldntLoad(internalGetNext.getPagedMessage().getTransactionID(), internalGetNext.getPosition(), PageSubscriptionImpl.this.pageStore.getAddress(), PageSubscriptionImpl.this.queue.getName());
                                            routed = false;
                                            z2 = true;
                                        } else if (transaction.deliverAfterCommit(this, PageSubscriptionImpl.this, internalGetNext.getPosition())) {
                                            routed = false;
                                            z2 = false;
                                        }
                                    }
                                    if (routed && pageInfo != null && pageInfo.isRemoved(internalGetNext.getPosition())) {
                                        routed = false;
                                    }
                                    if (!z2) {
                                        this.position = internalGetNext.getPosition();
                                    }
                                    if (routed) {
                                        z = PageSubscriptionImpl.this.match(internalGetNext.getMessage());
                                        if (!z) {
                                            PageSubscriptionImpl.this.processACK(internalGetNext.getPosition());
                                        }
                                    } else if (z2) {
                                        PageSubscriptionImpl.this.positionIgnored(internalGetNext.getPosition());
                                    }
                                }
                                if (internalGetNext == null) {
                                    break;
                                }
                            } else {
                                break;
                            }
                        } else {
                            this.isredelivery = true;
                            PagedReference reference = PageSubscriptionImpl.this.getReference(poll);
                            this.lastRedelivery = reference;
                            return reference;
                        }
                    }
                } while (!z);
                if (internalGetNext != null) {
                    this.lastOperation = pagePosition;
                }
                return internalGetNext;
            }
        }

        @Override // java.util.Iterator
        public synchronized boolean hasNext() {
            if (this.cachedNext != null) {
                return true;
            }
            if (!PageSubscriptionImpl.this.pageStore.isPaging()) {
                return false;
            }
            this.cachedNext = next();
            return this.cachedNext != null;
        }

        @Override // java.util.Iterator
        public void remove() {
            PageCursorInfo pageInfo;
            PageSubscriptionImpl.this.deliveredCount.incrementAndGet();
            PagedReference pagedReference = this.currentDelivery;
            if (pagedReference == null || (pageInfo = PageSubscriptionImpl.this.getPageInfo(pagedReference.getPosition())) == null) {
                return;
            }
            pageInfo.remove(pagedReference.getPosition());
        }

        @Override // org.apache.activemq.artemis.utils.LinkedListIterator, java.lang.AutoCloseable
        public void close() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/artemis-server-1.1.0.wildfly-008.jar:org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl$PageCursorInfo.class */
    public final class PageCursorInfo {
        private final int numberOfMessages;
        private final long pageId;
        private WeakReference<PageCache> cache;
        private final boolean wasLive;
        private boolean pendingDelete;
        private PagePosition completePage;
        private final Set<PagePosition> acks = Collections.synchronizedSet(new LinkedHashSet());
        private final Set<PagePosition> removedReferences = new ConcurrentHashSet();
        private final AtomicInteger pendingTX = new AtomicInteger(0);
        private final AtomicInteger confirmed = new AtomicInteger(0);

        public boolean isAck(PagePosition pagePosition) {
            return this.completePage != null || this.acks.contains(pagePosition);
        }

        public String toString() {
            return "PageCursorInfo::PageID=" + this.pageId + " numberOfMessage = " + this.numberOfMessages + ", confirmed = " + this.confirmed + ", isDone=" + isDone();
        }

        public PageCursorInfo(long j, int i, PageCache pageCache) {
            this.pageId = j;
            this.numberOfMessages = i;
            if (pageCache == null) {
                this.wasLive = false;
            } else {
                this.wasLive = pageCache.isLive();
                this.cache = new WeakReference<>(pageCache);
            }
        }

        public void setCompleteInfo(PagePosition pagePosition) {
            this.completePage = pagePosition;
        }

        public PagePosition getCompleteInfo() {
            return this.completePage;
        }

        public boolean isDone() {
            return this.completePage != null || (getNumberOfMessages() == this.confirmed.get() && this.pendingTX.get() == 0);
        }

        public boolean isPendingDelete() {
            return this.pendingDelete || this.completePage != null;
        }

        public void setPendingDelete() {
            this.pendingDelete = true;
        }

        public long getPageId() {
            return this.pageId;
        }

        public void incrementPendingTX() {
            this.pendingTX.incrementAndGet();
        }

        public void decrementPendingTX() {
            this.pendingTX.decrementAndGet();
            checkDone();
        }

        public boolean isRemoved(PagePosition pagePosition) {
            return this.removedReferences.contains(pagePosition);
        }

        public void remove(PagePosition pagePosition) {
            this.removedReferences.add(pagePosition);
        }

        public void addACK(PagePosition pagePosition) {
            if (PageSubscriptionImpl.this.isTrace) {
                ActiveMQServerLogger.LOGGER.trace("numberOfMessages =  " + getNumberOfMessages() + " confirmed =  " + (this.confirmed.get() + 1) + " pendingTX = " + this.pendingTX + ", page = " + this.pageId + " posACK = " + pagePosition);
            }
            if (!internalAddACK(pagePosition) || pagePosition.getMessageNr() < 0) {
                return;
            }
            this.confirmed.incrementAndGet();
            checkDone();
        }

        public void loadACK(PagePosition pagePosition) {
            if (!internalAddACK(pagePosition) || pagePosition.getMessageNr() < 0) {
                return;
            }
            this.confirmed.incrementAndGet();
        }

        private boolean internalAddACK(PagePosition pagePosition) {
            this.removedReferences.add(pagePosition);
            return this.acks.add(pagePosition);
        }

        protected void checkDone() {
            if (isDone()) {
                PageSubscriptionImpl.this.onPageDone(this);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int getNumberOfMessages() {
            if (!this.wasLive) {
                return this.numberOfMessages;
            }
            PageCache pageCache = this.cache.get();
            if (pageCache == null) {
                pageCache = PageSubscriptionImpl.this.cursorProvider.getPageCache(this.pageId);
                this.cache = new WeakReference<>(pageCache);
            }
            return pageCache.getNumberOfMessages();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/artemis-server-1.1.0.wildfly-008.jar:org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl$PageCursorTX.class */
    public static final class PageCursorTX extends TransactionOperationAbstract {
        private final Map<PageSubscriptionImpl, List<PagePosition>> pendingPositions;

        private PageCursorTX() {
            this.pendingPositions = new HashMap();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void addPositionConfirmation(PageSubscriptionImpl pageSubscriptionImpl, PagePosition pagePosition) {
            List<PagePosition> list = this.pendingPositions.get(pageSubscriptionImpl);
            if (list == null) {
                list = new LinkedList();
                this.pendingPositions.put(pageSubscriptionImpl, list);
            }
            list.add(pagePosition);
        }

        @Override // org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract, org.apache.activemq.artemis.core.transaction.TransactionOperation
        public void afterCommit(Transaction transaction) {
            for (Map.Entry<PageSubscriptionImpl, List<PagePosition>> entry : this.pendingPositions.entrySet()) {
                PageSubscriptionImpl key = entry.getKey();
                Iterator<PagePosition> it = entry.getValue().iterator();
                while (it.hasNext()) {
                    key.processACK(it.next());
                    key.deliveredCount.decrementAndGet();
                }
            }
        }

        @Override // org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract, org.apache.activemq.artemis.core.transaction.TransactionOperation
        public List<MessageReference> getRelatedMessageReferences() {
            return Collections.emptyList();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PageSubscriptionImpl(PageCursorProvider pageCursorProvider, PagingStore pagingStore, StorageManager storageManager, Executor executor, Filter filter, long j, boolean z) {
        this.pageStore = pagingStore;
        this.store = storageManager;
        this.cursorProvider = pageCursorProvider;
        this.cursorId = j;
        this.executor = executor;
        this.filter = filter;
        this.persistent = z;
        this.counter = new PageSubscriptionCounterImpl(storageManager, this, executor, z, j);
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public PagingStore getPagingStore() {
        return this.pageStore;
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public Queue getQueue() {
        return this.queue;
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public boolean isPaging() {
        return this.pageStore.isPaging();
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void setQueue(Queue queue) {
        this.queue = queue;
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void disableAutoCleanup() {
        this.autoCleanup = false;
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void enableAutoCleanup() {
        this.autoCleanup = true;
    }

    public PageCursorProvider getProvider() {
        return this.cursorProvider;
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void notEmpty() {
        synchronized (this.consumedPages) {
            this.empty = false;
        }
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void bookmark(PagePosition pagePosition) throws Exception {
        PageCursorInfo pageInfo = getPageInfo(pagePosition);
        if (pagePosition.getMessageNr() > 0) {
            pageInfo.confirmed.addAndGet(pagePosition.getMessageNr());
        }
        confirmPosition(pagePosition);
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public long getMessageCount() {
        if (this.empty) {
            return 0L;
        }
        return this.counter.getValue() - this.deliveredCount.get();
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public PageSubscriptionCounter getCounter() {
        return this.counter;
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void reloadPageCompletion(PagePosition pagePosition) {
        PageCursorInfo pageCursorInfo = new PageCursorInfo(pagePosition.getPageNr(), pagePosition.getMessageNr(), null);
        pageCursorInfo.setCompleteInfo(pagePosition);
        synchronized (this.consumedPages) {
            this.consumedPages.put(Long.valueOf(pagePosition.getPageNr()), pageCursorInfo);
        }
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void scheduleCleanupCheck() {
        if (!this.autoCleanup || this.scheduledCleanupCount.get() > 2) {
            return;
        }
        this.scheduledCleanupCount.incrementAndGet();
        this.executor.execute(new Runnable() { // from class: org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionImpl.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    PageSubscriptionImpl.this.cleanupEntries(false);
                } catch (Exception e) {
                    ActiveMQServerLogger.LOGGER.problemCleaningCursorPages(e);
                } finally {
                    PageSubscriptionImpl.this.scheduledCleanupCount.decrementAndGet();
                }
            }
        });
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void onPageModeCleared(Transaction transaction) throws Exception {
        if (this.counter != null) {
            this.counter.delete(transaction);
        }
        this.empty = true;
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void cleanupEntries(final boolean z) throws Exception {
        if (z) {
            this.counter.delete();
        }
        TransactionImpl transactionImpl = new TransactionImpl(this.store);
        boolean z2 = false;
        ArrayList arrayList = new ArrayList();
        synchronized (this.consumedPages) {
            if (this.lastAckedPosition == null) {
                return;
            }
            for (Map.Entry<Long, PageCursorInfo> entry : this.consumedPages.entrySet()) {
                PageCursorInfo value = entry.getValue();
                if (value.isDone() && !value.isPendingDelete()) {
                    Page currentPage = this.pageStore.getCurrentPage();
                    if (currentPage != null && entry.getKey().longValue() == this.pageStore.getCurrentPage().getPageId() && currentPage.isLive()) {
                        ActiveMQServerLogger.LOGGER.trace("We can't clear page " + entry.getKey() + " now since it's the current page");
                    } else {
                        value.setPendingDelete();
                        arrayList.add(entry.getValue());
                    }
                }
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                PageCursorInfo pageCursorInfo = (PageCursorInfo) it.next();
                if (isPersistent()) {
                    PagePositionImpl pagePositionImpl = new PagePositionImpl(pageCursorInfo.getPageId(), pageCursorInfo.getNumberOfMessages());
                    pageCursorInfo.setCompleteInfo(pagePositionImpl);
                    this.store.storePageCompleteTransactional(transactionImpl.getID(), getId(), pagePositionImpl);
                    if (!z2) {
                        z2 = true;
                        transactionImpl.setContainsPersistent();
                    }
                }
                for (PagePosition pagePosition : pageCursorInfo.acks) {
                    if (pagePosition.getRecordID() >= 0) {
                        this.store.deleteCursorAcknowledgeTransactional(transactionImpl.getID(), pagePosition.getRecordID());
                        if (!z2) {
                            transactionImpl.setContainsPersistent();
                            z2 = true;
                        }
                    }
                }
                pageCursorInfo.acks.clear();
                pageCursorInfo.removedReferences.clear();
            }
            transactionImpl.addOperation(new TransactionOperationAbstract() { // from class: org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionImpl.2
                @Override // org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract, org.apache.activemq.artemis.core.transaction.TransactionOperation
                public void afterCommit(Transaction transaction) {
                    PageSubscriptionImpl.this.executor.execute(new Runnable() { // from class: org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionImpl.2.1
                        @Override // java.lang.Runnable
                        public void run() {
                            if (z) {
                                return;
                            }
                            PageSubscriptionImpl.this.cursorProvider.scheduleCleanup();
                        }
                    });
                }
            });
            transactionImpl.commit();
        }
    }

    public String toString() {
        return "PageSubscriptionImpl [cursorId=" + this.cursorId + ", queue=" + this.queue + ", filter = " + this.filter + "]";
    }

    /* JADX INFO: Access modifiers changed from: private */
    public PagedReference getReference(PagePosition pagePosition) {
        return this.cursorProvider.newReference(pagePosition, this.cursorProvider.getMessage(pagePosition), this);
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public PageIterator iterator() {
        return new CursorIterator();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public PagedReference internalGetNext(PagePosition pagePosition) {
        PagedMessage message;
        PagePosition nextMessage = pagePosition.nextMessage();
        PageCache pageCache = this.cursorProvider.getPageCache(pagePosition.getPageNr());
        if (pageCache != null && !pageCache.isLive() && nextMessage.getMessageNr() >= pageCache.getNumberOfMessages()) {
            pageCache = null;
        }
        while (true) {
            if ((pageCache != null || nextMessage.getPageNr() > this.pageStore.getCurrentWritingPage()) && (pageCache == null || nextMessage.getPageNr() > this.pageStore.getCurrentWritingPage() || pageCache.getNumberOfMessages() != 0)) {
                break;
            }
            nextMessage = moveNextPage(nextMessage);
            pageCache = this.cursorProvider.getPageCache(nextMessage.getPageNr());
        }
        if (pageCache == null || (message = pageCache.getMessage(nextMessage.getMessageNr())) == null) {
            return null;
        }
        return this.cursorProvider.newReference(nextMessage, message, this);
    }

    private PagePosition moveNextPage(PagePosition pagePosition) {
        PagePosition pagePosition2 = pagePosition;
        while (true) {
            pagePosition2 = pagePosition2.nextPage();
            synchronized (this.consumedPages) {
                PageCursorInfo pageCursorInfo = this.consumedPages.get(Long.valueOf(pagePosition2.getPageNr()));
                if (pageCursorInfo == null || (!pageCursorInfo.isPendingDelete() && pageCursorInfo.getCompleteInfo() == null)) {
                    break;
                }
            }
        }
        return pagePosition2;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean routed(PagedMessage pagedMessage) {
        long id = getId();
        for (long j : pagedMessage.getQueueIDs()) {
            if (j == id) {
                return true;
            }
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized PagePosition getStartPosition() {
        return new PagePositionImpl(this.pageStore.getFirstPage(), -1);
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void confirmPosition(Transaction transaction, PagePosition pagePosition) throws Exception {
        if (this.persistent) {
            this.store.storeCursorAcknowledgeTransactional(transaction.getID(), this.cursorId, pagePosition);
        }
        installTXCallback(transaction, pagePosition);
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void ackTx(Transaction transaction, PagedReference pagedReference) throws Exception {
        confirmPosition(transaction, pagedReference.getPosition());
        this.counter.increment(transaction, -1);
        PageTransactionInfo pageTransaction = getPageTransaction(pagedReference);
        if (pageTransaction != null) {
            pageTransaction.storeUpdate(this.store, this.pageStore.getPagingManager(), transaction);
        }
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void ack(PagedReference pagedReference) throws Exception {
        TransactionImpl transactionImpl = new TransactionImpl(this.store);
        ackTx(transactionImpl, pagedReference);
        transactionImpl.commit();
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public boolean contains(PagedReference pagedReference) throws Exception {
        boolean z = false;
        long[] queueIDs = pagedReference.getPagedMessage().getQueueIDs();
        int length = queueIDs.length;
        int i = 0;
        while (true) {
            if (i >= length) {
                break;
            }
            if (queueIDs[i] == this.cursorId) {
                z = true;
                break;
            }
            i++;
        }
        return z && !getPageInfo(pagedReference.getPosition()).isAck(pagedReference.getPosition());
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void confirmPosition(final PagePosition pagePosition) throws Exception {
        if (this.persistent) {
            this.store.storeCursorAcknowledge(this.cursorId, pagePosition);
        }
        this.store.afterCompleteOperations(new IOCallback() { // from class: org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionImpl.3
            volatile String error = "";

            @Override // org.apache.activemq.artemis.core.io.IOCallback
            public void onError(int i, String str) {
                this.error = " errorCode=" + i + ", msg=" + str;
                ActiveMQServerLogger.LOGGER.pageSubscriptionError(this, this.error);
            }

            @Override // org.apache.activemq.artemis.core.io.IOCallback
            public void done() {
                PageSubscriptionImpl.this.processACK(pagePosition);
            }

            public String toString() {
                return IOCallback.class.getSimpleName() + "(" + PageSubscriptionImpl.class.getSimpleName() + ") " + this.error;
            }
        });
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public long getFirstPage() {
        synchronized (this.consumedPages) {
            if (this.empty && this.consumedPages.isEmpty()) {
                return -1L;
            }
            long j = 0;
            for (Map.Entry<Long, PageCursorInfo> entry : this.consumedPages.entrySet()) {
                j = entry.getKey().longValue();
                if (!entry.getValue().isDone() && !entry.getValue().isPendingDelete()) {
                    return entry.getKey().longValue();
                }
            }
            return j;
        }
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void addPendingDelivery(PagePosition pagePosition) {
        getPageInfo(pagePosition).incrementPendingTX();
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void redeliver(PageIterator pageIterator, PagePosition pagePosition) {
        pageIterator.redeliver(pagePosition);
        synchronized (this.consumedPages) {
            PageCursorInfo pageCursorInfo = this.consumedPages.get(Long.valueOf(pagePosition.getPageNr()));
            if (pageCursorInfo != null) {
                pageCursorInfo.decrementPendingTX();
            }
        }
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public PagedMessage queryMessage(PagePosition pagePosition) {
        try {
            return this.cursorProvider.getMessage(pagePosition);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void reloadACK(PagePosition pagePosition) {
        if (this.recoveredACK == null) {
            this.recoveredACK = new LinkedList();
        }
        this.recoveredACK.add(pagePosition);
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void reloadPreparedACK(Transaction transaction, PagePosition pagePosition) {
        this.deliveredCount.incrementAndGet();
        installTXCallback(transaction, pagePosition);
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void positionIgnored(PagePosition pagePosition) {
        processACK(pagePosition);
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void lateDeliveryRollback(PagePosition pagePosition) {
        processACK(pagePosition).decrementPendingTX();
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public boolean isComplete(long j) {
        synchronized (this.consumedPages) {
            if (this.empty && this.consumedPages.isEmpty()) {
                return true;
            }
            PageCursorInfo pageCursorInfo = this.consumedPages.get(Long.valueOf(j));
            if (pageCursorInfo == null && this.empty) {
                return true;
            }
            return pageCursorInfo != null && pageCursorInfo.isDone();
        }
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void destroy() throws Exception {
        long generateID = this.store.generateID();
        try {
            boolean z = false;
            synchronized (this.consumedPages) {
                for (PageCursorInfo pageCursorInfo : this.consumedPages.values()) {
                    for (PagePosition pagePosition : pageCursorInfo.acks) {
                        if (pagePosition.getRecordID() >= 0) {
                            z = true;
                            this.store.deleteCursorAcknowledgeTransactional(generateID, pagePosition.getRecordID());
                        }
                    }
                    PagePosition completeInfo = pageCursorInfo.getCompleteInfo();
                    if (completeInfo != null && completeInfo.getRecordID() >= 0) {
                        this.store.deletePageComplete(completeInfo.getRecordID());
                        pageCursorInfo.setCompleteInfo(null);
                    }
                }
            }
            if (z) {
                this.store.commit(generateID);
            }
            this.cursorProvider.close(this);
        } catch (Exception e) {
            try {
                this.store.rollback(generateID);
            } catch (Exception e2) {
            }
        }
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public long getId() {
        return this.cursorId;
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public boolean isPersistent() {
        return this.persistent;
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void processReload() throws Exception {
        if (this.recoveredACK != null) {
            if (this.isTrace) {
                ActiveMQServerLogger.LOGGER.trace("********** processing reload!!!!!!!");
            }
            Collections.sort(this.recoveredACK);
            long j = -1;
            for (PagePosition pagePosition : this.recoveredACK) {
                this.lastAckedPosition = pagePosition;
                PageCursorInfo pageInfo = getPageInfo(pagePosition);
                if (pageInfo == null) {
                    ActiveMQServerLogger.LOGGER.pageNotFound(pagePosition);
                    if (j == -1) {
                        j = this.store.generateID();
                    }
                    this.store.deleteCursorAcknowledgeTransactional(j, pagePosition.getRecordID());
                } else {
                    pageInfo.loadACK(pagePosition);
                }
            }
            if (j >= 0) {
                this.store.commit(j);
            }
            this.recoveredACK.clear();
            this.recoveredACK = null;
        }
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void flushExecutors() {
        FutureLatch futureLatch = new FutureLatch();
        this.executor.execute(futureLatch);
        while (!futureLatch.await(1000L)) {
            ActiveMQServerLogger.LOGGER.timedOutFlushingExecutorsPagingCursor(this);
        }
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void stop() {
        flushExecutors();
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void printDebug() {
        printDebug(toString());
    }

    public void printDebug(String str) {
        System.out.println("Debug information on PageCurorImpl- " + str);
        Iterator<PageCursorInfo> it = this.consumedPages.values().iterator();
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void onDeletePage(Page page) throws Exception {
        PageCursorInfo remove;
        synchronized (this.consumedPages) {
            remove = this.consumedPages.remove(Long.valueOf(page.getPageId()));
        }
        if (remove != null) {
            PagePosition completeInfo = remove.getCompleteInfo();
            if (completeInfo != null) {
                try {
                    this.store.deletePageComplete(completeInfo.getRecordID());
                } catch (Exception e) {
                    ActiveMQServerLogger.LOGGER.warn("Error while deleting page-complete-record", e);
                }
                remove.setCompleteInfo(null);
            }
            for (PagePosition pagePosition : remove.acks) {
                if (pagePosition.getRecordID() >= 0) {
                    try {
                        this.store.deleteCursorAcknowledge(pagePosition.getRecordID());
                    } catch (Exception e2) {
                        ActiveMQServerLogger.LOGGER.warn("Error while deleting page-complete-record", e2);
                    }
                }
            }
            remove.acks.clear();
        }
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public Executor getExecutor() {
        return this.executor;
    }

    @Override // org.apache.activemq.artemis.core.paging.cursor.PageSubscription
    public void reloadPageInfo(long j) {
        getPageInfo(j, true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public PageCursorInfo getPageInfo(PagePosition pagePosition) {
        return getPageInfo(pagePosition.getPageNr(), true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public PageCursorInfo getPageInfo(long j, boolean z) {
        synchronized (this.consumedPages) {
            PageCursorInfo pageCursorInfo = this.consumedPages.get(Long.valueOf(j));
            if (z && pageCursorInfo == null) {
                PageCache pageCache = this.cursorProvider.getPageCache(j);
                if (pageCache == null) {
                    return null;
                }
                pageCursorInfo = new PageCursorInfo(j, pageCache.getNumberOfMessages(), pageCache);
                this.consumedPages.put(Long.valueOf(j), pageCursorInfo);
            }
            return pageCursorInfo;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean match(ServerMessage serverMessage) {
        if (this.filter == null) {
            return true;
        }
        return this.filter.match(serverMessage);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public PageCursorInfo processACK(PagePosition pagePosition) {
        if (this.lastAckedPosition == null || pagePosition.compareTo(this.lastAckedPosition) > 0) {
            if (this.isTrace) {
                ActiveMQServerLogger.LOGGER.trace("a new position is being processed as ACK");
            }
            if (this.lastAckedPosition != null && this.lastAckedPosition.getPageNr() != pagePosition.getPageNr()) {
                if (this.isTrace) {
                    ActiveMQServerLogger.LOGGER.trace("Scheduling cleanup on pageSubscription for address = " + ((Object) this.pageStore.getAddress()) + " queue = " + ((Object) getQueue().getName()));
                }
                if (this.autoCleanup) {
                    scheduleCleanupCheck();
                }
            }
            this.lastAckedPosition = pagePosition;
        }
        PageCursorInfo pageInfo = getPageInfo(pagePosition);
        if (pageInfo == null) {
            ActiveMQServerLogger.LOGGER.nullPageCursorInfo(getPagingStore().getAddress().toString(), pagePosition.toString(), this.cursorId);
        } else {
            pageInfo.addACK(pagePosition);
        }
        return pageInfo;
    }

    private void installTXCallback(Transaction transaction, PagePosition pagePosition) {
        if (pagePosition.getRecordID() >= 0) {
            transaction.setContainsPersistent();
        }
        getPageInfo(pagePosition).remove(pagePosition);
        PageCursorTX pageCursorTX = (PageCursorTX) transaction.getProperty(8);
        if (pageCursorTX == null) {
            pageCursorTX = new PageCursorTX();
            transaction.putProperty(8, pageCursorTX);
            transaction.addOperation(pageCursorTX);
        }
        pageCursorTX.addPositionConfirmation(this, pagePosition);
    }

    private PageTransactionInfo getPageTransaction(PagedReference pagedReference) {
        if (pagedReference.getPagedMessage().getTransactionID() >= 0) {
            return this.pageStore.getPagingManager().getTransaction(pagedReference.getPagedMessage().getTransactionID());
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onPageDone(PageCursorInfo pageCursorInfo) {
        if (this.autoCleanup) {
            scheduleCleanupCheck();
        }
    }
}
