HDDS-5090. make Decommission work under SCM HA. (#2148)

Co-authored-by: Doroszlai, Attila <adoroszlai@apache.org>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index 90ace48..29dcf75 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -224,6 +224,8 @@
     Preconditions.checkArgument(
         deletedBlockLog instanceof DeletedBlockLogImplV2);
     ((DeletedBlockLogImplV2) deletedBlockLog).onBecomeLeader();
+
+    scm.getScmDecommissionManager().onBecomeLeader();
   }
 
   @Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
index e56eb84..33c9697 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -52,6 +53,7 @@
 
   private NodeManager nodeManager;
   //private ContainerManagerV2 containerManager;
+  private SCMContext scmContext;
   private EventPublisher eventQueue;
   private ReplicationManager replicationManager;
   private OzoneConfiguration conf;
@@ -171,11 +173,12 @@
   }
 
   public NodeDecommissionManager(OzoneConfiguration config, NodeManager nm,
-      ContainerManagerV2 containerManager,
+      ContainerManagerV2 containerManager, SCMContext scmContext,
       EventPublisher eventQueue, ReplicationManager rm) {
     this.nodeManager = nm;
     conf = config;
     //this.containerManager = containerManager;
+    this.scmContext = scmContext;
     this.eventQueue = eventQueue;
     this.replicationManager = rm;
 
@@ -248,10 +251,15 @@
    */
   public synchronized void continueAdminForNode(DatanodeDetails dn)
       throws NodeNotFoundException {
+    if (!scmContext.isLeader()) {
+      LOG.info("follower SCM ignored continue admin for datanode {}", dn);
+      return;
+    }
     NodeOperationalState opState = getNodeStatus(dn).getOperationalState();
     if (opState == NodeOperationalState.DECOMMISSIONING
         || opState == NodeOperationalState.ENTERING_MAINTENANCE
         || opState == NodeOperationalState.IN_MAINTENANCE) {
+      LOG.info("Continue admin for datanode {}", dn);
       monitor.startMonitoring(dn);
     }
   }
@@ -375,4 +383,20 @@
     return nodeManager.getNodeStatus(dn);
   }
 
+  /**
+   * Called in SCMStateMachine#notifyLeaderChanged when current SCM becomes
+   *  leader.
+   */
+  public void onBecomeLeader() {
+    nodeManager.getAllNodes().forEach(datanodeDetails -> {
+      try {
+        continueAdminForNode(datanodeDetails);
+      } catch (NodeNotFoundException e) {
+        // Should not happen, as the node has just registered to call this event
+        // handler.
+        LOG.warn("NodeNotFound when adding the node to the decommissionManager",
+            e);
+      }
+    });
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 6c02bdd..73a4209 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -413,13 +413,19 @@
   }
 
   /**
-   * If the operational state or expiry reported in the datanode heartbeat do
-   * not match those store in SCM, queue a command to update the state persisted
-   * on the datanode. Additionally, ensure the datanodeDetails stored in SCM
-   * match those reported in the heartbeat.
-   * This method should only be called when processing the
-   * heartbeat, and for a registered node, the information stored in SCM is the
-   * source of truth.
+   * This method should only be called when processing the heartbeat.
+   *
+   * On leader SCM, for a registered node, the information stored in SCM is
+   * the source of truth. If the operational state or expiry reported in the
+   * datanode heartbeat do not match those store in SCM, queue a command to
+   * update the state persisted on the datanode. Additionally, ensure the
+   * datanodeDetails stored in SCM match those reported in the heartbeat.
+   *
+   * On follower SCM, datanode notifies follower SCM its latest operational
+   * state or expiry via heartbeat. If the operational state or expiry
+   * reported in the datanode heartbeat do not match those stored in SCM,
+   * just update the state in follower SCM accordingly.
+   *
    * @param reportedDn The DatanodeDetails taken from the node heartbeat.
    * @throws NodeNotFoundException
    */
@@ -427,24 +433,36 @@
       throws NodeNotFoundException {
     NodeStatus scmStatus = getNodeStatus(reportedDn);
     if (opStateDiffers(reportedDn, scmStatus)) {
-      LOG.info("Scheduling a command to update the operationalState " +
-          "persisted on {} as the reported value does not " +
-          "match the value stored in SCM ({}, {})",
-          reportedDn,
-          scmStatus.getOperationalState(),
-          scmStatus.getOpStateExpiryEpochSeconds());
-
-      try {
-        SCMCommand<?> command = new SetNodeOperationalStateCommand(
-            Time.monotonicNow(),
+      if (scmContext.isLeader()) {
+        LOG.info("Scheduling a command to update the operationalState " +
+                "persisted on {} as the reported value does not " +
+                "match the value stored in SCM ({}, {})",
+            reportedDn,
             scmStatus.getOperationalState(),
             scmStatus.getOpStateExpiryEpochSeconds());
-        command.setTerm(scmContext.getTermOfLeader());
-        addDatanodeCommand(reportedDn.getUuid(), command);
-      } catch (NotLeaderException nle) {
-        LOG.warn("Skip sending SetNodeOperationalStateCommand,"
-            + " since current SCM is not leader.", nle);
-        return;
+
+        try {
+          SCMCommand<?> command = new SetNodeOperationalStateCommand(
+              Time.monotonicNow(),
+              scmStatus.getOperationalState(),
+              scmStatus.getOpStateExpiryEpochSeconds());
+          command.setTerm(scmContext.getTermOfLeader());
+          addDatanodeCommand(reportedDn.getUuid(), command);
+        } catch (NotLeaderException nle) {
+          LOG.warn("Skip sending SetNodeOperationalStateCommand,"
+              + " since current SCM is not leader.", nle);
+          return;
+        }
+      } else {
+        LOG.info("Update the operationalState saved in follower SCM " +
+                "for {} as the reported value does not " +
+                "match the value stored in SCM ({}, {})",
+            reportedDn,
+            scmStatus.getOperationalState(),
+            scmStatus.getOpStateExpiryEpochSeconds());
+
+        setNodeOperationalState(reportedDn, reportedDn.getPersistedOpState(),
+            reportedDn.getPersistedOpStateExpiryEpochSec());
       }
     }
     DatanodeDetails scmDnd = nodeStateManager.getNode(reportedDn);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 837096b..98a71ce 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -565,7 +565,7 @@
           pipelineManager, eventQueue, serviceManager, scmContext);
     }
     scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager,
-        containerManager, eventQueue, replicationManager);
+        containerManager, scmContext, eventQueue, replicationManager);
   }
 
   /**
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
index 847e03e..665c3f7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
@@ -24,8 +24,10 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.DatanodeAdminError;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Before;
@@ -58,8 +60,8 @@
         TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
     nodeManager = createNodeManager(conf);
-    decom = new NodeDecommissionManager(
-        conf, nodeManager, null, null, null);
+    decom = new NodeDecommissionManager(conf, nodeManager, null,
+        SCMContext.emptyContext(), new EventQueue(), null);
   }
 
   @Test
@@ -250,6 +252,34 @@
         nodeManager.getNodeStatus(dns.get(2)).getOperationalState());
   }
 
+  @Test
+  public void testNodeDecommissionManagerOnBecomeLeader() throws Exception {
+    List<DatanodeDetails> dns = generateDatanodes();
+
+    long maintenanceEnd =
+        (System.currentTimeMillis() / 1000L) + (100 * 60L * 60L);
+
+    // Put 1 node into entering_maintenance, 1 node into decommissioning
+    // and 1 node into in_maintenance.
+    nodeManager.setNodeOperationalState(dns.get(1),
+        HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, maintenanceEnd);
+    nodeManager.setNodeOperationalState(dns.get(2),
+        HddsProtos.NodeOperationalState.DECOMMISSIONING, 0);
+    nodeManager.setNodeOperationalState(dns.get(3),
+        HddsProtos.NodeOperationalState.IN_MAINTENANCE, maintenanceEnd);
+
+    // trackedNodes should be empty now.
+    assertEquals(decom.getMonitor().getTrackedNodes().size(), 0);
+
+    // all nodes with decommissioning, entering_maintenance and in_maintenance
+    // should be added to trackedNodes
+    decom.onBecomeLeader();
+    decom.getMonitor().run();
+
+    // so size of trackedNodes will be 3.
+    assertEquals(decom.getMonitor().getTrackedNodes().size(), 3);
+  }
+
   private SCMNodeManager createNodeManager(OzoneConfiguration config)
       throws IOException, AuthenticationException {
     scm = TestUtils.getScm(config);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index a0bac58..7c85eba 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -279,12 +279,13 @@
   }
 
   /**
-   * Ensure that a change to the operationalState of a node fires a datanode
-   * event of type SetNodeOperationalStateCommand.
+   * For leader SCM, ensure that a change to the operationalState of a node
+   * fires a SCMCommand of type SetNodeOperationalStateCommand.
+   *
+   * For follower SCM, no SetNodeOperationalStateCommand should be fired, yet
+   * operationalState of the node will be updated according to the heartbeat.
    */
   @Test
-  @Ignore // TODO - this test is no longer valid as the heartbeat processing
-          //        now generates the command message.
   public void testSetNodeOpStateAndCommandFired()
       throws IOException, NodeNotFoundException, AuthenticationException {
     final int interval = 100;
@@ -299,11 +300,27 @@
       long expiry = System.currentTimeMillis() / 1000 + 1000;
       nodeManager.setNodeOperationalState(dn,
           HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, expiry);
-      List<SCMCommand> commands = nodeManager.getCommandQueue(dn.getUuid());
+
+      // If found mismatch, leader SCM fires a SetNodeOperationalStateCommand
+      // to update the opState persisted in Datanode.
+      scm.getScmContext().updateLeaderAndTerm(true, 1);
+      List<SCMCommand> commands = nodeManager.processHeartbeat(dn);
 
       Assert.assertTrue(commands.get(0).getClass().equals(
           SetNodeOperationalStateCommand.class));
       assertEquals(1, commands.size());
+
+      // If found mismatch, follower SCM update its own opState according
+      // to the heartbeat, and no SCMCommand will be fired.
+      scm.getScmContext().updateLeaderAndTerm(false, 2);
+      commands = nodeManager.processHeartbeat(dn);
+
+      assertEquals(0, commands.size());
+
+      NodeStatus scmStatus = nodeManager.getNodeStatus(dn);
+      assertTrue(scmStatus.getOperationalState() == dn.getPersistedOpState()
+          && scmStatus.getOpStateExpiryEpochSeconds()
+          == dn.getPersistedOpStateExpiryEpochSec());
     }
   }