HDFS-6041. Downgrade/Finalize should rename the rollback image instead of purging it. Contributed by Jing Zhao.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1573851 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt
index 1bc6d03..7d7d87c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt
@@ -137,3 +137,5 @@
     HDFS-6042. Fix rolling upgrade documentation and error messages. (szetszwo
     via Arpit Agarwal)
 
+    HDFS-6041. Downgrade/Finalize should rename the rollback image instead of
+    purging it. (jing9)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index be8960e..205c1a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -736,7 +736,8 @@
     case OP_ROLLING_UPGRADE_FINALIZE: {
       final long finalizeTime = ((RollingUpgradeOp) op).getTime();
       fsNamesys.finalizeRollingUpgradeInternal(finalizeTime);
-      fsNamesys.getFSImage().purgeCheckpoints(NameNodeFile.IMAGE_ROLLBACK);
+      fsNamesys.getFSImage().renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK,
+          NameNodeFile.IMAGE);
       break;
     }
     case OP_ADD_CACHE_DIRECTIVE: {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index 85cc264..a70e303 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -659,8 +659,8 @@
       needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
           txnsAdvanced);
       if (RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) {
-        // purge rollback image if it is downgrade
-        archivalManager.purgeCheckpoints(NameNodeFile.IMAGE_ROLLBACK);
+        // rename rollback image if it is downgrade
+        renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE);
       }
     } else {
       // Trigger the rollback for rolling upgrade. Here lastAppliedTxId equals
@@ -1097,18 +1097,7 @@
   }
 
   /**
-   * Purge all the checkpoints with the name style.
-   */
-  void purgeCheckpoints(NameNodeFile nnf) {
-    try {
-      archivalManager.purgeCheckpoints(nnf);
-    } catch (Exception e) {
-      LOG.warn("Unable to purge checkpoints with name " + nnf.getName(), e);
-    }
-  }
-
-  /**
-   * Rename FSImage
+   * Rename FSImage with the specific txid
    */
   private void renameCheckpoint(long txid, NameNodeFile fromNnf,
       NameNodeFile toNnf, boolean renameMD5) throws IOException {
@@ -1127,7 +1116,33 @@
     }
     if(al != null) storage.reportErrorsOnDirectories(al);
   }
-  
+
+  /**
+   * Rename all the fsimage files with the specific NameNodeFile type. The
+   * associated checksum files will also be renamed.
+   */
+  void renameCheckpoint(NameNodeFile fromNnf, NameNodeFile toNnf)
+      throws IOException {
+    ArrayList<StorageDirectory> al = null;
+    FSImageTransactionalStorageInspector inspector =
+        new FSImageTransactionalStorageInspector(EnumSet.of(fromNnf));
+    storage.inspectStorageDirs(inspector);
+    for (FSImageFile image : inspector.getFoundImages()) {
+      try {
+        renameImageFileInDir(image.sd, fromNnf, toNnf, image.txId, true);
+      } catch (IOException ioe) {
+        LOG.warn("Unable to rename checkpoint in " + image.sd, ioe);
+        if (al == null) {
+          al = Lists.newArrayList();
+        }
+        al.add(image.sd);
+      }
+    }
+    if(al != null) {
+      storage.reportErrorsOnDirectories(al);
+    }
+  }
+
   /**
    * Deletes the checkpoint file in every storage directory,
    * since the checkpoint was cancelled.
@@ -1149,8 +1164,7 @@
       NameNodeFile toNnf, long txid, boolean renameMD5) throws IOException {
     final File fromFile = NNStorage.getStorageFile(sd, fromNnf, txid);
     final File toFile = NNStorage.getStorageFile(sd, toNnf, txid);
-    // renameTo fails on Windows if the destination file 
-    // already exists.
+    // renameTo fails on Windows if the destination file already exists.
     if(LOG.isDebugEnabled()) {
       LOG.debug("renaming  " + fromFile.getAbsolutePath() 
                 + " to " + toFile.getAbsolutePath());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 31e2cd6..b3af0e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -7287,7 +7287,8 @@
       returnInfo = finalizeRollingUpgradeInternal(now());
       getEditLog().logFinalizeRollingUpgrade(returnInfo.getFinalizeTime());
       getFSImage().saveNamespace(this);
-      getFSImage().purgeCheckpoints(NameNodeFile.IMAGE_ROLLBACK);
+      getFSImage().renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK,
+          NameNodeFile.IMAGE);
     } finally {
       writeUnlock();
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgradeDowngrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgradeDowngrade.java
index 545b8e1..22efd6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgradeDowngrade.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgradeDowngrade.java
@@ -37,7 +37,7 @@
 public class TestRollingUpgradeDowngrade {
 
   @Test(timeout = 300000)
-  public void testDowngrade() throws IOException {
+  public void testDowngrade() throws Exception {
     final Configuration conf = new HdfsConfiguration();
     MiniQJMHACluster cluster = null;
     final Path foo = new Path("/foo");
@@ -48,6 +48,11 @@
       MiniDFSCluster dfsCluster = cluster.getDfsCluster();
       dfsCluster.waitActive();
 
+      // let NN1 tail editlog every 1s
+      dfsCluster.getConfiguration(1).setInt(
+          DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+      dfsCluster.restartNameNode(1);
+
       dfsCluster.transitionToActive(0);
       DistributedFileSystem dfs = dfsCluster.getFileSystem(0);
       dfs.mkdirs(foo);
@@ -57,9 +62,14 @@
           .rollingUpgrade(RollingUpgradeAction.PREPARE);
       Assert.assertTrue(info.isStarted());
       dfs.mkdirs(bar);
+
+      TestRollingUpgrade.queryForPreparation(dfs);
       dfs.close();
 
       dfsCluster.restartNameNode(0, true, "-rollingUpgrade", "downgrade");
+      // Once downgraded, there should be no more fsimage for rollbacks.
+      Assert.assertFalse(dfsCluster.getNamesystem(0).getFSImage()
+          .hasRollbackFSImage());
       // shutdown NN1
       dfsCluster.shutdownNameNode(1);
       dfsCluster.transitionToActive(0);