package org.apache.camel.component.file;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.camel.AsyncCallback;
import org.apache.camel.BatchConsumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.impl.ScheduledPollConsumer;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/camel-core-2.9.0.fuse-7-037.jar:org/apache/camel/component/file/GenericFileConsumer.class */
public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
    protected final transient Logger log;
    protected GenericFileEndpoint<T> endpoint;
    protected GenericFileOperations<T> operations;
    protected boolean loggedIn;
    protected String fileExpressionResult;
    protected int maxMessagesPerPoll;
    protected volatile ShutdownRunningTask shutdownRunningTask;
    protected volatile int pendingExchanges;
    protected Processor customProcessor;

    public GenericFileConsumer(GenericFileEndpoint<T> genericFileEndpoint, Processor processor, GenericFileOperations<T> genericFileOperations) {
        super(genericFileEndpoint, processor);
        this.log = LoggerFactory.getLogger(getClass());
        this.endpoint = genericFileEndpoint;
        this.operations = genericFileOperations;
    }

    public Processor getCustomProcessor() {
        return this.customProcessor;
    }

    public void setCustomProcessor(Processor processor) {
        this.customProcessor = processor;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.camel.impl.ScheduledPollConsumer
    public int poll() throws Exception {
        this.fileExpressionResult = null;
        this.shutdownRunningTask = null;
        this.pendingExchanges = 0;
        if (!prePollCheck()) {
            this.log.debug("Skipping poll as pre poll check returned false");
            return 0;
        }
        ArrayList arrayList = new ArrayList();
        String directory = this.endpoint.getConfiguration().getDirectory();
        StopWatch stopWatch = new StopWatch();
        boolean z = !pollDirectory(directory, arrayList, 0);
        long stop = stopWatch.stop();
        if (this.log.isDebugEnabled()) {
            this.log.debug("Took {} to poll: {}", TimeUtils.printDuration(stop), directory);
        }
        if (z) {
            this.log.debug("Limiting maximum messages to poll at {} files as there was more messages in this poll.", Integer.valueOf(this.maxMessagesPerPoll));
        }
        if (this.endpoint.getSorter() != null) {
            Collections.sort(arrayList, this.endpoint.getSorter());
        }
        LinkedList linkedList = new LinkedList();
        for (GenericFile<T> genericFile : arrayList) {
            Exchange createExchange = this.endpoint.createExchange(genericFile);
            this.endpoint.configureExchange(createExchange);
            this.endpoint.configureMessage(genericFile, createExchange.getIn());
            linkedList.add(createExchange);
        }
        if (this.endpoint.getSortBy() != null) {
            Collections.sort(linkedList, this.endpoint.getSortBy());
        }
        int size = linkedList.size();
        if (size > 0) {
            this.log.debug("Total {} files to consume", Integer.valueOf(size));
        }
        int processBatch = processBatch(CastUtils.cast((Queue<?>) linkedList));
        postPollCheck();
        return processBatch;
    }

    @Override // org.apache.camel.BatchConsumer
    public void setMaxMessagesPerPoll(int i) {
        this.maxMessagesPerPoll = i;
    }

    @Override // org.apache.camel.BatchConsumer
    public int processBatch(Queue<Object> queue) {
        int size = queue.size();
        if (this.maxMessagesPerPoll > 0 && size > this.maxMessagesPerPoll) {
            this.log.debug("Limiting to maximum messages to poll {} as there was {} messages in this poll.", Integer.valueOf(this.maxMessagesPerPoll), Integer.valueOf(size));
            size = this.maxMessagesPerPoll;
        }
        int i = 0;
        while (i < size && isBatchAllowed()) {
            Exchange exchange = (Exchange) queue.poll();
            exchange.setProperty(Exchange.BATCH_INDEX, Integer.valueOf(i));
            exchange.setProperty(Exchange.BATCH_SIZE, Integer.valueOf(size));
            exchange.setProperty(Exchange.BATCH_COMPLETE, Boolean.valueOf(i == size - 1));
            this.pendingExchanges = (size - i) - 1;
            if (this.customProcessor != null) {
                customProcessExchange(exchange, this.customProcessor);
            } else {
                processExchange(exchange);
            }
            i++;
        }
        while (queue.size() > 0) {
            this.endpoint.getInProgressRepository().remove(((GenericFile) ((Exchange) queue.poll()).getProperty(FileComponent.FILE_EXCHANGE_FILE)).getAbsoluteFilePath());
        }
        return size;
    }

    @Override // org.apache.camel.spi.ShutdownAware
    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
        this.shutdownRunningTask = shutdownRunningTask;
        return false;
    }

    @Override // org.apache.camel.spi.ShutdownAware
    public int getPendingExchangesSize() {
        int i = ShutdownRunningTask.CompleteAllTasks == this.shutdownRunningTask ? this.pendingExchanges : 0;
        if (i == 0 && isPolling()) {
            this.log.trace("Currently polling so returning 1 as pending exchanges");
            i = 1;
        }
        return i;
    }

    @Override // org.apache.camel.spi.ShutdownAware
    public void prepareShutdown() {
    }

    @Override // org.apache.camel.BatchConsumer
    public boolean isBatchAllowed() {
        if (isRunAllowed()) {
            return this.shutdownRunningTask == null || ShutdownRunningTask.CompleteAllTasks == this.shutdownRunningTask;
        }
        return false;
    }

    public boolean canPollMoreFiles(List<?> list) {
        return this.maxMessagesPerPoll <= 0 || list.size() < this.maxMessagesPerPoll;
    }

    protected boolean prePollCheck() throws Exception {
        return true;
    }

    protected void postPollCheck() {
    }

    protected abstract boolean pollDirectory(String str, List<GenericFile<T>> list, int i);

    public void setOperations(GenericFileOperations<T> genericFileOperations) {
        this.operations = genericFileOperations;
    }

    protected void processExchange(Exchange exchange) {
        GenericFile<T> exchangeFileProperty = getExchangeFileProperty(exchange);
        this.log.trace("Processing file: {}", exchangeFileProperty);
        String absoluteFilePath = exchangeFileProperty.getAbsoluteFilePath();
        try {
            if (!this.endpoint.getGenericFileProcessStrategy().begin(this.operations, this.endpoint, exchange, exchangeFileProperty)) {
                this.log.debug(this.endpoint + " cannot begin processing file: {}", exchangeFileProperty);
                this.endpoint.getInProgressRepository().remove(absoluteFilePath);
                return;
            }
            final GenericFile<T> exchangeFileProperty2 = getExchangeFileProperty(exchange);
            String absoluteFilePath2 = exchangeFileProperty2.getAbsoluteFilePath();
            try {
                this.log.trace("Retrieving file: {} from: {}", absoluteFilePath2, this.endpoint);
                if (!this.operations.retrieveFile(absoluteFilePath2, exchange)) {
                    throw new GenericFileOperationFailedException("Cannot retrieve file: " + exchangeFileProperty + " from: " + this.endpoint);
                }
                this.log.trace("Retrieved file: {} from: {}", absoluteFilePath2, this.endpoint);
                exchange.addOnCompletion(new GenericFileOnCompletion(this.endpoint, this.operations, exchangeFileProperty2, absoluteFilePath));
                this.log.debug("About to process file: {} using exchange: {}", exchangeFileProperty2, exchange);
                getAsyncProcessor().process(exchange, new AsyncCallback() { // from class: org.apache.camel.component.file.GenericFileConsumer.1
                    @Override // org.apache.camel.AsyncCallback
                    public void done(boolean z) {
                        if (GenericFileConsumer.this.log.isTraceEnabled()) {
                            GenericFileConsumer.this.log.trace("Done processing file: {} {}", exchangeFileProperty2, z ? "synchronously" : "asynchronously");
                        }
                    }
                });
            } catch (Exception e) {
                this.endpoint.getInProgressRepository().remove(absoluteFilePath);
                handleException(e);
            }
        } catch (Exception e2) {
            if (this.log.isDebugEnabled()) {
                this.log.debug(this.endpoint + " cannot begin processing file: " + exchangeFileProperty + " due to: " + e2.getMessage(), e2);
            }
            this.endpoint.getInProgressRepository().remove(absoluteFilePath);
        }
    }

    protected void customProcessExchange(Exchange exchange, Processor processor) {
        GenericFile<T> exchangeFileProperty = getExchangeFileProperty(exchange);
        this.log.trace("Custom processing file: {}", exchangeFileProperty);
        String absoluteFilePath = exchangeFileProperty.getAbsoluteFilePath();
        try {
            try {
                processor.process(exchange);
                this.endpoint.getInProgressRepository().remove(absoluteFilePath);
            } catch (Exception e) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug(this.endpoint + " error custom processing: " + exchangeFileProperty + " due to: " + e.getMessage() + ". This exception will be ignored.", e);
                }
                this.endpoint.getInProgressRepository().remove(absoluteFilePath);
            }
        } catch (Throwable th) {
            this.endpoint.getInProgressRepository().remove(absoluteFilePath);
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isValidFile(GenericFile<T> genericFile, boolean z) {
        if (!isMatched(genericFile, z)) {
            this.log.trace("File did not match. Will skip this file: {}", genericFile);
            return false;
        }
        if (!this.endpoint.isIdempotent().booleanValue() || !this.endpoint.getIdempotentRepository().contains(genericFile.getAbsoluteFilePath())) {
            return true;
        }
        this.log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: {}", genericFile);
        return false;
    }

    protected boolean isMatched(GenericFile<T> genericFile, boolean z) {
        String fileNameOnly = genericFile.getFileNameOnly();
        if (fileNameOnly.startsWith(".") || fileNameOnly.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) {
            return false;
        }
        if (z) {
            return true;
        }
        if (this.endpoint.getFilter() != null && !this.endpoint.getFilter().accept(genericFile)) {
            return false;
        }
        if (ObjectHelper.isNotEmpty(this.endpoint.getExclude()) && fileNameOnly.matches(this.endpoint.getExclude())) {
            return false;
        }
        if (ObjectHelper.isNotEmpty(this.endpoint.getInclude()) && !fileNameOnly.matches(this.endpoint.getInclude())) {
            return false;
        }
        if (this.endpoint.getFileName() != null) {
            evaluateFileExpression();
            if (this.fileExpressionResult != null && !fileNameOnly.equals(this.fileExpressionResult)) {
                return false;
            }
        }
        if (this.endpoint.getDoneFileName() == null) {
            return true;
        }
        String createDoneFileName = this.endpoint.createDoneFileName(genericFile.getAbsoluteFilePath());
        ObjectHelper.notEmpty(createDoneFileName, "doneFileName", this.endpoint);
        if (!this.endpoint.isDoneFile(genericFile.getFileNameOnly())) {
            return isMatched(genericFile, createDoneFileName);
        }
        this.log.trace("Skipping done file: {}", genericFile);
        return false;
    }

    protected boolean isMatched(GenericFile<T> genericFile, String str) {
        if (this.operations.existsFile(str)) {
            return true;
        }
        this.log.trace("Done file: {} does not exist", str);
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isInProgress(GenericFile<T> genericFile) {
        return !this.endpoint.getInProgressRepository().add(genericFile.getAbsoluteFilePath());
    }

    private void evaluateFileExpression() {
        if (this.fileExpressionResult == null) {
            this.fileExpressionResult = (String) this.endpoint.getFileName().evaluate(this.endpoint.createExchange(), String.class);
        }
    }

    private GenericFile<T> getExchangeFileProperty(Exchange exchange) {
        return (GenericFile) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
    }

    @Override // org.apache.camel.impl.ScheduledPollConsumer, org.apache.camel.impl.DefaultConsumer, org.apache.camel.support.ServiceSupport
    protected void doStart() throws Exception {
        super.doStart();
        this.endpoint.getGenericFileProcessStrategy().prepareOnStartup(this.operations, this.endpoint);
    }
}
