/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.faulttolerance.core.bulkhead;

import io.smallrye.faulttolerance.core.FaultToleranceStrategy;
import io.smallrye.faulttolerance.core.InvocationContext;
import io.smallrye.faulttolerance.core.bulkhead.BulkheadBase;
import io.smallrye.faulttolerance.core.bulkhead.BulkheadEvents;
import io.smallrye.faulttolerance.core.bulkhead.BulkheadLogger;
import io.smallrye.faulttolerance.core.util.CompletionStages;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;

/**
 * Bulkhead for invocations whose result is a {@link CompletionStage}.
 *
 * <p>Two semaphores govern admission:
 * <ul>
 * <li>the <em>capacity</em> semaphore ({@code size + queueSize} permits) decides whether an
 * invocation is accepted into the bulkhead at all;</li>
 * <li>the <em>work</em> semaphore ({@code size} permits) decides whether an accepted
 * invocation may start running now or has to wait in the queue.</li>
 * </ul>
 * Both permits of a task are handed back when the stage returned by the delegate completes
 * (or when the delegate call itself throws an {@link Exception}); the finishing task then
 * tries to start one queued task.
 *
 * @param <V> the value type of the guarded completion stage
 */
public class CompletionStageThreadPoolBulkhead<V> extends BulkheadBase<CompletionStage<V>> {
    /** Accepted tasks that have not started running yet, in arrival order. */
    private final Deque<CompletionStageBulkheadTask> queue = new ConcurrentLinkedDeque<>();
    /** One permit per task inside the bulkhead (running or queued). */
    private final Semaphore capacitySemaphore;
    /** One permit per task that is currently running. */
    private final Semaphore workSemaphore;

    /**
     * @param delegate the strategy that is invoked once a task is allowed to run
     * @param description passed through to {@code BulkheadBase}
     * @param size number of permits of the work semaphore
     * @param queueSize additional permits of the capacity semaphore on top of {@code size}
     */
    public CompletionStageThreadPoolBulkhead(FaultToleranceStrategy<CompletionStage<V>> delegate, String description, int size, int queueSize) {
        super(description, delegate);
        this.capacitySemaphore = new Semaphore(size + queueSize, true);
        this.workSemaphore = new Semaphore(size, true);
    }

    /**
     * Submits the invocation to the bulkhead.
     *
     * @param ctx the invocation context, also used for firing bulkhead events
     * @return a stage that mirrors the delegate's outcome if the invocation was accepted,
     *         or a stage failed with {@code bulkheadRejected()} if it was not
     */
    @Override
    public CompletionStage<V> apply(InvocationContext<CompletionStage<V>> ctx) {
        BulkheadLogger.LOG.trace("CompletionStageBulkhead started");
        try {
            return this.doApply(ctx);
        }
        finally {
            BulkheadLogger.LOG.trace("CompletionStageBulkhead finished");
        }
    }

    /**
     * Accepts the invocation if a capacity permit is available, otherwise rejects it.
     * An accepted invocation is always enqueued first and then started through
     * {@link #runQueuedTask()}, so that earlier queued tasks are served before it.
     */
    private CompletionStage<V> doApply(InvocationContext<CompletionStage<V>> ctx) {
        if (!this.capacitySemaphore.tryAcquire()) {
            BulkheadLogger.LOG.trace("Capacity semaphore not acquired, rejecting task from bulkhead");
            ctx.fireEvent(BulkheadEvents.DecisionMade.REJECTED);
            return CompletionStages.failedStage(this.bulkheadRejected());
        }

        BulkheadLogger.LOG.trace("Capacity semaphore acquired, accepting task into bulkhead");
        ctx.fireEvent(BulkheadEvents.DecisionMade.ACCEPTED);
        ctx.fireEvent(BulkheadEvents.StartedWaiting.INSTANCE);
        CompletionStageBulkheadTask task = new CompletionStageBulkheadTask(ctx);
        this.queue.addLast(task);
        this.runQueuedTask();
        return task.result;
    }

    /**
     * Starts at most one queued task, provided a work permit is available.
     *
     * <p>The work permit is acquired <em>before</em> a task is removed from the queue. Taking
     * the task out first and putting it back when no permit is available would open a window
     * in which the task is invisible: a concurrently finishing task could release its permit,
     * find the queue empty and return, leaving the re-queued task stranded although a permit
     * is free. With this ordering a task leaves the queue only when it is about to run.
     */
    private void runQueuedTask() {
        while (!this.queue.isEmpty()) {
            if (!this.workSemaphore.tryAcquire()) {
                // Every permit is held by some other thread. Whoever holds it re-checks the
                // queue after releasing, so the waiting task will be picked up then.
                BulkheadLogger.LOG.trace("Work semaphore not acquired, task stays in queue");
                return;
            }

            CompletionStageBulkheadTask queuedTask = this.queue.pollFirst();
            if (queuedTask != null) {
                BulkheadLogger.LOG.trace("Work semaphore acquired, running task");
                queuedTask.run();
                return;
            }

            // Another thread took the task between the emptiness check and the poll.
            // Hand the permit back and look again: a task enqueued while we held the permit
            // may have failed its own tryAcquire and is now relying on this re-check.
            this.workSemaphore.release();
        }
    }

    /**
     * @return the current number of accepted tasks that have not started running
     */
    int getQueueSize() {
        return this.queue.size();
    }

    /**
     * One accepted invocation: the context to run and the future handed out to the caller.
     */
    private class CompletionStageBulkheadTask {
        private final CompletableFuture<V> result = new CompletableFuture<>();
        private final InvocationContext<CompletionStage<V>> ctx;

        private CompletionStageBulkheadTask(InvocationContext<CompletionStage<V>> ctx) {
            this.ctx = ctx;
        }

        /**
         * Invokes the delegate and wires its outcome into {@link #result}.
         * Must only be called by a thread that holds a work permit on behalf of this task.
         */
        public void run() {
            this.ctx.fireEvent(BulkheadEvents.FinishedWaiting.INSTANCE);
            this.ctx.fireEvent(BulkheadEvents.StartedRunning.INSTANCE);
            try {
                CompletionStage<V> rawResult = CompletionStageThreadPoolBulkhead.this.delegate.apply(this.ctx);
                rawResult.whenComplete(this::finish);
            }
            catch (Exception e) {
                // The delegate threw instead of returning a stage (or returned null):
                // treat it exactly like a stage that completed exceptionally.
                this.finish(null, e);
            }
        }

        /**
         * Leaves the bulkhead: returns both permits, publishes the outcome to the caller
         * and gives one queued task the chance to start.
         */
        private void finish(V value, Throwable error) {
            this.releaseSemaphores();
            this.ctx.fireEvent(BulkheadEvents.FinishedRunning.INSTANCE);
            if (error != null) {
                this.result.completeExceptionally(error);
            } else {
                this.result.complete(value);
            }
            CompletionStageThreadPoolBulkhead.this.runQueuedTask();
        }

        /** Returns the work permit first, then the capacity permit. */
        private void releaseSemaphores() {
            CompletionStageThreadPoolBulkhead.this.workSemaphore.release();
            BulkheadLogger.LOG.trace("Work semaphore released, task finished");
            CompletionStageThreadPoolBulkhead.this.capacitySemaphore.release();
            BulkheadLogger.LOG.trace("Capacity semaphore released, task leaving bulkhead");
        }
    }
}

