YARN-11275. [Federation] Add batchFinishApplicationMaster in UAMPoolManager. (#4792)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
index 47203b2..d1e86de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
@@ -22,6 +22,8 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.HashMap;
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorCompletionService;
@@ -450,4 +452,53 @@
       uam.drainHeartbeatThread();
     }
   }
+
+  /**
+   * Sends the finish request to every UAM in this pool in parallel and
+   * collects the per-sub-cluster results.
+   *
+   * @param request FinishApplicationMasterRequest
+   * @param appId application Id, used for logging only
+   * @return map from subClusterId to its FinishApplicationMasterResponse;
+   *         the value is null when no response was obtained for that id
+   */
+  public Map<String, FinishApplicationMasterResponse> batchFinishApplicationMaster(
+      FinishApplicationMasterRequest request, String appId) {
+    Map<String, FinishApplicationMasterResponse> responseMap = new HashMap<>();
+    // Snapshot the ids: keySet() is a live view, so its size can change while
+    // the finish calls are in flight and must not bound the collection loop.
+    Set<String> subClusterIds = new HashSet<>(this.unmanagedAppMasterMap.keySet());
+    if (!subClusterIds.isEmpty()) {
+      ExecutorCompletionService<Map<String, FinishApplicationMasterResponse>> finishAppService =
+          new ExecutorCompletionService<>(this.threadpool);
+      LOG.info("Sending finish application request to {} sub-cluster RMs", subClusterIds.size());
+      for (final String subClusterId : subClusterIds) {
+        finishAppService.submit(() -> {
+          LOG.info("Sending finish application request to RM {}", subClusterId);
+          try {
+            FinishApplicationMasterResponse uamResponse =
+                finishApplicationMaster(subClusterId, request);
+            return Collections.singletonMap(subClusterId, uamResponse);
+          } catch (Throwable e) {
+            LOG.warn("Failed to finish unmanaged application master: " +
+                " RM address: {} ApplicationId: {}", subClusterId, appId, e);
+            return Collections.singletonMap(subClusterId, null);
+          }
+        });
+      }
+      for (int i = 0; i < subClusterIds.size(); ++i) {
+        try {
+          Map<String, FinishApplicationMasterResponse> uamResponse =
+              finishAppService.take().get();
+          LOG.debug("Received finish application response from RM: {}", uamResponse.keySet());
+          responseMap.putAll(uamResponse);
+        } catch (Throwable e) {
+          LOG.warn("Failed to finish unmanaged application master: ApplicationId: {}", appId, e);
+        }
+      }
+      // Report ids whose result could not be collected as null, not as absent.
+      subClusterIds.forEach(id -> responseMap.putIfAbsent(id, null));
+    }
+    return responseMap;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 13c062a..4641479 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -736,50 +736,26 @@
 
     this.finishAMCalled = true;
 
-    // TODO: consider adding batchFinishApplicationMaster in UAMPoolManager
     boolean failedToUnRegister = false;
-    ExecutorCompletionService<FinishApplicationMasterResponseInfo> compSvc =
-        null;
 
     // Application master is completing operation. Send the finish
     // application master request to all the registered sub-cluster resource
     // managers in parallel, wait for the responses and aggregate the results.
-    Set<String> subClusterIds = this.uamPool.getAllUAMIds();
-    if (subClusterIds.size() > 0) {
-      final FinishApplicationMasterRequest finishRequest = request;
-      compSvc =
-          new ExecutorCompletionService<FinishApplicationMasterResponseInfo>(
-              this.threadpool);
+    Map<String, FinishApplicationMasterResponse> responseMap =
+        this.uamPool.batchFinishApplicationMaster(request, attemptId.toString());
 
-      LOG.info("Sending finish application request to {} sub-cluster RMs",
-          subClusterIds.size());
-      for (final String subClusterId : subClusterIds) {
-        compSvc.submit(new Callable<FinishApplicationMasterResponseInfo>() {
-          @Override
-          public FinishApplicationMasterResponseInfo call() throws Exception {
-            LOG.info("Sending finish application request to RM {}",
-                subClusterId);
-            FinishApplicationMasterResponse uamResponse = null;
-            try {
-              uamResponse =
-                  uamPool.finishApplicationMaster(subClusterId, finishRequest);
-
-              if (uamResponse.getIsUnregistered()) {
-                secondaryRelayers.remove(subClusterId);
-                if (getNMStateStore() != null) {
-                  getNMStateStore().removeAMRMProxyAppContextEntry(attemptId,
-                      NMSS_SECONDARY_SC_PREFIX + subClusterId);
-                }
-              }
-            } catch (Throwable e) {
-              LOG.warn("Failed to finish unmanaged application master: "
-                  + "RM address: " + subClusterId + " ApplicationId: "
-                  + attemptId, e);
-            }
-            return new FinishApplicationMasterResponseInfo(uamResponse,
-                subClusterId);
-          }
-        });
+    for (Map.Entry<String, FinishApplicationMasterResponse> entry : responseMap.entrySet()) {
+      String subClusterId = entry.getKey();
+      FinishApplicationMasterResponse response = entry.getValue();
+      if (response != null && response.getIsUnregistered()) {
+        secondaryRelayers.remove(subClusterId);
+        if (getNMStateStore() != null) {
+          getNMStateStore().removeAMRMProxyAppContextEntry(attemptId,
+              NMSS_SECONDARY_SC_PREFIX + subClusterId);
+        }
+      } else {
+        // response is null or response.getIsUnregistered() == false
+        failedToUnRegister = true;
       }
     }
 
@@ -792,30 +768,6 @@
     // Stop the home heartbeat thread
     this.homeHeartbeartHandler.shutdown();
 
-    if (subClusterIds.size() > 0) {
-      // Wait for other sub-cluster resource managers to return the
-      // response and merge it with the home response
-      LOG.info(
-          "Waiting for finish application response from {} sub-cluster RMs",
-          subClusterIds.size());
-      for (int i = 0; i < subClusterIds.size(); ++i) {
-        try {
-          Future<FinishApplicationMasterResponseInfo> future = compSvc.take();
-          FinishApplicationMasterResponseInfo uamResponse = future.get();
-          LOG.debug("Received finish application response from RM: {}",
-              uamResponse.getSubClusterId());
-          if (uamResponse.getResponse() == null
-              || !uamResponse.getResponse().getIsUnregistered()) {
-            failedToUnRegister = true;
-          }
-        } catch (Throwable e) {
-          failedToUnRegister = true;
-          LOG.warn("Failed to finish unmanaged application master: "
-              + " ApplicationId: " + this.attemptId, e);
-        }
-      }
-    }
-
     if (failedToUnRegister) {
       homeResponse.setIsUnregistered(false);
     } else {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 3288382..f81eb69 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -969,4 +969,57 @@
     preemptionMessage.setContract(contract);
     return preemptionMessage;
   }
+
+  /**
+   * Verifies the finish path when UAMs exist in secondary sub-clusters:
+   * registers the AM, requests containers with SC-1 and SC-2 registered and
+   * checks that the UAM pool holds two entries, then finishes the
+   * application and expects the response to report it as unregistered.
+   */
+  @Test
+  public void testBatchFinishApplicationMaster() throws IOException, InterruptedException {
+
+    UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+    ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+
+      // Build the register request
+      RegisterApplicationMasterRequest registerReq =
+          Records.newRecord(RegisterApplicationMasterRequest.class);
+      registerReq.setHost(Integer.toString(testAppId));
+      registerReq.setRpcPort(0);
+      registerReq.setTrackingUrl("");
+
+      // Register ApplicationMaster
+      RegisterApplicationMasterResponse registerResponse =
+          interceptor.registerApplicationMaster(registerReq);
+      Assert.assertNotNull(registerResponse);
+      lastResponseId = 0;
+
+      Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+      // Allocate the first batch of containers, with sc1 and sc2 active
+      registerSubCluster(SubClusterId.newInstance("SC-1"));
+      registerSubCluster(SubClusterId.newInstance("SC-2"));
+
+      int numberOfContainers = 3;
+      List<Container> containers =
+          getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
+      Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
+      Assert.assertEquals(numberOfContainers * 2, containers.size());
+
+      // Finish the application
+      FinishApplicationMasterRequest finishReq =
+          Records.newRecord(FinishApplicationMasterRequest.class);
+      finishReq.setDiagnostics("");
+      finishReq.setTrackingUrl("");
+      finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+      FinishApplicationMasterResponse finishResp = interceptor.finishApplicationMaster(finishReq);
+      Assert.assertNotNull(finishResp);
+      Assert.assertTrue(finishResp.getIsUnregistered());
+
+      return null;
+    });
+  }
 }