/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.subscription.UniEmitter;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.JsonHelper;
import io.vertx.core.AsyncResult;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import io.vertx.mutiny.core.Vertx;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * Outgoing side of the Kafka connector: wraps a Vert.x {@link KafkaWriteStream} and exposes a
 * {@link SubscriberBuilder} that writes every received {@link Message} to Kafka.
 *
 * <p>For each message the sink resolves topic, partition, key, timestamp and headers (values from
 * an attached {@link OutgoingKafkaRecordMetadata} win over the channel configuration), sends the
 * record, acks the message once the write succeeded and nacks it once the write definitively
 * failed (after the configured number of retries).
 */
public class KafkaSink {
    /** Kafka producer property holding the maximum number of in-flight requests per connection. */
    private static final String MAX_IN_FLIGHT_REQUESTS_KEY = "max.in.flight.requests.per.connection";
    /** Value treated as "left at its default" for the in-flight setting (5 is the Kafka client default). */
    private static final int DEFAULT_MAX_INFLIGHT = 5;
    /** Sentinel meaning "no partition configured"; translated to {@code null} on the record. */
    private static final int NO_PARTITION = -1;
    /** Sentinel meaning "no timestamp provided"; translated to {@code null} on the record. */
    private static final long NO_TIMESTAMP = -1L;

    private final KafkaWriteStream<?, ?> stream;
    private final int partition;
    private final String key;
    private final String topic;
    private final SubscriberBuilder<? extends Message<?>, Void> subscriber;
    private final long retries;

    /**
     * Creates the underlying Kafka write stream and assembles the subscriber pipeline.
     *
     * @param vertx the Vert.x instance whose delegate is used to create the write stream
     * @param config the outgoing channel configuration
     */
    public KafkaSink(Vertx vertx, KafkaConnectorOutgoingConfiguration config) {
        JsonObject kafkaConfiguration = extractProducerConfiguration(config);
        this.stream = KafkaWriteStream.create(vertx.getDelegate(), kafkaConfiguration.getMap());
        this.stream.exceptionHandler(KafkaLogging.log::unableToWrite);
        this.partition = config.getPartition();
        this.retries = config.getRetries();
        this.key = config.getKey().orElse(null);
        // Fall back to the channel name when no explicit topic is configured.
        this.topic = config.getTopic().orElseGet(config::getChannel);

        boolean waitForWriteCompletion = config.getWaitForWriteCompletion();
        int maxInflight = config.getMaxInflightMessages();
        if (maxInflight == DEFAULT_MAX_INFLIGHT) {
            // The connector attribute was left at its default: honour the Kafka producer property
            // if the user set that one instead. This must be the same key that
            // extractProducerConfiguration() writes, not an unrelated producer setting.
            maxInflight = config.config()
                    .getOptionalValue(MAX_IN_FLIGHT_REQUESTS_KEY, Integer.class)
                    .orElse(DEFAULT_MAX_INFLIGHT);
        }

        this.subscriber = ReactiveStreams.<Message<?>> builder()
                .via(new KafkaSenderProcessor(maxInflight, waitForWriteCompletion, writeMessageToKafka()))
                .onError(KafkaLogging.log::unableToDispatch)
                .ignore();
    }

    /**
     * Builds the function that sends a single message to Kafka.
     *
     * <p>The returned {@link Uni} completes once the record was written and the message acked, or
     * once the message was nacked after the write failed (retries included). It fails only when
     * the ack/nack itself fails or when building the record throws a {@link RuntimeException}.
     * A message for which no topic can be resolved is logged and skipped; it is neither acked nor
     * nacked.
     *
     * @return the send function used by {@link KafkaSenderProcessor}
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private Function<Message<?>, Uni<Void>> writeMessageToKafka() {
        return message -> {
            try {
                OutgoingKafkaRecordMetadata<?> metadata = getOutgoingKafkaRecordMetadata(message).orElse(null);
                final String actualTopic = metadata == null || metadata.getTopic() == null
                        ? this.topic
                        : metadata.getTopic();
                if (actualTopic == null) {
                    KafkaLogging.log.ignoringNoTopicSet();
                    return Uni.createFrom().item(() -> null);
                }

                ProducerRecord<?, ?> record = getProducerRecord(message, metadata, actualTopic);
                KafkaLogging.log.sendingMessageToTopic(message, actualTopic);

                // The stream is typed with wildcards, so the record has to be passed as a raw type.
                Uni<Void> uni = Uni.createFrom().emitter(
                        e -> this.stream.send((ProducerRecord) record,
                                ar -> handleWriteResult(ar, message, record, e)));

                if (this.retries > 0L) {
                    uni = uni.onFailure().retry()
                            .withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(20L))
                            .atMost(this.retries);
                }

                // Retries exhausted (or disabled): log and nack instead of failing the stream.
                return uni.onFailure().recoverWithUni(t -> {
                    KafkaLogging.log.nackingMessage(message, actualTopic, t);
                    return Uni.createFrom().completionStage(message.nack(t));
                });
            } catch (RuntimeException e) {
                KafkaLogging.log.unableToSendRecord(e);
                return Uni.createFrom().failure(e);
            }
        };
    }

    /**
     * Translates the outcome of a Kafka write into an emitter signal.
     *
     * @param ar the result reported by the write stream
     * @param message the message that was sent; acked when the write succeeded
     * @param record the record that was written (used for the topic name in the log)
     * @param emitter completed once the ack completes, failed when the write or the ack fails
     */
    private void handleWriteResult(AsyncResult<?> ar, Message<?> message, ProducerRecord<?, ?> record,
            UniEmitter<? super Void> emitter) {
        if (ar.failed()) {
            // Propagate the failure; the retry / nack logic is attached by the caller.
            emitter.fail(ar.cause());
            return;
        }

        KafkaLogging.log.successfullyToTopic(message, record.topic());
        message.ack().whenComplete((x, failure) -> {
            if (failure != null) {
                emitter.fail(failure);
            } else {
                emitter.complete(null);
            }
        });
    }

    /**
     * Looks up the {@link OutgoingKafkaRecordMetadata} attached to the message, if any.
     *
     * @param message the message to inspect
     * @return the metadata, or an empty optional when the message carries none
     */
    @SuppressWarnings("rawtypes")
    private Optional<OutgoingKafkaRecordMetadata<?>> getOutgoingKafkaRecordMetadata(Message<?> message) {
        Optional<OutgoingKafkaRecordMetadata> metadata = message.getMetadata(OutgoingKafkaRecordMetadata.class);
        return metadata.map(x -> (OutgoingKafkaRecordMetadata<?>) x);
    }

    /**
     * Builds the Kafka record for a message. Values from the metadata take precedence over the
     * channel configuration; absent values are passed to Kafka as {@code null}.
     *
     * @param message the message whose payload becomes the record value
     * @param om the outgoing metadata attached to the message, or {@code null}
     * @param actualTopic the already-resolved target topic
     * @return the record to send
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private ProducerRecord<?, ?> getProducerRecord(Message<?> message, OutgoingKafkaRecordMetadata<?> om,
            String actualTopic) {
        int actualPartition = om == null || om.getPartition() <= NO_PARTITION ? this.partition : om.getPartition();
        // The metadata key is not necessarily a String, so keep it as Object.
        Object actualKey = om == null || om.getKey() == null ? this.key : om.getKey();
        // The timestamp is independent of the key: use it whenever the metadata provides one.
        long actualTimestamp = om == null || om.getTimestamp() == null
                ? NO_TIMESTAMP
                : om.getTimestamp().toEpochMilli();
        Iterable<Header> kafkaHeaders = om == null || om.getHeaders() == null
                ? Collections.emptyList()
                : om.getHeaders();

        return new ProducerRecord(
                actualTopic,
                actualPartition == NO_PARTITION ? null : Integer.valueOf(actualPartition),
                actualTimestamp == NO_TIMESTAMP ? null : Long.valueOf(actualTimestamp),
                actualKey,
                message.getPayload(),
                kafkaHeaders);
    }

    /**
     * Derives the Kafka producer configuration from the channel configuration: forces
     * {@code acks}, fills in {@code bootstrap.servers}, {@code key.serializer} and
     * {@code max.in.flight.requests.per.connection} when absent, and strips connector-only
     * attributes.
     *
     * @param config the outgoing channel configuration
     * @return the configuration handed to the Kafka write stream
     */
    private JsonObject extractProducerConfiguration(KafkaConnectorOutgoingConfiguration config) {
        JsonObject kafkaConfiguration = JsonHelper.asJsonObject(config.config());
        kafkaConfiguration.put("acks", config.getAcks());

        if (!kafkaConfiguration.containsKey("bootstrap.servers")) {
            KafkaLogging.log.configServers("bootstrap.servers", config.getBootstrapServers());
            kafkaConfiguration.put("bootstrap.servers", config.getBootstrapServers());
        }
        if (!kafkaConfiguration.containsKey("key.serializer")) {
            KafkaLogging.log.keyDeserializerOmitted();
            kafkaConfiguration.put("key.serializer", config.getKeySerializer());
        }
        if (!kafkaConfiguration.containsKey(MAX_IN_FLIGHT_REQUESTS_KEY)) {
            kafkaConfiguration.put(MAX_IN_FLIGHT_REQUESTS_KEY, config.getMaxInflightMessages());
        }

        // Connector-level attributes are removed before the map is given to the producer.
        kafkaConfiguration.remove("channel-name");
        kafkaConfiguration.remove("topic");
        kafkaConfiguration.remove("connector");
        kafkaConfiguration.remove("partition");
        kafkaConfiguration.remove("key");
        kafkaConfiguration.remove("max-inflight-messages");
        return kafkaConfiguration;
    }

    /**
     * @return the subscriber builder consuming the messages to write to Kafka
     */
    public SubscriberBuilder<? extends Message<?>, Void> getSink() {
        return this.subscriber;
    }

    /**
     * Closes the write stream, logging (never throwing) any failure. Blocks the calling thread
     * until the close callback has fired; there is no timeout. If the thread is interrupted while
     * waiting, the interrupt flag is restored and the method returns.
     */
    public void closeQuietly() {
        CountDownLatch latch = new CountDownLatch(1);
        try {
            this.stream.close(ar -> {
                if (ar.failed()) {
                    KafkaLogging.log.errorWhileClosingWriteStream(ar.cause());
                }
                latch.countDown();
            });
        } catch (Throwable e) {
            KafkaLogging.log.errorWhileClosingWriteStream(e);
            // The callback will not be invoked in this case: release the latch ourselves.
            latch.countDown();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Processor sitting between the upstream message source and the terminal subscriber. It sends
     * each message through the {@code send} function and controls upstream demand: an initial
     * request of {@code inflights} items, then one more item per handled message.
     *
     * <p>It also acts as the {@link Subscription} handed to the downstream subscriber, which is
     * expected to request {@link Long#MAX_VALUE}.
     */
    private static class KafkaSenderProcessor
            implements Processor<Message<?>, Message<?>>, Subscription {
        private final long inflights;
        private final boolean waitForCompletion;
        private final Function<Message<?>, Uni<Void>> send;
        private final AtomicReference<Subscription> subscription = new AtomicReference<>();
        private final AtomicReference<Subscriber<? super Message<?>>> downstream = new AtomicReference<>();

        /**
         * @param inflights number of items requested from upstream on the initial request
         * @param waitForCompletion whether the next item is requested only after the send completed
         * @param send function performing the actual write
         */
        public KafkaSenderProcessor(int inflights, boolean waitForCompletion, Function<Message<?>, Uni<Void>> send) {
            this.inflights = inflights;
            this.waitForCompletion = waitForCompletion;
            this.send = send;
        }

        /**
         * Registers the single downstream subscriber; any further subscriber is failed.
         */
        @Override
        public void subscribe(Subscriber<? super Message<?>> subscriber) {
            if (!this.downstream.compareAndSet(null, subscriber)) {
                Subscriptions.fail(subscriber, KafkaExceptions.ex.illegalStateOnlyOneSubscriber());
                return;
            }
            // Upstream already connected: hand over the subscription right away.
            if (this.subscription.get() != null) {
                subscriber.onSubscribe(this);
            }
        }

        /**
         * Stores the upstream subscription. If one was already stored, the downstream subscriber
         * (when present) is given a cancelled subscription instead.
         */
        @Override
        public void onSubscribe(Subscription subscription) {
            boolean accepted = this.subscription.compareAndSet(null, subscription);
            Subscriber<? super Message<?>> subscriber = this.downstream.get();
            if (subscriber != null) {
                subscriber.onSubscribe(accepted ? this : Subscriptions.CANCELLED);
            }
        }

        /**
         * Sends the message. With {@code waitForCompletion} the message is forwarded and the next
         * item requested only once the send completed; otherwise this happens immediately.
         */
        @Override
        public void onNext(Message<?> message) {
            if (this.waitForCompletion) {
                this.send.apply(message).subscribe().with(x -> requestNext(message), this::onError);
            } else {
                this.send.apply(message).subscribe().with(x -> {
                }, this::onError);
                requestNext(message);
            }
        }

        /**
         * Called by the downstream subscriber; translated into an upstream request of
         * {@code inflights} items.
         *
         * @throws RuntimeException (from {@code illegalStateConsumeWithoutBackPressure}) when the
         *         downstream requests anything other than {@link Long#MAX_VALUE}
         */
        @Override
        public void request(long l) {
            if (l != Long.MAX_VALUE) {
                throw KafkaExceptions.ex.illegalStateConsumeWithoutBackPressure();
            }
            this.subscription.get().request(this.inflights);
        }

        /**
         * Cancels the upstream subscription, if any, and marks this processor as cancelled.
         */
        @Override
        public void cancel() {
            Subscription previous = this.subscription.getAndSet(Subscriptions.CANCELLED);
            if (previous != null) {
                previous.cancel();
            }
        }

        /**
         * Forwards the handled message downstream and asks upstream for one more item.
         */
        private void requestNext(Message<?> message) {
            Subscriber<? super Message<?>> down = this.downstream.get();
            if (down != null) {
                down.onNext(message);
            }
            Subscription up = this.subscription.get();
            if (up != null) {
                up.request(1L);
            }
        }

        /**
         * Propagates the failure downstream at most once (the downstream reference is cleared).
         */
        @Override
        public void onError(Throwable throwable) {
            Subscriber<? super Message<?>> subscriber = this.downstream.getAndSet(null);
            if (subscriber != null) {
                subscriber.onError(throwable);
            }
        }

        /**
         * Propagates completion downstream at most once (the downstream reference is cleared).
         */
        @Override
        public void onComplete() {
            Subscriber<? super Message<?>> subscriber = this.downstream.getAndSet(null);
            if (subscriber != null) {
                subscriber.onComplete();
            }
        }
    }
}

