blob: 1d96a712215a8682bb6cccb8f19822585cce0d0d [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.control.cc.adminconsole.pages;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.wicket.markup.html.basic.Label;
import org.apache.wicket.request.mapper.parameter.PageParameters;
import org.apache.wicket.util.string.StringValue;
import org.json.JSONArray;
import org.json.JSONObject;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.work.GetJobActivityGraphJSONWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobRunJSONWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobSpecificationJSONWork;
public class JobDetailsPage extends AbstractPage {
private static final long serialVersionUID = 1L;
private static final int HEIGHT = 29;
public JobDetailsPage(PageParameters params) throws Exception {
ClusterControllerService ccs = getAdminConsoleApplication().getClusterControllerService();
StringValue jobIdStr = params.get("job-id");
JobId jobId = JobId.parse(jobIdStr.toString());
GetJobSpecificationJSONWork gjsw = new GetJobSpecificationJSONWork(ccs, jobId);
ccs.getWorkQueue().scheduleAndSync(gjsw);
Label jobspec = new Label("job-specification", gjsw.getJSON().toString());
jobspec.setEscapeModelStrings(false);
add(jobspec);
GetJobActivityGraphJSONWork gjagw = new GetJobActivityGraphJSONWork(ccs, jobId);
ccs.getWorkQueue().scheduleAndSync(gjagw);
Label jag = new Label("job-activity-graph", gjagw.getJSON().toString());
jag.setEscapeModelStrings(false);
add(jag);
JSONObject jagO = gjagw.getJSON();
Map<ActivityId, String> activityMap = new HashMap<ActivityId, String>();
if (jagO.has("activities")) {
JSONArray aArray = jagO.getJSONArray("activities");
for (int i = 0; i < aArray.length(); ++i) {
JSONObject aO = aArray.getJSONObject(i);
ActivityId aid = ActivityId.parse(aO.getString("id"));
String className = aO.getString("java-class");
activityMap.put(aid, className);
}
}
GetJobRunJSONWork gjrw = new GetJobRunJSONWork(ccs, jobId);
ccs.getWorkQueue().scheduleAndSync(gjrw);
Label jobrun = new Label("job-run", gjrw.getJSON().toString());
jobrun.setEscapeModelStrings(false);
add(jobrun);
JSONObject jrO = gjrw.getJSON();
List<TaskClusterAttempt[]> tcList = new ArrayList<TaskClusterAttempt[]>();
long minTime = Long.MAX_VALUE;
long maxTime = Long.MIN_VALUE;
if (jrO.has("activity-clusters")) {
JSONArray acA = jrO.getJSONArray("activity-clusters");
for (int i = 0; i < acA.length(); ++i) {
JSONObject acO = acA.getJSONObject(i);
if (acO.has("plan")) {
JSONObject planO = acO.getJSONObject("plan");
if (planO.has("task-clusters")) {
JSONArray tcA = planO.getJSONArray("task-clusters");
for (int j = 0; j < tcA.length(); ++j) {
JSONObject tcO = tcA.getJSONObject(j);
String tcId = tcO.getString("task-cluster-id");
if (tcO.has("attempts")) {
JSONArray tcaA = tcO.getJSONArray("attempts");
TaskClusterAttempt[] tcAttempts = new TaskClusterAttempt[tcaA.length()];
for (int k = 0; k < tcaA.length(); ++k) {
JSONObject tcaO = tcaA.getJSONObject(k);
int attempt = tcaO.getInt("attempt");
long startTime = tcaO.getLong("start-time");
long endTime = tcaO.getLong("end-time");
tcAttempts[k] = new TaskClusterAttempt(tcId, attempt, startTime, endTime);
if (startTime < minTime) {
minTime = startTime;
}
if (endTime > maxTime) {
maxTime = endTime;
}
if (tcaO.has("task-attempts")) {
JSONArray taArray = tcaO.getJSONArray("task-attempts");
tcAttempts[k].tasks = new TaskAttempt[taArray.length()];
for (int l = 0; l < taArray.length(); ++l) {
JSONObject taO = taArray.getJSONObject(l);
TaskAttemptId taId = TaskAttemptId.parse(taO.getString("task-attempt-id"));
TaskAttempt ta = new TaskAttempt(taId, taO.getLong("start-time"),
taO.getLong("end-time"));
tcAttempts[k].tasks[l] = ta;
TaskId tid = taId.getTaskId();
ta.name = activityMap.get(tid.getActivityId());
ta.partition = tid.getPartition();
}
Arrays.sort(tcAttempts[k].tasks, new Comparator<TaskAttempt>() {
@Override
public int compare(TaskAttempt o1, TaskAttempt o2) {
return o1.startTime < o2.startTime ? -1
: (o1.startTime > o2.startTime ? 1 : 0);
}
});
}
}
Arrays.sort(tcAttempts, new Comparator<TaskClusterAttempt>() {
@Override
public int compare(TaskClusterAttempt o1, TaskClusterAttempt o2) {
return o1.startTime < o2.startTime ? -1 : (o1.startTime > o2.startTime ? 1 : 0);
}
});
tcList.add(tcAttempts);
}
}
}
}
}
}
Map<TaskAttemptId, TaskProfile> tpMap = new HashMap<TaskAttemptId, TaskProfile>();
if (jrO.has("profile")) {
JSONObject pO = jrO.getJSONObject("profile");
if (pO.has("joblets")) {
JSONArray jobletsA = pO.getJSONArray("joblets");
for (int i = 0; i < jobletsA.length(); ++i) {
JSONObject jobletO = jobletsA.getJSONObject(i);
if (jobletO.has("tasks")) {
JSONArray tasksA = jobletO.getJSONArray("tasks");
for (int j = 0; j < tasksA.length(); ++j) {
JSONObject taskO = tasksA.getJSONObject(j);
ActivityId activityId = ActivityId.parse(taskO.getString("activity-id"));
int partition = taskO.getInt("partition");
int attempt = taskO.getInt("attempt");
TaskAttemptId taId = new TaskAttemptId(new TaskId(activityId, partition), attempt);
if (taskO.has("partition-send-profile")) {
JSONArray taskProfilesA = taskO.getJSONArray("partition-send-profile");
for (int k = 0; k < taskProfilesA.length(); ++k) {
JSONObject ppO = taskProfilesA.getJSONObject(k);
long openTime = ppO.getLong("open-time");
long closeTime = ppO.getLong("close-time");
int resolution = ppO.getInt("resolution");
long offset = ppO.getLong("offset");
JSONArray frameTimesA = ppO.getJSONArray("frame-times");
long[] frameTimes = new long[frameTimesA.length()];
for (int l = 0; l < frameTimes.length; ++l) {
frameTimes[l] = frameTimesA.getInt(l) + offset;
}
TaskProfile tp = new TaskProfile(taId, openTime, closeTime, frameTimes, resolution);
if (!tpMap.containsKey(tp.taId)) {
tpMap.put(tp.taId, tp);
}
}
}
}
}
}
}
}
if (!tcList.isEmpty()) {
Collections.sort(tcList, new Comparator<TaskClusterAttempt[]>() {
@Override
public int compare(TaskClusterAttempt[] o1, TaskClusterAttempt[] o2) {
if (o1.length == 0) {
return o2.length == 0 ? 0 : -1;
} else if (o2.length == 0) {
return 1;
}
return o1[0].startTime < o2[0].startTime ? -1 : (o1[0].startTime > o2[0].startTime ? 1 : 0);
}
});
long range = maxTime - minTime;
double leftOffset = 20;
int xWidth = 1024;
double width = ((double) xWidth) / range;
StringBuilder buffer = new StringBuilder();
int y = 0;
for (TaskClusterAttempt[] tcAttempts : tcList) {
for (int i = 0; i < tcAttempts.length; ++i) {
TaskClusterAttempt tca = tcAttempts[i];
long startTime = tca.startTime - minTime;
long endTime = tca.endTime - minTime;
buffer.append("<rect x=\"").append(startTime * width + leftOffset).append("\" y=\"")
.append(y * (HEIGHT + 1)).append("\" width=\"").append(width * (endTime - startTime))
.append("\" height=\"").append(HEIGHT).append("\"/>\n");
buffer.append("<text x=\"").append(endTime * width + leftOffset + 20).append("\" y=\"")
.append(y * (HEIGHT + 1) + HEIGHT * 3 / 4).append("\">")
.append((endTime - startTime) + " ms").append("</text>\n");
++y;
for (int j = 0; j < tca.tasks.length; ++j) {
TaskAttempt ta = tca.tasks[j];
long tStartTime = ta.startTime - minTime;
long tEndTime = ta.endTime - minTime;
buffer.append("<rect x=\"").append(tStartTime * width + leftOffset).append("\" y=\"")
.append(y * (HEIGHT + 1) + HEIGHT / 4).append("\" width=\"")
.append(width * (tEndTime - tStartTime)).append("\" height=\"").append(HEIGHT / 2)
.append("\" style=\"fill:rgb(255,255,255);stroke-width:1;stroke:rgb(0,0,0)\"/>\n");
buffer.append("<text x=\"").append(tEndTime * width + leftOffset + 20).append("\" y=\"")
.append(y * (HEIGHT + 1) + HEIGHT * 3 / 4).append("\">")
.append((tEndTime - tStartTime) + " ms (" + ta.name + ":" + ta.partition + ")")
.append("</text>\n");
TaskProfile tp = tpMap.get(ta.taId);
if (tp != null) {
for (int k = 0; k < tp.frameTimes.length; ++k) {
long taOpenTime = tp.openTime - minTime;
buffer.append("<rect x=\"")
.append(taOpenTime * width + leftOffset)
.append("\" y=\"")
.append(y * (HEIGHT + 1) + HEIGHT / 4)
.append("\" width=\"1\" height=\"")
.append(HEIGHT / 2)
.append("\" style=\"fill:rgb(255,0,0);stroke-width:1;stroke:rgb(255,0,0)\"/>\n");
for (int l = 0; l < tp.frameTimes.length; ++l) {
long ft = tp.frameTimes[l];
long ftn = l < tp.frameTimes.length - 1 ? tp.frameTimes[l + 1] : ft;
long taNextTime = ft - minTime;
long barWidth = ftn - ft;
buffer.append("<rect x=\"")
.append(taNextTime * width + leftOffset)
.append("\" y=\"")
.append(y * (HEIGHT + 1) + HEIGHT / 4)
.append("\" width=\"")
.append(barWidth == 0 ? 1 : (barWidth * width))
.append("\" height=\"")
.append(HEIGHT / 2)
.append("\" style=\"fill:rgb(0,255,0);stroke-width:1;stroke:rgb(0,255,0)\"/>\n");
}
long taCloseTime = tp.closeTime - minTime;
buffer.append("<rect x=\"")
.append(taCloseTime * width + leftOffset)
.append("\" y=\"")
.append(y * (HEIGHT + 1) + HEIGHT / 4)
.append("\" width=\"1\" height=\"")
.append(HEIGHT / 2)
.append("\" style=\"fill:rgb(0,0,255);stroke-width:1;stroke:rgb(0,0,255)\"/>\n");
}
}
++y;
}
}
}
buffer.append("<rect x=\"").append(leftOffset).append("\" y=\"").append(0).append("\" width=\"").append(1)
.append("\" height=\"").append((y + 2) * (HEIGHT + 1)).append("\"/>\n");
buffer.append("<rect x=\"").append(0).append("\" y=\"").append((y + 1) * (HEIGHT + 1))
.append("\" width=\"").append(xWidth + 2 * leftOffset).append("\" height=\"").append(1)
.append("\"/>\n");
buffer.append("</svg>");
Label markup = new Label("job-timeline",
"<svg version=\"1.1\"\nxmlns=\"http://www.w3.org/2000/svg\" width=\"" + (xWidth * 1.5)
+ "\" height=\"" + ((y + 2) * (HEIGHT + 1)) + "\">\n" + buffer.toString());
markup.setEscapeModelStrings(false);
add(markup);
} else {
Label markup = new Label("job-timeline", "No information available yet");
add(markup);
}
}
private static class TaskAttempt {
private TaskAttemptId taId;
private long startTime;
private long endTime;
private String name;
private int partition;
public TaskAttempt(TaskAttemptId taId, long startTime, long endTime) {
this.taId = taId;
this.startTime = startTime;
this.endTime = endTime;
}
}
private static class TaskClusterAttempt {
private String tcId;
private int attempt;
private long startTime;
private long endTime;
private TaskAttempt[] tasks;
public TaskClusterAttempt(String tcId, int attempt, long startTime, long endTime) {
this.tcId = tcId;
this.attempt = attempt;
this.startTime = startTime;
this.endTime = endTime;
}
}
private static class TaskProfile {
private TaskAttemptId taId;
private long openTime;
private long closeTime;
private long[] frameTimes;
private int resolution;
public TaskProfile(TaskAttemptId taId, long openTime, long closeTime, long[] frameTimes, int resolution) {
this.taId = taId;
this.openTime = openTime;
this.closeTime = closeTime;
this.frameTimes = frameTimes;
this.resolution = resolution;
}
}
}