YARN-10885. Make FederationStateStoreFacade#getApplicationHomeSubCluster use JCache. (#4701)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index 56aeba8..2f65bc2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -92,6 +92,8 @@
private static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters";
private static final String GET_POLICIES_CONFIGURATIONS_CACHEID =
"getPoliciesConfigurations";
+ private static final String GET_APPLICATION_HOME_SUBCLUSTER_CACHEID =
+ "getApplicationHomeSubCluster";
private static final FederationStateStoreFacade FACADE =
new FederationStateStoreFacade();
@@ -382,10 +384,19 @@
*/
public SubClusterId getApplicationHomeSubCluster(ApplicationId appId)
throws YarnException {
- GetApplicationHomeSubClusterResponse response =
- stateStore.getApplicationHomeSubCluster(
+ try {
+ if (isCachingEnabled()) {
+ SubClusterId value = SubClusterId.class.cast(
+ cache.get(buildGetApplicationHomeSubClusterRequest(appId)));
+ return value;
+ } else {
+ GetApplicationHomeSubClusterResponse response = stateStore.getApplicationHomeSubCluster(
GetApplicationHomeSubClusterRequest.newInstance(appId));
- return response.getApplicationHomeSubCluster().getHomeSubCluster();
+ return response.getApplicationHomeSubCluster().getHomeSubCluster();
+ }
+ } catch (Throwable ex) {
+ throw new YarnException(ex);
+ }
}
/**
@@ -548,6 +559,26 @@
return cacheRequest;
}
+ private Object buildGetApplicationHomeSubClusterRequest(ApplicationId applicationId) {
+ final String cacheKey = buildCacheKey(getClass().getSimpleName(),
+ GET_APPLICATION_HOME_SUBCLUSTER_CACHEID, applicationId.toString());
+ CacheRequest<String, SubClusterId> cacheRequest = new CacheRequest<>(
+ cacheKey,
+ input -> {
+
+ GetApplicationHomeSubClusterRequest request =
+ GetApplicationHomeSubClusterRequest.newInstance(applicationId);
+ GetApplicationHomeSubClusterResponse response =
+ stateStore.getApplicationHomeSubCluster(request);
+
+ ApplicationHomeSubCluster appHomeSubCluster = response.getApplicationHomeSubCluster();
+ SubClusterId subClusterId = appHomeSubCluster.getHomeSubCluster();
+
+ return subClusterId;
+ });
+ return cacheRequest;
+ }
+
protected String buildCacheKey(String typeName, String methodName,
String argName) {
StringBuilder buffer = new StringBuilder();
@@ -645,6 +676,15 @@
TResult invoke(T input) throws Exception;
}
+ @VisibleForTesting
+ public Cache<Object, Object> getCache() {
+ return cache;
+ }
+
+ @VisibleForTesting
+ protected Object getAppHomeSubClusterCacheRequest(ApplicationId applicationId) {
+ return buildGetApplicationHomeSubClusterRequest(applicationId);
+ }
@VisibleForTesting
public FederationStateStore getStateStore() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java
index 56fa052..0606f5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java
@@ -41,6 +41,8 @@
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
+import javax.cache.Cache;
+
/**
* Unit tests for FederationStateStoreFacade.
*/
@@ -64,12 +66,14 @@
private FederationStateStoreTestUtil stateStoreTestUtil;
private FederationStateStoreFacade facade =
FederationStateStoreFacade.getInstance();
+ private Boolean isCachingEnabled;
public TestFederationStateStoreFacade(Boolean isCachingEnabled) {
conf = new Configuration();
if (!(isCachingEnabled.booleanValue())) {
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
}
+ this.isCachingEnabled = isCachingEnabled;
}
@Before
@@ -206,4 +210,26 @@
Assert.assertEquals(subClusterId1, result);
}
+ @Test
+ public void testGetApplicationHomeSubClusterCache() throws YarnException {
+ ApplicationId appId = ApplicationId.newInstance(clusterTs, numApps + 1);
+ SubClusterId subClusterId1 = SubClusterId.newInstance("Home1");
+
+ ApplicationHomeSubCluster appHomeSubCluster =
+ ApplicationHomeSubCluster.newInstance(appId, subClusterId1);
+ SubClusterId subClusterIdAdd = facade.addApplicationHomeSubCluster(appHomeSubCluster);
+
+ SubClusterId subClusterIdByFacade = facade.getApplicationHomeSubCluster(appId);
+ Assert.assertEquals(subClusterIdByFacade, subClusterIdAdd);
+ Assert.assertEquals(subClusterId1, subClusterIdAdd);
+
+ if (isCachingEnabled.booleanValue()) {
+ Cache<Object, Object> cache = facade.getCache();
+ Object cacheKey = facade.getAppHomeSubClusterCacheRequest(appId);
+ Object subClusterIdByCache = cache.get(cacheKey);
+ Assert.assertEquals(subClusterIdByFacade, subClusterIdByCache);
+ Assert.assertEquals(subClusterId1, subClusterIdByCache);
+ }
+ }
+
}