YARN-321. Forwarding YARN-321 branch to latest trunk.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/YARN-321@1561449 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b221d9d..09eaf6bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -148,6 +148,8 @@
     HDFS-5721. sharedEditsImage in Namenode#initializeSharedEdits() should be 
     closed before method returns. (Ted Yu via junping_du)
 
+    HDFS-5138. Support HDFS upgrade in HA. (atm via todd)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -594,6 +596,9 @@
     HDFS-5806. balancer should set SoTimeout to avoid indefinite hangs.
     (Nathan Roberts via Andrew Wang).
 
+    HDFS-5728. Block recovery will fail if the metafile does not have crc 
+    for all chunks of the block (Vinay via kihwal)
+
   BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS
 
     HDFS-4985. Add storage type to the protocol and expose it in block report
@@ -1134,6 +1139,9 @@
 
     HDFS-5789. Some of snapshot APIs missing checkOperation double check in fsn. (umamahesh)
 
+    HDFS-5343. Cat command issued on snapshot files gives an unexpected result.
+    (Sathish via umamahesh)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index f971107..028e64c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -361,5 +361,10 @@
       <Class name="org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor" />
       <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
     </Match>
+    <Match>
+      <Class name="org.apache.hadoop.hdfs.DFSUtil"/>
+      <Method name="assertAllResultsEqual" />
+      <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE" />
+    </Match>
 
  </FindBugsFilter>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
index 5dc1281..91ccc54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.contrib.bkjournal;
 
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.JournalManager;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
@@ -660,6 +662,37 @@
   }
 
   @Override
+  public void doPreUpgrade() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void doUpgrade(Storage storage) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getJournalCTime() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void doFinalize() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
+      int targetLayoutVersion) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void doRollback() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public void close() throws IOException {
     try {
       bkc.close();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
index 0a14e78..5611bb8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
@@ -316,7 +316,7 @@
     } catch (IOException ioe) {
       LOG.info("Got expected exception", ioe);
       GenericTestUtils.assertExceptionContains(
-          "Cannot start an HA namenode with name dirs that need recovery", ioe);
+          "storage directory does not exist or is not accessible", ioe);
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index d90317d..73861bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -792,6 +792,9 @@
             currentNode = blockSeekTo(pos);
           }
           int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
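+          // HDFS-5343: cap the read at the recorded file length so that no
+          // bytes beyond it (e.g. when reading a file through a snapshot
+          // path) are returned.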
+          if (locatedBlocks.isLastBlockComplete()) {
+            realLen = (int) Math.min(realLen, locatedBlocks.getFileLength());
+          }
           int result = readBuffer(strategy, off, realLen, corruptedBlockMap);
           
           if (result >= 0) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 1d0421e..abddf1f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -43,6 +43,7 @@
 import java.security.SecureRandom;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -574,10 +575,24 @@
     }
     return ret;
   }
+  
+  /**
+   * Get all of the RPC addresses of the individual NNs in a given nameservice.
+   * 
+   * @param conf Configuration
+   * @param nsId the nameservice whose NN addresses we want.
+   * @param defaultValue default address to return in case key is not found.
+   * @return A map from nnId -> RPC address of each NN in the nameservice.
+   */
+  public static Map<String, InetSocketAddress> getRpcAddressesForNameserviceId(
+      Configuration conf, String nsId, String defaultValue) {
+    return getAddressesForNameserviceId(conf, nsId, defaultValue,
+        DFS_NAMENODE_RPC_ADDRESS_KEY);
+  }
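+  // Illustrative usage of getRpcAddressesForNameserviceId (hypothetical HA
+  // nameservice "ns1" with NNs "nn1" and "nn2"):
+  //   Map<String, InetSocketAddress> addrs =
+  //       DFSUtil.getRpcAddressesForNameserviceId(conf, "ns1", null);
+  //   // addrs maps each nnId ("nn1", "nn2") to that NN's RPC address.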
 
   private static Map<String, InetSocketAddress> getAddressesForNameserviceId(
       Configuration conf, String nsId, String defaultValue,
-      String[] keys) {
+      String... keys) {
     Collection<String> nnIds = getNameNodeIds(conf, nsId);
     Map<String, InetSocketAddress> ret = Maps.newHashMap();
     for (String nnId : emptyAsSingletonNull(nnIds)) {
@@ -1670,4 +1685,32 @@
     }
     return builder;
   }
+
+  /**
+   * Assert that all objects in the collection are equal. Returns silently if
+   * they are; throws an AssertionError if any object is not equal. All null
+   * values are considered equal.
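+   * For example, a collection containing {1, 1, 1} passes silently, while
+   * {1, 2, 1} results in an AssertionError listing all of the results.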
+   * 
+   * @param objects the collection of objects to check for equality.
+   */
+  public static void assertAllResultsEqual(Collection<?> objects) {
+    Object[] resultsArray = objects.toArray();
+    
+    if (resultsArray.length == 0)
+      return;
+    
+    for (int i = 1; i < resultsArray.length; i++) {
+      Object currElement = resultsArray[i];
+      Object lastElement = resultsArray[i - 1];
+      if ((currElement == null && currElement != lastElement) ||
+          (currElement != null && !currElement.equals(lastElement))) {
+        throw new AssertionError("Not all elements match in results: " +
+            Arrays.toString(resultsArray));
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
index 7d53fb9..47ea821 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
@@ -26,22 +26,29 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
@@ -305,4 +312,55 @@
     DFSClient dfsClient = dfs.getClient();
     return RPC.getServerAddress(dfsClient.getNamenode());
   }
+  
+  /**
+   * Get an RPC proxy for each NN in an HA nameservice. Used when a given RPC
+   * call should be made on every NN in an HA nameservice, not just the active.
+   * 
+   * @param conf configuration
+   * @param nsId the nameservice to get all of the proxies for.
+   * @return a list of RPC proxies for each NN in the nameservice.
+   * @throws IOException in the event of error.
+   */
+  public static List<ClientProtocol> getProxiesForAllNameNodesInNameservice(
+      Configuration conf, String nsId) throws IOException {
+    Map<String, InetSocketAddress> nnAddresses =
+        DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null);
+    
+    List<ClientProtocol> namenodes = new ArrayList<ClientProtocol>();
+    for (InetSocketAddress nnAddress : nnAddresses.values()) {
+      NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
+      proxyInfo = NameNodeProxies.createNonHAProxy(conf,
+          nnAddress, ClientProtocol.class,
+          UserGroupInformation.getCurrentUser(), false);
+      namenodes.add(proxyInfo.getProxy());
+    }
+    return namenodes;
+  }
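+  // Illustrative usage (hypothetical HA nameservice "ns1"):
+  //   List<ClientProtocol> nns =
+  //       HAUtil.getProxiesForAllNameNodesInNameservice(conf, "ns1");
+  //   boolean anyActive = HAUtil.isAtLeastOneActive(nns);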
+  
+  /**
+   * Used to check whether at least one of the given HA NNs is currently in
+   * the active state.
+   * 
+   * @param namenodes list of RPC proxies for each NN to check.
+   * @return true if at least one NN is active, false if all are in the standby state.
+   * @throws IOException in the event of error.
+   */
+  public static boolean isAtLeastOneActive(List<ClientProtocol> namenodes)
+      throws IOException {
+    for (ClientProtocol namenode : namenodes) {
+      try {
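+        // getFileInfo("/") is used here only as a lightweight probe: an
+        // active NN answers it, while a standby NN rejects it with a
+        // StandbyException.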
+        namenode.getFileInfo("/");
+        return true;
+      } catch (RemoteException re) {
+        IOException cause = re.unwrapRemoteException();
+        if (cause instanceof StandbyException) {
+          // This is expected to happen for a standby NN.
+        } else {
+          throw re;
+        }
+      }
+    }
+    return false;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
index 2f6baa3..a3a6387 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 
@@ -151,4 +152,17 @@
    * StringBuilder. This is displayed on the NN web UI.
    */
   public void appendReport(StringBuilder sb);
+
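+  // The calls below support HDFS upgrade, rollback and finalization in an HA
+  // setup (HDFS-5138). Each returns a future that completes once the
+  // corresponding RPC to the remote JournalNode has finished.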
+  public ListenableFuture<Void> doPreUpgrade();
+
+  public ListenableFuture<Void> doUpgrade(StorageInfo sInfo);
+
+  public ListenableFuture<Void> doFinalize();
+
+  public ListenableFuture<Boolean> canRollBack(StorageInfo storage,
+      StorageInfo prevStorage, int targetLayoutVersion);
+
+  public ListenableFuture<Void> doRollback();
+
+  public ListenableFuture<Long> getJournalCTime();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
index d891858..66a03c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 
@@ -308,4 +309,71 @@
     }
     return QuorumCall.create(calls);
   }
+  
+  QuorumCall<AsyncLogger, Void> doPreUpgrade() {
+    Map<AsyncLogger, ListenableFuture<Void>> calls =
+        Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<Void> future =
+          logger.doPreUpgrade();
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
+
+  public QuorumCall<AsyncLogger, Void> doUpgrade(StorageInfo sInfo) {
+    Map<AsyncLogger, ListenableFuture<Void>> calls =
+        Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<Void> future =
+          logger.doUpgrade(sInfo);
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
+
+  public QuorumCall<AsyncLogger, Void> doFinalize() {
+    Map<AsyncLogger, ListenableFuture<Void>> calls =
+        Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<Void> future =
+          logger.doFinalize();
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
+
+  public QuorumCall<AsyncLogger, Boolean> canRollBack(StorageInfo storage,
+      StorageInfo prevStorage, int targetLayoutVersion) {
+    Map<AsyncLogger, ListenableFuture<Boolean>> calls =
+        Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<Boolean> future =
+          logger.canRollBack(storage, prevStorage, targetLayoutVersion);
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
+
+  public QuorumCall<AsyncLogger, Void> doRollback() {
+    Map<AsyncLogger, ListenableFuture<Void>> calls =
+        Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<Void> future =
+          logger.doRollback();
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
+
+  public QuorumCall<AsyncLogger, Long> getJournalCTime() {
+    Map<AsyncLogger, ListenableFuture<Long>> calls =
+        Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<Long> future = logger.getJournalCTime();
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
index 3731f5a..2f1bff1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
@@ -46,6 +46,7 @@
 import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
 import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.qjournal.server.GetJournalEditServlet;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -564,6 +565,72 @@
       }
     });
   }
+  
+  @Override
+  public ListenableFuture<Void> doPreUpgrade() {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws IOException {
+        getProxy().doPreUpgrade(journalId);
+        return null;
+      }
+    });
+  }
+  
+  @Override
+  public ListenableFuture<Void> doUpgrade(final StorageInfo sInfo) {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws IOException {
+        getProxy().doUpgrade(journalId, sInfo);
+        return null;
+      }
+    });
+  }
+  
+  @Override
+  public ListenableFuture<Void> doFinalize() {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws IOException {
+        getProxy().doFinalize(journalId);
+        return null;
+      }
+    });
+  }
+  
+  @Override
+  public ListenableFuture<Boolean> canRollBack(final StorageInfo storage,
+      final StorageInfo prevStorage, final int targetLayoutVersion) {
+    return executor.submit(new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws IOException {
+        return getProxy().canRollBack(journalId, storage, prevStorage,
+            targetLayoutVersion);
+      }
+    });
+  }
+
+  @Override
+  public ListenableFuture<Void> doRollback() {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws IOException {
+        getProxy().doRollback(journalId);
+        return null;
+      }
+    });
+  }
+  
+  @Override
+  public ListenableFuture<Long> getJournalCTime() {
+    return executor.submit(new Callable<Long>() {
+      @Override
+      public Long call() throws IOException {
+        return getProxy().getJournalCTime(journalId);
+      }
+    });
+  }
 
   @Override
   public String toString() {
@@ -636,4 +703,5 @@
   private boolean hasHttpServerEndPoint() {
    return httpServerURL != null;
   }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index 9f2cd56..befb876 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -34,10 +34,13 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
@@ -77,8 +80,14 @@
   // Since these don't occur during normal operation, we can
   // use rather lengthy timeouts, and don't need to make them
   // configurable.
-  private static final int FORMAT_TIMEOUT_MS = 60000;
-  private static final int HASDATA_TIMEOUT_MS = 60000;
+  private static final int FORMAT_TIMEOUT_MS            = 60000;
+  private static final int HASDATA_TIMEOUT_MS           = 60000;
+  private static final int CAN_ROLL_BACK_TIMEOUT_MS     = 60000;
+  private static final int FINALIZE_TIMEOUT_MS          = 60000;
+  private static final int PRE_UPGRADE_TIMEOUT_MS       = 60000;
+  private static final int ROLL_BACK_TIMEOUT_MS         = 60000;
+  private static final int UPGRADE_TIMEOUT_MS           = 60000;
+  private static final int GET_JOURNAL_CTIME_TIMEOUT_MS = 60000;
   
   private final Configuration conf;
   private final URI uri;
@@ -492,4 +501,131 @@
     return loggers;
   }
 
+  @Override
+  public void doPreUpgrade() throws IOException {
+    QuorumCall<AsyncLogger, Void> call = loggers.doPreUpgrade();
+    try {
+      call.waitFor(loggers.size(), loggers.size(), 0, PRE_UPGRADE_TIMEOUT_MS,
+          "doPreUpgrade");
+      
+      if (call.countExceptions() > 0) {
+        call.rethrowException("Could not do pre-upgrade of one or more JournalNodes");
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted waiting for doPreUpgrade() response");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting for doPreUpgrade() response");
+    }
+  }
+
+  @Override
+  public void doUpgrade(Storage storage) throws IOException {
+    QuorumCall<AsyncLogger, Void> call = loggers.doUpgrade(storage);
+    try {
+      call.waitFor(loggers.size(), loggers.size(), 0, UPGRADE_TIMEOUT_MS,
+          "doUpgrade");
+      
+      if (call.countExceptions() > 0) {
+        call.rethrowException("Could not perform upgrade of one or more JournalNodes");
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted waiting for doUpgrade() response");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting for doUpgrade() response");
+    }
+  }
+  
+  @Override
+  public void doFinalize() throws IOException {
+    QuorumCall<AsyncLogger, Void> call = loggers.doFinalize();
+    try {
+      call.waitFor(loggers.size(), loggers.size(), 0, FINALIZE_TIMEOUT_MS,
+          "doFinalize");
+      
+      if (call.countExceptions() > 0) {
+        call.rethrowException("Could not finalize one or more JournalNodes");
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted waiting for doFinalize() response");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting for doFinalize() response");
+    }
+  }
+  
+  @Override
+  public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
+      int targetLayoutVersion) throws IOException {
+    QuorumCall<AsyncLogger, Boolean> call = loggers.canRollBack(storage,
+        prevStorage, targetLayoutVersion);
+    try {
+      call.waitFor(loggers.size(), loggers.size(), 0, CAN_ROLL_BACK_TIMEOUT_MS,
+          "lockSharedStorage");
+      
+      if (call.countExceptions() > 0) {
+        call.rethrowException("Could not check if roll back possible for"
+            + " one or more JournalNodes");
+      }
+      
+      // Either they all return the same thing or this call fails, so we can
+      // just return the first result.
+      DFSUtil.assertAllResultsEqual(call.getResults().values());
+      for (Boolean result : call.getResults().values()) {
+        return result;
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted waiting for lockSharedStorage() " +
+          "response");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting for lockSharedStorage() " +
+          "response");
+    }
+    
+    throw new AssertionError("Unreachable code.");
+  }
+
+  @Override
+  public void doRollback() throws IOException {
+    QuorumCall<AsyncLogger, Void> call = loggers.doRollback();
+    try {
+      call.waitFor(loggers.size(), loggers.size(), 0, ROLL_BACK_TIMEOUT_MS,
+          "doRollback");
+      
+      if (call.countExceptions() > 0) {
+        call.rethrowException("Could not perform rollback of one or more JournalNodes");
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted waiting for doFinalize() response");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting for doFinalize() response");
+    }
+  }
+  
+  @Override
+  public long getJournalCTime() throws IOException {
+    QuorumCall<AsyncLogger, Long> call = loggers.getJournalCTime();
+    try {
+      call.waitFor(loggers.size(), loggers.size(), 0,
+          GET_JOURNAL_CTIME_TIMEOUT_MS, "getJournalCTime");
+      
+      if (call.countExceptions() > 0) {
+        call.rethrowException("Could not journal CTime for one "
+            + "more JournalNodes");
+      }
+      
+      // Either they all return the same thing or this call fails, so we can
+      // just return the first result.
+      DFSUtil.assertAllResultsEqual(call.getResults().values());
+      for (Long result : call.getResults().values()) {
+        return result;
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted waiting for getJournalCTime() " +
+          "response");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting for getJournalCTime() " +
+          "response");
+    }
+    
+    throw new AssertionError("Unreachable code.");
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
index 41600ac..c7ab691 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.JournalManager;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.security.KerberosInfo;
@@ -143,4 +144,17 @@
    */
   public void acceptRecovery(RequestInfo reqInfo,
       SegmentStateProto stateToAccept, URL fromUrl) throws IOException;
+
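+  // The methods below support upgrade, rollback and finalization of the
+  // journal identified by journalId on a single JournalNode (HDFS-5138).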
+  public void doPreUpgrade(String journalId) throws IOException;
+
+  public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException;
+
+  public void doFinalize(String journalId) throws IOException;
+
+  public Boolean canRollBack(String journalId, StorageInfo storage,
+      StorageInfo prevStorage, int targetLayoutVersion) throws IOException;
+
+  public void doRollback(String journalId) throws IOException;
+
+  public Long getJournalCTime(String journalId) throws IOException;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
index 47d8100..b118aba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
@@ -17,17 +17,35 @@
  */
 package org.apache.hadoop.hdfs.qjournal.protocolPB;
 
+import java.io.IOException;
+import java.net.URL;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoFinalizeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoFinalizeResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoPreUpgradeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoPreUpgradeResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoRollbackRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoRollbackResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoUpgradeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoUpgradeResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
@@ -39,8 +57,6 @@
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto;
@@ -48,13 +64,11 @@
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
-import java.io.IOException;
-import java.net.URL;
-
 /**
  * Implementation for protobuf service that forwards requests
  * received on {@link JournalProtocolPB} to the 
@@ -244,4 +258,79 @@
         reqInfo.hasCommittedTxId() ?
           reqInfo.getCommittedTxId() : HdfsConstants.INVALID_TXID);
   }
+
+
+  @Override
+  public DoPreUpgradeResponseProto doPreUpgrade(RpcController controller,
+      DoPreUpgradeRequestProto request) throws ServiceException {
+    try {
+      impl.doPreUpgrade(convert(request.getJid()));
+      return DoPreUpgradeResponseProto.getDefaultInstance();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public DoUpgradeResponseProto doUpgrade(RpcController controller,
+      DoUpgradeRequestProto request) throws ServiceException {
+    try {
+      impl.doUpgrade(convert(request.getJid()),
+          PBHelper.convert(request.getSInfo()));
+      return DoUpgradeResponseProto.getDefaultInstance();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public DoFinalizeResponseProto doFinalize(RpcController controller,
+      DoFinalizeRequestProto request) throws ServiceException {
+    try {
+      impl.doFinalize(convert(request.getJid()));
+      return DoFinalizeResponseProto.getDefaultInstance();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public CanRollBackResponseProto canRollBack(RpcController controller,
+      CanRollBackRequestProto request) throws ServiceException {
+    try {
+      Boolean result = impl.canRollBack(convert(request.getJid()),
+          PBHelper.convert(request.getStorage()),
+          PBHelper.convert(request.getPrevStorage()),
+          request.getTargetLayoutVersion());
+      return CanRollBackResponseProto.newBuilder()
+          .setCanRollBack(result)
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public DoRollbackResponseProto doRollback(RpcController controller, DoRollbackRequestProto request)
+      throws ServiceException {
+    try {
+      impl.doRollback(convert(request.getJid()));
+      return DoRollbackResponseProto.getDefaultInstance();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public GetJournalCTimeResponseProto getJournalCTime(RpcController controller,
+      GetJournalCTimeRequestProto request) throws ServiceException {
+    try {
+      Long resultCTime = impl.getJournalCTime(convert(request.getJid()));
+      return GetJournalCTimeResponseProto.newBuilder()
+          .setResultCTime(resultCTime)
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
index f111933..6127478 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
@@ -23,13 +23,23 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoFinalizeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoPreUpgradeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoRollbackRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoUpgradeRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
@@ -39,7 +49,6 @@
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto;
@@ -47,6 +56,7 @@
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.ipc.ProtobufHelper;
@@ -277,4 +287,85 @@
         RPC.getProtocolVersion(QJournalProtocolPB.class), methodName);
   }
 
+  @Override
+  public void doPreUpgrade(String jid) throws IOException {
+    try {
+      rpcProxy.doPreUpgrade(NULL_CONTROLLER,
+          DoPreUpgradeRequestProto.newBuilder()
+            .setJid(convertJournalId(jid))
+            .build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException {
+    try {
+      rpcProxy.doUpgrade(NULL_CONTROLLER,
+          DoUpgradeRequestProto.newBuilder()
+            .setJid(convertJournalId(journalId))
+            .setSInfo(PBHelper.convert(sInfo))
+            .build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+  
+  @Override
+  public void doFinalize(String jid) throws IOException {
+    try {
+      rpcProxy.doFinalize(NULL_CONTROLLER,
+          DoFinalizeRequestProto.newBuilder()
+            .setJid(convertJournalId(jid))
+            .build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public Boolean canRollBack(String journalId, StorageInfo storage,
+      StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
+    try {
+      CanRollBackResponseProto response = rpcProxy.canRollBack(
+          NULL_CONTROLLER,
+          CanRollBackRequestProto.newBuilder()
+            .setJid(convertJournalId(journalId))
+            .setStorage(PBHelper.convert(storage))
+            .setPrevStorage(PBHelper.convert(prevStorage))
+            .setTargetLayoutVersion(targetLayoutVersion)
+            .build());
+      return response.getCanRollBack();
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public void doRollback(String journalId) throws IOException {
+    try {
+      rpcProxy.doRollback(NULL_CONTROLLER,
+          DoRollbackRequestProto.newBuilder()
+            .setJid(convertJournalId(journalId))
+            .build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public Long getJournalCTime(String journalId) throws IOException {
+    try {
+      GetJournalCTimeResponseProto response = rpcProxy.getJournalCTime(
+          NULL_CONTROLLER,
+          GetJournalCTimeRequestProto.newBuilder()
+            .setJid(convertJournalId(journalId))
+            .build());
+      return response.getResultCTime();
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
index 36135cb..2571670 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import org.apache.hadoop.hdfs.server.namenode.GetImageServlet;
@@ -139,20 +140,26 @@
   private boolean checkStorageInfoOrSendError(JNStorage storage,
       HttpServletRequest request, HttpServletResponse response)
       throws IOException {
-    String myStorageInfoString = storage.toColonSeparatedString();
+    int myNsId = storage.getNamespaceID();
+    String myClusterId = storage.getClusterID();
+    
     String theirStorageInfoString = StringEscapeUtils.escapeHtml(
         request.getParameter(STORAGEINFO_PARAM));
 
-    if (theirStorageInfoString != null
-        && !myStorageInfoString.equals(theirStorageInfoString)) {
-      String msg = "This node has storage info '" + myStorageInfoString
-          + "' but the requesting node expected '"
-          + theirStorageInfoString + "'";
-      
-      response.sendError(HttpServletResponse.SC_FORBIDDEN, msg);
-      LOG.warn("Received an invalid request file transfer request from " +
-          request.getRemoteAddr() + ": " + msg);
-      return false;
+    if (theirStorageInfoString != null) {
+      int theirNsId = StorageInfo.getNsIdFromColonSeparatedString(
+          theirStorageInfoString);
+      String theirClusterId = StorageInfo.getClusterIdFromColonSeparatedString(
+          theirStorageInfoString);
+      if (myNsId != theirNsId || !myClusterId.equals(theirClusterId)) {
+        String msg = "This node has namespaceId '" + myNsId + " and clusterId '"
+            + myClusterId + "' but the requesting node expected '" + theirNsId
+            + "' and '" + theirClusterId + "'";
+        response.sendError(HttpServletResponse.SC_FORBIDDEN, msg);
+        LOG.warn("Received an invalid request file transfer request from " +
+            request.getRemoteAddr() + ": " + msg);
+        return false;
+      }
     }
     return true;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
index 347ac53..e972fe0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
@@ -130,6 +130,10 @@
     return new File(sd.getCurrentDir(), "paxos");
   }
   
+  File getRoot() {
+    return sd.getRoot();
+  }
+  
   /**
    * Remove any log files and associated paxos files which are older than
    * the given txid.
@@ -182,12 +186,15 @@
     unlockAll();
     sd.clearDirectory();
     writeProperties(sd);
+    createPaxosDir();
+    analyzeStorage();
+  }
+  
+  void createPaxosDir() throws IOException {
     if (!getPaxosDir().mkdirs()) {
       throw new IOException("Could not create paxos dir: " + getPaxosDir());
     }
-    analyzeStorage();
   }
-
   
   void analyzeStorage() throws IOException {
     this.state = sd.analyzeStorage(StartupOption.REGULAR, this);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index cf83cef..e1bb69d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -37,12 +37,14 @@
 import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
 import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
 import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
@@ -73,7 +75,7 @@
  * Each such journal is entirely independent despite being hosted by
  * the same JVM.
  */
-class Journal implements Closeable {
+public class Journal implements Closeable {
   static final Log LOG = LogFactory.getLog(Journal.class);
 
 
@@ -122,8 +124,8 @@
    */
   private BestEffortLongFile committedTxnId;
   
-  private static final String LAST_PROMISED_FILENAME = "last-promised-epoch";
-  private static final String LAST_WRITER_EPOCH = "last-writer-epoch";
+  public static final String LAST_PROMISED_FILENAME = "last-promised-epoch";
+  public static final String LAST_WRITER_EPOCH = "last-writer-epoch";
   private static final String COMMITTED_TXID_FILENAME = "committed-txid";
   
   private final FileJournalManager fjm;
@@ -627,7 +629,7 @@
   }
 
   /**
-   * @see QJournalProtocol#getEditLogManifest(String, long)
+   * @see QJournalProtocol#getEditLogManifest(String, long, boolean)
    */
   public RemoteEditLogManifest getEditLogManifest(long sinceTxId,
       boolean inProgressOk) throws IOException {
@@ -728,7 +730,7 @@
   }
   
   /**
-   * @see QJournalProtocol#acceptRecovery(RequestInfo, SegmentStateProto, URL)
+   * @see QJournalProtocol#acceptRecovery(RequestInfo, QJournalProtocolProtos.SegmentStateProto, URL)
    */
   public synchronized void acceptRecovery(RequestInfo reqInfo,
       SegmentStateProto segment, URL fromUrl)
@@ -980,4 +982,62 @@
       }
     }
   }
+
+  public synchronized void doPreUpgrade() throws IOException {
+    storage.getJournalManager().doPreUpgrade();
+  }
+
+  public synchronized void doUpgrade(StorageInfo sInfo) throws IOException {
+    long oldCTime = storage.getCTime();
+    storage.cTime = sInfo.cTime;
+    int oldLV = storage.getLayoutVersion();
+    storage.layoutVersion = sInfo.layoutVersion;
+    LOG.info("Starting upgrade of edits directory: "
+        + ".\n   old LV = " + oldLV
+        + "; old CTime = " + oldCTime
+        + ".\n   new LV = " + storage.getLayoutVersion()
+        + "; new CTime = " + storage.getCTime());
+    storage.getJournalManager().doUpgrade(storage);
+    storage.createPaxosDir();
+    
+    // Copy over the contents of the epoch data files to the new dir.
+    File currentDir = storage.getSingularStorageDir().getCurrentDir();
+    File previousDir = storage.getSingularStorageDir().getPreviousDir();
+    
+    PersistentLongFile prevLastPromisedEpoch = new PersistentLongFile(
+        new File(previousDir, LAST_PROMISED_FILENAME), 0);
+    PersistentLongFile prevLastWriterEpoch = new PersistentLongFile(
+        new File(previousDir, LAST_WRITER_EPOCH), 0);
+    
+    lastPromisedEpoch = new PersistentLongFile(
+        new File(currentDir, LAST_PROMISED_FILENAME), 0);
+    lastWriterEpoch = new PersistentLongFile(
+        new File(currentDir, LAST_WRITER_EPOCH), 0);
+    
+    lastPromisedEpoch.set(prevLastPromisedEpoch.get());
+    lastWriterEpoch.set(prevLastWriterEpoch.get());
+  }
+
+  public synchronized void doFinalize() throws IOException {
+    LOG.info("Finalizing upgrade for journal " 
+          + storage.getRoot() + "."
+          + (storage.getLayoutVersion()==0 ? "" :
+            "\n   cur LV = " + storage.getLayoutVersion()
+            + "; cur CTime = " + storage.getCTime()));
+    storage.getJournalManager().doFinalize();
+  }
+
+  public Boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
+      int targetLayoutVersion) throws IOException {
+    return this.storage.getJournalManager().canRollBack(storage, prevStorage,
+        targetLayoutVersion);
+  }
+
+  public void doRollback() throws IOException {
+    storage.getJournalManager().doRollback();
+  }
+
+  public Long getJournalCTime() throws IOException {
+    return storage.getJournalManager().getJournalCTime();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
index c43edb9..9f1665b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
 import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
@@ -285,4 +286,31 @@
     StringUtils.startupShutdownMessage(JournalNode.class, args, LOG);
     System.exit(ToolRunner.run(new JournalNode(), args));
   }
+
+  public void doPreUpgrade(String journalId) throws IOException {
+    getOrCreateJournal(journalId).doPreUpgrade();
+  }
+
+  public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException {
+    getOrCreateJournal(journalId).doUpgrade(sInfo);
+  }
+
+  public void doFinalize(String journalId) throws IOException {
+    getOrCreateJournal(journalId).doFinalize();
+  }
+
+  public Boolean canRollBack(String journalId, StorageInfo storage,
+      StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
+    return getOrCreateJournal(journalId).canRollBack(storage, prevStorage,
+        targetLayoutVersion);
+  }
+
+  public void doRollback(String journalId) throws IOException {
+    getOrCreateJournal(journalId).doRollback();
+  }
+
+  public Long getJournalCTime(String journalId) throws IOException {
+    return getOrCreateJournal(journalId).getJournalCTime();
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
index 5749cc1..2dbda7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
 import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
 import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -205,4 +206,35 @@
         .acceptRecovery(reqInfo, log, fromUrl);
   }
 
+  @Override
+  public void doPreUpgrade(String journalId) throws IOException {
+    jn.doPreUpgrade(journalId);
+  }
+
+  @Override
+  public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException {
+    jn.doUpgrade(journalId, sInfo);
+  }
+
+  @Override
+  public void doFinalize(String journalId) throws IOException {
+    jn.doFinalize(journalId);
+  }
+
+  @Override
+  public Boolean canRollBack(String journalId, StorageInfo storage,
+      StorageInfo prevStorage, int targetLayoutVersion)
+      throws IOException {
+    return jn.canRollBack(journalId, storage, prevStorage, targetLayoutVersion);
+  }
+
+  @Override
+  public void doRollback(String journalId) throws IOException {
+    jn.doRollback(journalId);
+  }
+
+  @Override
+  public Long getJournalCTime(String journalId) throws IOException {
+    return jn.getJournalCTime(journalId);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
index 3d661f1..f6189b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.common;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -26,26 +25,23 @@
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.util.VersionInfo;
 
-import com.google.common.base.Preconditions;
-
 import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
 
 
 
@@ -82,7 +78,6 @@
   public static final int[] LAYOUT_VERSIONS_203 = {-19, -31};
 
   public    static final String STORAGE_FILE_LOCK     = "in_use.lock";
-  protected static final String STORAGE_FILE_VERSION  = "VERSION";
   public    static final String STORAGE_DIR_CURRENT   = "current";
   public    static final String STORAGE_DIR_PREVIOUS  = "previous";
   public    static final String STORAGE_TMP_REMOVED   = "removed.tmp";
@@ -126,22 +121,24 @@
   
   private class DirIterator implements Iterator<StorageDirectory> {
     StorageDirType dirType;
+    boolean includeShared;
     int prevIndex; // for remove()
     int nextIndex; // for next()
     
-    DirIterator(StorageDirType dirType) {
+    DirIterator(StorageDirType dirType, boolean includeShared) {
       this.dirType = dirType;
       this.nextIndex = 0;
       this.prevIndex = 0;
+      this.includeShared = includeShared;
     }
     
     @Override
     public boolean hasNext() {
       if (storageDirs.isEmpty() || nextIndex >= storageDirs.size())
         return false;
-      if (dirType != null) {
+      if (dirType != null || !includeShared) {
         while (nextIndex < storageDirs.size()) {
-          if (getStorageDir(nextIndex).getStorageDirType().isOfType(dirType))
+          if (shouldReturnNextDir())
             break;
           nextIndex++;
         }
@@ -156,9 +153,9 @@
       StorageDirectory sd = getStorageDir(nextIndex);
       prevIndex = nextIndex;
       nextIndex++;
-      if (dirType != null) {
+      if (dirType != null || !includeShared) {
         while (nextIndex < storageDirs.size()) {
-          if (getStorageDir(nextIndex).getStorageDirType().isOfType(dirType))
+          if (shouldReturnNextDir())
             break;
           nextIndex++;
         }
@@ -172,6 +169,12 @@
       storageDirs.remove(prevIndex); // remove last returned element
       hasNext(); // reset nextIndex to correct place
     }
+    
+    private boolean shouldReturnNextDir() {
+      StorageDirectory sd = getStorageDir(nextIndex);
+      return (dirType == null || sd.getStorageDirType().isOfType(dirType)) &&
+          (includeShared || !sd.isShared());
+    }
   }
   
   /**
@@ -203,7 +206,27 @@
    * them via the Iterator
    */
   public Iterator<StorageDirectory> dirIterator(StorageDirType dirType) {
-    return new DirIterator(dirType);
+    return dirIterator(dirType, true);
+  }
+  
+  /**
+   * Return all entries in storageDirs, potentially excluding shared dirs.
+   * @param includeShared whether or not to include shared dirs.
+   * @return an iterator over the configured storage dirs.
+   */
+  public Iterator<StorageDirectory> dirIterator(boolean includeShared) {
+    return dirIterator(null, includeShared);
+  }
+  
+  /**
+   * @param dirType all entries will be of this type of dir
+   * @param includeShared true to include any shared directories,
+   *        false otherwise
+   * @return an iterator over the configured storage dirs.
+   */
+  public Iterator<StorageDirectory> dirIterator(StorageDirType dirType,
+      boolean includeShared) {
+    return new DirIterator(dirType, includeShared);
   }
   
   public Iterable<StorageDirectory> dirIterable(final StorageDirType dirType) {
@@ -233,7 +256,9 @@
   @InterfaceAudience.Private
   public static class StorageDirectory implements FormatConfirmable {
     final File root;              // root directory
-    final boolean useLock;        // flag to enable storage lock
+    // whether or not this dir is shared between two separate NNs for HA, or
+    // between multiple block pools in the case of federation.
+    final boolean isShared;
     final StorageDirType dirType; // storage dir type
     FileLock lock;                // storage lock
 
@@ -241,11 +266,11 @@
     
     public StorageDirectory(File dir) {
       // default dirType is null
-      this(dir, null, true);
+      this(dir, null, false);
     }
     
     public StorageDirectory(File dir, StorageDirType dirType) {
-      this(dir, dirType, true);
+      this(dir, dirType, false);
     }
     
     public void setStorageUuid(String storageUuid) {
@@ -260,14 +285,14 @@
      * Constructor
      * @param dir directory corresponding to the storage
      * @param dirType storage directory type
-     * @param useLock true - enables locking on the storage directory and false
-     *          disables locking
+     * @param isShared whether or not this dir is shared between two NNs. true
+     *          disables locking on the storage directory, false enables locking
      */
-    public StorageDirectory(File dir, StorageDirType dirType, boolean useLock) {
+    public StorageDirectory(File dir, StorageDirType dirType, boolean isShared) {
       this.root = dir;
       this.lock = null;
       this.dirType = dirType;
-      this.useLock = useLock;
+      this.isShared = isShared;
     }
     
     /**
@@ -621,6 +646,10 @@
       
       return true;
     }
+    
+    public boolean isShared() {
+      return isShared;
+    }
 
 
     /**
@@ -635,7 +664,7 @@
      * @throws IOException if locking fails
      */
     public void lock() throws IOException {
-      if (!useLock) {
+      if (isShared()) {
         LOG.info("Locking is disabled");
         return;
       }
@@ -890,22 +919,6 @@
   }
   
   /**
-   * Get common storage fields.
-   * Should be overloaded if additional fields need to be get.
-   * 
-   * @param props
-   * @throws IOException
-   */
-  protected void setFieldsFromProperties(
-      Properties props, StorageDirectory sd) throws IOException {
-    setLayoutVersion(props, sd);
-    setNamespaceID(props, sd);
-    setStorageType(props, sd);
-    setcTime(props, sd);
-    setClusterId(props, layoutVersion, sd);
-  }
-  
-  /**
    * Set common storage fields into the given properties object.
    * Should be overloaded if additional fields need to be set.
    * 
@@ -923,22 +936,29 @@
     }
     props.setProperty("cTime", String.valueOf(cTime));
   }
-
+  
   /**
-   * Read properties from the VERSION file in the given storage directory.
+   * Get common storage fields.
+   * Should be overridden if additional fields need to be read.
+   * 
+   * @param props
+   * @throws IOException
    */
-  public void readProperties(StorageDirectory sd) throws IOException {
-    Properties props = readPropertiesFile(sd.getVersionFile());
-    setFieldsFromProperties(props, sd);
+  protected void setFieldsFromProperties(
+      Properties props, StorageDirectory sd) throws IOException {
+    super.setFieldsFromProperties(props, sd);
+    setStorageType(props, sd);
   }
-
-  /**
-   * Read properties from the the previous/VERSION file in the given storage directory.
-   */
-  public void readPreviousVersionProperties(StorageDirectory sd)
-      throws IOException {
-    Properties props = readPropertiesFile(sd.getPreviousVersionFile());
-    setFieldsFromProperties(props, sd);
+  
+  /** Validate and set storage type from {@link Properties}*/
+  protected void setStorageType(Properties props, StorageDirectory sd)
+      throws InconsistentFSStateException {
+    NodeType type = NodeType.valueOf(getProperty(props, sd, "storageType"));
+    if (!storageType.equals(type)) {
+      throw new InconsistentFSStateException(sd.root,
+          "node type is incompatible with others.");
+    }
+    storageType = type;
   }
 
   /**
@@ -947,10 +967,15 @@
   public void writeProperties(StorageDirectory sd) throws IOException {
     writeProperties(sd.getVersionFile(), sd);
   }
-
+  
   public void writeProperties(File to, StorageDirectory sd) throws IOException {
     Properties props = new Properties();
     setPropertiesFromFields(props, sd);
+    writeProperties(to, sd, props);
+  }
+
+  public static void writeProperties(File to, StorageDirectory sd,
+      Properties props) throws IOException {
     RandomAccessFile file = new RandomAccessFile(to, "rws");
     FileOutputStream out = null;
     try {
@@ -977,23 +1002,6 @@
       file.close();
     }
   }
-  
-  public static Properties readPropertiesFile(File from) throws IOException {
-    RandomAccessFile file = new RandomAccessFile(from, "rws");
-    FileInputStream in = null;
-    Properties props = new Properties();
-    try {
-      in = new FileInputStream(file.getFD());
-      file.seek(0);
-      props.load(in);
-    } finally {
-      if (in != null) {
-        in.close();
-      }
-      file.close();
-    }
-    return props;
-  }
 
   public static void rename(File from, File to) throws IOException {
     if (!from.renameTo(to))
@@ -1044,69 +1052,6 @@
       + "-" + Long.toString(storage.getCTime());
   }
   
-  String getProperty(Properties props, StorageDirectory sd,
-      String name) throws InconsistentFSStateException {
-    String property = props.getProperty(name);
-    if (property == null) {
-      throw new InconsistentFSStateException(sd.root, "file "
-          + STORAGE_FILE_VERSION + " has " + name + " missing.");
-    }
-    return property;
-  }
-  
-  /** Validate and set storage type from {@link Properties}*/
-  protected void setStorageType(Properties props, StorageDirectory sd)
-      throws InconsistentFSStateException {
-    NodeType type = NodeType.valueOf(getProperty(props, sd, "storageType"));
-    if (!storageType.equals(type)) {
-      throw new InconsistentFSStateException(sd.root,
-          "node type is incompatible with others.");
-    }
-    storageType = type;
-  }
-  
-  /** Validate and set ctime from {@link Properties}*/
-  protected void setcTime(Properties props, StorageDirectory sd)
-      throws InconsistentFSStateException {
-    cTime = Long.parseLong(getProperty(props, sd, "cTime"));
-  }
-
-  /** Validate and set clusterId from {@link Properties}*/
-  protected void setClusterId(Properties props, int layoutVersion,
-      StorageDirectory sd) throws InconsistentFSStateException {
-    // Set cluster ID in version that supports federation
-    if (LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
-      String cid = getProperty(props, sd, "clusterID");
-      if (!(clusterID.equals("") || cid.equals("") || clusterID.equals(cid))) {
-        throw new InconsistentFSStateException(sd.getRoot(),
-            "cluster Id is incompatible with others.");
-      }
-      clusterID = cid;
-    }
-  }
-  
-  /** Validate and set layout version from {@link Properties}*/
-  protected void setLayoutVersion(Properties props, StorageDirectory sd)
-      throws IncorrectVersionException, InconsistentFSStateException {
-    int lv = Integer.parseInt(getProperty(props, sd, "layoutVersion"));
-    if (lv < HdfsConstants.LAYOUT_VERSION) { // future version
-      throw new IncorrectVersionException(lv, "storage directory "
-          + sd.root.getAbsolutePath());
-    }
-    layoutVersion = lv;
-  }
-  
-  /** Validate and set namespaceID version from {@link Properties}*/
-  protected void setNamespaceID(Properties props, StorageDirectory sd)
-      throws InconsistentFSStateException {
-    int nsId = Integer.parseInt(getProperty(props, sd, "namespaceID"));
-    if (namespaceID != 0 && nsId != 0 && namespaceID != nsId) {
-      throw new InconsistentFSStateException(sd.root,
-          "namespaceID is incompatible with others.");
-    }
-    namespaceID = nsId;
-  }
-  
   public static boolean is203LayoutVersion(int layoutVersion) {
     for (int lv203 : LAYOUT_VERSIONS_203) {
       if (lv203 == layoutVersion) {
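
The includeShared flag exists so that upgrade code can walk only the locally owned directories and leave shared (HA) directories to be handled once, through the shared edit log. A minimal sketch of that usage; the class name and the printing are purely illustrative.

import java.util.Iterator;

import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;

class LocalDirWalkSketch {
  // Illustrative only: visit local (non-shared) directories, the pattern
  // FSImage.doUpgrade()/doRollback() switch to later in this patch.
  static void printLocalDirs(Storage storage) {
    for (Iterator<StorageDirectory> it = storage.dirIterator(false);
        it.hasNext();) {
      StorageDirectory sd = it.next();
      // dirIterator(false) filters out every dir built with isShared == true;
      // those shared dirs are also the ones whose lock() is now a no-op.
      System.out.println("local storage dir: " + sd.getRoot());
    }
  }
}
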
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
index 1dc8340..59f3d99 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
@@ -17,9 +17,17 @@
  */
 package org.apache.hadoop.hdfs.server.common;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Properties;
+
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 
 import com.google.common.base.Joiner;
 
@@ -34,6 +42,8 @@
   public int   namespaceID;     // id of the file system
   public String clusterID;      // id of the cluster
   public long  cTime;           // creation time of the file system state
+  
+  protected static final String STORAGE_FILE_VERSION    = "VERSION";
  
   public StorageInfo () {
     this(0, 0, "", 0L);
@@ -96,4 +106,113 @@
     return Joiner.on(":").join(
         layoutVersion, namespaceID, cTime, clusterID);
   }
+  
+  public static int getNsIdFromColonSeparatedString(String in) {
+    return Integer.parseInt(in.split(":")[1]);
+  }
+  
+  public static String getClusterIdFromColonSeparatedString(String in) {
+    return in.split(":")[3];
+  }
+  
+  /**
+   * Read properties from the VERSION file in the given storage directory.
+   */
+  public void readProperties(StorageDirectory sd) throws IOException {
+    Properties props = readPropertiesFile(sd.getVersionFile());
+    setFieldsFromProperties(props, sd);
+  }
+  
+  /**
+   * Read properties from the previous/VERSION file in the given storage directory.
+   */
+  public void readPreviousVersionProperties(StorageDirectory sd)
+      throws IOException {
+    Properties props = readPropertiesFile(sd.getPreviousVersionFile());
+    setFieldsFromProperties(props, sd);
+  }
+  
+  /**
+   * Get common storage fields.
+   * Should be overridden if additional fields need to be read.
+   * 
+   * @param props
+   * @throws IOException
+   */
+  protected void setFieldsFromProperties(
+      Properties props, StorageDirectory sd) throws IOException {
+    setLayoutVersion(props, sd);
+    setNamespaceID(props, sd);
+    setcTime(props, sd);
+    setClusterId(props, layoutVersion, sd);
+  }
+  
+  /** Validate and set ctime from {@link Properties}*/
+  protected void setcTime(Properties props, StorageDirectory sd)
+      throws InconsistentFSStateException {
+    cTime = Long.parseLong(getProperty(props, sd, "cTime"));
+  }
+
+  /** Validate and set clusterId from {@link Properties}*/
+  protected void setClusterId(Properties props, int layoutVersion,
+      StorageDirectory sd) throws InconsistentFSStateException {
+    // Set cluster ID in version that supports federation
+    if (LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
+      String cid = getProperty(props, sd, "clusterID");
+      if (!(clusterID.equals("") || cid.equals("") || clusterID.equals(cid))) {
+        throw new InconsistentFSStateException(sd.getRoot(),
+            "cluster Id is incompatible with others.");
+      }
+      clusterID = cid;
+    }
+  }
+  
+  /** Validate and set layout version from {@link Properties}*/
+  protected void setLayoutVersion(Properties props, StorageDirectory sd)
+      throws IncorrectVersionException, InconsistentFSStateException {
+    int lv = Integer.parseInt(getProperty(props, sd, "layoutVersion"));
+    if (lv < HdfsConstants.LAYOUT_VERSION) { // future version
+      throw new IncorrectVersionException(lv, "storage directory "
+          + sd.root.getAbsolutePath());
+    }
+    layoutVersion = lv;
+  }
+  
+  /** Validate and set namespaceID version from {@link Properties}*/
+  protected void setNamespaceID(Properties props, StorageDirectory sd)
+      throws InconsistentFSStateException {
+    int nsId = Integer.parseInt(getProperty(props, sd, "namespaceID"));
+    if (namespaceID != 0 && nsId != 0 && namespaceID != nsId) {
+      throw new InconsistentFSStateException(sd.root,
+          "namespaceID is incompatible with others.");
+    }
+    namespaceID = nsId;
+  }
+  
+  static String getProperty(Properties props, StorageDirectory sd,
+      String name) throws InconsistentFSStateException {
+    String property = props.getProperty(name);
+    if (property == null) {
+      throw new InconsistentFSStateException(sd.root, "file "
+          + STORAGE_FILE_VERSION + " has " + name + " missing.");
+    }
+    return property;
+  }
+  
+  public static Properties readPropertiesFile(File from) throws IOException {
+    RandomAccessFile file = new RandomAccessFile(from, "rws");
+    FileInputStream in = null;
+    Properties props = new Properties();
+    try {
+      in = new FileInputStream(file.getFD());
+      file.seek(0);
+      props.load(in);
+    } finally {
+      if (in != null) {
+        in.close();
+      }
+      file.close();
+    }
+    return props;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index 8558e95..2497621 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -103,7 +103,7 @@
         dataDirs.size());
     for (Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
       File dataDir = it.next();
-      StorageDirectory sd = new StorageDirectory(dataDir, null, false);
+      StorageDirectory sd = new StorageDirectory(dataDir, null, true);
       StorageState curState;
       try {
         curState = sd.analyzeStorage(startOpt, this);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index ce3dccd..ed9ba58 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -23,6 +23,7 @@
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.RandomAccessFile;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DU;
@@ -191,7 +192,7 @@
             blockFile.length(), genStamp, volume, blockFile.getParentFile());
       } else {
         newReplica = new ReplicaWaitingToBeRecovered(blockId,
-            validateIntegrity(blockFile, genStamp), 
+            validateIntegrityAndSetLength(blockFile, genStamp), 
             genStamp, volume, blockFile.getParentFile());
       }
 
@@ -214,7 +215,7 @@
    * @param genStamp generation stamp of the block
    * @return the number of valid bytes
    */
-  private long validateIntegrity(File blockFile, long genStamp) {
+  private long validateIntegrityAndSetLength(File blockFile, long genStamp) {
     DataInputStream checksumIn = null;
     InputStream blockIn = null;
     try {
@@ -257,11 +258,25 @@
       IOUtils.readFully(blockIn, buf, 0, lastChunkSize);
 
       checksum.update(buf, 0, lastChunkSize);
+      long validFileLength;
       if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
-        return lastChunkStartPos + lastChunkSize;
+        validFileLength = lastChunkStartPos + lastChunkSize;
       } else { // last chunck is corrupt
-        return lastChunkStartPos;
+        validFileLength = lastChunkStartPos;
       }
+
+      // truncate if extra bytes are present without CRC
+      if (blockFile.length() > validFileLength) {
+        RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
+        try {
+          // truncate blockFile
+          blockRAF.setLength(validFileLength);
+        } finally {
+          blockRAF.close();
+        }
+      }
+
+      return validFileLength;
     } catch (IOException e) {
       FsDatasetImpl.LOG.warn(e);
       return 0;
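
The rename reflects a behavior change: besides computing how many bytes are covered by a valid CRC, the method now truncates the block file to that length, so recovery no longer trips over trailing bytes that have no checksum. A condensed sketch of just the truncation step, assuming validFileLength was already derived from the last checksummed chunk; the class name is illustrative.

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

class TruncateSketch {
  // Illustrative only: drop any bytes past the last CRC-covered position,
  // as validateIntegrityAndSetLength() now does before block recovery.
  static void truncateToValidLength(File blockFile, long validFileLength)
      throws IOException {
    if (blockFile.length() > validFileLength) {
      RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
      try {
        blockRAF.setLength(validFileLength);  // discard the un-checksummed tail
      } finally {
        blockRAF.close();
      }
    }
  }
}
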
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
index 5420b12..2547d88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
@@ -20,6 +20,8 @@
 import java.io.IOException;
 import java.util.Collection;
 
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -97,4 +99,35 @@
   public String toString() {
     return "BackupJournalManager";
   }
+  
+  @Override
+  public void doPreUpgrade() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void doUpgrade(Storage storage) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+  
+  @Override
+  public void doFinalize() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
+      int targetLayoutVersion) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void doRollback() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getJournalCTime() throws IOException {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
index 5e7740c..6a1afb7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
 import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
@@ -367,7 +368,7 @@
     } else {
       nsInfo.validateStorage(storage);
     }
-    bnImage.initEditLog();
+    bnImage.initEditLog(StartupOption.REGULAR);
     setRegistration();
     NamenodeRegistration nnReg = null;
     while(!isStopRequested()) {
@@ -423,7 +424,8 @@
     return DFSUtil.getBackupNameServiceId(conf);
   }
 
-  protected HAState createHAState() {
+  @Override
+  protected HAState createHAState(StartupOption startOpt) {
     return new BackupState();
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index d98dad1..d43a55bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddBlockOp;
@@ -252,10 +253,12 @@
       if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
         StorageDirectory sd = storage.getStorageDirectory(u);
         if (sd != null) {
-          journalSet.add(new FileJournalManager(conf, sd, storage), required);
+          journalSet.add(new FileJournalManager(conf, sd, storage),
+              required, sharedEditsDirs.contains(u));
         }
       } else {
-        journalSet.add(createJournal(u), required);
+        journalSet.add(createJournal(u), required,
+            sharedEditsDirs.contains(u));
       }
     }
  
@@ -1339,6 +1342,58 @@
     }
   }
   
+  public long getSharedLogCTime() throws IOException {
+    for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
+      if (jas.isShared()) {
+        return jas.getManager().getJournalCTime();
+      }
+    }
+    throw new IOException("No shared log found.");
+  }
+  
+  public synchronized void doPreUpgradeOfSharedLog() throws IOException {
+    for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
+      if (jas.isShared()) {
+        jas.getManager().doPreUpgrade();
+      }
+    }
+  }
+  
+  public synchronized void doUpgradeOfSharedLog() throws IOException {
+    for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
+      if (jas.isShared()) {
+        jas.getManager().doUpgrade(storage);
+      }
+    }
+  }
+  
+  public synchronized void doFinalizeOfSharedLog() throws IOException {
+    for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
+      if (jas.isShared()) {
+        jas.getManager().doFinalize();
+      }
+    }
+  }
+  
+  public synchronized boolean canRollBackSharedLog(Storage prevStorage,
+      int targetLayoutVersion) throws IOException {
+    for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
+      if (jas.isShared()) {
+        return jas.getManager().canRollBack(storage, prevStorage,
+            targetLayoutVersion);
+      }
+    }
+    throw new IOException("No shared log found.");
+  }
+  
+  public synchronized void doRollback() throws IOException {
+    for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
+      if (jas.isShared()) {
+        jas.getManager().doRollback();
+      }
+    }
+  }
+  
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
       long fromTxId, boolean inProgressOk) throws IOException {
@@ -1470,4 +1525,5 @@
                                          + uri, e);
     }
   }
+
 }
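
Each journal is now tagged at registration with whether its URI is one of the configured shared edits dirs, and the new *OfSharedLog helpers only touch journals carrying that tag. The sketch below restates that selection pattern; the class is hypothetical and assumes it sits in the same package, since JournalAndStream is not public.

package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;

class SharedJournalSketch {
  // Illustrative only: the lookup pattern shared by getSharedLogCTime(),
  // doPreUpgradeOfSharedLog(), doUpgradeOfSharedLog() and the other helpers
  // above; the shared flag comes from sharedEditsDirs.contains(uri) at
  // registration time.
  static JournalManager findSharedJournal(JournalSet journalSet)
      throws IOException {
    for (JournalSet.JournalAndStream jas : journalSet.getAllJournalStreams()) {
      if (jas.isShared()) {
        return jas.getManager();
      }
    }
    throw new IOException("No shared log found.");
  }
}
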
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index cc4ca0c..166ffb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -178,7 +178,8 @@
    * @return true if the image needs to be saved or false otherwise
    */
   boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target,
-      MetaRecoveryContext recovery) throws IOException {
+      MetaRecoveryContext recovery)
+      throws IOException {
     assert startOpt != StartupOption.FORMAT : 
       "NameNode formatting should be performed before reading the image";
     
@@ -252,14 +253,14 @@
       doImportCheckpoint(target);
       return false; // import checkpoint saved image already
     case ROLLBACK:
-      doRollback();
-      break;
+      throw new AssertionError("Rollback is now a standalone command, " +
+          "NameNode should not be starting with this option.");
     case REGULAR:
     default:
       // just load the image
     }
     
-    return loadFSImage(target, recovery);
+    return loadFSImage(target, recovery, startOpt);
   }
   
   /**
@@ -272,17 +273,15 @@
   private boolean recoverStorageDirs(StartupOption startOpt,
       Map<StorageDirectory, StorageState> dataDirStates) throws IOException {
     boolean isFormatted = false;
+    // This loop needs to be over all storage dirs, even shared dirs, to make
+    // sure that we properly examine their state, but we make sure we don't
+    // mutate the shared dir below in the actual loop.
     for (Iterator<StorageDirectory> it = 
                       storage.dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
       StorageState curState;
       try {
         curState = sd.analyzeStorage(startOpt, storage);
-        String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
-        if (curState != StorageState.NORMAL && HAUtil.isHAEnabled(conf, nameserviceId)) {
-          throw new IOException("Cannot start an HA namenode with name dirs " +
-              "that need recovery. Dir: " + sd + " state: " + curState);
-        }
         // sd is locked but not opened
         switch(curState) {
         case NON_EXISTENT:
@@ -294,7 +293,7 @@
         case NORMAL:
           break;
         default:  // recovery is possible
-          sd.doRecover(curState);      
+          sd.doRecover(curState);
         }
         if (curState != StorageState.NOT_FORMATTED 
             && startOpt != StartupOption.ROLLBACK) {
@@ -315,10 +314,10 @@
     return isFormatted;
   }
 
-  private void doUpgrade(FSNamesystem target) throws IOException {
+  void doUpgrade(FSNamesystem target) throws IOException {
     // Upgrade is allowed only if there are 
-    // no previous fs states in any of the directories
-    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+    // no previous fs states in any of the local directories
+    for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
       StorageDirectory sd = it.next();
       if (sd.getPreviousDir().exists())
         throw new InconsistentFSStateException(sd.getRoot(),
@@ -327,9 +326,9 @@
     }
 
     // load the latest image
-    this.loadFSImage(target, null);
-
     // Do upgrade for each directory
+    this.loadFSImage(target, null, StartupOption.UPGRADE);
+    
     long oldCTime = storage.getCTime();
     storage.cTime = now();  // generate new cTime for the state
     int oldLV = storage.getLayoutVersion();
@@ -337,28 +336,17 @@
     
     List<StorageDirectory> errorSDs =
       Collections.synchronizedList(new ArrayList<StorageDirectory>());
-    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+    assert !editLog.isSegmentOpen() : "Edits log must not be open.";
+    LOG.info("Starting upgrade of local storage directories."
+        + "\n   old LV = " + oldLV
+        + "; old CTime = " + oldCTime
+        + ".\n   new LV = " + storage.getLayoutVersion()
+        + "; new CTime = " + storage.getCTime());
+    // Do upgrade for each directory
+    for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
       StorageDirectory sd = it.next();
-      LOG.info("Starting upgrade of image directory " + sd.getRoot()
-               + ".\n   old LV = " + oldLV
-               + "; old CTime = " + oldCTime
-               + ".\n   new LV = " + storage.getLayoutVersion()
-               + "; new CTime = " + storage.getCTime());
       try {
-        File curDir = sd.getCurrentDir();
-        File prevDir = sd.getPreviousDir();
-        File tmpDir = sd.getPreviousTmp();
-        assert curDir.exists() : "Current directory must exist.";
-        assert !prevDir.exists() : "previous directory must not exist.";
-        assert !tmpDir.exists() : "previous.tmp directory must not exist.";
-        assert !editLog.isSegmentOpen() : "Edits log must not be open.";
-
-        // rename current to tmp
-        NNStorage.rename(curDir, tmpDir);
-        
-        if (!curDir.mkdir()) {
-          throw new IOException("Cannot create directory " + curDir);
-        }
+        NNUpgradeUtil.doPreUpgrade(sd);
       } catch (Exception e) {
         LOG.error("Failed to move aside pre-upgrade storage " +
             "in image directory " + sd.getRoot(), e);
@@ -366,41 +354,38 @@
         continue;
       }
     }
+    if (target.isHaEnabled()) {
+      editLog.doPreUpgradeOfSharedLog();
+    }
     storage.reportErrorsOnDirectories(errorSDs);
     errorSDs.clear();
 
     saveFSImageInAllDirs(target, editLog.getLastWrittenTxId());
 
-    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+    for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
       StorageDirectory sd = it.next();
       try {
-        // Write the version file, since saveFsImage above only makes the
-        // fsimage_<txid>, and the directory is otherwise empty.
-        storage.writeProperties(sd);
-        
-        File prevDir = sd.getPreviousDir();
-        File tmpDir = sd.getPreviousTmp();
-        // rename tmp to previous
-        NNStorage.rename(tmpDir, prevDir);
+        NNUpgradeUtil.doUpgrade(sd, storage);
       } catch (IOException ioe) {
-        LOG.error("Unable to rename temp to previous for " + sd.getRoot(), ioe);
         errorSDs.add(sd);
         continue;
       }
-      LOG.info("Upgrade of " + sd.getRoot() + " is complete.");
+    }
+    if (target.isHaEnabled()) {
+      editLog.doUpgradeOfSharedLog();
     }
     storage.reportErrorsOnDirectories(errorSDs);
-
+    
     isUpgradeFinalized = false;
     if (!storage.getRemovedStorageDirs().isEmpty()) {
-      //during upgrade, it's a fatal error to fail any storage directory
+      // during upgrade, it's a fatal error to fail any storage directory
       throw new IOException("Upgrade failed in "
           + storage.getRemovedStorageDirs().size()
           + " storage directory(ies), previously logged.");
     }
   }
 
-  private void doRollback() throws IOException {
+  void doRollback(FSNamesystem fsns) throws IOException {
     // Rollback is allowed only if there is 
     // a previous fs states in at least one of the storage directories.
     // Directories that don't have previous state do not rollback
@@ -408,85 +393,46 @@
     FSImage prevState = new FSImage(conf);
     try {
       prevState.getStorage().layoutVersion = HdfsConstants.LAYOUT_VERSION;
-      for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+      for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
         StorageDirectory sd = it.next();
-        File prevDir = sd.getPreviousDir();
-        if (!prevDir.exists()) {  // use current directory then
-          LOG.info("Storage directory " + sd.getRoot()
-            + " does not contain previous fs state.");
-          // read and verify consistency with other directories
-          storage.readProperties(sd);
+        if (!NNUpgradeUtil.canRollBack(sd, storage, prevState.getStorage(),
+            HdfsConstants.LAYOUT_VERSION)) {
           continue;
         }
-
-        // read and verify consistency of the prev dir
-        prevState.getStorage().readPreviousVersionProperties(sd);
-
-        if (prevState.getLayoutVersion() != HdfsConstants.LAYOUT_VERSION) {
-          throw new IOException(
-            "Cannot rollback to storage version " +
-                prevState.getLayoutVersion() +
-                " using this version of the NameNode, which uses storage version " +
-                HdfsConstants.LAYOUT_VERSION + ". " +
-              "Please use the previous version of HDFS to perform the rollback.");
-        }
         canRollback = true;
       }
+      
+      if (fsns.isHaEnabled()) {
+        // If HA is enabled, check if the shared log can be rolled back as well.
+        editLog.initJournalsForWrite();
+        canRollback |= editLog.canRollBackSharedLog(prevState.getStorage(),
+            HdfsConstants.LAYOUT_VERSION);
+      }
+      
       if (!canRollback)
         throw new IOException("Cannot rollback. None of the storage "
             + "directories contain previous fs state.");
-
+  
       // Now that we know all directories are going to be consistent
       // Do rollback for each directory containing previous state
-      for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+      for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
         StorageDirectory sd = it.next();
-        File prevDir = sd.getPreviousDir();
-        if (!prevDir.exists())
-          continue;
-
         LOG.info("Rolling back storage directory " + sd.getRoot()
-          + ".\n   new LV = " + prevState.getStorage().getLayoutVersion()
-          + "; new CTime = " + prevState.getStorage().getCTime());
-        File tmpDir = sd.getRemovedTmp();
-        assert !tmpDir.exists() : "removed.tmp directory must not exist.";
-        // rename current to tmp
-        File curDir = sd.getCurrentDir();
-        assert curDir.exists() : "Current directory must exist.";
-        NNStorage.rename(curDir, tmpDir);
-        // rename previous to current
-        NNStorage.rename(prevDir, curDir);
-
-        // delete tmp dir
-        NNStorage.deleteDir(tmpDir);
-        LOG.info("Rollback of " + sd.getRoot()+ " is complete.");
+                 + ".\n   new LV = " + prevState.getStorage().getLayoutVersion()
+                 + "; new CTime = " + prevState.getStorage().getCTime());
+        NNUpgradeUtil.doRollBack(sd);
       }
+      if (fsns.isHaEnabled()) {
+        // If HA is enabled, try to roll back the shared log as well.
+        editLog.doRollback();
+      }
+      
       isUpgradeFinalized = true;
     } finally {
       prevState.close();
     }
   }
 
-  private void doFinalize(StorageDirectory sd) throws IOException {
-    File prevDir = sd.getPreviousDir();
-    if (!prevDir.exists()) { // already discarded
-      LOG.info("Directory " + prevDir + " does not exist.");
-      LOG.info("Finalize upgrade for " + sd.getRoot()+ " is not required.");
-      return;
-    }
-    LOG.info("Finalizing upgrade for storage directory " 
-             + sd.getRoot() + "."
-             + (storage.getLayoutVersion()==0 ? "" :
-                   "\n   cur LV = " + storage.getLayoutVersion()
-                   + "; cur CTime = " + storage.getCTime()));
-    assert sd.getCurrentDir().exists() : "Current directory must exist.";
-    final File tmpDir = sd.getFinalizedTmp();
-    // rename previous to tmp and remove
-    NNStorage.rename(prevDir, tmpDir);
-    NNStorage.deleteDir(tmpDir);
-    isUpgradeFinalized = true;
-    LOG.info("Finalize upgrade for " + sd.getRoot()+ " is complete.");
-  }
-
   /**
    * Load image from a checkpoint directory and save it into the current one.
    * @param target the NameSystem to import into
@@ -521,7 +467,7 @@
     // return back the real image
     realImage.getStorage().setStorageInfo(ckptImage.getStorage());
     realImage.getEditLog().setNextTxId(ckptImage.getEditLog().getLastWrittenTxId()+1);
-    realImage.initEditLog();
+    realImage.initEditLog(StartupOption.IMPORT);
 
     target.dir.fsImage = realImage;
     realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID());
@@ -530,12 +476,23 @@
     saveNamespace(target);
     getStorage().writeAll();
   }
-
-  void finalizeUpgrade() throws IOException {
-    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+  
+  void finalizeUpgrade(boolean finalizeEditLog) throws IOException {
+    LOG.info("Finalizing upgrade for local dirs. " +
+        (storage.getLayoutVersion() == 0 ? "" : 
+          "\n   cur LV = " + storage.getLayoutVersion()
+          + "; cur CTime = " + storage.getCTime()));
+    for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
       StorageDirectory sd = it.next();
-      doFinalize(sd);
+      NNUpgradeUtil.doFinalize(sd);
     }
+    if (finalizeEditLog) {
+      // We only do this in the case that HA is enabled and we're active. In any
+      // other case the NN will have done the upgrade of the edits directories
+      // already by virtue of the fact that they're local.
+      editLog.doFinalizeOfSharedLog();
+    }
+    isUpgradeFinalized = true;
   }
 
   boolean isUpgradeFinalized() {
@@ -582,8 +539,8 @@
    * @return whether the image should be saved
    * @throws IOException
    */
-  boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery)
-      throws IOException {
+  boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery,
+      StartupOption startOpt) throws IOException {
     FSImageStorageInspector inspector = storage.readAndInspectDirs();
     FSImageFile imageFile = null;
     
@@ -600,7 +557,7 @@
 
     Iterable<EditLogInputStream> editStreams = null;
 
-    initEditLog();
+    initEditLog(startOpt);
 
     if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, 
                                getLayoutVersion())) {
@@ -682,14 +639,30 @@
     }
   }
 
-  public void initEditLog() {
+  public void initEditLog(StartupOption startOpt) throws IOException {
     Preconditions.checkState(getNamespaceID() != 0,
         "Must know namespace ID before initting edit log");
     String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
     if (!HAUtil.isHAEnabled(conf, nameserviceId)) {
+      // If this NN is not HA
       editLog.initJournalsForWrite();
       editLog.recoverUnclosedStreams();
+    } else if (HAUtil.isHAEnabled(conf, nameserviceId) &&
+        startOpt == StartupOption.UPGRADE) {
+      // This NN is HA, but we're doing an upgrade so init the edit log for
+      // write.
+      editLog.initJournalsForWrite();
+      long sharedLogCTime = editLog.getSharedLogCTime();
+      if (this.storage.getCTime() < sharedLogCTime) {
+        throw new IOException("It looks like the shared log is already " +
+            "being upgraded but this NN has not been upgraded yet. You " +
+            "should restart this NameNode with the '" +
+            StartupOption.BOOTSTRAPSTANDBY.getName() + "' option to bring " +
+            "this NN in sync with the other.");
+      }
+      editLog.recoverUnclosedStreams();
     } else {
+      // This NN is HA and we're not doing an upgrade.
       editLog.initSharedJournalsForRead();
     }
   }
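
The cTime comparison is the HA-specific guard: the first NameNode of a pair to upgrade bumps cTime on both its local storage and the shared log, so a peer whose local cTime still lags the shared log must not try to upgrade again and is pointed at -bootstrapStandby instead. A condensed sketch of that check; the class and method names are placeholders.

package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;

class SharedLogUpgradeGuardSketch {
  // Illustrative only: the invariant initEditLog() enforces when an HA
  // NameNode is started with the UPGRADE startup option.
  static void checkSharedLogNotAhead(NNStorage storage, FSEditLog editLog)
      throws IOException {
    long sharedLogCTime = editLog.getSharedLogCTime();
    if (storage.getCTime() < sharedLogCTime) {
      // The shared log was already upgraded by the other NameNode of the
      // pair; this NN must be re-synced with -bootstrapStandby instead of
      // being upgraded a second time.
      throw new IOException("Local storage predates the shared log; "
          + "restart with -bootstrapStandby to resync this NameNode.");
    }
  }
}
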
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 537aa72..f1bf5d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -549,6 +549,10 @@
     return leaseManager;
   }
   
+  boolean isHaEnabled() {
+    return haEnabled;
+  }
+  
   /**
    * Check the supplied configuration for correctness.
    * @param conf Supplies the configuration to validate.
@@ -878,7 +882,7 @@
       }
       // This will start a new log segment and write to the seen_txid file, so
       // we shouldn't do it when coming up in standby state
-      if (!haEnabled) {
+      if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)) {
         fsImage.openEditLogForWrite();
       }
       success = true;
@@ -1005,6 +1009,7 @@
 
         dir.fsImage.editLog.openForWrite();
       }
+      
       if (haEnabled) {
         // Renew all of the leases before becoming active.
         // This is because, while we were in standby mode,
@@ -1031,14 +1036,17 @@
     }
   }
   
+  private boolean inActiveState() {
+    return haContext != null &&
+        haContext.getState().getServiceState() == HAServiceState.ACTIVE;
+  }
+  
   /**
    * @return Whether the namenode is transitioning to active state and is in the
    *         middle of the {@link #startActiveServices()}
    */
   public boolean inTransitionToActive() {
-    return haEnabled && haContext != null
-        && haContext.getState().getServiceState() == HAServiceState.ACTIVE
-        && startingActiveService;
+    return haEnabled && inActiveState() && startingActiveService;
   }
 
   private boolean shouldUseDelegationTokens() {
@@ -4512,11 +4520,11 @@
     
   void finalizeUpgrade() throws IOException {
     checkSuperuserPrivilege();
-    checkOperation(OperationCategory.WRITE);
+    checkOperation(OperationCategory.UNCHECKED);
     writeLock();
     try {
-      checkOperation(OperationCategory.WRITE);
-      getFSImage().finalizeUpgrade();
+      checkOperation(OperationCategory.UNCHECKED);
+      getFSImage().finalizeUpgrade(this.isHaEnabled() && inActiveState());
     } finally {
       writeUnlock();
     }
@@ -7421,5 +7429,6 @@
       logger.addAppender(asyncAppender);        
     }
   }
+
 }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
index 09bddef..4c78add 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
@@ -33,14 +33,15 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -489,4 +490,49 @@
                            isInProgress(), hasCorruptHeader);
     }
   }
+  
+  @Override
+  public void doPreUpgrade() throws IOException {
+    LOG.info("Starting upgrade of edits directory " + sd.getRoot());
+    try {
+      NNUpgradeUtil.doPreUpgrade(sd);
+    } catch (IOException ioe) {
+      LOG.error("Failed to move aside pre-upgrade storage " +
+          "in edits directory " + sd.getRoot(), ioe);
+      throw ioe;
+    }
+  }
+  
+  /**
+   * This method assumes that the fields of the {@link Storage} object have
+   * already been updated to the appropriate new values for the upgrade.
+   */
+  @Override
+  public void doUpgrade(Storage storage) throws IOException {
+    NNUpgradeUtil.doUpgrade(sd, storage);
+  }
+  
+  @Override
+  public void doFinalize() throws IOException {
+    NNUpgradeUtil.doFinalize(sd);
+  }
+
+  @Override
+  public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
+      int targetLayoutVersion) throws IOException {
+    return NNUpgradeUtil.canRollBack(sd, storage,
+        prevStorage, targetLayoutVersion);
+  }
+
+  @Override
+  public void doRollback() throws IOException {
+    NNUpgradeUtil.doRollBack(sd);
+  }
+
+  @Override
+  public long getJournalCTime() throws IOException {
+    StorageInfo sInfo = new StorageInfo();
+    sInfo.readProperties(sd);
+    return sInfo.getCTime();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
index 785c1fe..a50b3aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
@@ -22,7 +22,9 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 
 /**
@@ -64,6 +66,54 @@
    * Recover segments which have not been finalized.
    */
   void recoverUnfinalizedSegments() throws IOException;
+  
+  /**
+   * Perform any steps that must succeed across all JournalManagers involved in
+   * an upgrade before proceeding to the actual upgrade stage. If a call to
+   * any JM's doPreUpgrade method fails, then doUpgrade will not be called for
+   * any JM.
+   */
+  void doPreUpgrade() throws IOException;
+  
+  /**
+   * Perform the actual upgrade of the JM. After this is completed, the NN can
+   * begin to use the new upgraded metadata. This metadata may later be either
+   * finalized or rolled back to the previous state.
+   * 
+   * @param storage info about the new upgraded versions.
+   */
+  void doUpgrade(Storage storage) throws IOException;
+  
+  /**
+   * Finalize the upgrade. JMs should purge any state that they had been keeping
+   * around during the upgrade process. After this is completed, rollback is no
+   * longer allowed.
+   */
+  void doFinalize() throws IOException;
+  
+  /**
+   * Return true if this JM can roll back to the previous storage state, false
+   * otherwise. The NN will refuse to run the rollback operation unless at least
+   * one JM or fsimage storage directory can roll back.
+   * 
+   * @param storage the storage info for the current state
+   * @param prevStorage the storage info for the previous (unupgraded) state
+   * @param targetLayoutVersion the layout version we intend to roll back to
+   * @return true if this JM can roll back, false otherwise.
+   */
+  boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
+      int targetLayoutVersion) throws IOException;
+  
+  /**
+   * Perform the rollback to the previous FS state. JMs which do not need to
+   * roll back their state should just return without error.
+   */
+  void doRollback() throws IOException;
+  
+  /**
+   * @return the CTime of the journal manager.
+   */
+  long getJournalCTime() throws IOException;
 
   /**
    * Close the journal manager, freeing any resources it may hold.
@@ -84,4 +134,5 @@
       super(reason);
     }
   }
+
 }
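
The contract above is two-phase: doPreUpgrade must succeed on every participating JournalManager before doUpgrade runs on any of them, and rollback stays available until doFinalize is called. A hedged sketch of a driver that honors that ordering; FSImage and FSEditLog play this role in the patch, and the helper below is illustrative only.

package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hdfs.server.common.Storage;

class JournalUpgradeDriverSketch {
  // Illustrative only: enforce "all doPreUpgrade() calls succeed before any
  // doUpgrade() runs", per the JournalManager contract above.
  static void upgradeAll(List<JournalManager> jms, Storage newStorage)
      throws IOException {
    for (JournalManager jm : jms) {
      jm.doPreUpgrade();           // phase 1: if any fails, nothing is upgraded
    }
    for (JournalManager jm : jms) {
      jm.doUpgrade(newStorage);    // phase 2: commit the new layout everywhere
    }
    // Finalization (jm.doFinalize()) is deferred until the operator is done
    // verifying; until then each JM keeps enough state to doRollback().
  }
}
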
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
index b117606..7bf5015 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
@@ -33,6 +33,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@@ -77,11 +79,14 @@
     private final JournalManager journal;
     private boolean disabled = false;
     private EditLogOutputStream stream;
-    private boolean required = false;
+    private final boolean required;
+    private final boolean shared;
     
-    public JournalAndStream(JournalManager manager, boolean required) {
+    public JournalAndStream(JournalManager manager, boolean required,
+        boolean shared) {
       this.journal = manager;
       this.required = required;
+      this.shared = shared;
     }
 
     public void startLogSegment(long txId) throws IOException {
@@ -163,6 +168,10 @@
     public boolean isRequired() {
       return required;
     }
+    
+    public boolean isShared() {
+      return shared;
+    }
   }
  
   // COW implementation is necessary since some users (eg the web ui) call
@@ -178,7 +187,7 @@
   
   @Override
   public void format(NamespaceInfo nsInfo) throws IOException {
-    // The iteration is done by FSEditLog itself
+    // The operation is done by FSEditLog itself
     throw new UnsupportedOperationException();
   }
 
@@ -537,9 +546,13 @@
     }
     return jList;
   }
-
+  
   void add(JournalManager j, boolean required) {
-    JournalAndStream jas = new JournalAndStream(j, required);
+    add(j, required, false);
+  }
+  
+  void add(JournalManager j, boolean required, boolean shared) {
+    JournalAndStream jas = new JournalAndStream(j, required, shared);
     journals.add(jas);
   }
   
@@ -655,4 +668,40 @@
     }
     return buf.toString();
   }
+
+  @Override
+  public void doPreUpgrade() throws IOException {
+    // This operation is handled by FSEditLog directly.
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void doUpgrade(Storage storage) throws IOException {
+    // This operation is handled by FSEditLog directly.
+    throw new UnsupportedOperationException();
+  }
+  
+  @Override
+  public void doFinalize() throws IOException {
+    // This operation is handled by FSEditLog directly.
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
+      int targetLayoutVersion) throws IOException {
+    // This operation is handled by FSEditLog directly.
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void doRollback() throws IOException {
+    // This operation is handled by FSEditLog directly.
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getJournalCTime() throws IOException {
+    // This operation is handled by FSEditLog directly.
+    throw new UnsupportedOperationException();
+  }
 }
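
The aggregate JournalSet rejects all six upgrade operations on purpose: FSEditLog addresses each JournalManager individually through the helpers added earlier in this patch. A small illustrative sketch of that division of labor; the class and method names are mine.

package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;

class JournalSetUpgradeSketch {
  // Illustrative only: the aggregate JournalSet deliberately rejects the
  // upgrade operations; FSEditLog drives each JournalManager itself via the
  // helpers added earlier in this patch.
  static long sharedLogCTime(FSEditLog editLog, JournalSet journals)
      throws IOException {
    try {
      return journals.getJournalCTime();    // always throws, by design
    } catch (UnsupportedOperationException e) {
      return editLog.getSharedLogCTime();   // the supported entry point
    }
  }
}
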
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
index 21c56c2..ce23c57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
@@ -299,7 +299,7 @@
       if(dirName.getScheme().compareTo("file") == 0) {
         this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
             dirType,
-            !sharedEditsDirs.contains(dirName))); // Don't lock the dir if it's shared.
+            sharedEditsDirs.contains(dirName))); // Don't lock the dir if it's shared.
       }
     }
 
@@ -310,7 +310,7 @@
       // URI is of type file://
       if(dirName.getScheme().compareTo("file") == 0)
         this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
-                    NameNodeDirType.EDITS, !sharedEditsDirs.contains(dirName)));
+                    NameNodeDirType.EDITS, sharedEditsDirs.contains(dirName)));
     }
   }
 
@@ -976,7 +976,7 @@
     StringBuilder layoutVersions = new StringBuilder();
 
     // First determine what range of layout versions we're going to inspect
-    for (Iterator<StorageDirectory> it = dirIterator();
+    for (Iterator<StorageDirectory> it = dirIterator(false);
          it.hasNext();) {
       StorageDirectory sd = it.next();
       if (!sd.getVersionFile().exists()) {
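
With the constructor argument now meaning isShared rather than useLock, the negation disappears here: a directory listed among the shared edits dirs is marked shared, and Storage.lock() skips it. A tiny sketch of the resulting semantics, using hypothetical paths and a class name of my own.

import java.io.File;

import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;

class SharedDirSketch {
  public static void main(String[] args) {
    // Hypothetical paths; only the flag semantics matter here. The third
    // argument now answers "is this dir shared?" instead of "lock this dir?".
    StorageDirectory local =
        new StorageDirectory(new File("/data/nn/edits"), null, false);
    StorageDirectory shared =
        new StorageDirectory(new File("/shared/edits"), null, true);
    System.out.println(local.isShared());   // false: lock() takes in_use.lock
    System.out.println(shared.isShared());  // true:  lock() is a no-op
  }
}
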
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNUpgradeUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNUpgradeUtil.java
new file mode 100644
index 0000000..1c491e5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNUpgradeUtil.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+
+abstract class NNUpgradeUtil {
+  
+  private static final Log LOG = LogFactory.getLog(NNUpgradeUtil.class);
+  
+  /**
+   * Return true if this storage dir can roll back to the previous storage
+   * state, false otherwise. The NN will refuse to run the rollback operation
+   * unless at least one JM or fsimage storage directory can roll back.
+   * 
+   * @param sd the storage directory to check
+   * @param storage the storage info for the current state
+   * @param prevStorage the storage info for the previous (unupgraded) state
+   * @param targetLayoutVersion the layout version we intend to roll back to
+   * @return true if this storage directory can roll back, false otherwise.
+   * @throws IOException in the event of error
+   */
+  static boolean canRollBack(StorageDirectory sd, StorageInfo storage,
+      StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
+    File prevDir = sd.getPreviousDir();
+    if (!prevDir.exists()) {  // use current directory then
+      LOG.info("Storage directory " + sd.getRoot()
+               + " does not contain previous fs state.");
+      // read and verify consistency with other directories
+      storage.readProperties(sd);
+      return false;
+    }
+
+    // read and verify consistency of the prev dir
+    prevStorage.readPreviousVersionProperties(sd);
+
+    if (prevStorage.getLayoutVersion() != targetLayoutVersion) {
+      throw new IOException(
+        "Cannot rollback to storage version " +
+        prevStorage.getLayoutVersion() +
+        " using this version of the NameNode, which uses storage version " +
+        targetLayoutVersion + ". " +
+        "Please use the previous version of HDFS to perform the rollback.");
+    }
+    
+    return true;
+  }
+
+  /**
+   * Finalize the upgrade. The previous dir, if any, will be renamed and
+   * removed. After this is completed, rollback is no longer allowed.
+   * 
+   * @param sd the storage directory to finalize
+   * @throws IOException in the event of error
+   */
+  static void doFinalize(StorageDirectory sd) throws IOException {
+    File prevDir = sd.getPreviousDir();
+    if (!prevDir.exists()) { // already discarded
+      LOG.info("Directory " + prevDir + " does not exist.");
+      LOG.info("Finalize upgrade for " + sd.getRoot()+ " is not required.");
+      return;
+    }
+    LOG.info("Finalizing upgrade of storage directory " + sd.getRoot());
+    assert sd.getCurrentDir().exists() : "Current directory must exist.";
+    final File tmpDir = sd.getFinalizedTmp();
+    // rename previous to tmp and remove
+    NNStorage.rename(prevDir, tmpDir);
+    NNStorage.deleteDir(tmpDir);
+    LOG.info("Finalize upgrade for " + sd.getRoot()+ " is complete.");
+  }
+  
+  /**
+   * Perform any steps that must succeed across all storage dirs/JournalManagers
+   * involved in an upgrade before proceeding to the actual upgrade stage. If
+   * a call to any JM's or local storage dir's doPreUpgrade method fails, then
+   * doUpgrade will not be called for any JM. The existing current dir is
+   * renamed to previous.tmp, and then a new, empty current dir is created.
+   * 
+   * @param sd the storage directory on which to perform the pre-upgrade procedure
+   * @throws IOException in the event of error
+   */
+  static void doPreUpgrade(StorageDirectory sd) throws IOException {
+    LOG.info("Starting upgrade of storage directory " + sd.getRoot());
+    File curDir = sd.getCurrentDir();
+    File prevDir = sd.getPreviousDir();
+    File tmpDir = sd.getPreviousTmp();
+    assert curDir.exists() : "Current directory must exist.";
+    assert !prevDir.exists() : "previous directory must not exist.";
+    assert !tmpDir.exists() : "previous.tmp directory must not exist.";
+
+    // rename current to tmp
+    NNStorage.rename(curDir, tmpDir);
+    
+    if (!curDir.mkdir()) {
+      throw new IOException("Cannot create directory " + curDir);
+    }
+  }
+  
+  /**
+   * Perform the upgrade of the storage dir to the given storage info. The new
+   * storage info is written into the current directory, and the previous.tmp
+   * directory is renamed to previous.
+   * 
+   * @param sd the storage directory to upgrade
+   * @param storage the storage info for the new, upgraded version
+   * @throws IOException in the event of error
+   */
+  static void doUpgrade(StorageDirectory sd, Storage storage) throws
+      IOException {
+    LOG.info("Performing upgrade of storage directory " + sd.getRoot());
+    try {
+      // Write the version file, since saveFsImage only creates the
+      // fsimage_<txid> file, and the directory is otherwise empty.
+      storage.writeProperties(sd);
+      
+      File prevDir = sd.getPreviousDir();
+      File tmpDir = sd.getPreviousTmp();
+      // rename tmp to previous
+      NNStorage.rename(tmpDir, prevDir);
+    } catch (IOException ioe) {
+      LOG.error("Unable to rename temp to previous for " + sd.getRoot(), ioe);
+      throw ioe;
+    }
+  }
+
+  /**
+   * Perform rollback of the storage dir to the previous state. The existing
+   * current dir is removed, and the previous dir is renamed to current.
+   * 
+   * @param sd the storage directory to roll back.
+   * @throws IOException in the event of error
+   */
+  static void doRollBack(StorageDirectory sd)
+      throws IOException {
+    File prevDir = sd.getPreviousDir();
+    if (!prevDir.exists())
+      return;
+
+    File tmpDir = sd.getRemovedTmp();
+    assert !tmpDir.exists() : "removed.tmp directory must not exist.";
+    // rename current to tmp
+    File curDir = sd.getCurrentDir();
+    assert curDir.exists() : "Current directory must exist.";
+    NNStorage.rename(curDir, tmpDir);
+    // rename previous to current
+    NNStorage.rename(prevDir, curDir);
+
+    // delete tmp dir
+    NNStorage.deleteDir(tmpDir);
+    LOG.info("Rollback of " + sd.getRoot() + " is complete.");
+  }
+  
+}
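
The javadocs above describe a small directory state machine: doPreUpgrade renames current to previous.tmp and recreates an empty current; doUpgrade writes the new VERSION and renames previous.tmp to previous; doFinalize discards previous via finalized.tmp; doRollBack swaps previous back into place via removed.tmp. Below is a minimal sketch of those transitions using only java.nio.file on a scratch directory; the class name and the dummy VERSION file are illustrative, while the directory names come from the patch.

// Illustrative only: simulates the rename sequence NNUpgradeUtil drives on a
// single storage directory. Not the real NNStorage helpers.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class UpgradeDirStateSketch {
  public static void main(String[] args) throws IOException {
    Path root = Files.createTempDirectory("nn-storage-sketch");
    Path current = root.resolve("current");
    Files.createDirectories(current);
    Files.createFile(current.resolve("VERSION"));

    // doPreUpgrade: current -> previous.tmp, then recreate an empty current.
    Path previousTmp = root.resolve("previous.tmp");
    Files.move(current, previousTmp);
    Files.createDirectory(current);

    // doUpgrade: write the new VERSION into current, then previous.tmp -> previous.
    Files.createFile(current.resolve("VERSION"));
    Path previous = root.resolve("previous");
    Files.move(previousTmp, previous);

    // doFinalize would instead rename previous -> finalized.tmp and delete it.
    // doRollBack: current -> removed.tmp, previous -> current, delete removed.tmp.
    Path removedTmp = root.resolve("removed.tmp");
    Files.move(current, removedTmp);
    Files.move(previous, current);
    deleteRecursively(removedTmp);

    System.out.println("rolled back; current exists: " + Files.exists(current));
  }

  private static void deleteRecursively(Path p) throws IOException {
    if (Files.isDirectory(p)) {
      try (Stream<Path> children = Files.list(p)) {
        for (Path c : children.collect(Collectors.toList())) {
          deleteRecursively(c);
        }
      }
    }
    Files.delete(p);
  }
}
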
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index eb3755b..1d02e01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -648,7 +648,7 @@
     String nsId = getNameServiceId(conf);
     String namenodeId = HAUtil.getNameNodeId(conf, nsId);
     this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
-    state = createHAState();
+    state = createHAState(getStartupOption(conf));
     this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
     this.haContext = createHAContext();
     try {
@@ -670,8 +670,12 @@
     }
   }
 
-  protected HAState createHAState() {
-    return !haEnabled ? ACTIVE_STATE : STANDBY_STATE;
+  protected HAState createHAState(StartupOption startOpt) {
+    if (!haEnabled || startOpt == StartupOption.UPGRADE) {
+      return ACTIVE_STATE;
+    } else {
+      return STANDBY_STATE;
+    }
   }
 
   protected HAContext createHAContext() {
@@ -1023,26 +1027,28 @@
       }
     }
   }
-
-  private static boolean finalize(Configuration conf,
-                               boolean isConfirmationNeeded
-                               ) throws IOException {
+  
+  @VisibleForTesting
+  public static boolean doRollback(Configuration conf,
+      boolean isConfirmationNeeded) throws IOException {
     String nsId = DFSUtil.getNamenodeNameServiceId(conf);
     String namenodeId = HAUtil.getNameNodeId(conf, nsId);
     initializeGenericKeys(conf, nsId, namenodeId);
 
     FSNamesystem nsys = new FSNamesystem(conf, new FSImage(conf));
     System.err.print(
-        "\"finalize\" will remove the previous state of the files system.\n"
-        + "Recent upgrade will become permanent.\n"
-        + "Rollback option will not be available anymore.\n");
+        "\"rollBack\" will remove the current state of the file system,\n"
+        + "returning you to the state prior to initiating your recent.\n"
+        + "upgrade. This action is permanent and cannot be undone. If you\n"
+        + "are performing a rollback in an HA environment, you should be\n"
+        + "certain that no NameNode process is running on any host.");
     if (isConfirmationNeeded) {
-      if (!confirmPrompt("Finalize filesystem state?")) {
-        System.err.println("Finalize aborted.");
+      if (!confirmPrompt("Roll back file system state?")) {
+        System.err.println("Rollback aborted.");
         return true;
       }
     }
-    nsys.dir.fsImage.finalizeUpgrade();
+    nsys.dir.fsImage.doRollback(nsys);
     return false;
   }
 
@@ -1206,14 +1212,6 @@
       return null;
     }
     setStartupOption(conf, startOpt);
-    
-    if (HAUtil.isHAEnabled(conf, DFSUtil.getNamenodeNameServiceId(conf)) &&
-        (startOpt == StartupOption.UPGRADE ||
-         startOpt == StartupOption.ROLLBACK ||
-         startOpt == StartupOption.FINALIZE)) {
-      throw new HadoopIllegalArgumentException("Invalid startup option. " +
-          "Cannot perform DFS upgrade with HA enabled.");
-    }
 
     switch (startOpt) {
       case FORMAT: {
@@ -1229,10 +1227,17 @@
         return null;
       }
       case FINALIZE: {
-        boolean aborted = finalize(conf, true);
-        terminate(aborted ? 1 : 0);
+        System.err.println("Use of the argument '" + StartupOption.FINALIZE +
+            "' is no longer supported. To finalize an upgrade, start the NN " +
+            " and then run `hdfs dfsadmin -finalizeUpgrade'");
+        terminate(1);
         return null; // avoid javac warning
       }
+      case ROLLBACK: {
+        boolean aborted = doRollback(conf, true);
+        terminate(aborted ? 1 : 0);
+        return null; // avoid warning
+      }
       case BOOTSTRAPSTANDBY: {
         String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
         int rc = BootstrapStandby.run(toolArgs, conf);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
index ac0761d..29b43cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
@@ -39,6 +39,7 @@
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -192,7 +193,7 @@
     FSImage image = new FSImage(conf);
     try {
       image.getStorage().setStorageInfo(storage);
-      image.initEditLog();
+      image.initEditLog(StartupOption.REGULAR);
       assert image.getEditLog().isOpenForRead() :
         "Expected edit log to be open for read";
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index 09a4d70..9b5b2ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -20,6 +20,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.net.URL;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -46,6 +47,7 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@@ -769,7 +771,24 @@
    */
   public int finalizeUpgrade() throws IOException {
     DistributedFileSystem dfs = getDFS();
-    dfs.finalizeUpgrade();
+    
+    Configuration dfsConf = dfs.getConf();
+    URI dfsUri = dfs.getUri();
+    boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri);
+    if (isHaEnabled) {
+      // In the case of HA, run finalizeUpgrade for all NNs in this nameservice
+      String nsId = dfsUri.getHost();
+      List<ClientProtocol> namenodes =
+          HAUtil.getProxiesForAllNameNodesInNameservice(dfsConf, nsId);
+      if (!HAUtil.isAtLeastOneActive(namenodes)) {
+        throw new IOException("Cannot finalize with no NameNode active");
+      }
+      for (ClientProtocol haNn : namenodes) {
+        haNn.finalizeUpgrade();
+      }
+    } else {
+      dfs.finalizeUpgrade();
+    }
     
     return 0;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
index ae96375..cff6439 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
@@ -134,6 +134,72 @@
 }
 
 /**
+ * getJournalCTime()
+ */
+message GetJournalCTimeRequestProto {
+  required JournalIdProto jid = 1;
+}
+
+message GetJournalCTimeResponseProto {
+  required int64 resultCTime = 1;
+}
+
+/**
+ * doPreUpgrade()
+ */
+message DoPreUpgradeRequestProto {
+  required JournalIdProto jid = 1;
+}
+
+message DoPreUpgradeResponseProto {
+}
+
+/**
+ * doUpgrade()
+ */
+message DoUpgradeRequestProto {
+  required JournalIdProto jid = 1;
+  required StorageInfoProto sInfo = 2;
+}
+
+message DoUpgradeResponseProto {
+}
+
+/**
+ * doFinalize()
+ */
+message DoFinalizeRequestProto {
+  required JournalIdProto jid = 1;
+}
+
+message DoFinalizeResponseProto {
+}
+
+/**
+ * canRollBack()
+ */
+message CanRollBackRequestProto {
+  required JournalIdProto jid = 1;
+  required StorageInfoProto storage = 2;
+  required StorageInfoProto prevStorage = 3;
+  required int32 targetLayoutVersion = 4;
+}
+
+message CanRollBackResponseProto {
+  required bool canRollBack = 1;
+}
+
+/**
+ * doRollback()
+ */
+message DoRollbackRequestProto {
+  required JournalIdProto jid = 1;
+}
+
+message DoRollbackResponseProto {
+}
+
+/**
  * getJournalState()
  */
 message GetJournalStateRequestProto {
@@ -236,6 +302,18 @@
 service QJournalProtocolService {
   rpc isFormatted(IsFormattedRequestProto) returns (IsFormattedResponseProto);
 
+  rpc getJournalCTime(GetJournalCTimeRequestProto) returns (GetJournalCTimeResponseProto);
+  
+  rpc doPreUpgrade(DoPreUpgradeRequestProto) returns (DoPreUpgradeResponseProto);
+  
+  rpc doUpgrade(DoUpgradeRequestProto) returns (DoUpgradeResponseProto);
+
+  rpc doFinalize(DoFinalizeRequestProto) returns (DoFinalizeResponseProto);
+
+  rpc canRollBack(CanRollBackRequestProto) returns (CanRollBackResponseProto);
+
+  rpc doRollback(DoRollbackRequestProto) returns (DoRollbackResponseProto);
+
   rpc getJournalState(GetJournalStateRequestProto) returns (GetJournalStateResponseProto);
 
   rpc newEpoch(NewEpochRequestProto) returns (NewEpochResponseProto);
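
The new messages follow the existing QJournalProtocol conventions, so on the Java side they are driven through the generated builders in QJournalProtocolProtos. A hedged sketch of constructing one of the new requests is shown below; it assumes the usual protoc Java codegen for this file and that JournalIdProto (defined elsewhere in this .proto) carries a single string identifier. "ns1" is a placeholder journal id.

// Sketch only: builds a DoPreUpgradeRequestProto with the generated protobuf
// API. Class and setter names assume standard protoc codegen for this file.
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoPreUpgradeRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;

public class DoPreUpgradeRequestSketch {
  public static void main(String[] args) {
    JournalIdProto jid = JournalIdProto.newBuilder()
        .setIdentifier("ns1")            // placeholder journal/nameservice id
        .build();
    DoPreUpgradeRequestProto req = DoPreUpgradeRequestProto.newBuilder()
        .setJid(jid)
        .build();
    System.out.println("serialized size: " + req.getSerializedSize());
  }
}
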
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSHighAvailabilityWithQJM.apt.vm b/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSHighAvailabilityWithQJM.apt.vm
index 2aefc35..eccd705 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSHighAvailabilityWithQJM.apt.vm
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSHighAvailabilityWithQJM.apt.vm
@@ -765,3 +765,49 @@
   Even if automatic failover is configured, you may initiate a manual failover
   using the same <<<hdfs haadmin>>> command. It will perform a coordinated
   failover.
+
+* HDFS Upgrade/Finalization/Rollback with HA Enabled
+
+  When moving between versions of HDFS, sometimes the newer software can simply
+  be installed and the cluster restarted. Sometimes, however, upgrading the
+  version of HDFS you're running may require changing on-disk data. In this case,
+  one must use the HDFS Upgrade/Finalize/Rollback facility after installing the
+  new software. This process is made more complex in an HA environment, since the
+  on-disk metadata that the NN relies upon is by definition distributed, spread
+  across both NNs of the HA pair and, when QJM is used for the shared edits
+  storage, across the JournalNodes as well. This documentation section describes
+  the procedure to use the HDFS Upgrade/Finalize/Rollback facility in an HA setup.
+
+  <<To perform an HA upgrade>>, the operator must do the following:
+
+    [[1]] Shut down all of the NNs as normal, and install the newer software.
+
+    [[2]] Start one of the NNs with the <<<'-upgrade'>>> flag.
+  
+    [[3]] On start, this NN will not enter the standby state as usual in an HA
+    setup. Rather, this NN will immediately enter the active state, perform an
+    upgrade of its local storage dirs, and also perform an upgrade of the shared
+    edit log.
+  
+    [[4]] At this point the other NN in the HA pair will be out of sync with
+    the upgraded NN. In order to bring it back in sync and once again have a highly
+    available setup, you should re-bootstrap this NameNode by running the NN with
+    the <<<'-bootstrapStandby'>>> flag. It is an error to start this second NN with
+    the <<<'-upgrade'>>> flag.
+  
+  Note that if at any time you want to restart the NameNodes before finalizing
+  or rolling back the upgrade, you should start the NNs as normal, i.e. without
+  any special startup flag.
+
+  <<To finalize an HA upgrade>>, the operator will use the <<<`hdfs
+  dfsadmin -finalizeUpgrade'>>> command while the NNs are running and one of them
+  is active. The active NN at the time this happens will perform the finalization
+  of the shared log, and the NN whose local storage directories contain the
+  previous FS state will delete its local state.
+
+  <<To perform a rollback>> of an upgrade, both NNs should first be shut down.
+  The operator should run the rollback command on the NN where they initiated
+  the upgrade procedure, which will perform the rollback on the local dirs there,
+  as well as on the shared edit log, whether it resides on NFS or on the JNs.
+  Afterward, this NN should be started and the operator should run
+  <<<'-bootstrapStandby'>>> on the other NN to bring the two NNs in sync with
+  this rolled-back file system state.
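
For completeness, the finalize step described above has a direct programmatic equivalent in DFSAdmin (the same call the tests later in this patch use). A minimal sketch follows, assuming fs.defaultFS in the loaded configuration points at the HA nameservice's logical URI.

// Sketch: programmatic equivalent of `hdfs dfsadmin -finalizeUpgrade'. With HA
// enabled, the patched DFSAdmin#finalizeUpgrade fans out to every NN in the
// nameservice and fails unless at least one of them is active.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.tools.DFSAdmin;

public class FinalizeHaUpgradeSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration(); // picks up core-site.xml / hdfs-site.xml
    int rc = new DFSAdmin(conf).finalizeUpgrade();
    System.exit(rc);
  }
}
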
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index ea78be1..3e9b614 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -100,7 +100,6 @@
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
@@ -147,6 +146,7 @@
     private boolean enableManagedDfsDirsRedundancy = true;
     private boolean manageDataDfsDirs = true;
     private StartupOption option = null;
+    private StartupOption dnOption = null;
     private String[] racks = null; 
     private String [] hosts = null;
     private long [] simulatedCapacities = null;
@@ -241,6 +241,14 @@
       this.option = val;
       return this;
     }
+    
+    /**
+     * Default: null
+     */
+    public Builder dnStartupOption(StartupOption val) {
+      this.dnOption = val;
+      return this;
+    }
 
     /**
      * Default: null
@@ -357,6 +365,7 @@
                        builder.enableManagedDfsDirsRedundancy,
                        builder.manageDataDfsDirs,
                        builder.option,
+                       builder.dnOption,
                        builder.racks,
                        builder.hosts,
                        builder.simulatedCapacities,
@@ -406,18 +415,24 @@
   /**
    * Stores the information related to a namenode in the cluster
    */
-  static class NameNodeInfo {
+  public static class NameNodeInfo {
     final NameNode nameNode;
     final Configuration conf;
     final String nameserviceId;
     final String nnId;
+    StartupOption startOpt;
     NameNodeInfo(NameNode nn, String nameserviceId, String nnId,
-        Configuration conf) {
+        StartupOption startOpt, Configuration conf) {
       this.nameNode = nn;
       this.nameserviceId = nameserviceId;
       this.nnId = nnId;
+      this.startOpt = startOpt;
       this.conf = conf;
     }
+    
+    public void setStartOpt(StartupOption startOpt) {
+      this.startOpt = startOpt;
+    }
   }
   
   /**
@@ -603,8 +618,8 @@
                         long[] simulatedCapacities) throws IOException {
     this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
     initMiniDFSCluster(conf, numDataNodes, StorageType.DEFAULT, format,
-        manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
-        operation, racks, hosts,
+        manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs, 
+        operation, null, racks, hosts,
         simulatedCapacities, null, true, false,
         MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false);
   }
@@ -613,7 +628,8 @@
       Configuration conf,
       int numDataNodes, StorageType storageType, boolean format, boolean manageNameDfsDirs,
       boolean manageNameDfsSharedDirs, boolean enableManagedDfsDirsRedundancy,
-      boolean manageDataDfsDirs, StartupOption operation, String[] racks,
+      boolean manageDataDfsDirs, StartupOption startOpt,
+      StartupOption dnStartOpt, String[] racks,
       String[] hosts, long[] simulatedCapacities, String clusterId,
       boolean waitSafeMode, boolean setupHostsFile,
       MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown,
@@ -662,7 +678,7 @@
       createNameNodesAndSetConf(
           nnTopology, manageNameDfsDirs, manageNameDfsSharedDirs,
           enableManagedDfsDirsRedundancy,
-          format, operation, clusterId, conf);
+          format, startOpt, clusterId, conf);
     } catch (IOException ioe) {
       LOG.error("IOE creating namenodes. Permissions dump:\n" +
           createPermissionsDiagnosisString(data_dir));
@@ -675,13 +691,15 @@
       }
     }
     
-    if (operation == StartupOption.RECOVER) {
+    if (startOpt == StartupOption.RECOVER) {
       return;
     }
 
     // Start the DataNodes
-    startDataNodes(conf, numDataNodes, storageType, manageDataDfsDirs, operation, racks,
-        hosts, simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, checkDataNodeHostConfig);
+    startDataNodes(conf, numDataNodes, storageType, manageDataDfsDirs,
+        dnStartOpt != null ? dnStartOpt : startOpt,
+        racks, hosts, simulatedCapacities, setupHostsFile,
+        checkDataNodeAddrConfig, checkDataNodeHostConfig);
     waitClusterUp();
     //make sure ProxyUsers uses the latest conf
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
@@ -759,6 +777,8 @@
         if (manageNameDfsSharedDirs) {
           URI sharedEditsUri = getSharedEditsDir(nnCounter, nnCounter+nnIds.size()-1); 
           conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString());
+          // Clean out the shared edits dir completely, including all subdirectories.
+          FileUtil.fullyDelete(new File(sharedEditsUri));
         }
       }
 
@@ -858,7 +878,8 @@
     URI srcDir = Lists.newArrayList(srcDirs).get(0);
     FileSystem dstFS = FileSystem.getLocal(dstConf).getRaw();
     for (URI dstDir : dstDirs) {
-      Preconditions.checkArgument(!dstDir.equals(srcDir));
+      Preconditions.checkArgument(!dstDir.equals(srcDir),
+          "src and dst are the same: " + dstDir);
       File dstDirF = new File(dstDir);
       if (dstDirF.exists()) {
         if (!FileUtil.fullyDelete(dstDirF)) {
@@ -892,6 +913,14 @@
     conf.set(key, "127.0.0.1:" + nnConf.getIpcPort());
   }
   
+  private static String[] createArgs(StartupOption operation) {
+    String[] args = (operation == null ||
+        operation == StartupOption.FORMAT ||
+        operation == StartupOption.REGULAR) ?
+            new String[] {} : new String[] {operation.getName()};
+    return args;
+  }
+  
   private void createNameNode(int nnIndex, Configuration conf,
       int numDataNodes, boolean format, StartupOption operation,
       String clusterId, String nameserviceId,
@@ -906,10 +935,7 @@
     }
     
     // Start the NameNode
-    String[] args = (operation == null ||
-                     operation == StartupOption.FORMAT ||
-                     operation == StartupOption.REGULAR) ?
-      new String[] {} : new String[] {operation.getName()};
+    String[] args = createArgs(operation);
     NameNode nn =  NameNode.createNameNode(args, conf);
     if (operation == StartupOption.RECOVER) {
       return;
@@ -931,7 +957,7 @@
     DFSUtil.setGenericConf(conf, nameserviceId, nnId,
         DFS_NAMENODE_HTTP_ADDRESS_KEY);
     nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId,
-        new Configuration(conf));
+        operation, new Configuration(conf));
   }
 
   /**
@@ -1499,7 +1525,7 @@
       nn.stop();
       nn.join();
       Configuration conf = nameNodes[nnIndex].conf;
-      nameNodes[nnIndex] = new NameNodeInfo(null, null, null, conf);
+      nameNodes[nnIndex] = new NameNodeInfo(null, null, null, null, conf);
     }
   }
   
@@ -1545,10 +1571,12 @@
       throws IOException {
     String nameserviceId = nameNodes[nnIndex].nameserviceId;
     String nnId = nameNodes[nnIndex].nnId;
+    StartupOption startOpt = nameNodes[nnIndex].startOpt;
     Configuration conf = nameNodes[nnIndex].conf;
     shutdownNameNode(nnIndex);
-    NameNode nn = NameNode.createNameNode(new String[] {}, conf);
-    nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId, conf);
+    NameNode nn = NameNode.createNameNode(createArgs(startOpt), conf);
+    nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId, startOpt,
+        conf);
     if (waitActive) {
       waitClusterUp();
       LOG.info("Restarted the namenode");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java
index 7715041..f88edcd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.After;
 import org.junit.Test;
@@ -97,10 +98,10 @@
    * Attempts to start a NameNode with the given operation.  Starting
    * the NameNode should throw an exception.
    */
-  void startNameNodeShouldFail(StartupOption operation, String searchString) {
+  void startNameNodeShouldFail(String searchString) {
     try {
+      NameNode.doRollback(conf, false);
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
-                                                .startupOption(operation)
                                                 .format(false)
                                                 .manageDataDfsDirs(false)
                                                 .manageNameDfsDirs(false)
@@ -149,24 +150,19 @@
       log("Normal NameNode rollback", numDirs);
       UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
       UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
-                                                .format(false)
-                                                .manageDataDfsDirs(false)
-                                                .manageNameDfsDirs(false)
-                                                .startupOption(StartupOption.ROLLBACK)
-                                                .build();
+      NameNode.doRollback(conf, false);
       checkResult(NAME_NODE, nameNodeDirs);
-      cluster.shutdown();
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
       log("Normal DataNode rollback", numDirs);
       UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
       UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
+      NameNode.doRollback(conf, false);
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
                                                 .format(false)
                                                 .manageDataDfsDirs(false)
                                                 .manageNameDfsDirs(false)
-                                                .startupOption(StartupOption.ROLLBACK)
+                                                .dnStartupOption(StartupOption.ROLLBACK)
                                                 .build();
       UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
       UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "previous");
@@ -179,11 +175,12 @@
       log("Normal BlockPool rollback", numDirs);
       UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
       UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
+      NameNode.doRollback(conf, false);
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
                                                 .format(false)
                                                 .manageDataDfsDirs(false)
                                                 .manageNameDfsDirs(false)
-                                                .startupOption(StartupOption.ROLLBACK)
+                                                .dnStartupOption(StartupOption.ROLLBACK)
                                                 .build();
       UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
       UpgradeUtilities.createBlockPoolStorageDirs(dataNodeDirs, "current",
@@ -217,10 +214,10 @@
       cluster.shutdown();
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       UpgradeUtilities.createEmptyDirs(dataNodeDirs);
-
+      
       log("NameNode rollback without existing previous dir", numDirs);
       UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
-      startNameNodeShouldFail(StartupOption.ROLLBACK,
+      startNameNodeShouldFail(
           "None of the storage directories contain previous fs state");
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
@@ -237,15 +234,16 @@
       cluster.shutdown();
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       UpgradeUtilities.createEmptyDirs(dataNodeDirs);
-
+      
       log("DataNode rollback with future stored layout version in previous", numDirs);
       UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
       UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
+      NameNode.doRollback(conf, false);
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
                                                 .format(false)
                                                 .manageDataDfsDirs(false)
                                                 .manageNameDfsDirs(false)
-                                                .startupOption(StartupOption.ROLLBACK)
+                                                .dnStartupOption(StartupOption.ROLLBACK)
                                                 .build();
       UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
       baseDirs = UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "previous");
@@ -266,11 +264,12 @@
       log("DataNode rollback with newer fsscTime in previous", numDirs);
       UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
       UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
+      NameNode.doRollback(conf, false);
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
                                                 .format(false)
                                                 .manageDataDfsDirs(false)
                                                 .manageNameDfsDirs(false)
-                                                .startupOption(StartupOption.ROLLBACK)
+                                                .dnStartupOption(StartupOption.ROLLBACK)
                                                 .build();
       
       UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
@@ -287,21 +286,19 @@
       cluster.shutdown();
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       UpgradeUtilities.createEmptyDirs(dataNodeDirs);
-
+      
       log("NameNode rollback with no edits file", numDirs);
       UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
       baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
       deleteMatchingFiles(baseDirs, "edits.*");
-      startNameNodeShouldFail(StartupOption.ROLLBACK,
-          "Gap in transactions");
+      startNameNodeShouldFail("Gap in transactions");
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
       log("NameNode rollback with no image file", numDirs);
       UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
       baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
       deleteMatchingFiles(baseDirs, "fsimage_.*");
-      startNameNodeShouldFail(StartupOption.ROLLBACK,
-          "No valid image files found");
+      startNameNodeShouldFail("No valid image files found");
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
       log("NameNode rollback with corrupt version file", numDirs);
@@ -313,8 +310,7 @@
             "layoutVersion".getBytes(Charsets.UTF_8),
             "xxxxxxxxxxxxx".getBytes(Charsets.UTF_8));
       }
-      startNameNodeShouldFail(StartupOption.ROLLBACK,
-          "file VERSION has layoutVersion missing");
+      startNameNodeShouldFail("file VERSION has layoutVersion missing");
 
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
       
@@ -328,8 +324,7 @@
       
       UpgradeUtilities.createNameNodeVersionFile(conf, baseDirs,
           storageInfo, UpgradeUtilities.getCurrentBlockPoolID(cluster));
-      startNameNodeShouldFail(StartupOption.ROLLBACK,
-          "Cannot rollback to storage version 1 using this version");
+      startNameNodeShouldFail("Cannot rollback to storage version 1 using this version");
       UpgradeUtilities.createEmptyDirs(nameNodeDirs);
     } // end numDir loop
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
index 3a0134e..cf72e79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
@@ -62,6 +62,7 @@
 import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Shell;
 import org.junit.Assume;
 import org.junit.Before;
@@ -764,4 +765,37 @@
     assertEquals(4*24*60*60*1000l, DFSUtil.parseRelativeTime("4d"));
     assertEquals(999*24*60*60*1000l, DFSUtil.parseRelativeTime("999d"));
   }
+  
+  @Test
+  public void testAssertAllResultsEqual() {
+    checkAllResults(new Long[]{}, true);
+    checkAllResults(new Long[]{1l}, true);
+    checkAllResults(new Long[]{1l, 1l}, true);
+    checkAllResults(new Long[]{1l, 1l, 1l}, true);
+    checkAllResults(new Long[]{new Long(1), new Long(1)}, true);
+    checkAllResults(new Long[]{null, null, null}, true);
+    
+    checkAllResults(new Long[]{1l, 2l}, false);
+    checkAllResults(new Long[]{2l, 1l}, false);
+    checkAllResults(new Long[]{1l, 2l, 1l}, false);
+    checkAllResults(new Long[]{2l, 1l, 1l}, false);
+    checkAllResults(new Long[]{1l, 1l, 2l}, false);
+    checkAllResults(new Long[]{1l, null}, false);
+    checkAllResults(new Long[]{null, 1l}, false);
+    checkAllResults(new Long[]{1l, null, 1l}, false);
+  }
+  
+  private static void checkAllResults(Long[] toCheck, boolean shouldSucceed) {
+    if (shouldSucceed) {
+      DFSUtil.assertAllResultsEqual(Arrays.asList(toCheck));
+    } else {
+      try {
+        DFSUtil.assertAllResultsEqual(Arrays.asList(toCheck));
+        fail("Should not have succeeded with input: " +
+            Arrays.toString(toCheck));
+      } catch (AssertionError ae) {
+        GenericTestUtils.assertExceptionContains("Not all elements match", ae);
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
index 2dbe6e8..91c9ba8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
@@ -19,20 +19,28 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;
 
 public class TestLeaseRecovery {
@@ -148,4 +156,55 @@
       if (cluster != null) {cluster.shutdown();}
     }
   }
+
+  /**
+   * Block recovery when the meta file does not have CRCs for all chunks in the
+   * block file.
+   */
+  @Test
+  public void testBlockRecoveryWithLessMetafile() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+        UserGroupInformation.getCurrentUser().getShortUserName());
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+        .build();
+    Path file = new Path("/testRecoveryFile");
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    FSDataOutputStream out = dfs.create(file);
+    int count = 0;
+    while (count < 2 * 1024 * 1024) {
+      out.writeBytes("Data");
+      count += 4;
+    }
+    out.hsync();
+    // abort the original stream
+    ((DFSOutputStream) out.getWrappedStream()).abort();
+
+    LocatedBlocks locations = cluster.getNameNodeRpc().getBlockLocations(
+        file.toString(), 0, count);
+    ExtendedBlock block = locations.get(0).getBlock();
+    DataNode dn = cluster.getDataNodes().get(0);
+    BlockLocalPathInfo localPathInfo = dn.getBlockLocalPathInfo(block, null);
+    File metafile = new File(localPathInfo.getMetaPath());
+    assertTrue(metafile.exists());
+
+    // reduce the block meta file size
+    RandomAccessFile raf = new RandomAccessFile(metafile, "rw");
+    raf.setLength(metafile.length() - 20);
+    raf.close();
+
+    // restart DN to make replica to RWR
+    DataNodeProperties dnProp = cluster.stopDataNode(0);
+    cluster.restartDataNode(dnProp, true);
+
+    // try to recover the lease
+    DistributedFileSystem newdfs = (DistributedFileSystem) FileSystem
+        .newInstance(cluster.getConfiguration(0));
+    count = 0;
+    while (++count < 10 && !newdfs.recoverLease(file)) {
+      Thread.sleep(1000);
+    }
+    assertTrue("File should be closed", newdfs.recoverLease(file));
+
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
index fab83b4..a4f67e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
@@ -167,8 +167,16 @@
     return new File(baseDir, "journalnode-" + idx).getAbsoluteFile();
   }
   
+  public File getJournalDir(int idx, String jid) {
+    return new File(getStorageDir(idx), jid);
+  }
+  
   public File getCurrentDir(int idx, String jid) {
-    return new File(new File(getStorageDir(idx), jid), "current");
+    return new File(getJournalDir(idx, jid), "current");
+  }
+  
+  public File getPreviousDir(int idx, String jid) {
+    return new File(getJournalDir(idx, jid), "previous");
   }
 
   public JournalNode getJournalNode(int i) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
index e5b636c..e782da2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
 
@@ -47,6 +48,7 @@
 
   public static class Builder {
     private final Configuration conf;
+    private StartupOption startOpt = null;
     private final MiniDFSCluster.Builder dfsBuilder;
     
     public Builder(Configuration conf) {
@@ -61,6 +63,10 @@
     public MiniQJMHACluster build() throws IOException {
       return new MiniQJMHACluster(this);
     }
+
+    public void startupOption(StartupOption startOpt) {
+      this.startOpt = startOpt;
+    }
   }
   
   public static MiniDFSNNTopology createDefaultTopology() {
@@ -95,6 +101,9 @@
     Configuration confNN0 = cluster.getConfiguration(0);
     NameNode.initializeSharedEdits(confNN0, true);
     
+    cluster.getNameNodeInfos()[0].setStartOpt(builder.startOpt);
+    cluster.getNameNodeInfos()[1].setStartOpt(builder.startOpt);
+    
     // restart the cluster
     cluster.restartNameNodes();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java
index 43bd7a4..28d1dac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java
@@ -27,6 +27,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.junit.Test;
 
@@ -191,6 +193,29 @@
       shouldPromptCalled = true;
       return false;
     }
+
+    @Override
+    public void doPreUpgrade() throws IOException {}
+
+    @Override
+    public void doUpgrade(Storage storage) throws IOException {}
+
+    @Override
+    public void doFinalize() throws IOException {}
+
+    @Override
+    public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage, int targetLayoutVersion)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public void doRollback() throws IOException {}
+
+    @Override
+    public long getJournalCTime() throws IOException {
+      return -1;
+    }
   }
 
   public static class BadConstructorJournalManager extends DummyJournalManager {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java
index 502c9de..7abc502 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java
@@ -91,7 +91,7 @@
       fail("Did not throw");
     } catch (IOException ioe) {
       GenericTestUtils.assertExceptionContains(
-          "Cannot start an HA namenode with name dirs that need recovery",
+          "storage directory does not exist or is not accessible",
           ioe);
     }
     
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSUpgradeWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSUpgradeWithHA.java
index 4f213b2..c3a8674 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSUpgradeWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSUpgradeWithHA.java
@@ -1,89 +1,506 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster.Builder;
+import org.apache.hadoop.hdfs.qjournal.server.Journal;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.hdfs.util.PersistentLongFile;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
+import com.google.common.base.Joiner;
 
 /**
  * Tests for upgrading with HA enabled.
  */
 public class TestDFSUpgradeWithHA {
-  
-  private static final Log LOG = LogFactory.getLog(TestDFSUpgradeWithHA.class);
 
+  private static final Log LOG = LogFactory.getLog(TestDFSUpgradeWithHA.class);
+  
+  private Configuration conf;
+  
+  @Before
+  public void createConfiguration() {
+    conf = new HdfsConfiguration();
+    // Turn off persistent IPC, so that the DFSClient can survive NN restart
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+        0);
+  }
+
+  private static void assertCTimesEqual(MiniDFSCluster cluster) {
+    long nn1CTime = cluster.getNamesystem(0).getFSImage().getStorage().getCTime();
+    long nn2CTime = cluster.getNamesystem(1).getFSImage().getStorage().getCTime();
+    assertEquals(nn1CTime, nn2CTime);
+  }
+
+  private static void checkClusterPreviousDirExistence(MiniDFSCluster cluster,
+      boolean shouldExist) {
+    for (int i = 0; i < 2; i++) {
+      checkNnPreviousDirExistence(cluster, i, shouldExist);
+    }
+  }
+
+  private static void checkNnPreviousDirExistence(MiniDFSCluster cluster,
+      int index, boolean shouldExist) {
+    Collection<URI> nameDirs = cluster.getNameDirs(index);
+    for (URI nnDir : nameDirs) {
+      checkPreviousDirExistence(new File(nnDir), shouldExist);
+    }
+  }
+
+  private static void checkJnPreviousDirExistence(MiniQJMHACluster jnCluster,
+      boolean shouldExist) throws IOException {
+    for (int i = 0; i < 3; i++) {
+      checkPreviousDirExistence(
+          jnCluster.getJournalCluster().getJournalDir(i, "ns1"), shouldExist);
+    }
+    if (shouldExist) {
+      assertEpochFilesCopied(jnCluster);
+    }
+  }
+
+  private static void assertEpochFilesCopied(MiniQJMHACluster jnCluster)
+      throws IOException {
+    for (int i = 0; i < 3; i++) {
+      File journalDir = jnCluster.getJournalCluster().getJournalDir(i, "ns1");
+      File currDir = new File(journalDir, "current");
+      File prevDir = new File(journalDir, "previous");
+      for (String fileName : new String[]{ Journal.LAST_PROMISED_FILENAME,
+          Journal.LAST_WRITER_EPOCH }) {
+        File prevFile = new File(prevDir, fileName);
+        // It's possible the prev file doesn't exist, e.g. if there has never
+        // been a writer before the upgrade.
+        if (prevFile.exists()) {
+          PersistentLongFile prevLongFile = new PersistentLongFile(prevFile, -10);
+          PersistentLongFile currLongFile = new PersistentLongFile(new File(currDir,
+              fileName), -11);
+          assertTrue("Value in " + fileName + " has decreased on upgrade in "
+              + journalDir, prevLongFile.get() <= currLongFile.get());
+        }
+      }
+    }
+  }
+
+  private static void checkPreviousDirExistence(File rootDir,
+      boolean shouldExist) {
+    File previousDir = new File(rootDir, "previous");
+    if (shouldExist) {
+      assertTrue(previousDir + " does not exist", previousDir.exists());
+    } else {
+      assertFalse(previousDir + " does exist", previousDir.exists());
+    }
+  }
+  
+  private void runFinalizeCommand(MiniDFSCluster cluster)
+      throws IOException {
+    HATestUtil.setFailoverConfigurations(cluster, conf);
+    new DFSAdmin(conf).finalizeUpgrade();
+  }
+  
   /**
-   * Make sure that an HA NN refuses to start if given an upgrade-related
-   * startup option.
+   * Ensure that an admin cannot finalize an HA upgrade without at least one NN
+   * being active.
    */
   @Test
-  public void testStartingWithUpgradeOptionsFails() throws IOException {
-    for (StartupOption startOpt : Lists.newArrayList(new StartupOption[] {
-        StartupOption.UPGRADE, StartupOption.FINALIZE,
-        StartupOption.ROLLBACK })) {
-      MiniDFSCluster cluster = null;
+  public void testCannotFinalizeIfNoActive() throws IOException,
+      URISyntaxException {
+    MiniDFSCluster cluster = null;
+    FileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+          .nnTopology(MiniDFSNNTopology.simpleHATopology())
+          .numDataNodes(0)
+          .build();
+
+      File sharedDir = new File(cluster.getSharedEditsDir(0, 1));
+      
+      // No upgrade is in progress at the moment.
+      checkClusterPreviousDirExistence(cluster, false);
+      assertCTimesEqual(cluster);
+      checkPreviousDirExistence(sharedDir, false);
+      
+      // Transition NN0 to active and do some FS ops.
+      cluster.transitionToActive(0);
+      fs = HATestUtil.configureFailoverFs(cluster, conf);
+      assertTrue(fs.mkdirs(new Path("/foo1")));
+      
+      // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
+      // flag.
+      cluster.shutdownNameNode(1);
+      cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
+      cluster.restartNameNode(0, false);
+      
+      checkNnPreviousDirExistence(cluster, 0, true);
+      checkNnPreviousDirExistence(cluster, 1, false);
+      checkPreviousDirExistence(sharedDir, true);
+      
+      // NN0 should come up in the active state when given the -upgrade option,
+      // so no need to transition it to active.
+      assertTrue(fs.mkdirs(new Path("/foo2")));
+      
+      // Restart NN0 without the -upgrade flag, to make sure that works.
+      cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.REGULAR);
+      cluster.restartNameNode(0, false);
+      
+      // Make sure we can still do FS ops after upgrading.
+      cluster.transitionToActive(0);
+      assertTrue(fs.mkdirs(new Path("/foo3")));
+      
+      // Now bootstrap the standby with the upgraded info.
+      int rc = BootstrapStandby.run(
+          new String[]{"-force"},
+          cluster.getConfiguration(1));
+      assertEquals(0, rc);
+      
+      // Now restart NN1 and make sure that we can do ops against that as well.
+      cluster.restartNameNode(1);
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+      assertTrue(fs.mkdirs(new Path("/foo4")));
+      
+      assertCTimesEqual(cluster);
+      
+      // Now there's no active NN.
+      cluster.transitionToStandby(1);
+
       try {
-        cluster = new MiniDFSCluster.Builder(new Configuration())
-            .nnTopology(MiniDFSNNTopology.simpleHATopology())
-            .startupOption(startOpt)
-            .numDataNodes(0)
-            .build();
-        fail("Should not have been able to start an HA NN in upgrade mode");
-      } catch (IllegalArgumentException iae) {
+        runFinalizeCommand(cluster);
+        fail("Should not have been able to finalize upgrade with no NN active");
+      } catch (IOException ioe) {
         GenericTestUtils.assertExceptionContains(
-            "Cannot perform DFS upgrade with HA enabled.", iae);
-        LOG.info("Got expected exception", iae);
-      } finally {
-        if (cluster != null) {
-          cluster.shutdown();
-        }
+            "Cannot finalize with no NameNode active", ioe);
+      }
+    } finally {
+      if (fs != null) {
+        fs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Make sure that an HA NN pair using an NFS-based shared edits dir can
+   * successfully start and upgrade.
+   */
+  @Test
+  public void testNfsUpgrade() throws IOException, URISyntaxException {
+    MiniDFSCluster cluster = null;
+    FileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+          .nnTopology(MiniDFSNNTopology.simpleHATopology())
+          .numDataNodes(0)
+          .build();
+      
+      File sharedDir = new File(cluster.getSharedEditsDir(0, 1));
+      
+      // No upgrade is in progress at the moment.
+      checkClusterPreviousDirExistence(cluster, false);
+      assertCTimesEqual(cluster);
+      checkPreviousDirExistence(sharedDir, false);
+      
+      // Transition NN0 to active and do some FS ops.
+      cluster.transitionToActive(0);
+      fs = HATestUtil.configureFailoverFs(cluster, conf);
+      assertTrue(fs.mkdirs(new Path("/foo1")));
+      
+      // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
+      // flag.
+      cluster.shutdownNameNode(1);
+      cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
+      cluster.restartNameNode(0, false);
+      
+      checkNnPreviousDirExistence(cluster, 0, true);
+      checkNnPreviousDirExistence(cluster, 1, false);
+      checkPreviousDirExistence(sharedDir, true);
+      
+      // NN0 should come up in the active state when given the -upgrade option,
+      // so no need to transition it to active.
+      assertTrue(fs.mkdirs(new Path("/foo2")));
+      
+      // Restart NN0 without the -upgrade flag, to make sure that works.
+      cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.REGULAR);
+      cluster.restartNameNode(0, false);
+      
+      // Make sure we can still do FS ops after upgrading.
+      cluster.transitionToActive(0);
+      assertTrue(fs.mkdirs(new Path("/foo3")));
+      
+      // Now bootstrap the standby with the upgraded info.
+      int rc = BootstrapStandby.run(
+          new String[]{"-force"},
+          cluster.getConfiguration(1));
+      assertEquals(0, rc);
+      
+      // Now restart NN1 and make sure that we can do ops against that as well.
+      cluster.restartNameNode(1);
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+      assertTrue(fs.mkdirs(new Path("/foo4")));
+      
+      assertCTimesEqual(cluster);
+    } finally {
+      if (fs != null) {
+        fs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Make sure that an HA NN can successfully upgrade when configured using
+   * JournalNodes.
+   */
+  @Test
+  public void testUpgradeWithJournalNodes() throws IOException,
+      URISyntaxException {
+    MiniQJMHACluster qjCluster = null;
+    FileSystem fs = null;
+    try {
+      Builder builder = new MiniQJMHACluster.Builder(conf);
+      builder.getDfsBuilder()
+          .numDataNodes(0);
+      qjCluster = builder.build();
+
+      MiniDFSCluster cluster = qjCluster.getDfsCluster();
+      
+      // No upgrade is in progress at the moment.
+      checkJnPreviousDirExistence(qjCluster, false);
+      checkClusterPreviousDirExistence(cluster, false);
+      assertCTimesEqual(cluster);
+      
+      // Transition NN0 to active and do some FS ops.
+      cluster.transitionToActive(0);
+      fs = HATestUtil.configureFailoverFs(cluster, conf);
+      assertTrue(fs.mkdirs(new Path("/foo1")));
+      
+      // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
+      // flag.
+      cluster.shutdownNameNode(1);
+      cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
+      cluster.restartNameNode(0, false);
+      
+      checkNnPreviousDirExistence(cluster, 0, true);
+      checkNnPreviousDirExistence(cluster, 1, false);
+      checkJnPreviousDirExistence(qjCluster, true);
+      
+      // NN0 should come up in the active state when given the -upgrade option,
+      // so no need to transition it to active.
+      assertTrue(fs.mkdirs(new Path("/foo2")));
+      
+      // Restart NN0 without the -upgrade flag, to make sure that works.
+      cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.REGULAR);
+      cluster.restartNameNode(0, false);
+      
+      // Make sure we can still do FS ops after upgrading.
+      cluster.transitionToActive(0);
+      assertTrue(fs.mkdirs(new Path("/foo3")));
+      
+      // Now bootstrap the standby with the upgraded info.
+      int rc = BootstrapStandby.run(
+          new String[]{"-force"},
+          cluster.getConfiguration(1));
+      assertEquals(0, rc);
+      
+      // Now restart NN1 and make sure that we can do ops against that as well.
+      cluster.restartNameNode(1);
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+      assertTrue(fs.mkdirs(new Path("/foo4")));
+      
+      assertCTimesEqual(cluster);
+    } finally {
+      if (fs != null) {
+        fs.close();
+      }
+      if (qjCluster != null) {
+        qjCluster.shutdown();
+      }
+    }
+  }
+
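+  /**
+   * Make sure that an HA upgrade which uses JournalNodes can be finalized
+   * once both NNs have come up on the upgraded storage.
+   */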
+  @Test
+  public void testFinalizeWithJournalNodes() throws IOException,
+      URISyntaxException {
+    MiniQJMHACluster qjCluster = null;
+    FileSystem fs = null;
+    try {
+      Builder builder = new MiniQJMHACluster.Builder(conf);
+      builder.getDfsBuilder()
+          .numDataNodes(0);
+      qjCluster = builder.build();
+
+      MiniDFSCluster cluster = qjCluster.getDfsCluster();
+      
+      // No upgrade is in progress at the moment.
+      checkJnPreviousDirExistence(qjCluster, false);
+      checkClusterPreviousDirExistence(cluster, false);
+      assertCTimesEqual(cluster);
+      
+      // Transition NN0 to active and do some FS ops.
+      cluster.transitionToActive(0);
+      fs = HATestUtil.configureFailoverFs(cluster, conf);
+      assertTrue(fs.mkdirs(new Path("/foo1")));
+      
+      // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
+      // flag.
+      cluster.shutdownNameNode(1);
+      cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
+      cluster.restartNameNode(0, false);
+      
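+      // NN0 should come up in the active state when given the -upgrade option,
+      // so no need to transition it to active.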
+      assertTrue(fs.mkdirs(new Path("/foo2")));
+      
+      checkNnPreviousDirExistence(cluster, 0, true);
+      checkNnPreviousDirExistence(cluster, 1, false);
+      checkJnPreviousDirExistence(qjCluster, true);
+      
+      // Now bootstrap the standby with the upgraded info.
+      int rc = BootstrapStandby.run(
+          new String[]{"-force"},
+          cluster.getConfiguration(1));
+      assertEquals(0, rc);
+      
+      cluster.restartNameNode(1);
+      
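+      // Finalize the upgrade while NN0, which initiated it, is still active.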
+      runFinalizeCommand(cluster);
+      
+      checkClusterPreviousDirExistence(cluster, false);
+      checkJnPreviousDirExistence(qjCluster, false);
+      assertCTimesEqual(cluster);
+    } finally {
+      if (fs != null) {
+        fs.close();
+      }
+      if (qjCluster != null) {
+        qjCluster.shutdown();
       }
     }
   }
   
   /**
-   * Make sure that an HA NN won't start if a previous upgrade was in progress.
+   * Make sure that even if the NN which initiated the upgrade is in the
+   * standby state, we're still allowed to finalize.
    */
   @Test
-  public void testStartingWithUpgradeInProgressFails() throws Exception {
+  public void testFinalizeFromSecondNameNodeWithJournalNodes()
+      throws IOException, URISyntaxException {
+    MiniQJMHACluster qjCluster = null;
+    FileSystem fs = null;
+    try {
+      Builder builder = new MiniQJMHACluster.Builder(conf);
+      builder.getDfsBuilder()
+          .numDataNodes(0);
+      qjCluster = builder.build();
+
+      MiniDFSCluster cluster = qjCluster.getDfsCluster();
+      
+      // No upgrade is in progress at the moment.
+      checkJnPreviousDirExistence(qjCluster, false);
+      checkClusterPreviousDirExistence(cluster, false);
+      assertCTimesEqual(cluster);
+      
+      // Transition NN0 to active and do some FS ops.
+      cluster.transitionToActive(0);
+      fs = HATestUtil.configureFailoverFs(cluster, conf);
+      assertTrue(fs.mkdirs(new Path("/foo1")));
+      
+      // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
+      // flag.
+      cluster.shutdownNameNode(1);
+      cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
+      cluster.restartNameNode(0, false);
+      
+      checkNnPreviousDirExistence(cluster, 0, true);
+      checkNnPreviousDirExistence(cluster, 1, false);
+      checkJnPreviousDirExistence(qjCluster, true);
+      
+      // Now bootstrap the standby with the upgraded info.
+      int rc = BootstrapStandby.run(
+          new String[]{"-force"},
+          cluster.getConfiguration(1));
+      assertEquals(0, rc);
+      
+      cluster.restartNameNode(1);
+      
+      // Make the second NN (not the one that initiated the upgrade) active when
+      // the finalize command is run.
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+      
+      runFinalizeCommand(cluster);
+      
+      checkClusterPreviousDirExistence(cluster, false);
+      checkJnPreviousDirExistence(qjCluster, false);
+      assertCTimesEqual(cluster);
+    } finally {
+      if (fs != null) {
+        fs.close();
+      }
+      if (qjCluster != null) {
+        qjCluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Make sure that an HA NN will start if a previous upgrade was in progress.
+   */
+  @Test
+  public void testStartingWithUpgradeInProgressSucceeds() throws Exception {
     MiniDFSCluster cluster = null;
     try {
-      cluster = new MiniDFSCluster.Builder(new Configuration())
+      cluster = new MiniDFSCluster.Builder(conf)
           .nnTopology(MiniDFSNNTopology.simpleHATopology())
           .numDataNodes(0)
           .build();
-      
+
       // Simulate an upgrade having started.
       for (int i = 0; i < 2; i++) {
         for (URI uri : cluster.getNameDirs(i)) {
@@ -92,18 +509,226 @@
           assertTrue(prevTmp.mkdirs());
         }
       }
-      
+
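+      // The NNs should now start successfully even though the prevTmp dirs
+      // exist, since upgrade is supported with HA enabled.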
       cluster.restartNameNodes();
-      fail("Should not have been able to start an HA NN with an in-progress upgrade");
-    } catch (IOException ioe) {
-      GenericTestUtils.assertExceptionContains(
-          "Cannot start an HA namenode with name dirs that need recovery.",
-          ioe);
-      LOG.info("Got expected exception", ioe);
     } finally {
       if (cluster != null) {
         cluster.shutdown();
       }
     }
   }
+
+  /**
+   * Test rollback with NFS shared dir.
+   */
+  @Test
+  public void testRollbackWithNfs() throws Exception {
+    MiniDFSCluster cluster = null;
+    FileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+          .nnTopology(MiniDFSNNTopology.simpleHATopology())
+          .numDataNodes(0)
+          .build();
+
+      File sharedDir = new File(cluster.getSharedEditsDir(0, 1));
+      
+      // No upgrade is in progress at the moment.
+      checkClusterPreviousDirExistence(cluster, false);
+      assertCTimesEqual(cluster);
+      checkPreviousDirExistence(sharedDir, false);
+      
+      // Transition NN0 to active and do some FS ops.
+      cluster.transitionToActive(0);
+      fs = HATestUtil.configureFailoverFs(cluster, conf);
+      assertTrue(fs.mkdirs(new Path("/foo1")));
+      
+      // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
+      // flag.
+      cluster.shutdownNameNode(1);
+      cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
+      cluster.restartNameNode(0, false);
+      
+      checkNnPreviousDirExistence(cluster, 0, true);
+      checkNnPreviousDirExistence(cluster, 1, false);
+      checkPreviousDirExistence(sharedDir, true);
+      
+      // NN0 should come up in the active state when given the -upgrade option,
+      // so no need to transition it to active.
+      assertTrue(fs.mkdirs(new Path("/foo2")));
+      
+      // Now bootstrap the standby with the upgraded info.
+      int rc = BootstrapStandby.run(
+          new String[]{"-force"},
+          cluster.getConfiguration(1));
+      assertEquals(0, rc);
+      
+      cluster.restartNameNode(1);
+      
+      checkNnPreviousDirExistence(cluster, 0, true);
+      checkNnPreviousDirExistence(cluster, 1, false);
+      checkPreviousDirExistence(sharedDir, true);
+      assertCTimesEqual(cluster);
+      
+      // Now shut down the cluster and do the rollback.
+      Collection<URI> nn1NameDirs = cluster.getNameDirs(0);
+      cluster.shutdown();
+
+      conf.setStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+          Joiner.on(",").join(nn1NameDirs));
+      NameNode.doRollback(conf, false);
+
+      // The rollback operation should have rolled back the first NN's local
+      // dirs, and the shared dir, but not the other NN's dirs. Those have to be
+      // done by bootstrapping the standby.
+      checkNnPreviousDirExistence(cluster, 0, false);
+      checkPreviousDirExistence(sharedDir, false);
+    } finally {
+      if (fs != null) {
+        fs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
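+  /**
+   * Test rollback when the shared edits dir is stored on JournalNodes.
+   */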
+  @Test
+  public void testRollbackWithJournalNodes() throws IOException,
+      URISyntaxException {
+    MiniQJMHACluster qjCluster = null;
+    FileSystem fs = null;
+    try {
+      Builder builder = new MiniQJMHACluster.Builder(conf);
+      builder.getDfsBuilder()
+          .numDataNodes(0);
+      qjCluster = builder.build();
+
+      MiniDFSCluster cluster = qjCluster.getDfsCluster();
+      
+      // No upgrade is in progress at the moment.
+      checkClusterPreviousDirExistence(cluster, false);
+      assertCTimesEqual(cluster);
+      checkJnPreviousDirExistence(qjCluster, false);
+      
+      // Transition NN0 to active and do some FS ops.
+      cluster.transitionToActive(0);
+      fs = HATestUtil.configureFailoverFs(cluster, conf);
+      assertTrue(fs.mkdirs(new Path("/foo1")));
+      
+      // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
+      // flag.
+      cluster.shutdownNameNode(1);
+      cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
+      cluster.restartNameNode(0, false);
+      
+      checkNnPreviousDirExistence(cluster, 0, true);
+      checkNnPreviousDirExistence(cluster, 1, false);
+      checkJnPreviousDirExistence(qjCluster, true);
+      
+      // NN0 should come up in the active state when given the -upgrade option,
+      // so no need to transition it to active.
+      assertTrue(fs.mkdirs(new Path("/foo2")));
+      
+      // Now bootstrap the standby with the upgraded info.
+      int rc = BootstrapStandby.run(
+          new String[]{"-force"},
+          cluster.getConfiguration(1));
+      assertEquals(0, rc);
+      
+      cluster.restartNameNode(1);
+      
+      checkNnPreviousDirExistence(cluster, 0, true);
+      checkNnPreviousDirExistence(cluster, 1, false);
+      checkJnPreviousDirExistence(qjCluster, true);
+      assertCTimesEqual(cluster);
+      
+      // Shut down the NNs, but deliberately leave the JNs up and running.
+      Collection<URI> nn1NameDirs = cluster.getNameDirs(0);
+      cluster.shutdown();
+
+      conf.setStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+          Joiner.on(",").join(nn1NameDirs));
+      NameNode.doRollback(conf, false);
+
+      // The rollback operation should have rolled back the first NN's local
+      // dirs and the JNs' storage dirs, but not the other NN's dirs. Those
+      // have to be done by bootstrapping the standby.
+      checkNnPreviousDirExistence(cluster, 0, false);
+      checkJnPreviousDirExistence(qjCluster, false);
+    } finally {
+      if (fs != null) {
+        fs.close();
+      }
+      if (qjCluster != null) {
+        qjCluster.shutdown();
+      }
+    }
+  }
+  
+  /**
+   * Make sure that starting a second NN with the -upgrade flag fails if the
+   * other NN has already begun the upgrade.
+   */
+  @Test
+  public void testCannotUpgradeSecondNameNode() throws IOException,
+      URISyntaxException {
+    MiniDFSCluster cluster = null;
+    FileSystem fs = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+          .nnTopology(MiniDFSNNTopology.simpleHATopology())
+          .numDataNodes(0)
+          .build();
+  
+      File sharedDir = new File(cluster.getSharedEditsDir(0, 1));
+      
+      // No upgrade is in progress at the moment.
+      checkClusterPreviousDirExistence(cluster, false);
+      assertCTimesEqual(cluster);
+      checkPreviousDirExistence(sharedDir, false);
+      
+      // Transition NN0 to active and do some FS ops.
+      cluster.transitionToActive(0);
+      fs = HATestUtil.configureFailoverFs(cluster, conf);
+      assertTrue(fs.mkdirs(new Path("/foo1")));
+      
+      // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
+      // flag.
+      cluster.shutdownNameNode(1);
+      cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
+      cluster.restartNameNode(0, false);
+      
+      checkNnPreviousDirExistence(cluster, 0, true);
+      checkNnPreviousDirExistence(cluster, 1, false);
+      checkPreviousDirExistence(sharedDir, true);
+      
+      // NN0 should come up in the active state when given the -upgrade option,
+      // so no need to transition it to active.
+      assertTrue(fs.mkdirs(new Path("/foo2")));
+      
+      // Restart NN0 without the -upgrade flag, to make sure that works.
+      cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.REGULAR);
+      cluster.restartNameNode(0, false);
+      
+      // Make sure we can still do FS ops after upgrading.
+      cluster.transitionToActive(0);
+      assertTrue(fs.mkdirs(new Path("/foo3")));
+      
+      // Make sure that starting the second NN with the -upgrade flag fails.
+      cluster.getNameNodeInfos()[1].setStartOpt(StartupOption.UPGRADE);
+      try {
+        cluster.restartNameNode(1, false);
+        fail("Should not have been able to start second NN with -upgrade");
+      } catch (IOException ioe) {
+        GenericTestUtils.assertExceptionContains(
+            "It looks like the shared log is already being upgraded", ioe);
+      }
+    } finally {
+      if (fs != null) {
+        fs.close();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java
index b534c03..272e543 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java
@@ -96,7 +96,7 @@
     } catch (IOException ioe) {
       LOG.info("Got expected exception", ioe);
       GenericTestUtils.assertExceptionContains(
-          "Cannot start an HA namenode with name dirs that need recovery", ioe);
+          "storage directory does not exist or is not accessible", ioe);
     }
     try {
       cluster.restartNameNode(1, false);
@@ -104,7 +104,7 @@
     } catch (IOException ioe) {
       LOG.info("Got expected exception", ioe);
       GenericTestUtils.assertExceptionContains(
-          "Cannot start an HA namenode with name dirs that need recovery", ioe);
+          "storage directory does not exist or is not accessible", ioe);
     }
   }
   
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotFileLength.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotFileLength.java
index cd316ab..817a2de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotFileLength.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotFileLength.java
@@ -17,22 +17,26 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
-import java.util.Random;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
 
 import org.apache.hadoop.fs.FileStatus;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.ToolRunner;
 
 public class TestSnapshotFileLength {
 
@@ -112,4 +116,61 @@
     assertThat(bytesRead, is(BLOCKSIZE));
     fis.close();
   }
+
+  /**
+   * Added as part of HDFS-5343.
+   * Test that the cat command on a snapshot path cannot read a file beyond
+   * the snapshot file length.
+   * @throws Exception
+   */
+  @Test (timeout = 600000)
+  public void testSnapshotFileLengthWithCatCommand() throws Exception {
+
+    FSDataInputStream fis = null;
+    FileStatus fileStatus = null;
+
+    int bytesRead;
+    byte[] buffer = new byte[BLOCKSIZE * 8];
+
+    hdfs.mkdirs(sub);
+    Path file1 = new Path(sub, file1Name);
+    DFSTestUtil.createFile(hdfs, file1, BLOCKSIZE, REPLICATION, SEED);
+
+    hdfs.allowSnapshot(sub);
+    hdfs.createSnapshot(sub, snapshot1);
+
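+    // Append another block so the live file is one block longer than the
+    // snapshot copy.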
+    DFSTestUtil.appendFile(hdfs, file1, BLOCKSIZE);
+
+    // Make sure we can read the entire file via its non-snapshot path.
+    fileStatus = hdfs.getFileStatus(file1);
+    assertEquals(fileStatus.getLen(), BLOCKSIZE * 2);
+    fis = hdfs.open(file1);
+    bytesRead = fis.read(buffer, 0, buffer.length);
+    assertEquals(bytesRead, BLOCKSIZE * 2);
+    fis.close();
+
+    Path file1snap1 =
+        SnapshotTestHelper.getSnapshotPath(sub, snapshot1, file1Name);
+    fis = hdfs.open(file1snap1);
+    fileStatus = hdfs.getFileStatus(file1snap1);
+    assertEquals(fileStatus.getLen(), BLOCKSIZE);
+    // Make sure we can only read up to the snapshot length.
+    bytesRead = fis.read(buffer, 0, buffer.length);
+    assertEquals(bytesRead, BLOCKSIZE);
+    fis.close();
+
+    // Save the original streams, then redirect stdout and stderr to a buffer
+    // so we can verify how many bytes the cat command actually printed.
+    PrintStream psBackup = System.out;
+    PrintStream psErrBackup = System.err;
+    ByteArrayOutputStream bao = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(bao));
+    System.setErr(new PrintStream(bao));
+    // Make sure we can only cat the file up to the snapshot length.
+    FsShell shell = new FsShell();
+    try {
+      ToolRunner.run(conf, shell, new String[] { "-cat",
+          "/TestSnapshotFileLength/sub1/.snapshot/snapshot1/file1" });
+      assertEquals(bao.size(), BLOCKSIZE);
+    } finally {
+      System.setOut(psBackup);
+      System.setErr(psErrBackup);
+    }
+  }
 }