| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.sysds.runtime.util; |
| |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.sysds.api.DMLScript; |
| import org.apache.sysds.common.Types.DataType; |
| import org.apache.sysds.common.Types.FileFormat; |
| import org.apache.sysds.common.Types.ValueType; |
| import org.apache.sysds.conf.CompilerConfig; |
| import org.apache.sysds.conf.CompilerConfig.ConfigType; |
| import org.apache.sysds.conf.ConfigurationManager; |
| import org.apache.sysds.conf.DMLConfig; |
| import org.apache.sysds.hops.FunctionOp; |
| import org.apache.sysds.hops.Hop; |
| import org.apache.sysds.hops.OptimizerUtils; |
| import org.apache.sysds.hops.recompile.Recompiler; |
| import org.apache.sysds.lops.Lop; |
| import org.apache.sysds.parser.DMLProgram; |
| import org.apache.sysds.parser.DataIdentifier; |
| import org.apache.sysds.parser.ForStatement; |
| import org.apache.sysds.parser.ForStatementBlock; |
| import org.apache.sysds.parser.FunctionStatement; |
| import org.apache.sysds.parser.FunctionStatementBlock; |
| import org.apache.sysds.parser.IfStatement; |
| import org.apache.sysds.parser.IfStatementBlock; |
| import org.apache.sysds.parser.ParForStatement; |
| import org.apache.sysds.parser.ParForStatementBlock; |
| import org.apache.sysds.parser.ParForStatementBlock.ResultVar; |
| import org.apache.sysds.parser.StatementBlock; |
| import org.apache.sysds.parser.WhileStatement; |
| import org.apache.sysds.parser.WhileStatementBlock; |
| import org.apache.sysds.runtime.DMLRuntimeException; |
| import org.apache.sysds.runtime.codegen.CodegenUtils; |
| import org.apache.sysds.runtime.controlprogram.BasicProgramBlock; |
| import org.apache.sysds.runtime.controlprogram.ForProgramBlock; |
| import org.apache.sysds.runtime.controlprogram.FunctionProgramBlock; |
| import org.apache.sysds.runtime.controlprogram.IfProgramBlock; |
| import org.apache.sysds.runtime.controlprogram.LocalVariableMap; |
| import org.apache.sysds.runtime.controlprogram.ParForProgramBlock; |
| import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; |
| import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PExecMode; |
| import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PartitionFormat; |
| import org.apache.sysds.runtime.controlprogram.Program; |
| import org.apache.sysds.runtime.controlprogram.ProgramBlock; |
| import org.apache.sysds.runtime.controlprogram.WhileProgramBlock; |
| import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; |
| import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType; |
| import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; |
| import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory; |
| import org.apache.sysds.runtime.controlprogram.paramserv.SparkPSBody; |
| import org.apache.sysds.runtime.controlprogram.parfor.ParForBody; |
| import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; |
| import org.apache.sysds.runtime.instructions.CPInstructionParser; |
| import org.apache.sysds.runtime.instructions.Instruction; |
| import org.apache.sysds.runtime.instructions.InstructionParser; |
| import org.apache.sysds.runtime.instructions.cp.BooleanObject; |
| import org.apache.sysds.runtime.instructions.cp.CPInstruction; |
| import org.apache.sysds.runtime.instructions.cp.Data; |
| import org.apache.sysds.runtime.instructions.cp.DoubleObject; |
| import org.apache.sysds.runtime.instructions.cp.FunctionCallCPInstruction; |
| import org.apache.sysds.runtime.instructions.cp.IntObject; |
| import org.apache.sysds.runtime.instructions.cp.ListObject; |
| import org.apache.sysds.runtime.instructions.cp.ScalarObject; |
| import org.apache.sysds.runtime.instructions.cp.SpoofCPInstruction; |
| import org.apache.sysds.runtime.instructions.cp.StringObject; |
| import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction; |
| import org.apache.sysds.runtime.instructions.gpu.GPUInstruction; |
| import org.apache.sysds.runtime.instructions.spark.SPInstruction; |
| import org.apache.sysds.runtime.lineage.Lineage; |
| import org.apache.sysds.runtime.matrix.data.MatrixBlock; |
| import org.apache.sysds.runtime.meta.DataCharacteristics; |
| import org.apache.sysds.runtime.meta.MatrixCharacteristics; |
| import org.apache.sysds.runtime.meta.MetaDataFormat; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.StringTokenizer; |
| import java.util.stream.Collectors; |
| |
| /** |
| * Program converter functionalities for |
| * (1) creating deep copies of program blocks, instructions, function program blocks, and |
| * (2) serializing and parsing of programs, program blocks, functions program blocks. |
| * |
| */ |
| //TODO: rewrite class to instance-based invocation (grown gradually and now inappropriate design) |
| public class ProgramConverter |
| { |
| protected static final Log LOG = LogFactory.getLog(ProgramConverter.class.getName()); |
| |
| //use escaped unicodes for separators in order to prevent string conflict |
| public static final String NEWLINE = "\n"; //System.lineSeparator(); |
| public static final String COMPONENTS_DELIM = "\u236e"; //semicolon w/ bar; ";"; |
| public static final String ELEMENT_DELIM = "\u236a"; //comma w/ bar; ","; |
| public static final String ELEMENT_DELIM2 = ","; |
| public static final String DATA_FIELD_DELIM = "\u007c"; //"|"; |
| public static final String KEY_VALUE_DELIM = "\u003d"; //"="; |
| public static final String LEVELIN = "\u23a8"; //variant of left curly bracket; "\u007b"; //"{"; |
| public static final String LEVELOUT = "\u23ac"; //variant of right curly bracket; "\u007d"; //"}"; |
| public static final String EMPTY = "null"; |
| public static final String DASH = "-"; |
| public static final String REF = "ref"; |
| public static final String LIST_ELEMENT_DELIM = "\t"; |
| |
| public static final String CDATA_BEGIN = "<![CDATA["; |
| public static final String CDATA_END = " ]]>"; |
| |
| public static final String PROG_BEGIN = " PROG" + LEVELIN; |
| public static final String PROG_END = LEVELOUT; |
| public static final String VARS_BEGIN = "VARS: "; |
| public static final String VARS_END = ""; |
| public static final String PBS_BEGIN = " PBS" + LEVELIN; |
| public static final String PBS_END = LEVELOUT; |
| public static final String INST_BEGIN = " INST: "; |
| public static final String INST_END = ""; |
| public static final String EC_BEGIN = " EC: "; |
| public static final String EC_END = ""; |
| public static final String PB_BEGIN = " PB" + LEVELIN; |
| public static final String PB_END = LEVELOUT; |
| public static final String PB_WHILE = " WHILE" + LEVELIN; |
| public static final String PB_FOR = " FOR" + LEVELIN; |
| public static final String PB_PARFOR = " PARFOR" + LEVELIN; |
| public static final String PB_IF = " IF" + LEVELIN; |
| public static final String PB_FC = " FC" + LEVELIN; |
| public static final String PB_EFC = " EFC" + LEVELIN; |
| |
| public static final String CONF_STATS = "stats"; |
| |
| // Used for parfor |
| public static final String PARFORBODY_BEGIN = CDATA_BEGIN + "PARFORBODY" + LEVELIN; |
| public static final String PARFORBODY_END = LEVELOUT + CDATA_END; |
| |
| // Used for paramserv builtin function |
| public static final String PSBODY_BEGIN = CDATA_BEGIN + "PSBODY" + LEVELIN; |
| public static final String PSBODY_END = LEVELOUT + CDATA_END; |
| |
| //exception msgs |
| public static final String NOT_SUPPORTED_EXTERNALFUNCTION_PB = "Not supported: ExternalFunctionProgramBlock contains MR instructions. " + |
| "(ExternalFunctionPRogramBlockCP can be used)"; |
| public static final String NOT_SUPPORTED_SPARK_INSTRUCTION = "Not supported: Instructions of type other than CP instructions"; |
| public static final String NOT_SUPPORTED_SPARK_PARFOR = "Not supported: Nested ParFOR REMOTE_SPARK due to possible deadlocks." + |
| "(LOCAL can be used for innner ParFOR)"; |
| public static final String NOT_SUPPORTED_PB = "Not supported: type of program block"; |
| |
| //////////////////////////////// |
| // CREATION of DEEP COPIES |
| //////////////////////////////// |
| |
| /** |
| * Creates a deep copy of the given execution context. |
| * For rt_platform=Hadoop, execution context has a symbol table. |
| * |
| * @param ec execution context |
| * @return execution context |
| * @throws CloneNotSupportedException if CloneNotSupportedException occurs |
| */ |
| public static ExecutionContext createDeepCopyExecutionContext(ExecutionContext ec) |
| throws CloneNotSupportedException |
| { |
| ExecutionContext cpec = ExecutionContextFactory.createContext(false, ec.getProgram()); |
| cpec.setVariables((LocalVariableMap) ec.getVariables().clone()); |
| if( ec.getLineage() != null ) |
| cpec.setLineage(new Lineage(ec.getLineage())); |
| |
| //handle result variables with in-place update flag |
| //(each worker requires its own copy of the empty matrix object) |
| for( String var : cpec.getVariables().keySet() ) { |
| Data dat = cpec.getVariables().get(var); |
| if( dat instanceof MatrixObject && ((MatrixObject)dat).getUpdateType().isInPlace() ) { |
| MatrixObject mo = (MatrixObject)dat; |
| MatrixObject moNew = new MatrixObject(mo); |
| if( mo.getNnz() != 0 ){ |
| // If output matrix is not empty (NNZ != 0), then local copy is created so that |
| // update in place operation can be applied. |
| MatrixBlock mbVar = mo.acquireRead(); |
| moNew.acquireModify (new MatrixBlock(mbVar)); |
| mo.release(); |
| } else { |
| //create empty matrix block w/ dense representation (preferred for update in-place) |
| //Creating a dense matrix block is valid because empty block not allocated and transfer |
| // to sparse representation happens in left indexing in place operation. |
| moNew.acquireModify(new MatrixBlock((int)mo.getNumRows(), (int)mo.getNumColumns(), false)); |
| } |
| moNew.release(); |
| cpec.setVariable(var, moNew); |
| } |
| } |
| |
| return cpec; |
| } |
| |
| /** |
| * This recursively creates a deep copy of program blocks and transparently replaces filenames according to the |
| * specified parallel worker in order to avoid conflicts between parworkers. This happens recursively in order |
| * to support arbitrary control-flow constructs within a parfor. |
| * |
| * @param childBlocks child program blocks |
| * @param pid ? |
| * @param IDPrefix ? |
| * @param fnStack ? |
| * @param fnCreated ? |
| * @param plain if true, full deep copy without id replacement |
| * @param forceDeepCopy if true, force deep copy |
| * @return list of program blocks |
| */ |
| public static ArrayList<ProgramBlock> rcreateDeepCopyProgramBlocks(ArrayList<ProgramBlock> childBlocks, long pid, int IDPrefix, Set<String> fnStack, Set<String> fnCreated, boolean plain, boolean forceDeepCopy) |
| { |
| ArrayList<ProgramBlock> tmp = new ArrayList<>(); |
| |
| for( ProgramBlock pb : childBlocks ) |
| { |
| Program prog = pb.getProgram(); |
| ProgramBlock tmpPB = null; |
| |
| if( pb instanceof WhileProgramBlock ) { |
| tmpPB = createDeepCopyWhileProgramBlock((WhileProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy); |
| } |
| else if( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock) ) { |
| tmpPB = createDeepCopyForProgramBlock((ForProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy ); |
| } |
| else if( pb instanceof ParForProgramBlock ) { |
| ParForProgramBlock pfpb = (ParForProgramBlock) pb; |
| if( ParForProgramBlock.ALLOW_NESTED_PARALLELISM ) |
| tmpPB = createDeepCopyParForProgramBlock(pfpb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy); |
| else |
| tmpPB = createDeepCopyForProgramBlock((ForProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy); |
| } |
| else if( pb instanceof IfProgramBlock ) { |
| tmpPB = createDeepCopyIfProgramBlock((IfProgramBlock) pb, pid, IDPrefix, prog, fnStack, fnCreated, plain, forceDeepCopy); |
| } |
| else if( pb instanceof BasicProgramBlock ) { //last-level program block |
| BasicProgramBlock bpb = (BasicProgramBlock) pb; |
| tmpPB = new BasicProgramBlock(prog); // general case use for most PBs |
| |
| //for recompile in the master node JVM |
| tmpPB.setStatementBlock(createStatementBlockCopy(bpb.getStatementBlock(), pid, plain, forceDeepCopy)); |
| tmpPB.setThreadID(pid); |
| |
| //copy instructions |
| ((BasicProgramBlock)tmpPB).setInstructions( |
| createDeepCopyInstructionSet(bpb.getInstructions(), |
| pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); |
| } |
| |
| //copy symbol table |
| //tmpPB.setVariables( pb.getVariables() ); //implicit cloning |
| |
| tmp.add(tmpPB); |
| } |
| |
| return tmp; |
| } |
| |
| public static WhileProgramBlock createDeepCopyWhileProgramBlock(WhileProgramBlock wpb, long pid, int IDPrefix, Program prog, Set<String> fnStack, Set<String> fnCreated, boolean plain, boolean forceDeepCopy) { |
| ArrayList<Instruction> predinst = createDeepCopyInstructionSet(wpb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true); |
| WhileProgramBlock tmpPB = new WhileProgramBlock(prog, predinst); |
| StatementBlock sb = ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) ? |
| createWhileStatementBlockCopy((WhileStatementBlock) wpb.getStatementBlock(), forceDeepCopy) : wpb.getStatementBlock(); |
| tmpPB.setStatementBlock( sb ); |
| tmpPB.setThreadID(pid); |
| tmpPB.setChildBlocks(rcreateDeepCopyProgramBlocks(wpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy)); |
| tmpPB.setExitInstruction(wpb.getExitInstruction()); |
| return tmpPB; |
| } |
| |
| public static IfProgramBlock createDeepCopyIfProgramBlock(IfProgramBlock ipb, long pid, int IDPrefix, Program prog, Set<String> fnStack, Set<String> fnCreated, boolean plain, boolean forceDeepCopy) { |
| ArrayList<Instruction> predinst = createDeepCopyInstructionSet(ipb.getPredicate(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true); |
| IfProgramBlock tmpPB = new IfProgramBlock(prog, predinst); |
| StatementBlock sb = ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) ? |
| createIfStatementBlockCopy((IfStatementBlock)ipb.getStatementBlock(), forceDeepCopy ) : ipb.getStatementBlock(); |
| tmpPB.setStatementBlock( sb ); |
| tmpPB.setThreadID(pid); |
| tmpPB.setChildBlocksIfBody(rcreateDeepCopyProgramBlocks(ipb.getChildBlocksIfBody(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy)); |
| tmpPB.setChildBlocksElseBody(rcreateDeepCopyProgramBlocks(ipb.getChildBlocksElseBody(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy)); |
| tmpPB.setExitInstruction(ipb.getExitInstruction()); |
| return tmpPB; |
| } |
| |
| public static ForProgramBlock createDeepCopyForProgramBlock(ForProgramBlock fpb, long pid, int IDPrefix, Program prog, Set<String> fnStack, Set<String> fnCreated, boolean plain, boolean forceDeepCopy) { |
| ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar()); |
| StatementBlock sb = ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) ? |
| createForStatementBlockCopy((ForStatementBlock)fpb.getStatementBlock(), forceDeepCopy) : fpb.getStatementBlock(); |
| tmpPB.setStatementBlock(sb); |
| tmpPB.setThreadID(pid); |
| tmpPB.setFromInstructions( createDeepCopyInstructionSet(fpb.getFromInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); |
| tmpPB.setToInstructions( createDeepCopyInstructionSet(fpb.getToInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); |
| tmpPB.setIncrementInstructions( createDeepCopyInstructionSet(fpb.getIncrementInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); |
| tmpPB.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy) ); |
| tmpPB.setExitInstruction(fpb.getExitInstruction()); |
| return tmpPB; |
| } |
| |
| public static ForProgramBlock createShallowCopyForProgramBlock(ForProgramBlock fpb, Program prog ) { |
| ForProgramBlock tmpPB = new ForProgramBlock(prog,fpb.getIterVar()); |
| tmpPB.setFromInstructions( fpb.getFromInstructions() ); |
| tmpPB.setToInstructions( fpb.getToInstructions() ); |
| tmpPB.setIncrementInstructions( fpb.getIncrementInstructions() ); |
| tmpPB.setChildBlocks( fpb.getChildBlocks() ); |
| tmpPB.setExitInstruction(fpb.getExitInstruction()); |
| return tmpPB; |
| } |
| |
| public static ParForProgramBlock createDeepCopyParForProgramBlock(ParForProgramBlock pfpb, long pid, int IDPrefix, Program prog, Set<String> fnStack, Set<String> fnCreated, boolean plain, boolean forceDeepCopy) { |
| ParForProgramBlock tmpPB = null; |
| |
| if( IDPrefix == -1 ) //still on master node |
| tmpPB = new ParForProgramBlock(prog,pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables()); |
| else //child of remote ParWorker at any level |
| tmpPB = new ParForProgramBlock(IDPrefix, prog, pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables()); |
| |
| StatementBlock sb = ConfigurationManager.getCompilerConfigFlag(ConfigType.ALLOW_PARALLEL_DYN_RECOMPILATION) ? |
| createForStatementBlockCopy((ForStatementBlock)pfpb.getStatementBlock(), forceDeepCopy) : pfpb.getStatementBlock(); |
| tmpPB.setStatementBlock( sb ); |
| tmpPB.setThreadID(pid); |
| |
| tmpPB.disableOptimization(); //already done in top-level parfor |
| tmpPB.disableMonitorReport(); //already done in top-level parfor |
| |
| tmpPB.setFromInstructions( createDeepCopyInstructionSet(pfpb.getFromInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); |
| tmpPB.setToInstructions( createDeepCopyInstructionSet(pfpb.getToInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); |
| tmpPB.setIncrementInstructions( createDeepCopyInstructionSet(pfpb.getIncrementInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); |
| |
| //NOTE: Normally, no recursive copy because (1) copied on each execution in this PB anyway |
| //and (2) leave placeholders as they are. However, if plain, an explicit deep copy is requested. |
| if( plain || forceDeepCopy ) |
| tmpPB.setChildBlocks( rcreateDeepCopyProgramBlocks(pfpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, forceDeepCopy) ); |
| else |
| tmpPB.setChildBlocks( pfpb.getChildBlocks() ); |
| tmpPB.setExitInstruction(pfpb.getExitInstruction()); |
| |
| return tmpPB; |
| } |
| |
| /** |
| * This creates a deep copy of a function program block. The central reference to singletons of function program blocks |
| * poses the need for explicit copies in order to prevent conflicting writes of temporary variables (see ExternalFunctionProgramBlock. |
| * |
| * @param namespace function namespace |
| * @param oldName ? |
| * @param pid ? |
| * @param IDPrefix ? |
| * @param prog runtime program |
| * @param fnStack ? |
| * @param fnCreated ? |
| * @param plain ? |
| */ |
| public static void createDeepCopyFunctionProgramBlock(String namespace, String oldName, long pid, int IDPrefix, Program prog, Set<String> fnStack, Set<String> fnCreated, boolean plain) |
| { |
| //fpb guaranteed to be non-null (checked inside getFunctionProgramBlock) |
| FunctionProgramBlock fpb = prog.getFunctionProgramBlock(namespace, oldName); |
| String fnameNew = (plain)? oldName :(oldName+Lop.CP_CHILD_THREAD+pid); |
| String fnameNewKey = DMLProgram.constructFunctionKey(namespace,fnameNew); |
| |
| if( prog.getFunctionProgramBlocks().containsKey(fnameNewKey) ) |
| return; //prevent redundant deep copy if already existent |
| |
| //create deep copy |
| FunctionProgramBlock copy = null; |
| ArrayList<DataIdentifier> tmp1 = new ArrayList<>(); |
| ArrayList<DataIdentifier> tmp2 = new ArrayList<>(); |
| if( fpb.getInputParams()!= null ) |
| tmp1.addAll(fpb.getInputParams()); |
| if( fpb.getOutputParams()!= null ) |
| tmp2.addAll(fpb.getOutputParams()); |
| |
| |
| if( !fnStack.contains(fnameNewKey) ) { |
| fnStack.add(fnameNewKey); |
| copy = new FunctionProgramBlock(prog, tmp1, tmp2); |
| copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), pid, IDPrefix, fnStack, fnCreated, plain, fpb.isRecompileOnce()) ); |
| copy.setRecompileOnce( fpb.isRecompileOnce() ); |
| copy.setThreadID(pid); |
| fnStack.remove(fnameNewKey); |
| } |
| else //stop deep copy for recursive function calls |
| copy = fpb; |
| |
| //copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning |
| //note: instructions not used by function program block |
| |
| //put if not existing (recursive processing might have added it) |
| if( !prog.getFunctionProgramBlocks().containsKey(fnameNewKey) ) { |
| prog.addFunctionProgramBlock(namespace, fnameNew, copy); |
| fnCreated.add(DMLProgram.constructFunctionKey(namespace, fnameNew)); |
| } |
| } |
| |
| public static FunctionProgramBlock createDeepCopyFunctionProgramBlock(FunctionProgramBlock fpb, Set<String> fnStack, Set<String> fnCreated) |
| { |
| if( fpb == null ) |
| throw new DMLRuntimeException("Unable to create a deep copy of a non-existing FunctionProgramBlock."); |
| |
| //create deep copy |
| FunctionProgramBlock copy = null; |
| ArrayList<DataIdentifier> tmp1 = new ArrayList<>(); |
| ArrayList<DataIdentifier> tmp2 = new ArrayList<>(); |
| if( fpb.getInputParams()!= null ) |
| tmp1.addAll(fpb.getInputParams()); |
| if( fpb.getOutputParams()!= null ) |
| tmp2.addAll(fpb.getOutputParams()); |
| |
| copy = new FunctionProgramBlock(fpb.getProgram(), tmp1, tmp2); |
| copy.setChildBlocks( rcreateDeepCopyProgramBlocks(fpb.getChildBlocks(), 0, -1, fnStack, fnCreated, true, fpb.isRecompileOnce()) ); |
| copy.setStatementBlock( fpb.getStatementBlock() ); |
| copy.setRecompileOnce(fpb.isRecompileOnce()); |
| //copy.setVariables( (LocalVariableMap) fpb.getVariables() ); //implicit cloning |
| //note: instructions not used by function program block |
| |
| return copy; |
| } |
| |
| |
| /** |
| * Creates a deep copy of an array of instructions and replaces the placeholders of parworker |
| * IDs with the concrete IDs of this parfor instance. This is a helper method uses for generating |
| * deep copies of program blocks. |
| * |
| * @param instSet list of instructions |
| * @param pid ? |
| * @param IDPrefix ? |
| * @param prog runtime program |
| * @param fnStack ? |
| * @param fnCreated ? |
| * @param plain ? |
| * @param cpFunctions ? |
| * @return list of instructions |
| */ |
| public static ArrayList<Instruction> createDeepCopyInstructionSet(ArrayList<Instruction> instSet, long pid, int IDPrefix, Program prog, Set<String> fnStack, Set<String> fnCreated, boolean plain, boolean cpFunctions) { |
| ArrayList<Instruction> tmp = new ArrayList<>(); |
| for( Instruction inst : instSet ) { |
| if( inst instanceof FunctionCallCPInstruction && cpFunctions ) { |
| FunctionCallCPInstruction finst = (FunctionCallCPInstruction) inst; |
| createDeepCopyFunctionProgramBlock( finst.getNamespace(), |
| finst.getFunctionName(), pid, IDPrefix, prog, fnStack, fnCreated, plain ); |
| } |
| tmp.add( cloneInstruction( inst, pid, plain, cpFunctions ) ); |
| } |
| return tmp; |
| } |
| |
| public static Instruction cloneInstruction( Instruction oInst, long pid, boolean plain, boolean cpFunctions ) |
| { |
| Instruction inst = null; |
| String tmpString = oInst.toString(); |
| |
| try |
| { |
| if( oInst instanceof CPInstruction || oInst instanceof SPInstruction |
| || oInst instanceof GPUInstruction ) { |
| if( oInst instanceof FunctionCallCPInstruction && cpFunctions ) { |
| FunctionCallCPInstruction tmp = (FunctionCallCPInstruction) oInst; |
| if( !plain ) { |
| //safe replacement because target variables might include the function name |
| //note: this is no update-in-place in order to keep the original function name as basis |
| tmpString = tmp.updateInstStringFunctionName(tmp.getFunctionName(), tmp.getFunctionName() + Lop.CP_CHILD_THREAD+pid); |
| } |
| //otherwise: preserve function name |
| } |
| inst = InstructionParser.parseSingleInstruction(tmpString); |
| } |
| else |
| throw new DMLRuntimeException("Failed to clone instruction: "+oInst); |
| } |
| catch(Exception ex) { |
| throw new DMLRuntimeException(ex); |
| } |
| |
| //save replacement of thread id references in instructions |
| inst = saveReplaceThreadID( inst, Lop.CP_ROOT_THREAD_ID, Lop.CP_CHILD_THREAD+pid); |
| |
| return inst; |
| } |
| |
| public static FunctionStatementBlock createDeepCopyFunctionStatementBlock(FunctionStatementBlock fsb, Set<String> fnStack, Set<String> fnCreated) { |
| FunctionStatement fstmt = (FunctionStatement) fsb.getStatement(0); |
| FunctionStatementBlock retSb = new FunctionStatementBlock(); |
| FunctionStatement retStmt = new FunctionStatement(); |
| retStmt.setName(fstmt.getName()); |
| retStmt.setInputParams(fstmt.getInputParams()); |
| retStmt.setInputDefaults(fstmt.getInputDefaults()); |
| retStmt.setOutputParams(fstmt.getOutputParams()); |
| retSb.addStatement(retStmt); |
| retSb.setDMLProg(fsb.getDMLProg()); |
| retSb.setParseInfo(fsb); |
| retSb.setLiveIn( fsb.liveIn() ); |
| retSb.setLiveOut( fsb.liveOut() ); |
| for( StatementBlock sb : fstmt.getBody() ) |
| retStmt.getBody().add(rCreateDeepCopyStatementBlock(sb)); |
| return retSb; |
| } |
| |
| public static StatementBlock rCreateDeepCopyStatementBlock(StatementBlock sb) { |
| StatementBlock ret = null; |
| if( sb instanceof IfStatementBlock ) { |
| IfStatementBlock orig = (IfStatementBlock) sb; |
| IfStatementBlock isb = createIfStatementBlockCopy(orig, true); |
| IfStatement origstmt = (IfStatement) orig.getStatement(0); |
| IfStatement istmt = new IfStatement(); //only shallow |
| istmt.setConditionalPredicate(origstmt.getConditionalPredicate()); |
| isb.setStatements(CollectionUtils.asArrayList(istmt)); |
| for( StatementBlock c : origstmt.getIfBody() ) |
| istmt.addStatementBlockIfBody(rCreateDeepCopyStatementBlock(c)); |
| for( StatementBlock c : origstmt.getElseBody() ) |
| istmt.addStatementBlockElseBody(rCreateDeepCopyStatementBlock(c)); |
| ret = isb; |
| } |
| else if( sb instanceof WhileStatementBlock ) { |
| WhileStatementBlock orig = (WhileStatementBlock) sb; |
| WhileStatementBlock wsb = createWhileStatementBlockCopy(orig, true); |
| WhileStatement origstmt = (WhileStatement) orig.getStatement(0); |
| WhileStatement wstmt = new WhileStatement(); //only shallow |
| wstmt.setPredicate(origstmt.getConditionalPredicate()); |
| wsb.setStatements(CollectionUtils.asArrayList(wstmt)); |
| for( StatementBlock c : origstmt.getBody() ) |
| wstmt.addStatementBlock(rCreateDeepCopyStatementBlock(c)); |
| ret = wsb; |
| } |
| else if( sb instanceof ForStatementBlock ) { //incl parfor |
| ForStatementBlock orig = (ForStatementBlock) sb; |
| ForStatementBlock fsb = createForStatementBlockCopy(orig, true); |
| ForStatement origstmt = (ForStatement) orig.getStatement(0); |
| ForStatement fstmt = (origstmt instanceof ParForStatement) ? |
| new ParForStatement() : new ForStatement(); //only shallow |
| fstmt.setPredicate(origstmt.getIterablePredicate()); |
| fsb.setStatements(CollectionUtils.asArrayList(fstmt)); |
| for( StatementBlock c : origstmt.getBody() ) |
| fstmt.addStatementBlock(rCreateDeepCopyStatementBlock(c)); |
| ret = fsb; |
| } |
| else { |
| StatementBlock bsb = createStatementBlockCopy(sb, -1, true, true); |
| for( Hop root : bsb.getHops() ) |
| if( root instanceof FunctionOp ) |
| ((FunctionOp)root).setCallOptimized(false); |
| ret = bsb; |
| } |
| return ret; |
| } |
| |
| public static StatementBlock createStatementBlockCopy( StatementBlock sb, long pid, boolean plain, boolean forceDeepCopy ) |
| { |
| StatementBlock ret = null; |
| |
| try |
| { |
| if( sb != null //forced deep copy for function recompilation |
| && (Recompiler.requiresRecompilation( sb.getHops() ) || forceDeepCopy) ) |
| { |
| //create new statement (shallow copy livein/liveout for recompile, line numbers for explain) |
| ret = new StatementBlock(); |
| ret.setDMLProg(sb.getDMLProg()); |
| ret.setParseInfo(sb); |
| ret.setLiveIn( sb.liveIn() ); |
| ret.setLiveOut( sb.liveOut() ); |
| ret.setUpdatedVariables( sb.variablesUpdated() ); |
| ret.setReadVariables( sb.variablesRead() ); |
| |
| //deep copy hops dag for concurrent recompile |
| ArrayList<Hop> hops = Recompiler.deepCopyHopsDag( sb.getHops() ); |
| if( !plain ) |
| Recompiler.updateFunctionNames( hops, pid ); |
| ret.setHops( hops ); |
| ret.updateRecompilationFlag(); |
| ret.setNondeterministic(sb.isNondeterministic()); |
| } |
| else { |
| ret = sb; |
| } |
| } |
| catch( Exception ex ) { |
| throw new DMLRuntimeException( ex ); |
| } |
| |
| return ret; |
| } |
| |
| public static IfStatementBlock createIfStatementBlockCopy( IfStatementBlock sb, boolean forceDeepCopy ) |
| { |
| IfStatementBlock ret = null; |
| |
| try |
| { |
| if( sb != null //forced deep copy for function recompile |
| && (Recompiler.requiresRecompilation( sb.getPredicateHops() ) || forceDeepCopy) ) |
| { |
| //create new statement (shallow copy livein/liveout for recompile, line numbers for explain) |
| ret = new IfStatementBlock(); |
| ret.setDMLProg(sb.getDMLProg()); |
| ret.setParseInfo(sb); |
| ret.setLiveIn( sb.liveIn() ); |
| ret.setLiveOut( sb.liveOut() ); |
| ret.setUpdatedVariables( sb.variablesUpdated() ); |
| ret.setReadVariables( sb.variablesRead() ); |
| |
| //shallow copy child statements |
| ret.setStatements( sb.getStatements() ); |
| |
| //deep copy predicate hops dag for concurrent recompile |
| Hop hops = Recompiler.deepCopyHopsDag( sb.getPredicateHops() ); |
| ret.setPredicateHops( hops ); |
| ret.updatePredicateRecompilationFlag(); |
| ret.setNondeterministic(sb.isNondeterministic()); |
| } |
| else { |
| ret = sb; |
| } |
| } |
| catch( Exception ex ) { |
| throw new DMLRuntimeException( ex ); |
| } |
| |
| return ret; |
| } |
| |
| public static WhileStatementBlock createWhileStatementBlockCopy( WhileStatementBlock sb, boolean forceDeepCopy ) |
| { |
| WhileStatementBlock ret = null; |
| |
| try |
| { |
| if( sb != null //forced deep copy for function recompile |
| && (Recompiler.requiresRecompilation( sb.getPredicateHops() ) || forceDeepCopy) ) |
| { |
| //create new statement (shallow copy livein/liveout for recompile, line numbers for explain) |
| ret = new WhileStatementBlock(); |
| ret.setDMLProg(sb.getDMLProg()); |
| ret.setParseInfo(sb); |
| ret.setLiveIn( sb.liveIn() ); |
| ret.setLiveOut( sb.liveOut() ); |
| ret.setUpdatedVariables( sb.variablesUpdated() ); |
| ret.setReadVariables( sb.variablesRead() ); |
| ret.setUpdateInPlaceVars( sb.getUpdateInPlaceVars() ); |
| |
| //shallow copy child statements |
| ret.setStatements( sb.getStatements() ); |
| |
| //deep copy predicate hops dag for concurrent recompile |
| Hop hops = Recompiler.deepCopyHopsDag( sb.getPredicateHops() ); |
| ret.setPredicateHops( hops ); |
| ret.updatePredicateRecompilationFlag(); |
| ret.setNondeterministic(sb.isNondeterministic()); |
| } |
| else { |
| ret = sb; |
| } |
| } |
| catch( Exception ex ) { |
| throw new DMLRuntimeException( ex ); |
| } |
| |
| return ret; |
| } |
| |
| public static ForStatementBlock createForStatementBlockCopy( ForStatementBlock sb, boolean forceDeepCopy ) |
| { |
| ForStatementBlock ret = null; |
| |
| try |
| { |
| if( sb != null && (forceDeepCopy |
| || Recompiler.requiresRecompilation(sb.getFromHops()) |
| || Recompiler.requiresRecompilation(sb.getToHops()) |
| || Recompiler.requiresRecompilation(sb.getIncrementHops())) ) |
| { |
| ret = (sb instanceof ParForStatementBlock) ? new ParForStatementBlock() : new ForStatementBlock(); |
| |
| //create new statement (shallow copy livein/liveout for recompile, line numbers for explain) |
| ret.setDMLProg(sb.getDMLProg()); |
| ret.setParseInfo(sb); |
| ret.setLiveIn( sb.liveIn() ); |
| ret.setLiveOut( sb.liveOut() ); |
| ret.setUpdatedVariables( sb.variablesUpdated() ); |
| ret.setReadVariables( sb.variablesRead() ); |
| ret.setUpdateInPlaceVars( sb.getUpdateInPlaceVars() ); |
| |
| //shallow copy child statements |
| ret.setStatements( sb.getStatements() ); |
| |
| //deep copy predicate hops dag for concurrent recompile |
| //or on create full statement block copies |
| ret.setFromHops( Recompiler.deepCopyHopsDag(sb.getFromHops())); |
| ret.setToHops(Recompiler.deepCopyHopsDag(sb.getToHops())); |
| if( sb.getIncrementHops() != null ) |
| ret.setIncrementHops(Recompiler.deepCopyHopsDag(sb.getIncrementHops())); |
| |
| ret.updatePredicateRecompilationFlags(); |
| ret.setNondeterministic(sb.isNondeterministic()); |
| } |
| else { |
| ret = sb; |
| } |
| } |
| catch( Exception ex ) { |
| throw new DMLRuntimeException( ex ); |
| } |
| |
| return ret; |
| } |
| |
| |
| //////////////////////////////// |
| // SERIALIZATION |
| //////////////////////////////// |
| |
| public static String serializeSparkPSBody(SparkPSBody body, HashMap<String, byte[]> clsMap) { |
| |
| ExecutionContext ec = body.getEc(); |
| StringBuilder builder = new StringBuilder(); |
| builder.append(PSBODY_BEGIN); |
| builder.append(NEWLINE); |
| |
| //handle DMLScript UUID (propagate original uuid for writing to scratch space) |
| builder.append(DMLScript.getUUID()); |
| builder.append(COMPONENTS_DELIM); |
| builder.append(NEWLINE); |
| |
| //handle DML config |
| builder.append(ConfigurationManager.getDMLConfig().serializeDMLConfig()); |
| builder.append(COMPONENTS_DELIM); |
| builder.append(NEWLINE); |
| |
| //handle additional configurations |
| builder.append(CONF_STATS + "=" + DMLScript.STATISTICS); |
| builder.append(COMPONENTS_DELIM); |
| builder.append(NEWLINE); |
| |
| //handle program |
| builder.append(PROG_BEGIN); |
| builder.append(NEWLINE); |
| builder.append(rSerializeFunctionProgramBlocks(ec.getProgram().getFunctionProgramBlocks(), |
| new HashSet<>(ec.getProgram().getFunctionProgramBlocks().keySet()), clsMap)); |
| builder.append(PROG_END); |
| builder.append(NEWLINE); |
| builder.append(COMPONENTS_DELIM); |
| builder.append(NEWLINE); |
| |
| //handle execution context |
| builder.append(EC_BEGIN); |
| builder.append(serializeExecutionContext(ec)); |
| builder.append(EC_END); |
| builder.append(NEWLINE); |
| builder.append(COMPONENTS_DELIM); |
| builder.append(NEWLINE); |
| |
| //handle program blocks |
| builder.append(PBS_BEGIN); |
| builder.append(NEWLINE); |
| builder.append(rSerializeProgramBlocks(ec.getProgram().getProgramBlocks(), clsMap)); |
| builder.append(PBS_END); |
| builder.append(NEWLINE); |
| builder.append(COMPONENTS_DELIM); |
| builder.append(NEWLINE); |
| |
| builder.append(PSBODY_END); |
| return builder.toString(); |
| } |
| |
| public static String serializeParForBody( ParForBody body ) { |
| return serializeParForBody(body, new HashMap<String, byte[]>()); |
| } |
| |
| public static String serializeParForBody( ParForBody body, HashMap<String,byte[]> clsMap ) |
| { |
| ArrayList<ProgramBlock> pbs = body.getChildBlocks(); |
| ArrayList<ResultVar> rVnames = body.getResultVariables(); |
| ExecutionContext ec = body.getEc(); |
| |
| if( pbs.isEmpty() ) |
| return PARFORBODY_BEGIN + PARFORBODY_END; |
| |
| Program prog = pbs.get( 0 ).getProgram(); |
| |
| StringBuilder sb = new StringBuilder(); |
| sb.append( PARFORBODY_BEGIN ); |
| sb.append( NEWLINE ); |
| |
| //handle DMLScript UUID (propagate original uuid for writing to scratch space) |
| sb.append( DMLScript.getUUID() ); |
| sb.append( COMPONENTS_DELIM ); |
| sb.append( NEWLINE ); |
| |
| //handle DML config |
| sb.append( ConfigurationManager.getDMLConfig().serializeDMLConfig() ); |
| sb.append( COMPONENTS_DELIM ); |
| sb.append( NEWLINE ); |
| |
| //handle additional configurations |
| sb.append( CONF_STATS + "=" + DMLScript.STATISTICS ); |
| sb.append( COMPONENTS_DELIM ); |
| sb.append( NEWLINE ); |
| |
| //handle program |
| sb.append(PROG_BEGIN); |
| sb.append( NEWLINE ); |
| sb.append( serializeProgram(prog, pbs, clsMap) ); |
| sb.append(PROG_END); |
| sb.append( NEWLINE ); |
| sb.append( COMPONENTS_DELIM ); |
| sb.append( NEWLINE ); |
| |
| //handle result variable names |
| sb.append( serializeResultVariables(rVnames) ); |
| sb.append( COMPONENTS_DELIM ); |
| |
| //handle execution context |
| //note: this includes also the symbol table (serialize only the top-level variable map, |
| // (symbol tables for nested/child blocks are created at parse time, on the remote side) |
| sb.append(EC_BEGIN); |
| sb.append( serializeExecutionContext(ec) ); |
| sb.append(EC_END); |
| sb.append( NEWLINE ); |
| sb.append( COMPONENTS_DELIM ); |
| sb.append( NEWLINE ); |
| |
| //handle program blocks |
| sb.append(PBS_BEGIN); |
| sb.append( NEWLINE ); |
| sb.append( rSerializeProgramBlocks(pbs, clsMap) ); |
| sb.append(PBS_END); |
| sb.append( NEWLINE ); |
| |
| sb.append( PARFORBODY_END ); |
| |
| return sb.toString(); |
| } |
| |
| private static String serializeProgram( Program prog, ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap ) { |
| //note program contains variables, programblocks and function program blocks |
| //but in order to avoid redundancy, we only serialize function program blocks |
| HashMap<String, FunctionProgramBlock> fpb = prog.getFunctionProgramBlocks(); |
| HashSet<String> cand = new HashSet<>(); |
| rFindSerializationCandidates(pbs, cand); |
| return rSerializeFunctionProgramBlocks( fpb, cand, clsMap ); |
| } |
| |
| private static void rFindSerializationCandidates( ArrayList<ProgramBlock> pbs, HashSet<String> cand ) |
| { |
| for( ProgramBlock pb : pbs ) |
| { |
| if( pb instanceof WhileProgramBlock ) { |
| WhileProgramBlock wpb = (WhileProgramBlock) pb; |
| rFindSerializationCandidates(wpb.getChildBlocks(), cand ); |
| } |
| else if ( pb instanceof ForProgramBlock || pb instanceof ParForProgramBlock ) { |
| ForProgramBlock fpb = (ForProgramBlock) pb; |
| rFindSerializationCandidates(fpb.getChildBlocks(), cand); |
| } |
| else if ( pb instanceof IfProgramBlock ) { |
| IfProgramBlock ipb = (IfProgramBlock) pb; |
| rFindSerializationCandidates(ipb.getChildBlocksIfBody(), cand); |
| if( ipb.getChildBlocksElseBody() != null ) |
| rFindSerializationCandidates(ipb.getChildBlocksElseBody(), cand); |
| } |
| else if( pb instanceof BasicProgramBlock ) { |
| BasicProgramBlock bpb = (BasicProgramBlock) pb; |
| for( Instruction inst : bpb.getInstructions() ) |
| if( inst instanceof FunctionCallCPInstruction ) { |
| FunctionCallCPInstruction fci = (FunctionCallCPInstruction) inst; |
| String fkey = DMLProgram.constructFunctionKey(fci.getNamespace(), fci.getFunctionName()); |
| if( !cand.contains(fkey) ) { //memoization for multiple calls, recursion |
| cand.add( fkey ); //add to candidates |
| //investigate chains of function calls |
| FunctionProgramBlock fpb = pb.getProgram().getFunctionProgramBlock(fci.getNamespace(), fci.getFunctionName()); |
| rFindSerializationCandidates(fpb.getChildBlocks(), cand); |
| } |
| } |
| } |
| } |
| } |
| |
| private static String serializeVariables (LocalVariableMap vars) { |
| StringBuilder sb = new StringBuilder(); |
| sb.append(VARS_BEGIN); |
| sb.append( vars.serialize() ); |
| sb.append(VARS_END); |
| return sb.toString(); |
| } |
| |
| public static String serializeDataObject(String key, Data dat) |
| { |
| // SCHEMA: <name>|<datatype>|<valuetype>|value |
| // (scalars are serialize by value, matrices by filename) |
| StringBuilder sb = new StringBuilder(); |
| //prepare data for serialization |
| String name = key; |
| DataType datatype = dat.getDataType(); |
| ValueType valuetype = dat.getValueType(); |
| String value = null; |
| String[] metaData = null; |
| String[] listData = null; |
| switch( datatype ) |
| { |
| case SCALAR: |
| ScalarObject so = (ScalarObject) dat; |
| //name = so.getName(); |
| value = so.getStringValue(); |
| break; |
| case MATRIX: |
| MatrixObject mo = (MatrixObject) dat; |
| MetaDataFormat md = (MetaDataFormat) dat.getMetaData(); |
| DataCharacteristics dc = md.getDataCharacteristics(); |
| value = mo.getFileName(); |
| PartitionFormat partFormat = (mo.getPartitionFormat()!=null) ? new PartitionFormat( |
| mo.getPartitionFormat(),mo.getPartitionSize()) : PartitionFormat.NONE; |
| metaData = new String[10]; |
| metaData[0] = String.valueOf( dc.getRows() ); |
| metaData[1] = String.valueOf( dc.getCols() ); |
| metaData[2] = String.valueOf( dc.getBlocksize() ); |
| metaData[3] = String.valueOf( dc.getNonZeros() ); |
| metaData[4] = md.getFileFormat().toString(); |
| metaData[5] = String.valueOf( partFormat ); |
| metaData[6] = String.valueOf( mo.getUpdateType() ); |
| metaData[7] = String.valueOf(mo.isHDFSFileExists()); |
| metaData[8] = String.valueOf(mo.isCleanupEnabled()); |
| break; |
| case LIST: |
| // SCHEMA: <name>|<datatype>|<valuetype>|value|<metadata>|<tab>element1<tab>element2<tab>element3 (this is the list) |
| // (for the element1) <listName-index>|<datatype>|<valuetype>|value |
| // (for the element2) <listName-index>|<datatype>|<valuetype>|value |
| ListObject lo = (ListObject) dat; |
| value = REF; |
| metaData = new String[2]; |
| metaData[0] = String.valueOf(lo.getLength()); |
| metaData[1] = lo.getNames() == null ? EMPTY : serializeList(lo.getNames(), ELEMENT_DELIM2); |
| listData = new String[lo.getLength()]; |
| for (int index = 0; index < lo.getLength(); index++) { |
| listData[index] = serializeDataObject(name + DASH + index, lo.slice(index)); |
| } |
| break; |
| default: |
| throw new DMLRuntimeException("Unable to serialize datatype "+datatype); |
| } |
| |
| //serialize data |
| sb.append(name); |
| sb.append(DATA_FIELD_DELIM); |
| sb.append(datatype); |
| sb.append(DATA_FIELD_DELIM); |
| sb.append(valuetype); |
| sb.append(DATA_FIELD_DELIM); |
| sb.append(value); |
| if( metaData != null ) |
| for( int i=0; i<metaData.length; i++ ) { |
| sb.append(DATA_FIELD_DELIM); |
| sb.append(metaData[i]); |
| } |
| if (listData != null) { |
| sb.append(DATA_FIELD_DELIM); |
| for (String ld : listData) { |
| sb.append(LIST_ELEMENT_DELIM); |
| sb.append(ld); |
| } |
| } |
| |
| return sb.toString(); |
| } |
| |
| private static String serializeExecutionContext( ExecutionContext ec ) { |
| return (ec != null) ? serializeVariables( ec.getVariables() ) : EMPTY; |
| } |
| |
| @SuppressWarnings("all") |
| private static String serializeInstructions( ArrayList<Instruction> inst, HashMap<String, byte[]> clsMap ) |
| { |
| StringBuilder sb = new StringBuilder(); |
| int count = 0; |
| for( Instruction linst : inst ) { |
| //check that only cp instruction are transmitted |
| if( !( linst instanceof CPInstruction) ) |
| throw new DMLRuntimeException( NOT_SUPPORTED_SPARK_INSTRUCTION + " " +linst.getClass().getName()+"\n"+linst ); |
| |
| //obtain serialized version of generated classes |
| if( linst instanceof SpoofCPInstruction ) { |
| Class<?> cla = ((SpoofCPInstruction) linst).getOperatorClass(); |
| clsMap.put(cla.getName(), CodegenUtils.getClassData(cla.getName())); |
| } |
| |
| if( count > 0 ) |
| sb.append( ELEMENT_DELIM ); |
| |
| sb.append( checkAndReplaceLiterals( linst.toString() ) ); |
| count++; |
| } |
| |
| return sb.toString(); |
| } |
| |
| /** |
| * Replacement of internal delimiters occurring in literals of instructions |
| * in order to ensure robustness of serialization and parsing. |
| * (e.g. print( "a,b" ) would break the parsing of instruction that internally |
| * are separated with a "," ) |
| * |
| * @param instStr instruction string |
| * @return instruction string with replacements |
| */ |
| private static String checkAndReplaceLiterals( String instStr ) |
| { |
| String tmp = instStr; |
| |
| //1) check own delimiters (very unlikely due to special characters) |
| if( tmp.contains(COMPONENTS_DELIM) ) { |
| tmp = tmp.replaceAll(COMPONENTS_DELIM, "."); |
| LOG.warn("Replaced special literal character sequence "+COMPONENTS_DELIM+" with '.'"); |
| } |
| |
| if( tmp.contains(ELEMENT_DELIM) ) { |
| tmp = tmp.replaceAll(ELEMENT_DELIM, "."); |
| LOG.warn("Replaced special literal character sequence "+ELEMENT_DELIM+" with '.'"); |
| } |
| |
| if( tmp.contains( LEVELIN ) ){ |
| tmp = tmp.replaceAll(LEVELIN, "("); // '\\' required if LEVELIN='{' because regex |
| LOG.warn("Replaced special literal character sequence "+LEVELIN+" with '('"); |
| } |
| |
| if( tmp.contains(LEVELOUT) ){ |
| tmp = tmp.replaceAll(LEVELOUT, ")"); |
| LOG.warn("Replaced special literal character sequence "+LEVELOUT+" with ')'"); |
| } |
| |
| //NOTE: DATA_FIELD_DELIM and KEY_VALUE_DELIM not required |
| //because those literals cannot occur in critical places. |
| |
| //2) check end tag of CDATA |
| if( tmp.contains(CDATA_END) ){ |
| tmp = tmp.replaceAll(CDATA_END, "."); //prevent XML parsing issues in job.xml |
| LOG.warn("Replaced special literal character sequence "+ CDATA_END +" with '.'"); |
| } |
| |
| return tmp; |
| } |
| |
| private static String serializeStringHashMap(HashMap<String,String> vars) { |
| return serializeList(vars.entrySet().stream().map(e -> |
| e.getKey()+KEY_VALUE_DELIM+e.getValue()).collect(Collectors.toList())); |
| } |
| |
| public static String serializeResultVariables( List<ResultVar> vars) { |
| return serializeList(vars.stream().map(v -> v._isAccum ? |
| v._name+"+" : v._name).collect(Collectors.toList())); |
| } |
| |
| public static String serializeList(List<String> elements) { |
| return serializeList(elements, ELEMENT_DELIM); |
| } |
| |
| public static String serializeList(List<String> elements, String delim) { |
| return StringUtils.join(elements, delim); |
| } |
| |
| private static String serializeDataIdentifiers(List<DataIdentifier> vars) { |
| return serializeList(vars.stream().map(v -> |
| serializeDataIdentifier(v)).collect(Collectors.toList())); |
| } |
| |
| private static String serializeDataIdentifier( DataIdentifier dat ) { |
| // SCHEMA: <name>|<datatype>|<valuetype> |
| StringBuilder sb = new StringBuilder(); |
| sb.append(dat.getName()); |
| sb.append(DATA_FIELD_DELIM); |
| sb.append(dat.getDataType()); |
| sb.append(DATA_FIELD_DELIM); |
| sb.append(dat.getValueType()); |
| return sb.toString(); |
| } |
| |
| private static String rSerializeFunctionProgramBlocks(HashMap<String,FunctionProgramBlock> pbs, HashSet<String> cand, HashMap<String, byte[]> clsMap) { |
| StringBuilder sb = new StringBuilder(); |
| int count = 0; |
| for( Entry<String,FunctionProgramBlock> pb : pbs.entrySet() ) { |
| if( !cand.contains(pb.getKey()) ) //skip function not included in the parfor body |
| continue; |
| if( count>0 ) { |
| sb.append( ELEMENT_DELIM ); |
| sb.append( NEWLINE ); |
| } |
| sb.append( pb.getKey() ); |
| sb.append( KEY_VALUE_DELIM ); |
| sb.append( rSerializeProgramBlock(pb.getValue(), clsMap) ); |
| count++; |
| } |
| sb.append(NEWLINE); |
| return sb.toString(); |
| } |
| |
| private static String rSerializeProgramBlocks(ArrayList<ProgramBlock> pbs, HashMap<String, byte[]> clsMap) { |
| StringBuilder sb = new StringBuilder(); |
| int count = 0; |
| for( ProgramBlock pb : pbs ) { |
| if( count>0 ) { |
| sb.append( ELEMENT_DELIM ); |
| sb.append(NEWLINE); |
| } |
| sb.append( rSerializeProgramBlock(pb, clsMap) ); |
| count++; |
| } |
| return sb.toString(); |
| } |
| |
| private static String rSerializeProgramBlock( ProgramBlock pb, HashMap<String, byte[]> clsMap ) { |
| StringBuilder sb = new StringBuilder(); |
| |
| //handle header |
| if( pb instanceof WhileProgramBlock ) |
| sb.append(PB_WHILE); |
| else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock) ) |
| sb.append(PB_FOR); |
| else if ( pb instanceof ParForProgramBlock ) |
| sb.append(PB_PARFOR); |
| else if ( pb instanceof IfProgramBlock ) |
| sb.append(PB_IF); |
| else if ( pb instanceof FunctionProgramBlock ) |
| sb.append(PB_FC); |
| else //all generic program blocks |
| sb.append(PB_BEGIN); |
| |
| //handle body |
| if( pb instanceof WhileProgramBlock ) { |
| WhileProgramBlock wpb = (WhileProgramBlock) pb; |
| sb.append(INST_BEGIN); |
| sb.append( serializeInstructions( wpb.getPredicate(), clsMap ) ); |
| sb.append(INST_END); |
| sb.append( COMPONENTS_DELIM ); |
| sb.append(PBS_BEGIN); |
| sb.append( rSerializeProgramBlocks( wpb.getChildBlocks(), clsMap) ); |
| sb.append(PBS_END); |
| } |
| else if ( pb instanceof ForProgramBlock && !(pb instanceof ParForProgramBlock ) ) { |
| ForProgramBlock fpb = (ForProgramBlock) pb; |
| sb.append( fpb.getIterVar() ); |
| sb.append( COMPONENTS_DELIM ); |
| sb.append(INST_BEGIN); |
| sb.append( serializeInstructions( fpb.getFromInstructions(), clsMap ) ); |
| sb.append(INST_END); |
| sb.append( COMPONENTS_DELIM ); |
| sb.append(INST_BEGIN); |
| sb.append( serializeInstructions(fpb.getToInstructions(), clsMap) ); |
| sb.append(INST_END); |
| sb.append( COMPONENTS_DELIM ); |
| sb.append(INST_BEGIN); |
| sb.append( serializeInstructions(fpb.getIncrementInstructions(), clsMap) ); |
| sb.append(INST_END); |
| sb.append( COMPONENTS_DELIM ); |
| sb.append(PBS_BEGIN); |
| sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) ); |
| sb.append(PBS_END); |
| } |
| else if ( pb instanceof ParForProgramBlock ) { |
| ParForProgramBlock pfpb = (ParForProgramBlock) pb; |
| |
| //check for nested remote ParFOR |
| if( PExecMode.valueOf( pfpb.getParForParams().get( ParForStatementBlock.EXEC_MODE )) == PExecMode.REMOTE_SPARK ) |
| throw new DMLRuntimeException( NOT_SUPPORTED_SPARK_PARFOR ); |
| |
| sb.append( pfpb.getIterVar() ); |
| sb.append( COMPONENTS_DELIM ); |
| sb.append( serializeResultVariables( pfpb.getResultVariables()) ); |
| sb.append( COMPONENTS_DELIM ); |
| sb.append( serializeStringHashMap( pfpb.getParForParams()) ); //parameters of nested parfor |
| sb.append( COMPONENTS_DELIM ); |
| sb.append(INST_BEGIN); |
| sb.append( serializeInstructions(pfpb.getFromInstructions(), clsMap) ); |
| sb.append(INST_END); |
| sb.append( COMPONENTS_DELIM ); |
| sb.append(INST_BEGIN); |
| sb.append( serializeInstructions(pfpb.getToInstructions(), clsMap) ); |
| sb.append(INST_END); |
| sb.append( COMPONENTS_DELIM ); |
| sb.append(INST_BEGIN); |
| sb.append( serializeInstructions(pfpb.getIncrementInstructions(), clsMap) ); |
| sb.append(INST_END); |
| sb.append( COMPONENTS_DELIM ); |
| sb.append(PBS_BEGIN); |
| sb.append( rSerializeProgramBlocks( pfpb.getChildBlocks(), clsMap ) ); |
| sb.append(PBS_END); |
| } |
| else if ( pb instanceof IfProgramBlock ) { |
| IfProgramBlock ipb = (IfProgramBlock) pb; |
| sb.append(INST_BEGIN); |
| sb.append( serializeInstructions(ipb.getPredicate(), clsMap) ); |
| sb.append(INST_END); |
| sb.append( COMPONENTS_DELIM ); |
| sb.append(PBS_BEGIN); |
| sb.append( rSerializeProgramBlocks(ipb.getChildBlocksIfBody(), clsMap) ); |
| sb.append(PBS_END); |
| sb.append( COMPONENTS_DELIM ); |
| sb.append(PBS_BEGIN); |
| sb.append( rSerializeProgramBlocks(ipb.getChildBlocksElseBody(), clsMap) ); |
| sb.append(PBS_END); |
| } |
| else if( pb instanceof FunctionProgramBlock ) { |
| FunctionProgramBlock fpb = (FunctionProgramBlock) pb; |
| sb.append( serializeDataIdentifiers( fpb.getInputParams() ) ); |
| sb.append( COMPONENTS_DELIM ); |
| sb.append( serializeDataIdentifiers( fpb.getOutputParams() ) ); |
| sb.append( COMPONENTS_DELIM ); |
| sb.append(PBS_BEGIN); |
| sb.append( rSerializeProgramBlocks(fpb.getChildBlocks(), clsMap) ); |
| sb.append(PBS_END); |
| sb.append( COMPONENTS_DELIM ); |
| } |
| else if( pb instanceof BasicProgramBlock ) { |
| BasicProgramBlock bpb = (BasicProgramBlock) pb; |
| sb.append(INST_BEGIN); |
| sb.append( serializeInstructions( |
| bpb.getInstructions(), clsMap) ); |
| sb.append(INST_END); |
| } |
| |
| //handle end |
| sb.append(PB_END); |
| |
| return sb.toString(); |
| } |
| |
| |
| //////////////////////////////// |
| // PARSING |
| //////////////////////////////// |
| |
| public static SparkPSBody parseSparkPSBody(String in, int id) { |
| SparkPSBody body = new SparkPSBody(); |
| |
| //header elimination |
| String tmpin = in.replaceAll(NEWLINE, ""); //normalization |
| tmpin = tmpin.substring(PSBODY_BEGIN.length(), tmpin.length() - PSBODY_END.length()); //remove start/end |
| HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpin, COMPONENTS_DELIM); |
| |
| //handle DMLScript UUID (NOTE: set directly in DMLScript) |
| //(master UUID is used for all nodes (in order to simply cleanup)) |
| DMLScript.setUUID(st.nextToken()); |
| |
| //handle DML config (NOTE: set directly in ConfigurationManager) |
| handleDMLConfig(st.nextToken()); |
| |
| //handle additional configs |
| parseAndSetAdditionalConfigurations(st.nextToken()); |
| |
| //handle program |
| Program prog = parseProgram(st.nextToken(), id); |
| |
| //handle execution context |
| ExecutionContext ec = parseExecutionContext(st.nextToken(), prog); |
| ec.setProgram(prog); |
| |
| //handle program blocks |
| String spbs = st.nextToken(); |
| ArrayList<ProgramBlock> pbs = rParseProgramBlocks(spbs, prog, id); |
| prog.getProgramBlocks().addAll(pbs); |
| |
| body.setEc(ec); |
| return body; |
| } |
| |
| public static ParForBody parseParForBody( String in, int id ) { |
| return parseParForBody(in, id, false); |
| } |
| |
| public static ParForBody parseParForBody( String in, int id, boolean inSpark ) { |
| ParForBody body = new ParForBody(); |
| |
| //header elimination |
| String tmpin = in.replaceAll(NEWLINE, ""); //normalization |
| tmpin = tmpin.substring(PARFORBODY_BEGIN.length(),tmpin.length()-PARFORBODY_END.length()); //remove start/end |
| HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpin, COMPONENTS_DELIM); |
| |
| //handle DMLScript UUID (NOTE: set directly in DMLScript) |
| //(master UUID is used for all nodes (in order to simply cleanup)) |
| DMLScript.setUUID( st.nextToken() ); |
| |
| //handle DML config (NOTE: set directly in ConfigurationManager) |
| String confStr = st.nextToken(); |
| JobConf job = ConfigurationManager.getCachedJobConf(); |
| if( !InfrastructureAnalyzer.isLocalMode(job) ) { |
| handleDMLConfig(confStr); |
| } |
| |
| //handle additional configs |
| String aconfs = st.nextToken(); |
| if( !inSpark ) |
| parseAndSetAdditionalConfigurations( aconfs ); |
| |
| //handle program |
| String progStr = st.nextToken(); |
| Program prog = parseProgram( progStr, id ); |
| |
| //handle result variable names |
| String rvarStr = st.nextToken(); |
| ArrayList<ResultVar> rvars = parseResultVariables(rvarStr); |
| body.setResultVariables(rvars); |
| |
| //handle execution context |
| String ecStr = st.nextToken(); |
| ExecutionContext ec = parseExecutionContext( ecStr, prog ); |
| |
| //handle program blocks |
| String spbs = st.nextToken(); |
| ArrayList<ProgramBlock> pbs = rParseProgramBlocks(spbs, prog, id); |
| |
| body.setChildBlocks( pbs ); |
| body.setEc( ec ); |
| |
| return body; |
| } |
| |
| private static void handleDMLConfig(String confStr) { |
| if(confStr != null && !confStr.trim().isEmpty()) { |
| DMLConfig dmlconf = DMLConfig.parseDMLConfig(confStr); |
| CompilerConfig cconf = OptimizerUtils.constructCompilerConfig(dmlconf); |
| ConfigurationManager.setLocalConfig(dmlconf); |
| ConfigurationManager.setLocalConfig(cconf); |
| } |
| } |
| |
| public static Program parseProgram( String in, int id ) { |
| String lin = in.substring( PROG_BEGIN.length(),in.length()- PROG_END.length()).trim(); |
| Program prog = new Program(); |
| HashMap<String,FunctionProgramBlock> fc = parseFunctionProgramBlocks(lin, prog, id); |
| for( Entry<String,FunctionProgramBlock> e : fc.entrySet() ) { |
| String[] keypart = e.getKey().split( Program.KEY_DELIM ); |
| String namespace = keypart[0]; |
| String name = keypart[1]; |
| prog.addFunctionProgramBlock(namespace, name, e.getValue()); |
| } |
| return prog; |
| } |
| |
| private static LocalVariableMap parseVariables(String in) { |
| LocalVariableMap ret = null; |
| if( in.length()> VARS_BEGIN.length() + VARS_END.length()) { |
| String varStr = in.substring( VARS_BEGIN.length(),in.length()- VARS_END.length()).trim(); |
| ret = LocalVariableMap.deserialize(varStr); |
| } |
| else { //empty input symbol table |
| ret = new LocalVariableMap(); |
| } |
| return ret; |
| } |
| |
| private static HashMap<String,FunctionProgramBlock> parseFunctionProgramBlocks( String in, Program prog, int id ) { |
| HashMap<String,FunctionProgramBlock> ret = new HashMap<>(); |
| HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer( in, ELEMENT_DELIM ); |
| while( st.hasMoreTokens() ) { |
| String lvar = st.nextToken(); //with ID = CP_CHILD_THREAD+id for current use |
| //put first copy into prog (for direct use) |
| int index = lvar.indexOf( KEY_VALUE_DELIM ); |
| String tmp1 = lvar.substring(0, index); // + CP_CHILD_THREAD+id; |
| String tmp2 = lvar.substring(index + 1); |
| ret.put(tmp1, (FunctionProgramBlock)rParseProgramBlock(tmp2, prog, id)); |
| } |
| return ret; |
| } |
| |
| private static ArrayList<ProgramBlock> rParseProgramBlocks(String in, Program prog, int id) { |
| ArrayList<ProgramBlock> pbs = new ArrayList<>(); |
| String tmpdata = in.substring(PBS_BEGIN.length(),in.length()- PBS_END.length()); |
| HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(tmpdata, ELEMENT_DELIM); |
| while( st.hasMoreTokens() ) |
| pbs.add( rParseProgramBlock( st.nextToken(), prog, id ) ); |
| return pbs; |
| } |
| |
| private static ProgramBlock rParseProgramBlock( String in, Program prog, int id ) { |
| ProgramBlock pb = null; |
| if( in.startsWith(PB_WHILE) ) |
| pb = rParseWhileProgramBlock( in, prog, id ); |
| else if ( in.startsWith(PB_FOR) ) |
| pb = rParseForProgramBlock( in, prog, id ); |
| else if ( in.startsWith(PB_PARFOR) ) |
| pb = rParseParForProgramBlock( in, prog, id ); |
| else if ( in.startsWith(PB_IF) ) |
| pb = rParseIfProgramBlock( in, prog, id ); |
| else if ( in.startsWith(PB_FC) ) |
| pb = rParseFunctionProgramBlock( in, prog, id ); |
| else if ( in.startsWith(PB_BEGIN) ) |
| pb = rParseGenericProgramBlock( in, prog, id ); |
| else |
| throw new DMLRuntimeException( NOT_SUPPORTED_PB+" "+in ); |
| return pb; |
| } |
| |
| private static WhileProgramBlock rParseWhileProgramBlock( String in, Program prog, int id ) { |
| String lin = in.substring( PB_WHILE.length(),in.length()- PB_END.length()); |
| HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); |
| |
| //predicate instructions |
| ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id); |
| |
| //program blocks |
| ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id); |
| |
| WhileProgramBlock wpb = new WhileProgramBlock(prog,inst); |
| wpb.setChildBlocks(pbs); |
| return wpb; |
| } |
| |
| private static ForProgramBlock rParseForProgramBlock( String in, Program prog, int id ) { |
| String lin = in.substring( PB_FOR.length(),in.length()- PB_END.length()); |
| HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); |
| |
| //inputs |
| String iterVar = st.nextToken(); |
| |
| //instructions |
| ArrayList<Instruction> from = parseInstructions(st.nextToken(),id); |
| ArrayList<Instruction> to = parseInstructions(st.nextToken(),id); |
| ArrayList<Instruction> incr = parseInstructions(st.nextToken(),id); |
| |
| //program blocks |
| ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id); |
| |
| ForProgramBlock fpb = new ForProgramBlock(prog, iterVar); |
| fpb.setFromInstructions(from); |
| fpb.setToInstructions(to); |
| fpb.setIncrementInstructions(incr); |
| fpb.setChildBlocks(pbs); |
| |
| return fpb; |
| } |
| |
| private static ParForProgramBlock rParseParForProgramBlock( String in, Program prog, int id ) { |
| String lin = in.substring( PB_PARFOR.length(),in.length()- PB_END.length()); |
| HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); |
| |
| //inputs |
| String iterVar = st.nextToken(); |
| ArrayList<ResultVar> resultVars = parseResultVariables(st.nextToken()); |
| HashMap<String,String> params = parseStringHashMap(st.nextToken()); |
| |
| //instructions |
| ArrayList<Instruction> from = parseInstructions(st.nextToken(), 0); |
| ArrayList<Instruction> to = parseInstructions(st.nextToken(), 0); |
| ArrayList<Instruction> incr = parseInstructions(st.nextToken(), 0); |
| |
| //program blocks //reset id to preinit state, replaced during exec |
| ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, 0); |
| |
| ParForProgramBlock pfpb = new ParForProgramBlock(id, prog, iterVar, params, resultVars); |
| pfpb.disableOptimization(); //already done in top-level parfor |
| pfpb.setFromInstructions(from); |
| pfpb.setToInstructions(to); |
| pfpb.setIncrementInstructions(incr); |
| pfpb.setChildBlocks(pbs); |
| |
| return pfpb; |
| } |
| |
| private static IfProgramBlock rParseIfProgramBlock( String in, Program prog, int id ) { |
| String lin = in.substring( PB_IF.length(),in.length()- PB_END.length()); |
| HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); |
| |
| //predicate instructions |
| ArrayList<Instruction> inst = parseInstructions(st.nextToken(),id); |
| |
| //program blocks: if and else |
| ArrayList<ProgramBlock> pbs1 = rParseProgramBlocks(st.nextToken(), prog, id); |
| ArrayList<ProgramBlock> pbs2 = rParseProgramBlocks(st.nextToken(), prog, id); |
| |
| IfProgramBlock ipb = new IfProgramBlock(prog,inst); |
| ipb.setChildBlocksIfBody(pbs1); |
| ipb.setChildBlocksElseBody(pbs2); |
| |
| return ipb; |
| } |
| |
| private static FunctionProgramBlock rParseFunctionProgramBlock( String in, Program prog, int id ) { |
| String lin = in.substring( PB_FC.length(),in.length()- PB_END.length()); |
| HierarchyAwareStringTokenizer st = new HierarchyAwareStringTokenizer(lin, COMPONENTS_DELIM); |
| |
| //inputs and outputs |
| ArrayList<DataIdentifier> dat1 = parseDataIdentifiers(st.nextToken()); |
| ArrayList<DataIdentifier> dat2 = parseDataIdentifiers(st.nextToken()); |
| |
| //program blocks |
| ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, id); |
| |
| ArrayList<DataIdentifier> tmp1 = new ArrayList<>(dat1); |
| ArrayList<DataIdentifier> tmp2 = new ArrayList<>(dat2); |
| FunctionProgramBlock fpb = new FunctionProgramBlock(prog, tmp1, tmp2); |
| fpb.setChildBlocks(pbs); |
| |
| return fpb; |
| } |
| |
| private static ProgramBlock rParseGenericProgramBlock( String in, Program prog, int id ) { |
| String lin = in.substring( PB_BEGIN.length(),in.length()- PB_END.length()); |
| StringTokenizer st = new StringTokenizer(lin,COMPONENTS_DELIM); |
| BasicProgramBlock pb = new BasicProgramBlock(prog); |
| pb.setInstructions(parseInstructions(st.nextToken(),id)); |
| return pb; |
| } |
| |
| private static ArrayList<Instruction> parseInstructions( String in, int id ) { |
| ArrayList<Instruction> insts = new ArrayList<>(); |
| String lin = in.substring( INST_BEGIN.length(),in.length()- INST_END.length()); |
| StringTokenizer st = new StringTokenizer(lin, ELEMENT_DELIM); |
| while(st.hasMoreTokens()) { |
| //Note that at this point only CP instructions and External function instruction can occur |
| String instStr = st.nextToken(); |
| try { |
| Instruction tmpinst = CPInstructionParser.parseSingleInstruction(instStr); |
| tmpinst = saveReplaceThreadID(tmpinst, Lop.CP_ROOT_THREAD_ID, Lop.CP_CHILD_THREAD+id ); |
| insts.add( tmpinst ); |
| } |
| catch(Exception ex) { |
| throw new DMLRuntimeException("Failed to parse instruction: " + instStr, ex); |
| } |
| } |
| return insts; |
| } |
| |
| private static ArrayList<ResultVar> parseResultVariables(String in) { |
| ArrayList<ResultVar> ret = new ArrayList<>(); |
| for(String var : parseStringArrayList(in)) { |
| boolean accum = var.endsWith("+"); |
| ret.add(new ResultVar(accum ? var.substring(0, var.length()-1) : var, accum)); |
| } |
| return ret; |
| } |
| |
| private static HashMap<String,String> parseStringHashMap( String in ) { |
| HashMap<String,String> vars = new HashMap<>(); |
| StringTokenizer st = new StringTokenizer(in,ELEMENT_DELIM); |
| while( st.hasMoreTokens() ) { |
| String lin = st.nextToken(); |
| int index = lin.indexOf( KEY_VALUE_DELIM ); |
| String tmp1 = lin.substring(0, index); |
| String tmp2 = lin.substring(index + 1); |
| vars.put(tmp1, tmp2); |
| } |
| return vars; |
| } |
| |
| private static ArrayList<String> parseStringArrayList(String in) { |
| return parseStringArrayList(in, ELEMENT_DELIM); |
| } |
| |
| private static ArrayList<String> parseStringArrayList(String in, String delim) { |
| StringTokenizer st = new StringTokenizer(in, delim); |
| ArrayList<String> vars = new ArrayList<>(st.countTokens()); |
| while( st.hasMoreTokens() ) |
| vars.add(st.nextToken()); |
| return vars; |
| } |
| |
| private static ArrayList<DataIdentifier> parseDataIdentifiers( String in ) { |
| ArrayList<DataIdentifier> vars = new ArrayList<>(); |
| StringTokenizer st = new StringTokenizer(in, ELEMENT_DELIM); |
| while( st.hasMoreTokens() ) { |
| String tmp = st.nextToken(); |
| DataIdentifier dat = parseDataIdentifier( tmp ); |
| vars.add(dat); |
| } |
| return vars; |
| } |
| |
| private static DataIdentifier parseDataIdentifier( String in ) { |
| StringTokenizer st = new StringTokenizer(in, DATA_FIELD_DELIM); |
| DataIdentifier dat = new DataIdentifier(st.nextToken()); |
| dat.setDataType(DataType.valueOf(st.nextToken())); |
| dat.setValueType(ValueType.valueOf(st.nextToken())); |
| return dat; |
| } |
| |
| /** |
| * NOTE: MRJobConfiguration cannot be used for the general case because program blocks and |
| * related symbol tables can be hierarchically structured. |
| * |
| * @param in data object as string |
| * @return array of objects |
| */ |
| public static Object[] parseDataObject(String in) { |
| Object[] ret = new Object[2]; |
| |
| StringTokenizer st = new StringTokenizer(in, DATA_FIELD_DELIM ); |
| String name = st.nextToken(); |
| DataType datatype = DataType.valueOf( st.nextToken() ); |
| ValueType valuetype = ValueType.valueOf( st.nextToken() ); |
| String valString = st.hasMoreTokens() ? st.nextToken() : ""; |
| Data dat = null; |
| switch( datatype ) |
| { |
| case SCALAR: { |
| switch ( valuetype ) { |
| case INT64: dat = new IntObject(Long.parseLong(valString)); break; |
| case FP64: dat = new DoubleObject(Double.parseDouble(valString)); break; |
| case BOOLEAN: dat = new BooleanObject(Boolean.parseBoolean(valString)); break; |
| case STRING: dat = new StringObject(valString); break; |
| default: |
| throw new DMLRuntimeException("Unable to parse valuetype "+valuetype); |
| } |
| break; |
| } |
| case MATRIX: { |
| MatrixObject mo = new MatrixObject(valuetype,valString); |
| long rows = Long.parseLong( st.nextToken() ); |
| long cols = Long.parseLong( st.nextToken() ); |
| int blen = Integer.parseInt( st.nextToken() ); |
| long nnz = Long.parseLong( st.nextToken() ); |
| FileFormat fmt = FileFormat.safeValueOf(st.nextToken()); |
| PartitionFormat partFormat = PartitionFormat.valueOf( st.nextToken() ); |
| UpdateType inplace = UpdateType.valueOf( st.nextToken() ); |
| MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, blen, nnz); |
| MetaDataFormat md = new MetaDataFormat(mc, fmt); |
| mo.setMetaData( md ); |
| if( partFormat._dpf != PDataPartitionFormat.NONE ) |
| mo.setPartitioned( partFormat._dpf, partFormat._N ); |
| mo.setUpdateType(inplace); |
| mo.setHDFSFileExists(Boolean.valueOf(st.nextToken())); |
| mo.enableCleanup(Boolean.valueOf(st.nextToken())); |
| dat = mo; |
| break; |
| } |
| case LIST: |
| int size = Integer.parseInt(st.nextToken()); |
| String namesStr = st.nextToken(); |
| List<String> names = namesStr.equals(EMPTY) ? null : |
| parseStringArrayList(namesStr, ELEMENT_DELIM2); |
| List<Data> data = new ArrayList<>(size); |
| st.nextToken(LIST_ELEMENT_DELIM); |
| for (int i = 0; i < size; i++) { |
| String dataStr = st.nextToken(); |
| Object[] obj = parseDataObject(dataStr); |
| data.add((Data) obj[1]); |
| } |
| dat = new ListObject(data, names); |
| break; |
| default: |
| throw new DMLRuntimeException("Unable to parse datatype "+datatype); |
| } |
| |
| ret[0] = name; |
| ret[1] = dat; |
| return ret; |
| } |
| |
| private static ExecutionContext parseExecutionContext(String in, Program prog) { |
| ExecutionContext ec = null; |
| String lin = in.substring(EC_BEGIN.length(),in.length()- EC_END.length()).trim(); |
| if( !lin.equals( EMPTY ) ) { |
| LocalVariableMap vars = parseVariables(lin); |
| ec = ExecutionContextFactory.createContext( false, prog ); |
| ec.setVariables(vars); |
| } |
| return ec; |
| } |
| |
| private static void parseAndSetAdditionalConfigurations(String conf) { |
| String[] statsFlag = conf.split("="); |
| DMLScript.STATISTICS = Boolean.parseBoolean(statsFlag[1]); |
| } |
| |
| ////////// |
| // CUSTOM SAFE LITERAL REPLACEMENT |
| |
| /** |
| * In-place replacement of thread ids in filenames, functions names etc |
| * |
| * @param inst instruction |
| * @param pattern ? |
| * @param replacement string replacement |
| * @return instruction |
| */ |
| private static Instruction saveReplaceThreadID( Instruction inst, String pattern, String replacement ) { |
| if ( inst instanceof VariableCPInstruction ) { //createvar, setfilename |
| //update in-memory representation |
| inst.updateInstructionThreadID(pattern, replacement); |
| } |
| //NOTE> //Rand, seq in CP not required |
| return inst; |
| } |
| |
| public static String saveReplaceFilenameThreadID(String fname, String pattern, String replace) { |
| //save replace necessary in order to account for the possibility that read variables have our prefix in the absolute path |
| //replace the last match only, because (1) we have at most one _t0 and (2) always concatenated to the end. |
| int pos = fname.lastIndexOf(pattern); |
| return ( pos < 0 ) ? fname : fname.substring(0, pos) |
| + replace + fname.substring(pos+pattern.length()); |
| } |
| |
| |
| ////////// |
| // CUSTOM HIERARCHICAL TOKENIZER |
| |
| |
| /** |
| * Custom StringTokenizer for splitting strings of hierarchies. The basic idea is to |
| * search for delim-Strings on the same hierarchy level, while delims of lower hierarchy |
| * levels are skipped. |
| * |
| */ |
| private static class HierarchyAwareStringTokenizer //extends StringTokenizer |
| { |
| private String _str = null; |
| private String _del = null; |
| private int _off = -1; |
| |
| public HierarchyAwareStringTokenizer( String in, String delim ) { |
| //super(in); |
| _str = in; |
| _del = delim; |
| _off = delim.length(); |
| } |
| |
| public boolean hasMoreTokens() { |
| return (_str.length() > 0); |
| } |
| |
| public String nextToken() { |
| int nextDelim = determineNextSameLevelIndexOf(_str, _del); |
| String token = null; |
| if(nextDelim < 0) { |
| nextDelim = _str.length(); |
| _off = 0; |
| } |
| token = _str.substring(0,nextDelim); |
| _str = _str.substring( nextDelim + _off ); |
| return token; |
| } |
| |
| private static int determineNextSameLevelIndexOf( String data, String pattern ) |
| { |
| String tmpdata = data; |
| int index = 0; |
| int count = 0; |
| int off=0,i1,i2,i3,min; |
| |
| while(true) { |
| i1 = tmpdata.indexOf(pattern); |
| i2 = tmpdata.indexOf(LEVELIN); |
| i3 = tmpdata.indexOf(LEVELOUT); |
| |
| if( i1 < 0 ) return i1; //no pattern found at all |
| |
| min = i1; //min >= 0 by definition |
| if( i2 >= 0 ) min = Math.min(min, i2); |
| if( i3 >= 0 ) min = Math.min(min, i3); |
| |
| //stack maintenance |
| if( i1 == min && count == 0 ) |
| return index+i1; |
| else if( i2 == min ) { |
| count++; |
| off = LEVELIN.length(); |
| } |
| else if( i3 == min ) { |
| count--; |
| off = LEVELOUT.length(); |
| } |
| |
| //prune investigated string |
| index += min+off; |
| tmpdata = tmpdata.substring(min+off); |
| } |
| } |
| } |
| } |