package org.apache.spark.streaming.amqp;

import io.radanalytics.streaming.amqp.AMQPReceiver;
import io.vertx.core.Handler;
import io.vertx.proton.ProtonDelivery;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.proton.message.Message;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.storage.StreamBlockId;
import org.apache.spark.streaming.amqp.ReliableAMQPReceiver;
import org.apache.spark.streaming.receiver.BlockGenerator;
import org.apache.spark.streaming.receiver.BlockGeneratorListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;

/* compiled from: ReliableAMQPReceiver.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005}h\u0001B\u0001\u0003\u00015\u0011ACU3mS\u0006\u0014G.Z!N#B\u0013VmY3jm\u0016\u0014(BA\u0002\u0005\u0003\u0011\tW.\u001d9\u000b\u0005\u00151\u0011!C:ue\u0016\fW.\u001b8h\u0015\t9\u0001\"A\u0003ta\u0006\u00148N\u0003\u0002\n\u0015\u00051\u0011\r]1dQ\u0016T\u0011aC\u0001\u0004_J<7\u0001A\u000b\u0003\u001dm\u00192\u0001A\b(!\r\u0001r#G\u0007\u0002#)\u00111A\u0005\u0006\u0003\u000bMQ!\u0001F\u000b\u0002\u0019I\fG-\u00198bYf$\u0018nY:\u000b\u0003Y\t!![8\n\u0005a\t\"\u0001D!N#B\u0013VmY3jm\u0016\u0014\bC\u0001\u000e\u001c\u0019\u0001!Q\u0001\b\u0001C\u0002u\u0011\u0011\u0001V\t\u0003=\u0011\u0002\"a\b\u0012\u000e\u0003\u0001R\u0011!I\u0001\u0006g\u000e\fG.Y\u0005\u0003G\u0001\u0012qAT8uQ&tw\r\u0005\u0002 K%\u0011a\u0005\t\u0002\u0004\u0003:L\bC\u0001\t)\u0013\tI\u0013C\u0001\u000eB\u001bF\u0003f\t\\8x\u0007>tGO]8mY\u0016\u0014H*[:uK:,'\u000f\u0003\u0005,\u0001\t\u0005\t\u0015!\u0003-\u0003\u0011Awn\u001d;\u0011\u00055\u0002dBA\u0010/\u0013\ty\u0003%\u0001\u0004Qe\u0016$WMZ\u0005\u0003cI\u0012aa\u0015;sS:<'BA\u0018!\u0011!!\u0004A!A!\u0002\u0013)\u0014\u0001\u00029peR\u0004\"a\b\u001c\n\u0005]\u0002#aA%oi\"A\u0011\b\u0001B\u0001B\u0003%!(\u0001\u0005vg\u0016\u0014h.Y7f!\ry2\bL\u0005\u0003y\u0001\u0012aa\u00149uS>t\u0007\u0002\u0003 \u0001\u0005\u0003\u0005\u000b\u0011\u0002\u001e\u0002\u0011A\f7o]<pe\u0012D\u0001\u0002\u0011\u0001\u0003\u0002\u0003\u0006I\u0001L\u0001\bC\u0012$'/Z:t\u0011!\u0011\u0005A!A!\u0002\u0013\u0019\u0015\u0001E7fgN\fw-Z\"p]Z,'\u000f^3s!\u0011yBI\u0012)\n\u0005\u0015\u0003#!\u0003$v]\u000e$\u0018n\u001c82!\t9e*D\u0001I\u0015\tI%*A\u0004nKN\u001c\u0018mZ3\u000b\u0005-c\u0015A\u00029s_R|gN\u0003\u0002N\u0011\u0005!\u0011\u000f]5e\u0013\ty\u0005JA\u0004NKN\u001c\u0018mZ3\u0011\u0007}Y\u0014\u0004C\u0005S\u0001\t\u0005\t\u0015!\u0003T3\u0006a1\u000f^8sC\u001e,G*\u001a<fYB\u0011AkV\u0007\u0002+*\u0011aKB\u0001\bgR|'/Y4f\u0013\tAVK\u0001\u0007Ti>\u0014\u0018mZ3MKZ,G.\u0003\u0002S5&\u00111\f\u0018\u0002\t%\u0016\u001cW-\u001b<fe*\u0011Q\fB\u0001\te\u0016\u001cW-\u001b<fe\")q\f\u0001C\u0001A\u00061A(\u001b8jiz\"\u0002\"Y2eK\u001a<\u0007.\u001b\t\u0004E\u0002IR\"\u0001\u0002\t\u000b-r\u0006\u0019\u0001\u0017\t\u000bQr\u0006\u0019A\u001b\t\u000ber\u0006\u0019\u0001\u001e\t\u000byr\u0006\u0019\u0001\u001e\t\u000b\u0001s\u0006\u0019\u0001\u0017\t\u000b\ts\u0006\u0019A\"\t\u000bIs\u0006\u0019A*\t\u000f-\u0004!\u0019!C\u0007Y\u0006\u0001R*\u0019=Ti>\u0014X-\u0011;uK6\u0004Ho]\u000b\u0002[>\ta.H\u0001\u0004\u0011\u0019\u0001\b\u0001)A\u0007[\u0006\tR*\u0019=Ti>\u0014X-\u0011;uK6\u0004Ho\u001d\u0011\t\u0013I\u0004\u0001\u0019!a\u0001\n\u0013\u0019\u0018A\u00042m_\u000e\\w)\u001a8fe\u0006$xN]\u000b\u0002iB\u0011QO^\u0007\u00029&\u0011q\u000f\u0018\u0002\u000f\u00052|7m[$f]\u0016\u0014\u0018\r^8s\u0011%I\b\u00011AA\u0002\u0013%!0\u0001\ncY>\u001c7nR3oKJ\fGo\u001c:`I\u0015\fHCA>\u007f!\tyB0\u0003\u0002~A\t!QK\\5u\u0011\u001dy\b0!AA\u0002Q\f1\u0001\u001f\u00132\u0011\u001d\t\u0019\u0001\u0001Q!\nQ\fqB\u00197pG.<UM\\3sCR|'\u000f\t\u0005\f\u0003\u000f\u0001\u0001\u0019!a\u0001\n\u0013\tI!\u0001\beK2Lg/\u001a:z\u0005V4g-\u001a:\u0016\u0005\u0005-\u0001CBA\u0007\u0003/\tY\"\u0004\u0002\u0002\u0010)!\u0011\u0011CA\n\u0003\u001diW\u000f^1cY\u0016T1!!\u0006!\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0005\u00033\tyAA\u0006BeJ\f\u0017PQ;gM\u0016\u0014\b\u0003BA\u000f\u0003Ki!!a\b\u000b\u0007-\u000b\tCC\u0002\u0002$U\tQA^3sibLA!a\n\u0002 \tq\u0001K]8u_:$U\r\\5wKJL\bbCA\u0016\u0001\u0001\u0007\t\u0019!C\u0005\u0003[\t!\u0003Z3mSZ,'/\u001f\"vM\u001a,'o\u0018\u0013fcR\u001910a\f\t\u0013}\fI#!AA\u0002\u0005-\u0001\u0002CA\u001a\u0001\u0001\u0006K!a\u0003\u0002\u001f\u0011,G.\u001b<fef\u0014UO\u001a4fe\u0002B1\"a\u000e\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002:\u0005\u0001\"\r\\8dW\u0012+G.\u001b<fefl\u0015\r]\u000b\u0003\u0003w\u0001\u0002\"!\u0010\u0002L\u0005=\u0013QK\u0007\u0003\u0003\u007fQA!!\u0011\u0002D\u0005Q1m\u001c8dkJ\u0014XM\u001c;\u000b\t\u0005\u0015\u0013qI\u0001\u0005kRLGN\u0003\u0002\u0002J\u0005!!.\u0019<b\u0013\u0011\ti%a\u0010\u0003#\r{gnY;se\u0016tG\u000fS1tQ6\u000b\u0007\u000fE\u0002U\u0003#J1!a\u0015V\u00055\u0019FO]3b[\ncwnY6JIB)q$a\u0016\u0002\u001c%\u0019\u0011\u0011\f\u0011\u0003\u000b\u0005\u0013(/Y=\t\u0017\u0005u\u0003\u00011AA\u0002\u0013%\u0011qL\u0001\u0015E2|7m\u001b#fY&4XM]=NCB|F%Z9\u0015\u0007m\f\t\u0007C\u0005��\u00037\n\t\u00111\u0001\u0002<!A\u0011Q\r\u0001!B\u0013\tY$A\tcY>\u001c7\u000eR3mSZ,'/_'ba\u0002B\u0011\"!\u001b\u0001\u0005\u0004%I!a\u001b\u0002\u00071|w-\u0006\u0002\u0002nA!\u0011qNA;\u001b\t\t\tHC\u0002\u0002t)\tQa\u001d7gi)LA!a\u001e\u0002r\t1Aj\\4hKJD\u0001\"a\u001f\u0001A\u0003%\u0011QN\u0001\u0005Y><\u0007\u0005C\u0004\u0002��\u0001!\t%!!\u0002\u000f=t7\u000b^1siR\t1\u0010C\u0004\u0002\u0006\u0002!\t%!!\u0002\r=t7\u000b^8q\r\u0019\tI\t\u0001\u0004\u0002\f\n)r)\u001a8fe\u0006$X\r\u001a\"m_\u000e\\\u0007*\u00198eY\u0016\u00148CBAD\u0003\u001b\u000b\u0019\nE\u0002 \u0003\u001fK1!!%!\u0005\u0019\te.\u001f*fMB\u0019Q/!&\n\u0007\u0005]EL\u0001\fCY>\u001c7nR3oKJ\fGo\u001c:MSN$XM\\3s\u0011\u001dy\u0016q\u0011C\u0001\u00037#\"!!(\u0011\t\u0005}\u0015qQ\u0007\u0002\u0001!A\u00111UAD\t\u0003\t)+A\u0005p]\u0006#G\rR1uCR)10a*\u0002,\"9\u0011\u0011VAQ\u0001\u0004!\u0013\u0001\u00023bi\u0006Dq!!,\u0002\"\u0002\u0007A%\u0001\u0005nKR\fG-\u0019;b\u0011!\t\t,a\"\u0005\u0002\u0005M\u0016aD8o\u000f\u0016tWM]1uK\ncwnY6\u0015\u0007m\f)\f\u0003\u0005\u00028\u0006=\u0006\u0019AA(\u0003\u001d\u0011Gn\\2l\u0013\u0012D\u0001\"a/\u0002\b\u0012\u0005\u0011QX\u0001\f_:\u0004Vo\u001d5CY>\u001c7\u000eF\u0003|\u0003\u007f\u000b\t\r\u0003\u0005\u00028\u0006e\u0006\u0019AA(\u0011!\t\u0019-!/A\u0002\u0005\u0015\u0017aC1se\u0006L()\u001e4gKJ\u0004D!a2\u0002LB1\u0011QBA\f\u0003\u0013\u00042AGAf\t-\ti-!1\u0002\u0002\u0003\u0005)\u0011A\u000f\u0003\u0007}#\u0013\u0007\u0003\u0005\u0002R\u0006\u001dE\u0011AAj\u0003\u001dyg.\u0012:s_J$Ra_Ak\u0003/Da!SAh\u0001\u0004a\u0003\u0002CAm\u0003\u001f\u0004\r!a7\u0002\u0013QD'o\\<bE2,\u0007\u0003BAo\u0003[tA!a8\u0002j:!\u0011\u0011]At\u001b\t\t\u0019OC\u0002\u0002f2\ta\u0001\u0010:p_Rt\u0014\"A\u0011\n\u0007\u0005-\b%A\u0004qC\u000e\\\u0017mZ3\n\t\u0005=\u0018\u0011\u001f\u0002\n)\"\u0014xn^1cY\u0016T1!a;!\u0011\u001d\t)\u0010\u0001C!\u0003o\f\u0011b\u001c8BGF,\u0018N]3\u0015\u000bm\fI0!@\t\u0011\u0005m\u00181\u001fa\u0001\u00037\t\u0001\u0002Z3mSZ,'/\u001f\u0005\u0007\u0013\u0006M\b\u0019\u0001$")
/* loaded from: input_file:org/apache/spark/streaming/amqp/ReliableAMQPReceiver.class */
public class ReliableAMQPReceiver<T> extends AMQPReceiver<T> {
    public final Function1<Message, Option<T>> org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$messageConverter;
    private final int MaxStoreAttempts;
    private BlockGenerator blockGenerator;
    private ArrayBuffer<ProtonDelivery> org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$deliveryBuffer;
    private ConcurrentHashMap<StreamBlockId, ProtonDelivery[]> org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$blockDeliveryMap;
    private final Logger org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$log;

    /* compiled from: ReliableAMQPReceiver.scala */
    /* loaded from: input_file:org/apache/spark/streaming/amqp/ReliableAMQPReceiver$GeneratedBlockHandler.class */
    public final class GeneratedBlockHandler implements BlockGeneratorListener {
        private final /* synthetic */ ReliableAMQPReceiver $outer;

        public void onAddData(Object obj, Object obj2) {
            this.$outer.org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$log().debug(obj.toString());
            if (Option$.MODULE$.apply(obj2).isDefined()) {
                this.$outer.org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$deliveryBuffer().$plus$eq((ProtonDelivery) obj2);
            }
        }

        public void onGenerateBlock(StreamBlockId streamBlockId) {
            this.$outer.org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$blockDeliveryMap().put(streamBlockId, (ProtonDelivery[]) this.$outer.org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$deliveryBuffer().toArray(ClassTag$.MODULE$.apply(ProtonDelivery.class)));
            this.$outer.org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$deliveryBuffer().clear();
        }

        public void onPushBlock(final StreamBlockId streamBlockId, ArrayBuffer<?> arrayBuffer) {
            int i = 0;
            boolean z = false;
            Option option = None$.MODULE$;
            while (!z && i < 3) {
                try {
                    this.$outer.store((ArrayBuffer) arrayBuffer.flatMap(new ReliableAMQPReceiver$GeneratedBlockHandler$$anonfun$onPushBlock$1(this), ArrayBuffer$.MODULE$.canBuildFrom()));
                    z = true;
                } catch (Exception e) {
                    i++;
                    option = Option$.MODULE$.apply(e);
                }
                if (z) {
                    this.$outer.context().runOnContext(new Handler<Void>(this, streamBlockId) { // from class: org.apache.spark.streaming.amqp.ReliableAMQPReceiver$GeneratedBlockHandler$$anon$1
                        private final /* synthetic */ ReliableAMQPReceiver.GeneratedBlockHandler $outer;
                        private final StreamBlockId blockId$1;

                        @Override // io.vertx.core.Handler
                        public void handle(Void r6) {
                            Predef$.MODULE$.refArrayOps(this.$outer.org$apache$spark$streaming$amqp$ReliableAMQPReceiver$GeneratedBlockHandler$$$outer().org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$blockDeliveryMap().get(this.blockId$1)).foreach(new ReliableAMQPReceiver$GeneratedBlockHandler$$anon$1$$anonfun$handle$1(this));
                            this.$outer.org$apache$spark$streaming$amqp$ReliableAMQPReceiver$GeneratedBlockHandler$$$outer().org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$blockDeliveryMap().remove(this.blockId$1);
                        }

                        {
                            if (this == null) {
                                throw null;
                            }
                            this.$outer = this;
                            this.blockId$1 = streamBlockId;
                        }
                    });
                } else {
                    this.$outer.org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$log().error(((Throwable) option.get()).getMessage(), (Throwable) option.get());
                    this.$outer.stop("Error while storing block into Spark", (Throwable) option.get());
                }
            }
        }

        public void onError(String str, Throwable th) {
            this.$outer.org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$log().error(str, th);
            this.$outer.reportError(str, th);
        }

        public /* synthetic */ ReliableAMQPReceiver org$apache$spark$streaming$amqp$ReliableAMQPReceiver$GeneratedBlockHandler$$$outer() {
            return this.$outer;
        }

        public GeneratedBlockHandler(ReliableAMQPReceiver<T> reliableAMQPReceiver) {
            if (reliableAMQPReceiver == null) {
                throw null;
            }
            this.$outer = reliableAMQPReceiver;
        }
    }

    private final int MaxStoreAttempts() {
        return 3;
    }

    private BlockGenerator blockGenerator() {
        return this.blockGenerator;
    }

    private void blockGenerator_$eq(BlockGenerator blockGenerator) {
        this.blockGenerator = blockGenerator;
    }

    public ArrayBuffer<ProtonDelivery> org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$deliveryBuffer() {
        return this.org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$deliveryBuffer;
    }

    private void org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$deliveryBuffer_$eq(ArrayBuffer<ProtonDelivery> arrayBuffer) {
        this.org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$deliveryBuffer = arrayBuffer;
    }

    public ConcurrentHashMap<StreamBlockId, ProtonDelivery[]> org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$blockDeliveryMap() {
        return this.org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$blockDeliveryMap;
    }

    private void org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$blockDeliveryMap_$eq(ConcurrentHashMap<StreamBlockId, ProtonDelivery[]> concurrentHashMap) {
        this.org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$blockDeliveryMap = concurrentHashMap;
    }

    public Logger org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$log() {
        return this.org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$log;
    }

    @Override // io.radanalytics.streaming.amqp.AMQPReceiver
    public void onStart() {
        org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$deliveryBuffer_$eq(new ArrayBuffer<>());
        org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$blockDeliveryMap_$eq(new ConcurrentHashMap<>());
        blockGenerator_$eq(supervisor().createBlockGenerator(new GeneratedBlockHandler(this)));
        blockGenerator().start();
        super.onStart();
    }

    @Override // io.radanalytics.streaming.amqp.AMQPReceiver
    public void onStop() {
        if (Option$.MODULE$.apply(blockGenerator()).isDefined() && !blockGenerator().isStopped()) {
            blockGenerator().stop();
        }
        super.onStop();
    }

    @Override // io.radanalytics.streaming.amqp.AMQPReceiver, io.radanalytics.streaming.amqp.AMQPFlowControllerListener
    public void onAcquire(ProtonDelivery protonDelivery, Message message) {
        blockGenerator().addDataWithCallback(message, protonDelivery);
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public ReliableAMQPReceiver(String str, int i, Option<String> option, Option<String> option2, String str2, Function1<Message, Option<T>> function1, StorageLevel storageLevel) {
        super(str, i, option, option2, str2, function1, storageLevel);
        this.org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$messageConverter = function1;
        this.org$apache$spark$streaming$amqp$ReliableAMQPReceiver$$log = LoggerFactory.getLogger(getClass());
    }
}
