001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file;
018    
019    import java.io.File;
020    import java.io.FileOutputStream;
021    import java.io.InputStream;
022    import java.io.RandomAccessFile;
023    import java.nio.ByteBuffer;
024    import java.nio.channels.FileChannel;
025    
026    import org.apache.camel.Exchange;
027    import org.apache.camel.Message;
028    import org.apache.camel.impl.DefaultProducer;
029    import org.apache.camel.util.ExchangeHelper;
030    import org.apache.camel.util.ObjectHelper;
031    import org.apache.camel.util.UuidGenerator;
032    import org.apache.commons.logging.Log;
033    import org.apache.commons.logging.LogFactory;
034    
035    /**
036     * For producing files.
037     *
038     * @version $Revision: 42440 $
039     */
040    public class FileProducer extends DefaultProducer {
041        private static final transient Log LOG = LogFactory.getLog(FileProducer.class);
042        private FileEndpoint endpoint;
043    
044        public FileProducer(FileEndpoint endpoint) {
045            super(endpoint);
046            this.endpoint = endpoint;
047        }
048    
049        public FileEndpoint getEndpoint() {
050            return (FileEndpoint) super.getEndpoint();
051        }
052    
053        public void process(Exchange exchange) throws Exception {
054            FileExchange fileExchange = endpoint.createExchange(exchange);
055            process(fileExchange);
056            ExchangeHelper.copyResults(exchange, fileExchange);
057        }
058    
059        public void process(FileExchange exchange) throws Exception {
060            if (ExchangeHelper.isOutCapable(exchange)) {
061                // lets poll the file
062                Message out = exchange.getOut(true);
063                endpoint.configureMessage(endpoint.getFile(), out);
064                return;
065            }
066    
067            InputStream in = ExchangeHelper.getMandatoryInBody(exchange, InputStream.class);
068            File file = createFileName(exchange.getIn());
069            buildDirectory(file);
070    
071            if (LOG.isDebugEnabled()) {
072                LOG.debug("About to write to: " + file + " from exchange: " + exchange);
073            }
074            FileChannel fc = null;
075            try {
076                if (getEndpoint().isAppend()) {
077                    fc = new RandomAccessFile(file, "rw").getChannel();
078                    fc.position(fc.size());
079                } else {
080                    fc = new FileOutputStream(file).getChannel();
081                }
082    
083                int size = getEndpoint().getBufferSize();
084                byte[] buffer = new byte[size];
085                ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
086                while (true) {
087                    int count = in.read(buffer);
088                    if (count <= 0) {
089                        break;
090                    } else if (count < size) {
091                        byteBuffer = ByteBuffer.wrap(buffer, 0, count);
092                        fc.write(byteBuffer);
093                        break;
094                    } else {
095                        fc.write(byteBuffer);
096                        byteBuffer.clear();
097                    }
098                }
099            } finally {
100                ObjectHelper.close(in, file.getName(), LOG);
101                ObjectHelper.close(fc, file.getName(), LOG);
102            }
103        }
104    
105        protected File createFileName(Message message) {
106            File answer;
107    
108            String name = null;
109            if (!endpoint.isIgnoreFileNameHeader()) {
110                name = message.getHeader(FileComponent.HEADER_FILE_NAME, String.class);
111            }
112    
113            File endpointFile = endpoint.getFile();
114            if (endpointFile.isDirectory()) {
115                if (name != null) {
116                    answer = new File(endpointFile, name);
117                    if (answer.isDirectory()) {
118                        answer = new File(answer, endpoint.getGeneratedFileName(message));
119                    }
120                } else {
121                    answer = new File(endpointFile, endpoint.getGeneratedFileName(message));
122                }
123            } else {
124                if (name == null) {
125                    answer = endpointFile;
126                } else {
127                    answer = new File(endpointFile, name);
128                }
129            }
130    
131            // TODO lets store the name in the header?
132            //message.setHeader(FileComponent.HEADER_FILE_NAME, answer.toString());
133    
134            return answer;
135        }
136    
137        private void buildDirectory(File file) {
138            String dirName = file.getAbsolutePath();
139            int index = dirName.lastIndexOf(File.separatorChar);
140            if (index > 0) {
141                dirName = dirName.substring(0, index);
142                File dir = new File(dirName);
143                dir.mkdirs();
144            }
145        }
146    
147    
148    }