[SYSTEMDS-2650] Re-computation from lineage with dedup
This patch makes the following changes:
- Compile all the dedup patches into functions,
- Have the main program place a function call for each dedupOp
and call the corresponding function,
- Move the recomputation-related code to a new class,
- Add a new test class that checks the recomputed results
against the original outputs.
The current code doesn't support multiple loops yet. Future commits
will add optimizations to construct multi-return functions
(instead of one per output variable), and to compile sequences
of equivalent function calls into loops.
diff --git a/src/main/java/org/apache/sysds/parser/DataIdentifier.java b/src/main/java/org/apache/sysds/parser/DataIdentifier.java
index b58b0d9..22002d6 100644
--- a/src/main/java/org/apache/sysds/parser/DataIdentifier.java
+++ b/src/main/java/org/apache/sysds/parser/DataIdentifier.java
@@ -20,6 +20,7 @@
package org.apache.sysds.parser;
import org.apache.sysds.common.Types.DataType;
+import org.apache.sysds.common.Types.ValueType;
public class DataIdentifier extends Identifier
{
@@ -44,6 +45,12 @@
this(name);
_dataType = dt;
}
+
+ public DataIdentifier(String name, DataType dt, ValueType vt){
+ this(name);
+ _dataType = dt;
+ _valueType = vt;
+ }
public DataIdentifier(){
_name = null;
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java
index 6acc248..230e5e1 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java
@@ -20,6 +20,7 @@
package org.apache.sysds.runtime.lineage;
import java.util.ArrayList;
+import java.util.Map;
import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
import org.apache.sysds.runtime.controlprogram.ForProgramBlock;
@@ -28,8 +29,10 @@
import org.apache.sysds.runtime.controlprogram.ProgramBlock;
import org.apache.sysds.runtime.controlprogram.WhileProgramBlock;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.utils.Explain;
public class LineageDedupUtils {
+ public static final String DEDUP_DELIM = "_";
private static Lineage _tmpLineage = null;
private static Lineage _mainLineage = null;
private static ArrayList<Long> _numDistinctPaths = new ArrayList<>();
@@ -131,7 +134,38 @@
_tmpLineage.clearLineageMap();
_tmpLineage.clearDedupBlock();
}
+
+ public static String mergeExplainDedupBlocks(ExecutionContext ec) {
+ Map<ProgramBlock, LineageDedupBlock> dedupBlocks = ec.getLineage().getDedupBlocks();
+ StringBuilder sb = new StringBuilder();
+ // Gather all the DAG roots of all the paths in all the loops.
+ for (Map.Entry<ProgramBlock, LineageDedupBlock> dblock : dedupBlocks.entrySet()) {
+ if (dblock.getValue() != null) {
+ String forKey = dblock.getKey().getStatementBlock().getName();
+ LineageDedupBlock dedup = dblock.getValue();
+ for (Map.Entry<Long, LineageMap> patch : dedup.getPathMaps().entrySet()) {
+ for (Map.Entry<String, LineageItem> root : patch.getValue().getTraces().entrySet()) {
+ // Encode all the information in the headers that're
+ // needed by the deserialization logic.
+ sb.append("patch");
+ sb.append(DEDUP_DELIM);
+ sb.append(root.getKey());
+ sb.append(DEDUP_DELIM);
+ sb.append(forKey);
+ sb.append(DEDUP_DELIM);
+ sb.append(patch.getKey());
+ sb.append("\n");
+ sb.append(Explain.explain(root.getValue()));
+ sb.append("\n");
+
+ }
+ }
+ }
+ }
+ return sb.toString();
+ }
+ //------------------------------------------------------------------------------
/* The below static functions help to compute the number of distinct paths
* in any program block, and are used for diagnostic purposes. These will
* be removed in future.
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
index 2a9891a..88f2fb8 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
@@ -195,6 +195,10 @@
return !_opcode.isEmpty();
}
+ public boolean isDedup() {
+ return _opcode.startsWith(dedupItemOpcode);
+ }
+
/**
* Non-recursive equivalent of {@link #resetVisitStatus()}
* for robustness with regard to stack overflow errors.
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
index 9a96828..f40582d 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
@@ -19,7 +19,6 @@
package org.apache.sysds.runtime.lineage;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
@@ -30,23 +29,10 @@
import org.apache.sysds.runtime.lineage.LineageItem.LineageItemType;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.AggOp;
-import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.Direction;
-import org.apache.sysds.common.Types.OpOp1;
-import org.apache.sysds.common.Types.OpOp2;
-import org.apache.sysds.common.Types.OpOp3;
-import org.apache.sysds.common.Types.OpOpDG;
-import org.apache.sysds.common.Types.OpOpData;
-import org.apache.sysds.common.Types.OpOpN;
-import org.apache.sysds.common.Types.ParamBuiltinOp;
-import org.apache.sysds.common.Types.ReOrgOp;
-import org.apache.sysds.common.Types.ValueType;
-import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.AggBinaryOp;
import org.apache.sysds.hops.AggUnaryOp;
import org.apache.sysds.hops.BinaryOp;
-import org.apache.sysds.hops.DataGenOp;
-import org.apache.sysds.hops.DataOp;
import org.apache.sysds.hops.Hop;
import org.apache.sysds.hops.IndexingOp;
import org.apache.sysds.hops.LiteralOp;
@@ -54,41 +40,26 @@
import org.apache.sysds.hops.TernaryOp;
import org.apache.sysds.hops.UnaryOp;
import org.apache.sysds.hops.codegen.SpoofFusedOp;
-import org.apache.sysds.hops.rewrite.HopRewriteUtils;
-import org.apache.sysds.lops.Lop;
import org.apache.sysds.lops.PartialAggregate;
import org.apache.sysds.lops.UnaryCP;
import org.apache.sysds.lops.compile.Dag;
-import org.apache.sysds.parser.DataExpression;
-import org.apache.sysds.parser.DataIdentifier;
-import org.apache.sysds.parser.Statement;
import org.apache.sysds.runtime.DMLRuntimeException;
-import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
-import org.apache.sysds.runtime.controlprogram.Program;
-import org.apache.sysds.runtime.controlprogram.ProgramBlock;
import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory;
import org.apache.sysds.runtime.instructions.Instruction;
import org.apache.sysds.runtime.instructions.InstructionParser;
-import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.instructions.cp.DataGenCPInstruction;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
-import org.apache.sysds.runtime.instructions.cp.ScalarObjectFactory;
import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
-import org.apache.sysds.runtime.instructions.spark.SPInstruction.SPType;
-import org.apache.sysds.runtime.instructions.cp.CPInstruction.CPType;
import org.apache.sysds.runtime.util.HDFSTool;
-import org.apache.sysds.utils.Explain;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -97,9 +68,7 @@
public class LineageItemUtils {
- private static final String LVARPREFIX = "lvar";
public static final String LPLACEHOLDER = "IN#";
- public static final String DEDUP_DELIM = "_";
public static LineageItemType getType(String str) {
if (str.length() == 1) {
@@ -159,284 +128,11 @@
return sb.toString().trim();
}
- public static Data computeByLineage(LineageItem root) {
- long rootId = root.getOpcode().equals("write") ?
- root.getInputs()[0].getId() : root.getId();
- String varname = LVARPREFIX + rootId;
-
- //recursively construct hops
- root.resetVisitStatusNR();
- Map<Long, Hop> operands = new HashMap<>();
- rConstructHops(root, operands);
- Hop out = HopRewriteUtils.createTransientWrite(
- varname, operands.get(rootId));
-
- //generate instructions for temporary hops
- ExecutionContext ec = ExecutionContextFactory.createContext();
- BasicProgramBlock pb = new BasicProgramBlock(new Program());
- Dag<Lop> dag = new Dag<>();
- Lop lops = out.constructLops();
- lops.addToDag(dag);
- pb.setInstructions(dag.getJobs(null,
- ConfigurationManager.getDMLConfig()));
-
- // reset cache due to cleaned data objects
- LineageCache.resetCache();
- //execute instructions and get result
- pb.execute(ec);
- return ec.getVariable(varname);
- }
-
public static LineageItem[] getLineage(ExecutionContext ec, CPOperand... operands) {
return Arrays.stream(operands).filter(c -> c!=null)
.map(c -> ec.getLineage().getOrCreate(c)).toArray(LineageItem[]::new);
}
- private static void rConstructHops(LineageItem item, Map<Long, Hop> operands) {
- if (item.isVisited())
- return;
-
- //recursively process children (ordering by data dependencies)
- if (!item.isLeaf())
- for (LineageItem c : item.getInputs())
- rConstructHops(c, operands);
-
- //process current lineage item
- //NOTE: we generate instructions from hops (but without rewrites) to automatically
- //handle execution types, rmvar instructions, and rewiring of inputs/outputs
- switch (item.getType()) {
- case Creation: {
- Instruction inst = InstructionParser.parseSingleInstruction(item.getData());
-
- if (inst instanceof DataGenCPInstruction) {
- DataGenCPInstruction rand = (DataGenCPInstruction) inst;
- HashMap<String, Hop> params = new HashMap<>();
- if( rand.getOpcode().equals("rand") ) {
- if( rand.output.getDataType() == DataType.TENSOR)
- params.put(DataExpression.RAND_DIMS, new LiteralOp(rand.getDims()));
- else {
- params.put(DataExpression.RAND_ROWS, new LiteralOp(rand.getRows()));
- params.put(DataExpression.RAND_COLS, new LiteralOp(rand.getCols()));
- }
- params.put(DataExpression.RAND_MIN, new LiteralOp(rand.getMinValue()));
- params.put(DataExpression.RAND_MAX, new LiteralOp(rand.getMaxValue()));
- params.put(DataExpression.RAND_PDF, new LiteralOp(rand.getPdf()));
- params.put(DataExpression.RAND_LAMBDA, new LiteralOp(rand.getPdfParams()));
- params.put(DataExpression.RAND_SPARSITY, new LiteralOp(rand.getSparsity()));
- params.put(DataExpression.RAND_SEED, new LiteralOp(rand.getSeed()));
- }
- else if( rand.getOpcode().equals("seq") ) {
- params.put(Statement.SEQ_FROM, new LiteralOp(rand.getFrom()));
- params.put(Statement.SEQ_TO, new LiteralOp(rand.getTo()));
- params.put(Statement.SEQ_INCR, new LiteralOp(rand.getIncr()));
- }
- Hop datagen = new DataGenOp(OpOpDG.valueOf(rand.getOpcode().toUpperCase()),
- new DataIdentifier("tmp"), params);
- datagen.setBlocksize(rand.getBlocksize());
- operands.put(item.getId(), datagen);
- } else if (inst instanceof VariableCPInstruction
- && ((VariableCPInstruction) inst).isCreateVariable()) {
- String parts[] = InstructionUtils.getInstructionPartsWithValueType(inst.toString());
- DataType dt = DataType.valueOf(parts[4]);
- ValueType vt = dt == DataType.MATRIX ? ValueType.FP64 : ValueType.STRING;
- HashMap<String, Hop> params = new HashMap<>();
- params.put(DataExpression.IO_FILENAME, new LiteralOp(parts[2]));
- params.put(DataExpression.READROWPARAM, new LiteralOp(Long.parseLong(parts[6])));
- params.put(DataExpression.READCOLPARAM, new LiteralOp(Long.parseLong(parts[7])));
- params.put(DataExpression.READNNZPARAM, new LiteralOp(Long.parseLong(parts[8])));
- params.put(DataExpression.FORMAT_TYPE, new LiteralOp(parts[5]));
- DataOp pread = new DataOp(parts[1].substring(5), dt, vt, OpOpData.PERSISTENTREAD, params);
- pread.setFileName(parts[2]);
- operands.put(item.getId(), pread);
- }
- else if (inst instanceof RandSPInstruction) {
- RandSPInstruction rand = (RandSPInstruction) inst;
- HashMap<String, Hop> params = new HashMap<>();
- if (rand.output.getDataType() == DataType.TENSOR)
- params.put(DataExpression.RAND_DIMS, new LiteralOp(rand.getDims()));
- else {
- params.put(DataExpression.RAND_ROWS, new LiteralOp(rand.getRows()));
- params.put(DataExpression.RAND_COLS, new LiteralOp(rand.getCols()));
- }
- params.put(DataExpression.RAND_MIN, new LiteralOp(rand.getMinValue()));
- params.put(DataExpression.RAND_MAX, new LiteralOp(rand.getMaxValue()));
- params.put(DataExpression.RAND_PDF, new LiteralOp(rand.getPdf()));
- params.put(DataExpression.RAND_LAMBDA, new LiteralOp(rand.getPdfParams()));
- params.put(DataExpression.RAND_SPARSITY, new LiteralOp(rand.getSparsity()));
- params.put(DataExpression.RAND_SEED, new LiteralOp(rand.getSeed()));
- Hop datagen = new DataGenOp(OpOpDG.RAND, new DataIdentifier("tmp"), params);
- datagen.setBlocksize(rand.getBlocksize());
- operands.put(item.getId(), datagen);
- }
- break;
- }
- case Instruction: {
- CPType ctype = InstructionUtils.getCPTypeByOpcode(item.getOpcode());
- SPType stype = InstructionUtils.getSPTypeByOpcode(item.getOpcode());
-
- if (ctype != null) {
- switch (ctype) {
- case AggregateUnary: {
- Hop input = operands.get(item.getInputs()[0].getId());
- Hop aggunary = InstructionUtils.isUnaryMetadata(item.getOpcode()) ?
- HopRewriteUtils.createUnary(input, OpOp1.valueOfByOpcode(item.getOpcode())) :
- HopRewriteUtils.createAggUnaryOp(input, item.getOpcode());
- operands.put(item.getId(), aggunary);
- break;
- }
- case AggregateBinary: {
- Hop input1 = operands.get(item.getInputs()[0].getId());
- Hop input2 = operands.get(item.getInputs()[1].getId());
- Hop aggbinary = HopRewriteUtils.createMatrixMultiply(input1, input2);
- operands.put(item.getId(), aggbinary);
- break;
- }
- case AggregateTernary: {
- Hop input1 = operands.get(item.getInputs()[0].getId());
- Hop input2 = operands.get(item.getInputs()[1].getId());
- Hop input3 = operands.get(item.getInputs()[2].getId());
- Hop aggternary = HopRewriteUtils.createSum(
- HopRewriteUtils.createBinary(
- HopRewriteUtils.createBinary(input1, input2, OpOp2.MULT),
- input3, OpOp2.MULT));
- operands.put(item.getId(), aggternary);
- break;
- }
- case Unary:
- case Builtin: {
- Hop input = operands.get(item.getInputs()[0].getId());
- Hop unary = HopRewriteUtils.createUnary(input, item.getOpcode());
- operands.put(item.getId(), unary);
- break;
- }
- case Reorg: {
- operands.put(item.getId(), HopRewriteUtils.createReorg(
- operands.get(item.getInputs()[0].getId()), item.getOpcode()));
- break;
- }
- case Reshape: {
- ArrayList<Hop> inputs = new ArrayList<>();
- for(int i=0; i<5; i++)
- inputs.add(operands.get(item.getInputs()[i].getId()));
- operands.put(item.getId(), HopRewriteUtils.createReorg(inputs, ReOrgOp.RESHAPE));
- break;
- }
- case Binary: {
- //handle special cases of binary operations
- String opcode = ("^2".equals(item.getOpcode())
- || "*2".equals(item.getOpcode())) ?
- item.getOpcode().substring(0, 1) : item.getOpcode();
- Hop input1 = operands.get(item.getInputs()[0].getId());
- Hop input2 = operands.get(item.getInputs()[1].getId());
- Hop binary = HopRewriteUtils.createBinary(input1, input2, opcode);
- operands.put(item.getId(), binary);
- break;
- }
- case Ternary: {
- operands.put(item.getId(), HopRewriteUtils.createTernary(
- operands.get(item.getInputs()[0].getId()),
- operands.get(item.getInputs()[1].getId()),
- operands.get(item.getInputs()[2].getId()), item.getOpcode()));
- break;
- }
- case Ctable: { //e.g., ctable
- if( item.getInputs().length==3 )
- operands.put(item.getId(), HopRewriteUtils.createTernary(
- operands.get(item.getInputs()[0].getId()),
- operands.get(item.getInputs()[1].getId()),
- operands.get(item.getInputs()[2].getId()), OpOp3.CTABLE));
- else if( item.getInputs().length==5 )
- operands.put(item.getId(), HopRewriteUtils.createTernary(
- operands.get(item.getInputs()[0].getId()),
- operands.get(item.getInputs()[1].getId()),
- operands.get(item.getInputs()[2].getId()),
- operands.get(item.getInputs()[3].getId()),
- operands.get(item.getInputs()[4].getId()), OpOp3.CTABLE));
- break;
- }
- case BuiltinNary: {
- String opcode = item.getOpcode().equals("n+") ? "plus" : item.getOpcode();
- operands.put(item.getId(), HopRewriteUtils.createNary(
- OpOpN.valueOf(opcode.toUpperCase()), createNaryInputs(item, operands)));
- break;
- }
- case ParameterizedBuiltin: {
- operands.put(item.getId(), constructParameterizedBuiltinOp(item, operands));
- break;
- }
- case MatrixIndexing: {
- operands.put(item.getId(), constructIndexingOp(item, operands));
- break;
- }
- case MMTSJ: {
- //TODO handling of tsmm type left and right -> placement transpose
- Hop input = operands.get(item.getInputs()[0].getId());
- Hop aggunary = HopRewriteUtils.createMatrixMultiply(
- HopRewriteUtils.createTranspose(input), input);
- operands.put(item.getId(), aggunary);
- break;
- }
- case Variable: {
- if( item.getOpcode().startsWith("cast") )
- operands.put(item.getId(), HopRewriteUtils.createUnary(
- operands.get(item.getInputs()[0].getId()),
- OpOp1.valueOfByOpcode(item.getOpcode())));
- else //cpvar, write
- operands.put(item.getId(), operands.get(item.getInputs()[0].getId()));
- break;
- }
- default:
- throw new DMLRuntimeException("Unsupported instruction "
- + "type: " + ctype.name() + " (" + item.getOpcode() + ").");
- }
- }
- else if( stype != null ) {
- switch(stype) {
- case Reblock: {
- Hop input = operands.get(item.getInputs()[0].getId());
- input.setBlocksize(ConfigurationManager.getBlocksize());
- input.setRequiresReblock(true);
- operands.put(item.getId(), input);
- break;
- }
- case Checkpoint: {
- Hop input = operands.get(item.getInputs()[0].getId());
- operands.put(item.getId(), input);
- break;
- }
- case MatrixIndexing: {
- operands.put(item.getId(), constructIndexingOp(item, operands));
- break;
- }
- case GAppend: {
- operands.put(item.getId(), HopRewriteUtils.createBinary(
- operands.get(item.getInputs()[0].getId()),
- operands.get(item.getInputs()[1].getId()), OpOp2.CBIND));
- break;
- }
- default:
- throw new DMLRuntimeException("Unsupported instruction "
- + "type: " + stype.name() + " (" + item.getOpcode() + ").");
- }
- }
- else
- throw new DMLRuntimeException("Unsupported instruction: " + item.getOpcode());
- break;
- }
- case Literal: {
- CPOperand op = new CPOperand(item.getData());
- operands.put(item.getId(), ScalarObjectFactory
- .createLiteralOp(op.getValueType(), op.getName()));
- break;
- }
- case Dedup: {
- throw new NotImplementedException();
- }
- }
-
- item.setVisited();
- }
-
public static void constructLineageFromHops(Hop[] roots, String claName, Hop[] inputs, HashMap<Long, Hop> spoofmap) {
//probe existence and only generate lineage if non-existing
//(a fused operator might be used in multiple places of a program)
@@ -537,58 +233,6 @@
root.setVisited();
}
- private static Hop constructIndexingOp(LineageItem item, Map<Long, Hop> operands) {
- Hop input = operands.get(item.getInputs()[0].getId());
- if( "rightIndex".equals(item.getOpcode()) )
- return HopRewriteUtils.createIndexingOp(input,
- operands.get(item.getInputs()[1].getId()), //rl
- operands.get(item.getInputs()[2].getId()), //ru
- operands.get(item.getInputs()[3].getId()), //cl
- operands.get(item.getInputs()[4].getId())); //cu
- else if( "leftIndex".equals(item.getOpcode())
- || "mapLeftIndex".equals(item.getOpcode()) )
- return HopRewriteUtils.createLeftIndexingOp(input,
- operands.get(item.getInputs()[1].getId()), //rhs
- operands.get(item.getInputs()[2].getId()), //rl
- operands.get(item.getInputs()[3].getId()), //ru
- operands.get(item.getInputs()[4].getId()), //cl
- operands.get(item.getInputs()[5].getId())); //cu
- throw new DMLRuntimeException("Unsupported opcode: "+item.getOpcode());
- }
-
- private static Hop constructParameterizedBuiltinOp(LineageItem item, Map<Long, Hop> operands) {
- String opcode = item.getOpcode();
- Hop target = operands.get(item.getInputs()[0].getId());
- LinkedHashMap<String,Hop> args = new LinkedHashMap<>();
- if( opcode.equals("groupedagg") ) {
- args.put("target", target);
- args.put(Statement.GAGG_GROUPS, operands.get(item.getInputs()[1].getId()));
- args.put(Statement.GAGG_WEIGHTS, operands.get(item.getInputs()[2].getId()));
- args.put(Statement.GAGG_FN, operands.get(item.getInputs()[3].getId()));
- args.put(Statement.GAGG_NUM_GROUPS, operands.get(item.getInputs()[4].getId()));
- }
- else if (opcode.equalsIgnoreCase("rmempty")) {
- args.put("target", target);
- args.put("margin", operands.get(item.getInputs()[1].getId()));
- args.put("select", operands.get(item.getInputs()[2].getId()));
- }
- else if(opcode.equalsIgnoreCase("replace")) {
- args.put("target", target);
- args.put("pattern", operands.get(item.getInputs()[1].getId()));
- args.put("replacement", operands.get(item.getInputs()[2].getId()));
- }
- else if(opcode.equalsIgnoreCase("rexpand")) {
- args.put("target", target);
- args.put("max", operands.get(item.getInputs()[1].getId()));
- args.put("dir", operands.get(item.getInputs()[2].getId()));
- args.put("cast", operands.get(item.getInputs()[3].getId()));
- args.put("ignore", operands.get(item.getInputs()[4].getId()));
- }
-
- return HopRewriteUtils.createParameterizedBuiltinOp(
- target, args, ParamBuiltinOp.valueOf(opcode.toUpperCase()));
- }
-
public static LineageItem rDecompress(LineageItem item) {
if (item.getType() == LineageItemType.Dedup) {
LineageItem dedupInput = rDecompress(item.getInputs()[0]);
@@ -654,36 +298,6 @@
item.setVisited();
}
- public static String mergeExplainDedupBlocks(ExecutionContext ec) {
- Map<ProgramBlock, LineageDedupBlock> dedupBlocks = ec.getLineage().getDedupBlocks();
- StringBuilder sb = new StringBuilder();
- // Gather all the DAG roots of all the paths in all the loops.
- for (Map.Entry<ProgramBlock, LineageDedupBlock> dblock : dedupBlocks.entrySet()) {
- if (dblock.getValue() != null) {
- String forKey = dblock.getKey().getStatementBlock().getName();
- LineageDedupBlock dedup = dblock.getValue();
- for (Map.Entry<Long, LineageMap> patch : dedup.getPathMaps().entrySet()) {
- for (Map.Entry<String, LineageItem> root : patch.getValue().getTraces().entrySet()) {
- // Encode all the information in the headers that're
- // needed by the deserialization logic.
- sb.append("patch");
- sb.append(DEDUP_DELIM);
- sb.append(root.getKey());
- sb.append(DEDUP_DELIM);
- sb.append(forKey);
- sb.append(DEDUP_DELIM);
- sb.append(patch.getKey());
- sb.append("\n");
- sb.append(Explain.explain(root.getValue()));
- sb.append("\n");
-
- }
- }
- }
- }
- return sb.toString();
- }
-
public static LineageItem replace(LineageItem root, LineageItem liOld, LineageItem liNew) {
if( liNew == null )
throw new DMLRuntimeException("Invalid null lineage item for "+liOld.getId());
@@ -794,14 +408,6 @@
current.setVisited();
}
- private static Hop[] createNaryInputs(LineageItem item, Map<Long, Hop> operands) {
- int len = item.getInputs().length;
- Hop[] ret = new Hop[len];
- for( int i=0; i<len; i++ )
- ret[i] = operands.get(item.getInputs()[i].getId());
- return ret;
- }
-
public static boolean containsRandDataGen(HashSet<LineageItem> entries, LineageItem root) {
if (entries.contains(root) || root.isVisited())
return false;
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java
index fdcadff..2fc63ad 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java
@@ -83,7 +83,7 @@
}
public void processDedupItem(LineageMap lm, Long path, LineageItem[] liinputs, String name) {
- String delim = LineageItemUtils.DEDUP_DELIM;
+ String delim = LineageDedupUtils.DEDUP_DELIM;
for (Map.Entry<String, LineageItem> entry : lm._traces.entrySet()) {
// Encode everything in the opcode needed by the deserialization logic
// to map this lineage item to the right patch.
@@ -250,10 +250,7 @@
if (DMLScript.LINEAGE_DEDUP) {
// gracefully serialize the dedup maps without decompressing
- LineageItemUtils.writeTraceToHDFS(LineageItemUtils.mergeExplainDedupBlocks(ec), fName + ".lineage.dedup");
- // sample code to deserialize the dedup patches
- //String allDedup = LineageItemUtils.mergeExplainDedupBlocks(ec);
- //LineageItem tmp = LineageParser.parseLineageTraceDedup(allDedup);
+ LineageItemUtils.writeTraceToHDFS(LineageDedupUtils.mergeExplainDedupBlocks(ec), fName + ".lineage.dedup");
}
LineageItemUtils.writeTraceToHDFS(Explain.explain(li), fName + ".lineage");
}
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java
index c70cfdd..ad2f4e8 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java
@@ -112,26 +112,23 @@
return new LineageItem(id, "", opcode, inputs.toArray(new LineageItem[0]));
}
- public static LineageItem parseLineageTraceDedup(String str) {
- LineageItem li = null;
- Map<Long, Map<String, LineageItem>> patchLiMap = new HashMap<>();
+ protected static void parseLineageTraceDedup(String str) {
str.replaceAll("\r\n", "\n");
String[] allPatches = str.split("\n\n");
for (String patch : allPatches) {
String[] headBody = patch.split("\r\n|\r|\n", 2);
- // Parse the header
- String[] parts = headBody[0].split(LineageItemUtils.DEDUP_DELIM);
- // e.g. patch_R_SB15_1
+ // Parse the header (e.g. patch_R_SB15_1)
+ String[] parts = headBody[0].split(LineageDedupUtils.DEDUP_DELIM);
// Deserialize the patch
LineageItem patchLi = parseLineageTrace(headBody[1]);
+ //LineageItemUtils.computeByLineageDedup(patchLi);
Long pathId = Long.parseLong(parts[3]);
// Map the pathID and the DAG root name to the deserialized DAG.
- if (!patchLiMap.containsKey(pathId)) {
- patchLiMap.put(pathId, new HashMap<>());
+ if (!LineageRecomputeUtils.patchLiMap.containsKey(pathId)) {
+ LineageRecomputeUtils.patchLiMap.put(pathId, new HashMap<>());
}
- patchLiMap.get(pathId).put(parts[1], patchLi);
+ LineageRecomputeUtils.patchLiMap.get(pathId).put(parts[1], patchLi);
// TODO: handle multiple loops
}
- return li;
}
}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java
new file mode 100644
index 0000000..8f2c226
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.lineage;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types.DataType;
+import org.apache.sysds.common.Types.OpOp1;
+import org.apache.sysds.common.Types.OpOp2;
+import org.apache.sysds.common.Types.OpOp3;
+import org.apache.sysds.common.Types.OpOpDG;
+import org.apache.sysds.common.Types.OpOpData;
+import org.apache.sysds.common.Types.OpOpN;
+import org.apache.sysds.common.Types.ParamBuiltinOp;
+import org.apache.sysds.common.Types.ReOrgOp;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.hops.DataGenOp;
+import org.apache.sysds.hops.DataOp;
+import org.apache.sysds.hops.FunctionOp;
+import org.apache.sysds.hops.FunctionOp.FunctionType;
+import org.apache.sysds.hops.Hop;
+import org.apache.sysds.hops.LiteralOp;
+import org.apache.sysds.hops.rewrite.HopRewriteUtils;
+import org.apache.sysds.lops.Lop;
+import org.apache.sysds.lops.compile.Dag;
+import org.apache.sysds.parser.DMLProgram;
+import org.apache.sysds.parser.DataExpression;
+import org.apache.sysds.parser.DataIdentifier;
+import org.apache.sysds.parser.Statement;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
+import org.apache.sysds.runtime.controlprogram.FunctionProgramBlock;
+import org.apache.sysds.runtime.controlprogram.Program;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysds.runtime.instructions.Instruction;
+import org.apache.sysds.runtime.instructions.InstructionParser;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPInstruction.CPType;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.cp.DataGenCPInstruction;
+import org.apache.sysds.runtime.instructions.cp.ScalarObjectFactory;
+import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
+import org.apache.sysds.runtime.instructions.spark.RandSPInstruction;
+import org.apache.sysds.runtime.instructions.spark.SPInstruction.SPType;
+import org.apache.sysds.utils.Explain;
+import org.apache.sysds.utils.Explain.ExplainCounts;
+import org.apache.sysds.utils.Statistics;
+
+public class LineageRecomputeUtils {
+ private static final String LVARPREFIX = "lvar"; // prefix of the variable name under which the recomputed result is written
+ public static final String LPLACEHOLDER = "IN#"; // name prefix of dedup patch inputs (IN#0, IN#1, ...)
+ private static final boolean DEBUG = false; // if true, prints the generated runtime program and statistics
+ public static final Map<Long, Map<String, LineageItem>> patchLiMap = new HashMap<>(); // pathId -> (DAG root name -> deserialized patch)
+ private static final Map<Long, Map<String, Hop>> patchHopMap = new HashMap<>(); // pathId -> (output name -> root hop of the compiled patch)
+	// Recomputes the data described by mainTrace; dedupPatches holds the serialized dedup patches and may be null.
+	public static Data parseNComputeLineageTrace(String mainTrace, String dedupPatches) {
+		try {
+			LineageItem root = LineageParser.parseLineageTrace(mainTrace);
+			if (dedupPatches != null)
+				LineageParser.parseLineageTraceDedup(dedupPatches);
+			return computeByLineage(root);
+		} finally { // Cleanup the statics even if parsing or execution fails, so no stale patch leaks into a later call
+			patchLiMap.clear();
+			patchHopMap.clear();
+		}
+	}
+	// Builds hops from the lineage DAG under root, compiles them into a fresh program and executes it.
+	private static Data computeByLineage(LineageItem root) {
+		// For a "write" root, recompute its first input instead of the write itself
+		boolean isWrite = root.getOpcode().equals("write");
+		long outId = isWrite ? root.getInputs()[0].getId() : root.getId();
+		String outVar = LVARPREFIX + outId;
+		Program program = new Program(null);
+
+		// Recursively construct hops
+		root.resetVisitStatusNR();
+		Map<Long, Hop> hopsById = new HashMap<>();
+		Map<String, Hop> dagRoots = new HashMap<>();
+		rConstructHops(root, hopsById, dagRoots, program);
+		Hop twrite = HopRewriteUtils.createTransientWrite(outVar, hopsById.get(outId));
+
+		// Generate instructions for the hop DAG that ends in the transient write
+		ExecutionContext context = ExecutionContextFactory.createContext();
+		dagRoots.put(outVar, twrite);
+		constructBasicBlock(dagRoots, outVar, program);
+
+		// Reset cache due to cleaned data objects
+		LineageCache.resetCache();
+		// In debug mode, print the generated runtime program before executing it
+		if (DEBUG) {
+			DMLScript.STATISTICS = true;
+			ExplainCounts counts = Explain.countDistributedOperations(program);
+			System.out.println(Explain.display(null, program, Explain.ExplainType.RUNTIME, counts));
+		}
+		context.setProgram(program);
+		program.execute(context);
+		if (DEBUG) {
+			Statistics.stopRunTimer();
+			System.out.println(Statistics.display(DMLScript.STATISTICS_COUNT));
+		}
+		return context.getVariable(outVar);
+	}
+	// Compiles the hop DAG registered under dedupOut into one basic block and adds that block to prog.
+	private static void constructBasicBlock(Map<String, Hop> partDagRoots, String dedupOut, Program prog) {
+		Hop dagRoot = partDagRoots.get(dedupOut);
+		BasicProgramBlock block = new BasicProgramBlock(prog);
+		ArrayList<Instruction> insts = genInst(dagRoot);
+		block.setInstructions(insts);
+		prog.addProgramBlock(block);
+	}
+ // Recursively translates the lineage DAG rooted at item into hops, processing inputs before the item itself.
+ // Every constructed hop is stored in operands under the id of its lineage item.
+ private static void rConstructHops(LineageItem item, Map<Long, Hop> operands, Map<String, Hop> partDagRoots, Program prog)
+ {
+ if (item.isVisited())
+ return;
+
+ //recursively process children (ordering by data dependencies)
+ if (!item.isLeaf())
+ for (LineageItem c : item.getInputs())
+ rConstructHops(c, operands, partDagRoots, prog);
+
+ //process current lineage item
+ //NOTE: we generate instructions from hops (but without rewrites) to automatically
+ //handle execution types, rmvar instructions, and rewiring of inputs/outputs
+ switch (item.getType()) {
+ case Creation: {
+ if (item.getData().startsWith(LPLACEHOLDER)) { // input placeholder of a dedup patch, e.g. IN#0
+ long phId = Long.parseLong(item.getData().substring(3)); // position after the 3-character "IN#" prefix
+ Hop input = operands.get(phId);
+ operands.remove(phId);
+ // Replace the placeholders with TReads
+ operands.put(item.getId(), input); // order preserving
+ break;
+ }
+ Instruction inst = InstructionParser.parseSingleInstruction(item.getData());
+ // Rebuild the hop from the parsed instruction: datagen (rand/seq), persistent read, or Spark rand
+ if (inst instanceof DataGenCPInstruction) {
+ DataGenCPInstruction rand = (DataGenCPInstruction) inst;
+ HashMap<String, Hop> params = new HashMap<>();
+ if( rand.getOpcode().equals("rand") ) {
+ if( rand.output.getDataType() == DataType.TENSOR)
+ params.put(DataExpression.RAND_DIMS, new LiteralOp(rand.getDims()));
+ else {
+ params.put(DataExpression.RAND_ROWS, new LiteralOp(rand.getRows()));
+ params.put(DataExpression.RAND_COLS, new LiteralOp(rand.getCols()));
+ }
+ params.put(DataExpression.RAND_MIN, new LiteralOp(rand.getMinValue()));
+ params.put(DataExpression.RAND_MAX, new LiteralOp(rand.getMaxValue()));
+ params.put(DataExpression.RAND_PDF, new LiteralOp(rand.getPdf()));
+ params.put(DataExpression.RAND_LAMBDA, new LiteralOp(rand.getPdfParams()));
+ params.put(DataExpression.RAND_SPARSITY, new LiteralOp(rand.getSparsity()));
+ params.put(DataExpression.RAND_SEED, new LiteralOp(rand.getSeed()));
+ }
+ else if( rand.getOpcode().equals("seq") ) {
+ params.put(Statement.SEQ_FROM, new LiteralOp(rand.getFrom()));
+ params.put(Statement.SEQ_TO, new LiteralOp(rand.getTo()));
+ params.put(Statement.SEQ_INCR, new LiteralOp(rand.getIncr()));
+ }
+ Hop datagen = new DataGenOp(OpOpDG.valueOf(rand.getOpcode().toUpperCase()),
+ new DataIdentifier("tmp"), params);
+ datagen.setBlocksize(rand.getBlocksize());
+ operands.put(item.getId(), datagen);
+ } else if (inst instanceof VariableCPInstruction
+ && ((VariableCPInstruction) inst).isCreateVariable()) {
+ String parts[] = InstructionUtils.getInstructionPartsWithValueType(inst.toString());
+ DataType dt = DataType.valueOf(parts[4]);
+ ValueType vt = dt == DataType.MATRIX ? ValueType.FP64 : ValueType.STRING; // every non-matrix input is read with value type STRING
+ HashMap<String, Hop> params = new HashMap<>();
+ params.put(DataExpression.IO_FILENAME, new LiteralOp(parts[2]));
+ params.put(DataExpression.READROWPARAM, new LiteralOp(Long.parseLong(parts[6])));
+ params.put(DataExpression.READCOLPARAM, new LiteralOp(Long.parseLong(parts[7])));
+ params.put(DataExpression.READNNZPARAM, new LiteralOp(Long.parseLong(parts[8])));
+ params.put(DataExpression.FORMAT_TYPE, new LiteralOp(parts[5]));
+ DataOp pread = new DataOp(parts[1].substring(5), dt, vt, OpOpData.PERSISTENTREAD, params); // NOTE(review): substring(5) drops a fixed-length name prefix - confirm which one
+ pread.setFileName(parts[2]);
+ operands.put(item.getId(), pread);
+ }
+ else if (inst instanceof RandSPInstruction) {
+ RandSPInstruction rand = (RandSPInstruction) inst;
+ HashMap<String, Hop> params = new HashMap<>();
+ if (rand.output.getDataType() == DataType.TENSOR)
+ params.put(DataExpression.RAND_DIMS, new LiteralOp(rand.getDims()));
+ else {
+ params.put(DataExpression.RAND_ROWS, new LiteralOp(rand.getRows()));
+ params.put(DataExpression.RAND_COLS, new LiteralOp(rand.getCols()));
+ }
+ params.put(DataExpression.RAND_MIN, new LiteralOp(rand.getMinValue()));
+ params.put(DataExpression.RAND_MAX, new LiteralOp(rand.getMaxValue()));
+ params.put(DataExpression.RAND_PDF, new LiteralOp(rand.getPdf()));
+ params.put(DataExpression.RAND_LAMBDA, new LiteralOp(rand.getPdfParams()));
+ params.put(DataExpression.RAND_SPARSITY, new LiteralOp(rand.getSparsity()));
+ params.put(DataExpression.RAND_SEED, new LiteralOp(rand.getSeed()));
+ Hop datagen = new DataGenOp(OpOpDG.RAND, new DataIdentifier("tmp"), params);
+ datagen.setBlocksize(rand.getBlocksize());
+ operands.put(item.getId(), datagen);
+ }
+ break; // NOTE(review): any other creation instruction registers no hop for this item
+ }
+ case Instruction: {
+ if (item.isDedup()) {
+ // Create function call for each dedup entry
+ String[] parts = item.getOpcode().split(LineageDedupUtils.DEDUP_DELIM); //e.g. dedup_R_SB13_0
+ String name = parts[2] + parts[1] + parts[3]; //loopId + outVar + pathId
+ List<Hop> finputs = Arrays.stream(item.getInputs())
+ .map(inp -> operands.get(inp.getId())).collect(Collectors.toList());
+ String[] inputNames = new String[item.getInputs().length];
+ for (int i=0; i<item.getInputs().length; i++)
+ inputNames[i] = LPLACEHOLDER + i; //e.g. IN#0, IN#1
+ Hop funcOp = new FunctionOp(FunctionType.DML, DMLProgram.DEFAULT_NAMESPACE,
+ name, inputNames, finputs, new String[] {parts[1]}, false);
+ // NOTE(review): partDagRoots and prog are passed as null while a patch body is built, so a nested dedup item would fail below
+ // Cut the Hop dag after function calls
+ partDagRoots.put(parts[1], funcOp);
+ // Compile the dag and save
+ constructBasicBlock(partDagRoots, parts[1], prog);
+
+ // Construct a Hop dag for the function body from the dedup patch, and compile
+ Hop output = constructHopsDedupPatch(parts, inputNames, finputs, prog);
+ // Create a TRead on the function o/p as a leaf for the next Hop dag
+ // Use the function body root/return hop to propagate right data type
+ operands.put(item.getId(), HopRewriteUtils.createTransientRead(parts[1], output));
+ break;
+ }
+ CPType ctype = InstructionUtils.getCPTypeByOpcode(item.getOpcode());
+ SPType stype = InstructionUtils.getSPTypeByOpcode(item.getOpcode());
+
+ if (ctype != null) {
+ switch (ctype) {
+ case AggregateUnary: {
+ Hop input = operands.get(item.getInputs()[0].getId());
+ Hop aggunary = InstructionUtils.isUnaryMetadata(item.getOpcode()) ?
+ HopRewriteUtils.createUnary(input, OpOp1.valueOfByOpcode(item.getOpcode())) :
+ HopRewriteUtils.createAggUnaryOp(input, item.getOpcode());
+ operands.put(item.getId(), aggunary);
+ break;
+ }
+ case AggregateBinary: {
+ Hop input1 = operands.get(item.getInputs()[0].getId());
+ Hop input2 = operands.get(item.getInputs()[1].getId());
+ Hop aggbinary = HopRewriteUtils.createMatrixMultiply(input1, input2);
+ operands.put(item.getId(), aggbinary);
+ break;
+ }
+ case AggregateTernary: {
+ Hop input1 = operands.get(item.getInputs()[0].getId());
+ Hop input2 = operands.get(item.getInputs()[1].getId());
+ Hop input3 = operands.get(item.getInputs()[2].getId());
+ Hop aggternary = HopRewriteUtils.createSum(
+ HopRewriteUtils.createBinary(
+ HopRewriteUtils.createBinary(input1, input2, OpOp2.MULT),
+ input3, OpOp2.MULT));
+ operands.put(item.getId(), aggternary);
+ break;
+ }
+ case Unary:
+ case Builtin: {
+ Hop input = operands.get(item.getInputs()[0].getId());
+ Hop unary = HopRewriteUtils.createUnary(input, item.getOpcode());
+ operands.put(item.getId(), unary);
+ break;
+ }
+ case Reorg: {
+ operands.put(item.getId(), HopRewriteUtils.createReorg(
+ operands.get(item.getInputs()[0].getId()), item.getOpcode()));
+ break;
+ }
+ case Reshape: {
+ ArrayList<Hop> inputs = new ArrayList<>();
+ for(int i=0; i<5; i++) //exactly five inputs are read
+ inputs.add(operands.get(item.getInputs()[i].getId()));
+ operands.put(item.getId(), HopRewriteUtils.createReorg(inputs, ReOrgOp.RESHAPE));
+ break;
+ }
+ case Binary: {
+ //handle special cases of binary operations
+ String opcode = ("^2".equals(item.getOpcode())
+ || "*2".equals(item.getOpcode())) ?
+ item.getOpcode().substring(0, 1) : item.getOpcode(); //"^2" -> "^", "*2" -> "*"
+ Hop input1 = operands.get(item.getInputs()[0].getId());
+ Hop input2 = operands.get(item.getInputs()[1].getId());
+ Hop binary = HopRewriteUtils.createBinary(input1, input2, opcode);
+ operands.put(item.getId(), binary);
+ break;
+ }
+ case Ternary: {
+ operands.put(item.getId(), HopRewriteUtils.createTernary(
+ operands.get(item.getInputs()[0].getId()),
+ operands.get(item.getInputs()[1].getId()),
+ operands.get(item.getInputs()[2].getId()), item.getOpcode()));
+ break;
+ }
+ case Ctable: { //e.g., ctable
+ if( item.getInputs().length==3 )
+ operands.put(item.getId(), HopRewriteUtils.createTernary(
+ operands.get(item.getInputs()[0].getId()),
+ operands.get(item.getInputs()[1].getId()),
+ operands.get(item.getInputs()[2].getId()), OpOp3.CTABLE));
+ else if( item.getInputs().length==5 )
+ operands.put(item.getId(), HopRewriteUtils.createTernary(
+ operands.get(item.getInputs()[0].getId()),
+ operands.get(item.getInputs()[1].getId()),
+ operands.get(item.getInputs()[2].getId()),
+ operands.get(item.getInputs()[3].getId()),
+ operands.get(item.getInputs()[4].getId()), OpOp3.CTABLE));
+ break; // NOTE(review): any other number of inputs registers no hop for this item
+ }
+ case BuiltinNary: {
+ String opcode = item.getOpcode().equals("n+") ? "plus" : item.getOpcode(); //"n+" is looked up as OpOpN "PLUS"
+ operands.put(item.getId(), HopRewriteUtils.createNary(
+ OpOpN.valueOf(opcode.toUpperCase()), createNaryInputs(item, operands)));
+ break;
+ }
+ case ParameterizedBuiltin: {
+ operands.put(item.getId(), constructParameterizedBuiltinOp(item, operands));
+ break;
+ }
+ case MatrixIndexing: {
+ operands.put(item.getId(), constructIndexingOp(item, operands));
+ break;
+ }
+ case MMTSJ: {
+ //TODO handling of tsmm type left and right -> placement transpose
+ Hop input = operands.get(item.getInputs()[0].getId());
+ Hop aggunary = HopRewriteUtils.createMatrixMultiply(
+ HopRewriteUtils.createTranspose(input), input);
+ operands.put(item.getId(), aggunary);
+ break;
+ }
+ case Variable: {
+ if( item.getOpcode().startsWith("cast") )
+ operands.put(item.getId(), HopRewriteUtils.createUnary(
+ operands.get(item.getInputs()[0].getId()),
+ OpOp1.valueOfByOpcode(item.getOpcode())));
+ else //cpvar, write
+ operands.put(item.getId(), operands.get(item.getInputs()[0].getId()));
+ break;
+ }
+ default:
+ throw new DMLRuntimeException("Unsupported instruction "
+ + "type: " + ctype.name() + " (" + item.getOpcode() + ").");
+ }
+ }
+ else if( stype != null ) {
+ switch(stype) {
+ case Reblock: {
+ Hop input = operands.get(item.getInputs()[0].getId());
+ input.setBlocksize(ConfigurationManager.getBlocksize());
+ input.setRequiresReblock(true);
+ operands.put(item.getId(), input); //no separate hop: the input itself is flagged for reblock
+ break;
+ }
+ case Checkpoint: {
+ Hop input = operands.get(item.getInputs()[0].getId());
+ operands.put(item.getId(), input); //pass-through of the input hop
+ break;
+ }
+ case MatrixIndexing: {
+ operands.put(item.getId(), constructIndexingOp(item, operands));
+ break;
+ }
+ case GAppend: {
+ operands.put(item.getId(), HopRewriteUtils.createBinary(
+ operands.get(item.getInputs()[0].getId()),
+ operands.get(item.getInputs()[1].getId()), OpOp2.CBIND));
+ break;
+ }
+ default:
+ throw new DMLRuntimeException("Unsupported instruction "
+ + "type: " + stype.name() + " (" + item.getOpcode() + ").");
+ }
+ }
+ else
+ throw new DMLRuntimeException("Unsupported instruction: " + item.getOpcode());
+ break;
+ }
+ case Literal: {
+ CPOperand op = new CPOperand(item.getData());
+ operands.put(item.getId(), ScalarObjectFactory
+ .createLiteralOp(op.getValueType(), op.getName()));
+ break;
+ }
+ case Dedup: {
+ throw new NotImplementedException(); //items of type Dedup are not handled by the recomputation
+ }
+ }
+
+ item.setVisited(); //an item reached again via another parent returns early at the top
+ }
+	// Compiles the dedup patch for (path id, output name) once into a single-output function; returns the patch root hop.
+	private static Hop constructHopsDedupPatch(String[] parts, String[] inputs, List<Hop> inpHops, Program prog) {
+		// parts is the split dedup opcode, e.g. dedup_R_SB13_0: [1]=output variable, [2]=loop id, [3]=path id
+		String outname = parts[1];
+		long pathId = Long.parseLong(parts[3]);
+		// Return if this patch is already compiled
+		// NOTE(review): this key ignores the loop id, although the function name registered below includes it
+		Map<String, Hop> compiled = patchHopMap.computeIfAbsent(pathId, k -> new HashMap<>());
+		if (compiled.containsKey(outname))
+			return compiled.get(outname);
+		// Fail with a clear message, instead of a NullPointerException, if the patch was never parsed
+		Map<String, LineageItem> patches = patchLiMap.get(pathId);
+		LineageItem patchRoot = (patches != null) ? patches.get(outname) : null;
+		if (patchRoot == null)
+			throw new DMLRuntimeException("Missing dedup patch for output " + outname + " and path " + pathId + ".");
+		// Construct a Hop dag, with TReads on the function inputs registered under their positions
+		//FIXME: the keys of operands can be replaced inside rConstructHops
+		patchRoot.resetVisitStatusNR();
+		Map<Long, Hop> operands = new HashMap<>();
+		for (int i=0; i<inputs.length; i++)
+			operands.put((long)i, HopRewriteUtils.createTransientRead(inputs[i], inpHops.get(i))); //order preserving
+		rConstructHops(patchRoot, operands, null, null);
+		Hop out = HopRewriteUtils.createTransientWrite(outname, operands.get(patchRoot.getId()));
+		compiled.put(outname, out);
+		// Compile to instructions and save as a FunctionProgramBlock
+		List<DataIdentifier> funcInputs = new ArrayList<>();
+		for (int i=0; i<inpHops.size(); i++)
+			funcInputs.add(new DataIdentifier(inputs[i], inpHops.get(i).getDataType(), inpHops.get(i).getValueType()));
+		// TODO: multi-return function
+		List<DataIdentifier> funcOutput = new ArrayList<>(Arrays.asList(new DataIdentifier(outname)));
+		FunctionProgramBlock fpb = new FunctionProgramBlock(prog, funcInputs, funcOutput);
+		BasicProgramBlock pb = new BasicProgramBlock(prog);
+		pb.setInstructions(genInst(out));
+		fpb.addProgramBlock(pb);
+		prog.addFunctionProgramBlock(DMLProgram.DEFAULT_NAMESPACE, parts[2]+parts[1]+parts[3], fpb);
+		return out;
+	}
+	// Lowers the hop DAG under root to lops and compiles these into runtime instructions.
+	private static ArrayList<Instruction> genInst (Hop root) {
+		Dag<Lop> lopDag = new Dag<>();
+		root.constructLops().addToDag(lopDag);
+		ArrayList<Instruction> insts = lopDag.getJobs(null, ConfigurationManager.getDMLConfig());
+		return insts;
+	}
+	// Collects the hops of all inputs of item, in input order.
+	private static Hop[] createNaryInputs(LineageItem item, Map<Long, Hop> operands) {
+		LineageItem[] inputs = item.getInputs();
+		// Inputs without a registered hop yield null entries
+		return Arrays.stream(inputs)
+			.map(input -> operands.get(input.getId()))
+			.toArray(Hop[]::new);
+	}
+	// Rebuilds the hop of a parameterized builtin; named arguments are bound to the item inputs by position.
+	private static Hop constructParameterizedBuiltinOp(LineageItem item, Map<Long, Hop> operands) {
+		String opcode = item.getOpcode();
+		Hop target = operands.get(item.getInputs()[0].getId());
+
+		// Names of the arguments that follow the target, in input order.
+		// Opcodes not listed here keep an empty argument map.
+		String[] argNames = null;
+		if( opcode.equals("groupedagg") ) { //case-sensitive match
+			argNames = new String[] {
+				Statement.GAGG_GROUPS,
+				Statement.GAGG_WEIGHTS,
+				Statement.GAGG_FN,
+				Statement.GAGG_NUM_GROUPS };
+		}
+		else if( opcode.equalsIgnoreCase("rmempty") )
+			argNames = new String[] {"margin", "select"};
+		else if( opcode.equalsIgnoreCase("replace") )
+			argNames = new String[] {"pattern", "replacement"};
+		else if( opcode.equalsIgnoreCase("rexpand") )
+			argNames = new String[] {"max", "dir", "cast", "ignore"};
+
+		LinkedHashMap<String,Hop> args = new LinkedHashMap<>();
+		if( argNames != null ) {
+			args.put("target", target);
+			LineageItem[] inputs = item.getInputs();
+			for( int i=0; i<argNames.length; i++ )
+				args.put(argNames[i], operands.get(inputs[i+1].getId()));
+		}
+
+		return HopRewriteUtils.createParameterizedBuiltinOp(
+			target, args, ParamBuiltinOp.valueOf(opcode.toUpperCase()));
+	}
+	// Rebuilds a right- or left-indexing hop; data and index bounds are taken from the item inputs by position.
+	private static Hop constructIndexingOp(LineageItem item, Map<Long, Hop> operands) {
+		LineageItem[] in = item.getInputs();
+		Hop data = operands.get(in[0].getId());
+		if( "rightIndex".equals(item.getOpcode()) )
+			return HopRewriteUtils.createIndexingOp(data,
+				operands.get(in[1].getId()), //rl
+				operands.get(in[2].getId()), //ru
+				operands.get(in[3].getId()), //cl
+				operands.get(in[4].getId())); //cu
+		if( !"leftIndex".equals(item.getOpcode()) && !"mapLeftIndex".equals(item.getOpcode()) )
+			throw new DMLRuntimeException("Unsupported opcode: "+item.getOpcode());
+		return HopRewriteUtils.createLeftIndexingOp(data,
+			operands.get(in[1].getId()), //rhs
+			operands.get(in[2].getId()), //rl
+			operands.get(in[3].getId()), //ru
+			operands.get(in[4].getId()), //cl
+			operands.get(in[5].getId())); //cu
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index b0d66f2..58b9282 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -687,6 +687,10 @@
return TestUtils.readDMLString(baseDirectory + OUTPUT_DIR + fileName + ".lineage");
}
+	protected static String readDMLLineageDedupFromHDFS(String fileName) {
+		String dedupPath = baseDirectory + OUTPUT_DIR + fileName + ".lineage.dedup"; // same directory as the ".lineage" trace
+		return TestUtils.readDMLString(dedupPath);
+	}
protected static FrameBlock readDMLFrameFromHDFS(String fileName, FileFormat fmt) throws IOException {
// read frame data from hdfs
String strFrameFileName = baseDirectory + OUTPUT_DIR + fileName;
diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java
index 6f57adc..de75ab7 100644
--- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java
@@ -29,9 +29,7 @@
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.lineage.Lineage;
import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
-import org.apache.sysds.runtime.lineage.LineageItem;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
-import org.apache.sysds.runtime.lineage.LineageParser;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
import org.apache.sysds.test.AutomatedTestBase;
@@ -103,8 +101,7 @@
//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
- LineageItem R = LineageParser.parseLineageTrace(Rtrace);
- Data ret = LineageItemUtils.computeByLineage(R);
+ Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
TestUtils.compareMatrices(dmlfile, tmp, 1e-6);
diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java
index 03b7587..1dc675c 100644
--- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java
@@ -27,9 +27,7 @@
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.lineage.Lineage;
-import org.apache.sysds.runtime.lineage.LineageItem;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
-import org.apache.sysds.runtime.lineage.LineageParser;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
import org.apache.sysds.test.AutomatedTestBase;
@@ -81,8 +79,7 @@
//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
- LineageItem R = LineageParser.parseLineageTrace(Rtrace);
- Data ret = LineageItemUtils.computeByLineage(R);
+ Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupExecTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupExecTest.java
new file mode 100644
index 0000000..5f729d3
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupExecTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.lineage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.junit.Test;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.lineage.Lineage;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+
+public class LineageTraceDedupExecTest extends AutomatedTestBase {
+
+	protected static final String TEST_DIR = "functions/lineage/";
+	protected static final String TEST_NAME1 = "LineageTraceDedupExec1";
+	protected static final String TEST_NAME10 = "LineageTraceDedupExec10";
+	protected static final String TEST_NAME2 = "LineageTraceDedupExec2";
+	protected String TEST_CLASS_DIR = TEST_DIR + LineageTraceDedupExecTest.class.getSimpleName() + "/";
+
+	protected static final int numRecords = 10;
+	protected static final int numFeatures = 5;
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		// Register one configuration per script
+		for( String name : new String[] {TEST_NAME1, TEST_NAME10, TEST_NAME2} )
+			addTestConfiguration(name, new TestConfiguration(TEST_CLASS_DIR, name));
+	}
+
+	@Test
+	public void testLineageTraceExec1() {
+		testLineageTraceExec(TEST_NAME1);
+	}
+
+	@Test
+	public void testLineageTraceExec10() {
+		testLineageTraceExec(TEST_NAME10);
+	}
+
+	@Test
+	public void testLineageTraceExec2() {
+		testLineageTraceExec(TEST_NAME2);
+	}
+
+	// Runs the script with dedup lineage tracing, recomputes R from the written traces and compares both results.
+	private void testLineageTraceExec(String testname) {
+		System.out.println("------------ BEGIN " + testname + "------------");
+
+		getAndLoadTestConfiguration(testname);
+		// Fixed flags first, then the script arguments: output path and data dimensions
+		String[] fixedArgs = {"-lineage", "dedup", "-stats", "-args"};
+		List<String> cmd = new ArrayList<>();
+		for( String arg : fixedArgs )
+			cmd.add(arg);
+		cmd.add(output("R"));
+		cmd.add(String.valueOf(numRecords));
+		cmd.add(String.valueOf(numFeatures));
+		programArgs = cmd.toArray(new String[cmd.size()]);
+		fullDMLScriptName = getScript();
+
+		Lineage.resetInternalState();
+		//run the test
+		runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
+
+		// Recompute R from the main trace plus the dedup patches
+		Data recomputed = LineageRecomputeUtils.parseNComputeLineageTrace(
+			readDMLLineageFromHDFS("R"), readDMLLineageDedupFromHDFS("R"));
+
+		// The recomputed matrix must match the output written by the original run
+		HashMap<CellIndex, Double> expected = readDMLMatrixFromHDFS("R");
+		MatrixBlock actual = ((MatrixObject)recomputed).acquireReadAndRelease();
+		TestUtils.compareMatrices(expected, actual, 1e-6);
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java
index d4f6f53..8e07c21 100644
--- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java
@@ -28,8 +28,8 @@
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.lineage.Lineage;
import org.apache.sysds.runtime.lineage.LineageItem;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
import org.apache.sysds.runtime.lineage.LineageParser;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixValue;
import org.apache.sysds.test.AutomatedTestBase;
@@ -117,12 +117,12 @@
TestUtils.compareScalars(Y_lineage, Explain.explain(Y_li));
//generate program
- Data X_data = LineageItemUtils.computeByLineage(X_li);
+ Data X_data = LineageRecomputeUtils.parseNComputeLineageTrace(X_lineage, null);
HashMap<MatrixValue.CellIndex, Double> X_dmlfile = readDMLMatrixFromHDFS("X");
MatrixBlock X_tmp = ((MatrixObject)X_data).acquireReadAndRelease();
TestUtils.compareMatrices(X_dmlfile, X_tmp, 1e-6);
- Data Y_data = LineageItemUtils.computeByLineage(Y_li);
+ Data Y_data = LineageRecomputeUtils.parseNComputeLineageTrace(Y_lineage, null);
HashMap<MatrixValue.CellIndex, Double> Y_dmlfile = readDMLMatrixFromHDFS("Y");
MatrixBlock Y_tmp = ((MatrixObject)Y_data).acquireReadAndRelease();
TestUtils.compareMatrices(Y_dmlfile, Y_tmp, 1e-6);
diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java
index b2c2b1f..c1e9205 100644
--- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java
@@ -28,9 +28,7 @@
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
import org.apache.sysds.runtime.lineage.Lineage;
-import org.apache.sysds.runtime.lineage.LineageItem;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
-import org.apache.sysds.runtime.lineage.LineageParser;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
import org.apache.sysds.test.AutomatedTestBase;
@@ -123,8 +121,7 @@
//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
- LineageItem R = LineageParser.parseLineageTrace(Rtrace);
- Data ret = LineageItemUtils.computeByLineage(R);
+ Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
if( testname.equals(TEST_NAME2) || testname.equals(TEST_NAME5)) {
double val1 = readDMLScalarFromHDFS("R").get(new CellIndex(1,1));
diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java
index 1aedb7b..e6ebf78 100644
--- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java
@@ -27,9 +27,7 @@
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.lineage.Lineage;
-import org.apache.sysds.runtime.lineage.LineageItem;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
-import org.apache.sysds.runtime.lineage.LineageParser;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
import org.apache.sysds.test.AutomatedTestBase;
@@ -89,8 +87,7 @@
//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
- LineageItem R = LineageParser.parseLineageTrace(Rtrace);
- Data ret = LineageItemUtils.computeByLineage(R);
+ Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java
index e796825..6b9a7bd 100644
--- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java
@@ -28,9 +28,7 @@
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.lineage.Lineage;
-import org.apache.sysds.runtime.lineage.LineageItem;
-import org.apache.sysds.runtime.lineage.LineageItemUtils;
-import org.apache.sysds.runtime.lineage.LineageParser;
+import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
import org.apache.sysds.test.AutomatedTestBase;
@@ -163,8 +161,7 @@
//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
- LineageItem R = LineageParser.parseLineageTrace(Rtrace);
- Data ret = LineageItemUtils.computeByLineage(R);
+ Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
MatrixBlock tmp = ((MatrixObject) ret).acquireReadAndRelease();
diff --git a/src/test/scripts/functions/lineage/LineageTraceDedupExec1.dml b/src/test/scripts/functions/lineage/LineageTraceDedupExec1.dml
new file mode 100644
index 0000000..fc557c6
--- /dev/null
+++ b/src/test/scripts/functions/lineage/LineageTraceDedupExec1.dml
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rand(rows=10, cols=5, seed=42); # 10x5 random input with an explicit seed
+R = X; # R starts out as X and is the only variable written at the end
+
+for(i in 1:2){ # two iterations; the body never reads i, so both apply the same steps
+ R = R + 1 / 2;
+ R = R * 3;
+ X = X - 5; # updates X only; no visible statement reads X after this point
+ R = R - 5;
+}
+
+R = R * 4; # one more update to R after the loop, before it is written
+write(R, $1, format="text");
diff --git a/src/test/scripts/functions/lineage/LineageTraceDedupExec10.dml b/src/test/scripts/functions/lineage/LineageTraceDedupExec10.dml
new file mode 100644
index 0000000..1e755df
--- /dev/null
+++ b/src/test/scripts/functions/lineage/LineageTraceDedupExec10.dml
@@ -0,0 +1,34 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rand(rows=10, cols=5, seed=42); # 10x5 random input with an explicit seed
+R = X;
+
+for (i in 1:4) { # four iterations; the path through the body depends on the parity of i
+ R = R + 1/2;
+ if (i %% 2 == 0) # true for i = 2 and i = 4 within 1:4
+ R = R * 3*i; # the only statement guarded by the brace-less if; reads the loop index
+ R = R - 5; # not part of the if; runs on every iteration
+}
+
+R = R * 4; # one more update to R after the loop, before it is written
+
+write(R, $1, format="text");
diff --git a/src/test/scripts/functions/lineage/LineageTraceDedupExec2.dml b/src/test/scripts/functions/lineage/LineageTraceDedupExec2.dml
new file mode 100644
index 0000000..3fa49dd
--- /dev/null
+++ b/src/test/scripts/functions/lineage/LineageTraceDedupExec2.dml
@@ -0,0 +1,45 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = rand(rows=10, cols=5, seed=42); # 10x5 random input with an explicit seed
+
+R = X;
+for(i in 1:5){ #10 -- NOTE(review): the 10 is presumably an earlier/alternative loop bound; confirm
+ if(i %% 2 == 1) # odd i (1, 3, 5) takes this branch
+ R = R + 1 / 2;
+ else # even i (2, 4) takes this branch
+ R = R * 3;
+
+ R = R - 5; # runs on every iteration
+
+ if (i %% 5 == 0) # within 1:5 this holds only for i = 5, the last iteration
+ R = t(R) %*% R; # R is 10x5 up to here, so this product makes it 5x5
+
+ R = R - 23
+}
+
+R = R * 3; # one more update to R after the loop, before it is written
+
+#for (j in 1:2) { # NOTE(review): disabled second loop; presumably because multiple loops are not yet supported -- confirm
+# R = R * 4;
+#}
+
+write(R, $1, format="text");