package org.springframework.kafka.core.reactive;

import java.util.function.Function;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
import reactor.kafka.sender.TransactionManager;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/* loaded from: input_file:BOOT-INF/lib/spring-kafka-2.8.4.jar:org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.class */
/**
 * Reactive template for publishing records to Kafka. Every operation is delegated to a
 * single {@link KafkaSender} that is created in the constructor and closed by
 * {@link #close()} / {@link #destroy()}.
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
public class ReactiveKafkaProducerTemplate<K, V> implements AutoCloseable, DisposableBean {

    /** Sender that performs all producer interaction. */
    private final KafkaSender<K, V> sender;

    /** Converts Spring {@link Message}s into producer records. */
    private final RecordMessageConverter messageConverter;

    /**
     * Create a template that uses a {@link MessagingMessageConverter} for message conversion.
     *
     * @param senderOptions the options used to create the underlying sender; must not be null
     */
    public ReactiveKafkaProducerTemplate(SenderOptions<K, V> senderOptions) {
        this(senderOptions, new MessagingMessageConverter());
    }

    /**
     * Create a template with an explicit message converter.
     *
     * @param senderOptions the options used to create the underlying sender; must not be null
     * @param recordMessageConverter the converter used by {@link #send(String, Message)};
     *        must not be null
     * @throws IllegalArgumentException if either argument is null
     */
    public ReactiveKafkaProducerTemplate(SenderOptions<K, V> senderOptions,
            RecordMessageConverter recordMessageConverter) {
        Assert.notNull(senderOptions, "Sender options can not be null");
        Assert.notNull(recordMessageConverter, "Message converter can not be null");
        this.sender = KafkaSender.create(senderOptions);
        this.messageConverter = recordMessageConverter;
    }

    /**
     * Send the given records through the sender's transactional API and flatten the
     * per-batch result streams into one stream.
     *
     * @param records the records to send; the publisher is submitted as a single batch
     * @param <T> the correlation metadata type
     * @return the send results
     */
    public <T> Flux<SenderResult<T>> sendTransactionally(Publisher<? extends SenderRecord<K, V, T>> records) {
        // The explicitly typed local keeps the element type visible to the compiler.
        Flux<Flux<SenderResult<T>>> batches = this.sender.sendTransactionally(Flux.just(records));
        return batches.flatMap(Function.identity());
    }

    /**
     * Send a single record through the sender's transactional API.
     *
     * @param record the record to send
     * @param <T> the correlation metadata type
     * @return the single send result
     */
    public <T> Mono<SenderResult<T>> sendTransactionally(SenderRecord<K, V, T> record) {
        Flux<SenderResult<T>> results = sendTransactionally(Mono.just(record));
        return results.single();
    }

    /**
     * Send a value to a topic.
     *
     * @param topic the destination topic
     * @param value the record value
     * @return the send result
     */
    public Mono<SenderResult<Void>> send(String topic, V value) {
        return send(new ProducerRecord<>(topic, value));
    }

    /**
     * Send a key/value pair to a topic.
     *
     * @param topic the destination topic
     * @param key the record key
     * @param value the record value
     * @return the send result
     */
    public Mono<SenderResult<Void>> send(String topic, K key, V value) {
        return send(new ProducerRecord<>(topic, key, value));
    }

    /**
     * Send a key/value pair to a specific partition of a topic.
     *
     * @param topic the destination topic
     * @param partition the destination partition
     * @param key the record key
     * @param value the record value
     * @return the send result
     */
    public Mono<SenderResult<Void>> send(String topic, int partition, K key, V value) {
        return send(new ProducerRecord<>(topic, partition, key, value));
    }

    /**
     * Send a key/value pair with an explicit timestamp to a specific partition of a topic.
     *
     * @param topic the destination topic
     * @param partition the destination partition
     * @param timestamp the record timestamp
     * @param key the record key
     * @param value the record value
     * @return the send result
     */
    public Mono<SenderResult<Void>> send(String topic, int partition, long timestamp, K key, V value) {
        return send(new ProducerRecord<>(topic, partition, timestamp, key, value));
    }

    /**
     * Convert a Spring {@link Message} with the configured converter and send the result.
     * When the converted record has no headers at all, the message's
     * {@link KafkaHeaders#CORRELATION_ID} header (if present as a {@code byte[]}) is
     * copied onto the record before sending.
     *
     * @param topic the topic handed to the message converter together with the message
     * @param message the message to convert and send
     * @return the send result
     */
    public Mono<SenderResult<Void>> send(String topic, Message<?> message) {
        // The converter is not generic, so the record has to be narrowed to <K, V> here;
        // the caller is responsible for supplying a compatible payload/key.
        @SuppressWarnings("unchecked")
        ProducerRecord<K, V> producerRecord =
                (ProducerRecord<K, V>) this.messageConverter.fromMessage(message, topic);
        if (!producerRecord.headers().iterator().hasNext()) {
            byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
            if (correlationId != null) {
                producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
            }
        }
        return send(producerRecord);
    }

    /**
     * Send a producer record without correlation metadata.
     *
     * @param record the record to send
     * @return the send result
     */
    public Mono<SenderResult<Void>> send(ProducerRecord<K, V> record) {
        // The typed local fixes the correlation type to Void and selects the SenderRecord overload.
        SenderRecord<K, V, Void> senderRecord = SenderRecord.create(record, null);
        return send(senderRecord);
    }

    /**
     * Send a single sender record.
     *
     * @param record the record to send
     * @param <T> the correlation metadata type
     * @return the single send result
     */
    public <T> Mono<SenderResult<T>> send(SenderRecord<K, V, T> record) {
        Flux<SenderResult<T>> results = send(Mono.just(record));
        return results.single();
    }

    /**
     * Send a stream of sender records.
     *
     * @param records the records to send
     * @param <T> the correlation metadata type
     * @return the send results, as produced by the underlying sender
     */
    public <T> Flux<SenderResult<T>> send(Publisher<? extends SenderRecord<K, V, T>> records) {
        return this.sender.send(records);
    }

    /**
     * Invoke {@code flush()} on the underlying producer.
     *
     * @return the result of {@link #doOnProducer(Function)}; note that the callback itself
     *         returns {@link Mono#empty()} as its value
     * @deprecated retained for backward compatibility. The upstream Spring for Apache Kafka
     *             documentation describes flushing as redundant in a reactive flow, because
     *             send completion is already signalled by the {@link SenderResult}.
     */
    @Deprecated
    public Mono<?> flush() {
        return doOnProducer(producer -> {
            producer.flush();
            return Mono.empty();
        });
    }

    /**
     * Look up the partitions of a topic through the underlying producer.
     *
     * @param topic the topic to inspect
     * @return one element per partition reported by the producer
     */
    public Flux<PartitionInfo> partitionsFromProducerFor(String topic) {
        return doOnProducer(producer -> producer.partitionsFor(topic))
                .flatMapIterable(Function.identity());
    }

    /**
     * Expose the underlying producer's metrics as name/metric pairs.
     *
     * @return one tuple per metric reported by the producer
     */
    public Flux<Tuple2<MetricName, ? extends Metric>> metricsFromProducer() {
        return doOnProducer(Producer::metrics)
                .flatMapIterable(metrics -> metrics.entrySet())
                .map(entry -> Tuples.of(entry.getKey(), entry.getValue()));
    }

    /**
     * Run a function against the underlying producer by delegating to the sender.
     *
     * @param action the function to apply to the producer
     * @param <T> the function's result type
     * @return a {@link Mono} produced by the sender for the function's result
     */
    public <T> Mono<T> doOnProducer(Function<Producer<K, V>, ? extends T> action) {
        return this.sender.doOnProducer(action);
    }

    /**
     * Return the transaction manager of the underlying sender.
     *
     * @return the sender's {@link TransactionManager}
     */
    public TransactionManager transactionManager() {
        return this.sender.transactionManager();
    }

    /** Close the underlying sender ({@link DisposableBean} callback). */
    @Override
    public void destroy() {
        doClose();
    }

    /** Close the underlying sender ({@link AutoCloseable} callback). */
    @Override
    public void close() {
        doClose();
    }

    /** Shared shutdown path for {@link #destroy()} and {@link #close()}. */
    private void doClose() {
        this.sender.close();
    }
}
