blob: d730671f539dad56746a456036abe62da037aed0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Checksum;
/**
* This is a generic output stream for generating checksums for
* data before it is written to the underlying stream
*/
abstract public class FSOutputSummer extends OutputStream {
// data checksum
private Checksum sum;
// internal buffer for storing data before it is checksumed
private byte buf[];
// internal buffer for storing checksum
private byte checksum[];
// The number of valid bytes in the buffer.
private int count;
protected FSOutputSummer(Checksum sum, int maxChunkSize, int checksumSize) {
this.sum = sum;
this.buf = new byte[maxChunkSize];
this.checksum = new byte[checksumSize];
this.count = 0;
}
/* write the data chunk in <code>b</code> staring at <code>offset</code> with
* a length of <code>len</code>, and its checksum
*/
protected abstract void writeChunk(byte[] b, int offset, int len, byte[] checksum)
throws IOException;
/** Write one byte */
public synchronized void write(int b) throws IOException {
sum.update(b);
buf[count++] = (byte)b;
if(count == buf.length) {
flushBuffer();
}
}
/**
* Writes <code>len</code> bytes from the specified byte array
* starting at offset <code>off</code> and generate a checksum for
* each data chunk.
*
* <p> This method stores bytes from the given array into this
* stream's buffer before it gets checksumed. The buffer gets checksumed
* and flushed to the underlying output stream when all data
* in a checksum chunk are in the buffer. If the buffer is empty and
* requested length is at least as large as the size of next checksum chunk
* size, this method will checksum and write the chunk directly
* to the underlying output stream. Thus it avoids uneccessary data copy.
*
* @param b the data.
* @param off the start offset in the data.
* @param len the number of bytes to write.
* @exception IOException if an I/O error occurs.
*/
public synchronized void write(byte b[], int off, int len)
throws IOException {
if (off < 0 || len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException();
}
for (int n=0;n<len;n+=write1(b, off+n, len-n)) {
}
}
/**
* Write a portion of an array, flushing to the underlying
* stream at most once if necessary.
*/
private int write1(byte b[], int off, int len) throws IOException {
if(count==0 && len>=buf.length) {
// local buffer is empty and user data has one chunk
// checksum and output data
final int length = buf.length;
sum.update(b, off, length);
writeChecksumChunk(b, off, length, false);
return length;
}
// copy user data to local buffer
int bytesToCopy = buf.length-count;
bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
sum.update(b, off, bytesToCopy);
System.arraycopy(b, off, buf, count, bytesToCopy);
count += bytesToCopy;
if (count == buf.length) {
// local buffer is full
flushBuffer();
}
return bytesToCopy;
}
/* Forces any buffered output bytes to be checksumed and written out to
* the underlying output stream.
*/
protected synchronized void flushBuffer() throws IOException {
flushBuffer(false);
}
/* Forces any buffered output bytes to be checksumed and written out to
* the underlying output stream. If keep is true, then the state of
* this object remains intact.
*/
protected synchronized void flushBuffer(boolean keep) throws IOException {
if (count != 0) {
int chunkLen = count;
count = 0;
writeChecksumChunk(buf, 0, chunkLen, keep);
if (keep) {
count = chunkLen;
}
}
}
/** Generate checksum for the data chunk and output data chunk & checksum
* to the underlying output stream. If keep is true then keep the
* current checksum intact, do not reset it.
*/
private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
throws IOException {
int tempChecksum = (int)sum.getValue();
if (!keep) {
sum.reset();
}
int2byte(tempChecksum, checksum);
writeChunk(b, off, len, checksum);
}
/**
* Converts a checksum integer value to a byte stream
*/
static public byte[] convertToByteStream(Checksum sum, int checksumSize) {
return int2byte((int)sum.getValue(), new byte[checksumSize]);
}
static byte[] int2byte(int integer, byte[] bytes) {
bytes[0] = (byte)((integer >>> 24) & 0xFF);
bytes[1] = (byte)((integer >>> 16) & 0xFF);
bytes[2] = (byte)((integer >>> 8) & 0xFF);
bytes[3] = (byte)((integer >>> 0) & 0xFF);
return bytes;
}
/**
* Resets existing buffer with a new one of the specified size.
*/
protected synchronized void resetChecksumChunk(int size) {
sum.reset();
this.buf = new byte[size];
this.count = 0;
}
}