YARN-11284. [Federation] Improve UnmanagedAMPoolManager WithoutBlock ServiceStop (#4814)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
index d1e86de..1b72e40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
@@ -24,13 +24,13 @@
 import java.util.Set;
 import java.util.HashMap;
 import java.util.Collections;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -76,6 +76,10 @@
 
   private ExecutorService threadpool;
 
+  private String dispatcherThreadName = "UnmanagedAMPoolManager-Finish-Thread";
+
+  private Thread finishApplicationThread;
+
   public UnmanagedAMPoolManager(ExecutorService threadpool) {
     super(UnmanagedAMPoolManager.class.getName());
     this.threadpool = threadpool;
@@ -96,48 +100,16 @@
    * UAMs running, force kill all of them. Do parallel kill because of
    * performance reasons.
    *
-   * TODO: move waiting for the kill to finish into a separate thread, without
-   * blocking the serviceStop.
    */
   @Override
   protected void serviceStop() throws Exception {
-    ExecutorCompletionService<KillApplicationResponse> completionService =
-        new ExecutorCompletionService<>(this.threadpool);
-    if (this.unmanagedAppMasterMap.isEmpty()) {
-      return;
+
+    if (!this.unmanagedAppMasterMap.isEmpty()) {
+      finishApplicationThread = new Thread(createForceFinishApplicationThread());
+      finishApplicationThread.setName(dispatcherThreadName);
+      finishApplicationThread.start();
     }
 
-    // Save a local copy of the key set so that it won't change with the map
-    Set<String> addressList =
-        new HashSet<>(this.unmanagedAppMasterMap.keySet());
-    LOG.warn("Abnormal shutdown of UAMPoolManager, still {} UAMs in map",
-        addressList.size());
-
-    for (final String uamId : addressList) {
-      completionService.submit(new Callable<KillApplicationResponse>() {
-        @Override
-        public KillApplicationResponse call() throws Exception {
-          try {
-            LOG.info("Force-killing UAM id " + uamId + " for application "
-                + appIdMap.get(uamId));
-            return unmanagedAppMasterMap.remove(uamId).forceKillApplication();
-          } catch (Exception e) {
-            LOG.error("Failed to kill unmanaged application master", e);
-            return null;
-          }
-        }
-      });
-    }
-
-    for (int i = 0; i < addressList.size(); ++i) {
-      try {
-        Future<KillApplicationResponse> future = completionService.take();
-        future.get();
-      } catch (Exception e) {
-        LOG.error("Failed to kill unmanaged application master", e);
-      }
-    }
-    this.appIdMap.clear();
     super.serviceStop();
   }
 
@@ -501,4 +473,62 @@
 
     return responseMap;
   }
+
+  Runnable createForceFinishApplicationThread() {
+    return () -> {
+
+      ExecutorCompletionService<Pair<String, KillApplicationResponse>> completionService =
+          new ExecutorCompletionService<>(threadpool);
+
+      // Save a local copy of the key set so that it won't change with the map
+      Set<String> addressList = new HashSet<>(unmanagedAppMasterMap.keySet());
+
+      LOG.warn("Abnormal shutdown of UAMPoolManager, still {} UAMs in map", addressList.size());
+
+      for (final String uamId : addressList) {
+        completionService.submit(() -> {
+          try {
+            ApplicationId appId = appIdMap.get(uamId);
+            LOG.info("Force-killing UAM id {} for application {}", uamId, appId);
+            UnmanagedApplicationManager applicationManager = unmanagedAppMasterMap.remove(uamId);
+            KillApplicationResponse response = applicationManager.forceKillApplication();
+            return Pair.of(uamId, response);
+          } catch (Exception e) {
+            LOG.error("Failed to kill unmanaged application master", e);
+            return Pair.of(uamId, null);
+          }
+        });
+      }
+
+      for (int i = 0; i < addressList.size(); ++i) {
+        try {
+          Future<Pair<String, KillApplicationResponse>> future = completionService.take();
+          Pair<String, KillApplicationResponse> pairs = future.get();
+          String uamId = pairs.getLeft();
+          ApplicationId appId = appIdMap.get(uamId);
+          KillApplicationResponse response = pairs.getRight();
+          if (response == null) {
+            throw new YarnException(
+                "Failed Force-killing UAM id " + uamId + " for application " + appId);
+          }
+          LOG.info("Force-killing UAM id = {} for application {} KillCompleted {}.",
+              uamId, appId, response.getIsKillCompleted());
+        } catch (Exception e) {
+          LOG.error("Failed to kill unmanaged application master", e);
+        }
+      }
+
+      appIdMap.clear();
+    };
+  }
+
+  @VisibleForTesting
+  protected Map<String, UnmanagedApplicationManager> getUnmanagedAppMasterMap() {
+    return unmanagedAppMasterMap;
+  }
+
+  @VisibleForTesting
+  protected Thread getFinishApplicationThread() {
+    return finishApplicationThread;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
index abb1d93..ddc85bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java
@@ -20,10 +20,16 @@
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -39,6 +45,7 @@
 import org.apache.hadoop.yarn.server.AMRMClientRelayer;
 import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
 import org.apache.hadoop.yarn.util.AsyncCallback;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -58,6 +65,9 @@
 
   private ApplicationAttemptId attemptId;
 
+  private UnmanagedAMPoolManager uamPool;
+  private ExecutorService threadpool;
+
   @Before
   public void setup() {
     conf.set(YarnConfiguration.RM_CLUSTER_ID, "subclusterId");
@@ -69,6 +79,29 @@
     uam = new TestableUnmanagedApplicationManager(conf,
         attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true,
         "rm");
+
+    threadpool = Executors.newCachedThreadPool();
+    uamPool = new TestableUnmanagedAMPoolManager(this.threadpool);
+    uamPool.init(conf);
+    uamPool.start();
+  }
+
+  @After
+  public void tearDown() throws IOException, InterruptedException {
+    if (uam != null) {
+      uam.shutDownConnections();
+      uam = null;
+    }
+    if (uamPool != null) {
+      if (uamPool.isInState(Service.STATE.STARTED)) {
+        uamPool.stop();
+      }
+      uamPool = null;
+    }
+    if (threadpool != null) {
+      threadpool.shutdownNow();
+      threadpool = null;
+    }
   }
 
   protected void waitForCallBackCountAndCheckZeroPending(
@@ -464,4 +497,49 @@
     }
   }
 
+  protected class TestableUnmanagedAMPoolManager extends UnmanagedAMPoolManager {
+    public TestableUnmanagedAMPoolManager(ExecutorService threadpool) {
+      super(threadpool);
+    }
+
+    @Override
+    public UnmanagedApplicationManager createUAM(Configuration configuration,
+        ApplicationId appId, String queueName, String submitter, String appNameSuffix,
+        boolean keepContainersAcrossApplicationAttempts, String rmId) {
+      return new TestableUnmanagedApplicationManager(configuration, appId, queueName, submitter,
+          appNameSuffix, keepContainersAcrossApplicationAttempts, rmId);
+    }
+  }
+
+  @Test
+  public void testSeparateThreadWithoutBlockServiceStop() throws Exception {
+    ApplicationAttemptId attemptId1 =
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(Time.now(), 1), 1);
+    Token<AMRMTokenIdentifier> token1 = uamPool.launchUAM("SC-1", this.conf,
+        attemptId1.getApplicationId(), "default", "test-user", "SC-HOME", true, "SC-1");
+    Assert.assertNotNull(token1);
+
+    ApplicationAttemptId attemptId2 =
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(Time.now(), 2), 1);
+    Token<AMRMTokenIdentifier> token2 = uamPool.launchUAM("SC-2", this.conf,
+        attemptId2.getApplicationId(), "default", "test-user", "SC-HOME", true, "SC-2");
+    Assert.assertNotNull(token2);
+
+    Map<String, UnmanagedApplicationManager> unmanagedAppMasterMap =
+        uamPool.getUnmanagedAppMasterMap();
+    Assert.assertNotNull(unmanagedAppMasterMap);
+    Assert.assertEquals(2, unmanagedAppMasterMap.size());
+
+    // try to stop uamPool
+    uamPool.stop();
+    Assert.assertTrue(uamPool.waitForServiceToStop(2000));
+    // process force finish Application in a separate thread, not blocking the main thread
+    Assert.assertEquals(Service.STATE.STOPPED, uamPool.getServiceState());
+
+    // Wait for the thread to terminate, check if uamPool#unmanagedAppMasterMap is 0
+    Thread finishApplicationThread = uamPool.getFinishApplicationThread();
+    GenericTestUtils.waitFor(() -> !finishApplicationThread.isAlive(),
+        100, 2000);
+    Assert.assertEquals(0, unmanagedAppMasterMap.size());
+  }
 }
\ No newline at end of file