package io.smallrye.reactive.messaging.kafka.fault;

import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.kafka.client.producer.KafkaProducer;
import io.vertx.mutiny.kafka.client.producer.KafkaProducerRecord;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.common.serialization.StringSerializer;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.class */
/**
 * Failure handler that republishes a failed incoming Kafka record to a "dead letter" topic
 * and then acknowledges the original record.
 *
 * <p>The republished record carries the key and payload of the failed record, plus headers
 * describing the failure ({@code dead-letter-reason} and, when the failure has a cause,
 * {@code dead-letter-cause}).
 */
public class KafkaDeadLetterQueue implements KafkaFailureHandler {
    private static final String KEY_DESERIALIZER = "key.deserializer";
    private static final String VALUE_DESERIALIZER = "value.deserializer";
    private static final String KEY_SERIALIZER = "key.serializer";
    private static final String VALUE_SERIALIZER = "value.serializer";
    private static final String DEAD_LETTER_REASON = "dead-letter-reason";
    private static final String DEAD_LETTER_CAUSE = "dead-letter-cause";

    private final String channel;
    // Kept raw on purpose: the same producer is used for whatever key/value types handle() receives.
    @SuppressWarnings("rawtypes")
    private final KafkaProducer producer;
    private final String topic;
    private final KafkaSource<?, ?> source;

    /**
     * Creates a handler bound to an already-configured producer.
     *
     * @param str the channel name, used when logging a dead-lettered message
     * @param str2 the topic the failed records are written to
     * @param kafkaProducer the producer used to write to the dead letter topic
     * @param kafkaSource the source that is notified when writing to the dead letter topic fails
     */
    @SuppressWarnings("rawtypes")
    public KafkaDeadLetterQueue(String str, String str2, KafkaProducer kafkaProducer, KafkaSource<?, ?> kafkaSource) {
        this.channel = str;
        this.topic = str2;
        this.producer = kafkaProducer;
        this.source = kafkaSource;
    }

    /**
     * Builds a dead letter queue handler from the consumer configuration of a channel.
     *
     * <p>The given configuration is copied (the caller's map is not modified). The deserializer
     * entries are removed from the copy and replaced by serializer entries: the explicitly
     * configured dead-letter serializers when present, otherwise serializers derived from the
     * deserializer class names. The topic defaults to {@code dead-letter-topic-<channel>}.
     *
     * @param vertx the Vert.x instance used to create the producer
     * @param map the Kafka configuration to derive the producer configuration from
     * @param kafkaConnectorIncomingConfiguration the channel configuration
     * @param kafkaSource the source that is notified when writing to the dead letter topic fails
     * @return the configured failure handler
     */
    public static KafkaFailureHandler create(Vertx vertx, Map<String, String> map, KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, KafkaSource<?, ?> kafkaSource) {
        Map<String, String> producerConfig = new HashMap<>(map);
        String keyDeserializer = producerConfig.remove(KEY_DESERIALIZER);
        String valueDeserializer = producerConfig.remove(VALUE_DESERIALIZER);

        String keySerializer = kafkaConnectorIncomingConfiguration.getDeadLetterQueueKeySerializer()
                .orElse(getMirrorSerializer(keyDeserializer));
        String valueSerializer = kafkaConnectorIncomingConfiguration.getDeadLetterQueueValueSerializer()
                .orElse(getMirrorSerializer(valueDeserializer));
        producerConfig.put(KEY_SERIALIZER, keySerializer);
        producerConfig.put(VALUE_SERIALIZER, valueSerializer);

        String channelName = kafkaConnectorIncomingConfiguration.getChannel();
        String deadLetterTopic = kafkaConnectorIncomingConfiguration.getDeadLetterQueueTopic()
                .orElse("dead-letter-topic-" + channelName);

        KafkaLogging.log.deadLetterConfig(deadLetterTopic, keySerializer, valueSerializer);
        return new KafkaDeadLetterQueue(channelName, deadLetterTopic, KafkaProducer.create(vertx, producerConfig), kafkaSource);
    }

    /**
     * Derives a serializer class name from a deserializer class name by replacing
     * {@code "Deserializer"} with {@code "Serializer"}.
     *
     * @param str the deserializer class name, may be {@code null}
     * @return the derived serializer class name, or the {@link StringSerializer} class name
     *         when {@code str} is {@code null}
     */
    private static String getMirrorSerializer(String str) {
        return str == null ? StringSerializer.class.getName() : str.replace("Deserializer", "Serializer");
    }

    /**
     * Returns a non-null description of the given throwable for use as a header value.
     *
     * <p>{@link Throwable#getMessage()} may return {@code null}; in that case
     * {@link Throwable#toString()} is used so that the header still identifies the failure.
     */
    private static String describe(Throwable throwable) {
        String message = throwable.getMessage();
        return message != null ? message : throwable.toString();
    }

    /**
     * Sends the failed record to the dead letter topic and, once the send succeeded,
     * acknowledges the original record.
     *
     * <p>If the send fails, the failure is reported to the source and the returned stage
     * completes exceptionally; the original record is not acknowledged in that case.
     *
     * @param incomingKafkaRecord the record whose processing failed
     * @param th the processing failure
     * @return a stage completed when the record has been dead-lettered and acknowledged
     */
    @Override // io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler
    @SuppressWarnings({"rawtypes", "unchecked"})
    public <K, V> CompletionStage<Void> handle(IncomingKafkaRecord<K, V> incomingKafkaRecord, Throwable th) {
        KafkaProducerRecord<K, V> deadLetter = KafkaProducerRecord.create(this.topic, incomingKafkaRecord.getKey(), incomingKafkaRecord.getPayload());
        // Never pass a null header value: exceptions without a message are common (e.g. a bare NPE).
        deadLetter.addHeader(DEAD_LETTER_REASON, describe(th));
        Throwable cause = th.getCause();
        if (cause != null) {
            deadLetter.addHeader(DEAD_LETTER_CAUSE, describe(cause));
        }
        KafkaLogging.log.messageNackedDeadLetter(this.channel, this.topic);
        // The producer is raw, so the lambda parameters below are seen as Object by the compiler.
        return this.producer.send(deadLetter)
                .onFailure().invoke(failure -> this.source.reportFailure((Throwable) failure))
                .onItem().ignore().andContinueWithNull()
                .subscribeAsCompletionStage()
                .thenCompose(ignored -> incomingKafkaRecord.ack());
    }
}
