package org.hornetq.core.paging.cursor.impl;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.cursor.PageCache;
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PagedReference;
import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.Future;
import org.hornetq.utils.SoftValueHashMap;
import org.jboss.netty.util.internal.ConcurrentHashMap;

/* loaded from: input_file:org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.class */
public class PageCursorProviderImpl implements PageCursorProvider {
    private static final Logger log = Logger.getLogger(PageCursorProviderImpl.class);
    private final PagingStore pagingStore;
    private final StorageManager storageManager;
    private final ExecutorFactory executorFactory;
    private final Executor executor;
    private final SoftValueHashMap<Long, PageCache> softCache;
    private final ConcurrentMap<Long, PageSubscription> activeCursors = new ConcurrentHashMap();

    public PageCursorProviderImpl(PagingStore pagingStore, StorageManager storageManager, ExecutorFactory executorFactory, int i) {
        this.pagingStore = pagingStore;
        this.storageManager = storageManager;
        this.executorFactory = executorFactory;
        this.executor = executorFactory.getExecutor();
        this.softCache = new SoftValueHashMap<>(i);
    }

    @Override // org.hornetq.core.paging.cursor.PageCursorProvider
    public PagingStore getAssociatedStore() {
        return this.pagingStore;
    }

    @Override // org.hornetq.core.paging.cursor.PageCursorProvider
    public synchronized PageSubscription createSubscription(long j, Filter filter, boolean z) {
        if (this.activeCursors.get(Long.valueOf(j)) != null) {
            throw new IllegalStateException("Cursor " + j + " had already been created");
        }
        PageSubscriptionImpl pageSubscriptionImpl = new PageSubscriptionImpl(this, this.pagingStore, this.storageManager, this.executorFactory.getExecutor(), filter, j, z);
        this.activeCursors.put(Long.valueOf(j), pageSubscriptionImpl);
        return pageSubscriptionImpl;
    }

    @Override // org.hornetq.core.paging.cursor.PageCursorProvider
    public synchronized PageSubscription getSubscription(long j) {
        return this.activeCursors.get(Long.valueOf(j));
    }

    @Override // org.hornetq.core.paging.cursor.PageCursorProvider
    public PagedMessage getMessage(PagePosition pagePosition) throws Exception {
        PageCache pageCache = getPageCache(pagePosition);
        if (pagePosition.getMessageNr() >= pageCache.getNumberOfMessages()) {
            throw new IllegalStateException("Invalid messageNumber passed = " + pagePosition + " on " + pageCache);
        }
        return pageCache.getMessage(pagePosition.getMessageNr());
    }

    @Override // org.hornetq.core.paging.cursor.PageCursorProvider
    public PagedReference newReference(PagePosition pagePosition, PagedMessage pagedMessage, PageSubscription pageSubscription) {
        return new PagedReferenceImpl(pagePosition, pagedMessage, pageSubscription);
    }

    @Override // org.hornetq.core.paging.cursor.PageCursorProvider
    public PageCache getPageCache(PagePosition pagePosition) {
        return getPageCache(pagePosition.getPageNr());
    }

    @Override // org.hornetq.core.paging.cursor.PageCursorProvider
    public void addPageCache(PageCache pageCache) {
        synchronized (this.softCache) {
            this.softCache.put((SoftValueHashMap<Long, PageCache>) Long.valueOf(pageCache.getPageId()), (Long) pageCache);
        }
    }

    @Override // org.hornetq.core.paging.cursor.PageCursorProvider
    public int getCacheMaxSize() {
        return this.softCache.getMaxEelements();
    }

    @Override // org.hornetq.core.paging.cursor.PageCursorProvider
    public void setCacheMaxSize(int i) {
        this.softCache.setMaxElements(i);
    }

    @Override // org.hornetq.core.paging.cursor.PageCursorProvider
    public int getCacheSize() {
        int size;
        synchronized (this.softCache) {
            size = this.softCache.size();
        }
        return size;
    }

    @Override // org.hornetq.core.paging.cursor.PageCursorProvider
    public void processReload() throws Exception {
        Iterator<PageSubscription> it = this.activeCursors.values().iterator();
        while (it.hasNext()) {
            it.next().processReload();
        }
        cleanup();
    }

    @Override // org.hornetq.core.paging.cursor.PageCursorProvider
    public void stop() {
        Iterator<PageSubscription> it = this.activeCursors.values().iterator();
        while (it.hasNext()) {
            it.next().stop();
        }
        Future future = new Future();
        this.executor.execute(future);
        while (!future.await(10000L)) {
            log.warn("Waiting cursor provider " + this + " to finish executors");
        }
    }

    @Override // org.hornetq.core.paging.cursor.PageCursorProvider
    public void flushExecutors() {
        Iterator<PageSubscription> it = this.activeCursors.values().iterator();
        while (it.hasNext()) {
            it.next().flushExecutors();
        }
        Future future = new Future();
        this.executor.execute(future);
        while (!future.await(10000L)) {
            log.warn("Waiting cursor provider " + this + " to finish executors");
        }
    }

    @Override // org.hornetq.core.paging.cursor.PageCursorProvider
    public void close(PageSubscription pageSubscription) {
        this.activeCursors.remove(Long.valueOf(pageSubscription.getId()));
        scheduleCleanup();
    }

    @Override // org.hornetq.core.paging.cursor.PageCursorProvider
    public void scheduleCleanup() {
        this.executor.execute(new Runnable() { // from class: org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl.1
            @Override // java.lang.Runnable
            public void run() {
                PageCursorProviderImpl.this.cleanup();
            }
        });
    }

    @Override // org.hornetq.core.paging.cursor.PageCursorProvider
    public void cleanup() {
        Page depage;
        ArrayList arrayList = new ArrayList();
        this.pagingStore.lock();
        synchronized (this) {
            try {
                try {
                    if (this.pagingStore.isStarted()) {
                        if (this.pagingStore.getNumberOfPages() == 0) {
                            this.pagingStore.unlock();
                            return;
                        }
                        ArrayList arrayList2 = new ArrayList();
                        arrayList2.addAll(this.activeCursors.values());
                        long checkMinPage = checkMinPage(arrayList2);
                        if (checkMinPage == this.pagingStore.getCurrentWritingPage() && this.pagingStore.getCurrentPage().getNumberOfMessages() > 0) {
                            boolean z = true;
                            Iterator it = arrayList2.iterator();
                            while (true) {
                                if (it.hasNext()) {
                                    if (!((PageSubscription) it.next()).isComplete(checkMinPage)) {
                                        z = false;
                                        break;
                                    }
                                } else {
                                    break;
                                }
                            }
                            if (z) {
                                log.info("Address " + ((Object) this.pagingStore.getAddress()) + " is leaving page mode as all messages are consumed and acknowledged from the page store");
                                this.pagingStore.forceAnotherPage();
                                Page currentPage = this.pagingStore.getCurrentPage();
                                try {
                                    Iterator it2 = arrayList2.iterator();
                                    while (it2.hasNext()) {
                                        ((PageSubscription) it2.next()).confirmPosition(new PagePositionImpl(currentPage.getPageId(), -1));
                                    }
                                    this.storageManager.waitOnOperations();
                                    Iterator it3 = arrayList2.iterator();
                                    while (it3.hasNext()) {
                                        ((PageSubscription) it3.next()).enableAutoCleanup();
                                    }
                                    this.pagingStore.stopPaging();
                                    Iterator it4 = arrayList2.iterator();
                                    while (it4.hasNext()) {
                                        ((PageSubscription) it4.next()).scheduleCleanupCheck();
                                    }
                                } catch (Throwable th) {
                                    Iterator it5 = arrayList2.iterator();
                                    while (it5.hasNext()) {
                                        ((PageSubscription) it5.next()).enableAutoCleanup();
                                    }
                                    throw th;
                                }
                            }
                        }
                        for (long firstPage = this.pagingStore.getFirstPage(); firstPage < checkMinPage && (depage = this.pagingStore.depage()) != null; firstPage++) {
                            arrayList.add(depage);
                        }
                        if (this.pagingStore.getNumberOfPages() == 0 || (this.pagingStore.getNumberOfPages() == 1 && this.pagingStore.getCurrentPage().getNumberOfMessages() == 0)) {
                            this.pagingStore.stopPaging();
                        }
                        this.pagingStore.unlock();
                        try {
                            Iterator it6 = arrayList.iterator();
                            while (it6.hasNext()) {
                                ((Page) it6.next()).delete();
                                synchronized (this.softCache) {
                                    this.softCache.remove((Object) Long.valueOf(r0.getPageId()));
                                }
                            }
                        } catch (Exception e) {
                            log.warn("Couldn't complete cleanup on paging", e);
                        }
                    }
                } finally {
                    this.pagingStore.unlock();
                }
            } catch (Exception e2) {
                log.warn("Couldn't complete cleanup on paging", e2);
                this.pagingStore.unlock();
            }
        }
    }

    @Override // org.hornetq.core.paging.cursor.PageCursorProvider
    public void printDebug() {
        System.out.println("Debug information for PageCursorProviderImpl:");
        Iterator<PageCache> it = this.softCache.values().iterator();
        while (it.hasNext()) {
            System.out.println("Cache " + it.next());
        }
    }

    protected PageCacheImpl createPageCache(long j) throws Exception {
        return new PageCacheImpl(this.pagingStore.createPage((int) j));
    }

    private long checkMinPage(List<PageSubscription> list) {
        long j = Long.MAX_VALUE;
        Iterator<PageSubscription> it = list.iterator();
        while (it.hasNext()) {
            long firstPage = it.next().getFirstPage();
            if (firstPage < j) {
                j = firstPage;
            }
        }
        return j;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v12, types: [org.hornetq.core.paging.cursor.PageCache] */
    private PageCache getPageCache(long j) {
        try {
            boolean z = false;
            synchronized (this.softCache) {
                if (j > this.pagingStore.getCurrentWritingPage()) {
                    return null;
                }
                PageCacheImpl pageCacheImpl = this.softCache.get((Object) Long.valueOf(j));
                if (pageCacheImpl == null) {
                    pageCacheImpl = createPageCache(j);
                    z = true;
                    pageCacheImpl.lock();
                    this.softCache.put((SoftValueHashMap<Long, PageCache>) Long.valueOf(j), (Long) pageCacheImpl);
                }
                if (z) {
                    Page page = null;
                    try {
                        page = this.pagingStore.createPage((int) j);
                        page.open();
                        List<PagedMessage> read = page.read();
                        Iterator<PagedMessage> it = read.iterator();
                        while (it.hasNext()) {
                            it.next().initMessage(this.storageManager);
                        }
                        pageCacheImpl.setMessages((PagedMessage[]) read.toArray(new PagedMessage[read.size()]));
                        if (page != null) {
                            try {
                                page.close();
                            } catch (Throwable th) {
                            }
                        }
                        pageCacheImpl.unlock();
                    } catch (Throwable th2) {
                        if (page != null) {
                            try {
                                page.close();
                            } catch (Throwable th3) {
                                pageCacheImpl.unlock();
                                throw th2;
                            }
                        }
                        pageCacheImpl.unlock();
                        throw th2;
                    }
                }
                return pageCacheImpl;
            }
        } catch (Exception e) {
            throw new RuntimeException("Couldn't complete paging due to an IO Exception on Paging - " + e.getMessage(), e);
        }
    }
}
