Merge branch 'trunk' into HDFS-6581
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
index 252f37b..c5d23b4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
@@ -79,7 +79,21 @@
   /**
    * Force closed blocks to disk. Similar to POSIX O_SYNC. See javadoc for description.
    */
-  SYNC_BLOCK((short) 0x08);
+  SYNC_BLOCK((short) 0x08),
+
+  /**
+   * Create the block on transient storage (RAM) if available. If
+   * transient storage is unavailable then the block will be created
+   * on disk.
+   *
+   * HDFS will make a best effort to lazily write these files to persistent
+   * storage; however, file contents may be lost at any time due to process
+   * or node restarts, so there is no guarantee of data durability.
+   *
+   * This flag must only be used for intermediate data whose loss can be
+   * tolerated by the application.
+   */
+  LAZY_PERSIST((short) 0x10);
 
   private final short mode;
 
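
As a point of reference, a minimal sketch of how a client could request the new
LAZY_PERSIST behavior through the generic FileSystem.create() overload. This is
illustrative only and not part of the patch; the path, buffer size, payload and
single-replica setting are assumptions chosen to mirror the shell's -l option
added later in this diff.

import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class LazyPersistWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("/tmp/intermediate-output");   // illustrative path

    // CREATE + LAZY_PERSIST asks for the replica to be placed on transient
    // storage (RAM) if available, falling back to disk otherwise.
    EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.LAZY_PERSIST);

    FSDataOutputStream out = fs.create(file,
        FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)),
        flags,
        4096,                          // buffer size
        (short) 1,                     // single replica for disposable data
        fs.getDefaultBlockSize(file),
        null);                         // no Progressable
    try {
      out.write("scratch data".getBytes("UTF-8"));
    } finally {
      out.close();
    }
  }
}
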
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
index 52706f4..e729e67 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
@@ -179,8 +179,19 @@
     return fs.create(f, permission,
         overwrite, bufferSize, replication, blockSize, progress);
   }
-  
 
+  @Override
+  public FSDataOutputStream create(Path f,
+        FsPermission permission,
+        EnumSet<CreateFlag> flags,
+        int bufferSize,
+        short replication,
+        long blockSize,
+        Progressable progress,
+        ChecksumOpt checksumOpt) throws IOException {
+    return fs.create(f, permission,
+      flags, bufferSize, replication, blockSize, progress, checksumOpt);
+  }
   
   @Override
   @Deprecated
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
index da67f1c..65e52fd 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
@@ -30,6 +30,7 @@
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
@@ -45,6 +46,9 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
+
 /**
  * Provides: argument processing to ensure the destination is valid
  * for the number of source arguments.  A processPaths that accepts both
@@ -56,6 +60,7 @@
   private boolean overwrite = false;
   private boolean verifyChecksum = true;
   private boolean writeChecksum = true;
+  private boolean lazyPersist = false;
   
   /**
    * The name of the raw xattr namespace. It would be nice to use
@@ -78,6 +83,10 @@
     overwrite = flag;
   }
   
+  protected void setLazyPersist(boolean flag) {
+    lazyPersist = flag;
+  }
+
   protected void setVerifyChecksum(boolean flag) {
     verifyChecksum = flag;
   }
@@ -379,7 +388,7 @@
     try {
       PathData tempTarget = target.suffix("._COPYING_");
       targetFs.setWriteChecksum(writeChecksum);
-      targetFs.writeStreamToFile(in, tempTarget);
+      targetFs.writeStreamToFile(in, tempTarget, lazyPersist);
       targetFs.rename(tempTarget, target);
     } finally {
       targetFs.close(); // last ditch effort to ensure temp file is removed
@@ -449,10 +458,11 @@
       super(fs);
     }
 
-    void writeStreamToFile(InputStream in, PathData target) throws IOException {
+    void writeStreamToFile(InputStream in, PathData target,
+                           boolean lazyPersist) throws IOException {
       FSDataOutputStream out = null;
       try {
-        out = create(target);
+        out = create(target, lazyPersist);
         IOUtils.copyBytes(in, out, getConf(), true);
       } finally {
         IOUtils.closeStream(out); // just in case copyBytes didn't
@@ -460,9 +470,23 @@
     }
     
     // tag created files as temp files
-    FSDataOutputStream create(PathData item) throws IOException {
+    FSDataOutputStream create(PathData item, boolean lazyPersist)
+        throws IOException {
       try {
-        return create(item.path, true);
+        if (lazyPersist) {
+          EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE, LAZY_PERSIST);
+          return create(item.path,
+                        FsPermission.getFileDefault().applyUMask(
+                            FsPermission.getUMask(getConf())),
+                        createFlags,
+                        getConf().getInt("io.file.buffer.size", 4096),
+                        lazyPersist ? 1 : getDefaultReplication(item.path),
+                        getDefaultBlockSize(),
+                        null,
+                        null);
+        } else {
+          return create(item.path, true);
+        }
       } finally { // might have been created but stream was interrupted
         deleteOnExit(item.path);
       }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
index 3fd870c..afd1115 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
@@ -215,21 +215,25 @@
    */
   public static class Put extends CommandWithDestination {
     public static final String NAME = "put";
-    public static final String USAGE = "[-f] [-p] <localsrc> ... <dst>";
+    public static final String USAGE = "[-f] [-p] [-l] <localsrc> ... <dst>";
     public static final String DESCRIPTION =
       "Copy files from the local file system " +
       "into fs. Copying fails if the file already " +
-      "exists, unless the -f flag is given. Passing " +
-      "-p preserves access and modification times, " +
-      "ownership and the mode. Passing -f overwrites " +
-      "the destination if it already exists.\n";
+      "exists, unless the -f flag is given.\n" +
+      "Flags:\n" +
+      "  -p : Preserves access and modification times, ownership and the mode.\n" +
+      "  -f : Overwrites the destination if it already exists.\n" +
+      "  -l : Allow DataNode to lazily persist the file to disk. Forces\n" +
+      "       replication factor of 1. This flag will result in reduced\n" +
+      "       durability. Use with care.\n";
 
     @Override
     protected void processOptions(LinkedList<String> args) throws IOException {
-      CommandFormat cf = new CommandFormat(1, Integer.MAX_VALUE, "f", "p");
+      CommandFormat cf = new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l");
       cf.parse(args);
       setOverwrite(cf.getOpt("f"));
       setPreserve(cf.getOpt("p"));
+      setLazyPersist(cf.getOpt("l"));
       getRemoteDestination(args);
       // should have a -r option
       setRecursive(true);
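
For completeness, a hedged sketch of exercising the new -l option
programmatically through FsShell; this is not part of the patch, and the local
and remote paths are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.util.ToolRunner;

public class PutLazyPersistExample {
  public static void main(String[] args) throws Exception {
    // Equivalent to: hdfs dfs -put -l ./scratch.txt /tmp/scratch.txt
    // -l tags the target file for lazy persistence and forces replication 1.
    int rc = ToolRunner.run(new Configuration(), new FsShell(),
        new String[] {"-put", "-l", "./scratch.txt", "/tmp/scratch.txt"});
    System.exit(rc);
  }
}
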
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Stat.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Stat.java
index 652c928..ee56fe6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Stat.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Stat.java
@@ -91,7 +91,7 @@
             break;
           case 'F':
             buf.append(stat.isDirectory()
-                ? "directory" 
+                ? "directory"
                 : (stat.isFile() ? "regular file" : "symlink"));
             break;
           case 'g':
diff --git a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
index 804b504..c6e5fc5 100644
--- a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
+++ b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
@@ -436,7 +436,7 @@
       <comparators>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^-put \[-f\] \[-p\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
+          <expected-output>^-put \[-f\] \[-p\] \[-l\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
@@ -444,15 +444,31 @@
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^\s*exists, unless the -f flag is given.( )*Passing -p preserves access and( )*</expected-output>
+          <expected-output>^\s*exists, unless the -f flag is given.( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^\s*modification times, ownership and the mode. Passing -f overwrites the( )*</expected-output>
+          <expected-output>^\s*Flags:( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^( |\t)*destination if it already exists.( )*</expected-output>
+          <expected-output>^\s*-p  Preserves access and modification times, ownership and the mode.( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*-f  Overwrites the destination if it already exists.( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*-l  Allow DataNode to lazily persist the file to disk. Forces( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*replication factor of 1. This flag will result in reduced( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*durability. Use with care.( )*</expected-output>
         </comparator>
       </comparators>
     </test>
@@ -467,7 +483,7 @@
       <comparators>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^-copyFromLocal \[-f\] \[-p\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
+          <expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
new file mode 100644
index 0000000..b41e133
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
@@ -0,0 +1,96 @@
+  BREAKDOWN OF HDFS-6581 SUBTASKS AND RELATED JIRAS
+
+    HDFS-6921. Add LazyPersist flag to FileStatus. (Arpit Agarwal)
+
+    HDFS-6924. Add new RAM_DISK storage type. (Arpit Agarwal)
+
+    HDFS-6922. Add LazyPersist flag to INodeFile, save it in FsImage and
+    edit logs. (Arpit Agarwal)
+  
+    HDFS-6923. Propagate LazyPersist flag to DNs via DataTransferProtocol.
+    (Arpit Agarwal)
+
+    HDFS-6925. DataNode should attempt to place replicas on transient storage
+    first if lazyPersist flag is received. (Arpit Agarwal)
+
+    HDFS-6926. DN support for saving replicas to persistent storage and
+    evicting in-memory replicas. (Arpit Agarwal)
+
+    HDFS-6927. Initial unit tests for lazy persist files. (Arpit Agarwal)
+
+    HDFS-6929. NN periodically unlinks lazy persist files with missing
+    replicas from namespace. (Arpit Agarwal)
+
+    HDFS-6928. 'hdfs put' command should accept lazyPersist flag for testing.
+    (Arpit Agarwal)
+
+    HDFS-6960. Bugfix in LazyWriter, fix test case and some refactoring.
+    (Arpit Agarwal)
+
+    HDFS-6931. Move lazily persisted replicas to finalized directory on DN
+    startup. (Arpit Agarwal)
+
+    HDFS-6950. Add Additional unit tests for HDFS-6581. (Xiaoyu Yao via
+    Arpit Agarwal)
+
+    HDFS-6930. Improve replica eviction from RAM disk. (Arpit Agarwal)
+
+    HDFS-6977. Delete all copies when a block is deleted from the block space.
+    (Arpit Agarwal)
+
+    HDFS-6991. Notify NN of evicted block before deleting it from RAM disk.
+    (Arpit Agarwal)
+
+    HDFS-6978. Directory scanner should correctly reconcile blocks on RAM
+    disk. (Arpit Agarwal)
+
+    HDFS-7066. LazyWriter#evictBlocks misses a null check for replicaState.
+    (Xiaoyu Yao via Arpit Agarwal)
+
+    HDFS-7064. Fix unit test failures in HDFS-6581 branch. (Xiaoyu Yao via
+    Arpit Agarwal)
+
+    HDFS-6581. Few more unit test fixes for HDFS-6581. (Arpit Agarwal)
+
+    HDFS-7080. Fix finalize and upgrade unit test failures. (Arpit Agarwal)
+
+    HDFS-7084. FsDatasetImpl#copyBlockFiles debug log can be improved.
+    (Xiaoyu Yao via Arpit Agarwal)
+
+    HDFS-7091. Add forwarding constructor for INodeFile for existing callers.
+    (Arpit Agarwal)
+
+    HDFS-7100. Make eviction scheme pluggable. (Arpit Agarwal)
+
+    HDFS-7108. Fix unit test failures in SimulatedFsDataset. (Arpit Agarwal)
+
+    HDFS-7071. Updated editsStored and editsStored.xml to bump layout
+    version and add LazyPersist flag. (Xiaoyu Yao and Arpit Agarwal via
+    Arpit Agarwal)
+    
+    HDFS-6990. Add unit test for evict/delete RAM_DISK block with open
+    handle. (Xiaoyu Yao via Arpit Agarwal)
+
+    HDFS-7143. Fix findbugs warnings in HDFS-6581 branch. (szetszwo via
+    Arpit Agarwal)
+
+    HDFS-6932. Balancer and Mover tools should ignore replicas on RAM_DISK.
+    (Xiaoyu Yao via Arpit Agarwal)
+
+    HDFS-7144. Fix findbugs warnings in RamDiskReplicaTracker. (szetszwo via
+    Arpit Agarwal)
+
+    HDFS-7155. Bugfix in createLocatedFileStatus caused by bad merge.
+    (Arpit Agarwal)
+
+    HDFS-7153. Add storagePolicy to NN edit log during file creation.
+    (Arpit Agarwal)
+
+    HDFS-7159. Use block storage policy to set lazy persist preference.
+    (Arpit Agarwal)
+
+    HDFS-7129. Metrics to track usage of memory for writes. (Xiaoyu Yao
+    via Arpit Agarwal)
+
+    HDFS-7171. Fix Jenkins failures in HDFS-6581 branch. (Arpit Agarwal)
+
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b016750..4391578 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
 import org.apache.hadoop.hdfs.web.AuthFilter;
 import org.apache.hadoop.http.HttpConfig;
 
@@ -127,6 +128,19 @@
   public static final long    DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0;
   public static final String  DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY = "dfs.datanode.fsdatasetcache.max.threads.per.volume";
   public static final int     DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
+  public static final String  DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec";
+  public static final int     DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
+  public static final String  DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY = "dfs.datanode.ram.disk.replica.tracker";
+  public static final Class<RamDiskReplicaLruTracker>  DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT = RamDiskReplicaLruTracker.class;
+  public static final String  DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT = "dfs.datanode.ram.disk.low.watermark.percent";
+  public static final int     DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10;
+  public static final String  DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS = "dfs.datanode.ram.disk.low.watermark.replicas";
+  public static final int     DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT = 3;
+
+  // This setting is for testing/internal use only.
+  public static final String  DFS_DATANODE_DUPLICATE_REPLICA_DELETION = "dfs.datanode.duplicate.replica.deletion";
+  public static final boolean DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT = true;
+
   public static final String  DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT =
     "dfs.namenode.path.based.cache.block.map.allocation.percent";
   public static final float    DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;
@@ -228,6 +242,9 @@
   public static final float   DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT = 2.0f;
   public static final String  DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS = "dfs.namenode.edit.log.autoroll.check.interval.ms";
   public static final int     DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT = 5*60*1000;
+
+  public static final String  DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC = "dfs.namenode.lazypersist.file.scrub.interval.sec";
+  public static final int     DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT = 5 * 60;
   
   public static final String  DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH = "dfs.namenode.edits.noeditlogchannelflush";
   public static final boolean DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT = false;
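
For orientation only (not part of the patch), a sketch of reading the new
lazy-writer settings through the standard Configuration API; the class and
variable names below are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class LazyWriterSettingsExample {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();

    // How often the DataNode's lazy writer wakes up, in seconds.
    int intervalSec = conf.getInt(
        DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
        DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC);

    // RAM disk low-watermark thresholds.
    int lowWatermarkPercent = conf.getInt(
        DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT,
        DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT);
    int lowWatermarkReplicas = conf.getInt(
        DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS,
        DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT);

    System.out.println(intervalSec + " " + lowWatermarkPercent + " "
        + lowWatermarkReplicas);
  }
}
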
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 13544c8..781fb68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -54,6 +54,7 @@
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -75,6 +76,7 @@
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
@@ -172,6 +174,8 @@
   private final AtomicReference<CachingStrategy> cachingStrategy;
   private boolean failPacket = false;
   private FileEncryptionInfo fileEncryptionInfo;
+  private static final BlockStoragePolicySuite blockStoragePolicySuite =
+      BlockStoragePolicySuite.createDefaultSuite();
 
   private static class Packet {
     private static final long HEART_BEAT_SEQNO = -1L;
@@ -360,6 +364,7 @@
     private long restartDeadline = 0; // Deadline of DN restart
     private BlockConstructionStage stage;  // block construction stage
     private long bytesSent = 0; // number of bytes that've been sent
+    private final boolean isLazyPersistFile;
 
     /** Nodes have been used in the pipeline before and have failed. */
     private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
@@ -376,15 +381,16 @@
     /**
      * Default construction for file create
      */
-    private DataStreamer() {
-      this(null);
+    private DataStreamer(HdfsFileStatus stat) {
+      this(stat, null);
     }
 
     /**
      * construction with tracing info
      */
-    private DataStreamer(Span span) {
+    private DataStreamer(HdfsFileStatus stat, Span span) {
       isAppend = false;
+      isLazyPersistFile = initLazyPersist(stat);
       stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
       traceSpan = span;
     }
@@ -404,6 +410,7 @@
       block = lastBlock.getBlock();
       bytesSent = block.getNumBytes();
       accessToken = lastBlock.getBlockToken();
+      isLazyPersistFile = initLazyPersist(stat);
       long usedInLastBlock = stat.getLen() % blockSize;
       int freeInLastBlock = (int)(blockSize - usedInLastBlock);
 
@@ -447,6 +454,13 @@
       }
     }
     
+    private boolean initLazyPersist(HdfsFileStatus stat) {
+      final BlockStoragePolicy lpPolicy =
+          blockStoragePolicySuite.getPolicy("LAZY_PERSIST");
+      return lpPolicy != null &&
+             stat.getStoragePolicy() == lpPolicy.getId();
+    }
+
     private void setPipeline(LocatedBlock lb) {
       setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
     }
@@ -1396,7 +1410,7 @@
           new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
               dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, 
               nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
-              cachingStrategy.get());
+              cachingStrategy.get(), isLazyPersistFile);
   
           // receive ack for connect
           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1649,7 +1663,7 @@
     if (Trace.isTracing()) {
       traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
     }
-    streamer = new DataStreamer(traceSpan);
+    streamer = new DataStreamer(stat, traceSpan);
     if (favoredNodes != null && favoredNodes.length != 0) {
       streamer.setFavoredNodes(favoredNodes);
     }
@@ -1726,7 +1740,7 @@
     } else {
       computePacketChunkSize(dfsClient.getConf().writePacketSize,
           checksum.getBytesPerChecksum());
-      streamer = new DataStreamer(traceSpan);
+      streamer = new DataStreamer(stat, traceSpan);
     }
     this.fileEncryptionInfo = stat.getFileEncryptionInfo();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java
index 7ca9e00..88cc7d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs;
 
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -31,17 +32,40 @@
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
 public enum StorageType {
-  DISK,
-  SSD,
-  ARCHIVE;
+  DISK(false),
+  SSD(false),
+  ARCHIVE(false),
+  RAM_DISK(true);
+
+  private final boolean isTransient;
 
   public static final StorageType DEFAULT = DISK;
-  
+
   public static final StorageType[] EMPTY_ARRAY = {};
-  
+
   private static final StorageType[] VALUES = values();
-  
+
+  StorageType(boolean isTransient) { this.isTransient = isTransient; }
+
+  public boolean isTransient() {
+    return isTransient;
+  }
+
+  public boolean isMovable() {
+    return !isTransient;
+  }
+
   public static List<StorageType> asList() {
     return Arrays.asList(VALUES);
   }
+
+  public static List<StorageType> getMovableTypes() {
+    List<StorageType> movableTypes = new ArrayList<StorageType>();
+    for (StorageType t : VALUES) {
+      if (!t.isTransient) {
+        movableTypes.add(t);
+      }
+    }
+    return movableTypes;
+  }
 }
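
A small illustrative sketch (not part of the patch) of the new accessors:
RAM_DISK is the only transient type, so it is excluded from the movable set
that the Balancer change later in this diff relies on.

import java.util.List;
import org.apache.hadoop.hdfs.StorageType;

public class StorageTypeExample {
  public static void main(String[] args) {
    // Expected to print: DISK SSD ARCHIVE (RAM_DISK is transient, not movable).
    List<StorageType> movable = StorageType.getMovableTypes();
    for (StorageType t : movable) {
      System.out.print(t + " ");
    }
    System.out.println();
    System.out.println("RAM_DISK transient? " + StorageType.RAM_DISK.isTransient()); // true
  }
}
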
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockStoragePolicy.java
index 8ca83a0..ef13e0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockStoragePolicy.java
@@ -49,15 +49,29 @@
   private final StorageType[] creationFallbacks;
   /** The fallback storage type for replication. */
   private final StorageType[] replicationFallbacks;
+  /**
+   * Whether the policy is inherited during file creation.
+   * If set then the policy cannot be changed after file creation.
+   */
+  private boolean copyOnCreateFile;
 
   @VisibleForTesting
   public BlockStoragePolicy(byte id, String name, StorageType[] storageTypes,
       StorageType[] creationFallbacks, StorageType[] replicationFallbacks) {
+    this(id, name, storageTypes, creationFallbacks, replicationFallbacks,
+         false);
+  }
+
+  @VisibleForTesting
+  public BlockStoragePolicy(byte id, String name, StorageType[] storageTypes,
+      StorageType[] creationFallbacks, StorageType[] replicationFallbacks,
+      boolean copyOnCreateFile) {
     this.id = id;
     this.name = name;
     this.storageTypes = storageTypes;
     this.creationFallbacks = creationFallbacks;
     this.replicationFallbacks = replicationFallbacks;
+    this.copyOnCreateFile = copyOnCreateFile;
   }
 
   /**
@@ -65,13 +79,22 @@
    */
   public List<StorageType> chooseStorageTypes(final short replication) {
     final List<StorageType> types = new LinkedList<StorageType>();
-    int i = 0;
-    for(; i < replication && i < storageTypes.length; i++) {
-      types.add(storageTypes[i]);
+    int i = 0, j = 0;
+
+    // Do not return transient storage types. We will not have accurate
+    // usage information for transient types.
+    for (;i < replication && j < storageTypes.length; ++j) {
+      if (!storageTypes[j].isTransient()) {
+        types.add(storageTypes[j]);
+        ++i;
+      }
     }
+
     final StorageType last = storageTypes[storageTypes.length - 1];
-    for(; i < replication; i++) {
-      types.add(last);
+    if (!last.isTransient()) {
+      for (; i < replication; i++) {
+        types.add(last);
+      }
     }
     return types;
   }
@@ -241,4 +264,8 @@
     }
     return null;
   }
+
+  public boolean isCopyOnCreateFile() {
+    return copyOnCreateFile;
+  }
 }
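
A worked example (illustrative, not part of the patch) of the revised
chooseStorageTypes(): with the LAZY_PERSIST layout registered later in this
diff, {RAM_DISK, DISK}, the transient RAM_DISK entry is skipped and DISK is
repeated to satisfy the requested replication.

import java.util.List;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;

public class ChooseStorageTypesExample {
  public static void main(String[] args) {
    // Mirrors the LAZY_PERSIST policy definition: id 15, {RAM_DISK, DISK},
    // DISK creation/replication fallbacks, copyOnCreateFile = true.
    BlockStoragePolicy lazyPersist = new BlockStoragePolicy((byte) 15, "LAZY_PERSIST",
        new StorageType[] {StorageType.RAM_DISK, StorageType.DISK},
        new StorageType[] {StorageType.DISK},
        new StorageType[] {StorageType.DISK},
        true);

    // RAM_DISK is transient and therefore skipped; the remaining DISK entry
    // is repeated to satisfy the requested replication factor.
    List<StorageType> chosen = lazyPersist.chooseStorageTypes((short) 3);
    System.out.println(chosen);   // expected: [DISK, DISK, DISK]
  }
}
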
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index d54d5be..f6b99e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -106,8 +106,8 @@
       final long maxBytesRcvd,
       final long latestGenerationStamp,
       final DataChecksum requestedChecksum,
-      final CachingStrategy cachingStrategy) throws IOException;
-
+      final CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist) throws IOException;
   /**
    * Transfer a block to another datanode.
    * The block stage must be
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index daae9b7..8192925 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -148,10 +148,11 @@
           fromProto(proto.getRequestedChecksum()),
           (proto.hasCachingStrategy() ?
               getCachingStrategy(proto.getCachingStrategy()) :
-            CachingStrategy.newDefaultStrategy()));
-     } finally {
-      if (traceScope != null) traceScope.close();
-     }
+            CachingStrategy.newDefaultStrategy()),
+          (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
+    } finally {
+     if (traceScope != null) traceScope.close();
+    }
   }
 
   /** Receive {@link Op#TRANSFER_BLOCK} */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index fb6cf2c..1ae9da5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -128,7 +128,8 @@
       final long maxBytesRcvd,
       final long latestGenerationStamp,
       DataChecksum requestedChecksum,
-      final CachingStrategy cachingStrategy) throws IOException {
+      final CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist) throws IOException {
     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
         blk, clientName, blockToken);
     
@@ -146,7 +147,8 @@
       .setMaxBytesRcvd(maxBytesRcvd)
       .setLatestGenerationStamp(latestGenerationStamp)
       .setRequestedChecksum(checksumProto)
-      .setCachingStrategy(getCachingStrategy(cachingStrategy));
+      .setCachingStrategy(getCachingStrategy(cachingStrategy))
+      .setAllowLazyPersist(allowLazyPersist);
     
     if (source != null) {
       proto.setSource(PBHelper.convertDatanodeInfo(source));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 0432d5d..6470a1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -1362,6 +1362,9 @@
     if (flag.contains(CreateFlag.OVERWRITE)) {
       value |= CreateFlagProto.OVERWRITE.getNumber();
     }
+    if (flag.contains(CreateFlag.LAZY_PERSIST)) {
+      value |= CreateFlagProto.LAZY_PERSIST.getNumber();
+    }
     return value;
   }
   
@@ -1378,6 +1381,10 @@
         == CreateFlagProto.OVERWRITE_VALUE) {
       result.add(CreateFlag.OVERWRITE);
     }
+    if ((flag & CreateFlagProto.LAZY_PERSIST_VALUE)
+        == CreateFlagProto.LAZY_PERSIST_VALUE) {
+      result.add(CreateFlag.LAZY_PERSIST);
+    }
     return new EnumSetWritable<CreateFlag>(result);
   }
 
@@ -1784,6 +1791,8 @@
       return StorageTypeProto.SSD;
     case ARCHIVE:
       return StorageTypeProto.ARCHIVE;
+    case RAM_DISK:
+      return StorageTypeProto.RAM_DISK;
     default:
       throw new IllegalStateException(
           "BUG: StorageType not found, type=" + type);
@@ -1814,6 +1823,8 @@
         return StorageType.SSD;
       case ARCHIVE:
         return StorageType.ARCHIVE;
+      case RAM_DISK:
+        return StorageType.RAM_DISK;
       default:
         throw new IllegalStateException(
             "BUG: StorageTypeProto not found, type=" + type);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 67994c8..2a19537 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -271,7 +271,7 @@
     long overLoadedBytes = 0L, underLoadedBytes = 0L;
     for(DatanodeStorageReport r : reports) {
       final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
-      for(StorageType t : StorageType.asList()) {
+      for(StorageType t : StorageType.getMovableTypes()) {
         final Double utilization = policy.getUtilization(r, t);
         if (utilization == null) { // datanode does not have such storage type 
           continue;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index ad170d7..640791f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -402,6 +402,10 @@
     return storagePolicySuite.getPolicy(policyName);
   }
 
+  public BlockStoragePolicy getStoragePolicy(final byte policyId) {
+    return storagePolicySuite.getPolicy(policyId);
+  }
+
   public BlockStoragePolicy[] getStoragePolicies() {
     return storagePolicySuite.getAllPolicies();
   }
@@ -2103,8 +2107,8 @@
     // Add replica if appropriate. If the replica was previously corrupt
     // but now okay, it might need to be updated.
     if (reportedState == ReplicaState.FINALIZED
-        && (!storedBlock.findDatanode(dn)
-        || corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
+        && (storedBlock.findStorageInfo(storageInfo) == -1 ||
+            corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
       toAdd.add(storedBlock);
     }
     return storedBlock;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java
index 1d162a0..caaabad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStoragePolicySuite.java
@@ -44,6 +44,12 @@
   public static BlockStoragePolicySuite createDefaultSuite() {
     final BlockStoragePolicy[] policies =
         new BlockStoragePolicy[1 << ID_BIT_LENGTH];
+    final byte lazyPersistId = 15;
+    policies[lazyPersistId] = new BlockStoragePolicy(lazyPersistId, "LAZY_PERSIST",
+        new StorageType[]{StorageType.RAM_DISK, StorageType.DISK},
+        new StorageType[]{StorageType.DISK},
+        new StorageType[]{StorageType.DISK},
+        true);    // Cannot be changed on regular files, but inherited.
     final byte hotId = 12;
     policies[hotId] = new BlockStoragePolicy(hotId, "HOT",
         new StorageType[]{StorageType.DISK}, StorageType.EMPTY_ARRAY,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 3121496..25e89ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -230,7 +230,6 @@
   void notifyNamenodeReceivedBlock(
       ExtendedBlock block, String delHint, String storageUuid) {
     checkBlock(block);
-    checkDelHint(delHint);
     ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
         block.getLocalBlock(),
         ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
@@ -249,11 +248,6 @@
         block.getBlockPoolId(), getBlockPoolId());
   }
   
-  private void checkDelHint(String delHint) {
-    Preconditions.checkArgument(delHint != null,
-        "delHint is null");
-  }
-
   void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
     checkBlock(block);
     ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
index bbb67fc..61f1e7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
@@ -191,7 +191,7 @@
         + hours + " hours for block pool " + bpid);
 
     // get the list of blocks and arrange them in random order
-    List<FinalizedReplica> arr = dataset.getFinalizedBlocks(blockPoolId);
+    List<FinalizedReplica> arr = dataset.getFinalizedBlocksOnPersistentStorage(blockPoolId);
     Collections.shuffle(arr);
     
     long scanTime = -1;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index bfb2233..4d1cc6c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -139,7 +139,8 @@
       final long newGs, final long minBytesRcvd, final long maxBytesRcvd, 
       final String clientname, final DatanodeInfo srcDataNode,
       final DataNode datanode, DataChecksum requestedChecksum,
-      CachingStrategy cachingStrategy) throws IOException {
+      CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist) throws IOException {
     try{
       this.block = block;
       this.in = in;
@@ -180,7 +181,7 @@
       } else {
         switch (stage) {
         case PIPELINE_SETUP_CREATE:
-          replicaInfo = datanode.data.createRbw(storageType, block);
+          replicaInfo = datanode.data.createRbw(storageType, block, allowLazyPersist);
           datanode.notifyNamenodeReceivingBlock(
               block, replicaInfo.getStorageUuid());
           break;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 546162f..a9ea3fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -890,7 +890,7 @@
   }
   
   // calls specific to BP
-  protected void notifyNamenodeReceivedBlock(
+  public void notifyNamenodeReceivedBlock(
       ExtendedBlock block, String delHint, String storageUuid) {
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if(bpos != null) {
@@ -2003,7 +2003,8 @@
 
         new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
             clientname, targets, targetStorageTypes, srcNode,
-            stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
+            stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
+            false);
 
         // send data & checksum
         blockSender.sendBlock(out, unbufOut, null);
@@ -2080,7 +2081,8 @@
       LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
           + block.getBlockPoolId());
     }
-    if (blockScanner != null) {
+    FsVolumeSpi volume = getFSDataset().getVolume(block);
+    if (blockScanner != null && !volume.isTransientStorage()) {
       blockScanner.addBlock(block);
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 965b655..99eedb1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -81,6 +81,7 @@
   final static String STORAGE_DIR_DETACHED = "detach";
   public final static String STORAGE_DIR_RBW = "rbw";
   public final static String STORAGE_DIR_FINALIZED = "finalized";
+  public final static String STORAGE_DIR_LAZY_PERSIST = "lazypersist";
   public final static String STORAGE_DIR_TMP = "tmp";
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 4575c93..67eb941 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -544,7 +544,8 @@
       final long maxBytesRcvd,
       final long latestGenerationStamp,
       DataChecksum requestedChecksum,
-      CachingStrategy cachingStrategy) throws IOException {
+      CachingStrategy cachingStrategy,
+      final boolean allowLazyPersist) throws IOException {
     previousOpClientName = clientname;
     updateCurrentThreadName("Receiving block " + block);
     final boolean isDatanode = clientname.length() == 0;
@@ -606,8 +607,8 @@
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum,
-            cachingStrategy);
-        
+            cachingStrategy, allowLazyPersist);
+
         storageUuid = blockReceiver.getStorageUuid();
       } else {
         storageUuid = datanode.data.recoverClose(
@@ -648,10 +649,11 @@
               HdfsConstants.SMALL_BUFFER_SIZE));
           mirrorIn = new DataInputStream(unbufMirrorIn);
 
+          // Do not propagate allowLazyPersist to downstream DataNodes.
           new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
               blockToken, clientname, targets, targetStorageTypes, srcDataNode,
               stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
-              latestGenerationStamp, requestedChecksum, cachingStrategy);
+              latestGenerationStamp, requestedChecksum, cachingStrategy, false);
 
           mirrorOut.flush();
 
@@ -1046,7 +1048,7 @@
           proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
           null, 0, 0, 0, "", null, datanode, remoteChecksum,
-          CachingStrategy.newDropBehind());
+          CachingStrategy.newDropBehind(), false);
 
       // receive a block
       blockReceiver.receiveBlock(null, null, replyOut, null, 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index a47f2ef..71f976b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -83,6 +83,7 @@
     long missingBlockFile = 0;
     long missingMemoryBlocks = 0;
     long mismatchBlocks = 0;
+    long duplicateBlocks = 0;
     
     public Stats(String bpid) {
       this.bpid = bpid;
@@ -399,7 +400,7 @@
   /**
    * Reconcile differences between disk and in-memory blocks
    */
-  void reconcile() {
+  void reconcile() throws IOException {
     scan();
     for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
       String bpid = entry.getKey();
@@ -440,7 +441,7 @@
         int d = 0; // index for blockpoolReport
         int m = 0; // index for memReprot
         while (m < memReport.length && d < blockpoolReport.length) {
-          Block memBlock = memReport[Math.min(m, memReport.length - 1)];
+          FinalizedReplica memBlock = memReport[Math.min(m, memReport.length - 1)];
           ScanInfo info = blockpoolReport[Math.min(
               d, blockpoolReport.length - 1)];
           if (info.getBlockId() < memBlock.getBlockId()) {
@@ -468,9 +469,23 @@
             // or block file length is different than expected
             statsRecord.mismatchBlocks++;
             addDifference(diffRecord, statsRecord, info);
+          } else if (info.getBlockFile().compareTo(memBlock.getBlockFile()) != 0) {
+            // volumeMap record and on-disk files don't match.
+            statsRecord.duplicateBlocks++;
+            addDifference(diffRecord, statsRecord, info);
           }
           d++;
-          m++;
+
+          if (d < blockpoolReport.length) {
+            // There may be multiple on-disk records for the same block; if so,
+            // do not increment the in-memory record pointer.
+            ScanInfo nextInfo = blockpoolReport[Math.min(d, blockpoolReport.length - 1)];
+            if (nextInfo.getBlockId() != info.blockId) {
+              ++m;
+            }
+          } else {
+            ++m;
+          }
         }
         while (m < memReport.length) {
           FinalizedReplica current = memReport[m++];
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
index a480bb1..b6e5ba9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
@@ -59,4 +59,9 @@
    * Return the storageUuid of the volume that stores this replica.
    */
   public String getStorageUuid();
+
+  /**
+   * Return true if the target volume is backed by RAM.
+   */
+  public boolean isOnTransientStorage();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index 08395aa..45862ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -250,7 +250,7 @@
         }
       }
     } else {
-      // for create, we can use the requested checksum
+      // for create, we can use the requested checksum
       checksum = requestedChecksum;
     }
     
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index 49ac605..940d3eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -62,17 +62,6 @@
   private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
 
   /**
-   * Constructor for a zero length replica
-   * @param blockId block id
-   * @param genStamp replica generation stamp
-   * @param vol volume where replica is located
-   * @param dir directory path where block and meta files are located
-   */
-  ReplicaInfo(long blockId, long genStamp, FsVolumeSpi vol, File dir) {
-    this( blockId, 0L, genStamp, vol, dir);
-  }
-  
-  /**
    * Constructor
    * @param block a block
    * @param vol volume where replica is located
@@ -296,20 +285,6 @@
     return true;
   }
 
-  /**
-   * Set this replica's generation stamp to be a newer one
-   * @param newGS new generation stamp
-   * @throws IOException is the new generation stamp is not greater than the current one
-   */
-  void setNewerGenerationStamp(long newGS) throws IOException {
-    long curGS = getGenerationStamp();
-    if (newGS <= curGS) {
-      throw new IOException("New generation stamp (" + newGS 
-          + ") must be greater than current one (" + curGS + ")");
-    }
-    setGenerationStamp(newGS);
-  }
-  
   @Override  //Object
   public String toString() {
     return getClass().getSimpleName()
@@ -321,4 +296,9 @@
         + "\n  getVolume()       = " + getVolume()
         + "\n  getBlockFile()    = " + getBlockFile();
   }
+
+  @Override
+  public boolean isOnTransientStorage() {
+    return volume.isTransientStorage();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
index 35f7c93..2cd8a01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
@@ -37,8 +37,7 @@
                            // that the replica will be bumped to after recovery
 
   public ReplicaUnderRecovery(ReplicaInfo replica, long recoveryId) {
-    super(replica.getBlockId(), replica.getNumBytes(), replica.getGenerationStamp(),
-        replica.getVolume(), replica.getDir());
+    super(replica, replica.getVolume(), replica.getDir());
     if ( replica.getState() != ReplicaState.FINALIZED &&
          replica.getState() != ReplicaState.RBW &&
          replica.getState() != ReplicaState.RWR ) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
index ec19ec5..d0d36ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
@@ -99,7 +99,7 @@
 
   @Override
   public synchronized V chooseVolume(List<V> volumes,
-      final long replicaSize) throws IOException {
+      long replicaSize) throws IOException {
     if (volumes.size() < 1) {
       throw new DiskOutOfSpaceException("No more available volumes");
     }
@@ -138,8 +138,7 @@
       if (mostAvailableAmongLowVolumes < replicaSize ||
           random.nextFloat() < scaledPreferencePercent) {
         volume = roundRobinPolicyHighAvailable.chooseVolume(
-            highAvailableVolumes,
-            replicaSize);
+            highAvailableVolumes, replicaSize);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Volumes are imbalanced. Selecting " + volume +
               " from high available space volumes for write of block size "
@@ -147,8 +146,7 @@
         }
       } else {
         volume = roundRobinPolicyLowAvailable.chooseVolume(
-            lowAvailableVolumes,
-            replicaSize);
+            lowAvailableVolumes, replicaSize);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Volumes are imbalanced. Selecting " + volume +
               " from low available space volumes for write of block size "
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 4c03151..2bb2e7f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -116,13 +116,16 @@
   /** @return a list of finalized blocks for the given block pool. */
   public List<FinalizedReplica> getFinalizedBlocks(String bpid);
 
+  /** @return a list of finalized blocks on persistent storage for the given block pool. */
+  public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid);
+
   /**
    * Check whether the in-memory block record matches the block on the disk,
    * and, in case that they are not matched, update the record or mark it
    * as corrupted.
    */
   public void checkAndUpdate(String bpid, long blockId, File diskFile,
-      File diskMetaFile, FsVolumeSpi vol);
+      File diskMetaFile, FsVolumeSpi vol) throws IOException;
 
   /**
    * @param b - the block
@@ -197,7 +200,7 @@
    * @throws IOException if an error occurs
    */
   public ReplicaInPipelineInterface createRbw(StorageType storageType,
-      ExtendedBlock b) throws IOException;
+      ExtendedBlock b, boolean allowLazyPersist) throws IOException;
 
   /**
    * Recovers a RBW replica and returns the meta info of the replica
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index cba23c3..8ebf2b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -46,6 +46,9 @@
   
   public StorageType getStorageType();
 
+  /** Returns true if the volume is NOT backed by persistent storage. */
+  public boolean isTransientStorage();
+
   /**
    * Reserve disk space for an RBW block so a writer does not run out of
    * space before the block is full.
@@ -56,4 +59,4 @@
    * Release disk space previously reserved for RBW block.
    */
   public void releaseReservedSpace(long bytesToRelease);
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
index 7f4bdae..55a3560 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
@@ -20,6 +20,9 @@
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
 /**
@@ -27,12 +30,14 @@
  */
 public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
     implements VolumeChoosingPolicy<V> {
+  public static final Log LOG = LogFactory.getLog(RoundRobinVolumeChoosingPolicy.class);
 
   private int curVolume = 0;
 
   @Override
-  public synchronized V chooseVolume(final List<V> volumes, final long blockSize
-      ) throws IOException {
+  public synchronized V chooseVolume(final List<V> volumes, long blockSize)
+      throws IOException {
+
     if(volumes.size() < 1) {
       throw new DiskOutOfSpaceException("No more available volumes");
     }
@@ -50,7 +55,9 @@
       final V volume = volumes.get(curVolume);
       curVolume = (curVolume + 1) % volumes.size();
       long availableVolumeSize = volume.getAvailable();
-      if (availableVolumeSize > blockSize) { return volume; }
+      if (availableVolumeSize > blockSize) {
+        return volume;
+      }
       
       if (availableVolumeSize > maxAvailable) {
         maxAvailable = availableVolumeSize;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 96e4650..3eeb3ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -28,6 +28,9 @@
 import java.io.RandomAccessFile;
 import java.util.Scanner;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DU;
 import org.apache.hadoop.fs.FileUtil;
@@ -42,6 +45,7 @@
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -56,16 +60,20 @@
  * This class is synchronized by {@link FsVolumeImpl}.
  */
 class BlockPoolSlice {
+  static final Log LOG = LogFactory.getLog(BlockPoolSlice.class);
+
   private final String bpid;
   private final FsVolumeImpl volume; // volume to which this BlockPool belongs to
   private final File currentDir; // StorageDirectory/current/bpid/current
   // directory where finalized replicas are stored
   private final File finalizedDir;
+  private final File lazypersistDir;
   private final File rbwDir; // directory store RBW replica
   private final File tmpDir; // directory store Temporary replica
   private static final String DU_CACHE_FILE = "dfsUsed";
   private volatile boolean dfsUsedSaved = false;
   private static final int SHUTDOWN_HOOK_PRIORITY = 30;
+  private final boolean deleteDuplicateReplicas;
   
   // TODO:FEDERATION scalability issue - a thread per DU is needed
   private final DU dfsUsage;
@@ -85,12 +93,17 @@
     this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); 
     this.finalizedDir = new File(
         currentDir, DataStorage.STORAGE_DIR_FINALIZED);
+    this.lazypersistDir = new File(currentDir, DataStorage.STORAGE_DIR_LAZY_PERSIST);
     if (!this.finalizedDir.exists()) {
       if (!this.finalizedDir.mkdirs()) {
         throw new IOException("Failed to mkdirs " + this.finalizedDir);
       }
     }
 
+    this.deleteDuplicateReplicas = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
+        DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT);
+
     // Files that were being written when the datanode was last shutdown
     // are now moved back to the data directory. It is possible that
     // in the future, we might want to do some sort of datanode-local
@@ -136,6 +149,10 @@
     return finalizedDir;
   }
   
+  File getLazypersistDir() {
+    return lazypersistDir;
+  }
+
   File getRbwDir() {
     return rbwDir;
   }
@@ -252,18 +269,63 @@
     dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
     return blockFile;
   }
-    
+
+  /**
+   * Save the given replica to persistent storage.
+   *
+   * @return The saved meta and block files, in that order.
+   * @throws IOException
+   */
+  File[] lazyPersistReplica(long blockId, long genStamp,
+                            File srcMeta, File srcFile) throws IOException {
+    if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
+      FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
+    }
+    File targetFiles[] = FsDatasetImpl.copyBlockFiles(
+        blockId, genStamp, srcMeta, srcFile, lazypersistDir);
+    dfsUsage.incDfsUsed(targetFiles[0].length() + targetFiles[1].length());
+    return targetFiles;
+  }
+
+  /**
+   * Move a persisted replica from the lazypersist directory to a
+   * subdirectory under finalized.
+   */
+  File activateSavedReplica(Block b, File metaFile, File blockFile)
+      throws IOException {
+    final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
+    final File targetBlockFile = new File(blockDir, blockFile.getName());
+    final File targetMetaFile = new File(blockDir, metaFile.getName());
+    FileUtils.moveFile(blockFile, targetBlockFile);
+    FsDatasetImpl.LOG.info("Moved " + blockFile + " to " + targetBlockFile);
+    FileUtils.moveFile(metaFile, targetMetaFile);
+    FsDatasetImpl.LOG.info("Moved " + metaFile + " to " + targetMetaFile);
+    return targetBlockFile;
+  }
+
   void checkDirs() throws DiskErrorException {
     DiskChecker.checkDirs(finalizedDir);
     DiskChecker.checkDir(tmpDir);
     DiskChecker.checkDir(rbwDir);
   }
     
-  void getVolumeMap(ReplicaMap volumeMap) throws IOException {
+  void getVolumeMap(ReplicaMap volumeMap,
+                    final RamDiskReplicaTracker lazyWriteReplicaMap)
+      throws IOException {
+    // Recover lazy persist replicas; they will be added to the volumeMap
+    // when we scan the finalized directory.
+    if (lazypersistDir.exists()) {
+      int numRecovered = moveLazyPersistReplicasToFinalized(lazypersistDir);
+      FsDatasetImpl.LOG.info(
+          "Recovered " + numRecovered + " replicas from " + lazypersistDir);
+    }
+
     // add finalized replicas
-    addToReplicasMap(volumeMap, finalizedDir, true);
+    addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
     // add rbw replicas
-    addToReplicasMap(volumeMap, rbwDir, false);
+    addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
   }
 
   /**
@@ -292,18 +354,82 @@
 
 
   /**
+   * Move replicas in the lazy persist directory to their corresponding locations
+   * in the finalized directory.
+   * @return number of replicas recovered.
+   */
+  private int moveLazyPersistReplicasToFinalized(File source)
+      throws IOException {
+    File files[] = FileUtil.listFiles(source);
+    int numRecovered = 0;
+    for (File file : files) {
+      if (file.isDirectory()) {
+        numRecovered += moveLazyPersistReplicasToFinalized(file);
+      }
+
+      if (Block.isMetaFilename(file.getName())) {
+        File metaFile = file;
+        File blockFile = Block.metaToBlockFile(metaFile);
+        long blockId = Block.filename2id(blockFile.getName());
+        File targetDir = DatanodeUtil.idToBlockDir(finalizedDir, blockId);
+
+        if (blockFile.exists()) {
+
+          if (!targetDir.exists() && !targetDir.mkdirs()) {
+            LOG.warn("Failed to mkdirs " + targetDir);
+            continue;
+          }
+
+          final File targetMetaFile = new File(targetDir, metaFile.getName());
+          try {
+            NativeIO.renameTo(metaFile, targetMetaFile);
+          } catch (IOException e) {
+            LOG.warn("Failed to move meta file from "
+                + metaFile + " to " + targetMetaFile, e);
+            continue;
+          }
+
+          final File targetBlockFile = new File(targetDir, blockFile.getName());
+          try {
+            NativeIO.renameTo(blockFile, targetBlockFile);
+          } catch (IOException e) {
+            LOG.warn("Failed to move block file from "
+                + blockFile + " to " + targetBlockFile, e);
+            continue;
+          }
+
+          if (targetBlockFile.exists() && targetMetaFile.exists()) {
+            ++numRecovered;
+          } else {
+            // Failure should be rare.
+            LOG.warn("Failed to move " + blockFile + " to " + targetDir);
+          }
+        }
+      }
+    }
+
+    FileUtil.fullyDelete(source);
+    return numRecovered;
+  }
+
+  /**
    * Add replicas under the given directory to the volume map
    * @param volumeMap the replicas map
    * @param dir an input directory
+   * @param lazyWriteReplicaMap Map of replicas on transient
+   *                                storage.
    * @param isFinalized true if the directory has finalized replicas;
    *                    false if the directory has rbw replicas
    */
-  void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized
-      ) throws IOException {
+  void addToReplicasMap(ReplicaMap volumeMap, File dir,
+                        final RamDiskReplicaTracker lazyWriteReplicaMap,
+                        boolean isFinalized)
+      throws IOException {
     File files[] = FileUtil.listFiles(dir);
     for (File file : files) {
       if (file.isDirectory()) {
-        addToReplicasMap(volumeMap, file, isFinalized);
+        addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized);
       }
 
       if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
@@ -361,13 +487,96 @@
         }
       }
 
-      ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
-      if (oldReplica != null) {
-        FsDatasetImpl.LOG.warn("Two block files with the same block id exist " +
-            "on disk: " + oldReplica.getBlockFile() + " and " + file );
+      ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
+      if (oldReplica == null) {
+        volumeMap.add(bpid, newReplica);
+      } else {
+        // We have multiple replicas of the same block so decide which one
+        // to keep.
+        newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap);
+      }
+
+      // If we are retaining a replica on transient storage make sure
+      // it is in the lazyWriteReplicaMap so it can be persisted
+      // eventually.
+      if (newReplica.getVolume().isTransientStorage()) {
+        lazyWriteReplicaMap.addReplica(bpid, blockId,
+                                       (FsVolumeImpl) newReplica.getVolume());
+      } else {
+        lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
       }
     }
   }
+
+  /**
+   * This method is invoked during DN startup when volumes are scanned to
+   * build up the volumeMap.
+   *
+   * Given two replicas, decide which one to keep. The preference is as
+   * follows:
+   *   1. Prefer the replica with the higher generation stamp.
+   *   2. If generation stamps are equal, prefer the replica with the
+   *      larger on-disk length.
+   *   3. If the on-disk lengths are the same, prefer the replica on a
+   *      persistent storage volume.
+   *   4. All other factors being equal, keep replica1.
+   *
+   * The other replica is removed from the volumeMap and is deleted from
+   * its storage volume.
+   *
+   * @param replica1 the first replica found during the scan.
+   * @param replica2 the second replica with the same block id.
+   * @param volumeMap the replica map to update with the retained replica.
+   * @return the replica that is retained.
+   * @throws IOException
+   */
+  ReplicaInfo resolveDuplicateReplicas(
+      final ReplicaInfo replica1, final ReplicaInfo replica2,
+      final ReplicaMap volumeMap) throws IOException {
+
+    if (!deleteDuplicateReplicas) {
+      // Leave both block replicas in place.
+      return replica1;
+    }
+
+    ReplicaInfo replicaToKeep;
+    ReplicaInfo replicaToDelete;
+
+    if (replica1.getGenerationStamp() != replica2.getGenerationStamp()) {
+      replicaToKeep = replica1.getGenerationStamp() > replica2.getGenerationStamp()
+          ? replica1 : replica2;
+    } else if (replica1.getNumBytes() != replica2.getNumBytes()) {
+      replicaToKeep = replica1.getNumBytes() > replica2.getNumBytes() ?
+          replica1 : replica2;
+    } else if (replica1.getVolume().isTransientStorage() &&
+               !replica2.getVolume().isTransientStorage()) {
+      replicaToKeep = replica2;
+    } else {
+      replicaToKeep = replica1;
+    }
+
+    replicaToDelete = (replicaToKeep == replica1) ? replica2 : replica1;
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("resolveDuplicateReplicas decided to keep " + replicaToKeep
+          + ". Will try to delete " + replicaToDelete);
+    }
+
+    // Update volumeMap.
+    volumeMap.add(bpid, replicaToKeep);
+
+    // Delete the files on disk. Failure here is okay.
+    final File blockFile = replicaToDelete.getBlockFile();
+    if (!blockFile.delete()) {
+      LOG.warn("Failed to delete block file " + blockFile);
+    }
+    final File metaFile = replicaToDelete.getMetaFile();
+    if (!metaFile.delete()) {
+      LOG.warn("Failed to delete meta file " + metaFile);
+    }
+
+    return replicaToKeep;
+  }
   
   /**
    * Find out the number of bytes in the block that match its crc.
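
The preference order documented on resolveDuplicateReplicas above (higher generation stamp, then larger on-disk length, then persistent over transient storage) can be stated as a comparator. The sketch below uses a plain value object purely to illustrate the ordering; the real method also rewrites the volume map and deletes the losing replica's block and meta files.

import java.util.Comparator;

// Illustrative value object; the real code operates on ReplicaInfo.
class ReplicaCandidate {
  final long genStamp;
  final long numBytes;
  final boolean onTransientStorage;

  ReplicaCandidate(long genStamp, long numBytes, boolean onTransientStorage) {
    this.genStamp = genStamp;
    this.numBytes = numBytes;
    this.onTransientStorage = onTransientStorage;
  }

  /** Orders candidates so that the preferred replica compares as greater. */
  static final Comparator<ReplicaCandidate> PREFERENCE =
      new Comparator<ReplicaCandidate>() {
        @Override
        public int compare(ReplicaCandidate a, ReplicaCandidate b) {
          if (a.genStamp != b.genStamp) {
            return Long.compare(a.genStamp, b.genStamp);    // higher gen stamp wins
          }
          if (a.numBytes != b.numBytes) {
            return Long.compare(a.numBytes, b.numBytes);    // longer replica wins
          }
          // Persistent storage is preferred over transient storage.
          return Boolean.compare(b.onTransientStorage, a.onTransientStorage);
        }
      };
}

When the comparator returns 0, the caller keeps the first replica it saw, which matches rule 4 in the javadoc.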
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 6e81082..df52e14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -30,7 +30,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -44,7 +43,9 @@
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -66,6 +67,7 @@
 import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
@@ -84,6 +86,7 @@
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -92,6 +95,7 @@
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -115,7 +119,6 @@
     }
   }
 
-
   @Override // FsDatasetSpi
   public List<FsVolumeImpl> getVolumes() {
     return volumes.volumes;
@@ -155,7 +158,7 @@
   @Override // FsDatasetSpi
   public synchronized Block getStoredBlock(String bpid, long blkid)
       throws IOException {
-    File blockfile = getFile(bpid, blkid);
+    File blockfile = getFile(bpid, blkid, false);
     if (blockfile == null) {
       return null;
     }
@@ -208,11 +211,17 @@
   final FsVolumeList volumes;
   final Map<String, DatanodeStorage> storageMap;
   final FsDatasetAsyncDiskService asyncDiskService;
+  final Daemon lazyWriter;
   final FsDatasetCache cacheManager;
   private final Configuration conf;
   private final int validVolsRequired;
+  private volatile boolean fsRunning;
 
   final ReplicaMap volumeMap;
+  final RamDiskReplicaTracker ramDiskReplicaTracker;
+
+  private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;
+
 
   // Used for synchronizing access to usage stats
   private final Object statsLock = new Object();
@@ -222,6 +231,7 @@
    */
   FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
       ) throws IOException {
+    this.fsRunning = true;
     this.datanode = datanode;
     this.dataStorage = storage;
     this.conf = conf;
@@ -252,6 +262,8 @@
 
     storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
     volumeMap = new ReplicaMap(this);
+    ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
+
     @SuppressWarnings("unchecked")
     final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
         ReflectionUtils.newInstance(conf.getClass(
@@ -266,6 +278,10 @@
     }
 
     cacheManager = new FsDatasetCache(this);
+
+    // Start the lazy writer once we have built the replica maps.
+    lazyWriter = new Daemon(new LazyWriter(conf));
+    lazyWriter.start();
     registerMBean(datanode.getDatanodeUuid());
   }
 
@@ -281,7 +297,7 @@
     FsVolumeImpl fsVolume = new FsVolumeImpl(
         this, sd.getStorageUuid(), dir, this.conf, storageType);
     ReplicaMap tempVolumeMap = new ReplicaMap(this);
-    fsVolume.getVolumeMap(tempVolumeMap);
+    fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
 
     volumeMap.addAll(tempVolumeMap);
     volumes.addVolume(fsVolume);
@@ -309,7 +325,7 @@
     for (final String bpid : bpids) {
       try {
         fsVolume.addBlockPool(bpid, this.conf);
-        fsVolume.getVolumeMap(bpid, tempVolumeMap);
+        fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
       } catch (IOException e) {
         LOG.warn("Caught exception when adding " + fsVolume +
             ". Will throw later.", e);
@@ -550,16 +566,16 @@
    * Get File name for a given block.
    */
   private File getBlockFile(ExtendedBlock b) throws IOException {
-    return getBlockFile(b.getBlockPoolId(), b.getLocalBlock());
+    return getBlockFile(b.getBlockPoolId(), b.getBlockId());
   }
   
   /**
    * Get File name for a given block.
    */
-  File getBlockFile(String bpid, Block b) throws IOException {
-    File f = validateBlockFile(bpid, b);
+  File getBlockFile(String bpid, long blockId) throws IOException {
+    File f = validateBlockFile(bpid, blockId);
     if(f == null) {
-      throw new IOException("Block " + b + " is not valid.");
+      throw new IOException("BlockId " + blockId + " is not valid.");
     }
     return f;
   }
@@ -569,12 +585,16 @@
    * checking that it exists. This should be used when the
    * next operation is going to open the file for read anyway,
    * and thus the exists check is redundant.
+   *
+   * @param touch if true then update the last access timestamp of the
+   *              block. Currently used for blocks on transient storage.
    */
-  private File getBlockFileNoExistsCheck(ExtendedBlock b)
+  private File getBlockFileNoExistsCheck(ExtendedBlock b,
+                                         boolean touch)
       throws IOException {
     final File f;
     synchronized(this) {
-      f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId());
+      f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
     }
     if (f == null) {
       throw new IOException("Block " + b + " is not valid");
@@ -585,7 +605,7 @@
   @Override // FsDatasetSpi
   public InputStream getBlockInputStream(ExtendedBlock b,
       long seekOffset) throws IOException {
-    File blockFile = getBlockFileNoExistsCheck(b);
+    File blockFile = getBlockFileNoExistsCheck(b, true);
     if (isNativeIOAvailable) {
       return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
     } else {
@@ -661,8 +681,8 @@
     return new ReplicaInputStreams(blockInFile.getFD(), metaInFile.getFD());
   }
 
-  static File moveBlockFiles(Block b, File srcfile, File destdir
-      ) throws IOException {
+  static File moveBlockFiles(Block b, File srcfile, File destdir)
+      throws IOException {
     final File dstfile = new File(destdir, b.getBlockName());
     final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
     final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
@@ -685,6 +705,34 @@
     return dstfile;
   }
 
+  /**
+   * Copy the block and meta files for the given block to the given destination.
+   * @return the new meta and block files.
+   * @throws IOException
+   */
+  static File[] copyBlockFiles(long blockId, long genStamp,
+                               File srcMeta, File srcFile, File destRoot)
+      throws IOException {
+    final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
+    final File dstFile = new File(destDir, srcFile.getName());
+    final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
+    try {
+      FileUtils.copyFile(srcMeta, dstMeta);
+    } catch (IOException e) {
+      throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
+    }
+    try {
+      FileUtils.copyFile(srcFile, dstFile);
+    } catch (IOException e) {
+      throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Copied " + srcMeta + " to " + dstMeta);
+      LOG.debug("Copied " + srcFile + " to " + dstFile);
+    }
+    return new File[] {dstMeta, dstFile};
+  }
+
   static private void truncateBlock(File blockFile, File metaFile,
       long oldlen, long newlen) throws IOException {
     LOG.info("truncateBlock: blockFile=" + blockFile
@@ -949,8 +997,8 @@
 
   @Override // FsDatasetSpi
   public synchronized ReplicaInPipeline createRbw(StorageType storageType,
-      ExtendedBlock b) throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 
+      ExtendedBlock b, boolean allowLazyPersist) throws IOException {
+    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
         b.getBlockId());
     if (replicaInfo != null) {
       throw new ReplicaAlreadyExistsException("Block " + b +
@@ -958,12 +1006,32 @@
       " and thus cannot be created.");
     }
     // create a new block
-    FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
-    // create a rbw file to hold block in the designated volume
+    FsVolumeImpl v;
+    while (true) {
+      try {
+        if (allowLazyPersist) {
+          // First try to place the block on a transient volume.
+          v = volumes.getNextTransientVolume(b.getNumBytes());
+          datanode.getMetrics().incrRamDiskBlocksWrite();
+        } else {
+          v = volumes.getNextVolume(storageType, b.getNumBytes());
+        }
+      } catch (DiskOutOfSpaceException de) {
+        if (allowLazyPersist) {
+          datanode.getMetrics().incrRamDiskBlocksWriteFallback();
+          allowLazyPersist = false;
+          continue;
+        }
+        throw de;
+      }
+      break;
+    }
+    // create an rbw file to hold the block in the designated volume
     File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
         b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
     volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
+
     return newReplicaInfo;
   }
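
The retry loop in createRbw above boils down to: try a transient volume when lazy persist is requested, and on DiskOutOfSpaceException fall back once to ordinary storage. A stripped-down sketch of that control flow follows; apart from DiskOutOfSpaceException, the names are illustrative.

import java.io.IOException;

import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;

// Illustrative only: choose transient storage if requested, falling back to disk.
abstract class VolumePicker<V> {
  abstract V nextTransientVolume(long blockSize) throws IOException;
  abstract V nextPersistentVolume(long blockSize) throws IOException;

  V pick(long blockSize, boolean allowLazyPersist) throws IOException {
    while (true) {
      try {
        return allowLazyPersist
            ? nextTransientVolume(blockSize)   // may throw if the RAM disk is full or absent
            : nextPersistentVolume(blockSize);
      } catch (DiskOutOfSpaceException e) {
        if (!allowLazyPersist) {
          throw e;                             // no fallback left
        }
        allowLazyPersist = false;              // fall back to persistent storage
      }
    }
  }
}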
   
@@ -1109,7 +1177,6 @@
     ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), 
         b.getGenerationStamp(), v, f.getParentFile(), 0);
     volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
-    
     return newReplicaInfo;
   }
 
@@ -1176,8 +1243,14 @@
       File dest = v.addFinalizedBlock(
           bpid, replicaInfo, f, replicaInfo.getBytesReserved());
       newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
+
+      if (v.isTransientStorage()) {
+        ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
+        datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
+      }
     }
     volumeMap.add(bpid, newReplicaInfo);
+
     return newReplicaInfo;
   }
 
@@ -1197,6 +1270,9 @@
           replicaInfo.getMetaFile(), b.getLocalBlock())) {
         LOG.warn("Block " + b + " unfinalized and removed. " );
       }
+      if (replicaInfo.getVolume().isTransientStorage()) {
+        ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
+      }
     }
   }
 
@@ -1293,6 +1369,22 @@
   }
 
   /**
+   * Get the list of finalized blocks on persistent storage from the in-memory blockmap for a block pool.
+   */
+  @Override
+  public synchronized List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
+    ArrayList<FinalizedReplica> finalized =
+        new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
+    for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+      if(!b.getVolume().isTransientStorage() &&
+         b.getState() == ReplicaState.FINALIZED) {
+        finalized.add(new FinalizedReplica((FinalizedReplica)b));
+      }
+    }
+    return finalized;
+  }
+
+  /**
    * Check whether the given block is a valid one.
    * valid means finalized
    */
@@ -1321,11 +1413,11 @@
   /**
    * Find the file corresponding to the block and return it if it exists.
    */
-  File validateBlockFile(String bpid, Block b) {
+  File validateBlockFile(String bpid, long blockId) {
     //Should we check for metadata file too?
     final File f;
     synchronized(this) {
-      f = getFile(bpid, b.getBlockId());
+      f = getFile(bpid, blockId, false);
     }
     
     if(f != null ) {
@@ -1337,7 +1429,7 @@
     }
     
     if (LOG.isDebugEnabled()) {
-      LOG.debug("b=" + b + ", f=" + f);
+      LOG.debug("blockId=" + blockId + ", f=" + f);
     }
     return null;
   }
@@ -1409,6 +1501,17 @@
         volumeMap.remove(bpid, invalidBlks[i]);
       }
 
+      if (v.isTransientStorage()) {
+        RamDiskReplica replicaInfo =
+          ramDiskReplicaTracker.getReplica(bpid, invalidBlks[i].getBlockId());
+        if (replicaInfo != null) {
+          if (!replicaInfo.getIsPersisted()) {
+            datanode.getMetrics().incrRamDiskBlocksDeletedBeforeLazyPersisted();
+          }
+          discardRamDiskReplica(replicaInfo, true);
+        }
+      }
+
       // If a DFSClient has the replica in its cache of short-circuit file
       // descriptors (and the client is using ShortCircuitShm), invalidate it.
       datanode.getShortCircuitRegistry().processBlockInvalidation(
@@ -1497,6 +1600,11 @@
               ": volume was not an instance of FsVolumeImpl.");
           return;
         }
+        if (volume.isTransientStorage()) {
+          LOG.warn("Caching not supported on block with id " + blockId +
+              " since the volume is backed by RAM.");
+          return;
+        }
         success = true;
       } finally {
         if (!success) {
@@ -1533,7 +1641,7 @@
   @Override // FsDatasetSpi
   public synchronized boolean contains(final ExtendedBlock block) {
     final long blockId = block.getLocalBlock().getBlockId();
-    return getFile(block.getBlockPoolId(), blockId) != null;
+    return getFile(block.getBlockPoolId(), blockId, false) != null;
   }
 
   /**
@@ -1542,13 +1650,18 @@
    * @param blockId a block's id
    * @return on disk data file path; null if the replica does not exist
    */
-  File getFile(final String bpid, final long blockId) {
+  File getFile(final String bpid, final long blockId, boolean touch) {
     ReplicaInfo info = volumeMap.get(bpid, blockId);
     if (info != null) {
+      if (touch && info.getVolume().isTransientStorage()) {
+        ramDiskReplicaTracker.touch(bpid, blockId);
+        datanode.getMetrics().incrRamDiskBlocksReadHits();
+      }
       return info.getBlockFile();
     }
     return null;    
   }
+
   /**
    * check if a data directory is healthy
    * if some volumes failed - make sure to remove all the blocks that belong
@@ -1624,8 +1737,14 @@
 
   @Override // FsDatasetSpi
   public void shutdown() {
-    if (mbeanName != null)
+    fsRunning = false;
+
+    ((LazyWriter) lazyWriter.getRunnable()).stop();
+    lazyWriter.interrupt();
+
+    if (mbeanName != null) {
       MBeans.unregister(mbeanName);
+    }
     
     if (asyncDiskService != null) {
       asyncDiskService.shutdown();
@@ -1634,6 +1753,13 @@
     if(volumes != null) {
       volumes.shutdown();
     }
+
+    try {
+      lazyWriter.join();
+    } catch (InterruptedException ie) {
+      LOG.warn("FsDatasetImpl.shutdown ignoring InterruptedException " +
+               "from LazyWriter.join");
+    }
   }
 
   @Override // FSDatasetMBean
@@ -1666,7 +1792,7 @@
    */
   @Override
   public void checkAndUpdate(String bpid, long blockId, File diskFile,
-      File diskMetaFile, FsVolumeSpi vol) {
+      File diskMetaFile, FsVolumeSpi vol) throws IOException {
     Block corruptBlock = null;
     ReplicaInfo memBlockInfo;
     synchronized (this) {
@@ -1699,6 +1825,9 @@
           if (blockScanner != null) {
             blockScanner.deleteBlock(bpid, new Block(blockId));
           }
+          if (vol.isTransientStorage()) {
+            ramDiskReplicaTracker.discardReplica(bpid, blockId, true);
+          }
           LOG.warn("Removed block " + blockId
               + " from memory with missing block file on the disk");
           // Finally remove the metadata file
@@ -1719,8 +1848,12 @@
             diskFile.length(), diskGS, vol, diskFile.getParentFile());
         volumeMap.add(bpid, diskBlockInfo);
         final DataBlockScanner blockScanner = datanode.getBlockScanner();
-        if (blockScanner != null) {
-          blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
+        if (!vol.isTransientStorage()) {
+          if (blockScanner != null) {
+            blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
+          }
+        } else {
+          ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
         }
         LOG.warn("Added missing block to memory " + diskBlockInfo);
         return;
@@ -1732,10 +1865,20 @@
       File memFile = memBlockInfo.getBlockFile();
       if (memFile.exists()) {
         if (memFile.compareTo(diskFile) != 0) {
-          LOG.warn("Block file " + memFile.getAbsolutePath()
-              + " does not match file found by scan "
-              + diskFile.getAbsolutePath());
-          // TODO: Should the diskFile be deleted?
+          if (diskMetaFile.exists()) {
+            if (memBlockInfo.getMetaFile().exists()) {
+              // We have two sets of block+meta files. Decide which one to
+              // keep.
+              ReplicaInfo diskBlockInfo = new FinalizedReplica(
+                  blockId, diskFile.length(), diskGS, vol, diskFile.getParentFile());
+              ((FsVolumeImpl) vol).getBlockPoolSlice(bpid).resolveDuplicateReplicas(
+                  memBlockInfo, diskBlockInfo, volumeMap);
+            }
+          } else {
+            if (!diskFile.delete()) {
+              LOG.warn("Failed to delete " + diskFile + ". Will retry on next scan");
+            }
+          }
         }
       } else {
         // Block refers to a block file that does not exist.
@@ -1899,9 +2042,9 @@
     final String bpid = oldBlock.getBlockPoolId();
     final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
     LOG.info("updateReplica: " + oldBlock
-        + ", recoveryId=" + recoveryId
-        + ", length=" + newlength
-        + ", replica=" + replica);
+                 + ", recoveryId=" + recoveryId
+                 + ", length=" + newlength
+                 + ", replica=" + replica);
 
     //check replica
     if (replica == null) {
@@ -1993,7 +2136,7 @@
       volumes.addBlockPool(bpid, conf);
       volumeMap.initBlockPool(bpid);
     }
-    volumes.getAllVolumesMap(bpid, volumeMap);
+    volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
   }
 
   @Override
@@ -2171,5 +2314,248 @@
     asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset,
         nbytes, flags);
   }
+
+  void discardRamDiskReplica(RamDiskReplica replica, boolean deleteSavedCopies) {
+    ramDiskReplicaTracker.discardReplica(replica.getBlockPoolId(),
+      replica.getBlockId(), deleteSavedCopies);
+  }
+
+  class LazyWriter implements Runnable {
+    private volatile boolean shouldRun = true;
+    final int checkpointerInterval;
+    final long estimateBlockSize;
+    final int lowWatermarkFreeSpacePercentage;
+    final int lowWatermarkFreeSpaceReplicas;
+
+    public LazyWriter(Configuration conf) {
+      this.checkpointerInterval = conf.getInt(
+          DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+          DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC);
+      this.estimateBlockSize = conf.getLongBytes(
+          DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
+          DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
+      this.lowWatermarkFreeSpacePercentage = conf.getInt(
+          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT,
+          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT);
+      this.lowWatermarkFreeSpaceReplicas = conf.getInt(
+          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS,
+          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT);
+    }
+
+    private void moveReplicaToNewVolume(String bpid, long blockId, long creationTime)
+        throws IOException {
+
+      FsVolumeImpl targetVolume;
+      ReplicaInfo replicaInfo;
+      BlockPoolSlice bpSlice;
+      File srcFile, srcMeta;
+      long genStamp;
+
+      synchronized (FsDatasetImpl.this) {
+        replicaInfo = volumeMap.get(bpid, blockId);
+
+        if (replicaInfo == null || !replicaInfo.getVolume().isTransientStorage()) {
+          // The block was either deleted before it could be checkpointed or
+          // it is already on persistent storage. This can occur if a second
+          // replica on persistent storage was found after the lazy write was
+          // scheduled.
+          return;
+        }
+
+        // Pick a target volume for the block.
+        targetVolume = volumes.getNextVolume(
+            StorageType.DEFAULT, replicaInfo.getNumBytes());
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
+        }
+
+        ramDiskReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
+        bpSlice = targetVolume.getBlockPoolSlice(bpid);
+        srcMeta = replicaInfo.getMetaFile();
+        srcFile = replicaInfo.getBlockFile();
+        genStamp = replicaInfo.getGenerationStamp();
+      }
+
+      // Drop the FsDatasetImpl lock for the file copy.
+      File[] savedFiles =
+          bpSlice.lazyPersistReplica(blockId, genStamp, srcMeta, srcFile);
+
+      synchronized (FsDatasetImpl.this) {
+        ramDiskReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
+
+        // Update metrics (ignore the metadata file size)
+        datanode.getMetrics().incrRamDiskBlocksLazyPersisted();
+        datanode.getMetrics().incrRamDiskBytesLazyPersisted(replicaInfo.getNumBytes());
+        datanode.getMetrics().addRamDiskBlocksLazyPersistWindowMs(
+          Time.monotonicNow() - creationTime);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
+                        " to file " + savedFiles[1]);
+        }
+      }
+    }
+
+    /**
+     * Checkpoint a pending replica to persistent storage now.
+     * If we fail then move the replica to the end of the queue.
+     * @return true if there is more work to be done, false otherwise.
+     */
+    private boolean saveNextReplica() {
+      RamDiskReplica block = null;
+      boolean succeeded = false;
+
+      try {
+        block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
+        if (block != null) {
+          moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId(),
+            block.getCreationTime());
+        }
+        succeeded = true;
+      } catch(IOException ioe) {
+        LOG.warn("Exception saving replica " + block, ioe);
+      } finally {
+        if (!succeeded && block != null) {
+          LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
+          ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
+        }
+      }
+
+      return succeeded;
+    }
+
+    private boolean transientFreeSpaceBelowThreshold() throws IOException {
+      long free = 0;
+      long capacity = 0;
+
+      // Don't worry about fragmentation for now. We don't expect more than one
+      // transient volume per DN.
+      for (FsVolumeImpl v : volumes.volumes) {
+        if (v.isTransientStorage()) {
+          capacity += v.getCapacity();
+          free += v.getAvailable();
+        }
+      }
+
+      if (capacity == 0) {
+        return false;
+      }
+
+      int percentFree = (int) (free * 100 / capacity);
+      return percentFree < lowWatermarkFreeSpacePercentage ||
+             free < (estimateBlockSize * lowWatermarkFreeSpaceReplicas);
+    }
+
+    /**
+     * Attempt to evict one or more transient block replicas until free space
+     * on transient storage is back above the low watermark.
+     */
+    private void evictBlocks() throws IOException {
+      int iterations = 0;
+
+      while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
+             transientFreeSpaceBelowThreshold()) {
+        RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction();
+
+        if (replicaState == null) {
+          break;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Evicting block " + replicaState);
+        }
+
+        ReplicaInfo replicaInfo, newReplicaInfo;
+        File blockFile, metaFile;
+        long blockFileUsed, metaFileUsed;
+        final String bpid = replicaState.getBlockPoolId();
+
+        synchronized (FsDatasetImpl.this) {
+          replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(), replicaState.getBlockId());
+          Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
+          blockFile = replicaInfo.getBlockFile();
+          metaFile = replicaInfo.getMetaFile();
+          blockFileUsed = blockFile.length();
+          metaFileUsed = metaFile.length();
+          discardRamDiskReplica(replicaState, false);
+
+          // Move the replica from lazyPersist/ to finalized/ on target volume
+          BlockPoolSlice bpSlice =
+              replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid);
+          File newBlockFile = bpSlice.activateSavedReplica(
+              replicaInfo, replicaState.getSavedMetaFile(),
+              replicaState.getSavedBlockFile());
+
+          newReplicaInfo =
+              new FinalizedReplica(replicaInfo.getBlockId(),
+                                   replicaInfo.getBytesOnDisk(),
+                                   replicaInfo.getGenerationStamp(),
+                                   replicaState.getLazyPersistVolume(),
+                                   newBlockFile.getParentFile());
+
+          // Update the volumeMap entry.
+          volumeMap.add(bpid, newReplicaInfo);
+
+          // Update metrics
+          datanode.getMetrics().incrRamDiskBlocksEvicted();
+          datanode.getMetrics().addRamDiskBlocksEvictionWindowMs(
+              Time.monotonicNow() - replicaState.getCreationTime());
+          if (replicaState.getNumReads() == 0) {
+            datanode.getMetrics().incrRamDiskBlocksEvictedWithoutRead();
+          }
+        }
+
+        // Before deleting the files from transient storage we must notify the
+        // NN that the files are on the new storage. Otherwise a block report from
+        // the transient storage might cause the NN to think the blocks are lost.
+        ExtendedBlock extendedBlock =
+            new ExtendedBlock(bpid, newReplicaInfo);
+        datanode.notifyNamenodeReceivedBlock(
+            extendedBlock, null, newReplicaInfo.getStorageUuid());
+
+        // Remove the old replicas from transient storage.
+        if (blockFile.delete() || !blockFile.exists()) {
+          ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed);
+          if (metaFile.delete() || !metaFile.exists()) {
+            ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed);
+          }
+        }
+
+        // If deletion failed then the directory scanner will clean up the
+        // blocks eventually.
+      }
+    }
+
+    @Override
+    public void run() {
+      int numSuccessiveFailures = 0;
+
+      while (fsRunning && shouldRun) {
+        try {
+          numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
+          evictBlocks();
+
+          // Sleep if we have no more work to do or if it looks like we are not
+          // making any forward progress. This is to ensure that if all persist
+          // operations are failing we don't keep retrying them in a tight loop.
+          if (numSuccessiveFailures >= ramDiskReplicaTracker.numReplicasNotPersisted()) {
+            Thread.sleep(checkpointerInterval * 1000);
+            numSuccessiveFailures = 0;
+          }
+        } catch (InterruptedException e) {
+          LOG.info("LazyWriter was interrupted, exiting");
+          break;
+        } catch (Exception e) {
+          LOG.warn("Ignoring exception in LazyWriter:", e);
+        }
+      }
+    }
+
+    public void stop() {
+      shouldRun = false;
+    }
+  }
 }
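
The LazyWriter's transientFreeSpaceBelowThreshold combines two limits: a percentage of total RAM-disk capacity and an absolute reserve measured in block-sized units. A worked sketch with hypothetical numbers (these are examples, not the configured defaults):

public class LazyEvictionThresholdExample {
  public static void main(String[] args) {
    // Hypothetical numbers, for illustration only; not the shipped defaults.
    long capacity = 4L * 1024 * 1024 * 1024;         // 4 GiB RAM disk
    long free = 300L * 1024 * 1024;                  // 300 MiB currently free
    long estimateBlockSize = 128L * 1024 * 1024;     // assume dfs.blocksize = 128 MiB
    int lowWatermarkPercent = 10;                    // evict below 10% free...
    int lowWatermarkReplicas = 3;                    // ...or below 3 blocks' worth of space

    int percentFree = (int) (free * 100 / capacity); // 7%
    boolean evict = percentFree < lowWatermarkPercent
        || free < estimateBlockSize * lowWatermarkReplicas;
    System.out.println("percentFree=" + percentFree + ", evict=" + evict); // evict=true
  }
}

With these inputs both conditions trip, so evictBlocks would start moving already-persisted replicas off the RAM disk until free space recovers or it hits MAX_BLOCK_EVICTIONS_PER_ITERATION.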
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 63bc6a1..60fb71d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -77,7 +77,7 @@
    * dfs.datanode.fsdatasetcache.max.threads.per.volume) to limit resource
    * contention.
    */
-  private final ThreadPoolExecutor cacheExecutor;
+  protected ThreadPoolExecutor cacheExecutor;
   
   FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
       Configuration conf, StorageType storageType) throws IOException {
@@ -96,6 +96,10 @@
   }
 
   protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
+    if (storageType.isTransient()) {
+      return null;
+    }
+
     final int maxNumThreads = dataset.datanode.getConf().getInt(
         DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY,
         DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT);
@@ -202,6 +206,11 @@
   }
   
   @Override
+  public boolean isTransientStorage() {
+    return storageType.isTransient();
+  }
+
+  @Override
   public String getPath(String bpid) throws IOException {
     return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath();
   }
@@ -230,9 +239,6 @@
   @Override
   public void reserveSpaceForRbw(long bytesToReserve) {
     if (bytesToReserve != 0) {
-      if (FsDatasetImpl.LOG.isDebugEnabled()) {
-        FsDatasetImpl.LOG.debug("Reserving " + bytesToReserve + " on volume " + getBasePath());
-      }
       reservedForRbw.addAndGet(bytesToReserve);
     }
   }
@@ -240,9 +246,6 @@
   @Override
   public void releaseReservedSpace(long bytesToRelease) {
     if (bytesToRelease != 0) {
-      if (FsDatasetImpl.LOG.isDebugEnabled()) {
-        FsDatasetImpl.LOG.debug("Releasing " + bytesToRelease + " on volume " + getBasePath());
-      }
 
       long oldReservation, newReservation;
       do {
@@ -292,39 +295,29 @@
     }
   }
     
-  void getVolumeMap(ReplicaMap volumeMap) throws IOException {
+  void getVolumeMap(ReplicaMap volumeMap,
+                    final RamDiskReplicaTracker ramDiskReplicaMap)
+      throws IOException {
     for(BlockPoolSlice s : bpSlices.values()) {
-      s.getVolumeMap(volumeMap);
+      s.getVolumeMap(volumeMap, ramDiskReplicaMap);
     }
   }
   
-  void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException {
-    getBlockPoolSlice(bpid).getVolumeMap(volumeMap);
+  void getVolumeMap(String bpid, ReplicaMap volumeMap,
+                    final RamDiskReplicaTracker ramDiskReplicaMap)
+      throws IOException {
+    getBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap);
   }
   
-  /**
-   * Add replicas under the given directory to the volume map
-   * @param volumeMap the replicas map
-   * @param dir an input directory
-   * @param isFinalized true if the directory has finalized replicas;
-   *                    false if the directory has rbw replicas
-   * @throws IOException 
-   */
-  void addToReplicasMap(String bpid, ReplicaMap volumeMap, 
-      File dir, boolean isFinalized) throws IOException {
-    BlockPoolSlice bp = getBlockPoolSlice(bpid);
-    // TODO move this up
-    // dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
-    bp.addToReplicasMap(volumeMap, dir, isFinalized);
-  }
-
   @Override
   public String toString() {
     return currentDir.getAbsolutePath();
   }
 
   void shutdown() {
-    cacheExecutor.shutdown();
+    if (cacheExecutor != null) {
+      cacheExecutor.shutdown();
+    }
     Set<Entry<String, BlockPoolSlice>> set = bpSlices.entrySet();
     for (Entry<String, BlockPoolSlice> entry : set) {
       entry.getValue().shutdown();
@@ -373,6 +366,8 @@
     File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
     File finalizedDir = new File(bpCurrentDir,
         DataStorage.STORAGE_DIR_FINALIZED);
+    File lazypersistDir = new File(bpCurrentDir,
+        DataStorage.STORAGE_DIR_LAZY_PERSIST);
     File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
     if (force) {
       FileUtil.fullyDelete(bpDir);
@@ -384,6 +379,11 @@
           !FileUtil.fullyDelete(finalizedDir)) {
         throw new IOException("Failed to delete " + finalizedDir);
       }
+      if (lazypersistDir.exists() &&
+        ((!DatanodeUtil.dirNoFilesRecursive(lazypersistDir) ||
+          !FileUtil.fullyDelete(lazypersistDir)))) {
+        throw new IOException("Failed to delete " + lazypersistDir);
+      }
       FileUtil.fullyDelete(tmpDir);
       for (File f : FileUtil.listFiles(bpCurrentDir)) {
         if (!f.delete()) {
@@ -417,6 +417,5 @@
   DatanodeStorage toDatanodeStorage() {
     return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
   }
-
 }
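
isTransientStorage() simply reflects the volume's StorageType, so a volume becomes transient through datanode configuration rather than anything in this class. A hedged sketch of how such a volume might be declared, assuming the bracketed storage-type tag syntax used for heterogeneous storage is recognized on this branch; the mount points are made-up examples.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class RamDiskDataDirExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hypothetical mount points: the [RAM_DISK] tag marks the tmpfs directory
    // as transient storage, the [DISK] directory remains ordinary disk.
    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
        "[RAM_DISK]/mnt/dn-ramdisk/data,[DISK]/data/hdfs/dn");
  }
}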
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 90739c3..837ddf7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -68,7 +68,25 @@
     }
     return blockChooser.chooseVolume(list, blockSize);
   }
-    
+
+  /**
+   * Get the next transient (RAM disk) volume. Synchronized to ensure the
+   * choosing policy's round-robin state is updated by a single thread and
+   * the volume is chosen with no concurrent update to {@link #volumes}.
+   * @param blockSize free space needed on the volume
+   * @return next transient volume to store the block in.
+   */
+  synchronized FsVolumeImpl getNextTransientVolume(
+      long blockSize) throws IOException {
+    final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
+    for(FsVolumeImpl v : volumes) {
+      if (v.isTransientStorage()) {
+        list.add(v);
+      }
+    }
+    return blockChooser.chooseVolume(list, blockSize);
+  }
+
   long getDfsUsed() throws IOException {
     long dfsUsed = 0L;
     for (FsVolumeImpl v : volumes) {
@@ -101,7 +119,10 @@
     return remaining;
   }
   
-  void getAllVolumesMap(final String bpid, final ReplicaMap volumeMap) throws IOException {
+  void getAllVolumesMap(final String bpid,
+                        final ReplicaMap volumeMap,
+                        final RamDiskReplicaTracker ramDiskReplicaMap)
+      throws IOException {
     long totalStartTime = Time.monotonicNow();
     final List<IOException> exceptions = Collections.synchronizedList(
         new ArrayList<IOException>());
@@ -113,7 +134,7 @@
             FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
                 bpid + " on volume " + v + "...");
             long startTime = Time.monotonicNow();
-            v.getVolumeMap(bpid, volumeMap);
+            v.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
             long timeTaken = Time.monotonicNow() - startTime;
             FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
                 + " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
@@ -142,17 +163,6 @@
         + totalTimeTaken + "ms");
   }
 
-  void getVolumeMap(String bpid, FsVolumeImpl volume, ReplicaMap volumeMap)
-      throws IOException {
-    FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + bpid +
-                               " on volume " + volume + "...");
-    long startTime = Time.monotonicNow();
-    volume.getVolumeMap(bpid, volumeMap);
-    long timeTaken = Time.monotonicNow() - startTime;
-    FsDatasetImpl.LOG.info("Time to add replicas to map for block pool " + bpid +
-                               " on volume " + volume + ": " + timeTaken + "ms");
-  }
-    
   /**
    * Calls {@link FsVolumeImpl#checkDirs()} on each volume, removing any
    * volumes from the active list that result in a DiskErrorException.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
new file mode 100644
index 0000000..a843d9a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+
+import com.google.common.collect.TreeMultimap;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.Time;
+
+import java.io.File;
+import java.util.*;
+
+/**
+ * An implementation of RamDiskReplicaTracker that uses an LRU
+ * eviction scheme.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
+
+  private class RamDiskReplicaLru extends RamDiskReplica {
+    long lastUsedTime;
+
+    private RamDiskReplicaLru(String bpid, long blockId, FsVolumeImpl ramDiskVolume) {
+      super(bpid, blockId, ramDiskVolume);
+    }
+
+    @Override
+    public int hashCode() {
+      return super.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return super.equals(other);
+    }
+  }
+
+  /**
+   * Map of block pool ID to a map of block ID to RamDiskReplicaLru.
+   */
+  Map<String, Map<Long, RamDiskReplicaLru>> replicaMaps;
+
+  /**
+   * Queue of replicas that need to be written to disk.
+   * Stale entries are GC'd by dequeueNextReplicaToPersist.
+   */
+  Queue<RamDiskReplicaLru> replicasNotPersisted;
+
+  /**
+   * Map of persisted replicas ordered by their last use times.
+   */
+  TreeMultimap<Long, RamDiskReplicaLru> replicasPersisted;
+
+  RamDiskReplicaLruTracker() {
+    replicaMaps = new HashMap<String, Map<Long, RamDiskReplicaLru>>();
+    replicasNotPersisted = new LinkedList<RamDiskReplicaLru>();
+    replicasPersisted = TreeMultimap.create();
+  }
+
+  @Override
+  synchronized void addReplica(final String bpid, final long blockId,
+                               final FsVolumeImpl transientVolume) {
+    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+    if (map == null) {
+      map = new HashMap<Long, RamDiskReplicaLru>();
+      replicaMaps.put(bpid, map);
+    }
+    RamDiskReplicaLru ramDiskReplicaLru = new RamDiskReplicaLru(bpid, blockId, transientVolume);
+    map.put(blockId, ramDiskReplicaLru);
+    replicasNotPersisted.add(ramDiskReplicaLru);
+  }
+
+  @Override
+  synchronized void touch(final String bpid,
+                          final long blockId) {
+    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+    RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
+
+    if (ramDiskReplicaLru == null) {
+      return;
+    }
+
+    ramDiskReplicaLru.numReads.getAndIncrement();
+
+    // Reinsert the replica with its new timestamp.
+    if (replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru)) {
+      ramDiskReplicaLru.lastUsedTime = Time.monotonicNow();
+      replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
+    }
+  }
+
+  @Override
+  synchronized void recordStartLazyPersist(
+      final String bpid, final long blockId, FsVolumeImpl checkpointVolume) {
+    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+    RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
+    ramDiskReplicaLru.setLazyPersistVolume(checkpointVolume);
+  }
+
+  @Override
+  synchronized void recordEndLazyPersist(
+      final String bpid, final long blockId, final File[] savedFiles) {
+    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+    RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
+
+    if (ramDiskReplicaLru == null) {
+      throw new IllegalStateException("Unknown replica bpid=" +
+          bpid + "; blockId=" + blockId);
+    }
+    ramDiskReplicaLru.recordSavedBlockFiles(savedFiles);
+
+    if (replicasNotPersisted.peek() == ramDiskReplicaLru) {
+      // Common case.
+      replicasNotPersisted.remove();
+    } else {
+      // Caller error? Fall back to O(n) removal.
+      replicasNotPersisted.remove(ramDiskReplicaLru);
+    }
+
+    ramDiskReplicaLru.lastUsedTime = Time.monotonicNow();
+    replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
+    ramDiskReplicaLru.isPersisted = true;
+  }
+
+  @Override
+  synchronized RamDiskReplicaLru dequeueNextReplicaToPersist() {
+    while (replicasNotPersisted.size() != 0) {
+      RamDiskReplicaLru ramDiskReplicaLru = replicasNotPersisted.remove();
+      Map<Long, RamDiskReplicaLru> replicaMap =
+          replicaMaps.get(ramDiskReplicaLru.getBlockPoolId());
+
+      if (replicaMap != null && replicaMap.get(ramDiskReplicaLru.getBlockId()) != null) {
+        return ramDiskReplicaLru;
+      }
+
+      // The replica no longer exists, look for the next one.
+    }
+    return null;
+  }
+
+  @Override
+  synchronized void reenqueueReplicaNotPersisted(final RamDiskReplica ramDiskReplicaLru) {
+    replicasNotPersisted.add((RamDiskReplicaLru) ramDiskReplicaLru);
+  }
+
+  @Override
+  synchronized int numReplicasNotPersisted() {
+    return replicasNotPersisted.size();
+  }
+
+  @Override
+  synchronized RamDiskReplicaLru getNextCandidateForEviction() {
+    Iterator<RamDiskReplicaLru> it = replicasPersisted.values().iterator();
+    while (it.hasNext()) {
+      RamDiskReplicaLru ramDiskReplicaLru = it.next();
+      it.remove();
+
+      Map<Long, RamDiskReplicaLru> replicaMap =
+          replicaMaps.get(ramDiskReplicaLru.getBlockPoolId());
+
+      if (replicaMap != null && replicaMap.get(ramDiskReplicaLru.getBlockId()) != null) {
+        return ramDiskReplicaLru;
+      }
+
+      // The replica no longer exists, look for the next one.
+    }
+    return null;
+  }
+
+  /**
+   * Discard any state we are tracking for the given replica. This could mean
+   * that the block has been deleted or that the replica is no longer backed
+   * by transient storage.
+   *
+   * @param deleteSavedCopies true if we should delete the saved copies on
+   *                          persistent storage. This should be set by the
+   *                          caller when the block is no longer needed.
+   */
+  @Override
+  synchronized void discardReplica(
+      final String bpid, final long blockId,
+      boolean deleteSavedCopies) {
+    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+
+    if (map == null) {
+      return;
+    }
+
+    RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
+
+    if (ramDiskReplicaLru == null) {
+      return;
+    }
+
+    if (deleteSavedCopies) {
+      ramDiskReplicaLru.deleteSavedFiles();
+    }
+
+    map.remove(blockId);
+    replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
+
+    // replicasNotPersisted will be lazily GC'ed.
+  }
+
+  @Override
+  synchronized RamDiskReplica getReplica(
+    final String bpid, final long blockId) {
+    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+
+    if (map == null) {
+      return null;
+    }
+
+    return map.get(blockId);
+  }
+}
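
Note on the bookkeeping above: judging from the remove(key, value) / put(key, value) calls and the oldest-first iteration in getNextCandidateForEviction(), replicasPersisted behaves like a multimap keyed by last-used timestamp. The standalone sketch below, not part of this patch, illustrates that eviction idea assuming Guava's TreeMultimap and using plain block IDs in place of RamDiskReplicaLru objects.

import com.google.common.collect.TreeMultimap;
import java.util.Iterator;

/** Oldest-timestamp-first eviction, mirroring the replicasPersisted usage above. */
public class LruByTimestampSketch {
  // Keys are monotonic last-used timestamps; values are block IDs standing in
  // for RamDiskReplicaLru objects.
  private final TreeMultimap<Long, Long> persisted = TreeMultimap.create();

  void recordUse(long blockId, long oldTimestamp, long newTimestamp) {
    // Reinsert under the new timestamp, as touch()/recordEndLazyPersist() do.
    if (persisted.remove(oldTimestamp, blockId)) {
      persisted.put(newTimestamp, blockId);
    }
  }

  Long evictOldest() {
    // values() iterates in ascending key (timestamp) order for a TreeMultimap.
    Iterator<Long> it = persisted.values().iterator();
    if (it.hasNext()) {
      Long blockId = it.next();
      it.remove();
      return blockId;
    }
    return null;
  }
}
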
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
new file mode 100644
index 0000000..7507925
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicLong;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class RamDiskReplicaTracker {
+  static final Log LOG = LogFactory.getLog(RamDiskReplicaTracker.class);
+
+  FsDatasetImpl fsDataset;
+
+  static class RamDiskReplica implements Comparable<RamDiskReplica> {
+    private final String bpid;
+    private final long blockId;
+    private File savedBlockFile;
+    private File savedMetaFile;
+
+    private long creationTime;
+    protected AtomicLong numReads = new AtomicLong(0);
+    protected boolean isPersisted;
+
+    /**
+     * RAM_DISK volume that holds the original replica.
+     */
+    final FsVolumeSpi ramDiskVolume;
+
+    /**
+     * Persistent volume that holds or will hold the saved replica.
+     */
+    FsVolumeImpl lazyPersistVolume;
+
+    RamDiskReplica(final String bpid, final long blockId,
+                   final FsVolumeImpl ramDiskVolume) {
+      this.bpid = bpid;
+      this.blockId = blockId;
+      this.ramDiskVolume = ramDiskVolume;
+      lazyPersistVolume = null;
+      savedMetaFile = null;
+      savedBlockFile = null;
+      creationTime = Time.monotonicNow();
+      isPersisted = false;
+    }
+
+    long getBlockId() {
+      return blockId;
+    }
+
+    String getBlockPoolId() {
+      return bpid;
+    }
+
+    FsVolumeImpl getLazyPersistVolume() {
+      return lazyPersistVolume;
+    }
+
+    void setLazyPersistVolume(FsVolumeImpl volume) {
+      Preconditions.checkState(!volume.isTransientStorage());
+      this.lazyPersistVolume = volume;
+    }
+
+    File getSavedBlockFile() {
+      return savedBlockFile;
+    }
+
+    File getSavedMetaFile() {
+      return savedMetaFile;
+    }
+
+    long getNumReads() { return numReads.get(); }
+
+    long getCreationTime() { return creationTime; }
+
+    boolean getIsPersisted() { return isPersisted; }
+
+    /**
+     * Record the saved meta and block files on the given volume.
+     *
+     * @param files Meta and block files, in that order.
+     */
+    void recordSavedBlockFiles(File[] files) {
+      this.savedMetaFile = files[0];
+      this.savedBlockFile = files[1];
+    }
+
+    @Override
+    public int hashCode() {
+      return bpid.hashCode() ^ (int) blockId;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) {
+        return true;
+      }
+
+      if (other == null || getClass() != other.getClass()) {
+        return false;
+      }
+
+      RamDiskReplica otherState = (RamDiskReplica) other;
+      return (otherState.bpid.equals(bpid) && otherState.blockId == blockId);
+    }
+
+    // Delete the saved meta and block files. Failure to delete can be
+    // ignored; the directory scanner will retry the deletion later.
+    void deleteSavedFiles() {
+      if (savedBlockFile != null) {
+        if (!savedBlockFile.delete()) {
+          LOG.warn("Failed to delete block file " + savedBlockFile);
+        }
+        savedBlockFile = null;
+      }
+
+      if (savedMetaFile != null) {
+        if (!savedMetaFile.delete()) {
+          LOG.warn("Failed to delete meta file " + savedMetaFile);
+        }
+        savedMetaFile = null;
+      }
+    }
+
+    @Override
+    public int compareTo(RamDiskReplica other) {
+      int bpidResult = bpid.compareTo(other.bpid);
+      if (bpidResult != 0) {
+        return bpidResult;
+      } else if (blockId == other.blockId) {
+        return 0;
+      } else if (blockId < other.blockId) {
+        return -1;
+      } else {
+        return 1;
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "[BlockPoolID=" + bpid + "; BlockId=" + blockId + "]";
+    }
+  }
+
+  /**
+   * Get an instance of the configured RamDiskReplicaTracker based on
+   * the configuration property
+   * {@link org.apache.hadoop.hdfs.DFSConfigKeys#DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY}.
+   *
+   * @param conf the configuration to be used
+   * @param fsDataset the FsDatasetImpl object.
+   * @return an instance of RamDiskReplicaTracker
+   */
+  static RamDiskReplicaTracker getInstance(final Configuration conf,
+                                           final FsDatasetImpl fsDataset) {
+    final Class<? extends RamDiskReplicaTracker> trackerClass = conf.getClass(
+        DFSConfigKeys.DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY,
+        DFSConfigKeys.DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT,
+        RamDiskReplicaTracker.class);
+    final RamDiskReplicaTracker tracker = ReflectionUtils.newInstance(
+        trackerClass, conf);
+    tracker.initialize(fsDataset);
+    return tracker;
+  }
+
+  void initialize(final FsDatasetImpl fsDataset) {
+    this.fsDataset = fsDataset;
+  }
+
+  /**
+   * Start tracking a new finalized replica on RAM disk.
+   *
+   * @param transientVolume RAM disk volume that stores the replica.
+   */
+  abstract void addReplica(final String bpid, final long blockId,
+                           final FsVolumeImpl transientVolume);
+
+  /**
+   * Invoked when a replica is opened by a client. This may be used as
+   * a heuristic by the eviction scheme.
+   */
+  abstract void touch(final String bpid, final long blockId);
+
+  /**
+   * Get the next replica to write to persistent storage.
+   */
+  abstract RamDiskReplica dequeueNextReplicaToPersist();
+
+  /**
+   * Invoked if a replica that was previously dequeued for persistence
+   * could not be successfully persisted. Add it back so it can be retried
+   * later.
+   */
+  abstract void reenqueueReplicaNotPersisted(
+      final RamDiskReplica ramDiskReplica);
+
+  /**
+   * Invoked when the Lazy persist operation is started by the DataNode.
+   * @param checkpointVolume the persistent volume chosen to hold the saved replica.
+   */
+  abstract void recordStartLazyPersist(
+      final String bpid, final long blockId, FsVolumeImpl checkpointVolume);
+
+  /**
+   * Invoked when the Lazy persist operation is complete.
+   *
+   * @param savedFiles The saved meta and block files, in that order.
+   */
+  abstract void recordEndLazyPersist(
+      final String bpid, final long blockId, final File[] savedFiles);
+
+  /**
+   * Return a candidate replica to remove from RAM Disk. The exact replica
+   * to be returned may depend on the eviction scheme utilized.
+   *
+   * @return the next candidate replica for eviction, or null if none is available.
+   */
+  abstract RamDiskReplica getNextCandidateForEviction();
+
+  /**
+   * Return the number of replicas pending persistence to disk.
+   */
+  abstract int numReplicasNotPersisted();
+
+  /**
+   * Discard all state we are tracking for the given replica.
+   */
+  abstract void discardReplica(
+      final String bpid, final long blockId,
+      boolean deleteSavedCopies);
+
+  /**
+   * Return the RamDiskReplica for the given block pool id and block id,
+   * or null if it is not being tracked on RAM disk.
+   */
+  abstract RamDiskReplica getReplica(
+    final String bpid, final long blockId);
+}
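
The abstract API above is intended to be driven from the DataNode's lazy-writer path. The fragment below is a hedged sketch of one iteration of such a loop; chooseNonTransientVolume() and saveReplicaToVolume() are hypothetical helpers standing in for FsDatasetImpl logic, and error handling is reduced to the re-queue step.

// Sketch only: assumes this code lives in the fsdataset.impl package so the
// package-private tracker API is visible.
void persistNextReplica(RamDiskReplicaTracker tracker) {
  RamDiskReplicaTracker.RamDiskReplica replica = tracker.dequeueNextReplicaToPersist();
  if (replica == null) {
    return;                                       // nothing pending persistence
  }
  try {
    FsVolumeImpl target = chooseNonTransientVolume();          // hypothetical helper
    tracker.recordStartLazyPersist(
        replica.getBlockPoolId(), replica.getBlockId(), target);
    File[] savedFiles = saveReplicaToVolume(replica, target);  // hypothetical: {meta, block}
    tracker.recordEndLazyPersist(
        replica.getBlockPoolId(), replica.getBlockId(), savedFiles);
  } catch (IOException e) {
    // The write failed; put the replica back so it will be retried later.
    tracker.reenqueueReplicaNotPersisted(replica);
  }
}
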
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index b536e7e..57f12db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -65,6 +65,26 @@
   @Metric MutableCounterLong writesFromRemoteClient;
   @Metric MutableCounterLong blocksGetLocalPathInfo;
 
+  // RamDisk metrics on read/write
+  @Metric MutableCounterLong ramDiskBlocksWrite;
+  @Metric MutableCounterLong ramDiskBlocksWriteFallback;
+  @Metric MutableCounterLong ramDiskBytesWrite;
+  @Metric MutableCounterLong ramDiskBlocksReadHits;
+
+  // RamDisk metrics on eviction
+  @Metric MutableCounterLong ramDiskBlocksEvicted;
+  @Metric MutableCounterLong ramDiskBlocksEvictedWithoutRead;
+  @Metric MutableRate        ramDiskBlocksEvictionWindowMs;
+  final MutableQuantiles[]   ramDiskBlocksEvictionWindowMsQuantiles;
+
+
+  // RamDisk metrics on lazy persist
+  @Metric MutableCounterLong ramDiskBlocksLazyPersisted;
+  @Metric MutableCounterLong ramDiskBlocksDeletedBeforeLazyPersisted;
+  @Metric MutableCounterLong ramDiskBytesLazyPersisted;
+  @Metric MutableRate        ramDiskBlocksLazyPersistWindowMs;
+  final MutableQuantiles[]   ramDiskBlocksLazyPersistWindowMsQuantiles;
+
   @Metric MutableCounterLong fsyncCount;
   
   @Metric MutableCounterLong volumeFailures;
@@ -107,6 +127,8 @@
     fsyncNanosQuantiles = new MutableQuantiles[len];
     sendDataPacketBlockedOnNetworkNanosQuantiles = new MutableQuantiles[len];
     sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
+    ramDiskBlocksEvictionWindowMsQuantiles = new MutableQuantiles[len];
+    ramDiskBlocksLazyPersistWindowMsQuantiles = new MutableQuantiles[len];
     
     for (int i = 0; i < len; i++) {
       int interval = intervals[i];
@@ -127,6 +149,14 @@
           "sendDataPacketTransferNanos" + interval + "s", 
           "Time reading from disk and writing to network while sending " +
           "a packet in ns", "ops", "latency", interval);
+      ramDiskBlocksEvictionWindowMsQuantiles[i] = registry.newQuantiles(
+          "ramDiskBlocksEvictionWindows" + interval + "s",
+          "Time between the RamDisk block write and eviction in ms",
+          "ops", "latency", interval);
+      ramDiskBlocksLazyPersistWindowMsQuantiles[i] = registry.newQuantiles(
+          "ramDiskBlocksLazyPersistWindows" + interval + "s",
+          "Time between the RamDisk block write and disk persist in ms",
+          "ops", "latency", interval);
     }
   }
 
@@ -284,4 +314,54 @@
       q.add(latencyNanos);
     }
   }
+
+  public void incrRamDiskBlocksWrite() {
+    ramDiskBlocksWrite.incr();
+  }
+
+  public void incrRamDiskBlocksWriteFallback() {
+    ramDiskBlocksWriteFallback.incr();
+  }
+
+  public void addRamDiskBytesWrite(long bytes) {
+    ramDiskBytesWrite.incr(bytes);
+  }
+
+  public void incrRamDiskBlocksReadHits() {
+    ramDiskBlocksReadHits.incr();
+  }
+
+  public void incrRamDiskBlocksEvicted() {
+    ramDiskBlocksEvicted.incr();
+  }
+
+  public void incrRamDiskBlocksEvictedWithoutRead() {
+    ramDiskBlocksEvictedWithoutRead.incr();
+  }
+
+  public void addRamDiskBlocksEvictionWindowMs(long latencyMs) {
+    ramDiskBlocksEvictionWindowMs.add(latencyMs);
+    for (MutableQuantiles q : ramDiskBlocksEvictionWindowMsQuantiles) {
+      q.add(latencyMs);
+    }
+  }
+
+  public void incrRamDiskBlocksLazyPersisted() {
+    ramDiskBlocksLazyPersisted.incr();
+  }
+
+  public void incrRamDiskBlocksDeletedBeforeLazyPersisted() {
+    ramDiskBlocksDeletedBeforeLazyPersisted.incr();
+  }
+
+  public void incrRamDiskBytesLazyPersisted(long bytes) {
+    ramDiskBytesLazyPersisted.incr(bytes);
+  }
+
+  public void addRamDiskBlocksLazyPersistWindowMs(long latencyMs) {
+    ramDiskBlocksLazyPersistWindowMs.add(latencyMs);
+    for (MutableQuantiles q : ramDiskBlocksLazyPersistWindowMsQuantiles) {
+      q.add(latencyMs);
+    }
+  }
 }
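
The eviction counters and the eviction-window rate/quantiles above are meant to be updated together when a replica is evicted from RAM disk. A hedged fragment, assuming metrics is this DataNode's DataNodeMetrics instance and replica is the tracked RamDiskReplica selected for eviction:

long evictionWindowMs = Time.monotonicNow() - replica.getCreationTime();
metrics.incrRamDiskBlocksEvicted();
if (replica.getNumReads() == 0) {
  // The block was written and evicted without ever being read back.
  metrics.incrRamDiskBlocksEvictedWithoutRead();
}
metrics.addRamDiskBlocksEvictionWindowMs(evictionWindowMs);
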
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 04133bd..59814af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -69,7 +69,7 @@
         = new EnumMap<StorageType, List<StorageGroup>>(StorageType.class);
     
     private StorageMap() {
-      for(StorageType t : StorageType.asList()) {
+      for(StorageType t : StorageType.getMovableTypes()) {
         targetStorageTypeMap.put(t, new LinkedList<StorageGroup>());
       }
     }
@@ -130,7 +130,7 @@
     final List<DatanodeStorageReport> reports = dispatcher.init();
     for(DatanodeStorageReport r : reports) {
       final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
-      for(StorageType t : StorageType.asList()) {
+      for(StorageType t : StorageType.getMovableTypes()) {
         final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher);
         final long maxRemaining = getMaxRemaining(r, t);
         final StorageGroup target = maxRemaining > 0L ? dn.addTarget(t,
@@ -348,7 +348,7 @@
         LocatedBlock lb = lbs.get(i);
         final StorageTypeDiff diff = new StorageTypeDiff(types,
             lb.getStorageTypes());
-        if (!diff.removeOverlap()) {
+        if (!diff.removeOverlap(true)) {
           if (scheduleMoves4Block(diff, lb)) {
             hasRemaining |= (diff.existing.size() > 1 &&
                 diff.expected.size() > 1);
@@ -452,22 +452,38 @@
       this.expected = new LinkedList<StorageType>(expected);
       this.existing = new LinkedList<StorageType>(Arrays.asList(existing));
     }
-    
+
     /**
      * Remove the overlap between the expected types and the existing types.
-     * @return if the existing types or the expected types is empty after
+     * @param  ignoreNonMovable ignore non-movable storage types
+     *         by removing them from both the expected and existing storage type
+     *         lists to prevent non-movable storage from being moved.
+     * @return whether the existing types or the expected types is empty after
      *         removing the overlap.
      */
-    boolean removeOverlap() { 
+    boolean removeOverlap(boolean ignoreNonMovable) {
       for(Iterator<StorageType> i = existing.iterator(); i.hasNext(); ) {
         final StorageType t = i.next();
         if (expected.remove(t)) {
           i.remove();
         }
       }
+      if (ignoreNonMovable) {
+        removeNonMovable(existing);
+        removeNonMovable(expected);
+      }
       return expected.isEmpty() || existing.isEmpty();
     }
-    
+
+    void removeNonMovable(List<StorageType> types) {
+      for (Iterator<StorageType> i = types.iterator(); i.hasNext(); ) {
+        final StorageType t = i.next();
+        if (!t.isMovable()) {
+          i.remove();
+        }
+      }
+    }
+
     @Override
     public String toString() {
       return getClass().getSimpleName() + "{expected=" + expected
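
The net effect of removeOverlap(true) is easiest to see on a concrete block. The self-contained sketch below reproduces the same list manipulation with a stand-in enum and treats RAM_DISK as the only non-movable type, an assumption suggested by the switch to StorageType.getMovableTypes(); it is illustrative only.

import java.util.*;

public class RemoveOverlapSketch {
  enum T { RAM_DISK, DISK, SSD, ARCHIVE }

  static boolean removeOverlap(List<T> expected, List<T> existing) {
    for (Iterator<T> i = existing.iterator(); i.hasNext(); ) {
      if (expected.remove(i.next())) {
        i.remove();                              // overlapping type is already satisfied
      }
    }
    expected.removeIf(t -> t == T.RAM_DISK);     // drop non-movable types from both lists
    existing.removeIf(t -> t == T.RAM_DISK);
    return expected.isEmpty() || existing.isEmpty();
  }

  public static void main(String[] args) {
    List<T> expected = new LinkedList<>(Arrays.asList(T.DISK, T.ARCHIVE));
    List<T> existing = new LinkedList<>(Arrays.asList(T.RAM_DISK, T.DISK));
    // DISK cancels out and RAM_DISK is filtered, so existing ends up empty and
    // the Mover schedules no migration for the transient replica.
    System.out.println(removeOverlap(expected, existing));  // prints true
  }
}
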
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index aab2183..0d999e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -60,6 +60,7 @@
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
@@ -1036,6 +1037,20 @@
     }
     final int snapshotId = iip.getLatestSnapshotId();
     if (inode.isFile()) {
+      BlockStoragePolicy newPolicy = getBlockManager().getStoragePolicy(policyId);
+      if (newPolicy.isCopyOnCreateFile()) {
+        throw new HadoopIllegalArgumentException(
+            "Policy " + newPolicy + " cannot be set after file creation.");
+      }
+
+      BlockStoragePolicy currentPolicy =
+          getBlockManager().getStoragePolicy(inode.getLocalStoragePolicyID());
+
+      if (currentPolicy != null && currentPolicy.isCopyOnCreateFile()) {
+        throw new HadoopIllegalArgumentException(
+            "Existing policy " + currentPolicy.getName() +
+                " cannot be changed after file creation.");
+      }
       inode.asFile().setStoragePolicyID(policyId, snapshotId);
     } else if (inode.isDirectory()) {
       setDirStoragePolicy(inode.asDirectory(), policyId, snapshotId);  
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index 98ea7d6..c9eb9fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -415,6 +415,7 @@
     
     private AddCloseOp(FSEditLogOpCodes opCode) {
       super(opCode);
+      storagePolicyId = BlockStoragePolicySuite.ID_UNSPECIFIED;
       assert(opCode == OP_ADD || opCode == OP_CLOSE);
     }
     
@@ -709,6 +710,7 @@
       this.mtime = Long.parseLong(st.getValue("MTIME"));
       this.atime = Long.parseLong(st.getValue("ATIME"));
       this.blockSize = Long.parseLong(st.getValue("BLOCKSIZE"));
+
       this.clientName = st.getValue("CLIENT_NAME");
       this.clientMachine = st.getValue("CLIENT_MACHINE");
       this.overwrite = Boolean.parseBoolean(st.getValueOrNull("OVERWRITE"));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index af3cf2c..b1e7dfe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -783,8 +783,9 @@
       if (counter != null) {
         counter.increment();
       }
+
       final INodeFile file = new INodeFile(inodeId, localName, permissions,
-          modificationTime, atime, blocks, replication, blockSize, (byte)0);
+          modificationTime, atime, blocks, replication, blockSize);
       if (underConstruction) {
         file.toUnderConstruction(clientName, clientMachine);
       }
@@ -885,7 +886,7 @@
       final long preferredBlockSize = in.readLong();
 
       return new INodeFileAttributes.SnapshotCopy(name, permissions, null, modificationTime,
-          accessTime, replication, preferredBlockSize, (byte)0, null);
+          accessTime, replication, preferredBlockSize, (byte) 0, null);
     }
 
     public INodeDirectoryAttributes loadINodeDirectoryAttributes(DataInput in)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
index d3472fc..16636a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
@@ -148,14 +148,16 @@
     int numLocs = in.readInt();
     assert numLocs == 0 : "Unexpected block locations";
 
+    // Images in the pre-protobuf format do not carry the lazyPersist flag,
+    // so use the constructor that leaves the storage policy unspecified.
     INodeFile file = new INodeFile(inodeId, name, perm, modificationTime,
-        modificationTime, blocks, blockReplication, preferredBlockSize, (byte)0);
+        modificationTime, blocks, blockReplication, preferredBlockSize);
     file.toUnderConstruction(clientName, clientMachine);
     return file;
   }
 
   // Helper function that writes an INodeUnderConstruction
-  // into the input stream
+  // into the output stream
   //
   static void writeINodeUnderConstruction(DataOutputStream out, INodeFile cons,
       String path) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index a3d8dc7..baea1a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -60,6 +60,8 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
@@ -360,7 +362,8 @@
       Path symlink = stat.isSymlink() ? new Path(stat.getSymlink()) : null;
       Path path = dst != null ? new Path(dst) : new Path(src);
       status = new FileStatus(stat.getLen(), stat.isDir(),
-          stat.getReplication(), stat.getBlockSize(), stat.getModificationTime(),
+          stat.getReplication(), stat.getBlockSize(),
+          stat.getModificationTime(),
           stat.getAccessTime(), stat.getPermission(), stat.getOwner(),
           stat.getGroup(), symlink, path);
     }
@@ -445,6 +448,10 @@
   Daemon nnrmthread = null; // NamenodeResourceMonitor thread
 
   Daemon nnEditLogRoller = null; // NameNodeEditLogRoller thread
+
+  // A daemon to periodically clean up corrupt lazyPersist files
+  // from the name space.
+  Daemon lazyPersistFileScrubber = null;
   /**
    * When an active namenode will roll its own edit log, in # edits
    */
@@ -454,6 +461,12 @@
    */
   private final int editLogRollerInterval;
 
+  /**
+   * How frequently, in seconds, we scan for and unlink corrupt
+   * lazyPersist files.
+   */
+  private final int lazyPersistFileScrubIntervalSec;
+
   private volatile boolean hasResourcesAvailable = false;
   private volatile boolean fsRunning = true;
   
@@ -863,6 +876,15 @@
           DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT);
       this.inodeId = new INodeId();
       
+      this.lazyPersistFileScrubIntervalSec = conf.getInt(
+          DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
+          DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT);
+
+      if (this.lazyPersistFileScrubIntervalSec == 0) {
+        throw new IllegalArgumentException(
+            DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC + " must be non-zero.");
+      }
+
       // For testing purposes, allow the DT secret manager to be started regardless
       // of whether security is enabled.
       alwaysUseDelegationTokensForTests = conf.getBoolean(
@@ -932,7 +954,7 @@
   @VisibleForTesting
   static RetryCache initRetryCache(Configuration conf) {
     boolean enable = conf.getBoolean(DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY,
-        DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT);
+                                     DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT);
     LOG.info("Retry cache on namenode is " + (enable ? "enabled" : "disabled"));
     if (enable) {
       float heapPercent = conf.getFloat(
@@ -1161,6 +1183,12 @@
           editLogRollerThreshold, editLogRollerInterval));
       nnEditLogRoller.start();
 
+      if (lazyPersistFileScrubIntervalSec > 0) {
+        lazyPersistFileScrubber = new Daemon(new LazyPersistFileScrubber(
+            lazyPersistFileScrubIntervalSec));
+        lazyPersistFileScrubber.start();
+      }
+
       cacheManager.startMonitorThread();
       blockManager.getDatanodeManager().setShouldSendCachingCommands(true);
     } finally {
@@ -1214,6 +1242,10 @@
         ((NameNodeEditLogRoller)nnEditLogRoller.getRunnable()).stop();
         nnEditLogRoller.interrupt();
       }
+      if (lazyPersistFileScrubber != null) {
+        ((LazyPersistFileScrubber) lazyPersistFileScrubber.getRunnable()).stop();
+        lazyPersistFileScrubber.interrupt();
+      }
       if (dir != null && getFSImage() != null) {
         if (getFSImage().editLog != null) {
           getFSImage().editLog.close();
@@ -2517,6 +2549,7 @@
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     boolean create = flag.contains(CreateFlag.CREATE);
     boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+    boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
 
     waitForLoadingFSImage();
 
@@ -2576,7 +2609,7 @@
       src = resolvePath(src, pathComponents);
       toRemoveBlocks = startFileInternal(pc, src, permissions, holder, 
           clientMachine, create, overwrite, createParent, replication, 
-          blockSize, suite, protocolVersion, edek, logRetryCache);
+          blockSize, isLazyPersist, suite, protocolVersion, edek, logRetryCache);
       stat = dir.getFileInfo(src, false,
           FSDirectory.isReservedRawName(srcArg), true);
     } catch (StandbyException se) {
@@ -2612,7 +2645,7 @@
       String src, PermissionStatus permissions, String holder, 
       String clientMachine, boolean create, boolean overwrite, 
       boolean createParent, short replication, long blockSize, 
-      CipherSuite suite, CryptoProtocolVersion version,
+      boolean isLazyPersist, CipherSuite suite, CryptoProtocolVersion version,
       EncryptedKeyVersion edek, boolean logRetryEntry)
       throws FileAlreadyExistsException, AccessControlException,
       UnresolvedLinkException, FileNotFoundException,
@@ -2693,7 +2726,7 @@
       if (parent != null && mkdirsRecursively(parent.toString(),
               permissions, true, now())) {
         newNode = dir.addFile(src, permissions, replication, blockSize,
-                holder, clientMachine);
+                              holder, clientMachine);
       }
 
       if (newNode == null) {
@@ -2708,6 +2741,8 @@
         newNode = dir.getInode(newNode.getId()).asFile();
       }
 
+      setNewINodeStoragePolicy(newNode, iip, isLazyPersist);
+
       // record file record in log, record new generation stamp
       getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
       if (NameNode.stateChangeLog.isDebugEnabled()) {
@@ -2722,6 +2757,37 @@
     }
   }
 
+  private void setNewINodeStoragePolicy(INodeFile inode,
+                                        INodesInPath iip,
+                                        boolean isLazyPersist)
+      throws IOException {
+
+    if (isLazyPersist) {
+      BlockStoragePolicy lpPolicy =
+          blockManager.getStoragePolicy("LAZY_PERSIST");
+
+      // Set LAZY_PERSIST storage policy if the flag was passed to
+      // CreateFile.
+      if (lpPolicy == null) {
+        throw new HadoopIllegalArgumentException(
+            "The LAZY_PERSIST storage policy has been disabled " +
+            "by the administrator.");
+      }
+      inode.setStoragePolicyID(lpPolicy.getId(),
+                                 iip.getLatestSnapshotId());
+    } else {
+      BlockStoragePolicy effectivePolicy =
+          blockManager.getStoragePolicy(inode.getStoragePolicyID());
+
+      if (effectivePolicy != null &&
+          effectivePolicy.isCopyOnCreateFile()) {
+        // Copy effective policy from ancestor directory to current file.
+        inode.setStoragePolicyID(effectivePolicy.getId(),
+                                 iip.getLatestSnapshotId());
+      }
+    }
+  }
+
   /**
    * Append to an existing file for append.
    * <p>
@@ -2761,6 +2827,14 @@
           + src + " for client " + clientMachine);
       }
       INodeFile myFile = INodeFile.valueOf(inode, src, true);
+      final BlockStoragePolicy lpPolicy =
+          blockManager.getStoragePolicy("LAZY_PERSIST");
+
+      if (lpPolicy != null &&
+          lpPolicy.getId() == myFile.getStoragePolicyID()) {
+        throw new UnsupportedOperationException(
+            "Cannot append to lazy persist file " + src);
+      }
       // Opening an existing file for write - may need to recover lease.
       recoverLeaseInternal(myFile, src, holder, clientMachine, false);
       
@@ -5088,6 +5162,73 @@
     }
   }
 
+  /**
+   * Daemon to periodically scan the namespace for lazyPersist files
+   * with missing blocks and unlink them.
+   */
+  class LazyPersistFileScrubber implements Runnable {
+    private volatile boolean shouldRun = true;
+    final int scrubIntervalSec;
+    public LazyPersistFileScrubber(final int scrubIntervalSec) {
+      this.scrubIntervalSec = scrubIntervalSec;
+    }
+
+    /**
+     * Periodically go over the list of lazyPersist files with missing
+     * blocks and unlink them from the namespace.
+     */
+    private void clearCorruptLazyPersistFiles()
+        throws SafeModeException, AccessControlException,
+        UnresolvedLinkException, IOException {
+
+      BlockStoragePolicy lpPolicy = blockManager.getStoragePolicy("LAZY_PERSIST");
+
+      List<BlockCollection> filesToDelete = new ArrayList<BlockCollection>();
+
+      writeLock();
+
+      try {
+        final Iterator<Block> it = blockManager.getCorruptReplicaBlockIterator();
+
+        while (it.hasNext()) {
+          Block b = it.next();
+          BlockInfo blockInfo = blockManager.getStoredBlock(b);
+          if (blockInfo.getBlockCollection().getStoragePolicyID() == lpPolicy.getId()) {
+            filesToDelete.add(blockInfo.getBlockCollection());
+          }
+        }
+
+        for (BlockCollection bc : filesToDelete) {
+          LOG.warn("Removing lazyPersist file " + bc.getName() + " with no replicas.");
+          deleteInternal(bc.getName(), false, false, false);
+        }
+      } finally {
+        writeUnlock();
+      }
+    }
+
+    @Override
+    public void run() {
+      while (fsRunning && shouldRun) {
+        try {
+          clearCorruptLazyPersistFiles();
+          Thread.sleep(scrubIntervalSec * 1000);
+        } catch (InterruptedException e) {
+          FSNamesystem.LOG.info(
+              "LazyPersistFileScrubber was interrupted, exiting");
+          break;
+        } catch (Exception e) {
+          FSNamesystem.LOG.error(
+              "Ignoring exception in LazyPersistFileScrubber:", e);
+        }
+      }
+    }
+
+    public void stop() {
+      shouldRun = false;
+    }
+  }
+
   public FSImage getFSImage() {
     return fsImage;
   }
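
The scrubber is controlled entirely by dfs.namenode.lazypersist.file.scrub.interval.sec: a positive value starts the daemon with that period, a negative value leaves it disabled, and zero is rejected during FSNamesystem construction. A minimal configuration sketch:

Configuration conf = new HdfsConfiguration();
// Scan for corrupt lazyPersist files every five minutes (the shipped default).
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 300);
// A negative value disables the scrubber; zero throws IllegalArgumentException.
// conf.setInt(DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, -1);
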
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 583c193..dde36c30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -108,6 +108,7 @@
       h = STORAGE_POLICY_ID.BITS.combine(storagePolicyID, h);
       return h;
     }
+
   }
 
   private long header = 0L;
@@ -115,11 +116,17 @@
   private BlockInfo[] blocks;
 
   INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime,
+            long atime, BlockInfo[] blklist, short replication,
+            long preferredBlockSize) {
+    this(id, name, permissions, mtime, atime, blklist, replication,
+         preferredBlockSize, (byte) 0);
+  }
+
+  INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime,
       long atime, BlockInfo[] blklist, short replication,
       long preferredBlockSize, byte storagePolicyID) {
     super(id, name, permissions, mtime, atime);
-    header = HeaderFormat.toLong(preferredBlockSize, replication,
-        storagePolicyID);
+    header = HeaderFormat.toLong(preferredBlockSize, replication, storagePolicyID);
     this.blocks = blklist;
   }
   
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
index f9d2700..0f85bab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
@@ -21,7 +21,6 @@
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile.HeaderFormat;
 import org.apache.hadoop.hdfs.server.namenode.XAttrFeature;
-
 /**
  * The attributes of a file.
  */
@@ -47,11 +46,12 @@
 
     public SnapshotCopy(byte[] name, PermissionStatus permissions,
         AclFeature aclFeature, long modificationTime, long accessTime,
-        short replication, long preferredBlockSize, byte storagePolicyID,
-        XAttrFeature xAttrsFeature) {
+        short replication, long preferredBlockSize,
+        byte storagePolicyID, XAttrFeature xAttrsFeature) {
       super(name, permissions, aclFeature, modificationTime, accessTime, 
           xAttrsFeature);
-      header = HeaderFormat.toLong(preferredBlockSize, replication, storagePolicyID);
+      header = HeaderFormat.toLong(preferredBlockSize, replication,
+          storagePolicyID);
     }
 
     public SnapshotCopy(INodeFile file) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java
index bafef25..bbd545a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java
@@ -22,6 +22,7 @@
 import java.util.Arrays;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.regex.Pattern;
 
 import javax.management.AttributeNotFoundException;
 import javax.management.MBeanAttributeInfo;
@@ -109,6 +110,23 @@
     }
   }
 
+  public void printAllMatchedAttributes(String attrRegExp) throws Exception {
+    err("List of the keys matching " + attrRegExp + " :");
+    Object val = null;
+    Pattern p = Pattern.compile(attrRegExp);
+    for (ObjectName oname : hadoopObjectNames) {
+      err(">>>>>>>>jmx name: " + oname.getCanonicalKeyPropertyListString());
+      MBeanInfo mbinfo = mbsc.getMBeanInfo(oname);
+      MBeanAttributeInfo[] mbinfos = mbinfo.getAttributes();
+      for (MBeanAttributeInfo mb : mbinfos) {
+        if (p.matcher(mb.getName()).lookingAt()) {
+          val = mbsc.getAttribute(oname, mb.getName());
+          System.out.format(format, mb.getName(), (val == null) ? "" : val.toString());
+        }
+      }
+    }
+  }
+
   /**
    * get single value by key
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
index 0eb7c61..0a2ae26 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
@@ -257,6 +257,8 @@
     final long aTime = (Long) m.get("accessTime");
     final long mTime = (Long) m.get("modificationTime");
     final long blockSize = (Long) m.get("blockSize");
+    final boolean isLazyPersist = m.containsKey("lazyPersist")
+        ? (Boolean) m.get("lazyPersist") : false;
     final short replication = (short) (long) (Long) m.get("replication");
     final long fileId = m.containsKey("fileId") ? (Long) m.get("fileId")
         : INodeId.GRANDFATHER_INODE_ID;
@@ -267,8 +269,9 @@
         (byte) (long) (Long) m.get("storagePolicy") :
           BlockStoragePolicySuite.ID_UNSPECIFIED;
     return new HdfsFileStatus(len, type == PathType.DIRECTORY, replication,
-        blockSize, mTime, aTime, permission, owner, group, symlink,
-        DFSUtil.string2Bytes(localName), fileId, childrenNum, null, storagePolicy);
+        blockSize, mTime, aTime, permission, owner, group,
+        symlink, DFSUtil.string2Bytes(localName), fileId, childrenNum, null,
+        storagePolicy);
   }
 
   /** Convert an ExtendedBlock to a Json map. */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
index 14e8c68..232c264 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
@@ -65,6 +65,7 @@
   CREATE = 0x01;    // Create a file
   OVERWRITE = 0x02; // Truncate/overwrite a file. Same as POSIX O_TRUNC
   APPEND = 0x04;    // Append to a file
+  LAZY_PERSIST = 0x10; // File with reduced durability guarantees.
 }
 
 message CreateRequestProto {
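
From the client side, the new enum value is reached through the existing CreateFlag plumbing rather than a new RPC. A hedged, self-contained sketch of a writer opting in to reduced durability (path, sizes, and replication are arbitrary placeholder values):

import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class LazyPersistWriteSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    EnumSet<CreateFlag> flags =
        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.LAZY_PERSIST);
    FSDataOutputStream out = fs.create(new Path("/tmp/scratch.tmp"),
        FsPermission.getFileDefault(), flags,
        4096,                  // buffer size
        (short) 1,             // replication
        128 * 1024 * 1024,     // block size
        null);                 // progress
    try {
      out.write(new byte[] {0, 1, 2, 3});
    } finally {
      out.close();
    }
  }
}
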
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 098d10a..fb774b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -115,6 +115,13 @@
   optional CachingStrategyProto cachingStrategy = 10;
   optional StorageTypeProto storageType = 11 [default = DISK];
   repeated StorageTypeProto targetStorageTypes = 12;
+
+  /**
+   * Hint to the DataNode that the block can be allocated on transient
+   * storage i.e. memory and written to disk lazily. The DataNode is free
+   * to ignore this hint.
+   */
+  optional bool allowLazyPersist = 13 [default = false];
 }
   
 message OpTransferBlockProto {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
index 38e649edb..10af3b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
@@ -159,6 +159,7 @@
   DISK = 1;
   SSD = 2;
   ARCHIVE = 3;
+  RAM_DISK = 4;
 }
 
 /**
@@ -305,7 +306,6 @@
   // Optional field for fileId
   optional uint64 fileId = 13 [default = 0]; // default as an invalid id
   optional int32 childrenNum = 14 [default = -1];
-
   // Optional field for file encryption
   optional FileEncryptionInfoProto fileEncryptionInfo = 15;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index adc80a2..54814b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -398,6 +398,16 @@
 </property>
 
 <property>
+  <name>dfs.namenode.lazypersist.file.scrub.interval.sec</name>
+  <value>300</value>
+  <description>
+    The NameNode periodically scans the namespace for LazyPersist files with
+    missing blocks and unlinks them from the namespace. This configuration key
+    controls the interval between successive scans. Set it to a negative value
+    to disable this behavior.
+  </description>
+</property>
+<property>
   <name>dfs.block.access.token.enable</name>
   <value>false</value>
   <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 7be6a49..84792b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -93,6 +93,7 @@
 import java.util.*;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.fs.CreateFlag.*;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -285,16 +286,29 @@
   public static void createFile(FileSystem fs, Path fileName, int bufferLen,
       long fileLen, long blockSize, short replFactor, long seed)
       throws IOException {
-    assert bufferLen > 0;
-    if (!fs.mkdirs(fileName.getParent())) {
-      throw new IOException("Mkdirs failed to create " + 
-                            fileName.getParent().toString());
-    }
-    FSDataOutputStream out = null;
-    try {
-      out = fs.create(fileName, true, fs.getConf()
-        .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
-        replFactor, blockSize);
+    createFile(fs, fileName, false, bufferLen, fileLen, blockSize,
+            replFactor, seed, false);
+  }
+
+  public static void createFile(FileSystem fs, Path fileName,
+      boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
+      short replFactor, long seed, boolean flush) throws IOException {
+    assert bufferLen > 0;
+    if (!fs.mkdirs(fileName.getParent())) {
+      throw new IOException("Mkdirs failed to create " +
+          fileName.getParent().toString());
+    }
+    FSDataOutputStream out = null;
+    EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+    createFlags.add(OVERWRITE);
+    if (isLazyPersist) {
+      createFlags.add(LAZY_PERSIST);
+    }
+    try {
+      out = fs.create(fileName, FsPermission.getFileDefault(), createFlags,
+          fs.getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+          replFactor, blockSize, null);
+
       if (fileLen > 0) {
         byte[] toWrite = new byte[bufferLen];
         Random rb = new Random(seed);
@@ -302,10 +316,13 @@
         while (bytesToWrite>0) {
           rb.nextBytes(toWrite);
           int bytesToWriteNext = (bufferLen < bytesToWrite) ? bufferLen
-              : (int) bytesToWrite;
-  
-          out.write(toWrite, 0, bytesToWriteNext);
-          bytesToWrite -= bytesToWriteNext;
+              : (int) bytesToWrite;
+
+          out.write(toWrite, 0, bytesToWriteNext);
+          bytesToWrite -= bytesToWriteNext;
+        }
+        if (flush) {
+          out.hsync();
         }
       }
     } finally {
@@ -1415,6 +1432,39 @@
   }
 
   /**
+   * Helper function that verifies the blocks of a file are placed on the
+   * expected storage type.
+   *
+   * @param fs The file system containing the file.
+   * @param client The DFS client used to access the file.
+   * @param path path of the file to verify.
+   * @param storageType the expected storage type.
+   * @return true if the file exists and its blocks are located on the
+   *         expected storage type;
+   *         false otherwise.
+   */
+  public static boolean verifyFileReplicasOnStorageType(FileSystem fs,
+    DFSClient client, Path path, StorageType storageType) throws IOException {
+    if (!fs.exists(path)) {
+      LOG.info("verifyFileReplicasOnStorageType: file " + path + "does not exist");
+      return false;
+    }
+    long fileLength = client.getFileInfo(path.toString()).getLen();
+    LocatedBlocks locatedBlocks =
+      client.getLocatedBlocks(path.toString(), 0, fileLength);
+    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+      if (locatedBlock.getStorageTypes()[0] != storageType) {
+        LOG.info("verifyFileReplicasOnStorageType: for file " + path +
+            ". Expect blk" + locatedBlock +
+          " on Type: " + storageType + ". Actual Type: " +
+          locatedBlock.getStorageTypes()[0]);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
    * Helper function to create a key in the Key Provider. Defaults
    * to the first indexed NameNode's Key Provider.
    *
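
A hedged usage sketch for the two helpers added above, assuming a running MiniDFSCluster named cluster provisioned with a RAM_DISK volume; BLOCK_SIZE and SEED are placeholder constants:

DistributedFileSystem fs = cluster.getFileSystem();
DFSClient client = fs.getClient();
Path path = new Path("/lazyPersistTest.dat");
DFSTestUtil.createFile(fs, path, true /* isLazyPersist */,
    (int) BLOCK_SIZE, BLOCK_SIZE, BLOCK_SIZE, (short) 1, SEED, true /* flush */);
assertTrue(DFSTestUtil.verifyFileReplicasOnStorageType(
    fs, client, path, StorageType.RAM_DISK));
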
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 0512b7f..0010a75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -1340,6 +1340,7 @@
     }
 
     int curDatanodesNum = dataNodes.size();
+    final int curDatanodesNumSaved = curDatanodesNum;
     // for mincluster's the default initialDelay for BRs is 0
     if (conf.get(DFS_BLOCKREPORT_INITIAL_DELAY_KEY) == null) {
       conf.setLong(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 0);
@@ -1390,7 +1391,8 @@
       // Set up datanode address
       setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
       if (manageDfsDirs) {
-        String dirs = makeDataNodeDirs(i, storageTypes == null ? null : storageTypes[i]);
+        String dirs = makeDataNodeDirs(i, storageTypes == null ?
+          null : storageTypes[i - curDatanodesNum]);
         dnConf.set(DFS_DATANODE_DATA_DIR_KEY, dirs);
         conf.set(DFS_DATANODE_DATA_DIR_KEY, dirs);
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
index 771b7bd..03317b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
@@ -69,7 +69,7 @@
   static final byte COLD = (byte) 4;
   static final byte WARM = (byte) 8;
   static final byte HOT  = (byte) 12;
-
+  static final byte LAZY_PERSIST  = (byte) 15;
 
   @Test (timeout=300000)
   public void testConfigKeyEnabled() throws IOException {
@@ -116,6 +116,9 @@
     expectedPolicyStrings.put(HOT,
         "BlockStoragePolicy{HOT:12, storageTypes=[DISK], " +
             "creationFallbacks=[], replicationFallbacks=[ARCHIVE]}");
+    expectedPolicyStrings.put(LAZY_PERSIST,
+        "BlockStoragePolicy{LAZY_PERSIST:15, storageTypes=[RAM_DISK, DISK], " +
+            "creationFallbacks=[DISK], replicationFallbacks=[DISK]}");
 
     for(byte i = 1; i < 16; i++) {
       final BlockStoragePolicy policy = POLICY_SUITE.getPolicy(i); 
@@ -1141,7 +1144,7 @@
     final DistributedFileSystem fs = cluster.getFileSystem();
     try {
       BlockStoragePolicy[] policies = fs.getStoragePolicies();
-      Assert.assertEquals(3, policies.length);
+      Assert.assertEquals(4, policies.length);
       Assert.assertEquals(POLICY_SUITE.getPolicy(COLD).toString(),
           policies[0].toString());
       Assert.assertEquals(POLICY_SUITE.getPolicy(WARM).toString(),
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSFinalize.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSFinalize.java
index 01bfb0d..39d3c96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSFinalize.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSFinalize.java
@@ -115,9 +115,12 @@
        * the upgrade. Actually it is ok for those contents to change.
        * For now disabling block verification so that the contents are 
        * not changed.
+       * Disable duplicate replica deletion as the test intentionally
+       * mirrors the contents of storage directories.
        */
       conf = new HdfsConfiguration();
       conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+      conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION, false);
       conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
       String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
       String[] dataNodeDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
index 104b043..bb00144 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUpgrade.java
@@ -229,6 +229,7 @@
       conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
       String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
       String[] dataNodeDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+      conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION, false);
       
       log("Normal NameNode upgrade", numDirs);
       UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
@@ -370,6 +371,7 @@
     {
       conf = new HdfsConfiguration();
       conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);      
+      conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION, false);
       conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
       String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
       
@@ -405,6 +407,7 @@
     int numDirs = 4;
     conf = new HdfsConfiguration();
     conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);      
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION, false);
     conf = UpgradeUtilities.initializeStorageStateConf(numDirs, conf);
     String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index bcb68e9..3586551 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -524,6 +524,6 @@
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], new StorageType[1], null, stage,
         0, block.getNumBytes(), block.getNumBytes(), newGS,
-        checksum, CachingStrategy.newDefaultStrategy());
+        checksum, CachingStrategy.newDefaultStrategy(), false);
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
index 29ac3f2..b7fdccf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
@@ -98,9 +98,10 @@
      */
     @Override
     public synchronized ReplicaInPipelineInterface createRbw(
-        StorageType storageType, ExtendedBlock b) throws IOException {
+        StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
+        throws IOException {
       assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH));
-      return super.createRbw(storageType, b);
+      return super.createRbw(storageType, b, allowLazyPersist);
     }
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
index cb85c7d..98fd59a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@ -448,13 +448,16 @@
         DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h2",
             AdminStates.DECOMMISSIONED),
         DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3", 
-            AdminStates.NORMAL)
+            AdminStates.NORMAL),
+        DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h4",
+            AdminStates.NORMAL),
     };
-    String[] storageIDs = {"s1", "s2", "s3"};
+    String[] storageIDs = {"s1", "s2", "s3", "s4"};
     StorageType[] media = {
         StorageType.DISK,
         StorageType.SSD,
-        StorageType.DISK
+        StorageType.DISK,
+        StorageType.RAM_DISK
     };
     LocatedBlock lb = new LocatedBlock(
         new ExtendedBlock("bp12", 12345, 10, 53),
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index dbc3212a..751f186 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
+import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -41,12 +44,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -57,6 +55,7 @@
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
 import org.apache.log4j.Level;
@@ -86,6 +85,7 @@
   static final double CAPACITY_ALLOWED_VARIANCE = 0.005;  // 0.5%
   static final double BALANCE_ALLOWED_VARIANCE = 0.11;    // 10%+delta
   static final int DEFAULT_BLOCK_SIZE = 100;
+  static final int DEFAULT_RAM_DISK_BLOCK_SIZE = 5 * 1024 * 1024;
   private static final Random r = new Random();
 
   static {
@@ -108,6 +108,15 @@
     conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
   }
 
+  static void initConfWithRamDisk(Configuration conf) {
+    conf.setLong(DFS_BLOCK_SIZE_KEY, DEFAULT_RAM_DISK_BLOCK_SIZE);
+    conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 3);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
+    conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
+    conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS, 1);
+  }
+
   /* create a file with a length of <code>fileLen</code> */
   static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen,
       short replicationFactor, int nnIndex)
@@ -1096,6 +1105,81 @@
         CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
   }
 
+  /*
+   * Test the Balancer with RAM_DISK configured.
+   * One DN has two files on RAM_DISK, the other DN has no files on RAM_DISK.
+   * Verify that the balancer does not migrate blocks on RAM_DISK across DNs.
+   */
+  @Test(timeout=300000)
+  public void testBalancerWithRamDisk() throws Exception {
+    final int SEED = 0xFADED;
+    final short REPL_FACT = 1;
+    Configuration conf = new Configuration();
+    initConfWithRamDisk(conf);
+
+    final int defaultRamDiskCapacity = 10;
+    final int defaultDiskCapacity = 100;
+    final long ramDiskStorageLimit =
+      ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
+      (DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
+    final long diskStorageLimit =
+      ((long) defaultDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
+      (DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
+
+    cluster = new MiniDFSCluster
+      .Builder(conf)
+      .numDataNodes(1)
+      .storageCapacities(new long[] { ramDiskStorageLimit, diskStorageLimit })
+      .storageTypes(new StorageType[] { RAM_DISK, DEFAULT })
+      .build();
+
+    try {
+      cluster.waitActive();
+      // Create a few files on RAM_DISK
+      final String METHOD_NAME = GenericTestUtils.getMethodName();
+      final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+      final Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+
+      DistributedFileSystem fs = cluster.getFileSystem();
+      DFSClient client = fs.getClient();
+      DFSTestUtil.createFile(fs, path1, true,
+        DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE,
+        DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
+      DFSTestUtil.createFile(fs, path2, true,
+        DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE,
+        DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
+
+      // Sleep for a short time to allow the lazy writer thread to do its job
+      Thread.sleep(6 * 1000);
+
+      // Add another fresh DN with the same type/capacity without files on RAM_DISK
+      StorageType[][] storageTypes = new StorageType[][] {{RAM_DISK, DEFAULT}};
+      long[][] storageCapacities = new long[][]{{ramDiskStorageLimit, diskStorageLimit}};
+      cluster.startDataNodes(conf, REPL_FACT, storageTypes, true, null,
+        null, null, storageCapacities, null, false, false, false, null);
+
+      cluster.triggerHeartbeats();
+      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+
+      // Run Balancer
+      Balancer.Parameters p = new Balancer.Parameters(
+        Parameters.DEFAULT.policy,
+        Parameters.DEFAULT.threshold,
+        Parameters.DEFAULT.nodesToBeExcluded,
+        Parameters.DEFAULT.nodesToBeIncluded);
+      final int exitCode = Balancer.run(namenodes, p, conf);
+
+      // Validate that no RAM_DISK blocks were moved.
+      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), exitCode);
+
+      // Verify files are still on RAM_DISK
+      DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path1, RAM_DISK);
+      DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path2, RAM_DISK);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * @param args
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index 9ea6c51..e9557da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -364,7 +364,7 @@
     // Create a bogus new block which will not be present on the namenode.
     ExtendedBlock b = new ExtendedBlock(
         poolId, rand.nextLong(), 1024L, rand.nextLong());
-    dn.getFSDataset().createRbw(StorageType.DEFAULT, b);
+    dn.getFSDataset().createRbw(StorageType.DEFAULT, b, false);
 
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
     StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 83d93f0..d1284fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -300,6 +300,11 @@
     public ChunkChecksum getLastChecksumAndDataLen() {
       return new ChunkChecksum(oStream.getLength(), null);
     }
+
+    @Override
+    public boolean isOnTransientStorage() {
+      return false;
+    }
   }
   
   /**
@@ -415,9 +420,66 @@
     }
   }
   
+  static class SimulatedVolume implements FsVolumeSpi {
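+    // Minimal FsVolumeSpi stub backed by SimulatedStorage. There is no
+    // on-disk layout, so the path-related accessors return null.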
+    private final SimulatedStorage storage;
+
+    SimulatedVolume(final SimulatedStorage storage) {
+      this.storage = storage;
+    }
+
+    @Override
+    public String getStorageID() {
+      return storage.getStorageUuid();
+    }
+
+    @Override
+    public String[] getBlockPoolList() {
+      return new String[0];
+    }
+
+    @Override
+    public long getAvailable() throws IOException {
+      return storage.getCapacity() - storage.getUsed();
+    }
+
+    @Override
+    public String getBasePath() {
+      return null;
+    }
+
+    @Override
+    public String getPath(String bpid) throws IOException {
+      return null;
+    }
+
+    @Override
+    public File getFinalizedDir(String bpid) throws IOException {
+      return null;
+    }
+
+    @Override
+    public StorageType getStorageType() {
+      return null;
+    }
+
+    @Override
+    public boolean isTransientStorage() {
+      return false;
+    }
+
+    @Override
+    public void reserveSpaceForRbw(long bytesToReserve) {
+    }
+
+    @Override
+    public void releaseReservedSpace(long bytesToRelease) {
+    }
+  }
+
   private final Map<String, Map<Block, BInfo>> blockMap
       = new HashMap<String, Map<Block,BInfo>>();
   private final SimulatedStorage storage;
+  private final SimulatedVolume volume;
   private final String datanodeUuid;
   
   public SimulatedFSDataset(DataStorage storage, Configuration conf) {
@@ -434,6 +496,7 @@
     this.storage = new SimulatedStorage(
         conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
         conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
+    this.volume = new SimulatedVolume(this.storage);
   }
 
   public synchronized void injectBlocks(String bpid,
@@ -747,7 +810,8 @@
 
   @Override // FsDatasetSpi
   public synchronized ReplicaInPipelineInterface createRbw(
-      StorageType storageType, ExtendedBlock b) throws IOException {
+      StorageType storageType, ExtendedBlock b,
+      boolean allowLazyPersist) throws IOException {
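+    // The simulated dataset does not model RAM disk storage, so the
+    // allowLazyPersist hint is ignored here.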
     return createTemporary(storageType, b);
   }
 
@@ -1083,7 +1147,7 @@
 
   @Override
   public void checkAndUpdate(String bpid, long blockId, File diskFile,
-      File diskMetaFile, FsVolumeSpi vol) {
+      File diskMetaFile, FsVolumeSpi vol) throws IOException {
     throw new UnsupportedOperationException();
   }
 
@@ -1116,6 +1180,11 @@
   }
 
   @Override
+  public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public Map<String, Object> getVolumeInfoMap() {
     throw new UnsupportedOperationException();
   }
@@ -1127,7 +1196,7 @@
 
   @Override
   public FsVolumeSpi getVolume(ExtendedBlock b) {
-    throw new UnsupportedOperationException();
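+    // Every simulated block is reported as residing on the single shared volume.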
+    return volume;
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 67805c0..987b480 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -532,7 +532,7 @@
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
-    dn.data.createRbw(StorageType.DEFAULT, block);
+    dn.data.createRbw(StorageType.DEFAULT, block, false);
     try {
       dn.syncBlock(rBlock, initBlockRecords(dn));
       fail("Sync should fail");
@@ -556,7 +556,7 @@
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
     ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(
-        StorageType.DEFAULT, block);
+        StorageType.DEFAULT, block, false);
     ReplicaOutputStreams streams = null;
     try {
       streams = replicaInfo.createStreams(true,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataDirs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataDirs.java
index 53babb4..94af015 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataDirs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataDirs.java
@@ -44,13 +44,14 @@
     File dir1 = new File("/dir1");
     File dir2 = new File("/dir2");
     File dir3 = new File("/dir3");
+    File dir4 = new File("/dir4");
 
     // Verify that a valid string is correctly parsed, and that storage
     // type is not case-sensitive
-    String locations1 = "[disk]/dir0,[DISK]/dir1,[sSd]/dir2,[disK]/dir3";
+    String locations1 = "[disk]/dir0,[DISK]/dir1,[sSd]/dir2,[disK]/dir3,[ram_disk]/dir4";
     conf.set(DFS_DATANODE_DATA_DIR_KEY, locations1);
     locations = DataNode.getStorageLocations(conf);
-    assertThat(locations.size(), is(4));
+    assertThat(locations.size(), is(5));
     assertThat(locations.get(0).getStorageType(), is(StorageType.DISK));
     assertThat(locations.get(0).getUri(), is(dir0.toURI()));
     assertThat(locations.get(1).getStorageType(), is(StorageType.DISK));
@@ -59,6 +60,8 @@
     assertThat(locations.get(2).getUri(), is(dir2.toURI()));
     assertThat(locations.get(3).getStorageType(), is(StorageType.DISK));
     assertThat(locations.get(3).getUri(), is(dir3.toURI()));
+    assertThat(locations.get(4).getStorageType(), is(StorageType.RAM_DISK));
+    assertThat(locations.get(4).getUri(), is(dir4.toURI()));
 
     // Verify that an unrecognized storage type result in an exception.
     String locations2 = "[BadMediaType]/dir0,[ssd]/dir1,[disk]/dir2";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index bc50eaa..b7795b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -31,22 +33,21 @@
 import java.util.List;
 import java.util.Random;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
 
 /**
@@ -60,22 +61,29 @@
 
   private MiniDFSCluster cluster;
   private String bpid;
+  private DFSClient client;
   private FsDatasetSpi<? extends FsVolumeSpi> fds = null;
   private DirectoryScanner scanner = null;
   private final Random rand = new Random();
   private final Random r = new Random();
+  private static final int BLOCK_LENGTH = 100;
 
   static {
-    CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
+    CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_LENGTH);
     CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
     CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
   }
 
   /** create a file with a length of <code>fileLen</code> */
-  private void createFile(String fileName, long fileLen) throws IOException {
+  private List<LocatedBlock> createFile(String fileNamePrefix,
+                                        long fileLen,
+                                        boolean isLazyPersist) throws IOException {
     FileSystem fs = cluster.getFileSystem();
-    Path filePath = new Path(fileName);
-    DFSTestUtil.createFile(fs, filePath, fileLen, (short) 1, r.nextLong());
+    Path filePath = new Path("/" + fileNamePrefix + ".dat");
+    DFSTestUtil.createFile(
+        fs, filePath, isLazyPersist, 1024, fileLen,
+        BLOCK_LENGTH, (short) 1, r.nextLong(), false);
+    return client.getLocatedBlocks(filePath.toString(), 0, fileLen).getLocatedBlocks();
   }
 
   /** Truncate a block file */
@@ -134,6 +142,43 @@
     return 0;
   }
 
+  /**
+   * Duplicate the given block on all volumes.
+   * @param blockId id of the block to copy onto every other volume.
+   * @throws IOException if the block or meta file cannot be copied.
+   */
+  private void duplicateBlock(long blockId) throws IOException {
+    synchronized (fds) {
+      ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
+      for (FsVolumeSpi v : fds.getVolumes()) {
+        if (v.getStorageID().equals(b.getVolume().getStorageID())) {
+          continue;
+        }
+
+        // Volume without a copy of the block. Make a copy now.
+        File sourceBlock = b.getBlockFile();
+        File sourceMeta = b.getMetaFile();
+        String sourceRoot = b.getVolume().getBasePath();
+        String destRoot = v.getBasePath();
+
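+        // Reconstruct the block's path relative to its source volume so the
+        // copy keeps the same directory layout on the destination volume.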
+        String relativeBlockPath = new File(sourceRoot).toURI().relativize(sourceBlock.toURI()).getPath();
+        String relativeMetaPath = new File(sourceRoot).toURI().relativize(sourceMeta.toURI()).getPath();
+
+        File destBlock = new File(destRoot, relativeBlockPath);
+        File destMeta = new File(destRoot, relativeMetaPath);
+
+        destBlock.getParentFile().mkdirs();
+        FileUtils.copyFile(sourceBlock, destBlock);
+        FileUtils.copyFile(sourceMeta, destMeta);
+
+        if (destBlock.exists() && destMeta.exists()) {
+          LOG.info("Copied " + sourceBlock + " ==> " + destBlock);
+          LOG.info("Copied " + sourceMeta + " ==> " + destMeta);
+        }
+      }
+    }
+  }
+
   /** Get a random blockId that is not used already */
   private long getFreeBlockId() {
     long id = rand.nextLong();
@@ -215,7 +260,13 @@
   }
 
   private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
-      long missingMemoryBlocks, long mismatchBlocks) {
+      long missingMemoryBlocks, long mismatchBlocks) throws IOException {
+    scan(totalBlocks, diffsize, missingMetaFile, missingBlockFile,
+         missingMemoryBlocks, mismatchBlocks, 0);
+  }
+
+  private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
+      long missingMemoryBlocks, long mismatchBlocks, long duplicateBlocks) throws IOException {
     scanner.reconcile();
     
     assertTrue(scanner.diffs.containsKey(bpid));
@@ -229,9 +280,92 @@
     assertEquals(missingBlockFile, stats.missingBlockFile);
     assertEquals(missingMemoryBlocks, stats.missingMemoryBlocks);
     assertEquals(mismatchBlocks, stats.mismatchBlocks);
+    assertEquals(duplicateBlocks, stats.duplicateBlocks);
   }
 
-  @Test
+  @Test (timeout=300000)
+  public void testRetainBlockOnPersistentStorage() throws Exception {
+    cluster = new MiniDFSCluster
+        .Builder(CONF)
+        .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
+        .numDataNodes(1)
+        .build();
+    try {
+      cluster.waitActive();
+      bpid = cluster.getNamesystem().getBlockPoolId();
+      fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
+      scanner = new DirectoryScanner(fds, CONF);
+      scanner.setRetainDiffs(true);
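+      // Stop the lazy writer so no RAM_DISK replicas are moved to persistent
+      // storage in the background while the directory scanner assertions run.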
+      FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+
+      // Add a file with 1 block
+      List<LocatedBlock> blocks =
+          createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, false);
+
+      // Ensure no difference between volumeMap and disk.
+      scan(1, 0, 0, 0, 0, 0);
+
+      // Make a copy of the block on RAM_DISK and ensure that it is
+      // picked up by the scanner.
+      duplicateBlock(blocks.get(0).getBlock().getBlockId());
+      scan(2, 1, 0, 0, 0, 0, 1);
+      verifyStorageType(blocks.get(0).getBlock().getBlockId(), false);
+      scan(1, 0, 0, 0, 0, 0);
+
+    } finally {
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test (timeout=300000)
+  public void testDeleteBlockOnTransientStorage() throws Exception {
+    cluster = new MiniDFSCluster
+        .Builder(CONF)
+        .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
+        .numDataNodes(1)
+        .build();
+    try {
+      cluster.waitActive();
+      bpid = cluster.getNamesystem().getBlockPoolId();
+      fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
+      scanner = new DirectoryScanner(fds, CONF);
+      scanner.setRetainDiffs(true);
+      FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+
+      // Create a file on RAM_DISK
+      List<LocatedBlock> blocks =
+          createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH, true);
+
+      // Ensure no difference between volumeMap and disk.
+      scan(1, 0, 0, 0, 0, 0);
+
+      // Make a copy of the block on DEFAULT storage and ensure that it is
+      // picked up by the scanner.
+      duplicateBlock(blocks.get(0).getBlock().getBlockId());
+      scan(2, 1, 0, 0, 0, 0, 1);
+
+      // Ensure that the copy on RAM_DISK was deleted.
+      verifyStorageType(blocks.get(0).getBlock().getBlockId(), false);
+      scan(1, 0, 0, 0, 0, 0);
+
+    } finally {
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Test (timeout=600000)
   public void testDirectoryScanner() throws Exception {
     // Run the test with and without parallel scanning
     for (int parallelism = 1; parallelism < 3; parallelism++) {
@@ -245,16 +379,17 @@
       cluster.waitActive();
       bpid = cluster.getNamesystem().getBlockPoolId();
       fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
       CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
                   parallelism);
       scanner = new DirectoryScanner(fds, CONF);
       scanner.setRetainDiffs(true);
 
       // Add files with 100 blocks
-      createFile("/tmp/t1", 10000);
+      createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 100, false);
       long totalBlocks = 100;
 
-      // Test1: No difference between in-memory and disk
+      // Test1: No difference between volumeMap and disk
       scan(100, 0, 0, 0, 0, 0);
 
       // Test2: block metafile is missing
@@ -355,7 +490,10 @@
       assertFalse(scanner.getRunStatus());
       
     } finally {
-      scanner.shutdown();
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
       cluster.shutdown();
     }
   }
@@ -389,6 +527,13 @@
     assertEquals(genStamp, memBlock.getGenerationStamp());
   }
   
+  private void verifyStorageType(long blockId, boolean expectTransient) {
+    final ReplicaInfo memBlock;
+    memBlock = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
+    assertNotNull(memBlock);
+    assertThat(memBlock.getVolume().isTransientStorage(), is(expectTransient));
+  }
+
   private static class TestFsVolumeSpi implements FsVolumeSpi {
     @Override
     public String[] getBlockPoolList() {
@@ -426,6 +571,11 @@
     }
 
     @Override
+    public boolean isTransientStorage() {
+      return false;
+    }
+
+    @Override
     public void reserveSpaceForRbw(long bytesToReserve) {
     }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index 4b5b6e1..f440bb6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -152,7 +152,7 @@
         BlockTokenSecretManager.DUMMY_TOKEN, "",
         new DatanodeInfo[0], new StorageType[0], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
-        checksum, CachingStrategy.newDefaultStrategy());
+        checksum, CachingStrategy.newDefaultStrategy(), false);
     out.flush();
 
     // close the connection before sending the content of the block
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index bd6c3de..099a0cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -67,7 +67,7 @@
       // we pass expected len as zero, - fsdataset should use the sizeof actual
       // data written
       ReplicaInPipelineInterface bInfo = fsdataset.createRbw(
-          StorageType.DEFAULT, b);
+          StorageType.DEFAULT, b, false);
       ReplicaOutputStreams out = bInfo.createStreams(true,
           DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
       try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
index 6bd36ed..f9e30e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
@@ -23,18 +23,19 @@
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 
 public class FsDatasetTestUtil {
 
   public static File getFile(FsDatasetSpi<?> fsd, String bpid, long bid) {
-    return ((FsDatasetImpl)fsd).getFile(bpid, bid);
+    return ((FsDatasetImpl)fsd).getFile(bpid, bid, false);
   }
 
   public static File getBlockFile(FsDatasetSpi<?> fsd, String bpid, Block b
       ) throws IOException {
-    return ((FsDatasetImpl)fsd).getBlockFile(bpid, b);
+    return ((FsDatasetImpl)fsd).getBlockFile(bpid, b.getBlockId());
   }
 
   public static File getMetaFile(FsDatasetSpi<?> fsd, String bpid, Block b)
@@ -62,4 +63,13 @@
       String bpid) {
     return ((FsDatasetImpl)fsd).volumeMap.replicas(bpid);
   }
+
+  /**
+   * Stop the lazy writer daemon that saves RAM disk files to persistent storage.
+   * @param dn
+   */
+  public static void stopLazyWriter(DataNode dn) {
+    FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
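+    // Signal the LazyWriter runnable to stop so no further RAM disk replicas
+    // are written to persistent storage.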
+    ((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 10b9f7e..7c39ca5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -147,7 +147,7 @@
     for (int i = 0; i < NUM_BLOCKS; i++) {
       String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length];
       ExtendedBlock eb = new ExtendedBlock(bpid, i);
-      dataset.createRbw(StorageType.DEFAULT, eb);
+      dataset.createRbw(StorageType.DEFAULT, eb, false);
     }
     final String[] dataDirs =
         conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
new file mode 100644
index 0000000..91deb55
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -0,0 +1,984 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.tools.JMXGet;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
+import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNot.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class TestLazyPersistFiles {
+  public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
+
+  static {
+    ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  private static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
+
+  private static final int THREADPOOL_SIZE = 10;
+
+  private static final short REPL_FACTOR = 1;
+  private static final int BLOCK_SIZE = 5 * 1024 * 1024;
+  private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
+  private static final long HEARTBEAT_INTERVAL_SEC = 1;
+  private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
+  private static final int LAZY_WRITER_INTERVAL_SEC = 1;
+  private static final int BUFFER_LENGTH = 4096;
+  private static final int EVICTION_LOW_WATERMARK = 1;
+  private static final String JMX_SERVICE_NAME = "DataNode";
+  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
+
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private DFSClient client;
+  private Configuration conf;
+  private JMXGet jmx;
+
+  @After
+  public void shutDownCluster() throws Exception {
+
+    // Dump all RamDisk JMX metrics before shutting down the cluster
+    printRamDiskJMXMetrics();
+
+    if (fs != null) {
+      fs.close();
+      fs = null;
+      client = null;
+    }
+
+    if (cluster != null) {
+      cluster.shutdownDataNodes();
+      cluster.shutdown();
+      cluster = null;
+    }
+
+    jmx = null;
+  }
+
+  @Test (timeout=300000)
+  public void testPolicyNotSetByDefault() throws IOException {
+    startUpCluster(false, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeTestFile(path, 0, false);
+    // Stat the file and check that the LAZY_PERSIST policy is not
+    // returned back.
+    HdfsFileStatus status = client.getFileInfo(path.toString());
+    assertThat(status.getStoragePolicy(), not(LAZY_PERSIST_POLICY_ID));
+  }
+
+  @Test (timeout=300000)
+  public void testPolicyPropagation() throws IOException {
+    startUpCluster(false, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeTestFile(path, 0, true);
+    // Stat the file and check that the lazyPersist flag is returned back.
+    HdfsFileStatus status = client.getFileInfo(path.toString());
+    assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
+  }
+
+  @Test (timeout=300000)
+  public void testPolicyPersistenceInEditLog() throws IOException {
+    startUpCluster(false, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeTestFile(path, 0, true);
+    cluster.restartNameNode(true);
+
+    // Stat the file and check that the lazyPersist flag is returned back.
+    HdfsFileStatus status = client.getFileInfo(path.toString());
+    assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
+  }
+
+  @Test (timeout=300000)
+  public void testPolicyPersistenceInFsImage() throws IOException {
+    startUpCluster(false, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeTestFile(path, 0, true);
+    // checkpoint
+    fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
+    fs.saveNamespace();
+    fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+    cluster.restartNameNode(true);
+
+    // Stat the file and check that the lazyPersist flag is returned back.
+    HdfsFileStatus status = client.getFileInfo(path.toString());
+    assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
+  }
+
+  @Test (timeout=300000)
+  public void testPlacementOnRamDisk() throws IOException {
+    startUpCluster(true, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeTestFile(path, BLOCK_SIZE, true);
+    ensureFileReplicasOnStorageType(path, RAM_DISK);
+  }
+
+  @Test (timeout=300000)
+  public void testPlacementOnSizeLimitedRamDisk() throws IOException {
+    startUpCluster(true, 3);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+
+    makeTestFile(path1, BLOCK_SIZE, true);
+    makeTestFile(path2, BLOCK_SIZE, true);
+
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+    ensureFileReplicasOnStorageType(path2, RAM_DISK);
+  }
+
+  /**
+   * Client tries to write a LAZY_PERSIST file to a DN with no RamDisk configured.
+   * The write should fall back to disk with no error.
+   * @throws IOException
+   */
+  @Test (timeout=300000)
+  public void testFallbackToDisk() throws IOException {
+    startUpCluster(false, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeTestFile(path, BLOCK_SIZE, true);
+    ensureFileReplicasOnStorageType(path, DEFAULT);
+  }
+
+  /**
+   * The file cannot fit in RamDisk even with eviction.
+   * @throws IOException
+   */
+  @Test (timeout=300000)
+  public void testFallbackToDiskFull() throws Exception {
+    startUpCluster(false, 0);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeTestFile(path, BLOCK_SIZE, true);
+    ensureFileReplicasOnStorageType(path, DEFAULT);
+
+    verifyRamDiskJMXMetric("RamDiskBlocksWriteFallback", 1);
+  }
+
+  /**
+   * The file partially fits in RamDisk after eviction.
+   * RamDisk can fit 2 blocks. Write a file with 5 blocks.
+   * Expect 2 or fewer blocks on RamDisk and 3 or more on disk.
+   * @throws IOException
+   */
+  @Test (timeout=300000)
+  public void testFallbackToDiskPartial()
+    throws IOException, InterruptedException {
+    startUpCluster(true, 2);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeTestFile(path, BLOCK_SIZE * 5, true);
+
+    // Sleep for a short time to allow the lazy writer thread to do its job
+    Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+    triggerBlockReport();
+
+    int numBlocksOnRamDisk = 0;
+    int numBlocksOnDisk = 0;
+
+    long fileLength = client.getFileInfo(path.toString()).getLen();
+    LocatedBlocks locatedBlocks =
+      client.getLocatedBlocks(path.toString(), 0, fileLength);
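+    // Count the blocks by the storage type of their first (and only) replica.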
+    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+      if (locatedBlock.getStorageTypes()[0] == RAM_DISK) {
+        numBlocksOnRamDisk++;
+      } else if (locatedBlock.getStorageTypes()[0] == DEFAULT) {
+        numBlocksOnDisk++;
+      }
+    }
+
+    // Since eviction is asynchronous, depending on the timing of eviction
+    // relative to the writes we may end up with 2 or fewer blocks on RAM disk.
+    assert(numBlocksOnRamDisk <= 2);
+    assert(numBlocksOnDisk >= 3);
+  }
+
+  /**
+   * If the only available storage is RAM_DISK and the LAZY_PERSIST flag is not
+   * specified, then block placement should fail.
+   *
+   * @throws IOException
+   */
+  @Test (timeout=300000)
+  public void testRamDiskNotChosenByDefault() throws IOException {
+    startUpCluster(true, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    try {
+      makeTestFile(path, BLOCK_SIZE, false);
+      fail("Block placement to RAM_DISK should have failed without lazyPersist flag");
+    } catch (Throwable t) {
+      LOG.info("Got expected exception ", t);
+    }
+  }
+
+  /**
+   * Append to lazy persist file is denied.
+   * @throws IOException
+   */
+  @Test (timeout=300000)
+  public void testAppendIsDenied() throws IOException {
+    startUpCluster(true, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeTestFile(path, BLOCK_SIZE, true);
+
+    try {
+      client.append(path.toString(), BUFFER_LENGTH, null, null).close();
+      fail("Append to LazyPersist file did not fail as expected");
+    } catch (Throwable t) {
+      LOG.info("Got expected exception ", t);
+    }
+  }
+
+  /**
+   * If one or more replicas of a lazyPersist file are lost, then the file
+   * must be discarded by the NN, instead of being kept around as a
+   * 'corrupt' file.
+   */
+  @Test (timeout=300000)
+  public void testLazyPersistFilesAreDiscarded()
+      throws IOException, InterruptedException {
+    startUpCluster(true, 2);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+
+    makeTestFile(path1, BLOCK_SIZE, true);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    // Stop the DataNode and sleep for the time it takes the NN to
+    // detect the DN as being dead.
+    cluster.shutdownDataNodes();
+    Thread.sleep(30000L);
+    assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1));
+
+    // Next, wait for the replication monitor to mark the file as corrupt
+    Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
+
+    // Wait for the LazyPersistFileScrubber to run
+    Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
+
+    // Ensure that path1 does not exist anymore.
+    assert(!fs.exists(path1));
+
+    // There should be no blocks left that need replication, since the
+    // lazy persist file was discarded rather than marked corrupt.
+    assertThat(cluster.getNameNode()
+                      .getNamesystem()
+                      .getBlockManager()
+                      .getUnderReplicatedBlocksCount(),
+               is(0L));
+  }
+
+  @Test (timeout=300000)
+  public void testLazyPersistBlocksAreSaved()
+      throws IOException, InterruptedException {
+    startUpCluster(true, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    // Create a test file
+    makeTestFile(path, BLOCK_SIZE * 10, true);
+    LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
+
+    // Sleep for a short time to allow the lazy writer thread to do its job
+    Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    
+    LOG.info("Verifying copy was saved to lazyPersist/");
+
+    // Make sure that there is a saved copy of the replica on persistent
+    // storage.
+    final String bpid = cluster.getNamesystem().getBlockPoolId();
+    List<? extends FsVolumeSpi> volumes =
+        cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+    final Set<Long> persistedBlockIds = new HashSet<Long>();
+
+    // Make sure at least one non-transient volume has a saved copy of
+    // the replica.
+    for (FsVolumeSpi v : volumes) {
+      if (v.isTransientStorage()) {
+        continue;
+      }
+
+      FsVolumeImpl volume = (FsVolumeImpl) v;
+      File lazyPersistDir = volume.getBlockPoolSlice(bpid).getLazypersistDir();
+
+      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+        File targetDir = DatanodeUtil.idToBlockDir(lazyPersistDir, lb.getBlock().getBlockId());
+        File blockFile = new File(targetDir, lb.getBlock().getBlockName());
+        if (blockFile.exists()) {
+          // Found a persisted copy for this block!
+          boolean added = persistedBlockIds.add(lb.getBlock().getBlockId());
+          assertThat(added, is(true));
+        } else {
+          LOG.error(blockFile + " not found");
+        }
+      }
+    }
+
+    // We should have found a persisted copy for each located block.
+    assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size()));
+  }
+
+  /**
+   * RamDisk eviction after lazy persist to disk.
+   * @throws Exception
+   */
+  @Test (timeout=300000)
+  public void testRamDiskEviction() throws Exception {
+    startUpCluster(true, 1 + EVICTION_LOW_WATERMARK);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+
+    final int SEED = 0xFADED;
+    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    // Sleep for a short time to allow the lazy writer thread to do its job.
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    // Create another file with a replica on RAM_DISK.
+    makeTestFile(path2, BLOCK_SIZE, true);
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    triggerBlockReport();
+
+    // Ensure the first file was evicted to disk, the second is still on
+    // RAM_DISK.
+    ensureFileReplicasOnStorageType(path2, RAM_DISK);
+    ensureFileReplicasOnStorageType(path1, DEFAULT);
+
+    verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 1);
+    verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 1);
+  }
+
+  /**
+   * RamDisk eviction should not happen on blocks that are not yet
+   * persisted on disk.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=300000)
+  public void testRamDiskEvictionBeforePersist()
+    throws IOException, InterruptedException {
+    startUpCluster(true, 1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+    final int SEED = 0XFADED;
+
+    // Stop lazy writer to ensure block for path1 is not persisted to disk.
+    FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+
+    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    // Create second file with a replica on RAM_DISK.
+    makeTestFile(path2, BLOCK_SIZE, true);
+
+    // Eviction should not happen for the block of the first file, since it
+    // has not been persisted yet.
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+    ensureFileReplicasOnStorageType(path2, DEFAULT);
+
+    assert(fs.exists(path1));
+    assert(fs.exists(path2));
+    verifyReadRandomFile(path1, BLOCK_SIZE, SEED);
+  }
+
+  /**
+   * Validates lazy persisted blocks are evicted from RAM_DISK based on LRU.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=300000)
+  public void testRamDiskEvictionIsLru()
+    throws Exception {
+    final int NUM_PATHS = 5;
+    startUpCluster(true, NUM_PATHS + EVICTION_LOW_WATERMARK);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path paths[] = new Path[NUM_PATHS * 2];
+
+    for (int i = 0; i < paths.length; i++) {
+      paths[i] = new Path("/" + METHOD_NAME + "." + i +".dat");
+    }
+
+    for (int i = 0; i < NUM_PATHS; i++) {
+      makeTestFile(paths[i], BLOCK_SIZE, true);
+    }
+
+    // Sleep for a short time to allow the lazy writer thread to do its job.
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+    for (int i = 0; i < NUM_PATHS; ++i) {
+      ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
+    }
+
+    // Open the files for read in a random order.
+    ArrayList<Integer> indexes = new ArrayList<Integer>(NUM_PATHS);
+    for (int i = 0; i < NUM_PATHS; ++i) {
+      indexes.add(i);
+    }
+    Collections.shuffle(indexes);
+
+    for (int i = 0; i < NUM_PATHS; ++i) {
+      LOG.info("Touching file " + paths[indexes.get(i)]);
+      DFSTestUtil.readFile(fs, paths[indexes.get(i)]);
+    }
+
+    // Create an equal number of new files ensuring that the previous
+    // files are evicted in the same order they were read.
+    for (int i = 0; i < NUM_PATHS; ++i) {
+      makeTestFile(paths[i + NUM_PATHS], BLOCK_SIZE, true);
+      triggerBlockReport();
+      Thread.sleep(3000);
+      ensureFileReplicasOnStorageType(paths[i + NUM_PATHS], RAM_DISK);
+      ensureFileReplicasOnStorageType(paths[indexes.get(i)], DEFAULT);
+      for (int j = i + 1; j < NUM_PATHS; ++j) {
+        ensureFileReplicasOnStorageType(paths[indexes.get(j)], RAM_DISK);
+      }
+    }
+
+    verifyRamDiskJMXMetric("RamDiskBlocksWrite", NUM_PATHS * 2);
+    verifyRamDiskJMXMetric("RamDiskBlocksWriteFallback", 0);
+    verifyRamDiskJMXMetric("RamDiskBytesWrite", BLOCK_SIZE * NUM_PATHS * 2);
+    verifyRamDiskJMXMetric("RamDiskBlocksReadHits", NUM_PATHS);
+    verifyRamDiskJMXMetric("RamDiskBlocksEvicted", NUM_PATHS);
+    verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 0);
+    verifyRamDiskJMXMetric("RamDiskBlocksDeletedBeforeLazyPersisted", 0);
+  }
+
+  /**
+   * Delete lazy-persist file that has not been persisted to disk.
+   * Memory is freed up and file is gone.
+   * @throws IOException
+   */
+  @Test (timeout=300000)
+  public void testDeleteBeforePersist()
+    throws Exception {
+    startUpCluster(true, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+    makeTestFile(path, BLOCK_SIZE, true);
+    LocatedBlocks locatedBlocks =
+      ensureFileReplicasOnStorageType(path, RAM_DISK);
+
+    // Delete before persist
+    client.delete(path.toString(), false);
+    Assert.assertFalse(fs.exists(path));
+
+    assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
+
+    verifyRamDiskJMXMetric("RamDiskBlocksDeletedBeforeLazyPersisted", 1);
+  }
+
+  /**
+   * Delete lazy-persist file that has been persisted to disk
+   * Both memory blocks and disk blocks are deleted.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=300000)
+  public void testDeleteAfterPersist()
+    throws Exception {
+    startUpCluster(true, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeTestFile(path, BLOCK_SIZE, true);
+    LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
+
+    // Sleep for a short time to allow the lazy writer thread to do its job
+    Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+    // Delete after persist
+    client.delete(path.toString(), false);
+    Assert.assertFalse(fs.exists(path));
+
+    assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
+
+    verifyRamDiskJMXMetric("RamDiskBlocksLazyPersisted", 1);
+    verifyRamDiskJMXMetric("RamDiskBytesLazyPersisted", BLOCK_SIZE);
+  }
+
+  /**
+   * Verify DFS used space accounting for a RAM_DISK file across create,
+   * lazy persist and delete.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=300000)
+  public void testDfsUsageCreateDelete()
+    throws IOException, InterruptedException {
+    startUpCluster(true, 4);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    // Get the usage before writing BLOCK_SIZE bytes
+    long usedBeforeCreate = fs.getUsed();
+
+    makeTestFile(path, BLOCK_SIZE, true);
+    long usedAfterCreate = fs.getUsed();
+
+    assertThat(usedAfterCreate, is((long) BLOCK_SIZE));
+
+    // Sleep for a short time to allow the lazy writer thread to do its job
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+    long usedAfterPersist = fs.getUsed();
+    assertThat(usedAfterPersist, is((long) BLOCK_SIZE));
+
+    // Delete after persist
+    client.delete(path.toString(), false);
+    long usedAfterDelete = fs.getUsed();
+
+    assertThat(usedBeforeCreate, is(usedAfterDelete));
+  }
+
+  /**
+   * Concurrent read from the same node and verify the contents.
+   */
+  @Test (timeout=300000)
+  public void testConcurrentRead()
+    throws Exception {
+    startUpCluster(true, 2);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final Path path1 = new Path("/" + METHOD_NAME + ".dat");
+
+    final int SEED = 0xFADED;
+    final int NUM_TASKS = 5;
+    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    //Read from multiple clients
+    final CountDownLatch latch = new CountDownLatch(NUM_TASKS);
+    final AtomicBoolean testFailed = new AtomicBoolean(false);
+
+    Runnable readerRunnable = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          Assert.assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
+        } catch (Throwable e) {
+          LOG.error("readerRunnable error", e);
+          testFailed.set(true);
+        } finally {
+          latch.countDown();
+        }
+      }
+    };
+
+    Thread threads[] = new Thread[NUM_TASKS];
+    for (int i = 0; i < NUM_TASKS; i++) {
+      threads[i] = new Thread(readerRunnable);
+      threads[i].start();
+    }
+
+    Thread.sleep(500);
+
+    for (int i = 0; i < NUM_TASKS; i++) {
+      Uninterruptibles.joinUninterruptibly(threads[i]);
+    }
+    Assert.assertFalse(testFailed.get());
+  }
+
+  /**
+   * Concurrent write with eviction
+   * RAM_DISK can hold 9 replicas
+   * 4 threads each write 5 replicas
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=300000)
+  public void testConcurrentWrites()
+    throws IOException, InterruptedException {
+    startUpCluster(true, 9);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final int SEED = 0xFADED;
+    final int NUM_WRITERS = 4;
+    final int NUM_WRITER_PATHS = 5;
+
+    Path paths[][] = new Path[NUM_WRITERS][NUM_WRITER_PATHS];
+    for (int i = 0; i < NUM_WRITERS; i++) {
+      paths[i] = new Path[NUM_WRITER_PATHS];
+      for (int j = 0; j < NUM_WRITER_PATHS; j++) {
+        paths[i][j] =
+          new Path("/" + METHOD_NAME + ".Writer" + i + ".File." + j + ".dat");
+      }
+    }
+
+    final CountDownLatch latch = new CountDownLatch(NUM_WRITERS);
+    final AtomicBoolean testFailed = new AtomicBoolean(false);
+
+    ExecutorService executor = Executors.newFixedThreadPool(THREADPOOL_SIZE);
+    for (int i = 0; i < NUM_WRITERS; i++) {
+      Runnable writer = new WriterRunnable(i, paths[i], SEED, latch, testFailed);
+      executor.execute(writer);
+    }
+
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    triggerBlockReport();
+
+    // Wait for all writer threads to finish.
+    latch.await();
+
+    assertThat(testFailed.get(), is(false));
+  }
+
+  @Test (timeout=300000)
+  public void testDnRestartWithSavedReplicas()
+      throws IOException, InterruptedException {
+
+    startUpCluster(true, -1);
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+
+    makeTestFile(path1, BLOCK_SIZE, true);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    // Sleep for a short time to allow the lazy writer thread to do its job.
+    // However the block replica should not be evicted from RAM_DISK yet.
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    LOG.info("Restarting the DataNode");
+    cluster.restartDataNode(0, true);
+    cluster.waitActive();
+
+    // Ensure that the replica is now on persistent storage.
+    ensureFileReplicasOnStorageType(path1, DEFAULT);
+  }
+
+  @Test (timeout=300000)
+  public void testDnRestartWithUnsavedReplicas()
+      throws IOException, InterruptedException {
+
+    startUpCluster(true, 1);
+    FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    makeTestFile(path1, BLOCK_SIZE, true);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    LOG.info("Restarting the DataNode");
+    cluster.restartDataNode(0, true);
+    cluster.waitActive();
+
+    // Ensure that the replica is still on transient storage.
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+  }
+
+  // ---- Utility functions for all test cases -------------------------------
+
+  /**
+   * If ramDiskReplicaCapacity is >= 0, then RAM_DISK capacity is artificially
+   * capped to that many replicas. If ramDiskReplicaCapacity < 0 it is ignored.
+   */
+  private void startUpCluster(boolean hasTransientStorage,
+                              final int ramDiskReplicaCapacity,
+                              final boolean useSCR)
+      throws IOException {
+
+    conf = new Configuration();
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
+                LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
+    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+                HEARTBEAT_RECHECK_INTERVAL_MSEC);
+    conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+                LAZY_WRITER_INTERVAL_SEC);
+    conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS,
+                EVICTION_LOW_WATERMARK);
+
+    conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, useSCR);
+
+    long[] capacities = null;
+    if (hasTransientStorage && ramDiskReplicaCapacity >= 0) {
+      // Convert replica count to byte count, add some delta for .meta and VERSION files.
+      long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) + (BLOCK_SIZE - 1);
+      capacities = new long[] { ramDiskStorageLimit, -1 };
+    }
+
+    cluster = new MiniDFSCluster
+        .Builder(conf)
+        .numDataNodes(REPL_FACTOR)
+        .storageCapacities(capacities)
+        .storageTypes(hasTransientStorage ? new StorageType[]{ RAM_DISK, DEFAULT } : null)
+        .build();
+    fs = cluster.getFileSystem();
+    client = fs.getClient();
+    try {
+      jmx = initJMX();
+    } catch (Exception e) {
+      fail("Failed initialize JMX for testing: " + e);
+    }
+    LOG.info("Cluster startup complete");
+  }
+
+  private void startUpCluster(boolean hasTransientStorage,
+                              final int ramDiskReplicaCapacity)
+    throws IOException {
+    startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false);
+  }
+
+  private void makeTestFile(Path path, long length, final boolean isLazyPersist)
+      throws IOException {
+
+    EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+
+    if (isLazyPersist) {
+      createFlags.add(LAZY_PERSIST);
+    }
+
+    FSDataOutputStream fos = null;
+    try {
+      fos =
+          fs.create(path,
+              FsPermission.getFileDefault(),
+              createFlags,
+              BUFFER_LENGTH,
+              REPL_FACTOR,
+              BLOCK_SIZE,
+              null);
+
+      // Allocate a block.
+      byte[] buffer = new byte[BUFFER_LENGTH];
+      for (int bytesWritten = 0; bytesWritten < length; ) {
+        fos.write(buffer, 0, buffer.length);
+        bytesWritten += buffer.length;
+      }
+      if (length > 0) {
+        fos.hsync();
+      }
+    } finally {
+      IOUtils.closeQuietly(fos);
+    }
+  }
+
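+  /**
+   * Assert that every block of the file at the given path reports the
+   * expected storage type, and return its located blocks.
+   */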
+  private LocatedBlocks ensureFileReplicasOnStorageType(
+      Path path, StorageType storageType) throws IOException {
+    // Ensure that the block locations returned by the NameNode are correct.
+    LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
+    assertThat(fs.exists(path), is(true));
+    long fileLength = client.getFileInfo(path.toString()).getLen();
+    LocatedBlocks locatedBlocks =
+        client.getLocatedBlocks(path.toString(), 0, fileLength);
+    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+      assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
+    }
+    return locatedBlocks;
+  }
+
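+  /**
+   * Create a file, optionally lazy-persist, whose contents are derived from
+   * the given seed.
+   */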
+  private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
+                                  long seed) throws IOException {
+    DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
+      BLOCK_SIZE, REPL_FACTOR, seed, true);
+  }
+
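+  /** Return true if the file contents match those generated from the seed. */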
+  private boolean verifyReadRandomFile(
+    Path path, int fileLength, int seed) throws IOException {
+    byte[] contents = DFSTestUtil.readFileBuffer(fs, path);
+    byte[] expected =
+      DFSTestUtil.calculateFileContentsFromSeed(seed, fileLength);
+    return Arrays.equals(contents, expected);
+  }
+
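+  /**
+   * Wait for pending async deletions on the DataNode to complete, then verify
+   * that neither the block files nor the meta files of the given blocks
+   * remain on any volume.
+   */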
+  private boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
+    throws IOException, InterruptedException {
+
+    LOG.info("Verifying replica has no saved copy after deletion.");
+    triggerBlockReport();
+
+    while (DataNodeTestUtils.getPendingAsyncDeletions(
+        cluster.getDataNodes().get(0)) > 0L) {
+      Thread.sleep(1000);
+    }
+
+    final String bpid = cluster.getNamesystem().getBlockPoolId();
+    List<? extends FsVolumeSpi> volumes =
+      cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+    // Make sure the deleted replica does not have a copy left in either the
+    // finalized dir of the transient volume or the lazyPersist dir of the
+    // non-transient volume.
+    for (FsVolumeSpi v : volumes) {
+      FsVolumeImpl volume = (FsVolumeImpl) v;
+      File targetDir = (v.isTransientStorage()) ?
+          volume.getBlockPoolSlice(bpid).getFinalizedDir() :
+          volume.getBlockPoolSlice(bpid).getLazypersistDir();
+      if (!verifyBlockDeletedFromDir(targetDir, locatedBlocks)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
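+  /**
+   * Return false if a block file or meta file for any of the given blocks
+   * still exists under the supplied directory.
+   */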
+  private boolean verifyBlockDeletedFromDir(File dir, LocatedBlocks locatedBlocks) {
+
+    for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+      File targetDir =
+        DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
+
+      File blockFile = new File(targetDir, lb.getBlock().getBlockName());
+      if (blockFile.exists()) {
+        LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
+          " exists after deletion.");
+        return false;
+      }
+      File metaFile = new File(targetDir,
+        DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
+          lb.getBlock().getGenerationStamp()));
+      if (metaFile.exists()) {
+        LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
+          " exists after deletion.");
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private void triggerBlockReport()
+    throws IOException, InterruptedException {
+    // Trigger block report to NN
+    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
+    Thread.sleep(10 * 1000);
+  }
+
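+  /**
+   * Writes a set of lazy-persist test files, sets the shared failure flag on
+   * any IOException, and counts down the latch when finished.
+   */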
+  class WriterRunnable implements Runnable {
+    private final int id;
+    private final Path[] paths;
+    private final int seed;
+    private final CountDownLatch latch;
+    private final AtomicBoolean bFail;
+
+    public WriterRunnable(int threadIndex, Path[] paths,
+                          int seed, CountDownLatch latch,
+                          AtomicBoolean bFail) {
+      id = threadIndex;
+      this.paths = paths;
+      this.seed = seed;
+      this.latch = latch;
+      this.bFail = bFail;
+      System.out.println("Creating Writer: " + id);
+    }
+
+    public void run() {
+      System.out.println("Writer " + id + " starting... ");
+      int i = 0;
+      try {
+        for (i = 0; i < paths.length; i++) {
+          makeRandomTestFile(paths[i], BLOCK_SIZE, true, seed);
+          // Eviction may fail while some blocks have not yet been persisted.
+          // ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
+        }
+      } catch (IOException e) {
+        bFail.set(true);
+        LOG.error("Writer exception: writer id:" + id +
+          " testfile: " + paths[i].toString() +
+          " " + e);
+      } finally {
+        latch.countDown();
+      }
+    }
+  }
+
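+  /** Initialize a JMXGet client for the configured JMX service name. */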
+  JMXGet initJMX() throws Exception {
+    JMXGet jmx = new JMXGet();
+    jmx.setService(JMX_SERVICE_NAME);
+    jmx.init();
+    return jmx;
+  }
+
+  void printRamDiskJMXMetrics() {
+    try {
+      jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
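+  /** Assert that the named RAM_DISK JMX metric has the expected value. */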
+  void verifyRamDiskJMXMetric(String metricName, long expectedValue)
+      throws Exception {
+    assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName)));
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
new file mode 100644
index 0000000..b6ac287
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.log4j.Level;
+import org.junit.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
+import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class TestScrLazyPersistFiles {
+  public static final Log LOG = LogFactory.getLog(TestScrLazyPersistFiles.class);
+
+  static {
+    ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  private static short REPL_FACTOR = 1;
+  private static final int BLOCK_SIZE = 10485760;   // 10 MB
+  private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
+  private static final long HEARTBEAT_INTERVAL_SEC = 1;
+  private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
+  private static final int LAZY_WRITER_INTERVAL_SEC = 1;
+  private static final int BUFFER_LENGTH = 4096;
+  private static TemporarySocketDirectory sockDir;
+
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private DFSClient client;
+  private Configuration conf;
+
+  @BeforeClass
+  public static void init() {
+    sockDir = new TemporarySocketDirectory();
+    DomainSocket.disableBindPathValidation();
+  }
+
+  @AfterClass
+  public static void shutdown() throws IOException {
+    sockDir.close();
+  }
+
+  @Before
+  public void before() {
+    Assume.assumeThat(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS,
+        equalTo(true));
+    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+  }
+
+  @After
+  public void shutDownCluster() throws IOException {
+    if (fs != null) {
+      fs.close();
+      fs = null;
+      client = null;
+    }
+
+    if (cluster != null) {
+      cluster.shutdownDataNodes();
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  /**
+   * Read an in-memory block via short-circuit read.
+   * Note: the test uses a RAM_DISK simulated on physical disk.
+   */
+  @Test (timeout=300000)
+  public void testRamDiskShortCircuitRead()
+    throws IOException, InterruptedException {
+    startUpCluster(REPL_FACTOR,
+      new StorageType[]{RAM_DISK, DEFAULT},
+      2 * BLOCK_SIZE - 1, true);  // 1 replica + delta, SCR read
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final int SEED = 0xFADED;
+    Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    makeRandomTestFile(path, BLOCK_SIZE, true, SEED);
+    ensureFileReplicasOnStorageType(path, RAM_DISK);
+
+    // Sleep for a short time to allow the lazy writer thread to do its job
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+    //assertThat(verifyReadRandomFile(path, BLOCK_SIZE, SEED), is(true));
+
+    // Verify the SCR read counters.
+    FSDataInputStream fis = fs.open(path);
+    try {
+      byte[] buf = new byte[BUFFER_LENGTH];
+      fis.read(0, buf, 0, BUFFER_LENGTH);
+      HdfsDataInputStream dfsis = (HdfsDataInputStream) fis;
+      Assert.assertEquals(BUFFER_LENGTH,
+        dfsis.getReadStatistics().getTotalBytesRead());
+      Assert.assertEquals(BUFFER_LENGTH,
+        dfsis.getReadStatistics().getTotalShortCircuitBytesRead());
+    } finally {
+      fis.close();
+      fis = null;
+    }
+  }
+
+  /**
+   * Eviction of lazy persisted blocks with Short Circuit Read handle open
+   * Note: the test uses faked RAM_DISK from physical disk.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=300000)
+  public void testRamDiskEvictionWithShortCircuitReadHandle()
+    throws IOException, InterruptedException {
+    startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
+      (6 * BLOCK_SIZE - 1), true);  // 5 replicas + delta, SCR.
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+    final int SEED = 0xFADED;
+
+    makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    // Sleep for a short time to allow the lazy writer thread to do its job.
+    // However the block replica should not be evicted from RAM_DISK yet.
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+    // No eviction should happen as the free ratio is below the threshold
+    FSDataInputStream fis = fs.open(path1);
+    try {
+      // Keep an open read handle to path1 while creating path2.
+      byte[] buf = new byte[BUFFER_LENGTH];
+      fis.read(0, buf, 0, BUFFER_LENGTH);
+
+      // Create the 2nd file that will trigger RAM_DISK eviction.
+      makeTestFile(path2, BLOCK_SIZE * 2, true);
+      ensureFileReplicasOnStorageType(path2, RAM_DISK);
+
+      // Ensure path1 is still readable from the open SCR handle.
+      fis.read(fis.getPos(), buf, 0, BUFFER_LENGTH);
+      HdfsDataInputStream dfsis = (HdfsDataInputStream) fis;
+      Assert.assertEquals(2 * BUFFER_LENGTH,
+        dfsis.getReadStatistics().getTotalBytesRead());
+      Assert.assertEquals(2 * BUFFER_LENGTH,
+        dfsis.getReadStatistics().getTotalShortCircuitBytesRead());
+    } finally {
+      IOUtils.closeQuietly(fis);
+    }
+
+    // After the open handle is closed, path1 should be evicted to DISK.
+    triggerBlockReport();
+    ensureFileReplicasOnStorageType(path1, DEFAULT);
+  }
+
+  // ---- Utility functions for all test cases -------------------------------
+
+  /**
+   * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
+   * capped. If ramDiskStorageLimit < 0 then it is ignored.
+   */
+  private void startUpCluster(final int numDataNodes,
+                              final StorageType[] storageTypes,
+                              final long ramDiskStorageLimit,
+                              final boolean useSCR)
+    throws IOException {
+
+    conf = new Configuration();
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
+      LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
+    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+      HEARTBEAT_RECHECK_INTERVAL_MSEC);
+    conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+      LAZY_WRITER_INTERVAL_SEC);
+
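+    // Configure short-circuit reads: a unique client context and a per-test
+    // domain socket path, plus block-local path access for the current user.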
+    if (useSCR) {
+      conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, useSCR);
+      conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
+        UUID.randomUUID().toString());
+      conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+        new File(sockDir.getDir(),
+          "TestShortCircuitLocalReadHandle._PORT.sock").getAbsolutePath());
+      conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+        UserGroupInformation.getCurrentUser().getShortUserName());
+    }
+
+    REPL_FACTOR = 1; // Reset in case a test has modified the value.
+
+    cluster = new MiniDFSCluster
+      .Builder(conf)
+      .numDataNodes(numDataNodes)
+      .storageTypes(storageTypes != null ? storageTypes : new StorageType[] { DEFAULT, DEFAULT })
+      .build();
+    fs = cluster.getFileSystem();
+    client = fs.getClient();
+
+    // Artificially cap the storage capacity of the RAM_DISK volume.
+    if (ramDiskStorageLimit >= 0) {
+      List<? extends FsVolumeSpi> volumes =
+        cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+      for (FsVolumeSpi volume : volumes) {
+        if (volume.getStorageType() == RAM_DISK) {
+          ((FsVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
+        }
+      }
+    }
+
+    LOG.info("Cluster startup complete");
+  }
+
+  private void makeTestFile(Path path, long length, final boolean isLazyPersist)
+    throws IOException {
+
+    EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+
+    if (isLazyPersist) {
+      createFlags.add(LAZY_PERSIST);
+    }
+
+    FSDataOutputStream fos = null;
+    try {
+      fos =
+        fs.create(path,
+          FsPermission.getFileDefault(),
+          createFlags,
+          BUFFER_LENGTH,
+          REPL_FACTOR,
+          BLOCK_SIZE,
+          null);
+
+      // Allocate a block.
+      byte[] buffer = new byte[BUFFER_LENGTH];
+      for (int bytesWritten = 0; bytesWritten < length; ) {
+        fos.write(buffer, 0, buffer.length);
+        bytesWritten += buffer.length;
+      }
+      if (length > 0) {
+        fos.hsync();
+      }
+    } finally {
+      IOUtils.closeQuietly(fos);
+    }
+  }
+
+  private LocatedBlocks ensureFileReplicasOnStorageType(
+    Path path, StorageType storageType) throws IOException {
+    // Ensure that the block locations returned by the NameNode are correct.
+    LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
+    assertThat(fs.exists(path), is(true));
+    long fileLength = client.getFileInfo(path.toString()).getLen();
+    LocatedBlocks locatedBlocks =
+      client.getLocatedBlocks(path.toString(), 0, fileLength);
+    for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+      assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
+    }
+    return locatedBlocks;
+  }
+
+  private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
+                                  long seed) throws IOException {
+    DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
+      BLOCK_SIZE, REPL_FACTOR, seed, true);
+  }
+
+  private void triggerBlockReport()
+    throws IOException, InterruptedException {
+    // Trigger block report to NN
+    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
+    Thread.sleep(10 * 1000);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index a870aa9..60c6d03 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -358,7 +358,7 @@
     }
  
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED]);
+      dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED], false);
       Assert.fail("Should not have created a replica that's already " +
       		"finalized " + blocks[FINALIZED]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -376,7 +376,7 @@
     }
 
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY]);
+      dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY], false);
       Assert.fail("Should not have created a replica that had created as " +
       		"temporary " + blocks[TEMPORARY]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -386,7 +386,7 @@
         0L, blocks[RBW].getNumBytes());  // expect to be successful
     
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[RBW]);
+      dataSet.createRbw(StorageType.DEFAULT, blocks[RBW], false);
       Assert.fail("Should not have created a replica that had created as RBW " +
           blocks[RBW]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -402,7 +402,7 @@
     }
 
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[RWR]);
+      dataSet.createRbw(StorageType.DEFAULT, blocks[RWR], false);
       Assert.fail("Should not have created a replica that was waiting to be " +
       		"recovered " + blocks[RWR]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -418,7 +418,7 @@
     }
 
     try {
-      dataSet.createRbw(StorageType.DEFAULT, blocks[RUR]);
+      dataSet.createRbw(StorageType.DEFAULT, blocks[RUR], false);
       Assert.fail("Should not have created a replica that was under recovery " +
           blocks[RUR]);
     } catch (ReplicaAlreadyExistsException e) {
@@ -435,7 +435,7 @@
           e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA));
     }
     
-    dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+    dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
   }
   
   private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
index a6edd80..2dae239 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -66,6 +67,8 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
+
 /**
  * Test the data migration tool (for Archival Storage)
  */
@@ -336,10 +339,10 @@
         Assert.assertTrue(fileStatus.getFullName(parent.toString())
             + " with policy " + policy + " has non-empty overlap: " + diff
             + ", the corresponding block is " + lb.getBlock().getLocalBlock(),
-            diff.removeOverlap());
+            diff.removeOverlap(true));
       }
     }
-    
+
     Replication getReplication(Path file) throws IOException {
       return getOrVerifyReplication(file, null);
     }
@@ -407,17 +410,29 @@
   }
 
   private static StorageType[][] genStorageTypes(int numDataNodes) {
-    return genStorageTypes(numDataNodes, 0, 0);
+    return genStorageTypes(numDataNodes, 0, 0, 0);
   }
 
   private static StorageType[][] genStorageTypes(int numDataNodes,
       int numAllDisk, int numAllArchive) {
+    return genStorageTypes(numDataNodes, numAllDisk, numAllArchive, 0);
+  }
+
+  private static StorageType[][] genStorageTypes(int numDataNodes,
+      int numAllDisk, int numAllArchive, int numRamDisk) {
+    Preconditions.checkArgument(
+      (numAllDisk + numAllArchive + numRamDisk) <= numDataNodes);
+
     StorageType[][] types = new StorageType[numDataNodes][];
     int i = 0;
-    for (; i < numAllDisk; i++) {
+    for (; i < numRamDisk; i++) {
+      types[i] = new StorageType[]{StorageType.RAM_DISK, StorageType.DISK};
+    }
+    for (; i < numRamDisk + numAllDisk; i++) {
       types[i] = new StorageType[]{StorageType.DISK, StorageType.DISK};
     }
-    for (; i < numAllDisk + numAllArchive; i++) {
+    for (; i < numRamDisk + numAllDisk + numAllArchive; i++) {
       types[i] = new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE};
     }
     for (; i < types.length; i++) {
@@ -425,15 +440,19 @@
     }
     return types;
   }
-  
+
   private static long[][] genCapacities(int nDatanodes, int numAllDisk,
-      int numAllArchive, long diskCapacity, long archiveCapacity) {
+      int numAllArchive, int numRamDisk, long diskCapacity,
+      long archiveCapacity, long ramDiskCapacity) {
     final long[][] capacities = new long[nDatanodes][];
     int i = 0;
-    for (; i < numAllDisk; i++) {
+    for (; i < numRamDisk; i++) {
+      capacities[i] = new long[]{ramDiskCapacity, diskCapacity};
+    }
+    for (; i < numRamDisk + numAllDisk; i++) {
       capacities[i] = new long[]{diskCapacity, diskCapacity};
     }
-    for (; i < numAllDisk + numAllArchive; i++) {
+    for (; i < numRamDisk + numAllDisk + numAllArchive; i++) {
       capacities[i] = new long[]{archiveCapacity, archiveCapacity};
     }
     for(; i < capacities.length; i++) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
index 94b139b..3f96c0c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/CreateEditsLog.java
@@ -82,7 +82,7 @@
       }
 
       final INodeFile inode = new INodeFile(inodeId.nextValue(), null,
-          p, 0L, 0L, blocks, replication, blockSize, (byte)0);
+          p, 0L, 0L, blocks, replication, blockSize);
       inode.toUnderConstruction("", "");
 
      // Append path to filename with information about blockIDs 
@@ -97,7 +97,7 @@
         editLog.logMkDir(currentDir, dirInode);
       }
       INodeFile fileUc = new INodeFile(inodeId.nextValue(), null,
-          p, 0L, 0L, BlockInfo.EMPTY_ARRAY, replication, blockSize, (byte)0);
+          p, 0L, 0L, BlockInfo.EMPTY_ARRAY, replication, blockSize);
       fileUc.toUnderConstruction("", "");
       editLog.logOpenFile(filePath, fileUc, false, false);
       editLog.logCloseFile(filePath, inode);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
index cf37a54..6098ebf0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
index 8070a5f..7b62242 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
@@ -194,7 +194,7 @@
 
       for (int i = 0; i < numTransactions; i++) {
         INodeFile inode = new INodeFile(namesystem.allocateNewInodeId(), null,
-            p, 0L, 0L, BlockInfo.EMPTY_ARRAY, replication, blockSize, (byte)0);
+            p, 0L, 0L, BlockInfo.EMPTY_ARRAY, replication, blockSize);
         inode.toUnderConstruction("", "");
 
         editLog.logOpenFile("/filename" + (startIndex + i), inode, false, false);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java
index 9bee4a9b..91931aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSPermissionChecker.java
@@ -432,7 +432,7 @@
       FsPermission.createImmutable(perm));
     INodeFile inodeFile = new INodeFile(INodeId.GRANDFATHER_INODE_ID,
       name.getBytes("UTF-8"), permStatus, 0L, 0L, null, REPLICATION,
-      PREFERRED_BLOCK_SIZE, (byte)0);
+      PREFERRED_BLOCK_SIZE);
     parent.addChild(inodeFile);
     return inodeFile;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
index 8d298ae..e16df16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
@@ -1018,8 +1018,8 @@
     byte storagePolicy = 0;
 
     HdfsFileStatus file = new HdfsFileStatus(length, isDir, blockReplication,
-        blockSize, modTime, accessTime, perms, owner, group, symlink, path,
-        fileId, numChildren, null, storagePolicy);
+        blockSize, modTime, accessTime, perms, owner, group, symlink,
+        path, fileId, numChildren, null, storagePolicy);
     Result res = new Result(conf);
 
     try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
index 26d9a96..4221ad5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
@@ -82,7 +82,7 @@
 
   INodeFile createINodeFile(short replication, long preferredBlockSize) {
     return new INodeFile(INodeId.GRANDFATHER_INODE_ID, null, perm, 0L, 0L,
-        null, replication, preferredBlockSize, (byte)0);
+        null, replication, preferredBlockSize);
   }
 
   private static INodeFile createINodeFile(byte storagePolicyID) {
@@ -283,7 +283,7 @@
     INodeFile[] iNodes = new INodeFile[nCount];
     for (int i = 0; i < nCount; i++) {
       iNodes[i] = new INodeFile(i, null, perm, 0L, 0L, null, replication,
-          preferredBlockSize, (byte)0);
+          preferredBlockSize);
       iNodes[i].setLocalName(DFSUtil.string2Bytes(fileNamePrefix + i));
       BlockInfo newblock = new BlockInfo(replication);
       iNodes[i].addBlock(newblock);
@@ -341,7 +341,7 @@
     {//cast from INodeFileUnderConstruction
       final INode from = new INodeFile(
           INodeId.GRANDFATHER_INODE_ID, null, perm, 0L, 0L, null, replication,
-          1024L, (byte)0);
+          1024L);
       from.asFile().toUnderConstruction("client", "machine");
     
       //cast to INodeFile, should success
@@ -1104,7 +1104,7 @@
   public void testFileUnderConstruction() {
     replication = 3;
     final INodeFile file = new INodeFile(INodeId.GRANDFATHER_INODE_ID, null,
-        perm, 0L, 0L, null, replication, 1024L, (byte)0);
+        perm, 0L, 0L, null, replication, 1024L);
     assertFalse(file.isUnderConstruction());
 
     final String clientName = "client";