/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.sysds.runtime.util;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.conf.CompilerConfig;
import org.apache.sysds.conf.CompilerConfig.ConfigType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.conf.DMLConfig;
import org.apache.sysds.hops.FunctionOp;
import org.apache.sysds.hops.Hop;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.hops.recompile.Recompiler;
import org.apache.sysds.lops.Lop;
import org.apache.sysds.parser.DMLProgram;
import org.apache.sysds.parser.DataIdentifier;
import org.apache.sysds.parser.ForStatement;
import org.apache.sysds.parser.ForStatementBlock;
import org.apache.sysds.parser.FunctionStatement;
import org.apache.sysds.parser.FunctionStatementBlock;
import org.apache.sysds.parser.IfStatement;
import org.apache.sysds.parser.IfStatementBlock;
import org.apache.sysds.parser.ParForStatementBlock;
import org.apache.sysds.parser.ParForStatementBlock.ResultVar;
import org.apache.sysds.parser.StatementBlock;
import org.apache.sysds.parser.WhileStatement;
import org.apache.sysds.parser.WhileStatementBlock;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.codegen.CodegenUtils;
import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
import org.apache.sysds.runtime.controlprogram.ForProgramBlock;
import org.apache.sysds.runtime.controlprogram.FunctionProgramBlock;
import org.apache.sysds.runtime.controlprogram.IfProgramBlock;
import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PExecMode;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
import org.apache.sysds.runtime.controlprogram.Program;
import org.apache.sysds.runtime.controlprogram.ProgramBlock;
import org.apache.sysds.runtime.controlprogram.WhileProgramBlock;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory;
import org.apache.sysds.runtime.controlprogram.paramserv.SparkPSBody;
import org.apache.sysds.runtime.controlprogram.parfor.ParForBody;
import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysds.runtime.instructions.CPInstructionParser;
import org.apache.sysds.runtime.instructions.Instruction;
import org.apache.sysds.runtime.instructions.InstructionParser;
import org.apache.sysds.runtime.instructions.cp.BooleanObject;
import org.apache.sysds.runtime.instructions.cp.CPInstruction;
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.instructions.cp.DoubleObject;
import org.apache.sysds.runtime.instructions.cp.FunctionCallCPInstruction;
import org.apache.sysds.runtime.instructions.cp.IntObject;
import org.apache.sysds.runtime.instructions.cp.ListObject;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
import org.apache.sysds.runtime.instructions.cp.SpoofCPInstruction;
import org.apache.sysds.runtime.instructions.cp.StringObject;
import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
import org.apache.sysds.runtime.instructions.gpu.GPUInstruction;
import org.apache.sysds.runtime.instructions.spark.SPInstruction;
import org.apache.sysds.runtime.lineage.Lineage;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.meta.MetaDataFormat;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.stream.Collectors;

/**
 * Program converter functionalities for 
 *   (1) creating deep copies of program blocks, instructions, function program blocks, and 
 *   (2) serializing and parsing of programs, program blocks, functions program blocks.
 * 
 */
//TODO: rewrite class to instance-based invocation (grown gradually and now inappropriate design)
public class ProgramConverter 
{
	protected static final Log LOG = LogFactory.getLog(ProgramConverter.class.getName());

	//use escaped unicodes for separators in order to prevent string conflict
	public static final String NEWLINE           = "\n"; //System.lineSeparator();
	public static final String COMPONENTS_DELIM  = "\u236e"; //semicolon w/ bar; ";";
	public static final String ELEMENT_DELIM     = "\u236a"; //comma w/ bar; ",";
	public static final String ELEMENT_DELIM2    = ",";
	public static final String DATA_FIELD_DELIM  = "\u007c"; //"|";
	public static final String KEY_VALUE_DELIM   = "\u003d"; //"=";
	public static final String LEVELIN           = "\u23a8"; //variant of left curly bracket; "\u007b"; //"{";
	public static final String LEVELOUT          = "\u23ac"; //variant of right curly bracket; "\u007d"; //"}";
	public static final String EMPTY             = "null";
	public static final String DASH              = "-";
	public static final String REF               = "ref";
	public static final String LIST_ELEMENT_DELIM = "\t";

	public static final String CDATA_BEGIN = "<![CDATA[";
	public static final String CDATA_END = " ]]>";
	
	public static final String PROG_BEGIN = " PROG" + LEVELIN;
	public static final String PROG_END = LEVELOUT;
	public static final String VARS_BEGIN = "VARS: ";
	public static final String VARS_END = "";
	public static final String PBS_BEGIN = " PBS" + LEVELIN;
	public static final String PBS_END = LEVELOUT;
	public static final String INST_BEGIN = " INST: ";
	public static final String INST_END = "";
	public static final String EC_BEGIN = " EC: ";
	public static final String EC_END = "";
	public static final String PB_BEGIN = " PB" + LEVELIN;
	public static final String PB_END = LEVELOUT;
	public static final String PB_WHILE = " WHILE" + LEVELIN;
	public static final String PB_FOR = " FOR" + LEVELIN;
	public static final String PB_PARFOR = " PARFOR" + LEVELIN;
	public static final String PB_IF = " IF" + LEVELIN;
	public static final String PB_FC = " FC" + LEVELIN;
	public static final String PB_EFC = " EFC" + LEVELIN;

	public static final String CONF_STATS = "stats";

	// Used for parfor
	public static final String PARFORBODY_BEGIN  = CDATA_BEGIN + "PARFORBODY" + LEVELIN;
	public static final String PARFORBODY_END    = LEVELOUT + CDATA_END;

	// Used for paramserv builtin function
	public static final String PSBODY_BEGIN = CDATA_BEGIN + "PSBODY" + LEVELIN;
	public static final String PSBODY_END = LEVELOUT + CDATA_END;
	
	//exception msgs
	public static final String NOT_SUPPORTED_EXTERNALFUNCTION_PB = "Not supported: ExternalFunctionProgramBlock contains MR instructions. " +
																	"(ExternalFunctionPRogramBlockCP can be used)";
	public static final String NOT_SUPPORTED_SPARK_INSTRUCTION   = "Not supported: Instructions of type other than CP instructions";
	public static final String NOT_SUPPORTED_SPARK_PARFOR        = "Not supported: Nested ParFOR REMOTE_SPARK due to possible deadlocks." +
																	"(LOCAL can be used for innner ParFOR)";
	public static final String NOT_SUPPORTED_PB                  = "Not supported: type of program block";
	
	////////////////////////////////
	// CREATION of DEEP COPIES
	////////////////////////////////
	
	/**
	 * Creates a deep copy of the given execution context.
	 * For rt_platform=Hadoop, execution context has a symbol table.
	 * 
	 * @param ec execution context
	 * @return execution context
	 * @throws CloneNotSupportedException if CloneNotSupportedException occurs
	 */
	public static ExecutionContext createDeepCopyExecutionContext(ExecutionContext ec) 
		throws CloneNotSupportedException
	{
		ExecutionContext cpec = ExecutionContextFactory.createContext(false, ec.getProgram());
		cpec.setVariables((LocalVariableMap) ec.getVariables().clone());
		if( ec.getLineage() != null )
			cpec.setLineage(new Lineage(ec.getLineage()));
		
		//handle result variables with in-place update flag
		//(each worker requires its own copy of the empty matrix object)
		for( String var : cpec.getVariables().keySet() ) {
			Data dat = cpec.getVariables().get(var);
			if( dat instanceof MatrixObject && ((MatrixObject)dat).getUpdateType().isInPlace() ) {
				MatrixObject mo = (MatrixObject)dat;
				MatrixObject moNew = new MatrixObject(mo); 
				if( mo.getNnz() != 0 ){
					// If output matrix is not empty (NNZ != 0), then local copy is created so that 
					// update in place operation can be applied.
					MatrixBlock mbVar = mo.acquireRead();
					moNew.acquireModify (new MatrixBlock(mbVar));
					mo.release();
				} else {
					//create empty matrix block w/ dense representation (preferred for update in-place)
					//Creating a dense matrix block is valid because empty block not allocated and transfer 
					// to sparse representation happens in left indexing in place operation.
					moNew.acquireModify(new MatrixBlock((int)mo.getNumRows(), (int)mo.getNumColumns(), false));
				}
				moNew.release();
				cpec.setVariable(var, moNew);
			}
		}
		
		return cpec;
	}
	
	/**
	 * This recursively creates a deep copy of program blocks and transparently replaces filenames according to the
	 * specified parallel worker in order to avoid conflicts between parworkers. This happens recursively in order
	 * to support arbitrary control-flow constructs within a parfor. 
	 * 
	 * @param childBlocks child program blocks
	 * @param pid ?
	 * @param IDPrefix ?
	 * @param fnStack ?
	 * @param fnCreated ?
	 * @param plain if true, full deep copy without id replacement
	 * @param forceDeepCopy if true, force deep copy
	 * @return list of program blocks
	 */
	public static ArrayList<ProgramBlock> rcreateDeepCopyProgramBlocks(ArrayList<ProgramBlock> childBlocks, long pid, int IDPrefix, Set<String> fnStack, Set<String> fnCreated, boolean plain, boolean forceDeepCopy) 
	{
		ArrayList<ProgramBlock> tmp = new ArrayList<>();
		
		for( ProgramBlock pb : childBlocks )
		{
			Program prog = pb.getProgram();
			ProgramBlock tmpPB = null;
			
			if( pb instanceof WhileProgramBlock ) {
				tmpPB = createDeepCopyWhileProgramBlock((WhileProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
			}
			else if( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock) ) {
				tmpPB = createDeepCopyForProgramBlock((ForProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy );
			}
			else if( pb instanceof ParForProgramBlock ) {
				ParForProgramBlock pfpb = (ParForProgramBlock) pb;
				if( ParForProgramBlock.ALLOW_NESTED_PARALLELISM )
					tmpPB = createDeepCopyParForProgramBlock(pfpb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
				else 
					tmpPB = createDeepCopyForProgramBlock((ForProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
			}
			else if( pb instanceof IfProgramBlock ) {
				tmpPB = createDeepCopyIfProgramBlock((IfProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy);
			}
			else if( pb instanceof BasicProgramBlock ) { //last-level program block
				BasicProgramBlock bpb = (BasicProgramBlock) pb;
				tmpPB = new BasicProgramBlock(prog); // general case use for most PBs
				
				//for recompile in the master node JVM
				tmpPB.setStatementBlock(createStatementBlockCopy(bpb.getStatementBlock(), pid, plain, forceDeepCopy)); 
				tmpPB.setThreadID(pid);
				
				//copy instructions
				((BasicProgramBlock)tmpPB).setInstructions(
					createDeepCopyInstructionSet(bpb.getInstructions(),
					pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
			}
			
			//copy symbol table
			//tmpPB.setVariables( pb.getVariables() ); //implicit cloning
			
			tmp.add(tmpPB);
		}
		
		return tmp;
	}

	public static WhileProgramBlock createDeepCopyWhileProgramBlock(WhileProgramBlock wpb, long pid, int IDPrefix, Program prog, Set<String> fnStack, Set<String> fnCreated, boolean plain, boolean forceDeepCopy) {
		ArrayList<Instruction> predinst = createDeepCopyInstructionSet(wpb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true);
		WhileProgramBlock tmpPB = new WhileProgramBlock(prog, predinst);
		StatementBlock sb = ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) ?
			createWhileStatementBlockCopy((WhileStatementBlock) wpb.getStatementBlock(), forceDeepCopy) : wpb.getStatementBlock();
		tmpPB.setStatementBlock( sb );
		tmpPB.setThreadID(pid);
		tmpPB.setChildBlocks(rcreateDeepCopyProgramBlocks(wpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy));
		tmpPB.setExitInstruction(wpb.getExitInstruction());
		return tmpPB;
	}

	public static IfProgramBlock createDeepCopyIfProgramBlock(IfProgramBlock ipb, long pid, int IDPrefix, Program prog, Set<String> fnStack, Set<String> fnCreated, boolean plain, boolean forceDeepCopy) {
		ArrayList<Instruction> predinst = createDeepCopyInstructionSet(ipb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true);
		IfProgramBlock tmpPB = new IfProgramBlock(prog, predinst);
		StatementBlock sb = ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) ?
			createIfStatementBlockCopy((IfStatementBlock)ipb.getStatementBlock(), forceDeepCopy ) : ipb.getStatementBlock();
		tmpPB.setStatementBlock( sb );
		tmpPB.setThreadID(pid);
		tmpPB.setChildBlocksIfBody(rcreateDeepCopyProgramBlocks(ipb.getChildBlocksIfBody(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy));
		tmpPB.setChildBlocksElseBody(rcreateDeepCopyProgramBlocks(ipb.getChildBlocksElseBody(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy));
		tmpPB.setExitInstruction(ipb.getExitInstruction());
		return tmpPB;
	}

	public static ForProgramBlock createDeepCopyForProgramBlock(ForProgramBlock fpb, long pid, int IDPrefix, Program prog, Set<String> fnStack, Set<String> fnCreated, boolean plain, boolean forceDeepCopy) {
		ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar());
		StatementBlock sb = ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) ?
			createForStatementBlockCopy((ForStatementBlock)fpb.getStatementBlock(), forceDeepCopy) : fpb.getStatementBlock();
		tmpPB.setStatementBlock(sb);
		tmpPB.setThreadID(pid);
		tmpPB.setFromInstructions( createDeepCopyInstructionSet(fpb.getFromInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
		tmpPB.setToInstructions( createDeepCopyInstructionSet(fpb.getToInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
		tmpPB.setIncrementInstructions( createDeepCopyInstructionSet(fpb.getIncrementInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
		tmpPB.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy) );
		tmpPB.setExitInstruction(fpb.getExitInstruction());
		return tmpPB;
	}

	public static ForProgramBlock createShallowCopyForProgramBlock(ForProgramBlock fpb, Program prog ) {
		ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar());
		tmpPB.setFromInstructions( fpb.getFromInstructions() );
		tmpPB.setToInstructions( fpb.getToInstructions() );
		tmpPB.setIncrementInstructions( fpb.getIncrementInstructions() );
		tmpPB.setChildBlocks( fpb.getChildBlocks() );
		tmpPB.setExitInstruction(fpb.getExitInstruction());
		return tmpPB;
	}

	public static ParForProgramBlock createDeepCopyParForProgramBlock(ParForProgramBlock pfpb, long pid, int IDPrefix, Program prog, Set<String> fnStack, Set<String> fnCreated, boolean plain, boolean forceDeepCopy) {
		ParForProgramBlock tmpPB = null;
		
		if( IDPrefix == -1 ) //still on master node
			tmpPB = new ParForProgramBlock(prog,pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables());
		else //child of remote ParWorker at any level
			tmpPB = new ParForProgramBlock(IDPrefix, prog, pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables());
		
		StatementBlock sb = ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) ?
			createForStatementBlockCopy((ForStatementBlock)pfpb.getStatementBlock(), forceDeepCopy) : pfpb.getStatementBlock();
		tmpPB.setStatementBlock( sb );
		tmpPB.setThreadID(pid);
		
		tmpPB.disableOptimization(); //already done in top-level parfor
		tmpPB.disableMonitorReport(); //already done in top-level parfor
		
		tmpPB.setFromInstructions( createDeepCopyInstructionSet(pfpb.getFromInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
		tmpPB.setToInstructions( createDeepCopyInstructionSet(pfpb.getToInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
		tmpPB.setIncrementInstructions( createDeepCopyInstructionSet(pfpb.getIncrementInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
		
		//NOTE: Normally, no recursive copy because (1) copied on each execution in this PB anyway 
		//and (2) leave placeholders as they are. However, if plain, an explicit deep copy is requested.
		if( plain || forceDeepCopy )
			tmpPB.setChildBlocks( rcreateDeepCopyProgramBlocks(pfpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy) ); 
		else
			tmpPB.setChildBlocks( pfpb.getChildBlocks() );
		tmpPB.setExitInstruction(pfpb.getExitInstruction());
		
		return tmpPB;
	}
	
	/**
	 * This creates a deep copy of a function program block. The central reference to singletons of function program blocks
	 * poses the need for explicit copies in order to prevent conflicting writes of temporary variables (see ExternalFunctionProgramBlock.
	 * 
	 * @param namespace function namespace
	 * @param oldName ?
	 * @param pid ?
	 * @param IDPrefix ?
	 * @param prog runtime program
	 * @param fnStack ?
	 * @param fnCreated ?
	 * @param plain ?
	 */
	public static void createDeepCopyFunctionProgramBlock(String namespace, String oldName, long pid, int IDPrefix, Program prog, Set<String> fnStack, Set<String> fnCreated, boolean plain) 
	{
		//fpb guaranteed to be non-null (checked inside getFunctionProgramBlock)
		FunctionProgramBlock fpb = prog.getFunctionProgramBlock(namespace, oldName);
		String fnameNew = (plain)? oldName :(oldName+Lop.CP_CHILD_THREAD+pid); 
		String fnameNewKey = DMLProgram.constructFunctionKey(namespace,fnameNew);

		if( prog.getFunctionProgramBlocks().containsKey(fnameNewKey) )
			return; //prevent redundant deep copy if already existent
		
		//create deep copy
		FunctionProgramBlock copy = null;
		ArrayList<DataIdentifier> tmp1 = new ArrayList<>();
		ArrayList<DataIdentifier> tmp2 = new ArrayList<>();
		if( fpb.getInputParams()!= null )
			tmp1.addAll(fpb.getInputParams());
		if( fpb.getOutputParams()!= null )
			tmp2.addAll(fpb.getOutputParams());
		
		
		if( !fnStack.contains(fnameNewKey) ) {
			fnStack.add(fnameNewKey);
			copy = new FunctionProgramBlock(prog, tmp1, tmp2);
			copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, fpb.isRecompileOnce()) );
			copy.setRecompileOnce( fpb.isRecompileOnce() );
			copy.setThreadID(pid);
			fnStack.remove(fnameNewKey);
		}
		else //stop deep copy for recursive function calls
			copy = fpb;
		
		//copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning
		//note: instructions not used by function program block
		
		//put if not existing (recursive processing might have added it)
		if( !prog.getFunctionProgramBlocks().containsKey(fnameNewKey) ) {
			prog.addFunctionProgramBlock(namespace, fnameNew, copy);
			fnCreated.add(DMLProgram.constructFunctionKey(namespace, fnameNew));
		}
	}

	public static FunctionProgramBlock createDeepCopyFunctionProgramBlock(FunctionProgramBlock fpb, Set<String> fnStack, Set<String> fnCreated) 
	{
		if( fpb == null )
			throw new DMLRuntimeException("Unable to create a deep copy of a non-existing FunctionProgramBlock.");
	
		//create deep copy
		FunctionProgramBlock copy = null;
		ArrayList<DataIdentifier> tmp1 = new ArrayList<>();
		ArrayList<DataIdentifier> tmp2 = new ArrayList<>();
		if( fpb.getInputParams()!= null )
			tmp1.addAll(fpb.getInputParams());
		if( fpb.getOutputParams()!= null )
			tmp2.addAll(fpb.getOutputParams());
		
		copy = new FunctionProgramBlock(fpb.getProgram(), tmp1, tmp2);
		copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), 0, -1, fnStack, fnCreated, true, fpb.isRecompileOnce()) );
		copy.setStatementBlock( fpb.getStatementBlock() );
		copy.setRecompileOnce(fpb.isRecompileOnce());
		//copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning
		//note: instructions not used by function program block
	
		return copy;
	}

	
	/**
	 * Creates a deep copy of an array of instructions and replaces the placeholders of parworker
	 * IDs with the concrete IDs of this parfor instance. This is a helper method uses for generating
	 * deep copies of program blocks.
	 * 
	 * @param instSet list of instructions
	 * @param pid ?
	 * @param IDPrefix ?
	 * @param prog runtime program
	 * @param fnStack ?
	 * @param fnCreated ?
	 * @param plain ?
	 * @param cpFunctions ?
	 * @return list of instructions
	 */
	public static ArrayList<Instruction> createDeepCopyInstructionSet(ArrayList<Instruction> instSet, long pid, int IDPrefix, Program prog, Set<String> fnStack, Set<String> fnCreated, boolean plain, boolean cpFunctions) {
		ArrayList<Instruction> tmp = new ArrayList<>();
		for( Instruction inst : instSet ) {
			if( inst instanceof FunctionCallCPInstruction && cpFunctions ) {
				FunctionCallCPInstruction finst = (FunctionCallCPInstruction) inst;
				createDeepCopyFunctionProgramBlock( finst.getNamespace(),
					finst.getFunctionName(), pid, IDPrefix, prog, fnStack, fnCreated, plain );
			}
			tmp.add( cloneInstruction( inst, pid, plain, cpFunctions ) );
		}
		return tmp;
	}

	public static Instruction cloneInstruction( Instruction oInst, long pid, boolean plain, boolean cpFunctions ) 
	{
		Instruction inst = null;
		String tmpString = oInst.toString();
		
		try
		{
			if( oInst instanceof CPInstruction || oInst instanceof SPInstruction 
				|| oInst instanceof GPUInstruction ) {
				if( oInst instanceof FunctionCallCPInstruction && cpFunctions ) {
					FunctionCallCPInstruction tmp = (FunctionCallCPInstruction) oInst;
					if( !plain ) {
						//safe replacement because target variables might include the function name
						//note: this is no update-in-place in order to keep the original function name as basis
						tmpString = tmp.updateInstStringFunctionName(tmp.getFunctionName(), tmp.getFunctionName() + Lop.CP_CHILD_THREAD+pid);
					}
					//otherwise: preserve function name
				}
				inst = InstructionParser.parseSingleInstruction(tmpString);
			}
			else
				throw new DMLRuntimeException("Failed to clone instruction: "+oInst);
		}
		catch(Exception ex) {
			throw new DMLRuntimeException(ex);
		}
		
		//save replacement of thread id references in instructions
		inst = saveReplaceThreadID( inst, Lop.CP_ROOT_THREAD_ID, Lop.CP_CHILD_THREAD+pid);
		
		return inst;
	}
	
	public static FunctionStatementBlock createDeepCopyFunctionStatementBlock(FunctionStatementBlock fsb, Set<String> fnStack, Set<String> fnCreated) {
		FunctionStatement fstmt = (FunctionStatement) fsb.getStatement(0);
		FunctionStatementBlock retSb = new FunctionStatementBlock();
		FunctionStatement retStmt = new FunctionStatement();
		retStmt.setName(fstmt.getName());
		retStmt.setInputParams(fstmt.getInputParams());
		retStmt.setInputDefaults(fstmt.getInputDefaults());
		retStmt.setOutputParams(fstmt.getOutputParams());
		retSb.addStatement(retStmt);
		retSb.setDMLProg(fsb.getDMLProg());
		retSb.setParseInfo(fsb);
		retSb.setLiveIn( fsb.liveIn() );
		retSb.setLiveOut( fsb.liveOut() );
		for( StatementBlock sb : fstmt.getBody() )
			retStmt.getBody().add(rCreateDeepCopyStatementBlock(sb));
		return retSb;
	}
	
	public static StatementBlock rCreateDeepCopyStatementBlock(StatementBlock sb) {
		StatementBlock ret = null;
		if( sb instanceof IfStatementBlock ) {
			IfStatementBlock orig = (IfStatementBlock) sb;
			IfStatementBlock isb = createIfStatementBlockCopy(orig, true);
			IfStatement origstmt = (IfStatement) orig.getStatement(0);
			IfStatement istmt = new IfStatement(); //only shallow
			istmt.setConditionalPredicate(origstmt.getConditionalPredicate());
			isb.setStatements(CollectionUtils.asArrayList(istmt));
			for( StatementBlock c : origstmt.getIfBody() )
				istmt.addStatementBlockIfBody(rCreateDeepCopyStatementBlock(c));
			for( StatementBlock c : origstmt.getElseBody() )
				istmt.addStatementBlockElseBody(rCreateDeepCopyStatementBlock(c));
			ret = isb;
		}
		else if( sb instanceof WhileStatementBlock ) {
			WhileStatementBlock orig = (WhileStatementBlock) sb;
			WhileStatementBlock wsb = createWhileStatementBlockCopy(orig, true);
			WhileStatement origstmt = (WhileStatement) orig.getStatement(0);
			WhileStatement wstmt = new WhileStatement(); //only shallow
			wstmt.setPredicate(origstmt.getConditionalPredicate());
			wsb.setStatements(CollectionUtils.asArrayList(wstmt));
			for( StatementBlock c : origstmt.getBody() )
				wstmt.addStatementBlock(rCreateDeepCopyStatementBlock(c));
			ret = wsb;
		}
		else if( sb instanceof ForStatementBlock ) { //incl parfor
			ForStatementBlock orig = (ForStatementBlock) sb;
			ForStatementBlock fsb = createForStatementBlockCopy(orig, true);
			ForStatement origstmt = (ForStatement) orig.getStatement(0);
			ForStatement fstmt = new ForStatement(); //only shallow
			fstmt.setPredicate(origstmt.getIterablePredicate());
			fsb.setStatements(CollectionUtils.asArrayList(fstmt));
			for( StatementBlock c : origstmt.getBody() )
				fstmt.addStatementBlock(rCreateDeepCopyStatementBlock(c));
			ret = fsb;
		}
		else {
			StatementBlock bsb = createStatementBlockCopy(sb, -1, true, true);
			for( Hop root : bsb.getHops() )
				if( root instanceof FunctionOp )
					((FunctionOp)root).setCallOptimized(false);
			ret = bsb;
		}
		return ret;
	}

	public static StatementBlock createStatementBlockCopy( StatementBlock sb, long pid, boolean plain, boolean forceDeepCopy )
	{
		StatementBlock ret = null;
		
		try
		{
			if( sb != null  //forced deep copy for function recompilation
				&& (Recompiler.requiresRecompilation( sb.getHops() ) || forceDeepCopy) )
			{
				//create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
				ret = new StatementBlock();
				ret.setDMLProg(sb.getDMLProg());
				ret.setParseInfo(sb);
				ret.setLiveIn( sb.liveIn() );
				ret.setLiveOut( sb.liveOut() );
				ret.setUpdatedVariables( sb.variablesUpdated() );
				ret.setReadVariables( sb.variablesRead() );
				
				//deep copy hops dag for concurrent recompile
				ArrayList<Hop> hops = Recompiler.deepCopyHopsDag( sb.getHops() );
				if( !plain )
					Recompiler.updateFunctionNames( hops, pid );
				ret.setHops( hops );
				ret.updateRecompilationFlag();
				ret.setNondeterministic(sb.isNondeterministic());
			}
			else {
				ret = sb;
			}
		}
		catch( Exception ex ) {
			throw new DMLRuntimeException( ex );
		}
		
		return ret;
	}

	public static IfStatementBlock createIfStatementBlockCopy( IfStatementBlock sb, boolean forceDeepCopy ) 
	{
		IfStatementBlock ret = null;
		
		try
		{
			if( sb != null //forced deep copy for function recompile
				&& (Recompiler.requiresRecompilation( sb.getPredicateHops() ) || forceDeepCopy)  )
			{
				//create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
				ret = new IfStatementBlock();
				ret.setDMLProg(sb.getDMLProg());
				ret.setParseInfo(sb);
				ret.setLiveIn( sb.liveIn() );
				ret.setLiveOut( sb.liveOut() );
				ret.setUpdatedVariables( sb.variablesUpdated() );
				ret.setReadVariables( sb.variablesRead() );
				
				//shallow copy child statements
				ret.setStatements( sb.getStatements() );
				
				//deep copy predicate hops dag for concurrent recompile
				Hop hops = Recompiler.deepCopyHopsDag( sb.getPredicateHops() );
				ret.setPredicateHops( hops );
				ret.updatePredicateRecompilationFlag();
				ret.setNondeterministic(sb.isNondeterministic());
			}
			else {
				ret = sb;
			}
		}
		catch( Exception ex ) {
			throw new DMLRuntimeException( ex );
		}
		
		return ret;
	}

	public static WhileStatementBlock createWhileStatementBlockCopy( WhileStatementBlock sb, boolean forceDeepCopy ) 
	{
		WhileStatementBlock ret = null;
		
		try
		{
			if( sb != null  //forced deep copy for function recompile
				&& (Recompiler.requiresRecompilation( sb.getPredicateHops() ) || forceDeepCopy)  )
			{
				//create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
				ret = new WhileStatementBlock();
				ret.setDMLProg(sb.getDMLProg());
				ret.setParseInfo(sb);
				ret.setLiveIn( sb.liveIn() );
				ret.setLiveOut( sb.liveOut() );
				ret.setUpdatedVariables( sb.variablesUpdated() );
				ret.setReadVariables( sb.variablesRead() );
				ret.setUpdateInPlaceVars( sb.getUpdateInPlaceVars() );
				
				//shallow copy child statements
				ret.setStatements( sb.getStatements() );
				
				//deep copy predicate hops dag for concurrent recompile
				Hop hops = Recompiler.deepCopyHopsDag( sb.getPredicateHops() );
				ret.setPredicateHops( hops );
				ret.updatePredicateRecompilationFlag();
				ret.setNondeterministic(sb.isNondeterministic());
			}
			else {
				ret = sb;
			}
		}
		catch( Exception ex ) {
			throw new DMLRuntimeException( ex );
		}
		
		return ret;
	}

	public static ForStatementBlock createForStatementBlockCopy( ForStatementBlock sb, boolean forceDeepCopy ) 
	{
		ForStatementBlock ret = null;
		
		try
		{
			if( sb != null && (forceDeepCopy
				|| Recompiler.requiresRecompilation(sb.getFromHops())
				|| Recompiler.requiresRecompilation(sb.getToHops())
				|| Recompiler.requiresRecompilation(sb.getIncrementHops())) )
			{
				ret = (sb instanceof ParForStatementBlock) ? new ParForStatementBlock() : new ForStatementBlock();
				
				//create new statement (shallow copy livein/liveout for recompile, line numbers for explain)
				ret.setDMLProg(sb.getDMLProg());
				ret.setParseInfo(sb);
				ret.setLiveIn( sb.liveIn() );
				ret.setLiveOut( sb.liveOut() );
				ret.setUpdatedVariables( sb.variablesUpdated() );
				ret.setReadVariables( sb.variablesRead() );
				ret.setUpdateInPlaceVars( sb.getUpdateInPlaceVars() );
				
				//shallow copy child statements
				ret.setStatements( sb.getStatements() );
				
				//deep copy predicate hops dag for concurrent recompile
				//or on create full statement block copies
				ret.setFromHops( Recompiler.deepCopyHopsDag(sb.getFromHops()));
				ret.setToHops(Recompiler.deepCopyHopsDag(sb.getToHops()));
				if( sb.getIncrementHops() != null )
					ret.setIncrementHops(Recompiler.deepCopyHopsDag(sb.getIncrementHops()));
				
				ret.updatePredicateRecompilationFlags();
				ret.setNondeterministic(sb.isNondeterministic());
			}
			else {
				ret = sb;
			}
		}
		catch( Exception ex ) {
			throw new DMLRuntimeException( ex );
		}
		
		return ret;
	}
	
	
	////////////////////////////////
	// SERIALIZATION 
	////////////////////////////////

	public static String serializeSparkPSBody(SparkPSBody body, HashMap<String, byte[]> clsMap) {

		ExecutionContext ec = body.getEc();
		StringBuilder builder = new StringBuilder();
		builder.append(PSBODY_BEGIN);
		builder.append(NEWLINE);

		//handle DMLScript UUID (propagate original uuid for writing to scratch space)
		builder.append(DMLScript.getUUID());
		builder.append(COMPONENTS_DELIM);
		builder.append(NEWLINE);

		//handle DML config
		builder.append(ConfigurationManager.getDMLConfig().serializeDMLConfig());
		builder.append(COMPONENTS_DELIM);
		builder.append(NEWLINE);

		//handle additional configurations
		builder.append(CONF_STATS + "=" + DMLScript.STATISTICS);
		builder.append(COMPONENTS_DELIM);
		builder.append(NEWLINE);

		//handle program
		builder.append(PROG_BEGIN);
		builder.append(NEWLINE);
		builder.append(rSerializeFunctionProgramBlocks(ec.getProgram().getFunctionProgramBlocks(),
			new HashSet<>(ec.getProgram().getFunctionProgramBlocks().keySet()), clsMap));
		builder.append(PROG_END);
		builder.append(NEWLINE);
		builder.append(COMPONENTS_DELIM);
		builder.append(NEWLINE);

		//handle execution context
		builder.append(EC_BEGIN);
		builder.append(serializeExecutionContext(ec));
		builder.append(EC_END);
		builder.append(NEWLINE);
		builder.append(COMPONENTS_DELIM);
		builder.append(NEWLINE);

		//handle program blocks
		builder.append(PBS_BEGIN);
		builder.append(NEWLINE);
		builder.append(rSerializeProgramBlocks(ec.getProgram().getProgramBlocks(), clsMap));
		builder.append(PBS_END);
		builder.append(NEWLINE);
		builder.append(COMPONENTS_DELIM);
		builder.append(NEWLINE);

		builder.append(PSBODY_END);
		return builder.toString();
	}

	public static String serializeParForBody( ParForBody body ) {
		return serializeParForBody(body, new HashMap<String, byte[]>());
	}	
	
	public static String serializeParForBody( ParForBody body, HashMap<String,byte[]> clsMap ) 
	{
		ArrayList<ProgramBlock> pbs = body.getChildBlocks();
		ArrayList<ResultVar> rVnames = body.getResultVariables();
		ExecutionContext ec = body.getEc();
		
		if( pbs.isEmpty() )
			return PARFORBODY_BEGIN + PARFORBODY_END;
		
		Program prog = pbs.get( 0 ).getProgram();
		
		StringBuilder sb = new StringBuilder();
		sb.append( PARFORBODY_BEGIN );
		sb.append( NEWLINE );
		
		//handle DMLScript UUID (propagate original uuid for writing to scratch space)
		sb.append( DMLScript.getUUID() );
		sb.append( COMPONENTS_DELIM );
		sb.append( NEWLINE );
		
		//handle DML config
		sb.append( ConfigurationManager.getDMLConfig().serializeDMLConfig() );
		sb.append( COMPONENTS_DELIM );
		sb.append( NEWLINE );
		
		//handle additional configurations
		sb.append( CONF_STATS + "=" + DMLScript.STATISTICS );
		sb.append( COMPONENTS_DELIM );
		sb.append( NEWLINE );
		
		//handle program
		sb.append(PROG_BEGIN);
		sb.append( NEWLINE );
		sb.append( serializeProgram(prog, pbs, clsMap) );
		sb.append(PROG_END);
		sb.append( NEWLINE );
		sb.append( COMPONENTS_DELIM );
		sb.append( NEWLINE );
		
		//handle result variable names
		sb.append( serializeResultVariables(rVnames) );
		sb.append( COMPONENTS_DELIM );
		
		//handle execution context
		//note: this includes also the symbol table (serialize only the top-level variable map,
		//      (symbol tables for nested/child blocks are created at parse time, on the remote side)
		sb.append(EC_BEGIN);
		sb.append( serializeExecutionContext(ec) );
		sb.append(EC_END);
		sb.append( NEWLINE );
		sb.append( COMPONENTS_DELIM );
		sb.append( NEWLINE );
		
		//handle program blocks
		sb.append(PBS_BEGIN);
		sb.append( NEWLINE );
		sb.append( rSerializeProgramBlocks(pbs, clsMap) );
		sb.append(PBS_END);
		sb.append( NEWLINE );
		
		sb.append( PARFORBODY_END );
		
		return sb.toString();
	}

	private static String serializeProgram( Program prog, ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap ) {
		//note program contains variables, programblocks and function program blocks 
		//but in order to avoid redundancy, we only serialize function program blocks
		HashMap<String, FunctionProgramBlock> fpb = prog.getFunctionProgramBlocks();
		HashSet<String> cand = new HashSet<>();
		rFindSerializationCandidates(pbs, cand);
		return rSerializeFunctionProgramBlocks( fpb, cand, clsMap );
	}

	private static void rFindSerializationCandidates( ArrayList<ProgramBlock> pbs, HashSet<String> cand ) 
	{
		for( ProgramBlock pb : pbs )
		{
			if( pb instanceof WhileProgramBlock ) {
				WhileProgramBlock wpb = (WhileProgramBlock) pb;
				rFindSerializationCandidates(wpb.getChildBlocks(), cand );
			}
			else if ( pb instanceof ForProgramBlock || pb instanceof ParForProgramBlock ) {
				ForProgramBlock fpb = (ForProgramBlock) pb; 
				rFindSerializationCandidates(fpb.getChildBlocks(), cand);
			}
			else if ( pb instanceof IfProgramBlock ) {
				IfProgramBlock ipb = (IfProgramBlock) pb;
				rFindSerializationCandidates(ipb.getChildBlocksIfBody(), cand);
				if( ipb.getChildBlocksElseBody() != null )
					rFindSerializationCandidates(ipb.getChildBlocksElseBody(), cand);
			}
			else if( pb instanceof BasicProgramBlock ) { 
				BasicProgramBlock bpb = (BasicProgramBlock) pb;
				for( Instruction inst : bpb.getInstructions() )
					if( inst instanceof FunctionCallCPInstruction ) {
						FunctionCallCPInstruction fci = (FunctionCallCPInstruction) inst;
						String fkey = DMLProgram.constructFunctionKey(fci.getNamespace(), fci.getFunctionName());
						if( !cand.contains(fkey) ) { //memoization for multiple calls, recursion
							cand.add( fkey ); //add to candidates
							//investigate chains of function calls
							FunctionProgramBlock fpb = pb.getProgram().getFunctionProgramBlock(fci.getNamespace(), fci.getFunctionName());
							rFindSerializationCandidates(fpb.getChildBlocks(), cand);
						}
					}
			}
		}
	}

	private static String serializeVariables (LocalVariableMap vars) {
		StringBuilder sb = new StringBuilder();
		sb.append(VARS_BEGIN);
		sb.append( vars.serialize() );
		sb.append(VARS_END);
		return sb.toString();
	}
	
	public static String serializeDataObject(String key, Data dat) 
	{
		// SCHEMA: <name>|<datatype>|<valuetype>|value
		// (scalars are serialize by value, matrices by filename)
		StringBuilder sb = new StringBuilder();
		//prepare data for serialization
		String name = key;
		DataType datatype = dat.getDataType();
		ValueType valuetype = dat.getValueType();
		String value = null;
		String[] metaData = null;
		String[] listData = null;
		switch( datatype )
		{
			case SCALAR:
				ScalarObject so = (ScalarObject) dat;
				//name = so.getName();
				value = so.getStringValue();
				break;
			case MATRIX:
				MatrixObject mo = (MatrixObject) dat;
				MetaDataFormat md = (MetaDataFormat) dat.getMetaData();
				DataCharacteristics dc = md.getDataCharacteristics();
				value = mo.getFileName();
				PartitionFormat partFormat = (mo.getPartitionFormat()!=null) ? new PartitionFormat(
						mo.getPartitionFormat(),mo.getPartitionSize()) : PartitionFormat.NONE;
				metaData = new String[10];
				metaData[0] = String.valueOf( dc.getRows() );
				metaData[1] = String.valueOf( dc.getCols() );
				metaData[2] = String.valueOf( dc.getBlocksize() );
				metaData[3] = String.valueOf( dc.getNonZeros() );
				metaData[4] = md.getFileFormat().toString();
				metaData[5] = String.valueOf( partFormat );
				metaData[6] = String.valueOf( mo.getUpdateType() );
				metaData[7] = String.valueOf(mo.isHDFSFileExists());
				metaData[8] = String.valueOf(mo.isCleanupEnabled());
				break;
			case LIST:
				// SCHEMA: <name>|<datatype>|<valuetype>|value|<metadata>|<tab>element1<tab>element2<tab>element3 (this is the list)
				//         (for the element1) <listName-index>|<datatype>|<valuetype>|value
				//         (for the element2) <listName-index>|<datatype>|<valuetype>|value
				ListObject lo = (ListObject) dat;
				value = REF;
				metaData = new String[2];
				metaData[0] = String.valueOf(lo.getLength());
				metaData[1] = lo.getNames() == null ? EMPTY : serializeList(lo.getNames(), ELEMENT_DELIM2);
				listData = new String[lo.getLength()];
				for (int index = 0; index < lo.getLength(); index++) {
					listData[index] = serializeDataObject(name + DASH + index, lo.slice(index));
				}
				break;
			default:
				throw new DMLRuntimeException("Unable to serialize datatype "+datatype);
		}
		
		//serialize data
		sb.append(name);
		sb.append(DATA_FIELD_DELIM);
		sb.append(datatype);
		sb.append(DATA_FIELD_DELIM);
		sb.append(valuetype);
		sb.append(DATA_FIELD_DELIM);
		sb.append(value);
		if( metaData != null )
			for( int i=0; i<metaData.length; i++ ) {
				sb.append(DATA_FIELD_DELIM);
				sb.append(metaData[i]);
			}
		if (listData != null) {
			sb.append(DATA_FIELD_DELIM);
			for (String ld : listData) {
				sb.append(LIST_ELEMENT_DELIM);
				sb.append(ld);
			}
		}
		
		return sb.toString();
	}

	private static String serializeExecutionContext( ExecutionContext ec ) {
		return (ec != null) ? serializeVariables( ec.getVariables() ) : EMPTY;
	}

	@SuppressWarnings("all")
	private static String serializeInstructions( ArrayList<Instruction> inst, HashMap<String, byte[]> clsMap ) 
	{
		StringBuilder sb = new StringBuilder();
		int count = 0;
		for( Instruction linst : inst ) {
			//check that only cp instruction are transmitted 
			if( !( linst instanceof CPInstruction) )
				throw new DMLRuntimeException( NOT_SUPPORTED_SPARK_INSTRUCTION + " " +linst.getClass().getName()+"\n"+linst );
			
			//obtain serialized version of generated classes
			if( linst instanceof SpoofCPInstruction ) {
				Class<?> cla = ((SpoofCPInstruction) linst).getOperatorClass();
				clsMap.put(cla.getName(), CodegenUtils.getClassData(cla.getName()));
			}
			
			if( count > 0 )
				sb.append( ELEMENT_DELIM );
			
			sb.append( checkAndReplaceLiterals( linst.toString() ) );
			count++;
		}
		
		return sb.toString();
	}
	
	/**
	 * Replacement of internal delimiters occurring in literals of instructions
	 * in order to ensure robustness of serialization and parsing.
	 * (e.g. print( "a,b" ) would break the parsing of instruction that internally
	 * are separated with a "," )
	 * 
	 * @param instStr instruction string
	 * @return instruction string with replacements
	 */
	private static String checkAndReplaceLiterals( String instStr )
	{
		String tmp = instStr;
		
		//1) check own delimiters (very unlikely due to special characters)
		if( tmp.contains(COMPONENTS_DELIM) ) {
			tmp = tmp.replaceAll(COMPONENTS_DELIM, ".");
			LOG.warn("Replaced special literal character sequence "+COMPONENTS_DELIM+" with '.'");
		}
		
		if( tmp.contains(ELEMENT_DELIM) ) {
			tmp = tmp.replaceAll(ELEMENT_DELIM, ".");
			LOG.warn("Replaced special literal character sequence "+ELEMENT_DELIM+" with '.'");
		}
			
		if( tmp.contains( LEVELIN ) ){
			tmp = tmp.replaceAll(LEVELIN, "("); // '\\' required if LEVELIN='{' because regex
			LOG.warn("Replaced special literal character sequence "+LEVELIN+" with '('");
		}

		if( tmp.contains(LEVELOUT) ){
			tmp = tmp.replaceAll(LEVELOUT, ")");
			LOG.warn("Replaced special literal character sequence "+LEVELOUT+" with ')'");
		}
		
		//NOTE: DATA_FIELD_DELIM and KEY_VALUE_DELIM not required
		//because those literals cannot occur in critical places.
		
		//2) check end tag of CDATA
		if( tmp.contains(CDATA_END) ){
			tmp = tmp.replaceAll(CDATA_END, "."); //prevent XML parsing issues in job.xml
			LOG.warn("Replaced special literal character sequence "+ CDATA_END +" with '.'");
		}
		
		return tmp;
	}

	private static String serializeStringHashMap(HashMap<String,String> vars) {
		return serializeList(vars.entrySet().stream().map(e ->
			e.getKey()+KEY_VALUE_DELIM+e.getValue()).collect(Collectors.toList()));
	}

	public static String serializeResultVariables( List<ResultVar> vars) {
		return serializeList(vars.stream().map(v -> v._isAccum ?
			v._name+"+" : v._name).collect(Collectors.toList()));
	}
	
	public static String serializeList(List<String> elements) {
		return serializeList(elements, ELEMENT_DELIM);
	}
	
	public static String serializeList(List<String> elements, String delim) {
		return StringUtils.join(elements, delim);
	}

	private static String serializeDataIdentifiers(List<DataIdentifier> vars) {
		return serializeList(vars.stream().map(v ->
			serializeDataIdentifier(v)).collect(Collectors.toList()));
	}

	private static String serializeDataIdentifier( DataIdentifier dat ) {
		// SCHEMA: <name>|<datatype>|<valuetype>
		StringBuilder sb = new StringBuilder();
		sb.append(dat.getName());
		sb.append(DATA_FIELD_DELIM);
		sb.append(dat.getDataType());
		sb.append(DATA_FIELD_DELIM);
		sb.append(dat.getValueType());
		return sb.toString();
	}

	private static String rSerializeFunctionProgramBlocks(HashMap<String,FunctionProgramBlock> pbs, HashSet<String> cand, HashMap<String, byte[]> clsMap) {
		StringBuilder sb = new StringBuilder();
		int count = 0;
		for( Entry<String,FunctionProgramBlock> pb : pbs.entrySet() ) {
			if( !cand.contains(pb.getKey()) ) //skip function not included in the parfor body
				continue;
			if( count>0 ) {
				sb.append( ELEMENT_DELIM );
				sb.append( NEWLINE );
			}
			sb.append( pb.getKey() );
			sb.append( KEY_VALUE_DELIM );
			sb.append( rSerializeProgramBlock(pb.getValue(), clsMap) );
			count++;
		}
		sb.append(NEWLINE);
		return sb.toString();
	}

	private static String rSerializeProgramBlocks(ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap) {
		StringBuilder sb = new StringBuilder();
		int count = 0;
		for( ProgramBlock pb : pbs ) {
			if( count>0 ) {
				sb.append( ELEMENT_DELIM );
				sb.append(NEWLINE);
			}
			sb.append( rSerializeProgramBlock(pb, clsMap) );
			count++;
		}
		return sb.toString();
	}

	private static String rSerializeProgramBlock( ProgramBlock pb, HashMap<String, byte[]> clsMap ) {
		StringBuilder sb = new StringBuilder();
		
		//handle header
		if( pb instanceof WhileProgramBlock ) 
			sb.append(PB_WHILE);
		else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock) )
			sb.append(PB_FOR);
		else if ( pb instanceof ParForProgramBlock )
			sb.append(PB_PARFOR);
		else if ( pb instanceof IfProgramBlock )
			sb.append(PB_IF);
		else if ( pb instanceof FunctionProgramBlock )
			sb.append(PB_FC);
		else //all generic program blocks
			sb.append(PB_BEGIN);
		
		//handle body
		if( pb instanceof WhileProgramBlock ) {
			WhileProgramBlock wpb = (WhileProgramBlock) pb;
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions( wpb.getPredicate(), clsMap ) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(PBS_BEGIN);
			sb.append( rSerializeProgramBlocks( wpb.getChildBlocks(), clsMap) );
			sb.append(PBS_END);
		}
		else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock ) ) {
			ForProgramBlock fpb = (ForProgramBlock) pb; 
			sb.append( fpb.getIterVar() );
			sb.append( COMPONENTS_DELIM );
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions( fpb.getFromInstructions(), clsMap ) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions(fpb.getToInstructions(), clsMap) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions(fpb.getIncrementInstructions(), clsMap) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(PBS_BEGIN);
			sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) );
			sb.append(PBS_END);
		}
		else if ( pb instanceof ParForProgramBlock ) {
			ParForProgramBlock pfpb = (ParForProgramBlock) pb; 
			
			//check for nested remote ParFOR
			if( PExecMode.valueOf( pfpb.getParForParams().get( ParForStatementBlock.EXEC_MODE )) == PExecMode.REMOTE_SPARK )
				throw new DMLRuntimeException( NOT_SUPPORTED_SPARK_PARFOR );
			
			sb.append( pfpb.getIterVar() );
			sb.append( COMPONENTS_DELIM );
			sb.append( serializeResultVariables( pfpb.getResultVariables()) );
			sb.append( COMPONENTS_DELIM );
			sb.append( serializeStringHashMap( pfpb.getParForParams()) ); //parameters of nested parfor
			sb.append( COMPONENTS_DELIM );
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions(pfpb.getFromInstructions(), clsMap) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions(pfpb.getToInstructions(), clsMap) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions(pfpb.getIncrementInstructions(), clsMap) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(PBS_BEGIN);
			sb.append( rSerializeProgramBlocks( pfpb.getChildBlocks(), clsMap ) );
			sb.append(PBS_END);
		}
		else if ( pb instanceof IfProgramBlock ) {
			IfProgramBlock ipb = (IfProgramBlock) pb;
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions(ipb.getPredicate(), clsMap) );
			sb.append(INST_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(PBS_BEGIN);
			sb.append( rSerializeProgramBlocks(ipb.getChildBlocksIfBody(), clsMap) );
			sb.append(PBS_END);
			sb.append( COMPONENTS_DELIM );
			sb.append(PBS_BEGIN);
			sb.append( rSerializeProgramBlocks(ipb.getChildBlocksElseBody(), clsMap) );
			sb.append(PBS_END);
		}
		else if( pb instanceof FunctionProgramBlock ) {
			FunctionProgramBlock fpb = (FunctionProgramBlock) pb;
			sb.append( serializeDataIdentifiers( fpb.getInputParams() ) );
			sb.append( COMPONENTS_DELIM );
			sb.append( serializeDataIdentifiers( fpb.getOutputParams() ) );
			sb.append( COMPONENTS_DELIM );
			sb.append(PBS_BEGIN);
			sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) );
			sb.append(PBS_END);
			sb.append( COMPONENTS_DELIM );
		}
		else if( pb instanceof BasicProgramBlock ) {
			BasicProgramBlock bpb = (BasicProgramBlock) pb;
			sb.append(INST_BEGIN);
			sb.append( serializeInstructions(
				bpb.getInstructions(), clsMap) );
			sb.append(INST_END);
		}
		
		//handle end
		sb.append(PB_END);
		
		return sb.toString();
	}

	
	////////////////////////////////
	// PARSING 
	////////////////////////////////

	public static SparkPSBody parseSparkPSBody(String in, int id) {
		SparkPSBody body = new SparkPSBody();

		//header elimination
		String tmpin = in.replaceAll(NEWLINE, ""); //normalization
		tmpin = tmpin.substring(PSBODY_BEGIN.length(), tmpin.length() - PSBODY_END.length()); //remove start/end
		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpin, COMPONENTS_DELIM);

		//handle DMLScript UUID (NOTE: set directly in DMLScript)
		//(master UUID is used for all nodes (in order to simply cleanup))
		DMLScript.setUUID(st.nextToken());

		//handle DML config (NOTE: set directly in ConfigurationManager)
		handleDMLConfig(st.nextToken());

		//handle additional configs
		parseAndSetAdditionalConfigurations(st.nextToken());

		//handle program
		Program prog = parseProgram(st.nextToken(), id);

		//handle execution context
		ExecutionContext ec = parseExecutionContext(st.nextToken(), prog);
		ec.setProgram(prog);

		//handle program blocks
		String spbs = st.nextToken();
		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(spbs, prog, id);
		prog.getProgramBlocks().addAll(pbs);

		body.setEc(ec);
		return body;
	}

	public static ParForBody parseParForBody( String in, int id ) {
		return parseParForBody(in, id, false);
	}
	
	public static ParForBody parseParForBody( String in, int id, boolean inSpark ) {
		ParForBody body = new ParForBody();
		
		//header elimination
		String tmpin = in.replaceAll(NEWLINE, ""); //normalization
		tmpin = tmpin.substring(PARFORBODY_BEGIN.length(),tmpin.length()-PARFORBODY_END.length()); //remove start/end
		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpin, COMPONENTS_DELIM);
		
		//handle DMLScript UUID (NOTE: set directly in DMLScript)
		//(master UUID is used for all nodes (in order to simply cleanup))
		DMLScript.setUUID( st.nextToken() );
		
		//handle DML config (NOTE: set directly in ConfigurationManager)
		String confStr = st.nextToken();
		JobConf job = ConfigurationManager.getCachedJobConf();
		if( !InfrastructureAnalyzer.isLocalMode(job) ) {
			handleDMLConfig(confStr);
		}
		
		//handle additional configs
		String aconfs = st.nextToken();
		if( !inSpark )
			parseAndSetAdditionalConfigurations( aconfs );
		
		//handle program
		String progStr = st.nextToken();
		Program prog = parseProgram( progStr, id ); 
		
		//handle result variable names
		String rvarStr = st.nextToken();
		ArrayList<ResultVar> rvars = parseResultVariables(rvarStr);
		body.setResultVariables(rvars);
		
		//handle execution context
		String ecStr = st.nextToken();
		ExecutionContext ec = parseExecutionContext( ecStr, prog );
		
		//handle program blocks
		String spbs = st.nextToken();
		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(spbs, prog, id);
		
		body.setChildBlocks( pbs );
		body.setEc( ec );
		
		return body;
	}

	private static void handleDMLConfig(String confStr) {
		if(confStr != null && !confStr.trim().isEmpty()) {
			DMLConfig dmlconf = DMLConfig.parseDMLConfig(confStr);
			CompilerConfig cconf = OptimizerUtils.constructCompilerConfig(dmlconf);
			ConfigurationManager.setLocalConfig(dmlconf);
			ConfigurationManager.setLocalConfig(cconf);
		}
	}

	public static Program parseProgram( String in, int id ) {
		String lin = in.substring( PROG_BEGIN.length(),in.length()- PROG_END.length()).trim();
		Program prog = new Program();
		HashMap<String,FunctionProgramBlock> fc = parseFunctionProgramBlocks(lin, prog, id);
		for( Entry<String,FunctionProgramBlock> e : fc.entrySet() ) {
			String[] keypart = e.getKey().split( Program.KEY_DELIM );
			String namespace = keypart[0];
			String name      = keypart[1];
			prog.addFunctionProgramBlock(namespace, name, e.getValue());
		}
		return prog;
	}

	private static LocalVariableMap parseVariables(String in) {
		LocalVariableMap ret = null;
		if( in.length()> VARS_BEGIN.length() + VARS_END.length()) {
			String varStr = in.substring( VARS_BEGIN.length(),in.length()- VARS_END.length()).trim();
			ret = LocalVariableMap.deserialize(varStr);
		}
		else { //empty input symbol table
			ret = new LocalVariableMap();
		}
		return ret;
	}
	
	private static HashMap<String,FunctionProgramBlock> parseFunctionProgramBlocks( String in, Program prog, int id ) {
		HashMap<String,FunctionProgramBlock> ret = new HashMap<>();
		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer( in, ELEMENT_DELIM );
		while( st.hasMoreTokens() ) {
			String lvar  = st.nextToken(); //with ID = CP_CHILD_THREAD+id for current use
			//put first copy into prog (for direct use)
			int index = lvar.indexOf( KEY_VALUE_DELIM );
			String tmp1 = lvar.substring(0, index); // + CP_CHILD_THREAD+id;
			String tmp2 = lvar.substring(index + 1);
			ret.put(tmp1, (FunctionProgramBlock)rParseProgramBlock(tmp2, prog, id));
		}
		return ret;
	}

	private static ArrayList<ProgramBlock> rParseProgramBlocks(String in, Program prog, int id) {
		ArrayList<ProgramBlock> pbs = new ArrayList<>();
		String tmpdata = in.substring(PBS_BEGIN.length(),in.length()- PBS_END.length());
		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpdata, ELEMENT_DELIM);
		while( st.hasMoreTokens() )
			pbs.add( rParseProgramBlock( st.nextToken(), prog, id ) );
		return pbs;
	}

	private static ProgramBlock rParseProgramBlock( String in, Program prog, int id ) {
		ProgramBlock pb = null;
		if( in.startsWith(PB_WHILE) )
			pb = rParseWhileProgramBlock( in, prog, id );
		else if ( in.startsWith(PB_FOR) )
			pb = rParseForProgramBlock( in, prog, id );
		else if ( in.startsWith(PB_PARFOR) )
			pb = rParseParForProgramBlock( in, prog, id );
		else if ( in.startsWith(PB_IF) )
			pb = rParseIfProgramBlock( in, prog, id );
		else if ( in.startsWith(PB_FC) )
			pb = rParseFunctionProgramBlock( in, prog, id );
 		else if ( in.startsWith(PB_BEGIN) )
			pb = rParseGenericProgramBlock( in, prog, id );
		else 
			throw new DMLRuntimeException( NOT_SUPPORTED_PB+" "+in );
		return pb;
	}

	private static WhileProgramBlock rParseWhileProgramBlock( String in, Program prog, int id ) {
		String lin = in.substring( PB_WHILE.length(),in.length()- PB_END.length());
		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
		
		//predicate instructions
		ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id);
		
		//program blocks
		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
		
		WhileProgramBlock wpb = new WhileProgramBlock(prog,inst);
		wpb.setChildBlocks(pbs);
		return wpb;
	}

	private static ForProgramBlock rParseForProgramBlock( String in, Program prog, int id ) {
		String lin = in.substring( PB_FOR.length(),in.length()- PB_END.length());
		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
		
		//inputs
		String iterVar = st.nextToken();
		
		//instructions
		ArrayList<Instruction> from = parseInstructions(st.nextToken(),id);
		ArrayList<Instruction> to = parseInstructions(st.nextToken(),id);
		ArrayList<Instruction> incr = parseInstructions(st.nextToken(),id);
		
		//program blocks
		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);
		
		ForProgramBlock fpb = new ForProgramBlock(prog, iterVar);
		fpb.setFromInstructions(from);
		fpb.setToInstructions(to);
		fpb.setIncrementInstructions(incr);
		fpb.setChildBlocks(pbs);
		
		return fpb;
	}

	private static ParForProgramBlock rParseParForProgramBlock( String in, Program prog, int id ) {
		String lin = in.substring( PB_PARFOR.length(),in.length()- PB_END.length());
		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
		
		//inputs
		String iterVar = st.nextToken();
		ArrayList<ResultVar> resultVars = parseResultVariables(st.nextToken());
		HashMap<String,String> params = parseStringHashMap(st.nextToken());
		
		//instructions 
		ArrayList<Instruction> from = parseInstructions(st.nextToken(), 0);
		ArrayList<Instruction> to = parseInstructions(st.nextToken(), 0);
		ArrayList<Instruction> incr = parseInstructions(st.nextToken(), 0);
		
		//program blocks //reset id to preinit state, replaced during exec
		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, 0); 
		
		ParForProgramBlock pfpb = new ParForProgramBlock(id, prog, iterVar, params, resultVars);
		pfpb.disableOptimization(); //already done in top-level parfor
		pfpb.setFromInstructions(from);
		pfpb.setToInstructions(to);
		pfpb.setIncrementInstructions(incr);
		pfpb.setChildBlocks(pbs);
		
		return pfpb;
	}

	private static IfProgramBlock rParseIfProgramBlock( String in, Program prog, int id ) {
		String lin = in.substring( PB_IF.length(),in.length()- PB_END.length());
		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
		
		//predicate instructions
		ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id);
		
		//program blocks: if and else
		ArrayList<ProgramBlock> pbs1 = rParseProgramBlocks(st.nextToken(), prog, id);
		ArrayList<ProgramBlock> pbs2 = rParseProgramBlocks(st.nextToken(), prog, id);
		
		IfProgramBlock ipb = new IfProgramBlock(prog,inst);
		ipb.setChildBlocksIfBody(pbs1);
		ipb.setChildBlocksElseBody(pbs2);
		
		return ipb;
	}

	private static FunctionProgramBlock rParseFunctionProgramBlock( String in, Program prog, int id ) {
		String lin = in.substring( PB_FC.length(),in.length()- PB_END.length());
		HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM);
		
		//inputs and outputs
		ArrayList<DataIdentifier> dat1 = parseDataIdentifiers(st.nextToken());
		ArrayList<DataIdentifier> dat2 = parseDataIdentifiers(st.nextToken());

		//program blocks
		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id);

		ArrayList<DataIdentifier> tmp1 = new ArrayList<>(dat1);
		ArrayList<DataIdentifier> tmp2 = new ArrayList<>(dat2);
		FunctionProgramBlock fpb = new FunctionProgramBlock(prog, tmp1, tmp2);
		fpb.setChildBlocks(pbs);
		
		return fpb;
	}

	private static ProgramBlock rParseGenericProgramBlock( String in, Program prog, int id ) {
		String lin = in.substring( PB_BEGIN.length(),in.length()- PB_END.length());
		StringTokenizer st = new StringTokenizer(lin,COMPONENTS_DELIM);
		BasicProgramBlock pb = new BasicProgramBlock(prog);
		pb.setInstructions(parseInstructions(st.nextToken(),id));
		return pb;
	}

	private static ArrayList<Instruction> parseInstructions( String in, int id ) {
		ArrayList<Instruction> insts = new ArrayList<>();
		String lin = in.substring( INST_BEGIN.length(),in.length()- INST_END.length());
		StringTokenizer st = new StringTokenizer(lin, ELEMENT_DELIM);
		while(st.hasMoreTokens()) {
			//Note that at this point only CP instructions and External function instruction can occur
			String instStr = st.nextToken();
			try {
				Instruction tmpinst = CPInstructionParser.parseSingleInstruction(instStr);
				tmpinst = saveReplaceThreadID(tmpinst, Lop.CP_ROOT_THREAD_ID, Lop.CP_CHILD_THREAD+id );
				insts.add( tmpinst );
			}
			catch(Exception ex) {
				throw new DMLRuntimeException("Failed to parse instruction: " + instStr, ex);
			}
		}
		return insts;
	}
	
	private static ArrayList<ResultVar> parseResultVariables(String in) {
		ArrayList<ResultVar> ret = new ArrayList<>();
		for(String var : parseStringArrayList(in)) {
			boolean accum = var.endsWith("+");
			ret.add(new ResultVar(accum ? var.substring(0, var.length()-1) : var, accum));
		}
		return ret;
	}

	private static HashMap<String,String> parseStringHashMap( String in ) {
		HashMap<String,String> vars = new HashMap<>();
		StringTokenizer st = new StringTokenizer(in,ELEMENT_DELIM);
		while( st.hasMoreTokens() ) {
			String lin = st.nextToken();
			int index = lin.indexOf( KEY_VALUE_DELIM );
			String tmp1 = lin.substring(0, index);
			String tmp2 = lin.substring(index + 1);
			vars.put(tmp1, tmp2);
		}
		return vars;
	}
	
	private static ArrayList<String> parseStringArrayList(String in) {
		return parseStringArrayList(in, ELEMENT_DELIM);
	}

	private static ArrayList<String> parseStringArrayList(String in, String delim) {
		StringTokenizer st = new StringTokenizer(in, delim);
		ArrayList<String> vars = new ArrayList<>(st.countTokens());
		while( st.hasMoreTokens() )
			vars.add(st.nextToken());
		return vars;
	}

	private static ArrayList<DataIdentifier> parseDataIdentifiers( String in ) {
		ArrayList<DataIdentifier> vars = new ArrayList<>();
		StringTokenizer st = new StringTokenizer(in, ELEMENT_DELIM);
		while( st.hasMoreTokens() ) {
			String tmp = st.nextToken();
			DataIdentifier dat = parseDataIdentifier( tmp );
			vars.add(dat);
		}
		return vars;
	}

	private static DataIdentifier parseDataIdentifier( String in ) {
		StringTokenizer st = new StringTokenizer(in, DATA_FIELD_DELIM);
		DataIdentifier dat = new DataIdentifier(st.nextToken());
		dat.setDataType(DataType.valueOf(st.nextToken()));
		dat.setValueType(ValueType.valueOf(st.nextToken()));
		return dat;
	}
	
	/**
	 * NOTE: MRJobConfiguration cannot be used for the general case because program blocks and
	 * related symbol tables can be hierarchically structured.
	 * 
	 * @param in data object as string
	 * @return array of objects
	 */
	public static Object[] parseDataObject(String in) {
		Object[] ret = new Object[2];
	
		StringTokenizer st = new StringTokenizer(in, DATA_FIELD_DELIM );
		String name = st.nextToken();
		DataType datatype = DataType.valueOf( st.nextToken() );
		ValueType valuetype = ValueType.valueOf( st.nextToken() );
		String valString = st.hasMoreTokens() ? st.nextToken() : "";
		Data dat = null;
		switch( datatype )
		{
			case SCALAR: {
				switch ( valuetype ) {
					case INT64:     dat = new IntObject(Long.parseLong(valString)); break;
					case FP64:  dat = new DoubleObject(Double.parseDouble(valString)); break;
					case BOOLEAN: dat = new BooleanObject(Boolean.parseBoolean(valString)); break;
					case STRING:  dat = new StringObject(valString); break;
					default:
						throw new DMLRuntimeException("Unable to parse valuetype "+valuetype);
				}
				break;
			}
			case MATRIX: {
				MatrixObject mo = new MatrixObject(valuetype,valString);
				long rows = Long.parseLong( st.nextToken() );
				long cols = Long.parseLong( st.nextToken() );
				int blen = Integer.parseInt( st.nextToken() );
				long nnz = Long.parseLong( st.nextToken() );
				FileFormat fmt = FileFormat.safeValueOf(st.nextToken());
				PartitionFormat partFormat = PartitionFormat.valueOf( st.nextToken() );
				UpdateType inplace = UpdateType.valueOf( st.nextToken() );
				MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, blen, nnz);
				MetaDataFormat md = new MetaDataFormat(mc, fmt);
				mo.setMetaData( md );
				if( partFormat._dpf != PDataPartitionFormat.NONE )
					mo.setPartitioned( partFormat._dpf, partFormat._N );
				mo.setUpdateType(inplace);
				mo.setHDFSFileExists(Boolean.valueOf(st.nextToken()));
				mo.enableCleanup(Boolean.valueOf(st.nextToken()));
				dat = mo;
				break;
			}
			case LIST:
				int size = Integer.parseInt(st.nextToken());
				String namesStr = st.nextToken();
				List<String> names = namesStr.equals(EMPTY) ? null :
					parseStringArrayList(namesStr, ELEMENT_DELIM2);
				List<Data> data = new ArrayList<>(size);
				st.nextToken(LIST_ELEMENT_DELIM);
				for (int i = 0; i < size; i++) {
					String dataStr = st.nextToken();
					Object[] obj = parseDataObject(dataStr);
					data.add((Data) obj[1]);
				}
				dat = new ListObject(data, names);
				break;
			default:
				throw new DMLRuntimeException("Unable to parse datatype "+datatype);
		}
		
		ret[0] = name;
		ret[1] = dat;
		return ret;
	}

	private static ExecutionContext parseExecutionContext(String in, Program prog) {
		ExecutionContext ec = null;
		String lin = in.substring(EC_BEGIN.length(),in.length()- EC_END.length()).trim();
		if( !lin.equals( EMPTY ) ) {
			LocalVariableMap vars = parseVariables(lin);
			ec = ExecutionContextFactory.createContext( false, prog );
			ec.setVariables(vars);
		}
		return ec;
	}
	
	private static void parseAndSetAdditionalConfigurations(String conf) {
		String[] statsFlag = conf.split("=");
		DMLScript.STATISTICS = Boolean.parseBoolean(statsFlag[1]);
	}

	//////////
	// CUSTOM SAFE LITERAL REPLACEMENT
	
	/**
	 * In-place replacement of thread ids in filenames, functions names etc
	 * 
	 * @param inst instruction
	 * @param pattern ?
	 * @param replacement string replacement
	 * @return instruction
	 */
	private static Instruction saveReplaceThreadID( Instruction inst, String pattern, String replacement ) {
		if ( inst instanceof VariableCPInstruction ) { //createvar, setfilename
			//update in-memory representation
			inst.updateInstructionThreadID(pattern, replacement);
		}
		//NOTE> //Rand, seq in CP not required
		return inst;
	}
	
	public static String saveReplaceFilenameThreadID(String fname, String pattern, String replace) {
		//save replace necessary in order to account for the possibility that read variables have our prefix in the absolute path
		//replace the last match only, because (1) we have at most one _t0 and (2) always concatenated to the end.
		int pos = fname.lastIndexOf(pattern);
		return ( pos < 0 ) ? fname : fname.substring(0, pos)
			+ replace + fname.substring(pos+pattern.length());
	}
	
	
	//////////
	// CUSTOM HIERARCHICAL TOKENIZER
	
	
	/**
	 * Custom StringTokenizer for splitting strings of hierarchies. The basic idea is to
	 * search for delim-Strings on the same hierarchy level, while delims of lower hierarchy
	 * levels are skipped.  
	 * 
	 */
	private static class HierarchyAwareStringTokenizer //extends StringTokenizer
	{
		private String _str = null;
		private String _del = null;
		private int    _off = -1;
		
		public HierarchyAwareStringTokenizer( String in, String delim ) {
			//super(in);
			_str = in;
			_del = delim;
			_off = delim.length();
		}

		public boolean hasMoreTokens() {
			return (_str.length() > 0);
		}

		public String nextToken() {
			int nextDelim = determineNextSameLevelIndexOf(_str, _del);
			String token = null;
			if(nextDelim < 0) {
				nextDelim = _str.length();
				_off = 0;
			}
			token = _str.substring(0,nextDelim);
			_str = _str.substring( nextDelim + _off );
			return token;
		}
		
		private static int determineNextSameLevelIndexOf( String data, String pattern  )
		{
			String tmpdata = data;
			int index      = 0;
			int count      = 0;
			int off=0,i1,i2,i3,min;
			
			while(true) {
				i1 = tmpdata.indexOf(pattern);
				i2 = tmpdata.indexOf(LEVELIN);
				i3 = tmpdata.indexOf(LEVELOUT);
				
				if( i1 < 0 ) return i1; //no pattern found at all
				
				min = i1; //min >= 0 by definition
				if( i2 >= 0 ) min = Math.min(min, i2);
				if( i3 >= 0 ) min = Math.min(min, i3);
				
				//stack maintenance
				if( i1 == min && count == 0 )
					return index+i1;
				else if( i2 == min ) {
					count++;
					off = LEVELIN.length();
				}
				else if( i3 == min ) {
					count--;
					off = LEVELOUT.length();
				}
			
				//prune investigated string
				index += min+off;
				tmpdata = tmpdata.substring(min+off);
			}
		}
	}
}
