package org.springframework.kafka.core;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:BOOT-INF/lib/spring-kafka-3.0.5.jar:org/springframework/kafka/core/DefaultKafkaProducerFactory.class */
public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory implements ProducerFactory<K, V>, ApplicationContextAware, BeanNameAware, ApplicationListener<ContextStoppedEvent>, DisposableBean {
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog((Class<?>) DefaultKafkaProducerFactory.class));
    private final Map<String, Object> configs;
    private final AtomicInteger transactionIdSuffix;
    private final Map<String, BlockingQueue<CloseSafeProducer<K, V>>> cache;
    private final ThreadLocal<CloseSafeProducer<K, V>> threadBoundProducers;
    private final AtomicInteger epoch;
    private final AtomicInteger clientIdCounter;
    private final List<ProducerFactory.Listener<K, V>> listeners;
    private final List<ProducerPostProcessor<K, V>> postProcessors;
    private Supplier<Serializer<K>> keySerializerSupplier;
    private Supplier<Serializer<V>> valueSerializerSupplier;
    private Supplier<Serializer<K>> rawKeySerializerSupplier;
    private Supplier<Serializer<V>> rawValueSerializerSupplier;
    private Duration physicalCloseTimeout;
    private ApplicationContext applicationContext;
    private String beanName;
    private boolean producerPerThread;
    private long maxAge;
    private boolean configureSerializers;
    private volatile String transactionIdPrefix;
    private volatile String clientIdPrefix;
    private volatile CloseSafeProducer<K, V> producer;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:BOOT-INF/lib/spring-kafka-3.0.5.jar:org/springframework/kafka/core/DefaultKafkaProducerFactory$CloseSafeProducer.class */
    public static class CloseSafeProducer<K, V> implements Producer<K, V> {
        private static final Duration CLOSE_TIMEOUT_AFTER_TX_TIMEOUT = Duration.ofMillis(0);
        private final Producer<K, V> delegate;
        private final BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer;
        final String txIdPrefix;
        final long created;
        private final Duration closeTimeout;
        final String clientId;
        final int epoch;
        private volatile Exception producerFailed;
        volatile boolean closed;

        CloseSafeProducer(Producer<K, V> producer, BiPredicate<CloseSafeProducer<K, V>, Duration> biPredicate, Duration duration, String str, int i) {
            this(producer, biPredicate, null, duration, str, i);
        }

        CloseSafeProducer(Producer<K, V> producer, BiPredicate<CloseSafeProducer<K, V>, Duration> biPredicate, @Nullable String str, Duration duration, String str2, int i) {
            Assert.isTrue(!(producer instanceof CloseSafeProducer), "Cannot double-wrap a producer");
            this.delegate = producer;
            this.removeProducer = biPredicate;
            this.txIdPrefix = str;
            this.closeTimeout = duration;
            Iterator<MetricName> it = producer.metrics().keySet().iterator();
            this.clientId = str2 + "." + (it.hasNext() ? it.next().tags().get(ClientQuotaEntity.CLIENT_ID) : "unknown");
            this.created = System.currentTimeMillis();
            this.epoch = i;
            DefaultKafkaProducerFactory.LOGGER.debug(() -> {
                return "Created new Producer: " + this;
            });
        }

        Producer<K, V> getDelegate() {
            return this.delegate;
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
            DefaultKafkaProducerFactory.LOGGER.trace(() -> {
                return toString() + " send(" + producerRecord + ")";
            });
            return this.delegate.send(producerRecord);
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, final Callback callback) {
            DefaultKafkaProducerFactory.LOGGER.trace(() -> {
                return toString() + " send(" + producerRecord + ")";
            });
            return this.delegate.send(producerRecord, new Callback() { // from class: org.springframework.kafka.core.DefaultKafkaProducerFactory.CloseSafeProducer.1
                @Override // org.apache.kafka.clients.producer.Callback
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                    if (exc instanceof OutOfOrderSequenceException) {
                        CloseSafeProducer.this.producerFailed = exc;
                        CloseSafeProducer.this.close(CloseSafeProducer.this.closeTimeout);
                    }
                    callback.onCompletion(recordMetadata, exc);
                }
            });
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public void flush() {
            DefaultKafkaProducerFactory.LOGGER.trace(() -> {
                return toString() + " flush()";
            });
            this.delegate.flush();
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public List<PartitionInfo> partitionsFor(String str) {
            return this.delegate.partitionsFor(str);
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public Map<MetricName, ? extends Metric> metrics() {
            return this.delegate.metrics();
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public void initTransactions() {
            this.delegate.initTransactions();
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public void beginTransaction() throws ProducerFencedException {
            DefaultKafkaProducerFactory.LOGGER.debug(() -> {
                return toString() + " beginTransaction()";
            });
            try {
                this.delegate.beginTransaction();
            } catch (RuntimeException e) {
                DefaultKafkaProducerFactory.LOGGER.error(e, () -> {
                    return "beginTransaction failed: " + this;
                });
                this.producerFailed = e;
                throw e;
            }
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, String str) throws ProducerFencedException {
            DefaultKafkaProducerFactory.LOGGER.trace(() -> {
                return toString() + " sendOffsetsToTransaction(" + map + ", " + str + ")";
            });
            this.delegate.sendOffsetsToTransaction(map, str);
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException {
            DefaultKafkaProducerFactory.LOGGER.trace(() -> {
                return toString() + " sendOffsetsToTransaction(" + map + ", " + consumerGroupMetadata + ")";
            });
            this.delegate.sendOffsetsToTransaction(map, consumerGroupMetadata);
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public void commitTransaction() throws ProducerFencedException {
            DefaultKafkaProducerFactory.LOGGER.debug(() -> {
                return toString() + " commitTransaction()";
            });
            try {
                this.delegate.commitTransaction();
            } catch (RuntimeException e) {
                DefaultKafkaProducerFactory.LOGGER.error(e, () -> {
                    return "commitTransaction failed: " + this;
                });
                this.producerFailed = e;
                throw e;
            }
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public void abortTransaction() throws ProducerFencedException {
            DefaultKafkaProducerFactory.LOGGER.debug(() -> {
                return toString() + " abortTransaction()";
            });
            if (this.producerFailed != null) {
                DefaultKafkaProducerFactory.LOGGER.debug(() -> {
                    return "abortTransaction ignored - previous txFailed: " + this.producerFailed.getMessage() + ": " + this;
                });
                return;
            }
            try {
                this.delegate.abortTransaction();
            } catch (RuntimeException e) {
                DefaultKafkaProducerFactory.LOGGER.error(e, () -> {
                    return "Abort failed: " + this;
                });
                this.producerFailed = e;
                throw e;
            }
        }

        @Override // org.apache.kafka.clients.producer.Producer, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            close(null);
        }

        @Override // org.apache.kafka.clients.producer.Producer
        public void close(@Nullable Duration duration) {
            DefaultKafkaProducerFactory.LOGGER.trace(() -> {
                return toString() + " close(" + (duration == null ? "null" : duration) + ")";
            });
            if (this.closed) {
                return;
            }
            if (this.producerFailed == null) {
                this.closed = this.removeProducer.test(this, duration);
                return;
            }
            DefaultKafkaProducerFactory.LOGGER.warn(() -> {
                return "Error during some operation; producer removed from cache: " + this;
            });
            this.closed = true;
            this.removeProducer.test(this, this.producerFailed instanceof TimeoutException ? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT : duration);
        }

        void closeDelegate(Duration duration, List<ProducerFactory.Listener<K, V>> list) {
            this.delegate.close(duration == null ? this.closeTimeout : duration);
            list.forEach(listener -> {
                listener.producerRemoved(this.clientId, this);
            });
            this.closed = true;
        }

        public String toString() {
            return "CloseSafeProducer [delegate=" + this.delegate + "]";
        }
    }

    public DefaultKafkaProducerFactory(Map<String, Object> map) {
        this(map, () -> {
            return null;
        }, () -> {
            return null;
        });
    }

    public DefaultKafkaProducerFactory(Map<String, Object> map, @Nullable Serializer<K> serializer, @Nullable Serializer<V> serializer2) {
        this(map, () -> {
            return serializer;
        }, () -> {
            return serializer2;
        }, true);
    }

    public DefaultKafkaProducerFactory(Map<String, Object> map, @Nullable Serializer<K> serializer, @Nullable Serializer<V> serializer2, boolean z) {
        this(map, () -> {
            return serializer;
        }, () -> {
            return serializer2;
        }, z);
    }

    public DefaultKafkaProducerFactory(Map<String, Object> map, @Nullable Supplier<Serializer<K>> supplier, @Nullable Supplier<Serializer<V>> supplier2) {
        this(map, (Supplier) supplier, (Supplier) supplier2, true);
    }

    public DefaultKafkaProducerFactory(Map<String, Object> map, @Nullable Supplier<Serializer<K>> supplier, @Nullable Supplier<Serializer<V>> supplier2, boolean z) {
        this.transactionIdSuffix = new AtomicInteger();
        this.cache = new ConcurrentHashMap();
        this.threadBoundProducers = new ThreadLocal<>();
        this.epoch = new AtomicInteger();
        this.clientIdCounter = new AtomicInteger();
        this.listeners = new ArrayList();
        this.postProcessors = new ArrayList();
        this.physicalCloseTimeout = DEFAULT_PHYSICAL_CLOSE_TIMEOUT;
        this.beanName = "not.managed.by.Spring";
        this.configureSerializers = true;
        this.configs = new ConcurrentHashMap(map);
        this.configureSerializers = z;
        this.keySerializerSupplier = keySerializerSupplier(supplier);
        this.valueSerializerSupplier = valueSerializerSupplier(supplier2);
        if (this.clientIdPrefix == null && (map.get("client.id") instanceof String)) {
            this.clientIdPrefix = (String) map.get("client.id");
        }
        String str = (String) this.configs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
        if (StringUtils.hasText(str)) {
            setTransactionIdPrefix(str);
            this.configs.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
        }
    }

    private Supplier<Serializer<K>> keySerializerSupplier(@Nullable Supplier<Serializer<K>> supplier) {
        this.rawKeySerializerSupplier = supplier;
        return !this.configureSerializers ? supplier : supplier == null ? () -> {
            return null;
        } : () -> {
            Serializer serializer = (Serializer) supplier.get();
            if (serializer != null) {
                serializer.configure(this.configs, true);
            }
            return serializer;
        };
    }

    private Supplier<Serializer<V>> valueSerializerSupplier(@Nullable Supplier<Serializer<V>> supplier) {
        this.rawValueSerializerSupplier = supplier;
        return !this.configureSerializers ? supplier : supplier == null ? () -> {
            return null;
        } : () -> {
            Serializer serializer = (Serializer) supplier.get();
            if (serializer != null) {
                serializer.configure(this.configs, false);
            }
            return serializer;
        };
    }

    @Override // org.springframework.context.ApplicationContextAware
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Override // org.springframework.beans.factory.BeanNameAware
    public void setBeanName(String str) {
        this.beanName = str;
    }

    public void setKeySerializer(@Nullable Serializer<K> serializer) {
        this.keySerializerSupplier = keySerializerSupplier(() -> {
            return serializer;
        });
    }

    public void setValueSerializer(@Nullable Serializer<V> serializer) {
        this.valueSerializerSupplier = valueSerializerSupplier(() -> {
            return serializer;
        });
    }

    public void setKeySerializerSupplier(Supplier<Serializer<K>> supplier) {
        this.keySerializerSupplier = keySerializerSupplier(supplier);
    }

    public void setValueSerializerSupplier(Supplier<Serializer<V>> supplier) {
        this.valueSerializerSupplier = valueSerializerSupplier(supplier);
    }

    public boolean isConfigureSerializers() {
        return this.configureSerializers;
    }

    public void setConfigureSerializers(boolean z) {
        this.configureSerializers = z;
    }

    public void setPhysicalCloseTimeout(int i) {
        this.physicalCloseTimeout = Duration.ofSeconds(i);
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public Duration getPhysicalCloseTimeout() {
        return this.physicalCloseTimeout;
    }

    public final void setTransactionIdPrefix(String str) {
        Assert.notNull(str, "'transactionIdPrefix' cannot be null");
        this.transactionIdPrefix = str;
        enableIdempotentBehaviour();
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    @Nullable
    public String getTransactionIdPrefix() {
        return this.transactionIdPrefix;
    }

    public void setProducerPerThread(boolean z) {
        this.producerPerThread = z;
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public boolean isProducerPerThread() {
        return this.producerPerThread;
    }

    @Deprecated(since = "3.0", forRemoval = true)
    public void setProducerPerConsumerPartition(boolean z) {
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    @Deprecated(since = "3.0", forRemoval = true)
    public boolean isProducerPerConsumerPartition() {
        return false;
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    @Nullable
    public Serializer<K> getKeySerializer() {
        return this.keySerializerSupplier.get();
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    @Nullable
    public Serializer<V> getValueSerializer() {
        return this.valueSerializerSupplier.get();
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public Supplier<Serializer<K>> getKeySerializerSupplier() {
        return this.rawKeySerializerSupplier;
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public Supplier<Serializer<V>> getValueSerializerSupplier() {
        return this.rawValueSerializerSupplier;
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public Map<String, Object> getConfigurationProperties() {
        HashMap hashMap = new HashMap(this.configs);
        checkBootstrap(hashMap);
        return Collections.unmodifiableMap(hashMap);
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public List<ProducerFactory.Listener<K, V>> getListeners() {
        return Collections.unmodifiableList(this.listeners);
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public List<ProducerPostProcessor<K, V>> getPostProcessors() {
        return Collections.unmodifiableList(this.postProcessors);
    }

    public void setMaxAge(Duration duration) {
        this.maxAge = duration.toMillis();
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public ProducerFactory<K, V> copyWithConfigurationOverride(Map<String, Object> map) {
        HashMap hashMap = new HashMap(getConfigurationProperties());
        hashMap.putAll(map);
        DefaultKafkaProducerFactory defaultKafkaProducerFactory = new DefaultKafkaProducerFactory(ensureExistingTransactionIdPrefixInProperties(hashMap), getKeySerializerSupplier(), getValueSerializerSupplier(), isConfigureSerializers());
        defaultKafkaProducerFactory.setPhysicalCloseTimeout((int) getPhysicalCloseTimeout().getSeconds());
        defaultKafkaProducerFactory.setProducerPerThread(isProducerPerThread());
        Iterator<ProducerPostProcessor<K, V>> it = getPostProcessors().iterator();
        while (it.hasNext()) {
            defaultKafkaProducerFactory.addPostProcessor(it.next());
        }
        Iterator<ProducerFactory.Listener<K, V>> it2 = getListeners().iterator();
        while (it2.hasNext()) {
            defaultKafkaProducerFactory.addListener(it2.next());
        }
        return defaultKafkaProducerFactory;
    }

    private Map<String, Object> ensureExistingTransactionIdPrefixInProperties(Map<String, Object> map) {
        String transactionIdPrefix = getTransactionIdPrefix();
        if (!StringUtils.hasText(transactionIdPrefix) || map.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {
            return map;
        }
        HashMap hashMap = new HashMap(map);
        hashMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionIdPrefix);
        return hashMap;
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public void addListener(ProducerFactory.Listener<K, V> listener) {
        Assert.notNull(listener, "'listener' cannot be null");
        this.listeners.add(listener);
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public void addListener(int i, ProducerFactory.Listener<K, V> listener) {
        Assert.notNull(listener, "'listener' cannot be null");
        if (i >= this.listeners.size()) {
            this.listeners.add(listener);
        } else {
            this.listeners.add(i, listener);
        }
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public boolean removeListener(ProducerFactory.Listener<K, V> listener) {
        return this.listeners.remove(listener);
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public void addPostProcessor(ProducerPostProcessor<K, V> producerPostProcessor) {
        Assert.notNull(producerPostProcessor, "'postProcessor' cannot be null");
        this.postProcessors.add(producerPostProcessor);
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public boolean removePostProcessor(ProducerPostProcessor<K, V> producerPostProcessor) {
        return this.postProcessors.remove(producerPostProcessor);
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public void updateConfigs(Map<String, Object> map) {
        map.entrySet().forEach(entry -> {
            if (((String) entry.getKey()).equals(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {
                Assert.isTrue(entry.getValue() instanceof String, (Supplier<String>) () -> {
                    return "'transactional.id' must be a String, not a " + entry.getClass().getName();
                });
                Assert.isTrue(this.transactionIdPrefix != null ? entry.getValue() != null : entry.getValue() == null, "Cannot change transactional capability");
                this.transactionIdPrefix = (String) entry.getValue();
            } else if (!((String) entry.getKey()).equals("client.id")) {
                this.configs.put((String) entry.getKey(), entry.getValue());
            } else {
                Assert.isTrue(entry.getValue() instanceof String, (Supplier<String>) () -> {
                    return "'client.id' must be a String, not a " + entry.getClass().getName();
                });
                this.clientIdPrefix = (String) entry.getValue();
            }
        });
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public void removeConfig(String str) {
        this.configs.remove(str);
    }

    private void enableIdempotentBehaviour() {
        if (Boolean.FALSE.equals(this.configs.putIfAbsent(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true))) {
            LOGGER.debug(() -> {
                return "The 'enable.idempotence' is set to false, may result in duplicate messages";
            });
        }
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public boolean transactionCapable() {
        return this.transactionIdPrefix != null;
    }

    @Override // org.springframework.beans.factory.DisposableBean
    public void destroy() {
        CloseSafeProducer<K, V> closeSafeProducer;
        synchronized (this) {
            closeSafeProducer = this.producer;
            this.producer = null;
        }
        if (closeSafeProducer != null) {
            closeSafeProducer.closeDelegate(this.physicalCloseTimeout, this.listeners);
        }
        this.cache.values().forEach(blockingQueue -> {
            Object poll = blockingQueue.poll();
            while (true) {
                CloseSafeProducer closeSafeProducer2 = (CloseSafeProducer) poll;
                if (closeSafeProducer2 == null) {
                    return;
                }
                try {
                    closeSafeProducer2.closeDelegate(this.physicalCloseTimeout, this.listeners);
                } catch (Exception e) {
                    LOGGER.error(e, "Exception while closing producer");
                }
                poll = blockingQueue.poll();
            }
        });
        this.epoch.incrementAndGet();
    }

    @Override // org.springframework.context.ApplicationListener
    public void onApplicationEvent(ContextStoppedEvent contextStoppedEvent) {
        if (contextStoppedEvent.getApplicationContext().equals(this.applicationContext)) {
            reset();
        }
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public void reset() {
        try {
            destroy();
        } catch (Exception e) {
            LOGGER.error(e, "Exception while closing producer");
        }
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public Producer<K, V> createProducer() {
        return createProducer(this.transactionIdPrefix);
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public Producer<K, V> createProducer(@Nullable String str) {
        return doCreateProducer(str == null ? this.transactionIdPrefix : str);
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public Producer<K, V> createNonTransactionalProducer() {
        return doCreateProducer(null);
    }

    private Producer<K, V> doCreateProducer(@Nullable String str) {
        CloseSafeProducer<K, V> closeSafeProducer;
        if (str != null) {
            return createTransactionalProducer(str);
        }
        if (this.producerPerThread) {
            return getOrCreateThreadBoundProducer();
        }
        synchronized (this) {
            if (this.producer != null && expire(this.producer)) {
                this.producer = null;
            }
            if (this.producer == null) {
                this.producer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer, this.physicalCloseTimeout, this.beanName, this.epoch.get());
                this.listeners.forEach(listener -> {
                    listener.producerAdded(this.producer.clientId, this.producer);
                });
            }
            closeSafeProducer = this.producer;
        }
        return closeSafeProducer;
    }

    private Producer<K, V> getOrCreateThreadBoundProducer() {
        CloseSafeProducer<K, V> closeSafeProducer = this.threadBoundProducers.get();
        if (closeSafeProducer != null && (this.epoch.get() != closeSafeProducer.epoch || expire(closeSafeProducer))) {
            closeThreadBoundProducer();
            closeSafeProducer = null;
        }
        if (closeSafeProducer == null) {
            closeSafeProducer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer, this.physicalCloseTimeout, this.beanName, this.epoch.get());
            Iterator<ProducerFactory.Listener<K, V>> it = this.listeners.iterator();
            while (it.hasNext()) {
                it.next().producerAdded(closeSafeProducer.clientId, closeSafeProducer);
            }
            this.threadBoundProducers.set(closeSafeProducer);
        }
        return closeSafeProducer;
    }

    protected Producer<K, V> createKafkaProducer() {
        return createRawProducer(getProducerConfigs());
    }

    protected final synchronized boolean removeProducer(CloseSafeProducer<K, V> closeSafeProducer, Duration duration) {
        if (!closeSafeProducer.closed) {
            return false;
        }
        if (closeSafeProducer.equals(this.producer)) {
            this.producer = null;
            closeSafeProducer.closeDelegate(duration, this.listeners);
        }
        this.threadBoundProducers.remove();
        return true;
    }

    protected Producer<K, V> createTransactionalProducer() {
        return createTransactionalProducer(this.transactionIdPrefix);
    }

    protected Producer<K, V> createTransactionalProducer(String str) {
        CloseSafeProducer<K, V> closeSafeProducer;
        BlockingQueue<CloseSafeProducer<K, V>> cache = getCache(str);
        Assert.notNull(cache, (Supplier<String>) () -> {
            return "No cache found for " + str;
        });
        CloseSafeProducer<K, V> poll = cache.poll();
        while (true) {
            closeSafeProducer = poll;
            if (closeSafeProducer == null || !expire(closeSafeProducer)) {
                break;
            }
            poll = cache.poll();
        }
        return closeSafeProducer == null ? doCreateTxProducer(str, this.transactionIdSuffix.getAndIncrement(), this::cacheReturner) : closeSafeProducer;
    }

    private boolean expire(CloseSafeProducer<K, V> closeSafeProducer) {
        boolean z = this.maxAge > 0 && System.currentTimeMillis() - closeSafeProducer.created > this.maxAge;
        if (z) {
            closeSafeProducer.closeDelegate(this.physicalCloseTimeout, this.listeners);
        }
        return z;
    }

    boolean cacheReturner(CloseSafeProducer<K, V> closeSafeProducer, Duration duration) {
        if (closeSafeProducer.closed) {
            closeSafeProducer.closeDelegate(duration, this.listeners);
            return true;
        }
        synchronized (this.cache) {
            BlockingQueue<CloseSafeProducer<K, V>> cache = getCache(closeSafeProducer.txIdPrefix);
            if (closeSafeProducer.epoch == this.epoch.get() && (cache == null || cache.contains(closeSafeProducer) || cache.offer(closeSafeProducer))) {
                return false;
            }
            closeSafeProducer.closeDelegate(duration, this.listeners);
            return true;
        }
    }

    private CloseSafeProducer<K, V> doCreateTxProducer(String str, String str2, BiPredicate<CloseSafeProducer<K, V>, Duration> biPredicate) {
        Producer<K, V> createRawProducer = createRawProducer(getTxProducerConfigs(str + str2));
        try {
            createRawProducer.initTransactions();
            CloseSafeProducer<K, V> closeSafeProducer = new CloseSafeProducer<>(createRawProducer, biPredicate, str, this.physicalCloseTimeout, this.beanName, this.epoch.get());
            this.listeners.forEach(listener -> {
                listener.producerAdded(closeSafeProducer.clientId, closeSafeProducer);
            });
            return closeSafeProducer;
        } catch (RuntimeException e) {
            try {
                createRawProducer.close(this.physicalCloseTimeout);
                throw new KafkaException("initTransactions() failed", e);
            } catch (RuntimeException e2) {
                KafkaException kafkaException = new KafkaException("initTransactions() failed and then close() failed", e);
                kafkaException.addSuppressed(e2);
                throw kafkaException;
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v12, types: [org.apache.kafka.clients.producer.Producer] */
    protected Producer<K, V> createRawProducer(Map<String, Object> map) {
        KafkaProducer kafkaProducer = new KafkaProducer(map, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
        Iterator<ProducerPostProcessor<K, V>> it = this.postProcessors.iterator();
        while (it.hasNext()) {
            kafkaProducer = it.next().apply(kafkaProducer);
        }
        return kafkaProducer;
    }

    @Nullable
    protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
        return getCache(this.transactionIdPrefix);
    }

    @Nullable
    protected BlockingQueue<CloseSafeProducer<K, V>> getCache(String str) {
        if (str == null) {
            return null;
        }
        return this.cache.computeIfAbsent(str, str2 -> {
            return new LinkedBlockingQueue();
        });
    }

    @Override // org.springframework.kafka.core.ProducerFactory
    public void closeThreadBoundProducer() {
        CloseSafeProducer<K, V> closeSafeProducer = this.threadBoundProducers.get();
        if (closeSafeProducer != null) {
            this.threadBoundProducers.remove();
            closeSafeProducer.closeDelegate(this.physicalCloseTimeout, this.listeners);
        }
    }

    protected Map<String, Object> getProducerConfigs() {
        Map<String, Object> hashMap = new HashMap<>(this.configs);
        checkBootstrap(hashMap);
        if (this.clientIdPrefix != null) {
            hashMap.put("client.id", this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
        }
        return hashMap;
    }

    protected Map<String, Object> getTxProducerConfigs(String str) {
        Map<String, Object> producerConfigs = getProducerConfigs();
        producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, str);
        return producerConfigs;
    }
}
