/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.persistence;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.persistence.ProducerQueues;
import com.hivemq.persistence.SingleWriterService;
import com.hivemq.persistence.SingleWriterServiceImpl;
import com.hivemq.persistence.local.xodus.bucket.BucketUtils;
import com.hivemq.util.ThreadFactoryUtil;
import java.util.List;
import java.util.Queue;
import java.util.SplittableRandom;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class ProducerQueuesImpl
implements ProducerQueues {
    private final AtomicLong taskCount = new AtomicLong(0L);
    private final int amountOfQueues;
    @VisibleForTesting
    final int bucketsPerQueue;
    @VisibleForTesting
    @NotNull
    final ImmutableList<Queue<TaskWithFuture<?>>> queues;
    @NotNull
    private final ImmutableList<AtomicBoolean> locks;
    @NotNull
    private final ImmutableList<AtomicLong> queueTaskCounter;
    @NotNull
    private final SingleWriterServiceImpl singleWriterServiceImpl;
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    @Nullable
    private ListenableFuture<Void> closeFuture;
    private long shutdownStartTime = Long.MAX_VALUE;

    public ProducerQueuesImpl(SingleWriterServiceImpl singleWriterServiceImpl, int amountOfQueues) {
        this.singleWriterServiceImpl = singleWriterServiceImpl;
        int bucketCount = singleWriterServiceImpl.getPersistenceBucketCount();
        this.amountOfQueues = amountOfQueues;
        this.bucketsPerQueue = bucketCount / amountOfQueues;
        ImmutableList.Builder queuesBuilder = ImmutableList.builder();
        for (int i = 0; i < amountOfQueues; ++i) {
            queuesBuilder.add(new ConcurrentLinkedQueue());
        }
        this.queues = queuesBuilder.build();
        ImmutableList.Builder locksBuilder = ImmutableList.builder();
        ImmutableList.Builder counterBuilder = ImmutableList.builder();
        for (int i = 0; i < amountOfQueues; ++i) {
            locksBuilder.add((Object)new AtomicBoolean());
            counterBuilder.add((Object)new AtomicLong(0L));
        }
        this.locks = locksBuilder.build();
        this.queueTaskCounter = counterBuilder.build();
    }

    @Override
    @NotNull
    public <R> ListenableFuture<R> submit(@NotNull String key, @NotNull SingleWriterService.Task<R> task) {
        return this.submitInternal(this.getBucket(key), task, false);
    }

    @Override
    @NotNull
    public <R> ListenableFuture<R> submit(int bucketIndex, @NotNull SingleWriterService.Task<R> task) {
        return this.submitInternal(bucketIndex, task, false);
    }

    @Nullable
    public <R> ListenableFuture<R> submitInternal(int bucketIndex, @NotNull SingleWriterService.Task<R> task, boolean ignoreShutdown) {
        if (!ignoreShutdown && this.shutdown.get() && System.currentTimeMillis() - this.shutdownStartTime > this.singleWriterServiceImpl.getShutdownGracePeriod()) {
            return SettableFuture.create();
        }
        int queueIndex = bucketIndex / this.bucketsPerQueue;
        Queue queue = (Queue)this.queues.get(queueIndex);
        SettableFuture resultFuture = SettableFuture.create();
        queue.add(new TaskWithFuture(resultFuture, task, bucketIndex));
        this.taskCount.incrementAndGet();
        this.singleWriterServiceImpl.getGlobalTaskCount().incrementAndGet();
        if (((AtomicLong)this.queueTaskCounter.get(queueIndex)).getAndIncrement() == 0L) {
            this.singleWriterServiceImpl.incrementNonemptyQueueCounter();
        }
        return resultFuture;
    }

    @NotNull
    public <R> List<ListenableFuture<R>> submitToAllBuckets(@NotNull SingleWriterService.Task<R> task, boolean parallel) {
        if (parallel) {
            return this.submitToAllBucketsParallel(task, false);
        }
        return this.submitToAllBucketsSequential(task);
    }

    @Override
    @NotNull
    public <R> List<ListenableFuture<R>> submitToAllBucketsParallel(@NotNull SingleWriterService.Task<R> task) {
        return this.submitToAllBucketsParallel(task, false);
    }

    @NotNull
    private <R> List<ListenableFuture<R>> submitToAllBucketsParallel(@NotNull SingleWriterService.Task<R> task, boolean ignoreShutdown) {
        ImmutableList.Builder builder = ImmutableList.builder();
        int bucketCount = this.singleWriterServiceImpl.getPersistenceBucketCount();
        for (int bucket = 0; bucket < bucketCount; ++bucket) {
            builder.add(this.submitInternal(bucket, task, ignoreShutdown));
        }
        return builder.build();
    }

    @Override
    @NotNull
    public <R> List<ListenableFuture<R>> submitToAllBucketsSequential(@NotNull SingleWriterService.Task<R> task) {
        ImmutableList.Builder builder = ImmutableList.builder();
        int bucketCount = this.singleWriterServiceImpl.getPersistenceBucketCount();
        ListenableFuture previousFuture = Futures.immediateFuture(null);
        int bucket = 0;
        while (bucket < bucketCount) {
            int finalBucket = bucket++;
            SettableFuture future = SettableFuture.create();
            previousFuture.addListener(() -> future.setFuture(this.submit(finalBucket, task)), MoreExecutors.directExecutor());
            previousFuture = future;
            builder.add((Object)future);
        }
        return builder.build();
    }

    @Override
    public int getBucket(@NotNull String key) {
        return BucketUtils.getBucket(key, this.singleWriterServiceImpl.getPersistenceBucketCount());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void execute(@NotNull SplittableRandom random) {
        int queueIndex = random.nextInt(this.amountOfQueues);
        if (((AtomicLong)this.queueTaskCounter.get(queueIndex)).get() == 0L) {
            return;
        }
        AtomicBoolean lock = (AtomicBoolean)this.locks.get(queueIndex);
        if (!lock.getAndSet(true)) {
            try {
                Queue queue = (Queue)this.queues.get(queueIndex);
                int creditCount = 0;
                while (creditCount < this.singleWriterServiceImpl.getCreditsPerExecution()) {
                    TaskWithFuture taskWithFuture = (TaskWithFuture)queue.poll();
                    if (taskWithFuture == null) {
                        return;
                    }
                    ++creditCount;
                    try {
                        Object result = taskWithFuture.getTask().doTask(taskWithFuture.getBucketIndex());
                        taskWithFuture.getFuture();
                        taskWithFuture.getFuture().set(result);
                    }
                    catch (Throwable e) {
                        taskWithFuture.getFuture();
                        taskWithFuture.getFuture().setException(e);
                    }
                    this.taskCount.decrementAndGet();
                    this.singleWriterServiceImpl.getGlobalTaskCount().decrementAndGet();
                    if (((AtomicLong)this.queueTaskCounter.get(queueIndex)).decrementAndGet() != 0L) continue;
                    this.singleWriterServiceImpl.decrementNonemptyQueueCounter();
                }
            }
            finally {
                lock.set(false);
            }
        }
    }

    @Override
    @NotNull
    public ListenableFuture<Void> shutdown(@Nullable SingleWriterService.Task<Void> finalTask) {
        if (this.shutdown.getAndSet(true)) {
            if (this.closeFuture != null) {
                return this.closeFuture;
            }
            return Futures.immediateFuture(null);
        }
        this.shutdownStartTime = System.currentTimeMillis();
        ThreadFactory threadFactory = ThreadFactoryUtil.create("persistence-shutdown-%d");
        final ListeningScheduledExecutorService executorService = MoreExecutors.listeningDecorator((ScheduledExecutorService)Executors.newSingleThreadScheduledExecutor(threadFactory));
        this.closeFuture = executorService.schedule(() -> {
            if (finalTask != null) {
                Futures.allAsList(this.submitToAllBucketsParallel(finalTask, true)).get();
            } else {
                Futures.allAsList(this.submitToAllBucketsParallel(bucketIndex -> null, true)).get();
            }
            return null;
        }, this.singleWriterServiceImpl.getShutdownGracePeriod() + 50L, TimeUnit.MILLISECONDS);
        Futures.addCallback(this.closeFuture, (FutureCallback)new FutureCallback<Void>(){

            public void onSuccess(@Nullable Void aVoid) {
                executorService.shutdown();
            }

            public void onFailure(@NotNull Throwable throwable) {
                executorService.shutdown();
            }
        }, (Executor)executorService);
        return this.closeFuture;
    }

    @NotNull
    public AtomicLong getTaskCount() {
        return this.taskCount;
    }

    @VisibleForTesting
    static class TaskWithFuture<T> {
        @NotNull
        private final SettableFuture<T> future;
        @NotNull
        private final SingleWriterService.Task task;
        private final int bucketIndex;

        private TaskWithFuture(@NotNull SettableFuture<T> future, @NotNull SingleWriterService.Task task, int bucketIndex) {
            this.future = future;
            this.task = task;
            this.bucketIndex = bucketIndex;
        }

        @NotNull
        public SettableFuture getFuture() {
            return this.future;
        }

        @NotNull
        public SingleWriterService.Task getTask() {
            return this.task;
        }

        public int getBucketIndex() {
            return this.bucketIndex;
        }
    }
}

