001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.converter.stream;
018    
019    import java.io.BufferedOutputStream;
020    import java.io.ByteArrayInputStream;
021    import java.io.ByteArrayOutputStream;
022    import java.io.File;
023    import java.io.FileInputStream;
024    import java.io.FileNotFoundException;
025    import java.io.FileOutputStream;
026    import java.io.IOException;
027    import java.io.InputStream;
028    import java.io.OutputStream;
029    import java.util.Map;
030    
031    import org.apache.camel.converter.IOConverter;
032    import org.apache.camel.util.FileUtil;
033    import org.apache.camel.util.IOHelper;
034    
035    /**
036     * This output stream will store the content into a File if the stream context size is exceed the
037     * THRESHOLD which's default value is 64K. The temp file will store in the temp directory, you 
038     * can configure it by setting the TEMP_DIR property. If you don't set the TEMP_DIR property,
039     * it will choice the directory which is set by the system property of "java.io.tmpdir".
040     * You can get a cached input stream of this stream. The temp file which is created with this 
041     * output stream will be deleted when you close this output stream or the cached inputStream.
042     */
043    public class CachedOutputStream extends OutputStream {
044        public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
045        public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";
046       
047        protected boolean outputLocked;
048        protected OutputStream currentStream;
049    
050        private long threshold = 64 * 1024;
051    
052        private int totalLength;
053    
054        private boolean inmem;
055    
056        private File tempFile;
057    
058        private File outputDir;   
059    
060        public CachedOutputStream() {
061            currentStream = new ByteArrayOutputStream(2048);
062            inmem = true;
063        }
064    
065        public CachedOutputStream(long threshold) {
066            this();
067            this.threshold = threshold;        
068        }
069        
070        public CachedOutputStream(Map<String, String> properties) {
071            this();
072            String value = properties.get(THRESHOLD);
073            if (value != null) {
074                threshold = Integer.parseInt(value);
075            }
076            value = properties.get(TEMP_DIR);
077            if (value != null) {
078                File f = new File(value);
079                if (f.exists() && f.isDirectory()) {
080                    outputDir = f;
081                } else {
082                    outputDir = null;
083                }
084            } else {
085                outputDir = null;
086            }        
087        }
088    
089        /**
090         * Perform any actions required on stream flush (freeze headers, reset
091         * output stream ... etc.)
092         */
093        protected void doFlush() throws IOException {
094            
095        }
096    
097        public void flush() throws IOException {
098            currentStream.flush();       
099            doFlush();
100        }
101    
102        /**
103         * Perform any actions required on stream closure (handle response etc.)
104         */
105        protected void doClose() throws IOException {
106            
107        }
108        
109        /**
110         * Perform any actions required after stream closure (close the other related stream etc.)
111         */
112        protected void postClose() throws IOException {
113            
114        }
115    
116        /**
117         * Locks the output stream to prevent additional writes, but maintains
118         * a pointer to it so an InputStream can be obtained
119         * @throws IOException
120         */
121        public void lockOutputStream() throws IOException {
122            currentStream.flush();
123            outputLocked = true;
124        }
125        
126        public void close() throws IOException {
127            currentStream.flush();        
128            doClose();
129            currentStream.close();
130            maybeDeleteTempFile(currentStream);
131            postClose();
132        }
133    
134        public boolean equals(Object obj) {
135            return currentStream.equals(obj);
136        }
137    
138        /**
139         * Replace the original stream with the new one, optionally copying the content of the old one
140         * into the new one.
141         * When with Attachment, needs to replace the xml writer stream with the stream used by
142         * AttachmentSerializer or copy the cached output stream to the "real"
143         * output stream, i.e. onto the wire.
144         * 
145         * @param out the new output stream
146         * @param copyOldContent flag indicating if the old content should be copied
147         * @throws IOException
148         */
149        public void resetOut(OutputStream out, boolean copyOldContent) throws IOException {
150            if (out == null) {
151                out = new ByteArrayOutputStream();
152            }
153    
154            if (currentStream instanceof CachedOutputStream) {
155                CachedOutputStream ac = (CachedOutputStream) currentStream;
156                InputStream in = ac.getInputStream();
157                IOHelper.copyAndCloseInput(in, out);
158            } else {
159                if (inmem) {
160                    if (currentStream instanceof ByteArrayOutputStream) {
161                        ByteArrayOutputStream byteOut = (ByteArrayOutputStream) currentStream;
162                        if (copyOldContent && byteOut.size() > 0) {
163                            byteOut.writeTo(out);
164                        }
165                    } else {
166                        throw new IOException("Unknown format of currentStream");
167                    }
168                } else {
169                    // read the file
170                    currentStream.close();
171                    FileInputStream fin = new FileInputStream(tempFile);
172                    if (copyOldContent) {
173                        IOHelper.copyAndCloseInput(fin, out);
174                    }
175                    tempFile.delete();
176                    tempFile = null;
177                    inmem = true;
178                }
179            }
180            currentStream = out;
181            outputLocked = false;
182        }
183    
184        public static void copyStream(InputStream in, OutputStream out, int bufferSize) throws IOException {
185            IOHelper.copyAndCloseInput(in, out, bufferSize);
186        }
187    
188        public int size() {
189            return totalLength;
190        }
191        
192        public byte[] getBytes() throws IOException {
193            flush();
194            if (inmem) {
195                if (currentStream instanceof ByteArrayOutputStream) {
196                    return ((ByteArrayOutputStream)currentStream).toByteArray();
197                } else {
198                    throw new IOException("Unknown format of currentStream");
199                }
200            } else {
201                // read the file
202                FileInputStream fin = new FileInputStream(tempFile);
203                return IOConverter.toBytes(fin);
204            }
205        }
206        
207        public void writeCacheTo(OutputStream out) throws IOException {
208            flush();
209            if (inmem) {
210                if (currentStream instanceof ByteArrayOutputStream) {
211                    ((ByteArrayOutputStream)currentStream).writeTo(out);
212                } else {
213                    throw new IOException("Unknown format of currentStream");
214                }
215            } else {
216                // read the file
217                FileInputStream fin = new FileInputStream(tempFile);
218                IOHelper.copyAndCloseInput(fin, out);
219            }
220        }
221        
222        
223        public void writeCacheTo(StringBuilder out, int limit) throws IOException {
224            flush();
225            if (totalLength < limit
226                || limit == -1) {
227                writeCacheTo(out);
228                return;
229            }
230            
231            int count = 0;
232            if (inmem) {
233                if (currentStream instanceof ByteArrayOutputStream) {
234                    byte bytes[] = ((ByteArrayOutputStream)currentStream).toByteArray();
235                    out.append(IOHelper.newStringFromBytes(bytes, 0, limit));
236                } else {
237                    throw new IOException("Unknown format of currentStream");
238                }
239            } else {
240                // read the file
241                FileInputStream fin = new FileInputStream(tempFile);
242                byte bytes[] = new byte[1024];
243                int x = fin.read(bytes);
244                while (x != -1) {
245                    if ((count + x) > limit) {
246                        x = limit - count;
247                    }
248                    out.append(IOHelper.newStringFromBytes(bytes, 0, x));
249                    count += x;
250                    
251                    if (count >= limit) {
252                        x = -1;
253                    } else {
254                        x = fin.read(bytes);
255                    }
256                }
257                fin.close();
258            }
259        }
260        public void writeCacheTo(StringBuilder out) throws IOException {
261            flush();
262            if (inmem) {
263                if (currentStream instanceof ByteArrayOutputStream) {
264                    byte[] bytes = ((ByteArrayOutputStream)currentStream).toByteArray();
265                    out.append(IOHelper.newStringFromBytes(bytes));
266                } else {
267                    throw new IOException("Unknown format of currentStream");
268                }
269            } else {
270                // read the file
271                FileInputStream fin = new FileInputStream(tempFile);
272                byte bytes[] = new byte[1024];
273                int x = fin.read(bytes);
274                while (x != -1) {
275                    out.append(IOHelper.newStringFromBytes(bytes, 0, x));
276                    x = fin.read(bytes);
277                }
278                fin.close();
279            }
280        }    
281        
282    
283        /**
284         * @return the underlying output stream
285         */
286        public OutputStream getOut() {
287            return currentStream;
288        }
289    
290        public int hashCode() {
291            return currentStream.hashCode();
292        }
293    
294        public String toString() {
295            StringBuilder builder = new StringBuilder().append("[")
296                .append(CachedOutputStream.class.getName())
297                .append(" Content: ");
298            try {
299                writeCacheTo(builder);
300            } catch (IOException e) {
301                //ignore
302            }
303            return builder.append("]").toString();
304        }
305    
306        protected void onWrite() throws IOException {
307            
308        }
309    
310        public void write(byte[] b, int off, int len) throws IOException {
311            if (!outputLocked) {
312                onWrite();
313                this.totalLength += len;
314                if (threshold > 0 && inmem && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
315                    createFileOutputStream();
316                }
317                currentStream.write(b, off, len);
318            }
319        }
320    
321        public void write(byte[] b) throws IOException {
322            if (!outputLocked) {
323                onWrite();
324                this.totalLength += b.length;
325                if (threshold > 0 && inmem && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
326                    createFileOutputStream();
327                }
328                currentStream.write(b);
329            }
330        }
331    
332        public void write(int b) throws IOException {
333            if (!outputLocked) {
334                onWrite();
335                this.totalLength++;
336                if (threshold > 0 && inmem && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
337                    createFileOutputStream();
338                }
339                currentStream.write(b);
340            }
341        }
342    
343        private void createFileOutputStream() throws IOException {
344            ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream;
345            if (outputDir == null) {
346                tempFile = FileUtil.createTempFile("cos", ".tmp");
347            } else {
348                tempFile = FileUtil.createTempFile("cos", ".tmp", outputDir, false);
349            }
350            
351            currentStream = new BufferedOutputStream(new FileOutputStream(tempFile));
352            bout.writeTo(currentStream);
353            inmem = false;
354        }
355    
356        public File getTempFile() {
357            return tempFile != null && tempFile.exists() ? tempFile : null;
358        }
359    
360        public InputStream getInputStream() throws IOException {
361            flush();
362            if (inmem) {
363                if (currentStream instanceof ByteArrayOutputStream) {
364                    return new ByteArrayInputStream(((ByteArrayOutputStream) currentStream).toByteArray());
365                } else {
366                    return null;
367                }
368            } else {
369                try {
370                    FileInputStream fileInputStream = new FileInputStream(tempFile) {
371                        public void close() throws IOException {
372                            super.close();
373                            maybeDeleteTempFile(this);
374                        }
375                    };
376                    return fileInputStream;
377                } catch (FileNotFoundException e) {
378                    throw new IOException("Cached file was deleted, " + e.toString());
379                }
380            }
381        }
382        
383        public StreamCache getStreamCache() throws IOException {
384            flush();
385            if (inmem) {
386                if (currentStream instanceof ByteArrayOutputStream) {
387                    return new InputStreamCache(((ByteArrayOutputStream) currentStream).toByteArray());
388                } else {
389                    return null;
390                }
391            } else {
392                try {
393                    FileInputStreamCache fileInputStream = new FileInputStreamCache(tempFile, this);
394                    return fileInputStream;
395                } catch (FileNotFoundException e) {
396                    throw new IOException("Cached file was deleted, " + e.toString());
397                }
398            }
399        }
400        
401        private void maybeDeleteTempFile(Object stream) {        
402            if (!inmem && tempFile != null) {
403                tempFile.delete();
404                tempFile = null;
405                currentStream = new ByteArrayOutputStream(1024);
406                inmem = true;
407            }
408        }
409    
410        public void setOutputDir(File outputDir) throws IOException {
411            this.outputDir = outputDir;
412        }
413        public void setThreshold(long threshold) {
414            this.threshold = threshold;
415        }
416    
417    }