/*
 * Decompiled with CFR 0.152.
 */
package org.hornetq.core.paging.cursor.impl;

import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
import org.hornetq.core.paging.cursor.PagedReference;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.cursor.impl.PageSubscriptionCounterImpl;
import org.hornetq.core.paging.impl.Page;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.HornetQServerLogger;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.FutureLatch;
import org.hornetq.utils.LinkedListIterator;

final class PageSubscriptionImpl
implements PageSubscription {
    // Trace flag read from the server logger once, when this subscription is constructed.
    private final boolean isTrace = HornetQServerLogger.LOGGER.isTraceEnabled();
    // Starts as true; cleared by notEmpty() and set again by onPageModeCleared().
    private boolean empty = true;
    // Incremented before a cleanup task is handed to the executor, decremented when that task ends.
    private final AtomicInteger scheduledCleanupCount = new AtomicInteger(0);
    // Toggled by disableAutoCleanup()/enableAutoCleanup(); gates scheduleCleanupCheck().
    private volatile boolean autoCleanup = true;
    private final StorageManager store;
    // Identifier returned by getId() and used when storing cursor acknowledgements.
    private final long cursorId;
    private org.hornetq.core.server.Queue queue;
    // When true, acknowledgements and page-complete records are written through the store.
    private final boolean persistent;
    // Optional message filter; match() treats null as "accept everything".
    private final Filter filter;
    private final PagingStore pageStore;
    private final PageCursorProvider cursorProvider;
    // Updated by processACK() whenever a position compares greater than the current value.
    private volatile PagePosition lastAckedPosition;
    // Positions collected by reloadACK() until processReload() consumes and clears them.
    private List<PagePosition> recoveredACK;
    // Per-page cursor state keyed by page number. Most accesses synchronize on this map itself.
    private final SortedMap<Long, PageCursorInfo> consumedPages = new TreeMap<Long, PageCursorInfo>();
    private final PageSubscriptionCounter counter;
    private final Executor executor;
    // Subtracted from the counter value in getMessageCount(); incremented by the iterator's
    // remove() and reloadPreparedACK(), decremented by PageCursorTX after commit.
    private final AtomicLong deliveredCount = new AtomicLong(0L);
    // Positions queued by redeliver() and polled by the iterator; guarded by synchronizing on the queue.
    private final Queue<PagePosition> redeliveries = new LinkedList<PagePosition>();

    /**
     * Wires the subscription to its collaborators and creates its message counter.
     *
     * @param cursorProvider provider used to look up page caches and messages
     * @param pageStore      paging store this subscription reads from
     * @param store          storage manager used for acknowledgement records
     * @param executor       executor used for cleanup tasks
     * @param filter         optional filter, may be {@code null}
     * @param cursorId       identifier of this cursor
     * @param persistent     whether acknowledgements are written to the store
     */
    PageSubscriptionImpl(PageCursorProvider cursorProvider, PagingStore pageStore, StorageManager store, Executor executor, Filter filter, long cursorId, boolean persistent) {
        this.cursorProvider = cursorProvider;
        this.pageStore = pageStore;
        this.store = store;
        this.executor = executor;
        this.filter = filter;
        this.cursorId = cursorId;
        this.persistent = persistent;
        // Created last: the counter receives a reference to this subscription.
        this.counter = new PageSubscriptionCounterImpl(store, this, executor, persistent, cursorId);
    }

    /** @return the paging store this subscription was created with */
    @Override
    public PagingStore getPagingStore() {
        return pageStore;
    }

    /** @return the queue last assigned through {@code setQueue}, or {@code null} if none was set */
    @Override
    public org.hornetq.core.server.Queue getQueue() {
        return queue;
    }

    /** @return whatever the underlying paging store reports for {@code isPaging()} */
    @Override
    public boolean isPaging() {
        return pageStore.isPaging();
    }

    /**
     * Associates a queue with this subscription.
     *
     * @param queue the queue to remember
     */
    @Override
    public void setQueue(org.hornetq.core.server.Queue queue) {
        // The parameter shadows the field, so the qualifier is required here.
        this.queue = queue;
    }

    /** Turns automatic cleanup scheduling off. */
    @Override
    public void disableAutoCleanup() {
        autoCleanup = false;
    }

    /** Turns automatic cleanup scheduling back on. */
    @Override
    public void enableAutoCleanup() {
        autoCleanup = true;
    }

    /** @return the cursor provider this subscription was created with */
    public PageCursorProvider getProvider() {
        return cursorProvider;
    }

    /**
     * Clears the {@code empty} flag while holding the {@code consumedPages} monitor.
     */
    @Override
    public void notEmpty() {
        synchronized (consumedPages) {
            empty = false;
        }
    }

    /**
     * Marks every message before {@code position} on its page as confirmed and then confirms
     * the position itself.
     *
     * @param position the position to bookmark
     * @throws Exception if confirming the position fails
     */
    @Override
    public void bookmark(PagePosition position) throws Exception {
        final PageCursorInfo cursorInfo = getPageInfo(position);
        final int messageNr = position.getMessageNr();
        // getPageInfo returns null when the provider has no cache for the page; skip the
        // counter adjustment in that case instead of failing with a NullPointerException.
        if (cursorInfo != null && messageNr > 0) {
            cursorInfo.confirmed.addAndGet(messageNr);
        }
        confirmPosition(position);
    }

    /**
     * @return {@code 0} while the subscription is flagged empty, otherwise the counter value
     *         minus the number of messages currently counted as delivered
     */
    @Override
    public long getMessageCount() {
        return empty ? 0L : counter.getValue() - deliveredCount.get();
    }

    /** @return the counter created for this subscription */
    @Override
    public PageSubscriptionCounter getCounter() {
        return counter;
    }

    /**
     * Registers a page as completed, using {@code position} as its page-complete record.
     * Any existing entry for the same page number is replaced.
     *
     * @param position page-complete position; its page number is the map key
     */
    @Override
    public void reloadPageCompletion(PagePosition position) {
        final long pageNr = position.getPageNr();
        final PageCursorInfo completed = new PageCursorInfo(pageNr, position.getMessageNr(), null);
        completed.setCompleteInfo(position);
        synchronized (consumedPages) {
            consumedPages.put(pageNr, completed);
        }
    }

    /**
     * Submits an asynchronous {@code cleanupEntries(false)} run to the executor.
     * Does nothing when auto cleanup is disabled or when more than two cleanup tasks
     * are already outstanding.
     */
    @Override
    public void scheduleCleanupCheck() {
        if (!autoCleanup) {
            return;
        }
        if (scheduledCleanupCount.get() > 2) {
            return;
        }
        scheduledCleanupCount.incrementAndGet();
        try {
            executor.execute(new Runnable() {

                @Override
                public void run() {
                    try {
                        cleanupEntries(false);
                    }
                    catch (Exception e) {
                        HornetQServerLogger.LOGGER.problemCleaningCursorPages(e);
                    }
                    finally {
                        scheduledCleanupCount.decrementAndGet();
                    }
                }
            });
        }
        catch (java.util.concurrent.RejectedExecutionException e) {
            // The task was never accepted, so its finally-block will not run. Undo the
            // increment here, otherwise repeated rejections would block all future cleanups.
            scheduledCleanupCount.decrementAndGet();
            throw e;
        }
    }

    /**
     * Deletes the counter within the given transaction and flags the subscription as empty.
     *
     * @param tx transaction passed to the counter delete
     * @throws Exception if the counter delete fails
     */
    @Override
    public void onPageModeCleared(Transaction tx) throws Exception {
        if (counter != null) {
            counter.delete(tx);
        }
        empty = true;
    }

    /**
     * Finds pages that are done and not yet pending delete, marks them pending delete,
     * and in one transaction stores their page-complete records (persistent subscriptions
     * only) and deletes their stored acknowledgements.
     *
     * <p>Nothing is cleaned when no position has been acknowledged yet. A page that is the
     * paging store's current live page is skipped.
     *
     * @param completeDelete when {@code true} the counter is deleted first and the provider
     *                       cleanup is not scheduled after commit
     * @throws Exception if a store operation or the commit fails
     */
    @Override
    public void cleanupEntries(final boolean completeDelete) throws Exception {
        if (completeDelete) {
            counter.delete();
        }
        final List<PageCursorInfo> completedPages = new ArrayList<PageCursorInfo>();
        synchronized (consumedPages) {
            if (lastAckedPosition == null) {
                return;
            }
            for (Map.Entry<Long, PageCursorInfo> entry : consumedPages.entrySet()) {
                final PageCursorInfo info = entry.getValue();
                if (!info.isDone() || info.isPendingDelete()) {
                    continue;
                }
                // Read the current page once: a second getCurrentPage() call could return a
                // different (or null) page than the one that was just null-checked.
                final Page currentPage = pageStore.getCurrentPage();
                if (currentPage != null && entry.getKey() == (long)currentPage.getPageId() && currentPage.isLive()) {
                    HornetQServerLogger.LOGGER.trace("We can't clear page " + entry.getKey() + " now since it's the current page");
                    continue;
                }
                info.setPendingDelete();
                completedPages.add(info);
            }
        }
        // Created only after the early return above, so no transaction is allocated
        // when there is nothing to clean.
        final TransactionImpl tx = new TransactionImpl(store);
        boolean persist = false;
        for (PageCursorInfo infoPG : completedPages) {
            if (isPersistent()) {
                PagePositionImpl completePage = new PagePositionImpl(infoPG.getPageId(), infoPG.getNumberOfMessages());
                infoPG.setCompleteInfo(completePage);
                store.storePageCompleteTransactional(tx.getID(), getId(), completePage);
                if (!persist) {
                    persist = true;
                    tx.setContainsPersistent();
                }
            }
            for (PagePosition pos : infoPG.acks) {
                // Acks with a negative record id have nothing to delete in the store.
                if (pos.getRecordID() < 0L) {
                    continue;
                }
                store.deleteCursorAcknowledgeTransactional(tx.getID(), pos.getRecordID());
                if (!persist) {
                    persist = true;
                    tx.setContainsPersistent();
                }
            }
            infoPG.acks.clear();
        }
        tx.addOperation(new TransactionOperationAbstract(){

            @Override
            public void afterCommit(Transaction tx1) {
                PageSubscriptionImpl.this.executor.execute(new Runnable(){

                    @Override
                    public void run() {
                        if (!completeDelete) {
                            PageSubscriptionImpl.this.cursorProvider.scheduleCleanup();
                        }
                    }
                });
            }
        });
        tx.commit();
    }

    /** @return a description containing the cursor id, the queue and the filter */
    public String toString() {
        final StringBuilder text = new StringBuilder("PageSubscriptionImpl [cursorId=");
        text.append(cursorId);
        text.append(", queue=").append(queue);
        text.append(", filter = ").append(filter);
        text.append("]");
        return text.toString();
    }

    /**
     * Builds a reference for the message the provider returns for {@code pos}.
     *
     * @param pos position of the message
     * @return a new reference created by the cursor provider
     */
    private PagedReference getReference(PagePosition pos) {
        final PagedMessage message = cursorProvider.getMessage(pos);
        return cursorProvider.newReference(pos, message, this);
    }

    /** @return a fresh {@code CursorIterator} over this subscription */
    @Override
    public LinkedListIterator<PagedReference> iterator() {
        final CursorIterator cursor = new CursorIterator();
        return cursor;
    }

    /**
     * Looks up the message that follows {@code pos}, moving on to later pages when needed.
     *
     * @param pos the position to advance from
     * @return a reference to the next message, or {@code null} when no page cache or no
     *         message is available for the computed position
     */
    private PagedReference internalGetNext(PagePosition pos) {
        PagePosition retPos = pos.nextMessage();
        PageCache cache = this.cursorProvider.getPageCache(pos.getPageNr());
        // A non-live page whose messages are exhausted is treated like a missing cache,
        // which makes the loop below move on to the next page.
        if (cache != null && !cache.isLive() && retPos.getMessageNr() >= cache.getNumberOfMessages()) {
            cache = null;
        }
        // Keep moving forward while there is no usable cache (missing, or holding zero messages)
        // and the candidate page is not beyond the store's current writing page.
        while (cache == null && retPos.getPageNr() <= (long)this.pageStore.getCurrentWritingPage() || cache != null && retPos.getPageNr() <= (long)this.pageStore.getCurrentWritingPage() && cache.getNumberOfMessages() == 0) {
            retPos = this.moveNextPage(retPos);
            cache = this.cursorProvider.getPageCache(retPos.getPageNr());
        }
        if (cache == null) {
            return null;
        }
        PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());
        if (serverMessage != null) {
            return this.cursorProvider.newReference(retPos, serverMessage, this);
        }
        return null;
    }

    /**
     * Advances page by page from {@code pos} until reaching a page that is either unknown
     * to {@code consumedPages} or neither pending delete nor completed.
     *
     * @param pos the position to start from
     * @return the first following page position that satisfies the condition above
     */
    private PagePosition moveNextPage(PagePosition pos) {
        PagePosition candidate = pos;
        boolean usable;
        do {
            candidate = candidate.nextPage();
            synchronized (consumedPages) {
                final PageCursorInfo known = consumedPages.get(candidate.getPageNr());
                usable = known == null || !known.isPendingDelete() && known.getCompleteInfo() == null;
            }
        } while (!usable);
        return candidate;
    }

    /**
     * @param message the paged message to inspect
     * @return {@code true} when this subscription's id appears among the message's queue ids
     */
    private boolean routed(PagedMessage message) {
        final long ownId = getId();
        boolean found = false;
        for (long candidate : message.getQueueIDs()) {
            if (candidate == ownId) {
                found = true;
                break;
            }
        }
        return found;
    }

    /** @return a position on the store's first page with message number {@code -1} */
    private synchronized PagePosition getStartPosition() {
        final PagePosition start = new PagePositionImpl(pageStore.getFirstPage(), -1);
        return start;
    }

    /**
     * Confirms a position as part of a transaction: persistent subscriptions store the
     * acknowledgement transactionally, then the position is registered with the
     * transaction callback.
     *
     * @param tx       the transaction the confirmation belongs to
     * @param position the position being confirmed
     * @throws Exception if storing the acknowledgement fails
     */
    @Override
    public void confirmPosition(Transaction tx, PagePosition position) throws Exception {
        if (persistent) {
            store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
        }
        installTXCallback(tx, position);
    }

    /**
     * Acknowledges a reference inside a transaction: confirms its position, decrements the
     * counter by one and, when the message belongs to a page transaction, stores that
     * transaction's update.
     *
     * @param tx        the transaction to work in
     * @param reference the reference being acknowledged
     * @throws Exception if any store operation fails
     */
    @Override
    public void ackTx(Transaction tx, PagedReference reference) throws Exception {
        confirmPosition(tx, reference.getPosition());
        counter.increment(tx, -1);
        final PageTransactionInfo pageTx = getPageTransaction(reference);
        if (pageTx == null) {
            return;
        }
        pageTx.storeUpdate(store, pageStore.getPagingManager(), tx);
    }

    /**
     * Acknowledges a reference in a transaction of its own, committed before returning.
     *
     * @param reference the reference being acknowledged
     * @throws Exception if the acknowledgement or the commit fails
     */
    @Override
    public void ack(PagedReference reference) throws Exception {
        final TransactionImpl ownTx = new TransactionImpl(store);
        ackTx(ownTx, reference);
        ownTx.commit();
    }

    /**
     * Confirms a position outside of a transaction. Persistent subscriptions store the
     * acknowledgement first; the in-memory ACK is processed from a callback registered
     * with the store through {@code afterCompleteOperations}.
     *
     * @param position the position being confirmed
     * @throws Exception if storing the acknowledgement or registering the callback fails
     */
    @Override
    public void confirmPosition(final PagePosition position) throws Exception {
        if (this.persistent) {
            this.store.storeCursorAcknowledge(this.cursorId, position);
        }
        this.store.afterCompleteOperations(new IOAsyncTask(){
            // Last error text reported to onError; included in toString().
            volatile String error = "";

            @Override
            public void onError(int errorCode, String errorMessage) {
                this.error = " errorCode=" + errorCode + ", msg=" + errorMessage;
                // "this" here is the callback object, not the subscription.
                HornetQServerLogger.LOGGER.pageSubscriptionError(this, this.error);
            }

            @Override
            public void done() {
                PageSubscriptionImpl.this.processACK(position);
            }

            public String toString() {
                return IOAsyncTask.class.getSimpleName() + "(" + PageSubscriptionImpl.class.getSimpleName() + ") " + this.error;
            }
        });
    }

    /**
     * @return {@code -1} when the subscription is flagged empty and tracks no pages;
     *         otherwise the first tracked page that is neither done nor pending delete,
     *         or the last tracked page number ({@code 0} if none) when every page is
     */
    @Override
    public long getFirstPage() {
        synchronized (consumedPages) {
            if (empty && consumedPages.isEmpty()) {
                return -1L;
            }
            long lastPageSeen = 0L;
            for (Map.Entry<Long, PageCursorInfo> tracked : consumedPages.entrySet()) {
                final long pageNr = tracked.getKey();
                final PageCursorInfo state = tracked.getValue();
                lastPageSeen = pageNr;
                if (!state.isDone() && !state.isPendingDelete()) {
                    return pageNr;
                }
            }
            return lastPageSeen;
        }
    }

    /**
     * Increments the pending-transaction count of the page that holds {@code position}.
     *
     * @param position the position with a delivery pending
     */
    @Override
    public void addPendingDelivery(PagePosition position) {
        final PageCursorInfo info = getPageInfo(position);
        // getPageInfo returns null when the provider has no cache for the page. Skip the
        // increment then, mirroring the null check redeliver() applies before decrementing.
        if (info != null) {
            info.incrementPendingTX();
        }
    }

    /**
     * Queues a position for redelivery and, when its page is tracked, decrements that
     * page's pending-transaction count.
     *
     * @param position the position to deliver again
     */
    @Override
    public void redeliver(PagePosition position) {
        synchronized (redeliveries) {
            redeliveries.add(position);
        }
        synchronized (consumedPages) {
            final PageCursorInfo tracked = consumedPages.get(position.getPageNr());
            if (tracked != null) {
                tracked.decrementPendingTX();
            }
        }
    }

    /**
     * Fetches the message at {@code pos} from the cursor provider.
     *
     * @param pos the position to look up
     * @return the message returned by the provider
     * @throws RuntimeException wrapping any exception raised by the lookup
     */
    @Override
    public PagedMessage queryMessage(PagePosition pos) {
        final PagedMessage found;
        try {
            found = cursorProvider.getMessage(pos);
        }
        catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
        return found;
    }

    /**
     * Remembers a recovered acknowledgement until {@code processReload} handles it.
     * The backing list is created on first use.
     *
     * @param position the recovered acknowledgement position
     */
    @Override
    public void reloadACK(PagePosition position) {
        List<PagePosition> pending = recoveredACK;
        if (pending == null) {
            pending = new LinkedList<PagePosition>();
            recoveredACK = pending;
        }
        pending.add(position);
    }

    /**
     * Re-registers an acknowledgement that belongs to a prepared transaction: bumps the
     * delivered count and installs the transaction callback for the position.
     *
     * @param tx       the prepared transaction
     * @param position the acknowledged position
     */
    @Override
    public void reloadPreparedACK(Transaction tx, PagePosition position) {
        deliveredCount.incrementAndGet();
        installTXCallback(tx, position);
    }

    /**
     * Handles an ignored position by processing it as an ACK.
     *
     * @param position the ignored position
     */
    @Override
    public void positionIgnored(PagePosition position) {
        processACK(position);
    }

    /**
     * Processes {@code position} as an ACK and decrements the pending-transaction count
     * of its page.
     *
     * @param position the position whose late delivery was rolled back
     */
    @Override
    public void lateDeliveryRollback(PagePosition position) {
        final PageCursorInfo cursorInfo = processACK(position);
        // processACK logs a warning and returns null when no page info is available;
        // there is nothing to decrement in that case.
        if (cursorInfo != null) {
            cursorInfo.decrementPendingTX();
        }
    }

    /**
     * @param page the page number to check
     * @return {@code true} when the subscription is flagged empty and either tracks no
     *         pages or does not track {@code page}; otherwise whether the tracked page
     *         reports done ({@code false} for an untracked page)
     */
    @Override
    public boolean isComplete(long page) {
        synchronized (consumedPages) {
            if (empty && consumedPages.isEmpty()) {
                return true;
            }
            final PageCursorInfo tracked = consumedPages.get(page);
            if (tracked == null) {
                // An untracked page counts as complete only while the subscription is empty.
                return empty;
            }
            return tracked.isDone();
        }
    }

    /**
     * Removes this subscription's stored records and closes it on the cursor provider.
     * Stored acknowledgements are deleted in one store transaction, which is committed
     * only when at least one was deleted. Page-complete records are deleted directly.
     *
     * <p>Failures are handled best-effort: the error is logged, a rollback is attempted
     * and nothing is rethrown.
     *
     * @throws Exception if generating the transaction id fails
     */
    @Override
    public void destroy() throws Exception {
        final long tx = store.generateUniqueID();
        try {
            boolean isPersistent = false;
            synchronized (consumedPages) {
                for (PageCursorInfo cursor : consumedPages.values()) {
                    for (PagePosition info : cursor.acks) {
                        // Negative record ids have no stored record to delete.
                        if (info.getRecordID() < 0L) {
                            continue;
                        }
                        isPersistent = true;
                        store.deleteCursorAcknowledgeTransactional(tx, info.getRecordID());
                    }
                    final PagePosition completeInfo = cursor.getCompleteInfo();
                    if (completeInfo == null || completeInfo.getRecordID() < 0L) {
                        continue;
                    }
                    store.deletePageComplete(completeInfo.getRecordID());
                    cursor.setCompleteInfo(null);
                }
            }
            if (isPersistent) {
                store.commit(tx);
            }
            cursorProvider.close(this);
        }
        catch (Exception e) {
            // Previously swallowed silently; log it so a failed destroy can be diagnosed.
            HornetQServerLogger.LOGGER.warn("Error while destroying page subscription " + this + ", rolling back tx " + tx, e);
            try {
                store.rollback(tx);
            }
            catch (Exception ignored) {
                // The original failure is already logged above; nothing more can be done
                // if the rollback itself fails.
            }
        }
    }

    /** @return the cursor id given at construction */
    @Override
    public long getId() {
        return cursorId;
    }

    /** @return the persistence flag given at construction */
    @Override
    public boolean isPersistent() {
        return persistent;
    }

    /**
     * Applies the acknowledgements collected by {@code reloadACK}, in sorted order.
     * Each position becomes the last acked position and is loaded into its page info.
     * Positions whose page info cannot be obtained are reported and their stored
     * acknowledgement is deleted in a single store transaction committed at the end.
     * The collected list is cleared and dropped afterwards.
     *
     * @throws Exception if a store operation fails
     */
    @Override
    public void processReload() throws Exception {
        if (recoveredACK == null) {
            return;
        }
        if (isTrace) {
            HornetQServerLogger.LOGGER.trace("********** processing reload!!!!!!!");
        }
        Collections.sort(recoveredACK);
        // Stays at -1 until the first orphaned acknowledgement needs a delete transaction.
        long txDeleteCursorOnReload = -1L;
        for (PagePosition pos : recoveredACK) {
            lastAckedPosition = pos;
            final PageCursorInfo pageInfo = getPageInfo(pos);
            if (pageInfo != null) {
                pageInfo.loadACK(pos);
                continue;
            }
            HornetQServerLogger.LOGGER.pageNotFound(pos);
            if (txDeleteCursorOnReload == -1L) {
                txDeleteCursorOnReload = store.generateUniqueID();
            }
            store.deleteCursorAcknowledgeTransactional(txDeleteCursorOnReload, pos.getRecordID());
        }
        if (txDeleteCursorOnReload >= 0L) {
            store.commit(txDeleteCursorOnReload);
        }
        recoveredACK.clear();
        recoveredACK = null;
    }

    /**
     * Submits a latch to the executor and waits for it, logging a timeout message after
     * every unsuccessful wait of 1000 and then waiting again.
     */
    @Override
    public void flushExecutors() {
        final FutureLatch latch = new FutureLatch();
        executor.execute(latch);
        for (;;) {
            if (latch.await(1000L)) {
                return;
            }
            HornetQServerLogger.LOGGER.timedOutFlushingExecutorsPagingCursor(this);
        }
    }

    /** Stops the subscription by flushing its executor. */
    @Override
    public void stop() {
        flushExecutors();
    }

    /** Prints debug information, using this object's string form as the message. */
    @Override
    public void printDebug() {
        final String label = toString();
        printDebug(label);
    }

    /**
     * Prints a header line followed by one line per tracked page to standard output.
     *
     * @param msg text appended to the header line
     */
    public void printDebug(String msg) {
        System.out.println("Debug information on PageCurorImpl- " + msg);
        // The map is a plain TreeMap and is mutated by other methods under this monitor,
        // so iterate under the same monitor.
        synchronized (consumedPages) {
            for (PageCursorInfo info : consumedPages.values()) {
                System.out.println(info);
            }
        }
    }

    /**
     * Drops the tracking entry of a deleted page and removes its stored records:
     * the page-complete record, if any, and every acknowledgement with a non-negative
     * record id. Store failures are logged and do not interrupt the remaining deletes.
     *
     * @param deletedPage the page that was deleted
     * @throws Exception declared by the interface; store failures are logged, not rethrown
     */
    @Override
    public void onDeletePage(Page deletedPage) throws Exception {
        final PageCursorInfo info;
        synchronized (consumedPages) {
            info = consumedPages.remove(deletedPage.getPageId());
        }
        if (info == null) {
            return;
        }
        final PagePosition completeInfo = info.getCompleteInfo();
        if (completeInfo != null) {
            try {
                store.deletePageComplete(completeInfo.getRecordID());
            }
            catch (Exception e) {
                HornetQServerLogger.LOGGER.warn("Error while deleting page-complete-record", e);
            }
            info.setCompleteInfo(null);
        }
        for (PagePosition deleteInfo : info.acks) {
            if (deleteInfo.getRecordID() < 0L) {
                continue;
            }
            try {
                store.deleteCursorAcknowledge(deleteInfo.getRecordID());
            }
            catch (Exception e) {
                // This is the acknowledgement delete, not the page-complete delete.
                HornetQServerLogger.LOGGER.warn("Error while deleting cursor-acknowledge-record", e);
            }
        }
        info.acks.clear();
    }

    /** @return the executor given at construction */
    @Override
    public Executor getExecutor() {
        return executor;
    }

    /**
     * Ensures a tracking entry exists for the page, creating it when possible.
     *
     * @param pageNr the page number to load
     */
    @Override
    public void reloadPageInfo(long pageNr) {
        getPageInfo(pageNr, true);
    }

    /**
     * @param pos a position on the wanted page
     * @return the page's tracking entry, created on demand; {@code null} when it cannot be created
     */
    private PageCursorInfo getPageInfo(PagePosition pos) {
        final long pageNr = pos.getPageNr();
        return getPageInfo(pageNr, true);
    }

    /**
     * Looks up the tracking entry of a page, optionally creating it.
     *
     * @param pageNr the page number
     * @param create whether a missing entry should be created from the provider's page cache
     * @return the existing or newly created entry; {@code null} when the entry is missing
     *         and either {@code create} is false or the provider has no cache for the page
     */
    private PageCursorInfo getPageInfo(long pageNr, boolean create) {
        synchronized (consumedPages) {
            final PageCursorInfo existing = consumedPages.get(pageNr);
            if (existing != null || !create) {
                return existing;
            }
            final PageCache cache = cursorProvider.getPageCache(pageNr);
            if (cache == null) {
                return null;
            }
            final PageCursorInfo created = new PageCursorInfo(pageNr, cache.getNumberOfMessages(), cache);
            consumedPages.put(pageNr, created);
            return created;
        }
    }

    /**
     * @param message the message to test
     * @return {@code true} when there is no filter, otherwise the filter's verdict
     */
    private boolean match(ServerMessage message) {
        return filter == null || filter.match(message);
    }

    /**
     * Records an acknowledgement. When {@code pos} is greater than the last acked position
     * (or none exists yet) it becomes the new last acked position, and a cleanup check is
     * scheduled if the page number changed and auto cleanup is on. The ACK is then added
     * to the page's tracking entry.
     *
     * @param pos the acknowledged position
     * @return the page's tracking entry, or {@code null} (after a warning) when unavailable
     */
    private PageCursorInfo processACK(PagePosition pos) {
        if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0) {
            if (isTrace) {
                HornetQServerLogger.LOGGER.trace("a new position is being processed as ACK");
            }
            if (lastAckedPosition != null && lastAckedPosition.getPageNr() != pos.getPageNr()) {
                if (isTrace) {
                    HornetQServerLogger.LOGGER.trace("Scheduling cleanup on pageSubscription for address = " + pageStore.getAddress() + " queue = " + getQueue().getName());
                }
                if (autoCleanup) {
                    scheduleCleanupCheck();
                }
            }
            lastAckedPosition = pos;
        }
        final PageCursorInfo tracked = getPageInfo(pos);
        if (tracked != null) {
            tracked.addACK(pos);
            return tracked;
        }
        HornetQServerLogger.LOGGER.warn("PageCursorInfo == null on address " + getPagingStore().getAddress() + ", pos = " + pos + ", queue = " + cursorId);
        return null;
    }

    /**
     * Registers {@code position} with the transaction's {@code PageCursorTX} operation,
     * creating and attaching that operation on first use, and marks the position as
     * removed on its page.
     *
     * @param tx       the transaction that will confirm the position
     * @param position the position to confirm after commit
     */
    private void installTXCallback(Transaction tx, PagePosition position) {
        if (position.getRecordID() >= 0L) {
            tx.setContainsPersistent();
        }
        // getPageInfo returns null when the provider has no cache for the page. Guard the
        // call the same way CursorIterator.remove() does instead of failing with an NPE.
        final PageCursorInfo info = getPageInfo(position);
        if (info != null) {
            info.remove(position);
        }
        // NOTE(review): 8 is presumably the transaction property index reserved for
        // page cursor positions — confirm against the project's property constants.
        PageCursorTX cursorTX = (PageCursorTX)tx.getProperty(8);
        if (cursorTX == null) {
            cursorTX = new PageCursorTX();
            tx.putProperty(8, cursorTX);
            tx.addOperation(cursorTX);
        }
        cursorTX.addPositionConfirmation(this, position);
    }

    /**
     * @param reference the reference whose message is inspected
     * @return the page transaction registered under the message's transaction id, or
     *         {@code null} when that id is negative
     */
    private PageTransactionInfo getPageTransaction(PagedReference reference) {
        final long txId = reference.getPagedMessage().getTransactionID();
        if (txId < 0L) {
            return null;
        }
        return pageStore.getPagingManager().getTransaction(txId);
    }

    /**
     * Schedules a cleanup check when auto cleanup is enabled.
     *
     * @param info the finished page's tracking entry (not read here)
     */
    private void onPageDone(PageCursorInfo info) {
        if (!autoCleanup) {
            return;
        }
        scheduleCleanupCheck();
    }

    private class CursorIterator
    implements LinkedListIterator<PagedReference> {
        // Position of the last non-ignored message seen by moveNext(); null until the first next().
        private PagePosition position = null;
        // Position the iterator was at before the last successful moveNext(); restored by repeat().
        private PagePosition lastOperation = null;
        // True when the last moveNext() served an entry from the redelivery queue.
        private volatile boolean isredelivery = false;
        // Reference returned by the last redelivery, replayed by repeat().
        private volatile PagedReference lastRedelivery = null;
        // Look-ahead reference filled by hasNext() or repeat() and consumed by next().
        private volatile PagedReference cachedNext;

        /**
         * Arranges for the last returned element to be returned again: a redelivered
         * reference is put back as the cached next element, otherwise the iterator
         * position is reset to where it was before the last move.
         */
        @Override
        public void repeat() {
            if (!isredelivery) {
                position = lastOperation;
                return;
            }
            synchronized (redeliveries) {
                cachedNext = lastRedelivery;
            }
        }

        /**
         * Returns the cached look-ahead element if there is one, otherwise moves the
         * cursor forward, starting from the subscription's start position on first use.
         *
         * @return the next reference, or {@code null} when none is available
         * @throws RuntimeException any runtime failure raised while moving, after logging it
         */
        @Override
        public synchronized PagedReference next() {
            if (cachedNext != null) {
                final PagedReference retPos = cachedNext;
                cachedNext = null;
                return retPos;
            }
            try {
                if (position == null) {
                    position = getStartPosition();
                }
                return moveNext();
            }
            catch (RuntimeException e) {
                // Report through the server logger rather than printing to stderr.
                HornetQServerLogger.LOGGER.warn("Error while moving to the next paged reference on " + PageSubscriptionImpl.this, e);
                throw e;
            }
        }

        /**
         * Advances the cursor to the next deliverable reference while holding the
         * subscription's monitor. Pending redeliveries are served first. Otherwise
         * messages are scanned forward until one is routed to this subscription,
         * is not already removed or on a completed page, is not deferred by its page
         * transaction, and matches the filter.
         *
         * @return the next reference to deliver, or {@code null} when the scan runs out
         */
        private PagedReference moveNext() {
            PageSubscriptionImpl pageSubscriptionImpl = PageSubscriptionImpl.this;
            synchronized (pageSubscriptionImpl) {
                PagedReference message;
                boolean match = false;
                PagePosition lastPosition = this.position;
                PagePosition tmpPosition = this.position;
                do {
                    PageCursorInfo info;
                    Queue queue = PageSubscriptionImpl.this.redeliveries;
                    synchronized (queue) {
                        // Redeliveries take priority over reading further into the pages.
                        PagePosition redelivery = (PagePosition)PageSubscriptionImpl.this.redeliveries.poll();
                        if (redelivery != null) {
                            PagedReference redeliveredMsg;
                            this.isredelivery = true;
                            this.lastRedelivery = redeliveredMsg = PageSubscriptionImpl.this.getReference(redelivery);
                            return redeliveredMsg;
                        }
                        this.lastRedelivery = null;
                        this.isredelivery = false;
                        message = PageSubscriptionImpl.this.internalGetNext(tmpPosition);
                    }
                    if (message == null) break;
                    tmpPosition = message.getPosition();
                    boolean valid = true;
                    boolean ignored = false;
                    // A message not routed to this subscription is invalid and ignored.
                    valid = PageSubscriptionImpl.this.routed(message.getPagedMessage());
                    if (!valid) {
                        ignored = true;
                    }
                    // Skip messages already removed or sitting on a page with a page-complete record.
                    if ((info = PageSubscriptionImpl.this.getPageInfo(message.getPosition().getPageNr(), false)) != null && (info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) continue;
                    if (valid && message.getPagedMessage().getTransactionID() >= 0L) {
                        PageTransactionInfo tx = PageSubscriptionImpl.this.pageStore.getPagingManager().getTransaction(message.getPagedMessage().getTransactionID());
                        if (tx == null) {
                            // The page transaction is unknown: report it and treat the message as ignored.
                            HornetQServerLogger.LOGGER.pageSubscriptionCouldntLoad(message.getPagedMessage().getTransactionID(), message.getPosition(), PageSubscriptionImpl.this.pageStore.getAddress(), PageSubscriptionImpl.this.queue.getName());
                            valid = false;
                            ignored = true;
                        } else if (tx.deliverAfterCommit(PageSubscriptionImpl.this, message.getPosition())) {
                            // The transaction took over the delivery; skip without ACKing.
                            valid = false;
                            ignored = false;
                        }
                    }
                    if (valid && info != null && info.isRemoved(message.getPosition())) {
                        valid = false;
                    }
                    // Ignored messages do not move the iterator's recorded position.
                    if (!ignored) {
                        this.position = message.getPosition();
                    }
                    if (valid) {
                        match = PageSubscriptionImpl.this.match(message.getMessage());
                        if (match) continue;
                        // Valid but rejected by the filter: ACK it so it is not revisited.
                        PageSubscriptionImpl.this.processACK(message.getPosition());
                        continue;
                    }
                    if (!ignored) continue;
                    PageSubscriptionImpl.this.positionIgnored(message.getPosition());
                } while (message != null && !match);
                if (message != null) {
                    // Remember where this move started so repeat() can return to it.
                    this.lastOperation = lastPosition;
                }
                return message;
            }
        }

        /**
         * @return {@code true} when a look-ahead element is cached or can be fetched;
         *         {@code false} when the store is not paging or nothing more is available
         */
        @Override
        public synchronized boolean hasNext() {
            if (cachedNext != null) {
                return true;
            }
            if (!pageStore.isPaging()) {
                return false;
            }
            final PagedReference upcoming = next();
            cachedNext = upcoming;
            return upcoming != null;
        }

        /**
         * Counts the current element as delivered and marks its position as removed on
         * the page's tracking entry, when that entry is available.
         */
        @Override
        public void remove() {
            deliveredCount.incrementAndGet();
            final PageCursorInfo tracked = getPageInfo(position);
            if (tracked == null) {
                return;
            }
            tracked.remove(position);
        }

        /** No-op: this iterator does nothing on close. */
        @Override
        public void close() {
            // intentionally empty
        }
    }

    /**
     * Transaction operation that collects confirmed positions per subscription and, after
     * the transaction commits, processes each as an ACK and decrements the subscription's
     * delivered count.
     */
    private static final class PageCursorTX
    extends TransactionOperationAbstract {
        /** Positions awaiting commit, grouped by the subscription they belong to. */
        private final Map<PageSubscriptionImpl, List<PagePosition>> pendingPositions = new HashMap<PageSubscriptionImpl, List<PagePosition>>();

        private PageCursorTX() {
        }

        /**
         * Queues a position to be confirmed for {@code cursor} once the transaction commits.
         */
        private void addPositionConfirmation(PageSubscriptionImpl cursor, PagePosition position) {
            List<PagePosition> queued = pendingPositions.get(cursor);
            if (queued == null) {
                queued = new LinkedList<PagePosition>();
                pendingPositions.put(cursor, queued);
            }
            queued.add(position);
        }

        /** Applies every queued confirmation to its subscription. */
        @Override
        public void afterCommit(Transaction tx) {
            for (PageSubscriptionImpl cursor : pendingPositions.keySet()) {
                for (PagePosition confirmed : pendingPositions.get(cursor)) {
                    cursor.processACK(confirmed);
                    cursor.deliveredCount.decrementAndGet();
                }
            }
        }

        /** @return always an empty list */
        @Override
        public List<MessageReference> getRelatedMessageReferences() {
            final List<MessageReference> none = Collections.emptyList();
            return none;
        }
    }

    /**
     * Acknowledgement bookkeeping for a single page, as seen by the enclosing
     * subscription.
     *
     * <p>Tracks which positions have been acknowledged or removed, how many
     * messages have been confirmed, and how many transactions are still pending
     * against the page. When the page becomes "done" (see {@link #isDone()}),
     * the enclosing subscription is notified through {@code onPageDone}.
     */
    private final class PageCursorInfo {
        /** Message count supplied at construction; used only when the page was not live at that time. */
        private final int numberOfMessages;
        private final long pageId;
        /** Acknowledged positions, kept in insertion order behind a synchronized wrapper. */
        private final Set<PagePosition> acks = Collections.synchronizedSet(new LinkedHashSet<PagePosition>());
        /** Weak handle on the page cache; left {@code null} when no cache was passed to the constructor. */
        private WeakReference<PageCache> cache;
        /** Positions that were removed or acknowledged (every ACK is also recorded here). */
        private final Set<PagePosition> removedReferences = new ConcurrentHashSet<PagePosition>();
        /** Whether the cache passed to the constructor reported itself as live. */
        private final boolean wasLive;
        private final AtomicInteger pendingTX = new AtomicInteger(0);
        private boolean pendingDelete;
        /** Non-null once a complete-page marker has been set through {@link #setCompleteInfo}. */
        private PagePosition completePage;
        /** Number of distinct ACKs counted so far (only positions with a non-negative message number). */
        private final AtomicInteger confirmed = new AtomicInteger(0);

        @Override
        public String toString() {
            return "PageCursorInfo::PageID=" + this.pageId + " numberOfMessage = " + this.numberOfMessages + ", confirmed = " + this.confirmed + ", isDone=" + this.isDone();
        }

        /**
         * @param pageId           id of the page being tracked
         * @param numberOfMessages message count to report when the page was not live
         * @param cache            page cache, or {@code null}; when non-null it is held
         *                         weakly and its {@code isLive()} value is captured
         */
        public PageCursorInfo(long pageId, int numberOfMessages, PageCache cache) {
            this.pageId = pageId;
            this.numberOfMessages = numberOfMessages;
            if (cache != null) {
                this.wasLive = cache.isLive();
                this.cache = new WeakReference<PageCache>(cache);
            } else {
                this.wasLive = false;
            }
        }

        public void setCompleteInfo(PagePosition completePage) {
            this.completePage = completePage;
        }

        public PagePosition getCompleteInfo() {
            return this.completePage;
        }

        /**
         * @return {@code true} when a complete-page marker is set, or when the
         *         confirmed count equals the page's message count and no
         *         transaction is pending
         */
        public boolean isDone() {
            return this.completePage != null || this.getNumberOfMessages() == this.confirmed.get() && this.pendingTX.get() == 0;
        }

        /**
         * @return {@code true} when deletion was requested through
         *         {@link #setPendingDelete()} or a complete-page marker is set
         */
        public boolean isPendingDelete() {
            return this.pendingDelete || this.completePage != null;
        }

        public void setPendingDelete() {
            this.pendingDelete = true;
        }

        public long getPageId() {
            return this.pageId;
        }

        public void incrementPendingTX() {
            this.pendingTX.incrementAndGet();
        }

        /** Decrements the pending-transaction count, then re-evaluates whether the page is done. */
        public void decrementPendingTX() {
            this.pendingTX.decrementAndGet();
            this.checkDone();
        }

        public boolean isRemoved(PagePosition pos) {
            return this.removedReferences.contains(pos);
        }

        public void remove(PagePosition position) {
            this.removedReferences.add(position);
        }

        /**
         * Records an acknowledgement and, if it is new and has a non-negative
         * message number, counts it and checks whether the page is now done.
         *
         * @param posACK position being acknowledged
         */
        public void addACK(PagePosition posACK) {
            if (PageSubscriptionImpl.this.isTrace) {
                // Logged before the ACK is applied, hence the "+ 1" on the confirmed count.
                HornetQServerLogger.LOGGER.trace("numberOfMessages =  " + this.getNumberOfMessages() + " confirmed =  " + (this.confirmed.get() + 1) + " pendingTX = " + this.pendingTX + ", page = " + this.pageId + " posACK = " + posACK);
            }
            // NOTE(review): a negative message number presumably marks a non-message
            // (e.g. whole-page) position - confirm against PagePosition's contract.
            if (this.internalAddACK(posACK) && posACK.getMessageNr() >= 0) {
                this.confirmed.incrementAndGet();
                this.checkDone();
            }
        }

        /**
         * Same accounting as {@link #addACK(PagePosition)} but without trace
         * logging and without checking whether the page is done.
         *
         * @param posACK position being acknowledged
         */
        public void loadACK(PagePosition posACK) {
            if (this.internalAddACK(posACK) && posACK.getMessageNr() >= 0) {
                this.confirmed.incrementAndGet();
            }
        }

        /**
         * Marks the position as removed and adds it to the ACK set.
         *
         * @return {@code true} if the position was not already in the ACK set
         */
        private boolean internalAddACK(PagePosition posACK) {
            this.removedReferences.add(posACK);
            return this.acks.add(posACK);
        }

        /** Notifies the enclosing subscription through {@code onPageDone} when {@link #isDone()} holds. */
        protected void checkDone() {
            if (this.isDone()) {
                PageSubscriptionImpl.this.onPageDone(this);
            }
        }

        /**
         * @return the cache's current message count when the page was live at
         *         construction, otherwise the count given to the constructor
         */
        private int getNumberOfMessages() {
            if (this.wasLive) {
                // wasLive is only true when a cache was supplied, so this.cache is non-null here.
                PageCache localcache = this.cache.get();
                if (localcache == null) {
                    // The weak reference was cleared: fetch the cache again and re-wrap it.
                    localcache = PageSubscriptionImpl.this.cursorProvider.getPageCache(this.pageId);
                    this.cache = new WeakReference<PageCache>(localcache);
                }
                return localcache.getNumberOfMessages();
            }
            return this.numberOfMessages;
        }
    }
}

