HDDS-10593. Prefer client read from IN_SERVICE datanodes (#6449)
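
When a pipeline contains replicas on datanodes that are not IN_SERVICE
(for example, nodes that are decommissioning or entering maintenance),
the client read path in XceiverClientGrpc now stably sorts the datanode
list so that IN_SERVICE datanodes are tried first; the relative order
among the IN_SERVICE datanodes themselves is left unchanged. A new test,
testPrimaryReadFromNormalDatanode, verifies that the first datanode read
from is always an IN_SERVICE one.

A minimal sketch of the ordering idea (not part of the patch; the class
and enum below are made up for illustration), using a stable sort keyed
on "is not IN_SERVICE":

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;

    public class InServiceFirstSketch {
      enum OpState { IN_SERVICE, DECOMMISSIONING, IN_MAINTENANCE }

      public static void main(String[] args) {
        List<OpState> replicas = Arrays.asList(
            OpState.IN_MAINTENANCE, OpState.IN_SERVICE, OpState.DECOMMISSIONING);
        // Stream.sorted is stable: IN_SERVICE entries move to the front,
        // everything else keeps its original relative order.
        List<OpState> ordered = replicas.stream()
            .sorted(Comparator.comparing((OpState s) -> s != OpState.IN_SERVICE))
            .collect(Collectors.toList());
        System.out.println(ordered);
        // prints: [IN_SERVICE, IN_MAINTENANCE, DECOMMISSIONING]
      }
    }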

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 52f435d..2b5854c 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -20,7 +20,9 @@
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,6 +44,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
 import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
 import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -384,6 +387,12 @@
       }
     }
 
+    boolean allInService = datanodeList.stream()
+        .allMatch(dn -> dn.getPersistedOpState() == NodeOperationalState.IN_SERVICE);
+    if (!allInService) {
+      datanodeList = sortDatanodeByOperationalState(datanodeList);
+    }
+
     for (DatanodeDetails dn : datanodeList) {
       try {
         if (LOG.isDebugEnabled()) {
@@ -447,6 +456,30 @@
     }
   }
 
+  private static List<DatanodeDetails> sortDatanodeByOperationalState(
+      List<DatanodeDetails> datanodeList) {
+    List<DatanodeDetails> sortedDatanodeList = new ArrayList<>(datanodeList);
+    // Move IN_SERVICE datanodes ahead of datanodes in any other
+    // operational state. The sort below is stable, so the relative
+    // order of the IN_SERVICE datanodes is preserved.
+    Comparator<DatanodeDetails> byOpStateStable = (first, second) -> {
+      boolean firstInService = first.getPersistedOpState() ==
+          NodeOperationalState.IN_SERVICE;
+      boolean secondInService = second.getPersistedOpState() ==
+          NodeOperationalState.IN_SERVICE;
+
+      if (firstInService == secondInService) {
+        return 0;
+      } else if (firstInService) {
+        return -1;
+      } else {
+        return 1;
+      }
+    };
+    sortedDatanodeList.sort(byOpStateStable);
+    return sortedDatanodeList;
+  }
+
   @Override
   public XceiverClientReply sendCommandAsync(
       ContainerCommandRequestProto request)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java
index 79c937c..99095f5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java
@@ -20,13 +20,17 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
@@ -175,6 +179,39 @@
   }
 
   @Test
+  public void testPrimaryReadFromNormalDatanode()
+      throws IOException {
+    for (int i = 0; i < 100; i++) {
+      final List<DatanodeDetails> seenDNs = new ArrayList<>();
+      Pipeline randomPipeline = MockPipeline.createRatisPipeline();
+      int nodeCount = randomPipeline.getNodes().size();
+      assertThat(nodeCount).isGreaterThan(1);
+      randomPipeline.getNodes().forEach(
+          node -> assertEquals(NodeOperationalState.IN_SERVICE, node.getPersistedOpState()));
+
+      randomPipeline.getNodes().get(
+          RandomUtils.nextInt(0, nodeCount)).setPersistedOpState(NodeOperationalState.IN_MAINTENANCE);
+      randomPipeline.getNodes().get(
+          RandomUtils.nextInt(0, nodeCount)).setPersistedOpState(NodeOperationalState.IN_MAINTENANCE);
+      try (XceiverClientGrpc client = new XceiverClientGrpc(randomPipeline, conf) {
+        @Override
+        public XceiverClientReply sendCommandAsync(
+            ContainerProtos.ContainerCommandRequestProto request,
+            DatanodeDetails dn) {
+          seenDNs.add(dn);
+          return buildValidResponse();
+        }
+      }) {
+        invokeXceiverClientGetBlock(client);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+      // An IN_SERVICE datanode should always be read first.
+      assertEquals(NodeOperationalState.IN_SERVICE, seenDNs.get(0).getPersistedOpState());
+    }
+  }
+
+  @Test
   public void testConnectionReusedAfterGetBlock() throws IOException {
     // With a new Client, make 100 calls. On each call, ensure that only one
     // DN is seen, indicating the same DN connection is reused.