package reactor.kafka.sender.internals;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.UnicastProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.sender.KafkaOutbound;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
import reactor.kafka.sender.TransactionManager;

/* loaded from: input_file:BOOT-INF/lib/reactor-kafka-1.2.5.RELEASE.jar:reactor/kafka/sender/internals/DefaultKafkaSender.class */
public class DefaultKafkaSender<K, V> implements KafkaSender<K, V> {
    /** Logger shared by the sender and its nested subscriber classes. */
    private static final Logger log = LoggerFactory.getLogger(DefaultKafkaSender.class.getName());

    /**
     * Names of the {@link Producer} methods that the proxy built by {@code producerProxy} forwards
     * to the real producer; every other method is rejected by that proxy.
     */
    private static final Set<String> DELEGATE_METHODS = new HashSet<>(Arrays.asList("sendOffsetsToTransaction", "partitionsFor", "metrics", "flush"));

    /** Cached producer source; assigned in the constructor. */
    private final Mono<Producer<K, V>> producerMono;

    /** Set to {@code true} when the producer source is subscribed and reset by {@code close()}. */
    private final AtomicBoolean hasProducer = new AtomicBoolean();

    /** Immutable copy of the options supplied to the constructor. */
    private final SenderOptions<K, V> senderOptions;

    /** Transaction manager; {@code null} when the sender is not transactional. */
    private final DefaultKafkaSender<K, V>.DefaultTransactionManager transactionManager;

    /** Lazily created restricted view of the producer; written only inside the synchronized {@code producerProxy} method. */
    private Producer<K, V> producerProxy;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/reactor-kafka-1.2.5.RELEASE.jar:reactor/kafka/sender/internals/DefaultKafkaSender$AbstractSendSubscriber.class */
    /**
     * Base subscriber that forwards every upstream element to the Kafka {@link Producer} and
     * relays the outcome of each send to the downstream subscriber.
     *
     * <p>The subscriber tracks the number of sends whose callback has not run yet
     * ({@code inflight}) and only terminates the downstream once the upstream has completed and
     * that count has dropped to zero.
     *
     * @param <Q> type of the elements received from upstream
     * @param <S> type of the elements emitted downstream
     * @param <C> type of the correlation metadata extracted from each upstream element
     */
    public abstract class AbstractSendSubscriber<Q, S, C> implements CoreSubscriber<Q> {
        /** Downstream subscriber receiving send outcomes. */
        protected final CoreSubscriber<? super S> actual;
        /** Whether a failed send terminates the downstream with an error. */
        private final boolean stopOnError;
        private final Producer<K, V> producer;
        AtomicReference<SubscriberState> state = new AtomicReference<>(SubscriberState.INIT);
        /** Number of sends that were handed to the producer and whose callback has not finished. */
        private final AtomicInteger inflight = new AtomicInteger();
        /** First failure observed by this subscriber, if any. */
        private final AtomicReference<Throwable> firstException = new AtomicReference<>();

        /**
         * @param producer       producer used for sending
         * @param coreSubscriber downstream subscriber
         * @param z              stop-on-error flag: when {@code true} a failed send is propagated
         *                       downstream as an error signal
         */
        AbstractSendSubscriber(Producer<K, V> producer, CoreSubscriber<? super S> coreSubscriber, boolean z) {
            this.producer = producer;
            this.stopOnError = z;
            this.actual = coreSubscriber;
        }

        /** Marks the subscriber active and hands the upstream subscription to the downstream. */
        @Override
        public void onSubscribe(Subscription subscription) {
            this.state.set(SubscriberState.ACTIVE);
            this.actual.onSubscribe(subscription);
        }

        /**
         * Sends the element with the producer. The element is ignored when the subscriber has
         * already reached {@link SubscriberState#COMPLETE}.
         */
        @Override
        public void onNext(Q q) {
            if (checkComplete(q)) {
                return;
            }
            this.inflight.incrementAndGet();
            // Thread.interrupted() also clears the interrupt flag before the send is issued.
            if (Thread.interrupted()) {
                log.trace("Previous operation on this scheduler was interrupted");
            }
            C correlationMetadata = correlationMetadata(q);
            try {
                if (senderOptions.isTransactional()) {
                    log.trace("Transactional send initiated for producer {} in state {} inflight {}: {}", senderOptions.transactionalId(), this.state, this.inflight, q);
                }
                this.producer.send(producerRecord(q), (recordMetadata, exc) -> {
                    try {
                        if (senderOptions.isTransactional()) {
                            log.trace("Transactional send completed for producer {} in state {} inflight {}: {}", senderOptions.transactionalId(), this.state, this.inflight, q);
                        }
                        if (exc == null) {
                            handleMetadata(recordMetadata, correlationMetadata);
                        } else {
                            handleError(exc, correlationMetadata, this.stopOnError);
                        }
                    } catch (Exception e) {
                        // A failure while reporting the outcome always terminates the downstream.
                        handleError(e, correlationMetadata, true);
                    } finally {
                        // Exactly one decrement per record, whatever path the callback took.
                        if (this.inflight.decrementAndGet() == 0) {
                            maybeComplete();
                        }
                    }
                });
            } catch (Exception e) {
                // send() failed synchronously, so no callback will run for this record.
                this.inflight.decrementAndGet();
                handleError(e, correlationMetadata, this.stopOnError);
            }
        }

        /**
         * Terminates the downstream with {@code th} if this subscriber has not terminated yet.
         * Otherwise the error is remembered if it is the first one, and in that case dropped
         * through {@link Operators#onErrorDropped} when the subscriber is already complete.
         */
        @Override
        public void onError(Throwable th) {
            log.trace("Sender failed with exception", th);
            if (this.state.compareAndSet(SubscriberState.ACTIVE, SubscriberState.COMPLETE) || this.state.compareAndSet(SubscriberState.OUTBOUND_DONE, SubscriberState.COMPLETE)) {
                this.actual.onError(th);
            } else if (this.firstException.compareAndSet(null, th) && this.state.get() == SubscriberState.COMPLETE) {
                Operators.onErrorDropped(th, this.actual.currentContext());
            }
        }

        /**
         * Records that the upstream is done; the downstream is terminated right away only when
         * no send is in flight, otherwise the last send callback does it.
         */
        @Override
        public void onComplete() {
            if (this.state.compareAndSet(SubscriberState.ACTIVE, SubscriberState.OUTBOUND_DONE) && this.inflight.get() == 0) {
                maybeComplete();
            }
        }

        /**
         * Terminates the downstream if the upstream is done: with the first recorded exception
         * when there is one, with a completion signal otherwise.
         */
        private void maybeComplete() {
            if (this.state.compareAndSet(SubscriberState.OUTBOUND_DONE, SubscriberState.COMPLETE)) {
                Throwable th = this.firstException.get();
                if (th != null) {
                    this.actual.onError(th);
                } else {
                    this.actual.onComplete();
                }
            }
        }

        /**
         * Reports a successful send downstream unless the subscriber is already complete.
         *
         * @param recordMetadata metadata passed to the producer callback
         * @param c              correlation metadata of the record that was sent
         */
        public void handleMetadata(RecordMetadata recordMetadata, C c) {
            if (checkComplete(recordMetadata)) {
                return;
            }
            handleResponse(recordMetadata, null, c);
        }

        /**
         * Logs and records a send failure, reports it downstream as a response, and terminates
         * the downstream when requested or when the sender options classify the exception as
         * fatal.
         *
         * @param exc the failure
         * @param c   correlation metadata of the record that failed
         * @param z   when {@code true} the failure is always propagated through {@link #onError}
         */
        public void handleError(Exception exc, C c, boolean z) {
            log.error("Sender failed", exc);
            // The completion check runs before the exception is recorded, so checkComplete
            // still sees the previous value of firstException.
            boolean alreadyComplete = checkComplete(exc);
            this.firstException.compareAndSet(null, exc);
            if (alreadyComplete) {
                return;
            }
            handleResponse(null, exc, c);
            if (z || senderOptions.fatalException(exc)) {
                onError(exc);
            }
        }

        /**
         * Tells whether the subscriber has reached {@link SubscriberState#COMPLETE}. When it has
         * and no exception was recorded, {@code t} is reported through
         * {@link Operators#onNextDropped}.
         *
         * @param t the signal that is about to be processed
         * @return {@code true} if the subscriber is complete and {@code t} must be ignored
         */
        public <T> boolean checkComplete(T t) {
            boolean complete = this.state.get() == SubscriberState.COMPLETE;
            if (complete && this.firstException.get() == null) {
                Operators.onNextDropped(t, this.actual.currentContext());
            }
            return complete;
        }

        /**
         * Emits the outcome of one send downstream; exactly one of {@code recordMetadata} and
         * {@code exc} is expected to be non-null.
         */
        protected abstract void handleResponse(RecordMetadata recordMetadata, Exception exc, C c);

        /** Converts an upstream element into the record handed to the producer. */
        protected abstract ProducerRecord<K, V> producerRecord(Q q);

        /** Extracts the correlation metadata that accompanies the outcome of the send. */
        protected abstract C correlationMetadata(Q q);
    }

    /* loaded from: input_file:BOOT-INF/lib/reactor-kafka-1.2.5.RELEASE.jar:reactor/kafka/sender/internals/DefaultKafkaSender$DefaultKafkaOutbound.class */
    /**
     * {@link KafkaOutbound} that chains send operations on a {@link DefaultKafkaSender}. This
     * base implementation represents an empty chain: {@link #then()} completes immediately.
     */
    private static class DefaultKafkaOutbound<K, V> implements KafkaOutbound<K, V> {
        final DefaultKafkaSender<K, V> sender;

        DefaultKafkaOutbound(DefaultKafkaSender<K, V> defaultKafkaSender) {
            this.sender = defaultKafkaSender;
        }

        /** Appends a step that sends {@code publisher}'s records and discards the results. */
        @Override
        public KafkaOutbound<K, V> send(Publisher<? extends ProducerRecord<K, V>> publisher) {
            return then(this.sender.sendProducerRecords(publisher).then());
        }

        /**
         * Appends a step that sends every inner publisher in its own transaction, one
         * transaction at a time, on the sender's scheduler.
         */
        @Override
        public KafkaOutbound<K, V> sendTransactionally(Publisher<? extends Publisher<? extends ProducerRecord<K, V>>> publisher) {
            Flux<Void> transactions = Flux.from(publisher)
                .publishOn(this.sender.senderOptions.scheduler())
                .concatMapDelayError(records -> this.sender.transaction(records), false, 1);
            return then(transactions);
        }

        /** Returns a new outbound that runs {@code publisher} after the steps chained so far. */
        @Override
        public KafkaOutbound<K, V> then(Publisher<Void> publisher) {
            return new KafkaOutboundThen<>(this.sender, this, publisher);
        }

        /** Returns the empty {@link Mono}: no step has been chained on this instance. */
        @Override
        public Mono<Void> then() {
            return Mono.empty();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/reactor-kafka-1.2.5.RELEASE.jar:reactor/kafka/sender/internals/DefaultKafkaSender$DefaultTransactionManager.class */
    /**
     * {@link TransactionManager} backed by the sender's producer. Each operation is deferred
     * until subscription and calls the corresponding transactional method of the producer.
     */
    public class DefaultTransactionManager implements TransactionManager {
        private DefaultTransactionManager() {
        }

        /** Returns a {@link Mono} that begins a transaction on the producer and then completes empty. */
        @Override
        public <T> Mono<T> begin() {
            return producerMono.flatMap(kafkaProducer -> Mono.create(sink -> {
                kafkaProducer.beginTransaction();
                log.debug("Begin a new transaction for producer {}", senderOptions.transactionalId());
                sink.success();
            }));
        }

        /**
         * Returns a {@link Mono} that adds the given offsets to the current transaction and then
         * completes empty. An empty offsets map is not sent to the producer.
         *
         * @param map offsets to send
         * @param str second argument passed to {@code sendOffsetsToTransaction}
         */
        @Override
        public <T> Mono<T> sendOffsets(Map<TopicPartition, OffsetAndMetadata> map, String str) {
            return producerMono.flatMap(kafkaProducer -> Mono.create(sink -> {
                boolean hasOffsets = !map.isEmpty();
                if (hasOffsets) {
                    kafkaProducer.sendOffsetsToTransaction(map, str);
                    log.trace("Sent offsets to transaction for producer {}, offsets: {}", senderOptions.transactionalId(), map);
                }
                sink.success();
            }));
        }

        /** Returns a {@link Mono} that commits the current transaction and then completes empty. */
        @Override
        public <T> Mono<T> commit() {
            return producerMono.flatMap(kafkaProducer -> Mono.create(sink -> {
                kafkaProducer.commitTransaction();
                log.debug("Commit current transaction for producer {}", senderOptions.transactionalId());
                sink.success();
            }));
        }

        /** Returns a {@link Mono} that aborts the current transaction and then completes empty. */
        @Override
        public <T> Mono<T> abort() {
            return producerMono.flatMap(kafkaProducer -> Mono.create(sink -> {
                kafkaProducer.abortTransaction();
                log.debug("Abort current transaction for producer {}", senderOptions.transactionalId());
                sink.success();
            }));
        }

        /** Returns the scheduler configured in the sender options. */
        @Override
        public Scheduler scheduler() {
            return senderOptions.scheduler();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/reactor-kafka-1.2.5.RELEASE.jar:reactor/kafka/sender/internals/DefaultKafkaSender$KafkaOutboundThen.class */
    /**
     * Outbound that runs an extra publisher after the steps already chained on a previous
     * outbound.
     */
    public static class KafkaOutboundThen<K, V> extends DefaultKafkaOutbound<K, V> {
        private final Mono<Void> thenMono;

        /**
         * @param defaultKafkaSender sender used by subsequent steps
         * @param kafkaOutbound      outbound whose steps run first
         * @param publisher          step to run afterwards
         */
        KafkaOutboundThen(DefaultKafkaSender<K, V> defaultKafkaSender, KafkaOutbound<K, V> kafkaOutbound, Publisher<Void> publisher) {
            super(defaultKafkaSender);
            Mono<Void> previous = kafkaOutbound.then();
            // When the previous chain is the shared empty Mono, the new step is used on its own.
            this.thenMono = (previous == Mono.empty())
                ? Mono.from(publisher)
                : previous.thenEmpty(publisher);
        }

        /** Returns the {@link Mono} that runs the whole chain. */
        @Override
        public Mono<Void> then() {
            return this.thenMono;
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/reactor-kafka-1.2.5.RELEASE.jar:reactor/kafka/sender/internals/DefaultKafkaSender$Response.class */
    /**
     * Immutable {@link SenderResult} holding the outcome of one send: the record metadata or the
     * exception, together with the caller-supplied correlation metadata.
     */
    static class Response<T> implements SenderResult<T> {
        private final RecordMetadata metadata;
        private final Exception exception;
        private final T correlationMetadata;

        /**
         * @param recordMetadata metadata of the send, stored as given
         * @param exc            failure of the send, stored as given
         * @param t              correlation metadata of the record
         */
        public Response(RecordMetadata recordMetadata, Exception exc, T t) {
            this.correlationMetadata = t;
            this.metadata = recordMetadata;
            this.exception = exc;
        }

        /** Returns the stored record metadata. */
        @Override
        public RecordMetadata recordMetadata() {
            return metadata;
        }

        /** Returns the stored exception. */
        @Override
        public Exception exception() {
            return exception;
        }

        /** Returns the stored correlation metadata. */
        @Override
        public T correlationMetadata() {
            return correlationMetadata;
        }

        @Override
        public String toString() {
            Object[] parts = {correlationMetadata, metadata, exception};
            return String.format("Correlation=%s metadata=%s exception=%s", parts);
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/reactor-kafka-1.2.5.RELEASE.jar:reactor/kafka/sender/internals/DefaultKafkaSender$SendSubscriber.class */
    /**
     * Send subscriber for {@link SenderRecord}s: every outcome is emitted downstream as a
     * {@link SenderResult} carrying the record's correlation metadata.
     *
     * @param <T> correlation metadata type
     */
    private class SendSubscriber<T> extends DefaultKafkaSender<K, V>.AbstractSendSubscriber<SenderRecord<K, V, T>, SenderResult<T>, T> {
        SendSubscriber(Producer<K, V> producer, CoreSubscriber<? super SenderResult<T>> coreSubscriber, boolean z) {
            super(producer, coreSubscriber, z);
        }

        /** Wraps the outcome in a {@link Response} and emits it downstream. */
        @Override
        protected void handleResponse(RecordMetadata recordMetadata, Exception exc, T t) {
            this.actual.onNext(new Response<T>(recordMetadata, exc, t));
        }

        /** Returns the correlation metadata carried by the sender record. */
        @Override
        public T correlationMetadata(SenderRecord<K, V, T> senderRecord) {
            return senderRecord.correlationMetadata();
        }

        /** Returns the sender record itself, which is passed to the producer as-is. */
        @Override
        public ProducerRecord<K, V> producerRecord(SenderRecord<K, V, T> senderRecord) {
            return senderRecord;
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/reactor-kafka-1.2.5.RELEASE.jar:reactor/kafka/sender/internals/DefaultKafkaSender$SendSubscriberNoResponse.class */
    /**
     * Send subscriber for plain {@link ProducerRecord}s without correlation metadata: the raw
     * record metadata, or the exception when there is no metadata, is emitted downstream.
     */
    private class SendSubscriberNoResponse extends DefaultKafkaSender<K, V>.AbstractSendSubscriber<ProducerRecord<K, V>, Object, Void> {
        SendSubscriberNoResponse(Producer<K, V> producer, CoreSubscriber<? super Object> coreSubscriber, boolean z) {
            super(producer, coreSubscriber, z);
        }

        /** Emits the metadata when present, otherwise the exception. */
        @Override
        public void handleResponse(RecordMetadata recordMetadata, Exception exc, Void r6) {
            Object signal = (recordMetadata == null) ? exc : recordMetadata;
            this.actual.onNext(signal);
        }

        /** Always {@code null}: plain producer records carry no correlation metadata. */
        @Override
        public Void correlationMetadata(ProducerRecord<K, V> producerRecord) {
            return null;
        }

        /** Returns the record unchanged. */
        @Override
        public ProducerRecord<K, V> producerRecord(ProducerRecord<K, V> producerRecord) {
            return producerRecord;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/reactor-kafka-1.2.5.RELEASE.jar:reactor/kafka/sender/internals/DefaultKafkaSender$SubscriberState.class */
    /** Lifecycle states of an {@code AbstractSendSubscriber}. */
    public enum SubscriberState {
        /** Initial value of the subscriber's state, before {@code onSubscribe} is called. */
        INIT,
        /** Set by {@code onSubscribe}; upstream elements are being sent. */
        ACTIVE,
        /** Upstream has completed; the downstream is not yet terminated. */
        OUTBOUND_DONE,
        /** The subscriber has terminated; later signals are checked against this state and ignored or dropped. */
        COMPLETE
    }

    /**
     * Creates a sender. The producer itself is created lazily, on first subscription to the
     * cached producer source, and transactions are initialised at that point for transactional
     * senders.
     *
     * @param producerFactory factory used to create the Kafka producer
     * @param senderOptions   options for this sender; an immutable copy is kept. For
     *                        transactional senders the copy uses a new single-threaded scheduler
     *                        named after the transactional id.
     */
    public DefaultKafkaSender(ProducerFactory producerFactory, SenderOptions<K, V> senderOptions) {
        this.senderOptions = senderOptions.toImmutable().scheduler(
            !senderOptions.isTransactional()
                ? senderOptions.scheduler()
                : Schedulers.newSingle(senderOptions.transactionalId()));

        Mono<Producer<K, V>> sharedProducer = Mono
            .fromCallable(() -> {
                Producer<K, V> created = producerFactory.createProducer(this.senderOptions);
                if (this.senderOptions.isTransactional()) {
                    log.info("Initializing transactions for producer {}", this.senderOptions.transactionalId());
                    created.initTransactions();
                }
                return created;
            })
            .doOnSubscribe(ignored -> this.hasProducer.set(true))
            .cache();

        boolean transactional = this.senderOptions.isTransactional();
        // Transactional senders deliver the producer on the sender's scheduler.
        this.producerMono = transactional
            ? sharedProducer.publishOn(this.senderOptions.scheduler())
            : sharedProducer;
        this.transactionManager = transactional ? new DefaultTransactionManager() : null;
    }

    /**
     * Sends the given records and emits one {@link SenderResult} per send outcome. Results are
     * published on the configured scheduler with a prefetch of
     * {@code senderOptions.maxInFlight()}.
     *
     * @param publisher records to send
     * @return the flux of send results
     */
    @Override
    public <T> Flux<SenderResult<T>> send(Publisher<? extends SenderRecord<K, V, T>> publisher) {
        return this.producerMono.flatMapMany(producer -> {
            Flux<SenderResult<T>> results = new Flux<SenderResult<T>>() {
                @Override
                public void subscribe(CoreSubscriber<? super SenderResult<T>> coreSubscriber) {
                    Flux<SenderRecord<K, V, T>> senderRecords = Flux.from(publisher);
                    senderRecords.subscribe(new SendSubscriber<T>(producer, coreSubscriber, DefaultKafkaSender.this.senderOptions.stopOnError()));
                }
            };
            return results
                .doOnError(th -> log.trace("Send failed with exception", th))
                .publishOn(this.senderOptions.scheduler(), this.senderOptions.maxInFlight());
        });
    }

    /** Creates a new, empty {@link KafkaOutbound} chain bound to this sender. */
    @Override
    public KafkaOutbound<K, V> createOutbound() {
        return new DefaultKafkaOutbound<>(this);
    }

    /**
     * Sends every inner publisher in its own transaction, one transaction at a time, and emits
     * the results of each transaction as a separate inner flux.
     *
     * @param publisher publisher of record batches, one batch per transaction
     * @return a flux of result windows, split each time a transaction signals the boundary
     *         processor
     */
    @Override
    public <T> Flux<Flux<SenderResult<T>>> sendTransactionally(Publisher<? extends Publisher<? extends SenderRecord<K, V, T>>> publisher) {
        // Receives one signal after every committed transaction and is used as the window boundary.
        UnicastProcessor<Object> boundary = UnicastProcessor.create();
        return Flux.from(publisher)
            .publishOn(this.senderOptions.scheduler(), false, 1)
            .concatMapDelayError(records -> transaction(records, boundary), false, 1)
            .window(boundary)
            .doOnTerminate(boundary::onComplete)
            .doOnCancel(boundary::onComplete);
    }

    /**
     * Returns the transaction manager of this sender.
     *
     * @throws IllegalStateException if the sender was created without a transaction manager
     */
    @Override
    public TransactionManager transactionManager() {
        if (this.transactionManager != null) {
            return this.transactionManager;
        }
        throw new IllegalStateException("Transactions are not enabled");
    }

    /**
     * Applies {@code function} to a restricted proxy of the producer and emits its result.
     * Anything thrown by the function is delivered as an error signal.
     *
     * @param function callback invoked with the producer proxy
     * @return a {@link Mono} emitting the function's result
     */
    @Override
    public <T> Mono<T> doOnProducer(Function<Producer<K, V>, ? extends T> function) {
        return this.producerMono.flatMap(producer -> Mono.create(sink -> {
            try {
                T result = function.apply(producerProxy(producer));
                sink.success(result);
            } catch (Throwable failure) {
                sink.error(failure);
            }
        }));
    }

    /**
     * Closes the producer, waiting up to the configured close timeout, and disposes the
     * scheduler of a transactional sender. Does nothing when the producer source was never
     * subscribed or the sender has already been closed.
     */
    @Override
    public void close() {
        if (!this.hasProducer.getAndSet(false)) {
            return;
        }
        try {
            this.producerMono
                .doOnNext(producer -> producer.close(this.senderOptions.closeTimeout().toMillis(), TimeUnit.MILLISECONDS))
                .block();
        } finally {
            // Dispose the scheduler even when obtaining or closing the producer fails, so it
            // is not left running after close().
            if (this.senderOptions.isTransactional()) {
                this.senderOptions.scheduler().dispose();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends plain producer records and emits, for each send, either the record metadata or the
     * exception. Results are published on the configured scheduler with a prefetch of
     * {@code senderOptions.maxInFlight()}.
     *
     * @param publisher records to send
     * @return the flux of raw send outcomes
     */
    public Flux<Object> sendProducerRecords(Publisher<? extends ProducerRecord<K, V>> publisher) {
        return this.producerMono.flatMapMany(producer -> {
            Flux<Object> outcomes = new Flux<Object>() {
                @Override
                public void subscribe(CoreSubscriber<? super Object> coreSubscriber) {
                    Flux<ProducerRecord<K, V>> producerRecords = Flux.from(publisher);
                    producerRecords.subscribe(new SendSubscriberNoResponse(producer, coreSubscriber, DefaultKafkaSender.this.senderOptions.stopOnError()));
                }
            };
            return outcomes
                .doOnError(th -> log.trace("Send failed with exception", th))
                .publishOn(this.senderOptions.scheduler(), this.senderOptions.maxInFlight());
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends the records inside one transaction: begin, send, commit. On any error the
     * transaction is aborted and the original error is re-emitted.
     *
     * @param publisher records to send in the transaction
     * @return a {@link Mono} that completes when the transaction has finished
     */
    public Mono<Void> transaction(Publisher<? extends ProducerRecord<K, V>> publisher) {
        TransactionManager manager = transactionManager();
        Flux<Object> sent = manager.begin().thenMany(sendProducerRecords(publisher));
        Flux<Object> committed = sent.concatWith(manager.commit());
        Flux<Object> guarded = committed.onErrorResume(failure -> manager.abort().then(Mono.error(failure)));
        return guarded.publishOn(this.senderOptions.scheduler()).then();
    }

    /**
     * Sends the records inside one transaction and emits their results. After the commit, one
     * signal is pushed into {@code unicastProcessor}. On any error the transaction is aborted
     * and the original error is re-emitted.
     *
     * @param publisher        records to send in the transaction
     * @param unicastProcessor processor notified after the commit
     * @return the results of the sends in this transaction
     */
    private <T> Flux<SenderResult<T>> transaction(Publisher<? extends SenderRecord<K, V, T>> publisher, UnicastProcessor<Object> unicastProcessor) {
        TransactionManager manager = transactionManager();
        Flux<SenderResult<T>> sent = manager.begin().thenMany(send(publisher));
        Flux<SenderResult<T>> committed = sent.concatWith(manager.commit());
        Flux<SenderResult<T>> signalled = committed.concatWith(Mono.create(sink -> {
            // The emitted value is this sender; any object would serve as the signal.
            unicastProcessor.onNext(this);
            sink.success();
        }));
        return signalled
            .onErrorResume(failure -> manager.abort().then(Mono.error(failure)))
            .publishOn(this.senderOptions.scheduler());
    }

    /**
     * Returns the restricted view of the producer, creating it on first use. The view forwards
     * only the methods listed in {@code DELEGATE_METHODS} and throws
     * {@link UnsupportedOperationException} for every other method. Exceptions thrown by the
     * real producer are rethrown unwrapped.
     *
     * @param producer the real producer; only used the first time, when the proxy is created
     * @return the shared proxy instance
     */
    private synchronized Producer<K, V> producerProxy(Producer<K, V> producer) {
        if (this.producerProxy != null) {
            return this.producerProxy;
        }
        // The proxy implements Producer, so the cast is safe; the type arguments are erased.
        @SuppressWarnings("unchecked")
        Producer<K, V> proxy = (Producer<K, V>) Proxy.newProxyInstance(
            Producer.class.getClassLoader(),
            new Class<?>[] {Producer.class},
            (obj, method, objArr) -> {
                if (!DELEGATE_METHODS.contains(method.getName())) {
                    throw new UnsupportedOperationException("Method is not supported: " + method);
                }
                try {
                    return method.invoke(producer, objArr);
                } catch (InvocationTargetException e) {
                    // Surface the producer's own exception instead of the reflection wrapper.
                    throw e.getCause();
                }
            });
        this.producerProxy = proxy;
        return proxy;
    }
}
