YARN-11289. [Federation] Improve NM FederationInterceptor removeAppFromRegistry. (#4836)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 85a14f7..bf18561 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -66,6 +66,7 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.AMRMClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
@@ -771,18 +772,25 @@
if (failedToUnRegister) {
homeResponse.setIsUnregistered(false);
- } else {
+ } else if (checkRequestFinalApplicationStatusSuccess(request)) {
// Clean up UAMs only when the app finishes successfully, so that no more
// attempt will be launched.
this.uamPool.stop();
- if (this.registryClient != null) {
- this.registryClient
- .removeAppFromRegistry(this.attemptId.getApplicationId());
- }
+ removeAppFromRegistry();
}
return homeResponse;
}
+ private boolean checkRequestFinalApplicationStatusSuccess(
+ FinishApplicationMasterRequest request) {
+ if (request != null && request.getFinalApplicationStatus() != null) {
+ if (request.getFinalApplicationStatus().equals(FinalApplicationStatus.SUCCEEDED)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
@Override
public void setNextInterceptor(RequestInterceptor next) {
throw new YarnRuntimeException(
@@ -818,9 +826,21 @@
this.homeHeartbeartHandler.shutdown();
this.homeRMRelayer.shutdown();
+ // Shutdown needs to clean up app
+ removeAppFromRegistry();
+
super.shutdown();
}
+ private void removeAppFromRegistry() {
+ if (this.registryClient != null && this.attemptId != null) {
+ ApplicationId applicationId = this.attemptId.getApplicationId();
+ if (applicationId != null) {
+ this.registryClient.removeAppFromRegistry(applicationId);
+ }
+ }
+ }
+
/**
* Only for unit test cleanup.
*/
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 59bc467..f0659a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -58,6 +58,7 @@
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
@@ -71,6 +72,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
@@ -1119,4 +1121,125 @@
return null;
});
}
+
+ @Test
+ public void testRemoveAppFromRegistryApplicationSuccess()
+ throws IOException, InterruptedException {
+
+ final RegisterApplicationMasterRequest registerReq =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+ registerReq.setHost(Integer.toString(testAppId));
+ registerReq.setRpcPort(testAppId);
+ registerReq.setTrackingUrl("");
+
+ UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+ ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+
+ // Register the application
+ RegisterApplicationMasterRequest registerReq1 =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+ registerReq1.setHost(Integer.toString(testAppId));
+ registerReq1.setRpcPort(0);
+ registerReq1.setTrackingUrl("");
+
+ // Register ApplicationMaster
+ RegisterApplicationMasterResponse registerResponse =
+ interceptor.registerApplicationMaster(registerReq1);
+ Assert.assertNotNull(registerResponse);
+ lastResponseId = 0;
+
+ Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+ // Allocate the first batch of containers, with sc1 active
+ registerSubCluster(SubClusterId.newInstance("SC-1"));
+
+ int numberOfContainers = 3;
+ List<Container> containers =
+ getContainersAndAssert(numberOfContainers, numberOfContainers);
+ Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+ Assert.assertEquals(numberOfContainers, containers.size());
+
+ // Finish the application
+ FinishApplicationMasterRequest finishReq =
+ Records.newRecord(FinishApplicationMasterRequest.class);
+ finishReq.setDiagnostics("");
+ finishReq.setTrackingUrl("");
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+ FinishApplicationMasterResponse finishResp = interceptor.finishApplicationMaster(finishReq);
+ Assert.assertNotNull(finishResp);
+ Assert.assertTrue(finishResp.getIsUnregistered());
+
+ FederationRegistryClient client = interceptor.getRegistryClient();
+ List<String> applications = client.getAllApplications();
+ Assert.assertNotNull(finishResp);
+ Assert.assertEquals(0, applications.size());
+ return null;
+ });
+ }
+
+ @Test
+ public void testRemoveAppFromRegistryApplicationFailed()
+ throws IOException, InterruptedException {
+
+ final RegisterApplicationMasterRequest registerReq =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+
+ registerReq.setHost(Integer.toString(testAppId));
+ registerReq.setRpcPort(testAppId);
+ registerReq.setTrackingUrl("");
+
+ UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+ ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+
+ // Register the application
+ RegisterApplicationMasterRequest registerReq1 =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+ registerReq1.setHost(Integer.toString(testAppId));
+ registerReq1.setRpcPort(0);
+ registerReq1.setTrackingUrl("");
+
+ // Register ApplicationMaster
+ RegisterApplicationMasterResponse registerResponse =
+ interceptor.registerApplicationMaster(registerReq1);
+ Assert.assertNotNull(registerResponse);
+ lastResponseId = 0;
+
+ Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+ // Allocate the first batch of containers, with sc1 active
+ registerSubCluster(SubClusterId.newInstance("SC-1"));
+
+ int numberOfContainers = 3;
+ List<Container> containers =
+ getContainersAndAssert(numberOfContainers, numberOfContainers);
+ Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+ Assert.assertEquals(numberOfContainers, containers.size());
+
+ // Finish the application
+ FinishApplicationMasterRequest finishReq =
+ Records.newRecord(FinishApplicationMasterRequest.class);
+ finishReq.setDiagnostics("");
+ finishReq.setTrackingUrl("");
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
+
+ // Check Registry Applications
+ // At this time, the Application should not be cleaned up because the state is not SUCCESS.
+ FederationRegistryClient client = interceptor.getRegistryClient();
+ List<String> applications = client.getAllApplications();
+ Assert.assertNotNull(applications);
+ Assert.assertEquals(1, applications.size());
+
+ // interceptor cleanupRegistry
+ ApplicationId applicationId = interceptor.getAttemptId().getApplicationId();
+ client.removeAppFromRegistry(applicationId);
+ applications = client.getAllApplications();
+ Assert.assertNotNull(applications);
+ Assert.assertEquals(0, applications.size());
+
+ return null;
+ });
+ }
}