HDDS-5090. make Decommission work under SCM HA. (#2148)
Co-authored-by: Doroszlai, Attila <adoroszlai@apache.org>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index 90ace48..29dcf75 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -224,6 +224,8 @@
Preconditions.checkArgument(
deletedBlockLog instanceof DeletedBlockLogImplV2);
((DeletedBlockLogImplV2) deletedBlockLog).onBecomeLeader();
+
+ scm.getScmDecommissionManager().onBecomeLeader();
}
@Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
index e56eb84..33c9697 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -52,6 +53,7 @@
private NodeManager nodeManager;
//private ContainerManagerV2 containerManager;
+ private SCMContext scmContext;
private EventPublisher eventQueue;
private ReplicationManager replicationManager;
private OzoneConfiguration conf;
@@ -171,11 +173,12 @@
}
public NodeDecommissionManager(OzoneConfiguration config, NodeManager nm,
- ContainerManagerV2 containerManager,
+ ContainerManagerV2 containerManager, SCMContext scmContext,
EventPublisher eventQueue, ReplicationManager rm) {
this.nodeManager = nm;
conf = config;
//this.containerManager = containerManager;
+ this.scmContext = scmContext;
this.eventQueue = eventQueue;
this.replicationManager = rm;
@@ -248,10 +251,15 @@
*/
public synchronized void continueAdminForNode(DatanodeDetails dn)
throws NodeNotFoundException {
+ if (!scmContext.isLeader()) {
+ LOG.info("follower SCM ignored continue admin for datanode {}", dn);
+ return;
+ }
NodeOperationalState opState = getNodeStatus(dn).getOperationalState();
if (opState == NodeOperationalState.DECOMMISSIONING
|| opState == NodeOperationalState.ENTERING_MAINTENANCE
|| opState == NodeOperationalState.IN_MAINTENANCE) {
+ LOG.info("Continue admin for datanode {}", dn);
monitor.startMonitoring(dn);
}
}
@@ -375,4 +383,20 @@
return nodeManager.getNodeStatus(dn);
}
+ /**
+ * Called in SCMStateMachine#notifyLeaderChanged when current SCM becomes
+ * leader; runs continueAdminForNode for every node from the NodeManager.
+ */
+ public void onBecomeLeader() {
+ nodeManager.getAllNodes().forEach(datanodeDetails -> {
+ try {
+ continueAdminForNode(datanodeDetails);
+ } catch (NodeNotFoundException e) {
+ // Not expected, as the node was just returned by getAllNodes(). Log it
+ // and carry on with the remaining nodes.
+ LOG.warn("NodeNotFound when adding the node to the decommissionManager",
+ e);
+ }
+ });
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 6c02bdd..73a4209 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -413,13 +413,19 @@
}
/**
- * If the operational state or expiry reported in the datanode heartbeat do
- * not match those store in SCM, queue a command to update the state persisted
- * on the datanode. Additionally, ensure the datanodeDetails stored in SCM
- * match those reported in the heartbeat.
- * This method should only be called when processing the
- * heartbeat, and for a registered node, the information stored in SCM is the
- * source of truth.
+ * This method should only be called when processing the heartbeat.
+ *
+ * On leader SCM, for a registered node, the information stored in SCM is
+ * the source of truth. If the operational state or expiry reported in the
+ * datanode heartbeat do not match those stored in SCM, queue a command to
+ * update the state persisted on the datanode. Additionally, ensure the
+ * datanodeDetails stored in SCM match those reported in the heartbeat.
+ *
+ * On follower SCM, the datanode notifies the follower SCM of its latest
+ * operational state or expiry via heartbeat. If the operational state or
+ * expiry reported in the datanode heartbeat do not match those stored in
+ * SCM, just update the state in follower SCM accordingly.
+ *
* @param reportedDn The DatanodeDetails taken from the node heartbeat.
* @throws NodeNotFoundException
*/
@@ -427,24 +433,36 @@
throws NodeNotFoundException {
NodeStatus scmStatus = getNodeStatus(reportedDn);
if (opStateDiffers(reportedDn, scmStatus)) {
- LOG.info("Scheduling a command to update the operationalState " +
- "persisted on {} as the reported value does not " +
- "match the value stored in SCM ({}, {})",
- reportedDn,
- scmStatus.getOperationalState(),
- scmStatus.getOpStateExpiryEpochSeconds());
-
- try {
- SCMCommand<?> command = new SetNodeOperationalStateCommand(
- Time.monotonicNow(),
+ if (scmContext.isLeader()) {
+ LOG.info("Scheduling a command to update the operationalState " +
+ "persisted on {} as the reported value does not " +
+ "match the value stored in SCM ({}, {})",
+ reportedDn,
scmStatus.getOperationalState(),
scmStatus.getOpStateExpiryEpochSeconds());
- command.setTerm(scmContext.getTermOfLeader());
- addDatanodeCommand(reportedDn.getUuid(), command);
- } catch (NotLeaderException nle) {
- LOG.warn("Skip sending SetNodeOperationalStateCommand,"
- + " since current SCM is not leader.", nle);
- return;
+
+ try {
+ SCMCommand<?> command = new SetNodeOperationalStateCommand(
+ Time.monotonicNow(),
+ scmStatus.getOperationalState(),
+ scmStatus.getOpStateExpiryEpochSeconds());
+ command.setTerm(scmContext.getTermOfLeader());
+ addDatanodeCommand(reportedDn.getUuid(), command);
+ } catch (NotLeaderException nle) {
+ LOG.warn("Skip sending SetNodeOperationalStateCommand,"
+ + " since current SCM is not leader.", nle);
+ return;
+ }
+ } else {
+ LOG.info("Update the operationalState saved in follower SCM " +
+ "for {} as the reported value does not " +
+ "match the value stored in SCM ({}, {})",
+ reportedDn,
+ scmStatus.getOperationalState(),
+ scmStatus.getOpStateExpiryEpochSeconds());
+
+ setNodeOperationalState(reportedDn, reportedDn.getPersistedOpState(),
+ reportedDn.getPersistedOpStateExpiryEpochSec());
}
}
DatanodeDetails scmDnd = nodeStateManager.getNode(reportedDn);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 837096b..98a71ce 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -565,7 +565,7 @@
pipelineManager, eventQueue, serviceManager, scmContext);
}
scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager,
- containerManager, eventQueue, replicationManager);
+ containerManager, scmContext, eventQueue, replicationManager);
}
/**
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
index 847e03e..665c3f7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeDecommissionManager.java
@@ -24,8 +24,10 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.DatanodeAdminError;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
@@ -58,8 +60,8 @@
TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
nodeManager = createNodeManager(conf);
- decom = new NodeDecommissionManager(
- conf, nodeManager, null, null, null);
+ decom = new NodeDecommissionManager(conf, nodeManager, null,
+ SCMContext.emptyContext(), new EventQueue(), null);
}
@Test
@@ -250,6 +252,34 @@
nodeManager.getNodeStatus(dns.get(2)).getOperationalState());
}
+ @Test
+ public void testNodeDecommissionManagerOnBecomeLeader() throws Exception {
+ List<DatanodeDetails> dns = generateDatanodes();
+
+ long maintenanceEnd =
+ (System.currentTimeMillis() / 1000L) + (100 * 60L * 60L);
+
+ // Set the states directly on the NodeManager: 1 node entering_maintenance,
+ // 1 node decommissioning and 1 node in_maintenance.
+ nodeManager.setNodeOperationalState(dns.get(1),
+ HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, maintenanceEnd);
+ nodeManager.setNodeOperationalState(dns.get(2),
+ HddsProtos.NodeOperationalState.DECOMMISSIONING, 0);
+ nodeManager.setNodeOperationalState(dns.get(3),
+ HddsProtos.NodeOperationalState.IN_MAINTENANCE, maintenanceEnd);
+
+ // Nothing was handed to the monitor yet, so trackedNodes must be empty.
+ assertEquals(0, decom.getMonitor().getTrackedNodes().size());
+
+ // onBecomeLeader should hand all three nodes to the monitor; run() is
+ // invoked so that (presumably) pending nodes move into trackedNodes.
+ decom.onBecomeLeader();
+ decom.getMonitor().run();
+
+ // All three nodes in an admin state are now tracked.
+ assertEquals(3, decom.getMonitor().getTrackedNodes().size());
+ }
+
private SCMNodeManager createNodeManager(OzoneConfiguration config)
throws IOException, AuthenticationException {
scm = TestUtils.getScm(config);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index a0bac58..7c85eba 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -279,12 +279,13 @@
}
/**
- * Ensure that a change to the operationalState of a node fires a datanode
- * event of type SetNodeOperationalStateCommand.
+ * For leader SCM, ensure that a change to the operationalState of a node
+ * fires a SCMCommand of type SetNodeOperationalStateCommand.
+ *
+ * For follower SCM, no SetNodeOperationalStateCommand should be fired, yet
+ * operationalState of the node will be updated according to the heartbeat.
*/
@Test
- @Ignore // TODO - this test is no longer valid as the heartbeat processing
- // now generates the command message.
public void testSetNodeOpStateAndCommandFired()
throws IOException, NodeNotFoundException, AuthenticationException {
final int interval = 100;
@@ -299,11 +300,27 @@
long expiry = System.currentTimeMillis() / 1000 + 1000;
nodeManager.setNodeOperationalState(dn,
HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE, expiry);
- List<SCMCommand> commands = nodeManager.getCommandQueue(dn.getUuid());
+
+ // On a mismatch, leader SCM fires a SetNodeOperationalStateCommand
+ // to update the opState persisted in Datanode.
+ scm.getScmContext().updateLeaderAndTerm(true, 1);
+ List<SCMCommand> commands = nodeManager.processHeartbeat(dn);
Assert.assertTrue(commands.get(0).getClass().equals(
SetNodeOperationalStateCommand.class));
assertEquals(1, commands.size());
+
+ // On a mismatch, follower SCM updates its own opState according
+ // to the heartbeat, and no SCMCommand will be fired.
+ scm.getScmContext().updateLeaderAndTerm(false, 2);
+ commands = nodeManager.processHeartbeat(dn);
+
+ assertEquals(0, commands.size());
+
+ NodeStatus scmStatus = nodeManager.getNodeStatus(dn);
+ assertTrue(scmStatus.getOperationalState() == dn.getPersistedOpState()
+ && scmStatus.getOpStateExpiryEpochSeconds()
+ == dn.getPersistedOpStateExpiryEpochSec());
}
}