blob: 1d672e5f5f8b1a8747b32a187be4da26a3901fe6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import com.google.common.annotations.VisibleForTesting;
/**
* This class manages the list of applications for the resource manager.
*/
public class RMAppManager implements EventHandler<RMAppManagerEvent>,
Recoverable {
private static final Log LOG = LogFactory.getLog(RMAppManager.class);
private int maxCompletedAppsInMemory;
private int maxCompletedAppsInStateStore;
protected int completedAppsInStateStore = 0;
private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
private final RMContext rmContext;
private final ApplicationMasterService masterService;
private final YarnScheduler scheduler;
private final ApplicationACLsManager applicationACLsManager;
private Configuration conf;
public RMAppManager(RMContext context,
YarnScheduler scheduler, ApplicationMasterService masterService,
ApplicationACLsManager applicationACLsManager, Configuration conf) {
this.rmContext = context;
this.scheduler = scheduler;
this.masterService = masterService;
this.applicationACLsManager = applicationACLsManager;
this.conf = conf;
this.maxCompletedAppsInMemory = conf.getInt(
YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
this.maxCompletedAppsInStateStore =
conf.getInt(
YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS,
YarnConfiguration.DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS);
if (this.maxCompletedAppsInStateStore > this.maxCompletedAppsInMemory) {
this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory;
}
}
/**
* This class is for logging the application summary.
*/
static class ApplicationSummary {
static final Log LOG = LogFactory.getLog(ApplicationSummary.class);
// Escape sequences
static final char EQUALS = '=';
static final char[] charsToEscape =
{StringUtils.COMMA, EQUALS, StringUtils.ESCAPE_CHAR};
static class SummaryBuilder {
final StringBuilder buffer = new StringBuilder();
// A little optimization for a very common case
SummaryBuilder add(String key, long value) {
return _add(key, Long.toString(value));
}
<T> SummaryBuilder add(String key, T value) {
String escapedString = StringUtils.escapeString(String.valueOf(value),
StringUtils.ESCAPE_CHAR, charsToEscape).replaceAll("\n", "\\\\n")
.replaceAll("\r", "\\\\r");
return _add(key, escapedString);
}
SummaryBuilder add(SummaryBuilder summary) {
if (buffer.length() > 0) buffer.append(StringUtils.COMMA);
buffer.append(summary.buffer);
return this;
}
SummaryBuilder _add(String key, String value) {
if (buffer.length() > 0) buffer.append(StringUtils.COMMA);
buffer.append(key).append(EQUALS).append(value);
return this;
}
@Override public String toString() {
return buffer.toString();
}
}
/**
* create a summary of the application's runtime.
*
* @param app {@link RMApp} whose summary is to be created, cannot
* be <code>null</code>.
*/
public static SummaryBuilder createAppSummary(RMApp app) {
String trackingUrl = "N/A";
String host = "N/A";
RMAppAttempt attempt = app.getCurrentAppAttempt();
if (attempt != null) {
trackingUrl = attempt.getTrackingUrl();
host = attempt.getHost();
}
SummaryBuilder summary = new SummaryBuilder()
.add("appId", app.getApplicationId())
.add("name", app.getName())
.add("user", app.getUser())
.add("queue", app.getQueue())
.add("state", app.getState())
.add("trackingUrl", trackingUrl)
.add("appMasterHost", host)
.add("startTime", app.getStartTime())
.add("finishTime", app.getFinishTime())
.add("finalStatus", app.getFinalApplicationStatus());
return summary;
}
/**
* Log a summary of the application's runtime.
*
* @param app {@link RMApp} whose summary is to be logged
*/
public static void logAppSummary(RMApp app) {
if (app != null) {
LOG.info(createAppSummary(app));
}
}
}
@VisibleForTesting
public void logApplicationSummary(ApplicationId appId) {
ApplicationSummary.logAppSummary(rmContext.getRMApps().get(appId));
}
protected synchronized int getCompletedAppsListSize() {
return this.completedApps.size();
}
protected synchronized void finishApplication(ApplicationId applicationId) {
if (applicationId == null) {
LOG.error("RMAppManager received completed appId of null, skipping");
} else {
// Inform the DelegationTokenRenewer
if (UserGroupInformation.isSecurityEnabled()) {
rmContext.getDelegationTokenRenewer().applicationFinished(applicationId);
}
completedApps.add(applicationId);
completedAppsInStateStore++;
writeAuditLog(applicationId);
}
}
protected void writeAuditLog(ApplicationId appId) {
RMApp app = rmContext.getRMApps().get(appId);
String operation = "UNKONWN";
boolean success = false;
switch (app.getState()) {
case FAILED:
operation = AuditConstants.FINISH_FAILED_APP;
break;
case FINISHED:
operation = AuditConstants.FINISH_SUCCESS_APP;
success = true;
break;
case KILLED:
operation = AuditConstants.FINISH_KILLED_APP;
success = true;
break;
default:
}
if (success) {
RMAuditLogger.logSuccess(app.getUser(), operation,
"RMAppManager", app.getApplicationId());
} else {
StringBuilder diag = app.getDiagnostics();
String msg = diag == null ? null : diag.toString();
RMAuditLogger.logFailure(app.getUser(), operation, msg, "RMAppManager",
"App failed with state: " + app.getState(), appId);
}
}
/*
* check to see if hit the limit for max # completed apps kept
*/
protected synchronized void checkAppNumCompletedLimit() {
// check apps kept in state store.
while (completedAppsInStateStore > this.maxCompletedAppsInStateStore) {
ApplicationId removeId =
completedApps.get(completedApps.size() - completedAppsInStateStore);
RMApp removeApp = rmContext.getRMApps().get(removeId);
LOG.info("Max number of completed apps kept in state store met:"
+ " maxCompletedAppsInStateStore = " + maxCompletedAppsInStateStore
+ ", removing app " + removeApp.getApplicationId()
+ " from state store.");
rmContext.getStateStore().removeApplication(removeApp);
completedAppsInStateStore--;
}
// check apps kept in memorty.
while (completedApps.size() > this.maxCompletedAppsInMemory) {
ApplicationId removeId = completedApps.remove();
LOG.info("Application should be expired, max number of completed apps"
+ " kept in memory met: maxCompletedAppsInMemory = "
+ this.maxCompletedAppsInMemory + ", removing app " + removeId
+ " from memory: ");
rmContext.getRMApps().remove(removeId);
this.applicationACLsManager.removeApplication(removeId);
}
}
@SuppressWarnings("unchecked")
protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
String user) throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId();
RMAppImpl application =
createAndPopulateNewRMApp(submissionContext, submitTime, user);
ApplicationId appId = submissionContext.getApplicationId();
if (UserGroupInformation.isSecurityEnabled()) {
Credentials credentials = null;
try {
credentials = parseCredentials(submissionContext);
this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
credentials, submissionContext.getCancelTokensWhenComplete());
} catch (Exception e) {
LOG.warn("Unable to parse credentials.", e);
// Sending APP_REJECTED is fine, since we assume that the
// RMApp is in NEW state and thus we haven't yet informed the
// scheduler about the existence of the application
assert application.getState() == RMAppState.NEW;
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, e.getMessage()));
throw RPCUtil.getRemoteException(e);
}
} else {
// Dispatcher is not yet started at this time, so these START events
// enqueued should be guaranteed to be first processed when dispatcher
// gets started.
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.START));
}
}
@SuppressWarnings("unchecked")
protected void
recoverApplication(ApplicationState appState, RMState rmState)
throws Exception {
ApplicationSubmissionContext appContext =
appState.getApplicationSubmissionContext();
ApplicationId appId = appState.getAppId();
// create and recover app.
RMAppImpl application =
createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
appState.getUser());
application.recover(rmState);
if (isApplicationInFinalState(appState.getState())) {
// We are synchronously moving the application into final state so that
// momentarily client will not see this application in NEW state. Also
// for finished applications we will avoid renewing tokens.
application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
return;
}
if (UserGroupInformation.isSecurityEnabled()) {
Credentials credentials = null;
try {
credentials = parseCredentials(appContext);
// synchronously renew delegation token on recovery.
rmContext.getDelegationTokenRenewer().addApplicationSync(appId,
credentials, appContext.getCancelTokensWhenComplete());
application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
} catch (Exception e) {
LOG.warn("Unable to parse and renew delegation tokens.", e);
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(appId, e.getMessage()));
throw e;
}
} else {
application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
}
}
private RMAppImpl createAndPopulateNewRMApp(
ApplicationSubmissionContext submissionContext,
long submitTime, String user)
throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId();
validateResourceRequest(submissionContext);
// Create RMApp
RMAppImpl application =
new RMAppImpl(applicationId, rmContext, this.conf,
submissionContext.getApplicationName(), user,
submissionContext.getQueue(),
submissionContext, this.scheduler, this.masterService,
submitTime, submissionContext.getApplicationType(),
submissionContext.getApplicationTags());
// Concurrent app submissions with same applicationId will fail here
// Concurrent app submissions with different applicationIds will not
// influence each other
if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
null) {
String message = "Application with id " + applicationId
+ " is already present! Cannot add a duplicate!";
LOG.warn(message);
throw RPCUtil.getRemoteException(message);
}
// Inform the ACLs Manager
this.applicationACLsManager.addApplication(applicationId,
submissionContext.getAMContainerSpec().getApplicationACLs());
String appViewACLs = submissionContext.getAMContainerSpec()
.getApplicationACLs().get(ApplicationAccessType.VIEW_APP);
rmContext.getSystemMetricsPublisher().appACLsUpdated(
application, appViewACLs, System.currentTimeMillis());
return application;
}
private void validateResourceRequest(
ApplicationSubmissionContext submissionContext)
throws InvalidResourceRequestException {
// Validation of the ApplicationSubmissionContext needs to be completed
// here. Only those fields that are dependent on RM's configuration are
// checked here as they have to be validated whether they are part of new
// submission or just being recovered.
// Check whether AM resource requirements are within required limits
if (!submissionContext.getUnmanagedAM()) {
ResourceRequest amReq = BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getResource(), 1);
try {
SchedulerUtils.validateResourceRequest(amReq,
scheduler.getMaximumResourceCapability());
} catch (InvalidResourceRequestException e) {
LOG.warn("RM app submission failed in validating AM resource request"
+ " for application " + submissionContext.getApplicationId(), e);
throw e;
}
}
}
private boolean isApplicationInFinalState(RMAppState rmAppState) {
if (rmAppState == RMAppState.FINISHED || rmAppState == RMAppState.FAILED
|| rmAppState == RMAppState.KILLED) {
return true;
} else {
return false;
}
}
protected Credentials parseCredentials(ApplicationSubmissionContext application)
throws IOException {
Credentials credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
ByteBuffer tokens = application.getAMContainerSpec().getTokens();
if (tokens != null) {
dibb.reset(tokens);
credentials.readTokenStorageStream(dibb);
tokens.rewind();
}
return credentials;
}
@Override
public void recover(RMState state) throws Exception {
RMStateStore store = rmContext.getStateStore();
assert store != null;
// recover applications
Map<ApplicationId, ApplicationState> appStates = state.getApplicationState();
LOG.info("Recovering " + appStates.size() + " applications");
for (ApplicationState appState : appStates.values()) {
recoverApplication(appState, state);
}
}
@Override
public void handle(RMAppManagerEvent event) {
ApplicationId applicationId = event.getApplicationId();
LOG.debug("RMAppManager processing event for "
+ applicationId + " of type " + event.getType());
switch(event.getType()) {
case APP_COMPLETED:
{
finishApplication(applicationId);
logApplicationSummary(applicationId);
checkAppNumCompletedLimit();
}
break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}
}
}