YARN-9013. [GPG] fix order of steps cleaning Registry entries in ApplicationCleaner. Contributed by Botong Huang.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
index 86eb536..dabb8c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/ApplicationCleaner.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.time.DurationFormatUtils;
@@ -96,6 +95,10 @@
return this.gpgContext;
}
+ public FederationRegistryClient getRegistryClient() { // accessor for this cleaner's registry client
+ return this.registryClient;
+ }
+
/**
* Query router for applications.
*
@@ -154,18 +157,6 @@
+ " success Router queries after " + totalAttemptCount + " retries");
}
- protected void cleanupAppRecordInRegistry(Set<ApplicationId> knownApps) {
- List<String> allApps = this.registryClient.getAllApplications();
- LOG.info("Got " + allApps.size() + " existing apps in registry");
- for (String app : allApps) {
- ApplicationId appId = ApplicationId.fromString(app);
- if (!knownApps.contains(appId)) {
- LOG.info("removing finished application entry for {}", app);
- this.registryClient.removeAppFromRegistry(appId, true);
- }
- }
- }
-
@Override
public abstract void run();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
index 3c67638..a32cfa5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/DefaultApplicationCleaner.java
@@ -46,26 +46,37 @@
LOG.info("Application cleaner run at time {}", now);
FederationStateStoreFacade facade = getGPGContext().getStateStoreFacade();
- Set<ApplicationId> candidates = new HashSet<ApplicationId>();
try {
+ // Get the candidate list from StateStore before calling router
+ Set<ApplicationId> allStateStoreApps = new HashSet<ApplicationId>();
List<ApplicationHomeSubCluster> response =
facade.getApplicationsHomeSubCluster();
for (ApplicationHomeSubCluster app : response) {
- candidates.add(app.getApplicationId());
+ allStateStoreApps.add(app.getApplicationId());
}
- LOG.info("{} app entries in FederationStateStore", candidates.size());
+ LOG.info("{} app entries in FederationStateStore",
+ allStateStoreApps.size());
+ // Get the candidate list from Registry before calling router, so that an
+ // app registering while the router is queried is never a deletion candidate
+ List<String> allRegistryApps = getRegistryClient().getAllApplications();
+ LOG.info("{} app entries in FederationRegistry",
+ allRegistryApps.size());
+
+ // Get the list of known apps from Router
Set<ApplicationId> routerApps = getRouterKnownApplications();
LOG.info("{} known applications from Router", routerApps.size());
- candidates = Sets.difference(candidates, routerApps);
- LOG.info("Deleting {} applications from statestore", candidates.size());
+ // Clean up StateStore entries
+ Set<ApplicationId> toDelete =
+ Sets.difference(allStateStoreApps, routerApps);
+ LOG.info("Deleting {} applications from statestore", toDelete.size());
if (LOG.isDebugEnabled()) {
- LOG.debug("Apps to delete: ", candidates.stream().map(Object::toString)
+ // The {} placeholder is required, otherwise the joined ids are dropped
+ LOG.debug("Apps to delete: {}", toDelete.stream().map(Object::toString)
.collect(Collectors.joining(",")));
}
- for (ApplicationId appId : candidates) {
+ for (ApplicationId appId : toDelete) {
try {
+ LOG.debug("Deleting {} from statestore ", appId);
facade.deleteApplicationHomeSubCluster(appId);
} catch (Exception e) {
LOG.error(
@@ -74,8 +85,15 @@
}
}
- // Clean up registry entries
- cleanupAppRecordInRegistry(routerApps);
+ // Purge Registry entries, taken from the pre-router snapshot, whose
+ // application the Router did not report
+ for (String registryEntry : allRegistryApps) {
+ ApplicationId entryId = ApplicationId.fromString(registryEntry);
+ if (routerApps.contains(entryId)) {
+ // Still known to the Router: keep its Registry entry
+ continue;
+ }
+ LOG.debug("removing finished application entry for {}", registryEntry);
+ getRegistryClient().removeAppFromRegistry(entryId, true);
+ }
+
} catch (Throwable e) {
LOG.error("Application cleaner started at time " + now + " fails: ", e);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
index 10d442b..a4b6dc5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/applicationcleaner/TestDefaultApplicationCleaner.java
@@ -63,6 +63,8 @@
// The list of applications returned by mocked router
private Set<ApplicationId> routerAppIds;
+ private ApplicationId appIdToAddConcurrently;
+
@Before
public void setup() throws Exception {
conf = new YarnConfiguration();
@@ -111,6 +113,8 @@
new Token<AMRMTokenIdentifier>());
}
Assert.assertEquals(3, registryClient.getAllApplications().size());
+
+ appIdToAddConcurrently = null;
}
@After
@@ -149,13 +153,45 @@
Assert.assertEquals(1, registryClient.getAllApplications().size());
}
+ @Test
+ public void testConcurrentNewApp() throws YarnException {
+ appIdToAddConcurrently = ApplicationId.newInstance(1, 1);
+
+ appCleaner.run();
+
+ // The concurrently added app should still be in the StateStore
+ Assert.assertEquals(1,
+ stateStore
+ .getApplicationsHomeSubCluster(
+ GetApplicationsHomeSubClusterRequest.newInstance())
+ .getAppsHomeSubClusters().size());
+
+ // The concurrently added app should still be in the Registry
+ Assert.assertEquals(1, registryClient.getAllApplications().size());
+ }
+
/**
* Testable version of DefaultApplicationCleaner.
*/
public class TestableDefaultApplicationCleaner
extends DefaultApplicationCleaner {
+
@Override
public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
+ if (appIdToAddConcurrently != null) { // simulate an app arriving mid-query in StateStore and Registry
+ SubClusterId scId = SubClusterId.newInstance("MySubClusterId");
+ try {
+ stateStore
+ .addApplicationHomeSubCluster(AddApplicationHomeSubClusterRequest
+ .newInstance(ApplicationHomeSubCluster
+ .newInstance(appIdToAddConcurrently, scId)));
+ } catch (YarnException e) {
+ throw new YarnRuntimeException(e);
+ }
+
+ registryClient.writeAMRMTokenForUAM(appIdToAddConcurrently,
+ scId.toString(), new Token<AMRMTokenIdentifier>());
+ }
return routerAppIds;
}
}