HDDS-4896. Need a tool to upgrade current non-HA SCM node to single node HA cluster (#1999)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index a46f396..c170b0b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -292,6 +292,20 @@
   public static final String OZONE_SCM_NODE_ID_KEY =
       "ozone.scm.node.id";
 
+  /**
+   * Optional config. If set, scm --init takes effect only on the named
+   * node and the scm --bootstrap command is ignored there; conversely,
+   * scm --init is ignored on the non-primordial SCM nodes. With this
+   * config set, applications/admins can safely execute init and bootstrap
+   * commands on all SCM instances, for example in Kubernetes deployments.
+   *
+   * If a cluster is upgraded from a non-Ratis to a Ratis-based SCM,
+   * scm --init needs to be re-run on the primordial node to switch it
+   * from non-Ratis to Ratis-based SCM.
+   */
+  public static final String OZONE_SCM_PRIMORDIAL_NODE_ID_KEY =
+      "ozone.scm.primordial.node.id";
   // The path where datanode ID is to be written to.
   // if this value is not set then container startup will fail.
   public static final String OZONE_SCM_DATANODE_ID_DIR =
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
index 192b784..b3fc40e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
@@ -56,6 +56,16 @@
         ScmConfigKeys.OZONE_SCM_HA_ENABLE_DEFAULT);
   }
 
+  public static String getPrimordialSCM(ConfigurationSource conf) {
+    return conf.get(ScmConfigKeys.OZONE_SCM_PRIMORDIAL_NODE_ID_KEY);
+  }
+
+  public static boolean isPrimordialSCM(ConfigurationSource conf,
+      String selfNodeId) {
+    String primordialNode = getPrimordialSCM(conf);
+    return isSCMHAEnabled(conf) && primordialNode != null && primordialNode
+        .equals(selfNodeId);
+  }
+
   /**
    * Get a collection of all scmNodeIds for the given scmServiceId.
    */
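
A minimal usage sketch of the gate these helpers implement, assuming SCM HA
is enabled and ozone.scm.primordial.node.id is set to "scm1" (the standalone
class and main method are hypothetical, for illustration only):

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;
    import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;

    public final class PrimordialGateDemo {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
        conf.set(ScmConfigKeys.OZONE_SCM_PRIMORDIAL_NODE_ID_KEY, "scm1");

        // scm --init takes effect only on scm1; scm --bootstrap only on the
        // other nodes, so both commands can be issued blindly on every node.
        System.out.println(SCMHAUtils.isPrimordialSCM(conf, "scm1")); // true
        System.out.println(SCMHAUtils.isPrimordialSCM(conf, "scm2")); // false
      }
    }
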
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a59a627..0a40d73 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1920,6 +1920,22 @@
     </description>
   </property>
   <property>
+    <name>ozone.scm.primordial.node.id</name>
+    <value></value>
+    <tag>OZONE, SCM, HA</tag>
+    <description>
+      Optional config. If set, scm --init takes effect only on the named
+      node and the scm --bootstrap command is ignored there; conversely,
+      scm --init is ignored on the non-primordial SCM nodes. With this
+      config set, applications/admins can safely execute init and
+      bootstrap commands on all SCM instances.
+
+      If a cluster is upgraded from a non-Ratis to a Ratis-based SCM,
+      scm --init needs to be re-run on the primordial node to switch it
+      from non-Ratis to Ratis-based SCM.
+    </description>
+  </property>
+  <property>
     <name>ozone.scm.ratis.enable</name>
     <value>false</value>
     <tag>OZONE, SCM, HA, RATIS</tag>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
index 4ee2ae3..4eea7a2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHADBTransactionBufferImpl.java
@@ -138,6 +138,5 @@
 
   @Override
   public void close() throws IOException {
-    flush();
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index 3df5f7b..4835d03 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -230,7 +230,7 @@
         // Pause the State Machine so that no new transactions can be applied.
         // This action also clears the SCM Double Buffer so that if there
         // are any pending transactions in the buffer, they are discarded.
-
+        getRatisServer().getSCMStateMachine().pause();
       } catch (Exception e) {
         LOG.error("Failed to stop/ pause the services. Cannot proceed with "
             + "installing the new checkpoint.");
@@ -250,42 +250,41 @@
         LOG.error("Failed to install Snapshot from {} as SCM failed to replace"
             + " DB with downloaded checkpoint. Reloading old SCM state.", e);
       }
+      // Reload the DB store with the new checkpoint.
+      // Restart (unpause) the state machine and update its last applied index
+      // to the installed checkpoint's snapshot index.
+      try {
+        reloadSCMState();
+        getRatisServer().getSCMStateMachine().unpause(term, lastAppliedIndex);
+        LOG.info("Reloaded SCM state with Term: {} and Index: {}", term,
+            lastAppliedIndex);
+      } catch (Exception ex) {
+        String errorMsg =
+            "Failed to reload SCM state and instantiate services.";
+        exitManager.exitSystem(1, errorMsg, ex, LOG);
+      }
+
+      // Delete the backup DB
+      try {
+        if (dbBackup != null) {
+          FileUtils.deleteFully(dbBackup);
+        }
+      } catch (Exception e) {
+        LOG.error("Failed to delete the backup of the original DB {}",
+            dbBackup);
+      }
     } else {
       LOG.warn("Cannot proceed with InstallSnapshot as SCM is at TermIndex {} "
           + "and checkpoint has lower TermIndex {}. Reloading old "
           + "state of SCM.", termIndex, checkpointTrxnInfo.getTermIndex());
     }
 
-    // Reload the DB store with the new checkpoint.
-    // Restart (unpause) the state machine and update its last applied index
-    // to the installed checkpoint's snapshot index.
-    try {
-      reloadSCMState();
-      getRatisServer().getSCMStateMachine().unpause(lastAppliedIndex, term);
-      LOG.info("Reloaded SCM state with Term: {} and Index: {}", term,
-          lastAppliedIndex);
-    } catch (Exception ex) {
-      String errorMsg = "Failed to reload SCM state and instantiate services.";
-      exitManager.exitSystem(1, errorMsg, ex, LOG);
-    }
-
-    // Delete the backup DB
-    try {
-      if (dbBackup != null) {
-        FileUtils.deleteFully(dbBackup);
-      }
-    } catch (Exception e) {
-      LOG.error("Failed to delete the backup of the original DB {}", dbBackup);
-    }
-
     if (lastAppliedIndex != checkpointTrxnInfo.getTransactionIndex()) {
       // Install Snapshot failed and old state was reloaded. Return null to
       // Ratis to indicate that installation failed.
       return null;
     }
 
-    // TODO: We should only return the snpashotIndex to the leader.
-    //  Should be fixed after RATIS-586
     TermIndex newTermIndex = TermIndex.valueOf(term, lastAppliedIndex);
     return newTermIndex;
   }
@@ -308,7 +307,7 @@
   public void shutdown() throws IOException {
     if (ratisServer != null) {
       ratisServer.stop();
-      ratisServer.getSCMStateMachine().stop();
+      ratisServer.getSCMStateMachine().close();
       grpcServer.stop();
     }
   }
@@ -334,7 +333,8 @@
     scm.getScmMetadataStore().stop();
   }
 
-  void startServices() throws IOException {
+  @VisibleForTesting
+  public void startServices() throws IOException {
 
    // TODO: Fix the metrics ??
     final SCMMetadataStore metadataStore = scm.getScmMetadataStore();
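
The moved block above also changes the unpause() call to
unpause(term, lastAppliedIndex); together with the signature change in
SCMStateMachine below, the parameter order now matches
TermIndex.valueOf(term, index). A small check of that Ratis ordering
(hypothetical standalone class, assuming ratis-server on the classpath):

    import org.apache.ratis.server.protocol.TermIndex;

    public final class TermIndexOrderCheck {
      public static void main(String[] args) {
        // TermIndex.valueOf takes (term, index), so the call site reads
        // unpause(term, lastAppliedIndex) rather than (index, term).
        TermIndex ti = TermIndex.valueOf(1, 100);
        System.out.println(ti.getTerm());  // 1
        System.out.println(ti.getIndex()); // 100
      }
    }
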
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index 3e6b57c..593e9c1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@ -17,18 +17,14 @@
 
 package org.apache.hadoop.hdds.scm.ha;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
-import java.util.Optional;
+import java.util.Iterator;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -50,7 +46,6 @@
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.SetConfigurationRequest;
 import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.RaftServerConfigKeys;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,15 +90,48 @@
   public static void initialize(String clusterId, String scmId,
       SCMNodeDetails details, OzoneConfiguration conf) throws IOException {
     final RaftGroup group = buildRaftGroup(details, scmId, clusterId);
-    RaftServer server = newRaftServer(scmId, conf)
-        .setGroup(group).build();
-    server.start();
-    // TODO: Timeout and sleep interval should be made configurable
-    waitforLeaderToBeReady(server, 60000, group);
-    server.close();
+    RaftServer server = null;
+    try {
+      server = newRaftServer(scmId, conf).setGroup(group).build();
+      server.start();
+      // TODO: Timeout and sleep interval should be made configurable
+      waitForLeaderToBeReady(server, 60000, group);
+    } finally {
+      if (server != null) {
+        server.close();
+      }
+    }
   }
 
-  private static void waitforLeaderToBeReady(RaftServer server, long timeout,
+  public static void reinitialize(String clusterId, String scmId,
+      SCMNodeDetails details, OzoneConfiguration conf) throws IOException {
+    RaftServer server = null;
+    try {
+      server = newRaftServer(scmId, conf).build();
+      RaftGroup group = null;
+      Iterator<RaftGroup> iter = server.getGroups().iterator();
+      if (iter.hasNext()) {
+        group = iter.next();
+      }
+      if (group != null && group.getGroupId()
+          .equals(buildRaftGroupId(clusterId))) {
+        LOG.info("Ratis group with group Id {} already exists.",
+            group.getGroupId());
+        return;
+      } else {
+        // Close the server instance so that any pending locks on the raft
+        // storage directory are released and initialize() can succeed.
+        server.close();
+        initialize(clusterId, scmId, details, conf);
+      }
+    } finally {
+      if (server != null) {
+        server.close();
+      }
+    }
+  }
+
+  private static void waitForLeaderToBeReady(RaftServer server, long timeout,
       RaftGroup group)
       throws IOException {
     boolean ready;
@@ -138,6 +166,7 @@
 
   @Override
   public void start() throws IOException {
+    LOG.info("starting ratis server {}", server.getPeer().getAddress());
     server.start();
   }
 
@@ -187,6 +216,7 @@
 
   @Override
   public void stop() throws IOException {
+    LOG.info("stopping ratis server {}", server.getPeer().getAddress());
     server.close();
   }
 
@@ -241,46 +271,6 @@
     }
   }
 
-  @VisibleForTesting
-  public static void validateRatisGroupExists(OzoneConfiguration conf,
-      String clusterId) throws IOException {
-    final SCMHAConfiguration haConf = conf.getObject(SCMHAConfiguration.class);
-    final RaftProperties properties = RatisUtil.newRaftProperties(haConf, conf);
-    final RaftGroupId raftGroupId = buildRaftGroupId(clusterId);
-    final AtomicBoolean found = new AtomicBoolean(false);
-    RaftServerConfigKeys.storageDir(properties).parallelStream().forEach(
-        (dir) -> Optional.ofNullable(dir.listFiles()).map(Arrays::stream)
-            .orElse(Stream.empty()).filter(File::isDirectory).forEach(sub -> {
-              try {
-                LOG.info("{}: found a subdirectory {}", raftGroupId, sub);
-                RaftGroupId groupId = null;
-                try {
-                  groupId = RaftGroupId.valueOf(UUID.fromString(sub.getName()));
-                } catch (Exception e) {
-                  LOG.info("{}: The directory {} is not a group directory;"
-                      + " ignoring it. ", raftGroupId, sub.getAbsolutePath());
-                }
-                if (groupId != null) {
-                  if (groupId.equals(raftGroupId)) {
-                    LOG.info(
-                        "{} : The directory {} found a group directory for "
-                            + "cluster {}", raftGroupId, sub.getAbsolutePath(),
-                        clusterId);
-                    found.set(true);
-                  }
-                }
-              } catch (Exception e) {
-                LOG.warn(
-                    raftGroupId + ": Failed to find the group directory "
-                        + sub.getAbsolutePath() + ".", e);
-              }
-            }));
-    if (!found.get()) {
-      throw new IOException(
-          "Could not find any ratis group with id " + raftGroupId);
-    }
-  }
-
   private static RaftGroup buildRaftGroup(SCMNodeDetails details,
       String scmId, String clusterId) {
     Preconditions.checkNotNull(scmId);
@@ -299,7 +289,8 @@
     return group;
   }
 
-  private static RaftGroupId buildRaftGroupId(String clusterId) {
+  @VisibleForTesting
+  public static RaftGroupId buildRaftGroupId(String clusterId) {
     Preconditions.checkNotNull(clusterId);
     return RaftGroupId.valueOf(
         UUID.fromString(clusterId.replace(OzoneConsts.CLUSTER_ID_PREFIX, "")));
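
buildRaftGroupId() (made public above for tests) derives the Raft group id
deterministically from the cluster id, which is what lets reinitialize()
recognize a pre-existing group by its id alone. A small sketch of the same
derivation (hypothetical standalone class; OzoneConsts.CLUSTER_ID_PREFIX is
assumed to be "CID-"):

    import java.util.UUID;

    import org.apache.ratis.protocol.RaftGroupId;

    public final class GroupIdDemo {
      public static void main(String[] args) {
        // Strip the cluster id prefix and reuse the remaining UUID, so every
        // SCM node computes the same group id for the same cluster.
        String clusterId = "CID-" + UUID.randomUUID();
        RaftGroupId gid =
            RaftGroupId.valueOf(UUID.fromString(clusterId.replace("CID-", "")));
        System.out.println(gid);
      }
    }
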
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index 65569b9..15332d7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -40,9 +40,12 @@
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.TransactionContext;
@@ -111,6 +114,18 @@
         transactionBuffer.getLatestSnapshot();
   }
 
+  /**
+   * Initializes the State Machine with the given server, group and storage.
+   */
+  @Override
+  public void initialize(RaftServer server, RaftGroupId id,
+      RaftStorage raftStorage) throws IOException {
+    getLifeCycle().startAndTransition(() -> {
+      super.initialize(server, id, raftStorage);
+      storage.init(raftStorage);
+    });
+  }
+
   @Override
   public CompletableFuture<Message> applyTransaction(
       final TransactionContext trx) {
@@ -246,7 +261,6 @@
       LOG.info("Current Snapshot Index {}, takeSnapshot took {} ms",
           lastAppliedIndex, Time.monotonicNow() - startTime);
     }
-    super.takeSnapshot();
     return lastAppliedIndex;
   }
 
@@ -277,7 +291,10 @@
     getLifeCycle().transition(LifeCycle.State.PAUSED);
   }
 
-  public void stop() {
+  @Override
+  public void close() throws IOException {
+    super.close();
+    transactionBuffer.close();
     HadoopExecutors.
         shutdown(installSnapshotExecutor, LOG, 5, TimeUnit.SECONDS);
   }
@@ -286,14 +303,13 @@
    * lastAppliedIndex. This should be done after uploading new state to the
    * StateMachine.
    */
-  public void unpause(long newLastAppliedSnaphsotIndex,
-      long newLastAppliedSnapShotTermIndex) {
+  public void unpause(long newLastAppliedSnapShotTerm,
+      long newLastAppliedSnapshotIndex) {
     getLifeCycle().startAndTransition(() -> {
       try {
         transactionBuffer.init();
         this.setLastAppliedTermIndex(TermIndex
-            .valueOf(newLastAppliedSnapShotTermIndex,
-                newLastAppliedSnaphsotIndex));
+            .valueOf(newLastAppliedSnapShotTerm, newLastAppliedSnapshotIndex));
       } catch (IOException ioe) {
         LOG.error("Failed to unpause ", ioe);
       }
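
pause(), unpause() and the new initialize() override all ride on Ratis'
LifeCycle: pause() transitions to PAUSED, while unpause() and initialize()
use startAndTransition() to move back through STARTING to RUNNING. A
standalone walk through the same states (hypothetical class, assuming the
transition rules enforced by Ratis' LifeCycle):

    import org.apache.ratis.util.LifeCycle;

    public final class LifeCycleDemo {
      public static void main(String[] args) {
        LifeCycle lc = new LifeCycle("demo");
        lc.transition(LifeCycle.State.STARTING);
        lc.transition(LifeCycle.State.RUNNING);
        lc.transition(LifeCycle.State.PAUSING);
        lc.transition(LifeCycle.State.PAUSED);
        System.out.println(lc.getCurrentState().isPausingOrPaused()); // true
        lc.startAndTransition(() -> { }); // PAUSED -> STARTING -> RUNNING
        System.out.println(lc.getCurrentState()); // RUNNING
      }
    }
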
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 80a7f3c..d7bb0bc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -743,6 +743,15 @@
     // The node here will try to fetch the cluster id from any of existing
     // running SCM instances.
     SCMHANodeDetails scmhaNodeDetails = SCMHANodeDetails.loadSCMHAConfig(conf);
+    String primordialSCM = SCMHAUtils.getPrimordialSCM(conf);
+    String selfNodeId = scmhaNodeDetails.getLocalNodeDetails().getNodeId();
+    if (primordialSCM != null && SCMHAUtils.isPrimordialSCM(conf, selfNodeId)) {
+      LOG.info(
+          "SCM bootstrap command cannot be executed on the primordial SCM {}, "
+              + "self id {}. Ignoring it.",
+          primordialSCM, selfNodeId);
+      return false;
+    }
     OzoneConfiguration config =
         SCMHAUtils.removeSelfId(conf,
             scmhaNodeDetails.getLocalNodeDetails().getNodeId());
@@ -791,6 +800,16 @@
     SCMStorageConfig scmStorageConfig = new SCMStorageConfig(conf);
     StorageState state = scmStorageConfig.getState();
     final SCMHANodeDetails haDetails = SCMHANodeDetails.loadSCMHAConfig(conf);
+    String primordialSCM = SCMHAUtils.getPrimordialSCM(conf);
+    String selfNodeId = haDetails.getLocalNodeDetails().getNodeId();
+    if (primordialSCM != null && !SCMHAUtils
+        .isPrimordialSCM(conf, selfNodeId)) {
+      LOG.info(
+          "SCM init command can only be executed on the primordial SCM {}, "
+              + "self id {}. Ignoring it.",
+          primordialSCM, selfNodeId);
+      return false;
+    }
     if (state != StorageState.INITIALIZED) {
       try {
         if (clusterId != null && !clusterId.isEmpty()) {
@@ -819,7 +838,8 @@
               + ";cid={};layoutVersion={}", scmStorageConfig.getStorageDir(),
           clusterId, scmStorageConfig.getLayoutVersion());
       if (SCMHAUtils.isSCMHAEnabled(conf)) {
-        SCMRatisServerImpl.validateRatisGroupExists(conf, clusterId);
+        SCMRatisServerImpl.reinitialize(clusterId, scmStorageConfig.getScmId(),
+            haDetails.getLocalNodeDetails(), conf);
       }
       return true;
     }
@@ -1098,8 +1118,6 @@
     if (jvmPauseMonitor != null) {
       jvmPauseMonitor.stop();
     }
-    IOUtils.cleanupWithLogger(LOG, containerManager);
-    IOUtils.cleanupWithLogger(LOG, pipelineManager);
 
     try {
       scmHAManager.shutdown();
@@ -1107,6 +1125,9 @@
       LOG.error("SCM HA Manager stop failed", ex);
     }
 
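+    // Clean up the container and pipeline managers only after the HA
+    // manager (and thus the Ratis state machine) has been shut down.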
+    IOUtils.cleanupWithLogger(LOG, containerManager);
+    IOUtils.cleanupWithLogger(LOG, pipelineManager);
+
     try {
       scmMetadataStore.stop();
     } catch (Exception ex) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java
index 140b010..0c3fbcc 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMSnapshot.java
@@ -25,7 +25,6 @@
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -37,7 +36,6 @@
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
-import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
 
 public class TestSCMSnapshot {
   private static MiniOzoneCluster cluster;
@@ -82,15 +80,6 @@
             "index 1 {}", snapshotInfo2, snapshotInfo1),
         snapshotInfo2 > snapshotInfo1);
 
-    Table<String, TransactionInfo> trxInfo =
-        scm.getScmMetadataStore().getTransactionInfoTable();
-    TransactionInfo transactionInfo = trxInfo.get(TRANSACTION_INFO_KEY);
-
-    Assert.assertTrue(
-        "DB trx info:" + transactionInfo.getTransactionIndex()
-        + ", latestSnapshotInfo:" + snapshotInfo2,
-        transactionInfo.getTransactionIndex() >= snapshotInfo2);
-
     cluster.restartStorageContainerManager(false);
     TransactionInfo trxInfoAfterRestart =
         scm.getScmHAManager().asSCMHADBTransactionBuffer().getLatestTrxInfo();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index a6bdee5..4cf2049 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -27,6 +27,7 @@
     .HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
 import static org.junit.Assert.fail;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
@@ -49,7 +50,11 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.Arrays;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -75,9 +80,7 @@
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.ha.SCMContext;
-import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
-import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
+import org.apache.hadoop.hdds.scm.ha.RatisUtil;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMHAConfiguration;
+import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
 import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
@@ -100,6 +103,9 @@
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -493,7 +499,7 @@
     final UUID clusterId = UUID.randomUUID();
     // This will initialize SCM
     StorageContainerManager.scmInit(conf, clusterId.toString());
-    SCMRatisServerImpl.validateRatisGroupExists(conf, clusterId.toString());
+    validateRatisGroupExists(conf, clusterId.toString());
   }
 
   @Test
@@ -518,6 +524,78 @@
     }
   }
 
+  @VisibleForTesting
+  public static void validateRatisGroupExists(OzoneConfiguration conf,
+      String clusterId) throws IOException {
+    final SCMHAConfiguration haConf = conf.getObject(SCMHAConfiguration.class);
+    final RaftProperties properties = RatisUtil.newRaftProperties(haConf, conf);
+    final RaftGroupId raftGroupId =
+        SCMRatisServerImpl.buildRaftGroupId(clusterId);
+    final AtomicBoolean found = new AtomicBoolean(false);
+    RaftServerConfigKeys.storageDir(properties).parallelStream().forEach(
+        (dir) -> Optional.ofNullable(dir.listFiles()).map(Arrays::stream)
+            .orElse(Stream.empty()).filter(File::isDirectory).forEach(sub -> {
+              try {
+                LOG.info("{}: found a subdirectory {}", raftGroupId, sub);
+                RaftGroupId groupId = null;
+                try {
+                  groupId = RaftGroupId.valueOf(UUID.fromString(sub.getName()));
+                } catch (Exception e) {
+                  LOG.info("{}: The directory {} is not a group directory;"
+                      + " ignoring it. ", raftGroupId, sub.getAbsolutePath());
+                }
+                if (groupId != null) {
+                  if (groupId.equals(raftGroupId)) {
+                    LOG.info(
+                        "{} : The directory {} found a group directory for "
+                            + "cluster {}", raftGroupId, sub.getAbsolutePath(),
+                        clusterId);
+                    found.set(true);
+                  }
+                }
+              } catch (Exception e) {
+                LOG.warn(
+                    raftGroupId + ": Failed to find the group directory "
+                        + sub.getAbsolutePath() + ".", e);
+              }
+            }));
+    if (!found.get()) {
+      throw new IOException(
+          "Could not find any ratis group with id " + raftGroupId);
+    }
+  }
+
+  @Test
+  public void testSCMReinitializationWithHAEnabled() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, false);
+    final String path = GenericTestUtils.getTempPath(
+        UUID.randomUUID().toString());
+    Path scmPath = Paths.get(path, "scm-meta");
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
+    // This will set the cluster id in the version file
+    MiniOzoneCluster cluster =
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
+    cluster.waitForClusterToBeReady();
+    try {
+      final String clusterId =
+          cluster.getStorageContainerManager().getClusterId();
+      // validate there is no ratis group pre existing
+      try {
+        validateRatisGroupExists(conf, clusterId);
+        Assert.fail();
+      } catch (IOException ioe) {
+        // Exception is expected here
+      }
+      conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true);
+      // This will re-initialize SCM
+      StorageContainerManager.scmInit(conf, clusterId);
+      // Ratis group with cluster id exists now
+      validateRatisGroupExists(conf, clusterId);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   @Test
   public void testSCMInitializationFailure()
       throws IOException, AuthenticationException {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java
index c242d61..5754ff7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java
@@ -18,6 +18,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
@@ -231,6 +232,7 @@
         "logIndex is less than it's lastAppliedIndex", newTermIndex);
     Assert.assertEquals(followerTermIndex,
         followerSM.getLastAppliedTermIndex());
+    Assert.assertFalse(followerSM.getLifeCycleState().isPausingOrPaused());
   }
 
   @Test
@@ -240,9 +242,17 @@
     // Find the inactive SCM
     String followerId = getInactiveSCM(cluster).getScmId();
     StorageContainerManager follower = cluster.getSCM(followerId);
+    follower.start();
+    follower.exitSafeMode();
     // Do some transactions so that the log index increases
     writeToIncreaseLogIndex(leaderSCM, 100);
+    File oldDBLocation =
+        follower.getScmMetadataStore().getStore().getDbLocation();
 
+    SCMStateMachine sm =
+        follower.getScmHAManager().getRatisServer().getSCMStateMachine();
+    TermIndex termIndex = sm.getLastAppliedTermIndex();
     DBCheckpoint leaderDbCheckpoint = leaderSCM.getScmMetadataStore().getStore()
         .getCheckpoint(false);
     Path leaderCheckpointLocation = leaderDbCheckpoint.getCheckpointLocation();
@@ -251,6 +261,16 @@
             new SCMDBDefinition());
 
     Assert.assertNotNull(leaderCheckpointLocation);
+    // Build a backup location next to the current DB dir
+    String dbBackupName =
+        "SCM_CHECKPOINT_BACKUP" + termIndex.getIndex() + "_" + System
+            .currentTimeMillis();
+    File dbDir = oldDBLocation.getParentFile();
+    File checkpointBackup = new File(dbDir, dbBackupName);
+
+    // Take a backup of the leader checkpoint
+    Files.copy(leaderCheckpointLocation.toAbsolutePath(),
+        checkpointBackup.toPath());
     // Corrupt the leader checkpoint and install that on the follower. The
     // operation should fail and  should shutdown.
     boolean delete = true;
@@ -278,6 +298,18 @@
 
     Assert.assertTrue(logCapture.getOutput()
         .contains("Failed to reload SCM state and instantiate services."));
+    Assert.assertTrue(sm.getLifeCycleState().isPausingOrPaused());
+
+    // Verify correct reloading
+    HAUtils
+        .replaceDBWithCheckpoint(leaderCheckpointTrxnInfo.getTransactionIndex(),
+            oldDBLocation, checkpointBackup.toPath(),
+            OzoneConsts.SCM_DB_BACKUP_PREFIX);
+    scmhaManager.startServices();
+    sm.unpause(leaderCheckpointTrxnInfo.getTerm(),
+        leaderCheckpointTrxnInfo.getTransactionIndex());
+    Assert.assertEquals(leaderCheckpointTrxnInfo.getTermIndex(),
+        sm.getLastAppliedTermIndex());
   }
 
   private List<ContainerInfo> writeToIncreaseLogIndex(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
index abf1e3d..1a60067 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestStorageContainerManagerHA.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.scm;
 
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -43,6 +44,7 @@
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -51,7 +53,6 @@
 import org.junit.rules.Timeout;
 
 import java.io.IOException;
-import java.util.UUID;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
@@ -208,4 +209,23 @@
     }
     return sync;
   }
+
+  @Test
+  public void testPrimordialSCM() throws Exception {
+    StorageContainerManager scm1 = cluster.getStorageContainerManagers().get(0);
+    StorageContainerManager scm2 = cluster.getStorageContainerManagers().get(1);
+    OzoneConfiguration conf1 = scm1.getConfiguration();
+    OzoneConfiguration conf2 = scm2.getConfiguration();
+    conf1.set(ScmConfigKeys.OZONE_SCM_PRIMORDIAL_NODE_ID_KEY,
+        scm1.getSCMNodeId());
+    conf2.set(ScmConfigKeys.OZONE_SCM_PRIMORDIAL_NODE_ID_KEY,
+        scm1.getSCMNodeId());
+    Assert.assertFalse(StorageContainerManager.scmBootstrap(conf1));
+    scm1.getScmHAManager().shutdown();
+    Assert.assertTrue(
+        StorageContainerManager.scmInit(conf1, scm1.getClusterId()));
+    Assert.assertTrue(StorageContainerManager.scmBootstrap(conf2));
+    Assert.assertFalse(
+        StorageContainerManager.scmInit(conf2, scm2.getClusterId()));
+  }
 }