| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.mrql; |
| |
| import org.apache.mrql.gen.*; |
| import java.util.List; |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.io.PrintStream; |
| |
| |
| /** The MRQL interpreter */ |
| public class Interpreter extends TypeInference { |
| |
// the identity mapper lambda(x,bag(x)), used as a no-op map function in plans
public final static Tree identity_mapper = #<lambda(x,bag(x))>;
// when true, queries are instrumented for debugging/provenance display
public static boolean debug_mode = false;

// the chain of top-level (global) variable bindings; head is the most recent
protected static Environment global_env = null;
| |
| /** retrieve variable binding */ |
| public final static MRData variable_lookup ( final String v, final Environment environment ) { |
| for ( Environment env = environment; env != null; env = env.next ) { |
| if (v.equals(env.name)) |
| return env.value; |
| }; |
| return null; |
| } |
| |
| /** insert a new global variable binding */ |
| public final static void new_global_binding ( final String var, final MRData value ) { |
| if (value instanceof Bag) |
| ((Bag)value).materialize(); |
| global_env = new Environment(var,value,global_env); |
| } |
| |
| /** remove a global variable binding */ |
| public static void remove_global_binding ( String v ) { |
| if (global_env == null) |
| return; |
| for ( Environment env = global_env; env.next != null; env = env.next ) |
| if (v.equals(env.next.name)) |
| env.next = env.next.next; |
| if (global_env.name == v) |
| global_env = global_env.next; |
| } |
| |
| /** retrieve a global variable binding */ |
| public static MRData lookup_global_binding ( String v ) { |
| for ( Environment env = global_env; env != null; env = env.next ) |
| if (v.equals(env.name) && env.value != null) |
| return env.value; |
| return lookup_distributed_binding(v); |
| } |
| |
| public static void set_global_bindings ( Environment env ) { |
| global_env = env; |
| } |
| |
| /** insert a new global variable binding visible to all nodes */ |
| public final static void new_distributed_binding ( final String var, final MRData value ) { |
| if (value == null) |
| return; |
| new_global_binding(var,value); |
| if (Config.hadoop_mode) { // insert the binding in the hadoop configuration |
| String gvs = Plan.conf.get("mrql.global.vars"); |
| if (gvs == null) |
| gvs = ""; |
| gvs = var+":"+reify(value)+"@"+gvs; |
| Plan.conf.set("mrql.global.vars",gvs); |
| } |
| } |
| |
| /** retrieve a global variable binding available to all nodes */ |
/** Retrieve a global variable binding available to all nodes.
 *  Bindings are serialized in the hadoop configuration as name:value@name:value@...
 *  (written by new_distributed_binding); the value is a reified expression that
 *  is re-evaluated here. Returns null when the variable is not found. */
public static MRData lookup_distributed_binding ( String v ) {
    try {
        if (Config.hadoop_mode) { // find the binding in the hadoop configuration
            String gvs = Plan.conf.get("mrql.global.vars");
            if (gvs == null)
                return null;
            for ( String s: gvs.split("@") ) {
                String[] b = s.split(":");
                // only well-formed name:value entries are considered
                if (b.length == 2 && b[0].equals(v))
                    return evalE(Tree.parse(b[1]));
            }
        }
    } catch (Exception ex) {}; // best-effort lookup: a parse/eval failure means "not found"
    return null;
}
| |
// the method number of ClassImporter's coerce(any,int), used by reify to
// rebuild numeric literals of the right MRContainer type
final static int coerce_method = ClassImporter.find_method_number("coerce",#[any,int]);

/** untyped reify: not type-correct but will not crash the run-time system.
 *  Converts an in-memory MRData value back into an expression Tree that
 *  re-evaluates to (an approximation of) the same value; used to serialize
 *  global bindings into the job configuration. */
private final static Tree reify ( final MRData x ) {
    if (x instanceof Bag) {
        Bag b = (Bag)x;
        Trees as = #[];
        for ( MRData e: b)
            as = as.append(reify(e));
        return #<list(...as)>;
    } else if (x instanceof Tuple) {
        Tuple t = (Tuple)x;
        Trees as = #[];
        for ( short i = 0; i < t.size(); i++ )
            as = as.append(reify(t.get(i)));
        return #<tuple(...as)>;
    } else if (x instanceof Union) {
        Union u = (Union)x;
        return #<tagged_union(`(u.tag()),`(reify(u.value())))>;
    } else if (x instanceof MR_string)
        return new StringLeaf(((MR_string)x).get());
    else if (x instanceof MR_short)
        return #<callM(coerce,`coerce_method,`(((MR_short)x).get()),`(MRContainer.SHORT))>;
    else if (x instanceof MR_int)
        return #<`(((MR_int)x).get())>;
    else if (x instanceof MR_long)
        // NOTE(review): the (int) cast narrows the long before coercing it back
        // to LONG — values outside int range will be mangled; TODO confirm this
        // is an accepted limitation of untyped reify
        return #<callM(coerce,`coerce_method,`((int)((MR_long)x).get()),`(MRContainer.LONG))>;
    else if (x instanceof MR_float)
        return #<`(((MR_float)x).get())>;
    else if (x instanceof MR_double)
        // NOTE(review): same narrowing concern — double is truncated to float
        return #<callM(coerce,`coerce_method,`((float)(((MR_double)x).get())),`(MRContainer.DOUBLE))>;
    else if (x instanceof MR_bool)
        return (((MR_bool)x).get()) ? #<true> : #<false>;
    else if (x instanceof MR_dataset) {
        Trees vs = #[ ];
        for ( MRData v: ((MR_dataset)x).dataset().take(Integer.MAX_VALUE))
            vs = vs.append(reify(v));
        return #<list(...vs)>;
    };
    throw new Error("wrong MRData: "+x);
}
| |
/** Does expression e contain a data-source operator (Generator, BinarySource,
 *  or ParsedSource) anywhere in its subtree? Used to decide whether a repeat
 *  must be shipped to the distributed evaluator. */
private static boolean has_source ( Tree e ) {
    match e {
    case Generator(...): return true;
    case BinarySource(...): return true;
    case ParsedSource(...): return true;
    case `f(...as):
        // recurse into the children of any other operator
        for ( Tree a: as )
            if (has_source(a))
                return true;
    };
    return false;
}
| |
| /** evaluate an MRQL function in memory */ |
| private final static Function evalf ( final String v, |
| final Tree body, |
| final Environment env ) { |
| return new Function() { |
| final public MRData eval ( final MRData x ) { |
| return evalE(body,new Environment(v,x,env)); |
| } |
| }; |
| } |
| |
| /** evaluate an MRQL function in memory */ |
/** Evaluate an MRQL functional argument into an in-memory Function.
 *  Handles three shapes: a pre-compiled lambda (falling back to interpretation
 *  when the compiled class cannot be loaded), a unary lambda, and a
 *  multi-parameter function over a tuple of typed parameters. */
public final static Function evalF ( Tree fnc, Environment env ) {
    match fnc {
    case compiled(`ln,`lm,...vars):
        try {
            // retrieve the compiled closure by class name
            return Compiler.compiled(Thread.currentThread().getContextClassLoader(),ln.toString());
        } catch (Exception ex) {
            // fall back to interpreting the original lambda term
            System.err.println("*** Unable to retrieve the compiled lambda: "+fnc);
            return ((Lambda) evalE(lm)).lambda();
        }
    case lambda(`v,`b):
        return evalf(v.toString(),b,env);
    case function(tuple(...params),`tp,`body):
        // collect the parameter names (each param is bind(name,type))
        String[] as = new String[params.length()];
        int i = 0;
        for ( Tree param: params )
            match param {
            case `bind(`v,_):
                as[i++] = v.toString();
            };
        return evalT(as,body,env);
    };
    throw new Error("Ill-formed lambda: "+fnc);
}
| |
| /** evaluate an MRQL function in memory */ |
| private final static Function evalT ( final String[] params, |
| final Tree body, |
| final Environment env ) { |
| return new Function() { |
| final public MRData eval ( final MRData x ) { |
| Environment new_env = env; |
| for ( int i = 0; i < params.length; i++ ) |
| new_env = new Environment(params[i],((Tuple)x).get(i),new_env); |
| return evalE(body,new_env); |
| } |
| }; |
| } |
| |
// canonical names of the literal constants true/false/null, used for fast
// identity tests against interpreted variable names in evalEE
final static String true_name = #<true>.toString();
final static String false_name = #<false>.toString();
final static String null_name = #<null>.toString();
// null is represented by the empty tuple
final static MRData null_value = new Tuple(0);
final static MRData true_value = new MR_bool(true);
final static MRData false_value = new MR_bool(false);

// current indentation of trace output (pre_trace adds 3, trace removes 3)
static int tab_count = -3;

// monotonically increasing id assigned to each traced call
static long trace_count = 0;
| |
| public static String tabs ( int n ) { |
| StringBuffer b = new StringBuffer(); |
| for ( int i = 0; i < n; i++) |
| b.append(' '); |
| return b.toString(); |
| } |
| |
| /** implements the DataSetCollect operation */ |
| public static Bag dataSetCollect ( String var ) { |
| MRData val = lookup_global_binding(var); |
| if (val instanceof Bag) |
| return (Bag)val; |
| else if (val instanceof MR_dataset) |
| return new Bag(((MR_dataset)val).dataset().take(Integer.MAX_VALUE)); |
| throw new Error("Cannot collect the MRQL value into a bag: "+val); |
| } |
| |
| public static long pre_trace ( String msg ) { |
| tab_count += 3; |
| trace_count++; |
| System.out.println(tabs(tab_count)+"*** "+trace_count+": "+msg); |
| return trace_count; |
| } |
| |
| public static MRData trace ( long count, Tree type, MRData value ) { |
| System.out.print(tabs(tab_count)+"--> "+count+": "); |
| System.out.println(Printer.print(value,type)); |
| tab_count -= 3; |
| return value; |
| } |
| |
| /** evaluate an MRQL expression in memory and print tracing info */ |
| final static MRData evalE ( final Tree e, final Environment env ) { |
| if (Config.trace_exp_execution) { |
| tab_count += 3; |
| System.out.println(tabs(tab_count)+print_query(e)); |
| }; |
| MRData res = evalEE(e,env); |
| if (Config.trace_exp_execution) { |
| System.out.println(tabs(tab_count)+"-> "+res); |
| tab_count -= 3; |
| }; |
| return res; |
| } |
| |
| /** evaluate an MRQL expression in memory */ |
/** Evaluate an MRQL expression in memory (the core interpreter dispatch).
 *  Literals and variables are handled directly; algebraic operators are
 *  matched case-by-case and mapped onto MapReduceAlgebra; anything that is
 *  not an in-memory operator falls through to the distributed evaluator
 *  (hadoop mode) or to the physical-plan interpreter evalS. */
private final static MRData evalEE ( final Tree e, final Environment env ) {
    try {
        if (e.is_variable()) {
            String v = e.toString();
            // NOTE(review): == comparison relies on variable name Strings
            // being interned by the Gen package — TODO confirm
            if (v == true_name)
                return true_value;
            else if (v == false_name)
                return false_value;
            else if (v == null_name)
                return null_value;
            // local bindings shadow global ones
            MRData x = variable_lookup(v,env);
            if (x != null)
                return x;
            x = lookup_global_binding(v);
            if (x == null)
                throw new Error("Variable "+v+" is not bound");
            return x;
        } else if (e.is_long())
            return new MR_int((int)e.longValue());
        else if (e.is_double())
            return new MR_double(e.doubleValue());
        else if (e.is_string())
            return new MR_string(e.stringValue());
        match e {
        case callM(and,_,`x,`y): // lazy: y is evaluated only when x is true
            return (((MR_bool)evalE(x,env)).get()) ? evalE(y,env) : false_value;
        case callM(or,_,`x,`y): // lazy: y is evaluated only when x is false
            return (((MR_bool)evalE(x,env)).get()) ? true_value : evalE(y,env);
        case callM(coerce,_,`x,`n): // long literal coerced to MR_long
            if (!x.is_long() || n.longValue() != MRContainer.LONG)
                fail;
            return new MR_long(x.longValue());
        case callM(coerce,_,`x,`n): // double literal coerced to MR_float
            if (!x.is_double() || n.longValue() != MRContainer.FLOAT)
                fail;
            return new MR_float((float)x.doubleValue());
        case callM(`f,`n,...args): // internal function call by method number
            MRData[] as = new MRData[args.length()];
            for ( int i = 0; i < args.length(); i++ )
                as[i] = evalE(args.nth(i),env);
            return ClassImporter.call((int)n.longValue(),as);
        case compiled(`ln,_,...vars):
            // wrap the pre-compiled closure as a first-class Lambda value
            return new Lambda(Compiler.compiled(Thread.currentThread().getContextClassLoader(),ln.toString()));
        case lambda(`v,`body):
            return new Lambda(evalf(v.toString(),body,env));
        case nth(`x,`n): // tuple projection
            return ((Tuple)evalE(x,env)).get((int)n.longValue());
        case setNth(`x,`n,`v,`ret): // destructive tuple update, then evaluate ret
            return ((Tuple)evalE(x,env)).set((int)n.longValue(),evalE(v,env),evalE(ret,env));
        case materialize(`u):
            return MapReduceAlgebra.materialize(evalE(u,env));
        case let(`v,DataSetCollect(`s),`body):
            // collect a dataset into a bag and make it visible on all nodes
            MRData x = evalE(s,env);
            if (x instanceof MR_dataset)
                x = new Bag(((MR_dataset)x).dataset().take(Integer.MAX_VALUE));
            else if (x instanceof Bag)
                ((Bag)x).materialize();
            new_distributed_binding(v.toString(),x);
            return evalE(body,new Environment(v.toString(),x,env));
        case let(`v,`u,`body):
            MRData x = evalE(u,env);
            if (x instanceof Bag)
                ((Bag)x).materialize();
            return evalE(body,new Environment(v.toString(),x,env));
        case cmap(`f,`s):
            return MapReduceAlgebra.cmap(evalF(f,env),(Bag)evalE(s,env));
        case filter(`p,`m,`s):
            return MapReduceAlgebra.filter(evalF(p,env),evalF(m,env),(Bag)evalE(s,env));
        case map(`m,`s):
            return MapReduceAlgebra.map(evalF(m,env),(Bag)evalE(s,env));
        case repeat(...):
            if (!Config.flink_mode || !Config.hadoop_mode || !has_source(e))
                fail;
            // send Flink repetitions to the Flink evaluator (they need Flink iterations)
            return Evaluator.evaluator.aggregate(#<0>,#<0>,e,env);
        case repeat(lambda(`v,`b),`s,`n):
            // in-memory loop: rebind v to the current bag on each iteration
            final String nm = v.toString();
            final Tree body = b;
            Function loop_fnc = new Function () {
                public MRData eval ( MRData s ) {
                    new_distributed_binding(nm,s);
                    return evalE(body,new Environment(nm,s,env));
                }; };
            MRData init = evalE(s,env);
            if (init instanceof MR_dataset) {
                Bag vs = new Bag();
                for ( MRData dv: ((MR_dataset)init).dataset().take(Integer.MAX_VALUE))
                    vs.add(dv);
                init = vs;
            };
            return MapReduceAlgebra.repeat(loop_fnc,(Bag)init,((MR_int)evalE(n,env)).get());
        case closure(lambda(`v,`b),`s,`n):
            // transitive-closure loop: iterate until fixpoint or n iterations
            final String nm = v.toString();
            final Tree body = b;
            Function loop_fnc = new Function () {
                public MRData eval ( MRData s ) {
                    new_distributed_binding(nm,s);
                    return evalE(body,new Environment(nm,s,env));
                }; };
            return MapReduceAlgebra.closure(loop_fnc,(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
        case loop(lambda(tuple(...vs),`b),`s,`n):
            // simultaneous loop over a tuple of variables
            final Trees nvs = vs;
            final Tree body = b;
            Function loop_fnc = new Function () {
                public MRData eval ( MRData s ) {
                    Tuple t = (Tuple)s;
                    Environment nenv = env;
                    for ( int i = 0; i < nvs.length(); i++ ) {
                        new_distributed_binding(nvs.nth(i).toString(),t.get(i));
                        nenv = new Environment(nvs.nth(i).toString(),t.get(i),nenv);
                    };
                    return evalE(body,nenv);
                }; };
            return MapReduceAlgebra.loop(loop_fnc,(Tuple)evalE(s,env),((MR_int)evalE(n,env)).get());
        case range(`min,`max): // integer range generator [min..max]
            return MapReduceAlgebra.generator(((MR_long)evalE(min,env)).get(),
                                              ((MR_long)evalE(max,env)).get());
        case trace(`msg,`tp,`x): // traced sub-expression (see pre_trace/trace)
            String m = ((MR_string)evalE(msg,env)).get();
            return trace(pre_trace(m),tp,evalE(x,env));
        case call(`f,...args): // user-function call: pack args into a tuple
            Tuple t = new Tuple(args.length());
            int i = 0;
            for ( Tree a: args )
                t.set(i++,evalE(a,env));
            return evalF(f,env).eval(t);
        case tuple(`x,`y): // special-cased small tuples avoid the generic loop
            return new Tuple(evalE(x,env),evalE(y,env));
        case tuple(`x,`y,`z):
            return new Tuple(evalE(x,env),evalE(y,env),evalE(z,env));
        case tuple(...el):
            Tuple t = new Tuple(el.length());
            int i = 0;
            for ( Tree a: el )
                t.set(i++,evalE(a,env));
            return t;
        case tagged_union(`n,`u):
            return new Union((byte)n.longValue(),evalE(u,env));
        case union_value(`x):
            return ((Union)evalE(x,env)).value();
        case union_tag(`x):
            return new MR_int(((Union)evalE(x,env)).tag());
        // used for shortcutting sync in bsp supersteps
        case BAG():
            return SystemFunctions.bsp_empty_bag;
        case TRUE():
            return SystemFunctions.bsp_true_value;
        case FALSE():
            return SystemFunctions.bsp_false_value;
        case `T(...el): // collection constructor (bag/list/set literal)
            if (!is_collection(T))
                fail;
            if (el.is_empty())
                return new Bag();
            Bag b = new Bag(el.length());
            for ( Tree a: el )
                b.add(evalE(a,env));
            return b;
        case if(`c,`x,`y):
            if (((MR_bool)evalE(c,env)).get())
                return evalE(x,env);
            else return evalE(y,env);
        case Collect(`s): // collect a distributed dataset into memory
            try {
                if (Config.hadoop_mode)
                    return Plan.collect(Evaluator.evaluator.eval(s,env,"-"));
                Bag b = evalS(s,env);
                b.materialize();
                return b;
            } catch (Exception ex) { throw new Error(ex); }
        case DataSetCollect(`s):
            return dataSetCollect(s.toString());
        case dataset_size(`x): // size in MBs
            return new MR_long(Plan.size(Evaluator.evaluator.eval(x,env,"-")) / (1024*1024));
        case synchronize(`peer,`b):
            return Evaluator.evaluator.synchronize(((MR_string)evalE(peer,env)),(MR_bool)evalE(b,env));
        case distribute(`peer,`s):
            return Evaluator.evaluator.distribute(((MR_string)evalE(peer,env)),(Bag)evalE(s,env));
        case mapReduce(`m,`r,`s,_):
            return MapReduceAlgebra.mapReduce(evalF(m,env),
                                              evalF(r,env),
                                              (Bag)evalE(s,env));
        case mapReduce2(`mx,`my,`r,`x,`y,_): // binary map-reduce (join)
            return MapReduceAlgebra.mapReduce2(
                              evalF(mx,env),
                              evalF(my,env),
                              evalF(r,env),
                              (Bag)evalE(x,env),
                              (Bag)evalE(y,env));
        case mapJoin(`kx,`ky,`r,`x,`y):
            return MapReduceAlgebra.mapJoin(
                              evalF(kx,env),
                              evalF(ky,env),
                              evalF(r,env),
                              (Bag)evalE(x,env),
                              (Bag)evalE(y,env));
        case crossProduct(`mx,`my,`r,`x,`y):
            return MapReduceAlgebra.crossProduct(
                              evalF(mx,env),
                              evalF(my,env),
                              evalF(r,env),
                              (Bag)evalE(x,env),
                              (Bag)evalE(y,env));
        case groupBy(`s):
            return MapReduceAlgebra.groupBy((Bag)evalE(s,env));
        case orderBy(`s):
            // NOTE(review): orderBy delegates to groupBy — TODO confirm this is
            // intended (groupBy may produce the required sorted order)
            return MapReduceAlgebra.groupBy((Bag)evalE(s,env));
        case index(`x,`n): // list indexing with bounds checking
            MRData xv = evalE(x,env);
            MRData nv = evalE(n,env);
            final int k = (int)((MR_int)nv).get();
            if (k < 0)
                throw new Error("Negative list index: "+k+" (in "+print_query(x)+")");
            if (xv instanceof MR_dataset) {
                // only take the first k+1 elements from a distributed dataset
                DataSet ds = ((MR_dataset)xv).dataset();
                List<MRData> res = ds.take(k+1);
                if (k >= res.size())
                    throw new Error("List index out of bounds: "+k+" (in "+print_query(x)+" of size "+res.size()+")");
                return res.get(k);
            };
            Bag b = (Bag)xv;
            if (b.materialized() && k >= b.size())
                throw new Error("List index out of bounds: "+k+" (in "+print_query(x)+" of size "+b.size()+")");
            return b.get(k);
        case range(`x,`i,`j): // list slice x[i..j] (both bounds inclusive)
            MRData xv = evalE(x,env);
            MRData ni = evalE(i,env);
            MRData nj = evalE(j,env);
            int ki = (int)((MR_int)ni).get();
            int kj = (int)((MR_int)nj).get();
            if (ki < 0 || kj < ki)
                throw new Error("Wrong list range: ["+ki+","+kj+"] in "+print_query(x));
            Iterator<MRData> it = (xv instanceof MR_dataset)
                                   ? ((MR_dataset)xv).dataset().take(kj+1).iterator()
                                   : ((Bag)xv).iterator();
            Bag s = new Bag();
            // skip the first ki elements, then copy up to kj-ki+1 elements
            for ( int n = 0; it.hasNext() && n < ki; n++ )
                it.next();
            for ( int n = ki; it.hasNext() && n <= kj; n++ )
                s.add(it.next());
            return s;
        case map_index(`x,`key): // map lookup by key
            MRData xv = evalE(x,env);
            final MRData nk = evalE(key,env);
            if (xv instanceof MR_dataset) {
                // scan the dataset: the empty tuple is the "not found" marker
                xv = ((MR_dataset)xv).dataset().reduce(new Tuple(),new Function() {
                        public MRData eval ( MRData value ) {
                            Tuple p = (Tuple)value;
                            Tuple y = (Tuple)p.second();
                            return (y.first().equals(nk)) ? y.second() : p.first();
                        }
                    });
                if (xv instanceof Tuple && ((Tuple)xv).size() == 0)
                    throw new Error("Map key not found: "+nk);
                return xv;
            };
            return ((Bag)xv).map_find(nk);
        case aggregate(`acc,`zero,`s):
            return MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
                                              (Bag)evalE(s,env));
        case Aggregate(`acc,`zero,`s): // physical aggregation operator
            if (Config.hadoop_mode)
                return Evaluator.evaluator.aggregate(closure(acc,env),zero,s,env);
            else return MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),evalM(s,env));
        case mergeGroupByJoin(`kx,`ky,`gx,`gy,`acc,`zero,`r,`x,`y,`o):
            return MapReduceAlgebra.mergeGroupByJoin(evalF(kx,env),evalF(ky,env),evalF(gx,env),evalF(gy,env),
                                                     evalF(acc,env),evalE(zero,env),evalF(r,env),
                                                     (Bag)evalE(x,env),(Bag)evalE(y,env));
        case BSP(tuple(...ns),`superstep,`state,`o,...as): // multi-source BSP
            if (Config.hadoop_mode)
                return Evaluator.evaluator.bsp(e,env);
            Bag[] ds = new Bag[as.length()];
            for ( int i = 0; i < ds.length; i++ )
                ds[i] = evalM(as.nth(i),env);
            int[] nn = new int[ns.length()];
            for ( int i = 0; i < ns.length(); i++ )
                nn[i] = (int)((LongLeaf)ns.nth(i)).value();
            return MapReduceAlgebra.BSP(nn,
                                        evalF(superstep,env),
                                        evalE(state,env),
                                        o.equals(#<true>),
                                        ds);
        case BSP(`n,`superstep,`state,`o,...as): // single-source BSP
            if (Config.hadoop_mode)
                return Evaluator.evaluator.bsp(e,env);
            Bag[] ds = new Bag[as.length()];
            for ( int i = 0; i < ds.length; i++ )
                ds[i] = evalM(as.nth(i),env);
            return MapReduceAlgebra.BSP(new int[]{(int)((LongLeaf)n).value()},
                                        evalF(superstep,env),
                                        evalE(state,env),
                                        o.equals(#<true>),
                                        ds);
        case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
            // bounded simultaneous fixpoint over a tuple of bags
            if (Config.hadoop_mode)
                return Evaluator.evaluator.loop(e,env);
            int limit = ((MR_int)evalE(num,env)).get();
            Bag[] s = new Bag[vs.length()];
            for ( int i = 0; i < vs.length(); i++ )
                s[i] = evalM(ss.nth(i),env);
            for ( int n = 0; n < limit; n++ ) {
                Environment nenv = env;
                for ( int i = 0; i < vs.length(); i ++ ) {
                    s[i].materialize();
                    nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
                };
                for ( int i = 0; i < vs.length(); i ++ )
                    s[i] = (Bag)evalM(bs.nth(i),nenv);
            };
            return new Tuple(s);
        case function(tuple(...params),`tp,`body):
            // a function literal evaluates to a Lambda closure
            String[] as = new String[params.length()];
            int i = 0;
            for ( Tree param: params )
                match param {
                case `bind(`v,_):
                    as[i++] = v.toString();
                };
            return new Lambda(evalT(as,body,env));
        case typed(`x,_): // type annotations are erased at run time
            return evalE(x,env);
        case apply(`f,`arg):
            if (!f.is_variable())
                return evalF(f,env).eval(evalE(arg,env));
            MRData fnc = lookup_global_binding(f.toString());
            if (fnc == null) {
                // try to recover a function definition from the job configuration
                String s = Plan.conf.get("mrql.global."+f);
                if (s != null)
                    try {
                        Tree ft = Tree.parse(s);
                        //TopLevel.store(f.toString(),ft);
                        fnc = evalE(ft,env);
                        new_global_binding(f.toString(),fnc);
                    } catch (Exception ex) {
                        throw new Error(ex);
                    }
            };
            MRData t = evalE(arg,env);
            if (!(t instanceof Tuple))
                throw new Error("Expected a tuple in function application: "+t);
            return ((Lambda)fnc).lambda().eval(t);
        case Stream(...): // streaming
            final Tree qt = query_type;
            // each stream batch result is printed as it arrives
            Evaluator.evaluator.streaming(e,env,null,new Function(){
                    public MRData eval ( final MRData data ) {
                        System.out.println(print(data,qt));
                        return data;
                    }
                });
            return new Bag();
        case provenance(`x,`tp,...s): // in-memory provenance display
            MRData value = evalE(x,env);
            if (Config.debug)
                Debugger.debug(value,tp,#[...s]);
            else Provenance.display(value,tp,#[...s]);
            return new Tuple(0);
        case Provenance(`x,`tp,...s): // physical-plan provenance display
            MRData value = (Config.hadoop_mode)
                            ? new MR_dataset(Evaluator.evaluator.eval(x,env,"-"))
                            : evalS(x,env);
            if (Config.debug)
                Debugger.debug(value,tp,#[...s]);
            else Provenance.display(value,tp,#[...s]);
            return new Tuple(0);
        case Lineage(...as): // lineage is evaluated as a plain tuple
            return evalEE(#<tuple(...as)>,env);
        case Let(`v,`x,`y): // physical let: x is a plan, not an expression
            if (Config.hadoop_mode)
                return evalEE(y,new Environment(v.toString(),
                                                new MR_dataset(Evaluator.evaluator.eval(x,env,"-")),
                                                env));
            Bag b = evalS(x,env);
            b.materialize();
            return evalEE(y,new Environment(v.toString(),b,env));
        case _: // not an in-memory operator: evaluate it as a physical plan
            try {
                if (Config.hadoop_mode)
                    return new MR_dataset(Evaluator.evaluator.eval(e,env,"-"));
                else return evalS(e,env);
            } catch (Exception ex) { throw new Error(ex); }
        };
        throw new Error("Cannot evaluate the expression: "+e);
    } catch (Error msg) {
        // in trace mode, show the original failure before re-throwing
        if (!Config.trace)
            throw new Error(msg.getMessage());
        System.err.println(msg.getMessage());
        msg.printStackTrace();
        throw new Error("Evaluation error in: "+print_query(e));
    } catch (Exception ex) {
        if (Config.trace) {
            System.err.println(ex.getMessage());
            ex.printStackTrace();
        }
        throw new Error("Evaluation error in: "+print_query(e));
    }
}
| |
| /** evaluate an MRQL expression in memory */ |
| final static MRData evalE ( final Tree e ) { |
| return evalE(e,null); |
| } |
| |
| /** evaluate MRQL physical operators in memory (returns a Bag) */ |
| final static Bag evalS ( final Tree e, final Environment env ) { |
| return evalM(e,env); |
| } |
| |
| /** evaluate MRQL physical operators in memory (returns a Bag) */ |
| final static Bag evalM ( final Tree e, final Environment env ) { |
| if (Config.trace_execution) { |
| tab_count += 3; |
| System.out.println(tabs(tab_count)+print_query(e)); |
| }; |
| Bag res = evalMM(e,env); |
| if (Config.trace_execution) { |
| System.out.println(tabs(tab_count)+"-> "+res); |
| tab_count -= 3; |
| }; |
| return res; |
| } |
| |
| /** evaluate MRQL physical operators in memory (returns a Bag) */ |
/** Evaluate MRQL physical operators in memory (returns a Bag).
 *  Each physical plan node (cMap, MapReduce, joins, sources, BSP, ...) is
 *  mapped to the corresponding in-memory implementation in MapReduceAlgebra;
 *  combine functions in the *Combine* variants are ignored (no combiner is
 *  needed in memory), and the Aggregate* variants fold their input eagerly. */
final static Bag evalMM ( final Tree e, final Environment env ) {
    try {
        match e {
        case cMap(`f,`s):
            return MapReduceAlgebra.cmap(evalF(f,env),evalM(s,env));
        case AggregateMap(`f,`acc,`zero,`s):
            // aggregate the mapped bag into a single value, wrapped in a Bag
            return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
                                                      evalM(#<cMap(`f,`s)>,env)));
        case MapReduce(`m,`r,`s,_):
            return MapReduceAlgebra.mapReduce(
                              evalF(m,env),
                              evalF(r,env),
                              evalM(s,env));
        case MapAggregateReduce(`m,`r,`acc,`zero,`s,_):
            return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
                                                      evalM(#<MapReduce(`m,`r,`s,false)>,env)));
        case MapCombineReduce(`m,`c,`r,`s,_):
            // the combiner c is not needed for in-memory evaluation
            return MapReduceAlgebra.mapReduce(
                              evalF(m,env),
                              evalF(r,env),
                              evalM(s,env));
        case MapReduce2(`mx,`my,`r,`x,`y,_):
            return MapReduceAlgebra.mapReduce2(
                            evalF(mx,env),
                            evalF(my,env),
                            evalF(r,env),
                            evalM(x,env),
                            evalM(y,env));
        case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,_):
            // the combiner c is not needed for in-memory evaluation
            return MapReduceAlgebra.mapReduce2(
                            evalF(mx,env),
                            evalF(my,env),
                            evalF(r,env),
                            evalM(x,env),
                            evalM(y,env));
        case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,_):
            return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
                                                      evalM(#< MapReduce2(`mx,`my,`r,`x,`y,false)>,env)));
        case MapJoin(`kx,`ky,`r,`x,`y):
            return MapReduceAlgebra.mapJoin(
                             evalF(kx,env),
                             evalF(ky,env),
                             evalF(r,env),
                             evalM(x,env),
                             evalM(y,env));
        case MapAggregateJoin(`kx,`ky,`r,`acc,`zero,`x,`y):
            return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
                                                      evalM(#<MapJoin(`kx,`ky,`r,`x,`y)>,env)));
        case GroupByJoin(`kx,`ky,`gx,`gy,`acc,`zero,`r,`x,`y,`o):
            return MapReduceAlgebra.groupByJoin(
                             evalF(kx,env),
                             evalF(ky,env),
                             evalF(gx,env),
                             evalF(gy,env),
                             evalF(acc,env),
                             evalE(zero,env),
                             evalF(r,env),
                             evalM(x,env),
                             evalM(y,env));
        case CrossProduct(`mx,`my,`r,`x,`y):
            return MapReduceAlgebra.crossProduct(
                              evalF(mx,env),
                              evalF(my,env),
                              evalF(r,env),
                              evalM(x,env),
                              evalM(y,env));
        case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y):
            return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
                                                      evalM(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)));
        case BinarySource(`file,_):
            return (Bag)MapReduceAlgebra.read_binary(file.stringValue());
        case BSPSource(`n,BinarySource(`file,_)):
            // BSP source tagged with the worker number n
            return (Bag)MapReduceAlgebra.read_binary((int)((LongLeaf)n).value(),
                                                     file.stringValue());
        case BSPSource(`n,ParsedSource(`parser,`file,...args)):
            if (!(n instanceof LongLeaf))
                fail;
            Parser p = DataSource.parserDirectory.get(parser.toString()).newInstance();
            if (p == null)
                throw new Error("Unknown parser: "+parser);
            return MapReduceAlgebra.parsedSource((int)(((LongLeaf)n).value()),p,
                                                 ((MR_string)evalE(file,env)).get(),args);
        case ParsedSource(`parser,`file,...args):
            // look up the parser class by name and instantiate it
            Parser p = DataSource.parserDirectory.get(parser.toString()).newInstance();
            if (p == null)
                throw new Error("Unknown parser: "+parser);
            return MapReduceAlgebra.parsedSource(p,((MR_string)evalE(file,env)).get(),args);
        case Merge(`x,`y):
            return evalM(x,env).union(evalM(y,env));
        case Repeat(lambda(`v,`b),`s,`n):
            final String vs = v.toString();
            final Tree body = b;
            Function loop = new Function() {
                    final public MRData eval ( final MRData x ) {
                        return evalM(body,new Environment(vs,x,env));
                    }
                };
            return MapReduceAlgebra.repeat(loop,(Bag)evalM(s,env),((MR_int)evalE(n,env)).get());
        case Closure(lambda(`v,`b),`s,`n):
            final String vs = v.toString();
            final Tree body = b;
            Function loop = new Function() {
                    final public MRData eval ( final MRData x ) {
                        return evalM(body,new Environment(vs,x,env));
                    }
                };
            return MapReduceAlgebra.closure(loop,(Bag)evalM(s,env),((MR_int)evalE(n,env)).get());
        case Generator(`min,`max,`size):
            // size is only relevant for distributed generation, ignored here
            return MapReduceAlgebra.generator(((MR_long)evalE(min,env)).get(),
                                              ((MR_long)evalE(max,env)).get());
        case BSPSource(`n,Generator(`min,`max,`size)):
            return MapReduceAlgebra.generator((int)((LongLeaf)n).value(),
                                              ((MR_long)evalE(min,env)).get(),
                                              ((MR_long)evalE(max,env)).get());
        case Dump(`s):
            // lazily tag each element with peer number 0
            Bag bs = (Bag)evalE(s,env);
            final Iterator<MRData> iter = bs.iterator();
            return new Bag(new BagIterator() {
                    public boolean hasNext () {
                        return iter.hasNext();
                    }
                    public MRData next () {
                        return new Tuple(new MR_int(0),iter.next());
                    }
                });
        case let(`v,`u,`body):
            return evalM(body,new Environment(v.toString(),evalE(u,env),env));
        case trace(`msg,`tp,`x): // traced sub-plan (see pre_trace/trace)
            String m = ((MR_string)evalE(msg,env)).get();
            return (Bag)trace(pre_trace(m),tp,evalM(x,env));
        case apply(`f,`arg):
            if (!f.is_variable())
                return (Bag)evalF(f,env).eval(evalE(arg));
            MRData fnc = lookup_global_binding(f.toString());
            if (fnc == null)
                throw new Error("Unknown function: "+f);
            MRData t = evalE(arg,env);
            if (!(t instanceof Tuple))
                throw new Error("Expected a tuple in function application: "+t);
            return (Bag)((Lambda)fnc).lambda().eval(t);
        case BSPSource(`n,`s):
            // lazily tag each element with peer number n
            final MR_int i = new MR_int((int)((LongLeaf)n).value());
            Bag bs = (Bag)evalE(s,env);
            final Iterator<MRData> iter = bs.iterator();
            return new Bag(new BagIterator() {
                    public boolean hasNext () {
                        return iter.hasNext();
                    }
                    public MRData next () {
                        return new Tuple(i,iter.next());
                    }
                });
        case BSP(...):
            // a BSP plan may return a scalar; coerce it into a singleton Bag
            MRData res = evalE(e,env);
            if (res instanceof Bag)
                return (Bag)res;
            else return new Bag(res);
        case `v:
            if (!v.is_variable())
                fail;
            MRData x = variable_lookup(v.toString(),env);
            if (x != null)
                return (Bag)x;
            x = lookup_global_binding(v.toString());
            if (x != null)
                return (Bag)x;
            throw new Error("Variable "+v+" is not bound");
        };
        throw new Error("Cannot evaluate the plan: "+e);
    } catch (Error msg) {
        // in trace mode, show the original failure before re-throwing
        if (!Config.trace)
            throw new Error(msg.getMessage());
        System.err.println(msg.getMessage());
        msg.printStackTrace();
        throw new Error("Evaluation error in: "+print_query(e));
    } catch (Exception ex) {
        if (Config.trace)
            ex.printStackTrace();
        throw new Error("Evaluation error in: "+print_query(e));
    }
}
| |
/** Compute the closure of e over env, excluding the local_vars.
 *  NOTE(review): currently a stub that returns e unchanged — free variables
 *  are resolved at evaluation time through the environment instead. */
private final static Tree closure ( Tree e, Environment env, Trees local_vars ) {
    return e;
}
| |
/** Compute the closure of e over env (no local variables to exclude). */
final static Tree closure ( Tree e, Environment env ) {
    return closure(e,env,#[]);
}
| |
// the inferred type of the last translated query (used when printing results)
static Tree query_type;
// the physical plan produced for the last translated query
static Tree query_plan;
// true when the last translated query evaluates to a distributed DataSet
static boolean is_dataset;
| |
| /** translate an MRQL expression e into a physical plan */ |
/** translate an MRQL expression e into a physical plan.
 *  The compilation pipeline: type inference -> group-by removal -> renaming
 *  and normalization -> plan selection -> select removal -> simplification ->
 *  streaming/provenance injection -> algebraic translation -> plan generation
 *  (BSP or map-reduce) -> materialization -> optional closure compilation.
 *  Returns null on a type error or a system error (which is reported). */
final static Tree translate_expression ( Tree e ) {
    try {
        if (Config.trace)
            System.out.println("Query at line "+Main.parser.line_pos()+": "+print_query(e));
        Tree qt = TypeInference.type_inference(e);
        if (!Config.quiet_execution)
            System.out.println("Query type: "+print_type(qt));
        query_type = qt;
        Tree ne = Normalization.remove_groupby(e);
        if (Config.trace)
            System.out.println("After removing group-by:\n"+ne.pretty(0));
        ne = Simplification.rename(ne);
        if (Config.trace)
            System.out.println("After renaming variables:\n"+ne.pretty(0));
        ne = Simplification.rename(Normalization.normalize_all(ne));
        if (Config.trace)
            System.out.println("Normalized query:\n"+ne.pretty(0));
        // each re-typing validates the previous transformation stage
        type_inference(ne);
        ne = QueryPlan.best_plan(ne);
        if (Config.trace)
            System.out.println("Best plan:\n"+ne.pretty(0));
        ne = Simplification.rename(Translator.translate_select(ne));
        if (Config.trace)
            System.out.println("After removing select-queries:\n"+ne.pretty(0));
        type_inference(ne);
        ne = Simplification.simplify_all(ne);
        if (Config.trace)
            System.out.println("Algebra expression:\n"+ne.pretty(0));
        Tree pt = type_inference(ne);
        if (Config.trace)
            System.out.println("Algebraic type: "+print_type(pt));
        // inject incremental-streaming or provenance code, when requested
        if (Config.stream_window > 0 && Config.incremental)
            ne = Streaming.generate_incremental_code(ne);
        else if (Config.lineage) {
            ne = Provenance.embed_provenance(ne,false);
            if (Config.trace)
                System.out.println("After provenance injection:\n"+ne.pretty(0));
        } else if (Config.debug) {
            ne = Provenance.embed_provenance(ne,debug_mode);
            if (Config.trace)
                System.out.println("After provenance injection:\n"+ne.pretty(0));
        };
        ne = AlgebraicOptimization.translate_all(ne);
        if (Config.trace)
            System.out.println("Translated expression:\n"+ne.pretty(0));
        Tree et = TypeInference.type_inference(ne);
        is_dataset = PlanGeneration.is_dataset_expr(ne);
        if (Config.trace)
            System.out.println("Physical plan type: "+print_type(et));
        repeat_variables = #[];
        ne = Simplification.simplify_all(ne);
        if (Streaming.is_streaming(ne) && !Config.incremental)
            ne = Streaming.streamify(ne);
        Tree plan = PlanGeneration.makePlan(ne);
        if (Config.bsp_mode) {
            // rewrite the physical plan into a BSP plan
            BSPTranslator.reset();
            if (Config.trace)
                System.out.println("Physical plan:\n"+plan.pretty(0));
            plan = Materialization.materialize_terms(BSPTranslator.constructBSPplan(plan));
            if (Config.trace)
                System.out.println("BSP plan:\n"+plan.pretty(0));
            else {
                String splan = print_plan(plan,0,false);
                if (!splan.equals("") && !Config.quiet_execution)
                    System.out.println("BSP plan:\n"+splan);
            }
        } else {
            if (Config.hadoop_mode)
                plan = PlanGeneration.physical_plan(plan);
            plan = Materialization.materialize_terms(AlgebraicOptimization.common_factoring(plan));
            if (Config.trace)
                System.out.println("Physical plan:\n"+plan.pretty(0));
            else {
                String splan = print_plan(plan,0,false);
                if (!splan.equals("") && !Config.quiet_execution)
                    System.out.println("Physical plan:\n"+splan);
            }
        };
        // compile functional arguments to Java bytecode, when enabled
        if (Config.compile_functional_arguments)
            plan = Compiler.compile(plan);
        return plan;
    } catch (Error x) {
        if (Config.testing)
            throw new Error(x);
        // type errors have already been reported by the type checker
        if (!Config.trace && x.toString().endsWith("Type Error"))
            return null;
        if (x.getMessage() != null) // system error
            System.err.println("*** MRQL System Error at line "+Main.parser.line_pos()+": "+x);
        if (Config.trace)
            x.printStackTrace(System.err);
        return null;
    }
}
| } |
| |