| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.sysds.utils; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Stack; |
| |
| import org.apache.commons.lang3.mutable.MutableInt; |
| import org.apache.sysds.hops.Hop; |
| import org.apache.sysds.hops.LiteralOp; |
| import org.apache.sysds.hops.OptimizerUtils; |
| import org.apache.sysds.hops.codegen.cplan.CNode; |
| import org.apache.sysds.hops.codegen.cplan.CNodeMultiAgg; |
| import org.apache.sysds.hops.codegen.cplan.CNodeTpl; |
| import org.apache.sysds.hops.ipa.FunctionCallGraph; |
| import org.apache.sysds.lops.Lop; |
| import org.apache.sysds.parser.DMLProgram; |
| import org.apache.sysds.parser.ForStatement; |
| import org.apache.sysds.parser.ForStatementBlock; |
| import org.apache.sysds.parser.FunctionStatement; |
| import org.apache.sysds.parser.FunctionStatementBlock; |
| import org.apache.sysds.parser.IfStatement; |
| import org.apache.sysds.parser.IfStatementBlock; |
| import org.apache.sysds.parser.ParForStatementBlock; |
| import org.apache.sysds.parser.StatementBlock; |
| import org.apache.sysds.parser.WhileStatement; |
| import org.apache.sysds.parser.WhileStatementBlock; |
| import org.apache.sysds.runtime.controlprogram.BasicProgramBlock; |
| import org.apache.sysds.runtime.controlprogram.ForProgramBlock; |
| import org.apache.sysds.runtime.controlprogram.FunctionProgramBlock; |
| import org.apache.sysds.runtime.controlprogram.IfProgramBlock; |
| import org.apache.sysds.runtime.controlprogram.ParForProgramBlock; |
| import org.apache.sysds.runtime.controlprogram.Program; |
| import org.apache.sysds.runtime.controlprogram.ProgramBlock; |
| import org.apache.sysds.runtime.controlprogram.WhileProgramBlock; |
| import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext; |
| import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; |
| import org.apache.sysds.runtime.instructions.Instruction; |
| import org.apache.sysds.runtime.instructions.cp.CPInstruction; |
| import org.apache.sysds.runtime.instructions.fed.FEDInstruction; |
| import org.apache.sysds.runtime.instructions.gpu.GPUInstruction; |
| import org.apache.sysds.runtime.instructions.spark.CSVReblockSPInstruction; |
| import org.apache.sysds.runtime.instructions.spark.CheckpointSPInstruction; |
| import org.apache.sysds.runtime.instructions.spark.ReblockSPInstruction; |
| import org.apache.sysds.runtime.instructions.spark.SPInstruction; |
| import org.apache.sysds.runtime.lineage.LineageItem; |
| import org.apache.sysds.runtime.lineage.LineageItemUtils; |
| |
| public class Explain |
| { |
| //internal configuration parameters |
| //if true, explainGenericInstruction replaces the Lop operand/datatype/instruction delimiters in instruction strings |
| private static final boolean REPLACE_SPECIAL_CHARACTERS = true; |
| //if false, showMem prints "MAX" for estimates >= OptimizerUtils.DEFAULT_SIZE instead of the MB value |
| private static final boolean SHOW_MEM_ABOVE_BUDGET = true; |
| //if false, explainHop skips LiteralOp hops and omits them from the list of input references |
| private static final boolean SHOW_LITERAL_HOPS = false; |
| //if true, explainHop and explainCNode print the node ID and the IDs of the node inputs |
| private static final boolean SHOW_DATA_DEPENDENCIES = true; |
| //if true, explainHop appends [rblk]/[chkpt] markers for hops that require reblock/checkpoint |
| private static final boolean SHOW_DATA_FLOW_PROPERTIES = true; |
| |
| //different explain levels |
| public enum ExplainType { |
|     NONE, // explain disabled |
|     HOPS, // explain program and hops |
|     RUNTIME, // explain runtime program (default) |
|     RECOMPILE_HOPS, // explain hops, incl recompile |
|     RECOMPILE_RUNTIME; // explain runtime program, incl recompile |
| |
|     /** |
|      * Indicates if this type explains hops in the given context. |
|      * |
|      * @param recompile true if called in the context of recompilation |
|      * @return true for RECOMPILE_HOPS, and for HOPS if not in recompilation |
|      */ |
|     public boolean isHopsType(boolean recompile) { |
|         if( this == RECOMPILE_HOPS ) |
|             return true; |
|         return this == HOPS && !recompile; |
|     } |
| |
|     /** |
|      * Indicates if this type explains the runtime program in the given context. |
|      * |
|      * @param recompile true if called in the context of recompilation |
|      * @return true for RECOMPILE_RUNTIME, and for RUNTIME if not in recompilation |
|      */ |
|     public boolean isRuntimeType(boolean recompile) { |
|         if( this == RECOMPILE_RUNTIME ) |
|             return true; |
|         return this == RUNTIME && !recompile; |
|     } |
| } |
| |
| /** |
|  * Mutable counters filled by countCompiledInstructions; all counters start at zero. |
|  */ |
| public static class ExplainCounts { |
|     /** number of counted CP instructions */ |
|     public int numCPInst; |
|     /** number of counted Spark instructions */ |
|     public int numJobs; |
|     /** number of counted Spark (csv) reblock instructions */ |
|     public int numReblocks; |
|     /** number of counted Spark checkpoint instructions */ |
|     public int numChkpts; |
| } |
| |
| ////////////// |
| // public explain interface |
| |
| public static String display(DMLProgram prog, Program rtprog, ExplainType type, ExplainCounts counts) { |
| if( counts == null ) |
| counts = countDistributedOperations(rtprog); |
| |
| //explain plan of program (hops or runtime) |
| return "# EXPLAIN ("+type.name()+"):\n" |
| + Explain.explainMemoryBudget(counts)+"\n" |
| + Explain.explainDegreeOfParallelism(counts) |
| + Explain.explain(prog, rtprog, type, counts); |
| } |
| |
| public static String explainMemoryBudget() { |
| return explainMemoryBudget(new ExplainCounts()); |
| } |
| |
| public static String explainMemoryBudget(ExplainCounts counts) { |
| StringBuilder sb = new StringBuilder(); |
| sb.append( "# Memory Budget local/remote = " ); |
| sb.append( OptimizerUtils.toMB(OptimizerUtils.getLocalMemBudget()) ); |
| sb.append( "MB/" ); |
| |
| if( OptimizerUtils.isSparkExecutionMode() ) { |
| //avoid unnecessary lazy spark context creation on access to memory configurations |
| if( counts.numJobs-counts.numReblocks-counts.numChkpts <= 0 |
| || !SparkExecutionContext.isSparkContextCreated() ) { |
| sb.append( "?MB/?MB/?MB" ); |
| } |
| else { //default |
| sb.append( OptimizerUtils.toMB(SparkExecutionContext.getDataMemoryBudget(true, false)) ); |
| sb.append( "MB/" ); |
| sb.append( OptimizerUtils.toMB(SparkExecutionContext.getDataMemoryBudget(false, false)) ); |
| sb.append( "MB/" ); |
| sb.append( OptimizerUtils.toMB(SparkExecutionContext.getBroadcastMemoryBudget()) ); |
| sb.append( "MB" ); |
| } |
| } |
| else { |
| sb.append( "?MB/?MB" ); |
| } |
| |
| return sb.toString(); |
| } |
| |
| public static String explainDegreeOfParallelism() { |
| return explainDegreeOfParallelism(new ExplainCounts()); |
| } |
| |
| /** |
|  * Explains the local parallelism followed by the remote parallelism. Outside of the |
|  * Spark execution mode nothing is appended after the separator. In Spark execution |
|  * mode the remote part is "?" if the counts contain no jobs other than reblocks and |
|  * checkpoints or if no Spark context has been created; otherwise it is the default |
|  * parallelism reported by the SparkExecutionContext. |
|  * |
|  * @param counts instruction counts (only accessed in Spark execution mode) |
|  * @return degree of parallelism line |
|  */ |
| public static String explainDegreeOfParallelism(ExplainCounts counts) { |
|     int localPar = InfrastructureAnalyzer.getLocalParallelism(); |
|     StringBuilder out = new StringBuilder(); |
|     out.append( "# Degree of Parallelism (vcores) local/remote = " ); |
|     out.append( localPar ).append( "/" ); |
| |
|     if( OptimizerUtils.isSparkExecutionMode() ) { |
|         //avoid unnecessary lazy spark context creation on access to spark configurations |
|         int numOtherJobs = counts.numJobs - counts.numReblocks - counts.numChkpts; |
|         boolean unknown = numOtherJobs <= 0 || !SparkExecutionContext.isSparkContextCreated(); |
|         if( unknown ) |
|             out.append( "?" ); |
|         else //default |
|             out.append( SparkExecutionContext.getDefaultParallelism(false) ); |
|     } |
| |
|     return out.toString(); |
| } |
| |
| /** |
|  * Explains the program according to the given type, without precomputed counts. |
|  * |
|  * @param prog DML program |
|  * @param rtprog runtime program |
|  * @param type explain type |
|  * @return explain output, or null for type NONE |
|  */ |
| public static String explain(DMLProgram prog, Program rtprog, ExplainType type) { |
|     final ExplainCounts noCounts = null; |
|     return explain(prog, rtprog, type, noCounts); |
| } |
| |
| public static String explain(DMLProgram prog, Program rtprog, ExplainType type, ExplainCounts counts) { |
| //dispatch to individual explain utils |
| switch( type ) { |
| //explain hops with stats |
| case HOPS: |
| case RECOMPILE_HOPS: |
| return explain(prog); |
| //explain runtime program |
| case RUNTIME: |
| case RECOMPILE_RUNTIME: |
| return explain(rtprog, counts); |
| case NONE: |
| //do nothing |
| } |
| |
| return null; |
| } |
| |
| public static String explain(DMLProgram prog) |
| { |
| StringBuilder sb = new StringBuilder(); |
| |
| //create header |
| sb.append("\nPROGRAM\n"); |
| |
| // Explain functions (if exists) |
| if( prog.hasFunctionStatementBlocks() ) { |
| sb.append("--FUNCTIONS\n"); |
| |
| //show function call graph |
| sb.append("----FUNCTION CALL GRAPH\n"); |
| sb.append("------MAIN PROGRAM\n"); |
| FunctionCallGraph fgraph = new FunctionCallGraph(prog); |
| sb.append(explainFunctionCallGraph(fgraph, new HashSet<String>(), null, 3)); |
| |
| //show individual functions |
| for (String namespace : prog.getNamespaces().keySet()) { |
| for (String fname : prog.getFunctionStatementBlocks(namespace).keySet()) { |
| FunctionStatementBlock fsb = prog.getFunctionStatementBlock(namespace, fname); |
| FunctionStatement fstmt = (FunctionStatement) fsb.getStatement(0); |
| String fkey = DMLProgram.constructFunctionKey(namespace, fname); |
| sb.append("----FUNCTION " + fkey + " [recompile="+fsb.isRecompileOnce()+"]\n"); |
| for (StatementBlock current : fstmt.getBody()) |
| sb.append(explainStatementBlock(current, 3)); |
| } |
| } |
| } |
| |
| // Explain main program |
| sb.append("--MAIN PROGRAM\n"); |
| for( StatementBlock sblk : prog.getStatementBlocks() ) |
| sb.append(explainStatementBlock(sblk, 2)); |
| |
| return sb.toString(); |
| } |
| |
| /** |
|  * Explains the runtime program; the instruction counts are computed internally. |
|  * |
|  * @param rtprog runtime program |
|  * @return explain output |
|  */ |
| public static String explain( Program rtprog ) { |
|     final ExplainCounts noCounts = null; |
|     return explain(rtprog, noCounts); |
| } |
| |
| /** |
|  * Explains the runtime program: a header with the CP and distributed instruction |
|  * counts, then (if functions exist) the function call graph and every function in |
|  * its optimized and, where available, unoptimized form, then the main program. |
|  * |
|  * @param rtprog runtime program |
|  * @param counts instruction counts; if null, they are counted from the runtime program |
|  * @return explain output |
|  */ |
| public static String explain( Program rtprog, ExplainCounts counts ) |
| { |
|     final boolean spark = OptimizerUtils.isSparkExecutionMode(); |
| |
|     //count the instructions if the caller did not provide counts |
|     ExplainCounts lcounts = counts; |
|     if( lcounts == null ) { |
|         lcounts = new ExplainCounts(); |
|         countCompiledInstructions(rtprog, lcounts, true, spark); |
|     } |
| |
|     //create header |
|     StringBuilder out = new StringBuilder(); |
|     out.append("\nPROGRAM ( size CP/").append(spark ? "SP" : "MR").append(" = "); |
|     out.append(lcounts.numCPInst).append("/").append(lcounts.numJobs).append(" )\n"); |
| |
|     //explain functions (if exists) |
|     Map<String, FunctionProgramBlock> functions = rtprog.getFunctionProgramBlocks(); |
|     boolean hasFunctions = functions != null && !functions.isEmpty(); |
|     if( hasFunctions ) { |
|         out.append("--FUNCTIONS\n"); |
| |
|         //show function call graph (requires the statement block of the first program block) |
|         if( !rtprog.getProgramBlocks().isEmpty() ) { |
|             StatementBlock first = rtprog.getProgramBlocks().get(0).getStatementBlock(); |
|             if( first != null ) { |
|                 out.append("----FUNCTION CALL GRAPH\n"); |
|                 out.append("------MAIN PROGRAM\n"); |
|                 DMLProgram prog = first.getDMLProg(); |
|                 FunctionCallGraph callGraph = new FunctionCallGraph(prog); |
|                 out.append(explainFunctionCallGraph(callGraph, new HashSet<String>(), null, 3)); |
|             } |
|         } |
| |
|         //show individual functions |
|         for( Entry<String, FunctionProgramBlock> entry : functions.entrySet() ) { |
|             String key = entry.getKey(); |
|             FunctionProgramBlock optimized = entry.getValue(); |
|             //explain optimized function |
|             out.append("----FUNCTION ").append(key); |
|             out.append(" [recompile=").append(optimized.isRecompileOnce()).append("]\n"); |
|             for( ProgramBlock child : optimized.getChildBlocks() ) |
|                 out.append( explainProgramBlock(child, 3) ); |
|             //explain unoptimized function |
|             if( !rtprog.containsFunctionProgramBlock(key, false) ) |
|                 continue; |
|             FunctionProgramBlock unoptimized = rtprog.getFunctionProgramBlock(key, false); |
|             out.append("----FUNCTION ").append(key); |
|             out.append(" (unoptimized) [recompile=").append(unoptimized.isRecompileOnce()).append("]\n"); |
|             for( ProgramBlock child : unoptimized.getChildBlocks() ) |
|                 out.append( explainProgramBlock(child, 3) ); |
|         } |
|     } |
| |
|     //explain main program |
|     out.append("--MAIN PROGRAM\n"); |
|     for( ProgramBlock block : rtprog.getProgramBlocks() ) |
|         out.append( explainProgramBlock(block, 2) ); |
| |
|     return out.toString(); |
| } |
| |
| /** |
|  * Explains a single runtime program block without indentation (level 0). |
|  * |
|  * @param pb program block |
|  * @return explain output |
|  */ |
| public static String explain( ProgramBlock pb ) { |
|     final int topLevel = 0; |
|     return explainProgramBlock(pb, topLevel); |
| } |
| |
| /** |
|  * Explains a list of instructions without indentation (level 0). |
|  * |
|  * @param inst list of instructions |
|  * @return explain output, one line per instruction |
|  */ |
| public static String explain( ArrayList<Instruction> inst ) { |
|     final int topLevel = 0; |
|     return explainInstructions(inst, topLevel); |
| } |
| |
| public static String explain( ArrayList<Instruction> inst, int level ) { |
| return explainInstructions(inst, level); |
| } |
| |
| /** |
|  * Explains a single instruction (no indentation and no trailing newline are added here). |
|  * |
|  * @param inst instruction |
|  * @return instruction string as produced by explainGenericInstruction |
|  */ |
| public static String explain( Instruction inst ) { |
|     final int topLevel = 0; |
|     return explainGenericInstruction(inst, topLevel); |
| } |
| |
| /** |
|  * Explains a single statement block without indentation (level 0). |
|  * |
|  * @param sb statement block |
|  * @return explain output |
|  */ |
| public static String explain( StatementBlock sb ) { |
|     final int topLevel = 0; |
|     return explainStatementBlock(sb, topLevel); |
| } |
| |
| /** |
|  * Explains the given hop DAG roots without indentation (level 0). |
|  * |
|  * @param hops list of hop DAG roots |
|  * @return explain output |
|  */ |
| public static String explainHops( ArrayList<Hop> hops ) { |
|     final int topLevel = 0; |
|     return explainHops(hops, topLevel); |
| } |
| |
| /** |
|  * Explains the given hop DAG roots at the given indentation level. The visit status |
|  * of the hops is reset before and after the traversal. |
|  * |
|  * @param hops list of hop DAG roots |
|  * @param level indentation level |
|  * @return explain output |
|  */ |
| public static String explainHops( ArrayList<Hop> hops, int level ) { |
|     Hop.resetVisitStatus(hops); |
|     StringBuilder out = new StringBuilder(); |
|     for( Hop root : hops ) { |
|         out.append(explainHop(root, level)); |
|     } |
|     Hop.resetVisitStatus(hops); |
|     return out.toString(); |
| } |
| |
| /** |
|  * Explains the DAG below the given hop without indentation (level 0). |
|  * |
|  * @param hop root hop |
|  * @return explain output |
|  */ |
| public static String explain( Hop hop ) { |
|     final int topLevel = 0; |
|     return explain(hop, topLevel); |
| } |
| |
| /** |
|  * Explains the DAG below the given hop at the given indentation level. The visit |
|  * status of the hop is reset before and after the traversal. |
|  * |
|  * @param hop root hop |
|  * @param level indentation level |
|  * @return explain output |
|  */ |
| public static String explain( Hop hop, int level ) { |
|     hop.resetVisitStatus(); |
|     final String explained = explainHop(hop, level); |
|     hop.resetVisitStatus(); |
|     return explained; |
| } |
| |
| /** |
|  * Explains the given lineage items without indentation (level 0). |
|  * |
|  * @param lis array of lineage items |
|  * @return explain output |
|  */ |
| public static String explainLineageItems( LineageItem[] lis ) { |
|     final int topLevel = 0; |
|     return explainLineageItems(lis, topLevel); |
| } |
| |
| /** |
|  * Explains the given lineage items at the given indentation level. The visit status |
|  * of the items is reset before and after the traversal. |
|  * |
|  * @param lis array of lineage items |
|  * @param level indentation level |
|  * @return explain output |
|  */ |
| public static String explainLineageItems( LineageItem[] lis, int level ) { |
|     LineageItem.resetVisitStatusNR(lis); |
|     StringBuilder out = new StringBuilder(); |
|     for( LineageItem root : lis ) { |
|         out.append(explainLineageItemNR(root, level)); |
|     } |
|     LineageItem.resetVisitStatusNR(lis); |
|     return out.toString(); |
| } |
| |
| /** |
|  * Explains the given lineage item, followed by the dedup items found below it |
|  * (see rExplainDedupItems). The visit status is reset before and after. |
|  * |
|  * @param li root lineage item |
|  * @return explain output |
|  */ |
| public static String explain( LineageItem li ) { |
|     li.resetVisitStatusNR(); |
|     StringBuilder out = new StringBuilder(); |
|     out.append(explain(li, 0)); |
|     out.append(rExplainDedupItems(li, new ArrayList<>())); |
|     li.resetVisitStatusNR(); |
|     return out.toString(); |
| } |
| |
| private static String explain( LineageItem li, int level ) { |
| li.resetVisitStatusNR(); |
| String ret = explainLineageItemNR(li, level); |
| li.resetVisitStatusNR(); |
| return ret; |
| } |
| |
| /** |
|  * Recursively explains all items of type Dedup reachable from the given item. Each |
|  * distinct data string is explained once; the data strings already explained are |
|  * collected in paths. Visited items are skipped, and the item is marked as visited |
|  * after its inputs have been processed. |
|  * |
|  * @param li current lineage item |
|  * @param paths data strings of the dedup items explained so far (updated in place) |
|  * @return explain output of the dedup items, or an empty string |
|  */ |
| private static String rExplainDedupItems(LineageItem li, List<String> paths) { |
|     if( li.isVisited() ) |
|         return ""; |
| |
|     StringBuilder out = new StringBuilder(); |
|     boolean newDedup = li.getType() == LineageItem.LineageItemType.Dedup |
|         && !paths.contains(li.getData()); |
|     if( newDedup ) { |
|         out.append("\n"); |
|         out.append("dedup"); |
|         out.append(li.getData()); |
|         out.append(":\n"); |
|         out.append(Explain.explain(li, 0)); |
|         paths.add(li.getData()); |
|     } |
| |
|     LineageItem[] inputs = li.getInputs(); |
|     if( inputs != null ) { |
|         for( LineageItem input : inputs ) |
|             out.append(rExplainDedupItems(input, paths)); |
|     } |
| |
|     li.setVisited(); |
|     return out.toString(); |
| } |
| |
| |
| /** |
|  * Explains a codegen plan: a header with the template info and the input names, |
|  * followed by the DAG of the output node (or of all outputs for a CNodeMultiAgg). |
|  * The visit status of the outputs is reset before and after the traversal. |
|  * |
|  * @param cplan codegen template plan |
|  * @return explain output |
|  */ |
| public static String explainCPlan( CNodeTpl cplan ) { |
|     final String rule = "----------------------------------------\n"; |
|     StringBuilder out = new StringBuilder(); |
| |
|     //create template header |
|     out.append("\n").append(rule); |
|     out.append("CPLAN: ").append(cplan.getTemplateInfo()).append("\n"); |
|     out.append("--inputs: ").append(Arrays.toString(cplan.getInputNames())).append("\n"); |
|     out.append(rule); |
| |
|     //explain body dag |
|     cplan.resetVisitStatusOutputs(); |
|     if( cplan instanceof CNodeMultiAgg ) { |
|         CNodeMultiAgg multiAgg = (CNodeMultiAgg) cplan; |
|         for( CNode output : multiAgg.getOutputs() ) |
|             out.append(explainCNode(output, 1)); |
|     } |
|     else { |
|         out.append(explainCNode(cplan.getOutput(), 1)); |
|     } |
|     cplan.resetVisitStatusOutputs(); |
|     out.append(rule); |
| |
|     return out.toString(); |
| } |
| |
| /** |
|  * Explains the DAG below the given cnode without indentation (level 0). |
|  * |
|  * @param node root cnode |
|  * @return explain output |
|  */ |
| public static String explain( CNode node ) { |
|     final int topLevel = 0; |
|     return explain(node, topLevel); |
| } |
| |
| public static String explain( CNode node, int level ) { |
| return explainCNode(node, level); |
| } |
| |
| /** |
|  * Counts the compiled instructions of the given runtime program, |
|  * including both CP and Spark instructions. |
|  * |
|  * @param rtprog runtime program |
|  * @return counts |
|  */ |
| public static ExplainCounts countDistributedOperations( Program rtprog ) { |
|     final ExplainCounts result = new ExplainCounts(); |
|     Explain.countCompiledInstructions(rtprog, result, true, true); |
|     return result; |
| } |
| |
| public static String getIdentation( int level ) { |
| return createOffset(level); |
| } |
| |
| ////////////// |
| // internal explain HOPS |
| |
| private static String explainStatementBlock(StatementBlock sb, int level) |
| { |
| StringBuilder builder = new StringBuilder(); |
| String offset = createOffset(level); |
| |
| if (sb instanceof WhileStatementBlock) { |
| WhileStatementBlock wsb = (WhileStatementBlock) sb; |
| builder.append(offset); |
| if( !wsb.getUpdateInPlaceVars().isEmpty() ) |
| builder.append("WHILE (lines "+wsb.getBeginLine()+"-"+wsb.getEndLine()+") [in-place="+wsb.getUpdateInPlaceVars().toString()+"]\n"); |
| else |
| builder.append("WHILE (lines "+wsb.getBeginLine()+"-"+wsb.getEndLine()+")\n"); |
| builder.append(explainHop(wsb.getPredicateHops(), level+1)); |
| |
| WhileStatement ws = (WhileStatement)sb.getStatement(0); |
| for (StatementBlock current : ws.getBody()) |
| builder.append(explainStatementBlock(current, level+1)); |
| |
| } |
| else if (sb instanceof IfStatementBlock) { |
| IfStatementBlock ifsb = (IfStatementBlock) sb; |
| builder.append(offset); |
| builder.append("IF (lines "+ifsb.getBeginLine()+"-"+ifsb.getEndLine()+")\n"); |
| builder.append(explainHop(ifsb.getPredicateHops(), level+1)); |
| |
| IfStatement ifs = (IfStatement) sb.getStatement(0); |
| for (StatementBlock current : ifs.getIfBody()) |
| builder.append(explainStatementBlock(current, level+1)); |
| if( !ifs.getElseBody().isEmpty() ) { |
| builder.append(offset); |
| builder.append("ELSE\n"); |
| } |
| for (StatementBlock current : ifs.getElseBody()) |
| builder.append(explainStatementBlock(current, level+1)); |
| |
| } |
| else if (sb instanceof ForStatementBlock) { |
| ForStatementBlock fsb = (ForStatementBlock) sb; |
| builder.append(offset); |
| if (sb instanceof ParForStatementBlock) { |
| if( !fsb.getUpdateInPlaceVars().isEmpty() ) |
| builder.append("PARFOR (lines "+fsb.getBeginLine()+"-"+fsb.getEndLine()+") [in-place="+fsb.getUpdateInPlaceVars().toString()+"]\n"); |
| else |
| builder.append("PARFOR (lines "+fsb.getBeginLine()+"-"+fsb.getEndLine()+")\n"); |
| } |
| else { |
| if( !fsb.getUpdateInPlaceVars().isEmpty() ) |
| builder.append("FOR (lines "+fsb.getBeginLine()+"-"+fsb.getEndLine()+") [in-place="+fsb.getUpdateInPlaceVars().toString()+"]\n"); |
| else |
| builder.append("FOR (lines "+fsb.getBeginLine()+"-"+fsb.getEndLine()+")\n"); |
| } |
| if (fsb.getFromHops() != null) |
| builder.append(explainHop(fsb.getFromHops(), level+1)); |
| if (fsb.getToHops() != null) |
| builder.append(explainHop(fsb.getToHops(), level+1)); |
| if (fsb.getIncrementHops() != null) |
| builder.append(explainHop(fsb.getIncrementHops(), level+1)); |
| |
| ForStatement fs = (ForStatement)sb.getStatement(0); |
| for (StatementBlock current : fs.getBody()) |
| builder.append(explainStatementBlock(current, level+1)); |
| |
| } |
| else if (sb instanceof FunctionStatementBlock) { |
| FunctionStatement fsb = (FunctionStatement) sb.getStatement(0); |
| for (StatementBlock current : fsb.getBody()) |
| builder.append(explainStatementBlock(current, level+1)); |
| |
| } |
| else { |
| // For generic StatementBlock |
| builder.append(offset); |
| builder.append("GENERIC (lines "+sb.getBeginLine()+"-"+sb.getEndLine()+") [recompile=" + sb.requiresRecompilation() + "]\n"); |
| ArrayList<Hop> hopsDAG = sb.getHops(); |
| if( hopsDAG != null && !hopsDAG.isEmpty() ) { |
| Hop.resetVisitStatus(hopsDAG); |
| for (Hop hop : hopsDAG) |
| builder.append(explainHop(hop, level+1)); |
| Hop.resetVisitStatus(hopsDAG); |
| } |
| } |
| |
| return builder.toString(); |
| } |
| |
| /** |
|  * Do a post-order traverse through the Hop DAG and explain each Hop: the inputs are |
|  * explained first, followed by one line for the hop itself. Hops that are already |
|  * visited, and literal hops unless SHOW_LITERAL_HOPS is set, produce no output. |
|  * The hop is marked as visited; this method does not reset the visit status. |
|  * |
|  * The line of a hop has the form |
|  * (id) op (input ids) [dim1,dim2,blocksize,nnz(,update type)] [input,intermediate,output -> total memory] |
|  * optionally followed by the reblock/checkpoint markers and the execution type. |
|  * |
|  * @param hop high-level operator |
|  * @param level offset |
|  * @return string explanation of Hop DAG |
|  */ |
| private static String explainHop(Hop hop, int level) { |
|     if( hop.isVisited() || (!SHOW_LITERAL_HOPS && hop instanceof LiteralOp) ) |
|         return ""; |
| |
|     StringBuilder sb = new StringBuilder(); |
|     String offset = createOffset(level); |
| |
|     //inputs first (same level, the output is a flat list) |
|     for( Hop input : hop.getInput() ) |
|         sb.append(explainHop(input, level)); |
| |
|     //indentation |
|     sb.append(offset); |
| |
|     //hop id |
|     if( SHOW_DATA_DEPENDENCIES ) |
|         sb.append("("+hop.getHopID()+") "); |
| |
|     //operation string |
|     sb.append(hop.getOpString()); |
| |
|     //input hop references (omitted if no input is listed) |
|     if( SHOW_DATA_DEPENDENCIES ) { |
|         StringBuilder childs = new StringBuilder(); |
|         childs.append(" ("); |
|         boolean childAdded = false; |
|         for( Hop input : hop.getInput() ) |
|             if( SHOW_LITERAL_HOPS || !(input instanceof LiteralOp) ){ |
|                 childs.append(childAdded?",":""); |
|                 childs.append(input.getHopID()); |
|                 childAdded = true; |
|             } |
|         childs.append(")"); |
|         if( childAdded ) |
|             sb.append(childs.toString()); |
|     } |
| |
|     //matrix characteristics |
|     sb.append(" [" + hop.getDim1() + "," |
|         + hop.getDim2() + "," |
|         + hop.getBlocksize() + "," |
|         + hop.getNnz()); |
| |
|     //lower-case the update type independent of the default locale, so the |
|     //output is the same on every system (e.g., no dotless i under a Turkish locale) |
|     if (hop.getUpdateType().isInPlace()) |
|         sb.append("," + hop.getUpdateType().toString().toLowerCase(Locale.ROOT)); |
| |
|     sb.append("]"); |
| |
|     //memory estimates |
|     sb.append(" [" + showMem(hop.getInputMemEstimate(), false) + "," |
|         + showMem(hop.getIntermediateMemEstimate(), false) + "," |
|         + showMem(hop.getOutputMemEstimate(), false) + " -> " |
|         + showMem(hop.getMemEstimate(), true) + "]"); |
| |
|     //data flow properties |
|     if( SHOW_DATA_FLOW_PROPERTIES ) { |
|         if( hop.requiresReblock() && hop.requiresCheckpoint() ) |
|             sb.append(" [rblk,chkpt]"); |
|         else if( hop.requiresReblock() ) |
|             sb.append(" [rblk]"); |
|         else if( hop.requiresCheckpoint() ) |
|             sb.append(" [chkpt]"); |
|     } |
| |
|     //exec type |
|     if (hop.getExecType() != null) |
|         sb.append(", " + hop.getExecType()); |
| |
|     sb.append('\n'); |
| |
|     hop.setVisited(); |
| |
|     return sb.toString(); |
| } |
| |
| /** |
|  * Non-recursive explain of the lineage DAG below the given item. Every item that is |
|  * not yet visited is appended as one line (offset, item string, newline) after its |
|  * inputs have been processed, and is then marked as visited. The inputs of items whose |
|  * opcode starts with LineageItemUtils.LPLACEHOLDER are not traversed. This method |
|  * does not reset the visit status. |
|  * |
|  * @param item root lineage item |
|  * @param level indentation level |
|  * @return explain output |
|  */ |
| private static String explainLineageItemNR(LineageItem item, int level) { |
| //NOTE: in contrast to similar non-recursive functions like resetVisitStatusNR, |
| // we maintain a more complex stack to ensure DFS ordering where current nodes |
| // are added after the subtree underneath is processed (backwards compatibility) |
| //the two stacks are kept in sync: stackPos holds, per stacked item, the index of the next input to descend into |
| Stack<LineageItem> stackItem = new Stack<>(); |
| Stack<MutableInt> stackPos = new Stack<>(); |
| stackItem.push(item); stackPos.push(new MutableInt(0)); |
| StringBuilder sb = new StringBuilder(); |
| while( !stackItem.empty() ) { |
| LineageItem tmpItem = stackItem.peek(); |
| MutableInt tmpPos = stackPos.peek(); |
| //check ascent condition - no item processing |
| if( tmpItem.isVisited() ) { |
| stackItem.pop(); stackPos.pop(); |
| } |
| //check ascent condition - append item |
| else if( tmpItem.getInputs() == null |
| || tmpItem.getOpcode().startsWith(LineageItemUtils.LPLACEHOLDER) |
| // don't trace beyond if a placeholder is found |
| || tmpItem.getInputs().length <= tmpPos.intValue() ) { |
| sb.append(createOffset(level)); |
| sb.append(tmpItem.toString()); |
| sb.append('\n'); |
| stackItem.pop(); stackPos.pop(); |
| tmpItem.setVisited(); |
| } |
| //check descent condition |
| else if( tmpItem.getInputs() != null ) { |
| //push the next unprocessed input and advance the position of the current item |
| stackItem.push(tmpItem.getInputs()[tmpPos.intValue()]); |
| tmpPos.increment(); |
| stackPos.push(new MutableInt(0)); |
| } |
| } |
| return sb.toString(); |
| } |
| |
| /** |
|  * Recursive explain of the lineage DAG below the given item: inputs first, then the |
|  * item itself, which is marked as visited afterwards. Visited items yield an empty string. |
|  * |
|  * @param li lineage item |
|  * @param level indentation level |
|  * @return explain output |
|  */ |
| @Deprecated |
| @SuppressWarnings("unused") |
| private static String explainLineageItem(LineageItem li, int level) { |
|     if( li.isVisited() ) |
|         return ""; |
| |
|     StringBuilder out = new StringBuilder(); |
|     String indent = createOffset(level); |
| |
|     LineageItem[] inputs = li.getInputs(); |
|     if( inputs != null ) { |
|         for( LineageItem input : inputs ) |
|             out.append(explainLineageItem(input, level)); |
|     } |
| |
|     out.append(indent).append(li.toString()).append('\n'); |
|     li.setVisited(); |
| |
|     return out.toString(); |
| } |
| |
| ////////////// |
| // internal explain CNODE |
| |
| /** |
|  * Recursively explains the DAG below the given cnode: inputs first, then one line |
|  * for the cnode itself, which is marked as visited afterwards. Visited cnodes yield |
|  * an empty string. |
|  * |
|  * @param cnode codegen node |
|  * @param level indentation level |
|  * @return explain output |
|  */ |
| private static String explainCNode(CNode cnode, int level) { |
|     if( cnode.isVisited() ) |
|         return ""; |
| |
|     StringBuilder out = new StringBuilder(); |
|     String indent = createOffset(level); |
| |
|     //inputs first (same level, the output is a flat list) |
|     for( CNode input : cnode.getInput() ) |
|         out.append(explainCNode(input, level)); |
| |
|     //indentation |
|     out.append(indent); |
| |
|     //cnode id |
|     if( SHOW_DATA_DEPENDENCIES ) |
|         out.append("(").append(cnode.getID()).append(") "); |
| |
|     //operation string |
|     out.append(cnode.toString()); |
| |
|     //input references (omitted if the cnode has no inputs) |
|     if( SHOW_DATA_DEPENDENCIES ) { |
|         StringBuilder refs = new StringBuilder(); |
|         for( CNode input : cnode.getInput() ) { |
|             refs.append(refs.length() == 0 ? " (" : ","); |
|             refs.append(input.getID()); |
|         } |
|         if( refs.length() > 0 ) |
|             out.append(refs.toString()).append(")"); |
|     } |
| |
|     out.append('\n'); |
|     cnode.setVisited(); |
| |
|     return out.toString(); |
| } |
| |
| ////////////// |
| // internal explain RUNTIME |
| |
| private static String explainProgramBlock( ProgramBlock pb, int level ) |
| { |
| StringBuilder sb = new StringBuilder(); |
| String offset = createOffset(level); |
| |
| if (pb instanceof FunctionProgramBlock ) { |
| FunctionProgramBlock fpb = (FunctionProgramBlock)pb; |
| for( ProgramBlock pbc : fpb.getChildBlocks() ) |
| sb.append( explainProgramBlock( pbc, level+1) ); |
| } |
| else if (pb instanceof WhileProgramBlock) { |
| WhileProgramBlock wpb = (WhileProgramBlock) pb; |
| StatementBlock wsb = pb.getStatementBlock(); |
| sb.append(offset); |
| if( wsb != null && !wsb.getUpdateInPlaceVars().isEmpty() ) |
| sb.append("WHILE (lines "+wpb.getBeginLine()+"-"+wpb.getEndLine()+") [in-place="+wsb.getUpdateInPlaceVars().toString()+"]\n"); |
| else |
| sb.append("WHILE (lines "+wpb.getBeginLine()+"-"+wpb.getEndLine()+")\n"); |
| sb.append(explainInstructions(wpb.getPredicate(), level+1)); |
| for( ProgramBlock pbc : wpb.getChildBlocks() ) |
| sb.append( explainProgramBlock( pbc, level+1) ); |
| if( wpb.getExitInstruction() != null ) |
| sb.append(explainInstructions(wpb.getExitInstruction(), level+1)); |
| } |
| else if (pb instanceof IfProgramBlock) { |
| IfProgramBlock ipb = (IfProgramBlock) pb; |
| sb.append(offset); |
| sb.append("IF (lines "+ipb.getBeginLine()+"-"+ipb.getEndLine()+")\n"); |
| sb.append(explainInstructions(ipb.getPredicate(), level+1)); |
| for( ProgramBlock pbc : ipb.getChildBlocksIfBody() ) |
| sb.append( explainProgramBlock( pbc, level+1) ); |
| if( !ipb.getChildBlocksElseBody().isEmpty() ) { |
| sb.append(offset); |
| sb.append("ELSE\n"); |
| for( ProgramBlock pbc : ipb.getChildBlocksElseBody() ) |
| sb.append( explainProgramBlock( pbc, level+1) ); |
| } |
| if( ipb.getExitInstruction() != null ) |
| sb.append(explainInstructions(ipb.getExitInstruction(), level+1)); |
| } |
| else if (pb instanceof ForProgramBlock) { //incl parfor |
| ForProgramBlock fpb = (ForProgramBlock) pb; |
| StatementBlock fsb = pb.getStatementBlock(); |
| sb.append(offset); |
| if( pb instanceof ParForProgramBlock ) |
| sb.append("PARFOR (lines "+fpb.getBeginLine()+"-"+fpb.getEndLine()+")\n"); |
| else { |
| if( fsb != null && !fsb.getUpdateInPlaceVars().isEmpty() ) |
| sb.append("FOR (lines "+fpb.getBeginLine()+"-"+fpb.getEndLine()+") [in-place="+fsb.getUpdateInPlaceVars().toString()+"]\n"); |
| else |
| sb.append("FOR (lines "+fpb.getBeginLine()+"-"+fpb.getEndLine()+")\n"); |
| } |
| sb.append(explainInstructions(fpb.getFromInstructions(), level+1)); |
| sb.append(explainInstructions(fpb.getToInstructions(), level+1)); |
| sb.append(explainInstructions(fpb.getIncrementInstructions(), level+1)); |
| for( ProgramBlock pbc : fpb.getChildBlocks() ) |
| sb.append( explainProgramBlock( pbc, level+1) ); |
| if( fpb.getExitInstruction() != null ) |
| sb.append(explainInstructions(fpb.getExitInstruction(), level+1)); |
| } |
| else if( pb instanceof BasicProgramBlock ) { |
| BasicProgramBlock bpb = (BasicProgramBlock) pb; |
| sb.append(offset); |
| if( pb.getStatementBlock()!=null ) |
| sb.append("GENERIC (lines "+pb.getBeginLine()+"-"+pb.getEndLine()+") [recompile="+pb.getStatementBlock().requiresRecompilation()+"]\n"); |
| else |
| sb.append("GENERIC (lines "+pb.getBeginLine()+"-"+pb.getEndLine()+") \n"); |
| sb.append(explainInstructions(bpb.getInstructions(), level+1)); |
| } |
| |
| return sb.toString(); |
| } |
| |
| private static String explainInstructions( ArrayList<Instruction> instSet, int level ) { |
| StringBuilder sb = new StringBuilder(); |
| String offsetInst = createOffset(level); |
| for( Instruction inst : instSet ) { |
| String tmp = explainGenericInstruction(inst, level); |
| sb.append( offsetInst ); |
| sb.append( tmp ); |
| sb.append( '\n' ); |
| } |
| |
| return sb.toString(); |
| } |
| |
| /** |
|  * Explains a single instruction as one line, prefixed with the indentation of the |
|  * given level. |
|  * |
|  * @param inst instruction |
|  * @param level indentation level |
|  * @return explain output |
|  */ |
| private static String explainInstructions( Instruction inst, int level ) { |
|     return createOffset(level) + explainGenericInstruction(inst, level) + '\n'; |
| } |
| |
| /** |
|  * Returns the string representation of a Spark, CP, GPU, or federated instruction. |
|  * If REPLACE_SPECIAL_CHARACTERS is set, the Lop operand delimiter is replaced by a |
|  * space, the datatype prefix by a dot, and the instruction delimiter by ", ". |
|  * |
|  * @param inst instruction |
|  * @param level indentation level (not used by this method) |
|  * @return instruction string, or null for any other instruction type |
|  */ |
| private static String explainGenericInstruction( Instruction inst, int level ) |
| { |
|     String tmp = null; |
|     if ( inst instanceof SPInstruction || inst instanceof CPInstruction || inst instanceof GPUInstruction || |
|         inst instanceof FEDInstruction ) |
|         tmp = inst.toString(); |
| |
|     //tmp is still null for other instruction types; skip the replacement |
|     //in that case instead of failing with a NullPointerException |
|     if( REPLACE_SPECIAL_CHARACTERS && tmp != null ){ |
|         tmp = tmp.replaceAll(Lop.OPERAND_DELIMITOR, " "); |
|         tmp = tmp.replaceAll(Lop.DATATYPE_PREFIX, "."); |
|         tmp = tmp.replaceAll(Lop.INSTRUCTION_DELIMITOR, ", "); |
|     } |
| |
|     return tmp; |
| } |
| |
| /** |
|  * Formats a memory estimate in MB. If SHOW_MEM_ABOVE_BUDGET is disabled, estimates |
|  * of at least OptimizerUtils.DEFAULT_SIZE are shown as "MAX". |
|  * |
|  * @param mem memory estimate |
|  * @param units if true, the suffix "MB" is appended |
|  * @return formatted memory estimate |
|  */ |
| @SuppressWarnings("unused") |
| private static String showMem(double mem, boolean units) |
| { |
|     boolean capped = !SHOW_MEM_ABOVE_BUDGET && mem >= OptimizerUtils.DEFAULT_SIZE; |
|     if( capped ) |
|         return "MAX"; |
|     String suffix = units ? "MB" : ""; |
|     return OptimizerUtils.toMB(mem) + suffix; |
| } |
| |
| /** |
|  * Creates the indentation prefix of two dashes per level. |
|  * |
|  * @param level indentation level; values below one yield an empty string |
|  * @return indentation prefix |
|  */ |
| private static String createOffset( int level ) |
| { |
|     char[] dashes = new char[2 * Math.max(level, 0)]; |
|     Arrays.fill(dashes, '-'); |
|     return new String(dashes); |
| } |
| |
| /** |
|  * Counts the compiled instructions of all function program blocks and all program |
|  * blocks of the main program. |
|  * |
|  * @param rtprog runtime program |
|  * @param counts explain counts (updated in place) |
|  * @param CP if true, count CP instructions |
|  * @param SP if true, count Spark instructions |
|  */ |
| private static void countCompiledInstructions( Program rtprog, ExplainCounts counts, boolean CP, boolean SP ) |
| { |
|     //analyze DML-bodied functions |
|     for( FunctionProgramBlock function : rtprog.getFunctionProgramBlocks().values() ) { |
|         countCompiledInstructions( function, counts, CP, SP ); |
|     } |
| |
|     //analyze main program |
|     for( ProgramBlock block : rtprog.getProgramBlocks() ) { |
|         countCompiledInstructions( block, counts, CP, SP ); |
|     } |
| } |
| |
| /** |
|  * Recursively counts the compiled instructions of the given runtime program block, |
|  * including predicate and from/to/increment instructions as well as all child blocks. |
|  * Block types other than while, if, (par)for, function, and basic blocks are ignored. |
|  * |
|  * @param pb program block |
|  * @param counts explain counts (updated in place) |
|  * @param CP if true, count CP instructions |
|  * @param SP if true, count Spark instructions |
|  */ |
| private static void countCompiledInstructions(ProgramBlock pb, ExplainCounts counts, boolean CP, boolean SP) |
| { |
|     if( pb instanceof WhileProgramBlock ) { |
|         WhileProgramBlock wblock = (WhileProgramBlock) pb; |
|         countCompiledInstructions(wblock.getPredicate(), counts, CP, SP); |
|         for( ProgramBlock child : wblock.getChildBlocks() ) |
|             countCompiledInstructions(child, counts, CP, SP); |
|         return; |
|     } |
|     if( pb instanceof IfProgramBlock ) { |
|         IfProgramBlock iblock = (IfProgramBlock) pb; |
|         countCompiledInstructions(iblock.getPredicate(), counts, CP, SP); |
|         for( ProgramBlock child : iblock.getChildBlocksIfBody() ) |
|             countCompiledInstructions(child, counts, CP, SP); |
|         for( ProgramBlock child : iblock.getChildBlocksElseBody() ) |
|             countCompiledInstructions(child, counts, CP, SP); |
|         return; |
|     } |
|     if( pb instanceof ForProgramBlock ) { //includes ParFORProgramBlock |
|         ForProgramBlock fblock = (ForProgramBlock) pb; |
|         countCompiledInstructions(fblock.getFromInstructions(), counts, CP, SP); |
|         countCompiledInstructions(fblock.getToInstructions(), counts, CP, SP); |
|         countCompiledInstructions(fblock.getIncrementInstructions(), counts, CP, SP); |
|         for( ProgramBlock child : fblock.getChildBlocks() ) |
|             countCompiledInstructions(child, counts, CP, SP); |
|         //additional parfor jobs counted during runtime |
|         return; |
|     } |
|     if( pb instanceof FunctionProgramBlock ) { |
|         for( ProgramBlock child : ((FunctionProgramBlock) pb).getChildBlocks() ) |
|             countCompiledInstructions(child, counts, CP, SP); |
|         return; |
|     } |
|     if( pb instanceof BasicProgramBlock ) { |
|         countCompiledInstructions(((BasicProgramBlock) pb).getInstructions(), counts, CP, SP); |
|     } |
| } |
| |
| /** |
|  * Counts the CP instructions and Spark instructions in a list of instructions. |
|  * Spark reblock and checkpoint instructions are additionally tracked in separate |
|  * counters. |
|  * |
|  * @param instSet |
|  *            list of instructions |
|  * @param counts |
|  *            explain counts (updated in place) |
|  * @param CP |
|  *            if true, count CP instructions |
|  * @param SP |
|  *            if true, count Spark instructions, Spark reblock |
|  *            instructions, and Spark checkpoint instructions |
|  */ |
| private static void countCompiledInstructions( ArrayList<Instruction> instSet, ExplainCounts counts, boolean CP, boolean SP ) |
| { |
|     for( Instruction inst : instSet ) |
|     { |
|         if( CP && inst instanceof CPInstruction ) { |
|             counts.numCPInst += 1; |
|         } |
|         else if( SP && inst instanceof SPInstruction ) { |
|             counts.numJobs += 1; |
|         } |
| |
|         if( !SP ) |
|             continue; |
| |
|         //keep track of reblocks (in order to prevent unnecessary spark context creation) |
|         if( inst instanceof CSVReblockSPInstruction || inst instanceof ReblockSPInstruction ) |
|             counts.numReblocks += 1; |
|         if( inst instanceof CheckpointSPInstruction ) |
|             counts.numChkpts += 1; |
|     } |
| } |
| |
| /** |
|  * Recursively explains the functions called by the given function key, one line per |
|  * called function. A called function that is already on the current call stack and |
|  * is flagged as recursive by the call graph is marked "(recursive)" and not expanded. |
|  * |
|  * @param fgraph function call graph |
|  * @param fstack function keys on the current call path (updated during traversal) |
|  * @param fkey key of the calling function (null is passed for the main program) |
|  * @param level indentation level |
|  * @return explain output, or an empty string if no called functions are returned |
|  */ |
| private static String explainFunctionCallGraph(FunctionCallGraph fgraph, HashSet<String> fstack, String fkey, int level) |
| { |
|     Collection<String> callees = fgraph.getCalledFunctions(fkey); |
|     if( callees == null ) |
|         return ""; |
| |
|     StringBuilder out = new StringBuilder(); |
|     String indent = createOffset(level); |
|     for( String callee : callees ) { |
|         boolean recursion = fstack.contains(callee) && fgraph.isRecursiveFunction(callee); |
|         out.append(indent).append("--").append(callee); |
|         if( recursion ) { |
|             out.append(" (recursive)\n"); |
|             continue; |
|         } |
|         out.append("\n"); |
|         fstack.add(callee); |
|         out.append(explainFunctionCallGraph(fgraph, fstack, callee, level+1)); |
|         fstack.remove(callee); |
|     } |
| |
|     return out.toString(); |
| } |
| } |