HDDS-5697. SCMContainerPlacementRackAware should both affinity nodes before fallback (#2597)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java
index 8ea2eae..10184ae 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java
@@ -183,7 +183,8 @@
    * @return the chosen node
    */
   Node chooseRandom(String scope, List<String>  excludedScopes,
-      Collection<Node> excludedNodes, Node affinityNode, int ancestorGen);
+      Collection<? extends Node> excludedNodes, Node affinityNode,
+      int ancestorGen);
 
   /**
    * Choose the node at index <i>index</i> from <i>scope</i>, share the same
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
index 3d3bdbf..43765a6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
@@ -421,7 +421,8 @@
    */
   @Override
   public Node chooseRandom(String scope, List<String> excludedScopes,
-      Collection<Node> excludedNodes, Node affinityNode, int ancestorGen) {
+      Collection<? extends Node> excludedNodes, Node affinityNode,
+      int ancestorGen) {
     if (scope == null) {
       scope = ROOT;
     }
@@ -511,7 +512,7 @@
   }
 
   private Node chooseNodeInternal(String scope, int leafIndex,
-      List<String> excludedScopes, Collection<Node> excludedNodes,
+      List<String> excludedScopes, Collection<? extends Node> excludedNodes,
       Node affinityNode, int ancestorGen) {
     Preconditions.checkArgument(scope != null);
     if (LOG.isDebugEnabled()) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
index 615f7f4..d46713b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
@@ -33,6 +33,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Container placement policy that choose datanodes with network topology
@@ -120,14 +121,14 @@
     int favoredNodeNum = mutableFavoredNodes == null? 0 :
         mutableFavoredNodes.size();
 
-    List<Node> chosenNodes = new ArrayList<>();
+    List<DatanodeDetails> chosenNodes = new ArrayList<>();
     int favorIndex = 0;
     if (excludedNodes == null || excludedNodes.isEmpty()) {
       // choose all nodes for a new pipeline case
       // choose first datanode from scope ROOT or from favoredNodes if not null
-      Node favoredNode = favoredNodeNum > favorIndex ?
+      DatanodeDetails favoredNode = favoredNodeNum > favorIndex ?
           mutableFavoredNodes.get(favorIndex) : null;
-      Node firstNode;
+      DatanodeDetails firstNode;
       if (favoredNode != null) {
         firstNode = favoredNode;
         favorIndex++;
@@ -144,14 +145,14 @@
       // choose second datanode on the same rack as first one
       favoredNode = favoredNodeNum > favorIndex ?
           mutableFavoredNodes.get(favorIndex) : null;
-      Node secondNode;
+      DatanodeDetails secondNode;
       if (favoredNode != null &&
           networkTopology.isSameParent(firstNode, favoredNode)) {
         secondNode = favoredNode;
         favorIndex++;
       } else {
-        secondNode = chooseNode(chosenNodes, firstNode, metadataSizeRequired,
-            dataSizeRequired);
+        secondNode = chooseNode(chosenNodes, Arrays.asList(firstNode),
+            metadataSizeRequired, dataSizeRequired);
       }
       chosenNodes.add(secondNode);
       nodesRequired--;
@@ -163,22 +164,22 @@
       return chooseNodes(null, chosenNodes, mutableFavoredNodes,
           favorIndex, nodesRequired, metadataSizeRequired, dataSizeRequired);
     } else {
-      List<Node> mutableExcludedNodes = new ArrayList<>();
+      List<DatanodeDetails> mutableExcludedNodes = new ArrayList<>();
       mutableExcludedNodes.addAll(excludedNodes);
       // choose node to meet replication requirement
       // case 1: one excluded node, choose one on the same rack as the excluded
       // node, choose others on different racks.
-      Node favoredNode;
+      DatanodeDetails favoredNode;
       if (excludedNodes.size() == 1) {
         favoredNode = favoredNodeNum > favorIndex ?
             mutableFavoredNodes.get(favorIndex) : null;
-        Node firstNode;
+        DatanodeDetails firstNode;
         if (favoredNode != null &&
             networkTopology.isSameParent(excludedNodes.get(0), favoredNode)) {
           firstNode = favoredNode;
           favorIndex++;
         } else {
-          firstNode = chooseNode(mutableExcludedNodes, excludedNodes.get(0),
+          firstNode = chooseNode(mutableExcludedNodes, excludedNodes,
               metadataSizeRequired, dataSizeRequired);
         }
         chosenNodes.add(firstNode);
@@ -208,14 +209,14 @@
       // choose one data on the same rack with one excluded node
       favoredNode = favoredNodeNum > favorIndex ?
           mutableFavoredNodes.get(favorIndex) : null;
-      Node secondNode;
+      DatanodeDetails secondNode;
       if (favoredNode != null && networkTopology.isSameParent(
           mutableExcludedNodes.get(0), favoredNode)) {
         secondNode = favoredNode;
         favorIndex++;
       } else {
         secondNode =
-            chooseNode(chosenNodes, mutableExcludedNodes.get(0),
+            chooseNode(chosenNodes, mutableExcludedNodes,
                 metadataSizeRequired, dataSizeRequired);
       }
       chosenNodes.add(secondNode);
@@ -242,34 +243,49 @@
    *
    *
    * @param excludedNodes - list of the datanodes to excluded. Can be null.
-   * @param affinityNode - the chosen nodes should be on the same rack as
-   *                    affinityNode. Can be null.
+   * @param affinityNodes - the chosen nodes should be on the same rack as
+   *                    affinityNodes. Can be null.
    * @param dataSizeRequired - size required for the container.
    * @param metadataSizeRequired - size required for Ratis metadata.
    * @return List of chosen datanodes.
    * @throws SCMException  SCMException
    */
-  private Node chooseNode(List<Node> excludedNodes, Node affinityNode,
-      long metadataSizeRequired, long dataSizeRequired) throws SCMException {
+  private DatanodeDetails chooseNode(List<DatanodeDetails> excludedNodes,
+      List<DatanodeDetails> affinityNodes, long metadataSizeRequired,
+      long dataSizeRequired) throws SCMException {
     int ancestorGen = RACK_LEVEL;
     int maxRetry = MAX_RETRY;
     List<String> excludedNodesForCapacity = null;
     boolean isFallbacked = false;
     while(true) {
       metrics.incrDatanodeChooseAttemptCount();
-      Node node = networkTopology.chooseRandom(NetConstants.ROOT,
-          excludedNodesForCapacity, excludedNodes, affinityNode, ancestorGen);
+      DatanodeDetails node = null;
+      if (affinityNodes != null) {
+        for (Node affinityNode : affinityNodes) {
+          node = (DatanodeDetails)networkTopology.chooseRandom(
+              NetConstants.ROOT, excludedNodesForCapacity, excludedNodes,
+              affinityNode, ancestorGen);
+          if (node != null) {
+            break;
+          }
+        }
+      } else {
+        node = (DatanodeDetails)networkTopology.chooseRandom(NetConstants.ROOT,
+            excludedNodesForCapacity, excludedNodes, null, ancestorGen);
+      }
+
       if (node == null) {
         // cannot find the node which meets all constrains
         LOG.warn("Failed to find the datanode for container. excludedNodes:" +
             (excludedNodes == null ? "" : excludedNodes.toString()) +
             ", affinityNode:" +
-            (affinityNode == null ? "" : affinityNode.getNetworkFullPath()));
+            (affinityNodes == null ? "" : affinityNodes.stream()
+                .map(Object::toString).collect(Collectors.joining(", "))));
         if (fallback) {
           isFallbacked = true;
           // fallback, don't consider the affinity node
-          if (affinityNode != null) {
-            affinityNode = null;
+          if (affinityNodes != null) {
+            affinityNodes = null;
             continue;
           }
           // fallback, don't consider cross rack
@@ -283,9 +299,7 @@
             " excludedNodes and affinityNode constrains.", null);
       }
 
-      DatanodeDetails datanodeDetails = (DatanodeDetails)node;
-      if (isValidNode(datanodeDetails, metadataSizeRequired,
-          dataSizeRequired)) {
+      if (isValidNode(node, metadataSizeRequired, dataSizeRequired)) {
         metrics.incrDatanodeChooseSuccessCount();
         if (isFallbacked) {
           metrics.incrDatanodeChooseFallbackCount();
@@ -327,18 +341,18 @@
    * @return List of chosen datanodes.
    * @throws SCMException  SCMException
    */
-  private List<DatanodeDetails> chooseNodes(List<Node> excludedNodes,
-      List<Node> chosenNodes, List<DatanodeDetails> favoredNodes,
+  private List<DatanodeDetails> chooseNodes(List<DatanodeDetails> excludedNodes,
+      List<DatanodeDetails> chosenNodes, List<DatanodeDetails> favoredNodes,
       int favorIndex, int nodesRequired, long metadataSizeRequired,
       long dataSizeRequired) throws SCMException {
     Preconditions.checkArgument(chosenNodes != null);
-    List<Node> excludedNodeList = excludedNodes != null ?
+    List<DatanodeDetails> excludedNodeList = excludedNodes != null ?
         excludedNodes : chosenNodes;
     int favoredNodeNum = favoredNodes == null? 0 : favoredNodes.size();
     while(true) {
-      Node favoredNode = favoredNodeNum > favorIndex ?
+      DatanodeDetails favoredNode = favoredNodeNum > favorIndex ?
           favoredNodes.get(favorIndex) : null;
-      Node chosenNode;
+      DatanodeDetails chosenNode;
       if (favoredNode != null && networkTopology.isSameParent(
           excludedNodeList.get(excludedNodeList.size() - 1), favoredNode)) {
         chosenNode = favoredNode;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
index dd004f8..eba7703 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
@@ -283,6 +283,21 @@
   }
 
   @Test
+  public void testSingleNodeRack() throws SCMException {
+    // make sure there is a single node rack
+    assumeTrue(datanodeCount % NODE_PER_RACK == 1);
+    List<DatanodeDetails> excludeNodes = new ArrayList<>();
+    excludeNodes.add(datanodes.get(datanodeCount - 1));
+    excludeNodes.add(datanodes.get(0));
+    List<DatanodeDetails> chooseDatanodes =
+        policy.chooseDatanodes(excludeNodes, null, 1, 0, 0);
+    Assert.assertTrue(chooseDatanodes.size() == 1);
+    // the selected node should be on the same rack as the second exclude node
+    Assert.assertTrue(chooseDatanodes.get(0).toString(),
+        cluster.isSameParent(chooseDatanodes.get(0), excludeNodes.get(1)));
+  }
+
+  @Test
   public void testFallback() throws SCMException {
     // 5 replicas. there are only 3 racks. policy with fallback should
     // allocate the 5th datanode though it will break the rack rule(first