YARN-11029. Refactor AMRMProxy Service code and Add Some Metrics. (#4650)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyMetrics.java
index 241eeeb..f5a3082 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyMetrics.java
@@ -45,6 +45,16 @@
private MutableGaugeLong failedAllocateRequests;
@Metric("# of failed application recoveries")
private MutableGaugeLong failedAppRecoveryCount;
+ @Metric("# of failed application stop")
+ private MutableGaugeLong failedAppStopRequests;
+ @Metric("# of failed update token")
+ private MutableGaugeLong failedUpdateAMRMTokenRequests;
+ @Metric("# all allocate requests count")
+ private MutableGaugeLong allocateCount;
+ @Metric("# all requests count")
+ private MutableGaugeLong requestCount;
+
+
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Application start request latency(ms)")
private MutableRate totalSucceededAppStartRequests;
@@ -54,11 +64,22 @@
private MutableRate totalSucceededFinishAMRequests;
@Metric("Allocate latency(ms)")
private MutableRate totalSucceededAllocateRequests;
+ @Metric("Application stop request latency(ms)")
+ private MutableRate totalSucceededAppStopRequests;
+ @Metric("Recover latency(ms)")
+ private MutableRate totalSucceededRecoverRequests;
+ @Metric("UpdateAMRMToken latency(ms)")
+ private MutableRate totalSucceededUpdateAMRMTokenRequests;
+
// Quantile latency in ms - this is needed for SLA (95%, 99%, etc)
private MutableQuantiles applicationStartLatency;
private MutableQuantiles registerAMLatency;
private MutableQuantiles finishAMLatency;
private MutableQuantiles allocateLatency;
+ private MutableQuantiles recoverLatency;
+ private MutableQuantiles applicationStopLatency;
+ private MutableQuantiles updateAMRMTokenLatency;
+
private static volatile AMRMProxyMetrics instance = null;
private MetricsRegistry registry;
@@ -78,6 +99,15 @@
allocateLatency = registry
.newQuantiles("allocateLatency", "latency of allocate", "ops",
"latency", 10);
+ applicationStopLatency = registry
+ .newQuantiles("applicationStopLatency", "latency of app stop", "ops",
+ "latency", 10);
+ recoverLatency = registry
+ .newQuantiles("recoverLatency", "latency of recover", "ops",
+ "latency", 10);
+ updateAMRMTokenLatency = registry
+ .newQuantiles("updateAMRMTokenLatency", "latency of update amrm token", "ops",
+ "latency", 10);
}
/**
@@ -147,15 +177,56 @@
}
@VisibleForTesting
+ long getNumSucceededAppStopRequests() {
+ return totalSucceededAppStopRequests.lastStat().numSamples();
+ }
+
+ @VisibleForTesting
+ long getNumSucceededRecoverRequests() {
+ return totalSucceededRecoverRequests.lastStat().numSamples();
+ }
+
+ @VisibleForTesting
+ long getNumSucceededUpdateAMRMTokenRequests() {
+ return totalSucceededUpdateAMRMTokenRequests.lastStat().numSamples();
+ }
+
+
+ @VisibleForTesting
double getLatencySucceededAllocateRequests() {
return totalSucceededAllocateRequests.lastStat().mean();
}
+ @VisibleForTesting
+ double getLatencySucceededAppStopRequests() {
+ return totalSucceededAppStopRequests.lastStat().mean();
+ }
+
+ @VisibleForTesting
+ double getLatencySucceededRecoverRequests() {
+ return totalSucceededRecoverRequests.lastStat().mean();
+ }
+
public void succeededAllocateRequests(long duration) {
totalSucceededAllocateRequests.add(duration);
allocateLatency.add(duration);
}
+ public void succeededAppStopRequests(long duration) {
+ totalSucceededAppStopRequests.add(duration);
+ applicationStopLatency.add(duration);
+ }
+
+ public void succeededRecoverRequests(long duration) {
+ totalSucceededRecoverRequests.add(duration);
+ recoverLatency.add(duration);
+ }
+
+ public void succeededUpdateTokenRequests(long duration) {
+ totalSucceededUpdateAMRMTokenRequests.add(duration);
+ updateAMRMTokenLatency.add(duration);
+ }
+
long getFailedAppStartRequests() {
return failedAppStartRequests.value();
}
@@ -195,4 +266,36 @@
public void incrFailedAppRecoveryCount() {
failedAppRecoveryCount.incr();
}
+
+ long getFailedAppStopRequests() {
+ return failedAppStopRequests.value();
+ }
+
+ public void incrFailedAppStopRequests() {
+ failedAppStopRequests.incr();
+ }
+
+ long getFailedUpdateAMRMTokenRequests() {
+ return failedUpdateAMRMTokenRequests.value();
+ }
+
+ public void incrFailedUpdateAMRMTokenRequests() {
+ failedUpdateAMRMTokenRequests.incr();
+ }
+
+ public void incrAllocateCount() {
+ allocateCount.incr();
+ }
+
+ public void incrRequestCount() {
+ requestCount.incr();
+ }
+
+ long getAllocateCount() {
+ return allocateCount.value();
+ }
+
+ long getRequestCount() {
+ return requestCount.value();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
index b0d66ca..82873e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -53,6 +54,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -123,12 +125,9 @@
Preconditions.checkArgument(dispatcher != null, "dispatcher is null");
this.nmContext = nmContext;
this.dispatcher = dispatcher;
- this.applPipelineMap =
- new ConcurrentHashMap<ApplicationId, RequestInterceptorChainWrapper>();
+ this.applPipelineMap = new ConcurrentHashMap<>();
- this.dispatcher.register(ApplicationEventType.class,
- new ApplicationEventHandler());
-
+ this.dispatcher.register(ApplicationEventType.class, new ApplicationEventHandler());
metrics = AMRMProxyMetrics.getMetrics();
}
@@ -155,7 +154,7 @@
@Override
protected void serviceStart() throws Exception {
- LOG.info("Starting AMRMProxyService");
+ LOG.info("Starting AMRMProxyService.");
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
UserGroupInformation.setConfiguration(conf);
@@ -182,27 +181,22 @@
listenerEndpoint, serverConf, this.secretManager,
numWorkerThreads);
- if (conf
- .getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
- false)) {
- this.server.refreshServiceAcl(conf, NMPolicyProvider.getInstance());
+ if (conf.getBoolean(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
+ this.server.refreshServiceAcl(conf, NMPolicyProvider.getInstance());
}
this.server.start();
- LOG.info("AMRMProxyService listening on address: "
- + this.server.getListenerAddress());
+ LOG.info("AMRMProxyService listening on address: {}.", this.server.getListenerAddress());
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
- LOG.info("Stopping AMRMProxyService");
+ LOG.info("Stopping AMRMProxyService.");
if (this.server != null) {
this.server.stop();
}
-
this.secretManager.stop();
-
super.serviceStop();
}
@@ -212,19 +206,21 @@
* @throws IOException if recover fails
*/
public void recover() throws IOException {
- LOG.info("Recovering AMRMProxyService");
+ LOG.info("Recovering AMRMProxyService.");
RecoveredAMRMProxyState state =
this.nmContext.getNMStateStore().loadAMRMProxyState();
this.secretManager.recover(state);
- LOG.info("Recovering {} running applications for AMRMProxy",
+ LOG.info("Recovering {} running applications for AMRMProxy.",
state.getAppContexts().size());
+
for (Map.Entry<ApplicationAttemptId, Map<String, byte[]>> entry : state
.getAppContexts().entrySet()) {
ApplicationAttemptId attemptId = entry.getKey();
- LOG.info("Recovering app attempt {}", attemptId);
+ LOG.info("Recovering app attempt {}.", attemptId);
+ long startTime = clock.getTime();
// Try recover for the running application attempt
try {
@@ -233,19 +229,18 @@
for (Map.Entry<String, byte[]> contextEntry : entry.getValue()
.entrySet()) {
if (contextEntry.getKey().equals(NMSS_USER_KEY)) {
- user = new String(contextEntry.getValue(), "UTF-8");
+ user = new String(contextEntry.getValue(), StandardCharsets.UTF_8);
} else if (contextEntry.getKey().equals(NMSS_AMRMTOKEN_KEY)) {
amrmToken = new Token<>();
amrmToken.decodeFromUrlString(
- new String(contextEntry.getValue(), "UTF-8"));
+ new String(contextEntry.getValue(), StandardCharsets.UTF_8));
// Clear the service field, as if RM just issued the token
amrmToken.setService(new Text());
}
}
if (amrmToken == null) {
- throw new IOException(
- "No amrmToken found for app attempt " + attemptId);
+ throw new IOException("No amrmToken found for app attempt " + attemptId);
}
if (user == null) {
throw new IOException("No user found for app attempt " + attemptId);
@@ -258,14 +253,14 @@
// Retrieve the AM container credentials from NM context
Credentials amCred = null;
for (Container container : this.nmContext.getContainers().values()) {
- LOG.debug("From NM Context container {}", container.getContainerId());
+ LOG.debug("From NM Context container {}.", container.getContainerId());
if (container.getContainerId().getApplicationAttemptId().equals(
attemptId) && container.getContainerTokenIdentifier() != null) {
- LOG.debug("Container type {}",
+ LOG.debug("Container type {}.",
container.getContainerTokenIdentifier().getContainerType());
if (container.getContainerTokenIdentifier()
.getContainerType() == ContainerType.APPLICATION_MASTER) {
- LOG.info("AM container {} found in context, has credentials: {}",
+ LOG.info("AM container {} found in context, has credentials: {}.",
container.getContainerId(),
(container.getCredentials() != null));
amCred = container.getCredentials();
@@ -274,15 +269,17 @@
}
if (amCred == null) {
LOG.error("No credentials found for AM container of {}. "
- + "Yarn registry access might not work", attemptId);
+ + "Yarn registry access might not work.", attemptId);
}
- // Create the intercepter pipeline for the AM
+ // Create the interceptor pipeline for the AM
initializePipeline(attemptId, user, amrmToken, localToken,
entry.getValue(), true, amCred);
+ long endTime = clock.getTime();
+ this.metrics.succeededRecoverRequests(endTime - startTime);
} catch (Throwable e) {
- LOG.error("Exception when recovering " + attemptId
- + ", removing it from NMStateStore and move on", e);
+ LOG.error("Exception when recovering {}, removing it from NMStateStore and move on.",
+ attemptId, e);
this.metrics.incrFailedAppRecoveryCount();
this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId);
}
@@ -292,26 +289,28 @@
/**
* This is called by the AMs started on this node to register with the RM.
* This method does the initial authorization and then forwards the request to
- * the application instance specific intercepter chain.
+ * the application instance specific interceptor chain.
*/
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws YarnException,
IOException {
+ this.metrics.incrRequestCount();
long startTime = clock.getTime();
try {
RequestInterceptorChainWrapper pipeline =
authorizeAndGetInterceptorChain();
- LOG.info("Registering application master." + " Host:" + request.getHost()
- + " Port:" + request.getRpcPort() + " Tracking Url:" + request
- .getTrackingUrl() + " for application " + pipeline
- .getApplicationAttemptId());
+
+ LOG.info("RegisteringAM Host: {}, Port: {}, Tracking Url: {} for application {}. ",
+ request.getHost(), request.getRpcPort(), request.getTrackingUrl(),
+ pipeline.getApplicationAttemptId());
+
RegisterApplicationMasterResponse response =
pipeline.getRootInterceptor().registerApplicationMaster(request);
long endTime = clock.getTime();
this.metrics.succeededRegisterAMRequests(endTime - startTime);
- LOG.info("RegisterAM processing finished in {} ms for application {}",
+ LOG.info("RegisterAM processing finished in {} ms for application {}.",
endTime - startTime, pipeline.getApplicationAttemptId());
return response;
} catch (Throwable t) {
@@ -323,24 +322,25 @@
/**
* This is called by the AMs started on this node to unregister from the RM.
* This method does the initial authorization and then forwards the request to
- * the application instance specific intercepter chain.
+ * the application instance specific interceptor chain.
*/
@Override
public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request) throws YarnException,
IOException {
+ this.metrics.incrRequestCount();
long startTime = clock.getTime();
try {
RequestInterceptorChainWrapper pipeline =
authorizeAndGetInterceptorChain();
- LOG.info("Finishing application master for {}. Tracking Url: {}",
+ LOG.info("Finishing application master for {}. Tracking Url: {}.",
pipeline.getApplicationAttemptId(), request.getTrackingUrl());
FinishApplicationMasterResponse response =
pipeline.getRootInterceptor().finishApplicationMaster(request);
long endTime = clock.getTime();
this.metrics.succeededFinishAMRequests(endTime - startTime);
- LOG.info("FinishAM finished with isUnregistered = {} in {} ms for {}",
+ LOG.info("FinishAM finished with isUnregistered = {} in {} ms for {}.",
response.getIsUnregistered(), endTime - startTime,
pipeline.getApplicationAttemptId());
return response;
@@ -354,12 +354,13 @@
* This is called by the AMs started on this node to send heart beat to RM.
* This method does the initial authorization and then forwards the request to
* the application instance specific pipeline, which is a chain of request
- * intercepter objects. One application request processing pipeline is created
+ * interceptor objects. One application request processing pipeline is created
* per AM instance.
*/
@Override
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException {
+ this.metrics.incrAllocateCount();
long startTime = clock.getTime();
try {
AMRMTokenIdentifier amrmTokenIdentifier =
@@ -373,7 +374,7 @@
long endTime = clock.getTime();
this.metrics.succeededAllocateRequests(endTime - startTime);
- LOG.info("Allocate processing finished in {} ms for application {}",
+ LOG.info("Allocate processing finished in {} ms for application {}.",
endTime - startTime, pipeline.getApplicationAttemptId());
return allocateResponse;
} catch (Throwable t) {
@@ -392,6 +393,7 @@
*/
public void processApplicationStartRequest(StartContainerRequest request)
throws IOException, YarnException {
+ this.metrics.incrRequestCount();
long startTime = clock.getTime();
try {
ContainerTokenIdentifier containerTokenIdentifierForKey =
@@ -408,8 +410,7 @@
if (!checkIfAppExistsInStateStore(applicationID)) {
return;
}
- LOG.info("Callback received for initializing request "
- + "processing pipeline for an AM");
+ LOG.info("Callback received for initializing request processing pipeline for an AM.");
Credentials credentials = YarnServerSecurityUtils
.parseCredentials(request.getContainerLaunchContext());
@@ -417,8 +418,7 @@
getFirstAMRMToken(credentials.getAllTokens());
if (amrmToken == null) {
throw new YarnRuntimeException(
- "AMRMToken not found in the start container request for "
- + "application:" + appAttemptId.toString());
+ "AMRMToken not found in the start container request for application:" + appAttemptId);
}
// Substitute the existing AMRM Token with a local one. Keep the rest of
@@ -445,7 +445,7 @@
}
/**
- * Initializes the request intercepter pipeline for the specified application.
+ * Initializes the request interceptor pipeline for the specified application.
*
* @param applicationAttemptId attempt id
* @param user user name
@@ -465,8 +465,7 @@
.containsKey(applicationAttemptId.getApplicationId())) {
LOG.warn("Request to start an already existing appId was received. "
+ " This can happen if an application failed and a new attempt "
- + "was created on this machine. ApplicationId: "
- + applicationAttemptId.toString());
+ + "was created on this machine. ApplicationId: {}.", applicationAttemptId);
RequestInterceptorChainWrapper chainWrapperBackup =
this.applPipelineMap.get(applicationAttemptId.getApplicationId());
@@ -476,8 +475,7 @@
.equals(applicationAttemptId)) {
// TODO: revisit in AMRMProxy HA in YARN-6128
// Remove the existing pipeline
- LOG.info("Remove the previous pipeline for ApplicationId: "
- + applicationAttemptId.toString());
+ LOG.info("Remove the previous pipeline for ApplicationId: {}.", applicationAttemptId);
RequestInterceptorChainWrapper pipeline =
applPipelineMap.remove(applicationAttemptId.getApplicationId());
@@ -485,19 +483,17 @@
try {
this.nmContext.getNMStateStore()
.removeAMRMProxyAppContext(applicationAttemptId);
- } catch (IOException e) {
- LOG.error("Error removing AMRMProxy application context for "
- + applicationAttemptId, e);
+ } catch (IOException ioe) {
+ LOG.error("Error removing AMRMProxy application context for {}.",
+ applicationAttemptId, ioe);
}
}
try {
pipeline.getRootInterceptor().shutdown();
} catch (Throwable ex) {
- LOG.warn(
- "Failed to shutdown the request processing pipeline for app:"
- + applicationAttemptId.getApplicationId(),
- ex);
+ LOG.warn("Failed to shutdown the request processing pipeline for app: {}.",
+ applicationAttemptId.getApplicationId(), ex);
}
} else {
return;
@@ -510,12 +506,11 @@
}
// We register the pipeline instance in the map first and then initialize it
- // later because chain initialization can be expensive and we would like to
+ // later because chain initialization can be expensive, and we would like to
// release the lock as soon as possible to prevent other applications from
// blocking when one application's chain is initializing
LOG.info("Initializing request processing pipeline for application. "
- + " ApplicationId:" + applicationAttemptId + " for the user: "
- + user);
+ + " ApplicationId: {} for the user: {}.", applicationAttemptId, user);
try {
RequestInterceptor interceptorChain =
@@ -525,8 +520,7 @@
user, amrmToken, localToken, credentials, this.registry));
if (isRecovery) {
if (recoveredDataMap == null) {
- throw new YarnRuntimeException(
- "null recoveredDataMap recieved for recover");
+ throw new YarnRuntimeException("null recoveredDataMap received for recover");
}
interceptorChain.recover(recoveredDataMap);
}
@@ -535,13 +529,13 @@
if (!isRecovery && this.nmContext.getNMStateStore() != null) {
try {
this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
- applicationAttemptId, NMSS_USER_KEY, user.getBytes("UTF-8"));
+ applicationAttemptId, NMSS_USER_KEY, user.getBytes(StandardCharsets.UTF_8));
this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
applicationAttemptId, NMSS_AMRMTOKEN_KEY,
- amrmToken.encodeToUrlString().getBytes("UTF-8"));
+ amrmToken.encodeToUrlString().getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
- LOG.error("Error storing AMRMProxy application context entry for "
- + applicationAttemptId, e);
+ LOG.error("Error storing AMRMProxy application context entry for {}.",
+ applicationAttemptId, e);
}
}
} catch (Exception e) {
@@ -557,29 +551,27 @@
* @param applicationId application id
*/
protected void stopApplication(ApplicationId applicationId) {
- Preconditions.checkArgument(applicationId != null,
- "applicationId is null");
+ this.metrics.incrRequestCount();
+ Preconditions.checkArgument(applicationId != null, "applicationId is null");
RequestInterceptorChainWrapper pipeline =
this.applPipelineMap.remove(applicationId);
+ boolean isStopSuccess = true;
+ long startTime = clock.getTime();
if (pipeline == null) {
- LOG.info(
- "No interceptor pipeline for application {},"
- + " likely because its AM is not run in this node.",
- applicationId);
+ LOG.info("No interceptor pipeline for application {},"
+ + " likely because its AM is not run in this node.", applicationId);
+ isStopSuccess = false;
} else {
// Remove the appAttempt in AMRMTokenSecretManager
- this.secretManager
- .applicationMasterFinished(pipeline.getApplicationAttemptId());
-
- LOG.info("Stopping the request processing pipeline for application: "
- + applicationId);
+ this.secretManager.applicationMasterFinished(pipeline.getApplicationAttemptId());
+ LOG.info("Stopping the request processing pipeline for application: {}.", applicationId);
try {
pipeline.getRootInterceptor().shutdown();
} catch (Throwable ex) {
- LOG.warn(
- "Failed to shutdown the request processing pipeline for app:"
- + applicationId, ex);
+ LOG.warn("Failed to shutdown the request processing pipeline for app: {}.",
+ applicationId, ex);
+ isStopSuccess = false;
}
// Remove the app context from NMSS after the interceptors are shutdown
@@ -588,74 +580,83 @@
this.nmContext.getNMStateStore()
.removeAMRMProxyAppContext(pipeline.getApplicationAttemptId());
} catch (IOException e) {
- LOG.error("Error removing AMRMProxy application context for "
- + applicationId, e);
+ LOG.error("Error removing AMRMProxy application context for {}.",
+ applicationId, e);
+ isStopSuccess = false;
}
}
}
+
+ if (isStopSuccess) {
+ long endTime = clock.getTime();
+ this.metrics.succeededAppStopRequests(endTime - startTime);
+ } else {
+ this.metrics.incrFailedAppStopRequests();
+ }
}
private void updateAMRMTokens(AMRMTokenIdentifier amrmTokenIdentifier,
RequestInterceptorChainWrapper pipeline,
AllocateResponse allocateResponse) {
+
AMRMProxyApplicationContextImpl context =
- (AMRMProxyApplicationContextImpl) pipeline.getRootInterceptor()
- .getApplicationContext();
+ (AMRMProxyApplicationContextImpl) pipeline.getRootInterceptor().getApplicationContext();
- // check to see if the RM has issued a new AMRMToken & accordingly update
- // the real ARMRMToken in the current context
- if (allocateResponse.getAMRMToken() != null) {
- LOG.info("RM rolled master-key for amrm-tokens");
+ try {
+ long startTime = clock.getTime();
- org.apache.hadoop.yarn.api.records.Token token =
- allocateResponse.getAMRMToken();
+ // check to see if the RM has issued a new AMRMToken & accordingly update
+ // the real AMRMToken in the current context
+ if (allocateResponse.getAMRMToken() != null) {
+ LOG.info("RM rolled master-key for amrm-tokens.");
- // Do not propagate this info back to AM
- allocateResponse.setAMRMToken(null);
+ org.apache.hadoop.yarn.api.records.Token token = allocateResponse.getAMRMToken();
- org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newToken =
- ConverterUtils.convertFromYarn(token, (Text) null);
+ // Do not propagate this info back to AM
+ allocateResponse.setAMRMToken(null);
- // Update the AMRMToken in context map, and in NM state store if it is
- // different
- if (context.setAMRMToken(newToken)
- && this.nmContext.getNMStateStore() != null) {
- try {
+ org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newToken =
+ ConverterUtils.convertFromYarn(token, (Text) null);
+
+ // Update the AMRMToken in context map, and in NM state store if it is
+ // different
+ if (context.setAMRMToken(newToken) && this.nmContext.getNMStateStore() != null) {
this.nmContext.getNMStateStore().storeAMRMProxyAppContextEntry(
context.getApplicationAttemptId(), NMSS_AMRMTOKEN_KEY,
- newToken.encodeToUrlString().getBytes("UTF-8"));
- } catch (IOException e) {
- LOG.error("Error storing AMRMProxy application context entry for "
- + context.getApplicationAttemptId(), e);
+ newToken.encodeToUrlString().getBytes(StandardCharsets.UTF_8));
}
}
- }
- // Check if the local AMRMToken is rolled up and update the context and
- // response accordingly
- MasterKeyData nextMasterKey =
- this.secretManager.getNextMasterKeyData();
+ // Check if the local AMRMToken is rolled up and update the context and
+ // response accordingly
+ MasterKeyData nextMasterKey = this.secretManager.getNextMasterKeyData();
- if (nextMasterKey != null
- && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier
- .getKeyId()) {
- Token<AMRMTokenIdentifier> localToken = context.getLocalAMRMToken();
- if (nextMasterKey.getMasterKey().getKeyId() != context
- .getLocalAMRMTokenKeyId()) {
- LOG.info("The local AMRMToken has been rolled-over."
- + " Send new local AMRMToken back to application: "
- + pipeline.getApplicationId());
- localToken =
- this.secretManager.createAndGetAMRMToken(pipeline
- .getApplicationAttemptId());
- context.setLocalAMRMToken(localToken);
+ if (nextMasterKey != null) {
+ MasterKey masterKey = nextMasterKey.getMasterKey();
+ if (masterKey.getKeyId() != amrmTokenIdentifier.getKeyId()) {
+ Token<AMRMTokenIdentifier> localToken = context.getLocalAMRMToken();
+ if (masterKey.getKeyId() != context.getLocalAMRMTokenKeyId()) {
+ LOG.info("The local AMRMToken has been rolled-over."
+ + " Send new local AMRMToken back to application: {}",
+ pipeline.getApplicationId());
+ localToken = this.secretManager.createAndGetAMRMToken(
+ pipeline.getApplicationAttemptId());
+ context.setLocalAMRMToken(localToken);
+ }
+
+ allocateResponse
+ .setAMRMToken(org.apache.hadoop.yarn.api.records.Token
+ .newInstance(localToken.getIdentifier(), localToken
+ .getKind().toString(), localToken.getPassword(),
+ localToken.getService().toString()));
+ }
}
-
- allocateResponse
- .setAMRMToken(org.apache.hadoop.yarn.api.records.Token
- .newInstance(localToken.getIdentifier(), localToken
- .getKind().toString(), localToken.getPassword(),
- localToken.getService().toString()));
+ long endTime = clock.getTime();
+ this.metrics.succeededUpdateTokenRequests(endTime - startTime);
+ } catch (IOException e) {
+ LOG.error("Error storing AMRMProxy application context entry for {}.",
+ context.getApplicationAttemptId(), e);
+ this.metrics.incrFailedUpdateAMRMTokenRequests();
}
}
@@ -672,19 +673,19 @@
}
/**
- * Gets the Request intercepter chains for all the applications.
+ * Gets the Request interceptor chains for all the applications.
*
- * @return the request intercepter chains.
+ * @return the request interceptor chains.
*/
protected Map<ApplicationId, RequestInterceptorChainWrapper> getPipelines() {
return this.applPipelineMap;
}
/**
- * This method creates and returns reference of the first intercepter in the
- * chain of request intercepter instances.
+ * This method creates and returns reference of the first interceptor in the
+ * chain of request interceptor instances.
*
- * @return the reference of the first intercepter in the chain
+ * @return the reference of the first interceptor in the chain
*/
protected RequestInterceptor createRequestInterceptorChain() {
Configuration conf = getConfig();
@@ -717,7 +718,7 @@
} catch (ClassNotFoundException e) {
throw new YarnRuntimeException(
"Could not instantiate ApplicationMasterRequestInterceptor: "
- + interceptorClassName, e);
+ + interceptorClassName, e);
}
}
@@ -729,10 +730,10 @@
}
/**
- * Returns the comma separated intercepter class names from the configuration.
+ * Returns the comma separated interceptor class names from the configuration.
*
* @param conf configuration
- * @return the intercepter class names as an instance of ArrayList
+ * @return the interceptor class names as an instance of ArrayList
*/
private List<String> getInterceptorClassNames(Configuration conf) {
String configuredInterceptorClassNames =
@@ -759,7 +760,7 @@
* Authorizes the request and returns the application specific request
* processing pipeline.
*
- * @return the the intercepter wrapper instance
+ * @return the interceptor wrapper instance
* @throws YarnException if fails
*/
private RequestInterceptorChainWrapper authorizeAndGetInterceptorChain()
@@ -775,11 +776,10 @@
tokenIdentifier.getApplicationAttemptId();
synchronized (this.applPipelineMap) {
- if (!this.applPipelineMap.containsKey(appAttemptId
- .getApplicationId())) {
+ if (!this.applPipelineMap.containsKey(appAttemptId.getApplicationId())) {
throw new YarnException(
"The AM request processing pipeline is not initialized for app: "
- + appAttemptId.getApplicationId().toString());
+ + appAttemptId.getApplicationId());
}
return this.applPipelineMap.get(appAttemptId.getApplicationId());
@@ -827,29 +827,26 @@
/**
* Private class for handling application stop events.
- *
*/
class ApplicationEventHandler implements EventHandler<ApplicationEvent> {
@Override
public void handle(ApplicationEvent event) {
Application app =
- AMRMProxyService.this.nmContext.getApplications().get(
- event.getApplicationID());
+ AMRMProxyService.this.nmContext.getApplications().get(event.getApplicationID());
if (app != null) {
switch (event.getType()) {
case APPLICATION_RESOURCES_CLEANEDUP:
- LOG.info("Application stop event received for stopping AppId:"
- + event.getApplicationID().toString());
+ LOG.info("Application stop event received for stopping AppId: {}.",
+ event.getApplicationID().toString());
AMRMProxyService.this.stopApplication(event.getApplicationID());
break;
default:
- LOG.debug("AMRMProxy is ignoring event: {}", event.getType());
+ LOG.debug("AMRMProxy is ignoring event: {}.", event.getType());
break;
}
} else {
- LOG.warn("Event " + event + " sent to absent application "
- + event.getApplicationID());
+ LOG.warn("Event {} sent to absent application {}.", event, event.getApplicationID());
}
}
}
@@ -866,20 +863,20 @@
/**
* Initializes the wrapper with the specified parameters.
- *
- * @param rootInterceptor the root request intercepter
- * @param applicationAttemptId attempt id
+ *
+ * @param interceptor the root request interceptor
+ * @param appAttemptId attempt id
*/
- public synchronized void init(RequestInterceptor rootInterceptor,
- ApplicationAttemptId applicationAttemptId) {
- this.rootInterceptor = rootInterceptor;
- this.applicationAttemptId = applicationAttemptId;
+ public synchronized void init(RequestInterceptor interceptor,
+ ApplicationAttemptId appAttemptId) {
+ rootInterceptor = interceptor;
+ applicationAttemptId = appAttemptId;
}
/**
- * Gets the root request intercepter.
+ * Gets the root request interceptor.
*
- * @return the root request intercepter
+ * @return the root request interceptor
*/
public synchronized RequestInterceptor getRootInterceptor() {
return rootInterceptor;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/package-info.java
new file mode 100644
index 0000000..2f20d98
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.nodemanager.amrmproxy contains
+ * classes for handling federation AMRM information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyMetrics.java
index 4621c4d..6219641 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyMetrics.java
@@ -42,11 +42,19 @@
Assert.assertEquals(0, metrics.getFailedRegisterAMRequests());
Assert.assertEquals(0, metrics.getFailedFinishAMRequests());
Assert.assertEquals(0, metrics.getFailedAllocateRequests());
+ Assert.assertEquals(0, metrics.getFailedAppRecoveryCount());
+ Assert.assertEquals(0, metrics.getFailedAppStopRequests());
+ Assert.assertEquals(0, metrics.getFailedUpdateAMRMTokenRequests());
+ Assert.assertEquals(0, metrics.getAllocateCount());
+ Assert.assertEquals(0, metrics.getRequestCount());
Assert.assertEquals(0, metrics.getNumSucceededAppStartRequests());
Assert.assertEquals(0, metrics.getNumSucceededRegisterAMRequests());
Assert.assertEquals(0, metrics.getNumSucceededFinishAMRequests());
Assert.assertEquals(0, metrics.getNumSucceededAllocateRequests());
+ Assert.assertEquals(0, metrics.getNumSucceededRecoverRequests());
+ Assert.assertEquals(0, metrics.getNumSucceededAppStopRequests());
+ Assert.assertEquals(0, metrics.getNumSucceededUpdateAMRMTokenRequests());
LOG.info("Test: aggregate metrics are updated correctly");
}
@@ -57,19 +65,19 @@
long failedRegisterAMRequests = metrics.getFailedRegisterAMRequests();
long failedFinishAMRequests = metrics.getFailedFinishAMRequests();
long failedAllocateRequests = metrics.getFailedAllocateRequests();
+ long failedAppRecoveryRequests = metrics.getFailedAppRecoveryCount();
+ long failedAppStopRequests = metrics.getFailedAppStopRequests();
+ long failedUpdateAMRMTokenRequests = metrics.getFailedUpdateAMRMTokenRequests();
long succeededAppStartRequests = metrics.getNumSucceededAppStartRequests();
- long succeededRegisterAMRequests =
- metrics.getNumSucceededRegisterAMRequests();
+ long succeededRegisterAMRequests = metrics.getNumSucceededRegisterAMRequests();
long succeededFinishAMRequests = metrics.getNumSucceededFinishAMRequests();
long succeededAllocateRequests = metrics.getNumSucceededAllocateRequests();
int testAppId = 1;
- RegisterApplicationMasterResponse registerResponse =
- registerApplicationMaster(testAppId);
+ RegisterApplicationMasterResponse registerResponse = registerApplicationMaster(testAppId);
Assert.assertNotNull(registerResponse);
- Assert
- .assertEquals(Integer.toString(testAppId), registerResponse.getQueue());
+ Assert.assertEquals(Integer.toString(testAppId), registerResponse.getQueue());
AllocateResponse allocateResponse = allocate(testAppId);
Assert.assertNotNull(allocateResponse);
@@ -80,14 +88,13 @@
Assert.assertNotNull(finshResponse);
Assert.assertEquals(true, finshResponse.getIsUnregistered());
- Assert.assertEquals(failedAppStartRequests,
- metrics.getFailedAppStartRequests());
- Assert.assertEquals(failedRegisterAMRequests,
- metrics.getFailedRegisterAMRequests());
- Assert.assertEquals(failedFinishAMRequests,
- metrics.getFailedFinishAMRequests());
- Assert.assertEquals(failedAllocateRequests,
- metrics.getFailedAllocateRequests());
+ Assert.assertEquals(failedAppStartRequests, metrics.getFailedAppStartRequests());
+ Assert.assertEquals(failedRegisterAMRequests, metrics.getFailedRegisterAMRequests());
+ Assert.assertEquals(failedFinishAMRequests, metrics.getFailedFinishAMRequests());
+ Assert.assertEquals(failedAllocateRequests, metrics.getFailedAllocateRequests());
+ Assert.assertEquals(failedAppRecoveryRequests, metrics.getFailedAppRecoveryCount());
+ Assert.assertEquals(failedAppStopRequests, metrics.getFailedAppStopRequests());
+ Assert.assertEquals(failedUpdateAMRMTokenRequests, metrics.getFailedUpdateAMRMTokenRequests());
Assert.assertEquals(succeededAppStartRequests,
metrics.getNumSucceededAppStartRequests());