blob: c95ce3e557eb8b1e6098381751587129a259a686 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs.webapp;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriInfo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebServices;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ConfInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobCounterInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobTaskAttemptCounterInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobTaskCounterInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ReduceTaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptsInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TasksInfo;
import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptsInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.HistoryInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.WebApp;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
@Path("/ws/v1/history")
public class HsWebServices {
private final HistoryContext ctx;
private WebApp webapp;
private @Context HttpServletResponse response;
@Context
UriInfo uriInfo;
@Inject
public HsWebServices(final HistoryContext ctx, final Configuration conf,
final WebApp webapp) {
this.ctx = ctx;
this.webapp = webapp;
}
private boolean hasAccess(Job job, HttpServletRequest request) {
String remoteUser = request.getRemoteUser();
if (remoteUser != null) {
return job.checkAccess(UserGroupInformation.createRemoteUser(remoteUser),
JobACL.VIEW_JOB);
}
return true;
}
private void checkAccess(Job job, HttpServletRequest request) {
if (!hasAccess(job, request)) {
throw new WebApplicationException(Status.UNAUTHORIZED);
}
}
private void init() {
//clear content type
response.setContentType(null);
}
@VisibleForTesting
void setResponse(HttpServletResponse response) {
this.response = response;
}
@GET
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public HistoryInfo get() {
return getHistoryInfo();
}
@GET
@Path("/info")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public HistoryInfo getHistoryInfo() {
init();
return new HistoryInfo();
}
@GET
@Path("/mapreduce/jobs")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public JobsInfo getJobs(@QueryParam("user") String userQuery,
@QueryParam("limit") String count,
@QueryParam("state") String stateQuery,
@QueryParam("queue") String queueQuery,
@QueryParam("startedTimeBegin") String startedBegin,
@QueryParam("startedTimeEnd") String startedEnd,
@QueryParam("finishedTimeBegin") String finishBegin,
@QueryParam("finishedTimeEnd") String finishEnd) {
Long countParam = null;
init();
if (count != null && !count.isEmpty()) {
try {
countParam = Long.parseLong(count);
} catch (NumberFormatException e) {
throw new BadRequestException(e.getMessage());
}
if (countParam <= 0) {
throw new BadRequestException("limit value must be greater then 0");
}
}
Long sBegin = null;
if (startedBegin != null && !startedBegin.isEmpty()) {
try {
sBegin = Long.parseLong(startedBegin);
} catch (NumberFormatException e) {
throw new BadRequestException("Invalid number format: " + e.getMessage());
}
if (sBegin < 0) {
throw new BadRequestException("startedTimeBegin must be greater than 0");
}
}
Long sEnd = null;
if (startedEnd != null && !startedEnd.isEmpty()) {
try {
sEnd = Long.parseLong(startedEnd);
} catch (NumberFormatException e) {
throw new BadRequestException("Invalid number format: " + e.getMessage());
}
if (sEnd < 0) {
throw new BadRequestException("startedTimeEnd must be greater than 0");
}
}
if (sBegin != null && sEnd != null && sBegin > sEnd) {
throw new BadRequestException(
"startedTimeEnd must be greater than startTimeBegin");
}
Long fBegin = null;
if (finishBegin != null && !finishBegin.isEmpty()) {
try {
fBegin = Long.parseLong(finishBegin);
} catch (NumberFormatException e) {
throw new BadRequestException("Invalid number format: " + e.getMessage());
}
if (fBegin < 0) {
throw new BadRequestException("finishedTimeBegin must be greater than 0");
}
}
Long fEnd = null;
if (finishEnd != null && !finishEnd.isEmpty()) {
try {
fEnd = Long.parseLong(finishEnd);
} catch (NumberFormatException e) {
throw new BadRequestException("Invalid number format: " + e.getMessage());
}
if (fEnd < 0) {
throw new BadRequestException("finishedTimeEnd must be greater than 0");
}
}
if (fBegin != null && fEnd != null && fBegin > fEnd) {
throw new BadRequestException(
"finishedTimeEnd must be greater than finishedTimeBegin");
}
JobState jobState = null;
if (stateQuery != null) {
jobState = JobState.valueOf(stateQuery);
}
return ctx.getPartialJobs(0l, countParam, userQuery, queueQuery,
sBegin, sEnd, fBegin, fEnd, jobState);
}
@GET
@Path("/mapreduce/jobs/{jobid}")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public JobInfo getJob(@Context HttpServletRequest hsr,
@PathParam("jobid") String jid) {
init();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
checkAccess(job, hsr);
return new JobInfo(job);
}
@GET
@Path("/mapreduce/jobs/{jobid}/jobattempts")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public AMAttemptsInfo getJobAttempts(@PathParam("jobid") String jid) {
init();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
AMAttemptsInfo amAttempts = new AMAttemptsInfo();
for (AMInfo amInfo : job.getAMInfos()) {
AMAttemptInfo attempt = new AMAttemptInfo(amInfo, MRApps.toString(job
.getID()), job.getUserName(), uriInfo.getBaseUri().toString(),
webapp.name());
amAttempts.add(attempt);
}
return amAttempts;
}
@GET
@Path("/mapreduce/jobs/{jobid}/counters")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public JobCounterInfo getJobCounters(@Context HttpServletRequest hsr,
@PathParam("jobid") String jid) {
init();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
checkAccess(job, hsr);
return new JobCounterInfo(this.ctx, job);
}
@GET
@Path("/mapreduce/jobs/{jobid}/conf")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public ConfInfo getJobConf(@Context HttpServletRequest hsr,
@PathParam("jobid") String jid) {
init();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
checkAccess(job, hsr);
ConfInfo info;
try {
info = new ConfInfo(job);
} catch (IOException e) {
throw new NotFoundException("unable to load configuration for job: "
+ jid);
}
return info;
}
@GET
@Path("/mapreduce/jobs/{jobid}/tasks")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public TasksInfo getJobTasks(@Context HttpServletRequest hsr,
@PathParam("jobid") String jid, @QueryParam("type") String type) {
init();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
checkAccess(job, hsr);
TasksInfo allTasks = new TasksInfo();
for (Task task : job.getTasks().values()) {
TaskType ttype = null;
if (type != null && !type.isEmpty()) {
try {
ttype = MRApps.taskType(type);
} catch (YarnRuntimeException e) {
throw new BadRequestException("tasktype must be either m or r");
}
}
if (ttype != null && task.getType() != ttype) {
continue;
}
allTasks.add(new TaskInfo(task));
}
return allTasks;
}
@GET
@Path("/mapreduce/jobs/{jobid}/tasks/{taskid}")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public TaskInfo getJobTask(@Context HttpServletRequest hsr,
@PathParam("jobid") String jid, @PathParam("taskid") String tid) {
init();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
checkAccess(job, hsr);
Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
return new TaskInfo(task);
}
@GET
@Path("/mapreduce/jobs/{jobid}/tasks/{taskid}/counters")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public JobTaskCounterInfo getSingleTaskCounters(
@Context HttpServletRequest hsr, @PathParam("jobid") String jid,
@PathParam("taskid") String tid) {
init();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
checkAccess(job, hsr);
TaskId taskID = MRApps.toTaskID(tid);
if (taskID == null) {
throw new NotFoundException("taskid " + tid + " not found or invalid");
}
Task task = job.getTask(taskID);
if (task == null) {
throw new NotFoundException("task not found with id " + tid);
}
return new JobTaskCounterInfo(task);
}
@GET
@Path("/mapreduce/jobs/{jobid}/tasks/{taskid}/attempts")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public TaskAttemptsInfo getJobTaskAttempts(@Context HttpServletRequest hsr,
@PathParam("jobid") String jid, @PathParam("taskid") String tid) {
init();
TaskAttemptsInfo attempts = new TaskAttemptsInfo();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
checkAccess(job, hsr);
Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
for (TaskAttempt ta : task.getAttempts().values()) {
if (ta != null) {
if (task.getType() == TaskType.REDUCE) {
attempts.add(new ReduceTaskAttemptInfo(ta, task.getType()));
} else {
attempts.add(new TaskAttemptInfo(ta, task.getType(), false));
}
}
}
return attempts;
}
@GET
@Path("/mapreduce/jobs/{jobid}/tasks/{taskid}/attempts/{attemptid}")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public TaskAttemptInfo getJobTaskAttemptId(@Context HttpServletRequest hsr,
@PathParam("jobid") String jid, @PathParam("taskid") String tid,
@PathParam("attemptid") String attId) {
init();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
checkAccess(job, hsr);
Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
TaskAttempt ta = AMWebServices.getTaskAttemptFromTaskAttemptString(attId,
task);
if (task.getType() == TaskType.REDUCE) {
return new ReduceTaskAttemptInfo(ta, task.getType());
} else {
return new TaskAttemptInfo(ta, task.getType(), false);
}
}
@GET
@Path("/mapreduce/jobs/{jobid}/tasks/{taskid}/attempts/{attemptid}/counters")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public JobTaskAttemptCounterInfo getJobTaskAttemptIdCounters(
@Context HttpServletRequest hsr, @PathParam("jobid") String jid,
@PathParam("taskid") String tid, @PathParam("attemptid") String attId) {
init();
Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
checkAccess(job, hsr);
Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
TaskAttempt ta = AMWebServices.getTaskAttemptFromTaskAttemptString(attId,
task);
return new JobTaskAttemptCounterInfo(ta);
}
}