| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.datanode; |
| |
| import java.io.BufferedInputStream; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.net.SocketException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.FileChannel; |
| import java.util.Arrays; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.hadoop.fs.ChecksumException; |
| import org.apache.hadoop.hdfs.protocol.Block; |
| import org.apache.hadoop.hdfs.protocol.FSConstants; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.net.SocketOutputStream; |
| import org.apache.hadoop.util.DataChecksum; |
| import org.apache.hadoop.util.StringUtils; |
| |
| /** |
| * Reads a block from the disk and sends it to a recipient. |
| */ |
| class BlockSender implements java.io.Closeable, FSConstants { |
| public static final Log LOG = DataNode.LOG; |
| static final Log ClientTraceLog = DataNode.ClientTraceLog; |
| |
| private Block block; // the block to read from |
| private InputStream blockIn; // data stream |
| private long blockInPosition = -1; // updated while using transferTo(). |
| private DataInputStream checksumIn; // checksum datastream |
| private DataChecksum checksum; // checksum stream |
| private long offset; // starting position to read |
| private long endOffset; // ending position |
| private long blockLength; |
| private int bytesPerChecksum; // chunk size |
| private int checksumSize; // checksum size |
| private boolean corruptChecksumOk; // if need to verify checksum |
| private boolean chunkOffsetOK; // if need to send chunk offset |
| private long seqno; // sequence number of packet |
| |
| private boolean transferToAllowed = true; |
| private boolean blockReadFully; //set when the whole block is read |
| private boolean verifyChecksum; //if true, check is verified while reading |
| private BlockTransferThrottler throttler; |
| private final String clientTraceFmt; // format of client trace log message |
| |
| /** |
| * Minimum buffer used while sending data to clients. Used only if |
| * transferTo() is enabled. 64KB is not that large. It could be larger, but |
| * not sure if there will be much more improvement. |
| */ |
| private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024; |
| |
| |
| BlockSender(Block block, long startOffset, long length, |
| boolean corruptChecksumOk, boolean chunkOffsetOK, |
| boolean verifyChecksum, DataNode datanode) throws IOException { |
| this(block, startOffset, length, corruptChecksumOk, chunkOffsetOK, |
| verifyChecksum, datanode, null); |
| } |
| |
| BlockSender(Block block, long startOffset, long length, |
| boolean corruptChecksumOk, boolean chunkOffsetOK, |
| boolean verifyChecksum, DataNode datanode, String clientTraceFmt) |
| throws IOException { |
| try { |
| this.block = block; |
| this.chunkOffsetOK = chunkOffsetOK; |
| this.corruptChecksumOk = corruptChecksumOk; |
| this.verifyChecksum = verifyChecksum; |
| this.blockLength = datanode.data.getLength(block); |
| this.transferToAllowed = datanode.transferToAllowed; |
| this.clientTraceFmt = clientTraceFmt; |
| |
| if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) { |
| checksumIn = new DataInputStream( |
| new BufferedInputStream(datanode.data.getMetaDataInputStream(block), |
| BUFFER_SIZE)); |
| |
| // read and handle the common header here. For now just a version |
| BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn); |
| short version = header.getVersion(); |
| |
| if (version != FSDataset.METADATA_VERSION) { |
| LOG.warn("Wrong version (" + version + ") for metadata file for " |
| + block + " ignoring ..."); |
| } |
| checksum = header.getChecksum(); |
| } else { |
| LOG.warn("Could not find metadata file for " + block); |
| // This only decides the buffer size. Use BUFFER_SIZE? |
| checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL, |
| 16 * 1024); |
| } |
| |
| /* If bytesPerChecksum is very large, then the metadata file |
| * is mostly corrupted. For now just truncate bytesPerchecksum to |
| * blockLength. |
| */ |
| bytesPerChecksum = checksum.getBytesPerChecksum(); |
| if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > blockLength){ |
| checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(), |
| Math.max((int)blockLength, 10*1024*1024)); |
| bytesPerChecksum = checksum.getBytesPerChecksum(); |
| } |
| checksumSize = checksum.getChecksumSize(); |
| |
| if (length < 0) { |
| length = blockLength; |
| } |
| |
| endOffset = blockLength; |
| if (startOffset < 0 || startOffset > endOffset |
| || (length + startOffset) > endOffset) { |
| String msg = " Offset " + startOffset + " and length " + length |
| + " don't match block " + block + " ( blockLen " + endOffset + " )"; |
| LOG.warn(datanode.dnRegistration + ":sendBlock() : " + msg); |
| throw new IOException(msg); |
| } |
| |
| |
| offset = (startOffset - (startOffset % bytesPerChecksum)); |
| if (length >= 0) { |
| // Make sure endOffset points to end of a checksumed chunk. |
| long tmpLen = startOffset + length; |
| if (tmpLen % bytesPerChecksum != 0) { |
| tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum); |
| } |
| if (tmpLen < endOffset) { |
| endOffset = tmpLen; |
| } |
| } |
| |
| // seek to the right offsets |
| if (offset > 0) { |
| long checksumSkip = (offset / bytesPerChecksum) * checksumSize; |
| // note blockInStream is seeked when created below |
| if (checksumSkip > 0) { |
| // Should we use seek() for checksum file as well? |
| IOUtils.skipFully(checksumIn, checksumSkip); |
| } |
| } |
| seqno = 0; |
| |
| blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset |
| } catch (IOException ioe) { |
| IOUtils.closeStream(this); |
| IOUtils.closeStream(blockIn); |
| throw ioe; |
| } |
| } |
| |
| /** |
| * close opened files. |
| */ |
| public void close() throws IOException { |
| IOException ioe = null; |
| // close checksum file |
| if(checksumIn!=null) { |
| try { |
| checksumIn.close(); |
| } catch (IOException e) { |
| ioe = e; |
| } |
| checksumIn = null; |
| } |
| // close data file |
| if(blockIn!=null) { |
| try { |
| blockIn.close(); |
| } catch (IOException e) { |
| ioe = e; |
| } |
| blockIn = null; |
| } |
| // throw IOException if there is any |
| if(ioe!= null) { |
| throw ioe; |
| } |
| } |
| |
| /** |
| * Converts an IOExcpetion (not subclasses) to SocketException. |
| * This is typically done to indicate to upper layers that the error |
| * was a socket error rather than often more serious exceptions like |
| * disk errors. |
| */ |
| private static IOException ioeToSocketException(IOException ioe) { |
| if (ioe.getClass().equals(IOException.class)) { |
| // "se" could be a new class in stead of SocketException. |
| IOException se = new SocketException("Original Exception : " + ioe); |
| se.initCause(ioe); |
| /* Change the stacktrace so that original trace is not truncated |
| * when printed.*/ |
| se.setStackTrace(ioe.getStackTrace()); |
| return se; |
| } |
| // otherwise just return the same exception. |
| return ioe; |
| } |
| |
| /** |
| * Sends upto maxChunks chunks of data. |
| * |
| * When blockInPosition is >= 0, assumes 'out' is a |
| * {@link SocketOutputStream} and tries |
| * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to |
| * send data (and updates blockInPosition). |
| */ |
| private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) |
| throws IOException { |
| // Sends multiple chunks in one packet with a single write(). |
| |
| int len = Math.min((int) (endOffset - offset), |
| bytesPerChecksum*maxChunks); |
| if (len == 0) { |
| return 0; |
| } |
| |
| int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum; |
| int packetLen = len + numChunks*checksumSize + 4; |
| pkt.clear(); |
| |
| // write packet header |
| pkt.putInt(packetLen); |
| pkt.putLong(offset); |
| pkt.putLong(seqno); |
| pkt.put((byte)((offset + len >= endOffset) ? 1 : 0)); |
| //why no ByteBuf.putBoolean()? |
| pkt.putInt(len); |
| |
| int checksumOff = pkt.position(); |
| int checksumLen = numChunks * checksumSize; |
| byte[] buf = pkt.array(); |
| |
| if (checksumSize > 0 && checksumIn != null) { |
| try { |
| checksumIn.readFully(buf, checksumOff, checksumLen); |
| } catch (IOException e) { |
| LOG.warn(" Could not read or failed to veirfy checksum for data" + |
| " at offset " + offset + " for block " + block + " got : " |
| + StringUtils.stringifyException(e)); |
| IOUtils.closeStream(checksumIn); |
| checksumIn = null; |
| if (corruptChecksumOk) { |
| if (checksumOff < checksumLen) { |
| // Just fill the array with zeros. |
| Arrays.fill(buf, checksumOff, checksumLen, (byte) 0); |
| } |
| } else { |
| throw e; |
| } |
| } |
| } |
| |
| int dataOff = checksumOff + checksumLen; |
| |
| if (blockInPosition < 0) { |
| //normal transfer |
| IOUtils.readFully(blockIn, buf, dataOff, len); |
| |
| if (verifyChecksum) { |
| int dOff = dataOff; |
| int cOff = checksumOff; |
| int dLeft = len; |
| |
| for (int i=0; i<numChunks; i++) { |
| checksum.reset(); |
| int dLen = Math.min(dLeft, bytesPerChecksum); |
| checksum.update(buf, dOff, dLen); |
| if (!checksum.compare(buf, cOff)) { |
| throw new ChecksumException("Checksum failed at " + |
| (offset + len - dLeft), len); |
| } |
| dLeft -= dLen; |
| dOff += dLen; |
| cOff += checksumSize; |
| } |
| } |
| //writing is done below (mainly to handle IOException) |
| } |
| |
| try { |
| if (blockInPosition >= 0) { |
| //use transferTo(). Checks on out and blockIn are already done. |
| |
| SocketOutputStream sockOut = (SocketOutputStream)out; |
| //first write the packet |
| sockOut.write(buf, 0, dataOff); |
| // no need to flush. since we know out is not a buffered stream. |
| |
| sockOut.transferToFully(((FileInputStream)blockIn).getChannel(), |
| blockInPosition, len); |
| |
| blockInPosition += len; |
| } else { |
| // normal transfer |
| out.write(buf, 0, dataOff + len); |
| } |
| |
| } catch (IOException e) { |
| /* exception while writing to the client (well, with transferTo(), |
| * it could also be while reading from the local file). |
| */ |
| throw ioeToSocketException(e); |
| } |
| |
| if (throttler != null) { // rebalancing so throttle |
| throttler.throttle(packetLen); |
| } |
| |
| return len; |
| } |
| |
| /** |
| * sendBlock() is used to read block and its metadata and stream the data to |
| * either a client or to another datanode. |
| * |
| * @param out stream to which the block is written to |
| * @param baseStream optional. if non-null, <code>out</code> is assumed to |
| * be a wrapper over this stream. This enables optimizations for |
| * sending the data, e.g. |
| * {@link SocketOutputStream#transferToFully(FileChannel, |
| * long, int)}. |
| * @param throttler for sending data. |
| * @return total bytes reads, including crc. |
| */ |
| long sendBlock(DataOutputStream out, OutputStream baseStream, |
| BlockTransferThrottler throttler) throws IOException { |
| if( out == null ) { |
| throw new IOException( "out stream is null" ); |
| } |
| this.throttler = throttler; |
| |
| long initialOffset = offset; |
| long totalRead = 0; |
| OutputStream streamForSendChunks = out; |
| |
| final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; |
| try { |
| try { |
| checksum.writeHeader(out); |
| if ( chunkOffsetOK ) { |
| out.writeLong( offset ); |
| } |
| out.flush(); |
| } catch (IOException e) { //socket error |
| throw ioeToSocketException(e); |
| } |
| |
| int maxChunksPerPacket; |
| int pktSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER; |
| |
| if (transferToAllowed && !verifyChecksum && |
| baseStream instanceof SocketOutputStream && |
| blockIn instanceof FileInputStream) { |
| |
| FileChannel fileChannel = ((FileInputStream)blockIn).getChannel(); |
| |
| // blockInPosition also indicates sendChunks() uses transferTo. |
| blockInPosition = fileChannel.position(); |
| streamForSendChunks = baseStream; |
| |
| // assure a mininum buffer size. |
| maxChunksPerPacket = (Math.max(BUFFER_SIZE, |
| MIN_BUFFER_WITH_TRANSFERTO) |
| + bytesPerChecksum - 1)/bytesPerChecksum; |
| |
| // allocate smaller buffer while using transferTo(). |
| pktSize += checksumSize * maxChunksPerPacket; |
| } else { |
| maxChunksPerPacket = Math.max(1, |
| (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum); |
| pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket; |
| } |
| |
| ByteBuffer pktBuf = ByteBuffer.allocate(pktSize); |
| |
| while (endOffset > offset) { |
| long len = sendChunks(pktBuf, maxChunksPerPacket, |
| streamForSendChunks); |
| offset += len; |
| totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum* |
| checksumSize); |
| seqno++; |
| } |
| try { |
| out.writeInt(0); // mark the end of block |
| out.flush(); |
| } catch (IOException e) { //socket error |
| throw ioeToSocketException(e); |
| } |
| } finally { |
| if (clientTraceFmt != null) { |
| final long endTime = System.nanoTime(); |
| ClientTraceLog.info(String.format(clientTraceFmt, totalRead, initialOffset, endTime - startTime)); |
| } |
| close(); |
| } |
| |
| blockReadFully = (initialOffset == 0 && offset >= blockLength); |
| |
| return totalRead; |
| } |
| |
| boolean isBlockReadFully() { |
| return blockReadFully; |
| } |
| } |