/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.converter.stream;

import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;

import org.apache.camel.converter.IOConverter;
import org.apache.camel.util.FileUtil;
import org.apache.camel.util.IOHelper;

/**
 * An output stream that keeps its content in memory and spills it into a temporary file once
 * the number of bytes written exceeds the threshold, whose default value is 64K. A threshold
 * of zero or less disables spilling.
 * <p/>
 * The temporary file is created in the directory configured with the {@link #TEMP_DIR} property
 * (or {@link #setOutputDir(File)}); when no directory is configured the default location of
 * {@link FileUtil#createTempFile(String, String)} is used (the directory named by the
 * "java.io.tmpdir" system property).
 * <p/>
 * The cached content can be read back through {@link #getInputStream()} or
 * {@link #getStreamCache()}. The temporary file is deleted when this output stream or the
 * input stream returned by {@link #getInputStream()} is closed.
 */
public class CachedOutputStream extends OutputStream {
    public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
    public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";

    protected boolean outputLocked;
    protected OutputStream currentStream;

    private long threshold = 64 * 1024;

    private int totalLength;

    private boolean inmem;

    private File tempFile;

    private File outputDir;

    /**
     * Creates a stream that starts out caching in memory with the default threshold.
     */
    public CachedOutputStream() {
        currentStream = new ByteArrayOutputStream(2048);
        inmem = true;
    }

    /**
     * Creates a stream with a custom spill threshold.
     *
     * @param threshold number of bytes after which the content is moved to a temporary file;
     *                  zero or a negative value keeps everything in memory
     */
    public CachedOutputStream(long threshold) {
        this();
        this.threshold = threshold;
    }

    /**
     * Creates a stream configured from the {@link #THRESHOLD} and {@link #TEMP_DIR} entries of
     * the given properties. A {@link #TEMP_DIR} value that does not name an existing directory
     * is ignored.
     *
     * @param properties the configuration properties
     * @throws NumberFormatException if the threshold value is not a valid number
     */
    public CachedOutputStream(Map<String, String> properties) {
        this();
        String value = properties.get(THRESHOLD);
        if (value != null) {
            // the threshold field is a long, so accept the full long range
            threshold = Long.parseLong(value);
        }
        value = properties.get(TEMP_DIR);
        if (value != null) {
            File dir = new File(value);
            if (dir.exists() && dir.isDirectory()) {
                outputDir = dir;
            }
        }
    }

    /**
     * Perform any actions required on stream flush (freeze headers, reset
     * output stream ... etc.)
     */
    protected void doFlush() throws IOException {
        // hook for subclasses
    }

    /**
     * Flushes the underlying stream and then invokes {@link #doFlush()}.
     */
    @Override
    public void flush() throws IOException {
        currentStream.flush();
        doFlush();
    }

    /**
     * Perform any actions required on stream closure (handle response etc.)
     */
    protected void doClose() throws IOException {
        // hook for subclasses
    }

    /**
     * Perform any actions required after stream closure (close the other related stream etc.)
     */
    protected void postClose() throws IOException {
        // hook for subclasses
    }

    /**
     * Locks the output stream to prevent additional writes, but maintains
     * a pointer to it so an InputStream can be obtained. While locked, calls to the
     * <tt>write</tt> methods are silently ignored.
     *
     * @throws IOException if flushing the underlying stream fails
     */
    public void lockOutputStream() throws IOException {
        currentStream.flush();
        outputLocked = true;
    }

    /**
     * Flushes and closes the underlying stream, deletes the temporary file (if any) and finally
     * invokes {@link #postClose()}.
     */
    @Override
    public void close() throws IOException {
        try {
            currentStream.flush();
            doClose();
            currentStream.close();
        } finally {
            // never leave the temporary file behind, even when flushing/closing failed
            maybeDeleteTempFile(currentStream);
        }
        postClose();
    }

    /**
     * Two cached output streams are equal when their underlying streams are equal; for any
     * other object the comparison is delegated to the underlying stream.
     */
    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            // keep equals reflexive; the underlying stream is never equal to this wrapper
            return true;
        }
        if (obj instanceof CachedOutputStream) {
            return currentStream.equals(((CachedOutputStream) obj).currentStream);
        }
        return currentStream.equals(obj);
    }

    /**
     * Replace the original stream with the new one, optionally copying the content of the old one
     * into the new one.
     * When with Attachment, needs to replace the xml writer stream with the stream used by
     * AttachmentSerializer or copy the cached output stream to the "real"
     * output stream, i.e. onto the wire.
     * <p/>
     * Note that when the current stream is itself a {@link CachedOutputStream} its content is
     * always copied, regardless of <tt>copyOldContent</tt>.
     *
     * @param out the new output stream; <tt>null</tt> selects a fresh in-memory buffer
     * @param copyOldContent flag indicating if the old content should be copied
     * @throws IOException if copying fails or the current stream is of an unknown kind
     */
    public void resetOut(OutputStream out, boolean copyOldContent) throws IOException {
        if (out == null) {
            out = new ByteArrayOutputStream();
        }

        if (currentStream instanceof CachedOutputStream) {
            CachedOutputStream ac = (CachedOutputStream) currentStream;
            InputStream in = ac.getInputStream();
            IOHelper.copyAndCloseInput(in, out);
        } else if (inmem) {
            if (!(currentStream instanceof ByteArrayOutputStream)) {
                throw new IOException("Unknown format of currentStream");
            }
            ByteArrayOutputStream byteOut = (ByteArrayOutputStream) currentStream;
            if (copyOldContent && byteOut.size() > 0) {
                byteOut.writeTo(out);
            }
        } else {
            // content lives in the temporary file
            currentStream.close();
            if (copyOldContent) {
                // open the file only when it is actually read, so that no unclosed handle
                // is left behind (which would also block the delete below on some platforms)
                IOHelper.copyAndCloseInput(new FileInputStream(tempFile), out);
            }
            tempFile.delete();
            tempFile = null;
            inmem = true;
        }
        currentStream = out;
        outputLocked = false;
    }

    /**
     * Copies the input stream to the output stream using the given buffer size and closes the
     * input, by delegating to {@link IOHelper}.
     */
    public static void copyStream(InputStream in, OutputStream out, int bufferSize) throws IOException {
        IOHelper.copyAndCloseInput(in, out, bufferSize);
    }

    /**
     * @return the total number of bytes written through this stream so far
     */
    public int size() {
        return totalLength;
    }

    /**
     * Flushes this stream and returns the cached content as a byte array.
     *
     * @throws IOException if the content cannot be read or the current stream is of an unknown kind
     */
    public byte[] getBytes() throws IOException {
        flush();
        return cachedBytes();
    }

    /**
     * Flushes this stream and copies the cached content to the given output stream.
     *
     * @param out the stream to copy the cached content to
     * @throws IOException if copying fails or the current stream is of an unknown kind
     */
    public void writeCacheTo(OutputStream out) throws IOException {
        flush();
        if (inmem) {
            if (currentStream instanceof ByteArrayOutputStream) {
                ((ByteArrayOutputStream) currentStream).writeTo(out);
            } else {
                throw new IOException("Unknown format of currentStream");
            }
        } else {
            // read the file
            FileInputStream fin = new FileInputStream(tempFile);
            IOHelper.copyAndCloseInput(fin, out);
        }
    }

    /**
     * Flushes this stream and appends at most <tt>limit</tt> bytes of the cached content, decoded
     * with {@link IOHelper#newStringFromBytes(byte[], int, int)}, to the given builder.
     *
     * @param out   the builder to append to
     * @param limit the maximum number of bytes to append; a negative value means no limit
     * @throws IOException if the content cannot be read or the current stream is of an unknown kind
     */
    public void writeCacheTo(StringBuilder out, int limit) throws IOException {
        flush();
        if (limit < 0 || totalLength < limit) {
            writeCacheTo(out);
            return;
        }

        if (inmem) {
            if (currentStream instanceof ByteArrayOutputStream) {
                byte[] bytes = ((ByteArrayOutputStream) currentStream).toByteArray();
                // the buffer can be shorter than totalLength (e.g. after resetOut), so clamp
                int length = Math.min(limit, bytes.length);
                out.append(IOHelper.newStringFromBytes(bytes, 0, length));
            } else {
                throw new IOException("Unknown format of currentStream");
            }
        } else {
            // read the file; decode in one go so that multi-byte characters are not
            // broken apart at read-buffer boundaries
            byte[] bytes = readTempFile(limit);
            out.append(IOHelper.newStringFromBytes(bytes, 0, bytes.length));
        }
    }

    /**
     * Flushes this stream and appends the whole cached content, decoded with
     * {@link IOHelper#newStringFromBytes(byte[])}, to the given builder.
     *
     * @param out the builder to append to
     * @throws IOException if the content cannot be read or the current stream is of an unknown kind
     */
    public void writeCacheTo(StringBuilder out) throws IOException {
        flush();
        // decode all bytes at once so that multi-byte characters are never split
        out.append(IOHelper.newStringFromBytes(cachedBytes()));
    }

    /**
     * @return the underlying output stream
     */
    public OutputStream getOut() {
        return currentStream;
    }

    /**
     * Returns the hash code of the underlying stream, so the value changes whenever the
     * underlying stream is replaced.
     */
    @Override
    public int hashCode() {
        return currentStream.hashCode();
    }

    /**
     * Returns the class name followed by the cached content; the content is omitted (possibly
     * partially) when it cannot be read.
     */
    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder().append("[")
            .append(CachedOutputStream.class.getName())
            .append(" Content: ");
        try {
            writeCacheTo(builder);
        } catch (IOException ignored) {
            // best effort only: toString must not fail because the cache is unreadable
        }
        return builder.append("]").toString();
    }

    /**
     * Hook invoked before every write that is not suppressed by the output lock.
     */
    protected void onWrite() throws IOException {
        // hook for subclasses
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (!outputLocked) {
            beforeWrite(len);
            currentStream.write(b, off, len);
        }
    }

    @Override
    public void write(byte[] b) throws IOException {
        if (!outputLocked) {
            beforeWrite(b.length);
            currentStream.write(b);
        }
    }

    @Override
    public void write(int b) throws IOException {
        if (!outputLocked) {
            beforeWrite(1);
            currentStream.write(b);
        }
    }

    /**
     * Bookkeeping shared by all write methods: notifies {@link #onWrite()}, accounts for the
     * bytes about to be written and spills to a temporary file once the threshold is exceeded.
     */
    private void beforeWrite(int len) throws IOException {
        onWrite();
        totalLength += len;
        if (threshold > 0 && inmem && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
            createFileOutputStream();
        }
    }

    /**
     * Moves the in-memory content into a new temporary file and continues writing there.
     */
    private void createFileOutputStream() throws IOException {
        ByteArrayOutputStream bout = (ByteArrayOutputStream) currentStream;
        File file;
        if (outputDir == null) {
            file = FileUtil.createTempFile("cos", ".tmp");
        } else {
            file = FileUtil.createTempFile("cos", ".tmp", outputDir, false);
        }

        OutputStream fileOut = null;
        boolean switched = false;
        try {
            fileOut = new BufferedOutputStream(new FileOutputStream(file));
            bout.writeTo(fileOut);
            switched = true;
        } finally {
            if (!switched) {
                // stay in memory mode and do not leave a half written file behind
                if (fileOut != null) {
                    try {
                        fileOut.close();
                    } catch (IOException ignored) {
                        // the original failure is the one worth reporting
                    }
                }
                file.delete();
            }
        }

        // commit the switch only after the existing content was copied successfully
        tempFile = file;
        currentStream = fileOut;
        inmem = false;
    }

    /**
     * Returns the cached content without flushing first.
     */
    private byte[] cachedBytes() throws IOException {
        if (inmem) {
            if (currentStream instanceof ByteArrayOutputStream) {
                return ((ByteArrayOutputStream) currentStream).toByteArray();
            }
            throw new IOException("Unknown format of currentStream");
        }
        // read the file
        FileInputStream fin = new FileInputStream(tempFile);
        try {
            return IOConverter.toBytes(fin);
        } finally {
            fin.close();
        }
    }

    /**
     * Reads at most <tt>limit</tt> bytes from the temporary file (all bytes if limit is negative).
     */
    private byte[] readTempFile(int limit) throws IOException {
        FileInputStream fin = new FileInputStream(tempFile);
        try {
            ByteArrayOutputStream collected = new ByteArrayOutputStream();
            byte[] chunk = new byte[1024];
            int remaining = limit;
            while (limit < 0 || remaining > 0) {
                int wanted = limit < 0 ? chunk.length : Math.min(chunk.length, remaining);
                int read = fin.read(chunk, 0, wanted);
                if (read == -1) {
                    break;
                }
                collected.write(chunk, 0, read);
                if (limit >= 0) {
                    remaining -= read;
                }
            }
            return collected.toByteArray();
        } finally {
            fin.close();
        }
    }

    /**
     * @return the temporary file if the content was spilled to disk and the file still exists,
     *         otherwise <tt>null</tt>
     */
    public File getTempFile() {
        return tempFile != null && tempFile.exists() ? tempFile : null;
    }

    /**
     * Flushes this stream and returns an input stream over the cached content. When the content
     * is stored in the temporary file, closing the returned stream deletes that file.
     *
     * @return the input stream, or <tt>null</tt> if the content is held in memory by a stream of
     *         an unknown kind
     * @throws IOException if flushing fails or the temporary file no longer exists
     */
    public InputStream getInputStream() throws IOException {
        flush();
        if (inmem) {
            if (currentStream instanceof ByteArrayOutputStream) {
                return new ByteArrayInputStream(((ByteArrayOutputStream) currentStream).toByteArray());
            } else {
                return null;
            }
        } else {
            try {
                return new FileInputStream(tempFile) {
                    @Override
                    public void close() throws IOException {
                        try {
                            super.close();
                        } finally {
                            maybeDeleteTempFile(this);
                        }
                    }
                };
            } catch (FileNotFoundException e) {
                throw cachedFileDeleted(e);
            }
        }
    }

    /**
     * Flushes this stream and returns a {@link StreamCache} over the cached content.
     *
     * @return the stream cache, or <tt>null</tt> if the content is held in memory by a stream of
     *         an unknown kind
     * @throws IOException if flushing fails or the temporary file no longer exists
     */
    public StreamCache getStreamCache() throws IOException {
        flush();
        if (inmem) {
            if (currentStream instanceof ByteArrayOutputStream) {
                return new InputStreamCache(((ByteArrayOutputStream) currentStream).toByteArray());
            } else {
                return null;
            }
        } else {
            try {
                return new FileInputStreamCache(tempFile, this);
            } catch (FileNotFoundException e) {
                throw cachedFileDeleted(e);
            }
        }
    }

    /**
     * Builds the exception reported when the temporary file has disappeared, keeping the cause.
     */
    private static IOException cachedFileDeleted(FileNotFoundException cause) {
        IOException answer = new IOException("Cached file was deleted, " + cause.toString());
        answer.initCause(cause);
        return answer;
    }

    /**
     * Deletes the temporary file (if the content was spilled to disk) and switches back to a
     * fresh in-memory buffer.
     *
     * @param stream the stream that triggered the clean up (currently not used)
     */
    private void maybeDeleteTempFile(Object stream) {
        if (!inmem && tempFile != null) {
            tempFile.delete();
            tempFile = null;
            currentStream = new ByteArrayOutputStream(1024);
            inmem = true;
        }
    }

    /**
     * Sets the directory in which the temporary file is created.
     */
    public void setOutputDir(File outputDir) throws IOException {
        this.outputDir = outputDir;
    }

    /**
     * Sets the number of bytes after which the content is moved to a temporary file; zero or a
     * negative value keeps everything in memory.
     */
    public void setThreshold(long threshold) {
        this.threshold = threshold;
    }

}