YARN-11275. [Federation] Add batchFinishApplicationMaster in UAMPoolManager. (#4792)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
index 47203b2..d1e86de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
@@ -22,6 +22,8 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.HashMap;
+import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
@@ -450,4 +452,53 @@
uam.drainHeartbeatThread();
}
}
+
+ /**
+ * Complete FinishApplicationMaster interface calls in batches.
+ *
+ * @param request FinishApplicationMasterRequest
+ * @param appId application Id
+ * @return Returns the Map map,
+ * the key is subClusterId, the value is FinishApplicationMasterResponse
+ */
+ public Map<String, FinishApplicationMasterResponse> batchFinishApplicationMaster(
+ FinishApplicationMasterRequest request, String appId) {
+
+ Map<String, FinishApplicationMasterResponse> responseMap = new HashMap<>();
+ Set<String> subClusterIds = this.unmanagedAppMasterMap.keySet();
+
+ if (subClusterIds != null && !subClusterIds.isEmpty()) {
+ ExecutorCompletionService<Map<String, FinishApplicationMasterResponse>> finishAppService =
+ new ExecutorCompletionService<>(this.threadpool);
+ LOG.info("Sending finish application request to {} sub-cluster RMs", subClusterIds.size());
+
+ for (final String subClusterId : subClusterIds) {
+ finishAppService.submit(() -> {
+ LOG.info("Sending finish application request to RM {}", subClusterId);
+ try {
+ FinishApplicationMasterResponse uamResponse =
+ finishApplicationMaster(subClusterId, request);
+ return Collections.singletonMap(subClusterId, uamResponse);
+ } catch (Throwable e) {
+ LOG.warn("Failed to finish unmanaged application master: " +
+ " RM address: {} ApplicationId: {}", subClusterId, appId, e);
+ return Collections.singletonMap(subClusterId, null);
+ }
+ });
+ }
+
+ for (int i = 0; i < subClusterIds.size(); ++i) {
+ try {
+ Future<Map<String, FinishApplicationMasterResponse>> future = finishAppService.take();
+ Map<String, FinishApplicationMasterResponse> uamResponse = future.get();
+ LOG.debug("Received finish application response from RM: {}", uamResponse.keySet());
+ responseMap.putAll(uamResponse);
+ } catch (Throwable e) {
+ LOG.warn("Failed to finish unmanaged application master: ApplicationId: {}", appId, e);
+ }
+ }
+ }
+
+ return responseMap;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 13c062a..4641479 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -736,50 +736,26 @@
this.finishAMCalled = true;
- // TODO: consider adding batchFinishApplicationMaster in UAMPoolManager
boolean failedToUnRegister = false;
- ExecutorCompletionService<FinishApplicationMasterResponseInfo> compSvc =
- null;
// Application master is completing operation. Send the finish
// application master request to all the registered sub-cluster resource
// managers in parallel, wait for the responses and aggregate the results.
- Set<String> subClusterIds = this.uamPool.getAllUAMIds();
- if (subClusterIds.size() > 0) {
- final FinishApplicationMasterRequest finishRequest = request;
- compSvc =
- new ExecutorCompletionService<FinishApplicationMasterResponseInfo>(
- this.threadpool);
+ Map<String, FinishApplicationMasterResponse> responseMap =
+ this.uamPool.batchFinishApplicationMaster(request, attemptId.toString());
- LOG.info("Sending finish application request to {} sub-cluster RMs",
- subClusterIds.size());
- for (final String subClusterId : subClusterIds) {
- compSvc.submit(new Callable<FinishApplicationMasterResponseInfo>() {
- @Override
- public FinishApplicationMasterResponseInfo call() throws Exception {
- LOG.info("Sending finish application request to RM {}",
- subClusterId);
- FinishApplicationMasterResponse uamResponse = null;
- try {
- uamResponse =
- uamPool.finishApplicationMaster(subClusterId, finishRequest);
-
- if (uamResponse.getIsUnregistered()) {
- secondaryRelayers.remove(subClusterId);
- if (getNMStateStore() != null) {
- getNMStateStore().removeAMRMProxyAppContextEntry(attemptId,
- NMSS_SECONDARY_SC_PREFIX + subClusterId);
- }
- }
- } catch (Throwable e) {
- LOG.warn("Failed to finish unmanaged application master: "
- + "RM address: " + subClusterId + " ApplicationId: "
- + attemptId, e);
- }
- return new FinishApplicationMasterResponseInfo(uamResponse,
- subClusterId);
- }
- });
+ for (Map.Entry<String, FinishApplicationMasterResponse> entry : responseMap.entrySet()) {
+ String subClusterId = entry.getKey();
+ FinishApplicationMasterResponse response = entry.getValue();
+ if (response != null && response.getIsUnregistered()) {
+ secondaryRelayers.remove(subClusterId);
+ if (getNMStateStore() != null) {
+ getNMStateStore().removeAMRMProxyAppContextEntry(attemptId,
+ NMSS_SECONDARY_SC_PREFIX + subClusterId);
+ }
+ } else {
+ // response is null or response.getIsUnregistered() == false
+ failedToUnRegister = true;
}
}
@@ -792,30 +768,6 @@
// Stop the home heartbeat thread
this.homeHeartbeartHandler.shutdown();
- if (subClusterIds.size() > 0) {
- // Wait for other sub-cluster resource managers to return the
- // response and merge it with the home response
- LOG.info(
- "Waiting for finish application response from {} sub-cluster RMs",
- subClusterIds.size());
- for (int i = 0; i < subClusterIds.size(); ++i) {
- try {
- Future<FinishApplicationMasterResponseInfo> future = compSvc.take();
- FinishApplicationMasterResponseInfo uamResponse = future.get();
- LOG.debug("Received finish application response from RM: {}",
- uamResponse.getSubClusterId());
- if (uamResponse.getResponse() == null
- || !uamResponse.getResponse().getIsUnregistered()) {
- failedToUnRegister = true;
- }
- } catch (Throwable e) {
- failedToUnRegister = true;
- LOG.warn("Failed to finish unmanaged application master: "
- + " ApplicationId: " + this.attemptId, e);
- }
- }
- }
-
if (failedToUnRegister) {
homeResponse.setIsUnregistered(false);
} else {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 3288382..f81eb69 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -969,4 +969,57 @@
preemptionMessage.setContract(contract);
return preemptionMessage;
}
+
+ @Test
+ public void testBatchFinishApplicationMaster() throws IOException, InterruptedException {
+
+ final RegisterApplicationMasterRequest registerReq =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+ registerReq.setHost(Integer.toString(testAppId));
+ registerReq.setRpcPort(testAppId);
+ registerReq.setTrackingUrl("");
+
+ UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+ ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+
+ // Register the application
+ RegisterApplicationMasterRequest registerReq1 =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+ registerReq1.setHost(Integer.toString(testAppId));
+ registerReq1.setRpcPort(0);
+ registerReq1.setTrackingUrl("");
+
+ // Register ApplicationMaster
+ RegisterApplicationMasterResponse registerResponse =
+ interceptor.registerApplicationMaster(registerReq1);
+ Assert.assertNotNull(registerResponse);
+ lastResponseId = 0;
+
+ Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+ // Allocate the first batch of containers, with sc1 and sc2 active
+ registerSubCluster(SubClusterId.newInstance("SC-1"));
+ registerSubCluster(SubClusterId.newInstance("SC-2"));
+
+ int numberOfContainers = 3;
+ List<Container> containers =
+ getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
+ Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
+ Assert.assertEquals(numberOfContainers * 2, containers.size());
+
+ // Finish the application
+ FinishApplicationMasterRequest finishReq =
+ Records.newRecord(FinishApplicationMasterRequest.class);
+ finishReq.setDiagnostics("");
+ finishReq.setTrackingUrl("");
+ finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+ FinishApplicationMasterResponse finishResp = interceptor.finishApplicationMaster(finishReq);
+ Assert.assertNotNull(finishResp);
+ Assert.assertTrue(finishResp.getIsUnregistered());
+
+ return null;
+ });
+ }
}