/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.mrql;
import org.apache.mrql.gen.*;
import java.lang.reflect.*;
import java.util.*;
import java.util.jar.*;
/** compilation of MRQL expressions to Java code and then to Java bytecode */
final public class Compiler extends Translator {
/** The system Java compiler used to compile the generated code
 * (ToolProvider returns null when no compiler is available, e.g. on a plain JRE) */
static JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
/** Collects the diagnostics reported by the Java compiler */
static DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<JavaFileObject>();
/** Directory for the generated classes and jars; keyed by the user name so that
 * different users on the same host do not share (and clobber) the same directory.
 * Note: System.getProperty requires a non-empty key. */
final static String tmp_dir = "/tmp/mrql_jar_"+System.getProperty("user.name");
/** Path of the jar holding the most recently compiled functions (null before the first compilation) */
public static String jar_path;
/** Counter used to generate unique names for the generated classes */
static int lambda_num = 0;
/** Numeric suffix of the UserFunctions class generated for the current query */
static int user_functions_num = 0;
/** Names of the lambdas already compiled for the current query */
static Trees compiled_lambdas = #[ ];
/** Compile the MRQL functional arguments into Java bytecode */
final private static class JavaSourceFromString extends SimpleJavaFileObject {
final String code;
JavaSourceFromString ( String name, String code ) {
super(URI.create("string:///org/apache/mrql/" + name.replace('.','/') + Kind.SOURCE.extension),Kind.SOURCE);
this.code = code;
public CharSequence getCharContent ( boolean ignoreEncodingErrors ) {
return code;
private static String new_lambda_name () {
return "MRQL_Lambda_"+(lambda_num++);
private static void add2jar ( File source, int offset, JarOutputStream target ) throws IOException {
if (source.isDirectory()) {
String name = source.getPath();
if (name.length() > offset) {
JarEntry entry = new JarEntry(name.substring(offset));
for ( File nestedFile: source.listFiles() )
} else {
JarEntry entry = new JarEntry(source.getPath().substring(offset));
BufferedInputStream in = new BufferedInputStream(new FileInputStream(source));
byte[] buffer = new byte[1024];
int count = 1;
while (count > 0) {
count =;
if (count > 0)
private static void remove ( File file ) throws IOException {
if (file.isDirectory())
for ( File nestedFile: file.listFiles() )
public static void clean () throws IOException {
remove(new File(tmp_dir));
/** Compile the functional arguments of the operators listed in plans_with_distributed_lambdas
 * into Java classes that extend Function, and traverse all other terms recursively.
 * @param e the MRQL term to compile
 * @param out the buffer that presumably collects the generated Java source
 *            (NOTE(review): no append of the generated classes to out is visible here — confirm)
 * @return e with each lambda argument replaced by a compiled(name,...) term
 * @throws Exception if compiling a lambda body fails
 */
final private static Tree compile ( Tree e, StringBuffer out ) throws Exception {
match e {
// NOTE(review): this pattern is truncated; presumably it binds the operator name f and its
// argument list al (e.g. `f(...al)) — confirm against the upstream source
case `f(
// NOTE(review): presumably a `fail` (try the next case) following this test was lost
if (!plans_with_distributed_lambdas.member(#<`f>))
Trees nl = #[];
for ( Tree a: al)
// each argument is passed through Interpreter.closure with the global environment
// and the result is matched below
match Interpreter.closure(a,Interpreter.global_env) {
// a lambda that already carries a class name
case compiled(`fname,lambda(`v,`body)):
// NOTE(review): as shown, fname is appended only when it is ALREADY a member; presumably a
// statement skipping already-compiled lambdas was lost here — confirm
if (compiled_lambdas.member(fname))
compiled_lambdas = compiled_lambdas.append(fname);
// generated: final class <fname> extends Function { <fname>() {} eval(v) { return <body>; } }
StringBuffer sb = new StringBuffer(1000);
sb.append("final class "+fname+" extends Function {\n");
sb.append(fname+" () {}\n");
sb.append("final public MRData eval ( final MRData "+v
+" ) { return "+compileE(body)+"; }\n}\n");
nl = nl.append(#<compiled(`fname,lambda(`v,`body))>);
// an anonymous lambda: give it a fresh class name and record it as compiled
case lambda(`v,`body):
String fname = new_lambda_name();
compiled_lambdas = compiled_lambdas.append(#<`fname>);
StringBuffer sb = new StringBuffer(1000);
sb.append("final class "+fname+" extends Function {\n");
sb.append(fname+" () {}\n");
sb.append("final public MRData eval ( final MRData "+v
+" ) { return "+compileE(body)+"; }\n}\n");
nl = nl.append(#<compiled(`fname,`a)>);
// any other argument is compiled recursively
case _: nl = nl.append(compile(a,out));
// NOTE(review): this template is truncated; presumably it rebuilds f with the new arguments nl
return #<`f(>;
// NOTE(review): truncated pattern; presumably any other operator application f(...al),
// whose arguments are compiled recursively
case `f(
Trees nl = #[];
for ( Tree a: al)
nl = nl.append(compile(a,out));
return #<`f(>;
// terms not matched above are returned unchanged
return e;
/** Does the MRQL type tp denote (or contain) a persistent collection type?
 * @param tp the MRQL type to inspect
 * @return true if tp is a persistent collection, has a component type that is persistent,
 *         or expands (via TypeInference.expand) to a persistent type; false otherwise
 */
private static boolean is_persistent_type ( Tree tp ) {
match tp {
// a unary type constructor T(t): decided by the constructor name T alone
case `T(`t):
return is_persistent_collection(T.toString());
// any other type constructor: persistent if some component type is persistent
// NOTE(review): no `fail`/exit is visible after this loop — confirm what happens when no
// component is persistent
case `f(...ts):
for ( Tree t: ts )
if (is_persistent_type(t))
return true;
// anything else: test its expansion, if TypeInference.expand changes it
case `v:
Tree vt = TypeInference.expand(v);
if (!v.equals(vt))
return is_persistent_type(vt);
return false;
/** Java source of the compilation unit generated by compile(Tree query): reset there, started
 * with the package declaration, and finally handed to the system Java compiler */
private static StringBuffer out;
/** Compile the functional arguments of the MRQL operators using the Java compiler.
 * The generated classes are packaged into a new jar at jar_path; the previous jar is removed.
 * @param query the expression to compile
 * @return the query with all functional arguments compiled to Java bytecode; if an exception
 *         occurs, a warning is printed and the original query is returned
 */
final public static Tree compile ( Tree query ) {
try {
// per-query state: no lambdas compiled yet; reserve a unique suffix for the UserFunctions class
compiled_lambdas = #[ ];
user_functions_num = lambda_num++;
// remove the old jar
if (jar_path != null)
remove(new File(jar_path));
// NOTE(review): a random suffix does not guarantee a unique jar name
jar_path = tmp_dir+"/mrql_args_"+(new Random().nextInt(1000000))+".jar";
out = new StringBuffer(1000);
out.append("package org.apache.mrql;\n");
Tree nq = compile(query,out);
// generate a static Java method for each global function whose result type is persistent
StringBuffer sb = new StringBuffer(1000);
for ( String f: global_functions )
match global_functions.lookup(f) {
case function(tuple(...params),`otp,`body):
if (is_persistent_type(otp)) {
compile(body,out); // incorporate compiled arguments but ignore body
sb.append("final public static "+get_MR_type(otp)+" "+f);
if (params.is_empty())
sb.append(" ()");
else {
match params.head() {
case bind(`v,`tp):
sb.append(" ( final "+get_MR_type(tp)+" "+v);
for ( Tree var: params.tail() )
match var {
case bind(`v,`tp):
sb.append(", final "+get_MR_type(tp)+" "+v);
// NOTE(review): the generated return expression is truncated in this source — confirm
sb.append(" ) { return ("+get_MR_type(otp)+")");
sb.append("; }\n");
// NOTE(review): sb (the generated methods) is never appended to out in the visible code,
// and the UserFunctions class body is never closed with "}" — confirm
out.append("final class UserFunctions_"+user_functions_num+" {\n");
String code = out.toString();
JavaFileObject file = new JavaSourceFromString("UserFunctions_"+user_functions_num,code);
Iterable<? extends JavaFileObject> compilationUnits = Arrays.asList(file);
// NOTE(review): optionList stays empty and classpath/fdir are not used below, so as shown the
// classpath and the output directory dir are never passed to the compiler — confirm
List<String> optionList = new ArrayList<String>();
(new File(tmp_dir)).mkdir();
String dir = tmp_dir+"/classes_"+(new Random().nextInt(1000000));
File fdir = new File(dir);
// classpath = new jar + the context class loader's URLs + the Hadoop jar
// NOTE(review): the cast below throws ClassCastException when the context class loader is not
// a URLClassLoader (e.g. the application class loader on Java 9+)
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
String classpath = jar_path;
String separator = System.getProperty("path.separator");
for ( URL url: ((URLClassLoader) classLoader).getURLs() )
classpath += separator+url.getFile();
// use hadoop core jar
classpath += separator + WritableComparable.class.getProtectionDomain().getCodeSource().getLocation().toString();
// NOTE(review): diagnostics is a static collector that is never cleared, so errors of earlier
// compilations are printed again; the right-hand side of success is missing (presumably task.call())
CompilationTask task = compiler.getTask(null,null,diagnostics,optionList,null,compilationUnits);
boolean success =;
if (!success)
for ( Diagnostic d: diagnostics.getDiagnostics() )
System.err.println("*** Compilation error at line "+d.getLineNumber()+" position "
+d.getColumnNumber()+": "+d.getMessage(Locale.US));
// package the class files found under dir (entry names relative to dir) into the jar
// NOTE(review): target is never closed in the visible code, which would leave the jar incomplete
Manifest manifest = new Manifest();
JarOutputStream target = new JarOutputStream(new FileOutputStream(jar_path),manifest);
add2jar(new File(dir+"/"),dir.length()+1,target);
return nq;
} catch (Exception e) {
// fall back to the uncompiled query
// NOTE(review): as shown, the query is returned only when Config.trace is set; presumably a
// statement after the test (e.g. printing the stack trace) was lost — confirm
System.err.println("*** Warning: Unable to compile the query:\n"+query);
if (Config.trace)
return query;
/** load the Java class of the anonymous function with name lambda_name */
final public static Function compiled ( ClassLoader cl, String lambda_name ) throws Exception {
URL[] urls = ((URLClassLoader) cl).getURLs();
URL[] new_urls = new URL[urls.length+1];
for ( int i = 0; i < urls.length; i++ )
new_urls[i+1] = urls[i];
new_urls[0] = new URL("file://"+jar_path);
URLClassLoader loader = new URLClassLoader(new_urls,cl);
Class c = loader.loadClass("org.apache.mrql."+lambda_name);
Constructor cc = c.getDeclaredConstructors()[0];
return (Function)cc.newInstance();
/** The Java class (as source text) that represents values of an MRQL type.
 * Basic types map to the corresponding MR_* class, union to Union, bag and list to Bag,
 * and every other type to the generic MRData.
 * @param type the MRQL type
 * @return the simple name of the Java class
 */
private static String get_MR_type ( Tree type ) {
match type {
case boolean: return "MR_bool";
case byte: return "MR_byte";
case short: return "MR_short";
case int: return "MR_int";
case long: return "MR_long";
case float: return "MR_float";
case double: return "MR_double";
case char: return "MR_char";
case string: return "MR_string";
case union: return "Union";
case bag(...): return "Bag";
case list(...): return "Bag";
return "MRData";
private static Trees remove_duplicates ( Trees al ) {
if (al.is_empty())
return al;
Trees el = remove_duplicates(al.tail());
if (el.member(al.head()))
return el;
else return el.cons(al.head());
/** Lambda lifting: generate a Java class (extending Function) for the anonymous function with
 * parameter v and the given body. Each free variable of the body becomes an MRData field that
 * is initialized by the class constructor; eval(v) returns the compiled body.
 * free_variables is given v and the repeat variables (presumably as variables to exclude).
 * @param v the lambda parameter
 * @param body the lambda body
 * @return Java code that instantiates the generated class: "new <name>(<free variables>)"
 * @throws Exception if the body cannot be compiled
 */
private static String compilef ( String v, Tree body ) throws Exception {
String fname = new_lambda_name();
Trees free_vars = remove_duplicates(free_variables(body,#[`v,...repeat_variables]));
StringBuffer sb = new StringBuffer(1000);
sb.append("final class "+fname+" extends Function {\n");
// one MRData field per free variable
for ( Tree var: free_vars )
sb.append("MRData "+var+"; ");
// a constructor that initializes the fields
sb.append("\npublic "+fname+" (");
if (free_vars.is_empty())
sb.append(") {}\n");
else {
sb.append(" MRData "+free_vars.head());
for ( Tree var: free_vars.tail() )
sb.append(", MRData "+var);
sb.append(" ) { ");
for ( Tree var: free_vars )
sb.append("this."+var+" = "+var+"; ");
// NOTE(review): as shown, the generated constructor body is never closed with "}" before eval
// is emitted, and sb is never appended to out in the visible code — confirm
sb.append("final public MRData eval ( final MRData "+v
+" ) { return "+compileE(body)+"; }\n}\n");
// the instantiation passes the current values of the free variables to the constructor
String s = "new "+fname+"(";
if (!free_vars.is_empty()) {
s += free_vars.head();
for ( Tree var: free_vars.tail() )
s += ","+var;
return s+")";
/** Translate a functional argument into Java code that constructs the corresponding function object.
 * Anonymous lambdas are lambda-lifted by compilef. A compiled(f,lm) term is recompiled from lm,
 * and its name child is destructively replaced by the new class name. A non-persistent
 * function(...) becomes a Lambda whose eval unpacks the argument tuple into the parameters.
 * @param fnc the functional term
 * @return the Java expression text
 * @throws Exception if fnc is not a functional term
 */
private static String compileF ( Tree fnc ) throws Exception {
match fnc {
case lambda(`v,`b):
return compilef(v.toString(),b);
case compiled(`f,`lm):
// recompile the function
String s = compileF(lm);
// assumes s has the form "new <class>(...)" (as produced by compilef): store <class> as
// the first child of fnc
((Node)fnc).children().head = new VariableLeaf(s.substring(4,s.indexOf("("))); //destructive
return s;
case function(tuple(...params),`tp,`body):
// NOTE(review): this branch looks truncated: the opened brace is never closed, and the code
// after `return ""` is unreachable as shown — confirm against the upstream source
if (is_persistent_type(tp)) {
return "";
String ret = "new Lambda(new Function () { "
+"final public MRData eval ( final MRData _x ) { ";
// bind each parameter to the corresponding component of the argument tuple _x
for ( int i = 0; i < params.length(); i++ )
match params.nth(i) {
case bind(`v,_):
ret += "final MRData "+v+" = ((Tuple)_x).get("+i+"); ";
return ret+" return "+compileE(body)+"; } })";
throw new Exception("Ill-formed lambda: "+fnc);
private static String compileEL ( Trees el ) throws Exception {
if (el.is_empty())
return "";
String ret = compileE(el.head());
for ( Tree a: el.tail() )
ret += ","+compileE(a);
return ret;
/** Translate an MRQL algebraic term into a Java expression, returned as source text.
 * Constants become MR_* objects. Variables bound in global_type_env, and repeat variables, are
 * looked up at run time through Interpreter.lookup_global_binding; other variables become
 * Java locals. Operators become calls to MapReduceAlgebra/SystemFunctions. Terms not handled
 * here are passed to compileM.
 * @param e the term to compile; null is compiled like the null constant
 * @return the Java expression text
 * @throws Exception if e cannot be compiled
 */
private static String compileE ( Tree e ) throws Exception {
if (e == null)
return "(new MR_byte(0))";
if (e.equals(#<true>))
return "(new MR_bool(true))";
else if (e.equals(#<false>))
return "(new MR_bool(false))";
else if (e.equals(#<null>))
return "(new MR_byte(0))";
else if (e.is_variable())
if (global_type_env.lookup(e.toString()) != null || repeat_variables.member(e))
return "Interpreter.lookup_global_binding(\""+e.toString()+"\")";
else return e.toString();
// NOTE(review): long constants are compiled to MR_int; a value outside the int range would
// produce Java code that does not compile — confirm this narrowing is intended
else if (e.is_long())
return "(new MR_int("+((LongLeaf)e).value()+"))";
else if (e.is_double())
return "(new MR_double("+((DoubleLeaf)e).value()+"))";
// presumably toString() of a string leaf yields a quoted Java string literal — confirm
else if (e.is_string())
return "(new MR_string("+e.toString()+"))";
match e {
// NOTE(review): the code generated for trace(msg,tp,x) is truncated in this source
case trace(`msg,`tp,`x):
return "Interpreter.trace(Interpreter.pre_trace(((MR_string)("
// boolean connectives use Java's short-circuit && / || and !
case callM(and,_,`x,`y):
return "(new MR_bool(((MR_bool)"+compileE(x)
+").get() && ((MR_bool)"+compileE(y)+").get()))";
case callM(or,_,`x,`y):
return "(new MR_bool(((MR_bool)"+compileE(x)
+").get() || ((MR_bool)"+compileE(y)+").get()))";
case callM(not,_,`x):
return "(new MR_bool(!((MR_bool)"+compileE(x)+").get()))";
// other system functions: call SystemFunctions.<method name of function number n>, casting
// each argument to the MR type of the corresponding entry of the ClassImporter signature
// NOTE(review): presumably a `fail` following the is_long() test was lost; as shown the test
// contradicts the LongLeaf casts below
case callM(`f,`n,...args):
if (!n.is_long())
String ret = "SystemFunctions."+ClassImporter.method_name((int)((LongLeaf)n).value())+"(";
Trees sig = ClassImporter.signature((int)((LongLeaf)n).value());
for (int i = 0; i < args.length(); i++)
ret += ((i > 0) ? ",(" : "(")+get_MR_type(sig.nth(i+1))+")("+compileE(args.nth(i))+")";
return ret+")";
case lambda(`v,`body):
return "new Lambda("+compilef(v.toString(),body)+")";
case nth(`x,`n):
return "(((Tuple)("+compileE(x)+")).get("+((LongLeaf)n).value()+"))";
case setNth(`x,`n,`v,`ret):
return "(((Tuple)("+compileE(x)+")).set("+((LongLeaf)n).value()+","+compileE(v)+","+compileE(ret)+"))";
case materialize(`u):
return "MapReduceAlgebra.materialize("+compileE(u)+")";
// let: an anonymous Function binds v to the value of u (materializing it if it is a Bag)
case let(`v,`u,`body):
return "(new Function () { public MRData eval ( final MRData "+v
+" ) { if ("+v+" instanceof Bag) ((Bag)"+v+").materialize(); return "
+compileE(body)+"; }; }).eval("+compileE(u)+")";
case cmap(`m,`s):
return "MapReduceAlgebra.cmap("+compileF(m)+",(Bag)("+compileE(s)+"))";
// NOTE(review): the generated filter call is truncated in this source
case filter(`p,`m,`s):
return "MapReduceAlgebra.filter("+compileF(p)+","+compileF(m)
// NOTE(review): the method name is missing from the generated code (compare cmap and fold);
// presumably this should start with "MapReduceAlgebra.map(" — as shown the parentheses are unbalanced
case map(`m,`s):
return ""+compileF(m)+",(Bag)"+compileE(s)+")";
case fold(`acc,`zero,`s):
return "MapReduceAlgebra.fold("+compileF(acc)+","+compileE(zero)+",(Bag)"+compileE(s)+")";
// NOTE(review): the generated generator call is truncated in this source
case range(`min,`max):
return "MapReduceAlgebra.generator(((MR_long)"+compileE(min)+").get(),"
// call: evaluate the compiled function on the tuple of the arguments
case call(`f,...args):
return "("+compileF(f)+".eval("+compileE(#<tuple(...args)>)+"))";
case tuple():
return "(new Tuple())";
case tuple(`x):
return "(new Tuple("+compileE(x)+"))";
case tuple(`a,...el):
String ret = "(new Tuple("+compileE(a);
for ( Tree x: el )
ret += ","+compileE(x);
return ret+"))";
case tagged_union(`n,`u):
return "(new Union((byte)"+((LongLeaf)n).value()+","+compileE(u)+"))";
case union_value(`x):
return "(((Union)"+compileE(x)+").value())";
case union_tag(`x):
return "(new MR_int(((Union)"+compileE(x)+").tag()))";
// used for shortcutting sync in bsp supersteps
case BAG():
return "SystemFunctions.bsp_empty_bag";
case TRUE():
return "SystemFunctions.bsp_true_value";
case FALSE():
return "SystemFunctions.bsp_false_value";
// collection constructors become Bag objects; other constructors try the next cases (fail)
case `T():
if (is_collection(T))
return "(new Bag())";
else fail
// NOTE(review): `e` is not back-quoted, so this pattern presumably matches only the literal
// symbol e, and compileE(e) then recompiles the whole term — confirm (likely meant `e)
case `T(e):
if (is_collection(T))
return "(new Bag("+compileE(e)+"))";
else fail
// NOTE(review): this test is inverted relative to the two cases above; presumably a `fail`
// following it was lost — confirm
case `T(`a,...el):
if (!is_collection(T))
String ret = "(new Bag("+compileE(a);
for ( Tree x: el )
ret += ",(MRData)"+compileE(x);
return ret+"))";
case if(`c,`x,`y):
return "((((MR_bool)"+compileE(c)+").get())?"+compileE(x)+":"+compileE(y)+")";
case synchronize(`peer,`b):
return "SystemFunctions.synchronize(((MR_string)"+compileE(peer)+"),(MR_bool)"+compileE(b)+")";
case distribute(`peer,`s):
return "SystemFunctions.distribute(((MR_string)"+compileE(peer)+"),(Bag)"+compileE(s)+")";
case mapReduce(`mx,`my,`s,_):
return "MapReduceAlgebra.mapReduce("+compileF(mx)+","+compileF(my)+",(Bag)("+compileE(s)+"))";
// NOTE(review): the generated code is truncated in this source
case mapReduce2(`mx,`my,`r,`x,`y,_):
return "MapReduceAlgebra.mapReduce2("+compileF(mx)+","+compileF(my)+","+compileF(r)
// NOTE(review): the generated code is truncated in this source
case mapJoin(`kx,`ky,`r,`x,`y):
return "MapReduceAlgebra.mapJoin("+compileF(kx)+","+compileF(ky)+","+compileF(r)
// NOTE(review): the generated code is truncated in this source
case join(`kx,`ky,`r,`x,`y):
return "MapReduceAlgebra.join("+compileF(kx)+","+compileF(ky)+","+compileF(r)
case groupBy(`s):
return "MapReduceAlgebra.groupBy((Bag)("+compileE(s)+"))";
case index(`x,`n):
return "SystemFunctions.index((Bag)("+compileE(x)+"),"+compileE(n)+")";
case range(`x,`i,`j):
return "SystemFunctions.range((Bag)("+compileE(x)+"),"+compileE(i)+","+compileE(j)+")";
case map_index(`x,`key):
return "((Bag)("+compileE(x)+")).map_find("+compileE(key)+")";
case aggregate(`acc,`zero,`s):
return "MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)+",(Bag)("+compileE(s)+"))";
case Aggregate(`acc,`zero,`s):
return "MapReducePlan.aggregate("+compileF(acc)+","+compileE(zero)+","+compileM(s)+")";
// NOTE(review): the generated code is truncated in this source
case mergeGroupByJoin(`kx,`ky,`gx,`gy,`acc,`zero,`r,`x,`y,`o):
return "MapReduceAlgebra.mergeGroupByJoin("+compileF(kx)+","+compileF(ky)
// loop: the body becomes a Function that unpacks the state tuple _x into the loop variables vs
// NOTE(review): the generated code is truncated in this source
case loop(lambda(tuple(...vs),`b),`s,`n):
String fs = "new Function () { final public MRData eval ( final MRData _x ) { ";
for ( int i = 0; i < vs.length(); i++ )
fs += "final MRData "+vs.nth(i)+" = ((Tuple)_x).get("+i+"); ";
return "MapReduceAlgebra.loop("+fs+"return "+compileE(b)+";}},(Tuple)("+compileE(s)
case function(tuple(...params),`tp,`body):
return compileF(e);
case typed(`x,_):
return compileE(x);
// call of a global function with a persistent result type: call the static method of the
// generated UserFunctions class, casting each argument to its parameter type
// NOTE(review): in this and the next apply case, presumably a `fail` following the
// is_variable()/is_persistent_type tests was lost — as shown, the lookup of the global function
// named f runs when f is NOT a variable — confirm
case apply(`f,tuple(...args)):
if (!f.is_variable())
match global_functions.lookup(f.toString()) {
case function(tuple(...params),`otp,`body):
if (is_persistent_type(otp))
String ret = "UserFunctions_"+user_functions_num+"."+f+"(";
if (args.is_empty())
return ret+")";
for ( int i = 0; i < params.length(); i++ )
match params.nth(i) {
case bind(_,`tp):
ret += ((i==0)?"":",")+"("+get_MR_type(tp)+")"+compileE(args.nth(i));
return ret+")";
// same, but the single argument is a tuple value whose components are passed as arguments
case apply(`f,`arg):
if (!f.is_variable())
match global_functions.lookup(f.toString()) {
case function(tuple(...params),`otp,`body):
if (is_persistent_type(otp))
String ac = compileE(arg);
String ret = "UserFunctions_"+user_functions_num+"."+f+"(";
for ( int i = 0; i < params.length(); i++ )
match params.nth(i) {
case bind(_,`tp):
ret += ((i==0)?"":",")+"("+get_MR_type(tp)+")((Tuple)"+ac+").get("+i+")";
return ret+")";
// any other application: a function term is compiled with compileF; a variable is cast to
// Lambda and its function is evaluated
case apply(`f,`arg):
if (!f.is_variable())
return "("+compileF(f)+").eval("+compileE(arg)+")";
else return "(((Lambda)"+compileE(f)+").lambda().eval("+compileE(arg)+"))";
case Collect(`s):
return "Plan.collect("+compileM(s)+")";
case trace(`x):
return compileE(x);
// everything else is treated as a physical plan
case _:
return compileM(e);
throw new Exception("Cannot compile: "+e);
/** Translate a physical plan term (MapReduce/BSP operators, joins and data sources) into a Java
 * expression, returned as source text. Operators are evaluated in memory through MapReduceAlgebra.
 * Data sources are read through Plan when Config.hadoop_mode is set, and through
 * MapReduceAlgebra otherwise.
 * @param e the plan to compile
 * @return the Java expression text
 * @throws Exception if e cannot be compiled
 */
final private static String compileM ( Tree e ) throws Exception {
// NOTE(review): in this source the generated Java text of many cases below is truncated (the
// return expression ends in an unterminated concatenation) — confirm against the upstream source
match e {
case cMap(`f,`s):
return "MapReduceAlgebra.cmap("+compileF(f)+",(Bag)"+compileM(s)+")";
case AggregateMap(`f,`acc,`zero,`s):
return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)
case MapReduce(`m,`r,`s,_):
return "MapReduceAlgebra.mapReduce("+compileF(m)+","
case MapAggregateReduce(`m,`r,`acc,`zero,`s,_):
return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)
case MapCombineReduce(`m,`c,`r,`s,_):
return "MapReduceAlgebra.mapReduce("+compileF(m)
case MapReduce2(`mx,`my,`c,`r,`x,`y,_):
return "MapReduceAlgebra.mapReduce2("+compileF(mx)+","+compileF(my)
case MapReduce2(`mx,`my,`r,`x,`y,_):
return "MapReduceAlgebra.mapReduce2("+compileF(mx)+","+compileF(my)
// aggregate the result of the corresponding MapReduce2 plan
// NOTE(review): the MapReduce2 term built here has 5 arguments, but both MapReduce2 patterns
// above expect 6 or 7, so as shown it would not match them — confirm
case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,_):
return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)
+","+compileM(#< MapReduce2(`mx,`my,`r,`x,`y)>)+"))";
case MapJoin(`kx,`ky,`r,`x,`y):
return "MapReduceAlgebra.mapJoin("+compileF(kx)+","+compileF(ky)
case MapAggregateJoin(`kx,`ky,`r,`acc,`zero,`x,`y):
return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)
case GroupByJoin(`kx,`ky,`gx,`gy,`acc,`zero,`r,`x,`y,`o):
return "MapReduceAlgebra.groupByJoin("+compileF(kx)+","+compileF(ky)
case CrossProduct(`mx,`my,`r,`x,`y):
return "MapReduceAlgebra.crossProduct("+compileF(mx)+","+compileF(my)
case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y):
return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)
case BSPSource(`n,BinarySource(`file,_)):
if (Config.hadoop_mode)
return "Plan.binarySource("+compileE(n)+",((MR_string)"
else return "(Bag)MapReduceAlgebra.read_binary("+compileE(n)+",((MR_string)"
case BinarySource(`file,_):
if (Config.hadoop_mode)
return "Plan.binarySource(((MR_string)"+compileE(file)+").get())";
else return "(Bag)MapReduceAlgebra.read_binary(((MR_string)"+compileE(file)+").get())";
// NOTE(review): presumably a `fail` following the LongLeaf test was lost — confirm
case BSPSource(`n,ParsedSource(`parser,`file,...args)):
if (!(n instanceof LongLeaf))
if (!Config.hadoop_mode)
return "MapReduceAlgebra.parsedSource(((MR_int)"+compileE(n)+").get(),\""
// in Hadoop mode the parser class is looked up by name in DataSource.parserDirectory
Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
if (p == null)
throw new Error("Unknown parser: "+parser);
return "Plan.parsedSource("+p.getName()+".class,((MR_string)"+compileE(file)+").get(),"
case ParsedSource(`parser,`file,...args):
if (!Config.hadoop_mode)
return "MapReduceAlgebra.parsedSource(\""+parser+"\",((MR_string)"
Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
if (p == null)
throw new Error("Unknown parser: "+parser);
return "Plan.parsedSource("+p.getName()+".class,((MR_string)"+compileE(file)+").get(),"
case Merge(`x,`y):
return "((Bag)"+compileM(x)+").union((Bag)"+compileM(y)+")";
case Generator(`min,`max,`size):
return "MapReduceAlgebra.generator(((MR_long)"+compileE(min)+").get(),"
// NOTE(review): the BSP pattern is truncated; presumably it binds the input plans as `as`.
// ds.substring(1) throws StringIndexOutOfBoundsException when there are no input plans
case BSP(`n,`superstep,`state,`o,
String ds = "";
for ( Tree a: as )
ds += ",(Bag)("+compileM(a)+")";
return "MapReduceAlgebra.BSP("+((LongLeaf)n).value()+","
+"new Bag[]{"+ds.substring(1)+"})";
case DataSetCollect(`s):
return "Interpreter.dataSetCollect(\""+s+"\")";
// a variable is compiled to its name (a Java local); anything else is an error
case `v:
if (v.is_variable())
return v.toString();
throw new Exception("Cannot compile: "+e);