Merging changes r1090113:r1095461 from trunk to federation


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1095512 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 923576f..1524c6d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -258,6 +258,16 @@
     HDFS-1813. Federation: Authentication using BlockToken in RPC to datanode 
                fails. (jitendra)
 
+    HDFS-1630. Support fsedits checksum. (hairong)
+
+    HDFS-1606. Provide a stronger data guarantee in the write pipeline by
+    adding a new datanode when an existing datanode failed.  (szetszwo)
+
+    HDFS-1442. Api to get delegation token in Hdfs class. (jitendra)
+
+    HDFS-1070. Speedup namenode image loading and saving by storing only
+    local file names. (hairong)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)
@@ -327,6 +337,25 @@
     HDFS-1767. Namenode ignores non-initial block report from datanodes
     when in safemode during startup. (Matt Foley via suresh)
 
+    HDFS-1817. Move pipeline_Fi_[39-51] from TestFiDataTransferProtocol
+    to TestFiPipelineClose.  (szetszwo)
+
+    HDFS-1760. In FSDirectory.getFullPathName(..), it is better to return "/"
+    for root directory instead of an empty string.  (Daryn Sharp via szetszwo)
+
+    HDFS-1833. Reduce repeated string constructions and unnecessary fields,
+    and fix comments in BlockReceiver.PacketResponder.  (szetszwo)
+
+    HDFS-1486. Generalize CLITest structure and interfaces to facilitate
+    upstream adoption (e.g. for web testing). (cos)
+
+    HDFS-1844. Move "fs -help" shell command tests from HDFS to COMMON; see
+    also HADOOP-7230.  (Daryn Sharp via szetszwo)
+
+    HDFS-1840. In DFSClient, terminate the lease renewing thread when all files
+    being written are closed for a grace period, and start a new thread when
+    new files are opened for write.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
@@ -397,6 +426,18 @@
     HDFS-1543. Reduce dev. cycle time by moving system testing artifacts from
     default build and push to maven for HDFS (Luke Lu via cos)
 
+    HDFS-1818. TestHDFSCLI is failing on trunk after HADOOP-7202.
+    (Aaron T. Myers via todd)
+
+    HDFS-1828. TestBlocksWithNotEnoughRacks intermittently fails assert.
+    (Matt Foley via eli)
+
+    HDFS-1824. delay instantiation of file system object until it is
+     needed (linked to HADOOP-7207) (boryas)
+
+    HDFS-1831. Fix append bug in FileContext and implement CreateFlag
+    check (related to HADOOP-7223). (suresh)
+
 Release 0.22.0 - Unreleased
 
   NEW FEATURES
@@ -853,9 +894,20 @@
 
     HDFS-1781. Fix the path for jsvc in bin/hdfs.  (John George via szetszwo)
 
-    HDFS-1782. Fix an NPE in RFSNamesystem.startFileInternal(..).
+    HDFS-1782. Fix an NPE in FSNamesystem.startFileInternal(..).
     (John George via szetszwo)
 
+    HDFS-1821. Fix username resolution in NameNode.createSymlink(..) and
+    FSDirectory.addSymlink(..).  (John George via szetszwo)
+
+    HDFS-1806. TestBlockReport.blockReport_08() and _09() are timing-dependent
+    and likely to fail on fast servers. (Matt Foley via eli)
+
+    HDFS-1845. Symlink comes up as directory after namenode restart.
+    (John George via eli)
+
+    HDFS-1666. Disable failing hdfsproxy test TestAuthorizationFilter (todd)
+
 Release 0.21.1 - Unreleased
 
     HDFS-1411. Correct backup node startup command in hdfs user guide.
diff --git a/src/contrib/build.xml b/src/contrib/build.xml
index 05e781f..8bbc85e 100644
--- a/src/contrib/build.xml
+++ b/src/contrib/build.xml
@@ -46,9 +46,11 @@
   <!-- Test all the contribs.                               -->
   <!-- ====================================================== -->
   <target name="test">
+    <!-- hdfsproxy tests failing due to HDFS-1666
     <subant target="test">
       <fileset dir="." includes="hdfsproxy/build.xml"/>
     </subant>
+      -->
   </target>
   
   
diff --git a/src/java/hdfs-default.xml b/src/java/hdfs-default.xml
index 03a2c2c..2bb84e6 100644
--- a/src/java/hdfs-default.xml
+++ b/src/java/hdfs-default.xml
@@ -318,6 +318,42 @@
 </property>
 
 <property>
+  <name>dfs.client.block.write.replace-datanode-on-failure.enable</name>
+  <value>true</value>
+  <description>
+    If there is a datanode/network failure in the write pipeline,
+    DFSClient will try to remove the failed datanode from the pipeline
+    and then continue writing with the remaining datanodes. As a result,
+    the number of datanodes in the pipeline is decreased.  This feature
+    adds new datanodes to the pipeline.
+
+    This is a site-wide property to enable/disable the feature.
+
+    See also dfs.client.block.write.replace-datanode-on-failure.policy
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.block.write.replace-datanode-on-failure.policy</name>
+  <value>DEFAULT</value>
+  <description>
+    This property is used only if the value of
+    dfs.client.block.write.replace-datanode-on-failure.enable is true.
+
+    ALWAYS: always add a new datanode when an existing datanode is removed.
+    
+    NEVER: never add a new datanode.
+
+    DEFAULT: 
+      Let r be the replication number.
+      Let n be the number of existing datanodes.
+      Add a new datanode only if r is greater than or equal to 3 and either
+      (1) floor(r/2) is greater than or equal to n; or
+      (2) r is greater than n and the block is hflushed/appended.
+  </description>
+</property>
+
+<property>
   <name>dfs.blockreport.intervalMsec</name>
   <value>21600000</value>
   <description>Determines block reporting interval in milliseconds.</description>
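
    The DEFAULT policy described above is easiest to see with concrete numbers. The
    following standalone sketch is illustrative only and not part of this patch (the
    class and method names are made up); it restates the documented rule and evaluates
    a few cases.

    /** Illustrative restatement of the documented DEFAULT replace-datanode policy. */
    public class ReplaceDatanodePolicyExample {
      /** Add a datanode iff r >= 3 and (floor(r/2) >= n, or r > n and the block is hflushed/appended). */
      static boolean addDatanode(int r, int n, boolean hflushedOrAppended) {
        return r >= 3 && (r / 2 >= n || (r > n && hflushedOrAppended));
      }

      public static void main(String[] args) {
        System.out.println(addDatanode(3, 1, false)); // true:  floor(3/2) = 1 >= n = 1
        System.out.println(addDatanode(3, 2, false)); // false: 1 < 2 and block not hflushed/appended
        System.out.println(addDatanode(3, 2, true));  // true:  r = 3 > n = 2 and block hflushed/appended
        System.out.println(addDatanode(2, 1, true));  // false: r < 3
      }
    }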
diff --git a/src/java/org/apache/hadoop/fs/Hdfs.java b/src/java/org/apache/hadoop/fs/Hdfs.java
index c674b4c..fbcf042 100644
--- a/src/java/org/apache/hadoop/fs/Hdfs.java
+++ b/src/java/org/apache/hadoop/fs/Hdfs.java
@@ -25,6 +25,8 @@
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.List;
+import java.util.NoSuchElementException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -37,8 +39,13 @@
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.util.Progressable;
 
 @InterfaceAudience.Private
@@ -249,7 +256,7 @@
       if (hasNext()) {
         return thisListing.getPartialListing()[i++];
       }
-      throw new java.util.NoSuchElementException("No more entry in " + src);
+      throw new NoSuchElementException("No more entry in " + src);
     }
   }
 
@@ -384,4 +391,43 @@
   public Path getLinkTarget(Path p) throws IOException { 
     return new Path(dfs.getLinkTarget(getUriPath(p)));
   }
+  
+  @Override //AbstractFileSystem
+  public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
+    Token<DelegationTokenIdentifier> result = dfs
+        .getDelegationToken(renewer == null ? null : new Text(renewer));
+    result.setService(new Text(this.getCanonicalServiceName()));
+    List<Token<?>> tokenList = new ArrayList<Token<?>>();
+    tokenList.add(result);
+    return tokenList;
+  }
+
+  /**
+   * Renew an existing delegation token.
+   * 
+   * @param token delegation token obtained earlier
+   * @return the new expiration time
+   * @throws InvalidToken
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public long renewDelegationToken(
+      Token<? extends AbstractDelegationTokenIdentifier> token)
+      throws InvalidToken, IOException {
+    return dfs.renewDelegationToken((Token<DelegationTokenIdentifier>) token);
+  }
+
+  /**
+   * Cancel an existing delegation token.
+   * 
+   * @param token delegation token
+   * @throws InvalidToken
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public void cancelDelegationToken(
+      Token<? extends AbstractDelegationTokenIdentifier> token)
+      throws InvalidToken, IOException {
+    dfs.cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
+  }
 }
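
    Below is a minimal usage sketch of the delegation-token methods added to Hdfs
    above. It is not part of the patch: it assumes an already-initialized Hdfs
    instance and a placeholder renewer name, and the unchecked cast mirrors the
    casts used in the patch itself.

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.fs.Hdfs;
    import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
    import org.apache.hadoop.security.token.Token;

    public class DelegationTokenSketch {
      @SuppressWarnings("unchecked")
      static void roundTrip(Hdfs fs) throws IOException {
        // Obtain a token; Hdfs sets the service field to its canonical service name.
        List<Token<?>> tokens = fs.getDelegationTokens("someRenewer");
        Token<DelegationTokenIdentifier> t =
            (Token<DelegationTokenIdentifier>) tokens.get(0);

        // Renew it; the new expiration time is returned.
        long newExpiry = fs.renewDelegationToken(t);
        System.out.println("token valid until " + newExpiry);

        // Cancel it once it is no longer needed.
        fs.cancelDelegationToken(t);
      }
    }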
diff --git a/src/java/org/apache/hadoop/hdfs/DFSClient.java b/src/java/org/apache/hadoop/hdfs/DFSClient.java
index f528d24..d0c6d97 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -45,6 +45,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -128,15 +129,16 @@
   private volatile long serverDefaultsLastUpdate;
   static Random r = new Random();
   final String clientName;
-  final LeaseChecker leasechecker = new LeaseChecker();
   Configuration conf;
   long defaultBlockSize;
   private short defaultReplication;
   SocketFactory socketFactory;
   int socketTimeout;
   final int writePacketSize;
+  final DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
   final FileSystem.Statistics stats;
   final int hdfsTimeout;    // timeout value for a DFS operation.
+  final LeaseChecker leasechecker;
 
   /**
    * The locking hierarchy is to first acquire lock on DFSClient object, followed by 
@@ -248,8 +250,11 @@
     this.writePacketSize = 
       conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
                   DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+    this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
+
     // The hdfsTimeout is currently the same as the ipc timeout 
     this.hdfsTimeout = Client.getTimeout(conf);
+    this.leasechecker = new LeaseChecker(hdfsTimeout);
 
     this.ugi = UserGroupInformation.getCurrentUser();
     
@@ -570,8 +575,9 @@
                              int buffersize)
       throws IOException {
     return create(src, FsPermission.getDefault(),
-        overwrite ? EnumSet.of(CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE), 
-        replication, blockSize, progress, buffersize);
+        overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+            : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
+        buffersize);
   }
 
   /**
@@ -637,9 +643,29 @@
   }
   
   /**
+   * Append to an existing file if {@link CreateFlag#APPEND} is present
+   */
+  private OutputStream primitiveAppend(String src, EnumSet<CreateFlag> flag,
+      int buffersize, Progressable progress) throws IOException {
+    if (flag.contains(CreateFlag.APPEND)) {
+      HdfsFileStatus stat = getFileInfo(src);
+      if (stat == null) { // No file to append to
+        // New file needs to be created if create option is present
+        if (!flag.contains(CreateFlag.CREATE)) {
+          throw new FileNotFoundException("failed to append to non-existent file "
+              + src + " on client " + clientName);
+        }
+        return null;
+      }
+      return callAppend(stat, src, buffersize, progress);
+    }
+    return null;
+  }
+  
+  /**
    * Same as {{@link #create(String, FsPermission, EnumSet, short, long,
    *  Progressable, int)} except that the permission
-   *   is absolute (ie has already been masked with umask.
+   *  is absolute (i.e., it has already been masked with umask).
    */
   public OutputStream primitiveCreate(String src, 
                              FsPermission absPermission,
@@ -652,9 +678,13 @@
                              int bytesPerChecksum)
       throws IOException, UnresolvedLinkException {
     checkOpen();
-    OutputStream result = new DFSOutputStream(this, src, absPermission,
-        flag, createParent, replication, blockSize, progress, buffersize,
-        bytesPerChecksum);
+    CreateFlag.validate(flag);
+    OutputStream result = primitiveAppend(src, flag, buffersize, progress);
+    if (result == null) {
+      result = new DFSOutputStream(this, src, absPermission,
+          flag, createParent, replication, blockSize, progress, buffersize,
+          bytesPerChecksum);
+    }
     leasechecker.put(src, result);
     return result;
   }
@@ -696,6 +726,25 @@
     }
   }
 
+  /** Method to get stream returned by append call */
+  private OutputStream callAppend(HdfsFileStatus stat, String src,
+      int buffersize, Progressable progress) throws IOException {
+    LocatedBlock lastBlock = null;
+    try {
+      lastBlock = namenode.append(src, clientName);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     FileNotFoundException.class,
+                                     SafeModeException.class,
+                                     DSQuotaExceededException.class,
+                                     UnsupportedOperationException.class,
+                                     UnresolvedPathException.class);
+    }
+    return new DFSOutputStream(this, src, buffersize, progress,
+        lastBlock, stat, conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 
+                                     DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
+  }
+  
   /**
    * Append to an existing HDFS file.  
    * 
@@ -709,22 +758,8 @@
   OutputStream append(String src, int buffersize, Progressable progress)
       throws IOException {
     checkOpen();
-    HdfsFileStatus stat = null;
-    LocatedBlock lastBlock = null;
-    try {
-      stat = getFileInfo(src);
-      lastBlock = namenode.append(src, clientName);
-    } catch(RemoteException re) {
-      throw re.unwrapRemoteException(AccessControlException.class,
-                                     FileNotFoundException.class,
-                                     SafeModeException.class,
-                                     DSQuotaExceededException.class,
-                                     UnsupportedOperationException.class,
-                                     UnresolvedPathException.class);
-    }
-    OutputStream result = new DFSOutputStream(this, src, buffersize, progress,
-        lastBlock, stat, conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 
-                                     DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
+    HdfsFileStatus stat = getFileInfo(src);
+    OutputStream result = callAppend(stat, src, buffersize, progress);
     leasechecker.put(src, result);
     return result;
   }
@@ -1325,38 +1360,106 @@
     }
   }
 
-  boolean isLeaseCheckerStarted() {
-    return leasechecker.daemon != null;
-  }
-
   /** Lease management*/
-  class LeaseChecker implements Runnable {
+  class LeaseChecker {
+    static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
+    static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
     /** A map from src -> DFSOutputStream of files that are currently being
      * written by this client.
      */
     private final SortedMap<String, OutputStream> pendingCreates
         = new TreeMap<String, OutputStream>();
+    /** The time in milliseconds that the map became empty. */
+    private long emptyTime = Long.MAX_VALUE;
+    /** A fixed lease renewal time period in milliseconds */
+    private final long renewal;
 
+    /** A daemon for renewing lease */
     private Daemon daemon = null;
-    
+    /** Only the daemon with currentId should run. */
+    private int currentId = 0;
+
+    /** 
+     * The grace period in milliseconds for which the lease renewer thread
+     * keeps running after the map becomes empty.
+     * If the map stays empty for longer than the grace period,
+     * the renewer terminates.
+     */
+    private long gracePeriod;
+    /**
+     * The time period in milliseconds
+     * that the renewer sleeps for each iteration. 
+     */
+    private volatile long sleepPeriod;
+
+    private LeaseChecker(final long timeout) {
+      this.renewal = (timeout > 0 && timeout < LEASE_SOFTLIMIT_PERIOD)? 
+          timeout/2: LEASE_SOFTLIMIT_PERIOD/2;
+      setGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
+    }
+
+    /** Set the grace period and adjust the sleep period accordingly. */
+    void setGraceSleepPeriod(final long gracePeriod) {
+      if (gracePeriod < 100L) {
+        throw new HadoopIllegalArgumentException(gracePeriod
+            + " = gracePeriod < 100ms is too small.");
+      }
+      synchronized(this) {
+        this.gracePeriod = gracePeriod;
+      }
+      final long half = gracePeriod/2;
+      this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT?
+          half: LEASE_RENEWER_SLEEP_DEFAULT;
+    }
+
+    /** Is the daemon running? */
+    synchronized boolean isRunning() {
+      return daemon != null && daemon.isAlive();
+    }
+
+    /** Is the empty period longer than the grace period? */  
+    private synchronized boolean isRenewerExpired() {
+      return emptyTime != Long.MAX_VALUE
+          && System.currentTimeMillis() - emptyTime > gracePeriod;
+    }
+
     synchronized void put(String src, OutputStream out) {
       if (clientRunning) {
-        if (daemon == null) {
-          daemon = new Daemon(this);
+        if (daemon == null || isRenewerExpired()) {
+          //start a new daemon with a new id.
+          final int id = ++currentId;
+          daemon = new Daemon(new Runnable() {
+            @Override
+            public void run() {
+              try {
+                LeaseChecker.this.run(id);
+              } catch(InterruptedException e) {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug(LeaseChecker.this.getClass().getSimpleName()
+                      + " is interrupted.", e);
+                }
+              }
+            }
+          });
           daemon.start();
         }
         pendingCreates.put(src, out);
+        emptyTime = Long.MAX_VALUE;
       }
     }
     
     synchronized void remove(String src) {
       pendingCreates.remove(src);
+      if (pendingCreates.isEmpty() && emptyTime == Long.MAX_VALUE) {
+        //discover the first time that the map is empty.
+        emptyTime = System.currentTimeMillis();
+      }
     }
     
     void interruptAndJoin() throws InterruptedException {
       Daemon daemonCopy = null;
       synchronized (this) {
-        if (daemon != null) {
+        if (isRunning()) {
           daemon.interrupt();
           daemonCopy = daemon;
         }
@@ -1423,37 +1526,30 @@
      * Periodically check in with the namenode and renew all the leases
      * when the lease period is half over.
      */
-    public void run() {
-      long lastRenewed = 0;
-      int renewal = (int)(LEASE_SOFTLIMIT_PERIOD / 2);
-      if (hdfsTimeout > 0) {
-        renewal = Math.min(renewal, hdfsTimeout/2);
-      }
-      while (clientRunning && !Thread.interrupted()) {
-        if (System.currentTimeMillis() - lastRenewed > renewal) {
+    private void run(final int id) throws InterruptedException {
+      for(long lastRenewed = System.currentTimeMillis();
+          clientRunning && !Thread.interrupted();
+          Thread.sleep(sleepPeriod)) {
+        if (System.currentTimeMillis() - lastRenewed >= renewal) {
           try {
             renew();
             lastRenewed = System.currentTimeMillis();
           } catch (SocketTimeoutException ie) {
-            LOG.warn("Problem renewing lease for " + clientName +
-                     " for a period of " + (hdfsTimeout/1000) +
-                     " seconds. Shutting down HDFS client...", ie);
+            LOG.warn("Failed to renew lease for " + clientName + " for "
+                + (renewal/1000) + " seconds.  Aborting ...", ie);
             abort();
             break;
           } catch (IOException ie) {
-            LOG.warn("Problem renewing lease for " + clientName +
-                     " for a period of " + (hdfsTimeout/1000) +
-                     " seconds. Will retry shortly...", ie);
+            LOG.warn("Failed to renew lease for " + clientName + " for "
+                + (renewal/1000) + " seconds.  Will retry shortly ...", ie);
           }
         }
 
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException ie) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(this + " is interrupted.", ie);
+        synchronized(this) {
+          if (id != currentId || isRenewerExpired()) {
+            //no longer the current daemon or expired
+            return;
           }
-          return;
         }
       }
     }
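
    The LeaseChecker changes above replace a permanently running renewer thread with
    one that retires itself once the client has had no open files for a grace period,
    and that is superseded by a freshly started daemon (with a new id) on the next
    put. The following condensed, standalone sketch shows that lifecycle with
    illustrative names and without the actual lease-renewal RPC; it is not patch code.

    import java.util.TreeMap;

    public class GracefulRenewerSketch {
      private final TreeMap<String, Object> pending = new TreeMap<String, Object>();
      private long emptyTime = Long.MAX_VALUE;   // when pending last became empty
      private final long gracePeriod = 60 * 1000L;
      private int currentId = 0;
      private Thread daemon;

      synchronized void put(String key, Object value) {
        if (daemon == null || isExpired()) {
          final int id = ++currentId;            // only the latest id keeps running
          daemon = new Thread(new Runnable() {
            public void run() { loop(id); }
          });
          daemon.setDaemon(true);
          daemon.start();
        }
        pending.put(key, value);
        emptyTime = Long.MAX_VALUE;
      }

      synchronized void remove(String key) {
        pending.remove(key);
        if (pending.isEmpty() && emptyTime == Long.MAX_VALUE) {
          emptyTime = System.currentTimeMillis(); // first moment the map is empty
        }
      }

      private synchronized boolean isExpired() {
        return emptyTime != Long.MAX_VALUE
            && System.currentTimeMillis() - emptyTime > gracePeriod;
      }

      private void loop(int id) {
        while (true) {
          // the real client would renew its leases here
          synchronized (this) {
            if (id != currentId || isExpired()) {
              return;                            // superseded or idle too long
            }
          }
          try { Thread.sleep(1000L); } catch (InterruptedException e) { return; }
        }
      }
    }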
diff --git a/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 8737c4b..25616a6 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -40,6 +40,10 @@
   public static final int     DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
   public static final String  DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
   public static final int     DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
+  public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable";
+  public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true;
+  public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy";
+  public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
   
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
diff --git a/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index efc460e..53add30 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -31,8 +31,10 @@
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
@@ -96,9 +98,6 @@
  * starts sending packets from the dataQueue.
 ****************************************************************/
 class DFSOutputStream extends FSOutputSummer implements Syncable {
-  /**
-   * 
-   */
   private final DFSClient dfsClient;
   private Configuration conf;
   private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
@@ -295,10 +294,18 @@
     private BlockConstructionStage stage;  // block construction stage
     private long bytesSent = 0; // number of bytes that've been sent
 
+    /** Nodes that have been used in the pipeline before and have failed. */
+    private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
+    /** Has the current block been hflushed? */
+    private boolean isHflushed = false;
+    /** Append on an existing block? */
+    private final boolean isAppend;
+
     /**
      * Default construction for file create
      */
     private DataStreamer() {
+      isAppend = false;
       stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
     }
     
@@ -311,6 +318,7 @@
      */
     private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
         int bytesPerChecksum) throws IOException {
+      isAppend = true;
       stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
       block = lastBlock.getBlock();
       bytesSent = block.getNumBytes();
@@ -750,6 +758,105 @@
       return doSleep;
     }
 
+    private void setHflush() {
+      isHflushed = true;
+    }
+
+    private int findNewDatanode(final DatanodeInfo[] original
+        ) throws IOException {
+      if (nodes.length != original.length + 1) {
+        throw new IOException("Failed to add a datanode:"
+            + " nodes.length != original.length + 1, nodes="
+            + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
+      }
+      for(int i = 0; i < nodes.length; i++) {
+        int j = 0;
+        for(; j < original.length && !nodes[i].equals(original[j]); j++);
+        if (j == original.length) {
+          return i;
+        }
+      }
+      throw new IOException("Failed: new datanode not found: nodes="
+          + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
+    }
+
+    private void addDatanode2ExistingPipeline() throws IOException {
+      if (DataTransferProtocol.LOG.isDebugEnabled()) {
+        DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno);
+      }
+      /*
+       * Is data transfer necessary?  We have the following cases.
+       * 
+       * Case 1: Failure in Pipeline Setup
+       * - Append
+       *    + Transfer the stored replica, which may be a RBW or a finalized.
+       * - Create
+       *    + If no data, then no transfer is required.
+       *    + If data has been written, transfer RBW. This case may happen
+       *      when there was a streaming failure earlier in this pipeline.
+       *
+       * Case 2: Failure in Streaming
+       * - Append/Create:
+       *    + transfer RBW
+       * 
+       * Case 3: Failure in Close
+       * - Append/Create:
+       *    + no transfer; let the NameNode replicate the block.
+       */
+      if (!isAppend && lastAckedSeqno < 0
+          && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
+        //no data have been written
+        return;
+      } else if (stage == BlockConstructionStage.PIPELINE_CLOSE
+          || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+        //pipeline is closing
+        return;
+      }
+
+      //get a new datanode
+      final DatanodeInfo[] original = nodes;
+      final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
+          src, block, nodes, failed.toArray(new DatanodeInfo[failed.size()]),
+          1, dfsClient.clientName);
+      nodes = lb.getLocations();
+
+      //find the new datanode
+      final int d = findNewDatanode(original);
+
+      //transfer replica
+      final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
+      final DatanodeInfo[] targets = {nodes[d]};
+      transfer(src, targets, lb.getBlockToken());
+    }
+
+    private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
+        final Token<BlockTokenIdentifier> blockToken) throws IOException {
+      //transfer replica to the new datanode
+      Socket sock = null;
+      DataOutputStream out = null;
+      DataInputStream in = null;
+      try {
+        sock = createSocketForPipeline(src, 2, dfsClient);
+        final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
+        out = new DataOutputStream(new BufferedOutputStream(
+            NetUtils.getOutputStream(sock, writeTimeout),
+            DataNode.SMALL_BUFFER_SIZE));
+
+        //send the TRANSFER_BLOCK request
+        DataTransferProtocol.Sender.opTransferBlock(out, block,
+            dfsClient.clientName, targets, blockToken);
+
+        //ack
+        in = new DataInputStream(NetUtils.getInputStream(sock));
+        if (SUCCESS != DataTransferProtocol.Status.read(in)) {
+          throw new IOException("Failed to add a datanode");
+        }
+      } finally {
+        IOUtils.closeStream(in);
+        IOUtils.closeStream(out);
+        IOUtils.closeSocket(sock);
+      }
+    }
 
     /**
      * Open a DataOutputStream to a DataNode pipeline so that 
@@ -793,6 +900,8 @@
           DFSClient.LOG.warn("Error Recovery for block " + block +
               " in pipeline " + pipelineMsg + 
               ": bad datanode " + nodes[errorIndex].getName());
+          failed.add(nodes[errorIndex]);
+
           DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
           System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
           System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
@@ -803,6 +912,12 @@
           errorIndex = -1;
         }
 
+        // Check if replace-datanode policy is satisfied.
+        if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
+            nodes, isAppend, isHflushed)) {
+          addDatanode2ExistingPipeline();
+        }
+
         // get a new generation stamp and an access token
         LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
         newGS = lb.getBlock().getGenerationStamp();
@@ -888,7 +1003,7 @@
 
       boolean result = false;
       try {
-        s = createSocketForPipeline(nodes, dfsClient);
+        s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
         long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
 
         //
@@ -1026,18 +1141,19 @@
 
   /**
    * Create a socket for a write pipeline
-   * @param datanodes the datanodes on the pipeline 
+   * @param first the first datanode 
+   * @param length the pipeline length
    * @param client
    * @return the socket connected to the first datanode
    */
-  static Socket createSocketForPipeline(final DatanodeInfo[] datanodes,
-      final DFSClient client) throws IOException {
+  static Socket createSocketForPipeline(final DatanodeInfo first,
+      final int length, final DFSClient client) throws IOException {
     if(DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("Connecting to datanode " + datanodes[0].getName());
+      DFSClient.LOG.debug("Connecting to datanode " + first.getName());
     }
-    final InetSocketAddress isa = NetUtils.createSocketAddr(datanodes[0].getName());
+    final InetSocketAddress isa = NetUtils.createSocketAddr(first.getName());
     final Socket sock = client.socketFactory.createSocket();
-    final int timeout = client.getDatanodeReadTimeout(datanodes.length);
+    final int timeout = client.getDatanodeReadTimeout(length);
     NetUtils.connect(sock, isa, timeout);
     sock.setSoTimeout(timeout);
     sock.setSendBufferSize(DFSClient.DEFAULT_DATA_SOCKET_SIZE);
@@ -1363,6 +1479,12 @@
           throw ioe;
         }
       }
+
+      synchronized(this) {
+        if (streamer != null) {
+          streamer.setHflush();
+        }
+      }
     } catch (InterruptedIOException interrupt) {
       // This kind of error doesn't mean that the stream itself is broken - just the
       // flushing thread got interrupted. So, we shouldn't close down the writer,
@@ -1577,7 +1699,7 @@
   /**
    * Returns the access token currently used by streamer, for testing only
    */
-  Token<BlockTokenIdentifier> getBlockToken() {
+  synchronized Token<BlockTokenIdentifier> getBlockToken() {
     return streamer.getBlockToken();
   }
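
    The findNewDatanode(..) search added above relies on the NameNode returning the
    original pipeline plus exactly one extra node; the client then locates the index
    of the node that is absent from the original array. The same search is shown
    below over plain strings purely for illustration; it is not part of the patch.

    import java.util.Arrays;

    public class FindNewNodeSketch {
      static int findNew(String[] nodes, String[] original) {
        if (nodes.length != original.length + 1) {
          throw new IllegalStateException("expected exactly one added node: nodes="
              + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
        }
        for (int i = 0; i < nodes.length; i++) {
          int j = 0;
          for (; j < original.length && !nodes[i].equals(original[j]); j++);
          if (j == original.length) {
            return i;                    // nodes[i] does not appear in original
          }
        }
        throw new IllegalStateException("no new node found");
      }

      public static void main(String[] args) {
        String[] original = {"dn1:50010", "dn3:50010"};
        String[] nodes    = {"dn1:50010", "dn2:50010", "dn3:50010"};
        System.out.println(findNew(nodes, original));   // prints 1
      }
    }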
 
diff --git a/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index a4cb9e3..3e7de73 100644
--- a/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -24,6 +24,7 @@
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -242,9 +243,9 @@
     Progressable progress) throws IOException {
     statistics.incrementWriteOps(1);
     return new FSDataOutputStream(dfs.create(getPathName(f), permission,
-        overwrite ? EnumSet.of(CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE),
-        replication, blockSize, progress, bufferSize),
-        statistics);
+        overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+            : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
+        bufferSize), statistics);
   }
   
   @SuppressWarnings("deprecation")
@@ -266,6 +267,9 @@
       EnumSet<CreateFlag> flag, int bufferSize, short replication,
       long blockSize, Progressable progress) throws IOException {
     statistics.incrementWriteOps(1);
+    if (flag.contains(CreateFlag.OVERWRITE)) {
+      flag.add(CreateFlag.CREATE);
+    }
     return new FSDataOutputStream(dfs.create(getPathName(f), permission, flag,
         false, replication, blockSize, progress, bufferSize), statistics);
   }
@@ -810,6 +814,14 @@
       throws IOException {
     return dfs.getDelegationToken(renewer);
   }
+  
+  @Override // FileSystem
+  public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
+    List<Token<?>> tokenList = new ArrayList<Token<?>>();
+    Token<DelegationTokenIdentifier> token = this.getDelegationToken(renewer);
+    tokenList.add(token);
+    return tokenList;
+  }
 
   /**
    * Renew an existing delegation token.
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 22e8e04..9db6fa4 100644
--- a/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -67,9 +67,9 @@
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 66: Add block pool ID to Block
+   * 67: Add block pool ID to Block
    */
-  public static final long versionID = 66L;
+  public static final long versionID = 67L;
   
   ///////////////////////////////////////
   // File contents
@@ -298,6 +298,30 @@
       NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
       IOException;
 
+  /** 
+   * Get a datanode for an existing pipeline.
+   * 
+   * @param src the file being written
+   * @param blk the block being written
+   * @param existings the existing nodes in the pipeline
+   * @param excludes the excluded nodes
+   * @param numAdditionalNodes number of additional datanodes
+   * @param clientName the name of the client
+   * 
+   * @return the located block.
+   * 
+   * @throws AccessControlException If access is denied
+   * @throws FileNotFoundException If file <code>src</code> is not found
+   * @throws SafeModeException If the namenode is in safemode
+   * @throws UnresolvedLinkException If <code>src</code> contains a symlink
+   * @throws IOException If an I/O error occurred
+   */
+  public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
+      final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
+      final int numAdditionalNodes, final String clientName
+      ) throws AccessControlException, FileNotFoundException,
+          SafeModeException, UnresolvedLinkException, IOException;
+
   /**
    * The client is done writing data to the given filename, and would 
    * like to complete it.  
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java b/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
index a225982..2a4895b 100644
--- a/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
+++ b/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
@@ -25,8 +25,13 @@
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -39,18 +44,18 @@
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public interface DataTransferProtocol {
-  
+  public static final Log LOG = LogFactory.getLog(DataTransferProtocol.class);
   
   /** Version for data transfers between clients and datanodes
    * This should change when serialization of DatanodeInfo, not just
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 22:
+   * Version 23:
    *    Changed the protocol methods to use ExtendedBlock instead
    *    of Block.
    */
-  public static final int DATA_TRANSFER_VERSION = 21;
+  public static final int DATA_TRANSFER_VERSION = 23;
 
   /** Operation */
   public enum Op {
@@ -750,4 +755,93 @@
     }
   }
 
+  /**
+   * The setting of replace-datanode-on-failure feature.
+   */
+  public enum ReplaceDatanodeOnFailure {
+    /** The feature is disabled in the entire site. */
+    DISABLE,
+    /** Never add a new datanode. */
+    NEVER,
+    /**
+     * DEFAULT policy:
+     *   Let r be the replication number.
+     *   Let n be the number of existing datanodes.
+     *   Add a new datanode only if r >= 3 and either
+     *   (1) floor(r/2) >= n; or
+     *   (2) r > n and the block is hflushed/appended.
+     */
+    DEFAULT,
+    /** Always add a new datanode when an existing datanode is removed. */
+    ALWAYS;
+
+    /** Check if the feature is enabled. */
+    public void checkEnabled() {
+      if (this == DISABLE) {
+        throw new UnsupportedOperationException(
+            "This feature is disabled.  Please refer to "
+            + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY
+            + " configuration property.");
+      }
+    }
+
+    /** Is the policy satisfied? */
+    public boolean satisfy(
+        final short replication, final DatanodeInfo[] existings,
+        final boolean isAppend, final boolean isHflushed) {
+      final int n = existings == null? 0: existings.length;
+      if (n == 0 || n >= replication) {
+        //don't need to add datanode for any policy.
+        return false;
+      } else if (this == DISABLE || this == NEVER) {
+        return false;
+      } else if (this == ALWAYS) {
+        return true;
+      } else {
+        //DEFAULT
+        if (replication < 3) {
+          return false;
+        } else {
+          if (n <= (replication/2)) {
+            return true;
+          } else {
+            return isAppend || isHflushed;
+          }
+        }
+      }
+    }
+
+    /** Get the setting from configuration. */
+    public static ReplaceDatanodeOnFailure get(final Configuration conf) {
+      final boolean enabled = conf.getBoolean(
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT);
+      if (!enabled) {
+        return DISABLE;
+      }
+
+      final String policy = conf.get(
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT);
+      for(int i = 1; i < values().length; i++) {
+        final ReplaceDatanodeOnFailure rdof = values()[i];
+        if (rdof.name().equalsIgnoreCase(policy)) {
+          return rdof;
+        }
+      }
+      throw new HadoopIllegalArgumentException("Illegal configuration value for "
+          + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY
+          + ": " + policy);
+    }
+
+    /** Write the setting to configuration. */
+    public void write(final Configuration conf) {
+      conf.setBoolean(
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
+          this != DISABLE);
+      conf.set(
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
+          name());
+    }
+  }
 }
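
    A short sketch of how the ReplaceDatanodeOnFailure enum above can be driven from
    a Configuration, much as DFSClient now does at construction time. It is not part
    of the patch; the DatanodeInfo array is a placeholder, and in this particular
    case satisfy(..) only inspects its length.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.ReplaceDatanodeOnFailure;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

    public class ReplaceDatanodePolicyConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Persist the DEFAULT policy into the two client-side configuration keys.
        ReplaceDatanodeOnFailure.DEFAULT.write(conf);

        // Read it back, as the client does when it starts up.
        ReplaceDatanodeOnFailure policy = ReplaceDatanodeOnFailure.get(conf);

        // Placeholder pipeline: 3 existing datanodes for a replication of 3.
        DatanodeInfo[] existings = new DatanodeInfo[3];
        boolean add = policy.satisfy((short) 3, existings, false, false);
        System.out.println(policy + " -> add a datanode? " + add);  // false: n >= r
      }
    }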
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java b/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
index 69f7117..72d8157 100644
--- a/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
+++ b/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
@@ -88,7 +88,7 @@
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -29;
+  public static final int LAYOUT_VERSION = -31;
   // Current version: 
-  // -29: Adding support for block pools and multiple namenodes
+  // -31: Adding support for block pools and multiple namenodes
 }
diff --git a/src/java/org/apache/hadoop/hdfs/server/common/Storage.java b/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
index 816a266..b02e083 100644
--- a/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
+++ b/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
@@ -79,7 +79,7 @@
   public static final int PRE_RBW_LAYOUT_VERSION = -19;
   
   // last layout version that is before federation
-  public static final int LAST_PRE_FEDERATION_LAYOUT_VERSION = -28;
+  public static final int LAST_PRE_FEDERATION_LAYOUT_VERSION = -30;
   
   private   static final String STORAGE_FILE_LOCK     = "in_use.lock";
   protected static final String STORAGE_FILE_VERSION  = "VERSION";
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 5fba5ec..9f253e1 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -22,6 +22,7 @@
 import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
 import java.io.BufferedOutputStream;
+import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -56,7 +57,7 @@
 * may copy it to another site. If a throttler is provided,
  * streaming throttling is also supported.
  **/
-class BlockReceiver implements java.io.Closeable, FSConstants {
+class BlockReceiver implements Closeable, FSConstants {
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
@@ -652,7 +653,7 @@
       DataInputStream mirrIn,   // input from next datanode
       DataOutputStream replyOut,  // output to previous datanode
       String mirrAddr, DataTransferThrottler throttlerArg,
-      int numTargets) throws IOException {
+      DatanodeInfo[] downstreams) throws IOException {
 
       boolean responderClosed = false;
       mirrorOut = mirrOut;
@@ -662,9 +663,8 @@
     try {
       if (isClient && !isTransfer) {
         responder = new Daemon(datanode.threadGroup, 
-            new PacketResponder(this, block, mirrIn, replyOut, 
-                                numTargets, Thread.currentThread()));
-        responder.start(); // start thread to processes reponses
+            new PacketResponder(replyOut, mirrIn, downstreams));
+        responder.start(); // start thread to process responses
       }
 
       /* 
@@ -700,8 +700,7 @@
       }
 
     } catch (IOException ioe) {
-      LOG.info("Exception in receiveBlock for block " + block + 
-               " " + ioe);
+      LOG.info("Exception in receiveBlock for " + block, ioe);
       throw ioe;
     } finally {
       if (!responderClosed) { // Abnormal termination of the flow above
@@ -808,51 +807,71 @@
     }
   }
   
+  private static enum PacketResponderType {
+    NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
+  }
   
   /**
    * Processes responses from downstream datanodes in the pipeline
    * and sends back replies to the originator.
    */
-  class PacketResponder implements Runnable, FSConstants {   
+  class PacketResponder implements Runnable, Closeable, FSConstants {   
 
-    //packet waiting for ack
-    private LinkedList<Packet> ackQueue = new LinkedList<Packet>(); 
+    /** queue for packets waiting for ack */
+    private final LinkedList<Packet> ackQueue = new LinkedList<Packet>(); 
+    /** the thread that spawns this responder */
+    private final Thread receiverThread = Thread.currentThread();
+    /** is this responder running? */
     private volatile boolean running = true;
-    private ExtendedBlock block;
-    DataInputStream mirrorIn;   // input from downstream datanode
-    DataOutputStream replyOut;  // output to upstream datanode
-    private int numTargets;     // number of downstream datanodes including myself
-    private BlockReceiver receiver; // The owner of this responder.
-    private Thread receiverThread; // the thread that spawns this responder
 
+    /** input from the next downstream datanode */
+    private final DataInputStream downstreamIn;
+    /** output to upstream datanode/client */
+    private final DataOutputStream upstreamOut;
+
+    /** The type of this responder */
+    private final PacketResponderType type;
+    /** for log and error messages */
+    private final String myString; 
+
+    @Override
     public String toString() {
-      return "PacketResponder " + numTargets + " for Block " + this.block;
+      return myString;
     }
 
-    PacketResponder(BlockReceiver receiver, ExtendedBlock b, DataInputStream in, 
-                    DataOutputStream out, int numTargets,
-                    Thread receiverThread) {
-      this.receiverThread = receiverThread;
-      this.receiver = receiver;
-      this.block = b;
-      mirrorIn = in;
-      replyOut = out;
-      this.numTargets = numTargets;
+    PacketResponder(final DataOutputStream upstreamOut,
+        final DataInputStream downstreamIn,
+        final DatanodeInfo[] downstreams) {
+      this.downstreamIn = downstreamIn;
+      this.upstreamOut = upstreamOut;
+
+      this.type = downstreams == null? PacketResponderType.NON_PIPELINE
+          : downstreams.length == 0? PacketResponderType.LAST_IN_PIPELINE
+              : PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE;
+
+      final StringBuilder b = new StringBuilder(getClass().getSimpleName())
+          .append(": ").append(block).append(", type=").append(type);
+      if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
+        b.append(", downstreams=").append(downstreams.length)
+            .append(":").append(Arrays.asList(downstreams));
+      }
+      this.myString = b.toString();
     }
 
     /**
     * enqueue the seqno that is still to be acked by the downstream datanode.
      * @param seqno
      * @param lastPacketInBlock
-     * @param lastByteInPacket
+     * @param offsetInBlock
      */
-    synchronized void enqueue(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
+    synchronized void enqueue(final long seqno,
+        final boolean lastPacketInBlock, final long offsetInBlock) {
       if (running) {
+        final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock);
         if(LOG.isDebugEnabled()) {
-          LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
-                    " to ack queue.");
+          LOG.debug(myString + ": enqueue " + p);
         }
-        ackQueue.addLast(new Packet(seqno, lastPacketInBlock, lastByteInPacket));
+        ackQueue.addLast(p);
         notifyAll();
       }
     }
@@ -860,7 +879,8 @@
     /**
      * wait for all pending packets to be acked. Then shutdown thread.
      */
-    synchronized void close() {
+    @Override
+    public synchronized void close() {
       while (running && ackQueue.size() != 0 && datanode.shouldRun) {
         try {
           wait();
@@ -869,8 +889,7 @@
         }
       }
       if(LOG.isDebugEnabled()) {
-        LOG.debug("PacketResponder " + numTargets +
-                 " for block " + block + " Closing down.");
+        LOG.debug(myString + ": closing");
       }
       running = false;
       notifyAll();
@@ -892,21 +911,21 @@
             PipelineAck ack = new PipelineAck();
             long seqno = PipelineAck.UNKOWN_SEQNO;
             try {
-              if (numTargets != 0 && !mirrorError) {// not the last DN & no mirror error
+              if (type != PacketResponderType.LAST_IN_PIPELINE
+                  && !mirrorError) {
                 // read an ack from downstream datanode
-                ack.readFields(mirrorIn);
+                ack.readFields(downstreamIn);
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("PacketResponder " + numTargets + " got " + ack);
+                  LOG.debug(myString + " got " + ack);
                 }
                 seqno = ack.getSeqno();
               }
-              if (seqno != PipelineAck.UNKOWN_SEQNO || numTargets == 0) {
+              if (seqno != PipelineAck.UNKOWN_SEQNO
+                  || type == PacketResponderType.LAST_IN_PIPELINE) {
                 synchronized (this) {
                   while (running && datanode.shouldRun && ackQueue.size() == 0) {
                     if (LOG.isDebugEnabled()) {
-                      LOG.debug("PacketResponder " + numTargets + 
-                                " seqno = " + seqno +
-                                " for block " + block +
+                      LOG.debug(myString + ": seqno=" + seqno +
                                 " waiting for local datanode to finish write.");
                     }
                     wait();
@@ -916,11 +935,10 @@
                   }
                   pkt = ackQueue.getFirst();
                   expected = pkt.seqno;
-                  if (numTargets > 0 && seqno != expected) {
-                    throw new IOException("PacketResponder " + numTargets +
-                                          " for block " + block +
-                                          " expected seqno:" + expected +
-                                          " received:" + seqno);
+                  if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
+                      && seqno != expected) {
+                    throw new IOException(myString + ": seqno: expected="
+                        + expected + ", received=" + seqno);
                   }
                   lastPacketInBlock = pkt.lastPacketInBlock;
                 }
@@ -935,8 +953,7 @@
                 // notify client of the error
                 // and wait for the client to shut down the pipeline
                 mirrorError = true;
-                LOG.info("PacketResponder " + block + " " + numTargets + 
-                      " Exception " + StringUtils.stringifyException(ioe));
+                LOG.info(myString, ioe);
               }
             }
 
@@ -948,8 +965,7 @@
                * because this datanode has a problem. The upstream datanode
                * will detect that this datanode is bad, and rightly so.
                */
-              LOG.info("PacketResponder " + block +  " " + numTargets +
-                       " : Thread is interrupted.");
+              LOG.info(myString + ": Thread is interrupted.");
               running = false;
               continue;
             }
@@ -957,7 +973,7 @@
             // If this is the last packet in block, then close block
             // file and finalize the block before responding success
             if (lastPacketInBlock) {
-              receiver.close();
+              BlockReceiver.this.close();
               final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
               block.setNumBytes(replicaInfo.getNumBytes());
               datanode.data.finalizeBlock(block);
@@ -967,13 +983,12 @@
                 DatanodeRegistration dnR = 
                   datanode.getDNRegistrationForBP(block.getBlockPoolId());
                 ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
-                      receiver.inAddr, receiver.myAddr, block.getNumBytes(),
-                      "HDFS_WRITE", receiver.clientname, offset,
+                      inAddr, myAddr, block.getNumBytes(),
+                      "HDFS_WRITE", clientname, offset,
                       dnR.getStorageID(), block, endTime-startTime));
               } else {
-                LOG.info("Received block " + block + 
-                         " of size " + block.getNumBytes() + 
-                         " from " + receiver.inAddr);
+                LOG.info("Received block " + block + " of size "
+                    + block.getNumBytes() + " from " + inAddr);
               }
             }
 
@@ -984,7 +999,8 @@
               replies[0] = SUCCESS;
               replies[1] = ERROR;
             } else {
-              short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
+              short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
+                  : ack.getNumOfReplies();
               replies = new Status[1+ackLen];
               replies[0] = SUCCESS;
               for (int i=0; i<ackLen; i++) {
@@ -994,20 +1010,18 @@
             PipelineAck replyAck = new PipelineAck(expected, replies);
             
             // send my ack back to upstream datanode
-            replyAck.write(replyOut);
-            replyOut.flush();
+            replyAck.write(upstreamOut);
+            upstreamOut.flush();
             if (LOG.isDebugEnabled()) {
-              LOG.debug("PacketResponder " + numTargets + 
-                        " for block " + block +
-                        " responded an ack: " + replyAck);
+              LOG.debug(myString + ", replyAck=" + replyAck);
             }
             if (pkt != null) {
               // remove the packet from the ack queue
               removeAckHead();
               // update bytes acked
               if (replyAck.isSuccess() && 
-                  pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
-                replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+                  pkt.offsetInBlock > replicaInfo.getBytesAcked()) {
+                replicaInfo.setBytesAcked(pkt.offsetInBlock);
               }
             }
         } catch (IOException e) {
@@ -1018,8 +1032,7 @@
             } catch (IOException ioe) {
               LOG.warn("DataNode.checkDiskError failed in run() with: ", ioe);
             }
-            LOG.info("PacketResponder " + block + " " + numTargets + 
-                     " Exception " + StringUtils.stringifyException(e));
+            LOG.info(myString, e);
             running = false;
             if (!Thread.interrupted()) { // failure not caused by interruption
               receiverThread.interrupt();
@@ -1027,15 +1040,13 @@
           }
         } catch (Throwable e) {
           if (running) {
-            LOG.info("PacketResponder " + block + " " + numTargets + 
-                     " Exception " + StringUtils.stringifyException(e));
+            LOG.info(myString, e);
             running = false;
             receiverThread.interrupt();
           }
         }
       }
-      LOG.info("PacketResponder " + numTargets + 
-               " for block " + block + " terminating");
+      LOG.info(myString + " terminating");
     }
     
     /**
@@ -1052,15 +1063,23 @@
   /**
    * This information is cached by the Datanode in the ackQueue.
    */
-  static private class Packet {
-    long seqno;
-    boolean lastPacketInBlock;
-    long lastByteInBlock;
+  private static class Packet {
+    final long seqno;
+    final boolean lastPacketInBlock;
+    final long offsetInBlock;
 
-    Packet(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
+    Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock) {
       this.seqno = seqno;
       this.lastPacketInBlock = lastPacketInBlock;
-      this.lastByteInBlock = lastByteInPacket;
+      this.offsetInBlock = offsetInBlock;
+    }
+
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + "(seqno=" + seqno
+        + ", lastPacketInBlock=" + lastPacketInBlock
+        + ", offsetInBlock=" + offsetInBlock
+        + ")";
     }
   }
 }
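
    For reference, the new PacketResponder above classifies itself from the
    downstream datanode array passed to its constructor. The tiny standalone sketch
    below restates that classification with illustrative names; it is not patch code.

    public class ResponderTypeSketch {
      enum Type { NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE }

      static Type classify(String[] downstreams) {
        return downstreams == null ? Type.NON_PIPELINE
            : downstreams.length == 0 ? Type.LAST_IN_PIPELINE
                : Type.HAS_DOWNSTREAM_IN_PIPELINE;
      }

      public static void main(String[] args) {
        System.out.println(classify(null));                       // NON_PIPELINE
        System.out.println(classify(new String[0]));              // LAST_IN_PIPELINE
        System.out.println(classify(new String[]{"dn2:50010"}));  // HAS_DOWNSTREAM_IN_PIPELINE
      }
    }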
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 937e631..a1d7ccf 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1907,8 +1907,9 @@
      */
     DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage,
         final String clientname) throws IOException {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(getClass().getSimpleName() + ": " + b
+      if (DataTransferProtocol.LOG.isDebugEnabled()) {
+        DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
+            + b + " (numBytes=" + b.getNumBytes() + ")"
             + ", stage=" + stage
             + ", clientname=" + clientname
             + ", targests=" + Arrays.asList(targets));
@@ -2573,12 +2574,9 @@
    *          the stored GS and the visible length. 
    * @param targets
    * @param client
-   * @return whether the replica is an RBW
    */
-  boolean transferReplicaForPipelineRecovery(final ExtendedBlock b,
+  void transferReplicaForPipelineRecovery(final ExtendedBlock b,
       final DatanodeInfo[] targets, final String client) throws IOException {
-    checkWriteAccess(b);
-
     final long storedGS;
     final long visible;
     final BlockConstructionStage stage;
@@ -2590,7 +2588,8 @@
       } else if (data.isValidBlock(b)) {
         stage = BlockConstructionStage.TRANSFER_FINALIZED;
       } else {
-        throw new IOException(b + " is not a RBW or a Finalized");
+        final String r = data.getReplicaString(b.getBlockPoolId(), b.getBlockId());
+        throw new IOException(b + " is neither a RBW nor a Finalized, r=" + r);
       }
 
       storedGS = data.getStoredBlock(b.getBlockPoolId(),
@@ -2609,7 +2608,6 @@
     if (targets.length > 0) {
       new DataTransfer(targets, b, stage, client).run();
     }
-    return stage == BlockConstructionStage.TRANSFER_RBW;
   }
 
   // Determine a Datanode's streaming address
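
transferReplicaForPipelineRecovery above picks a transfer stage from the replica's state (TRANSFER_RBW for a replica being written, TRANSFER_FINALIZED for a finalized one) and otherwise fails with a message that now includes the replica string. A minimal sketch of that selection with invented enum and parameter names, not the DataNode internals:

    import java.io.IOException;

    // Hypothetical illustration of the stage selection in
    // transferReplicaForPipelineRecovery(..); the types here are made up.
    public class TransferStageSketch {
      enum ReplicaState { RBW, FINALIZED, TEMPORARY }
      enum Stage { TRANSFER_RBW, TRANSFER_FINALIZED }

      static Stage chooseStage(ReplicaState state, String replicaString)
          throws IOException {
        switch (state) {
          case RBW:       return Stage.TRANSFER_RBW;
          case FINALIZED: return Stage.TRANSFER_FINALIZED;
          default:
            // Mirrors the improved error message: include the replica details.
            throw new IOException(
                "replica is neither a RBW nor a Finalized, r=" + replicaString);
        }
      }

      public static void main(String[] args) throws IOException {
        System.out.println(chooseStage(ReplicaState.RBW, "rbw"));             // TRANSFER_RBW
        System.out.println(chooseStage(ReplicaState.FINALIZED, "finalized")); // TRANSFER_FINALIZED
      }
    }
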
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index a6f4d34..ed94f50 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -154,7 +154,7 @@
         datanode.socketWriteTimeout);
     DataOutputStream out = new DataOutputStream(
                  new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
-    checkAccess(out, block, blockToken,
+    checkAccess(out, true, block, blockToken,
         DataTransferProtocol.Op.READ_BLOCK,
         BlockTokenSecretManager.AccessMode.READ);
   
@@ -258,7 +258,7 @@
         new BufferedOutputStream(
             NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
             SMALL_BUFFER_SIZE));
-    checkAccess(isClient? replyOut: null, block, blockToken,
+    checkAccess(replyOut, isClient, block, blockToken,
         DataTransferProtocol.Op.WRITE_BLOCK,
         BlockTokenSecretManager.AccessMode.WRITE);
 
@@ -365,7 +365,7 @@
       if (blockReceiver != null) {
         String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
         blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
-            mirrorAddr, null, targets.length);
+            mirrorAddr, null, targets);
 
         // send close-ack for transfer-RBW/Finalized 
         if (isTransfer) {
@@ -419,13 +419,14 @@
       final ExtendedBlock blk, final String client,
       final DatanodeInfo[] targets,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
-    final DataOutputStream out = new DataOutputStream(
-        NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
-    checkAccess(out, blk, blockToken,
+    checkAccess(null, true, blk, blockToken,
         DataTransferProtocol.Op.TRANSFER_BLOCK,
         BlockTokenSecretManager.AccessMode.COPY);
 
     updateCurrentThreadName(DataTransferProtocol.Op.TRANSFER_BLOCK + " " + blk);
+
+    final DataOutputStream out = new DataOutputStream(
+        NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
     try {
       datanode.transferReplicaForPipelineRecovery(blk, targets, client);
       SUCCESS.write(out);
@@ -442,7 +443,7 @@
       Token<BlockTokenIdentifier> blockToken) throws IOException {
     final DataOutputStream out = new DataOutputStream(
         NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
-    checkAccess(out, block, blockToken,
+    checkAccess(out, true, block, blockToken,
         DataTransferProtocol.Op.BLOCK_CHECKSUM,
         BlockTokenSecretManager.AccessMode.READ);
     updateCurrentThreadName("Reading metadata for block " + block);
@@ -634,7 +635,7 @@
 
       // receive a block
       blockReceiver.receiveBlock(null, null, null, null, 
-          dataXceiverServer.balanceThrottler, -1);
+          dataXceiverServer.balanceThrottler, null);
                     
       // notify name node
       datanode.notifyNamenodeReceivedBlock(block, sourceID);
@@ -699,7 +700,7 @@
     }
   }
 
-  private void checkAccess(final DataOutputStream out, 
+  private void checkAccess(DataOutputStream out, final boolean reply, 
       final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> t,
       final DataTransferProtocol.Op op,
@@ -709,7 +710,11 @@
         datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode);
       } catch(InvalidToken e) {
         try {
-          if (out != null) {
+          if (reply) {
+            if (out == null) {
+              out = new DataOutputStream(
+                  NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+            }
             ERROR_ACCESS_TOKEN.write(out);
             if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
               DatanodeRegistration dnR = 
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
index 03f38e3..200ac7c 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
@@ -2369,6 +2369,12 @@
     return volumeMap.get(bpid, blockId);
   }
 
+  @Override 
+  public synchronized String getReplicaString(String bpid, long blockId) {
+    final Replica r = volumeMap.get(bpid, blockId);
+    return r == null? "null": r.toString();
+  }
+
   @Override // FSDatasetInterface
   public synchronized ReplicaRecoveryInfo initReplicaRecovery(
       RecoveringBlock rBlock) throws IOException {
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
index 449d153..867eebe 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
@@ -107,6 +107,11 @@
   public Replica getReplica(String bpid, long blockId);
 
   /**
+   * @return the replica's string representation, or "null" if no replica is found
+   */
+  public String getReplicaString(String bpid, long blockId);
+
+  /**
    * @return the generation stamp stored with the block.
    */
   public Block getStoredBlock(String bpid, long blkid)
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java b/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
index dba232f..40ae574 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
@@ -17,12 +17,15 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -226,8 +229,15 @@
           // update NameSpace in memory
           backupInputStream.setBytes(data);
           FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
-          logLoader.loadEditRecords(storage.getLayoutVersion(),
-                    backupInputStream.getDataInputStream(), true);
+          int logVersion = storage.getLayoutVersion();
+          BufferedInputStream bin = new BufferedInputStream(backupInputStream);
+          DataInputStream in = new DataInputStream(bin);
+          Checksum checksum = null;
+          if (logVersion <= -28) { // support fsedits checksum
+            checksum = FSEditLog.getChecksum();
+            in = new DataInputStream(new CheckedInputStream(bin, checksum));
+          }
+          logLoader.loadEditRecords(logVersion, in, checksum, true);
           getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
           break;
         case INPROGRESS:
@@ -346,14 +356,21 @@
     if(jSpoolFile.exists()) {
       // load edits.new
       EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile);
-      DataInputStream in = edits.getDataInputStream();
+      BufferedInputStream bin = new BufferedInputStream(edits);
+      DataInputStream in = new DataInputStream(bin);
       FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
-      numEdits += logLoader.loadFSEdits(in, false);
+      int logVersion = logLoader.readLogVersion(in);
+      Checksum checksum = null;
+      if (logVersion <= -28) { // support fsedits checksum
+        checksum = FSEditLog.getChecksum();
+        in = new DataInputStream(new CheckedInputStream(bin, checksum));
+      }
+      numEdits += logLoader.loadEditRecords(logVersion, in, checksum, false);
 
       // first time reached the end of spool
       jsState = JSpoolState.WAIT;
-      numEdits += logLoader.loadEditRecords(storage.getLayoutVersion(),
-                                            in, true);
+      numEdits += logLoader.loadEditRecords(logVersion,
+                                            in, checksum, true);
       getFSNamesystem().dir.updateCountForINodeWithQuota();
       edits.close();
     }
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java b/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
index 292598b..38b0866 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
@@ -24,6 +24,7 @@
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.zip.Checksum;
 
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -84,10 +85,18 @@
   /** {@inheritDoc} */
   @Override
   void write(byte op, Writable... writables) throws IOException {
+    int start = bufCurrent.getLength();
     write(op);
     for (Writable w : writables) {
       w.write(bufCurrent);
     }
+    // write transaction checksum
+    int end = bufCurrent.getLength();
+    Checksum checksum = FSEditLog.getChecksum();
+    checksum.reset();
+    checksum.update(bufCurrent.getData(), start, end-start);
+    int sum = (int)checksum.getValue();
+    bufCurrent.writeInt(sum);
   }
 
   /**
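
The hunk above frames every edit-log transaction as [opcode + serialized payload][4-byte CRC of those bytes]. A self-contained sketch of that framing follows; it uses java.util.zip.CRC32 and made-up opcode/payload values in place of the thread-local PureJavaCrc32 and the real Writable serialization:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.zip.CRC32;

    // Standalone sketch of the per-transaction checksum framing:
    // [opcode + payload bytes][4-byte CRC of those bytes].
    public class EditChecksumWriteSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);

        int start = buf.size();       // like bufCurrent.getLength() before the write
        out.writeByte(9);             // a made-up opcode
        out.writeUTF("/some/path");   // a made-up payload field
        out.flush();
        int end = buf.size();

        CRC32 checksum = new CRC32(); // the patch uses a thread-local PureJavaCrc32
        checksum.update(buf.toByteArray(), start, end - start);
        out.writeInt((int) checksum.getValue()); // trailing transaction checksum
        out.flush();

        System.out.println("record bytes = " + (end - start)
            + ", total with checksum = " + buf.size());
      }
    }
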
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 889e1fd..a0c6e3e 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -1285,9 +1285,10 @@
    * Check whether the path specifies a directory
    */
   boolean isDir(String src) throws UnresolvedLinkException {
+    src = normalizePath(src);
     readLock();
     try {
-      INode node = rootDir.getNode(normalizePath(src), false);
+      INode node = rootDir.getNode(src, false);
       return node != null && node.isDirectory();
     } finally {
       readUnlock();
@@ -1385,6 +1386,12 @@
   /** Return the name of the path represented by inodes at [0, pos] */
   private static String getFullPathName(INode[] inodes, int pos) {
     StringBuilder fullPathName = new StringBuilder();
+    if (inodes[0].isRoot()) {
+      if (pos == 0) return Path.SEPARATOR;
+    } else {
+      fullPathName.append(inodes[0].getLocalName());
+    }
+    
     for (int i=1; i<=pos; i++) {
       fullPathName.append(Path.SEPARATOR_CHAR).append(inodes[i].getLocalName());
     }
@@ -2018,7 +2025,7 @@
         return null;
       }
     }
-    final String userName = UserGroupInformation.getCurrentUser().getUserName();
+    final String userName = dirPerms.getUserName();
     INodeSymlink newNode = unprotectedSymlink(path, target, modTime, modTime,
       new PermissionStatus(userName, null, FsPermission.getDefault()));         
     if (newNode == null) {
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 9ec33eb..b8e7930 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.zip.Checksum;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,6 +48,7 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.PureJavaCrc32;
 
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
 
@@ -90,6 +92,18 @@
 
   private NNStorage storage;
 
+  private static ThreadLocal<Checksum> localChecksum =
+    new ThreadLocal<Checksum>() {
+    protected Checksum initialValue() {
+      return new PureJavaCrc32();
+    }
+  };
+
+  /** Get a thread local checksum */
+  public static Checksum getChecksum() {
+    return localChecksum.get();
+  }
+
   private static class TransactionId {
     public long txid;
 
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index c442920..2d70237 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.BufferedInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
 
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -54,42 +58,62 @@
    * along.
    */
   int loadFSEdits(EditLogInputStream edits) throws IOException {
-    DataInputStream in = edits.getDataInputStream();
     long startTime = now();
-    int numEdits = loadFSEdits(in, true);
+    int numEdits = loadFSEdits(edits, true);
     FSImage.LOG.info("Edits file " + edits.getName() 
         + " of size " + edits.length() + " edits # " + numEdits 
         + " loaded in " + (now()-startTime)/1000 + " seconds.");
     return numEdits;
   }
 
-  int loadFSEdits(DataInputStream in, boolean closeOnExit) throws IOException {
+  /**
+   * Read the header of the fsedits log file
+   * @param in fsedits log input stream
+   * @return the edit log version number
+   * @throws IOException if an error occurs while reading the header
+   */
+  int readLogVersion(DataInputStream in) throws IOException {
+    int logVersion = 0;
+    // Read log file version. Could be missing. 
+    in.mark(4);
+    // If the edits log is larger than 2GB, available() may return a negative
+    // value, so we avoid calling available() here
+    boolean available = true;
+    try {
+      logVersion = in.readByte();
+    } catch (EOFException e) {
+      available = false;
+    }
+    if (available) {
+      in.reset();
+      logVersion = in.readInt();
+      if (logVersion < FSConstants.LAYOUT_VERSION) // future version
+        throw new IOException(
+            "Unexpected version of the file system log file: "
+            + logVersion + ". Current version = " 
+            + FSConstants.LAYOUT_VERSION + ".");
+    }
+    assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
+      "Unsupported version " + logVersion;
+    return logVersion;
+  }
+  
+  int loadFSEdits(EditLogInputStream edits, boolean closeOnExit) throws IOException {
+    BufferedInputStream bin = new BufferedInputStream(edits);
+    DataInputStream in = new DataInputStream(bin);
+    
     int numEdits = 0;
     int logVersion = 0;
 
     try {
-      // Read log file version. Could be missing. 
-      in.mark(4);
-      // If edits log is greater than 2G, available method will return negative
-      // numbers, so we avoid having to call available
-      boolean available = true;
-      try {
-        logVersion = in.readByte();
-      } catch (EOFException e) {
-        available = false;
+      logVersion = readLogVersion(in);
+      Checksum checksum = null;
+      if (logVersion <= -28) { // support fsedits checksum
+        checksum = FSEditLog.getChecksum();
+        in = new DataInputStream(new CheckedInputStream(bin, checksum));
       }
-      if (available) {
-        in.reset();
-        logVersion = in.readInt();
-        if (logVersion < FSConstants.LAYOUT_VERSION) // future version
-          throw new IOException(
-                          "Unexpected version of the file system log file: "
-                          + logVersion + ". Current version = " 
-                          + FSConstants.LAYOUT_VERSION + ".");
-      }
-      assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
-                            "Unsupported version " + logVersion;
-      numEdits = loadEditRecords(logVersion, in, false);
+
+      numEdits = loadEditRecords(logVersion, in, checksum, false);
     } finally {
       if(closeOnExit)
         in.close();
@@ -101,7 +125,7 @@
 
   @SuppressWarnings("deprecation")
   int loadEditRecords(int logVersion, DataInputStream in,
-      boolean closeOnExit) throws IOException {
+      Checksum checksum, boolean closeOnExit) throws IOException {
     FSDirectory fsDir = fsNamesys.dir;
     int numEdits = 0;
     String clientName = null;
@@ -123,6 +147,9 @@
         long blockSize = 0;
         FSEditLogOpCodes opCode;
         try {
+          if (checksum != null) {
+            checksum.reset();
+          }
           in.mark(1);
           byte opCodeByte = in.readByte();
           opCode = FSEditLogOpCodes.fromByte(opCodeByte);
@@ -480,6 +507,7 @@
           throw new IOException("Never seen opCode " + opCode);
         }
         }
+        validateChecksum(in, checksum, numEdits);
       }
     } finally {
       if(closeOnExit)
@@ -505,6 +533,22 @@
     return numEdits;
   }
 
+  /**
+   * Validate a transaction's checksum
+   */
+  private static void validateChecksum(
+      DataInputStream in, Checksum checksum, int tid)
+  throws IOException {
+    if (checksum != null) {
+      int calculatedChecksum = (int)checksum.getValue();
+      int readChecksum = in.readInt(); // read in checksum
+      if (readChecksum != calculatedChecksum) {
+        throw new ChecksumException(
+            "Transaction " + tid + " is corrupt. Calculated checksum is " +
+            calculatedChecksum + " but read checksum is " + readChecksum, tid);
+      }
+    }
+  }
 
   /**
    * A class to read in blocks stored in the old format. The only two
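
A matching read-side sketch of the per-transaction validation done by loadEditRecords/validateChecksum above: wrap the stream in a CheckedInputStream, reset the checksum before each record, and compare the running value with the trailing int. Again, java.util.zip.CRC32 and a fabricated record stand in for the real classes:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.zip.CRC32;
    import java.util.zip.CheckedInputStream;

    public class EditChecksumReadSketch {
      public static void main(String[] args) throws IOException {
        // Build one fake "transaction": opcode, payload, trailing CRC (write side).
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream w = new DataOutputStream(bytes);
        w.writeByte(9);
        w.writeUTF("/some/path");
        w.flush();
        CRC32 wsum = new CRC32();
        wsum.update(bytes.toByteArray(), 0, bytes.size());
        w.writeInt((int) wsum.getValue());
        w.flush();

        // Read side, mirroring loadEditRecords(..) plus validateChecksum(..).
        CRC32 checksum = new CRC32();
        DataInputStream in = new DataInputStream(new CheckedInputStream(
            new ByteArrayInputStream(bytes.toByteArray()), checksum));
        checksum.reset();             // reset before each transaction
        byte opCode = in.readByte();  // the record body updates the checksum
        String path = in.readUTF();
        int calculated = (int) checksum.getValue(); // capture before reading the int
        int stored = in.readInt();
        if (stored != calculated) {
          throw new IOException("Transaction is corrupt: calculated=" + calculated
              + " stored=" + stored);
        }
        System.out.println("opCode=" + opCode + ", path=" + path + " verified");
      }
    }
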
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index ddb64f9..5e38f2b 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -30,6 +30,7 @@
 import java.security.DigestOutputStream;
 import java.security.MessageDigest;
 import java.util.Arrays;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -179,7 +180,11 @@
 
         // load all inodes
         LOG.info("Number of files = " + numFiles);
-        loadFullNameINodes(numFiles, in);
+        if (imgVersion <= -30) {
+          loadLocalNameINodes(numFiles, in);
+        } else {
+          loadFullNameINodes(numFiles, in);
+        }
 
         // load datanode info
         this.loadDatanodes(in);
@@ -215,6 +220,64 @@
     fsDir.rootDir.setPermissionStatus(root.getPermissionStatus());    
   }
 
+  /**
+   * Load fsimage files assuming only local names are stored.
+   *
+   * @param numFiles number of files expected to be read
+   * @param in image input stream
+   * @throws IOException if an error occurs while loading the image
+   */
+   private void loadLocalNameINodes(long numFiles, DataInputStream in) 
+   throws IOException {
+     assert imgVersion <= -30; // -30: store only local name in image
+     assert numFiles > 0;
+
+     // load root
+     if (in.readShort() != 0) {
+       throw new IOException("First node is not root");
+     }   
+     INode root = loadINode(in);
+     // update the root's attributes
+     updateRootAttr(root);
+     numFiles--;
+
+     // load rest of the nodes directory by directory
+     while (numFiles > 0) {
+       numFiles -= loadDirectory(in);
+     }
+     if (numFiles != 0) {
+       throw new IOException("Read unexpect number of files: " + -numFiles);
+     }
+   }
+   
+   /**
+    * Load all children of a directory
+    * 
+    * @param in image input stream
+    * @return number of child inodes read
+    * @throws IOException
+    */
+   private int loadDirectory(DataInputStream in) throws IOException {
+     String parentPath = FSImageSerialization.readString(in);
+     FSDirectory fsDir = namesystem.dir;
+     INode parent = fsDir.rootDir.getNode(parentPath, true);
+     if (parent == null || !parent.isDirectory()) {
+       throw new IOException("Path " + parentPath + "is not a directory.");
+     }
+
+     int numChildren = in.readInt();
+     for(int i=0; i<numChildren; i++) {
+       // load single inode
+       byte[] localName = new byte[in.readShort()];
+       in.readFully(localName); // read local name
+       INode newNode = loadINode(in); // read rest of inode
+
+       // add to parent
+       namesystem.dir.addToParent(localName, (INodeDirectory)parent, newNode, false);
+     }
+     return numChildren;
+   }
+
   /**
    * load fsimage files assuming full path names are stored
    * 
@@ -485,9 +548,10 @@
         byte[] byteStore = new byte[4*FSConstants.MAX_PATH_LENGTH];
         ByteBuffer strbuf = ByteBuffer.wrap(byteStore);
         // save the root
-        FSImageSerialization.saveINode2Image(strbuf, fsDir.rootDir, out);
+        FSImageSerialization.saveINode2Image(fsDir.rootDir, out);
         // save the rest of the nodes
-        saveImage(strbuf, 0, fsDir.rootDir, out);
+        saveImage(strbuf, fsDir.rootDir, out);
+        // save files under construction
         sourceNamesystem.saveFilesUnderConstruction(out);
         sourceNamesystem.saveSecretManagerState(out);
         strbuf = null;
@@ -511,28 +575,33 @@
      * This is a recursive procedure, which first saves all children of
      * a current directory and then moves inside the sub-directories.
      */
-    private static void saveImage(ByteBuffer parentPrefix,
-                                  int prefixLength,
+    private static void saveImage(ByteBuffer currentDirName,
                                   INodeDirectory current,
                                   DataOutputStream out) throws IOException {
-      int newPrefixLength = prefixLength;
-      if (current.getChildrenRaw() == null)
+      List<INode> children = current.getChildrenRaw();
+      if (children == null || children.isEmpty())
         return;
-      for(INode child : current.getChildren()) {
-        // print all children first
-        parentPrefix.position(prefixLength);
-        parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
-        FSImageSerialization.saveINode2Image(parentPrefix, child, out);
+      // write the prefix (parent directory name)
+      int prefixLen = currentDirName.position();
+      if (prefixLen == 0) {  // root
+        out.writeShort(PATH_SEPARATOR.length);
+        out.write(PATH_SEPARATOR);
+      } else {  // non-root directories
+        out.writeShort(prefixLen);
+        out.write(currentDirName.array(), 0, prefixLen);
       }
-      for(INode child : current.getChildren()) {
+      out.writeInt(children.size());
+      for(INode child : children) {
+        // print all children first
+        FSImageSerialization.saveINode2Image(child, out);
+      }
+      for(INode child : children) {
         if(!child.isDirectory())
           continue;
-        parentPrefix.position(prefixLength);
-        parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
-        newPrefixLength = parentPrefix.position();
-        saveImage(parentPrefix, newPrefixLength, (INodeDirectory)child, out);
+        currentDirName.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
+        saveImage(currentDirName, (INodeDirectory)child, out);
+        currentDirName.position(prefixLen);
       }
-      parentPrefix.position(prefixLength);
     }
   }
 }
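
The saveImage/loadLocalNameINodes changes above store each directory as its full path followed by a child count and the children's local names, instead of repeating the full path for every inode. A toy round-trip of that layout; inode attributes and the leading root record are left out, and all type names are invented for the sketch:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    // Toy model of the "local name" layout: per directory,
    // [short pathLen][parent path][int childCount], then each child as
    // [short nameLen][local name]; subdirectories follow recursively.
    public class LocalNameLayoutSketch {
      static class Dir {
        final String path;
        final List<String> files = new ArrayList<String>();
        final List<Dir> subdirs = new ArrayList<Dir>();
        Dir(String path) { this.path = path; }
      }

      static void save(Dir dir, DataOutputStream out) throws IOException {
        writeName(dir.path, out);
        out.writeInt(dir.files.size() + dir.subdirs.size());
        for (String f : dir.files) writeName(f, out);
        for (Dir d : dir.subdirs) writeName(lastComponent(d.path), out);
        for (Dir d : dir.subdirs) save(d, out);   // children first, then recurse
      }

      static void writeName(String name, DataOutputStream out) throws IOException {
        byte[] b = name.getBytes(StandardCharsets.UTF_8);
        out.writeShort(b.length);
        out.write(b);
      }

      static String readName(DataInputStream in) throws IOException {
        byte[] b = new byte[in.readShort()];
        in.readFully(b);
        return new String(b, StandardCharsets.UTF_8);
      }

      static String lastComponent(String p) {
        return p.substring(p.lastIndexOf('/') + 1);
      }

      public static void main(String[] args) throws IOException {
        Dir root = new Dir("/");
        root.files.add("a.txt");
        Dir sub = new Dir("/sub");
        sub.files.add("b.txt");
        root.subdirs.add(sub);

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        save(root, new DataOutputStream(buf));

        // Read back: full paths are reconstructed as parentPath + "/" + localName.
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray()));
        while (in.available() > 0) {
          String parent = readName(in);
          int n = in.readInt();
          for (int i = 0; i < n; i++) {
            String child = readName(in);
            System.out.println("/".equals(parent) ? "/" + child : parent + "/" + child);
          }
        }
      }
    }
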
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
index 14b313e..d993ae6 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
@@ -145,12 +145,11 @@
   /*
    * Save one inode's attributes to the image.
    */
-  static void saveINode2Image(ByteBuffer name,
-                              INode node,
+  static void saveINode2Image(INode node,
                               DataOutputStream out) throws IOException {
-    int nameLen = name.position();
-    out.writeShort(nameLen);
-    out.write(name.array(), name.arrayOffset(), nameLen);
+    byte[] name = node.getLocalNameBytes();
+    out.writeShort(name.length);
+    out.write(name);
     FsPermission filePerm = TL_DATA.get().FILE_PERM;
     if (node.isDirectory()) {
       out.writeShort(0);  // replication
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 983e937..bac6fcc 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -259,6 +259,8 @@
   private FsServerDefaults serverDefaults;
   // allow appending to hdfs files
   private boolean supportAppends = true;
+  private DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure = 
+      DataTransferProtocol.ReplaceDatanodeOnFailure.DEFAULT;
 
   private volatile SafeModeInfo safeMode;  // safe mode information
   private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
@@ -529,6 +531,8 @@
         + " blockKeyUpdateInterval=" + blockKeyUpdateInterval / (60 * 1000)
         + " min(s), blockTokenLifetime=" + blockTokenLifetime / (60 * 1000)
         + " min(s)");
+
+    this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
   }
 
   /**
@@ -1329,22 +1333,16 @@
       long blockSize) throws SafeModeException, FileAlreadyExistsException,
       AccessControlException, UnresolvedLinkException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
-    writeLock();
-    try {
-    boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
-    boolean append = flag.contains(CreateFlag.APPEND);
-    boolean create = flag.contains(CreateFlag.CREATE);
-
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
           + ", holder=" + holder
           + ", clientMachine=" + clientMachine
           + ", createParent=" + createParent
           + ", replication=" + replication
-          + ", overwrite=" + overwrite
-          + ", append=" + append);
+          + ", createFlag=" + flag.toString());
     }
-
+    writeLock();
+    try {
     if (isInSafeMode())
       throw new SafeModeException("Cannot create file" + src, safeMode);
     if (!DFSUtil.isValidName(src)) {
@@ -1354,14 +1352,16 @@
     // Verify that the destination does not exist as a directory already.
     boolean pathExists = dir.exists(src);
     if (pathExists && dir.isDir(src)) {
-      throw new FileAlreadyExistsException("Cannot create file "+ src + "; already exists as a directory.");
+      throw new FileAlreadyExistsException("Cannot create file " + src
+          + "; already exists as a directory.");
     }
 
+    boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+    boolean append = flag.contains(CreateFlag.APPEND);
     if (isPermissionEnabled) {
       if (append || (overwrite && pathExists)) {
         checkPathAccess(src, FsAction.WRITE);
-      }
-      else {
+      } else {
         checkAncestorAccess(src, FsAction.WRITE);
       }
     }
@@ -1434,34 +1434,27 @@
       } catch(IOException e) {
         throw new IOException("failed to create "+e.getMessage());
       }
-      if (append) {
-        if (myFile == null) {
-          if(!create)
-            throw new FileNotFoundException("failed to append to non-existent file "
-              + src + " on client " + clientMachine);
-          else {
-            //append & create a nonexist file equals to overwrite
-            return startFileInternal(src, permissions, holder, clientMachine,
-                EnumSet.of(CreateFlag.OVERWRITE), createParent, replication, blockSize);
-          }
-        } else if (myFile.isDirectory()) {
-          throw new IOException("failed to append to directory " + src 
-                                +" on client " + clientMachine);
+      boolean create = flag.contains(CreateFlag.CREATE);
+      if (myFile == null) {
+        if (!create) {
+          throw new FileNotFoundException("failed to overwrite or append to non-existent file "
+            + src + " on client " + clientMachine);
         }
-      } else if (!dir.isValidToCreate(src)) {
+      } else {
+        // File exists - must be one of append or overwrite
         if (overwrite) {
           delete(src, true);
-        } else {
-          throw new IOException("failed to create file " + src 
-                                +" on client " + clientMachine
-                                +" either because the filename is invalid or the file exists");
+        } else if (!append) {
+          throw new FileAlreadyExistsException("failed to create file " + src
+              + " on client " + clientMachine
+              + " because the file exists");
         }
       }
 
       DatanodeDescriptor clientNode = 
         host2DataNodeMap.getDatanodeByHost(clientMachine);
 
-      if (append) {
+      if (append && myFile != null) {
         //
         // Replace current node with a INodeUnderConstruction.
         // Recreate in-memory lease record.
@@ -1663,6 +1656,53 @@
     return b;
   }
 
+  /** @see NameNode#getAdditionalDatanode(String, ExtendedBlock, DatanodeInfo[], DatanodeInfo[], int, String) */
+  LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
+      final DatanodeInfo[] existings,  final HashMap<Node, Node> excludes,
+      final int numAdditionalNodes, final String clientName
+      ) throws IOException {
+    //check if the feature is enabled
+    dtpReplaceDatanodeOnFailure.checkEnabled();
+
+    final DatanodeDescriptor clientnode;
+    final long preferredblocksize;
+    readLock();
+    try {
+      //check safe mode
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot add datanode; src=" + src
+            + ", blk=" + blk, safeMode);
+      }
+
+      //check lease
+      final INodeFileUnderConstruction file = checkLease(src, clientName);
+      clientnode = file.getClientNode();
+      preferredblocksize = file.getPreferredBlockSize();
+    } finally {
+      readUnlock();
+    }
+
+    //find datanode descriptors
+    final List<DatanodeDescriptor> chosen = new ArrayList<DatanodeDescriptor>();
+    for(DatanodeInfo d : existings) {
+      final DatanodeDescriptor descriptor = getDatanode(d);
+      if (descriptor != null) {
+        chosen.add(descriptor);
+      }
+    }
+
+    // choose new datanodes.
+    final DatanodeInfo[] targets = blockManager.replicator.chooseTarget(
+        src, numAdditionalNodes, clientnode, chosen, true,
+        excludes, preferredblocksize);
+    final LocatedBlock lb = new LocatedBlock(blk, targets);
+    if (isBlockTokenEnabled) {
+      lb.setBlockToken(blockTokenSecretManager.generateToken(lb.getBlock(), 
+          EnumSet.of(BlockTokenSecretManager.AccessMode.COPY)));
+    }
+    return lb;
+  }
+
   /**
    * The client would like to let go of the given block
    */
@@ -2691,7 +2731,6 @@
    * Get registrationID for datanodes based on the namespaceID.
    * 
    * @see #registerDatanode(DatanodeRegistration)
-   * @see FSImage#newNamespaceID()
    * @return registration ID
    */
   public String getRegistrationID() {
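
getAdditionalDatanode above delegates the actual choice to the block placement policy (chooseTarget), excluding nodes already in the pipeline as well as the caller-supplied excludes. A standalone sketch of just that exclusion step, with made-up node names and no Hadoop types:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class AdditionalDatanodeSketch {
      // Pick numAdditional nodes that are neither already in the pipeline nor excluded.
      static List<String> chooseAdditional(List<String> cluster, List<String> existings,
          Set<String> excludes, int numAdditional) {
        Set<String> unavailable = new HashSet<String>(existings);
        unavailable.addAll(excludes);
        List<String> chosen = new ArrayList<String>();
        for (String dn : cluster) {
          if (chosen.size() == numAdditional) {
            break;
          }
          if (!unavailable.contains(dn)) {
            chosen.add(dn);
          }
        }
        return chosen;
      }

      public static void main(String[] args) {
        List<String> cluster = Arrays.asList("dn1", "dn2", "dn3", "dn4", "dn5");
        List<String> existings = Arrays.asList("dn1", "dn3");             // surviving pipeline nodes
        Set<String> excludes = new HashSet<String>(Arrays.asList("dn2")); // e.g. the failed node
        System.out.println(chooseAdditional(cluster, existings, excludes, 1)); // [dn4]
      }
    }
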
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
index 3133734..2faa430 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
@@ -233,12 +233,8 @@
 
 
   String getLocalParentDir() {
-    INode p_node=getParent();
-
-    if(p_node == null)
-      return "/";
-    else
-      return p_node.getFullPathName();
+    INode inode = isRoot() ? this : getParent();
+    return (inode != null) ? inode.getFullPathName() : "";
   }
 
   /**
@@ -271,12 +267,7 @@
 
   /** {@inheritDoc} */
   public String toString() {
-    String i_path=getFullPathName();
-
-    if(i_path.length() == 0)
-      i_path="/";
-
-    return "\"" + i_path + "\":"
+    return "\"" + getFullPathName() + "\":"
     + getUserName() + ":" + getGroupName() + ":"
     + (isDirectory()? "d": "-") + getFsPermission();
   }
@@ -470,7 +461,9 @@
                         long nsQuota,
                         long dsQuota,
                         long preferredBlockSize) {
-    if (blocks == null) {
+    if (symlink.length() != 0) { // check if symbolic link
+      return new INodeSymlink(symlink, modificationTime, atime, permissions);
+    } else if (blocks == null) { // not a symlink and no blocks, so it is a directory
       if (nsQuota >= 0 || dsQuota >= 0) {
         return new INodeDirectoryWithQuota(
             permissions, modificationTime, nsQuota, dsQuota);
@@ -478,10 +471,6 @@
       // regular directory
       return new INodeDirectory(permissions, modificationTime);
     }
-    // check if symbolic link
-    if (symlink.length() != 0) {
-      return new INodeSymlink(symlink, modificationTime, atime, permissions);
-    } 
     // file
     return new INodeFile(permissions, blocks, replication,
         modificationTime, atime, preferredBlockSize);
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 12f416c..63ed01a 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -17,13 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -846,6 +845,33 @@
     return locatedBlock;
   }
 
+  @Override
+  public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
+      final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
+      final int numAdditionalNodes, final String clientName
+      ) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getAdditionalDatanode: src=" + src
+          + ", blk=" + blk
+          + ", existings=" + Arrays.asList(existings)
+          + ", excludes=" + Arrays.asList(excludes)
+          + ", numAdditionalNodes=" + numAdditionalNodes
+          + ", clientName=" + clientName);
+    }
+
+    myMetrics.numGetAdditionalDatanodeOps.inc();
+
+    HashMap<Node, Node> excludeSet = null;
+    if (excludes != null) {
+      excludeSet = new HashMap<Node, Node>(excludes.length);
+      for (Node node : excludes) {
+        excludeSet.put(node, node);
+      }
+    }
+    return namesystem.getAdditionalDatanode(src, blk,
+        existings, excludeSet, numAdditionalNodes, clientName);
+  }
+
   /**
    * The client needs to give up on the block.
    */
@@ -1202,7 +1228,7 @@
     }
     final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     namesystem.createSymlink(target, link,
-      new PermissionStatus(ugi.getUserName(), null, dirPerms), createParent);
+      new PermissionStatus(ugi.getShortUserName(), null, dirPerms), createParent);
   }
 
   /** @inheritDoc */
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java b/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
index 2b589d2..259615c 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
@@ -72,6 +72,8 @@
                           new MetricsTimeVaryingInt("FileInfoOps", registry);
     public MetricsTimeVaryingInt numAddBlockOps = 
                           new MetricsTimeVaryingInt("AddBlockOps", registry);
+    public final MetricsTimeVaryingInt numGetAdditionalDatanodeOps
+        = new MetricsTimeVaryingInt("GetAdditionalDatanodeOps", registry);
     public MetricsTimeVaryingInt numcreateSymlinkOps = 
                           new MetricsTimeVaryingInt("CreateSymlinkOps", registry);
     public MetricsTimeVaryingInt numgetLinkTargetOps = 
diff --git a/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index b7f7fb4..4c3d0e0 100644
--- a/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -269,13 +269,21 @@
     super(conf);
   }
   
+  protected DistributedFileSystem getDFS() throws IOException {
+    FileSystem fs = getFS();
+    if (!(fs instanceof DistributedFileSystem)) {
+      throw new IllegalArgumentException("FileSystem " + fs.getUri() + 
+      " is not a distributed file system");
+    }
+    return (DistributedFileSystem)fs;
+  }
+  
   /**
    * Gives a report on how the FileSystem is doing.
    * @exception IOException if the filesystem does not exist.
    */
   public void report() throws IOException {
-    if (fs instanceof DistributedFileSystem) {
-      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      DistributedFileSystem dfs = getDFS();
       FsStatus ds = dfs.getStatus();
       long capacity = ds.getCapacity();
       long used = ds.getUsed();
@@ -342,7 +350,6 @@
           System.out.println();
         }     
       }
-    }
   }
 
   /**
@@ -353,10 +360,6 @@
    * @exception IOException if the filesystem does not exist.
    */
   public void setSafeMode(String[] argv, int idx) throws IOException {
-    if (!(fs instanceof DistributedFileSystem)) {
-      System.err.println("FileSystem is " + fs.getUri());
-      return;
-    }
     if (idx != argv.length - 1) {
       printUsage("-safemode");
       return;
@@ -377,7 +380,7 @@
       printUsage("-safemode");
       return;
     }
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     boolean inSafeMode = dfs.setSafeMode(action);
 
     //
@@ -407,12 +410,7 @@
   public int saveNamespace() throws IOException {
     int exitCode = -1;
 
-    if (!(fs instanceof DistributedFileSystem)) {
-      System.err.println("FileSystem is " + fs.getUri());
-      return exitCode;
-    }
-
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     dfs.saveNamespace();
     exitCode = 0;
    
@@ -428,17 +426,12 @@
   public int restoreFaileStorage(String arg) throws IOException {
     int exitCode = -1;
 
-    if (!(fs instanceof DistributedFileSystem)) {
-      System.err.println("FileSystem is " + fs.getUri());
-      return exitCode;
-    }
-
     if(!arg.equals("check") && !arg.equals("true") && !arg.equals("false")) {
       System.err.println("restoreFailedStorage valid args are true|false|check");
       return exitCode;
     }
     
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     Boolean res = dfs.restoreFailedStorage(arg);
     System.out.println("restoreFailedStorage is set to " + res);
     exitCode = 0;
@@ -455,12 +448,7 @@
   public int refreshNodes() throws IOException {
     int exitCode = -1;
 
-    if (!(fs instanceof DistributedFileSystem)) {
-      System.err.println("FileSystem is " + fs.getUri());
-      return exitCode;
-    }
-
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     dfs.refreshNodes();
     exitCode = 0;
    
@@ -636,18 +624,10 @@
    * @exception IOException 
    */
   public int finalizeUpgrade() throws IOException {
-    int exitCode = -1;
-
-    if (!(fs instanceof DistributedFileSystem)) {
-      System.out.println("FileSystem is " + fs.getUri());
-      return exitCode;
-    }
-
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     dfs.finalizeUpgrade();
-    exitCode = 0;
-   
-    return exitCode;
+    
+    return 0;
   }
 
   /**
@@ -658,10 +638,7 @@
    * @exception IOException 
    */
   public int upgradeProgress(String[] argv, int idx) throws IOException {
-    if (!(fs instanceof DistributedFileSystem)) {
-      System.out.println("FileSystem is " + fs.getUri());
-      return -1;
-    }
+    
     if (idx != argv.length - 1) {
       printUsage("-upgradeProgress");
       return -1;
@@ -679,7 +656,7 @@
       return -1;
     }
 
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     UpgradeStatusReport status = dfs.distributedUpgradeProgress(action);
     String statusText = (status == null ? 
         "There are no upgrades in progress." :
@@ -698,7 +675,7 @@
    */
   public int metaSave(String[] argv, int idx) throws IOException {
     String pathname = argv[idx];
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     dfs.metaSave(pathname);
     System.out.println("Created file " + pathname + " on server " +
                        dfs.getUri());
@@ -713,8 +690,7 @@
    * @throws IOException If an error while getting datanode report
    */
   public int printTopology() throws IOException {
-    if (fs instanceof DistributedFileSystem) {
-      DistributedFileSystem dfs = (DistributedFileSystem)fs;
+      DistributedFileSystem dfs = getDFS();
       DFSClient client = dfs.getClient();
       DatanodeInfo[] report = client.datanodeReport(DatanodeReportType.ALL);
       
@@ -749,7 +725,6 @@
 
         System.out.println();
       }
-    }
     return 0;
   }
   
@@ -1052,13 +1027,13 @@
       } else if ("-metasave".equals(cmd)) {
         exitCode = metaSave(argv, i);
       } else if (ClearQuotaCommand.matches(cmd)) {
-        exitCode = new ClearQuotaCommand(argv, i, fs).runAll();
+        exitCode = new ClearQuotaCommand(argv, i, getDFS()).runAll();
       } else if (SetQuotaCommand.matches(cmd)) {
-        exitCode = new SetQuotaCommand(argv, i, fs).runAll();
+        exitCode = new SetQuotaCommand(argv, i, getDFS()).runAll();
       } else if (ClearSpaceQuotaCommand.matches(cmd)) {
-        exitCode = new ClearSpaceQuotaCommand(argv, i, fs).runAll();
+        exitCode = new ClearSpaceQuotaCommand(argv, i, getDFS()).runAll();
       } else if (SetSpaceQuotaCommand.matches(cmd)) {
-        exitCode = new SetSpaceQuotaCommand(argv, i, fs).runAll();
+        exitCode = new SetSpaceQuotaCommand(argv, i, getDFS()).runAll();
       } else if ("-refreshServiceAcl".equals(cmd)) {
         exitCode = refreshServiceAcl();
       } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
diff --git a/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java b/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java
index 3003649..80e2927 100644
--- a/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java
+++ b/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java
@@ -80,5 +80,6 @@
     KEY_ID,
     KEY_EXPIRY_DATE,
     KEY_LENGTH,
-    KEY_BLOB
+    KEY_BLOB,
+    CHECKSUM
 }
diff --git a/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java b/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java
index bef2532..b98e68d 100644
--- a/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java
+++ b/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java
@@ -17,26 +17,15 @@
  */
 package org.apache.hadoop.hdfs.tools.offlineEditsViewer;
 
-import java.io.DataInputStream;
 import java.io.IOException;
-import java.io.EOFException;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
 
 import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.ByteToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.ShortToken;
 import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.IntToken;
 import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.VIntToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.LongToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.VLongToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.StringUTF8Token;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.StringTextToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.BlobToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.BytesWritableToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.EmptyToken;
 
 /**
  * EditsLoaderCurrent processes Hadoop EditLogs files and walks over
@@ -49,7 +38,7 @@
 class EditsLoaderCurrent implements EditsLoader {
 
   private static int [] supportedVersions = {
-    -18, -19, -20, -21, -22, -23, -24, -25, -26, -27 };
+    -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -30, -31 };
 
   private EditsVisitor v;
   private int editsVersion = 0;
@@ -464,6 +453,10 @@
         visitOpCode(editsOpCode);
 
         v.leaveEnclosingElement(); // DATA
+        
+        if (editsOpCode != FSEditLogOpCodes.OP_INVALID && editsVersion <= -28) {
+          v.visitInt(EditsElement.CHECKSUM);
+        }
         v.leaveEnclosingElement(); // RECORD
       } while(editsOpCode != FSEditLogOpCodes.OP_INVALID);
 
diff --git a/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java b/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
index c660514..64f8329 100644
--- a/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
+++ b/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
@@ -27,7 +27,6 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.tools.offlineImageViewer.ImageVisitor.ImageElement;
 import org.apache.hadoop.io.Text;
@@ -121,7 +120,7 @@
   protected final DateFormat dateFormat = 
                                       new SimpleDateFormat("yyyy-MM-dd HH:mm");
   private static int [] versions = 
-    {-16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27};
+    {-16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -30, -31};
   private int imageVersion = 0;
 
   /* (non-Javadoc)
@@ -334,35 +333,106 @@
       long numInodes, boolean skipBlocks) throws IOException {
     v.visitEnclosingElement(ImageElement.INODES,
         ImageElement.NUM_INODES, numInodes);
-
-    for(long i = 0; i < numInodes; i++) {
-      v.visitEnclosingElement(ImageElement.INODE);
-      v.visit(ImageElement.INODE_PATH, FSImageSerialization.readString(in));
-      v.visit(ImageElement.REPLICATION, in.readShort());
-      v.visit(ImageElement.MODIFICATION_TIME, formatDate(in.readLong()));
-      if(imageVersion <= -17) // added in version -17
-        v.visit(ImageElement.ACCESS_TIME, formatDate(in.readLong()));
-      v.visit(ImageElement.BLOCK_SIZE, in.readLong());
-      int numBlocks = in.readInt();
-
-      processBlocks(in, v, numBlocks, skipBlocks);
-
-      // File or directory
-      if (numBlocks > 0 || numBlocks == -1) {
-        v.visit(ImageElement.NS_QUOTA, numBlocks == -1 ? in.readLong() : -1);
-        if(imageVersion <= -18) // added in version -18
-          v.visit(ImageElement.DS_QUOTA, numBlocks == -1 ? in.readLong() : -1);
-      }
-      if (imageVersion <= -23 && numBlocks == -2) {
-        v.visit(ImageElement.SYMLINK, Text.readString(in));
-      }
-
-      processPermission(in, v);
-      v.leaveEnclosingElement(); // INode
+    
+    if (imageVersion <= -30) { // local file name
+      processLocalNameINodes(in, v, numInodes, skipBlocks);
+    } else { // full path name
+      processFullNameINodes(in, v, numInodes, skipBlocks);
     }
+
     
     v.leaveEnclosingElement(); // INodes
   }
+  
+  /**
+   * Process an fsimage that stores only the local name of each inode
+   *
+   * @param in image stream
+   * @param v visitor
+   * @param numInodes number of inodes to read
+   * @param skipBlocks skip blocks or not
+   * @throws IOException if an error occurs during processing
+   */
+  private void processLocalNameINodes(DataInputStream in, ImageVisitor v,
+      long numInodes, boolean skipBlocks) throws IOException {
+    // process root
+    processINode(in, v, skipBlocks, "");
+    numInodes--;
+    while (numInodes > 0) {
+      numInodes -= processDirectory(in, v, skipBlocks);
+    }
+  }
+  
+  private int processDirectory(DataInputStream in, ImageVisitor v,
+     boolean skipBlocks) throws IOException {
+    String parentName = FSImageSerialization.readString(in);
+    int numChildren = in.readInt();
+    for (int i=0; i<numChildren; i++) {
+      processINode(in, v, skipBlocks, parentName);
+    }
+    return numChildren;
+  }
+  
+   /**
+    * Process image with full path name
+    * 
+    * @param in image stream
+    * @param v visitor
+    * @param numInodes number of inodes to read
+    * @param skipBlocks skip blocks or not
+    * @throws IOException if an error occurs during processing
+    */
+   private void processFullNameINodes(DataInputStream in, ImageVisitor v,
+       long numInodes, boolean skipBlocks) throws IOException {
+     for(long i = 0; i < numInodes; i++) {
+       processINode(in, v, skipBlocks, null);
+     }
+   }
+   
+   /**
+    * Process an INode
+    * 
+    * @param in image stream
+    * @param v visitor
+    * @param skipBlocks skip blocks or not
+    * @param parentName path of the parent directory, or null if the image
+    *                   stores full path names
+    * @throws IOException
+    */
+  private void processINode(DataInputStream in, ImageVisitor v,
+      boolean skipBlocks, String parentName) throws IOException {
+    v.visitEnclosingElement(ImageElement.INODE);
+    String pathName = FSImageSerialization.readString(in);
+    if (parentName != null) {  // local name
+      pathName = "/" + pathName;
+      if (!"/".equals(parentName)) { // children of non-root directory
+        pathName = parentName + pathName;
+      }
+    }
+
+    v.visit(ImageElement.INODE_PATH, pathName);
+    v.visit(ImageElement.REPLICATION, in.readShort());
+    v.visit(ImageElement.MODIFICATION_TIME, formatDate(in.readLong()));
+    if(imageVersion <= -17) // added in version -17
+      v.visit(ImageElement.ACCESS_TIME, formatDate(in.readLong()));
+    v.visit(ImageElement.BLOCK_SIZE, in.readLong());
+    int numBlocks = in.readInt();
+
+    processBlocks(in, v, numBlocks, skipBlocks);
+
+    // File or directory
+    if (numBlocks > 0 || numBlocks == -1) {
+      v.visit(ImageElement.NS_QUOTA, numBlocks == -1 ? in.readLong() : -1);
+      if(imageVersion <= -18) // added in version -18
+        v.visit(ImageElement.DS_QUOTA, numBlocks == -1 ? in.readLong() : -1);
+    }
+    if (imageVersion <= -23 && numBlocks == -2) {
+      v.visit(ImageElement.SYMLINK, Text.readString(in));
+    }
+
+    processPermission(in, v);
+    v.leaveEnclosingElement(); // INode
+  }
 
   /**
    * Helper method to format dates during processing.
diff --git a/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java b/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
index 898cabf..de8be73 100644
--- a/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
+++ b/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
@@ -343,7 +343,7 @@
       if (!test.isSuccess() && p.contains(index, id)) {
         FiTestUtil.LOG.info(toString(id));
         if (maxDuration <= 0) {
-          for(; true; FiTestUtil.sleep(1000)); //sleep forever
+          for(; FiTestUtil.sleep(1000); ); // sleep until interrupted
         } else {
           FiTestUtil.sleep(minDuration, maxDuration);
         }
@@ -391,7 +391,7 @@
         + minDuration + "," + maxDuration + ")";
         FiTestUtil.LOG.info(s);
         if (maxDuration <= 1) {
-          for(; true; FiTestUtil.sleep(1000)); //sleep forever
+          for(; FiTestUtil.sleep(1000); ); // sleep until interrupted
         } else {
           FiTestUtil.sleep(minDuration, maxDuration);
         }
diff --git a/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java b/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
index 3f608a4..396f5fe 100644
--- a/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
+++ b/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
@@ -73,14 +73,17 @@
 
   /**
    * Sleep.
-   * If there is an InterruptedException, re-throw it as a RuntimeException.
+   * @return true if the sleep completes normally; false if it is interrupted.
    */
-  public static void sleep(long ms) {
+  public static boolean sleep(long ms) {
+    LOG.info("Sleep " + ms + " ms");
     try {
       Thread.sleep(ms);
     } catch (InterruptedException e) {
-      throw new RuntimeException(e);
+      LOG.info("Sleep is interrupted", e);
+      return false;
     }
+    return true;
   }
 
   /**
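
With sleep(..) now returning false on interruption, the fault-injection actions can spin in "for(; sleep(1000); );" and still terminate when the test interrupts the thread. A tiny demonstration of that idiom outside Hadoop:

    // Minimal demonstration: sleep(..) returns false on interrupt, so the
    // empty-bodied for loop exits instead of throwing a RuntimeException.
    public class SleepLoopSketch {
      static boolean sleep(long ms) {
        try {
          Thread.sleep(ms);
          return true;
        } catch (InterruptedException e) {
          return false;                    // stop looping instead of throwing
        }
      }

      public static void main(String[] args) throws Exception {
        Thread sleeper = new Thread(new Runnable() {
          public void run() {
            for (; sleep(100); );          // loops until interrupted
            System.out.println("interrupted, loop exited");
          }
        });
        sleeper.start();
        Thread.sleep(250);
        sleeper.interrupt();
        sleeper.join();
      }
    }
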
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
index 2d68aac..73062de 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
@@ -45,6 +45,11 @@
 privileged public aspect BlockReceiverAspects {
   public static final Log LOG = LogFactory.getLog(BlockReceiverAspects.class);
 
+  BlockReceiver BlockReceiver.PacketResponder.getReceiver() {
+    LOG.info("FI: getReceiver() " + getClass().getName());
+    return BlockReceiver.this;
+  }
+
   pointcut callReceivePacket(BlockReceiver blockreceiver) :
     call(* receivePacket(..)) && target(blockreceiver);
 	
@@ -82,7 +87,7 @@
 
   after(BlockReceiver.PacketResponder responder)
       throws IOException: afterDownstreamStatusRead(responder) {
-    final DataNode d = responder.receiver.getDataNode();
+    final DataNode d = responder.getReceiver().getDataNode();
     DataTransferTest dtTest = DataTransferTestUtil.getDataTransferTest();
     if (dtTest != null)
       dtTest.fiAfterDownstreamStatusRead.run(d.getDatanodeId());
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
index 434e0cc..ed9de6d 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
@@ -22,18 +22,13 @@
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fi.DataTransferTestUtil;
-import org.apache.hadoop.fi.FiTestUtil;
-import org.apache.hadoop.fi.DataTransferTestUtil.DataNodeAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
-import org.apache.hadoop.fi.DataTransferTestUtil.DatanodeMarkingAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.DoosAction;
-import org.apache.hadoop.fi.DataTransferTestUtil.IoeAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.OomAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.VerificationAction;
+import org.apache.hadoop.fi.FiTestUtil;
 import org.apache.hadoop.fi.FiTestUtil.Action;
-import org.apache.hadoop.fi.FiTestUtil.ConstraintSatisfactionAction;
-import org.apache.hadoop.fi.FiTestUtil.MarkerConstraint;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -41,7 +36,9 @@
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -63,6 +60,10 @@
         REPLICATION, BLOCKSIZE);
   }
 
+  {
+    ((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   /**
    * 1. create files with dfs
    * 2. write 1 byte
@@ -70,9 +71,9 @@
    * 4. open the same file
    * 5. read the 1 byte and compare results
    */
-  private static void write1byte(String methodName) throws IOException {
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(REPLICATION).format(true).build();
+  static void write1byte(String methodName) throws IOException {
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
+        ).numDataNodes(REPLICATION + 1).build();
     final FileSystem dfs = cluster.getFileSystem();
     try {
       final Path p = new Path("/" + methodName + "/foo");
@@ -305,184 +306,4 @@
     final String methodName = FiTestUtil.getMethodName();
     runCallWritePacketToDisk(methodName, 2, new DoosAction(methodName, 2));
   }
-
-  private static void runPipelineCloseTest(String methodName,
-      Action<DatanodeID, IOException> a) throws IOException {
-    FiTestUtil.LOG.info("Running " + methodName + " ...");
-    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
-        .initTest();
-    t.fiPipelineClose.set(a);
-    write1byte(methodName);
-  }
-
-  private static void run41_43(String name, int i) throws IOException {
-    runPipelineCloseTest(name, new SleepAction(name, i, 3000));
-  }
-
-  private static void runPipelineCloseAck(String name, int i, DataNodeAction a
-      ) throws IOException {
-    FiTestUtil.LOG.info("Running " + name + " ...");
-    final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
-    final MarkerConstraint marker = new MarkerConstraint(name);
-    t.fiPipelineClose.set(new DatanodeMarkingAction(name, i, marker));
-    t.fiPipelineAck.set(new ConstraintSatisfactionAction<DatanodeID, IOException>(a, marker));
-    write1byte(name);
-  }
-
-  private static void run39_40(String name, int i) throws IOException {
-    runPipelineCloseAck(name, i, new SleepAction(name, i, 0));
-  }
-
-  /**
-   * Pipeline close:
-   * DN1 never responses after received close ack DN2.
-   * Client gets an IOException and determine DN1 bad.
-   */
-  @Test
-  public void pipeline_Fi_39() throws IOException {
-    run39_40(FiTestUtil.getMethodName(), 1);
-  }
-
-  /**
-   * Pipeline close:
-   * DN0 never responses after received close ack DN1.
-   * Client gets an IOException and determine DN0 bad.
-   */
-  @Test
-  public void pipeline_Fi_40() throws IOException {
-    run39_40(FiTestUtil.getMethodName(), 0);
-  }
-  
-  /**
-   * Pipeline close with DN0 very slow but it won't lead to timeout.
-   * Client finishes close successfully.
-   */
-  @Test
-  public void pipeline_Fi_41() throws IOException {
-    run41_43(FiTestUtil.getMethodName(), 0);
-  }
-
-  /**
-   * Pipeline close with DN1 very slow but it won't lead to timeout.
-   * Client finishes close successfully.
-   */
-  @Test
-  public void pipeline_Fi_42() throws IOException {
-    run41_43(FiTestUtil.getMethodName(), 1);
-  }
-
-  /**
-   * Pipeline close with DN2 very slow but it won't lead to timeout.
-   * Client finishes close successfully.
-   */
-  @Test
-  public void pipeline_Fi_43() throws IOException {
-    run41_43(FiTestUtil.getMethodName(), 2);
-  }
-
-  /**
-   * Pipeline close:
-   * DN0 throws an OutOfMemoryException
-   * right after it received a close request from client.
-   * Client gets an IOException and determine DN0 bad.
-   */
-  @Test
-  public void pipeline_Fi_44() throws IOException {
-    final String methodName = FiTestUtil.getMethodName();
-    runPipelineCloseTest(methodName, new OomAction(methodName, 0));
-  }
-
-  /**
-   * Pipeline close:
-   * DN1 throws an OutOfMemoryException
-   * right after it received a close request from client.
-   * Client gets an IOException and determine DN1 bad.
-   */
-  @Test
-  public void pipeline_Fi_45() throws IOException {
-    final String methodName = FiTestUtil.getMethodName();
-    runPipelineCloseTest(methodName, new OomAction(methodName, 1));
-  }
-
-  /**
-   * Pipeline close:
-   * DN2 throws an OutOfMemoryException
-   * right after it received a close request from client.
-   * Client gets an IOException and determine DN2 bad.
-   */
-  @Test
-  public void pipeline_Fi_46() throws IOException {
-    final String methodName = FiTestUtil.getMethodName();
-    runPipelineCloseTest(methodName, new OomAction(methodName, 2));
-  }
-
-  private static void run47_48(String name, int i) throws IOException {
-    runPipelineCloseAck(name, i, new OomAction(name, i));
-  }
-
-  /**
-   * Pipeline close:
-   * DN1 throws an OutOfMemoryException right after
-   * it received a close ack from DN2.
-   * Client gets an IOException and determine DN1 bad.
-   */
-  @Test
-  public void pipeline_Fi_47() throws IOException {
-    run47_48(FiTestUtil.getMethodName(), 1);
-  }
-
-  /**
-   * Pipeline close:
-   * DN0 throws an OutOfMemoryException right after
-   * it received a close ack from DN1.
-   * Client gets an IOException and determine DN0 bad.
-   */
-  @Test
-  public void pipeline_Fi_48() throws IOException {
-    run47_48(FiTestUtil.getMethodName(), 0);
-  }
-
-  private static void runBlockFileCloseTest(String methodName,
-      Action<DatanodeID, IOException> a) throws IOException {
-    FiTestUtil.LOG.info("Running " + methodName + " ...");
-    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
-        .initTest();
-    t.fiBlockFileClose.set(a);
-    write1byte(methodName);
-  }
-
-  private static void run49_51(String name, int i) throws IOException {
-    runBlockFileCloseTest(name, new IoeAction(name, i, "DISK ERROR"));
-  }
-
-  /**
-   * Pipeline close:
-   * DN0 throws a disk error exception when it is closing the block file.
-   * Client gets an IOException and determine DN0 bad.
-   */
-  @Test
-  public void pipeline_Fi_49() throws IOException {
-    run49_51(FiTestUtil.getMethodName(), 0);
-  }
-
-
-  /**
-   * Pipeline close:
-   * DN1 throws a disk error exception when it is closing the block file.
-   * Client gets an IOException and determine DN1 bad.
-   */
-  @Test
-  public void pipeline_Fi_50() throws IOException {
-    run49_51(FiTestUtil.getMethodName(), 1);
-  }
-
-  /**
-   * Pipeline close:
-   * DN2 throws a disk error exception when it is closing the block file.
-   * Client gets an IOException and determine DN2 bad.
-   */
-  @Test
-  public void pipeline_Fi_51() throws IOException {
-    run49_51(FiTestUtil.getMethodName(), 2);
-  }
-}
\ No newline at end of file
+}
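write1byte() now starts the mini cluster with one datanode more than the replication factor, presumably so a replacement datanode is available when one of the pipeline nodes is faulted. A rough standalone version of the helper, assuming the Hadoop test jars are on the classpath; the replication constant and path are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    public class Write1ByteSketch {
      public static void main(String[] args) throws Exception {
        final short replication = 3;                 // placeholder value
        final Configuration conf = new HdfsConfiguration();
        // One spare datanode beyond the replication factor, as in the patched tests.
        final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
            .numDataNodes(replication + 1).build();
        final FileSystem dfs = cluster.getFileSystem();
        try {
          final Path p = new Path("/write1byte/foo");
          final FSDataOutputStream out = dfs.create(p, replication);
          out.write(1);
          out.close();

          final FSDataInputStream in = dfs.open(p);
          final int b = in.read();
          in.close();
          System.out.println("read back: " + b);     // expect 1
        } finally {
          dfs.close();
          cluster.shutdown();
        }
      }
    }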
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
index 46caea4..d2868b7 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
@@ -23,13 +23,13 @@
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fi.DataTransferTestUtil;
-import org.apache.hadoop.fi.FiTestUtil;
 import org.apache.hadoop.fi.DataTransferTestUtil.CountdownDoosAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.CountdownOomAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.CountdownSleepAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
 import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.VerificationAction;
+import org.apache.hadoop.fi.FiTestUtil;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,9 +37,8 @@
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.datanode.BlockReceiver;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.log4j.Level;
-
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -71,6 +70,7 @@
   {
     ((Log4JLogger) BlockReceiver.LOG).getLogger().setLevel(Level.ALL);
     ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
   }
   /**
    * 1. create files with dfs
@@ -88,8 +88,8 @@
     FiTestUtil.LOG.info("size=" + size + ", nPackets=" + nPackets
         + ", lastPacketSize=" + lastPacketSize);
 
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(REPLICATION).build();
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
+        ).numDataNodes(REPLICATION + 1).build();
     final FileSystem dfs = cluster.getFileSystem();
     try {
       final Path p = new Path("/" + methodName + "/foo");
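Both test classes raise DataTransferProtocol.LOG to Level.ALL from an instance initializer block, so every constructed test instance gets verbose wire-level logging without touching individual test methods. A small sketch of that pattern with an illustrative logger; the cast assumes, as the tests do, that commons-logging is backed by log4j:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.commons.logging.impl.Log4JLogger;
    import org.apache.log4j.Level;

    public class VerboseTestBase {
      // Illustrative logger; the tests in this patch use DataTransferProtocol.LOG etc.
      static final Log LOG = LogFactory.getLog(VerboseTestBase.class);

      {
        // Instance initializer: runs for every instance, before any test method.
        ((Log4JLogger) LOG).getLogger().setLevel(Level.ALL);
      }
    }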
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
index 0f39595..1468222 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
@@ -19,76 +19,29 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fi.DataTransferTestUtil;
-import org.apache.hadoop.fi.FiTestUtil;
+import org.apache.hadoop.fi.DataTransferTestUtil.DataNodeAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
+import org.apache.hadoop.fi.DataTransferTestUtil.DatanodeMarkingAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.IoeAction;
+import org.apache.hadoop.fi.DataTransferTestUtil.OomAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
+import org.apache.hadoop.fi.FiTestUtil;
 import org.apache.hadoop.fi.FiTestUtil.Action;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fi.FiTestUtil.ConstraintSatisfactionAction;
+import org.apache.hadoop.fi.FiTestUtil.MarkerConstraint;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.junit.Assert;
 import org.junit.Test;
 
 /** Test DataTransferProtocol with fault injection. */
 public class TestFiPipelineClose {
-  static final short REPLICATION = 3;
-  static final long BLOCKSIZE = 1L * (1L << 20);
-
-  static final Configuration conf = new HdfsConfiguration();
-  static {
-    conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 1);
-    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION);
-    conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 5000);
-  }
-
-  static private FSDataOutputStream createFile(FileSystem fs, Path p
-      ) throws IOException {
-    return fs.create(p, true, fs.getConf().getInt("io.file.buffer.size", 4096),
-        REPLICATION, BLOCKSIZE);
-  }
-
-  /**
-   * 1. create files with dfs
-   * 2. write 1 byte
-   * 3. close file
-   * 4. open the same file
-   * 5. read the 1 byte and compare results
-   */
-  private static void write1byte(String methodName) throws IOException {
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(REPLICATION).format(true).build();
-    final FileSystem dfs = cluster.getFileSystem();
-    try {
-      final Path p = new Path("/" + methodName + "/foo");
-      final FSDataOutputStream out = createFile(dfs, p);
-      out.write(1);
-      out.close();
-      
-      final FSDataInputStream in = dfs.open(p);
-      final int b = in.read();
-      in.close();
-      Assert.assertEquals(1, b);
-    }
-    finally {
-      dfs.close();
-      cluster.shutdown();
-    }
-  }
-
-   private static void runPipelineCloseTest(String methodName,
+  private static void runPipelineCloseTest(String methodName,
       Action<DatanodeID, IOException> a) throws IOException {
     FiTestUtil.LOG.info("Running " + methodName + " ...");
     final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
         .initTest();
     t.fiPipelineClose.set(a);
-    write1byte(methodName);
+    TestFiDataTransferProtocol.write1byte(methodName);
   }
 
   /**
@@ -123,4 +76,175 @@
     final String methodName = FiTestUtil.getMethodName();
     runPipelineCloseTest(methodName, new SleepAction(methodName, 2, 0));
   }
+
+  private static void run41_43(String name, int i) throws IOException {
+    runPipelineCloseTest(name, new SleepAction(name, i, 3000));
+  }
+
+  private static void runPipelineCloseAck(String name, int i, DataNodeAction a
+      ) throws IOException {
+    FiTestUtil.LOG.info("Running " + name + " ...");
+    final DataTransferTest t = (DataTransferTest)DataTransferTestUtil.initTest();
+    final MarkerConstraint marker = new MarkerConstraint(name);
+    t.fiPipelineClose.set(new DatanodeMarkingAction(name, i, marker));
+    t.fiPipelineAck.set(new ConstraintSatisfactionAction<DatanodeID, IOException>(a, marker));
+    TestFiDataTransferProtocol.write1byte(name);
+  }
+
+  private static void run39_40(String name, int i) throws IOException {
+    runPipelineCloseAck(name, i, new SleepAction(name, i, 0));
+  }
+
+  /**
+   * Pipeline close:
+   * DN1 never responds after receiving the close ack from DN2.
+   * Client gets an IOException and determines DN1 is bad.
+   */
+  @Test
+  public void pipeline_Fi_39() throws IOException {
+    run39_40(FiTestUtil.getMethodName(), 1);
+  }
+
+  /**
+   * Pipeline close:
+   * DN0 never responds after receiving the close ack from DN1.
+   * Client gets an IOException and determines DN0 is bad.
+   */
+  @Test
+  public void pipeline_Fi_40() throws IOException {
+    run39_40(FiTestUtil.getMethodName(), 0);
+  }
+  
+  /**
+   * Pipeline close with DN0 very slow but it won't lead to timeout.
+   * Client finishes close successfully.
+   */
+  @Test
+  public void pipeline_Fi_41() throws IOException {
+    run41_43(FiTestUtil.getMethodName(), 0);
+  }
+
+  /**
+   * Pipeline close with DN1 very slow but it won't lead to timeout.
+   * Client finishes close successfully.
+   */
+  @Test
+  public void pipeline_Fi_42() throws IOException {
+    run41_43(FiTestUtil.getMethodName(), 1);
+  }
+
+  /**
+   * Pipeline close with DN2 very slow but it won't lead to timeout.
+   * Client finishes close successfully.
+   */
+  @Test
+  public void pipeline_Fi_43() throws IOException {
+    run41_43(FiTestUtil.getMethodName(), 2);
+  }
+
+  /**
+   * Pipeline close:
+   * DN0 throws an OutOfMemoryException
+   * right after it receives a close request from the client.
+   * Client gets an IOException and determines DN0 is bad.
+   */
+  @Test
+  public void pipeline_Fi_44() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runPipelineCloseTest(methodName, new OomAction(methodName, 0));
+  }
+
+  /**
+   * Pipeline close:
+   * DN1 throws an OutOfMemoryException
+   * right after it receives a close request from the client.
+   * Client gets an IOException and determines DN1 is bad.
+   */
+  @Test
+  public void pipeline_Fi_45() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runPipelineCloseTest(methodName, new OomAction(methodName, 1));
+  }
+
+  /**
+   * Pipeline close:
+   * DN2 throws an OutOfMemoryException
+   * right after it receives a close request from the client.
+   * Client gets an IOException and determines DN2 is bad.
+   */
+  @Test
+  public void pipeline_Fi_46() throws IOException {
+    final String methodName = FiTestUtil.getMethodName();
+    runPipelineCloseTest(methodName, new OomAction(methodName, 2));
+  }
+
+  private static void run47_48(String name, int i) throws IOException {
+    runPipelineCloseAck(name, i, new OomAction(name, i));
+  }
+
+  /**
+   * Pipeline close:
+   * DN1 throws an OutOfMemoryException right after
+   * it receives a close ack from DN2.
+   * Client gets an IOException and determines DN1 is bad.
+   */
+  @Test
+  public void pipeline_Fi_47() throws IOException {
+    run47_48(FiTestUtil.getMethodName(), 1);
+  }
+
+  /**
+   * Pipeline close:
+   * DN0 throws an OutOfMemoryException right after
+   * it receives a close ack from DN1.
+   * Client gets an IOException and determines DN0 is bad.
+   */
+  @Test
+  public void pipeline_Fi_48() throws IOException {
+    run47_48(FiTestUtil.getMethodName(), 0);
+  }
+
+  private static void runBlockFileCloseTest(String methodName,
+      Action<DatanodeID, IOException> a) throws IOException {
+    FiTestUtil.LOG.info("Running " + methodName + " ...");
+    final DataTransferTest t = (DataTransferTest) DataTransferTestUtil
+        .initTest();
+    t.fiBlockFileClose.set(a);
+    TestFiDataTransferProtocol.write1byte(methodName);
+  }
+
+  private static void run49_51(String name, int i) throws IOException {
+    runBlockFileCloseTest(name, new IoeAction(name, i, "DISK ERROR"));
+  }
+
+  /**
+   * Pipeline close:
+   * DN0 throws a disk error exception when it is closing the block file.
+   * Client gets an IOException and determines DN0 is bad.
+   */
+  @Test
+  public void pipeline_Fi_49() throws IOException {
+    run49_51(FiTestUtil.getMethodName(), 0);
+  }
+
+
+  /**
+   * Pipeline close:
+   * DN1 throws a disk error exception when it is closing the block file.
+   * Client gets an IOException and determines DN1 is bad.
+   */
+  @Test
+  public void pipeline_Fi_50() throws IOException {
+    run49_51(FiTestUtil.getMethodName(), 1);
+  }
+
+  /**
+   * Pipeline close:
+   * DN2 throws a disk error exception when it is closing the block file.
+   * Client gets an IOException and determines DN2 is bad.
+   */
+  @Test
+  public void pipeline_Fi_51() throws IOException {
+    run49_51(FiTestUtil.getMethodName(), 2);
+  }
 }
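runPipelineCloseAck() composes two hooks: as the names suggest, the DatanodeMarkingAction sets a marker when the targeted datanode handles pipeline close, and the ConstraintSatisfactionAction fires the wrapped fault at the ack stage only once that marker has been set. A generic sketch of that mark-then-fire pattern; all names below are illustrative, not the fault-injection framework's own:

    import java.util.concurrent.atomic.AtomicBoolean;

    public class MarkThenFire {
      /** A flag set by an earlier hook and checked by a later one. */
      static final class Marker {
        private final AtomicBoolean set = new AtomicBoolean(false);
        void mark()        { set.set(true); }
        boolean isMarked() { return set.get(); }
      }

      interface Action { void run(String datanodeId); }

      /** Runs the wrapped action only after the marker has been set. */
      static Action onlyIfMarked(Marker m, Action a) {
        return id -> {
          if (m.isMarked()) {
            a.run(id);
          }
        };
      }

      public static void main(String[] args) {
        Marker marker = new Marker();
        Action markingHook = id -> marker.mark();                        // e.g. at pipeline close
        Action faultHook = onlyIfMarked(marker,
            id -> System.out.println("inject fault at ack for " + id));  // e.g. at pipeline ack

        faultHook.run("DN1");    // nothing happens: marker not yet set
        markingHook.run("DN1");  // close hook sets the marker
        faultHook.run("DN1");    // ack hook now injects the fault
      }
    }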
diff --git a/src/test/hdfs/org/apache/hadoop/cli/CLITestCmdDFS.java b/src/test/hdfs/org/apache/hadoop/cli/CLITestCmdDFS.java
new file mode 100644
index 0000000..ece2261
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/cli/CLITestCmdDFS.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cli;
+
+import org.apache.hadoop.cli.util.*;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
+
+public class CLITestCmdDFS extends CLITestCmd {
+  public CLITestCmdDFS(String str, CLICommandTypes type) {
+    super(str, type);
+  }
+
+  @Override
+  public CommandExecutor getExecutor(String tag) throws IllegalArgumentException {
+    if (getType() instanceof CLICommandDFSAdmin)
+      return new FSCmdExecutor(tag, new DFSAdmin());
+    return super.getExecutor(tag);
+  }
+}
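The new class plugs DFSAdmin commands into the generalized CLI test framework: each test command builds its own executor instead of going through a static factory. A rough usage sketch pieced together from the classes touched in this patch; the command string and namenode address are placeholders, and the exact helper signatures may differ:

    import org.apache.hadoop.cli.CLITestCmdDFS;
    import org.apache.hadoop.cli.util.CLICommandDFSAdmin;
    import org.apache.hadoop.cli.util.CommandExecutor;

    public class CLITestCmdDFSUsage {
      public static void main(String[] args) throws Exception {
        // Placeholder dfsadmin command and namenode tag, for illustration only.
        CLITestCmdDFS cmd =
            new CLITestCmdDFS("-fs NAMENODE -report", new CLICommandDFSAdmin());
        CommandExecutor.Result result =
            cmd.getExecutor("hdfs://localhost:8020").executeCommand(cmd.getCmd());
        // result carries the command's output and exit status for the XML comparators.
        System.out.println(result);
      }
    }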
diff --git a/src/test/hdfs/org/apache/hadoop/cli/CLITestHelperDFS.java b/src/test/hdfs/org/apache/hadoop/cli/CLITestHelperDFS.java
new file mode 100644
index 0000000..fc3567e
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/cli/CLITestHelperDFS.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cli;
+
+import org.apache.hadoop.cli.util.CLICommandDFSAdmin;
+import org.apache.hadoop.cli.util.CLITestCmd;
+import org.xml.sax.SAXException;
+
+public class CLITestHelperDFS extends CLITestHelper {
+
+  @Override
+  protected TestConfigFileParser getConfigParser() {
+    return new TestConfigFileParserDFS();
+  }
+
+  class TestConfigFileParserDFS extends CLITestHelper.TestConfigFileParser {
+    @Override
+    public void endElement(String uri, String localName, String qName)
+        throws SAXException {
+      if (qName.equals("dfs-admin-command")) {
+        if (testCommands != null) {
+          testCommands.add(new CLITestCmdDFS(charString,
+              new CLICommandDFSAdmin()));
+        } else if (cleanupCommands != null) {
+          cleanupCommands.add(new CLITestCmdDFS(charString,
+              new CLICommandDFSAdmin()));
+        }
+      } else {
+        super.endElement(uri, localName, qName);
+      }
+    }
+  }
+}
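CLITestHelperDFS extends the shared config parser by intercepting one extra XML element and delegating everything else to the superclass handler. The same pattern in a self-contained SAX example; the element and class names are illustrative:

    import java.io.ByteArrayInputStream;
    import javax.xml.parsers.SAXParserFactory;
    import org.xml.sax.SAXException;
    import org.xml.sax.helpers.DefaultHandler;

    public class DelegatingSaxHandler {
      static class BaseHandler extends DefaultHandler {
        @Override
        public void endElement(String uri, String localName, String qName)
            throws SAXException {
          System.out.println("base handled </" + qName + ">");
        }
      }

      /** Handles one extra element itself and delegates the rest to the base handler. */
      static class DfsAdminAwareHandler extends BaseHandler {
        @Override
        public void endElement(String uri, String localName, String qName)
            throws SAXException {
          if (qName.equals("dfs-admin-command")) {
            System.out.println("special-cased </dfs-admin-command>");
          } else {
            super.endElement(uri, localName, qName);
          }
        }
      }

      public static void main(String[] args) throws Exception {
        String xml = "<tests><command/><dfs-admin-command/></tests>";
        SAXParserFactory.newInstance().newSAXParser().parse(
            new ByteArrayInputStream(xml.getBytes("UTF-8")),
            new DfsAdminAwareHandler());
      }
    }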
diff --git a/src/test/hdfs/org/apache/hadoop/cli/CmdFactoryDFS.java b/src/test/hdfs/org/apache/hadoop/cli/CmdFactoryDFS.java
deleted file mode 100644
index 4d615f9..0000000
--- a/src/test/hdfs/org/apache/hadoop/cli/CmdFactoryDFS.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.cli;
-
-import org.apache.hadoop.cli.util.CLICommands;
-import org.apache.hadoop.cli.util.CLITestData;
-import org.apache.hadoop.cli.util.CmdFactory;
-import org.apache.hadoop.cli.util.CommandExecutor;
-import org.apache.hadoop.hdfs.tools.DFSAdmin;
-
-public abstract class CmdFactoryDFS extends CmdFactory {
-  public static CommandExecutor getCommandExecutor(CLITestData.TestCmd cmd,
-                                            String tag)
-    throws IllegalArgumentException {
-    CommandExecutor executor;
-    switch (cmd.getType()) {
-      case DFSADMIN:
-        executor = new CLICommands.FSCmdExecutor(tag, new DFSAdmin());
-        break;
-      default:
-        executor = CmdFactory.getCommandExecutor(cmd, tag);
-    }
-    return executor;
-  }
-}
diff --git a/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java b/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
index 4f73b23..45e3bdc 100644
--- a/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
+++ b/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.cli;
 
-import org.apache.hadoop.cli.util.CLITestData.TestCmd;
+import org.apache.hadoop.cli.util.CLICommand;
 import org.apache.hadoop.cli.util.CommandExecutor.Result;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -31,7 +31,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestHDFSCLI extends CLITestHelper {
+public class TestHDFSCLI extends CLITestHelperDFS {
 
   protected MiniDFSCluster dfsCluster = null;
   protected DistributedFileSystem dfs = null;
@@ -85,13 +85,13 @@
   protected String expandCommand(final String cmd) {
     String expCmd = cmd;
     expCmd = expCmd.replaceAll("NAMENODE", namenode);
-    expCmd = super.expandCommand(cmd);
+    expCmd = super.expandCommand(expCmd);
     return expCmd;
   }
   
   @Override
-  protected Result execute(TestCmd cmd) throws Exception {
-    return CmdFactoryDFS.getCommandExecutor(cmd, namenode).executeCommand(cmd.getCmd());
+  protected Result execute(CLICommand cmd) throws Exception {
+    return cmd.getExecutor(namenode).executeCommand(cmd.getCmd());
   }
 
   @Test
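The one-line fix in expandCommand() matters because expansion is chained: the NAMENODE substitution is applied first, and the superclass must continue from that partially expanded string rather than from the raw command. A tiny illustration of the difference; the CLITEST_DATA token and the base expansion are placeholders standing in for whatever the superclass actually substitutes:

    public class ExpandCommandDemo {
      static String expandBase(String cmd) {
        return cmd.replaceAll("CLITEST_DATA", "/tmp/clitest");  // placeholder expansion
      }

      static String expandBuggy(String cmd, String namenode) {
        String expCmd = cmd.replaceAll("NAMENODE", namenode);
        expCmd = expandBase(cmd);     // bug: starts over from cmd, NAMENODE expansion is lost
        return expCmd;
      }

      static String expandFixed(String cmd, String namenode) {
        String expCmd = cmd.replaceAll("NAMENODE", namenode);
        expCmd = expandBase(expCmd);  // fix: keep expanding the already-expanded string
        return expCmd;
      }

      public static void main(String[] args) {
        String cmd = "-fs NAMENODE -put CLITEST_DATA/file1 /file1";
        String nn = "hdfs://localhost:8020";
        System.out.println(expandBuggy(cmd, nn)); // NAMENODE token still present
        System.out.println(expandFixed(cmd, nn)); // both tokens expanded
      }
    }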
diff --git a/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml b/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
index 3ec1eed..7da99b5 100644
--- a/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
+++ b/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
@@ -7068,7 +7068,7 @@
       <comparators>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>Can not find listing for /file1</expected-output>
+          <expected-output>count: Can not find listing for /file1</expected-output>
         </comparator>
       </comparators>
     </test>
@@ -7083,7 +7083,7 @@
       <comparators>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>Can not find listing for file1</expected-output>
+          <expected-output>count: Can not find listing for file1</expected-output>
         </comparator>
       </comparators>
     </test>
@@ -7426,7 +7426,7 @@
       <comparators>
         <comparator>
          <type>TokenComparator</type>
-          <expected-output>Can not find listing for /file1</expected-output>
+          <expected-output>count: Can not find listing for /file1</expected-output>
        </comparator>
      </comparators>
    </test>
@@ -7441,7 +7441,7 @@
       <comparators>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>Can not find listing for file1</expected-output>
+          <expected-output>count: Can not find listing for file1</expected-output>
         </comparator>
       </comparators>
     </test>
@@ -7778,7 +7778,7 @@
       <comparators>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>Can not find listing for hdfs:///file1</expected-output>
+          <expected-output>count: Can not find listing for hdfs:/file1</expected-output>
         </comparator>
       </comparators>
     </test>
@@ -7957,7 +7957,7 @@
       <comparators>
         <comparator>
           <type>TokenComparator</type>
-          <expected-output>Can not find listing for hdfs:///file1</expected-output>
+          <expected-output>count: Can not find listing for hdfs:/file1</expected-output>
         </comparator>
       </comparators>
     </test>
@@ -8150,7 +8150,7 @@
       <comparators>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>Can not find listing for hdfs://\w+[.a-z]*:[0-9]+/file1</expected-output>
+          <expected-output>count: Can not find listing for hdfs://\w+[.a-z]*:[0-9]+/file1</expected-output>
         </comparator>
       </comparators>
     </test>
@@ -8329,7 +8329,7 @@
       <comparators>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>Can not find listing for hdfs://\w+[.a-z]*:[0-9]+/file1</expected-output>
+          <expected-output>count: Can not find listing for hdfs://\w+[.a-z]*:[0-9]+/file1</expected-output>
         </comparator>
       </comparators>
     </test>
@@ -15096,741 +15096,6 @@
       </comparators>
     </test>
 
-    <test> <!-- TESTED -->
-      <description>help: help for ls</description>
-      <test-commands>
-        <command>-fs NAMENODE -help ls</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-ls &lt;path&gt;:( |\t)*List the contents that match the specified file pattern. If( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*path is not specified, the contents of /user/&lt;currentUser&gt;( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*path is not specified, the contents of /user/&lt;currentUser&gt;( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*will be listed. Directory entries are of the form( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*dirName \(full path\) &lt;dir&gt;( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*and file entries are of the form( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*fileName\(full path\) &lt;r n&gt; size( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*where n is the number of replicas specified for the file( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*and size is the size of the file, in bytes.( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for lsr</description>
-      <test-commands>
-        <command>-fs NAMENODE -help lsr</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-lsr &lt;path&gt;:( |\t)*Recursively list the contents that match the specified( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*file pattern.( |\t)*Behaves very similarly to hadoop fs -ls,( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*except that the data is shown for all the entries in the( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*subtree.( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for get</description>
-      <test-commands>
-        <command>-fs NAMENODE -help get</command>
-      </test-commands>
-      <cleanup-commands>
-        <!-- No cleanup -->
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-get( )*\[-ignoreCrc\]( )*\[-crc\]( )*&lt;src&gt; &lt;localdst&gt;:( |\t)*Copy files that match the file pattern &lt;src&gt;( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*to the local name.( )*&lt;src&gt; is kept.( )*When copying mutiple,( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*files, the destination must be a directory.( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for du</description>
-      <test-commands>
-        <command>-fs NAMENODE -help du</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-du \[-s\] \[-h\] &lt;path&gt;:\s+Show the amount of space, in bytes, used by the files that\s*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^\s*match the specified file pattern. The following flags are optional:</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^\s*-s\s*Rather than showing the size of each individual file that</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^\s*matches the pattern, shows the total \(summary\) size.</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^\s*-h\s*Formats the sizes of files in a human-readable fashion</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>\s*rather than a number of bytes.</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^\s*Note that, even without the -s option, this only shows size summaries</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^\s*one level deep into a directory.</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^\s*The output is in the form </expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^\s*size\s+name\(full path\)\s*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for dus</description>
-      <test-commands>
-        <command>-fs NAMENODE -help dus</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-dus &lt;path&gt;:( |\t)*Show the amount of space, in bytes, used by the files that( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*match the specified file pattern. This is equivalent to -du -s above.</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for count</description>
-      <test-commands>
-        <command>-fs NAMENODE -help count</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-count\[-q\] &lt;path&gt;: Count the number of directories, files and bytes under the paths( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*that match the specified file pattern.  The output columns are:( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*DIR_COUNT FILE_COUNT CONTENT_SIZE FILE_NAME or( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*QUOTA REMAINING_QUATA SPACE_QUOTA REMAINING_SPACE_QUOTA( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*DIR_COUNT FILE_COUNT CONTENT_SIZE FILE_NAME( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-   <test> <!-- TESTED -->
-      <description>help: help for mv</description>
-      <test-commands>
-        <command>-fs NAMENODE -help mv</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-mv &lt;src&gt; &lt;dst&gt;:( |\t)*Move files that match the specified file pattern &lt;src&gt;( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*to a destination &lt;dst&gt;.  When moving multiple files, the( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*destination must be a directory.( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for cp</description>
-      <test-commands>
-        <command>-fs NAMENODE -help cp</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-cp &lt;src&gt; &lt;dst&gt;:( |\t)*Copy files that match the file pattern &lt;src&gt; to a( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*destination.  When copying multiple files, the destination( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*must be a directory.( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for rm</description>
-      <test-commands>
-        <command>-fs NAMENODE -help rm</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-rm \[-skipTrash\] &lt;src&gt;:( |\t)*Delete all files that match the specified file pattern.( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*Equivalent to the Unix command "rm &lt;src&gt;"( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*-skipTrash option bypasses trash, if enabled, and immediately( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*deletes &lt;src&gt;( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for rmr</description>
-      <test-commands>
-        <command>-fs NAMENODE -help rmr</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-rmr \[-skipTrash\] &lt;src&gt;:( |\t)*Remove all directories which match the specified file( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*pattern. Equivalent to the Unix command "rm -rf &lt;src&gt;"( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*-skipTrash option bypasses trash, if enabled, and immediately( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*deletes &lt;src&gt;( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-   <test> <!-- TESTED -->
-      <description>help: help for put</description>
-      <test-commands>
-        <command>-fs NAMENODE -help put</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-put &lt;localsrc&gt; ... &lt;dst&gt;:( |\t)*Copy files from the local file system( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*into fs.( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for copyFromLocal</description>
-      <test-commands>
-        <command>-fs NAMENODE -help copyFromLocal</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-copyFromLocal &lt;localsrc&gt; ... &lt;dst&gt;:( )*Identical to the -put command.( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for moveFromLocal</description>
-      <test-commands>
-        <command>-fs NAMENODE -help moveFromLocal</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-moveFromLocal &lt;localsrc&gt; ... &lt;dst&gt;: Same as -put, except that the source is( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*deleted after it's copied.( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for get</description>
-      <test-commands>
-        <command>-fs NAMENODE -help get</command>
-      </test-commands>
-      <cleanup-commands>
-        <!-- No cleanup -->
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-get( )*\[-ignoreCrc\]( )*\[-crc\]( )*&lt;src&gt; &lt;localdst&gt;:( |\t)*Copy files that match the file pattern &lt;src&gt;( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*to the local name.( )*&lt;src&gt; is kept.( )*When copying mutiple,( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*files, the destination must be a directory.( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for getmerge</description>
-      <test-commands>
-        <command>-fs NAMENODE -help getmerge</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-getmerge &lt;src&gt; &lt;localdst&gt;:  Get all the files in the directories that( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*match the source file pattern and merge and sort them to only( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*one file on local fs. &lt;src&gt; is kept.( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for cat</description>
-      <test-commands>
-        <command>-fs NAMENODE -help cat</command>
-      </test-commands>
-      <cleanup-commands>
-        <!-- No cleanup -->
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-cat &lt;src&gt;:( |\t)*Fetch all files that match the file pattern &lt;src&gt;( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*and display their content on stdout.</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for copyToLocal</description>
-      <test-commands>
-        <command>-fs NAMENODE -help copyToLocal</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-copyToLocal \[-ignoreCrc\] \[-crc\] &lt;src&gt; &lt;localdst&gt;:( )*Identical to the -get command.( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for moveToLocal</description>
-      <test-commands>
-        <command>-fs NAMENODE -help moveToLocal</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-moveToLocal &lt;src&gt; &lt;localdst&gt;:( )*Not implemented yet( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for mkdir</description>
-      <test-commands>
-        <command>-fs NAMENODE -help mkdir</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-mkdir &lt;path&gt;:( |\t)*Create a directory in specified location.( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for setrep</description>
-      <test-commands>
-        <command>-fs NAMENODE -help setrep</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-setrep \[-R\] \[-w\] &lt;rep&gt; &lt;path/file&gt;:( )*Set the replication level of a file.( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*The -R flag requests a recursive change of replication level( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*for an entire tree.( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for touchz</description>
-      <test-commands>
-        <command>-fs NAMENODE -help touchz</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-touchz &lt;path&gt;: Creates a file of zero length( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*at &lt;path&gt; with current time as the timestamp of that &lt;path&gt;.( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)* An error is returned if the file exists with non-zero length( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for test</description>
-      <test-commands>
-        <command>-fs NAMENODE -help test</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-test -\[ezd\] &lt;path&gt;: If file \{ exists, has zero length, is a directory( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*then return 0, else return 1.( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for stat</description>
-      <test-commands>
-        <command>-fs NAMENODE -help stat</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-stat \[format\] &lt;path&gt;: Print statistics about the file/directory at &lt;path&gt;( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*in the specified format. Format accepts filesize in blocks \(%b\), filename \(%n\),( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*block size \(%o\), replication \(%r\), modification date \(%y, %Y\)( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for tail</description>
-      <test-commands>
-        <command>-fs NAMENODE -help tail</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-tail \[-f\] &lt;file&gt;:  Show the last 1KB of the file.( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*The -f option shows apended data as the file grows.( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for chmod</description>
-      <test-commands>
-        <command>-fs NAMENODE -help chmod</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-chmod \[-R\] &lt;MODE\[,MODE\]... \| OCTALMODE&gt; PATH...( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*Changes permissions of a file.( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*This works similar to shell's chmod with a few exceptions.( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*-R( |\t)*modifies the files recursively. This is the only option( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*currently supported.( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*MODE( |\t)*Mode is same as mode used for chmod shell command.( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*Only letters recognized are 'rwxXt'. E.g. \+t,a\+r,g-w,\+rwx,o=r( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*OCTALMODE Mode specifed in 3 or 4 digits. If 4 digits, the first may( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*be 1 or 0 to turn the sticky bit on or off, respectively.( )*Unlike( |\t)*shell command, it is not possible to specify only part of the mode( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*E.g. 754 is same as u=rwx,g=rx,o=r( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*If none of 'augo' is specified, 'a' is assumed and unlike( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*shell command, no umask is applied.( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for chown</description>
-      <test-commands>
-        <command>-fs NAMENODE -help chown</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-chown \[-R\] \[OWNER\]\[:\[GROUP\]\] PATH...( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*Changes owner and group of a file.( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*This is similar to shell's chown with a few exceptions.( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*-R( |\t)*modifies the files recursively. This is the only option( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*currently supported.( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*If only owner or group is specified then only owner or( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*group is modified.( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*The owner and group names may only cosists of digits, alphabet,( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*and any of '-_.@/' i.e. \[-_.@/a-zA-Z0-9\]. The names are case( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*sensitive.( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*WARNING: Avoid using '.' to separate user name and group though( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*Linux allows it. If user names have dots in them and you are( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*using local file system, you might see surprising results since( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*shell command 'chown' is used for local files.( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for chgrp</description>
-      <test-commands>
-        <command>-fs NAMENODE -help chgrp</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-chgrp \[-R\] GROUP PATH...( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*This is equivalent to -chown ... :GROUP ...( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
-    <test> <!-- TESTED -->
-      <description>help: help for help</description>
-      <test-commands>
-        <command>-fs NAMENODE -help help</command>
-      </test-commands>
-      <cleanup-commands>
-      </cleanup-commands>
-      <comparators>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^-help \[cmd\]:( |\t)*Displays help for given command or all commands if none( )*</expected-output>
-        </comparator>
-        <comparator>
-          <type>RegexpComparator</type>
-          <expected-output>^( |\t)*is specified.( )*</expected-output>
-        </comparator>
-      </comparators>
-    </test>
-
     <test> <!--Tested -->
       <description>help: help for dfsadmin report</description>
       <test-commands>
@@ -16233,7 +15498,7 @@
       <comparators>
         <comparator>
           <type>SubstringComparator</type>
-          <expected-output>Can not find listing for /test1</expected-output>
+          <expected-output>setSpaceQuota: Directory does not exist: /test1</expected-output>
         </comparator>
       </comparators>
     </test>
@@ -16299,7 +15564,7 @@
       <comparators>
         <comparator>
           <type>SubstringComparator</type>
-          <expected-output>Can not find listing for /test1</expected-output>
+          <expected-output>clrQuota: Directory does not exist: /test1</expected-output>
         </comparator>
       </comparators>
     </test>
diff --git a/src/test/hdfs/org/apache/hadoop/cli/util/CLICommandDFSAdmin.java b/src/test/hdfs/org/apache/hadoop/cli/util/CLICommandDFSAdmin.java
new file mode 100644
index 0000000..03c441c
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/cli/util/CLICommandDFSAdmin.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cli.util;
+
+public class CLICommandDFSAdmin implements CLICommandTypes {
+}
diff --git a/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsSymlink.java b/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsSymlink.java
index 4e7b12e..c68cef6 100644
--- a/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsSymlink.java
+++ b/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsSymlink.java
@@ -241,5 +241,17 @@
     } catch (IOException x) {
       // Expected
     }
-  } 
-}
\ No newline at end of file
+  }
+
+  /** Test that a symlink reports the same owner as its target file. */
+  @Test
+  public void testLinkOwner() throws IOException {
+    Path file = new Path(testBaseDir1(), "file");
+    Path link = new Path(testBaseDir1(), "symlinkToFile");
+    createAndWriteFile(file);
+    fc.createSymlink(file, link, false);
+    FileStatus fileStat = fc.getFileStatus(file);
+    FileStatus linkStat = fc.getFileStatus(link);
+    assertEquals(fileStat.getOwner(), linkStat.getOwner());
+  }
+}
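
As an aside, what the new testLinkOwner asserts can be sketched outside the test harness roughly as follows; the class name and /tmp paths are illustrative, and a default file system that supports symlinks (e.g. HDFS reached through FileContext) is assumed, so this is a sketch rather than part of the patch.

import java.io.IOException;
import java.util.EnumSet;

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

public class SymlinkOwnerSketch {
  public static void main(String[] args) throws IOException {
    FileContext fc = FileContext.getFileContext();      // default file system
    Path file = new Path("/tmp/file");                  // illustrative paths
    Path link = new Path("/tmp/symlinkToFile");

    fc.create(file, EnumSet.of(CreateFlag.CREATE)).close();  // target file
    fc.createSymlink(file, link, false);                      // link -> target

    FileStatus fileStat = fc.getFileStatus(file);
    FileStatus linkStat = fc.getFileStatus(link);   // resolves through the link
    System.out.println("owners match: "
        + fileStat.getOwner().equals(linkStat.getOwner()));
  }
}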
diff --git a/src/test/hdfs/org/apache/hadoop/fs/TestResolveHdfsSymlink.java b/src/test/hdfs/org/apache/hadoop/fs/TestResolveHdfsSymlink.java
new file mode 100644
index 0000000..0c17dc6
--- /dev/null
+++ b/src/test/hdfs/org/apache/hadoop/fs/TestResolveHdfsSymlink.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests whether FileContext can resolve an hdfs path that contains a symlink
+ * to the local file system. Also tests the getDelegationTokens API of
+ * FileContext when the underlying file system is Hdfs.
+ */
+public class TestResolveHdfsSymlink {
+  private static MiniDFSCluster cluster = null;
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.getNamesystem().getDelegationTokenSecretManager().startThreads();
+    cluster.waitActive();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Tests resolution of an hdfs symlink to the local file system.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testFcResolveAfs() throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+    FileContext fcLocal = FileContext.getLocalFSFileContext();
+    FileContext fcHdfs = FileContext.getFileContext(cluster.getFileSystem()
+        .getUri());
+
+    Path alphaLocalPath = new Path(fcLocal.getDefaultFileSystem().getUri()
+        .toString(), "/tmp/alpha");
+    DFSTestUtil.createFile(FileSystem.getLocal(conf), alphaLocalPath, 16,
+        (short) 1, 2);
+
+    Path linkTarget = new Path(fcLocal.getDefaultFileSystem().getUri()
+        .toString(), "/tmp");
+    Path hdfsLink = new Path(fcHdfs.getDefaultFileSystem().getUri().toString(),
+        "/tmp/link");
+    fcHdfs.createSymlink(linkTarget, hdfsLink, true);
+
+    Path alphaHdfsPathViaLink = new Path(fcHdfs.getDefaultFileSystem().getUri()
+        .toString()
+        + "/tmp/link/alpha");
+
+    Set<AbstractFileSystem> afsList = fcHdfs
+        .resolveAbstractFileSystems(alphaHdfsPathViaLink);
+    Assert.assertEquals(2, afsList.size());
+    for (AbstractFileSystem afs : afsList) {
+      if ((!afs.equals(fcHdfs.getDefaultFileSystem()))
+          && (!afs.equals(fcLocal.getDefaultFileSystem()))) {
+        Assert.fail("Failed to resolve AFS correctly");
+      }
+    }
+  }
+  
+  /**
+   * Tests the delegation token APIs in FileContext for Hdfs, as well as the
+   * renew and cancel APIs in Hdfs.
+   * 
+   * @throws UnsupportedFileSystemException
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testFcDelegationToken() throws UnsupportedFileSystemException,
+      IOException, InterruptedException {
+    FileContext fcHdfs = FileContext.getFileContext(cluster.getFileSystem()
+        .getUri());
+    final AbstractFileSystem afs = fcHdfs.getDefaultFileSystem();
+    final List<Token<?>> tokenList =
+        afs.getDelegationTokens(UserGroupInformation.getCurrentUser()
+            .getUserName());
+    ((Hdfs) afs).renewDelegationToken((Token<DelegationTokenIdentifier>) tokenList
+        .get(0));
+    ((Hdfs) afs).cancelDelegationToken(
+        (Token<? extends AbstractDelegationTokenIdentifier>) tokenList.get(0));
+  }
+}
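
For reference, the delegation-token calls exercised by testFcDelegationToken above, pulled into a standalone sketch. It assumes a running HDFS as the default file system and the same version-specific AbstractFileSystem/Hdfs APIs that the test itself relies on; the class name is illustrative.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Hdfs;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;

public class DelegationTokenSketch {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) throws IOException, InterruptedException {
    FileContext fc = FileContext.getFileContext();          // default FS = HDFS (assumed)
    AbstractFileSystem afs = fc.getDefaultFileSystem();

    String renewer = UserGroupInformation.getCurrentUser().getUserName();
    List<Token<?>> tokens = afs.getDelegationTokens(renewer);

    Token<DelegationTokenIdentifier> t =
        (Token<DelegationTokenIdentifier>) tokens.get(0);
    ((Hdfs) afs).renewDelegationToken(t);    // extend the token lifetime
    ((Hdfs) afs).cancelDelegationToken(t);   // then invalidate it
  }
}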
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java b/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
index 84cdbf0..422c46b 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -403,7 +403,8 @@
   public static DataTransferProtocol.Status transferRbw(final ExtendedBlock b, 
       final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException {
     Assert.assertEquals(2, datanodes.length);
-    final Socket s = DFSOutputStream.createSocketForPipeline(datanodes, dfsClient);
+    final Socket s = DFSOutputStream.createSocketForPipeline(datanodes[0],
+        datanodes.length, dfsClient);
     final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length);
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
         NetUtils.getOutputStream(s, writeTimeout),
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
index 53fc15b..115416a 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -1286,4 +1287,18 @@
     System.out.println("results:\n" + results);
     return results;
   }
+  
+  /**
+   * The default file system is file://, which is not a DFS, so DFSAdmin
+   * should throw and catch an IllegalArgumentException and return a -1
+   * exit code.
+   * @throws Exception
+   */
+  public void testInvalidShell() throws Exception {
+    Configuration conf = new Configuration(); // default FS (non-DFS)
+    DFSAdmin admin = new DFSAdmin();
+    admin.setConf(conf);
+    int res = admin.run(new String[] {"-refreshNodes"});
+    assertEquals("expected to fail with exit code -1", -1, res);
+  }
 }
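
The behavior the new testInvalidShell covers can be reproduced outside JUnit roughly as below; the class name is illustrative, and the default configuration is assumed to leave the file system at file:// so that the dfsadmin command is rejected.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.tools.DFSAdmin;

public class DfsAdminOnLocalFsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();   // default FS (non-DFS, file://)
    DFSAdmin admin = new DFSAdmin();
    admin.setConf(conf);
    int rc = admin.run(new String[] {"-refreshNodes"});
    System.out.println("exit code: " + rc);     // expected to be -1
  }
}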
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index ef2e924..88cf7bd 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -44,6 +44,10 @@
 public class TestDistributedFileSystem {
   private static final Random RAN = new Random();
 
+  {
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   private boolean dualPortTesting = false;
   
   private HdfsConfiguration getTestConfiguration() {
@@ -100,26 +104,94 @@
   @Test
   public void testDFSClient() throws Exception {
     Configuration conf = getTestConfiguration();
+    final long grace = 1000L;
     MiniDFSCluster cluster = null;
 
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
-      final Path filepath = new Path("/test/LeaseChecker/foo");
+      final String filepathstring = "/test/LeaseChecker/foo";
+      final Path[] filepaths = new Path[4];
+      for(int i = 0; i < filepaths.length; i++) {
+        filepaths[i] = new Path(filepathstring + i);
+      }
       final long millis = System.currentTimeMillis();
 
       {
         DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        dfs.dfs.leasechecker.setGraceSleepPeriod(grace);
+        assertFalse(dfs.dfs.leasechecker.isRunning());
   
-        //create a file
-        FSDataOutputStream out = dfs.create(filepath);
-        assertTrue(dfs.dfs.isLeaseCheckerStarted());
-  
-        //write something and close
-        out.writeLong(millis);
-        assertTrue(dfs.dfs.isLeaseCheckerStarted());
-        out.close();
-        assertTrue(dfs.dfs.isLeaseCheckerStarted());
+        {
+          //create a file
+          final FSDataOutputStream out = dfs.create(filepaths[0]);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //write something
+          out.writeLong(millis);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //close
+          out.close();
+          Thread.sleep(grace/4*3);
+          //within grace period
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          for(int i = 0; i < 3; i++) {
+            if (dfs.dfs.leasechecker.isRunning()) {
+              Thread.sleep(grace/2);
+            }
+          }
+          //passed grace period
+          assertFalse(dfs.dfs.leasechecker.isRunning());
+        }
+
+        {
+          //create file1
+          final FSDataOutputStream out1 = dfs.create(filepaths[1]);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //create file2
+          final FSDataOutputStream out2 = dfs.create(filepaths[2]);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+
+          //write something to file1
+          out1.writeLong(millis);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //close file1
+          out1.close();
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+
+          //write something to file2
+          out2.writeLong(millis);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //close file2
+          out2.close();
+          Thread.sleep(grace/4*3);
+          //within grace period
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+        }
+
+        {
+          //create file3
+          final FSDataOutputStream out3 = dfs.create(filepaths[3]);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          Thread.sleep(grace/4*3);
+          //passed previous grace period, should still be running
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //write something to file3
+          out3.writeLong(millis);
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          //close file3
+          out3.close();
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          Thread.sleep(grace/4*3);
+          //within grace period
+          assertTrue(dfs.dfs.leasechecker.isRunning());
+          for(int i = 0; i < 3; i++) {
+            if (dfs.dfs.leasechecker.isRunning()) {
+              Thread.sleep(grace/2);
+            }
+          }
+          //passed grace period
+          assertFalse(dfs.dfs.leasechecker.isRunning());
+        }
+
         dfs.close();
       }
 
@@ -146,15 +218,15 @@
 
       {
         DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.leasechecker.isRunning());
 
         //open and check the file
-        FSDataInputStream in = dfs.open(filepath);
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        FSDataInputStream in = dfs.open(filepaths[0]);
+        assertFalse(dfs.dfs.leasechecker.isRunning());
         assertEquals(millis, in.readLong());
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.leasechecker.isRunning());
         in.close();
-        assertFalse(dfs.dfs.isLeaseCheckerStarted());
+        assertFalse(dfs.dfs.leasechecker.isRunning());
         dfs.close();
       }
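
The new assertions track the HDFS-1840 behavior: the lease renewer starts with the first create, keeps running while any file is open for write, and stops only after all writers have been closed for a full grace period. A condensed sketch of that life cycle follows; it is assumed to sit in org.apache.hadoop.hdfs so that the package-private dfs and leasechecker members used by the test are visible, and the grace value and path are illustrative.

package org.apache.hadoop.hdfs;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;

class LeaseCheckerLifeCycleSketch {
  static void demo(DistributedFileSystem dfs, long grace) throws Exception {
    dfs.dfs.leasechecker.setGraceSleepPeriod(grace);

    FSDataOutputStream out = dfs.create(new Path("/demo"));
    System.out.println(dfs.dfs.leasechecker.isRunning());  // true: started by create()

    out.writeLong(0L);
    out.close();
    Thread.sleep(grace / 2);
    System.out.println(dfs.dfs.leasechecker.isRunning());  // still true: within the grace period

    Thread.sleep(2 * grace);
    System.out.println(dfs.dfs.leasechecker.isRunning());  // false: grace period has passed
  }
}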
       
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend2.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend2.java
index 35a561a..9733695 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend2.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend2.java
@@ -59,7 +59,7 @@
 
   private byte[] fileContents = null;
 
-  int numDatanodes = 5;
+  int numDatanodes = 6;
   int numberOfFiles = 50;
   int numThreads = 10;
   int numAppendsPerThread = 20;
@@ -350,7 +350,7 @@
       // Insert them into a linked list.
       //
       for (int i = 0; i < numberOfFiles; i++) {
-        short replication = (short)(AppendTestUtil.nextInt(numDatanodes) + 1);
+        final int replication = AppendTestUtil.nextInt(numDatanodes - 2) + 1;
         Path testFile = new Path("/" + i + ".dat");
         FSDataOutputStream stm =
             AppendTestUtil.createFile(fs, testFile, replication);
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java
index 6806359..0748e9d 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java
@@ -149,7 +149,7 @@
    */
   @Test(timeout=60000)
   public void testRecoverFinalizedBlock() throws Throwable {
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build();
  
     try {
       cluster.waitActive();
@@ -220,7 +220,7 @@
    */
   @Test(timeout=60000)
   public void testCompleteOtherLeaseHoldersFile() throws Throwable {
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build();
  
     try {
       cluster.waitActive();
@@ -296,8 +296,7 @@
    * Mockito answer helper that triggers one latch as soon as the
    * method is called, then waits on another before continuing.
    */
-  @SuppressWarnings("unchecked")
-  private static class DelayAnswer implements Answer {
+  private static class DelayAnswer implements Answer<Object> {
     private final CountDownLatch fireLatch = new CountDownLatch(1);
     private final CountDownLatch waitLatch = new CountDownLatch(1);
  
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
index 3322756..fa69017 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
@@ -630,7 +630,8 @@
           expectedException != null
               && expectedException instanceof FileNotFoundException);
 
-      EnumSet<CreateFlag> overwriteFlag = EnumSet.of(CreateFlag.OVERWRITE);
+      EnumSet<CreateFlag> overwriteFlag = 
+        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
       // Overwrite a file in root dir, should succeed
       out = createNonRecursive(fs, path, 1, overwriteFlag);
       out.close();
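
These test updates track the stricter CreateFlag validation from HADOOP-7223: an overwriting create must now pass CREATE together with OVERWRITE rather than OVERWRITE alone. A minimal sketch of the accepted combination, assuming the FileSystem#create overload that takes an EnumSet of CreateFlag; the path and sizes are illustrative.

import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class CreateFlagSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // CREATE + OVERWRITE: create the file, overwriting it if it already exists
    EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
    FSDataOutputStream out = fs.create(new Path("/demo.dat"),
        FsPermission.getDefault(), flags, 4096, (short) 1, 64L * 1024 * 1024, null);
    out.close();
  }
}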
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
index 8e0c2a3..ae70bd7 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
@@ -26,6 +26,7 @@
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
@@ -202,19 +203,12 @@
         try {
           dfs2.create(filepath, false, BUF_SIZE, REPLICATION_NUM, BLOCK_SIZE);
           fail("Creation of an existing file should never succeed.");
+        } catch (FileAlreadyExistsException ex) {
+          done = true;
+        } catch (AlreadyBeingCreatedException ex) {
+          AppendTestUtil.LOG.info("GOOD! got " + ex.getMessage());
         } catch (IOException ioe) {
-          final String message = ioe.getMessage();
-          if (message.contains("file exists")) {
-            AppendTestUtil.LOG.info("done", ioe);
-            done = true;
-          }
-          else if (message.contains(
-              AlreadyBeingCreatedException.class.getSimpleName())) {
-            AppendTestUtil.LOG.info("GOOD! got " + message);
-          }
-          else {
-            AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
-          }
+          AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
         }
 
         if (!done) {
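
The rewritten catch blocks above rely on exception types instead of matching message substrings. A condensed sketch of that pattern, with an illustrative helper name and the same semantics as the test: FileAlreadyExistsException means the file exists and is closed, while AlreadyBeingCreatedException means the lease has not been recovered yet.

import java.io.IOException;

import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;

public class CreateExistingFileSketch {
  static boolean tryCreate(FileSystem fs, Path p) throws IOException {
    try {
      fs.create(p, false).close();   // overwrite == false
      return false;                  // unexpected: creation succeeded
    } catch (FileAlreadyExistsException e) {
      return true;                   // file exists and is closed: done
    } catch (AlreadyBeingCreatedException e) {
      return false;                  // lease not yet recovered: retry later
    }
  }
}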
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
index 4347180..6145313 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
@@ -19,7 +19,6 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.security.PrivilegedExceptionAction;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
@@ -71,7 +70,7 @@
       final int half = BLOCK_SIZE/2;
 
       //a. On Machine M1, Create file. Write half block of data.
-      //   Invoke (DFSOutputStream).fsync() on the dfs file handle.
+      //   Invoke DFSOutputStream.hflush() on the dfs file handle.
       //   Do not close file yet.
       {
         final FSDataOutputStream out = fs.create(p, true,
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index efcc9b8..3cec060 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -497,6 +497,16 @@
     return null;
   }
 
+  @Override 
+  public synchronized String getReplicaString(String bpid, long blockId) {
+    Replica r = null;
+    final Map<Block, BInfo> map = blockMap.get(bpid);
+    if (map != null) {
+      r = map.get(new Block(blockId));
+    }
+    return r == null? "null": r.toString();
+  }
+
   @Override // FSDatasetInterface
   public Block getStoredBlock(String bpid, long blkid) throws IOException {
     final Map<Block, BInfo> map = blockMap.get(bpid);
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
index d458268..9f9daec 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
@@ -511,11 +511,11 @@
     long start = System.currentTimeMillis();
     int count = 0;
     while (r == null) {
-      waitTil(50);
+      waitTil(5);
       r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()).
         fetchReplicaInfo(bpid, bl.getBlockId());
       long waiting_period = System.currentTimeMillis() - start;
-      if (count++ % 10 == 0)
+      if (count++ % 100 == 0)
         if(LOG.isDebugEnabled()) {
           LOG.debug("Has been waiting for " + waiting_period + " ms.");
         }
@@ -530,7 +530,7 @@
     }
     start = System.currentTimeMillis();
     while (state != HdfsConstants.ReplicaState.TEMPORARY) {
-      waitTil(100);
+      waitTil(5);
       state = r.getState();
       if(LOG.isDebugEnabled()) {
         LOG.debug("Keep waiting for " + bl.getBlockName() +
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index fb7909a..b519752 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -555,7 +555,7 @@
       // dummyActionNoSynch(fileIdx);
       nameNode.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
                       clientName, new EnumSetWritable<CreateFlag>(EnumSet
-              .of(CreateFlag.OVERWRITE)), true, replication, BLOCK_SIZE);
+              .of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication, BLOCK_SIZE);
       long end = System.currentTimeMillis();
       for(boolean written = !closeUponCreate; !written; 
         written = nameNode.complete(fileNames[daemonId][inputIdx],
@@ -971,7 +971,7 @@
       for(int idx=0; idx < nrFiles; idx++) {
         String fileName = nameGenerator.getNextFileName("ThroughputBench");
         nameNode.create(fileName, FsPermission.getDefault(), clientName,
-            new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.OVERWRITE)), true, replication,
+            new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), true, replication,
             BLOCK_SIZE);
         ExtendedBlock lastBlock = addBlocks(fileName, clientName);
         nameNode.complete(fileName, clientName, lastBlock);
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
index 85d37ca..ec27e21 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
@@ -77,7 +77,7 @@
       String newRacks[] = {"/rack2"} ;
       cluster.startDataNodes(conf, 1, true, null, newRacks);
 
-      while ( (numRacks < 2) || (curReplicas < REPLICATION_FACTOR) ||
+      while ( (numRacks < 2) || (curReplicas != REPLICATION_FACTOR) ||
               (neededReplicationSize > 0) ) {
         LOG.info("Waiting for replication");
         Thread.sleep(600);
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
index 391112b..744573a 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 
@@ -738,11 +739,13 @@
   public void testSaveNamespace() throws IOException {
     MiniDFSCluster cluster = null;
     DistributedFileSystem fs = null;
+    FileContext fc;
     try {
       Configuration conf = new HdfsConfiguration();
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).format(false).build();
       cluster.waitActive();
       fs = (DistributedFileSystem)(cluster.getFileSystem());
+      fc = FileContext.getFileContext(cluster.getURI(0));
 
       // Saving image without safe mode should fail
       DFSAdmin admin = new DFSAdmin(conf);
@@ -758,6 +761,12 @@
       Path file = new Path("namespace.dat");
       writeFile(fs, file, replication);
       checkFile(fs, file, replication);
+
+      // create new link
+      Path symlink = new Path("file.link");
+      fc.createSymlink(file, symlink, false);
+      assertTrue(fc.getFileLinkStatus(symlink).isSymlink());
+
       // verify that the edits file is NOT empty
       Collection<URI> editsDirs = cluster.getNameEditsDirs(0);
       for(URI uri : editsDirs) {
@@ -786,6 +795,8 @@
       cluster.waitActive();
       fs = (DistributedFileSystem)(cluster.getFileSystem());
       checkFile(fs, file, replication);
+      fc = FileContext.getFileContext(cluster.getURI(0));
+      assertTrue(fc.getFileLinkStatus(symlink).isSymlink());
     } finally {
       if(fs != null) fs.close();
       if(cluster!= null) cluster.shutdown();
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
index 5020ff7..56aeec3 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
@@ -20,12 +20,15 @@
 import junit.framework.TestCase;
 import java.io.*;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.*;
 
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -303,4 +306,37 @@
       if(cluster != null) cluster.shutdown();
     }
   }
+  
+  public void testEditChecksum() throws Exception {
+    // start a cluster 
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    FileSystem fileSys = null;
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
+    cluster.waitActive();
+    fileSys = cluster.getFileSystem();
+    final FSNamesystem namesystem = cluster.getNamesystem();
+
+    FSImage fsimage = namesystem.getFSImage();
+    final FSEditLog editLog = fsimage.getEditLog();
+    fileSys.mkdirs(new Path("/tmp"));
+    File editFile = editLog.getFsEditName();
+    editLog.close();
+    cluster.shutdown();
+    long fileLen = editFile.length();
+    System.out.println("File name: " + editFile + " len: " + fileLen);
+    RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
+    rwf.seek(fileLen-4); // seek to checksum bytes
+    int b = rwf.readInt();
+    rwf.seek(fileLen-4);
+    rwf.writeInt(b+1);
+    rwf.close();
+    
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).format(false).build();
+      fail("should not be able to start");
+    } catch (ChecksumException e) {
+      // expected
+    }
+  }
 }
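
A standalone sketch of the corruption step used by testEditChecksum above: HDFS-1630 adds a checksum to the edits log, and the test perturbs the last four bytes of the edits file (which it treats as the trailing checksum) so that reloading the log fails with a ChecksumException. The helper name is illustrative.

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

public class CorruptEditsChecksumSketch {
  static void flipChecksum(File editFile) throws IOException {
    long len = editFile.length();
    RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
    try {
      rwf.seek(len - 4);             // position of the checksum bytes
      int checksum = rwf.readInt();
      rwf.seek(len - 4);
      rwf.writeInt(checksum + 1);    // perturb the stored checksum
    } finally {
      rwf.close();
    }
  }
}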
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
index 901cbb0..b4d0e51 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
@@ -33,9 +33,8 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.cli.CmdFactoryDFS;
-import org.apache.hadoop.cli.util.CLITestData;
-import org.apache.hadoop.cli.util.CommandExecutor;
+import org.apache.hadoop.cli.CLITestCmdDFS;
+import org.apache.hadoop.cli.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -350,10 +349,9 @@
 
       String cmd = "-fs NAMENODE -restoreFailedStorage false";
       String namenode = config.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
-      CommandExecutor executor = 
-        CmdFactoryDFS.getCommandExecutor(
-          new CLITestData.TestCmd(cmd, CLITestData.TestCmd.CommandType.DFSADMIN),
-          namenode);
+      CommandExecutor executor =
+          new CLITestCmdDFS(cmd, new CLICommandDFSAdmin()).getExecutor(namenode);
+
       executor.executeCommand(cmd);
       restore = fsi.getStorage().getRestoreFailedStorage();
       assertFalse("After set true call restore is " + restore, restore);
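
The replacement above uses the CLI-test interfaces generalized in HDFS-1486: the dfsadmin command string is wrapped in a CLITestCmdDFS tagged with CLICommandDFSAdmin and run through the executor bound to the namenode address. A stripped-down sketch of the same call pattern; the class and method names here are illustrative.

import org.apache.hadoop.cli.CLITestCmdDFS;
import org.apache.hadoop.cli.util.CLICommandDFSAdmin;
import org.apache.hadoop.cli.util.CommandExecutor;

public class CliExecutorSketch {
  static void runRestoreFailedStorage(String namenode) throws Exception {
    String cmd = "-fs NAMENODE -restoreFailedStorage false";
    CommandExecutor executor =
        new CLITestCmdDFS(cmd, new CLICommandDFSAdmin()).getExecutor(namenode);
    executor.executeCommand(cmd);    // runs the dfsadmin command against NAMENODE
  }
}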
diff --git a/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java b/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
index 82485ae..c8e9a18 100644
--- a/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
+++ b/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
@@ -20,6 +20,7 @@
 
 import static org.junit.Assert.*;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 
@@ -120,4 +121,33 @@
                                   0L, 0L, preferredBlockSize);
   }
 
+  @Test
+  public void testGetFullPathName() {
+    PermissionStatus perms = new PermissionStatus(
+      userName, null, FsPermission.getDefault());
+
+    replication = 3;
+    preferredBlockSize = 128*1024*1024;
+    INodeFile inf = new INodeFile(perms, null, replication,
+                                  0L, 0L, preferredBlockSize);
+    inf.setLocalName("f");
+
+    INodeDirectory root = new INodeDirectory(INodeDirectory.ROOT_NAME, perms);
+    INodeDirectory dir = new INodeDirectory("d", perms);
+
+    assertEquals("f", inf.getFullPathName());
+    assertEquals("", inf.getLocalParentDir());
+
+    dir.addChild(inf, false, false);
+    assertEquals("d"+Path.SEPARATOR+"f", inf.getFullPathName());
+    assertEquals("d", inf.getLocalParentDir());
+    
+    root.addChild(dir, false, false);
+    assertEquals(Path.SEPARATOR+"d"+Path.SEPARATOR+"f", inf.getFullPathName());
+    assertEquals(Path.SEPARATOR+"d", dir.getFullPathName());
+
+    assertEquals(Path.SEPARATOR, root.getFullPathName());
+    assertEquals(Path.SEPARATOR, root.getLocalParentDir());
+    
+  }
 }