| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| /* Took the basic code from Axis 1.2 and modified to fit into the cloud code base */ |
| |
| package com.cloud.bridge.io; |
| |
| import java.io.FilterInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| |
| import org.apache.log4j.Logger; |
| |
| /** |
| * This class takes the input stream and turns it multiple streams. |
| DIME version 0 format |
| <pre> |
| 0 1 2 3 |
| 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ --- |
| | VERSION |B|E|C| TYPE_T| OPT_T | OPTIONS_LENGTH | A |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | ID_LENGTH | TYPE_LENGTH | Always present 12 bytes |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ even on chunked data. |
| | DATA_LENGTH | V |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ --- |
| | / |
| / OPTIONS + PADDING / |
| / (absent for version 0) | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | / |
| / ID + PADDING / |
| / | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | / |
| / TYPE + PADDING / |
| / | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | / |
| / DATA + PADDING / |
| / | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| </pre> |
| * This implementation of input stream does not support marking operations. |
| * |
| * Incoming data is DIME encoded when its MIME type is "application/dime". |
| * Then use this class to pull out 2 streams: |
| * (1) The first stream is the SOAP request, |
| * (2) The second stream is a chunked attachment (e.g., a file to store) |
| * |
| * The DIME format is defined at this reference: |
| * http://msdn.microsoft.com/en-us/library/aa480488.aspx |
| * |
| * @author Rick Rineholt |
| */ |
| public class DimeDelimitedInputStream extends FilterInputStream { |
| protected final static Logger logger = Logger.getLogger(DimeDelimitedInputStream.class); |
| |
| InputStream is = null; //The source input stream. |
| boolean closed = true; //The stream has been closed. |
| boolean theEnd = false; //There are no more streams left. |
| boolean moreChunks = false; //More chunks are a coming! |
| boolean MB = false; //Message begin flag |
| boolean ME = false; //Message end flag |
| String type = null; // |
| String id = null; // |
| String tnf = null; //DIME type format |
| long recordLength = 0L; //length of the current record. |
| long bytesRead = 0L; //How many bytes of the current record have been read. |
| int dataPadLength = 0; //How many pad bytes there are. |
| |
| protected int streamNo = 0; |
| protected IOException streamInError = null; |
| |
| private static byte[] trash = new byte[4]; |
| protected static int streamCount = 0; //number of streams produced. |
| |
| protected static synchronized int newStreamNo() { |
| logger.debug("streamNo " + (streamCount + 1)); |
| return ++streamCount; |
| } |
| |
| /** |
| * There can be multiple streams in a DIME encoding. For example, the first |
| * stream can be a SOAP message, and the second stream a binary attachment (e.g., |
| * a file). During reading after an EOF is returned, this function should be |
| * called to see if there is another stream following the last. |
| * |
| * @return the dime delimited stream, null if there are no more streams |
| * @throws IOException if there was an error loading the data for the next stream |
| */ |
| public synchronized DimeDelimitedInputStream getNextStream() throws IOException { |
| if (null != streamInError) |
| throw streamInError; |
| if (theEnd) |
| return null; |
| |
| //Each Stream must be read in succession |
| if (bytesRead < recordLength || moreChunks) |
| throw new RuntimeException("attach.dimeReadFullyError"); |
| |
| dataPadLength -= readPad(dataPadLength); |
| |
| //Create an new dime stream that comes after this one. |
| return new DimeDelimitedInputStream(this.is); |
| } |
| |
| /** |
| * Create a new dime stream. |
| * |
| * @param is the <code>InputStream</code> to wrap |
| * @throws IOException if anything goes wrong |
| */ |
| public DimeDelimitedInputStream(InputStream is) throws IOException { |
| super(null); |
| streamNo = newStreamNo(); |
| closed = false; |
| this.is = is; |
| readHeader(false); |
| } |
| |
| /** |
| * Make sure to skip the pad which appear in several parts of a DIME message. |
| * @param size |
| * @return |
| * @throws IOException |
| */ |
| private final int readPad(int size) throws IOException { |
| if (0 == size) |
| return 0; |
| int read = readFromStream(trash, 0, size); |
| |
| if (size != read) { |
| streamInError = new IOException("attach.dimeNotPaddedCorrectly"); |
| throw streamInError; |
| } |
| return read; |
| } |
| |
| private final int readFromStream(byte[] b) throws IOException { |
| return readFromStream(b, 0, b.length); |
| } |
| |
| private final int readFromStream(byte[] b, int start, int length) throws IOException { |
| int br = 0; |
| int brTotal = 0; |
| |
| if (length == 0) |
| return 0; |
| |
| do { |
| try { |
| br = is.read(b, brTotal + start, length - brTotal); |
| } catch (IOException e) { |
| streamInError = e; |
| throw e; |
| } |
| if (br > 0) |
| brTotal += br; |
| } while (br > -1 && brTotal < length); |
| |
| return br > -1 ? brTotal : br; |
| } |
| |
| /** |
| * Get the id for this stream part. |
| * @return the id; |
| */ |
| public String getContentId() { |
| return id; |
| } |
| |
| public String getDimeTypeNameFormat() { |
| return tnf; |
| } |
| |
| /** |
| * Get the type, as read from the header. |
| * @return the type of this dime |
| */ |
| public String getType() { |
| return type; |
| } |
| |
| /** |
| * Read from the DIME stream. |
| * |
| * @param b is the array to read into. |
| * @param off is the offset |
| * @return the number of bytes read. -1 if endof stream |
| * @throws IOException if data could not be read from the stream |
| */ |
| public synchronized int read(byte[] b, int off, int len) throws IOException { |
| if (closed) { |
| dataPadLength -= readPad(dataPadLength); |
| throw new IOException("streamClosed"); |
| } |
| return _read(b, off, len); |
| } |
| |
| protected int _read(byte[] b, int off, int len) throws IOException { |
| int totalbytesread = 0; |
| int bytes2read = 0; |
| |
| if (len < 0) |
| throw new IllegalArgumentException("attach.readLengthError" + len); |
| |
| if (off < 0) |
| throw new IllegalArgumentException("attach.readOffsetError" + off); |
| |
| if (b == null) |
| throw new IllegalArgumentException("attach.readArrayNullError"); |
| |
| if (b.length < off + len) |
| throw new IllegalArgumentException("attach.readArraySizeError " + b.length + " " + len + " " + off); |
| |
| if (null != streamInError) |
| throw streamInError; |
| if (0 == len) |
| return 0; //quick. |
| |
| // odd case no data to read -- give back 0 next time -1; |
| if (recordLength == 0 && bytesRead == 0 && !moreChunks) { |
| ++bytesRead; |
| if (ME) |
| finalClose(); |
| return 0; |
| } |
| if (bytesRead >= recordLength && !moreChunks) { |
| dataPadLength -= readPad(dataPadLength); |
| if (ME) |
| finalClose(); |
| return -1; |
| } |
| |
| do { |
| if (bytesRead >= recordLength && moreChunks) |
| readHeader(true); |
| bytes2read = (int)Math.min(recordLength - bytesRead, (long)len - totalbytesread); |
| |
| try { |
| bytes2read = is.read(b, off + totalbytesread, bytes2read); |
| } catch (IOException e) { |
| streamInError = e; |
| throw e; |
| } |
| |
| if (0 < bytes2read) { |
| totalbytesread += bytes2read; |
| bytesRead += bytes2read; |
| } |
| } while (bytes2read > -1 && totalbytesread < len && (bytesRead < recordLength || moreChunks)); |
| |
| if (0 > bytes2read) { |
| if (moreChunks) { |
| streamInError = new IOException("attach.DimeStreamError0"); |
| throw streamInError; |
| } |
| if (bytesRead < recordLength) { |
| streamInError = new IOException("attach.DimeStreamError1 " + (recordLength - bytesRead)); |
| throw streamInError; |
| } |
| if (!ME) { |
| streamInError = new IOException("attach.DimeStreamError0"); |
| throw streamInError; |
| } |
| //in theory the last chunk of data should also have been padded, but lets be tolerant of that. |
| dataPadLength = 0; |
| } else if (bytesRead >= recordLength) { |
| //get rid of pading. |
| try { |
| dataPadLength -= readPad(dataPadLength); |
| } catch (IOException e) { |
| //in theory the last chunk of data should also have been padded, but lets be tolerant of that. |
| if (!ME) |
| throw e; |
| else { |
| dataPadLength = 0; |
| streamInError = null; |
| } |
| } |
| } |
| |
| if (bytesRead >= recordLength && ME) |
| finalClose(); |
| |
| return totalbytesread >= 0 ? totalbytesread : -1; |
| } |
| |
| /** |
| * The DIME header is read into local class data fields and are not |
| * passed as part of the stream data. |
| * |
| * @param isChunk |
| * @throws IOException |
| */ |
| protected void readHeader(boolean isChunk) throws IOException { |
| bytesRead = 0; //How many bytes of the record have been read. |
| |
| if (isChunk) { |
| if (!moreChunks) |
| throw new RuntimeException("attach.DimeStreamError2"); |
| dataPadLength -= readPad(dataPadLength); //Just in case it was left over. |
| } |
| |
| byte[] header = new byte[12]; |
| |
| if (header.length != readFromStream(header)) { |
| streamInError = new IOException("attach.DimeStreamError3 " + header.length); |
| throw streamInError; |
| } |
| |
| //VERSION |
| byte version = (byte)((header[0] >>> 3) & 0x1f); |
| if (version > 1) { |
| streamInError = new IOException("attach.DimeStreamError4 " + version); |
| throw streamInError; |
| } |
| |
| //B, E, C |
| MB = 0 != (0x4 & header[0]); |
| ME = 0 != (0x2 & header[0]); |
| moreChunks = 0 != (0x1 & header[0]); |
| |
| //TYPE_T |
| if (!isChunk) { |
| switch (((header[1] >>> 4) & (byte)0x0f)) { |
| case 0x00: |
| tnf = "UNCHANGED"; |
| break; |
| case 0x01: |
| tnf = "MIME"; |
| break; |
| case 0x02: |
| tnf = "URI"; |
| break; |
| default: |
| tnf = "UNKNOWN"; |
| break; |
| } |
| } |
| |
| //OPTIONS_LENGTH |
| int oneButLastByte = (((int)header[2]) << 8) & 0xff00; |
| int lastByte = (int)header[3] & 0xff; |
| int optionsLength = oneButLastByte | lastByte; |
| |
| //ID_LENGTH |
| oneButLastByte = ((((int)header[4]) << 8) & 0xff00); |
| lastByte = ((int)header[5]) & 0xff; |
| int idLength = oneButLastByte | lastByte; |
| |
| //TYPE_LENGTH |
| oneButLastByte = ((((int)header[6]) << 8) & 0xff00); |
| lastByte = ((int)header[7]) & 0xff; |
| int typeLength = oneButLastByte | lastByte; |
| |
| //DATA_LENGTH |
| recordLength = |
| ((((long)header[8]) << 24) & 0xff000000L) | ((((long)header[9]) << 16) & 0xff0000L) | ((((long)header[10]) << 8) & 0xff00L) | ((long)header[11] & 0xffL); |
| |
| //OPTIONS + PADDING |
| if (0 != optionsLength) { |
| byte[] optBytes = new byte[optionsLength]; |
| |
| if (optionsLength != readFromStream(optBytes)) { |
| streamInError = new IOException("attach.DimeStreamError5 " + optionsLength); |
| throw streamInError; |
| } |
| optBytes = null; // throw it away, don't know anything about options. |
| |
| int pad = (int)((4L - (optionsLength & 0x3L)) & 0x03L); |
| |
| if (pad != readFromStream(header, 0, pad)) { |
| streamInError = new IOException("attach.DimeStreamError7"); |
| throw streamInError; |
| } |
| } |
| |
| // ID + PADDING |
| if (0 < idLength) { |
| byte[] idBytes = new byte[idLength]; |
| |
| if (idLength != readFromStream(idBytes)) { |
| streamInError = new IOException("attach.DimeStreamError8"); |
| throw streamInError; |
| } |
| if (idLength != 0 && !isChunk) |
| id = new String(idBytes); |
| |
| int pad = (int)((4L - (idLength & 0x3L)) & 0x03L); |
| |
| if (pad != readFromStream(header, 0, pad)) { |
| streamInError = new IOException("attach.DimeStreamError9"); |
| throw streamInError; |
| } |
| } |
| |
| //TYPE + PADDING |
| if (0 < typeLength) { |
| byte[] typeBytes = new byte[typeLength]; |
| |
| if (typeLength != readFromStream(typeBytes)) { |
| streamInError = new IOException("attach.DimeStreamError10"); |
| throw streamInError; |
| } |
| if (typeLength != 0 && !isChunk) |
| type = new String(typeBytes); |
| |
| int pad = (int)((4L - (typeLength & 0x3L)) & 0x03L); |
| |
| if (pad != readFromStream(header, 0, pad)) { |
| streamInError = new IOException("attach.DimeStreamError11"); |
| throw streamInError; |
| } |
| } |
| logger.debug("MB:" + MB + ", ME:" + ME + ", CF:" + moreChunks + "Option length:" + optionsLength + ", ID length:" + idLength + ", typeLength:" + typeLength + |
| ", TYPE_T:" + tnf); |
| logger.debug("id:\"" + id + "\""); |
| logger.debug("type:\"" + type + "\""); |
| logger.debug("recordlength:\"" + recordLength + "\""); |
| |
| dataPadLength = (int)((4L - (recordLength & 0x3L)) & 0x03L); |
| } |
| |
| /** |
| * Read from the delimited stream. |
| * |
| * @param b is the array to read into. Read as much as possible |
| * into the size of this array. |
| * @return the number of bytes read. -1 if endof stream |
| * @throws IOException if data could not be read from the stream |
| */ |
| public int read(byte[] b) throws IOException { |
| return read(b, 0, b.length); |
| } |
| |
| // fixme: this seems a bit inefficient |
| /** |
| * Read from the boundary delimited stream. |
| * |
| * @return the byte read, or -1 if endof stream |
| * @throws IOException if there was an error reading the data |
| */ |
| public int read() throws IOException { |
| byte[] b = new byte[1]; |
| int read = read(b, 0, 1); |
| |
| if (read < 0) |
| return -1; // fixme: should we also check for read != 1? |
| return (b[0] & 0xff); // convert byte value to a positive int |
| } |
| |
| /** |
| * Closes the stream. |
| * This will take care of flushing any remaining data to the stream. |
| * Multiple calls to this method will result in the stream being closed once |
| * and then all subsequent calls being ignored. |
| * |
| * @throws IOException if the stream could not be closed |
| */ |
| public void close() throws IOException { |
| synchronized (this) { |
| if (closed) |
| return; |
| closed = true; //mark it closed. |
| } |
| logger.debug("bStreamClosed " + streamNo); |
| |
| if (bytesRead < recordLength || moreChunks) { |
| //We need get this off the stream. Easy way to flush through the stream; |
| byte[] readrest = new byte[1024 * 16]; |
| int bread = 0; |
| |
| do { |
| bread = _read(readrest, 0, readrest.length); //should also close the original stream. |
| } while (bread > -1); |
| } |
| dataPadLength -= readPad(dataPadLength); |
| } |
| |
| /** |
| * Skip n bytes of data in the DIME stream, while reading and processing |
| * any headers in the current stream. |
| * |
| * @param n - number of data bytes to skip |
| * @return number of bytes actually skipped |
| * @throws IOException |
| */ |
| public long skip(long n) throws IOException { |
| long bytesSkipped = 0; |
| long bytes2Read = 0; |
| byte[] dumpbytes = new byte[1024]; |
| |
| while (n > 0) { |
| bytes2Read = (n > 1024 ? 1024 : n); |
| bytes2Read = _read(dumpbytes, 0, (int)bytes2Read); |
| |
| n -= bytes2Read; |
| bytesSkipped += bytes2Read; |
| } |
| |
| return bytesSkipped; |
| } |
| |
| /** |
| * Mark the stream. This is not supported. |
| */ |
| public void mark(int readlimit) { //do nothing |
| } |
| |
| public void reset() throws IOException { |
| streamInError = new IOException("attach.bounday.mns"); |
| throw streamInError; |
| } |
| |
| public boolean markSupported() { |
| return false; |
| } |
| |
| public synchronized int available() throws IOException { |
| if (null != streamInError) |
| throw streamInError; |
| |
| int chunkAvail = (int)Math.min((long)Integer.MAX_VALUE, recordLength - bytesRead); |
| int streamAvail = 0; |
| |
| try { |
| streamAvail = is.available(); |
| } catch (IOException e) { |
| streamInError = e; |
| throw e; |
| } |
| |
| if (chunkAvail == 0 && moreChunks && (12 + dataPadLength) <= streamAvail) { |
| dataPadLength -= readPad(dataPadLength); |
| readHeader(true); |
| return available(); |
| } |
| return Math.min(streamAvail, chunkAvail); |
| } |
| |
| protected void finalClose() throws IOException { |
| try { |
| theEnd = true; |
| if (null != is) |
| is.close(); |
| } finally { |
| is = null; |
| } |
| } |
| } |