package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniOnItem;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.vertx.core.Context;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaProducer.class */
/**
 * {@link KafkaProducer} implementation that wraps an Apache Kafka {@link Producer} and runs every
 * interaction with it on a dedicated single-threaded executor (the "sending thread").
 *
 * <p>The Kafka configuration is derived from the channel's
 * {@link KafkaConnectorOutgoingConfiguration}; see
 * {@link #getKafkaProducerConfiguration(KafkaConnectorOutgoingConfiguration)} for the defaults that
 * are applied.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public class ReactiveKafkaProducer<K, V> implements KafkaProducer<K, V> {
    /** Flipped to {@code true} by the first call to {@link #close()}; later calls are no-ops. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Producer<K, V> producer;
    private final KafkaConnectorOutgoingConfiguration configuration;
    /** Single-threaded executor on which all producer calls are scheduled. */
    private final ExecutorService kafkaWorker;
    private final Map<String, Object> kafkaConfiguration;

    /**
     * Builds the Kafka configuration, instantiates and configures the key and value serializers,
     * creates the sending executor and finally creates the underlying Kafka producer.
     *
     * @param kafkaConnectorOutgoingConfiguration the outgoing channel configuration
     * @throws RuntimeException as produced by {@code KafkaExceptions.ex} when no value serializer is
     *         configured or when a serializer class cannot be found
     * @throws KafkaException if configuring one of the serializers fails
     */
    public ReactiveKafkaProducer(KafkaConnectorOutgoingConfiguration kafkaConnectorOutgoingConfiguration) {
        this.configuration = kafkaConnectorOutgoingConfiguration;
        this.kafkaConfiguration = getKafkaProducerConfiguration(this.configuration);

        String keySerializerClass = (String) this.kafkaConfiguration.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
        String valueSerializerClass = (String) this.kafkaConfiguration.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
        if (valueSerializerClass == null) {
            throw KafkaExceptions.ex.missingValueSerializer(kafkaConnectorOutgoingConfiguration.getChannel(),
                    kafkaConnectorOutgoingConfiguration.getChannel());
        }

        Serializer<K> keySerializer = createSerializer(keySerializerClass);
        Serializer<V> valueSerializer = createSerializer(valueSerializerClass);

        // The serializers are handed to the producer as instances, so they are configured here explicitly.
        configureSerializer(keySerializer, this.kafkaConfiguration, true);
        configureSerializer(valueSerializer, this.kafkaConfiguration, false);

        this.kafkaWorker = Executors.newSingleThreadExecutor(KafkaSendingThread::new);
        // Fully qualified: the simple name KafkaProducer is taken by the SmallRye interface this class implements.
        this.producer = new org.apache.kafka.clients.producer.KafkaProducer<>(this.kafkaConfiguration, keySerializer,
                valueSerializer);
    }

    /**
     * Instantiates the serializer with the given class name.
     *
     * @param className fully qualified name of the serializer class
     * @param <T> type handled by the serializer
     * @return a new, not yet configured, serializer instance
     */
    @SuppressWarnings("unchecked") // Utils.newInstance only knows the raw Serializer type
    private static <T> Serializer<T> createSerializer(String className) {
        try {
            return (Serializer<T>) Utils.newInstance(className, Serializer.class);
        } catch (ClassNotFoundException e) {
            throw KafkaExceptions.ex.unableToCreateInstance(className, e);
        }
    }

    /**
     * Configures the given serializer, wrapping any failure into a {@link KafkaException}.
     *
     * @param serializer the serializer to configure
     * @param config the Kafka configuration passed to the serializer
     * @param isKey {@code true} when the serializer is used for keys, {@code false} for values
     */
    private static void configureSerializer(Serializer<?> serializer, Map<String, Object> config, boolean isKey) {
        try {
            serializer.configure(config, isKey);
        } catch (Exception e) {
            throw new KafkaException(e);
        }
    }

    /**
     * Applies {@code function} to the underlying producer on the sending thread.
     *
     * @param function the action to run; receives the underlying producer
     * @param <T> type of the result
     * @return a {@link Uni} emitting the value returned by {@code function}; the function is
     *         invoked on each subscription
     */
    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    public <T> Uni<T> runOnSendingThread(Function<Producer<K, V>, T> function) {
        return Uni.createFrom().<T>item(() -> function.apply(this.producer))
                .runSubscriptionOn(this.kafkaWorker);
    }

    /**
     * Runs {@code consumer} against the underlying producer on the sending thread.
     *
     * @param consumer the action to run; receives the underlying producer
     * @return a {@link Uni} emitting {@code null} once the action has run
     */
    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    public Uni<Void> runOnSendingThread(Consumer<Producer<K, V>> consumer) {
        return Uni.createFrom().<Void>item(() -> {
            consumer.accept(this.producer);
            return null;
        }).runSubscriptionOn(this.kafkaWorker);
    }

    /**
     * Sends the record using the underlying producer; the send is issued on the sending thread.
     *
     * @param producerRecord the record to send
     * @return a {@link Uni} completed with the record metadata reported by the send callback, or
     *         failed with the exception reported by the callback (which is also logged)
     */
    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    public Uni<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
        return Uni.createFrom().<RecordMetadata>emitter(emitter -> {
            this.producer.send(producerRecord, (metadata, failure) -> {
                if (failure == null) {
                    emitter.complete(metadata);
                    return;
                }
                String channel = this.configuration.getChannel();
                if (producerRecord.topic() != null) {
                    KafkaLogging.log.unableToWrite(channel, producerRecord.topic(), failure);
                } else {
                    KafkaLogging.log.unableToWrite(channel, failure);
                }
                emitter.fail(failure);
            });
        }).runSubscriptionOn(this.kafkaWorker);
    }

    /**
     * Calls {@link Producer#flush()} on the sending thread.
     *
     * @return a {@link Uni} completed once the flush call has returned
     */
    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    public Uni<Void> flush() {
        // Block-bodied lambda on purpose: an expression lambda would be ambiguous between the
        // Function and Consumer overloads of runOnSendingThread.
        return runOnSendingThread(p -> {
            p.flush();
        });
    }

    /**
     * Looks up the partitions of a topic on the sending thread.
     *
     * @param str the topic name
     * @return a {@link Uni} emitting the result of {@link Producer#partitionsFor(String)}
     */
    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    public Uni<List<PartitionInfo>> partitionsFor(String str) {
        return runOnSendingThread(p -> {
            return p.partitionsFor(str);
        });
    }

    /**
     * Builds the configuration map handed to the Kafka producer.
     *
     * <p>Every entry of the channel configuration is copied with its value converted to a string.
     * {@code acks} is always taken from the channel configuration. The bootstrap servers, key
     * serializer, client id ({@code kafka-producer-<channel>}) and maximum reconnect backoff
     * ({@code 10000}) are only set when not already present. The map is then passed through
     * {@code ConfigurationCleaner.cleanupProducerConfiguration} before being returned.
     *
     * @param config the outgoing channel configuration
     * @return a new mutable configuration map
     */
    private static Map<String, Object> getKafkaProducerConfiguration(KafkaConnectorOutgoingConfiguration config) {
        Map<String, Object> kafkaConfig = new HashMap<>();
        JsonHelper.asJsonObject(config.config()).forEach(entry -> {
            kafkaConfig.put(entry.getKey(), entry.getValue().toString());
        });

        // acks unconditionally overrides whatever was copied from the raw configuration.
        kafkaConfig.put(ProducerConfig.ACKS_CONFIG, config.getAcks());

        if (!kafkaConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
            KafkaLogging.log.configServers(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
            kafkaConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
        }
        if (!kafkaConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
            KafkaLogging.log.keySerializerOmitted();
            kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer());
        }
        if (!kafkaConfig.containsKey(ProducerConfig.CLIENT_ID_CONFIG)) {
            String clientId = "kafka-producer-" + config.getChannel();
            KafkaLogging.log.setKafkaProducerClientId(clientId);
            kafkaConfig.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        }
        kafkaConfig.putIfAbsent(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "10000");

        ConfigurationCleaner.cleanupProducerConfiguration(kafkaConfig);
        return kafkaConfig;
    }

    /**
     * Returns the value of a Kafka configuration attribute.
     *
     * @param str the configuration key
     * @return the configured value cast to {@link String}, or {@code null} if the key is absent
     */
    public String get(String str) {
        return (String) this.kafkaConfiguration.get(str);
    }

    /**
     * @return the underlying Kafka producer
     */
    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    public Producer<K, V> unwrap() {
        return this.producer;
    }

    /**
     * @return the configuration map used to create the producer; this is the internal map itself,
     *         not a copy
     */
    public Map<String, ?> configuration() {
        return this.kafkaConfiguration;
    }

    /**
     * Closes the underlying producer on the sending thread, then shuts the sending executor down.
     *
     * <p>Only the first invocation has an effect. When called from a Vert.x event loop thread the
     * close is triggered without waiting for it; otherwise the caller blocks for at most twice the
     * configured close timeout.
     */
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        int timeoutMs = this.configuration.getCloseTimeout().intValue();
        Uni<Void> closing = runOnSendingThread(p -> {
            p.close(Duration.ofMillis(timeoutMs));
        })
                // Shut the executor down whether or not closing the producer succeeded, so a failing
                // close does not leave the sending thread alive.
                .onItemOrFailure().invoke((ignored, failure) -> this.kafkaWorker.shutdown());

        if (Context.isOnEventLoopThread()) {
            // Blocking is not allowed on the event loop: trigger the close and do not wait for it.
            closing.subscribeAsCompletionStage();
        } else {
            // long arithmetic so that a large close timeout cannot overflow when doubled
            closing.await().atMost(Duration.ofMillis(2L * timeoutMs));
        }
    }

    /**
     * @return {@code true} once {@link #close()} has been called
     */
    boolean isClosed() {
        return this.closed.get();
    }
}
