HDDS-7069. EC: ReplicationManager - Track nodes already used when handling under replication (#3640)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 8be80f2..be09e45 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -112,6 +112,10 @@
container.getContainerID(), currentUnderRepRes);
return emptyMap();
}
+ List<DatanodeDetails> excludedNodes = replicas.stream() // seed exclusion list with nodes already hosting a replica; grown as targets are chosen below
+ .map(ContainerReplica::getDatanodeDetails)
+ .collect(Collectors.toList());
+
ContainerHealthResult.UnderReplicatedHealthResult containerHealthResult =
((ContainerHealthResult.UnderReplicatedHealthResult)
currentUnderRepRes);
@@ -154,8 +158,9 @@
" The missing indexes are {}", id, missingIndexes);
// We have source nodes.
if (sources.size() >= repConfig.getData()) {
- final List<DatanodeDetails> selectedDatanodes =
- getTargetDatanodes(replicas, container, missingIndexes.size());
+ final List<DatanodeDetails> selectedDatanodes = getTargetDatanodes(
+ excludedNodes, container, missingIndexes.size());
+ excludedNodes.addAll(selectedDatanodes); // reconstruction targets must not be offered again to later steps
List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex>
sourceDatanodesWithIndex = new ArrayList<>();
@@ -186,15 +191,17 @@
Set<Integer> decomIndexes = replicaCount.decommissioningOnlyIndexes(true);
if (decomIndexes.size() > 0) {
final List<DatanodeDetails> selectedDatanodes =
- getTargetDatanodes(replicas, container, decomIndexes.size());
+ getTargetDatanodes(excludedNodes, container, decomIndexes.size());
+ excludedNodes.addAll(selectedDatanodes); // track these too, in case further steps are added after this one
Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
// In this case we need to do one to one copy.
for (ContainerReplica replica : replicas) {
if (decomIndexes.contains(replica.getReplicaIndex())) {
if (!iterator.hasNext()) {
- LOG.warn("Couldn't find the enough targets. Available source"
- + " nodes: {}, the target nodes: {} and the decommission"
- + " indexes: {}", replicas, selectedDatanodes, decomIndexes);
+ LOG.warn("Couldn't find enough targets. Available source"
+ + " nodes: {}, the target nodes: {}, excluded nodes: {} and"
+ + " the decommission indexes: {}",
+ replicas, selectedDatanodes, excludedNodes, decomIndexes);
break;
}
DatanodeDetails decommissioningSrcNode
@@ -216,23 +223,24 @@
id, ex);
throw ex;
}
+ if (commands.isEmpty()) { // under replicated but nothing scheduled - surface it for operators
+ LOG.warn("Container {} is under replicated, but no commands were " +
+ "created to correct it", id);
+ }
return commands;
}
private List<DatanodeDetails> getTargetDatanodes(
- Set<ContainerReplica> replicas, ContainerInfo container,
+ List<DatanodeDetails> excludedNodes, ContainerInfo container,
int requiredNodes) throws IOException {
// We should ensure that the target datanode has enough space
// for a complete container to be created, but since the container
// size may be changed smaller than origin, we should be defensive.
final long dataSizeRequired =
Math.max(container.getUsedBytes(), currentContainerSize);
- final List<DatanodeDetails> excludeList =
- replicas.stream().map(ContainerReplica::getDatanodeDetails)
- .collect(Collectors.toList());
-
return containerPlacement
+ .chooseDatanodes(excludedNodes, null, requiredNodes, 0, // favoredNodes=null, metadataSizeRequired=0
+ dataSizeRequired);
}
private static byte[] int2byte(List<Integer> src) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
index 2be5cc6..2b3883c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -166,6 +166,35 @@
};
}
+ public static PlacementPolicy getSameNodeTestPlacementPolicy(
+ final NodeManager nodeManager, final OzoneConfiguration conf,
+ DatanodeDetails nodeToReturn) {
+ return new SCMCommonPlacementPolicy(nodeManager, conf) { // test stub: always offers the same single node
+ @Override
+ protected List<DatanodeDetails> chooseDatanodesInternal(
+ List<DatanodeDetails> excludedNodes,
+ List<DatanodeDetails> favoredNodes, int nodesRequiredToChoose,
+ long metadataSizeRequired, long dataSizeRequired)
+ throws SCMException {
+ if (nodesRequiredToChoose > 1) { // stub can only satisfy single-node requests
+ throw new IllegalArgumentException("Only one node is allowed");
+ }
+ if (excludedNodes.contains(nodeToReturn)) { // simulates cluster exhaustion once the node is excluded
+ throw new SCMException("Insufficient Nodes available to choose",
+ SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES);
+ }
+ List<DatanodeDetails> dns = new ArrayList<>();
+ dns.add(nodeToReturn);
+ return dns;
+ }
+
+ @Override
+ public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) {
+ return null; // not exercised by these tests
+ }
+ };
+ }
+
public static PlacementPolicy getNoNodesTestPlacementPolicy(
final NodeManager nodeManager, final OzoneConfiguration conf) {
return new SCMCommonPlacementPolicy(nodeManager, conf) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index ed09c04..b17f5d6 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -22,6 +22,7 @@
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -169,6 +170,34 @@
}
+ @Test
+ public void testSameNodeIsNotReused() throws IOException { // regression test: a node used as a reconstruction target must be excluded from later decommission copies
+ DatanodeDetails newDn = MockDatanodeDetails.randomDatanodeDetails();
+ PlacementPolicy sameNodePolicy = ReplicationTestUtil
+ .getSameNodeTestPlacementPolicy(nodeManager, conf, newDn);
+ // Just have a missing index, this should return OK.
+ Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+ .createReplicas(Pair.of(IN_SERVICE, 1),
+ Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+ Pair.of(IN_SERVICE, 4));
+ // Passing zero for decommIndexes, as we don't expect the decom command to
+ // get created due to the placement policy returning an already used node
+ testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
+ availableReplicas, 0, sameNodePolicy);
+
+ // Now add a decommissioning index - we should get an exception as the
+ // placement policy should throw. It will have used up the node for
+ // reconstruction, and hence no nodes will be found to fix the decommission
+ // index.
+ Set<ContainerReplica> replicas = ReplicationTestUtil
+ .createReplicas(Pair.of(DECOMMISSIONING, 1),
+ Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+ Pair.of(IN_SERVICE, 4));
+ assertThrows(SCMException.class, () ->
+ testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
+ replicas, 1, sameNodePolicy));
+ }
+
public Map<DatanodeDetails, SCMCommand<?>>
testUnderReplicationWithMissingIndexes(
List<Integer> missingIndexes, Set<ContainerReplica> availableReplicas,