blob: 1b1b558f8a3708f70de1a461d0aac01f9db53f92 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.lineage;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.sysds.runtime.instructions.spark.RandSPInstruction;
import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
import org.apache.sysds.runtime.lineage.LineageItem.LineageItemType;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.AggOp;
import org.apache.sysds.common.Types.Direction;
import org.apache.sysds.hops.AggBinaryOp;
import org.apache.sysds.hops.AggUnaryOp;
import org.apache.sysds.hops.BinaryOp;
import org.apache.sysds.hops.Hop;
import org.apache.sysds.hops.IndexingOp;
import org.apache.sysds.hops.LiteralOp;
import org.apache.sysds.hops.ReorgOp;
import org.apache.sysds.hops.TernaryOp;
import org.apache.sysds.hops.UnaryOp;
import org.apache.sysds.hops.codegen.SpoofFusedOp;
import org.apache.sysds.lops.PartialAggregate;
import org.apache.sysds.lops.UnaryCP;
import org.apache.sysds.lops.compile.Dag;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.instructions.Instruction;
import org.apache.sysds.runtime.instructions.InstructionParser;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.instructions.cp.DataGenCPInstruction;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
import org.apache.sysds.runtime.util.HDFSTool;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Stack;
import java.util.stream.Collectors;
public class LineageItemUtils {
public static final String LPLACEHOLDER = "IN#";
public static LineageItemType getType(String str) {
if (str.length() == 1) {
switch (str) {
case "C":
return LineageItemType.Creation;
case "L":
return LineageItemType.Literal;
case "I":
return LineageItemType.Instruction;
case "D":
return LineageItemType.Dedup;
default:
throw new DMLRuntimeException("Unknown LineageItemType given!");
}
} else
throw new DMLRuntimeException("Unknown LineageItemType given!");
}
private static String getString(LineageItemType lit) {
switch (lit) {
case Creation:
return "C";
case Literal:
return "L";
case Instruction:
return "I";
case Dedup:
return "D";
default:
throw new DMLRuntimeException("Unknown LineageItemType given!");
}
}
private static String getString(LineageItem li) {
return getString(li.getType());
}
public static String explainSingleLineageItem(LineageItem li) {
StringBuilder sb = new StringBuilder();
sb.append("(").append(li.getId()).append(") ");
sb.append("(").append(getString(li)).append(") ");
if (li.isLeaf()) {
sb.append(li.getData()).append(" ");
} else {
if (li.getType() == LineageItemType.Dedup)
sb.append(li.getOpcode()).append(li.getData()).append(" ");
else
sb.append(li.getOpcode()).append(" ");
String ids = Arrays.stream(li.getInputs())
.map(i -> String.format("(%d)", i.getId()))
.collect(Collectors.joining(" "));
sb.append(ids);
}
return sb.toString().trim();
}
public static LineageItem[] getLineage(ExecutionContext ec, CPOperand... operands) {
return Arrays.stream(operands).filter(c -> c!=null)
.map(c -> ec.getLineage().getOrCreate(c)).toArray(LineageItem[]::new);
}
public static void constructLineageFromHops(Hop[] roots, String claName, Hop[] inputs, HashMap<Long, Hop> spoofmap) {
//probe existence and only generate lineage if non-existing
//(a fused operator might be used in multiple places of a program)
if( LineageCodegenItem.getCodegenLTrace(claName) == null ) {
//recursively construct lineage for fused operator
Map<Long, LineageItem> operands = new HashMap<>();
//reset visit status once for entire sub DAG
for( Hop root : roots )
root.resetVisitStatus();
//construct lineage dags for roots (potentially overlapping)
for( Hop root : roots )
rConstructLineageFromHops(root, inputs, operands, spoofmap);
//create single lineage item (single root or multiagg)
LineageItem out = operands.get(roots[0].getHopID());
if( roots.length > 1 ) { //multi-agg
LineageItem[] outputs = Arrays.stream(roots)
.map(h -> new LineageItem("", UnaryCP.CAST_AS_MATRIX_OPCODE,
new LineageItem[]{operands.get(h.getHopID())}))
.toArray(LineageItem[]::new);
out = new LineageItem("", "cbind", outputs);
}
//cache to avoid reconstruction
LineageCodegenItem.setCodegenLTrace(claName, out);
for (Hop root : roots)
root.resetVisitStatus();
}
}
public static void rConstructLineageFromHops(Hop root, Hop[] inputs, Map<Long, LineageItem> operands, HashMap<Long, Hop> spoofmap) {
if (root.isVisited())
return;
boolean spoof = root instanceof SpoofFusedOp && ArrayUtils.contains(inputs, spoofmap.get(root.getHopID()));
if (ArrayUtils.contains(inputs, root) || spoof) {
Hop tmp = spoof ? spoofmap.get(root.getHopID()) : root;
int pos = ArrayUtils.indexOf(inputs, tmp);
LineageItem li = new LineageItem(LPLACEHOLDER+pos,
"Create"+String.valueOf(root.getHopID()));
operands.put(tmp.getHopID(), li);
return;
}
for (int i = 0; i < root.getInput().size(); i++)
rConstructLineageFromHops(root.getInput().get(i), inputs, operands, spoofmap);
LineageItem li = null;
LineageItem[] LIinputs = root.getInput().stream()
.map(h->ArrayUtils.contains(inputs, spoofmap.get(h.getHopID())) ? spoofmap.get(h.getHopID()) : h)
.map(h->operands.get(h.getHopID()))
.toArray(LineageItem[]::new);
String name = Dag.getNextUniqueVarname(root.getDataType());
if (root instanceof ReorgOp)
li = new LineageItem(name, "r'", LIinputs);
else if (root instanceof UnaryOp) {
String opcode = ((UnaryOp) root).getOp().toString();
li = new LineageItem(name, opcode, LIinputs);
}
else if (root instanceof AggBinaryOp)
li = new LineageItem(name, "ba+*", LIinputs);
else if (root instanceof BinaryOp)
li = new LineageItem(name, ((BinaryOp)root).getOp().toString(), LIinputs);
else if (root instanceof TernaryOp) {
String opcode = ((TernaryOp) root).getOp().toString();
li = new LineageItem(name, opcode, LIinputs);
}
else if (root instanceof AggUnaryOp) {
AggOp op = ((AggUnaryOp) root).getOp();
Direction dir = ((AggUnaryOp) root).getDirection();
String opcode = PartialAggregate.getOpcode(op, dir);
li = new LineageItem(name, opcode, LIinputs);
}
else if (root instanceof IndexingOp)
li = new LineageItem(name, "rightIndex", LIinputs);
else if (root instanceof SpoofFusedOp)
li = LineageCodegenItem.getCodegenLTrace(((SpoofFusedOp) root).getClassName());
else if (root instanceof LiteralOp) { //TODO: remove redundancy
StringBuilder sb = new StringBuilder(root.getName());
sb.append(Instruction.VALUETYPE_PREFIX);
sb.append(root.getDataType().toString());
sb.append(Instruction.VALUETYPE_PREFIX);
sb.append(root.getValueType().toString());
sb.append(Instruction.VALUETYPE_PREFIX);
sb.append(true); //isLiteral = true
li = new LineageItem(sb.toString());
}
else
throw new DMLRuntimeException("Unsupported hop: "+root.getOpString());
//TODO: include all the other hops
operands.put(root.getHopID(), li);
root.setVisited();
}
@Deprecated
@SuppressWarnings("unused")
public static LineageItem rDecompress(LineageItem item) {
if (item.getType() == LineageItemType.Dedup) {
LineageItem dedupInput = rDecompress(item.getInputs()[0]);
ArrayList<LineageItem> inputs = new ArrayList<>();
for (LineageItem li : item.getInputs()[1].getInputs())
inputs.add(rDecompress(li));
LineageItem li = new LineageItem(item.getInputs()[1].getData(),
item.getInputs()[1].getOpcode(), inputs.toArray(new LineageItem[0]));
li.resetVisitStatusNR();
rSetDedupInputOntoOutput(item.getData(), li, dedupInput);
li.resetVisitStatusNR();
return li;
}
else {
ArrayList<LineageItem> inputs = new ArrayList<>();
if (item.getInputs() != null) {
for (LineageItem li : item.getInputs())
inputs.add(rDecompress(li));
}
return new LineageItem(
item.getData(), item.getOpcode(), inputs.toArray(new LineageItem[0]));
}
}
public static void writeTraceToHDFS(String trace, String fname) {
try {
HDFSTool.writeStringToHDFS(trace, fname);
FileSystem fs = IOUtilFunctions.getFileSystem(fname);
if (fs instanceof LocalFileSystem) {
Path path = new Path(fname);
IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
}
} catch (IOException e) {
throw new DMLRuntimeException(e);
}
}
private static void rSetDedupInputOntoOutput(String name, LineageItem item, LineageItem dedupInput) {
if (item.isVisited())
return;
if (item.getInputs() != null)
for (int i = 0; i < item.getInputs().length; i++) {
LineageItem li = item.getInputs()[i];
//replace CPOperand literals (placeholders)
//TODO should use the same placeholder meta data as codegen
if( li.getType() == LineageItemType.Literal ) {
CPOperand tmp = new CPOperand(li.getData());
if( !tmp.isLiteral() && tmp.getName().equals(name) )
item.getInputs()[i] = dedupInput;
}
if (li.getType() == LineageItemType.Creation) {
item.getInputs()[i] = dedupInput;
}
rSetDedupInputOntoOutput(name, li, dedupInput);
}
item.setVisited();
}
public static LineageItem replace(LineageItem root, LineageItem liOld, LineageItem liNew) {
if( liNew == null )
throw new DMLRuntimeException("Invalid null lineage item for "+liOld.getId());
root.resetVisitStatusNR();
rReplaceNR(root, liOld, liNew);
root.resetVisitStatusNR();
return root;
}
/**
* Non-recursive equivalent of {@link #rReplace(LineageItem, LineageItem, LineageItem)}
* for robustness with regard to stack overflow errors.
*
* @param current Current lineage item
* @param liOld Old lineage item
* @param liNew New Lineage item.
*/
public static void rReplaceNR(LineageItem current, LineageItem liOld, LineageItem liNew) {
Stack<LineageItem> q = new Stack<>();
q.push(current);
while( !q.empty() ) {
LineageItem tmp = q.pop();
if( tmp.isVisited() || tmp.getInputs() == null )
continue;
//process children until old item found, then replace
for(int i=0; i<tmp.getInputs().length; i++) {
LineageItem ctmp = tmp.getInputs()[i];
if (liOld.getId() == ctmp.getId() && liOld.equals(ctmp))
tmp.setInput(i, liNew);
else
q.push(ctmp);
}
tmp.setVisited(true);
}
}
@Deprecated
@SuppressWarnings("unused")
private static void rReplace(LineageItem current, LineageItem liOld, LineageItem liNew) {
if( current.isVisited() || current.getInputs() == null )
return;
if( liNew == null )
throw new DMLRuntimeException("Invalid null lineage item for "+liOld.getId());
//process children until old item found, then replace
for(int i=0; i<current.getInputs().length; i++) {
LineageItem tmp = current.getInputs()[i];
if (liOld.equals(tmp))
current.setInput(i, liNew);
else
rReplace(tmp, liOld, liNew);
}
current.setVisited();
}
public static void replaceDagLeaves(ExecutionContext ec, LineageItem root, CPOperand[] newLeaves) {
//find and replace the placeholder leaves
root.resetVisitStatusNR();
rReplaceDagLeaves(root, LineageItemUtils.getLineage(ec, newLeaves));
root.resetVisitStatusNR();
}
public static void rReplaceDagLeaves(LineageItem root, LineageItem[] newleaves) {
if (root.isVisited() || root.isLeaf())
return;
for (int i=0; i<root.getInputs().length; i++) {
LineageItem li = root.getInputs()[i];
if (li.isLeaf() && li.getType() != LineageItemType.Literal
&& li.getData().startsWith(LPLACEHOLDER))
//order-preserving replacement. IN#<xxx> represents relative position xxx
root.setInput(i, newleaves[Integer.parseInt(li.getData().substring(3))]);
else
rReplaceDagLeaves(li, newleaves);
}
root.setVisited();
}
public static void rGetDagLeaves(HashSet<LineageItem> leaves, LineageItem root) {
if (root.isVisited())
return;
if (root.isLeaf())
leaves.add(root);
else {
for (LineageItem li : root.getInputs())
rGetDagLeaves(leaves, li);
}
root.setVisited();
}
public static void checkCycles(LineageItem current) {
current.resetVisitStatusNR();
rCheckCycles(current, new HashSet<Long>(), true);
current.resetVisitStatusNR();
}
public static void rCheckCycles(LineageItem current, Set<Long> probe, boolean useObjIdent) {
if( current.isVisited() )
return;
long id = useObjIdent ? System.identityHashCode(current) : current.getId();
if( probe.contains(id) )
throw new DMLRuntimeException("Cycle detected for "+current.toString());
probe.add(id);
if( current.getInputs() != null )
for(LineageItem li : current.getInputs())
rCheckCycles(li, probe, useObjIdent);
current.setVisited();
}
public static boolean containsRandDataGen(HashSet<LineageItem> entries, LineageItem root) {
if (entries.contains(root) || root.isVisited())
return false;
boolean isRand = isNonDeterministic(root);
if (!root.isLeaf() && !isRand)
for (LineageItem input : root.getInputs())
isRand |= containsRandDataGen(entries, input);
root.setVisited();
return isRand;
}
private static boolean isNonDeterministic(LineageItem li) {
if (li.getType() != LineageItemType.Creation)
return false;
boolean isND = false;
DataGenCPInstruction cprand = null;
RandSPInstruction sprand = null;
Instruction ins = InstructionParser.parseSingleInstruction(li.getData());
if (ins instanceof DataGenCPInstruction)
cprand = (DataGenCPInstruction)ins;
else if (ins instanceof RandSPInstruction)
sprand = (RandSPInstruction)ins;
else
return false;
switch(li.getOpcode().toUpperCase())
{
case "RAND":
if (cprand != null)
if ((cprand.getMinValue() != cprand.getMaxValue()) || (cprand.getSparsity() != 1))
isND = true;
if (sprand!= null)
if ((sprand.getMinValue() != sprand.getMaxValue()) || (sprand.getSparsity() != 1))
isND = true;
//NOTE:It is hard to detect in runtime if rand was called with unspecified seed
//as -1 is already replaced by computed seed. Solution is to unmark for caching in
//compile time. That way we can differentiate between given and unspecified seed.
break;
case "SAMPLE":
isND = true;
break;
default:
isND = false;
break;
}
//TODO: add 'read' in this list
return isND;
}
public static LineageItem[] getLineageItemInputstoSB(ArrayList<String> inputs, ExecutionContext ec) {
if (ReuseCacheType.isNone() && !DMLScript.LINEAGE_DEDUP)
return null;
ArrayList<CPOperand> CPOpInputs = inputs.size() > 0 ? new ArrayList<>() : null;
for (int i=0; i<inputs.size(); i++) {
Data value = ec.getVariable(inputs.get(i));
if (value != null) {
CPOpInputs.add(new CPOperand(value instanceof ScalarObject ? value.toString() : inputs.get(i),
value.getValueType(), value.getDataType()));
}
}
return(CPOpInputs != null ? LineageItemUtils.getLineage(ec,
CPOpInputs.toArray(new CPOperand[CPOpInputs.size()])) : null);
}
public static void addAllDataLineage(ExecutionContext ec) {
for( Entry<String, Data> e : ec.getVariables().entrySet() ) {
if( e.getValue() instanceof CacheableData<?> ) {
CacheableData<?> cdata = (CacheableData<?>) e.getValue();
//only createvar instruction with pREAD prefix added to lineage
String fromVar = org.apache.sysds.lops.Data.PREAD_PREFIX+e.getKey();
ec.traceLineage(VariableCPInstruction.prepCreatevarInstruction(
fromVar, "CacheableData::"+cdata.getUniqueID(), false, "binary"));
//move from pREADx to x
ec.traceLineage(VariableCPInstruction.prepMoveInstruction(fromVar, e.getKey()));
}
}
}
}