HDDS-4922. refactor code in SCMStateMachine. (#2007)

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/TransactionInfo.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/TransactionInfo.java
index c75f4e5..ec4c0e1 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/TransactionInfo.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/TransactionInfo.java
@@ -59,7 +59,7 @@
     this.transactionIndex = transactionIndex;
   }
 
-  public boolean isInitialized() {
+  public boolean isDefault() {
     return transactionIndex == -1 && term == 0;
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index 593e9c1..39d9071 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@ -161,7 +161,7 @@
         RatisUtil.newRaftProperties(haConf, conf);
     return RaftServer.newBuilder().setServerId(RaftPeerId.getRaftPeerId(scmId))
         .setProperties(serverProperties)
-        .setStateMachine(new SCMStateMachine(false));
+        .setStateMachine(new SCMStateMachine());
   }
 
   @Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index 15332d7..a1047b4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -27,7 +27,6 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -61,47 +60,46 @@
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.SCM_NOT_INITIALIZED;
 
 /**
- * TODO.
+ * The SCMStateMachine is the state machine for SCMRatisServer. It is
+ * responsible for applying ratis committed transactions to
+ * {@link StorageContainerManager}.
  */
 public class SCMStateMachine extends BaseStateMachine {
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMStateMachine.class);
 
   private StorageContainerManager scm;
-  private SCMRatisServer ratisServer;
   private Map<RequestType, Object> handlers;
   private SCMHADBTransactionBuffer transactionBuffer;
   private final SimpleStateMachineStorage storage =
       new SimpleStateMachineStorage();
-  private final AtomicBoolean isInitialized;
+  private final boolean isInitialized;
   private ExecutorService installSnapshotExecutor;
 
   public SCMStateMachine(final StorageContainerManager scm,
       final SCMRatisServer ratisServer, SCMHADBTransactionBuffer buffer)
       throws SCMException {
     this.scm = scm;
-    this.ratisServer = ratisServer;
     this.handlers = new EnumMap<>(RequestType.class);
     this.transactionBuffer = buffer;
-    TransactionInfo latestTrxInfo =
-        this.transactionBuffer.getLatestTrxInfo();
-    if (!latestTrxInfo.isInitialized()) {
-      if (!updateLastAppliedTermIndex(latestTrxInfo.getTerm(),
-          latestTrxInfo.getTransactionIndex())) {
-        throw new SCMException(
-            String.format("Failed to update LastAppliedTermIndex " +
-                    "in StateMachine to term:{} index:{}",
-                latestTrxInfo.getTerm(), latestTrxInfo.getTransactionIndex()
-            ), SCM_NOT_INITIALIZED);
-      }
+    TransactionInfo latestTrxInfo = this.transactionBuffer.getLatestTrxInfo();
+    if (!latestTrxInfo.isDefault() &&
+        !updateLastAppliedTermIndex(latestTrxInfo.getTerm(),
+            latestTrxInfo.getTransactionIndex())) {
+      throw new SCMException(
+          String.format("Failed to update LastAppliedTermIndex " +
+                  "in StateMachine to term:{} index:{}",
+              latestTrxInfo.getTerm(), latestTrxInfo.getTransactionIndex()
+          ), SCM_NOT_INITIALIZED);
     }
     this.installSnapshotExecutor = HadoopExecutors.newSingleThreadExecutor();
-    isInitialized = new AtomicBoolean(true);
+    isInitialized = true;
   }
 
-  public SCMStateMachine(boolean init) {
-    isInitialized = new AtomicBoolean(init);
+  public SCMStateMachine() {
+    isInitialized = false;
   }
+
   public void registerHandler(RequestType type, Object handler) {
     handlers.put(type, handler);
   }
@@ -109,9 +107,8 @@
   @Override
   public SnapshotInfo getLatestSnapshot() {
     // Transaction buffer will be null during scm initlialization phase
-    return transactionBuffer == null ?
-        null :
-        transactionBuffer.getLatestSnapshot();
+    return transactionBuffer == null
+        ? null : transactionBuffer.getLatestSnapshot();
   }
 
   /**
@@ -173,11 +170,11 @@
 
   @Override
   public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
-    LOG.info("current leader SCM steps down.");
-
-    if (!isInitialized.get()) {
+    if (!isInitialized) {
       return;
     }
+    LOG.info("current leader SCM steps down.");
+
     scm.getScmContext().updateLeaderAndTerm(false, 0);
     scm.getSCMServiceManager().notifyStatusChanged();
   }
@@ -208,12 +205,11 @@
   @Override
   public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,
                                   RaftPeerId newLeaderId) {
-    if (!groupMemberId.getPeerId().equals(newLeaderId)) {
-      LOG.info("leader changed, yet current SCM is still follower.");
+    if (!isInitialized) {
       return;
     }
-
-    if (!isInitialized.get()) {
+    if (!groupMemberId.getPeerId().equals(newLeaderId)) {
+      LOG.info("leader changed, yet current SCM is still follower.");
       return;
     }
 
@@ -233,34 +229,35 @@
         .getDeletedBlockLog();
     Preconditions.checkArgument(
         deletedBlockLog instanceof DeletedBlockLogImplV2);
-    ((DeletedBlockLogImplV2) deletedBlockLog)
-          .onBecomeLeader();
+    ((DeletedBlockLogImplV2) deletedBlockLog).onBecomeLeader();
   }
 
   @Override
   public long takeSnapshot() throws IOException {
-    long startTime = Time.monotonicNow();
     TermIndex lastTermIndex = getLastAppliedTermIndex();
     long lastAppliedIndex = lastTermIndex.getIndex();
-    if (isInitialized.get()) {
-      TransactionInfo lastAppliedTrxInfo =
-          TransactionInfo.fromTermIndex(lastTermIndex);
-      if (transactionBuffer.getLatestTrxInfo().compareTo(lastAppliedTrxInfo)
-          < 0) {
-        transactionBuffer.updateLatestTrxInfo(
-            TransactionInfo.builder().setCurrentTerm(lastTermIndex.getTerm())
-                .setTransactionIndex(lastTermIndex.getIndex()).build());
-        transactionBuffer.setLatestSnapshot(
-            transactionBuffer.getLatestTrxInfo().toSnapshotInfo());
-      } else {
-        lastAppliedIndex =
-            transactionBuffer.getLatestTrxInfo().getTransactionIndex();
-      }
 
-      transactionBuffer.flush();
-      LOG.info("Current Snapshot Index {}, takeSnapshot took {} ms",
-          lastAppliedIndex, Time.monotonicNow() - startTime);
+    if (!isInitialized) {
+      return lastAppliedIndex;
     }
+
+    long startTime = Time.monotonicNow();
+
+    TransactionInfo latestTrxInfo = transactionBuffer.getLatestTrxInfo();
+    TransactionInfo lastAppliedTrxInfo =
+        TransactionInfo.fromTermIndex(lastTermIndex);
+
+    if (latestTrxInfo.compareTo(lastAppliedTrxInfo) < 0) {
+      transactionBuffer.updateLatestTrxInfo(lastAppliedTrxInfo);
+      transactionBuffer.setLatestSnapshot(lastAppliedTrxInfo.toSnapshotInfo());
+    } else {
+      lastAppliedIndex = latestTrxInfo.getTransactionIndex();
+    }
+
+    transactionBuffer.flush();
+
+    LOG.info("Current Snapshot Index {}, takeSnapshot took {} ms",
+        lastAppliedIndex, Time.monotonicNow() - startTime);
     return lastAppliedIndex;
   }
 
@@ -279,7 +276,6 @@
     updateLastAppliedTermIndex(term, index);
   }
 
-
   @Override
   public void notifyConfigurationChanged(long term, long index,
       RaftProtos.RaftConfigurationProto newRaftConfiguration) {
@@ -293,6 +289,9 @@
 
   @Override
   public void close() throws IOException {
+    if (!isInitialized) {
+      return;
+    }
     super.close();
     transactionBuffer.close();
     HadoopExecutors.