blob: cded2ec2de18ad6bfd05c1dd3cdcf33fb3d3a897 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.AccessControlException;
import java.security.Principal;
import java.security.PrivilegedExceptionAction;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.FormParam;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import com.google.common.base.Joiner;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationHandler;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FifoSchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntry;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDefinitionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteResponseInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationListInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateResponseInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ConfInfo;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.webapp.WebServices;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.AdHocLogDumper;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@Singleton
@Path(RMWSConsts.RM_WEB_SERVICE_PATH)
public class RMWebServices extends WebServices implements RMWebServiceProtocol {
private static final Log LOG =
LogFactory.getLog(RMWebServices.class.getName());
private final ResourceManager rm;
private static RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private final Configuration conf;
private @Context HttpServletResponse response;
// -------Default values of QueryParams for RMWebServiceProtocol--------
public static final String DEFAULT_QUEUE = "default";
public static final String DEFAULT_RESERVATION_ID = "";
public static final String DEFAULT_START_TIME = "0";
public static final String DEFAULT_END_TIME = "-1";
public static final String DEFAULT_INCLUDE_RESOURCE = "false";
@VisibleForTesting
boolean isCentralizedNodeLabelConfiguration = true;
private boolean displayPerUserApps = false;
public final static String DELEGATION_TOKEN_HEADER =
"Hadoop-YARN-RM-Delegation-Token";
@Inject
public RMWebServices(final ResourceManager rm, Configuration conf) {
// don't inject, always take appBaseRoot from RM.
super(null);
this.rm = rm;
this.conf = conf;
isCentralizedNodeLabelConfiguration =
YarnConfiguration.isCentralizedNodeLabelConfiguration(conf);
this.displayPerUserApps = conf.getBoolean(
YarnConfiguration.DISPLAY_APPS_FOR_LOGGED_IN_USER,
YarnConfiguration.DEFAULT_DISPLAY_APPS_FOR_LOGGED_IN_USER);
}
RMWebServices(ResourceManager rm, Configuration conf,
HttpServletResponse response) {
this(rm, conf);
this.response = response;
}
protected Boolean hasAccess(RMApp app, HttpServletRequest hsr) {
// Check for the authorization.
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
List<String> forwardedAddresses = null;
String forwardedFor = hsr.getHeader(RMWSConsts.FORWARDED_FOR);
if (forwardedFor != null) {
forwardedAddresses = Arrays.asList(forwardedFor.split(","));
}
if (callerUGI != null
&& !(this.rm.getApplicationACLsManager().checkAccess(callerUGI,
ApplicationAccessType.VIEW_APP, app.getUser(),
app.getApplicationId())
|| this.rm.getQueueACLsManager().checkAccess(callerUGI,
QueueACL.ADMINISTER_QUEUE, app, hsr.getRemoteAddr(),
forwardedAddresses))) {
return false;
}
return true;
}
/**
* initForReadableEndpoints does the init for all readable REST end points.
*/
private void initForReadableEndpoints() {
// clear content type
response.setContentType(null);
}
/**
* initForWritableEndpoints does the init and acls verification for all
* writable REST end points.
* @param callerUGI remote caller who initiated the request
* @param doAdminACLsCheck boolean flag to indicate whether ACLs check is needed
* @throws AuthorizationException in case of no access to perfom this op.
*/
private void initForWritableEndpoints(UserGroupInformation callerUGI,
boolean doAdminACLsCheck) throws AuthorizationException {
// clear content type
response.setContentType(null);
if (callerUGI == null) {
String msg = "Unable to obtain user name, user not authenticated";
throw new AuthorizationException(msg);
}
if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) {
String msg = "The default static user cannot carry out this operation.";
throw new ForbiddenException(msg);
}
if (doAdminACLsCheck) {
ApplicationACLsManager aclsManager = rm.getApplicationACLsManager();
if (aclsManager.areACLsEnabled()) {
if (!aclsManager.isAdmin(callerUGI)) {
String msg = "Only admins can carry out this operation.";
throw new ForbiddenException(msg);
}
}
}
}
@GET
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public ClusterInfo get() {
return getClusterInfo();
}
@GET
@Path(RMWSConsts.INFO)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public ClusterInfo getClusterInfo() {
initForReadableEndpoints();
return new ClusterInfo(this.rm);
}
@GET
@Path(RMWSConsts.METRICS)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public ClusterMetricsInfo getClusterMetricsInfo() {
initForReadableEndpoints();
return new ClusterMetricsInfo(this.rm);
}
@GET
@Path(RMWSConsts.SCHEDULER)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public SchedulerTypeInfo getSchedulerInfo() {
initForReadableEndpoints();
ResourceScheduler rs = rm.getResourceScheduler();
SchedulerInfo sinfo;
if (rs instanceof CapacityScheduler) {
CapacityScheduler cs = (CapacityScheduler) rs;
CSQueue root = cs.getRootQueue();
sinfo = new CapacitySchedulerInfo(root, cs);
} else if (rs instanceof FairScheduler) {
FairScheduler fs = (FairScheduler) rs;
sinfo = new FairSchedulerInfo(fs);
} else if (rs instanceof FifoScheduler) {
sinfo = new FifoSchedulerInfo(this.rm);
} else {
throw new NotFoundException("Unknown scheduler configured");
}
return new SchedulerTypeInfo(sinfo);
}
@POST
@Path(RMWSConsts.SCHEDULER_LOGS)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public String dumpSchedulerLogs(@FormParam(RMWSConsts.TIME) String time,
@Context HttpServletRequest hsr) throws IOException {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, true);
ResourceScheduler rs = rm.getResourceScheduler();
int period = Integer.parseInt(time);
if (period <= 0) {
throw new BadRequestException("Period must be greater than 0");
}
final String logHierarchy =
"org.apache.hadoop.yarn.server.resourcemanager.scheduler";
String logfile = "yarn-scheduler-debug.log";
if (rs instanceof CapacityScheduler) {
logfile = "yarn-capacity-scheduler-debug.log";
} else if (rs instanceof FairScheduler) {
logfile = "yarn-fair-scheduler-debug.log";
}
AdHocLogDumper dumper = new AdHocLogDumper(logHierarchy, logfile);
// time period is sent to us in seconds
dumper.dumpLogs("DEBUG", period * 1000);
return "Capacity scheduler logs are being created.";
}
@GET
@Path(RMWSConsts.NODES)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public NodesInfo getNodes(@QueryParam(RMWSConsts.STATES) String states) {
initForReadableEndpoints();
ResourceScheduler sched = this.rm.getResourceScheduler();
if (sched == null) {
throw new NotFoundException("Null ResourceScheduler instance");
}
EnumSet<NodeState> acceptedStates;
if (states == null) {
acceptedStates = EnumSet.allOf(NodeState.class);
} else {
acceptedStates = EnumSet.noneOf(NodeState.class);
for (String stateStr : states.split(",")) {
acceptedStates
.add(NodeState.valueOf(StringUtils.toUpperCase(stateStr)));
}
}
Collection<RMNode> rmNodes =
RMServerUtils.queryRMNodes(this.rm.getRMContext(), acceptedStates);
NodesInfo nodesInfo = new NodesInfo();
for (RMNode rmNode : rmNodes) {
NodeInfo nodeInfo = new NodeInfo(rmNode, sched);
if (rmNode.getState().isInactiveState()) {
nodeInfo.setNodeHTTPAddress(RMWSConsts.EMPTY);
}
nodesInfo.add(nodeInfo);
}
return nodesInfo;
}
@GET
@Path(RMWSConsts.NODES_NODEID)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public NodeInfo getNode(@PathParam(RMWSConsts.NODEID) String nodeId) {
initForReadableEndpoints();
if (nodeId == null || nodeId.isEmpty()) {
throw new NotFoundException("nodeId, " + nodeId + ", is empty or null");
}
ResourceScheduler sched = this.rm.getResourceScheduler();
if (sched == null) {
throw new NotFoundException("Null ResourceScheduler instance");
}
NodeId nid = NodeId.fromString(nodeId);
RMNode ni = this.rm.getRMContext().getRMNodes().get(nid);
boolean isInactive = false;
if (ni == null) {
ni = this.rm.getRMContext().getInactiveRMNodes().get(nid);
if (ni == null) {
throw new NotFoundException("nodeId, " + nodeId + ", is not found");
}
isInactive = true;
}
NodeInfo nodeInfo = new NodeInfo(ni, sched);
if (isInactive) {
nodeInfo.setNodeHTTPAddress(RMWSConsts.EMPTY);
}
return nodeInfo;
}
@GET
@Path(RMWSConsts.APPS)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public AppsInfo getApps(@Context HttpServletRequest hsr,
@QueryParam(RMWSConsts.STATE) String stateQuery,
@QueryParam(RMWSConsts.STATES) Set<String> statesQuery,
@QueryParam(RMWSConsts.FINAL_STATUS) String finalStatusQuery,
@QueryParam(RMWSConsts.USER) String userQuery,
@QueryParam(RMWSConsts.QUEUE) String queueQuery,
@QueryParam(RMWSConsts.LIMIT) String count,
@QueryParam(RMWSConsts.STARTED_TIME_BEGIN) String startedBegin,
@QueryParam(RMWSConsts.STARTED_TIME_END) String startedEnd,
@QueryParam(RMWSConsts.FINISHED_TIME_BEGIN) String finishBegin,
@QueryParam(RMWSConsts.FINISHED_TIME_END) String finishEnd,
@QueryParam(RMWSConsts.APPLICATION_TYPES) Set<String> applicationTypes,
@QueryParam(RMWSConsts.APPLICATION_TAGS) Set<String> applicationTags,
@QueryParam(RMWSConsts.DESELECTS) Set<String> unselectedFields) {
initForReadableEndpoints();
boolean checkCount = false;
boolean checkStart = false;
boolean checkEnd = false;
boolean checkAppTypes = false;
boolean checkAppStates = false;
boolean checkAppTags = false;
long countNum = 0;
// set values suitable in case both of begin/end not specified
long sBegin = 0;
long sEnd = Long.MAX_VALUE;
long fBegin = 0;
long fEnd = Long.MAX_VALUE;
if (count != null && !count.isEmpty()) {
checkCount = true;
countNum = Long.parseLong(count);
if (countNum <= 0) {
throw new BadRequestException("limit value must be greater then 0");
}
}
if (startedBegin != null && !startedBegin.isEmpty()) {
checkStart = true;
sBegin = Long.parseLong(startedBegin);
if (sBegin < 0) {
throw new BadRequestException(
"startedTimeBegin must be greater than 0");
}
}
if (startedEnd != null && !startedEnd.isEmpty()) {
checkStart = true;
sEnd = Long.parseLong(startedEnd);
if (sEnd < 0) {
throw new BadRequestException("startedTimeEnd must be greater than 0");
}
}
if (sBegin > sEnd) {
throw new BadRequestException(
"startedTimeEnd must be greater than startTimeBegin");
}
if (finishBegin != null && !finishBegin.isEmpty()) {
checkEnd = true;
fBegin = Long.parseLong(finishBegin);
if (fBegin < 0) {
throw new BadRequestException("finishTimeBegin must be greater than 0");
}
}
if (finishEnd != null && !finishEnd.isEmpty()) {
checkEnd = true;
fEnd = Long.parseLong(finishEnd);
if (fEnd < 0) {
throw new BadRequestException("finishTimeEnd must be greater than 0");
}
}
if (fBegin > fEnd) {
throw new BadRequestException(
"finishTimeEnd must be greater than finishTimeBegin");
}
Set<String> appTypes = parseQueries(applicationTypes, false);
if (!appTypes.isEmpty()) {
checkAppTypes = true;
}
Set<String> appTags = parseQueries(applicationTags, false);
if (!appTags.isEmpty()) {
checkAppTags = true;
}
// stateQuery is deprecated.
if (stateQuery != null && !stateQuery.isEmpty()) {
statesQuery.add(stateQuery);
}
Set<String> appStates = parseQueries(statesQuery, true);
if (!appStates.isEmpty()) {
checkAppStates = true;
}
GetApplicationsRequest request = GetApplicationsRequest.newInstance();
if (checkStart) {
request.setStartRange(sBegin, sEnd);
}
if (checkEnd) {
request.setFinishRange(fBegin, fEnd);
}
if (checkCount) {
request.setLimit(countNum);
}
if (checkAppTypes) {
request.setApplicationTypes(appTypes);
}
if (checkAppTags) {
request.setApplicationTags(appTags);
}
if (checkAppStates) {
request.setApplicationStates(appStates);
}
if (queueQuery != null && !queueQuery.isEmpty()) {
ResourceScheduler rs = rm.getResourceScheduler();
if (rs instanceof CapacityScheduler) {
CapacityScheduler cs = (CapacityScheduler) rs;
// validate queue exists
try {
cs.getQueueInfo(queueQuery, false, false);
} catch (IOException e) {
throw new BadRequestException(e.getMessage());
}
}
Set<String> queues = new HashSet<String>(1);
queues.add(queueQuery);
request.setQueues(queues);
}
if (userQuery != null && !userQuery.isEmpty()) {
Set<String> users = new HashSet<String>(1);
users.add(userQuery);
request.setUsers(users);
}
List<ApplicationReport> appReports = null;
try {
appReports = rm.getClientRMService().getApplications(request)
.getApplicationList();
} catch (YarnException e) {
LOG.error("Unable to retrieve apps from ClientRMService", e);
throw new YarnRuntimeException(
"Unable to retrieve apps from ClientRMService", e);
}
final ConcurrentMap<ApplicationId, RMApp> apps =
rm.getRMContext().getRMApps();
AppsInfo allApps = new AppsInfo();
for (ApplicationReport report : appReports) {
RMApp rmapp = apps.get(report.getApplicationId());
if (rmapp == null) {
continue;
}
if (finalStatusQuery != null && !finalStatusQuery.isEmpty()) {
FinalApplicationStatus.valueOf(finalStatusQuery);
if (!rmapp.getFinalApplicationStatus().toString()
.equalsIgnoreCase(finalStatusQuery)) {
continue;
}
}
DeSelectFields deSelectFields = new DeSelectFields();
deSelectFields.initFields(unselectedFields);
boolean allowAccess = hasAccess(rmapp, hsr);
// Given RM is configured to display apps per user, skip apps to which
// this caller doesn't have access to view.
if (displayPerUserApps && !allowAccess) {
continue;
}
AppInfo app = new AppInfo(rm, rmapp, allowAccess,
WebAppUtils.getHttpSchemePrefix(conf), deSelectFields);
allApps.add(app);
}
return allApps;
}
@GET
@Path(RMWSConsts.SCHEDULER_ACTIVITIES)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public ActivitiesInfo getActivities(@Context HttpServletRequest hsr,
@QueryParam(RMWSConsts.NODEID) String nodeId) {
initForReadableEndpoints();
YarnScheduler scheduler = rm.getRMContext().getScheduler();
if (scheduler instanceof AbstractYarnScheduler) {
String errMessage = "";
AbstractYarnScheduler abstractYarnScheduler =
(AbstractYarnScheduler) scheduler;
ActivitiesManager activitiesManager =
abstractYarnScheduler.getActivitiesManager();
if (null == activitiesManager) {
errMessage = "Not Capacity Scheduler";
return new ActivitiesInfo(errMessage, nodeId);
}
List<FiCaSchedulerNode> nodeList =
abstractYarnScheduler.getNodeTracker().getAllNodes();
boolean illegalInput = false;
if (nodeList.size() == 0) {
illegalInput = true;
errMessage = "No node manager running in the cluster";
} else {
if (nodeId != null) {
String hostName = nodeId;
String portName = "";
if (nodeId.contains(":")) {
int index = nodeId.indexOf(":");
hostName = nodeId.substring(0, index);
portName = nodeId.substring(index + 1);
}
boolean correctNodeId = false;
for (FiCaSchedulerNode node : nodeList) {
if ((portName.equals("")
&& node.getRMNode().getHostName().equals(hostName))
|| (!portName.equals("")
&& node.getRMNode().getHostName().equals(hostName)
&& String.valueOf(node.getRMNode().getCommandPort())
.equals(portName))) {
correctNodeId = true;
nodeId = node.getNodeID().toString();
break;
}
}
if (!correctNodeId) {
illegalInput = true;
errMessage = "Cannot find node manager with given node id";
}
}
}
if (!illegalInput) {
activitiesManager.recordNextNodeUpdateActivities(nodeId);
return activitiesManager.getActivitiesInfo(nodeId);
}
// Return a activities info with error message
return new ActivitiesInfo(errMessage, nodeId);
}
return null;
}
@GET
@Path(RMWSConsts.SCHEDULER_APP_ACTIVITIES)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr,
@QueryParam(RMWSConsts.APP_ID) String appId,
@QueryParam(RMWSConsts.MAX_TIME) String time) {
initForReadableEndpoints();
YarnScheduler scheduler = rm.getRMContext().getScheduler();
if (scheduler instanceof AbstractYarnScheduler) {
AbstractYarnScheduler abstractYarnScheduler =
(AbstractYarnScheduler) scheduler;
ActivitiesManager activitiesManager =
abstractYarnScheduler.getActivitiesManager();
if (null == activitiesManager) {
String errMessage = "Not Capacity Scheduler";
return new AppActivitiesInfo(errMessage, appId);
}
if (appId == null) {
String errMessage = "Must provide an application Id";
return new AppActivitiesInfo(errMessage, null);
}
double maxTime = 3.0;
if (time != null) {
if (time.contains(".")) {
maxTime = Double.parseDouble(time);
} else {
maxTime = Double.parseDouble(time + ".0");
}
}
ApplicationId applicationId;
try {
applicationId = ApplicationId.fromString(appId);
activitiesManager.turnOnAppActivitiesRecording(applicationId, maxTime);
AppActivitiesInfo appActivitiesInfo =
activitiesManager.getAppActivitiesInfo(applicationId);
return appActivitiesInfo;
} catch (Exception e) {
String errMessage = "Cannot find application with given appId";
return new AppActivitiesInfo(errMessage, appId);
}
}
return null;
}
@GET
@Path(RMWSConsts.APP_STATISTICS)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public ApplicationStatisticsInfo getAppStatistics(
@Context HttpServletRequest hsr,
@QueryParam(RMWSConsts.STATES) Set<String> stateQueries,
@QueryParam(RMWSConsts.APPLICATION_TYPES) Set<String> typeQueries) {
initForReadableEndpoints();
// parse the params and build the scoreboard
// converting state/type name to lowercase
Set<String> states = parseQueries(stateQueries, true);
Set<String> types = parseQueries(typeQueries, false);
// if no types, counts the applications of any types
if (types.size() == 0) {
types.add(RMWSConsts.ANY);
} else if (types.size() != 1) {
throw new BadRequestException("# of applicationTypes = " + types.size()
+ ", we temporarily support at most one applicationType");
}
// if no states, returns the counts of all RMAppStates
if (states.size() == 0) {
for (YarnApplicationState state : YarnApplicationState.values()) {
states.add(StringUtils.toLowerCase(state.toString()));
}
}
// in case we extend to multiple applicationTypes in the future
Map<YarnApplicationState, Map<String, Long>> scoreboard =
buildScoreboard(states, types);
// go through the apps in RM to count the numbers, ignoring the case of
// the state/type name
ConcurrentMap<ApplicationId, RMApp> apps = rm.getRMContext().getRMApps();
for (RMApp rmapp : apps.values()) {
YarnApplicationState state = rmapp.createApplicationState();
String type = StringUtils.toLowerCase(rmapp.getApplicationType().trim());
if (states.contains(StringUtils.toLowerCase(state.toString()))) {
if (types.contains(RMWSConsts.ANY)) {
countApp(scoreboard, state, RMWSConsts.ANY);
} else if (types.contains(type)) {
countApp(scoreboard, state, type);
}
}
}
// fill the response object
ApplicationStatisticsInfo appStatInfo = new ApplicationStatisticsInfo();
for (Map.Entry<YarnApplicationState, Map<String, Long>> partScoreboard : scoreboard
.entrySet()) {
for (Map.Entry<String, Long> statEntry : partScoreboard.getValue()
.entrySet()) {
StatisticsItemInfo statItem = new StatisticsItemInfo(
partScoreboard.getKey(), statEntry.getKey(), statEntry.getValue());
appStatInfo.add(statItem);
}
}
return appStatInfo;
}
private static Map<YarnApplicationState, Map<String, Long>> buildScoreboard(
Set<String> states, Set<String> types) {
Map<YarnApplicationState, Map<String, Long>> scoreboard =
new HashMap<YarnApplicationState, Map<String, Long>>();
// default states will result in enumerating all YarnApplicationStates
assert !states.isEmpty();
for (String state : states) {
Map<String, Long> partScoreboard = new HashMap<String, Long>();
scoreboard.put(
YarnApplicationState.valueOf(StringUtils.toUpperCase(state)),
partScoreboard);
// types is verified no to be empty
for (String type : types) {
partScoreboard.put(type, 0L);
}
}
return scoreboard;
}
private static void countApp(
Map<YarnApplicationState, Map<String, Long>> scoreboard,
YarnApplicationState state, String type) {
Map<String, Long> partScoreboard = scoreboard.get(state);
Long count = partScoreboard.get(type);
partScoreboard.put(type, count + 1L);
}
@GET
@Path(RMWSConsts.APPS_APPID)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public AppInfo getApp(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId,
@QueryParam(RMWSConsts.DESELECTS) Set<String> unselectedFields) {
initForReadableEndpoints();
ApplicationId id = WebAppUtils.parseApplicationId(recordFactory, appId);
RMApp app = rm.getRMContext().getRMApps().get(id);
if (app == null) {
throw new NotFoundException("app with id: " + appId + " not found");
}
DeSelectFields deSelectFields = new DeSelectFields();
deSelectFields.initFields(unselectedFields);
return new AppInfo(rm, app, hasAccess(app, hsr), hsr.getScheme() + "://",
deSelectFields);
}
@GET
@Path(RMWSConsts.APPS_APPID_APPATTEMPTS)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public AppAttemptsInfo getAppAttempts(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) {
initForReadableEndpoints();
ApplicationId id = WebAppUtils.parseApplicationId(recordFactory, appId);
RMApp app = rm.getRMContext().getRMApps().get(id);
if (app == null) {
throw new NotFoundException("app with id: " + appId + " not found");
}
AppAttemptsInfo appAttemptsInfo = new AppAttemptsInfo();
for (RMAppAttempt attempt : app.getAppAttempts().values()) {
AppAttemptInfo attemptInfo = new AppAttemptInfo(rm, attempt,
app.getUser(), hsr.getScheme() + "://");
appAttemptsInfo.add(attemptInfo);
}
return appAttemptsInfo;
}
@GET
@Path(RMWSConsts.APPS_APPID_APPATTEMPTS_APPATTEMPTID)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo getAppAttempt(
@Context HttpServletRequest req, @Context HttpServletResponse res,
@PathParam(RMWSConsts.APPID) String appId,
@PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId) {
initForReadableEndpoints(res);
return super.getAppAttempt(req, res, appId, appAttemptId);
}
@GET
@Path(RMWSConsts.APPS_APPID_APPATTEMPTS_APPATTEMPTID_CONTAINERS)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public ContainersInfo getContainers(@Context HttpServletRequest req,
@Context HttpServletResponse res,
@PathParam(RMWSConsts.APPID) String appId,
@PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId) {
initForReadableEndpoints(res);
return super.getContainers(req, res, appId, appAttemptId);
}
@GET
@Path("/apps/{appid}/appattempts/{appattemptid}/containers/{containerid}")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public ContainerInfo getContainer(@Context HttpServletRequest req,
@Context HttpServletResponse res,
@PathParam(RMWSConsts.APPID) String appId,
@PathParam(RMWSConsts.APPATTEMPTID) String appAttemptId,
@PathParam("containerid") String containerId) {
initForReadableEndpoints(res);
return super.getContainer(req, res, appId, appAttemptId, containerId);
}
@GET
@Path("/apps/{appid}/state")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public AppState getAppState(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException {
initForReadableEndpoints();
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
String userName = "";
if (callerUGI != null) {
userName = callerUGI.getUserName();
}
RMApp app = null;
try {
app = getRMAppForAppId(appId);
} catch (NotFoundException e) {
RMAuditLogger.logFailure(userName, AuditConstants.GET_APP_STATE,
"UNKNOWN", "RMWebService",
"Trying to get state of an absent application " + appId);
throw e;
}
AppState ret = new AppState();
ret.setState(app.getState().toString());
return ret;
}
// can't return POJO because we can't control the status code
// it's always set to 200 when we need to allow it to be set
// to 202
@PUT
@Path(RMWSConsts.APPS_APPID_STATE)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public Response updateAppState(AppState targetState,
@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, false);
String userName = callerUGI.getUserName();
RMApp app = null;
try {
app = getRMAppForAppId(appId);
} catch (NotFoundException e) {
RMAuditLogger.logFailure(userName, AuditConstants.KILL_APP_REQUEST,
"UNKNOWN", "RMWebService",
"Trying to kill an absent application " + appId);
throw e;
}
if (!app.getState().toString().equals(targetState.getState())) {
// user is attempting to change state. right we only
// allow users to kill the app
if (targetState.getState()
.equals(YarnApplicationState.KILLED.toString())) {
return killApp(app, callerUGI, hsr, targetState.getDiagnostics());
}
throw new BadRequestException(
"Only '" + YarnApplicationState.KILLED.toString()
+ "' is allowed as a target state.");
}
AppState ret = new AppState();
ret.setState(app.getState().toString());
return Response.status(Status.OK).entity(ret).build();
}
@GET
@Path(RMWSConsts.GET_NODE_TO_LABELS)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public NodeToLabelsInfo getNodeToLabels(@Context HttpServletRequest hsr)
throws IOException {
initForReadableEndpoints();
NodeToLabelsInfo ntl = new NodeToLabelsInfo();
HashMap<String, NodeLabelsInfo> ntlMap = ntl.getNodeToLabels();
Map<NodeId, Set<NodeLabel>> nodeIdToLabels =
rm.getRMContext().getNodeLabelManager().getNodeLabelsInfo();
for (Map.Entry<NodeId, Set<NodeLabel>> nitle : nodeIdToLabels.entrySet()) {
List<NodeLabel> labels = new ArrayList<NodeLabel>(nitle.getValue());
ntlMap.put(nitle.getKey().toString(), new NodeLabelsInfo(labels));
}
return ntl;
}
@GET
@Path(RMWSConsts.LABEL_MAPPINGS)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public LabelsToNodesInfo getLabelsToNodes(
@QueryParam(RMWSConsts.LABELS) Set<String> labels) throws IOException {
initForReadableEndpoints();
LabelsToNodesInfo lts = new LabelsToNodesInfo();
Map<NodeLabelInfo, NodeIDsInfo> ltsMap = lts.getLabelsToNodes();
Map<NodeLabel, Set<NodeId>> labelsToNodeId = null;
if (labels == null || labels.size() == 0) {
labelsToNodeId =
rm.getRMContext().getNodeLabelManager().getLabelsInfoToNodes();
} else {
labelsToNodeId =
rm.getRMContext().getNodeLabelManager().getLabelsInfoToNodes(labels);
}
for (Entry<NodeLabel, Set<NodeId>> entry : labelsToNodeId.entrySet()) {
List<String> nodeIdStrList = new ArrayList<String>();
for (NodeId nodeId : entry.getValue()) {
nodeIdStrList.add(nodeId.toString());
}
ltsMap.put(new NodeLabelInfo(entry.getKey()),
new NodeIDsInfo(nodeIdStrList));
}
return lts;
}
@POST
@Path(RMWSConsts.REPLACE_NODE_TO_LABELS)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public Response replaceLabelsOnNodes(
final NodeToLabelsEntryList newNodeToLabels,
@Context HttpServletRequest hsr) throws IOException {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, false);
Map<NodeId, Set<String>> nodeIdToLabels =
new HashMap<NodeId, Set<String>>();
for (NodeToLabelsEntry nitle : newNodeToLabels.getNodeToLabels()) {
nodeIdToLabels.put(
ConverterUtils.toNodeIdWithDefaultPort(nitle.getNodeId()),
new HashSet<String>(nitle.getNodeLabels()));
}
return replaceLabelsOnNode(nodeIdToLabels, hsr, "/replace-node-to-labels");
}
@POST
@Path(RMWSConsts.NODES_NODEID_REPLACE_LABELS)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public Response replaceLabelsOnNode(
@QueryParam("labels") Set<String> newNodeLabelsName,
@Context HttpServletRequest hsr, @PathParam("nodeId") String nodeId)
throws Exception {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, false);
NodeId nid = ConverterUtils.toNodeIdWithDefaultPort(nodeId);
Map<NodeId, Set<String>> newLabelsForNode =
new HashMap<NodeId, Set<String>>();
newLabelsForNode.put(nid, new HashSet<String>(newNodeLabelsName));
return replaceLabelsOnNode(newLabelsForNode, hsr,
"/nodes/nodeid/replace-labels");
}
private Response replaceLabelsOnNode(
Map<NodeId, Set<String>> newLabelsForNode, HttpServletRequest hsr,
String operation) throws IOException {
NodeLabelsUtils.verifyCentralizedNodeLabelConfEnabled("replaceLabelsOnNode",
isCentralizedNodeLabelConfiguration);
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
if (callerUGI == null) {
String msg = "Unable to obtain user name, user not authenticated for"
+ " post to ..." + operation;
throw new AuthorizationException(msg);
}
if (!rm.getRMContext().getNodeLabelManager().checkAccess(callerUGI)) {
String msg = "User " + callerUGI.getShortUserName() + " not authorized"
+ " for post to ..." + operation;
throw new AuthorizationException(msg);
}
try {
rm.getRMContext().getNodeLabelManager()
.replaceLabelsOnNode(newLabelsForNode);
} catch (IOException e) {
throw new BadRequestException(e);
}
return Response.status(Status.OK).build();
}
@GET
@Path(RMWSConsts.GET_NODE_LABELS)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public NodeLabelsInfo getClusterNodeLabels(@Context HttpServletRequest hsr)
throws IOException {
initForReadableEndpoints();
List<NodeLabel> nodeLabels =
rm.getRMContext().getNodeLabelManager().getClusterNodeLabels();
NodeLabelsInfo ret = new NodeLabelsInfo(nodeLabels);
return ret;
}
@POST
@Path(RMWSConsts.ADD_NODE_LABELS)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public Response addToClusterNodeLabels(final NodeLabelsInfo newNodeLabels,
@Context HttpServletRequest hsr) throws Exception {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, false);
if (!rm.getRMContext().getNodeLabelManager().checkAccess(callerUGI)) {
String msg = "User " + callerUGI.getShortUserName() + " not authorized"
+ " for post to .../add-node-labels ";
throw new AuthorizationException(msg);
}
try {
rm.getRMContext().getNodeLabelManager()
.addToCluserNodeLabels(newNodeLabels.getNodeLabels());
} catch (IOException e) {
throw new BadRequestException(e);
}
return Response.status(Status.OK).build();
}
@POST
@Path(RMWSConsts.REMOVE_NODE_LABELS)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public Response removeFromCluserNodeLabels(
@QueryParam(RMWSConsts.LABELS) Set<String> oldNodeLabels,
@Context HttpServletRequest hsr) throws Exception {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, false);
if (!rm.getRMContext().getNodeLabelManager().checkAccess(callerUGI)) {
String msg = "User " + callerUGI.getShortUserName() + " not authorized"
+ " for post to .../remove-node-labels ";
throw new AuthorizationException(msg);
}
try {
rm.getRMContext().getNodeLabelManager()
.removeFromClusterNodeLabels(new HashSet<String>(oldNodeLabels));
} catch (IOException e) {
throw new BadRequestException(e);
}
return Response.status(Status.OK).build();
}
@GET
@Path(RMWSConsts.NODES_NODEID_GETLABELS)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public NodeLabelsInfo getLabelsOnNode(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.NODEID) String nodeId) throws IOException {
initForReadableEndpoints();
NodeId nid = ConverterUtils.toNodeIdWithDefaultPort(nodeId);
List<NodeLabel> labels = new ArrayList<NodeLabel>(
rm.getRMContext().getNodeLabelManager().getLabelsInfoByNode(nid));
return new NodeLabelsInfo(labels);
}
protected Response killApp(RMApp app, UserGroupInformation callerUGI,
HttpServletRequest hsr, final String diagnostic)
throws IOException, InterruptedException {
if (app == null) {
throw new IllegalArgumentException("app cannot be null");
}
String userName = callerUGI.getUserName();
final ApplicationId appid = app.getApplicationId();
KillApplicationResponse resp = null;
try {
resp = callerUGI
.doAs(new PrivilegedExceptionAction<KillApplicationResponse>() {
@Override
public KillApplicationResponse run()
throws IOException, YarnException {
KillApplicationRequest req =
KillApplicationRequest.newInstance(appid);
if (diagnostic != null) {
req.setDiagnostics(diagnostic);
}
return rm.getClientRMService().forceKillApplication(req);
}
});
} catch (UndeclaredThrowableException ue) {
// if the root cause is a permissions issue
// bubble that up to the user
if (ue.getCause() instanceof YarnException) {
YarnException ye = (YarnException) ue.getCause();
if (ye.getCause() instanceof AccessControlException) {
String appId = app.getApplicationId().toString();
String msg = "Unauthorized attempt to kill appid " + appId
+ " by remote user " + userName;
return Response.status(Status.FORBIDDEN).entity(msg).build();
} else {
throw ue;
}
} else {
throw ue;
}
}
AppState ret = new AppState();
ret.setState(app.getState().toString());
if (resp.getIsKillCompleted()) {
RMAuditLogger.logSuccess(userName, AuditConstants.KILL_APP_REQUEST,
"RMWebService", app.getApplicationId());
} else {
return Response.status(Status.ACCEPTED).entity(ret)
.header(HttpHeaders.LOCATION, hsr.getRequestURL()).build();
}
return Response.status(Status.OK).entity(ret).build();
}
@GET
@Path(RMWSConsts.APPS_APPID_PRIORITY)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public AppPriority getAppPriority(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException {
initForReadableEndpoints();
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
String userName = "UNKNOWN-USER";
if (callerUGI != null) {
userName = callerUGI.getUserName();
}
RMApp app = null;
try {
app = getRMAppForAppId(appId);
} catch (NotFoundException e) {
RMAuditLogger.logFailure(userName, AuditConstants.GET_APP_PRIORITY,
"UNKNOWN", "RMWebService",
"Trying to get priority of an absent application " + appId);
throw e;
}
AppPriority ret = new AppPriority();
ret.setPriority(app.getApplicationPriority().getPriority());
return ret;
}
@PUT
@Path(RMWSConsts.APPS_APPID_PRIORITY)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public Response updateApplicationPriority(AppPriority targetPriority,
@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, false);
if (targetPriority == null) {
throw new YarnException("Target Priority cannot be null");
}
String userName = callerUGI.getUserName();
RMApp app = null;
try {
app = getRMAppForAppId(appId);
} catch (NotFoundException e) {
RMAuditLogger.logFailure(userName, AuditConstants.UPDATE_APP_PRIORITY,
"UNKNOWN", "RMWebService",
"Trying to update priority an absent application " + appId);
throw e;
}
Priority priority = app.getApplicationPriority();
if (priority == null
|| priority.getPriority() != targetPriority.getPriority()) {
return modifyApplicationPriority(app, callerUGI,
targetPriority.getPriority());
}
return Response.status(Status.OK).entity(targetPriority).build();
}
private Response modifyApplicationPriority(final RMApp app,
UserGroupInformation callerUGI, final int appPriority)
throws IOException, InterruptedException {
String userName = callerUGI.getUserName();
try {
callerUGI.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws IOException, YarnException {
Priority priority = Priority.newInstance(appPriority);
UpdateApplicationPriorityRequest request =
UpdateApplicationPriorityRequest
.newInstance(app.getApplicationId(), priority);
rm.getClientRMService().updateApplicationPriority(request);
return null;
}
});
} catch (UndeclaredThrowableException ue) {
// if the root cause is a permissions issue
// bubble that up to the user
if (ue.getCause() instanceof YarnException) {
YarnException ye = (YarnException) ue.getCause();
if (ye.getCause() instanceof AccessControlException) {
String appId = app.getApplicationId().toString();
String msg = "Unauthorized attempt to change priority of appid "
+ appId + " by remote user " + userName;
return Response.status(Status.FORBIDDEN).entity(msg).build();
} else if (ye.getMessage().startsWith("Application in")
&& ye.getMessage().endsWith("state cannot be update priority.")) {
return Response.status(Status.BAD_REQUEST).entity(ye.getMessage())
.build();
} else {
throw ue;
}
} else {
throw ue;
}
}
AppPriority ret =
new AppPriority(app.getApplicationPriority().getPriority());
return Response.status(Status.OK).entity(ret).build();
}
@GET
@Path(RMWSConsts.APPS_APPID_QUEUE)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public AppQueue getAppQueue(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException {
initForReadableEndpoints();
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
String userName = "UNKNOWN-USER";
if (callerUGI != null) {
userName = callerUGI.getUserName();
}
RMApp app = null;
try {
app = getRMAppForAppId(appId);
} catch (NotFoundException e) {
RMAuditLogger.logFailure(userName, AuditConstants.GET_APP_QUEUE,
"UNKNOWN", "RMWebService",
"Trying to get queue of an absent application " + appId);
throw e;
}
AppQueue ret = new AppQueue();
ret.setQueue(app.getQueue());
return ret;
}
@PUT
@Path(RMWSConsts.APPS_APPID_QUEUE)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public Response updateAppQueue(AppQueue targetQueue,
@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, false);
String userName = callerUGI.getUserName();
RMApp app = null;
try {
app = getRMAppForAppId(appId);
} catch (NotFoundException e) {
RMAuditLogger.logFailure(userName, AuditConstants.MOVE_APP_REQUEST,
"UNKNOWN", "RMWebService",
"Trying to move an absent application " + appId);
throw e;
}
if (!app.getQueue().equals(targetQueue.getQueue())) {
// user is attempting to change queue.
return moveApp(app, callerUGI, targetQueue.getQueue());
}
AppQueue ret = new AppQueue();
ret.setQueue(app.getQueue());
return Response.status(Status.OK).entity(ret).build();
}
protected Response moveApp(RMApp app, UserGroupInformation callerUGI,
String targetQueue) throws IOException, InterruptedException {
if (app == null) {
throw new IllegalArgumentException("app cannot be null");
}
String userName = callerUGI.getUserName();
final ApplicationId appid = app.getApplicationId();
final String reqTargetQueue = targetQueue;
try {
callerUGI.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws IOException, YarnException {
MoveApplicationAcrossQueuesRequest req =
MoveApplicationAcrossQueuesRequest.newInstance(appid,
reqTargetQueue);
rm.getClientRMService().moveApplicationAcrossQueues(req);
return null;
}
});
} catch (UndeclaredThrowableException ue) {
// if the root cause is a permissions issue
// bubble that up to the user
if (ue.getCause() instanceof YarnException) {
YarnException ye = (YarnException) ue.getCause();
if (ye.getCause() instanceof AccessControlException) {
String appId = app.getApplicationId().toString();
String msg = "Unauthorized attempt to move appid " + appId
+ " by remote user " + userName;
return Response.status(Status.FORBIDDEN).entity(msg).build();
} else if (ye.getMessage().startsWith("App in")
&& ye.getMessage().endsWith("state cannot be moved.")) {
return Response.status(Status.BAD_REQUEST).entity(ye.getMessage())
.build();
} else {
throw ue;
}
} else {
throw ue;
}
}
AppQueue ret = new AppQueue();
ret.setQueue(app.getQueue());
return Response.status(Status.OK).entity(ret).build();
}
private RMApp getRMAppForAppId(String appId) {
ApplicationId id = WebAppUtils.parseApplicationId(recordFactory, appId);
RMApp app = rm.getRMContext().getRMApps().get(id);
if (app == null) {
throw new NotFoundException("app with id: " + appId + " not found");
}
return app;
}
private UserGroupInformation getCallerUserGroupInformation(
HttpServletRequest hsr, boolean usePrincipal) {
String remoteUser = hsr.getRemoteUser();
if (usePrincipal) {
Principal princ = hsr.getUserPrincipal();
remoteUser = princ == null ? null : princ.getName();
}
UserGroupInformation callerUGI = null;
if (remoteUser != null) {
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
}
return callerUGI;
}
private boolean isStaticUser(UserGroupInformation callerUGI) {
String staticUser =
conf.get(CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER,
CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER);
return staticUser.equals(callerUGI.getUserName());
}
@POST
@Path(RMWSConsts.APPS_NEW_APPLICATION)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public Response createNewApplication(@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, false);
NewApplication appId = createNewApplication();
return Response.status(Status.OK).entity(appId).build();
}
// reuse the code in ClientRMService to create new app
// get the new app id and submit app
// set location header with new app location
@POST
@Path(RMWSConsts.APPS)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public Response submitApplication(ApplicationSubmissionContextInfo newApp,
@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, false);
ApplicationSubmissionContext appContext =
RMWebAppUtil.createAppSubmissionContext(newApp, conf);
final SubmitApplicationRequest req =
SubmitApplicationRequest.newInstance(appContext);
try {
callerUGI
.doAs(new PrivilegedExceptionAction<SubmitApplicationResponse>() {
@Override
public SubmitApplicationResponse run()
throws IOException, YarnException {
return rm.getClientRMService().submitApplication(req);
}
});
} catch (UndeclaredThrowableException ue) {
if (ue.getCause() instanceof YarnException) {
throw new BadRequestException(ue.getCause().getMessage());
}
LOG.info("Submit app request failed", ue);
throw ue;
}
String url = hsr.getRequestURL() + "/" + newApp.getApplicationId();
return Response.status(Status.ACCEPTED).header(HttpHeaders.LOCATION, url)
.build();
}
/**
* Function that actually creates the ApplicationId by calling the
* ClientRMService
*
* @return returns structure containing the app-id and maximum resource
* capabilities
*/
private NewApplication createNewApplication() {
GetNewApplicationRequest req =
recordFactory.newRecordInstance(GetNewApplicationRequest.class);
GetNewApplicationResponse resp;
try {
resp = rm.getClientRMService().getNewApplication(req);
} catch (YarnException e) {
String msg = "Unable to create new app from RM web service";
LOG.error(msg, e);
throw new YarnRuntimeException(msg, e);
}
NewApplication appId =
new NewApplication(resp.getApplicationId().toString(),
new ResourceInfo(resp.getMaximumResourceCapability()));
return appId;
}
private void createKerberosUserGroupInformation(HttpServletRequest hsr,
UserGroupInformation callerUGI)
throws AuthorizationException, YarnException {
String authType = hsr.getAuthType();
if (!KerberosAuthenticationHandler.TYPE.equalsIgnoreCase(authType)) {
String msg = "Delegation token operations can only be carried out on a "
+ "Kerberos authenticated channel. Expected auth type is "
+ KerberosAuthenticationHandler.TYPE + ", got type " + authType;
throw new YarnException(msg);
}
if (hsr.getAttribute(
DelegationTokenAuthenticationHandler.DELEGATION_TOKEN_UGI_ATTRIBUTE) != null) {
String msg = "Delegation token operations cannot be carried out using "
+ "delegation token authentication.";
throw new YarnException(msg);
}
}
@POST
@Path(RMWSConsts.DELEGATION_TOKEN)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public Response postDelegationToken(DelegationToken tokenData,
@Context HttpServletRequest hsr) throws AuthorizationException,
IOException, InterruptedException, Exception {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, false);
try {
createKerberosUserGroupInformation(hsr, callerUGI);
callerUGI.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
} catch (YarnException ye) {
return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build();
}
return createDelegationToken(tokenData, hsr, callerUGI);
}
@POST
@Path(RMWSConsts.DELEGATION_TOKEN_EXPIRATION)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public Response postDelegationTokenExpiration(@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException,
Exception {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, false);
try {
createKerberosUserGroupInformation(hsr, callerUGI);
callerUGI.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
} catch (YarnException ye) {
return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build();
}
DelegationToken requestToken = new DelegationToken();
requestToken.setToken(extractToken(hsr).encodeToUrlString());
return renewDelegationToken(requestToken, hsr, callerUGI);
}
private Response createDelegationToken(DelegationToken tokenData,
HttpServletRequest hsr, UserGroupInformation callerUGI)
throws AuthorizationException, IOException, InterruptedException,
Exception {
final String renewer = tokenData.getRenewer();
GetDelegationTokenResponse resp;
try {
resp = callerUGI
.doAs(new PrivilegedExceptionAction<GetDelegationTokenResponse>() {
@Override
public GetDelegationTokenResponse run()
throws IOException, YarnException {
GetDelegationTokenRequest createReq =
GetDelegationTokenRequest.newInstance(renewer);
return rm.getClientRMService().getDelegationToken(createReq);
}
});
} catch (Exception e) {
LOG.info("Create delegation token request failed", e);
throw e;
}
Token<RMDelegationTokenIdentifier> tk =
new Token<RMDelegationTokenIdentifier>(
resp.getRMDelegationToken().getIdentifier().array(),
resp.getRMDelegationToken().getPassword().array(),
new Text(resp.getRMDelegationToken().getKind()),
new Text(resp.getRMDelegationToken().getService()));
RMDelegationTokenIdentifier identifier = tk.decodeIdentifier();
long currentExpiration = rm.getRMContext()
.getRMDelegationTokenSecretManager().getRenewDate(identifier);
DelegationToken respToken = new DelegationToken(tk.encodeToUrlString(),
renewer, identifier.getOwner().toString(), tk.getKind().toString(),
currentExpiration, identifier.getMaxDate());
return Response.status(Status.OK).entity(respToken).build();
}
private Response renewDelegationToken(DelegationToken tokenData,
HttpServletRequest hsr, UserGroupInformation callerUGI)
throws AuthorizationException, IOException, InterruptedException,
Exception {
Token<RMDelegationTokenIdentifier> token =
extractToken(tokenData.getToken());
org.apache.hadoop.yarn.api.records.Token dToken = BuilderUtils
.newDelegationToken(token.getIdentifier(), token.getKind().toString(),
token.getPassword(), token.getService().toString());
final RenewDelegationTokenRequest req =
RenewDelegationTokenRequest.newInstance(dToken);
RenewDelegationTokenResponse resp;
try {
resp = callerUGI
.doAs(new PrivilegedExceptionAction<RenewDelegationTokenResponse>() {
@Override
public RenewDelegationTokenResponse run() throws YarnException {
return rm.getClientRMService().renewDelegationToken(req);
}
});
} catch (UndeclaredThrowableException ue) {
if (ue.getCause() instanceof YarnException) {
if (ue.getCause().getCause() instanceof InvalidToken) {
throw new BadRequestException(ue.getCause().getCause().getMessage());
} else if (ue.getCause()
.getCause() instanceof org.apache.hadoop.security.AccessControlException) {
return Response.status(Status.FORBIDDEN)
.entity(ue.getCause().getCause().getMessage()).build();
}
LOG.info("Renew delegation token request failed", ue);
throw ue;
}
LOG.info("Renew delegation token request failed", ue);
throw ue;
} catch (Exception e) {
LOG.info("Renew delegation token request failed", e);
throw e;
}
long renewTime = resp.getNextExpirationTime();
DelegationToken respToken = new DelegationToken();
respToken.setNextExpirationTime(renewTime);
return Response.status(Status.OK).entity(respToken).build();
}
// For cancelling tokens, the encoded token is passed as a header
// There are two reasons for this -
// 1. Passing a request body as part of a DELETE request is not
// allowed by Jetty
// 2. Passing the encoded token as part of the url is not ideal
// since urls tend to get logged and anyone with access to
// the logs can extract tokens which are meant to be secret
@DELETE
@Path(RMWSConsts.DELEGATION_TOKEN)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public Response cancelDelegationToken(@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException,
Exception {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, false);
try {
createKerberosUserGroupInformation(hsr, callerUGI);
callerUGI.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
} catch (YarnException ye) {
return Response.status(Status.FORBIDDEN).entity(ye.getMessage()).build();
}
Token<RMDelegationTokenIdentifier> token = extractToken(hsr);
org.apache.hadoop.yarn.api.records.Token dToken = BuilderUtils
.newDelegationToken(token.getIdentifier(), token.getKind().toString(),
token.getPassword(), token.getService().toString());
final CancelDelegationTokenRequest req =
CancelDelegationTokenRequest.newInstance(dToken);
try {
callerUGI
.doAs(new PrivilegedExceptionAction<CancelDelegationTokenResponse>() {
@Override
public CancelDelegationTokenResponse run()
throws IOException, YarnException {
return rm.getClientRMService().cancelDelegationToken(req);
}
});
} catch (UndeclaredThrowableException ue) {
if (ue.getCause() instanceof YarnException) {
if (ue.getCause().getCause() instanceof InvalidToken) {
throw new BadRequestException(ue.getCause().getCause().getMessage());
} else if (ue.getCause()
.getCause() instanceof org.apache.hadoop.security.AccessControlException) {
return Response.status(Status.FORBIDDEN)
.entity(ue.getCause().getCause().getMessage()).build();
}
LOG.info("Renew delegation token request failed", ue);
throw ue;
}
LOG.info("Renew delegation token request failed", ue);
throw ue;
} catch (Exception e) {
LOG.info("Renew delegation token request failed", e);
throw e;
}
return Response.status(Status.OK).build();
}
private Token<RMDelegationTokenIdentifier> extractToken(
HttpServletRequest request) {
String encodedToken = request.getHeader(DELEGATION_TOKEN_HEADER);
if (encodedToken == null) {
String msg = "Header '" + DELEGATION_TOKEN_HEADER
+ "' containing encoded token not found";
throw new BadRequestException(msg);
}
return extractToken(encodedToken);
}
private Token<RMDelegationTokenIdentifier> extractToken(String encodedToken) {
Token<RMDelegationTokenIdentifier> token =
new Token<RMDelegationTokenIdentifier>();
try {
token.decodeFromUrlString(encodedToken);
} catch (Exception ie) {
String msg = "Could not decode encoded token";
throw new BadRequestException(msg);
}
return token;
}
@POST
@Path(RMWSConsts.RESERVATION_NEW)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public Response createNewReservation(@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, false);
NewReservation reservationId = createNewReservation();
return Response.status(Status.OK).entity(reservationId).build();
}
/**
* Function that actually creates the {@link ReservationId} by calling the
* ClientRMService.
*
* @return returns structure containing the {@link ReservationId}
* @throws IOException if creation fails.
*/
private NewReservation createNewReservation() throws IOException {
GetNewReservationRequest req =
recordFactory.newRecordInstance(GetNewReservationRequest.class);
GetNewReservationResponse resp;
try {
resp = rm.getClientRMService().getNewReservation(req);
} catch (YarnException e) {
String msg = "Unable to create new reservation from RM web service";
LOG.error(msg, e);
throw new YarnRuntimeException(msg, e);
}
NewReservation reservationId =
new NewReservation(resp.getReservationId().toString());
return reservationId;
}
@POST
@Path(RMWSConsts.RESERVATION_SUBMIT)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public Response submitReservation(ReservationSubmissionRequestInfo resContext,
@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, false);
final ReservationSubmissionRequest reservation =
createReservationSubmissionRequest(resContext);
try {
callerUGI
.doAs(new PrivilegedExceptionAction<ReservationSubmissionResponse>() {
@Override
public ReservationSubmissionResponse run()
throws IOException, YarnException {
return rm.getClientRMService().submitReservation(reservation);
}
});
} catch (UndeclaredThrowableException ue) {
if (ue.getCause() instanceof YarnException) {
throw new BadRequestException(ue.getCause().getMessage());
}
LOG.info("Submit reservation request failed", ue);
throw ue;
}
return Response.status(Status.ACCEPTED).build();
}
private ReservationSubmissionRequest createReservationSubmissionRequest(
ReservationSubmissionRequestInfo resContext) throws IOException {
// defending against a couple of common submission format problems
if (resContext == null) {
throw new BadRequestException(
"Input ReservationSubmissionContext should not be null");
}
ReservationDefinitionInfo resInfo = resContext.getReservationDefinition();
if (resInfo == null) {
throw new BadRequestException(
"Input ReservationDefinition should not be null");
}
ReservationRequestsInfo resReqsInfo = resInfo.getReservationRequests();
if (resReqsInfo == null || resReqsInfo.getReservationRequest() == null
|| resReqsInfo.getReservationRequest().size() == 0) {
throw new BadRequestException("The ReservationDefinition should"
+ " contain at least one ReservationRequest");
}
ReservationRequestInterpreter[] values =
ReservationRequestInterpreter.values();
ReservationRequestInterpreter resInt =
values[resReqsInfo.getReservationRequestsInterpreter()];
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
for (ReservationRequestInfo resReqInfo : resReqsInfo
.getReservationRequest()) {
ResourceInfo rInfo = resReqInfo.getCapability();
Resource capability =
Resource.newInstance(rInfo.getMemorySize(), rInfo.getvCores());
int numContainers = resReqInfo.getNumContainers();
int minConcurrency = resReqInfo.getMinConcurrency();
long duration = resReqInfo.getDuration();
ReservationRequest rr = ReservationRequest.newInstance(capability,
numContainers, minConcurrency, duration);
list.add(rr);
}
ReservationRequests reqs = ReservationRequests.newInstance(list, resInt);
ReservationDefinition rDef = ReservationDefinition.newInstance(
resInfo.getArrival(), resInfo.getDeadline(), reqs,
resInfo.getReservationName(), resInfo.getRecurrenceExpression(),
Priority.newInstance(resInfo.getPriority()));
ReservationId reservationId =
ReservationId.parseReservationId(resContext.getReservationId());
ReservationSubmissionRequest request = ReservationSubmissionRequest
.newInstance(rDef, resContext.getQueue(), reservationId);
return request;
}
@POST
@Path(RMWSConsts.RESERVATION_UPDATE)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public Response updateReservation(ReservationUpdateRequestInfo resContext,
@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, false);
final ReservationUpdateRequest reservation =
createReservationUpdateRequest(resContext);
ReservationUpdateResponseInfo resRespInfo;
try {
resRespInfo = callerUGI
.doAs(new PrivilegedExceptionAction<ReservationUpdateResponseInfo>() {
@Override
public ReservationUpdateResponseInfo run()
throws IOException, YarnException {
rm.getClientRMService().updateReservation(reservation);
return new ReservationUpdateResponseInfo();
}
});
} catch (UndeclaredThrowableException ue) {
if (ue.getCause() instanceof YarnException) {
throw new BadRequestException(ue.getCause().getMessage());
}
LOG.info("Update reservation request failed", ue);
throw ue;
}
return Response.status(Status.OK).entity(resRespInfo).build();
}
private ReservationUpdateRequest createReservationUpdateRequest(
ReservationUpdateRequestInfo resContext) throws IOException {
// defending against a couple of common submission format problems
if (resContext == null) {
throw new BadRequestException(
"Input ReservationSubmissionContext should not be null");
}
ReservationDefinitionInfo resInfo = resContext.getReservationDefinition();
if (resInfo == null) {
throw new BadRequestException(
"Input ReservationDefinition should not be null");
}
ReservationRequestsInfo resReqsInfo = resInfo.getReservationRequests();
if (resReqsInfo == null || resReqsInfo.getReservationRequest() == null
|| resReqsInfo.getReservationRequest().size() == 0) {
throw new BadRequestException("The ReservationDefinition should"
+ " contain at least one ReservationRequest");
}
if (resContext.getReservationId() == null) {
throw new BadRequestException(
"Update operations must specify an existing ReservaitonId");
}
ReservationRequestInterpreter[] values =
ReservationRequestInterpreter.values();
ReservationRequestInterpreter resInt =
values[resReqsInfo.getReservationRequestsInterpreter()];
List<ReservationRequest> list = new ArrayList<ReservationRequest>();
for (ReservationRequestInfo resReqInfo : resReqsInfo
.getReservationRequest()) {
ResourceInfo rInfo = resReqInfo.getCapability();
Resource capability =
Resource.newInstance(rInfo.getMemorySize(), rInfo.getvCores());
int numContainers = resReqInfo.getNumContainers();
int minConcurrency = resReqInfo.getMinConcurrency();
long duration = resReqInfo.getDuration();
ReservationRequest rr = ReservationRequest.newInstance(capability,
numContainers, minConcurrency, duration);
list.add(rr);
}
ReservationRequests reqs = ReservationRequests.newInstance(list, resInt);
ReservationDefinition rDef = ReservationDefinition.newInstance(
resInfo.getArrival(), resInfo.getDeadline(), reqs,
resInfo.getReservationName(), resInfo.getRecurrenceExpression(),
Priority.newInstance(resInfo.getPriority()));
ReservationUpdateRequest request = ReservationUpdateRequest.newInstance(
rDef, ReservationId.parseReservationId(resContext.getReservationId()));
return request;
}
@POST
@Path(RMWSConsts.RESERVATION_DELETE)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public Response deleteReservation(ReservationDeleteRequestInfo resContext,
@Context HttpServletRequest hsr)
throws AuthorizationException, IOException, InterruptedException {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, false);
final ReservationDeleteRequest reservation =
createReservationDeleteRequest(resContext);
ReservationDeleteResponseInfo resRespInfo;
try {
resRespInfo = callerUGI
.doAs(new PrivilegedExceptionAction<ReservationDeleteResponseInfo>() {
@Override
public ReservationDeleteResponseInfo run()
throws IOException, YarnException {
rm.getClientRMService().deleteReservation(reservation);
return new ReservationDeleteResponseInfo();
}
});
} catch (UndeclaredThrowableException ue) {
if (ue.getCause() instanceof YarnException) {
throw new BadRequestException(ue.getCause().getMessage());
}
LOG.info("Update reservation request failed", ue);
throw ue;
}
return Response.status(Status.OK).entity(resRespInfo).build();
}
private ReservationDeleteRequest createReservationDeleteRequest(
ReservationDeleteRequestInfo resContext) throws IOException {
ReservationDeleteRequest request = ReservationDeleteRequest.newInstance(
ReservationId.parseReservationId(resContext.getReservationId()));
return request;
}
@GET
@Path(RMWSConsts.RESERVATION_LIST)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public Response listReservation(
@QueryParam(RMWSConsts.QUEUE) @DefaultValue(DEFAULT_QUEUE) String queue,
@QueryParam(RMWSConsts.RESERVATION_ID) @DefaultValue(DEFAULT_RESERVATION_ID) String reservationId,
@QueryParam(RMWSConsts.START_TIME) @DefaultValue(DEFAULT_START_TIME) long startTime,
@QueryParam(RMWSConsts.END_TIME) @DefaultValue(DEFAULT_END_TIME) long endTime,
@QueryParam(RMWSConsts.INCLUDE_RESOURCE) @DefaultValue(DEFAULT_INCLUDE_RESOURCE) boolean includeResourceAllocations,
@Context HttpServletRequest hsr) throws Exception {
initForReadableEndpoints();
final ReservationListRequest request = ReservationListRequest.newInstance(
queue, reservationId, startTime, endTime, includeResourceAllocations);
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
if (callerUGI == null) {
throw new AuthorizationException(
"Unable to obtain user name, " + "user not authenticated");
}
if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) {
String msg = "The default static user cannot carry out this operation.";
return Response.status(Status.FORBIDDEN).entity(msg).build();
}
ReservationListResponse resRespInfo;
try {
resRespInfo = callerUGI
.doAs(new PrivilegedExceptionAction<ReservationListResponse>() {
@Override
public ReservationListResponse run()
throws IOException, YarnException {
return rm.getClientRMService().listReservations(request);
}
});
} catch (UndeclaredThrowableException ue) {
if (ue.getCause() instanceof YarnException) {
throw new BadRequestException(ue.getCause().getMessage());
}
LOG.info("List reservation request failed", ue);
throw ue;
}
ReservationListInfo resResponse =
new ReservationListInfo(resRespInfo, includeResourceAllocations);
return Response.status(Status.OK).entity(resResponse).build();
}
@GET
@Path(RMWSConsts.APPS_TIMEOUTS_TYPE)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public AppTimeoutInfo getAppTimeout(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId,
@PathParam(RMWSConsts.TYPE) String type) throws AuthorizationException {
initForReadableEndpoints();
RMApp app = validateAppTimeoutRequest(hsr, appId);
ApplicationTimeoutType appTimeoutType = parseTimeoutType(type);
Long timeoutValue = app.getApplicationTimeouts().get(appTimeoutType);
AppTimeoutInfo timeout =
constructAppTimeoutDao(appTimeoutType, timeoutValue);
return timeout;
}
private RMApp validateAppTimeoutRequest(HttpServletRequest hsr,
String appId) {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
String userName = "UNKNOWN-USER";
if (callerUGI != null) {
userName = callerUGI.getUserName();
}
if (UserGroupInformation.isSecurityEnabled() && isStaticUser(callerUGI)) {
String msg = "The default static user cannot carry out this operation.";
RMAuditLogger.logFailure(userName, AuditConstants.GET_APP_TIMEOUTS,
"UNKNOWN", "RMWebService", msg);
throw new ForbiddenException(msg);
}
RMApp app = null;
try {
app = getRMAppForAppId(appId);
} catch (NotFoundException e) {
RMAuditLogger.logFailure(userName, AuditConstants.GET_APP_TIMEOUTS,
"UNKNOWN", "RMWebService",
"Trying to get timeouts of an absent application " + appId);
throw e;
}
return app;
}
@GET
@Path(RMWSConsts.APPS_TIMEOUTS)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public AppTimeoutsInfo getAppTimeouts(@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException {
initForReadableEndpoints();
RMApp app = validateAppTimeoutRequest(hsr, appId);
AppTimeoutsInfo timeouts = new AppTimeoutsInfo();
Map<ApplicationTimeoutType, Long> applicationTimeouts =
app.getApplicationTimeouts();
if (applicationTimeouts.isEmpty()) {
// If application is not set timeout, lifetime should be sent as default
// with expiryTime=UNLIMITED and remainingTime=-1
timeouts
.add(constructAppTimeoutDao(ApplicationTimeoutType.LIFETIME, null));
} else {
for (Entry<ApplicationTimeoutType, Long> timeout : app
.getApplicationTimeouts().entrySet()) {
AppTimeoutInfo timeoutInfo =
constructAppTimeoutDao(timeout.getKey(), timeout.getValue());
timeouts.add(timeoutInfo);
}
}
return timeouts;
}
private ApplicationTimeoutType parseTimeoutType(String type) {
try {
// enum string is in the uppercase
return ApplicationTimeoutType
.valueOf(StringUtils.toUpperCase(type.trim()));
} catch (RuntimeException e) {
ApplicationTimeoutType[] typeArray = ApplicationTimeoutType.values();
String allAppTimeoutTypes = Arrays.toString(typeArray);
throw new BadRequestException("Invalid application-state " + type.trim()
+ " specified. It should be one of " + allAppTimeoutTypes);
}
}
private AppTimeoutInfo constructAppTimeoutDao(ApplicationTimeoutType type,
Long timeoutInMillis) {
AppTimeoutInfo timeout = new AppTimeoutInfo();
timeout.setTimeoutType(type);
if (timeoutInMillis != null) {
timeout.setExpiryTime(Times.formatISO8601(timeoutInMillis.longValue()));
timeout.setRemainingTime(
Math.max((timeoutInMillis - System.currentTimeMillis()) / 1000, 0));
}
return timeout;
}
@PUT
@Path(RMWSConsts.APPS_TIMEOUT)
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Override
public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
@Context HttpServletRequest hsr,
@PathParam(RMWSConsts.APPID) String appId) throws AuthorizationException,
YarnException, InterruptedException, IOException {
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, false);
String userName = callerUGI.getUserName();
RMApp app = null;
try {
app = getRMAppForAppId(appId);
} catch (NotFoundException e) {
RMAuditLogger.logFailure(userName, AuditConstants.UPDATE_APP_TIMEOUTS,
"UNKNOWN", "RMWebService",
"Trying to update timeout of an absent application " + appId);
throw e;
}
return updateApplicationTimeouts(app, callerUGI, appTimeout);
}
private Response updateApplicationTimeouts(final RMApp app,
UserGroupInformation callerUGI, final AppTimeoutInfo appTimeout)
throws IOException, InterruptedException {
if (appTimeout.getTimeoutType() == null
|| appTimeout.getExpireTime() == null) {
return Response.status(Status.BAD_REQUEST)
.entity("Timeout type or ExpiryTime is null.").build();
}
String userName = callerUGI.getUserName();
try {
callerUGI.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws IOException, YarnException {
UpdateApplicationTimeoutsRequest request =
UpdateApplicationTimeoutsRequest
.newInstance(app.getApplicationId(), Collections.singletonMap(
appTimeout.getTimeoutType(), appTimeout.getExpireTime()));
rm.getClientRMService().updateApplicationTimeouts(request);
return null;
}
});
} catch (UndeclaredThrowableException ue) {
// if the root cause is a permissions issue
// bubble that up to the user
if (ue.getCause() instanceof YarnException) {
YarnException ye = (YarnException) ue.getCause();
if (ye.getCause() instanceof AccessControlException) {
String appId = app.getApplicationId().toString();
String msg = "Unauthorized attempt to change timeout of app " + appId
+ " by remote user " + userName;
return Response.status(Status.FORBIDDEN).entity(msg).build();
} else if (ye.getCause() instanceof ParseException) {
return Response.status(Status.BAD_REQUEST).entity(ye.getMessage())
.build();
} else {
throw ue;
}
} else {
throw ue;
}
}
AppTimeoutInfo timeout = constructAppTimeoutDao(appTimeout.getTimeoutType(),
app.getApplicationTimeouts().get(appTimeout.getTimeoutType()));
return Response.status(Status.OK).entity(timeout).build();
}
@Override
protected ApplicationReport getApplicationReport(
GetApplicationReportRequest request) throws YarnException, IOException {
return rm.getClientRMService().getApplicationReport(request)
.getApplicationReport();
}
@Override
protected List<ApplicationReport> getApplicationsReport(
final GetApplicationsRequest request) throws YarnException, IOException {
return rm.getClientRMService().getApplications(request)
.getApplicationList();
}
@Override
protected ApplicationAttemptReport getApplicationAttemptReport(
GetApplicationAttemptReportRequest request)
throws YarnException, IOException {
return rm.getClientRMService().getApplicationAttemptReport(request)
.getApplicationAttemptReport();
}
@Override
protected List<ApplicationAttemptReport> getApplicationAttemptsReport(
GetApplicationAttemptsRequest request) throws YarnException, IOException {
return rm.getClientRMService().getApplicationAttempts(request)
.getApplicationAttemptList();
}
@Override
protected ContainerReport getContainerReport(
GetContainerReportRequest request) throws YarnException, IOException {
return rm.getClientRMService().getContainerReport(request)
.getContainerReport();
}
@Override
protected List<ContainerReport> getContainersReport(
GetContainersRequest request) throws YarnException, IOException {
return rm.getClientRMService().getContainers(request).getContainerList();
}
@PUT
@Path("/scheduler-conf")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
@Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public synchronized Response updateSchedulerConfiguration(
final SchedConfUpdateInfo mutationInfo, @Context HttpServletRequest hsr)
throws AuthorizationException, InterruptedException {
final UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr,
true);
initForWritableEndpoints(callerUGI, true);
final ResourceScheduler scheduler = rm.getResourceScheduler();
if (scheduler instanceof MutableConfScheduler && ((MutableConfScheduler)
scheduler).isConfigurationMutable()) {
try {
callerUGI.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
MutableConfigurationProvider provider = ((MutableConfScheduler)
scheduler).getMutableConfProvider();
if (!provider.getAclMutationPolicy().isMutationAllowed(callerUGI,
mutationInfo)) {
throw new org.apache.hadoop.security.AccessControlException("User"
+ " is not admin of all modified queues.");
}
provider.logAndApplyMutation(callerUGI, mutationInfo);
try {
rm.getRMContext().getRMAdminService().refreshQueues();
} catch (IOException | YarnException e) {
provider.confirmPendingMutation(false);
throw e;
}
provider.confirmPendingMutation(true);
return null;
}
});
} catch (IOException e) {
LOG.error("Exception thrown when modifying configuration.", e);
return Response.status(Status.BAD_REQUEST).entity(e.getMessage())
.build();
}
return Response.status(Status.OK).entity("Configuration change " +
"successfully applied.").build();
} else {
return Response.status(Status.BAD_REQUEST)
.entity("Configuration change only supported by " +
"MutableConfScheduler.")
.build();
}
}
@GET
@Path("/scheduler-conf")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public Response getSchedulerConfiguration(@Context HttpServletRequest hsr)
throws AuthorizationException {
// Only admin user is allowed to read scheduler conf,
// in order to avoid leaking sensitive info, such as ACLs
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
initForWritableEndpoints(callerUGI, true);
ResourceScheduler scheduler = rm.getResourceScheduler();
if (scheduler instanceof MutableConfScheduler
&& ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
MutableConfigurationProvider mutableConfigurationProvider =
((MutableConfScheduler) scheduler).getMutableConfProvider();
// We load the cached configuration from configuration store,
// this should be the conf properties used by the scheduler.
Configuration schedulerConf = mutableConfigurationProvider
.getConfiguration();
return Response.status(Status.OK)
.entity(new ConfInfo(schedulerConf))
.build();
} else {
return Response.status(Status.BAD_REQUEST).entity(
"This API only supports to retrieve scheduler configuration"
+ " from a mutable-conf scheduler, underneath scheduler "
+ scheduler.getClass().getSimpleName()
+ " is not an instance of MutableConfScheduler")
.build();
}
}
}