/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils;
/** A class that receives a block and writes it to its own disk, and may
 * in the meantime copy it to another site (the mirror). If a throttler is
 * provided, streaming throttling is also supported.
 **/
class BlockReceiver implements java.io.Closeable, FSConstants {
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
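/*
 * Data flow, as implemented below: the upstream writer (a DFS client or
 * another datanode) sends packets over 'in'; each packet is forwarded to
 * 'mirrorOut' (the next datanode in the pipeline, if any), verified against
 * its checksums when this datanode is responsible for verification, and
 * written to the local block and meta files. For client writes, a
 * PacketResponder thread reads acks from the downstream datanode and relays
 * a combined ack upstream.
 */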
private Block block; // the block to receive
private DataInputStream in = null; // from where data are read
private DataChecksum checksum; // checksum for the data being received
private OutputStream out = null; // to block file at local disk
private DataOutputStream checksumOut = null; // to crc file at local disk
private int bytesPerChecksum;
private int checksumSize;
private ByteBuffer buf; // contains one full packet.
private int bufRead; //amount of valid data in the buf
private int maxPacketReadLen; // largest packet size seen so far; used to size reads
protected final String inAddr;
protected final String myAddr;
private String mirrorAddr; // address of the next datanode in the pipeline
private DataOutputStream mirrorOut; // stream to the next datanode in the pipeline
private Daemon responder = null; // PacketResponder thread, started for client writes
private BlockTransferThrottler throttler; // limits the transfer rate, if set
private FSDataset.BlockWriteStreams streams;
private String clientName; // empty for replication and block-move requests
DatanodeInfo srcDataNode = null; // datanode sending this block; used to report corrupt blocks
private Checksum partialCrc = null;
private final DataNode datanode;
final private ReplicaInPipelineInterface replicaInfo; // the replica being written
volatile private boolean mirrorError; // set when writing to or reading from the mirror fails
BlockReceiver(Block block, DataInputStream in, String inAddr,
String myAddr, BlockConstructionStage stage,
long newGs, long minBytesRcvd, long maxBytesRcvd,
String clientName, DatanodeInfo srcDataNode, DataNode datanode)
throws IOException {
try{
this.block = block;
this.in = in;
this.inAddr = inAddr;
this.myAddr = myAddr;
this.clientName = clientName;
this.srcDataNode = srcDataNode;
this.datanode = datanode;
//
// Open local disk out
//
if (clientName.length() == 0) { //replication or move
replicaInfo = datanode.data.createTemporary(block);
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
replicaInfo = datanode.data.createRbw(block);
break;
case PIPELINE_SETUP_STREAMING_RECOVERY:
replicaInfo = datanode.data.recoverRbw(
block, newGs, minBytesRcvd, maxBytesRcvd);
block.setGenerationStamp(newGs);
break;
case PIPELINE_SETUP_APPEND:
replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
if (datanode.blockScanner != null) { // remove from block scanner
datanode.blockScanner.deleteBlock(block);
}
block.setGenerationStamp(newGs);
break;
case PIPELINE_SETUP_APPEND_RECOVERY:
replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
if (datanode.blockScanner != null) { // remove from block scanner
datanode.blockScanner.deleteBlock(block);
}
block.setGenerationStamp(newGs);
break;
default: throw new IOException("Unsupported stage " + stage +
" while receiving block " + block + " from " + inAddr);
}
}
// read checksum meta information
this.checksum = DataChecksum.newDataChecksum(in);
this.bytesPerChecksum = checksum.getBytesPerChecksum();
this.checksumSize = checksum.getChecksumSize();
boolean isCreate = stage == BlockConstructionStage.PIPELINE_SETUP_CREATE
|| clientName.length() == 0;
streams = replicaInfo.createStreams(isCreate,
this.bytesPerChecksum, this.checksumSize);
if (streams != null) {
this.out = streams.dataOut;
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
streams.checksumOut,
SMALL_BUFFER_SIZE));
// write data chunk header if creating a new replica
if (isCreate) {
BlockMetadataHeader.writeHeader(checksumOut, checksum);
}
}
} catch (ReplicaAlreadyExistsException bae) {
throw bae;
} catch (ReplicaNotFoundException bne) {
throw bne;
} catch(IOException ioe) {
IOUtils.closeStream(this);
cleanupBlock();
// check if there is a disk error
IOException cause = FSDataset.getCauseIfDiskError(ioe);
DataNode.LOG.warn("IOException in BlockReceiver constructor. Cause is ",
cause);
if (cause != null) { // possible disk error
ioe = cause;
datanode.checkDiskError(ioe); // may throw an exception here
}
throw ioe;
}
}
/** Return the datanode object. */
DataNode getDataNode() {return datanode;}
/**
* close files.
*/
public void close() throws IOException {
IOException ioe = null;
// close checksum file
try {
if (checksumOut != null) {
checksumOut.flush();
checksumOut.close();
checksumOut = null;
}
} catch(IOException e) {
ioe = e;
}
// close block file
try {
if (out != null) {
out.flush();
out.close();
out = null;
}
} catch (IOException e) {
ioe = e;
}
// disk check
if(ioe != null) {
datanode.checkDiskError(ioe);
throw ioe;
}
}
/**
* Flush block data and metadata files to disk.
* @throws IOException
*/
void flush() throws IOException {
if (checksumOut != null) {
checksumOut.flush();
}
if (out != null) {
out.flush();
}
}
/**
* While writing to mirrorOut, a failure to write to the mirror should not
* affect this datanode unless the failure is caused by an interruption.
*/
private void handleMirrorOutError(IOException ioe) throws IOException {
LOG.info(datanode.dnRegistration + ":Exception writing block " +
block + " to mirror " + mirrorAddr + "\n" +
StringUtils.stringifyException(ioe));
if (Thread.interrupted()) { // shut down if the thread is interrupted
throw ioe;
} else { // encounter an error while writing to mirror
// continue to run even if can not write to mirror
// notify client of the error
// and wait for the client to shut down the pipeline
mirrorError = true;
}
}
/**
* Verify multiple CRC chunks.
*/
private void verifyChunks( byte[] dataBuf, int dataOff, int len,
byte[] checksumBuf, int checksumOff )
throws IOException {
while (len > 0) {
int chunkLen = Math.min(len, bytesPerChecksum);
checksum.update(dataBuf, dataOff, chunkLen);
if (!checksum.compare(checksumBuf, checksumOff)) {
if (srcDataNode != null) {
try {
LOG.info("report corrupt block " + block + " from datanode " +
srcDataNode + " to namenode");
LocatedBlock lb = new LocatedBlock(block,
new DatanodeInfo[] {srcDataNode});
datanode.namenode.reportBadBlocks(new LocatedBlock[] {lb});
} catch (IOException e) {
LOG.warn("Failed to report bad block " + block +
" from datanode " + srcDataNode + " to namenode");
}
}
throw new IOException("Unexpected checksum mismatch " +
"while writing " + block + " from " + inAddr);
}
checksum.reset();
dataOff += chunkLen;
checksumOff += checksumSize;
len -= chunkLen;
}
}
/**
* Makes sure buf.position() is zero without modifying buf.remaining().
* It moves the data if position needs to be changed.
*/
private void shiftBufData() {
if (bufRead != buf.limit()) {
throw new IllegalStateException("bufRead should be same as " +
"buf.limit()");
}
//shift the remaining data on buf to the front
if (buf.position() > 0) {
int dataLeft = buf.remaining();
if (dataLeft > 0) {
byte[] b = buf.array();
System.arraycopy(b, buf.position(), b, 0, dataLeft);
}
buf.position(0);
bufRead = dataLeft;
buf.limit(bufRead);
}
}
/**
* Reads up to toRead bytes into buf at buf.limit() and increments the limit.
* Throws an IOException if the read does not succeed.
*/
private int readToBuf(int toRead) throws IOException {
if (toRead < 0) {
toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity())
- buf.limit();
}
int nRead = in.read(buf.array(), buf.limit(), toRead);
if (nRead < 0) {
throw new EOFException("while trying to read " + toRead + " bytes");
}
bufRead = buf.limit() + nRead;
buf.limit(bufRead);
return nRead;
}
/**
* Reads (at least) one packet and returns the packet length.
* buf.position() points to the start of the packet and
* buf.limit() points to the end of the packet. There could
* be more data from the next packet in buf.<br><br>
*
* It tries to read a full packet with single read call.
* Consecutive packets are usually of the same length.
*/
private void readNextPacket() throws IOException {
/* This dances around buf a little bit, mainly to read a
* full packet with a single read and to accept an arbitrary size
* for the next packet at the same time.
*/
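/* Invariants for buf, maintained here and by shiftBufData()/readToBuf():
* bufRead is the number of valid bytes in buf's backing array and is never
* less than buf.limit(); buf.limit() may be trimmed back to the end of the
* current packet, leaving bytes of the next packet between buf.limit() and
* bufRead.
*/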
if (buf == null) {
/* initialize buffer to the best guess size:
* 'chunksPerPacket' calculation here should match the same
* calculation in DFSClient to make the guess accurate.
*/
int chunkSize = bytesPerChecksum + checksumSize;
int chunksPerPacket = (datanode.writePacketSize - DataNode.PKT_HEADER_LEN -
SIZE_OF_INTEGER + chunkSize - 1)/chunkSize;
buf = ByteBuffer.allocate(DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER +
Math.max(chunksPerPacket, 1) * chunkSize);
buf.limit(0);
}
// See if there is data left in the buffer :
if (bufRead > buf.limit()) {
buf.limit(bufRead);
}
while (buf.remaining() < SIZE_OF_INTEGER) {
if (buf.position() > 0) {
shiftBufData();
}
readToBuf(-1);
}
/* We mostly have the full packet or at least enough for an int
*/
buf.mark();
int payloadLen = buf.getInt();
buf.reset();
// check corrupt values for pktLen, 100MB upper limit should be ok?
if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
throw new IOException("Incorrect value for packet payload : " +
payloadLen);
}
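// The int read above is the payload length advertised by the writer; the
// total on-wire size of the packet is that payload plus the fixed packet
// header (DataNode.PKT_HEADER_LEN).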
int pktSize = payloadLen + DataNode.PKT_HEADER_LEN;
if (buf.remaining() < pktSize) {
//we need to read more data
int toRead = pktSize - buf.remaining();
// first make sure buf has enough space.
int spaceLeft = buf.capacity() - buf.limit();
if (toRead > spaceLeft && buf.position() > 0) {
shiftBufData();
spaceLeft = buf.capacity() - buf.limit();
}
if (toRead > spaceLeft) {
byte oldBuf[] = buf.array();
int toCopy = buf.limit();
buf = ByteBuffer.allocate(toCopy + toRead);
System.arraycopy(oldBuf, 0, buf.array(), 0, toCopy);
buf.limit(toCopy);
}
//now read:
while (toRead > 0) {
toRead -= readToBuf(toRead);
}
}
if (buf.remaining() > pktSize) {
buf.limit(buf.position() + pktSize);
}
if (pktSize > maxPacketReadLen) {
maxPacketReadLen = pktSize;
}
}
/**
* Receives and processes a packet. It can contain many chunks.
* Returns the number of data bytes that the packet has.
*/
private int receivePacket() throws IOException {
// read the next packet
readNextPacket();
buf.mark();
//read the header
buf.getInt(); // packet length
long offsetInBlock = buf.getLong(); // get offset of packet in block
if (offsetInBlock > replicaInfo.getNumBytes()) {
throw new IOException("Received an out-of-sequence packet for " + block +
"from " + inAddr + " at offset " + offsetInBlock +
". Expecting packet starting at " + replicaInfo.getNumBytes());
}
long seqno = buf.getLong(); // get seqno
boolean lastPacketInBlock = (buf.get() != 0);
int len = buf.getInt();
if (len < 0) {
throw new IOException("Got wrong length during writeBlock(" + block +
") from " + inAddr + " at offset " +
offsetInBlock + ": " + len);
}
int endOfHeader = buf.position();
buf.reset();
return receivePacket(offsetInBlock, seqno, lastPacketInBlock, len, endOfHeader);
}
/**
* Write the received packet to disk (data only)
*/
private void writePacketToDisk(byte[] pktBuf, int startByteToDisk,
int numBytesToDisk) throws IOException {
out.write(pktBuf, startByteToDisk, numBytesToDisk);
}
/**
* Receives and processes a packet. It can contain many chunks.
* Returns the number of data bytes that the packet has.
*/
private int receivePacket(long offsetInBlock, long seqno,
boolean lastPacketInBlock, int len, int endOfHeader) throws IOException {
if (LOG.isDebugEnabled()){
LOG.debug("Receiving one packet for block " + block +
" of length " + len +
" seqno " + seqno +
" offsetInBlock " + offsetInBlock +
" lastPacketInBlock " + lastPacketInBlock);
}
// update received bytes
long firstByteInBlock = offsetInBlock;
offsetInBlock += len;
if (replicaInfo.getNumBytes() < offsetInBlock) {
replicaInfo.setNumBytes(offsetInBlock);
}
// put in queue for pending acks
if (responder != null) {
((PacketResponder)responder.getRunnable()).enqueue(seqno,
lastPacketInBlock, offsetInBlock);
}
//First write the packet to the mirror:
if (mirrorOut != null && !mirrorError) {
try {
mirrorOut.write(buf.array(), buf.position(), buf.remaining());
mirrorOut.flush();
} catch (IOException e) {
handleMirrorOutError(e);
}
}
buf.position(endOfHeader);
if (lastPacketInBlock || len == 0) {
LOG.debug("Receiving an empty packet or the end of the block " + block);
} else {
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
checksumSize;
if ( buf.remaining() != (checksumLen + len)) {
throw new IOException("Data remaining in packet does not match" +
"sum of checksumLen and dataLen " +
" size remaining: " + buf.remaining() +
" data len: " + len +
" checksum Len: " + checksumLen);
}
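// Within the packet body the checksums come first, followed by the data,
// so the data starts checksumLen bytes after the current position.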
int checksumOff = buf.position();
int dataOff = checksumOff + checksumLen;
byte pktBuf[] = buf.array();
buf.position(buf.limit()); // move to the end of the data.
/* skip verifying checksum iff this is not the last one in the
* pipeline and clientName is non-null. i.e. Checksum is verified
* on all the datanodes when the data is being written by a
* datanode rather than a client. When the client is writing the data, the
* protocol includes acks and only the last datanode needs to verify
* checksum.
*/
if (mirrorOut == null || clientName.length() == 0) {
verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
}
try {
long onDiskLen = replicaInfo.getBytesOnDisk();
if (onDiskLen<offsetInBlock) {
//finally write to the disk :
if (onDiskLen % bytesPerChecksum != 0) {
// prepare to overwrite last checksum
adjustCrcFilePosition();
}
// If this is a partial chunk, then read in pre-existing checksum
if (firstByteInBlock % bytesPerChecksum != 0) {
LOG.info("Packet starts at " + firstByteInBlock +
" for block " + block +
" which is not a multiple of bytesPerChecksum " +
bytesPerChecksum);
long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
onDiskLen / bytesPerChecksum * checksumSize;
computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum);
}
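// Write only the bytes that are not yet on disk; if the packet overlaps
// data that was persisted earlier (e.g. a packet resent during pipeline
// recovery), the overlapping prefix is skipped.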
int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock);
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
writePacketToDisk(pktBuf, startByteToDisk, numBytesToDisk);
// If this is a partial chunk, then verify that this is the only
// chunk in the packet. Calculate new crc for this chunk.
if (partialCrc != null) {
if (len > bytesPerChecksum) {
throw new IOException("Got wrong length during writeBlock(" +
block + ") from " + inAddr + " " +
"A packet can have only one partial chunk."+
" len = " + len +
" bytesPerChecksum " + bytesPerChecksum);
}
partialCrc.update(pktBuf, startByteToDisk, numBytesToDisk);
byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
checksumOut.write(buf);
LOG.debug("Writing out partial crc for data len " + len);
partialCrc = null;
} else {
checksumOut.write(pktBuf, checksumOff, checksumLen);
}
replicaInfo.setBytesOnDisk(offsetInBlock);
datanode.myMetrics.bytesWritten.inc(len);
/// flush entire packet
flush();
}
} catch (IOException iex) {
datanode.checkDiskError(iex);
throw iex;
}
}
if (throttler != null) { // throttle I/O
throttler.throttle(len);
}
return lastPacketInBlock?-1:len;
}
void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
checksum.writeHeader(mirrorOut);
}
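/**
 * Receive a block: read packets from the upstream writer until the last
 * packet of the block has been processed, forwarding each packet downstream
 * and writing it to disk. For client writes a PacketResponder thread is
 * started to relay acks; for replication and block-move requests the block
 * is finalized here once all packets have been received.
 */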
void receiveBlock(
DataOutputStream mirrOut, // output to next datanode
DataInputStream mirrIn, // input from next datanode
DataOutputStream replyOut, // output to previous datanode
String mirrAddr, BlockTransferThrottler throttlerArg,
int numTargets) throws IOException {
boolean responderClosed = false;
mirrorOut = mirrOut;
mirrorAddr = mirrAddr;
throttler = throttlerArg;
try {
if (clientName.length() > 0) {
responder = new Daemon(datanode.threadGroup,
new PacketResponder(this, block, mirrIn,
replyOut, numTargets,
Thread.currentThread()));
responder.start(); // start thread to process responses
}
/*
* Receive until the last packet.
*/
while (receivePacket() >= 0) {}
// wait for all outstanding packet responses, then tell the
// responder to gracefully shut down.
// Mark that the responder has been closed for future processing
if (responder != null) {
((PacketResponder)responder.getRunnable()).close();
responderClosed = true;
}
// if this write is for a replication request (and not
// from a client), then finalize block. For client-writes,
// the block is finalized in the PacketResponder.
if (clientName.length() == 0) {
// close the block/crc files
close();
// Finalize the block. Does this fsync()?
block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
datanode.myMetrics.blocksWritten.inc();
}
} catch (IOException ioe) {
LOG.info("Exception in receiveBlock for block " + block +
" " + ioe);
throw ioe;
} finally {
if (!responderClosed) { // Abnormal termination of the flow above
IOUtils.closeStream(this);
if (responder != null) {
responder.interrupt();
}
cleanupBlock();
}
if (responder != null) {
try {
responder.join();
} catch (InterruptedException e) {
throw new IOException("Interrupted receiveBlock");
}
responder = null;
}
}
}
/** Clean up a partial block
* if this write is for a replication request (and not from a client).
*/
private void cleanupBlock() throws IOException {
if (clientName.length() == 0) { // not client write
datanode.data.unfinalizeBlock(block);
}
}
/**
* Adjust the file pointer in the local meta file so that the last checksum
* will be overwritten.
*/
private void adjustCrcFilePosition() throws IOException {
if (out != null) {
out.flush();
}
if (checksumOut != null) {
checksumOut.flush();
}
// rollback the position of the meta file
datanode.data.adjustCrcChannelPosition(block, streams, checksumSize);
}
/**
* Reads in the partial crc chunk and computes the checksum
* of the pre-existing data in the partial chunk.
*/
private void computePartialChunkCrc(long blkoff, long ckoff,
int bytesPerChecksum) throws IOException {
// find offset of the beginning of partial chunk.
//
int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
int checksumSize = checksum.getChecksumSize();
blkoff = blkoff - sizePartialChunk;
LOG.info("computePartialChunkCrc sizePartialChunk " +
sizePartialChunk +
" block " + block +
" offset in block " + blkoff +
" offset in metafile " + ckoff);
// create an input stream from the block file
// and read in partial crc chunk into temporary buffer
//
byte[] buf = new byte[sizePartialChunk];
byte[] crcbuf = new byte[checksumSize];
FSDataset.BlockInputStreams instr = null;
try {
instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff);
IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk);
// open meta file and read in the crc value computed earlier
IOUtils.readFully(instr.checksumIn, crcbuf, 0, crcbuf.length);
} finally {
IOUtils.closeStream(instr);
}
// compute crc of partial chunk from data read in the block file.
partialCrc = new PureJavaCrc32();
partialCrc.update(buf, 0, sizePartialChunk);
LOG.info("Read in partial CRC chunk from disk for block " + block);
// paranoia! verify that the pre-computed crc matches what we
// recalculated just now
if (partialCrc.getValue() != FSInputChecker.checksum2long(crcbuf)) {
String msg = "Partial CRC " + partialCrc.getValue() +
" does not match value computed the " +
" last time file was closed " +
FSInputChecker.checksum2long(crcbuf);
throw new IOException(msg);
}
//LOG.debug("Partial CRC matches 0x" +
// Long.toHexString(partialCrc.getValue()));
}
/**
* Processes responses from downstream datanodes in the pipeline
* and sends back replies to the originator.
*/
class PacketResponder implements Runnable, FSConstants {
//packet waiting for ack
private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
private volatile boolean running = true;
private Block block;
DataInputStream mirrorIn; // input from downstream datanode
DataOutputStream replyOut; // output to upstream datanode
private int numTargets; // number of datanodes downstream of this one in the pipeline
private BlockReceiver receiver; // The owner of this responder.
private Thread receiverThread; // the thread that spawns this responder
public String toString() {
return "PacketResponder " + numTargets + " for Block " + this.block;
}
PacketResponder(BlockReceiver receiver, Block b, DataInputStream in,
DataOutputStream out, int numTargets,
Thread receiverThread) {
this.receiverThread = receiverThread;
this.receiver = receiver;
this.block = b;
mirrorIn = in;
replyOut = out;
this.numTargets = numTargets;
}
/**
* Enqueue the seqno that is still to be acked by the downstream datanode.
* @param seqno
* @param lastPacketInBlock
* @param lastByteInPacket
*/
synchronized void enqueue(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
if (running) {
LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
" to ack queue.");
ackQueue.addLast(new Packet(seqno, lastPacketInBlock, lastByteInPacket));
notifyAll();
}
}
/**
* Wait for all pending packets to be acked. Then shut down the thread.
*/
synchronized void close() {
while (running && ackQueue.size() != 0 && datanode.shouldRun) {
try {
wait();
} catch (InterruptedException e) {
running = false;
}
}
LOG.debug("PacketResponder " + numTargets +
" for block " + block + " Closing down.");
running = false;
notifyAll();
}
/**
* Thread to process incoming acks.
* @see java.lang.Runnable#run()
*/
public void run() {
boolean lastPacketInBlock = false;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
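// For each packet this loop (a) reads an ack from the downstream datanode,
// unless this is the last datanode in the pipeline or the mirror has
// already failed, (b) waits for the corresponding packet to reach the head
// of the ack queue, and (c) sends a combined ack (this datanode's status
// followed by the downstream replies) back upstream.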
while (running && datanode.shouldRun && !lastPacketInBlock) {
boolean isInterrupted = false;
try {
Packet pkt = null;
long expected = -2;
PipelineAck ack = new PipelineAck();
long seqno = PipelineAck.UNKOWN_SEQNO;
try {
if (numTargets != 0 && !mirrorError) {// not the last DN & no mirror error
// read an ack from downstream datanode
ack.readFields(mirrorIn);
if (LOG.isDebugEnabled()) {
LOG.debug("PacketResponder " + numTargets + " got " + ack);
}
seqno = ack.getSeqno();
}
if (seqno != PipelineAck.UNKOWN_SEQNO || numTargets == 0) {
synchronized (this) {
while (running && datanode.shouldRun && ackQueue.size() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("PacketResponder " + numTargets +
" seqno = " + seqno +
" for block " + block +
" waiting for local datanode to finish write.");
}
wait();
}
if (!running || !datanode.shouldRun) {
break;
}
pkt = ackQueue.getFirst();
expected = pkt.seqno;
if (numTargets > 0 && seqno != expected) {
throw new IOException("PacketResponder " + numTargets +
" for block " + block +
" expected seqno:" + expected +
" received:" + seqno);
}
lastPacketInBlock = pkt.lastPacketInBlock;
}
}
} catch (InterruptedException ine) {
isInterrupted = true;
} catch (IOException ioe) {
if (Thread.interrupted()) {
isInterrupted = true;
} else {
// continue to run even if can not read from mirror
// notify client of the error
// and wait for the client to shut down the pipeline
mirrorError = true;
LOG.info("PacketResponder " + block + " " + numTargets +
" Exception " + StringUtils.stringifyException(ioe));
}
}
if (Thread.interrupted() || isInterrupted) {
/* The receiver thread cancelled this thread.
* We could also check any other status updates from the
* receiver thread (e.g. if it is ok to write to replyOut).
* It is prudent to not send any more status back to the client
* because this datanode has a problem. The upstream datanode
* will detect that this datanode is bad, and rightly so.
*/
LOG.info("PacketResponder " + block + " " + numTargets +
" : Thread is interrupted.");
running = false;
continue;
}
// If this is the last packet in block, then close block
// file and finalize the block before responding success
if (lastPacketInBlock) {
receiver.close();
final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
block.setNumBytes(replicaInfo.getNumBytes());
datanode.data.finalizeBlock(block);
datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
if (ClientTraceLog.isInfoEnabled() &&
receiver.clientName.length() > 0) {
long offset = 0;
ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
receiver.inAddr, receiver.myAddr, block.getNumBytes(),
"HDFS_WRITE", receiver.clientName, offset,
datanode.dnRegistration.getStorageID(), block, endTime-startTime));
} else {
LOG.info("Received block " + block +
" of size " + block.getNumBytes() +
" from " + receiver.inAddr);
}
}
// construct my ack message
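// replies[0] is always this datanode's own status; the remaining entries
// are the statuses received from downstream, or a single ERROR entry when
// the ack from the mirror could not be read.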
Status[] replies = null;
if (mirrorError) { // ack read error
replies = new Status[2];
replies[0] = SUCCESS;
replies[1] = ERROR;
} else {
short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
replies = new Status[1+ackLen];
replies[0] = SUCCESS;
for (int i=0; i<ackLen; i++) {
replies[i+1] = ack.getReply(i);
}
}
PipelineAck replyAck = new PipelineAck(expected, replies);
// send my ack back to upstream datanode
replyAck.write(replyOut);
replyOut.flush();
if (LOG.isDebugEnabled()) {
LOG.debug("PacketResponder " + numTargets +
" for block " + block +
" responded an ack: " + replyAck);
}
if (pkt != null) {
// remove the packet from the ack queue
removeAckHead();
// update bytes acked
if (replyAck.isSuccess() &&
pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
replicaInfo.setBytesAcked(pkt.lastByteInBlock);
}
}
} catch (IOException e) {
LOG.warn("IOException in BlockReceiver.run(): ", e);
if (running) {
try {
datanode.checkDiskError(e); // may throw an exception here
} catch (IOException ioe) {
LOG.warn("DataNode.chekDiskError failed in run() with: ", ioe);
}
LOG.info("PacketResponder " + block + " " + numTargets +
" Exception " + StringUtils.stringifyException(e));
running = false;
if (!Thread.interrupted()) { // failure not caused by interruption
receiverThread.interrupt();
}
}
} catch (Throwable e) {
if (running) {
LOG.info("PacketResponder " + block + " " + numTargets +
" Exception " + StringUtils.stringifyException(e));
running = false;
receiverThread.interrupt();
}
}
}
LOG.info("PacketResponder " + numTargets +
" for block " + block + " terminating");
}
/**
* Remove a packet from the head of the ack queue
*
* This should be called only when the ack queue is not empty
*/
private synchronized void removeAckHead() {
ackQueue.removeFirst();
notifyAll();
}
}
/**
* This information is cached by the Datanode in the ackQueue.
*/
static private class Packet {
long seqno; // sequence number of the packet
boolean lastPacketInBlock; // true if this is the last packet of the block
long lastByteInBlock; // offset in the block of the end of this packet's data
Packet(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
this.seqno = seqno;
this.lastPacketInBlock = lastPacketInBlock;
this.lastByteInBlock = lastByteInPacket;
}
}
}