blob: 05e3d48a469a2a9ce5dce2112654f9df258e5bbb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.io.file.tfile;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
* Several related classes to support chunk-encoded sub-streams on top of a
* regular stream.
*/
final class Chunk {
/**
* Prevent the instantiation of class.
*/
private Chunk() {
// nothing
}
/**
* Decoding a chain of chunks encoded through ChunkEncoder or
* SingleChunkEncoder.
*/
static public class ChunkDecoder extends InputStream {
private DataInputStream in = null;
private boolean lastChunk;
private int remain = 0;
private boolean closed;
public ChunkDecoder() {
lastChunk = true;
closed = true;
}
public void reset(DataInputStream downStream) {
// no need to wind forward the old input.
in = downStream;
lastChunk = false;
remain = 0;
closed = false;
}
/**
* Constructor
*
* @param in
* The source input stream which contains chunk-encoded data
* stream.
*/
public ChunkDecoder(DataInputStream in) {
this.in = in;
lastChunk = false;
closed = false;
}
/**
* Have we reached the last chunk.
*
* @return true if we have reached the last chunk.
* @throws java.io.IOException
*/
public boolean isLastChunk() throws IOException {
checkEOF();
return lastChunk;
}
/**
* How many bytes remain in the current chunk?
*
* @return remaining bytes left in the current chunk.
* @throws java.io.IOException
*/
public int getRemain() throws IOException {
checkEOF();
return remain;
}
/**
* Reading the length of next chunk.
*
* @throws java.io.IOException
* when no more data is available.
*/
private void readLength() throws IOException {
remain = Utils.readVInt(in);
if (remain >= 0) {
lastChunk = true;
} else {
remain = -remain;
}
}
/**
* Check whether we reach the end of the stream.
*
* @return false if the chunk encoded stream has more data to read (in which
* case available() will be greater than 0); true otherwise.
* @throws java.io.IOException
* on I/O errors.
*/
private boolean checkEOF() throws IOException {
if (isClosed()) return true;
while (true) {
if (remain > 0) return false;
if (lastChunk) return true;
readLength();
}
}
@Override
/*
* This method never blocks the caller. Returning 0 does not mean we reach
* the end of the stream.
*/
public int available() {
return remain;
}
@Override
public int read() throws IOException {
if (checkEOF()) return -1;
int ret = in.read();
if (ret < 0) throw new IOException("Corrupted chunk encoding stream");
--remain;
return ret;
}
@Override
public int read(byte[] b) throws IOException {
return read(b, 0, b.length);
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
}
if (!checkEOF()) {
int n = Math.min(remain, len);
int ret = in.read(b, off, n);
if (ret < 0) throw new IOException("Corrupted chunk encoding stream");
remain -= ret;
return ret;
}
return -1;
}
@Override
public long skip(long n) throws IOException {
if (!checkEOF()) {
long ret = in.skip(Math.min(remain, n));
remain -= ret;
return ret;
}
return 0;
}
@Override
public boolean markSupported() {
return false;
}
public boolean isClosed() {
return closed;
}
@Override
public void close() throws IOException {
if (closed == false) {
try {
while (!checkEOF()) {
skip(Integer.MAX_VALUE);
}
} finally {
closed = true;
}
}
}
}
/**
* Chunk Encoder. Encoding the output data into a chain of chunks in the
* following sequences: -len1, byte[len1], -len2, byte[len2], ... len_n,
* byte[len_n]. Where len1, len2, ..., len_n are the lengths of the data
* chunks. Non-terminal chunks have their lengths negated. Non-terminal chunks
* cannot have length 0. All lengths are in the range of 0 to
* Integer.MAX_VALUE and are encoded in Utils.VInt format.
*/
static public class ChunkEncoder extends OutputStream {
/**
* The data output stream it connects to.
*/
private DataOutputStream out;
/**
* The internal buffer that is only used when we do not know the advertised
* size.
*/
private byte buf[];
/**
* The number of valid bytes in the buffer. This value is always in the
* range <tt>0</tt> through <tt>buf.length</tt>; elements <tt>buf[0]</tt>
* through <tt>buf[count-1]</tt> contain valid byte data.
*/
private int count;
/**
* Constructor.
*
* @param out
* the underlying output stream.
* @param buf
* user-supplied buffer. The buffer would be used exclusively by
* the ChunkEncoder during its life cycle.
*/
public ChunkEncoder(DataOutputStream out, byte[] buf) {
this.out = out;
this.buf = buf;
this.count = 0;
}
/**
* Write out a chunk.
*
* @param chunk
* The chunk buffer.
* @param offset
* Offset to chunk buffer for the beginning of chunk.
* @param len
* @param last
* Is this the last call to flushBuffer?
*/
private void writeChunk(byte[] chunk, int offset, int len, boolean last)
throws IOException {
if (last) { // always write out the length for the last chunk.
Utils.writeVInt(out, len);
if (len > 0) {
out.write(chunk, offset, len);
}
} else {
if (len > 0) {
Utils.writeVInt(out, -len);
out.write(chunk, offset, len);
}
}
}
/**
* Write out a chunk that is a concatenation of the internal buffer plus
* user supplied data. This will never be the last block.
*
* @param data
* User supplied data buffer.
* @param offset
* Offset to user data buffer.
* @param len
* User data buffer size.
*/
private void writeBufData(byte[] data, int offset, int len)
throws IOException {
if (count + len > 0) {
Utils.writeVInt(out, -(count + len));
out.write(buf, 0, count);
count = 0;
out.write(data, offset, len);
}
}
/**
* Flush the internal buffer.
*
* Is this the last call to flushBuffer?
*
* @throws java.io.IOException
*/
private void flushBuffer() throws IOException {
if (count > 0) {
writeChunk(buf, 0, count, false);
count = 0;
}
}
@Override
public void write(int b) throws IOException {
if (count >= buf.length) {
flushBuffer();
}
buf[count++] = (byte) b;
}
@Override
public void write(byte b[]) throws IOException {
write(b, 0, b.length);
}
@Override
public void write(byte b[], int off, int len) throws IOException {
if ((len + count) >= buf.length) {
/*
* If the input data do not fit in buffer, flush the output buffer and
* then write the data directly. In this way buffered streams will
* cascade harmlessly.
*/
writeBufData(b, off, len);
return;
}
System.arraycopy(b, off, buf, count, len);
count += len;
}
@Override
public void flush() throws IOException {
flushBuffer();
out.flush();
}
@Override
public void close() throws IOException {
if (buf != null) {
try {
writeChunk(buf, 0, count, true);
} finally {
buf = null;
out = null;
}
}
}
}
/**
* Encode the whole stream as a single chunk. Expecting to know the size of
* the chunk up-front.
*/
static public class SingleChunkEncoder extends OutputStream {
/**
* The data output stream it connects to.
*/
private final DataOutputStream out;
/**
* The remaining bytes to be written.
*/
private int remain;
private boolean closed = false;
/**
* Constructor.
*
* @param out
* the underlying output stream.
* @param size
* The total # of bytes to be written as a single chunk.
* @throws java.io.IOException
* if an I/O error occurs.
*/
public SingleChunkEncoder(DataOutputStream out, int size)
throws IOException {
this.out = out;
this.remain = size;
Utils.writeVInt(out, size);
}
@Override
public void write(int b) throws IOException {
if (remain > 0) {
out.write(b);
--remain;
} else {
throw new IOException("Writing more bytes than advertised size.");
}
}
@Override
public void write(byte b[]) throws IOException {
write(b, 0, b.length);
}
@Override
public void write(byte b[], int off, int len) throws IOException {
if (remain >= len) {
out.write(b, off, len);
remain -= len;
} else {
throw new IOException("Writing more bytes than advertised size.");
}
}
@Override
public void flush() throws IOException {
out.flush();
}
@Override
public void close() throws IOException {
if (closed == true) {
return;
}
try {
if (remain > 0) {
throw new IOException("Writing less bytes than advertised size.");
}
} finally {
closed = true;
}
}
}
}