package io.smallrye.reactive.messaging.kafka.impl;

import com.github.benmanes.caffeine.cache.LocalCacheFactory;
import io.smallrye.reactive.messaging.kafka.OutgoingKafkaRecordMetadata;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import io.vertx.reactivex.core.Vertx;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/impl/KafkaSink.class */
public class KafkaSink {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) KafkaSink.class);
    private final KafkaWriteStream<?, ?> stream;
    private final int partition;
    private final String key;
    private final String topic;
    private final boolean waitForWriteCompletion;
    private final SubscriberBuilder<? extends Message<?>, Void> subscriber;

    public KafkaSink(Vertx vertx, Config config, String str) {
        this.stream = KafkaWriteStream.create(vertx.mo3289getDelegate(), extractProducerConfiguration(config, str).getMap());
        this.stream.exceptionHandler(th -> {
            LOGGER.error("Unable to write to Kafka", th);
        });
        this.partition = ((Integer) config.getOptionalValue("partition", Integer.class).orElse(-1)).intValue();
        this.key = (String) config.getOptionalValue(LocalCacheFactory.KEY, String.class).orElse(null);
        this.topic = getTopicOrNull(config);
        this.waitForWriteCompletion = ((Boolean) config.getOptionalValue("waitForWriteCompletion", Boolean.class).orElse(true)).booleanValue();
        if (this.topic == null) {
            LOGGER.warn("No default topic configured, only sending messages with an explicit topic set");
        }
        this.subscriber = ReactiveStreams.builder().flatMapCompletionStage(message -> {
            try {
                OutgoingKafkaRecordMetadata<?> outgoingKafkaRecordMetadata = (OutgoingKafkaRecordMetadata) message.getMetadata(OutgoingKafkaRecordMetadata.class).orElse(null);
                String topic = (outgoingKafkaRecordMetadata == null || outgoingKafkaRecordMetadata.getTopic() == null) ? this.topic : outgoingKafkaRecordMetadata.getTopic();
                if (topic == null) {
                    LOGGER.error("Ignoring message - no topic set");
                    return CompletableFuture.completedFuture(message);
                }
                ProducerRecord producerRecord = getProducerRecord(message, outgoingKafkaRecordMetadata, topic);
                LOGGER.debug("Sending message {} to Kafka topic '{}'", message, producerRecord.topic());
                CompletableFuture completableFuture = new CompletableFuture();
                Handler<AsyncResult<Void>> handler = asyncResult -> {
                    if (asyncResult.succeeded()) {
                        LOGGER.debug("Message {} sent successfully to Kafka topic '{}'", message, producerRecord.topic());
                        completableFuture.complete(message);
                    } else {
                        LOGGER.error("Message {} was not sent to Kafka topic '{}'", message, producerRecord.topic(), asyncResult.cause());
                        completableFuture.completeExceptionally(asyncResult.cause());
                    }
                };
                CompletableFuture thenApply = completableFuture.thenCompose(message -> {
                    return message.ack();
                }).thenApply(r3 -> {
                    return message;
                });
                this.stream.write(producerRecord, handler);
                return this.waitForWriteCompletion ? thenApply : CompletableFuture.completedFuture(message);
            } catch (RuntimeException e) {
                LOGGER.error("Unable to send a record to Kafka ", (Throwable) e);
                return CompletableFuture.completedFuture(message);
            }
        }).onError(th2 -> {
            LOGGER.error("Unable to dispatch message to Kafka", th2);
        }).ignore();
    }

    private ProducerRecord getProducerRecord(Message<?> message, OutgoingKafkaRecordMetadata<?> outgoingKafkaRecordMetadata, String str) {
        int partition = (outgoingKafkaRecordMetadata == null || outgoingKafkaRecordMetadata.getPartition() <= -1) ? this.partition : outgoingKafkaRecordMetadata.getPartition();
        Object key = (outgoingKafkaRecordMetadata == null || outgoingKafkaRecordMetadata.getKey() == null) ? this.key : outgoingKafkaRecordMetadata.getKey();
        long epochMilli = (outgoingKafkaRecordMetadata == null || outgoingKafkaRecordMetadata.getKey() == null) ? -1L : outgoingKafkaRecordMetadata.getTimestamp() != null ? outgoingKafkaRecordMetadata.getTimestamp().toEpochMilli() : -1L;
        return new ProducerRecord(str, partition == -1 ? null : Integer.valueOf(partition), epochMilli == -1 ? null : Long.valueOf(epochMilli), key, message.getPayload(), (outgoingKafkaRecordMetadata == null || outgoingKafkaRecordMetadata.getHeaders() == null) ? Collections.emptyList() : outgoingKafkaRecordMetadata.getHeaders());
    }

    private JsonObject extractProducerConfiguration(Config config, String str) {
        JsonObject asJsonObject = JsonHelper.asJsonObject(config);
        if (asJsonObject.containsKey(ProducerConfig.ACKS_CONFIG)) {
            asJsonObject.put(ProducerConfig.ACKS_CONFIG, asJsonObject.getValue(ProducerConfig.ACKS_CONFIG).toString());
        }
        if (!asJsonObject.containsKey("bootstrap.servers")) {
            LOGGER.info("Setting {} to {}", "bootstrap.servers", str);
            asJsonObject.put("bootstrap.servers", str);
        }
        if (!asJsonObject.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
            LOGGER.info("Key deserializer omitted, using String as default");
            asJsonObject.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        }
        asJsonObject.remove(ConnectorFactory.CHANNEL_NAME_ATTRIBUTE);
        asJsonObject.remove(ConsumerProtocol.TOPIC_KEY_NAME);
        asJsonObject.remove(ConnectorFactory.CONNECTOR_ATTRIBUTE);
        asJsonObject.remove("partition");
        asJsonObject.remove(LocalCacheFactory.KEY);
        return asJsonObject;
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSink() {
        return this.subscriber;
    }

    public void closeQuietly() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            this.stream.close(asyncResult -> {
                if (asyncResult.failed()) {
                    LOGGER.debug("An error has been caught while closing the Kafka Write Stream", asyncResult.cause());
                }
                countDownLatch.countDown();
            });
        } catch (Throwable th) {
            LOGGER.debug("An error has been caught while closing the Kafka Write Stream", th);
            countDownLatch.countDown();
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private String getTopicOrNull(Config config) {
        return (String) config.getOptionalValue(ConsumerProtocol.TOPIC_KEY_NAME, String.class).orElseGet(() -> {
            return (String) config.getOptionalValue(ConnectorFactory.CHANNEL_NAME_ATTRIBUTE, String.class).orElse(null);
        });
    }
}
