blob: 513d6298baa9dde74bc4853332e4fb5551d6da13 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import java.util.Iterator;
import org.apache.mrql.gen.*;
/** Embeds provenance information to queries for debugging and tracing */
public class Provenance extends Streaming {
// set it to true for fine-grained provenance
static boolean fine_grain = false;
/** map {((k,v),p)} to {(k,(v,p))} */
private static Tree flip ( Tree e ) {
Tree nv = new_var();
return #<cmap(lambda(`nv,bag(tuple(nth(nth(`nv,0),0),
tuple(nth(nth(`nv,0),1),
nth(`nv,1))))),
`e)>;
}
/** map {((v,b),p)} to {((v,p),b)} */
private static Tree flipr ( Tree e ) {
Tree nv = new_var();
return #<cmap(lambda(`nv,bag(tuple(tuple(nth(nth(`nv,0),0),
nth(`nv,1)),
nth(nth(`nv,0),1)))),
`e)>;
}
/** map {(v,p)} to {v} */
private static Tree first ( Tree e ) {
Tree nv = new_var();
return #<cmap(lambda(`nv,bag(nth(`nv,0))),`e)>;
}
/** map {(v,p)} to {p} */
private static Tree second ( Tree e ) {
Tree nv = new_var();
return #<cmap(lambda(`nv,bag(nth(`nv,1))),`e)>;
}
/** The nodes of the query AST */
private static Trees exprs = #[];
/** Construct a provenance tuple
* @param expr the AST that corresponds to this value
* @param value the value
* @param provenance the input provenance of this value
* @return a provenance tuple
*/
private static Tree prov ( Tree expr, Tree value, Trees provenance ) {
exprs = exprs.append(expr);
int loc = exprs.length()-1;
Tree nv = new_var();
return #<let(`nv,`value,tuple(`nv,Lineage(`loc,`nv,...provenance)))>;
}
private static Tree prov ( Tree expr, Tree value, Tree provenance ) {
return prov(expr,value,#[`provenance]);
}
private static Tree lift_var ( Tree var, Tree nvar, Tree fvar, Tree e ) {
match e {
case cmap(`f,`x):
if (fine_grain)
fail;
// don't lift the cmap function in coarse-grained provenance
Tree nf = subst(var,fvar,f);
Tree nx = lift_var(var,nvar,fvar,x);
return #<cmap(`nf,`nx)>;
case `f(...as):
Trees bs = #[ ];
for ( Tree a: as )
bs = bs.append(lift_var(var,nvar,fvar,a));
return #<`f(...bs)>;
};
return (e.equals(var)) ? nvar : e;
}
/** Lift the expression e of type {t} to {(t,{provenance})} */
private static Tree embedB ( Tree e ) {
match e {
case repeat(lambda(`v,`u),`x,`n):
Tree nv = new_var();
Tree nn = new_var();
Tree ex = embedB(x);
Tree ef = lift_var(v,nv,#<cmap(lambda(`nn,bag(nth(`nn,0))),`nv)>,
flipr(embedB(u)));
return #<repeat(lambda(`nv,`ef),`ex,`n)>;
case cmap(lambda(`v,`b),`x):
if (fine_grain)
fail;
Tree nv = new_var();
Tree nw = new_var();
Tree ex = embedB(x);
Tree nb = subst(v,#<nth(`nv,0)>,b);
return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(tuple(`nw,nth(`nv,1)))),
`nb)),
`ex)>;
case cmap(lambda(`v,`b),`x):
Tree nv = new_var();
Tree nw = new_var();
Tree y = new_var();
Tree ex = embedB(x);
Tree ef = lift_var(v,nv,#<nth(`nv,0)>,embedB(b));
Tree p = prov(e,#<nth(`nw,0)>,#<nth(`nw,1)>);
return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(`p)),`ef)),`ex)>;
case groupBy(`x):
Tree nv = new_var();
Tree ex = flip(embedB(x));
Tree val = #<tuple(nth(`nv,0),`(first(#<nth(`nv,1)>)))>;
Tree p = prov(e,val,second(#<nth(`nv,1)>));
return #<cmap(lambda(`nv,bag(`p)),groupBy(`ex))>;
case orderBy(`x):
Tree nv = new_var();
Tree ex = flip(embedB(x));
Tree val = #<tuple(nth(`nv,0),`(first(#<nth(`nv,1)>)))>;
Tree p = prov(e,val,second(#<nth(`nv,1)>));
return #<cmap(lambda(`nv,bag(`p)),orderBy(`ex))>;
case coGroup(`x,`y):
Tree nv = new_var();
Tree ex = flip(embedB(x));
Tree ey = flip(embedB(y));
Tree val = #<tuple(nth(`nv,0),tuple(`(first(#<nth(nth(`nv,1),0)>)),
`(first(#<nth(nth(`nv,1),1)>))))>;
Tree p = prov(e,val,#[`(second(#<nth(nth(`nv,1),0)>)),
`(second(#<nth(nth(`nv,1),1)>))]);
return #<cmap(lambda(`nv,bag(`p)),coGroup(`ex,`ey))>;
case call(source,...):
Tree nv = new_var();
Tree p = prov(e,nv,#[ ]);
return #<cmap(lambda(`nv,bag(`p)),`e)>;
case bag(...as):
Trees es = #[ ];
for ( Tree a: as )
es = es.append(embedP(a));
return #<bag(...es)>;
case nth(`x,`n):
Tree nv = new_var();
Tree nw = new_var();
Tree ex = embedP(x);
Tree p = prov(e,nv,#<nth(`nw,1)>);
return #<let(`nw,`ex,cmap(lambda(`nv,bag(`p)),
nth(nth(`nw,0),`n)))>;
case project(`x,`a):
Tree nv = new_var();
Tree nw = new_var();
Tree ex = embedP(x);
Tree p = prov(e,nv,#<nth(`nw,1)>);
return #<let(`nw,`ex,cmap(lambda(`nv,bag(`p)),
project(nth(`nw,0),`a)))>;
case if(`pred,`x,`y):
Tree nv = new_var();
Tree ep = embedP(pred);
Tree ex = embedB(x);
Tree ey = embedB(y);
return #<let(`nv,tuple(`ep,`ex,`ey),
if(nth(nth(`nv,0),0),nth(`nv,1),nth(`nv,2)))>;
case `v:
if (v.is_variable())
if (Interpreter.repeat_variables.member(v))
return v;
else if (Interpreter.lookup_global_binding(v.toString()) != null) {
Tree nv = new_var();
Tree p = prov(v,nv,#[ ]);
return #<cmap(lambda(`nv,bag(`p)),`v)>;
} else return v;
};
match TypeInference.type_inference(e) {
case `T(_):
if (!is_collection(T))
fail;
Tree nv = new_var();
Tree p = prov(e,nv,#[]);
return #<cmap(lambda(`nv,bag(`p)),`e)>;
};
return embedP(e);
}
/** Lift the expression e of type t to (t,provenance) */
public static Tree embedP ( Tree e ) {
match e {
case reduce(`m,`x):
Tree nv = new_var();
Tree ex = embedB(x);
Tree p = prov(e,#<reduce(`m,`(first(nv)))>,second(nv));
return #<Let(`nv,`ex,`p)>;
case tuple(...as):
Tree nv = new_var();
Trees es = #[ ];
Trees vs = #[ ];
Trees ps = #[ ];
int i = 0;
for ( Tree a: as ) {
es = es.append(embedP(a));
vs = vs.append(#<nth(nth(`nv,`i),0)>);
ps = ps.append(#<nth(nth(`nv,`i),1)>);
i++;
};
Tree p = prov(e,#<tuple(...vs)>,ps);
return #<let(`nv,tuple(...es),`p)>;
case record(...as):
Tree nv = new_var();
Trees es = #[ ];
Trees vs = #[ ];
Trees ps = #[ ];
int i = 0;
for ( Tree a: as )
match a {
case bind(`v,`b):
es = es.append(embedP(b));
vs = vs.append(#<bind(`v,nth(nth(`nv,`i),0))>);
ps = ps.append(#<nth(nth(`nv,`i),1)>);
i++;
};
Tree p = prov(e,#<record(...vs)>,ps);
return #<let(`nv,tuple(...es),`p)>;
case call(`f,...as):
Tree nv = new_var();
Trees es = #[ ];
Trees vs = #[ ];
Trees ps = #[ ];
int i = 0;
for ( Tree a: as ) {
es = es.append(embedP(a));
vs = vs.append(#<nth(nth(`nv,`i),0)>);
ps = ps.append(#<nth(nth(`nv,`i),1)>);
i++;
};
Tree p = prov(e,#<call(`f,...vs)>,ps);
return #<let(`nv,tuple(...es),`p)>;
case nth(`x,`n):
Tree nv = new_var();
Tree ex = embedP(x);
Tree p = prov(e,#<nth(nth(`nv,0),`n)>,#<nth(`nv,1)>);
return #<let(`nv,`ex,`p)>;
case project(`x,`a):
Tree nv = new_var();
Tree ex = embedP(x);
Tree p = prov(e,#<project(nth(`nv,0),`a)>,#<nth(`nv,1)>);
return #<let(`nv,`ex,`p)>;
case if(`pred,`x,`y):
Tree nv = new_var();
Tree ep = embedP(pred);
Tree ex = embedP(x);
Tree ey = embedP(y);
Tree p = prov(e,#<if(nth(nth(`nv,0),0),nth(nth(`nv,1),0),nth(nth(`nv,2),0))>,
#[nth(nth(`nv,0),1),nth(nth(`nv,1),1),nth(nth(`nv,2),1)]);
return #<let(`nv,tuple(`ep,`ex,`ey),`p)>;
case typed(`u,_):
return embedP(u);
case index(`x,`n):
Tree ex = embedB(x);
return #<index(`ex,`n)>;
case true: return prov(e,e,#[ ]);
case false: return prov(e,e,#[ ]);
case `v:
if (v.is_variable())
if (Interpreter.lookup_global_binding(v.toString()) != null)
return prov(e,e,#[ ]);
else return v;
};
match TypeInference.type_inference(e) {
case `T(_):
if (!is_collection(T))
fail;
Tree nv = new_var();
Tree ex = embedB(e);
Tree p = prov(e,first(nv),second(nv));
return #<Let(`nv,`ex,`p)>;
};
return prov(e,e,#[ ]);
}
/** Lift the expression e to an expression with provenance annotations */
public static Tree embed_provenance ( Tree e, boolean fine_grained ) {
fine_grain = fine_grained;
exprs = #[ ];
Tree ne = SimplifyTerm(normalize_term(e));
Tree tp = TypeInference.type_inference(ne);
ne = SimplifyTerm(embed_missing_cmaps(ne));
TypeInference.type_inference(ne);
match TypeInference.type_inference(e) {
case `T(_):
if (!is_collection(T))
fail;
ne = SimplifyTerm(embedB(ne));
case _: ne = SimplifyTerm(embedP(ne));
};
ne = SimplifyTerm(convert_to_algebra(ne));
TypeInference.type_inference(ne);
return #<provenance(`ne,`tp,...exprs)>;
}
private static boolean member ( Bag s, MRData x ) {
for ( MRData e: s )
if (e.equals(x))
return true;
return false;
}
/** Collect the provenance leaves (the data sources that contribute to the output) into a bag */
private static Bag collect_lineage ( MRData value ) {
if (value instanceof Tuple) {
Tuple p = ((Tuple)value);
if (p.size() == 2)
match exprs.nth(((MR_int)p.get(0)).get()) {
case call(source,...):
return new Bag(p.get(1));
};
Bag s = new Bag();
for ( int i = 2; i < p.size() && s.size() < Config.max_bag_size_print; i++ )
for ( MRData e: collect_lineage(p.get(i)) )
if (!member(s,e))
s.add(e);
return s;
} else if (value instanceof Bag) {
Bag s = new Bag();
((Bag)value).materialize();
for ( MRData e: (Bag)value )
for ( MRData x: collect_lineage(e) )
if (s.size() < Config.max_bag_size_print && !member(s,x))
s.add(x);
return s;
} else return new Bag();
}
/** Print the the data sources that contribute to the output to the output */
public static void display ( MRData value, Tree tp, Trees prov_exprs ) {
exprs = prov_exprs;
match tp {
case `T(`etp):
if (!is_collection(T))
fail;
if (value instanceof Bag)
for ( MRData e: (Bag)value ) {
System.out.println(Printer.print(((Tuple)e).get(0),etp));
System.out.println(" <- "+collect_lineage(((Tuple)e).get(1)));
} else if (value instanceof MR_dataset)
for ( MRData e: ((MR_dataset)value).dataset().take(Config.max_bag_size_print) ) {
System.out.println(Printer.print(((Tuple)e).get(0),etp));
System.out.println(" <- "+collect_lineage(((Tuple)e).get(1)));
}
case _:
System.out.println(Printer.print(((Tuple)value).get(0),tp));
System.out.println(" <- "+collect_lineage(((Tuple)value).get(1)));
}
}
/** Return the query result without the provenance annotations */
public static MRData getValue ( MRData value ) {
if (value instanceof Bag) {
final Iterator<MRData> i = ((Bag)value).iterator();
return new Bag(new BagIterator() {
public MRData next () { return ((Tuple)i.next()).get(0); }
public boolean hasNext() { return i.hasNext(); }
});
};
return ((Tuple)value).get(0);
}
}