YARN-11284. [Federation] Improve UnmanagedAMPoolManager WithoutBlock ServiceStop (#4814)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
index d1e86de..1b72e40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
@@ -24,13 +24,13 @@
import java.util.Set;
import java.util.HashMap;
import java.util.Collections;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@@ -76,6 +76,10 @@
private ExecutorService threadpool;
+ // Name assigned to the thread that serviceStop() starts to force-kill remaining UAMs.
+ private String dispatcherThreadName = "UnmanagedAMPoolManager-Finish-Thread";
+
+ // Thread created and started by serviceStop() when UAMs are still registered;
+ // it has no initializer, so it stays null until serviceStop() assigns it.
+ private Thread finishApplicationThread;
+
public UnmanagedAMPoolManager(ExecutorService threadpool) {
super(UnmanagedAMPoolManager.class.getName());
this.threadpool = threadpool;
@@ -96,48 +100,16 @@
* UAMs running, force kill all of them. Do parallel kill because of
* performance reasons.
*
- * TODO: move waiting for the kill to finish into a separate thread, without
- * blocking the serviceStop.
*/
@Override
protected void serviceStop() throws Exception {
- ExecutorCompletionService<KillApplicationResponse> completionService =
- new ExecutorCompletionService<>(this.threadpool);
- if (this.unmanagedAppMasterMap.isEmpty()) {
- return;
+
+ // If UAMs are still registered, start a separate named thread to force-kill
+ // them. This method only starts that thread; it does not wait for it to end.
+ if (!this.unmanagedAppMasterMap.isEmpty()) {
+ finishApplicationThread = new Thread(createForceFinishApplicationThread());
+ finishApplicationThread.setName(dispatcherThreadName);
+ finishApplicationThread.start();
}
- // Save a local copy of the key set so that it won't change with the map
- Set<String> addressList =
- new HashSet<>(this.unmanagedAppMasterMap.keySet());
- LOG.warn("Abnormal shutdown of UAMPoolManager, still {} UAMs in map",
- addressList.size());
-
- for (final String uamId : addressList) {
- completionService.submit(new Callable<KillApplicationResponse>() {
- @Override
- public KillApplicationResponse call() throws Exception {
- try {
- LOG.info("Force-killing UAM id " + uamId + " for application "
- + appIdMap.get(uamId));
- return unmanagedAppMasterMap.remove(uamId).forceKillApplication();
- } catch (Exception e) {
- LOG.error("Failed to kill unmanaged application master", e);
- return null;
- }
- }
- });
- }
-
- for (int i = 0; i < addressList.size(); ++i) {
- try {
- Future<KillApplicationResponse> future = completionService.take();
- future.get();
- } catch (Exception e) {
- LOG.error("Failed to kill unmanaged application master", e);
- }
- }
- this.appIdMap.clear();
super.serviceStop();
}
@@ -501,4 +473,62 @@
return responseMap;
}
+
+  /**
+   * Builds the task that force-kills every UAM still registered in this pool.
+   * One kill per UAM is submitted to the shared thread pool; the task then
+   * waits for each accepted kill to report back and logs the outcome. The
+   * application id map is cleared when the task ends, whatever the outcome.
+   *
+   * @return the runnable to execute on the finish-application thread
+   */
+  Runnable createForceFinishApplicationThread() {
+    return () -> {
+
+      ExecutorCompletionService<Pair<String, KillApplicationResponse>> completionService =
+          new ExecutorCompletionService<>(threadpool);
+
+      // Save a local copy of the key set so that it won't change with the map
+      Set<String> addressList = new HashSet<>(unmanagedAppMasterMap.keySet());
+
+      LOG.warn("Abnormal shutdown of UAMPoolManager, still {} UAMs in map", addressList.size());
+
+      // Count only the kills the pool accepted: waiting for addressList.size()
+      // results would block forever if any submit had been rejected.
+      int submitted = 0;
+      try {
+        for (final String uamId : addressList) {
+          try {
+            completionService.submit(() -> {
+              KillApplicationResponse response = null;
+              try {
+                ApplicationId appId = appIdMap.get(uamId);
+                LOG.info("Force-killing UAM id {} for application {}", uamId, appId);
+                UnmanagedApplicationManager applicationManager =
+                    unmanagedAppMasterMap.remove(uamId);
+                if (applicationManager == null) {
+                  // The entry disappeared from the map after the snapshot was taken.
+                  LOG.warn("UAM id {} is no longer registered, nothing to force-kill", uamId);
+                } else {
+                  response = applicationManager.forceKillApplication();
+                }
+              } catch (Exception e) {
+                LOG.error("Failed to kill unmanaged application master", e);
+              }
+              // A null right-hand side tells the waiting loop that the kill failed.
+              return Pair.of(uamId, response);
+            });
+            submitted++;
+          } catch (java.util.concurrent.RejectedExecutionException e) {
+            // The shared pool refused the task, e.g. because it was already shut down.
+            LOG.error("Failed to submit force-kill of UAM id {}", uamId, e);
+          }
+        }
+
+        for (int i = 0; i < submitted; ++i) {
+          try {
+            Future<Pair<String, KillApplicationResponse>> future = completionService.take();
+            Pair<String, KillApplicationResponse> pairs = future.get();
+            String uamId = pairs.getLeft();
+            ApplicationId appId = appIdMap.get(uamId);
+            KillApplicationResponse response = pairs.getRight();
+            if (response == null) {
+              LOG.error("Failed Force-killing UAM id {} for application {}", uamId, appId);
+              continue;
+            }
+            LOG.info("Force-killing UAM id = {} for application {} KillCompleted {}.",
+                uamId, appId, response.getIsKillCompleted());
+          } catch (InterruptedException e) {
+            // Restore the interrupt flag and stop waiting instead of blocking again.
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted while waiting for UAMs to be force-killed", e);
+            break;
+          } catch (Exception e) {
+            LOG.error("Failed to kill unmanaged application master", e);
+          }
+        }
+      } finally {
+        appIdMap.clear();
+      }
+    };
+  }
+
+ /**
+ * Exposes the pool's internal UAM map for tests. The field itself is
+ * returned, not a copy, so later changes to the pool are visible through it.
+ *
+ * @return the map of UAM id to its UnmanagedApplicationManager
+ */
+ @VisibleForTesting
+ protected Map<String, UnmanagedApplicationManager> getUnmanagedAppMasterMap() {
+ return unmanagedAppMasterMap;
+ }
+
+ /**
+ * Exposes the thread that serviceStop() starts to force-kill remaining UAMs.
+ *
+ * @return the finish-application thread, or null if serviceStop() has not
+ * assigned one (it only does so when UAMs are still registered)
+ */
+ @VisibleForTesting
+ protected Thread getFinishApplicationThread() {
+ return finishApplicationThread;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
index abb1d93..ddc85bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
@@ -20,10 +20,16 @@
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -39,6 +45,7 @@
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
import org.apache.hadoop.yarn.util.AsyncCallback;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -58,6 +65,9 @@
private ApplicationAttemptId attemptId;
+ // Pool manager under test; created, initialized and started in setup().
+ private UnmanagedAMPoolManager uamPool;
+ // Executor handed to uamPool's constructor; shut down in tearDown().
+ private ExecutorService threadpool;
+
@Before
public void setup() {
conf.set(YarnConfiguration.RM_CLUSTER_ID, "subclusterId");
@@ -69,6 +79,29 @@
uam = new TestableUnmanagedApplicationManager(conf,
attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true,
"rm");
+
+ threadpool = Executors.newCachedThreadPool();
+ uamPool = new TestableUnmanagedAMPoolManager(this.threadpool);
+ uamPool.init(conf);
+ uamPool.start();
+ }
+
+  /**
+   * Releases the per-test UAM, pool manager and executor.
+   *
+   * @throws IOException declared for the shutdown calls made on the UAM
+   * @throws InterruptedException if interrupted while shutting down or while
+   *           waiting for the pool's finish-application thread
+   */
+  @After
+  public void tearDown() throws IOException, InterruptedException {
+    if (uam != null) {
+      uam.shutDownConnections();
+      uam = null;
+    }
+    if (uamPool != null) {
+      if (uamPool.isInState(Service.STATE.STARTED)) {
+        uamPool.stop();
+      }
+      // Stopping the pool may start a thread that submits force-kills to
+      // threadpool. Give it a bounded chance to finish before the executor is
+      // shut down underneath it.
+      Thread finishThread = uamPool.getFinishApplicationThread();
+      if (finishThread != null) {
+        finishThread.join(2000);
+      }
+      uamPool = null;
+    }
+    if (threadpool != null) {
+      threadpool.shutdownNow();
+      threadpool = null;
+    }
   }
protected void waitForCallBackCountAndCheckZeroPending(
@@ -464,4 +497,49 @@
}
}
+ /**
+ * Pool manager for tests: identical to UnmanagedAMPoolManager except that
+ * createUAM returns a TestableUnmanagedApplicationManager.
+ */
+ protected class TestableUnmanagedAMPoolManager extends UnmanagedAMPoolManager {
+ public TestableUnmanagedAMPoolManager(ExecutorService threadpool) {
+ super(threadpool);
+ }
+
+ // Passes every argument straight through to the testable UAM's constructor.
+ @Override
+ public UnmanagedApplicationManager createUAM(Configuration configuration,
+ ApplicationId appId, String queueName, String submitter, String appNameSuffix,
+ boolean keepContainersAcrossApplicationAttempts, String rmId) {
+ return new TestableUnmanagedApplicationManager(configuration, appId, queueName, submitter,
+ appNameSuffix, keepContainersAcrossApplicationAttempts, rmId);
+ }
+ }
+
+  /**
+   * Stops a pool that still holds two UAMs and checks that the service reaches
+   * STOPPED while the force-kill work is carried out by the separate
+   * finish-application thread, which eventually leaves the UAM map empty.
+   */
+  @Test
+  public void testSeparateThreadWithoutBlockServiceStop() throws Exception {
+    // Launch one UAM under each of the ids SC-1 and SC-2.
+    for (int index = 1; index <= 2; index++) {
+      String subClusterId = "SC-" + index;
+      ApplicationId applicationId = ApplicationId.newInstance(Time.now(), index);
+      ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+      Token<AMRMTokenIdentifier> amrmToken = uamPool.launchUAM(subClusterId, this.conf,
+          appAttemptId.getApplicationId(), "default", "test-user", "SC-HOME", true,
+          subClusterId);
+      Assert.assertNotNull(amrmToken);
+    }
+
+    Map<String, UnmanagedApplicationManager> registeredUams =
+        uamPool.getUnmanagedAppMasterMap();
+    Assert.assertNotNull(registeredUams);
+    Assert.assertEquals(2, registeredUams.size());
+
+    // Stopping must complete without waiting for the force-kills to finish.
+    uamPool.stop();
+    boolean stopped = uamPool.waitForServiceToStop(2000);
+    Assert.assertTrue(stopped);
+    Assert.assertEquals(Service.STATE.STOPPED, uamPool.getServiceState());
+
+    // Once the finish-application thread has ended, no UAM may remain in the map.
+    Thread killerThread = uamPool.getFinishApplicationThread();
+    GenericTestUtils.waitFor(() -> !killerThread.isAlive(), 100, 2000);
+    Assert.assertEquals(0, registeredUams.size());
+  }
}
\ No newline at end of file