[MRQL-93] Support query evaluation tracing and how-provenance
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index 6462949..ed891e1 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -34,9 +34,13 @@
# Required: The Java installation directory
-if [ ! -f ${JAVA_HOME}) ]; then
+if [ -z "${JAVA_HOME}" ]; then  # quoted: an empty or space-containing value stays a single word
export JAVA_HOME=/usr/lib/jvm/java-8-oracle
fi
+if [ ! -e "${JAVA_HOME}" ]; then  # stop early when the configured path does not exist
+ echo "*** Non-existent JAVA_HOME" >&2
+ exit 1
+fi
# Required: The CUP parser library
# You can download it from http://www2.cs.tum.edu/projects/cup/
@@ -49,7 +53,7 @@
# Hadoop configuration. Supports versions 1.x and 2.x (YARN)
# The Hadoop installation directory
-if [ ! -f ${HADOOP_HOME}) ]; then
+if [ -z "${HADOOP_HOME}" ]; then  # quoted: an empty or space-containing value stays a single word
HADOOP_VERSION=2.7.1
HADOOP_HOME=${HOME}/hadoop-${HADOOP_VERSION}
fi
diff --git a/core/src/main/java/org/apache/mrql/AlgebraicOptimization.gen b/core/src/main/java/org/apache/mrql/AlgebraicOptimization.gen
index 7f1fbcc..0421e43 100644
--- a/core/src/main/java/org/apache/mrql/AlgebraicOptimization.gen
+++ b/core/src/main/java/org/apache/mrql/AlgebraicOptimization.gen
@@ -184,6 +184,8 @@
};
};
fail
+ case provenance(`x,...s):
+ return #<provenance(`(translate(x)),...s)>;
case `f(...al):
Trees bl = #[];
for ( Tree a: al )
diff --git a/core/src/main/java/org/apache/mrql/Compiler.gen b/core/src/main/java/org/apache/mrql/Compiler.gen
index 0c2ba3a..5dbaf24 100644
--- a/core/src/main/java/org/apache/mrql/Compiler.gen
+++ b/core/src/main/java/org/apache/mrql/Compiler.gen
@@ -421,6 +421,8 @@
for ( Tree x: el )
ret += ","+compileE(x);
return ret+"))";
+ case Lineage(...el):
+ return compileE(#<tuple(...el)>);
case tagged_union(`n,`u):
return "(new Union((byte)"+((LongLeaf)n).value()+","+compileE(u)+"))";
case union_value(`x):
diff --git a/core/src/main/java/org/apache/mrql/Config.java b/core/src/main/java/org/apache/mrql/Config.java
index 53db404..19069f3 100644
--- a/core/src/main/java/org/apache/mrql/Config.java
+++ b/core/src/main/java/org/apache/mrql/Config.java
@@ -89,6 +89,9 @@
public static int stream_tries = 100;
// if true and stream_window > 0, then incremental streaming
public static boolean incremental = false;
+ // if true, embed coarse-grained provenance (lineage) tracing into the query
+ public static boolean lineage = false;
+ public static boolean debug = false; // if true, embed fine-grained provenance into the query
/** store the configuration parameters */
public static void write ( Configuration conf ) {
@@ -121,6 +124,8 @@
conf.setBoolean("mrql.info",info);
conf.setInt("mrql.stream.window",stream_window);
conf.setBoolean("mrql.incremental",incremental);
+ conf.setBoolean("mrql.lineage",lineage);
+ conf.setBoolean("mrql.debug",debug);
}
/** load the configuration parameters */
@@ -154,6 +159,8 @@
info = conf.getBoolean("mrql.info",info);
stream_window = conf.getInt("mrql.stream.window",stream_window);
incremental = conf.getBoolean("mrql.incremental",incremental);
+ lineage = conf.getBoolean("mrql.lineage",lineage);
+ debug = conf.getBoolean("mrql.debug",debug);
}
public static ArrayList<String> extra_args = new ArrayList<String>();
diff --git a/core/src/main/java/org/apache/mrql/Interpreter.gen b/core/src/main/java/org/apache/mrql/Interpreter.gen
index 600f5aa..e7dac63 100644
--- a/core/src/main/java/org/apache/mrql/Interpreter.gen
+++ b/core/src/main/java/org/apache/mrql/Interpreter.gen
@@ -613,6 +613,26 @@
}
});
return new Bag();
+ case provenance(`x,`tp,...s):
+ MRData value = evalE(x,env);
+ Provenance.display(value,tp,#[...s]);
+ return new Tuple(0);
+ case Provenance(`x,`tp,...s):
+ MRData value = (Config.hadoop_mode)
+ ? new MR_dataset(Evaluator.evaluator.eval(x,env,"-"))
+ : evalS(x,env);
+ Provenance.display(value,tp,#[...s]);
+ return new Tuple(0);
+ case Lineage(...as):
+ return evalEE(#<tuple(...as)>,env);
+ case Let(`v,`x,`y):
+ if (Config.hadoop_mode)
+ return evalEE(y,new Environment(v.toString(),
+ new MR_dataset(Evaluator.evaluator.eval(x,env,"-")),
+ env));
+ Bag b = evalS(x,env);
+ b.materialize();
+ return evalEE(y,new Environment(v.toString(),b,env));
case _:
try {
if (Config.hadoop_mode)
@@ -889,6 +909,15 @@
System.out.println("Algebraic type: "+print_type(pt));
if (Config.stream_window > 0 && Config.incremental)
ne = Streaming.generate_incremental_code(ne);
+ else if (Config.lineage) {
+ ne = Provenance.embed_provenance(ne,false);
+ if (Config.trace)
+ System.out.println("After provenance injection:\n"+ne.pretty(0));
+ } else if (Config.debug) {
+ ne = Provenance.embed_provenance(ne,true);
+ if (Config.trace)
+ System.out.println("After provenance injection:\n"+ne.pretty(0));
+ };
ne = AlgebraicOptimization.translate_all(ne);
if (Config.trace)
System.out.println("Translated expression:\n"+ne.pretty(0));
@@ -941,3 +970,4 @@
}
}
}
+
diff --git a/core/src/main/java/org/apache/mrql/PlanGeneration.gen b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
index a42380c..fdb148f 100644
--- a/core/src/main/java/org/apache/mrql/PlanGeneration.gen
+++ b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
@@ -744,6 +744,11 @@
};
body = makePlan(body);
return #<function(tuple(...params),`outp,`body)>;
+ case provenance(`x,...s):
+ Tree px = makePlan(x);
+ if (is_dataset_expr(x))
+ return #<Provenance(`px,...s)>;
+ else return #<provenance(`px,...s)>;
case `f(...al):
Trees bl = #[];
for ( Tree a: al )
diff --git a/core/src/main/java/org/apache/mrql/Printer.gen b/core/src/main/java/org/apache/mrql/Printer.gen
index 03fc28e..ee651c2 100644
--- a/core/src/main/java/org/apache/mrql/Printer.gen
+++ b/core/src/main/java/org/apache/mrql/Printer.gen
@@ -381,6 +381,8 @@
case Merge(`x,`y):
return "Merge:\n"+tab(n+3)+"left: "+print_plan(x,n+9,true)+"\n"
+tab(n+3)+"right: "+print_plan(y,n+10,true);
+ case Provenance(`x,...):
+ return print_plan(x,n,pv);
case BSP(_,_,_,_,...ds):
String ret = "BSP:\n";
for ( Tree d: ds )
diff --git a/core/src/main/java/org/apache/mrql/Provenance.gen b/core/src/main/java/org/apache/mrql/Provenance.gen
new file mode 100644
index 0000000..513d629
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Provenance.gen
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.util.Iterator;
+import org.apache.mrql.gen.*;
+
+
+/** Embeds provenance information to queries for debugging and tracing */
+public class Provenance extends Streaming {
+ // set it to true for fine-grained provenance
+ static boolean fine_grain = false;
+
+ /** Re-associate every ((k,v),p) element of e into (k,(v,p)) */
+ private static Tree flip ( Tree e ) {
+ Tree pair = new_var();
+ Tree key = #<nth(nth(`pair,0),0)>;
+ Tree val = #<nth(nth(`pair,0),1)>;
+ Tree prv = #<nth(`pair,1)>;
+ return #<cmap(lambda(`pair,bag(tuple(`key,tuple(`val,`prv)))),`e)>;
+ }
+
+ /** Re-associate every ((v,b),p) element of e into ((v,p),b) */
+ private static Tree flipr ( Tree e ) {
+ Tree pair = new_var();
+ Tree val = #<nth(nth(`pair,0),0)>;
+ Tree rest = #<nth(nth(`pair,0),1)>;
+ Tree prv = #<nth(`pair,1)>;
+ return #<cmap(lambda(`pair,bag(tuple(tuple(`val,`prv),`rest))),`e)>;
+ }
+
+ /** Keep only the value v of every (v,p) element of e */
+ private static Tree first ( Tree e ) {
+ Tree elem = new_var(), value = #<nth(`elem,0)>;
+ return #<cmap(lambda(`elem,bag(`value)),`e)>;
+ }
+
+ /** Keep only the provenance p of every (v,p) element of e */
+ private static Tree second ( Tree e ) {
+ Tree elem = new_var(), prv = #<nth(`elem,1)>;
+ return #<cmap(lambda(`elem,bag(`prv)),`e)>;
+ }
+
+ /** The nodes of the query AST */
+ private static Trees exprs = #[];
+
+ /** Build the pair (value,Lineage(loc,value,...provenance)) and record expr in exprs
+ * @param expr the AST node this value comes from; it is stored in exprs at index loc
+ * @param value the expression that computes the value
+ * @param provenance the provenance of the inputs this value was derived from
+ * @return a let-expression that evaluates value once and pairs it with its Lineage
+ */
+ private static Tree prov ( Tree expr, Tree value, Trees provenance ) {
+ int loc = exprs.length(); // the position expr takes once it is appended
+ exprs = exprs.append(expr);
+ Tree bound = new_var();
+ return #<let(`bound,`value,tuple(`bound,Lineage(`loc,`bound,...provenance)))>;
+ }
+ /** Variant of prov for a single input provenance: wraps it in a one-element list and delegates */
+ private static Tree prov ( Tree expr, Tree value, Tree provenance ) {
+ return prov(expr,value,#[`provenance]);
+ }
+ /** Replace var by nvar throughout e; in coarse-grained mode a cmap function gets fvar for var instead */
+ private static Tree lift_var ( Tree var, Tree nvar, Tree fvar, Tree e ) {
+ match e {
+ case cmap(`f,`x):
+ if (fine_grain)
+ fail; // fine-grained: handled by the generic case below
+ // don't lift the cmap function in coarse-grained provenance
+ Tree nf = subst(var,fvar,f);
+ Tree nx = lift_var(var,nvar,fvar,x);
+ return #<cmap(`nf,`nx)>;
+ case `f(...as): // any other node: rewrite every argument
+ Trees bs = #[ ];
+ for ( Tree a: as )
+ bs = bs.append(lift_var(var,nvar,fvar,a));
+ return #<`f(...bs)>;
+ };
+ return (e.equals(var)) ? nvar : e; // leaf: swap the variable itself, keep anything else
+ }
+
+ /** Lift the expression e of type {t} to {(t,{provenance})}, i.e. pair every element of e with its provenance */
+ private static Tree embedB ( Tree e ) {
+ match e {
+ case repeat(lambda(`v,`u),`x,`n): // lift both the initial value x and the step body u
+ Tree nv = new_var();
+ Tree nn = new_var();
+ Tree ex = embedB(x);
+ Tree ef = lift_var(v,nv,#<cmap(lambda(`nn,bag(nth(`nn,0))),`nv)>,
+ flipr(embedB(u)));
+ return #<repeat(lambda(`nv,`ef),`ex,`n)>;
+ case cmap(lambda(`v,`b),`x): // coarse-grained: b is not lifted, its outputs inherit the input element's provenance
+ if (fine_grain)
+ fail;
+ Tree nv = new_var();
+ Tree nw = new_var();
+ Tree ex = embedB(x);
+ Tree nb = subst(v,#<nth(`nv,0)>,b);
+ return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(tuple(`nw,nth(`nv,1)))),
+ `nb)),
+ `ex)>;
+ case cmap(lambda(`v,`b),`x): // fine-grained: reached through the fail above, the body b is lifted too
+ Tree nv = new_var();
+ Tree nw = new_var();
+ Tree y = new_var(); // NOTE(review): y is not used in this case
+ Tree ex = embedB(x);
+ Tree ef = lift_var(v,nv,#<nth(`nv,0)>,embedB(b));
+ Tree p = prov(e,#<nth(`nw,0)>,#<nth(`nw,1)>);
+ return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(`p)),`ef)),`ex)>;
+ case groupBy(`x): // group on the key; each group's lineage is the bag of its members' provenance
+ Tree nv = new_var();
+ Tree ex = flip(embedB(x));
+ Tree val = #<tuple(nth(`nv,0),`(first(#<nth(`nv,1)>)))>;
+ Tree p = prov(e,val,second(#<nth(`nv,1)>));
+ return #<cmap(lambda(`nv,bag(`p)),groupBy(`ex))>;
+ case orderBy(`x): // same construction as groupBy
+ Tree nv = new_var();
+ Tree ex = flip(embedB(x));
+ Tree val = #<tuple(nth(`nv,0),`(first(#<nth(`nv,1)>)))>;
+ Tree p = prov(e,val,second(#<nth(`nv,1)>));
+ return #<cmap(lambda(`nv,bag(`p)),orderBy(`ex))>;
+ case coGroup(`x,`y): // two inputs, hence two provenance bags per output
+ Tree nv = new_var();
+ Tree ex = flip(embedB(x));
+ Tree ey = flip(embedB(y));
+ Tree val = #<tuple(nth(`nv,0),tuple(`(first(#<nth(nth(`nv,1),0)>)),
+ `(first(#<nth(nth(`nv,1),1)>))))>;
+ Tree p = prov(e,val,#[`(second(#<nth(nth(`nv,1),0)>)),
+ `(second(#<nth(nth(`nv,1),1)>))]);
+ return #<cmap(lambda(`nv,bag(`p)),coGroup(`ex,`ey))>;
+ case call(source,...): // data source: every element is a provenance leaf with no inputs
+ Tree nv = new_var();
+ Tree p = prov(e,nv,#[ ]);
+ return #<cmap(lambda(`nv,bag(`p)),`e)>;
+ case bag(...as): // bag construction: lift each element as a single value
+ Trees es = #[ ];
+ for ( Tree a: as )
+ es = es.append(embedP(a));
+ return #<bag(...es)>;
+ case nth(`x,`n):
+ Tree nv = new_var();
+ Tree nw = new_var();
+ Tree ex = embedP(x);
+ Tree p = prov(e,nv,#<nth(`nw,1)>);
+ return #<let(`nw,`ex,cmap(lambda(`nv,bag(`p)),
+ nth(nth(`nw,0),`n)))>;
+ case project(`x,`a):
+ Tree nv = new_var();
+ Tree nw = new_var();
+ Tree ex = embedP(x);
+ Tree p = prov(e,nv,#<nth(`nw,1)>);
+ return #<let(`nw,`ex,cmap(lambda(`nv,bag(`p)),
+ project(nth(`nw,0),`a)))>;
+ case if(`pred,`x,`y):
+ Tree nv = new_var();
+ Tree ep = embedP(pred);
+ Tree ex = embedB(x);
+ Tree ey = embedB(y);
+ return #<let(`nv,tuple(`ep,`ex,`ey),
+ if(nth(nth(`nv,0),0),nth(`nv,1),nth(`nv,2)))>;
+ case `v:
+ if (v.is_variable())
+ if (Interpreter.repeat_variables.member(v)) // repeat variables are returned unchanged
+ return v;
+ else if (Interpreter.lookup_global_binding(v.toString()) != null) {
+ Tree nv = new_var();
+ Tree p = prov(v,nv,#[ ]);
+ return #<cmap(lambda(`nv,bag(`p)),`v)>;
+ } else return v;
+ };
+ match TypeInference.type_inference(e) { // fallback for any expression not matched above
+ case `T(_):
+ if (!is_collection(T))
+ fail;
+ Tree nv = new_var();
+ Tree p = prov(e,nv,#[]); // collection type: tag each element with e and no input provenance
+ return #<cmap(lambda(`nv,bag(`p)),`e)>;
+ };
+ return embedP(e); // not a collection: lift as a single value
+ }
+
+ /** Lift the expression e of type t to (t,provenance), i.e. pair the single value of e with its provenance */
+ public static Tree embedP ( Tree e ) {
+ match e {
+ case reduce(`m,`x): // aggregation: reduce the lifted values, keep the provenance of all inputs
+ Tree nv = new_var();
+ Tree ex = embedB(x);
+ Tree p = prov(e,#<reduce(`m,`(first(nv)))>,second(nv));
+ return #<Let(`nv,`ex,`p)>;
+ case tuple(...as): // lift every component; nv is bound to the tuple of lifted components
+ Tree nv = new_var();
+ Trees es = #[ ];
+ Trees vs = #[ ];
+ Trees ps = #[ ];
+ int i = 0;
+ for ( Tree a: as ) {
+ es = es.append(embedP(a));
+ vs = vs.append(#<nth(nth(`nv,`i),0)>);
+ ps = ps.append(#<nth(nth(`nv,`i),1)>);
+ i++;
+ };
+ Tree p = prov(e,#<tuple(...vs)>,ps);
+ return #<let(`nv,tuple(...es),`p)>;
+ case record(...as): // like tuple, but the values are re-bound to their attribute names
+ Tree nv = new_var();
+ Trees es = #[ ];
+ Trees vs = #[ ];
+ Trees ps = #[ ];
+ int i = 0;
+ for ( Tree a: as )
+ match a {
+ case bind(`v,`b):
+ es = es.append(embedP(b));
+ vs = vs.append(#<bind(`v,nth(nth(`nv,`i),0))>);
+ ps = ps.append(#<nth(nth(`nv,`i),1)>);
+ i++;
+ };
+ Tree p = prov(e,#<record(...vs)>,ps);
+ return #<let(`nv,tuple(...es),`p)>;
+ case call(`f,...as): // like tuple: f is applied to the plain argument values
+ Tree nv = new_var();
+ Trees es = #[ ];
+ Trees vs = #[ ];
+ Trees ps = #[ ];
+ int i = 0;
+ for ( Tree a: as ) {
+ es = es.append(embedP(a));
+ vs = vs.append(#<nth(nth(`nv,`i),0)>);
+ ps = ps.append(#<nth(nth(`nv,`i),1)>);
+ i++;
+ };
+ Tree p = prov(e,#<call(`f,...vs)>,ps);
+ return #<let(`nv,tuple(...es),`p)>;
+ case nth(`x,`n):
+ Tree nv = new_var();
+ Tree ex = embedP(x);
+ Tree p = prov(e,#<nth(nth(`nv,0),`n)>,#<nth(`nv,1)>);
+ return #<let(`nv,`ex,`p)>;
+ case project(`x,`a):
+ Tree nv = new_var();
+ Tree ex = embedP(x);
+ Tree p = prov(e,#<project(nth(`nv,0),`a)>,#<nth(`nv,1)>);
+ return #<let(`nv,`ex,`p)>;
+ case if(`pred,`x,`y): // the lineage records the provenance of the predicate and of both branches
+ Tree nv = new_var();
+ Tree ep = embedP(pred);
+ Tree ex = embedP(x);
+ Tree ey = embedP(y);
+ Tree p = prov(e,#<if(nth(nth(`nv,0),0),nth(nth(`nv,1),0),nth(nth(`nv,2),0))>,
+ #[nth(nth(`nv,0),1),nth(nth(`nv,1),1),nth(nth(`nv,2),1)]);
+ return #<let(`nv,tuple(`ep,`ex,`ey),`p)>;
+ case typed(`u,_): // the type annotation is dropped
+ return embedP(u);
+ case index(`x,`n):
+ Tree ex = embedB(x);
+ return #<index(`ex,`n)>;
+ case true: return prov(e,e,#[ ]);
+ case false: return prov(e,e,#[ ]);
+ case `v:
+ if (v.is_variable())
+ if (Interpreter.lookup_global_binding(v.toString()) != null) // a global binding is a leaf
+ return prov(e,e,#[ ]);
+ else return v; // any other variable is returned as is
+ };
+ match TypeInference.type_inference(e) { // fallback for any expression not matched above
+ case `T(_):
+ if (!is_collection(T))
+ fail;
+ Tree nv = new_var();
+ Tree ex = embedB(e); // collection type: lift element-wise, then split values from provenance
+ Tree p = prov(e,first(nv),second(nv));
+ return #<Let(`nv,`ex,`p)>;
+ };
+ return prov(e,e,#[ ]); // anything else is a leaf with no input provenance
+ }
+
+ /** Lift e to an expression with provenance annotations; fine_grained selects fine-grained (true) or coarse-grained (false) provenance */
+ public static Tree embed_provenance ( Tree e, boolean fine_grained ) {
+ fine_grain = fine_grained;
+ exprs = #[ ]; // start a fresh table of traced AST nodes
+ Tree ne = SimplifyTerm(normalize_term(e));
+ Tree tp = TypeInference.type_inference(ne); // type of the result before lifting; passed on in the returned node
+ ne = SimplifyTerm(embed_missing_cmaps(ne));
+ TypeInference.type_inference(ne); // result is discarded; presumably run for its side effects - TODO confirm
+ match TypeInference.type_inference(e) { // NOTE(review): infers the type of the original e again although tp is available
+ case `T(_):
+ if (!is_collection(T))
+ fail;
+ ne = SimplifyTerm(embedB(ne)); // collection result: lift element-wise
+ case _: ne = SimplifyTerm(embedP(ne)); // single value, also reached through the fail above
+ };
+ ne = SimplifyTerm(convert_to_algebra(ne));
+ TypeInference.type_inference(ne);
+ return #<provenance(`ne,`tp,...exprs)>; // carries the lifted query, its original type and the AST table
+ }
+ /** Tell whether some element of the bag s equals x */
+ private static boolean member ( Bag s, MRData x ) {
+ boolean found = false;
+ for ( Iterator<MRData> it = s.iterator(); !found && it.hasNext(); )
+ found = it.next().equals(x);
+ return found;
+ }
+
+ /** Collect the provenance leaves (the data sources that contribute to the output) into a duplicate-free bag */
+ private static Bag collect_lineage ( MRData value ) {
+ if (value instanceof Tuple) {
+ Tuple p = ((Tuple)value);
+ if (p.size() == 2) // only a node index and a value, no input provenance: may be a source leaf
+ match exprs.nth(((MR_int)p.get(0)).get()) {
+ case call(source,...):
+ return new Bag(p.get(1)); // the leaf is the source value itself
+ };
+ Bag s = new Bag();
+ for ( int i = 2; i < p.size() && s.size() < Config.max_bag_size_print; i++ ) // components from index 2 on are input provenance
+ for ( MRData e: collect_lineage(p.get(i)) )
+ if (!member(s,e))
+ s.add(e);
+ return s;
+ } else if (value instanceof Bag) {
+ Bag s = new Bag();
+ ((Bag)value).materialize();
+ for ( MRData e: (Bag)value )
+ for ( MRData x: collect_lineage(e) )
+ if (s.size() < Config.max_bag_size_print && !member(s,x)) // at most max_bag_size_print leaves are kept
+ s.add(x);
+ return s;
+ } else return new Bag(); // neither a tuple nor a bag: contributes no leaves
+ }
+
+ /** Print each result value followed by the data sources that contribute to it */
+ public static void display ( MRData value, Tree tp, Trees prov_exprs ) {
+ exprs = prov_exprs; // collect_lineage resolves node indexes against exprs
+ match tp {
+ case `T(`etp): // collection result: one value/lineage pair per element
+ if (!is_collection(T))
+ fail;
+ if (value instanceof Bag)
+ for ( MRData e: (Bag)value ) {
+ System.out.println(Printer.print(((Tuple)e).get(0),etp));
+ System.out.println(" <- "+collect_lineage(((Tuple)e).get(1)));
+ } else if (value instanceof MR_dataset)
+ for ( MRData e: ((MR_dataset)value).dataset().take(Config.max_bag_size_print) ) { // datasets are cut off at max_bag_size_print elements
+ System.out.println(Printer.print(((Tuple)e).get(0),etp));
+ System.out.println(" <- "+collect_lineage(((Tuple)e).get(1)));
+ }
+ case _: // single value: value is itself a (value,lineage) pair
+ System.out.println(Printer.print(((Tuple)value).get(0),tp));
+ System.out.println(" <- "+collect_lineage(((Tuple)value).get(1)));
+ }
+ }
+
+ /** Strip the provenance annotations from a query result and return the plain value(s) */
+ public static MRData getValue ( MRData value ) {
+ if (!(value instanceof Bag))
+ return ((Tuple)value).get(0);
+ final Iterator<MRData> pairs = ((Bag)value).iterator();
+ BagIterator values = new BagIterator() {
+ public boolean hasNext() { return pairs.hasNext(); }
+ public MRData next () { return ((Tuple)pairs.next()).get(0); }
+ };
+ return new Bag(values);
+ }
+}
diff --git a/core/src/main/java/org/apache/mrql/Streaming.gen b/core/src/main/java/org/apache/mrql/Streaming.gen
index f54cad8..53034a9 100644
--- a/core/src/main/java/org/apache/mrql/Streaming.gen
+++ b/core/src/main/java/org/apache/mrql/Streaming.gen
@@ -20,7 +20,7 @@
import org.apache.mrql.gen.*;
/** Generates code for streaming queries */
-final public class Streaming extends AlgebraicOptimization {
+public class Streaming extends AlgebraicOptimization {
private static int istate = 0;
// set it to true to debug the monoid inference
private static boolean inference_tracing = false;
@@ -846,7 +846,7 @@
}
/** Convert coGroups back to joins and reduces to aggregations */
- private static Tree convert_to_algebra ( Tree e ) {
+ static Tree convert_to_algebra ( Tree e ) {
match e {
case coGroup(`x,`y):
Tree v = new_var();
@@ -1129,7 +1129,7 @@
}
/** Convert joins to coGroups, plus other transformations */
- private static Tree normalize_term ( Tree e ) {
+ static Tree normalize_term ( Tree e ) {
match e {
case join(`kx,`ky,`r,cmap(`f,`x),cmap(`g,`y)):
if (!x.equals(y))
@@ -1254,18 +1254,18 @@
}
/** Embed missing cmaps */
- private static Tree embed_missing_cmaps ( Tree e ) {
+ static Tree embed_missing_cmaps ( Tree e ) {
match e {
case project(`x,`a):
match TypeInference.type_inference(x) {
case `T(`tp):
if (!is_collection(T))
- fail;
+ return #<project(`(embed_missing_cmaps(x)),`a)>;
Tree v = new_var();
type_env.insert(v.toString(),tp);
return embed_missing_cmaps(#<cmap(lambda(`v,bag(project(`v,`a))),`x)>);
};
- fail
+ return #<project(`(embed_missing_cmaps(x)),`a)>;
case nth(`x,`n):
match TypeInference.type_inference(x) {
case `T(`tp):
@@ -1335,6 +1335,16 @@
if (v.equals(a))
return simplify_term(u);
};
+ case nth(let(`v,`w,tuple(...al)),`n):
+ if (!n.is_long())
+ fail;
+ int i = (int)n.longValue();
+ if (i >= 0 && i < al.length())
+ return #<let(`v,`w,`(simplify_term(al.nth(i))))>;
+ case let(`v,`w,`x):
+ if (occurences(v,x) <= 1)
+ return subst_var(v,w,x);
+ fail
case `f(...as):
Trees bs = #[ ];
for ( Tree a: as )
diff --git a/core/src/main/java/org/apache/mrql/TopLevel.gen b/core/src/main/java/org/apache/mrql/TopLevel.gen
index 9c5ecfa..769099a 100644
--- a/core/src/main/java/org/apache/mrql/TopLevel.gen
+++ b/core/src/main/java/org/apache/mrql/TopLevel.gen
@@ -377,6 +377,25 @@
}
});
};
+ case lineage(`e): // evaluate e with Config.lineage switched on
+ Config.lineage = true;
+ boolean quiet = Config.quiet_execution; // remember the user's setting; restored below
+ Config.quiet_execution = true; // presumably silences the regular result output - TODO confirm
+ long t = System.currentTimeMillis();
+ if (expression(e,false) != null && !quiet) // test the saved flag: quiet_execution was just forced to true
+ System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs");
+ Config.lineage = false;
+ Config.quiet_execution = quiet;
+ case debug(`path,`e): // dump e to path with Config.debug switched on
+ Config.debug = true;
+ boolean quiet = Config.quiet_execution; // remember the user's setting; restored below
+ Config.quiet_execution = true; // presumably silences the regular result output - TODO confirm
+ long t = System.currentTimeMillis();
+ dump(path.stringValue(),e);
+ if (!quiet) // test the saved flag: quiet_execution was just forced to true
+ System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs");
+ Config.debug = false;
+ Config.quiet_execution = quiet;
case dump(`s,`e):
long t = System.currentTimeMillis();
dump(s.stringValue(),e);
diff --git a/core/src/main/java/org/apache/mrql/Translator.gen b/core/src/main/java/org/apache/mrql/Translator.gen
index 2d9d2fe..01db2f3 100644
--- a/core/src/main/java/org/apache/mrql/Translator.gen
+++ b/core/src/main/java/org/apache/mrql/Translator.gen
@@ -179,7 +179,7 @@
= #[MapReduce,MapAggregateReduce,MapCombineReduce,FroupByJoin,Aggregate,
MapReduce2,MapCombineReduce2,MapAggregateReduce2,MapJoin,MapAggregateJoin,
CrossProduct,CrossAggregateProduct,cMap,AggregateMap,BSP,GroupByJoin,
- OuterMerge,RightOuterMerge];
+ OuterMerge,RightOuterMerge,Provenance];
static Trees algebraic_operators
= #[mapReduce,mapReduce2,cmap,join,groupBy,orderBy,aggregate,map,filter,repeat,closure];
diff --git a/core/src/main/java/org/apache/mrql/TypeInference.gen b/core/src/main/java/org/apache/mrql/TypeInference.gen
index f6769f4..60ca56c 100644
--- a/core/src/main/java/org/apache/mrql/TypeInference.gen
+++ b/core/src/main/java/org/apache/mrql/TypeInference.gen
@@ -1199,6 +1199,19 @@
};
case cstep(`x): // used in QueryPlan for closure
return type_inference(x);
+ case provenance(`x,...exprs): // used in Provenance generation
+ match type_inference(x) {
+ case `S(tuple(`tp,_)):
+ if (!is_collection(S))
+ fail;
+ return #<`S(`tp)>;
+ case tuple(`tp,_):
+ return tp;
+ };
+ type_error(e,"Wrong provenance injection");
+ case Lineage(...as): // used in Provenance generation
+ type_inference(#<tuple(...as)>);
+ return #<lineage>;
case outerMerge(lambda(`v,`b),`s,`d):
Tree ts = type_inference(s);
if (unify(type_inference(d),ts) == null)
diff --git a/core/src/main/java/org/apache/mrql/mrql.cgen b/core/src/main/java/org/apache/mrql/mrql.cgen
index 87245a0..6e09902 100644
--- a/core/src/main/java/org/apache/mrql/mrql.cgen
+++ b/core/src/main/java/org/apache/mrql/mrql.cgen
@@ -34,7 +34,8 @@
sym.SOME, sym.ALL, sym.GTR, sym.SEP, sym.STORE, sym.DUMP, sym.TYPE, sym.DATA, sym.REPEAT,
sym.STEP, sym.LIMIT, sym.LET, sym.ATSYM, sym.EXCLAMATION,
sym.Variable, sym.Integer, sym.Double, sym.String, sym.Decimal,
- sym.START_TEMPLATE, sym.END_TEMPLATE, sym.TEXT, sym.TRACE, sym.INCR
+ sym.START_TEMPLATE, sym.END_TEMPLATE, sym.TEXT, sym.TRACE, sym.INCR,
+ sym.LINEAGE, sym.DEBUG
};
static String[] token_names = {
@@ -49,7 +50,8 @@
"some", "all", ">", "|", "store", "dump", "type", "data", "repeat",
"step", "limit", "let", "@", "!",
"Variable", "Integer", "Double", "String", "Decimal",
- "[|", "|]", "Text", "trace", "incr"
+ "[|", "|]", "Text", "trace", "incr",
+ "lineage", "debug"
};
public static String print ( Symbol s ) {
@@ -98,7 +100,8 @@
UNION, INTERSECT, EXCEPT, EXISTS, IN, COMMA, DOT, COLON, ASSIGN, SEMI, WHERE,
ORDER, GROUP, BY, ASCENDING, DESCENDING, UMINUS, FUNCTION, DISTINCT, BSLASH,
SOME, ALL, GTR, SEP, STORE, TYPE, DATA, CASE, ATSYM, XPATH, REPEAT, STEP, LIMIT,
- LET, IMPORT, PARSER, AGGREGATION, INCLUDE, EXCLAMATION, MACRO, DUMP, TRACE, INCR;
+ LET, IMPORT, PARSER, AGGREGATION, INCLUDE, EXCLAMATION, MACRO, DUMP, TRACE, INCR,
+ LINEAGE, DEBUG;
terminal String Variable;
terminal Long Integer;
@@ -143,6 +146,8 @@
| STORE String:s FROM expr:e {: RESULT = #<dump(`(new StringLeaf(s)),`e)>; :}
| DUMP String:s FROM expr:e {: RESULT = #<dump_text(`(new StringLeaf(s)),`e)>; :}
| INCR expr:e {: RESULT = #<incr(`e)>; :}
+ | LINEAGE expr:e {: RESULT = #<lineage(`e)>; :}
+ | DEBUG String:s expr:e {: RESULT = #<debug(`s,`e)>; :}
| TYPE var:v EQ type:t {: RESULT = #<typedef(`v,`t)>; :}
| DATA var:v EQ data_binds:nl {: RESULT = #<datadef(`v,union(...nl))>; :}
| FUNCTION var:f LP
diff --git a/core/src/main/java/org/apache/mrql/mrql.lex b/core/src/main/java/org/apache/mrql/mrql.lex
index 2f6cb54..868eb05 100644
--- a/core/src/main/java/org/apache/mrql/mrql.lex
+++ b/core/src/main/java/org/apache/mrql/mrql.lex
@@ -184,6 +184,8 @@
<YYINITIAL> "include" { return symbol(sym.INCLUDE); }
<YYINITIAL> "aggregation" { return symbol(sym.AGGREGATION); }
<YYINITIAL> "trace" { return symbol(sym.TRACE); }
+<YYINITIAL> "lineage" { return symbol(sym.LINEAGE); }
+<YYINITIAL> "debug" { return symbol(sym.DEBUG); }
<YYINITIAL> {ID} { return symbol(sym.Variable,yytext()); }
diff --git a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
index f66d422..61072a3 100644
--- a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
+++ b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
@@ -895,6 +895,9 @@
if (x != null)
if (x instanceof MR_flink)
return ((MR_flink)x).flink();
+ else if (x instanceof MR_dataset)
+ return dataset(new Bag(((MR_dataset)x).dataset.take(Integer.MAX_VALUE)))
+ .map(new restore_global_functions());
else new Error("Variable "+v+" is of type: "+x);
x = variable_lookup(v.toString(),global_env);
if (x != null)