HDFS-16831. [RBF SBN] GetNamenodesForNameserviceId should shuffle Observer NameNodes every time (#5098)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
index 862f851..db1dcdf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
@@ -193,13 +193,53 @@ private void updateNameNodeState(final String nsId,
}
}
+ /**
+ * Try to shuffle the multiple observer namenodes if listObserversFirst is true.
+ * @param inputNameNodes the input FederationNamenodeContext list. If listObserversFirst is true,
+ * all observers will be placed at the front of the collection.
+ * @param listObserversFirst true if we need to shuffle the multiple front observer namenodes.
+ * @return a list of FederationNamenodeContext.
+ * @param <T> a subclass of FederationNamenodeContext.
+ */
+ private <T extends FederationNamenodeContext> List<T> shuffleObserverNN(
+ List<T> inputNameNodes, boolean listObserversFirst) {
+ if (!listObserversFirst) {
+ return inputNameNodes;
+ }
+ // Get Observers first.
+ List<T> observerList = new ArrayList<>();
+ for (T t : inputNameNodes) {
+ if (t.getState() == OBSERVER) {
+ observerList.add(t);
+ } else {
+ // The inputNameNodes are already sorted, so it can break
+ // when the first non-observer is encountered.
+ break;
+ }
+ }
+ // Returns the inputNameNodes if no shuffle is required
+ if (observerList.size() <= 1) {
+ return inputNameNodes;
+ }
+
+ // Shuffle multiple Observers
+ Collections.shuffle(observerList);
+
+ List<T> ret = new ArrayList<>(inputNameNodes.size());
+ ret.addAll(observerList);
+ for (int i = observerList.size(); i < inputNameNodes.size(); i++) {
+ ret.add(inputNameNodes.get(i));
+ }
+ return Collections.unmodifiableList(ret);
+ }
+
@Override
public List<? extends FederationNamenodeContext> getNamenodesForNameserviceId(
final String nsId, boolean listObserversFirst) throws IOException {
List<? extends FederationNamenodeContext> ret = cacheNS.get(Pair.of(nsId, listObserversFirst));
if (ret != null) {
- return ret;
+ return shuffleObserverNN(ret, listObserversFirst);
}
// Not cached, generate the value
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java
index b602a27..05d21b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java
@@ -91,6 +91,98 @@ public void setup() throws IOException, InterruptedException {
}
@Test
+ public void testShuffleObserverNNs() throws Exception {
+ // Add an active entry to the store
+ NamenodeStatusReport activeReport = createNamenodeReport(
+ NAMESERVICES[0], NAMENODES[0], HAServiceState.ACTIVE);
+ assertTrue(namenodeResolver.registerNamenode(activeReport));
+
+ // Add a standby entry to the store
+ NamenodeStatusReport standbyReport = createNamenodeReport(
+ NAMESERVICES[0], NAMENODES[1], HAServiceState.STANDBY);
+ assertTrue(namenodeResolver.registerNamenode(standbyReport));
+
+ // Load cache
+ stateStore.refreshCaches(true);
+
+ // Get namenodes from state store.
+ List<? extends FederationNamenodeContext> withoutObserver =
+ namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
+ assertEquals(2, withoutObserver.size());
+ assertEquals(FederationNamenodeServiceState.ACTIVE, withoutObserver.get(0).getState());
+ assertEquals(FederationNamenodeServiceState.STANDBY, withoutObserver.get(1).getState());
+
+ // Get namenodes from cache.
+ withoutObserver = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
+ assertEquals(2, withoutObserver.size());
+ assertEquals(FederationNamenodeServiceState.ACTIVE, withoutObserver.get(0).getState());
+ assertEquals(FederationNamenodeServiceState.STANDBY, withoutObserver.get(1).getState());
+
+ // Add an observer entry to the store
+ NamenodeStatusReport observerReport1 = createNamenodeReport(
+ NAMESERVICES[0], NAMENODES[2], HAServiceState.OBSERVER);
+ assertTrue(namenodeResolver.registerNamenode(observerReport1));
+
+ // Load cache
+ stateStore.refreshCaches(true);
+
+ // Get namenodes from state store.
+ List<? extends FederationNamenodeContext> observerList =
+ namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
+ assertEquals(3, observerList.size());
+ assertEquals(FederationNamenodeServiceState.OBSERVER, observerList.get(0).getState());
+ assertEquals(FederationNamenodeServiceState.ACTIVE, observerList.get(1).getState());
+ assertEquals(FederationNamenodeServiceState.STANDBY, observerList.get(2).getState());
+
+ // Get namenodes from cache.
+ observerList = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
+ assertEquals(3, observerList.size());
+ assertEquals(FederationNamenodeServiceState.OBSERVER, observerList.get(0).getState());
+ assertEquals(FederationNamenodeServiceState.ACTIVE, observerList.get(1).getState());
+ assertEquals(FederationNamenodeServiceState.STANDBY, observerList.get(2).getState());
+
+ // Add one new observer entry to the store
+ NamenodeStatusReport observerReport2 = createNamenodeReport(
+ NAMESERVICES[0], NAMENODES[3], HAServiceState.OBSERVER);
+ assertTrue(namenodeResolver.registerNamenode(observerReport2));
+
+ // Load cache
+ stateStore.refreshCaches(true);
+
+ // Get namenodes from state store.
+ List<? extends FederationNamenodeContext> observerList2 =
+ namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
+ assertEquals(4, observerList2.size());
+ assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(0).getState());
+ assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(1).getState());
+ assertEquals(FederationNamenodeServiceState.ACTIVE, observerList2.get(2).getState());
+ assertEquals(FederationNamenodeServiceState.STANDBY, observerList2.get(3).getState());
+
+ // Get namenodes from cache.
+ observerList2 = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
+ assertEquals(4, observerList2.size());
+ assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(0).getState());
+ assertEquals(FederationNamenodeServiceState.OBSERVER, observerList2.get(1).getState());
+ assertEquals(FederationNamenodeServiceState.ACTIVE, observerList2.get(2).getState());
+ assertEquals(FederationNamenodeServiceState.STANDBY, observerList2.get(3).getState());
+
+ // Test shuffler
+ List<? extends FederationNamenodeContext> observerList3;
+ boolean hit = false;
+ for (int i = 0; i < 1000; i++) {
+ observerList3 = namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0], true);
+ assertEquals(FederationNamenodeServiceState.OBSERVER, observerList3.get(0).getState());
+ assertEquals(FederationNamenodeServiceState.OBSERVER, observerList3.get(1).getState());
+ if (observerList3.get(0).getNamenodeId().equals(observerList2.get(1).getNamenodeId()) &&
+ observerList3.get(1).getNamenodeId().equals(observerList2.get(0).getNamenodeId())) {
+ hit = true;
+ break;
+ }
+ }
+ assertTrue(hit);
+ }
+
+ @Test
public void testStateStoreDisconnected() throws Exception {
// Add an entry to the store