blob: 4cfa9e81702605e9c18d1af976d6dab15398fac1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.converter.stream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.camel.Exchange;
import org.apache.camel.StreamCache;
import org.apache.camel.impl.SynchronizationAdapter;
import org.apache.camel.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * An {@link OutputStream} that keeps its content in memory until the number of bytes written
 * exceeds a threshold, and from then on stores the content in a temporary file.
 * <p/>
 * The threshold defaults to 64K and can be overridden with the {@link #THRESHOLD} property of the
 * exchange's context; a threshold of zero or less disables paging to file. The temporary file is
 * created in the directory given by the {@link #TEMP_DIR} property of the exchange's context, or,
 * when that property is not set, in the default temporary directory (the directory set by the
 * system property "java.io.tmpdir").
 * <p/>
 * The written content can be read back through {@link #getInputStream()},
 * {@link #getWrappedInputStream()} or {@link #getStreamCache()}. The temporary file is deleted
 * when this output stream is closed, which by default also happens when the exchange is done.
 */
public class CachedOutputStream extends OutputStream {
    /** Name of the context property holding the size (in bytes) above which content is paged to file. */
    public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
    /** Name of the context property holding the directory in which temporary files are created. */
    public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";
    private static final Logger LOG = LoggerFactory.getLogger(CachedOutputStream.class);

    // starts as an in-memory buffer and is swapped for a file based stream by pageToFileStream()
    private OutputStream currentStream = new ByteArrayOutputStream(2048);
    private boolean inMemory = true;
    // long so the counter cannot overflow once large content has been paged to file
    private long totalLength;
    private File tempFile;
    private FileInputStreamCache fileInputStreamCache;
    private long threshold = 64 * 1024;
    private File outputDir;

    /**
     * Creates a cached output stream that is closed (and its temporary file deleted) when the
     * given exchange is done.
     *
     * @param exchange the exchange whose context supplies the {@link #THRESHOLD} and
     *                 {@link #TEMP_DIR} properties
     */
    public CachedOutputStream(Exchange exchange) {
        this(exchange, true);
    }

    /**
     * Creates a cached output stream.
     *
     * @param exchange           the exchange whose context supplies the {@link #THRESHOLD} and
     *                           {@link #TEMP_DIR} properties
     * @param closedOnCompletion whether to register an on-completion callback on the exchange
     *                           that closes this stream and the cached file input stream
     */
    public CachedOutputStream(Exchange exchange, boolean closedOnCompletion) {
        String hold = exchange.getContext().getProperties().get(THRESHOLD);
        String dir = exchange.getContext().getProperties().get(TEMP_DIR);
        if (hold != null) {
            this.threshold = exchange.getContext().getTypeConverter().convertTo(Long.class, hold);
        }
        if (dir != null) {
            this.outputDir = exchange.getContext().getTypeConverter().convertTo(File.class, dir);
        }

        if (closedOnCompletion) {
            // add on completion so we can cleanup after the exchange is done such as deleting temporary files
            exchange.addOnCompletion(new SynchronizationAdapter() {
                @Override
                public void onDone(Exchange exchange) {
                    // remember the file up front, as close() clears the field and we want it in the log
                    final File cached = tempFile;
                    try {
                        try {
                            if (fileInputStreamCache != null) {
                                fileInputStreamCache.close();
                            }
                        } finally {
                            // must run even if closing the input stream cache failed,
                            // otherwise the temporary file would never be deleted
                            close();
                        }
                    } catch (Exception e) {
                        LOG.warn("Error deleting temporary cache file: " + cached, e);
                    }
                }

                @Override
                public String toString() {
                    return "OnCompletion[CachedOutputStream]";
                }
            });
        }
    }

    /**
     * Flushes the underlying (in-memory or file based) stream.
     *
     * @throws IOException if the underlying stream fails to flush
     */
    @Override
    public void flush() throws IOException {
        currentStream.flush();
    }

    /**
     * Closes the underlying stream and deletes the temporary file, if one was created.
     * The temporary file is deleted even when closing the underlying stream fails.
     *
     * @throws IOException if the underlying stream fails to close
     */
    @Override
    public void close() throws IOException {
        try {
            currentStream.close();
        } finally {
            cleanUpTempFile();
        }
    }

    /**
     * Compares by identity first and otherwise delegates to the underlying stream.
     *
     * @param obj the object to compare with
     * @return <tt>true</tt> if <tt>obj</tt> is this instance or equals the underlying stream
     */
    @Override
    public boolean equals(Object obj) {
        // identity check keeps equals reflexive; the delegation is kept for backward compatibility
        return this == obj || currentStream.equals(obj);
    }

    /**
     * Returns the hash code of the underlying stream.
     * Note that the underlying stream is replaced when the content is paged to file.
     */
    @Override
    public int hashCode() {
        return currentStream.hashCode();
    }

    @Override
    public String toString() {
        return "CachedOutputStream[size: " + totalLength + "]";
    }

    /**
     * Writes <tt>len</tt> bytes from the given array, paging to file first if the threshold is exceeded.
     *
     * @param b   the data
     * @param off the start offset in the data
     * @param len the number of bytes to write
     * @throws IOException if paging to file or writing to the underlying stream fails
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        this.totalLength += len;
        pageToFileIfThresholdExceeded();
        currentStream.write(b, off, len);
    }

    /**
     * Writes the whole array, paging to file first if the threshold is exceeded.
     *
     * @param b the data
     * @throws IOException if paging to file or writing to the underlying stream fails
     */
    @Override
    public void write(byte[] b) throws IOException {
        this.totalLength += b.length;
        pageToFileIfThresholdExceeded();
        currentStream.write(b);
    }

    /**
     * Writes a single byte, paging to file first if the threshold is exceeded.
     *
     * @param b the byte
     * @throws IOException if paging to file or writing to the underlying stream fails
     */
    @Override
    public void write(int b) throws IOException {
        this.totalLength++;
        pageToFileIfThresholdExceeded();
        currentStream.write(b);
    }

    /**
     * Gets an input stream over the content written so far.
     * <p/>
     * While the content is in memory every call returns a new stream over a copy of the bytes.
     * Once paged to file, the same file based stream instance is returned on every call.
     *
     * @return the input stream
     * @throws IOException           if flushing fails or the temporary file cannot be found
     * @throws IllegalStateException if the stream is flagged as in-memory but is not backed by a byte array
     */
    public InputStream getInputStream() throws IOException {
        flush();
        if (inMemory) {
            return new ByteArrayInputStream(inMemoryContent());
        }
        return fileCache();
    }

    /**
     * Gets an input stream over the content written so far which also closes this
     * output stream (and thereby deletes the temporary file) when it is closed.
     *
     * @return the wrapped input stream
     * @throws IOException see {@link #getInputStream()}
     */
    public InputStream getWrappedInputStream() throws IOException {
        // The WrappedInputStream will close the CachedOutputStream when it is closed
        return new WrappedInputStream(this, getInputStream());
    }

    /**
     * Gets a {@link StreamCache} over the content written so far.
     * <p/>
     * While the content is in memory every call returns a new cache over a copy of the bytes.
     * Once paged to file, the same file based cache instance is returned on every call.
     *
     * @return the stream cache
     * @throws IOException           if flushing fails or the temporary file cannot be found
     * @throws IllegalStateException if the stream is flagged as in-memory but is not backed by a byte array
     */
    public StreamCache getStreamCache() throws IOException {
        flush();
        if (inMemory) {
            return new InputStreamCache(inMemoryContent());
        }
        return fileCache();
    }

    /**
     * Returns a copy of the bytes held by the in-memory stream.
     */
    private byte[] inMemoryContent() {
        if (currentStream instanceof ByteArrayOutputStream) {
            return ((ByteArrayOutputStream) currentStream).toByteArray();
        }
        throw new IllegalStateException("CurrentStream should be an instance of ByteArrayOutputStream but is: " + currentStream.getClass().getName());
    }

    /**
     * Returns the file based input stream cache, creating it on first use.
     */
    private FileInputStreamCache fileCache() throws IOException {
        try {
            if (fileInputStreamCache == null) {
                fileInputStreamCache = new FileInputStreamCache(tempFile);
            }
            return fileInputStreamCache;
        } catch (FileNotFoundException e) {
            throw new IOException("Cached file " + tempFile + " not found", e);
        }
    }

    /**
     * Deletes the temporary file, if any, and forgets about it.
     */
    private void cleanUpTempFile() {
        // cleanup temporary file
        if (tempFile != null) {
            FileUtil.deleteFile(tempFile);
            tempFile = null;
        }
    }

    /**
     * Pages the content to file when paging is enabled (threshold above zero), the content
     * is still held in memory and the total length has grown beyond the threshold.
     */
    private void pageToFileIfThresholdExceeded() throws IOException {
        if (threshold > 0 && inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
            pageToFileStream();
        }
    }

    /**
     * Creates the temporary file, switches the underlying stream to it and copies
     * the content buffered in memory so far into the file.
     */
    private void pageToFileStream() throws IOException {
        flush();

        ByteArrayOutputStream bout = (ByteArrayOutputStream) currentStream;
        if (outputDir == null) {
            tempFile = FileUtil.createTempFile("cos", ".tmp");
        } else {
            tempFile = FileUtil.createTempFile("cos", ".tmp", outputDir);
        }

        LOG.trace("Creating temporary stream cache file: {}", tempFile);

        try {
            currentStream = new BufferedOutputStream(new FileOutputStream(tempFile));
            bout.writeTo(currentStream);
        } finally {
            // ensure flag is flipped to file based
            inMemory = false;
        }
    }

    /**
     * Input stream which closes the owning {@link CachedOutputStream} when it is closed itself.
     * Only the operations overridden here are delegated to the wrapped stream.
     */
    private static final class WrappedInputStream extends InputStream {
        private final CachedOutputStream cachedOutputStream;
        private final InputStream inputStream;

        WrappedInputStream(CachedOutputStream cos, InputStream is) {
            cachedOutputStream = cos;
            inputStream = is;
        }

        @Override
        public int read() throws IOException {
            return inputStream.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            // delegate bulk reads so we do not fall back to the byte-by-byte default of InputStream
            return inputStream.read(b, off, len);
        }

        @Override
        public int available() throws IOException {
            return inputStream.available();
        }

        @Override
        public void reset() throws IOException {
            inputStream.reset();
        }

        @Override
        public void close() throws IOException {
            try {
                inputStream.close();
            } finally {
                // always close the output stream so the temporary file gets deleted
                cachedOutputStream.close();
            }
        }
    }
}