blob: 70fd2576ab09ee91fd4ef52a6f09797531cb8736 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
/**
* This class manages the list of applications for the resource manager.
*/
public class RMAppManager implements EventHandler<RMAppManagerEvent>,
Recoverable {
// Logger shared by all RMAppManager instances.
private static final Log LOG = LogFactory.getLog(RMAppManager.class);
// Upper bound on the size of completedApps. Starts at the YARN default and is
// overwritten from the configuration by the constructor.
private int completedAppsMax = YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS;
// Ids of finished applications: appended by finishApplication() and removed
// from the head by checkAppNumCompletedLimit() once the bound is exceeded.
private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
// Collaborators supplied through the constructor.
private final RMContext rmContext;
private final ApplicationMasterService masterService;
private final YarnScheduler scheduler;
private final ApplicationACLsManager applicationACLsManager;
// Configuration passed to every RMAppImpl created by submitApplication().
private Configuration conf;
public RMAppManager(RMContext context,
YarnScheduler scheduler, ApplicationMasterService masterService,
ApplicationACLsManager applicationACLsManager, Configuration conf) {
this.rmContext = context;
this.scheduler = scheduler;
this.masterService = masterService;
this.applicationACLsManager = applicationACLsManager;
this.conf = conf;
setCompletedAppsMax(conf.getInt(
YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS));
}
/**
 * Renders a one-line, comma-separated list of <code>key=value</code> pairs
 * describing an application and writes it to a dedicated logger.
 */
static class ApplicationSummary {
  static final Log LOG = LogFactory.getLog(ApplicationSummary.class);

  // Characters that are escaped inside summary values
  static final char EQUALS = '=';
  static final char[] charsToEscape =
      {StringUtils.COMMA, EQUALS, StringUtils.ESCAPE_CHAR};

  /**
   * Accumulates <code>key=value</code> pairs, separated by commas.
   */
  static class SummaryBuilder {
    final StringBuilder buffer = new StringBuilder();

    /**
     * Fast path for numeric values: the number is appended without going
     * through the escaping step.
     */
    SummaryBuilder add(String key, long value) {
      final String rendered = Long.toString(value);
      return _add(key, rendered);
    }

    /**
     * Appends an arbitrary value, rendered with {@link String#valueOf} and
     * escaped against {@link ApplicationSummary#charsToEscape}.
     */
    <T> SummaryBuilder add(String key, T value) {
      final String escaped = StringUtils.escapeString(
          String.valueOf(value), StringUtils.ESCAPE_CHAR, charsToEscape);
      return _add(key, escaped);
    }

    /**
     * Appends the full contents of another builder after a separator.
     */
    SummaryBuilder add(SummaryBuilder summary) {
      separate();
      buffer.append(summary.buffer);
      return this;
    }

    /**
     * Appends one raw <code>key=value</code> pair; the value is written
     * as given, without escaping.
     */
    SummaryBuilder _add(String key, String value) {
      separate();
      buffer.append(key);
      buffer.append(EQUALS);
      buffer.append(value);
      return this;
    }

    /** Writes the separator unless nothing has been added yet. */
    private void separate() {
      if (buffer.length() != 0) {
        buffer.append(StringUtils.COMMA);
      }
    }

    @Override
    public String toString() {
      return buffer.toString();
    }
  }

  /**
   * Builds the summary of an application's runtime.
   *
   * @param app {@link RMApp} whose summary is to be created, cannot
   *            be <code>null</code>.
   * @return builder holding the id, name, user, queue, state, tracking URL,
   *         application master host, start time and finish time
   */
  public static SummaryBuilder createAppSummary(RMApp app) {
    // Tracking URL and host come from the current attempt, if there is one
    final RMAppAttempt currentAttempt = app.getCurrentAppAttempt();
    final String trackingUrl =
        (currentAttempt == null) ? "N/A" : currentAttempt.getTrackingUrl();
    final String host =
        (currentAttempt == null) ? "N/A" : currentAttempt.getHost();

    final SummaryBuilder result = new SummaryBuilder();
    result.add("appId", app.getApplicationId());
    result.add("name", app.getName());
    result.add("user", app.getUser());
    result.add("queue", app.getQueue());
    result.add("state", app.getState());
    result.add("trackingUrl", trackingUrl);
    result.add("appMasterHost", host);
    result.add("startTime", app.getStartTime());
    result.add("finishTime", app.getFinishTime());
    return result;
  }

  /**
   * Logs the summary of an application's runtime at INFO level.
   *
   * @param app {@link RMApp} whose summary is to be logged; nothing is
   *            logged when it is <code>null</code>
   */
  public static void logAppSummary(RMApp app) {
    if (app == null) {
      return;
    }
    LOG.info(createAppSummary(app));
  }
}
/**
 * Sets how many completed applications may be retained before
 * {@link #checkAppNumCompletedLimit()} starts evicting them.
 *
 * @param max new upper bound on the number of retained completed applications
 */
protected synchronized void setCompletedAppsMax(int max) {
  completedAppsMax = max;
}
/**
 * @return the number of completed application ids currently retained
 */
protected synchronized int getCompletedAppsListSize() {
  final int retained = completedApps.size();
  return retained;
}
/**
 * Records an application as completed: notifies the delegation token renewer
 * (when security is enabled), remembers the id in the completed list, writes
 * the audit log entry and removes the application from the state store.
 *
 * @param applicationId id of the completed application; a <code>null</code>
 *          id is logged as an error and otherwise ignored
 */
protected synchronized void finishApplication(ApplicationId applicationId) {
  if (applicationId == null) {
    LOG.error("RMAppManager received completed appId of null, skipping");
    return;
  }

  // The DelegationTokenRenewer is only informed when security is enabled
  if (UserGroupInformation.isSecurityEnabled()) {
    rmContext.getDelegationTokenRenewer().applicationFinished(applicationId);
  }

  completedApps.add(applicationId);
  writeAuditLog(applicationId);

  // The application is completely done, so remove it from the state store
  final RMStateStore stateStore = rmContext.getStateStore();
  final RMApp finishedApp = rmContext.getRMApps().get(applicationId);
  stateStore.removeApplication(finishedApp);
}
/**
 * Writes the audit log entry for a finished application.
 *
 * FINISHED and KILLED applications are logged as successes; FAILED
 * applications, and applications in any other state, are logged as failures
 * together with the application's diagnostics.
 *
 * @param appId id of the application to audit; when no application with
 *          this id is registered an error is logged and no audit entry is
 *          written
 */
protected void writeAuditLog(ApplicationId appId) {
  RMApp app = rmContext.getRMApps().get(appId);
  if (app == null) {
    // The lookup returns null for an unknown id; without this guard the
    // switch below would fail with a NullPointerException.
    LOG.error("RMAppManager cannot write audit log for unknown application "
        + appId + ", skipping");
    return;
  }

  // NOTE: the spelling of this fallback value is kept as-is because it is
  // emitted into the audit log, where existing consumers may match on it.
  String operation = "UNKONWN";
  boolean success = false;
  switch (app.getState()) {
    case FAILED:
      operation = AuditConstants.FINISH_FAILED_APP;
      break;
    case FINISHED:
      operation = AuditConstants.FINISH_SUCCESS_APP;
      success = true;
      break;
    case KILLED:
      operation = AuditConstants.FINISH_KILLED_APP;
      success = true;
      break;
    default:
      // Any other state keeps the fallback operation and is logged as failure
      break;
  }

  if (success) {
    RMAuditLogger.logSuccess(app.getUser(), operation,
        "RMAppManager", app.getApplicationId());
  } else {
    StringBuilder diag = app.getDiagnostics();
    String msg = diag == null ? null : diag.toString();
    RMAuditLogger.logFailure(app.getUser(), operation, msg, "RMAppManager",
        "App failed with state: " + app.getState(), appId);
  }
}
/**
 * Enforces the limit on retained completed applications. While the completed
 * list is longer than the configured maximum, the id at its head is dropped
 * and the matching application is removed from the RM application map and
 * from the ACLs manager.
 */
protected synchronized void checkAppNumCompletedLimit() {
  while (completedAppsMax < completedApps.size()) {
    final ApplicationId expiredId = completedApps.remove();
    LOG.info("Application should be expired, max # apps met. Removing app: "
        + expiredId);
    rmContext.getRMApps().remove(expiredId);
    applicationACLsManager.removeApplication(expiredId);
  }
}
/**
 * Creates an {@link RMApp} for the submission, registers it with the RM
 * context, the ACLs manager and (when security is enabled) the delegation
 * token renewer, and finally dispatches a START event for it.
 *
 * A missing queue or application name in the submission context is replaced
 * by the YARN default before the application is created.
 *
 * If an {@link IOException} is raised after this call registered the
 * application, an {@link RMAppRejectedEvent} is dispatched for it. When the
 * failure is a duplicate application id, no rejection is sent: the event is
 * keyed by application id, so it would be delivered to the application that
 * is already registered instead of the duplicate that was never added.
 *
 * @param submissionContext submission details supplied by the client; its
 *          queue and application name may be filled in with defaults
 * @param submitTime time of submission, passed on to the created application
 */
@SuppressWarnings("unchecked")
protected void submitApplication(
    ApplicationSubmissionContext submissionContext, long submitTime) {
  ApplicationId applicationId = submissionContext.getApplicationId();
  // True only once the application created below is in the RM application map
  boolean registered = false;
  try {
    // Sanity checks
    if (submissionContext.getQueue() == null) {
      submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
    }
    if (submissionContext.getApplicationName() == null) {
      submissionContext.setApplicationName(
          YarnConfiguration.DEFAULT_APPLICATION_NAME);
    }

    // Create RMApp
    RMApp application =
        new RMAppImpl(applicationId, rmContext, this.conf,
            submissionContext.getApplicationName(),
            submissionContext.getUser(), submissionContext.getQueue(),
            submissionContext, this.scheduler, this.masterService,
            submitTime);

    // Sanity check - duplicate?
    if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
        null) {
      String message = "Application with id " + applicationId
          + " is already present! Cannot add a duplicate!";
      LOG.info(message);
      throw RPCUtil.getRemoteException(message);
    }
    registered = true;

    // Inform the ACLs Manager
    this.applicationACLsManager.addApplication(applicationId,
        submissionContext.getAMContainerSpec().getApplicationACLs());

    // Setup tokens for renewal
    if (UserGroupInformation.isSecurityEnabled()) {
      this.rmContext.getDelegationTokenRenewer().addApplication(
          applicationId, parseCredentials(submissionContext),
          submissionContext.getCancelTokensWhenComplete());
    }

    // All done, start the RMApp
    this.rmContext.getDispatcher().getEventHandler().handle(
        new RMAppEvent(applicationId, RMAppEventType.START));
  } catch (IOException ie) {
    LOG.info("RMAppManager submit application exception", ie);
    if (registered) {
      // Sending APP_REJECTED is fine, since we assume that the
      // RMApp is in NEW state and thus we haven't yet informed the
      // Scheduler about the existence of the application.
      // This only holds for the application registered by this call, which
      // is why a rejected duplicate must not trigger the event.
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMAppRejectedEvent(applicationId, ie.getMessage()));
    }
  }
}
/**
 * Reads the credentials carried in the container tokens of the application's
 * AM container spec.
 *
 * @param application submission context whose AM container spec is inspected
 * @return the parsed credentials, or empty credentials when the spec carries
 *         no tokens
 * @throws IOException if the token storage stream cannot be read
 */
private Credentials parseCredentials(ApplicationSubmissionContext application)
    throws IOException {
  final Credentials parsed = new Credentials();
  final ByteBuffer tokenBytes =
      application.getAMContainerSpec().getContainerTokens();
  if (tokenBytes == null) {
    return parsed;
  }

  final DataInputByteBuffer tokenStream = new DataInputByteBuffer();
  tokenStream.reset(tokenBytes);
  parsed.readTokenStorageStream(tokenStream);
  // Put the buffer's position back to zero after it has been read
  tokenBytes.rewind();
  return parsed;
}
/**
 * Re-submits every application found in the recovered state and lets each
 * re-created application restore its own information from that state.
 * Applications with an unmanaged AM are not recovered; they are removed from
 * the state store instead.
 *
 * @param state recovered RM state holding the saved applications
 * @throws Exception if recovering an application fails
 */
@Override
public void recover(RMState state) throws Exception {
  final RMStateStore stateStore = rmContext.getStateStore();
  assert stateStore != null;

  // recover applications
  final Map<ApplicationId, ApplicationState> savedApps =
      state.getApplicationState();
  LOG.info("Recovering " + savedApps.size() + " applications");

  for (ApplicationState saved : savedApps.values()) {
    final ApplicationSubmissionContext savedContext =
        saved.getApplicationSubmissionContext();

    if (savedContext.getUnmanagedAM()) {
      // do not recover unmanaged applications since current recovery
      // mechanism of restarting attempts does not work for them.
      // This will need to be changed in work preserving recovery in which
      // RM will re-connect with the running AM's instead of restarting them
      LOG.info("Not recovering unmanaged application " + saved.getAppId());
      stateStore.removeApplication(saved);
      continue;
    }

    // re-submit the application
    // this is going to send an app start event but since the async dispatcher
    // has not started that event will be queued until we have completed re
    // populating the state
    LOG.info("Recovering application " + saved.getAppId());
    submitApplication(savedContext, saved.getSubmitTime());

    // re-populate attempt information in application
    final RMAppImpl resubmitted =
        (RMAppImpl) rmContext.getRMApps().get(saved.getAppId());
    resubmitted.recover(state);
  }
}
/**
 * Dispatches application manager events.
 *
 * APP_COMPLETED finishes the application, logs its summary and then enforces
 * the limit on retained completed applications. APP_SUBMIT submits the
 * application described by the event. Any other event type is logged as an
 * error and ignored.
 *
 * @param event event to process; APP_SUBMIT events must be
 *          {@link RMAppManagerSubmitEvent} instances
 */
@Override
public void handle(RMAppManagerEvent event) {
  ApplicationId applicationId = event.getApplicationId();
  // Only build the message when it will actually be logged
  if (LOG.isDebugEnabled()) {
    LOG.debug("RMAppManager processing event for "
        + applicationId + " of type " + event.getType());
  }

  switch (event.getType()) {
    case APP_COMPLETED:
    {
      finishApplication(applicationId);
      ApplicationSummary.logAppSummary(
          rmContext.getRMApps().get(applicationId));
      checkAppNumCompletedLimit();
    }
    break;
    case APP_SUBMIT:
    {
      RMAppManagerSubmitEvent submitEvent = (RMAppManagerSubmitEvent) event;
      ApplicationSubmissionContext submissionContext =
          submitEvent.getSubmissionContext();
      long submitTime = submitEvent.getSubmitTime();
      submitApplication(submissionContext, submitTime);
    }
    break;
    default:
      LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
      break;
  }
}
}