package org.apache.camel.component.kafka;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.InterruptException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-396-02.zip:modules/system/layers/fuse/org/apache/camel/component/kafka/main/camel-kafka-2.17.0.redhat-630396-02.jar:org/apache/camel/component/kafka/KafkaConsumer.class */
public class KafkaConsumer extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
    protected ExecutorService executor;
    private final KafkaEndpoint endpoint;
    private final Processor processor;

    /* loaded from: input_file:META-INF/repository/fuse-eap-distro-6.3.0.redhat-396-02.zip:modules/system/layers/fuse/org/apache/camel/component/kafka/main/camel-kafka-2.17.0.redhat-630396-02.jar:org/apache/camel/component/kafka/KafkaConsumer$KafkaFetchRecords.class */
    class KafkaFetchRecords implements Runnable {
        private final org.apache.kafka.clients.consumer.KafkaConsumer consumer;
        private final String topicName;
        private final String threadId;
        private final Properties kafkaProps;

        KafkaFetchRecords(String str, String str2, Properties properties) {
            this.topicName = str;
            this.threadId = str + "-Thread " + str2;
            this.kafkaProps = properties;
            ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
            try {
                Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
                this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(properties);
                Thread.currentThread().setContextClassLoader(contextClassLoader);
            } catch (Throwable th) {
                Thread.currentThread().setContextClassLoader(contextClassLoader);
                throw th;
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            int i = 0;
            try {
                try {
                    KafkaConsumer.LOG.debug("Subscribing {} to topic {}", this.threadId, this.topicName);
                    this.consumer.subscribe(Arrays.asList(this.topicName.split(",")));
                    while (KafkaConsumer.this.isRunAllowed() && !KafkaConsumer.this.isSuspendingOrSuspended()) {
                        Iterator it = this.consumer.poll(Long.MAX_VALUE).iterator();
                        while (it.hasNext()) {
                            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                            if (KafkaConsumer.LOG.isTraceEnabled()) {
                                KafkaConsumer.LOG.trace("offset = {}, key = {}, value = {}", new Object[]{Long.valueOf(consumerRecord.offset()), consumerRecord.key(), consumerRecord.value()});
                            }
                            Exchange createKafkaExchange = KafkaConsumer.this.endpoint.createKafkaExchange(consumerRecord);
                            try {
                                KafkaConsumer.this.processor.process(createKafkaExchange);
                            } catch (Exception e) {
                                KafkaConsumer.this.getExceptionHandler().handleException("Error during processing", createKafkaExchange, e);
                            }
                            i++;
                            if (KafkaConsumer.this.endpoint.isAutoCommitEnable() != null && !KafkaConsumer.this.endpoint.isAutoCommitEnable().booleanValue() && i >= KafkaConsumer.this.endpoint.getBatchSize()) {
                                this.consumer.commitSync();
                                i = 0;
                            }
                        }
                    }
                    KafkaConsumer.LOG.debug("Unsubscribing {} from topic {}", this.threadId, this.topicName);
                    this.consumer.unsubscribe();
                    KafkaConsumer.LOG.debug("Closing {} ", this.threadId);
                    this.consumer.close();
                    KafkaConsumer.LOG.debug("Closing {} ", this.threadId);
                    this.consumer.close();
                } catch (Throwable th) {
                    KafkaConsumer.LOG.debug("Closing {} ", this.threadId);
                    this.consumer.close();
                    throw th;
                }
            } catch (InterruptException e2) {
                KafkaConsumer.this.getExceptionHandler().handleException("Interrupted while consuming " + this.threadId + " from kafka topic", e2);
                this.consumer.unsubscribe();
                Thread.currentThread().interrupt();
                KafkaConsumer.LOG.debug("Closing {} ", this.threadId);
                this.consumer.close();
            } catch (Exception e3) {
                KafkaConsumer.this.getExceptionHandler().handleException("Error consuming " + this.threadId + " from kafka topic", e3);
                KafkaConsumer.LOG.debug("Closing {} ", this.threadId);
                this.consumer.close();
            }
        }
    }

    public KafkaConsumer(KafkaEndpoint kafkaEndpoint, Processor processor) {
        super(kafkaEndpoint, processor);
        this.endpoint = kafkaEndpoint;
        this.processor = processor;
        if (kafkaEndpoint.getBrokers() == null) {
            throw new IllegalArgumentException("BootStrap servers must be specified");
        }
        if (kafkaEndpoint.getGroupId() == null) {
            throw new IllegalArgumentException("groupId must not be null");
        }
    }

    Properties getProps() {
        Properties createConsumerProperties = this.endpoint.getConfiguration().createConsumerProperties();
        this.endpoint.updateClassProperties(createConsumerProperties);
        createConsumerProperties.put("bootstrap.servers", this.endpoint.getBrokers());
        createConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, this.endpoint.getGroupId());
        return createConsumerProperties;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.impl.DefaultConsumer, org.apache.camel.support.ServiceSupport
    public void doStart() throws Exception {
        super.doStart();
        LOG.info("Starting Kafka consumer");
        this.executor = this.endpoint.createExecutor();
        for (int i = 0; i < this.endpoint.getConsumersCount(); i++) {
            this.executor.submit(new KafkaFetchRecords(this.endpoint.getTopic(), i + "", getProps()));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.impl.DefaultConsumer, org.apache.camel.support.ServiceSupport
    public void doStop() throws Exception {
        super.doStop();
        LOG.info("Stopping Kafka consumer");
        if (this.executor != null) {
            if (getEndpoint() == null || getEndpoint().getCamelContext() == null) {
                this.executor.shutdownNow();
            } else {
                getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(this.executor);
            }
        }
        this.executor = null;
    }
}
