blob: 0ae272cf458d3b62994f7542154feea438bd4eaf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import java.util.Iterator;
import org.apache.mrql.gen.*;
/** Embeds provenance information to queries for debugging and tracing */
public class Provenance extends Streaming {
// set it to true for fine-grained provenance
static boolean fine_grain = false;
/** map {((k,v),p)} to {(k,(v,p))} */
private static Tree flip ( Tree e ) {
Tree nv = new_var();
return #<cmap(lambda(`nv,bag(tuple(nth(nth(`nv,0),0),
tuple(nth(nth(`nv,0),1),
nth(`nv,1))))),
`e)>;
}
/** map {((v,b),p)} to {((v,p),b)} */
private static Tree flipr ( Tree e ) {
Tree nv = new_var();
return #<cmap(lambda(`nv,bag(tuple(tuple(nth(nth(`nv,0),0),
nth(`nv,1)),
nth(nth(`nv,0),1)))),
`e)>;
}
/** map {(v,p)} to {v} */
private static Tree first ( Tree e ) {
Tree nv = new_var();
return #<cmap(lambda(`nv,bag(nth(`nv,0))),`e)>;
}
/** map {(v,p)} to {p} */
private static Tree second ( Tree e ) {
Tree nv = new_var();
return #<cmap(lambda(`nv,bag(nth(`nv,1))),`e)>;
}
/** The nodes of the query AST */
static Trees exprs = #[];
/** Construct a provenance tuple
* @param expr the AST that corresponds to this value
* @param value the value
* @param provenance the input provenance of this value
* @return a provenance tuple
*/
private static Tree prov ( Tree expr, Tree value, Trees provenance ) {
Tree tp = TypeInference.type_inference(expr);
exprs = exprs.append(tp);
exprs = exprs.append(expr);
int loc = exprs.length()-1;
if (value.is_variable())
return #<tuple(`value,Lineage(`loc,`value,...provenance))>;
Tree nv = new_var();
return #<let(`nv,`value,tuple(`nv,Lineage(`loc,`nv,...provenance)))>;
}
private static Tree prov ( Tree expr, Tree value, Tree provenance ) {
return prov(expr,value,#[`provenance]);
}
private static Tree lift_var ( Tree var, Tree nvar, Tree fvar, Tree e ) {
match e {
case cmap(`f,`x):
if (fine_grain || contains_trace(f))
fail;
// don't lift the cmap function in coarse-grained provenance
Tree nf = subst(var,fvar,f);
Tree nx = lift_var(var,nvar,fvar,x);
return #<cmap(`nf,`nx)>;
case `f(...as):
Trees bs = #[ ];
for ( Tree a: as )
bs = bs.append(lift_var(var,nvar,fvar,a));
return #<`f(...bs)>;
};
return (e.equals(var)) ? nvar : e;
}
private static boolean contains_trace ( Tree e ) {
match e {
case trace(`msg,`tp,`x):
return true;
case `f(...as):
for ( Tree a: as )
if (contains_trace(a))
return true;
};
return false;
}
/** Lift the expression e of type {t} to {(t,{provenance})} */
private static Tree embedB ( Tree e ) {
match e {
case repeat(lambda(`v,`u),`x,`n):
Tree nv = new_var();
Tree ex = embedB(x);
Tree ef = subst(v,nv,flipr(embedB(u)));
return #<repeat(lambda(`nv,`ef),`ex,`n)>;
case cmap(lambda(`v,`b),`x):
if (fine_grain || contains_trace(b))
fail;
// coarse grain
Tree nv = new_var();
Tree nw = new_var();
Tree ex = embedB(x);
Tree nb = subst(v,#<nth(`nv,0)>,b);
Tree p = prov(e,nw,#<nth(`nv,1)>);
return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(`p)),`nb)),`ex)>;
case cmap(lambda(`v,`b),`x):
// fine grain
Tree nv = new_var();
Tree nw = new_var();
Tree ex = embedB(x);
Tree ef = subst(v,nv,embedB(b));
Tree p = prov(e,#<nth(`nw,0)>,#<nth(`nw,1)>);
return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(`p)),`ef)),`ex)>;
case groupBy(`x):
Tree nv = new_var();
Tree ex = flip(embedB(x));
Tree val = #<tuple(nth(`nv,0),`(first(#<nth(`nv,1)>)))>;
Tree p = prov(e,val,second(#<nth(`nv,1)>));
return #<cmap(lambda(`nv,bag(`p)),groupBy(`ex))>;
case orderBy(`x):
Tree nv = new_var();
Tree ex = flip(embedB(x));
Tree val = #<tuple(nth(`nv,0),`(first(#<nth(`nv,1)>)))>;
Tree p = prov(e,val,second(#<nth(`nv,1)>));
return #<cmap(lambda(`nv,bag(`p)),orderBy(`ex))>;
case coGroup(`x,`y):
Tree nv = new_var();
Tree xv = new_var();
Tree yv = new_var();
Tree ex = flip(embedB(x));
Tree ey = flip(embedB(y));
Tree val = #<tuple(nth(`nv,0),tuple(`(first(xv)),`(first(yv))))>;
Tree p = prov(e,val,#[`(second(xv)),`(second(yv))]);
return #<cmap(lambda(`nv,let(`xv,nth(nth(`nv,1),0),
let(`yv,nth(nth(`nv,1),1),bag(`p)))),
coGroup(`ex,`ey))>;
case call(source,...):
Tree nv = new_var();
Tree p = prov(e,nv,#[ ]);
return #<cmap(lambda(`nv,bag(`p)),`e)>;
case bag(...as):
Trees es = #[ ];
for ( Tree a: as )
es = es.append(embedP(a));
return #<bag(...es)>;
case nth(`x,`n):
Tree nv = new_var();
Tree nw = new_var();
Tree ex = embedP(x);
Tree p = prov(e,nv,#<nth(`nw,1)>);
return #<let(`nw,`ex,cmap(lambda(`nv,bag(`p)),
nth(nth(`nw,0),`n)))>;
case project(`x,`a):
Tree nv = new_var();
Tree nw = new_var();
Tree ex = embedP(x);
Tree p = prov(e,nv,#<nth(`nw,1)>);
return #<let(`nw,`ex,cmap(lambda(`nv,bag(`p)),
project(nth(`nw,0),`a)))>;
case if(`pred,`x,`y):
Tree nv = new_var();
Tree nw = new_var();
Tree ep = embedP(pred);
Tree ex = embedB(x);
Tree ey = embedB(y);
Tree p = prov(e,#<nth(`nw,0)>,#[nth(`nv,1),nth(`nw,1)]);
return #<let(`nv,`ep,
cmap(lambda(`nw,bag(`p)),if(nth(`nv,0),`ex,`ey)))>;
case typed(`x,`T(`tp)):
if (!is_collection(T))
fail;
Tree nv = new_var();
Tree ex = embedB(x);
Tree p = prov(e,#<typed(nth(`nv,0),`tp)>,#<nth(`nv,1)>);
return #<cmap(lambda(`nv,bag(`p)),`ex)>;
case trace(`msg,`tp,`x):
Tree nv = new_var();
Tree ex = embedB(x);
Tree p = prov(e,#<nth(`nv,0)>,#<nth(`nv,1)>);
return #<cmap(lambda(`nv,bag(`p)),`ex)>;
case `v:
if (v.is_variable())
if (Interpreter.repeat_variables.member(v))
return v;
else if (Interpreter.lookup_global_binding(v.toString()) != null) {
Tree nv = new_var();
Tree p = prov(v,nv,#[ ]);
return #<cmap(lambda(`nv,bag(`p)),`v)>;
} else return v;
};
match TypeInference.type_inference(e) {
case `T(_):
if (!is_collection(T))
fail;
Tree nv = new_var();
Tree p = prov(e,nv,#[]);
return #<cmap(lambda(`nv,bag(`p)),`e)>;
};
return embedP(e);
}
/** Lift the expression e of type t to (t,provenance) */
public static Tree embedP ( Tree e ) {
match TypeInference.type_inference(e) {
case `T(_):
if (!is_collection(T))
fail;
Tree nv = new_var();
Tree ex = embedB(e);
Tree p = prov(e,first(nv),second(nv));
return #<let(`nv,`ex,`p)>;
};
match e {
case reduce(`m,`x):
Tree nv = new_var();
Tree ex = embedB(x);
Tree p = prov(e,#<reduce(`m,`(first(nv)))>,second(nv));
return #<let(`nv,`ex,`p)>;
case tuple(...as):
Tree nv = new_var();
Trees es = #[ ];
Trees vs = #[ ];
Trees ps = #[ ];
int i = 0;
for ( Tree a: as ) {
es = es.append(embedP(a));
vs = vs.append(#<nth(nth(`nv,`i),0)>);
ps = ps.append(#<nth(nth(`nv,`i),1)>);
i++;
};
Tree p = prov(e,#<tuple(...vs)>,ps);
return #<let(`nv,tuple(...es),`p)>;
case record(...as):
Tree nv = new_var();
Trees es = #[ ];
Trees vs = #[ ];
Trees ps = #[ ];
int i = 0;
for ( Tree a: as )
match a {
case bind(`v,`b):
es = es.append(embedP(b));
vs = vs.append(#<bind(`v,nth(nth(`nv,`i),0))>);
ps = ps.append(#<nth(nth(`nv,`i),1)>);
i++;
};
Tree p = prov(e,#<record(...vs)>,ps);
return #<let(`nv,tuple(...es),`p)>;
case call(`f,...as):
Tree nv = new_var();
Trees es = #[ ];
Trees vs = #[ ];
Trees ps = #[ ];
int i = 0;
for ( Tree a: as ) {
es = es.append(embedP(a));
vs = vs.append(#<nth(nth(`nv,`i),0)>);
ps = ps.append(#<nth(nth(`nv,`i),1)>);
i++;
};
Tree p = prov(e,#<call(`f,...vs)>,ps);
return #<let(`nv,tuple(...es),`p)>;
case nth(`x,`n):
Tree nv = new_var();
Tree ex = embedP(x);
Tree p = prov(e,#<nth(nth(`nv,0),`n)>,#<nth(`nv,1)>);
return #<let(`nv,`ex,`p)>;
case project(`x,`a):
Tree nv = new_var();
Tree ex = embedP(x);
Tree p = prov(e,#<project(nth(`nv,0),`a)>,#<nth(`nv,1)>);
return #<let(`nv,`ex,`p)>;
case if(`pred,`x,`y):
Tree nv = new_var();
Tree ep = embedP(pred);
Tree ex = embedP(x);
Tree ey = embedP(y);
Tree p = prov(e,#<if(nth(nth(`nv,0),0),nth(nth(`nv,1),0),nth(nth(`nv,2),0))>,
#[nth(nth(`nv,0),1),nth(nth(`nv,1),1),nth(nth(`nv,2),1)]);
return #<let(`nv,tuple(`ep,`ex,`ey),`p)>;
case index(`x,`n):
Tree ex = embedB(x);
return #<index(`ex,`n)>;
case true: return prov(e,e,#[ ]);
case false: return prov(e,e,#[ ]);
case typed(tagged_union(`n,`u),`tp):
Tree nv = new_var();
Tree ex = embedP(u);
Tree p = prov(e,#<typed(tagged_union(`n,nth(`nv,0)),`tp)>,#<nth(`nv,1)>);
return #<let(`nv,`ex,`p)>;
case typed(union_value(`u),`tp):
Tree nv = new_var();
Tree ex = embedP(u);
Tree p = prov(e,#<typed(union_value(nth(`nv,0)),`tp)>,#<nth(`nv,1)>);
return #<let(`nv,`ex,`p)>;
case typed(`x,`tp):
Tree nv = new_var();
Tree ex = embedP(x);
Tree p = prov(e,#<typed(nth(`nv,0),`tp)>,#<nth(`nv,1)>);
return #<let(`nv,`ex,`p)>;
case trace(`msg,`tp,`x):
Tree nv = new_var();
Tree ex = embedP(x);
Tree p = prov(e,#<nth(`nv,0)>,#<nth(`nv,1)>);
return #<let(`nv,`ex,`p)>;
case `f(...as):
Tree nv = new_var();
Trees es = #[ ];
Trees vs = #[ ];
Trees ps = #[ ];
int i = 0;
for ( Tree a: as ) {
es = es.append(embedP(a));
vs = vs.append(#<nth(nth(`nv,`i),0)>);
ps = ps.append(#<nth(nth(`nv,`i),1)>);
i++;
};
Tree p = prov(e,#<`f(...vs)>,ps);
return #<let(`nv,tuple(...es),`p)>;
case `v:
if (v.is_variable())
if (Interpreter.lookup_global_binding(v.toString()) != null)
return prov(e,e,#[ ]);
else return v;
};
return prov(e,e,#[ ]);
}
/** Lift the expression e to an expression with provenance annotations */
public static Tree embed_provenance ( Tree e, boolean fine_grained ) {
fine_grain = fine_grained;
exprs = #[ ];
Tree ne = SimplifyTerm(normalize_term(e));
Tree tp = TypeInference.type_inference(ne);
ne = SimplifyTerm(embed_missing_cmaps(ne));
TypeInference.type_inference(ne);
match TypeInference.type_inference(e) {
case `T(_):
if (!is_collection(T))
fail;
ne = SimplifyTerm(embedB(ne));
TypeInference.type_inference(ne);
case _: ne = SimplifyTerm(embedP(ne));
};
ne = SimplifyTerm(convert_to_algebra(ne));
TypeInference.type_inference(ne);
return #<provenance(`ne,`tp,...exprs)>;
}
private static boolean member ( Bag s, MRData x ) {
for ( MRData e: s )
if (e.equals(x))
return true;
return false;
}
/** Print the provenance trace and leaf nodes
* (the data sources that have contributed to the output value) */
private static void print_lineage ( MRData value, boolean trace_only, String tab ) {
if (value instanceof Tuple) {
Tuple p = ((Tuple)value);
Tree e = exprs.nth(((MR_int)p.get(0)).get());
match e {
case call(source,_,`path,...):
if (trace_only)
fail;
match exprs.nth(((MR_int)p.get(0)).get()-1) {
case `T(`tp):
if (!is_collection(T))
fail;
System.out.println(tab+path.stringValue()+": "
+Printer.print(p.get(1),tp));
}
case trace(`msg,`T(`tp),`x):
if (!is_collection(T))
fail;
System.out.println(tab+msg.stringValue()+": "+Printer.print(p.get(1),tp));
print_lineage(p.get(2),true,tab+" ");
case trace(`msg,`tp,`x):
System.out.println(tab+msg.stringValue()+": "+Printer.print(p.get(1),tp));
print_lineage(p.get(2),true,tab+" ");
case _:
for ( int i = 2; i < p.size(); i++ )
print_lineage(p.get(i),trace_only,tab);
}
} else if (value instanceof Bag) {
((Bag)value).materialize();
for ( MRData e: (Bag)value )
print_lineage(e,trace_only,tab);
}
}
/** Print the data sources that contribute to the output */
public static void display ( MRData value, Tree tp, Trees prov_exprs ) {
exprs = prov_exprs;
match tp {
case `T(`etp):
if (!is_collection(T))
fail;
if (value instanceof Bag)
for ( MRData e: (Bag)value ) {
System.out.println(Printer.print(((Tuple)e).get(0),etp));
print_lineage(((Tuple)e).get(1),false," ");
} else if (value instanceof MR_dataset)
for ( MRData e: ((MR_dataset)value).dataset().take(Config.max_bag_size_print) ) {
System.out.println(Printer.print(((Tuple)e).get(0),etp));
print_lineage(((Tuple)e).get(1),false," ");
}
case _:
System.out.println(Printer.print(((Tuple)value).get(0),tp));
print_lineage(((Tuple)value).get(1),false," ");
}
}
/** Return the query result without the provenance annotations */
public static MRData getValue ( MRData value ) {
if (value instanceof Bag) {
final Iterator<MRData> i = ((Bag)value).iterator();
return new Bag(new BagIterator() {
public MRData next () { return ((Tuple)i.next()).get(0); }
public boolean hasNext() { return i.hasNext(); }
});
};
return ((Tuple)value).get(0);
}
}