package org.springframework.kafka.listener;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

/* loaded from: input_file:BOOT-INF/lib/spring-kafka-2.6.9.jar:org/springframework/kafka/listener/DeadLetterPublishingRecoverer.class */
/**
 * A {@link ConsumerRecordRecoverer} that republishes a failed record to another
 * topic/partition using a {@link KafkaOperations}.
 *
 * <p>The destination is chosen by a resolver function; the default resolver appends
 * {@code .DLT} to the original topic name and keeps the original partition. Before
 * publishing, headers describing the original record and the exception are added to
 * the outgoing record.
 */
public class DeadLetterPublishingRecoverer implements ConsumerRecordRecoverer {

    /** Default destination: {@code <original topic>.DLT}, same partition as the failed record. */
    private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> DEFAULT_DESTINATION_RESOLVER =
            (failedRecord, cause) -> new TopicPartition(failedRecord.topic() + ".DLT", failedRecord.partition());

    /** Logger bound to the runtime class so subclasses log under their own name. */
    protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass()));

    /** The only template when exactly one was supplied; otherwise {@code null}. */
    private final KafkaOperations<Object, Object> template;

    /** All supplied templates, keyed by the value type they are selected for. */
    private final Map<Class<?>, KafkaOperations<?, ?>> templates;

    /** Transactional setting shared by every supplied template. */
    private final boolean transactional;

    private final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver;

    private boolean retainExceptionHeader;

    /** Supplies extra headers for the outgoing record; the default supplies none. */
    private BiFunction<ConsumerRecord<?, ?>, Exception, Headers> headersFunction = (failedRecord, cause) -> null;

    /**
     * Create an instance with a single template and the default destination resolver.
     * @param kafkaOperations the template used for publishing.
     */
    public DeadLetterPublishingRecoverer(KafkaOperations<? extends Object, ? extends Object> kafkaOperations) {
        this(kafkaOperations, DEFAULT_DESTINATION_RESOLVER);
    }

    /**
     * Create an instance with a single template and a custom destination resolver.
     * @param kafkaOperations the template used for publishing.
     * @param biFunction the destination resolver; must not be null.
     */
    public DeadLetterPublishingRecoverer(KafkaOperations<? extends Object, ? extends Object> kafkaOperations, BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> biFunction) {
        // No cast here: the map's type arguments are inferred from the target constructor parameter.
        this(Collections.singletonMap(Object.class, kafkaOperations), biFunction);
    }

    /**
     * Create an instance with a single template and the default destination resolver.
     * @param kafkaTemplate the template used for publishing.
     * @deprecated the {@link KafkaOperations} overload accepts the same argument.
     */
    @Deprecated
    public DeadLetterPublishingRecoverer(KafkaTemplate<? extends Object, ? extends Object> kafkaTemplate) {
        this(kafkaTemplate, DEFAULT_DESTINATION_RESOLVER);
    }

    /**
     * Create an instance with a single template and a custom destination resolver.
     * @param kafkaTemplate the template used for publishing.
     * @param biFunction the destination resolver; must not be null.
     * @deprecated the {@link KafkaOperations} overload accepts the same arguments.
     */
    @Deprecated
    public DeadLetterPublishingRecoverer(KafkaTemplate<? extends Object, ? extends Object> kafkaTemplate, BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> biFunction) {
        // No cast here: the map's type arguments are inferred from the target constructor parameter.
        this(Collections.singletonMap(Object.class, kafkaTemplate), biFunction);
    }

    /**
     * Create an instance with several templates and the default destination resolver.
     * @param map the templates keyed by value type; must not be empty.
     */
    public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> map) {
        this(map, DEFAULT_DESTINATION_RESOLVER);
    }

    /**
     * Create an instance with several templates and a custom destination resolver.
     * The template is chosen per record by matching the outgoing value's class against
     * the map keys in the map's iteration order, so an ordered map gives predictable results.
     * @param map the templates keyed by value type; must not be empty, and all templates
     * must report the same {@code isTransactional()} value.
     * @param biFunction the destination resolver; must not be null.
     * @throws IllegalArgumentException presumably raised by {@link Assert} when a
     * precondition above is violated.
     */
    @SuppressWarnings("unchecked")
    public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> map, BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> biFunction) {
        Assert.isTrue(!ObjectUtils.isEmpty(map), "At least one template is required");
        Assert.notNull(biFunction, "The destinationResolver cannot be null");
        KafkaOperations<?, ?> firstTemplate = map.values().iterator().next();
        // Unchecked cast: the wildcard-typed template is used to send Object keys/values.
        this.template = map.size() == 1 ? (KafkaOperations<Object, Object>) firstTemplate : null;
        this.templates = map;
        this.transactional = firstTemplate.isTransactional();
        final boolean expected = this.transactional;
        Assert.isTrue(map.values().stream().allMatch(candidate -> candidate.isTransactional() == expected),
                "All templates must have the same setting for transactional");
        this.destinationResolver = biFunction;
    }

    /**
     * Control which headers are copied when a deserialization exception was found.
     * @param z when true, the outgoing headers are always copied from the failed record;
     * when false, the headers carried by the deserialization exception are used instead.
     */
    public void setRetainExceptionHeader(boolean z) {
        this.retainExceptionHeader = z;
    }

    /**
     * Set a function that supplies additional headers for the outgoing record.
     * @param biFunction the function; must not be null, but it may return null to add nothing.
     */
    public void setHeadersFunction(BiFunction<ConsumerRecord<?, ?>, Exception, Headers> biFunction) {
        Assert.notNull(biFunction, "'headersFunction' cannot be null");
        this.headersFunction = biFunction;
    }

    /**
     * Publish the failed record to the resolved destination.
     * @param consumerRecord the record that could not be processed.
     * @param exc the exception that caused the failure.
     */
    @Override
    public void accept(ConsumerRecord<?, ?> consumerRecord, Exception exc) {
        TopicPartition destination = this.destinationResolver.apply(consumerRecord, exc);
        boolean keyFlag = false;
        DeserializationException deserEx = ListenerUtils.getExceptionFromHeader(consumerRecord,
                ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
        if (deserEx == null) {
            deserEx = ListenerUtils.getExceptionFromHeader(consumerRecord,
                    ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER, this.logger);
            // The flag is set whenever no value exception was found, even if no key
            // exception exists either; createProducerRecord ignores it when the data is null.
            keyFlag = true;
        }
        Headers outHeaders;
        if (deserEx == null || this.retainExceptionHeader) {
            outHeaders = new RecordHeaders(consumerRecord.headers().toArray());
        }
        else {
            outHeaders = deserEx.getHeaders();
        }
        enhanceHeaders(outHeaders, consumerRecord, exc);
        byte[] rawData = deserEx == null ? null : deserEx.getData();
        ProducerRecord<Object, Object> outRecord = createProducerRecord(consumerRecord, destination, outHeaders, rawData, keyFlag);
        KafkaOperations<Object, Object> chosenTemplate = findTemplateForValue(outRecord.value());
        boolean needsOwnTransaction = this.transactional
                && !chosenTemplate.inTransaction()
                && !chosenTemplate.isAllowNonTransactional();
        if (needsOwnTransaction) {
            chosenTemplate.executeInTransaction(txOperations -> {
                publish(outRecord, txOperations);
                return null;
            });
        }
        else {
            publish(outRecord, chosenTemplate);
        }
    }

    /**
     * Select the template used to publish a given value.
     * <ul>
     * <li>With a single configured template, that template is always used.</li>
     * <li>For a null value, the template keyed by {@code Void.class}, or else the first entry.</li>
     * <li>Otherwise, the first key assignable from the value's class, or else the last entry
     * (with a warning).</li>
     * </ul>
     * @param obj the outgoing record value; may be null.
     * @return the template to publish with.
     */
    @SuppressWarnings("unchecked")
    private KafkaOperations<Object, Object> findTemplateForValue(@Nullable Object obj) {
        if (this.template != null) {
            return this.template;
        }
        if (obj == null) {
            KafkaOperations<?, ?> voidTemplate = this.templates.get(Void.class);
            if (voidTemplate != null) {
                return (KafkaOperations<Object, Object>) voidTemplate;
            }
            return (KafkaOperations<Object, Object>) this.templates.values().iterator().next();
        }
        Optional<Class<?>> matchingKey = this.templates.keySet()
                .stream()
                .filter(candidate -> candidate.isAssignableFrom(obj.getClass()))
                .findFirst();
        if (matchingKey.isPresent()) {
            return (KafkaOperations<Object, Object>) this.templates.get(matchingKey.get());
        }
        this.logger.warn(() -> "Failed to find a template for " + obj.getClass() + " attempting to use the last entry");
        // Reduce to the last element in the map's iteration order.
        return (KafkaOperations<Object, Object>) this.templates.values()
                .stream()
                .reduce((earlier, later) -> later)
                .get();
    }

    /**
     * Build the outgoing record. Subclasses may override to customize it.
     * @param consumerRecord the failed record.
     * @param topicPartition the destination; a negative partition means no partition is set.
     * @param headers the headers for the outgoing record.
     * @param bArr the raw bytes taken from a deserialization exception, or null if none.
     * @param z true if {@code bArr} replaces the key, false if it replaces the value;
     * ignored when {@code bArr} is null.
     * @return the record to publish.
     */
    protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> consumerRecord, TopicPartition topicPartition, Headers headers, @Nullable byte[] bArr, boolean z) {
        Integer partition = topicPartition.partition() < 0 ? null : Integer.valueOf(topicPartition.partition());
        Object key = (z && bArr != null) ? bArr : consumerRecord.key();
        Object value = (!z && bArr != null) ? bArr : consumerRecord.value();
        return new ProducerRecord<>(topicPartition.topic(), partition, key, value, headers);
    }

    /**
     * Send the record and log the outcome. Failures, whether thrown by {@code send}
     * or reported through the callback, are logged and not rethrown.
     * @param producerRecord the record to publish.
     * @param kafkaOperations the template to publish with.
     */
    protected void publish(ProducerRecord<Object, Object> producerRecord, KafkaOperations<Object, Object> kafkaOperations) {
        try {
            kafkaOperations.send(producerRecord).addCallback(
                    sendResult -> this.logger.debug(() -> "Successful dead-letter publication: " + sendResult),
                    failure -> this.logger.error(failure, () -> "Dead-letter publication failed for: " + producerRecord));
        }
        catch (Exception e) {
            this.logger.error(e, () -> "Dead-letter publication failed for: " + producerRecord);
        }
    }

    /**
     * Add headers describing the original record and the exception, followed by any
     * headers returned by the configured headers function.
     * @param headers the headers to add to (modified in place).
     * @param consumerRecord the failed record.
     * @param exc the exception that caused the failure.
     */
    private void enhanceHeaders(Headers headers, ConsumerRecord<?, ?> consumerRecord, Exception exc) {
        headers.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC,
                consumerRecord.topic().getBytes(StandardCharsets.UTF_8)));
        // Numeric values are written as fixed-width binary: 4 bytes for the partition, 8 for offset/timestamp.
        headers.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION,
                ByteBuffer.allocate(4).putInt(consumerRecord.partition()).array()));
        headers.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET,
                ByteBuffer.allocate(8).putLong(consumerRecord.offset()).array()));
        headers.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP,
                ByteBuffer.allocate(8).putLong(consumerRecord.timestamp()).array()));
        headers.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE,
                consumerRecord.timestampType().toString().getBytes(StandardCharsets.UTF_8)));
        headers.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_FQCN,
                exc.getClass().getName().getBytes(StandardCharsets.UTF_8)));
        String message = exc.getMessage();
        if (message != null) {
            headers.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE,
                    message.getBytes(StandardCharsets.UTF_8)));
        }
        headers.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE,
                getStackTraceAsString(exc).getBytes(StandardCharsets.UTF_8)));
        Headers additional = this.headersFunction.apply(consumerRecord, exc);
        if (additional != null) {
            additional.forEach(header -> headers.add(header));
        }
    }

    /**
     * Render a throwable's stack trace, as produced by {@code printStackTrace}, into a string.
     * @param th the throwable.
     * @return the stack trace text.
     */
    private String getStackTraceAsString(Throwable th) {
        StringWriter buffer = new StringWriter();
        th.printStackTrace(new PrintWriter(buffer, true));
        return buffer.getBuffer().toString();
    }
}
