| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.nio.ByteBuffer; |
| import java.security.AccessControlException; |
| import java.text.MessageFormat; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.stream.Collectors; |
| import org.apache.commons.cli.UnrecognizedOptionException; |
| import org.apache.commons.lang3.Range; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeysPublic; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.ipc.CallerContext; |
| import org.apache.hadoop.ipc.Server; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; |
| import org.apache.hadoop.security.authorize.PolicyProvider; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.service.AbstractService; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.yarn.api.ApplicationClientProtocol; |
| import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; |
| import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileRequest; |
| import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse; |
| import org.apache.hadoop.yarn.api.records.ApplicationAccessType; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationReport; |
| import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; |
| import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.ContainerReport; |
| import org.apache.hadoop.yarn.api.records.NodeAttribute; |
| import org.apache.hadoop.yarn.api.records.NodeAttributeKey; |
| import org.apache.hadoop.yarn.api.records.NodeAttributeInfo; |
| import org.apache.hadoop.yarn.api.records.NodeReport; |
| import org.apache.hadoop.yarn.api.records.NodeState; |
| import org.apache.hadoop.yarn.api.records.NodeToAttributeValue; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.QueueACL; |
| import org.apache.hadoop.yarn.api.records.QueueInfo; |
| import org.apache.hadoop.yarn.api.records.ReservationACL; |
| import org.apache.hadoop.yarn.api.records.ReservationAllocationState; |
| import org.apache.hadoop.yarn.api.records.ReservationDefinition; |
| import org.apache.hadoop.yarn.api.records.ReservationId; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.api.records.YarnApplicationState; |
| import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; |
| import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; |
| import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.exceptions.YARNFeatureNotEnabledException; |
| import org.apache.hadoop.yarn.factories.RecordFactory; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.ipc.RPCUtil; |
| import org.apache.hadoop.yarn.ipc.YarnRPC; |
| import org.apache.hadoop.yarn.nodelabels.AttributeValue; |
| import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; |
| import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; |
| import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.Keys; |
| import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.preprocessor.SubmissionContextPreProcessor; |
| import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInputValidator; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil; |
| import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppKillByClientEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; |
| import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeSignalContainerEvent; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; |
| import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.ReservationsACLsManager; |
| import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; |
| import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; |
| import org.apache.hadoop.yarn.server.utils.BuilderUtils; |
| import org.apache.hadoop.yarn.util.Clock; |
| import org.apache.hadoop.yarn.util.Records; |
| import org.apache.hadoop.yarn.util.UTCClock; |
| |
| import org.apache.hadoop.classification.VisibleForTesting; |
| import org.apache.hadoop.yarn.util.resource.ResourceUtils; |
| import org.apache.hadoop.yarn.util.resource.Resources; |
| import org.apache.hadoop.yarn.util.timeline.TimelineUtils; |
| |
| |
| /** |
| * The client interface to the Resource Manager. This module handles all the rpc |
| * interfaces to the resource manager from the client. |
| */ |
| public class ClientRMService extends AbstractService implements |
| ApplicationClientProtocol { |
| private static final ArrayList<ApplicationReport> EMPTY_APPS_REPORT = new ArrayList<ApplicationReport>(); |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(ClientRMService.class); |
| |
| final private AtomicInteger applicationCounter = new AtomicInteger(0); |
| final private YarnScheduler scheduler; |
| final private RMContext rmContext; |
| private final RMAppManager rmAppManager; |
| |
| private Server server; |
| protected RMDelegationTokenSecretManager rmDTSecretManager; |
| |
| private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); |
| private InetSocketAddress clientBindAddress; |
| |
| private final ApplicationACLsManager applicationsACLsManager; |
| private final QueueACLsManager queueACLsManager; |
| |
| // For Reservation APIs |
| private Clock clock; |
| private ReservationSystem reservationSystem; |
| private ReservationInputValidator rValidator; |
| |
| private SubmissionContextPreProcessor contextPreProcessor; |
| |
| private boolean filterAppsByUser = false; |
| |
| private static final EnumSet<RMAppState> ACTIVE_APP_STATES = EnumSet.of( |
| RMAppState.ACCEPTED, RMAppState.RUNNING); |
| |
| private ResourceProfilesManager resourceProfilesManager; |
| private boolean timelineServiceV2Enabled; |
| |
/**
 * Creates the service using a freshly constructed {@link UTCClock}.
 * All other arguments are forwarded unchanged to the seven-argument
 * constructor.
 */
public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
    RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
    QueueACLsManager queueACLsManager,
    RMDelegationTokenSecretManager rmDTSecretManager) {
  this(rmContext,
      scheduler,
      rmAppManager,
      applicationACLsManager,
      queueACLsManager,
      rmDTSecretManager,
      new UTCClock());
}
| |
/**
 * Creates the service, naming it after this class and storing the supplied
 * collaborators. The reservation system and the resource profiles manager
 * are read from {@code rmContext}; the reservation input validator is built
 * on top of {@code clock}.
 */
public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
    RMAppManager rmAppManager, ApplicationACLsManager applicationACLsManager,
    QueueACLsManager queueACLsManager,
    RMDelegationTokenSecretManager rmDTSecretManager, Clock clock) {
  super(ClientRMService.class.getName());

  // Collaborators handed in by the caller.
  this.rmContext = rmContext;
  this.scheduler = scheduler;
  this.rmAppManager = rmAppManager;
  this.applicationsACLsManager = applicationACLsManager;
  this.queueACLsManager = queueACLsManager;
  this.rmDTSecretManager = rmDTSecretManager;

  // Reservation support.
  this.clock = clock;
  this.reservationSystem = rmContext.getReservationSystem();
  this.rValidator = new ReservationInputValidator(clock);

  this.resourceProfilesManager = rmContext.getResourceProfilesManager();
}
| |
| @Override |
| protected void serviceInit(Configuration conf) throws Exception { |
| clientBindAddress = getBindAddress(conf); |
| super.serviceInit(conf); |
| } |
| |
| @Override |
| protected void serviceStart() throws Exception { |
| Configuration conf = getConfig(); |
| YarnRPC rpc = YarnRPC.create(conf); |
| this.server = |
| rpc.getServer(ApplicationClientProtocol.class, this, |
| clientBindAddress, |
| conf, this.rmDTSecretManager, |
| conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, |
| YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT)); |
| |
| this.server.addTerseExceptions(ApplicationNotFoundException.class, |
| ApplicationAttemptNotFoundException.class, |
| ContainerNotFoundException.class, |
| YARNFeatureNotEnabledException.class); |
| |
| // Enable service authorization? |
| if (conf.getBoolean( |
| CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, |
| false)) { |
| InputStream inputStream = |
| this.rmContext.getConfigurationProvider() |
| .getConfigurationInputStream(conf, |
| YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE); |
| if (inputStream != null) { |
| conf.addResource(inputStream); |
| } |
| refreshServiceAcls(conf, RMPolicyProvider.getInstance()); |
| } |
| |
| this.filterAppsByUser = conf.getBoolean( |
| YarnConfiguration.FILTER_ENTITY_LIST_BY_USER, |
| YarnConfiguration.DEFAULT_DISPLAY_APPS_FOR_LOGGED_IN_USER); |
| |
| this.server.start(); |
| clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST, |
| YarnConfiguration.RM_ADDRESS, |
| YarnConfiguration.DEFAULT_RM_ADDRESS, |
| server.getListenerAddress()); |
| this.timelineServiceV2Enabled = YarnConfiguration. |
| timelineServiceV2Enabled(conf); |
| |
| if (conf.getBoolean( |
| YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED, |
| YarnConfiguration.DEFAULT_RM_SUBMISSION_PREPROCESSOR_ENABLED)) { |
| this.contextPreProcessor = new SubmissionContextPreProcessor(); |
| this.contextPreProcessor.start(conf); |
| } |
| |
| super.serviceStart(); |
| } |
| |
| @Override |
| protected void serviceStop() throws Exception { |
| if (this.server != null) { |
| this.server.stop(); |
| } |
| if (this.contextPreProcessor != null) { |
| this.contextPreProcessor.stop(); |
| } |
| super.serviceStop(); |
| } |
| |
| InetSocketAddress getBindAddress(Configuration conf) { |
| return conf.getSocketAddr( |
| YarnConfiguration.RM_BIND_HOST, |
| YarnConfiguration.RM_ADDRESS, |
| YarnConfiguration.DEFAULT_RM_ADDRESS, |
| YarnConfiguration.DEFAULT_RM_PORT); |
| } |
| |
| @VisibleForTesting |
| SubmissionContextPreProcessor getContextPreProcessor() { |
| return this.contextPreProcessor; |
| } |
| |
| @Private |
| public InetSocketAddress getBindAddress() { |
| return clientBindAddress; |
| } |
| |
| /** |
| * check if the calling user has the access to application information. |
| * @param callerUGI the user information who submit the request |
| * @param owner the user of the application |
| * @param operationPerformed the type of operation defined in |
| * {@link ApplicationAccessType} |
| * @param application submitted application |
| * @return access is permitted or not |
| */ |
| private boolean checkAccess(UserGroupInformation callerUGI, String owner, |
| ApplicationAccessType operationPerformed, RMApp application) { |
| return applicationsACLsManager |
| .checkAccess(callerUGI, operationPerformed, owner, |
| application.getApplicationId()) || queueACLsManager |
| .checkAccess(callerUGI, QueueACL.ADMINISTER_QUEUE, application, |
| Server.getRemoteAddress(), null); |
| } |
| |
| ApplicationId getNewApplicationId() { |
| ApplicationId applicationId = org.apache.hadoop.yarn.server.utils.BuilderUtils |
| .newApplicationId(recordFactory, ResourceManager.getClusterTimeStamp(), |
| applicationCounter.incrementAndGet()); |
| LOG.info("Allocated new applicationId: " + applicationId.getId()); |
| return applicationId; |
| } |
| |
| @Override |
| public GetNewApplicationResponse getNewApplication( |
| GetNewApplicationRequest request) throws YarnException { |
| GetNewApplicationResponse response = recordFactory |
| .newRecordInstance(GetNewApplicationResponse.class); |
| response.setApplicationId(getNewApplicationId()); |
| // Pick up min/max resource from scheduler... |
| response.setMaximumResourceCapability(scheduler |
| .getMaximumResourceCapability()); |
| |
| return response; |
| } |
| |
| /** |
| * It gives response which includes application report if the application |
| * present otherwise throws ApplicationNotFoundException. |
| */ |
| @Override |
| public GetApplicationReportResponse getApplicationReport( |
| GetApplicationReportRequest request) throws YarnException { |
| ApplicationId applicationId = request.getApplicationId(); |
| if (applicationId == null) { |
| throw new ApplicationNotFoundException("Invalid application id: null"); |
| } |
| |
| UserGroupInformation callerUGI = getCallerUgi(applicationId, |
| AuditConstants.GET_APP_REPORT); |
| |
| RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI, |
| AuditConstants.GET_APP_REPORT, ApplicationAccessType.VIEW_APP, false); |
| |
| boolean allowAccess = checkAccess(callerUGI, application.getUser(), |
| ApplicationAccessType.VIEW_APP, application); |
| ApplicationReport report = |
| application.createAndGetApplicationReport(callerUGI.getUserName(), |
| allowAccess); |
| |
| GetApplicationReportResponse response = recordFactory |
| .newRecordInstance(GetApplicationReportResponse.class); |
| response.setApplicationReport(report); |
| return response; |
| } |
| |
| @Override |
| public GetApplicationAttemptReportResponse getApplicationAttemptReport( |
| GetApplicationAttemptReportRequest request) throws YarnException, |
| IOException { |
| ApplicationId applicationId |
| = request.getApplicationAttemptId().getApplicationId(); |
| ApplicationAttemptId appAttemptId = request.getApplicationAttemptId(); |
| UserGroupInformation callerUGI = getCallerUgi(applicationId, |
| AuditConstants.GET_APP_ATTEMPT_REPORT); |
| RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI, |
| AuditConstants.GET_APP_ATTEMPT_REPORT, ApplicationAccessType.VIEW_APP, |
| false); |
| |
| boolean allowAccess = checkAccess(callerUGI, application.getUser(), |
| ApplicationAccessType.VIEW_APP, application); |
| GetApplicationAttemptReportResponse response = null; |
| if (allowAccess) { |
| RMAppAttempt appAttempt = application.getAppAttempts().get(appAttemptId); |
| if (appAttempt == null) { |
| throw new ApplicationAttemptNotFoundException( |
| "ApplicationAttempt with id '" + appAttemptId + |
| "' doesn't exist in RM."); |
| } |
| ApplicationAttemptReport attemptReport = appAttempt |
| .createApplicationAttemptReport(); |
| response = GetApplicationAttemptReportResponse.newInstance(attemptReport); |
| }else{ |
| throw new YarnException("User " + callerUGI.getShortUserName() |
| + " does not have privilege to see this attempt " + appAttemptId); |
| } |
| return response; |
| } |
| |
| @Override |
| public GetApplicationAttemptsResponse getApplicationAttempts( |
| GetApplicationAttemptsRequest request) throws YarnException, IOException { |
| ApplicationId appId = request.getApplicationId(); |
| UserGroupInformation callerUGI = getCallerUgi(appId, |
| AuditConstants.GET_APP_ATTEMPTS); |
| RMApp application = verifyUserAccessForRMApp(appId, callerUGI, |
| AuditConstants.GET_APP_ATTEMPTS, ApplicationAccessType.VIEW_APP, |
| false); |
| boolean allowAccess = checkAccess(callerUGI, application.getUser(), |
| ApplicationAccessType.VIEW_APP, application); |
| GetApplicationAttemptsResponse response = null; |
| if (allowAccess) { |
| Map<ApplicationAttemptId, RMAppAttempt> attempts = application |
| .getAppAttempts(); |
| List<ApplicationAttemptReport> listAttempts = |
| new ArrayList<ApplicationAttemptReport>(); |
| Iterator<Map.Entry<ApplicationAttemptId, RMAppAttempt>> iter = attempts |
| .entrySet().iterator(); |
| while (iter.hasNext()) { |
| listAttempts.add(iter.next().getValue() |
| .createApplicationAttemptReport()); |
| } |
| response = GetApplicationAttemptsResponse.newInstance(listAttempts); |
| } else { |
| throw new YarnException("User " + callerUGI.getShortUserName() |
| + " does not have privilege to see this application " + appId); |
| } |
| return response; |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * we're going to fix the issue of showing non-running containers of the |
| * running application in YARN-1794 |
| */ |
| @Override |
| public GetContainerReportResponse getContainerReport( |
| GetContainerReportRequest request) throws YarnException, IOException { |
| ContainerId containerId = request.getContainerId(); |
| if (containerId == null) { |
| throw new ContainerNotFoundException("Invalid container id: null"); |
| } |
| ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId(); |
| ApplicationId appId = appAttemptId.getApplicationId(); |
| UserGroupInformation callerUGI = getCallerUgi(appId, |
| AuditConstants.GET_CONTAINER_REPORT); |
| RMApp application = verifyUserAccessForRMApp(appId, callerUGI, |
| AuditConstants.GET_CONTAINER_REPORT, ApplicationAccessType.VIEW_APP, |
| false); |
| boolean allowAccess = checkAccess(callerUGI, application.getUser(), |
| ApplicationAccessType.VIEW_APP, application); |
| GetContainerReportResponse response = null; |
| if (allowAccess) { |
| RMAppAttempt appAttempt = application.getAppAttempts().get(appAttemptId); |
| if (appAttempt == null) { |
| throw new ApplicationAttemptNotFoundException( |
| "ApplicationAttempt with id '" + appAttemptId + |
| "' doesn't exist in RM."); |
| } |
| RMContainer rmContainer = this.rmContext.getScheduler().getRMContainer( |
| containerId); |
| if (rmContainer == null) { |
| throw new ContainerNotFoundException("Container with id '" + containerId |
| + "' doesn't exist in RM."); |
| } |
| response = GetContainerReportResponse.newInstance(rmContainer |
| .createContainerReport()); |
| } else { |
| throw new YarnException("User " + callerUGI.getShortUserName() |
| + " does not have privilege to see this application " + appId); |
| } |
| return response; |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * we're going to fix the issue of showing non-running containers of the |
| * running application in YARN-1794" |
| */ |
| @Override |
| public GetContainersResponse getContainers(GetContainersRequest request) |
| throws YarnException, IOException { |
| ApplicationAttemptId appAttemptId = request.getApplicationAttemptId(); |
| ApplicationId appId = appAttemptId.getApplicationId(); |
| UserGroupInformation callerUGI = getCallerUgi(appId, |
| AuditConstants.GET_CONTAINERS); |
| RMApp application = verifyUserAccessForRMApp(appId, callerUGI, |
| AuditConstants.GET_CONTAINERS, ApplicationAccessType.VIEW_APP, false); |
| boolean allowAccess = checkAccess(callerUGI, application.getUser(), |
| ApplicationAccessType.VIEW_APP, application); |
| GetContainersResponse response = null; |
| if (allowAccess) { |
| RMAppAttempt appAttempt = application.getAppAttempts().get(appAttemptId); |
| if (appAttempt == null) { |
| throw new ApplicationAttemptNotFoundException( |
| "ApplicationAttempt with id '" + appAttemptId + |
| "' doesn't exist in RM."); |
| } |
| Collection<RMContainer> rmContainers = Collections.emptyList(); |
| SchedulerAppReport schedulerAppReport = |
| this.rmContext.getScheduler().getSchedulerAppInfo(appAttemptId); |
| if (schedulerAppReport != null) { |
| rmContainers = schedulerAppReport.getLiveContainers(); |
| } |
| List<ContainerReport> listContainers = new ArrayList<ContainerReport>(); |
| for (RMContainer rmContainer : rmContainers) { |
| listContainers.add(rmContainer.createContainerReport()); |
| } |
| response = GetContainersResponse.newInstance(listContainers); |
| } else { |
| throw new YarnException("User " + callerUGI.getShortUserName() |
| + " does not have privilege to see this application " + appId); |
| } |
| return response; |
| } |
| |
| @Override |
| public SubmitApplicationResponse submitApplication( |
| SubmitApplicationRequest request) throws YarnException, IOException { |
| ApplicationSubmissionContext submissionContext = request |
| .getApplicationSubmissionContext(); |
| ApplicationId applicationId = submissionContext.getApplicationId(); |
| CallerContext callerContext = CallerContext.getCurrent(); |
| |
| // ApplicationSubmissionContext needs to be validated for safety - only |
| // those fields that are independent of the RM's configuration will be |
| // checked here, those that are dependent on RM configuration are validated |
| // in RMAppManager. |
| |
| UserGroupInformation userUgi = null; |
| String user = null; |
| try { |
| // Safety |
| userUgi = UserGroupInformation.getCurrentUser(); |
| user = userUgi.getShortUserName(); |
| } catch (IOException ie) { |
| LOG.warn("Unable to get the current user.", ie); |
| RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, |
| ie.getMessage(), "ClientRMService", |
| "Exception in submitting application", applicationId, callerContext, |
| submissionContext.getQueue()); |
| throw RPCUtil.getRemoteException(ie); |
| } |
| |
| checkTags(submissionContext.getApplicationTags()); |
| |
| if (timelineServiceV2Enabled) { |
| // Sanity check for flow run |
| String value = null; |
| try { |
| for (String tag : submissionContext.getApplicationTags()) { |
| if (tag.startsWith(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX + ":") || |
| tag.startsWith( |
| TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.toLowerCase() + ":")) { |
| value = tag.substring(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX.length() |
| + 1); |
| // In order to check the number format |
| Long.valueOf(value); |
| } |
| } |
| } catch (NumberFormatException e) { |
| LOG.warn("Invalid to flow run: " + value + |
| ". Flow run should be a long integer", e); |
| RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, |
| e.getMessage(), "ClientRMService", |
| "Exception in submitting application", applicationId, |
| submissionContext.getQueue()); |
| throw RPCUtil.getRemoteException(e); |
| } |
| } |
| |
| // Check whether app has already been put into rmContext, |
| // If it is, simply return the response |
| if (rmContext.getRMApps().get(applicationId) != null) { |
| LOG.info("This is an earlier submitted application: " + applicationId); |
| return SubmitApplicationResponse.newInstance(); |
| } |
| |
| ByteBuffer tokenConf = |
| submissionContext.getAMContainerSpec().getTokensConf(); |
| if (tokenConf != null) { |
| int maxSize = getConfig() |
| .getInt(YarnConfiguration.RM_DELEGATION_TOKEN_MAX_CONF_SIZE, |
| YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_MAX_CONF_SIZE_BYTES); |
| LOG.info("Using app provided configurations for delegation token renewal," |
| + " total size = " + tokenConf.capacity()); |
| if (tokenConf.capacity() > maxSize) { |
| throw new YarnException( |
| "Exceed " + YarnConfiguration.RM_DELEGATION_TOKEN_MAX_CONF_SIZE |
| + " = " + maxSize + " bytes, current conf size = " |
| + tokenConf.capacity() + " bytes."); |
| } |
| } |
| if (submissionContext.getQueue() == null) { |
| submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME); |
| } |
| if (submissionContext.getApplicationName() == null) { |
| submissionContext.setApplicationName( |
| YarnConfiguration.DEFAULT_APPLICATION_NAME); |
| } |
| if (submissionContext.getApplicationType() == null) { |
| submissionContext |
| .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE); |
| } else { |
| if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) { |
| submissionContext.setApplicationType(submissionContext |
| .getApplicationType().substring(0, |
| YarnConfiguration.APPLICATION_TYPE_LENGTH)); |
| } |
| } |
| |
| ReservationId reservationId = request.getApplicationSubmissionContext() |
| .getReservationID(); |
| |
| checkReservationACLs(submissionContext.getQueue(), AuditConstants |
| .SUBMIT_RESERVATION_REQUEST, reservationId); |
| |
| if (this.contextPreProcessor != null) { |
| this.contextPreProcessor.preProcess(Server.getRemoteIp().getHostName(), |
| applicationId, submissionContext); |
| } |
| |
| try { |
| // call RMAppManager to submit application directly |
| rmAppManager.submitApplication(submissionContext, |
| System.currentTimeMillis(), userUgi); |
| |
| LOG.info("Application with id " + applicationId.getId() + |
| " submitted by user " + user); |
| RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST, |
| "ClientRMService", applicationId, callerContext, |
| submissionContext.getQueue(), |
| submissionContext.getNodeLabelExpression()); |
| } catch (YarnException e) { |
| LOG.info("Exception in submitting " + applicationId, e); |
| RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, |
| e.getMessage(), "ClientRMService", |
| "Exception in submitting application", applicationId, callerContext, |
| submissionContext.getQueue(), |
| submissionContext.getNodeLabelExpression()); |
| throw e; |
| } |
| |
| return recordFactory |
| .newRecordInstance(SubmitApplicationResponse.class); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public FailApplicationAttemptResponse failApplicationAttempt( |
| FailApplicationAttemptRequest request) throws YarnException { |
| |
| ApplicationAttemptId attemptId = request.getApplicationAttemptId(); |
| ApplicationId applicationId = attemptId.getApplicationId(); |
| |
| UserGroupInformation callerUGI = getCallerUgi(applicationId, |
| AuditConstants.FAIL_ATTEMPT_REQUEST); |
| RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI, |
| AuditConstants.FAIL_ATTEMPT_REQUEST, ApplicationAccessType.MODIFY_APP, |
| true); |
| |
| RMAppAttempt appAttempt = application.getAppAttempts().get(attemptId); |
| if (appAttempt == null) { |
| throw new ApplicationAttemptNotFoundException( |
| "ApplicationAttempt with id '" + attemptId + "' doesn't exist in RM."); |
| } |
| |
| FailApplicationAttemptResponse response = |
| recordFactory.newRecordInstance(FailApplicationAttemptResponse.class); |
| |
| if (application.isAppInCompletedStates()) { |
| RMAuditLogger.logSuccess(callerUGI.getShortUserName(), |
| AuditConstants.FAIL_ATTEMPT_REQUEST, "ClientRMService", |
| applicationId); |
| return response; |
| } |
| |
| this.rmContext.getDispatcher().getEventHandler().handle( |
| new RMAppAttemptEvent(attemptId, RMAppAttemptEventType.FAIL, |
| "Attempt failed by user.")); |
| |
| RMAuditLogger.logSuccess(callerUGI.getShortUserName(), |
| AuditConstants.FAIL_ATTEMPT_REQUEST, "ClientRMService", applicationId, |
| attemptId); |
| |
| return response; |
| } |
| |
| private void checkTags(Set<String> tags) throws YarnException { |
| int appMaxTags = getConfig().getInt( |
| YarnConfiguration.RM_APPLICATION_MAX_TAGS, |
| YarnConfiguration.DEFAULT_RM_APPLICATION_MAX_TAGS); |
| int appMaxTagLength = getConfig().getInt( |
| YarnConfiguration.RM_APPLICATION_MAX_TAG_LENGTH, |
| YarnConfiguration.DEFAULT_RM_APPLICATION_MAX_TAG_LENGTH); |
| if (tags.size() > appMaxTags) { |
| throw RPCUtil.getRemoteException(new IllegalArgumentException( |
| "Too many applicationTags, a maximum of only " + appMaxTags |
| + " are allowed!")); |
| } |
| for (String tag : tags) { |
| if (tag.length() > appMaxTagLength) { |
| throw RPCUtil.getRemoteException( |
| new IllegalArgumentException("Tag " + tag + " is too long, " |
| + "maximum allowed length of a tag is " + appMaxTagLength)); |
| } |
| if (!org.apache.commons.lang3.StringUtils.isAsciiPrintable(tag)) { |
| throw RPCUtil.getRemoteException(new IllegalArgumentException( |
| "A tag can only have ASCII " + "characters! Invalid tag - " + tag)); |
| } |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public KillApplicationResponse forceKillApplication( |
| KillApplicationRequest request) throws YarnException { |
| |
| ApplicationId applicationId = request.getApplicationId(); |
| CallerContext callerContext = CallerContext.getCurrent(); |
| |
| UserGroupInformation callerUGI; |
| try { |
| callerUGI = UserGroupInformation.getCurrentUser(); |
| } catch (IOException ie) { |
| LOG.info("Error getting UGI ", ie); |
| RMAuditLogger.logFailure("UNKNOWN", AuditConstants.KILL_APP_REQUEST, |
| "UNKNOWN", "ClientRMService", "Error getting UGI", |
| applicationId, callerContext); |
| throw RPCUtil.getRemoteException(ie); |
| } |
| RMApp application = this.rmContext.getRMApps().get(applicationId); |
| if (application == null) { |
| RMAuditLogger.logFailure(callerUGI.getUserName(), |
| AuditConstants.KILL_APP_REQUEST, "UNKNOWN", "ClientRMService", |
| "Trying to kill an absent application", applicationId, callerContext); |
| throw new ApplicationNotFoundException("Trying to kill an absent" |
| + " application " + applicationId); |
| } |
| |
| if (!checkAccess(callerUGI, application.getUser(), |
| ApplicationAccessType.MODIFY_APP, application)) { |
| RMAuditLogger.logFailure(callerUGI.getShortUserName(), |
| AuditConstants.KILL_APP_REQUEST, |
| "User doesn't have permissions to " |
| + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService", |
| AuditConstants.UNAUTHORIZED_USER, applicationId, callerContext); |
| throw RPCUtil.getRemoteException(new AccessControlException("User " |
| + callerUGI.getShortUserName() + " cannot perform operation " |
| + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); |
| } |
| |
| if (application.isAppFinalStateStored()) { |
| return KillApplicationResponse.newInstance(true); |
| } |
| |
| StringBuilder message = new StringBuilder(); |
| message.append("Application ").append(applicationId) |
| .append(" was killed by user ").append(callerUGI.getShortUserName()); |
| |
| InetAddress remoteAddress = Server.getRemoteIp(); |
| if (null != remoteAddress) { |
| message.append(" at ").append(remoteAddress.getHostAddress()); |
| } |
| |
| String diagnostics = org.apache.commons.lang3.StringUtils |
| .trimToNull(request.getDiagnostics()); |
| if (diagnostics != null) { |
| message.append(" with diagnostic message: ") |
| .append(diagnostics); |
| } |
| |
| this.rmContext.getDispatcher().getEventHandler() |
| .handle(new RMAppKillByClientEvent(applicationId, message.toString(), |
| callerUGI, remoteAddress)); |
| |
| // For Unmanaged AMs, return true so they don't retry |
| return KillApplicationResponse.newInstance( |
| application.getApplicationSubmissionContext().getUnmanagedAM()); |
| } |
| |
| @Override |
| public GetClusterMetricsResponse getClusterMetrics( |
| GetClusterMetricsRequest request) throws YarnException { |
| GetClusterMetricsResponse response = recordFactory |
| .newRecordInstance(GetClusterMetricsResponse.class); |
| YarnClusterMetrics ymetrics = recordFactory |
| .newRecordInstance(YarnClusterMetrics.class); |
| ymetrics.setNumNodeManagers(this.rmContext.getRMNodes().size()); |
| ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics(); |
| ymetrics.setNumDecommissioningNodeManagers(clusterMetrics.getNumDecommissioningNMs()); |
| ymetrics.setNumDecommissionedNodeManagers(clusterMetrics |
| .getNumDecommisionedNMs()); |
| ymetrics.setNumActiveNodeManagers(clusterMetrics.getNumActiveNMs()); |
| ymetrics.setNumLostNodeManagers(clusterMetrics.getNumLostNMs()); |
| ymetrics.setNumUnhealthyNodeManagers(clusterMetrics.getUnhealthyNMs()); |
| ymetrics.setNumRebootedNodeManagers(clusterMetrics.getNumRebootedNMs()); |
| ymetrics.setNumShutdownNodeManagers(clusterMetrics.getNumShutdownNMs()); |
| response.setClusterMetrics(ymetrics); |
| return response; |
| } |
| |
| /** |
| * Get applications matching the {@link GetApplicationsRequest}. If |
| * caseSensitive is set to false, applicationTypes in |
| * GetApplicationRequest are expected to be in all-lowercase |
| */ |
| @Override |
| public GetApplicationsResponse getApplications(GetApplicationsRequest request) |
| throws YarnException { |
| UserGroupInformation callerUGI = getCallerUgi(null, |
| AuditConstants.GET_APPLICATIONS_REQUEST); |
| |
| Set<String> applicationTypes = getLowerCasedAppTypes(request); |
| EnumSet<YarnApplicationState> applicationStates = |
| request.getApplicationStates(); |
| Set<String> users = request.getUsers(); |
| Set<String> queues = request.getQueues(); |
| Set<String> tags = request.getApplicationTags(); |
| long limit = request.getLimit(); |
| Range<Long> start = request.getStartRange(); |
| Range<Long> finish = request.getFinishRange(); |
| ApplicationsRequestScope scope = request.getScope(); |
| String name = request.getName(); |
| |
| final Map<ApplicationId, RMApp> apps = rmContext.getRMApps(); |
| final Set<ApplicationId> runningAppsFilteredByQueues = |
| getRunningAppsFilteredByQueues(apps, queues); |
| |
| Iterator<RMApp> appsIter = apps.values().iterator(); |
| |
| List<ApplicationReport> reports = new ArrayList<ApplicationReport>(); |
| while (appsIter.hasNext() && reports.size() < limit) { |
| RMApp application = appsIter.next(); |
| |
| // Check if current application falls under the specified scope |
| if (scope == ApplicationsRequestScope.OWN && |
| !callerUGI.getUserName().equals(application.getUser())) { |
| continue; |
| } |
| |
| if (queues != null && !queues.isEmpty()) { |
| if (!runningAppsFilteredByQueues.contains(application.getApplicationId()) && |
| !queues.contains(application.getQueue())) { |
| continue; |
| } |
| } |
| |
| if (applicationTypes != null && !applicationTypes.isEmpty()) { |
| String appTypeToMatch = |
| StringUtils.toLowerCase(application.getApplicationType()); |
| if (!applicationTypes.contains(appTypeToMatch)) { |
| continue; |
| } |
| } |
| |
| if (applicationStates != null && !applicationStates.isEmpty()) { |
| if (!applicationStates.contains(application |
| .createApplicationState())) { |
| continue; |
| } |
| } |
| |
| if (users != null && !users.isEmpty() && |
| !users.contains(application.getUser())) { |
| continue; |
| } |
| |
| if (start != null && !start.contains(application.getStartTime())) { |
| continue; |
| } |
| |
| if (finish != null && !finish.contains(application.getFinishTime())) { |
| continue; |
| } |
| |
| if (tags != null && !tags.isEmpty()) { |
| Set<String> appTags = application.getApplicationTags(); |
| if (appTags == null || appTags.isEmpty()) { |
| continue; |
| } |
| boolean match = false; |
| for (String tag : tags) { |
| if (appTags.contains(tag)) { |
| match = true; |
| break; |
| } |
| } |
| if (!match) { |
| continue; |
| } |
| } |
| |
| // checkAccess can grab the scheduler lock so call it last |
| boolean allowAccess = checkAccess(callerUGI, application.getUser(), |
| ApplicationAccessType.VIEW_APP, application); |
| if (scope == ApplicationsRequestScope.VIEWABLE && !allowAccess) { |
| continue; |
| } |
| |
| // Given RM is configured to display apps per user, skip apps to which |
| // this caller doesn't have access to view. |
| if (filterAppsByUser && !allowAccess) { |
| continue; |
| } |
| |
| if (name != null && !name.equals(application.getName())) { |
| continue; |
| } |
| |
| reports.add(application.createAndGetApplicationReport( |
| callerUGI.getUserName(), allowAccess)); |
| } |
| |
| RMAuditLogger.logSuccess(callerUGI.getUserName(), |
| AuditConstants.GET_APPLICATIONS_REQUEST, "ClientRMService"); |
| GetApplicationsResponse response = |
| recordFactory.newRecordInstance(GetApplicationsResponse.class); |
| response.setApplicationList(reports); |
| return response; |
| } |
| |
| private Set<ApplicationId> getRunningAppsFilteredByQueues( |
| Map<ApplicationId, RMApp> apps, Set<String> queues) { |
| final Set<ApplicationId> runningApps = new HashSet<>(); |
| for (String queue : queues) { |
| List<ApplicationAttemptId> appsInQueue = scheduler.getAppsInQueue(queue); |
| if (appsInQueue != null) { |
| for (ApplicationAttemptId appAttemptId : appsInQueue) { |
| RMApp rmApp = apps.get(appAttemptId.getApplicationId()); |
| if (rmApp != null) { |
| runningApps.add(rmApp.getApplicationId()); |
| } |
| } |
| } |
| } |
| return runningApps; |
| } |
| |
| private Set<String> getLowerCasedAppTypes(GetApplicationsRequest request) { |
| Set<String> applicationTypes = new HashSet<>(); |
| if (request.getApplicationTypes() != null && !request.getApplicationTypes() |
| .isEmpty()) { |
| for (String type : request.getApplicationTypes()) { |
| applicationTypes.add(StringUtils.toLowerCase(type)); |
| } |
| } |
| return applicationTypes; |
| } |
| |
| @Override |
| public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) |
| throws YarnException { |
| GetClusterNodesResponse response = |
| recordFactory.newRecordInstance(GetClusterNodesResponse.class); |
| EnumSet<NodeState> nodeStates = request.getNodeStates(); |
| if (nodeStates == null || nodeStates.isEmpty()) { |
| nodeStates = EnumSet.allOf(NodeState.class); |
| } |
| Collection<RMNode> nodes = RMServerUtils.queryRMNodes(rmContext, |
| nodeStates); |
| |
| List<NodeReport> nodeReports = new ArrayList<NodeReport>(nodes.size()); |
| for (RMNode nodeInfo : nodes) { |
| nodeReports.add(createNodeReports(nodeInfo)); |
| } |
| response.setNodeReports(nodeReports); |
| return response; |
| } |
| |
| @Override |
| public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) |
| throws YarnException { |
| UserGroupInformation callerUGI = getCallerUgi(null, |
| AuditConstants.GET_QUEUE_INFO_REQUEST); |
| |
| GetQueueInfoResponse response = |
| recordFactory.newRecordInstance(GetQueueInfoResponse.class); |
| RMAuditLogger.ArgsBuilder arguments = new RMAuditLogger.ArgsBuilder() |
| .append(Keys.QUEUENAME, request.getQueueName()) |
| .append(Keys.INCLUDEAPPS, |
| String.valueOf(request.getIncludeApplications())) |
| .append(Keys.INCLUDECHILDQUEUES, |
| String.valueOf(request.getIncludeChildQueues())) |
| .append(Keys.RECURSIVE, String.valueOf(request.getRecursive())); |
| try { |
| QueueInfo queueInfo = |
| scheduler.getQueueInfo(request.getQueueName(), |
| request.getIncludeChildQueues(), |
| request.getRecursive()); |
| List<ApplicationReport> appReports = EMPTY_APPS_REPORT; |
| if (request.getIncludeApplications()) { |
| List<ApplicationAttemptId> apps = |
| scheduler.getAppsInQueue(request.getQueueName()); |
| appReports = new ArrayList<ApplicationReport>(apps.size()); |
| for (ApplicationAttemptId app : apps) { |
| RMApp rmApp = rmContext.getRMApps().get(app.getApplicationId()); |
| if (rmApp != null) { |
| // Check if user is allowed access to this app |
| if (!checkAccess(callerUGI, rmApp.getUser(), |
| ApplicationAccessType.VIEW_APP, rmApp)) { |
| continue; |
| } |
| appReports.add( |
| rmApp.createAndGetApplicationReport( |
| callerUGI.getUserName(), true)); |
| } |
| } |
| } |
| queueInfo.setApplications(appReports); |
| response.setQueueInfo(queueInfo); |
| RMAuditLogger.logSuccess(callerUGI.getUserName(), |
| AuditConstants.GET_QUEUE_INFO_REQUEST, |
| "ClientRMService", arguments); |
| } catch (IOException ioe) { |
| LOG.info("Failed to getQueueInfo for " + request.getQueueName(), ioe); |
| RMAuditLogger.logFailure(callerUGI.getUserName(), |
| AuditConstants.GET_QUEUE_INFO_REQUEST, "UNKNOWN", "ClientRMService", |
| ioe.getMessage(), arguments); |
| } |
| |
| return response; |
| } |
| |
| private NodeReport createNodeReports(RMNode rmNode) { |
| SchedulerNodeReport schedulerNodeReport = |
| scheduler.getNodeReport(rmNode.getNodeID()); |
| Resource used = Resources.createResource(0); |
| int numContainers = 0; |
| if (schedulerNodeReport != null) { |
| used = schedulerNodeReport.getUsedResource(); |
| numContainers = schedulerNodeReport.getNumContainers(); |
| } |
| |
| Set<NodeAttribute> attrs = rmNode.getAllNodeAttributes(); |
| NodeReport report = |
| BuilderUtils.newNodeReport(rmNode.getNodeID(), rmNode.getState(), |
| rmNode.getHttpAddress(), rmNode.getRackName(), used, |
| rmNode.getTotalCapability(), numContainers, |
| rmNode.getHealthReport(), rmNode.getLastHealthReportTime(), |
| rmNode.getNodeLabels(), rmNode.getAggregatedContainersUtilization(), |
| rmNode.getNodeUtilization(), rmNode.getDecommissioningTimeout(), |
| null, attrs); |
| |
| return report; |
| } |
| |
| @Override |
| public GetQueueUserAclsInfoResponse getQueueUserAcls( |
| GetQueueUserAclsInfoRequest request) throws YarnException { |
| GetQueueUserAclsInfoResponse response = |
| recordFactory.newRecordInstance(GetQueueUserAclsInfoResponse.class); |
| response.setUserAclsInfoList(scheduler.getQueueUserAclInfo()); |
| return response; |
| } |
| |
| |
| @Override |
| public GetDelegationTokenResponse getDelegationToken( |
| GetDelegationTokenRequest request) throws YarnException { |
| try { |
| |
| // Verify that the connection is kerberos authenticated |
| if (!isAllowedDelegationTokenOp()) { |
| throw new IOException( |
| "Delegation Token can be issued only with kerberos authentication"); |
| } |
| |
| GetDelegationTokenResponse response = |
| recordFactory.newRecordInstance(GetDelegationTokenResponse.class); |
| UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); |
| Text owner = new Text(ugi.getUserName()); |
| Text realUser = null; |
| if (ugi.getRealUser() != null) { |
| realUser = new Text(ugi.getRealUser().getUserName()); |
| } |
| RMDelegationTokenIdentifier tokenIdentifier = |
| new RMDelegationTokenIdentifier(owner, new Text(request.getRenewer()), |
| realUser); |
| Token<RMDelegationTokenIdentifier> realRMDToken = |
| new Token<RMDelegationTokenIdentifier>(tokenIdentifier, |
| this.rmDTSecretManager); |
| response.setRMDelegationToken( |
| BuilderUtils.newDelegationToken( |
| realRMDToken.getIdentifier(), |
| realRMDToken.getKind().toString(), |
| realRMDToken.getPassword(), |
| realRMDToken.getService().toString() |
| )); |
| return response; |
| } catch(IOException io) { |
| throw RPCUtil.getRemoteException(io); |
| } |
| } |
| |
| @Override |
| public RenewDelegationTokenResponse renewDelegationToken( |
| RenewDelegationTokenRequest request) throws YarnException { |
| try { |
| if (!isAllowedDelegationTokenOp()) { |
| throw new IOException( |
| "Delegation Token can be renewed only with kerberos authentication"); |
| } |
| |
| org.apache.hadoop.yarn.api.records.Token protoToken = request.getDelegationToken(); |
| Token<RMDelegationTokenIdentifier> token = new Token<RMDelegationTokenIdentifier>( |
| protoToken.getIdentifier().array(), protoToken.getPassword().array(), |
| new Text(protoToken.getKind()), new Text(protoToken.getService())); |
| |
| String user = getRenewerForToken(token); |
| long nextExpTime = rmDTSecretManager.renewToken(token, user); |
| RenewDelegationTokenResponse renewResponse = Records |
| .newRecord(RenewDelegationTokenResponse.class); |
| renewResponse.setNextExpirationTime(nextExpTime); |
| return renewResponse; |
| } catch (IOException e) { |
| throw RPCUtil.getRemoteException(e); |
| } |
| } |
| |
| @Override |
| public CancelDelegationTokenResponse cancelDelegationToken( |
| CancelDelegationTokenRequest request) throws YarnException { |
| try { |
| if (!isAllowedDelegationTokenOp()) { |
| throw new IOException( |
| "Delegation Token can be cancelled only with kerberos authentication"); |
| } |
| org.apache.hadoop.yarn.api.records.Token protoToken = request.getDelegationToken(); |
| Token<RMDelegationTokenIdentifier> token = new Token<RMDelegationTokenIdentifier>( |
| protoToken.getIdentifier().array(), protoToken.getPassword().array(), |
| new Text(protoToken.getKind()), new Text(protoToken.getService())); |
| |
| String user = UserGroupInformation.getCurrentUser().getUserName(); |
| rmDTSecretManager.cancelToken(token, user); |
| return Records.newRecord(CancelDelegationTokenResponse.class); |
| } catch (IOException e) { |
| throw RPCUtil.getRemoteException(e); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( |
| MoveApplicationAcrossQueuesRequest request) throws YarnException { |
| ApplicationId applicationId = request.getApplicationId(); |
| |
| UserGroupInformation callerUGI = getCallerUgi(applicationId, |
| AuditConstants.MOVE_APP_REQUEST); |
| RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI, |
| AuditConstants.MOVE_APP_REQUEST, ApplicationAccessType.MODIFY_APP, |
| true); |
| |
| String targetQueue = request.getTargetQueue(); |
| if (!accessToTargetQueueAllowed(callerUGI, application, targetQueue)) { |
| RMAuditLogger.logFailure(callerUGI.getShortUserName(), |
| AuditConstants.MOVE_APP_REQUEST, "Target queue doesn't exist or user" |
| + " doesn't have permissions to submit to target queue: " |
| + targetQueue, "ClientRMService", |
| AuditConstants.UNAUTHORIZED_USER, applicationId); |
| throw RPCUtil.getRemoteException(new AccessControlException("User " |
| + callerUGI.getShortUserName() + " cannot submit applications to" |
| + " target queue or the target queue doesn't exist: " |
| + targetQueue + " while moving " + applicationId)); |
| } |
| |
| // Moves only allowed when app is in a state that means it is tracked by |
| // the scheduler. Introducing SUBMITTED state also to this list as there |
| // could be a corner scenario that app may not be in Scheduler in SUBMITTED |
| // state. |
| if (!ACTIVE_APP_STATES.contains(application.getState())) { |
| String msg = "App in " + application.getState() + |
| " state cannot be moved."; |
| RMAuditLogger.logFailure(callerUGI.getShortUserName(), |
| AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService", msg); |
| throw new YarnException(msg); |
| } |
| |
| try { |
| this.rmAppManager.moveApplicationAcrossQueue( |
| application.getApplicationId(), |
| request.getTargetQueue()); |
| } catch (YarnException ex) { |
| RMAuditLogger.logFailure(callerUGI.getShortUserName(), |
| AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService", |
| ex.getMessage()); |
| throw ex; |
| } |
| |
| RMAuditLogger.logSuccess(callerUGI.getShortUserName(), |
| AuditConstants.MOVE_APP_REQUEST, "ClientRMService" , applicationId); |
| return recordFactory |
| .newRecordInstance(MoveApplicationAcrossQueuesResponse.class); |
| } |
| |
| /** |
| * Check if the submission of an application to the target queue is allowed. |
| * @param callerUGI the caller UGI |
| * @param application the application to move |
| * @param targetQueue the queue to move the application to |
| * @return true if submission is allowed, false otherwise |
| */ |
| private boolean accessToTargetQueueAllowed(UserGroupInformation callerUGI, |
| RMApp application, String targetQueue) { |
| return |
| queueACLsManager.checkAccess(callerUGI, |
| QueueACL.SUBMIT_APPLICATIONS, application, |
| Server.getRemoteAddress(), null, targetQueue) || |
| queueACLsManager.checkAccess(callerUGI, |
| QueueACL.ADMINISTER_QUEUE, application, |
| Server.getRemoteAddress(), null, targetQueue); |
| } |
| |
| private String getRenewerForToken(Token<RMDelegationTokenIdentifier> token) |
| throws IOException { |
| UserGroupInformation user = UserGroupInformation.getCurrentUser(); |
| UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); |
| // we can always renew our own tokens |
| return loginUser.getUserName().equals(user.getUserName()) |
| ? token.decodeIdentifier().getRenewer().toString() |
| : user.getShortUserName(); |
| } |
| |
| void refreshServiceAcls(Configuration configuration, |
| PolicyProvider policyProvider) { |
| this.server.refreshServiceAclWithLoadedConfiguration(configuration, |
| policyProvider); |
| } |
| |
| private boolean isAllowedDelegationTokenOp() throws IOException { |
| if (UserGroupInformation.isSecurityEnabled()) { |
| return EnumSet.of(AuthenticationMethod.KERBEROS, |
| AuthenticationMethod.KERBEROS_SSL, |
| AuthenticationMethod.CERTIFICATE) |
| .contains(UserGroupInformation.getCurrentUser() |
| .getRealAuthenticationMethod()); |
| } else { |
| return true; |
| } |
| } |
| |
| @VisibleForTesting |
| public Server getServer() { |
| return this.server; |
| } |
| |
| @Override |
| public GetNewReservationResponse getNewReservation( |
| GetNewReservationRequest request) throws YarnException, IOException { |
| checkReservationSystem(); |
| GetNewReservationResponse response = |
| recordFactory.newRecordInstance(GetNewReservationResponse.class); |
| |
| ReservationId reservationId = reservationSystem.getNewReservationId(); |
| response.setReservationId(reservationId); |
| // Create a new Reservation Id |
| return response; |
| } |
| |
| @Override |
| public ReservationSubmissionResponse submitReservation( |
| ReservationSubmissionRequest request) throws YarnException, IOException { |
| // Check if reservation system is enabled |
| checkReservationSystem(); |
| ReservationSubmissionResponse response = |
| recordFactory.newRecordInstance(ReservationSubmissionResponse.class); |
| ReservationId reservationId = request.getReservationId(); |
| // Validate the input |
| Plan plan = |
| rValidator.validateReservationSubmissionRequest(reservationSystem, |
| request, reservationId); |
| |
| ReservationAllocation allocation = plan.getReservationById(reservationId); |
| |
| if (allocation != null) { |
| boolean isNewDefinition = !allocation.getReservationDefinition().equals( |
| request.getReservationDefinition()); |
| if (isNewDefinition) { |
| String message = "Reservation allocation already exists with the " + |
| "reservation id " + reservationId.toString() + ", but a different" + |
| " reservation definition was provided. Please try again with a " + |
| "new reservation id, or consider updating the reservation instead."; |
| throw RPCUtil.getRemoteException(message); |
| } else { |
| return response; |
| } |
| } |
| |
| // Check ACLs |
| String queueName = request.getQueue(); |
| String user = |
| checkReservationACLs(queueName, |
| AuditConstants.SUBMIT_RESERVATION_REQUEST, null); |
| try { |
| // Try to place the reservation using the agent |
| boolean result = |
| plan.getReservationAgent().createReservation(reservationId, user, |
| plan, request.getReservationDefinition()); |
| if (result) { |
| // add the reservation id to valid ones maintained by reservation |
| // system |
| reservationSystem.setQueueForReservation(reservationId, queueName); |
| // create the reservation synchronously if required |
| refreshScheduler(queueName, request.getReservationDefinition(), |
| reservationId.toString()); |
| // return the reservation id |
| } |
| } catch (PlanningException e) { |
| RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_RESERVATION_REQUEST, |
| e.getMessage(), "ClientRMService", |
| "Unable to create the reservation: " + reservationId); |
| throw RPCUtil.getRemoteException(e); |
| } |
| RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_RESERVATION_REQUEST, |
| "ClientRMService: " + reservationId); |
| return response; |
| } |
| |
| @Override |
| public ReservationUpdateResponse updateReservation( |
| ReservationUpdateRequest request) throws YarnException, IOException { |
| // Check if reservation system is enabled |
| checkReservationSystem(); |
| ReservationUpdateResponse response = |
| recordFactory.newRecordInstance(ReservationUpdateResponse.class); |
| // Validate the input |
| Plan plan = |
| rValidator.validateReservationUpdateRequest(reservationSystem, request); |
| ReservationId reservationId = request.getReservationId(); |
| String queueName = reservationSystem.getQueueForReservation(reservationId); |
| // Check ACLs |
| String user = |
| checkReservationACLs(queueName, |
| AuditConstants.UPDATE_RESERVATION_REQUEST, reservationId); |
| // Try to update the reservation using default agent |
| try { |
| boolean result = |
| plan.getReservationAgent().updateReservation(reservationId, user, |
| plan, request.getReservationDefinition()); |
| if (!result) { |
| String errMsg = "Unable to update reservation: " + reservationId; |
| RMAuditLogger.logFailure(user, |
| AuditConstants.UPDATE_RESERVATION_REQUEST, errMsg, |
| "ClientRMService", errMsg); |
| throw RPCUtil.getRemoteException(errMsg); |
| } |
| } catch (PlanningException e) { |
| RMAuditLogger.logFailure(user, AuditConstants.UPDATE_RESERVATION_REQUEST, |
| e.getMessage(), "ClientRMService", |
| "Unable to update the reservation: " + reservationId); |
| throw RPCUtil.getRemoteException(e); |
| } |
| RMAuditLogger.logSuccess(user, AuditConstants.UPDATE_RESERVATION_REQUEST, |
| "ClientRMService: " + reservationId); |
| return response; |
| } |
| |
| @Override |
| public ReservationDeleteResponse deleteReservation( |
| ReservationDeleteRequest request) throws YarnException, IOException { |
| // Check if reservation system is enabled |
| checkReservationSystem(); |
| ReservationDeleteResponse response = |
| recordFactory.newRecordInstance(ReservationDeleteResponse.class); |
| // Validate the input |
| Plan plan = |
| rValidator.validateReservationDeleteRequest(reservationSystem, request); |
| ReservationId reservationId = request.getReservationId(); |
| String queueName = reservationSystem.getQueueForReservation(reservationId); |
| // Check ACLs |
| String user = |
| checkReservationACLs(queueName, |
| AuditConstants.DELETE_RESERVATION_REQUEST, reservationId); |
| // Try to update the reservation using default agent |
| try { |
| boolean result = |
| plan.getReservationAgent().deleteReservation(reservationId, user, |
| plan); |
| if (!result) { |
| String errMsg = "Could not delete reservation: " + reservationId; |
| RMAuditLogger.logFailure(user, |
| AuditConstants.DELETE_RESERVATION_REQUEST, errMsg, |
| "ClientRMService", errMsg); |
| throw RPCUtil.getRemoteException(errMsg); |
| } |
| } catch (PlanningException e) { |
| RMAuditLogger.logFailure(user, AuditConstants.DELETE_RESERVATION_REQUEST, |
| e.getMessage(), "ClientRMService", |
| "Unable to delete the reservation: " + reservationId); |
| throw RPCUtil.getRemoteException(e); |
| } |
| RMAuditLogger.logSuccess(user, AuditConstants.DELETE_RESERVATION_REQUEST, |
| "ClientRMService: " + reservationId); |
| return response; |
| } |
| |
| @Override |
| public ReservationListResponse listReservations( |
| ReservationListRequest requestInfo) throws YarnException, IOException { |
| // Check if reservation system is enabled |
| checkReservationSystem(); |
| ReservationListResponse response = |
| recordFactory.newRecordInstance(ReservationListResponse.class); |
| |
| Plan plan = rValidator.validateReservationListRequest( |
| reservationSystem, requestInfo); |
| boolean includeResourceAllocations = requestInfo |
| .getIncludeResourceAllocations(); |
| |
| ReservationId reservationId = null; |
| if (requestInfo.getReservationId() != null && !requestInfo |
| .getReservationId().isEmpty()) { |
| reservationId = ReservationId.parseReservationId( |
| requestInfo.getReservationId()); |
| } |
| |
| checkReservationACLs(requestInfo.getQueue(), |
| AuditConstants.LIST_RESERVATION_REQUEST, reservationId); |
| |
| long startTime = Math.max(requestInfo.getStartTime(), 0); |
| long endTime = requestInfo.getEndTime() <= -1? Long.MAX_VALUE : requestInfo |
| .getEndTime(); |
| |
| Set<ReservationAllocation> reservations; |
| |
| reservations = plan.getReservations(reservationId, new ReservationInterval( |
| startTime, endTime)); |
| |
| List<ReservationAllocationState> info = |
| ReservationSystemUtil.convertAllocationsToReservationInfo( |
| reservations, includeResourceAllocations); |
| |
| response.setReservationAllocationState(info); |
| return response; |
| } |
| |
| @Override |
| public GetNodesToLabelsResponse getNodeToLabels( |
| GetNodesToLabelsRequest request) throws YarnException, IOException { |
| RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager(); |
| return GetNodesToLabelsResponse.newInstance(labelsMgr.getNodeLabels()); |
| } |
| |
| @Override |
| public GetLabelsToNodesResponse getLabelsToNodes( |
| GetLabelsToNodesRequest request) throws YarnException, IOException { |
| RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager(); |
| if (request.getNodeLabels() == null || request.getNodeLabels().isEmpty()) { |
| return GetLabelsToNodesResponse.newInstance(labelsMgr.getLabelsToNodes()); |
| } else { |
| return GetLabelsToNodesResponse.newInstance( |
| labelsMgr.getLabelsToNodes(request.getNodeLabels())); |
| } |
| } |
| |
| @Override |
| public GetClusterNodeLabelsResponse getClusterNodeLabels( |
| GetClusterNodeLabelsRequest request) throws YarnException, IOException { |
| RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager(); |
| return GetClusterNodeLabelsResponse.newInstance( |
| labelsMgr.getClusterNodeLabels()); |
| } |
| |
| private void checkReservationSystem() |
| throws YarnException { |
| // Check if reservation is enabled |
| if (reservationSystem == null) { |
| throw RPCUtil.getRemoteException("Reservation is not enabled." |
| + " Please enable & try again"); |
| } |
| } |
| |
| private void refreshScheduler(String planName, |
| ReservationDefinition contract, String reservationId) { |
| if ((contract.getArrival() - clock.getTime()) < reservationSystem |
| .getPlanFollowerTimeStep()) { |
| LOG.debug("Reservation {} is within threshold so attempting to" |
| + " create synchronously.", reservationId); |
| reservationSystem.synchronizePlan(planName, true); |
| LOG.info(MessageFormat.format("Created reservation {0} synchronously.", |
| reservationId)); |
| } |
| } |
| |
| private String checkReservationACLs(String queueName, String auditConstant, |
| ReservationId reservationId) |
| throws YarnException, IOException { |
| UserGroupInformation callerUGI; |
| try { |
| callerUGI = UserGroupInformation.getCurrentUser(); |
| } catch (IOException ie) { |
| RMAuditLogger.logFailure("UNKNOWN", auditConstant, queueName, |
| "ClientRMService", "Error getting UGI"); |
| throw RPCUtil.getRemoteException(ie); |
| } |
| |
| if (reservationSystem == null) { |
| return callerUGI.getShortUserName(); |
| } |
| |
| ReservationsACLsManager manager = reservationSystem |
| .getReservationsACLsManager(); |
| ReservationACL reservationACL = getReservationACLFromAuditConstant( |
| auditConstant); |
| |
| if (manager == null) { |
| return callerUGI.getShortUserName(); |
| } |
| |
| String reservationCreatorName = ""; |
| ReservationAllocation reservation; |
| // Get the user associated with the reservation. |
| Plan plan = reservationSystem.getPlan(queueName); |
| if (reservationId != null && plan != null) { |
| reservation = plan.getReservationById(reservationId); |
| if (reservation != null) { |
| reservationCreatorName = reservation.getUser(); |
| } |
| } |
| |
| // If the reservation to be altered or listed belongs to the current user, |
| // access will be given. |
| if (reservationCreatorName != null && !reservationCreatorName.isEmpty() |
| && reservationCreatorName.equals(callerUGI.getUserName())) { |
| return callerUGI.getShortUserName(); |
| } |
| |
| // Check if the user has access to the specific ACL |
| if (manager.checkAccess(callerUGI, reservationACL, queueName)) { |
| return callerUGI.getShortUserName(); |
| } |
| |
| // If the user has Administer ACL then access is granted |
| if (manager.checkAccess(callerUGI, ReservationACL |
| .ADMINISTER_RESERVATIONS, queueName)) { |
| return callerUGI.getShortUserName(); |
| } |
| |
| handleNoAccess(callerUGI.getShortUserName(), queueName, auditConstant, |
| reservationACL.toString(), reservationACL.name()); |
| throw new IllegalStateException(); |
| } |
| |
| private ReservationACL getReservationACLFromAuditConstant( |
| String auditConstant) throws YarnException{ |
| if (auditConstant.equals(AuditConstants.SUBMIT_RESERVATION_REQUEST)) { |
| return ReservationACL.SUBMIT_RESERVATIONS; |
| } else if (auditConstant.equals(AuditConstants.LIST_RESERVATION_REQUEST)) { |
| return ReservationACL.LIST_RESERVATIONS; |
| } else if (auditConstant.equals(AuditConstants.DELETE_RESERVATION_REQUEST) |
| || auditConstant.equals(AuditConstants.UPDATE_RESERVATION_REQUEST)) { |
| return ReservationACL.ADMINISTER_RESERVATIONS; |
| } else { |
| String error = "Audit Constant " + auditConstant + " is not recognized."; |
| LOG.error(error); |
| throw RPCUtil.getRemoteException(new UnrecognizedOptionException(error)); |
| } |
| } |
| |
| private void handleNoAccess(String name, String queue, String auditConstant, |
| String acl, String op) throws YarnException { |
| RMAuditLogger.logFailure( |
| name, |
| auditConstant, |
| "User doesn't have permissions to " + acl, "ClientRMService", |
| auditConstant); |
| throw RPCUtil.getRemoteException(new AccessControlException("User " |
| + name + " cannot perform operation " + op + " on queue " + queue)); |
| } |
| |
| @Override |
| public UpdateApplicationPriorityResponse updateApplicationPriority( |
| UpdateApplicationPriorityRequest request) throws YarnException, |
| IOException { |
| |
| ApplicationId applicationId = request.getApplicationId(); |
| Priority newAppPriority = request.getApplicationPriority(); |
| |
| UserGroupInformation callerUGI = |
| getCallerUgi(applicationId, AuditConstants.UPDATE_APP_PRIORITY); |
| RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI, |
| AuditConstants.UPDATE_APP_PRIORITY, ApplicationAccessType.MODIFY_APP, |
| true); |
| |
| UpdateApplicationPriorityResponse response = recordFactory |
| .newRecordInstance(UpdateApplicationPriorityResponse.class); |
| // Update priority only when app is tracked by the scheduler |
| if (!ACTIVE_APP_STATES.contains(application.getState())) { |
| if (application.isAppInCompletedStates()) { |
| // If Application is in any of the final states, change priority |
| // can be skipped rather throwing exception. |
| RMAuditLogger.logSuccess(callerUGI.getShortUserName(), |
| AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService", |
| applicationId); |
| response.setApplicationPriority(application |
| .getApplicationPriority()); |
| return response; |
| } |
| String msg = "Application in " + application.getState() |
| + " state cannot update priority."; |
| RMAuditLogger |
| .logFailure(callerUGI.getShortUserName(), |
| AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService", |
| msg); |
| throw new YarnException(msg); |
| } |
| |
| try { |
| rmAppManager.updateApplicationPriority(callerUGI, |
| application.getApplicationId(), |
| newAppPriority); |
| } catch (YarnException ex) { |
| RMAuditLogger.logFailure(callerUGI.getShortUserName(), |
| AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService", |
| ex.getMessage()); |
| throw ex; |
| } |
| |
| RMAuditLogger.logSuccess(callerUGI.getShortUserName(), |
| AuditConstants.UPDATE_APP_PRIORITY, "ClientRMService", applicationId); |
| response.setApplicationPriority(application.getApplicationPriority()); |
| return response; |
| } |
| |
| /** |
| * Send a signal to a container. |
| * |
| * After the request passes some sanity check, it will be delivered |
| * to RMNodeImpl so that the next NM heartbeat will pick up the signal request |
| * @param request request to signal a container |
| * @return the response of sending signal request |
| * @throws YarnException rpc related exception |
| * @throws IOException fail to obtain user group information |
| */ |
| @SuppressWarnings("unchecked") |
| @Override |
| public SignalContainerResponse signalToContainer( |
| SignalContainerRequest request) throws YarnException, IOException { |
| ContainerId containerId = request.getContainerId(); |
| |
| ApplicationId applicationId = containerId.getApplicationAttemptId(). |
| getApplicationId(); |
| UserGroupInformation callerUGI = getCallerUgi(applicationId, |
| AuditConstants.SIGNAL_CONTAINER); |
| RMApp application = this.rmContext.getRMApps().get(applicationId); |
| if (application == null) { |
| RMAuditLogger.logFailure(callerUGI.getUserName(), |
| AuditConstants.SIGNAL_CONTAINER, "UNKNOWN", "ClientRMService", |
| "Trying to signal an absent container", applicationId, containerId, null); |
| throw RPCUtil |
| .getRemoteException("Trying to signal an absent container " |
| + containerId); |
| } |
| |
| if (!checkAccess(callerUGI, application.getUser(), |
| ApplicationAccessType.MODIFY_APP, application)) { |
| RMAuditLogger.logFailure(callerUGI.getShortUserName(), |
| AuditConstants.SIGNAL_CONTAINER, "User doesn't have permissions to " |
| + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService", |
| AuditConstants.UNAUTHORIZED_USER, applicationId); |
| throw RPCUtil.getRemoteException(new AccessControlException("User " |
| + callerUGI.getShortUserName() + " cannot perform operation " |
| + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); |
| } |
| |
| RMContainer container = scheduler.getRMContainer(containerId); |
| if (container != null) { |
| this.rmContext.getDispatcher().getEventHandler().handle( |
| new RMNodeSignalContainerEvent(container.getContainer().getNodeId(), |
| request)); |
| RMAuditLogger.logSuccess(callerUGI.getShortUserName(), |
| AuditConstants.SIGNAL_CONTAINER, "ClientRMService", applicationId, |
| containerId, null); |
| } else { |
| RMAuditLogger.logFailure(callerUGI.getUserName(), |
| AuditConstants.SIGNAL_CONTAINER, "UNKNOWN", "ClientRMService", |
| "Trying to signal an absent container", applicationId, containerId, null); |
| throw RPCUtil |
| .getRemoteException("Trying to signal an absent container " |
| + containerId); |
| } |
| |
| return recordFactory |
| .newRecordInstance(SignalContainerResponse.class); |
| } |
| |
| @Override |
| public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( |
| UpdateApplicationTimeoutsRequest request) |
| throws YarnException, IOException { |
| |
| ApplicationId applicationId = request.getApplicationId(); |
| Map<ApplicationTimeoutType, String> applicationTimeouts = |
| request.getApplicationTimeouts(); |
| |
| UserGroupInformation callerUGI = |
| getCallerUgi(applicationId, AuditConstants.UPDATE_APP_TIMEOUTS); |
| RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI, |
| AuditConstants.UPDATE_APP_TIMEOUTS, ApplicationAccessType.MODIFY_APP, |
| true); |
| |
| if (applicationTimeouts.isEmpty()) { |
| String message = |
| "At least one ApplicationTimeoutType should be configured" |
| + " for updating timeouts."; |
| RMAuditLogger.logFailure(callerUGI.getShortUserName(), |
| AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService", |
| message, applicationId); |
| throw RPCUtil.getRemoteException(message); |
| } |
| |
| UpdateApplicationTimeoutsResponse response = recordFactory |
| .newRecordInstance(UpdateApplicationTimeoutsResponse.class); |
| |
| RMAppState state = application.getState(); |
| if (!EnumSet |
| .of(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppState.RUNNING) |
| .contains(state)) { |
| if (application.isAppInCompletedStates()) { |
| // If Application is in any of the final states, update timeout |
| // can be skipped rather throwing exception. |
| RMAuditLogger.logSuccess(callerUGI.getShortUserName(), |
| AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService", |
| applicationId); |
| response.setApplicationTimeouts(applicationTimeouts); |
| return response; |
| } |
| String msg = |
| "Application is in " + state + " state can not update timeout."; |
| RMAuditLogger.logFailure(callerUGI.getShortUserName(), |
| AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService", |
| msg); |
| throw RPCUtil.getRemoteException(msg); |
| } |
| |
| try { |
| applicationTimeouts = rmAppManager.updateApplicationTimeout(application, |
| applicationTimeouts); |
| } catch (YarnException ex) { |
| RMAuditLogger.logFailure(callerUGI.getShortUserName(), |
| AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService", |
| ex.getMessage()); |
| throw ex; |
| } |
| |
| RMAuditLogger.logSuccess(callerUGI.getShortUserName(), |
| AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService", applicationId); |
| response.setApplicationTimeouts(applicationTimeouts); |
| return response; |
| } |
| |
| private UserGroupInformation getCallerUgi(ApplicationId applicationId, |
| String operation) throws YarnException { |
| UserGroupInformation callerUGI; |
| try { |
| callerUGI = UserGroupInformation.getCurrentUser(); |
| } catch (IOException ie) { |
| LOG.info("Error getting UGI ", ie); |
| RMAuditLogger.logFailure("UNKNOWN", operation, "UNKNOWN", |
| "ClientRMService", "Error getting UGI", applicationId); |
| throw RPCUtil.getRemoteException(ie); |
| } |
| return callerUGI; |
| } |
| |
| private RMApp verifyUserAccessForRMApp(ApplicationId applicationId, |
| UserGroupInformation callerUGI, String operation, |
| ApplicationAccessType accessType, |
| boolean needCheckAccess) throws YarnException { |
| RMApp application = this.rmContext.getRMApps().get(applicationId); |
| if (application == null) { |
| RMAuditLogger.logFailure(callerUGI.getUserName(), operation, "UNKNOWN", |
| "ClientRMService", |
| "Trying to " + operation + " of an absent application", |
| applicationId); |
| // If the RM doesn't have the application, throw |
| // ApplicationNotFoundException and let client to handle. |
| throw new ApplicationNotFoundException("Application with id '" |
| + applicationId + "' doesn't exist in RM. " |
| + "Please check that the job " |
| + "submission was successful."); |
| } |
| |
| if (needCheckAccess) { |
| if (!checkAccess(callerUGI, application.getUser(), |
| accessType, application)) { |
| RMAuditLogger.logFailure(callerUGI.getShortUserName(), operation, |
| "User doesn't have permissions to " |
| + accessType.toString(), |
| "ClientRMService", AuditConstants.UNAUTHORIZED_USER, |
| applicationId); |
| throw RPCUtil.getRemoteException(new AccessControlException("User " |
| + callerUGI.getShortUserName() + " cannot perform operation " |
| + accessType.name() + " on " + applicationId)); |
| } |
| } |
| return application; |
| } |
| |
| @Override |
| public GetAllResourceProfilesResponse getResourceProfiles( |
| GetAllResourceProfilesRequest request) throws YarnException, IOException { |
| GetAllResourceProfilesResponse response = |
| GetAllResourceProfilesResponse.newInstance(); |
| response.setResourceProfiles(resourceProfilesManager.getResourceProfiles()); |
| return response; |
| } |
| |
| @Override |
| public GetResourceProfileResponse getResourceProfile( |
| GetResourceProfileRequest request) throws YarnException, IOException { |
| GetResourceProfileResponse response = |
| GetResourceProfileResponse.newInstance(); |
| response.setResource( |
| resourceProfilesManager.getProfile(request.getProfileName())); |
| return response; |
| } |
| |
| @Override |
| public GetAllResourceTypeInfoResponse getResourceTypeInfo( |
| GetAllResourceTypeInfoRequest request) throws YarnException, IOException { |
| GetAllResourceTypeInfoResponse response = |
| GetAllResourceTypeInfoResponse.newInstance(); |
| response.setResourceTypeInfo(ResourceUtils.getResourcesTypeInfo()); |
| return response; |
| } |
| |
| @Override |
| public GetAttributesToNodesResponse getAttributesToNodes( |
| GetAttributesToNodesRequest request) throws YarnException, IOException { |
| NodeAttributesManager attributesManager = |
| rmContext.getNodeAttributesManager(); |
| Map<NodeAttributeKey, List<NodeToAttributeValue>> attrToNodesWithStrVal = |
| new HashMap<>(); |
| Map<NodeAttributeKey, Map<String, AttributeValue>> attributesToNodes = |
| attributesManager.getAttributesToNodes(request.getNodeAttributes()); |
| for (Map.Entry<NodeAttributeKey, Map<String, AttributeValue>> attrib : |
| attributesToNodes.entrySet()) { |
| Map<String, AttributeValue> nodesToVal = attrib.getValue(); |
| List<NodeToAttributeValue> nodeToAttrValList = new ArrayList<>(); |
| for (Map.Entry<String, AttributeValue> nodeToVal : nodesToVal |
| .entrySet()) { |
| nodeToAttrValList.add(NodeToAttributeValue |
| .newInstance(nodeToVal.getKey(), nodeToVal.getValue().getValue())); |
| } |
| attrToNodesWithStrVal.put(attrib.getKey(), nodeToAttrValList); |
| } |
| GetAttributesToNodesResponse response = |
| GetAttributesToNodesResponse.newInstance(attrToNodesWithStrVal); |
| return response; |
| } |
| |
| @Override |
| public GetClusterNodeAttributesResponse getClusterNodeAttributes( |
| GetClusterNodeAttributesRequest request) |
| throws YarnException, IOException { |
| NodeAttributesManager attributesManager = |
| rmContext.getNodeAttributesManager(); |
| Set<NodeAttribute> attributes = |
| attributesManager.getClusterNodeAttributes(null); |
| |
| GetClusterNodeAttributesResponse response = |
| GetClusterNodeAttributesResponse.newInstance( |
| attributes.stream().map(attr -> NodeAttributeInfo.newInstance(attr)) |
| .collect(Collectors.toSet())); |
| return response; |
| } |
| |
| @Override |
| public GetNodesToAttributesResponse getNodesToAttributes( |
| GetNodesToAttributesRequest request) throws YarnException, IOException { |
| NodeAttributesManager attributesManager = |
| rmContext.getNodeAttributesManager(); |
| GetNodesToAttributesResponse response = GetNodesToAttributesResponse |
| .newInstance( |
| attributesManager.getNodesToAttributes(request.getHostNames())); |
| return response; |
| } |
| |
| @VisibleForTesting |
| public void setDisplayPerUserApps(boolean displayPerUserApps) { |
| this.filterAppsByUser = displayPerUserApps; |
| } |
| } |