package io.apicurio.registry.utils.kafka;

import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/apicurio-registry-utils-kafka.jar:io/apicurio/registry/utils/kafka/AsyncProducer.class */
/**
 * A {@link ProducerActions} implementation backed by a lazily created {@link KafkaProducer}.
 *
 * <p>The underlying producer is created on the first {@link #apply} call. When a send fails with
 * one of the exceptions treated as fatal (see {@code CFC#isFatalException}), that producer is
 * closed and dropped, so the next {@link #apply} call creates a fresh one. After {@link #close()}
 * no new producer is created any more.
 *
 * <p>Creation and disposal of the underlying producer are serialized on this object's monitor.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class AsyncProducer<K, V> implements ProducerActions<K, V> {
    private static final Logger log = LoggerFactory.getLogger(AsyncProducer.class);
    private final Properties producerProps;
    private final Serializer<K> keySerializer;
    private final Serializer<V> valSerializer;
    /** Current producer, or {@code null} when none has been created yet or it has been closed. */
    private KafkaProducer<K, V> producer;
    /** Set once a non-callback close was requested; prevents creation of further producers. */
    private boolean closed;

    /**
     * A future that is also the Kafka send {@link Callback}: it completes with the record
     * metadata on success and exceptionally with the reported error otherwise.
     *
     * <p>It keeps a reference to the producer it was created for, so that a fatal error only
     * closes that particular producer instance and not a newer replacement.
     */
    public class CFC extends CompletableFuture<RecordMetadata> implements Callback {
        private final KafkaProducer<?, ?> producer;

        CFC(KafkaProducer<?, ?> kafkaProducer) {
            this.producer = kafkaProducer;
        }

        /**
         * Completes this future from the outcome of a send.
         *
         * @param recordMetadata metadata of the sent record; used only when {@code exc} is {@code null}
         * @param exc the send failure, or {@code null} on success
         */
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc == null) {
                complete(recordMetadata);
                return;
            }
            try {
                if (isFatalException(exc)) {
                    AsyncProducer.this.closeProducer(this.producer, true);
                }
            } finally {
                // Always propagate the failure to the caller, even if closing the producer threw.
                completeExceptionally(exc);
            }
        }

        /** Returns {@code true} for the exception types after which the producer is discarded. */
        private boolean isFatalException(Exception exc) {
            return exc instanceof UnsupportedVersionException
                    || exc instanceof AuthorizationException
                    || exc instanceof ProducerFencedException
                    || exc instanceof OutOfOrderSequenceException;
        }
    }

    /**
     * Creates a new instance; no Kafka producer is created until the first {@link #apply} call.
     *
     * @param properties configuration passed to the {@link KafkaProducer} constructor
     * @param serializer key serializer
     * @param serializer2 value serializer
     * @throws NullPointerException if any argument is {@code null}
     */
    public AsyncProducer(Properties properties, Serializer<K> serializer, Serializer<V> serializer2) {
        this.producerProps = Objects.requireNonNull(properties, "producerProps");
        this.keySerializer = Objects.requireNonNull(serializer, "keySerializer");
        this.valSerializer = Objects.requireNonNull(serializer2, "valSerializer");
    }

    /**
     * Sends the record using the current (or a newly created) producer.
     *
     * <p>This method does not throw {@link Exception}s: any exception raised while obtaining the
     * producer or while calling {@code send} is reported through the returned future.
     *
     * @param producerRecord the record to send
     * @return a future completed with the record metadata, or exceptionally with the failure
     */
    @Override
    public CompletableFuture<RecordMetadata> apply(ProducerRecord<K, V> producerRecord) {
        CFC callback = null;
        try {
            KafkaProducer<K, V> kafkaProducer = getProducer();
            callback = new CFC(kafkaProducer);
            kafkaProducer.send(producerRecord, callback);
            return callback;
        } catch (Exception e) {
            if (callback != null) {
                // send() itself threw: route the error through the callback so that the
                // fatal-exception handling applies to it as well.
                callback.onCompletion(null, e);
                return callback;
            }
            // No callback exists yet (obtaining the producer failed): return a plain failed future.
            CompletableFuture<RecordMetadata> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    /**
     * Closes the current producer, if any, and marks this instance as closed so that later
     * {@link #apply} calls fail instead of creating a new producer.
     */
    @Override
    public void close() {
        closeProducer(null, false);
    }

    /**
     * Returns the current producer, creating it first if there is none.
     *
     * @throws IllegalStateException if there is no producer and this instance has been closed
     */
    private synchronized KafkaProducer<K, V> getProducer() {
        if (this.producer == null) {
            if (this.closed) {
                throw new IllegalStateException("This producer is already closed.");
            }
            log.info("Creating new resilient producer.");
            this.producer = new KafkaProducer<>(this.producerProps, this.keySerializer, this.valSerializer);
        }
        return this.producer;
    }

    /**
     * Closes the given producer if it is still the current one and clears the reference to it.
     *
     * @param kafkaProducer the producer to close, or {@code null} to close whichever producer is
     *        current; a non-null producer that is no longer current is left alone
     * @param z {@code true} when invoked from the send callback ({@link CFC#onCompletion}); the
     *        producer is then closed with a zero timeout and this instance is NOT marked as
     *        closed. {@code false} closes with the default timeout and marks it closed.
     */
    public synchronized void closeProducer(KafkaProducer<?, ?> kafkaProducer, boolean z) {
        try {
            KafkaProducer<?, ?> target = (kafkaProducer != null) ? kafkaProducer : this.producer;
            if (target == null || target != this.producer) {
                // Nothing to close, or the given producer has already been replaced/closed.
                return;
            }
            try {
                log.info("Closing resilient producer.");
                if (z) {
                    // Zero timeout: do not wait for in-flight requests when closing from the callback.
                    target.close(Duration.ZERO);
                } else {
                    target.close();
                }
            } catch (Exception e) {
                log.warn("Exception caught while closing producer.", e);
            } finally {
                // Drop the reference no matter how close() ended.
                this.producer = null;
            }
        } finally {
            // Every non-callback close is final, regardless of which producer was passed in
            // and of whether there was anything left to close.
            if (!z) {
                this.closed = true;
            }
        }
    }
}
