| /* |
| * Copyright 2009-2010 by The Regents of the University of California |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * you may obtain a copy of the License from |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package edu.uci.ics.hyracks.control.cc.adminconsole.pages; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.wicket.markup.html.basic.Label; |
| import org.apache.wicket.request.mapper.parameter.PageParameters; |
| import org.apache.wicket.util.string.StringValue; |
| import org.json.JSONArray; |
| import org.json.JSONObject; |
| |
| import edu.uci.ics.hyracks.api.dataflow.ActivityId; |
| import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId; |
| import edu.uci.ics.hyracks.api.dataflow.TaskId; |
| import edu.uci.ics.hyracks.api.job.JobId; |
| import edu.uci.ics.hyracks.control.cc.ClusterControllerService; |
| import edu.uci.ics.hyracks.control.cc.work.GetJobActivityGraphJSONWork; |
| import edu.uci.ics.hyracks.control.cc.work.GetJobRunJSONWork; |
| import edu.uci.ics.hyracks.control.cc.work.GetJobSpecificationJSONWork; |
| |
| public class JobDetailsPage extends AbstractPage { |
| private static final long serialVersionUID = 1L; |
| |
| private static final int HEIGHT = 29; |
| |
| public JobDetailsPage(PageParameters params) throws Exception { |
| ClusterControllerService ccs = getAdminConsoleApplication().getClusterControllerService(); |
| |
| StringValue jobIdStr = params.get("job-id"); |
| |
| JobId jobId = JobId.parse(jobIdStr.toString()); |
| |
| GetJobSpecificationJSONWork gjsw = new GetJobSpecificationJSONWork(ccs, jobId); |
| ccs.getWorkQueue().scheduleAndSync(gjsw); |
| Label jobspec = new Label("job-specification", gjsw.getJSON().toString()); |
| jobspec.setEscapeModelStrings(false); |
| add(jobspec); |
| |
| GetJobActivityGraphJSONWork gjagw = new GetJobActivityGraphJSONWork(ccs, jobId); |
| ccs.getWorkQueue().scheduleAndSync(gjagw); |
| Label jag = new Label("job-activity-graph", gjagw.getJSON().toString()); |
| jag.setEscapeModelStrings(false); |
| add(jag); |
| |
| JSONObject jagO = gjagw.getJSON(); |
| |
| Map<ActivityId, String> activityMap = new HashMap<ActivityId, String>(); |
| if (jagO.has("activities")) { |
| JSONArray aArray = jagO.getJSONArray("activities"); |
| for (int i = 0; i < aArray.length(); ++i) { |
| JSONObject aO = aArray.getJSONObject(i); |
| ActivityId aid = ActivityId.parse(aO.getString("id")); |
| String className = aO.getString("java-class"); |
| |
| activityMap.put(aid, className); |
| } |
| } |
| |
| GetJobRunJSONWork gjrw = new GetJobRunJSONWork(ccs, jobId); |
| ccs.getWorkQueue().scheduleAndSync(gjrw); |
| Label jobrun = new Label("job-run", gjrw.getJSON().toString()); |
| jobrun.setEscapeModelStrings(false); |
| add(jobrun); |
| |
| JSONObject jrO = gjrw.getJSON(); |
| |
| List<TaskClusterAttempt[]> tcList = new ArrayList<TaskClusterAttempt[]>(); |
| long minTime = Long.MAX_VALUE; |
| long maxTime = Long.MIN_VALUE; |
| if (jrO.has("activity-clusters")) { |
| JSONArray acA = jrO.getJSONArray("activity-clusters"); |
| for (int i = 0; i < acA.length(); ++i) { |
| JSONObject acO = acA.getJSONObject(i); |
| if (acO.has("plan")) { |
| JSONObject planO = acO.getJSONObject("plan"); |
| if (planO.has("task-clusters")) { |
| JSONArray tcA = planO.getJSONArray("task-clusters"); |
| for (int j = 0; j < tcA.length(); ++j) { |
| JSONObject tcO = tcA.getJSONObject(j); |
| String tcId = tcO.getString("task-cluster-id"); |
| if (tcO.has("attempts")) { |
| JSONArray tcaA = tcO.getJSONArray("attempts"); |
| TaskClusterAttempt[] tcAttempts = new TaskClusterAttempt[tcaA.length()]; |
| for (int k = 0; k < tcaA.length(); ++k) { |
| JSONObject tcaO = tcaA.getJSONObject(k); |
| int attempt = tcaO.getInt("attempt"); |
| long startTime = tcaO.getLong("start-time"); |
| long endTime = tcaO.getLong("end-time"); |
| |
| tcAttempts[k] = new TaskClusterAttempt(tcId, attempt, startTime, endTime); |
| if (startTime < minTime) { |
| minTime = startTime; |
| } |
| if (endTime > maxTime) { |
| maxTime = endTime; |
| } |
| if (tcaO.has("task-attempts")) { |
| JSONArray taArray = tcaO.getJSONArray("task-attempts"); |
| tcAttempts[k].tasks = new TaskAttempt[taArray.length()]; |
| for (int l = 0; l < taArray.length(); ++l) { |
| JSONObject taO = taArray.getJSONObject(l); |
| TaskAttemptId taId = TaskAttemptId.parse(taO.getString("task-attempt-id")); |
| TaskAttempt ta = new TaskAttempt(taId, taO.getLong("start-time"), |
| taO.getLong("end-time")); |
| tcAttempts[k].tasks[l] = ta; |
| TaskId tid = taId.getTaskId(); |
| ta.name = activityMap.get(tid.getActivityId()); |
| ta.partition = tid.getPartition(); |
| } |
| Arrays.sort(tcAttempts[k].tasks, new Comparator<TaskAttempt>() { |
| @Override |
| public int compare(TaskAttempt o1, TaskAttempt o2) { |
| return o1.startTime < o2.startTime ? -1 |
| : (o1.startTime > o2.startTime ? 1 : 0); |
| } |
| }); |
| } |
| } |
| Arrays.sort(tcAttempts, new Comparator<TaskClusterAttempt>() { |
| @Override |
| public int compare(TaskClusterAttempt o1, TaskClusterAttempt o2) { |
| return o1.startTime < o2.startTime ? -1 : (o1.startTime > o2.startTime ? 1 : 0); |
| } |
| }); |
| tcList.add(tcAttempts); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| Map<TaskAttemptId, TaskProfile> tpMap = new HashMap<TaskAttemptId, TaskProfile>(); |
| if (jrO.has("profile")) { |
| JSONObject pO = jrO.getJSONObject("profile"); |
| if (pO.has("joblets")) { |
| JSONArray jobletsA = pO.getJSONArray("joblets"); |
| for (int i = 0; i < jobletsA.length(); ++i) { |
| JSONObject jobletO = jobletsA.getJSONObject(i); |
| if (jobletO.has("tasks")) { |
| JSONArray tasksA = jobletO.getJSONArray("tasks"); |
| for (int j = 0; j < tasksA.length(); ++j) { |
| JSONObject taskO = tasksA.getJSONObject(j); |
| ActivityId activityId = ActivityId.parse(taskO.getString("activity-id")); |
| int partition = taskO.getInt("partition"); |
| int attempt = taskO.getInt("attempt"); |
| TaskAttemptId taId = new TaskAttemptId(new TaskId(activityId, partition), attempt); |
| if (taskO.has("partition-send-profile")) { |
| JSONArray taskProfilesA = taskO.getJSONArray("partition-send-profile"); |
| for (int k = 0; k < taskProfilesA.length(); ++k) { |
| JSONObject ppO = taskProfilesA.getJSONObject(k); |
| long openTime = ppO.getLong("open-time"); |
| long closeTime = ppO.getLong("close-time"); |
| int resolution = ppO.getInt("resolution"); |
| long offset = ppO.getLong("offset"); |
| JSONArray frameTimesA = ppO.getJSONArray("frame-times"); |
| long[] frameTimes = new long[frameTimesA.length()]; |
| for (int l = 0; l < frameTimes.length; ++l) { |
| frameTimes[l] = frameTimesA.getInt(l) + offset; |
| } |
| TaskProfile tp = new TaskProfile(taId, openTime, closeTime, frameTimes, resolution); |
| if (!tpMap.containsKey(tp.taId)) { |
| tpMap.put(tp.taId, tp); |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| if (!tcList.isEmpty()) { |
| Collections.sort(tcList, new Comparator<TaskClusterAttempt[]>() { |
| @Override |
| public int compare(TaskClusterAttempt[] o1, TaskClusterAttempt[] o2) { |
| if (o1.length == 0) { |
| return o2.length == 0 ? 0 : -1; |
| } else if (o2.length == 0) { |
| return 1; |
| } |
| return o1[0].startTime < o2[0].startTime ? -1 : (o1[0].startTime > o2[0].startTime ? 1 : 0); |
| } |
| }); |
| long range = maxTime - minTime; |
| |
| double leftOffset = 20; |
| |
| int xWidth = 1024; |
| double width = ((double) xWidth) / range; |
| StringBuilder buffer = new StringBuilder(); |
| int y = 0; |
| for (TaskClusterAttempt[] tcAttempts : tcList) { |
| for (int i = 0; i < tcAttempts.length; ++i) { |
| TaskClusterAttempt tca = tcAttempts[i]; |
| long startTime = tca.startTime - minTime; |
| long endTime = tca.endTime - minTime; |
| buffer.append("<rect x=\"").append(startTime * width + leftOffset).append("\" y=\"") |
| .append(y * (HEIGHT + 1)).append("\" width=\"").append(width * (endTime - startTime)) |
| .append("\" height=\"").append(HEIGHT).append("\"/>\n"); |
| buffer.append("<text x=\"").append(endTime * width + leftOffset + 20).append("\" y=\"") |
| .append(y * (HEIGHT + 1) + HEIGHT * 3 / 4).append("\">") |
| .append((endTime - startTime) + " ms").append("</text>\n"); |
| ++y; |
| for (int j = 0; j < tca.tasks.length; ++j) { |
| TaskAttempt ta = tca.tasks[j]; |
| long tStartTime = ta.startTime - minTime; |
| long tEndTime = ta.endTime - minTime; |
| buffer.append("<rect x=\"").append(tStartTime * width + leftOffset).append("\" y=\"") |
| .append(y * (HEIGHT + 1) + HEIGHT / 4).append("\" width=\"") |
| .append(width * (tEndTime - tStartTime)).append("\" height=\"").append(HEIGHT / 2) |
| .append("\" style=\"fill:rgb(255,255,255);stroke-width:1;stroke:rgb(0,0,0)\"/>\n"); |
| buffer.append("<text x=\"").append(tEndTime * width + leftOffset + 20).append("\" y=\"") |
| .append(y * (HEIGHT + 1) + HEIGHT * 3 / 4).append("\">") |
| .append((tEndTime - tStartTime) + " ms (" + ta.name + ":" + ta.partition + ")") |
| .append("</text>\n"); |
| TaskProfile tp = tpMap.get(ta.taId); |
| if (tp != null) { |
| for (int k = 0; k < tp.frameTimes.length; ++k) { |
| long taOpenTime = tp.openTime - minTime; |
| buffer.append("<rect x=\"") |
| .append(taOpenTime * width + leftOffset) |
| .append("\" y=\"") |
| .append(y * (HEIGHT + 1) + HEIGHT / 4) |
| .append("\" width=\"1\" height=\"") |
| .append(HEIGHT / 2) |
| .append("\" style=\"fill:rgb(255,0,0);stroke-width:1;stroke:rgb(255,0,0)\"/>\n"); |
| for (int l = 0; l < tp.frameTimes.length; ++l) { |
| long ft = tp.frameTimes[l]; |
| long ftn = l < tp.frameTimes.length - 1 ? tp.frameTimes[l + 1] : ft; |
| long taNextTime = ft - minTime; |
| long barWidth = ftn - ft; |
| buffer.append("<rect x=\"") |
| .append(taNextTime * width + leftOffset) |
| .append("\" y=\"") |
| .append(y * (HEIGHT + 1) + HEIGHT / 4) |
| .append("\" width=\"") |
| .append(barWidth == 0 ? 1 : (barWidth * width)) |
| .append("\" height=\"") |
| .append(HEIGHT / 2) |
| .append("\" style=\"fill:rgb(0,255,0);stroke-width:1;stroke:rgb(0,255,0)\"/>\n"); |
| } |
| long taCloseTime = tp.closeTime - minTime; |
| buffer.append("<rect x=\"") |
| .append(taCloseTime * width + leftOffset) |
| .append("\" y=\"") |
| .append(y * (HEIGHT + 1) + HEIGHT / 4) |
| .append("\" width=\"1\" height=\"") |
| .append(HEIGHT / 2) |
| .append("\" style=\"fill:rgb(0,0,255);stroke-width:1;stroke:rgb(0,0,255)\"/>\n"); |
| } |
| } |
| ++y; |
| } |
| } |
| } |
| buffer.append("<rect x=\"").append(leftOffset).append("\" y=\"").append(0).append("\" width=\"").append(1) |
| .append("\" height=\"").append((y + 2) * (HEIGHT + 1)).append("\"/>\n"); |
| buffer.append("<rect x=\"").append(0).append("\" y=\"").append((y + 1) * (HEIGHT + 1)) |
| .append("\" width=\"").append(xWidth + 2 * leftOffset).append("\" height=\"").append(1) |
| .append("\"/>\n"); |
| buffer.append("</svg>"); |
| Label markup = new Label("job-timeline", |
| "<svg version=\"1.1\"\nxmlns=\"http://www.w3.org/2000/svg\" width=\"" + (xWidth * 1.5) |
| + "\" height=\"" + ((y + 2) * (HEIGHT + 1)) + "\">\n" + buffer.toString()); |
| markup.setEscapeModelStrings(false); |
| add(markup); |
| } else { |
| Label markup = new Label("job-timeline", "No information available yet"); |
| add(markup); |
| } |
| } |
| |
| private static class TaskAttempt { |
| private TaskAttemptId taId; |
| private long startTime; |
| private long endTime; |
| private String name; |
| private int partition; |
| |
| public TaskAttempt(TaskAttemptId taId, long startTime, long endTime) { |
| this.taId = taId; |
| this.startTime = startTime; |
| this.endTime = endTime; |
| } |
| } |
| |
| private static class TaskClusterAttempt { |
| private String tcId; |
| private int attempt; |
| private long startTime; |
| private long endTime; |
| private TaskAttempt[] tasks; |
| |
| public TaskClusterAttempt(String tcId, int attempt, long startTime, long endTime) { |
| this.tcId = tcId; |
| this.attempt = attempt; |
| this.startTime = startTime; |
| this.endTime = endTime; |
| } |
| } |
| |
| private static class TaskProfile { |
| private TaskAttemptId taId; |
| private long openTime; |
| private long closeTime; |
| private long[] frameTimes; |
| private int resolution; |
| |
| public TaskProfile(TaskAttemptId taId, long openTime, long closeTime, long[] frameTimes, int resolution) { |
| this.taId = taId; |
| this.openTime = openTime; |
| this.closeTime = closeTime; |
| this.frameTimes = frameTimes; |
| this.resolution = resolution; |
| } |
| } |
| } |