| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.datanode; |
| |
| import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT; |
| |
| import java.io.BufferedOutputStream; |
| import java.io.Closeable; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.EOFException; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.nio.ByteBuffer; |
| import java.util.Arrays; |
| import java.util.LinkedList; |
| import java.util.zip.Checksum; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.hadoop.fs.FSOutputSummer; |
| import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage; |
| import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader; |
| import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.FSConstants; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlock; |
| import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; |
| import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; |
| import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; |
| import org.apache.hadoop.hdfs.util.DataTransferThrottler; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.util.Daemon; |
| import org.apache.hadoop.util.DataChecksum; |
| import org.apache.hadoop.util.PureJavaCrc32; |
| import org.apache.hadoop.util.StringUtils; |
| |
/** A class that receives a block and writes it to its own disk, and may
 * also copy it to another site. If a throttler is provided,
 * streaming throttling is also supported.
 **/
| class BlockReceiver implements Closeable, FSConstants { |
  public static final Log LOG = DataNode.LOG;
  static final Log ClientTraceLog = DataNode.ClientTraceLog;

  private DataInputStream in = null; // stream the packets are read from
  private DataChecksum checksum; // checksum codec used to verify/write chunks
  private OutputStream out = null; // to block file at local disk
  private OutputStream cout = null; // output stream for checksum file
  private DataOutputStream checksumOut = null; // buffered wrapper around cout
  private int bytesPerChecksum; // data bytes covered by one checksum value
  private int checksumSize; // size in bytes of one checksum value
  private ByteBuffer buf; // contains one full packet.
  private int bufRead; // amount of valid data in the buf
  private int maxPacketReadLen; // largest packet seen so far (read-size hint)
  protected final String inAddr; // address of the upstream sender
  protected final String myAddr; // local address
  private String mirrorAddr; // address of the downstream datanode, if any
  private DataOutputStream mirrorOut; // stream to the downstream datanode
  private Daemon responder = null; // thread running the PacketResponder
  private DataTransferThrottler throttler;
  private FSDataset.BlockWriteStreams streams; // raw data+meta output streams
  private DatanodeInfo srcDataNode = null; // source of a replication request
  private Checksum partialCrc = null; // CRC of a pre-existing partial chunk
  private final DataNode datanode;
  volatile private boolean mirrorError; // set when a downstream write failed

  /** The client name. It is empty if a datanode is the client */
  private final String clientname;
  private final boolean isClient; // write comes from a client
  private final boolean isDatanode; // write is replication/move (no client)

  /** the block to receive */
  private final ExtendedBlock block;
  /** the replica to write */
  private final ReplicaInPipelineInterface replicaInfo;
  /** pipeline stage */
  private final BlockConstructionStage stage;
  private final boolean isTransfer; // TRANSFER_RBW or TRANSFER_FINALIZED
| |
  /**
   * Construct a receiver for one block: pick/create the replica that will
   * hold the bytes (dispatching on the pipeline stage), read the checksum
   * header from the upstream stream, and open local data/meta streams.
   * On failure, this receiver and the partial replica are cleaned up and
   * a possible disk error is checked before the exception is rethrown.
   *
   * @param clientname empty when a datanode (replication/move) is the writer
   * @param newGs new generation stamp, used by recovery/append stages
   * @param minBytesRcvd minimum replica length accepted in recovery
   * @param maxBytesRcvd maximum replica length accepted in recovery
   * @throws IOException if the replica cannot be created/recovered or the
   *         local streams cannot be opened
   */
  BlockReceiver(final ExtendedBlock block, final DataInputStream in,
      final String inAddr, final String myAddr,
      final BlockConstructionStage stage,
      final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
      final String clientname, final DatanodeInfo srcDataNode,
      final DataNode datanode) throws IOException {
    try{
      this.block = block;
      this.in = in;
      this.inAddr = inAddr;
      this.myAddr = myAddr;
      this.srcDataNode = srcDataNode;
      this.datanode = datanode;

      this.clientname = clientname;
      this.isDatanode = clientname.length() == 0;
      this.isClient = !this.isDatanode;

      //for datanode, we have
      //1: clientName.length() == 0, and
      //2: stage == null or PIPELINE_SETUP_CREATE
      this.stage = stage;
      this.isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
          || stage == BlockConstructionStage.TRANSFER_FINALIZED;

      if (LOG.isDebugEnabled()) {
        LOG.debug(getClass().getSimpleName() + ": " + block
            + "\n isClient =" + isClient + ", clientname=" + clientname
            + "\n isDatanode=" + isDatanode + ", srcDataNode=" + srcDataNode
            + "\n inAddr=" + inAddr + ", myAddr=" + myAddr
            );
      }

      //
      // Open local disk out
      //
      if (isDatanode) { //replication or move
        replicaInfo = datanode.data.createTemporary(block);
      } else {
        switch (stage) {
        case PIPELINE_SETUP_CREATE:
          replicaInfo = datanode.data.createRbw(block);
          break;
        case PIPELINE_SETUP_STREAMING_RECOVERY:
          replicaInfo = datanode.data.recoverRbw(
              block, newGs, minBytesRcvd, maxBytesRcvd);
          block.setGenerationStamp(newGs);
          break;
        case PIPELINE_SETUP_APPEND:
          replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
          if (datanode.blockScanner != null) { // remove from block scanner
            datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
                block.getLocalBlock());
          }
          block.setGenerationStamp(newGs);
          break;
        case PIPELINE_SETUP_APPEND_RECOVERY:
          replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
          if (datanode.blockScanner != null) { // remove from block scanner
            datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
                block.getLocalBlock());
          }
          block.setGenerationStamp(newGs);
          break;
        case TRANSFER_RBW:
        case TRANSFER_FINALIZED:
          // this is a transfer destination
          replicaInfo = datanode.data.createTemporary(block);
          break;
        default: throw new IOException("Unsupported stage " + stage +
              " while receiving block " + block + " from " + inAddr);
        }
      }
      // read checksum meta information
      this.checksum = DataChecksum.newDataChecksum(in);
      this.bytesPerChecksum = checksum.getBytesPerChecksum();
      this.checksumSize = checksum.getChecksumSize();

      // a new meta-file header is only written for freshly created replicas
      final boolean isCreate = isDatanode || isTransfer
          || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
      streams = replicaInfo.createStreams(isCreate,
          this.bytesPerChecksum, this.checksumSize);
      if (streams != null) {
        this.out = streams.dataOut;
        this.cout = streams.checksumOut;
        this.checksumOut = new DataOutputStream(new BufferedOutputStream(
            streams.checksumOut,
            SMALL_BUFFER_SIZE));
        // write data chunk header if creating a new replica
        if (isCreate) {
          BlockMetadataHeader.writeHeader(checksumOut, checksum);
        }
      }
    } catch (ReplicaAlreadyExistsException bae) {
      throw bae;
    } catch (ReplicaNotFoundException bne) {
      throw bne;
    } catch(IOException ioe) {
      IOUtils.closeStream(this);
      cleanupBlock();

      // check if there is a disk error
      IOException cause = FSDataset.getCauseIfDiskError(ioe);
      DataNode.LOG.warn("IOException in BlockReceiver constructor. Cause is ",
          cause);

      if (cause != null) { // possible disk error
        ioe = cause;
        datanode.checkDiskError(ioe); // may throw an exception here
      }

      throw ioe;
    }
  }
| |
| /** Return the datanode object. */ |
| DataNode getDataNode() {return datanode;} |
| |
  /**
   * Close the block-data and checksum files, optionally forcing their
   * contents to the device (datanode.syncOnClose). Both streams are
   * always attempted; the last failure is passed to the disk checker
   * and rethrown.
   */
  public void close() throws IOException {

    IOException ioe = null;
    // close checksum file
    try {
      if (checksumOut != null) {
        checksumOut.flush();
        // cout is the raw file stream under checksumOut; force its channel
        if (datanode.syncOnClose && (cout instanceof FileOutputStream)) {
          ((FileOutputStream)cout).getChannel().force(true);
        }
        checksumOut.close();
        checksumOut = null; // prevent double close
      }
    } catch(IOException e) {
      ioe = e;
    }
    // close block file
    try {
      if (out != null) {
        out.flush();
        if (datanode.syncOnClose && (out instanceof FileOutputStream)) {
          ((FileOutputStream)out).getChannel().force(true);
        }
        out.close();
        out = null; // prevent double close
      }
    } catch (IOException e) {
      ioe = e;
    }
    // disk check
    if(ioe != null) {
      datanode.checkDiskError(ioe);
      throw ioe;
    }
  }
| |
| /** |
| * Flush block data and metadata files to disk. |
| * @throws IOException |
| */ |
| void flush() throws IOException { |
| if (checksumOut != null) { |
| checksumOut.flush(); |
| } |
| if (out != null) { |
| out.flush(); |
| } |
| } |
| |
| /** |
| * While writing to mirrorOut, failure to write to mirror should not |
| * affect this datanode unless it is caused by interruption. |
| */ |
| private void handleMirrorOutError(IOException ioe) throws IOException { |
| String bpid = block.getBlockPoolId(); |
| LOG.info(datanode.getDNRegistrationForBP(bpid) + ":Exception writing block " + |
| block + " to mirror " + mirrorAddr + "\n" + |
| StringUtils.stringifyException(ioe)); |
| if (Thread.interrupted()) { // shut down if the thread is interrupted |
| throw ioe; |
| } else { // encounter an error while writing to mirror |
| // continue to run even if can not write to mirror |
| // notify client of the error |
| // and wait for the client to shut down the pipeline |
| mirrorError = true; |
| } |
| } |
| |
| /** |
| * Verify multiple CRC chunks. |
| */ |
| private void verifyChunks( byte[] dataBuf, int dataOff, int len, |
| byte[] checksumBuf, int checksumOff ) |
| throws IOException { |
| DatanodeProtocol nn = datanode.getBPNamenode(block.getBlockPoolId()); |
| while (len > 0) { |
| int chunkLen = Math.min(len, bytesPerChecksum); |
| |
| checksum.update(dataBuf, dataOff, chunkLen); |
| |
| if (!checksum.compare(checksumBuf, checksumOff)) { |
| if (srcDataNode != null) { |
| try { |
| LOG.info("report corrupt block " + block + " from datanode " + |
| srcDataNode + " to namenode"); |
| LocatedBlock lb = new LocatedBlock(block, |
| new DatanodeInfo[] {srcDataNode}); |
| nn.reportBadBlocks(new LocatedBlock[] {lb}); |
| } catch (IOException e) { |
| LOG.warn("Failed to report bad block " + block + |
| " from datanode " + srcDataNode + " to namenode"); |
| } |
| } |
| throw new IOException("Unexpected checksum mismatch " + |
| "while writing " + block + " from " + inAddr); |
| } |
| |
| checksum.reset(); |
| dataOff += chunkLen; |
| checksumOff += checksumSize; |
| len -= chunkLen; |
| } |
| } |
| |
| /** |
| * Makes sure buf.position() is zero without modifying buf.remaining(). |
| * It moves the data if position needs to be changed. |
| */ |
| private void shiftBufData() { |
| if (bufRead != buf.limit()) { |
| throw new IllegalStateException("bufRead should be same as " + |
| "buf.limit()"); |
| } |
| |
| //shift the remaining data on buf to the front |
| if (buf.position() > 0) { |
| int dataLeft = buf.remaining(); |
| if (dataLeft > 0) { |
| byte[] b = buf.array(); |
| System.arraycopy(b, buf.position(), b, 0, dataLeft); |
| } |
| buf.position(0); |
| bufRead = dataLeft; |
| buf.limit(bufRead); |
| } |
| } |
| |
| /** |
| * reads upto toRead byte to buf at buf.limit() and increments the limit. |
| * throws an IOException if read does not succeed. |
| */ |
| private int readToBuf(int toRead) throws IOException { |
| if (toRead < 0) { |
| toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity()) |
| - buf.limit(); |
| } |
| |
| int nRead = in.read(buf.array(), buf.limit(), toRead); |
| |
| if (nRead < 0) { |
| throw new EOFException("while trying to read " + toRead + " bytes"); |
| } |
| bufRead = buf.limit() + nRead; |
| buf.limit(bufRead); |
| return nRead; |
| } |
| |
| |
  /**
   * Reads (at least) one packet and returns the packet length.
   * buf.position() points to the start of the packet and
   * buf.limit() point to the end of the packet. There could
   * be more data from next packet in buf.<br><br>
   *
   * It tries to read a full packet with single read call.
   * Consecutive packets are usually of the same length.
   */
  private void readNextPacket() throws IOException {
    /* This dances around buf a little bit, mainly to read
     * full packet with single read and to accept arbitrary size
     * for next packet at the same time.
     */
    if (buf == null) {
      /* initialize buffer to the best guess size:
       * 'chunksPerPacket' calculation here should match the same
       * calculation in DFSClient to make the guess accurate.
       */
      int chunkSize = bytesPerChecksum + checksumSize;
      int chunksPerPacket = (datanode.writePacketSize - PacketHeader.PKT_HEADER_LEN
          + chunkSize - 1)/chunkSize;
      buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN +
          Math.max(chunksPerPacket, 1) * chunkSize);
      buf.limit(0);
    }

    // See if there is data left in the buffer :
    if (bufRead > buf.limit()) {
      buf.limit(bufRead);
    }

    // keep reading until at least the 4-byte payload length is buffered
    while (buf.remaining() < SIZE_OF_INTEGER) {
      if (buf.position() > 0) {
        shiftBufData();
      }
      readToBuf(-1);
    }

    /* We mostly have the full packet or at least enough for an int
     */
    buf.mark();
    int payloadLen = buf.getInt();
    buf.reset();

    // check corrupt values for pktLen, 100MB upper limit should be ok?
    if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
      throw new IOException("Incorrect value for packet payload : " +
                            payloadLen);
    }

    // Subtract SIZE_OF_INTEGER since that accounts for the payloadLen that
    // we read above.
    int pktSize = payloadLen + PacketHeader.PKT_HEADER_LEN - SIZE_OF_INTEGER;

    if (buf.remaining() < pktSize) {
      //we need to read more data
      int toRead = pktSize - buf.remaining();

      // first make sure buf has enough space.
      int spaceLeft = buf.capacity() - buf.limit();
      if (toRead > spaceLeft && buf.position() > 0) {
        shiftBufData();
        spaceLeft = buf.capacity() - buf.limit();
      }
      if (toRead > spaceLeft) {
        // packet larger than the buffer: grow it, preserving bytes read
        byte oldBuf[] = buf.array();
        int toCopy = buf.limit();
        buf = ByteBuffer.allocate(toCopy + toRead);
        System.arraycopy(oldBuf, 0, buf.array(), 0, toCopy);
        buf.limit(toCopy);
      }

      //now read:
      while (toRead > 0) {
        toRead -= readToBuf(toRead);
      }
    }

    if (buf.remaining() > pktSize) {
      // trim the view to exactly one packet; extra bytes belong to the next
      buf.limit(buf.position() + pktSize);
    }

    // remember the largest packet seen so far as the next read-size guess
    if (pktSize > maxPacketReadLen) {
      maxPacketReadLen = pktSize;
    }
  }
| |
| /** |
| * Receives and processes a packet. It can contain many chunks. |
| * returns the number of data bytes that the packet has. |
| */ |
| private int receivePacket() throws IOException { |
| // read the next packet |
| readNextPacket(); |
| |
| buf.mark(); |
| PacketHeader header = new PacketHeader(); |
| header.readFields(buf); |
| int endOfHeader = buf.position(); |
| buf.reset(); |
| |
| // Sanity check the header |
| if (header.getOffsetInBlock() > replicaInfo.getNumBytes()) { |
| throw new IOException("Received an out-of-sequence packet for " + block + |
| "from " + inAddr + " at offset " + header.getOffsetInBlock() + |
| ". Expecting packet starting at " + replicaInfo.getNumBytes()); |
| } |
| if (header.getDataLen() < 0) { |
| throw new IOException("Got wrong length during writeBlock(" + block + |
| ") from " + inAddr + " at offset " + |
| header.getOffsetInBlock() + ": " + |
| header.getDataLen()); |
| } |
| |
| return receivePacket( |
| header.getOffsetInBlock(), |
| header.getSeqno(), |
| header.isLastPacketInBlock(), |
| header.getDataLen(), endOfHeader); |
| } |
| |
  /**
   * Write the received packet to disk (data only).
   * @param pktBuf the packet's backing array
   * @param startByteToDisk offset in pktBuf of the first byte to persist
   * @param numBytesToDisk number of data bytes to persist
   */
  private void writePacketToDisk(byte[] pktBuf, int startByteToDisk,
      int numBytesToDisk) throws IOException {
    out.write(pktBuf, startByteToDisk, numBytesToDisk);
  }
| |
| /** |
| * Receives and processes a packet. It can contain many chunks. |
| * returns the number of data bytes that the packet has. |
| */ |
| private int receivePacket(long offsetInBlock, long seqno, |
| boolean lastPacketInBlock, int len, int endOfHeader) throws IOException { |
| if (LOG.isDebugEnabled()){ |
| LOG.debug("Receiving one packet for block " + block + |
| " of length " + len + |
| " seqno " + seqno + |
| " offsetInBlock " + offsetInBlock + |
| " lastPacketInBlock " + lastPacketInBlock); |
| } |
| |
| // update received bytes |
| long firstByteInBlock = offsetInBlock; |
| offsetInBlock += len; |
| if (replicaInfo.getNumBytes() < offsetInBlock) { |
| replicaInfo.setNumBytes(offsetInBlock); |
| } |
| |
| // put in queue for pending acks |
| if (responder != null) { |
| ((PacketResponder)responder.getRunnable()).enqueue(seqno, |
| lastPacketInBlock, offsetInBlock); |
| } |
| |
| //First write the packet to the mirror: |
| if (mirrorOut != null && !mirrorError) { |
| try { |
| mirrorOut.write(buf.array(), buf.position(), buf.remaining()); |
| mirrorOut.flush(); |
| } catch (IOException e) { |
| handleMirrorOutError(e); |
| } |
| } |
| |
| buf.position(endOfHeader); |
| |
| if (lastPacketInBlock || len == 0) { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("Receiving an empty packet or the end of the block " + block); |
| } |
| } else { |
| int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)* |
| checksumSize; |
| |
| if ( buf.remaining() != (checksumLen + len)) { |
| throw new IOException("Data remaining in packet does not match" + |
| "sum of checksumLen and dataLen " + |
| " size remaining: " + buf.remaining() + |
| " data len: " + len + |
| " checksum Len: " + checksumLen); |
| } |
| int checksumOff = buf.position(); |
| int dataOff = checksumOff + checksumLen; |
| byte pktBuf[] = buf.array(); |
| |
| buf.position(buf.limit()); // move to the end of the data. |
| |
| /* skip verifying checksum iff this is not the last one in the |
| * pipeline and clientName is non-null. i.e. Checksum is verified |
| * on all the datanodes when the data is being written by a |
| * datanode rather than a client. Whe client is writing the data, |
| * protocol includes acks and only the last datanode needs to verify |
| * checksum. |
| */ |
| if (mirrorOut == null || isDatanode) { |
| verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff); |
| } |
| |
| byte[] lastChunkChecksum; |
| |
| try { |
| long onDiskLen = replicaInfo.getBytesOnDisk(); |
| if (onDiskLen<offsetInBlock) { |
| //finally write to the disk : |
| |
| if (onDiskLen % bytesPerChecksum != 0) { |
| // prepare to overwrite last checksum |
| adjustCrcFilePosition(); |
| } |
| |
| // If this is a partial chunk, then read in pre-existing checksum |
| if (firstByteInBlock % bytesPerChecksum != 0) { |
| LOG.info("Packet starts at " + firstByteInBlock + |
| " for block " + block + |
| " which is not a multiple of bytesPerChecksum " + |
| bytesPerChecksum); |
| long offsetInChecksum = BlockMetadataHeader.getHeaderSize() + |
| onDiskLen / bytesPerChecksum * checksumSize; |
| computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum); |
| } |
| |
| int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock); |
| int numBytesToDisk = (int)(offsetInBlock-onDiskLen); |
| writePacketToDisk(pktBuf, startByteToDisk, numBytesToDisk); |
| |
| // If this is a partial chunk, then verify that this is the only |
| // chunk in the packet. Calculate new crc for this chunk. |
| if (partialCrc != null) { |
| if (len > bytesPerChecksum) { |
| throw new IOException("Got wrong length during writeBlock(" + |
| block + ") from " + inAddr + " " + |
| "A packet can have only one partial chunk."+ |
| " len = " + len + |
| " bytesPerChecksum " + bytesPerChecksum); |
| } |
| partialCrc.update(pktBuf, startByteToDisk, numBytesToDisk); |
| byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize); |
| lastChunkChecksum = Arrays.copyOfRange( |
| buf, buf.length - checksumSize, buf.length |
| ); |
| checksumOut.write(buf); |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("Writing out partial crc for data len " + len); |
| } |
| partialCrc = null; |
| } else { |
| lastChunkChecksum = Arrays.copyOfRange( |
| pktBuf, |
| checksumOff + checksumLen - checksumSize, |
| checksumOff + checksumLen |
| ); |
| checksumOut.write(pktBuf, checksumOff, checksumLen); |
| } |
| /// flush entire packet |
| flush(); |
| |
| replicaInfo.setLastChecksumAndDataLen( |
| offsetInBlock, lastChunkChecksum |
| ); |
| |
| datanode.metrics.incrBytesWritten(len); |
| } |
| } catch (IOException iex) { |
| datanode.checkDiskError(iex); |
| throw iex; |
| } |
| } |
| |
| if (throttler != null) { // throttle I/O |
| throttler.throttle(len); |
| } |
| |
| return lastPacketInBlock?-1:len; |
| } |
| |
  /**
   * Write this receiver's checksum header (type and bytes-per-checksum)
   * to the given downstream mirror stream.
   */
  void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
    checksum.writeHeader(mirrorOut);
  }
| |
| |
  /**
   * Receive the whole block: for client writes, start a PacketResponder
   * thread; loop over receivePacket() until the last packet; then, for
   * datanode/transfer writes, finalize (or convert) the replica here.
   * On abnormal termination the partial replica is cleaned up and the
   * responder is interrupted; the responder is always joined on exit.
   */
  void receiveBlock(
      DataOutputStream mirrOut, // output to next datanode
      DataInputStream mirrIn,   // input from next datanode
      DataOutputStream replyOut,  // output to previous datanode
      String mirrAddr, DataTransferThrottler throttlerArg,
      DatanodeInfo[] downstreams) throws IOException {

      boolean responderClosed = false;
      mirrorOut = mirrOut;
      mirrorAddr = mirrAddr;
      throttler = throttlerArg;

    try {
      if (isClient && !isTransfer) {
        responder = new Daemon(datanode.threadGroup,
            new PacketResponder(replyOut, mirrIn, downstreams));
        responder.start(); // start thread to processes responses
      }

      /*
       * Receive until the last packet.
       */
      while (receivePacket() >= 0) {}

      // wait for all outstanding packet responses. And then
      // indicate responder to gracefully shutdown.
      // Mark that responder has been closed for future processing
      if (responder != null) {
        ((PacketResponder)responder.getRunnable()).close();
        responderClosed = true;
      }

      // If this write is for a replication or transfer-RBW/Finalized,
      // then finalize block or convert temporary to RBW.
      // For client-writes, the block is finalized in the PacketResponder.
      if (isDatanode || isTransfer) {
        // close the block/crc files
        close();
        block.setNumBytes(replicaInfo.getNumBytes());

        if (stage == BlockConstructionStage.TRANSFER_RBW) {
          // for TRANSFER_RBW, convert temporary to RBW
          datanode.data.convertTemporaryToRbw(block);
        } else {
          // for isDatnode or TRANSFER_FINALIZED
          // Finalize the block. Does this fsync()?
          datanode.data.finalizeBlock(block);
        }
        datanode.metrics.incrBlocksWritten();
      }

    } catch (IOException ioe) {
      LOG.info("Exception in receiveBlock for " + block, ioe);
      throw ioe;
    } finally {
      if (!responderClosed) { // Abnormal termination of the flow above
        IOUtils.closeStream(this);
        if (responder != null) {
          responder.interrupt();
        }
        cleanupBlock();
      }
      // always join the responder so no thread outlives this call
      if (responder != null) {
        try {
          responder.join();
        } catch (InterruptedException e) {
          throw new IOException("Interrupted receiveBlock");
        }
        responder = null;
      }
    }
  }
| |
| /** Cleanup a partial block |
| * if this write is for a replication request (and not from a client) |
| */ |
| private void cleanupBlock() throws IOException { |
| if (isDatanode) { |
| datanode.data.unfinalizeBlock(block); |
| } |
| } |
| |
  /**
   * Adjust the file pointer in the local meta file so that the last checksum
   * will be overwritten. Flushes both streams first so all buffered bytes
   * reach the channel before its position is moved.
   */
  private void adjustCrcFilePosition() throws IOException {
    if (out != null) {
     out.flush();
    }
    if (checksumOut != null) {
      checksumOut.flush();
    }

    // rollback the position of the meta file
    datanode.data.adjustCrcChannelPosition(block, streams, checksumSize);
  }
| |
| /** |
| * Convert a checksum byte array to a long |
| */ |
| static private long checksum2long(byte[] checksum) { |
| long crc = 0L; |
| for(int i=0; i<checksum.length; i++) { |
| crc |= (0xffL&(long)checksum[i])<<((checksum.length-i-1)*8); |
| } |
| return crc; |
| } |
| |
  /**
   * reads in the partial crc chunk and computes checksum
   * of pre-existing data in partial chunk. The result is left in the
   * partialCrc field for the caller to extend with the new bytes.
   *
   * @param blkoff block offset of the resume point (inside a partial chunk)
   * @param ckoff offset in the meta file of that chunk's stored checksum
   * @param bytesPerChecksum chunk size in data bytes
   * @throws IOException if the stored CRC does not match the recomputed one
   */
  private void computePartialChunkCrc(long blkoff, long ckoff,
      int bytesPerChecksum) throws IOException {

    // find offset of the beginning of partial chunk.
    //
    int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
    int checksumSize = checksum.getChecksumSize();
    blkoff = blkoff - sizePartialChunk;
    LOG.info("computePartialChunkCrc sizePartialChunk " +
              sizePartialChunk +
              " block " + block +
              " offset in block " + blkoff +
              " offset in metafile " + ckoff);

    // create an input stream from the block file
    // and read in partial crc chunk into temporary buffer
    //
    byte[] buf = new byte[sizePartialChunk];
    byte[] crcbuf = new byte[checksumSize];
    FSDataset.BlockInputStreams instr = null;
    try {
      instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff);
      IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk);

      // open meta file and read in crc value computed earlier
      IOUtils.readFully(instr.checksumIn, crcbuf, 0, crcbuf.length);
    } finally {
      IOUtils.closeStream(instr);
    }

    // compute crc of partial chunk from data read in the block file.
    partialCrc = new PureJavaCrc32();
    partialCrc.update(buf, 0, sizePartialChunk);
    LOG.info("Read in partial CRC chunk from disk for block " + block);

    // paranoia! verify that the pre-computed crc matches what we
    // recalculated just now
    if (partialCrc.getValue() != checksum2long(crcbuf)) {
      String msg = "Partial CRC " + partialCrc.getValue() +
                   " does not match value computed the " +
                   " last time file was closed " +
                   checksum2long(crcbuf);
      throw new IOException(msg);
    }
  }
| |
  /**
   * How a PacketResponder sits in the write pipeline:
   * NON_PIPELINE - not part of a pipeline (downstreams array is null);
   * LAST_IN_PIPELINE - no downstream datanode to wait on;
   * HAS_DOWNSTREAM_IN_PIPELINE - relays downstream acks upstream.
   */
  private static enum PacketResponderType {
    NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
  }
| |
  /**
   * Processes responses from downstream datanodes in the pipeline
   * and sends back replies to the originator.
   */
| class PacketResponder implements Runnable, Closeable, FSConstants { |
| |
    /** queue for packets waiting for ack */
    private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
    /** the thread that spawns this responder; interrupted on fatal errors */
    private final Thread receiverThread = Thread.currentThread();
    /** is this responder running? (read by both receiver and responder) */
    private volatile boolean running = true;

    /** input from the next downstream datanode */
    private final DataInputStream downstreamIn;
    /** output to upstream datanode/client */
    private final DataOutputStream upstreamOut;

    /** The type of this responder */
    private final PacketResponderType type;
    /** for log and error messages */
    private final String myString;
| |
| @Override |
| public String toString() { |
| return myString; |
| } |
| |
| PacketResponder(final DataOutputStream upstreamOut, |
| final DataInputStream downstreamIn, |
| final DatanodeInfo[] downstreams) { |
| this.downstreamIn = downstreamIn; |
| this.upstreamOut = upstreamOut; |
| |
| this.type = downstreams == null? PacketResponderType.NON_PIPELINE |
| : downstreams.length == 0? PacketResponderType.LAST_IN_PIPELINE |
| : PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE; |
| |
| final StringBuilder b = new StringBuilder(getClass().getSimpleName()) |
| .append(": ").append(block).append(", type=").append(type); |
| if (type != PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) { |
| b.append(", downstreams=").append(downstreams.length) |
| .append(":").append(Arrays.asList(downstreams)); |
| } |
| this.myString = b.toString(); |
| } |
| |
| /** |
| * enqueue the seqno that is still be to acked by the downstream datanode. |
| * @param seqno |
| * @param lastPacketInBlock |
| * @param offsetInBlock |
| */ |
| synchronized void enqueue(final long seqno, |
| final boolean lastPacketInBlock, final long offsetInBlock) { |
| if (running) { |
| final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock); |
| if(LOG.isDebugEnabled()) { |
| LOG.debug(myString + ": enqueue " + p); |
| } |
| ackQueue.addLast(p); |
| notifyAll(); |
| } |
| } |
| |
| /** |
| * wait for all pending packets to be acked. Then shutdown thread. |
| */ |
| @Override |
| public synchronized void close() { |
| while (running && ackQueue.size() != 0 && datanode.shouldRun) { |
| try { |
| wait(); |
| } catch (InterruptedException e) { |
| running = false; |
| } |
| } |
| if(LOG.isDebugEnabled()) { |
| LOG.debug(myString + ": closing"); |
| } |
| running = false; |
| notifyAll(); |
| } |
| |
    /**
     * Thread to process incoming acks: waits for the matching local write
     * to finish, finalizes the block on the last packet, and relays a
     * combined ack (this node's status + downstream statuses) upstream.
     * @see java.lang.Runnable#run()
     */
    public void run() {
      boolean lastPacketInBlock = false;
      final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
      while (running && datanode.shouldRun && !lastPacketInBlock) {

        boolean isInterrupted = false;
        try {
          Packet pkt = null;
          long expected = -2;
          PipelineAck ack = new PipelineAck();
          long seqno = PipelineAck.UNKOWN_SEQNO;
          try {
            if (type != PacketResponderType.LAST_IN_PIPELINE
                && !mirrorError) {
              // read an ack from downstream datanode
              ack.readFields(downstreamIn);
              if (LOG.isDebugEnabled()) {
                LOG.debug(myString + " got " + ack);
              }
              seqno = ack.getSeqno();
            }
            if (seqno != PipelineAck.UNKOWN_SEQNO
                || type == PacketResponderType.LAST_IN_PIPELINE) {
              // wait until the local write of this packet has been queued
              synchronized (this) {
                while (running && datanode.shouldRun && ackQueue.size() == 0) {
                  if (LOG.isDebugEnabled()) {
                    LOG.debug(myString + ": seqno=" + seqno +
                              " waiting for local datanode to finish write.");
                  }
                  wait();
                }
                if (!running || !datanode.shouldRun) {
                  break;
                }
                pkt = ackQueue.getFirst();
                expected = pkt.seqno;
                // downstream ack must match the oldest unacked local packet
                if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
                    && seqno != expected) {
                  throw new IOException(myString + "seqno: expected="
                      + expected + ", received=" + seqno);
                }
                lastPacketInBlock = pkt.lastPacketInBlock;
              }
            }
          } catch (InterruptedException ine) {
            isInterrupted = true;
          } catch (IOException ioe) {
            if (Thread.interrupted()) {
              isInterrupted = true;
            } else {
              // continue to run even if can not read from mirror
              // notify client of the error
              // and wait for the client to shut down the pipeline
              mirrorError = true;
              LOG.info(myString, ioe);
            }
          }

          if (Thread.interrupted() || isInterrupted) {
            /* The receiver thread cancelled this thread.
             * We could also check any other status updates from the
             * receiver thread (e.g. if it is ok to write to replyOut).
             * It is prudent to not send any more status back to the client
             * because this datanode has a problem. The upstream datanode
             * will detect that this datanode is bad, and rightly so.
             */
            LOG.info(myString + ": Thread is interrupted.");
            running = false;
            continue;
          }

          // If this is the last packet in block, then close block
          // file and finalize the block before responding success
          if (lastPacketInBlock) {
            BlockReceiver.this.close();
            final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
            block.setNumBytes(replicaInfo.getNumBytes());
            datanode.data.finalizeBlock(block);
            datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
            if (ClientTraceLog.isInfoEnabled() && isClient) {
              long offset = 0;
              DatanodeRegistration dnR =
                datanode.getDNRegistrationForBP(block.getBlockPoolId());
              ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
                    inAddr, myAddr, block.getNumBytes(),
                    "HDFS_WRITE", clientname, offset,
                    dnR.getStorageID(), block, endTime-startTime));
            } else {
              LOG.info("Received block " + block + " of size "
                  + block.getNumBytes() + " from " + inAddr);
            }
          }

          // construct my ack message: this node's status first, then the
          // downstream statuses (or a single ERROR if the mirror failed)
          Status[] replies = null;
          if (mirrorError) { // ack read error
            replies = new Status[2];
            replies[0] = Status.SUCCESS;
            replies[1] = Status.ERROR;
          } else {
            short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
                : ack.getNumOfReplies();
            replies = new Status[1+ackLen];
            replies[0] = Status.SUCCESS;
            for (int i=0; i<ackLen; i++) {
              replies[i+1] = ack.getReply(i);
            }
          }
          PipelineAck replyAck = new PipelineAck(expected, replies);

          // advance bytesAcked only on success and only forward
          if (replyAck.isSuccess() &&
               pkt.offsetInBlock > replicaInfo.getBytesAcked())
            replicaInfo.setBytesAcked(pkt.offsetInBlock);

          // send my ack back to upstream datanode
          replyAck.write(upstreamOut);
          upstreamOut.flush();
          if (LOG.isDebugEnabled()) {
            LOG.debug(myString + ", replyAck=" + replyAck);
          }
          if (pkt != null) {
            // remove the packet from the ack queue
            removeAckHead();
            // update bytes acked
          }
        } catch (IOException e) {
          LOG.warn("IOException in BlockReceiver.run(): ", e);
          if (running) {
            try {
              datanode.checkDiskError(e); // may throw an exception here
            } catch (IOException ioe) {
              LOG.warn("DataNode.checkDiskError failed in run() with: ", ioe);
            }
            LOG.info(myString, e);
            running = false;
            if (!Thread.interrupted()) { // failure not caused by interruption
              receiverThread.interrupt();
            }
          }
        } catch (Throwable e) {
          if (running) {
            LOG.info(myString, e);
            running = false;
            receiverThread.interrupt();
          }
        }
      }
      LOG.info(myString + " terminating");
    }
| |
    /**
     * Remove a packet from the head of the ack queue and wake any thread
     * blocked in close() waiting for the queue to drain.
     *
     * This should be called only when the ack queue is not empty
     */
    private synchronized void removeAckHead() {
      ackQueue.removeFirst();
      notifyAll();
    }
| } |
| |
| /** |
| * This information is cached by the Datanode in the ackQueue. |
| */ |
| private static class Packet { |
| final long seqno; |
| final boolean lastPacketInBlock; |
| final long offsetInBlock; |
| |
| Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock) { |
| this.seqno = seqno; |
| this.lastPacketInBlock = lastPacketInBlock; |
| this.offsetInBlock = offsetInBlock; |
| } |
| |
| @Override |
| public String toString() { |
| return getClass().getSimpleName() + "(seqno=" + seqno |
| + ", lastPacketInBlock=" + lastPacketInBlock |
| + ", offsetInBlock=" + offsetInBlock |
| + ")"; |
| } |
| } |
| } |