HDFS-326 Merge with SVN_HEAD of 2010-01-08

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-326@897222 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/.eclipse.templates/.classpath b/.eclipse.templates/.classpath
index a4e8970..680a01e 100644
--- a/.eclipse.templates/.classpath
+++ b/.eclipse.templates/.classpath
@@ -9,20 +9,19 @@
 	<classpathentry kind="src" path="src/contrib/thriftfs/src/java"/>
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
 	<classpathentry kind="var" path="ANT_HOME/lib/ant.jar"/>
-	<classpathentry kind="lib" path="lib/hadoop-core-0.22.0-dev.jar"/>
-	<classpathentry kind="lib" path="lib/hadoop-core-test-0.22.0-dev.jar"/>
+	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/hadoop-core-0.22.0-SNAPSHOT.jar"/>
+	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/test/hadoop-core-test-0.22.0-SNAPSHOT.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-cli-1.2.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-codec-1.3.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-el-1.0.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-httpclient-3.0.1.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-logging-1.0.4.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-logging-api-1.0.4.jar"/>
+	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-logging-1.1.1.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/commons-net-1.4.1.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/core-3.1.1.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/hsqldb-1.8.0.10.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jasper-compiler-5.5.12.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jasper-runtime-5.5.12.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jets3t-0.6.1.jar"/>
+	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jets3t-0.7.1.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jetty-6.1.14.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jetty-util-6.1.14.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/jsp-2.1-6.1.14.jar"/>
@@ -32,16 +31,17 @@
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/log4j-1.2.15.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/oro-2.0.8.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/servlet-api-2.5-6.1.14.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/slf4j-api-1.4.3.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/slf4j-log4j12-1.4.3.jar"/>
+	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/slf4j-api-1.5.8.jar"/>
+	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/test/slf4j-log4j12-1.4.3.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/xmlenc-0.52.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/aspectjrt-1.5.3.jar"/>
+	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/test/mockito-all-1.8.0.jar"/>	
+	<classpathentry kind="lib" path="build/ivy/lib/Hadoop-Hdfs/common/aspectjrt-1.6.5.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cactus.core.framework.uberjar.javaEE.14-1.8.0.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cactus.integration.ant-1.8.0.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cactus.integration.shared.api-1.8.0.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cargo-ant-0.9.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/cargo-core-uberjar-0.9.jar"/>
-	<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/standard-1.1.2.jar"/>
+	<classpathentry kind="lib" path="build/ivy/lib/hdfsproxy/common/standard-1.1.2.jar"/>	
 	<classpathentry kind="lib" path="src/contrib/thriftfs/lib/hadoopthriftapi.jar"/>
 	<classpathentry kind="lib" path="src/contrib/thriftfs/lib/libthrift.jar"/>
 	<classpathentry kind="lib" path="build/test/classes"/>
diff --git a/CHANGES.txt b/CHANGES.txt
index 3b95e93..552e0f6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,29 +18,28 @@
 
   IMPROVEMENTS
     
-    HDFS-704. Unify build property names to facilitate cross-projects
-    modifications (cos)
-
     HDFS-703. Replace current fault injection implementation with one
     from (cos)
 
     HDFS-754. Reduce ivy console output to ovservable level (cos)
 
-    HDFS-699. Add unit tests framework (Mockito) (cos, Eli Collins)
+    HDFS-832. HDFS side of HADOOP-6222. (cos)
 
-    HDFS-630  In DFSOutputStream.nextBlockOutputStream(), the client can
-              exclude specific datanodes when locating the next block
-              (Cosmin Lehene via Stack)
+    HDFS-840. Change tests to use FileContext test helper introduced in
+    HADOOP-6394. (Jitendra Nath Pandey via suresh)
 
-    HDFS-519. Create new tests for lease recovery (cos)
+    HDFS-685. Use the user-to-groups mapping service in the NameNode. (boryas, acmurthy)
+
+    HDFS-755. Read multiple checksum chunks at once in DFSInputStream.
+    (Todd Lipcon via tomwhite)
+
+    HDFS-786. Implement getContentSummary in HftpFileSystem.
+    (Tsz Wo (Nicholas), SZE via cdouglas)
 
   OPTIMIZATIONS
 
   BUG FIXES
   
-    HDFS-646. Fix test-patch failure by adding test-contrib ant target.
-    (gkesavan)
-
     HDFS-695. RaidNode should read in configuration from hdfs-site.xml.
     (dhruba)
 
@@ -49,8 +48,6 @@
 
     HDFS-750. Fix build failure due to TestRename. (suresh)
 
-    HDFS-733. TestBlockReport fails intermittently. (cos)
-
     HDFS-712. Move libhdfs from mapreduce subproject to hdfs subproject.
     (Eli Collins via dhruba)
 
@@ -62,13 +59,6 @@
     HDFS-751. Fix TestCrcCorruption to pick up the correct datablocks to
     corrupt. (dhruba)
     
-    HDFS-774. Intermittent race condition in TestFiPipelines (cos)
-
-    HDFS-741. TestHFlush test doesn't seek() past previously written part of
-    the file (cos, szetszwo)
-
-    HDFS-706. Intermittent failures in TestFiHFlush (cos)
- 
     HDFS-763. Fix slightly misleading report from DataBlockScanner 
     about corrupted scans. (dhruba)
 
@@ -81,7 +71,21 @@
     HDFS-785. Add Apache license to several namenode unit tests. 
     (Ravi Phulari via jghoman)
 
-    HDFS-791. Build is broken after HDFS-787 patch has been applied (cos)
+    HDFS-802. Update Eclipse configuration to match changes to Ivy
+    configuration (Edwin Chan via cos)
+
+    HDFS-423. Unbreak FUSE build and fuse_dfs_wrapper.sh (Eli Collins via cos)
+
+    HDFS-825. Build fails to pull latest hadoop-core-* artifacts (cos)
+
+    HDFS-94. The Heap Size printed in the NameNode WebUI is accurate.
+    (Dmytro Molkov via dhruba)
+
+    HDFS-767. An improved retry policy when the DFSClient is unable to fetch a
+    block from the datanode.  (Ning Zhang via dhruba)
+
+    HDFS-187. Initialize secondary namenode http address in TestStartup.
+    (Todd Lipcon via szetszwo)
 
 Release 0.21.0 - Unreleased
 
@@ -186,6 +190,8 @@
     HDFS-631. Rename configuration keys towards API standardization and
     backward compatibility. (Jitendra Nath Pandey via suresh)
 
+    HDFS-669. Add unit tests framework (Mockito) (cos, Eli Collins)
+
     HDFS-731. Support new Syncable interface in HDFS. (hairong)
 
     HDFS-702. Add HDFS implementation of AbstractFileSystem. 
@@ -194,6 +200,9 @@
     HDFS-758. Add decommissioning status page to Namenode Web UI.
     (Jitendra Nath Pandey via suresh)
 
+    HDFS-814. Add an api to get the visible length of a DFSDataInputStream.
+    (szetszwo)
+
   IMPROVEMENTS
 
     HDFS-381. Remove blocks from DataNode maps when corresponding file
@@ -354,6 +363,9 @@
 
     HDFS-680. Add new access method to a copy of a block's replica. (shv)
 
+    HDFS-704. Unify build property names to facilitate cross-projects
+    modifications (cos)
+
     HDFS-705. Create an adapter to access some of package-private methods of
     DataNode from tests (cos)
 
@@ -393,6 +405,12 @@
     HDFS-787. Upgrade some libraries to be consistent with common and 
     mapreduce. (omalley)
 
+    HDFS-519. Create new tests for lease recovery (cos)
+
+    HDFS-804. New unit tests for concurrent lease recovery (cos)
+
+    HDFS-813. Enable the append test in TestReadWhileWriting.  (szetszwo)
+
   BUG FIXES
 
     HDFS-76. Better error message to users when commands fail because of 
@@ -540,6 +558,52 @@
     HDFS-691. Fix an overflow error in DFSClient.DFSInputStream.available().
     (szetszwo)
 
+    HDFS-733. TestBlockReport fails intermittently. (cos)
+
+    HDFS-774. Intermittent race condition in TestFiPipelines (cos)
+
+    HDFS-741. TestHFlush test doesn't seek() past previously written part of
+    the file (cos, szetszwo)
+
+    HDFS-706. Intermittent failures in TestFiHFlush (cos)
+ 
+    HDFS-646. Fix test-patch failure by adding test-contrib ant target.
+    (gkesavan)
+
+    HDFS-791. Build is broken after HDFS-787 patch has been applied (cos)
+
+    HDFS-792. TestHDFSCLI is failing. (Todd Lipcon via cos)
+
+    HDFS-781. Namenode metrics PendingDeletionBlocks is not decremented.
+    (Suresh)
+
+    HDFS-192. Fix TestBackupNode failures. (shv)
+
+    HDFS-797. TestHDFSCLI much slower after HDFS-265 merge. (Todd Lipcon via cos)
+
+    HDFS-824. Stop lease checker in TestReadWhileWriting.  (szetszwo)
+
+    HDFS-823. CheckPointer should use addInternalServlet for image-fetching
+    servlet (jghoman)
+
+    HDFS-456. Fix URI generation for windows file paths. (shv)
+
+    HDFS-812. FSNamesystem#internalReleaseLease throws NullPointerException on
+    a single-block file's lease recovery. (cos)
+
+    HDFS-724. Pipeline hangs if one of the block receiver is not responsive.
+    (hairong)
+
+    HDFS-564. Adding pipeline tests 17-35. (hairong)
+
+    HDFS-849. TestFiDataTransferProtocol2#pipeline_Fi_18 sometimes fails.
+    (hairong)
+
+    HDFS-762. Balancer causes Null Pointer Exception. 
+    (Cristian Ivascu via dhruba)
+
+    HDFS-868. Fix link to Hadoop Upgrade Wiki. (Chris A. Mattmann via shv)
+
 Release 0.20.2 - Unreleased
 
   IMPROVEMENTS
@@ -570,6 +634,15 @@
     HDFS-596. Fix memory leak in hdfsFreeFileInfo() for libhdfs.
     (Zhang Bingjun via dhruba)
 
+    HDFS-793. Data node should receive the whole packet ack message before it
+    constructs and sends its own ack message for the packet. (hairong)
+
+    HDFS-185. Disallow chown, chgrp, chmod, setQuota, and setSpaceQuota when
+    name-node is in safemode. (Ravi Phulari via shv)
+
+    HDFS-101. DFS write pipeline: DFSClient sometimes does not detect second
+    datanode failure. (hairong)
+
 Release 0.20.1 - 2009-09-01
 
   IMPROVEMENTS
diff --git a/build.xml b/build.xml
index ef0a9a4..05ca157 100644
--- a/build.xml
+++ b/build.xml
@@ -1086,6 +1086,8 @@
       <env key="JVM_ARCH" value="${jvm.arch}"/>
       <arg value="install"/>
     </exec>
+    <!-- Create a build platform-agnostic link to c++ libs -->
+    <symlink overwrite="true" link="${build.dir}/c++/lib" resource="${install.c++}/lib"/>
   </target>
 
   <target name="compile-ant-tasks" depends="compile-core">
diff --git a/ivy/ivysettings.xml b/ivy/ivysettings.xml
index 40b5843..9e10e41 100644
--- a/ivy/ivysettings.xml
+++ b/ivy/ivysettings.xml
@@ -39,14 +39,15 @@
 
   <resolvers>
     <ibiblio name="maven2" root="${repo.maven.org}" pattern="${maven2.pattern.ext}" m2compatible="true"/>
-    <ibiblio name="apache-snapshot" root="${snapshot.apache.org}" m2compatible="true"/>
+    <ibiblio name="apache-snapshot" root="${snapshot.apache.org}" m2compatible="true"
+        checkmodified="true" changingPattern=".*SNAPSHOT"/>
 
     <filesystem name="fs" m2compatible="true" force="true">
        <artifact pattern="${repo.dir}/org/apache/hadoop/[module]/[revision]/[module]-[revision].[ext]"/>
        <ivy pattern="${repo.dir}/org/apache/hadoop/[module]/[revision]/[module]-[revision].pom"/>
     </filesystem>
 
-    <chain name="default" dual="true">
+    <chain name="default" dual="true" checkmodified="true" changingPattern=".*SNAPSHOT">
       <resolver ref="apache-snapshot"/> 
       <resolver ref="maven2"/>
     </chain>
diff --git a/src/contrib/build-contrib.xml b/src/contrib/build-contrib.xml
index c7d57cf..8d62945 100644
--- a/src/contrib/build-contrib.xml
+++ b/src/contrib/build-contrib.xml
@@ -43,6 +43,9 @@
   <property name="test.timeout" value="900000"/>
   <property name="build.dir" location="${hadoop.root}/build/contrib/${name}"/>
   <property name="build.classes" location="${build.dir}/classes"/>
+  <!-- NB: sun.arch.data.model is not supported on all platforms -->
+  <property name="build.platform"
+            value="${os.name}-${os.arch}-${sun.arch.data.model}"/>
   <property name="build.test" location="${build.dir}/test"/>
   <property name="build.examples" location="${build.dir}/examples"/>
   <property name="hadoop.log.dir" location="${build.dir}/test/logs"/>
diff --git a/src/contrib/fuse-dfs/build.xml b/src/contrib/fuse-dfs/build.xml
index 6388fdd..e8ab8d3 100644
--- a/src/contrib/fuse-dfs/build.xml
+++ b/src/contrib/fuse-dfs/build.xml
@@ -32,9 +32,9 @@
 
 
   <target name="check-libhdfs-exists" if="fusedfs">
-  <property name="libhdfs.lib" value="${hadoop.root}/build/libhdfs/libhdfs.so"/>
+  <property name="libhdfs.lib" value="${hadoop.root}/build/c++/${build.platform}/lib/libhdfs.so"/>
         <available file="${libhdfs.lib}" property="libhdfs-exists"/>
-    <fail message="libhdfs.so does not exist: ${libhdfs.lib}. Please check flags -Dlibhdfs=1 -Dfusedfs=1 are set or first try ant compile-libhdfs -Dlibhdfs=1">
+    <fail message="libhdfs.so does not exist: ${libhdfs.lib}. Please check flags -Dlibhdfs=1 -Dfusedfs=1 are set or first try ant compile -Dcompile.c++=true -Dlibhdfs=true">
          <condition>
             <not><isset property="libhdfs-exists"/></not>
           </condition>
@@ -59,7 +59,7 @@
       <env key="OS_ARCH" value="${os.arch}"/>
       <env key="HADOOP_HOME" value="${hadoop.root}"/>
       <env key="PACKAGE_VERSION" value="0.1.0"/>
-
+      <env key="BUILD_PLATFORM" value="${build.platform}" />
       <env key="PERMS" value="${perms}"/>
     </exec>
     <mkdir dir="${build.dir}"/>
diff --git a/src/contrib/fuse-dfs/src/Makefile.am b/src/contrib/fuse-dfs/src/Makefile.am
index 3b978e9..053a2fe 100644
--- a/src/contrib/fuse-dfs/src/Makefile.am
+++ b/src/contrib/fuse-dfs/src/Makefile.am
@@ -17,5 +17,4 @@
 bin_PROGRAMS = fuse_dfs
 fuse_dfs_SOURCES = fuse_dfs.c fuse_options.c fuse_trash.c fuse_stat_struct.c fuse_users.c fuse_init.c fuse_connect.c fuse_impls_access.c fuse_impls_chmod.c  fuse_impls_chown.c  fuse_impls_create.c  fuse_impls_flush.c fuse_impls_getattr.c  fuse_impls_mkdir.c  fuse_impls_mknod.c  fuse_impls_open.c fuse_impls_read.c fuse_impls_release.c fuse_impls_readdir.c fuse_impls_rename.c fuse_impls_rmdir.c fuse_impls_statfs.c fuse_impls_symlink.c fuse_impls_truncate.c fuse_impls_utimens.c  fuse_impls_unlink.c fuse_impls_write.c
 AM_CPPFLAGS= -DPERMS=$(PERMS) -D_FILE_OFFSET_BITS=64 -I$(JAVA_HOME)/include -I$(HADOOP_HOME)/src/c++/libhdfs/ -I$(JAVA_HOME)/include/linux/ -D_FUSE_DFS_VERSION=\"$(PACKAGE_VERSION)\" -DPROTECTED_PATHS=\"$(PROTECTED_PATHS)\" -I$(FUSE_HOME)/include
-AM_LDFLAGS= -L$(HADOOP_HOME)/build/libhdfs -lhdfs -L$(FUSE_HOME)/lib -lfuse -L$(JAVA_HOME)/jre/lib/$(OS_ARCH)/server -ljvm
-
+AM_LDFLAGS= -L$(HADOOP_HOME)/build/c++/$(BUILD_PLATFORM)/lib -lhdfs -L$(FUSE_HOME)/lib -lfuse -L$(JAVA_HOME)/jre/lib/$(OS_ARCH)/server -ljvm
diff --git a/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh b/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh
index 35a6c6b..cf0fbcb 100755
--- a/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh
+++ b/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh
@@ -1,3 +1,4 @@
+#!/usr/bin/env bash
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -19,12 +20,6 @@
 export HADOOP_HOME=/usr/local/share/hadoop
 fi
 
-export PATH=$HADOOP_HOME/contrib/fuse_dfs:$PATH
-
-for f in ls $HADOOP_HOME/lib/*.jar $HADOOP_HOME/*.jar ; do
-export  CLASSPATH=$CLASSPATH:$f
-done
-
 if [ "$OS_ARCH" = "" ]; then
 export OS_ARCH=amd64
 fi
@@ -37,4 +32,17 @@
 export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/$OS_ARCH/server:/usr/local/share/hdfs/libhdfs/:/usr/local/lib
 fi
 
-./fuse_dfs $@
+# If dev build set paths accordingly
+if [ -d $HADOOP_HDFS_HOME/build ]; then
+  export HADOOP_HOME=$HADOOP_HDFS_HOME
+  for f in ${HADOOP_HOME}/build/*.jar ; do
+    export CLASSPATH=$CLASSPATH:$f
+  done
+  for f in $HADOOP_HOME/build/ivy/lib/Hadoop-Hdfs/common/*.jar ; do
+    export CLASSPATH=$CLASSPATH:$f
+  done
+  export PATH=$HADOOP_HOME/build/contrib/fuse-dfs:$PATH
+  export LD_LIBRARY_PATH=$HADOOP_HOME/build/c++/lib:$JAVA_HOME/jre/lib/$OS_ARCH/server
+fi
+
+fuse_dfs $@
diff --git a/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml b/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml
index 3c91748..df813d5 100644
--- a/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml
+++ b/src/docs/src/documentation/content/xdocs/hdfs_user_guide.xml
@@ -530,7 +530,7 @@
       of Hadoop and rollback the cluster to the state it was in 
       before
       the upgrade. HDFS upgrade is described in more detail in 
-      <a href="http://wiki.apache.org/hadoop/Hadoop%20Upgrade">Hadoop Upgrade</a> Wiki page.
+      <a href="http://wiki.apache.org/hadoop/Hadoop_Upgrade">Hadoop Upgrade</a> Wiki page.
       HDFS can have one such backup at a time. Before upgrading,
       administrators need to remove existing backup using <code>bin/hadoop
       dfsadmin -finalizeUpgrade</code> command. The following
diff --git a/src/java/hdfs-default.xml b/src/java/hdfs-default.xml
index 3cecb5d..8f35f58 100644
--- a/src/java/hdfs-default.xml
+++ b/src/java/hdfs-default.xml
@@ -169,7 +169,7 @@
 
 <property>
   <name>dfs.namenode.name.dir</name>
-  <value>${hadoop.tmp.dir}/dfs/name</value>
+  <value>file://${hadoop.tmp.dir}/dfs/name</value>
   <description>Determines where on the local filesystem the DFS name node
       should store the name table(fsimage).  If this is a comma-delimited list
       of directories then the name table is replicated in all of the
@@ -447,7 +447,7 @@
 
 <property>
   <name>dfs.namenode.checkpoint.dir</name>
-  <value>${hadoop.tmp.dir}/dfs/namesecondary</value>
+  <value>file://${hadoop.tmp.dir}/dfs/namesecondary</value>
   <description>Determines where on the local filesystem the DFS secondary
       name node should store the temporary images to merge.
       If this is a comma-delimited list of directories then the image is
diff --git a/src/java/org/apache/hadoop/hdfs/DFSClient.java b/src/java/org/apache/hadoop/hdfs/DFSClient.java
index 2e204c7..09bafd1 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -87,6 +87,7 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.security.BlockAccessToken;
 import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -1510,26 +1511,60 @@
         }
       }
 
-      int chunkLen = Math.min(dataLeft, bytesPerChecksum);
-      
-      if ( chunkLen > 0 ) {
-        // len should be >= chunkLen
-        IOUtils.readFully(in, buf, offset, chunkLen);
-        checksumBytes.get(checksumBuf, 0, checksumSize);
+      // Sanity checks
+      assert len >= bytesPerChecksum;
+      assert checksum != null;
+      assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
+
+
+      int checksumsToRead, bytesToRead;
+
+      if (checksumSize > 0) {
+
+        // How many chunks left in our stream - this is a ceiling
+        // since we may have a partial chunk at the end of the file
+        int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
+
+        // How many chunks we can fit in databuffer
+        //  - note this is a floor since we always read full chunks
+        int chunksCanFit = Math.min(len / bytesPerChecksum,
+                                    checksumBuf.length / checksumSize);
+
+        // How many chunks should we read
+        checksumsToRead = Math.min(chunksLeft, chunksCanFit);
+        // How many bytes should we actually read
+        bytesToRead = Math.min(
+          checksumsToRead * bytesPerChecksum, // full chunks
+          dataLeft); // in case we have a partial
+      } else {
+        // no checksum
+        bytesToRead = Math.min(dataLeft, len);
+        checksumsToRead = 0;
+      }
+
+      if ( bytesToRead > 0 ) {
+        // Assert we have enough space
+        assert bytesToRead <= len;
+        assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
+        assert checksumBuf.length >= checksumSize * checksumsToRead;
+        IOUtils.readFully(in, buf, offset, bytesToRead);
+        checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
       }
       
-      dataLeft -= chunkLen;
+      dataLeft -= bytesToRead;
+      assert dataLeft >= 0;
+
       lastChunkOffset = chunkOffset;
-      lastChunkLen = chunkLen;
+      lastChunkLen = bytesToRead;
       
-      if ((dataLeft == 0 && isLastPacket) || chunkLen == 0) {
+      if ((dataLeft == 0 && isLastPacket) || bytesToRead == 0) {
         gotEOS = true;
       }
-      if ( chunkLen == 0 ) {
+      if ( bytesToRead == 0 ) {
         return -1;
       }
-      
-      return chunkLen;
+
+      return bytesToRead;
     }
     
     private BlockReader( String file, long blockId, DataInputStream in, 
@@ -1661,7 +1696,7 @@
    * DFSInputStream provides bytes from a named file.  It handles 
    * negotiation of the namenode and various datanodes as necessary.
    ****************************************************************/
-  class DFSInputStream extends FSInputStream {
+  private class DFSInputStream extends FSInputStream {
     private Socket s = null;
     private boolean closed = false;
 
@@ -1676,6 +1711,7 @@
     private long pos = 0;
     private long blockEnd = -1;
     private int failures = 0;
+    private int timeWindow = 3000; // wait time window (in msec) if BlockMissingException is caught
 
     /* XXX Use of CocurrentHashMap is temp fix. Need to fix 
      * parallel accesses to DFSInputStream (through ptreads) properly */
@@ -1695,6 +1731,7 @@
       this.buffersize = buffersize;
       this.src = src;
       prefetchSize = conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, prefetchSize);
+      timeWindow = conf.getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
       openInfo();
     }
 
@@ -2147,7 +2184,19 @@
               + " from any node: " + ie
               + ". Will get new block locations from namenode and retry...");
           try {
-            Thread.sleep(3000);
+            // Introducing a random factor to the wait time before another retry.
+            // The wait time is dependent on # of failures and a random factor.
+            // At the first time of getting a BlockMissingException, the wait time
+            // is a random number between 0..3000 ms. If the first retry
+            // still fails, we will wait 3000 ms grace period before the 2nd retry.
+            // Also at the second retry, the waiting window is expanded to 6000 ms
+            // alleviating the request rate from the server. Similarly the 3rd retry
+            // will wait 6000ms grace period before retry and the waiting window is
+            // expanded to 9000ms. 
+            double waitTime = timeWindow * failures +       // grace period for the last round of attempt
+              timeWindow * (failures + 1) * r.nextDouble(); // expanding time window for each failure
+            LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
+            Thread.sleep((long)waitTime);
           } catch (InterruptedException iex) {
           }
           deadNodes.clear(); //2nd option is to remove only nodes[blockId]
@@ -2392,6 +2441,9 @@
     }
   }
     
+  /**
+   * The Hdfs implementation of {@link FSDataInputStream}
+   */
   public static class DFSDataInputStream extends FSDataInputStream {
     public DFSDataInputStream(DFSInputStream in)
       throws IOException {
@@ -2419,6 +2471,12 @@
       return ((DFSInputStream)in).getAllBlocks();
     }
 
+    /**
+     * @return The visible length of the file.
+     */
+    public long getVisibleLength() throws IOException {
+      return ((DFSInputStream)in).getFileLength();
+    }
   }
 
   /****************************************************************
@@ -2482,7 +2540,27 @@
       int     dataPos;
       int     checksumStart;
       int     checksumPos;      
-  
+      private static final long HEART_BEAT_SEQNO = -1L;
+
+      /**
+       *  create a heartbeat packet
+       */
+      Packet() {
+        this.lastPacketInBlock = false;
+        this.numChunks = 0;
+        this.offsetInBlock = 0;
+        this.seqno = HEART_BEAT_SEQNO;
+        
+        buffer = null;
+        int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
+        buf = new byte[packetSize];
+        
+        checksumStart = dataStart = packetSize;
+        checksumPos = checksumStart;
+        dataPos = dataStart;
+        maxChunks = 0;
+      }
+      
       // create a new packet
       Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
         this.lastPacketInBlock = false;
@@ -2569,6 +2647,14 @@
         return offsetInBlock + dataPos - dataStart;
       }
       
+      /**
+       * Check if this packet is a heart beat packet
+       * @return true if the sequence number is HEART_BEAT_SEQNO
+       */
+      private boolean isHeartbeatPacket() {
+        return seqno == HEART_BEAT_SEQNO;
+      }
+      
       public String toString() {
         return "packet seqno:" + this.seqno +
         " offsetInBlock:" + this.offsetInBlock + 
@@ -2593,7 +2679,6 @@
       private DataInputStream blockReplyStream;
       private ResponseProcessor response = null;
       private volatile DatanodeInfo[] nodes = null; // list of targets for current block
-      private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();
       volatile boolean hasError = false;
       volatile int errorIndex = -1;
       private BlockConstructionStage stage;  // block construction stage
@@ -2687,6 +2772,7 @@
        * and closes them. Any error recovery is also done by this thread.
        */
       public void run() {
+        long lastPacket = System.currentTimeMillis();
         while (!streamerClosed && clientRunning) {
 
           // if the Responder encountered an error, shutdown Responder
@@ -2710,19 +2796,32 @@
 
             synchronized (dataQueue) {
               // wait for a packet to be sent.
+              long now = System.currentTimeMillis();
               while ((!streamerClosed && !hasError && clientRunning 
-                  && dataQueue.size() == 0) || doSleep) {
+                  && dataQueue.size() == 0 && 
+                  (stage != BlockConstructionStage.DATA_STREAMING || 
+                   stage == BlockConstructionStage.DATA_STREAMING && 
+                   now - lastPacket < socketTimeout/2)) || doSleep ) {
+                long timeout = socketTimeout/2 - (now-lastPacket);
+                timeout = timeout <= 0 ? 1000 : timeout;
+                timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
+                   timeout : 1000;
                 try {
-                  dataQueue.wait(1000);
+                  dataQueue.wait(timeout);
                 } catch (InterruptedException  e) {
                 }
                 doSleep = false;
+                now = System.currentTimeMillis();
               }
-              if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
+              if (streamerClosed || hasError || !clientRunning) {
                 continue;
               }
               // get packet to be sent.
-              one = dataQueue.getFirst();
+              if (dataQueue.isEmpty()) {
+                one = new Packet();  // heartbeat packet
+              } else {
+                one = dataQueue.getFirst(); // regular data packet
+              }
             }
 
             // get new block from namenode.
@@ -2768,9 +2867,11 @@
 
             synchronized (dataQueue) {
               // move packet from dataQueue to ackQueue
-              dataQueue.removeFirst();
-              ackQueue.addLast(one);
-              dataQueue.notifyAll();
+              if (!one.isHeartbeatPacket()) {
+                dataQueue.removeFirst();
+                ackQueue.addLast(one);
+                dataQueue.notifyAll();
+              }
             }
 
             if (LOG.isDebugEnabled()) {
@@ -2781,6 +2882,10 @@
             // write out data to remote datanode
             blockStream.write(buf.array(), buf.position(), buf.remaining());
             blockStream.flush();
+            lastPacket = System.currentTimeMillis();
+            
+            if (one.isHeartbeatPacket()) {  //heartbeat packet
+            }
             
             // update bytesSent
             long tmpBytesSent = one.getLastByteOffsetBlock();
@@ -2849,6 +2954,9 @@
        */
       void close(boolean force) {
         streamerClosed = true;
+        synchronized (dataQueue) {
+          dataQueue.notifyAll();
+        }
         if (force) {
           this.interrupt();
         }
@@ -2902,45 +3010,22 @@
         public void run() {
 
           this.setName("ResponseProcessor for block " + block);
+          PipelineAck ack = new PipelineAck();
 
           while (!responderClosed && clientRunning && !isLastPacketInBlock) {
             // process responses from datanodes.
             try {
-              // verify seqno from datanode
-              long seqno = blockReplyStream.readLong();
-              LOG.debug("DFSClient received ack for seqno " + seqno);
-              Packet one = null;
-              if (seqno == -1) {
-                continue;
-              } else if (seqno == -2) {
-                // no nothing
-              } else {
-                synchronized (dataQueue) {
-                  one = ackQueue.getFirst();
-                }
-                if (one.seqno != seqno) {
-                  throw new IOException("Responseprocessor: Expecting seqno " + 
-                      " for block " + block +
-                      one.seqno + " but received " + seqno);
-                }
-                isLastPacketInBlock = one.lastPacketInBlock;
-              }
-
-              // processes response status from all datanodes.
-              String replies = null;
+              // read an ack from the pipeline
+              ack.readFields(blockReplyStream);
               if (LOG.isDebugEnabled()) {
-                replies = "DFSClient Replies for seqno " + seqno + " are";
+                LOG.debug("DFSClient " + ack);
               }
-              for (int i = 0; i < targets.length && clientRunning; i++) {
-                final DataTransferProtocol.Status reply
-                    = DataTransferProtocol.Status.read(blockReplyStream);
-                if (LOG.isDebugEnabled()) {
-                  replies += " " + reply;
-                }
+              
+              long seqno = ack.getSeqno();
+              // processes response status from datanodes.
+              for (int i = ack.getNumOfReplies()-1; i >=0  && clientRunning; i--) {
+                final DataTransferProtocol.Status reply = ack.getReply(i);
                 if (reply != SUCCESS) {
-                  if (LOG.isDebugEnabled()) {
-                    LOG.debug(replies);
-                  }
                   errorIndex = i; // first bad datanode
                   throw new IOException("Bad response " + reply +
                       " for block " + block +
@@ -2948,16 +3033,24 @@
                       targets[i].getName());
                 }
               }
+              
+              assert seqno != PipelineAck.UNKOWN_SEQNO : 
+                "Ack for unkown seqno should be a failed ack: " + ack;
+              if (seqno == Packet.HEART_BEAT_SEQNO) {  // a heartbeat ack
+                continue;
+              }
 
-              if (LOG.isDebugEnabled()) {
-                LOG.debug(replies);
+              // a success ack for a data packet
+              Packet one = null;
+              synchronized (dataQueue) {
+                one = ackQueue.getFirst();
               }
-              
-              if (one == null) {
-                throw new IOException("Panic: responder did not receive " +
-                    "an ack for a packet: " + seqno);
+              if (one.seqno != seqno) {
+                throw new IOException("Responseprocessor: Expecting seqno " +
+                                      " for block " + block +
+                                      one.seqno + " but received " + seqno);
               }
-              
+              isLastPacketInBlock = one.lastPacketInBlock;
               // update bytesAcked
               block.setNumBytes(one.getLastByteOffsetBlock());
 
@@ -3118,9 +3211,7 @@
           success = false;
 
           long startTime = System.currentTimeMillis();
-          DatanodeInfo[] w = excludedNodes.toArray(
-              new DatanodeInfo[excludedNodes.size()]);
-          lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
+          lb = locateFollowingBlock(startTime);
           block = lb.getBlock();
           block.setNumBytes(0);
           accessToken = lb.getAccessToken();
@@ -3136,16 +3227,12 @@
             namenode.abandonBlock(block, src, clientName);
             block = null;
 
-            LOG.info("Excluding datanode " + nodes[errorIndex]);
-            excludedNodes.add(nodes[errorIndex]);
-
             // Connection failed.  Let's wait a little bit and retry
             retry = true;
             try {
               if (System.currentTimeMillis() - startTime > 5000) {
                 LOG.info("Waiting to find target node: " + nodes[0].getName());
               }
-              //TODO fix this timout. Extract it o a constant, maybe make it available from conf
               Thread.sleep(6000);
             } catch (InterruptedException iex) {
             }
@@ -3243,15 +3330,14 @@
         }
       }
 
-      private LocatedBlock locateFollowingBlock(long start,
-          DatanodeInfo[] excludedNodes) throws IOException {
+      private LocatedBlock locateFollowingBlock(long start) throws IOException {
         int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
         long sleeptime = 400;
         while (true) {
           long localstart = System.currentTimeMillis();
           while (true) {
             try {
-              return namenode.addBlock(src, clientName, block, excludedNodes);
+              return namenode.addBlock(src, clientName, block);
             } catch (RemoteException e) {
               IOException ue = 
                 e.unwrapRemoteException(FileNotFoundException.class,
diff --git a/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index a2054a1..fda289e 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -95,6 +95,7 @@
   public static final String  DFS_NAMENODE_NAME_DIR_KEY = "dfs.namenode.name.dir";
   public static final String  DFS_NAMENODE_EDITS_DIR_KEY = "dfs.namenode.edits.dir";
   public static final String  DFS_CLIENT_READ_PREFETCH_SIZE_KEY = "dfs.client.read.prefetch.size"; 
+  public static final String  DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base";
   public static final String  DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id";
   public static final String  DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname";
   public static final String  DFS_DATANODE_STORAGEID_KEY = "dfs.datanode.StorageId";
diff --git a/src/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java b/src/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
index b43f669..6048df8 100644
--- a/src/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
+++ b/src/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.security.RefreshUserToGroupMappingsProtocol;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.authorize.Service;
@@ -41,6 +42,8 @@
     new Service("security.namenode.protocol.acl", NamenodeProtocol.class),
     new Service("security.refresh.policy.protocol.acl", 
                 RefreshAuthorizationPolicyProtocol.class),
+    new Service("security.refresh.usertogroups.mappings.protocol.acl", 
+                RefreshUserToGroupMappingsProtocol.class),
   };
   
   @Override
diff --git a/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java b/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
index f41a81b..0e5e531 100644
--- a/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
+++ b/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
@@ -36,6 +36,7 @@
 import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -329,4 +330,101 @@
     throw new IOException("Not supported");
   }
 
+  /**
+   * A parser for parsing {@link ContentSummary} xml.
+   */
+  private class ContentSummaryParser extends DefaultHandler {
+    private ContentSummary contentsummary;
+
+    /** {@inheritDoc} */
+    public void startElement(String ns, String localname, String qname,
+                Attributes attrs) throws SAXException {
+      if (!ContentSummary.class.getName().equals(qname)) {
+        if (RemoteException.class.getSimpleName().equals(qname)) {
+          throw new SAXException(RemoteException.valueOf(attrs));
+        }
+        throw new SAXException("Unrecognized entry: " + qname);
+      }
+
+      contentsummary = toContentSummary(attrs);
+    }
+
+    /**
+     * Connect to the name node and get content summary.  
+     * @param path The path
+     * @return The content summary for the path.
+     * @throws IOException
+     */
+    private ContentSummary getContentSummary(String path) throws IOException {
+      final HttpURLConnection connection = openConnection(
+          "/contentSummary" + path, "ugi=" + ugi);
+      InputStream in = null;
+      try {
+        in = connection.getInputStream();        
+
+        final XMLReader xr = XMLReaderFactory.createXMLReader();
+        xr.setContentHandler(this);
+        xr.parse(new InputSource(in));
+      } catch(FileNotFoundException fnfe) {
+        //the server may not support getContentSummary
+        return null;
+      } catch(SAXException saxe) {
+        final Exception embedded = saxe.getException();
+        if (embedded != null && embedded instanceof IOException) {
+          throw (IOException)embedded;
+        }
+        throw new IOException("Invalid xml format", saxe);
+      } finally {
+        if (in != null) {
+          in.close();
+        }
+        connection.disconnect();
+      }
+      return contentsummary;
+    }
+  }
+
+  /** Return the object represented in the attributes. */
+  private static ContentSummary toContentSummary(Attributes attrs
+      ) throws SAXException {
+    final String length = attrs.getValue("length");
+    final String fileCount = attrs.getValue("fileCount");
+    final String directoryCount = attrs.getValue("directoryCount");
+    final String quota = attrs.getValue("quota");
+    final String spaceConsumed = attrs.getValue("spaceConsumed");
+    final String spaceQuota = attrs.getValue("spaceQuota");
+
+    if (length == null
+        || fileCount == null
+        || directoryCount == null
+        || quota == null
+        || spaceConsumed == null
+        || spaceQuota == null) {
+      return null;
+    }
+
+    try {
+      return new ContentSummary(
+          Long.parseLong(length),
+          Long.parseLong(fileCount),
+          Long.parseLong(directoryCount),
+          Long.parseLong(quota),
+          Long.parseLong(spaceConsumed),
+          Long.parseLong(spaceQuota));
+    } catch(Exception e) {
+      throw new SAXException("Invalid attributes: length=" + length
+          + ", fileCount=" + fileCount
+          + ", directoryCount=" + directoryCount
+          + ", quota=" + quota
+          + ", spaceConsumed=" + spaceConsumed
+          + ", spaceQuota=" + spaceQuota, e);
+    }
+  }
+
+  /** {@inheritDoc} */
+  public ContentSummary getContentSummary(Path f) throws IOException {
+    final String s = makeQualified(f).toUri().getPath();
+    final ContentSummary cs = new ContentSummaryParser().getContentSummary(s);
+    return cs != null? cs: super.getContentSummary(f);
+  }
 }
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index d0a672f..a740ee1 100644
--- a/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -198,9 +198,6 @@
   public LocatedBlock addBlock(String src, String clientName,
                                Block previous) throws IOException;
 
-  public LocatedBlock addBlock(String src, String clientName,
-      Block previous, DatanodeInfo[] excludedNode) throws IOException;
-
   /**
    * The client is done writing data to the given filename, and would 
    * like to complete it.  
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java b/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
index 1bd2427..3166f33 100644
--- a/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
+++ b/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
@@ -26,6 +26,7 @@
 
 import org.apache.hadoop.hdfs.security.BlockAccessToken;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 
 /**
@@ -39,12 +40,11 @@
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 17:
-   *    Change the block write protocol to support pipeline recovery.
-   *    Additional fields, like recovery flags, new GS, minBytesRcvd, 
-   *    and maxBytesRcvd are included.
+   * Version 19:
+   *    Change the block packet ack protocol to include seqno,
+   *    numberOfReplies, reply0, reply1, ...
    */
-  public static final int DATA_TRANSFER_VERSION = 17;
+  public static final int DATA_TRANSFER_VERSION = 19;
 
   /** Operation */
   public enum Op {
@@ -453,4 +453,98 @@
       return t; 
     }
   }
+  
+  /** reply **/
+  public static class PipelineAck implements Writable {
+    private long seqno;
+    private Status replies[];
+    public final static long UNKOWN_SEQNO = -2;
+
+    /** default constructor **/
+    public PipelineAck() {
+    }
+    
+    /**
+     * Constructor
+     * @param seqno sequence number
+     * @param replies an array of replies
+     */
+    public PipelineAck(long seqno, Status[] replies) {
+      this.seqno = seqno;
+      this.replies = replies;
+    }
+    
+    /**
+     * Get the sequence number
+     * @return the sequence number
+     */
+    public long getSeqno() {
+      return seqno;
+    }
+    
+    /**
+     * Get the number of replies
+     * @return the number of replies
+     */
+    public short getNumOfReplies() {
+      return (short)replies.length;
+    }
+    
+    /**
+     * get the ith reply
+     * @return the the ith reply
+     */
+    public Status getReply(int i) {
+      if (i<0 || i>=replies.length) {
+        throw new IllegalArgumentException("The input parameter " + i + 
+            " should in the range of [0, " + replies.length);
+      }
+      return replies[i];
+    }
+    
+    /**
+     * Check if this ack contains error status
+     * @return true if all statuses are SUCCESS
+     */
+    public boolean isSuccess() {
+      for (Status reply : replies) {
+        if (reply != Status.SUCCESS) {
+          return false;
+        }
+      }
+      return true;
+    }
+    
+    /**** Writable interface ****/
+    @Override // Writable
+    public void readFields(DataInput in) throws IOException {
+      seqno = in.readLong();
+      short numOfReplies = in.readShort();
+      replies = new Status[numOfReplies];
+      for (int i=0; i<numOfReplies; i++) {
+        replies[i] = Status.read(in);
+      }
+    }
+
+    @Override // Writable
+    public void write(DataOutput out) throws IOException {
+      //WritableUtils.writeVLong(out, seqno);
+      out.writeLong(seqno);
+      out.writeShort((short)replies.length);
+      for(Status reply : replies) {
+        reply.write(out);
+      }
+    }
+    
+    @Override //Object
+    public String toString() {
+      StringBuilder ack = new StringBuilder("Replies for seqno ");
+      ack.append( seqno ).append( " are" );
+      for(Status reply : replies) {
+        ack.append(" ");
+        ack.append(reply);
+      }
+      return ack.toString();
+    }
+  }
 }
diff --git a/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 65370eb..63d4939 100644
--- a/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -789,7 +789,6 @@
   
   /** Default constructor */
   Balancer() throws UnsupportedActionException {
-    checkReplicationPolicyCompatibility(getConf());
   }
   
   /** Construct a balancer from the given configuration */
diff --git a/src/java/org/apache/hadoop/hdfs/server/common/Util.java b/src/java/org/apache/hadoop/hdfs/server/common/Util.java
index 5e8de72..d754237 100644
--- a/src/java/org/apache/hadoop/hdfs/server/common/Util.java
+++ b/src/java/org/apache/hadoop/hdfs/server/common/Util.java
@@ -17,7 +17,19 @@
  */
 package org.apache.hadoop.hdfs.server.common;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 public final class Util {
+  private final static Log LOG = LogFactory.getLog(Util.class.getName());
+
   /**
    * Current system time.
    * @return current time in msec.
@@ -25,4 +37,58 @@
   public static long now() {
     return System.currentTimeMillis();
   }
-}
\ No newline at end of file
+  
+  /**
+   * Interprets the passed string as a URI. In case of error it 
+   * assumes the specified string is a file.
+   *
+   * @param s the string to interpret
+   * @return the resulting URI 
+   * @throws IOException 
+   */
+  public static URI stringAsURI(String s) throws IOException {
+    URI u = null;
+    // try to make a URI
+    try {
+      u = new URI(s);
+    } catch (URISyntaxException e){
+      LOG.warn("Path " + s + " should be specified as a URI " +
+      "in configuration files. Please update hdfs configuration.", e);
+    }
+
+    // if URI is null or scheme is undefined, then assume it's file://
+    if(u == null || u.getScheme() == null){
+      u = fileAsURI(new File(s));
+    }
+    return u;
+  }
+
+  /**
+   * Converts the passed File to a URI.
+   *
+   * @param f the file to convert
+   * @return the resulting URI 
+   * @throws IOException 
+   */
+  public static URI fileAsURI(File f) throws IOException {
+    return f.getCanonicalFile().toURI();
+  }
+
+  /**
+   * Converts a collection of strings into a collection of URIs.
+   * @param names collection of strings to convert to URIs
+   * @return collection of URIs
+   */
+  public static Collection<URI> stringCollectionAsURIs(
+                                  Collection<String> names) {
+    Collection<URI> uris = new ArrayList<URI>(names.size());
+    for(String name : names) {
+      try {
+        uris.add(stringAsURI(name));
+      } catch (IOException e) {
+        LOG.error("Error while processing URI: " + name, e);
+      }
+    }
+    return uris;
+  }
+}
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 5aab2b5..d65a598 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -35,11 +35,11 @@
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.FSOutputSummer;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
@@ -77,6 +77,7 @@
   private Checksum partialCrc = null;
   private final DataNode datanode;
   final private ReplicaInPipelineInterface replicaInfo;
+  volatile private boolean mirrorError;
 
   BlockReceiver(Block block, DataInputStream in, String inAddr,
                 String myAddr, BlockConstructionStage stage, 
@@ -217,21 +218,19 @@
 
   /**
    * While writing to mirrorOut, failure to write to mirror should not
-   * affect this datanode unless a client is writing the block.
+   * affect this datanode unless it is caused by interruption.
    */
   private void handleMirrorOutError(IOException ioe) throws IOException {
     LOG.info(datanode.dnRegistration + ":Exception writing block " +
              block + " to mirror " + mirrorAddr + "\n" +
              StringUtils.stringifyException(ioe));
-    mirrorOut = null;
-    //
-    // If stream-copy fails, continue
-    // writing to disk for replication requests. For client
-    // writes, return error so that the client can do error
-    // recovery.
-    //
-    if (clientName.length() > 0) {
+    if (Thread.interrupted()) { // shut down if the thread is interrupted
       throw ioe;
+    } else { // encounter an error while writing to mirror
+      // continue to run even if can not write to mirror
+      // notify client of the error
+      // and wait for the client to shut down the pipeline
+      mirrorError = true;
     }
   }
   
@@ -433,6 +432,14 @@
     return receivePacket(offsetInBlock, seqno, lastPacketInBlock, len, endOfHeader);
   }
 
+  /**
+   * Write the received packet to disk (data only)
+   */
+  private void writePacketToDisk(byte[] pktBuf, int startByteToDisk, 
+      int numBytesToDisk) throws IOException {
+    out.write(pktBuf, startByteToDisk, numBytesToDisk);
+  }
+  
   /** 
    * Receives and processes a packet. It can contain many chunks.
    * returns the number of data bytes that the packet has.
@@ -461,7 +468,7 @@
     }  
 
     //First write the packet to the mirror:
-    if (mirrorOut != null) {
+    if (mirrorOut != null && !mirrorError) {
       try {
         mirrorOut.write(buf.array(), buf.position(), buf.remaining());
         mirrorOut.flush();
@@ -469,7 +476,7 @@
         handleMirrorOutError(e);
       }
     }
-
+    
     buf.position(endOfHeader);        
     
     if (lastPacketInBlock || len == 0) {
@@ -525,7 +532,7 @@
 
           int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock);
           int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
-          out.write(pktBuf, startByteToDisk, numBytesToDisk);
+          writePacketToDisk(pktBuf, startByteToDisk, numBytesToDisk);
 
           // If this is a partial chunk, then verify that this is the only
           // chunk in the packet. Calculate new crc for this chunk.
@@ -560,7 +567,7 @@
       throttler.throttle(len);
     }
     
-    return len;
+    return lastPacketInBlock?-1:len;
   }
 
   void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
@@ -584,14 +591,15 @@
       if (clientName.length() > 0) {
         responder = new Daemon(datanode.threadGroup, 
                                new PacketResponder(this, block, mirrIn, 
-                                                   replyOut, numTargets));
+                                                   replyOut, numTargets,
+                                                   Thread.currentThread()));
         responder.start(); // start thread to processes reponses
       }
 
       /* 
-       * Receive until packet has zero bytes of data.
+       * Receive until the last packet.
        */
-      while (receivePacket() > 0) {}
+      while (receivePacket() >= 0) {}
 
       // wait for all outstanding packet responses. And then
       // indicate responder to gracefully shutdown.
@@ -729,13 +737,16 @@
     DataOutputStream replyOut;  // output to upstream datanode
     private int numTargets;     // number of downstream datanodes including myself
     private BlockReceiver receiver; // The owner of this responder.
+    private Thread receiverThread; // the thread that spawns this responder
 
     public String toString() {
       return "PacketResponder " + numTargets + " for Block " + this.block;
     }
 
     PacketResponder(BlockReceiver receiver, Block b, DataInputStream in, 
-                    DataOutputStream out, int numTargets) {
+                    DataOutputStream out, int numTargets,
+                    Thread receiverThread) {
+      this.receiverThread = receiverThread;
       this.receiver = receiver;
       this.block = b;
       mirrorIn = in;
@@ -775,145 +786,31 @@
       notifyAll();
     }
 
-    private synchronized void lastDataNodeRun() {
-      long lastHeartbeat = System.currentTimeMillis();
-      boolean lastPacket = false;
-      final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-
-      while (running && datanode.shouldRun && !lastPacket) {
-        long now = System.currentTimeMillis();
-        try {
-
-            // wait for a packet to be sent to downstream datanode
-            while (running && datanode.shouldRun && ackQueue.size() == 0) {
-              long idle = now - lastHeartbeat;
-              long timeout = (datanode.socketTimeout/2) - idle;
-              if (timeout <= 0) {
-                timeout = 1000;
-              }
-              try {
-                wait(timeout);
-              } catch (InterruptedException e) {
-                if (running) {
-                  LOG.info("PacketResponder " + numTargets +
-                           " for block " + block + " Interrupted.");
-                  running = false;
-                }
-                break;
-              }
-          
-              // send a heartbeat if it is time.
-              now = System.currentTimeMillis();
-              if (now - lastHeartbeat > datanode.socketTimeout/2) {
-                replyOut.writeLong(-1); // send heartbeat
-                replyOut.flush();
-                lastHeartbeat = now;
-              }
-            }
-
-            if (!running || !datanode.shouldRun) {
-              break;
-            }
-            Packet pkt = ackQueue.getFirst();
-            long expected = pkt.seqno;
-            LOG.debug("PacketResponder " + numTargets +
-                      " for block " + block + 
-                      " acking for packet " + expected);
-
-            // If this is the last packet in block, then close block
-            // file and finalize the block before responding success
-            if (pkt.lastPacketInBlock) {
-              receiver.close();
-              final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-              block.setNumBytes(replicaInfo.getNumBytes());
-              datanode.data.finalizeBlock(block);
-              datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
-              if (ClientTraceLog.isInfoEnabled() &&
-                  receiver.clientName.length() > 0) {
-                long offset = 0;
-                ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
-                    receiver.inAddr, receiver.myAddr, block.getNumBytes(),
-                    "HDFS_WRITE", receiver.clientName, offset,
-                    datanode.dnRegistration.getStorageID(), block, endTime-startTime));
-              } else {
-                LOG.info("Received block " + block + 
-                    " of size " + block.getNumBytes() + 
-                    " from " + receiver.inAddr);
-              }
-              lastPacket = true;
-            }
-
-            ackReply(expected);
-            replyOut.flush();
-            // remove the packet from the ack queue
-            removeAckHead();
-            // update the bytes acked
-            if (pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
-              replicaInfo.setBytesAcked(pkt.lastByteInBlock);
-            }
-        } catch (Exception e) {
-          LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
-          if (running) {
-            try {
-              datanode.checkDiskError(e); // may throw an exception here
-            } catch (IOException ioe) {
-              LOG.warn("DataNode.chekDiskError failed in lastDataNodeRun with: ",
-                  ioe);
-            }
-            LOG.info("PacketResponder " + block + " " + numTargets + 
-                     " Exception " + StringUtils.stringifyException(e));
-            running = false;
-          }
-        }
-      }
-      LOG.info("PacketResponder " + numTargets + 
-               " for block " + block + " terminating");
-    }
-
-    // This method is introduced to facilitate testing. Otherwise
-    // there was a little chance to bind an AspectJ advice to such a sequence
-    // of calls
-    private void ackReply(long expected) throws IOException {
-      replyOut.writeLong(expected);
-      SUCCESS.write(replyOut);
-    }
-
     /**
      * Thread to process incoming acks.
      * @see java.lang.Runnable#run()
      */
     public void run() {
-
-      // If this is the last datanode in pipeline, then handle differently
-      if (numTargets == 0) {
-        lastDataNodeRun();
-        return;
-      }
-
       boolean lastPacketInBlock = false;
       final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
       while (running && datanode.shouldRun && !lastPacketInBlock) {
 
         boolean isInterrupted = false;
         try {
-            DataTransferProtocol.Status op = SUCCESS;
-            boolean didRead = false;
             Packet pkt = null;
             long expected = -2;
-            try { 
-              // read seqno from downstream datanode
-              long seqno = mirrorIn.readLong();
-              didRead = true;
-              if (seqno == -1) {
-                replyOut.writeLong(-1); // send keepalive
-                replyOut.flush();
-                LOG.debug("PacketResponder " + numTargets + " got -1");
-                continue;
-              } else if (seqno == -2) {
-                LOG.debug("PacketResponder " + numTargets + " got -2");
-              } else {
-                LOG.debug("PacketResponder " + numTargets + " got seqno = " + 
-                    seqno);
+            PipelineAck ack = new PipelineAck();
+            long seqno = PipelineAck.UNKOWN_SEQNO;
+            try {
+              if (numTargets != 0 && !mirrorError) {// not the last DN & no mirror error
+                // read an ack from downstream datanode
+                ack.readFields(mirrorIn);
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("PacketResponder " + numTargets + " got " + ack);
+                }
+                seqno = ack.getSeqno();
+              }
+              if (seqno != PipelineAck.UNKOWN_SEQNO || numTargets == 0) {
                 synchronized (this) {
                   while (running && datanode.shouldRun && ackQueue.size() == 0) {
                     if (LOG.isDebugEnabled()) {
@@ -922,17 +819,14 @@
                                 " for block " + block +
                                 " waiting for local datanode to finish write.");
                     }
-                    try {
-                      wait();
-                    } catch (InterruptedException e) {
-                      isInterrupted = true;
-                      throw e;
-                    }
+                    wait();
+                  }
+                  if (!running || !datanode.shouldRun) {
+                    break;
                   }
                   pkt = ackQueue.getFirst();
                   expected = pkt.seqno;
-                  LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
-                  if (seqno != expected) {
+                  if (numTargets > 0 && seqno != expected) {
                     throw new IOException("PacketResponder " + numTargets +
                                           " for block " + block +
                                           " expected seqno:" + expected +
@@ -941,11 +835,18 @@
                   lastPacketInBlock = pkt.lastPacketInBlock;
                 }
               }
-            } catch (Throwable e) {
-              if (running) {
+            } catch (InterruptedException ine) {
+              isInterrupted = true;
+            } catch (IOException ioe) {
+              if (Thread.interrupted()) {
+                isInterrupted = true;
+              } else {
+                // continue to run even if can not read from mirror
+                // notify client of the error
+                // and wait for the client to shut down the pipeline
+                mirrorError = true;
                 LOG.info("PacketResponder " + block + " " + numTargets + 
-                         " Exception " + StringUtils.stringifyException(e));
-                running = false;
+                      " Exception " + StringUtils.stringifyException(ioe));
               }
             }
 
@@ -955,8 +856,7 @@
                * receiver thread (e.g. if it is ok to write to replyOut). 
                * It is prudent to not send any more status back to the client
                * because this datanode has a problem. The upstream datanode
-               * will detect a timout on heartbeats and will declare that
-               * this datanode is bad, and rightly so.
+               * will detect that this datanode is bad, and rightly so.
                */
               LOG.info("PacketResponder " + block +  " " + numTargets +
                        " : Thread is interrupted.");
@@ -964,10 +864,6 @@
               continue;
             }
             
-            if (!didRead) {
-              op = ERROR;
-            }
-            
             // If this is the last packet in block, then close block
             // file and finalize the block before responding success
             if (lastPacketInBlock) {
@@ -990,56 +886,39 @@
               }
             }
 
-            // send my status back to upstream datanode
-            ackReply(expected);
-
-            LOG.debug("PacketResponder " + numTargets + 
-                      " for block " + block +
-                      " responded my status " +
-                      " for seqno " + expected);
-
-            boolean success = true;
-            // forward responses from downstream datanodes.
-            for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
-              try {
-                if (op == SUCCESS) {
-                  op = Status.read(mirrorIn);
-                  if (op != SUCCESS) {
-                    success = false;
-                    LOG.debug("PacketResponder for block " + block +
-                              ": error code received from downstream " +
-                              " datanode[" + i + "] " + op);
-                  }
-                }
-              } catch (Throwable e) {
-                op = ERROR;
-                success = false;
+            // construct my ack message
+            Status[] replies = null;
+            if (mirrorError) { // ack read error
+              replies = new Status[2];
+              replies[0] = SUCCESS;
+              replies[1] = ERROR;
+            } else {
+              short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
+              replies = new Status[1+ackLen];
+              replies[0] = SUCCESS;
+              for (int i=0; i<ackLen; i++) {
+                replies[i+1] = ack.getReply(i);
               }
-              op.write(replyOut);
             }
-            replyOut.flush();
+            PipelineAck replyAck = new PipelineAck(expected, replies);
             
-            LOG.debug("PacketResponder " + block + " " + numTargets + 
-                      " responded other status " + " for seqno " + expected);
-
+            // send my ack back to upstream datanode
+            replyAck.write(replyOut);
+            replyOut.flush();
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("PacketResponder " + numTargets + 
+                        " for block " + block +
+                        " responded an ack: " + replyAck);
+            }
             if (pkt != null) {
               // remove the packet from the ack queue
               removeAckHead();
               // update bytes acked
-              if (success && pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
+              if (replyAck.isSuccess() && 
+                  pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
                 replicaInfo.setBytesAcked(pkt.lastByteInBlock);
               }
             }
-            // If we were unable to read the seqno from downstream, then stop.
-            if (expected == -2) {
-              running = false;
-            }
-            // If we forwarded an error response from a downstream datanode
-            // and we are acting on behalf of a client, then we quit. The 
-            // client will drive the recovery mechanism.
-            if (op == ERROR && receiver.clientName.length() > 0) {
-              running = false;
-            }
         } catch (IOException e) {
           LOG.warn("IOException in BlockReceiver.run(): ", e);
           if (running) {
@@ -1051,12 +930,16 @@
             LOG.info("PacketResponder " + block + " " + numTargets + 
                      " Exception " + StringUtils.stringifyException(e));
             running = false;
+            if (!Thread.interrupted()) { // failure not caused by interruption
+              receiverThread.interrupt();
+            }
           }
-        } catch (RuntimeException e) {
+        } catch (Throwable e) {
           if (running) {
             LOG.info("PacketResponder " + block + " " + numTargets + 
                      " Exception " + StringUtils.stringifyException(e));
             running = false;
+            receiverThread.interrupt();
           }
         }
       }
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
index 42ba15e..6383239 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
@@ -355,12 +355,14 @@
       return dfsUsage.getUsed();
     }
     
+    /**
+     * Calculate the capacity of the filesystem, after removing any
+     * reserved capacity.
+     * @return the unreserved number of bytes left in this filesystem. May be zero.
+     */
     long getCapacity() throws IOException {
-      if (reserved > usage.getCapacity()) {
-        return 0;
-      }
-
-      return usage.getCapacity()-reserved;
+      long remaining = usage.getCapacity() - reserved;
+      return remaining > 0 ? remaining : 0;
     }
       
     long getAvailable() throws IOException {
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java b/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
index d8cdcbe..9e3f618 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
@@ -37,7 +37,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.Daemon;
 
 /**
  * BackupNode.
@@ -66,8 +65,6 @@
   String nnHttpAddress;
   /** Checkpoint manager */
   Checkpointer checkpointManager;
-  /** Checkpoint daemon */
-  private Daemon cpDaemon;
 
   BackupNode(Configuration conf, NamenodeRole role) throws IOException {
     super(conf, role);
@@ -142,9 +139,17 @@
    */
   @Override // NameNode
   protected void innerClose() throws IOException {
-    if(checkpointManager != null) checkpointManager.shouldRun = false;
-    if(cpDaemon != null) cpDaemon.interrupt();
+    if(checkpointManager != null) {
+      // Prevent from starting a new checkpoint.
+      // Checkpoints that has already been started may proceed until 
+      // the error reporting to the name-node is complete.
+      // Checkpoint manager should not be interrupted yet because it will
+      // close storage file channels and the checkpoint may fail with 
+      // ClosedByInterruptException.
+      checkpointManager.shouldRun = false;
+    }
     if(namenode != null && getRegistration() != null) {
+      // Exclude this node from the list of backup streams on the name-node
       try {
         namenode.errorReport(getRegistration(), NamenodeProtocol.FATAL,
             "Shutting down.");
@@ -152,7 +157,15 @@
         LOG.error("Failed to report to name-node.", e);
       }
     }
-    RPC.stopProxy(namenode); // stop the RPC threads
+    // Stop the RPC client
+    RPC.stopProxy(namenode);
+    namenode = null;
+    // Stop the checkpoint manager
+    if(checkpointManager != null) {
+      checkpointManager.interrupt();
+      checkpointManager = null;
+    }
+    // Stop name-node threads
     super.innerClose();
   }
 
@@ -243,7 +256,7 @@
     this.nnHttpAddress = getHostPortString(super.getHttpServerAddress(conf));
     // get version and id info from the name-node
     NamespaceInfo nsInfo = null;
-    while(!stopRequested) {
+    while(!isStopRequested()) {
       try {
         nsInfo = handshake(namenode);
         break;
@@ -262,8 +275,7 @@
    */
   private void runCheckpointDaemon(Configuration conf) throws IOException {
     checkpointManager = new Checkpointer(conf, this);
-    cpDaemon = new Daemon(checkpointManager);
-    cpDaemon.start();
+    checkpointManager.start();
   }
 
   /**
@@ -300,7 +312,7 @@
 
     setRegistration();
     NamenodeRegistration nnReg = null;
-    while(!stopRequested) {
+    while(!isStopRequested()) {
       try {
         nnReg = namenode.register(getRegistration());
         break;
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
index 0bc4acf..43d9f5a 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
@@ -41,7 +41,6 @@
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
 import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.mortbay.log.Log;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
@@ -1614,6 +1613,7 @@
         NameNode.stateChangeLog.info("BLOCK* ask " + dn.getName()
             + " to delete " + blockList);
       }
+      pendingDeletionBlocksCount -= blocksToInvalidate.size();
       return blocksToInvalidate.size();
     }
   }
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
index c723196..f6cb181 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
@@ -21,7 +21,6 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.ReflectionUtils;
 import java.util.*;
 
@@ -61,26 +60,6 @@
    * choose <i>numOfReplicas</i> data nodes for <i>writer</i> 
    * to re-replicate a block with size <i>blocksize</i> 
    * If not, return as many as we can.
-   * 
-   * @param srcPath the file to which this chooseTargets is being invoked.
-   * @param numOfReplicas additional number of replicas wanted.
-   * @param writer the writer's machine, null if not in the cluster.
-   * @param chosenNodes datanodes that have been chosen as targets.
-   * @param excludedNodes: datanodes that should not be considered as targets.
-   * @param blocksize size of the data to be written.
-   * @return array of DatanodeDescriptor instances chosen as target
-   * and sorted as a pipeline.
-   */
-  abstract DatanodeDescriptor[] chooseTarget(String srcPath,
-                                             int numOfReplicas,
-                                             DatanodeDescriptor writer,
-                                             List<DatanodeDescriptor> chosenNodes,
-                                             HashMap<Node, Node> excludedNodes,
-                                             long blocksize);
-
-  /**
-   * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
-   * If not, return as many as we can.
    * The base implemenatation extracts the pathname of the file from the
    * specified srcInode, but this could be a costly operation depending on the
    * file system implementation. Concrete implementations of this class should
@@ -188,29 +167,4 @@
                         new ArrayList<DatanodeDescriptor>(),
                         blocksize);
   }
-
-  /**
-   * choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
-   * a block with size <i>blocksize</i>
-   * If not, return as many as we can.
-   * 
-   * @param srcPath a string representation of the file for which chooseTarget is invoked
-   * @param numOfReplicas number of replicas wanted.
-   * @param writer the writer's machine, null if not in the cluster.
-   * @param blocksize size of the data to be written.
-   * @param excludedNodes: datanodes that should not be considered as targets.
-   * @return array of DatanodeDescriptor instances chosen as targets
-   * and sorted as a pipeline.
-   */
-  DatanodeDescriptor[] chooseTarget(String srcPath,
-                                    int numOfReplicas,
-                                    DatanodeDescriptor writer,
-                                    HashMap<Node, Node> excludedNodes,
-                                    long blocksize) {
-    return chooseTarget(srcPath, numOfReplicas, writer,
-                        new ArrayList<DatanodeDescriptor>(),
-                        excludedNodes,
-                        blocksize);
-  }
-
 }
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
index 96dca00..b01dcac 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
@@ -68,17 +68,6 @@
   }
 
   /** {@inheritDoc} */
-  public DatanodeDescriptor[] chooseTarget(String srcPath,
-                                    int numOfReplicas,
-                                    DatanodeDescriptor writer,
-                                    List<DatanodeDescriptor> chosenNodes,
-                                    HashMap<Node, Node> excludedNodes,
-                                    long blocksize) {
-    return chooseTarget(numOfReplicas, writer, chosenNodes, excludedNodes, blocksize);
-  }
-
-
-  /** {@inheritDoc} */
   @Override
   public DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
                                     int numOfReplicas,
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java b/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
index a7d5d56..c1c7f66 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.util.Daemon;
 
 /**
  * The Checkpointer is responsible for supporting periodic checkpoints 
@@ -49,7 +50,7 @@
  * The start of a checkpoint is triggered by one of the two factors:
  * (1) time or (2) the size of the edits file.
  */
-class Checkpointer implements Runnable {
+class Checkpointer extends Daemon {
   public static final Log LOG = 
     LogFactory.getLog(Checkpointer.class.getName());
 
@@ -95,7 +96,7 @@
     HttpServer httpServer = backupNode.httpServer;
     httpServer.setAttribute("name.system.image", getFSImage());
     httpServer.setAttribute("name.conf", conf);
-    httpServer.addServlet("getimage", "/getimage", GetImageServlet.class);
+    httpServer.addInternalServlet("getimage", "/getimage", GetImageServlet.class);
 
     LOG.info("Checkpoint Period : " + checkpointPeriod + " secs " +
              "(" + checkpointPeriod/60 + " min)");
@@ -144,7 +145,8 @@
         LOG.error("Exception in doCheckpoint: ", e);
       } catch(Throwable e) {
         LOG.error("Throwable Exception in doCheckpoint: ", e);
-        Runtime.getRuntime().exit(-1);
+        shutdown();
+        break;
       }
       try {
         Thread.sleep(periodMSec);
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java b/src/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java
new file mode 100644
index 0000000..f534625
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryServlet.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.znerd.xmlenc.XMLOutputter;
+
+/** Servlets for file checksum */
+public class ContentSummaryServlet extends DfsServlet {
+  /** For java.io.Serializable */
+  private static final long serialVersionUID = 1L;
+  
+  /** {@inheritDoc} */
+  public void doGet(HttpServletRequest request, HttpServletResponse response
+      ) throws ServletException, IOException {
+    final UnixUserGroupInformation ugi = getUGI(request);
+    final String path = request.getPathInfo();
+
+    final PrintWriter out = response.getWriter();
+    final XMLOutputter xml = new XMLOutputter(out, "UTF-8");
+    xml.declaration();
+    try {
+      //get content summary
+      final ClientProtocol nnproxy = createNameNodeProxy(ugi);
+      final ContentSummary cs = nnproxy.getContentSummary(path);
+
+      //write xml
+      xml.startTag(ContentSummary.class.getName());
+      if (cs != null) {
+        xml.attribute("length"        , "" + cs.getLength());
+        xml.attribute("fileCount"     , "" + cs.getFileCount());
+        xml.attribute("directoryCount", "" + cs.getDirectoryCount());
+        xml.attribute("quota"         , "" + cs.getQuota());
+        xml.attribute("spaceConsumed" , "" + cs.getSpaceConsumed());
+        xml.attribute("spaceQuota"    , "" + cs.getSpaceQuota());
+      }
+      xml.endTag();
+    } catch(IOException ioe) {
+      new RemoteException(ioe.getClass().getName(), ioe.getMessage()
+          ).writeXml(path, xml);
+    }
+    xml.endDocument();
+  }
+}
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java b/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
index 1e0db5c..feb5ca2 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
@@ -22,7 +22,6 @@
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index cfab437..ce4f9ed 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -24,7 +24,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -216,7 +215,7 @@
   /**
    * Shutdown the file store.
    */
-  public synchronized void close() {
+  synchronized void close() {
     while (isSyncRunning) {
       try {
         wait(1000);
@@ -275,12 +274,6 @@
 
     String lsd = fsimage.listStorageDirectories();
     FSNamesystem.LOG.info("current list of storage dirs:" + lsd);
-    //EditLogOutputStream
-    if (editStreams == null || editStreams.size() <= 1) {
-      FSNamesystem.LOG.fatal(
-      "Fatal Error : All storage directories are inaccessible."); 
-      Runtime.getRuntime().exit(-1);
-    }
 
     ArrayList<StorageDirectory> al = null;
     for (EditLogOutputStream eStream : errorStreams) {
@@ -311,6 +304,12 @@
       } 
     }
     
+    if (editStreams == null || editStreams.size() <= 0) {
+      String msg = "Fatal Error: All storage directories are inaccessible.";
+      FSNamesystem.LOG.fatal(msg, new IOException(msg)); 
+      Runtime.getRuntime().exit(-1);
+    }
+
     // removed failed SDs
     if(propagate && al != null) fsimage.processIOError(al, false);
     
@@ -867,6 +866,7 @@
         try {
           eStream.flush();
         } catch (IOException ie) {
+          FSNamesystem.LOG.error("Unable to sync edit log.", ie);
           //
           // remember the streams that encountered an error.
           //
@@ -874,8 +874,6 @@
             errorStreams = new ArrayList<EditLogOutputStream>(1);
           }
           errorStreams.add(eStream);
-          FSNamesystem.LOG.error("Unable to sync edit log. " +
-                                 "Fatal Error.");
         }
       }
       long elapsed = FSNamesystem.now() - start;
@@ -1165,6 +1163,7 @@
         // replace by the new stream
         itE.replace(eStream);
       } catch (IOException e) {
+        FSNamesystem.LOG.warn("Error in editStream " + eStream.getName(), e);
         if(errorStreams == null)
           errorStreams = new ArrayList<EditLogOutputStream>(1);
         errorStreams.add(eStream);
@@ -1225,6 +1224,7 @@
         // replace by the new stream
         itE.replace(eStream);
       } catch (IOException e) {
+        FSNamesystem.LOG.warn("Error in editStream " + eStream.getName(), e);
         if(errorStreams == null)
           errorStreams = new ArrayList<EditLogOutputStream>(1);
         errorStreams.add(eStream);
@@ -1390,6 +1390,7 @@
       try {
         eStream.write(data, 0, length);
       } catch (IOException ie) {
+        FSNamesystem.LOG.warn("Error in editStream " + eStream.getName(), ie);
         if(errorStreams == null)
           errorStreams = new ArrayList<EditLogOutputStream>(1);
         errorStreams.add(eStream);
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index 9f34651..754dfe6 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -56,6 +56,7 @@
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.UpgradeManager;
+import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
@@ -305,11 +306,10 @@
     for ( ;it.hasNext(); ) {
       StorageDirectory sd = it.next();
       try {
-        list.add(new URI("file://" + sd.getRoot().getAbsolutePath()));
-      } catch (Exception e) {
+        list.add(Util.fileAsURI(sd.getRoot()));
+      } catch (IOException e) {
         throw new IOException("Exception while processing " +
-            "StorageDirectory " + sd.getRoot().getAbsolutePath() + ". The"
-            + " full error message is " + e.getMessage());
+            "StorageDirectory " + sd.getRoot(), e);
       }
     }
     return list;
@@ -1708,7 +1708,7 @@
     ckptState = CheckpointStates.UPLOAD_DONE;
   }
 
-  void close() throws IOException {
+  synchronized void close() throws IOException {
     getEditLog().close();
     unlockAll();
   }
@@ -1907,25 +1907,9 @@
     if (dirNames.size() == 0 && defaultValue != null) {
       dirNames.add(defaultValue);
     }
-    Collection<URI> dirs = new ArrayList<URI>(dirNames.size());
-    for(String name : dirNames) {
-      try {
-        // process value as URI 
-        URI u = new URI(name);
-        // if scheme is undefined, then assume it's file://
-        if(u.getScheme() == null)
-          u = new URI("file://" + new File(name).getAbsolutePath());
-        // check that scheme is not null (trivial) and supported
-        checkSchemeConsistency(u);
-        dirs.add(u);
-      } catch (Exception e) {
-        LOG.error("Error while processing URI: " + name + 
-            ". The error message was: " + e.getMessage());
-      }
-    }
-    return dirs;
+    return Util.stringCollectionAsURIs(dirNames);
   }
-  
+
   static Collection<URI> getCheckpointEditsDirs(Configuration conf,
       String defaultName) {
     Collection<String> dirNames = 
@@ -1933,23 +1917,7 @@
     if (dirNames.size() == 0 && defaultName != null) {
       dirNames.add(defaultName);
     }
-    Collection<URI> dirs = new ArrayList<URI>(dirNames.size());
-    for(String name : dirNames) {
-      try {
-        // process value as URI 
-        URI u = new URI(name);
-        // if scheme is undefined, then assume it's file://
-        if(u.getScheme() == null)
-          u = new URI("file://" + new File(name).getAbsolutePath());
-        // check that scheme is not null (trivial) and supported
-        checkSchemeConsistency(u);
-        dirs.add(u);
-      } catch (Exception e) {
-        LOG.error("Error while processing URI: " + name + 
-            ". The error message was: " + e.getMessage());
-      }
-    }
-    return dirs;    
+    return Util.stringCollectionAsURIs(dirNames);
   }
 
   static private final DeprecatedUTF8 U_STR = new DeprecatedUTF8();
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index e1c7443..7589682 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMetrics;
 import org.apache.hadoop.security.AccessControlException;
@@ -40,7 +41,6 @@
 import org.apache.hadoop.net.CachedDNSToSwitchMapping;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
@@ -339,26 +339,8 @@
           "\n\t\t- use Backup Node as a persistent and up-to-date storage " +
           "of the file system meta-data.");
     } else if (dirNames.isEmpty())
-      dirNames.add("/tmp/hadoop/dfs/name");
-    Collection<URI> dirs = new ArrayList<URI>(dirNames.size());
-    for(String name : dirNames) {
-      try {
-        URI u = new URI(name);
-        // If the scheme was not declared, default to file://
-        // and use the absolute path of the file, then warn the user 
-        if(u.getScheme() == null) {
-          u = new URI("file://" + new File(name).getAbsolutePath());
-          LOG.warn("Scheme is undefined for " + name);
-          LOG.warn("Please check your file system configuration in " +
-          		"hdfs-site.xml");
-        }
-        dirs.add(u);
-      } catch (Exception e) {
-        LOG.error("Error while processing URI: " + name + 
-            ". The error message was: " + e.getMessage());
-      }
-    }
-    return dirs;
+      dirNames.add("file:///tmp/hadoop/dfs/name");
+    return Util.stringCollectionAsURIs(dirNames);
   }
 
   public static Collection<URI> getNamespaceEditsDirs(Configuration conf) {
@@ -688,6 +670,8 @@
    */
   public synchronized void setPermission(String src, FsPermission permission
       ) throws IOException {
+    if (isInSafeMode())
+      throw new SafeModeException("Cannot set permission for " + src, safeMode);
     checkOwner(src);
     dir.setPermission(src, permission);
     getEditLog().logSync();
@@ -705,6 +689,8 @@
    */
   public synchronized void setOwner(String src, String username, String group
       ) throws IOException {
+    if (isInSafeMode())
+        throw new SafeModeException("Cannot set owner for " + src, safeMode);
     FSPermissionChecker pc = checkOwner(src);
     if (!pc.isSuper) {
       if (username != null && !pc.user.equals(username)) {
@@ -1361,18 +1347,10 @@
    * are replicated.  Will return an empty 2-elt array if we want the
    * client to "try again later".
    */
-  public LocatedBlock getAdditionalBlock(String src,
+  public LocatedBlock getAdditionalBlock(String src, 
                                          String clientName,
                                          Block previous
                                          ) throws IOException {
-    return getAdditionalBlock(src, clientName, previous, null);
-  }
-
-  public LocatedBlock getAdditionalBlock(String src,
-                                         String clientName,
-                                         Block previous,
-                                         HashMap<Node, Node> excludedNodes
-                                         ) throws IOException {
     long fileLength, blockSize;
     int replication;
     DatanodeDescriptor clientNode = null;
@@ -1408,7 +1386,7 @@
 
     // choose targets for the new block to be allocated.
     DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget(
-        src, replication, clientNode, excludedNodes, blockSize);
+        src, replication, clientNode, blockSize);
     if (targets.length < blockManager.minReplication) {
       throw new IOException("File " + src + " could only be replicated to " +
                             targets.length + " nodes, instead of " +
@@ -1909,10 +1887,11 @@
    * contract.
    */
   void setQuota(String path, long nsQuota, long dsQuota) throws IOException {
+    if (isInSafeMode())
+      throw new SafeModeException("Cannot set quota on " + path, safeMode);
     if (isPermissionEnabled) {
       checkSuperuserPrivilege();
     }
-    
     dir.setQuota(path, nsQuota, dsQuota);
     getEditLog().logSync();
   }
@@ -2011,8 +1990,17 @@
     BlockInfoUnderConstruction lastBlock = pendingFile.getLastBlock();
     BlockUCState lastBlockState = lastBlock.getBlockUCState();
     BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
-    BlockUCState penultimateBlockState = (penultimateBlock == null ?
-        BlockUCState.COMPLETE : penultimateBlock.getBlockUCState());
+    boolean penultimateBlockMinReplication;
+    BlockUCState penultimateBlockState;
+    if (penultimateBlock == null) {
+      penultimateBlockState = BlockUCState.COMPLETE;
+      // If penultimate block doesn't exist then its minReplication is met
+      penultimateBlockMinReplication = true;
+    } else {
+      penultimateBlockState = BlockUCState.COMMITTED;
+      penultimateBlockMinReplication = 
+        blockManager.checkMinReplication(penultimateBlock);
+    }
     assert penultimateBlockState == BlockUCState.COMPLETE ||
            penultimateBlockState == BlockUCState.COMMITTED :
            "Unexpected state of penultimate block in " + src;
@@ -2023,7 +2011,7 @@
       break;
     case COMMITTED:
       // Close file if committed blocks are minimally replicated
-      if(blockManager.checkMinReplication(penultimateBlock) &&
+      if(penultimateBlockMinReplication &&
           blockManager.checkMinReplication(lastBlock)) {
         finalizeINodeFileUnderConstruction(src, pendingFile);
         NameNode.stateChangeLog.warn("BLOCK*"
@@ -3860,14 +3848,12 @@
     getFSImage().rollFSImage();
   }
 
-  NamenodeCommand startCheckpoint(NamenodeRegistration bnReg, // backup node
-                                  NamenodeRegistration nnReg) // active name-node
+  synchronized NamenodeCommand startCheckpoint(
+                                NamenodeRegistration bnReg, // backup node
+                                NamenodeRegistration nnReg) // active name-node
   throws IOException {
-    NamenodeCommand cmd;
-    synchronized(this) {
-      cmd = getFSImage().startCheckpoint(bnReg, nnReg);
-    }
     LOG.info("Start checkpoint for " + bnReg.getAddress());
+    NamenodeCommand cmd = getFSImage().startCheckpoint(bnReg, nnReg);
     getEditLog().logSync();
     return cmd;
   }
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 3a38c02..6a9eaa0 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -22,7 +22,6 @@
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
@@ -50,6 +49,7 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
+import org.apache.hadoop.security.RefreshUserToGroupMappingsProtocol;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
@@ -74,7 +74,6 @@
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -124,7 +123,8 @@
  **********************************************************/
 public class NameNode extends Service implements ClientProtocol, DatanodeProtocol,
                                  NamenodeProtocol, FSConstants,
-                                 RefreshAuthorizationPolicyProtocol {
+                                 RefreshAuthorizationPolicyProtocol,
+                                 RefreshUserToGroupMappingsProtocol {
   static{
     Configuration.addDefaultResource("hdfs-default.xml");
     Configuration.addDefaultResource("hdfs-site.xml");
@@ -140,6 +140,8 @@
       return NamenodeProtocol.versionID;
     } else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())){
       return RefreshAuthorizationPolicyProtocol.versionID;
+    } else if (protocol.equals(RefreshUserToGroupMappingsProtocol.class.getName())){
+      return RefreshUserToGroupMappingsProtocol.versionID;
     } else {
       throw new IOException("Unknown protocol to name node: " + protocol);
     }
@@ -367,6 +369,8 @@
     this.httpServer.addInternalServlet("data", "/data/*", FileDataServlet.class);
     this.httpServer.addInternalServlet("checksum", "/fileChecksum/*",
         FileChecksumServlets.RedirectServlet.class);
+    this.httpServer.addInternalServlet("contentSummary", "/contentSummary/*",
+        ContentSummaryServlet.class);
     this.httpServer.start();
 
     // The web-server port can be ephemeral... ensure we have the correct info
@@ -514,9 +518,11 @@
   protected synchronized void innerClose() throws IOException {
     LOG.info("Closing " + getServiceName());
 
-    if (stopRequested)
-      return;
-    stopRequested = true;
+    synchronized(this) {
+      if (stopRequested)
+        return;
+      stopRequested = true;
+    }
     if (plugins != null) {
       for (ServicePlugin p : plugins) {
         try {
@@ -550,7 +556,11 @@
       namesystem = null;
     }
   }
-  
+
+  synchronized boolean isStopRequested() {
+    return stopRequested;
+  }
+
   /////////////////////////////////////////////////////
   // NamenodeProtocol
   /////////////////////////////////////////////////////
@@ -706,30 +716,14 @@
     namesystem.setOwner(src, username, groupname);
   }
 
-
-  @Override
+  /**
+   */
   public LocatedBlock addBlock(String src, String clientName,
                                Block previous) throws IOException {
-    return addBlock(src, clientName, previous, null);
-  }
-
-  @Override
-  public LocatedBlock addBlock(String src,
-                               String clientName,
-                               Block previous,
-                               DatanodeInfo[] excludedNodes
-                               ) throws IOException {
     stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
                          +src+" for "+clientName);
-    HashMap<Node, Node> excludedNodesSet = null;
-    if (excludedNodes != null) {
-      excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length);
-      for (Node node:excludedNodes) {
-        excludedNodesSet.put(node, node);
-      }
-    }
     LocatedBlock locatedBlock = 
-      namesystem.getAdditionalBlock(src, clientName, previous, excludedNodesSet);
+      namesystem.getAdditionalBlock(src, clientName, previous);
     if (locatedBlock != null)
       myMetrics.numAddBlockOps.inc();
     return locatedBlock;
@@ -1246,6 +1240,13 @@
     SecurityUtil.getPolicy().refresh();
   }
 
+  @Override
+  public void refreshUserToGroupsMappings(Configuration conf) throws IOException {
+    LOG.info("Refreshing all user-to-groups mappings. Requested by user: " + 
+             UserGroupInformation.getCurrentUGI().getUserName());
+    SecurityUtil.getUserToGroupsMappingService(conf).refresh();
+  }
+
   private static void printUsage() {
     System.err.println(
       "Usage: java NameNode [" +
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java b/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
index 2ced987..b13a728 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
 import java.net.InetAddress;
 import java.net.URLEncoder;
 import java.util.ArrayList;
@@ -53,8 +56,11 @@
     long inodes = fsn.dir.totalInodes();
     long blocks = fsn.getBlocksTotal();
     long maxobjects = fsn.getMaxObjects();
-    long totalMemory = Runtime.getRuntime().totalMemory();
-    long maxMemory = Runtime.getRuntime().maxMemory();
+
+    MemoryMXBean mem = ManagementFactory.getMemoryMXBean();
+    MemoryUsage heap = mem.getHeapMemoryUsage();
+    long totalMemory = heap.getUsed();
+    long maxMemory = heap.getMax();
 
     long used = (totalMemory * 100) / maxMemory;
 
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java b/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
index 32b8cbe..1e7c741 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
@@ -589,6 +589,7 @@
         sdEdits = it.next();
       if ((sdName == null) || (sdEdits == null))
         throw new IOException("Could not locate checkpoint directories");
+      this.layoutVersion = -1; // to avoid assert in loadFSImage()
       loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));
       loadFSEdits(sdEdits);
       sig.validateStorageInfo(this);
diff --git a/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index 00f0455..19d2edb 100644
--- a/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.RefreshUserToGroupMappingsProtocol;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.util.StringUtils;
@@ -473,6 +474,7 @@
       "\t[" + SetSpaceQuotaCommand.USAGE + "]\n" +
       "\t[" + ClearSpaceQuotaCommand.USAGE +"]\n" +
       "\t[-refreshServiceAcl]\n" +
+      "\t[-refreshUserToGroupsMappings]\n" +
       "\t[-printTopology]\n" +
       "\t[-help [cmd]]\n";
 
@@ -527,6 +529,9 @@
     String refreshServiceAcl = "-refreshServiceAcl: Reload the service-level authorization policy file\n" +
       "\t\tNamenode will reload the authorization policy file.\n";
     
+    String refreshUserToGroupsMappings = 
+      "-refreshUserToGroupsMappings: Refresh user-to-groups mappings\n";
+    
     String printTopology = "-printTopology: Print a tree of the racks and their\n" +
                            "\t\tnodes as reported by the Namenode\n";
     
@@ -559,6 +564,8 @@
       System.out.println(ClearSpaceQuotaCommand.DESCRIPTION);
     } else if ("refreshServiceAcl".equals(cmd)) {
       System.out.println(refreshServiceAcl);
+    } else if ("refreshUserToGroupsMappings".equals(cmd)) {
+      System.out.println(refreshUserToGroupsMappings);
     } else if ("printTopology".equals(cmd)) {
       System.out.println(printTopology);
     } else if ("help".equals(cmd)) {
@@ -746,6 +753,30 @@
   }
   
   /**
+   * Refresh the user-to-groups mappings on the {@link NameNode}.
+   * @return exitcode 0 on success, non-zero on failure
+   * @throws IOException
+   */
+  public int refreshUserToGroupsMappings() throws IOException {
+    // Get the current configuration
+    Configuration conf = getConf();
+    
+    // Create the client
+    RefreshUserToGroupMappingsProtocol refreshProtocol = 
+      (RefreshUserToGroupMappingsProtocol) 
+      RPC.getProxy(RefreshUserToGroupMappingsProtocol.class, 
+                   RefreshUserToGroupMappingsProtocol.versionID, 
+                   NameNode.getAddress(conf), getUGI(conf), conf,
+                   NetUtils.getSocketFactory(conf, 
+                                             RefreshUserToGroupMappingsProtocol.class));
+    
+    // Refresh the user-to-groups mappings
+    refreshProtocol.refreshUserToGroupsMappings(conf);
+    
+    return 0;
+  }
+  
+  /**
    * Displays format of commands.
    * @param cmd The command that is being executed.
    */
@@ -789,6 +820,9 @@
     } else if ("-refreshServiceAcl".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " [-refreshServiceAcl]");
+    } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+                         + " [-refreshUserToGroupsMappings]");
     } else if ("-printTopology".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " [-printTopology]");
@@ -803,6 +837,7 @@
       System.err.println("           [-upgradeProgress status | details | force]");
       System.err.println("           [-metasave filename]");
       System.err.println("           [-refreshServiceAcl]");
+      System.err.println("           [-refreshUserToGroupsMappings]");
       System.err.println("           [-printTopology]");
       System.err.println("           ["+SetQuotaCommand.USAGE+"]");
       System.err.println("           ["+ClearQuotaCommand.USAGE+"]");
@@ -879,11 +914,15 @@
         printUsage(cmd);
         return exitCode;
       }
-      else if ("-printTopology".equals(cmd)) {
-        if(argv.length != 1) {
-          printUsage(cmd);
-          return exitCode;
-        }
+    } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
+      if (argv.length != 1) {
+        printUsage(cmd);
+        return exitCode;
+      }
+    } else if ("-printTopology".equals(cmd)) {
+      if(argv.length != 1) {
+        printUsage(cmd);
+        return exitCode;
       }
     }
     
@@ -927,6 +966,8 @@
         exitCode = new SetSpaceQuotaCommand(argv, i, fs).runAll();
       } else if ("-refreshServiceAcl".equals(cmd)) {
         exitCode = refreshServiceAcl();
+      } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
+        exitCode = refreshUserToGroupsMappings();
       } else if ("-printTopology".equals(cmd)) {
         exitCode = printTopology();
       } else if ("-help".equals(cmd)) {
diff --git a/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java b/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
index 5be5d8d..898cabf 100644
--- a/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
+++ b/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
@@ -59,30 +59,36 @@
     private volatile boolean isSuccess = false;
 
     /** Simulate action for the receiverOpWriteBlock pointcut */
-    public final ActionContainer<DatanodeID> fiReceiverOpWriteBlock
-        = new ActionContainer<DatanodeID>();
+    public final ActionContainer<DatanodeID, IOException> fiReceiverOpWriteBlock
+        = new ActionContainer<DatanodeID, IOException>();
     /** Simulate action for the callReceivePacket pointcut */
-    public final ActionContainer<DatanodeID> fiCallReceivePacket
-        = new ActionContainer<DatanodeID>();
+    public final ActionContainer<DatanodeID, IOException> fiCallReceivePacket
+        = new ActionContainer<DatanodeID, IOException>();
+    /** Simulate action for the callWritePacketToDisk pointcut */
+    public final ActionContainer<DatanodeID, IOException> fiCallWritePacketToDisk
+        = new ActionContainer<DatanodeID, IOException>();
     /** Simulate action for the statusRead pointcut */
-    public final ActionContainer<DatanodeID> fiStatusRead
-        = new ActionContainer<DatanodeID>();
+    public final ActionContainer<DatanodeID, IOException> fiStatusRead
+        = new ActionContainer<DatanodeID, IOException>();
+    /** Simulate action for the afterDownstreamStatusRead pointcut */
+    public final ActionContainer<DatanodeID, IOException> fiAfterDownstreamStatusRead
+        = new ActionContainer<DatanodeID, IOException>();
     /** Simulate action for the pipelineAck pointcut */
-    public final ActionContainer<DatanodeID> fiPipelineAck
-        = new ActionContainer<DatanodeID>();
+    public final ActionContainer<DatanodeID, IOException> fiPipelineAck
+        = new ActionContainer<DatanodeID, IOException>();
     /** Simulate action for the pipelineClose pointcut */
-    public final ActionContainer<DatanodeID> fiPipelineClose
-        = new ActionContainer<DatanodeID>();
+    public final ActionContainer<DatanodeID, IOException> fiPipelineClose
+        = new ActionContainer<DatanodeID, IOException>();
     /** Simulate action for the blockFileClose pointcut */
-    public final ActionContainer<DatanodeID> fiBlockFileClose
-        = new ActionContainer<DatanodeID>();
+    public final ActionContainer<DatanodeID, IOException> fiBlockFileClose
+        = new ActionContainer<DatanodeID, IOException>();
 
     /** Verification action for the pipelineInitNonAppend pointcut */
-    public final ActionContainer<Integer> fiPipelineInitErrorNonAppend
-        = new ActionContainer<Integer>();
+    public final ActionContainer<Integer, RuntimeException> fiPipelineInitErrorNonAppend
+        = new ActionContainer<Integer, RuntimeException>();
     /** Verification action for the pipelineErrorAfterInit pointcut */
-    public final ActionContainer<Integer> fiPipelineErrorAfterInit
-        = new ActionContainer<Integer>();
+    public final ActionContainer<Integer, RuntimeException> fiPipelineErrorAfterInit
+        = new ActionContainer<Integer, RuntimeException>();
 
     /** Get test status */
     public boolean isSuccess() {
@@ -121,7 +127,8 @@
   }
 
   /** Action for DataNode */
-  public static abstract class DataNodeAction implements Action<DatanodeID> {
+  public static abstract class DataNodeAction implements
+      Action<DatanodeID, IOException> {
     /** The name of the test */
     final String currentTest;
     /** The index of the datanode */
@@ -195,6 +202,28 @@
     }
   }
 
+  /** Throws OutOfMemoryError if the count is zero. */
+  public static class CountdownOomAction extends OomAction {
+    private final CountdownConstraint countdown;
+
+    /** Create an action for datanode i in the pipeline with count down. */
+    public CountdownOomAction(String currentTest, int i, int count) {
+      super(currentTest, i);
+      countdown = new CountdownConstraint(count);
+    }
+
+    @Override
+    public void run(DatanodeID id) {
+      final DataTransferTest test = getDataTransferTest();
+      final Pipeline p = test.getPipeline(id);
+      if (p.contains(index, id) && countdown.isSatisfied()) {
+        final String s = toString(id);
+        FiTestUtil.LOG.info(s);
+        throw new OutOfMemoryError(s);
+      }
+    }
+  }
+
   /** Throws DiskOutOfSpaceException. */
   public static class DoosAction extends DataNodeAction {
     /** Create an action for datanode i in the pipeline. */
@@ -242,6 +271,28 @@
     }
   }
 
+  /** Throws DiskOutOfSpaceException if the count is zero. */
+  public static class CountdownDoosAction extends DoosAction {
+    private final CountdownConstraint countdown;
+
+    /** Create an action for datanode i in the pipeline with count down. */
+    public CountdownDoosAction(String currentTest, int i, int count) {
+      super(currentTest, i);
+      countdown = new CountdownConstraint(count);
+    }
+
+    @Override
+    public void run(DatanodeID id) throws DiskOutOfSpaceException {
+      final DataTransferTest test = getDataTransferTest();
+      final Pipeline p = test.getPipeline(id);
+      if (p.contains(index, id) && countdown.isSatisfied()) {
+        final String s = toString(id);
+        FiTestUtil.LOG.info(s);
+        throw new DiskOutOfSpaceException(s);
+      }
+    }
+  }
+
   /**
    * Sleep some period of time so that it slows down the datanode
    * or sleep forever so that datanode becomes not responding.
@@ -307,8 +358,50 @@
     }
   }
 
+  /**
+   * When the count is zero,
+   * sleep some period of time so that it slows down the datanode
+   * or sleep forever so that datanode becomes not responding.
+   */
+  public static class CountdownSleepAction extends SleepAction {
+    private final CountdownConstraint countdown;
+
+    /**
+     * Create an action for datanode i in the pipeline.
+     * @param duration In milliseconds, duration <= 0 means sleeping forever.
+     */
+    public CountdownSleepAction(String currentTest, int i,
+        long duration, int count) {
+      this(currentTest, i, duration, duration+1, count);
+    }
+
+    /** Create an action for datanode i in the pipeline with count down. */
+    public CountdownSleepAction(String currentTest, int i,
+        long minDuration, long maxDuration, int count) {
+      super(currentTest, i, minDuration, maxDuration);
+      countdown = new CountdownConstraint(count);
+    }
+
+    @Override
+    public void run(DatanodeID id) {
+      final DataTransferTest test = getDataTransferTest();
+      final Pipeline p = test.getPipeline(id);
+      if (p.contains(index, id) && countdown.isSatisfied()) {
+        final String s = toString(id) + ", duration = ["
+        + minDuration + "," + maxDuration + ")";
+        FiTestUtil.LOG.info(s);
+        if (maxDuration <= 1) {
+          for(; true; FiTestUtil.sleep(1000)); //sleep forever
+        } else {
+          FiTestUtil.sleep(minDuration, maxDuration);
+        }
+      }
+    }
+  }
+
   /** Action for pipeline error verification */
-  public static class VerificationAction implements Action<Integer> {
+  public static class VerificationAction implements
+      Action<Integer, RuntimeException> {
     /** The name of the test */
     final String currentTest;
     /** The error index of the datanode */
@@ -343,9 +436,10 @@
    *  Create a OomAction with a CountdownConstraint
    *  so that it throws OutOfMemoryError if the count is zero.
    */
-  public static ConstraintSatisfactionAction<DatanodeID> createCountdownOomAction(
-      String currentTest, int i, int count) {
-    return new ConstraintSatisfactionAction<DatanodeID>(
+  public static ConstraintSatisfactionAction<DatanodeID, IOException>
+      createCountdownOomAction(
+        String currentTest, int i, int count) {
+    return new ConstraintSatisfactionAction<DatanodeID, IOException>(
         new OomAction(currentTest, i), new CountdownConstraint(count));
   }
 
@@ -353,9 +447,10 @@
    *  Create a DoosAction with a CountdownConstraint
    *  so that it throws DiskOutOfSpaceException if the count is zero.
    */
-  public static ConstraintSatisfactionAction<DatanodeID> createCountdownDoosAction(
+  public static ConstraintSatisfactionAction<DatanodeID, IOException>
+    createCountdownDoosAction(
       String currentTest, int i, int count) {
-    return new ConstraintSatisfactionAction<DatanodeID>(
+    return new ConstraintSatisfactionAction<DatanodeID, IOException>(
         new DoosAction(currentTest, i), new CountdownConstraint(count));
   }
 
@@ -366,9 +461,9 @@
    * sleep some period of time so that it slows down the datanode
    * or sleep forever so the that datanode becomes not responding.
    */
-  public static ConstraintSatisfactionAction<DatanodeID> createCountdownSleepAction(
+  public static ConstraintSatisfactionAction<DatanodeID, IOException> createCountdownSleepAction(
       String currentTest, int i, long minDuration, long maxDuration, int count) {
-    return new ConstraintSatisfactionAction<DatanodeID>(
+    return new ConstraintSatisfactionAction<DatanodeID, IOException>(
         new SleepAction(currentTest, i, minDuration, maxDuration),
         new CountdownConstraint(count));
   }
@@ -377,7 +472,7 @@
    * Same as
    * createCountdownSleepAction(currentTest, i, duration, duration+1, count).
    */
-  public static ConstraintSatisfactionAction<DatanodeID> createCountdownSleepAction(
+  public static ConstraintSatisfactionAction<DatanodeID, IOException> createCountdownSleepAction(
       String currentTest, int i, long duration, int count) {
     return createCountdownSleepAction(currentTest, i, duration, duration+1,
         count);
diff --git a/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java b/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java
index 8ff53b1..201c8eb 100644
--- a/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java
+++ b/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java
@@ -59,9 +59,9 @@
   
   /** Class adds new type of action */
   public static class HFlushTest extends DataTransferTest {
-    public final ActionContainer<DatanodeID> fiCallHFlush = 
-      new ActionContainer<DatanodeID>();
-    public final ActionContainer<Integer> fiErrorOnCallHFlush = 
-      new ActionContainer<Integer>();
+    public final ActionContainer<DatanodeID, IOException> fiCallHFlush = 
+      new ActionContainer<DatanodeID, IOException>();
+    public final ActionContainer<Integer, RuntimeException> fiErrorOnCallHFlush = 
+      new ActionContainer<Integer, RuntimeException>();
   }
 }
\ No newline at end of file
diff --git a/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java b/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
index 853d271..3f608a4 100644
--- a/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
+++ b/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
@@ -17,7 +17,8 @@
  */
 package org.apache.hadoop.fi;
 
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -95,24 +96,23 @@
   }
 
   /** Action interface */
-  public static interface Action<T> {
+  public static interface Action<T, E extends Exception> {
     /** Run the action with the parameter. */
-    public void run(T parameter) throws IOException;
+    public void run(T parameter) throws E;
   }
 
   /** An ActionContainer contains at most one action. */
-  public static class ActionContainer<T> {
-    private Action<T> action;
-
+  public static class ActionContainer<T, E extends Exception> {
+    private List<Action<T, E>> actionList = new ArrayList<Action<T, E>>();
     /** Create an empty container. */
     public ActionContainer() {}
 
     /** Set action. */
-    public void set(Action<T> a) {action = a;}
+    public void set(Action<T, E> a) {actionList.add(a);}
 
     /** Run the action if it exists. */
-    public void run(T obj) throws IOException {
-      if (action != null) {
+    public void run(T obj) throws E {
+      for (Action<T, E> action : actionList) {
         action.run(obj);
       }
     }
@@ -124,21 +124,21 @@
     public boolean isSatisfied();
   }
 
-  /** Counting down, the constraint is satisfied if the count is zero. */
+  /** Counting down, the constraint is satisfied if the count is one. */
   public static class CountdownConstraint implements Constraint {
     private int count;
 
     /** Initialize the count. */
     public CountdownConstraint(int count) {
-      if (count < 0) {
-        throw new IllegalArgumentException(count + " = count < 0");
+      if (count < 1) {
+        throw new IllegalArgumentException(count + " = count < 1");
       }
       this.count = count;
     }
 
     /** Counting down, the constraint is satisfied if the count is zero. */
     public boolean isSatisfied() {
-      if (count > 0) {
+      if (count > 1) {
         count--;
         return false;
       }
@@ -147,13 +147,14 @@
   }
   
   /** An action is fired if all the constraints are satisfied. */
-  public static class ConstraintSatisfactionAction<T> implements Action<T> {
-    private final Action<T> action;
+  public static class ConstraintSatisfactionAction<T, E extends Exception> 
+      implements Action<T, E> {
+    private final Action<T, E> action;
     private final Constraint[] constraints;
     
     /** Constructor */
     public ConstraintSatisfactionAction(
-        Action<T> action, Constraint... constraints) {
+        Action<T, E> action, Constraint... constraints) {
       this.action = action;
       this.constraints = constraints;
     }
@@ -163,7 +164,7 @@
      * Short-circuit-and is used. 
      */
     @Override
-    public final void run(T parameter) throws IOException {
+    public final void run(T parameter) throws E {
       for(Constraint c : constraints) {
         if (!c.isSatisfied()) {
           return;
diff --git a/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj b/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
index 6371135..dc10e1b 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
+++ b/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.IOException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fi.DataTransferTestUtil;
@@ -49,14 +47,10 @@
   after(DataStreamer datastreamer) returning : pipelineInitNonAppend(datastreamer) {
     LOG.info("FI: after pipelineInitNonAppend: hasError="
         + datastreamer.hasError + " errorIndex=" + datastreamer.errorIndex);
-    try {
-      if (datastreamer.hasError) {
-        DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
-        if (dtTest != null )
-          dtTest.fiPipelineInitErrorNonAppend.run(datastreamer.errorIndex);
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+    if (datastreamer.hasError) {
+      DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+      if (dtTest != null)
+        dtTest.fiPipelineInitErrorNonAppend.run(datastreamer.errorIndex);
     }
   }
 
@@ -78,13 +72,9 @@
   before(DataStreamer datastreamer) : pipelineErrorAfterInit(datastreamer) {
     LOG.info("FI: before pipelineErrorAfterInit: errorIndex="
         + datastreamer.errorIndex);
-    try {
-      DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
-      if (dtTest != null )
-        dtTest.fiPipelineErrorAfterInit.run(datastreamer.errorIndex);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+    DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+    if (dtTest != null )
+      dtTest.fiPipelineErrorAfterInit.run(datastreamer.errorIndex);
   }
 
   pointcut pipelineClose(DFSOutputStream out):
diff --git a/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java b/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java
index 22a0edb..268c02a 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java
+++ b/src/test/aop/org/apache/hadoop/hdfs/PipelinesTestUtil.java
@@ -39,7 +39,7 @@
   /**
    * Storing acknowleged bytes num. action for fault injection tests
    */
-  public static class ReceivedCheckAction implements FiTestUtil.Action<NodeBytes> {
+  public static class ReceivedCheckAction implements FiTestUtil.Action<NodeBytes, IOException> {
     String name;
     LinkedList<NodeBytes> rcv = ((PipelinesTest) getPipelineTest()).received;
     LinkedList<NodeBytes> ack = ((PipelinesTest) getPipelineTest()).acked;
@@ -77,7 +77,7 @@
   /**
    * Storing acknowleged bytes num. action for fault injection tests
    */
-  public static class AckedCheckAction implements FiTestUtil.Action<NodeBytes> {
+  public static class AckedCheckAction implements FiTestUtil.Action<NodeBytes, IOException> {
     String name;
     LinkedList<NodeBytes> rcv = ((PipelinesTest) getPipelineTest()).received;
     LinkedList<NodeBytes> ack = ((PipelinesTest) getPipelineTest()).acked;
@@ -118,10 +118,10 @@
     LinkedList<NodeBytes> received = new LinkedList<NodeBytes>();
     LinkedList<NodeBytes> acked = new LinkedList<NodeBytes>();
 
-    public final ActionContainer<NodeBytes> fiCallSetNumBytes =
-      new ActionContainer<NodeBytes>();
-    public final ActionContainer<NodeBytes> fiCallSetBytesAcked =
-      new ActionContainer<NodeBytes>();
+    public final ActionContainer<NodeBytes, IOException> fiCallSetNumBytes =
+      new ActionContainer<NodeBytes, IOException>();
+    public final ActionContainer<NodeBytes, IOException> fiCallSetBytesAcked =
+      new ActionContainer<NodeBytes, IOException>();
     
     private static boolean suspend = false;
     private static long lastQueuedPacket = -1;
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
index 62ef10a..a79c383 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
@@ -18,12 +18,14 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.io.OutputStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.Pipeline;
 import org.apache.hadoop.fi.PipelineTest;
 import org.apache.hadoop.fi.ProbabilityModel;
 import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
@@ -31,6 +33,7 @@
 import org.apache.hadoop.hdfs.PipelinesTestUtil.PipelinesTest;
 import org.apache.hadoop.hdfs.PipelinesTestUtil.NodeBytes;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
@@ -42,12 +45,7 @@
   public static final Log LOG = LogFactory.getLog(BlockReceiverAspects.class);
 
   pointcut callReceivePacket(BlockReceiver blockreceiver) :
-    call (* OutputStream.write(..))
-      && withincode (* BlockReceiver.receivePacket(..))
-// to further limit the application of this aspect a very narrow 'target' can be used as follows
-//  && target(DataOutputStream)
-      && !within(BlockReceiverAspects +)
-      && this(blockreceiver);
+    call(* receivePacket(..)) && target(blockreceiver);
 	
   before(BlockReceiver blockreceiver
       ) throws IOException : callReceivePacket(blockreceiver) {
@@ -65,7 +63,30 @@
     }
   }
   
-  // Pointcuts and advises for TestFiPipelines  
+  pointcut callWritePacketToDisk(BlockReceiver blockreceiver) :
+    call(* writePacketToDisk(..)) && target(blockreceiver);
+
+  before(BlockReceiver blockreceiver
+      ) throws IOException : callWritePacketToDisk(blockreceiver) {
+    LOG.info("FI: callWritePacketToDisk");
+    DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+    if (dtTest != null)
+      dtTest.fiCallWritePacketToDisk.run(
+          blockreceiver.getDataNode().getDatanodeRegistration());
+  }
+
+  pointcut afterDownstreamStatusRead(BlockReceiver.PacketResponder responder):
+    call(void PipelineAck.readFields(DataInput)) && this(responder);
+
+  after(BlockReceiver.PacketResponder responder)
+      throws IOException: afterDownstreamStatusRead(responder) {
+    final DataNode d = responder.receiver.getDataNode();
+    DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
+    if (dtTest != null)
+      dtTest.fiAfterDownstreamStatusRead.run(d.getDatanodeRegistration());
+  }
+
+    // Pointcuts and advises for TestFiPipelines  
   pointcut callSetNumBytes(BlockReceiver br, long offset) : 
     call (void ReplicaInPipelineInterface.setNumBytes(long)) 
     && withincode (int BlockReceiver.receivePacket(long, long, boolean, int, int))
@@ -97,12 +118,6 @@
     && args(acked) 
     && this(pr);
 
-  pointcut callSetBytesAckedLastDN(PacketResponder pr, long acked) : 
-    call (void ReplicaInPipelineInterface.setBytesAcked(long)) 
-    && withincode (void PacketResponder.lastDataNodeRun())
-    && args(acked) 
-    && this(pr);
-  
   after (PacketResponder pr, long acked) : callSetBytesAcked (pr, acked) {
     PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
     if (pTest == null) {
@@ -115,19 +130,7 @@
       bytesAckedService((PipelinesTest)pTest, pr, acked);
     }
   }
-  after (PacketResponder pr, long acked) : callSetBytesAckedLastDN (pr, acked) {
-    PipelineTest pTest = DataTransferTestUtil.getDataTransferTest();
-    if (pTest == null) {
-      LOG.debug("FI: no pipeline has been found in acking");
-      return;
-    }
-    LOG.debug("FI: Acked total bytes from (last DN): " + 
-        pr.receiver.datanode.dnRegistration.getStorageID() + ": " + acked);
-    if (pTest instanceof PipelinesTest) {
-      bytesAckedService((PipelinesTest)pTest, pr, acked); 
-    }
-  }
-  
+
   private void bytesAckedService 
       (final PipelinesTest pTest, final PacketResponder pr, final long acked) {
     NodeBytes nb = new NodeBytes(pr.receiver.datanode.dnRegistration, acked);
@@ -140,7 +143,7 @@
   }
   
   pointcut preventAckSending () :
-    call (void ackReply(long)) 
+    call (void PipelineAck.write(DataOutput)) 
     && within (PacketResponder);
 
   static int ackCounter = 0;
@@ -193,7 +196,7 @@
   }
 
   pointcut pipelineAck(BlockReceiver.PacketResponder packetresponder) :
-    call (Status Status.read(DataInput))
+    call (void PipelineAck.readFields(DataInput))
       && this(packetresponder);
 
   after(BlockReceiver.PacketResponder packetresponder) throws IOException
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
index 1ea556b..d5b7131 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fi.DataTransferTestUtil;
 import org.apache.hadoop.fi.FiTestUtil;
@@ -101,7 +102,7 @@
   }
   
   private static void runReceiverOpWriteBlockTest(String methodName,
-      int errorIndex, Action<DatanodeID> a) throws IOException {
+      int errorIndex, Action<DatanodeID, IOException> a) throws IOException {
     FiTestUtil.LOG.info("Running " + methodName + " ...");
     final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
         .initTest();
@@ -113,7 +114,7 @@
   }
   
   private static void runStatusReadTest(String methodName, int errorIndex,
-      Action<DatanodeID> a) throws IOException {
+      Action<DatanodeID, IOException> a) throws IOException {
     FiTestUtil.LOG.info("Running " + methodName + " ...");
     final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
         .initTest();
@@ -124,11 +125,11 @@
     Assert.assertTrue(t.isSuccess());
   }
 
-  private static void runCallReceivePacketTest(String methodName,
-      int errorIndex, Action<DatanodeID> a) throws IOException {
+  private static void runCallWritePacketToDisk(String methodName,
+      int errorIndex, Action<DatanodeID, IOException> a) throws IOException {
     FiTestUtil.LOG.info("Running " + methodName + " ...");
     final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
-    t.fiCallReceivePacket.set(a);
+    t.fiCallWritePacketToDisk.set(a);
     t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, errorIndex));
     write1byte(methodName);
     Assert.assertTrue(t.isSuccess());
@@ -280,7 +281,7 @@
   @Test
   public void pipeline_Fi_14() throws IOException {
     final String methodName = FiTestUtil.getMethodName();
-    runCallReceivePacketTest(methodName, 0, new DoosAction(methodName, 0));
+    runCallWritePacketToDisk(methodName, 0, new DoosAction(methodName, 0));
   }
 
   /**
@@ -291,7 +292,7 @@
   @Test
   public void pipeline_Fi_15() throws IOException {
     final String methodName = FiTestUtil.getMethodName();
-    runCallReceivePacketTest(methodName, 1, new DoosAction(methodName, 1));
+    runCallWritePacketToDisk(methodName, 1, new DoosAction(methodName, 1));
   }
   
   /**
@@ -302,11 +303,11 @@
   @Test
   public void pipeline_Fi_16() throws IOException {
     final String methodName = FiTestUtil.getMethodName();
-    runCallReceivePacketTest(methodName, 2, new DoosAction(methodName, 2));
+    runCallWritePacketToDisk(methodName, 2, new DoosAction(methodName, 2));
   }
 
   private static void runPipelineCloseTest(String methodName,
-      Action<DatanodeID> a) throws IOException {
+      Action<DatanodeID, IOException> a) throws IOException {
     FiTestUtil.LOG.info("Running " + methodName + " ...");
     final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
         .initTest();
@@ -324,7 +325,7 @@
     final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
     final MarkerConstraint marker = new MarkerConstraint(name);
     t.fiPipelineClose.set(new DatanodeMarkingAction(name, i, marker));
-    t.fiPipelineAck.set(new ConstraintSatisfactionAction<DatanodeID>(a, marker));
+    t.fiPipelineAck.set(new ConstraintSatisfactionAction<DatanodeID, IOException>(a, marker));
     write1byte(name);
   }
 
@@ -442,7 +443,7 @@
   }
 
   private static void runBlockFileCloseTest(String methodName,
-      Action<DatanodeID> a) throws IOException {
+      Action<DatanodeID, IOException> a) throws IOException {
     FiTestUtil.LOG.info("Running " + methodName + " ...");
     final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
         .initTest();
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
new file mode 100644
index 0000000..2c3280b
--- /dev/null
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.FiTestUtil;
+import org.apache.hadoop.fi.DataTransferTestUtil.CountdownDoosAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.CountdownOomAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.CountdownSleepAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
+import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.VerificationAction;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.BlockReceiver;
+import org.apache.log4j.Level;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test DataTransferProtocol with fault injection. */
+public class TestFiDataTransferProtocol2 {
+  static final short REPLICATION = 3;
+  static final long BLOCKSIZE = 1L * (1L << 20);
+  static final int PACKET_SIZE = 1024;
+  static final int MIN_N_PACKET = 3;
+  static final int MAX_N_PACKET = 10;
+
+  static final Configuration conf = new Configuration();
+  static {
+    conf.setInt("dfs.datanode.handler.count", 1);
+    conf.setInt("dfs.replication", REPLICATION);
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, PACKET_SIZE);
+    conf.setInt("dfs.socket.timeout", 5000);
+  }
+
+  static final byte[] bytes = new byte[MAX_N_PACKET * PACKET_SIZE];
+  static final byte[] toRead = new byte[MAX_N_PACKET * PACKET_SIZE];
+
+  static private FSDataOutputStream createFile(FileSystem fs, Path p
+      ) throws IOException {
+    return fs.create(p, true, fs.getConf().getInt("io.file.buffer.size", 4096),
+        REPLICATION, BLOCKSIZE);
+  }
+
+  {
+    ((Log4JLogger) BlockReceiver.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+  /**
+   * 1. create files with dfs
+   * 2. write MIN_N_PACKET to MAX_N_PACKET packets
+   * 3. close file
+   * 4. open the same file
+   * 5. read the bytes and compare results
+   */
+  private static void writeSeveralPackets(String methodName) throws IOException {
+    final Random r = FiTestUtil.RANDOM.get();
+    final int nPackets = FiTestUtil.nextRandomInt(MIN_N_PACKET, MAX_N_PACKET + 1);
+    final int lastPacketSize = FiTestUtil.nextRandomInt(1, PACKET_SIZE + 1);
+    final int size = (nPackets - 1)*PACKET_SIZE + lastPacketSize;
+
+    FiTestUtil.LOG.info("size=" + size + ", nPackets=" + nPackets
+        + ", lastPacketSize=" + lastPacketSize);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION, true, null);
+    final FileSystem dfs = cluster.getFileSystem();
+    try {
+      final Path p = new Path("/" + methodName + "/foo");
+      final FSDataOutputStream out = createFile(dfs, p);
+
+      final long seed = r.nextLong();
+      final Random ran = new Random(seed);
+      ran.nextBytes(bytes);
+      out.write(bytes, 0, size);
+      out.close();
+
+      final FSDataInputStream in = dfs.open(p);
+      int totalRead = 0;
+      int nRead = 0;
+      while ((nRead = in.read(toRead, totalRead, size - totalRead)) > 0) {
+        totalRead += nRead;
+      }
+      Assert.assertEquals("Cannot read file.", size, totalRead);
+      for (int i = 0; i < size; i++) {
+        Assert.assertTrue("File content differ.", bytes[i] == toRead[i]);
+      }
+    }
+    finally {
+      dfs.close();
+      cluster.shutdown();
+    }
+  }
+
+  private static void initSlowDatanodeTest(DataTransferTest t, SleepAction a)
+      throws IOException {
+    t.fiCallReceivePacket.set(a);
+    t.fiReceiverOpWriteBlock.set(a);
+    t.fiStatusRead.set(a);
+  }
+
+  private void runTest17_19(String methodName, int dnIndex)
+      throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final int maxSleep = 3000;
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    initSlowDatanodeTest(t, new SleepAction(methodName, 0, 0, maxSleep));
+    initSlowDatanodeTest(t, new SleepAction(methodName, 1, 0, maxSleep));
+    initSlowDatanodeTest(t, new SleepAction(methodName, 2, 0, maxSleep));
+    t.fiCallWritePacketToDisk.set(new CountdownDoosAction(methodName, dnIndex, 3));
+    t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex));
+    writeSeveralPackets(methodName);
+    Assert.assertTrue(t.isSuccess());
+  }
+
+  private void runTest29_30(String methodName, int dnIndex) throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final int maxSleep = 3000;
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    initSlowDatanodeTest(t, new SleepAction(methodName, 0, 0, maxSleep));
+    initSlowDatanodeTest(t, new SleepAction(methodName, 1, 0, maxSleep));
+    initSlowDatanodeTest(t, new SleepAction(methodName, 2, 0, maxSleep));
+    t.fiAfterDownstreamStatusRead.set(new CountdownOomAction(methodName, dnIndex, 3));
+    t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex));
+    writeSeveralPackets(methodName);
+    Assert.assertTrue(t.isSuccess());
+  }
+  
+  private void runTest34_35(String methodName, int dnIndex) throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    t.fiAfterDownstreamStatusRead.set(new CountdownSleepAction(methodName, dnIndex, 0, 3));
+    t.fiPipelineErrorAfterInit.set(new VerificationAction(methodName, dnIndex));
+    writeSeveralPackets(methodName);
+    Assert.assertTrue(t.isSuccess());
+  }
+  /**
+   * Streaming:
+   * Randomize datanode speed, write several packets,
+   * DN0 throws a DiskOutOfSpaceError when it writes the third packet to disk.
+   * Client gets an IOException and determines DN0 bad.
+   */
+  @Test
+  public void pipeline_Fi_17() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runTest17_19(methodName, 0);
+  }
+  
+  /**
+   * Streaming:
+   * Randomize datanode speed, write several packets,
+   * DN1 throws a DiskOutOfSpaceError when it writes the third packet to disk.
+   * Client gets an IOException and determines DN1 bad.
+   */
+  @Test
+  public void pipeline_Fi_18() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runTest17_19(methodName, 1);
+  }
+  
+  /**
+   * Streaming:
+   * Randomize datanode speed, write several packets,
+   * DN2 throws a DiskOutOfSpaceError when it writes the third packet to disk.
+   * Client gets an IOException and determines DN2 bad.
+   */
+  @Test
+  public void pipeline_Fi_19() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runTest17_19(methodName, 2);
+  }
+  
+  /**
+   * Streaming: Client writes several packets with DN0 very slow. Client
+   * finishes write successfully.
+   */
+  @Test
+  public void pipeline_Fi_20() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    initSlowDatanodeTest(t, new SleepAction(methodName, 0, 3000));
+    writeSeveralPackets(methodName);
+  }
+
+  /**
+   * Streaming: Client writes several packets with DN1 very slow. Client
+   * finishes write successfully.
+   */
+  @Test
+  public void pipeline_Fi_21() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    initSlowDatanodeTest(t, new SleepAction(methodName, 1, 3000));
+    writeSeveralPackets(methodName);
+  }
+  
+  /**
+   * Streaming: Client writes several packets with DN2 very slow. Client
+   * finishes write successfully.
+   */
+  @Test
+  public void pipeline_Fi_22() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    initSlowDatanodeTest(t, new SleepAction(methodName, 2, 3000));
+    writeSeveralPackets(methodName);
+  }
+  
+  /**
+   * Streaming: Randomize datanode speed, write several packets, DN1 throws a
+   * OutOfMemoryException when it receives the ack of the third packet from DN2.
+   * Client gets an IOException and determines DN1 bad.
+   */
+  @Test
+  public void pipeline_Fi_29() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runTest29_30(methodName, 1);
+  }
+
+  /**
+   * Streaming: Randomize datanode speed, write several packets, DN0 throws a
+   * OutOfMemoryException when it receives the ack of the third packet from DN1.
+   * Client gets an IOException and determines DN0 bad.
+   */
+  @Test
+  public void pipeline_Fi_30() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runTest29_30(methodName, 0);
+  }
+  
+  /**
+   * Streaming: Write several packets, DN1 never responses when it receives the
+   * ack of the third packet from DN2. Client gets an IOException and determines
+   * DN1 bad.
+   */
+  @Test
+  public void pipeline_Fi_34() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runTest34_35(methodName, 1);
+  }
+
+  /**
+   * Streaming: Write several packets, DN0 never responses when it receives the
+   * ack of the third packet from DN1. Client gets an IOException and determines
+   * DN0 bad.
+   */
+  @Test
+  public void pipeline_Fi_35() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runTest34_35(methodName, 0);
+  }
+}
\ No newline at end of file
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
new file mode 100644
index 0000000..96513a9
--- /dev/null
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fi.DataTransferTestUtil;
+import org.apache.hadoop.fi.FiTestUtil;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
+import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
+import org.apache.hadoop.fi.FiTestUtil.Action;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test DataTransferProtocol with fault injection. */
+public class TestFiPipelineClose {
+  static final short REPLICATION = 3;
+  static final long BLOCKSIZE = 1L * (1L << 20);
+
+  static final Configuration conf = new HdfsConfiguration();
+  static {
+    conf.setInt("dfs.datanode.handler.count", 1);
+    conf.setInt("dfs.replication", REPLICATION);
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
+  }
+
+  static private FSDataOutputStream createFile(FileSystem fs, Path p
+      ) throws IOException {
+    return fs.create(p, true, fs.getConf().getInt("io.file.buffer.size", 4096),
+        REPLICATION, BLOCKSIZE);
+  }
+
+  /**
+   * 1. create files with dfs
+   * 2. write 1 byte
+   * 3. close file
+   * 4. open the same file
+   * 5. read the 1 byte and compare results
+   */
+  private static void write1byte(String methodName) throws IOException {
+    final MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION, true,
+        null);
+    final FileSystem dfs = cluster.getFileSystem();
+    try {
+      final Path p = new Path("/" + methodName + "/foo");
+      final FSDataOutputStream out = createFile(dfs, p);
+      out.write(1);
+      out.close();
+      
+      final FSDataInputStream in = dfs.open(p);
+      final int b = in.read();
+      in.close();
+      Assert.assertEquals(1, b);
+    }
+    finally {
+      dfs.close();
+      cluster.shutdown();
+    }
+  }
+
+   private static void runPipelineCloseTest(String methodName,
+      Action<DatanodeID, IOException> a) throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    t.fiPipelineClose.set(a);
+    write1byte(methodName);
+  }
+
+  /**
+   * Pipeline close:
+   * DN0 never responses after received close request from client.
+   * Client gets an IOException and determine DN0 bad.
+   */
+  @Test
+  public void pipeline_Fi_36() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runPipelineCloseTest(methodName, new SleepAction(methodName, 0, 0));
+  }
+
+  /**
+   * Pipeline close:
+   * DN1 never responses after received close request from client.
+   * Client gets an IOException and determine DN1 bad.
+   */
+  @Test
+  public void pipeline_Fi_37() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runPipelineCloseTest(methodName, new SleepAction(methodName, 1, 0));
+  }
+
+  /**
+   * Pipeline close:
+   * DN2 never responses after received close request from client.
+   * Client gets an IOException and determine DN2 bad.
+   */
+  @Test
+  public void pipeline_Fi_38() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runPipelineCloseTest(methodName, new SleepAction(methodName, 2, 0));
+  }
+}
diff --git a/src/test/hdfs/org/apache/hadoop/cli/CmdFactoryDFS.java b/src/test/hdfs/org/apache/hadoop/cli/CmdFactoryDFS.java
new file mode 100644
index 0000000..4d615f9
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/cli/CmdFactoryDFS.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cli;
+
+import org.apache.hadoop.cli.util.CLICommands;
+import org.apache.hadoop.cli.util.CLITestData;
+import org.apache.hadoop.cli.util.CmdFactory;
+import org.apache.hadoop.cli.util.CommandExecutor;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
+
+public abstract class CmdFactoryDFS extends CmdFactory {
+  public static CommandExecutor getCommandExecutor(CLITestData.TestCmd cmd,
+                                            String tag)
+    throws IllegalArgumentException {
+    CommandExecutor executor;
+    switch (cmd.getType()) {
+      case DFSADMIN:
+        executor = new CLICommands.FSCmdExecutor(tag, new DFSAdmin());
+        break;
+      default:
+        executor = CmdFactory.getCommandExecutor(cmd, tag);
+    }
+    return executor;
+  }
+}
diff --git a/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java b/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
index d91ada9..ffc63a6 100644
--- a/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
+++ b/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
@@ -18,27 +18,27 @@
 
 package org.apache.hadoop.cli;
 
-import org.apache.hadoop.cli.util.CommandExecutor;
 import org.apache.hadoop.cli.util.CLITestData.TestCmd;
 import org.apache.hadoop.cli.util.CommandExecutor.Result;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.security.authorize.PolicyProvider;
-import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
 
-public class TestHDFSCLI extends TestCLI{
+public class TestHDFSCLI extends CLITestHelper {
 
   protected MiniDFSCluster dfsCluster = null;
   protected DistributedFileSystem dfs = null;
   protected String namenode = null;
-  protected DFSAdminCmdExecutor dfsAdmCmdExecutor = null;
-  protected FSCmdExecutor fsCmdExecutor = null;
   
+  @Before
+  @Override
   public void setUp() throws Exception {
     super.setUp();
     conf.setClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
@@ -57,8 +57,6 @@
     namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
     
     username = System.getProperty("user.name");
-    dfsAdmCmdExecutor = new DFSAdminCmdExecutor(namenode);
-    fsCmdExecutor =  new FSCmdExecutor(namenode);
 
     FileSystem fs = dfsCluster.getFileSystem();
     assertTrue("Not a HDFS: "+fs.getUri(),
@@ -66,10 +64,13 @@
     dfs = (DistributedFileSystem) fs;
   }
 
+  @Override
   protected String getTestFile() {
     return "testHDFSConf.xml";
   }
   
+  @After
+  @Override
   public void tearDown() throws Exception {
     dfs.close();
     dfsCluster.shutdown();
@@ -77,6 +78,7 @@
     super.tearDown();
   }
 
+  @Override
   protected String expandCommand(final String cmd) {
     String expCmd = cmd;
     expCmd = expCmd.replaceAll("NAMENODE", namenode);
@@ -84,43 +86,14 @@
     return expCmd;
   }
   
+  @Override
   protected Result execute(TestCmd cmd) throws Exception {
-    CommandExecutor executor = null;
-    switch(cmd.getType()) {
-    case DFSADMIN:
-      executor = dfsAdmCmdExecutor;
-      break;
-    case FS:
-      executor = fsCmdExecutor;
-      break;
-    default:
-      throw new Exception("Unknow type of Test command:"+ cmd.getType());
-    }
-    return executor.executeCommand(cmd.getCmd());
+    return CmdFactoryDFS.getCommandExecutor(cmd, namenode).executeCommand(cmd.getCmd());
   }
-  
-  public static class DFSAdminCmdExecutor extends CommandExecutor {
-    private String namenode = null;
-    public DFSAdminCmdExecutor(String namenode) {
-      this.namenode = namenode;
-    }
-    
-    protected void execute(final String cmd) throws Exception{
-      DFSAdmin shell = new DFSAdmin();
-      String[] args = getCommandAsArgs(cmd, "NAMENODE", this.namenode);
-      ToolRunner.run(shell, args);
-    }
-  }
-  
-  public static class FSCmdExecutor extends CommandExecutor {
-    private String namenode = null;
-    public FSCmdExecutor(String namenode) {
-      this.namenode = namenode;
-    }
-    protected void execute(final String cmd) throws Exception{
-      FsShell shell = new FsShell();
-      String[] args = getCommandAsArgs(cmd, "NAMENODE", this.namenode);
-      ToolRunner.run(shell, args);
-    }
+
+  @Test
+  @Override
+  public void testAll () {
+    super.testAll();
   }
 }
diff --git a/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml b/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
index e2ec78e..847e047 100644
--- a/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
+++ b/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
@@ -15187,27 +15187,43 @@
       <comparators>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^-du &lt;path>:( |\t)*Show the amount of space, in bytes, used by the files that( )*</expected-output>
+          <expected-output>^-du \[-s\] \[-h\] &lt;path&gt;:\s+Show the amount of space, in bytes, used by the files that\s*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*match the specified file pattern.( )*Equivalent to the unix( )*</expected-output>
+          <expected-output>^\s*match the specified file pattern. The following flags are optional:</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*command "du -sb &lt;path&gt;/\*" in case of a directory,( )*</expected-output>
+          <expected-output>^\s*-s\s*Rather than showing the size of each individual file that</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*and to "du -b &lt;path&gt;" in case of a file.( )*</expected-output>
+          <expected-output>^\s*matches the pattern, shows the total \(summary\) size.</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*The output is in the form( )*</expected-output>
+          <expected-output>^\s*-h\s*Formats the sizes of files in a human-readable fashion</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*name\(full path\) size \(in bytes\)( )*</expected-output>
+          <expected-output>\s*rather than a number of bytes.</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*Note that, even without the -s option, this only shows size summaries</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*one level deep into a directory.</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*The output is in the form </expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*size\s+name\(full path\)\s*</expected-output>
         </comparator>
       </comparators>
     </test>
@@ -15226,15 +15242,7 @@
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*match the specified file pattern.  Equivalent to the unix( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*command "du -sb"  The output is in the form( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*name\(full path\) size \(in bytes\)( )*</expected-output>
+          <expected-output>^( |\t)*match the specified file pattern. This is equivalent to -du -s above.</expected-output>
         </comparator>
       </comparators>
     </test>
@@ -16680,7 +16688,7 @@
         <!-- No cleanup -->
       </cleanup-commands>
       <comparators>
-        <!-- miniDFS cluster started in TestCLI is set to match this output -->
+        <!-- miniDFS cluster started in CLITestHelper is set to match this output -->
         <comparator>
           <type>RegexpAcrossOutputComparator</type>
           <expected-output>^Rack: \/rack1\s*127\.0\.0\.1:\d+\s\(localhost.*\)\s*127\.0\.0\.1:\d+\s\(localhost.*\)</expected-output>
@@ -16868,5 +16876,144 @@
       </comparators>
     </test>
 
+
+     <test> <!--Tested -->
+      <description>Verifying chmod operation is not permitted in safemode</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir /test </command>
+        <command>-fs NAMENODE -touchz /test/file1 </command>
+        <dfs-admin-command>-fs NAMENODE -safemode enter </dfs-admin-command>
+        <command>-fs NAMENODE -chmod 777 /test/file1 </command>
+      </test-commands>
+      <cleanup-commands>
+        <dfs-admin-command>-fs NAMENODE -safemode leave </dfs-admin-command>
+       <dfs-admin-command>-fs NAMENODE -rmr  /test </dfs-admin-command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>Cannot set permission for /test/file1. Name node is in safe mode.</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+    <test> <!--Tested -->
+      <description>Verifying chown operation is not permitted in safemode</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir /test </command>
+        <command>-fs NAMENODE -touchz /test/file1 </command>
+        <dfs-admin-command>-fs NAMENODE -safemode enter </dfs-admin-command>
+        <command>-fs NAMENODE -chown root /test/file1 </command>
+      </test-commands>
+      <cleanup-commands>
+        <dfs-admin-command>-fs NAMENODE -safemode leave </dfs-admin-command>
+       <dfs-admin-command>-fs NAMENODE -rmr  /test </dfs-admin-command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>Cannot set owner for /test/file1. Name node is in safe mode.</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+
+   <test> <!--Tested -->
+      <description>Verifying chgrp operation is not permitted in safemode</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir /test </command>
+        <command>-fs NAMENODE -touchz /test/file1 </command>
+        <dfs-admin-command>-fs NAMENODE -safemode enter </dfs-admin-command>
+        <command>-fs NAMENODE -chgrp newgroup /test/file1 </command>
+     </test-commands>
+      <cleanup-commands>
+        <dfs-admin-command>-fs NAMENODE -safemode leave </dfs-admin-command>
+       <dfs-admin-command>-fs NAMENODE -rmr  /test </dfs-admin-command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>Cannot set owner for /test/file1. Name node is in safe mode.</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+
+    
+   <test> <!--Tested -->
+      <description>Verifying setQuota operation is not permitted in safemode</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir /test </command>
+        <dfs-admin-command>-fs NAMENODE -safemode enter </dfs-admin-command>
+        <dfs-admin-command>-fs NAMENODE -setQuota 100 /test </dfs-admin-command>
+      </test-commands>
+      <cleanup-commands>
+        <dfs-admin-command>-fs NAMENODE -safemode leave </dfs-admin-command>
+       <dfs-admin-command>-fs NAMENODE -rmr  /test </dfs-admin-command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>setQuota: org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot set quota on /test. Name node is in safe mode.</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+  
+  <test> <!--Tested -->
+      <description>Verifying clrQuota operation is not permitted in safemode</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir /test </command>
+       <dfs-admin-command>-fs NAMENODE -safemode enter </dfs-admin-command>
+        <dfs-admin-command>-fs NAMENODE -clrQuota  /test </dfs-admin-command>
+      </test-commands>
+      <cleanup-commands>
+        <dfs-admin-command>-fs NAMENODE -safemode leave </dfs-admin-command>
+       <dfs-admin-command>-fs NAMENODE -rmr  /test </dfs-admin-command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>clrQuota: org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot set quota on /test. Name node is in safe mode.</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+  
+    
+   <test> <!--Tested -->
+      <description>Verifying setSpaceQuota operation is not permitted in safemode</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir /test </command>
+        <dfs-admin-command>-fs NAMENODE -safemode enter </dfs-admin-command>
+        <dfs-admin-command>-fs NAMENODE -setSpaceQuota 100 /test </dfs-admin-command>
+      </test-commands>
+      <cleanup-commands>
+        <dfs-admin-command>-fs NAMENODE -safemode leave </dfs-admin-command>
+       <dfs-admin-command>-fs NAMENODE -rmr  /test </dfs-admin-command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+          <expected-output>setSpaceQuota: org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot set quota on /test. Name node is in safe mode.</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+  
+  <test> <!--Tested -->
+      <description>Verifying clrSpaceQuota operation is not permitted in safemode</description>
+      <test-commands>
+        <command>-fs NAMENODE -mkdir /test </command>
+        <dfs-admin-command>-fs NAMENODE -safemode enter </dfs-admin-command>
+        <dfs-admin-command>-fs NAMENODE -clrSpaceQuota  /test </dfs-admin-command>
+      </test-commands>
+      <cleanup-commands>
+        <dfs-admin-command>-fs NAMENODE -safemode leave </dfs-admin-command>
+       <dfs-admin-command>-fs NAMENODE -rmr  /test </dfs-admin-command>
+      </cleanup-commands>
+      <comparators>
+        <comparator>
+          <type>SubstringComparator</type>
+         <expected-output>clrSpaceQuota: org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot set quota on /test. Name node is in safe mode.</expected-output>
+        </comparator>
+      </comparators>
+    </test>
+  
+
   </tests>
 </configuration>
diff --git a/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java b/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java
index 3de3316..7d94d12 100644
--- a/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java
+++ b/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java
@@ -37,6 +37,8 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.hadoop.fs.FileContextTestHelper.*;
+
 public class TestHDFSFileContextMainOperations extends
                                   FileContextMainOperationsBaseTest {
   private static MiniDFSCluster cluster;
@@ -99,10 +101,10 @@
   @Test
   public void testOldRenameWithQuota() throws Exception {
     DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
-    Path src1 = getTestRootPath("test/testOldRenameWithQuota/srcdir/src1");
-    Path src2 = getTestRootPath("test/testOldRenameWithQuota/srcdir/src2");
-    Path dst1 = getTestRootPath("test/testOldRenameWithQuota/dstdir/dst1");
-    Path dst2 = getTestRootPath("test/testOldRenameWithQuota/dstdir/dst2");
+    Path src1 = getTestRootPath(fc, "test/testOldRenameWithQuota/srcdir/src1");
+    Path src2 = getTestRootPath(fc, "test/testOldRenameWithQuota/srcdir/src2");
+    Path dst1 = getTestRootPath(fc, "test/testOldRenameWithQuota/dstdir/dst1");
+    Path dst2 = getTestRootPath(fc, "test/testOldRenameWithQuota/dstdir/dst2");
     createFile(src1);
     createFile(src2);
     fs.setQuota(src1.getParent(), FSConstants.QUOTA_DONT_SET,
@@ -134,10 +136,10 @@
   @Test
   public void testRenameWithQuota() throws Exception {
     DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
-    Path src1 = getTestRootPath("test/testRenameWithQuota/srcdir/src1");
-    Path src2 = getTestRootPath("test/testRenameWithQuota/srcdir/src2");
-    Path dst1 = getTestRootPath("test/testRenameWithQuota/dstdir/dst1");
-    Path dst2 = getTestRootPath("test/testRenameWithQuota/dstdir/dst2");
+    Path src1 = getTestRootPath(fc, "test/testRenameWithQuota/srcdir/src1");
+    Path src2 = getTestRootPath(fc, "test/testRenameWithQuota/srcdir/src2");
+    Path dst1 = getTestRootPath(fc, "test/testRenameWithQuota/dstdir/dst1");
+    Path dst2 = getTestRootPath(fc, "test/testRenameWithQuota/dstdir/dst2");
     createFile(src1);
     createFile(src2);
     fs.setQuota(src1.getParent(), FSConstants.QUOTA_DONT_SET,
@@ -184,7 +186,7 @@
   
   @Test
   public void testRenameRoot() throws Exception {
-    Path src = getTestRootPath("test/testRenameRoot/srcdir/src1");
+    Path src = getTestRootPath(fc, "test/testRenameRoot/srcdir/src1");
     Path dst = new Path("/");
     createFile(src);
     rename(src, dst, true, false, true, Rename.OVERWRITE);
@@ -198,8 +200,8 @@
   @Test
   public void testEditsLogOldRename() throws Exception {
     DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
-    Path src1 = getTestRootPath("testEditsLogOldRename/srcdir/src1");
-    Path dst1 = getTestRootPath("testEditsLogOldRename/dstdir/dst1");
+    Path src1 = getTestRootPath(fc, "testEditsLogOldRename/srcdir/src1");
+    Path dst1 = getTestRootPath(fc, "testEditsLogOldRename/dstdir/dst1");
     createFile(src1);
     fs.mkdirs(dst1.getParent());
     createFile(dst1);
@@ -214,8 +216,8 @@
     // loaded from the edits log
     restartCluster();
     fs = (DistributedFileSystem)cluster.getFileSystem();
-    src1 = getTestRootPath("testEditsLogOldRename/srcdir/src1");
-    dst1 = getTestRootPath("testEditsLogOldRename/dstdir/dst1");
+    src1 = getTestRootPath(fc, "testEditsLogOldRename/srcdir/src1");
+    dst1 = getTestRootPath(fc, "testEditsLogOldRename/dstdir/dst1");
     Assert.assertFalse(fs.exists(src1));   // ensure src1 is already renamed
     Assert.assertTrue(fs.exists(dst1));    // ensure rename dst exists
   }
@@ -227,8 +229,8 @@
   @Test
   public void testEditsLogRename() throws Exception {
     DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
-    Path src1 = getTestRootPath("testEditsLogRename/srcdir/src1");
-    Path dst1 = getTestRootPath("testEditsLogRename/dstdir/dst1");
+    Path src1 = getTestRootPath(fc, "testEditsLogRename/srcdir/src1");
+    Path dst1 = getTestRootPath(fc, "testEditsLogRename/dstdir/dst1");
     createFile(src1);
     fs.mkdirs(dst1.getParent());
     createFile(dst1);
@@ -243,8 +245,8 @@
     // loaded from the edits log
     restartCluster();
     fs = (DistributedFileSystem)cluster.getFileSystem();
-    src1 = getTestRootPath("testEditsLogRename/srcdir/src1");
-    dst1 = getTestRootPath("testEditsLogRename/dstdir/dst1");
+    src1 = getTestRootPath(fc, "testEditsLogRename/srcdir/src1");
+    dst1 = getTestRootPath(fc, "testEditsLogRename/dstdir/dst1");
     Assert.assertFalse(fs.exists(src1));   // ensure src1 is already renamed
     Assert.assertTrue(fs.exists(dst1));    // ensure rename dst exists
   }
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java b/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
index 9f12c0b..caeb6f7 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -25,11 +25,15 @@
 import java.io.IOException;
 import java.net.URL;
 import java.net.URLConnection;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,8 +42,8 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -294,4 +298,84 @@
         new UnixUserGroupInformation(username, groups));
     return c;
   }
+  
+  
+  /**
+   * modify conf to contain fake users with fake group
+   * @param conf to modify
+   * @throws IOException
+   */
+  static public void updateConfigurationWithFakeUsername(Configuration conf) {
+    // fake users
+    String username="fakeUser1";
+    String[] groups = {"fakeGroup1"};
+    // mapping to groups
+    Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
+    u2g_map.put(username, groups);
+    updateConfWithFakeGroupMapping(conf, u2g_map);
+    
+    UnixUserGroupInformation.saveToConf(conf,
+        UnixUserGroupInformation.UGI_PROPERTY_NAME,
+        new UnixUserGroupInformation(username, groups));
+  }
+  
+  /**
+   * mock class to get group mapping for fake users
+   * 
+   */
+  static class MockUnixGroupsMapping extends ShellBasedUnixGroupsMapping {
+    static Map<String, String []> fakeUser2GroupsMap;
+    private static final List<String> defaultGroups;
+    static {
+      defaultGroups = new ArrayList<String>(1);
+      defaultGroups.add("supergroup");
+      fakeUser2GroupsMap = new HashMap<String, String[]>();
+    }
+  
+    @Override
+    public List<String> getGroups(String user) throws IOException {
+      boolean found = false;
+      
+      // check to see if this is one of fake users
+      List<String> l = new ArrayList<String>();
+      for(String u : fakeUser2GroupsMap.keySet()) {  
+        if(user.equals(u)) {
+          found = true;
+          for(String gr : fakeUser2GroupsMap.get(u)) {
+            l.add(gr);
+          }
+        }
+      }
+      
+      // default
+      if(!found) {
+        l =  super.getGroups(user);
+        if(l.size() == 0) {
+          System.out.println("failed to get real group for " + user + 
+              "; using default");
+          return defaultGroups;
+        }
+      }
+      return l;
+    }
+  }
+  
+  /**
+   * update the configuration with fake class for mapping user to groups
+   * @param conf
+   * @param map - user to groups mapping
+   */
+  static public void updateConfWithFakeGroupMapping
+    (Configuration conf, Map<String, String []> map) {
+    if(map!=null) {
+      MockUnixGroupsMapping.fakeUser2GroupsMap = map;
+    }
+    
+    // fake mapping user to groups
+    conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+        DFSTestUtil.MockUnixGroupsMapping.class,
+        ShellBasedUnixGroupsMapping.class);
+    
+  }
+  
 }
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java b/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 9b826a7..77f266a 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
@@ -258,10 +259,12 @@
     FileSystem.setDefaultUri(conf, "hdfs://localhost:"+ Integer.toString(nameNodePort));
     conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "127.0.0.1:0");  
     if (manageNameDfsDirs) {
-      conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new File(base_dir, "name1").getPath()+","+
-               new File(base_dir, "name2").getPath());
-      conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY, new File(base_dir, "namesecondary1").
-                getPath()+"," + new File(base_dir, "namesecondary2").getPath());
+      conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+          fileAsURI(new File(base_dir, "name1"))+","+
+          fileAsURI(new File(base_dir, "name2")));
+      conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
+          fileAsURI(new File(base_dir, "namesecondary1"))+","+
+          fileAsURI(new File(base_dir, "namesecondary2")));
     }
     
     int replication = conf.getInt("dfs.replication", 3);
@@ -716,8 +719,8 @@
 
   /**
    * Restart a datanode, on the same port if requested
-   * @param dnprop, the datanode to restart
-   * @param keepPort, whether to use the same port 
+   * @param dnprop the datanode to restart
+   * @param keepPort whether to use the same port 
    * @return true if restarting is successful
    * @throws IOException
    */
@@ -808,6 +811,28 @@
   public FileSystem getFileSystem() throws IOException {
     return FileSystem.get(conf);
   }
+  
+
+  /**
+   * Get another FileSystem instance that is different from FileSystem.get(conf).
+   * This simulating different threads working on different FileSystem instances.
+   */
+  public FileSystem getNewFileSystemInstance() throws IOException {
+    return FileSystem.newInstance(conf);
+  }
+  
+  /**
+   * @return a {@link HftpFileSystem} object.
+   */
+  public HftpFileSystem getHftpFileSystem() throws IOException {
+    final String str = "hftp://"
+        + conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+    try {
+      return (HftpFileSystem)FileSystem.get(new URI(str), conf); 
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
 
   /**
    * Get the directories where the namenode stores its image.
@@ -957,7 +982,6 @@
 
   /**
    * Access to the data directory used for Datanodes
-   * @throws IOException 
    */
   public String getDataDirectory() {
     return data_dir.getAbsolutePath();
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
deleted file mode 100644
index f77de26..0000000
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-
-/**
- * These tests make sure that DFSClient retries fetching data from DFS
- * properly in case of errors.
- */
-public class TestDFSClientExcludedNodes extends TestCase {
-  public void testExcludedNodes() throws IOException
-  {   
-    Configuration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
-    FileSystem fs = cluster.getFileSystem();
-    Path filePath = new Path("/testExcludedNodes");
-
-    // kill a datanode
-    cluster.stopDataNode(AppendTestUtil.nextInt(3));
-    OutputStream out = fs.create(filePath, true, 4096);
-    out.write(20);
-    
-    try {
-      out.close();
-    } catch (Exception e) {
-      fail("DataNode failure should not result in a block abort: \n" + e.getMessage());
-    }
-  }
-  
-}
-
-   
-  
\ No newline at end of file
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
index 6cf9ff1..23776f4 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
@@ -20,6 +20,8 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Arrays;
+import java.security.MessageDigest;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -137,17 +139,8 @@
       return versionID;
     }
 
-    public LocatedBlock addBlock(String src, String clientName,
-                                 Block previous) throws IOException {
-
-      return addBlock(src, clientName, previous, null);
-    }
-
-    public LocatedBlock addBlock(String src,
-                                 String clientName,
-                                 Block previous,
-                                 DatanodeInfo[] excludedNode
-                                 ) throws IOException
+    public LocatedBlock addBlock(String src, String clientName, Block previous)
+    throws IOException
     {
       num_calls++;
       if (num_calls > num_calls_allowed) { 
@@ -258,4 +251,242 @@
     }
   }
   
+  
+  /**
+   * Test that a DFSClient waits for random time before retry on busy blocks.
+   */
+  public void testDFSClientRetriesOnBusyBlocks() throws IOException {
+    
+    System.out.println("Testing DFSClient random waiting on busy blocks.");
+    
+    //
+    // Test settings: 
+    // 
+    //           xcievers    fileLen   #clients  timeWindow    #retries
+    //           ========    =======   ========  ==========    ========
+    // Test 1:          2       6 MB         50      300 ms           3
+    // Test 2:          2       6 MB         50      300 ms          50
+    // Test 3:          2       6 MB         50     1000 ms           3
+    // Test 4:          2       6 MB         50     1000 ms          50
+    // 
+    //   Minimum xcievers is 2 since 1 thread is reserved for registry.
+    //   Test 1 & 3 may fail since # retries is low. 
+    //   Test 2 & 4 should never fail since (#threads)/(xcievers-1) is the upper
+    //   bound for guarantee to not throw BlockMissingException.
+    //
+    int xcievers  = 2;
+    int fileLen   = 6*1024*1024;
+    int threads   = 50;
+    int retries   = 3;
+    int timeWin   = 300;
+    
+    //
+    // Test 1: might fail
+    // 
+    long timestamp = System.currentTimeMillis();
+    boolean pass = busyTest(xcievers, threads, fileLen, timeWin, retries);
+    long timestamp2 = System.currentTimeMillis();
+    if ( pass ) {
+      LOG.info("Test 1 succeeded! Time spent: " + (timestamp2-timestamp)/1000.0 + " sec.");
+    } else {
+      LOG.warn("Test 1 failed, but relax. Time spent: " + (timestamp2-timestamp)/1000.0 + " sec.");
+    }
+    
+    //
+    // Test 2: should never fail
+    // 
+    retries = 50;
+    timestamp = System.currentTimeMillis();
+    pass = busyTest(xcievers, threads, fileLen, timeWin, retries);
+    timestamp2 = System.currentTimeMillis();
+    assertTrue("Something wrong! Test 2 got Exception with maxmum retries!", pass);
+    LOG.info("Test 2 succeeded! Time spent: "  + (timestamp2-timestamp)/1000.0 + " sec.");
+    
+    //
+    // Test 3: might fail
+    // 
+    retries = 3;
+    timeWin = 1000;
+    timestamp = System.currentTimeMillis();
+    pass = busyTest(xcievers, threads, fileLen, timeWin, retries);
+    timestamp2 = System.currentTimeMillis();
+    if ( pass ) {
+      LOG.info("Test 3 succeeded! Time spent: " + (timestamp2-timestamp)/1000.0 + " sec.");
+    } else {
+      LOG.warn("Test 3 failed, but relax. Time spent: " + (timestamp2-timestamp)/1000.0 + " sec.");
+    }
+    
+    //
+    // Test 4: should never fail
+    //
+    retries = 50;
+    timeWin = 1000;
+    timestamp = System.currentTimeMillis();
+    pass = busyTest(xcievers, threads, fileLen, timeWin, retries);
+    timestamp2 = System.currentTimeMillis();
+    assertTrue("Something wrong! Test 4 got Exception with maxmum retries!", pass);
+    LOG.info("Test 4 succeeded! Time spent: "  + (timestamp2-timestamp)/1000.0 + " sec.");
+  }
+
+  private boolean busyTest(int xcievers, int threads, int fileLen, int timeWin, int retries) 
+    throws IOException {
+
+    boolean ret = true;
+    short replicationFactor = 1;
+    long blockSize = 128*1024*1024; // DFS block size
+    int bufferSize = 4096;
+    
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt("dfs.datanode.max.xcievers",xcievers);
+    conf.setInt("dfs.client.max.block.acquire.failures", retries);
+    conf.setInt("dfs.client.retry.window.base", timeWin);
+
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, replicationFactor, true, null);
+    cluster.waitActive();
+    
+    FileSystem fs = cluster.getFileSystem();
+    Path file1 = new Path("test_data.dat");
+    file1 = file1.makeQualified(fs.getUri(), fs.getWorkingDirectory()); // make URI hdfs://
+    
+    try {
+      
+      FSDataOutputStream stm = fs.create(file1, true,
+                                         bufferSize,
+                                         replicationFactor,
+                                         blockSize);
+      
+      // verify that file exists in FS namespace
+      assertTrue(file1 + " should be a file", 
+                  fs.getFileStatus(file1).isDir() == false);
+      System.out.println("Path : \"" + file1 + "\"");
+      LOG.info("Path : \"" + file1 + "\"");
+
+      // write 1 block to file
+      byte[] buffer = AppendTestUtil.randomBytes(System.currentTimeMillis(), fileLen);
+      stm.write(buffer, 0, fileLen);
+      stm.close();
+
+      // verify that file size has changed to the full size
+      long len = fs.getFileStatus(file1).getLen();
+      
+      assertTrue(file1 + " should be of size " + fileLen +
+                 " but found to be of size " + len, 
+                  len == fileLen);
+      
+      // read back and check data integrigy
+      byte[] read_buf = new byte[fileLen];
+      InputStream in = fs.open(file1, fileLen);
+      IOUtils.readFully(in, read_buf, 0, fileLen);
+      assert(Arrays.equals(buffer, read_buf));
+      in.close();
+      read_buf = null; // GC it if needed
+      
+      // compute digest of the content to reduce memory space
+      MessageDigest m = MessageDigest.getInstance("SHA");
+      m.update(buffer, 0, fileLen);
+      byte[] hash_sha = m.digest();
+
+      // spawn multiple threads and all trying to access the same block
+      Thread[] readers = new Thread[threads];
+      Counter counter = new Counter(0);
+      for (int i = 0; i < threads; ++i ) {
+        DFSClientReader reader = new DFSClientReader(file1, cluster, hash_sha, fileLen, counter);
+        readers[i] = new Thread(reader);
+        readers[i].start();
+      }
+      
+      // wait for them to exit
+      for (int i = 0; i < threads; ++i ) {
+        readers[i].join();
+      }
+      if ( counter.get() == threads )
+        ret = true;
+      else
+        ret = false;
+      
+    } catch (InterruptedException e) {
+      System.out.println("Thread got InterruptedException.");
+      e.printStackTrace();
+      ret = false;
+    } catch (Exception e) {
+      e.printStackTrace();
+      ret = false;
+    } finally {
+      fs.delete(file1, false);
+      cluster.shutdown();
+    }
+    return ret;
+  }
+  
+  class DFSClientReader implements Runnable {
+    
+    DFSClient client;
+    Configuration conf;
+    byte[] expected_sha;
+    FileSystem  fs;
+    Path filePath;
+    MiniDFSCluster cluster;
+    int len;
+    Counter counter;
+
+    DFSClientReader(Path file, MiniDFSCluster cluster, byte[] hash_sha, int fileLen, Counter cnt) {
+      filePath = file;
+      this.cluster = cluster;
+      counter = cnt;
+      len = fileLen;
+      conf = new HdfsConfiguration();
+      expected_sha = hash_sha;
+      try {
+        cluster.waitActive();
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+    
+    public void run() {
+      try {
+        fs = cluster.getNewFileSystemInstance();
+        
+        int bufferSize = len;
+        byte[] buf = new byte[bufferSize];
+
+        InputStream in = fs.open(filePath, bufferSize);
+        
+        // read the whole file
+        IOUtils.readFully(in, buf, 0, bufferSize);
+        
+        // compare with the expected input
+        MessageDigest m = MessageDigest.getInstance("SHA");
+        m.update(buf, 0, bufferSize);
+        byte[] hash_sha = m.digest();
+        
+        buf = null; // GC if needed since there may be too many threads
+        in.close();
+        fs.close();
+
+        assertTrue("hashed keys are not the same size",
+                   hash_sha.length == expected_sha.length);
+
+        assertTrue("hashed keys are not equal",
+                   Arrays.equals(hash_sha, expected_sha));
+        
+        counter.inc(); // count this thread as successful
+        
+        LOG.info("Thread correctly read the block.");
+        
+      } catch (BlockMissingException e) {
+        LOG.info("Bad - BlockMissingException is caught.");
+        e.printStackTrace();
+      } catch (Exception e) {
+        e.printStackTrace();
+      } 
+    }
+  }
+
+  class Counter {
+    int counter;
+    Counter(int n) { counter = n; }
+    public synchronized void inc() { ++counter; }
+    public int get() { return counter; }
+  }
 }
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSPermission.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSPermission.java
index 113f385..623012f 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSPermission.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSPermission.java
@@ -19,22 +19,29 @@
 
 import java.io.IOException;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 
 import javax.security.auth.login.LoginException;
 
-import org.apache.commons.logging.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UnixUserGroupInformation;
-
 import junit.framework.AssertionFailedError;
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+
 /** Unit tests for permission */
 public class TestDFSPermission extends TestCase {
   public static final Log LOG = LogFactory.getLog(TestDFSPermission.class);
@@ -81,6 +88,13 @@
       // explicitly turn on permission checking
       conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
       
+      // create fake mapping for the groups
+      Map<String, String[]> u2g_map = new HashMap<String, String[]> (3);
+      u2g_map.put(USER1_NAME, new String[] {GROUP1_NAME, GROUP2_NAME });
+      u2g_map.put(USER2_NAME, new String[] {GROUP2_NAME, GROUP3_NAME });
+      u2g_map.put(USER3_NAME, new String[] {GROUP3_NAME, GROUP4_NAME });
+      DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
+      
       // Initiate all four users
       SUPERUSER = UnixUserGroupInformation.login(conf);
       USER1 = new UnixUserGroupInformation(USER1_NAME, new String[] {
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index 0096a51..f59d005 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -19,6 +19,8 @@
 
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.READ_BLOCK;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.WRITE_BLOCK;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
 
@@ -157,9 +159,8 @@
         
     //ok finally write a block with 0 len
     SUCCESS.write(recvOut);
-    Text.writeString(recvOut, ""); // first bad node
-    recvOut.writeLong(100);        // sequencenumber
-    SUCCESS.write(recvOut);
+    Text.writeString(recvOut, "");
+    new PipelineAck(100, new Status[]{SUCCESS}).write(recvOut);
     sendRecvData(description, false);
   }
   
@@ -381,9 +382,8 @@
     // bad data chunk length
     sendOut.writeInt(-1-random.nextInt(oneMil));
     SUCCESS.write(recvOut);
-    Text.writeString(recvOut, ""); // first bad node
-    recvOut.writeLong(100);        // sequencenumber
-    ERROR.write(recvOut);
+    Text.writeString(recvOut, "");
+    new PipelineAck(100, new Status[]{ERROR}).write(recvOut);
     sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId, 
                  true);
 
@@ -406,9 +406,8 @@
     sendOut.flush();
     //ok finally write a block with 0 len
     SUCCESS.write(recvOut);
-    Text.writeString(recvOut, ""); // first bad node
-    recvOut.writeLong(100);        // sequencenumber
-    SUCCESS.write(recvOut);
+    Text.writeString(recvOut, "");
+    new PipelineAck(100, new Status[]{SUCCESS}).write(recvOut);
     sendRecvData("Writing a zero len block blockid " + newBlockId, false);
     
     /* Test OP_READ_BLOCK */
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestFSInputChecker.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestFSInputChecker.java
index 79a763f..a979581 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestFSInputChecker.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestFSInputChecker.java
@@ -212,19 +212,26 @@
   private void testChecker(FileSystem fileSys, boolean readCS)
   throws Exception {
     Path file = new Path("try.dat");
-    if( readCS ) {
-      writeFile(fileSys, file);
-    } else {
-      writeFile(fileSys, file);
+    writeFile(fileSys, file);
+
+    try {
+      if (!readCS) {
+        fileSys.setVerifyChecksum(false);
+      }
+
+      stm = fileSys.open(file);
+      checkReadAndGetPos();
+      checkSeek();
+      checkSkip();
+      //checkMark
+      assertFalse(stm.markSupported());
+      stm.close();
+    } finally {
+      if (!readCS) {
+        fileSys.setVerifyChecksum(true);
+      }
+      cleanupFile(fileSys, file);
     }
-    stm = fileSys.open(file);
-    checkReadAndGetPos();
-    checkSeek();
-    checkSkip();
-    //checkMark
-    assertFalse(stm.markSupported());
-    stm.close();
-    cleanupFile(fileSys, file);
   }
   
   private void testFileCorruption(LocalFileSystem fileSys) throws IOException {
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java
index 3f36360..03243eb 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java
@@ -17,22 +17,31 @@
  */
 package org.apache.hadoop.hdfs;
 
-import junit.framework.TestCase;
-import java.io.*;
+import java.io.IOException;
 import java.util.Random;
 
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.log4j.Level;
 
 /**
  * This class tests the FileStatus API.
  */
 public class TestFileStatus extends TestCase {
+  {
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FileSystem.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 8192;
   static final int fileSize = 16384;
@@ -64,6 +73,7 @@
     Configuration conf = new HdfsConfiguration();
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
     FileSystem fs = cluster.getFileSystem();
+    final HftpFileSystem hftpfs = cluster.getHftpFileSystem();
     final DFSClient dfsClient = new DFSClient(NameNode.getAddress(conf), conf);
     try {
 
@@ -111,8 +121,10 @@
       assertTrue(fs.exists(dir));
       assertTrue(dir + " should be a directory", 
                  fs.getFileStatus(path).isDir() == true);
-      assertTrue(dir + " should be zero size ",
-                 fs.getContentSummary(dir).getLength() == 0);
+      assertEquals(dir + " should be zero size ",
+          0, fs.getContentSummary(dir).getLength());
+      assertEquals(dir + " should be zero size using hftp",
+          0, hftpfs.getContentSummary(dir).getLength());
       assertTrue(dir + " should be zero size ",
                  fs.getFileStatus(dir).getLen() == 0);
       System.out.println("Dir : \"" + dir + "\"");
@@ -139,8 +151,11 @@
 
       // verify that the size of the directory increased by the size 
       // of the two files
-      assertTrue(dir + " size should be " + (blockSize/2), 
-                 blockSize/2 == fs.getContentSummary(dir).getLength());
+      final int expected = blockSize/2;  
+      assertEquals(dir + " size should be " + expected, 
+          expected, fs.getContentSummary(dir).getLength());
+      assertEquals(dir + " size should be " + expected + " using hftp", 
+          expected, hftpfs.getContentSummary(dir).getLength());
     } finally {
       fs.close();
       cluster.shutdown();
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestHDFSServerPorts.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
index 5792b27..a1d27d6 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestHDFSServerPorts.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.BackupNode;
@@ -92,7 +93,8 @@
       throw new IOException("Could not delete hdfs directory '" + hdfsDir + "'");
     }
     config = new HdfsConfiguration();
-    config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new File(hdfsDir, "name1").getPath());
+    config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        fileAsURI(new File(hdfsDir, "name1")).toString());
     FileSystem.setDefaultUri(config, "hdfs://"+NAME_NODE_HOST + "0");
     config.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, NAME_NODE_HTTP_HOST + "0");
     NameNode.format(config);
@@ -120,7 +122,8 @@
     assertTrue(currDir2.mkdirs());
     assertTrue(currDir3.mkdirs());
     
-    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new File(hdfsDir, "name2").getPath());
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        fileAsURI(new File(hdfsDir, "name2")).toString());
     conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, "${dfs.name.dir}");
     
     // Start BackupNode
@@ -246,7 +249,8 @@
 
       // start another namenode on the same port
       Configuration conf2 = new HdfsConfiguration(config);
-      conf2.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new File(hdfsDir, "name2").getPath());
+      conf2.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+          fileAsURI(new File(hdfsDir, "name2")).toString());
       NameNode.format(conf2);
       boolean started = canStartNameNode(conf2);
       assertFalse(started); // should fail
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
index 3a3d455..718a7cb 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
@@ -17,10 +17,14 @@
  */
 package org.apache.hadoop.hdfs;
 
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.log4j.Level;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import org.junit.Test;
@@ -30,6 +34,11 @@
 /** Class contains a set of tests to verify the correctness of 
  * newly introduced {@link FSDataOutputStream#hflush()} method */
 public class TestHFlush {
+  {
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+  
   private final String fName = "hflushtest.dat";
   
   /** The test uses {@link #doTheJob(Configuration, String, long, short)
@@ -143,4 +152,55 @@
       actual[idx] = 0;
     }
   }
+  
+  /** This creates a slow writer and check to see 
+   * if pipeline heartbeats work fine
+   */
+ @Test
+  public void testPipelineHeartbeat() throws Exception {
+    final int DATANODE_NUM = 2;
+    final int fileLen = 6;
+    Configuration conf = new HdfsConfiguration();
+    final int timeout = 2000;
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 
+        timeout);
+
+    final Path p = new Path("/pipelineHeartbeat/foo");
+    System.out.println("p=" + p);
+    
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
+    DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+
+    byte[] fileContents = AppendTestUtil.initBuffer(fileLen);
+
+    // create a new file.
+    FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);
+
+    stm.write(fileContents, 0, 1);
+    Thread.sleep(timeout);
+    stm.hflush();
+    System.out.println("Wrote 1 byte and hflush " + p);
+
+    // write another byte
+    Thread.sleep(timeout);
+    stm.write(fileContents, 1, 1);
+    stm.hflush();
+    
+    stm.write(fileContents, 2, 1);
+    Thread.sleep(timeout);
+    stm.hflush();
+    
+    stm.write(fileContents, 3, 1);
+    Thread.sleep(timeout);
+    stm.write(fileContents, 4, 1);
+    stm.hflush();
+    
+    stm.write(fileContents, 5, 1);
+    Thread.sleep(timeout);
+    stm.close();
+
+    // verify that entire file is good
+    AppendTestUtil.checkFullFile(fs, p, fileLen,
+        fileContents, "Failed to slowly write to a file");
+  }
 }
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
index 4eb8366..20a41c1 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
@@ -29,7 +31,6 @@
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Level;
@@ -45,6 +46,9 @@
   static final int FILE_SIZE = 1024*16;
   static final short REPLICATION_NUM = (short)3;
   static byte[] buffer = new byte[FILE_SIZE];
+  
+  static private String fakeUsername = "fakeUser1";
+  static private String fakeGroup = "supergroup";
 
   public void testBlockSynchronization() throws Exception {
     final long softLease = 1000;
@@ -56,6 +60,13 @@
     conf.setInt("dfs.heartbeat.interval", 1);
   //  conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 16);
 
+    // create fake mapping user to group and set it to the conf
+    // NOTE. this must be done at the beginning, before first call to mapping
+    // functions
+    Map<String, String []> u2g_map = new HashMap<String, String []>(1);
+    u2g_map.put(fakeUsername, new String[] {fakeGroup});
+    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
+
     MiniDFSCluster cluster = null;
     DistributedFileSystem dfs = null;
     byte[] actual = new byte[FILE_SIZE];
@@ -93,10 +104,9 @@
       // should fail but will trigger lease recovery.
       {
         Configuration conf2 = new HdfsConfiguration(conf);
-        String username = UserGroupInformation.getCurrentUGI().getUserName()+"_1";
         UnixUserGroupInformation.saveToConf(conf2,
             UnixUserGroupInformation.UGI_PROPERTY_NAME,
-            new UnixUserGroupInformation(username, new String[]{"supergroup"}));
+            new UnixUserGroupInformation(fakeUsername, new String[]{fakeGroup}));
         FileSystem dfs2 = FileSystem.get(conf2);
   
         boolean done = false;
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java
index 3e1148f..921d411 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestListPathServlet.java
@@ -60,7 +60,7 @@
     final String str = "hftp://"
         + CONF.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
     hftpURI = new URI(str);
-    hftpFs = (HftpFileSystem) FileSystem.newInstance(hftpURI, CONF);
+    hftpFs = cluster.getHftpFileSystem();
   }
 
   @AfterClass
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
index b2b6c6e..6e9a05b 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
@@ -17,23 +17,24 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.IOException;
+import java.io.OutputStream;
+
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
 /** Test reading from hdfs while a file is being written. */
 public class TestReadWhileWriting {
   {
@@ -44,6 +45,7 @@
   private static final String DIR = "/"
       + TestReadWhileWriting.class.getSimpleName() + "/";
   private static final int BLOCK_SIZE = 8192;
+  private static final long LEASE_LIMIT = 500;
   
   /** Test reading while writing. */
   @Test
@@ -51,13 +53,13 @@
     final Configuration conf = new HdfsConfiguration();
     //enable append
     conf.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
+    conf.setLong("dfs.heartbeat.interval", 1);
 
     // create cluster
     final MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
     try {
-      //change the lease soft limit to 1 second.
-      final long leaseSoftLimit = 1000;
-      cluster.setLeasePeriod(leaseSoftLimit, FSConstants.LEASE_HARDLIMIT_PERIOD);
+      //change the lease limits.
+      cluster.setLeasePeriod(LEASE_LIMIT, LEASE_LIMIT);
 
       //wait for the cluster
       cluster.waitActive();
@@ -81,26 +83,44 @@
       //b. On another machine M2, open file and verify that the half-block
       //   of data can be read successfully.
       checkFile(p, half, conf);
+      AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
+      ((DistributedFileSystem)fs).dfs.leasechecker.interruptAndJoin();
 
-      /* TODO: enable the following when append is done.
       //c. On M1, append another half block of data.  Close file on M1.
       {
-        //sleep to make sure the lease is expired the soft limit.
-        Thread.sleep(2*leaseSoftLimit);
-
-        FSDataOutputStream out = fs.append(p);
+        //sleep to let the lease is expired.
+        Thread.sleep(2*LEASE_LIMIT);
+  
+        final DistributedFileSystem dfs = (DistributedFileSystem)FileSystem.newInstance(conf);
+        final FSDataOutputStream out = append(dfs, p);
         write(out, 0, half);
         out.close();
       }
 
       //d. On M2, open file and read 1 block of data from it. Close file.
       checkFile(p, 2*half, conf);
-      */
     } finally {
       cluster.shutdown();
     }
   }
 
+  /** Try openning a file for append. */
+  private static FSDataOutputStream append(FileSystem fs, Path p) throws Exception {
+    for(int i = 0; i < 10; i++) {
+      try {
+        return fs.append(p);
+      } catch(RemoteException re) {
+        if (re.getClassName().equals(RecoveryInProgressException.class.getName())) {
+          AppendTestUtil.LOG.info("Will sleep and retry, i=" + i +", p="+p, re);
+          Thread.sleep(1000);
+        }
+        else
+          throw re;
+      }
+    }
+    throw new IOException("Cannot append to " + p);
+  }
+
   static private int userCount = 0;
   //check the file
   static void checkFile(Path p, int expectedsize, Configuration conf
@@ -113,10 +133,10 @@
         UnixUserGroupInformation.UGI_PROPERTY_NAME,
         new UnixUserGroupInformation(username, new String[]{"supergroup"}));
     final FileSystem fs = FileSystem.get(conf2);
-    final InputStream in = fs.open(p);
+    final DFSDataInputStream in = (DFSDataInputStream)fs.open(p);
 
-    //Is the data available?
-    Assert.assertTrue(available(in, expectedsize));
+    //Check visible length
+    Assert.assertTrue(in.getVisibleLength() >= expectedsize);
 
     //Able to read?
     for(int i = 0; i < expectedsize; i++) {
@@ -135,15 +155,5 @@
     }
     out.write(bytes);
   }
-
-  /** Is the data available? */
-  private static boolean available(InputStream in, int expectedsize
-      ) throws IOException {
-    final int available = in.available();
-    System.out.println(" in.available()=" + available);
-    Assert.assertTrue(available >= 0);
-    Assert.assertTrue(available <= expectedsize);
-    return available == expectedsize;
-  }
 }
 
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 5b19019..91c0fa1 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -180,6 +180,7 @@
       totalCapacity += capacity;
     }
     runBalancer(conf, totalUsedSpace, totalCapacity);
+    cluster.shutdown();
   }
 
   /* wait for one heartbeat */
@@ -261,6 +262,38 @@
     } while(!balanced);
 
   }
+
+  private void runBalancerDefaultConstructor(Configuration conf,
+      long totalUsedSpace, long totalCapacity) throws Exception {
+    waitForHeartBeat(totalUsedSpace, totalCapacity);
+
+    // start rebalancing
+    balancer = new Balancer();
+    balancer.setConf(conf);
+    balancer.run(new String[0]);
+
+    waitForHeartBeat(totalUsedSpace, totalCapacity);
+    boolean balanced;
+    do {
+      DatanodeInfo[] datanodeReport = client
+          .getDatanodeReport(DatanodeReportType.ALL);
+      assertEquals(datanodeReport.length, cluster.getDataNodes().size());
+      balanced = true;
+      double avgUtilization = ((double) totalUsedSpace) / totalCapacity * 100;
+      for (DatanodeInfo datanode : datanodeReport) {
+        if (Math.abs(avgUtilization - ((double) datanode.getDfsUsed())
+            / datanode.getCapacity() * 100) > 10) {
+          balanced = false;
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException ignored) {
+          }
+          break;
+        }
+      }
+    } while (!balanced);
+
+  }
   
   /** one-node cluster test*/
   private void oneNodeTest(Configuration conf) throws Exception {
@@ -298,6 +331,44 @@
         new long[]{CAPACITY, CAPACITY},
         new String[] {RACK0, RACK1});
   }
+  
+  public void testBalancer2() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    testBalancerDefaultConstructor(conf, new long[] { CAPACITY, CAPACITY },
+        new String[] { RACK0, RACK1 }, CAPACITY, RACK2);
+  }
+
+  private void testBalancerDefaultConstructor(Configuration conf,
+      long[] capacities, String[] racks, long newCapacity, String newRack)
+      throws Exception {
+    int numOfDatanodes = capacities.length;
+    assertEquals(numOfDatanodes, racks.length);
+    cluster = new MiniDFSCluster(0, conf, capacities.length, true, true, null,
+        racks, capacities);
+    try {
+      cluster.waitActive();
+      client = DFSClient.createNamenode(conf);
+
+      long totalCapacity = 0L;
+      for (long capacity : capacities) {
+        totalCapacity += capacity;
+      }
+      // fill up the cluster to be 30% full
+      long totalUsedSpace = totalCapacity * 3 / 10;
+      createFile(totalUsedSpace / numOfDatanodes, (short) numOfDatanodes);
+      // start up an empty node with the same capacity and on the same rack
+      cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
+          new long[] { newCapacity });
+
+      totalCapacity += newCapacity;
+
+      // run balancer and validate results
+      runBalancerDefaultConstructor(conf, totalUsedSpace, totalCapacity);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 
   /**
    * @param args
@@ -306,5 +377,6 @@
     TestBalancer balancerTest = new TestBalancer();
     balancerTest.testBalancer0();
     balancerTest.testBalancer1();
+    balancerTest.testBalancer2();
   }
 }
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestGetUriFromString.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestGetUriFromString.java
new file mode 100644
index 0000000..b1ffca8
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/common/TestGetUriFromString.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import java.io.IOException;
+import java.net.URI;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This is a unit test, which tests {@link Util#stringAsURI(String)}
+ * for Windows and Unix style file paths.
+ */
+public class TestGetUriFromString extends TestCase {
+  private static final Log LOG = LogFactory.getLog(TestGetUriFromString.class);
+
+  private static final String RELATIVE_FILE_PATH = "relativeFilePath";
+  private static final String ABSOLUTE_PATH_UNIX = "/tmp/file1";
+  private static final String ABSOLUTE_PATH_WINDOWS =
+    "C:\\Documents and Settings\\All Users";
+  private static final String URI_FILE_SCHEMA = "file";
+  private static final String URI_PATH_UNIX = "/var/www";
+  private static final String URI_PATH_WINDOWS =
+    "/C:/Documents%20and%20Settings/All%20Users";
+  private static final String URI_UNIX = URI_FILE_SCHEMA + "://"
+      + URI_PATH_UNIX;
+  private static final String URI_WINDOWS = URI_FILE_SCHEMA + "://"
+      + URI_PATH_WINDOWS;
+
+  /**
+   * Test for a relative path, os independent
+   * @throws IOException 
+   */
+  public void testRelativePathAsURI() throws IOException {
+    URI u = Util.stringAsURI(RELATIVE_FILE_PATH);
+    LOG.info("Uri: " + u);
+    assertNotNull(u);
+  }
+
+  /**
+   * Test for an OS dependent absolute paths.
+   * @throws IOException 
+   */
+  public void testAbsolutePathAsURI() throws IOException {
+    URI u = null;
+    u = Util.stringAsURI(ABSOLUTE_PATH_WINDOWS);
+    assertNotNull(
+        "Uri should not be null for Windows path" + ABSOLUTE_PATH_WINDOWS, u);
+    assertEquals(URI_FILE_SCHEMA, u.getScheme());
+    u = Util.stringAsURI(ABSOLUTE_PATH_UNIX);
+    assertNotNull("Uri should not be null for Unix path" + ABSOLUTE_PATH_UNIX, u);
+    assertEquals(URI_FILE_SCHEMA, u.getScheme());
+  }
+
+  /**
+   * Test for a URI
+   * @throws IOException 
+   */
+  public void testURI() throws IOException {
+    LOG.info("Testing correct Unix URI: " + URI_UNIX);
+    URI u = Util.stringAsURI(URI_UNIX);
+    LOG.info("Uri: " + u);    
+    assertNotNull("Uri should not be null at this point", u);
+    assertEquals(URI_FILE_SCHEMA, u.getScheme());
+    assertEquals(URI_PATH_UNIX, u.getPath());
+
+    LOG.info("Testing correct windows URI: " + URI_WINDOWS);
+    u = Util.stringAsURI(URI_WINDOWS);
+    LOG.info("Uri: " + u);
+    assertNotNull("Uri should not be null at this point", u);
+    assertEquals(URI_FILE_SCHEMA, u.getScheme());
+    assertEquals(URI_PATH_WINDOWS.replace("%20", " "), u.getPath());
+  }
+}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
index f0e898e..81da8b8 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBackupNode.java
@@ -215,12 +215,12 @@
     try {
       // start name-node and backup node 1
       cluster = new MiniDFSCluster(conf1, 0, true, null);
-      conf1.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY, "0.0.0.0:7770");
+      conf1.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY, "0.0.0.0:7771");
       conf1.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY, "0.0.0.0:7775");
       backup1 = startBackupNode(conf1, StartupOption.BACKUP, 1);
       // try to start backup node 2
       conf2 = new HdfsConfiguration(conf1);
-      conf2.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY, "0.0.0.0:7771");
+      conf2.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY, "0.0.0.0:7772");
       conf2.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY, "0.0.0.0:7776");
       try {
         backup2 = startBackupNode(conf2, StartupOption.BACKUP, 2);
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
index 0bbca80..ed1d6be 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStartup.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
@@ -48,7 +49,7 @@
  */
 public class TestStartup extends TestCase {
   public static final String NAME_NODE_HOST = "localhost:";
-  public static final String NAME_NODE_HTTP_HOST = "0.0.0.0:";
+  public static final String WILDCARD_HTTP_HOST = "0.0.0.0:";
   private static final Log LOG =
     LogFactory.getLog(TestStartup.class.getName());
   private Configuration config;
@@ -74,18 +75,20 @@
 
   protected void setUp() throws Exception {
     config = new HdfsConfiguration();
-    String baseDir = System.getProperty("test.build.data", "/tmp");
+    hdfsDir = new File(MiniDFSCluster.getBaseDirectory());
 
-    hdfsDir = new File(baseDir, "dfs");
     if ( hdfsDir.exists() && !FileUtil.fullyDelete(hdfsDir) ) {
       throw new IOException("Could not delete hdfs directory '" + hdfsDir + "'");
     }
     LOG.info("--hdfsdir is " + hdfsDir.getAbsolutePath());
-    config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new File(hdfsDir, "name").getPath());
-    config.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, new File(hdfsDir, "data").getPath());
-
-    config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,new File(hdfsDir, "secondary").getPath());
-    //config.set("fs.default.name", "hdfs://"+ NAME_NODE_HOST + "0");
+    config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        fileAsURI(new File(hdfsDir, "name")).toString());
+    config.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
+        new File(hdfsDir, "data").getPath());
+    config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
+        fileAsURI(new File(hdfsDir, "secondary")).toString());
+    config.set(DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY,
+	       WILDCARD_HTTP_HOST + "0");
     
     FileSystem.setDefaultUri(config, "hdfs://"+NAME_NODE_HOST + "0");
   }
@@ -231,11 +234,15 @@
   public void testChkpointStartup2() throws IOException{
     LOG.info("--starting checkpointStartup2 - same directory for checkpoint");
     // different name dirs
-    config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new File(hdfsDir, "name").getPath());
-    config.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, new File(hdfsDir, "edits").getPath());
+    config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        fileAsURI(new File(hdfsDir, "name")).toString());
+    config.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+        fileAsURI(new File(hdfsDir, "edits")).toString());
     // same checkpoint dirs
-    config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY, new File(hdfsDir, "chkpt").getPath());
-    config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY, new File(hdfsDir, "chkpt").getPath());
+    config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY,
+        fileAsURI(new File(hdfsDir, "chkpt")).toString());
+    config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
+        fileAsURI(new File(hdfsDir, "chkpt")).toString());
 
     createCheckPoint();
 
@@ -253,11 +260,15 @@
     //setUpConfig();
     LOG.info("--starting testStartup Recovery");
     // different name dirs
-    config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new File(hdfsDir, "name").getPath());
-    config.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, new File(hdfsDir, "edits").getPath());
+    config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        fileAsURI(new File(hdfsDir, "name")).toString());
+    config.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+        fileAsURI(new File(hdfsDir, "edits")).toString());
     // same checkpoint dirs
-    config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY, new File(hdfsDir, "chkpt_edits").getPath());
-    config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY, new File(hdfsDir, "chkpt").getPath());
+    config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY,
+        fileAsURI(new File(hdfsDir, "chkpt_edits")).toString());
+    config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
+        fileAsURI(new File(hdfsDir, "chkpt")).toString());
 
     createCheckPoint();
     corruptNameNodeFiles();
@@ -274,11 +285,15 @@
     //setUpConfig();
     LOG.info("--starting SecondNN startup test");
     // different name dirs
-    config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, new File(hdfsDir, "name").getPath());
-    config.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, new File(hdfsDir, "name").getPath());
+    config.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        fileAsURI(new File(hdfsDir, "name")).toString());
+    config.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+        fileAsURI(new File(hdfsDir, "name")).toString());
     // same checkpoint dirs
-    config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY, new File(hdfsDir, "chkpt_edits").getPath());
-    config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY, new File(hdfsDir, "chkpt").getPath());
+    config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY,
+        fileAsURI(new File(hdfsDir, "chkpt_edits")).toString());
+    config.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY,
+        fileAsURI(new File(hdfsDir, "chkpt")).toString());
 
     LOG.info("--starting NN ");
     MiniDFSCluster cluster = null;
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
index 77284b0..c67fc4b 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
@@ -33,7 +33,8 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.cli.TestHDFSCLI;
+import org.apache.hadoop.cli.CmdFactoryDFS;
+import org.apache.hadoop.cli.util.CLITestData;
 import org.apache.hadoop.cli.util.CommandExecutor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -164,7 +165,7 @@
   /**
    * This function returns a md5 hash of a file.
    * 
-   * @param FileToMd5
+   * @param file input file
    * @return The md5 string
    */
   public String getFileMD5(File file) throws Exception {
@@ -189,7 +190,7 @@
   /**
    * read currentCheckpointTime directly from the file
    * @param currDir
-   * @return
+   * @return the checkpoint time
    * @throws IOException
    */
   long readCheckpointTime(File currDir) throws IOException {
@@ -351,25 +352,25 @@
 
       String cmd = "-fs NAMENODE -restoreFailedStorage false";
       String namenode = config.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
-      CommandExecutor executor = new TestHDFSCLI.DFSAdminCmdExecutor(namenode);
+      CommandExecutor executor = 
+        CmdFactoryDFS.getCommandExecutor(
+          new CLITestData.TestCmd(cmd, CLITestData.TestCmd.CommandType.DFSADMIN),
+          namenode);
       executor.executeCommand(cmd);
       restore = fsi.getRestoreFailedStorage();
-      LOG.info("After set true call restore is " + restore);
-      assertEquals(restore, false);
+      assertFalse("After set true call restore is " + restore, restore);
 
       // run one more time - to set it to true again
       cmd = "-fs NAMENODE -restoreFailedStorage true";
       executor.executeCommand(cmd);
       restore = fsi.getRestoreFailedStorage();
-      LOG.info("After set false call restore is " + restore);
-      assertEquals(restore, true);
+      assertTrue("After set false call restore is " + restore, restore);
       
    // run one more time - no change in value
       cmd = "-fs NAMENODE -restoreFailedStorage check";
       CommandExecutor.Result cmdResult = executor.executeCommand(cmd);
       restore = fsi.getRestoreFailedStorage();
-      LOG.info("After check call restore is " + restore);
-      assertEquals(restore, true);
+      assertTrue("After check call restore is " + restore, restore);
       String commandOutput = cmdResult.getCommandOutput();
       commandOutput.trim();
       assertTrue(commandOutput.contains("restoreFailedStorage is set to true"));
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
index 7feacd7..2dbacf6 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
@@ -39,11 +39,19 @@
  */
 public class TestNameNodeMetrics extends TestCase {
   private static final Configuration CONF = new HdfsConfiguration();
+  private static final int DFS_REPLICATION_INTERVAL = 1;
+  private static final Path TEST_ROOT_DIR_PATH = 
+    new Path(System.getProperty("test.build.data", "build/test/data"));
+  
+  // Number of datanodes in the cluster
+  private static final int DATANODE_COUNT = 3; 
   static {
     CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
     CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
-    CONF.setLong("dfs.heartbeat.interval", 1L);
-    CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+    CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFS_REPLICATION_INTERVAL);
+    CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
+        DFS_REPLICATION_INTERVAL);
   }
   
   private MiniDFSCluster cluster;
@@ -52,9 +60,13 @@
   private Random rand = new Random();
   private FSNamesystem namesystem;
 
+  private static Path getTestPath(String fileName) {
+    return new Path(TEST_ROOT_DIR_PATH, fileName);
+  }
+  
   @Override
   protected void setUp() throws Exception {
-    cluster = new MiniDFSCluster(CONF, 3, true, null);
+    cluster = new MiniDFSCluster(CONF, DATANODE_COUNT, true, null);
     cluster.waitActive();
     namesystem = cluster.getNamesystem();
     fs = (DistributedFileSystem) cluster.getFileSystem();
@@ -67,9 +79,8 @@
   }
   
   /** create a file with a length of <code>fileLen</code> */
-  private void createFile(String fileName, long fileLen, short replicas) throws IOException {
-    Path filePath = new Path(fileName);
-    DFSTestUtil.createFile(fs, filePath, fileLen, replicas, rand.nextLong());
+  private void createFile(Path file, long fileLen, short replicas) throws IOException {
+    DFSTestUtil.createFile(fs, file, fileLen, replicas, rand.nextLong());
   }
 
   private void updateMetrics() throws Exception {
@@ -82,7 +93,7 @@
   /** Test metrics associated with addition of a file */
   public void testFileAdd() throws Exception {
     // Add files with 100 blocks
-    final String file = "/tmp/t";
+    final Path file = getTestPath("testFileAdd");
     createFile(file, 3200, (short)3);
     final int blockCount = 32;
     int blockCapacity = namesystem.getBlockCapacity();
@@ -96,27 +107,37 @@
       blockCapacity <<= 1;
     }
     updateMetrics();
-    assertEquals(3, metrics.filesTotal.get());
+    int filesTotal = file.depth() + 1; // Add 1 for root
+    assertEquals(filesTotal, metrics.filesTotal.get());
     assertEquals(blockCount, metrics.blocksTotal.get());
     assertEquals(blockCapacity, metrics.blockCapacity.get());
-    fs.delete(new Path(file), true);
+    fs.delete(file, true);
+    filesTotal--; // reduce the filecount for deleted file
+    
+    // Wait for more than DATANODE_COUNT replication intervals to ensure all 
+    // the blocks pending deletion are sent for deletion to the datanodes.
+    Thread.sleep(DFS_REPLICATION_INTERVAL * (DATANODE_COUNT + 1) * 1000);
+    updateMetrics();
+    assertEquals(filesTotal, metrics.filesTotal.get());
+    assertEquals(0, metrics.blocksTotal.get());
+    assertEquals(0, metrics.pendingDeletionBlocks.get());
   }
   
   /** Corrupt a block and ensure metrics reflects it */
   public void testCorruptBlock() throws Exception {
     // Create a file with single block with two replicas
-    String file = "/tmp/t";
+    final Path file = getTestPath("testCorruptBlock");
     createFile(file, 100, (short)2);
     
     // Corrupt first replica of the block
     LocatedBlock block = NameNodeAdapter.getBlockLocations(
-        cluster.getNameNode(), file, 0, 1).get(0);
+        cluster.getNameNode(), file.toString(), 0, 1).get(0);
     namesystem.markBlockAsCorrupt(block.getBlock(), block.getLocations()[0]);
     updateMetrics();
     assertEquals(1, metrics.corruptBlocks.get());
     assertEquals(1, metrics.pendingReplicationBlocks.get());
     assertEquals(1, metrics.scheduledReplicationBlocks.get());
-    fs.delete(new Path(file), true);
+    fs.delete(file, true);
     updateMetrics();
     assertEquals(0, metrics.corruptBlocks.get());
     assertEquals(0, metrics.pendingReplicationBlocks.get());
@@ -127,30 +148,29 @@
    * for a file and ensure metrics reflects it
    */
   public void testExcessBlocks() throws Exception {
-    String file = "/tmp/t";
+    Path file = getTestPath("testExcessBlocks");
     createFile(file, 100, (short)2);
     int totalBlocks = 1;
-    namesystem.setReplication(file, (short)1);
+    namesystem.setReplication(file.toString(), (short)1);
     updateMetrics();
     assertEquals(totalBlocks, metrics.excessBlocks.get());
-    assertEquals(totalBlocks, metrics.pendingDeletionBlocks.get());
-    fs.delete(new Path(file), true);
+    fs.delete(file, true);
   }
   
   /** Test to ensure metrics reflects missing blocks */
   public void testMissingBlock() throws Exception {
     // Create a file with single block with two replicas
-    String file = "/tmp/t";
+    Path file = getTestPath("testMissingBlocks");
     createFile(file, 100, (short)1);
     
     // Corrupt the only replica of the block to result in a missing block
     LocatedBlock block = NameNodeAdapter.getBlockLocations(
-        cluster.getNameNode(), file, 0, 1).get(0);
+        cluster.getNameNode(), file.toString(), 0, 1).get(0);
     namesystem.markBlockAsCorrupt(block.getBlock(), block.getLocations()[0]);
     updateMetrics();
     assertEquals(1, metrics.underReplicatedBlocks.get());
     assertEquals(1, metrics.missingBlocks.get());
-    fs.delete(new Path(file), true);
+    fs.delete(file, true);
     updateMetrics();
     assertEquals(0, metrics.underReplicatedBlocks.get());
   }
diff --git a/src/test/hdfs/org/apache/hadoop/security/TestGroupMappingServiceRefresh.java b/src/test/hdfs/org/apache/hadoop/security/TestGroupMappingServiceRefresh.java
new file mode 100644
index 0000000..c4b2285
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/security/TestGroupMappingServiceRefresh.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.hadoop.security.GroupMappingServiceProvider;
+
+public class TestGroupMappingServiceRefresh {
+  private MiniDFSCluster cluster;
+  Configuration config;
+  private static long groupRefreshTimeoutSec = 1;
+  
+  public static class MockUnixGroupsMapping implements GroupMappingServiceProvider {
+    private int i=0;
+    
+    @Override
+    public List<String> getGroups(String user) throws IOException {
+      String g1 = user + (10 * i + 1);
+      String g2 = user + (10 * i + 2);
+      List<String> l = new ArrayList<String>(2);
+      l.add(g1);
+      l.add(g2);
+      i++;
+      return l;
+    }
+  }
+  
+  @Before
+  public void setUp() throws Exception {
+    config = new HdfsConfiguration();
+    config.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+        TestGroupMappingServiceRefresh.MockUnixGroupsMapping.class,
+        GroupMappingServiceProvider.class);
+    config.setLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 
+        groupRefreshTimeoutSec);
+    
+    FileSystem.setDefaultUri(config, "hdfs://localhost:" + "0");
+    cluster = new MiniDFSCluster(0, config, 1, true, true, true,  null, null, null, null);
+    cluster.waitActive();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if(cluster!=null) {
+      cluster.shutdown();
+    }
+  }
+  
+  @Test
+  public void testGroupMappingRefresh() throws Exception {
+    DFSAdmin admin = new DFSAdmin(config);
+    String [] args =  new String[]{"-refreshUserToGroupsMappings"};
+    Groups groups = SecurityUtil.getUserToGroupsMappingService(config);
+    String user = UnixUserGroupInformation.getUnixUserName();
+    System.out.println("first attempt:");
+    List<String> g1 = groups.getGroups(user);
+    String [] str_groups = new String [g1.size()];
+    g1.toArray(str_groups);
+    System.out.println(Arrays.toString(str_groups));
+    
+    System.out.println("second attempt, should be same:");
+    List<String> g2 = groups.getGroups(user);
+    g2.toArray(str_groups);
+    System.out.println(Arrays.toString(str_groups));
+    for(int i=0; i<g2.size(); i++) {
+      assertEquals("Should be same group ", g1.get(i), g2.get(i));
+    }
+    admin.run(args);
+    System.out.println("third attempt(after refresh command), should be different:");
+    List<String> g3 = groups.getGroups(user);
+    g3.toArray(str_groups);
+    System.out.println(Arrays.toString(str_groups));
+    for(int i=0; i<g3.size(); i++) {
+      assertFalse("Should be different group ", g1.get(i).equals(g3.get(i)));
+    }
+    
+    // test time out
+    Thread.sleep(groupRefreshTimeoutSec*1100);
+    System.out.println("fourth attempt(after timeout), should be different:");
+    List<String> g4 = groups.getGroups(user);
+    g4.toArray(str_groups);
+    System.out.println(Arrays.toString(str_groups));
+    for(int i=0; i<g4.size(); i++) {
+      assertFalse("Should be different group ", g3.get(i).equals(g4.get(i)));
+    }
+  }
+}
diff --git a/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java b/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java
index 6fa0569..f6125b3 100644
--- a/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java
+++ b/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestNNLeaseRecovery.java
@@ -19,15 +19,6 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static junit.framework.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.io.IOException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
@@ -39,13 +30,20 @@
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
+import static org.junit.Assert.assertFalse;
 import org.junit.Before;
 import org.junit.Test;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
 
 public class TestNNLeaseRecovery {
   public static final Log LOG = LogFactory.getLog(TestNNLeaseRecovery.class);
@@ -128,8 +126,8 @@
     PermissionStatus ps =
       new PermissionStatus("test", "test", new FsPermission((short)0777));
     
-    mockFileBlocks(null, 
-      HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps);
+    mockFileBlocks(2, null, 
+      HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps, false);
     
     fsn.internalReleaseLease(lm, file.toString(), null);
     assertTrue("FSNamesystem.internalReleaseLease suppose to throw " +
@@ -152,8 +150,52 @@
     PermissionStatus ps =
       new PermissionStatus("test", "test", new FsPermission((short)0777));
 
-    mockFileBlocks(HdfsConstants.BlockUCState.COMMITTED, 
-      HdfsConstants.BlockUCState.COMMITTED, file, dnd, ps);
+    mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED, 
+      HdfsConstants.BlockUCState.COMMITTED, file, dnd, ps, false);
+
+    fsn.internalReleaseLease(lm, file.toString(), null);
+    assertTrue("FSNamesystem.internalReleaseLease suppose to throw " +
+      "AlreadyBeingCreatedException here", false);
+  }
+
+  /**
+   * Mocks FSNamesystem instance, adds an empty file with 0 blocks
+   * and invokes lease recovery method. 
+   * 
+   */
+  @Test
+  public void testInternalReleaseLease_0blocks () throws IOException {
+    LOG.debug("Running " + GenericTestUtils.getMethodName());
+    LeaseManager.Lease lm = mock(LeaseManager.Lease.class);
+    Path file = 
+      spy(new Path("/" + GenericTestUtils.getMethodName() + "_test.dat"));
+    DatanodeDescriptor dnd = mock(DatanodeDescriptor.class);
+    PermissionStatus ps =
+      new PermissionStatus("test", "test", new FsPermission((short)0777));
+
+    mockFileBlocks(0, null, null, file, dnd, ps, false);
+
+    assertTrue("True has to be returned in this case",
+      fsn.internalReleaseLease(lm, file.toString(), null));
+  }
+  
+  /**
+   * Mocks FSNamesystem instance, adds an empty file with 1 block
+   * and invokes lease recovery method. 
+   * AlreadyBeingCreatedException is expected.
+   * @throws AlreadyBeingCreatedException as the result
+   */
+  @Test(expected=AlreadyBeingCreatedException.class)
+  public void testInternalReleaseLease_1blocks () throws IOException {
+    LOG.debug("Running " + GenericTestUtils.getMethodName());
+    LeaseManager.Lease lm = mock(LeaseManager.Lease.class);
+    Path file = 
+      spy(new Path("/" + GenericTestUtils.getMethodName() + "_test.dat"));
+    DatanodeDescriptor dnd = mock(DatanodeDescriptor.class);
+    PermissionStatus ps =
+      new PermissionStatus("test", "test", new FsPermission((short)0777));
+
+    mockFileBlocks(1, null, HdfsConstants.BlockUCState.COMMITTED, file, dnd, ps, false);
 
     fsn.internalReleaseLease(lm, file.toString(), null);
     assertTrue("FSNamesystem.internalReleaseLease suppose to throw " +
@@ -176,25 +218,159 @@
     PermissionStatus ps =
       new PermissionStatus("test", "test", new FsPermission((short)0777));
     
-    mockFileBlocks(HdfsConstants.BlockUCState.COMMITTED, 
-      HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps);
+    mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED, 
+      HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps, false);
         
     assertFalse("False is expected in return in this case",
       fsn.internalReleaseLease(lm, file.toString(), null));
   }
 
-  private void mockFileBlocks(HdfsConstants.BlockUCState penUltState,
-                           HdfsConstants.BlockUCState lastState,
-                           Path file, DatanodeDescriptor dnd, 
-                           PermissionStatus ps) throws IOException {
+  @Test
+  public void testCommitBlockSynchronization_BlockNotFound () 
+    throws IOException {
+    LOG.debug("Running " + GenericTestUtils.getMethodName());
+    long recoveryId = 2002;
+    long newSize = 273487234;
+    Path file = 
+      spy(new Path("/" + GenericTestUtils.getMethodName() + "_test.dat"));
+    DatanodeDescriptor dnd = mock(DatanodeDescriptor.class);
+    PermissionStatus ps =
+      new PermissionStatus("test", "test", new FsPermission((short)0777));
+    
+    mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED, 
+      HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps, false);
+    
+    BlockInfo lastBlock = fsn.dir.getFileINode(anyString()).getLastBlock(); 
+    try {
+      fsn.commitBlockSynchronization(lastBlock,
+        recoveryId, newSize, true, false, new DatanodeID[1]);
+    } catch (IOException ioe) {
+      assertTrue(ioe.getMessage().startsWith("Block (="));
+    }
+  }
+  
+  @Test
+  public void testCommitBlockSynchronization_notUR () 
+    throws IOException {
+    LOG.debug("Running " + GenericTestUtils.getMethodName());
+    long recoveryId = 2002;
+    long newSize = 273487234;
+    Path file = 
+      spy(new Path("/" + GenericTestUtils.getMethodName() + "_test.dat"));
+    DatanodeDescriptor dnd = mock(DatanodeDescriptor.class);
+    PermissionStatus ps =
+      new PermissionStatus("test", "test", new FsPermission((short)0777));
+    
+    mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED, 
+      HdfsConstants.BlockUCState.COMPLETE, file, dnd, ps, true);
+    
+    BlockInfo lastBlock = fsn.dir.getFileINode(anyString()).getLastBlock();
+    when(lastBlock.isComplete()).thenReturn(true);
+    
+    try {
+      fsn.commitBlockSynchronization(lastBlock,
+        recoveryId, newSize, true, false, new DatanodeID[1]);
+    } catch (IOException ioe) {
+      assertTrue(ioe.getMessage().startsWith("Unexpected block (="));
+    }
+  }
+  
+  @Test
+  public void testCommitBlockSynchronization_WrongGreaterRecoveryID() 
+    throws IOException {
+    LOG.debug("Running " + GenericTestUtils.getMethodName());
+    long recoveryId = 2002;
+    long newSize = 273487234;
+    Path file = 
+      spy(new Path("/" + GenericTestUtils.getMethodName() + "_test.dat"));
+    DatanodeDescriptor dnd = mock(DatanodeDescriptor.class);
+    PermissionStatus ps =
+      new PermissionStatus("test", "test", new FsPermission((short)0777));
+    
+    mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED, 
+      HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps, true);
+    
+    BlockInfo lastBlock = fsn.dir.getFileINode(anyString()).getLastBlock();
+    when(((BlockInfoUnderConstruction)lastBlock).getBlockRecoveryId()).thenReturn(recoveryId-100);
+    
+    try {
+      fsn.commitBlockSynchronization(lastBlock,
+        recoveryId, newSize, true, false, new DatanodeID[1]);
+    } catch (IOException ioe) {
+      assertTrue(ioe.getMessage().startsWith("The recovery id " + recoveryId + " does not match current recovery id " + (recoveryId-100)));
+    }
+  }  
+  
+  @Test
+  public void testCommitBlockSynchronization_WrongLesserRecoveryID() 
+    throws IOException {
+    LOG.debug("Running " + GenericTestUtils.getMethodName());
+    long recoveryId = 2002;
+    long newSize = 273487234;
+    Path file = 
+      spy(new Path("/" + GenericTestUtils.getMethodName() + "_test.dat"));
+    DatanodeDescriptor dnd = mock(DatanodeDescriptor.class);
+    PermissionStatus ps =
+      new PermissionStatus("test", "test", new FsPermission((short)0777));
+    
+    mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED, 
+      HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps, true);
+    
+    BlockInfo lastBlock = fsn.dir.getFileINode(anyString()).getLastBlock();
+    when(((BlockInfoUnderConstruction)lastBlock).getBlockRecoveryId()).thenReturn(recoveryId+100);
+    
+    try {           
+      fsn.commitBlockSynchronization(lastBlock,
+        recoveryId, newSize, true, false, new DatanodeID[1]);
+    } catch (IOException ioe) {
+      assertTrue(ioe.getMessage().startsWith("The recovery id " + recoveryId + " does not match current recovery id " + (recoveryId+100)));
+    }
+  }
+
+  @Test
+  public void testCommitBlockSynchronization_EqualRecoveryID() 
+    throws IOException {
+    LOG.debug("Running " + GenericTestUtils.getMethodName());
+    long recoveryId = 2002;
+    long newSize = 273487234;
+    Path file = 
+      spy(new Path("/" + GenericTestUtils.getMethodName() + "_test.dat"));
+    DatanodeDescriptor dnd = mock(DatanodeDescriptor.class);
+    PermissionStatus ps =
+      new PermissionStatus("test", "test", new FsPermission((short)0777));
+    
+    mockFileBlocks(2, HdfsConstants.BlockUCState.COMMITTED, 
+      HdfsConstants.BlockUCState.UNDER_CONSTRUCTION, file, dnd, ps, true);
+    
+    BlockInfo lastBlock = fsn.dir.getFileINode(anyString()).getLastBlock();
+    when(((BlockInfoUnderConstruction)lastBlock).getBlockRecoveryId()).thenReturn(recoveryId);
+    
+    boolean recoveryChecked = false;
+    try {
+      fsn.commitBlockSynchronization(lastBlock,
+        recoveryId, newSize, true, false, new DatanodeID[1]);
+    } catch (NullPointerException ioe) {
+      // It is fine to get NPE here because the datanodes array is empty
+      recoveryChecked = true;
+    }
+    assertTrue("commitBlockSynchronization had to throw NPE here", recoveryChecked);
+  }
+
+  private void mockFileBlocks(int fileBlocksNumber,
+                              HdfsConstants.BlockUCState penUltState,
+                              HdfsConstants.BlockUCState lastState,
+                              Path file, DatanodeDescriptor dnd,
+                              PermissionStatus ps,
+                              boolean setStoredBlock) throws IOException {
     BlockInfo b = mock(BlockInfo.class);
     BlockInfoUnderConstruction b1 = mock(BlockInfoUnderConstruction.class);
     when(b.getBlockUCState()).thenReturn(penUltState);
     when(b1.getBlockUCState()).thenReturn(lastState);
-    BlockInfo[] blocks = new BlockInfo[]{b, b1};
+    BlockInfo[] blocks;
 
     FSDirectory fsDir = mock(FSDirectory.class);
     INodeFileUnderConstruction iNFmock = mock(INodeFileUnderConstruction.class);
+
     fsn.dir = fsDir;
     FSImage fsImage = mock(FSImage.class);
     FSEditLog editLog = mock(FSEditLog.class);
@@ -203,14 +379,34 @@
     when(fsn.getFSImage().getEditLog()).thenReturn(editLog);
     fsn.getFSImage().setFSNamesystem(fsn);
     
+    switch (fileBlocksNumber) {
+      case 0:
+        blocks = new BlockInfo[0];
+        break;
+      case 1:
+        blocks = new BlockInfo[]{b1};
+        when(iNFmock.getLastBlock()).thenReturn(b1);
+        break;
+      default:
+        when(iNFmock.getPenultimateBlock()).thenReturn(b);
+        when(iNFmock.getLastBlock()).thenReturn(b1);
+        blocks = new BlockInfo[]{b, b1};
+    }
+    
     when(iNFmock.getBlocks()).thenReturn(blocks);
-    when(iNFmock.numBlocks()).thenReturn(2);
-    when(iNFmock.getPenultimateBlock()).thenReturn(b);
-    when(iNFmock.getLastBlock()).thenReturn(b1);
+    when(iNFmock.numBlocks()).thenReturn(blocks.length);
     when(iNFmock.isUnderConstruction()).thenReturn(true);
+    when(iNFmock.convertToInodeFile()).thenReturn(iNFmock);    
     fsDir.addFile(file.toString(), ps, (short)3, 1l, "test", 
       "test-machine", dnd, 1001l);
 
+    fsn.leaseManager = mock(LeaseManager.class);
+    fsn.leaseManager.addLease("mock-lease", file.toString());
+    if (setStoredBlock) {
+      when(b1.getINode()).thenReturn(iNFmock);
+      fsn.blockManager.blocksMap.addINode(b1, iNFmock);
+    }
+
     when(fsDir.getFileINode(anyString())).thenReturn(iNFmock);
   }