// blob: 1cbd20b652f033f44c150ff5b8256bcfdec08c29 [file] [log] [blame]
/*
* Copyright 2001-2004 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.axis.attachments;
import org.apache.axis.InternalException;
import org.apache.axis.MessageContext;
import org.apache.axis.components.logger.LogFactory;
import org.apache.axis.utils.Messages;
import org.apache.commons.logging.Log;
import java.io.File;
import java.io.BufferedInputStream;
/**
 * This class allows small attachments to be cached in memory, while large ones are
 * cached out to a temporary file on disk. It implements the Java Activation
 * <code>javax.activation.DataSource</code> interface.
 *
 * @author Rick Rineholt
 */
public class ManagedMemoryDataSource implements javax.activation.DataSource {
/** Logger for this class. */
protected static Log log =
LogFactory.getLog(ManagedMemoryDataSource.class.getName());
/**
 * The content type. This defaults to
 * <code>application/octet-stream</code>.
 */
protected String contentType = "application/octet-stream";
/** The incoming source stream (wrapped in a BufferedInputStream by the constructor if needed). */
java.io.InputStream ss = null;
/**
 * Smallest value accepted for <code>maxCached</code>; the constructor rejects
 * anything lower. With this value every write exceeds the limit, so data is
 * sent to disk straight away.
 */
public static final int MIN_MEMORY_DISK_CACHED = -1;
/** Default value of <code>maxCached</code> (16 KiB). */
public static final int MAX_MEMORY_DISK_CACHED = 16 * 1024;
/** Number of bytes that may be held in memory before the data is flushed to disk. */
protected int maxCached = MAX_MEMORY_DISK_CACHED; // max in memory cached. Default.
// If set, the file that the data is cached to on disk.
/** The disk cache file, or <code>null</code> while no disk cache exists. */
protected java.io.File diskCacheFile = null;
// A list of open input Streams.
/**
 * The input streams handed out by this data source, stored as weak keys
 * (values are always <code>null</code>) so that {@link #delete()} can close them.
 */
protected java.util.WeakHashMap readers = new java.util.WeakHashMap();
/**
 * Flag to show if the resources behind this have been deleted.
 */
protected boolean deleted =
false;
// Memory is allocated in these size chunks.
/** Size in bytes of each in-memory buffer, and of the read buffer used by the constructor. */
public static final int READ_CHUNK_SZ = 32 * 1024;
/** Snapshot of <code>log.isDebugEnabled()</code> taken by the constructor. */
protected boolean debugEnabled = false; // Log debugging if true.
// Should not be called;
/**
 * No-argument constructor that leaves every field at its declared default.
 * It attaches no source stream, so an instance built this way holds no data
 * until a subclass writes some.
 */
protected ManagedMemoryDataSource() {
}
/**
 * Creates a data source over the given stream without reading it up front.
 * Delegates to the four-argument constructor with <code>readall</code>
 * set to <code>false</code>.
 *
 * @param ss is the source input stream that is used to create this data source.
 * @param maxCached This is the max memory that is to be used to cache the data.
 * @param contentType the mime type for this data stream; <code>null</code> or
 *        empty keeps the default.
 *
 * @throws java.io.IOException declared by the delegated constructor
 */
public ManagedMemoryDataSource(
java.io.InputStream ss, int maxCached, String contentType)
throws java.io.IOException {
this(ss, maxCached, contentType, false);
}
/**
 * Creates a data source over the given stream, optionally draining the
 * stream into the cache immediately.
 *
 * @param ss is the source input stream that is used to create this data source.
 *        It is wrapped in a <code>BufferedInputStream</code> unless it already is one.
 * @param maxCached the maximum number of bytes to keep in memory; must not be
 *        less than {@link #MIN_MEMORY_DISK_CACHED}.
 * @param contentType the mime type for this data stream; <code>null</code> or
 *        empty keeps the default.
 * @param readall if true the whole source is read and this data source is
 *        then closed for writing.
 *
 * @throws java.io.IOException if reading the source or caching the data fails
 * @throws IllegalArgumentException if <code>maxCached</code> is below the minimum
 */
public ManagedMemoryDataSource(
        java.io.InputStream ss, int maxCached, String contentType, boolean readall)
        throws java.io.IOException {
    this.ss = (ss instanceof BufferedInputStream)
            ? ss
            : new BufferedInputStream(ss);
    this.maxCached = maxCached;

    boolean hasContentType = (contentType != null) && (contentType.length() != 0);
    if (hasContentType) {
        this.contentType = contentType;
    }

    if (maxCached < MIN_MEMORY_DISK_CACHED) {
        throw new IllegalArgumentException(
                Messages.getMessage("badMaxCached", "" + maxCached));
    }

    // Logging should be initialized by this time; remember the setting once.
    if (log.isDebugEnabled()) {
        debugEnabled = true;
    }

    if (readall) {
        // Drain the source chunk by chunk; a zero-length read is simply retried.
        byte[] chunk = new byte[READ_CHUNK_SZ];
        int count;
        while ((count = ss.read(chunk)) > -1) {
            if (count > 0) {
                write(chunk, count);
            }
        }
        close();
    }
}
/* javax.activation.Interface DataSource implementation */
/**
 * This method returns the MIME type of the data in the form of a string.
 * The value is the one given to the constructor, or
 * <code>application/octet-stream</code> if none was given.
 * @return The mime type.
 */
public java.lang.String getContentType() {
return contentType;
}
/**
 * Returns a new input stream over the data held by this data source.
 * Each call creates a fresh <code>Instream</code>, whose constructor throws
 * if this data source has already been deleted.
 * @return the java.io.InputStream for the data source.
 *
 * @throws java.io.IOException if the stream could not be created
 */
public synchronized java.io.InputStream getInputStream()
throws java.io.IOException {
/*
 * Disabled alternative kept from the original source:
 * if (memorybuflist == null) {
 * return new java.io.FileInputStream(diskCacheFile);
 * }
 * else
 */
return new Instream(); // Instream reads from memory or from the disk cache file.
}
/**
 * This will flush any memory source to disk and
 * provide the name of the file if desired.
 * <p>
 * This is a best-effort operation: if the flush fails, the failure is
 * logged, the disk cache file reference is dropped and <code>null</code>
 * is returned.
 *
 * @return the absolute path of the disk cache file, or <code>null</code>
 *         if no disk cache file is available
 */
public java.lang.String getName() {
    String ret = null;
    try {
        flushToDisk();
        if (diskCacheFile != null) {
            ret = diskCacheFile.getAbsolutePath();
        }
    } catch (Exception e) {
        // Do not fail the caller, but do not hide the problem either.
        log.info(Messages.getMessage("exception00"), e);
        diskCacheFile = null;
    }
    return ret;
}
/**
 * Part of the <code>DataSource</code> interface, but NOT SUPPORTED here:
 * it is not needed for Axis, because data sources are created by constructors.
 *
 * @return always <code>null</code>
 *
 * @throws java.io.IOException never thrown by this implementation
 */
public java.io.OutputStream getOutputStream() throws java.io.IOException {
return null;
}
/**
 * The linked list to hold the in memory buffers. Set to <code>null</code>
 * once the data has been flushed to disk or deleted.
 */
protected java.util.LinkedList memorybuflist =
new java.util.LinkedList();
/** Hold the last memory buffer, i.e. the one currently being filled. */
protected byte[] currentMemoryBuf = null;
/** The number of bytes written to the above buffer. */
protected int currentMemoryBufSz =
0;
/** The total size in bytes in this data source. */
protected long totalsz = 0;
/** This is the cached disk stream; <code>null</code> while no disk cache is open for writing. */
protected java.io.BufferedOutputStream cachediskstream =
null;
/** If true this data source has been closed for writing. */
protected boolean closed = false;
/**
 * Write bytes to the stream. Convenience form of
 * <code>write(data, data.length)</code>.
 *
 * @param data all bytes of this array are written to the stream
 * @throws java.io.IOException if there was a problem writing the data
 */
protected void write(byte[] data) throws java.io.IOException {
write(data, data.length);
}
/**
 * Low level write: appends the first <code>length</code> bytes of
 * <code>data</code> to this data source.
 * <p>
 * If the data is still held in memory and this write would push the total
 * past <code>maxCached</code>, the memory contents are flushed to disk first.
 * The bytes are then stored in the memory buffers (while they exist) and/or
 * written to the disk cache stream (while it is open).
 *
 * @param data the buffer holding the bytes to write
 * @param length the number of bytes, counted from index 0, to write
 *
 * @throws java.io.IOException if this data source is closed or the disk
 *         write fails
 */
protected synchronized void write(byte[] data, int length)
        throws java.io.IOException {
    if (closed) {
        throw new java.io.IOException(Messages.getMessage("streamClosed"));
    }

    final int total = length;

    // Spill to disk before storing when the memory budget would be exceeded.
    boolean overBudget = (totalsz + total) > maxCached;
    if ((memorybuflist != null) && overBudget && (cachediskstream == null)) {
        flushToDisk();
    }

    if (memorybuflist != null) { // Still caching in memory.
        int copied = 0;
        do {
            if (currentMemoryBuf == null) {
                currentMemoryBuf = new byte[READ_CHUNK_SZ];
                currentMemoryBufSz = 0;
                memorybuflist.add(currentMemoryBuf);
            }

            // Copy as much as both the input and the current buffer allow.
            int room = currentMemoryBuf.length - currentMemoryBufSz;
            int chunk = Math.min(total - copied, room);
            System.arraycopy(data, copied, currentMemoryBuf,
                    currentMemoryBufSz, chunk);
            copied += chunk;
            currentMemoryBufSz += chunk;

            // Chain a fresh buffer only when there is more left to store.
            if (copied < total) {
                currentMemoryBuf = new byte[READ_CHUNK_SZ];
                currentMemoryBufSz = 0;
                memorybuflist.add(currentMemoryBuf);
            }
        } while (copied < total);
    }

    if (cachediskstream != null) { // Disk cache is open: write through.
        cachediskstream.write(data, 0, length);
    }

    totalsz += total;
}
/**
 * Closes this data source for writing. Calling it again has no effect.
 * <p>
 * The disk cache stream, if open, is closed. If the data is held in memory,
 * the last buffer is replaced by a copy trimmed to the bytes actually
 * written, so that every buffer in the list is completely filled.
 *
 * @throws java.io.IOException if closing the disk cache stream fails
 */
protected synchronized void close() throws java.io.IOException {
    if (closed) {
        return;
    }
    closed = true; // Mark it as closed.

    if (cachediskstream != null) { // Close the disk cache.
        cachediskstream.close();
        cachediskstream = null;
    }

    if (memorybuflist == null) {
        return; // Nothing is held in memory.
    }

    if (currentMemoryBufSz > 0) {
        // Shrink the last buffer to the size of the data it really holds.
        byte[] trimmed = new byte[currentMemoryBufSz];
        System.arraycopy(currentMemoryBuf, 0, trimmed, 0, currentMemoryBufSz);
        int lastIndex = memorybuflist.size() - 1;
        memorybuflist.set(lastIndex, trimmed);
    }
    currentMemoryBuf = null; // No need for this anymore.
}
/**
 * Closes the disk cache stream, if one is still open, when this data
 * source is garbage collected.
 *
 * @throws Throwable if closing the disk cache stream fails
 */
protected void finalize() throws Throwable {
    if (cachediskstream == null) {
        return;
    }
    cachediskstream.close(); // close the disk cache.
    cachediskstream = null;
}
/**
 * Routine to flush data to disk if it is in memory.
 * <p>
 * Does nothing when the data is no longer held in memory or when a disk
 * cache stream is already open. Otherwise a temporary file is created (in
 * the directory named by the current message context's attachments
 * property, if any), exactly <code>totalsz</code> bytes are copied into it
 * from the memory buffers, and the memory list is released. If this data
 * source is already closed, the disk stream is closed as well once
 * everything has been written.
 * <p>
 * If a <code>SecurityException</code> prevents use of the disk, the data
 * stays in memory and <code>maxCached</code> is raised to
 * <code>Integer.MAX_VALUE</code> so that no further attempt is made.
 *
 * @throws java.io.IOException if the file cannot be created or written
 * @throws java.io.FileNotFoundException if the temporary file cannot be opened
 */
protected void flushToDisk()
        throws java.io.IOException, java.io.FileNotFoundException {
    java.util.LinkedList ml = memorybuflist;

    log.debug(Messages.getMessage("maxCached", "" + maxCached,
            "" + totalsz));

    if ((ml == null) || (cachediskstream != null)) {
        return; // Nothing in memory, or a disk cache already exists.
    }

    try {
        MessageContext mc = MessageContext.getCurrentContext();
        String attdir = (mc == null)
                ? null
                : mc.getStrProp(MessageContext.ATTACHMENTS_DIR);

        diskCacheFile = java.io.File.createTempFile("Axis", ".att",
                (attdir == null)
                        ? null
                        : new File(attdir));

        if (log.isDebugEnabled()) {
            log.debug(
                    Messages.getMessage(
                            "diskCache", diskCacheFile.getAbsolutePath()));
        }

        cachediskstream = new java.io.BufferedOutputStream(
                new java.io.FileOutputStream(diskCacheFile));

        // Everything written so far is still in memory, so totalsz is the
        // exact number of valid bytes in the buffers. Bounding each write by
        // what remains keeps the unused tail of the last (partially filled)
        // buffer out of the file.
        long remaining = totalsz;
        for (java.util.Iterator it = ml.iterator(); it.hasNext();) {
            byte[] rbuf = (byte[]) it.next();
            int bwrite = (int) Math.min((long) rbuf.length, remaining);

            cachediskstream.write(rbuf, 0, bwrite);
            remaining -= bwrite;
        }

        // Close only after all buffers are written; closing inside the loop
        // would leave no stream for the remaining buffers.
        if (closed) {
            cachediskstream.close();
            cachediskstream = null;
        }

        memorybuflist = null;
    } catch (java.lang.SecurityException se) {
        // No disk access allowed: keep everything in memory from now on.
        diskCacheFile = null;
        cachediskstream = null;
        maxCached = java.lang.Integer.MAX_VALUE;
        log.info(Messages.getMessage("nodisk00"), se);
    }
}
/**
 * Releases the resources behind this data source and marks it as deleted.
 * <p>
 * The memory buffers are dropped. If a disk cache file exists, the disk
 * cache stream and all registered reader streams are closed and the file is
 * deleted; if it cannot be deleted now, it is scheduled for deletion when
 * the virtual machine exits.
 *
 * @return <code>true</code> if the disk cache file was deleted;
 *         <code>false</code> if there was no disk cache file or it could
 *         not be deleted
 */
public synchronized boolean delete() {
    boolean ret = false;

    deleted = true;
    memorybuflist = null;

    if (diskCacheFile != null) {
        if (cachediskstream != null) {
            try {
                cachediskstream.close();
            } catch (Exception ignored) {
                // Best effort: the file is about to be removed anyway.
            }
            cachediskstream = null;
        }

        // Work on a snapshot, because closing a stream removes it from readers.
        Object[] array = readers.keySet().toArray();
        for (int i = 0; i < array.length; i++) {
            Instream stream = (Instream) array[i];
            if (null != stream) {
                try {
                    stream.close();
                } catch (Exception ignored) {
                    // Best effort: keep closing the remaining readers.
                }
            }
        }
        readers.clear();

        try {
            // File.delete() reports failure through its return value.
            ret = diskCacheFile.delete();
            if (!ret) {
                diskCacheFile.deleteOnExit();
            }
        } catch (Exception e) {
            // Give it our best shot.
            diskCacheFile.deleteOnExit();
        }
    }
    return ret;
}
// inner classes cannot have static declarations...
/** Logger used by the inner class <code>Instream</code>; declared here for the reason above. */
protected static Log is_log =
LogFactory.getLog(Instream.class.getName());
/**
* Inner class to handle getting an input stream to this data source
* Handles creating an input stream to the source.
*/
private class Instream extends java.io.InputStream {
/** Number of bytes read or skipped so far through this stream. */
protected long bread = 0;
/** The file stream over the disk cache file; opened on the first read from disk. */
java.io.FileInputStream fin = null;
/** The position in the memory buffer list that we are reading from. */
int currentIndex =
0;
/** The memory buffer we are currently reading from. */
byte[] currentBuf = null;
/** The current read position within <code>currentBuf</code>. */
int currentBufPos = 0;
/** The read stream has been closed. */
boolean readClosed = false;
/**
 * Creates a stream positioned at the start of the data and registers it
 * in the enclosing data source's <code>readers</code> map.
 *
 * @throws java.io.IOException if the data source has been deleted
 */
protected Instream() throws java.io.IOException {
if (deleted) {
throw new java.io.IOException(
Messages.getMessage("resourceDeleted"));
}
// Registered as a key only; the map value is unused.
readers.put(this, null);
}
/**
 * Reports how many bytes are left to read: the total size of the data
 * source minus the bytes already consumed by this stream, capped at
 * <code>Integer.MAX_VALUE</code>.
 *
 * @return the number of bytes left
 *
 * @throws java.io.IOException if the data source has been deleted or this
 *         stream has been closed
 */
public int available() throws java.io.IOException {
    if (deleted) {
        throw new java.io.IOException(
                Messages.getMessage("resourceDeleted"));
    }
    if (readClosed) {
        throw new java.io.IOException(
                Messages.getMessage("streamClosed"));
    }

    long remaining = totalsz - bread;
    int ret = (int) Math.min((long) Integer.MAX_VALUE, remaining);

    if (debugEnabled) {
        is_log.debug("available() = " + ret + ".");
    }
    return ret;
}
/**
 * Reads a single byte by delegating to the array form of read, while
 * holding the enclosing data source's lock.
 *
 * @return the byte read as a value from 0 to 255, or -1 if no more data.
 *
 * @throws java.io.IOException if the underlying array read fails
 */
public int read() throws java.io.IOException {
    synchronized (ManagedMemoryDataSource.this) {
        byte[] single = new byte[1];
        int count = read(single, 0, 1);
        return (count == -1) ? -1 : (single[0] & 0xFF);
    }
}
/**
 * Mark/reset is not supported by this stream.
 *
 * @return always <code>false</code>
 */
public boolean markSupported() {
if (debugEnabled) {
is_log.debug("markSupported() = " + false + ".");
}
return false;
}
/**
 * Not supported. Does nothing apart from optional debug logging.
 *
 * @param readlimit ignored
 */
public void mark(int readlimit) {
if (debugEnabled) {
is_log.debug("mark()");
}
}
/**
 * Not supported.
 *
 * @throws java.io.IOException always, because mark/reset is not supported
 */
public void reset() throws java.io.IOException {
if (debugEnabled) {
is_log.debug("reset()");
}
throw new java.io.IOException(Messages.getMessage("noResetMark"));
}
/**
 * Skips over bytes of this stream. No more bytes are skipped than the
 * data source currently holds beyond this stream's position.
 *
 * @param skipped the number of bytes the caller wants to skip
 *
 * @return the number of bytes counted as skipped; 0 if
 *         <code>skipped</code> is less than 1 or no data is left
 *
 * @throws java.io.IOException if the data source has been deleted, this
 *         stream has been closed, or skipping in the disk file fails
 */
public long skip(long skipped) throws java.io.IOException {
    if (debugEnabled) {
        is_log.debug("skip(" + skipped + ").");
    }
    if (deleted) {
        throw new java.io.IOException(
                Messages.getMessage("resourceDeleted"));
    }
    if (readClosed) {
        throw new java.io.IOException(
                Messages.getMessage("streamClosed"));
    }
    if (skipped < 1) {
        return 0; // nothing to skip.
    }

    synchronized (ManagedMemoryDataSource.this) {
        // Never move past the data that has been stored so far.
        long unread = totalsz - bread;
        skipped = Math.min(skipped, unread);
        if (skipped == 0) {
            return 0;
        }

        java.util.List buffers = memorybuflist; // hold the memory list.
        if (buffers != null) {
            if (currentBuf == null) { // pick up the buffer we are positioned in.
                currentBuf = (byte[]) buffers.get(currentIndex);
                currentBufPos = 0;
            }

            // Walk the buffer chain until the requested distance is covered.
            int advanced = 0;
            while (advanced < skipped) {
                long step = Math.min(currentBuf.length - currentBufPos,
                        skipped - advanced);
                advanced += step;
                currentBufPos += step;
                if (advanced < skipped) {
                    currentBuf = (byte[]) buffers.get(++currentIndex);
                    currentBufPos = 0;
                }
            }
        }

        if (fin != null) { // keep an already opened disk stream in step.
            fin.skip(skipped);
        }
        bread += skipped;
    }

    if (debugEnabled) {
        is_log.debug("skipped " + skipped + ".");
    }
    return skipped;
}
/**
 * Reads up to <code>len</code> bytes into <code>b</code> starting at
 * <code>off</code>. The request is first trimmed to the bytes the data
 * source still holds beyond this stream's position. Data is copied from the
 * memory buffers while they exist; otherwise it is read from the disk cache
 * file, which is opened on first use and positioned past the bytes already
 * consumed.
 *
 * @param b the destination buffer
 * @param off the offset in <code>b</code> at which to start storing bytes
 * @param len the maximum number of bytes to read
 *
 * @return the number of bytes read, 0 if <code>len</code> is 0, or -1 if
 *         all data has been consumed
 *
 * @throws java.io.IOException if the data source has been deleted, this
 *         stream has been closed, or reading the disk file fails
 * @throws InternalException if <code>b</code> is <code>null</code>
 * @throws IndexOutOfBoundsException if <code>off</code> or <code>len</code>
 *         is negative, or <code>off + len</code> exceeds <code>b.length</code>
 */
public int read(byte[] b, int off, int len) throws java.io.IOException {
if (debugEnabled) {
is_log.debug(this.hashCode() + " read(" + off + ", " + len
+ ")");
}
if (deleted) {
throw new java.io.IOException(
Messages.getMessage("resourceDeleted"));
}
if (readClosed) {
throw new java.io.IOException(
Messages.getMessage("streamClosed"));
}
if (b == null) {
throw new InternalException(Messages.getMessage("nullInput"));
}
if (off < 0) {
throw new IndexOutOfBoundsException(
Messages.getMessage("negOffset", "" + off));
}
if (len < 0) {
throw new IndexOutOfBoundsException(
Messages.getMessage("length", "" + len));
}
if (len + off > b.length) {
throw new IndexOutOfBoundsException(
Messages.getMessage("writeBeyond"));
}
if (len == 0) {
return 0;
}
int bwritten = 0;
// The position and the shared buffers are only touched under the data source's lock.
synchronized (ManagedMemoryDataSource.this) {
if (bread == totalsz) {
return -1; // Everything stored so far has been consumed.
}
java.util.List ml = memorybuflist;
long longlen = len;
longlen = Math.min(
longlen,
totalsz
- bread); // Only return the number of bytes in the data store that is left.
len = new Long(longlen).intValue();
if (debugEnabled) {
is_log.debug("len = " + len);
}
if (ml != null) {
if (null == currentBuf) { // Get the buffer we need to read from.
currentBuf = (byte[]) ml.get(currentIndex);
currentBufPos = 0; // New buffer start from the beginning.
}
do {
// The bytes to copy, the minimum of the bytes left in this buffer or bytes remaining.
int bcopy = Math.min(currentBuf.length - currentBufPos,
len - bwritten);
// Copy the data.
System.arraycopy(currentBuf, currentBufPos, b,
off + bwritten, bcopy);
bwritten += bcopy;
currentBufPos += bcopy;
if (bwritten < len) { // Get the next buffer.
currentBuf = (byte[]) ml.get(++currentIndex);
currentBufPos = 0;
}
} while (bwritten < len);
}
// Nothing came from memory: fall back to the disk cache file if there is one.
if ((bwritten == 0) && (null != diskCacheFile)) {
if (debugEnabled) {
is_log.debug(Messages.getMessage("reading", "" + len));
}
if (null == fin) { // we are now reading from disk.
if (debugEnabled) {
is_log.debug(
Messages.getMessage(
"openBread",
diskCacheFile.getCanonicalPath()));
}
if (debugEnabled) {
is_log.debug(Messages.getMessage("openBread",
"" + bread));
}
fin = new java.io.FileInputStream(diskCacheFile);
if (bread > 0) {
fin.skip(bread); // Skip what we've read so far.
}
}
// Push buffered writes to the file so that this read can see them.
if (cachediskstream != null) {
if (debugEnabled) {
is_log.debug(Messages.getMessage("flushing"));
}
cachediskstream.flush();
}
if (debugEnabled) {
is_log.debug(Messages.getMessage("flushing"));
is_log.debug("len=" + len);
is_log.debug("off=" + off);
is_log.debug("b.length=" + b.length);
}
bwritten = fin.read(b, off, len);
}
// Advance the position only when bytes were actually delivered.
if (bwritten > 0) {
bread += bwritten;
}
}
if (debugEnabled) {
is_log.debug(this.hashCode()
+ Messages.getMessage("read", "" + bwritten));
}
return bwritten;
}
/**
 * Closes this stream: removes it from the enclosing data source's
 * <code>readers</code> map and closes the disk file stream if one was
 * opened. Closing an already closed stream has no further effect.
 *
 * @throws java.io.IOException if closing the disk file stream fails
 */
public synchronized void close() throws java.io.IOException {
    if (debugEnabled) {
        is_log.debug("close()");
    }
    if (readClosed) {
        return;
    }

    readers.remove(this);
    readClosed = true;

    if (fin != null) {
        fin.close();
    }
    fin = null;
}
/**
 * Closes this stream when it is garbage collected, in case the caller
 * never closed it.
 *
 * @throws Throwable if closing fails
 */
protected void finalize() throws Throwable {
close();
}
} // endof innerclass Instream
// Used to test.
/**
 * Test driver: copies one file to another through a
 * <code>ManagedMemoryDataSource</code>. Any failure is logged rather than
 * thrown, and all streams are closed whether or not the copy succeeds.
 *
 * @param arg <code>arg[0]</code> is the file to read,
 *        <code>arg[1]</code> the file to write
 */
public static void main(String arg[]) { // test
    java.io.FileInputStream ss = null;
    java.io.InputStream is = null;
    java.io.FileOutputStream fo = null;

    try {
        String readFile = arg[0];
        String writeFile = arg[1];

        ss = new java.io.FileInputStream(readFile);
        ManagedMemoryDataSource ms =
                new ManagedMemoryDataSource(ss, 1024 * 1024, "foo/data", true);
        javax.activation.DataHandler dh =
                new javax.activation.DataHandler(ms);

        is = dh.getInputStream();
        fo = new java.io.FileOutputStream(writeFile);

        byte[] buf = new byte[512];
        int read = 0;
        do {
            read = is.read(buf);
            if (read > 0) {
                fo.write(buf, 0, read);
            }
        } while (read > -1);
    } catch (java.lang.Exception e) {
        log.error(Messages.getMessage("exception00"), e);
    } finally {
        // Release every stream, also when the copy failed part-way.
        if (fo != null) {
            try {
                fo.close();
            } catch (java.io.IOException e) {
                log.error(Messages.getMessage("exception00"), e);
            }
        }
        if (is != null) {
            try {
                is.close();
            } catch (java.io.IOException e) {
                log.error(Messages.getMessage("exception00"), e);
            }
        }
        if (ss != null) {
            try {
                ss.close();
            } catch (java.io.IOException e) {
                log.error(Messages.getMessage("exception00"), e);
            }
        }
    }
}
/**
 * Get the file the content is cached in, if it is cached to disk.
 * Unlike {@link #getName()}, this does not force a flush to disk.
 * @return file object pointing to file, or null for memory-stored content
 */
public File getDiskCacheFile() {
return diskCacheFile;
}
}