package io.smallrye.reactive.messaging.kafka.impl;

import io.grpc.Context;
import io.netty.handler.codec.http.HttpHeaders;
import io.opentelemetry.OpenTelemetry;
import io.opentelemetry.common.AttributeKey;
import io.opentelemetry.context.Scope;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.SpanContext;
import io.opentelemetry.trace.TracingContextUtils;
import io.opentelemetry.trace.attributes.SemanticAttributes;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.UniEmitter;
import io.smallrye.reactive.messaging.TracingMetadata;
import io.smallrye.reactive.messaging.ce.OutgoingCloudEventMetadata;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnector;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.Record;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.ce.KafkaCloudEventHelper;
import io.smallrye.reactive.messaging.kafka.tracing.HeaderInjectAdapter;
import io.vertx.core.AsyncResult;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.kafka.admin.KafkaAdminClient;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Processor;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/impl/KafkaSink.class */
/**
 * Outgoing side of the Kafka connector: consumes {@link Message}s from a channel and writes them
 * to Kafka through a Vert.x {@link KafkaWriteStream}.
 *
 * <p>
 * For every message the sink builds a {@link ProducerRecord} (plain, or a binary/structured Cloud
 * Event record), sends it, and acknowledges the message once the write succeeded. Failed writes
 * are optionally retried and the message is finally nacked. Failures reported by the subscriber
 * pipeline are remembered (bounded history) and exposed through {@link #isAlive}.
 */
public class KafkaSink {

    /** Failure types for which a failed write is never retried (matched on the exact class). */
    private static final Set<Class<? extends Throwable>> NOT_RECOVERABLE = new HashSet<>(Arrays.asList(
            InvalidTopicException.class,
            OffsetMetadataTooLarge.class,
            RecordBatchTooLargeException.class,
            RecordTooLargeException.class,
            UnknownServerException.class,
            SerializationException.class));

    /** Maximum number of failures kept for the liveness report; older ones are dropped first. */
    private static final int MAX_RECORDED_FAILURES = 10;

    private static final String CLOUD_EVENTS_MODE_BINARY = "binary";
    private static final String CLOUD_EVENTS_MODE_STRUCTURED = "structured";

    private final KafkaWriteStream<?, ?> stream;
    private final int partition;
    private final String topic;
    private final String key;
    private final SubscriberBuilder<? extends Message<?>, Void> subscriber;
    private final long retries;
    private final KafkaConnectorOutgoingConfiguration configuration;
    /** Only created when both health and readiness checks are enabled; {@code null} otherwise. */
    private final KafkaAdminClient admin;
    /** Guarded by {@code this}. */
    private final List<Throwable> failures = new ArrayList<>();
    private final KafkaSenderProcessor processor;
    private final boolean writeAsBinaryCloudEvent;
    private final boolean writeCloudEvents;
    private final boolean mandatoryCloudEventAttributeSet;
    private final boolean isTracingEnabled;

    /**
     * Creates the sink, its underlying Kafka producer and the subscriber pipeline.
     *
     * @param vertx the (Mutiny) Vert.x instance used to create the write stream and admin client
     * @param kafkaConnectorOutgoingConfiguration the outgoing channel configuration
     * @param kafkaCDIEvents CDI events holder; the created producer is fired through it
     * @throws IllegalStateException if structured Cloud Events are enabled but the value serializer
     *         is not the {@link StringSerializer}
     */
    public KafkaSink(Vertx vertx, KafkaConnectorOutgoingConfiguration kafkaConnectorOutgoingConfiguration, KafkaCDIEvents kafkaCDIEvents) {
        // Validate first: failing after the producer has been created would leak it, as nobody
        // would hold a reference to this half-constructed sink to close it.
        if (kafkaConnectorOutgoingConfiguration.getCloudEvents()
                && kafkaConnectorOutgoingConfiguration.getCloudEventsMode().equalsIgnoreCase(CLOUD_EVENTS_MODE_STRUCTURED)
                && !kafkaConnectorOutgoingConfiguration.getValueSerializer().equalsIgnoreCase(StringSerializer.class.getName())) {
            KafkaLogging.log.invalidValueSerializerForStructuredCloudEvent(kafkaConnectorOutgoingConfiguration.getValueSerializer());
            throw new IllegalStateException("Invalid value serializer to write a structured Cloud Event. " + StringSerializer.class.getName() + " must be used, found: " + kafkaConnectorOutgoingConfiguration.getValueSerializer());
        }

        this.configuration = kafkaConnectorOutgoingConfiguration;

        Map<String, Object> producerConfig = extractProducerConfiguration(kafkaConnectorOutgoingConfiguration).getMap();
        this.stream = KafkaWriteStream.create(vertx.getDelegate(), producerConfig);
        this.stream.exceptionHandler(failure -> {
            if (kafkaConnectorOutgoingConfiguration.getTopic().isPresent()) {
                KafkaLogging.log.unableToWrite(kafkaConnectorOutgoingConfiguration.getChannel(), kafkaConnectorOutgoingConfiguration.getTopic().get(), failure);
            } else {
                KafkaLogging.log.unableToWrite(kafkaConnectorOutgoingConfiguration.getChannel(), failure);
            }
        });
        kafkaCDIEvents.producer().fire(this.stream.unwrap());

        this.partition = kafkaConnectorOutgoingConfiguration.getPartition();
        this.retries = kafkaConnectorOutgoingConfiguration.getRetries();
        // The channel name doubles as the topic when no topic is configured.
        this.topic = kafkaConnectorOutgoingConfiguration.getTopic().orElseGet(kafkaConnectorOutgoingConfiguration::getChannel);
        this.key = kafkaConnectorOutgoingConfiguration.getKey().orElse(null);
        this.isTracingEnabled = kafkaConnectorOutgoingConfiguration.getTracingEnabled();
        this.writeCloudEvents = kafkaConnectorOutgoingConfiguration.getCloudEvents();
        this.writeAsBinaryCloudEvent = kafkaConnectorOutgoingConfiguration.getCloudEventsMode().equalsIgnoreCase(CLOUD_EVENTS_MODE_BINARY);
        this.mandatoryCloudEventAttributeSet = kafkaConnectorOutgoingConfiguration.getCloudEventsType().isPresent()
                && kafkaConnectorOutgoingConfiguration.getCloudEventsSource().isPresent();

        if (kafkaConnectorOutgoingConfiguration.getHealthEnabled() && kafkaConnectorOutgoingConfiguration.getHealthReadinessEnabled()) {
            this.admin = KafkaAdminHelper.createAdminClient(vertx, producerConfig, kafkaConnectorOutgoingConfiguration.getChannel(), false);
        } else {
            this.admin = null;
        }

        boolean waitForWriteCompletion = kafkaConnectorOutgoingConfiguration.getWaitForWriteCompletion();
        long maxInflight = kafkaConnectorOutgoingConfiguration.getMaxInflightMessages();
        // A non-positive limit means "unbounded".
        this.processor = new KafkaSenderProcessor(maxInflight <= 0 ? Long.MAX_VALUE : maxInflight, waitForWriteCompletion, writeMessageToKafka());
        this.subscriber = ReactiveStreams.builder().via((Processor) this.processor).onError(failure -> {
            KafkaLogging.log.unableToDispatch(failure);
            reportFailure(failure);
        }).ignore();
    }

    /**
     * Records a failure for the liveness report, evicting the oldest entry once the history is full.
     *
     * @param th the failure to remember
     */
    private synchronized void reportFailure(Throwable th) {
        if (this.failures.size() == MAX_RECORDED_FAILURES) {
            this.failures.remove(0);
        }
        this.failures.add(th);
    }

    /**
     * Builds the function handed to the sender processor: it turns a message into a Kafka record,
     * sends it, retries recoverable failures when retries are configured, and nacks the message if
     * the write ultimately fails.
     *
     * @return the message-to-write function; the returned {@link Uni} fails only when the record
     *         could not be built (a {@link RuntimeException} was thrown) or when nacking fails
     */
    private Function<Message<?>, Uni<Void>> writeMessageToKafka() {
        return message -> {
            try {
                OutgoingKafkaRecordMetadata<?> metadata = getOutgoingKafkaRecordMetadata(message).orElse(null);
                // Per-message topic wins over the channel-level topic.
                String actualTopic = (metadata == null || metadata.getTopic() == null) ? this.topic : metadata.getTopic();
                ProducerRecord<?, ?> record = createRecord(message, metadata, actualTopic);
                KafkaLogging.log.sendingMessageToTopic(message, actualTopic);

                Uni<Void> write = send(message, record);
                if (this.retries > 0) {
                    write = write.onFailure(this::isRecoverable).retry()
                            .withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(20L))
                            .atMost(this.retries);
                }
                return write.onFailure().recoverWithUni(failure -> {
                    KafkaLogging.log.nackingMessage(message, actualTopic, failure);
                    return Uni.createFrom().completionStage(message.nack(failure));
                });
            } catch (RuntimeException e) {
                KafkaLogging.log.unableToSendRecord(e);
                return Uni.createFrom().failure(e);
            }
        };
    }

    /**
     * Chooses the record flavour: a Cloud Event record when Cloud Events are enabled and either the
     * message carries Cloud Event metadata or the mandatory attributes (type and source) are
     * configured; a plain record otherwise.
     */
    private ProducerRecord<?, ?> createRecord(Message<?> message, OutgoingKafkaRecordMetadata<?> metadata, String actualTopic) {
        OutgoingCloudEventMetadata<?> ceMetadata = message.getMetadata(OutgoingCloudEventMetadata.class).orElse(null);
        boolean asCloudEvent = this.writeCloudEvents && (ceMetadata != null || this.mandatoryCloudEventAttributeSet);
        if (!asCloudEvent) {
            return getProducerRecord(message, metadata, actualTopic);
        }
        if (this.writeAsBinaryCloudEvent) {
            return KafkaCloudEventHelper.createBinaryRecord(message, actualTopic, metadata, ceMetadata, this.configuration);
        }
        return KafkaCloudEventHelper.createStructuredRecord(message, actualTopic, metadata, ceMetadata, this.configuration);
    }

    /**
     * Sends the record and bridges the write callback to a {@link Uni}.
     *
     * <p>
     * The stream is held as {@code KafkaWriteStream<?, ?>} because key/value types are only known
     * from configuration, so the record has to go through an unchecked conversion here.
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    private Uni<Void> send(Message<?> message, ProducerRecord<?, ?> record) {
        return Uni.createFrom().emitter(emitter -> this.stream.send((ProducerRecord) record,
                result -> handleWriteResult(result, message, record, emitter)));
    }

    /**
     * Tells whether a failed write may be retried.
     *
     * @param th the write failure
     * @return {@code false} if the failure's exact class is listed in {@link #NOT_RECOVERABLE}
     */
    private boolean isRecoverable(Throwable th) {
        return !NOT_RECOVERABLE.contains(th.getClass());
    }

    /**
     * Completes the emitter from the outcome of a write: a failed write fails the emitter; a
     * successful write acknowledges the message and propagates the acknowledgement outcome.
     */
    private void handleWriteResult(AsyncResult<?> asyncResult, Message<?> message, ProducerRecord<?, ?> producerRecord, UniEmitter<? super Void> uniEmitter) {
        if (!asyncResult.succeeded()) {
            uniEmitter.fail(asyncResult.cause());
            return;
        }
        KafkaLogging.log.successfullyToTopic(message, producerRecord.topic());
        message.ack().whenComplete((ignored, ackFailure) -> {
            if (ackFailure != null) {
                uniEmitter.fail(ackFailure);
            } else {
                uniEmitter.complete(null);
            }
        });
    }

    /**
     * Looks up the Kafka-specific outgoing metadata attached to the message, if any.
     */
    private Optional<OutgoingKafkaRecordMetadata<?>> getOutgoingKafkaRecordMetadata(Message<?> message) {
        // The identity mapping only widens the raw metadata type to its wildcard form.
        return message.getMetadata(OutgoingKafkaRecordMetadata.class).map(found -> found);
    }

    /**
     * Builds a plain (non Cloud Event) record. Partition, timestamp and headers come from the
     * message metadata when set there; otherwise the configured partition, no timestamp and
     * fresh headers are used. A {@link Record} payload is unwrapped to its value.
     *
     * @param message the message to write
     * @param outgoingKafkaRecordMetadata the message's Kafka metadata, may be {@code null}
     * @param str the destination topic
     * @return the record to send
     */
    private ProducerRecord<?, ?> getProducerRecord(Message<?> message, OutgoingKafkaRecordMetadata<?> outgoingKafkaRecordMetadata, String str) {
        boolean noMetadata = outgoingKafkaRecordMetadata == null;

        int actualPartition = (noMetadata || outgoingKafkaRecordMetadata.getPartition() <= -1)
                ? this.partition
                : outgoingKafkaRecordMetadata.getPartition();
        Object actualKey = getKey(message, outgoingKafkaRecordMetadata);
        // -1 is the "not set" marker for both partition and timestamp; it is mapped to null below.
        long timestamp = (noMetadata || outgoingKafkaRecordMetadata.getTimestamp() == null)
                ? -1L
                : outgoingKafkaRecordMetadata.getTimestamp().toEpochMilli();
        Headers headers = (noMetadata || outgoingKafkaRecordMetadata.getHeaders() == null)
                ? new RecordHeaders()
                : outgoingKafkaRecordMetadata.getHeaders();

        // May add trace propagation headers, so it must run before the record is created.
        createOutgoingTrace(message, str, actualPartition, headers);

        Object payload = message.getPayload();
        if (payload instanceof Record) {
            payload = ((Record<?, ?>) payload).value();
        }
        return new ProducerRecord<>(str,
                actualPartition == -1 ? null : Integer.valueOf(actualPartition),
                timestamp == -1 ? null : Long.valueOf(timestamp),
                actualKey, payload, headers);
    }

    /**
     * Resolves the record key, in order of precedence: key from the message metadata, key of a
     * {@link Record} payload, configured default key (possibly {@code null}).
     */
    private Object getKey(Message<?> message, OutgoingKafkaRecordMetadata<?> outgoingKafkaRecordMetadata) {
        if (outgoingKafkaRecordMetadata != null && outgoingKafkaRecordMetadata.getKey() != null) {
            return outgoingKafkaRecordMetadata.getKey();
        }
        Object payload = message.getPayload();
        if (payload instanceof Record) {
            return ((Record<?, ?>) payload).key();
        }
        return this.key;
    }

    /**
     * When tracing is enabled, creates a PRODUCER span named {@code "<topic> send"} and injects the
     * trace context into the record headers. The span is parented on the message's previous
     * context, if any, and linked to its current span context when that one is valid.
     */
    private void createOutgoingTrace(Message<?> message, String str, int i, Headers headers) {
        if (!this.isTracingEnabled) {
            return;
        }
        Span.Builder spanBuilder = KafkaConnector.TRACER.spanBuilder(str + " send").setSpanKind(Span.Kind.PRODUCER);

        TracingMetadata tracingMetadata = TracingMetadata.fromMessage(message).orElse(null);
        Context parent = tracingMetadata == null ? null : tracingMetadata.getPreviousContext();
        if (parent != null) {
            spanBuilder.setParent(parent);
        } else {
            spanBuilder.setNoParent();
        }
        if (tracingMetadata != null) {
            SpanContext current = tracingMetadata.getCurrentSpanContext();
            if (current != null && current.isValid()) {
                spanBuilder.addLink(current);
            }
        }

        Span span = spanBuilder.startSpan();
        // Scope and span must be released even if setting attributes or injecting fails,
        // otherwise the span stays installed as the thread's current context.
        try (Scope ignored = TracingContextUtils.currentContextWith(span)) {
            span.setAttribute("partition", i);
            span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "kafka");
            span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, str);
            span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic");
            OpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(), headers, HeaderInjectAdapter.SETTER);
        } finally {
            span.end();
        }
    }

    /**
     * Derives the Kafka producer configuration from the channel configuration: {@code acks} is
     * always set from the connector attribute, while bootstrap servers, key serializer and client
     * id are only filled in when not already present. The result is passed through
     * {@code ConfigurationCleaner.cleanupProducerConfiguration}.
     */
    private JsonObject extractProducerConfiguration(KafkaConnectorOutgoingConfiguration kafkaConnectorOutgoingConfiguration) {
        JsonObject json = JsonHelper.asJsonObject(kafkaConnectorOutgoingConfiguration.config());
        json.put(ProducerConfig.ACKS_CONFIG, kafkaConnectorOutgoingConfiguration.getAcks());

        if (!json.containsKey("bootstrap.servers")) {
            KafkaLogging.log.configServers("bootstrap.servers", kafkaConnectorOutgoingConfiguration.getBootstrapServers());
            json.put("bootstrap.servers", kafkaConnectorOutgoingConfiguration.getBootstrapServers());
        }
        if (!json.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
            KafkaLogging.log.keyDeserializerOmitted();
            json.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaConnectorOutgoingConfiguration.getKeySerializer());
        }
        if (!json.containsKey("client.id")) {
            String clientId = "kafka-producer-" + kafkaConnectorOutgoingConfiguration.getChannel();
            KafkaLogging.log.setKafkaProducerClientId(clientId);
            json.put("client.id", clientId);
        }
        return ConfigurationCleaner.cleanupProducerConfiguration(json);
    }

    /**
     * @return the subscriber consuming the channel's messages and writing them to Kafka
     */
    public SubscriberBuilder<? extends Message<?>, Void> getSink() {
        return this.subscriber;
    }

    /**
     * Contributes the liveness status of this channel: alive as long as no failure has been
     * recorded. Does nothing when health reporting is disabled.
     *
     * @param healthReportBuilder the report to contribute to
     */
    public void isAlive(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (!this.configuration.getHealthEnabled()) {
            return;
        }
        List<Throwable> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(this.failures);
        }
        if (snapshot.isEmpty()) {
            healthReportBuilder.add(this.configuration.getChannel(), true);
        } else {
            // Separate the messages so that several failures stay readable in the report.
            String details = snapshot.stream().map(Throwable::getMessage).collect(Collectors.joining(", "));
            healthReportBuilder.add(this.configuration.getChannel(), false, details);
        }
    }

    /**
     * Contributes the readiness status of this channel: ready when the broker lists the sink's
     * topic within the configured readiness timeout. Does nothing unless both health and readiness
     * checks are enabled.
     *
     * @param healthReportBuilder the report to contribute to
     */
    public void isReady(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (!this.configuration.getHealthEnabled() || !this.configuration.getHealthReadinessEnabled()) {
            return;
        }
        try {
            Duration timeout = Duration.ofMillis(this.configuration.getHealthReadinessTimeout());
            if (this.admin.listTopics().await().atMost(timeout).contains(this.topic)) {
                healthReportBuilder.add(this.configuration.getChannel(), true);
            } else {
                healthReportBuilder.add(this.configuration.getChannel(), false, "Unable to find topic " + this.topic);
            }
        } catch (Exception e) {
            // Timeouts and broker errors both end up here and are reported as "not ready".
            healthReportBuilder.add(this.configuration.getChannel(), false, "No response from broker for topic " + this.topic + " : " + e);
        }
    }

    /**
     * Cancels the sender processor, closes the write stream (blocking until the close callback
     * fired) and finally closes the admin client when one was created. Errors while closing the
     * stream are logged, not rethrown.
     */
    public void closeQuietly() {
        if (this.processor != null) {
            this.processor.cancel();
        }

        CountDownLatch closed = new CountDownLatch(1);
        try {
            this.stream.close(result -> {
                if (result.failed()) {
                    KafkaLogging.log.errorWhileClosingWriteStream(result.cause());
                }
                closed.countDown();
            });
        } catch (Throwable th) {
            KafkaLogging.log.errorWhileClosingWriteStream(th);
            // The callback will never be invoked in this case; release the latch ourselves.
            closed.countDown();
        }

        try {
            closed.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        if (this.admin != null) {
            this.admin.closeAndAwait();
        }
    }
}
