HDFS-988. saveNamespace race can corrupt the edits log. Contributed by Eli Collins


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@1134951 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 2979bea..b462dc4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1205,6 +1205,8 @@
     HDFS-2039. TestNameNodeMetrics uses a bad test root path, preventing it
     from running inside Eclipse. (todd)
 
+    HDFS-988. saveNamespace race can corrupt the edits log. (eli)
+
 Release 0.21.1 - Unreleased
     HDFS-1466. TestFcHdfsSymlink relies on /tmp/test not existing. (eli)
 
diff --git a/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 860cd9f..b6e5cc2 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -1410,9 +1410,8 @@
   }
   
   /**
-   * flushes out to all replicas of the block. 
-   * The data is in the buffers of the DNs 
-   * but not neccessary on the DN's OS buffers. 
+   * Flushes out to all replicas of the block. The data is in the buffers
+   * of the DNs but not necessarily in the DN's OS buffers.
    *
    * It is a synchronous operation. When it returns,
    * it guarantees that flushed data become visible to new readers. 
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
index 57d061c..bc9babb 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
@@ -570,6 +570,7 @@
    * dumps the contents of recentInvalidateSets
    */
   private void dumpRecentInvalidateSets(PrintWriter out) {
+    assert namesystem.hasWriteLock();
     int size = recentInvalidateSets.values().size();
     out.println("Metasave: Blocks " + pendingDeletionBlocksCount 
         + " waiting deletion from " + size + " datanodes.");
@@ -1392,7 +1393,7 @@
                                DatanodeDescriptor delNodeHint,
                                boolean logEveryBlock)
   throws IOException {
-    assert (block != null && namesystem.hasWriteLock());
+    assert block != null && namesystem.hasWriteLock();
     BlockInfo storedBlock;
     if (block instanceof BlockInfoUnderConstruction) {
       //refresh our copy in case the block got completed in another thread
@@ -1571,6 +1572,7 @@
    */
   void processOverReplicatedBlock(Block block, short replication,
       DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {
+    assert namesystem.hasWriteLock();
     if (addedNode == delNodeHint) {
       delNodeHint = null;
     }
@@ -1596,6 +1598,7 @@
   }
 
   void addToExcessReplicate(DatanodeInfo dn, Block block) {
+    assert namesystem.hasWriteLock();
     Collection<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
     if (excessBlocks == null) {
       excessBlocks = new TreeSet<Block>();
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 38b285a..08b10cb 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -53,7 +53,6 @@
 import org.apache.hadoop.hdfs.util.ByteArray;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.security.UserGroupInformation;
 
 /*************************************************
  * FSDirectory stores the filesystem directory state.
@@ -74,29 +73,33 @@
   private final int maxDirItems;
   private final int lsLimit;  // max list limit
 
-  // lock to protect BlockMap.
-  private ReentrantReadWriteLock bLock;
+  // lock to protect the directory and BlockMap
+  private ReentrantReadWriteLock dirLock;
   private Condition cond;
 
   // utility methods to acquire and release read lock and write lock
   void readLock() {
-    this.bLock.readLock().lock();
+    this.dirLock.readLock().lock();
   }
 
   void readUnlock() {
-    this.bLock.readLock().unlock();
+    this.dirLock.readLock().unlock();
   }
 
   void writeLock() {
-    this.bLock.writeLock().lock();
+    this.dirLock.writeLock().lock();
   }
 
   void writeUnlock() {
-    this.bLock.writeLock().unlock();
+    this.dirLock.writeLock().unlock();
   }
 
   boolean hasWriteLock() {
-    return this.bLock.isWriteLockedByCurrentThread();
+    return this.dirLock.isWriteLockedByCurrentThread();
+  }
+
+  boolean hasReadLock() {
+    return this.dirLock.getReadHoldCount() > 0;
   }
 
   /**
@@ -111,8 +114,8 @@
   }
 
   FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
-    this.bLock = new ReentrantReadWriteLock(true); // fair
-    this.cond = bLock.writeLock().newCondition();
+    this.dirLock = new ReentrantReadWriteLock(true); // fair
+    this.cond = dirLock.writeLock().newCondition();
     fsImage.setFSNamesystem(ns);
     rootDir = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME,
         ns.createFsOwnerPermissions(new FsPermission((short)0755)),
@@ -275,6 +278,7 @@
       throws UnresolvedLinkException {
     INode newNode;
     long diskspace = UNKNOWN_DISK_SPACE;
+    assert hasWriteLock();
     if (blocks == null)
       newNode = new INodeDirectory(permissions, modificationTime);
     else {
@@ -463,8 +467,13 @@
     }
     waitForReady();
     long now = now();
-    if (!unprotectedRenameTo(src, dst, now))
-      return false;
+    writeLock();
+    try {
+      if (!unprotectedRenameTo(src, dst, now))
+        return false;
+    } finally {
+      writeUnlock();
+    }
     fsImage.getEditLog().logRename(src, dst, now);
     return true;
   }
@@ -482,8 +491,13 @@
     }
     waitForReady();
     long now = now();
-    if (unprotectedRenameTo(src, dst, now, options)) {
-      incrDeletedFileCount(1);
+    writeLock();
+    try {
+      if (unprotectedRenameTo(src, dst, now, options)) {
+        incrDeletedFileCount(1);
+      }
+    } finally {
+      writeUnlock();
     }
     fsImage.getEditLog().logRename(src, dst, now, options);
   }
@@ -502,108 +516,104 @@
   boolean unprotectedRenameTo(String src, String dst, long timestamp)
     throws QuotaExceededException, UnresolvedLinkException, 
     FileAlreadyExistsException {
-    writeLock();
+    assert hasWriteLock();
+    INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
+    INode srcInode = srcInodes[srcInodes.length-1];
+    
+    // check the validation of the source
+    if (srcInode == null) {
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + "failed to rename " + src + " to " + dst
+          + " because source does not exist");
+      return false;
+    } 
+    if (srcInodes.length == 1) {
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          +"failed to rename "+src+" to "+dst+ " because source is the root");
+      return false;
+    }
+    if (isDir(dst)) {
+      dst += Path.SEPARATOR + new Path(src).getName();
+    }
+    
+    // check the validity of the destination
+    if (dst.equals(src)) {
+      return true;
+    }
+    if (srcInode.isLink() && 
+        dst.equals(((INodeSymlink)srcInode).getLinkValue())) {
+      throw new FileAlreadyExistsException(
+          "Cannot rename symlink "+src+" to its target "+dst);
+    }
+    
+    // dst cannot be directory or a file under src
+    if (dst.startsWith(src) && 
+        dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + "failed to rename " + src + " to " + dst
+          + " because destination starts with src");
+      return false;
+    }
+    
+    byte[][] dstComponents = INode.getPathComponents(dst);
+    INode[] dstInodes = new INode[dstComponents.length];
+    rootDir.getExistingPathINodes(dstComponents, dstInodes, false);
+    if (dstInodes[dstInodes.length-1] != null) {
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+                                   +"failed to rename "+src+" to "+dst+ 
+                                   " because destination exists");
+      return false;
+    }
+    if (dstInodes[dstInodes.length-2] == null) {
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          +"failed to rename "+src+" to "+dst+ 
+          " because destination's parent does not exist");
+      return false;
+    }
+    
+    // Ensure dst has quota to accommodate rename
+    verifyQuotaForRename(srcInodes,dstInodes);
+    
+    INode dstChild = null;
+    INode srcChild = null;
+    String srcChildName = null;
     try {
-      INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
-      INode srcInode = srcInodes[srcInodes.length-1];
-      
-      // check the validation of the source
-      if (srcInode == null) {
+      // remove src
+      srcChild = removeChild(srcInodes, srcInodes.length-1);
+      if (srcChild == null) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
             + "failed to rename " + src + " to " + dst
-            + " because source does not exist");
-        return false;
-      } 
-      if (srcInodes.length == 1) {
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            +"failed to rename "+src+" to "+dst+ " because source is the root");
+            + " because the source can not be removed");
         return false;
       }
-      if (isDir(dst)) {
-        dst += Path.SEPARATOR + new Path(src).getName();
-      }
+      srcChildName = srcChild.getLocalName();
+      srcChild.setLocalName(dstComponents[dstInodes.length-1]);
       
-      // check the validity of the destination
-      if (dst.equals(src)) {
+      // add src to the destination
+      dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
+          srcChild, UNKNOWN_DISK_SPACE, false);
+      if (dstChild != null) {
+        srcChild = null;
+        if (NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: " 
+              + src + " is renamed to " + dst);
+        }
+        // update modification time of dst and the parent of src
+        srcInodes[srcInodes.length-2].setModificationTime(timestamp);
+        dstInodes[dstInodes.length-2].setModificationTime(timestamp);
         return true;
       }
-      if (srcInode.isLink() && 
-          dst.equals(((INodeSymlink)srcInode).getLinkValue())) {
-        throw new FileAlreadyExistsException(
-            "Cannot rename symlink "+src+" to its target "+dst);
-      }
-      
-      // dst cannot be directory or a file under src
-      if (dst.startsWith(src) && 
-          dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + "failed to rename " + src + " to " + dst
-            + " because destination starts with src");
-        return false;
-      }
-      
-      byte[][] dstComponents = INode.getPathComponents(dst);
-      INode[] dstInodes = new INode[dstComponents.length];
-      rootDir.getExistingPathINodes(dstComponents, dstInodes, false);
-      if (dstInodes[dstInodes.length-1] != null) {
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-                                     +"failed to rename "+src+" to "+dst+ 
-                                     " because destination exists");
-        return false;
-      }
-      if (dstInodes[dstInodes.length-2] == null) {
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            +"failed to rename "+src+" to "+dst+ 
-            " because destination's parent does not exist");
-        return false;
-      }
-      
-      // Ensure dst has quota to accommodate rename
-      verifyQuotaForRename(srcInodes,dstInodes);
-      
-      INode dstChild = null;
-      INode srcChild = null;
-      String srcChildName = null;
-      try {
-        // remove src
-        srcChild = removeChild(srcInodes, srcInodes.length-1);
-        if (srcChild == null) {
-          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-              + "failed to rename " + src + " to " + dst
-              + " because the source can not be removed");
-          return false;
-        }
-        srcChildName = srcChild.getLocalName();
-        srcChild.setLocalName(dstComponents[dstInodes.length-1]);
-        
-        // add src to the destination
-        dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
-            srcChild, UNKNOWN_DISK_SPACE, false);
-        if (dstChild != null) {
-          srcChild = null;
-          if (NameNode.stateChangeLog.isDebugEnabled()) {
-            NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: " 
-                + src + " is renamed to " + dst);
-          }
-          // update modification time of dst and the parent of src
-          srcInodes[srcInodes.length-2].setModificationTime(timestamp);
-          dstInodes[dstInodes.length-2].setModificationTime(timestamp);
-          return true;
-        }
-      } finally {
-        if (dstChild == null && srcChild != null) {
-          // put it back
-          srcChild.setLocalName(srcChildName);
-          addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, srcChild, 
-              UNKNOWN_DISK_SPACE, false);
-        }
-      }
-      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          +"failed to rename "+src+" to "+dst);
-      return false;
     } finally {
-      writeUnlock();
+      if (dstChild == null && srcChild != null) {
+        // put it back
+        srcChild.setLocalName(srcChildName);
+        addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, srcChild, 
+            UNKNOWN_DISK_SPACE, false);
+      }
     }
+    NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+        +"failed to rename "+src+" to "+dst);
+    return false;
   }
 
   /**
@@ -620,6 +630,7 @@
       Options.Rename... options) throws FileAlreadyExistsException,
       FileNotFoundException, ParentNotDirectoryException,
       QuotaExceededException, UnresolvedLinkException, IOException {
+    assert hasWriteLock();
     boolean overwrite = false;
     if (null != options) {
       for (Rename option : options) {
@@ -629,157 +640,152 @@
       }
     }
     String error = null;
-    writeLock();
-    try {
-      final INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
-      final INode srcInode = srcInodes[srcInodes.length - 1];
-      // validate source
-      if (srcInode == null) {
-        error = "rename source " + src + " is not found.";
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + error);
-        throw new FileNotFoundException(error);
-      }
-      if (srcInodes.length == 1) {
-        error = "rename source cannot be the root";
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + error);
-        throw new IOException(error);
-      }
-
-      // validate the destination
-      if (dst.equals(src)) {
-        throw new FileAlreadyExistsException(
-            "The source "+src+" and destination "+dst+" are the same");
-      }
-      if (srcInode.isLink() && 
-          dst.equals(((INodeSymlink)srcInode).getLinkValue())) {
-        throw new FileAlreadyExistsException(
-            "Cannot rename symlink "+src+" to its target "+dst);
-      }
-      // dst cannot be a directory or a file under src
-      if (dst.startsWith(src) && 
-          dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
-        error = "Rename destination " + dst
-            + " is a directory or file under source " + src;
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + error);
-        throw new IOException(error);
-      }
-      final byte[][] dstComponents = INode.getPathComponents(dst);
-      final INode[] dstInodes = new INode[dstComponents.length];
-      rootDir.getExistingPathINodes(dstComponents, dstInodes, false);
-      INode dstInode = dstInodes[dstInodes.length - 1];
-      if (dstInodes.length == 1) {
-        error = "rename destination cannot be the root";
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + error);
-        throw new IOException(error);
-      }
-      if (dstInode != null) { // Destination exists
-        // It's OK to rename a file to a symlink and vice versa
-        if (dstInode.isDirectory() != srcInode.isDirectory()) {
-          error = "Source " + src + " and destination " + dst
-              + " must both be directories";
-          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-              + error);
-          throw new IOException(error);
-        }
-        if (!overwrite) { // If destination exists, overwrite flag must be true
-          error = "rename destination " + dst + " already exists";
-          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-              + error);
-          throw new FileAlreadyExistsException(error);
-        }
-        List<INode> children = dstInode.isDirectory() ? 
-            ((INodeDirectory) dstInode).getChildrenRaw() : null;
-        if (children != null && children.size() != 0) {
-          error = "rename cannot overwrite non empty destination directory "
-              + dst;
-          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-              + error);
-          throw new IOException(error);
-        }
-      }
-      if (dstInodes[dstInodes.length - 2] == null) {
-        error = "rename destination parent " + dst + " not found.";
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + error);
-        throw new FileNotFoundException(error);
-      }
-      if (!dstInodes[dstInodes.length - 2].isDirectory()) {
-        error = "rename destination parent " + dst + " is a file.";
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + error);
-        throw new ParentNotDirectoryException(error);
-      }
-
-      // Ensure dst has quota to accommodate rename
-      verifyQuotaForRename(srcInodes, dstInodes);
-      INode removedSrc = removeChild(srcInodes, srcInodes.length - 1);
-      if (removedSrc == null) {
-        error = "Failed to rename " + src + " to " + dst
-            + " because the source can not be removed";
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-            + error);
-        throw new IOException(error);
-      }
-      final String srcChildName = removedSrc.getLocalName();
-      String dstChildName = null;
-      INode removedDst = null;
-      try {
-        if (dstInode != null) { // dst exists remove it
-          removedDst = removeChild(dstInodes, dstInodes.length - 1);
-          dstChildName = removedDst.getLocalName();
-        }
-
-        INode dstChild = null;
-        removedSrc.setLocalName(dstComponents[dstInodes.length - 1]);
-        // add src as dst to complete rename
-        dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
-            removedSrc, UNKNOWN_DISK_SPACE, false);
-
-        int filesDeleted = 0;
-        if (dstChild != null) {
-          removedSrc = null;
-          if (NameNode.stateChangeLog.isDebugEnabled()) {
-            NameNode.stateChangeLog.debug(
-                "DIR* FSDirectory.unprotectedRenameTo: " + src
-                + " is renamed to " + dst);
-          }
-          srcInodes[srcInodes.length - 2].setModificationTime(timestamp);
-          dstInodes[dstInodes.length - 2].setModificationTime(timestamp);
-
-          // Collect the blocks and remove the lease for previous dst
-          if (removedDst != null) {
-            INode rmdst = removedDst;
-            removedDst = null;
-            List<Block> collectedBlocks = new ArrayList<Block>();
-            filesDeleted = rmdst.collectSubtreeBlocksAndClear(collectedBlocks);
-            getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
-          }
-          return filesDeleted >0;
-        }
-      } finally {
-        if (removedSrc != null) {
-          // Rename failed - restore src
-          removedSrc.setLocalName(srcChildName);
-          addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, removedSrc, 
-              UNKNOWN_DISK_SPACE, false);
-        }
-        if (removedDst != null) {
-          // Rename failed - restore dst
-          removedDst.setLocalName(dstChildName);
-          addChildNoQuotaCheck(dstInodes, dstInodes.length - 1, removedDst, 
-              UNKNOWN_DISK_SPACE, false);
-        }
-      }
+    final INode[] srcInodes = rootDir.getExistingPathINodes(src, false);
+    final INode srcInode = srcInodes[srcInodes.length - 1];
+    // validate source
+    if (srcInode == null) {
+      error = "rename source " + src + " is not found.";
       NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-          + "failed to rename " + src + " to " + dst);
-      throw new IOException("rename from " + src + " to " + dst + " failed.");
-    } finally {
-      writeUnlock();
+          + error);
+      throw new FileNotFoundException(error);
     }
+    if (srcInodes.length == 1) {
+      error = "rename source cannot be the root";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
+
+    // validate the destination
+    if (dst.equals(src)) {
+      throw new FileAlreadyExistsException(
+          "The source "+src+" and destination "+dst+" are the same");
+    }
+    if (srcInode.isLink() && 
+        dst.equals(((INodeSymlink)srcInode).getLinkValue())) {
+      throw new FileAlreadyExistsException(
+          "Cannot rename symlink "+src+" to its target "+dst);
+    }
+    // dst cannot be a directory or a file under src
+    if (dst.startsWith(src) && 
+        dst.charAt(src.length()) == Path.SEPARATOR_CHAR) {
+      error = "Rename destination " + dst
+          + " is a directory or file under source " + src;
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
+    final byte[][] dstComponents = INode.getPathComponents(dst);
+    final INode[] dstInodes = new INode[dstComponents.length];
+    rootDir.getExistingPathINodes(dstComponents, dstInodes, false);
+    INode dstInode = dstInodes[dstInodes.length - 1];
+    if (dstInodes.length == 1) {
+      error = "rename destination cannot be the root";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
+    if (dstInode != null) { // Destination exists
+      // It's OK to rename a file to a symlink and vice versa
+      if (dstInode.isDirectory() != srcInode.isDirectory()) {
+        error = "Source " + src + " and destination " + dst
+            + " must both be directories";
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            + error);
+        throw new IOException(error);
+      }
+      if (!overwrite) { // If destination exists, overwrite flag must be true
+        error = "rename destination " + dst + " already exists";
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            + error);
+        throw new FileAlreadyExistsException(error);
+      }
+      List<INode> children = dstInode.isDirectory() ? 
+          ((INodeDirectory) dstInode).getChildrenRaw() : null;
+      if (children != null && children.size() != 0) {
+        error = "rename cannot overwrite non empty destination directory "
+            + dst;
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            + error);
+        throw new IOException(error);
+      }
+    }
+    if (dstInodes[dstInodes.length - 2] == null) {
+      error = "rename destination parent " + dst + " not found.";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new FileNotFoundException(error);
+    }
+    if (!dstInodes[dstInodes.length - 2].isDirectory()) {
+      error = "rename destination parent " + dst + " is a file.";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new ParentNotDirectoryException(error);
+    }
+
+    // Ensure dst has quota to accommodate rename
+    verifyQuotaForRename(srcInodes, dstInodes);
+    INode removedSrc = removeChild(srcInodes, srcInodes.length - 1);
+    if (removedSrc == null) {
+      error = "Failed to rename " + src + " to " + dst
+          + " because the source can not be removed";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+          + error);
+      throw new IOException(error);
+    }
+    final String srcChildName = removedSrc.getLocalName();
+    String dstChildName = null;
+    INode removedDst = null;
+    try {
+      if (dstInode != null) { // dst exists remove it
+        removedDst = removeChild(dstInodes, dstInodes.length - 1);
+        dstChildName = removedDst.getLocalName();
+      }
+
+      INode dstChild = null;
+      removedSrc.setLocalName(dstComponents[dstInodes.length - 1]);
+      // add src as dst to complete rename
+      dstChild = addChildNoQuotaCheck(dstInodes, dstInodes.length - 1,
+          removedSrc, UNKNOWN_DISK_SPACE, false);
+
+      int filesDeleted = 0;
+      if (dstChild != null) {
+        removedSrc = null;
+        if (NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug(
+              "DIR* FSDirectory.unprotectedRenameTo: " + src
+              + " is renamed to " + dst);
+        }
+        srcInodes[srcInodes.length - 2].setModificationTime(timestamp);
+        dstInodes[dstInodes.length - 2].setModificationTime(timestamp);
+
+        // Collect the blocks and remove the lease for previous dst
+        if (removedDst != null) {
+          INode rmdst = removedDst;
+          removedDst = null;
+          List<Block> collectedBlocks = new ArrayList<Block>();
+          filesDeleted = rmdst.collectSubtreeBlocksAndClear(collectedBlocks);
+          getFSNamesystem().removePathAndBlocks(src, collectedBlocks);
+        }
+        return filesDeleted >0;
+      }
+    } finally {
+      if (removedSrc != null) {
+        // Rename failed - restore src
+        removedSrc.setLocalName(srcChildName);
+        addChildNoQuotaCheck(srcInodes, srcInodes.length - 1, removedSrc, 
+            UNKNOWN_DISK_SPACE, false);
+      }
+      if (removedDst != null) {
+        // Rename failed - restore dst
+        removedDst.setLocalName(dstChildName);
+        addChildNoQuotaCheck(dstInodes, dstInodes.length - 1, removedDst, 
+            UNKNOWN_DISK_SPACE, false);
+      }
+    }
+    NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+        + "failed to rename " + src + " to " + dst);
+    throw new IOException("rename from " + src + " to " + dst + " failed.");
   }
 
   /**
@@ -794,10 +800,16 @@
   Block[] setReplication(String src, short replication, int[] oldReplication)
       throws QuotaExceededException, UnresolvedLinkException {
     waitForReady();
-    Block[] fileBlocks = unprotectedSetReplication(src, replication, oldReplication);
-    if (fileBlocks != null)  // log replication change
-      fsImage.getEditLog().logSetReplication(src, replication);
-    return fileBlocks;
+    Block[] fileBlocks = null;
+    writeLock();
+    try {
+      fileBlocks = unprotectedSetReplication(src, replication, oldReplication);
+      if (fileBlocks != null)  // log replication change
+        fsImage.getEditLog().logSetReplication(src, replication);
+      return fileBlocks;
+    } finally {
+      writeUnlock();
+    }
   }
 
   Block[] unprotectedSetReplication(String src, 
@@ -805,37 +817,31 @@
                                     int[] oldReplication
                                     ) throws QuotaExceededException, 
                                     UnresolvedLinkException {
+    assert hasWriteLock();
     if (oldReplication == null) {
       oldReplication = new int[1];
     }
     oldReplication[0] = -1;
-    Block[] fileBlocks = null;
 
-    writeLock();
-    try {
-      INode[] inodes = rootDir.getExistingPathINodes(src, true);
-      INode inode = inodes[inodes.length - 1];
-      if (inode == null) {
-        return null;
-      }
-      assert !inode.isLink();
-      if (inode.isDirectory()) {
-        return null;
-      }
-      INodeFile fileNode = (INodeFile)inode;
-      oldReplication[0] = fileNode.getReplication();
-
-      // check disk quota
-      long dsDelta = (replication - oldReplication[0]) *
-           (fileNode.diskspaceConsumed()/oldReplication[0]);
-      updateCount(inodes, inodes.length-1, 0, dsDelta, true);
-
-      fileNode.setReplication(replication);
-      fileBlocks = fileNode.getBlocks();
-    } finally {
-      writeUnlock();
+    INode[] inodes = rootDir.getExistingPathINodes(src, true);
+    INode inode = inodes[inodes.length - 1];
+    if (inode == null) {
+      return null;
     }
-    return fileBlocks;
+    assert !inode.isLink();
+    if (inode.isDirectory()) {
+      return null;
+    }
+    INodeFile fileNode = (INodeFile)inode;
+    oldReplication[0] = fileNode.getReplication();
+
+    // check disk quota
+    long dsDelta = (replication - oldReplication[0]) *
+         (fileNode.diskspaceConsumed()/oldReplication[0]);
+    updateCount(inodes, inodes.length-1, 0, dsDelta, true);
+
+    fileNode.setReplication(replication);
+    return fileNode.getBlocks();
   }
 
   /**
@@ -876,55 +882,57 @@
     }
   }
 
-  void setPermission(String src, FsPermission permission
-      ) throws FileNotFoundException, UnresolvedLinkException {
-    unprotectedSetPermission(src, permission);
+  void setPermission(String src, FsPermission permission)
+      throws FileNotFoundException, UnresolvedLinkException {
+    writeLock();
+    try {
+      unprotectedSetPermission(src, permission);
+    } finally {
+      writeUnlock();
+    }
     fsImage.getEditLog().logSetPermissions(src, permission);
   }
 
   void unprotectedSetPermission(String src, FsPermission permissions) 
-    throws FileNotFoundException, UnresolvedLinkException {
+      throws FileNotFoundException, UnresolvedLinkException {
+    assert hasWriteLock();
+    INode inode = rootDir.getNode(src, true);
+    if (inode == null) {
+      throw new FileNotFoundException("File does not exist: " + src);
+    }
+    inode.setPermission(permissions);
+  }
+
+  void setOwner(String src, String username, String groupname)
+      throws FileNotFoundException, UnresolvedLinkException {
     writeLock();
     try {
-        INode inode = rootDir.getNode(src, true);
-        if (inode == null) {
-            throw new FileNotFoundException("File does not exist: " + src);
-        }
-        inode.setPermission(permissions);
+      unprotectedSetOwner(src, username, groupname);
     } finally {
       writeUnlock();
     }
-  }
-
-  void setOwner(String src, String username, String groupname
-      ) throws FileNotFoundException, UnresolvedLinkException {
-    unprotectedSetOwner(src, username, groupname);
     fsImage.getEditLog().logSetOwner(src, username, groupname);
   }
 
   void unprotectedSetOwner(String src, String username, String groupname) 
-    throws FileNotFoundException, UnresolvedLinkException {
-    writeLock();
-    try {
-      INode inode = rootDir.getNode(src, true);
-      if (inode == null) {
-          throw new FileNotFoundException("File does not exist: " + src);
-      }
-      if (username != null) {
-        inode.setUser(username);
-      }
-      if (groupname != null) {
-        inode.setGroup(groupname);
-      }
-    } finally {
-      writeUnlock();
+      throws FileNotFoundException, UnresolvedLinkException {
+    assert hasWriteLock();
+    INode inode = rootDir.getNode(src, true);
+    if (inode == null) {
+      throw new FileNotFoundException("File does not exist: " + src);
+    }
+    if (username != null) {
+      inode.setUser(username);
+    }
+    if (groupname != null) {
+      inode.setGroup(groupname);
     }
   }
 
   /**
    * Concat all the blocks from srcs to trg and delete the srcs files
    */
-  public void concatInternal(String target, String [] srcs) 
+  public void concat(String target, String [] srcs) 
       throws UnresolvedLinkException {
     writeLock();
     try {
@@ -950,6 +958,7 @@
    */
   public void unprotectedConcat(String target, String [] srcs, long timestamp) 
       throws UnresolvedLinkException {
+    assert hasWriteLock();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
     }
@@ -999,7 +1008,13 @@
     }
     waitForReady();
     long now = now();
-    int filesRemoved = unprotectedDelete(src, collectedBlocks, now);
+    int filesRemoved;
+    writeLock();
+    try {
+      filesRemoved = unprotectedDelete(src, collectedBlocks, now);
+    } finally {
+      writeUnlock();
+    }
     if (filesRemoved <= 0) {
       return false;
     }
@@ -1052,6 +1067,7 @@
    */ 
   void unprotectedDelete(String src, long mtime) 
     throws UnresolvedLinkException {
+    assert hasWriteLock();
     List<Block> collectedBlocks = new ArrayList<Block>();
     int filesRemoved = unprotectedDelete(src, collectedBlocks, mtime);
     if (filesRemoved > 0) {
@@ -1069,43 +1085,39 @@
    */ 
   int unprotectedDelete(String src, List<Block> collectedBlocks, 
       long mtime) throws UnresolvedLinkException {
+    assert hasWriteLock();
     src = normalizePath(src);
 
-    writeLock();
-    try {
-      INode[] inodes =  rootDir.getExistingPathINodes(src, false);
-      INode targetNode = inodes[inodes.length-1];
+    INode[] inodes =  rootDir.getExistingPathINodes(src, false);
+    INode targetNode = inodes[inodes.length-1];
 
-      if (targetNode == null) { // non-existent src
-        if(NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
-              +"failed to remove "+src+" because it does not exist");
-        }
-        return 0;
-      }
-      if (inodes.length == 1) { // src is the root
-        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
-            "failed to remove " + src +
-            " because the root is not allowed to be deleted");
-        return 0;
-      }
-      int pos = inodes.length - 1;
-      // Remove the node from the namespace
-      targetNode = removeChild(inodes, pos);
-      if (targetNode == null) {
-        return 0;
-      }
-      // set the parent's modification time
-      inodes[pos-1].setModificationTime(mtime);
-      int filesRemoved = targetNode.collectSubtreeBlocksAndClear(collectedBlocks);
-      if (NameNode.stateChangeLog.isDebugEnabled()) {
+    if (targetNode == null) { // non-existent src
+      if(NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
-            +src+" is removed");
+            +"failed to remove "+src+" because it does not exist");
       }
-      return filesRemoved;
-    } finally {
-      writeUnlock();
+      return 0;
     }
+    if (inodes.length == 1) { // src is the root
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
+          "failed to remove " + src +
+          " because the root is not allowed to be deleted");
+      return 0;
+    }
+    int pos = inodes.length - 1;
+    // Remove the node from the namespace
+    targetNode = removeChild(inodes, pos);
+    if (targetNode == null) {
+      return 0;
+    }
+    // set the parent's modification time
+    inodes[pos-1].setModificationTime(mtime);
+    int filesRemoved = targetNode.collectSubtreeBlocksAndClear(collectedBlocks);
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
+          +src+" is removed");
+    }
+    return filesRemoved;
   }
 
   /**
@@ -1350,6 +1362,7 @@
   private void updateCount(INode[] inodes, int numOfINodes, 
                            long nsDelta, long dsDelta, boolean checkQuota)
                            throws QuotaExceededException {
+    assert hasWriteLock();
     if (!ready) {
       //still initializing. do not check or update quotas.
       return;
@@ -1374,6 +1387,7 @@
    */ 
   private void updateCountNoQuotaCheck(INode[] inodes, int numOfINodes, 
                            long nsDelta, long dsDelta) {
+    assert hasWriteLock();
     try {
       updateCount(inodes, numOfINodes, nsDelta, dsDelta, false);
     } catch (QuotaExceededException e) {
@@ -1391,6 +1405,7 @@
    */
    void unprotectedUpdateCount(INode[] inodes, int numOfINodes, 
                                       long nsDelta, long dsDelta) {
+     assert hasWriteLock();
     for(int i=0; i < numOfINodes; i++) {
       if (inodes[i].isQuotaSet()) { // a directory with quota
         INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
@@ -1501,17 +1516,14 @@
   INode unprotectedMkdir(String src, PermissionStatus permissions,
                           long timestamp) throws QuotaExceededException,
                           UnresolvedLinkException {
+    assert hasWriteLock();
     byte[][] components = INode.getPathComponents(src);
     INode[] inodes = new INode[components.length];
-    writeLock();
-    try {
-      rootDir.getExistingPathINodes(components, inodes, false);
-      unprotectedMkdir(inodes, inodes.length-1, components[inodes.length-1],
-          permissions, false, timestamp);
-      return inodes[inodes.length-1];
-    } finally {
-      writeUnlock();
-    }
+
+    rootDir.getExistingPathINodes(components, inodes, false);
+    unprotectedMkdir(inodes, inodes.length-1, components[inodes.length-1],
+        permissions, false, timestamp);
+    return inodes[inodes.length-1];
   }
 
   /** create a directory at index pos.
@@ -1521,6 +1533,7 @@
   private void unprotectedMkdir(INode[] inodes, int pos,
       byte[] name, PermissionStatus permission, boolean inheritPermission,
       long timestamp) throws QuotaExceededException {
+    assert hasWriteLock();
     inodes[pos] = addChild(inodes, pos, 
         new INodeDirectory(name, permission, timestamp),
         -1, inheritPermission );
@@ -1851,7 +1864,8 @@
    */
   INodeDirectory unprotectedSetQuota(String src, long nsQuota, long dsQuota)
     throws FileNotFoundException, QuotaExceededException, 
-    UnresolvedLinkException {
+      UnresolvedLinkException {
+    assert hasWriteLock();
     // sanity check
     if ((nsQuota < 0 && nsQuota != FSConstants.QUOTA_DONT_SET && 
          nsQuota < FSConstants.QUOTA_RESET) || 
@@ -1864,50 +1878,45 @@
     
     String srcs = normalizePath(src);
 
-    writeLock();
-    try {
-      INode[] inodes = rootDir.getExistingPathINodes(src, true);
-      INode targetNode = inodes[inodes.length-1];
-      if (targetNode == null) {
-        throw new FileNotFoundException("Directory does not exist: " + srcs);
-      } else if (!targetNode.isDirectory()) {
-        throw new FileNotFoundException("Cannot set quota on a file: " + srcs);  
-      } else if (targetNode.isRoot() && nsQuota == FSConstants.QUOTA_RESET) {
-        throw new IllegalArgumentException("Cannot clear namespace quota on root.");
-      } else { // a directory inode
-        INodeDirectory dirNode = (INodeDirectory)targetNode;
-        long oldNsQuota = dirNode.getNsQuota();
-        long oldDsQuota = dirNode.getDsQuota();
-        if (nsQuota == FSConstants.QUOTA_DONT_SET) {
-          nsQuota = oldNsQuota;
-        }
-        if (dsQuota == FSConstants.QUOTA_DONT_SET) {
-          dsQuota = oldDsQuota;
-        }        
+    INode[] inodes = rootDir.getExistingPathINodes(src, true);
+    INode targetNode = inodes[inodes.length-1];
+    if (targetNode == null) {
+      throw new FileNotFoundException("Directory does not exist: " + srcs);
+    } else if (!targetNode.isDirectory()) {
+      throw new FileNotFoundException("Cannot set quota on a file: " + srcs);  
+    } else if (targetNode.isRoot() && nsQuota == FSConstants.QUOTA_RESET) {
+      throw new IllegalArgumentException("Cannot clear namespace quota on root.");
+    } else { // a directory inode
+      INodeDirectory dirNode = (INodeDirectory)targetNode;
+      long oldNsQuota = dirNode.getNsQuota();
+      long oldDsQuota = dirNode.getDsQuota();
+      if (nsQuota == FSConstants.QUOTA_DONT_SET) {
+        nsQuota = oldNsQuota;
+      }
+      if (dsQuota == FSConstants.QUOTA_DONT_SET) {
+        dsQuota = oldDsQuota;
+      }        
 
-        if (dirNode instanceof INodeDirectoryWithQuota) { 
-          // a directory with quota; so set the quota to the new value
-          ((INodeDirectoryWithQuota)dirNode).setQuota(nsQuota, dsQuota);
-          if (!dirNode.isQuotaSet()) {
-            // will not come here for root because root's nsQuota is always set
-            INodeDirectory newNode = new INodeDirectory(dirNode);
-            INodeDirectory parent = (INodeDirectory)inodes[inodes.length-2];
-            dirNode = newNode;
-            parent.replaceChild(newNode);
-          }
-        } else {
-          // a non-quota directory; so replace it with a directory with quota
-          INodeDirectoryWithQuota newNode = 
-            new INodeDirectoryWithQuota(nsQuota, dsQuota, dirNode);
-          // non-root directory node; parent != null
+      if (dirNode instanceof INodeDirectoryWithQuota) { 
+        // a directory with quota; so set the quota to the new value
+        ((INodeDirectoryWithQuota)dirNode).setQuota(nsQuota, dsQuota);
+        if (!dirNode.isQuotaSet()) {
+          // will not come here for root because root's nsQuota is always set
+          INodeDirectory newNode = new INodeDirectory(dirNode);
           INodeDirectory parent = (INodeDirectory)inodes[inodes.length-2];
           dirNode = newNode;
           parent.replaceChild(newNode);
         }
-        return (oldNsQuota != nsQuota || oldDsQuota != dsQuota) ? dirNode : null;
+      } else {
+        // a non-quota directory; so replace it with a directory with quota
+        INodeDirectoryWithQuota newNode = 
+          new INodeDirectoryWithQuota(nsQuota, dsQuota, dirNode);
+        // non-root directory node; parent != null
+        INodeDirectory parent = (INodeDirectory)inodes[inodes.length-2];
+        dirNode = newNode;
+        parent.replaceChild(newNode);
       }
-    } finally {
-      writeUnlock();
+      return (oldNsQuota != nsQuota || oldDsQuota != dsQuota) ? dirNode : null;
     }
   }
   
@@ -1941,27 +1950,31 @@
   }
 
   /**
-   * Sets the access time on the file. Logs it in the transaction log
+   * Sets the access time on the file. Logs it in the transaction log.
    */
   void setTimes(String src, INodeFile inode, long mtime, long atime, boolean force) {
-    if (unprotectedSetTimes(src, inode, mtime, atime, force)) {
+    boolean status = false;
+    writeLock();
+    try {
+      status = unprotectedSetTimes(src, inode, mtime, atime, force);
+    } finally {
+      writeUnlock();
+    }
+    if (status) {
       fsImage.getEditLog().logTimes(src, mtime, atime);
     }
   }
 
   boolean unprotectedSetTimes(String src, long mtime, long atime, boolean force) 
-    throws UnresolvedLinkException {
-    writeLock();
-    try {
-      INodeFile inode = getFileINode(src);
-      return unprotectedSetTimes(src, inode, mtime, atime, force);
-    } finally {
-      writeUnlock();
-    }
+      throws UnresolvedLinkException {
+    assert hasWriteLock();
+    INodeFile inode = getFileINode(src);
+    return unprotectedSetTimes(src, inode, mtime, atime, force);
   }
 
   private boolean unprotectedSetTimes(String src, INodeFile inode, long mtime,
                                       long atime, boolean force) {
+    assert hasWriteLock();
     boolean status = false;
     if (mtime != -1) {
       inode.setModificationTimeForce(mtime);
@@ -2040,6 +2053,7 @@
     */
     private HdfsLocatedFileStatus createLocatedFileStatus(
         byte[] path, INode node) throws IOException {
+      assert hasReadLock();
       long size = 0;     // length is zero for directories
       short replication = 0;
       long blocksize = 0;
@@ -2088,8 +2102,14 @@
       }
     }
     final String userName = dirPerms.getUserName();
-    INodeSymlink newNode = unprotectedSymlink(path, target, modTime, modTime,
-      new PermissionStatus(userName, null, FsPermission.getDefault()));         
+    INodeSymlink newNode  = null;
+    writeLock();
+    try {
+      newNode = unprotectedSymlink(path, target, modTime, modTime,
+          new PermissionStatus(userName, null, FsPermission.getDefault()));
+    } finally {
+      writeUnlock();
+    }
     if (newNode == null) {
       NameNode.stateChangeLog.info("DIR* FSDirectory.addSymlink: "
                                    +"failed to add "+path
@@ -2111,14 +2131,10 @@
   INodeSymlink unprotectedSymlink(String path, String target, long modTime, 
                                   long atime, PermissionStatus perm) 
       throws UnresolvedLinkException {
+    assert hasWriteLock();
     INodeSymlink newNode = new INodeSymlink(target, modTime, atime, perm);
     try {
-      writeLock();
-      try {
-        newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE, false);
-      } finally {
-        writeUnlock();
-      }
+      newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE, false);
     } catch (UnresolvedLinkException e) {
       /* All UnresolvedLinkExceptions should have been resolved by now, but we
        * should re-throw them in case that changes so they are not swallowed 
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 0c344bd..8d1ad10 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -129,6 +129,9 @@
         numOpRenewDelegationToken = 0, numOpCancelDelegationToken = 0, 
         numOpUpdateMasterKey = 0, numOpReassignLease = 0, numOpOther = 0;
 
+    fsNamesys.writeLock();
+    fsDir.writeLock();
+
     // Keep track of the file offsets of the last several opcodes.
     // This is handy when manually recovering corrupted edits files.
     PositionTrackingInputStream tracker = new PositionTrackingInputStream(in);
@@ -248,7 +251,7 @@
             HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
             fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
                                       renameOp.timestamp);
-            fsNamesys.changeLease(renameOp.src, renameOp.dst, dinfo);
+            fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo);
             break;
           }
           case OP_DELETE: {
@@ -339,7 +342,7 @@
             HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
             fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
                                       renameOp.timestamp, renameOp.options);
-            fsNamesys.changeLease(renameOp.src, renameOp.dst, dinfo);
+            fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo);
             break;
           }
           case OP_GET_DELEGATION_TOKEN: {
@@ -423,6 +426,9 @@
       String errorMessage = sb.toString();
       FSImage.LOG.error(errorMessage);
       throw new IOException(errorMessage, t);
+    } finally {
+      fsDir.writeUnlock();
+      fsNamesys.writeUnlock();
     }
     if (FSImage.LOG.isDebugEnabled()) {
       FSImage.LOG.debug("numOpAdd = " + numOpAdd + " numOpClose = " + numOpClose 
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 4f1b645..19b5002 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -491,6 +491,14 @@
     return this.fsLock.isWriteLockedByCurrentThread();
   }
 
+  boolean hasReadLock() {
+    return this.fsLock.getReadHoldCount() > 0;
+  }
+
+  boolean hasReadOrWriteLock() {
+    return hasReadLock() || hasWriteLock();
+  }
+
   /**
    * dirs is a list of directories where the filesystem directory state 
    * is stored
@@ -601,11 +609,14 @@
   }
   
   NamespaceInfo getNamespaceInfo() {
-    return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
-                             getClusterId(),
-                             getBlockPoolId(),
-                             dir.fsImage.getStorage().getCTime(),
-                             getDistributedUpgradeVersion());
+    readLock();
+    try {
+      return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
+          getClusterId(), getBlockPoolId(),
+          dir.fsImage.getStorage().getCTime(), getDistributedUpgradeVersion());
+    } finally {
+      readUnlock();
+    }
   }
 
   /**
@@ -654,32 +665,32 @@
   void metaSave(String filename) throws IOException {
     writeLock();
     try {
-    checkSuperuserPrivilege();
-    File file = new File(System.getProperty("hadoop.log.dir"), filename);
-    PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
-        true)));
-
-    long totalInodes = this.dir.totalInodes();
-    long totalBlocks = this.getBlocksTotal();
-
-    ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
-    ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
-    this.DFSNodesStatus(live, dead);
-    
-    String str = totalInodes + " files and directories, " + totalBlocks
-        + " blocks = " + (totalInodes + totalBlocks) + " total";
-    out.println(str);
-    out.println("Live Datanodes: "+live.size());
-    out.println("Dead Datanodes: "+dead.size());
-    blockManager.metaSave(out);
-
-    //
-    // Dump all datanodes
-    //
-    datanodeDump(out);
-
-    out.flush();
-    out.close();
+      checkSuperuserPrivilege();
+      File file = new File(System.getProperty("hadoop.log.dir"), filename);
+      PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
+          true)));
+  
+      long totalInodes = this.dir.totalInodes();
+      long totalBlocks = this.getBlocksTotal();
+  
+      ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+      ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
+      this.DFSNodesStatus(live, dead);
+      
+      String str = totalInodes + " files and directories, " + totalBlocks
+          + " blocks = " + (totalInodes + totalBlocks) + " total";
+      out.println(str);
+      out.println("Live Datanodes: "+live.size());
+      out.println("Dead Datanodes: "+dead.size());
+      blockManager.metaSave(out);
+  
+      //
+      // Dump all datanodes
+      //
+      datanodeDump(out);
+  
+      out.flush();
+      out.close();
     } finally {
       writeUnlock();
     }
@@ -717,46 +728,46 @@
       throws IOException {
     readLock();
     try {
-    checkSuperuserPrivilege();
-
-    DatanodeDescriptor node = getDatanode(datanode);
-    if (node == null) {
-      NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
-          + "Asking for blocks from an unrecorded node " + datanode.getName());
-      throw new IllegalArgumentException(
-          "Unexpected exception.  Got getBlocks message for datanode " +
-          datanode.getName() + ", but there is no info for it");
-    }
-
-    int numBlocks = node.numBlocks();
-    if(numBlocks == 0) {
-      return new BlocksWithLocations(new BlockWithLocations[0]);
-    }
-    Iterator<BlockInfo> iter = node.getBlockIterator();
-    int startBlock = r.nextInt(numBlocks); // starting from a random block
-    // skip blocks
-    for(int i=0; i<startBlock; i++) {
-      iter.next();
-    }
-    List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
-    long totalSize = 0;
-    BlockInfo curBlock;
-    while(totalSize<size && iter.hasNext()) {
-      curBlock = iter.next();
-      if(!curBlock.isComplete())  continue;
-      totalSize += addBlock(curBlock, results);
-    }
-    if(totalSize<size) {
-      iter = node.getBlockIterator(); // start from the beginning
-      for(int i=0; i<startBlock&&totalSize<size; i++) {
+      checkSuperuserPrivilege();
+  
+      DatanodeDescriptor node = getDatanode(datanode);
+      if (node == null) {
+        NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
+            + "Asking for blocks from an unrecorded node " + datanode.getName());
+        throw new IllegalArgumentException(
+            "Unexpected exception.  Got getBlocks message for datanode " +
+            datanode.getName() + ", but there is no info for it");
+      }
+  
+      int numBlocks = node.numBlocks();
+      if(numBlocks == 0) {
+        return new BlocksWithLocations(new BlockWithLocations[0]);
+      }
+      Iterator<BlockInfo> iter = node.getBlockIterator();
+      int startBlock = r.nextInt(numBlocks); // starting from a random block
+      // skip blocks
+      for(int i=0; i<startBlock; i++) {
+        iter.next();
+      }
+      List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
+      long totalSize = 0;
+      BlockInfo curBlock;
+      while(totalSize<size && iter.hasNext()) {
         curBlock = iter.next();
         if(!curBlock.isComplete())  continue;
         totalSize += addBlock(curBlock, results);
       }
-    }
-
-    return new BlocksWithLocations(
-        results.toArray(new BlockWithLocations[results.size()]));
+      if(totalSize<size) {
+        iter = node.getBlockIterator(); // start from the beginning
+        for(int i=0; i<startBlock&&totalSize<size; i++) {
+          curBlock = iter.next();
+          if(!curBlock.isComplete())  continue;
+          totalSize += addBlock(curBlock, results);
+        }
+      }
+  
+      return new BlocksWithLocations(
+          results.toArray(new BlockWithLocations[results.size()]));
     } finally {
       readUnlock();
     }
@@ -777,6 +788,7 @@
    * return the length of the added block; 0 if the block is not added
    */
   private long addBlock(Block block, List<BlockWithLocations> results) {
+    assert hasReadOrWriteLock();
     ArrayList<String> machineSet = blockManager.getValidLocations(block);
     if(machineSet.size() == 0) {
       return 0;
@@ -799,22 +811,26 @@
   public void setPermission(String src, FsPermission permission)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
+    HdfsFileStatus resultingStat = null;
     writeLock();
     try {
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot set permission for " + src, safeMode);
-    checkOwner(src);
-    dir.setPermission(src, permission);
-    getEditLog().logSync();
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      final HdfsFileStatus stat = dir.getFileInfo(src, false);
-      logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    Server.getRemoteIp(),
-                    "setPermission", src, null, stat);
-    }
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot set permission for " + src, safeMode);
+      }
+      checkOwner(src);
+      dir.setPermission(src, permission);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        resultingStat = dir.getFileInfo(src, false);
+      }
     } finally {
       writeUnlock();
     }
+    getEditLog().logSync();
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      logAuditEvent(UserGroupInformation.getCurrentUser(),
+                    Server.getRemoteIp(),
+                    "setPermission", src, null, resultingStat);
+    }
   }
 
   /**
@@ -824,31 +840,35 @@
   public void setOwner(String src, String username, String group)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
+    HdfsFileStatus resultingStat = null;
     writeLock();
     try {
-    if (isInSafeMode())
+      if (isInSafeMode()) {
         throw new SafeModeException("Cannot set owner for " + src, safeMode);
-    FSPermissionChecker pc = checkOwner(src);
-    if (!pc.isSuper) {
-      if (username != null && !pc.user.equals(username)) {
-        throw new AccessControlException("Non-super user cannot change owner.");
       }
-      if (group != null && !pc.containsGroup(group)) {
-        throw new AccessControlException("User does not belong to " + group
+      FSPermissionChecker pc = checkOwner(src);
+      if (!pc.isSuper) {
+        if (username != null && !pc.user.equals(username)) {
+          throw new AccessControlException("Non-super user cannot change owner.");
+        }
+        if (group != null && !pc.containsGroup(group)) {
+          throw new AccessControlException("User does not belong to " + group
             + " .");
+        }
       }
-    }
-    dir.setOwner(src, username, group);
-    getEditLog().logSync();
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      final HdfsFileStatus stat = dir.getFileInfo(src, false);
-      logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    Server.getRemoteIp(),
-                    "setOwner", src, null, stat);
-    }
+      dir.setOwner(src, username, group);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        resultingStat = dir.getFileInfo(src, false);
+      }
     } finally {
       writeUnlock();
     }
+    getEditLog().logSync();
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+      logAuditEvent(UserGroupInformation.getCurrentUser(),
+                    Server.getRemoteIp(),
+                    "setOwner", src, null, resultingStat);
+    }
   }
 
   /**
@@ -876,7 +896,7 @@
   /**
    * Get block locations within the specified range.
    * @see ClientProtocol#getBlockLocations(String, long, long)
-   * @throws FileNotFoundException
+   * @throws FileNotFoundException, UnresolvedLinkException, IOException
    */
   LocatedBlocks getBlockLocations(String src, long offset, long length,
       boolean doAccessTime, boolean needBlockToken) throws FileNotFoundException,
@@ -893,7 +913,7 @@
       throw new HadoopIllegalArgumentException(
           "Negative length is not supported. File: " + src);
     }
-    final LocatedBlocks ret = getBlockLocationsInternal(src,
+    final LocatedBlocks ret = getBlockLocationsUpdateTimes(src,
         offset, length, doAccessTime, needBlockToken);  
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
@@ -903,7 +923,11 @@
     return ret;
   }
 
-  private LocatedBlocks getBlockLocationsInternal(String src,
+  /*
+   * Get block locations within the specified range, updating the
+   * access times if necessary. 
+   */
+  private LocatedBlocks getBlockLocationsUpdateTimes(String src,
                                                        long offset, 
                                                        long length,
                                                        boolean doAccessTime, 
@@ -954,8 +978,7 @@
   LocatedBlocks getBlockLocationsInternal(INodeFile inode,
       long offset, long length, boolean needBlockToken)
   throws IOException {
-    readLock();
-    try {
+    assert hasReadOrWriteLock();
     final BlockInfo[] blocks = inode.getBlocks();
     if (LOG.isDebugEnabled()) {
       LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
@@ -987,9 +1010,6 @@
       return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
           lastBlock, last.isComplete());
     }
-    } finally {
-      readUnlock();
-    }
   }
 
   /** Create a LocatedBlock. */
@@ -1021,146 +1041,152 @@
    * @throws IOException
    */
   public void concat(String target, String [] srcs) 
-    throws IOException, UnresolvedLinkException {
+      throws IOException, UnresolvedLinkException {
     if(FSNamesystem.LOG.isDebugEnabled()) {
       FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
           " to " + target);
     }
-    // check safe mode
-    if (isInSafeMode()) {
-      throw new SafeModeException("concat: cannot concat " + target, safeMode);
-    }
     
     // verify args
     if(target.isEmpty()) {
-      throw new IllegalArgumentException("concat: trg file name is empty");
+      throw new IllegalArgumentException("Target file name is empty");
     }
     if(srcs == null || srcs.length == 0) {
-      throw new IllegalArgumentException("concat: srcs list is empty or null");
+      throw new IllegalArgumentException("No sources given");
     }
     
-    // currently we require all the files to be in the same dir
+    // We require all files be in the same directory
     String trgParent = 
       target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
-    for(String s : srcs) {
+    for (String s : srcs) {
       String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
-      if(! srcParent.equals(trgParent)) {
-        throw new IllegalArgumentException
-           ("concat:  srcs and target shoould be in same dir");
+      if (!srcParent.equals(trgParent)) {
+        throw new IllegalArgumentException(
+           "Sources and target are not in the same directory");
       }
     }
-    
+
+    HdfsFileStatus resultingStat = null;
     writeLock();
     try {
-      // write permission for the target
-      if (isPermissionEnabled) {
-        checkPathAccess(target, FsAction.WRITE);
-
-        // and srcs
-        for(String aSrc: srcs) {
-          checkPathAccess(aSrc, FsAction.READ); // read the file
-          checkParentAccess(aSrc, FsAction.WRITE); // for delete 
-        }
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot concat " + target, safeMode);
       }
-
-
-      // to make sure no two files are the same
-      Set<INode> si = new HashSet<INode>();
-
-      // we put the following prerequisite for the operation
-      // replication and blocks sizes should be the same for ALL the blocks
-      // check the target
-      INode inode = dir.getFileINode(target);
-
-      if(inode == null) {
-        throw new IllegalArgumentException("concat: trg file doesn't exist");
+      concatInternal(target, srcs);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        resultingStat = dir.getFileInfo(target, false);
       }
-      if(inode.isUnderConstruction()) {
-        throw new IllegalArgumentException("concat: trg file is uner construction");
-      }
-
-      INodeFile trgInode = (INodeFile) inode;
-
-      // per design trg shouldn't be empty and all the blocks same size
-      if(trgInode.blocks.length == 0) {
-        throw new IllegalArgumentException("concat: "+ target + " file is empty");
-      }
-
-      long blockSize = trgInode.getPreferredBlockSize();
-
-      // check the end block to be full
-      if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) {
-        throw new IllegalArgumentException(target + " blocks size should be the same");
-      }
-
-      si.add(trgInode);
-      short repl = trgInode.getReplication();
-
-      // now check the srcs
-      boolean endSrc = false; // final src file doesn't have to have full end block
-      for(int i=0; i<srcs.length; i++) {
-        String src = srcs[i];
-        if(i==srcs.length-1)
-          endSrc=true;
-
-        INodeFile srcInode = dir.getFileINode(src);
-
-        if(src.isEmpty() 
-            || srcInode == null
-            || srcInode.isUnderConstruction()
-            || srcInode.blocks.length == 0) {
-          throw new IllegalArgumentException("concat: file " + src + 
-          " is invalid or empty or underConstruction");
-        }
-
-        // check replication and blocks size
-        if(repl != srcInode.getReplication()) {
-          throw new IllegalArgumentException(src + " and " + target + " " +
-              "should have same replication: "
-              + repl + " vs. " + srcInode.getReplication());
-        }
-
-        //boolean endBlock=false;
-        // verify that all the blocks are of the same length as target
-        // should be enough to check the end blocks
-        int idx = srcInode.blocks.length-1;
-        if(endSrc)
-          idx = srcInode.blocks.length-2; // end block of endSrc is OK not to be full
-        if(idx >= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) {
-          throw new IllegalArgumentException("concat: blocks sizes of " + 
-              src + " and " + target + " should all be the same");
-        }
-
-        si.add(srcInode);
-      }
-
-      // make sure no two files are the same
-      if(si.size() < srcs.length+1) { // trg + srcs
-        // it means at least two files are the same
-        throw new IllegalArgumentException("at least two files are the same");
-      }
-
-      if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " + 
-            Arrays.toString(srcs) + " to " + target);
-      }
-
-      dir.concatInternal(target,srcs);
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-   
-    
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      final HdfsFileStatus stat = dir.getFileInfo(target, false);
       logAuditEvent(UserGroupInformation.getLoginUser(),
                     Server.getRemoteIp(),
-                    "concat", Arrays.toString(srcs), target, stat);
+                    "concat", Arrays.toString(srcs), target, resultingStat);
     }
-   
   }
 
+  /** See {@link #concat(String, String[])} */
+  public void concatInternal(String target, String [] srcs) 
+      throws IOException, UnresolvedLinkException {
+    assert hasWriteLock();
+
+    // write permission for the target
+    if (isPermissionEnabled) {
+      checkPathAccess(target, FsAction.WRITE);
+
+      // and srcs
+      for(String aSrc: srcs) {
+        checkPathAccess(aSrc, FsAction.READ); // read the file
+        checkParentAccess(aSrc, FsAction.WRITE); // for delete 
+      }
+    }
+
+    // to make sure no two files are the same
+    Set<INode> si = new HashSet<INode>();
+
+    // we put the following prerequisite for the operation
+    // replication and blocks sizes should be the same for ALL the blocks
+    // check the target
+    INode inode = dir.getFileINode(target);
+
+    if(inode == null) {
+      throw new IllegalArgumentException("concat: trg file doesn't exist");
+    }
+    if(inode.isUnderConstruction()) {
+      throw new IllegalArgumentException("concat: trg file is uner construction");
+    }
+
+    INodeFile trgInode = (INodeFile) inode;
+
+    // per design trg shouldn't be empty and all the blocks same size
+    if(trgInode.blocks.length == 0) {
+      throw new IllegalArgumentException("concat: "+ target + " file is empty");
+    }
+
+    long blockSize = trgInode.getPreferredBlockSize();
+
+    // check the end block to be full
+    if(blockSize != trgInode.blocks[trgInode.blocks.length-1].getNumBytes()) {
+      throw new IllegalArgumentException(target + " blocks size should be the same");
+    }
+
+    si.add(trgInode);
+    short repl = trgInode.getReplication();
+
+    // now check the srcs
+    boolean endSrc = false; // final src file doesn't have to have full end block
+    for(int i=0; i<srcs.length; i++) {
+      String src = srcs[i];
+      if(i==srcs.length-1)
+        endSrc=true;
+
+      INodeFile srcInode = dir.getFileINode(src);
+
+      if(src.isEmpty() 
+          || srcInode == null
+          || srcInode.isUnderConstruction()
+          || srcInode.blocks.length == 0) {
+        throw new IllegalArgumentException("concat: file " + src + 
+        " is invalid or empty or underConstruction");
+      }
+
+      // check replication and blocks size
+      if(repl != srcInode.getReplication()) {
+        throw new IllegalArgumentException(src + " and " + target + " " +
+            "should have same replication: "
+            + repl + " vs. " + srcInode.getReplication());
+      }
+
+      //boolean endBlock=false;
+      // verify that all the blocks are of the same length as target
+      // should be enough to check the end blocks
+      int idx = srcInode.blocks.length-1;
+      if(endSrc)
+        idx = srcInode.blocks.length-2; // end block of endSrc is OK not to be full
+      if(idx >= 0 && srcInode.blocks[idx].getNumBytes() != blockSize) {
+        throw new IllegalArgumentException("concat: blocks sizes of " + 
+            src + " and " + target + " should all be the same");
+      }
+
+      si.add(srcInode);
+    }
+
+    // make sure no two files are the same
+    if(si.size() < srcs.length+1) { // trg + srcs
+      // it means at least two files are the same
+      throw new IllegalArgumentException("at least two files are the same");
+    }
+
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " + 
+          Arrays.toString(srcs) + " to " + target);
+    }
+
+    dir.concat(target,srcs);
+  }
+  
   /**
    * stores the modification and access time for this inode. 
    * The access time is precise upto an hour. The transaction, if needed, is
@@ -1174,23 +1200,22 @@
     }
     writeLock();
     try {
-    //
-    // The caller needs to have write access to set access & modification times.
-    if (isPermissionEnabled) {
-      checkPathAccess(src, FsAction.WRITE);
-    }
-    INodeFile inode = dir.getFileINode(src);
-    if (inode != null) {
-      dir.setTimes(src, inode, mtime, atime, true);
-      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-        final HdfsFileStatus stat = dir.getFileInfo(src, false);
-        logAuditEvent(UserGroupInformation.getCurrentUser(),
-                      Server.getRemoteIp(),
-                      "setTimes", src, null, stat);
+      // Write access is required to set access and modification times
+      if (isPermissionEnabled) {
+        checkPathAccess(src, FsAction.WRITE);
       }
-    } else {
-      throw new FileNotFoundException("File " + src + " does not exist.");
-    }
+      INodeFile inode = dir.getFileINode(src);
+      if (inode != null) {
+        dir.setTimes(src, inode, mtime, atime, true);
+        if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+          final HdfsFileStatus stat = dir.getFileInfo(src, false);
+          logAuditEvent(UserGroupInformation.getCurrentUser(),
+                        Server.getRemoteIp(),
+                        "setTimes", src, null, stat);
+        }
+      } else {
+        throw new FileNotFoundException("File " + src + " does not exist.");
+      }
     } finally {
       writeUnlock();
     }
@@ -1202,21 +1227,24 @@
   public void createSymlink(String target, String link,
       PermissionStatus dirPerms, boolean createParent) 
       throws IOException, UnresolvedLinkException {
+    HdfsFileStatus resultingStat = null;
     writeLock();
     try {
-    if (!createParent) {
-      verifyParentDir(link);
-    }
-    createSymlinkInternal(target, link, dirPerms, createParent);
+      if (!createParent) {
+        verifyParentDir(link);
+      }
+      createSymlinkInternal(target, link, dirPerms, createParent);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        resultingStat = dir.getFileInfo(link, false);
+      }
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      final HdfsFileStatus stat = dir.getFileInfo(link, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
-                    "createSymlink", link, target, stat);
+                    "createSymlink", link, target, resultingStat);
     }
   }
 
@@ -1226,13 +1254,11 @@
   private void createSymlinkInternal(String target, String link,
       PermissionStatus dirPerms, boolean createParent)
       throws IOException, UnresolvedLinkException {
-    writeLock();
-    try {
+    assert hasWriteLock();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.createSymlink: target=" + 
         target + " link=" + link);
     }
-
     if (isInSafeMode()) {
       throw new SafeModeException("Cannot create symlink " + link, safeMode);
     }
@@ -1251,9 +1277,6 @@
 
     // add symbolic link to namespace
     dir.addSymlink(link, target, dirPerms, createParent);
-    } finally {
-      writeUnlock();
-    }
   }
 
   /**
@@ -1271,7 +1294,16 @@
    */
   public boolean setReplication(String src, short replication) 
     throws IOException, UnresolvedLinkException {
-    boolean status = setReplicationInternal(src, replication);
+    boolean status = false;
+    writeLock();
+    try {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot set replication for " + src, safeMode);
+      }
+      status = setReplicationInternal(src, replication);
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
@@ -1284,10 +1316,7 @@
   private boolean setReplicationInternal(String src,
       short replication) throws AccessControlException, QuotaExceededException,
       SafeModeException, UnresolvedLinkException, IOException {
-    writeLock();
-    try {
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot set replication for " + src, safeMode);
+    assert hasWriteLock();
     blockManager.verifyReplication(src, replication, null);
     if (isPermissionEnabled) {
       checkPathAccess(src, FsAction.WRITE);
@@ -1317,17 +1346,19 @@
           + ". New replication is " + replication);
     }
     return true;
-    } finally {
-      writeUnlock();
-    }
   }
     
   long getPreferredBlockSize(String filename) 
-    throws IOException, UnresolvedLinkException {
-    if (isPermissionEnabled) {
-      checkTraverse(filename);
+      throws IOException, UnresolvedLinkException {
+    readLock();
+    try {
+      if (isPermissionEnabled) {
+        checkTraverse(filename);
+      }
+      return dir.getPreferredBlockSize(filename);
+    } finally {
+      readUnlock();
     }
-    return dir.getPreferredBlockSize(filename);
   }
 
   /*
@@ -1335,6 +1366,7 @@
    */
   private void verifyParentDir(String src) throws FileNotFoundException,
       ParentNotDirectoryException, UnresolvedLinkException {
+    assert hasReadOrWriteLock();
     Path parent = new Path(src).getParent();
     if (parent != null) {
       INode[] pathINodes = dir.getExistingPathINodes(parent.toString());
@@ -1360,8 +1392,13 @@
       short replication, long blockSize) throws AccessControlException,
       SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
       FileNotFoundException, ParentNotDirectoryException, IOException {
-    startFileInternal(src, permissions, holder, clientMachine, flag,
-        createParent, replication, blockSize);
+    writeLock();
+    try {
+      startFileInternal(src, permissions, holder, clientMachine, flag,
+          createParent, replication, blockSize);
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
@@ -1393,6 +1430,7 @@
       long blockSize) throws SafeModeException, FileAlreadyExistsException,
       AccessControlException, UnresolvedLinkException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
+    assert hasWriteLock();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
           + ", holder=" + holder
@@ -1401,10 +1439,9 @@
           + ", replication=" + replication
           + ", createFlag=" + flag.toString());
     }
-    writeLock();
-    try {
-    if (isInSafeMode())
+    if (isInSafeMode()) {
       throw new SafeModeException("Cannot create file" + src, safeMode);
+    }
     if (!DFSUtil.isValidName(src)) {
       throw new InvalidPathException(src);
     }
@@ -1512,9 +1549,6 @@
                                    +ie.getMessage());
       throw ie;
     }
-    } finally {
-      writeUnlock();
-    }
     return null;
   }
 
@@ -1529,35 +1563,41 @@
    * @return true if the file is already closed
    * @throws IOException
    */
-  synchronized boolean recoverLease(String src, String holder, String clientMachine)
-  throws IOException {
-    if (isInSafeMode()) {
-      throw new SafeModeException(
-          "Cannot recover the lease of " + src, safeMode);
+  boolean recoverLease(String src, String holder, String clientMachine)
+      throws IOException {
+    writeLock();
+    try {
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+            "Cannot recover the lease of " + src, safeMode);
+      }
+      if (!DFSUtil.isValidName(src)) {
+        throw new IOException("Invalid file name: " + src);
+      }
+  
+      INode inode = dir.getFileINode(src);
+      if (inode == null) {
+        throw new FileNotFoundException("File not found " + src);
+      }
+  
+      if (!inode.isUnderConstruction()) {
+        return true;
+      }
+      if (isPermissionEnabled) {
+        checkPathAccess(src, FsAction.WRITE);
+      }
+  
+      recoverLeaseInternal(inode, src, holder, clientMachine, true);
+    } finally {
+      writeUnlock();
     }
-    if (!DFSUtil.isValidName(src)) {
-      throw new IOException("Invalid file name: " + src);
-    }
-
-    INode inode = dir.getFileINode(src);
-    if (inode == null) {
-      throw new FileNotFoundException("File not found " + src);
-    }
-
-    if (!inode.isUnderConstruction()) {
-      return true;
-    }
-    if (isPermissionEnabled) {
-      checkPathAccess(src, FsAction.WRITE);
-    }
-
-    recoverLeaseInternal(inode, src, holder, clientMachine, true);
     return false;
   }
 
   private void recoverLeaseInternal(INode fileInode, 
       String src, String holder, String clientMachine, boolean force)
-  throws IOException {
+      throws IOException {
+    assert hasWriteLock();
     if (fileInode != null && fileInode.isUnderConstruction()) {
       INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode;
       //
@@ -1643,12 +1683,16 @@
       throw new UnsupportedOperationException("Append to hdfs not supported." +
                             " Please refer to dfs.support.append configuration parameter.");
     }
-    LocatedBlock lb = 
-      startFileInternal(src, null, holder, clientMachine, 
+    LocatedBlock lb = null;
+    writeLock();
+    try {
+      lb = startFileInternal(src, null, holder, clientMachine, 
                         EnumSet.of(CreateFlag.APPEND), 
                         false, (short)blockManager.maxReplication, (long)0);
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
-
     if (lb != null) {
       if (NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: file "
@@ -1657,7 +1701,6 @@
             +" block size " + lb.getBlock().getNumBytes());
       }
     }
-
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
@@ -1749,6 +1792,9 @@
     // Allocate a new block and record it in the INode. 
     writeLock();
     try {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot add block to " + src, safeMode);
+      }
       INode[] pathINodes = dir.getExistingPathINodes(src);
       int inodesLen = pathINodes.length;
       checkLease(src, clientName, pathINodes[inodesLen-1]);
@@ -1768,7 +1814,7 @@
     } finally {
       writeUnlock();
     }
-        
+
     // Create next block
     LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets, fileLength);
     if (isBlockTokenEnabled) {
@@ -1788,6 +1834,7 @@
 
     final DatanodeDescriptor clientnode;
     final long preferredblocksize;
+    final List<DatanodeDescriptor> chosen;
     readLock();
     try {
       //check safe mode
@@ -1800,19 +1847,19 @@
       final INodeFileUnderConstruction file = checkLease(src, clientName);
       clientnode = file.getClientNode();
       preferredblocksize = file.getPreferredBlockSize();
+
+      //find datanode descriptors
+      chosen = new ArrayList<DatanodeDescriptor>();
+      for(DatanodeInfo d : existings) {
+        final DatanodeDescriptor descriptor = getDatanode(d);
+        if (descriptor != null) {
+          chosen.add(descriptor);
+        }
+      }
     } finally {
       readUnlock();
     }
 
-    //find datanode descriptors
-    final List<DatanodeDescriptor> chosen = new ArrayList<DatanodeDescriptor>();
-    for(DatanodeInfo d : existings) {
-      final DatanodeDescriptor descriptor = getDatanode(d);
-      if (descriptor != null) {
-        chosen.add(descriptor);
-      }
-    }
-
     // choose new datanodes.
     final DatanodeInfo[] targets = blockManager.replicator.chooseTarget(
         src, numAdditionalNodes, clientnode, chosen, true,
@@ -1833,21 +1880,24 @@
       UnresolvedLinkException, IOException {
     writeLock();
     try {
-    //
-    // Remove the block from the pending creates list
-    //
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
-                                    +b+"of file "+src);
-    }
-    INodeFileUnderConstruction file = checkLease(src, holder);
-    dir.removeBlock(src, file, ExtendedBlock.getLocalBlock(b));
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
-                                    + b
-                                    + " is removed from pendingCreates");
-    }
-    return true;
+      //
+      // Remove the block from the pending creates list
+      //
+      if(NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+                                      +b+"of file "+src);
+      }
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot abandon block " + b +
+                                    " for fle" + src, safeMode);
+      }
+      INodeFileUnderConstruction file = checkLease(src, holder);
+      dir.removeBlock(src, file, ExtendedBlock.getLocalBlock(b));
+      if(NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+                                      + b + " is removed from pendingCreates");
+      }
+      return true;
     } finally {
       writeUnlock();
     }
@@ -1855,7 +1905,8 @@
   
   // make sure that we still have the lease on this file.
   private INodeFileUnderConstruction checkLease(String src, String holder) 
-    throws LeaseExpiredException, UnresolvedLinkException {
+      throws LeaseExpiredException, UnresolvedLinkException {
+    assert hasReadOrWriteLock();
     INodeFile file = dir.getFileINode(src);
     checkLease(src, holder, file);
     return (INodeFileUnderConstruction)file;
@@ -1863,7 +1914,7 @@
 
   private void checkLease(String src, String holder, INode file)
       throws LeaseExpiredException {
-
+    assert hasReadOrWriteLock();
     if (file == null || file.isDirectory()) {
       Lease lease = leaseManager.getLease(holder);
       throw new LeaseExpiredException("No lease on " + src +
@@ -1896,23 +1947,29 @@
   public boolean completeFile(String src, String holder, ExtendedBlock last) 
     throws SafeModeException, UnresolvedLinkException, IOException {
     checkBlock(last);
-    boolean success = completeFileInternal(src, holder, 
+    boolean success = false;
+    writeLock();
+    try {
+      success = completeFileInternal(src, holder, 
         ExtendedBlock.getLocalBlock(last));
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
-    return success ;
+    return success;
   }
 
   private boolean completeFileInternal(String src, 
       String holder, Block last) throws SafeModeException,
       UnresolvedLinkException, IOException {
-    writeLock();
-    try {
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
+    assert hasWriteLock();
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
           src + " for " + holder);
     }
-    if (isInSafeMode())
+    if (isInSafeMode()) {
       throw new SafeModeException("Cannot complete file " + src, safeMode);
+    }
 
     INodeFileUnderConstruction pendingFile = checkLease(src, holder);
     // commit the last block and complete it if it has minimum replicas
@@ -1927,9 +1984,6 @@
     NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: file " + src
                                   + " is closed by " + holder);
     return true;
-    } finally {
-      writeUnlock();
-    }
   }
 
   /** 
@@ -1958,6 +2012,7 @@
    */
   private Block allocateBlock(String src, INode[] inodes,
       DatanodeDescriptor targets[]) throws QuotaExceededException {
+    assert hasWriteLock();
     Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0); 
     while(isValidBlock(b)) {
       b.setBlockId(FSNamesystem.randBlockId.nextLong());
@@ -1975,35 +2030,35 @@
    * all blocks, otherwise check only penultimate block.
    */
   boolean checkFileProgress(INodeFile v, boolean checkall) {
-    writeLock();
+    readLock();
     try {
-    if (checkall) {
-      //
-      // check all blocks of the file.
-      //
-      for (BlockInfo block: v.getBlocks()) {
-        if (!block.isComplete()) {
+      if (checkall) {
+        //
+        // check all blocks of the file.
+        //
+        for (BlockInfo block: v.getBlocks()) {
+          if (!block.isComplete()) {
+            LOG.info("BLOCK* NameSystem.checkFileProgress: "
+                + "block " + block + " has not reached minimal replication "
+                + blockManager.minReplication);
+            return false;
+          }
+        }
+      } else {
+        //
+        // check the penultimate block of this file
+        //
+        BlockInfo b = v.getPenultimateBlock();
+        if (b != null && !b.isComplete()) {
           LOG.info("BLOCK* NameSystem.checkFileProgress: "
-              + "block " + block + " has not reached minimal replication "
+              + "block " + b + " has not reached minimal replication "
               + blockManager.minReplication);
           return false;
         }
       }
-    } else {
-      //
-      // check the penultimate block of this file
-      //
-      BlockInfo b = v.getPenultimateBlock();
-      if (b != null && !b.isComplete()) {
-        LOG.info("BLOCK* NameSystem.checkFileProgress: "
-            + "block " + b + " has not reached minimal replication "
-            + blockManager.minReplication);
-        return false;
-      }
-    }
-    return true;
+      return true;
     } finally {
-      writeUnlock();
+      readUnlock();
     }
   }
 
@@ -2042,13 +2097,26 @@
   @Deprecated
   boolean renameTo(String src, String dst) 
     throws IOException, UnresolvedLinkException {
-    boolean status = renameToInternal(src, dst);
+    boolean status = false;
+    HdfsFileStatus resultingStat = null;
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
+          " to " + dst);
+    }
+    writeLock();
+    try {
+      status = renameToInternal(src, dst);
+      if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+        resultingStat = dir.getFileInfo(dst, false);
+      }
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
-      final HdfsFileStatus stat = dir.getFileInfo(dst, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
-                    "rename", src, dst, stat);
+                    "rename", src, dst, resultingStat);
     }
     return status;
   }
@@ -2057,19 +2125,13 @@
   @Deprecated
   private boolean renameToInternal(String src, String dst)
     throws IOException, UnresolvedLinkException {
-
-    writeLock();
-    try {
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
-          " to " + dst);
-    }
-    if (isInSafeMode())
+    assert hasWriteLock();
+    if (isInSafeMode()) {
       throw new SafeModeException("Cannot rename " + src, safeMode);
+    }
     if (!DFSUtil.isValidName(dst)) {
       throw new IOException("Invalid name: " + dst);
     }
-
     if (isPermissionEnabled) {
       //We should not be doing this.  This is move() not renameTo().
       //but for now,
@@ -2081,40 +2143,44 @@
 
     HdfsFileStatus dinfo = dir.getFileInfo(dst, false);
     if (dir.renameTo(src, dst)) {
-      changeLease(src, dst, dinfo);     // update lease with new filename
+      unprotectedChangeLease(src, dst, dinfo);     // update lease with new filename
       return true;
     }
     return false;
-    } finally {
-      writeUnlock();
-    }
   }
   
 
   /** Rename src to dst */
   void renameTo(String src, String dst, Options.Rename... options)
       throws IOException, UnresolvedLinkException {
-    renameToInternal(src, dst, options);
+    HdfsFileStatus resultingStat = null;
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
+          + src + " to " + dst);
+    }
+    writeLock();
+    try {
+      renameToInternal(src, dst, options);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        resultingStat = dir.getFileInfo(dst, false); 
+      }
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       StringBuilder cmd = new StringBuilder("rename options=");
       for (Rename option : options) {
         cmd.append(option.value()).append(" ");
       }
-      final HdfsFileStatus stat = dir.getFileInfo(dst, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(), Server.getRemoteIp(),
-                    cmd.toString(), src, dst, stat);
+                    cmd.toString(), src, dst, resultingStat);
     }
   }
 
   private void renameToInternal(String src, String dst,
       Options.Rename... options) throws IOException {
-    writeLock();
-    try {
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
-          + src + " to " + dst);
-    }
+    assert hasWriteLock();
     if (isInSafeMode()) {
       throw new SafeModeException("Cannot rename " + src, safeMode);
     }
@@ -2128,10 +2194,7 @@
 
     HdfsFileStatus dinfo = dir.getFileInfo(dst, false);
     dir.renameTo(src, dst, options);
-    changeLease(src, dst, dinfo); // update lease with new filename
-    } finally {
-      writeUnlock();
-    }
+    unprotectedChangeLease(src, dst, dinfo); // update lease with new filename
   }
   
   /**
@@ -2141,15 +2204,12 @@
    * description of exceptions
    */
     public boolean delete(String src, boolean recursive)
-      throws AccessControlException, SafeModeException,
-      UnresolvedLinkException, IOException {
-      if ((!recursive) && (!dir.isDirEmpty(src))) {
-        throw new IOException(src + " is non empty");
-      }
+        throws AccessControlException, SafeModeException,
+               UnresolvedLinkException, IOException {
       if (NameNode.stateChangeLog.isDebugEnabled()) {
         NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
       }
-      boolean status = deleteInternal(src, true);
+      boolean status = deleteInternal(src, recursive, true);
       if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(UserGroupInformation.getCurrentUser(),
                       Server.getRemoteIp(),
@@ -2169,9 +2229,10 @@
    * 
    * @see ClientProtocol#delete(String, boolean) for description of exceptions
    */
-  private boolean deleteInternal(String src, boolean enforcePermission)
-      throws AccessControlException, SafeModeException,
-      UnresolvedLinkException, IOException{
+  private boolean deleteInternal(String src, boolean recursive,
+      boolean enforcePermission)
+      throws AccessControlException, SafeModeException, UnresolvedLinkException,
+             IOException {
     boolean deleteNow = false;
     ArrayList<Block> collectedBlocks = new ArrayList<Block>();
 
@@ -2180,6 +2241,9 @@
       if (isInSafeMode()) {
         throw new SafeModeException("Cannot delete " + src, safeMode);
       }
+      if (!recursive && !dir.isDirEmpty(src)) {
+        throw new IOException(src + " is non empty");
+      }
       if (enforcePermission && isPermissionEnabled) {
         checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
       }
@@ -2194,10 +2258,16 @@
     } finally {
       writeUnlock();
     }
-    // Log directory deletion to editlog
+
     getEditLog().logSync();
-    if (!deleteNow) {
-      removeBlocks(collectedBlocks); // Incremental deletion of blocks
+
+    writeLock();
+    try {
+      if (!deleteNow) {
+        removeBlocks(collectedBlocks); // Incremental deletion of blocks
+      }
+    } finally {
+      writeUnlock();
     }
     collectedBlocks.clear();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2209,24 +2279,21 @@
 
   /** From the given list, incrementally remove the blocks from blockManager */
   private void removeBlocks(List<Block> blocks) {
+    assert hasWriteLock();
     int start = 0;
     int end = 0;
     while (start < blocks.size()) {
       end = BLOCK_DELETION_INCREMENT + start;
       end = end > blocks.size() ? blocks.size() : end;
-      writeLock();
-      try {
-        for (int i=start; i<end; i++) {
-          blockManager.removeBlock(blocks.get(i));
-        }
-      } finally {
-        writeUnlock();
+      for (int i=start; i<end; i++) {
+        blockManager.removeBlock(blocks.get(i));
       }
       start = end;
     }
   }
   
   void removePathAndBlocks(String src, List<Block> blocks) {
+    assert hasWriteLock();
     leaseManager.removeLeaseWithPrefixPath(src);
     if (blocks == null) {
       return;
@@ -2249,13 +2316,18 @@
    */
   HdfsFileStatus getFileInfo(String src, boolean resolveLink) 
     throws AccessControlException, UnresolvedLinkException {
-    if (!DFSUtil.isValidName(src)) {
-      throw new InvalidPathException("Invalid file name: " + src);
+    readLock();
+    try {
+      if (!DFSUtil.isValidName(src)) {
+        throw new InvalidPathException("Invalid file name: " + src);
+      }
+      if (isPermissionEnabled) {
+        checkTraverse(src);
+      }
+      return dir.getFileInfo(src, resolveLink);
+    } finally {
+      readUnlock();
     }
-    if (isPermissionEnabled) {
-      checkTraverse(src);
-    }
-    return dir.getFileInfo(src, resolveLink);
   }
 
   /**
@@ -2263,7 +2335,16 @@
    */
   public boolean mkdirs(String src, PermissionStatus permissions,
       boolean createParent) throws IOException, UnresolvedLinkException {
-    boolean status = mkdirsInternal(src, permissions, createParent);
+    boolean status = false;
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
+    }
+    writeLock();
+    try {
+      status = mkdirsInternal(src, permissions, createParent);
+    } finally {
+      writeUnlock();
+    }
     getEditLog().logSync();
     if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
@@ -2280,10 +2361,9 @@
   private boolean mkdirsInternal(String src,
       PermissionStatus permissions, boolean createParent) 
       throws IOException, UnresolvedLinkException {
-    writeLock();
-    try {
-    if(NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
+    assert hasWriteLock();
+    if (isInSafeMode()) {
+      throw new SafeModeException("Cannot create directory " + src, safeMode);
     }
     if (isPermissionEnabled) {
       checkTraverse(src);
@@ -2293,15 +2373,12 @@
       // a new directory is not created.
       return true;
     }
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot create directory " + src, safeMode);
     if (!DFSUtil.isValidName(src)) {
       throw new InvalidPathException(src);
     }
     if (isPermissionEnabled) {
       checkAncestorAccess(src, FsAction.WRITE);
     }
-
     if (!createParent) {
       verifyParentDir(src);
     }
@@ -2315,17 +2392,19 @@
       throw new IOException("Failed to create directory: " + src);
     }
     return true;
-    } finally {
-      writeUnlock();
-    }
   }
 
   ContentSummary getContentSummary(String src) throws AccessControlException,
       FileNotFoundException, UnresolvedLinkException {
-    if (isPermissionEnabled) {
-      checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
+    readLock();
+    try {
+      if (isPermissionEnabled) {
+        checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
+      }
+      return dir.getContentSummary(src);
+    } finally {
+      readUnlock();
     }
-    return dir.getContentSummary(src);
   }
 
   /**
@@ -2335,12 +2414,18 @@
    */
   void setQuota(String path, long nsQuota, long dsQuota) 
       throws IOException, UnresolvedLinkException {
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot set quota on " + path, safeMode);
-    if (isPermissionEnabled) {
-      checkSuperuserPrivilege();
+    writeLock();
+    try {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot set quota on " + path, safeMode);
+      }
+      if (isPermissionEnabled) {
+        checkSuperuserPrivilege();
+      }
+      dir.setQuota(path, nsQuota, dsQuota);
+    } finally {
+      writeUnlock();
     }
-    dir.setQuota(path, nsQuota, dsQuota);
     getEditLog().logSync();
   }
   
@@ -2351,7 +2436,6 @@
    */
   void fsync(String src, String clientName) 
       throws IOException, UnresolvedLinkException {
-
     NameNode.stateChangeLog.info("BLOCK* NameSystem.fsync: file "
                                   + src + " for " + clientName);
     writeLock();
@@ -2364,6 +2448,7 @@
     } finally {
       writeUnlock();
     }
+    getEditLog().logSync();
   }
 
   /**
@@ -2383,9 +2468,8 @@
       String recoveryLeaseHolder) throws AlreadyBeingCreatedException, 
       IOException, UnresolvedLinkException {
     LOG.info("Recovering lease=" + lease + ", src=" + src);
-    
     assert !isInSafeMode();
-
+    assert hasWriteLock();
     INodeFile iFile = dir.getFileINode(src);
     if (iFile == null) {
       final String message = "DIR* NameSystem.internalReleaseLease: "
@@ -2507,6 +2591,7 @@
 
   Lease reassignLease(Lease lease, String src, String newHolder,
       INodeFileUnderConstruction pendingFile) throws IOException {
+    assert hasWriteLock();
     if(newHolder == null)
       return lease;
     logReassignLease(lease.getHolder(), src, newHolder);
@@ -2515,6 +2600,7 @@
   
   Lease reassignLeaseInternal(Lease lease, String src, String newHolder,
       INodeFileUnderConstruction pendingFile) throws IOException {
+    assert hasWriteLock();
     pendingFile.setClientName(newHolder);
     return leaseManager.reassignLease(lease, src, newHolder);
   }
@@ -2523,7 +2609,7 @@
   private void finalizeINodeFileUnderConstruction(String src, 
       INodeFileUnderConstruction pendingFile) 
       throws IOException, UnresolvedLinkException {
-      
+    assert hasWriteLock();
     leaseManager.removeLease(pendingFile.getClientName(), src);
 
     // The file is no longer pending.
@@ -2544,92 +2630,96 @@
     String src = "";
     writeLock();
     try {
-    LOG.info("commitBlockSynchronization(lastblock=" + lastblock
-          + ", newgenerationstamp=" + newgenerationstamp
-          + ", newlength=" + newlength
-          + ", newtargets=" + Arrays.asList(newtargets)
-          + ", closeFile=" + closeFile
-          + ", deleteBlock=" + deleteblock
-          + ")");
-    final BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+          "Cannot commitBlockSynchronization while in safe mode",
+          safeMode);
+      }
+      LOG.info("commitBlockSynchronization(lastblock=" + lastblock
+               + ", newgenerationstamp=" + newgenerationstamp
+               + ", newlength=" + newlength
+               + ", newtargets=" + Arrays.asList(newtargets)
+               + ", closeFile=" + closeFile
+               + ", deleteBlock=" + deleteblock
+               + ")");
+      final BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock
         .getLocalBlock(lastblock));
-    if (storedBlock == null) {
-      throw new IOException("Block (=" + lastblock + ") not found");
-    }
-    INodeFile iFile = storedBlock.getINode();
-    if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
-      throw new IOException("Unexpected block (=" + lastblock
-          + ") since the file (=" + iFile.getLocalName()
-          + ") is not under construction");
-    }
-
-    long recoveryId =
-      ((BlockInfoUnderConstruction)storedBlock).getBlockRecoveryId();
-    if(recoveryId != newgenerationstamp) {
-      throw new IOException("The recovery id " + newgenerationstamp
-          + " does not match current recovery id "
-          + recoveryId + " for block " + lastblock); 
-    }
-        
-    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
-
-    if (deleteblock) {
-      pendingFile.removeLastBlock(ExtendedBlock.getLocalBlock(lastblock));
-      blockManager.removeBlockFromMap(storedBlock);
-    }
-    else {
-      // update last block
-      storedBlock.setGenerationStamp(newgenerationstamp);
-      storedBlock.setNumBytes(newlength);
-
-      // find the DatanodeDescriptor objects
-      // There should be no locations in the blockManager till now because the
-      // file is underConstruction
-      DatanodeDescriptor[] descriptors = null;
-      if (newtargets.length > 0) {
-        descriptors = new DatanodeDescriptor[newtargets.length];
-        for(int i = 0; i < newtargets.length; i++) {
-          descriptors[i] = getDatanode(newtargets[i]);
-        }
+      if (storedBlock == null) {
+        throw new IOException("Block (=" + lastblock + ") not found");
       }
+      INodeFile iFile = storedBlock.getINode();
+      if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
+        throw new IOException("Unexpected block (=" + lastblock
+                              + ") since the file (=" + iFile.getLocalName()
+                              + ") is not under construction");
+      }
+
+      long recoveryId =
+        ((BlockInfoUnderConstruction)storedBlock).getBlockRecoveryId();
+      if(recoveryId != newgenerationstamp) {
+        throw new IOException("The recovery id " + newgenerationstamp
+                              + " does not match current recovery id "
+                              + recoveryId + " for block " + lastblock); 
+      }
+
+      INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
+
+      if (deleteblock) {
+        pendingFile.removeLastBlock(ExtendedBlock.getLocalBlock(lastblock));
+        blockManager.removeBlockFromMap(storedBlock);
+      }
+      else {
+        // update last block
+        storedBlock.setGenerationStamp(newgenerationstamp);
+        storedBlock.setNumBytes(newlength);
+
+        // find the DatanodeDescriptor objects
+        // There should be no locations in the blockManager till now because the
+        // file is underConstruction
+        DatanodeDescriptor[] descriptors = null;
+        if (newtargets.length > 0) {
+          descriptors = new DatanodeDescriptor[newtargets.length];
+          for(int i = 0; i < newtargets.length; i++) {
+            descriptors[i] = getDatanode(newtargets[i]);
+          }
+        }
+        if (closeFile) {
+          // the file is getting closed. Insert block locations into blockManager.
+          // Otherwise fsck will report these blocks as MISSING, especially if the
+          // blocksReceived from Datanodes take a long time to arrive.
+          for (int i = 0; i < descriptors.length; i++) {
+            descriptors[i].addBlock(storedBlock);
+          }
+        }
+        // add pipeline locations into the INodeUnderConstruction
+        pendingFile.setLastBlock(storedBlock, descriptors);
+      }
+
+      src = leaseManager.findPath(pendingFile);
       if (closeFile) {
-        // the file is getting closed. Insert block locations into blockManager.
-        // Otherwise fsck will report these blocks as MISSING, especially if the
-        // blocksReceived from Datanodes take a long time to arrive.
-        for (int i = 0; i < descriptors.length; i++) {
-          descriptors[i].addBlock(storedBlock);
-        }
-      }
-      // add pipeline locations into the INodeUnderConstruction
-      pendingFile.setLastBlock(storedBlock, descriptors);
-    }
+        // commit the last block and complete it if it has minimum replicas
+        blockManager.commitOrCompleteLastBlock(pendingFile, storedBlock);
 
-    // If this commit does not want to close the file, persist
-    // blocks only if append is supported and return
-    src = leaseManager.findPath(pendingFile);
-    if (!closeFile) {
-      if (supportAppends) {
+        //remove lease, close file
+        finalizeINodeFileUnderConstruction(src, pendingFile);
+      } else if (supportAppends) {
+        // If this commit does not want to close the file, persist
+        // blocks only if append is supported 
         dir.persistBlocks(src, pendingFile);
-        getEditLog().logSync();
       }
-      LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
-      return;
-    }
-
-    // commit the last block and complete it if it has minimum replicas
-    blockManager.commitOrCompleteLastBlock(pendingFile, storedBlock);
-
-    //remove lease, close file
-    finalizeINodeFileUnderConstruction(src, pendingFile);
     } finally {
       writeUnlock();
     }
     getEditLog().logSync();
-    LOG.info("commitBlockSynchronization(newblock=" + lastblock
+    if (closeFile) {
+      LOG.info("commitBlockSynchronization(newblock=" + lastblock
           + ", file=" + src
           + ", newgenerationstamp=" + newgenerationstamp
           + ", newlength=" + newlength
           + ", newtargets=" + Arrays.asList(newtargets) + ") successful");
+    } else {
+      LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
+    }
   }
 
 
@@ -2637,9 +2727,15 @@
    * Renew the lease(s) held by the given client
    */
   void renewLease(String holder) throws IOException {
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
-    leaseManager.renewLease(holder);
+    writeLock();
+    try {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
+      }
+      leaseManager.renewLease(holder);
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
@@ -2657,20 +2753,26 @@
   public DirectoryListing getListing(String src, byte[] startAfter,
       boolean needLocation) 
     throws AccessControlException, UnresolvedLinkException, IOException {
-    if (isPermissionEnabled) {
-      if (dir.isDir(src)) {
-        checkPathAccess(src, FsAction.READ_EXECUTE);
+    DirectoryListing dl;
+    readLock();
+    try {
+      if (isPermissionEnabled) {
+        if (dir.isDir(src)) {
+          checkPathAccess(src, FsAction.READ_EXECUTE);
+        } else {
+          checkTraverse(src);
+        }
       }
-      else {
-        checkTraverse(src);
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        logAuditEvent(UserGroupInformation.getCurrentUser(),
+                      Server.getRemoteIp(),
+                      "listStatus", src, null, null);
       }
+      dl = dir.getListing(src, startAfter, needLocation);
+    } finally {
+      readUnlock();
     }
-    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
-      logAuditEvent(UserGroupInformation.getCurrentUser(),
-                    Server.getRemoteIp(),
-                    "listStatus", src, null, null);
-    }
-    return dir.getListing(src, startAfter, needLocation);
+    return dl;
   }
 
   /////////////////////////////////////////////////////////
@@ -2701,10 +2803,20 @@
    * 
    * @see org.apache.hadoop.hdfs.server.datanode.DataNode
    */
-  public void registerDatanode(DatanodeRegistration nodeReg
-                                            ) throws IOException {
+  public void registerDatanode(DatanodeRegistration nodeReg)
+      throws IOException {
     writeLock();
     try {
+      registerDatanodeInternal(nodeReg);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  /** @see #registerDatanode(DatanodeRegistration) */
+  public void registerDatanodeInternal(DatanodeRegistration nodeReg)
+      throws IOException {
+    assert hasWriteLock();
     String dnAddress = Server.getRemoteAddress();
     if (dnAddress == null) {
       // Mostly called inside an RPC.
@@ -2821,17 +2933,12 @@
       // because its is done when the descriptor is created
     }
 
-    if (safeMode != null) {
-      safeMode.checkMode();
-    }
-    return;
-    } finally {
-      writeUnlock();
-    }
+    checkSafeMode();
   }
     
   /* Resolve a node's network location */
   private void resolveNetworkLocation (DatanodeDescriptor node) {
+    assert hasWriteLock();
     List<String> names = new ArrayList<String>(1);
     if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
       // get the node's IP address
@@ -2908,7 +3015,23 @@
   DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
       long capacity, long dfsUsed, long remaining, long blockPoolUsed,
       int xceiverCount, int xmitsInProgress, int failedVolumes) 
-      throws IOException {
+        throws IOException {
+    readLock();
+    try {
+      return handleHeartbeatInternal(nodeReg, capacity, dfsUsed, 
+          remaining, blockPoolUsed, xceiverCount, xmitsInProgress, 
+          failedVolumes);
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /** @see #handleHeartbeat(DatanodeRegistration, long, long, long, long, int, int, int) */
+  DatanodeCommand[] handleHeartbeatInternal(DatanodeRegistration nodeReg,
+      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+      int xceiverCount, int xmitsInProgress, int failedVolumes) 
+        throws IOException {
+    assert hasReadLock();
     DatanodeCommand cmd = null;
     synchronized (heartbeats) {
       synchronized (datanodeMap) {
@@ -3149,10 +3272,14 @@
     int workFound = 0;
     int blocksToProcess = 0;
     int nodesToProcess = 0;
-    // blocks should not be replicated or removed if safe mode is on
+    // Blocks should not be replicated or removed if in safe mode.
+    // It's OK to check safe mode here w/o holding lock, in the worst
+    // case extra replications will be scheduled, and these will get
+    // fixed up later.
     if (isInSafeMode())
       return workFound;
-    synchronized(heartbeats) {
+
+    synchronized (heartbeats) {
       blocksToProcess = (int)(heartbeats.size() 
           * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
       nodesToProcess = (int)Math.ceil((double)heartbeats.size() 
@@ -3186,13 +3313,13 @@
     throws IOException {
     writeLock();
     try {
-    DatanodeDescriptor nodeInfo = getDatanode(nodeID);
-    if (nodeInfo != null) {
-      removeDatanode(nodeInfo);
-    } else {
-      NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
-                                   + nodeID.getName() + " does not exist");
-    }
+      DatanodeDescriptor nodeInfo = getDatanode(nodeID);
+      if (nodeInfo != null) {
+        removeDatanode(nodeInfo);
+      } else {
+        NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
+                                     + nodeID.getName() + " does not exist");
+      }
     } finally {
       writeUnlock();
     }
@@ -3203,6 +3330,7 @@
    * @param nodeInfo datanode descriptor.
    */
   private void removeDatanode(DatanodeDescriptor nodeInfo) {
+    assert hasWriteLock();
     synchronized (heartbeats) {
       if (nodeInfo.isAlive) {
         updateStats(nodeInfo, false);
@@ -3218,12 +3346,11 @@
     unprotectedRemoveDatanode(nodeInfo);
     clusterMap.remove(nodeInfo);
 
-    if (safeMode != null) {
-      safeMode.checkMode();
-    }
+    checkSafeMode();
   }
 
   void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) {
+    assert hasWriteLock();
     nodeDescr.resetBlocks();
     blockManager.removeFromInvalidates(nodeDescr.getStorageID());
     if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -3234,12 +3361,14 @@
   }
     
   void unprotectedAddDatanode(DatanodeDescriptor nodeDescr) {
-    /* To keep host2DataNodeMap consistent with datanodeMap,
-       remove  from host2DataNodeMap the datanodeDescriptor removed
-       from datanodeMap before adding nodeDescr to host2DataNodeMap.
-    */
-    host2DataNodeMap.remove(
+    assert hasWriteLock();
+    // To keep host2DataNodeMap consistent with datanodeMap,
+    // remove  from host2DataNodeMap the datanodeDescriptor removed
+    // from datanodeMap before adding nodeDescr to host2DataNodeMap.
+    synchronized (datanodeMap) {
+      host2DataNodeMap.remove(
                             datanodeMap.put(nodeDescr.getStorageID(), nodeDescr));
+    }
     host2DataNodeMap.add(nodeDescr);
       
     if(NameNode.stateChangeLog.isDebugEnabled()) {
@@ -3256,8 +3385,11 @@
    * @throws IOException
    */
   void wipeDatanode(DatanodeID nodeID) throws IOException {
+    assert hasWriteLock();
     String key = nodeID.getStorageID();
-    host2DataNodeMap.remove(datanodeMap.remove(key));
+    synchronized (datanodeMap) {
+      host2DataNodeMap.remove(datanodeMap.remove(key));
+    }
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug(
           "BLOCK* NameSystem.wipeDatanode: "
@@ -3282,8 +3414,9 @@
    * effect causes more datanodes to be declared dead.
    */
   void heartbeatCheck() {
+    // It's OK to check safe mode w/o taking the lock here, we re-check
+    // for safe mode after taking the lock before removing a datanode.
     if (isInSafeMode()) {
-      // not to check dead nodes if in safemode
       return;
     }
     boolean allAlive = false;
@@ -3308,6 +3441,9 @@
       // acquire the fsnamesystem lock, and then remove the dead node.
       if (foundDead) {
         writeLock();
+        if (isInSafeMode()) {
+          return;
+        }
         try {
           synchronized(heartbeats) {
             synchronized (datanodeMap) {
@@ -3343,21 +3479,21 @@
     writeLock();
     startTime = now(); //after acquiring write lock
     try {
-    DatanodeDescriptor node = getDatanode(nodeID);
-    if (node == null || !node.isAlive) {
-      throw new IOException("ProcessReport from dead or unregistered node: "
-                            + nodeID.getName());
-    }
-    // To minimize startup time, we discard any second (or later) block reports
-    // that we receive while still in startup phase.
-    if (isInStartupSafeMode() && node.numBlocks() > 0) {
-      NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: "
-          + "discarded non-initial block report from " + nodeID.getName()
-          + " because namenode still in startup phase");
-      return;
-    }
-
-    blockManager.processReport(node, newReport);
+      DatanodeDescriptor node = getDatanode(nodeID);
+      if (node == null || !node.isAlive) {
+        throw new IOException("ProcessReport from dead or unregistered node: "
+                              + nodeID.getName());
+      }
+      // To minimize startup time, we discard any second (or later) block reports
+      // that we receive while still in startup phase.
+      if (isInStartupSafeMode() && node.numBlocks() > 0) {
+        NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: "
+            + "discarded non-initial block report from " + nodeID.getName()
+            + " because namenode still in startup phase");
+        return;
+      }
+  
+      blockManager.processReport(node, newReport);
     } finally {
       endTime = now();
       writeUnlock();
@@ -3389,6 +3525,7 @@
                               DatanodeDescriptor addedNode,
                               DatanodeDescriptor delNodeHint,
                               BlockPlacementPolicy replicator) {
+    assert hasWriteLock();
     // first form a rack to datanodes map and
     INodeFile inode = blockManager.getINode(b);
     HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
@@ -3482,20 +3619,20 @@
                                          ) throws IOException {
     writeLock();
     try {
-    DatanodeDescriptor node = getDatanode(nodeID);
-    if (node == null || !node.isAlive) {
-      NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block
-          + " is received from dead or unregistered node " + nodeID.getName());
-      throw new IOException(
-          "Got blockReceived message from unregistered or dead node " + block);
-    }
-        
-    if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
-                                    +block+" is received from " + nodeID.getName());
-    }
-
-    blockManager.addBlock(node, block, delHint);
+      DatanodeDescriptor node = getDatanode(nodeID);
+      if (node == null || !node.isAlive) {
+        NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block
+            + " is received from dead or unregistered node " + nodeID.getName());
+        throw new IOException(
+            "Got blockReceived message from unregistered or dead node " + block);
+      }
+          
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
+                                      +block+" is received from " + nodeID.getName());
+      }
+  
+      blockManager.addBlock(node, block, delHint);
     } finally {
       writeUnlock();
     }
@@ -3612,75 +3749,74 @@
   }
 
   private ArrayList<DatanodeDescriptor> getDatanodeListForReport(
-                                                      DatanodeReportType type) {
-    boolean listLiveNodes = type == DatanodeReportType.ALL ||
-                            type == DatanodeReportType.LIVE;
-    boolean listDeadNodes = type == DatanodeReportType.ALL ||
-                            type == DatanodeReportType.DEAD;
-
-    HashMap<String, String> mustList = new HashMap<String, String>();
-
+      DatanodeReportType type) {
     readLock();
-    try {
-    if (listDeadNodes) {
-      //first load all the nodes listed in include and exclude files.
-      for (Iterator<String> it = hostsReader.getHosts().iterator(); 
-           it.hasNext();) {
-        mustList.put(it.next(), "");
+    try {    
+      boolean listLiveNodes = type == DatanodeReportType.ALL ||
+                              type == DatanodeReportType.LIVE;
+      boolean listDeadNodes = type == DatanodeReportType.ALL ||
+                              type == DatanodeReportType.DEAD;
+  
+      HashMap<String, String> mustList = new HashMap<String, String>();
+  
+      if (listDeadNodes) {
+        //first load all the nodes listed in include and exclude files.
+        Iterator<String> it = hostsReader.getHosts().iterator();
+        while (it.hasNext()) {
+          mustList.put(it.next(), "");
+        }
+        it = hostsReader.getExcludedHosts().iterator(); 
+        while (it.hasNext()) {
+          mustList.put(it.next(), "");
+        }
       }
-      for (Iterator<String> it = hostsReader.getExcludedHosts().iterator(); 
-           it.hasNext();) {
-        mustList.put(it.next(), "");
-      }
-    }
-   
-    ArrayList<DatanodeDescriptor> nodes = null;
-    
-    synchronized (datanodeMap) {
-      nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() + 
-                                                mustList.size());
+
+      ArrayList<DatanodeDescriptor> nodes = null;
       
-      for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); 
-                                                               it.hasNext();) {
-        DatanodeDescriptor dn = it.next();
-        boolean isDead = isDatanodeDead(dn);
-        if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
+      synchronized (datanodeMap) {
+        nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() + 
+                                                  mustList.size());
+        Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
+        while (it.hasNext()) { 
+          DatanodeDescriptor dn = it.next();
+          boolean isDead = isDatanodeDead(dn);
+          if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
+            nodes.add(dn);
+          }
+          //Remove any form of the this datanode in include/exclude lists.
+          mustList.remove(dn.getName());
+          mustList.remove(dn.getHost());
+          mustList.remove(dn.getHostName());
+        }
+      }
+      
+      if (listDeadNodes) {
+        Iterator<String> it = mustList.keySet().iterator();
+        while (it.hasNext()) {
+          DatanodeDescriptor dn = 
+              new DatanodeDescriptor(new DatanodeID(it.next()));
+          dn.setLastUpdate(0);
           nodes.add(dn);
         }
-        //Remove any form of the this datanode in include/exclude lists.
-        mustList.remove(dn.getName());
-        mustList.remove(dn.getHost());
-        mustList.remove(dn.getHostName());
       }
-    }
-    
-    if (listDeadNodes) {
-      for (Iterator<String> it = mustList.keySet().iterator(); it.hasNext();) {
-        DatanodeDescriptor dn = 
-            new DatanodeDescriptor(new DatanodeID(it.next()));
-        dn.setLastUpdate(0);
-        nodes.add(dn);
-      }
-    }
-    
-    return nodes;
+      return nodes;
     } finally {
       readUnlock();
     }
   }
 
-  public DatanodeInfo[] datanodeReport( DatanodeReportType type
-      ) throws AccessControlException {
+  public DatanodeInfo[] datanodeReport( DatanodeReportType type)
+      throws AccessControlException {
     readLock();
     try {
-    checkSuperuserPrivilege();
-
-    ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
-    DatanodeInfo[] arr = new DatanodeInfo[results.size()];
-    for (int i=0; i<arr.length; i++) {
-      arr[i] = new DatanodeInfo(results.get(i));
-    }
-    return arr;
+      checkSuperuserPrivilege();
+  
+      ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
+      DatanodeInfo[] arr = new DatanodeInfo[results.size()];
+      for (int i=0; i<arr.length; i++) {
+        arr[i] = new DatanodeInfo(results.get(i));
+      }
+      return arr;
     } finally {
       readUnlock();
     }
@@ -3695,17 +3831,17 @@
    * @throws IOException if 
    */
   void saveNamespace() throws AccessControlException, IOException {
-    writeLock();
+    readLock();
     try {
-    checkSuperuserPrivilege();
-    if(!isInSafeMode()) {
-      throw new IOException("Safe mode should be turned ON " +
-                            "in order to create namespace image.");
-    }
-    getFSImage().saveNamespace(true);
-    LOG.info("New namespace image has been created.");
+      checkSuperuserPrivilege();
+      if (!isInSafeMode()) {
+        throw new IOException("Safe mode should be turned ON " +
+                              "in order to create namespace image.");
+      }
+      getFSImage().saveNamespace(true);
+      LOG.info("New namespace image has been created.");
     } finally {
-      writeUnlock();
+      readUnlock();
     }
   }
   
@@ -3718,16 +3854,16 @@
   boolean restoreFailedStorage(String arg) throws AccessControlException {
     writeLock();
     try {
-    checkSuperuserPrivilege();
-    
-    // if it is disabled - enable it and vice versa.
-    if(arg.equals("check"))
-      return getFSImage().getStorage().getRestoreFailedStorage();
-    
-    boolean val = arg.equals("true");  // false if not
-    getFSImage().getStorage().setRestoreFailedStorage(val);
-    
-    return val;
+      checkSuperuserPrivilege();
+      
+      // if it is disabled - enable it and vice versa.
+      if(arg.equals("check"))
+        return getFSImage().getStorage().getRestoreFailedStorage();
+      
+      boolean val = arg.equals("true");  // false if not
+      getFSImage().getStorage().setRestoreFailedStorage(val);
+      
+      return val;
     } finally {
       writeUnlock();
     }
@@ -3739,15 +3875,15 @@
                                           ArrayList<DatanodeDescriptor> dead) {
     readLock();
     try {
-    ArrayList<DatanodeDescriptor> results = 
-                            getDatanodeListForReport(DatanodeReportType.ALL);    
-    for(Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
-      DatanodeDescriptor node = it.next();
-      if (isDatanodeDead(node))
-        dead.add(node);
-      else
-        live.add(node);
-    }
+      ArrayList<DatanodeDescriptor> results = 
+                              getDatanodeListForReport(DatanodeReportType.ALL);    
+      for(Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
+        DatanodeDescriptor node = it.next();
+        if (isDatanodeDead(node))
+          dead.add(node);
+        else
+          live.add(node);
+      }
     } finally {
       readUnlock();
     }
@@ -3759,13 +3895,13 @@
   private void datanodeDump(PrintWriter out) {
     readLock();
     try {
-    synchronized (datanodeMap) {
-      out.println("Metasave: Number of datanodes: " + datanodeMap.size());
-      for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
-        DatanodeDescriptor node = it.next();
-        out.println(node.dumpDatanode());
+      synchronized (datanodeMap) {
+        out.println("Metasave: Number of datanodes: " + datanodeMap.size());
+        for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
+          DatanodeDescriptor node = it.next();
+          out.println(node.dumpDatanode());
+        }
       }
-    }
     } finally {
       readUnlock();
     }
@@ -3774,9 +3910,9 @@
   /**
    * Start decommissioning the specified datanode. 
    */
-  private void startDecommission (DatanodeDescriptor node) 
+  private void startDecommission(DatanodeDescriptor node) 
     throws IOException {
-
+    assert hasWriteLock();
     if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
       LOG.info("Start Decommissioning node " + node.getName() + " with " + 
           node.numBlocks() +  " blocks.");
@@ -3795,8 +3931,9 @@
   /**
    * Stop decommissioning the specified datanodes.
    */
-  public void stopDecommission (DatanodeDescriptor node) 
+  public void stopDecommission(DatanodeDescriptor node) 
     throws IOException {
+    assert hasWriteLock();
     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
       LOG.info("Stop Decommissioning node " + node.getName());
       synchronized (heartbeats) {
@@ -3807,12 +3944,6 @@
     }
   }
 
-  /** 
-   */
-  public DatanodeInfo getDataNodeInfo(String name) {
-    return datanodeMap.get(name);
-  }
-
   public Date getStartTime() {
     return new Date(systemStart); 
   }
@@ -3881,6 +4012,7 @@
    * decommission completed. Return true if decommission is complete.
    */
   boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
+    assert hasWriteLock();
     //
     // Check to see if all blocks in this decommissioned
     // node has reached their target replication factor.
@@ -3965,7 +4097,7 @@
    * it will be disallowed from registering. 
    */
   private boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr) {
-    assert (hasWriteLock());
+    assert hasWriteLock();
     return inHostsList(nodeReg, ipAddr);
   }
     
@@ -3974,6 +4106,7 @@
    */
   private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr) 
     throws IOException {
+    assert hasWriteLock();
     // If the registered node is in exclude list, then decommission it
     if (inExcludedHostsList(nodeReg, ipAddr)) {
       startDecommission(nodeReg);
@@ -3988,6 +4121,7 @@
    * @throws IOException
    */
   public DatanodeDescriptor getDatanode(DatanodeID nodeID) throws IOException {
+    assert hasReadOrWriteLock();
     UnregisteredNodeException e = null;
     DatanodeDescriptor node = datanodeMap.get(nodeID.getStorageID());
     if (node == null) 
@@ -4478,12 +4612,22 @@
     }
     return isInSafeMode();
   }
+  
+  private void checkSafeMode() {
+    // safeMode is volatile, and may be set to null at any time
+    SafeModeInfo safeMode = this.safeMode;
+    if (safeMode != null) {
+      safeMode.checkMode();
+    }
+  }
 
   /**
    * Check whether the name node is in safe mode.
    * @return true if safe mode is ON, false otherwise
    */
-  synchronized boolean isInSafeMode() {
+  boolean isInSafeMode() {
+    // safeMode is volatile, and may be set to null at any time
+    SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null)
       return false;
     return safeMode.isOn();
@@ -4492,18 +4636,23 @@
   /**
    * Check whether the name node is in startup mode.
    */
-  synchronized boolean isInStartupSafeMode() {
+  boolean isInStartupSafeMode() {
+    // safeMode is volatile, and may be set to null at any time
+    SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null)
       return false;
-    return safeMode.isOn() && !safeMode.isManual();
+    return !safeMode.isManual() && safeMode.isOn();
   }
 
   /**
    * Check whether replication queues are populated.
    */
-  synchronized boolean isPopulatingReplQueues() {
-    return (!isInSafeMode() ||
-            safeMode.isPopulatingReplQueues());
+  boolean isPopulatingReplQueues() {
+    // safeMode is volatile, and may be set to null at any time
+    SafeModeInfo safeMode = this.safeMode;
+    if (safeMode == null)
+      return true;
+    return safeMode.isPopulatingReplQueues();
   }
     
   /**
@@ -4511,6 +4660,8 @@
    * @param replication current replication 
    */
   void incrementSafeBlockCount(int replication) {
+    // safeMode is volatile, and may be set to null at any time
+    SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null)
       return;
     safeMode.incrementSafeBlockCount((short)replication);
@@ -4520,6 +4671,8 @@
    * Decrement number of blocks that reached minimal replication.
    */
   void decrementSafeBlockCount(Block b) {
+    // safeMode is volatile, and may be set to null at any time
+    SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null) // mostly true
       return;
     safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
@@ -4529,6 +4682,8 @@
    * Set the total number of blocks in the system. 
    */
   void setBlockTotal() {
+    // safeMode is volatile, and may be set to null at any time
+    SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null)
       return;
     safeMode.setBlockTotal((int)getCompleteBlocksTotal());
@@ -4550,29 +4705,34 @@
   long getCompleteBlocksTotal() {
     // Calculate number of blocks under construction
     long numUCBlocks = 0;
-    for (Lease lease : leaseManager.getSortedLeases()) {
-      for (String path : lease.getPaths()) {
-        INode node; 
-        try {
-          node = dir.getFileINode(path);
-        } catch (UnresolvedLinkException e) {
-          throw new AssertionError("Lease files should reside on this FS");
-        }
-        assert node != null : "Found a lease for nonexisting file.";
-        assert node.isUnderConstruction() :
-          "Found a lease for file that is not under construction.";
-        INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
-        BlockInfo[] blocks = cons.getBlocks();
-        if(blocks == null)
-          continue;
-        for(BlockInfo b : blocks) {
-          if(!b.isComplete())
-            numUCBlocks++;
+    readLock();
+    try {
+      for (Lease lease : leaseManager.getSortedLeases()) {
+        for (String path : lease.getPaths()) {
+          INode node;
+          try {
+            node = dir.getFileINode(path);
+          } catch (UnresolvedLinkException e) {
+            throw new AssertionError("Lease files should reside on this FS");
+          }
+          assert node != null : "Found a lease for nonexisting file.";
+          assert node.isUnderConstruction() :
+            "Found a lease for file that is not under construction.";
+          INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
+          BlockInfo[] blocks = cons.getBlocks();
+          if(blocks == null)
+            continue;
+          for(BlockInfo b : blocks) {
+            if(!b.isComplete())
+              numUCBlocks++;
+          }
         }
       }
+      LOG.info("Number of blocks under construction: " + numUCBlocks);
+      return getBlocksTotal() - numUCBlocks;
+    } finally {
+      readUnlock();
     }
-    LOG.info("Number of blocks under construction: " + numUCBlocks);
-    return getBlocksTotal() - numUCBlocks;
   }
 
   /**
@@ -4594,6 +4754,7 @@
       safeMode.setResourcesLow();
     }
     safeMode.setManual();
+    getEditLog().logSyncAll();
     NameNode.stateChangeLog.info("STATE* Safe mode is ON. " 
                                 + safeMode.getTurnOffTip());
     } finally {
@@ -4608,14 +4769,14 @@
   void leaveSafeMode(boolean checkForUpgrades) throws SafeModeException {
     writeLock();
     try {
-    if (!isInSafeMode()) {
-      NameNode.stateChangeLog.info("STATE* Safe mode is already OFF."); 
-      return;
-    }
-    if(getDistributedUpgradeState())
-      throw new SafeModeException("Distributed upgrade is in progress",
-                                  safeMode);
-    safeMode.leave(checkForUpgrades);
+      if (!isInSafeMode()) {
+        NameNode.stateChangeLog.info("STATE* Safe mode is already OFF."); 
+        return;
+      }
+      if(getDistributedUpgradeState())
+        throw new SafeModeException("Distributed upgrade is in progress",
+                                    safeMode);
+      safeMode.leave(checkForUpgrades);
     } finally {
       writeUnlock();
     }
@@ -4624,9 +4785,10 @@
   String getSafeModeTip() {
     readLock();
     try {
-    if (!isInSafeMode())
-      return "";
-    return safeMode.getTurnOffTip();
+      if (!isInSafeMode()) {
+        return "";
+      }
+      return safeMode.getTurnOffTip();
     } finally {
       readUnlock();
     }
@@ -4639,12 +4801,11 @@
   CheckpointSignature rollEditLog() throws IOException {
     writeLock();
     try {
-    if (isInSafeMode()) {
-      throw new SafeModeException("Checkpoint not created",
-                                  safeMode);
-    }
-    LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
-    return getFSImage().rollEditLog();
+      if (isInSafeMode()) {
+        throw new SafeModeException("Log not rolled", safeMode);
+      }
+      LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
+      return getFSImage().rollEditLog();
     } finally {
       writeUnlock();
     }
@@ -4659,12 +4820,11 @@
   void rollFSImage(CheckpointSignature sig) throws IOException {
     writeLock();
     try {
-    if (isInSafeMode()) {
-      throw new SafeModeException("Checkpoint not created",
-                                  safeMode);
-    }
-    LOG.info("Roll FSImage from " + Server.getRemoteAddress());
-    getFSImage().rollFSImage(sig, true);
+      if (isInSafeMode()) {
+        throw new SafeModeException("Image not rolled", safeMode);
+      }
+      LOG.info("Roll FSImage from " + Server.getRemoteAddress());
+      getFSImage().rollFSImage(sig, true);
     } finally {
       writeUnlock();
     }
@@ -4676,10 +4836,13 @@
   throws IOException {
     writeLock();
     try {
-    LOG.info("Start checkpoint for " + bnReg.getAddress());
-    NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);
-    getEditLog().logSync();
-    return cmd;
+      if (isInSafeMode()) {
+        throw new SafeModeException("Checkpoint not started", safeMode);
+      }
+      LOG.info("Start checkpoint for " + bnReg.getAddress());
+      NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);
+      getEditLog().logSync();
+      return cmd;
     } finally {
       writeUnlock();
     }
@@ -4689,8 +4852,11 @@
                             CheckpointSignature sig) throws IOException {
     writeLock();
     try {
-    LOG.info("End checkpoint for " + registration.getAddress());
-    getFSImage().endCheckpoint(sig, registration.getRole());
+      if (isInSafeMode()) {
+        throw new SafeModeException("Checkpoint not ended", safeMode);
+      }
+      LOG.info("End checkpoint for " + registration.getAddress());
+      getFSImage().endCheckpoint(sig, registration.getRole());
     } finally {
       writeUnlock();
     }
@@ -4815,7 +4981,12 @@
   @Override // FSNamesystemMBean
   @Metric
   public long getFilesTotal() {
-    return this.dir.totalInodes();
+    readLock();
+    try {
+      return this.dir.totalInodes();
+    } finally {
+      readUnlock();
+    }
   }
 
   @Override // FSNamesystemMBean
@@ -4949,18 +5120,25 @@
   /**
    * Increments, logs and then returns the stamp
    */
-  long nextGenerationStamp() {
+  private long nextGenerationStamp() throws SafeModeException {
+    assert hasWriteLock();
+    if (isInSafeMode()) {
+      throw new SafeModeException(
+          "Cannot get next generation stamp", safeMode);
+    }
     long gs = generationStamp.nextStamp();
     getEditLog().logGenerationStamp(gs);
+    // NB: callers sync the log
     return gs;
   }
 
   private INodeFileUnderConstruction checkUCBlock(ExtendedBlock block,
       String clientName) throws IOException {
-    // check safe mode
-    if (isInSafeMode())
+    assert hasWriteLock();
+    if (isInSafeMode()) {
       throw new SafeModeException("Cannot get a new generation stamp and an " +
                                 "access token for block " + block, safeMode);
+    }
     
     // check stored block state
     BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock.getLocalBlock(block));
@@ -5001,25 +5179,27 @@
    */
   LocatedBlock updateBlockForPipeline(ExtendedBlock block, 
       String clientName) throws IOException {
+    LocatedBlock locatedBlock;
     writeLock();
     try {
-    // check vadility of parameters
-    checkUCBlock(block, clientName);
-    
-    // get a new generation stamp and an access token
-    block.setGenerationStamp(nextGenerationStamp());
-    LocatedBlock locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
-    if (isBlockTokenEnabled) {
-      locatedBlock.setBlockToken(blockTokenSecretManager.generateToken(
+      // check vadility of parameters
+      checkUCBlock(block, clientName);
+  
+      // get a new generation stamp and an access token
+      block.setGenerationStamp(nextGenerationStamp());
+      locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
+      if (isBlockTokenEnabled) {
+        locatedBlock.setBlockToken(blockTokenSecretManager.generateToken(
           block, EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
-    }
-    return locatedBlock;
+      }
     } finally {
       writeUnlock();
     }
+    // Ensure we record the new generation stamp
+    getEditLog().logSync();
+    return locatedBlock;
   }
   
-  
   /**
    * Update a pipeline for a block under construction
    * 
@@ -5034,15 +5214,32 @@
       throws IOException {
     writeLock();
     try {
-    assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
-    + oldBlock + " has different block identifier";
-    LOG.info("updatePipeline(block=" + oldBlock
-        + ", newGenerationStamp=" + newBlock.getGenerationStamp()
-        + ", newLength=" + newBlock.getNumBytes()
-        + ", newNodes=" + Arrays.asList(newNodes)
-        + ", clientName=" + clientName
-        + ")");
+      if (isInSafeMode()) {
+        throw new SafeModeException("Pipeline not updated", safeMode);
+      }
+      assert newBlock.getBlockId()==oldBlock.getBlockId() : newBlock + " and "
+        + oldBlock + " has different block identifier";
+      LOG.info("updatePipeline(block=" + oldBlock
+               + ", newGenerationStamp=" + newBlock.getGenerationStamp()
+               + ", newLength=" + newBlock.getNumBytes()
+               + ", newNodes=" + Arrays.asList(newNodes)
+               + ", clientName=" + clientName
+               + ")");
+      updatePipelineInternal(clientName, oldBlock, newBlock, newNodes);
+    } finally {
+      writeUnlock();
+    }
+    if (supportAppends) {
+      getEditLog().logSync();
+    }
+    LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
+  }
 
+  /** @see updatePipeline(String, ExtendedBlock, ExtendedBlock, DatanodeID[]) */
+  void updatePipelineInternal(String clientName, ExtendedBlock oldBlock, 
+      ExtendedBlock newBlock, DatanodeID[] newNodes)
+      throws IOException {
+    assert hasWriteLock();
     // check the vadility of the block and lease holder name
     final INodeFileUnderConstruction pendingFile = 
       checkUCBlock(oldBlock, clientName);
@@ -5052,8 +5249,8 @@
     if (newBlock.getGenerationStamp() <= blockinfo.getGenerationStamp() ||
         newBlock.getNumBytes() < blockinfo.getNumBytes()) {
       String msg = "Update " + oldBlock + " (len = " + 
-      blockinfo.getNumBytes() + ") to an older state: " + newBlock + 
-      " (len = " + newBlock.getNumBytes() +")";
+        blockinfo.getNumBytes() + ") to an older state: " + newBlock + 
+        " (len = " + newBlock.getNumBytes() +")";
       LOG.warn(msg);
       throw new IOException(msg);
     }
@@ -5076,21 +5273,15 @@
     String src = leaseManager.findPath(pendingFile);
     if (supportAppends) {
       dir.persistBlocks(src, pendingFile);
-      getEditLog().logSync();
-    }
-    LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
-    return;
-    } finally {
-      writeUnlock();
     }
   }
 
   // rename was successful. If any part of the renamed subtree had
   // files that were being written to, update with new filename.
-  //
-  void changeLease(String src, String dst, HdfsFileStatus dinfo) {
+  void unprotectedChangeLease(String src, String dst, HdfsFileStatus dinfo) {
     String overwrite;
     String replaceBy;
+    assert hasWriteLock();
 
     boolean destinationExisted = true;
     if (dinfo == null) {
@@ -5113,6 +5304,9 @@
    * Serializes leases. 
    */
   void saveFilesUnderConstruction(DataOutputStream out) throws IOException {
+    // This is run by an inferior thread of saveNamespace, which holds a read
+    // lock on our behalf. If we took the read lock here, we could block
+    // for fairness if a writer is waiting on the lock.
     synchronized (leaseManager) {
       out.writeInt(leaseManager.countPath()); // write the size
 
@@ -5201,6 +5395,7 @@
 
   /** Get a datanode descriptor given corresponding storageID */
   DatanodeDescriptor getDatanode(String nodeID) {
+    assert hasReadOrWriteLock();
     return datanodeMap.get(nodeID);
   }
 
@@ -5250,36 +5445,36 @@
 
     readLock();
     try {
-    if (!isPopulatingReplQueues()) {
-      throw new IOException("Cannot run listCorruptFileBlocks because " +
-                            "replication queues have not been initialized.");
-    }
-    checkSuperuserPrivilege();
-    long startBlockId = 0;
-    // print a limited # of corrupt files per call
-    int count = 0;
-    ArrayList<CorruptFileBlockInfo> corruptFiles = new ArrayList<CorruptFileBlockInfo>();
-    
-    if (startBlockAfter != null) {
-      startBlockId = Block.filename2id(startBlockAfter);
-    }
-    BlockIterator blkIterator = blockManager.getCorruptReplicaBlockIterator();
-    while (blkIterator.hasNext()) {
-      Block blk = blkIterator.next();
-      INode inode = blockManager.getINode(blk);
-      if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) {
-        String src = FSDirectory.getFullPathName(inode);
-        if (((startBlockAfter == null) || (blk.getBlockId() > startBlockId))
-            && (src.startsWith(path))) {
-          corruptFiles.add(new CorruptFileBlockInfo(src, blk));
-          count++;
-          if (count >= DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED)
-            break;
+      if (!isPopulatingReplQueues()) {
+        throw new IOException("Cannot run listCorruptFileBlocks because " +
+                              "replication queues have not been initialized.");
+      }
+      checkSuperuserPrivilege();
+      long startBlockId = 0;
+      // print a limited # of corrupt files per call
+      int count = 0;
+      ArrayList<CorruptFileBlockInfo> corruptFiles = new ArrayList<CorruptFileBlockInfo>();
+      
+      if (startBlockAfter != null) {
+        startBlockId = Block.filename2id(startBlockAfter);
+      }
+      BlockIterator blkIterator = blockManager.getCorruptReplicaBlockIterator();
+      while (blkIterator.hasNext()) {
+        Block blk = blkIterator.next();
+        INode inode = blockManager.getINode(blk);
+        if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) {
+          String src = FSDirectory.getFullPathName(inode);
+          if (((startBlockAfter == null) || (blk.getBlockId() > startBlockId))
+              && (src.startsWith(path))) {
+            corruptFiles.add(new CorruptFileBlockInfo(src, blk));
+            count++;
+            if (count >= DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED)
+              break;
+          }
         }
       }
-    }
-    LOG.info("list corrupt file blocks returned: " + count);
-    return corruptFiles;
+      LOG.info("list corrupt file blocks returned: " + count);
+      return corruptFiles;
     } finally {
       readUnlock();
     }
@@ -5291,17 +5486,17 @@
   public ArrayList<DatanodeDescriptor> getDecommissioningNodes() {
     readLock();
     try {
-    ArrayList<DatanodeDescriptor> decommissioningNodes = 
+      ArrayList<DatanodeDescriptor> decommissioningNodes = 
         new ArrayList<DatanodeDescriptor>();
-    ArrayList<DatanodeDescriptor> results = 
+      ArrayList<DatanodeDescriptor> results = 
         getDatanodeListForReport(DatanodeReportType.LIVE);
-    for (Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
-      DatanodeDescriptor node = it.next();
-      if (node.isDecommissionInProgress()) {
-        decommissioningNodes.add(node);
+      for (Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
+        DatanodeDescriptor node = it.next();
+        if (node.isDecommissionInProgress()) {
+          decommissioningNodes.add(node);
+        }
       }
-    }
-    return decommissioningNodes;
+      return decommissioningNodes;
     } finally {
       readUnlock();
     }
@@ -5339,32 +5534,38 @@
    */
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
-    if (isInSafeMode()) {
-      throw new SafeModeException("Cannot issue delegation token", safeMode);
-    }
-    if (!isAllowedDelegationTokenOp()) {
-      throw new IOException(
+    Token<DelegationTokenIdentifier> token;
+    writeLock();
+    try {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot issue delegation token", safeMode);
+      }
+      if (!isAllowedDelegationTokenOp()) {
+        throw new IOException(
           "Delegation Token can be issued only with kerberos or web authentication");
-    }
-    
-    if(dtSecretManager == null || !dtSecretManager.isRunning()) {
-      LOG.warn("trying to get DT with no secret manager running");
-      return null;
-    }
+      }
+      if (dtSecretManager == null || !dtSecretManager.isRunning()) {
+        LOG.warn("trying to get DT with no secret manager running");
+        return null;
+      }
 
-    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    String user = ugi.getUserName();
-    Text owner = new Text(user);
-    Text realUser = null;
-    if (ugi.getRealUser() != null) {
-      realUser = new Text(ugi.getRealUser().getUserName());
-    }
-    DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner,
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      String user = ugi.getUserName();
+      Text owner = new Text(user);
+      Text realUser = null;
+      if (ugi.getRealUser() != null) {
+        realUser = new Text(ugi.getRealUser().getUserName());
+      }
+      DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner,
         renewer, realUser);
-    Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>(
+      token = new Token<DelegationTokenIdentifier>(
         dtId, dtSecretManager);
-    long expiryTime = dtSecretManager.getTokenExpiryTime(dtId);
-    logGetDelegationToken(dtId, expiryTime);
+      long expiryTime = dtSecretManager.getTokenExpiryTime(dtId);
+      getEditLog().logGetDelegationToken(dtId, expiryTime);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
     return token;
   }
 
@@ -5377,20 +5578,27 @@
    */
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws InvalidToken, IOException {
-    if (isInSafeMode()) {
-      throw new SafeModeException("Cannot renew delegation token", safeMode);
+    long expiryTime;
+    writeLock();
+    try {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot renew delegation token", safeMode);
+      }
+      if (!isAllowedDelegationTokenOp()) {
+        throw new IOException(
+            "Delegation Token can be renewed only with kerberos or web authentication");
+      }
+      String renewer = UserGroupInformation.getCurrentUser().getShortUserName();
+      expiryTime = dtSecretManager.renewToken(token, renewer);
+      DelegationTokenIdentifier id = new DelegationTokenIdentifier();
+      ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
+      DataInputStream in = new DataInputStream(buf);
+      id.readFields(in);
+      getEditLog().logRenewDelegationToken(id, expiryTime);
+    } finally {
+      writeUnlock();
     }
-    if (!isAllowedDelegationTokenOp()) {
-      throw new IOException(
-          "Delegation Token can be renewed only with kerberos or web authentication");
-    }
-    String renewer = UserGroupInformation.getCurrentUser().getShortUserName();
-    long expiryTime = dtSecretManager.renewToken(token, renewer);
-    DelegationTokenIdentifier id = new DelegationTokenIdentifier();
-    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
-    DataInputStream in = new DataInputStream(buf);
-    id.readFields(in);
-    logRenewDelegationToken(id, expiryTime);
+    getEditLog().logSync();
     return expiryTime;
   }
 
@@ -5401,13 +5609,19 @@
    */
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
-    if (isInSafeMode()) {
-      throw new SafeModeException("Cannot cancel delegation token", safeMode);
-    }
-    String canceller = UserGroupInformation.getCurrentUser().getUserName();
-    DelegationTokenIdentifier id = dtSecretManager
+    writeLock();
+    try {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot cancel delegation token", safeMode);
+      }
+      String canceller = UserGroupInformation.getCurrentUser().getUserName();
+      DelegationTokenIdentifier id = dtSecretManager
         .cancelToken(token, canceller);
-    logCancelDelegationToken(id);
+      getEditLog().logCancelDelegationToken(id);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
   }
   
   /**
@@ -5425,57 +5639,6 @@
   }
 
   /**
-   * Log the getDelegationToken operation to edit logs
-   * 
-   * @param id identifer of the new delegation token
-   * @param expiryTime when delegation token expires
-   */
-  private void logGetDelegationToken(DelegationTokenIdentifier id,
-      long expiryTime) throws IOException {
-    writeLock();
-    try {
-      getEditLog().logGetDelegationToken(id, expiryTime);
-    } finally {
-      writeUnlock();
-    }
-    getEditLog().logSync();
-  }
-
-  /**
-   * Log the renewDelegationToken operation to edit logs
-   * 
-   * @param id identifer of the delegation token being renewed
-   * @param expiryTime when delegation token expires
-   */
-  private void logRenewDelegationToken(DelegationTokenIdentifier id,
-      long expiryTime) throws IOException {
-    writeLock();
-    try {
-      getEditLog().logRenewDelegationToken(id, expiryTime);
-    } finally {
-      writeUnlock();
-    }
-    getEditLog().logSync();
-  }
-
-  
-  /**
-   * Log the cancelDelegationToken operation to edit logs
-   * 
-   * @param id identifer of the delegation token being cancelled
-   */
-  private void logCancelDelegationToken(DelegationTokenIdentifier id)
-      throws IOException {
-    writeLock();
-    try {
-      getEditLog().logCancelDelegationToken(id);
-    } finally {
-      writeUnlock();
-    }
-    getEditLog().logSync();
-  }
-
-  /**
    * Log the updateMasterKey operation to edit logs
    * 
    * @param key new delegation key.
@@ -5483,6 +5646,10 @@
   public void logUpdateMasterKey(DelegationKey key) throws IOException {
     writeLock();
     try {
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+          "Cannot log master key update in safe mode", safeMode);
+      }
       getEditLog().logUpdateMasterKey(key);
     } finally {
       writeUnlock();
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
index ff6b2dd..c38ba5b 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
@@ -391,6 +391,7 @@
 
   /** Check the leases beginning from the oldest. */
   private synchronized void checkLeases() {
+    assert fsnamesystem.hasWriteLock();
     for(; sortedLeases.size() > 0; ) {
       final Lease oldest = sortedLeases.first();
       if (!oldest.expiredHardLimit()) {
diff --git a/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java b/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
index 67bf0ce..9ee4a6f 100644
--- a/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
+++ b/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
@@ -56,7 +56,7 @@
                                                  .racks(racks)
                                                  .hosts(hosts)
                                                  .build();
-    
+    dfsCluster.waitClusterUp();
     namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
     
     username = System.getProperty("user.name");
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java b/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
index 02bbf5f..a1adbb2 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -555,6 +555,15 @@
     IOUtils.copyBytes(is, os, s.length(), true);
   }
 
+  /* Append the given string to the given file */
+  public static void appendFile(FileSystem fs, Path p, String s) 
+      throws IOException {
+    assert fs.exists(p);
+    InputStream is = new ByteArrayInputStream(s.getBytes());
+    FSDataOutputStream os = fs.append(p);
+    IOUtils.copyBytes(is, os, s.length(), true);
+  }
+  
   // Returns url content as string.
   public static String urlGet(URL url) throws IOException {
     URLConnection conn = url.openConnection();
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
index 1a6de15..6ded5a3 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import static org.junit.Assert.*;
 
 import org.junit.After;
@@ -201,7 +202,8 @@
     nodes.add(nodename);
     writeConfigFile(excludeFile, nodes);
     cluster.getNamesystem(nnIndex).refreshNodes(conf);
-    DatanodeInfo ret = cluster.getNamesystem(nnIndex).getDatanode(info[index]);
+    DatanodeInfo ret = NameNodeAdapter.getDatanode(
+        cluster.getNameNode(nnIndex), info[index]);
     waitNodeState(ret, waitForState);
     return ret;
   }
@@ -371,7 +373,7 @@
       // Stop decommissioning and verify stats
       writeConfigFile(excludeFile, null);
       fsn.refreshNodes(conf);
-      DatanodeInfo ret = fsn.getDatanode(downnode);
+      DatanodeInfo ret = NameNodeAdapter.getDatanode(namenode, downnode);
       waitNodeState(ret, AdminStates.NORMAL);
       verifyStats(namenode, fsn, ret, false);
     }
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestSafeMode.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestSafeMode.java
index ac62f06..8c98a20 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestSafeMode.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestSafeMode.java
@@ -20,20 +20,44 @@
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
 
-import junit.framework.TestCase;
+import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
 
 /**
  * Tests to verify safe mode correctness.
  */
-public class TestSafeMode extends TestCase {
-  
-  static Log LOG = LogFactory.getLog(TestSafeMode.class);
+public class TestSafeMode {
+  Configuration conf; 
+  MiniDFSCluster cluster;
+  FileSystem fs;
+  DistributedFileSystem dfs;
+
+  @Before
+  public void startUp() throws IOException {
+    conf = new HdfsConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();      
+    fs = cluster.getFileSystem();
+    dfs = (DistributedFileSystem)fs;
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (fs != null) {
+      fs.close();
+    }
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
 
   /**
    * This test verifies that if SafeMode is manually entered, name-node does not
@@ -51,61 +75,123 @@
    *  
    * @throws IOException
    */
-  public void testManualSafeMode() throws IOException {
-    MiniDFSCluster cluster = null;
-    DistributedFileSystem fs = null;
+  @Test
+  public void testManualSafeMode() throws IOException {      
+    fs = (DistributedFileSystem)cluster.getFileSystem();
+    Path file1 = new Path("/tmp/testManualSafeMode/file1");
+    Path file2 = new Path("/tmp/testManualSafeMode/file2");
+    
+    // create two files with one block each.
+    DFSTestUtil.createFile(fs, file1, 1000, (short)1, 0);
+    DFSTestUtil.createFile(fs, file2, 2000, (short)1, 0);
+    fs.close();
+    cluster.shutdown();
+    
+    // now bring up just the NameNode.
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false).build();
+    cluster.waitActive();
+    dfs = (DistributedFileSystem)cluster.getFileSystem();
+    
+    assertTrue("No datanode is started. Should be in SafeMode", 
+               dfs.setSafeMode(SafeModeAction.SAFEMODE_GET));
+    
+    // manually set safemode.
+    dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+    
+    // now bring up the datanode and wait for it to be active.
+    cluster.startDataNodes(conf, 1, true, null, null);
+    cluster.waitActive();
+    
+    // wait longer than dfs.namenode.safemode.extension
     try {
-      Configuration conf = new HdfsConfiguration();
-      // disable safemode extension to make the test run faster.
-      conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, "1");
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      cluster.waitActive();
-      
-      fs = (DistributedFileSystem)cluster.getFileSystem();
-      Path file1 = new Path("/tmp/testManualSafeMode/file1");
-      Path file2 = new Path("/tmp/testManualSafeMode/file2");
-      
-      LOG.info("Created file1 and file2.");
-      
-      // create two files with one block each.
-      DFSTestUtil.createFile(fs, file1, 1000, (short)1, 0);
-      DFSTestUtil.createFile(fs, file2, 2000, (short)1, 0);
-      fs.close();
-      cluster.shutdown();
-      
-      // now bring up just the NameNode.
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false).build();
-      cluster.waitActive();
-      fs = (DistributedFileSystem)cluster.getFileSystem();
-      
-      LOG.info("Restarted cluster with just the NameNode");
-      
-      assertTrue("No datanode is started. Should be in SafeMode", 
-                 fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-      
-      // manually set safemode.
-      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-      
-      // now bring up the datanode and wait for it to be active.
-      cluster.startDataNodes(conf, 1, true, null, null);
-      cluster.waitActive();
-      
-      LOG.info("Datanode is started.");
+      Thread.sleep(2000);
+    } catch (InterruptedException ignored) {}
 
-      // wait longer than dfs.namenode.safemode.extension
-      try {
-        Thread.sleep(2000);
-      } catch (InterruptedException ignored) {}
-      
-      assertTrue("should still be in SafeMode",
-          fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-      
-      fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
-      assertFalse("should not be in SafeMode",
-          fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-    } finally {
-      if(fs != null) fs.close();
-      if(cluster!= null) cluster.shutdown();
-    }
+    assertTrue("should still be in SafeMode",
+        dfs.setSafeMode(SafeModeAction.SAFEMODE_GET));
+    assertFalse("should not be in SafeMode", 
+        dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE));
   }
-}
+
+  public interface FSRun {
+    public abstract void run(FileSystem fs) throws IOException;
+  }
+
+  /**
+   * Assert that the given function fails to run due to a safe 
+   * mode exception.
+   */
+  public void runFsFun(String msg, FSRun f) {
+    try {
+      f.run(fs);
+      fail(msg);
+     } catch (IOException ioe) {
+       assertTrue(ioe.getMessage().contains("safe mode"));
+     }
+  }
+
+  /**
+   * Run various fs operations while the NN is in safe mode,
+   * assert that they are either allowed or fail as expected.
+   */
+  @Test
+  public void testOperationsWhileInSafeMode() throws IOException {
+    final Path file1 = new Path("/file1");
+
+    assertFalse(dfs.setSafeMode(SafeModeAction.SAFEMODE_GET));
+    DFSTestUtil.createFile(fs, file1, 1024, (short)1, 0);
+    assertTrue("Could not enter SM", 
+        dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER));
+
+    runFsFun("Set quota while in SM", new FSRun() { 
+      public void run(FileSystem fs) throws IOException {
+        ((DistributedFileSystem)fs).setQuota(file1, 1, 1); 
+      }});
+
+    runFsFun("Set perm while in SM", new FSRun() {
+      public void run(FileSystem fs) throws IOException {
+        fs.setPermission(file1, FsPermission.getDefault());
+      }});
+
+    runFsFun("Set owner while in SM", new FSRun() {
+      public void run(FileSystem fs) throws IOException {
+        fs.setOwner(file1, "user", "group");
+      }});
+
+    runFsFun("Set repl while in SM", new FSRun() {
+      public void run(FileSystem fs) throws IOException {
+        fs.setReplication(file1, (short)1);
+      }});
+
+    runFsFun("Append file while in SM", new FSRun() {
+      public void run(FileSystem fs) throws IOException {
+        DFSTestUtil.appendFile(fs, file1, "new bytes");
+      }});
+
+    runFsFun("Delete file while in SM", new FSRun() {
+      public void run(FileSystem fs) throws IOException {
+        fs.delete(file1, false);
+      }});
+
+    runFsFun("Rename file while in SM", new FSRun() {
+      public void run(FileSystem fs) throws IOException {
+        fs.rename(file1, new Path("file2"));
+      }});
+
+    try {
+      fs.setTimes(file1, 0, 0);
+    } catch (IOException ioe) {
+      fail("Set times failed while in SM");
+    }
+
+    try {
+      DFSTestUtil.readFile(fs, file1);
+    } catch (IOException ioe) {
+      fail("Set times failed while in SM");
+    }
+
+    assertFalse("Could not leave SM",
+        dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE));
+  }
+  
+}
\ No newline at end of file
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 683f505..883c51f 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -21,6 +21,7 @@
 
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 
 /**
@@ -77,4 +78,18 @@
   public static String getLeaseHolderForPath(NameNode namenode, String path) {
     return namenode.getNamesystem().leaseManager.getLeaseByPath(path).getHolder();
   }
+
+  /**
+   * Return the datanode descriptor for the given datanode.
+   */
+  public static DatanodeDescriptor getDatanode(NameNode namenode,
+      DatanodeID id) throws IOException {
+    FSNamesystem ns = namenode.getNamesystem();
+    ns.readLock();
+    try {
+      return ns.getDatanode(id);
+    } finally {
+      ns.readUnlock();
+    }
+  }
 }
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 60d697a..0c5aa8c 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -61,7 +61,14 @@
     FSNamesystem namesystem = cluster.getNamesystem();
     String state = alive ? "alive" : "dead";
     while (System.currentTimeMillis() < stopTime) {
-      if (namesystem.getDatanode(nodeID).isAlive == alive) {
+      namesystem.readLock();
+      DatanodeDescriptor dd;
+      try {
+        dd = namesystem.getDatanode(nodeID);
+      } finally {
+        namesystem.readUnlock();
+      }
+      if (dd.isAlive == alive) {
         LOG.info("datanode " + nodeID + " is " + state);
         return;
       }
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java
index 4a2767a..9b10156 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java
@@ -54,7 +54,13 @@
       final DatanodeRegistration nodeReg = 
         DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
         
-      DatanodeDescriptor dd = namesystem.getDatanode(nodeReg);
+      namesystem.readLock();
+      DatanodeDescriptor dd;
+      try {
+        dd = namesystem.getDatanode(nodeReg);
+      } finally {
+        namesystem.readUnlock();
+      }
       
       final int REMAINING_BLOCKS = 1;
       final int MAX_REPLICATE_LIMIT = 
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNNThroughputBenchmark.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNNThroughputBenchmark.java
index 40c25d0..8203292 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNNThroughputBenchmark.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestNNThroughputBenchmark.java
@@ -24,7 +24,6 @@
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
 
 public class TestNNThroughputBenchmark {
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSafeMode.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSafeMode.java
index 875fc33..1f953bb 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSafeMode.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestSafeMode.java
@@ -20,109 +20,31 @@
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import junit.framework.TestCase;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
 
 /**
  * Tests to verify safe mode correctness.
  */
-public class TestSafeMode extends TestCase {
+public class TestSafeMode {
   
-  static Log LOG = LogFactory.getLog(TestSafeMode.class);
-
-  /**
-   * This test verifies that if SafeMode is manually entered, name-node does not
-   * come out of safe mode even after the startup safe mode conditions are met.
-   * <ol>
-   * <li>Start cluster with 1 data-node.</li>
-   * <li>Create 2 files with replication 1.</li>
-   * <li>Re-start cluster with 0 data-nodes. 
-   * Name-node should stay in automatic safe-mode.</li>
-   * <li>Enter safe mode manually.</li>
-   * <li>Start the data-node.</li>
-   * <li>Wait longer than <tt>dfs.namenode.safemode.extension</tt> and 
-   * verify that the name-node is still in safe mode.</li>
-   * </ol>
-   *  
-   * @throws IOException
-   */
-  public void testManualSafeMode() throws IOException {
-    MiniDFSCluster cluster = null;
-    DistributedFileSystem fs = null;
-    try {
-      Configuration conf = new HdfsConfiguration();
-      // disable safemode extension to make the test run faster.
-      conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, "1");
-      cluster = new MiniDFSCluster.Builder(conf).build();
-      cluster.waitActive();
-      
-      fs = (DistributedFileSystem)cluster.getFileSystem();
-      Path file1 = new Path("/tmp/testManualSafeMode/file1");
-      Path file2 = new Path("/tmp/testManualSafeMode/file2");
-      
-      LOG.info("Created file1 and file2.");
-      
-      // create two files with one block each.
-      DFSTestUtil.createFile(fs, file1, 1000, (short)1, 0);
-      DFSTestUtil.createFile(fs, file2, 2000, (short)1, 0);
-      fs.close();
-      cluster.shutdown();
-      
-      // now bring up just the NameNode.
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).format(false).build();
-      cluster.waitActive();
-      fs = (DistributedFileSystem)cluster.getFileSystem();
-      
-      LOG.info("Restarted cluster with just the NameNode");
-      
-      assertTrue("No datanode is started. Should be in SafeMode", 
-                 fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-      
-      // manually set safemode.
-      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
-      
-      // now bring up the datanode and wait for it to be active.
-      cluster.startDataNodes(conf, 1, true, null, null);
-      cluster.waitActive();
-      
-      LOG.info("Datanode is started.");
-
-      // wait longer than dfs.namenode.safemode.extension
-      try {
-        Thread.sleep(2000);
-      } catch (InterruptedException ignored) {}
-      
-      assertTrue("should still be in SafeMode",
-          fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-      
-      fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
-      assertFalse("should not be in SafeMode",
-          fs.setSafeMode(SafeModeAction.SAFEMODE_GET));
-    } finally {
-      if(fs != null) fs.close();
-      if(cluster!= null) cluster.shutdown();
-    }
-  }
-
-
   /**
    * Verify that the NameNode stays in safemode when dfs.safemode.datanode.min
    * is set to a number greater than the number of live datanodes.
    */
+  @Test
   public void testDatanodeThreshold() throws IOException {
     MiniDFSCluster cluster = null;
     DistributedFileSystem fs = null;
     try {
-      Configuration conf = new Configuration();
+      Configuration conf = new HdfsConfiguration();
       conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
       conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
 
diff --git a/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java b/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java
index c2aab08..0c22de8 100644
--- a/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java
+++ b/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java
@@ -39,7 +39,8 @@
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
-import static org.junit.Assert.assertFalse;
+
+import static org.junit.Assert.*;
 import org.junit.Before;
 import org.junit.Test;
 import static org.mockito.Matchers.anyString;
@@ -99,6 +100,17 @@
     }
   }
 
+  // Release the lease for the given file
+  private boolean releaseLease(FSNamesystem ns, LeaseManager.Lease lm, 
+      Path file) throws IOException {
+    fsn.writeLock();
+    try {
+      return fsn.internalReleaseLease(lm, file.toString(), null);
+    } finally {
+      fsn.writeUnlock();
+    }
+  }
+
   /**
    * Mocks FSNamesystem instance, adds an empty file and invokes lease recovery
    * method. 
@@ -118,7 +130,7 @@
     fsn.dir.addFile(file.toString(), ps, (short)3, 1l, 
       "test", "test-machine", dnd, 1001l);
     assertTrue("True has to be returned in this case",
-      fsn.internalReleaseLease(lm, file.toString(), null));
+        releaseLease(fsn, lm, file));
   }
   
   /**
@@ -143,9 +155,9 @@
     mockFileBlocks(2, null, 
       HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps, false);
     
-    fsn.internalReleaseLease(lm, file.toString(), null);
-    assertTrue("FSNamesystem.internalReleaseLease suppose to throw " +
-      "IOException here", false);
+    releaseLease(fsn, lm, file);
+    fail("FSNamesystem.internalReleaseLease suppose to throw " +
+      "IOException here");
   }  
 
   /**
@@ -169,15 +181,14 @@
     mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED, 
       HdfsConstants.BlockUCState.COMMITTED, file, dnd, ps, false);
 
-    fsn.internalReleaseLease(lm, file.toString(), null);
-    assertTrue("FSNamesystem.internalReleaseLease suppose to throw " +
-      "AlreadyBeingCreatedException here", false);
+    releaseLease(fsn, lm, file);
+    fail("FSNamesystem.internalReleaseLease suppose to throw " +
+      "IOException here");
   }
 
   /**
    * Mocks FSNamesystem instance, adds an empty file with 0 blocks
    * and invokes lease recovery method. 
-   * 
    */
   @Test
   public void testInternalReleaseLease_0blocks () throws IOException {
@@ -194,7 +205,7 @@
     mockFileBlocks(0, null, null, file, dnd, ps, false);
 
     assertTrue("True has to be returned in this case",
-      fsn.internalReleaseLease(lm, file.toString(), null));
+        releaseLease(fsn, lm, file));
   }
   
   /**
@@ -217,9 +228,9 @@
 
     mockFileBlocks(1, null, HdfsConstants.BlockUCState.COMMITTED, file, dnd, ps, false);
 
-    fsn.internalReleaseLease(lm, file.toString(), null);
-    assertTrue("FSNamesystem.internalReleaseLease suppose to throw " +
-      "AlreadyBeingCreatedException here", false);
+    releaseLease(fsn, lm, file);
+    fail("FSNamesystem.internalReleaseLease suppose to throw " +
+      "IOException here");
   }
 
   /**
@@ -244,7 +255,7 @@
       HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps, false);
         
     assertFalse("False is expected in return in this case",
-      fsn.internalReleaseLease(lm, file.toString(), null));
+        releaseLease(fsn, lm, file));
   }
 
   @Test