| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.fs; |
| |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.util.zip.Checksum; |
| |
| /** |
| * This is a generic output stream for generating checksums for |
| * data before it is written to the underlying stream |
| */ |
| |
| abstract public class FSOutputSummer extends OutputStream { |
| // data checksum |
| private Checksum sum; |
| // internal buffer for storing data before it is checksumed |
| private byte buf[]; |
| // internal buffer for storing checksum |
| private byte checksum[]; |
| // The number of valid bytes in the buffer. |
| private int count; |
| |
| protected FSOutputSummer(Checksum sum, int maxChunkSize, int checksumSize) { |
| this.sum = sum; |
| this.buf = new byte[maxChunkSize]; |
| this.checksum = new byte[checksumSize]; |
| this.count = 0; |
| } |
| |
| /* write the data chunk in <code>b</code> staring at <code>offset</code> with |
| * a length of <code>len</code>, and its checksum |
| */ |
| protected abstract void writeChunk(byte[] b, int offset, int len, byte[] checksum) |
| throws IOException; |
| |
| /** Write one byte */ |
| public synchronized void write(int b) throws IOException { |
| sum.update(b); |
| buf[count++] = (byte)b; |
| if(count == buf.length) { |
| flushBuffer(); |
| } |
| } |
| |
| /** |
| * Writes <code>len</code> bytes from the specified byte array |
| * starting at offset <code>off</code> and generate a checksum for |
| * each data chunk. |
| * |
| * <p> This method stores bytes from the given array into this |
| * stream's buffer before it gets checksumed. The buffer gets checksumed |
| * and flushed to the underlying output stream when all data |
| * in a checksum chunk are in the buffer. If the buffer is empty and |
| * requested length is at least as large as the size of next checksum chunk |
| * size, this method will checksum and write the chunk directly |
| * to the underlying output stream. Thus it avoids uneccessary data copy. |
| * |
| * @param b the data. |
| * @param off the start offset in the data. |
| * @param len the number of bytes to write. |
| * @exception IOException if an I/O error occurs. |
| */ |
| public synchronized void write(byte b[], int off, int len) |
| throws IOException { |
| if (off < 0 || len < 0 || off > b.length - len) { |
| throw new ArrayIndexOutOfBoundsException(); |
| } |
| |
| for (int n=0;n<len;n+=write1(b, off+n, len-n)) { |
| } |
| } |
| |
| /** |
| * Write a portion of an array, flushing to the underlying |
| * stream at most once if necessary. |
| */ |
| private int write1(byte b[], int off, int len) throws IOException { |
| if(count==0 && len>=buf.length) { |
| // local buffer is empty and user data has one chunk |
| // checksum and output data |
| final int length = buf.length; |
| sum.update(b, off, length); |
| writeChecksumChunk(b, off, length, false); |
| return length; |
| } |
| |
| // copy user data to local buffer |
| int bytesToCopy = buf.length-count; |
| bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy; |
| sum.update(b, off, bytesToCopy); |
| System.arraycopy(b, off, buf, count, bytesToCopy); |
| count += bytesToCopy; |
| if (count == buf.length) { |
| // local buffer is full |
| flushBuffer(); |
| } |
| return bytesToCopy; |
| } |
| |
| /* Forces any buffered output bytes to be checksumed and written out to |
| * the underlying output stream. |
| */ |
| protected synchronized void flushBuffer() throws IOException { |
| flushBuffer(false); |
| } |
| |
| /* Forces any buffered output bytes to be checksumed and written out to |
| * the underlying output stream. If keep is true, then the state of |
| * this object remains intact. |
| */ |
| protected synchronized void flushBuffer(boolean keep) throws IOException { |
| if (count != 0) { |
| int chunkLen = count; |
| count = 0; |
| writeChecksumChunk(buf, 0, chunkLen, keep); |
| if (keep) { |
| count = chunkLen; |
| } |
| } |
| } |
| |
| /** Generate checksum for the data chunk and output data chunk & checksum |
| * to the underlying output stream. If keep is true then keep the |
| * current checksum intact, do not reset it. |
| */ |
| private void writeChecksumChunk(byte b[], int off, int len, boolean keep) |
| throws IOException { |
| int tempChecksum = (int)sum.getValue(); |
| if (!keep) { |
| sum.reset(); |
| } |
| int2byte(tempChecksum, checksum); |
| writeChunk(b, off, len, checksum); |
| } |
| |
| /** |
| * Converts a checksum integer value to a byte stream |
| */ |
| static public byte[] convertToByteStream(Checksum sum, int checksumSize) { |
| return int2byte((int)sum.getValue(), new byte[checksumSize]); |
| } |
| |
| static byte[] int2byte(int integer, byte[] bytes) { |
| bytes[0] = (byte)((integer >>> 24) & 0xFF); |
| bytes[1] = (byte)((integer >>> 16) & 0xFF); |
| bytes[2] = (byte)((integer >>> 8) & 0xFF); |
| bytes[3] = (byte)((integer >>> 0) & 0xFF); |
| return bytes; |
| } |
| |
| /** |
| * Resets existing buffer with a new one of the specified size. |
| */ |
| protected synchronized void resetChecksumChunk(int size) { |
| sum.reset(); |
| this.buf = new byte[size]; |
| this.count = 0; |
| } |
| } |