Merge remote-tracking branch 'origin/trunk' into HDFS-8966
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index dbe0726..0a19007 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -94,6 +94,7 @@
 
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
 
+import org.apache.hadoop.hdfs.util.RwLock;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -112,7 +113,7 @@
  * Keeps information related to the blocks stored in the Hadoop cluster.
  */
 @InterfaceAudience.Private
-public class BlockManager implements BlockStatsMXBean {
+public class BlockManager implements RwLock, BlockStatsMXBean {
 
   public static final Logger LOG = LoggerFactory.getLogger(BlockManager.class);
   public static final Logger blockLog = NameNode.blockStateChangeLog;
@@ -125,6 +126,7 @@
 
   private final Namesystem namesystem;
 
+  private final BlockManagerLock lock;
   private final DatanodeManager datanodeManager;
   private final HeartbeatManager heartbeatManager;
   private final BlockTokenSecretManager blockTokenSecretManager;
@@ -302,6 +304,7 @@
   public BlockManager(final Namesystem namesystem, final Configuration conf)
     throws IOException {
     this.namesystem = namesystem;
+    this.lock = new BlockManagerLock(namesystem);
     datanodeManager = new DatanodeManager(this, namesystem, conf);
     heartbeatManager = datanodeManager.getHeartbeatManager();
 
@@ -519,7 +522,7 @@
 
   /** Dump meta data to out. */
   public void metaSave(PrintWriter out) {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
     final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
     datanodeManager.fetchDatanodes(live, dead, false);
@@ -551,7 +554,7 @@
     // Dump all datanodes
     getDatanodeManager().datanodeDump(out);
   }
-  
+
   /**
    * Dump the metadata for the given block in a human-readable
    * form.
@@ -580,12 +583,12 @@
       out.print(fileName + ": ");
     }
     // l: == live:, d: == decommissioned c: == corrupt e: == excess
-    out.print(block + ((usableReplicas > 0)? "" : " MISSING") + 
+    out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
               " (replicas:" +
               " l: " + numReplicas.liveReplicas() +
               " d: " + numReplicas.decommissionedAndDecommissioning() +
               " c: " + numReplicas.corruptReplicas() +
-              " e: " + numReplicas.excessReplicas() + ") "); 
+              " e: " + numReplicas.excessReplicas() + ") ");
 
     Collection<DatanodeDescriptor> corruptNodes = 
                                   corruptReplicas.getNodes(block);
@@ -956,7 +959,7 @@
       final boolean inSnapshot, FileEncryptionInfo feInfo,
       ErasureCodingPolicy ecPolicy)
       throws IOException {
-    assert namesystem.hasReadLock();
+    assert hasReadLock();
     if (blocks == null) {
       return null;
     } else if (blocks.length == 0) {
@@ -990,6 +993,41 @@
     }
   }
 
+  @Override // RwLock
+  public void readLock() {
+    lock.readLock().lock(); // blocks until the read lock is granted
+  }
+
+  @Override // RwLock
+  public void readUnlock() {
+    lock.readLock().unlock(); // pairs with readLock()
+  }
+
+  @Override // RwLock
+  public boolean hasReadLock() {
+    return lock.hasReadLock(); // also true while holding the write lock
+  }
+
+  @Override // RwLock
+  public boolean hasWriteLock() {
+    return lock.hasWriteLock(); // checks the current thread only
+  }
+
+  @Override // RwLock
+  public void writeLock() {
+    lock.writeLock().lock(); // blocks until the write lock is granted
+  }
+
+  @Override // RwLock
+  public void writeLockInterruptibly() throws InterruptedException {
+    lock.writeLock().lockInterruptibly(); // the wait can be interrupted
+  }
+
+  @Override // RwLock
+  public void writeUnlock() {
+    lock.writeLock().unlock(); // pairs with writeLock()
+  }
+
   /** @return current access keys. */
   public ExportedBlockKeys getBlockKeys() {
     return isBlockTokenEnabled()? blockTokenSecretManager.exportKeys()
@@ -1105,12 +1143,12 @@
   public BlocksWithLocations getBlocks(DatanodeID datanode, long size
       ) throws IOException {
     namesystem.checkOperation(OperationCategory.READ);
-    namesystem.readLock();
+    readLock();
     try {
       namesystem.checkOperation(OperationCategory.READ);
       return getBlocksWithLocations(datanode, size);  
     } finally {
-      namesystem.readUnlock();
+      readUnlock();
     }
   }
 
@@ -1173,7 +1211,7 @@
 
   /** Remove the blocks associated to the given DatanodeStorageInfo. */
   void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     while(it.hasNext()) {
@@ -1250,7 +1288,7 @@
    */
   public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
       final DatanodeInfo dn, String storageID, String reason) throws IOException {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     final Block reportedBlock = blk.getLocalBlock();
     final BlockInfo storedBlock = getStoredBlock(reportedBlock);
     if (storedBlock == null) {
@@ -1428,13 +1466,13 @@
    */
   int computeBlockRecoveryWork(int blocksToProcess) {
     List<List<BlockInfo>> blocksToReplicate = null;
-    namesystem.writeLock();
+    writeLock();
     try {
       // Choose the blocks to be replicated
       blocksToReplicate = neededReplications
           .chooseUnderReplicatedBlocks(blocksToProcess);
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
     }
     return computeRecoveryWorkForBlocks(blocksToReplicate);
   }
@@ -1452,7 +1490,7 @@
     List<BlockRecoveryWork> recovWork = new LinkedList<>();
 
     // Step 1: categorize at-risk blocks into replication and EC tasks
-    namesystem.writeLock();
+    writeLock();
     try {
       synchronized (neededReplications) {
         for (int priority = 0; priority < blocksToRecover.size(); priority++) {
@@ -1465,7 +1503,7 @@
         }
       }
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
     }
 
     // Step 2: choose target nodes for each recovery task
@@ -1487,7 +1525,7 @@
     }
 
     // Step 3: add tasks to the DN
-    namesystem.writeLock();
+    writeLock();
     try {
       for(BlockRecoveryWork rw : recovWork){
         final DatanodeStorageInfo[] targets = rw.getTargets();
@@ -1503,7 +1541,7 @@
         }
       }
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
     }
 
     if (blockLog.isInfoEnabled()) {
@@ -1882,7 +1920,7 @@
   private void processPendingReplications() {
     BlockInfo[] timedOutItems = pendingReplications.getTimedOutBlocks();
     if (timedOutItems != null) {
-      namesystem.writeLock();
+      writeLock();
       try {
         for (int i = 0; i < timedOutItems.length; i++) {
           /*
@@ -1900,7 +1938,7 @@
           }
         }
       } finally {
-        namesystem.writeUnlock();
+        writeUnlock();
       }
       /* If we know the target datanodes where the replication timedout,
        * we could invoke decBlocksScheduled() on it. Its ok for now.
@@ -1909,7 +1947,7 @@
   }
 
   public long requestBlockReportLeaseId(DatanodeRegistration nodeReg) {
-    assert namesystem.hasReadLock();
+    assert hasReadLock();
     DatanodeDescriptor node = null;
     try {
       node = datanodeManager.getDatanode(nodeReg);
@@ -1970,7 +2008,7 @@
       final DatanodeStorage storage,
       final BlockListAsLongs newReport, BlockReportContext context,
       boolean lastStorageInRpc) throws IOException {
-    namesystem.writeLock();
+    writeLock();
     final long startTime = Time.monotonicNow(); //after acquiring write lock
     final long endTime;
     DatanodeDescriptor node;
@@ -2046,7 +2084,7 @@
       }
     } finally {
       endTime = Time.monotonicNow();
-      namesystem.writeUnlock();
+      writeUnlock();
     }
 
     if (invalidatedBlocks != null) {
@@ -2073,7 +2111,7 @@
     LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
             "longer exists on the DataNode.",
         Long.toHexString(context.getReportId()), zombie.getStorageID());
-    assert(namesystem.hasWriteLock());
+    assert hasWriteLock();
     Iterator<BlockInfo> iter = zombie.getBlockIterator();
     int prevBlocks = zombie.numBlocks();
     while (iter.hasNext()) {
@@ -2107,7 +2145,7 @@
     long startTimeRescanPostponedMisReplicatedBlocks = Time.monotonicNow();
     long startPostponedMisReplicatedBlocksCount =
         getPostponedMisreplicatedBlocksCount();
-    namesystem.writeLock();
+    writeLock();
     try {
       // blocksPerRescan is the configured number of blocks per rescan.
       // Randomly select blocksPerRescan consecutive blocks from the HashSet
@@ -2160,7 +2198,7 @@
         }
       }
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
       long endPostponedMisReplicatedBlocksCount =
           getPostponedMisreplicatedBlocksCount();
       LOG.info("Rescan of postponedMisreplicatedBlocks completed in " +
@@ -2222,7 +2260,7 @@
       BlockInfo block,
       long oldGenerationStamp, long oldNumBytes, 
       DatanodeStorageInfo[] newStorages) throws IOException {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     BlockToMarkCorrupt b = null;
     if (block.getGenerationStamp() != oldGenerationStamp) {
       b = new BlockToMarkCorrupt(oldBlock, block, oldGenerationStamp,
@@ -2270,7 +2308,7 @@
       final DatanodeStorageInfo storageInfo,
       final BlockListAsLongs report) throws IOException {
     if (report == null) return;
-    assert (namesystem.hasWriteLock());
+    assert (hasWriteLock());
     assert (storageInfo.getBlockReportCount() == 0);
 
     for (BlockReportReplica iblk : report) {
@@ -2708,7 +2746,7 @@
   private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
       DatanodeStorageInfo storageInfo)
   throws IOException {
-    assert (storedBlock != null && namesystem.hasWriteLock());
+    assert (storedBlock != null && hasWriteLock());
     if (!namesystem.isInStartupSafeMode()
         || isPopulatingReplQueues()) {
       addStoredBlock(storedBlock, reported, storageInfo, null, false);
@@ -2743,7 +2781,7 @@
                                DatanodeDescriptor delNodeHint,
                                boolean logEveryBlock)
   throws IOException {
-    assert block != null && namesystem.hasWriteLock();
+    assert block != null && hasWriteLock();
     BlockInfo storedBlock;
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     if (!block.isComplete()) {
@@ -2905,7 +2943,7 @@
    * over or under replicated. Place it into the respective queue.
    */
   public void processMisReplicatedBlocks() {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     stopReplicationInitializer();
     neededReplications.clear();
     replicationQueuesInitializer = new Daemon() {
@@ -2962,7 +3000,7 @@
 
     while (namesystem.isRunning() && !Thread.currentThread().isInterrupted()) {
       int processed = 0;
-      namesystem.writeLockInterruptibly();
+      writeLockInterruptibly();
       try {
         while (processed < numBlocksPerIteration && blocksItr.hasNext()) {
           BlockInfo block = blocksItr.next();
@@ -3016,7 +3054,7 @@
           break;
         }
       } finally {
-        namesystem.writeUnlock();
+        writeUnlock();
         // Make sure it is out of the write lock for sufficiently long time.
         Thread.sleep(sleepDuration);
       }
@@ -3114,7 +3152,7 @@
   private void processOverReplicatedBlock(final BlockInfo block,
       final short replication, final DatanodeDescriptor addedNode,
       DatanodeDescriptor delNodeHint) {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     if (addedNode == delNodeHint) {
       delNodeHint = null;
     }
@@ -3152,7 +3190,7 @@
       BlockInfo storedBlock, short replication,
       DatanodeDescriptor addedNode,
       DatanodeDescriptor delNodeHint) {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     // first form a rack to datanodes map and
     BlockCollection bc = getBlockCollection(storedBlock);
     if (storedBlock.isStriped()) {
@@ -3290,7 +3328,7 @@
   }
 
   private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     LightWeightHashSet<BlockInfo> excessBlocks = excessReplicateMap.get(
         dn.getDatanodeUuid());
     if (excessBlocks == null) {
@@ -3321,7 +3359,7 @@
    */
   public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
     blockLog.debug("BLOCK* removeStoredBlock: {} from {}", storedBlock, node);
-    assert (namesystem.hasWriteLock());
+    assert hasWriteLock();
     {
       if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
         blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
@@ -3498,7 +3536,7 @@
    */
   public void processIncrementalBlockReport(final DatanodeID nodeID,
       final StorageReceivedDeletedBlocks srdb) throws IOException {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     int received = 0;
     int deleted = 0;
     int receiving = 0;
@@ -3706,7 +3744,7 @@
   }
 
   public void removeBlock(BlockInfo block) {
-    assert namesystem.hasWriteLock();
+    assert hasWriteLock();
     // No need to ACK blocks that are being removed entirely
     // from the namespace, since the removal of the associated
     // file already removes them from the block map below.
@@ -3740,7 +3778,7 @@
   /** updates a block in under replication queue */
   private void updateNeededReplications(final BlockInfo block,
       final int curReplicasDelta, int expectedReplicasDelta) {
-    namesystem.writeLock();
+    writeLock();
     try {
       if (!isPopulatingReplQueues()) {
         return;
@@ -3758,7 +3796,7 @@
             repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
       }
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
     }
   }
 
@@ -3820,7 +3858,7 @@
   private int invalidateWorkForOneNode(DatanodeInfo dn) {
     final List<Block> toInvalidate;
     
-    namesystem.writeLock();
+    writeLock();
     try {
       // blocks should not be replicated or removed if safe mode is on
       if (namesystem.isInSafeMode()) {
@@ -3844,7 +3882,7 @@
         return 0;
       }
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
     }
     blockLog.debug("BLOCK* {}: ask {} to delete {}", getClass().getSimpleName(),
         dn, toInvalidate);
@@ -4047,12 +4085,12 @@
     int workFound = this.computeBlockRecoveryWork(blocksToProcess);
 
     // Update counters
-    namesystem.writeLock();
+    writeLock();
     try {
       this.updateState();
       this.scheduledReplicationBlocksCount = workFound;
     } finally {
-      namesystem.writeUnlock();
+      writeUnlock();
     }
     workFound += this.computeInvalidateWork(nodesToProcess);
     return workFound;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerLock.java
new file mode 100644
index 0000000..18dc201
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerLock.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+class BlockManagerLock implements ReadWriteLock { // lock used by BlockManager
+  private final ReentrantReadWriteLock coarseLock; // obtained from Namesystem
+
+  BlockManagerLock(Namesystem ns) {
+    this.coarseLock = ns.getLockImplementation(); // no new lock is created
+  }
+
+  @Override // ReadWriteLock
+  public Lock readLock() {
+    return coarseLock.readLock();
+  }
+
+  @Override // ReadWriteLock
+  public Lock writeLock() {
+    return coarseLock.writeLock();
+  }
+
+  boolean hasReadLock() { // a write-lock holder also counts as a reader
+    return hasWriteLock() || coarseLock.getReadHoldCount() > 0;
+  }
+
+  boolean hasWriteLock() { // true only for the thread holding the write lock
+    return coarseLock.isWriteLockedByCurrentThread();
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index 2f81ddf..54bcffd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -55,7 +55,6 @@
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-;
 
 /**
  * Scans the namesystem, scheduling blocks to be cached as appropriate.
@@ -218,7 +217,7 @@
    * after are not atomic.
    */
   public void waitForRescanIfNeeded() {
-    Preconditions.checkArgument(!namesystem.hasWriteLock(),
+    Preconditions.checkArgument(!blockManager.hasWriteLock(),
         "Must not hold the FSN write lock when waiting for a rescan.");
     Preconditions.checkArgument(lock.isHeldByCurrentThread(),
         "Must hold the CRM lock when waiting for a rescan.");
@@ -263,7 +262,7 @@
    */
   @Override
   public void close() throws IOException {
-    Preconditions.checkArgument(namesystem.hasWriteLock());
+    Preconditions.checkArgument(blockManager.hasWriteLock());
     lock.lock();
     try {
       if (shutdown) return;
@@ -285,7 +284,7 @@
     scannedDirectives = 0;
     scannedBlocks = 0;
     try {
-      namesystem.writeLock();
+      blockManager.writeLock();
       try {
         lock.lock();
         if (shutdown) {
@@ -302,7 +301,7 @@
       rescanCachedBlockMap();
       blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
     } finally {
-      namesystem.writeUnlock();
+      blockManager.writeUnlock();
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index b32092d..3357fab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -540,7 +540,7 @@
    * @param nodeInfo datanode descriptor.
    */
   private void removeDatanode(DatanodeDescriptor nodeInfo) {
-    assert namesystem.hasWriteLock();
+    assert blockManager.hasWriteLock();
     heartbeatManager.removeDatanode(nodeInfo);
     blockManager.removeBlocksAssociatedTo(nodeInfo);
     networktopology.remove(nodeInfo);
@@ -559,7 +559,7 @@
    */
   public void removeDatanode(final DatanodeID node
       ) throws UnregisteredNodeException {
-    namesystem.writeLock();
+    blockManager.writeLock();
     try {
       final DatanodeDescriptor descriptor = getDatanode(node);
       if (descriptor != null) {
@@ -569,7 +569,7 @@
                                      + node + " does not exist");
       }
     } finally {
-      namesystem.writeUnlock();
+      blockManager.writeUnlock();
     }
   }
 
@@ -993,12 +993,12 @@
    */
   public void refreshNodes(final Configuration conf) throws IOException {
     refreshHostsReader(conf);
-    namesystem.writeLock();
+    blockManager.writeLock();
     try {
       refreshDatanodes();
       countSoftwareVersions();
     } finally {
-      namesystem.writeUnlock();
+      blockManager.writeUnlock();
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index 42810350..5ce5a83 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -366,12 +366,12 @@
       numBlocksChecked = 0;
       numNodesChecked = 0;
       // Check decom progress
-      namesystem.writeLock();
+      blockManager.writeLock();
       try {
         processPendingNodes();
         check();
       } finally {
-        namesystem.writeUnlock();
+        blockManager.writeUnlock();
       }
       if (numBlocksChecked + numNodesChecked > 0) {
         LOG.info("Checked {} blocks and {} nodes this tick", numBlocksChecked,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index d0369aa..24ad6dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -351,20 +351,20 @@
       }
       if (dead != null) {
         // acquire the fsnamesystem lock, and then remove the dead node.
-        namesystem.writeLock();
+        blockManager.writeLock();
         try {
           dm.removeDeadDatanode(dead);
         } finally {
-          namesystem.writeUnlock();
+          blockManager.writeUnlock();
         }
       }
       if (failedStorage != null) {
         // acquire the fsnamesystem lock, and remove blocks on the storage.
-        namesystem.writeLock();
+        blockManager.writeLock();
         try {
           blockManager.removeBlocksAssociatedTo(failedStorage);
         } finally {
-          namesystem.writeUnlock();
+          blockManager.writeUnlock();
         }
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index 4fd9ca8..3e2443b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -928,7 +928,7 @@
 
   public final void processCacheReport(final DatanodeID datanodeID,
       final List<Long> blockIds) throws IOException {
-    namesystem.writeLock();
+    blockManager.writeLock();
     final long startTime = Time.monotonicNow();
     final long endTime;
     try {
@@ -942,7 +942,7 @@
       processCacheReportImpl(datanode, blockIds);
     } finally {
       endTime = Time.monotonicNow();
-      namesystem.writeUnlock();
+      blockManager.writeUnlock();
     }
 
     // Log the block report processing stats from Namenode perspective
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 65b40c8..dcedcc4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -6293,6 +6293,11 @@
     return haContext;
   }
 
+  @Override // Namesystem
+  public ReentrantReadWriteLock getLockImplementation() {
+    return fsLock.coarseLock; // the raw lock that BlockManagerLock wraps
+  }
+
   @Override  // NameNodeMXBean
   public String getCorruptFiles() {
     List<String> list = new ArrayList<String>();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index b1012c2..89fe678 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@ -30,6 +30,8 @@
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.AccessControlException;
 
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 /** Namesystem operations. */
 @InterfaceAudience.Private
 public interface Namesystem extends RwLock, SafeMode {
@@ -67,4 +69,6 @@
   CacheManager getCacheManager();
 
   HAContext getHAContext();
+
+  ReentrantReadWriteLock getLockImplementation(); // read by BlockManagerLock
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index 64d80bd..1ae6c2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -172,10 +172,10 @@
    * @param dnName the name of the DataNode
    */
   public static void noticeDeadDatanode(NameNode nn, String dnName) {
-    FSNamesystem namesystem = nn.getNamesystem();
-    namesystem.writeLock();
+    final BlockManager bm = nn.getNamesystem().getBlockManager();
+    bm.writeLock();
     try {
-      DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager();
+      DatanodeManager dnm = bm.getDatanodeManager();
       HeartbeatManager hbm = dnm.getHeartbeatManager();
       DatanodeDescriptor[] dnds = hbm.getDatanodes();
       DatanodeDescriptor theDND = null;
@@ -191,7 +191,7 @@
         hbm.heartbeatCheck();
       }
     } finally {
-      namesystem.writeUnlock();
+      bm.writeUnlock();
     }
   }
   
@@ -220,18 +220,17 @@
    * Call heartbeat check function of HeartbeatManager and get
    * under replicated blocks count within write lock to make sure
    * computeDatanodeWork doesn't interfere.
-   * @param namesystem the FSNamesystem
    * @param bm the BlockManager to manipulate
    * @return the number of under replicated blocks
    */
   public static int checkHeartbeatAndGetUnderReplicatedBlocksCount(
-      FSNamesystem namesystem, BlockManager bm) {
-    namesystem.writeLock();
+      BlockManager bm) {
+    bm.writeLock();
     try {
       bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
       return bm.getUnderReplicatedNotMissingBlocks();
     } finally {
-      namesystem.writeUnlock();
+      bm.writeUnlock();
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 16d482e..76b4f14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -23,6 +23,7 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
@@ -34,6 +35,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -63,12 +65,12 @@
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Lists;
+import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestBlockManager {
   private DatanodeStorageInfo[] storages;
@@ -97,11 +99,16 @@
     Configuration conf = new HdfsConfiguration();
     conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
              "need to set a dummy value here so it assumes a multi-rack cluster");
-    fsn = Mockito.mock(FSNamesystem.class);
-    Mockito.doReturn(true).when(fsn).hasWriteLock();
-    Mockito.doReturn(true).when(fsn).hasReadLock();
-    Mockito.doReturn(true).when(fsn).isRunning();
+    fsn = mock(FSNamesystem.class);
+    doReturn(true).when(fsn).isRunning();
+    Lock lockImpl = mock(Lock.class);
+    BlockManagerLock lock = mock(BlockManagerLock.class);
     bm = new BlockManager(fsn, conf);
+    Whitebox.setInternalState(bm, "lock", lock);
+    doReturn(true).when(lock).hasWriteLock();
+    doReturn(true).when(lock).hasReadLock();
+    doReturn(lockImpl).when(lock).readLock();
+    doReturn(lockImpl).when(lock).writeLock();
     final String[] racks = {
         "/rackA",
         "/rackA",
@@ -438,9 +445,9 @@
   
   private BlockInfo addBlockOnNodes(long blockId, List<DatanodeDescriptor> nodes) {
     long inodeId = ++mockINodeId;
-    BlockCollection bc = Mockito.mock(BlockCollection.class);
-    Mockito.doReturn(inodeId).when(bc).getId();
-    Mockito.doReturn(bc).when(fsn).getBlockCollection(inodeId);
+    BlockCollection bc = mock(BlockCollection.class);
+    doReturn(inodeId).when(bc).getId();
+    doReturn(bc).when(fsn).getBlockCollection(inodeId);
     BlockInfo blockInfo = blockOnNodes(blockId, nodes);
 
     blockInfo.setReplication((short) 3);
@@ -749,7 +756,7 @@
     Block block = new Block(blkId);
     BlockInfo blockInfo =
         new BlockInfoContiguous(block, (short) 3);
-    BlockCollection bc = Mockito.mock(BlockCollection.class);
+    BlockCollection bc = mock(BlockCollection.class);
     long inodeId = ++mockINodeId;
     doReturn(inodeId).when(bc).getId();
     bm.blocksMap.addBlockCollection(blockInfo, bc);
@@ -761,7 +768,7 @@
     Block block = new Block(blkId);
     BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3);
     blockInfo.convertToBlockUnderConstruction(UNDER_CONSTRUCTION, null);
-    BlockCollection bc = Mockito.mock(BlockCollection.class);
+    BlockCollection bc = mock(BlockCollection.class);
     long inodeId = ++mockINodeId;
     doReturn(inodeId).when(bc).getId();
     bm.blocksMap.addBlockCollection(blockInfo, bc);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index b55a716..8bad600 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -55,6 +55,7 @@
 import org.apache.hadoop.util.Shell;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 import static org.hamcrest.core.Is.is;
@@ -70,6 +71,7 @@
   private static DatanodeManager mockDatanodeManager(
       FSNamesystem fsn, Configuration conf) throws IOException {
     BlockManager bm = Mockito.mock(BlockManager.class);
+    Mockito.doReturn(true).when(bm).hasWriteLock();
     BlockReportLeaseManager blm = new BlockReportLeaseManager(conf);
     Mockito.when(bm.getBlockReportLeaseManager()).thenReturn(blm);
     DatanodeManager dm = new DatanodeManager(bm, fsn, conf);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 37fcf34..8e9ce12 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -36,6 +36,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
@@ -66,6 +67,7 @@
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.mockito.internal.util.reflection.Whitebox;
 
 @RunWith(Parameterized.class)
 public class TestReplicationPolicy extends BaseReplicationPolicyTest {
@@ -1221,9 +1223,11 @@
   public void testAddStoredBlockDoesNotCauseSkippedReplication()
       throws IOException {
     Namesystem mockNS = mock(Namesystem.class);
-    when(mockNS.hasWriteLock()).thenReturn(true);
-    when(mockNS.hasReadLock()).thenReturn(true);
     BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
+    BlockManagerLock lock = mock(BlockManagerLock.class);
+    when(lock.hasWriteLock()).thenReturn(true);
+    when(lock.hasReadLock()).thenReturn(true);
+    Whitebox.setInternalState(bm, "lock", lock);
     UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
 
     BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
@@ -1270,9 +1274,12 @@
       testConvertLastBlockToUnderConstructionDoesNotCauseSkippedReplication()
           throws IOException {
     Namesystem mockNS = mock(Namesystem.class);
-    when(mockNS.hasReadLock()).thenReturn(true);
-
     BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
+    BlockManagerLock lock = mock(BlockManagerLock.class);
+    Lock impl = mock(Lock.class);
+    when(lock.hasReadLock()).thenReturn(true);
+    when(lock.writeLock()).thenReturn(impl);
+    Whitebox.setInternalState(bm, "lock", lock);
     UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
 
     long blkID1 = ThreadLocalRandom.current().nextLong();
@@ -1342,9 +1349,12 @@
   public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
       throws IOException {
     Namesystem mockNS = mock(Namesystem.class);
-    when(mockNS.hasReadLock()).thenReturn(true);
-
     BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
+    BlockManagerLock lock = mock(BlockManagerLock.class);
+    Lock impl = mock(Lock.class);
+    when(lock.hasReadLock()).thenReturn(true);
+    when(lock.writeLock()).thenReturn(impl);
+    Whitebox.setInternalState(bm, "lock", lock);
     UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
 
     BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 2c4fcc5..98519c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -393,8 +393,7 @@
 
     // underReplicatedBlocks are due to failed volumes
     int underReplicatedBlocks =
-        BlockManagerTestUtil.checkHeartbeatAndGetUnderReplicatedBlocksCount(
-            cluster.getNamesystem(), bm);
+        BlockManagerTestUtil.checkHeartbeatAndGetUnderReplicatedBlocksCount(bm);
     assertTrue("There is no under replicated block after volume failure",
         underReplicatedBlocks > 0);
   }