YARN-11029. Refactor AMRMProxy Service code and Added Some Metrics. (#4650)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyMetrics.java
index 241eeeb..f5a3082 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyMetrics.java
@@ -45,6 +45,16 @@
   private MutableGaugeLong failedAllocateRequests;
   @Metric("# of failed application recoveries")
   private MutableGaugeLong failedAppRecoveryCount;
+  @Metric("# of failed application stop")
+  private MutableGaugeLong failedAppStopRequests;
+  @Metric("# of failed update token")
+  private MutableGaugeLong failedUpdateAMRMTokenRequests;
+  @Metric("# all allocate requests count")
+  private MutableGaugeLong allocateCount;
+  @Metric("# all requests count")
+  private MutableGaugeLong requestCount;
+
+
   // Aggregate metrics are shared, and don't have to be looked up per call
   @Metric("Application start request latency(ms)")
   private MutableRate totalSucceededAppStartRequests;
@@ -54,11 +64,22 @@
   private MutableRate totalSucceededFinishAMRequests;
   @Metric("Allocate latency(ms)")
   private MutableRate totalSucceededAllocateRequests;
+  @Metric("Application stop request latency(ms)")
+  private MutableRate totalSucceededAppStopRequests;
+  @Metric("Recover latency(ms)")
+  private MutableRate totalSucceededRecoverRequests;
+  @Metric("UpdateAMRMToken latency(ms)")
+  private MutableRate totalSucceededUpdateAMRMTokenRequests;
+
   // Quantile latency in ms - this is needed for SLA (95%, 99%, etc)
   private MutableQuantiles applicationStartLatency;
   private MutableQuantiles registerAMLatency;
   private MutableQuantiles finishAMLatency;
   private MutableQuantiles allocateLatency;
+  private MutableQuantiles recoverLatency;
+  private MutableQuantiles applicationStopLatency;
+  private MutableQuantiles updateAMRMTokenLatency;
+
   private static volatile AMRMProxyMetrics instance = null;
   private MetricsRegistry registry;
 
@@ -78,6 +99,15 @@
     allocateLatency = registry
         .newQuantiles("allocateLatency", "latency of allocate", "ops",
             "latency", 10);
+    applicationStopLatency = registry
+        .newQuantiles("applicationStopLatency", "latency of app stop", "ops",
+            "latency", 10);
+    recoverLatency = registry
+        .newQuantiles("recoverLatency", "latency of recover", "ops",
+            "latency", 10);
+    updateAMRMTokenLatency = registry
+        .newQuantiles("updateAMRMTokenLatency", "latency of update amrm token", "ops",
+            "latency", 10);
   }
 
   /**
@@ -147,15 +177,56 @@
   }
 
   @VisibleForTesting
+  long getNumSucceededAppStopRequests() {
+    return totalSucceededAppStopRequests.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  long getNumSucceededRecoverRequests() {
+    return totalSucceededRecoverRequests.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  long getNumSucceededUpdateAMRMTokenRequests() {
+    return totalSucceededUpdateAMRMTokenRequests.lastStat().numSamples();
+  }
+
+
+  @VisibleForTesting
   double getLatencySucceededAllocateRequests() {
     return totalSucceededAllocateRequests.lastStat().mean();
   }
 
+  @VisibleForTesting
+  double getLatencySucceededAppStopRequests() {
+    return totalSucceededAppStopRequests.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  double getLatencySucceededRecoverRequests() {
+    return totalSucceededRecoverRequests.lastStat().mean();
+  }
+
   public void succeededAllocateRequests(long duration) {
     totalSucceededAllocateRequests.add(duration);
     allocateLatency.add(duration);
   }
 
+  public void succeededAppStopRequests(long duration) {
+    totalSucceededAppStopRequests.add(duration);
+    applicationStopLatency.add(duration);
+  }
+
+  public void succeededRecoverRequests(long duration) {
+    totalSucceededRecoverRequests.add(duration);
+    recoverLatency.add(duration);
+  }
+
+  public void succeededUpdateTokenRequests(long duration) {
+    totalSucceededUpdateAMRMTokenRequests.add(duration);
+    updateAMRMTokenLatency.add(duration);
+  }
+
   long getFailedAppStartRequests() {
     return failedAppStartRequests.value();
   }
@@ -195,4 +266,36 @@
   public void incrFailedAppRecoveryCount() {
     failedAppRecoveryCount.incr();
   }
+
+  long getFailedAppStopRequests() {
+    return failedAppStopRequests.value();
+  }
+
+  public void incrFailedAppStopRequests() {
+    failedAppStopRequests.incr();
+  }
+
+  long getFailedUpdateAMRMTokenRequests() {
+    return failedUpdateAMRMTokenRequests.value();
+  }
+
+  public void incrFailedUpdateAMRMTokenRequests() {
+    failedUpdateAMRMTokenRequests.incr();
+  }
+
+  public void incrAllocateCount() {
+    allocateCount.incr();
+  }
+
+  public void incrRequestCount() {
+    requestCount.incr();
+  }
+
+  long getAllocateCount() {
+    return allocateCount.value();
+  }
+
+  long getRequestCount() {
+    return requestCount.value();
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
index b0d66ca..82873e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -53,6 +54,7 @@
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -123,12 +125,9 @@
     Preconditions.checkArgument(dispatcher != null, "dispatcher is null");
     this.nmContext = nmContext;
     this.dispatcher = dispatcher;
-    this.applPipelineMap =
-        new ConcurrentHashMap<ApplicationId, RequestInterceptorChainWrapper>();
+    this.applPipelineMap = new ConcurrentHashMap<>();
 
-    this.dispatcher.register(ApplicationEventType.class,
-        new ApplicationEventHandler());
-
+    this.dispatcher.register(ApplicationEventType.class, new ApplicationEventHandler());
     metrics = AMRMProxyMetrics.getMetrics();
   }
 
@@ -155,7 +154,7 @@
 
   @Override
   protected void serviceStart() throws Exception {
-    LOG.info("Starting AMRMProxyService");
+    LOG.info("Starting AMRMProxyService.");
     Configuration conf = getConfig();
     YarnRPC rpc = YarnRPC.create(conf);
     UserGroupInformation.setConfiguration(conf);
@@ -182,27 +181,22 @@
             listenerEndpoint, serverConf, this.secretManager,
             numWorkerThreads);
 
-    if (conf
-        .getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
-            false)) {
-        this.server.refreshServiceAcl(conf, NMPolicyProvider.getInstance());
+    if (conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
+      this.server.refreshServiceAcl(conf, NMPolicyProvider.getInstance());
     }
 
     this.server.start();
-    LOG.info("AMRMProxyService listening on address: "
-        + this.server.getListenerAddress());
+    LOG.info("AMRMProxyService listening on address: {}.", this.server.getListenerAddress());
     super.serviceStart();
   }
 
   @Override
   protected void serviceStop() throws Exception {
-    LOG.info("Stopping AMRMProxyService");
+    LOG.info("Stopping AMRMProxyService.");
     if (this.server != null) {
       this.server.stop();
     }
-
     this.secretManager.stop();
-
     super.serviceStop();
   }
 
@@ -212,19 +206,21 @@
    * @throws IOException if recover fails
    */
   public void recover() throws IOException {
-    LOG.info("Recovering AMRMProxyService");
+    LOG.info("Recovering AMRMProxyService.");
 
     RecoveredAMRMProxyState state =
         this.nmContext.getNMStateStore().loadAMRMProxyState();
 
     this.secretManager.recover(state);
 
-    LOG.info("Recovering {} running applications for AMRMProxy",
+    LOG.info("Recovering {} running applications for AMRMProxy.",
         state.getAppContexts().size());
+
     for (Map.Entry<ApplicationAttemptId, Map<String, byte[]>> entry : state
         .getAppContexts().entrySet()) {
       ApplicationAttemptId attemptId = entry.getKey();
-      LOG.info("Recovering app attempt {}", attemptId);
+      LOG.info("Recovering app attempt {}.", attemptId);
+      long startTime = clock.getTime();
 
       // Try recover for the running application attempt
       try {
@@ -233,19 +229,18 @@
         for (Map.Entry<String, byte[]> contextEntry : entry.getValue()
             .entrySet()) {
           if (contextEntry.getKey().equals(NMSS_USER_KEY)) {
-            user = new String(contextEntry.getValue(), "UTF-8");
+            user = new String(contextEntry.getValue(), StandardCharsets.UTF_8);
           } else if (contextEntry.getKey().equals(NMSS_AMRMTOKEN_KEY)) {
             amrmToken = new Token<>();
             amrmToken.decodeFromUrlString(
-                new String(contextEntry.getValue(), "UTF-8"));
+                new String(contextEntry.getValue(), StandardCharsets.UTF_8));
             // Clear the service field, as if RM just issued the token
             amrmToken.setService(new Text());
           }
         }
 
         if (amrmToken == null) {
-          throw new IOException(
-              "No amrmToken found for app attempt " + attemptId);
+          throw new IOException("No amrmToken found for app attempt " + attemptId);
         }
         if (user == null) {
           throw new IOException("No user found for app attempt " + attemptId);
@@ -258,14 +253,14 @@
         // Retrieve the AM container credentials from NM context
         Credentials amCred = null;
         for (Container container : this.nmContext.getContainers().values()) {
-          LOG.debug("From NM Context container {}", container.getContainerId());
+          LOG.debug("From NM Context container {}.", container.getContainerId());
           if (container.getContainerId().getApplicationAttemptId().equals(
               attemptId) && container.getContainerTokenIdentifier() != null) {
-            LOG.debug("Container type {}",
+            LOG.debug("Container type {}.",
                 container.getContainerTokenIdentifier().getContainerType());
             if (container.getContainerTokenIdentifier()
                 .getContainerType() == ContainerType.APPLICATION_MASTER) {
-              LOG.info("AM container {} found in context, has credentials: {}",
+              LOG.info("AM container {} found in context, has credentials: {}.",
                   container.getContainerId(),
                   (container.getCredentials() != null));
               amCred = container.getCredentials();
@@ -274,15 +269,17 @@
         }
         if (amCred == null) {
           LOG.error("No credentials found for AM container of {}. "
-              + "Yarn registry access might not work", attemptId);
+              + "Yarn registry access might not work.", attemptId);
         }
 
-        // Create the intercepter pipeline for the AM
+        // Create the interceptor pipeline for the AM
         initializePipeline(attemptId, user, amrmToken, localToken,
             entry.getValue(), true, amCred);
+        long endTime = clock.getTime();
+        this.metrics.succeededRecoverRequests(endTime - startTime);
       } catch (Throwable e) {
-        LOG.error("Exception when recovering " + attemptId
-            + ", removing it from NMStateStore and move on", e);
+        LOG.error("Exception when recovering {}, removing it from NMStateStore and move on.",
+            attemptId, e);
         this.metrics.incrFailedAppRecoveryCount();
         this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId);
       }
@@ -292,26 +289,28 @@
   /**
    * This is called by the AMs started on this node to register with the RM.
    * This method does the initial authorization and then forwards the request to
-   * the application instance specific intercepter chain.
+   * the application instance specific interceptor chain.
    */
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster(
       RegisterApplicationMasterRequest request) throws YarnException,
       IOException {
+    this.metrics.incrRequestCount();
     long startTime = clock.getTime();
     try {
       RequestInterceptorChainWrapper pipeline =
           authorizeAndGetInterceptorChain();
-      LOG.info("Registering application master." + " Host:" + request.getHost()
-          + " Port:" + request.getRpcPort() + " Tracking Url:" + request
-          .getTrackingUrl() + " for application " + pipeline
-          .getApplicationAttemptId());
+
+      LOG.info("RegisteringAM Host: {}, Port: {}, Tracking Url: {} for application {}. ",
+          request.getHost(), request.getRpcPort(), request.getTrackingUrl(),
+          pipeline.getApplicationAttemptId());
+
       RegisterApplicationMasterResponse response =
           pipeline.getRootInterceptor().registerApplicationMaster(request);
 
       long endTime = clock.getTime();
       this.metrics.succeededRegisterAMRequests(endTime - startTime);
-      LOG.info("RegisterAM processing finished in {} ms for application {}",
+      LOG.info("RegisterAM processing finished in {} ms for application {}.",
           endTime - startTime, pipeline.getApplicationAttemptId());
       return response;
     } catch (Throwable t) {
@@ -323,24 +322,25 @@
   /**
    * This is called by the AMs started on this node to unregister from the RM.
    * This method does the initial authorization and then forwards the request to
-   * the application instance specific intercepter chain.
+   * the application instance specific interceptor chain.
    */
   @Override
   public FinishApplicationMasterResponse finishApplicationMaster(
       FinishApplicationMasterRequest request) throws YarnException,
       IOException {
+    this.metrics.incrRequestCount();
     long startTime = clock.getTime();
     try {
       RequestInterceptorChainWrapper pipeline =
           authorizeAndGetInterceptorChain();
-      LOG.info("Finishing application master for {}. Tracking Url: {}",
+      LOG.info("Finishing application master for {}. Tracking Url: {}.",
           pipeline.getApplicationAttemptId(), request.getTrackingUrl());
       FinishApplicationMasterResponse response =
           pipeline.getRootInterceptor().finishApplicationMaster(request);
 
       long endTime = clock.getTime();
       this.metrics.succeededFinishAMRequests(endTime - startTime);
-      LOG.info("FinishAM finished with isUnregistered = {} in {} ms for {}",
+      LOG.info("FinishAM finished with isUnregistered = {} in {} ms for {}.",
           response.getIsUnregistered(), endTime - startTime,
           pipeline.getApplicationAttemptId());
       return response;
@@ -354,12 +354,13 @@
    * This is called by the AMs started on this node to send heart beat to RM.
    * This method does the initial authorization and then forwards the request to
    * the application instance specific pipeline, which is a chain of request
-   * intercepter objects. One application request processing pipeline is created
+   * interceptor objects. One application request processing pipeline is created
    * per AM instance.
    */
   @Override
   public AllocateResponse allocate(AllocateRequest request)
       throws YarnException, IOException {
+    this.metrics.incrAllocateCount();
     long startTime = clock.getTime();
     try {
       AMRMTokenIdentifier amrmTokenIdentifier =
@@ -373,7 +374,7 @@
 
       long endTime = clock.getTime();
       this.metrics.succeededAllocateRequests(endTime - startTime);
-      LOG.info("Allocate processing finished in {} ms for application {}",
+      LOG.info("Allocate processing finished in {} ms for application {}.",
           endTime - startTime, pipeline.getApplicationAttemptId());
       return allocateResponse;
     } catch (Throwable t) {
@@ -392,6 +393,7 @@
    */
   public void processApplicationStartRequest(StartContainerRequest request)
       throws IOException, YarnException {
+    this.metrics.incrRequestCount();
     long startTime = clock.getTime();
     try {
       ContainerTokenIdentifier containerTokenIdentifierForKey =
@@ -408,8 +410,7 @@
       if (!checkIfAppExistsInStateStore(applicationID)) {
         return;
       }
-      LOG.info("Callback received for initializing request "
-          + "processing pipeline for an AM");
+      LOG.info("Callback received for initializing request processing pipeline for an AM.");
       Credentials credentials = YarnServerSecurityUtils
           .parseCredentials(request.getContainerLaunchContext());
 
@@ -417,8 +418,7 @@
           getFirstAMRMToken(credentials.getAllTokens());
       if (amrmToken == null) {
         throw new YarnRuntimeException(
-            "AMRMToken not found in the start container request for "
-                + "application:" + appAttemptId.toString());
+            "AMRMToken not found in the start container request for application:" + appAttemptId);
       }
 
       // Substitute the existing AMRM Token with a local one. Keep the rest of
@@ -445,7 +445,7 @@
   }
 
   /**
-   * Initializes the request intercepter pipeline for the specified application.
+   * Initializes the request interceptor pipeline for the specified application.
    *
    * @param applicationAttemptId attempt id
    * @param user user name
@@ -465,8 +465,7 @@
           .containsKey(applicationAttemptId.getApplicationId())) {
         LOG.warn("Request to start an already existing appId was received. "
             + " This can happen if an application failed and a new attempt "
-            + "was created on this machine.  ApplicationId: "
-            + applicationAttemptId.toString());
+            + "was created on this machine.  ApplicationId: {}.", applicationAttemptId);
 
         RequestInterceptorChainWrapper chainWrapperBackup =
             this.applPipelineMap.get(applicationAttemptId.getApplicationId());
@@ -476,8 +475,7 @@
                 .equals(applicationAttemptId)) {
           // TODO: revisit in AMRMProxy HA in YARN-6128
           // Remove the existing pipeline
-          LOG.info("Remove the previous pipeline for ApplicationId: "
-              + applicationAttemptId.toString());
+          LOG.info("Remove the previous pipeline for ApplicationId: {}.", applicationAttemptId);
           RequestInterceptorChainWrapper pipeline =
               applPipelineMap.remove(applicationAttemptId.getApplicationId());
 
@@ -485,19 +483,17 @@
             try {
               this.nmContext.getNMStateStore()
                   .removeAMRMProxyAppContext(applicationAttemptId);
-            } catch (IOException e) {
-              LOG.error("Error removing AMRMProxy application context for "
-                  + applicationAttemptId, e);
+            } catch (IOException ioe) {
+              LOG.error("Error removing AMRMProxy application context for {}.",
+                  applicationAttemptId, ioe);
             }
           }
 
           try {
             pipeline.getRootInterceptor().shutdown();
           } catch (Throwable ex) {
-            LOG.warn(
-                "Failed to shutdown the request processing pipeline for app:"
-                    + applicationAttemptId.getApplicationId(),
-                ex);
+            LOG.warn("Failed to shutdown the request processing pipeline for app: {}.",
+                applicationAttemptId.getApplicationId(), ex);
           }
         } else {
           return;
@@ -510,12 +506,11 @@
     }
 
     // We register the pipeline instance in the map first and then initialize it
-    // later because chain initialization can be expensive and we would like to
+    // later because chain initialization can be expensive, and we would like to
     // release the lock as soon as possible to prevent other applications from
     // blocking when one application's chain is initializing
     LOG.info("Initializing request processing pipeline for application. "
-        + " ApplicationId:" + applicationAttemptId + " for the user: "
-        + user);
+        + " ApplicationId: {} for the user: {}.", applicationAttemptId, user);
 
     try {
       RequestInterceptor interceptorChain =
@@ -525,8 +520,7 @@
               user, amrmToken, localToken, credentials, this.registry));
       if (isRecovery) {
         if (recoveredDataMap == null) {
-          throw new YarnRuntimeException(
-              "null recoveredDataMap recieved for recover");
+          throw new YarnRuntimeException("null recoveredDataMap received for recover");
         }
         interceptorChain.recover(recoveredDataMap);
       }
@@ -535,13 +529,13 @@
       if (!isRecovery && this.nmContext.getNMStateStore() != null) {
         try {
           this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
-              applicationAttemptId, NMSS_USER_KEY, user.getBytes("UTF-8"));
+              applicationAttemptId, NMSS_USER_KEY, user.getBytes(StandardCharsets.UTF_8));
           this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
               applicationAttemptId, NMSS_AMRMTOKEN_KEY,
-              amrmToken.encodeToUrlString().getBytes("UTF-8"));
+              amrmToken.encodeToUrlString().getBytes(StandardCharsets.UTF_8));
         } catch (IOException e) {
-          LOG.error("Error storing AMRMProxy application context entry for "
-              + applicationAttemptId, e);
+          LOG.error("Error storing AMRMProxy application context entry for {}.",
+              applicationAttemptId, e);
         }
       }
     } catch (Exception e) {
@@ -557,29 +551,27 @@
    * @param applicationId application id
    */
   protected void stopApplication(ApplicationId applicationId) {
-    Preconditions.checkArgument(applicationId != null,
-        "applicationId is null");
+    this.metrics.incrRequestCount();
+    Preconditions.checkArgument(applicationId != null, "applicationId is null");
     RequestInterceptorChainWrapper pipeline =
         this.applPipelineMap.remove(applicationId);
+    boolean isStopSuccess = true;
+    long startTime = clock.getTime();
 
     if (pipeline == null) {
-      LOG.info(
-          "No interceptor pipeline for application {},"
-              + " likely because its AM is not run in this node.",
-          applicationId);
+      LOG.info("No interceptor pipeline for application {},"
+          + " likely because its AM is not run in this node.", applicationId);
+      isStopSuccess = false;
     } else {
       // Remove the appAttempt in AMRMTokenSecretManager
-      this.secretManager
-          .applicationMasterFinished(pipeline.getApplicationAttemptId());
-
-      LOG.info("Stopping the request processing pipeline for application: "
-          + applicationId);
+      this.secretManager.applicationMasterFinished(pipeline.getApplicationAttemptId());
+      LOG.info("Stopping the request processing pipeline for application: {}.", applicationId);
       try {
         pipeline.getRootInterceptor().shutdown();
       } catch (Throwable ex) {
-        LOG.warn(
-            "Failed to shutdown the request processing pipeline for app:"
-                + applicationId, ex);
+        LOG.warn("Failed to shutdown the request processing pipeline for app: {}.",
+            applicationId, ex);
+        isStopSuccess = false;
       }
 
       // Remove the app context from NMSS after the interceptors are shutdown
@@ -588,74 +580,83 @@
           this.nmContext.getNMStateStore()
               .removeAMRMProxyAppContext(pipeline.getApplicationAttemptId());
         } catch (IOException e) {
-          LOG.error("Error removing AMRMProxy application context for "
-              + applicationId, e);
+          LOG.error("Error removing AMRMProxy application context for {}.",
+              applicationId, e);
+          isStopSuccess = false;
         }
       }
     }
+
+    if (isStopSuccess) {
+      long endTime = clock.getTime();
+      this.metrics.succeededAppStopRequests(endTime - startTime);
+    } else {
+      this.metrics.incrFailedAppStopRequests();
+    }
   }
 
   private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier,
       RequestInterceptorChainWrapper pipeline,
       AllocateResponse allocateResponse) {
+
     AMRMProxyApplicationContextImpl context =
-        (AMRMProxyApplicationContextImpl) pipeline.getRootInterceptor()
-            .getApplicationContext();
+        (AMRMProxyApplicationContextImpl) pipeline.getRootInterceptor().getApplicationContext();
 
-    // check to see if the RM has issued a new AMRMToken & accordingly update
-    // the real ARMRMToken in the current context
-    if (allocateResponse.getAMRMToken() != null) {
-      LOG.info("RM rolled master-key for amrm-tokens");
+    try {
+      long startTime = clock.getTime();
 
-      org.apache.hadoop.yarn.api.records.Token token =
-          allocateResponse.getAMRMToken();
+      // check to see if the RM has issued a new AMRMToken & accordingly update
+      // the real ARMRMToken in the current context
+      if (allocateResponse.getAMRMToken() != null) {
+        LOG.info("RM rolled master-key for amrm-tokens.");
 
-      // Do not propagate this info back to AM
-      allocateResponse.setAMRMToken(null);
+        org.apache.hadoop.yarn.api.records.Token token = allocateResponse.getAMRMToken();
 
-      org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newToken =
-          ConverterUtils.convertFromYarn(token, (Text) null);
+        // Do not propagate this info back to AM
+        allocateResponse.setAMRMToken(null);
 
-      // Update the AMRMToken in context map, and in NM state store if it is
-      // different
-      if (context.setAMRMToken(newToken)
-          && this.nmContext.getNMStateStore() != null) {
-        try {
+        org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newToken =
+                ConverterUtils.convertFromYarn(token, (Text) null);
+
+        // Update the AMRMToken in context map, and in NM state store if it is
+        // different
+        if (context.setAMRMToken(newToken) && this.nmContext.getNMStateStore() != null) {
           this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
               context.getApplicationAttemptId(), NMSS_AMRMTOKEN_KEY,
-              newToken.encodeToUrlString().getBytes("UTF-8"));
-        } catch (IOException e) {
-          LOG.error("Error storing AMRMProxy application context entry for "
-              + context.getApplicationAttemptId(), e);
+              newToken.encodeToUrlString().getBytes(StandardCharsets.UTF_8));
         }
       }
-    }
 
-    // Check if the local AMRMToken is rolled up and update the context and
-    // response accordingly
-    MasterKeyData nextMasterKey =
-        this.secretManager.getNextMasterKeyData();
+      // Check if the local AMRMToken is rolled up and update the context and
+      // response accordingly
+      MasterKeyData nextMasterKey = this.secretManager.getNextMasterKeyData();
 
-    if (nextMasterKey != null
-        && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
-            .getKeyId()) {
-      Token<AMRMTokenIdentifier> localToken = context.getLocalAMRMToken();
-      if (nextMasterKey.getMasterKey().getKeyId() != context
-          .getLocalAMRMTokenKeyId()) {
-        LOG.info("The local AMRMToken has been rolled-over."
-            + " Send new local AMRMToken back to application: "
-            + pipeline.getApplicationId());
-        localToken =
-            this.secretManager.createAndGetAMRMToken(pipeline
-                .getApplicationAttemptId());
-        context.setLocalAMRMToken(localToken);
+      if (nextMasterKey != null) {
+        MasterKey masterKey = nextMasterKey.getMasterKey();
+        if (masterKey.getKeyId() != amrmTokenIdentifier.getKeyId()) {
+          Token<AMRMTokenIdentifier> localToken = context.getLocalAMRMToken();
+          if (masterKey.getKeyId() != context.getLocalAMRMTokenKeyId()) {
+            LOG.info("The local AMRMToken has been rolled-over."
+                + " Send new local AMRMToken back to application: {}",
+                pipeline.getApplicationId());
+            localToken = this.secretManager.createAndGetAMRMToken(
+                pipeline.getApplicationAttemptId());
+            context.setLocalAMRMToken(localToken);
+          }
+
+          allocateResponse
+              .setAMRMToken(org.apache.hadoop.yarn.api.records.Token
+                  .newInstance(localToken.getIdentifier(), localToken
+                      .getKind().toString(), localToken.getPassword(),
+                      localToken.getService().toString()));
+        }
       }
-
-      allocateResponse
-          .setAMRMToken(org.apache.hadoop.yarn.api.records.Token
-              .newInstance(localToken.getIdentifier(), localToken
-                  .getKind().toString(), localToken.getPassword(),
-                  localToken.getService().toString()));
+      long endTime = clock.getTime();
+      this.metrics.succeededUpdateTokenRequests(endTime - startTime);
+    } catch (IOException e) {
+      LOG.error("Error storing AMRMProxy application context entry for {}.",
+          context.getApplicationAttemptId(), e);
+      this.metrics.incrFailedUpdateAMRMTokenRequests();
     }
   }
 
@@ -672,19 +673,19 @@
   }
 
   /**
-   * Gets the Request intercepter chains for all the applications.
+   * Gets the Request interceptor chains for all the applications.
    *
-   * @return the request intercepter chains.
+   * @return the request interceptor chains.
    */
   protected Map<ApplicationId, RequestInterceptorChainWrapper> getPipelines() {
     return this.applPipelineMap;
   }
 
   /**
-   * This method creates and returns reference of the first intercepter in the
-   * chain of request intercepter instances.
+   * This method creates and returns reference of the first interceptor in the
+   * chain of request interceptor instances.
    *
-   * @return the reference of the first intercepter in the chain
+   * @return the reference of the first interceptor in the chain
    */
   protected RequestInterceptor createRequestInterceptorChain() {
     Configuration conf = getConfig();
@@ -717,7 +718,7 @@
       } catch (ClassNotFoundException e) {
         throw new YarnRuntimeException(
             "Could not instantiate ApplicationMasterRequestInterceptor: "
-                + interceptorClassName, e);
+            + interceptorClassName, e);
       }
     }
 
@@ -729,10 +730,10 @@
   }
 
   /**
-   * Returns the comma separated intercepter class names from the configuration.
+   * Returns the comma separated interceptor class names from the configuration.
    *
    * @param conf configuration
-   * @return the intercepter class names as an instance of ArrayList
+   * @return the interceptor class names as an instance of ArrayList
    */
   private List<String> getInterceptorClassNames(Configuration conf) {
     String configuredInterceptorClassNames =
@@ -759,7 +760,7 @@
    * Authorizes the request and returns the application specific request
    * processing pipeline.
    *
-   * @return the the intercepter wrapper instance
+   * @return the interceptor wrapper instance
    * @throws YarnException if fails
    */
   private RequestInterceptorChainWrapper authorizeAndGetInterceptorChain()
@@ -775,11 +776,10 @@
         tokenIdentifier.getApplicationAttemptId();
 
     synchronized (this.applPipelineMap) {
-      if (!this.applPipelineMap.containsKey(appAttemptId
-          .getApplicationId())) {
+      if (!this.applPipelineMap.containsKey(appAttemptId.getApplicationId())) {
         throw new YarnException(
             "The AM request processing pipeline is not initialized for app: "
-                + appAttemptId.getApplicationId().toString());
+            + appAttemptId.getApplicationId());
       }
 
       return this.applPipelineMap.get(appAttemptId.getApplicationId());
@@ -827,29 +827,26 @@
 
   /**
    * Private class for handling application stop events.
-   *
    */
   class ApplicationEventHandler implements EventHandler<ApplicationEvent> {
 
     @Override
     public void handle(ApplicationEvent event) {
       Application app =
-          AMRMProxyService.this.nmContext.getApplications().get(
-              event.getApplicationID());
+          AMRMProxyService.this.nmContext.getApplications().get(event.getApplicationID());
       if (app != null) {
         switch (event.getType()) {
         case APPLICATION_RESOURCES_CLEANEDUP:
-          LOG.info("Application stop event received for stopping AppId:"
-              + event.getApplicationID().toString());
+          LOG.info("Application stop event received for stopping AppId: {}.",
+              event.getApplicationID().toString());
           AMRMProxyService.this.stopApplication(event.getApplicationID());
           break;
         default:
-          LOG.debug("AMRMProxy is ignoring event: {}", event.getType());
+          LOG.debug("AMRMProxy is ignoring event: {}.", event.getType());
           break;
         }
       } else {
-        LOG.warn("Event " + event + " sent to absent application "
-            + event.getApplicationID());
+        LOG.warn("Event {} sent to absent application {}.", event, event.getApplicationID());
       }
     }
   }
@@ -866,20 +863,20 @@
 
     /**
      * Initializes the wrapper with the specified parameters.
-     * 
-     * @param rootInterceptor the root request intercepter
-     * @param applicationAttemptId attempt id
+     *
+     * @param interceptor the root request interceptor
+     * @param appAttemptId attempt id
      */
-    public synchronized void init(RequestInterceptor rootInterceptor,
-        ApplicationAttemptId applicationAttemptId) {
-      this.rootInterceptor = rootInterceptor;
-      this.applicationAttemptId = applicationAttemptId;
+    public synchronized void init(RequestInterceptor interceptor,
+        ApplicationAttemptId appAttemptId) {
+      rootInterceptor = interceptor;
+      applicationAttemptId = appAttemptId;
     }
 
     /**
-     * Gets the root request intercepter.
+     * Gets the root request interceptor.
      *
-     * @return the root request intercepter
+     * @return the root request interceptor
      */
     public synchronized RequestInterceptor getRootInterceptor() {
       return rootInterceptor;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/package-info.java
new file mode 100644
index 0000000..2f20d98
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.nodemanager.amrmproxy contains
+ * classes for handling federation amrm information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyMetrics.java
index 4621c4d..6219641 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyMetrics.java
@@ -42,11 +42,19 @@
     Assert.assertEquals(0, metrics.getFailedRegisterAMRequests());
     Assert.assertEquals(0, metrics.getFailedFinishAMRequests());
     Assert.assertEquals(0, metrics.getFailedAllocateRequests());
+    Assert.assertEquals(0, metrics.getFailedAppRecoveryCount());
+    Assert.assertEquals(0, metrics.getFailedAppStopRequests());
+    Assert.assertEquals(0, metrics.getFailedUpdateAMRMTokenRequests());
+    Assert.assertEquals(0, metrics.getAllocateCount());
+    Assert.assertEquals(0, metrics.getRequestCount());
 
     Assert.assertEquals(0, metrics.getNumSucceededAppStartRequests());
     Assert.assertEquals(0, metrics.getNumSucceededRegisterAMRequests());
     Assert.assertEquals(0, metrics.getNumSucceededFinishAMRequests());
     Assert.assertEquals(0, metrics.getNumSucceededAllocateRequests());
+    Assert.assertEquals(0, metrics.getNumSucceededRecoverRequests());
+    Assert.assertEquals(0, metrics.getNumSucceededAppStopRequests());
+    Assert.assertEquals(0, metrics.getNumSucceededUpdateAMRMTokenRequests());
 
     LOG.info("Test: aggregate metrics are updated correctly");
   }
@@ -57,19 +65,19 @@
     long failedRegisterAMRequests = metrics.getFailedRegisterAMRequests();
     long failedFinishAMRequests = metrics.getFailedFinishAMRequests();
     long failedAllocateRequests = metrics.getFailedAllocateRequests();
+    long failedAppRecoveryRequests = metrics.getFailedAppRecoveryCount();
+    long failedAppStopRequests = metrics.getFailedAppStopRequests();
+    long failedUpdateAMRMTokenRequests = metrics.getFailedUpdateAMRMTokenRequests();
 
     long succeededAppStartRequests = metrics.getNumSucceededAppStartRequests();
-    long succeededRegisterAMRequests =
-        metrics.getNumSucceededRegisterAMRequests();
+    long succeededRegisterAMRequests = metrics.getNumSucceededRegisterAMRequests();
     long succeededFinishAMRequests = metrics.getNumSucceededFinishAMRequests();
     long succeededAllocateRequests = metrics.getNumSucceededAllocateRequests();
 
     int testAppId = 1;
-    RegisterApplicationMasterResponse registerResponse =
-        registerApplicationMaster(testAppId);
+    RegisterApplicationMasterResponse registerResponse = registerApplicationMaster(testAppId);
     Assert.assertNotNull(registerResponse);
-    Assert
-        .assertEquals(Integer.toString(testAppId), registerResponse.getQueue());
+    Assert.assertEquals(Integer.toString(testAppId), registerResponse.getQueue());
 
     AllocateResponse allocateResponse = allocate(testAppId);
     Assert.assertNotNull(allocateResponse);
@@ -80,14 +88,13 @@
     Assert.assertNotNull(finshResponse);
     Assert.assertEquals(true, finshResponse.getIsUnregistered());
 
-    Assert.assertEquals(failedAppStartRequests,
-        metrics.getFailedAppStartRequests());
-    Assert.assertEquals(failedRegisterAMRequests,
-        metrics.getFailedRegisterAMRequests());
-    Assert.assertEquals(failedFinishAMRequests,
-        metrics.getFailedFinishAMRequests());
-    Assert.assertEquals(failedAllocateRequests,
-        metrics.getFailedAllocateRequests());
+    Assert.assertEquals(failedAppStartRequests, metrics.getFailedAppStartRequests());
+    Assert.assertEquals(failedRegisterAMRequests, metrics.getFailedRegisterAMRequests());
+    Assert.assertEquals(failedFinishAMRequests, metrics.getFailedFinishAMRequests());
+    Assert.assertEquals(failedAllocateRequests, metrics.getFailedAllocateRequests());
+    Assert.assertEquals(failedAppRecoveryRequests, metrics.getFailedAppRecoveryCount());
+    Assert.assertEquals(failedAppStopRequests, metrics.getFailedAppStopRequests());
+    Assert.assertEquals(failedUpdateAMRMTokenRequests, metrics.getFailedUpdateAMRMTokenRequests());
 
     Assert.assertEquals(succeededAppStartRequests,
         metrics.getNumSucceededAppStartRequests());