/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.executors;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class SemaphoreCompletionService<T>
implements CompletionService<T> {
    private static final Log log = LogFactory.getLog(SemaphoreCompletionService.class);
    private static final boolean trace = log.isTraceEnabled();
    private final Executor executor;
    private final CustomSemaphore semaphore;
    private final BlockingQueue<QueueingTask> queue = new LinkedBlockingQueue<QueueingTask>();
    private final BlockingQueue<QueueingTask> completionQueue = new LinkedBlockingQueue<QueueingTask>();

    public SemaphoreCompletionService(Executor executor, int maxConcurrentTasks) {
        this.executor = executor;
        this.semaphore = new CustomSemaphore(maxConcurrentTasks);
    }

    public List<? extends Future<T>> drainCompletionQueue() {
        ArrayList list = new ArrayList();
        this.completionQueue.drainTo(list);
        return list;
    }

    public void cancelQueuedTasks() {
        ArrayList queuedTasks = new ArrayList();
        this.queue.drainTo(queuedTasks);
        for (QueueingTask task : queuedTasks) {
            task.cancel(false);
        }
    }

    public void continueTaskInBackground() {
        if (trace) {
            log.tracef("Moving task to background, available permits %d", (Object)this.semaphore.availablePermits());
        }
        this.semaphore.removePermit();
    }

    public Future<T> backgroundTaskFinished(Callable<T> cleanupTask) {
        QueueingTask futureTask = null;
        if (cleanupTask != null) {
            if (trace) {
                log.tracef("Background task finished, executing cleanup task", new Object[0]);
            }
            futureTask = new QueueingTask(cleanupTask);
            this.executor.execute(futureTask);
        } else {
            this.semaphore.release();
            if (trace) {
                log.tracef("Background task finished, available permits %d", (Object)this.semaphore.availablePermits());
            }
            this.executeFront();
        }
        return futureTask;
    }

    @Override
    public Future<T> submit(Callable<T> task) {
        QueueingTask futureTask = new QueueingTask(task);
        this.queue.add(futureTask);
        if (trace) {
            log.tracef("New task submitted, tasks in queue %d, available permits %d", (Object)this.queue.size(), (Object)this.semaphore.availablePermits());
        }
        this.executeFront();
        return futureTask;
    }

    @Override
    public Future<T> submit(Runnable task, T result) {
        QueueingTask futureTask = new QueueingTask(task, result);
        this.queue.add(futureTask);
        if (trace) {
            log.tracef("New task submitted, tasks in queue %d, available permits %d", (Object)this.queue.size(), (Object)this.semaphore.availablePermits());
        }
        this.executeFront();
        return futureTask;
    }

    private void executeFront() {
        while (!this.queue.isEmpty() && this.semaphore.tryAcquire()) {
            QueueingTask next = (QueueingTask)this.queue.poll();
            if (next != null) {
                this.executor.execute(next);
                return;
            }
            this.semaphore.release();
        }
    }

    @Override
    public Future<T> take() throws InterruptedException {
        return this.completionQueue.take();
    }

    @Override
    public Future<T> poll() {
        return (Future)this.completionQueue.poll();
    }

    @Override
    public Future<T> poll(long timeout, TimeUnit unit) throws InterruptedException {
        return this.completionQueue.poll(timeout, unit);
    }

    private static class CustomSemaphore
    extends Semaphore {
        public CustomSemaphore(int permits) {
            super(permits);
        }

        protected void removePermit() {
            super.reducePermits(1);
        }
    }

    private class QueueingTask
    extends FutureTask<T> {
        public QueueingTask(Callable<T> task) {
            super(task);
        }

        public QueueingTask(Runnable runnable, Object result) {
            super(runnable, result);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                QueueingTask next = this;
                do {
                    next.runInternal();
                    if (SemaphoreCompletionService.this.semaphore.availablePermits() >= 0) continue;
                    break;
                } while ((next = (QueueingTask)SemaphoreCompletionService.this.queue.poll()) != null);
            }
            finally {
                SemaphoreCompletionService.this.semaphore.release();
                if (trace) {
                    log.tracef("All queued tasks finished, available permits %d", (Object)SemaphoreCompletionService.this.semaphore.availablePermits());
                }
                if (!SemaphoreCompletionService.this.queue.isEmpty()) {
                    SemaphoreCompletionService.this.executeFront();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void runInternal() {
            try {
                if (trace) {
                    log.tracef("Task started, tasks in queue %d, available permits %d", (Object)SemaphoreCompletionService.this.queue.size(), (Object)SemaphoreCompletionService.this.semaphore.availablePermits());
                }
                super.run();
            }
            finally {
                SemaphoreCompletionService.this.completionQueue.offer(this);
                if (trace) {
                    log.tracef("Task finished, tasks in queue %d, available permits %d", (Object)SemaphoreCompletionService.this.queue.size(), (Object)SemaphoreCompletionService.this.semaphore.availablePermits());
                }
            }
        }
    }
}

