/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.KafkaEndpoint;
import org.apache.camel.impl.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumer
extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
    protected ExecutorService executor;
    private final KafkaEndpoint endpoint;
    private final Processor processor;
    private Map<ConsumerConnector, CyclicBarrier> consumerBarriers;

    public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
        super((Endpoint)endpoint, processor);
        this.endpoint = endpoint;
        this.processor = processor;
        this.consumerBarriers = new HashMap<ConsumerConnector, CyclicBarrier>();
        if (endpoint.getZookeeperConnect() == null) {
            throw new IllegalArgumentException("zookeeper host or zookeeper connect must be specified");
        }
        if (endpoint.getGroupId() == null) {
            throw new IllegalArgumentException("groupId must not be null");
        }
    }

    Properties getProps() {
        Properties props = this.endpoint.getConfiguration().createConsumerProperties();
        props.put("zookeeper.connect", this.endpoint.getZookeeperConnect());
        props.put("group.id", this.endpoint.getGroupId());
        return props;
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.log.info("Starting Kafka consumer");
        this.executor = this.endpoint.createExecutor();
        for (int i = 0; i < this.endpoint.getConsumersCount(); ++i) {
            ConsumerConnector consumer = Consumer.createJavaConsumerConnector((ConsumerConfig)new ConsumerConfig(this.getProps()));
            HashMap<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(this.endpoint.getTopic(), this.endpoint.getConsumerStreams());
            Map consumerMap = consumer.createMessageStreams(topicCountMap);
            List streams = (List)consumerMap.get(this.endpoint.getTopic());
            if (this.endpoint.isAutoCommitEnable() != null && !this.endpoint.isAutoCommitEnable().booleanValue()) {
                if ((this.endpoint.getConsumerTimeoutMs() == null || this.endpoint.getConsumerTimeoutMs() < 0) && this.endpoint.getConsumerStreams() > 1) {
                    LOG.warn("consumerTimeoutMs is set to -1 (infinite) while requested multiple consumer streams.");
                }
                CyclicBarrier barrier = new CyclicBarrier(this.endpoint.getConsumerStreams(), new CommitOffsetTask(consumer));
                for (KafkaStream stream : streams) {
                    this.executor.submit(new BatchingConsumerTask((KafkaStream<byte[], byte[]>)stream, barrier));
                }
                this.consumerBarriers.put(consumer, barrier);
                continue;
            }
            for (KafkaStream stream : streams) {
                this.executor.submit(new AutoCommitConsumerTask((KafkaStream<byte[], byte[]>)stream));
            }
            this.consumerBarriers.put(consumer, null);
        }
    }

    protected void doStop() throws Exception {
        super.doStop();
        this.log.info("Stopping Kafka consumer");
        for (ConsumerConnector consumer : this.consumerBarriers.keySet()) {
            if (consumer == null) continue;
            consumer.shutdown();
        }
        if (this.executor != null) {
            if (this.getEndpoint() != null && this.getEndpoint().getCamelContext() != null) {
                this.getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(this.executor);
            } else {
                this.executor.shutdownNow();
            }
        }
        this.executor = null;
    }

    class AutoCommitConsumerTask
    implements Runnable {
        private KafkaStream<byte[], byte[]> stream;

        public AutoCommitConsumerTask(KafkaStream<byte[], byte[]> stream) {
            this.stream = stream;
        }

        @Override
        public void run() {
            ConsumerIterator it = this.stream.iterator();
            while (KafkaConsumer.this.isRunAllowed() && it.hasNext()) {
                MessageAndMetadata mm = it.next();
                Exchange exchange = KafkaConsumer.this.endpoint.createKafkaExchange((MessageAndMetadata<byte[], byte[]>)mm);
                try {
                    KafkaConsumer.this.processor.process(exchange);
                }
                catch (Exception e) {
                    KafkaConsumer.this.getExceptionHandler().handleException("Error during processing", exchange, (Throwable)e);
                }
            }
        }
    }

    class CommitOffsetTask
    implements Runnable {
        private ConsumerConnector consumer;

        public CommitOffsetTask(ConsumerConnector consumer) {
            this.consumer = consumer;
        }

        @Override
        public void run() {
            this.consumer.commitOffsets();
        }
    }

    class BatchingConsumerTask
    implements Runnable {
        private KafkaStream<byte[], byte[]> stream;
        private CyclicBarrier berrier;

        public BatchingConsumerTask(KafkaStream<byte[], byte[]> stream, CyclicBarrier berrier) {
            this.stream = stream;
            this.berrier = berrier;
        }

        @Override
        public void run() {
            int processed = 0;
            ConsumerIterator it = this.stream.iterator();
            boolean hasNext = true;
            while (hasNext) {
                boolean consumerTimeout;
                block8: {
                    try {
                        consumerTimeout = false;
                        if (KafkaConsumer.this.isRunAllowed() && !KafkaConsumer.this.isSuspendingOrSuspended() && it.hasNext()) {
                            MessageAndMetadata mm = it.next();
                            Exchange exchange = KafkaConsumer.this.endpoint.createKafkaExchange((MessageAndMetadata<byte[], byte[]>)mm);
                            try {
                                KafkaConsumer.this.processor.process(exchange);
                            }
                            catch (Exception e) {
                                LOG.error(e.getMessage(), (Throwable)e);
                            }
                            ++processed;
                            break block8;
                        }
                        hasNext = false;
                    }
                    catch (ConsumerTimeoutException e) {
                        LOG.debug("Consumer timeout occurred due " + e.getMessage(), (Throwable)e);
                        consumerTimeout = true;
                    }
                }
                if (processed < KafkaConsumer.this.endpoint.getBatchSize() && !consumerTimeout && (processed <= 0 || hasNext)) continue;
                try {
                    this.berrier.await(KafkaConsumer.this.endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
                    if (consumerTimeout) continue;
                    processed = 0;
                }
                catch (Exception e) {
                    KafkaConsumer.this.getExceptionHandler().handleException("Error waiting for batch to complete", (Throwable)e);
                    break;
                }
            }
        }
    }
}

