blob: 5aef8b9c401f36596347e3eb7418af9d59fce2be [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.dfs;
import java.util.*;
import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
* DatanodeDescriptor tracks stats on a given DataNode,
* such as available storage capacity, last update time, etc.,
* and maintains a set of blocks stored on the datanode.
* This data structure is a data structure that is internal
* to the namenode. It is *not* sent over-the-wire to the Client
* or the Datnodes. Neither is it stored persistently in the
* fsImage.
public class DatanodeDescriptor extends DatanodeInfo {
/** Block and targets pair */
static class BlockTargetPair {
final Block block;
final DatanodeDescriptor[] targets;
BlockTargetPair(Block block, DatanodeDescriptor[] targets) {
this.block = block;
this.targets = targets;
/** A BlockTargetPair queue. */
private static class BlockQueue {
private final Queue<BlockTargetPair> blockq = new LinkedList<BlockTargetPair>();
/** Size of the queue */
synchronized int size() {return blockq.size();}
/** Enqueue */
synchronized boolean offer(Block block, DatanodeDescriptor[] targets) {
return blockq.offer(new BlockTargetPair(block, targets));
/** Dequeue */
synchronized List<BlockTargetPair> poll(int numTargets) {
if (numTargets <= 0 || blockq.isEmpty()) {
return null;
else {
List<BlockTargetPair> results = new ArrayList<BlockTargetPair>();
for(; !blockq.isEmpty() && numTargets > 0; ) {
numTargets -= blockq.peek().targets.length;
if (numTargets >= 0) {
return results;
private volatile BlockInfo blockList = null;
// isAlive == heartbeats.contains(this)
// This is an optimization, because contains takes O(n) time on Arraylist
protected boolean isAlive = false;
/** A queue of blocks to be replicated by this datanode */
private BlockQueue replicateBlocks = new BlockQueue();
/** A queue of blocks to be recovered by this datanode */
private BlockQueue recoverBlocks = new BlockQueue();
/** A set of blocks to be invalidated by this datanode */
private Set<Block> invalidateBlocks = new TreeSet<Block>();
boolean processedBlockReport = false;
/* Variables for maintaning number of blocks scheduled to be written to
* this datanode. This count is approximate and might be slightly higger
* in case of errors (e.g. datanode does not report if an error occurs
* while writing the block).
private int currApproxBlocksScheduled = 0;
private int prevApproxBlocksScheduled = 0;
private long lastBlocksScheduledRollTime = 0;
private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
/** Default constructor */
public DatanodeDescriptor() {}
/** DatanodeDescriptor constructor
* @param nodeID id of the data node
public DatanodeDescriptor(DatanodeID nodeID) {
this(nodeID, 0L, 0L, 0L, 0);
/** DatanodeDescriptor constructor
* @param nodeID id of the data node
* @param networkLocation location of the data node in network
public DatanodeDescriptor(DatanodeID nodeID,
String networkLocation) {
this(nodeID, networkLocation, null);
/** DatanodeDescriptor constructor
* @param nodeID id of the data node
* @param networkLocation location of the data node in network
* @param hostName it could be different from host specified for DatanodeID
public DatanodeDescriptor(DatanodeID nodeID,
String networkLocation,
String hostName) {
this(nodeID, networkLocation, hostName, 0L, 0L, 0L, 0);
/** DatanodeDescriptor constructor
* @param nodeID id of the data node
* @param capacity capacity of the data node
* @param dfsUsed space used by the data node
* @param remaining remaing capacity of the data node
* @param xceiverCount # of data transfers at the data node
public DatanodeDescriptor(DatanodeID nodeID,
long capacity,
long dfsUsed,
long remaining,
int xceiverCount) {
updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
/** DatanodeDescriptor constructor
* @param nodeID id of the data node
* @param networkLocation location of the data node in network
* @param capacity capacity of the data node, including space used by non-dfs
* @param dfsUsed the used space by dfs datanode
* @param remaining remaing capacity of the data node
* @param xceiverCount # of data transfers at the data node
public DatanodeDescriptor(DatanodeID nodeID,
String networkLocation,
String hostName,
long capacity,
long dfsUsed,
long remaining,
int xceiverCount) {
super(nodeID, networkLocation, hostName);
updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
* Add data-node to the block.
* Add block to the head of the list of blocks belonging to the data-node.
boolean addBlock(BlockInfo b) {
return false;
// add to the head of the data-node list
blockList = b.listInsert(blockList, this);
return true;
* Remove block from the list of blocks belonging to the data-node.
* Remove data-node from the block.
boolean removeBlock(BlockInfo b) {
blockList = b.listRemove(blockList, this);
return b.removeNode(this);
* Move block to the head of the list of blocks belonging to the data-node.
void moveBlockToHead(BlockInfo b) {
blockList = b.listRemove(blockList, this);
blockList = b.listInsert(blockList, this);
void resetBlocks() {
this.capacity = 0;
this.remaining = 0;
this.xceiverCount = 0;
this.blockList = null;
int numBlocks() {
return blockList == null ? 0 : blockList.listCount(this);
void updateHeartbeat(long capacity, long dfsUsed, long remaining,
int xceiverCount) {
this.capacity = capacity;
this.dfsUsed = dfsUsed;
this.remaining = remaining;
this.lastUpdate = System.currentTimeMillis();
this.xceiverCount = xceiverCount;
* Iterates over the list of blocks belonging to the data-node.
static private class BlockIterator implements Iterator<Block> {
private BlockInfo current;
private DatanodeDescriptor node;
BlockIterator(BlockInfo head, DatanodeDescriptor dn) {
this.current = head;
this.node = dn;
public boolean hasNext() {
return current != null;
public BlockInfo next() {
BlockInfo res = current;
current = current.getNext(current.findDatanode(node));
return res;
public void remove() {
throw new UnsupportedOperationException("Sorry. can't remove.");
Iterator<Block> getBlockIterator() {
return new BlockIterator(this.blockList, this);
* Store block replication work.
void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
assert(block != null && targets != null && targets.length > 0);
replicateBlocks.offer(block, targets);
* Store block recovery work.
void addBlockToBeRecovered(Block block, DatanodeDescriptor[] targets) {
assert(block != null && targets != null && targets.length > 0);
recoverBlocks.offer(block, targets);
* Store block invalidation work.
void addBlocksToBeInvalidated(List<Block> blocklist) {
assert(blocklist != null && blocklist.size() > 0);
synchronized (invalidateBlocks) {
for(Block blk : blocklist) {
* The number of work items that are pending to be replicated
int getNumberOfBlocksToBeReplicated() {
return replicateBlocks.size();
* The number of block invalidation items that are pending to
* be sent to the datanode
int getNumberOfBlocksToBeInvalidated() {
synchronized (invalidateBlocks) {
return invalidateBlocks.size();
* Set the bit signifying that the first block report from this datanode has been
* processed
void setBlockReportProcessed(boolean val) {
processedBlockReport = val;
* Have we processed any block report from this datanode yet
boolean getBlockReportProcessed() {
return processedBlockReport;
BlockCommand getReplicationCommand(int maxTransfers) {
List<BlockTargetPair> blocktargetlist = replicateBlocks.poll(maxTransfers);
return blocktargetlist == null? null:
new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blocktargetlist);
BlockCommand getLeaseRecoveryCommand(int maxTransfers) {
List<BlockTargetPair> blocktargetlist = recoverBlocks.poll(maxTransfers);
return blocktargetlist == null? null:
new BlockCommand(DatanodeProtocol.DNA_RECOVERBLOCK, blocktargetlist);
* Remove the specified number of blocks to be invalidated
BlockCommand getInvalidateBlocks(int maxblocks) {
Block[] deleteList = getBlockArray(invalidateBlocks, maxblocks);
return deleteList == null?
null: new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, deleteList);
static private Block[] getBlockArray(Collection<Block> blocks, int max) {
Block[] blockarray = null;
synchronized(blocks) {
int available = blocks.size();
int n = available;
if (max > 0 && n > 0) {
if (max < n) {
n = max;
// allocate the properly sized block array ...
blockarray = new Block[n];
// iterate tree collecting n blocks...
Iterator<Block> e = blocks.iterator();
int blockCount = 0;
while (blockCount < n && e.hasNext()) {
// insert into array ...
blockarray[blockCount++] =;
// remove from tree via iterator, if we are removing
// less than total available blocks
if (n < available){
assert(blockarray.length == n);
// now if the number of blocks removed equals available blocks,
// them remove all blocks in one fell swoop via clear
if (n == available) {
return blockarray;
void reportDiff(BlocksMap blocksMap,
BlockListAsLongs newReport,
Collection<Block> toAdd,
Collection<Block> toRemove,
Collection<Block> toInvalidate) {
// place a deilimiter in the list which separates blocks
// that have been reported from those that have not
BlockInfo delimiter = new BlockInfo(new Block(), 1);
boolean added = this.addBlock(delimiter);
assert added : "Delimiting block cannot be present in the node";
if(newReport == null)
newReport = new BlockListAsLongs( new long[0]);
// scan the report and collect newly reported blocks
// Note we are taking special precaution to limit tmp blocks allocated
// as part this block report - which why block list is stored as longs
Block iblk = new Block(); // a fixed new'ed block to be reused with index i
for (int i = 0; i < newReport.getNumberOfBlocks(); ++i) {
iblk.set(newReport.getBlockId(i), newReport.getBlockLen(i),
BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
if(storedBlock == null) {
// If block is not in blocksMap it does not belong to any file
toInvalidate.add(new Block(iblk));
if(storedBlock.findDatanode(this) < 0) {// Known block, but not on the DN
// if the size differs from what is in the blockmap, then return
// the new block. addStoredBlock will then pick up the right size of this
// block and will update the block object in the BlocksMap
if (storedBlock.getNumBytes() != iblk.getNumBytes()) {
toAdd.add(new Block(iblk));
} else {
// move block to the head of the list
// collect blocks that have not been reported
// all of them are next to the delimiter
Iterator<Block> it = new BlockIterator(delimiter.getNext(0), this);
/** Serialization for FSEditLog */
void readFieldsFromFSEditLog(DataInput in) throws IOException { = UTF8.readString(in);
this.storageID = UTF8.readString(in);
this.infoPort = in.readShort() & 0x0000ffff;
this.capacity = in.readLong();
this.dfsUsed = in.readLong();
this.remaining = in.readLong();
this.lastUpdate = in.readLong();
this.xceiverCount = in.readInt();
this.location = Text.readString(in);
this.hostName = Text.readString(in);
setAdminState(WritableUtils.readEnum(in, AdminStates.class));
* @return Approximate number of blocks currently scheduled to be written
* to this datanode.
public int getBlocksScheduled() {
return currApproxBlocksScheduled + prevApproxBlocksScheduled;
* Increments counter for number of blocks scheduled.
void incBlocksScheduled() {
* Decrements counter for number of blocks scheduled.
void decBlocksScheduled() {
if (prevApproxBlocksScheduled > 0) {
} else if (currApproxBlocksScheduled > 0) {
// its ok if both counters are zero.
* Adjusts curr and prev number of blocks scheduled every few minutes.
private void rollBlocksScheduled(long now) {
if ((now - lastBlocksScheduledRollTime) >
prevApproxBlocksScheduled = currApproxBlocksScheduled;
currApproxBlocksScheduled = 0;
lastBlocksScheduledRollTime = now;