package io.quarkus.vertx.web.runtime;

import io.quarkus.vertx.web.ReactiveRoutes;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.groups.MultiSubscribe;
import io.smallrye.reactive.messaging.kafka.impl.ce.KafkaCloudEventHelper;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.Json;
import io.vertx.ext.web.RoutingContext;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/quarkus/vertx/web/runtime/MultiSseSupport.class */
public class MultiSseSupport {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.quarkus.vertx.web.runtime.MultiSseSupport$2, reason: invalid class name */
    /* loaded from: input_file:io/quarkus/vertx/web/runtime/MultiSseSupport$2.class */
    public class AnonymousClass2 implements Subscriber<Buffer> {
        Subscription upstream;
        final /* synthetic */ HttpServerResponse val$response;
        final /* synthetic */ RoutingContext val$rc;

        AnonymousClass2(HttpServerResponse httpServerResponse, RoutingContext routingContext) {
            this.val$response = httpServerResponse;
            this.val$rc = routingContext;
        }

        @Override // org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            this.upstream = subscription;
            this.upstream.request(1L);
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(Buffer buffer) {
            MultiSseSupport.initialize(this.val$response);
            this.val$response.write((HttpServerResponse) buffer, new Handler<AsyncResult<Void>>() { // from class: io.quarkus.vertx.web.runtime.MultiSseSupport.2.1
                @Override // io.vertx.core.Handler
                public void handle(AsyncResult<Void> asyncResult) {
                    MultiSseSupport.onWriteDone(AnonymousClass2.this.upstream, asyncResult, AnonymousClass2.this.val$rc);
                }
            });
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            this.val$rc.fail(th);
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            MultiSseSupport.endOfStream(this.val$response);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.quarkus.vertx.web.runtime.MultiSseSupport$3, reason: invalid class name */
    /* loaded from: input_file:io/quarkus/vertx/web/runtime/MultiSseSupport$3.class */
    public class AnonymousClass3 implements Subscriber<Buffer> {
        Subscription upstream;
        final AtomicLong count = new AtomicLong();
        final /* synthetic */ HttpServerResponse val$response;
        final /* synthetic */ RoutingContext val$rc;

        AnonymousClass3(HttpServerResponse httpServerResponse, RoutingContext routingContext) {
            this.val$response = httpServerResponse;
            this.val$rc = routingContext;
        }

        @Override // org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            this.upstream = subscription;
            this.upstream.request(1L);
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(Buffer buffer) {
            MultiSseSupport.initialize(this.val$response);
            this.val$response.write((HttpServerResponse) Buffer.buffer("data: ").appendBuffer(buffer).appendString(StringUtils.LF).appendString("id: " + this.count.getAndIncrement()).appendString("\n\n"), new Handler<AsyncResult<Void>>() { // from class: io.quarkus.vertx.web.runtime.MultiSseSupport.3.1
                @Override // io.vertx.core.Handler
                public void handle(AsyncResult<Void> asyncResult) {
                    MultiSseSupport.onWriteDone(AnonymousClass3.this.upstream, asyncResult, AnonymousClass3.this.val$rc);
                }
            });
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            this.val$rc.fail(th);
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            MultiSseSupport.endOfStream(this.val$response);
        }
    }

    private MultiSseSupport() {
    }

    public static void subscribeString(Multi<String> multi, RoutingContext routingContext) {
        subscribeBuffer(multi.map(new Function<String, Buffer>() { // from class: io.quarkus.vertx.web.runtime.MultiSseSupport.1
            @Override // java.util.function.Function
            public Buffer apply(String str) {
                return Buffer.buffer(str);
            }
        }), routingContext);
    }

    private static void initialize(HttpServerResponse httpServerResponse) {
        if (httpServerResponse.bytesWritten() == 0) {
            MultiMap headers = httpServerResponse.headers();
            if (headers.get(KafkaCloudEventHelper.KAFKA_HEADER_CONTENT_TYPE) == null) {
                headers.set(KafkaCloudEventHelper.KAFKA_HEADER_CONTENT_TYPE, "text/event-stream");
            }
            httpServerResponse.setChunked(true);
        }
    }

    private static void onWriteDone(Subscription subscription, AsyncResult<Void> asyncResult, RoutingContext routingContext) {
        if (asyncResult.failed()) {
            routingContext.fail(asyncResult.cause());
        } else {
            subscription.request(1L);
        }
    }

    public static void write(Multi<Buffer> multi, RoutingContext routingContext) {
        multi.subscribe().withSubscriber((MultiSubscribe<Buffer>) new AnonymousClass2(routingContext.response(), routingContext));
    }

    public static void subscribeBuffer(Multi<Buffer> multi, RoutingContext routingContext) {
        multi.subscribe().withSubscriber((MultiSubscribe<Buffer>) new AnonymousClass3(routingContext.response(), routingContext));
    }

    public static void subscribeMutinyBuffer(Multi<io.vertx.mutiny.core.buffer.Buffer> multi, RoutingContext routingContext) {
        subscribeBuffer(multi.map(new Function<io.vertx.mutiny.core.buffer.Buffer, Buffer>() { // from class: io.quarkus.vertx.web.runtime.MultiSseSupport.4
            @Override // java.util.function.Function
            public Buffer apply(io.vertx.mutiny.core.buffer.Buffer buffer) {
                return buffer.getDelegate();
            }
        }), routingContext);
    }

    public static void subscribeObject(Multi<Object> multi, RoutingContext routingContext) {
        final AtomicLong atomicLong = new AtomicLong();
        write(multi.map(new Function<Object, Buffer>() { // from class: io.quarkus.vertx.web.runtime.MultiSseSupport.5
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.function.Function
            public Buffer apply(Object obj) {
                if (!(obj instanceof ReactiveRoutes.ServerSentEvent)) {
                    return Buffer.buffer("data: " + Json.encodeToBuffer(obj) + "\nid: " + atomicLong.getAndIncrement() + "\n\n");
                }
                ReactiveRoutes.ServerSentEvent serverSentEvent = (ReactiveRoutes.ServerSentEvent) obj;
                return Buffer.buffer((serverSentEvent.event() == null ? "" : "event: " + serverSentEvent.event() + "\n") + "data: " + Json.encodeToBuffer(serverSentEvent.data()) + "\nid: " + (serverSentEvent.id() != -1 ? serverSentEvent.id() : atomicLong.getAndIncrement()) + "\n\n");
            }
        }), routingContext);
    }

    private static void endOfStream(HttpServerResponse httpServerResponse) {
        if (httpServerResponse.bytesWritten() == 0) {
            MultiMap headers = httpServerResponse.headers();
            if (headers.get(KafkaCloudEventHelper.KAFKA_HEADER_CONTENT_TYPE) == null) {
                headers.set(KafkaCloudEventHelper.KAFKA_HEADER_CONTENT_TYPE, "text/event-stream");
            }
        }
        httpServerResponse.end();
    }

    public static boolean isSSE(Multi<?> multi) {
        return multi instanceof SSEMulti;
    }
}
