blob: 600f5aa104e3bb852533015c0ac5ec99a0f8719b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import org.apache.mrql.gen.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import java.io.PrintStream;
/** The MRQL interpreter */
public class Interpreter extends TypeInference {
/** the identity mapper lambda: maps each element x to the singleton bag {x} */
public final static Tree identity_mapper = #<lambda(x,bag(x))>;
/** linked list of global variable bindings (name -> value), newest binding first */
protected static Environment global_env = null;
/** retrieve variable binding */
/** retrieve variable binding; returns null when v is not bound in the environment chain */
public final static MRData variable_lookup ( final String v, final Environment environment ) {
    Environment scope = environment;
    while (scope != null) {
        if (v.equals(scope.name))
            return scope.value;
        scope = scope.next;
    }
    return null;
}
/** insert a new global variable binding */
/** insert a new global variable binding (prepended, so it shadows older bindings of var) */
public final static void new_global_binding ( final String var, final MRData value ) {
    final MRData bound = value;
    // pull lazy bag contents into memory before publishing the binding
    if (bound instanceof Bag)
        ((Bag)bound).materialize();
    global_env = new Environment(var,bound,global_env);
}
/** remove a global variable binding */
/** remove all global variable bindings for v
 * @param v  the variable name to unbind
 */
public static void remove_global_binding ( String v ) {
    if (global_env == null)
        return;
    // unlink every non-head binding of v
    for ( Environment env = global_env; env.next != null; env = env.next )
        if (v.equals(env.next.name))
            env.next = env.next.next;
    // FIX: was `global_env.name == v` — reference comparison; an equal-but-distinct
    // String (e.g. built at run time) would never match, leaving the head binding in place
    if (v.equals(global_env.name))
        global_env = global_env.next;
}
/** retrieve a global variable binding */
/** retrieve a global variable binding; falls back to the distributed (Hadoop conf) bindings */
public static MRData lookup_global_binding ( String v ) {
    Environment env = global_env;
    while (env != null) {
        if (env.value != null && v.equals(env.name))
            return env.value;
        env = env.next;
    }
    // not found locally: check the bindings shared through the Hadoop configuration
    return lookup_distributed_binding(v);
}
/** replace the entire global binding environment (used to restore saved bindings) */
public static void set_global_bindings ( Environment env ) {
    global_env = env;
}
/** insert a new global variable binding visible to all nodes */
/** insert a new global variable binding visible to all nodes.
 *  Binds locally, and in hadoop mode also serializes the value (via reify)
 *  into the "mrql.global.vars" configuration property so worker nodes can
 *  reconstruct it with lookup_distributed_binding. */
public final static void new_distributed_binding ( final String var, final MRData value ) {
    if (value == null)
        return;
    new_global_binding(var,value);
    if (Config.hadoop_mode) { // insert the binding in the hadoop configuration
        String gvs = Plan.conf.get("mrql.global.vars");
        if (gvs == null)
            gvs = "";
        // entries are encoded as name:expr pairs separated by '@'
        // NOTE(review): this assumes reify output contains no ':' or '@' — confirm
        gvs = var+":"+reify(value)+"@"+gvs;
        Plan.conf.set("mrql.global.vars",gvs);
    }
}
/** retrieve a global variable binding available to all nodes */
/** retrieve a global variable binding available to all nodes.
 *  Parses the "mrql.global.vars" configuration property written by
 *  new_distributed_binding and re-evaluates the reified expression.
 *  Returns null when not in hadoop mode or when no binding exists. */
public static MRData lookup_distributed_binding ( String v ) {
    try {
        if (Config.hadoop_mode) { // find the binding in the hadoop configuration
            String gvs = Plan.conf.get("mrql.global.vars");
            if (gvs == null)
                return null;
            for ( String s: gvs.split("@") ) {
                String[] b = s.split(":");
                if (b.length == 2 && b[0].equals(v))
                    return evalE(Tree.parse(b[1]));
            }
        }
    } catch (Exception ex) {};  // best-effort lookup: any parse/eval failure means "not bound"
    return null;
}
/** method number of ClassImporter's coerce(any,int), used by reify to emit coercion calls */
final static int coerce_method = ClassImporter.find_method_number("coerce",#[any,int]);
/** untyped reify: not type-correct but will not crash the run-time system */
/** untyped reify: not type-correct but will not crash the run-time system.
 *  Converts an in-memory MRData value back into an expression Tree that,
 *  when evaluated by evalE, reconstructs the value (used to ship global
 *  bindings to worker nodes through the Hadoop configuration).
 *  @throws Error for MRData kinds that cannot be reified */
private final static Tree reify ( final MRData x ) {
    if (x instanceof Bag) {
        Bag b = (Bag)x;
        Trees as = #[];
        for ( MRData e: b)
            as = as.append(reify(e));
        return #<list(...as)>;
    } else if (x instanceof Tuple) {
        Tuple t = (Tuple)x;
        Trees as = #[];
        for ( short i = 0; i < t.size(); i++ )
            as = as.append(reify(t.get(i)));
        return #<tuple(...as)>;
    } else if (x instanceof Union) {
        Union u = (Union)x;
        return #<tagged_union(`(u.tag()),`(reify(u.value())))>;
    } else if (x instanceof MR_string)
        return new StringLeaf(((MR_string)x).get());
    else if (x instanceof MR_short)
        // shorts/longs/doubles are reified as coerce calls so evalE restores the right type
        return #<callM(coerce,`coerce_method,`(((MR_short)x).get()),`(MRContainer.SHORT))>;
    else if (x instanceof MR_int)
        return #<`(((MR_int)x).get())>;
    else if (x instanceof MR_long)
        // NOTE(review): the long is narrowed to int (and below, double to float) before
        // coercion — values outside int/float range lose precision; the "untyped reify"
        // caveat above suggests this is accepted, but confirm
        return #<callM(coerce,`coerce_method,`((int)((MR_long)x).get()),`(MRContainer.LONG))>;
    else if (x instanceof MR_float)
        return #<`(((MR_float)x).get())>;
    else if (x instanceof MR_double)
        return #<callM(coerce,`coerce_method,`((float)(((MR_double)x).get())),`(MRContainer.DOUBLE))>;
    else if (x instanceof MR_bool)
        return (((MR_bool)x).get()) ? #<true> : #<false>;
    else if (x instanceof MR_dataset) {
        // materialize the distributed dataset into a list literal
        Trees vs = #[ ];
        for ( MRData v: ((MR_dataset)x).dataset().take(Integer.MAX_VALUE))
            vs = vs.append(reify(v));
        return #<list(...vs)>;
    };
    throw new Error("wrong MRData: "+x);
}
/** true if the expression contains a data source operator (Generator,
 *  BinarySource, or ParsedSource) anywhere in its subtree */
private static boolean has_source ( Tree e ) {
    match e {
    case Generator(...): return true;
    case BinarySource(...): return true;
    case ParsedSource(...): return true;
    case `f(...as):
        // recurse into the children of any other operator
        for ( Tree a: as )
            if (has_source(a))
                return true;
    };
    return false;
}
/** evaluate an MRQL function in memory */
/** evaluate an MRQL function in memory: build a closure for a unary
 *  lambda with parameter v and body, closing over env */
private final static Function evalf ( final String v,
                                      final Tree body,
                                      final Environment env ) {
    return new Function() {
        final public MRData eval ( final MRData x ) {
            // bind the parameter to the argument and evaluate the body
            return evalE(body,new Environment(v,x,env));
        }
    };
}
/** evaluate an MRQL function in memory */
/** evaluate an MRQL function in memory: dispatch on the lambda form.
 *  Handles pre-compiled lambdas (falling back to interpretation when the
 *  compiled class cannot be loaded), plain unary lambdas, and named
 *  functions with tuple parameters.
 *  @throws Error when fnc is not one of the recognized lambda forms */
public final static Function evalF ( Tree fnc, Environment env ) {
    match fnc {
    case compiled(`ln,`lm,...vars):
        try {
            return Compiler.compiled(Thread.currentThread().getContextClassLoader(),ln.toString());
        } catch (Exception ex) {
            // fall back to interpreting the original lambda term lm
            System.err.println("*** Unable to retrieve the compiled lambda: "+fnc);
            return ((Lambda) evalE(lm)).lambda();
        }
    case lambda(`v,`b):
        return evalf(v.toString(),b,env);
    case function(tuple(...params),`tp,`body):
        // collect the parameter names from the bind(...) forms
        String[] as = new String[params.length()];
        int i = 0;
        for ( Tree param: params )
            match param {
            case `bind(`v,_):
                as[i++] = v.toString();
            };
        return evalT(as,body,env);
    };
    throw new Error("Ill-formed lambda: "+fnc);
}
/** evaluate an MRQL function in memory */
/** evaluate an MRQL function in memory: build a closure for a function
 *  with multiple parameters; the argument is expected to be a Tuple whose
 *  i-th component is bound to params[i] */
private final static Function evalT ( final String[] params,
                                      final Tree body,
                                      final Environment env ) {
    return new Function() {
        final public MRData eval ( final MRData x ) {
            Environment new_env = env;
            // bind each tuple component to the corresponding parameter name
            for ( int i = 0; i < params.length; i++ )
                new_env = new Environment(params[i],((Tuple)x).get(i),new_env);
            return evalE(body,new_env);
        }
    };
}
// cached names/values for the special literals true/false/null
final static String true_name = #<true>.toString();
final static String false_name = #<false>.toString();
final static String null_name = #<null>.toString();
final static MRData null_value = new Tuple(0);   // null is represented as the 0-ary tuple
final static MRData true_value = new MR_bool(true);
final static MRData false_value = new MR_bool(false);
// indentation level for tracing output (stepped by 3 per nesting level; starts at -3 so the first level prints at column 0)
static int tab_count = -3;
// monotone counter used to label trace messages
static long trace_count = 0;
/** return a string of n spaces (empty for n <= 0); used to indent trace output */
public static String tabs ( int n ) {
    StringBuilder pad = new StringBuilder();
    int remaining = n;
    while (remaining-- > 0)
        pad.append(' ');
    return pad.toString();
}
/** implements the DataSetCollect operation */
/** implements the DataSetCollect operation: fetch the global binding of var
 *  and return it as an in-memory Bag, materializing a distributed dataset
 *  if necessary.
 *  @throws Error when the binding is neither a Bag nor an MR_dataset */
public static Bag dataSetCollect ( String var ) {
    MRData val = lookup_global_binding(var);
    if (val instanceof Bag)
        return (Bag)val;
    else if (val instanceof MR_dataset)
        return new Bag(((MR_dataset)val).dataset().take(Integer.MAX_VALUE));
    throw new Error("Cannot collect the MRQL value into a bag: "+val);
}
/** print the entry line of a trace(...) call and return its sequence number,
 *  which the matching trace(count,...) call uses to pair exit with entry */
public static long pre_trace ( String msg ) {
    tab_count += 3;
    trace_count++;
    System.out.println(tabs(tab_count)+"*** "+trace_count+": "+msg);
    return trace_count;
}
/** print the exit line of a trace(...) call (value rendered at the given type)
 *  and pass the traced value through unchanged */
public static MRData trace ( long count, Tree type, MRData value ) {
    System.out.print(tabs(tab_count)+"--> "+count+": ");
    System.out.println(Printer.print(value,type));
    tab_count -= 3;
    return value;
}
/** evaluate an MRQL expression in memory and print tracing info */
/** evaluate an MRQL expression in memory and print tracing info */
final static MRData evalE ( final Tree e, final Environment env ) {
    // entry trace: indent one level and show the expression being evaluated
    if (Config.trace_exp_execution) {
        tab_count += 3;
        System.out.println(tabs(tab_count)+print_query(e));
    }
    final MRData result = evalEE(e,env);
    // exit trace: show the result at the same indentation, then dedent
    if (Config.trace_exp_execution) {
        System.out.println(tabs(tab_count)+"-> "+result);
        tab_count -= 3;
    }
    return result;
}
/** evaluate an MRQL expression in memory */
/** evaluate an MRQL expression in memory: the main interpreter dispatch.
 *  Handles literals and variables directly, then matches on the algebraic
 *  operator. Case order is significant: overlapping patterns (e.g. the two
 *  repeat cases) are tried top-to-bottom and a `fail` retries the next case.
 *  @throws Error when the expression cannot be evaluated */
private final static MRData evalEE ( final Tree e, final Environment env ) {
    try {
        if (e.is_variable()) {
            String v = e.toString();
            // NOTE(review): the == comparisons rely on Tree variable names being
            // interned strings (as the cached *_name constants are) — confirm in Gen
            if (v == true_name)
                return true_value;
            else if (v == false_name)
                return false_value;
            else if (v == null_name)
                return null_value;
            // local bindings shadow global ones
            MRData x = variable_lookup(v,env);
            if (x != null)
                return x;
            x = lookup_global_binding(v);
            if (x == null)
                throw new Error("Variable "+v+" is not bound");
            return x;
        } else if (e.is_long())
            return new MR_int((int)e.longValue());
        else if (e.is_double())
            return new MR_double(e.doubleValue());
        else if (e.is_string())
            return new MR_string(e.stringValue());
        match e {
        // boolean connectives are short-circuited (lazy in the second argument)
        case callM(and,_,`x,`y): // lazy
            return (((MR_bool)evalE(x,env)).get()) ? evalE(y,env) : false_value;
        case callM(or,_,`x,`y):
            return (((MR_bool)evalE(x,env)).get()) ? true_value : evalE(y,env);
        // constant-fold coercions of literal longs/doubles to LONG/FLOAT
        case callM(coerce,_,`x,`n):
            if (!x.is_long() || n.longValue() != MRContainer.LONG)
                fail;
            return new MR_long(x.longValue());
        case callM(coerce,_,`x,`n):
            if (!x.is_double() || n.longValue() != MRContainer.FLOAT)
                fail;
            return new MR_float((float)x.doubleValue());
        case callM(`f,`n,...args): // internal function call
            MRData[] as = new MRData[args.length()];
            for ( int i = 0; i < args.length(); i++ )
                as[i] = evalE(args.nth(i),env);
            return ClassImporter.call((int)n.longValue(),as);
        case compiled(`ln,_,...vars):
            // load the pre-compiled lambda by class name
            return new Lambda(Compiler.compiled(Thread.currentThread().getContextClassLoader(),ln.toString()));
        case lambda(`v,`body):
            return new Lambda(evalf(v.toString(),body,env));
        case nth(`x,`n):
            return ((Tuple)evalE(x,env)).get((int)n.longValue());
        case setNth(`x,`n,`v,`ret):
            return ((Tuple)evalE(x,env)).set((int)n.longValue(),evalE(v,env),evalE(ret,env));
        case materialize(`u):
            return MapReduceAlgebra.materialize(evalE(u,env));
        // let over a DataSetCollect also publishes the value as a distributed binding
        case let(`v,DataSetCollect(`s),`body):
            MRData x = evalE(s,env);
            if (x instanceof MR_dataset)
                x = new Bag(((MR_dataset)x).dataset().take(Integer.MAX_VALUE));
            else if (x instanceof Bag)
                ((Bag)x).materialize();
            new_distributed_binding(v.toString(),x);
            return evalE(body,new Environment(v.toString(),x,env));
        case let(`v,`u,`body):
            MRData x = evalE(u,env);
            if (x instanceof Bag)
                ((Bag)x).materialize();
            return evalE(body,new Environment(v.toString(),x,env));
        case cmap(`f,`s):
            return MapReduceAlgebra.cmap(evalF(f,env),(Bag)evalE(s,env));
        case filter(`p,`m,`s):
            return MapReduceAlgebra.filter(evalF(p,env),evalF(m,env),(Bag)evalE(s,env));
        case map(`m,`s):
            return MapReduceAlgebra.map(evalF(m,env),(Bag)evalE(s,env));
        // this catch-all repeat case must come BEFORE the repeat(lambda...) case:
        // in flink mode, repetitions over sources are delegated to the Flink evaluator
        case repeat(...):
            if (!Config.flink_mode || !Config.hadoop_mode || !has_source(e))
                fail;
            // send Flink repetitions to the Flink evaluator (they need Flink iterations)
            return Evaluator.evaluator.aggregate(#<0>,#<0>,e,env);
        case repeat(lambda(`v,`b),`s,`n):
            final String nm = v.toString();
            final Tree body = b;
            Function loop_fnc = new Function () {
                public MRData eval ( MRData s ) {
                    // re-publish the loop variable on every iteration
                    new_distributed_binding(nm,s);
                    return evalE(body,new Environment(nm,s,env));
                }; };
            MRData init = evalE(s,env);
            if (init instanceof MR_dataset) {
                // materialize a distributed initial value into a local Bag
                Bag vs = new Bag();
                for ( MRData dv: ((MR_dataset)init).dataset().take(Integer.MAX_VALUE))
                    vs.add(dv);
                init = vs;
            };
            return MapReduceAlgebra.repeat(loop_fnc,(Bag)init,((MR_int)evalE(n,env)).get());
        case closure(lambda(`v,`b),`s,`n):
            final String nm = v.toString();
            final Tree body = b;
            Function loop_fnc = new Function () {
                public MRData eval ( MRData s ) {
                    new_distributed_binding(nm,s);
                    return evalE(body,new Environment(nm,s,env));
                }; };
            return MapReduceAlgebra.closure(loop_fnc,(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
        case loop(lambda(tuple(...vs),`b),`s,`n):
            final Trees nvs = vs;
            final Tree body = b;
            Function loop_fnc = new Function () {
                public MRData eval ( MRData s ) {
                    // bind each loop variable to the matching tuple component
                    Tuple t = (Tuple)s;
                    Environment nenv = env;
                    for ( int i = 0; i < nvs.length(); i++ ) {
                        new_distributed_binding(nvs.nth(i).toString(),t.get(i));
                        nenv = new Environment(nvs.nth(i).toString(),t.get(i),nenv);
                    };
                    return evalE(body,nenv);
                }; };
            return MapReduceAlgebra.loop(loop_fnc,(Tuple)evalE(s,env),((MR_int)evalE(n,env)).get());
        case range(`min,`max):
            return MapReduceAlgebra.generator(((MR_long)evalE(min,env)).get(),
                                              ((MR_long)evalE(max,env)).get());
        case trace(`msg,`tp,`x):
            String m = ((MR_string)evalE(msg,env)).get();
            return trace(pre_trace(m),tp,evalE(x,env));
        case call(`f,...args):
            // pack the arguments into a tuple and apply the function
            Tuple t = new Tuple(args.length());
            int i = 0;
            for ( Tree a: args )
                t.set(i++,evalE(a,env));
            return evalF(f,env).eval(t);
        // pairs and triples are special-cased to avoid the generic loop
        case tuple(`x,`y):
            return new Tuple(evalE(x,env),evalE(y,env));
        case tuple(`x,`y,`z):
            return new Tuple(evalE(x,env),evalE(y,env),evalE(z,env));
        case tuple(...el):
            Tuple t = new Tuple(el.length());
            int i = 0;
            for ( Tree a: el )
                t.set(i++,evalE(a,env));
            return t;
        case tagged_union(`n,`u):
            return new Union((byte)n.longValue(),evalE(u,env));
        case union_value(`x):
            return ((Union)evalE(x,env)).value();
        case union_tag(`x):
            return new MR_int(((Union)evalE(x,env)).tag());
        // used for shortcutting sync in bsp supersteps
        case BAG():
            return SystemFunctions.bsp_empty_bag;
        case TRUE():
            return SystemFunctions.bsp_true_value;
        case FALSE():
            return SystemFunctions.bsp_false_value;
        // collection constructors (bag/list/set literals)
        case `T(...el):
            if (!is_collection(T))
                fail;
            if (el.is_empty())
                return new Bag();
            Bag b = new Bag(el.length());
            for ( Tree a: el )
                b.add(evalE(a,env));
            return b;
        case if(`c,`x,`y):
            if (((MR_bool)evalE(c,env)).get())
                return evalE(x,env);
            else return evalE(y,env);
        case Collect(`s):
            try {
                if (Config.hadoop_mode)
                    return Plan.collect(Evaluator.evaluator.eval(s,env,"-"));
                Bag b = evalS(s,env);
                b.materialize();
                return b;
            } catch (Exception ex) { throw new Error(ex); }
        case DataSetCollect(`s):
            return dataSetCollect(s.toString());
        case dataset_size(`x):
            // size in MBs
            return new MR_long(Plan.size(Evaluator.evaluator.eval(x,env,"-")) / (1024*1024));
        case synchronize(`peer,`b):
            return Evaluator.evaluator.synchronize(((MR_string)evalE(peer,env)),(MR_bool)evalE(b,env));
        case distribute(`peer,`s):
            return Evaluator.evaluator.distribute(((MR_string)evalE(peer,env)),(Bag)evalE(s,env));
        case mapReduce(`m,`r,`s,_):
            return MapReduceAlgebra.mapReduce(evalF(m,env),
                                              evalF(r,env),
                                              (Bag)evalE(s,env));
        case mapReduce2(`mx,`my,`r,`x,`y,_):
            return MapReduceAlgebra.mapReduce2(
                             evalF(mx,env),
                             evalF(my,env),
                             evalF(r,env),
                             (Bag)evalE(x,env),
                             (Bag)evalE(y,env));
        case mapJoin(`kx,`ky,`r,`x,`y):
            return MapReduceAlgebra.mapJoin(
                             evalF(kx,env),
                             evalF(ky,env),
                             evalF(r,env),
                             (Bag)evalE(x,env),
                             (Bag)evalE(y,env));
        case crossProduct(`mx,`my,`r,`x,`y):
            return MapReduceAlgebra.crossProduct(
                             evalF(mx,env),
                             evalF(my,env),
                             evalF(r,env),
                             (Bag)evalE(x,env),
                             (Bag)evalE(y,env));
        case groupBy(`s):
            return MapReduceAlgebra.groupBy((Bag)evalE(s,env));
        case orderBy(`s):
            // NOTE(review): orderBy delegates to groupBy — ordering appears to rely on
            // groupBy producing key-sorted output; confirm MapReduceAlgebra.groupBy semantics
            return MapReduceAlgebra.groupBy((Bag)evalE(s,env));
        case index(`x,`n):
            MRData xv = evalE(x,env);
            MRData nv = evalE(n,env);
            final int k = (int)((MR_int)nv).get();
            if (k < 0)
                throw new Error("Negative list index: "+k+" (in "+print_query(x)+")");
            if (xv instanceof MR_dataset) {
                // only fetch the first k+1 elements of the distributed dataset
                DataSet ds = ((MR_dataset)xv).dataset();
                List<MRData> res = ds.take(k+1);
                if (k >= res.size())
                    throw new Error("List index out of bounds: "+k+" (in "+print_query(x)+" of size "+res.size()+")");
                return res.get(k);
            };
            Bag b = (Bag)xv;
            // size is only known (and checkable) for a materialized bag
            if (b.materialized() && k >= b.size())
                throw new Error("List index out of bounds: "+k+" (in "+print_query(x)+" of size "+b.size()+")");
            return b.get(k);
        // list slice x[i:j] (inclusive bounds)
        case range(`x,`i,`j):
            MRData xv = evalE(x,env);
            MRData ni = evalE(i,env);
            MRData nj = evalE(j,env);
            int ki = (int)((MR_int)ni).get();
            int kj = (int)((MR_int)nj).get();
            if (ki < 0 || kj < ki)
                throw new Error("Wrong list range: ["+ki+","+kj+"] in "+print_query(x));
            Iterator<MRData> it = (xv instanceof MR_dataset)
                                   ? ((MR_dataset)xv).dataset().take(kj+1).iterator()
                                   : ((Bag)xv).iterator();
            Bag s = new Bag();
            // skip the first ki elements, then collect up to and including position kj
            for ( int n = 0; it.hasNext() && n < ki; n++ )
                it.next();
            for ( int n = ki; it.hasNext() && n <= kj; n++ )
                s.add(it.next());
            return s;
        case map_index(`x,`key):
            MRData xv = evalE(x,env);
            final MRData nk = evalE(key,env);
            if (xv instanceof MR_dataset) {
                // fold over the distributed (key,value) pairs, keeping the matching value;
                // the empty-tuple accumulator doubles as the "not found" marker
                xv = ((MR_dataset)xv).dataset().reduce(new Tuple(),new Function() {
                        public MRData eval ( MRData value ) {
                            Tuple p = (Tuple)value;
                            Tuple y = (Tuple)p.second();
                            return (y.first().equals(nk)) ? y.second() : p.first();
                        }
                    });
                if (xv instanceof Tuple && ((Tuple)xv).size() == 0)
                    throw new Error("Map key not found: "+nk);
                return xv;
            };
            return ((Bag)xv).map_find(nk);
        case aggregate(`acc,`zero,`s):
            return MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
                                              (Bag)evalE(s,env));
        case Aggregate(`acc,`zero,`s):
            if (Config.hadoop_mode)
                return Evaluator.evaluator.aggregate(closure(acc,env),zero,s,env);
            else return MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),evalM(s,env));
        case mergeGroupByJoin(`kx,`ky,`gx,`gy,`acc,`zero,`r,`x,`y,`o):
            // NOTE(review): the orderings argument `o is ignored here — confirm intended
            return MapReduceAlgebra.mergeGroupByJoin(evalF(kx,env),evalF(ky,env),evalF(gx,env),evalF(gy,env),
                                                     evalF(acc,env),evalE(zero,env),evalF(r,env),
                                                     (Bag)evalE(x,env),(Bag)evalE(y,env));
        // BSP with a tuple of superstep numbers
        case BSP(tuple(...ns),`superstep,`state,`o,...as):
            if (Config.hadoop_mode)
                return Evaluator.evaluator.bsp(e,env);
            Bag[] ds = new Bag[as.length()];
            for ( int i = 0; i < ds.length; i++ )
                ds[i] = evalM(as.nth(i),env);
            int[] nn = new int[ns.length()];
            for ( int i = 0; i < ns.length(); i++ )
                nn[i] = (int)((LongLeaf)ns.nth(i)).value();
            return MapReduceAlgebra.BSP(nn,
                                        evalF(superstep,env),
                                        evalE(state,env),
                                        o.equals(#<true>),
                                        ds);
        // BSP with a single superstep number
        case BSP(`n,`superstep,`state,`o,...as):
            if (Config.hadoop_mode)
                return Evaluator.evaluator.bsp(e,env);
            Bag[] ds = new Bag[as.length()];
            for ( int i = 0; i < ds.length; i++ )
                ds[i] = evalM(as.nth(i),env);
            return MapReduceAlgebra.BSP(new int[]{(int)((LongLeaf)n).value()},
                                        evalF(superstep,env),
                                        evalE(state,env),
                                        o.equals(#<true>),
                                        ds);
        case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
            if (Config.hadoop_mode)
                return Evaluator.evaluator.loop(e,env);
            int limit = ((MR_int)evalE(num,env)).get();
            Bag[] s = new Bag[vs.length()];
            for ( int i = 0; i < vs.length(); i++ )
                s[i] = evalM(ss.nth(i),env);
            // iterate the loop body limit times, rebinding every loop variable each round
            for ( int n = 0; n < limit; n++ ) {
                Environment nenv = env;
                for ( int i = 0; i < vs.length(); i ++ ) {
                    s[i].materialize();
                    nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
                };
                for ( int i = 0; i < vs.length(); i ++ )
                    s[i] = (Bag)evalM(bs.nth(i),nenv);
            };
            return new Tuple(s);
        case function(tuple(...params),`tp,`body):
            String[] as = new String[params.length()];
            int i = 0;
            for ( Tree param: params )
                match param {
                case `bind(`v,_):
                    as[i++] = v.toString();
                };
            return new Lambda(evalT(as,body,env));
        case typed(`x,_):
            // type annotations are transparent at run time
            return evalE(x,env);
        case apply(`f,`arg):
            if (!f.is_variable())
                return evalF(f,env).eval(evalE(arg,env));
            MRData fnc = lookup_global_binding(f.toString());
            if (fnc == null) {
                // try to recover the function definition from the hadoop configuration
                String s = Plan.conf.get("mrql.global."+f);
                if (s != null)
                    try {
                        Tree ft = Tree.parse(s);
                        //TopLevel.store(f.toString(),ft);
                        fnc = evalE(ft,env);
                        new_global_binding(f.toString(),fnc);
                    } catch (Exception ex) {
                        throw new Error(ex);
                    }
            };
            MRData t = evalE(arg,env);
            if (!(t instanceof Tuple))
                throw new Error("Expected a tuple in function application: "+t);
            return ((Lambda)fnc).lambda().eval(t);
        case Stream(...): // streaming
            // print each batch of streaming results as it arrives; the query "value" is an empty bag
            final Tree qt = query_type;
            Evaluator.evaluator.streaming(e,env,null,new Function(){
                    public MRData eval ( final MRData data ) {
                        System.out.println(print(data,qt));
                        return data;
                    }
                });
            return new Bag();
        case _:
            // anything else is a physical plan: delegate to the cluster evaluator or evalS
            try {
                if (Config.hadoop_mode)
                    return new MR_dataset(Evaluator.evaluator.eval(e,env,"-"));
                else return evalS(e,env);
            } catch (Exception ex) { throw new Error(ex); }
        };
        throw new Error("Cannot evaluate the expression: "+e);
    } catch (Error msg) {
        if (!Config.trace)
            throw new Error(msg.getMessage());
        System.err.println(msg.getMessage());
        msg.printStackTrace();
        throw new Error("Evaluation error in: "+print_query(e));
    } catch (Exception ex) {
        if (Config.trace) {
            System.err.println(ex.getMessage());
            ex.printStackTrace();
        }
        throw new Error("Evaluation error in: "+print_query(e));
    }
}
/** evaluate an MRQL expression in memory */
/** evaluate an MRQL expression in memory under an empty local environment */
final static MRData evalE ( final Tree e ) {
    return evalE(e,null);
}
/** evaluate MRQL physical operators in memory (returns a Bag) */
/** evaluate MRQL physical operators in memory (returns a Bag); alias of evalM */
final static Bag evalS ( final Tree e, final Environment env ) {
    return evalM(e,env);
}
/** evaluate MRQL physical operators in memory (returns a Bag) */
/** evaluate MRQL physical operators in memory (returns a Bag).
 *  Tracing wrapper around evalMM (the plan analogue of evalE/evalEE). */
final static Bag evalM ( final Tree e, final Environment env ) {
    if (Config.trace_execution) {
        tab_count += 3;
        System.out.println(tabs(tab_count)+print_query(e));
    };
    Bag res = evalMM(e,env);
    if (Config.trace_execution) {
        System.out.println(tabs(tab_count)+"-> "+res);
        tab_count -= 3;
    };
    return res;
}
/** evaluate MRQL physical operators in memory (returns a Bag) */
/** evaluate MRQL physical operators in memory (returns a Bag): the plan
 *  interpreter dispatch. Aggregating variants are rewritten into an
 *  aggregate over the corresponding non-aggregating plan.
 *  @throws Error when the plan cannot be evaluated */
final static Bag evalMM ( final Tree e, final Environment env ) {
    try {
        match e {
        case cMap(`f,`s):
            return MapReduceAlgebra.cmap(evalF(f,env),evalM(s,env));
        case AggregateMap(`f,`acc,`zero,`s):
            return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
                                                      evalM(#<cMap(`f,`s)>,env)));
        case MapReduce(`m,`r,`s,_):
            return MapReduceAlgebra.mapReduce(
                            evalF(m,env),
                            evalF(r,env),
                            evalM(s,env));
        case MapAggregateReduce(`m,`r,`acc,`zero,`s,_):
            return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
                                                      evalM(#<MapReduce(`m,`r,`s,false)>,env)));
        case MapCombineReduce(`m,`c,`r,`s,_):
            // the combiner `c is an optimization for distributed execution; in memory
            // a plain mapReduce computes the same result
            return MapReduceAlgebra.mapReduce(
                            evalF(m,env),
                            evalF(r,env),
                            evalM(s,env));
        case MapReduce2(`mx,`my,`r,`x,`y,_):
            return MapReduceAlgebra.mapReduce2(
                            evalF(mx,env),
                            evalF(my,env),
                            evalF(r,env),
                            evalM(x,env),
                            evalM(y,env));
        case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,_):
            return MapReduceAlgebra.mapReduce2(
                            evalF(mx,env),
                            evalF(my,env),
                            evalF(r,env),
                            evalM(x,env),
                            evalM(y,env));
        case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,_):
            return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
                                                      evalM(#< MapReduce2(`mx,`my,`r,`x,`y,false)>,env)));
        case MapJoin(`kx,`ky,`r,`x,`y):
            return MapReduceAlgebra.mapJoin(
                            evalF(kx,env),
                            evalF(ky,env),
                            evalF(r,env),
                            evalM(x,env),
                            evalM(y,env));
        case MapAggregateJoin(`kx,`ky,`r,`acc,`zero,`x,`y):
            return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
                                                      evalM(#<MapJoin(`kx,`ky,`r,`x,`y)>,env)));
        case GroupByJoin(`kx,`ky,`gx,`gy,`acc,`zero,`r,`x,`y,`o):
            return MapReduceAlgebra.groupByJoin(
                            evalF(kx,env),
                            evalF(ky,env),
                            evalF(gx,env),
                            evalF(gy,env),
                            evalF(acc,env),
                            evalE(zero,env),
                            evalF(r,env),
                            evalM(x,env),
                            evalM(y,env));
        case CrossProduct(`mx,`my,`r,`x,`y):
            return MapReduceAlgebra.crossProduct(
                            evalF(mx,env),
                            evalF(my,env),
                            evalF(r,env),
                            evalM(x,env),
                            evalM(y,env));
        case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y):
            return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
                                                      evalM(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)));
        case BinarySource(`file,_):
            return (Bag)MapReduceAlgebra.read_binary(file.stringValue());
        case BSPSource(`n,BinarySource(`file,_)):
            return (Bag)MapReduceAlgebra.read_binary((int)((LongLeaf)n).value(),
                                                     file.stringValue());
        case BSPSource(`n,ParsedSource(`parser,`file,...args)):
            if (!(n instanceof LongLeaf))
                fail;
            // FIX: the original called .newInstance() before testing for null, so an
            // unknown parser raised a NullPointerException instead of the intended Error
            if (DataSource.parserDirectory.get(parser.toString()) == null)
                throw new Error("Unknown parser: "+parser);
            Parser p = DataSource.parserDirectory.get(parser.toString()).newInstance();
            return MapReduceAlgebra.parsedSource((int)(((LongLeaf)n).value()),p,
                                                 ((MR_string)evalE(file,env)).get(),args);
        case ParsedSource(`parser,`file,...args):
            // FIX: same ineffective null check as above — test the directory entry,
            // not the freshly constructed instance
            if (DataSource.parserDirectory.get(parser.toString()) == null)
                throw new Error("Unknown parser: "+parser);
            Parser p = DataSource.parserDirectory.get(parser.toString()).newInstance();
            return MapReduceAlgebra.parsedSource(p,((MR_string)evalE(file,env)).get(),args);
        case Merge(`x,`y):
            return evalM(x,env).union(evalM(y,env));
        case Repeat(lambda(`v,`b),`s,`n):
            final String vs = v.toString();
            final Tree body = b;
            Function loop = new Function() {
                final public MRData eval ( final MRData x ) {
                    return evalM(body,new Environment(vs,x,env));
                }
            };
            return MapReduceAlgebra.repeat(loop,(Bag)evalM(s,env),((MR_int)evalE(n,env)).get());
        case Closure(lambda(`v,`b),`s,`n):
            final String vs = v.toString();
            final Tree body = b;
            Function loop = new Function() {
                final public MRData eval ( final MRData x ) {
                    return evalM(body,new Environment(vs,x,env));
                }
            };
            return MapReduceAlgebra.closure(loop,(Bag)evalM(s,env),((MR_int)evalE(n,env)).get());
        case Generator(`min,`max,`size):
            // NOTE(review): the split-size argument `size is irrelevant in memory and ignored
            return MapReduceAlgebra.generator(((MR_long)evalE(min,env)).get(),
                                              ((MR_long)evalE(max,env)).get());
        case BSPSource(`n,Generator(`min,`max,`size)):
            return MapReduceAlgebra.generator((int)((LongLeaf)n).value(),
                                              ((MR_long)evalE(min,env)).get(),
                                              ((MR_long)evalE(max,env)).get());
        case Dump(`s):
            // tag each element with peer number 0 (single in-memory "peer")
            Bag bs = (Bag)evalE(s,env);
            final Iterator<MRData> iter = bs.iterator();
            return new Bag(new BagIterator() {
                    public boolean hasNext () {
                        return iter.hasNext();
                    }
                    public MRData next () {
                        return new Tuple(new MR_int(0),iter.next());
                    }
                });
        case let(`v,`u,`body):
            return evalM(body,new Environment(v.toString(),evalE(u,env),env));
        case trace(`msg,`tp,`x):
            String m = ((MR_string)evalE(msg,env)).get();
            return (Bag)trace(pre_trace(m),tp,evalM(x,env));
        case apply(`f,`arg):
            if (!f.is_variable())
                // FIX: was evalE(arg) (no env), which silently dropped all local
                // bindings when evaluating the argument; evalEE's apply case uses env
                return (Bag)evalF(f,env).eval(evalE(arg,env));
            MRData fnc = lookup_global_binding(f.toString());
            if (fnc == null)
                throw new Error("Unknown function: "+f);
            MRData t = evalE(arg,env);
            if (!(t instanceof Tuple))
                throw new Error("Expected a tuple in function application: "+t);
            return (Bag)((Lambda)fnc).lambda().eval(t);
        case BSPSource(`n,`s):
            // tag each element of the source with the given peer number
            final MR_int i = new MR_int((int)((LongLeaf)n).value());
            Bag bs = (Bag)evalE(s,env);
            final Iterator<MRData> iter = bs.iterator();
            return new Bag(new BagIterator() {
                    public boolean hasNext () {
                        return iter.hasNext();
                    }
                    public MRData next () {
                        return new Tuple(i,iter.next());
                    }
                });
        case BSP(...):
            MRData res = evalE(e,env);
            if (res instanceof Bag)
                return (Bag)res;
            else return new Bag(res);
        case `v:
            if (!v.is_variable())
                fail;
            MRData x = variable_lookup(v.toString(),env);
            if (x != null)
                return (Bag)x;
            x = lookup_global_binding(v.toString());
            if (x != null)
                return (Bag)x;
            throw new Error("Variable "+v+" is not bound");
        };
        throw new Error("Cannot evaluate the plan: "+e);
    } catch (Error msg) {
        if (!Config.trace)
            throw new Error(msg.getMessage());
        System.err.println(msg.getMessage());
        msg.printStackTrace();
        throw new Error("Evaluation error in: "+print_query(e));
    } catch (Exception ex) {
        if (Config.trace)
            ex.printStackTrace();
        throw new Error("Evaluation error in: "+print_query(e));
    }
}
/** compute the closure of expression e over env, excluding local_vars.
 *  Currently a no-op stub that returns e unchanged. */
private final static Tree closure ( Tree e, Environment env, Trees local_vars ) {
    return e;
}
/** compute the closure of expression e over env with no excluded variables */
final static Tree closure ( Tree e, Environment env ) {
    return closure(e,env,#[]);
}
static Tree query_type;     // type of the last translated query (set by translate_expression)
static Tree query_plan;     // physical plan of the last query (not assigned in this chunk — presumably set elsewhere)
static boolean is_dataset;  // true when the last query evaluates to a distributed DataSet
/** translate an MRQL expression e into a physical plan */
/** translate an MRQL expression e into a physical plan.
 *  Runs the full compilation pipeline: type inference, group-by removal,
 *  normalization, plan selection, select-query translation, simplification,
 *  algebraic optimization, (optional) incremental-streaming rewriting, plan
 *  generation, BSP or physical-plan lowering, and (optional) compilation of
 *  functional arguments. Stage order is significant.
 *  @return the physical plan, or null on a type/system error (unless testing) */
final static Tree translate_expression ( Tree e ) {
    try {
        if (Config.trace)
            System.out.println("Query at line "+Main.parser.line_pos()+": "+print_query(e));
        Tree qt = TypeInference.type_inference(e);
        if (!Config.quiet_execution)
            System.out.println("Query type: "+print_type(qt));
        query_type = qt;
        Tree ne = Normalization.remove_groupby(e);
        if (Config.trace)
            System.out.println("After removing group-by:\n"+ne.pretty(0));
        ne = Simplification.rename(ne);
        if (Config.trace)
            System.out.println("After renaming variables:\n"+ne.pretty(0));
        ne = Simplification.rename(Normalization.normalize_all(ne));
        if (Config.trace)
            System.out.println("Normalized query:\n"+ne.pretty(0));
        // re-infer types after each transformation that may change term structure
        type_inference(ne);
        ne = QueryPlan.best_plan(ne);
        if (Config.trace)
            System.out.println("Best plan:\n"+ne.pretty(0));
        ne = Simplification.rename(Translator.translate_select(ne));
        if (Config.trace)
            System.out.println("After removing select-queries:\n"+ne.pretty(0));
        type_inference(ne);
        ne = Simplification.simplify_all(ne);
        if (Config.trace)
            System.out.println("Algebra expression:\n"+ne.pretty(0));
        Tree pt = type_inference(ne);
        if (Config.trace)
            System.out.println("Algebraic type: "+print_type(pt));
        if (Config.stream_window > 0 && Config.incremental)
            ne = Streaming.generate_incremental_code(ne);
        ne = AlgebraicOptimization.translate_all(ne);
        if (Config.trace)
            System.out.println("Translated expression:\n"+ne.pretty(0));
        Tree et = TypeInference.type_inference(ne);
        is_dataset = PlanGeneration.is_dataset_expr(ne);
        if (Config.trace)
            System.out.println("Physical plan type: "+print_type(et));
        repeat_variables = #[];
        ne = Simplification.simplify_all(ne);
        if (Streaming.is_streaming(ne) && !Config.incremental)
            ne = Streaming.streamify(ne);
        Tree plan = PlanGeneration.makePlan(ne);
        if (Config.bsp_mode) {
            // lower the plan to BSP supersteps
            BSPTranslator.reset();
            if (Config.trace)
                System.out.println("Physical plan:\n"+plan.pretty(0));
            plan = Materialization.materialize_terms(BSPTranslator.constructBSPplan(plan));
            if (Config.trace)
                System.out.println("BSP plan:\n"+plan.pretty(0));
            else {
                String splan = print_plan(plan,0,false);
                if (!splan.equals("") && !Config.quiet_execution)
                    System.out.println("BSP plan:\n"+splan);
            }
        } else {
            if (Config.hadoop_mode)
                plan = PlanGeneration.physical_plan(plan);
            plan = Materialization.materialize_terms(AlgebraicOptimization.common_factoring(plan));
            if (Config.trace)
                System.out.println("Physical plan:\n"+plan.pretty(0));
            else {
                String splan = print_plan(plan,0,false);
                if (!splan.equals("") && !Config.quiet_execution)
                    System.out.println("Physical plan:\n"+splan);
            }
        };
        if (Config.compile_functional_arguments)
            plan = Compiler.compile(plan);
        return plan;
    } catch (Error x) {
        if (Config.testing)
            throw new Error(x);
        // type errors were already reported by the type checker; stay quiet unless tracing
        if (!Config.trace && x.toString().endsWith("Type Error"))
            return null;
        if (x.getMessage() != null) // system error
            System.err.println("*** MRQL System Error at line "+Main.parser.line_pos()+": "+x);
        if (Config.trace)
            x.printStackTrace(System.err);
        return null;
    }
}
}