YARN-11180. Refactor some code of getNewApplication, submitApplication etc. (#4618)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterAuditLogger.java
index cc82087..a89d0e4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterAuditLogger.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterAuditLogger.java
@@ -49,6 +49,8 @@
     public static final String SUBMIT_NEW_APP = "Submit New App";
     public static final String FORCE_KILL_APP = "Force Kill App";
     public static final String GET_APP_REPORT = "Get Application Report";
+    public static final String TARGET_CLIENT_RM_SERVICE = "RouterClientRMService";
+    public static final String UNKNOWN = "UNKNOWN";
   }
 
   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index 7fd1003..947e5f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -137,6 +137,13 @@
 
 import org.apache.hadoop.classification.VisibleForTesting;
 
+import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_NEW_APP;
+import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP;
+import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.GET_APP_REPORT;
+import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.FORCE_KILL_APP;
+import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.TARGET_CLIENT_RM_SERVICE;
+import static org.apache.hadoop.yarn.server.router.RouterAuditLogger.AuditConstants.UNKNOWN;
+
 /**
  * Extends the {@code AbstractRequestInterceptorClient} class and provides an
  * implementation for federation of YARN RM and scaling an application across
@@ -228,8 +235,8 @@
           CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
       UserGroupInformation realUser = user;
       if (serviceAuthEnabled) {
-        realUser = UserGroupInformation.createProxyUser(user.getShortUserName(),
-            UserGroupInformation.getLoginUser());
+        realUser = UserGroupInformation.createProxyUser(
+            user.getShortUserName(), UserGroupInformation.getLoginUser());
       }
       clientRMProxy = FederationProxyProviderUtil.createRMProxy(getConf(),
           ApplicationClientProtocol.class, subClusterId, realUser);
@@ -237,21 +244,17 @@
       RouterServerUtil.logAndThrowException(
           "Unable to create the interface to reach the SubCluster " + subClusterId, e);
     }
-
     clientRMProxies.put(subClusterId, clientRMProxy);
     return clientRMProxy;
   }
 
   private SubClusterId getRandomActiveSubCluster(
-      Map<SubClusterId, SubClusterInfo> activeSubclusters)
-      throws YarnException {
-
-    if (activeSubclusters == null || activeSubclusters.size() < 1) {
+      Map<SubClusterId, SubClusterInfo> activeSubClusters) throws YarnException {
+    if (activeSubClusters == null || activeSubClusters.isEmpty()) {
       RouterServerUtil.logAndThrowException(
           FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
     }
-    List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
-
+    List<SubClusterId> list = new ArrayList<>(activeSubClusters.keySet());
     return list.get(rand.nextInt(list.size()));
   }
 
@@ -276,45 +279,43 @@
   public GetNewApplicationResponse getNewApplication(
       GetNewApplicationRequest request) throws YarnException, IOException {
 
-    long startTime = clock.getTime();
+    if (request == null) {
+      routerMetrics.incrAppsFailedCreated();
+      String errMsg = "Missing getNewApplication request.";
+      RouterAuditLogger.logFailure(user.getShortUserName(), GET_NEW_APP, UNKNOWN,
+          TARGET_CLIENT_RM_SERVICE, errMsg);
+      RouterServerUtil.logAndThrowException(errMsg, null);
+    }
 
+    long startTime = clock.getTime();
     Map<SubClusterId, SubClusterInfo> subClustersActive =
         federationFacade.getSubClusters(true);
 
     for (int i = 0; i < numSubmitRetries; ++i) {
       SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
-      LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId);
-      ApplicationClientProtocol clientRMProxy =
-          getClientRMProxyForSubCluster(subClusterId);
+      LOG.info("getNewApplication try #{} on SubCluster {}.", i, subClusterId);
+      ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
       GetNewApplicationResponse response = null;
       try {
         response = clientRMProxy.getNewApplication(request);
       } catch (Exception e) {
-        LOG.warn("Unable to create a new ApplicationId in SubCluster "
-            + subClusterId.getId(), e);
-      }
-
-      if (response != null) {
-
-        long stopTime = clock.getTime();
-        routerMetrics.succeededAppsCreated(stopTime - startTime);
-        RouterAuditLogger.logSuccess(user.getShortUserName(),
-            RouterAuditLogger.AuditConstants.GET_NEW_APP,
-            "RouterClientRMService", response.getApplicationId());
-        return response;
-      } else {
-        // Empty response from the ResourceManager.
-        // Blacklist this subcluster for this request.
+        LOG.warn("Unable to create a new ApplicationId in SubCluster {}.", subClusterId.getId(), e);
         subClustersActive.remove(subClusterId);
       }
 
+      if (response != null) {
+        long stopTime = clock.getTime();
+        routerMetrics.succeededAppsCreated(stopTime - startTime);
+        RouterAuditLogger.logSuccess(user.getShortUserName(), GET_NEW_APP,
+            TARGET_CLIENT_RM_SERVICE, response.getApplicationId());
+        return response;
+      }
     }
 
     routerMetrics.incrAppsFailedCreated();
-    String errMsg = "Fail to create a new application.";
-    RouterAuditLogger.logFailure(user.getShortUserName(),
-        RouterAuditLogger.AuditConstants.GET_NEW_APP, "UNKNOWN",
-        "RouterClientRMService", errMsg);
+    String errMsg = "Failed to create a new application.";
+    RouterAuditLogger.logFailure(user.getShortUserName(), GET_NEW_APP, UNKNOWN,
+        TARGET_CLIENT_RM_SERVICE, errMsg);
     throw new YarnException(errMsg);
   }
 
@@ -387,21 +388,20 @@
   public SubmitApplicationResponse submitApplication(
       SubmitApplicationRequest request) throws YarnException, IOException {
 
-    long startTime = clock.getTime();
-
     if (request == null || request.getApplicationSubmissionContext() == null
-        || request.getApplicationSubmissionContext()
-            .getApplicationId() == null) {
+        || request.getApplicationSubmissionContext().getApplicationId() == null) {
       routerMetrics.incrAppsFailedSubmitted();
       String errMsg =
-          "Missing submitApplication request or applicationSubmissionContext "
-              + "information.";
-      RouterAuditLogger.logFailure(user.getShortUserName(),
-          RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN",
-          "RouterClientRMService", errMsg);
-      throw new YarnException(errMsg);
+          "Missing submitApplication request or applicationSubmissionContext information.";
+      RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
+          TARGET_CLIENT_RM_SERVICE, errMsg);
+      RouterServerUtil.logAndThrowException(errMsg, null);
     }
 
+    SubmitApplicationResponse response = null;
+
+    long startTime = clock.getTime();
+
     ApplicationId applicationId =
         request.getApplicationSubmissionContext().getApplicationId();
 
@@ -411,8 +411,9 @@
 
       SubClusterId subClusterId = policyFacade.getHomeSubcluster(
           request.getApplicationSubmissionContext(), blacklist);
-      LOG.info("submitApplication appId {} try #{} on SubCluster {}.", applicationId, i,
-          subClusterId);
+
+      LOG.info("submitApplication appId {} try #{} on SubCluster {}.",
+           applicationId, i, subClusterId);
 
       ApplicationHomeSubCluster appHomeSubCluster =
           ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
@@ -425,12 +426,12 @@
               federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
         } catch (YarnException e) {
           routerMetrics.incrAppsFailedSubmitted();
-          String message = "Unable to insert the ApplicationId " + applicationId
-              + " into the FederationStateStore";
-          RouterAuditLogger.logFailure(user.getShortUserName(),
-              RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN",
-              "RouterClientRMService", message, applicationId, subClusterId);
-          throw new YarnException(message, e);
+          String message =
+              String.format("Unable to insert the ApplicationId %s into the FederationStateStore.",
+              applicationId);
+          RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
+              TARGET_CLIENT_RM_SERVICE, message, applicationId, subClusterId);
+          RouterServerUtil.logAndThrowException(message, e);
         }
       } else {
         try {
@@ -438,19 +439,19 @@
           // the new subClusterId we have selected
           federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster);
         } catch (YarnException e) {
-          String message = "Unable to update the ApplicationId " + applicationId
-              + " into the FederationStateStore";
+          String message =
+              String.format("Unable to update the ApplicationId %s into the FederationStateStore.",
+              applicationId);
           SubClusterId subClusterIdInStateStore =
               federationFacade.getApplicationHomeSubCluster(applicationId);
           if (subClusterId == subClusterIdInStateStore) {
-            LOG.info("Application {} already submitted on SubCluster {}.", applicationId,
-                subClusterId);
+            LOG.info("Application {} already submitted on SubCluster {}.",
+                applicationId, subClusterId);
           } else {
             routerMetrics.incrAppsFailedSubmitted();
-            RouterAuditLogger.logFailure(user.getShortUserName(),
-                RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN",
-                "RouterClientRMService", message, applicationId, subClusterId);
-            throw new YarnException(message, e);
+            RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
+                TARGET_CLIENT_RM_SERVICE, message, applicationId, subClusterId);
+            RouterServerUtil.logAndThrowException(message, e);
           }
         }
       }
@@ -458,7 +459,6 @@
       ApplicationClientProtocol clientRMProxy =
           getClientRMProxyForSubCluster(subClusterId);
 
-      SubmitApplicationResponse response = null;
       try {
         response = clientRMProxy.submitApplication(request);
       } catch (Exception e) {
@@ -472,9 +472,8 @@
             applicationId, subClusterId);
         long stopTime = clock.getTime();
         routerMetrics.succeededAppsSubmitted(stopTime - startTime);
-        RouterAuditLogger.logSuccess(user.getShortUserName(),
-            RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP,
-            "RouterClientRMService", applicationId, subClusterId);
+        RouterAuditLogger.logSuccess(user.getShortUserName(), SUBMIT_NEW_APP,
+            TARGET_CLIENT_RM_SERVICE, applicationId, subClusterId);
         return response;
       } else {
         // Empty response from the ResourceManager.
@@ -484,13 +483,11 @@
     }
 
     routerMetrics.incrAppsFailedSubmitted();
-    String errMsg = "Application "
-        + request.getApplicationSubmissionContext().getApplicationName()
-        + " with appId " + applicationId + " failed to be submitted.";
-    RouterAuditLogger.logFailure(user.getShortUserName(),
-        RouterAuditLogger.AuditConstants.SUBMIT_NEW_APP, "UNKNOWN",
-        "RouterClientRMService", errMsg, applicationId);
-    throw new YarnException(errMsg);
+    String msg = String.format("Application %s with appId %s failed to be submitted.",
+        request.getApplicationSubmissionContext().getApplicationName(), applicationId);
+    RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
+        TARGET_CLIENT_RM_SERVICE, msg, applicationId);
+    throw new YarnException(msg);
   }
 
   /**
@@ -513,16 +510,16 @@
   public KillApplicationResponse forceKillApplication(
       KillApplicationRequest request) throws YarnException, IOException {
 
-    long startTime = clock.getTime();
-
     if (request == null || request.getApplicationId() == null) {
       routerMetrics.incrAppsFailedKilled();
-      String message = "Missing forceKillApplication request or ApplicationId.";
-      RouterAuditLogger.logFailure(user.getShortUserName(),
-          RouterAuditLogger.AuditConstants.FORCE_KILL_APP, "UNKNOWN",
-          "RouterClientRMService", message);
-      throw new YarnException(message);
+      String msg = "Missing forceKillApplication request or ApplicationId.";
+      RouterAuditLogger.logFailure(user.getShortUserName(), FORCE_KILL_APP, UNKNOWN,
+          TARGET_CLIENT_RM_SERVICE, msg);
+      RouterServerUtil.logAndThrowException(msg, null);
     }
+
+    long startTime = clock.getTime();
+
     ApplicationId applicationId = request.getApplicationId();
     SubClusterId subClusterId = null;
 
@@ -531,12 +528,11 @@
           .getApplicationHomeSubCluster(request.getApplicationId());
     } catch (YarnException e) {
       routerMetrics.incrAppsFailedKilled();
-      String msg = "Application " + applicationId
-          + " does not exist in FederationStateStore";
-      RouterAuditLogger.logFailure(user.getShortUserName(),
-          RouterAuditLogger.AuditConstants.FORCE_KILL_APP, "UNKNOWN",
-          "RouterClientRMService", msg, applicationId);
-      throw new YarnException(msg, e);
+      String msg =
+          String.format("Application %s does not exist in FederationStateStore.", applicationId);
+      RouterAuditLogger.logFailure(user.getShortUserName(), FORCE_KILL_APP, UNKNOWN,
+          TARGET_CLIENT_RM_SERVICE, msg, applicationId);
+      RouterServerUtil.logAndThrowException(msg, e);
     }
 
     ApplicationClientProtocol clientRMProxy =
@@ -548,11 +544,10 @@
       response = clientRMProxy.forceKillApplication(request);
     } catch (Exception e) {
       routerMetrics.incrAppsFailedKilled();
-      RouterAuditLogger.logFailure(user.getShortUserName(),
-          RouterAuditLogger.AuditConstants.FORCE_KILL_APP, "UNKNOWN",
-          "RouterClientRMService", "Unable to kill the application report",
-          applicationId, subClusterId);
-      throw e;
+      String msg = "Unable to kill the application report.";
+      RouterAuditLogger.logFailure(user.getShortUserName(), FORCE_KILL_APP, UNKNOWN,
+          TARGET_CLIENT_RM_SERVICE, msg, applicationId, subClusterId);
+      RouterServerUtil.logAndThrowException(msg, e);
     }
 
     if (response == null) {
@@ -562,9 +557,8 @@
 
     long stopTime = clock.getTime();
     routerMetrics.succeededAppsKilled(stopTime - startTime);
-    RouterAuditLogger.logSuccess(user.getShortUserName(),
-        RouterAuditLogger.AuditConstants.FORCE_KILL_APP,
-        "RouterClientRMService", applicationId);
+    RouterAuditLogger.logSuccess(user.getShortUserName(), FORCE_KILL_APP,
+        TARGET_CLIENT_RM_SERVICE, applicationId);
     return response;
   }
 
@@ -588,18 +582,15 @@
   public GetApplicationReportResponse getApplicationReport(
       GetApplicationReportRequest request) throws YarnException, IOException {
 
-    long startTime = clock.getTime();
-
     if (request == null || request.getApplicationId() == null) {
       routerMetrics.incrAppsFailedRetrieved();
-      String errMsg =
-          "Missing getApplicationReport request or applicationId information.";
-      RouterAuditLogger.logFailure(user.getShortUserName(),
-          RouterAuditLogger.AuditConstants.GET_APP_REPORT, "UNKNOWN",
-          "RouterClientRMService", errMsg);
-      throw new YarnException(errMsg);
+      String errMsg = "Missing getApplicationReport request or applicationId information.";
+      RouterAuditLogger.logFailure(user.getShortUserName(), GET_APP_REPORT, UNKNOWN,
+          TARGET_CLIENT_RM_SERVICE, errMsg);
+      RouterServerUtil.logAndThrowException(errMsg, null);
     }
 
+    long startTime = clock.getTime();
     SubClusterId subClusterId = null;
 
     try {
@@ -607,29 +598,26 @@
           .getApplicationHomeSubCluster(request.getApplicationId());
     } catch (YarnException e) {
       routerMetrics.incrAppsFailedRetrieved();
-      String errMsg = "Application " + request.getApplicationId()
-          + " does not exist in FederationStateStore";
-      RouterAuditLogger.logFailure(user.getShortUserName(),
-          RouterAuditLogger.AuditConstants.GET_APP_REPORT, "UNKNOWN",
-          "RouterClientRMService", errMsg, request.getApplicationId());
-      throw new YarnException(errMsg, e);
+      String errMsg = String.format("Application %s does not exist in FederationStateStore.",
+          request.getApplicationId());
+      RouterAuditLogger.logFailure(user.getShortUserName(), GET_APP_REPORT, UNKNOWN,
+          TARGET_CLIENT_RM_SERVICE, errMsg, request.getApplicationId());
+      RouterServerUtil.logAndThrowException(errMsg, e);
     }
 
     ApplicationClientProtocol clientRMProxy =
         getClientRMProxyForSubCluster(subClusterId);
-
     GetApplicationReportResponse response = null;
+
     try {
       response = clientRMProxy.getApplicationReport(request);
     } catch (Exception e) {
       routerMetrics.incrAppsFailedRetrieved();
-      String errMsg = "Unable to get the application report for " + request
-          .getApplicationId() + "to SubCluster " + subClusterId.getId();
-      RouterAuditLogger.logFailure(user.getShortUserName(),
-          RouterAuditLogger.AuditConstants.GET_APP_REPORT, "UNKNOWN",
-          "RouterClientRMService", errMsg, request.getApplicationId(),
-          subClusterId);
-      throw e;
+      String errMsg = String.format("Unable to get the application report for %s to SubCluster %s.",
+          request.getApplicationId(), subClusterId.getId());
+      RouterAuditLogger.logFailure(user.getShortUserName(), GET_APP_REPORT, UNKNOWN,
+          TARGET_CLIENT_RM_SERVICE, errMsg, request.getApplicationId(), subClusterId);
+      RouterServerUtil.logAndThrowException(errMsg, e);
     }
 
     if (response == null) {
@@ -637,12 +625,10 @@
           + "the application {} to SubCluster {}.",
           request.getApplicationId(), subClusterId.getId());
     }
-
     long stopTime = clock.getTime();
     routerMetrics.succeededAppsRetrieved(stopTime - startTime);
-    RouterAuditLogger.logSuccess(user.getShortUserName(),
-        RouterAuditLogger.AuditConstants.GET_APP_REPORT,
-        "RouterClientRMService", request.getApplicationId());
+    RouterAuditLogger.logSuccess(user.getShortUserName(), GET_APP_REPORT,
+        TARGET_CLIENT_RM_SERVICE, request.getApplicationId());
     return response;
   }
 
@@ -670,56 +656,48 @@
       throws YarnException, IOException {
     if (request == null) {
       routerMetrics.incrMultipleAppsFailedRetrieved();
-      RouterServerUtil.logAndThrowException(
-          "Missing getApplications request.",
-          null);
+      RouterServerUtil.logAndThrowException("Missing getApplications request.", null);
     }
     long startTime = clock.getTime();
     Map<SubClusterId, SubClusterInfo> subclusters =
         federationFacade.getSubClusters(true);
     ClientMethod remoteMethod = new ClientMethod("getApplications",
         new Class[] {GetApplicationsRequest.class}, new Object[] {request});
-    Map<SubClusterId, GetApplicationsResponse> applications;
-
+    Map<SubClusterId, GetApplicationsResponse> applications = null;
     try {
       applications = invokeConcurrent(subclusters.keySet(), remoteMethod,
           GetApplicationsResponse.class);
-
     } catch (Exception ex) {
       routerMetrics.incrMultipleAppsFailedRetrieved();
-      LOG.error("Unable to get applications due to exception.", ex);
-      throw ex;
+      RouterServerUtil.logAndThrowException("Unable to get applications due to exception.", ex);
     }
     long stopTime = clock.getTime();
     routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime);
     // Merge the Application Reports
-    return RouterYarnClientUtils.mergeApplications(applications.values(),
-        returnPartialReport);
+    return RouterYarnClientUtils.mergeApplications(applications.values(), returnPartialReport);
   }
 
   @Override
   public GetClusterMetricsResponse getClusterMetrics(
       GetClusterMetricsRequest request) throws YarnException, IOException {
+    if (request == null) {
+      routerMetrics.incrGetClusterMetricsFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Missing getClusterMetrics request.", null);
+    }
     long startTime = clock.getTime();
-    Map<SubClusterId, SubClusterInfo> subclusters =
-        federationFacade.getSubClusters(true);
     ClientMethod remoteMethod = new ClientMethod("getClusterMetrics",
         new Class[] {GetClusterMetricsRequest.class}, new Object[] {request});
-    Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics;
-
+    Collection<GetClusterMetricsResponse> clusterMetrics = null;
     try {
-      clusterMetrics = invokeConcurrent(subclusters.keySet(), remoteMethod,
-          GetClusterMetricsResponse.class);
-
+      clusterMetrics = invokeAppClientProtocolMethod(
+          true, remoteMethod, GetClusterMetricsResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrGetClusterMetricsFailedRetrieved();
-      LOG.error("Unable to get cluster metrics due to exception.", ex);
-      throw ex;
+      RouterServerUtil.logAndThrowException("Unable to get cluster metrics due to exception.", ex);
     }
-
     long stopTime = clock.getTime();
     routerMetrics.succeededGetClusterMetricsRetrieved(stopTime - startTime);
-    return RouterYarnClientUtils.merge(clusterMetrics.values());
+    return RouterYarnClientUtils.merge(clusterMetrics);
   }
 
   <R> Map<SubClusterId, R> invokeConcurrent(ArrayList<SubClusterId> clusterIds,
@@ -804,8 +782,7 @@
         clusterNodes.put(subClusterId, response);
       } catch (Exception ex) {
         routerMetrics.incrClusterNodesFailedRetrieved();
-        LOG.error("Unable to get cluster nodes due to exception.", ex);
-        throw ex;
+        RouterServerUtil.logAndThrowException("Unable to get cluster nodes due to exception.", ex);
       }
     }
     long stopTime = clock.getTime();
@@ -850,14 +827,13 @@
     long startTime = clock.getTime();
     ClientMethod remoteMethod = new ClientMethod("getQueueUserAcls",
         new Class[] {GetQueueUserAclsInfoRequest.class}, new Object[] {request});
-    Collection<GetQueueUserAclsInfoResponse> queueUserAcls;
+    Collection<GetQueueUserAclsInfoResponse> queueUserAcls = null;
     try {
       queueUserAcls = invokeAppClientProtocolMethod(true, remoteMethod,
           GetQueueUserAclsInfoResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrQueueUserAclsFailedRetrieved();
-      LOG.error("Unable to get queue user Acls due to exception.", ex);
-      throw ex;
+      RouterServerUtil.logAndThrowException("Unable to get queue user Acls due to exception.", ex);
     }
     long stopTime = clock.getTime();
     routerMetrics.succeededGetQueueUserAclsRetrieved(stopTime - startTime);
@@ -931,14 +907,14 @@
     long startTime = clock.getTime();
     ClientMethod remoteMethod = new ClientMethod("listReservations",
         new Class[] {ReservationListRequest.class}, new Object[] {request});
-    Collection<ReservationListResponse> listResponses;
+    Collection<ReservationListResponse> listResponses = null;
     try {
       listResponses = invokeAppClientProtocolMethod(true, remoteMethod,
           ReservationListResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrListReservationsFailedRetrieved();
-      LOG.error("Unable to list reservations node due to exception.", ex);
-      throw ex;
+      RouterServerUtil.logAndThrowException(
+          "Unable to list reservations node due to exception.", ex);
     }
     long stopTime = clock.getTime();
     routerMetrics.succeededListReservationsRetrieved(stopTime - startTime);
@@ -986,14 +962,13 @@
     long startTime = clock.getTime();
     ClientMethod remoteMethod = new ClientMethod("getNodeToLabels",
          new Class[] {GetNodesToLabelsRequest.class}, new Object[] {request});
-    Collection<GetNodesToLabelsResponse> clusterNodes;
+    Collection<GetNodesToLabelsResponse> clusterNodes = null;
     try {
       clusterNodes = invokeAppClientProtocolMethod(true, remoteMethod,
           GetNodesToLabelsResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrNodeToLabelsFailedRetrieved();
-      LOG.error("Unable to get label node due to exception.", ex);
-      throw ex;
+      RouterServerUtil.logAndThrowException("Unable to get node label due to exception.", ex);
     }
     long stopTime = clock.getTime();
     routerMetrics.succeededGetNodeToLabelsRetrieved(stopTime - startTime);
@@ -1010,15 +985,14 @@
     }
     long startTime = clock.getTime();
     ClientMethod remoteMethod = new ClientMethod("getLabelsToNodes",
-        new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request});
-    Collection<GetLabelsToNodesResponse> labelNodes;
+         new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request});
+    Collection<GetLabelsToNodesResponse> labelNodes = null;
     try {
       labelNodes = invokeAppClientProtocolMethod(true, remoteMethod,
           GetLabelsToNodesResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrLabelsToNodesFailedRetrieved();
-      LOG.error("Unable to get label node due to exception.", ex);
-      throw ex;
+      RouterServerUtil.logAndThrowException("Unable to get label node due to exception.", ex);
     }
     long stopTime = clock.getTime();
     routerMetrics.succeededGetLabelsToNodesRetrieved(stopTime - startTime);
@@ -1035,15 +1009,15 @@
     }
     long startTime = clock.getTime();
     ClientMethod remoteMethod = new ClientMethod("getClusterNodeLabels",
-        new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request});
-    Collection<GetClusterNodeLabelsResponse> nodeLabels;
+         new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request});
+    Collection<GetClusterNodeLabelsResponse> nodeLabels = null;
     try {
       nodeLabels = invokeAppClientProtocolMethod(true, remoteMethod,
           GetClusterNodeLabelsResponse.class);
     } catch (Exception ex) {
       routerMetrics.incrClusterNodeLabelsFailedRetrieved();
-      LOG.error("Unable to get cluster nodeLabels due to exception.", ex);
-      throw ex;
+      RouterServerUtil.logAndThrowException("Unable to get cluster nodeLabels due to exception.",
+          ex);
     }
     long stopTime = clock.getTime();
     routerMetrics.succeededGetClusterNodeLabelsRetrieved(stopTime - startTime);
@@ -1096,15 +1070,15 @@
     ApplicationClientProtocol clientRMProxy =
         getClientRMProxyForSubCluster(subClusterId);
 
-    GetApplicationAttemptReportResponse response;
+    GetApplicationAttemptReportResponse response = null;
     try {
       response = clientRMProxy.getApplicationAttemptReport(request);
     } catch (Exception e) {
       routerMetrics.incrAppAttemptsFailedRetrieved();
-      LOG.error("Unable to get the applicationAttempt report for {} "
-          + "to SubCluster {}, error = {}.",
-          request.getApplicationAttemptId(), subClusterId.getId(), e);
-      throw e;
+      String msg = String.format(
+          "Unable to get the applicationAttempt report for %s to SubCluster %s.",
+          request.getApplicationAttemptId(), subClusterId.getId());
+      RouterServerUtil.logAndThrowException(msg, e);
     }
 
     if (response == null) {
@@ -1328,8 +1302,7 @@
     } catch (YarnException e) {
       routerMetrics.incrUpdateAppPriorityFailedRetrieved();
       RouterServerUtil.logAndThrowException("Application " +
-              request.getApplicationId() +
-              " does not exist in FederationStateStore.", e);
+          request.getApplicationId() + " does not exist in FederationStateStore.", e);
     }
 
     ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
index f0aa480..6a47f15 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
@@ -156,14 +156,13 @@
 
     stateStore = new MemoryFederationStateStore();
     stateStore.init(this.getConf());
-    FederationStateStoreFacade.getInstance().reinitialize(stateStore,
-        getConf());
+    FederationStateStoreFacade.getInstance().reinitialize(stateStore, getConf());
     stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
 
     interceptor.setConf(this.getConf());
     interceptor.init(user);
 
-    subClusters = new ArrayList<SubClusterId>();
+    subClusters = new ArrayList<>();
 
     try {
       for (int i = 0; i < NUM_SUBCLUSTER; i++) {
@@ -197,7 +196,7 @@
     // chain
     conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
         mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
-            + "," + TestableFederationClientInterceptor.class.getName());
+        + "," + TestableFederationClientInterceptor.class.getName());
 
     conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
         UniformBroadcastPolicyManager.class.getName());
@@ -212,17 +211,16 @@
    * ApplicationId has to belong to one of the SubCluster in the cluster.
    */
   @Test
-  public void testGetNewApplication()
-      throws YarnException, IOException, InterruptedException {
-    LOG.info("Test FederationClientInterceptor: Get New Application");
+  public void testGetNewApplication() throws YarnException, IOException {
+    LOG.info("Test FederationClientInterceptor: Get New Application.");
 
     GetNewApplicationRequest request = GetNewApplicationRequest.newInstance();
     GetNewApplicationResponse response = interceptor.getNewApplication(request);
 
     Assert.assertNotNull(response);
     Assert.assertNotNull(response.getApplicationId());
-    Assert.assertTrue(response.getApplicationId()
-        .getClusterTimestamp() == ResourceManager.getClusterTimeStamp());
+    Assert.assertEquals(response.getApplicationId().getClusterTimestamp(),
+        ResourceManager.getClusterTimeStamp());
   }
 
   /**
@@ -232,10 +230,9 @@
   @Test
   public void testSubmitApplication()
       throws YarnException, IOException {
-    LOG.info("Test FederationClientInterceptor: Submit Application");
+    LOG.info("Test FederationClientInterceptor: Submit Application.");
 
-    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
-        1);
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
     SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
 
     SubmitApplicationResponse response = interceptor.submitApplication(request);
@@ -249,14 +246,12 @@
   private SubmitApplicationRequest mockSubmitApplicationRequest(
       ApplicationId appId) {
     ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
-    ApplicationSubmissionContext context = ApplicationSubmissionContext
-        .newInstance(appId, MockApps.newAppName(), "default",
-            Priority.newInstance(APP_PRIORITY_ZERO), amContainerSpec, false, false, -1,
-            Resources.createResource(
-                YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
-            "MockApp");
-    SubmitApplicationRequest request = SubmitApplicationRequest
-        .newInstance(context);
+    ApplicationSubmissionContext context = ApplicationSubmissionContext.newInstance(
+        appId, MockApps.newAppName(), "default",
+        Priority.newInstance(APP_PRIORITY_ZERO), amContainerSpec, false, false, -1,
+        Resources.createResource(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
+        "MockApp");
+    SubmitApplicationRequest request = SubmitApplicationRequest.newInstance(context);
     return request;
   }
 
@@ -297,37 +292,27 @@
    */
   @Test
   public void testSubmitApplicationEmptyRequest()
-      throws YarnException, IOException, InterruptedException {
-    LOG.info(
-        "Test FederationClientInterceptor: Submit Application - Empty");
-    try {
-      interceptor.submitApplication(null);
-      Assert.fail();
-    } catch (YarnException e) {
-      Assert.assertTrue(
-          e.getMessage().startsWith("Missing submitApplication request or "
-              + "applicationSubmissionContext information."));
-    }
-    try {
-      interceptor.submitApplication(SubmitApplicationRequest.newInstance(null));
-      Assert.fail();
-    } catch (YarnException e) {
-      Assert.assertTrue(
-          e.getMessage().startsWith("Missing submitApplication request or "
-              + "applicationSubmissionContext information."));
-    }
-    try {
-      ApplicationSubmissionContext context = ApplicationSubmissionContext
-          .newInstance(null, "", "", null, null, false, false, -1, null, null);
-      SubmitApplicationRequest request =
-          SubmitApplicationRequest.newInstance(context);
-      interceptor.submitApplication(request);
-      Assert.fail();
-    } catch (YarnException e) {
-      Assert.assertTrue(
-          e.getMessage().startsWith("Missing submitApplication request or "
-              + "applicationSubmissionContext information."));
-    }
+      throws Exception {
+    LOG.info("Test FederationClientInterceptor: Submit Application - Empty.");
+
+    // null request1
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitApplication request or applicationSubmissionContext information.",
+        () -> interceptor.submitApplication(null));
+
+    // null request2
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitApplication request or applicationSubmissionContext information.",
+        () -> interceptor.submitApplication(SubmitApplicationRequest.newInstance(null)));
+
+    // null request3
+    ApplicationSubmissionContext context = ApplicationSubmissionContext
+        .newInstance(null, "", "", null, null, false, false, -1, null, null);
+    SubmitApplicationRequest request =
+        SubmitApplicationRequest.newInstance(context);
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing submitApplication request or applicationSubmissionContext information.",
+        () -> interceptor.submitApplication(request));
   }
 
   /**
@@ -337,7 +322,7 @@
   @Test
   public void testForceKillApplication()
       throws YarnException, IOException, InterruptedException {
-    LOG.info("Test FederationClientInterceptor: Force Kill Application");
+    LOG.info("Test FederationClientInterceptor: Force Kill Application.");
 
     ApplicationId appId =
         ApplicationId.newInstance(System.currentTimeMillis(), 1);
@@ -349,10 +334,8 @@
     Assert.assertNotNull(response);
     Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
 
-    KillApplicationRequest requestKill =
-        KillApplicationRequest.newInstance(appId);
-    KillApplicationResponse responseKill =
-        interceptor.forceKillApplication(requestKill);
+    KillApplicationRequest requestKill = KillApplicationRequest.newInstance(appId);
+    KillApplicationResponse responseKill = interceptor.forceKillApplication(requestKill);
     Assert.assertNotNull(responseKill);
   }
 
@@ -361,22 +344,17 @@
    * application does not exist in StateStore.
    */
   @Test
-  public void testForceKillApplicationNotExists()
-      throws YarnException, IOException, InterruptedException {
-    LOG.info("Test FederationClientInterceptor: "
-        + "Force Kill Application - Not Exists");
+  public void testForceKillApplicationNotExists() throws Exception {
+    LOG.info("Test FederationClientInterceptor: Force Kill Application - Not Exists");
 
     ApplicationId appId =
         ApplicationId.newInstance(System.currentTimeMillis(), 1);
     KillApplicationRequest requestKill =
         KillApplicationRequest.newInstance(appId);
-    try {
-      interceptor.forceKillApplication(requestKill);
-      Assert.fail();
-    } catch (YarnException e) {
-      Assert.assertTrue(e.getMessage().equals(
-          "Application " + appId + " does not exist in FederationStateStore"));
-    }
+
+    LambdaTestUtils.intercept(YarnException.class,
+        "Application " + appId + " does not exist in FederationStateStore.",
+        () -> interceptor.forceKillApplication(requestKill));
   }
 
   /**
@@ -385,24 +363,19 @@
    */
   @Test
   public void testForceKillApplicationEmptyRequest()
-      throws YarnException, IOException, InterruptedException {
-    LOG.info(
-        "Test FederationClientInterceptor: Force Kill Application - Empty");
-    try {
-      interceptor.forceKillApplication(null);
-      Assert.fail();
-    } catch (YarnException e) {
-      Assert.assertTrue(e.getMessage().startsWith(
-          "Missing forceKillApplication request or ApplicationId."));
-    }
-    try {
-      interceptor
-          .forceKillApplication(KillApplicationRequest.newInstance(null));
-      Assert.fail();
-    } catch (YarnException e) {
-      Assert.assertTrue(e.getMessage().startsWith(
-          "Missing forceKillApplication request or ApplicationId."));
-    }
+      throws Exception {
+    LOG.info("Test FederationClientInterceptor: Force Kill Application - Empty.");
+
+    // null request1
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing forceKillApplication request or ApplicationId.",
+        () -> interceptor.forceKillApplication(null));
+
+    // null request2
+    KillApplicationRequest killRequest = KillApplicationRequest.newInstance(null);
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing forceKillApplication request or ApplicationId.",
+        () -> interceptor.forceKillApplication(killRequest));
   }
 
   /**
@@ -439,20 +412,14 @@
    */
   @Test
   public void testGetApplicationNotExists()
-      throws YarnException, IOException, InterruptedException {
-    LOG.info(
-        "Test ApplicationClientProtocol: Get Application Report - Not Exists");
-    ApplicationId appId =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
-    GetApplicationReportRequest requestGet =
-        GetApplicationReportRequest.newInstance(appId);
-    try {
-      interceptor.getApplicationReport(requestGet);
-      Assert.fail();
-    } catch (YarnException e) {
-      Assert.assertTrue(e.getMessage().equals(
-          "Application " + appId + " does not exist in FederationStateStore"));
-    }
+      throws Exception {
+    LOG.info("Test ApplicationClientProtocol: Get Application Report - Not Exists.");
+
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    GetApplicationReportRequest requestGet = GetApplicationReportRequest.newInstance(appId);
+    LambdaTestUtils.intercept(YarnException.class,
+        "Application " + appId + " does not exist in FederationStateStore.",
+        () -> interceptor.getApplicationReport(requestGet));
   }
 
   /**
@@ -461,31 +428,23 @@
    */
   @Test
   public void testGetApplicationEmptyRequest()
-      throws YarnException, IOException, InterruptedException {
-    LOG.info(
-        "Test FederationClientInterceptor: Get Application Report - Empty");
-    try {
-      interceptor.getApplicationReport(null);
-      Assert.fail();
-    } catch (YarnException e) {
-      Assert.assertTrue(
-          e.getMessage().startsWith("Missing getApplicationReport request or "
-              + "applicationId information."));
-    }
-    try {
-      interceptor
-          .getApplicationReport(GetApplicationReportRequest.newInstance(null));
-      Assert.fail();
-    } catch (YarnException e) {
-      Assert.assertTrue(
-          e.getMessage().startsWith("Missing getApplicationReport request or "
-              + "applicationId information."));
-    }
+      throws Exception {
+    LOG.info("Test FederationClientInterceptor: Get Application Report - Empty.");
+
+    // null request1
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing getApplicationReport request or applicationId information.",
+        () -> interceptor.getApplicationReport(null));
+
+    // null request2
+    GetApplicationReportRequest reportRequest = GetApplicationReportRequest.newInstance(null);
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing getApplicationReport request or applicationId information.",
+        () -> interceptor.getApplicationReport(reportRequest));
   }
 
   /**
-   * This test validates the correctness of
-   * GetApplicationAttemptReport in case the
+   * This test validates the correctness of GetApplicationAttemptReport in case the
    * application exists in the cluster.
    */
   @Test
@@ -529,77 +488,68 @@
   }
 
   /**
-   * This test validates the correctness of
-   * GetApplicationAttemptReport in case the
+   * This test validates the correctness of GetApplicationAttemptReport in case the
    * application does not exist in StateStore.
    */
   @Test
-  public void testGetApplicationAttemptNotExists()
-          throws Exception {
-    LOG.info(
-            "Test ApplicationClientProtocol: " +
-                    "Get ApplicationAttempt Report - Not Exists");
+  public void testGetApplicationAttemptNotExists() throws Exception {
+    LOG.info("Test FederationClientInterceptor: Get ApplicationAttempt Report - Not Exists.");
+
     ApplicationId appId =
-            ApplicationId.newInstance(System.currentTimeMillis(), 1);
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
     ApplicationAttemptId appAttemptID =
-            ApplicationAttemptId.newInstance(appId, 1);
+        ApplicationAttemptId.newInstance(appId, 1);
     GetApplicationAttemptReportRequest requestGet =
-            GetApplicationAttemptReportRequest.newInstance(appAttemptID);
+        GetApplicationAttemptReportRequest.newInstance(appAttemptID);
 
     LambdaTestUtils.intercept(YarnException.class, "ApplicationAttempt " +
-            appAttemptID + " belongs to Application " +
-            appId + " does not exist in FederationStateStore.",
+        appAttemptID + " belongs to Application " +
+        appId + " does not exist in FederationStateStore.",
         () -> interceptor.getApplicationAttemptReport(requestGet));
   }
 
   /**
-   * This test validates
-   * the correctness of GetApplicationAttemptReport in case of
+   * This test validates the correctness of GetApplicationAttemptReport in case of
    * empty request.
    */
   @Test
   public void testGetApplicationAttemptEmptyRequest()
-          throws Exception {
-    LOG.info("Test FederationClientInterceptor: " +
-                    "Get ApplicationAttempt Report - Empty");
+      throws Exception {
+    LOG.info("Test FederationClientInterceptor: Get ApplicationAttempt Report - Empty.");
 
+    // null request1
     LambdaTestUtils.intercept(YarnException.class,
-            "Missing getApplicationAttemptReport " +
-                    "request or applicationId " +
-                    "or applicationAttemptId information.",
+        "Missing getApplicationAttemptReport request or applicationId " +
+        "or applicationAttemptId information.",
         () -> interceptor.getApplicationAttemptReport(null));
 
+    // null request2
     LambdaTestUtils.intercept(YarnException.class,
-            "Missing getApplicationAttemptReport " +
-                    "request or applicationId " +
-                    "or applicationAttemptId information.",
-        () -> interceptor
-                    .getApplicationAttemptReport(
-                            GetApplicationAttemptReportRequest
-                                    .newInstance(null)));
+        "Missing getApplicationAttemptReport request or applicationId " +
+        "or applicationAttemptId information.",
+        () -> interceptor.getApplicationAttemptReport(
+        GetApplicationAttemptReportRequest.newInstance(null)));
 
+    // null request3
     LambdaTestUtils.intercept(YarnException.class,
-            "Missing getApplicationAttemptReport " +
-                    "request or applicationId " +
-                    "or applicationAttemptId information.",
-        () -> interceptor
-                    .getApplicationAttemptReport(
-                            GetApplicationAttemptReportRequest.newInstance(
-                                    ApplicationAttemptId
-                                            .newInstance(null, 1)
-                            )));
+        "Missing getApplicationAttemptReport request or applicationId " +
+        "or applicationAttemptId information.",
+        () -> interceptor.getApplicationAttemptReport(
+        GetApplicationAttemptReportRequest.newInstance(
+        ApplicationAttemptId.newInstance(null, 1))));
   }
 
 
   @Test
-  public void testGetClusterMetricsRequest() throws YarnException, IOException {
-    LOG.info("Test FederationClientInterceptor : Get Cluster Metrics request");
+  public void testGetClusterMetricsRequest() throws Exception {
+    LOG.info("Test FederationClientInterceptor : Get Cluster Metrics request.");
+
     // null request
-    GetClusterMetricsResponse response = interceptor.getClusterMetrics(null);
-    Assert.assertEquals(subClusters.size(),
-        response.getClusterMetrics().getNumNodeManagers());
+    LambdaTestUtils.intercept(YarnException.class, "Missing getClusterMetrics request.",
+        () -> interceptor.getClusterMetrics(null));
+
     // normal request.
-    response =
+    GetClusterMetricsResponse response =
         interceptor.getClusterMetrics(GetClusterMetricsRequest.newInstance());
     Assert.assertEquals(subClusters.size(),
         response.getClusterMetrics().getNumNodeManagers());
@@ -607,23 +557,20 @@
     ClientMethod remoteMethod = new ClientMethod("getClusterMetrics",
         new Class[] {GetClusterMetricsRequest.class},
         new Object[] {GetClusterMetricsRequest.newInstance()});
-    Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics =interceptor.
-        invokeConcurrent(new ArrayList<>(), remoteMethod,
-            GetClusterMetricsResponse.class);
-    Assert.assertEquals(true, clusterMetrics.isEmpty());
+    Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics = interceptor.
+        invokeConcurrent(new ArrayList<>(), remoteMethod, GetClusterMetricsResponse.class);
+    Assert.assertTrue(clusterMetrics.isEmpty());
   }
 
   /**
-   * This test validates the correctness of
-   * GetApplicationsResponse in case the
+   * This test validates the correctness of GetApplicationsResponse in case the
    * application exists in the cluster.
    */
   @Test
   public void testGetApplicationsResponse()
       throws YarnException, IOException, InterruptedException {
-    LOG.info("Test FederationClientInterceptor: Get Applications Response");
-    ApplicationId appId =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    LOG.info("Test FederationClientInterceptor: Get Applications Response.");
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
 
     SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
     SubmitApplicationResponse response = interceptor.submitApplication(request);
@@ -632,40 +579,32 @@
     Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
 
     Set<String> appTypes = Collections.singleton("MockApp");
-    GetApplicationsRequest requestGet =
-        GetApplicationsRequest.newInstance(appTypes);
-
-    GetApplicationsResponse responseGet =
-        interceptor.getApplications(requestGet);
+    GetApplicationsRequest requestGet = GetApplicationsRequest.newInstance(appTypes);
+    GetApplicationsResponse responseGet = interceptor.getApplications(requestGet);
 
     Assert.assertNotNull(responseGet);
   }
 
   /**
-   * This test validates
-   * the correctness of GetApplicationsResponse in case of
+   * This test validates the correctness of GetApplicationsResponse in case of
    * empty request.
    */
   @Test
   public void testGetApplicationsNullRequest() throws Exception {
-    LOG.info("Test FederationClientInterceptor: Get Applications request");
-    LambdaTestUtils.intercept(YarnException.class,
-        "Missing getApplications request.",
+    LOG.info("Test FederationClientInterceptor: Get Applications request.");
+    LambdaTestUtils.intercept(YarnException.class, "Missing getApplications request.",
         () -> interceptor.getApplications(null));
   }
 
   /**
-   * This test validates
-   * the correctness of GetApplicationsResponse in case applications
+   * This test validates the correctness of GetApplicationsResponse in case applications
    * with given type does not exist.
    */
   @Test
   public void testGetApplicationsApplicationTypeNotExists() throws Exception{
-    LOG.info("Test FederationClientInterceptor: Application with type does "
-        + "not exist");
+    LOG.info("Test FederationClientInterceptor: Application with type does not exist.");
 
-    ApplicationId appId =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
 
     SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
     SubmitApplicationResponse response = interceptor.submitApplication(request);
@@ -675,25 +614,20 @@
 
     Set<String> appTypes = Collections.singleton("SPARK");
 
-    GetApplicationsRequest requestGet =
-        GetApplicationsRequest.newInstance(appTypes);
-
-    GetApplicationsResponse responseGet =
-        interceptor.getApplications(requestGet);
+    GetApplicationsRequest requestGet = GetApplicationsRequest.newInstance(appTypes);
+    GetApplicationsResponse responseGet = interceptor.getApplications(requestGet);
 
     Assert.assertNotNull(responseGet);
     Assert.assertTrue(responseGet.getApplicationList().isEmpty());
   }
 
   /**
-   * This test validates
-   * the correctness of GetApplicationsResponse in case applications
+   * This test validates the correctness of GetApplicationsResponse in case applications
    * with given YarnApplicationState does not exist.
    */
   @Test
   public void testGetApplicationsApplicationStateNotExists() throws Exception {
-    LOG.info("Test FederationClientInterceptor:" +
-        " Application with state does not exist");
+    LOG.info("Test FederationClientInterceptor: Application with state does not exist.");
 
     ApplicationId appId =
         ApplicationId.newInstance(System.currentTimeMillis(), 1);
@@ -711,8 +645,7 @@
     GetApplicationsRequest requestGet =
         GetApplicationsRequest.newInstance(applicationStates);
 
-    GetApplicationsResponse responseGet =
-        interceptor.getApplications(requestGet);
+    GetApplicationsResponse responseGet = interceptor.getApplications(requestGet);
 
     Assert.assertNotNull(responseGet);
     Assert.assertTrue(responseGet.getApplicationList().isEmpty());
@@ -720,7 +653,7 @@
 
   @Test
   public void testGetClusterNodesRequest() throws Exception {
-    LOG.info("Test FederationClientInterceptor : Get Cluster Nodeds request");
+    LOG.info("Test FederationClientInterceptor : Get Cluster Nodes request.");
     // null request
     LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodes request.",
         () -> interceptor.getClusterNodes(null));
@@ -732,7 +665,7 @@
 
   @Test
   public void testGetNodeToLabelsRequest() throws Exception {
-    LOG.info("Test FederationClientInterceptor :  Get Node To Labels request");
+    LOG.info("Test FederationClientInterceptor :  Get Node To Labels request.");
     // null request
     LambdaTestUtils.intercept(YarnException.class, "Missing getNodesToLabels request.",
         () -> interceptor.getNodeToLabels(null));
@@ -744,7 +677,7 @@
 
   @Test
   public void testGetLabelsToNodesRequest() throws Exception {
-    LOG.info("Test FederationClientInterceptor :  Get Labels To Node request");
+    LOG.info("Test FederationClientInterceptor :  Get Labels To Node request.");
     // null request
     LambdaTestUtils.intercept(YarnException.class, "Missing getLabelsToNodes request.",
         () -> interceptor.getLabelsToNodes(null));
@@ -756,7 +689,7 @@
 
   @Test
   public void testClusterNodeLabelsRequest() throws Exception {
-    LOG.info("Test FederationClientInterceptor :  Get Cluster NodeLabels request");
+    LOG.info("Test FederationClientInterceptor :  Get Cluster NodeLabels request.");
     // null request
     LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodeLabels request.",
         () -> interceptor.getClusterNodeLabels(null));
@@ -768,13 +701,13 @@
 
   @Test
   public void testGetQueueUserAcls() throws Exception {
-    LOG.info("Test FederationClientInterceptor : Get QueueUserAcls request");
+    LOG.info("Test FederationClientInterceptor : Get QueueUserAcls request.");
 
     // null request
     LambdaTestUtils.intercept(YarnException.class, "Missing getQueueUserAcls request.",
         () -> interceptor.getQueueUserAcls(null));
 
-    // noraml request
+    // normal request
     GetQueueUserAclsInfoResponse response = interceptor.getQueueUserAcls(
         GetQueueUserAclsInfoRequest.newInstance());
 
@@ -796,7 +729,7 @@
 
   @Test
   public void testListReservations() throws Exception {
-    LOG.info("Test FederationClientInterceptor :  Get ListReservations request");
+    LOG.info("Test FederationClientInterceptor :  Get ListReservations request.");
 
     // null request
     LambdaTestUtils.intercept(YarnException.class, "Missing listReservations request.",
@@ -812,7 +745,7 @@
 
   @Test
   public void testGetContainersRequest() throws Exception {
-    LOG.info("Test FederationClientInterceptor : Get Containers request");
+    LOG.info("Test FederationClientInterceptor : Get Containers request.");
 
     // null request
     LambdaTestUtils.intercept(YarnException.class, "Missing getContainers request " +
@@ -928,7 +861,7 @@
 
   @Test
   public void testGetResourceTypeInfoRequest() throws Exception {
-    LOG.info("Test FederationClientInterceptor :  Get Resource TypeInfo request");
+    LOG.info("Test FederationClientInterceptor :  Get Resource TypeInfo request.");
     // null request
     LambdaTestUtils.intercept(YarnException.class, "Missing getResourceTypeInfo request.",
         () -> interceptor.getResourceTypeInfo(null));
@@ -1109,7 +1042,7 @@
         RMAppAttemptState.SCHEDULED);
     MockNM nm = interceptor.getMockNMs().get(subClusterId);
     nm.nodeHeartbeat(true);
-    mockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED);
+    MockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED);
     mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId());
 
     ContainerId containerId = rmApp.getCurrentAppAttempt().getMasterContainer().getId();
@@ -1147,10 +1080,10 @@
     mockRM.waitForState(appId, RMAppState.ACCEPTED);
     RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
     mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
-            RMAppAttemptState.SCHEDULED);
+          RMAppAttemptState.SCHEDULED);
     MockNM nm = interceptor.getMockNMs().get(subClusterId);
     nm.nodeHeartbeat(true);
-    mockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED);
+    MockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED);
     mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId());
 
     MoveApplicationAcrossQueuesRequest acrossQueuesRequest =