HDDS-7069. EC: ReplicationManager - Track nodes already used when handing under replication (#3640)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 8be80f2..be09e45 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -112,6 +112,10 @@
           container.getContainerID(), currentUnderRepRes);
       return emptyMap();
     }
+    List<DatanodeDetails> excludedNodes = replicas.stream()
+        .map(ContainerReplica::getDatanodeDetails)
+        .collect(Collectors.toList());
+
     ContainerHealthResult.UnderReplicatedHealthResult containerHealthResult =
         ((ContainerHealthResult.UnderReplicatedHealthResult)
             currentUnderRepRes);
@@ -154,8 +158,9 @@
                 " The missing indexes are {}", id, missingIndexes);
         // We have source nodes.
         if (sources.size() >= repConfig.getData()) {
-          final List<DatanodeDetails> selectedDatanodes =
-              getTargetDatanodes(replicas, container, missingIndexes.size());
+          final List<DatanodeDetails> selectedDatanodes = getTargetDatanodes(
+              excludedNodes, container, missingIndexes.size());
+          excludedNodes.addAll(selectedDatanodes);
 
           List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex>
               sourceDatanodesWithIndex = new ArrayList<>();
@@ -186,15 +191,17 @@
       Set<Integer> decomIndexes = replicaCount.decommissioningOnlyIndexes(true);
       if (decomIndexes.size() > 0) {
         final List<DatanodeDetails> selectedDatanodes =
-            getTargetDatanodes(replicas, container, decomIndexes.size());
+            getTargetDatanodes(excludedNodes, container, decomIndexes.size());
+        excludedNodes.addAll(selectedDatanodes);
         Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
         // In this case we need to do one to one copy.
         for (ContainerReplica replica : replicas) {
           if (decomIndexes.contains(replica.getReplicaIndex())) {
             if (!iterator.hasNext()) {
-              LOG.warn("Couldn't find the enough targets. Available source"
-                  + " nodes: {}, the target nodes: {} and the decommission"
-                  + " indexes: {}", replicas, selectedDatanodes, decomIndexes);
+              LOG.warn("Couldn't find enough targets. Available source"
+                  + " nodes: {}, the target nodes: {}, excluded nodes: {} and"
+                  + "  the decommission indexes: {}",
+                  replicas, selectedDatanodes, excludedNodes, decomIndexes);
               break;
             }
             DatanodeDetails decommissioningSrcNode
@@ -216,23 +223,24 @@
           id, ex);
       throw ex;
     }
+    if (commands.size() == 0) {
+      LOG.warn("Container {} is under replicated, but no commands were " +
+          "created to correct it", id);
+    }
     return commands;
   }
 
   private List<DatanodeDetails> getTargetDatanodes(
-      Set<ContainerReplica> replicas, ContainerInfo container,
+      List<DatanodeDetails> excludedNodes, ContainerInfo container,
       int requiredNodes) throws IOException {
     // We should ensure that the target datanode has enough space
     // for a complete container to be created, but since the container
     // size may be changed smaller than origin, we should be defensive.
     final long dataSizeRequired =
         Math.max(container.getUsedBytes(), currentContainerSize);
-    final List<DatanodeDetails> excludeList =
-        replicas.stream().map(ContainerReplica::getDatanodeDetails)
-            .collect(Collectors.toList());
-
     return containerPlacement
-        .chooseDatanodes(excludeList, null, requiredNodes, 0, dataSizeRequired);
+        .chooseDatanodes(excludedNodes, null, requiredNodes, 0,
+            dataSizeRequired);
   }
 
   private static byte[] int2byte(List<Integer> src) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
index 2be5cc6..2b3883c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -166,6 +166,35 @@
     };
   }
 
+  public static PlacementPolicy getSameNodeTestPlacementPolicy(
+      final NodeManager nodeManager, final OzoneConfiguration conf,
+      DatanodeDetails nodeToReturn) {
+    return new SCMCommonPlacementPolicy(nodeManager, conf) {
+      @Override
+      protected List<DatanodeDetails> chooseDatanodesInternal(
+          List<DatanodeDetails> excludedNodes,
+          List<DatanodeDetails> favoredNodes, int nodesRequiredToChoose,
+          long metadataSizeRequired, long dataSizeRequired)
+          throws SCMException {
+        if (nodesRequiredToChoose > 1) {
+          throw new IllegalArgumentException("Only one node is allowed");
+        }
+        if (excludedNodes.contains(nodeToReturn)) {
+          throw new SCMException("Insufficient Nodes available to choose",
+              SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES);
+        }
+        List<DatanodeDetails> dns = new ArrayList<>();
+        dns.add(nodeToReturn);
+        return dns;
+      }
+
+      @Override
+      public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) {
+        return null;
+      }
+    };
+  }
+
   public static PlacementPolicy getNoNodesTestPlacementPolicy(
       final NodeManager nodeManager, final OzoneConfiguration conf) {
     return new SCMCommonPlacementPolicy(nodeManager, conf) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index ed09c04..b17f5d6 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -169,6 +170,34 @@
 
   }
 
+  @Test
+  public void testSameNodeIsNotReused() throws IOException {
+    DatanodeDetails newDn = MockDatanodeDetails.randomDatanodeDetails();
+    PlacementPolicy sameNodePolicy = ReplicationTestUtil
+        .getSameNodeTestPlacementPolicy(nodeManager, conf, newDn);
+    // Just have a missing index, this should return OK.
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(Pair.of(IN_SERVICE, 1),
+            Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+            Pair.of(IN_SERVICE, 4));
+    // Passing zero for decommIndexes, as we don't expect the decom command to
+    // get created due to the placement policy returning an already used node
+    testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
+        availableReplicas, 0, sameNodePolicy);
+
+    // Now add a decommissioning index - we should get an exception as the
+    // placement policy should throw. It will have used up the node for
+    // reconstruction, and hence no nodes will be found to fix the decommission
+    // index.
+    Set<ContainerReplica> replicas = ReplicationTestUtil
+        .createReplicas(Pair.of(DECOMMISSIONING, 1),
+            Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+            Pair.of(IN_SERVICE, 4));
+    assertThrows(SCMException.class, () ->
+        testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
+            replicas, 1, sameNodePolicy));
+  }
+
   public Map<DatanodeDetails, SCMCommand<?>>
       testUnderReplicationWithMissingIndexes(
       List<Integer> missingIndexes, Set<ContainerReplica> availableReplicas,