package io.smallrye.reactive.messaging.kafka.fault;

import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaCDIEvents;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.ConfigurationCleaner;
import io.smallrye.reactive.messaging.kafka.impl.KafkaSource;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.buffer.Buffer;
import io.vertx.mutiny.kafka.client.producer.KafkaHeader;
import io.vertx.mutiny.kafka.client.producer.KafkaProducer;
import io.vertx.mutiny.kafka.client.producer.KafkaProducerRecord;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/fault/KafkaDeadLetterQueue.class */
/**
 * Failure handler that forwards a failed incoming Kafka record to a dead-letter topic.
 * <p>
 * The forwarded record reuses the key, payload and headers of the failed record and carries
 * additional {@code dead-letter-*} headers describing the failure and where the record came from.
 * The failed record is acknowledged once the dead-letter record has been sent.
 */
public class KafkaDeadLetterQueue implements KafkaFailureHandler {
    /** Header carrying the message of the failure (or its {@code toString()} when it has no message). */
    public static final String DEAD_LETTER_REASON = "dead-letter-reason";
    /** Header carrying the message of the failure's cause; only added when a cause exists. */
    public static final String DEAD_LETTER_CAUSE = "dead-letter-cause";
    /** Header carrying the topic of the failed record. */
    public static final String DEAD_LETTER_TOPIC = "dead-letter-topic";
    /** Header carrying the offset of the failed record, as a decimal string. */
    public static final String DEAD_LETTER_OFFSET = "dead-letter-offset";
    /** Header carrying the partition of the failed record, as a decimal string. */
    public static final String DEAD_LETTER_PARTITION = "dead-letter-partition";

    private final String channel;
    // Deliberately raw: handle() is generic per call (<K, V>), so a single producer instance
    // has to accept records of any key/value type.
    @SuppressWarnings("rawtypes")
    private final KafkaProducer producer;
    private final String topic;
    private final KafkaSource<?, ?> source;

    /**
     * Creates a handler writing to the given topic with the given producer.
     *
     * @param str           name of the channel, used when logging nacked messages
     * @param str2          name of the dead-letter topic records are sent to
     * @param kafkaProducer producer used to send the dead-letter records
     * @param kafkaSource   source that send failures are reported to
     */
    @SuppressWarnings("rawtypes")
    public KafkaDeadLetterQueue(String str, String str2, KafkaProducer kafkaProducer, KafkaSource<?, ?> kafkaSource) {
        this.channel = str;
        this.topic = str2;
        this.producer = kafkaProducer;
        this.source = kafkaSource;
    }

    /**
     * Builds a dead-letter handler, deriving the producer configuration from {@code map}.
     * <p>
     * A copy of {@code map} is used, so the caller's map is not modified. The key/value deserializer
     * entries and the interceptor entry are removed from the copy; key/value serializers are taken
     * from the connector configuration when set, otherwise mirrored from the removed deserializer
     * class names. The topic defaults to {@code "dead-letter-topic-" + channel}.
     *
     * @param vertx                               Vert.x instance used to create the producer
     * @param map                                 configuration the producer configuration is derived from
     * @param kafkaConnectorIncomingConfiguration connector configuration (channel, optional topic and serializers)
     * @param kafkaSource                         source that send failures are reported to
     * @param kafkaCDIEvents                      events object whose {@code producer()} event is fired with the new producer
     * @return the configured failure handler
     */
    public static KafkaFailureHandler create(Vertx vertx, Map<String, String> map, KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, KafkaSource<?, ?> kafkaSource, KafkaCDIEvents kafkaCDIEvents) {
        Map<String, String> producerConfig = new HashMap<>(map);
        String keyDeserializer = producerConfig.remove(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        String valueDeserializer = producerConfig.remove(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
        // Drop the interceptor entry inherited from the source configuration before building the producer.
        producerConfig.remove(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG);

        String channelName = kafkaConnectorIncomingConfiguration.getChannel();
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                kafkaConnectorIncomingConfiguration.getDeadLetterQueueKeySerializer().orElse(getMirrorSerializer(keyDeserializer)));
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                kafkaConnectorIncomingConfiguration.getDeadLetterQueueValueSerializer().orElse(getMirrorSerializer(valueDeserializer)));
        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-dead-letter-topic-producer-" + channelName);
        ConfigurationCleaner.cleanupProducerConfiguration(producerConfig);

        String deadLetterTopic = kafkaConnectorIncomingConfiguration.getDeadLetterQueueTopic().orElse("dead-letter-topic-" + channelName);
        KafkaLogging.log.deadLetterConfig(deadLetterTopic,
                producerConfig.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG),
                producerConfig.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));

        KafkaProducer<Object, Object> deadLetterProducer = KafkaProducer.create(vertx, producerConfig);
        // NOTE(review): "mo3302getDelegate" looks like a decompiler-renamed getDelegate() - confirm against the library API.
        kafkaCDIEvents.producer().fire(deadLetterProducer.mo3302getDelegate().unwrap());
        return new KafkaDeadLetterQueue(channelName, deadLetterTopic, deadLetterProducer, kafkaSource);
    }

    /**
     * Derives a serializer class name from a deserializer class name.
     *
     * @param str deserializer class name, may be {@code null}
     * @return {@code str} with every "Deserializer" replaced by "Serializer", or the
     *         {@link StringSerializer} class name when {@code str} is {@code null}
     */
    private static String getMirrorSerializer(String str) {
        if (str == null) {
            return StringSerializer.class.getName();
        }
        return str.replace("Deserializer", "Serializer");
    }

    /**
     * Returns the throwable's message, falling back to {@code toString()} when it has none.
     */
    private static String getThrowableMessage(Throwable th) {
        String message = th.getMessage();
        return message != null ? message : th.toString();
    }

    /**
     * Sends a copy of the failed record to the dead-letter topic and then acknowledges the record.
     * <p>
     * A failure of the send is reported to the source via {@code reportFailure(failure, true)}.
     * {@code ack()} is chained with {@code thenCompose}, so it is only invoked when the send stage
     * completes normally.
     *
     * @param incomingKafkaRecord the record whose processing failed
     * @param th                  the failure; its message and, if present, its cause's message are added as headers
     * @return a stage completing after the record was sent and acknowledged
     */
    @Override // io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler
    @SuppressWarnings("unchecked") // the producer is raw, see the field declaration
    public <K, V> CompletionStage<Void> handle(IncomingKafkaRecord<K, V> incomingKafkaRecord, Throwable th) {
        KafkaProducerRecord<K, V> deadLetter = KafkaProducerRecord.create(this.topic, incomingKafkaRecord.getKey(), incomingKafkaRecord.getPayload());
        deadLetter.addHeader(DEAD_LETTER_REASON, getThrowableMessage(th));
        Throwable cause = th.getCause();
        if (cause != null) {
            deadLetter.addHeader(DEAD_LETTER_CAUSE, getThrowableMessage(cause));
        }
        deadLetter.addHeader(DEAD_LETTER_TOPIC, incomingKafkaRecord.getTopic());
        deadLetter.addHeader(DEAD_LETTER_PARTITION, Integer.toString(incomingKafkaRecord.getPartition()));
        deadLetter.addHeader(DEAD_LETTER_OFFSET, Long.toString(incomingKafkaRecord.getOffset()));
        incomingKafkaRecord.getHeaders().forEach(header -> {
            // A header value may be null; Buffer.buffer(byte[]) does not accept null, so such a
            // header is forwarded with an empty value instead of aborting the whole dead-lettering.
            byte[] rawValue = header.value();
            Buffer value = rawValue == null ? Buffer.buffer() : Buffer.buffer(rawValue);
            deadLetter.addHeader(KafkaHeader.header(header.key(), value));
        });
        KafkaLogging.log.messageNackedDeadLetter(this.channel, this.topic);
        return this.producer.send(deadLetter).onFailure().invoke(failure -> {
            this.source.reportFailure((Throwable) failure, true);
        }).onItem().ignore().andContinueWithNull().subscribeAsCompletionStage().thenCompose(ignored -> {
            return incomingKafkaRecord.ack();
        });
    }

    /**
     * Closes the dead-letter producer via {@code closeAndAwait()}.
     */
    @Override // io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler
    public void terminate() {
        this.producer.closeAndAwait();
    }
}
