package com.viber.bot.middleware;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.viber.bot.Request;
import com.viber.bot.ViberEnvironmentConfiguration;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:com/viber/bot/middleware/PubSubMiddleware.class */
class PubSubMiddleware extends AbstractScheduledService implements Middleware {
    private static final Logger logger = LoggerFactory.getLogger(PubSubMiddleware.class);
    private static final int CYCLE_MINUTES = 1;
    private static final boolean SHOULD_RECOVER = true;
    private final RequestReceiver requestReceiver;
    private final ExecutorService pollingThreadPool = Executors.newSingleThreadExecutor();
    private final ListeningExecutorService ioThreadPool = ViberEnvironmentConfiguration.getExecutorService();
    private final TransferQueue<Request> transferQueue = new LinkedTransferQueue();
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    private final Stats stats = new Stats();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/viber/bot/middleware/PubSubMiddleware$Stats.class */
    public static class Stats {
        private final AtomicInteger requestsPerCycle;
        private final AtomicInteger exceptionsPerCycle;
        private final AtomicDouble requestsLatency;

        private Stats() {
            this.requestsPerCycle = new AtomicInteger(0);
            this.exceptionsPerCycle = new AtomicInteger(0);
            this.requestsLatency = new AtomicDouble(0.0d);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void reset() {
            this.requestsPerCycle.set(0);
            this.exceptionsPerCycle.set(0);
            this.requestsLatency.set(0.0d);
        }

        int getRequestsPerCycle() {
            return this.requestsPerCycle.get();
        }

        int getExceptionsPerCycle() {
            return this.exceptionsPerCycle.get();
        }

        double getRequestsLatency() {
            return this.requestsLatency.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PubSubMiddleware(@Nonnull RequestReceiver requestReceiver) {
        this.requestReceiver = (RequestReceiver) Preconditions.checkNotNull(requestReceiver);
        startPolling();
        startAsync();
    }

    @Override // com.viber.bot.middleware.Middleware
    public ListenableFuture<InputStream> incoming(@Nonnull Request request) {
        try {
            this.transferQueue.transfer(request);
        } catch (InterruptedException e) {
            Throwables.throwIfUnchecked(e);
        }
        return this.ioThreadPool.submit(() -> {
            waitOnRequestToFinish(request);
            return request.getResponseInputStream();
        });
    }

    private void startPolling() {
        Preconditions.checkState(this.isRunning.compareAndSet(false, true), "already running");
        try {
            logger.info("Started polling from queue..");
            this.pollingThreadPool.execute(() -> {
                requestLoop();
            });
        } catch (Exception e) {
            logger.error("Exception from request receiver, restarting polling", e);
            restartPolling();
        }
    }

    private void restartPolling() {
        Preconditions.checkState(this.isRunning.compareAndSet(true, false), "not running");
        Preconditions.checkState(!this.pollingThreadPool.isTerminated(), "executor is already terminated, cannot reset");
        Preconditions.checkState(true, "could not restart polling, not allowed to recover");
        startPolling();
    }

    private void requestLoop() {
        Stopwatch createStarted = Stopwatch.createStarted();
        while (this.isRunning.get()) {
            createStarted.reset().start();
            acceptNextRequest();
            this.stats.requestsLatency.addAndGet(createStarted.elapsed(TimeUnit.MILLISECONDS));
        }
    }

    private void acceptNextRequest() {
        try {
            Request take = this.transferQueue.take();
            Throwable th = null;
            try {
                this.requestReceiver.acceptRequest(take);
                this.stats.requestsPerCycle.incrementAndGet();
                if (take != null) {
                    if (0 != 0) {
                        try {
                            take.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        take.close();
                    }
                }
            } finally {
            }
        } catch (Exception e) {
            logger.error("Exception during event loop", e);
            this.stats.exceptionsPerCycle.incrementAndGet();
        }
    }

    private void waitOnRequestToFinish(@Nonnull Request request) {
        synchronized (request) {
            try {
                if (!request.isClosed()) {
                    request.wait();
                }
            } catch (InterruptedException e) {
                logger.error("Interrupted during request", e);
            }
        }
    }

    protected void runOneIteration() throws Exception {
        logger.debug("Requests: {}, Exceptions: {}, Avg latency: {}s, Queue size: {}", new Object[]{Integer.valueOf(this.stats.getRequestsPerCycle()), Integer.valueOf(this.stats.getExceptionsPerCycle()), Long.valueOf(Math.round((this.stats.getRequestsLatency() / Math.max(this.stats.getRequestsPerCycle(), 1)) / 1000.0d)), Integer.valueOf(this.transferQueue.size())});
        this.stats.reset();
    }

    protected AbstractScheduledService.Scheduler scheduler() {
        return AbstractScheduledService.Scheduler.newFixedRateSchedule(0L, 1L, TimeUnit.MINUTES);
    }
}
