blob: e5326d6170b02aaa56fc9a47d35d9153e4f73622 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysml.api.monitoring;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import scala.collection.Seq;
import scala.xml.Node;
import com.google.common.collect.Multimap;
import com.google.common.collect.TreeMultimap;
import org.apache.sysml.lops.Lop;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.instructions.Instruction;
import org.apache.sysml.runtime.instructions.spark.SPInstruction;
import org.apache.sysml.runtime.instructions.spark.functions.SparkListener;
/**
 * Collects runtime monitoring information for a DML script executed through MLContext:
 * the instructions generated for each script location, the Spark stages, jobs and RDDs
 * recorded for each instruction, lineage text, and per-stage execution times obtained
 * from a {@link SparkListener}. The collected data can be rendered as an HTML report
 * ({@link #getRuntimeInfoInHTML(String)}) or as JSON ({@link #getRuntimeInfoInJSONFormat()},
 * {@link #saveRuntimeInfoInJSONFormat(String)}).
 *
 * <p>Usage guide:
 * <pre>
 * MLContext mlCtx = new MLContext(sc, true);
 * mlCtx.register...
 * mlCtx.execute(...)
 * mlCtx.getMonitoringUtil().getRuntimeInfoInHTML("runtime.html");
 * </pre>
 *
 * <p>NOTE(review): none of the internal maps are synchronized; this class presumably
 * expects to be populated and queried from a single thread - confirm with callers.
 */
public class SparkMonitoringUtil {
	// ----------------------------------------------------
	// Data collected while the script runs (originally written for a VLDB demo).

	/** Script location -> string forms of the instructions generated for it. */
	private Multimap<Location, String> instructions = TreeMultimap.create();
	/** Instruction string -> ids of the Spark stages recorded for it. */
	private Multimap<String, Integer> stageIDs = TreeMultimap.create();
	/** Instruction string -> ids of the Spark jobs recorded for it. */
	private Multimap<String, Integer> jobIDs = TreeMultimap.create();
	/** Instruction string -> lineage text supplied via {@link #setLineageInfo}. */
	private HashMap<String, String> lineageInfo = new HashMap<String, String>();
	/** Instruction string -> System.currentTimeMillis() at the time its location was registered. */
	private HashMap<String, Long> instructionCreationTime = new HashMap<String, Long>();
	/** RDD id -> instruction strings registered as producing that RDD. */
	private Multimap<Integer, String> rddInstructionMapping = TreeMultimap.create();

	private SparkListener _sparkListener = null;
	private String explainOutput = "";
	/** Text of the monitored DML script (TODO: also track hops per location). */
	private String dmlStrForMonitoring = null;
	/** Lines of {@link #dmlStrForMonitoring}, split lazily and reset by {@link #setDMLString}. */
	String [] dmlLines = null;

	// Aggregated timings; all of these are recomputed from scratch by fillExecutionTimes().
	private long maxExpressionExecutionTime = 0;
	HashMap<Integer, Long> stageExecutionTimes = new HashMap<Integer, Long>();
	HashMap<String, Long> expressionExecutionTimes = new HashMap<String, Long>(); // keyed by Location.toString()
	HashMap<String, Long> instructionExecutionTimes = new HashMap<String, Long>();
	HashMap<Integer, HashSet<String>> relatedInstructionsPerStage = new HashMap<Integer, HashSet<String>>();

	/**
	 * @param sparkListener listener that supplies stage DAGs, timelines, job DAGs, stage
	 *        execution times and stage-to-RDD mappings; may be {@code null}, in which case
	 *        all Spark-specific information is reported as absent
	 */
	public SparkMonitoringUtil(SparkListener sparkListener) {
		_sparkListener = sparkListener;
	}

	public SparkListener getSparkListener() {
		return _sparkListener;
	}

	public String getExplainOutput() {
		return explainOutput;
	}

	public void setExplainOutput(String explainOutput) {
		this.explainOutput = explainOutput;
	}

	/** Forwards {@code inst} to the listener's current-instruction set (no-op without a listener). */
	public void addCurrentInstruction(SPInstruction inst) {
		if(_sparkListener != null) {
			_sparkListener.addCurrentInstruction(inst);
		}
	}

	/** Records that the RDD with id {@code rddID} was produced by {@code inst}. */
	public void addRDDForInstruction(SPInstruction inst, Integer rddID) {
		this.rddInstructionMapping.put(rddID, getInstructionString(inst));
	}

	/** Removes {@code inst} from the listener's current-instruction set (no-op without a listener). */
	public void removeCurrentInstruction(SPInstruction inst) {
		if(_sparkListener != null) {
			_sparkListener.removeCurrentInstruction(inst);
		}
	}

	/** Sets the DML script text that script locations refer to. */
	public void setDMLString(String dmlStr) {
		this.dmlStrForMonitoring = dmlStr;
		// Invalidate the cached line split; otherwise locations would be resolved
		// against the previously registered script.
		this.dmlLines = null;
	}

	/** Clears the stage DAGs and stage timelines held by the listener, if present. */
	public void resetMonitoringData() {
		if(_sparkListener != null && _sparkListener.stageDAGs != null)
			_sparkListener.stageDAGs.clear();
		if(_sparkListener != null && _sparkListener.stageTimeline != null)
			_sparkListener.stageTimeline.clear();
	}

	/**
	 * Writes an HTML table with one row group per script location: position, DML text,
	 * instructions (Spark instructions in red, non-bookkeeping CP instructions in blue),
	 * stage DAG/timeline markup and lineage text. The referenced js/ and css/ assets use
	 * relative paths, so they resolve relative to the location of the written file.
	 *
	 * @param htmlFilePath output file; overwritten, encoded as UTF-8
	 * @throws DMLRuntimeException declared for API compatibility
	 * @throws IOException if the file cannot be written
	 */
	public void getRuntimeInfoInHTML(String htmlFilePath) throws DMLRuntimeException, IOException {
		String jsAndCSSFiles = "<script src=\"js/lodash.min.js\"></script>"
				+ "<script src=\"js/jquery-1.11.1.min.js\"></script>"
				+ "<script src=\"js/d3.min.js\"></script>"
				+ "<script src=\"js/bootstrap-tooltip.js\"></script>"
				+ "<script src=\"js/dagre-d3.min.js\"></script>"
				+ "<script src=\"js/graphlib-dot.min.js\"></script>"
				+ "<script src=\"js/spark-dag-viz.js\"></script>"
				+ "<script src=\"js/timeline-view.js\"></script>"
				+ "<script src=\"js/vis.min.js\"></script>"
				+ "<link rel=\"stylesheet\" href=\"css/bootstrap.min.css\">"
				+ "<link rel=\"stylesheet\" href=\"css/vis.min.css\">"
				+ "<link rel=\"stylesheet\" href=\"css/spark-dag-viz.css\">"
				+ "<link rel=\"stylesheet\" href=\"css/timeline-view.css\"> ";
		BufferedWriter bw = openUTF8Writer(htmlFilePath);
		try {
			bw.write("<html><head>\n");
			bw.write("<meta charset=\"UTF-8\">\n");
			bw.write(jsAndCSSFiles + "\n");
			bw.write("</head><body>\n<table border=1>\n");
			bw.write("<tr>\n");
			bw.write("<td><b>Position in script</b></td>\n");
			bw.write("<td><b>DML</b></td>\n");
			bw.write("<td><b>Instruction</b></td>\n");
			bw.write("<td><b>StageIDs</b></td>\n");
			bw.write("<td><b>RDD Lineage</b></td>\n");
			bw.write("</tr>\n");
			for(Location loc : instructions.keySet()) {
				String expr = getExpression(loc);
				// Skip locations that do not map onto the script or cover at most one character.
				if(expr == null || expr.trim().length() <= 1)
					continue;
				List<String> listInst = getSortedInstructions(loc);
				int rowSpan = listInst.size();
				bw.write("<tr>\n");
				bw.write("<td rowspan=\"" + rowSpan + "\">" + escapeHTML(loc.toString()) + "</td>\n");
				bw.write("<td rowspan=\"" + rowSpan + "\">" + escapeHTML(expr).replace("\n", "<br />") + "</td>\n");
				boolean firstTime = true;
				for(String inst : listInst) {
					if(!firstTime)
						bw.write("<tr>\n");
					// Instruction strings may contain opcodes such as '<', so escape them.
					if(inst.startsWith("SPARK"))
						bw.write("<td style=\"color:red\">" + escapeHTML(inst) + "</td>\n");
					else if(isInterestingCP(inst))
						bw.write("<td style=\"color:blue\">" + escapeHTML(inst) + "</td>\n");
					else
						bw.write("<td>" + escapeHTML(inst) + "</td>\n");
					// Stage DAG/timeline cells are markup produced for display and are written as-is.
					bw.write("<td>" + getStageIDAsString(inst) + "</td>\n");
					String lineage = lineageInfo.get(inst);
					bw.write("<td>" + (lineage == null ? "" : escapeHTML(lineage).replace("\n", "<br />")) + "</td>\n");
					bw.write("</tr>\n");
					firstTime = false;
				}
			}
			bw.write("</table></body>\n</html>");
		}
		finally {
			bw.close();
		}
	}

	/** Opens {@code path} for writing (truncating it) with UTF-8 encoding. */
	private static BufferedWriter openUTF8Writer(String path) throws IOException {
		return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(path), "UTF-8"));
	}

	/** Escapes the characters that are significant in HTML element content and attributes. */
	private static String escapeHTML(String text) {
		StringBuilder sb = new StringBuilder(text.length() + 16);
		for(int i = 0; i < text.length(); i++) {
			char c = text.charAt(i);
			switch(c) {
				case '&': sb.append("&amp;"); break;
				case '<': sb.append("&lt;"); break;
				case '>': sb.append("&gt;"); break;
				case '"': sb.append("&quot;"); break;
				default: sb.append(c);
			}
		}
		return sb.toString();
	}

	/** Wraps {@code str} in double quotes; callers escape the content first where needed. */
	private String getInQuotes(String str) {
		return "\"" + str + "\"";
	}

	/**
	 * Escapes {@code json} for use inside a JSON string literal: backslash, double quote,
	 * forward slash, tab, CR and LF are escaped (a CR LF pair collapses into a single
	 * newline escape), and any other control character becomes a four-hex-digit escape.
	 *
	 * @return the escaped text, or the empty string for {@code null}
	 */
	private String getEscapedJSON(String json) {
		if(json == null)
			return "";
		StringBuilder sb = new StringBuilder(json.length() + 16);
		for(int i = 0; i < json.length(); i++) {
			char c = json.charAt(i);
			switch(c) {
				case '\\': sb.append("\\\\"); break;
				case '"': sb.append("\\\""); break;
				case '/': sb.append("\\/"); break;
				case '\t': sb.append("\\t"); break;
				case '\n': sb.append("\\n"); break;
				case '\r':
					if(i + 1 < json.length() && json.charAt(i + 1) == '\n') {
						sb.append("\\n");
						i++; // consume the LF of the CR LF pair
					}
					else {
						sb.append("\\r");
					}
					break;
				default:
					if(c < 0x20)
						sb.append(String.format("\\u%04x", (int) c));
					else
						sb.append(c);
			}
		}
		return sb.toString();
	}

	/**
	 * Returns the instructions registered for {@code loc}, sorted with InstructionComparator
	 * over their recorded creation times so that recompiled instructions are separated.
	 */
	private List<String> getSortedInstructions(Location loc) {
		List<String> listInst = new ArrayList<String>(instructions.get(loc));
		Collections.sort(listInst, new InstructionComparator(instructionCreationTime));
		return listInst;
	}

	/** Returns the instructions that produced any RDD the listener maps to {@code stageID}. */
	private HashSet<String> getRelatedInstructions(int stageID) {
		HashSet<String> retVal = new HashSet<String>();
		if(_sparkListener == null || _sparkListener.stageRDDMapping == null)
			return retVal;
		ArrayList<Integer> rdds = _sparkListener.stageRDDMapping.get(stageID);
		if(rdds != null) { // stage may have no recorded RDDs
			for(Integer rddID : rdds) {
				retVal.addAll(rddInstructionMapping.get(rddID));
			}
		}
		return retVal;
	}

	/**
	 * Recomputes stage, instruction and expression execution times from scratch.
	 *
	 * <ol>
	 * <li>An instruction's time is the sum of the known execution times of its own stages;
	 *     an expression's time is the average over its instructions.</li>
	 * <li>Instructions with time 0 (no timed stages of their own; presumably transformations
	 *     materialized by a later action) get the sum of the times of every stage that lists
	 *     them as a related instruction.</li>
	 * <li>Expressions with time 0 are re-averaged using the times from step 2.</li>
	 * </ol>
	 */
	private void fillExecutionTimes() {
		stageExecutionTimes.clear();
		expressionExecutionTimes.clear();
		instructionExecutionTimes.clear();
		relatedInstructionsPerStage.clear();
		maxExpressionExecutionTime = 0;

		// Step 1: times attributed directly through each instruction's own stages.
		for(Location loc : instructions.keySet()) {
			List<String> listInst = new ArrayList<String>(instructions.get(loc));
			long expressionExecutionTime = 0;
			for(String inst : listInst) {
				long instructionExecutionTime = 0;
				for(Integer stageId : stageIDs.get(inst)) {
					Long stageExecTime = getStageExecutionTime(stageId);
					if(stageExecTime != null) {
						instructionExecutionTime += stageExecTime;
						stageExecutionTimes.put(stageId, stageExecTime);
					}
					relatedInstructionsPerStage.put(stageId, getRelatedInstructions(stageId));
				}
				instructionExecutionTimes.put(inst, instructionExecutionTime);
				expressionExecutionTime += instructionExecutionTime;
			}
			expressionExecutionTime /= listInst.size(); // average; a multimap key always has >= 1 value
			maxExpressionExecutionTime = Math.max(maxExpressionExecutionTime, expressionExecutionTime);
			expressionExecutionTimes.put(loc.toString(), expressionExecutionTime);
		}

		// Step 2: instructions without time inherit the time of stages they are related to.
		for(Entry<String, Long> kv : instructionExecutionTimes.entrySet()) {
			if(kv.getValue() != 0)
				continue;
			long sumExecutionTime = 0;
			for(Entry<Integer, HashSet<String>> stage : relatedInstructionsPerStage.entrySet()) {
				if(stage.getValue().contains(kv.getKey())) {
					Long stageTime = stageExecutionTimes.get(stage.getKey());
					if(stageTime != null) // the listener may not have timed this stage
						sumExecutionTime += stageTime;
				}
			}
			kv.setValue(sumExecutionTime);
		}

		// Step 3: re-average expressions that still have no time.
		for(Location loc : instructions.keySet()) {
			if(expressionExecutionTimes.get(loc.toString()) != 0)
				continue;
			List<String> listInst = new ArrayList<String>(instructions.get(loc));
			long expressionExecutionTime = 0;
			for(String inst : listInst) {
				expressionExecutionTime += instructionExecutionTimes.get(inst);
			}
			expressionExecutionTime /= listInst.size(); // average
			maxExpressionExecutionTime = Math.max(maxExpressionExecutionTime, expressionExecutionTime);
			expressionExecutionTimes.put(loc.toString(), expressionExecutionTime);
		}
	}

	/**
	 * Writes the JSON produced by {@link #getRuntimeInfoInJSONFormat()} to a file.
	 * Useful to avoid passing a large String through Py4J.
	 *
	 * @param fileName output file; overwritten, encoded as UTF-8
	 * @throws DMLRuntimeException declared for API compatibility
	 * @throws IOException if the file cannot be written
	 */
	public void saveRuntimeInfoInJSONFormat(String fileName) throws DMLRuntimeException, IOException {
		String json = getRuntimeInfoInJSONFormat();
		BufferedWriter bw = openUTF8Writer(fileName);
		try {
			bw.write(json);
		}
		finally {
			bw.close();
		}
	}

	/**
	 * Builds a JSON document with the script text ({@code "dml"}) and one entry per script
	 * location ({@code "expressions"}) holding its position, timing, heavy-hitter factor
	 * (expression time divided by the maximum expression time, 0 when no time is known),
	 * expression text and per-instruction stage/job/lineage details. Locations that do not
	 * map onto the script text are skipped.
	 *
	 * @throws DMLRuntimeException declared for API compatibility
	 * @throws IOException declared for API compatibility
	 */
	public String getRuntimeInfoInJSONFormat() throws DMLRuntimeException, IOException {
		fillExecutionTimes();
		StringBuilder retVal = new StringBuilder("{\n");
		retVal.append(getInQuotes("dml") + ":" + getInQuotes(getEscapedJSON(dmlStrForMonitoring)) + ",\n");
		retVal.append(getInQuotes("expressions") + ":" + "[\n");
		boolean isFirstExpression = true;
		for(Location loc : instructions.keySet()) {
			String expr = getExpression(loc);
			if(expr == null)
				continue; // location lies outside the current script text
			List<String> listInst = getSortedInstructions(loc);
			if(!isFirstExpression) {
				retVal.append(",\n");
			}
			isFirstExpression = false;
			long expressionExecutionTime = expressionExecutionTimes.get(loc.toString());
			// Avoid NaN (not valid JSON) when no execution time is known at all.
			double heavyHitterFactor = (maxExpressionExecutionTime > 0)
					? (double) expressionExecutionTime / (double) maxExpressionExecutionTime : 0.0;
			retVal.append("{\n");
			retVal.append(getInQuotes("beginLine") + ":" + loc.beginLine + ",\n");
			retVal.append(getInQuotes("beginCol") + ":" + loc.beginCol + ",\n");
			retVal.append(getInQuotes("endLine") + ":" + loc.endLine + ",\n");
			retVal.append(getInQuotes("endCol") + ":" + loc.endCol + ",\n");
			retVal.append(getInQuotes("expressionExecutionTime") + ":" + expressionExecutionTime + ",\n");
			retVal.append(getInQuotes("expressionHeavyHitterFactor") + ":" + heavyHitterFactor + ",\n");
			retVal.append(getInQuotes("expression") + ":" + getInQuotes(getEscapedJSON(expr)) + ",\n");
			retVal.append(getInQuotes("instructions") + ":" + "[\n");
			boolean firstTime = true;
			for(String inst : listInst) {
				retVal.append(firstTime ? "{" : ", {");
				if(inst.startsWith("SPARK")) {
					retVal.append(getInQuotes("isSpark") + ":" + "true,\n");
				}
				else if(isInterestingCP(inst)) {
					retVal.append(getInQuotes("isInteresting") + ":" + "true,\n");
				}
				retVal.append(getStageIDAsJSONString(inst) + "\n");
				if(lineageInfo.containsKey(inst)) {
					retVal.append(getInQuotes("lineageInfo") + ":" + getInQuotes(getEscapedJSON(lineageInfo.get(inst))) + ",\n");
				}
				retVal.append(getInQuotes("instruction") + ":" + getInQuotes(getEscapedJSON(inst)));
				retVal.append("}");
				firstTime = false;
			}
			retVal.append("]\n");
			retVal.append("}\n");
		}
		return retVal.append("]\n}").toString();
	}

	/**
	 * Returns true for CP instructions other than rmvar/cpvar/mvvar (presumably variable
	 * bookkeeping rather than computation).
	 */
	private boolean isInterestingCP(String inst) {
		if(inst.startsWith("CP rmvar") || inst.startsWith("CP cpvar") || inst.startsWith("CP mvvar"))
			return false;
		return inst.startsWith("CP");
	}

	/** Stage DAG markup with the toggle handler rewritten to also receive the clicked element; "" if unavailable. */
	private String getRewrittenStageDAG(int stageId) {
		Seq<Node> dag = getStageDAGs(stageId);
		if(dag == null)
			return "";
		return dag.toString().replaceAll("toggleDagViz\\(false\\)", "toggleDagViz(false, this)");
	}

	/** Stage timeline markup rewritten to register its data under {@code stageId}; "" if unavailable. */
	private String getRewrittenStageTimeline(int stageId) {
		Seq<Node> timeline = getStageTimeLine(stageId);
		if(timeline == null)
			return "";
		return timeline.toString()
				.replaceAll("drawTaskAssignmentTimeline\\(", "registerTimelineData(" + stageId + ", ")
				.replaceAll("class=\"expand-task-assignment-timeline\"", "class=\"expand-task-assignment-timeline\" onclick=\"toggleStageTimeline(this)\"");
	}

	/** HTML fragment listing each stage of {@code instruction} with its DAG and timeline markup. */
	private String getStageIDAsString(String instruction) {
		StringBuilder retVal = new StringBuilder();
		for(Integer stageId : stageIDs.get(instruction)) {
			retVal.append("Stage:").append(stageId)
				.append(" (<div>").append(getRewrittenStageDAG(stageId)).append("</div>, ")
				.append("<div id=\"timeline-").append(stageId).append("\">")
				.append(getRewrittenStageTimeline(stageId))
				.append("</div>)");
		}
		return retVal.toString();
	}

	/** Appends {@code values} as comma-separated, escaped JSON string literals. */
	private void appendJSONStrings(StringBuilder sb, Iterable<String> values) {
		boolean first = true;
		for(String value : values) {
			if(!first)
				sb.append(",\n");
			sb.append(getInQuotes(getEscapedJSON(value)));
			first = false;
		}
	}

	/**
	 * JSON members (each followed by a comma) describing {@code instruction}: its execution
	 * time; either its stages (related instructions, DAG, time, timeline) or, when it has no
	 * stages, the instructions owning the stages it is related to ("backReferences"); and its
	 * jobs with their DAGs. Requires {@link #fillExecutionTimes()} to have run.
	 */
	private String getStageIDAsJSONString(String instruction) {
		Long instTime = instructionExecutionTimes.get(instruction);
		long instructionExecutionTime = (instTime != null) ? instTime : 0;
		StringBuilder retVal = new StringBuilder(getInQuotes("instructionExecutionTime") + ":" + instructionExecutionTime + ",\n");
		if(stageIDs.get(instruction).isEmpty()) {
			// Find back references: stages that list this instruction as related ...
			HashSet<Integer> relatedStages = new HashSet<Integer>();
			for(Entry<Integer, HashSet<String>> kv : relatedInstructionsPerStage.entrySet()) {
				if(kv.getValue().contains(instruction)) {
					relatedStages.add(kv.getKey());
				}
			}
			// ... and the instructions those stages belong to.
			HashSet<String> relatedInstructions = new HashSet<String>();
			for(Entry<String, Integer> kv : stageIDs.entries()) {
				if(relatedStages.contains(kv.getValue())) {
					relatedInstructions.add(kv.getKey());
				}
			}
			retVal.append(getInQuotes("backReferences") + ": [\n");
			appendJSONStrings(retVal, relatedInstructions);
			retVal.append("], \n");
		}
		else {
			retVal.append(getInQuotes("stages") + ": {");
			boolean isFirst = true;
			for(Integer stageId : stageIDs.get(instruction)) {
				Long stageTime = stageExecutionTimes.get(stageId);
				long stageExecutionTime = (stageTime != null) ? stageTime : 0; // untimed stage
				if(!isFirst) {
					retVal.append(",\n");
				}
				retVal.append(getInQuotes("" + stageId) + ": {");
				retVal.append(getInQuotes("relatedInstructions") + ": [\n");
				HashSet<String> relatedInstructions = relatedInstructionsPerStage.get(stageId);
				if(relatedInstructions != null) {
					appendJSONStrings(retVal, relatedInstructions);
				}
				retVal.append("],\n");
				retVal.append(getInQuotes("DAG") + ":" + getInQuotes(getEscapedJSON(getRewrittenStageDAG(stageId))) + ",\n");
				retVal.append(getInQuotes("stageExecutionTime") + ":" + stageExecutionTime + ",\n");
				retVal.append(getInQuotes("timeline") + ":" + getInQuotes(getEscapedJSON(getRewrittenStageTimeline(stageId))));
				retVal.append("}");
				isFirst = false;
			}
			retVal.append("}, ");
		}
		retVal.append(getInQuotes("jobs") + ": {");
		boolean isFirstJob = true;
		for(Integer jobId : jobIDs.get(instruction)) {
			Seq<Node> dag = getJobDAGs(jobId);
			String jobDAG = (dag == null) ? "" : dag.toString();
			if(!isFirstJob) {
				retVal.append(",\n");
			}
			retVal.append(getInQuotes("" + jobId) + ": {")
				.append(getInQuotes("DAG") + ":")
				.append(getInQuotes(getEscapedJSON(jobDAG.replaceAll("toggleDagViz\\(true\\)", "toggleDagViz(true, this)"))) + "}\n");
			isFirstJob = false;
		}
		retVal.append("}, ");
		return retVal.toString();
	}

	/**
	 * Returns the script text covered by {@code loc} (1-based lines, 1-based inclusive
	 * columns), with lines joined by '\n', or {@code null} if no script is set or the
	 * location does not fit the script text.
	 */
	private String getExpression(Location loc) {
		if(loc == null || dmlStrForMonitoring == null)
			return null;
		if(dmlLines == null) {
			dmlLines = dmlStrForMonitoring.split("\\r?\\n");
		}
		try {
			if(loc.beginLine == loc.endLine) {
				return dmlLines[loc.beginLine-1].substring(loc.beginCol-1, loc.endCol);
			}
			StringBuilder retVal = new StringBuilder(dmlLines[loc.beginLine-1].substring(loc.beginCol-1));
			for(int line = loc.beginLine+1; line < loc.endLine; line++) {
				retVal.append('\n').append(dmlLines[line-1]);
			}
			retVal.append('\n').append(dmlLines[loc.endLine-1].substring(0, loc.endCol));
			return retVal.toString();
		}
		catch(IndexOutOfBoundsException e) {
			// Line or column outside the current script text: treat the location as unknown.
			return null;
		}
	}

	/** Stage DAG markup recorded by the listener, or {@code null} if unavailable. */
	public Seq<Node> getStageDAGs(int stageID) {
		if(_sparkListener == null || _sparkListener.stageDAGs == null)
			return null;
		return _sparkListener.stageDAGs.get(stageID);
	}

	/** Execution time recorded by the listener for {@code stageID}, or {@code null} if unavailable. */
	public Long getStageExecutionTime(int stageID) {
		// Must guard the map that is actually read (previously stageDAGs was checked by mistake).
		if(_sparkListener == null || _sparkListener.stageExecutionTime == null)
			return null;
		return _sparkListener.stageExecutionTime.get(stageID);
	}

	/** Job DAG markup recorded by the listener, or {@code null} if unavailable. */
	public Seq<Node> getJobDAGs(int jobID) {
		if(_sparkListener == null || _sparkListener.jobDAGs == null)
			return null;
		return _sparkListener.jobDAGs.get(jobID);
	}

	/** Stage timeline markup recorded by the listener, or {@code null} if unavailable. */
	public Seq<Node> getStageTimeLine(int stageID) {
		if(_sparkListener == null || _sparkListener.stageTimeline == null)
			return null;
		return _sparkListener.stageTimeline.get(stageID);
	}

	/** Associates lineage text {@code plan} with {@code inst}, replacing any earlier value. */
	public void setLineageInfo(Instruction inst, String plan) {
		lineageInfo.put(getInstructionString(inst), plan);
	}

	/** Records that Spark stage {@code stageId} belongs to {@code inst}. */
	public void setStageId(Instruction inst, int stageId) {
		stageIDs.put(getInstructionString(inst), stageId);
	}

	/** Records that Spark job {@code jobId} belongs to {@code inst}. */
	public void setJobId(Instruction inst, int jobId) {
		jobIDs.put(getInstructionString(inst), jobId);
	}

	/** Registers {@code inst} under script location {@code loc} and stamps its creation time. */
	public void setInstructionLocation(Location loc, Instruction inst) {
		String instStr = getInstructionString(inst);
		instructions.put(loc, instStr);
		instructionCreationTime.put(instStr, System.currentTimeMillis());
	}

	/**
	 * Human-readable form of {@code inst}: operand delimiters become spaces, data-type
	 * prefixes become '.', and instruction delimiters become ", ". Used as the key in all
	 * per-instruction maps.
	 */
	private String getInstructionString(Instruction inst) {
		// String.replace matches the delimiters literally; replaceAll would treat them as regexes.
		return inst.toString()
				.replace(Lop.OPERAND_DELIMITOR, " ")
				.replace(Lop.DATATYPE_PREFIX, ".")
				.replace(Lop.INSTRUCTION_DELIMITOR, ", ");
	}
}