blob: 8ce06a9375aa38c0605a4dbc83af3996e19cd94c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.DataChecksum;
import org.apache.htrace.core.TraceScope;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Checksum;
/**
 * A generic output stream that generates checksums for data before it is
 * written to the underlying stream.
 *
 * <p>Written bytes are accumulated in an internal buffer sized to hold
 * {@code BUFFER_NUM_CHUNKS} checksum chunks. When buffered data is flushed,
 * a checksum is computed for each chunk and every chunk is handed, together
 * with its checksum, to
 * {@link #writeChunk(byte[], int, int, byte[], int, int)}.
 */
@InterfaceAudience.LimitedPrivate({"HDFS"})
@InterfaceStability.Unstable
public abstract class FSOutputSummer extends OutputStream {
  // data checksum
  private final DataChecksum sum;
  // internal buffer for storing data before it is checksummed
  private byte[] buf;
  // internal buffer for storing checksums
  private byte[] checksum;
  // The number of valid bytes in the buffer.
  private int count;

  // We want this value to be a multiple of 3 because the native code checksums
  // 3 chunks simultaneously. The chosen value of 9 strikes a balance between
  // limiting the number of JNI calls and flushing to the underlying stream
  // relatively frequently.
  private static final int BUFFER_NUM_CHUNKS = 9;

  /**
   * Creates a summer whose data buffer holds {@code BUFFER_NUM_CHUNKS} chunks
   * of {@code sum.getBytesPerChecksum()} bytes each, plus a checksum buffer
   * with room for one checksum per chunk.
   *
   * @param sum the checksum used to compute per-chunk checksums.
   */
  protected FSOutputSummer(DataChecksum sum) {
    this.sum = sum;
    this.buf = new byte[sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS];
    this.checksum = new byte[getChecksumSize() * BUFFER_NUM_CHUNKS];
    this.count = 0;
  }

  /**
   * Writes the data chunk in <code>b</code> starting at <code>bOffset</code>
   * with a length of <code>bLen &gt; 0</code>, and its checksum.
   *
   * @param b the array holding the chunk data.
   * @param bOffset the offset of the chunk within <code>b</code>.
   * @param bLen the number of data bytes in the chunk.
   * @param checksum the array holding the checksum of the chunk.
   * @param checksumOffset the offset of the checksum within
   *        <code>checksum</code>.
   * @param checksumLen the number of checksum bytes.
   * @throws IOException if an I/O error occurs.
   */
  protected abstract void writeChunk(byte[] b, int bOffset, int bLen,
      byte[] checksum, int checksumOffset, int checksumLen) throws IOException;

  /**
   * Check if the implementing OutputStream is closed and should no longer
   * accept writes. Implementations should do nothing if this stream is not
   * closed, and should throw an {@link IOException} if it is closed.
   *
   * @throws IOException if this stream is already closed.
   */
  protected abstract void checkClosed() throws IOException;

  /**
   * Writes one byte into the internal buffer, flushing the buffer once it
   * becomes full.
   *
   * <p>Note that, unlike {@link #write(byte[], int, int)}, this method does
   * not call {@link #checkClosed()}.
   *
   * @param b the byte to write; only the low-order eight bits are stored.
   * @throws IOException if flushing the full buffer fails.
   */
  @Override
  public synchronized void write(int b) throws IOException {
    buf[count++] = (byte) b;
    if (count == buf.length) {
      flushBuffer();
    }
  }

  /**
   * Writes <code>len</code> bytes from the specified byte array
   * starting at offset <code>off</code> and generates a checksum for
   * each data chunk.
   *
   * <p> This method stores bytes from the given array into this
   * stream's buffer before they get checksummed. The buffer gets checksummed
   * and flushed to the underlying output stream when it is full. If the
   * buffer is empty and the remaining length is at least as large as the
   * internal buffer, this method checksums and writes a buffer-sized slice of
   * the caller's array directly to the underlying output stream, which avoids
   * an unnecessary data copy.
   *
   * @param b the data.
   * @param off the start offset in the data.
   * @param len the number of bytes to write.
   * @throws IOException if the stream is closed or an I/O error occurs.
   * @throws ArrayIndexOutOfBoundsException if <code>off</code> or
   *         <code>len</code> is negative, or if <code>off + len</code>
   *         exceeds <code>b.length</code>.
   */
  @Override
  public synchronized void write(byte[] b, int off, int len)
      throws IOException {
    checkClosed();
    // "off > b.length - len" rather than "off + len > b.length" so the check
    // cannot be defeated by int overflow.
    if (off < 0 || len < 0 || off > b.length - len) {
      throw new ArrayIndexOutOfBoundsException(
          "off=" + off + ", len=" + len + ", b.length=" + b.length);
    }
    int written = 0;
    while (written < len) {
      written += write1(b, off + written, len - written);
    }
  }

  /**
   * Writes a portion of an array, flushing to the underlying
   * stream at most once if necessary.
   *
   * @param b the data.
   * @param off the start offset in the data.
   * @param len the number of bytes still to be written.
   * @return the number of bytes consumed from <code>b</code>.
   * @throws IOException if an I/O error occurs.
   */
  private int write1(byte[] b, int off, int len) throws IOException {
    if (count == 0 && len >= buf.length) {
      // local buffer is empty and user buffer size >= local buffer size, so
      // simply checksum the user buffer and send it directly to the underlying
      // stream
      final int length = buf.length;
      writeChecksumChunks(b, off, length);
      return length;
    }

    // copy as much user data as fits into the local buffer
    final int bytesToCopy = Math.min(len, buf.length - count);
    System.arraycopy(b, off, buf, count, bytesToCopy);
    count += bytesToCopy;
    if (count == buf.length) {
      // local buffer is full
      flushBuffer();
    }
    return bytesToCopy;
  }

  /**
   * Forces any buffered output bytes to be checksummed and written out to
   * the underlying output stream, including a trailing partial chunk. The
   * buffer is left empty.
   *
   * @throws IOException if an I/O error occurs.
   */
  protected synchronized void flushBuffer() throws IOException {
    flushBuffer(false, true);
  }

  /**
   * Forces buffered output bytes to be checksummed and written out to
   * the underlying output stream. If there is a trailing partial chunk in the
   * buffer:
   * <ol>
   * <li><code>flushPartial</code> tells us whether to flush that chunk;</li>
   * <li>if <code>flushPartial</code> is true, <code>keep</code> tells us
   * whether to keep that chunk in the buffer (if <code>flushPartial</code> is
   * false, it is always kept in the buffer).</li>
   * </ol>
   *
   * @param keep whether a flushed trailing partial chunk stays in the buffer.
   * @param flushPartial whether a trailing partial chunk is flushed.
   * @return the number of bytes that were flushed but are still left in the
   *         buffer (can only be non-zero if <code>keep</code> is true).
   * @throws IOException if an I/O error occurs.
   */
  protected synchronized int flushBuffer(boolean keep,
      boolean flushPartial) throws IOException {
    int bufLen = count;
    int partialLen = bufLen % sum.getBytesPerChecksum();
    int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
    if (lenToFlush != 0) {
      writeChecksumChunks(buf, 0, lenToFlush);
      if (!flushPartial || keep) {
        // retain the trailing partial chunk by moving it to the buffer start
        count = partialLen;
        System.arraycopy(buf, bufLen - count, buf, 0, count);
      } else {
        count = 0;
      }
    }

    // total bytes left minus unflushed bytes left
    return count - (bufLen - lenToFlush);
  }

  /**
   * Checksums all complete data chunks and flushes them to the underlying
   * stream. If there is a trailing partial chunk, it is not flushed and is
   * maintained in the buffer.
   *
   * @throws IOException if an I/O error occurs.
   */
  @Override
  public void flush() throws IOException {
    flushBuffer(false, false);
  }

  /**
   * Return the number of valid bytes currently in the buffer.
   *
   * @return the number of buffered bytes.
   */
  protected synchronized int getBufferedDataSize() {
    return count;
  }

  /** @return the size for a checksum. */
  protected int getChecksumSize() {
    return sum.getChecksumSize();
  }

  /**
   * Creates the trace scope that is held open while chunks are handed to
   * {@link #writeChunk(byte[], int, int, byte[], int, int)}. This default
   * implementation returns <code>null</code>, in which case no scope is
   * closed afterwards.
   *
   * @return the trace scope to use, or <code>null</code> for none.
   */
  protected TraceScope createWriteTraceScope() {
    return null;
  }

  /**
   * Generates checksums for the given data chunks and outputs chunks and
   * checksums to the underlying output stream.
   *
   * @param b the array holding the data.
   * @param off the offset of the first data byte in <code>b</code>.
   * @param len the number of data bytes to checksum and write.
   * @throws IOException if an I/O error occurs.
   */
  private void writeChecksumChunks(byte[] b, int off, int len)
      throws IOException {
    sum.calculateChunkedSums(b, off, len, checksum, 0);
    TraceScope scope = createWriteTraceScope();
    try {
      final int bytesPerChecksum = sum.getBytesPerChecksum();
      final int checksumSize = getChecksumSize();
      for (int i = 0; i < len; i += bytesPerChecksum) {
        // the last chunk may be shorter than a full checksum chunk
        int chunkLen = Math.min(bytesPerChecksum, len - i);
        int ckOffset = i / bytesPerChecksum * checksumSize;
        writeChunk(b, off + i, chunkLen, checksum, ckOffset, checksumSize);
      }
    } finally {
      if (scope != null) {
        scope.close();
      }
    }
  }

  /**
   * Converts a checksum integer value to a byte stream.
   *
   * @param sum the checksum whose value is converted; only the low-order
   *        32 bits of the value are used.
   * @param checksumSize the length of the returned array.
   * @return a new array of <code>checksumSize</code> bytes; when non-empty,
   *         its first four bytes hold the value in big-endian order.
   * @throws ArrayIndexOutOfBoundsException if <code>checksumSize</code> is
   *         1, 2 or 3.
   */
  public static byte[] convertToByteStream(Checksum sum, int checksumSize) {
    return int2byte((int) sum.getValue(), new byte[checksumSize]);
  }

  /**
   * Stores <code>integer</code> in big-endian order into the first four bytes
   * of <code>bytes</code>. An empty array is returned untouched.
   *
   * @param integer the value to store.
   * @param bytes the destination array; either empty or at least 4 bytes.
   * @return <code>bytes</code>.
   * @throws ArrayIndexOutOfBoundsException if <code>bytes</code> is non-empty
   *         but shorter than four bytes; the array is not modified.
   */
  static byte[] int2byte(int integer, byte[] bytes) {
    if (bytes.length == 0) {
      return bytes;
    }
    // Fail before touching the array so a too-short destination is never left
    // partially overwritten.
    if (bytes.length < 4) {
      throw new ArrayIndexOutOfBoundsException(
          "Need room for 4 bytes but array length is " + bytes.length);
    }
    bytes[0] = (byte) (integer >>> 24);
    bytes[1] = (byte) (integer >>> 16);
    bytes[2] = (byte) (integer >>> 8);
    bytes[3] = (byte) integer;
    return bytes;
  }

  /**
   * Resets existing buffer with a new one of the specified size. Any data
   * currently buffered is discarded.
   *
   * @param size the size in bytes of the new data buffer.
   */
  protected synchronized void setChecksumBufSize(int size) {
    this.buf = new byte[size];
    this.checksum = new byte[sum.getChecksumSize(size)];
    this.count = 0;
  }

  /**
   * Resets the buffer to its default size of {@code BUFFER_NUM_CHUNKS}
   * checksum chunks. Any data currently buffered is discarded.
   */
  protected synchronized void resetChecksumBufSize() {
    setChecksumBufSize(sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS);
  }
}