package org.apache.camel.processor;

import java.util.Collection;
import java.util.Iterator;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:WEB-INF/lib/camel-core-1.4.4.0-fuse.jar:org/apache/camel/processor/BatchProcessor.class */
/**
 * A service that polls exchanges from an {@link Endpoint}, gathers them into a
 * caller-supplied collection and then hands every gathered exchange to a
 * downstream {@link Processor}.
 *
 * <p>Starting the service creates a polling consumer on the endpoint and
 * launches a thread that calls {@link #processBatch()} repeatedly for as long
 * as {@code isRunAllowed()} returns {@code true}. A batch ends when
 * {@link #isBatchCompleted(int)} reports completion, when the batch timeout
 * elapses, or when a receive attempt yields no exchange.
 */
public class BatchProcessor extends ServiceSupport implements Runnable, Processor {
    /** Default time, in milliseconds, allowed for collecting a single batch. */
    public static final long DEFAULT_BATCH_TIMEOUT = 1000;
    /** Default maximum number of exchanges received per batch. */
    public static final int DEFAULT_BATCH_SIZE = 100;
    private static final transient Log LOG = LogFactory.getLog(BatchProcessor.class);
    private final Endpoint endpoint;
    private final Processor processor;
    private final Collection<Exchange> collection;
    private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
    private int batchSize = DEFAULT_BATCH_SIZE;
    // 0 (the default) disables the "collection size" completion check in isBatchCompleted.
    private int outBatchSize;
    // Created in doStart(); null until the service has been started.
    private PollingConsumer consumer;
    private ExceptionHandler exceptionHandler;

    /**
     * Creates a batch processor.
     *
     * @param endpoint   endpoint the exchanges are polled from
     * @param processor  processor that receives each collected exchange
     * @param collection collection used to accumulate exchanges between flushes
     */
    public BatchProcessor(Endpoint endpoint, Processor processor, Collection<Exchange> collection) {
        this.endpoint = endpoint;
        this.processor = processor;
        this.collection = collection;
    }

    @Override
    public String toString() {
        return "BatchProcessor[to: " + this.processor + "]";
    }

    /**
     * Polling loop: processes batches until the service is no longer allowed
     * to run, passing any exception to the configured {@link ExceptionHandler}
     * so that one failing batch does not end the loop. The collection is
     * cleared when the loop ends.
     */
    @Override // java.lang.Runnable
    public void run() {
        LOG.debug("Starting thread for " + this);
        while (isRunAllowed()) {
            try {
                processBatch();
            } catch (Exception e) {
                getExceptionHandler().handleException(e);
            }
        }
        this.collection.clear();
    }

    /**
     * Returns the exception handler, lazily creating a
     * {@link LoggingExceptionHandler} when none has been set.
     *
     * @return the handler used for exceptions raised while processing batches
     */
    public ExceptionHandler getExceptionHandler() {
        if (this.exceptionHandler == null) {
            this.exceptionHandler = new LoggingExceptionHandler(getClass());
        }
        return this.exceptionHandler;
    }

    /**
     * @param exceptionHandler handler to use for exceptions raised in {@link #run()}
     */
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }

    /**
     * @return the maximum number of exchanges received per batch
     */
    public int getBatchSize() {
        return this.batchSize;
    }

    /**
     * @param i the maximum number of exchanges received per batch
     */
    public void setBatchSize(int i) {
        this.batchSize = i;
    }

    /**
     * @return the collection size at which a batch is considered complete;
     *         values of zero or less disable this check
     */
    public int getOutBatchSize() {
        return this.outBatchSize;
    }

    /**
     * @param i the collection size at which a batch is considered complete;
     *          values of zero or less disable this check
     */
    public void setOutBatchSize(int i) {
        this.outBatchSize = i;
    }

    /**
     * @return the time in milliseconds allowed for collecting one batch
     */
    public long getBatchTimeout() {
        return this.batchTimeout;
    }

    /**
     * @param j the time in milliseconds allowed for collecting one batch
     */
    public void setBatchTimeout(long j) {
        this.batchTimeout = j;
    }

    /**
     * @return the endpoint the exchanges are polled from
     */
    public Endpoint getEndpoint() {
        return this.endpoint;
    }

    /**
     * @return the processor that receives the collected exchanges
     */
    public Processor getProcessor() {
        return this.processor;
    }

    /**
     * Collects one batch of exchanges from the polling consumer and then sends
     * every exchange currently held in the collection to the processor.
     *
     * <p>Collection stops as soon as one of the following happens:
     * <ul>
     *   <li>{@link #isBatchCompleted(int)} returns {@code true};</li>
     *   <li>the batch deadline (now + batch timeout) has passed;</li>
     *   <li>a receive attempt returns {@code null}.</li>
     * </ul>
     *
     * @throws Exception if receiving or processing an exchange fails
     */
    protected synchronized void processBatch() throws Exception {
        long deadline = System.currentTimeMillis() + this.batchTimeout;
        // The index only advances after an exchange has actually been added,
        // because both early exits below leave the loop before the increment.
        for (int index = 0; !isBatchCompleted(index); index++) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining < 0) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("batch timeout expired at batch index: " + index);
                }
                // Without this exit the loop would spin without blocking until
                // the batch filled up, and a partial batch would never be flushed.
                break;
            }
            Exchange received = this.consumer.receive(remaining);
            if (received == null) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("receive with timeout: " + remaining + " expired at batch index: " + index);
                }
                // Nothing arrived within the remaining time: flush what we have.
                break;
            }
            this.collection.add(received);
        }

        // Remove each exchange before processing it so that an exchange whose
        // processing throws is not delivered again by the next batch.
        Iterator<Exchange> it = this.collection.iterator();
        while (it.hasNext()) {
            Exchange next = it.next();
            it.remove();
            processExchange(next);
        }
    }

    /**
     * Decides whether the current batch is complete.
     *
     * @param i number of exchanges received so far in the current batch
     * @return {@code true} when an out-batch size is set (greater than zero)
     *         and the collection has reached it, or when {@code i} has reached
     *         the batch size
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isBatchCompleted(int i) {
        return (this.outBatchSize > 0 && this.collection.size() >= this.outBatchSize) || i >= this.batchSize;
    }

    /**
     * Forwards a single collected exchange to the downstream processor.
     *
     * @param exchange the exchange to process
     * @throws Exception if the downstream processor fails
     */
    protected void processExchange(Exchange exchange) throws Exception {
        this.processor.process(exchange);
    }

    /**
     * Creates the polling consumer, starts the processor and the consumer, and
     * launches the polling thread that executes {@link #run()}.
     */
    @Override // org.apache.camel.impl.ServiceSupport
    protected void doStart() throws Exception {
        this.consumer = this.endpoint.createPollingConsumer();
        ServiceHelper.startServices(this.processor, this.consumer);
        new Thread(this, this + " Polling Thread").start();
    }

    /**
     * Stops the consumer and the processor and discards any exchanges still
     * held in the collection.
     */
    @Override // org.apache.camel.impl.ServiceSupport
    protected void doStop() throws Exception {
        ServiceHelper.stopServices(this.consumer, this.processor);
        this.collection.clear();
    }

    /**
     * @return the live collection used to accumulate exchanges (not a copy)
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public Collection<Exchange> getCollection() {
        return this.collection;
    }

    /**
     * No-op in this implementation: exchanges are obtained by polling the
     * endpoint in {@link #processBatch()}, not by being pushed through this
     * method.
     *
     * @param exchange ignored
     */
    @Override // org.apache.camel.Processor
    public void process(Exchange exchange) throws Exception {
        // Intentionally empty.
    }
}
