/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaProducer;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.ConfigurationCleaner;
import io.smallrye.reactive.messaging.kafka.impl.JsonHelper;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSendingThread;
import io.vertx.core.Context;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;

public class ReactiveKafkaProducer<K, V>
implements KafkaProducer<K, V> {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private Producer<K, V> producer;
    private final KafkaConnectorOutgoingConfiguration configuration;
    private final ExecutorService kafkaWorker;
    private final Map<String, Object> kafkaConfiguration;

    public ReactiveKafkaProducer(KafkaConnectorOutgoingConfiguration config) {
        this.configuration = config;
        this.kafkaConfiguration = this.getKafkaProducerConfiguration(this.configuration);
        String keySerializerCN = (String)this.kafkaConfiguration.get("key.serializer");
        String valueSerializerCN = (String)this.kafkaConfiguration.get("value.serializer");
        if (valueSerializerCN == null) {
            throw KafkaExceptions.ex.missingValueSerializer(config.getChannel(), config.getChannel());
        }
        Serializer keySerializer = ReactiveKafkaProducer.createSerializer(keySerializerCN);
        Serializer valueSerializer = ReactiveKafkaProducer.createSerializer(valueSerializerCN);
        ReactiveKafkaProducer.configureSerializer(keySerializer, this.kafkaConfiguration, true);
        ReactiveKafkaProducer.configureSerializer(valueSerializer, this.kafkaConfiguration, false);
        this.kafkaWorker = Executors.newSingleThreadExecutor(KafkaSendingThread::new);
        this.producer = new org.apache.kafka.clients.producer.KafkaProducer(this.kafkaConfiguration, keySerializer, valueSerializer);
    }

    private static <T> Serializer<T> createSerializer(String clazz) {
        try {
            return Utils.newInstance(clazz, Serializer.class);
        }
        catch (ClassNotFoundException e) {
            throw KafkaExceptions.ex.unableToCreateInstance(clazz, e);
        }
    }

    private static void configureSerializer(Serializer<?> serializer, Map<String, Object> config, boolean isKey) {
        try {
            serializer.configure(config, isKey);
        }
        catch (Exception e) {
            throw new KafkaException(e);
        }
    }

    @Override
    public <T> Uni<T> runOnSendingThread(Function<Producer<K, V>, T> action) {
        return Uni.createFrom().item(() -> action.apply(this.producer)).runSubscriptionOn(this.kafkaWorker);
    }

    @Override
    public Uni<Void> runOnSendingThread(Consumer<Producer<K, V>> action) {
        return Uni.createFrom().item(() -> {
            action.accept(this.producer);
            return null;
        }).runSubscriptionOn(this.kafkaWorker);
    }

    @Override
    public Uni<RecordMetadata> send(ProducerRecord<K, V> record) {
        return Uni.createFrom().emitter(em -> this.producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                if (record.topic() != null) {
                    KafkaLogging.log.unableToWrite(this.configuration.getChannel(), record.topic(), exception);
                } else {
                    KafkaLogging.log.unableToWrite(this.configuration.getChannel(), exception);
                }
                em.fail(exception);
            } else {
                em.complete(metadata);
            }
        })).runSubscriptionOn(this.kafkaWorker);
    }

    @Override
    public Uni<Void> flush() {
        return this.runOnSendingThread((Producer<K, V> producer) -> producer.flush());
    }

    @Override
    public Uni<List<PartitionInfo>> partitionsFor(String topic) {
        return this.runOnSendingThread((Function)producer -> producer.partitionsFor(topic));
    }

    private Map<String, Object> getKafkaProducerConfiguration(KafkaConnectorOutgoingConfiguration configuration) {
        HashMap<String, Object> map = new HashMap<String, Object>();
        JsonHelper.asJsonObject(configuration.config()).forEach(e -> map.put((String)e.getKey(), e.getValue().toString()));
        map.put("acks", configuration.getAcks());
        if (!map.containsKey("bootstrap.servers")) {
            KafkaLogging.log.configServers("bootstrap.servers", configuration.getBootstrapServers());
            map.put("bootstrap.servers", configuration.getBootstrapServers());
        }
        if (!map.containsKey("key.serializer")) {
            KafkaLogging.log.keySerializerOmitted();
            map.put("key.serializer", configuration.getKeySerializer());
        }
        if (!map.containsKey("client.id")) {
            String id = "kafka-producer-" + configuration.getChannel();
            KafkaLogging.log.setKafkaProducerClientId(id);
            map.put("client.id", id);
        }
        if (!map.containsKey("reconnect.backoff.max.ms")) {
            map.put("reconnect.backoff.max.ms", "10000");
        }
        ConfigurationCleaner.cleanupProducerConfiguration(map);
        return map;
    }

    public String get(String attribute) {
        return (String)this.kafkaConfiguration.get(attribute);
    }

    @Override
    public Producer<K, V> unwrap() {
        return this.producer;
    }

    public Map<String, ?> configuration() {
        return this.kafkaConfiguration;
    }

    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            int timeout = this.configuration.getCloseTimeout();
            Uni<Void> uni = this.runOnSendingThread((Producer<K, V> p) -> p.close(Duration.ofMillis(timeout))).onItem().invoke(this.kafkaWorker::shutdown);
            if (Context.isOnEventLoopThread()) {
                uni.subscribeAsCompletionStage();
            } else {
                uni.await().atMost(Duration.ofMillis((long)timeout * 2L));
            }
        }
    }

    boolean isClosed() {
        return this.closed.get();
    }
}

