HDDS-10593. Prefer client read from IN_SERVICE datanodes (#6449)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 52f435d..2b5854c 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -20,7 +20,9 @@
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -42,6 +44,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -384,6 +387,12 @@
}
}
+ boolean allInService = datanodeList.stream()
+ .allMatch(dn -> dn.getPersistedOpState() == NodeOperationalState.IN_SERVICE);
+ if (!allInService) {
+ datanodeList = sortDatanodeByOperationalState(datanodeList);
+ }
+
for (DatanodeDetails dn : datanodeList) {
try {
if (LOG.isDebugEnabled()) {
@@ -447,6 +456,30 @@
}
}
+  /**
+   * Returns a copy of {@code datanodeList} in which every IN_SERVICE
+   * datanode precedes all datanodes in any other operational state.
+   * {@link java.util.List#sort} is guaranteed stable, so the relative
+   * order of the datanodes inside each group (e.g. the topology-aware
+   * ordering of the IN_SERVICE nodes) is preserved.
+   *
+   * @param datanodeList the pipeline's datanodes in preferred read order
+   * @return a new list with all IN_SERVICE datanodes first
+   */
+  private static List<DatanodeDetails> sortDatanodeByOperationalState(
+      List<DatanodeDetails> datanodeList) {
+    List<DatanodeDetails> sortedDatanodeList = new ArrayList<>(datanodeList);
+    // Sort key is "not IN_SERVICE": false orders before true, so
+    // IN_SERVICE datanodes come first; stability keeps the incoming
+    // order within each group.
+    sortedDatanodeList.sort(Comparator.comparing((DatanodeDetails dn) ->
+        dn.getPersistedOpState() != NodeOperationalState.IN_SERVICE));
+    return sortedDatanodeList;
+  }
+
@Override
public XceiverClientReply sendCommandAsync(
ContainerCommandRequestProto request)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java
index 79c937c..99095f5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientGrpc.java
@@ -20,13 +20,17 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
@@ -175,6 +179,39 @@
}
@Test
+  public void testPrimaryReadFromNormalDatanode()
+      throws IOException {
+    for (int i = 0; i < 100; i++) {
+      // Collect the datanodes contacted during THIS iteration only, so the
+      // "IN_SERVICE is read first" assertion is verified for every random
+      // pipeline, not just the first one.
+      final List<DatanodeDetails> seenDNs = new ArrayList<>();
+      Pipeline randomPipeline = MockPipeline.createRatisPipeline();
+      int nodeCount = randomPipeline.getNodes().size();
+      assertThat(nodeCount).isGreaterThan(1);
+      randomPipeline.getNodes().forEach(
+          node -> assertEquals(NodeOperationalState.IN_SERVICE, node.getPersistedOpState()));
+
+      // Randomly move up to two nodes out of IN_SERVICE (the two picks may
+      // hit the same node); at least one node always stays IN_SERVICE.
+      randomPipeline.getNodes().get(
+          RandomUtils.nextInt(0, nodeCount)).setPersistedOpState(NodeOperationalState.IN_MAINTENANCE);
+      randomPipeline.getNodes().get(
+          RandomUtils.nextInt(0, nodeCount)).setPersistedOpState(NodeOperationalState.IN_MAINTENANCE);
+      try (XceiverClientGrpc client = new XceiverClientGrpc(randomPipeline, conf) {
+        @Override
+        public XceiverClientReply sendCommandAsync(
+            ContainerProtos.ContainerCommandRequestProto request,
+            DatanodeDetails dn) {
+          seenDNs.add(dn);
+          return buildValidResponse();
+        }
+      }) {
+        // Let any IOException propagate and fail the test instead of
+        // swallowing it with printStackTrace().
+        invokeXceiverClientGetBlock(client);
+      }
+      // Always the IN_SERVICE datanode must be read first.
+      assertEquals(NodeOperationalState.IN_SERVICE, seenDNs.get(0).getPersistedOpState());
+    }
+  }
+
+ @Test
public void testConnectionReusedAfterGetBlock() throws IOException {
// With a new Client, make 100 calls. On each call, ensure that only one
// DN is seen, indicating the same DN connection is reused.