/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.sysds.utils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Stack;

import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.sysds.hops.Hop;
import org.apache.sysds.hops.LiteralOp;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.hops.codegen.cplan.CNode;
import org.apache.sysds.hops.codegen.cplan.CNodeMultiAgg;
import org.apache.sysds.hops.codegen.cplan.CNodeTpl;
import org.apache.sysds.hops.ipa.FunctionCallGraph;
import org.apache.sysds.lops.Lop;
import org.apache.sysds.parser.DMLProgram;
import org.apache.sysds.parser.ForStatement;
import org.apache.sysds.parser.ForStatementBlock;
import org.apache.sysds.parser.FunctionStatement;
import org.apache.sysds.parser.FunctionStatementBlock;
import org.apache.sysds.parser.IfStatement;
import org.apache.sysds.parser.IfStatementBlock;
import org.apache.sysds.parser.ParForStatementBlock;
import org.apache.sysds.parser.StatementBlock;
import org.apache.sysds.parser.WhileStatement;
import org.apache.sysds.parser.WhileStatementBlock;
import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
import org.apache.sysds.runtime.controlprogram.ForProgramBlock;
import org.apache.sysds.runtime.controlprogram.FunctionProgramBlock;
import org.apache.sysds.runtime.controlprogram.IfProgramBlock;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock;
import org.apache.sysds.runtime.controlprogram.Program;
import org.apache.sysds.runtime.controlprogram.ProgramBlock;
import org.apache.sysds.runtime.controlprogram.WhileProgramBlock;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysds.runtime.instructions.Instruction;
import org.apache.sysds.runtime.instructions.cp.CPInstruction;
import org.apache.sysds.runtime.instructions.fed.FEDInstruction;
import org.apache.sysds.runtime.instructions.gpu.GPUInstruction;
import org.apache.sysds.runtime.instructions.spark.CSVReblockSPInstruction;
import org.apache.sysds.runtime.instructions.spark.CheckpointSPInstruction;
import org.apache.sysds.runtime.instructions.spark.ReblockSPInstruction;
import org.apache.sysds.runtime.instructions.spark.SPInstruction;
import org.apache.sysds.runtime.lineage.LineageItem;
import org.apache.sysds.runtime.lineage.LineageItemUtils;

public class Explain
{
	//internal configuration parameters
	//if true, explainGenericInstruction replaces the Lop delimiter strings by readable separators
	private static final boolean REPLACE_SPECIAL_CHARACTERS = true;
	//if false, showMem prints "MAX" for estimates >= OptimizerUtils.DEFAULT_SIZE
	private static final boolean SHOW_MEM_ABOVE_BUDGET      = true;
	//if false, explainHop neither prints literal hops nor lists them as inputs
	private static final boolean SHOW_LITERAL_HOPS          = false;
	//if true, operator IDs and the IDs of their inputs are printed (hops and cnodes)
	private static final boolean SHOW_DATA_DEPENDENCIES     = true;
	//if true, explainHop prints the reblock/checkpoint markers ([rblk], [chkpt])
	private static final boolean SHOW_DATA_FLOW_PROPERTIES  = true;

	/**
	 * The different explain levels.
	 */
	public enum ExplainType {
		NONE,              // explain disabled
		HOPS,              // explain program and hops
		RUNTIME,           // explain runtime program (default)
		RECOMPILE_HOPS,    // explain hops, incl recompile
		RECOMPILE_RUNTIME; // explain runtime program, incl recompile

		/**
		 * Indicates if hops should be explained.
		 *
		 * @param recompile true if called in the context of recompilation
		 * @return true for RECOMPILE_HOPS, or for HOPS outside of recompilation
		 */
		public boolean isHopsType(boolean recompile) {
			if( this == RECOMPILE_HOPS )
				return true;
			return this == HOPS && !recompile;
		}

		/**
		 * Indicates if the runtime program should be explained.
		 *
		 * @param recompile true if called in the context of recompilation
		 * @return true for RECOMPILE_RUNTIME, or for RUNTIME outside of recompilation
		 */
		public boolean isRuntimeType(boolean recompile) {
			if( this == RECOMPILE_RUNTIME )
				return true;
			return this == RUNTIME && !recompile;
		}
	}

	/**
	 * Mutable container for the instruction counts reported by explain.
	 * All counters start at zero (the default value of an int field).
	 */
	public static class ExplainCounts {
		/** number of CP instructions */
		public int numCPInst;
		/** number of distributed (Spark) instructions */
		public int numJobs;
		/** number of Spark reblock instructions (incl. csv reblock) */
		public int numReblocks;
		/** number of Spark checkpoint instructions */
		public int numChkpts;
	}

	//////////////
	// public explain interface

	/**
	 * Creates the full explain output: a header line with the explain type,
	 * the memory budget, the degree of parallelism, and the explained plan.
	 *
	 * @param prog DML program (used for hops explain types)
	 * @param rtprog runtime program (used for runtime explain types and for counting)
	 * @param type explain type
	 * @param counts instruction counts; if null, they are computed from the runtime program
	 * @return explain output
	 */
	public static String display(DMLProgram prog, Program rtprog, ExplainType type, ExplainCounts counts) {
		if( counts == null )
			counts = countDistributedOperations(rtprog);

		StringBuilder sb = new StringBuilder();
		sb.append("# EXPLAIN (").append(type.name()).append("):\n");
		sb.append(Explain.explainMemoryBudget(counts)).append("\n");
		sb.append(Explain.explainDegreeOfParallelism(counts));

		//explain plan of program (hops or runtime); the plan is null for
		//ExplainType.NONE and must not be appended as the literal "null"
		String plan = Explain.explain(prog, rtprog, type, counts);
		if( plan != null )
			sb.append(plan);

		return sb.toString();
	}

	/**
	 * Explains the memory budget based on fresh (all-zero) counts.
	 *
	 * @return memory budget line
	 */
	public static String explainMemoryBudget() {
		final ExplainCounts emptyCounts = new ExplainCounts();
		return explainMemoryBudget(emptyCounts);
	}

	/**
	 * Explains the local and remote memory budgets in MB. Remote budgets are
	 * printed as '?' if they are unknown or should not be accessed.
	 *
	 * @param counts instruction counts, used to decide if the spark configuration is accessed
	 * @return memory budget line
	 */
	public static String explainMemoryBudget(ExplainCounts counts) {
		StringBuilder out = new StringBuilder( "# Memory Budget local/remote = " );
		out.append( OptimizerUtils.toMB(OptimizerUtils.getLocalMemBudget()) ).append( "MB/" );

		//non-spark execution modes: remote budgets unknown
		if( !OptimizerUtils.isSparkExecutionMode() ) {
			out.append( "?MB/?MB" );
			return out.toString();
		}

		//avoid unnecessary lazy spark context creation on access to memory configurations
		int numOtherJobs = counts.numJobs - counts.numReblocks - counts.numChkpts;
		if( numOtherJobs > 0 && SparkExecutionContext.isSparkContextCreated() ) {
			//default: two data memory budgets and the broadcast memory budget
			out.append( OptimizerUtils.toMB(SparkExecutionContext.getDataMemoryBudget(true, false)) ).append( "MB/" );
			out.append( OptimizerUtils.toMB(SparkExecutionContext.getDataMemoryBudget(false, false)) ).append( "MB/" );
			out.append( OptimizerUtils.toMB(SparkExecutionContext.getBroadcastMemoryBudget()) ).append( "MB" );
		}
		else {
			out.append( "?MB/?MB/?MB" );
		}

		return out.toString();
	}

	/**
	 * Explains the degree of parallelism based on fresh (all-zero) counts.
	 *
	 * @return degree of parallelism line
	 */
	public static String explainDegreeOfParallelism() {
		final ExplainCounts emptyCounts = new ExplainCounts();
		return explainDegreeOfParallelism(emptyCounts);
	}

	/**
	 * Explains the local and remote degree of parallelism (vcores). In spark
	 * execution mode, the remote parallelism is printed as '?' if it should
	 * not be accessed; in other modes nothing is printed after the slash.
	 *
	 * @param counts instruction counts, used to decide if the spark configuration is accessed
	 * @return degree of parallelism line
	 */
	public static String explainDegreeOfParallelism(ExplainCounts counts) {
		final int localPar = InfrastructureAnalyzer.getLocalParallelism();
		StringBuilder out = new StringBuilder( "# Degree of Parallelism (vcores) local/remote = " );
		out.append( localPar ).append( "/" );

		if( OptimizerUtils.isSparkExecutionMode() ) {
			//avoid unnecessary lazy spark context creation on access to memory configurations
			int numOtherJobs = counts.numJobs - counts.numReblocks - counts.numChkpts;
			if( numOtherJobs > 0 && SparkExecutionContext.isSparkContextCreated() )
				out.append( SparkExecutionContext.getDefaultParallelism(false) ); //default
			else
				out.append( "?" );
		}

		return out.toString();
	}

	/**
	 * Explains the given program according to the explain type, without
	 * precomputed instruction counts.
	 *
	 * @param prog DML program
	 * @param rtprog runtime program
	 * @param type explain type
	 * @return explain output, or null for ExplainType.NONE
	 */
	public static String explain(DMLProgram prog, Program rtprog, ExplainType type) {
		final ExplainCounts noCounts = null;
		return explain(prog, rtprog, type, noCounts);
	}

	/**
	 * Dispatches to the hops or runtime explain according to the explain type.
	 *
	 * @param prog DML program (explained for HOPS and RECOMPILE_HOPS)
	 * @param rtprog runtime program (explained for RUNTIME and RECOMPILE_RUNTIME)
	 * @param type explain type
	 * @param counts instruction counts passed on to the runtime explain
	 * @return explain output, or null for ExplainType.NONE
	 */
	public static String explain(DMLProgram prog, Program rtprog, ExplainType type, ExplainCounts counts) {
		String ret = null;
		switch( type ) {
			case HOPS:
			case RECOMPILE_HOPS:
				//explain hops with stats
				ret = explain(prog);
				break;
			case RUNTIME:
			case RECOMPILE_RUNTIME:
				//explain runtime program
				ret = explain(rtprog, counts);
				break;
			default:
				//NONE: do nothing
				break;
		}
		return ret;
	}

	/**
	 * Explains the hops of a DML program: the function call graph and all
	 * functions (if any), followed by the main program.
	 *
	 * @param prog DML program
	 * @return explain output
	 */
	public static String explain(DMLProgram prog)
	{
		//create header
		StringBuilder out = new StringBuilder("\nPROGRAM\n");

		//explain functions (if exists)
		if( prog.hasFunctionStatementBlocks() ) {
			//show function call graph
			out.append("--FUNCTIONS\n")
				.append("----FUNCTION CALL GRAPH\n")
				.append("------MAIN PROGRAM\n");
			FunctionCallGraph callGraph = new FunctionCallGraph(prog);
			out.append(explainFunctionCallGraph(callGraph, new HashSet<String>(), null, 3));

			//show individual functions
			for( String ns : prog.getNamespaces().keySet() ) {
				for( String name : prog.getFunctionStatementBlocks(ns).keySet() ) {
					FunctionStatementBlock fnBlock = prog.getFunctionStatementBlock(ns, name);
					FunctionStatement fn = (FunctionStatement) fnBlock.getStatement(0);
					String key = DMLProgram.constructFunctionKey(ns, name);
					out.append("----FUNCTION ").append(key)
						.append(" [recompile=").append(fnBlock.isRecompileOnce()).append("]\n");
					for( StatementBlock child : fn.getBody() )
						out.append(explainStatementBlock(child, 3));
				}
			}
		}

		//explain main program
		out.append("--MAIN PROGRAM\n");
		for( StatementBlock block : prog.getStatementBlocks() )
			out.append(explainStatementBlock(block, 2));

		return out.toString();
	}

	/**
	 * Explains a runtime program; the instruction counts are computed internally.
	 *
	 * @param rtprog runtime program
	 * @return explain output
	 */
	public static String explain( Program rtprog ) {
		final ExplainCounts noCounts = null;
		return explain(rtprog, noCounts);
	}

	/**
	 * Explains a runtime program: a header with the number of CP and
	 * distributed instructions, the function call graph and all functions
	 * (if any), followed by the main program.
	 *
	 * @param rtprog runtime program
	 * @param counts instruction counts; if null, they are computed from the runtime program
	 * @return explain output
	 */
	public static String explain( Program rtprog, ExplainCounts counts )
	{
		//count instructions only if no counts were provided
		final boolean isSpark = OptimizerUtils.isSparkExecutionMode();
		if( counts == null ) {
			counts = new ExplainCounts();
			countCompiledInstructions(rtprog, counts, true, isSpark);
		}

		//create header
		StringBuilder out = new StringBuilder();
		out.append("\nPROGRAM ( size CP/").append(isSpark ? "SP" : "MR").append(" = ")
			.append(counts.numCPInst).append("/").append(counts.numJobs).append(" )\n");

		//explain functions (if exists)
		Map<String, FunctionProgramBlock> functions = rtprog.getFunctionProgramBlocks();
		boolean hasFunctions = (functions != null) && !functions.isEmpty();
		if( hasFunctions )
		{
			out.append("--FUNCTIONS\n");

			//show function call graph, obtained via the statement block of the first program block
			if( !rtprog.getProgramBlocks().isEmpty() ) {
				StatementBlock first = rtprog.getProgramBlocks().get(0).getStatementBlock();
				if( first != null ) {
					out.append("----FUNCTION CALL GRAPH\n");
					out.append("------MAIN PROGRAM\n");
					FunctionCallGraph callGraph = new FunctionCallGraph(first.getDMLProg());
					out.append(explainFunctionCallGraph(callGraph, new HashSet<String>(), null, 3));
				}
			}

			//show individual functions
			for( Entry<String, FunctionProgramBlock> fn : functions.entrySet() ) {
				final String key = fn.getKey();
				final FunctionProgramBlock optimized = fn.getValue();

				//explain optimized function
				out.append("----FUNCTION ").append(key)
					.append(" [recompile=").append(optimized.isRecompileOnce()).append("]\n");
				for( ProgramBlock child : optimized.getChildBlocks() )
					out.append( explainProgramBlock(child, 3) );

				//explain unoptimized function (if registered)
				if( !rtprog.containsFunctionProgramBlock(key, false) )
					continue;
				FunctionProgramBlock unoptimized = rtprog.getFunctionProgramBlock(key, false);
				out.append("----FUNCTION ").append(key)
					.append(" (unoptimized) [recompile=").append(unoptimized.isRecompileOnce()).append("]\n");
				for( ProgramBlock child : unoptimized.getChildBlocks() )
					out.append( explainProgramBlock(child, 3) );
			}
		}

		//explain main program
		out.append("--MAIN PROGRAM\n");
		for( ProgramBlock block : rtprog.getProgramBlocks() )
			out.append( explainProgramBlock(block, 2) );

		return out.toString();
	}

	/**
	 * Explains a single runtime program block without indentation.
	 *
	 * @param pb program block
	 * @return explain output
	 */
	public static String explain( ProgramBlock pb ) {
		final int rootLevel = 0;
		return explainProgramBlock(pb, rootLevel);
	}

	/**
	 * Explains a list of instructions without indentation.
	 *
	 * @param inst list of instructions
	 * @return explain output, one line per instruction
	 */
	public static String explain( ArrayList<Instruction> inst ) {
		final int rootLevel = 0;
		return explainInstructions(inst, rootLevel);
	}

	/**
	 * Explains a list of instructions at the given indentation level.
	 *
	 * @param inst list of instructions
	 * @param level indentation level
	 * @return explain output, one line per instruction
	 */
	public static String explain( ArrayList<Instruction> inst, int level ) {
		final String text = explainInstructions(inst, level);
		return text;
	}

	/**
	 * Explains a single instruction (no indentation, no trailing newline).
	 *
	 * @param inst instruction
	 * @return explained instruction string
	 */
	public static String explain( Instruction inst ) {
		final int rootLevel = 0;
		return explainGenericInstruction(inst, rootLevel);
	}

	/**
	 * Explains a single statement block without indentation.
	 *
	 * @param sb statement block
	 * @return explain output
	 */
	public static String explain( StatementBlock sb ) {
		final int rootLevel = 0;
		return explainStatementBlock(sb, rootLevel);
	}

	/**
	 * Explains a list of hop DAG roots without indentation.
	 *
	 * @param hops list of hop DAG roots
	 * @return explain output
	 */
	public static String explainHops( ArrayList<Hop> hops ) {
		final int rootLevel = 0;
		return explainHops(hops, rootLevel);
	}

	/**
	 * Explains a list of hop DAG roots at the given indentation level. The
	 * visit status of the DAGs is reset before and after the traversal.
	 *
	 * @param hops list of hop DAG roots
	 * @param level indentation level
	 * @return explain output
	 */
	public static String explainHops( ArrayList<Hop> hops, int level ) {
		StringBuilder out = new StringBuilder();

		Hop.resetVisitStatus(hops);
		for( Hop root : hops ) {
			String text = explainHop(root, level);
			out.append(text);
		}
		Hop.resetVisitStatus(hops);

		return out.toString();
	}

	/**
	 * Explains the DAG below the given hop without indentation.
	 *
	 * @param hop root hop
	 * @return explain output
	 */
	public static String explain( Hop hop ) {
		final int rootLevel = 0;
		return explain(hop, rootLevel);
	}

	/**
	 * Explains the DAG below the given hop at the given indentation level.
	 * The visit status is reset before and after the traversal.
	 *
	 * @param hop root hop
	 * @param level indentation level
	 * @return explain output
	 */
	public static String explain( Hop hop, int level ) {
		hop.resetVisitStatus();
		final String text = explainHop(hop, level);
		hop.resetVisitStatus();

		return text;
	}

	/**
	 * Explains the lineage DAGs of the given items without indentation.
	 *
	 * @param lis lineage items (DAG roots)
	 * @return explain output
	 */
	public static String explainLineageItems( LineageItem[] lis ) {
		final int rootLevel = 0;
		return explainLineageItems(lis, rootLevel);
	}

	/**
	 * Explains the lineage DAGs of the given items at the given indentation
	 * level. The visit status is reset before and after the traversal.
	 *
	 * @param lis lineage items (DAG roots)
	 * @param level indentation level
	 * @return explain output
	 */
	public static String explainLineageItems( LineageItem[] lis, int level ) {
		StringBuilder out = new StringBuilder();

		LineageItem.resetVisitStatusNR(lis);
		for( LineageItem root : lis ) {
			String text = explainLineageItemNR(root, level);
			out.append(text);
		}
		LineageItem.resetVisitStatusNR(lis);

		return out.toString();
	}

	/**
	 * Explains the lineage DAG below the given item without indentation.
	 * The visit status is reset before and after the traversal.
	 *
	 * @param li root lineage item
	 * @return explain output
	 */
	public static String explain( LineageItem li ) {
		li.resetVisitStatusNR();
		final String text = explain(li, 0);
		//NOTE: dedup items are not appended here (the rExplainDedupItems call is disabled)
		li.resetVisitStatusNR();

		return text;
	}

	/**
	 * Explains the lineage DAG below the given item at the given indentation
	 * level. The visit status is reset before and after the traversal.
	 *
	 * @param li root lineage item
	 * @param level indentation level
	 * @return explain output
	 */
	private static String explain( LineageItem li, int level ) {
		li.resetVisitStatusNR();
		final String text = explainLineageItemNR(li, level);
		li.resetVisitStatusNR();

		return text;
	}
	
	/**
	 * Recursively explains the dedup items in the lineage DAG below the given
	 * item. Every dedup item whose data is not yet contained in paths is
	 * explained once and its data is added to paths.
	 *
	 * @param li lineage item
	 * @param paths data of already explained dedup items (updated in place)
	 * @return explain output, empty string for visited items
	 */
	@Deprecated
	@SuppressWarnings("unused")
	private static String rExplainDedupItems(LineageItem li, List<String> paths) {
		if (li.isVisited())
			return "";

		StringBuilder out = new StringBuilder();

		//explain not yet seen dedup items
		boolean isDedup = li.getType() == LineageItem.LineageItemType.Dedup;
		if (isDedup && !paths.contains(li.getData())) {
			out.append("\n");
			out.append("dedup");
			out.append(li.getData());
			out.append(":\n");
			out.append(Explain.explain(li, 0));
			paths.add(li.getData());
		}

		//recurse into the inputs
		if (li.getInputs() != null) {
			for (LineageItem input : li.getInputs())
				out.append(rExplainDedupItems(input, paths));
		}

		li.setVisited();
		return out.toString();
	}
	

	/**
	 * Explains a codegen template plan: a header with the template info and
	 * input names, followed by the cnode DAG(s) of the output(s).
	 *
	 * @param cplan template plan
	 * @return explain output
	 */
	public static String explainCPlan( CNodeTpl cplan ) {
		StringBuilder out = new StringBuilder();

		//create template header
		out.append("\n----------------------------------------\n");
		out.append("CPLAN: ").append(cplan.getTemplateInfo()).append("\n");
		out.append("--inputs: ").append(Arrays.toString(cplan.getInputNames())).append("\n");
		out.append("----------------------------------------\n");

		//explain body dag (multi-aggregates have multiple outputs)
		cplan.resetVisitStatusOutputs();
		if( cplan instanceof CNodeMultiAgg ) {
			CNodeMultiAgg multiAgg = (CNodeMultiAgg) cplan;
			for( CNode root : multiAgg.getOutputs() )
				out.append(explainCNode(root, 1));
		}
		else {
			out.append(explainCNode(cplan.getOutput(), 1));
		}
		cplan.resetVisitStatusOutputs();
		out.append("----------------------------------------\n");

		return out.toString();
	}

	/**
	 * Explains the cnode DAG below the given node without indentation.
	 *
	 * @param node root cnode
	 * @return explain output
	 */
	public static String explain( CNode node ) {
		final int rootLevel = 0;
		return explain(node, rootLevel);
	}

	/**
	 * Explains the cnode DAG below the given node at the given indentation
	 * level. The visit status of the cnodes is not reset by this method.
	 *
	 * @param node root cnode
	 * @param level indentation level
	 * @return explain output
	 */
	public static String explain( CNode node, int level ) {
		final String text = explainCNode(node, level);
		return text;
	}

	/**
	 * Counts the compiled instructions of the given runtime program,
	 * including both CP and Spark instructions.
	 *
	 * @param rtprog runtime program
	 * @return counts
	 */
	public static ExplainCounts countDistributedOperations( Program rtprog ) {
		final ExplainCounts result = new ExplainCounts();
		countCompiledInstructions(rtprog, result, true, true);
		return result;
	}

	/**
	 * Returns the indentation prefix used by explain for the given level.
	 *
	 * @param level indentation level
	 * @return indentation prefix
	 */
	public static String getIdentation( int level ) {
		final String indent = createOffset(level);
		return indent;
	}

	//////////////
	// internal explain HOPS

	/**
	 * Recursively explains a statement block and its hops. Control-flow
	 * blocks print a header line, their predicate hops, and their child
	 * blocks; function blocks print their child blocks only; all other
	 * blocks print a GENERIC header and their hop DAGs.
	 *
	 * @param sb statement block
	 * @param level indentation level
	 * @return explain output
	 */
	private static String explainStatementBlock(StatementBlock sb, int level)
	{
		final StringBuilder out = new StringBuilder();
		final String indent = createOffset(level);
		final int bodyLevel = level + 1;

		if (sb instanceof WhileStatementBlock) {
			WhileStatementBlock whileBlock = (WhileStatementBlock) sb;
			//header, optionally with update-in-place variables
			out.append(indent);
			out.append("WHILE (lines "+whileBlock.getBeginLine()+"-"+whileBlock.getEndLine()+")");
			if( !whileBlock.getUpdateInPlaceVars().isEmpty() )
				out.append(" [in-place="+whileBlock.getUpdateInPlaceVars().toString()+"]");
			out.append("\n");
			//predicate and body
			out.append(explainHop(whileBlock.getPredicateHops(), bodyLevel));
			WhileStatement whileStmt = (WhileStatement) sb.getStatement(0);
			for (StatementBlock child : whileStmt.getBody())
				out.append(explainStatementBlock(child, bodyLevel));
		}
		else if (sb instanceof IfStatementBlock) {
			IfStatementBlock ifBlock = (IfStatementBlock) sb;
			out.append(indent);
			out.append("IF (lines "+ifBlock.getBeginLine()+"-"+ifBlock.getEndLine()+")\n");
			out.append(explainHop(ifBlock.getPredicateHops(), bodyLevel));
			//if-body, followed by the else-body (if non-empty)
			IfStatement ifStmt = (IfStatement) sb.getStatement(0);
			for (StatementBlock child : ifStmt.getIfBody())
				out.append(explainStatementBlock(child, bodyLevel));
			if( !ifStmt.getElseBody().isEmpty() )
				out.append(indent).append("ELSE\n");
			for (StatementBlock child : ifStmt.getElseBody())
				out.append(explainStatementBlock(child, bodyLevel));
		}
		else if (sb instanceof ForStatementBlock) {
			ForStatementBlock forBlock = (ForStatementBlock) sb;
			//header, optionally with update-in-place variables
			String keyword = (sb instanceof ParForStatementBlock) ? "PARFOR" : "FOR";
			out.append(indent);
			out.append(keyword+" (lines "+forBlock.getBeginLine()+"-"+forBlock.getEndLine()+")");
			if( !forBlock.getUpdateInPlaceVars().isEmpty() )
				out.append(" [in-place="+forBlock.getUpdateInPlaceVars().toString()+"]");
			out.append("\n");
			//from, to, and increment hops (if existing)
			if (forBlock.getFromHops() != null)
				out.append(explainHop(forBlock.getFromHops(), bodyLevel));
			if (forBlock.getToHops() != null)
				out.append(explainHop(forBlock.getToHops(), bodyLevel));
			if (forBlock.getIncrementHops() != null)
				out.append(explainHop(forBlock.getIncrementHops(), bodyLevel));
			//body
			ForStatement forStmt = (ForStatement) sb.getStatement(0);
			for (StatementBlock child : forStmt.getBody())
				out.append(explainStatementBlock(child, bodyLevel));
		}
		else if (sb instanceof FunctionStatementBlock) {
			//no header line, only the function body
			FunctionStatement fnStmt = (FunctionStatement) sb.getStatement(0);
			for (StatementBlock child : fnStmt.getBody())
				out.append(explainStatementBlock(child, bodyLevel));
		}
		else {
			//generic statement block with hop DAGs
			out.append(indent);
			out.append("GENERIC (lines "+sb.getBeginLine()+"-"+sb.getEndLine()+") [recompile=" + sb.requiresRecompilation() + "]\n");
			ArrayList<Hop> roots = sb.getHops();
			boolean hasHops = (roots != null) && !roots.isEmpty();
			if( hasHops ) {
				Hop.resetVisitStatus(roots);
				for (Hop root : roots)
					out.append(explainHop(root, bodyLevel));
				Hop.resetVisitStatus(roots);
			}
		}

		return out.toString();
	}

	/**
	 * Do a post-order traverse through the Hop DAG and explain each Hop.
	 * Every hop is printed on its own line with its ID, operation string,
	 * input IDs, data characteristics, memory estimates, data flow
	 * properties, and execution type. Already visited hops (and literals,
	 * unless SHOW_LITERAL_HOPS is set) are skipped; explained hops are
	 * flagged as visited.
	 *
	 * @param hop high-level operator
	 * @param level offset
	 * @return string explanation of Hop DAG
	 */
	private static String explainHop(Hop hop, int level) {
		if( hop.isVisited() || (!SHOW_LITERAL_HOPS && hop instanceof LiteralOp) )
			return "";

		StringBuilder sb = new StringBuilder();
		String offset = createOffset(level);

		//explain inputs first (post-order), at the same indentation level
		for( Hop input : hop.getInput() )
			sb.append(explainHop(input, level));

		//indentation
		sb.append(offset);

		//hop id
		if( SHOW_DATA_DEPENDENCIES )
			sb.append("("+hop.getHopID()+") ");

		//operation string
		sb.append(hop.getOpString());

		//input hop references (omitted entirely if no input is listed)
		if( SHOW_DATA_DEPENDENCIES ) {
			StringBuilder childs = new StringBuilder();
			childs.append(" (");
			boolean childAdded = false;
			for( Hop input : hop.getInput() )
				if( SHOW_LITERAL_HOPS || !(input instanceof LiteralOp) ){
					childs.append(childAdded?",":"");
					childs.append(input.getHopID());
					childAdded = true;
				}
			childs.append(")");
			if( childAdded )
				sb.append(childs.toString());
		}

		//matrix characteristics
		sb.append(" [" + hop.getDim1() + ","
				+ hop.getDim2() + ","
				+ hop.getBlocksize() + ","
				+ hop.getNnz());

		//locale-independent lower case, to avoid locale-specific case
		//mappings (e.g., the Turkish dotless i) in the explain output
		if (hop.getUpdateType().isInPlace())
			sb.append("," + hop.getUpdateType().toString().toLowerCase(Locale.ROOT));

		sb.append("]");

		//memory estimates
		sb.append(" [" + showMem(hop.getInputMemEstimate(), false) + ","
				+ showMem(hop.getIntermediateMemEstimate(), false) + ","
				+ showMem(hop.getOutputMemEstimate(), false) + " -> "
				+ showMem(hop.getMemEstimate(), true) + "]");

		//data flow properties
		if( SHOW_DATA_FLOW_PROPERTIES ) {
			if( hop.requiresReblock() && hop.requiresCheckpoint() )
				sb.append(" [rblk,chkpt]");
			else if( hop.requiresReblock() )
				sb.append(" [rblk]");
			else if( hop.requiresCheckpoint() )
				sb.append(" [chkpt]");
		}

		//exec type
		if (hop.getExecType() != null)
			sb.append(", " + hop.getExecType());

		sb.append('\n');

		//flag as visited to print shared sub-DAGs only once
		hop.setVisited();

		return sb.toString();
	}

	/**
	 * Non-recursive explain of the lineage DAG below the given item. Every
	 * not yet visited item is printed on its own line (via toString) after
	 * its inputs have been processed, and is flagged as visited afterwards.
	 * The inputs of items whose opcode starts with the lineage placeholder
	 * prefix are not traversed.
	 *
	 * @param item root lineage item
	 * @param level indentation level, applied to all printed items
	 * @return explain output
	 */
	private static String explainLineageItemNR(LineageItem item, int level) {
		//NOTE: in contrast to similar non-recursive functions like resetVisitStatusNR,
		// we maintain a more complex stack to ensure DFS ordering where current nodes
		// are added after the subtree underneath is processed (backwards compatibility)
		//stackItem holds the current DFS path, stackPos the index of the next
		//input to descend into for each item on that path (parallel stacks)
		Stack<LineageItem> stackItem = new Stack<>();
		Stack<MutableInt> stackPos = new Stack<>();
		stackItem.push(item); stackPos.push(new MutableInt(0));
		StringBuilder sb = new StringBuilder();
		while( !stackItem.empty() ) {
			LineageItem tmpItem = stackItem.peek();
			MutableInt tmpPos = stackPos.peek();
			//check ascent condition - no item processing
			//(item already printed, e.g., reachable via multiple parents)
			if( tmpItem.isVisited() ) {
				stackItem.pop(); stackPos.pop();
			}
			//check ascent condition - append item
			//(no inputs, placeholder, or all inputs already processed)
			else if( tmpItem.getInputs() == null 
				|| tmpItem.getOpcode().startsWith(LineageItemUtils.LPLACEHOLDER)
				// don't trace beyond if a placeholder is found
				|| tmpItem.getInputs().length <= tmpPos.intValue() ) {
				sb.append(createOffset(level));
				sb.append(tmpItem.toString());
				sb.append('\n');
				stackItem.pop(); stackPos.pop();
				tmpItem.setVisited();
			}
			//check descent condition
			//(push the next unprocessed input and advance the position of the current item)
			else if( tmpItem.getInputs() != null ) {
				stackItem.push(tmpItem.getInputs()[tmpPos.intValue()]);
				tmpPos.increment();
				stackPos.push(new MutableInt(0));
			}
		}
		return sb.toString();
	}
	
	/**
	 * Recursive explain of the lineage DAG below the given item. Inputs are
	 * explained before the item itself; explained items are flagged as visited.
	 *
	 * @param li lineage item
	 * @param level indentation level
	 * @return explain output, empty string for visited items
	 */
	@Deprecated
	@SuppressWarnings("unused")
	private static String explainLineageItem(LineageItem li, int level) {
		if( li.isVisited())
			return "";

		StringBuilder out = new StringBuilder();
		String indent = createOffset(level);

		//explain the inputs first
		if (li.getInputs() != null) {
			for( LineageItem in : li.getInputs() )
				out.append(explainLineageItem(in, level));
		}

		//explain the item itself
		out.append(indent).append(li.toString()).append('\n');
		li.setVisited();

		return out.toString();
	}

	//////////////
	// internal explain CNODE

	/**
	 * Recursively explains the cnode DAG below the given cnode. Inputs are
	 * explained before the cnode itself; explained cnodes are flagged as
	 * visited and visited cnodes are skipped.
	 *
	 * @param cnode codegen node
	 * @param level indentation level
	 * @return explain output
	 */
	private static String explainCNode(CNode cnode, int level) {
		if( cnode.isVisited() )
			return "";

		StringBuilder out = new StringBuilder();
		String indent = createOffset(level);

		//explain the inputs first, at the same indentation level
		for( CNode in : cnode.getInput() )
			out.append(explainCNode(in, level));

		//indentation
		out.append(indent);

		//cnode id
		if( SHOW_DATA_DEPENDENCIES )
			out.append("("+cnode.getID()+") ");

		//operation string
		out.append(cnode.toString());

		//input cnode references (omitted entirely if there are no inputs)
		if( SHOW_DATA_DEPENDENCIES ) {
			StringBuilder refs = new StringBuilder(" (");
			String separator = "";
			for( CNode in : cnode.getInput() ) {
				refs.append(separator);
				refs.append(in.getID());
				separator = ",";
			}
			refs.append(")");
			if( !separator.isEmpty() )
				out.append(refs.toString());
		}

		out.append('\n');
		cnode.setVisited();

		return out.toString();
	}

	//////////////
	// internal explain RUNTIME

	/**
	 * Recursively explains a runtime program block and its instructions.
	 * Control-flow blocks print a header line, their predicate instructions,
	 * their child blocks, and their exit instruction (if any); function
	 * blocks print their child blocks only; basic blocks print a GENERIC
	 * header and their instructions.
	 *
	 * @param pb program block
	 * @param level indentation level
	 * @return explain output
	 */
	private static String explainProgramBlock( ProgramBlock pb, int level )
	{
		final StringBuilder out = new StringBuilder();
		final String indent = createOffset(level);
		final int bodyLevel = level + 1;

		if (pb instanceof FunctionProgramBlock ) {
			//no header line, only the function body
			FunctionProgramBlock fnBlock = (FunctionProgramBlock)pb;
			for( ProgramBlock child : fnBlock.getChildBlocks() )
				out.append( explainProgramBlock(child, bodyLevel) );
		}
		else if (pb instanceof WhileProgramBlock) {
			WhileProgramBlock whileBlock = (WhileProgramBlock) pb;
			StatementBlock whileSb = pb.getStatementBlock();
			//header, optionally with update-in-place variables
			out.append(indent);
			out.append("WHILE (lines "+whileBlock.getBeginLine()+"-"+whileBlock.getEndLine()+")");
			if( whileSb != null && !whileSb.getUpdateInPlaceVars().isEmpty() )
				out.append(" [in-place="+whileSb.getUpdateInPlaceVars().toString()+"]");
			out.append("\n");
			//predicate, body, and exit instruction
			out.append(explainInstructions(whileBlock.getPredicate(), bodyLevel));
			for( ProgramBlock child : whileBlock.getChildBlocks() )
				out.append( explainProgramBlock(child, bodyLevel) );
			if( whileBlock.getExitInstruction() != null )
				out.append(explainInstructions(whileBlock.getExitInstruction(), bodyLevel));
		}
		else if (pb instanceof IfProgramBlock) {
			IfProgramBlock ifBlock = (IfProgramBlock) pb;
			out.append(indent);
			out.append("IF (lines "+ifBlock.getBeginLine()+"-"+ifBlock.getEndLine()+")\n");
			out.append(explainInstructions(ifBlock.getPredicate(), bodyLevel));
			//if-body, followed by the else-body (if non-empty)
			for( ProgramBlock child : ifBlock.getChildBlocksIfBody() )
				out.append( explainProgramBlock(child, bodyLevel) );
			if( !ifBlock.getChildBlocksElseBody().isEmpty() ) {
				out.append(indent).append("ELSE\n");
				for( ProgramBlock child : ifBlock.getChildBlocksElseBody() )
					out.append( explainProgramBlock(child, bodyLevel) );
			}
			if( ifBlock.getExitInstruction() != null )
				out.append(explainInstructions(ifBlock.getExitInstruction(), bodyLevel));
		}
		else if (pb instanceof ForProgramBlock) { //incl parfor
			ForProgramBlock forBlock = (ForProgramBlock) pb;
			StatementBlock forSb = pb.getStatementBlock();
			boolean isParfor = pb instanceof ParForProgramBlock;
			//header; update-in-place variables are only shown for non-parfor loops
			out.append(indent);
			out.append((isParfor ? "PARFOR" : "FOR")+" (lines "+forBlock.getBeginLine()+"-"+forBlock.getEndLine()+")");
			if( !isParfor && forSb != null && !forSb.getUpdateInPlaceVars().isEmpty() )
				out.append(" [in-place="+forSb.getUpdateInPlaceVars().toString()+"]");
			out.append("\n");
			//from, to, and increment instructions, body, and exit instruction
			out.append(explainInstructions(forBlock.getFromInstructions(), bodyLevel));
			out.append(explainInstructions(forBlock.getToInstructions(), bodyLevel));
			out.append(explainInstructions(forBlock.getIncrementInstructions(), bodyLevel));
			for( ProgramBlock child : forBlock.getChildBlocks() )
				out.append( explainProgramBlock(child, bodyLevel) );
			if( forBlock.getExitInstruction() != null )
				out.append(explainInstructions(forBlock.getExitInstruction(), bodyLevel));
		}
		else if( pb instanceof BasicProgramBlock ) {
			BasicProgramBlock basicBlock = (BasicProgramBlock) pb;
			//header; the recompile flag is only available with a statement block
			out.append(indent);
			out.append("GENERIC (lines "+pb.getBeginLine()+"-"+pb.getEndLine()+") ");
			if( pb.getStatementBlock()!=null )
				out.append("[recompile="+pb.getStatementBlock().requiresRecompilation()+"]");
			out.append("\n");
			out.append(explainInstructions(basicBlock.getInstructions(), bodyLevel));
		}

		return out.toString();
	}

	/**
	 * Explains a list of instructions, one indented line per instruction.
	 *
	 * @param instSet list of instructions
	 * @param level indentation level
	 * @return explain output
	 */
	private static String explainInstructions( ArrayList<Instruction> instSet, int level ) {
		final String indent = createOffset(level);
		StringBuilder out = new StringBuilder();
		for( Instruction inst : instSet ) {
			String line = indent + explainGenericInstruction(inst, level) + '\n';
			out.append( line );
		}

		return out.toString();
	}

	/**
	 * Explains a single instruction as one indented line.
	 *
	 * @param inst instruction
	 * @param level indentation level
	 * @return explain output, terminated by a newline
	 */
	private static String explainInstructions( Instruction inst, int level ) {
		final String indent = createOffset(level);
		final String text = explainGenericInstruction(inst, level);
		return indent + text + '\n';
	}
	
	/**
	 * Explains a single instruction via its string representation, where
	 * the Lop delimiter strings are replaced by readable separators if
	 * REPLACE_SPECIAL_CHARACTERS is set.
	 * <p>
	 * All instruction types are explained via toString. Previously, any
	 * instruction other than SP, CP, GPU, or FED instructions (as well as a
	 * null instruction) led to a NullPointerException.
	 *
	 * @param inst instruction, may be null
	 * @param level indentation level (unused, the result is not indented)
	 * @return explained instruction string, empty string for a null instruction
	 */
	private static String explainGenericInstruction( Instruction inst, int level )
	{
		//robustness: nothing to explain
		if( inst == null )
			return "";

		String tmp = inst.toString();
		if( tmp == null )
			return "";

		if( REPLACE_SPECIAL_CHARACTERS ){
			tmp = tmp.replaceAll(Lop.OPERAND_DELIMITOR, " ");
			tmp = tmp.replaceAll(Lop.DATATYPE_PREFIX, ".");
			tmp = tmp.replaceAll(Lop.INSTRUCTION_DELIMITOR, ", ");
		}

		return tmp;
	}

	/**
	 * Formats a memory estimate in MB. Estimates at or above
	 * OptimizerUtils.DEFAULT_SIZE are shown as "MAX" if
	 * SHOW_MEM_ABOVE_BUDGET is disabled.
	 *
	 * @param mem memory estimate
	 * @param units if true, the suffix "MB" is appended
	 * @return formatted memory estimate
	 */
	@SuppressWarnings("unused")
	private static String showMem(double mem, boolean units)
	{
		if( !SHOW_MEM_ABOVE_BUDGET && mem >= OptimizerUtils.DEFAULT_SIZE )
			return "MAX";

		String suffix = "";
		if( units )
			suffix = "MB";
		return OptimizerUtils.toMB(mem) + suffix;
	}

	/**
	 * Creates the indentation prefix for the given level, which consists
	 * of two dashes per level.
	 *
	 * @param level indentation level; levels less than or equal to zero yield an empty prefix
	 * @return indentation prefix
	 */
	private static String createOffset( int level )
	{
		if( level <= 0 )
			return "";

		char[] dashes = new char[2 * level];
		Arrays.fill(dashes, '-');
		return new String(dashes);
	}

	/**
	 * Counts the compiled instructions of all functions and of the main
	 * program of the given runtime program.
	 *
	 * @param rtprog runtime program
	 * @param counts explain counts (updated in place)
	 * @param CP if true, count CP instructions
	 * @param SP if true, count Spark instructions
	 */
	private static void countCompiledInstructions( Program rtprog, ExplainCounts counts, boolean CP, boolean SP )
	{
		//analyze DML-bodied functions
		Collection<FunctionProgramBlock> functions = rtprog.getFunctionProgramBlocks().values();
		for( FunctionProgramBlock function : functions )
			countCompiledInstructions( function, counts, CP, SP );

		//analyze main program
		for( ProgramBlock block : rtprog.getProgramBlocks() )
			countCompiledInstructions( block, counts, CP, SP );
	}

	/**
	 * Recursively counts the compiled instructions in the given runtime
	 * program block, including predicates and all child blocks.
	 *
	 * @param pb program block
	 * @param counts explain counts (updated in place)
	 * @param CP if true, count CP instructions
	 * @param SP if true, count Spark instructions
	 */
	private static void countCompiledInstructions(ProgramBlock pb, ExplainCounts counts, boolean CP, boolean SP)
	{
		if (pb instanceof WhileProgramBlock) {
			WhileProgramBlock whileBlock = (WhileProgramBlock)pb;
			countCompiledInstructions(whileBlock.getPredicate(), counts, CP, SP);
			for (ProgramBlock child : whileBlock.getChildBlocks())
				countCompiledInstructions(child, counts, CP, SP);
			return;
		}

		if (pb instanceof IfProgramBlock) {
			IfProgramBlock ifBlock = (IfProgramBlock)pb;
			countCompiledInstructions(ifBlock.getPredicate(), counts, CP, SP);
			for( ProgramBlock child : ifBlock.getChildBlocksIfBody() )
				countCompiledInstructions(child, counts, CP, SP);
			for( ProgramBlock child : ifBlock.getChildBlocksElseBody() )
				countCompiledInstructions(child, counts, CP, SP);
			return;
		}

		if (pb instanceof ForProgramBlock) { //includes ParFORProgramBlock
			ForProgramBlock forBlock = (ForProgramBlock)pb;
			countCompiledInstructions(forBlock.getFromInstructions(), counts, CP, SP);
			countCompiledInstructions(forBlock.getToInstructions(), counts, CP, SP);
			countCompiledInstructions(forBlock.getIncrementInstructions(), counts, CP, SP);
			for( ProgramBlock child : forBlock.getChildBlocks() )
				countCompiledInstructions(child, counts, CP, SP);
			//additional parfor jobs counted during runtime
			return;
		}

		if ( pb instanceof FunctionProgramBlock ) {
			FunctionProgramBlock fnBlock = (FunctionProgramBlock)pb;
			for( ProgramBlock child : fnBlock.getChildBlocks() )
				countCompiledInstructions(child, counts, CP, SP);
			return;
		}

		if( pb instanceof BasicProgramBlock ) {
			BasicProgramBlock basicBlock = (BasicProgramBlock) pb;
			countCompiledInstructions(basicBlock.getInstructions(), counts, CP, SP);
		}
	}

	/**
	 * Counts the CP instructions, Spark instructions, Spark reblock
	 * instructions, and Spark checkpoint instructions in a list of
	 * instructions.
	 *
	 * @param instSet list of instructions
	 * @param counts explain counts (updated in place)
	 * @param CP if true, count CP instructions
	 * @param SP if true, count Spark instructions, incl. Spark reblock and
	 *           checkpoint instructions
	 */
	private static void countCompiledInstructions( ArrayList<Instruction> instSet, ExplainCounts counts, boolean CP, boolean SP )
	{
		for( Instruction inst : instSet )
		{
			//an instruction is counted at most once as CP or Spark instruction
			boolean countAsCP = CP && inst instanceof CPInstruction;
			boolean countAsSP = !countAsCP && SP && inst instanceof SPInstruction;
			if( countAsCP )
				counts.numCPInst++;
			if( countAsSP )
				counts.numJobs++;

			if( !SP )
				continue;

			//keep track of reblocks (in order to prevent unnecessary spark context creation)
			boolean isReblock = inst instanceof CSVReblockSPInstruction
				|| inst instanceof ReblockSPInstruction;
			if( isReblock )
				counts.numReblocks++;
			if( inst instanceof CheckpointSPInstruction )
				counts.numChkpts++;
		}
	}

	/**
	 * Recursively explains the functions called by the given function, one
	 * line per called function. A called function that is already on the
	 * current call path and is recursive according to the call graph is
	 * marked as recursive and not expanded any further.
	 *
	 * @param fgraph function call graph
	 * @param fstack function keys on the current call path (updated during traversal)
	 * @param fkey key of the calling function (callers pass null for the main program)
	 * @param level indentation level
	 * @return explain output
	 */
	private static String explainFunctionCallGraph(FunctionCallGraph fgraph, HashSet<String> fstack, String fkey, int level)
	{
		Collection<String> callees = fgraph.getCalledFunctions(fkey);
		if( callees == null )
			return "";

		StringBuilder out = new StringBuilder();
		String prefix = createOffset(level) + "--";
		for( String callee : callees ) {
			boolean recursiveCall = fstack.contains(callee) && fgraph.isRecursiveFunction(callee);
			if( recursiveCall ) {
				out.append(prefix).append(callee).append(" (recursive)\n");
				continue;
			}
			//expand the called function while it is on the call path
			fstack.add(callee);
			out.append(prefix).append(callee).append("\n");
			out.append(explainFunctionCallGraph(fgraph, fstack, callee, level+1));
			fstack.remove(callee);
		}

		return out.toString();
	}
}
