| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.mrql; |
| |
| import java.util.Iterator; |
| import org.apache.mrql.gen.*; |
| |
| |
| /** Embeds provenance information to queries for debugging and tracing */ |
| public class Provenance extends Streaming { |
| // set it to true for fine-grained provenance (embed_provenance overwrites it on every call) |
| static boolean fine_grain = false; |
| |
| /** Re-key an annotated bag of pairs: every ((k,v),p) becomes (k,(v,p)), |
| * so that the provenance p stays attached to the value v. |
| * @param e an expression that yields a bag of ((k,v),p) |
| * @return a cmap over e that yields a bag of (k,(v,p)) |
| */ |
| private static Tree flip ( Tree e ) { |
| Tree elem = new_var(); |
| Tree pair = #<nth(`elem,0)>; |
| Tree key = #<nth(`pair,0)>; |
| Tree payload = #<tuple(nth(`pair,1),nth(`elem,1))>; |
| return #<cmap(lambda(`elem,bag(tuple(`key,`payload))),`e)>; |
| } |
| |
| /** Move the provenance next to the first component of an annotated pair: |
| * every ((v,b),p) becomes ((v,p),b). |
| * @param e an expression that yields a bag of ((v,b),p) |
| * @return a cmap over e that yields a bag of ((v,p),b) |
| */ |
| private static Tree flipr ( Tree e ) { |
| Tree elem = new_var(); |
| Tree pair = #<nth(`elem,0)>; |
| Tree annotated = #<tuple(nth(`pair,0),nth(`elem,1))>; |
| Tree rest = #<nth(`pair,1)>; |
| return #<cmap(lambda(`elem,bag(tuple(`annotated,`rest))),`e)>; |
| } |
| |
| /** Drop the provenance from an annotated bag: map {(v,p)} to {v} |
| * @param e an expression that yields a bag of (v,p) |
| * @return a cmap over e that keeps component 0 of every element |
| */ |
| private static Tree first ( Tree e ) { |
| Tree elem = new_var(); |
| Tree value = #<nth(`elem,0)>; |
| return #<cmap(lambda(`elem,bag(`value)),`e)>; |
| } |
| |
| /** Keep only the provenance of an annotated bag: map {(v,p)} to {p} |
| * @param e an expression that yields a bag of (v,p) |
| * @return a cmap over e that keeps component 1 of every element |
| */ |
| private static Tree second ( Tree e ) { |
| Tree elem = new_var(); |
| Tree lineage = #<nth(`elem,1)>; |
| return #<cmap(lambda(`elem,bag(`lineage)),`e)>; |
| } |
| |
| /** The nodes of the query AST. |
| * prov appends two entries per annotated node: the inferred type first and |
| * then the AST itself, so the entry at a Lineage location is the AST and the |
| * entry just before it is its type. Reset by embed_provenance and replaced |
| * by display. |
| */ |
| static Trees exprs = #[]; |
| |
| /** Build the (value, Lineage) pair for one AST node. |
| * The inferred type of expr and expr itself are appended to exprs, and the |
| * position of expr in exprs becomes the location stored in the Lineage node. |
| * A value that is not a variable is bound with a let to a fresh variable, |
| * so that it occurs only once in the generated term. |
| * @param expr the AST that corresponds to this value |
| * @param value the value |
| * @param provenance the input provenance of this value |
| * @return a provenance tuple |
| */ |
| private static Tree prov ( Tree expr, Tree value, Trees provenance ) { |
| Tree tp = TypeInference.type_inference(expr); |
| exprs = exprs.append(tp).append(expr); |
| int loc = exprs.length()-1; |
| boolean simple = value.is_variable(); |
| // a variable can be referenced directly; anything else goes through a fresh variable |
| Tree slot = simple ? value : new_var(); |
| Tree annotated = #<tuple(`slot,Lineage(`loc,`slot,...provenance))>; |
| if (simple) |
| return annotated; |
| return #<let(`slot,`value,`annotated)>; |
| } |
| |
| /** Construct a provenance tuple for a value that has a single input provenance |
| * @param expr the AST that corresponds to this value |
| * @param value the value |
| * @param provenance the only input provenance of this value |
| * @return a provenance tuple |
| */ |
| private static Tree prov ( Tree expr, Tree value, Tree provenance ) { |
| Trees inputs = #[`provenance]; |
| return prov(expr,value,inputs); |
| } |
| |
| /** Replace the occurrences of var in e with nvar. |
| * In coarse-grained mode the function of a cmap that contains no trace is |
| * not processed recursively: it is handed to subst together with var and |
| * fvar, and only the cmap input is rewritten. |
| * @param var the variable to look for |
| * @param nvar the term returned in place of var |
| * @param fvar the term passed to subst for a cmap function that is left alone |
| * @param e the expression to rewrite |
| * @return the rewritten expression |
| */ |
| private static Tree lift_var ( Tree var, Tree nvar, Tree fvar, Tree e ) { |
| match e { |
| case cmap(`fn,`src): |
| if (fine_grain || contains_trace(fn)) |
| fail; |
| // coarse-grained provenance: the cmap function itself is not lifted |
| Tree keptFn = subst(var,fvar,fn); |
| Tree liftedSrc = lift_var(var,nvar,fvar,src); |
| return #<cmap(`keptFn,`liftedSrc)>; |
| case `op(...args): |
| Trees liftedArgs = #[ ]; |
| for ( Tree arg: args ) |
| liftedArgs = liftedArgs.append(lift_var(var,nvar,fvar,arg)); |
| return #<`op(...liftedArgs)>; |
| }; |
| if (e.equals(var)) |
| return nvar; |
| return e; |
| } |
| |
| /** Test whether a trace node occurs anywhere inside e |
| * @param e the expression to search |
| * @return true if e is a three-argument trace node or has one among its descendants |
| */ |
| private static boolean contains_trace ( Tree e ) { |
| match e { |
| case trace(_,_,_): |
| return true; |
| case `op(...args): |
| for ( Tree arg: args ) { |
| if (contains_trace(arg)) |
| return true; |
| } |
| }; |
| return false; |
| } |
| |
| /** Lift the expression e of type {t} to {(t,{provenance})}. |
| * The recognized bag operators are rewritten case by case. Any other |
| * expression whose inferred type is a collection is wrapped so that each of |
| * its elements gets a Lineage node with no input provenance, and everything |
| * else is passed on to embedP. |
| * @param e the expression to lift |
| * @return the lifted expression |
| */ |
| private static Tree embedB ( Tree e ) { |
| match e { |
| case repeat(lambda(`v,`u),`x,`n): /* lift the initial bag x and the step u; flipr reshapes the lifted step */ |
| Tree nv = new_var(); |
| Tree ex = embedB(x); |
| Tree ef = subst(v,nv,flipr(embedB(u))); |
| return #<repeat(lambda(`nv,`ef),`ex,`n)>; |
| case cmap(lambda(`v,`b),`x): |
| if (fine_grain || contains_trace(b)) |
| fail; /* leave this case; the cmap case that follows handles fine-grained lifting */ |
| // coarse grain: the body b is not lifted; every output takes the provenance of its input element |
| Tree nv = new_var(); |
| Tree nw = new_var(); |
| Tree ex = embedB(x); |
| Tree nb = subst(v,#<nth(`nv,0)>,b); |
| Tree p = prov(e,nw,#<nth(`nv,1)>); |
| return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(`p)),`nb)),`ex)>; |
| case cmap(lambda(`v,`b),`x): |
| // fine grain: the body b is lifted with embedB too, so every output carries the provenance built by the body |
| Tree nv = new_var(); |
| Tree nw = new_var(); |
| Tree ex = embedB(x); |
| Tree ef = subst(v,nv,embedB(b)); |
| Tree p = prov(e,#<nth(`nw,0)>,#<nth(`nw,1)>); |
| return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(`p)),`ef)),`ex)>; |
| case groupBy(`x): /* flip re-keys the lifted input to (k,(v,p)); first/second then split each group into values and provenances */ |
| Tree nv = new_var(); |
| Tree ex = flip(embedB(x)); |
| Tree val = #<tuple(nth(`nv,0),`(first(#<nth(`nv,1)>)))>; |
| Tree p = prov(e,val,second(#<nth(`nv,1)>)); |
| return #<cmap(lambda(`nv,bag(`p)),groupBy(`ex))>; |
| case orderBy(`x): /* same rewriting as groupBy, applied to orderBy */ |
| Tree nv = new_var(); |
| Tree ex = flip(embedB(x)); |
| Tree val = #<tuple(nth(`nv,0),`(first(#<nth(`nv,1)>)))>; |
| Tree p = prov(e,val,second(#<nth(`nv,1)>)); |
| return #<cmap(lambda(`nv,bag(`p)),orderBy(`ex))>; |
| case coGroup(`x,`y): /* both inputs are lifted and re-keyed; the provenances of the two groups become the two inputs of the Lineage node */ |
| Tree nv = new_var(); |
| Tree xv = new_var(); |
| Tree yv = new_var(); |
| Tree ex = flip(embedB(x)); |
| Tree ey = flip(embedB(y)); |
| Tree val = #<tuple(nth(`nv,0),tuple(`(first(xv)),`(first(yv))))>; |
| Tree p = prov(e,val,#[`(second(xv)),`(second(yv))]); |
| return #<cmap(lambda(`nv,let(`xv,nth(nth(`nv,1),0), |
| let(`yv,nth(nth(`nv,1),1),bag(`p)))), |
| coGroup(`ex,`ey))>; |
| case call(source,...): /* a data source: every element gets a Lineage node with no input provenance */ |
| Tree nv = new_var(); |
| Tree p = prov(e,nv,#[ ]); |
| return #<cmap(lambda(`nv,bag(`p)),`e)>; |
| case bag(...as): /* a bag constructor: each element is lifted on its own with embedP */ |
| Trees es = #[ ]; |
| for ( Tree a: as ) |
| es = es.append(embedP(a)); |
| return #<bag(...es)>; |
| case nth(`x,`n): /* x is lifted as a whole with embedP; each element of the selected component takes the provenance of x as input */ |
| Tree nv = new_var(); |
| Tree nw = new_var(); |
| Tree ex = embedP(x); |
| Tree p = prov(e,nv,#<nth(`nw,1)>); |
| return #<let(`nw,`ex,cmap(lambda(`nv,bag(`p)), |
| nth(nth(`nw,0),`n)))>; |
| case project(`x,`a): /* same rewriting as nth, applied to a record projection */ |
| Tree nv = new_var(); |
| Tree nw = new_var(); |
| Tree ex = embedP(x); |
| Tree p = prov(e,nv,#<nth(`nw,1)>); |
| return #<let(`nw,`ex,cmap(lambda(`nv,bag(`p)), |
| project(nth(`nw,0),`a)))>; |
| case if(`pred,`x,`y): /* the predicate is lifted with embedP and both branches with embedB; each output combines the predicate provenance with its own */ |
| Tree nv = new_var(); |
| Tree nw = new_var(); |
| Tree ep = embedP(pred); |
| Tree ex = embedB(x); |
| Tree ey = embedB(y); |
| Tree p = prov(e,#<nth(`nw,0)>,#[nth(`nv,1),nth(`nw,1)]); |
| return #<let(`nv,`ep, |
| cmap(lambda(`nw,bag(`p)),if(nth(`nv,0),`ex,`ey)))>; |
| case typed(`x,`T(`tp)): |
| if (!is_collection(T)) |
| fail; /* only a cast to a collection type is handled here */ |
| Tree nv = new_var(); |
| Tree ex = embedB(x); |
| Tree p = prov(e,#<typed(nth(`nv,0),`tp)>,#<nth(`nv,1)>); |
| return #<cmap(lambda(`nv,bag(`p)),`ex)>; |
| case trace(`msg,`tp,`x): /* each element of the lifted x gets one more Lineage node, the one that refers to this trace */ |
| Tree nv = new_var(); |
| Tree ex = embedB(x); |
| Tree p = prov(e,#<nth(`nv,0)>,#<nth(`nv,1)>); |
| return #<cmap(lambda(`nv,bag(`p)),`ex)>; |
| case `v: |
| if (v.is_variable()) |
| if (Interpreter.repeat_variables.member(v)) |
| return v; /* a repeat variable is returned unchanged */ |
| else if (Interpreter.lookup_global_binding(v.toString()) != null) { /* a global binding is treated like a source */ |
| Tree nv = new_var(); |
| Tree p = prov(v,nv,#[ ]); |
| return #<cmap(lambda(`nv,bag(`p)),`v)>; |
| } else return v; /* any other variable is returned unchanged */ |
| }; |
| match TypeInference.type_inference(e) { /* reached when no case above returned */ |
| case `T(_): |
| if (!is_collection(T)) |
| fail; |
| Tree nv = new_var(); |
| Tree p = prov(e,nv,#[]); |
| return #<cmap(lambda(`nv,bag(`p)),`e)>; |
| }; |
| return embedP(e); |
| } |
| |
| /** Lift the expression e of type t to (t,provenance). |
| * An expression whose inferred type is a collection is lifted with embedB |
| * and then split into its values and its provenances. For the other |
| * expressions, the components are lifted recursively, the lifted components |
| * are bound to a fresh variable, and the value is rebuilt from the nth(.,0) |
| * parts while the nth(.,1) parts become the input provenance. |
| * @param e the expression to lift |
| * @return the lifted expression |
| */ |
| public static Tree embedP ( Tree e ) { |
| match TypeInference.type_inference(e) { |
| case `T(_): |
| if (!is_collection(T)) |
| fail; |
| Tree nv = new_var(); |
| Tree ex = embedB(e); |
| Tree p = prov(e,first(nv),second(nv)); /* value: the bag of values; input provenance: the bag of element provenances */ |
| return #<let(`nv,`ex,`p)>; |
| }; |
| match e { |
| case reduce(`m,`x): /* the reduction runs over the values of the lifted bag; all element provenances are its input */ |
| Tree nv = new_var(); |
| Tree ex = embedB(x); |
| Tree p = prov(e,#<reduce(`m,`(first(nv)))>,second(nv)); |
| return #<let(`nv,`ex,`p)>; |
| case tuple(...as): /* component i of the lifted tuple is (value,provenance) */ |
| Tree nv = new_var(); |
| Trees es = #[ ]; |
| Trees vs = #[ ]; |
| Trees ps = #[ ]; |
| int i = 0; |
| for ( Tree a: as ) { |
| es = es.append(embedP(a)); |
| vs = vs.append(#<nth(nth(`nv,`i),0)>); |
| ps = ps.append(#<nth(nth(`nv,`i),1)>); |
| i++; |
| }; |
| Tree p = prov(e,#<tuple(...vs)>,ps); |
| return #<let(`nv,tuple(...es),`p)>; |
| case record(...as): /* like tuple; only the bind(v,b) components are visited, anything else is skipped */ |
| Tree nv = new_var(); |
| Trees es = #[ ]; |
| Trees vs = #[ ]; |
| Trees ps = #[ ]; |
| int i = 0; |
| for ( Tree a: as ) |
| match a { |
| case bind(`v,`b): |
| es = es.append(embedP(b)); |
| vs = vs.append(#<bind(`v,nth(nth(`nv,`i),0))>); |
| ps = ps.append(#<nth(nth(`nv,`i),1)>); |
| i++; |
| }; |
| Tree p = prov(e,#<record(...vs)>,ps); |
| return #<let(`nv,tuple(...es),`p)>; |
| case call(`f,...as): /* the arguments are lifted; the function name f is kept as is */ |
| Tree nv = new_var(); |
| Trees es = #[ ]; |
| Trees vs = #[ ]; |
| Trees ps = #[ ]; |
| int i = 0; |
| for ( Tree a: as ) { |
| es = es.append(embedP(a)); |
| vs = vs.append(#<nth(nth(`nv,`i),0)>); |
| ps = ps.append(#<nth(nth(`nv,`i),1)>); |
| i++; |
| }; |
| Tree p = prov(e,#<call(`f,...vs)>,ps); |
| return #<let(`nv,tuple(...es),`p)>; |
| case nth(`x,`n): /* the selection is applied to the value part of the lifted x */ |
| Tree nv = new_var(); |
| Tree ex = embedP(x); |
| Tree p = prov(e,#<nth(nth(`nv,0),`n)>,#<nth(`nv,1)>); |
| return #<let(`nv,`ex,`p)>; |
| case project(`x,`a): /* the projection is applied to the value part of the lifted x */ |
| Tree nv = new_var(); |
| Tree ex = embedP(x); |
| Tree p = prov(e,#<project(nth(`nv,0),`a)>,#<nth(`nv,1)>); |
| return #<let(`nv,`ex,`p)>; |
| case if(`pred,`x,`y): /* NOTE(review): both lifted branches are placed in one tuple, so both look evaluated whatever the predicate is -- confirm this is intended */ |
| Tree nv = new_var(); |
| Tree ep = embedP(pred); |
| Tree ex = embedP(x); |
| Tree ey = embedP(y); |
| Tree p = prov(e,#<if(nth(nth(`nv,0),0),nth(nth(`nv,1),0),nth(nth(`nv,2),0))>, |
| #[nth(nth(`nv,0),1),nth(nth(`nv,1),1),nth(nth(`nv,2),1)]); |
| return #<let(`nv,tuple(`ep,`ex,`ey),`p)>; |
| case index(`x,`n): /* no Lineage node of its own: the index is applied directly to the lifted bag */ |
| Tree ex = embedB(x); |
| return #<index(`ex,`n)>; |
| case true: return prov(e,e,#[ ]); |
| case false: return prov(e,e,#[ ]); |
| case typed(tagged_union(`n,`u),`tp): /* only the union content u is lifted */ |
| Tree nv = new_var(); |
| Tree ex = embedP(u); |
| Tree p = prov(e,#<typed(tagged_union(`n,nth(`nv,0)),`tp)>,#<nth(`nv,1)>); |
| return #<let(`nv,`ex,`p)>; |
| case typed(union_value(`u),`tp): /* only the union content u is lifted */ |
| Tree nv = new_var(); |
| Tree ex = embedP(u); |
| Tree p = prov(e,#<typed(union_value(nth(`nv,0)),`tp)>,#<nth(`nv,1)>); |
| return #<let(`nv,`ex,`p)>; |
| case typed(`x,`tp): /* any other cast: the cast is re-applied to the value part of the lifted x */ |
| Tree nv = new_var(); |
| Tree ex = embedP(x); |
| Tree p = prov(e,#<typed(nth(`nv,0),`tp)>,#<nth(`nv,1)>); |
| return #<let(`nv,`ex,`p)>; |
| case trace(`msg,`tp,`x): /* the value of x is passed through with one more Lineage node that refers to this trace */ |
| Tree nv = new_var(); |
| Tree ex = embedP(x); |
| Tree p = prov(e,#<nth(`nv,0)>,#<nth(`nv,1)>); |
| return #<let(`nv,`ex,`p)>; |
| case `f(...as): /* any other node: lift all the arguments and rebuild the node from their value parts */ |
| Tree nv = new_var(); |
| Trees es = #[ ]; |
| Trees vs = #[ ]; |
| Trees ps = #[ ]; |
| int i = 0; |
| for ( Tree a: as ) { |
| es = es.append(embedP(a)); |
| vs = vs.append(#<nth(nth(`nv,`i),0)>); |
| ps = ps.append(#<nth(nth(`nv,`i),1)>); |
| i++; |
| }; |
| Tree p = prov(e,#<`f(...vs)>,ps); |
| return #<let(`nv,tuple(...es),`p)>; |
| case `v: |
| if (v.is_variable()) |
| if (Interpreter.lookup_global_binding(v.toString()) != null) |
| return prov(e,e,#[ ]); /* a global binding gets a Lineage node with no input provenance */ |
| else return v; /* any other variable is returned unchanged */ |
| }; |
| return prov(e,e,#[ ]); /* whatever is left gets a Lineage node with no input provenance */ |
| } |
| |
| /** Lift the expression e to an expression with provenance annotations. |
| * Overwrites the static fields fine_grain and exprs; the exprs entries |
| * collected while lifting are copied into the returned node. |
| * @param e the query AST |
| * @param fine_grained the value stored in fine_grain before lifting |
| * @return the node provenance(ne,tp,...exprs), where ne is the lifted query |
| * and tp is the type inferred for the normalized query before lifting |
| */ |
| public static Tree embed_provenance ( Tree e, boolean fine_grained ) { |
| fine_grain = fine_grained; |
| exprs = #[ ]; |
| Tree ne = SimplifyTerm(normalize_term(e)); |
| Tree tp = TypeInference.type_inference(ne); |
| ne = SimplifyTerm(embed_missing_cmaps(ne)); |
| TypeInference.type_inference(ne); /* result discarded -- presumably run for its side effects; TODO confirm */ |
| match TypeInference.type_inference(e) { /* NOTE(review): dispatches on the type of the original e, not on tp -- confirm the two always agree */ |
| case `T(_): |
| if (!is_collection(T)) |
| fail; |
| ne = SimplifyTerm(embedB(ne)); /* a collection-typed query is lifted element by element */ |
| TypeInference.type_inference(ne); |
| case _: ne = SimplifyTerm(embedP(ne)); /* any other query is lifted as a single value */ |
| }; |
| ne = SimplifyTerm(convert_to_algebra(ne)); |
| TypeInference.type_inference(ne); |
| return #<provenance(`ne,`tp,...exprs)>; |
| } |
| |
| /** Test whether the bag s has an element equal to x (linear scan that uses equals) |
| * @param s the bag to scan |
| * @param x the value to look for |
| * @return true if some element e of s satisfies e.equals(x) |
| */ |
| private static boolean member ( Bag s, MRData x ) { |
| Iterator<MRData> elems = s.iterator(); |
| while (elems.hasNext()) { |
| if (elems.next().equals(x)) |
| return true; |
| } |
| return false; |
| } |
| |
| /** Print the provenance trace and leaf nodes |
| * (the data sources that have contributed to the output value). |
| * A Tuple is read as a lineage record: field 0 is an index into exprs, |
| * field 1 is the value, and the fields from 2 on are the input lineages. |
| * A Bag is materialized and each of its elements is printed in turn. |
| * Any other value is ignored. |
| * @param value the lineage to print |
| * @param trace_only if true, the data sources are not printed |
| * @param tab the prefix printed at the start of every line |
| */ |
| private static void print_lineage ( MRData value, boolean trace_only, String tab ) { |
| if (value instanceof Tuple) { |
| Tuple p = ((Tuple)value); |
| Tree e = exprs.nth(((MR_int)p.get(0)).get()); /* the AST that produced this value */ |
| match e { |
| case call(source,_,`path,...): |
| if (trace_only) |
| fail; /* sources are not printed below a trace */ |
| match exprs.nth(((MR_int)p.get(0)).get()-1) { /* the entry just before the AST is its type (see prov) */ |
| case `T(`tp): |
| if (!is_collection(T)) |
| fail; |
| System.out.println(tab+path.stringValue()+": " |
| +Printer.print(p.get(1),tp)); |
| } |
| case trace(`msg,`T(`tp),`x): /* a trace over a collection: the value is printed with the element type tp */ |
| if (!is_collection(T)) |
| fail; |
| System.out.println(tab+msg.stringValue()+": "+Printer.print(p.get(1),tp)); |
| print_lineage(p.get(2),true,tab+" "); /* from here on only nested traces are printed */ |
| case trace(`msg,`tp,`x): |
| System.out.println(tab+msg.stringValue()+": "+Printer.print(p.get(1),tp)); |
| print_lineage(p.get(2),true,tab+" "); |
| case _: /* any other node prints nothing itself; all its input lineages are followed */ |
| for ( int i = 2; i < p.size(); i++ ) |
| print_lineage(p.get(i),trace_only,tab); |
| } |
| } else if (value instanceof Bag) { |
| ((Bag)value).materialize(); |
| for ( MRData e: (Bag)value ) |
| print_lineage(e,trace_only,tab); |
| } |
| } |
| |
| /** Print the data sources that contribute to the output */ |
| public static void display ( MRData value, Tree tp, Trees prov_exprs ) { |
| exprs = prov_exprs; |
| match tp { |
| case `T(`etp): |
| if (!is_collection(T)) |
| fail; |
| if (value instanceof Bag) |
| for ( MRData e: (Bag)value ) { |
| System.out.println(Printer.print(((Tuple)e).get(0),etp)); |
| print_lineage(((Tuple)e).get(1),false," "); |
| } else if (value instanceof MR_dataset) |
| for ( MRData e: ((MR_dataset)value).dataset().take(Config.max_bag_size_print) ) { |
| System.out.println(Printer.print(((Tuple)e).get(0),etp)); |
| print_lineage(((Tuple)e).get(1),false," "); |
| } |
| case _: |
| System.out.println(Printer.print(((Tuple)value).get(0),tp)); |
| print_lineage(((Tuple)value).get(1),false," "); |
| } |
| } |
| |
| /** Return the query result without the provenance annotations */ |
| public static MRData getValue ( MRData value ) { |
| if (value instanceof Bag) { |
| final Iterator<MRData> i = ((Bag)value).iterator(); |
| return new Bag(new BagIterator() { |
| public MRData next () { return ((Tuple)i.next()).get(0); } |
| public boolean hasNext() { return i.hasNext(); } |
| }); |
| }; |
| return ((Tuple)value).get(0); |
| } |
| } |