/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.lops.compile;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.OpOpData;
import org.apache.sysds.conf.DMLConfig;
import org.apache.sysds.lops.Data;
import org.apache.sysds.lops.FunctionCallCP;
import org.apache.sysds.lops.Lop;
import org.apache.sysds.lops.Lop.Type;
import org.apache.sysds.lops.LopProperties.ExecType;
import org.apache.sysds.lops.LopsException;
import org.apache.sysds.lops.OutputParameters;
import org.apache.sysds.parser.DataExpression;
import org.apache.sysds.parser.StatementBlock;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
import org.apache.sysds.runtime.instructions.CPInstructionParser;
import org.apache.sysds.runtime.instructions.Instruction;
import org.apache.sysds.runtime.instructions.Instruction.IType;
import org.apache.sysds.runtime.instructions.InstructionParser;
import org.apache.sysds.runtime.instructions.SPInstructionParser;
import org.apache.sysds.runtime.instructions.cp.CPInstruction;
import org.apache.sysds.runtime.instructions.cp.CPInstruction.CPType;
import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
*
* Class to maintain a DAG of lops and compile it into
* runtime instructions, including piggybacking into jobs.
*
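* Example usage (a minimal sketch; the lops, the statement block {@code sb}, and the
* DML configuration {@code config} are assumed to be provided by the surrounding compiler):
* <pre>{@code
* Dag<Lop> dag = new Dag<>();
* for( Lop l : lops )
*   dag.addNode(l);
* ArrayList<Instruction> inst = dag.getJobs(sb, config);
* }</pre>
*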
* @param <N> the class parameter has no effect and is
* only kept for documentation purposes.
*/
public class Dag<N extends Lop>
{
private static final Log LOG = LogFactory.getLog(Dag.class.getName());
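// shared sequence generators for unique temporary file name suffixes (job_id)
// and unique variable names (var_index)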
private static IDSequence job_id = null;
private static IDSequence var_index = null;
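// scratch-space root (set from the DML config) and the derived per-process path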
private String scratch = "";
private String scratchFilePath = null;
// list of all nodes in the dag
private ArrayList<Lop> nodes = null;
static {
job_id = new IDSequence();
var_index = new IDSequence();
}
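/**
* Container for the per-node output handling: the resolved output format,
* the instructions to be added before the node's own instruction (e.g.,
* createvar), and the instructions to be added at the end of the program
* block (e.g., rmvar, mvvar, or write).
*/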
private static class NodeOutput {
FileFormat outInfo;
ArrayList<Instruction> preInstructions; // instructions added before the node's own instruction (e.g., createvar)
ArrayList<Instruction> lastInstructions;
NodeOutput() {
outInfo = null;
preInstructions = new ArrayList<>();
lastInstructions = new ArrayList<>();
}
public FileFormat getOutInfo() {
return outInfo;
}
public void setOutInfo(FileFormat outInfo) {
this.outInfo = outInfo;
}
public ArrayList<Instruction> getPreInstructions() {
return preInstructions;
}
public void addPreInstruction(Instruction inst) {
preInstructions.add(inst);
}
public ArrayList<Instruction> getLastInstructions() {
return lastInstructions;
}
public void addLastInstruction(Instruction inst) {
lastInstructions.add(inst);
}
}
public Dag() {
//allocate internal data structures
nodes = new ArrayList<>();
}
///////
// filename handling
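// returns (and lazily initializes) the scratch-space path used for
// intermediate files of this process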
private String getFilePath() {
if ( scratchFilePath == null ) {
scratchFilePath = scratch + Lop.FILE_SEPARATOR
+ Lop.PROCESS_PREFIX + DMLScript.getUUID()
+ Lop.FILE_SEPARATOR + Lop.FILE_SEPARATOR
+ Lop.CP_ROOT_THREAD_ID + Lop.FILE_SEPARATOR;
}
return scratchFilePath;
}
public static String getNextUniqueFilenameSuffix() {
return "temp" + job_id.getNextID();
}
public String getNextUniqueFilename() {
return getFilePath() + getNextUniqueFilenameSuffix();
}
public static String getNextUniqueVarname(DataType dt) {
return (dt.isMatrix() ? Lop.MATRIX_VAR_NAME_PREFIX :
dt.isFrame() ? Lop.FRAME_VAR_NAME_PREFIX :
Lop.SCALAR_VAR_NAME_PREFIX) + var_index.getNextID();
}
///////
// Dag modifications
/**
* Method to add a node to the DAG.
*
* @param node low-level operator
* @return true if the node was not already present, false otherwise.
*/
public boolean addNode(Lop node) {
if (nodes.contains(node))
return false;
nodes.add(node);
return true;
}
/**
* Method to compile a dag generically
*
* @param sb statement block
* @param config dml configuration
* @return list of instructions
*/
public ArrayList<Instruction> getJobs(StatementBlock sb, DMLConfig config) {
if (config != null) {
scratch = config.getTextValue(DMLConfig.SCRATCH_SPACE) + "/";
}
// create ordering of lops via a two-level sort: leaf/inner nodes by ID
// (depth-first scheduling), followed by root nodes in definition order
List<Lop> node_v =
//doTopologicalSortStrictOrder(nodes) :
doTopologicalSortTwoLevelOrder(nodes);
// do greedy grouping of operations
ArrayList<Instruction> inst =
//doGreedyGrouping(sb, node_v) :
doPlainInstructionGen(sb, node_v);
// cleanup instruction (e.g., create packed rmvar instructions)
return cleanupInstructions(inst);
}
private static List<Lop> doTopologicalSortTwoLevelOrder(List<Lop> v) {
//partition nodes into leaf/inner nodes and dag root nodes,
//+ sort leaf/inner nodes by ID to force depth-first scheduling
//+ append root nodes in order of their original definition
// (which also preserves the original order of prints)
List<Lop> nodes = Stream.concat(
v.stream().filter(l -> !l.getOutputs().isEmpty()).sorted(Comparator.comparing(l -> l.getID())),
v.stream().filter(l -> l.getOutputs().isEmpty())).collect(Collectors.toList());
//NOTE: in contrast to hadoop execution modes, we avoid computing the transitive
//closure here to ensure linear time complexity because it is unnecessary for CP and Spark
return nodes;
}
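// Generates the instruction list for a single statement block: rmvar
// instructions for updated transient reads, createvar instructions for
// persistent inputs, the instructions of all executable nodes, and finally
// the write/delete/end-of-block instructions.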
private ArrayList<Instruction> doPlainInstructionGen(StatementBlock sb, List<Lop> nodes)
{
//prepare basic instruction sets
List<Instruction> deleteInst = new ArrayList<>();
List<Instruction> writeInst = deleteUpdatedTransientReadVariables(sb, nodes);
List<Instruction> endOfBlockInst = generateRemoveInstructions(sb);
ArrayList<Instruction> inst = generateInstructionsForInputVariables(nodes);
// filter out non-executable nodes
List<Lop> execNodes = nodes.stream()
.filter(l -> (!l.isDataExecLocation()
|| (((Data)l).getOperationType().isWrite() && !isTransientWriteRead((Data)l))
|| (((Data)l).isPersistentRead() && l.getDataType().isScalar())))
.collect(Collectors.toList());
// generate executable instruction
generateControlProgramJobs(execNodes, inst, writeInst, deleteInst);
// add write and delete inst at the very end.
inst.addAll(writeInst);
inst.addAll(deleteInst);
inst.addAll(endOfBlockInst);
return inst;
}
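// Checks whether a transient write merely re-assigns the output of a
// transient read with the same variable name (in which case the write
// can be skipped).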
private static boolean isTransientWriteRead(Data dnode) {
Lop input = dnode.getInputs().get(0);
return dnode.getOperationType().isTransient()
&& input.isDataExecLocation() && ((Data)input).getOperationType().isTransient()
&& dnode.getOutputParameters().getLabel().equals(input.getOutputParameters().getLabel());
}
private static List<Instruction> deleteUpdatedTransientReadVariables(StatementBlock sb, List<Lop> nodeV) {
List<Instruction> insts = new ArrayList<>();
if ( sb == null ) //return modifiable list
return insts;
if( LOG.isTraceEnabled() )
LOG.trace("In delete updated variables");
// CANDIDATE list of variables which could have been updated in this statement block
HashMap<String, Lop> labelNodeMapping = new HashMap<>();
// ACTUAL list of variables whose value is updated, AND the old value of the variable
// is no longer accessible/used.
HashSet<String> updatedLabels = new HashSet<>();
HashMap<String, Lop> updatedLabelsLineNum = new HashMap<>();
// first capture all transient read variables
for ( Lop node : nodeV ) {
if (node.isDataExecLocation()
&& ((Data) node).getOperationType().isTransient()
&& ((Data) node).getOperationType().isRead()
&& ((Data) node).getDataType() == DataType.MATRIX) {
// "node" is considered as updated ONLY IF the old value is not used any more
// So, make sure that this READ node does not feed into any (transient/persistent) WRITE
boolean hasWriteParent=false;
for(Lop p : node.getOutputs()) {
if(p.isDataExecLocation()) {
// if the "p" is of type Data, then it has to be a WRITE
hasWriteParent = true;
break;
}
}
if ( !hasWriteParent ) {
// node has no parent of type WRITE, so this is a CANDIDATE variable
// add it to labelNodeMapping so that it is considered in further processing
labelNodeMapping.put(node.getOutputParameters().getLabel(), node);
}
}
}
// capture updated transient write variables
for ( Lop node : nodeV ) {
if (node.isDataExecLocation()
&& ((Data) node).getOperationType().isTransient()
&& ((Data) node).getOperationType().isWrite()
&& ((Data) node).getDataType() == DataType.MATRIX
&& labelNodeMapping.containsKey(node.getOutputParameters().getLabel()) // check to make sure corresponding (i.e., with the same label/name) transient read is present
&& !labelNodeMapping.containsValue(node.getInputs().get(0)) ){ // check to avoid cases where transient read feeds into a transient write
updatedLabels.add(node.getOutputParameters().getLabel());
updatedLabelsLineNum.put(node.getOutputParameters().getLabel(), node);
}
}
// generate RM instructions
Instruction rm_inst = null;
for ( String label : updatedLabels ) {
rm_inst = VariableCPInstruction.prepareRemoveInstruction(label);
rm_inst.setLocation(updatedLabelsLineNum.get(label));
if( LOG.isTraceEnabled() )
LOG.trace(rm_inst.toString());
insts.add(rm_inst);
}
return insts;
}
private static List<Instruction> generateRemoveInstructions(StatementBlock sb) {
if ( sb == null )
return Collections.emptyList();
ArrayList<Instruction> insts = new ArrayList<>();
if( LOG.isTraceEnabled() )
LOG.trace("In generateRemoveInstructions()");
// RULE 1: if in IN and not in OUT, then there should be an rmvar or rmfilevar inst
// (currently required for specific cases of external functions)
for (String varName : sb.liveIn().getVariableNames()) {
if (!sb.liveOut().containsVariable(varName)) {
Instruction inst = VariableCPInstruction.prepareRemoveInstruction(varName);
inst.setLocation(sb.getFilename(), sb.getEndLine(), sb.getEndLine(), -1, -1);
insts.add(inst);
if( LOG.isTraceEnabled() )
LOG.trace(" Adding " + inst.toString());
}
}
return insts;
}
/**
* Method to generate createvar instructions, each of which creates a new entry
* in the symbol table. One instruction is generated for every LOP that is
* 1) type Data and
* 2) persistent and
* 3) matrix and
* 4) read
*
* Transient reads needn't be considered here since the previous program
* block would already create appropriate entries in the symbol table.
*
* @param nodes_v list of nodes
* @return list of instructions
*/
private static ArrayList<Instruction> generateInstructionsForInputVariables(List<Lop> nodes_v) {
ArrayList<Instruction> insts = new ArrayList<>();
for(Lop n : nodes_v) {
if (n.isDataExecLocation()
&& !((Data) n).getOperationType().isTransient()
&& ((Data) n).getOperationType().isRead()
&& (n.getDataType() == DataType.MATRIX || n.getDataType() == DataType.FRAME) )
{
if ( !((Data)n).isLiteral() ) {
try {
String inst_string = n.getInstructions();
CPInstruction currInstr = CPInstructionParser.parseSingleInstruction(inst_string);
currInstr.setLocation(n);
// TODO find a more direct way of communicating the privacy constraints
// (visible to runtime explain); This change should apply to all occurrences.
currInstr.setPrivacyConstraint(n);
insts.add(currInstr);
} catch (DMLRuntimeException e) {
throw new LopsException(n.printErrorLocation() + "error generating instructions from input variables in Dag -- \n", e);
}
}
}
}
return insts;
}
/**
* Exclude rmvar instruction for varname from deleteInst, if exists
*
* @param varName variable name
* @param deleteInst list of instructions
*/
private static void excludeRemoveInstruction(String varName, List<Instruction> deleteInst) {
for(int i=0; i < deleteInst.size(); i++) {
Instruction inst = deleteInst.get(i);
if ((inst.getType() == IType.CONTROL_PROGRAM || inst.getType() == IType.SPARK)
&& ((CPInstruction)inst).getCPInstructionType() == CPType.Variable
&& ((VariableCPInstruction)inst).isRemoveVariable(varName) ) {
deleteInst.remove(i);
}
}
}
/**
* Generate rmvar instructions for the inputs, if their consumer count becomes zero.
*
* @param node low-level operator
* @param inst list of instructions
* @param deleteInst list of delete instructions
*/
private static void processConsumersForInputs(Lop node, List<Instruction> inst, List<Instruction> deleteInst) {
// reduce the consumer count for all input lops
// if the count becomes zero, then the variable associated w/ the input can be removed
for(Lop in : node.getInputs() )
processConsumers(in, inst, deleteInst, null);
}
private static void processConsumers(Lop node, List<Instruction> inst, List<Instruction> deleteInst, Lop locationInfo) {
// reduce the consumer count for all input lops
// if the count becomes zero, then the variable associated w/ the input can be removed
if ( node.removeConsumer() == 0 ) {
if ( node.isDataExecLocation() && ((Data)node).isLiteral() ) {
return;
}
String label = node.getOutputParameters().getLabel();
Instruction currInstr = VariableCPInstruction.prepareRemoveInstruction(label);
if (locationInfo != null)
currInstr.setLocation(locationInfo);
else
{
currInstr.setLocation(node);
currInstr.setPrivacyConstraint(node);
}
inst.add(currInstr);
excludeRemoveInstruction(label, deleteInst);
}
}
/**
* Method to generate instructions that are executed in the control program.
* At this point, this DAG has no dependencies on an MR dag, i.e., none of
* its inputs are outputs of MR jobs.
*
* @param execNodes list of low-level operators
* @param inst list of instructions
* @param writeInst list of write instructions
* @param deleteInst list of delete instructions
*/
private void generateControlProgramJobs(List<Lop> execNodes,
List<Instruction> inst, List<Instruction> writeInst, List<Instruction> deleteInst) {
// nodes to be deleted from execnodes
ArrayList<Lop> markedNodes = new ArrayList<>();
// variable names to be deleted
ArrayList<String> var_deletions = new ArrayList<>();
HashMap<String, Lop> var_deletionsLineNum = new HashMap<>();
boolean doRmVar = false;
for (int i = 0; i < execNodes.size(); i++) {
Lop node = execNodes.get(i);
doRmVar = false;
// mark input scalar read nodes for deletion
if (node.isDataExecLocation()
&& ((Data) node).getOperationType().isRead()
&& ((Data) node).getDataType() == DataType.SCALAR
&& node.getOutputParameters().getFile_name() == null ) {
markedNodes.add(node);
continue;
}
// output scalar instructions and mark nodes for deletion
if (!node.isDataExecLocation()) {
if (node.getDataType() == DataType.SCALAR) {
// Output from lops with SCALAR data type must
// go into Temporary Variables (Var0, Var1, etc.)
NodeOutput out = setupNodeOutputs(node, ExecType.CP, false, false);
inst.addAll(out.getPreInstructions()); // dummy
deleteInst.addAll(out.getLastInstructions());
} else {
// Output from lops with non-SCALAR data type must
// go into Temporary Files (temp0, temp1, etc.)
NodeOutput out = setupNodeOutputs(node, ExecType.CP, false, false);
inst.addAll(out.getPreInstructions());
boolean hasTransientWriteParent = false;
for ( Lop parent : node.getOutputs() ) {
if ( parent.isDataExecLocation()
&& ((Data)parent).getOperationType().isWrite()
&& ((Data)parent).getOperationType().isTransient() ) {
hasTransientWriteParent = true;
break;
}
}
if ( !hasTransientWriteParent ) {
deleteInst.addAll(out.getLastInstructions());
}
else {
var_deletions.add(node.getOutputParameters().getLabel());
var_deletionsLineNum.put(node.getOutputParameters().getLabel(), node);
}
}
String inst_string = "";
// Lops with arbitrary number of inputs (ParameterizedBuiltin, GroupedAggregate, DataGen)
// are handled separately, by simply passing ONLY the output variable to getInstructions()
if (node.getType() == Lop.Type.ParameterizedBuiltin
|| node.getType() == Lop.Type.GroupedAgg
|| node.getType() == Lop.Type.DataGen){
inst_string = node.getInstructions(node.getOutputParameters().getLabel());
}
// Lops with arbitrary number of inputs and outputs are handled
// separately as well by passing arrays of inputs and outputs
else if ( node.getType() == Lop.Type.FunctionCallCP )
{
String[] inputs = new String[node.getInputs().size()];
String[] outputs = new String[node.getOutputs().size()];
int count = 0;
for( Lop in : node.getInputs() )
inputs[count++] = in.getOutputParameters().getLabel();
count = 0;
for( Lop out : node.getOutputs() )
outputs[count++] = out.getOutputParameters().getLabel();
inst_string = node.getInstructions(inputs, outputs);
}
else if (node.getType() == Lop.Type.Nary) {
String[] inputs = new String[node.getInputs().size()];
int count = 0;
for( Lop in : node.getInputs() )
inputs[count++] = in.getOutputParameters().getLabel();
inst_string = node.getInstructions(inputs,
node.getOutputParameters().getLabel());
}
else {
if ( node.getInputs().isEmpty() ) {
// currently, such a case exists only for Rand lop
inst_string = node.getInstructions(node.getOutputParameters().getLabel());
}
else if (node.getInputs().size() == 1) {
inst_string = node.getInstructions(node.getInputs()
.get(0).getOutputParameters().getLabel(),
node.getOutputParameters().getLabel());
}
else if (node.getInputs().size() == 2) {
inst_string = node.getInstructions(
node.getInputs().get(0).getOutputParameters().getLabel(),
node.getInputs().get(1).getOutputParameters().getLabel(),
node.getOutputParameters().getLabel());
}
else if (node.getInputs().size() == 3 || node.getType() == Type.Ctable) {
inst_string = node.getInstructions(
node.getInputs().get(0).getOutputParameters().getLabel(),
node.getInputs().get(1).getOutputParameters().getLabel(),
node.getInputs().get(2).getOutputParameters().getLabel(),
node.getOutputParameters().getLabel());
}
else if (node.getInputs().size() == 4) {
inst_string = node.getInstructions(
node.getInputs().get(0).getOutputParameters().getLabel(),
node.getInputs().get(1).getOutputParameters().getLabel(),
node.getInputs().get(2).getOutputParameters().getLabel(),
node.getInputs().get(3).getOutputParameters().getLabel(),
node.getOutputParameters().getLabel());
}
else if (node.getInputs().size() == 5) {
inst_string = node.getInstructions(
node.getInputs().get(0).getOutputParameters().getLabel(),
node.getInputs().get(1).getOutputParameters().getLabel(),
node.getInputs().get(2).getOutputParameters().getLabel(),
node.getInputs().get(3).getOutputParameters().getLabel(),
node.getInputs().get(4).getOutputParameters().getLabel(),
node.getOutputParameters().getLabel());
}
else if (node.getInputs().size() == 6) {
inst_string = node.getInstructions(
node.getInputs().get(0).getOutputParameters().getLabel(),
node.getInputs().get(1).getOutputParameters().getLabel(),
node.getInputs().get(2).getOutputParameters().getLabel(),
node.getInputs().get(3).getOutputParameters().getLabel(),
node.getInputs().get(4).getOutputParameters().getLabel(),
node.getInputs().get(5).getOutputParameters().getLabel(),
node.getOutputParameters().getLabel());
}
else if (node.getInputs().size() == 7) {
inst_string = node.getInstructions(
node.getInputs().get(0).getOutputParameters().getLabel(),
node.getInputs().get(1).getOutputParameters().getLabel(),
node.getInputs().get(2).getOutputParameters().getLabel(),
node.getInputs().get(3).getOutputParameters().getLabel(),
node.getInputs().get(4).getOutputParameters().getLabel(),
node.getInputs().get(5).getOutputParameters().getLabel(),
node.getInputs().get(6).getOutputParameters().getLabel(),
node.getOutputParameters().getLabel());
}
else {
String[] inputs = new String[node.getInputs().size()];
for( int j=0; j<node.getInputs().size(); j++ )
inputs[j] = node.getInputs().get(j).getOutputParameters().getLabel();
inst_string = node.getInstructions(inputs,
node.getOutputParameters().getLabel());
}
}
try {
if( LOG.isTraceEnabled() )
LOG.trace("Generating instruction - "+ inst_string);
Instruction currInstr = InstructionParser.parseSingleInstruction(inst_string);
if(currInstr == null) {
throw new LopsException("Error parsing the instruction:" + inst_string);
}
if (node._beginLine != 0)
{
currInstr.setLocation(node);
currInstr.setPrivacyConstraint(node);
}
else if ( !node.getOutputs().isEmpty() )
{
currInstr.setLocation(node.getOutputs().get(0));
currInstr.setPrivacyConstraint(node.getOutputs().get(0));
}
else if ( !node.getInputs().isEmpty() )
{
currInstr.setLocation(node.getInputs().get(0));
currInstr.setPrivacyConstraint(node.getInputs().get(0));
}
inst.add(currInstr);
} catch (Exception e) {
throw new LopsException(node.printErrorLocation() + "Problem generating simple inst - "
+ inst_string, e);
}
markedNodes.add(node);
doRmVar = true;
}
else if (node.isDataExecLocation() ) {
Data dnode = (Data)node;
OpOpData op = dnode.getOperationType();
if ( op.isWrite() ) {
NodeOutput out = null;
out = setupNodeOutputs(node, ExecType.CP, false, false);
if ( dnode.getDataType() == DataType.SCALAR ) {
// processing is same for both transient and persistent scalar writes
writeInst.addAll(out.getLastInstructions());
doRmVar = false;
}
else {
// setupNodeOutputs() handles both transient and persistent matrix writes
if ( dnode.getOperationType().isTransient() ) {
deleteInst.addAll(out.getLastInstructions());
doRmVar = false;
}
else {
// In case of persistent write lop, write instruction will be generated
// and that instruction must be added to <code>inst</code> so that it gets
// executed immediately. If it is added to <code>deleteInst</code> then it
// gets executed at the end of program block's execution
inst.addAll(out.getLastInstructions());
doRmVar = true;
}
}
markedNodes.add(node);
}
else {
// generate a temp label to hold the value that is read from HDFS
if ( node.getDataType() == DataType.SCALAR ) {
node.getOutputParameters().setLabel(Lop.SCALAR_VAR_NAME_PREFIX + var_index.getNextID());
String io_inst = node.getInstructions(node.getOutputParameters().getLabel(),
node.getOutputParameters().getFile_name());
CPInstruction currInstr = CPInstructionParser.parseSingleInstruction(io_inst);
currInstr.setLocation(node);
inst.add(currInstr);
Instruction tempInstr = VariableCPInstruction.prepareRemoveInstruction(node.getOutputParameters().getLabel());
tempInstr.setLocation(node);
deleteInst.add(tempInstr);
}
else {
throw new LopsException("Matrix READs are not handled in CP yet!");
}
markedNodes.add(node);
doRmVar = true;
}
}
// see if rmvar instructions can be generated for node's inputs
if(doRmVar)
processConsumersForInputs(node, inst, deleteInst);
doRmVar = false;
}
for ( String var : var_deletions ) {
Instruction rmInst = VariableCPInstruction.prepareRemoveInstruction(var);
if( LOG.isTraceEnabled() )
LOG.trace(" Adding var_deletions: " + rmInst.toString());
rmInst.setLocation(var_deletionsLineNum.get(var));
deleteInst.add(rmInst);
}
// delete all marked nodes
for ( Lop node : markedNodes ) {
execNodes.remove(node);
}
}
/**
* Method that determines the output format for a given node.
*
* @param node low-level operator
* @param cellModeOverride override mode
* @return output info
*/
private static FileFormat getOutputFileFormat(Lop node, boolean cellModeOverride)
{
if ( (node.getDataType() == DataType.SCALAR && node.getExecType() == ExecType.CP)
|| node instanceof FunctionCallCP )
return null;
OutputParameters oparams = node.getOutputParameters();
return oparams.getFormat();
}
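// Builds the string form of a CP assignvar instruction that assigns the
// scalar input operand to the node's output variable.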
private static String prepareAssignVarInstruction(Lop input, Lop node) {
StringBuilder sb = new StringBuilder();
sb.append(ExecType.CP);
sb.append(Lop.OPERAND_DELIMITOR);
sb.append("assignvar");
sb.append(Lop.OPERAND_DELIMITOR);
sb.append( input.prepScalarInputOperand(ExecType.CP) );
sb.append(Lop.OPERAND_DELIMITOR);
sb.append(node.prepOutputOperand());
return sb.toString();
}
/**
* Method to setup output filenames and outputInfos, and to generate related instructions
*
* @param node low-level operator
* @param et exec type
* @param cellModeOverride override mode
* @param copyTWrite if true, emit a copy (cpvar) instruction for transient writes instead of the default temp-variable move (mvvar)
* @return node output
*/
private NodeOutput setupNodeOutputs(Lop node, ExecType et, boolean cellModeOverride, boolean copyTWrite) {
OutputParameters oparams = node.getOutputParameters();
NodeOutput out = new NodeOutput();
node.setConsumerCount(node.getOutputs().size());
// Compute the output format for this node
out.setOutInfo(getOutputFileFormat(node, cellModeOverride));
// If node is NOT of type Data then we must generate
// a variable to hold the value produced by this node
// note: functioncallcp requires no createvar, rmvar since
// since outputs are explicitly specified
if( !node.isDataExecLocation() )
{
if (node.getDataType() == DataType.SCALAR || node.getDataType() == DataType.LIST) {
oparams.setLabel(Lop.SCALAR_VAR_NAME_PREFIX + var_index.getNextID());
Instruction currInstr = VariableCPInstruction.prepareRemoveInstruction(oparams.getLabel());
currInstr.setLocation(node);
currInstr.setPrivacyConstraint(node);
out.addLastInstruction(currInstr);
}
else if(!(node instanceof FunctionCallCP)) //general case
{
// generate temporary filename and a variable name to hold the
// output produced by "rootNode"
oparams.setFile_name(getNextUniqueFilename());
oparams.setLabel(getNextUniqueVarname(node.getDataType()));
// generate an instruction that creates a symbol table entry for the new variable
//String createInst = prepareVariableInstruction("createvar", node);
//out.addPreInstruction(CPInstructionParser.parseSingleInstruction(createInst));
int blen = (int) oparams.getBlocksize();
Instruction createvarInst = VariableCPInstruction.prepCreatevarInstruction(
oparams.getLabel(), oparams.getFile_name(), true, node.getDataType(),
getOutputFileFormat(node, false).toString(),
new MatrixCharacteristics(oparams.getNumRows(), oparams.getNumCols(), blen, oparams.getNnz()),
oparams.getUpdateType());
createvarInst.setLocation(node);
createvarInst.setPrivacyConstraint(node);
out.addPreInstruction(createvarInst);
// temp file as well as the variable has to be deleted at the end
Instruction currInstr = VariableCPInstruction.prepareRemoveInstruction(oparams.getLabel());
currInstr.setLocation(node);
currInstr.setPrivacyConstraint(node);
out.addLastInstruction(currInstr);
}
else {
// If the function call is set with output lops (e.g., multi return builtin),
// generate a createvar instruction for each function output
// (except for remove, which creates list outputs, i.e., meta data objects)
FunctionCallCP fcall = (FunctionCallCP) node;
if ( fcall.getFunctionOutputs() != null && fcall.requiresOutputCreateVar() ) {
for( Lop fnOut: fcall.getFunctionOutputs()) {
OutputParameters fnOutParams = fnOut.getOutputParameters();
//OutputInfo oinfo = getOutputInfo((N)fnOut, false);
Instruction createvarInst = VariableCPInstruction.prepCreatevarInstruction(
fnOutParams.getLabel(), getFilePath() + fnOutParams.getLabel(),
true, fnOut.getDataType(), getOutputFileFormat(fnOut, false).toString(),
new MatrixCharacteristics(fnOutParams.getNumRows(), fnOutParams.getNumCols(), (int)fnOutParams.getBlocksize(), fnOutParams.getNnz()),
oparams.getUpdateType());
if (node._beginLine != 0){
createvarInst.setLocation(node);
createvarInst.setPrivacyConstraint(node);
}
else {
createvarInst.setLocation(fnOut);
createvarInst.setPrivacyConstraint(fnOut);
}
out.addPreInstruction(createvarInst);
}
}
}
}
// rootNode is of type Data
else {
if ( node.getDataType() == DataType.SCALAR ) {
// generate assignment operations for final and transient writes
if ( oparams.getFile_name() == null && !(node instanceof Data && ((Data)node).isPersistentWrite()) ) {
String io_inst = prepareAssignVarInstruction(node.getInputs().get(0), node);
CPInstruction currInstr = CPInstructionParser.parseSingleInstruction(io_inst);
if (node._beginLine != 0)
currInstr.setLocation(node);
else if ( !node.getInputs().isEmpty() )
currInstr.setLocation(node.getInputs().get(0));
out.addLastInstruction(currInstr);
}
else {
//CP PERSISTENT WRITE SCALARS
Lop fname = ((Data)node).getNamedInputLop(DataExpression.IO_FILENAME);
String io_inst = node.getInstructions(node.getInputs().get(0).getOutputParameters().getLabel(), fname.getOutputParameters().getLabel());
CPInstruction currInstr = CPInstructionParser.parseSingleInstruction(io_inst);
if (node._beginLine != 0)
currInstr.setLocation(node);
else if ( !node.getInputs().isEmpty() )
currInstr.setLocation(node.getInputs().get(0));
out.addLastInstruction(currInstr);
}
}
else {
if ( ((Data)node).getOperationType().isTransient() ) {
if ( et == ExecType.CP ) {
// If transient matrix write is in CP then its input MUST be executed in CP as well.
// get variable and filename associated with the input
String inputVarName = node.getInputs().get(0).getOutputParameters().getLabel();
String constVarName = oparams.getLabel();
/*
* Symbol Table state must change as follows:
*
* FROM:
* mVar1 -> temp21
*
* TO:
* mVar1 -> temp21
* tVarH -> temp21
*/
Instruction currInstr = VariableCPInstruction.prepareCopyInstruction(inputVarName, constVarName);
currInstr.setLocation(node);
out.addLastInstruction(currInstr);
}
else {
if(copyTWrite) {
Instruction currInstr = VariableCPInstruction.prepareCopyInstruction(node.getInputs().get(0).getOutputParameters().getLabel(), oparams.getLabel());
currInstr.setLocation(node);
out.addLastInstruction(currInstr);
return out;
}
/*
* Since the "rootNode" is a transient data node, we first need to generate a
* temporary filename as well as a variable name to hold the <i>immediate</i>
* output produced by "rootNode". These generated HDFS filename and the
* variable name must be changed at the end of an iteration/program block
* so that the subsequent iteration/program block can correctly access the
* generated data. Therefore, we need to distinguish between the following:
*
* 1) Temporary file name & variable name: They hold the immediate output
* produced by "rootNode". Both names are generated below.
*
* 2) Constant file name & variable name: They are constant across iterations.
* Variable name is given by rootNode's label that is created in the upper layers.
* File name is generated by concatenating "temporary file name" and "constant variable name".
*
* Temporary files must be moved to constant files at the end of the iteration/program block.
*/
// generate temporary filename & var name
String tempVarName = oparams.getLabel() + "temp";
String tempFileName = getNextUniqueFilename();
int blen = (int) oparams.getBlocksize();
Instruction createvarInst = VariableCPInstruction.prepCreatevarInstruction(
tempVarName, tempFileName, true, node.getDataType(), out.getOutInfo().toString(),
new MatrixCharacteristics(oparams.getNumRows(), oparams.getNumCols(), blen, oparams.getNnz()),
oparams.getUpdateType());
createvarInst.setLocation(node);
out.addPreInstruction(createvarInst);
String constVarName = oparams.getLabel();
String constFileName = tempFileName + constVarName;
oparams.setFile_name(getFilePath() + constFileName);
/*
* Since this is a node that denotes a transient read/write, we need to make sure
* that the data computed for a given variable in a given iteration is passed on
* to the next iteration. This is done by generating miscellaneous instructions
* that gets executed at the end of the program block.
*
* The state of the symbol table must change
*
* FROM:
* tVarA -> temp21tVarA (old copy of temp21)
* tVarAtemp -> temp21 (new copy that should override the old copy)
*
* TO:
* tVarA -> temp21tVarA
*/
// Generate a single mvvar instruction (e.g., mvvar tempA A)
// instead of two instructions "cpvar tempA A" and "rmvar tempA"
Instruction currInstr = VariableCPInstruction.prepMoveInstruction(tempVarName, constVarName);
currInstr.setLocation(node);
out.addLastInstruction(currInstr);
}
}
// rootNode is not a transient write. It is a persistent write.
else {
{ //CP PERSISTENT WRITE
// generate a write instruction that writes matrix to HDFS
Lop fname = ((Data)node).getNamedInputLop(DataExpression.IO_FILENAME);
String io_inst = node.getInstructions(
node.getInputs().get(0).getOutputParameters().getLabel(),
fname.getOutputParameters().getLabel());
Instruction currInstr = (node.getExecType() == ExecType.SPARK) ?
SPInstructionParser.parseSingleInstruction(io_inst) :
CPInstructionParser.parseSingleInstruction(io_inst);
Lop useNode = (!node.getInputs().isEmpty()
&& node.getInputs().get(0)._beginLine != 0) ? node.getInputs().get(0) : node;
currInstr.setLocation(useNode);
currInstr.setPrivacyConstraint(useNode);
out.addLastInstruction(currInstr);
}
}
}
}
return out;
}
/**
* Performs various cleanups on the list of instructions in order to reduce the
* number of instructions, simplify debugging, and reduce interpretation overhead.
*
* @param insts list of instructions
* @return new list of potentially modified instructions
*/
private static ArrayList<Instruction> cleanupInstructions(List<Instruction> insts) {
//step 1: create mvvar instructions: assignvar s1 s2, rmvar s1 -> mvvar s1 s2,
// cpvar m1 m2, rmvar m1 -> mvvar m1 m2
List<Instruction> tmp1 = collapseAssignvarAndRmvarInstructions(insts);
//step 2: create packed rmvar instructions: rmvar m1, rmvar m2 -> rmvar m1 m2
ArrayList<Instruction> tmp2 = createPackedRmvarInstructions(tmp1);
return tmp2;
}
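// Replaces adjacent assignvar/cpvar and rmvar instructions on the same
// variable with a single mvvar instruction.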
private static List<Instruction> collapseAssignvarAndRmvarInstructions(List<Instruction> insts) {
ArrayList<Instruction> ret = new ArrayList<>();
Iterator<Instruction> iter = insts.iterator();
while( iter.hasNext() ) {
Instruction inst = iter.next();
if( iter.hasNext() && inst instanceof VariableCPInstruction
&& ((VariableCPInstruction)inst).isAssignOrCopyVariable() ) {
VariableCPInstruction inst1 = (VariableCPInstruction) inst;
Instruction inst2 = iter.next();
if( inst2 instanceof VariableCPInstruction
&& ((VariableCPInstruction)inst2).isRemoveVariableNoFile()
&& inst1.getInput1().getName().equals(
((VariableCPInstruction)inst2).getInput1().getName()) ) {
ret.add(VariableCPInstruction.prepMoveInstruction(
inst1.getInput1().getName(), inst1.getInput2().getName()));
}
else {
ret.add(inst1);
ret.add(inst2);
}
}
else {
ret.add(inst);
}
}
return ret;
}
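// Merges runs of consecutive rmvar instructions into a single packed
// rmvar instruction over all collected variable names.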
private static ArrayList<Instruction> createPackedRmvarInstructions(List<Instruction> insts) {
ArrayList<Instruction> ret = new ArrayList<>();
ArrayList<String> currRmVar = new ArrayList<>();
for( Instruction inst : insts ) {
if( inst instanceof VariableCPInstruction
&& ((VariableCPInstruction)inst).isRemoveVariableNoFile() ) {
//collect all subsequent rmvar instructions
currRmVar.add(((VariableCPInstruction)inst).getInput1().getName());
}
else {
//construct packed rmvar instruction
if( !currRmVar.isEmpty() ) {
ret.add(VariableCPInstruction.prepareRemoveInstruction(
currRmVar.toArray(new String[0])));
currRmVar.clear();
}
//add other instruction
ret.add(inst);
}
}
//construct last packed rmvar instruction
if( !currRmVar.isEmpty() ) {
ret.add(VariableCPInstruction.prepareRemoveInstruction(
currRmVar.toArray(new String[0])));
}
return ret;
}
}