/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.fault;

import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.kafka.client.producer.KafkaProducer;
import io.vertx.mutiny.kafka.client.producer.KafkaProducerRecord;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaDeadLetterQueue
implements KafkaFailureHandler {
    private final String channel;
    private final KafkaProducer producer;
    private final String topic;

    public KafkaDeadLetterQueue(String channel, String topic, KafkaProducer producer) {
        this.channel = channel;
        this.topic = topic;
        this.producer = producer;
    }

    public static KafkaFailureHandler create(Vertx vertx, Map<String, String> kafkaConfiguration, KafkaConnectorIncomingConfiguration conf) {
        HashMap<String, String> deadQueueProducerConfig = new HashMap<String, String>(kafkaConfiguration);
        String keyDeserializer = (String)deadQueueProducerConfig.remove("key.deserializer");
        String valueDeserializer = (String)deadQueueProducerConfig.remove("value.deserializer");
        deadQueueProducerConfig.put("key.serializer", conf.getDeadLetterQueueKeySerializer().orElse(KafkaDeadLetterQueue.getMirrorSerializer(keyDeserializer)));
        deadQueueProducerConfig.put("value.serializer", conf.getDeadLetterQueueKeySerializer().orElse(KafkaDeadLetterQueue.getMirrorSerializer(valueDeserializer)));
        String deadQueueTopic = conf.getDeadLetterQueueTopic().orElse("dead-letter-topic-" + conf.getChannel());
        KafkaLogging.log.deadLetterConfig(deadQueueTopic, (String)deadQueueProducerConfig.get("key.serializer"), (String)deadQueueProducerConfig.get("value.serializer"));
        KafkaProducer producer = KafkaProducer.create(vertx, deadQueueProducerConfig);
        return new KafkaDeadLetterQueue(conf.getChannel(), deadQueueTopic, producer);
    }

    private static String getMirrorSerializer(String deserializer) {
        if (deserializer == null) {
            return StringSerializer.class.getName();
        }
        return deserializer.replace("Deserializer", "Serializer");
    }

    @Override
    public <K, V> CompletionStage<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reason) {
        KafkaProducerRecord<K, V> dead = KafkaProducerRecord.create(this.topic, record.getKey(), record.getPayload());
        dead.addHeader("dead-letter-reason", reason.getMessage());
        if (reason.getCause() != null) {
            dead.addHeader("dead-letter-cause", reason.getCause().getMessage());
        }
        KafkaLogging.log.messageNackedDeadLetter(this.channel, this.topic);
        return this.producer.send(dead).subscribeAsCompletionStage();
    }
}

