package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniOnItem;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.vertx.core.Context;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/impl/ReactiveKafkaProducer.class */
/**
 * {@link KafkaProducer} implementation that wraps an Apache Kafka {@link Producer} and executes
 * every interaction with it on a dedicated single-threaded executor ({@code kafkaWorker}).
 *
 * <p>Results are exposed as Mutiny {@link Uni}s whose subscription runs on that executor.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public class ReactiveKafkaProducer<K, V> implements KafkaProducer<K, V> {
    /** Flipped to {@code true} by the first call to {@link #close()}; guards against double close. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Value of the {@code client.id} configuration entry. */
    private final String clientId;
    /** The wrapped Kafka producer. */
    private final Producer<K, V> producer;
    /** Single-threaded executor on which all producer operations are subscribed. */
    private final ExecutorService kafkaWorker;
    /** Configuration map handed to the Kafka producer (kept by reference, not copied). */
    private final Map<String, Object> kafkaConfiguration;
    /** Name of the channel this producer serves; used in log and error messages. */
    private final String channel;
    /** Timeout, in milliseconds, passed to {@link Producer#close(Duration)}. */
    private final int closetimeout;

    /**
     * Creates a producer from the outgoing connector configuration.
     *
     * @param kafkaConnectorOutgoingConfiguration the channel configuration; its channel name and
     *        close timeout are used, and the Kafka properties are derived from it
     */
    public ReactiveKafkaProducer(KafkaConnectorOutgoingConfiguration kafkaConnectorOutgoingConfiguration) {
        this(getKafkaProducerConfiguration(kafkaConnectorOutgoingConfiguration),
                kafkaConnectorOutgoingConfiguration.getChannel(),
                kafkaConnectorOutgoingConfiguration.getCloseTimeout().intValue());
    }

    /**
     * Creates a producer from an already assembled Kafka configuration.
     *
     * @param kafkaConfiguration Kafka producer properties; must contain {@code client.id} and
     *        {@code value.serializer}
     * @param channel name of the channel, used for logging and error reporting
     * @param closeTimeout timeout in milliseconds applied when closing the underlying producer
     * @throws NullPointerException if {@code client.id} is absent from the configuration
     * @throws KafkaException if a serializer cannot be configured
     */
    public ReactiveKafkaProducer(Map<String, Object> kafkaConfiguration, String channel, int closeTimeout) {
        this.kafkaConfiguration = kafkaConfiguration;
        this.channel = channel;
        this.closetimeout = closeTimeout;
        // Still a NullPointerException when the entry is missing, but now with an explanatory message.
        this.clientId = Objects.requireNonNull(kafkaConfiguration.get("client.id"),
                "The Kafka producer configuration must contain 'client.id'").toString();

        String keySerializerClassName = (String) kafkaConfiguration.get("key.serializer");
        String valueSerializerClassName = (String) kafkaConfiguration.get("value.serializer");
        if (valueSerializerClassName == null) {
            throw KafkaExceptions.ex.missingValueSerializer(this.channel, this.channel);
        }

        Serializer<K> keySerializer = createSerializer(keySerializerClassName);
        Serializer<V> valueSerializer = createSerializer(valueSerializerClassName);
        // The serializers are instantiated here, so they must be configured explicitly.
        configureSerializer(keySerializer, kafkaConfiguration, true);
        configureSerializer(valueSerializer, kafkaConfiguration, false);

        this.kafkaWorker = Executors.newSingleThreadExecutor(KafkaSendingThread::new);
        this.producer = new org.apache.kafka.clients.producer.KafkaProducer<>(kafkaConfiguration, keySerializer,
                valueSerializer);
    }

    /**
     * @return the value of the {@code client.id} configuration entry
     */
    public String getClientId() {
        return this.clientId;
    }

    /**
     * Instantiates the serializer with the given class name.
     *
     * @param className fully qualified name of the serializer class
     * @param <T> type handled by the serializer
     * @return a new, not yet configured, serializer instance
     */
    @SuppressWarnings("unchecked") // The class is named in configuration; its type argument cannot be checked.
    private static <T> Serializer<T> createSerializer(String className) {
        try {
            return (Serializer<T>) Utils.newInstance(className, Serializer.class);
        } catch (ClassNotFoundException e) {
            throw KafkaExceptions.ex.unableToCreateInstance(className, e);
        }
    }

    /**
     * Configures a serializer, wrapping any failure in a {@link KafkaException}.
     *
     * @param serializer the serializer to configure
     * @param configuration the Kafka configuration passed to the serializer
     * @param isKey {@code true} when the serializer is used for keys, {@code false} for values
     */
    private static void configureSerializer(Serializer<?> serializer, Map<String, Object> configuration,
            boolean isKey) {
        try {
            serializer.configure(configuration, isKey);
        } catch (Exception e) {
            throw new KafkaException(e);
        }
    }

    /**
     * Applies {@code function} to the underlying producer on the sending thread.
     *
     * @param function the computation to run against the producer
     * @param <T> type of the computed result
     * @return a {@link Uni} emitting the result of {@code function}
     */
    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    public <T> Uni<T> runOnSendingThread(Function<Producer<K, V>, T> function) {
        return Uni.createFrom().item(() -> {
            return function.apply(this.producer);
        }).runSubscriptionOn(this.kafkaWorker);
    }

    /**
     * Runs {@code consumer} against the underlying producer on the sending thread.
     *
     * @param consumer the action to run against the producer
     * @return a {@link Uni} emitting {@code null} once the action has run
     */
    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    public Uni<Void> runOnSendingThread(Consumer<Producer<K, V>> consumer) {
        // The explicit <Void> witness is needed: the supplier only returns null, so inference alone yields Object.
        return Uni.createFrom().<Void> item(() -> {
            consumer.accept(this.producer);
            return null;
        }).runSubscriptionOn(this.kafkaWorker);
    }

    /**
     * Sends a record through the underlying producer; the send call is issued on the sending thread.
     *
     * <p>On failure, the error is logged (with the topic when the record has one) before the
     * returned {@link Uni} is failed with it.
     *
     * @param producerRecord the record to send
     * @return a {@link Uni} emitting the record metadata reported by the producer callback
     */
    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    public Uni<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
        // Explicit <RecordMetadata> witness: the implicitly typed lambda gives inference nothing to work with.
        return Uni.createFrom().<RecordMetadata> emitter(emitter -> {
            this.producer.send(producerRecord, (metadata, failure) -> {
                if (failure == null) {
                    emitter.complete(metadata);
                    return;
                }
                if (producerRecord.topic() != null) {
                    KafkaLogging.log.unableToWrite(this.channel, producerRecord.topic(), failure);
                } else {
                    KafkaLogging.log.unableToWrite(this.channel, failure);
                }
                emitter.fail(failure);
            });
        }).runSubscriptionOn(this.kafkaWorker);
    }

    /**
     * Flushes the underlying producer on the sending thread.
     *
     * @return a {@link Uni} completed once {@link Producer#flush()} has returned
     */
    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    public Uni<Void> flush() {
        // Block-bodied lambda on purpose: an expression lambda would be ambiguous between the two overloads.
        return runOnSendingThread(p -> {
            p.flush();
        });
    }

    /**
     * Retrieves the partitions of a topic on the sending thread.
     *
     * @param str the topic name
     * @return a {@link Uni} emitting the result of {@link Producer#partitionsFor(String)}
     */
    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    public Uni<List<PartitionInfo>> partitionsFor(String str) {
        return runOnSendingThread(p -> {
            return p.partitionsFor(str);
        });
    }

    /**
     * Builds the Kafka producer properties from the connector configuration.
     *
     * <p>Every configuration entry is copied using its string representation, {@code acks} is always
     * taken from the connector configuration, and defaults are supplied for
     * {@code bootstrap.servers}, {@code key.serializer}, {@code client.id}
     * ({@code kafka-producer-<channel>}) and {@code reconnect.backoff.max.ms} ({@code 10000}) when
     * absent. The map is finally passed through
     * {@code ConfigurationCleaner.cleanupProducerConfiguration}.
     *
     * @param configuration the outgoing connector configuration
     * @return a new mutable map of Kafka producer properties
     */
    private static Map<String, Object> getKafkaProducerConfiguration(
            KafkaConnectorOutgoingConfiguration configuration) {
        Map<String, Object> properties = new HashMap<>();
        JsonHelper.asJsonObject(configuration.config()).forEach(entry -> {
            properties.put(entry.getKey(), entry.getValue().toString());
        });
        properties.put("acks", configuration.getAcks());
        if (!properties.containsKey("bootstrap.servers")) {
            KafkaLogging.log.configServers("bootstrap.servers", configuration.getBootstrapServers());
            properties.put("bootstrap.servers", configuration.getBootstrapServers());
        }
        if (!properties.containsKey("key.serializer")) {
            KafkaLogging.log.keySerializerOmitted();
            properties.put("key.serializer", configuration.getKeySerializer());
        }
        properties.putIfAbsent("client.id", "kafka-producer-" + configuration.getChannel());
        properties.putIfAbsent("reconnect.backoff.max.ms", "10000");
        ConfigurationCleaner.cleanupProducerConfiguration(properties);
        return properties;
    }

    /**
     * Looks up a configuration entry.
     *
     * @param str the configuration key
     * @return the string form of the configured value, or {@code null} when the key is absent
     */
    public String get(String str) {
        // Values supplied through the Map-based constructor need not be Strings; avoid a ClassCastException.
        Object value = this.kafkaConfiguration.get(str);
        return value == null ? null : value.toString();
    }

    /**
     * @return the wrapped Kafka producer
     */
    @Override // io.smallrye.reactive.messaging.kafka.KafkaProducer
    public Producer<K, V> unwrap() {
        return this.producer;
    }

    /**
     * @return the configuration map used to create the producer (the internal instance, not a copy)
     */
    public Map<String, ?> configuration() {
        return this.kafkaConfiguration;
    }

    /**
     * Closes the underlying producer on the sending thread and then shuts the sending executor down.
     *
     * <p>Only the first invocation has an effect. When called from a Vert.x event loop thread the
     * close is triggered without waiting for it; otherwise the caller blocks for at most twice the
     * close timeout.
     */
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        final int timeout = this.closetimeout;
        Uni<Void> closing = runOnSendingThread(p -> {
            if (System.getSecurityManager() == null) {
                p.close(Duration.ofMillis(timeout));
            } else {
                // The cast disambiguates between the PrivilegedAction and PrivilegedExceptionAction overloads.
                AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
                    p.close(Duration.ofMillis(timeout));
                    return null;
                });
            }
        })
                // Shut the executor down whether closing succeeded or failed, so the worker thread is not leaked.
                .onItemOrFailure().invoke((ignored, failure) -> this.kafkaWorker.shutdown());

        if (Context.isOnEventLoopThread()) {
            // Blocking is not allowed on an event loop: fire and forget.
            closing.subscribeAsCompletionStage();
        } else {
            // Long arithmetic: doubling a large int timeout would otherwise overflow.
            closing.await().atMost(Duration.ofMillis(2L * timeout));
        }
    }

    /**
     * @return {@code true} once {@link #close()} has been called
     */
    boolean isClosed() {
        return this.closed.get();
    }
}
