001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file;
018    
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.camel.AsyncCallback;
import org.apache.camel.Processor;
import org.apache.camel.impl.ScheduledPollConsumer;
import org.apache.camel.processor.DeadLetterChannel;
import org.apache.camel.util.LRUCache;
import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
034    
035    /**
036     * For consuming files.
037     *
038     * @version $Revision: 1925 $
039     */
040    public class FileConsumer extends ScheduledPollConsumer<FileExchange> {
041        private static final transient Log LOG = LogFactory.getLog(FileConsumer.class);
042    
043        private FileEndpoint endpoint;
044        private ConcurrentHashMap<File, File> filesBeingProcessed = new ConcurrentHashMap<File, File>();
045        private ConcurrentHashMap<File, Long> fileSizes = new ConcurrentHashMap<File, Long>(new LRUCache(1000));
046        private ConcurrentHashMap<File, Long> noopMap = new ConcurrentHashMap<File, Long>(new LRUCache(1000));
047    
048        // the options below is @deprecated and will be removed in Camel 2.0
049        private long lastPollTime;
050        private int unchangedDelay;
051        private boolean unchangedSize;
052        private boolean generateEmptyExchangeWhenIdle;
053        private boolean alwaysConsume;
054    
055        private boolean recursive;
056        private String regexPattern = "";
057        private boolean exclusiveReadLock = true;
058    
059        public FileConsumer(final FileEndpoint endpoint, Processor processor) {
060            super(endpoint, processor);
061            this.endpoint = endpoint;
062        }
063    
064        protected synchronized void poll() throws Exception {
065            // should be true the first time as its the top directory
066            int rc = pollFileOrDirectory(endpoint.getFile(), true);
067    
068            // if no files consumes and using generateEmptyExchangeWhenIdle option then process an empty exchange 
069            if (rc == 0 && generateEmptyExchangeWhenIdle) {
070                final FileExchange exchange = endpoint.createExchange((File)null);
071                getAsyncProcessor().process(exchange, new AsyncCallback() {
072                    public void done(boolean sync) {
073                    }
074                });
075            }
076    
077            lastPollTime = System.currentTimeMillis();
078        }
079    
080        /**
081         * Pools the given file or directory for files to process.
082         *
083         * @param fileOrDirectory  file or directory
084         * @param processDir  recursive
085         * @return the number of files processed or being processed async.
086         */
087        protected int pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
088            if (!fileOrDirectory.isDirectory()) {
089                // process the file
090                return pollFile(fileOrDirectory);
091            } else if (processDir) {
092                // directory that can be recursive
093                int rc = 0;
094                if (isValidFile(fileOrDirectory)) {
095                    if (LOG.isTraceEnabled()) {
096                        LOG.trace("Polling directory " + fileOrDirectory);
097                    }
098                    File[] files = fileOrDirectory.listFiles();
099                    for (File file : files) {
100                        rc += pollFileOrDirectory(file, isRecursive()); // self-recursion
101                    }
102                }
103                return rc;
104            } else {
105                if (LOG.isTraceEnabled()) {
106                    LOG.trace("Skipping directory " + fileOrDirectory);
107                }
108                return 0;
109            }
110        }
111    
112        /**
113         * Polls the given file
114         *
115         * @param target  the file
116         * @return returns 1 if the file was processed, 0 otherwise.
117         */
118        protected int pollFile(final File target) {
119            if (LOG.isTraceEnabled()) {
120                LOG.trace("Polling file: " + target);
121            }
122    
123            if (!target.exists()) {
124                return 0;
125            }
126            if (!isValidFile(target)) {
127                return 0;
128            }
129            // we only care about file modified times if we are not deleting/moving files
130            if (!endpoint.isNoop()) {
131                if (filesBeingProcessed.contains(target)) {
132                    return 1;
133                }
134                filesBeingProcessed.put(target, target);
135            }
136    
137            final FileProcessStrategy processStrategy = endpoint.getFileStrategy();
138            final FileExchange exchange = endpoint.createExchange(target);
139    
140            endpoint.configureMessage(target, exchange.getIn());
141            try {
142                // is we use excluse read then acquire the exclusive read (waiting until we got it)
143                if (exclusiveReadLock) {
144                    acquireExclusiveReadLock(target);
145                }
146    
147                if (LOG.isDebugEnabled()) {
148                    LOG.debug("About to process file: " + target + " using exchange: " + exchange);
149                }
150                if (processStrategy.begin(endpoint, exchange, target)) {
151    
152                    // Use the async processor interface so that processing of
153                    // the exchange can happen asynchronously
154                    getAsyncProcessor().process(exchange, new AsyncCallback() {
155                        public void done(boolean sync) {
156                            // must use file from exchange as it can be updated due the preMoveNamePrefix/preMoveNamePostfix options
157                            final File file = exchange.getFile();
158                            boolean failed = exchange.isFailed();
159                            boolean handled = DeadLetterChannel.isFailureHandled(exchange);
160    
161                            if (LOG.isDebugEnabled()) {
162                                LOG.debug("Done processing file: " + file + ". Status is: " + (failed ? "failed: " + failed + ", handled by failure processor: " + handled : "processed OK"));
163                            }
164    
165                            boolean committed = false;
166                            try {
167                                if (!failed || handled) {
168                                    // commit the file strategy if there was no failure or already handled by the DeadLetterChannel
169                                    processStrategyCommit(processStrategy, exchange, file, handled);
170                                    committed = true;
171                                } else {
172                                    // there was an exception but it was not handled by the DeadLetterChannel
173                                    handleException(exchange.getException());
174                                }
175                            } finally {
176                                if (!committed) {
177                                    processStrategyRollback(processStrategy, exchange, file);
178                                }
179                                filesBeingProcessed.remove(file);
180                            }
181                        }
182                    });
183    
184                } else {
185                    LOG.warn(endpoint + " can not process file: " + target);
186                }
187            } catch (Throwable e) {
188                handleException(e);
189            }
190    
191            return 1;
192        }
193    
194        /**
195         * Acquires exclusive read lock to the given file. Will wait until the lock is granted.
196         * After granting the read lock it is realeased, we just want to make sure that when we start
197         * consuming the file its not currently in progress of being written by third party.
198         */
199        protected void acquireExclusiveReadLock(File file) throws IOException {
200            if (LOG.isTraceEnabled()) {
201                LOG.trace("Waiting for exclusive read lock to file: " + file);
202            }
203    
204            // try to acquire rw lock on the file before we can consume it
205            FileChannel channel = new RandomAccessFile(file, "rw").getChannel();
206            try {
207                FileLock lock = channel.lock();
208                if (LOG.isTraceEnabled()) {
209                    LOG.trace("Acquired exclusive read lock: " + lock + " to file: " + file);
210                }
211                // just release it now we dont want to hold it during the rest of the processing
212                lock.release();
213            } finally {
214                // must close channel
215                ObjectHelper.close(channel, "FileConsumer during acquiring of exclusive read lock", LOG);
216            }
217        }
218    
219        /**
220         * Strategy when the file was processed and a commit should be executed.
221         *
222         * @param processStrategy   the strategy to perform the commit
223         * @param exchange          the exchange
224         * @param file              the file processed
225         * @param failureHandled    is <tt>false</tt> if the exchange was processed succesfully, <tt>true</tt> if
226         * an exception occured during processing but it was handled by the failure processor (usually the
227         * DeadLetterChannel).
228         */
229        protected void processStrategyCommit(FileProcessStrategy processStrategy, FileExchange exchange,
230                                             File file, boolean failureHandled) {
231            try {
232                if (LOG.isDebugEnabled()) {
233                    LOG.debug("Committing file strategy: " + processStrategy + " for file: " + file + (failureHandled ? " that was handled by the failure processor." : ""));
234                }
235                processStrategy.commit(endpoint, exchange, file);
236            } catch (Exception e) {
237                LOG.warn("Error committing file strategy: " + processStrategy, e);
238                handleException(e);
239            }
240        }
241    
242        /**
243         * Strategy when the file was not processed and a rollback should be executed.
244         *
245         * @param processStrategy   the strategy to perform the commit
246         * @param exchange          the exchange
247         * @param file              the file processed
248         */
249        protected void processStrategyRollback(FileProcessStrategy processStrategy, FileExchange exchange, File file) {
250            if (LOG.isDebugEnabled()) {
251                LOG.debug("Rolling back file strategy: " + processStrategy + " for file: " + file);
252            }
253            processStrategy.rollback(endpoint, exchange, file);
254        }
255    
256        protected boolean isValidFile(File file) {
257            boolean result = false;
258            if (file != null && file.exists()) {
259                // TODO: maybe use a configurable strategy instead of the hardcoded one based on last file change
260                if (isMatched(file) && (alwaysConsume || isChanged(file))) {
261                    result = true;
262                }
263            }
264            return result;
265        }
266    
267        protected boolean isChanged(File file) {
268            if (file == null) {
269                // Sanity check
270                return false;
271            } else if (file.isDirectory()) {
272                // Allow recursive polling to descend into this directory
273                return true;
274            } else {
275                // @deprecated will be removed on Camel 2.0
276                // the code below is kinda hard to maintain. We should strive to remove
277                // this stuff in Camel 2.0 to keep this component simple and no surprises for end-users
278                // this stuff is not persistent so restarting Camel will reset the state
279                boolean lastModifiedCheck = false;
280                long modifiedDuration = 0;
281                if (getUnchangedDelay() > 0) {
282                    modifiedDuration = System.currentTimeMillis() - file.lastModified();
283                    lastModifiedCheck = modifiedDuration >= getUnchangedDelay();
284                }
285    
286                long fileModified = file.lastModified();
287                Long previousModified = noopMap.get(file);
288                noopMap.put(file, fileModified);
289                if (previousModified == null || fileModified > previousModified) {
290                    lastModifiedCheck = true;
291                }
292    
293                boolean sizeCheck = false;
294                long sizeDifference = 0;
295                if (isUnchangedSize()) {
296                    Long value = fileSizes.get(file);
297                    if (value == null) {
298                        sizeCheck = true;
299                    } else {
300                        sizeCheck = file.length() != value;
301                    }
302                }
303    
304                boolean answer = lastModifiedCheck || sizeCheck;
305    
306                if (LOG.isDebugEnabled()) {
307                    LOG.debug("file:" + file + " isChanged:" + answer + " " + "sizeCheck:" + sizeCheck + "("
308                              + sizeDifference + ") " + "lastModifiedCheck:" + lastModifiedCheck + "("
309                              + modifiedDuration + ")");
310                }
311    
312                if (isUnchangedSize()) {
313                    if (answer) {
314                        fileSizes.put(file, file.length());
315                    } else {
316                        fileSizes.remove(file);
317                    }
318                }
319    
320                return answer;
321            }
322        }
323    
324        protected boolean isMatched(File file) {
325            String name = file.getName();
326    
327            // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock")
328            if (name.startsWith(".")) {
329                return false;
330            }
331            // lock files should be skipped
332            if (name.endsWith(FileEndpoint.DEFAULT_LOCK_FILE_POSTFIX)) {
333                return false;
334            }
335    
336            // directories so far is always regarded as matched (matching on the name is only for files)
337            if (file.isDirectory()) {
338                return true;
339            }
340    
341            if (regexPattern != null && regexPattern.length() > 0) {
342                if (!name.matches(regexPattern)) {
343                    return false;
344                }
345            }
346    
347            if (endpoint.getExcludedNamePrefix() != null) {
348                if (name.startsWith(endpoint.getExcludedNamePrefix())) {
349                    return false;
350                }
351            }
352            String[] prefixes = endpoint.getExcludedNamePrefixes();
353            if (prefixes != null) {
354                for (String prefix : prefixes) {
355                    if (name.startsWith(prefix)) {
356                        return false;
357                    }
358                }
359            }
360            if (endpoint.getExcludedNamePostfix() != null) {
361                if (name.endsWith(endpoint.getExcludedNamePostfix())) {
362                    return false;
363                }
364            }
365            String[] postfixes = endpoint.getExcludedNamePostfixes();
366            if (postfixes != null) {
367                for (String postfix : postfixes) {
368                    if (name.endsWith(postfix)) {
369                        return false;
370                    }
371                }
372            }
373    
374            return true;
375        }
376    
377        public boolean isRecursive() {
378            return this.recursive;
379        }
380    
381        public void setRecursive(boolean recursive) {
382            this.recursive = recursive;
383        }
384    
385        public String getRegexPattern() {
386            return this.regexPattern;
387        }
388    
389        public void setRegexPattern(String regexPattern) {
390            this.regexPattern = regexPattern;
391        }
392    
393        public boolean isGenerateEmptyExchangeWhenIdle() {
394            return generateEmptyExchangeWhenIdle;
395        }
396    
397        /**
398         * @deprecated will be removed in Camel 2.0
399         */
400        public void setGenerateEmptyExchangeWhenIdle(boolean generateEmptyExchangeWhenIdle) {
401            this.generateEmptyExchangeWhenIdle = generateEmptyExchangeWhenIdle;
402        }
403    
404        public int getUnchangedDelay() {
405            return unchangedDelay;
406        }
407    
408        /**
409         * @deprecated will be removed in Camel 2.0
410         */
411        public void setUnchangedDelay(int unchangedDelay) {
412            this.unchangedDelay = unchangedDelay;
413        }
414    
415        public boolean isUnchangedSize() {
416            return unchangedSize;
417        }
418    
419        /**
420         * @deprecated will be removed in Camel 2.0
421         */
422        public void setUnchangedSize(boolean unchangedSize) {
423            this.unchangedSize = unchangedSize;
424        }
425    
426        public boolean isExclusiveReadLock() {
427            return exclusiveReadLock;
428        }
429    
430        public void setExclusiveReadLock(boolean exclusiveReadLock) {
431            this.exclusiveReadLock = exclusiveReadLock;
432        }
433    
434        public boolean isAlwaysConsume() {
435            return alwaysConsume;
436        }
437    
438        /**
439         * @deprecated will be removed in Camel 2.0 (not needed when we get rid of last polltimestamp)
440         */
441        public void setAlwaysConsume(boolean alwaysConsume) {
442            this.alwaysConsume = alwaysConsume;
443        }
444    
445        public boolean isTimestamp() {
446            return !alwaysConsume;
447        }
448    
449        /**
450         * @deprecated will be removed in Camel 2.0 (not needed when we get rid of last polltimestamp)
451         */
452        public void setTimestamp(boolean timestamp) {
453            this.alwaysConsume = !timestamp;
454        }
455    }