HDDS-6944. EC: Handle reconstructECContainersCommand in heartbeat (#3548)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index d0bc18c..45838b5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -55,9 +55,11 @@
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
import org.apache.hadoop.ozone.protocol.commands.RefreshVolumeUsageCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.commands.SetNodeOperationalStateCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -309,141 +311,109 @@
for (SCMCommandProto commandResponseProto : response.getCommandsList()) {
switch (commandResponseProto.getCommandType()) {
case reregisterCommand:
- if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received SCM notification to register."
- + " Interrupt HEARTBEAT and transit to REGISTER state.");
- }
- rpcEndpoint.setState(EndPointStates.REGISTER);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Illegal state {} found, expecting {}.",
- rpcEndpoint.getState().name(), EndPointStates.HEARTBEAT);
- }
- }
+ processReregisterCommand();
break;
case deleteBlocksCommand:
DeleteBlocksCommand deleteBlocksCommand = DeleteBlocksCommand
.getFromProtobuf(
commandResponseProto.getDeleteBlocksCommandProto());
- if (commandResponseProto.hasTerm()) {
- deleteBlocksCommand.setTerm(commandResponseProto.getTerm());
- }
if (!deleteBlocksCommand.blocksTobeDeleted().isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug(DeletedContainerBlocksSummary
.getFrom(deleteBlocksCommand.blocksTobeDeleted())
.toString());
}
- this.context.addCommand(deleteBlocksCommand);
+ processCommonCommand(commandResponseProto, deleteBlocksCommand);
}
break;
case closeContainerCommand:
CloseContainerCommand closeContainer =
CloseContainerCommand.getFromProtobuf(
commandResponseProto.getCloseContainerCommandProto());
- if (commandResponseProto.hasTerm()) {
- closeContainer.setTerm(commandResponseProto.getTerm());
- }
- if (commandResponseProto.hasEncodedToken()) {
- closeContainer.setEncodedToken(
- commandResponseProto.getEncodedToken());
- }
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM container close request for container {}",
closeContainer.getContainerID());
}
- this.context.addCommand(closeContainer);
+ processCommonCommand(commandResponseProto, closeContainer);
break;
case replicateContainerCommand:
ReplicateContainerCommand replicateContainerCommand =
ReplicateContainerCommand.getFromProtobuf(
commandResponseProto.getReplicateContainerCommandProto());
- if (commandResponseProto.hasTerm()) {
- replicateContainerCommand.setTerm(commandResponseProto.getTerm());
- }
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM container replicate request for container {}",
replicateContainerCommand.getContainerID());
}
- this.context.addCommand(replicateContainerCommand);
+ processCommonCommand(commandResponseProto, replicateContainerCommand);
+ break;
+ case reconstructECContainersCommand:
+ ReconstructECContainersCommand reccc =
+ ReconstructECContainersCommand.getFromProtobuf(
+ commandResponseProto.getReconstructECContainersCommandProto());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received SCM reconstruct request for container {}",
+ reccc.getContainerID());
+ }
+ processCommonCommand(commandResponseProto, reccc);
break;
case deleteContainerCommand:
DeleteContainerCommand deleteContainerCommand =
DeleteContainerCommand.getFromProtobuf(
commandResponseProto.getDeleteContainerCommandProto());
- if (commandResponseProto.hasTerm()) {
- deleteContainerCommand.setTerm(commandResponseProto.getTerm());
- }
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM delete container request for container {}",
deleteContainerCommand.getContainerID());
}
- this.context.addCommand(deleteContainerCommand);
+ processCommonCommand(commandResponseProto, deleteContainerCommand);
break;
case createPipelineCommand:
CreatePipelineCommand createPipelineCommand =
CreatePipelineCommand.getFromProtobuf(
commandResponseProto.getCreatePipelineCommandProto());
- if (commandResponseProto.hasTerm()) {
- createPipelineCommand.setTerm(commandResponseProto.getTerm());
- }
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM create pipeline request {}",
createPipelineCommand.getPipelineID());
}
- this.context.addCommand(createPipelineCommand);
+ processCommonCommand(commandResponseProto, createPipelineCommand);
break;
case closePipelineCommand:
ClosePipelineCommand closePipelineCommand =
ClosePipelineCommand.getFromProtobuf(
commandResponseProto.getClosePipelineCommandProto());
- if (commandResponseProto.hasTerm()) {
- closePipelineCommand.setTerm(commandResponseProto.getTerm());
- }
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM close pipeline request {}",
closePipelineCommand.getPipelineID());
}
- this.context.addCommand(closePipelineCommand);
+ processCommonCommand(commandResponseProto, closePipelineCommand);
break;
case setNodeOperationalStateCommand:
SetNodeOperationalStateCommand setNodeOperationalStateCommand =
SetNodeOperationalStateCommand.getFromProtobuf(
commandResponseProto.getSetNodeOperationalStateCommandProto());
- if (commandResponseProto.hasTerm()) {
- setNodeOperationalStateCommand.setTerm(
- commandResponseProto.getTerm());
- }
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM set operational state command. State: {} " +
"Expiry: {}", setNodeOperationalStateCommand.getOpState(),
setNodeOperationalStateCommand.getStateExpiryEpochSeconds());
}
- this.context.addCommand(setNodeOperationalStateCommand);
+ processCommonCommand(commandResponseProto,
+ setNodeOperationalStateCommand);
break;
case finalizeNewLayoutVersionCommand:
FinalizeNewLayoutVersionCommand finalizeNewLayoutVersionCommand =
FinalizeNewLayoutVersionCommand.getFromProtobuf(
commandResponseProto.getFinalizeNewLayoutVersionCommandProto());
- if (commandResponseProto.hasTerm()) {
- finalizeNewLayoutVersionCommand.setTerm(
- commandResponseProto.getTerm());
- }
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM finalize command {}",
finalizeNewLayoutVersionCommand.getId());
}
- this.context.addCommand(finalizeNewLayoutVersionCommand);
+ processCommonCommand(commandResponseProto,
+ finalizeNewLayoutVersionCommand);
break;
case refreshVolumeUsageInfo:
RefreshVolumeUsageCommand refreshVolumeUsageCommand =
RefreshVolumeUsageCommand.getFromProtobuf(
commandResponseProto.getRefreshVolumeUsageCommandProto());
- if (commandResponseProto.hasTerm()) {
- refreshVolumeUsageCommand.setTerm(commandResponseProto.getTerm());
- }
- this.context.addCommand(refreshVolumeUsageCommand);
+ processCommonCommand(commandResponseProto, refreshVolumeUsageCommand);
break;
default:
throw new IllegalArgumentException("Unknown response : "
@@ -453,6 +423,38 @@
}
/**
+ * Common processing for SCM commands.
+ * - set term
+ * - set encoded token
+ * - add to context's queue
+ *
+ * Term and encoded token are each copied onto the command only when the
+ * proto reports the field as present; otherwise the command's existing
+ * value is left untouched. The command is queued in either case.
+ *
+ * @param response one command entry from the response's command list;
+ * source of the optional term and encoded token
+ * @param cmd command object already built from that entry; it is updated
+ * in place and then handed to the context
+ */
+ private void processCommonCommand(
+ SCMCommandProto response, SCMCommand<?> cmd) {
+ if (response.hasTerm()) {
+ cmd.setTerm(response.getTerm());
+ }
+ if (response.hasEncodedToken()) {
+ cmd.setEncodedToken(response.getEncodedToken());
+ }
+ context.addCommand(cmd);
+ }
+
+ /**
+ * Handles a re-register notification from SCM.
+ * If the endpoint is currently in HEARTBEAT state it is switched to
+ * REGISTER. In any other state the endpoint is left unchanged and the
+ * unexpected state is only reported at debug level.
+ */
+ private void processReregisterCommand() {
+ if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received SCM notification to register."
+ + " Interrupt HEARTBEAT and transit to REGISTER state.");
+ }
+ rpcEndpoint.setState(EndPointStates.REGISTER);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Illegal state {} found, expecting {}.",
+ rpcEndpoint.getState().name(), EndPointStates.HEARTBEAT);
+ }
+ }
+ }
+
+ /**
* Builder class for HeartbeatEndpointTask.
*/
public static class Builder {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 2057f58..2daa985 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -18,13 +18,17 @@
package org.apache.hadoop.ozone.container.common.states.endpoint;
+import static java.util.Collections.emptyList;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reconstructECContainersCommand;
import static org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutVersion;
+import static org.mockito.ArgumentMatchers.any;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -42,6 +46,7 @@
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
import org.junit.Assert;
@@ -58,6 +63,43 @@
new InetSocketAddress("test-scm-1", 9861);
@Test
+ public void handlesReconstructContainerCommand() throws Exception {
+ // GIVEN an SCM mock whose heartbeat reply carries exactly one
+ // reconstructECContainersCommand
+ StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+ Mockito.mock(
+ StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+
+ // NOTE(review): the meaning of each constructor argument is not visible
+ // here - confirm against ReconstructECContainersCommand before relying
+ // on these values.
+ ReconstructECContainersCommand cmd = new ReconstructECContainersCommand(
+ 1, emptyList(), emptyList(), new byte[]{2, 5},
+ new ECReplicationConfig(3, 2));
+
+ // The reply echoes the datanode UUID found in the incoming request and
+ // wraps the command's proto in an SCMCommandProto of the matching type.
+ Mockito.when(scm.sendHeartbeat(any()))
+ .thenAnswer(invocation ->
+ SCMHeartbeatResponseProto.newBuilder()
+ .setDatanodeUUID(
+ ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+ .getDatanodeDetails().getUuid())
+ .addCommands(SCMCommandProto.newBuilder()
+ .setCommandType(reconstructECContainersCommand)
+ .setReconstructECContainersCommandProto(cmd.getProto())
+ .build())
+ .build());
+
+ // AND a real StateContext in RUNNING state backed by a mocked
+ // DatanodeStateMachine
+ OzoneConfiguration conf = new OzoneConfiguration();
+ DatanodeStateMachine datanodeStateMachine =
+ Mockito.mock(DatanodeStateMachine.class);
+ StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+ datanodeStateMachine);
+
+ // WHEN
+ HeartbeatEndpointTask task = getHeartbeatEndpointTask(conf, context, scm);
+ task.call();
+
+ // THEN the command queue summary reports one queued command of that type
+ Assert.assertEquals(1, context.getCommandQueueSummary()
+ .get(reconstructECContainersCommand).intValue());
+ }
+
+ @Test
public void testheartbeatWithoutReports() throws Exception {
StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
Mockito.mock(
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 0b5bee8..625a59e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReconstructECContainersCommandProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReregisterCommandProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
@@ -90,6 +91,7 @@
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteBlocksCommand;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.finalizeNewLayoutVersionCommand;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reconstructECContainersCommand;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.refreshVolumeUsageInfo;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.replicateContainerCommand;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reregisterCommand;
@@ -352,6 +354,12 @@
.setReplicateContainerCommandProto(
((ReplicateContainerCommand)cmd).getProto())
.build();
+ case reconstructECContainersCommand:
+ return builder
+ .setCommandType(reconstructECContainersCommand)
+ .setReconstructECContainersCommandProto(
+ (ReconstructECContainersCommandProto) cmd.getProto())
+ .build();
case createPipelineCommand:
return builder
.setCommandType(createPipelineCommand)