blob: 77be8da446cfc676d15029fe0767ac1bc290be97 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.AccessControlException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.UnrecognizedOptionException;
import org.apache.commons.lang.math.LongRange;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInputValidator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppKillByClientEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeSignalContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ReservationsACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.UTCClock;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
/**
* The client interface to the Resource Manager. This module handles all the rpc
* interfaces to the resource manager from the client.
*/
public class ClientRMService extends AbstractService implements
ApplicationClientProtocol {
// ---- Static constants ----
private static final Log LOG = LogFactory.getLog(ClientRMService.class);
// NOTE(review): a single shared, mutable list instance; callers must treat it
// as read-only.
private static final ArrayList<ApplicationReport> EMPTY_APPS_REPORT =
    new ArrayList<ApplicationReport>();
// Application states this service groups as "completed";
// failApplicationAttempt() returns early for these.
private static final EnumSet<RMAppState> COMPLETED_APP_STATES = EnumSet.of(
    RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED,
    RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING);
// Application states this service groups as "active".
private static final EnumSet<RMAppState> ACTIVE_APP_STATES = EnumSet.of(
    RMAppState.ACCEPTED, RMAppState.RUNNING);

// ---- Collaborators fixed at construction time ----
private final YarnScheduler scheduler;
private final RMContext rmContext;
private final RMAppManager rmAppManager;
private final ApplicationACLsManager applicationsACLsManager;
private final QueueACLsManager queueACLsManager;
private final RecordFactory recordFactory =
    RecordFactoryProvider.getRecordFactory(null);
// Sequence source for newly allocated application ids.
private final AtomicInteger applicationCounter = new AtomicInteger(0);

// ---- RPC server state ----
private Server server;
protected RMDelegationTokenSecretManager rmDTSecretManager;
InetSocketAddress clientBindAddress;

// ---- Reservation API support ----
private Clock clock;
private ReservationSystem reservationSystem;
private ReservationInputValidator rValidator;

private ResourceProfilesManager resourceProfilesManager;
/**
 * Creates the client-facing RM service, defaulting the clock to a new
 * {@link UTCClock}.
 *
 * @param rmContext RM context
 * @param scheduler scheduler used to answer resource queries
 * @param rmAppManager manager that application submissions are delegated to
 * @param applicationACLsManager per-application ACL checker
 * @param queueACLsManager queue ACL checker
 * @param rmDTSecretManager delegation-token secret manager for the RPC server
 */
public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
    RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
    QueueACLsManager queueACLsManager,
    RMDelegationTokenSecretManager rmDTSecretManager) {
  this(rmContext, scheduler, rmAppManager, applicationACLsManager,
      queueACLsManager, rmDTSecretManager,
      new UTCClock());
}
/**
 * Creates the client-facing RM service.
 *
 * @param rmContext RM context; also supplies the reservation system and the
 *        resource profiles manager
 * @param scheduler scheduler used to answer resource queries
 * @param rmAppManager manager that application submissions are delegated to
 * @param applicationACLsManager per-application ACL checker
 * @param queueACLsManager queue ACL checker
 * @param rmDTSecretManager delegation-token secret manager for the RPC server
 * @param clock clock kept by this service and handed to the reservation
 *        input validator
 */
public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
    RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
    QueueACLsManager queueACLsManager,
    RMDelegationTokenSecretManager rmDTSecretManager, Clock clock) {
  super(ClientRMService.class.getName());
  // Collaborators supplied directly by the caller.
  this.rmContext = rmContext;
  this.scheduler = scheduler;
  this.rmAppManager = rmAppManager;
  this.applicationsACLsManager = applicationACLsManager;
  this.queueACLsManager = queueACLsManager;
  this.rmDTSecretManager = rmDTSecretManager;
  // Reservation API support.
  this.clock = clock;
  this.rValidator = new ReservationInputValidator(clock);
  this.reservationSystem = rmContext.getReservationSystem();
  // Resource profiles are resolved through the context as well.
  this.resourceProfilesManager = rmContext.getResourceProfilesManager();
}
/**
 * Resolves the client RPC bind address from {@code conf} before delegating
 * to the parent's initialization; serviceStart() later binds to it.
 */
@Override
protected void serviceInit(Configuration conf) throws Exception {
  InetSocketAddress resolvedAddress = getBindAddress(conf);
  this.clientBindAddress = resolvedAddress;
  super.serviceInit(conf);
}
/**
 * Creates and starts the {@link ApplicationClientProtocol} RPC server.
 *
 * <p>When Hadoop service authorization is enabled, the policy file (if the
 * configuration provider supplies one) is added to the configuration and the
 * service ACLs are refreshed before the server starts. After start-up the
 * listener's address is written back to the configuration and cached in
 * {@code clientBindAddress}.
 */
@Override
protected void serviceStart() throws Exception {
  Configuration conf = getConfig();
  YarnRPC rpc = YarnRPC.create(conf);
  int handlerThreads = conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT,
      YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT);
  this.server = rpc.getServer(ApplicationClientProtocol.class, this,
      clientBindAddress, conf, this.rmDTSecretManager, handlerThreads);

  boolean authorizationEnabled = conf.getBoolean(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
  if (authorizationEnabled) {
    InputStream policyStream = this.rmContext.getConfigurationProvider()
        .getConfigurationInputStream(conf,
            YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE);
    if (policyStream != null) {
      conf.addResource(policyStream);
    }
    refreshServiceAcls(conf, RMPolicyProvider.getInstance());
  }

  this.server.start();
  // Record the address the server actually listens on.
  clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
      YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS,
      server.getListenerAddress());
  super.serviceStart();
}
/**
 * Stops the RPC server, if one was created, then stops the parent service.
 */
@Override
protected void serviceStop() throws Exception {
  // The server is assigned in serviceStart(); it may still be null here if
  // the service never started.
  Server rpcServer = this.server;
  if (rpcServer != null) {
    rpcServer.stop();
  }
  super.serviceStop();
}
/**
 * Reads the client RPC socket address from {@code conf} using the RM
 * bind-host and RM address keys, with the RM default address and port as
 * fallbacks.
 *
 * @param conf configuration to read
 * @return the resolved socket address
 */
InetSocketAddress getBindAddress(Configuration conf) {
  final String bindHostKey = YarnConfiguration.RM_BIND_HOST;
  final String addressKey = YarnConfiguration.RM_ADDRESS;
  return conf.getSocketAddr(bindHostKey, addressKey,
      YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
}
/**
 * @return the client RPC address: the configured address after
 *         serviceInit(), replaced by the listener's address once
 *         serviceStart() has run
 */
@Private
public InetSocketAddress getBindAddress() {
  return this.clientBindAddress;
}
/**
 * Checks whether the caller may perform {@code operationPerformed} on
 * {@code application}.
 *
 * <p>The application ACLs are consulted first. Only if they deny access is
 * the queue ACL manager asked whether the caller holds
 * {@link QueueACL#ADMINISTER_QUEUE} for the application, using the current
 * RPC remote address.
 *
 * @param callerUGI identity of the calling user
 * @param owner user that owns the application
 * @param operationPerformed application access type being requested
 * @param application application being accessed
 * @return true if either the application ACL or the queue ACL check passes
 */
private boolean checkAccess(UserGroupInformation callerUGI, String owner,
    ApplicationAccessType operationPerformed, RMApp application) {
  if (applicationsACLsManager.checkAccess(callerUGI, operationPerformed,
      owner, application.getApplicationId())) {
    return true;
  }
  // Queue administrators may act on any application in their queue.
  return queueACLsManager.checkAccess(callerUGI, QueueACL.ADMINISTER_QUEUE,
      application, Server.getRemoteAddress(), null);
}
/**
 * Allocates the next application id, built from the RM cluster timestamp and
 * an in-memory counter, and logs it.
 *
 * @return the newly allocated application id
 */
ApplicationId getNewApplicationId() {
  long clusterTimestamp = ResourceManager.getClusterTimeStamp();
  int sequenceNumber = applicationCounter.incrementAndGet();
  ApplicationId newId = BuilderUtils.newApplicationId(recordFactory,
      clusterTimestamp, sequenceNumber);
  LOG.info("Allocated new applicationId: " + newId.getId());
  return newId;
}
/**
 * Hands out a fresh application id together with the scheduler's maximum
 * resource capability.
 */
@Override
public GetNewApplicationResponse getNewApplication(
    GetNewApplicationRequest request) throws YarnException {
  GetNewApplicationResponse response =
      recordFactory.newRecordInstance(GetNewApplicationResponse.class);
  ApplicationId assignedId = getNewApplicationId();
  response.setApplicationId(assignedId);
  Resource maxCapability = scheduler.getMaximumResourceCapability();
  response.setMaximumResourceCapability(maxCapability);
  return response;
}
/**
 * Returns the report of an application known to the RM.
 *
 * <p>A report is produced even when the caller lacks VIEW_APP access; the
 * access decision is passed to the application when it builds the report
 * (presumably so it can limit what is exposed).
 *
 * @param request carries the id of the application to report on
 * @return the application report
 * @throws ApplicationNotFoundException if the id is null or the RM does not
 *         know the application
 * @throws YarnException if the current user cannot be determined
 */
@Override
public GetApplicationReportResponse getApplicationReport(
    GetApplicationReportRequest request) throws YarnException {
  final ApplicationId appId = request.getApplicationId();
  if (appId == null) {
    throw new ApplicationNotFoundException("Invalid application id: null");
  }

  final UserGroupInformation caller;
  try {
    caller = UserGroupInformation.getCurrentUser();
  } catch (IOException ie) {
    LOG.info("Error getting UGI ", ie);
    throw RPCUtil.getRemoteException(ie);
  }

  final RMApp app = this.rmContext.getRMApps().get(appId);
  if (app == null) {
    // Unknown to the RM: surface ApplicationNotFoundException to the client.
    throw new ApplicationNotFoundException("Application with id '"
        + appId + "' doesn't exist in RM.");
  }

  final boolean canView = checkAccess(caller, app.getUser(),
      ApplicationAccessType.VIEW_APP, app);
  final ApplicationReport appReport =
      app.createAndGetApplicationReport(caller.getUserName(), canView);

  final GetApplicationReportResponse response =
      recordFactory.newRecordInstance(GetApplicationReportResponse.class);
  response.setApplicationReport(appReport);
  return response;
}
/**
 * Returns the report of a single application attempt.
 *
 * @param request carries the id of the attempt to report on
 * @return the attempt report
 * @throws ApplicationAttemptNotFoundException if the attempt id is null or
 *         the application has no such attempt
 * @throws ApplicationNotFoundException if the owning application is unknown
 *         to the RM
 * @throws YarnException if the caller lacks VIEW_APP access or the current
 *         user cannot be determined
 */
@Override
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
    GetApplicationAttemptReportRequest request) throws YarnException,
    IOException {
  ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
  if (appAttemptId == null) {
    // Reject a request without an attempt id explicitly instead of failing
    // with a NullPointerException below.
    throw new ApplicationAttemptNotFoundException(
        "Invalid application attempt id: null");
  }
  UserGroupInformation callerUGI;
  try {
    callerUGI = UserGroupInformation.getCurrentUser();
  } catch (IOException ie) {
    LOG.info("Error getting UGI ", ie);
    throw RPCUtil.getRemoteException(ie);
  }
  ApplicationId appId = appAttemptId.getApplicationId();
  RMApp application = this.rmContext.getRMApps().get(appId);
  if (application == null) {
    // If the RM doesn't have the application, throw
    // ApplicationNotFoundException and let client to handle.
    throw new ApplicationNotFoundException("Application with id '"
        + appId + "' doesn't exist in RM.");
  }
  if (!checkAccess(callerUGI, application.getUser(),
      ApplicationAccessType.VIEW_APP, application)) {
    throw new YarnException("User " + callerUGI.getShortUserName()
        + " does not have privilage to see this attempt " + appAttemptId);
  }
  RMAppAttempt appAttempt = application.getAppAttempts().get(appAttemptId);
  if (appAttempt == null) {
    throw new ApplicationAttemptNotFoundException(
        "ApplicationAttempt with id '" + appAttemptId
        + "' doesn't exist in RM.");
  }
  return GetApplicationAttemptReportResponse.newInstance(
      appAttempt.createApplicationAttemptReport());
}
/**
 * Returns reports for every attempt the RM holds for an application.
 *
 * @param request carries the id of the application
 * @return one report per attempt, in the application's attempt-map iteration
 *         order
 * @throws ApplicationNotFoundException if the id is null or the RM does not
 *         know the application
 * @throws YarnException if the caller lacks VIEW_APP access or the current
 *         user cannot be determined
 */
@Override
public GetApplicationAttemptsResponse getApplicationAttempts(
    GetApplicationAttemptsRequest request) throws YarnException, IOException {
  ApplicationId appId = request.getApplicationId();
  if (appId == null) {
    // Same contract as getApplicationReport(); also avoids a map lookup
    // with a null key.
    throw new ApplicationNotFoundException("Invalid application id: null");
  }
  UserGroupInformation callerUGI;
  try {
    callerUGI = UserGroupInformation.getCurrentUser();
  } catch (IOException ie) {
    LOG.info("Error getting UGI ", ie);
    throw RPCUtil.getRemoteException(ie);
  }
  RMApp application = this.rmContext.getRMApps().get(appId);
  if (application == null) {
    // If the RM doesn't have the application, throw
    // ApplicationNotFoundException and let client to handle.
    throw new ApplicationNotFoundException("Application with id '" + appId
        + "' doesn't exist in RM.");
  }
  if (!checkAccess(callerUGI, application.getUser(),
      ApplicationAccessType.VIEW_APP, application)) {
    throw new YarnException("User " + callerUGI.getShortUserName()
        + " does not have privilage to see this application " + appId);
  }
  List<ApplicationAttemptReport> listAttempts =
      new ArrayList<ApplicationAttemptReport>();
  for (RMAppAttempt attempt : application.getAppAttempts().values()) {
    listAttempts.add(attempt.createApplicationAttemptReport());
  }
  return GetApplicationAttemptsResponse.newInstance(listAttempts);
}
/**
 * Returns the report of a single container.
 *
 * <p>The container is resolved through the scheduler
 * ({@code getRMContainer}); a container the scheduler returns null for is
 * reported as not found. Showing non-running containers of a running
 * application is tracked in YARN-1794.
 *
 * @param request carries the id of the container
 * @return the container report
 * @throws ContainerNotFoundException if the container id is null or the
 *         scheduler does not return the container
 * @throws ApplicationNotFoundException if the owning application is unknown
 * @throws ApplicationAttemptNotFoundException if the owning attempt is unknown
 * @throws YarnException if the caller lacks VIEW_APP access or the current
 *         user cannot be determined
 */
@Override
public GetContainerReportResponse getContainerReport(
    GetContainerReportRequest request) throws YarnException, IOException {
  ContainerId containerId = request.getContainerId();
  if (containerId == null) {
    // Reject a request without a container id explicitly instead of failing
    // with a NullPointerException below.
    throw new ContainerNotFoundException("Invalid container id: null");
  }
  ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId();
  ApplicationId appId = appAttemptId.getApplicationId();
  UserGroupInformation callerUGI;
  try {
    callerUGI = UserGroupInformation.getCurrentUser();
  } catch (IOException ie) {
    LOG.info("Error getting UGI ", ie);
    throw RPCUtil.getRemoteException(ie);
  }
  RMApp application = this.rmContext.getRMApps().get(appId);
  if (application == null) {
    // If the RM doesn't have the application, throw
    // ApplicationNotFoundException and let client to handle.
    throw new ApplicationNotFoundException("Application with id '" + appId
        + "' doesn't exist in RM.");
  }
  if (!checkAccess(callerUGI, application.getUser(),
      ApplicationAccessType.VIEW_APP, application)) {
    throw new YarnException("User " + callerUGI.getShortUserName()
        + " does not have privilage to see this application " + appId);
  }
  RMAppAttempt appAttempt = application.getAppAttempts().get(appAttemptId);
  if (appAttempt == null) {
    throw new ApplicationAttemptNotFoundException(
        "ApplicationAttempt with id '" + appAttemptId
        + "' doesn't exist in RM.");
  }
  RMContainer rmContainer =
      this.rmContext.getScheduler().getRMContainer(containerId);
  if (rmContainer == null) {
    throw new ContainerNotFoundException("Container with id '" + containerId
        + "' doesn't exist in RM.");
  }
  return GetContainerReportResponse.newInstance(
      rmContainer.createContainerReport());
}
/**
 * Returns reports for the live containers of an application attempt.
 *
 * <p>Only containers listed by the scheduler's app report
 * ({@code getLiveContainers}) are returned; if the scheduler has no report
 * for the attempt the list is empty. Showing non-running containers of a
 * running application is tracked in YARN-1794.
 *
 * @param request carries the id of the application attempt
 * @return one report per live container
 * @throws ApplicationAttemptNotFoundException if the attempt id is null or
 *         the application has no such attempt
 * @throws ApplicationNotFoundException if the owning application is unknown
 * @throws YarnException if the caller lacks VIEW_APP access or the current
 *         user cannot be determined
 */
@Override
public GetContainersResponse getContainers(GetContainersRequest request)
    throws YarnException, IOException {
  ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
  if (appAttemptId == null) {
    // Reject a request without an attempt id explicitly instead of failing
    // with a NullPointerException below.
    throw new ApplicationAttemptNotFoundException(
        "Invalid application attempt id: null");
  }
  ApplicationId appId = appAttemptId.getApplicationId();
  UserGroupInformation callerUGI;
  try {
    callerUGI = UserGroupInformation.getCurrentUser();
  } catch (IOException ie) {
    LOG.info("Error getting UGI ", ie);
    throw RPCUtil.getRemoteException(ie);
  }
  RMApp application = this.rmContext.getRMApps().get(appId);
  if (application == null) {
    // If the RM doesn't have the application, throw
    // ApplicationNotFoundException and let client to handle.
    throw new ApplicationNotFoundException("Application with id '" + appId
        + "' doesn't exist in RM.");
  }
  if (!checkAccess(callerUGI, application.getUser(),
      ApplicationAccessType.VIEW_APP, application)) {
    throw new YarnException("User " + callerUGI.getShortUserName()
        + " does not have privilage to see this application " + appId);
  }
  RMAppAttempt appAttempt = application.getAppAttempts().get(appAttemptId);
  if (appAttempt == null) {
    throw new ApplicationAttemptNotFoundException(
        "ApplicationAttempt with id '" + appAttemptId
        + "' doesn't exist in RM.");
  }
  Collection<RMContainer> rmContainers = Collections.emptyList();
  SchedulerAppReport schedulerAppReport =
      this.rmContext.getScheduler().getSchedulerAppInfo(appAttemptId);
  if (schedulerAppReport != null) {
    rmContainers = schedulerAppReport.getLiveContainers();
  }
  List<ContainerReport> listContainers = new ArrayList<ContainerReport>();
  for (RMContainer rmContainer : rmContainers) {
    listContainers.add(rmContainer.createContainerReport());
  }
  return GetContainersResponse.newInstance(listContainers);
}
/**
 * Submits a new application to the RM.
 *
 * <p>Only the fields of the submission context that are independent of the
 * RM's configuration are validated or defaulted here (queue, name, type,
 * timeline flow-run tag); configuration-dependent checks are done by
 * {@link RMAppManager}. Submitting an application id the RM already holds is
 * treated as a successful no-op.
 *
 * @param request carries the {@link ApplicationSubmissionContext}
 * @return an empty response on success
 * @throws YarnException if the current user cannot be determined, a timeline
 *         flow-run tag does not carry a long value, the reservation ACL check
 *         fails, or {@link RMAppManager} rejects the submission
 * @throws IOException declared by the protocol
 */
@Override
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException, IOException {
  ApplicationSubmissionContext submissionContext = request
      .getApplicationSubmissionContext();
  ApplicationId applicationId = submissionContext.getApplicationId();
  CallerContext callerContext = CallerContext.getCurrent();
  // ApplicationSubmissionContext needs to be validated for safety - only
  // those fields that are independent of the RM's configuration will be
  // checked here, those that are dependent on RM configuration are validated
  // in RMAppManager.
  String user = null;
  try {
    // Safety
    user = UserGroupInformation.getCurrentUser().getShortUserName();
  } catch (IOException ie) {
    LOG.warn("Unable to get the current user.", ie);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
        ie.getMessage(), "ClientRMService",
        "Exception in submitting application", applicationId, callerContext);
    throw RPCUtil.getRemoteException(ie);
  }
  if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
    // Sanity check for flow run: every flow-run tag must carry a long value.
    // Both the original and the lower-case spelling of the prefix are
    // accepted. Lower-case with a fixed locale: locale-sensitive case
    // mapping (e.g. Turkish dotless i) could otherwise alter the prefix and
    // silently skip this validation.
    String prefix = TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":";
    String lowerCasePrefix = TimelineUtils.FLOW_RUN_ID_TAG_PREFIX
        .toLowerCase(java.util.Locale.ROOT) + ":";
    String value = null;
    try {
      for (String tag : submissionContext.getApplicationTags()) {
        if (tag.startsWith(prefix) || tag.startsWith(lowerCasePrefix)) {
          value = tag.substring(prefix.length());
          Long.parseLong(value);
        }
      }
    } catch (NumberFormatException e) {
      LOG.warn("Invalid to flow run: " + value +
          ". Flow run should be a long integer", e);
      RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
          e.getMessage(), "ClientRMService",
          "Exception in submitting application", applicationId, callerContext);
      throw RPCUtil.getRemoteException(e);
    }
  }
  // Check whether app has already been put into rmContext,
  // If it is, simply return the response
  if (rmContext.getRMApps().get(applicationId) != null) {
    LOG.info("This is an earlier submitted application: " + applicationId);
    return SubmitApplicationResponse.newInstance();
  }
  if (submissionContext.getQueue() == null) {
    submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
  }
  if (submissionContext.getApplicationName() == null) {
    submissionContext.setApplicationName(
        YarnConfiguration.DEFAULT_APPLICATION_NAME);
  }
  String applicationType = submissionContext.getApplicationType();
  if (applicationType == null) {
    submissionContext
        .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
  } else if (applicationType.length()
      > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
    // Truncate over-long application types rather than rejecting them.
    submissionContext.setApplicationType(applicationType.substring(0,
        YarnConfiguration.APPLICATION_TYPE_LENGTH));
  }
  ReservationId reservationId = submissionContext.getReservationID();
  checkReservationACLs(submissionContext.getQueue(), AuditConstants
      .SUBMIT_RESERVATION_REQUEST, reservationId);
  try {
    // call RMAppManager to submit application directly
    rmAppManager.submitApplication(submissionContext,
        System.currentTimeMillis(), user);
    LOG.info("Application with id " + applicationId.getId() +
        " submitted by user " + user);
    RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
        "ClientRMService", applicationId, callerContext);
  } catch (YarnException e) {
    LOG.info("Exception in submitting application with id " +
        applicationId.getId(), e);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
        e.getMessage(), "ClientRMService",
        "Exception in submitting application", applicationId, callerContext);
    throw e;
  }
  SubmitApplicationResponse response = recordFactory
      .newRecordInstance(SubmitApplicationResponse.class);
  return response;
}
/**
 * Fails an application attempt on behalf of the calling user.
 *
 * <p>If the application is in one of {@code COMPLETED_APP_STATES} (and not
 * in {@code ACTIVE_APP_STATES}), the request is audited as a success and no
 * event is sent. Otherwise an {@link RMAppAttemptEventType#FAIL} event is
 * dispatched for the attempt.
 *
 * @param request carries the id of the attempt to fail
 * @return an empty response
 * @throws ApplicationNotFoundException if the application is unknown to the RM
 * @throws ApplicationAttemptNotFoundException if the application has no such
 *         attempt
 * @throws YarnException if the caller lacks MODIFY_APP access or the current
 *         user cannot be determined
 */
@SuppressWarnings("unchecked")
@Override
public FailApplicationAttemptResponse failApplicationAttempt(
    FailApplicationAttemptRequest request) throws YarnException {
  ApplicationAttemptId attemptId = request.getApplicationAttemptId();
  ApplicationId applicationId = attemptId.getApplicationId();
  UserGroupInformation callerUGI;
  try {
    callerUGI = UserGroupInformation.getCurrentUser();
  } catch (IOException ie) {
    LOG.info("Error getting UGI ", ie);
    RMAuditLogger.logFailure("UNKNOWN", AuditConstants.FAIL_ATTEMPT_REQUEST,
        "UNKNOWN", "ClientRMService" , "Error getting UGI",
        applicationId, attemptId);
    throw RPCUtil.getRemoteException(ie);
  }
  RMApp application = this.rmContext.getRMApps().get(applicationId);
  if (application == null) {
    RMAuditLogger.logFailure(callerUGI.getUserName(),
        AuditConstants.FAIL_ATTEMPT_REQUEST, "UNKNOWN", "ClientRMService",
        "Trying to fail an attempt of an absent application", applicationId,
        attemptId);
    throw new ApplicationNotFoundException("Trying to fail an attempt "
        + attemptId + " of an absent application " + applicationId);
  }
  RMAppAttempt appAttempt = application.getAppAttempts().get(attemptId);
  if (appAttempt == null) {
    // Audit this rejection like the absent-application case above.
    RMAuditLogger.logFailure(callerUGI.getUserName(),
        AuditConstants.FAIL_ATTEMPT_REQUEST, "UNKNOWN", "ClientRMService",
        "Trying to fail an absent attempt", applicationId, attemptId);
    throw new ApplicationAttemptNotFoundException(
        "ApplicationAttempt with id '" + attemptId + "' doesn't exist in RM.");
  }
  if (!checkAccess(callerUGI, application.getUser(),
      ApplicationAccessType.MODIFY_APP, application)) {
    RMAuditLogger.logFailure(callerUGI.getShortUserName(),
        AuditConstants.FAIL_ATTEMPT_REQUEST,
        "User doesn't have permissions to "
            + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
        AuditConstants.UNAUTHORIZED_USER, applicationId, attemptId);
    throw RPCUtil.getRemoteException(new AccessControlException("User "
        + callerUGI.getShortUserName() + " cannot perform operation "
        + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
  }
  FailApplicationAttemptResponse response =
      recordFactory.newRecordInstance(FailApplicationAttemptResponse.class);
  // Read the state once so both membership checks see the same value even
  // if the application transitions concurrently.
  RMAppState appState = application.getState();
  if (!ACTIVE_APP_STATES.contains(appState)
      && COMPLETED_APP_STATES.contains(appState)) {
    // Nothing left to fail; treat the request as a successful no-op.
    RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
        AuditConstants.FAIL_ATTEMPT_REQUEST, "ClientRMService",
        applicationId, attemptId);
    return response;
  }
  this.rmContext.getDispatcher().getEventHandler().handle(
      new RMAppAttemptEvent(attemptId, RMAppAttemptEventType.FAIL,
          "Attempt failed by user."));
  RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
      AuditConstants.FAIL_ATTEMPT_REQUEST, "ClientRMService", applicationId,
      attemptId);
  return response;
}
/**
 * Kills an application on behalf of the calling user.
 *
 * <p>The caller must hold MODIFY_APP access. If the application's final
 * state has already been stored the call succeeds immediately; otherwise an
 * {@code RMAppKillByClientEvent} carrying a human-readable diagnostic is
 * dispatched.
 *
 * @param request identifies the application and optional diagnostics
 * @return {@code true} if the final state is already stored; otherwise the
 *     application's unmanaged-AM flag (so unmanaged AMs do not retry)
 * @throws YarnException if the UGI cannot be resolved or access is denied
 * @throws ApplicationNotFoundException if the application is unknown
 */
@SuppressWarnings("unchecked")
@Override
public KillApplicationResponse forceKillApplication(
    KillApplicationRequest request) throws YarnException {
  final ApplicationId appId = request.getApplicationId();
  final CallerContext context = CallerContext.getCurrent();

  final UserGroupInformation caller;
  try {
    caller = UserGroupInformation.getCurrentUser();
  } catch (IOException ugiError) {
    LOG.info("Error getting UGI ", ugiError);
    RMAuditLogger.logFailure("UNKNOWN", AuditConstants.KILL_APP_REQUEST,
        "UNKNOWN", "ClientRMService", "Error getting UGI", appId, context);
    throw RPCUtil.getRemoteException(ugiError);
  }

  final RMApp app = this.rmContext.getRMApps().get(appId);
  if (app == null) {
    RMAuditLogger.logFailure(caller.getUserName(),
        AuditConstants.KILL_APP_REQUEST, "UNKNOWN", "ClientRMService",
        "Trying to kill an absent application", appId, context);
    throw new ApplicationNotFoundException("Trying to kill an absent"
        + " application " + appId);
  }

  final boolean mayModify = checkAccess(caller, app.getUser(),
      ApplicationAccessType.MODIFY_APP, app);
  if (!mayModify) {
    RMAuditLogger.logFailure(caller.getShortUserName(),
        AuditConstants.KILL_APP_REQUEST,
        "User doesn't have permissions to "
            + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
        AuditConstants.UNAUTHORIZED_USER, appId, context);
    throw RPCUtil.getRemoteException(new AccessControlException("User "
        + caller.getShortUserName() + " cannot perform operation "
        + ApplicationAccessType.MODIFY_APP.name() + " on " + appId));
  }

  // Nothing left to kill once the final state has been persisted.
  if (app.isAppFinalStateStored()) {
    return KillApplicationResponse.newInstance(true);
  }

  final InetAddress remoteAddress = Server.getRemoteIp();
  final String diagnostics = org.apache.commons.lang.StringUtils
      .trimToNull(request.getDiagnostics());

  StringBuilder killMessage = new StringBuilder("Application ")
      .append(appId)
      .append(" was killed by user ")
      .append(caller.getShortUserName());
  if (remoteAddress != null) {
    killMessage.append(" at ").append(remoteAddress.getHostAddress());
  }
  if (diagnostics != null) {
    killMessage.append(" with diagnostic message: ").append(diagnostics);
  }

  this.rmContext.getDispatcher().getEventHandler().handle(
      new RMAppKillByClientEvent(appId, killMessage.toString(), caller,
          remoteAddress));

  // For UnmanagedAMs, return true so they don't retry
  final boolean unmanaged =
      app.getApplicationSubmissionContext().getUnmanagedAM();
  return KillApplicationResponse.newInstance(unmanaged);
}
/**
 * Returns a snapshot of node-manager counts for the cluster.
 *
 * <p>The total node-manager count comes from the RM's node map; the
 * per-state counts come from the {@code ClusterMetrics} singleton.
 *
 * @param request unused
 * @return a response wrapping the populated {@code YarnClusterMetrics}
 */
@Override
public GetClusterMetricsResponse getClusterMetrics(
    GetClusterMetricsRequest request) throws YarnException {
  GetClusterMetricsResponse response =
      recordFactory.newRecordInstance(GetClusterMetricsResponse.class);
  YarnClusterMetrics summary =
      recordFactory.newRecordInstance(YarnClusterMetrics.class);
  summary.setNumNodeManagers(this.rmContext.getRMNodes().size());

  ClusterMetrics metrics = ClusterMetrics.getMetrics();
  summary.setNumDecommissionedNodeManagers(metrics.getNumDecommisionedNMs());
  summary.setNumActiveNodeManagers(metrics.getNumActiveNMs());
  summary.setNumLostNodeManagers(metrics.getNumLostNMs());
  summary.setNumUnhealthyNodeManagers(metrics.getUnhealthyNMs());
  summary.setNumRebootedNodeManagers(metrics.getNumRebootedNMs());

  response.setClusterMetrics(summary);
  return response;
}
/**
 * RPC entry point for listing applications; application types are matched
 * case-sensitively.
 *
 * @param request the filter criteria
 * @return the matching application reports
 */
@Override
public GetApplicationsResponse getApplications(
    GetApplicationsRequest request) throws YarnException {
  final boolean caseSensitive = true;
  return getApplications(request, caseSensitive);
}
/**
 * Get applications matching the {@link GetApplicationsRequest}. If
 * caseSensitive is set to false, applicationTypes in
 * GetApplicationRequest are expected to be in all-lowercase.
 *
 * <p>When the request filters by queue, only applications the scheduler
 * reports for those queues are considered. Cheap filters are applied first;
 * the ACL check, which can grab the scheduler lock, is applied last. At most
 * {@code request.getLimit()} reports are returned.
 *
 * @param request the filter criteria
 * @param caseSensitive whether application types are compared as-is
 * @return response holding the matching application reports
 * @throws YarnException if the caller's UGI cannot be determined
 */
@Private
public GetApplicationsResponse getApplications(
    GetApplicationsRequest request, boolean caseSensitive)
    throws YarnException {
  UserGroupInformation callerUGI;
  try {
    callerUGI = UserGroupInformation.getCurrentUser();
  } catch (IOException ie) {
    LOG.info("Error getting UGI ", ie);
    throw RPCUtil.getRemoteException(ie);
  }
  Set<String> applicationTypes = request.getApplicationTypes();
  EnumSet<YarnApplicationState> applicationStates =
      request.getApplicationStates();
  Set<String> users = request.getUsers();
  Set<String> queues = request.getQueues();
  Set<String> tags = request.getApplicationTags();
  long limit = request.getLimit();
  LongRange start = request.getStartRange();
  LongRange finish = request.getFinishRange();
  ApplicationsRequestScope scope = request.getScope();

  final Map<ApplicationId, RMApp> apps = rmContext.getRMApps();
  // If the query filters by queues, we can avoid considering apps outside
  // of those queues by asking the scheduler for the apps in those queues.
  Iterator<RMApp> appsIter = (queues != null && !queues.isEmpty())
      ? appsInQueuesIterator(queues, apps)
      : apps.values().iterator();

  List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
  while (appsIter.hasNext() && reports.size() < limit) {
    RMApp application = appsIter.next();
    // The queue-backed iterator resolves scheduler attempt ids through the
    // RMApps map; an app may be missing there (e.g. removed concurrently),
    // so skip it rather than failing with a NullPointerException.
    if (application == null) {
      continue;
    }
    // Check if current application falls under the specified scope
    if (scope == ApplicationsRequestScope.OWN &&
        !callerUGI.getUserName().equals(application.getUser())) {
      continue;
    }
    if (applicationTypes != null && !applicationTypes.isEmpty()) {
      String appTypeToMatch = caseSensitive
          ? application.getApplicationType()
          : StringUtils.toLowerCase(application.getApplicationType());
      if (!applicationTypes.contains(appTypeToMatch)) {
        continue;
      }
    }
    if (applicationStates != null && !applicationStates.isEmpty()) {
      if (!applicationStates.contains(application
          .createApplicationState())) {
        continue;
      }
    }
    if (users != null && !users.isEmpty() &&
        !users.contains(application.getUser())) {
      continue;
    }
    if (start != null && !start.containsLong(application.getStartTime())) {
      continue;
    }
    if (finish != null && !finish.containsLong(application.getFinishTime())) {
      continue;
    }
    if (tags != null && !tags.isEmpty()
        && !matchesAnyTag(tags, application.getApplicationTags())) {
      continue;
    }
    // checkAccess can grab the scheduler lock so call it last
    boolean allowAccess = checkAccess(callerUGI, application.getUser(),
        ApplicationAccessType.VIEW_APP, application);
    if (scope == ApplicationsRequestScope.VIEWABLE && !allowAccess) {
      continue;
    }
    reports.add(application.createAndGetApplicationReport(
        callerUGI.getUserName(), allowAccess));
  }
  GetApplicationsResponse response =
      recordFactory.newRecordInstance(GetApplicationsResponse.class);
  response.setApplicationList(reports);
  return response;
}

/**
 * Builds an iterator over the RMApps of every attempt the scheduler reports
 * for the given queues. Yields {@code null} for an attempt whose application
 * is absent from {@code apps}.
 */
private Iterator<RMApp> appsInQueuesIterator(Set<String> queues,
    final Map<ApplicationId, RMApp> apps) {
  // Collect list of lists to avoid copying all apps
  final List<List<ApplicationAttemptId>> queueAppLists =
      new ArrayList<List<ApplicationAttemptId>>();
  for (String queue : queues) {
    List<ApplicationAttemptId> appsInQueue = scheduler.getAppsInQueue(queue);
    if (appsInQueue != null && !appsInQueue.isEmpty()) {
      queueAppLists.add(appsInQueue);
    }
  }
  return new Iterator<RMApp>() {
    private final Iterator<List<ApplicationAttemptId>> appListIter =
        queueAppLists.iterator();
    private Iterator<ApplicationAttemptId> schedAppsIter;

    @Override
    public boolean hasNext() {
      // Because queueAppLists has no empty lists, hasNext is whether the
      // current list hasNext or whether there are any remaining lists
      return (schedAppsIter != null && schedAppsIter.hasNext())
          || appListIter.hasNext();
    }

    @Override
    public RMApp next() {
      if (schedAppsIter == null || !schedAppsIter.hasNext()) {
        schedAppsIter = appListIter.next().iterator();
      }
      return apps.get(schedAppsIter.next().getApplicationId());
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException("Remove not supported");
    }
  };
}

/**
 * Returns true when {@code appTags} is non-empty and shares at least one
 * element with {@code wanted}.
 */
private static boolean matchesAnyTag(Set<String> wanted,
    Set<String> appTags) {
  return appTags != null && !appTags.isEmpty()
      && !Collections.disjoint(wanted, appTags);
}
/**
 * Returns a report for every RM node whose state is in the requested set.
 * An absent or empty state filter selects all node states.
 *
 * @param request carries the optional node-state filter
 * @return response with one {@code NodeReport} per matching node
 */
@Override
public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
    throws YarnException {
  GetClusterNodesResponse response =
      recordFactory.newRecordInstance(GetClusterNodesResponse.class);
  EnumSet<NodeState> requested = request.getNodeStates();
  EnumSet<NodeState> states = (requested == null || requested.isEmpty())
      ? EnumSet.allOf(NodeState.class)
      : requested;
  Collection<RMNode> matching = RMServerUtils.queryRMNodes(rmContext, states);
  List<NodeReport> reports = new ArrayList<NodeReport>(matching.size());
  for (RMNode node : matching) {
    reports.add(createNodeReports(node));
  }
  response.setNodeReports(reports);
  return response;
}
/**
 * Returns scheduler information for a queue, optionally including the
 * applications in that queue that the caller is allowed to view.
 *
 * <p>If the scheduler throws an {@link IOException}, the failure is logged
 * and the response is returned without queue info.
 *
 * @param request the queue name and which details to include
 * @return the queue info response
 * @throws YarnException if the caller's UGI cannot be determined
 */
@Override
public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
    throws YarnException {
  UserGroupInformation callerUGI;
  try {
    callerUGI = UserGroupInformation.getCurrentUser();
  } catch (IOException ie) {
    LOG.info("Error getting UGI ", ie);
    throw RPCUtil.getRemoteException(ie);
  }
  GetQueueInfoResponse response =
      recordFactory.newRecordInstance(GetQueueInfoResponse.class);
  try {
    QueueInfo queueInfo =
        scheduler.getQueueInfo(request.getQueueName(),
            request.getIncludeChildQueues(),
            request.getRecursive());
    List<ApplicationReport> appReports = EMPTY_APPS_REPORT;
    if (request.getIncludeApplications()) {
      List<ApplicationAttemptId> apps =
          scheduler.getAppsInQueue(request.getQueueName());
      // getAppsInQueue may return null (getApplications guards against the
      // same case); treat it as "no applications" instead of an NPE.
      if (apps != null) {
        appReports = new ArrayList<ApplicationReport>(apps.size());
        for (ApplicationAttemptId app : apps) {
          RMApp rmApp = rmContext.getRMApps().get(app.getApplicationId());
          if (rmApp != null) {
            // Check if user is allowed access to this app
            if (!checkAccess(callerUGI, rmApp.getUser(),
                ApplicationAccessType.VIEW_APP, rmApp)) {
              continue;
            }
            appReports.add(
                rmApp.createAndGetApplicationReport(
                    callerUGI.getUserName(), true));
          }
        }
      }
    }
    queueInfo.setApplications(appReports);
    response.setQueueInfo(queueInfo);
  } catch (IOException ioe) {
    LOG.info("Failed to getQueueInfo for " + request.getQueueName(), ioe);
  }
  return response;
}
/**
 * Builds a {@code NodeReport} for an RM node, combining the node's own state
 * with the scheduler's usage view. Nodes the scheduler has no report for are
 * shown with zero used resources and zero containers.
 */
private NodeReport createNodeReports(RMNode rmNode) {
  SchedulerNodeReport schedReport =
      scheduler.getNodeReport(rmNode.getNodeID());
  Resource usedResource;
  int containerCount;
  if (schedReport == null) {
    usedResource = BuilderUtils.newResource(0, 0);
    containerCount = 0;
  } else {
    usedResource = schedReport.getUsedResource();
    containerCount = schedReport.getNumContainers();
  }
  return BuilderUtils.newNodeReport(rmNode.getNodeID(), rmNode.getState(),
      rmNode.getHttpAddress(), rmNode.getRackName(), usedResource,
      rmNode.getTotalCapability(), containerCount,
      rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
      rmNode.getNodeLabels(), rmNode.getAggregatedContainersUtilization(),
      rmNode.getNodeUtilization());
}
/**
 * Returns the scheduler's per-queue ACL information for the caller.
 *
 * @param request unused
 * @return response wrapping {@code scheduler.getQueueUserAclInfo()}
 */
@Override
public GetQueueUserAclsInfoResponse getQueueUserAcls(
    GetQueueUserAclsInfoRequest request) throws YarnException {
  GetQueueUserAclsInfoResponse aclResponse = recordFactory
      .newRecordInstance(GetQueueUserAclsInfoResponse.class);
  aclResponse.setUserAclsInfoList(scheduler.getQueueUserAclInfo());
  return aclResponse;
}
/**
 * Issues an RM delegation token owned by the calling user, with the
 * requested renewer and, for proxied calls, the real user recorded.
 *
 * @param request carries the renewer name
 * @return response holding the new delegation token
 * @throws YarnException wrapping an {@link IOException} if the connection
 *     is not authenticated with an allowed method or token creation fails
 */
@Override
public GetDelegationTokenResponse getDelegationToken(
    GetDelegationTokenRequest request) throws YarnException {
  try {
    // Verify that the connection is kerberos authenticated
    if (!isAllowedDelegationTokenOp()) {
      throw new IOException(
          "Delegation Token can be issued only with kerberos authentication");
    }
    GetDelegationTokenResponse response =
        recordFactory.newRecordInstance(GetDelegationTokenResponse.class);
    UserGroupInformation caller = UserGroupInformation.getCurrentUser();
    Text owner = new Text(caller.getUserName());
    UserGroupInformation proxiedBy = caller.getRealUser();
    Text realUser =
        (proxiedBy == null) ? null : new Text(proxiedBy.getUserName());

    RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier(
        owner, new Text(request.getRenewer()), realUser);
    Token<RMDelegationTokenIdentifier> rmToken =
        new Token<RMDelegationTokenIdentifier>(identifier,
            this.rmDTSecretManager);

    response.setRMDelegationToken(BuilderUtils.newDelegationToken(
        rmToken.getIdentifier(),
        rmToken.getKind().toString(),
        rmToken.getPassword(),
        rmToken.getService().toString()));
    return response;
  } catch (IOException failure) {
    throw RPCUtil.getRemoteException(failure);
  }
}
/**
 * Renews an RM delegation token and reports its next expiration time.
 *
 * @param request carries the token to renew
 * @return response with the new expiration time
 * @throws YarnException wrapping an {@link IOException} if the connection
 *     is not authenticated with an allowed method or renewal fails
 */
@Override
public RenewDelegationTokenResponse renewDelegationToken(
    RenewDelegationTokenRequest request) throws YarnException {
  try {
    if (!isAllowedDelegationTokenOp()) {
      throw new IOException(
          "Delegation Token can be renewed only with kerberos authentication");
    }
    org.apache.hadoop.yarn.api.records.Token wireToken =
        request.getDelegationToken();
    byte[] identifier = wireToken.getIdentifier().array();
    byte[] password = wireToken.getPassword().array();
    Token<RMDelegationTokenIdentifier> token =
        new Token<RMDelegationTokenIdentifier>(identifier, password,
            new Text(wireToken.getKind()), new Text(wireToken.getService()));

    String renewer = getRenewerForToken(token);
    long expiresAt = rmDTSecretManager.renewToken(token, renewer);

    RenewDelegationTokenResponse renewal =
        Records.newRecord(RenewDelegationTokenResponse.class);
    renewal.setNextExpirationTime(expiresAt);
    return renewal;
  } catch (IOException failure) {
    throw RPCUtil.getRemoteException(failure);
  }
}
/**
 * Cancels an RM delegation token on behalf of the calling user.
 *
 * @param request carries the token to cancel
 * @return an empty response on success
 * @throws YarnException wrapping an {@link IOException} if the connection
 *     is not authenticated with an allowed method or cancellation fails
 */
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
    CancelDelegationTokenRequest request) throws YarnException {
  try {
    if (!isAllowedDelegationTokenOp()) {
      throw new IOException(
          "Delegation Token can be cancelled only with kerberos authentication");
    }
    org.apache.hadoop.yarn.api.records.Token wireToken =
        request.getDelegationToken();
    byte[] identifier = wireToken.getIdentifier().array();
    byte[] password = wireToken.getPassword().array();
    Token<RMDelegationTokenIdentifier> token =
        new Token<RMDelegationTokenIdentifier>(identifier, password,
            new Text(wireToken.getKind()), new Text(wireToken.getService()));

    String canceller = UserGroupInformation.getCurrentUser().getUserName();
    rmDTSecretManager.cancelToken(token, canceller);
    return Records.newRecord(CancelDelegationTokenResponse.class);
  } catch (IOException failure) {
    throw RPCUtil.getRemoteException(failure);
  }
}
/**
 * Moves an application to a different scheduler queue.
 *
 * <p>The caller must hold MODIFY_APP access on the application, and the
 * application must be in one of {@code ACTIVE_APP_STATES}. Every outcome is
 * written to the RM audit log together with the application id.
 *
 * @param request carries the application id and the target queue
 * @return an empty response on success
 * @throws ApplicationNotFoundException if the application is unknown
 * @throws YarnException if the UGI cannot be resolved, access is denied, the
 *     application is not in an active state, or the move itself fails
 */
@SuppressWarnings("unchecked")
@Override
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
    MoveApplicationAcrossQueuesRequest request) throws YarnException {
  ApplicationId applicationId = request.getApplicationId();
  UserGroupInformation callerUGI;
  try {
    callerUGI = UserGroupInformation.getCurrentUser();
  } catch (IOException ie) {
    LOG.info("Error getting UGI ", ie);
    RMAuditLogger.logFailure("UNKNOWN", AuditConstants.MOVE_APP_REQUEST,
        "UNKNOWN", "ClientRMService" , "Error getting UGI",
        applicationId);
    throw RPCUtil.getRemoteException(ie);
  }
  RMApp application = this.rmContext.getRMApps().get(applicationId);
  if (application == null) {
    RMAuditLogger.logFailure(callerUGI.getUserName(),
        AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService",
        "Trying to move an absent application", applicationId);
    throw new ApplicationNotFoundException("Trying to move an absent"
        + " application " + applicationId);
  }
  if (!checkAccess(callerUGI, application.getUser(),
      ApplicationAccessType.MODIFY_APP, application)) {
    RMAuditLogger.logFailure(callerUGI.getShortUserName(),
        AuditConstants.MOVE_APP_REQUEST,
        "User doesn't have permissions to "
            + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
        AuditConstants.UNAUTHORIZED_USER, applicationId);
    throw RPCUtil.getRemoteException(new AccessControlException("User "
        + callerUGI.getShortUserName() + " cannot perform operation "
        + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
  }
  // Moves only allowed when app is in a state that means it is tracked by
  // the scheduler. Introducing SUBMITTED state also to this list as there
  // could be a corner scenario that app may not be in Scheduler in SUBMITTED
  // state.
  if (!ACTIVE_APP_STATES.contains(application.getState())) {
    String msg = "App in " + application.getState() + " state cannot be moved.";
    // Record the application id, as every other audit entry here does.
    RMAuditLogger.logFailure(callerUGI.getShortUserName(),
        AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService", msg,
        applicationId);
    throw new YarnException(msg);
  }
  try {
    this.rmAppManager.moveApplicationAcrossQueue(applicationId,
        request.getTargetQueue());
  } catch (YarnException ex) {
    RMAuditLogger.logFailure(callerUGI.getShortUserName(),
        AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService",
        ex.getMessage(), applicationId);
    throw ex;
  }
  RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
      AuditConstants.MOVE_APP_REQUEST, "ClientRMService" , applicationId);
  MoveApplicationAcrossQueuesResponse response = recordFactory
      .newRecordInstance(MoveApplicationAcrossQueuesResponse.class);
  return response;
}
/**
 * Chooses the renewer name to present when renewing {@code token}.
 *
 * <p>If the caller is the RM's own login user, the renewer stored in the
 * token is used (we can always renew our own tokens); otherwise the caller's
 * short user name is used.
 */
private String getRenewerForToken(Token<RMDelegationTokenIdentifier> token)
    throws IOException {
  UserGroupInformation caller = UserGroupInformation.getCurrentUser();
  UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
  if (!loginUser.getUserName().equals(caller.getUserName())) {
    return caller.getShortUserName();
  }
  return token.decodeIdentifier().getRenewer().toString();
}
/**
 * Reloads the RPC server's service-level ACLs from an already loaded
 * configuration.
 */
void refreshServiceAcls(Configuration configuration,
    PolicyProvider policyProvider) {
  Server rpcServer = this.server;
  rpcServer.refreshServiceAclWithLoadedConfiguration(configuration,
      policyProvider);
}
/**
 * Returns whether the current caller may perform delegation-token
 * operations: security must be enabled and the caller's real
 * authentication method must be KERBEROS, KERBEROS_SSL or CERTIFICATE.
 */
private boolean isAllowedDelegationTokenOp() throws IOException {
  if (!UserGroupInformation.isSecurityEnabled()) {
    return false;
  }
  AuthenticationMethod method =
      UserGroupInformation.getCurrentUser().getRealAuthenticationMethod();
  return method == AuthenticationMethod.KERBEROS
      || method == AuthenticationMethod.KERBEROS_SSL
      || method == AuthenticationMethod.CERTIFICATE;
}
/** Exposes the underlying RPC server for tests. */
@VisibleForTesting
public Server getServer() {
  final Server rpcServer = this.server;
  return rpcServer;
}
/**
 * Allocates a new reservation id from the reservation system.
 *
 * @param request unused
 * @return response carrying the new reservation id
 * @throws YarnException if the reservation system is not enabled
 */
@Override
public GetNewReservationResponse getNewReservation(
    GetNewReservationRequest request) throws YarnException, IOException {
  checkReservationSytem(AuditConstants.CREATE_NEW_RESERVATION_REQUEST);
  GetNewReservationResponse newReservation =
      recordFactory.newRecordInstance(GetNewReservationResponse.class);
  // Create a new Reservation Id
  newReservation.setReservationId(reservationSystem.getNewReservationId());
  return newReservation;
}
/**
 * Submits a reservation to the plan of the requested queue.
 *
 * <p>Resubmitting the identical definition under an existing id returns
 * immediately. A different definition under an existing id is rejected.
 * Otherwise the plan's reservation agent is asked to place the reservation.
 * If placement succeeds, the reservation is bound to its queue and the plan
 * may be synchronized right away.
 *
 * @param request the reservation id, queue and definition
 * @return an empty submission response
 * @throws YarnException if the reservation system is disabled, validation
 *     fails, ACLs deny access, the id clashes, or planning fails
 */
@Override
public ReservationSubmissionResponse submitReservation(
    ReservationSubmissionRequest request) throws YarnException, IOException {
  // Check if reservation system is enabled
  checkReservationSytem(AuditConstants.SUBMIT_RESERVATION_REQUEST);
  ReservationSubmissionResponse response =
      recordFactory.newRecordInstance(ReservationSubmissionResponse.class);
  ReservationId reservationId = request.getReservationId();
  Plan plan = rValidator.validateReservationSubmissionRequest(
      reservationSystem, request, reservationId);

  ReservationAllocation existing = plan.getReservationById(reservationId);
  if (existing != null) {
    boolean sameDefinition = existing.getReservationDefinition().equals(
        request.getReservationDefinition());
    if (sameDefinition) {
      return response;
    }
    String message = "Reservation allocation already exists with the " +
        "reservation id " + reservationId.toString() + ", but a different" +
        " reservation definition was provided. Please try again with a " +
        "new reservation id, or consider updating the reservation instead.";
    throw RPCUtil.getRemoteException(message);
  }

  String queueName = request.getQueue();
  String user = checkReservationACLs(queueName,
      AuditConstants.SUBMIT_RESERVATION_REQUEST, null);
  try {
    boolean placed = plan.getReservationAgent().createReservation(
        reservationId, user, plan, request.getReservationDefinition());
    // NOTE(review): when placed is false nothing is bound, yet success is
    // still audited below; confirm agents never return false here.
    if (placed) {
      // Register the id with the reservation system, then synchronize the
      // plan immediately if the reservation starts soon.
      reservationSystem.setQueueForReservation(reservationId, queueName);
      refreshScheduler(queueName, request.getReservationDefinition(),
          reservationId.toString());
    }
  } catch (PlanningException planningError) {
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_RESERVATION_REQUEST,
        planningError.getMessage(), "ClientRMService",
        "Unable to create the reservation: " + reservationId);
    throw RPCUtil.getRemoteException(planningError);
  }
  RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_RESERVATION_REQUEST,
      "ClientRMService: " + reservationId);
  return response;
}
/**
 * Replaces the definition of an existing reservation via the plan's
 * reservation agent.
 *
 * @param request the reservation id and new definition
 * @return an empty update response
 * @throws YarnException if the reservation system is disabled, validation
 *     fails, ACLs deny access, or the agent cannot apply the update
 */
@Override
public ReservationUpdateResponse updateReservation(
    ReservationUpdateRequest request) throws YarnException, IOException {
  // Check if reservation system is enabled
  checkReservationSytem(AuditConstants.UPDATE_RESERVATION_REQUEST);
  ReservationUpdateResponse response =
      recordFactory.newRecordInstance(ReservationUpdateResponse.class);
  Plan plan =
      rValidator.validateReservationUpdateRequest(reservationSystem, request);
  ReservationId reservationId = request.getReservationId();
  String queueName = reservationSystem.getQueueForReservation(reservationId);
  String user = checkReservationACLs(queueName,
      AuditConstants.UPDATE_RESERVATION_REQUEST, reservationId);

  // Try to update the reservation using default agent
  try {
    boolean updated = plan.getReservationAgent().updateReservation(
        reservationId, user, plan, request.getReservationDefinition());
    if (!updated) {
      String errMsg = "Unable to update reservation: " + reservationId;
      RMAuditLogger.logFailure(user,
          AuditConstants.UPDATE_RESERVATION_REQUEST, errMsg,
          "ClientRMService", errMsg);
      throw RPCUtil.getRemoteException(errMsg);
    }
  } catch (PlanningException planningError) {
    RMAuditLogger.logFailure(user, AuditConstants.UPDATE_RESERVATION_REQUEST,
        planningError.getMessage(), "ClientRMService",
        "Unable to update the reservation: " + reservationId);
    throw RPCUtil.getRemoteException(planningError);
  }
  RMAuditLogger.logSuccess(user, AuditConstants.UPDATE_RESERVATION_REQUEST,
      "ClientRMService: " + reservationId);
  return response;
}
/**
 * Deletes an existing reservation via the plan's reservation agent.
 *
 * @param request the id of the reservation to delete
 * @return an empty delete response
 * @throws YarnException if the reservation system is disabled, validation
 *     fails, ACLs deny access, or the agent cannot delete the reservation
 */
@Override
public ReservationDeleteResponse deleteReservation(
    ReservationDeleteRequest request) throws YarnException, IOException {
  // Check if reservation system is enabled
  checkReservationSytem(AuditConstants.DELETE_RESERVATION_REQUEST);
  ReservationDeleteResponse response =
      recordFactory.newRecordInstance(ReservationDeleteResponse.class);
  Plan plan =
      rValidator.validateReservationDeleteRequest(reservationSystem, request);
  ReservationId reservationId = request.getReservationId();
  String queueName = reservationSystem.getQueueForReservation(reservationId);
  String user = checkReservationACLs(queueName,
      AuditConstants.DELETE_RESERVATION_REQUEST, reservationId);

  // Try to delete the reservation using the plan's agent
  try {
    boolean deleted = plan.getReservationAgent().deleteReservation(
        reservationId, user, plan);
    if (!deleted) {
      String errMsg = "Could not delete reservation: " + reservationId;
      RMAuditLogger.logFailure(user,
          AuditConstants.DELETE_RESERVATION_REQUEST, errMsg,
          "ClientRMService", errMsg);
      throw RPCUtil.getRemoteException(errMsg);
    }
  } catch (PlanningException planningError) {
    RMAuditLogger.logFailure(user, AuditConstants.DELETE_RESERVATION_REQUEST,
        planningError.getMessage(), "ClientRMService",
        "Unable to delete the reservation: " + reservationId);
    throw RPCUtil.getRemoteException(planningError);
  }
  RMAuditLogger.logSuccess(user, AuditConstants.DELETE_RESERVATION_REQUEST,
      "ClientRMService: " + reservationId);
  return response;
}
/**
 * Lists reservations in a queue's plan, optionally restricted to one
 * reservation id and to a time window.
 *
 * <p>A negative start time is clamped to 0; an end time of -1 or less means
 * "no upper bound" ({@code Long.MAX_VALUE}).
 *
 * @param requestInfo queue, optional reservation id, time window, and
 *     whether to include resource allocations
 * @return the matching reservation allocation states
 * @throws YarnException if the reservation system is disabled, validation
 *     fails, or ACLs deny access
 */
@Override
public ReservationListResponse listReservations(
    ReservationListRequest requestInfo) throws YarnException, IOException {
  // Check if reservation system is enabled
  checkReservationSytem(AuditConstants.LIST_RESERVATION_REQUEST);
  ReservationListResponse response =
      recordFactory.newRecordInstance(ReservationListResponse.class);
  Plan plan = rValidator.validateReservationListRequest(
      reservationSystem, requestInfo);
  boolean includeResourceAllocations =
      requestInfo.getIncludeResourceAllocations();

  String rawId = requestInfo.getReservationId();
  ReservationId reservationId = (rawId == null || rawId.isEmpty())
      ? null
      : ReservationId.parseReservationId(rawId);

  checkReservationACLs(requestInfo.getQueue(),
      AuditConstants.LIST_RESERVATION_REQUEST, reservationId);

  long startTime = Math.max(requestInfo.getStartTime(), 0);
  long requestedEnd = requestInfo.getEndTime();
  long endTime = (requestedEnd <= -1) ? Long.MAX_VALUE : requestedEnd;

  Set<ReservationAllocation> reservations = plan.getReservations(
      reservationId, new ReservationInterval(startTime, endTime));
  response.setReservationAllocationState(
      ReservationSystemUtil.convertAllocationsToReservationInfo(
          reservations, includeResourceAllocations));
  return response;
}
/**
 * Returns the node-to-labels mapping held by the RM node-labels manager.
 *
 * @param request unused
 * @return the node-to-labels response
 */
@Override
public GetNodesToLabelsResponse getNodeToLabels(
    GetNodesToLabelsRequest request) throws YarnException, IOException {
  return GetNodesToLabelsResponse.newInstance(
      rmContext.getNodeLabelManager().getNodeLabelsInfo());
}
/**
 * Returns the labels-to-nodes mapping, restricted to the requested labels
 * when any are given and covering all labels otherwise.
 *
 * @param request the optional label filter
 * @return the labels-to-nodes response
 */
@Override
public GetLabelsToNodesResponse getLabelsToNodes(
    GetLabelsToNodesRequest request) throws YarnException, IOException {
  RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
  boolean allLabels = request.getNodeLabels() == null
      || request.getNodeLabels().isEmpty();
  return GetLabelsToNodesResponse.newInstance(allLabels
      ? labelsMgr.getLabelsInfoToNodes()
      : labelsMgr.getLabelsInfoToNodes(request.getNodeLabels()));
}
/**
 * Returns every node label known to the cluster.
 *
 * @param request unused
 * @return the cluster node labels response
 */
@Override
public GetClusterNodeLabelsResponse getClusterNodeLabels(
    GetClusterNodeLabelsRequest request) throws YarnException, IOException {
  return GetClusterNodeLabelsResponse.newInstance(
      rmContext.getNodeLabelManager().getClusterNodeLabels());
}
/**
 * Fails with a remote exception when no reservation system is configured.
 * NOTE(review): {@code auditConstant} is currently unused.
 */
private void checkReservationSytem(String auditConstant) throws YarnException {
  if (reservationSystem != null) {
    return;
  }
  throw RPCUtil.getRemoteException("Reservation is not enabled."
      + " Please enable & try again");
}
/**
 * Synchronizes the named plan immediately when the reservation's arrival is
 * closer than one plan-follower time step, rather than waiting for the next
 * periodic plan-follower run.
 *
 * @param planName the plan (queue) to synchronize
 * @param contract the reservation definition whose arrival time is checked
 * @param reservationId the reservation id, used only for logging
 */
private void refreshScheduler(String planName,
    ReservationDefinition contract, String reservationId) {
  if ((contract.getArrival() - clock.getTime()) < reservationSystem
      .getPlanFollowerTimeStep()) {
    // Avoid formatting the message when debug logging is disabled.
    if (LOG.isDebugEnabled()) {
      LOG.debug(MessageFormat.format(
          "Reservation {0} is within threshold so attempting to create synchronously.",
          reservationId));
    }
    reservationSystem.synchronizePlan(planName, true);
    LOG.info(MessageFormat.format("Created reservation {0} synchronously.",
        reservationId));
  }
}
/**
 * Verifies that the caller may perform the reservation operation named by
 * {@code auditConstant} on {@code queueName}.
 *
 * <p>Access is granted when the reservation system or its ACL manager is
 * absent, when the caller created the referenced reservation, when the
 * caller holds the operation's ACL, or when the caller holds
 * ADMINISTER_RESERVATIONS.
 *
 * @param queueName the queue (plan) the operation targets
 * @param auditConstant the audit operation name, mapped to an ACL
 * @param reservationId the reservation being accessed, or {@code null}
 * @return the caller's short user name when access is granted
 * @throws YarnException if the UGI cannot be resolved, the audit constant
 *     is unknown, or access is denied
 */
private String checkReservationACLs(String queueName, String auditConstant,
    ReservationId reservationId)
    throws YarnException, IOException {
  UserGroupInformation callerUGI;
  try {
    callerUGI = UserGroupInformation.getCurrentUser();
  } catch (IOException ie) {
    RMAuditLogger.logFailure("UNKNOWN", auditConstant, queueName,
        "ClientRMService", "Error getting UGI");
    throw RPCUtil.getRemoteException(ie);
  }
  if (reservationSystem == null) {
    return callerUGI.getShortUserName();
  }
  ReservationsACLsManager manager = reservationSystem
      .getReservationsACLsManager();
  ReservationACL reservationACL = getReservationACLFromAuditConstant(
      auditConstant);
  if (manager == null) {
    return callerUGI.getShortUserName();
  }
  String reservationCreatorName = "";
  // Get the user associated with the reservation.
  Plan plan = reservationSystem.getPlan(queueName);
  if (reservationId != null && plan != null) {
    ReservationAllocation reservation = plan.getReservationById(reservationId);
    if (reservation != null) {
      reservationCreatorName = reservation.getUser();
    }
  }
  // If the reservation to be altered or listed belongs to the current user,
  // access will be given. Reservations are created with the short user name
  // this method returns, so compare against it; the full user name is still
  // accepted for backward compatibility.
  if (reservationCreatorName != null && !reservationCreatorName.isEmpty()
      && (reservationCreatorName.equals(callerUGI.getShortUserName())
          || reservationCreatorName.equals(callerUGI.getUserName()))) {
    return callerUGI.getShortUserName();
  }
  // Check if the user has access to the specific ACL
  if (manager.checkAccess(callerUGI, reservationACL, queueName)) {
    return callerUGI.getShortUserName();
  }
  // If the user has Administer ACL then access is granted
  if (manager.checkAccess(callerUGI, ReservationACL
      .ADMINISTER_RESERVATIONS, queueName)) {
    return callerUGI.getShortUserName();
  }
  handleNoAccess(callerUGI.getShortUserName(), queueName, auditConstant,
      reservationACL.toString(), reservationACL.name());
  // Unreachable: handleNoAccess always throws. This satisfies the compiler.
  throw new IllegalStateException();
}
/**
 * Maps a reservation audit operation name to the ACL that guards it.
 * Update and delete both require ADMINISTER_RESERVATIONS.
 *
 * @throws YarnException wrapping an {@code UnrecognizedOptionException} for
 *     any other audit constant
 */
private ReservationACL getReservationACLFromAuditConstant(
    String auditConstant) throws YarnException{
  if (auditConstant.equals(AuditConstants.SUBMIT_RESERVATION_REQUEST)) {
    return ReservationACL.SUBMIT_RESERVATIONS;
  }
  if (auditConstant.equals(AuditConstants.LIST_RESERVATION_REQUEST)) {
    return ReservationACL.LIST_RESERVATIONS;
  }
  boolean modifies =
      auditConstant.equals(AuditConstants.DELETE_RESERVATION_REQUEST)
          || auditConstant.equals(AuditConstants.UPDATE_RESERVATION_REQUEST);
  if (modifies) {
    return ReservationACL.ADMINISTER_RESERVATIONS;
  }
  String error = "Audit Constant " + auditConstant + " is not recognized.";
  LOG.error(error);
  throw RPCUtil.getRemoteException(new UnrecognizedOptionException(error));
}
/**
 * Records an access-denied audit entry and throws the matching
 * {@link AccessControlException}, wrapped as a remote exception. This method
 * never returns normally.
 *
 * @param name the caller's user name
 * @param queue the queue the operation targeted
 * @param auditConstant the audit operation name
 * @param acl the ACL description used in the audit permission field
 * @param op the operation name used in the exception message
 * @throws YarnException always
 */
private void handleNoAccess(String name, String queue, String auditConstant,
    String acl, String op) throws YarnException {
  // Use UNAUTHORIZED_USER as the description, like the other access-denied
  // audit entries in this class; auditConstant is already the operation.
  RMAuditLogger.logFailure(
      name,
      auditConstant,
      "User doesn't have permissions to " + acl, "ClientRMService",
      AuditConstants.UNAUTHORIZED_USER);
  throw RPCUtil.getRemoteException(new AccessControlException("User "
      + name + " cannot perform operation " + op + " on queue " + queue));
}
/**
 * Changes the priority of an application tracked by the scheduler.
 *
 * <p>Applications in a completed state are left unchanged and their current
 * priority is returned. Any other inactive state is rejected. Every outcome
 * is written to the RM audit log together with the application id.
 *
 * @param request the application id and the requested priority
 * @return response carrying the application's priority after the call
 * @throws YarnException if access checks fail, the application state does
 *     not allow the change, or the update itself fails
 */
@Override
public UpdateApplicationPriorityResponse updateApplicationPriority(
    UpdateApplicationPriorityRequest request) throws YarnException,
    IOException {
  ApplicationId applicationId = request.getApplicationId();
  Priority newAppPriority = request.getApplicationPriority();
  UserGroupInformation callerUGI =
      getCallerUgi(applicationId, AuditConstants.UPDATE_APP_PRIORITY);
  RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI,
      AuditConstants.UPDATE_APP_PRIORITY);
  UpdateApplicationPriorityResponse response = recordFactory
      .newRecordInstance(UpdateApplicationPriorityResponse.class);
  // Update priority only when app is tracked by the scheduler
  if (!ACTIVE_APP_STATES.contains(application.getState())) {
    if (application.isAppInCompletedStates()) {
      // If Application is in any of the final states, change priority
      // can be skipped rather throwing exception.
      RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
          AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService",
          applicationId);
      response.setApplicationPriority(application
          .getApplicationPriority());
      return response;
    }
    String msg = "Application in " + application.getState()
        + " state cannot update priority.";
    RMAuditLogger.logFailure(callerUGI.getShortUserName(),
        AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService",
        msg, applicationId);
    throw new YarnException(msg);
  }
  try {
    rmAppManager.updateApplicationPriority(applicationId, newAppPriority);
  } catch (YarnException ex) {
    RMAuditLogger.logFailure(callerUGI.getShortUserName(),
        AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService",
        ex.getMessage(), applicationId);
    throw ex;
  }
  RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
      AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService", applicationId);
  response.setApplicationPriority(application.getApplicationPriority());
  return response;
}
/**
 * Signal a container.
 * After the request passes sanity checks (known application, caller holds
 * MODIFY_APP, container known to the scheduler), it is delivered to
 * RMNodeImpl as an {@code RMNodeSignalContainerEvent} so that the next NM
 * heartbeat will pick up the signal request.
 *
 * @param request the container to signal and the signal command
 * @return an empty response on success
 * @throws YarnException if the UGI cannot be resolved, the application or
 *     container is absent, or access is denied
 */
@SuppressWarnings("unchecked")
@Override
public SignalContainerResponse signalToContainer(
    SignalContainerRequest request) throws YarnException, IOException {
  final ContainerId containerId = request.getContainerId();
  final UserGroupInformation caller;
  try {
    caller = UserGroupInformation.getCurrentUser();
  } catch (IOException ugiError) {
    LOG.info("Error getting UGI ", ugiError);
    throw RPCUtil.getRemoteException(ugiError);
  }

  final ApplicationId appId =
      containerId.getApplicationAttemptId().getApplicationId();
  final RMApp app = this.rmContext.getRMApps().get(appId);
  if (app == null) {
    RMAuditLogger.logFailure(caller.getUserName(),
        AuditConstants.SIGNAL_CONTAINER, "UNKNOWN", "ClientRMService",
        "Trying to signal an absent container", appId, containerId, null);
    throw RPCUtil.getRemoteException(
        "Trying to signal an absent container " + containerId);
  }

  final boolean mayModify = checkAccess(caller, app.getUser(),
      ApplicationAccessType.MODIFY_APP, app);
  if (!mayModify) {
    RMAuditLogger.logFailure(caller.getShortUserName(),
        AuditConstants.SIGNAL_CONTAINER, "User doesn't have permissions to "
            + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
        AuditConstants.UNAUTHORIZED_USER, appId);
    throw RPCUtil.getRemoteException(new AccessControlException("User "
        + caller.getShortUserName() + " cannot perform operation "
        + ApplicationAccessType.MODIFY_APP.name() + " on " + appId));
  }

  final RMContainer container = scheduler.getRMContainer(containerId);
  if (container == null) {
    RMAuditLogger.logFailure(caller.getUserName(),
        AuditConstants.SIGNAL_CONTAINER, "UNKNOWN", "ClientRMService",
        "Trying to signal an absent container", appId, containerId, null);
    throw RPCUtil.getRemoteException(
        "Trying to signal an absent container " + containerId);
  }

  this.rmContext.getDispatcher().getEventHandler().handle(
      new RMNodeSignalContainerEvent(container.getContainer().getNodeId(),
          request));
  RMAuditLogger.logSuccess(caller.getShortUserName(),
      AuditConstants.SIGNAL_CONTAINER, "ClientRMService", appId,
      containerId, null);
  return recordFactory.newRecordInstance(SignalContainerResponse.class);
}
/**
 * Updates the timeouts of an application.
 *
 * <p>The caller must hold {@link ApplicationAccessType#MODIFY_APP} on the
 * application. Applications whose state is in {@code COMPLETED_APP_STATES}
 * are treated as a successful no-op. Applications in any other state outside
 * SUBMITTED, ACCEPTED or RUNNING are rejected.
 *
 * @param request the target application id and the timeouts to apply
 * @return a newly created response record; no fields are populated here
 * @throws ApplicationNotFoundException if the application is unknown
 * @throws YarnException if the caller cannot be identified or lacks access,
 *     no timeouts were supplied, the application is in a state that cannot
 *     be updated, or the update itself fails
 * @throws IOException declared by the client protocol
 */
@Override
public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
    UpdateApplicationTimeoutsRequest request)
    throws YarnException, IOException {
  ApplicationId applicationId = request.getApplicationId();
  Map<ApplicationTimeoutType, String> applicationTimeouts =
      request.getApplicationTimeouts();
  UserGroupInformation callerUGI =
      getCallerUgi(applicationId, AuditConstants.UPDATE_APP_TIMEOUTS);
  RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI,
      AuditConstants.UPDATE_APP_TIMEOUTS);
  // A missing map is handled like an empty one, so the client gets a
  // descriptive error instead of an NPE raised inside the RPC handler.
  if (applicationTimeouts == null || applicationTimeouts.isEmpty()) {
    String message =
        "At least one ApplicationTimeoutType should be configured"
            + " for updating timeouts.";
    RMAuditLogger.logFailure(callerUGI.getShortUserName(),
        AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService",
        message, applicationId);
    throw RPCUtil.getRemoteException(message);
  }
  UpdateApplicationTimeoutsResponse response = recordFactory
      .newRecordInstance(UpdateApplicationTimeoutsResponse.class);
  RMAppState state = application.getState();
  if (!EnumSet
      .of(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppState.RUNNING)
      .contains(state)) {
    if (COMPLETED_APP_STATES.contains(state)) {
      // If the application is already in a final state, the timeout update
      // can be skipped rather than throwing an exception.
      RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
          AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService",
          applicationId);
      return response;
    }
    String msg =
        "Application is in " + state + " state can not update timeout.";
    // Record the application id so the audit entry can be correlated.
    RMAuditLogger.logFailure(callerUGI.getShortUserName(),
        AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService",
        msg, applicationId);
    throw RPCUtil.getRemoteException(msg);
  }
  try {
    rmAppManager.updateApplicationTimeout(application, applicationTimeouts);
  } catch (YarnException ex) {
    // Record the application id so the audit entry can be correlated.
    RMAuditLogger.logFailure(callerUGI.getShortUserName(),
        AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService",
        ex.getMessage(), applicationId);
    throw ex;
  }
  RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
      AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService", applicationId);
  return response;
}
/**
 * Resolves the caller via {@link UserGroupInformation#getCurrentUser()}.
 *
 * <p>If that lookup fails, the error is logged, a failure audit record with
 * user "UNKNOWN" is written for {@code operation}, and the
 * {@link IOException} is rethrown wrapped as a remote exception.
 *
 * @param applicationId application recorded in the failure audit entry
 * @param operation audit operation name recorded on failure
 * @return the current user
 * @throws YarnException wrapping the underlying {@link IOException}
 */
private UserGroupInformation getCallerUgi(ApplicationId applicationId,
    String operation) throws YarnException {
  try {
    return UserGroupInformation.getCurrentUser();
  } catch (IOException ugiLookupError) {
    LOG.info("Error getting UGI ", ugiLookupError);
    RMAuditLogger.logFailure("UNKNOWN", operation, "UNKNOWN",
        "ClientRMService", "Error getting UGI", applicationId);
    throw RPCUtil.getRemoteException(ugiLookupError);
  }
}
/**
 * Looks up an application and checks that the caller may modify it.
 *
 * <p>Each failure path writes a failure audit record for {@code operation}
 * before throwing.
 *
 * @param applicationId application to look up
 * @param callerUGI the calling user
 * @param operation audit operation name, also embedded in error messages
 * @return the application, once the MODIFY_APP check has passed
 * @throws ApplicationNotFoundException if the application is not known to
 *     the RM context
 * @throws YarnException wrapping an {@link AccessControlException} when the
 *     caller lacks MODIFY_APP access
 */
private RMApp verifyUserAccessForRMApp(ApplicationId applicationId,
    UserGroupInformation callerUGI, String operation) throws YarnException {
  final RMApp app = this.rmContext.getRMApps().get(applicationId);
  if (app == null) {
    // The audit description and the exception message share this prefix.
    final String absentMessage =
        "Trying to " + operation + " of an absent application";
    RMAuditLogger.logFailure(callerUGI.getUserName(), operation, "UNKNOWN",
        "ClientRMService", absentMessage, applicationId);
    throw new ApplicationNotFoundException(
        absentMessage + " " + applicationId);
  }
  final ApplicationAccessType requiredAccess =
      ApplicationAccessType.MODIFY_APP;
  if (checkAccess(callerUGI, app.getUser(), requiredAccess, app)) {
    return app;
  }
  final String shortUser = callerUGI.getShortUserName();
  RMAuditLogger.logFailure(shortUser, operation,
      "User doesn't have permissions to " + requiredAccess.toString(),
      "ClientRMService", AuditConstants.UNAUTHORIZED_USER, applicationId);
  throw RPCUtil.getRemoteException(new AccessControlException("User "
      + shortUser + " cannot perform operation "
      + requiredAccess.name() + " on " + applicationId));
}
/**
 * Returns every resource profile known to the RM.
 *
 * @param request the (unused) request record
 * @return a response carrying the full profile map
 * @throws YarnException if resource profiles are not enabled
 * @throws IOException declared by the client protocol
 */
@Override
public GetAllResourceProfilesResponse getResourceProfiles(
    GetAllResourceProfilesRequest request) throws YarnException, IOException {
  Map<String, Resource> allProfiles = getResourceProfiles();
  GetAllResourceProfilesResponse reply =
      GetAllResourceProfilesResponse.newInstance();
  reply.setResourceProfiles(allProfiles);
  return reply;
}
/**
 * Returns a single resource profile by name.
 *
 * @param request carries the name of the profile to fetch
 * @return a response whose resource is the one
 *     {@code resourceProfilesManager.getProfile} returns for that name
 * @throws YarnException if resource profiles are disabled or no profile with
 *     the requested name exists
 * @throws IOException declared by the client protocol
 */
@Override
public GetResourceProfileResponse getResourceProfile(
    GetResourceProfileRequest request) throws YarnException, IOException {
  // Checks that profiles are enabled before the request is inspected.
  Map<String, Resource> knownProfiles = getResourceProfiles();
  String profileName = request.getProfileName();
  if (knownProfiles.containsKey(profileName)) {
    GetResourceProfileResponse reply =
        GetResourceProfileResponse.newInstance();
    reply.setResource(resourceProfilesManager.getProfile(profileName));
    return reply;
  }
  throw new YarnException(
      "Resource profile '" + profileName + "' not found");
}
/**
 * Returns the profiles held by {@code resourceProfilesManager}, provided the
 * feature is switched on via
 * {@code YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED}.
 *
 * @return the profile map from the resource profiles manager
 * @throws YarnException if resource profiles are disabled in the
 *     configuration
 */
private Map<String, Resource> getResourceProfiles() throws YarnException {
  if (getConfig().getBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
      YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED)) {
    return resourceProfilesManager.getResourceProfiles();
  }
  throw new YarnException("Resource profiles are not enabled");
}
}