YARN-11502. Refactor AMRMProxy#FederationInterceptor#registerApplicationMaster. (#5705)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 9460e70..ae6765c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -91,6 +91,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -570,9 +571,12 @@
* For the same reason, this method needs to be synchronized.
*/
@Override
- public synchronized RegisterApplicationMasterResponse
- registerApplicationMaster(RegisterApplicationMasterRequest request)
- throws YarnException, IOException {
+ public synchronized RegisterApplicationMasterResponse registerApplicationMaster(
+ RegisterApplicationMasterRequest request) throws YarnException, IOException {
+
+ if (request == null) {
+ throw new YarnException("RegisterApplicationMasterRequest can't be null!");
+ }
// Reset the heartbeat responseId to zero upon register
synchronized (this.lastAllocateResponseLock) {
@@ -590,18 +594,9 @@
// Save the registration request. This will be used for registering with
// secondary sub-clusters using UAMs, as well as re-register later
this.amRegistrationRequest = request;
- if (getNMStateStore() != null) {
- try {
- RegisterApplicationMasterRequestPBImpl pb =
- (RegisterApplicationMasterRequestPBImpl)
- this.amRegistrationRequest;
- getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
- NMSS_REG_REQUEST_KEY, pb.getProto().toByteArray());
- } catch (Exception e) {
- LOG.error("Error storing AMRMProxy application context entry for "
- + this.attemptId, e);
- }
- }
+ RegisterApplicationMasterRequestPBImpl requestPB = (RegisterApplicationMasterRequestPBImpl)
+ this.amRegistrationRequest;
+ storeAMRMProxyAppContextEntry(NMSS_REG_REQUEST_KEY, requestPB.getProto().toByteArray());
}
/*
@@ -625,48 +620,40 @@
* is running and will breaks the elasticity feature. The registration with
* the other sub-cluster RM will be done lazily as needed later.
*/
- this.amRegistrationResponse =
- this.homeRMRelayer.registerApplicationMaster(request);
- if (this.amRegistrationResponse
- .getContainersFromPreviousAttempts() != null) {
- cacheAllocatedContainers(
- this.amRegistrationResponse.getContainersFromPreviousAttempts(),
- this.homeSubClusterId);
+ this.amRegistrationResponse = this.homeRMRelayer.registerApplicationMaster(request);
+
+ if (this.amRegistrationResponse == null) {
+ throw new YarnException("RegisterApplicationMasterResponse can't be null!");
+ }
+
+ List<Container> containersFromPreviousAttempts =
+ this.amRegistrationResponse.getContainersFromPreviousAttempts();
+ if (containersFromPreviousAttempts != null) {
+ cacheAllocatedContainers(containersFromPreviousAttempts, this.homeSubClusterId);
}
ApplicationId appId = this.attemptId.getApplicationId();
reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId);
- if (getNMStateStore() != null) {
- try {
- RegisterApplicationMasterResponsePBImpl pb =
- (RegisterApplicationMasterResponsePBImpl)
- this.amRegistrationResponse;
- getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
- NMSS_REG_RESPONSE_KEY, pb.getProto().toByteArray());
- } catch (Exception e) {
- LOG.error("Error storing AMRMProxy application context entry for "
- + this.attemptId, e);
- }
- }
+ RegisterApplicationMasterResponsePBImpl responsePB = (RegisterApplicationMasterResponsePBImpl)
+ this.amRegistrationResponse;
+ storeAMRMProxyAppContextEntry(NMSS_REG_RESPONSE_KEY, responsePB.getProto().toByteArray());
// the queue this application belongs will be used for getting
// AMRMProxy policy from state store.
String queue = this.amRegistrationResponse.getQueue();
if (queue == null) {
- LOG.warn("Received null queue for application " + appId
- + " from home subcluster. Will use default queue name "
- + YarnConfiguration.DEFAULT_QUEUE_NAME
- + " for getting AMRMProxyPolicy");
+ LOG.warn("Received null queue for application {} from home subcluster. " +
+ " Will use default queue name {} for getting AMRMProxyPolicy.", appId,
+ YarnConfiguration.DEFAULT_QUEUE_NAME);
} else {
- LOG.info("Application " + appId + " belongs to queue " + queue);
+ LOG.info("Application {} belongs to queue {}.", appId, queue);
}
// Initialize the AMRMProxyPolicy
try {
- this.policyInterpreter =
- FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
- getConf(), this.federationFacade, this.homeSubClusterId);
+ this.policyInterpreter = FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
+ getConf(), this.federationFacade, this.homeSubClusterId);
} catch (FederationPolicyInitializationException e) {
throw new YarnRuntimeException(e);
}
@@ -674,6 +661,24 @@
}
/**
+ * Add a context entry for an application attempt in AMRMProxyService.
+ *
+ * @param key key string
+ * @param data state data
+ */
+ private void storeAMRMProxyAppContextEntry(String key, byte[] data) {
+ NMStateStoreService nmStateStore = getNMStateStore();
+ if (nmStateStore != null) {
+ try {
+ nmStateStore.storeAMRMProxyAppContextEntry(this.attemptId, key, data);
+ } catch (Exception e) {
+ LOG.error("Error storing AMRMProxy application context entry[{}] for {}.",
+ key, this.attemptId, e);
+ }
+ }
+ }
+
+ /**
* Sends the heart beats to the home RM and the secondary sub-cluster RMs that
* are being used by the application.
*/
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 51c1c5a..8661990 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -195,10 +195,8 @@
private List<Container> getContainersAndAssert(int numberOfResourceRequests,
int numberOfAllocationExcepted) throws Exception {
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
- List<Container> containers =
- new ArrayList<Container>(numberOfResourceRequests);
- List<ResourceRequest> askList =
- new ArrayList<ResourceRequest>(numberOfResourceRequests);
+ List<Container> containers = new ArrayList<>(numberOfResourceRequests);
+ List<ResourceRequest> askList = new ArrayList<>(numberOfResourceRequests);
for (int id = 0; id < numberOfResourceRequests; id++) {
askList.add(createResourceRequest("test-node-" + Integer.toString(id),
6000, 2, id % 5, 1));
@@ -269,8 +267,8 @@
List<ContainerId> newlyFinished = getCompletedContainerIds(
allocateResponse.getCompletedContainersStatuses());
containersForReleasedContainerIds.addAll(newlyFinished);
- LOG.info("Number of containers received in the original request: "
- + Integer.toString(newlyFinished.size()));
+ LOG.info("Number of containers received in the original request: {}",
+ newlyFinished.size());
// Send max 10 heart beats to receive all the containers. If not, we will
// fail the test
@@ -290,10 +288,9 @@
newlyFinished = getCompletedContainerIds(
allocateResponse.getCompletedContainersStatuses());
containersForReleasedContainerIds.addAll(newlyFinished);
- LOG.info("Number of containers received in this request: "
- + Integer.toString(newlyFinished.size()));
- LOG.info("Total number of containers received: "
- + Integer.toString(containersForReleasedContainerIds.size()));
+ LOG.info("Number of containers received in this request: {}.", newlyFinished.size());
+ LOG.info("Total number of containers received: {}.",
+ containersForReleasedContainerIds.size());
Thread.sleep(10);
}
@@ -431,7 +428,7 @@
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finishResponse);
- Assert.assertEquals(true, finishResponse.getIsUnregistered());
+ Assert.assertTrue(finishResponse.getIsUnregistered());
return null;
}
});
@@ -624,7 +621,7 @@
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finishResponse);
- Assert.assertEquals(true, finishResponse.getIsUnregistered());
+ Assert.assertTrue(finishResponse.getIsUnregistered());
// After the application succeeds, the registry/NMSS entry should be
// cleaned up
@@ -805,7 +802,7 @@
Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size());
// Generate a duplicate heartbeat from AM, so that it won't really
- // trigger an heartbeat to all SC
+ // trigger a heartbeat to all SC
AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class);
// Set to lastResponseId - 1 so that it will be considered a duplicate
@@ -904,7 +901,7 @@
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finishResponse);
- Assert.assertEquals(true, finishResponse.getIsUnregistered());
+ Assert.assertTrue(finishResponse.getIsUnregistered());
// After the application succeeds, the registry entry should be deleted
if (interceptor.getRegistryClient() != null) {