YARN-11290. Improve Query Condition of FederationStateStore#getApplicationsHomeSubCluster. (#4846)
diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql
index 9434ed3..6461cf2 100644
--- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql
+++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql
@@ -122,10 +122,21 @@
WHERE applicationId = applicationID_IN;
END //
-CREATE PROCEDURE sp_getApplicationsHomeSubCluster()
+CREATE PROCEDURE sp_getApplicationsHomeSubCluster(IN limit_IN int, IN homeSubCluster_IN varchar(256))
BEGIN
- SELECT applicationId, homeSubCluster
- FROM applicationsHomeSubCluster;
+ SELECT
+ applicationId,
+ homeSubCluster,
+ createTime
+ FROM (SELECT
+ applicationId,
+ homeSubCluster,
+ createTime,
+ @rownum := 0
+ FROM applicationshomesubcluster
+ ORDER BY createTime DESC) AS applicationshomesubcluster
+ WHERE (homeSubCluster_IN = '' OR homeSubCluster = homeSubCluster_IN)
+ AND (@rownum := @rownum + 1) <= limit_IN;
END //
CREATE PROCEDURE sp_deleteApplicationHomeSubCluster(
diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql
index 61f47d4..d61a10f 100644
--- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql
+++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/MySQL/FederationStateStoreTables.sql
@@ -22,7 +22,8 @@
CREATE TABLE applicationsHomeSubCluster(
applicationId varchar(64) NOT NULL,
- homeSubCluster varchar(256) NULL,
+ homeSubCluster varchar(256) NOT NULL,
+ createTime datetime NOT NULL,
CONSTRAINT pk_applicationId PRIMARY KEY (applicationId)
);
diff --git a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql
index ab17aae..17f9e969 100644
--- a/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql
+++ b/hadoop-yarn-project/hadoop-yarn/bin/FederationStateStore/SQLServer/FederationStateStoreStoredProcs.sql
@@ -111,12 +111,26 @@
GO
CREATE PROCEDURE [dbo].[sp_getApplicationsHomeSubCluster]
+ @limit int,
+ @homeSubCluster VARCHAR(256)
AS BEGIN
DECLARE @errorMessage nvarchar(4000)
BEGIN TRY
- SELECT [applicationId], [homeSubCluster], [createTime]
- FROM [dbo].[applicationsHomeSubCluster]
+
+ SELECT
+ [applicationId],
+ [homeSubCluster],
+ [createTime]
+ FROM(SELECT
+ [applicationId],
+ [homeSubCluster],
+ [createTime],
+ row_number() over(order by [createTime] desc) AS app_rank
+ FROM [dbo].[applicationsHomeSubCluster]
+ WHERE [homeSubCluster] = @homeSubCluster OR @homeSubCluster = '') AS applicationsHomeSubCluster
+ WHERE app_rank <= @limit;
+
END TRY
BEGIN CATCH
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index fd4cb0c..d5e1206 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4056,6 +4056,11 @@
public static final int DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS = 1;
+ public static final String FEDERATION_STATESTORE_MAX_APPLICATIONS =
+ FEDERATION_PREFIX + "state-store.max-applications";
+
+ public static final int DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS = 1000;
+
public static final String ROUTER_PREFIX = YARN_PREFIX + "router.";
public static final String ROUTER_BIND_HOST = ROUTER_PREFIX + "bind-host";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 92a7298..5132d41 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5007,4 +5007,13 @@
</description>
</property>
+ <property>
+ <name>yarn.federation.state-store.max-applications</name>
+ <value>1000</value>
+ <description>
+ Yarn federation state-store supports querying the maximum number of apps.
+ Default is 1000.
+ </description>
+ </property>
+
</configuration>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
index b4c99ba..4d545fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
@@ -27,12 +27,16 @@
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.Comparator;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
@@ -90,6 +94,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils.filterHomeSubCluster;
+
/**
* In-memory implementation of {@link FederationStateStore}.
*/
@@ -100,6 +106,7 @@
private Map<ReservationId, SubClusterId> reservations;
private Map<String, SubClusterPolicyConfiguration> policies;
private RouterRMDTSecretManagerState routerRMSecretManagerState;
+ private int maxAppsInStateStore;
private final MonotonicClock clock = new MonotonicClock();
@@ -113,6 +120,9 @@
reservations = new ConcurrentHashMap<ReservationId, SubClusterId>();
policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>();
routerRMSecretManagerState = new RouterRMDTSecretManagerState();
+ maxAppsInStateStore = conf.getInt(
+ YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
+ YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);
}
@Override
@@ -266,17 +276,28 @@
@Override
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest request) throws YarnException {
- List<ApplicationHomeSubCluster> result =
- new ArrayList<ApplicationHomeSubCluster>();
- for (Entry<ApplicationId, SubClusterId> e : applications.entrySet()) {
- result
- .add(ApplicationHomeSubCluster.newInstance(e.getKey(), e.getValue()));
+
+ if (request == null) {
+ throw new YarnException("Missing getApplicationsHomeSubCluster request");
}
- GetApplicationsHomeSubClusterResponse.newInstance(result);
+ SubClusterId requestSC = request.getSubClusterId();
+ List<ApplicationHomeSubCluster> result = applications.keySet().stream()
+ .map(applicationId -> generateAppHomeSC(applicationId))
+ .sorted(Comparator.comparing(ApplicationHomeSubCluster::getCreateTime).reversed())
+ .filter(appHomeSC -> filterHomeSubCluster(requestSC, appHomeSC.getHomeSubCluster()))
+ .limit(maxAppsInStateStore)
+ .collect(Collectors.toList());
+
+ LOG.info("filterSubClusterId = {}, appCount = {}.", requestSC, result.size());
return GetApplicationsHomeSubClusterResponse.newInstance(result);
}
+ private ApplicationHomeSubCluster generateAppHomeSC(ApplicationId applicationId) {
+ SubClusterId subClusterId = applications.get(applicationId);
+ return ApplicationHomeSubCluster.newInstance(applicationId, Time.now(), subClusterId);
+ }
+
@Override
public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
DeleteApplicationHomeSubClusterRequest request) throws YarnException {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
index 22a10f5..889c1e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
@@ -136,7 +136,7 @@
"{call sp_getApplicationHomeSubCluster(?, ?)}";
private static final String CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER =
- "{call sp_getApplicationsHomeSubCluster()}";
+ "{call sp_getApplicationsHomeSubCluster(?, ?)}";
private static final String CALL_SP_SET_POLICY_CONFIGURATION =
"{call sp_setPolicyConfiguration(?, ?, ?, ?)}";
@@ -176,6 +176,7 @@
private final Clock clock = new MonotonicClock();
@VisibleForTesting
Connection conn = null;
+ private int maxAppsInStateStore;
@Override
public void init(Configuration conf) throws YarnException {
@@ -215,6 +216,10 @@
FederationStateStoreUtils.logAndThrowRetriableException(LOG,
"Not able to get Connection", e);
}
+
+ maxAppsInStateStore = conf.getInt(
+ YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
+ YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);
}
@Override
@@ -748,24 +753,35 @@
@Override
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest request) throws YarnException {
+
+ if (request == null) {
+ throw new YarnException("Missing getApplicationsHomeSubCluster request");
+ }
+
CallableStatement cstmt = null;
ResultSet rs = null;
- List<ApplicationHomeSubCluster> appsHomeSubClusters =
- new ArrayList<ApplicationHomeSubCluster>();
+ List<ApplicationHomeSubCluster> appsHomeSubClusters = new ArrayList<>();
try {
cstmt = getCallableStatement(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);
+ cstmt.setInt("limit_IN", maxAppsInStateStore);
+ String homeSubClusterIN = StringUtils.EMPTY;
+ SubClusterId subClusterId = request.getSubClusterId();
+ if (subClusterId != null) {
+ homeSubClusterIN = subClusterId.toString();
+ }
+ cstmt.setString("homeSubCluster_IN", homeSubClusterIN);
// Execute the query
long startTime = clock.getTime();
rs = cstmt.executeQuery();
long stopTime = clock.getTime();
- while (rs.next()) {
+ while (rs.next() && appsHomeSubClusters.size() <= maxAppsInStateStore) {
// Extract the output for each tuple
- String applicationId = rs.getString(1);
- String homeSubCluster = rs.getString(2);
+ String applicationId = rs.getString("applicationId");
+ String homeSubCluster = rs.getString("homeSubCluster");
appsHomeSubClusters.add(ApplicationHomeSubCluster.newInstance(
ApplicationId.fromString(applicationId),
@@ -783,8 +799,8 @@
// Return to the pool the CallableStatement
FederationStateStoreUtils.returnToPool(LOG, cstmt, null, rs);
}
- return GetApplicationsHomeSubClusterResponse
- .newInstance(appsHomeSubClusters);
+
+ return GetApplicationsHomeSubClusterResponse.newInstance(appsHomeSubClusters);
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
index 18dfdc2..affd4ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
@@ -24,10 +24,13 @@
import java.util.Calendar;
import java.util.List;
import java.util.TimeZone;
+import java.util.Comparator;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -98,6 +101,8 @@
import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException;
+import static org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils.filterHomeSubCluster;
+
/**
* ZooKeeper implementation of {@link FederationStateStore}.
*
@@ -136,6 +141,7 @@
private String membershipZNode;
private String policiesZNode;
private String reservationsZNode;
+ private int maxAppsInStateStore;
private volatile Clock clock = SystemClock.getInstance();
@@ -147,6 +153,10 @@
public void init(Configuration conf) throws YarnException {
LOG.info("Initializing ZooKeeper connection");
+ maxAppsInStateStore = conf.getInt(
+ YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
+ YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_MAX_APPLICATIONS);
+
baseZNode = conf.get(
YarnConfiguration.FEDERATION_STATESTORE_ZK_PARENT_PATH,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_ZK_PARENT_PATH);
@@ -258,24 +268,44 @@
@Override
public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest request) throws YarnException {
- long start = clock.getTime();
- List<ApplicationHomeSubCluster> result = new ArrayList<>();
+
+ if (request == null) {
+ throw new YarnException("Missing getApplicationsHomeSubCluster request");
+ }
try {
- for (String child : zkManager.getChildren(appsZNode)) {
- ApplicationId appId = ApplicationId.fromString(child);
- SubClusterId homeSubCluster = getApp(appId);
- ApplicationHomeSubCluster app =
- ApplicationHomeSubCluster.newInstance(appId, homeSubCluster);
- result.add(app);
- }
+ long start = clock.getTime();
+ SubClusterId requestSC = request.getSubClusterId();
+ List<String> children = zkManager.getChildren(appsZNode);
+ List<ApplicationHomeSubCluster> result = children.stream()
+ .map(child -> generateAppHomeSC(child))
+ .sorted(Comparator.comparing(ApplicationHomeSubCluster::getCreateTime).reversed())
+ .filter(appHomeSC -> filterHomeSubCluster(requestSC, appHomeSC.getHomeSubCluster()))
+ .limit(maxAppsInStateStore)
+ .collect(Collectors.toList());
+ long end = clock.getTime();
+ opDurations.addGetAppsHomeSubClusterDuration(start, end);
+ LOG.info("filterSubClusterId = {}, appCount = {}.", requestSC, result.size());
+ return GetApplicationsHomeSubClusterResponse.newInstance(result);
} catch (Exception e) {
String errMsg = "Cannot get apps: " + e.getMessage();
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
}
- long end = clock.getTime();
- opDurations.addGetAppsHomeSubClusterDuration(start, end);
- return GetApplicationsHomeSubClusterResponse.newInstance(result);
+
+ throw new YarnException("Cannot get app by request");
+ }
+
+ private ApplicationHomeSubCluster generateAppHomeSC(String appId) {
+ try {
+ ApplicationId applicationId = ApplicationId.fromString(appId);
+ SubClusterId homeSubCluster = getApp(applicationId);
+ ApplicationHomeSubCluster app =
+ ApplicationHomeSubCluster.newInstance(applicationId, Time.now(), homeSubCluster);
+ return app;
+ } catch (Exception ex) {
+ LOG.error("get homeSubCluster by appId = {}.", appId);
+ }
+ return null;
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/ApplicationHomeSubCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/ApplicationHomeSubCluster.java
index 5e4c7cc..898e11f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/ApplicationHomeSubCluster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/ApplicationHomeSubCluster.java
@@ -51,6 +51,17 @@
return appMapping;
}
+ @Private
+ @Unstable
+ public static ApplicationHomeSubCluster newInstance(ApplicationId appId, long createTime,
+ SubClusterId homeSubCluster) {
+ ApplicationHomeSubCluster appMapping = Records.newRecord(ApplicationHomeSubCluster.class);
+ appMapping.setApplicationId(appId);
+ appMapping.setHomeSubCluster(homeSubCluster);
+ appMapping.setCreateTime(createTime);
+ return appMapping;
+ }
+
/**
* Get the {@link ApplicationId} representing the unique identifier of the
* application.
@@ -91,6 +102,25 @@
@Unstable
public abstract void setHomeSubCluster(SubClusterId homeSubCluster);
+ /**
+ * Get the create time of the subcluster.
+ *
+ * @return the state of the subcluster
+ */
+ @Public
+ @Unstable
+ public abstract long getCreateTime();
+
+ /**
+ * Set the create time of the subcluster.
+ *
+ * @param time the last heartbeat time of the subcluster
+ */
+ @Private
+ @Unstable
+ public abstract void setCreateTime(long time);
+
+
@Override
public boolean equals(Object obj) {
if (this == obj) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationsHomeSubClusterRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationsHomeSubClusterRequest.java
index 6054972..06b6987 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationsHomeSubClusterRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationsHomeSubClusterRequest.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.yarn.server.federation.store.records;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
@@ -37,4 +38,33 @@
return request;
}
+ @Private
+ @Unstable
+ public static GetApplicationsHomeSubClusterRequest
+ newInstance(SubClusterId subClusterId) {
+ GetApplicationsHomeSubClusterRequest request =
+ Records.newRecord(GetApplicationsHomeSubClusterRequest.class);
+ request.setSubClusterId(subClusterId);
+ return request;
+ }
+
+ /**
+ * Get the {@link SubClusterId} representing the unique identifier of the
+ * subcluster.
+ *
+ * @return the subcluster identifier
+ */
+ @Public
+ @Unstable
+ public abstract SubClusterId getSubClusterId();
+
+ /**
+ * Set the {@link SubClusterId} representing the unique identifier of the
+ * subcluster.
+ *
+ * @param subClusterId the subcluster identifier
+ */
+ @Public
+ @Unstable
+ public abstract void setSubClusterId(SubClusterId subClusterId);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java
index 05b0b62..a72a431 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/ApplicationHomeSubClusterPBImpl.java
@@ -149,6 +149,16 @@
this.homeSubCluster = homeSubCluster;
}
+ @Override
+ public long getCreateTime() {
+ return 0;
+ }
+
+ @Override
+ public void setCreateTime(long time) {
+
+ }
+
private SubClusterId convertFromProtoFormat(SubClusterIdProto subClusterId) {
return new SubClusterIdPBImpl(subClusterId);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsHomeSubClusterRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsHomeSubClusterRequestPBImpl.java
index a3c1c1a..1a75044 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsHomeSubClusterRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationsHomeSubClusterRequestPBImpl.java
@@ -19,10 +19,13 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.GetApplicationsHomeSubClusterRequestProto;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.GetApplicationsHomeSubClusterRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
/**
* Protocol buffer based implementation of
@@ -75,4 +78,37 @@
return TextFormat.shortDebugString(getProto());
}
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = GetApplicationsHomeSubClusterRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public SubClusterId getSubClusterId() {
+ GetApplicationsHomeSubClusterRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasSubClusterId()) {
+ return null;
+ }
+ return convertFromProtoFormat(p.getSubClusterId());
+ }
+
+ @Override
+ public void setSubClusterId(SubClusterId subClusterId) {
+ maybeInitBuilder();
+ if (subClusterId == null) {
+ builder.clearSubClusterId();
+ return;
+ }
+ builder.setSubClusterId(convertToProtoFormat(subClusterId));
+ }
+
+ private SubClusterId convertFromProtoFormat(YarnServerFederationProtos.SubClusterIdProto sc) {
+ return new SubClusterIdPBImpl(sc);
+ }
+
+ private YarnServerFederationProtos.SubClusterIdProto convertToProtoFormat(SubClusterId sc) {
+ return ((SubClusterIdPBImpl) sc).getProto();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
index 7dc53f8..52ef725 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -279,4 +280,30 @@
LOG.debug("NULL Credentials specified for Store connection, so ignoring");
}
}
+
+ /**
+ * Filter HomeSubCluster based on Filter SubCluster.
+ *
+ * @param filterSubCluster filter query conditions
+ * @param homeSubCluster homeSubCluster
+ * @return return true, if match filter conditions,
+ * return false, if not match filter conditions.
+ */
+ public static boolean filterHomeSubCluster(SubClusterId filterSubCluster,
+ SubClusterId homeSubCluster) {
+
+ // If the filter condition is empty,
+ // it means that homeSubCluster needs to be added
+ if (filterSubCluster == null) {
+ return true;
+ }
+
+ // If the filter condition filterSubCluster is not empty,
+ // and filterSubCluster is equal to homeSubCluster, it needs to be added
+ if (filterSubCluster.equals(homeSubCluster)) {
+ return true;
+ }
+
+ return false;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto
index ff2b970..0544a26 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto
@@ -97,6 +97,7 @@
message ApplicationHomeSubClusterProto {
optional ApplicationIdProto application_id = 1;
optional SubClusterIdProto home_sub_cluster = 2;
+ optional int64 create_time = 3;
}
message AddApplicationHomeSubClusterRequestProto {
@@ -123,7 +124,7 @@
}
message GetApplicationsHomeSubClusterRequestProto {
-
+ optional SubClusterIdProto sub_cluster_id = 1;
}
message GetApplicationsHomeSubClusterResponseProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
index 2587626..d5493f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
@@ -87,6 +87,8 @@
private static final MonotonicClock CLOCK = new MonotonicClock();
private FederationStateStore stateStore;
+ private static final int NUM_APPS_10 = 10;
+ private static final int NUM_APPS_20 = 20;
protected abstract FederationStateStore createStateStore();
@@ -417,6 +419,89 @@
}
@Test
+ public void testGetApplicationsHomeSubClusterEmpty() throws Exception {
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing getApplicationsHomeSubCluster request",
+ () -> stateStore.getApplicationsHomeSubCluster(null));
+ }
+
+ @Test
+ public void testGetApplicationsHomeSubClusterFilter() throws Exception {
+ // Add ApplicationHomeSC - SC1
+ long now = Time.now();
+
+ Set<ApplicationHomeSubCluster> appHomeSubClusters = new HashSet<>();
+
+ for (int i = 0; i < NUM_APPS_10; i++) {
+ ApplicationId appId = ApplicationId.newInstance(now, i);
+ SubClusterId subClusterId = SubClusterId.newInstance("SC1");
+ addApplicationHomeSC(appId, subClusterId);
+ ApplicationHomeSubCluster ahsc =
+ ApplicationHomeSubCluster.newInstance(appId, subClusterId);
+ appHomeSubClusters.add(ahsc);
+ }
+
+ // Add ApplicationHomeSC - SC2
+ for (int i = 10; i < NUM_APPS_20; i++) {
+ ApplicationId appId = ApplicationId.newInstance(now, i);
+ SubClusterId subClusterId = SubClusterId.newInstance("SC2");
+ addApplicationHomeSC(appId, subClusterId);
+ }
+
+ GetApplicationsHomeSubClusterRequest getRequest =
+ GetApplicationsHomeSubClusterRequest.newInstance();
+ getRequest.setSubClusterId(SubClusterId.newInstance("SC1"));
+
+ GetApplicationsHomeSubClusterResponse result =
+ stateStore.getApplicationsHomeSubCluster(getRequest);
+ Assert.assertNotNull(result);
+
+ List<ApplicationHomeSubCluster> items = result.getAppsHomeSubClusters();
+ Assert.assertNotNull(items);
+ Assert.assertEquals(10, items.size());
+
+ for (ApplicationHomeSubCluster item : items) {
+ Assert.assertTrue(appHomeSubClusters.contains(item));
+ }
+ }
+
+ @Test
+ public void testGetApplicationsHomeSubClusterLimit() throws Exception {
+ // Add ApplicationHomeSC - SC1
+ long now = Time.now();
+
+ for (int i = 0; i < 50; i++) {
+ ApplicationId appId = ApplicationId.newInstance(now, i);
+ SubClusterId subClusterId = SubClusterId.newInstance("SC1");
+ addApplicationHomeSC(appId, subClusterId);
+ }
+
+ GetApplicationsHomeSubClusterRequest getRequest =
+ GetApplicationsHomeSubClusterRequest.newInstance();
+ getRequest.setSubClusterId(SubClusterId.newInstance("SC1"));
+ GetApplicationsHomeSubClusterResponse result =
+ stateStore.getApplicationsHomeSubCluster(getRequest);
+ Assert.assertNotNull(result);
+
+ // Write 50 records, but get 10 records because the maximum number is limited to 10
+ List<ApplicationHomeSubCluster> items = result.getAppsHomeSubClusters();
+ Assert.assertNotNull(items);
+ Assert.assertEquals(10, items.size());
+
+ GetApplicationsHomeSubClusterRequest getRequest1 =
+ GetApplicationsHomeSubClusterRequest.newInstance();
+ getRequest1.setSubClusterId(SubClusterId.newInstance("SC2"));
+ GetApplicationsHomeSubClusterResponse result1 =
+ stateStore.getApplicationsHomeSubCluster(getRequest1);
+ Assert.assertNotNull(result1);
+
+ // SC2 data does not exist, so the number of returned records is 0
+ List<ApplicationHomeSubCluster> items1 = result1.getAppsHomeSubClusters();
+ Assert.assertNotNull(items1);
+ Assert.assertEquals(0, items1.size());
+ }
+
+ @Test
public void testUpdateApplicationHomeSubCluster() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 1);
SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java
index f11a259..e90f1dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java
@@ -31,6 +31,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.slf4j.Logger;
@@ -50,6 +51,7 @@
" CREATE TABLE applicationsHomeSubCluster ("
+ " applicationId varchar(64) NOT NULL,"
+ " homeSubCluster varchar(256) NOT NULL,"
+ + " createTime datetime NOT NULL,"
+ " CONSTRAINT pk_applicationId PRIMARY KEY (applicationId))";
private static final String TABLE_MEMBERSHIP =
@@ -149,8 +151,9 @@
+ " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)"
+ " MODIFIES SQL DATA BEGIN ATOMIC"
+ " INSERT INTO applicationsHomeSubCluster "
- + " (applicationId,homeSubCluster) "
- + " (SELECT applicationId_IN, homeSubCluster_IN"
+ + " (applicationId,homeSubCluster,createTime) "
+ + " (SELECT applicationId_IN, homeSubCluster_IN, "
+ + " NOW() AT TIME ZONE INTERVAL '0:00' HOUR TO MINUTE"
+ " FROM applicationsHomeSubCluster"
+ " WHERE applicationId = applicationId_IN"
+ " HAVING COUNT(*) = 0 );"
@@ -179,11 +182,16 @@
+ " WHERE applicationId = applicationID_IN; END";
private static final String SP_GETAPPLICATIONSHOMESUBCLUSTER =
- "CREATE PROCEDURE sp_getApplicationsHomeSubCluster()"
+ "CREATE PROCEDURE sp_getApplicationsHomeSubCluster("
+ + "IN limit_IN int, IN homeSubCluster_IN varchar(256))"
+ " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
+ " DECLARE result CURSOR FOR"
- + " SELECT applicationId, homeSubCluster"
- + " FROM applicationsHomeSubCluster; OPEN result; END";
+ + " SELECT applicationId, homeSubCluster, createTime"
+ + " FROM applicationsHomeSubCluster "
+ + " WHERE ROWNUM() <= limit_IN AND "
+ + " (homeSubCluster_IN = '' OR homeSubCluster = homeSubCluster_IN) "
+ + " ORDER BY createTime desc; "
+ + " OPEN result; END";
private static final String SP_DELETEAPPLICATIONHOMESUBCLUSTER =
"CREATE PROCEDURE sp_deleteApplicationHomeSubCluster("
@@ -315,6 +323,7 @@
@Override
public void init(Configuration conf) {
try {
+ conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10);
super.init(conf);
conn = super.conn;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
index c29fc03..70dda22 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.federation.store.impl;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
/**
@@ -29,6 +30,7 @@
@Override
protected FederationStateStore createStateStore() {
Configuration conf = new Configuration();
+ conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10);
super.setConf(conf);
return new MemoryFederationStateStore();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
index d0dec26..6f5f198 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
@@ -84,6 +84,7 @@
DATABASE_PASSWORD);
conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL,
DATABASE_URL + System.currentTimeMillis());
+ conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10);
super.setConf(conf);
return new HSQLDBFederationStateStore();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java
index 788adef..4571371 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java
@@ -68,6 +68,7 @@
Configuration conf = new YarnConfiguration();
conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
+ conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS, 10);
setConf(conf);
} catch (Exception e) {
LOG.error("Cannot initialize ZooKeeper store", e);