/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.amqp;

import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonReceiver;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.proton.message.Message;
import org.apache.spark.streaming.amqp.AMQPFlowController;
import org.apache.spark.streaming.amqp.AMQPFlowControllerListener;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001\u00055d\u0001B\u0001\u0003\r5\u0011\u0011$Q'R!\"\u0013\u0018i]=oG\u001acwn^\"p]R\u0014x\u000e\u001c7fe*\u00111\u0001B\u0001\u0005C6\f\bO\u0003\u0002\u0006\r\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003\u000f!\tQa\u001d9be.T!!\u0003\u0006\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005Y\u0011aA8sO\u000e\u00011c\u0001\u0001\u000f%A\u0011q\u0002E\u0007\u0002\u0005%\u0011\u0011C\u0001\u0002\u0013\u00036\u000b\u0006K\u00127po\u000e{g\u000e\u001e:pY2,'\u000f\u0005\u0002\u001415\tAC\u0003\u0002\u0016-\u0005!A.\u00198h\u0015\u00059\u0012\u0001\u00026bm\u0006L!!\u0007\u000b\u0003\u0011I+hN\\1cY\u0016D\u0001b\u0007\u0001\u0003\u0002\u0003\u0006I\u0001H\u0001\te\u0016\u001cW-\u001b<feB\u0011Q\u0004J\u0007\u0002=)\u0011q\u0004I\u0001\u0007aJ|Go\u001c8\u000b\u0005\u0005\u0012\u0013!\u0002<feRD(\"A\u0012\u0002\u0005%|\u0017BA\u0013\u001f\u00059\u0001&o\u001c;p]J+7-Z5wKJD\u0001b\n\u0001\u0003\u0002\u0003\u0006I\u0001K\u0001\tY&\u001cH/\u001a8feB\u0011q\"K\u0005\u0003U\t\u0011!$Q'R!\u001acwn^\"p]R\u0014x\u000e\u001c7fe2K7\u000f^3oKJD\u0001\u0002\f\u0001\u0003\u0002\u0003\u0006I!L\u0001\b[\u0006D(+\u0019;f!\t\u0019b&\u0003\u00020)\t!Aj\u001c8h\u0011\u0015\t\u0004\u0001\"\u00013\u0003\u0019a\u0014N\\5u}Q!1\u0007N\u001b7!\ty\u0001\u0001C\u0003\u001ca\u0001\u0007A\u0004C\u0003(a\u0001\u0007\u0001\u0006C\u0003-a\u0001\u0007Q\u0006C\u00049\u0001\t\u0007IQB\u001d\u0002/5Kgn\u0015;bE2,\u0017J\u001c;feZ\fG.T5de>\u001cX#\u0001\u001e\u0010\u0003m\u0002\u0003\u0002Q-\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0005\u0007{\u0001\u0001\u000bQ\u0002\u001e\u000215Kgn\u0015;bE2,\u0017J\u001c;feZ\fG.T5de>\u001c\b\u0005C\u0004@\u0001\u0001\u0007I\u0011\u0001!\u0002\u000bE,X-^3\u0016\u0003\u0005\u00032AQ$J\u001b\u0005\u0019%B\u0001#F\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0003\rZ\tA!\u001e;jY&\u0011\u0001j\u0011\u0002\u000e\u00052|7m[5oOF+X-^3\u0011\t)kuJU\u0007\u0002\u0017*\tA*A\u0003tG\u0006d\u0017-\u0003\u0002O\u0017\n1A+\u001e9mKJ\u0002\"!\b)\n\u0005Es\"A\u0004
)s_R|g\u000eR3mSZ,'/\u001f\t\u0003'fk\u0011\u0001\u0016\u0006\u0003+Z\u000bq!\\3tg\u0006<WM\u0003\u0002 /*\u0011\u0001\fC\u0001\u0005cBLG-\u0003\u0002[)\n9Q*Z:tC\u001e,\u0007b\u0002/\u0001\u0001\u0004%\t!X\u0001\ncV,W/Z0%KF$\"AX1\u0011\u0005){\u0016B\u00011L\u0005\u0011)f.\u001b;\t\u000f\t\\\u0016\u0011!a\u0001\u0003\u0006\u0019\u0001\u0010J\u0019\t\r\u0011\u0004\u0001\u0015)\u0003B\u0003\u0019\tX/Z;fA!9a\r\u0001a\u0001\n\u00039\u0017\u0001\u00027bgR,\u0012!\f\u0005\bS\u0002\u0001\r\u0011\"\u0001k\u0003!a\u0017m\u001d;`I\u0015\fHC\u00010l\u0011\u001d\u0011\u0007.!AA\u00025Ba!\u001c\u0001!B\u0013i\u0013!\u00027bgR\u0004\u0003bB8\u0001\u0005\u0004%\t\u0001]\u0001\u0011a\u0016\u0014X.\u001b;t!\u0016\u00148+Z2p]\u0012,\u0012!\u001d\t\u0003\u0015JL!a]&\u0003\r\u0011{WO\u00197f\u0011\u0019)\b\u0001)A\u0005c\u0006\t\u0002/\u001a:nSR\u001c\b+\u001a:TK\u000e|g\u000e\u001a\u0011\t\u000f]\u0004!\u0019!C\u0001a\u0006!2\u000f^1cY\u0016Le\u000e^3sm\u0006dW*[2s_NDa!\u001f\u0001!\u0002\u0013\t\u0018!F:uC\ndW-\u00138uKJ4\u0018\r\\'jGJ|7\u000f\t\u0005\bw\u0002\u0011\r\u0011\"\u0003}\u0003a\u00198\r[3ek2,G-\u0012=fGV$xN]*feZL7-Z\u000b\u0002{B\u0011!I`\u0005\u0003\u007f\u000e\u0013\u0001dU2iK\u0012,H.\u001a3Fq\u0016\u001cW\u000f^8s'\u0016\u0014h/[2f\u0011\u001d\t\u0019\u0001\u0001Q\u0001\nu\f\u0011d]2iK\u0012,H.\u001a3Fq\u0016\u001cW\u000f^8s'\u0016\u0014h/[2fA!I\u0011q\u0001\u0001A\u0002\u0013%\u0011\u0011B\u0001\ng\u000eDW\rZ;mK\u0012,\"!a\u0003\u0011\u000b)\u000bi!!\u0005\n\u0007\u0005=1J\u0001\u0004PaRLwN\u001c\u0019\u0005\u0003'\ti\u0002E\u0003C\u0003+\tI\"C\u0002\u0002\u0018\r\u0013qbU2iK\u0012,H.\u001a3GkR,(/\u001a\t\u0005\u00037\ti\u0002\u0004\u0001\u0005\u0019\u0005}\u0011\u0011EA\u0001\u0002\u0003\u0015\t!a\f\u0003\u0007}#\u0013\u0007\u0003\u0005\u0002$\u0001\u0001\u000b\u0015BA\u0013\u0003)\u00198\r[3ek2,G\r\t\t\u0006\u0015\u00065\u0011q\u0005\u0019\u0005\u0003S\ti\u0003E\u0003C\u0003+\tY\u0003\u0005\u0003\u0002\u001c\u00055B\u0001DA\u0010\u0003C\t\t\u0011!A\u0003\u0002\u000
5=\u0012\u0003BA\u0019\u0003o\u00012ASA\u001a\u0013\r\t)d\u0013\u0002\b\u001d>$\b.\u001b8h!\rQ\u0015\u0011H\u0005\u0004\u0003wY%aA!os\"I\u0011q\b\u0001A\u0002\u0013%\u0011\u0011I\u0001\u000eg\u000eDW\rZ;mK\u0012|F%Z9\u0015\u0007y\u000b\u0019\u0005C\u0005c\u0003{\t\t\u00111\u0001\u0002FA)!*!\u0004\u0002HA\"\u0011\u0011JA'!\u0015\u0011\u0015QCA&!\u0011\tY\"!\u0014\u0005\u0019\u0005}\u0011\u0011EA\u0001\u0002\u0003\u0015\t!a\f\t\u000f\u0005E\u0003\u0001\"\u0011\u0002T\u0005Q!-\u001a4pe\u0016|\u0005/\u001a8\u0015\u0003yCq!a\u0016\u0001\t\u0003\n\u0019&A\u0006cK\u001a|'/Z\"m_N,\u0007bBA.\u0001\u0011\u0005\u0013QL\u0001\bC\u000e\fX/\u001b:f)\u0015q\u0016qLA2\u0011\u001d\t\t'!\u0017A\u0002=\u000b\u0001\u0002Z3mSZ,'/\u001f\u0005\u0007+\u0006e\u0003\u0019\u0001*\t\u000f\u0005\u001d\u0004\u0001\"\u0003\u0002T\u0005i1o\u00195fIVdW\rV5nKJDq!a\u001b\u0001\t\u0003\n\u0019&A\u0002sk:\u0004")
/**
 * Flow controller that spaces out the deliveries it forwards to
 * {@code AMQPFlowController.acquire}.
 *
 * <p>A delivery passed to {@link #acquire} is forwarded immediately when the
 * internal queue is empty and more than {@link #stableIntervalMicros()}
 * microseconds have elapsed since the previous forward. Otherwise it is
 * appended to the queue and a fixed-delay timer (this object is its own
 * {@link Runnable}) forwards one queued delivery per tick on a single
 * scheduler thread.
 *
 * <p>This source is decompiler output of a Scala class. The
 * {@code new Serializable(this){...}} bodies appear to be the decompiler's
 * rendering of the lazily evaluated log-message closures and are kept verbatim.
 *
 * <p>Threading: {@link #run()} executes on the scheduler thread created in the
 * constructor, so {@code last} is shared between threads and is declared
 * {@code volatile}. NOTE(review): both {@link #acquire} and {@link #run()} call
 * the parent {@code acquire}; whether the parent tolerates concurrent calls is
 * not visible from this file.
 */
public final class AMQPHrAsyncFlowController
extends AMQPFlowController
implements Runnable {
    /** Interval (microseconds) used when the requested rate exceeds one permit per microsecond. */
    private final double MinStableIntervalMicros = 100.0;
    /** Deliveries waiting to be forwarded by the timer, in arrival order. */
    private BlockingQueue<Tuple2<ProtonDelivery, Message>> queue = new LinkedBlockingQueue<Tuple2<ProtonDelivery, Message>>();
    /**
     * Timestamp (microseconds, derived from {@link System#nanoTime()}) of the most recent forward.
     * Volatile because it is written by {@link #run()} on the scheduler thread and read in
     * {@link #acquire}.
     */
    private volatile Long last = Predef$.MODULE$.long2Long(0L);
    /** Requested maximum rate, taken from the constructor's {@code maxRate}. */
    private final double permitsPerSecond;
    /** Spacing between forwards, in microseconds; computed once in the constructor. */
    private final double stableIntervalMicros;
    /** Single-threaded scheduler that runs this object as the drain timer. */
    private final ScheduledExecutorService scheduledExecutorService;
    /** Handle of the running drain timer, or {@code None} when no timer is active. */
    private Option<ScheduledFuture<?>> scheduled;

    /** @return the fixed interval constant (100 microseconds) */
    private final double MinStableIntervalMicros() {
        return this.MinStableIntervalMicros;
    }

    /** @return the queue of deliveries waiting for the timer */
    public BlockingQueue<Tuple2<ProtonDelivery, Message>> queue() {
        return this.queue;
    }

    /**
     * Replaces the pending-delivery queue.
     *
     * @param x$1 the new queue
     */
    public void queue_$eq(BlockingQueue<Tuple2<ProtonDelivery, Message>> x$1) {
        this.queue = x$1;
    }

    /** @return the timestamp, in microseconds, of the most recent forward (0 after {@link #beforeOpen()}) */
    public Long last() {
        return this.last;
    }

    /**
     * Overwrites the timestamp of the most recent forward.
     *
     * @param x$1 the new timestamp in microseconds
     */
    public void last_$eq(Long x$1) {
        this.last = x$1;
    }

    /** @return the maximum rate this controller was constructed with */
    public double permitsPerSecond() {
        return this.permitsPerSecond;
    }

    /** @return the spacing between forwards, in microseconds */
    public double stableIntervalMicros() {
        return this.stableIntervalMicros;
    }

    private ScheduledExecutorService scheduledExecutorService() {
        return this.scheduledExecutorService;
    }

    private Option<ScheduledFuture<?>> scheduled() {
        return this.scheduled;
    }

    private void scheduled_$eq(Option<ScheduledFuture<?>> x$1) {
        this.scheduled = x$1;
    }

    /**
     * Resets the pacing state: zeroes the last-forward timestamp, discards any
     * queued deliveries, and logs the configured rate and interval.
     */
    @Override
    public void beforeOpen() {
        this.last_$eq(Predef$.MODULE$.long2Long(0L));
        this.queue().clear();
        this.logInfo((Function0<String>)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ AMQPHrAsyncFlowController $outer;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"permitsPerSecond ", ", stableIntervalMicros ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToDouble((double)this.$outer.permitsPerSecond()), BoxesRunTime.boxToDouble((double)this.$outer.stableIntervalMicros())}));
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
    }

    /**
     * Shuts the scheduler down, waits up to one second for it to terminate, and
     * clears the timer handle.
     *
     * <p>The handle is cleared in a {@code finally} block so that an
     * interruption while waiting cannot leave a stale handle behind; the
     * interruption itself still propagates to the caller as before.
     *
     * <p>NOTE(review): the executor is final and is not recreated, so once this
     * method has run any later attempt to schedule the timer will be rejected
     * by the executor. Confirm that a controller is never reopened after close.
     */
    @Override
    public void beforeClose() {
        try {
            this.scheduledExecutorService().shutdown();
            this.scheduledExecutorService().awaitTermination(1L, TimeUnit.SECONDS);
        } finally {
            this.scheduled_$eq((Option<ScheduledFuture<?>>)None$.MODULE$);
        }
    }

    /**
     * Forwards the delivery to the parent immediately if pacing allows,
     * otherwise enqueues it for the timer.
     *
     * @param delivery the delivery to forward or enqueue
     * @param message the message paired with {@code delivery}
     */
    @Override
    public void acquire(ProtonDelivery delivery, Message message) {
        long now = TimeUnit.NANOSECONDS.toMicros(System.nanoTime());
        // Nothing left to drain: stop the timer so it does not keep ticking on an empty queue.
        if (this.queue().isEmpty() && this.scheduled().isDefined()) {
            ScheduledFuture<?> timer = this.scheduled().get();
            timer.cancel(false);
            this.scheduled_$eq((Option<ScheduledFuture<?>>)None$.MODULE$);
            this.logInfo((Function0<String>)new Serializable(this){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Timer cancelled"})).s((Seq)Nil$.MODULE$);
                }
            });
        }
        // Fast path only when the interval has elapsed AND nothing is queued ahead of this delivery.
        if ((double)now > (double)Predef$.MODULE$.Long2long(this.last()) + this.stableIntervalMicros() && this.queue().isEmpty()) {
            this.last_$eq(Predef$.MODULE$.long2Long(TimeUnit.NANOSECONDS.toMicros(System.nanoTime())));
            super.acquire(delivery, message);
        } else {
            this.logDebug((Function0<String>)new Serializable(this, delivery){
                public static final long serialVersionUID = 0L;
                private final ProtonDelivery delivery$3;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"--> Enqueue delivery tag [", "]"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{new String(this.delivery$3.getTag())}));
                }
                {
                    this.delivery$3 = delivery$3;
                }
            });
            this.queue().put((Tuple2<ProtonDelivery, Message>)new Tuple2((Object)delivery, (Object)message));
            this.scheduleTimer();
        }
    }

    /**
     * Starts the fixed-delay drain timer if none is active. Both the initial
     * delay and the period are {@link #stableIntervalMicros()} truncated to
     * whole microseconds.
     */
    private void scheduleTimer() {
        if (this.scheduled().isEmpty()) {
            this.logDebug((Function0<String>)new Serializable(this){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ AMQPHrAsyncFlowController $outer;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Timer scheduled every ", " us"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)((long)this.$outer.stableIntervalMicros()))}));
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                }
            });
            this.scheduled_$eq(Option$.MODULE$.apply(this.scheduledExecutorService().scheduleWithFixedDelay(this, (long)this.stableIntervalMicros(), (long)this.stableIntervalMicros(), TimeUnit.MICROSECONDS)));
        }
    }

    /**
     * Timer tick: forwards at most one queued delivery to the parent and
     * records the forward time.
     *
     * <p>Uses the non-blocking {@code poll()} rather than an emptiness check
     * followed by {@code take()}: {@link #beforeOpen()} may clear the queue
     * between those two calls, which would park the only scheduler thread.
     *
     * <p>NOTE(review): an exception thrown from here suppresses all further
     * ticks of a fixed-delay task while {@code scheduled} stays defined.
     */
    @Override
    public void run() {
        Tuple2<ProtonDelivery, Message> t = this.queue().poll();
        if (t != null) {
            this.logDebug((Function0<String>)new Serializable(this, t){
                public static final long serialVersionUID = 0L;
                private final Tuple2 t$2;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"<-- Dequeue delivery tag [", "]"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{new String(((ProtonDelivery)this.t$2._1()).getTag())}));
                }
                {
                    this.t$2 = t$2;
                }
            });
            this.last_$eq(Predef$.MODULE$.long2Long(TimeUnit.NANOSECONDS.toMicros(System.nanoTime())));
            super.acquire((ProtonDelivery)t._1(), (Message)t._2());
        }
    }

    /**
     * Creates the controller and derives the forwarding interval from
     * {@code maxRate}: 100 microseconds when {@code maxRate} exceeds 1,000,000
     * per second, otherwise {@code 1,000,000 / maxRate} microseconds.
     *
     * <p>NOTE(review): with this formula rates between 10,000 and 1,000,000 per
     * second yield an interval below 100 microseconds, so the constant is not a
     * strict lower bound; confirm whether a floor was intended. A
     * {@code maxRate} of zero yields an infinite interval.
     *
     * @param receiver receiver handed to the parent controller
     * @param listener listener handed to the parent controller
     * @param maxRate maximum number of deliveries to forward per second
     */
    public AMQPHrAsyncFlowController(ProtonReceiver receiver, AMQPFlowControllerListener listener, Long maxRate) {
        super(receiver, listener);
        this.permitsPerSecond = Predef$.MODULE$.Long2long(maxRate);
        double microsPerSecond = (double)TimeUnit.SECONDS.toMicros(1L);
        this.stableIntervalMicros = microsPerSecond < this.permitsPerSecond() ? this.MinStableIntervalMicros() : microsPerSecond / this.permitsPerSecond();
        this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
        this.scheduled = None$.MODULE$;
    }
}

