/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils;
/** A class that receives a block and writes it to its own disk, and
* meanwhile may copy it to another site (a mirror). If a throttler is
* provided, streaming throttling is also supported.
**/
class BlockReceiver implements Closeable, FSConstants {
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
private DataInputStream in = null; // from where data are read
private DataChecksum checksum; // validates checksums of received data chunks
private OutputStream out = null; // to block file at local disk
private OutputStream cout = null; // output stream for checksum file
private DataOutputStream checksumOut = null; // to crc file at local disk
private int bytesPerChecksum;
private int checksumSize;
private ByteBuffer buf; // contains one full packet.
private int bufRead; //amount of valid data in the buf
private int maxPacketReadLen;
protected final String inAddr;
protected final String myAddr;
private String mirrorAddr;
private DataOutputStream mirrorOut;
private Daemon responder = null;
private DataTransferThrottler throttler;
private FSDataset.BlockWriteStreams streams;
private DatanodeInfo srcDataNode = null;
private Checksum partialCrc = null;
private final DataNode datanode;
volatile private boolean mirrorError;
/** The client name. It is empty if a datanode is the client */
private final String clientname;
private final boolean isClient;
private final boolean isDatanode;
/** the block to receive */
private final ExtendedBlock block;
/** the replica to write */
private final ReplicaInPipelineInterface replicaInfo;
/** pipeline stage */
private final BlockConstructionStage stage;
private final boolean isTransfer;
BlockReceiver(final ExtendedBlock block, final DataInputStream in,
final String inAddr, final String myAddr,
final BlockConstructionStage stage,
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
final String clientname, final DatanodeInfo srcDataNode,
final DataNode datanode) throws IOException {
try{
this.block = block;
this.in = in;
this.inAddr = inAddr;
this.myAddr = myAddr;
this.srcDataNode = srcDataNode;
this.datanode = datanode;
this.clientname = clientname;
this.isDatanode = clientname.length() == 0;
this.isClient = !this.isDatanode;
// for a datanode, we have
// 1: clientname.length() == 0, and
// 2: stage == null or PIPELINE_SETUP_CREATE
this.stage = stage;
this.isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ": " + block
+ "\n isClient =" + isClient + ", clientname=" + clientname
+ "\n isDatanode=" + isDatanode + ", srcDataNode=" + srcDataNode
+ "\n inAddr=" + inAddr + ", myAddr=" + myAddr
);
}
//
// Open local disk out
//
if (isDatanode) { //replication or move
replicaInfo = datanode.data.createTemporary(block);
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
replicaInfo = datanode.data.createRbw(block);
break;
case PIPELINE_SETUP_STREAMING_RECOVERY:
replicaInfo = datanode.data.recoverRbw(
block, newGs, minBytesRcvd, maxBytesRcvd);
block.setGenerationStamp(newGs);
break;
case PIPELINE_SETUP_APPEND:
replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
if (datanode.blockScanner != null) { // remove from block scanner
datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
block.getLocalBlock());
}
block.setGenerationStamp(newGs);
break;
case PIPELINE_SETUP_APPEND_RECOVERY:
replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
if (datanode.blockScanner != null) { // remove from block scanner
datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
block.getLocalBlock());
}
block.setGenerationStamp(newGs);
break;
case TRANSFER_RBW:
case TRANSFER_FINALIZED:
// this is a transfer destination
replicaInfo = datanode.data.createTemporary(block);
break;
default: throw new IOException("Unsupported stage " + stage +
" while receiving block " + block + " from " + inAddr);
}
}
// read checksum meta information
this.checksum = DataChecksum.newDataChecksum(in);
this.bytesPerChecksum = checksum.getBytesPerChecksum();
this.checksumSize = checksum.getChecksumSize();
final boolean isCreate = isDatanode || isTransfer
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
streams = replicaInfo.createStreams(isCreate,
this.bytesPerChecksum, this.checksumSize);
if (streams != null) {
this.out = streams.dataOut;
this.cout = streams.checksumOut;
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
streams.checksumOut,
SMALL_BUFFER_SIZE));
// write data chunk header if creating a new replica
if (isCreate) {
BlockMetadataHeader.writeHeader(checksumOut, checksum);
}
}
} catch (ReplicaAlreadyExistsException bae) {
throw bae;
} catch (ReplicaNotFoundException bne) {
throw bne;
} catch(IOException ioe) {
IOUtils.closeStream(this);
cleanupBlock();
// check if there is a disk error
IOException cause = FSDataset.getCauseIfDiskError(ioe);
DataNode.LOG.warn("IOException in BlockReceiver constructor. Cause is ",
cause);
if (cause != null) { // possible disk error
ioe = cause;
datanode.checkDiskError(ioe); // may throw an exception here
}
throw ioe;
}
}
/** Return the datanode object. */
DataNode getDataNode() {return datanode;}
/**
* close files.
*/
public void close() throws IOException {
IOException ioe = null;
// close checksum file
try {
if (checksumOut != null) {
checksumOut.flush();
if (datanode.syncOnClose && (cout instanceof FileOutputStream)) {
((FileOutputStream)cout).getChannel().force(true);
}
checksumOut.close();
checksumOut = null;
}
} catch(IOException e) {
ioe = e;
}
// close block file
try {
if (out != null) {
out.flush();
if (datanode.syncOnClose && (out instanceof FileOutputStream)) {
((FileOutputStream)out).getChannel().force(true);
}
out.close();
out = null;
}
} catch (IOException e) {
ioe = e;
}
// disk check
if(ioe != null) {
datanode.checkDiskError(ioe);
throw ioe;
}
}
/**
* Flush block data and metadata files to disk.
* @throws IOException
*/
void flush() throws IOException {
if (checksumOut != null) {
checksumOut.flush();
}
if (out != null) {
out.flush();
}
}
/**
* While writing to mirrorOut, failure to write to mirror should not
* affect this datanode unless it is caused by interruption.
*/
private void handleMirrorOutError(IOException ioe) throws IOException {
String bpid = block.getBlockPoolId();
LOG.info(datanode.getDNRegistrationForBP(bpid) + ": Exception writing block " +
block + " to mirror " + mirrorAddr + "\n" +
StringUtils.stringifyException(ioe));
if (Thread.interrupted()) { // shut down if the thread is interrupted
throw ioe;
} else { // encounter an error while writing to mirror
// continue to run even if can not write to mirror
// notify client of the error
// and wait for the client to shut down the pipeline
mirrorError = true;
}
}
/**
* Verify multiple CRC chunks.
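* If a mismatch is found and the data came from another datanode, the
* corrupt source replica is reported to the namenode before the write fails.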
*/
private void verifyChunks( byte[] dataBuf, int dataOff, int len,
byte[] checksumBuf, int checksumOff )
throws IOException {
DatanodeProtocol nn = datanode.getBPNamenode(block.getBlockPoolId());
while (len > 0) {
int chunkLen = Math.min(len, bytesPerChecksum);
checksum.update(dataBuf, dataOff, chunkLen);
if (!checksum.compare(checksumBuf, checksumOff)) {
if (srcDataNode != null) {
try {
LOG.info("report corrupt block " + block + " from datanode " +
srcDataNode + " to namenode");
LocatedBlock lb = new LocatedBlock(block,
new DatanodeInfo[] {srcDataNode});
nn.reportBadBlocks(new LocatedBlock[] {lb});
} catch (IOException e) {
LOG.warn("Failed to report bad block " + block +
" from datanode " + srcDataNode + " to namenode");
}
}
throw new IOException("Unexpected checksum mismatch " +
"while writing " + block + " from " + inAddr);
}
checksum.reset();
dataOff += chunkLen;
checksumOff += checksumSize;
len -= chunkLen;
}
}
/**
* Makes sure buf.position() is zero without modifying buf.remaining().
* It moves the data if position needs to be changed.
*/
private void shiftBufData() {
if (bufRead != buf.limit()) {
throw new IllegalStateException("bufRead should be same as " +
"buf.limit()");
}
//shift the remaining data on buf to the front
if (buf.position() > 0) {
int dataLeft = buf.remaining();
if (dataLeft > 0) {
byte[] b = buf.array();
System.arraycopy(b, buf.position(), b, 0, dataLeft);
}
buf.position(0);
bufRead = dataLeft;
buf.limit(bufRead);
}
}
/**
* Reads up to toRead bytes into buf at buf.limit() and increments the limit.
* Throws an IOException if the read does not succeed.
*/
private int readToBuf(int toRead) throws IOException {
if (toRead < 0) {
toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity())
- buf.limit();
}
int nRead = in.read(buf.array(), buf.limit(), toRead);
if (nRead < 0) {
throw new EOFException("while trying to read " + toRead + " bytes");
}
bufRead = buf.limit() + nRead;
buf.limit(bufRead);
return nRead;
}
/**
* Reads (at least) one packet and returns the packet length.
* buf.position() points to the start of the packet and
* buf.limit() points to the end of the packet. There could
* be more data from next packet in buf.<br><br>
*
* It tries to read a full packet with single read call.
* Consecutive packets are usually of the same length.
*/
private void readNextPacket() throws IOException {
/* This dances around buf a little bit, mainly to read a
* full packet with a single read and to accept an arbitrary size
* for the next packet at the same time.
*/
if (buf == null) {
/* initialize buffer to the best guess size:
* 'chunksPerPacket' calculation here should match the same
* calculation in DFSClient to make the guess accurate.
*/
int chunkSize = bytesPerChecksum + checksumSize;
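// e.g. with the common defaults of 512-byte chunks and 4-byte CRC32 checksums, chunkSize is 516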
int chunksPerPacket = (datanode.writePacketSize - PacketHeader.PKT_HEADER_LEN
+ chunkSize - 1)/chunkSize;
buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN +
Math.max(chunksPerPacket, 1) * chunkSize);
buf.limit(0);
}
// See if there is data left in the buffer :
if (bufRead > buf.limit()) {
buf.limit(bufRead);
}
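// buffer at least SIZE_OF_INTEGER bytes so the payload length field can be read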
while (buf.remaining() < SIZE_OF_INTEGER) {
if (buf.position() > 0) {
shiftBufData();
}
readToBuf(-1);
}
/* We mostly have the full packet or at least enough for an int
*/
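// peek at payloadLen: mark, read the int, then reset so the full header can be re-read later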
buf.mark();
int payloadLen = buf.getInt();
buf.reset();
// sanity check payloadLen; reject corrupt values (a 100MB upper limit should be ample)
if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
throw new IOException("Incorrect value for packet payload : " +
payloadLen);
}
// Subtract SIZE_OF_INTEGER since that accounts for the payloadLen that
// we read above.
int pktSize = payloadLen + PacketHeader.PKT_HEADER_LEN - SIZE_OF_INTEGER;
if (buf.remaining() < pktSize) {
//we need to read more data
int toRead = pktSize - buf.remaining();
// first make sure buf has enough space.
int spaceLeft = buf.capacity() - buf.limit();
if (toRead > spaceLeft && buf.position() > 0) {
shiftBufData();
spaceLeft = buf.capacity() - buf.limit();
}
if (toRead > spaceLeft) {
byte oldBuf[] = buf.array();
int toCopy = buf.limit();
buf = ByteBuffer.allocate(toCopy + toRead);
System.arraycopy(oldBuf, 0, buf.array(), 0, toCopy);
buf.limit(toCopy);
}
//now read:
while (toRead > 0) {
toRead -= readToBuf(toRead);
}
}
if (buf.remaining() > pktSize) {
buf.limit(buf.position() + pktSize);
}
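// remember the largest packet seen so far; readToBuf uses it to size subsequent reads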
if (pktSize > maxPacketReadLen) {
maxPacketReadLen = pktSize;
}
}
/**
* Receives and processes a packet. It can contain many chunks.
* Returns the number of data bytes that the packet has.
*/
private int receivePacket() throws IOException {
// read the next packet
readNextPacket();
buf.mark();
PacketHeader header = new PacketHeader();
header.readFields(buf);
int endOfHeader = buf.position();
buf.reset();
// Sanity check the header
if (header.getOffsetInBlock() > replicaInfo.getNumBytes()) {
throw new IOException("Received an out-of-sequence packet for " + block +
"from " + inAddr + " at offset " + header.getOffsetInBlock() +
". Expecting packet starting at " + replicaInfo.getNumBytes());
}
if (header.getDataLen() < 0) {
throw new IOException("Got wrong length during writeBlock(" + block +
") from " + inAddr + " at offset " +
header.getOffsetInBlock() + ": " +
header.getDataLen());
}
return receivePacket(
header.getOffsetInBlock(),
header.getSeqno(),
header.isLastPacketInBlock(),
header.getDataLen(), endOfHeader);
}
/**
* Write the received packet to disk (data only)
*/
private void writePacketToDisk(byte[] pktBuf, int startByteToDisk,
int numBytesToDisk) throws IOException {
out.write(pktBuf, startByteToDisk, numBytesToDisk);
}
/**
* Receives and processes a packet. It can contain many chunks.
* Returns the number of data bytes that the packet has.
*/
private int receivePacket(long offsetInBlock, long seqno,
boolean lastPacketInBlock, int len, int endOfHeader) throws IOException {
if (LOG.isDebugEnabled()){
LOG.debug("Receiving one packet for block " + block +
" of length " + len +
" seqno " + seqno +
" offsetInBlock " + offsetInBlock +
" lastPacketInBlock " + lastPacketInBlock);
}
// update received bytes
long firstByteInBlock = offsetInBlock;
offsetInBlock += len;
if (replicaInfo.getNumBytes() < offsetInBlock) {
replicaInfo.setNumBytes(offsetInBlock);
}
// put in queue for pending acks
if (responder != null) {
((PacketResponder)responder.getRunnable()).enqueue(seqno,
lastPacketInBlock, offsetInBlock);
}
//First write the packet to the mirror:
if (mirrorOut != null && !mirrorError) {
try {
mirrorOut.write(buf.array(), buf.position(), buf.remaining());
mirrorOut.flush();
} catch (IOException e) {
handleMirrorOutError(e);
}
}
buf.position(endOfHeader);
if (lastPacketInBlock || len == 0) {
if(LOG.isDebugEnabled()) {
LOG.debug("Receiving an empty packet or the end of the block " + block);
}
} else {
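// one checksum per (possibly partial) chunk:
// checksumLen = ceil(len / bytesPerChecksum) * checksumSize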
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
checksumSize;
if ( buf.remaining() != (checksumLen + len)) {
throw new IOException("Data remaining in packet does not match" +
"sum of checksumLen and dataLen " +
" size remaining: " + buf.remaining() +
" data len: " + len +
" checksum Len: " + checksumLen);
}
int checksumOff = buf.position();
int dataOff = checksumOff + checksumLen;
byte pktBuf[] = buf.array();
buf.position(buf.limit()); // move to the end of the data.
/* skip verifying checksum iff this is not the last one in the
* pipeline and clientName is non-null. i.e. Checksum is verified
* on all the datanodes when the data is being written by a
* datanode rather than a client. When a client is writing the data,
* protocol includes acks and only the last datanode needs to verify
* checksum.
*/
if (mirrorOut == null || isDatanode) {
verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
}
byte[] lastChunkChecksum;
try {
long onDiskLen = replicaInfo.getBytesOnDisk();
if (onDiskLen<offsetInBlock) {
//finally write to the disk :
if (onDiskLen % bytesPerChecksum != 0) {
// prepare to overwrite last checksum
adjustCrcFilePosition();
}
// If this is a partial chunk, then read in pre-existing checksum
if (firstByteInBlock % bytesPerChecksum != 0) {
LOG.info("Packet starts at " + firstByteInBlock +
" for block " + block +
" which is not a multiple of bytesPerChecksum " +
bytesPerChecksum);
long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
onDiskLen / bytesPerChecksum * checksumSize;
computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum);
}
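// the packet may overlap bytes already on disk; write only the part past onDiskLen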
int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock);
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
writePacketToDisk(pktBuf, startByteToDisk, numBytesToDisk);
// If this is a partial chunk, then verify that this is the only
// chunk in the packet. Calculate new crc for this chunk.
if (partialCrc != null) {
if (len > bytesPerChecksum) {
throw new IOException("Got wrong length during writeBlock(" +
block + ") from " + inAddr + " " +
"A packet can have only one partial chunk."+
" len = " + len +
" bytesPerChecksum " + bytesPerChecksum);
}
partialCrc.update(pktBuf, startByteToDisk, numBytesToDisk);
byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
lastChunkChecksum = Arrays.copyOfRange(
buf, buf.length - checksumSize, buf.length
);
checksumOut.write(buf);
if(LOG.isDebugEnabled()) {
LOG.debug("Writing out partial crc for data len " + len);
}
partialCrc = null;
} else {
lastChunkChecksum = Arrays.copyOfRange(
pktBuf,
checksumOff + checksumLen - checksumSize,
checksumOff + checksumLen
);
checksumOut.write(pktBuf, checksumOff, checksumLen);
}
// flush entire packet
flush();
replicaInfo.setLastChecksumAndDataLen(
offsetInBlock, lastChunkChecksum
);
datanode.metrics.incrBytesWritten(len);
}
} catch (IOException iex) {
datanode.checkDiskError(iex);
throw iex;
}
}
if (throttler != null) { // throttle I/O
throttler.throttle(len);
}
return lastPacketInBlock?-1:len;
}
void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
checksum.writeHeader(mirrorOut);
}
void receiveBlock(
DataOutputStream mirrOut, // output to next datanode
DataInputStream mirrIn, // input from next datanode
DataOutputStream replyOut, // output to previous datanode
String mirrAddr, DataTransferThrottler throttlerArg,
DatanodeInfo[] downstreams) throws IOException {
boolean responderClosed = false;
mirrorOut = mirrOut;
mirrorAddr = mirrAddr;
throttler = throttlerArg;
try {
if (isClient && !isTransfer) {
responder = new Daemon(datanode.threadGroup,
new PacketResponder(replyOut, mirrIn, downstreams));
responder.start(); // start thread to process responses
}
/*
* Receive until the last packet.
*/
while (receivePacket() >= 0) {}
// wait for all outstanding packet responses, then tell the
// responder to shut down gracefully and mark it as closed
// for future processing
if (responder != null) {
((PacketResponder)responder.getRunnable()).close();
responderClosed = true;
}
// If this write is for a replication or transfer-RBW/Finalized,
// then finalize block or convert temporary to RBW.
// For client-writes, the block is finalized in the PacketResponder.
if (isDatanode || isTransfer) {
// close the block/crc files
close();
block.setNumBytes(replicaInfo.getNumBytes());
if (stage == BlockConstructionStage.TRANSFER_RBW) {
// for TRANSFER_RBW, convert temporary to RBW
datanode.data.convertTemporaryToRbw(block);
} else {
// for isDatanode or TRANSFER_FINALIZED
// Finalize the block. Does this fsync()?
datanode.data.finalizeBlock(block);
}
datanode.metrics.incrBlocksWritten();
}
} catch (IOException ioe) {
LOG.info("Exception in receiveBlock for " + block, ioe);
throw ioe;
} finally {
if (!responderClosed) { // Abnormal termination of the flow above
IOUtils.closeStream(this);
if (responder != null) {
responder.interrupt();
}
cleanupBlock();
}
if (responder != null) {
try {
responder.join();
} catch (InterruptedException e) {
throw new IOException("Interrupted receiveBlock");
}
responder = null;
}
}
}
/** Clean up a partial block
* if this write is for a replication request (and not from a client)
*/
private void cleanupBlock() throws IOException {
if (isDatanode) {
datanode.data.unfinalizeBlock(block);
}
}
/**
* Adjust the file pointer in the local meta file so that the last checksum
* will be overwritten.
*/
private void adjustCrcFilePosition() throws IOException {
if (out != null) {
out.flush();
}
if (checksumOut != null) {
checksumOut.flush();
}
// rollback the position of the meta file
datanode.data.adjustCrcChannelPosition(block, streams, checksumSize);
}
/**
* Convert a checksum byte array to a long
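* Bytes are interpreted big-endian: e.g. {0x12, 0x34, 0x56, 0x78} becomes 0x12345678L.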
*/
static private long checksum2long(byte[] checksum) {
long crc = 0L;
for(int i=0; i<checksum.length; i++) {
crc |= (0xffL&(long)checksum[i])<<((checksum.length-i-1)*8);
}
return crc;
}
/**
* Reads in the partial crc chunk and computes the checksum
* of the pre-existing data in the partial chunk.
*/
private void computePartialChunkCrc(long blkoff, long ckoff,
int bytesPerChecksum) throws IOException {
// find offset of the beginning of partial chunk.
//
int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
int checksumSize = checksum.getChecksumSize();
blkoff = blkoff - sizePartialChunk;
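// e.g. blkoff=1000 with bytesPerChecksum=512: sizePartialChunk=488 and the chunk begins at offset 512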
LOG.info("computePartialChunkCrc sizePartialChunk " +
sizePartialChunk +
" block " + block +
" offset in block " + blkoff +
" offset in metafile " + ckoff);
// create an input stream from the block file
// and read in partial crc chunk into temporary buffer
//
byte[] buf = new byte[sizePartialChunk];
byte[] crcbuf = new byte[checksumSize];
FSDataset.BlockInputStreams instr = null;
try {
instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff);
IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk);
// open meta file and read in crc value computed earlier
IOUtils.readFully(instr.checksumIn, crcbuf, 0, crcbuf.length);
} finally {
IOUtils.closeStream(instr);
}
// compute crc of partial chunk from data read in the block file.
partialCrc = new PureJavaCrc32();
partialCrc.update(buf, 0, sizePartialChunk);
LOG.info("Read in partial CRC chunk from disk for block " + block);
// paranoia! verify that the pre-computed crc matches what we
// recalculated just now
if (partialCrc.getValue() != checksum2long(crcbuf)) {
String msg = "Partial CRC " + partialCrc.getValue() +
" does not match value computed the " +
" last time file was closed " +
checksum2long(crcbuf);
throw new IOException(msg);
}
}
private static enum PacketResponderType {
NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
}
/**
* Processes responses from downstream datanodes in the pipeline
* and sends back replies to the originator.
*/
class PacketResponder implements Runnable, Closeable, FSConstants {
/** queue for packets waiting for ack */
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
/** the thread that spawns this responder */
private final Thread receiverThread = Thread.currentThread();
/** is this responder running? */
private volatile boolean running = true;
/** input from the next downstream datanode */
private final DataInputStream downstreamIn;
/** output to upstream datanode/client */
private final DataOutputStream upstreamOut;
/** The type of this responder */
private final PacketResponderType type;
/** for log and error messages */
private final String myString;
@Override
public String toString() {
return myString;
}
PacketResponder(final DataOutputStream upstreamOut,
final DataInputStream downstreamIn,
final DatanodeInfo[] downstreams) {
this.downstreamIn = downstreamIn;
this.upstreamOut = upstreamOut;
this.type = downstreams == null? PacketResponderType.NON_PIPELINE
: downstreams.length == 0? PacketResponderType.LAST_IN_PIPELINE
: PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE;
final StringBuilder b = new StringBuilder(getClass().getSimpleName())
.append(": ").append(block).append(", type=").append(type);
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
b.append(", downstreams=").append(downstreams.length)
.append(":").append(Arrays.asList(downstreams));
}
this.myString = b.toString();
}
/**
* enqueue the seqno that is still to be acked by the downstream datanode.
* @param seqno
* @param lastPacketInBlock
* @param offsetInBlock
*/
synchronized void enqueue(final long seqno,
final boolean lastPacketInBlock, final long offsetInBlock) {
if (running) {
final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock);
if(LOG.isDebugEnabled()) {
LOG.debug(myString + ": enqueue " + p);
}
ackQueue.addLast(p);
notifyAll();
}
}
/**
* Wait for all pending packets to be acked, then shut down the thread.
*/
@Override
public synchronized void close() {
while (running && ackQueue.size() != 0 && datanode.shouldRun) {
try {
wait();
} catch (InterruptedException e) {
running = false;
}
}
if(LOG.isDebugEnabled()) {
LOG.debug(myString + ": closing");
}
running = false;
notifyAll();
}
/**
* Thread to process incoming acks.
* @see java.lang.Runnable#run()
*/
public void run() {
boolean lastPacketInBlock = false;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (running && datanode.shouldRun && !lastPacketInBlock) {
boolean isInterrupted = false;
try {
Packet pkt = null;
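// -2 is a sentinel; once a packet is dequeued, expected becomes its seqno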
long expected = -2;
PipelineAck ack = new PipelineAck();
long seqno = PipelineAck.UNKOWN_SEQNO;
try {
if (type != PacketResponderType.LAST_IN_PIPELINE
&& !mirrorError) {
// read an ack from downstream datanode
ack.readFields(downstreamIn);
if (LOG.isDebugEnabled()) {
LOG.debug(myString + " got " + ack);
}
seqno = ack.getSeqno();
}
if (seqno != PipelineAck.UNKOWN_SEQNO
|| type == PacketResponderType.LAST_IN_PIPELINE) {
synchronized (this) {
while (running && datanode.shouldRun && ackQueue.size() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(myString + ": seqno=" + seqno +
" waiting for local datanode to finish write.");
}
wait();
}
if (!running || !datanode.shouldRun) {
break;
}
pkt = ackQueue.getFirst();
expected = pkt.seqno;
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
&& seqno != expected) {
throw new IOException(myString + "seqno: expected="
+ expected + ", received=" + seqno);
}
lastPacketInBlock = pkt.lastPacketInBlock;
}
}
} catch (InterruptedException ine) {
isInterrupted = true;
} catch (IOException ioe) {
if (Thread.interrupted()) {
isInterrupted = true;
} else {
// continue to run even if can not read from mirror
// notify client of the error
// and wait for the client to shut down the pipeline
mirrorError = true;
LOG.info(myString, ioe);
}
}
if (Thread.interrupted() || isInterrupted) {
/* The receiver thread cancelled this thread.
* We could also check any other status updates from the
* receiver thread (e.g. if it is ok to write to replyOut).
* It is prudent to not send any more status back to the client
* because this datanode has a problem. The upstream datanode
* will detect that this datanode is bad, and rightly so.
*/
LOG.info(myString + ": Thread is interrupted.");
running = false;
continue;
}
// If this is the last packet in block, then close block
// file and finalize the block before responding success
if (lastPacketInBlock) {
BlockReceiver.this.close();
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
if (ClientTraceLog.isInfoEnabled() && isClient) {
long offset = 0;
DatanodeRegistration dnR =
datanode.getDNRegistrationForBP(block.getBlockPoolId());
ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
inAddr, myAddr, block.getNumBytes(),
"HDFS_WRITE", clientname, offset,
dnR.getStorageID(), block, endTime-startTime));
} else {
LOG.info("Received block " + block + " of size "
+ block.getNumBytes() + " from " + inAddr);
}
}
// construct my ack message
Status[] replies = null;
if (mirrorError) { // ack read error
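// SUCCESS for this node, ERROR for downstream: the client can then pinpoint the failed datanode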
replies = new Status[2];
replies[0] = Status.SUCCESS;
replies[1] = Status.ERROR;
} else {
short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
: ack.getNumOfReplies();
replies = new Status[1+ackLen];
replies[0] = Status.SUCCESS;
for (int i=0; i<ackLen; i++) {
replies[i+1] = ack.getReply(i);
}
}
PipelineAck replyAck = new PipelineAck(expected, replies);
if (replyAck.isSuccess() &&
pkt.offsetInBlock > replicaInfo.getBytesAcked()) {
replicaInfo.setBytesAcked(pkt.offsetInBlock);
}
// send my ack back to upstream datanode
replyAck.write(upstreamOut);
upstreamOut.flush();
if (LOG.isDebugEnabled()) {
LOG.debug(myString + ", replyAck=" + replyAck);
}
if (pkt != null) {
// remove the packet from the ack queue
removeAckHead();
}
} catch (IOException e) {
LOG.warn("IOException in BlockReceiver.run(): ", e);
if (running) {
try {
datanode.checkDiskError(e); // may throw an exception here
} catch (IOException ioe) {
LOG.warn("DataNode.checkDiskError failed in run() with: ", ioe);
}
LOG.info(myString, e);
running = false;
if (!Thread.interrupted()) { // failure not caused by interruption
receiverThread.interrupt();
}
}
} catch (Throwable e) {
if (running) {
LOG.info(myString, e);
running = false;
receiverThread.interrupt();
}
}
}
LOG.info(myString + " terminating");
}
/**
* Remove a packet from the head of the ack queue
*
* This should be called only when the ack queue is not empty
*/
private synchronized void removeAckHead() {
ackQueue.removeFirst();
notifyAll();
}
}
/**
* This information is cached by the Datanode in the ackQueue.
*/
private static class Packet {
final long seqno;
final boolean lastPacketInBlock;
final long offsetInBlock;
Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock) {
this.seqno = seqno;
this.lastPacketInBlock = lastPacketInBlock;
this.offsetInBlock = offsetInBlock;
}
@Override
public String toString() {
return getClass().getSimpleName() + "(seqno=" + seqno
+ ", lastPacketInBlock=" + lastPacketInBlock
+ ", offsetInBlock=" + offsetInBlock
+ ")";
}
}
}