YARN-11502. Refactor AMRMProxy#FederationInterceptor#registerApplicationMaster. (#5705)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 9460e70..ae6765c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -91,6 +91,7 @@
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
 import org.apache.hadoop.yarn.util.AsyncCallback;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -570,9 +571,12 @@
    * For the same reason, this method needs to be synchronized.
    */
   @Override
-  public synchronized RegisterApplicationMasterResponse
-      registerApplicationMaster(RegisterApplicationMasterRequest request)
-          throws YarnException, IOException {
+  public synchronized RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request) throws YarnException, IOException {
+
+    if (request == null) {
+      throw new YarnException("RegisterApplicationMasterRequest can't be null!");
+    }
 
     // Reset the heartbeat responseId to zero upon register
     synchronized (this.lastAllocateResponseLock) {
@@ -590,18 +594,9 @@
       // Save the registration request. This will be used for registering with
       // secondary sub-clusters using UAMs, as well as re-register later
       this.amRegistrationRequest = request;
-      if (getNMStateStore() != null) {
-        try {
-          RegisterApplicationMasterRequestPBImpl pb =
-              (RegisterApplicationMasterRequestPBImpl)
-                  this.amRegistrationRequest;
-          getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
-              NMSS_REG_REQUEST_KEY, pb.getProto().toByteArray());
-        } catch (Exception e) {
-          LOG.error("Error storing AMRMProxy application context entry for "
-              + this.attemptId, e);
-        }
-      }
+      RegisterApplicationMasterRequestPBImpl requestPB = (RegisterApplicationMasterRequestPBImpl)
+          this.amRegistrationRequest;
+      storeAMRMProxyAppContextEntry(NMSS_REG_REQUEST_KEY, requestPB.getProto().toByteArray());
     }
 
     /*
@@ -625,48 +620,40 @@
      * is running and will breaks the elasticity feature. The registration with
      * the other sub-cluster RM will be done lazily as needed later.
      */
-    this.amRegistrationResponse =
-        this.homeRMRelayer.registerApplicationMaster(request);
-    if (this.amRegistrationResponse
-        .getContainersFromPreviousAttempts() != null) {
-      cacheAllocatedContainers(
-          this.amRegistrationResponse.getContainersFromPreviousAttempts(),
-          this.homeSubClusterId);
+    this.amRegistrationResponse = this.homeRMRelayer.registerApplicationMaster(request);
+
+    if (this.amRegistrationResponse == null) {
+      throw new YarnException("RegisterApplicationMasterResponse can't be null!");
+    }
+
+    List<Container> containersFromPreviousAttempts =
+        this.amRegistrationResponse.getContainersFromPreviousAttempts();
+    if (containersFromPreviousAttempts != null) {
+      cacheAllocatedContainers(containersFromPreviousAttempts, this.homeSubClusterId);
     }
 
     ApplicationId appId = this.attemptId.getApplicationId();
     reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId);
 
-    if (getNMStateStore() != null) {
-      try {
-        RegisterApplicationMasterResponsePBImpl pb =
-            (RegisterApplicationMasterResponsePBImpl)
-                this.amRegistrationResponse;
-        getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
-            NMSS_REG_RESPONSE_KEY, pb.getProto().toByteArray());
-      } catch (Exception e) {
-        LOG.error("Error storing AMRMProxy application context entry for "
-            + this.attemptId, e);
-      }
-    }
+    RegisterApplicationMasterResponsePBImpl responsePB = (RegisterApplicationMasterResponsePBImpl)
+        this.amRegistrationResponse;
+    storeAMRMProxyAppContextEntry(NMSS_REG_RESPONSE_KEY, responsePB.getProto().toByteArray());
 
     // the queue this application belongs will be used for getting
     // AMRMProxy policy from state store.
     String queue = this.amRegistrationResponse.getQueue();
     if (queue == null) {
-      LOG.warn("Received null queue for application " + appId
-          + " from home subcluster. Will use default queue name "
-          + YarnConfiguration.DEFAULT_QUEUE_NAME
-          + " for getting AMRMProxyPolicy");
+      LOG.warn("Received null queue for application {} from home subcluster. " +
+          " Will use default queue name {} for getting AMRMProxyPolicy.", appId,
+          YarnConfiguration.DEFAULT_QUEUE_NAME);
     } else {
-      LOG.info("Application " + appId + " belongs to queue " + queue);
+      LOG.info("Application {} belongs to queue {}.", appId, queue);
     }
 
     // Initialize the AMRMProxyPolicy
     try {
-      this.policyInterpreter =
-          FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
-              getConf(), this.federationFacade, this.homeSubClusterId);
+      this.policyInterpreter = FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
+          getConf(), this.federationFacade, this.homeSubClusterId);
     } catch (FederationPolicyInitializationException e) {
       throw new YarnRuntimeException(e);
     }
@@ -674,6 +661,24 @@
   }
 
   /**
+   * Add a context entry for an application attempt in AMRMProxyService.
+   *
+   * @param key key string
+   * @param data state data
+   */
+  private void storeAMRMProxyAppContextEntry(String key, byte[] data) {
+    NMStateStoreService nmStateStore = getNMStateStore();
+    if (nmStateStore != null) {
+      try {
+        nmStateStore.storeAMRMProxyAppContextEntry(this.attemptId, key, data);
+      } catch (Exception e) {
+        LOG.error("Error storing AMRMProxy application context entry[{}] for {}.",
+            key, this.attemptId, e);
+      }
+    }
+  }
+
+  /**
    * Sends the heart beats to the home RM and the secondary sub-cluster RMs that
    * are being used by the application.
    */
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 51c1c5a..8661990 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -195,10 +195,8 @@
   private List<Container> getContainersAndAssert(int numberOfResourceRequests,
       int numberOfAllocationExcepted) throws Exception {
     AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
-    List<Container> containers =
-        new ArrayList<Container>(numberOfResourceRequests);
-    List<ResourceRequest> askList =
-        new ArrayList<ResourceRequest>(numberOfResourceRequests);
+    List<Container> containers = new ArrayList<>(numberOfResourceRequests);
+    List<ResourceRequest> askList = new ArrayList<>(numberOfResourceRequests);
     for (int id = 0; id < numberOfResourceRequests; id++) {
       askList.add(createResourceRequest("test-node-" + Integer.toString(id),
           6000, 2, id % 5, 1));
@@ -269,8 +267,8 @@
     List<ContainerId> newlyFinished = getCompletedContainerIds(
         allocateResponse.getCompletedContainersStatuses());
     containersForReleasedContainerIds.addAll(newlyFinished);
-    LOG.info("Number of containers received in the original request: "
-        + Integer.toString(newlyFinished.size()));
+    LOG.info("Number of containers received in the original request: {}",
+        newlyFinished.size());
 
     // Send max 10 heart beats to receive all the containers. If not, we will
     // fail the test
@@ -290,10 +288,9 @@
       newlyFinished = getCompletedContainerIds(
           allocateResponse.getCompletedContainersStatuses());
       containersForReleasedContainerIds.addAll(newlyFinished);
-      LOG.info("Number of containers received in this request: "
-          + Integer.toString(newlyFinished.size()));
-      LOG.info("Total number of containers received: "
-          + Integer.toString(containersForReleasedContainerIds.size()));
+      LOG.info("Number of containers received in this request: {}.", newlyFinished.size());
+      LOG.info("Total number of containers received: {}.",
+          containersForReleasedContainerIds.size());
       Thread.sleep(10);
     }
 
@@ -431,7 +428,7 @@
         FinishApplicationMasterResponse finishResponse =
             interceptor.finishApplicationMaster(finishReq);
         Assert.assertNotNull(finishResponse);
-        Assert.assertEquals(true, finishResponse.getIsUnregistered());
+        Assert.assertTrue(finishResponse.getIsUnregistered());
         return null;
       }
     });
@@ -624,7 +621,7 @@
         FinishApplicationMasterResponse finishResponse =
             interceptor.finishApplicationMaster(finishReq);
         Assert.assertNotNull(finishResponse);
-        Assert.assertEquals(true, finishResponse.getIsUnregistered());
+        Assert.assertTrue(finishResponse.getIsUnregistered());
 
         // After the application succeeds, the registry/NMSS entry should be
         // cleaned up
@@ -805,7 +802,7 @@
         Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size());
 
         // Generate a duplicate heartbeat from AM, so that it won't really
-        // trigger an heartbeat to all SC
+        // trigger a heartbeat to all SC
         AllocateRequest allocateRequest =
             Records.newRecord(AllocateRequest.class);
         // Set to lastResponseId - 1 so that it will be considered a duplicate
@@ -904,7 +901,7 @@
         FinishApplicationMasterResponse finishResponse =
             interceptor.finishApplicationMaster(finishReq);
         Assert.assertNotNull(finishResponse);
-        Assert.assertEquals(true, finishResponse.getIsUnregistered());
+        Assert.assertTrue(finishResponse.getIsUnregistered());
 
         // After the application succeeds, the registry entry should be deleted
         if (interceptor.getRegistryClient() != null) {