package org.apache.camel.component.kafka;

import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelException;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-310-12.zip:modules/system/layers/fuse/org/apache/camel/component/kafka/main/camel-kafka-2.17.0.redhat-630310-12.jar:org/apache/camel/component/kafka/KafkaProducer.class */
public class KafkaProducer extends DefaultAsyncProducer {
    /** Underlying Kafka client; created in doStart() when none has been set beforehand. */
    private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
    /** Endpoint this producer was created for; consulted for topic, brokers and producer configuration. */
    private final KafkaEndpoint endpoint;
    /** Executor used to invoke the AsyncCallback once the last Kafka send completion has arrived. */
    private ExecutorService workerPool;
    /** Set to true when doStart() created the worker pool itself, so that doStop() shuts it down. */
    private boolean shutdownWorkerPool;

    /* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-310-12.zip:modules/system/layers/fuse/org/apache/camel/component/kafka/main/camel-kafka-2.17.0.redhat-630310-12.jar:org/apache/camel/component/kafka/KafkaProducer$KafkaProducerCallBack.class */
    /**
     * Kafka send callback shared by all records produced for a single exchange.
     * <p>
     * A counter tracks outstanding work. It starts at 1 (a token held by the dispatching
     * thread), is raised once per record via {@link #increment()}, and is lowered once per
     * send completion and once by {@link #allSent()}. Whichever decrement reaches zero is the
     * one that signals the {@link AsyncCallback}, so it is signalled at most once.
     */
    private final class KafkaProducerCallBack implements Callback {
        private final Exchange exchange;
        private final AsyncCallback callback;
        private final AtomicInteger count = new AtomicInteger(1);

        KafkaProducerCallBack(Exchange exchange, AsyncCallback asyncCallback) {
            this.exchange = exchange;
            this.callback = asyncCallback;
        }

        /** Registers one more record whose completion must be awaited. */
        void increment() {
            count.incrementAndGet();
        }

        /**
         * Releases the dispatching thread's token after all records have been handed to Kafka.
         *
         * @return true if every send had already completed, in which case the callback has
         *         been signalled with {@code done(true)}; false if completions are still pending
         */
        boolean allSent() {
            boolean finished = count.decrementAndGet() == 0;
            if (finished) {
                callback.done(true);
            }
            return finished;
        }

        /**
         * Records a send failure on the exchange (if any) and, when this was the last
         * outstanding completion, submits {@code done(false)} to the worker pool.
         */
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                exchange.setException(exc);
            }
            if (count.decrementAndGet() != 0) {
                return;
            }
            // Signal completion from the worker pool instead of the thread running this callback.
            workerPool.submit(new Runnable() {
                @Override
                public void run() {
                    callback.done(false);
                }
            });
        }
    }

    /**
     * Creates a producer bound to the given endpoint.
     *
     * @param kafkaEndpoint endpoint this producer belongs to
     */
    public KafkaProducer(KafkaEndpoint kafkaEndpoint) {
        super(kafkaEndpoint);
        endpoint = kafkaEndpoint;
    }

    /**
     * Assembles the properties used to construct the Kafka client.
     * <p>
     * Starts from the endpoint configuration's producer properties, lets the endpoint update
     * them, and sets {@code bootstrap.servers} when the endpoint has brokers configured.
     *
     * @return the producer properties
     */
    Properties getProps() {
        Properties producerProps = endpoint.getConfiguration().createProducerProperties();
        endpoint.updateClassProperties(producerProps);
        if (endpoint.getBrokers() == null) {
            return producerProps;
        }
        producerProps.put("bootstrap.servers", endpoint.getBrokers());
        return producerProps;
    }

    /**
     * @return the Kafka client currently held by this producer, or null if there is none
     */
    public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() {
        return kafkaProducer;
    }

    /**
     * Sets the Kafka client to use. When a non-null client is set before doStart() runs,
     * doStart() keeps it instead of creating its own.
     *
     * @param kafkaProducer the Kafka client to use
     */
    public void setKafkaProducer(org.apache.kafka.clients.producer.KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    /**
     * @return the executor used to signal asynchronous completion, or null if there is none
     */
    public ExecutorService getWorkerPool() {
        return workerPool;
    }

    /**
     * Sets the executor used to signal asynchronous completion. When a non-null executor is
     * set before doStart() runs, doStart() keeps it instead of creating its own.
     *
     * @param executorService the executor to use
     */
    public void setWorkerPool(ExecutorService executorService) {
        workerPool = executorService;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts the producer: creates the Kafka client if none has been set and, for
     * non-synchronous endpoints, obtains a worker pool from the endpoint if none has been set.
     *
     * @throws Exception any failure raised while building the properties, the client or the pool
     */
    @Override
    public void doStart() throws Exception {
        Properties producerProps = getProps();
        if (kafkaProducer == null) {
            Thread current = Thread.currentThread();
            ClassLoader originalLoader = current.getContextClassLoader();
            try {
                // The client is constructed with the thread context class loader cleared.
                current.setContextClassLoader(null);
                kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer(producerProps);
            } finally {
                // Put the caller's context class loader back whether or not construction succeeded.
                current.setContextClassLoader(originalLoader);
            }
        }
        if (!endpoint.isSynchronous() && workerPool == null) {
            workerPool = endpoint.createProducerExecutor();
            // Remember that this pool is ours so doStop() shuts it down.
            shutdownWorkerPool = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stops the producer: closes the Kafka client and shuts down the worker pool if this
     * producer created it.
     * <p>
     * The client reference is cleared after closing so that a later doStart() creates a fresh
     * client rather than keeping one that has already been closed.
     *
     * @throws Exception any failure raised while closing the client or shutting down the pool
     */
    @Override
    public void doStop() throws Exception {
        if (kafkaProducer != null) {
            kafkaProducer.close();
            // A closed client cannot be reused; drop it so doStart() re-creates one on restart.
            kafkaProducer = null;
        }
        if (shutdownWorkerPool && workerPool != null) {
            endpoint.getCamelContext().getExecutorServiceManager().shutdown(workerPool);
            workerPool = null;
        }
        // Ownership is decided again by the next doStart(); without this reset a pool injected
        // between stop and start would be treated as ours and shut down.
        shutdownWorkerPool = false;
    }

    /**
     * Builds the Kafka records to send for an exchange.
     * <p>
     * The topic is the endpoint topic, overridden by the {@link KafkaConstants#TOPIC} header
     * unless the endpoint is a bridge endpoint. If the message body is an {@link Iterable} or
     * an {@link Iterator}, one record is produced lazily per element; otherwise a single
     * record carrying the body is produced. The {@link KafkaConstants#PARTITION_KEY} header is
     * only applied when the {@link KafkaConstants#KEY} header is present as well.
     *
     * @param exchange the exchange whose in-message supplies headers and body
     * @return an iterator over the records to send
     * @throws CamelException if no topic can be determined
     * @throws NumberFormatException if the partition key header is used and is not an integer;
     *         for iterable bodies this is raised from {@code next()} rather than from this method
     */
    protected Iterator<ProducerRecord> createRecorder(Exchange exchange) throws CamelException {
        String topic = endpoint.getTopic();
        if (!endpoint.isBridgeEndpoint()) {
            topic = (String) exchange.getIn().getHeader(KafkaConstants.TOPIC, topic, String.class);
        }
        if (topic == null) {
            throw new CamelExchangeException("No topic key set", exchange);
        }

        final Object partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY);
        final Object messageKey = exchange.getIn().getHeader(KafkaConstants.KEY);
        Object body = exchange.getIn().getBody();

        final Iterator<?> elements;
        if (body instanceof Iterable) {
            elements = ((Iterable<?>) body).iterator();
        } else if (body instanceof Iterator) {
            elements = (Iterator<?>) body;
        } else {
            elements = null;
        }

        if (elements == null) {
            // Single-record case.
            Integer partition = resolvePartition(partitionKey, messageKey);
            if (messageKey == null) {
                log.warn("No message key or partition key set");
            }
            return Collections.singletonList(newRecord(topic, partition, messageKey, body)).iterator();
        }

        final String recordTopic = topic;
        return new Iterator<ProducerRecord>() {
            @Override
            public boolean hasNext() {
                return elements.hasNext();
            }

            @Override
            public ProducerRecord next() {
                // Parse the partition before advancing so a malformed header leaves the source untouched.
                Integer partition = resolvePartition(partitionKey, messageKey);
                return newRecord(recordTopic, partition, messageKey, elements.next());
            }

            @Override
            public void remove() {
                elements.remove();
            }
        };
    }

    /** Parses the partition header, or returns null when it is absent or no message key accompanies it. */
    private static Integer resolvePartition(Object partitionKey, Object messageKey) {
        if (partitionKey == null || messageKey == null) {
            return null;
        }
        return Integer.valueOf(partitionKey.toString());
    }

    /** Picks the ProducerRecord constructor matching which of partition and message key are present. */
    private static ProducerRecord newRecord(String topic, Integer partition, Object messageKey, Object value) {
        if (partition != null) {
            return new ProducerRecord(topic, partition, messageKey, value);
        }
        if (messageKey != null) {
            return new ProducerRecord(topic, messageKey, value);
        }
        return new ProducerRecord(topic, value);
    }

    /**
     * Sends every record for the exchange, then blocks until each send has completed.
     *
     * @param exchange the exchange to send
     * @throws Exception any failure from building the records, sending them, or waiting on a send
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        LinkedList<Future<?>> pending = new LinkedList<Future<?>>();
        for (Iterator<ProducerRecord> records = createRecorder(exchange); records.hasNext();) {
            pending.add(kafkaProducer.send(records.next()));
        }
        // All records are dispatched first; only then wait on the results, in send order.
        for (Future<?> ack : pending) {
            ack.get();
        }
    }

    /**
     * Sends every record for the exchange without waiting for completion.
     * <p>
     * Any exception raised while building or dispatching records is stored on the exchange
     * and the callback is signalled synchronously.
     *
     * @param exchange the exchange to send
     * @param asyncCallback callback to signal once processing of the exchange is done
     * @return true if the callback was signalled synchronously before returning, false if it
     *         will be signalled later when the last send completes
     */
    @Override
    public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
        try {
            Iterator<ProducerRecord> records = createRecorder(exchange);
            KafkaProducerCallBack tracker = new KafkaProducerCallBack(exchange, asyncCallback);
            while (records.hasNext()) {
                // Count the record before dispatching it so a fast completion cannot hit zero early.
                tracker.increment();
                ProducerRecord record = records.next();
                kafkaProducer.send(record, tracker);
            }
            return tracker.allSent();
        } catch (Exception failure) {
            exchange.setException(failure);
            asyncCallback.done(true);
            return true;
        }
    }
}
