YARN-11289. [Federation] Improve NM FederationInterceptor removeAppFromRegistry. (#4836)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 85a14f7..bf18561 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -66,6 +66,7 @@
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.client.AMRMClientUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
@@ -771,18 +772,25 @@
 
     if (failedToUnRegister) {
       homeResponse.setIsUnregistered(false);
-    } else {
+    } else if (checkRequestFinalApplicationStatusSuccess(request)) {
       // Clean up UAMs only when the app finishes successfully, so that no more
       // attempt will be launched.
       this.uamPool.stop();
-      if (this.registryClient != null) {
-        this.registryClient
-            .removeAppFromRegistry(this.attemptId.getApplicationId());
-      }
+      removeAppFromRegistry();
     }
     return homeResponse;
   }
 
+  private boolean checkRequestFinalApplicationStatusSuccess(
+      FinishApplicationMasterRequest request) {
+    if (request != null && request.getFinalApplicationStatus() != null) {
+      if (request.getFinalApplicationStatus().equals(FinalApplicationStatus.SUCCEEDED)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   @Override
   public void setNextInterceptor(RequestInterceptor next) {
     throw new YarnRuntimeException(
@@ -818,9 +826,21 @@
     this.homeHeartbeartHandler.shutdown();
     this.homeRMRelayer.shutdown();
 
+    // Shutdown needs to clean up app
+    removeAppFromRegistry();
+
     super.shutdown();
   }
 
+  private void removeAppFromRegistry() {
+    if (this.registryClient != null && this.attemptId != null) {
+      ApplicationId applicationId = this.attemptId.getApplicationId();
+      if (applicationId != null) {
+        this.registryClient.removeAppFromRegistry(applicationId);
+      }
+    }
+  }
+
   /**
    * Only for unit test cleanup.
    */
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 59bc467..f0659a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -58,6 +58,7 @@
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.UpdateContainerError;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
@@ -71,6 +72,7 @@
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
@@ -1119,4 +1121,125 @@
       return null;
     });
   }
+
+  @Test
+  public void testRemoveAppFromRegistryApplicationSuccess()
+      throws IOException, InterruptedException {
+
+    final RegisterApplicationMasterRequest registerReq =
+        Records.newRecord(RegisterApplicationMasterRequest.class);
+    registerReq.setHost(Integer.toString(testAppId));
+    registerReq.setRpcPort(testAppId);
+    registerReq.setTrackingUrl("");
+
+    UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+    ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+
+      // Register the application
+      RegisterApplicationMasterRequest registerReq1 =
+          Records.newRecord(RegisterApplicationMasterRequest.class);
+      registerReq1.setHost(Integer.toString(testAppId));
+      registerReq1.setRpcPort(0);
+      registerReq1.setTrackingUrl("");
+
+      // Register ApplicationMaster
+      RegisterApplicationMasterResponse registerResponse =
+          interceptor.registerApplicationMaster(registerReq1);
+      Assert.assertNotNull(registerResponse);
+      lastResponseId = 0;
+
+      Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+      // Allocate the first batch of containers, with sc1 active
+      registerSubCluster(SubClusterId.newInstance("SC-1"));
+
+      int numberOfContainers = 3;
+      List<Container> containers =
+          getContainersAndAssert(numberOfContainers, numberOfContainers);
+      Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+      Assert.assertEquals(numberOfContainers, containers.size());
+
+      // Finish the application
+      FinishApplicationMasterRequest finishReq =
+          Records.newRecord(FinishApplicationMasterRequest.class);
+      finishReq.setDiagnostics("");
+      finishReq.setTrackingUrl("");
+      finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+      FinishApplicationMasterResponse finishResp = interceptor.finishApplicationMaster(finishReq);
+      Assert.assertNotNull(finishResp);
+      Assert.assertTrue(finishResp.getIsUnregistered());
+
+      FederationRegistryClient client = interceptor.getRegistryClient();
+      List<String> applications = client.getAllApplications();
+      Assert.assertNotNull(finishResp);
+      Assert.assertEquals(0, applications.size());
+      return null;
+    });
+  }
+
+  @Test
+  public void testRemoveAppFromRegistryApplicationFailed()
+      throws IOException, InterruptedException {
+
+    final RegisterApplicationMasterRequest registerReq =
+        Records.newRecord(RegisterApplicationMasterRequest.class);
+
+    registerReq.setHost(Integer.toString(testAppId));
+    registerReq.setRpcPort(testAppId);
+    registerReq.setTrackingUrl("");
+
+    UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+    ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+
+      // Register the application
+      RegisterApplicationMasterRequest registerReq1 =
+          Records.newRecord(RegisterApplicationMasterRequest.class);
+      registerReq1.setHost(Integer.toString(testAppId));
+      registerReq1.setRpcPort(0);
+      registerReq1.setTrackingUrl("");
+
+      // Register ApplicationMaster
+      RegisterApplicationMasterResponse registerResponse =
+          interceptor.registerApplicationMaster(registerReq1);
+      Assert.assertNotNull(registerResponse);
+      lastResponseId = 0;
+
+      Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+      // Allocate the first batch of containers, with sc1 active
+      registerSubCluster(SubClusterId.newInstance("SC-1"));
+
+      int numberOfContainers = 3;
+      List<Container> containers =
+          getContainersAndAssert(numberOfContainers, numberOfContainers);
+      Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+      Assert.assertEquals(numberOfContainers, containers.size());
+
+      // Finish the application
+      FinishApplicationMasterRequest finishReq =
+          Records.newRecord(FinishApplicationMasterRequest.class);
+      finishReq.setDiagnostics("");
+      finishReq.setTrackingUrl("");
+      finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
+
+      // Check Registry Applications
+      // At this time, the Application should not be cleaned up because the state is not SUCCESS.
+      FederationRegistryClient client = interceptor.getRegistryClient();
+      List<String> applications = client.getAllApplications();
+      Assert.assertNotNull(applications);
+      Assert.assertEquals(1, applications.size());
+
+      // interceptor cleanupRegistry
+      ApplicationId applicationId = interceptor.getAttemptId().getApplicationId();
+      client.removeAppFromRegistry(applicationId);
+      applications = client.getAllApplications();
+      Assert.assertNotNull(applications);
+      Assert.assertEquals(0, applications.size());
+
+      return null;
+    });
+  }
 }