/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.eventdb.webservice;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

import javax.servlet.ServletContext;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;

import org.apache.ambari.eventdb.db.PostgresConnector;
import org.apache.ambari.eventdb.model.DataTable;
import org.apache.ambari.eventdb.model.Jobs;
import org.apache.ambari.eventdb.model.Jobs.JobDBEntry;
import org.apache.ambari.eventdb.model.TaskAttempt;
import org.apache.ambari.eventdb.model.TaskData;
import org.apache.ambari.eventdb.model.TaskData.Point;
import org.apache.ambari.eventdb.model.TaskLocalityData;
import org.apache.ambari.eventdb.model.TaskLocalityData.DataPoint;
import org.apache.ambari.eventdb.model.Workflows;
import org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry;
import org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry.WorkflowFields;
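
/**
 * REST endpoints under /jobhistory that expose MapReduce job-history data
 * (workflows, jobs, task timelines, and task locality) as JSON.
 */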
@Path("/jobhistory")
public class WorkflowJsonService {
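  // Configuration keys for the events database. Note that in this version
  // getConnector() connects with the DEFAULT_* values directly; the keys
  // appear intended for lookup via the injected ServletContext.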
private static final String PREFIX = "eventdb.";
private static final String HOSTNAME = PREFIX + "db.hostname";
private static final String DBNAME = PREFIX + "db.name";
private static final String USERNAME = PREFIX + "db.user";
private static final String PASSWORD = PREFIX + "db.password";
private static final String DEFAULT_HOSTNAME = "localhost";
private static final String DEFAULT_DBNAME = "ambarirca";
private static final String DEFAULT_USERNAME = "mapred";
private static final String DEFAULT_PASSWORD = "mapred";
private static final Workflows EMPTY_WORKFLOWS = new Workflows();
private static final List<JobDBEntry> EMPTY_JOBS = Collections.emptyList();
  // One-time setup of the shared empty-workflows sentinel; this must be a
  // static initializer since it populates the static EMPTY_WORKFLOWS field.
  static {
    List<WorkflowDBEntry> emptyWorkflows = Collections.emptyList();
    EMPTY_WORKFLOWS.setWorkflows(emptyWorkflows);
  }
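
  /**
   * Opens a new connection to the Postgres events database using the default
   * connection settings. Callers are responsible for closing it.
   */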
PostgresConnector getConnector() throws IOException {
return new PostgresConnector(DEFAULT_HOSTNAME, DEFAULT_DBNAME, DEFAULT_USERNAME, DEFAULT_PASSWORD);
}
@Context
ServletContext servletContext;
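
  /**
   * Lists all workflows, optionally sorted.
   * Example: GET /jobhistory/workflow?orderBy=elapsedTime&sortDir=DESC&offset=0&limit=50
   */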
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/workflow")
public Workflows getWorkflows(@QueryParam("orderBy") String field, @DefaultValue(PostgresConnector.SORT_ASC) @QueryParam("sortDir") String sortDir,
@DefaultValue("0") @QueryParam("offset") int offset, @DefaultValue("-1") @QueryParam("limit") int limit) {
Workflows workflows = EMPTY_WORKFLOWS;
PostgresConnector conn = null;
try {
conn = getConnector();
      if (field == null) {
        workflows = conn.fetchWorkflows();
      } else {
        field = field.toUpperCase();
        // The UI exposes the duration column as "elapsedTime".
        if ("ELAPSEDTIME".equals(field)) {
          field = "DURATION";
        }
        // Note: WorkflowFields.valueOf() throws IllegalArgumentException for
        // unrecognized field names, which is not caught here.
        workflows = conn.fetchWorkflows(WorkflowFields.valueOf(field), PostgresConnector.SORT_ASC.equalsIgnoreCase(sortDir), offset, limit);
      }
} catch (IOException e) {
e.printStackTrace();
workflows = EMPTY_WORKFLOWS;
} finally {
if (conn != null) {
conn.close();
}
}
return workflows;
}
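
  /**
   * Serves the workflow listing for a DataTables-driven UI table; the
   * iDisplayStart, iDisplayLength, sEcho, iSortCol_0, and sSortDir_0
   * parameters follow the DataTables server-side processing protocol, and the
   * remaining parameters filter by column value or numeric range.
   */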
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/datatable")
public DataTable getWorkflowDataTable(@DefaultValue("0") @QueryParam("iDisplayStart") int start,
@DefaultValue("10") @QueryParam("iDisplayLength") int amount, @QueryParam("sSearch") String searchTerm, @DefaultValue("0") @QueryParam("sEcho") int echo,
@DefaultValue("0") @QueryParam("iSortCol_0") int col, @DefaultValue(PostgresConnector.SORT_ASC) @QueryParam("sSortDir_0") String sdir,
@QueryParam("sSearch_0") String workflowId, @QueryParam("sSearch_1") String workflowName, @QueryParam("sSearch_2") String workflowType,
@QueryParam("sSearch_3") String userName, @DefaultValue("-1") @QueryParam("minJobs") int minJobs, @DefaultValue("-1") @QueryParam("maxJobs") int maxJobs,
@DefaultValue("-1") @QueryParam("minInputBytes") long minInputBytes, @DefaultValue("-1") @QueryParam("maxInputBytes") long maxInputBytes,
@DefaultValue("-1") @QueryParam("minOutputBytes") long minOutputBytes, @DefaultValue("-1") @QueryParam("maxOutputBytes") long maxOutputBytes,
@DefaultValue("-1") @QueryParam("minDuration") long minDuration, @DefaultValue("-1") @QueryParam("maxDuration") long maxDuration,
@DefaultValue("-1") @QueryParam("minStartTime") long minStartTime, @DefaultValue("-1") @QueryParam("maxStartTime") long maxStartTime) {
    if (start < 0) {
      start = 0;
    }
    // Clamp the page size to the range the UI supports.
    if (amount < 10 || amount > 100) {
      amount = 10;
    }
    boolean sortAscending = PostgresConnector.SORT_ASC.equalsIgnoreCase(sdir);
WorkflowFields field = null;
switch (col) {
case 0: // workflowId
field = WorkflowFields.WORKFLOWID;
break;
case 1: // workflowName
field = WorkflowFields.WORKFLOWNAME;
break;
      case 2: // workflowType (no dedicated sort field; falls back to workflowId)
        field = WorkflowFields.WORKFLOWID;
        break;
case 3: // userName
field = WorkflowFields.USERNAME;
break;
case 4: // numJobsTotal
field = WorkflowFields.NUMJOBSTOTAL;
break;
case 5: // inputBytes
field = WorkflowFields.INPUTBYTES;
break;
case 6: // outputBytes
field = WorkflowFields.OUTPUTBYTES;
break;
case 7: // duration
field = WorkflowFields.DURATION;
break;
case 8: // startTime
field = WorkflowFields.STARTTIME;
break;
default:
field = WorkflowFields.WORKFLOWID;
}
    DataTable table = null; // remains null if the query fails
PostgresConnector conn = null;
try {
conn = getConnector();
table = conn.fetchWorkflows(start, amount, searchTerm, echo, field, sortAscending, workflowId, workflowName, workflowType, userName, minJobs, maxJobs,
minInputBytes, maxInputBytes, minOutputBytes, maxOutputBytes, minDuration, maxDuration, minStartTime, maxStartTime);
} catch (IOException e) {
e.printStackTrace();
} finally {
if (conn != null) {
conn.close();
}
}
return table;
}
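
  /**
   * Lists the jobs belonging to the given workflow.
   * Example: GET /jobhistory/job?workflowId=<workflowId>
   */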
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/job")
public Jobs getJobs(@QueryParam("workflowId") String workflowId) {
Jobs jobs = new Jobs();
PostgresConnector conn = null;
try {
conn = getConnector();
jobs.setJobs(conn.fetchJobDetails(workflowId));
} catch (IOException e) {
e.printStackTrace();
jobs.setJobs(EMPTY_JOBS);
} finally {
if (conn != null) {
conn.close();
}
}
return jobs;
}
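
  /**
   * Returns time series of concurrently running map, shuffle, and reduce
   * attempts for a job, sampled at roughly "width" evenly spaced steps
   * between the job's submit and finish times.
   */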
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/task")
public TaskData getTaskDetails(@QueryParam("jobId") String jobId, @QueryParam("width") int steps) {
TaskData points = new TaskData();
PostgresConnector conn = null;
try {
conn = getConnector();
long[] times = conn.fetchJobStartStopTimes(jobId);
if (times != null) {
        double submitTimeSecs = times[0] / 1000.0;
        double finishTimeSecs = times[1] / 1000.0;
        // Sample the job's lifetime at "width" evenly spaced steps, but never
        // finer than one second.
        double step = (finishTimeSecs - submitTimeSecs) / steps;
        if (step < 1) {
          step = 1;
        }
getMapDetails(conn, points, jobId, submitTimeSecs, finishTimeSecs, step);
getReduceDetails(conn, points, jobId, submitTimeSecs, finishTimeSecs, step);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (conn != null) {
conn.close();
}
}
return points;
}
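
  /**
   * Returns scatter-plot data of task attempts grouped by locality
   * (node-local, rack-local, off-switch), with point radii between minr and
   * maxr scaled by each attempt's I/O volume.
   */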
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/tasklocality")
public TaskLocalityData getTaskLocalityDetails(@QueryParam("jobId") String jobId, @DefaultValue("4") @QueryParam("minr") int minr,
@DefaultValue("24") @QueryParam("maxr") int maxr) {
    if (maxr < minr) {
      maxr = minr;
    }
TaskLocalityData data = new TaskLocalityData();
PostgresConnector conn = null;
try {
conn = getConnector();
long[] times = conn.fetchJobStartStopTimes(jobId);
if (times != null) {
getTaskAttemptsByLocality(conn, jobId, times[0], times[1], data, minr, maxr);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (conn != null) {
conn.close();
}
}
return data;
}
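
  /**
   * For each time step, counts the map attempts whose lifetime overlaps that
   * step and stores the resulting series on the TaskData object.
   */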
private static void getMapDetails(PostgresConnector conn, TaskData points, String jobId, double submitTimeSecs, double finishTimeSecs, double step)
throws IOException {
List<TaskAttempt> taskAttempts = conn.fetchTaskAttempts(jobId, "MAP");
List<Point> mapPoints = new ArrayList<Point>();
for (double time = submitTimeSecs; time < finishTimeSecs; time += step) {
int numTasks = 0;
      for (TaskAttempt taskAttempt : taskAttempts) {
        // Count the attempt if its [start, finish] interval overlaps this time step.
        if ((taskAttempt.getStartTime() / 1000.0) <= (time + step) && (taskAttempt.getFinishTime() / 1000.0) >= time) {
          numTasks++;
        }
      }
mapPoints.add(new Point(Math.round(time), numTasks));
}
points.setMapData(mapPoints);
}
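
  /**
   * Like getMapDetails(), but splits each reduce attempt into a shuffle phase
   * (start to shuffle-finish) and a reduce phase (shuffle-finish to finish),
   * producing one series per phase.
   */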
private static void getReduceDetails(PostgresConnector conn, TaskData points, String jobId, double submitTimeSecs, double finishTimeSecs, double step)
throws IOException {
List<TaskAttempt> taskAttempts = conn.fetchTaskAttempts(jobId, "REDUCE");
List<Point> shufflePoints = new ArrayList<Point>();
List<Point> reducePoints = new ArrayList<Point>();
for (double time = submitTimeSecs; time < finishTimeSecs; time += step) {
int numShuffleTasks = 0;
int numReduceTasks = 0;
for (TaskAttempt taskAttempt : taskAttempts) {
if ((taskAttempt.getStartTime() / 1000.0) <= (time + step) && (taskAttempt.getShuffleFinishTime() / 1000.0) >= time) {
numShuffleTasks++;
} else if ((taskAttempt.getShuffleFinishTime() / 1000.0) < (time + step) && (taskAttempt.getFinishTime() / 1000.0) >= time) {
numReduceTasks++;
}
}
shufflePoints.add(new Point(Math.round(time), numShuffleTasks));
reducePoints.add(new Point(Math.round(time), numReduceTasks));
}
points.setShuffleData(shufflePoints);
points.setReduceData(reducePoints);
}
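
  /**
   * Builds the locality scatter plot: assigns each attempt a unique x
   * coordinate, bins attempts by locality, and scales point radii by I/O.
   */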
private static void getTaskAttemptsByLocality(PostgresConnector conn, String jobId, long submitTime, long finishTime, TaskLocalityData data, int minr,
int maxr) throws IOException {
long submitTimeX = transformX(submitTime);
long finishTimeX = transformX(finishTime);
List<TaskAttempt> mapAttempts = conn.fetchTaskAttempts(jobId, "MAP");
List<TaskAttempt> reduceAttempts = conn.fetchTaskAttempts(jobId, "REDUCE");
Set<Long> xPoints = getXPoints(mapAttempts, reduceAttempts, submitTimeX, finishTimeX);
Long[] xList = xPoints.toArray(new Long[xPoints.size()]);
MinMax io = new MinMax();
data.setMapNodeLocal(processLocalityData(mapAttempts, "NODE_LOCAL", xList, io));
data.setMapRackLocal(processLocalityData(mapAttempts, "RACK_LOCAL", xList, io));
data.setMapOffSwitch(processLocalityData(mapAttempts, "OFF_SWITCH", xList, io));
data.setReduceOffSwitch(processLocalityData(reduceAttempts, "OFF_SWITCH", xList, io));
setRValues(data.getMapNodeLocal(), minr, maxr, io.max);
setRValues(data.getMapRackLocal(), minr, maxr, io.max);
setRValues(data.getMapOffSwitch(), minr, maxr, io.max);
setRValues(data.getReduceOffSwitch(), minr, maxr, io.max);
data.setSubmitTime(submitTimeX);
data.setFinishTime(finishTimeX);
}
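
  /** Tracks the minimum and maximum I/O volume seen across data points. */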
private static class MinMax {
private long min = Long.MAX_VALUE;
private long max = 0;
}
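
  // Chart coordinates: x values are epoch seconds, y values are durations in
  // milliseconds.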
private static long transformX(long time) {
return Math.round(time / 1000.0);
}
private static long untransformX(long x) {
return x * 1000;
}
private static long transformY(long time) {
return time;
}
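
  /**
   * Collects one distinct x coordinate (start time, in seconds) per map and
   * reduce attempt, visiting attempts in start-time order, and adds the job's
   * submit and finish times.
   */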
private static Set<Long> getXPoints(List<TaskAttempt> mapAttempts, List<TaskAttempt> reduceAttempts, long submitTimeX, long finishTimeX) {
TreeSet<Long> xPoints = new TreeSet<Long>();
TreeSet<TaskAttempt> sortedAttempts = new TreeSet<TaskAttempt>(new Comparator<TaskAttempt>() {
@Override
public int compare(TaskAttempt t1, TaskAttempt t2) {
if (t1.getStartTime() < t2.getStartTime())
return -1;
else if (t1.getStartTime() > t2.getStartTime())
return 1;
return t1.getTaskAttemptId().compareTo(t2.getTaskAttemptId());
}
});
sortedAttempts.addAll(mapAttempts);
sortedAttempts.addAll(reduceAttempts);
getXPoints(sortedAttempts, xPoints);
xPoints.add(submitTimeX);
xPoints.add(finishTimeX);
return xPoints;
}
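
  /**
   * Assigns each attempt a unique x coordinate, bumping by one second on
   * collisions, and writes the adjusted time back onto the attempt so later
   * processing sees the same coordinate.
   */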
private static void getXPoints(Iterable<TaskAttempt> taskAttempts, Set<Long> xPoints) {
for (TaskAttempt taskAttempt : taskAttempts) {
long x = transformX(taskAttempt.getStartTime());
      // Bump by one second until this x coordinate is unused.
      while (xPoints.contains(x)) {
        x += 1;
      }
xPoints.add(x);
taskAttempt.setStartTime(untransformX(x));
}
}
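
  /**
   * Appends a point to the series, first padding with placeholder points for
   * any earlier x coordinates that have no attempt in this bucket. Assumes
   * points arrive in increasing x order and that every x value appears in
   * xPoints; otherwise the scan does not terminate.
   */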
private static int addDataPoint(List<DataPoint> data, DataPoint point, int index, Long[] xPoints) {
while (index < xPoints.length) {
if (point.getX() == xPoints[index]) {
index++;
break;
} else if (point.getX() > xPoints[index]) {
data.add(new DataPoint(xPoints[index++]));
}
}
data.add(point);
return index;
}
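
  /**
   * Converts each attempt with the given locality into a DataPoint (x = start
   * time, y = duration, io = input bytes + output bytes) and records the
   * minimum and maximum I/O seen.
   */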
private static List<DataPoint> processLocalityData(List<TaskAttempt> taskAttempts, String locality, Long[] xPoints, MinMax io) {
List<DataPoint> data = new ArrayList<DataPoint>();
int i = 0;
for (TaskAttempt taskAttempt : taskAttempts) {
if (locality.equals(taskAttempt.getLocality())) {
DataPoint point = new DataPoint();
point.setX(transformX(taskAttempt.getStartTime()));
point.setY(transformY(taskAttempt.getFinishTime() - taskAttempt.getStartTime()));
point.setIO(taskAttempt.getInputBytes() + taskAttempt.getOutputBytes());
point.setLabel(taskAttempt.getTaskAttemptId());
point.setStatus(taskAttempt.getStatus());
i = addDataPoint(data, point, i, xPoints);
io.max = Math.max(io.max, point.getIO());
io.min = Math.min(io.min, point.getIO());
}
}
    // Pad the tail of the series with placeholder points for the remaining x values.
    while (i < xPoints.length) {
      data.add(new DataPoint(xPoints[i++]));
    }
return data;
}
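
  /**
   * Scales each point's radius between minr and maxr by the square root of
   * its share of the maximum I/O, so point area grows roughly linearly with
   * I/O volume.
   */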
private static void setRValues(List<DataPoint> data, int minr, int maxr, long maxIO) {
for (DataPoint point : data) {
      if (point.getY() == 0) {
        // Placeholder points (no attempt at this x) keep the default radius.
        continue;
      }
      if (maxIO == 0 || maxr == minr) {
        // No I/O data, or a degenerate radius range: use the minimum radius.
        point.setR(minr);
        continue;
      }
point.setR(Math.round(Math.sqrt(point.getIO() * 1.0 / maxIO) * (maxr - minr) + minr));
}
}
}