YARN-9013. [GPG] fix order of steps cleaning Registry entries in ApplicationCleaner. Contributed by Botong Huang.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
index 86eb536..dabb8c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
 
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang3.time.DurationFormatUtils;
@@ -96,6 +95,15 @@
     return this.gpgContext;
   }
 
+  /**
+   * Get the registry client used to list and remove application entries.
+   *
+   * @return the {@link FederationRegistryClient} of this cleaner
+   */
+  public FederationRegistryClient getRegistryClient() {
+    return this.registryClient;
+  }
+
   /**
    * Query router for applications.
    *
@@ -154,18 +162,6 @@
         + " success Router queries after " + totalAttemptCount + " retries");
   }
 
-  protected void cleanupAppRecordInRegistry(Set<ApplicationId> knownApps) {
-    List<String> allApps = this.registryClient.getAllApplications();
-    LOG.info("Got " + allApps.size() + " existing apps in registry");
-    for (String app : allApps) {
-      ApplicationId appId = ApplicationId.fromString(app);
-      if (!knownApps.contains(appId)) {
-        LOG.info("removing finished application entry for {}", app);
-        this.registryClient.removeAppFromRegistry(appId, true);
-      }
-    }
-  }
-
   @Override
   public abstract void run();
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
index 3c67638..a32cfa5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
@@ -46,26 +46,39 @@
     LOG.info("Application cleaner run at time {}", now);
 
     FederationStateStoreFacade facade = getGPGContext().getStateStoreFacade();
-    Set<ApplicationId> candidates = new HashSet<ApplicationId>();
     try {
+      // Get the candidate list from StateStore before calling router. Taking
+      // this snapshot first ensures apps added while Router is being queried
+      // are not mistaken for finished apps and deleted.
+      Set<ApplicationId> allStateStoreApps = new HashSet<ApplicationId>();
       List<ApplicationHomeSubCluster> response =
           facade.getApplicationsHomeSubCluster();
       for (ApplicationHomeSubCluster app : response) {
-        candidates.add(app.getApplicationId());
+        allStateStoreApps.add(app.getApplicationId());
       }
-      LOG.info("{} app entries in FederationStateStore", candidates.size());
+      LOG.info("{} app entries in FederationStateStore",
+          allStateStoreApps.size());
 
+      // Get the candidate list from Registry before calling router
+      List<String> allRegistryApps = getRegistryClient().getAllApplications();
+      LOG.info("{} app entries in FederationRegistry",
+          allRegistryApps.size());
+
+      // Get the list of known apps from Router
       Set<ApplicationId> routerApps = getRouterKnownApplications();
       LOG.info("{} known applications from Router", routerApps.size());
 
-      candidates = Sets.difference(candidates, routerApps);
-      LOG.info("Deleting {} applications from statestore", candidates.size());
+      // Clean up StateStore entries
+      Set<ApplicationId> toDelete =
+          Sets.difference(allStateStoreApps, routerApps);
+      LOG.info("Deleting {} applications from statestore", toDelete.size());
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Apps to delete: ", candidates.stream().map(Object::toString)
+        LOG.debug("Apps to delete: {}", toDelete.stream().map(Object::toString)
             .collect(Collectors.joining(",")));
       }
-      for (ApplicationId appId : candidates) {
+      for (ApplicationId appId : toDelete) {
         try {
+          LOG.debug("Deleting {} from statestore ", appId);
           facade.deleteApplicationHomeSubCluster(appId);
         } catch (Exception e) {
           LOG.error(
@@ -74,8 +87,15 @@
         }
       }
 
-      // Clean up registry entries
-      cleanupAppRecordInRegistry(routerApps);
+      // Clean up Registry entries
+      for (String app : allRegistryApps) {
+        ApplicationId appId = ApplicationId.fromString(app);
+        if (!routerApps.contains(appId)) {
+          LOG.debug("removing finished application entry for {}", app);
+          getRegistryClient().removeAppFromRegistry(appId, true);
+        }
+      }
+
     } catch (Throwable e) {
       LOG.error("Application cleaner started at time " + now + " fails: ", e);
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
index 10d442b..a4b6dc5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
@@ -63,6 +63,9 @@
   // The list of applications returned by mocked router
   private Set<ApplicationId> routerAppIds;
 
+  // The app added to StateStore and Registry during the mocked Router query
+  private ApplicationId appIdToAddConcurrently;
+
   @Before
   public void setup() throws Exception {
     conf = new YarnConfiguration();
@@ -111,6 +114,8 @@
           new Token<AMRMTokenIdentifier>());
     }
     Assert.assertEquals(3, registryClient.getAllApplications().size());
+
+    appIdToAddConcurrently = null;
   }
 
   @After
@@ -149,13 +154,50 @@
     Assert.assertEquals(1, registryClient.getAllApplications().size());
   }
 
+  /**
+   * Apps added to StateStore and Registry while the cleaner is querying Router
+   * must not be removed by that cleaner run.
+   */
+  @Test
+  public void testConcurrentNewApp() throws YarnException {
+    appIdToAddConcurrently = ApplicationId.newInstance(1, 1);
+
+    appCleaner.run();
+
+    // The concurrently added app should still be in the StateStore
+    Assert.assertEquals(1,
+        stateStore
+            .getApplicationsHomeSubCluster(
+                GetApplicationsHomeSubClusterRequest.newInstance())
+            .getAppsHomeSubClusters().size());
+
+    // The concurrently added app should still be in the Registry
+    Assert.assertEquals(1, registryClient.getAllApplications().size());
+  }
+
   /**
    * Testable version of DefaultApplicationCleaner.
    */
   public class TestableDefaultApplicationCleaner
       extends DefaultApplicationCleaner {
+
     @Override
     public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
+      // Simulate an app submitted while the cleaner is querying Router
+      if (appIdToAddConcurrently != null) {
+        SubClusterId scId = SubClusterId.newInstance("MySubClusterId");
+        try {
+          stateStore
+              .addApplicationHomeSubCluster(AddApplicationHomeSubClusterRequest
+                  .newInstance(ApplicationHomeSubCluster
+                      .newInstance(appIdToAddConcurrently, scId)));
+        } catch (YarnException e) {
+          throw new YarnRuntimeException(e);
+        }
+
+        registryClient.writeAMRMTokenForUAM(appIdToAddConcurrently,
+            scId.toString(), new Token<AMRMTokenIdentifier>());
+      }
       return routerAppIds;
     }
   }