Merging r1565757 through r1565853 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5698@1565857 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5698.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5698.txt
new file mode 100644
index 0000000..5858968
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5698.txt
@@ -0,0 +1,48 @@
+HDFS-5698 subtasks
+
+    HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)
+
+    HDFS-5738. Serialize INode information in protobuf. (Haohui Mai via jing9)
+
+    HDFS-5772. Serialize under-construction file information in FSImage. (jing9)
+
+    HDFS-5783. Compute the digest before loading FSImage. (Haohui Mai via jing9)
+
+    HDFS-5785. Serialize symlink in protobuf. (Haohui Mai via jing9)
+
+    HDFS-5793. Optimize the serialization of PermissionStatus. (Haohui Mai via
+    jing9)
+
+    HDFS-5743. Use protobuf to serialize snapshot information. (jing9)
+
+    HDFS-5774. Serialize CachePool directives in protobuf. (Haohui Mai via jing9)
+
+    HDFS-5744. Serialize information for token managers in protobuf. (Haohui Mai
+    via jing9)
+
+    HDFS-5824. Add a Type field in Snapshot DiffEntry's protobuf definition.
+    (jing9)
+
+    HDFS-5808. Implement cancellation when saving FSImage. (Haohui Mai via jing9)
+
+    HDFS-5826. Update the stored edit logs to be consistent with the changes in
+    HDFS-5698 branch. (Haohui Mai via jing9)
+
+    HDFS-5797. Implement offline image viewer. (Haohui Mai via jing9)
+
+    HDFS-5771. Track progress when loading fsimage. (Haohui Mai via cnauroth)
+
+    HDFS-5871. Use PBHelper to serialize CacheDirectiveInfoExpirationProto.
+    (Haohui Mai via jing9)
+
+    HDFS-5884. LoadDelegator should use IOUtils.readFully() to read the magic
+    header. (Haohui Mai via jing9)
+
+    HDFS-5885. Add annotation for repeated fields in the protobuf definition.
+    (Haohui Mai via jing9)
+
+    HDFS-5906. Fixing findbugs and javadoc warnings in the HDFS-5698 branch.
+    (Haohui Mai via jing9)
+
+    HDFS-5911. The id of a CacheDirective instance does not get serialized in 
+    the protobuf-fsimage. (Haohui Mai via jing9)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index 028e64c..70b7e65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -9,6 +9,9 @@
        <Package name="org.apache.hadoop.hdfs.server.namenode.ha.proto" />
      </Match>
      <Match>
+       <Class name="~org.apache.hadoop.hdfs.server.namenode.FsImageProto.*" />
+     </Match>
+     <Match>
        <Package name="org.apache.hadoop.hdfs.qjournal.protocol" />
      </Match>
      <Match>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index 0b1e55d..6cd9fea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -458,6 +458,7 @@
                 <includes>
                   <include>ClientDatanodeProtocol.proto</include>
                   <include>DatanodeProtocol.proto</include>
+                  <include>fsimage.proto</include>
                 </includes>
               </source>
               <output>${project.build.directory}/generated-sources/java</output>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
index fa00cd4..5d823b7 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
@@ -139,7 +139,7 @@
 elif [ "$COMMAND" = "jmxget" ] ; then
   CLASS=org.apache.hadoop.hdfs.tools.JMXGet
 elif [ "$COMMAND" = "oiv" ] ; then
-  CLASS=org.apache.hadoop.hdfs.tools.offlineImageViewer.OfflineImageViewer
+  CLASS=org.apache.hadoop.hdfs.tools.offlineImageViewer.OfflineImageViewerPB
 elif [ "$COMMAND" = "oev" ] ; then
   CLASS=org.apache.hadoop.hdfs.tools.offlineEditsViewer.OfflineEditsViewer
 elif [ "$COMMAND" = "fetchdt" ] ; then
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
index 923ed70..9842b53 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
@@ -112,7 +112,8 @@
     ADD_DATANODE_AND_STORAGE_UUIDS(-49, "Replace StorageID with DatanodeUuid."
         + " Use distinct StorageUuid per storage directory."),
     ADD_LAYOUT_FLAGS(-50, "Add support for layout flags."),
-    CACHING(-51, "Support for cache pools and path-based caching");
+    CACHING(-51, "Support for cache pools and path-based caching"),
+    PROTOBUF_FORMAT(-52, "Use protobuf to serialize FSImage");
 
     final int lv;
     final int ancestorLV;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java
index e291204..b9fce60 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java
@@ -23,12 +23,16 @@
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@@ -46,6 +50,10 @@
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+
 /**
  * A HDFS specific delegation token secret manager.
  * The secret manager is responsible for generating and accepting the password
@@ -167,7 +175,45 @@
     }
     serializerCompat.load(in);
   }
-  
+
+  public static class SecretManagerState {
+    public final SecretManagerSection section;
+    public final List<SecretManagerSection.DelegationKey> keys;
+    public final List<SecretManagerSection.PersistToken> tokens;
+
+    public SecretManagerState(
+        SecretManagerSection s,
+        List<SecretManagerSection.DelegationKey> keys,
+        List<SecretManagerSection.PersistToken> tokens) {
+      this.section = s;
+      this.keys = keys;
+      this.tokens = tokens;
+    }
+  }
+
+  public synchronized void loadSecretManagerState(SecretManagerState state)
+      throws IOException {
+    Preconditions.checkState(!running,
+        "Can't load state from image in a running SecretManager.");
+
+    currentId = state.section.getCurrentId();
+    delegationTokenSequenceNumber = state.section.getTokenSequenceNumber();
+    for (SecretManagerSection.DelegationKey k : state.keys) {
+      addKey(new DelegationKey(k.getId(), k.getExpiryDate(), k.hasKey() ? k
+          .getKey().toByteArray() : null));
+    }
+
+    for (SecretManagerSection.PersistToken t : state.tokens) {
+      DelegationTokenIdentifier id = new DelegationTokenIdentifier(new Text(
+          t.getOwner()), new Text(t.getRenewer()), new Text(t.getRealUser()));
+      id.setIssueDate(t.getIssueDate());
+      id.setMaxDate(t.getMaxDate());
+      id.setSequenceNumber(t.getSequenceNumber());
+      id.setMasterKeyId(t.getMasterKeyId());
+      addPersistedDelegationToken(id, t.getExpiryDate());
+    }
+  }
+
   /**
    * Store the current state of the SecretManager for persistence
    * 
@@ -179,7 +225,43 @@
       String sdPath) throws IOException {
     serializerCompat.save(out, sdPath);
   }
-  
+
+  public synchronized SecretManagerState saveSecretManagerState() {
+    SecretManagerSection s = SecretManagerSection.newBuilder()
+        .setCurrentId(currentId)
+        .setTokenSequenceNumber(delegationTokenSequenceNumber)
+        .setNumKeys(allKeys.size()).setNumTokens(currentTokens.size()).build();
+    ArrayList<SecretManagerSection.DelegationKey> keys = Lists
+        .newArrayListWithCapacity(allKeys.size());
+    ArrayList<SecretManagerSection.PersistToken> tokens = Lists
+        .newArrayListWithCapacity(currentTokens.size());
+
+    for (DelegationKey v : allKeys.values()) {
+      SecretManagerSection.DelegationKey.Builder b = SecretManagerSection.DelegationKey
+          .newBuilder().setId(v.getKeyId()).setExpiryDate(v.getExpiryDate());
+      if (v.getEncodedKey() != null) {
+        b.setKey(ByteString.copyFrom(v.getEncodedKey()));
+      }
+      keys.add(b.build());
+    }
+
+    for (Entry<DelegationTokenIdentifier, DelegationTokenInformation> e : currentTokens
+        .entrySet()) {
+      DelegationTokenIdentifier id = e.getKey();
+      SecretManagerSection.PersistToken.Builder b = SecretManagerSection.PersistToken
+          .newBuilder().setOwner(id.getOwner().toString())
+          .setRenewer(id.getRenewer().toString())
+          .setRealUser(id.getRealUser().toString())
+          .setIssueDate(id.getIssueDate()).setMaxDate(id.getMaxDate())
+          .setSequenceNumber(id.getSequenceNumber())
+          .setMasterKeyId(id.getMasterKeyId())
+          .setExpiryDate(e.getValue().getRenewDate());
+      tokens.add(b.build());
+    }
+
+    return new SecretManagerState(s, keys, tokens);
+  }
+
   /**
    * This method is intended to be used only while reading edit logs.
    * 
@@ -431,4 +513,5 @@
       prog.endStep(Phase.LOADING_FSIMAGE, step);
     }
   }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index ba3936c..de536b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -50,8 +50,10 @@
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.CacheDirective;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -62,11 +64,15 @@
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
@@ -81,6 +87,7 @@
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 
 /**
  * The Cache Manager handles caching on DataNodes.
@@ -167,6 +174,19 @@
    */
   private CacheReplicationMonitor monitor;
 
+  public static final class PersistState {
+    public final CacheManagerSection section;
+    public final List<CachePoolInfoProto> pools;
+    public final List<CacheDirectiveInfoProto> directives;
+
+    public PersistState(CacheManagerSection section,
+        List<CachePoolInfoProto> pools, List<CacheDirectiveInfoProto> directives) {
+      this.section = section;
+      this.pools = pools;
+      this.directives = directives;
+    }
+  }
+
   CacheManager(FSNamesystem namesystem, Configuration conf,
       BlockManager blockManager) {
     this.namesystem = namesystem;
@@ -944,6 +964,64 @@
     serializerCompat.save(out, sdPath);
   }
 
+  public PersistState saveState() throws IOException {
+    ArrayList<CachePoolInfoProto> pools = Lists
+        .newArrayListWithCapacity(cachePools.size());
+    ArrayList<CacheDirectiveInfoProto> directives = Lists
+        .newArrayListWithCapacity(directivesById.size());
+
+    for (CachePool pool : cachePools.values()) {
+      CachePoolInfo p = pool.getInfo(true);
+      CachePoolInfoProto.Builder b = CachePoolInfoProto.newBuilder()
+          .setPoolName(p.getPoolName());
+
+      if (p.getOwnerName() != null)
+        b.setOwnerName(p.getOwnerName());
+
+      if (p.getGroupName() != null)
+        b.setGroupName(p.getGroupName());
+
+      if (p.getMode() != null)
+        b.setMode(p.getMode().toShort());
+
+      if (p.getLimit() != null)
+        b.setLimit(p.getLimit());
+
+      pools.add(b.build());
+    }
+
+    for (CacheDirective directive : directivesById.values()) {
+      CacheDirectiveInfo info = directive.toInfo();
+      CacheDirectiveInfoProto.Builder b = CacheDirectiveInfoProto.newBuilder()
+          .setId(info.getId());
+
+      if (info.getPath() != null) {
+        b.setPath(info.getPath().toUri().getPath());
+      }
+
+      if (info.getReplication() != null) {
+        b.setReplication(info.getReplication());
+      }
+
+      if (info.getPool() != null) {
+        b.setPool(info.getPool());
+      }
+
+      Expiration expiry = info.getExpiration();
+      if (expiry != null) {
+        assert (!expiry.isRelative());
+        b.setExpiration(PBHelper.convert(expiry));
+      }
+
+      directives.add(b.build());
+    }
+    CacheManagerSection s = CacheManagerSection.newBuilder()
+        .setNextDirectiveId(nextDirectiveId).setNumPools(pools.size())
+        .setNumDirectives(directives.size()).build();
+
+    return new PersistState(s, pools, directives);
+  }
+
   /**
    * Reloads CacheManager state from the passed DataInput. Used during namenode
    * startup to restore CacheManager state from an FSImage.
@@ -954,6 +1032,56 @@
     serializerCompat.load(in);
   }
 
+  public void loadState(PersistState s) throws IOException {
+    nextDirectiveId = s.section.getNextDirectiveId();
+    for (CachePoolInfoProto p : s.pools) {
+      CachePoolInfo info = new CachePoolInfo(p.getPoolName());
+      if (p.hasOwnerName())
+        info.setOwnerName(p.getOwnerName());
+
+      if (p.hasGroupName())
+        info.setGroupName(p.getGroupName());
+
+      if (p.hasMode())
+        info.setMode(new FsPermission((short) p.getMode()));
+
+      if (p.hasLimit())
+        info.setLimit(p.getLimit());
+
+      addCachePool(info);
+    }
+
+    for (CacheDirectiveInfoProto p : s.directives) {
+      // Get pool reference by looking it up in the map
+      final String poolName = p.getPool();
+      CacheDirective directive = new CacheDirective(p.getId(), new Path(
+          p.getPath()).toUri().getPath(), (short) p.getReplication(), p
+          .getExpiration().getMillis());
+      addCacheDirective(poolName, directive);
+    }
+  }
+
+  private void addCacheDirective(final String poolName,
+      final CacheDirective directive) throws IOException {
+    CachePool pool = cachePools.get(poolName);
+    if (pool == null) {
+      throw new IOException("Directive refers to pool " + poolName
+          + ", which does not exist.");
+    }
+    boolean addedDirective = pool.getDirectiveList().add(directive);
+    assert addedDirective;
+    if (directivesById.put(directive.getId(), directive) != null) {
+      throw new IOException("A directive with ID " + directive.getId()
+          + " already exists");
+    }
+    List<CacheDirective> directives = directivesByPath.get(directive.getPath());
+    if (directives == null) {
+      directives = new LinkedList<CacheDirective>();
+      directivesByPath.put(directive.getPath(), directives);
+    }
+    directives.add(directive);
+  }
+
   private final class SerializerCompat {
     private void save(DataOutputStream out, String sdPath) throws IOException {
       out.writeLong(nextDirectiveId);
@@ -1036,27 +1164,10 @@
         CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in);
         // Get pool reference by looking it up in the map
         final String poolName = info.getPool();
-        CachePool pool = cachePools.get(poolName);
-        if (pool == null) {
-          throw new IOException("Directive refers to pool " + poolName +
-              ", which does not exist.");
-        }
         CacheDirective directive =
             new CacheDirective(info.getId(), info.getPath().toUri().getPath(),
                 info.getReplication(), info.getExpiration().getAbsoluteMillis());
-        boolean addedDirective = pool.getDirectiveList().add(directive);
-        assert addedDirective;
-        if (directivesById.put(directive.getId(), directive) != null) {
-          throw new IOException("A directive with ID " + directive.getId() +
-              " already exists");
-        }
-        List<CacheDirective> directives =
-            directivesByPath.get(directive.getPath());
-        if (directives == null) {
-          directives = new LinkedList<CacheDirective>();
-          directivesByPath.put(directive.getPath(), directives);
-        }
-        directives.add(directive);
+        addCacheDirective(poolName, directive);
         counter.increment();
       }
       prog.endStep(Phase.LOADING_FSIMAGE, step);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index 166ffb2..6202017 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -797,8 +797,7 @@
    */
   private void loadFSImage(File curFile, MD5Hash expectedMd5,
       FSNamesystem target, MetaRecoveryContext recovery) throws IOException {
-    FSImageFormat.Loader loader = new FSImageFormat.Loader(
-        conf, target);
+    FSImageFormat.LoaderDelegator loader = FSImageFormat.newLoader(conf, target);
     loader.load(curFile);
     target.setBlockPoolId(this.getBlockPoolID());
 
@@ -827,7 +826,7 @@
     File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
     File dstFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE, txid);
     
-    FSImageFormat.Saver saver = new FSImageFormat.Saver(context);
+    FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context);
     FSImageCompression compression = FSImageCompression.createCompression(conf);
     saver.save(newFile, compression);
     
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java
index e0a46f1..872ee74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageCompression.java
@@ -57,6 +57,10 @@
     imageCodec = codec;
   }
 
+  public CompressionCodec getImageCodec() {
+    return imageCodec;
+  }
+
   /**
    * Create a "noop" compression - i.e. uncompressed
    */
@@ -89,7 +93,7 @@
    * Create a compression instance using the codec specified by
    * <code>codecClassName</code>
    */
-  private static FSImageCompression createCompression(Configuration conf,
+  static FSImageCompression createCompression(Configuration conf,
                                                       String codecClassName)
     throws IOException {
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index 3ad258a..bcbad75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -68,12 +68,13 @@
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.StringUtils;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Contains inner classes for reading or writing the on-disk format for
@@ -180,16 +181,74 @@
 @InterfaceStability.Evolving
 public class FSImageFormat {
   private static final Log LOG = FSImage.LOG;
-  
+
   // Static-only class
   private FSImageFormat() {}
-  
+
+  interface AbstractLoader {
+    MD5Hash getLoadedImageMd5();
+    long getLoadedImageTxId();
+  }
+
+  static class LoaderDelegator implements AbstractLoader {
+    private AbstractLoader impl;
+    private final Configuration conf;
+    private final FSNamesystem fsn;
+
+    LoaderDelegator(Configuration conf, FSNamesystem fsn) {
+      this.conf = conf;
+      this.fsn = fsn;
+    }
+
+    @Override
+    public MD5Hash getLoadedImageMd5() {
+      return impl.getLoadedImageMd5();
+    }
+
+    @Override
+    public long getLoadedImageTxId() {
+      return impl.getLoadedImageTxId();
+    }
+
+    public void load(File file) throws IOException {
+      Preconditions.checkState(impl == null, "Image already loaded!");
+
+      FileInputStream is = null;
+      try {
+        is = new FileInputStream(file);
+        byte[] magic = new byte[FSImageUtil.MAGIC_HEADER.length];
+        IOUtils.readFully(is, magic, 0, magic.length);
+        if (Arrays.equals(magic, FSImageUtil.MAGIC_HEADER)) {
+          FSImageFormatProtobuf.Loader loader = new FSImageFormatProtobuf.Loader(
+              conf, fsn);
+          impl = loader;
+          loader.load(file);
+        } else {
+          Loader loader = new Loader(conf, fsn);
+          impl = loader;
+          loader.load(file);
+        }
+
+      } finally {
+        IOUtils.cleanup(LOG, is);
+      }
+    }
+  }
+
+  /**
+   * Construct a loader class to load the image. It chooses the loader based on
+   * the layout version.
+   */
+  public static LoaderDelegator newLoader(Configuration conf, FSNamesystem fsn) {
+    return new LoaderDelegator(conf, fsn);
+  }
+
   /**
    * A one-shot class responsible for loading an image. The load() function
    * should be called once, after which the getter methods may be used to retrieve
    * information about the image that was loaded, if loading was successful.
    */
-  public static class Loader {
+  public static class Loader implements AbstractLoader {
     private final Configuration conf;
     /** which namesystem this loader is working for */
     private final FSNamesystem namesystem;
@@ -214,12 +273,14 @@
      * Return the MD5 checksum of the image that has been loaded.
      * @throws IllegalStateException if load() has not yet been called.
      */
-    MD5Hash getLoadedImageMd5() {
+    @Override
+    public MD5Hash getLoadedImageMd5() {
       checkLoaded();
       return imgDigest;
     }
 
-    long getLoadedImageTxId() {
+    @Override
+    public long getLoadedImageTxId() {
       checkLoaded();
       return imgTxId;
     }
@@ -242,7 +303,7 @@
       }
     }
 
-    void load(File curFile) throws IOException {
+    public void load(File curFile) throws IOException {
       checkNotLoaded();
       assert curFile != null : "curFile is null";
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
new file mode 100644
index 0000000..5ade5ce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.StringMap;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FilesUnderConstructionSection.FileUnderConstructionEntry;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeDirectorySection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.DstReference;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+
+@InterfaceAudience.Private
+public final class FSImageFormatPBINode {
+  private final static long USER_GROUP_STRID_MASK = (1 << 24) - 1;
+  private final static int USER_STRID_OFFSET = 40;
+  private final static int GROUP_STRID_OFFSET = 16;
+  private static final Log LOG = LogFactory.getLog(FSImageFormatProtobuf.class);
+
+  public final static class Loader {
+    public static PermissionStatus loadPermission(long id,
+        final String[] stringTable) {
+      short perm = (short) (id & ((1 << GROUP_STRID_OFFSET) - 1));
+      int gsid = (int) ((id >> GROUP_STRID_OFFSET) & USER_GROUP_STRID_MASK);
+      int usid = (int) ((id >> USER_STRID_OFFSET) & USER_GROUP_STRID_MASK);
+      return new PermissionStatus(stringTable[usid], stringTable[gsid],
+          new FsPermission(perm));
+    }
+
+    public static INodeReference loadINodeReference(
+        INodeSection.INodeReference r, FSDirectory dir) throws IOException {
+      long referredId = r.getReferredId();
+      INode referred = dir.getInode(referredId);
+      WithCount withCount = (WithCount) referred.getParentReference();
+      if (withCount == null) {
+        withCount = new INodeReference.WithCount(null, referred);
+      }
+      final INodeReference ref;
+      if (r.hasDstSnapshotId()) { // DstReference
+        ref = new INodeReference.DstReference(null, withCount,
+            r.getDstSnapshotId());
+      } else {
+        ref = new INodeReference.WithName(null, withCount, r.getName()
+            .toByteArray(), r.getLastSnapshotId());
+      }
+      return ref;
+    }
+
+    public static INodeDirectory loadINodeDirectory(INodeSection.INode n,
+        final String[] stringTable) {
+      assert n.getType() == INodeSection.INode.Type.DIRECTORY;
+      INodeSection.INodeDirectory d = n.getDirectory();
+
+      final PermissionStatus permissions = loadPermission(d.getPermission(),
+          stringTable);
+      final INodeDirectory dir = new INodeDirectory(n.getId(), n.getName()
+          .toByteArray(), permissions, d.getModificationTime());
+
+      final long nsQuota = d.getNsQuota(), dsQuota = d.getDsQuota();
+      if (nsQuota >= 0 || dsQuota >= 0) {
+        dir.addDirectoryWithQuotaFeature(nsQuota, dsQuota);
+      }
+      return dir;
+    }
+
+    public static void updateBlocksMap(INodeFile file, BlockManager bm) {
+      // Add file->block mapping
+      final BlockInfo[] blocks = file.getBlocks();
+      if (blocks != null) {
+        for (int i = 0; i < blocks.length; i++) {
+          file.setBlock(i, bm.addBlockCollection(blocks[i], file));
+        }
+      }
+    }
+
+    private final FSDirectory dir;
+    private final FSNamesystem fsn;
+    private final FSImageFormatProtobuf.Loader parent;
+
+    Loader(FSNamesystem fsn, final FSImageFormatProtobuf.Loader parent) {
+      this.fsn = fsn;
+      this.dir = fsn.dir;
+      this.parent = parent;
+    }
+
+    void loadINodeDirectorySection(InputStream in) throws IOException {
+      while (true) {
+        INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
+            .parseDelimitedFrom(in);
+        // note that in is a LimitedInputStream
+        if (e == null) {
+          break;
+        }
+        INodeDirectory p = dir.getInode(e.getParent()).asDirectory();
+        for (long id : e.getChildrenList()) {
+          INode child = dir.getInode(id);
+          addToParent(p, child);
+        }
+        for (int i = 0; i < e.getNumOfRef(); i++) {
+          INodeReference ref = loadINodeReference(in);
+          addToParent(p, ref);
+        }
+      }
+    }
+
+    private INodeReference loadINodeReference(InputStream in)
+        throws IOException {
+      INodeSection.INodeReference ref = INodeSection.INodeReference
+          .parseDelimitedFrom(in);
+      return loadINodeReference(ref, dir);
+    }
+
+    void loadINodeSection(InputStream in) throws IOException {
+      INodeSection s = INodeSection.parseDelimitedFrom(in);
+      fsn.resetLastInodeId(s.getLastInodeId());
+      LOG.info("Loading " + s.getNumInodes() + " INodes.");
+      for (int i = 0; i < s.getNumInodes(); ++i) {
+        INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
+        if (p.getId() == INodeId.ROOT_INODE_ID) {
+          loadRootINode(p);
+        } else {
+          INode n = loadINode(p);
+          dir.addToInodeMap(n);
+        }
+      }
+    }
+
+    /**
+     * Load the under-construction files section, and update the lease map
+     */
+    void loadFilesUnderConstructionSection(InputStream in) throws IOException {
+      while (true) {
+        FileUnderConstructionEntry entry = FileUnderConstructionEntry
+            .parseDelimitedFrom(in);
+        if (entry == null) {
+          break;
+        }
+        // update the lease manager
+        INodeFile file = dir.getInode(entry.getInodeId()).asFile();
+        FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
+        Preconditions.checkState(uc != null); // file must be under-construction
+        fsn.leaseManager.addLease(uc.getClientName(), entry.getFullPath());
+      }
+    }
+
+    private void addToParent(INodeDirectory parent, INode child) {
+      if (parent == dir.rootDir && FSDirectory.isReservedName(child)) {
+        throw new HadoopIllegalArgumentException("File name \""
+            + child.getLocalName() + "\" is reserved. Please "
+            + " change the name of the existing file or directory to another "
+            + "name before upgrading to this release.");
+      }
+      // NOTE: This does not update space counts for parents
+      if (!parent.addChild(child)) {
+        return;
+      }
+      dir.cacheName(child);
+
+      if (child.isFile()) {
+        updateBlocksMap(child.asFile(), fsn.getBlockManager());
+      }
+    }
+
+    private INode loadINode(INodeSection.INode n) {
+      switch (n.getType()) {
+      case FILE:
+        return loadINodeFile(n);
+      case DIRECTORY:
+        return loadINodeDirectory(n, parent.getStringTable());
+      case SYMLINK:
+        return loadINodeSymlink(n);
+      default:
+        break;
+      }
+      return null;
+    }
+
+    private INodeFile loadINodeFile(INodeSection.INode n) {
+      assert n.getType() == INodeSection.INode.Type.FILE;
+      INodeSection.INodeFile f = n.getFile();
+      List<BlockProto> bp = f.getBlocksList();
+      short replication = (short) f.getReplication();
+
+      BlockInfo[] blocks = new BlockInfo[bp.size()];
+      for (int i = 0, e = bp.size(); i < e; ++i) {
+        blocks[i] = new BlockInfo(PBHelper.convert(bp.get(i)), replication);
+      }
+      final PermissionStatus permissions = loadPermission(f.getPermission(),
+          parent.getStringTable());
+
+      final INodeFile file = new INodeFile(n.getId(),
+          n.getName().toByteArray(), permissions, f.getModificationTime(),
+          f.getAccessTime(), blocks, replication, f.getPreferredBlockSize());
+      // under-construction information
+      if (f.hasFileUC()) {
+        INodeSection.FileUnderConstructionFeature uc = f.getFileUC();
+        file.toUnderConstruction(uc.getClientName(), uc.getClientMachine(),
+            null);
+        if (blocks.length > 0) {
+          BlockInfo lastBlk = file.getLastBlock();
+          // replace the last block of file
+          file.setBlock(file.numBlocks() - 1, new BlockInfoUnderConstruction(
+              lastBlk, replication));
+        }
+      }
+      return file;
+    }
+
+
+    private INodeSymlink loadINodeSymlink(INodeSection.INode n) {
+      assert n.getType() == INodeSection.INode.Type.SYMLINK;
+      INodeSection.INodeSymlink s = n.getSymlink();
+      final PermissionStatus permissions = loadPermission(s.getPermission(),
+          parent.getStringTable());
+      return new INodeSymlink(n.getId(), n.getName().toByteArray(), permissions,
+          0, 0, s.getTarget().toStringUtf8());
+    }
+
+    private void loadRootINode(INodeSection.INode p) {
+      INodeDirectory root = loadINodeDirectory(p, parent.getStringTable());
+      final Quota.Counts q = root.getQuotaCounts();
+      final long nsQuota = q.get(Quota.NAMESPACE);
+      final long dsQuota = q.get(Quota.DISKSPACE);
+      if (nsQuota != -1 || dsQuota != -1) {
+        dir.rootDir.getDirectoryWithQuotaFeature().setQuota(nsQuota, dsQuota);
+      }
+      dir.rootDir.cloneModificationTime(root);
+      dir.rootDir.clonePermissionStatus(root);
+    }
+  }
+
+  public final static class Saver {
+    private static long buildPermissionStatus(INodeAttributes n,
+        final StringMap stringMap) {
+      long userId = stringMap.getStringId(n.getUserName());
+      long groupId = stringMap.getStringId(n.getGroupName());
+      return ((userId & USER_GROUP_STRID_MASK) << USER_STRID_OFFSET)
+          | ((groupId & USER_GROUP_STRID_MASK) << GROUP_STRID_OFFSET)
+          | n.getFsPermissionShort();
+    }
+
+    public static INodeSection.INodeFile.Builder buildINodeFile(
+        INodeFileAttributes file, final StringMap stringMap) {
+      INodeSection.INodeFile.Builder b = INodeSection.INodeFile.newBuilder()
+          .setAccessTime(file.getAccessTime())
+          .setModificationTime(file.getModificationTime())
+          .setPermission(buildPermissionStatus(file, stringMap))
+          .setPreferredBlockSize(file.getPreferredBlockSize())
+          .setReplication(file.getFileReplication());
+      return b;
+    }
+
+    public static INodeSection.INodeDirectory.Builder buildINodeDirectory(
+        INodeDirectoryAttributes dir, final StringMap stringMap) {
+      Quota.Counts quota = dir.getQuotaCounts();
+      INodeSection.INodeDirectory.Builder b = INodeSection.INodeDirectory
+          .newBuilder().setModificationTime(dir.getModificationTime())
+          .setNsQuota(quota.get(Quota.NAMESPACE))
+          .setDsQuota(quota.get(Quota.DISKSPACE))
+          .setPermission(buildPermissionStatus(dir, stringMap));
+      return b;
+    }
+
+    public static INodeSection.INodeReference.Builder buildINodeReference(
+        INodeReference ref) throws IOException {
+      INodeSection.INodeReference.Builder rb = INodeSection.INodeReference
+          .newBuilder().setReferredId(ref.getId());
+      if (ref instanceof WithName) {
+        rb.setLastSnapshotId(((WithName) ref).getLastSnapshotId()).setName(
+            ByteString.copyFrom(ref.getLocalNameBytes()));
+      } else if (ref instanceof DstReference) {
+        rb.setDstSnapshotId(((DstReference) ref).getDstSnapshotId());
+      }
+      return rb;
+    }
+
+    private final FSNamesystem fsn;
+    private final FileSummary.Builder summary;
+    private final SaveNamespaceContext context;
+    private final FSImageFormatProtobuf.Saver parent;
+
+    Saver(FSImageFormatProtobuf.Saver parent, FileSummary.Builder summary) {
+      this.parent = parent;
+      this.summary = summary;
+      this.context = parent.getContext();
+      this.fsn = context.getSourceNamesystem();
+    }
+
+    void serializeINodeDirectorySection(OutputStream out) throws IOException {
+      Iterator<INodeWithAdditionalFields> iter = fsn.getFSDirectory()
+          .getINodeMap().getMapIterator();
+      int i = 0;
+      while (iter.hasNext()) {
+        INodeWithAdditionalFields n = iter.next();
+        if (!n.isDirectory()) {
+          continue;
+        }
+
+        ReadOnlyList<INode> children = n.asDirectory().getChildrenList(
+            Snapshot.CURRENT_STATE_ID);
+        if (children.size() > 0) {
+          INodeDirectorySection.DirEntry.Builder b = INodeDirectorySection.
+              DirEntry.newBuilder().setParent(n.getId());
+          List<INodeReference> refs = new ArrayList<INodeReference>();
+          for (INode inode : children) {
+            if (!inode.isReference()) {
+              b.addChildren(inode.getId());
+            } else {
+              refs.add(inode.asReference());
+            }
+          }
+          b.setNumOfRef(refs.size());
+          INodeDirectorySection.DirEntry e = b.build();
+          e.writeDelimitedTo(out);
+          for (INodeReference ref : refs) {
+            INodeSection.INodeReference.Builder rb = buildINodeReference(ref);
+            rb.build().writeDelimitedTo(out);
+          }
+        }
+
+        ++i;
+        if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
+          context.checkCancelled();
+        }
+      }
+      parent.commitSection(summary,
+          FSImageFormatProtobuf.SectionName.INODE_DIR);
+    }
+
+    void serializeINodeSection(OutputStream out) throws IOException {
+      INodeMap inodesMap = fsn.dir.getINodeMap();
+
+      INodeSection.Builder b = INodeSection.newBuilder()
+          .setLastInodeId(fsn.getLastInodeId()).setNumInodes(inodesMap.size());
+      INodeSection s = b.build();
+      s.writeDelimitedTo(out);
+
+      int i = 0;
+      Iterator<INodeWithAdditionalFields> iter = inodesMap.getMapIterator();
+      while (iter.hasNext()) {
+        INodeWithAdditionalFields n = iter.next();
+        save(out, n);
+        ++i;
+        if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
+          context.checkCancelled();
+        }
+      }
+      parent.commitSection(summary, FSImageFormatProtobuf.SectionName.INODE);
+    }
+
+    void serializeFilesUCSection(OutputStream out) throws IOException {
+      Map<String, INodeFile> ucMap = fsn.getFilesUnderConstruction();
+      for (Map.Entry<String, INodeFile> entry : ucMap.entrySet()) {
+        String path = entry.getKey();
+        INodeFile file = entry.getValue();
+        FileUnderConstructionEntry.Builder b = FileUnderConstructionEntry
+            .newBuilder().setInodeId(file.getId()).setFullPath(path);
+        FileUnderConstructionEntry e = b.build();
+        e.writeDelimitedTo(out);
+      }
+      parent.commitSection(summary,
+          FSImageFormatProtobuf.SectionName.FILES_UNDERCONSTRUCTION);
+    }
+
+    private void save(OutputStream out, INode n) throws IOException {
+      if (n.isDirectory()) {
+        save(out, n.asDirectory());
+      } else if (n.isFile()) {
+        save(out, n.asFile());
+      } else if (n.isSymlink()) {
+        save(out, n.asSymlink());
+      }
+    }
+
+    private void save(OutputStream out, INodeDirectory n) throws IOException {
+      INodeSection.INodeDirectory.Builder b = buildINodeDirectory(n,
+          parent.getStringMap());
+      INodeSection.INode r = buildINodeCommon(n)
+          .setType(INodeSection.INode.Type.DIRECTORY).setDirectory(b).build();
+      r.writeDelimitedTo(out);
+    }
+
+    private void save(OutputStream out, INodeFile n) throws IOException {
+      INodeSection.INodeFile.Builder b = buildINodeFile(n,
+          parent.getStringMap());
+
+      for (Block block : n.getBlocks()) {
+        b.addBlocks(PBHelper.convert(block));
+      }
+
+      FileUnderConstructionFeature uc = n.getFileUnderConstructionFeature();
+      if (uc != null) {
+        INodeSection.FileUnderConstructionFeature f =
+            INodeSection.FileUnderConstructionFeature
+            .newBuilder().setClientName(uc.getClientName())
+            .setClientMachine(uc.getClientMachine()).build();
+        b.setFileUC(f);
+      }
+
+      INodeSection.INode r = buildINodeCommon(n)
+          .setType(INodeSection.INode.Type.FILE).setFile(b).build();
+      r.writeDelimitedTo(out);
+    }
+
+    private void save(OutputStream out, INodeSymlink n) throws IOException {
+      INodeSection.INodeSymlink.Builder b = INodeSection.INodeSymlink
+          .newBuilder()
+          .setPermission(buildPermissionStatus(n, parent.getStringMap()))
+          .setTarget(ByteString.copyFrom(n.getSymlink()));
+      INodeSection.INode r = buildINodeCommon(n)
+          .setType(INodeSection.INode.Type.SYMLINK).setSymlink(b).build();
+      r.writeDelimitedTo(out);
+    }
+
+    private final INodeSection.INode.Builder buildINodeCommon(INode n) {
+      return INodeSection.INode.newBuilder()
+          .setId(n.getId())
+          .setName(ByteString.copyFrom(n.getLocalNameBytes()));
+    }
+  }
+
+  private FSImageFormatPBINode() {
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
new file mode 100644
index 0000000..2edc57b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.StringTableSection;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FSImageFormatPBSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressorStream;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.LimitInputStream;
+import com.google.protobuf.CodedOutputStream;
+
+/**
+ * Utility class to read / write fsimage in protobuf format.
+ */
+@InterfaceAudience.Private
+public final class FSImageFormatProtobuf {
+  private static final Log LOG = LogFactory.getLog(FSImageFormatProtobuf.class);
+
+  public static final class Loader implements FSImageFormat.AbstractLoader {
+    static final int MINIMUM_FILE_LENGTH = 8;
+    private final Configuration conf;
+    private final FSNamesystem fsn;
+
+    private String[] stringTable;
+
+    /** The MD5 sum of the loaded file */
+    private MD5Hash imgDigest;
+    /** The transaction ID of the last edit represented by the loaded file */
+    private long imgTxId;
+
+    Loader(Configuration conf, FSNamesystem fsn) {
+      this.conf = conf;
+      this.fsn = fsn;
+    }
+
+    @Override
+    public MD5Hash getLoadedImageMd5() {
+      return imgDigest;
+    }
+
+    @Override
+    public long getLoadedImageTxId() {
+      return imgTxId;
+    }
+
+    public String[] getStringTable() {
+      return stringTable;
+    }
+
+    void load(File file) throws IOException {
+      long start = System.currentTimeMillis();
+      imgDigest = MD5FileUtils.computeMd5ForFile(file);
+      RandomAccessFile raFile = new RandomAccessFile(file, "r");
+      FileInputStream fin = new FileInputStream(file);
+      try {
+        loadInternal(raFile, fin);
+        long end = System.currentTimeMillis();
+        LOG.info("Loaded FSImage in " + (end - start) / 1000 + " seconds.");
+      } finally {
+        fin.close();
+        raFile.close();
+      }
+    }
+
+    private void loadInternal(RandomAccessFile raFile, FileInputStream fin)
+        throws IOException {
+      if (!FSImageUtil.checkFileFormat(raFile)) {
+        throw new IOException("Unrecognized file format");
+      }
+      FileSummary summary = FSImageUtil.loadSummary(raFile);
+
+      FileChannel channel = fin.getChannel();
+
+      FSImageFormatPBINode.Loader inodeLoader = new FSImageFormatPBINode.Loader(
+          fsn, this);
+      FSImageFormatPBSnapshot.Loader snapshotLoader = new FSImageFormatPBSnapshot.Loader(
+          fsn, this);
+
+      ArrayList<FileSummary.Section> sections = Lists.newArrayList(summary
+          .getSectionsList());
+      Collections.sort(sections, new Comparator<FileSummary.Section>() {
+        @Override
+        public int compare(FileSummary.Section s1, FileSummary.Section s2) {
+          SectionName n1 = SectionName.fromString(s1.getName());
+          SectionName n2 = SectionName.fromString(s2.getName());
+          if (n1 == null) {
+            return n2 == null ? 0 : -1;
+          } else if (n2 == null) {
+            return -1;
+          } else {
+            return n1.ordinal() - n2.ordinal();
+          }
+        }
+      });
+
+      StartupProgress prog = NameNode.getStartupProgress();
+      /**
+       * beginStep() and the endStep() calls do not match the boundary of the
+       * sections. This is because that the current implementation only allows
+       * a particular step to be started for once.
+       */
+      Step currentStep = null;
+
+      for (FileSummary.Section s : sections) {
+        channel.position(s.getOffset());
+        InputStream in = new BufferedInputStream(new LimitInputStream(fin,
+            s.getLength()));
+
+        in = FSImageUtil.wrapInputStreamForCompression(conf,
+            summary.getCodec(), in);
+
+        String n = s.getName();
+
+        switch (SectionName.fromString(n)) {
+        case NS_INFO:
+          loadNameSystemSection(in);
+          break;
+        case STRING_TABLE:
+          loadStringTableSection(in);
+          break;
+        case INODE: {
+          currentStep = new Step(StepType.INODES);
+          prog.beginStep(Phase.LOADING_FSIMAGE, currentStep);
+          inodeLoader.loadINodeSection(in);
+        }
+          break;
+        case INODE_DIR:
+          inodeLoader.loadINodeDirectorySection(in);
+          break;
+        case FILES_UNDERCONSTRUCTION:
+          inodeLoader.loadFilesUnderConstructionSection(in);
+          break;
+        case SNAPSHOT:
+          snapshotLoader.loadSnapshotSection(in);
+          break;
+        case SNAPSHOT_DIFF:
+          snapshotLoader.loadSnapshotDiffSection(in);
+          break;
+        case SECRET_MANAGER: {
+          prog.endStep(Phase.LOADING_FSIMAGE, currentStep);
+          Step step = new Step(StepType.DELEGATION_TOKENS);
+          prog.beginStep(Phase.LOADING_FSIMAGE, step);
+          loadSecretManagerSection(in);
+          prog.endStep(Phase.LOADING_FSIMAGE, step);
+        }
+          break;
+        case CACHE_MANAGER: {
+          Step step = new Step(StepType.CACHE_POOLS);
+          prog.beginStep(Phase.LOADING_FSIMAGE, step);
+          loadCacheManagerSection(in);
+          prog.endStep(Phase.LOADING_FSIMAGE, step);
+        }
+          break;
+        default:
+          LOG.warn("Unregconized section " + n);
+          break;
+        }
+      }
+    }
+
+    private void loadNameSystemSection(InputStream in) throws IOException {
+      NameSystemSection s = NameSystemSection.parseDelimitedFrom(in);
+      fsn.setGenerationStampV1(s.getGenstampV1());
+      fsn.setGenerationStampV2(s.getGenstampV2());
+      fsn.setGenerationStampV1Limit(s.getGenstampV1Limit());
+      fsn.setLastAllocatedBlockId(s.getLastAllocatedBlockId());
+      imgTxId = s.getTransactionId();
+    }
+
+    private void loadStringTableSection(InputStream in) throws IOException {
+      StringTableSection s = StringTableSection.parseDelimitedFrom(in);
+      stringTable = new String[s.getNumEntry() + 1];
+      for (int i = 0; i < s.getNumEntry(); ++i) {
+        StringTableSection.Entry e = StringTableSection.Entry
+            .parseDelimitedFrom(in);
+        stringTable[e.getId()] = e.getStr();
+      }
+    }
+
+    private void loadSecretManagerSection(InputStream in) throws IOException {
+      SecretManagerSection s = SecretManagerSection.parseDelimitedFrom(in);
+      int numKeys = s.getNumKeys(), numTokens = s.getNumTokens();
+      ArrayList<SecretManagerSection.DelegationKey> keys = Lists
+          .newArrayListWithCapacity(numKeys);
+      ArrayList<SecretManagerSection.PersistToken> tokens = Lists
+          .newArrayListWithCapacity(numTokens);
+
+      for (int i = 0; i < numKeys; ++i)
+        keys.add(SecretManagerSection.DelegationKey.parseDelimitedFrom(in));
+
+      for (int i = 0; i < numTokens; ++i)
+        tokens.add(SecretManagerSection.PersistToken.parseDelimitedFrom(in));
+
+      fsn.loadSecretManagerState(s, keys, tokens);
+    }
+
+    private void loadCacheManagerSection(InputStream in) throws IOException {
+      CacheManagerSection s = CacheManagerSection.parseDelimitedFrom(in);
+      ArrayList<CachePoolInfoProto> pools = Lists.newArrayListWithCapacity(s
+          .getNumPools());
+      ArrayList<CacheDirectiveInfoProto> directives = Lists
+          .newArrayListWithCapacity(s.getNumDirectives());
+      for (int i = 0; i < s.getNumPools(); ++i)
+        pools.add(CachePoolInfoProto.parseDelimitedFrom(in));
+      for (int i = 0; i < s.getNumDirectives(); ++i)
+        directives.add(CacheDirectiveInfoProto.parseDelimitedFrom(in));
+      fsn.getCacheManager().loadState(
+          new CacheManager.PersistState(s, pools, directives));
+    }
+
+  }
+
+  public static final class Saver {
+    private final SaveNamespaceContext context;
+    private long currentOffset = FSImageUtil.MAGIC_HEADER.length;
+    private MD5Hash savedDigest;
+    private StringMap stringMap = new StringMap();
+
+    private FileChannel fileChannel;
+    // OutputStream for the section data
+    private OutputStream sectionOutputStream;
+    private CompressionCodec codec;
+    private OutputStream underlyingOutputStream;
+    public static final int CHECK_CANCEL_INTERVAL = 4096;
+
+    Saver(SaveNamespaceContext context) {
+      this.context = context;
+    }
+
+    public MD5Hash getSavedDigest() {
+      return savedDigest;
+    }
+
+    public SaveNamespaceContext getContext() {
+      return context;
+    }
+
+    public void commitSection(FileSummary.Builder summary, SectionName name)
+        throws IOException {
+      long oldOffset = currentOffset;
+      flushSectionOutputStream();
+
+      if (codec != null) {
+        sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
+      } else {
+        sectionOutputStream = underlyingOutputStream;
+      }
+      long length = fileChannel.position() - oldOffset;
+      summary.addSections(FileSummary.Section.newBuilder().setName(name.name)
+          .setLength(length).setOffset(currentOffset));
+      currentOffset += length;
+    }
+
+    private void flushSectionOutputStream() throws IOException {
+      if (codec != null) {
+        ((CompressorStream) sectionOutputStream).finish();
+      }
+      sectionOutputStream.flush();
+    }
+
+    void save(File file, FSImageCompression compression) throws IOException {
+      FileOutputStream fout = new FileOutputStream(file);
+      fileChannel = fout.getChannel();
+      try {
+        saveInternal(fout, compression, file.getAbsolutePath().toString());
+      } finally {
+        fout.close();
+      }
+    }
+
+    private static void saveFileSummary(OutputStream out, FileSummary summary)
+        throws IOException {
+      summary.writeDelimitedTo(out);
+      int length = getOndiskTrunkSize(summary);
+      byte[] lengthBytes = new byte[4];
+      ByteBuffer.wrap(lengthBytes).asIntBuffer().put(length);
+      out.write(lengthBytes);
+    }
+
+    private void saveInodes(FileSummary.Builder summary) throws IOException {
+      FSImageFormatPBINode.Saver saver = new FSImageFormatPBINode.Saver(this,
+          summary);
+
+      saver.serializeINodeSection(sectionOutputStream);
+      saver.serializeINodeDirectorySection(sectionOutputStream);
+      saver.serializeFilesUCSection(sectionOutputStream);
+    }
+
+    private void saveSnapshots(FileSummary.Builder summary) throws IOException {
+      FSImageFormatPBSnapshot.Saver snapshotSaver = new FSImageFormatPBSnapshot.Saver(
+          this, summary, context, context.getSourceNamesystem());
+
+      snapshotSaver.serializeSnapshotSection(sectionOutputStream);
+      snapshotSaver.serializeSnapshotDiffSection(sectionOutputStream);
+    }
+
+    private void saveInternal(FileOutputStream fout,
+        FSImageCompression compression, String filePath) throws IOException {
+      StartupProgress prog = NameNode.getStartupProgress();
+      MessageDigest digester = MD5Hash.getDigester();
+
+      underlyingOutputStream = new DigestOutputStream(new BufferedOutputStream(
+          fout), digester);
+      underlyingOutputStream.write(FSImageUtil.MAGIC_HEADER);
+
+      fileChannel = fout.getChannel();
+
+      FileSummary.Builder b = FileSummary.newBuilder()
+          .setOndiskVersion(FSImageUtil.FILE_VERSION)
+          .setLayoutVersion(LayoutVersion.getCurrentLayoutVersion());
+
+      codec = compression.getImageCodec();
+      if (codec != null) {
+        b.setCodec(codec.getClass().getCanonicalName());
+        sectionOutputStream = codec.createOutputStream(underlyingOutputStream);
+      } else {
+        sectionOutputStream = underlyingOutputStream;
+      }
+
+      saveNameSystemSection(b);
+      // Check for cancellation right after serializing the name system section.
+      // Some unit tests, such as TestSaveNamespace#testCancelSaveNameSpace
+      // depends on this behavior.
+      context.checkCancelled();
+
+      Step step = new Step(StepType.INODES, filePath);
+      prog.beginStep(Phase.SAVING_CHECKPOINT, step);
+      saveInodes(b);
+      saveSnapshots(b);
+      prog.endStep(Phase.SAVING_CHECKPOINT, step);
+
+      step = new Step(StepType.DELEGATION_TOKENS, filePath);
+      prog.beginStep(Phase.SAVING_CHECKPOINT, step);
+      saveSecretManagerSection(b);
+      prog.endStep(Phase.SAVING_CHECKPOINT, step);
+
+      step = new Step(StepType.CACHE_POOLS, filePath);
+      prog.beginStep(Phase.SAVING_CHECKPOINT, step);
+      saveCacheManagerSection(b);
+      prog.endStep(Phase.SAVING_CHECKPOINT, step);
+
+      saveStringTableSection(b);
+
+      // We use the underlyingOutputStream to write the header. Therefore flush
+      // the buffered stream (which is potentially compressed) first.
+      flushSectionOutputStream();
+
+      FileSummary summary = b.build();
+      saveFileSummary(underlyingOutputStream, summary);
+      underlyingOutputStream.close();
+      savedDigest = new MD5Hash(digester.digest());
+    }
+
+    private void saveSecretManagerSection(FileSummary.Builder summary)
+        throws IOException {
+      final FSNamesystem fsn = context.getSourceNamesystem();
+      DelegationTokenSecretManager.SecretManagerState state = fsn
+          .saveSecretManagerState();
+      state.section.writeDelimitedTo(sectionOutputStream);
+      for (SecretManagerSection.DelegationKey k : state.keys)
+        k.writeDelimitedTo(sectionOutputStream);
+
+      for (SecretManagerSection.PersistToken t : state.tokens)
+        t.writeDelimitedTo(sectionOutputStream);
+
+      commitSection(summary, SectionName.SECRET_MANAGER);
+    }
+
+    private void saveCacheManagerSection(FileSummary.Builder summary)
+        throws IOException {
+      final FSNamesystem fsn = context.getSourceNamesystem();
+      CacheManager.PersistState state = fsn.getCacheManager().saveState();
+      state.section.writeDelimitedTo(sectionOutputStream);
+
+      for (CachePoolInfoProto p : state.pools)
+        p.writeDelimitedTo(sectionOutputStream);
+
+      for (CacheDirectiveInfoProto p : state.directives)
+        p.writeDelimitedTo(sectionOutputStream);
+
+      commitSection(summary, SectionName.CACHE_MANAGER);
+    }
+
+    private void saveNameSystemSection(FileSummary.Builder summary)
+        throws IOException {
+      final FSNamesystem fsn = context.getSourceNamesystem();
+      OutputStream out = sectionOutputStream;
+      NameSystemSection.Builder b = NameSystemSection.newBuilder()
+          .setGenstampV1(fsn.getGenerationStampV1())
+          .setGenstampV1Limit(fsn.getGenerationStampV1Limit())
+          .setGenstampV2(fsn.getGenerationStampV2())
+          .setLastAllocatedBlockId(fsn.getLastAllocatedBlockId())
+          .setTransactionId(context.getTxId());
+
+      // We use the non-locked version of getNamespaceInfo here since
+      // the coordinating thread of saveNamespace already has read-locked
+      // the namespace for us. If we attempt to take another readlock
+      // from the actual saver thread, there's a potential of a
+      // fairness-related deadlock. See the comments on HDFS-2223.
+      b.setNamespaceId(fsn.unprotectedGetNamespaceInfo().getNamespaceID());
+      NameSystemSection s = b.build();
+      s.writeDelimitedTo(out);
+
+      commitSection(summary, SectionName.NS_INFO);
+    }
+
+    private void saveStringTableSection(FileSummary.Builder summary)
+        throws IOException {
+      OutputStream out = sectionOutputStream;
+      StringTableSection.Builder b = StringTableSection.newBuilder()
+          .setNumEntry(stringMap.size());
+      b.build().writeDelimitedTo(out);
+      for (Entry<String, Integer> e : stringMap.entrySet()) {
+        StringTableSection.Entry.Builder eb = StringTableSection.Entry
+            .newBuilder().setId(e.getValue()).setStr(e.getKey());
+        eb.build().writeDelimitedTo(out);
+      }
+      commitSection(summary, SectionName.STRING_TABLE);
+    }
+
+    public StringMap getStringMap() {
+      return stringMap;
+    }
+  }
+
+  public static class StringMap {
+    private final Map<String, Integer> stringMap;
+
+    public StringMap() {
+      stringMap = Maps.newHashMap();
+    }
+
+    int getStringId(String str) {
+      if (str == null) {
+        return 0;
+      }
+      Integer v = stringMap.get(str);
+      if (v == null) {
+        int nv = stringMap.size() + 1;
+        stringMap.put(str, nv);
+        return nv;
+      }
+      return v;
+    }
+
+    int size() {
+      return stringMap.size();
+    }
+
+    Set<Entry<String, Integer>> entrySet() {
+      return stringMap.entrySet();
+    }
+  }
+
+  /**
+   * Supported section name. The order of the enum determines the order of
+   * loading.
+   */
+  public enum SectionName {
+    NS_INFO("NS_INFO"),
+    STRING_TABLE("STRING_TABLE"),
+    INODE("INODE"),
+    SNAPSHOT("SNAPSHOT"),
+    INODE_DIR("INODE_DIR"),
+    FILES_UNDERCONSTRUCTION("FILES_UNDERCONSTRUCTION"),
+    SNAPSHOT_DIFF("SNAPSHOT_DIFF"),
+    SECRET_MANAGER("SECRET_MANAGER"),
+    CACHE_MANAGER("CACHE_MANAGER");
+
+    private static final SectionName[] values = SectionName.values();
+
+    public static SectionName fromString(String name) {
+      for (SectionName n : values) {
+        if (n.name.equals(name))
+          return n;
+      }
+      return null;
+    }
+
+    private final String name;
+
+    private SectionName(String name) {
+      this.name = name;
+    }
+  }
+
+  private static int getOndiskTrunkSize(com.google.protobuf.GeneratedMessage s) {
+    return CodedOutputStream.computeRawVarint32Size(s.getSerializedSize())
+        + s.getSerializedSize();
+  }
+
+  private FSImageFormatProtobuf() {
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageUtil.java
new file mode 100644
index 0000000..b995348
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageUtil.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.Loader;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+@InterfaceAudience.Private
+public final class FSImageUtil {
+  public static final byte[] MAGIC_HEADER = "HDFSIMG1".getBytes();
+  public static final int FILE_VERSION = 1;
+
+  public static boolean checkFileFormat(RandomAccessFile file)
+      throws IOException {
+    if (file.length() < Loader.MINIMUM_FILE_LENGTH)
+      return false;
+
+    byte[] magic = new byte[MAGIC_HEADER.length];
+    file.readFully(magic);
+    if (!Arrays.equals(MAGIC_HEADER, magic))
+      return false;
+
+    return true;
+  }
+
+  public static FileSummary loadSummary(RandomAccessFile file)
+      throws IOException {
+    final int FILE_LENGTH_FIELD_SIZE = 4;
+    long fileLength = file.length();
+    file.seek(fileLength - FILE_LENGTH_FIELD_SIZE);
+    int summaryLength = file.readInt();
+
+    if (summaryLength <= 0) {
+      throw new IOException("Negative length of the file");
+    }
+    file.seek(fileLength - FILE_LENGTH_FIELD_SIZE - summaryLength);
+
+    byte[] summaryBytes = new byte[summaryLength];
+    file.readFully(summaryBytes);
+
+    FileSummary summary = FileSummary
+        .parseDelimitedFrom(new ByteArrayInputStream(summaryBytes));
+    if (summary.getOndiskVersion() != FILE_VERSION) {
+      throw new IOException("Unsupported file version "
+          + summary.getOndiskVersion());
+    }
+
+    if (!LayoutVersion.supports(Feature.PROTOBUF_FORMAT,
+        summary.getLayoutVersion())) {
+      throw new IOException("Unsupported layout version "
+          + summary.getLayoutVersion());
+    }
+    return summary;
+  }
+
+  public static InputStream wrapInputStreamForCompression(
+      Configuration conf, String codec, InputStream in) throws IOException {
+    if (codec.isEmpty())
+      return in;
+
+    FSImageCompression compression = FSImageCompression.createCompression(
+        conf, codec);
+    CompressionCodec imageCodec = compression.getImageCodec();
+    return imageCodec.createInputStream(in);
+  }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 4e20976..f91c41c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -179,6 +179,7 @@
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager.SecretManagerState;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
@@ -196,6 +197,8 @@
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection.PersistToken;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
@@ -6013,6 +6016,15 @@
   }
 
   /**
+   * @return all the under-construction files in the lease map
+   */
+  Map<String, INodeFile> getFilesUnderConstruction() {
+    synchronized (leaseManager) {
+      return leaseManager.getINodesUnderConstruction();
+    }
+  }
+
+  /**
    * Register a Backup name-node, verifying that it belongs
    * to the correct namespace, and adding it to the set of
    * active journals if necessary.
@@ -6288,6 +6300,10 @@
     dtSecretManager.saveSecretManagerStateCompat(out, sdPath);
   }
 
+  SecretManagerState saveSecretManagerState() {
+    return dtSecretManager.saveSecretManagerState();
+  }
+
   /**
    * @param in load the state of secret manager from input stream
    */
@@ -6295,6 +6311,12 @@
     dtSecretManager.loadSecretManagerStateCompat(in);
   }
 
+  void loadSecretManagerState(SecretManagerSection s,
+      List<SecretManagerSection.DelegationKey> keys,
+      List<SecretManagerSection.PersistToken> tokens) throws IOException {
+    dtSecretManager.loadSecretManagerState(new SecretManagerState(s, keys, tokens));
+  }
+
   /**
    * Log the updateMasterKey operation to edit logs
    * 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
index 83cb0a4..f9a06f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
@@ -171,7 +171,7 @@
     return children == null? -1: Collections.binarySearch(children, name);
   }
   
-  protected DirectoryWithSnapshotFeature addSnapshotFeature(
+  public DirectoryWithSnapshotFeature addSnapshotFeature(
       DirectoryDiffList diffs) {
     Preconditions.checkState(!isWithSnapshot(), 
         "Directory is already with snapshot");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 500405e..80abb52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -252,7 +252,7 @@
   
   /* Start of Snapshot Feature */
 
-  private FileWithSnapshotFeature addSnapshotFeature(FileDiffList diffs) {
+  public FileWithSnapshotFeature addSnapshotFeature(FileDiffList diffs) {
     Preconditions.checkState(!isWithSnapshot(), 
         "File is already with snapshot");
     FileWithSnapshotFeature sf = new FileWithSnapshotFeature(diffs);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
index 5ffcc21..bd0355b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -46,6 +47,10 @@
   /** Synchronized by external lock. */
   private final GSet<INode, INodeWithAdditionalFields> map;
   
+  public Iterator<INodeWithAdditionalFields> getMapIterator() {
+    return map.iterator();
+  }
+
   private INodeMap(GSet<INode, INodeWithAdditionalFields> map) {
     Preconditions.checkArgument(map != null);
     this.map = map;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java
index 67ee88e..a7c4c75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceContext.java
@@ -22,6 +22,7 @@
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.util.Canceler;
 
@@ -32,7 +33,8 @@
  * allows cancellation, and also is responsible for accumulating
  * failed storage directories.
  */
-class SaveNamespaceContext {
+@InterfaceAudience.Private
+public class SaveNamespaceContext {
   private final FSNamesystem sourceNamesystem;
   private final long txid;
   private final List<StorageDirectory> errorSDs =
@@ -72,7 +74,7 @@
     completionLatch.countDown();
   }
 
-  void checkCancelled() throws SaveNamespaceCancelledException {
+  public void checkCancelled() throws SaveNamespaceCancelledException {
     if (canceller.isCancelled()) {
       throw new SaveNamespaceCancelledException(
           canceller.getCancellationReason());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
index 06f7a89..a9cad94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
@@ -244,7 +244,7 @@
       this.isSnapshotRoot = isSnapshotRoot;
     }
 
-    ChildrenDiff getChildrenDiff() {
+    public ChildrenDiff getChildrenDiff() {
       return diff;
     }
     
@@ -343,6 +343,10 @@
       return super.toString() + " childrenSize=" + childrenSize + ", " + diff;
     }
 
+    int getChildrenSize() {
+      return childrenSize;
+    }
+
     @Override
     void write(DataOutput out, ReferenceMap referenceMap) throws IOException {
       writeSnapshot(out);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
new file mode 100644
index 0000000..06cc1d0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
@@ -0,0 +1,437 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.snapshot;
+
+import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Loader.loadINodeDirectory;
+import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Loader.loadINodeReference;
+import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Loader.loadPermission;
+import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Loader.updateBlocksMap;
+import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Saver.buildINodeDirectory;
+import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Saver.buildINodeFile;
+import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Saver.buildINodeReference;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotDiffSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotDiffSection.CreatedListEntry;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotDiffSection.DiffEntry.Type;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotSection;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryAttributes;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
+import org.apache.hadoop.hdfs.server.namenode.INodeMap;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference;
+import org.apache.hadoop.hdfs.server.namenode.INodeWithAdditionalFields;
+import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceContext;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiffList;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.Root;
+import org.apache.hadoop.hdfs.util.Diff.ListType;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+
+@InterfaceAudience.Private
+public class FSImageFormatPBSnapshot {
+  /**
+   * Loading snapshot related information from protobuf based FSImage
+   */
+  public final static class Loader {
+    private final FSNamesystem fsn;
+    private final FSDirectory fsDir;
+    private final FSImageFormatProtobuf.Loader parent;
+    private final Map<Integer, Snapshot> snapshotMap;
+
+
+    public Loader(FSNamesystem fsn, FSImageFormatProtobuf.Loader parent) {
+      this.fsn = fsn;
+      this.fsDir = fsn.getFSDirectory();
+      this.snapshotMap = new HashMap<Integer, Snapshot>();
+      this.parent = parent;
+    }
+
+    /**
+     * Load the snapshots section from fsimage. Also convert snapshottable
+     * directories into {@link INodeDirectorySnapshottable}.
+     *
+     */
+    public void loadSnapshotSection(InputStream in) throws IOException {
+      SnapshotManager sm = fsn.getSnapshotManager();
+      SnapshotSection section = SnapshotSection.parseDelimitedFrom(in);
+      int snum = section.getNumSnapshots();
+      sm.setNumSnapshots(snum);
+      sm.setSnapshotCounter(section.getSnapshotCounter());
+      for (long sdirId : section.getSnapshottableDirList()) {
+        INodeDirectory dir = fsDir.getInode(sdirId).asDirectory();
+        final INodeDirectorySnapshottable sdir;
+        if (!dir.isSnapshottable()) {
+          sdir = new INodeDirectorySnapshottable(dir);
+          fsDir.addToInodeMap(sdir);
+        } else {
+          // dir is root, and admin set root to snapshottable before
+          sdir = (INodeDirectorySnapshottable) dir;
+          sdir.setSnapshotQuota(INodeDirectorySnapshottable.SNAPSHOT_LIMIT);
+        }
+        sm.addSnapshottable(sdir);
+      }
+      loadSnapshots(in, snum);
+    }
+
+    private void loadSnapshots(InputStream in, int size) throws IOException {
+      for (int i = 0; i < size; i++) {
+        SnapshotSection.Snapshot pbs = SnapshotSection.Snapshot
+            .parseDelimitedFrom(in);
+        INodeDirectory root = loadINodeDirectory(pbs.getRoot(),
+            parent.getStringTable());
+        int sid = pbs.getSnapshotId();
+        INodeDirectorySnapshottable parent = (INodeDirectorySnapshottable) fsDir
+            .getInode(root.getId()).asDirectory();
+        Snapshot snapshot = new Snapshot(sid, root, parent);
+        // add the snapshot to parent, since we follow the sequence of
+        // snapshotsByNames when saving, we do not need to sort when loading
+        parent.addSnapshot(snapshot);
+        snapshotMap.put(sid, snapshot);
+      }
+    }
+
+    /**
+     * Load the snapshot diff section from fsimage.
+     */
+    public void loadSnapshotDiffSection(InputStream in) throws IOException {
+      while (true) {
+        SnapshotDiffSection.DiffEntry entry = SnapshotDiffSection.DiffEntry
+            .parseDelimitedFrom(in);
+        if (entry == null) {
+          break;
+        }
+        long inodeId = entry.getInodeId();
+        INode inode = fsDir.getInode(inodeId);
+        SnapshotDiffSection.DiffEntry.Type type = entry.getType();
+        switch (type) {
+        case FILEDIFF:
+          loadFileDiffList(in, inode.asFile(), entry.getNumOfDiff());
+          break;
+        case DIRECTORYDIFF:
+          loadDirectoryDiffList(in, inode.asDirectory(), entry.getNumOfDiff());
+          break;
+        }
+      }
+    }
+
+    /** Load FileDiff list for a file with snapshot feature */
+    private void loadFileDiffList(InputStream in, INodeFile file, int size)
+        throws IOException {
+      final FileDiffList diffs = new FileDiffList();
+      for (int i = 0; i < size; i++) {
+        SnapshotDiffSection.FileDiff pbf = SnapshotDiffSection.FileDiff
+            .parseDelimitedFrom(in);
+        INodeFileAttributes copy = null;
+        if (pbf.hasSnapshotCopy()) {
+          INodeSection.INodeFile fileInPb = pbf.getSnapshotCopy();
+          PermissionStatus permission = loadPermission(
+              fileInPb.getPermission(), parent.getStringTable());
+          copy = new INodeFileAttributes.SnapshotCopy(pbf.getName()
+              .toByteArray(), permission, fileInPb.getModificationTime(),
+              fileInPb.getAccessTime(), (short) fileInPb.getReplication(),
+              fileInPb.getPreferredBlockSize());
+        }
+
+        FileDiff diff = new FileDiff(pbf.getSnapshotId(), copy, null,
+            pbf.getFileSize());
+        diffs.addFirst(diff);
+      }
+      file.addSnapshotFeature(diffs);
+    }
+
+    /** Load the created list in a DirectoryDiff */
+    private List<INode> loadCreatedList(InputStream in, INodeDirectory dir,
+        int size) throws IOException {
+      List<INode> clist = new ArrayList<INode>(size);
+      for (long c = 0; c < size; c++) {
+        CreatedListEntry entry = CreatedListEntry.parseDelimitedFrom(in);
+        INode created = SnapshotFSImageFormat.loadCreated(entry.getName()
+            .toByteArray(), dir);
+        clist.add(created);
+      }
+      return clist;
+    }
+
+    private void addToDeletedList(INode dnode, INodeDirectory parent) {
+      dnode.setParent(parent);
+      if (dnode.isFile()) {
+        updateBlocksMap(dnode.asFile(), fsn.getBlockManager());
+      }
+    }
+
+    /**
+     * Load the deleted list in a DirectoryDiff
+     * @param totalSize the total size of the deleted list
+     * @param deletedNodes non-reference inodes in the deleted list. These
+     *        inodes' ids are directly recorded in protobuf
+     */
+    private List<INode> loadDeletedList(InputStream in, INodeDirectory dir,
+        int refNum, List<Long> deletedNodes) throws IOException {
+      List<INode> dlist = new ArrayList<INode>(refNum + deletedNodes.size());
+      // load non-reference inodes
+      for (long deletedId : deletedNodes) {
+        INode deleted = fsDir.getInode(deletedId);
+        dlist.add(deleted);
+        addToDeletedList(deleted, dir);
+      }
+      // load reference nodes in the deleted list
+      for (int r = 0; r < refNum; r++) {
+        INodeSection.INodeReference ref = INodeSection.INodeReference
+            .parseDelimitedFrom(in);
+        INodeReference refNode = loadINodeReference(ref, fsDir);
+        dlist.add(refNode);
+        addToDeletedList(refNode, dir);
+      }
+      Collections.sort(dlist, new Comparator<INode>() {
+        @Override
+        public int compare(INode n1, INode n2) {
+          return n1.compareTo(n2.getLocalNameBytes());
+        }
+      });
+      return dlist;
+    }
+
+    /** Load DirectoryDiff list for a directory with snapshot feature */
+    private void loadDirectoryDiffList(InputStream in, INodeDirectory dir,
+        int size) throws IOException {
+      if (!dir.isWithSnapshot()) {
+        dir.addSnapshotFeature(null);
+      }
+      DirectoryDiffList diffs = dir.getDiffs();
+      for (int i = 0; i < size; i++) {
+        // load a directory diff
+        SnapshotDiffSection.DirectoryDiff diffInPb = SnapshotDiffSection.
+            DirectoryDiff.parseDelimitedFrom(in);
+        final int snapshotId = diffInPb.getSnapshotId();
+        final Snapshot snapshot = snapshotMap.get(snapshotId);
+        int childrenSize = diffInPb.getChildrenSize();
+        boolean useRoot = diffInPb.getIsSnapshotRoot();
+        INodeDirectoryAttributes copy = null;
+        if (useRoot) {
+          copy = snapshot.getRoot();
+        }else if (diffInPb.hasSnapshotCopy()) {
+          INodeSection.INodeDirectory dirCopyInPb = diffInPb.getSnapshotCopy();
+          final byte[] name = diffInPb.getName().toByteArray();
+          PermissionStatus permission = loadPermission(dirCopyInPb
+              .getPermission(), parent.getStringTable());
+          long modTime = dirCopyInPb.getModificationTime();
+          boolean noQuota = dirCopyInPb.getNsQuota() == -1
+              && dirCopyInPb.getDsQuota() == -1;
+          copy = noQuota ? new INodeDirectoryAttributes.SnapshotCopy(name,
+              permission, modTime)
+              : new INodeDirectoryAttributes.CopyWithQuota(name, permission,
+                  modTime, dirCopyInPb.getNsQuota(), dirCopyInPb.getDsQuota());
+        }
+        // load created list
+        List<INode> clist = loadCreatedList(in, dir,
+            diffInPb.getCreatedListSize());
+        // load deleted list
+        List<INode> dlist = loadDeletedList(in, dir,
+            diffInPb.getNumOfDeletedRef(), diffInPb.getDeletedINodeList());
+        // create the directory diff
+        DirectoryDiff diff = new DirectoryDiff(snapshotId, copy, null,
+            childrenSize, clist, dlist, useRoot);
+        diffs.addFirst(diff);
+      }
+    }
+  }
+
+  /**
+   * Saving snapshot related information to protobuf based FSImage
+   */
+  public final static class Saver {
+    private final FSNamesystem fsn;
+    private final FileSummary.Builder headers;
+    private final FSImageFormatProtobuf.Saver parent;
+    private final SaveNamespaceContext context;
+
+    public Saver(FSImageFormatProtobuf.Saver parent,
+        FileSummary.Builder headers, SaveNamespaceContext context, FSNamesystem fsn) {
+      this.parent = parent;
+      this.headers = headers;
+      this.context = context;
+      this.fsn = fsn;
+    }
+
+    /**
+     * save all the snapshottable directories and snapshots to fsimage
+     */
+    public void serializeSnapshotSection(OutputStream out) throws IOException {
+      SnapshotManager sm = fsn.getSnapshotManager();
+      SnapshotSection.Builder b = SnapshotSection.newBuilder()
+          .setSnapshotCounter(sm.getSnapshotCounter())
+          .setNumSnapshots(sm.getNumSnapshots());
+
+      INodeDirectorySnapshottable[] snapshottables = sm.getSnapshottableDirs();
+      for (INodeDirectorySnapshottable sdir : snapshottables) {
+        b.addSnapshottableDir(sdir.getId());
+      }
+      b.build().writeDelimitedTo(out);
+      int i = 0;
+      for(INodeDirectorySnapshottable sdir : snapshottables) {
+        for(Snapshot s : sdir.getSnapshotsByNames()) {
+          Root sroot = s.getRoot();
+          SnapshotSection.Snapshot.Builder sb = SnapshotSection.Snapshot
+              .newBuilder().setSnapshotId(s.getId());
+          INodeSection.INodeDirectory.Builder db = buildINodeDirectory(sroot,
+              parent.getStringMap());
+          INodeSection.INode r = INodeSection.INode.newBuilder()
+              .setId(sroot.getId())
+              .setType(INodeSection.INode.Type.DIRECTORY)
+              .setName(ByteString.copyFrom(sroot.getLocalNameBytes()))
+              .setDirectory(db).build();
+          sb.setRoot(r).build().writeDelimitedTo(out);
+          i++;
+          if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
+            context.checkCancelled();
+          }
+        }
+      }
+      Preconditions.checkState(i == sm.getNumSnapshots());
+      parent.commitSection(headers, FSImageFormatProtobuf.SectionName.SNAPSHOT);
+    }
+
+    /**
+     * save all the snapshot diff to fsimage
+     */
+    public void serializeSnapshotDiffSection(OutputStream out)
+        throws IOException {
+      INodeMap inodesMap = fsn.getFSDirectory().getINodeMap();
+      int i = 0;
+      Iterator<INodeWithAdditionalFields> iter = inodesMap.getMapIterator();
+      while (iter.hasNext()) {
+        INodeWithAdditionalFields inode = iter.next();
+        if (inode.isFile()) {
+          serializeFileDiffList(inode.asFile(), out);
+        } else if (inode.isDirectory()) {
+          serializeDirDiffList(inode.asDirectory(), out);
+        }
+        ++i;
+        if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
+          context.checkCancelled();
+        }
+      }
+      parent.commitSection(headers,
+          FSImageFormatProtobuf.SectionName.SNAPSHOT_DIFF);
+    }
+
+    private void serializeFileDiffList(INodeFile file, OutputStream out)
+        throws IOException {
+      FileWithSnapshotFeature sf = file.getFileWithSnapshotFeature();
+      if (sf != null) {
+        List<FileDiff> diffList = sf.getDiffs().asList();
+        SnapshotDiffSection.DiffEntry entry = SnapshotDiffSection.DiffEntry
+            .newBuilder().setInodeId(file.getId()).setType(Type.FILEDIFF)
+            .setNumOfDiff(diffList.size()).build();
+        entry.writeDelimitedTo(out);
+        for (int i = diffList.size() - 1; i >= 0; i--) {
+          FileDiff diff = diffList.get(i);
+          SnapshotDiffSection.FileDiff.Builder fb = SnapshotDiffSection.FileDiff
+              .newBuilder().setSnapshotId(diff.getSnapshotId())
+              .setFileSize(diff.getFileSize());
+          INodeFileAttributes copy = diff.snapshotINode;
+          if (copy != null) {
+            fb.setName(ByteString.copyFrom(copy.getLocalNameBytes()))
+                .setSnapshotCopy(buildINodeFile(copy, parent.getStringMap()));
+          }
+          fb.build().writeDelimitedTo(out);
+        }
+      }
+    }
+
+    private void saveCreatedDeletedList(List<INode> created,
+        List<INodeReference> deletedRefs, OutputStream out) throws IOException {
+      // local names of the created list member
+      for (INode c : created) {
+        SnapshotDiffSection.CreatedListEntry.newBuilder()
+            .setName(ByteString.copyFrom(c.getLocalNameBytes())).build()
+            .writeDelimitedTo(out);
+      }
+      // reference nodes in deleted list
+      for (INodeReference ref : deletedRefs) {
+        INodeSection.INodeReference.Builder rb = buildINodeReference(ref);
+        rb.build().writeDelimitedTo(out);
+      }
+    }
+
+    private void serializeDirDiffList(INodeDirectory dir, OutputStream out)
+        throws IOException {
+      DirectoryWithSnapshotFeature sf = dir.getDirectoryWithSnapshotFeature();
+      if (sf != null) {
+        List<DirectoryDiff> diffList = sf.getDiffs().asList();
+        SnapshotDiffSection.DiffEntry entry = SnapshotDiffSection.DiffEntry
+            .newBuilder().setInodeId(dir.getId()).setType(Type.DIRECTORYDIFF)
+            .setNumOfDiff(diffList.size()).build();
+        entry.writeDelimitedTo(out);
+        for (int i = diffList.size() - 1; i >= 0; i--) { // reverse order!
+          DirectoryDiff diff = diffList.get(i);
+          SnapshotDiffSection.DirectoryDiff.Builder db = SnapshotDiffSection.
+              DirectoryDiff.newBuilder().setSnapshotId(diff.getSnapshotId())
+                           .setChildrenSize(diff.getChildrenSize())
+                           .setIsSnapshotRoot(diff.isSnapshotRoot());
+          INodeDirectoryAttributes copy = diff.snapshotINode;
+          if (!diff.isSnapshotRoot() && copy != null) {
+            db.setName(ByteString.copyFrom(copy.getLocalNameBytes()))
+                .setSnapshotCopy(
+                    buildINodeDirectory(copy, parent.getStringMap()));
+          }
+          // process created list and deleted list
+          List<INode> created = diff.getChildrenDiff()
+              .getList(ListType.CREATED);
+          db.setCreatedListSize(created.size());
+          List<INode> deleted = diff.getChildrenDiff().getList(ListType.DELETED);
+          List<INodeReference> refs = new ArrayList<INodeReference>();
+          for (INode d : deleted) {
+            if (d.isReference()) {
+              refs.add(d.asReference());
+            } else {
+              db.addDeletedINode(d.getId());
+            }
+          }
+          db.setNumOfDeletedRef(refs.size());
+          db.build().writeDelimitedTo(out);
+          saveCreatedDeletedList(created, refs, out);
+        }
+      }
+    }
+  }
+
+  private FSImageFormatPBSnapshot(){}
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
index e836cd8..69fdf97 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
@@ -27,7 +27,6 @@
 
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormat;
-import org.apache.hadoop.hdfs.server.namenode.FSImageFormat.Loader;
 import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeAttributes;
@@ -137,7 +136,7 @@
    * @param parent The directory that the created list belongs to.
    * @return The created node.
    */
-  private static INode loadCreated(byte[] createdNodeName,
+  public static INode loadCreated(byte[] createdNodeName,
       INodeDirectory parent) throws IOException {
     // the INode in the created list should be a reference to another INode
     // in posterior SnapshotDiffs or one of the current children
@@ -209,11 +208,13 @@
   
   /**
    * Load snapshots and snapshotQuota for a Snapshottable directory.
-   * @param snapshottableParent The snapshottable directory for loading.
-   * @param numSnapshots The number of snapshots that the directory has.
-   * @param in The {@link DataInput} instance to read.
-   * @param loader The {@link Loader} instance that this loading procedure is 
-   *               using.
+   *
+   * @param snapshottableParent
+   *          The snapshottable directory for loading.
+   * @param numSnapshots
+   *          The number of snapshots that the directory has.
+   * @param loader
+   *          The loader
    */
   public static void loadSnapshotList(
       INodeDirectorySnapshottable snapshottableParent, int numSnapshots,
@@ -231,10 +232,13 @@
   /**
    * Load the {@link SnapshotDiff} list for the INodeDirectoryWithSnapshot
    * directory.
-   * @param dir The snapshottable directory for loading.
-   * @param in The {@link DataInput} instance to read.
-   * @param loader The {@link Loader} instance that this loading procedure is 
-   *               using.
+   *
+   * @param dir
+   *          The snapshottable directory for loading.
+   * @param in
+   *          The {@link DataInput} instance to read.
+   * @param loader
+   *          The loader
    */
   public static void loadDirectoryDiffList(INodeDirectory dir,
       DataInput in, FSImageFormat.Loader loader) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
index 8fa0f0c..be1ddc0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
@@ -270,6 +270,23 @@
     return numSnapshots.get();
   }
   
+  void setNumSnapshots(int num) {
+    numSnapshots.set(num);
+  }
+
+  int getSnapshotCounter() {
+    return snapshotCounter;
+  }
+
+  void setSnapshotCounter(int counter) {
+    snapshotCounter = counter;
+  }
+
+  INodeDirectorySnapshottable[] getSnapshottableDirs() {
+    return snapshottables.values().toArray(
+        new INodeDirectorySnapshottable[snapshottables.size()]);
+  }
+
   /**
    * Write {@link #snapshotCounter}, {@link #numSnapshots},
    * and all snapshots to the DataOutput.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FileDistributionCalculator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FileDistributionCalculator.java
new file mode 100644
index 0000000..2433b28
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FileDistributionCalculator.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.tools.offlineImageViewer;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.io.RandomAccessFile;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SectionName;
+import org.apache.hadoop.hdfs.server.namenode.FSImageUtil;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.LimitInputStream;
+
+/**
+ * This is the tool for analyzing file sizes in the namespace image. In order to
+ * run the tool one should define a range of integers <tt>[0, maxSize]</tt> by
+ * specifying <tt>maxSize</tt> and a <tt>step</tt>. The range of integers is
+ * divided into segments of size <tt>step</tt>:
+ * <tt>[0, s<sub>1</sub>, ..., s<sub>n-1</sub>, maxSize]</tt>, and the visitor
+ * calculates how many files in the system fall into each segment
+ * <tt>[s<sub>i-1</sub>, s<sub>i</sub>)</tt>. Note that files larger than
+ * <tt>maxSize</tt> always fall into the very last segment.
+ *
+ * <h3>Input.</h3>
+ * <ul>
+ * <li><tt>filename</tt> specifies the location of the image file;</li>
+ * <li><tt>maxSize</tt> determines the range <tt>[0, maxSize]</tt> of files
+ * sizes considered by the visitor;</li>
+ * <li><tt>step</tt> the range is divided into segments of size step.</li>
+ * </ul>
+ *
+ * <h3>Output.</h3> The output file is formatted as a tab separated two column
+ * table: Size and NumFiles. Where Size represents the start of the segment, and
+ * numFiles is the number of files form the image which size falls in this
+ * segment.
+ *
+ */
+final class FileDistributionCalculator {
+  private final static long MAX_SIZE_DEFAULT = 0x2000000000L; // 1/8 TB = 2^37
+  private final static int INTERVAL_DEFAULT = 0x200000; // 2 MB = 2^21
+
+  private final Configuration conf;
+  private final long maxSize;
+  private final int steps;
+  private final PrintWriter out;
+
+  private int[] distribution;
+  private int totalFiles;
+  private int totalDirectories;
+  private int totalBlocks;
+  private long totalSpace;
+  private long maxFileSize;
+
+  FileDistributionCalculator(Configuration conf, long maxSize, int steps,
+      PrintWriter out) {
+    this.conf = conf;
+    this.maxSize = maxSize == 0 ? MAX_SIZE_DEFAULT : maxSize;
+    this.steps = steps == 0 ? INTERVAL_DEFAULT : steps;
+    this.out = out;
+    long numIntervals = this.maxSize / this.steps;
+    this.distribution = new int[1 + (int) (numIntervals)];
+    Preconditions.checkState(numIntervals < Integer.MAX_VALUE,
+        "Too many distribution intervals");
+  }
+
+  void visit(RandomAccessFile file) throws IOException {
+    if (!FSImageUtil.checkFileFormat(file)) {
+      throw new IOException("Unrecognized FSImage");
+    }
+
+    FileSummary summary = FSImageUtil.loadSummary(file);
+    FileInputStream in = null;
+    try {
+      in = new FileInputStream(file.getFD());
+      for (FileSummary.Section s : summary.getSectionsList()) {
+        if (SectionName.fromString(s.getName()) != SectionName.INODE) {
+          continue;
+        }
+
+        in.getChannel().position(s.getOffset());
+        InputStream is = FSImageUtil.wrapInputStreamForCompression(conf,
+            summary.getCodec(), new BufferedInputStream(new LimitInputStream(
+                in, s.getLength())));
+        run(is);
+        output();
+      }
+    } finally {
+      IOUtils.cleanup(null, in);
+    }
+  }
+
+  private void run(InputStream in) throws IOException {
+    INodeSection s = INodeSection.parseDelimitedFrom(in);
+    for (int i = 0; i < s.getNumInodes(); ++i) {
+      INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
+      if (p.getType() == INodeSection.INode.Type.FILE) {
+        ++totalFiles;
+        INodeSection.INodeFile f = p.getFile();
+        totalBlocks += f.getBlocksCount();
+        long fileSize = 0;
+        for (BlockProto b : f.getBlocksList()) {
+          fileSize += b.getNumBytes() * f.getReplication();
+        }
+        maxFileSize = Math.max(fileSize, maxFileSize);
+        totalSpace += fileSize;
+
+        int bucket = fileSize > maxSize ? distribution.length - 1 : (int) Math
+            .ceil((double)fileSize / steps);
+        ++distribution[bucket];
+
+      } else if (p.getType() == INodeSection.INode.Type.DIRECTORY) {
+        ++totalDirectories;
+      }
+
+      if (i % (1 << 20) == 0) {
+        out.println("Processed " + i + " inodes.");
+      }
+    }
+  }
+
+  private void output() {
+    // write the distribution into the output file
+    out.print("Size\tNumFiles\n");
+    for (int i = 0; i < distribution.length; i++) {
+      if (distribution[i] != 0) {
+        out.print(((long) i * steps) + "\t" + distribution[i]);
+        out.print('\n');
+      }
+    }
+    out.print("totalFiles = " + totalFiles + "\n");
+    out.print("totalDirectories = " + totalDirectories + "\n");
+    out.print("totalBlocks = " + totalBlocks + "\n");
+    out.print("totalSpace = " + totalSpace + "\n");
+    out.print("maxFileSize = " + maxFileSize + "\n");
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
index c529fb5..19b8591 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
@@ -127,7 +127,7 @@
                                       new SimpleDateFormat("yyyy-MM-dd HH:mm");
   private static int[] versions = { -16, -17, -18, -19, -20, -21, -22, -23,
       -24, -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39,
-      -40, -41, -42, -43, -44, -45, -46, -47, -48, -49, -50, -51 };
+      -40, -41, -42, -43, -44, -45, -46, -47, -48, -49, -50, -51, -52 };
   private int imageVersion = 0;
   
   private final Map<Long, Boolean> subtreeMap = new HashMap<Long, Boolean>();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/LsrPBImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/LsrPBImage.java
new file mode 100644
index 0000000..e467725
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/LsrPBImage.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.tools.offlineImageViewer;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SectionName;
+import org.apache.hadoop.hdfs.server.namenode.FSImageUtil;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeDirectorySection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INode;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INodeDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INodeFile;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INodeSymlink;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.StringTableSection;
+import org.apache.hadoop.hdfs.server.namenode.INodeId;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.LimitInputStream;
+
+/**
+ * This is the tool for analyzing file sizes in the namespace image. In order to
+ * run the tool one should define a range of integers <tt>[0, maxSize]</tt> by
+ * specifying <tt>maxSize</tt> and a <tt>step</tt>. The range of integers is
+ * divided into segments of size <tt>step</tt>:
+ * <tt>[0, s<sub>1</sub>, ..., s<sub>n-1</sub>, maxSize]</tt>, and the visitor
+ * calculates how many files in the system fall into each segment
+ * <tt>[s<sub>i-1</sub>, s<sub>i</sub>)</tt>. Note that files larger than
+ * <tt>maxSize</tt> always fall into the very last segment.
+ *
+ * <h3>Input.</h3>
+ * <ul>
+ * <li><tt>filename</tt> specifies the location of the image file;</li>
+ * <li><tt>maxSize</tt> determines the range <tt>[0, maxSize]</tt> of files
+ * sizes considered by the visitor;</li>
+ * <li><tt>step</tt> the range is divided into segments of size step.</li>
+ * </ul>
+ *
+ * <h3>Output.</h3> The output file is formatted as a tab separated two column
+ * table: Size and NumFiles. Where Size represents the start of the segment, and
+ * numFiles is the number of files form the image which size falls in this
+ * segment.
+ * 
+ */
+final class LsrPBImage {
+  private final Configuration conf;
+  private final PrintWriter out;
+  private String[] stringTable;
+  private HashMap<Long, INodeSection.INode> inodes = Maps.newHashMap();
+  private HashMap<Long, long[]> dirmap = Maps.newHashMap();
+
+  public LsrPBImage(Configuration conf, PrintWriter out) {
+    this.conf = conf;
+    this.out = out;
+  }
+
+  public void visit(RandomAccessFile file) throws IOException {
+    if (!FSImageUtil.checkFileFormat(file)) {
+      throw new IOException("Unrecognized FSImage");
+    }
+
+    FileSummary summary = FSImageUtil.loadSummary(file);
+    FileInputStream fin = null;
+    try {
+      fin = new FileInputStream(file.getFD());
+
+      ArrayList<FileSummary.Section> sections = Lists.newArrayList(summary
+          .getSectionsList());
+      Collections.sort(sections, new Comparator<FileSummary.Section>() {
+        @Override
+        public int compare(FileSummary.Section s1, FileSummary.Section s2) {
+          SectionName n1 = SectionName.fromString(s1.getName());
+          SectionName n2 = SectionName.fromString(s2.getName());
+          if (n1 == null) {
+            return n2 == null ? 0 : -1;
+          } else if (n2 == null) {
+            return -1;
+          } else {
+            return n1.ordinal() - n2.ordinal();
+          }
+        }
+      });
+
+      for (FileSummary.Section s : sections) {
+        fin.getChannel().position(s.getOffset());
+        InputStream is = FSImageUtil.wrapInputStreamForCompression(conf,
+            summary.getCodec(), new BufferedInputStream(new LimitInputStream(
+                fin, s.getLength())));
+
+        switch (SectionName.fromString(s.getName())) {
+        case STRING_TABLE:
+          loadStringTable(is);
+          break;
+        case INODE:
+          loadINodeSection(is);
+          break;
+        case INODE_DIR:
+          loadINodeDirectorySection(is);
+          break;
+        default:
+          break;
+        }
+      }
+      list("", INodeId.ROOT_INODE_ID);
+    } finally {
+      IOUtils.cleanup(null, fin);
+    }
+  }
+
+  private void list(String parent, long dirId) {
+    INode inode = inodes.get(dirId);
+    listINode(parent.isEmpty() ? "/" : parent, inode);
+    long[] children = dirmap.get(dirId);
+    if (children == null) {
+      return;
+    }
+    String newParent = parent + inode.getName().toStringUtf8() + "/";
+    for (long cid : children) {
+      list(newParent, cid);
+    }
+  }
+
+  private void listINode(String parent, INode inode) {
+    switch (inode.getType()) {
+    case FILE: {
+      INodeFile f = inode.getFile();
+      PermissionStatus p = FSImageFormatPBINode.Loader.loadPermission(
+          f.getPermission(), stringTable);
+      out.print(String.format("-%s %2s %8s %10s %10s %10d %s%s\n", p
+          .getPermission().toString(), f.getReplication(), p.getUserName(), p
+          .getGroupName(), f.getModificationTime(), getFileSize(f), parent,
+          inode.getName().toStringUtf8()));
+    }
+      break;
+    case DIRECTORY: {
+      INodeDirectory d = inode.getDirectory();
+      PermissionStatus p = FSImageFormatPBINode.Loader.loadPermission(
+          d.getPermission(), stringTable);
+      out.print(String.format("d%s  - %8s %10s %10s %10d %s%s\n", p
+          .getPermission().toString(), p.getUserName(), p.getGroupName(), d
+          .getModificationTime(), 0, parent, inode.getName().toStringUtf8()));
+    }
+      break;
+    case SYMLINK: {
+      INodeSymlink d = inode.getSymlink();
+      PermissionStatus p = FSImageFormatPBINode.Loader.loadPermission(
+          d.getPermission(), stringTable);
+      out.print(String.format("-%s  - %8s %10s %10s %10d %s%s -> %s\n", p
+          .getPermission().toString(), p.getUserName(), p.getGroupName(), 0, 0,
+          parent, inode.getName().toStringUtf8(), d.getTarget().toStringUtf8()));
+    }
+      break;
+    default:
+      break;
+    }
+  }
+
+  private long getFileSize(INodeFile f) {
+    long size = 0;
+    for (BlockProto p : f.getBlocksList()) {
+      size += p.getNumBytes();
+    }
+    return size;
+  }
+
+  private void loadINodeDirectorySection(InputStream in) throws IOException {
+    while (true) {
+      INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
+          .parseDelimitedFrom(in);
+      // note that in is a LimitedInputStream
+      if (e == null) {
+        break;
+      }
+      long[] l = new long[e.getChildrenCount()];
+      for (int i = 0; i < l.length; ++i) {
+        l[i] = e.getChildren(i);
+      }
+      dirmap.put(e.getParent(), l);
+      for (int i = 0; i < e.getNumOfRef(); i++) {
+        INodeSection.INodeReference.parseDelimitedFrom(in);
+      }
+    }
+  }
+
+  private void loadINodeSection(InputStream in) throws IOException {
+    INodeSection s = INodeSection.parseDelimitedFrom(in);
+    for (int i = 0; i < s.getNumInodes(); ++i) {
+      INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
+      inodes.put(p.getId(), p);
+    }
+  }
+
+  private void loadStringTable(InputStream in) throws IOException {
+    StringTableSection s = StringTableSection.parseDelimitedFrom(in);
+    stringTable = new String[s.getNumEntry() + 1];
+    for (int i = 0; i < s.getNumEntry(); ++i) {
+      StringTableSection.Entry e = StringTableSection.Entry
+          .parseDelimitedFrom(in);
+      stringTable[e.getId()] = e.getStr();
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java
new file mode 100644
index 0000000..2d8c42d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.tools.offlineImageViewer;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.RandomAccessFile;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * OfflineImageViewer to dump the contents of an Hadoop image file to XML or the
+ * console. Main entry point into utility, either via the command line or
+ * programatically.
+ */
+@InterfaceAudience.Private
+public class OfflineImageViewerPB {
+  public static final Log LOG = LogFactory.getLog(OfflineImageViewerPB.class);
+
+  private final static String usage = "Usage: bin/hdfs oiv [OPTIONS] -i INPUTFILE -o OUTPUTFILE\n"
+      + "Offline Image Viewer\n"
+      + "View a Hadoop fsimage INPUTFILE using the specified PROCESSOR,\n"
+      + "saving the results in OUTPUTFILE.\n"
+      + "\n"
+      + "The oiv utility will attempt to parse correctly formed image files\n"
+      + "and will abort fail with mal-formed image files.\n"
+      + "\n"
+      + "The tool works offline and does not require a running cluster in\n"
+      + "order to process an image file.\n"
+      + "\n"
+      + "The following image processors are available:\n"
+      + "  * Ls: The default image processor generates an lsr-style listing\n"
+      + "    of the files in the namespace, with the same fields in the same\n"
+      + "    order.  Note that in order to correctly determine file sizes,\n"
+      + "    this formatter cannot skip blocks and will override the\n"
+      + "    -skipBlocks option.\n"
+      + "  * XML: This processor creates an XML document with all elements of\n"
+      + "    the fsimage enumerated, suitable for further analysis by XML\n"
+      + "    tools.\n"
+      + "  * FileDistribution: This processor analyzes the file size\n"
+      + "    distribution in the image.\n"
+      + "    -maxSize specifies the range [0, maxSize] of file sizes to be\n"
+      + "     analyzed (128GB by default).\n"
+      + "    -step defines the granularity of the distribution. (2MB by default)\n"
+      + "\n"
+      + "Required command line arguments:\n"
+      + "-i,--inputFile <arg>   FSImage file to process.\n"
+      + "-o,--outputFile <arg>  Name of output file. If the specified\n"
+      + "                       file exists, it will be overwritten.\n"
+      + "\n"
+      + "Optional command line arguments:\n"
+      + "-p,--processor <arg>   Select which type of processor to apply\n"
+      + "                       against image file."
+      + " (Ls|XML|FileDistribution).\n"
+      + "-h,--help              Display usage information and exit\n";
+
+  /**
+   * Build command-line options and descriptions
+   */
+  private static Options buildOptions() {
+    Options options = new Options();
+
+    // Build in/output file arguments, which are required, but there is no
+    // addOption method that can specify this
+    OptionBuilder.isRequired();
+    OptionBuilder.hasArgs();
+    OptionBuilder.withLongOpt("outputFile");
+    options.addOption(OptionBuilder.create("o"));
+
+    OptionBuilder.isRequired();
+    OptionBuilder.hasArgs();
+    OptionBuilder.withLongOpt("inputFile");
+    options.addOption(OptionBuilder.create("i"));
+
+    options.addOption("p", "processor", true, "");
+    options.addOption("h", "help", false, "");
+    options.addOption("skipBlocks", false, "");
+    options.addOption("printToScreen", false, "");
+    options.addOption("delimiter", true, "");
+
+    return options;
+  }
+
+  /**
+   * Entry point to command-line-driven operation. User may specify options and
+   * start fsimage viewer from the command line. Program will process image file
+   * and exit cleanly or, if an error is encountered, inform user and exit.
+   * 
+   * @param args
+   *          Command line options
+   * @throws IOException
+   */
+  public static void main(String[] args) throws IOException {
+    Options options = buildOptions();
+    if (args.length == 0) {
+      printUsage();
+      return;
+    }
+
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd;
+
+    try {
+      cmd = parser.parse(options, args);
+    } catch (ParseException e) {
+      System.out.println("Error parsing command-line options: ");
+      printUsage();
+      return;
+    }
+
+    if (cmd.hasOption("h")) { // print help and exit
+      printUsage();
+      return;
+    }
+
+    String inputFile = cmd.getOptionValue("i");
+    String processor = cmd.getOptionValue("p", "Ls");
+    String outputFile = cmd.getOptionValue("o");
+
+    PrintWriter out = (outputFile == null || outputFile.equals("-")) ? new PrintWriter(
+        System.out) : new PrintWriter(new File(outputFile));
+
+    Configuration conf = new Configuration();
+    try {
+      if (processor.equals("FileDistribution")) {
+        long maxSize = Long.parseLong(cmd.getOptionValue("maxSize", "0"));
+        int step = Integer.parseInt(cmd.getOptionValue("step", "0"));
+        new FileDistributionCalculator(conf, maxSize, step, out)
+            .visit(new RandomAccessFile(inputFile, "r"));
+      } else if (processor.equals("XML")) {
+        new PBImageXmlWriter(conf, out).visit(new RandomAccessFile(inputFile,
+            "r"));
+      } else {
+        new LsrPBImage(conf, out).visit(new RandomAccessFile(inputFile, "r"));
+      }
+    } catch (EOFException e) {
+      System.err.println("Input file ended unexpectedly. Exiting");
+    } catch (IOException e) {
+      System.err.println("Encountered exception.  Exiting: " + e.getMessage());
+    } finally {
+      out.close();
+    }
+
+  }
+
+  /**
+   * Print application usage instructions.
+   */
+  private static void printUsage() {
+    System.out.println(usage);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageXmlWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageXmlWriter.java
new file mode 100644
index 0000000..7ebf119
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageXmlWriter.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.tools.offlineImageViewer;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SectionName;
+import org.apache.hadoop.hdfs.server.namenode.FSImageUtil;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.CacheManagerSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FilesUnderConstructionSection.FileUnderConstructionEntry;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeDirectorySection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INodeDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INodeSymlink;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.NameSystemSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SecretManagerSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotDiffSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotSection;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.StringTableSection;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.LimitInputStream;
+
+/**
+ * This is the tool for analyzing file sizes in the namespace image. In order to
+ * run the tool one should define a range of integers <tt>[0, maxSize]</tt> by
+ * specifying <tt>maxSize</tt> and a <tt>step</tt>. The range of integers is
+ * divided into segments of size <tt>step</tt>:
+ * <tt>[0, s<sub>1</sub>, ..., s<sub>n-1</sub>, maxSize]</tt>, and the visitor
+ * calculates how many files in the system fall into each segment
+ * <tt>[s<sub>i-1</sub>, s<sub>i</sub>)</tt>. Note that files larger than
+ * <tt>maxSize</tt> always fall into the very last segment.
+ *
+ * <h3>Input.</h3>
+ * <ul>
+ * <li><tt>filename</tt> specifies the location of the image file;</li>
+ * <li><tt>maxSize</tt> determines the range <tt>[0, maxSize]</tt> of files
+ * sizes considered by the visitor;</li>
+ * <li><tt>step</tt> the range is divided into segments of size step.</li>
+ * </ul>
+ *
+ * <h3>Output.</h3> The output file is formatted as a tab separated two column
+ * table: Size and NumFiles. Where Size represents the start of the segment, and
+ * numFiles is the number of files form the image which size falls in this
+ * segment.
+ *
+ */
+@InterfaceAudience.Private
+public final class PBImageXmlWriter {
+  private final Configuration conf;
+  private final PrintWriter out;
+  private String[] stringTable;
+
+  public PBImageXmlWriter(Configuration conf, PrintWriter out) {
+    this.conf = conf;
+    this.out = out;
+  }
+
+  public void visit(RandomAccessFile file) throws IOException {
+    if (!FSImageUtil.checkFileFormat(file)) {
+      throw new IOException("Unrecognized FSImage");
+    }
+
+    FileSummary summary = FSImageUtil.loadSummary(file);
+    FileInputStream fin = null;
+    try {
+      fin = new FileInputStream(file.getFD());
+      out.print("<?xml version=\"1.0\"?>\n");
+
+      ArrayList<FileSummary.Section> sections = Lists.newArrayList(summary
+          .getSectionsList());
+      Collections.sort(sections, new Comparator<FileSummary.Section>() {
+        @Override
+        public int compare(FileSummary.Section s1, FileSummary.Section s2) {
+          SectionName n1 = SectionName.fromString(s1.getName());
+          SectionName n2 = SectionName.fromString(s2.getName());
+          if (n1 == null) {
+            return n2 == null ? 0 : -1;
+          } else if (n2 == null) {
+            return -1;
+          } else {
+            return n1.ordinal() - n2.ordinal();
+          }
+        }
+      });
+
+      for (FileSummary.Section s : sections) {
+        fin.getChannel().position(s.getOffset());
+        InputStream is = FSImageUtil.wrapInputStreamForCompression(conf,
+            summary.getCodec(), new BufferedInputStream(new LimitInputStream(
+                fin, s.getLength())));
+
+        switch (SectionName.fromString(s.getName())) {
+        case NS_INFO:
+          dumpNameSection(is);
+          break;
+        case STRING_TABLE:
+          loadStringTable(is);
+          break;
+        case INODE:
+          dumpINodeSection(is);
+          break;
+        case INODE_DIR:
+          dumpINodeDirectorySection(is);
+          break;
+        case FILES_UNDERCONSTRUCTION:
+          dumpFileUnderConstructionSection(is);
+          break;
+        case SNAPSHOT:
+          dumpSnapshotSection(is);
+          break;
+        case SNAPSHOT_DIFF:
+          dumpSnapshotDiffSection(is);
+          break;
+        case SECRET_MANAGER:
+          dumpSecretManagerSection(is);
+          break;
+        case CACHE_MANAGER:
+          dumpCacheManagerSection(is);
+          break;
+        default:
+          break;
+        }
+      }
+    } finally {
+      IOUtils.cleanup(null, fin);
+    }
+  }
+
+  private void dumpCacheManagerSection(InputStream is) throws IOException {
+    out.print("<CacheManagerSection>");
+    CacheManagerSection s = CacheManagerSection.parseDelimitedFrom(is);
+    o("nextDirectiveId", s.getNextDirectiveId());
+    for (int i = 0; i < s.getNumPools(); ++i) {
+      CachePoolInfoProto p = CachePoolInfoProto.parseDelimitedFrom(is);
+      out.print("<pool>");
+      o("poolName", p.getPoolName()).o("ownerName", p.getOwnerName())
+          .o("groupName", p.getGroupName()).o("mode", p.getMode())
+          .o("limit", p.getLimit())
+          .o("maxRelativeExpiry", p.getMaxRelativeExpiry());
+      out.print("</pool>\n");
+    }
+    for (int i = 0; i < s.getNumPools(); ++i) {
+      CacheDirectiveInfoProto p = CacheDirectiveInfoProto
+          .parseDelimitedFrom(is);
+      out.print("<directive>");
+      o("id", p.getId()).o("path", p.getPath())
+          .o("replication", p.getReplication()).o("pool", p.getPool());
+      out.print("<expiration>");
+      CacheDirectiveInfoExpirationProto e = p.getExpiration();
+      o("millis", e.getMillis()).o("relatilve", e.getIsRelative());
+      out.print("</expiration>\n");
+      out.print("</directive>\n");
+    }
+    out.print("</CacheManagerSection>\n");
+
+  }
+
+  private void dumpFileUnderConstructionSection(InputStream in)
+      throws IOException {
+    out.print("<FileUnderConstructionSection>");
+    while (true) {
+      FileUnderConstructionEntry e = FileUnderConstructionEntry
+          .parseDelimitedFrom(in);
+      if (e == null) {
+        break;
+      }
+      out.print("<inode>");
+      o("id", e.getInodeId()).o("path", e.getFullPath());
+      out.print("</inode>\n");
+    }
+    out.print("</FileUnderConstructionSection>\n");
+  }
+
+  private void dumpINodeDirectory(INodeDirectory d) {
+    o("mtime", d.getModificationTime()).o("permission",
+        dumpPermission(d.getPermission()));
+
+    if (d.hasDsQuota() && d.hasNsQuota()) {
+      o("nsquota", d.getNsQuota()).o("dsquota", d.getDsQuota());
+    }
+  }
+
+  private void dumpINodeDirectorySection(InputStream in) throws IOException {
+    out.print("<INodeDirectorySection>");
+    while (true) {
+      INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
+          .parseDelimitedFrom(in);
+      // note that in is a LimitedInputStream
+      if (e == null) {
+        break;
+      }
+      out.print("<directory>");
+      o("parent", e.getParent());
+      for (long id : e.getChildrenList()) {
+        o("inode", id);
+      }
+      for (int i = 0; i < e.getNumOfRef(); i++) {
+        INodeSection.INodeReference r = INodeSection.INodeReference
+            .parseDelimitedFrom(in);
+        dumpINodeReference(r);
+
+      }
+      out.print("</directory>\n");
+    }
+    out.print("</INodeDirectorySection>\n");
+  }
+
+  private void dumpINodeReference(INodeSection.INodeReference r) {
+    out.print("<ref>");
+    o("referredId", r.getReferredId()).o("name", r.getName().toStringUtf8())
+        .o("dstSnapshotId", r.getDstSnapshotId())
+        .o("lastSnapshotId", r.getLastSnapshotId());
+    out.print("</ref>\n");
+  }
+
+  private void dumpINodeFile(INodeSection.INodeFile f) {
+    o("replication", f.getReplication()).o("mtime", f.getModificationTime())
+        .o("atime", f.getAccessTime())
+        .o("perferredBlockSize", f.getPreferredBlockSize())
+        .o("permission", dumpPermission(f.getPermission()));
+
+    if (f.getBlocksCount() > 0) {
+      out.print("<blocks>");
+      for (BlockProto b : f.getBlocksList()) {
+        out.print("<block>");
+        o("id", b.getBlockId()).o("genstamp", b.getGenStamp()).o("numBytes",
+            b.getNumBytes());
+        out.print("</block>\n");
+      }
+      out.print("</blocks>\n");
+    }
+
+    if (f.hasFileUC()) {
+      INodeSection.FileUnderConstructionFeature u = f.getFileUC();
+      out.print("<file-under-construction>");
+      o("clientName", u.getClientName()).o("clientMachine",
+          u.getClientMachine());
+      out.print("</file-under-construction>\n");
+    }
+  }
+
+  private void dumpINodeSection(InputStream in) throws IOException {
+    INodeSection s = INodeSection.parseDelimitedFrom(in);
+    out.print("<INodeSection>");
+    o("lastInodeId", s.getLastInodeId());
+    for (int i = 0; i < s.getNumInodes(); ++i) {
+      INodeSection.INode p = INodeSection.INode.parseDelimitedFrom(in);
+      out.print("<inode>");
+      o("id", p.getId()).o("type", p.getType()).o("name",
+          p.getName().toStringUtf8());
+
+      if (p.hasFile()) {
+        dumpINodeFile(p.getFile());
+      } else if (p.hasDirectory()) {
+        dumpINodeDirectory(p.getDirectory());
+      } else if (p.hasSymlink()) {
+        dumpINodeSymlink(p.getSymlink());
+      }
+
+      out.print("</inode>\n");
+    }
+    out.print("</INodeSection>\n");
+  }
+
+  private void dumpINodeSymlink(INodeSymlink s) {
+    o("permission", dumpPermission(s.getPermission())).o("target",
+        s.getTarget().toStringUtf8());
+  }
+
+  private void dumpNameSection(InputStream in) throws IOException {
+    NameSystemSection s = NameSystemSection.parseDelimitedFrom(in);
+    out.print("<NameSection>\n");
+    o("genstampV1", s.getGenstampV1()).o("genstampV2", s.getGenstampV2())
+        .o("genstampV1Limit", s.getGenstampV1Limit())
+        .o("lastAllocatedBlockId", s.getLastAllocatedBlockId())
+        .o("txid", s.getTransactionId());
+    out.print("<NameSection>\n");
+  }
+
+  private String dumpPermission(long permission) {
+    return FSImageFormatPBINode.Loader.loadPermission(permission, stringTable)
+        .toString();
+  }
+
+  private void dumpSecretManagerSection(InputStream is) throws IOException {
+    out.print("<SecretManagerSection>");
+    SecretManagerSection s = SecretManagerSection.parseDelimitedFrom(is);
+    o("currentId", s.getCurrentId()).o("tokenSequenceNumber",
+        s.getTokenSequenceNumber());
+    out.print("</SecretManagerSection>");
+  }
+
+  private void dumpSnapshotDiffSection(InputStream in) throws IOException {
+    out.print("<SnapshotDiffSection>");
+    while (true) {
+      SnapshotDiffSection.DiffEntry e = SnapshotDiffSection.DiffEntry
+          .parseDelimitedFrom(in);
+      if (e == null) {
+        break;
+      }
+      out.print("<diff>");
+      o("inodeid", e.getInodeId());
+      switch (e.getType()) {
+      case FILEDIFF: {
+        for (int i = 0; i < e.getNumOfDiff(); ++i) {
+          out.print("<filediff>");
+          SnapshotDiffSection.FileDiff f = SnapshotDiffSection.FileDiff
+              .parseDelimitedFrom(in);
+          o("snapshotId", f.getSnapshotId()).o("size", f.getFileSize()).o(
+              "name", f.getName().toStringUtf8());
+          out.print("</filediff>\n");
+        }
+      }
+        break;
+      case DIRECTORYDIFF: {
+        for (int i = 0; i < e.getNumOfDiff(); ++i) {
+          out.print("<dirdiff>");
+          SnapshotDiffSection.DirectoryDiff d = SnapshotDiffSection.DirectoryDiff
+              .parseDelimitedFrom(in);
+          o("snapshotId", d.getSnapshotId())
+              .o("isSnapshotroot", d.getIsSnapshotRoot())
+              .o("childrenSize", d.getChildrenSize())
+              .o("name", d.getName().toStringUtf8());
+
+          for (int j = 0; j < d.getCreatedListSize(); ++j) {
+            SnapshotDiffSection.CreatedListEntry ce = SnapshotDiffSection.CreatedListEntry
+                .parseDelimitedFrom(in);
+            out.print("<created>");
+            o("name", ce.getName().toStringUtf8());
+            out.print("</created>\n");
+          }
+          for (int j = 0; j < d.getNumOfDeletedRef(); ++j) {
+            INodeSection.INodeReference r = INodeSection.INodeReference
+                .parseDelimitedFrom(in);
+            dumpINodeReference(r);
+          }
+          out.print("</dirdiff>\n");
+        }
+      }
+        break;
+      default:
+        break;
+      }
+      out.print("</diff>");
+    }
+    out.print("<SnapshotDiffSection>\n");
+  }
+
+  private void dumpSnapshotSection(InputStream in) throws IOException {
+    out.print("<SnapshotSection>");
+    SnapshotSection s = SnapshotSection.parseDelimitedFrom(in);
+    o("snapshotCounter", s.getSnapshotCounter());
+    if (s.getSnapshottableDirCount() > 0) {
+      out.print("<snapshottableDir>");
+      for (long id : s.getSnapshottableDirList()) {
+        o("dir", id);
+      }
+      out.print("</snapshottableDir>\n");
+    }
+    for (int i = 0; i < s.getNumSnapshots(); ++i) {
+      SnapshotSection.Snapshot pbs = SnapshotSection.Snapshot
+          .parseDelimitedFrom(in);
+      o("snapshot", pbs.getSnapshotId());
+    }
+    out.print("</SnapshotSection>\n");
+  }
+
+  private void loadStringTable(InputStream in) throws IOException {
+    StringTableSection s = StringTableSection.parseDelimitedFrom(in);
+    stringTable = new String[s.getNumEntry() + 1];
+    for (int i = 0; i < s.getNumEntry(); ++i) {
+      StringTableSection.Entry e = StringTableSection.Entry
+          .parseDelimitedFrom(in);
+      stringTable[e.getId()] = e.getStr();
+    }
+  }
+
+  private PBImageXmlWriter o(final String e, final Object v) {
+    out.print("<" + e + ">" + v + "</" + e + ">");
+    return this;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
new file mode 100644
index 0000000..af7ba87
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hdfs.server.namenode";
+option java_outer_classname = "FsImageProto";
+
+package hadoop.hdfs.fsimage;
+
+import "hdfs.proto";
+
+/**
+ * This file defines the on-disk layout of the file system image. The
+ * layout is defined by the following EBNF grammar, in which angle
+ * brackets mark protobuf definitions. (e.g., <FileSummary>)
+ *
+ * FILE := MAGIC SECTION* <FileSummary> FileSummaryLength
+ * MAGIC := 'HDFSIMG1'
+ * SECTION := <NameSystemSection> | ...
+ * FileSummaryLength := 4 byte int
+ *
+ * Some notes:
+ *
+ * The codec field in FileSummary describes the compression codec used
+ * for all sections. The fileheader is always uncompressed.
+ *
+ * All protobuf messages are serialized in delimited form, which means
+ * that there always will be an integer indicates the size of the
+ * protobuf message.
+ *
+ */
+
+message FileSummary {
+  // The version of the above EBNF grammars.
+  required uint32 ondiskVersion = 1;
+  // layoutVersion describes which features are available in the
+  // FSImage.
+  required uint32 layoutVersion = 2;
+  optional string codec         = 3;
+  // index for each section
+  message Section {
+    optional string name = 1;
+    optional uint64 length = 2;
+    optional uint64 offset = 3;
+  }
+  repeated Section sections = 4;
+}
+
+/**
+ * Name: NS_INFO
+ */
+message NameSystemSection {
+  optional uint32 namespaceId = 1;
+  optional uint64 genstampV1 = 2;
+  optional uint64 genstampV2 = 3;
+  optional uint64 genstampV1Limit = 4;
+  optional uint64 lastAllocatedBlockId = 5;
+  optional uint64 transactionId = 6;
+}
+
+/**
+ * Permission is serialized as a 64-bit long. [0:24):[25:48):[48:64) (in Big Endian).
+ * The first and the second parts are the string ids of the user and
+ * group name, and the last 16 bits are the permission bits.
+ *
+ * Name: INODE
+ */
+message INodeSection {
+  /**
+   * under-construction feature for INodeFile
+   */
+  message FileUnderConstructionFeature {
+    optional string clientName = 1;
+    optional string clientMachine = 2;
+  }
+
+  message INodeFile {
+    optional uint32 replication = 1;
+    optional uint64 modificationTime = 2;
+    optional uint64 accessTime = 3;
+    optional uint64 preferredBlockSize = 4;
+    optional fixed64 permission = 5;
+    repeated BlockProto blocks = 6;
+    optional FileUnderConstructionFeature fileUC = 7;
+  }
+
+  message INodeDirectory {
+    optional uint64 modificationTime = 1;
+    // namespace quota
+    optional uint64 nsQuota = 2;
+    // diskspace quota
+    optional uint64 dsQuota = 3;
+    optional fixed64 permission = 4;
+  }
+
+  message INodeSymlink {
+    optional fixed64 permission = 1;
+    optional bytes target = 2;
+  }
+
+  message INodeReference {
+    // id of the referred inode
+    optional uint64 referredId = 1;
+    // local name recorded in WithName
+    optional bytes name = 2;
+    // recorded in DstReference
+    optional uint32 dstSnapshotId = 3;
+    // recorded in WithName
+    optional uint32 lastSnapshotId = 4;
+  }
+
+  message INode {
+    enum Type {
+      FILE = 1;
+      DIRECTORY = 2;
+      SYMLINK = 3;
+    };
+    required Type type = 1;
+    required uint64 id = 2;
+    optional bytes name = 3;
+
+    optional INodeFile file = 4;
+    optional INodeDirectory directory = 5;
+    optional INodeSymlink symlink = 6;
+  }
+
+  optional uint64 lastInodeId = 1;
+  optional uint64 numInodes = 2;
+  // repeated INodes..
+}
+
+/**
+ * This section records information about under-construction files for
+ * reconstructing the lease map.
+ * NAME: FILES_UNDERCONSTRUCTION
+ */
+message FilesUnderConstructionSection {
+  message FileUnderConstructionEntry {
+    optional uint64 inodeId = 1;
+    optional string fullPath = 2;
+  }
+  // repeated FileUnderConstructionEntry...
+}
+
+/**
+ * This section records the children of each directories
+ * NAME: INODE_DIR
+ */
+message INodeDirectorySection {
+  message DirEntry {
+    optional uint64 parent = 1;
+    repeated uint64 children = 2 [packed = true];
+    optional uint64 numOfRef = 3;
+    // repeated INodeReference...
+  }
+  // repeated DirEntry, ended at the boundary of the section.
+}
+
+/**
+ * This section records the information about snapshot
+ * NAME: SNAPSHOT
+ */
+message SnapshotSection {
+  message Snapshot {
+    optional uint32 snapshotId = 1;
+    // Snapshot root
+    optional INodeSection.INode root = 2;
+  }
+
+  optional uint32 snapshotCounter = 1;
+  repeated uint64 snapshottableDir = 2 [packed = true];
+  // total number of snapshots
+  optional uint32 numSnapshots = 3;
+  // repeated Snapshot...
+}
+
+/**
+ * This section records information about snapshot diffs
+ * NAME: SNAPSHOT_DIFF
+ */
+message SnapshotDiffSection {
+  message CreatedListEntry {
+    optional bytes name = 1;
+  }
+
+  message DirectoryDiff {
+    optional uint32 snapshotId = 1;
+    optional uint32 childrenSize = 2;
+    optional bool isSnapshotRoot = 3;
+    optional bytes name = 4;
+    optional INodeSection.INodeDirectory snapshotCopy = 5;
+    optional uint32 createdListSize = 6;
+    optional uint32 numOfDeletedRef = 7; // number of reference nodes in deleted list
+    repeated uint64 deletedINode = 8 [packed = true]; // id of deleted inode
+    // repeated CreatedListEntry (size is specified by createdListSize)
+    // repeated INodeReference (reference inodes in deleted list)
+  }
+
+  message FileDiff {
+    optional uint32 snapshotId = 1;
+    optional uint64 fileSize = 2;
+    optional bytes name = 3;
+    optional INodeSection.INodeFile snapshotCopy = 4;
+  }
+
+  message DiffEntry {
+    enum Type {
+      FILEDIFF = 1;
+      DIRECTORYDIFF = 2;
+    }
+    required Type type = 1;
+    optional uint64 inodeId = 2;
+    optional uint32 numOfDiff = 3;
+
+    // repeated DirectoryDiff or FileDiff
+  }
+
+  // repeated DiffEntry
+}
+
+/**
+ * This section maps string to id
+ * NAME: STRING_TABLE
+ */
+message StringTableSection {
+  message Entry {
+    optional uint32 id = 1;
+    optional string str = 2;
+  }
+  optional uint32 numEntry = 1;
+  // repeated Entry
+}
+
+message SecretManagerSection {
+  message DelegationKey {
+    optional uint32 id         = 1;
+    optional uint64 expiryDate = 2;
+    optional bytes  key        = 3;
+  }
+  message PersistToken {
+    optional uint32 version        = 1;
+    optional string owner          = 2;
+    optional string renewer        = 3;
+    optional string realUser       = 4;
+    optional uint64 issueDate      = 5;
+    optional uint64 maxDate        = 6;
+    optional uint32 sequenceNumber = 7;
+    optional uint32 masterKeyId    = 8;
+    optional uint64 expiryDate     = 9;
+  }
+  optional uint32 currentId = 1;
+  optional uint32 tokenSequenceNumber = 2;
+  optional uint32 numKeys = 3;
+  optional uint32 numTokens = 4;
+  // repeated DelegationKey keys
+  // repeated PersistToken tokens
+}
+
+message CacheManagerSection {
+  required uint64 nextDirectiveId = 1;
+  required uint32 numPools        = 2;
+  required uint32 numDirectives   = 3;
+  // repeated CachePoolInfoProto pools
+  // repeated CacheDirectiveInfoProto directives
+}
+
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
new file mode 100644
index 0000000..552b091
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.EnumSet;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
+import org.junit.Test;
+
+public class TestFSImage {
+
+  @Test
+  public void testPersist() throws IOException {
+    Configuration conf = new Configuration();
+    testPersistHelper(conf);
+  }
+
+  @Test
+  public void testCompression() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
+    conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
+        "org.apache.hadoop.io.compress.GzipCodec");
+    testPersistHelper(conf);
+  }
+
+  private void testPersistHelper(Configuration conf) throws IOException {
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      cluster.waitActive();
+      FSNamesystem fsn = cluster.getNamesystem();
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      final Path dir = new Path("/abc/def");
+      final Path file1 = new Path(dir, "f1");
+      final Path file2 = new Path(dir, "f2");
+
+      // create an empty file f1
+      fs.create(file1).close();
+
+      // create an under-construction file f2
+      FSDataOutputStream out = fs.create(file2);
+      out.writeBytes("hello");
+      ((DFSOutputStream) out.getWrappedStream()).hsync(EnumSet
+          .of(SyncFlag.UPDATE_LENGTH));
+
+      // checkpoint
+      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      fs.saveNamespace();
+      fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+
+      cluster.restartNameNode();
+      cluster.waitActive();
+      fs = cluster.getFileSystem();
+
+      assertTrue(fs.isDirectory(dir));
+      assertTrue(fs.exists(file1));
+      assertTrue(fs.exists(file2));
+
+      // check internals of file2
+      INodeFile file2Node = fsn.dir.getINode4Write(file2.toString()).asFile();
+      assertEquals("hello".length(), file2Node.computeFileSize());
+      assertTrue(file2Node.isUnderConstruction());
+      BlockInfo[] blks = file2Node.getBlocks();
+      assertEquals(1, blks.length);
+      assertEquals(BlockUCState.UNDER_CONSTRUCTION, blks[0].getBlockUCState());
+      // check lease manager
+      Lease lease = fsn.leaseManager.getLeaseByPath(file2.toString());
+      Assert.assertNotNull(lease);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Ensure that the digest written by the saver equals to the digest of the
+   * file.
+   */
+  @Test
+  public void testDigest() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
+      fs.saveNamespace();
+      fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
+      File currentDir = FSImageTestUtil.getNameNodeCurrentDirs(cluster, 0).get(
+          0);
+      File fsimage = FSImageTestUtil.findNewestImageFile(currentDir
+          .getAbsolutePath());
+      assertEquals(MD5FileUtils.readStoredMd5ForFile(fsimage),
+          MD5FileUtils.computeMd5ForFile(fsimage));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
index 21935d0..f3cbf15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageWithSnapshot.java
@@ -140,7 +140,7 @@
   private File saveFSImageToTempFile() throws IOException {
     SaveNamespaceContext context = new SaveNamespaceContext(fsn, txid,
         new Canceler());
-    FSImageFormat.Saver saver = new FSImageFormat.Saver(context);
+    FSImageFormatProtobuf.Saver saver = new FSImageFormatProtobuf.Saver(context);
     FSImageCompression compression = FSImageCompression.createCompression(conf);
     File imageFile = getImageFile(testDir, txid);
     fsn.readLock();
@@ -154,7 +154,7 @@
   
   /** Load the fsimage from a temp file */
   private void loadFSImageFromTempFile(File imageFile) throws IOException {
-    FSImageFormat.Loader loader = new FSImageFormat.Loader(conf, fsn);
+    FSImageFormat.LoaderDelegator loader = FSImageFormat.newLoader(conf, fsn);
     fsn.writeLock();
     fsn.getFSDirectory().writeLock();
     try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
index 3ff5d54..0ca112d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
@@ -287,7 +287,6 @@
     doEdits(0, 1000);
     nn0.getRpcServer().rollEditLog();
     answerer.waitForCall();
-    answerer.proceed();
     assertTrue("SBN is not performing checkpoint but it should be.",
         answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
     
@@ -306,6 +305,7 @@
     // RPC to the SBN happened during the checkpoint.
     assertTrue("SBN should have still been checkpointing.",
         answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
+    answerer.proceed();
     answerer.waitForResult();
     assertTrue("SBN should have finished checkpointing.",
         answerer.getFireCount() == 1 && answerer.getResultCount() == 1);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
index 7fe8087..d4e8879 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
@@ -73,7 +73,6 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
-;
 
 /** Testing rename with snapshots. */
 public class TestRenameWithSnapshots {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java
index 27228bd..20cc135 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshot.java
@@ -25,6 +25,9 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.RandomAccessFile;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -53,8 +56,7 @@
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper.TestDirectoryTree;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper.TestDirectoryTree.Node;
-import org.apache.hadoop.hdfs.tools.offlineImageViewer.OfflineImageViewer;
-import org.apache.hadoop.hdfs.tools.offlineImageViewer.XmlImageVisitor;
+import org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
@@ -245,8 +247,8 @@
    * snapshots
    */
   @Test
-  public void testOfflineImageViewer() throws Throwable {
-    runTestSnapshot(SNAPSHOT_ITERATION_NUMBER);
+  public void testOfflineImageViewer() throws Exception {
+    runTestSnapshot(1);
     
     // retrieve the fsimage. Note that we already save namespace to fsimage at
     // the end of each iteration of runTestSnapshot.
@@ -254,31 +256,10 @@
         FSImageTestUtil.getFSImage(
         cluster.getNameNode()).getStorage().getStorageDir(0));
     assertNotNull("Didn't generate or can't find fsimage", originalFsimage);
-    
-    String ROOT = System.getProperty("test.build.data", "build/test/data");
-    File testFile = new File(ROOT, "/image");
-    String xmlImage = ROOT + "/image_xml";
-    boolean success = false;
-    
-    try {
-      DFSTestUtil.copyFile(originalFsimage, testFile);
-      XmlImageVisitor v = new XmlImageVisitor(xmlImage, true);
-      OfflineImageViewer oiv = new OfflineImageViewer(testFile.getPath(), v,
-          true);
-      oiv.go();
-      success = true;
-    } finally {
-      if (testFile.exists()) {
-        testFile.delete();
-      }
-      // delete the xml file if the parsing is successful
-      if (success) {
-        File xmlImageFile = new File(xmlImage);
-        if (xmlImageFile.exists()) {
-          xmlImageFile.delete();
-        }
-      }
-    }
+    StringWriter output = new StringWriter();
+    PrintWriter o = new PrintWriter(output);
+    PBImageXmlWriter v = new PBImageXmlWriter(new Configuration(), o);
+    v.visit(new RandomAccessFile(originalFsimage, "r"));
   }
 
   private void runTestSnapshot(int iteration) throws Exception {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java
index 11aa3b8..91a5c15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java
@@ -20,23 +20,20 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.io.RandomAccessFile;
+import java.io.StringWriter;
 import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,27 +43,29 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.PathUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
-
+import org.junit.rules.TemporaryFolder;
 
 /**
- * Test function of OfflineImageViewer by:
- *   * confirming it can correctly process a valid fsimage file and that
- *     the processing generates a correct representation of the namespace
- *   * confirming it correctly fails to process an fsimage file with a layout
- *     version it shouldn't be able to handle
- *   * confirm it correctly bails on malformed image files, in particular, a
- *     file that ends suddenly.
+ * Test function of OfflineImageViewer by: * confirming it can correctly process
+ * a valid fsimage file and that the processing generates a correct
+ * representation of the namespace * confirming it correctly fails to process an
+ * fsimage file with a layout version it shouldn't be able to handle * confirm
+ * it correctly bails on malformed image files, in particular, a file that ends
+ * suddenly.
  */
 public class TestOfflineImageViewer {
   private static final Log LOG = LogFactory.getLog(OfflineImageViewer.class);
@@ -76,22 +75,22 @@
   private static File originalFsimage = null;
 
   // Elements of lines of ls-file output to be compared to FileStatus instance
-  private static class LsElements {
-    public String perms;
-    public int replication;
-    public String username;
-    public String groupname;
-    public long filesize;
-    public char dir; // d if dir, - otherwise
+  private static final class LsElements {
+    private String perms;
+    private int replication;
+    private String username;
+    private String groupname;
+    private long filesize;
+    private boolean isDir;
   }
-  
+
   // namespace as written to dfs, to be compared with viewer's output
-  final static HashMap<String, FileStatus> writtenFiles = 
-      new HashMap<String, FileStatus>();
-  
-  private static String ROOT = PathUtils.getTestDirName(TestOfflineImageViewer.class);
-  
-  // Create a populated namespace for later testing.  Save its contents to a
+  final static HashMap<String, FileStatus> writtenFiles = new HashMap<String, FileStatus>();
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  // Create a populated namespace for later testing. Save its contents to a
   // data structure and store its fsimage location.
   // We only want to generate the fsimage file once and use it for
   // multiple tests.
@@ -100,35 +99,39 @@
     MiniDFSCluster cluster = null;
     try {
       Configuration conf = new HdfsConfiguration();
-      conf.setLong(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY, 10000);
-      conf.setLong(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, 5000);
-      conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
+      conf.setLong(
+          DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY, 10000);
+      conf.setLong(
+          DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, 5000);
+      conf.setBoolean(
+          DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
       conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL,
           "RULE:[2:$1@$0](JobTracker@.*FOO.COM)s/@.*//" + "DEFAULT");
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
       cluster.waitActive();
       FileSystem hdfs = cluster.getFileSystem();
-      
+
       int filesize = 256;
-      
-      // Create a reasonable namespace 
-      for(int i = 0; i < NUM_DIRS; i++)  {
+
+      // Create a reasonable namespace
+      for (int i = 0; i < NUM_DIRS; i++) {
         Path dir = new Path("/dir" + i);
         hdfs.mkdirs(dir);
         writtenFiles.put(dir.toString(), pathToFileEntry(hdfs, dir.toString()));
-        for(int j = 0; j < FILES_PER_DIR; j++) {
+        for (int j = 0; j < FILES_PER_DIR; j++) {
           Path file = new Path(dir, "file" + j);
           FSDataOutputStream o = hdfs.create(file);
-          o.write(new byte[ filesize++ ]);
+          o.write(new byte[filesize++]);
           o.close();
-          
-          writtenFiles.put(file.toString(), pathToFileEntry(hdfs, file.toString()));
+
+          writtenFiles.put(file.toString(),
+              pathToFileEntry(hdfs, file.toString()));
         }
       }
 
       // Get delegation tokens so we log the delegation token op
-      Token<?>[] delegationTokens = 
-          hdfs.addDelegationTokens(TEST_RENEWER, null);
+      Token<?>[] delegationTokens = hdfs
+          .addDelegationTokens(TEST_RENEWER, null);
       for (Token<?> t : delegationTokens) {
         LOG.debug("got token " + t);
       }
@@ -137,329 +140,113 @@
       cluster.getNameNodeRpc()
           .setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
       cluster.getNameNodeRpc().saveNamespace();
-      
+
       // Determine location of fsimage file
-      originalFsimage = FSImageTestUtil.findLatestImageFile(
-          FSImageTestUtil.getFSImage(
-          cluster.getNameNode()).getStorage().getStorageDir(0));
+      originalFsimage = FSImageTestUtil.findLatestImageFile(FSImageTestUtil
+          .getFSImage(cluster.getNameNode()).getStorage().getStorageDir(0));
       if (originalFsimage == null) {
         throw new RuntimeException("Didn't generate or can't find fsimage");
       }
       LOG.debug("original FS image file is " + originalFsimage);
     } finally {
-      if(cluster != null)
+      if (cluster != null)
         cluster.shutdown();
     }
   }
-  
+
   @AfterClass
   public static void deleteOriginalFSImage() throws IOException {
-    if(originalFsimage != null && originalFsimage.exists()) {
+    if (originalFsimage != null && originalFsimage.exists()) {
       originalFsimage.delete();
     }
   }
-  
-  // Convenience method to generate a file status from file system for 
+
+  // Convenience method to generate a file status from file system for
   // later comparison
-  private static FileStatus pathToFileEntry(FileSystem hdfs, String file) 
-        throws IOException {
+  private static FileStatus pathToFileEntry(FileSystem hdfs, String file)
+      throws IOException {
     return hdfs.getFileStatus(new Path(file));
   }
-  
-  // Verify that we can correctly generate an ls-style output for a valid 
+
+  // Verify that we can correctly generate an ls-style output for a valid
   // fsimage
   @Test
   public void outputOfLSVisitor() throws IOException {
-    File testFile = new File(ROOT, "/basicCheck");
-    File outputFile = new File(ROOT, "/basicCheckOutput");
-    
-    try {
-      DFSTestUtil.copyFile(originalFsimage, testFile);
-      
-      ImageVisitor v = new LsImageVisitor(outputFile.getPath(), true);
-      OfflineImageViewer oiv = new OfflineImageViewer(testFile.getPath(), v, false);
-
-      oiv.go();
-      
-      HashMap<String, LsElements> fileOutput = readLsfile(outputFile);
-      
-      compareNamespaces(writtenFiles, fileOutput);
-    } finally {
-      if(testFile.exists()) testFile.delete();
-      if(outputFile.exists()) outputFile.delete();
-    }
-    LOG.debug("Correctly generated ls-style output.");
-  }
-  
-  // Confirm that attempting to read an fsimage file with an unsupported
-  // layout results in an error
-  @Test
-  public void unsupportedFSLayoutVersion() throws IOException {
-    File testFile = new File(ROOT, "/invalidLayoutVersion");
-    File outputFile = new File(ROOT, "invalidLayoutVersionOutput");
-    
-    try {
-      int badVersionNum = -432;
-      changeLayoutVersion(originalFsimage, testFile, badVersionNum);
-      ImageVisitor v = new LsImageVisitor(outputFile.getPath(), true);
-      OfflineImageViewer oiv = new OfflineImageViewer(testFile.getPath(), v, false);
-      
-      try {
-        oiv.go();
-        fail("Shouldn't be able to read invalid laytout version");
-      } catch(IOException e) {
-        if(!e.getMessage().contains(Integer.toString(badVersionNum)))
-          throw e; // wasn't error we were expecting
-        LOG.debug("Correctly failed at reading bad image version.");
+    StringWriter output = new StringWriter();
+    PrintWriter out = new PrintWriter(output);
+    LsrPBImage v = new LsrPBImage(new Configuration(), out);
+    v.visit(new RandomAccessFile(originalFsimage, "r"));
+    out.close();
+    Pattern pattern = Pattern
+        .compile("([d\\-])([rwx\\-]{9})\\s*(-|\\d+)\\s*(\\w+)\\s*(\\w+)\\s*(\\d+)\\s*(\\d+)\\s*([\b/]+)");
+    int count = 0;
+    for (String s : output.toString().split("\n")) {
+      Matcher m = pattern.matcher(s);
+      assertTrue(m.find());
+      LsElements e = new LsElements();
+      e.isDir = m.group(1).equals("d");
+      e.perms = m.group(2);
+      e.replication = m.group(3).equals("-") ? 0 : Integer.parseInt(m.group(3));
+      e.username = m.group(4);
+      e.groupname = m.group(5);
+      e.filesize = Long.parseLong(m.group(7));
+      String path = m.group(8);
+      if (!path.equals("/")) {
+        compareFiles(writtenFiles.get(path), e);
       }
-    } finally {
-      if(testFile.exists()) testFile.delete();
-      if(outputFile.exists()) outputFile.delete();
+      ++count;
     }
+    assertEquals(writtenFiles.size() + 1, count);
   }
-  
-  // Verify that image viewer will bail on a file that ends unexpectedly
-  @Test
-  public void truncatedFSImage() throws IOException {
-    File testFile = new File(ROOT, "/truncatedFSImage");
-    File outputFile = new File(ROOT, "/trucnatedFSImageOutput");
-    try {
-      copyPartOfFile(originalFsimage, testFile);
-      assertTrue("Created truncated fsimage", testFile.exists());
-      
-      ImageVisitor v = new LsImageVisitor(outputFile.getPath(), true);
-      OfflineImageViewer oiv = new OfflineImageViewer(testFile.getPath(), v, false);
 
-      try {
-        oiv.go();
-        fail("Managed to process a truncated fsimage file");
-      } catch (EOFException e) {
-        LOG.debug("Correctly handled EOF");
-      }
-
-    } finally {
-      if(testFile.exists()) testFile.delete();
-      if(outputFile.exists()) outputFile.delete();
-    }
+  @Test(expected = IOException.class)
+  public void testTruncatedFSImage() throws IOException {
+    File truncatedFile = folder.newFile();
+    StringWriter output = new StringWriter();
+    copyPartOfFile(originalFsimage, truncatedFile);
+    new FileDistributionCalculator(new Configuration(), 0, 0, new PrintWriter(
+        output)).visit(new RandomAccessFile(truncatedFile, "r"));
   }
-  
-  // Test that our ls file has all the same compenents of the original namespace
-  private void compareNamespaces(HashMap<String, FileStatus> written,
-      HashMap<String, LsElements> fileOutput) {
-    assertEquals( "Should be the same number of files in both, plus one for root"
-            + " in fileoutput", fileOutput.keySet().size(), 
-                                written.keySet().size() + 1);
-    Set<String> inFile = fileOutput.keySet();
 
-    // For each line in the output file, verify that the namespace had a
-    // filestatus counterpart 
-    for (String path : inFile) {
-      if (path.equals("/")) // root's not included in output from system call
-        continue;
-
-      assertTrue("Path in file (" + path + ") was written to fs", written
-          .containsKey(path));
-      
-      compareFiles(written.get(path), fileOutput.get(path));
-      
-      written.remove(path);
-    }
-
-    assertEquals("No more files were written to fs", 0, written.size());
-  }
-  
   // Compare two files as listed in the original namespace FileStatus and
   // the output of the ls file from the image processor
   private void compareFiles(FileStatus fs, LsElements elements) {
-    assertEquals("directory listed as such",  
-                 fs.isDirectory() ? 'd' : '-', elements.dir);
-    assertEquals("perms string equal", 
-                                fs.getPermission().toString(), elements.perms);
+    assertEquals("directory listed as such", fs.isDirectory(), elements.isDir);
+    assertEquals("perms string equal", fs.getPermission().toString(),
+        elements.perms);
     assertEquals("replication equal", fs.getReplication(), elements.replication);
     assertEquals("owner equal", fs.getOwner(), elements.username);
     assertEquals("group equal", fs.getGroup(), elements.groupname);
     assertEquals("lengths equal", fs.getLen(), elements.filesize);
   }
 
-  // Read the contents of the file created by the Ls processor
-  private HashMap<String, LsElements> readLsfile(File lsFile) throws IOException {
-    BufferedReader br = new BufferedReader(new FileReader(lsFile));
-    String line = null;
-    HashMap<String, LsElements> fileContents = new HashMap<String, LsElements>();
-    
-    while((line = br.readLine()) != null) 
-      readLsLine(line, fileContents);
-    
-    br.close();
-    return fileContents;
-  }
-  
-  // Parse a line from the ls output.  Store permissions, replication, 
-  // username, groupname and filesize in hashmap keyed to the path name
-  private void readLsLine(String line, HashMap<String, LsElements> fileContents) {
-    String elements [] = line.split("\\s+");
-    
-    assertEquals("Not enough elements in ls output", 8, elements.length);
-    
-    LsElements lsLine = new LsElements();
-    
-    lsLine.dir = elements[0].charAt(0);
-    lsLine.perms = elements[0].substring(1);
-    lsLine.replication = elements[1].equals("-") 
-                                             ? 0 : Integer.valueOf(elements[1]);
-    lsLine.username = elements[2];
-    lsLine.groupname = elements[3];
-    lsLine.filesize = Long.valueOf(elements[4]);
-    // skipping date and time 
-    
-    String path = elements[7];
-    
-    // Check that each file in the ls output was listed once
-    assertFalse("LS file had duplicate file entries", 
-        fileContents.containsKey(path));
-    
-    fileContents.put(path, lsLine);
-  }
-  
-  // Copy one fsimage to another, changing the layout version in the process
-  private void changeLayoutVersion(File src, File dest, int newVersion) 
-         throws IOException {
-    DataInputStream in = null; 
-    DataOutputStream out = null; 
-    
-    try {
-      in = new DataInputStream(new FileInputStream(src));
-      out = new DataOutputStream(new FileOutputStream(dest));
-      
-      in.readInt();
-      out.writeInt(newVersion);
-      
-      byte [] b = new byte[1024];
-      while( in.read(b)  > 0 ) {
-        out.write(b);
-      }
-    } finally {
-      if(in != null) in.close();
-      if(out != null) out.close();
-    }
-  }
-  
-  // Only copy part of file into the other.  Used for testing truncated fsimage
   private void copyPartOfFile(File src, File dest) throws IOException {
-    InputStream in = null;
-    OutputStream out = null;
-    
-    byte [] b = new byte[256];
-    int bytesWritten = 0;
-    int count;
-    int maxBytes = 700;
-    
+    FileInputStream in = null;
+    FileOutputStream out = null;
+    final int MAX_BYTES = 700;
     try {
       in = new FileInputStream(src);
       out = new FileOutputStream(dest);
-      
-      while( (count = in.read(b))  > 0 && bytesWritten < maxBytes ) {
-        out.write(b);
-        bytesWritten += count;
-      } 
+      in.getChannel().transferTo(0, MAX_BYTES, out.getChannel());
     } finally {
-      if(in != null) in.close();
-      if(out != null) out.close();
+      IOUtils.cleanup(null, in);
+      IOUtils.cleanup(null, out);
     }
   }
 
   @Test
-  public void outputOfFileDistributionVisitor() throws IOException {
-    File testFile = new File(ROOT, "/basicCheck");
-    File outputFile = new File(ROOT, "/fileDistributionCheckOutput");
+  public void testFileDistributionVisitor() throws IOException {
+    StringWriter output = new StringWriter();
+    PrintWriter o = new PrintWriter(output);
+    new FileDistributionCalculator(new Configuration(), 0, 0, o)
+        .visit(new RandomAccessFile(originalFsimage, "r"));
+    o.close();
 
-    int totalFiles = 0;
-    BufferedReader reader = null;
-    try {
-      DFSTestUtil.copyFile(originalFsimage, testFile);
-      ImageVisitor v = new FileDistributionVisitor(outputFile.getPath(), 0, 0);
-      OfflineImageViewer oiv = 
-        new OfflineImageViewer(testFile.getPath(), v, false);
+    Pattern p = Pattern.compile("totalFiles = (\\d+)\n");
+    Matcher matcher = p.matcher(output.getBuffer());
 
-      oiv.go();
-
-      reader = new BufferedReader(new FileReader(outputFile));
-      String line = reader.readLine();
-      assertEquals(line, "Size\tNumFiles");
-      while((line = reader.readLine()) != null) {
-        String[] row = line.split("\t");
-        assertEquals(row.length, 2);
-        totalFiles += Integer.parseInt(row[1]);
-      }
-    } finally {
-      if (reader != null) {
-        reader.close();
-      }
-      if(testFile.exists()) testFile.delete();
-      if(outputFile.exists()) outputFile.delete();
-    }
+    assertTrue(matcher.find() && matcher.groupCount() == 1);
+    int totalFiles = Integer.parseInt(matcher.group(1));
     assertEquals(totalFiles, NUM_DIRS * FILES_PER_DIR);
   }
-  
-  private static class TestImageVisitor extends ImageVisitor {
-    private List<String> delegationTokenRenewers = new LinkedList<String>();
-    TestImageVisitor() {
-    }
-    
-    List<String> getDelegationTokenRenewers() {
-      return delegationTokenRenewers;
-    }
-
-    @Override
-    void start() throws IOException {
-    }
-
-    @Override
-    void finish() throws IOException {
-    }
-
-    @Override
-    void finishAbnormally() throws IOException {
-    }
-
-    @Override
-    void visit(ImageElement element, String value) throws IOException {
-      if (element == ImageElement.DELEGATION_TOKEN_IDENTIFIER_RENEWER) {
-        delegationTokenRenewers.add(value);
-      }
-    }
-
-    @Override
-    void visitEnclosingElement(ImageElement element) throws IOException {
-    }
-
-    @Override
-    void visitEnclosingElement(ImageElement element, ImageElement key,
-        String value) throws IOException {
-    }
-
-    @Override
-    void leaveEnclosingElement() throws IOException {
-    }
-  }
-
-  @Test
-  public void outputOfTestVisitor() throws IOException {
-    File testFile = new File(ROOT, "/basicCheck");
-
-    try {
-      DFSTestUtil.copyFile(originalFsimage, testFile);
-      TestImageVisitor v = new TestImageVisitor();
-      OfflineImageViewer oiv = new OfflineImageViewer(testFile.getPath(), v, true);
-      oiv.go();
-
-      // Validated stored delegation token identifiers.
-      List<String> dtrs = v.getDelegationTokenRenewers();
-      assertEquals(1, dtrs.size());
-      assertEquals(TEST_RENEWER, dtrs.get(0));
-    } finally {
-      if(testFile.exists()) testFile.delete();
-    }
-    LOG.debug("Passed TestVisitor validation.");
-  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
index c617432..a3f3511 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
Binary files differ
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml
index 3a60b6d..c7fafcc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <EDITS>
-  <EDITS_VERSION>-51</EDITS_VERSION>
+  <EDITS_VERSION>-52</EDITS_VERSION>
   <RECORD>
     <OPCODE>OP_START_LOG_SEGMENT</OPCODE>
     <DATA>