HDFS-17354:Delay invoke clearStaleNamespacesInRouterStateIdContext during router start up (#6498)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 9d7c126..29aa16f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -430,6 +430,9 @@ public RouterRpcServer(Configuration conf, Router router,
* Clear expired namespace in the shared RouterStateIdContext.
*/
private void clearStaleNamespacesInRouterStateIdContext() {
+ if (!router.isRouterState(RouterServiceState.RUNNING)) {
+ return;
+ }
try {
final Set<String> resolvedNamespaces = namenodeResolver.getNamespaces()
.stream()
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
index 5548798..04b9427 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
@@ -336,7 +336,14 @@ public synchronized boolean registerNamenode(NamenodeStatusReport report)
@Override
public synchronized Set<FederationNamespaceInfo> getNamespaces()
throws IOException {
- return Collections.unmodifiableSet(this.namespaces);
+ Set<FederationNamespaceInfo> ret = new TreeSet<>();
+ Set<String> disabled = getDisabledNamespaces();
+ for (FederationNamespaceInfo ns : namespaces) {
+ if (!disabled.contains(ns.getNameserviceId())) {
+ ret.add(ns);
+ }
+ }
+ return Collections.unmodifiableSet(ret);
}
public void clearDisableNamespaces() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index 766a0351..f531e40 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -23,10 +23,12 @@
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.countContents;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createNamenodeReport;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.deleteFile;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileStatus;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.TEST_STRING;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS;
import static org.apache.hadoop.ipc.CallerContext.PROXY_USER_PORT;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.assertj.core.api.Assertions.assertThat;
@@ -72,6 +74,7 @@
import org.apache.hadoop.fs.SafeModeAction;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -115,6 +118,7 @@
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
import org.apache.hadoop.hdfs.server.federation.metrics.RBFMetrics;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -2327,4 +2331,46 @@ public void testGetListingOrder() throws Exception {
fileSystem1.delete(new Path(testPath2), true);
}
}
+
+ @Test
+ public void testClearStaleNamespacesInRouterStateIdContext() throws Exception {
+ Router testRouter = new Router();
+ Configuration routerConfig = DFSRouter.getConfiguration();
+ routerConfig.set(FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, "2000");
+ routerConfig.set(RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE, "false");
+ // Mock resolver classes
+ routerConfig.setClass(RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+ MockResolver.class, ActiveNamenodeResolver.class);
+ routerConfig.setClass(RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+ MockResolver.class, FileSubclusterResolver.class);
+
+ testRouter.init(routerConfig);
+ String nsID1 = cluster.getNameservices().get(0);
+ String nsID2 = cluster.getNameservices().get(1);
+ MockResolver resolver = (MockResolver)testRouter.getNamenodeResolver();
+ resolver.registerNamenode(createNamenodeReport(nsID1, "nn1",
+ HAServiceProtocol.HAServiceState.ACTIVE));
+ resolver.registerNamenode(createNamenodeReport(nsID2, "nn1",
+ HAServiceProtocol.HAServiceState.ACTIVE));
+
+ RouterRpcServer rpcServer = testRouter.getRpcServer();
+
+ rpcServer.getRouterStateIdContext().getNamespaceStateId(nsID1);
+ rpcServer.getRouterStateIdContext().getNamespaceStateId(nsID2);
+
+ resolver.disableNamespace(nsID1);
+ Thread.sleep(3000);
+ RouterStateIdContext context = rpcServer.getRouterStateIdContext();
+ assertEquals(2, context.getNamespaceIdMap().size());
+
+ testRouter.start();
+ Thread.sleep(3000);
+ // wait clear stale namespaces
+ RouterStateIdContext routerStateIdContext = rpcServer.getRouterStateIdContext();
+ int size = routerStateIdContext.getNamespaceIdMap().size();
+ assertEquals(1, size);
+ rpcServer.stop();
+ rpcServer.close();
+ testRouter.close();
+ }
}