/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.axiom.attachments.impl;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import javax.activation.DataHandler;
import javax.activation.DataSource;
import javax.activation.FileDataSource;
import org.apache.axiom.om.OMException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Attachment processing uses a lot of buffers.
* The BufferUtils class attempts to reuse buffers to prevent
 * excessive garbage collection.
*/
public class BufferUtils {
private static Log log = LogFactory.getLog(BufferUtils.class);
// Performance testing indicates that 4K is the best size for medium
    // and small payloads, with a negligible effect on large payloads.
static int BUFFER_LEN = 4 * 1024; // Copy Buffer size
static boolean ENABLE_FILE_CHANNEL = true; // Enable file channel optimization
private static byte[] _cacheBuffer = new byte[BUFFER_LEN];
private static boolean _cacheBufferInUse = false;
private static ByteBuffer _cacheByteBuffer = ByteBuffer.allocate(BUFFER_LEN);
private static boolean _cacheByteBufferInUse = false;
/**
     * Utility method that writes the InputStream contents to the OutputStream.
     * @param is source InputStream
     * @param os destination OutputStream
* @throws IOException
*/
public static void inputStream2OutputStream(InputStream is,
OutputStream os)
throws IOException {
        // If this is a FileOutputStream, use the optimized FileChannel copy.
if (ENABLE_FILE_CHANNEL && os instanceof FileOutputStream) {
if (inputStream2FileOutputStream(is, (FileOutputStream) os)) {
return;
}
}
byte[] buffer = getTempBuffer();
try {
int bytesRead = is.read(buffer);
// Continue reading until no bytes are read and no
// bytes are now available.
while (bytesRead > 0 || is.available() > 0) {
if (bytesRead > 0) {
os.write(buffer, 0, bytesRead);
}
bytesRead = is.read(buffer);
}
} finally {
releaseTempBuffer(buffer);
}
}
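    // A minimal usage sketch (the stream setup below is illustrative, not part of
    // this class): copy an attachment stream into an in-memory buffer.
    //
    //   InputStream is = dataHandler.getInputStream(); // some javax.activation.DataHandler
    //   OutputStream os = new java.io.ByteArrayOutputStream();
    //   BufferUtils.inputStream2OutputStream(is, os);
    //   is.close();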
/**
     * Writes the InputStream contents to the OutputStream, up to the given limit.
     * @param is InputStream
     * @param os OutputStream
     * @param limit maximum number of bytes to read
     * @return total bytes written
* @throws IOException
*/
public static int inputStream2OutputStream(InputStream is,
OutputStream os,
int limit)
throws IOException {
byte[] buffer = getTempBuffer();
int totalWritten = 0;
int bytesRead = 0;
try {
do {
int len = (limit-totalWritten) > BUFFER_LEN ? BUFFER_LEN : (limit-totalWritten);
bytesRead = is.read(buffer, 0, len);
                if (bytesRead > 0) {
                    os.write(buffer, 0, bytesRead);
                    totalWritten += bytesRead;
                }
} while (totalWritten < limit && (bytesRead > 0 || is.available() > 0));
return totalWritten;
} finally {
releaseTempBuffer(buffer);
}
}
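    // A minimal usage sketch (is, os and the limit value are illustrative): copy at
    // most 1 MB and detect whether the stream may have held more data than was copied.
    //
    //   int limit = 1024 * 1024;
    //   int copied = BufferUtils.inputStream2OutputStream(is, os, limit);
    //   boolean possiblyTruncated = (copied == limit);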
/**
     * Optimized copying to a FileOutputStream using a FileChannel.
     * @param is source InputStream
     * @param fos destination FileOutputStream
     * @return false if the optimized copy could not be used (for example, the file lock was not acquired)
* @throws IOException
*/
public static boolean inputStream2FileOutputStream(InputStream is,
FileOutputStream fos)
throws IOException {
// See if a file channel and lock can be obtained on the FileOutputStream
FileChannel channel = null;
FileLock lock = null;
ByteBuffer bb = null;
try {
channel = fos.getChannel();
if (channel != null) {
lock = channel.tryLock();
}
bb = getTempByteBuffer();
        } catch (Throwable t) {
            // Ignore: if the channel, lock, or buffer could not be obtained,
            // fall through and return false so the caller can use a plain stream copy.
        }
if (lock == null || bb == null || !bb.hasArray()) {
releaseTempByteBuffer(bb);
            return false; // the lock could not be acquired or bb has no accessible backing array
}
try {
// Read directly into the ByteBuffer array
int bytesRead = is.read(bb.array());
// Continue reading until no bytes are read and no
// bytes are now available.
while (bytesRead > 0 || is.available() > 0) {
if (bytesRead > 0) {
int written = 0;
if (bytesRead < BUFFER_LEN) {
// If the ByteBuffer is not full, allocate a new one
ByteBuffer temp = ByteBuffer.allocate(bytesRead);
temp.put(bb.array(), 0, bytesRead);
temp.position(0);
written = channel.write(temp);
} else {
// Write to channel
bb.position(0);
written = channel.write(bb);
bb.clear();
}
}
// REVIEW: Do we need to ensure that bytesWritten is
// the same as the number of bytes sent ?
bytesRead = is.read(bb.array());
}
} finally {
// Release the lock
lock.release();
releaseTempByteBuffer(bb);
}
return true;
}
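    // A minimal usage sketch (is and the target file name are illustrative): try the
    // channel copy first and fall back to the generic stream copy if it could not be
    // used; this mirrors what inputStream2OutputStream(is, os) does internally.
    //
    //   FileOutputStream fos = new FileOutputStream("attachment.tmp");
    //   if (!BufferUtils.inputStream2FileOutputStream(is, fos)) {
    //       BufferUtils.inputStream2OutputStream(is, fos);
    //   }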
/**
     * Checks whether the attachment is eligible for optimization.
     * An attachment is eligible for optimization if and only if its size is
     * greater than the optimization threshold; if the content represented by
     * the DataHandler is smaller than the threshold, the attachment is not
     * eligible for optimization and will be inlined instead.
     * @param dh DataHandler containing the attachment content
     * @param limit optimization threshold size in bytes
     * @return 1 if the DataHandler data is bigger than the limit,
     *         0 if it is smaller,
     *         -1 if an error occurs or the check is unsupported
*/
public static int doesDataHandlerExceedLimit(DataHandler dh, int limit){
if(log.isDebugEnabled()){
log.debug("start isEligibleForOptimization");
}
        //If the optimization threshold is not set, we cannot decide; return -1.
if(limit==0){
if(log.isDebugEnabled()){
log.debug("optimizedThreshold not set");
}
return -1;
}
InputStream in=null;
//read bytes from input stream to check if
//attachment size is greater than the optimized size.
int totalRead = 0;
try{
            in = getInputStream(dh);
            if(in == null){
                if(log.isDebugEnabled()){
                    log.debug("Input Stream is null");
                }
                return -1;
            }
            // Mark the stream so it can be rewound after the size check.
            if(in.markSupported()){
                in.mark(limit);
            }
            // Read up to the limit, reusing a single temp buffer.
            byte[] buffer = getTempBuffer();
            try{
                do{
                    int bytesRead = in.read(buffer, 0, BUFFER_LEN);
                    if(bytesRead < 0){
                        break; // end of stream
                    }
                    totalRead = totalRead + bytesRead;
                }while((limit > totalRead) && (in.available() > 0));
            } finally {
                releaseTempBuffer(buffer);
            }
if(in.markSupported()){
in.reset();
}
if(totalRead > limit){
if(log.isDebugEnabled()){
log.debug("Attachment size greater than limit");
}
return 1;
}
}catch(Exception e){
log.warn(e.getMessage());
return -1;
}
if(log.isDebugEnabled()){
log.debug("Attachment Size smaller than limit");
}
return 0;
}
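    // A minimal usage sketch (dh and the threshold value are illustrative): act on
    // the return codes documented above.
    //
    //   int threshold = 4096; // hypothetical optimization threshold in bytes
    //   switch (BufferUtils.doesDataHandlerExceedLimit(dh, threshold)) {
    //       case 1:  /* bigger than the threshold */  break;
    //       case 0:  /* smaller than the threshold */ break;
    //       default: /* -1: error or check unsupported */ break;
    //   }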
    /**
     * Gets the InputStream from the DataHandler's DataSource, ensuring that
     * non-file streams support mark/reset so that reading does not consume the
     * underlying DataSource.
     */
    private static java.io.InputStream getInputStream(DataHandler dataHandler) throws OMException {
        InputStream inStream = null;
try {
DataSource ds = dataHandler.getDataSource();
if(ds instanceof FileDataSource){
inStream = ds.getInputStream();
}else{
                inStream = ds.getInputStream();
                if(!inStream.markSupported()){
                    throw new OMException("Stream does not support mark. Cannot read the stream, as the DataSource would be consumed.");
}
}
} catch (IOException e) {
            throw new OMException(
                    "Cannot get InputStream from DataHandler.", e);
}
return inStream;
}
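    // The helpers below keep a single cached 4K byte[] and a single cached ByteBuffer.
    // The first caller receives the cached instance; concurrent callers receive a freshly
    // allocated buffer, and only the cached instance is ever returned to the cache.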
private static synchronized byte[] getTempBuffer() {
// Try using cached buffer
synchronized(_cacheBuffer) {
if (!_cacheBufferInUse) {
_cacheBufferInUse = true;
return _cacheBuffer;
}
}
// Cache buffer in use, create new buffer
return new byte[BUFFER_LEN];
}
private static void releaseTempBuffer(byte[] buffer) {
// Try using cached buffer
synchronized(_cacheBuffer) {
if (buffer == _cacheBuffer) {
_cacheBufferInUse = false;
}
}
}
private static synchronized ByteBuffer getTempByteBuffer() {
// Try using cached buffer
synchronized(_cacheByteBuffer) {
if (!_cacheByteBufferInUse) {
_cacheByteBufferInUse = true;
return _cacheByteBuffer;
}
}
// Cache buffer in use, create new buffer
return ByteBuffer.allocate(BUFFER_LEN);
}
private static void releaseTempByteBuffer(ByteBuffer buffer) {
// Try using cached buffer
synchronized(_cacheByteBuffer) {
if (buffer == _cacheByteBuffer) {
_cacheByteBufferInUse = false;
}
}
}
}