/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.impl;

import io.grpc.Context;
import io.opentelemetry.OpenTelemetry;
import io.opentelemetry.context.Scope;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.SpanContext;
import io.opentelemetry.trace.TracingContextUtils;
import io.opentelemetry.trace.attributes.SemanticAttributes;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.UniEmitter;
import io.smallrye.reactive.messaging.TracingMetadata;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.Record;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.ConfigurationCleaner;
import io.smallrye.reactive.messaging.kafka.impl.JsonHelper;
import io.smallrye.reactive.messaging.kafka.impl.KafkaAdminHelper;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSenderProcessor;
import io.smallrye.reactive.messaging.kafka.impl.ce.KafkaCloudEventHelper;
import io.smallrye.reactive.messaging.kafka.tracing.HeaderInjectAdapter;
import io.vertx.core.AsyncResult;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.kafka.admin.KafkaAdminClient;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Processor;

public class KafkaSink {
    private final KafkaWriteStream<?, ?> stream;
    private final int partition;
    private final String topic;
    private final String key;
    private final SubscriberBuilder<? extends Message<?>, Void> subscriber;
    private final long retries;
    private final KafkaConnectorOutgoingConfiguration configuration;
    private final KafkaAdminClient admin;
    private final List<Throwable> failures = new ArrayList<Throwable>();
    private final KafkaSenderProcessor processor;
    private final boolean writeAsBinaryCloudEvent;
    private final boolean writeCloudEvents;
    private final boolean mandatoryCloudEventAttributeSet;
    private final boolean isTracingEnabled;

    public KafkaSink(Vertx vertx, KafkaConnectorOutgoingConfiguration config, KafkaCDIEvents kafkaCDIEvents) {
        JsonObject kafkaConfiguration = this.extractProducerConfiguration(config);
        Map<String, Object> kafkaConfigurationMap = kafkaConfiguration.getMap();
        this.stream = KafkaWriteStream.create(vertx.getDelegate(), kafkaConfigurationMap);
        this.stream.exceptionHandler(e -> {
            if (config.getTopic().isPresent()) {
                KafkaLogging.log.unableToWrite(config.getChannel(), config.getTopic().get(), (Throwable)e);
            } else {
                KafkaLogging.log.unableToWrite(config.getChannel(), (Throwable)e);
            }
        });
        kafkaCDIEvents.producer().fire(this.stream.unwrap());
        this.partition = config.getPartition();
        this.retries = config.getRetries();
        this.topic = config.getTopic().orElseGet(config::getChannel);
        this.key = config.getKey().orElse(null);
        this.isTracingEnabled = config.getTracingEnabled();
        this.writeCloudEvents = config.getCloudEvents();
        this.writeAsBinaryCloudEvent = config.getCloudEventsMode().equalsIgnoreCase("binary");
        boolean waitForWriteCompletion = config.getWaitForWriteCompletion();
        this.configuration = config;
        boolean bl = this.mandatoryCloudEventAttributeSet = this.configuration.getCloudEventsType().isPresent() && this.configuration.getCloudEventsSource().isPresent();
        if (this.configuration.getCloudEvents().booleanValue() && this.configuration.getCloudEventsMode().equalsIgnoreCase("structured") && !this.configuration.getValueSerializer().equalsIgnoreCase(StringSerializer.class.getName())) {
            KafkaLogging.log.invalidValueSerializerForStructuredCloudEvent(this.configuration.getValueSerializer());
            throw new IllegalStateException("Invalid value serializer to write a structured Cloud Event. " + StringSerializer.class.getName() + " must be used, found: " + this.configuration.getValueSerializer());
        }
        this.admin = config.getHealthEnabled() != false && config.getHealthReadinessEnabled() != false ? KafkaAdminHelper.createAdminClient(vertx, kafkaConfigurationMap, config.getChannel(), false) : null;
        long requests = config.getMaxInflightMessages();
        if (requests <= 0L) {
            requests = Long.MAX_VALUE;
        }
        this.processor = new KafkaSenderProcessor(requests, waitForWriteCompletion, this.writeMessageToKafka());
        this.subscriber = ReactiveStreams.builder().via((Processor)this.processor).onError(f -> {
            KafkaLogging.log.unableToDispatch((Throwable)f);
            this.reportFailure((Throwable)f);
        }).ignore();
    }

    private synchronized void reportFailure(Throwable failure) {
        if (this.failures.size() == 10) {
            this.failures.remove(0);
        }
        this.failures.add(failure);
    }

    private Function<Message<?>, Uni<Void>> writeMessageToKafka() {
        return message -> {
            try {
                Optional<OutgoingKafkaRecordMetadata<?>> om = this.getOutgoingKafkaRecordMetadata((Message<?>)message);
                OutgoingKafkaRecordMetadata metadata = om.orElse(null);
                String actualTopic = metadata == null || metadata.getTopic() == null ? this.topic : metadata.getTopic();
                OutgoingCloudEventMetadata ceMetadata = message.getMetadata(OutgoingCloudEventMetadata.class).orElse(null);
                ProducerRecord<?, ?> record = this.writeCloudEvents && (ceMetadata != null || this.mandatoryCloudEventAttributeSet) ? (this.writeAsBinaryCloudEvent ? KafkaCloudEventHelper.createBinaryRecord(message, actualTopic, metadata, ceMetadata, this.configuration) : KafkaCloudEventHelper.createStructuredRecord(message, actualTopic, metadata, ceMetadata, this.configuration)) : this.getProducerRecord((Message<?>)message, metadata, actualTopic);
                KafkaLogging.log.sendingMessageToTopic((Message)message, actualTopic);
                Uni uni = Uni.createFrom().emitter(e -> this.stream.send(record, ar -> this.handleWriteResult((AsyncResult<?>)ar, (Message<?>)message, record, (UniEmitter<? super Void>)e)));
                if (this.retries > 0L) {
                    uni = uni.onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(20L)).atMost(this.retries);
                }
                return uni.onFailure().recoverWithUni(t -> {
                    KafkaLogging.log.nackingMessage((Message)message, actualTopic, (Throwable)t);
                    return Uni.createFrom().completionStage(message.nack((Throwable)t));
                });
            }
            catch (RuntimeException e2) {
                KafkaLogging.log.unableToSendRecord(e2);
                return Uni.createFrom().failure(e2);
            }
        };
    }

    private void handleWriteResult(AsyncResult<?> ar, Message<?> message, ProducerRecord<?, ?> record, UniEmitter<? super Void> emitter) {
        String actualTopic = record.topic();
        if (ar.succeeded()) {
            KafkaLogging.log.successfullyToTopic(message, actualTopic);
            message.ack().whenComplete((x, f) -> {
                if (f != null) {
                    emitter.fail((Throwable)f);
                } else {
                    emitter.complete(null);
                }
            });
        } else {
            emitter.fail(ar.cause());
        }
    }

    private Optional<OutgoingKafkaRecordMetadata<?>> getOutgoingKafkaRecordMetadata(Message<?> message) {
        return message.getMetadata(OutgoingKafkaRecordMetadata.class).map(x -> x);
    }

    private ProducerRecord<?, ?> getProducerRecord(Message<?> message, OutgoingKafkaRecordMetadata<?> om, String actualTopic) {
        int actualPartition = om == null || om.getPartition() <= -1 ? this.partition : om.getPartition();
        Object actualKey = this.getKey(message, om, this.configuration);
        long actualTimestamp = om == null || om.getTimestamp() == null ? -1L : (om.getTimestamp() != null ? om.getTimestamp().toEpochMilli() : -1L);
        Headers kafkaHeaders = om == null || om.getHeaders() == null ? new RecordHeaders() : om.getHeaders();
        this.createOutgoingTrace(message, actualTopic, actualPartition, kafkaHeaders);
        Object payload = message.getPayload();
        if (payload instanceof Record) {
            payload = ((Record)payload).value();
        }
        return new ProducerRecord(actualTopic, actualPartition == -1 ? null : Integer.valueOf(actualPartition), actualTimestamp == -1L ? null : Long.valueOf(actualTimestamp), actualKey, payload, kafkaHeaders);
    }

    private Object getKey(Message<?> message, OutgoingKafkaRecordMetadata<?> metadata, KafkaConnectorOutgoingConfiguration configuration) {
        if (metadata != null && metadata.getKey() != null) {
            return metadata.getKey();
        }
        if (message.getPayload() instanceof Record) {
            return ((Record)message.getPayload()).key();
        }
        return this.key;
    }

    private void createOutgoingTrace(Message<?> message, String topic, int partition, Headers headers) {
        if (this.isTracingEnabled) {
            Optional<TracingMetadata> tracingMetadata = TracingMetadata.fromMessage(message);
            Span.Builder spanBuilder = KafkaConnector.TRACER.spanBuilder(topic + " send").setSpanKind(Span.Kind.PRODUCER);
            if (tracingMetadata.isPresent()) {
                Context parentSpanContext = tracingMetadata.get().getPreviousContext();
                if (parentSpanContext != null) {
                    spanBuilder.setParent(parentSpanContext);
                } else {
                    spanBuilder.setNoParent();
                }
                SpanContext incomingSpan = tracingMetadata.get().getCurrentSpanContext();
                if (incomingSpan != null && incomingSpan.isValid()) {
                    spanBuilder.addLink(incomingSpan);
                }
            } else {
                spanBuilder.setNoParent();
            }
            Span span = spanBuilder.startSpan();
            Scope scope = TracingContextUtils.currentContextWith(span);
            span.setAttribute("partition", (long)partition);
            span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "kafka");
            span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, topic);
            span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic");
            OpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(), headers, HeaderInjectAdapter.SETTER);
            span.end();
            scope.close();
        }
    }

    private JsonObject extractProducerConfiguration(KafkaConnectorOutgoingConfiguration config) {
        JsonObject kafkaConfiguration = JsonHelper.asJsonObject(config.config());
        kafkaConfiguration.put("acks", config.getAcks());
        if (!kafkaConfiguration.containsKey("bootstrap.servers")) {
            KafkaLogging.log.configServers("bootstrap.servers", config.getBootstrapServers());
            kafkaConfiguration.put("bootstrap.servers", config.getBootstrapServers());
        }
        if (!kafkaConfiguration.containsKey("key.serializer")) {
            KafkaLogging.log.keyDeserializerOmitted();
            kafkaConfiguration.put("key.serializer", config.getKeySerializer());
        }
        if (!kafkaConfiguration.containsKey("client.id")) {
            String id = "kafka-producer-" + config.getChannel();
            KafkaLogging.log.setKafkaProducerClientId(id);
            kafkaConfiguration.put("client.id", id);
        }
        return ConfigurationCleaner.cleanupProducerConfiguration(kafkaConfiguration);
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSink() {
        return this.subscriber;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void isAlive(HealthReport.HealthReportBuilder builder) {
        if (this.configuration.getHealthEnabled().booleanValue()) {
            ArrayList<Throwable> actualFailures;
            KafkaSink kafkaSink = this;
            synchronized (kafkaSink) {
                actualFailures = new ArrayList<Throwable>(this.failures);
            }
            if (!actualFailures.isEmpty()) {
                builder.add(this.configuration.getChannel(), false, actualFailures.stream().map(Throwable::getMessage).collect(Collectors.joining()));
            } else {
                builder.add(this.configuration.getChannel(), true);
            }
        }
    }

    public void isReady(HealthReport.HealthReportBuilder builder) {
        if (this.configuration.getHealthEnabled().booleanValue() && this.configuration.getHealthReadinessEnabled().booleanValue()) {
            try {
                Set<String> topics = this.admin.listTopics().await().atMost(Duration.ofMillis(this.configuration.getHealthReadinessTimeout()));
                if (topics.contains(this.topic)) {
                    builder.add(this.configuration.getChannel(), true);
                } else {
                    builder.add(this.configuration.getChannel(), false, "Unable to find topic " + this.topic);
                }
            }
            catch (Exception failed) {
                builder.add(this.configuration.getChannel(), false, "No response from broker for topic " + this.topic + " : " + failed);
            }
        }
    }

    public void closeQuietly() {
        if (this.processor != null) {
            this.processor.cancel();
        }
        CountDownLatch latch = new CountDownLatch(1);
        try {
            this.stream.close(ar -> {
                if (ar.failed()) {
                    KafkaLogging.log.errorWhileClosingWriteStream(ar.cause());
                }
                latch.countDown();
            });
        }
        catch (Throwable e) {
            KafkaLogging.log.errorWhileClosingWriteStream(e);
            latch.countDown();
        }
        try {
            latch.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (this.admin != null) {
            this.admin.closeAndAwait();
        }
    }
}

