package org.jboss.cache.loader;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hsqldb.Tokens;
import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.Modification;
import org.jboss.cache.config.CacheLoaderConfig;
import org.jboss.cache.util.Immutables;

/* loaded from: input_file:APP-INF/lib/jbosscache-core-3.2.7.GA.jar:org/jboss/cache/loader/AsyncCacheLoader.class */
public class AsyncCacheLoader extends AbstractDelegatingCacheLoader {
    private static final Log log = LogFactory.getLog(AsyncCacheLoader.class);
    private static final boolean trace = log.isTraceEnabled();
    private static AtomicInteger threadId = new AtomicInteger(0);
    private static final int DEFAULT_QUEUE_SIZE = 10000;
    private AsyncCacheLoaderConfig config;
    private ExecutorService executor;
    private AtomicBoolean stopped;
    private BlockingQueue<Modification> queue;
    private List<Future> processorFutures;

    /* loaded from: input_file:APP-INF/lib/jbosscache-core-3.2.7.GA.jar:org/jboss/cache/loader/AsyncCacheLoader$AsyncProcessor.class */
    private class AsyncProcessor implements Runnable {
        private final List<Modification> mods;

        private AsyncProcessor() {
            this.mods = new ArrayList(AsyncCacheLoader.this.config.getBatchSize());
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    run0();
                } catch (InterruptedException e) {
                }
            }
            try {
                if (AsyncCacheLoader.trace) {
                    AsyncCacheLoader.log.trace("process remaining batch " + this.mods.size());
                }
                put(this.mods);
                if (AsyncCacheLoader.trace) {
                    AsyncCacheLoader.log.trace("process remaining queued " + AsyncCacheLoader.this.queue.size());
                }
                while (!AsyncCacheLoader.this.queue.isEmpty()) {
                    run0();
                }
            } catch (InterruptedException e2) {
                AsyncCacheLoader.log.trace("remaining interrupted");
            }
        }

        private void run0() throws InterruptedException {
            AsyncCacheLoader.log.trace("Checking for modifications");
            if (AsyncCacheLoader.this.queue.drainTo(this.mods, AsyncCacheLoader.this.config.getBatchSize()) == 0) {
                this.mods.add((Modification) AsyncCacheLoader.this.queue.take());
            }
            if (AsyncCacheLoader.trace) {
                AsyncCacheLoader.log.trace("Calling put(List) with " + this.mods.size() + " modifications");
            }
            put(this.mods);
            this.mods.clear();
        }

        private void put(List<Modification> list) {
            try {
                AsyncCacheLoader.super.put(list);
            } catch (Exception e) {
                if (AsyncCacheLoader.log.isWarnEnabled()) {
                    AsyncCacheLoader.log.warn("Failed to process async modifications: " + e);
                }
                if (AsyncCacheLoader.log.isDebugEnabled()) {
                    AsyncCacheLoader.log.debug("Exception: ", e);
                }
            }
        }
    }

    public AsyncCacheLoader() {
        super(null);
        this.stopped = new AtomicBoolean(true);
        this.queue = new ArrayBlockingQueue(10000);
    }

    public AsyncCacheLoader(CacheLoader cacheLoader) {
        super(cacheLoader);
        this.stopped = new AtomicBoolean(true);
        this.queue = new ArrayBlockingQueue(10000);
    }

    @Override // org.jboss.cache.loader.AbstractDelegatingCacheLoader, org.jboss.cache.loader.CacheLoader
    public void setConfig(CacheLoaderConfig.IndividualCacheLoaderConfig individualCacheLoaderConfig) {
        if (individualCacheLoaderConfig instanceof AsyncCacheLoaderConfig) {
            this.config = (AsyncCacheLoaderConfig) individualCacheLoaderConfig;
        } else {
            this.config = new AsyncCacheLoaderConfig(individualCacheLoaderConfig);
        }
        if (this.config.getQueueSize() > 0) {
            this.queue = new ArrayBlockingQueue(this.config.getQueueSize());
        }
        super.setConfig(individualCacheLoaderConfig);
    }

    @Override // org.jboss.cache.loader.AbstractDelegatingCacheLoader, org.jboss.cache.loader.CacheLoader
    public Map get(Fqn fqn) throws Exception {
        try {
            return super.get(fqn);
        } catch (IOException e) {
            log.trace(e);
            return new HashMap();
        }
    }

    Object get(Fqn fqn, Object obj) throws Exception {
        if (!this.config.getReturnOld()) {
            return null;
        }
        try {
            Map map = super.get(fqn);
            if (map != null) {
                return map.get(obj);
            }
            return null;
        } catch (IOException e) {
            log.trace(e);
            return null;
        }
    }

    @Override // org.jboss.cache.loader.AbstractDelegatingCacheLoader, org.jboss.cache.loader.AbstractCacheLoader, org.jboss.cache.loader.CacheLoader
    public void prepare(Object obj, List<Modification> list, boolean z) throws Exception {
        if (z) {
            put(list);
        } else {
            this.transactions.put(obj, list);
        }
    }

    @Override // org.jboss.cache.loader.AbstractDelegatingCacheLoader, org.jboss.cache.loader.AbstractCacheLoader, org.jboss.cache.loader.CacheLoader
    public void commit(Object obj) throws Exception {
        List<Modification> remove = this.transactions.remove(obj);
        if (remove == null) {
            throw new Exception("transaction " + obj + " not found in transaction table");
        }
        put(remove);
    }

    @Override // org.jboss.cache.loader.AbstractDelegatingCacheLoader, org.jboss.cache.loader.AbstractCacheLoader, org.jboss.cache.loader.CacheLoader
    public void rollback(Object obj) {
        this.transactions.remove(obj);
    }

    @Override // org.jboss.cache.loader.AbstractDelegatingCacheLoader, org.jboss.cache.loader.CacheLoader
    public Object put(Fqn fqn, Object obj, Object obj2) throws Exception {
        if (!this.config.getUseAsyncPut()) {
            return super.put(fqn, obj, obj2);
        }
        Object obj3 = get(fqn, obj);
        enqueue(new Modification(Modification.ModificationType.PUT_KEY_VALUE, fqn, obj, obj2));
        return obj3;
    }

    @Override // org.jboss.cache.loader.AbstractDelegatingCacheLoader, org.jboss.cache.loader.CacheLoader
    public void put(Fqn fqn, Map map) throws Exception {
        if (!this.config.getUseAsyncPut()) {
            super.put(fqn, map);
        } else {
            enqueue(new Modification(Modification.ModificationType.PUT_DATA, fqn, map == null ? null : Immutables.immutableMapCopy(map)));
        }
    }

    @Override // org.jboss.cache.loader.AbstractDelegatingCacheLoader, org.jboss.cache.loader.AbstractCacheLoader, org.jboss.cache.loader.CacheLoader
    public void put(List<Modification> list) throws Exception {
        if (!this.config.getUseAsyncPut()) {
            super.put(list);
            return;
        }
        Iterator<Modification> it = list.iterator();
        while (it.hasNext()) {
            enqueue(it.next());
        }
    }

    @Override // org.jboss.cache.loader.AbstractDelegatingCacheLoader, org.jboss.cache.loader.CacheLoader
    public Object remove(Fqn fqn, Object obj) throws Exception {
        Object obj2 = get(fqn, obj);
        enqueue(new Modification(Modification.ModificationType.REMOVE_KEY_VALUE, fqn, obj));
        return obj2;
    }

    @Override // org.jboss.cache.loader.AbstractDelegatingCacheLoader, org.jboss.cache.loader.CacheLoader
    public void remove(Fqn fqn) throws Exception {
        enqueue(new Modification(Modification.ModificationType.REMOVE_NODE, fqn));
    }

    @Override // org.jboss.cache.loader.AbstractDelegatingCacheLoader, org.jboss.cache.loader.CacheLoader
    public void removeData(Fqn fqn) throws Exception {
        enqueue(new Modification(Modification.ModificationType.REMOVE_DATA, fqn));
    }

    @Override // org.jboss.cache.loader.AbstractDelegatingCacheLoader, org.jboss.cache.loader.AbstractCacheLoader, org.jboss.cache.Lifecycle
    public void start() throws Exception {
        if (log.isInfoEnabled()) {
            log.info("Async cache loader starting: " + this);
        }
        this.stopped.set(false);
        super.start();
        this.executor = Executors.newFixedThreadPool(this.config.getThreadPoolSize(), new ThreadFactory() { // from class: org.jboss.cache.loader.AsyncCacheLoader.1
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable, "AsyncCacheLoader-" + AsyncCacheLoader.threadId.getAndIncrement());
                thread.setDaemon(true);
                return thread;
            }
        });
        this.processorFutures = new ArrayList(this.config.getThreadPoolSize());
        for (int i = 0; i < this.config.getThreadPoolSize(); i++) {
            this.processorFutures.add(this.executor.submit(new AsyncProcessor()));
        }
    }

    @Override // org.jboss.cache.loader.AbstractDelegatingCacheLoader, org.jboss.cache.loader.AbstractCacheLoader, org.jboss.cache.Lifecycle
    public void stop() {
        this.stopped.set(true);
        if (this.executor != null) {
            Iterator<Future> it = this.processorFutures.iterator();
            while (it.hasNext()) {
                it.next().cancel(true);
            }
            this.executor.shutdown();
            try {
                boolean isTerminated = this.executor.isTerminated();
                while (!isTerminated) {
                    isTerminated = this.executor.awaitTermination(60L, TimeUnit.SECONDS);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        this.executor = null;
        super.stop();
    }

    private void enqueue(Modification modification) throws CacheException, InterruptedException {
        if (this.stopped.get()) {
            throw new CacheException("AsyncCacheLoader stopped; no longer accepting more entries.");
        }
        if (trace) {
            log.trace("Enqueuing modification " + modification);
        }
        this.queue.put(modification);
    }

    public String toString() {
        return super.toString() + " delegate=[" + super.getCacheLoader() + Tokens.T_RIGHTBRACKET + " stopped=" + this.stopped + " batchSize=" + this.config.getBatchSize() + " returnOld=" + this.config.getReturnOld() + " asyncPut=" + this.config.getUseAsyncPut() + " threadPoolSize=" + this.config.getThreadPoolSize() + " queue.remainingCapacity()=" + this.queue.remainingCapacity() + " queue.peek()=" + this.queue.peek();
    }
}
