blob: 4be5e2ab6d8e5476b3b78794815c2c15bdb56578 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.security.AccessRequest;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRecoverEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Times;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.hadoop.yarn.util.StringHelper;
/**
* This class manages the list of applications for the resource manager.
*/
public class RMAppManager implements EventHandler<RMAppManagerEvent>,
Recoverable {
private static final Log LOG = LogFactory.getLog(RMAppManager.class);
private int maxCompletedAppsInMemory;
private int maxCompletedAppsInStateStore;
protected int completedAppsInStateStore = 0;
private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
private final RMContext rmContext;
private final ApplicationMasterService masterService;
private final YarnScheduler scheduler;
private final ApplicationACLsManager applicationACLsManager;
private Configuration conf;
private YarnAuthorizationProvider authorizer;
private Set<String> exclusiveEnforcedPartitions;
public RMAppManager(RMContext context,
YarnScheduler scheduler, ApplicationMasterService masterService,
ApplicationACLsManager applicationACLsManager, Configuration conf) {
this.rmContext = context;
this.scheduler = scheduler;
this.masterService = masterService;
this.applicationACLsManager = applicationACLsManager;
this.conf = conf;
this.maxCompletedAppsInMemory = conf.getInt(
YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
this.maxCompletedAppsInStateStore =
conf.getInt(
YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS,
YarnConfiguration.DEFAULT_RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS);
if (this.maxCompletedAppsInStateStore > this.maxCompletedAppsInMemory) {
this.maxCompletedAppsInStateStore = this.maxCompletedAppsInMemory;
}
this.authorizer = YarnAuthorizationProvider.getInstance(conf);
this.exclusiveEnforcedPartitions = YarnConfiguration
.getExclusiveEnforcedPartitions(rmContext.getYarnConfiguration());
}
/**
* This class is for logging the application summary.
*/
static class ApplicationSummary {
static final Log LOG = LogFactory.getLog(ApplicationSummary.class);
// Escape sequences
static final char EQUALS = '=';
static final char[] charsToEscape =
{StringUtils.COMMA, EQUALS, StringUtils.ESCAPE_CHAR};
static class SummaryBuilder {
final StringBuilder buffer = new StringBuilder();
// A little optimization for a very common case
SummaryBuilder add(String key, long value) {
return _add(key, Long.toString(value));
}
<T> SummaryBuilder add(String key, T value) {
String escapedString = StringUtils.escapeString(String.valueOf(value),
StringUtils.ESCAPE_CHAR, charsToEscape).replaceAll("\n", "\\\\n")
.replaceAll("\r", "\\\\r");
return _add(key, escapedString);
}
SummaryBuilder add(SummaryBuilder summary) {
if (buffer.length() > 0) buffer.append(StringUtils.COMMA);
buffer.append(summary.buffer);
return this;
}
SummaryBuilder _add(String key, String value) {
if (buffer.length() > 0) buffer.append(StringUtils.COMMA);
buffer.append(key).append(EQUALS).append(value);
return this;
}
@Override public String toString() {
return buffer.toString();
}
}
/**
* create a summary of the application's runtime.
*
* @param app {@link RMApp} whose summary is to be created, cannot
* be <code>null</code>.
*/
public static SummaryBuilder createAppSummary(RMApp app) {
String trackingUrl = "N/A";
String host = "N/A";
RMAppAttempt attempt = app.getCurrentAppAttempt();
if (attempt != null) {
trackingUrl = attempt.getTrackingUrl();
host = attempt.getHost();
}
RMAppMetrics metrics = app.getRMAppMetrics();
SummaryBuilder summary = new SummaryBuilder()
.add("appId", app.getApplicationId())
.add("name", app.getName())
.add("user", app.getUser())
.add("queue", app.getQueue())
.add("state", app.getState())
.add("trackingUrl", trackingUrl)
.add("appMasterHost", host)
.add("submitTime", app.getSubmitTime())
.add("startTime", app.getStartTime())
.add("launchTime", app.getLaunchTime())
.add("finishTime", app.getFinishTime())
.add("finalStatus", app.getFinalApplicationStatus())
.add("memorySeconds", metrics.getMemorySeconds())
.add("vcoreSeconds", metrics.getVcoreSeconds())
.add("preemptedMemorySeconds", metrics.getPreemptedMemorySeconds())
.add("preemptedVcoreSeconds", metrics.getPreemptedVcoreSeconds())
.add("preemptedAMContainers", metrics.getNumAMContainersPreempted())
.add("preemptedNonAMContainers", metrics.getNumNonAMContainersPreempted())
.add("preemptedResources", metrics.getResourcePreempted())
.add("applicationType", app.getApplicationType())
.add("resourceSeconds", StringHelper
.getResourceSecondsString(metrics.getResourceSecondsMap()))
.add("preemptedResourceSeconds", StringHelper
.getResourceSecondsString(
metrics.getPreemptedResourceSecondsMap()))
.add("applicationTags", StringHelper.CSV_JOINER.join(
app.getApplicationTags() != null ? new TreeSet<>(
app.getApplicationTags()) : Collections.<String>emptySet()))
.add("applicationNodeLabel",
app.getApplicationSubmissionContext().getNodeLabelExpression()
== null
? ""
: app.getApplicationSubmissionContext()
.getNodeLabelExpression());
return summary;
}
/**
* Log a summary of the application's runtime.
*
* @param app {@link RMApp} whose summary is to be logged
*/
public static void logAppSummary(RMApp app) {
if (app != null) {
LOG.info(createAppSummary(app));
}
}
}
@VisibleForTesting
public void logApplicationSummary(ApplicationId appId) {
ApplicationSummary.logAppSummary(rmContext.getRMApps().get(appId));
}
protected synchronized int getCompletedAppsListSize() {
return this.completedApps.size();
}
protected synchronized void finishApplication(ApplicationId applicationId) {
if (applicationId == null) {
LOG.error("RMAppManager received completed appId of null, skipping");
} else {
// Inform the DelegationTokenRenewer
if (UserGroupInformation.isSecurityEnabled()) {
rmContext.getDelegationTokenRenewer().applicationFinished(applicationId);
}
completedApps.add(applicationId);
completedAppsInStateStore++;
writeAuditLog(applicationId);
}
}
protected void writeAuditLog(ApplicationId appId) {
RMApp app = rmContext.getRMApps().get(appId);
String operation = "UNKONWN";
boolean success = false;
switch (app.getState()) {
case FAILED:
operation = AuditConstants.FINISH_FAILED_APP;
break;
case FINISHED:
operation = AuditConstants.FINISH_SUCCESS_APP;
success = true;
break;
case KILLED:
operation = AuditConstants.FINISH_KILLED_APP;
success = true;
break;
default:
break;
}
if (success) {
RMAuditLogger.logSuccess(app.getUser(), operation,
"RMAppManager", app.getApplicationId());
} else {
StringBuilder diag = app.getDiagnostics();
String msg = diag == null ? null : diag.toString();
RMAuditLogger.logFailure(app.getUser(), operation, msg, "RMAppManager",
"App failed with state: " + app.getState(), appId);
}
}
/*
* check to see if hit the limit for max # completed apps kept
*/
protected synchronized void checkAppNumCompletedLimit() {
// check apps kept in state store.
while (completedAppsInStateStore > this.maxCompletedAppsInStateStore) {
ApplicationId removeId =
completedApps.get(completedApps.size() - completedAppsInStateStore);
RMApp removeApp = rmContext.getRMApps().get(removeId);
LOG.info("Max number of completed apps kept in state store met:"
+ " maxCompletedAppsInStateStore = " + maxCompletedAppsInStateStore
+ ", removing app " + removeApp.getApplicationId()
+ " from state store.");
rmContext.getStateStore().removeApplication(removeApp);
completedAppsInStateStore--;
}
// check apps kept in memorty.
while (completedApps.size() > this.maxCompletedAppsInMemory) {
ApplicationId removeId = completedApps.remove();
LOG.info("Application should be expired, max number of completed apps"
+ " kept in memory met: maxCompletedAppsInMemory = "
+ this.maxCompletedAppsInMemory + ", removing app " + removeId
+ " from memory: ");
rmContext.getRMApps().remove(removeId);
this.applicationACLsManager.removeApplication(removeId);
}
}
@SuppressWarnings("unchecked")
protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
String user) throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId();
// Passing start time as -1. It will be eventually set in RMAppImpl
// constructor.
RMAppImpl application = createAndPopulateNewRMApp(
submissionContext, submitTime, user, false, -1);
try {
if (UserGroupInformation.isSecurityEnabled()) {
this.rmContext.getDelegationTokenRenewer()
.addApplicationAsync(applicationId,
BuilderUtils.parseCredentials(submissionContext),
submissionContext.getCancelTokensWhenComplete(),
application.getUser(),
BuilderUtils.parseTokensConf(submissionContext));
} else {
// Dispatcher is not yet started at this time, so these START events
// enqueued should be guaranteed to be first processed when dispatcher
// gets started.
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.START));
}
} catch (Exception e) {
LOG.warn("Unable to parse credentials for " + applicationId, e);
// Sending APP_REJECTED is fine, since we assume that the
// RMApp is in NEW state and thus we haven't yet informed the
// scheduler about the existence of the application
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId,
RMAppEventType.APP_REJECTED, e.getMessage()));
throw RPCUtil.getRemoteException(e);
}
}
protected void recoverApplication(ApplicationStateData appState,
RMState rmState) throws Exception {
ApplicationSubmissionContext appContext =
appState.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
// create and recover app.
RMAppImpl application =
createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
appState.getUser(), true, appState.getStartTime());
application.handle(new RMAppRecoverEvent(appId, rmState));
}
private RMAppImpl createAndPopulateNewRMApp(
ApplicationSubmissionContext submissionContext, long submitTime,
String user, boolean isRecovery, long startTime) throws YarnException {
// Do queue mapping
if (!isRecovery) {
// Do queue mapping
if (rmContext.getQueuePlacementManager() != null) {
// We only do queue mapping when it's a new application
rmContext.getQueuePlacementManager().placeApplication(
submissionContext, user);
}
// fail the submission if configured application timeout value is invalid
RMServerUtils.validateApplicationTimeouts(
submissionContext.getApplicationTimeouts());
}
ApplicationId applicationId = submissionContext.getApplicationId();
List<ResourceRequest> amReqs = validateAndCreateResourceRequest(
submissionContext, isRecovery);
// Verify and get the update application priority and set back to
// submissionContext
UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
// Application priority needed to be validated only while submitting. During
// recovery, validated priority could be recovered from submission context.
if (!isRecovery) {
Priority appPriority = scheduler.checkAndGetApplicationPriority(
submissionContext.getPriority(), userUgi,
submissionContext.getQueue(),
applicationId);
submissionContext.setPriority(appPriority);
}
// Since FairScheduler queue mapping is done inside scheduler,
// if FairScheduler is used and the queue doesn't exist, we should not
// fail here because queue will be created inside FS. Ideally, FS queue
// mapping should be done outside scheduler too like CS.
// For now, exclude FS for the acl check.
if (!isRecovery && YarnConfiguration.isAclEnabled(conf)
&& scheduler instanceof CapacityScheduler) {
String queueName = submissionContext.getQueue();
String appName = submissionContext.getApplicationName();
CSQueue csqueue = ((CapacityScheduler) scheduler).getQueue(queueName);
if (null != csqueue
&& !authorizer.checkPermission(
new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
SchedulerUtils.toAccessType(QueueACL.SUBMIT_APPLICATIONS),
applicationId.toString(), appName, Server.getRemoteAddress(),
null))
&& !authorizer.checkPermission(
new AccessRequest(csqueue.getPrivilegedEntity(), userUgi,
SchedulerUtils.toAccessType(QueueACL.ADMINISTER_QUEUE),
applicationId.toString(), appName, Server.getRemoteAddress(),
null))) {
throw RPCUtil.getRemoteException(new AccessControlException(
"User " + user + " does not have permission to submit "
+ applicationId + " to queue " + submissionContext.getQueue()));
}
}
// Create RMApp
RMAppImpl application =
new RMAppImpl(applicationId, rmContext, this.conf,
submissionContext.getApplicationName(), user,
submissionContext.getQueue(),
submissionContext, this.scheduler, this.masterService,
submitTime, submissionContext.getApplicationType(),
submissionContext.getApplicationTags(), amReqs, startTime);
// Concurrent app submissions with same applicationId will fail here
// Concurrent app submissions with different applicationIds will not
// influence each other
if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
null) {
String message = "Application with id " + applicationId
+ " is already present! Cannot add a duplicate!";
LOG.warn(message);
throw new YarnException(message);
}
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
// Start timeline collector for the submitted app
application.startTimelineCollector();
}
// Inform the ACLs Manager
this.applicationACLsManager.addApplication(applicationId,
submissionContext.getAMContainerSpec().getApplicationACLs());
String appViewACLs = submissionContext.getAMContainerSpec()
.getApplicationACLs().get(ApplicationAccessType.VIEW_APP);
rmContext.getSystemMetricsPublisher().appACLsUpdated(
application, appViewACLs, System.currentTimeMillis());
return application;
}
private List<ResourceRequest> validateAndCreateResourceRequest(
ApplicationSubmissionContext submissionContext, boolean isRecovery)
throws InvalidResourceRequestException {
// Validation of the ApplicationSubmissionContext needs to be completed
// here. Only those fields that are dependent on RM's configuration are
// checked here as they have to be validated whether they are part of new
// submission or just being recovered.
// Check whether AM resource requirements are within required limits
if (!submissionContext.getUnmanagedAM()) {
List<ResourceRequest> amReqs =
submissionContext.getAMContainerResourceRequests();
if (amReqs == null || amReqs.isEmpty()) {
if (submissionContext.getResource() != null) {
amReqs = Collections.singletonList(BuilderUtils
.newResourceRequest(RMAppAttemptImpl.AM_CONTAINER_PRIORITY,
ResourceRequest.ANY, submissionContext.getResource(), 1));
} else {
throw new InvalidResourceRequestException("Invalid resource request, "
+ "no resources requested");
}
}
try {
// Find the ANY request and ensure there's only one
ResourceRequest anyReq = null;
for (ResourceRequest amReq : amReqs) {
if (amReq.getResourceName().equals(ResourceRequest.ANY)) {
if (anyReq == null) {
anyReq = amReq;
} else {
throw new InvalidResourceRequestException("Invalid resource "
+ "request, only one resource request with "
+ ResourceRequest.ANY + " is allowed");
}
}
}
if (anyReq == null) {
throw new InvalidResourceRequestException("Invalid resource request, "
+ "no resource request specified with " + ResourceRequest.ANY);
}
SchedulerUtils.enforcePartitionExclusivity(anyReq,
exclusiveEnforcedPartitions,
submissionContext.getNodeLabelExpression());
// Make sure that all of the requests agree with the ANY request
// and have correct values
for (ResourceRequest amReq : amReqs) {
amReq.setCapability(anyReq.getCapability());
amReq.setExecutionTypeRequest(
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED));
amReq.setNumContainers(1);
amReq.setPriority(RMAppAttemptImpl.AM_CONTAINER_PRIORITY);
}
// set label expression for AM ANY request if not set
if (null == anyReq.getNodeLabelExpression()) {
anyReq.setNodeLabelExpression(submissionContext
.getNodeLabelExpression());
}
// Put ANY request at the front
if (!amReqs.get(0).equals(anyReq)) {
amReqs.remove(anyReq);
amReqs.add(0, anyReq);
}
// Normalize all requests
String queue = submissionContext.getQueue();
for (ResourceRequest amReq : amReqs) {
SchedulerUtils.normalizeAndValidateRequest(amReq,
scheduler.getMaximumResourceCapability(queue),
queue, scheduler, isRecovery, rmContext);
amReq.setCapability(
scheduler.getNormalizedResource(amReq.getCapability()));
}
return amReqs;
} catch (InvalidResourceRequestException e) {
LOG.warn("RM app submission failed in validating AM resource request"
+ " for application " + submissionContext.getApplicationId(), e);
throw e;
}
}
return null;
}
@Override
public void recover(RMState state) throws Exception {
RMStateStore store = rmContext.getStateStore();
assert store != null;
// recover applications
Map<ApplicationId, ApplicationStateData> appStates =
state.getApplicationState();
LOG.info("Recovering " + appStates.size() + " applications");
int count = 0;
try {
for (ApplicationStateData appState : appStates.values()) {
recoverApplication(appState, state);
count += 1;
}
} finally {
LOG.info("Successfully recovered " + count + " out of "
+ appStates.size() + " applications");
}
}
@Override
public void handle(RMAppManagerEvent event) {
ApplicationId applicationId = event.getApplicationId();
LOG.debug("RMAppManager processing event for "
+ applicationId + " of type " + event.getType());
switch (event.getType()) {
case APP_COMPLETED :
finishApplication(applicationId);
logApplicationSummary(applicationId);
checkAppNumCompletedLimit();
break;
case APP_MOVE :
// moveAllApps from scheduler will fire this event for each of
// those applications which needed to be moved to a new queue.
// Use the standard move application api to do the same.
try {
moveApplicationAcrossQueue(applicationId,
event.getTargetQueueForMove());
} catch (YarnException e) {
LOG.warn("Move Application has failed: " + e.getMessage());
}
break;
default :
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}
}
// transaction method.
public Map<ApplicationTimeoutType, String> updateApplicationTimeout(RMApp app,
Map<ApplicationTimeoutType, String> newTimeoutInISO8601Format)
throws YarnException {
ApplicationId applicationId = app.getApplicationId();
synchronized (applicationId) {
if (app.isAppInCompletedStates()) {
return newTimeoutInISO8601Format;
}
Map<ApplicationTimeoutType, Long> newExpireTime = RMServerUtils
.validateISO8601AndConvertToLocalTimeEpoch(newTimeoutInISO8601Format);
// validation is only for lifetime
Long updatedlifetimeInMillis =
newExpireTime.get(ApplicationTimeoutType.LIFETIME);
if (updatedlifetimeInMillis != null) {
long queueMaxLifetimeInSec =
scheduler.getMaximumApplicationLifetime(app.getQueue());
if (queueMaxLifetimeInSec > 0) {
if (updatedlifetimeInMillis > (app.getSubmitTime()
+ queueMaxLifetimeInSec * 1000)) {
updatedlifetimeInMillis =
app.getSubmitTime() + queueMaxLifetimeInSec * 1000;
// cut off to maximum queue lifetime if update lifetime is exceeding
// queue lifetime.
newExpireTime.put(ApplicationTimeoutType.LIFETIME,
updatedlifetimeInMillis);
newTimeoutInISO8601Format.put(ApplicationTimeoutType.LIFETIME,
Times.formatISO8601(updatedlifetimeInMillis.longValue()));
}
}
}
SettableFuture<Object> future = SettableFuture.create();
Map<ApplicationTimeoutType, Long> currentExpireTimeouts =
app.getApplicationTimeouts();
currentExpireTimeouts.putAll(newExpireTime);
ApplicationStateData appState =
ApplicationStateData.newInstance(app.getSubmitTime(),
app.getStartTime(), app.getApplicationSubmissionContext(),
app.getUser(), app.getCallerContext());
appState.setApplicationTimeouts(currentExpireTimeouts);
appState.setLaunchTime(app.getLaunchTime());
// update to state store. Though it synchronous call, update via future to
// know any exception has been set. It is required because in non-HA mode,
// state-store errors are skipped.
this.rmContext.getStateStore()
.updateApplicationStateSynchronously(appState, false, future);
Futures.get(future, YarnException.class);
// update in-memory
((RMAppImpl) app).updateApplicationTimeout(newExpireTime);
return newTimeoutInISO8601Format;
}
}
/**
* updateApplicationPriority will invoke scheduler api to update the
* new priority to RM and StateStore.
* @param callerUGI user
* @param applicationId Application Id
* @param newAppPriority proposed new application priority
* @throws YarnException Handle exceptions
*/
public void updateApplicationPriority(UserGroupInformation callerUGI,
ApplicationId applicationId, Priority newAppPriority)
throws YarnException {
RMApp app = this.rmContext.getRMApps().get(applicationId);
synchronized (applicationId) {
if (app == null || app.isAppInCompletedStates()) {
return;
}
// Create a future object to capture exceptions from StateStore.
SettableFuture<Object> future = SettableFuture.create();
// Invoke scheduler api to update priority in scheduler and to
// State Store.
Priority appPriority = rmContext.getScheduler().updateApplicationPriority(
newAppPriority, applicationId, future, callerUGI);
if (app.getApplicationPriority().equals(appPriority)) {
return;
}
Futures.get(future, YarnException.class);
// update in-memory
((RMAppImpl) app).setApplicationPriority(appPriority);
}
// Update the changed application state to timeline server
rmContext.getSystemMetricsPublisher().appUpdated(app,
System.currentTimeMillis());
}
/**
* moveToQueue will invoke scheduler api to perform move queue operation.
*
* @param applicationId
* Application Id.
* @param targetQueue
* Target queue to which this app has to be moved.
* @throws YarnException
* Handle exceptions.
*/
public void moveApplicationAcrossQueue(ApplicationId applicationId, String targetQueue)
throws YarnException {
RMApp app = this.rmContext.getRMApps().get(applicationId);
// Capacity scheduler will directly follow below approach.
// 1. Do a pre-validate check to ensure that changes are fine.
// 2. Update this information to state-store
// 3. Perform real move operation and update in-memory data structures.
synchronized (applicationId) {
if (app == null || app.isAppInCompletedStates()) {
return;
}
String sourceQueue = app.getQueue();
// 1. pre-validate move application request to check for any access
// violations or other errors. If there are any violations, YarnException
// will be thrown.
rmContext.getScheduler().preValidateMoveApplication(applicationId,
targetQueue);
// 2. Update to state store with new queue and throw exception is failed.
updateAppDataToStateStore(targetQueue, app, false);
// 3. Perform the real move application
String queue = "";
try {
queue = rmContext.getScheduler().moveApplication(applicationId,
targetQueue);
} catch (YarnException e) {
// Revert to source queue since in-memory move has failed. Chances
// of this is very rare as we have already done the pre-validation.
updateAppDataToStateStore(sourceQueue, app, true);
throw e;
}
// update in-memory
if (queue != null && !queue.isEmpty()) {
app.setQueue(queue);
}
}
rmContext.getSystemMetricsPublisher().appUpdated(app,
System.currentTimeMillis());
}
private void updateAppDataToStateStore(String queue, RMApp app,
boolean toSuppressException) throws YarnException {
// Create a future object to capture exceptions from StateStore.
SettableFuture<Object> future = SettableFuture.create();
// Update new queue in Submission Context to update to StateStore.
app.getApplicationSubmissionContext().setQueue(queue);
ApplicationStateData appState = ApplicationStateData.newInstance(
app.getSubmitTime(), app.getStartTime(),
app.getApplicationSubmissionContext(), app.getUser(),
app.getCallerContext());
appState.setApplicationTimeouts(app.getApplicationTimeouts());
appState.setLaunchTime(app.getLaunchTime());
rmContext.getStateStore().updateApplicationStateSynchronously(appState,
false, future);
try {
Futures.get(future, YarnException.class);
} catch (YarnException ex) {
if (!toSuppressException) {
throw ex;
}
LOG.error("Statestore update failed for move application '"
+ app.getApplicationId() + "' to queue '" + queue
+ "' with below exception:" + ex.getMessage());
}
}
}