HDFS-1295. Port to yahoo-merge branch. (mattf)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/yahoo-merge@1134449 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index d6e3055..29b413b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -378,6 +378,9 @@
     HDFS-1905. Improve namenode -format command by not making -clusterId
     parameter mandatory. (Bharath Mundlapudi via suresh)
 
+    HDFS-1295. Improve namenode restart times by short-circuiting the
+    first block reports from datanodes. (Matt Foley via suresh)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 7149823..2873c8b 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -909,27 +909,30 @@
      * @throws IOException
      */
     DatanodeCommand blockReport() throws IOException {
-      // send block report
+      // send block report if timer has expired.
       DatanodeCommand cmd = null;
       long startTime = now();
       if (startTime - lastBlockReport > blockReportInterval) {
-        //
-        // Send latest block report if timer has expired.
-        // Get back a list of local block(s) that are obsolete
-        // and can be safely GC'ed.
-        //
-        long brStartTime = now();
+
+        // Create block report
+        long brCreateStartTime = now();
         BlockListAsLongs bReport = data.getBlockReport(blockPoolId);
+
+        // Send block report
+        long brSendStartTime = now();
         cmd = bpNamenode.blockReport(bpRegistration, blockPoolId, bReport
             .getBlockListAsLongs());
-        long brTime = now() - brStartTime;
-        metrics.addBlockReport(brTime);
-        LOG.info("BlockReport of " + bReport.getNumberOfBlocks() +
-            " blocks got processed in " + brTime + " msecs");
-        //
+
+        // Log the block report processing stats from Datanode perspective
+        long brSendCost = now() - brSendStartTime;
+        long brCreateCost = brSendStartTime - brCreateStartTime;
+        metrics.addBlockReport(brSendCost);
+        LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
+            + " blocks took " + brCreateCost + " msec to generate and "
+            + brSendCost + " msecs for RPC and NN processing");
+
         // If we have sent the first block report, then wait a random
         // time before we start the periodic block reports.
-        //
         if (resetBlockReportTime) {
           lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
           resetBlockReportTime = false;
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
index 07ee319..5dea248 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
@@ -35,12 +35,13 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
-import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
+import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 
 /**
@@ -759,8 +760,8 @@
         }
 
         // Go through all blocks that need replications.
-        BlockIterator neededReplicationsIterator = neededReplications
-            .iterator();
+        UnderReplicatedBlocks.BlockIterator neededReplicationsIterator = 
+            neededReplications.iterator();
         // skip to the first unprocessed block, which is at replIndex
         for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
           neededReplicationsIterator.next();
@@ -1057,26 +1058,57 @@
   }
 
   /**
+   * StatefulBlockInfo is used to build the "toUC" list, which is a list of
+   * updates to the information about under-construction blocks.
+   * Besides the block in question, it provides the ReplicaState
+   * reported by the datanode in the block report. 
+   */
+  private static class StatefulBlockInfo {
+    final BlockInfoUnderConstruction storedBlock;
+    final ReplicaState reportedState;
+
+    StatefulBlockInfo(BlockInfoUnderConstruction storedBlock, 
+        ReplicaState reportedState) {
+      this.storedBlock = storedBlock;
+      this.reportedState = reportedState;
+    }
+  }
+
+  /**
    * The given node is reporting all its blocks.  Use this info to
-   * update the (machine-->blocklist) and (block-->machinelist) tables.
+   * update the (datanode-->blocklist) and (block-->nodelist) tables.
    */
   public void processReport(DatanodeDescriptor node,
                             BlockListAsLongs report) throws IOException {
-    //
+    
+    boolean isFirstBlockReport = (node.numBlocks() == 0);
+    if (isFirstBlockReport) {
+      // Initial block reports can be processed a lot more efficiently than
+      // ordinary block reports.  This shortens NN restart times.
+      processFirstBlockReport(node, report);
+      return;
+    } 
+
+    // Normal case:
     // Modify the (block-->datanode) map, according to the difference
     // between the old and new block report.
     //
-    Collection<Block> toAdd = new LinkedList<Block>();
+    Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
     Collection<Block> toRemove = new LinkedList<Block>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
-    node.reportDiff(this, report, toAdd, toRemove, toInvalidate, toCorrupt);
+    Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+    reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
 
+    // Process the blocks on each queue
+    for (StatefulBlockInfo b : toUC) { 
+      addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
+    }
     for (Block b : toRemove) {
       removeStoredBlock(b, node);
     }
-    for (Block b : toAdd) {
-      addStoredBlock(b, node, null);
+    for (BlockInfo b : toAdd) {
+      addStoredBlock(b, node, null, true);
     }
     for (Block b : toInvalidate) {
       NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: block "
@@ -1090,16 +1122,286 @@
   }
 
   /**
+   * processFirstBlockReport is intended only for processing "initial" block
+   * reports, the first block report received from a DN after it registers.
+   * It just adds all the valid replicas to the datanode, without calculating 
+   * a toRemove list (since there won't be any).  It also silently discards 
+   * any invalid blocks, thereby deferring their processing until 
+   * the next block report.
+   * @param node - DatanodeDescriptor of the node that sent the report
+   * @param report - the initial block report, to be processed
+   * @throws IOException 
+   */
+  void processFirstBlockReport(DatanodeDescriptor node, BlockListAsLongs report) 
+  throws IOException {
+    if (report == null) return;
+    assert (namesystem.hasWriteLock());
+    assert (node.numBlocks() == 0);
+    BlockReportIterator itBR = report.getBlockReportIterator();
+
+    while(itBR.hasNext()) {
+      Block iblk = itBR.next();
+      ReplicaState reportedState = itBR.getCurrentReplicaState();
+      BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
+      // If block does not belong to any file, we are done.
+      if (storedBlock == null) continue;
+
+      // If block is corrupt, mark it and continue to next block.
+      BlockUCState ucState = storedBlock.getBlockUCState();
+      if (isReplicaCorrupt(iblk, reportedState, storedBlock, ucState, node)) {
+        markBlockAsCorrupt(storedBlock, node);
+        continue;
+      }
+
+      // If block is under construction, add this replica to its list
+      if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
+        ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
+            node, iblk, reportedState);
+        // and fall through to next clause
+      }
+      // add replica if appropriate
+      if (reportedState == ReplicaState.FINALIZED) {
+        addStoredBlockImmediate(storedBlock, node);
+      }
+    }
+  }
+
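+  /**
+   * Scan the given block report and sort each reported replica into one of
+   * the to-do collections (toAdd, toInvalidate, toCorrupt, toUC).  Blocks
+   * currently recorded for this datanode but absent from the report are
+   * collected into toRemove.
+   */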
+  void reportDiff(DatanodeDescriptor dn,
+      BlockListAsLongs newReport,
+      Collection<BlockInfo> toAdd,    // add to DatanodeDescriptor
+      Collection<Block> toRemove,     // remove from DatanodeDescriptor
+      Collection<Block> toInvalidate, // should be removed from DN
+      Collection<BlockInfo> toCorrupt, // add to corrupt replicas
+      Collection<StatefulBlockInfo> toUC) { // add to under-construction list
+    // place a delimiter in the list which separates blocks 
+    // that have been reported from those that have not
+    BlockInfo delimiter = new BlockInfo(new Block(), 1);
+    boolean added = dn.addBlock(delimiter);
+    assert added : "Delimiting block cannot be present in the node";
+    if(newReport == null)
+      newReport = new BlockListAsLongs();
+    // scan the report and process newly reported blocks
+    BlockReportIterator itBR = newReport.getBlockReportIterator();
+    while(itBR.hasNext()) {
+      Block iblk = itBR.next();
+      ReplicaState iState = itBR.getCurrentReplicaState();
+      BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,
+          toAdd, toInvalidate, toCorrupt, toUC);
+      // move block to the head of the list
+      if(storedBlock != null && storedBlock.findDatanode(dn) >= 0)
+        dn.moveBlockToHead(storedBlock);
+    }
+    // collect blocks that have not been reported
+    // all of them are next to the delimiter
+    Iterator<? extends Block> it = new DatanodeDescriptor.BlockIterator(
+        delimiter.getNext(0), dn);
+    while(it.hasNext())
+      toRemove.add(it.next());
+    dn.removeBlock(delimiter);
+  }
+
+  /**
+   * Process a block replica reported by the data-node.
+   * No side effects except adding to the passed-in Collections.
+   * 
+   * <ol>
+   * <li>If the block is not known to the system (not in blocksMap) then the
+   * data-node should be notified to invalidate this block.</li>
+   * <li>If the reported replica is valid, that is, it has the same generation
+   * stamp and length as recorded on the name-node, then the replica location
+   * should be added to the name-node.</li>
+   * <li>If the reported replica is not valid, then it is marked as corrupt,
+   * which triggers replication of the existing valid replicas.
+   * Corrupt replicas are removed from the system when the block
+   * is fully replicated.</li>
+   * <li>If the reported replica is for a block currently marked "under
+   * construction" in the NN, then it should be added to the 
+   * BlockInfoUnderConstruction's list of replicas.</li>
+   * </ol>
+   * 
+   * @param dn descriptor for the datanode that made the report
+   * @param block reported block replica
+   * @param reportedState reported replica state
+   * @param toAdd add to DatanodeDescriptor
+   * @param toInvalidate missing blocks (not in the blocks map)
+   *        should be removed from the data-node
+   * @param toCorrupt replicas with unexpected length or generation stamp;
+   *        add to corrupt replicas
+   * @param toUC replicas of blocks currently under construction
+   * @return the up-to-date stored block, or null if it is not in the blocksMap
+   */
+  BlockInfo processReportedBlock(DatanodeDescriptor dn, 
+      Block block, ReplicaState reportedState, 
+      Collection<BlockInfo> toAdd, 
+      Collection<Block> toInvalidate, 
+      Collection<BlockInfo> toCorrupt,
+      Collection<StatefulBlockInfo> toUC) {
+
+    if(FSNamesystem.LOG.isDebugEnabled()) {
+      FSNamesystem.LOG.debug("Reported block " + block
+          + " on " + dn.getName() + " size " + block.getNumBytes()
+          + " replicaState = " + reportedState);
+    }
+
+    // find block by blockId
+    BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+    if(storedBlock == null) {
+      // If blocksMap does not contain reported block id,
+      // the replica should be removed from the data-node.
+      toInvalidate.add(new Block(block));
+      return null;
+    }
+    BlockUCState ucState = storedBlock.getBlockUCState();
+
+    // Block is on the NN
+    if(FSNamesystem.LOG.isDebugEnabled()) {
+      FSNamesystem.LOG.debug("In memory blockUCState = " + ucState);
+    }
+
+    // Ignore replicas already scheduled to be removed from the DN
+    if(belongsToInvalidates(dn.getStorageID(), block)) {
+      assert storedBlock.findDatanode(dn) < 0 : "Block " + block
+        + " in recentInvalidatesSet should not appear in DN " + dn;
+      return storedBlock;
+    }
+
+    if (isReplicaCorrupt(block, reportedState, storedBlock, ucState, dn)) {
+      toCorrupt.add(storedBlock);
+      return storedBlock;
+    }
+
+    if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
+      toUC.add(new StatefulBlockInfo(
+          (BlockInfoUnderConstruction)storedBlock, reportedState));
+      return storedBlock;
+    }
+
+    //add replica if appropriate
+    if (reportedState == ReplicaState.FINALIZED
+        && storedBlock.findDatanode(dn) < 0) {
+      toAdd.add(storedBlock);
+    }
+    return storedBlock;
+  }
+
+  /*
+   * The next two methods test the various cases under which we must conclude
+   * that a replica is corrupt or under construction.  These are laid out
+   * as switch statements, on the theory that it is easier to understand
+   * the combinatorics of reportedState and ucState that way.  It should be
+   * at least as efficient as boolean expressions.
+   */
+  private boolean isReplicaCorrupt(Block iblk, ReplicaState reportedState, 
+      BlockInfo storedBlock, BlockUCState ucState, 
+      DatanodeDescriptor dn) {
+    switch(reportedState) {
+    case FINALIZED:
+      switch(ucState) {
+      case COMPLETE:
+      case COMMITTED:
+        return (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()
+            || storedBlock.getNumBytes() != iblk.getNumBytes());
+      default:
+        return false;
+      }
+    case RBW:
+    case RWR:
+      return storedBlock.isComplete();
+    case RUR:       // should not be reported
+    case TEMPORARY: // should not be reported
+    default:
+      FSNamesystem.LOG.warn("Unexpected replica state " + reportedState
+          + " for block: " + storedBlock + 
+          " on " + dn.getName() + " size " + storedBlock.getNumBytes());
+      return true;
+    }
+  }
+
+  private boolean isBlockUnderConstruction(BlockInfo storedBlock, 
+      BlockUCState ucState, ReplicaState reportedState) {
+    switch(reportedState) {
+    case FINALIZED:
+      switch(ucState) {
+      case UNDER_CONSTRUCTION:
+      case UNDER_RECOVERY:
+        return true;
+      default:
+        return false;
+      }
+    case RBW:
+    case RWR:
+      return (!storedBlock.isComplete());
+    case RUR:       // should not be reported
+    case TEMPORARY: // should not be reported
+    default:
+      return false;
+    }
+  }
+  
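+  /**
+   * Add a reported replica to the list of expected locations for the given
+   * under-construction block.  If the reported replica is already FINALIZED
+   * and this datanode is not yet recorded for the block, also store it
+   * via addStoredBlock().
+   * @throws IOException
+   */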
+  void addStoredBlockUnderConstruction(
+      BlockInfoUnderConstruction block, 
+      DatanodeDescriptor node, 
+      ReplicaState reportedState) 
+  throws IOException {
+    block.addReplicaIfNotPresent(node, block, reportedState);
+    if (reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) {
+      addStoredBlock(block, node, null, true);
+    }
+  }
+  
+  /**
+   * Faster version of {@link #addStoredBlock}, intended for use with the
+   * initial block report at startup.  If not in startup safe mode, it will
+   * call the standard addStoredBlock().
+   * Assumes this method is called "immediately" so there is no need to
+   * refresh the storedBlock from blocksMap.
+   * Doesn't handle underReplication/overReplication, or worry about
+   * pendingReplications or corruptReplicas, because it's in startup safe mode.
+   * Doesn't log every block, because there are typically millions of them.
+   * @throws IOException
+   */
+  private void addStoredBlockImmediate(BlockInfo storedBlock,
+                               DatanodeDescriptor node)
+  throws IOException {
+    assert (storedBlock != null && namesystem.hasWriteLock());
+    if (!namesystem.isInStartupSafeMode()) {
+      addStoredBlock(storedBlock, node, null, false);
+      return;
+    }
+
+    // just add it
+    node.addBlock(storedBlock);
+
+    // Now check for completion of blocks and safe block count
+    int numCurrentReplica = countLiveNodes(storedBlock);
+    if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
+        && numCurrentReplica >= minReplication)
+      storedBlock = completeBlock(storedBlock.getINode(), storedBlock);
+
+    // check whether safe replication is reached for the block
+    // only complete blocks are counted towards that
+    if(storedBlock.isComplete())
+      namesystem.incrementSafeBlockCount(numCurrentReplica);
+  }
+
+  /**
    * Modify (block-->datanode) map. Remove block from set of
    * needed replications if this takes care of the problem.
    * @return the block that is stored in blockMap.
    */
-  private Block addStoredBlock(final Block block,
+  private Block addStoredBlock(final BlockInfo block,
                                DatanodeDescriptor node,
-                               DatanodeDescriptor delNodeHint)
+                               DatanodeDescriptor delNodeHint,
+                               boolean logEveryBlock)
   throws IOException {
-    assert (namesystem.hasWriteLock());
-    BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+    assert (block != null && namesystem.hasWriteLock());
+    BlockInfo storedBlock;
+    if (block instanceof BlockInfoUnderConstruction) {
+      //refresh our copy in case the block got completed in another thread
+      storedBlock = blocksMap.getStoredBlock(block);
+    } else {
+      storedBlock = block;
+    }
     if (storedBlock == null || storedBlock.getINode() == null) {
       // If this block does not belong to anyfile, then we are done.
       NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
@@ -1115,29 +1417,25 @@
     INodeFile fileINode = storedBlock.getINode();
     assert fileINode != null : "Block must belong to a file";
 
-    // add block to the data-node
+    // add block to the datanode
     boolean added = node.addBlock(storedBlock);
 
-    int curReplicaDelta = 0;
+    int curReplicaDelta;
     if (added) {
       curReplicaDelta = 1;
-      //
-      // At startup time, because too many new blocks come in
-      // they take up lots of space in the log file.
-      // So, we log only when namenode is out of safemode.
-      //
-      if (!namesystem.isInSafeMode()) {
+      if (logEveryBlock) {
         NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
             + "blockMap updated: " + node.getName() + " is added to " + 
             storedBlock + " size " + storedBlock.getNumBytes());
       }
     } else {
+      curReplicaDelta = 0;
       NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
           + "Redundant addStoredBlock request received for " + storedBlock
           + " on " + node.getName() + " size " + storedBlock.getNumBytes());
     }
 
-    // filter out containingNodes that are marked for decommission.
+    // Now check for completion of blocks and safe block count
     NumberReplicas num = countNodes(storedBlock);
     int numLiveReplicas = num.liveReplicas();
     int numCurrentReplica = numLiveReplicas
@@ -1149,18 +1447,19 @@
 
     // check whether safe replication is reached for the block
     // only complete blocks are counted towards that
+    // Is no-op if not in safe mode.
     if(storedBlock.isComplete())
       namesystem.incrementSafeBlockCount(numCurrentReplica);
 
-    // if file is under construction, then check whether the block
-    // can be completed
+    // if file is under construction, then done for now
     if (fileINode.isUnderConstruction()) {
       return storedBlock;
     }
 
-    // do not handle mis-replicated blocks during startup
-    if (namesystem.isInSafeMode())
+    // do not try to handle over/under-replicated blocks during safe mode
+    if (namesystem.isInSafeMode()) {
       return storedBlock;
+    }
 
     // handle underReplication/overReplication
     short fileReplication = fileINode.getReplication();
@@ -1395,18 +1694,22 @@
     pendingReplications.remove(block);
 
     // blockReceived reports a finalized block
-    Collection<Block> toAdd = new LinkedList<Block>();
+    Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
-    node.processReportedBlock(this, block, ReplicaState.FINALIZED,
-                              toAdd, toInvalidate, toCorrupt);
-    // the block is only in one of the lists
+    Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+    processReportedBlock(node, block, ReplicaState.FINALIZED,
+        toAdd, toInvalidate, toCorrupt, toUC);
+    // the block is only in one of the to-do lists
     // if it is in none then data-node already has it
-    assert toAdd.size() + toInvalidate.size() <= 1 :
-      "The block should be only in one of the lists.";
+    assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1
+        : "The block should be only in one of the lists.";
 
-    for (Block b : toAdd) {
-      addStoredBlock(b, node, delHintNode);
+    for (StatefulBlockInfo b : toUC) { 
+      addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
+    }
+    for (BlockInfo b : toAdd) {
+      addStoredBlock(b, node, delHintNode, true);
     }
     for (Block b : toInvalidate) {
       NameNode.stateChangeLog.info("BLOCK* NameSystem.addBlock: block "
@@ -1448,6 +1751,32 @@
     return new NumberReplicas(live, count, corrupt, excess);
   }
 
+  /** 
+   * Simpler, faster form of {@link #countNodes} that only returns the number
+   * of live nodes.  If in startup safemode (or its 30-sec extension period),
+   * then it gains speed by ignoring issues of excess replicas or nodes
+   * that are decommissioned or in the process of becoming decommissioned.
+   * If not in startup safemode, then it calls {@link #countNodes} instead.
+   * 
+   * @param b - the block being tested
+   * @return count of live nodes for this block
+   */
+  int countLiveNodes(BlockInfo b) {
+    if (!namesystem.isInStartupSafeMode()) {
+      return countNodes(b).liveReplicas();
+    }
+    // else proceed with fast case
+    int live = 0;
+    Iterator<DatanodeDescriptor> nodeIter = blocksMap.nodeIterator(b);
+    Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
+    while (nodeIter.hasNext()) {
+      DatanodeDescriptor node = nodeIter.next();
+      if ((nodesCorrupt == null) || (!nodesCorrupt.contains(node)))
+        live++;
+    }
+    return live;
+  }
+
   private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
       NumberReplicas num) {
     int curReplicas = num.liveReplicas();
@@ -1783,7 +2112,7 @@
   /**
    * Return an iterator over the set of blocks for which there are no replicas.
    */
-  BlockIterator getCorruptReplicaBlockIterator() {
+  UnderReplicatedBlocks.BlockIterator getCorruptReplicaBlockIterator() {
     return neededReplications
         .iterator(UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
   }
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java b/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
index 658da2b..1f9b988 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
@@ -24,11 +24,8 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.io.WritableUtils;
@@ -280,7 +277,7 @@
   /**
    * Iterates over the list of blocks belonging to the datanode.
    */
-  static private class BlockIterator implements Iterator<BlockInfo> {
+  static class BlockIterator implements Iterator<BlockInfo> {
     private BlockInfo current;
     private DatanodeDescriptor node;
       
@@ -414,141 +411,6 @@
     return blockarray;
   }
 
-  void reportDiff(BlockManager blockManager,
-                  BlockListAsLongs newReport,
-                  Collection<Block> toAdd,    // add to DatanodeDescriptor
-                  Collection<Block> toRemove, // remove from DatanodeDescriptor
-                  Collection<Block> toInvalidate, // should be removed from DN
-                  Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
-    // place a delimiter in the list which separates blocks 
-    // that have been reported from those that have not
-    BlockInfo delimiter = new BlockInfo(new Block(), 1);
-    boolean added = this.addBlock(delimiter);
-    assert added : "Delimiting block cannot be present in the node";
-    if(newReport == null)
-      newReport = new BlockListAsLongs();
-    // scan the report and process newly reported blocks
-    BlockReportIterator itBR = newReport.getBlockReportIterator();
-    while(itBR.hasNext()) {
-      Block iblk = itBR.next();
-      ReplicaState iState = itBR.getCurrentReplicaState();
-      BlockInfo storedBlock = processReportedBlock(blockManager, iblk, iState,
-                                               toAdd, toInvalidate, toCorrupt);
-      // move block to the head of the list
-      if(storedBlock != null && storedBlock.findDatanode(this) >= 0)
-        this.moveBlockToHead(storedBlock);
-    }
-    // collect blocks that have not been reported
-    // all of them are next to the delimiter
-    Iterator<? extends Block> it = new BlockIterator(delimiter.getNext(0),this);
-    while(it.hasNext())
-      toRemove.add(it.next());
-    this.removeBlock(delimiter);
-  }
-
-  /**
-   * Process a block replica reported by the data-node.
-   * 
-   * <ol>
-   * <li>If the block is not known to the system (not in blocksMap) then the
-   * data-node should be notified to invalidate this block.</li>
-   * <li>If the reported replica is valid that is has the same generation stamp
-   * and length as recorded on the name-node, then the replica location is
-   * added to the name-node.</li>
-   * <li>If the reported replica is not valid, then it is marked as corrupt,
-   * which triggers replication of the existing valid replicas.
-   * Corrupt replicas are removed from the system when the block
-   * is fully replicated.</li>
-   * </ol>
-   * 
-   * @param blockManager
-   * @param block reported block replica
-   * @param rState reported replica state
-   * @param toAdd add to DatanodeDescriptor
-   * @param toInvalidate missing blocks (not in the blocks map)
-   *        should be removed from the data-node
-   * @param toCorrupt replicas with unexpected length or generation stamp;
-   *        add to corrupt replicas
-   * @return
-   */
-  BlockInfo processReportedBlock(
-                  BlockManager blockManager,
-                  Block block,                // reported block replica
-                  ReplicaState rState,        // reported replica state
-                  Collection<Block> toAdd,    // add to DatanodeDescriptor
-                  Collection<Block> toInvalidate, // should be removed from DN
-                  Collection<BlockInfo> toCorrupt) {// add to corrupt replicas
-    if(FSNamesystem.LOG.isDebugEnabled()) {
-      FSNamesystem.LOG.debug("Reported block " + block
-          + " on " + getName() + " size " + block.getNumBytes()
-          + " replicaState = " + rState);
-    }
-
-    // find block by blockId
-    BlockInfo storedBlock = blockManager.blocksMap.getStoredBlock(block);
-    if(storedBlock == null) {
-      // If blocksMap does not contain reported block id,
-      // the replica should be removed from the data-node.
-      toInvalidate.add(new Block(block));
-      return null;
-    }
-
-    if(FSNamesystem.LOG.isDebugEnabled()) {
-      FSNamesystem.LOG.debug("In memory blockUCState = " +
-          storedBlock.getBlockUCState());
-    }
-
-    // Ignore replicas already scheduled to be removed from the DN
-    if(blockManager.belongsToInvalidates(getStorageID(), block)) {
-      assert storedBlock.findDatanode(this) < 0 : "Block " + block 
-        + " in recentInvalidatesSet should not appear in DN " + this;
-      return storedBlock;
-    }
-
-    // Block is on the DN
-    boolean isCorrupt = false;
-    switch(rState) {
-    case FINALIZED:
-      switch(storedBlock.getBlockUCState()) {
-      case COMPLETE:
-      case COMMITTED:
-        if(storedBlock.getGenerationStamp() != block.getGenerationStamp()
-            || storedBlock.getNumBytes() != block.getNumBytes())
-          isCorrupt = true;
-        break;
-      case UNDER_CONSTRUCTION:
-      case UNDER_RECOVERY:
-        ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
-            this, block, rState);
-      }
-      if(!isCorrupt && storedBlock.findDatanode(this) < 0)
-        if (storedBlock.getNumBytes() != block.getNumBytes()) {
-          toAdd.add(new Block(block));
-        } else {
-          toAdd.add(storedBlock);
-        }
-      break;
-    case RBW:
-    case RWR:
-      if(!storedBlock.isComplete())
-        ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent(
-                                                      this, block, rState);
-      else
-        isCorrupt = true;
-      break;
-    case RUR:       // should not be reported
-    case TEMPORARY: // should not be reported
-    default:
-      FSNamesystem.LOG.warn("Unexpected replica state " + rState
-          + " for block: " + storedBlock + 
-          " on " + getName() + " size " + storedBlock.getNumBytes());
-      break;
-    }
-    if(isCorrupt)
-        toCorrupt.add(storedBlock);
-    return storedBlock;
-  }
-
   /** Serialization for FSEditLog */
   void readFieldsFromFSEditLog(DataInput in) throws IOException {
     this.name = DeprecatedUTF8.readString(in);
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 5cf86ea..753ca0a 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3138,18 +3138,14 @@
    */
   public void processReport(DatanodeID nodeID, String poolId,
       BlockListAsLongs newReport) throws IOException {
-
+    long startTime, endTime;
+    
     writeLock();
+    startTime = now(); //after acquiring write lock
     try {
-    long startTime = now();
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
-                             + "from " + nodeID.getName()+" " + 
-                             newReport.getNumberOfBlocks()+" blocks");
-    }
     DatanodeDescriptor node = getDatanode(nodeID);
     if (node == null || !node.isAlive) {
-      throw new IOException("ProcessReport from dead or unregisterted node: "
+      throw new IOException("ProcessReport from dead or unregistered node: "
                             + nodeID.getName());
     }
     // To minimize startup time, we discard any second (or later) block reports
@@ -3162,10 +3158,16 @@
     }
 
     blockManager.processReport(node, newReport);
-    NameNode.getNameNodeMetrics().addBlockReport((int) (now() - startTime));
     } finally {
+      endTime = now();
       writeUnlock();
     }
+
+    // Log the block report processing stats from Namenode perspective
+    NameNode.getNameNodeMetrics().addBlockReport((int) (endTime - startTime));
+    NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: from "
+        + nodeID.getName() + ", blocks: " + newReport.getNumberOfBlocks()
+        + ", processing time: " + (endTime - startTime) + " msecs");
   }
 
   /**
@@ -3932,7 +3934,13 @@
         }
       }
       // verify blocks replications
+      long startTimeMisReplicatedScan = now();
       blockManager.processMisReplicatedBlocks();
+      NameNode.stateChangeLog.info("STATE* Replication Queue initialization "
+          + "scan for invalid, over- and under-replicated blocks "
+          + "completed in " + (now() - startTimeMisReplicatedScan)
+          + " msec");
+      
       long timeInSafemode = now() - systemStart;
       NameNode.stateChangeLog.info("STATE* Leaving safe mode after " 
                                     + timeInSafemode/1000 + " secs.");