YARN-11180. Refactor some code of getNewApplication, submitApplication etc. (#4618)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterAuditLogger.java
index cc82087..a89d0e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterAuditLogger.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterAuditLogger.java
@@ -49,6 +49,8 @@
public static final String SUBMIT_NEW_APP = "Submit New App";
public static final String FORCE_KILL_APP = "Force Kill App";
public static final String GET_APP_REPORT = "Get Application Report";
+ public static final String TARGET_CLIENT_RM_SERVICE = "RouterClientRMService";
+ public static final String UNKNOWN = "UNKNOWN";
}
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index 7fd1003..947e5f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -137,6 +137,13 @@
import org.apache.hadoop.classification.VisibleForTesting;
+import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_NEW_APP;
+import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP;
+import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_APP_REPORT;
+import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.FORCE_KILL_APP;
+import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.TARGET_CLIENT_RM_SERVICE;
+import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.UNKNOWN;
+
/**
* Extends the {@code AbstractRequestInterceptorClient} class and provides an
* implementation for federation of YARN RM and scaling an application across
@@ -228,8 +235,8 @@
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
UserGroupInformation realUser = user;
if (serviceAuthEnabled) {
- realUser = UserGroupInformation.createProxyUser(user.getShortUserName(),
- UserGroupInformation.getLoginUser());
+ realUser = UserGroupInformation.createProxyUser(
+ user.getShortUserName(), UserGroupInformation.getLoginUser());
}
clientRMProxy = FederationProxyProviderUtil.createRMProxy(getConf(),
ApplicationClientProtocol.class, subClusterId, realUser);
@@ -237,21 +244,17 @@
RouterServerUtil.logAndThrowException(
"Unable to create the interface to reach the SubCluster " + subClusterId, e);
}
-
clientRMProxies.put(subClusterId, clientRMProxy);
return clientRMProxy;
}
private SubClusterId getRandomActiveSubCluster(
- Map<SubClusterId, SubClusterInfo> activeSubclusters)
- throws YarnException {
-
- if (activeSubclusters == null || activeSubclusters.size() < 1) {
+ Map<SubClusterId, SubClusterInfo> activeSubClusters) throws YarnException {
+ if (activeSubClusters == null || activeSubClusters.isEmpty()) {
RouterServerUtil.logAndThrowException(
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
}
- List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
-
+ List<SubClusterId> list = new ArrayList<>(activeSubClusters.keySet());
return list.get(rand.nextInt(list.size()));
}
@@ -276,45 +279,43 @@
public GetNewApplicationResponse getNewApplication(
GetNewApplicationRequest request) throws YarnException, IOException {
- long startTime = clock.getTime();
+ if (request == null) {
+ routerMetrics.incrAppsFailedCreated();
+ String errMsg = "Missing getNewApplication request.";
+ RouterAuditLogger.logFailure(user.getShortUserName(), GET_NEW_APP, UNKNOWN,
+ TARGET_CLIENT_RM_SERVICE, errMsg);
+ RouterServerUtil.logAndThrowException(errMsg, null);
+ }
+ long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subClustersActive =
federationFacade.getSubClusters(true);
for (int i = 0; i < numSubmitRetries; ++i) {
SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
- LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId);
- ApplicationClientProtocol clientRMProxy =
- getClientRMProxyForSubCluster(subClusterId);
+ LOG.info("getNewApplication try #{} on SubCluster {}.", i, subClusterId);
+ ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
GetNewApplicationResponse response = null;
try {
response = clientRMProxy.getNewApplication(request);
} catch (Exception e) {
- LOG.warn("Unable to create a new ApplicationId in SubCluster "
- + subClusterId.getId(), e);
- }
-
- if (response != null) {
-
- long stopTime = clock.getTime();
- routerMetrics.succeededAppsCreated(stopTime - startTime);
- RouterAuditLogger.logSuccess(user.getShortUserName(),
- RouterAuditLogger.AuditConstants.GET_NEW_APP,
- "RouterClientRMService", response.getApplicationId());
- return response;
- } else {
- // Empty response from the ResourceManager.
- // Blacklist this subcluster for this request.
+ LOG.warn("Unable to create a new ApplicationId in SubCluster {}.", subClusterId.getId(), e);
subClustersActive.remove(subClusterId);
}
+ if (response != null) {
+ long stopTime = clock.getTime();
+ routerMetrics.succeededAppsCreated(stopTime - startTime);
+ RouterAuditLogger.logSuccess(user.getShortUserName(), GET_NEW_APP,
+ TARGET_CLIENT_RM_SERVICE, response.getApplicationId());
+ return response;
+ }
}
routerMetrics.incrAppsFailedCreated();
- String errMsg = "Fail to create a new application.";
- RouterAuditLogger.logFailure(user.getShortUserName(),
- RouterAuditLogger.AuditConstants.GET_NEW_APP, "UNKNOWN",
- "RouterClientRMService", errMsg);
+ String errMsg = "Failed to create a new application.";
+ RouterAuditLogger.logFailure(user.getShortUserName(), GET_NEW_APP, UNKNOWN,
+ TARGET_CLIENT_RM_SERVICE, errMsg);
throw new YarnException(errMsg);
}
@@ -387,21 +388,20 @@
public SubmitApplicationResponse submitApplication(
SubmitApplicationRequest request) throws YarnException, IOException {
- long startTime = clock.getTime();
-
if (request == null || request.getApplicationSubmissionContext() == null
- || request.getApplicationSubmissionContext()
- .getApplicationId() == null) {
+ || request.getApplicationSubmissionContext().getApplicationId() == null) {
routerMetrics.incrAppsFailedSubmitted();
String errMsg =
- "Missing submitApplication request or applicationSubmissionContext "
- + "information.";
- RouterAuditLogger.logFailure(user.getShortUserName(),
- RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN",
- "RouterClientRMService", errMsg);
- throw new YarnException(errMsg);
+ "Missing submitApplication request or applicationSubmissionContext information.";
+ RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
+ TARGET_CLIENT_RM_SERVICE, errMsg);
+ RouterServerUtil.logAndThrowException(errMsg, null);
}
+ SubmitApplicationResponse response = null;
+
+ long startTime = clock.getTime();
+
ApplicationId applicationId =
request.getApplicationSubmissionContext().getApplicationId();
@@ -411,8 +411,9 @@
SubClusterId subClusterId = policyFacade.getHomeSubcluster(
request.getApplicationSubmissionContext(), blacklist);
- LOG.info("submitApplication appId {} try #{} on SubCluster {}.", applicationId, i,
- subClusterId);
+
+ LOG.info("submitApplication appId {} try #{} on SubCluster {}.",
+ applicationId, i, subClusterId);
ApplicationHomeSubCluster appHomeSubCluster =
ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
@@ -425,12 +426,12 @@
federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
} catch (YarnException e) {
routerMetrics.incrAppsFailedSubmitted();
- String message = "Unable to insert the ApplicationId " + applicationId
- + " into the FederationStateStore";
- RouterAuditLogger.logFailure(user.getShortUserName(),
- RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN",
- "RouterClientRMService", message, applicationId, subClusterId);
- throw new YarnException(message, e);
+ String message =
+ String.format("Unable to insert the ApplicationId %s into the FederationStateStore.",
+ applicationId);
+ RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
+ TARGET_CLIENT_RM_SERVICE, message, applicationId, subClusterId);
+ RouterServerUtil.logAndThrowException(message, e);
}
} else {
try {
@@ -438,19 +439,19 @@
// the new subClusterId we have selected
federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster);
} catch (YarnException e) {
- String message = "Unable to update the ApplicationId " + applicationId
- + " into the FederationStateStore";
+ String message =
+ String.format("Unable to update the ApplicationId %s into the FederationStateStore.",
+ applicationId);
SubClusterId subClusterIdInStateStore =
federationFacade.getApplicationHomeSubCluster(applicationId);
if (subClusterId == subClusterIdInStateStore) {
- LOG.info("Application {} already submitted on SubCluster {}.", applicationId,
- subClusterId);
+ LOG.info("Application {} already submitted on SubCluster {}.",
+ applicationId, subClusterId);
} else {
routerMetrics.incrAppsFailedSubmitted();
- RouterAuditLogger.logFailure(user.getShortUserName(),
- RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN",
- "RouterClientRMService", message, applicationId, subClusterId);
- throw new YarnException(message, e);
+ RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
+ TARGET_CLIENT_RM_SERVICE, message, applicationId, subClusterId);
+ RouterServerUtil.logAndThrowException(message, e);
}
}
}
@@ -458,7 +459,6 @@
ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId);
- SubmitApplicationResponse response = null;
try {
response = clientRMProxy.submitApplication(request);
} catch (Exception e) {
@@ -472,9 +472,8 @@
applicationId, subClusterId);
long stopTime = clock.getTime();
routerMetrics.succeededAppsSubmitted(stopTime - startTime);
- RouterAuditLogger.logSuccess(user.getShortUserName(),
- RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP,
- "RouterClientRMService", applicationId, subClusterId);
+ RouterAuditLogger.logSuccess(user.getShortUserName(), SUBMIT_NEW_APP,
+ TARGET_CLIENT_RM_SERVICE, applicationId, subClusterId);
return response;
} else {
// Empty response from the ResourceManager.
@@ -484,13 +483,11 @@
}
routerMetrics.incrAppsFailedSubmitted();
- String errMsg = "Application "
- + request.getApplicationSubmissionContext().getApplicationName()
- + " with appId " + applicationId + " failed to be submitted.";
- RouterAuditLogger.logFailure(user.getShortUserName(),
- RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN",
- "RouterClientRMService", errMsg, applicationId);
- throw new YarnException(errMsg);
+ String msg = String.format("Application %s with appId %s failed to be submitted.",
+ request.getApplicationSubmissionContext().getApplicationName(), applicationId);
+ RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
+ TARGET_CLIENT_RM_SERVICE, msg, applicationId);
+ throw new YarnException(msg);
}
/**
@@ -513,16 +510,16 @@
public KillApplicationResponse forceKillApplication(
KillApplicationRequest request) throws YarnException, IOException {
- long startTime = clock.getTime();
-
if (request == null || request.getApplicationId() == null) {
routerMetrics.incrAppsFailedKilled();
- String message = "Missing forceKillApplication request or ApplicationId.";
- RouterAuditLogger.logFailure(user.getShortUserName(),
- RouterAuditLogger.AuditConstants.FORCE_KILL_APP, "UNKNOWN",
- "RouterClientRMService", message);
- throw new YarnException(message);
+ String msg = "Missing forceKillApplication request or ApplicationId.";
+ RouterAuditLogger.logFailure(user.getShortUserName(), FORCE_KILL_APP, UNKNOWN,
+ TARGET_CLIENT_RM_SERVICE, msg);
+ RouterServerUtil.logAndThrowException(msg, null);
}
+
+ long startTime = clock.getTime();
+
ApplicationId applicationId = request.getApplicationId();
SubClusterId subClusterId = null;
@@ -531,12 +528,11 @@
.getApplicationHomeSubCluster(request.getApplicationId());
} catch (YarnException e) {
routerMetrics.incrAppsFailedKilled();
- String msg = "Application " + applicationId
- + " does not exist in FederationStateStore";
- RouterAuditLogger.logFailure(user.getShortUserName(),
- RouterAuditLogger.AuditConstants.FORCE_KILL_APP, "UNKNOWN",
- "RouterClientRMService", msg, applicationId);
- throw new YarnException(msg, e);
+ String msg =
+ String.format("Application %s does not exist in FederationStateStore.", applicationId);
+ RouterAuditLogger.logFailure(user.getShortUserName(), FORCE_KILL_APP, UNKNOWN,
+ TARGET_CLIENT_RM_SERVICE, msg, applicationId);
+ RouterServerUtil.logAndThrowException(msg, e);
}
ApplicationClientProtocol clientRMProxy =
@@ -548,11 +544,10 @@
response = clientRMProxy.forceKillApplication(request);
} catch (Exception e) {
routerMetrics.incrAppsFailedKilled();
- RouterAuditLogger.logFailure(user.getShortUserName(),
- RouterAuditLogger.AuditConstants.FORCE_KILL_APP, "UNKNOWN",
- "RouterClientRMService", "Unable to kill the application report",
- applicationId, subClusterId);
- throw e;
+ String msg = "Unable to kill the application report.";
+ RouterAuditLogger.logFailure(user.getShortUserName(), FORCE_KILL_APP, UNKNOWN,
+ TARGET_CLIENT_RM_SERVICE, msg, applicationId, subClusterId);
+ RouterServerUtil.logAndThrowException(msg, e);
}
if (response == null) {
@@ -562,9 +557,8 @@
long stopTime = clock.getTime();
routerMetrics.succeededAppsKilled(stopTime - startTime);
- RouterAuditLogger.logSuccess(user.getShortUserName(),
- RouterAuditLogger.AuditConstants.FORCE_KILL_APP,
- "RouterClientRMService", applicationId);
+ RouterAuditLogger.logSuccess(user.getShortUserName(), FORCE_KILL_APP,
+ TARGET_CLIENT_RM_SERVICE, applicationId);
return response;
}
@@ -588,18 +582,15 @@
public GetApplicationReportResponse getApplicationReport(
GetApplicationReportRequest request) throws YarnException, IOException {
- long startTime = clock.getTime();
-
if (request == null || request.getApplicationId() == null) {
routerMetrics.incrAppsFailedRetrieved();
- String errMsg =
- "Missing getApplicationReport request or applicationId information.";
- RouterAuditLogger.logFailure(user.getShortUserName(),
- RouterAuditLogger.AuditConstants.GET_APP_REPORT, "UNKNOWN",
- "RouterClientRMService", errMsg);
- throw new YarnException(errMsg);
+ String errMsg = "Missing getApplicationReport request or applicationId information.";
+ RouterAuditLogger.logFailure(user.getShortUserName(), GET_APP_REPORT, UNKNOWN,
+ TARGET_CLIENT_RM_SERVICE, errMsg);
+ RouterServerUtil.logAndThrowException(errMsg, null);
}
+ long startTime = clock.getTime();
SubClusterId subClusterId = null;
try {
@@ -607,29 +598,26 @@
.getApplicationHomeSubCluster(request.getApplicationId());
} catch (YarnException e) {
routerMetrics.incrAppsFailedRetrieved();
- String errMsg = "Application " + request.getApplicationId()
- + " does not exist in FederationStateStore";
- RouterAuditLogger.logFailure(user.getShortUserName(),
- RouterAuditLogger.AuditConstants.GET_APP_REPORT, "UNKNOWN",
- "RouterClientRMService", errMsg, request.getApplicationId());
- throw new YarnException(errMsg, e);
+ String errMsg = String.format("Application %s does not exist in FederationStateStore.",
+ request.getApplicationId());
+ RouterAuditLogger.logFailure(user.getShortUserName(), GET_APP_REPORT, UNKNOWN,
+ TARGET_CLIENT_RM_SERVICE, errMsg, request.getApplicationId());
+ RouterServerUtil.logAndThrowException(errMsg, e);
}
ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId);
-
GetApplicationReportResponse response = null;
+
try {
response = clientRMProxy.getApplicationReport(request);
} catch (Exception e) {
routerMetrics.incrAppsFailedRetrieved();
- String errMsg = "Unable to get the application report for " + request
- .getApplicationId() + "to SubCluster " + subClusterId.getId();
- RouterAuditLogger.logFailure(user.getShortUserName(),
- RouterAuditLogger.AuditConstants.GET_APP_REPORT, "UNKNOWN",
- "RouterClientRMService", errMsg, request.getApplicationId(),
- subClusterId);
- throw e;
+ String errMsg = String.format("Unable to get the application report for %s to SubCluster %s.",
+ request.getApplicationId(), subClusterId.getId());
+ RouterAuditLogger.logFailure(user.getShortUserName(), GET_APP_REPORT, UNKNOWN,
+ TARGET_CLIENT_RM_SERVICE, errMsg, request.getApplicationId(), subClusterId);
+ RouterServerUtil.logAndThrowException(errMsg, e);
}
if (response == null) {
@@ -637,12 +625,10 @@
+ "the application {} to SubCluster {}.",
request.getApplicationId(), subClusterId.getId());
}
-
long stopTime = clock.getTime();
routerMetrics.succeededAppsRetrieved(stopTime - startTime);
- RouterAuditLogger.logSuccess(user.getShortUserName(),
- RouterAuditLogger.AuditConstants.GET_APP_REPORT,
- "RouterClientRMService", request.getApplicationId());
+ RouterAuditLogger.logSuccess(user.getShortUserName(), GET_APP_REPORT,
+ TARGET_CLIENT_RM_SERVICE, request.getApplicationId());
return response;
}
@@ -670,56 +656,48 @@
throws YarnException, IOException {
if (request == null) {
routerMetrics.incrMultipleAppsFailedRetrieved();
- RouterServerUtil.logAndThrowException(
- "Missing getApplications request.",
- null);
+ RouterServerUtil.logAndThrowException("Missing getApplications request.", null);
}
long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subclusters =
federationFacade.getSubClusters(true);
ClientMethod remoteMethod = new ClientMethod("getApplications",
new Class[] {GetApplicationsRequest.class}, new Object[] {request});
- Map<SubClusterId, GetApplicationsResponse> applications;
-
+ Map<SubClusterId, GetApplicationsResponse> applications = null;
try {
applications = invokeConcurrent(subclusters.keySet(), remoteMethod,
GetApplicationsResponse.class);
-
} catch (Exception ex) {
routerMetrics.incrMultipleAppsFailedRetrieved();
- LOG.error("Unable to get applications due to exception.", ex);
- throw ex;
+ RouterServerUtil.logAndThrowException("Unable to get applications due to exception.", ex);
}
long stopTime = clock.getTime();
routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime);
// Merge the Application Reports
- return RouterYarnClientUtils.mergeApplications(applications.values(),
- returnPartialReport);
+ return RouterYarnClientUtils.mergeApplications(applications.values(), returnPartialReport);
}
@Override
public GetClusterMetricsResponse getClusterMetrics(
GetClusterMetricsRequest request) throws YarnException, IOException {
+ if (request == null) {
+ routerMetrics.incrGetClusterMetricsFailedRetrieved();
+ RouterServerUtil.logAndThrowException("Missing getClusterMetrics request.", null);
+ }
long startTime = clock.getTime();
- Map<SubClusterId, SubClusterInfo> subclusters =
- federationFacade.getSubClusters(true);
ClientMethod remoteMethod = new ClientMethod("getClusterMetrics",
new Class[] {GetClusterMetricsRequest.class}, new Object[] {request});
- Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics;
-
+ Collection<GetClusterMetricsResponse> clusterMetrics = null;
try {
- clusterMetrics = invokeConcurrent(subclusters.keySet(), remoteMethod,
- GetClusterMetricsResponse.class);
-
+ clusterMetrics = invokeAppClientProtocolMethod(
+ true, remoteMethod, GetClusterMetricsResponse.class);
} catch (Exception ex) {
routerMetrics.incrGetClusterMetricsFailedRetrieved();
- LOG.error("Unable to get cluster metrics due to exception.", ex);
- throw ex;
+ RouterServerUtil.logAndThrowException("Unable to get cluster metrics due to exception.", ex);
}
-
long stopTime = clock.getTime();
routerMetrics.succeededGetClusterMetricsRetrieved(stopTime - startTime);
- return RouterYarnClientUtils.merge(clusterMetrics.values());
+ return RouterYarnClientUtils.merge(clusterMetrics);
}
<R> Map<SubClusterId, R> invokeConcurrent(ArrayList<SubClusterId> clusterIds,
@@ -804,8 +782,7 @@
clusterNodes.put(subClusterId, response);
} catch (Exception ex) {
routerMetrics.incrClusterNodesFailedRetrieved();
- LOG.error("Unable to get cluster nodes due to exception.", ex);
- throw ex;
+ RouterServerUtil.logAndThrowException("Unable to get cluster nodes due to exception.", ex);
}
}
long stopTime = clock.getTime();
@@ -850,14 +827,13 @@
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getQueueUserAcls",
new Class[] {GetQueueUserAclsInfoRequest.class}, new Object[] {request});
- Collection<GetQueueUserAclsInfoResponse> queueUserAcls;
+ Collection<GetQueueUserAclsInfoResponse> queueUserAcls = null;
try {
queueUserAcls = invokeAppClientProtocolMethod(true, remoteMethod,
GetQueueUserAclsInfoResponse.class);
} catch (Exception ex) {
routerMetrics.incrQueueUserAclsFailedRetrieved();
- LOG.error("Unable to get queue user Acls due to exception.", ex);
- throw ex;
+ RouterServerUtil.logAndThrowException("Unable to get queue user Acls due to exception.", ex);
}
long stopTime = clock.getTime();
routerMetrics.succeededGetQueueUserAclsRetrieved(stopTime - startTime);
@@ -931,14 +907,14 @@
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("listReservations",
new Class[] {ReservationListRequest.class}, new Object[] {request});
- Collection<ReservationListResponse> listResponses;
+ Collection<ReservationListResponse> listResponses = null;
try {
listResponses = invokeAppClientProtocolMethod(true, remoteMethod,
ReservationListResponse.class);
} catch (Exception ex) {
routerMetrics.incrListReservationsFailedRetrieved();
- LOG.error("Unable to list reservations node due to exception.", ex);
- throw ex;
+ RouterServerUtil.logAndThrowException(
+ "Unable to list reservations node due to exception.", ex);
}
long stopTime = clock.getTime();
routerMetrics.succeededListReservationsRetrieved(stopTime - startTime);
@@ -986,14 +962,13 @@
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getNodeToLabels",
new Class[] {GetNodesToLabelsRequest.class}, new Object[] {request});
- Collection<GetNodesToLabelsResponse> clusterNodes;
+ Collection<GetNodesToLabelsResponse> clusterNodes = null;
try {
clusterNodes = invokeAppClientProtocolMethod(true, remoteMethod,
GetNodesToLabelsResponse.class);
} catch (Exception ex) {
routerMetrics.incrNodeToLabelsFailedRetrieved();
- LOG.error("Unable to get label node due to exception.", ex);
- throw ex;
+ RouterServerUtil.logAndThrowException("Unable to get node label due to exception.", ex);
}
long stopTime = clock.getTime();
routerMetrics.succeededGetNodeToLabelsRetrieved(stopTime - startTime);
@@ -1010,15 +985,14 @@
}
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getLabelsToNodes",
- new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request});
- Collection<GetLabelsToNodesResponse> labelNodes;
+ new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request});
+ Collection<GetLabelsToNodesResponse> labelNodes = null;
try {
labelNodes = invokeAppClientProtocolMethod(true, remoteMethod,
GetLabelsToNodesResponse.class);
} catch (Exception ex) {
routerMetrics.incrLabelsToNodesFailedRetrieved();
- LOG.error("Unable to get label node due to exception.", ex);
- throw ex;
+ RouterServerUtil.logAndThrowException("Unable to get label node due to exception.", ex);
}
long stopTime = clock.getTime();
routerMetrics.succeededGetLabelsToNodesRetrieved(stopTime - startTime);
@@ -1035,15 +1009,15 @@
}
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getClusterNodeLabels",
- new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request});
- Collection<GetClusterNodeLabelsResponse> nodeLabels;
+ new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request});
+ Collection<GetClusterNodeLabelsResponse> nodeLabels = null;
try {
nodeLabels = invokeAppClientProtocolMethod(true, remoteMethod,
GetClusterNodeLabelsResponse.class);
} catch (Exception ex) {
routerMetrics.incrClusterNodeLabelsFailedRetrieved();
- LOG.error("Unable to get cluster nodeLabels due to exception.", ex);
- throw ex;
+ RouterServerUtil.logAndThrowException("Unable to get cluster nodeLabels due to exception.",
+ ex);
}
long stopTime = clock.getTime();
routerMetrics.succeededGetClusterNodeLabelsRetrieved(stopTime - startTime);
@@ -1096,15 +1070,15 @@
ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId);
- GetApplicationAttemptReportResponse response;
+ GetApplicationAttemptReportResponse response = null;
try {
response = clientRMProxy.getApplicationAttemptReport(request);
} catch (Exception e) {
routerMetrics.incrAppAttemptsFailedRetrieved();
- LOG.error("Unable to get the applicationAttempt report for {} "
- + "to SubCluster {}, error = {}.",
- request.getApplicationAttemptId(), subClusterId.getId(), e);
- throw e;
+ String msg = String.format(
+ "Unable to get the applicationAttempt report for %s to SubCluster %s.",
+ request.getApplicationAttemptId(), subClusterId.getId());
+ RouterServerUtil.logAndThrowException(msg, e);
}
if (response == null) {
@@ -1328,8 +1302,7 @@
} catch (YarnException e) {
routerMetrics.incrUpdateAppPriorityFailedRetrieved();
RouterServerUtil.logAndThrowException("Application " +
- request.getApplicationId() +
- " does not exist in FederationStateStore.", e);
+ request.getApplicationId() + " does not exist in FederationStateStore.", e);
}
ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
index f0aa480..6a47f15 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
@@ -156,14 +156,13 @@
stateStore = new MemoryFederationStateStore();
stateStore.init(this.getConf());
- FederationStateStoreFacade.getInstance().reinitialize(stateStore,
- getConf());
+ FederationStateStoreFacade.getInstance().reinitialize(stateStore, getConf());
stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
interceptor.setConf(this.getConf());
interceptor.init(user);
- subClusters = new ArrayList<SubClusterId>();
+ subClusters = new ArrayList<>();
try {
for (int i = 0; i < NUM_SUBCLUSTER; i++) {
@@ -197,7 +196,7 @@
// chain
conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
- + "," + TestableFederationClientInterceptor.class.getName());
+ + "," + TestableFederationClientInterceptor.class.getName());
conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
UniformBroadcastPolicyManager.class.getName());
@@ -212,17 +211,16 @@
* ApplicationId has to belong to one of the SubCluster in the cluster.
*/
@Test
- public void testGetNewApplication()
- throws YarnException, IOException, InterruptedException {
- LOG.info("Test FederationClientInterceptor: Get New Application");
+ public void testGetNewApplication() throws YarnException, IOException {
+ LOG.info("Test FederationClientInterceptor: Get New Application.");
GetNewApplicationRequest request = GetNewApplicationRequest.newInstance();
GetNewApplicationResponse response = interceptor.getNewApplication(request);
Assert.assertNotNull(response);
Assert.assertNotNull(response.getApplicationId());
- Assert.assertTrue(response.getApplicationId()
- .getClusterTimestamp() == ResourceManager.getClusterTimeStamp());
+ Assert.assertEquals(response.getApplicationId().getClusterTimestamp(),
+ ResourceManager.getClusterTimeStamp());
}
/**
@@ -232,10 +230,9 @@
@Test
public void testSubmitApplication()
throws YarnException, IOException {
- LOG.info("Test FederationClientInterceptor: Submit Application");
+ LOG.info("Test FederationClientInterceptor: Submit Application.");
- ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
- 1);
+ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
SubmitApplicationResponse response = interceptor.submitApplication(request);
@@ -249,14 +246,12 @@
private SubmitApplicationRequest mockSubmitApplicationRequest(
ApplicationId appId) {
ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
- ApplicationSubmissionContext context = ApplicationSubmissionContext
- .newInstance(appId, MockApps.newAppName(), "default",
- Priority.newInstance(APP_PRIORITY_ZERO), amContainerSpec, false, false, -1,
- Resources.createResource(
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
- "MockApp");
- SubmitApplicationRequest request = SubmitApplicationRequest
- .newInstance(context);
+ ApplicationSubmissionContext context = ApplicationSubmissionContext.newInstance(
+ appId, MockApps.newAppName(), "default",
+ Priority.newInstance(APP_PRIORITY_ZERO), amContainerSpec, false, false, -1,
+ Resources.createResource(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
+ "MockApp");
+ SubmitApplicationRequest request = SubmitApplicationRequest.newInstance(context);
return request;
}
@@ -297,37 +292,27 @@
*/
@Test
public void testSubmitApplicationEmptyRequest()
- throws YarnException, IOException, InterruptedException {
- LOG.info(
- "Test FederationClientInterceptor: Submit Application - Empty");
- try {
- interceptor.submitApplication(null);
- Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(
- e.getMessage().startsWith("Missing submitApplication request or "
- + "applicationSubmissionContext information."));
- }
- try {
- interceptor.submitApplication(SubmitApplicationRequest.newInstance(null));
- Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(
- e.getMessage().startsWith("Missing submitApplication request or "
- + "applicationSubmissionContext information."));
- }
- try {
- ApplicationSubmissionContext context = ApplicationSubmissionContext
- .newInstance(null, "", "", null, null, false, false, -1, null, null);
- SubmitApplicationRequest request =
- SubmitApplicationRequest.newInstance(context);
- interceptor.submitApplication(request);
- Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(
- e.getMessage().startsWith("Missing submitApplication request or "
- + "applicationSubmissionContext information."));
- }
+ throws Exception {
+ LOG.info("Test FederationClientInterceptor: Submit Application - Empty.");
+
+ // null request1
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing submitApplication request or applicationSubmissionContext information.",
+ () -> interceptor.submitApplication(null));
+
+ // null request2
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing submitApplication request or applicationSubmissionContext information.",
+ () -> interceptor.submitApplication(SubmitApplicationRequest.newInstance(null)));
+
+ // null request3
+ ApplicationSubmissionContext context = ApplicationSubmissionContext
+ .newInstance(null, "", "", null, null, false, false, -1, null, null);
+ SubmitApplicationRequest request =
+ SubmitApplicationRequest.newInstance(context);
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing submitApplication request or applicationSubmissionContext information.",
+ () -> interceptor.submitApplication(request));
}
/**
@@ -337,7 +322,7 @@
@Test
public void testForceKillApplication()
throws YarnException, IOException, InterruptedException {
- LOG.info("Test FederationClientInterceptor: Force Kill Application");
+ LOG.info("Test FederationClientInterceptor: Force Kill Application.");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
@@ -349,10 +334,8 @@
Assert.assertNotNull(response);
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
- KillApplicationRequest requestKill =
- KillApplicationRequest.newInstance(appId);
- KillApplicationResponse responseKill =
- interceptor.forceKillApplication(requestKill);
+ KillApplicationRequest requestKill = KillApplicationRequest.newInstance(appId);
+ KillApplicationResponse responseKill = interceptor.forceKillApplication(requestKill);
Assert.assertNotNull(responseKill);
}
@@ -361,22 +344,17 @@
* application does not exist in StateStore.
*/
@Test
- public void testForceKillApplicationNotExists()
- throws YarnException, IOException, InterruptedException {
- LOG.info("Test FederationClientInterceptor: "
- + "Force Kill Application - Not Exists");
+ public void testForceKillApplicationNotExists() throws Exception {
+ LOG.info("Test FederationClientInterceptor: Force Kill Application - Not Exists");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
KillApplicationRequest requestKill =
KillApplicationRequest.newInstance(appId);
- try {
- interceptor.forceKillApplication(requestKill);
- Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(e.getMessage().equals(
- "Application " + appId + " does not exist in FederationStateStore"));
- }
+
+ LambdaTestUtils.intercept(YarnException.class,
+ "Application " + appId + " does not exist in FederationStateStore.",
+ () -> interceptor.forceKillApplication(requestKill));
}
/**
@@ -385,24 +363,19 @@
*/
@Test
public void testForceKillApplicationEmptyRequest()
- throws YarnException, IOException, InterruptedException {
- LOG.info(
- "Test FederationClientInterceptor: Force Kill Application - Empty");
- try {
- interceptor.forceKillApplication(null);
- Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(e.getMessage().startsWith(
- "Missing forceKillApplication request or ApplicationId."));
- }
- try {
- interceptor
- .forceKillApplication(KillApplicationRequest.newInstance(null));
- Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(e.getMessage().startsWith(
- "Missing forceKillApplication request or ApplicationId."));
- }
+ throws Exception {
+ LOG.info("Test FederationClientInterceptor: Force Kill Application - Empty.");
+
+ // null request1
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing forceKillApplication request or ApplicationId.",
+ () -> interceptor.forceKillApplication(null));
+
+ // null request2
+ KillApplicationRequest killRequest = KillApplicationRequest.newInstance(null);
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing forceKillApplication request or ApplicationId.",
+ () -> interceptor.forceKillApplication(killRequest));
}
/**
@@ -439,20 +412,14 @@
*/
@Test
public void testGetApplicationNotExists()
- throws YarnException, IOException, InterruptedException {
- LOG.info(
- "Test ApplicationClientProtocol: Get Application Report - Not Exists");
- ApplicationId appId =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
- GetApplicationReportRequest requestGet =
- GetApplicationReportRequest.newInstance(appId);
- try {
- interceptor.getApplicationReport(requestGet);
- Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(e.getMessage().equals(
- "Application " + appId + " does not exist in FederationStateStore"));
- }
+ throws Exception {
+ LOG.info("Test ApplicationClientProtocol: Get Application Report - Not Exists.");
+
+ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ GetApplicationReportRequest requestGet = GetApplicationReportRequest.newInstance(appId);
+ LambdaTestUtils.intercept(YarnException.class,
+ "Application " + appId + " does not exist in FederationStateStore.",
+ () -> interceptor.getApplicationReport(requestGet));
}
/**
@@ -461,31 +428,23 @@
*/
@Test
public void testGetApplicationEmptyRequest()
- throws YarnException, IOException, InterruptedException {
- LOG.info(
- "Test FederationClientInterceptor: Get Application Report - Empty");
- try {
- interceptor.getApplicationReport(null);
- Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(
- e.getMessage().startsWith("Missing getApplicationReport request or "
- + "applicationId information."));
- }
- try {
- interceptor
- .getApplicationReport(GetApplicationReportRequest.newInstance(null));
- Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(
- e.getMessage().startsWith("Missing getApplicationReport request or "
- + "applicationId information."));
- }
+ throws Exception {
+ LOG.info("Test FederationClientInterceptor: Get Application Report - Empty.");
+
+ // null request1
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing getApplicationReport request or applicationId information.",
+ () -> interceptor.getApplicationReport(null));
+
+ // null request2
+ GetApplicationReportRequest reportRequest = GetApplicationReportRequest.newInstance(null);
+ LambdaTestUtils.intercept(YarnException.class,
+ "Missing getApplicationReport request or applicationId information.",
+ () -> interceptor.getApplicationReport(reportRequest));
}
/**
- * This test validates the correctness of
- * GetApplicationAttemptReport in case the
+ * This test validates the correctness of GetApplicationAttemptReport in case the
* application exists in the cluster.
*/
@Test
@@ -529,77 +488,68 @@
}
/**
- * This test validates the correctness of
- * GetApplicationAttemptReport in case the
+ * This test validates the correctness of GetApplicationAttemptReport in case the
* application does not exist in StateStore.
*/
@Test
- public void testGetApplicationAttemptNotExists()
- throws Exception {
- LOG.info(
- "Test ApplicationClientProtocol: " +
- "Get ApplicationAttempt Report - Not Exists");
+ public void testGetApplicationAttemptNotExists() throws Exception {
+ LOG.info("Test FederationClientInterceptor: Get ApplicationAttempt Report - Not Exists.");
+
ApplicationId appId =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationAttemptId appAttemptID =
- ApplicationAttemptId.newInstance(appId, 1);
+ ApplicationAttemptId.newInstance(appId, 1);
GetApplicationAttemptReportRequest requestGet =
- GetApplicationAttemptReportRequest.newInstance(appAttemptID);
+ GetApplicationAttemptReportRequest.newInstance(appAttemptID);
LambdaTestUtils.intercept(YarnException.class, "ApplicationAttempt " +
- appAttemptID + " belongs to Application " +
- appId + " does not exist in FederationStateStore.",
+ appAttemptID + " belongs to Application " +
+ appId + " does not exist in FederationStateStore.",
() -> interceptor.getApplicationAttemptReport(requestGet));
}
/**
- * This test validates
- * the correctness of GetApplicationAttemptReport in case of
+ * This test validates the correctness of GetApplicationAttemptReport in case of
* empty request.
*/
@Test
public void testGetApplicationAttemptEmptyRequest()
- throws Exception {
- LOG.info("Test FederationClientInterceptor: " +
- "Get ApplicationAttempt Report - Empty");
+ throws Exception {
+ LOG.info("Test FederationClientInterceptor: Get ApplicationAttempt Report - Empty.");
+ // null request1
LambdaTestUtils.intercept(YarnException.class,
- "Missing getApplicationAttemptReport " +
- "request or applicationId " +
- "or applicationAttemptId information.",
+ "Missing getApplicationAttemptReport request or applicationId " +
+ "or applicationAttemptId information.",
() -> interceptor.getApplicationAttemptReport(null));
+ // null request2
LambdaTestUtils.intercept(YarnException.class,
- "Missing getApplicationAttemptReport " +
- "request or applicationId " +
- "or applicationAttemptId information.",
- () -> interceptor
- .getApplicationAttemptReport(
- GetApplicationAttemptReportRequest
- .newInstance(null)));
+ "Missing getApplicationAttemptReport request or applicationId " +
+ "or applicationAttemptId information.",
+ () -> interceptor.getApplicationAttemptReport(
+ GetApplicationAttemptReportRequest.newInstance(null)));
+ // null request3
LambdaTestUtils.intercept(YarnException.class,
- "Missing getApplicationAttemptReport " +
- "request or applicationId " +
- "or applicationAttemptId information.",
- () -> interceptor
- .getApplicationAttemptReport(
- GetApplicationAttemptReportRequest.newInstance(
- ApplicationAttemptId
- .newInstance(null, 1)
- )));
+ "Missing getApplicationAttemptReport request or applicationId " +
+ "or applicationAttemptId information.",
+ () -> interceptor.getApplicationAttemptReport(
+ GetApplicationAttemptReportRequest.newInstance(
+ ApplicationAttemptId.newInstance(null, 1))));
}
@Test
- public void testGetClusterMetricsRequest() throws YarnException, IOException {
- LOG.info("Test FederationClientInterceptor : Get Cluster Metrics request");
+ public void testGetClusterMetricsRequest() throws Exception {
+ LOG.info("Test FederationClientInterceptor : Get Cluster Metrics request.");
+
// null request
- GetClusterMetricsResponse response = interceptor.getClusterMetrics(null);
- Assert.assertEquals(subClusters.size(),
- response.getClusterMetrics().getNumNodeManagers());
+ LambdaTestUtils.intercept(YarnException.class, "Missing getClusterMetrics request.",
+ () -> interceptor.getClusterMetrics(null));
+
// normal request.
- response =
+ GetClusterMetricsResponse response =
interceptor.getClusterMetrics(GetClusterMetricsRequest.newInstance());
Assert.assertEquals(subClusters.size(),
response.getClusterMetrics().getNumNodeManagers());
@@ -607,23 +557,20 @@
ClientMethod remoteMethod = new ClientMethod("getClusterMetrics",
new Class[] {GetClusterMetricsRequest.class},
new Object[] {GetClusterMetricsRequest.newInstance()});
- Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics =interceptor.
- invokeConcurrent(new ArrayList<>(), remoteMethod,
- GetClusterMetricsResponse.class);
- Assert.assertEquals(true, clusterMetrics.isEmpty());
+ Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics = interceptor.
+ invokeConcurrent(new ArrayList<>(), remoteMethod, GetClusterMetricsResponse.class);
+ Assert.assertTrue(clusterMetrics.isEmpty());
}
/**
- * This test validates the correctness of
- * GetApplicationsResponse in case the
+ * This test validates the correctness of GetApplicationsResponse in case the
* application exists in the cluster.
*/
@Test
public void testGetApplicationsResponse()
throws YarnException, IOException, InterruptedException {
- LOG.info("Test FederationClientInterceptor: Get Applications Response");
- ApplicationId appId =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ LOG.info("Test FederationClientInterceptor: Get Applications Response.");
+ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
SubmitApplicationResponse response = interceptor.submitApplication(request);
@@ -632,40 +579,32 @@
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
Set<String> appTypes = Collections.singleton("MockApp");
- GetApplicationsRequest requestGet =
- GetApplicationsRequest.newInstance(appTypes);
-
- GetApplicationsResponse responseGet =
- interceptor.getApplications(requestGet);
+ GetApplicationsRequest requestGet = GetApplicationsRequest.newInstance(appTypes);
+ GetApplicationsResponse responseGet = interceptor.getApplications(requestGet);
Assert.assertNotNull(responseGet);
}
/**
- * This test validates
- * the correctness of GetApplicationsResponse in case of
+ * This test validates the correctness of GetApplicationsResponse in case of
* empty request.
*/
@Test
public void testGetApplicationsNullRequest() throws Exception {
- LOG.info("Test FederationClientInterceptor: Get Applications request");
- LambdaTestUtils.intercept(YarnException.class,
- "Missing getApplications request.",
+ LOG.info("Test FederationClientInterceptor: Get Applications request.");
+ LambdaTestUtils.intercept(YarnException.class, "Missing getApplications request.",
() -> interceptor.getApplications(null));
}
/**
- * This test validates
- * the correctness of GetApplicationsResponse in case applications
+ * This test validates the correctness of GetApplicationsResponse in case applications
* with given type does not exist.
*/
@Test
public void testGetApplicationsApplicationTypeNotExists() throws Exception{
- LOG.info("Test FederationClientInterceptor: Application with type does "
- + "not exist");
+ LOG.info("Test FederationClientInterceptor: Application with type does not exist.");
- ApplicationId appId =
- ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
SubmitApplicationResponse response = interceptor.submitApplication(request);
@@ -675,25 +614,20 @@
Set<String> appTypes = Collections.singleton("SPARK");
- GetApplicationsRequest requestGet =
- GetApplicationsRequest.newInstance(appTypes);
-
- GetApplicationsResponse responseGet =
- interceptor.getApplications(requestGet);
+ GetApplicationsRequest requestGet = GetApplicationsRequest.newInstance(appTypes);
+ GetApplicationsResponse responseGet = interceptor.getApplications(requestGet);
Assert.assertNotNull(responseGet);
Assert.assertTrue(responseGet.getApplicationList().isEmpty());
}
/**
- * This test validates
- * the correctness of GetApplicationsResponse in case applications
+ * This test validates the correctness of GetApplicationsResponse in case applications
* with given YarnApplicationState does not exist.
*/
@Test
public void testGetApplicationsApplicationStateNotExists() throws Exception {
- LOG.info("Test FederationClientInterceptor:" +
- " Application with state does not exist");
+ LOG.info("Test FederationClientInterceptor: Application with state does not exist.");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
@@ -711,8 +645,7 @@
GetApplicationsRequest requestGet =
GetApplicationsRequest.newInstance(applicationStates);
- GetApplicationsResponse responseGet =
- interceptor.getApplications(requestGet);
+ GetApplicationsResponse responseGet = interceptor.getApplications(requestGet);
Assert.assertNotNull(responseGet);
Assert.assertTrue(responseGet.getApplicationList().isEmpty());
@@ -720,7 +653,7 @@
@Test
public void testGetClusterNodesRequest() throws Exception {
- LOG.info("Test FederationClientInterceptor : Get Cluster Nodeds request");
+ LOG.info("Test FederationClientInterceptor : Get Cluster Nodes request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodes request.",
() -> interceptor.getClusterNodes(null));
@@ -732,7 +665,7 @@
@Test
public void testGetNodeToLabelsRequest() throws Exception {
- LOG.info("Test FederationClientInterceptor : Get Node To Labels request");
+ LOG.info("Test FederationClientInterceptor : Get Node To Labels request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getNodesToLabels request.",
() -> interceptor.getNodeToLabels(null));
@@ -744,7 +677,7 @@
@Test
public void testGetLabelsToNodesRequest() throws Exception {
- LOG.info("Test FederationClientInterceptor : Get Labels To Node request");
+ LOG.info("Test FederationClientInterceptor : Get Labels To Node request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getLabelsToNodes request.",
() -> interceptor.getLabelsToNodes(null));
@@ -756,7 +689,7 @@
@Test
public void testClusterNodeLabelsRequest() throws Exception {
- LOG.info("Test FederationClientInterceptor : Get Cluster NodeLabels request");
+ LOG.info("Test FederationClientInterceptor : Get Cluster NodeLabels request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodeLabels request.",
() -> interceptor.getClusterNodeLabels(null));
@@ -768,13 +701,13 @@
@Test
public void testGetQueueUserAcls() throws Exception {
- LOG.info("Test FederationClientInterceptor : Get QueueUserAcls request");
+ LOG.info("Test FederationClientInterceptor : Get QueueUserAcls request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getQueueUserAcls request.",
() -> interceptor.getQueueUserAcls(null));
- // noraml request
+ // normal request
GetQueueUserAclsInfoResponse response = interceptor.getQueueUserAcls(
GetQueueUserAclsInfoRequest.newInstance());
@@ -796,7 +729,7 @@
@Test
public void testListReservations() throws Exception {
- LOG.info("Test FederationClientInterceptor : Get ListReservations request");
+ LOG.info("Test FederationClientInterceptor : Get ListReservations request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing listReservations request.",
@@ -812,7 +745,7 @@
@Test
public void testGetContainersRequest() throws Exception {
- LOG.info("Test FederationClientInterceptor : Get Containers request");
+ LOG.info("Test FederationClientInterceptor : Get Containers request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getContainers request " +
@@ -928,7 +861,7 @@
@Test
public void testGetResourceTypeInfoRequest() throws Exception {
- LOG.info("Test FederationClientInterceptor : Get Resource TypeInfo request");
+ LOG.info("Test FederationClientInterceptor : Get Resource TypeInfo request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getResourceTypeInfo request.",
() -> interceptor.getResourceTypeInfo(null));
@@ -1109,7 +1042,7 @@
RMAppAttemptState.SCHEDULED);
MockNM nm = interceptor.getMockNMs().get(subClusterId);
nm.nodeHeartbeat(true);
- mockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED);
+ MockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED);
mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId());
ContainerId containerId = rmApp.getCurrentAppAttempt().getMasterContainer().getId();
@@ -1147,10 +1080,10 @@
mockRM.waitForState(appId, RMAppState.ACCEPTED);
RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
- RMAppAttemptState.SCHEDULED);
+ RMAppAttemptState.SCHEDULED);
MockNM nm = interceptor.getMockNMs().get(subClusterId);
nm.nodeHeartbeat(true);
- mockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED);
+ MockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED);
mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId());
MoveApplicationAcrossQueuesRequest acrossQueuesRequest =