| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.mrql; |
| |
| import org.apache.mrql.gen.*; |
| import java.io.*; |
| import javax.tools.*; |
| import javax.tools.JavaCompiler.CompilationTask; |
| import java.lang.reflect.*; |
| import java.util.*; |
| import java.net.*; |
| import java.util.jar.*; |
| import org.apache.hadoop.io.WritableComparable; |
| |
| |
| /** compilation of MRQL expressions to Java code and then to Java bytecode */ |
| final public class Compiler extends Translator { |
| static JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); |
| static DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<JavaFileObject>(); |
| final static String tmp_dir = "/tmp/mrql_jar_"+System.getProperty("user.name"); |
| public static String jar_path; |
| static int lambda_num = 0; |
| static int user_functions_num = 0; |
| static Trees compiled_lambdas = #[ ]; |
| |
| /** Compile the MRQL functional arguments into Java bytecode */ |
| final private static class JavaSourceFromString extends SimpleJavaFileObject { |
| final String code; |
| |
| JavaSourceFromString ( String name, String code ) { |
| super(URI.create("string:///org/apache/mrql/" + name.replace('.','/') + Kind.SOURCE.extension),Kind.SOURCE); |
| this.code = code; |
| } |
| |
| @Override |
| public CharSequence getCharContent ( boolean ignoreEncodingErrors ) { |
| return code; |
| } |
| } |
| |
| private static String new_lambda_name () { |
| return "MRQL_Lambda_"+(lambda_num++); |
| } |
| |
| private static void add2jar ( File source, int offset, JarOutputStream target ) throws IOException { |
| if (source.isDirectory()) { |
| String name = source.getPath(); |
| if (name.length() > offset) { |
| JarEntry entry = new JarEntry(name.substring(offset)); |
| entry.setTime(source.lastModified()); |
| target.putNextEntry(entry); |
| target.closeEntry(); |
| }; |
| for ( File nestedFile: source.listFiles() ) |
| add2jar(nestedFile,offset,target); |
| } else { |
| JarEntry entry = new JarEntry(source.getPath().substring(offset)); |
| entry.setTime(source.lastModified()); |
| target.putNextEntry(entry); |
| BufferedInputStream in = new BufferedInputStream(new FileInputStream(source)); |
| byte[] buffer = new byte[1024]; |
| int count = 1; |
| while (count > 0) { |
| count = in.read(buffer); |
| if (count > 0) |
| target.write(buffer,0,count); |
| }; |
| target.closeEntry(); |
| in.close(); |
| } |
| } |
| |
| private static void remove ( File file ) throws IOException { |
| if (file.isDirectory()) |
| for ( File nestedFile: file.listFiles() ) |
| remove(nestedFile); |
| file.delete(); |
| } |
| |
| public static void clean () throws IOException { |
| remove(new File(tmp_dir)); |
| } |
| |
| final private static Tree compile ( Tree e, StringBuffer out ) throws Exception { |
| match e { |
| case `f(...al): |
| if (!plans_with_distributed_lambdas.member(#<`f>)) |
| fail; |
| Trees nl = #[]; |
| for ( Tree a: al) |
| match Interpreter.closure(a,Interpreter.global_env) { |
| case compiled(`fname,lambda(`v,`body)): |
| if (compiled_lambdas.member(fname)) |
| fail; |
| compiled_lambdas = compiled_lambdas.append(fname); |
| StringBuffer sb = new StringBuffer(1000); |
| sb.append("final class "+fname+" extends Function {\n"); |
| sb.append(fname+" () {}\n"); |
| sb.append("final public MRData eval ( final MRData "+v |
| +" ) { return "+compileE(body)+"; }\n}\n"); |
| out.append(sb); |
| nl = nl.append(#<compiled(`fname,lambda(`v,`body))>); |
| case lambda(`v,`body): |
| String fname = new_lambda_name(); |
| compiled_lambdas = compiled_lambdas.append(#<`fname>); |
| StringBuffer sb = new StringBuffer(1000); |
| sb.append("final class "+fname+" extends Function {\n"); |
| sb.append(fname+" () {}\n"); |
| sb.append("final public MRData eval ( final MRData "+v |
| +" ) { return "+compileE(body)+"; }\n}\n"); |
| out.append(sb); |
| nl = nl.append(#<compiled(`fname,`a)>); |
| case _: nl = nl.append(compile(a,out)); |
| }; |
| return #<`f(...nl)>; |
| case `f(...al): |
| Trees nl = #[]; |
| for ( Tree a: al) |
| nl = nl.append(compile(a,out)); |
| return #<`f(...nl)>; |
| }; |
| return e; |
| } |
| |
| private static boolean is_persistent_type ( Tree tp ) { |
| match tp { |
| case `T(`t): |
| return is_persistent_collection(T.toString()); |
| case `f(...ts): |
| for ( Tree t: ts ) |
| if (is_persistent_type(t)) |
| return true; |
| case `v: |
| Tree vt = TypeInference.expand(v); |
| if (!v.equals(vt)) |
| return is_persistent_type(vt); |
| }; |
| return false; |
| } |
| |
| private static StringBuffer out; |
| |
| /** compile the functional arguments of the MRQL operators using the Java compiler |
| * @param query the expression to compile |
| * @return the query with all functional arguments compiled to Java bytecode |
| */ |
| final public static Tree compile ( Tree query ) { |
| try { |
| compiled_lambdas = #[ ]; |
| user_functions_num = lambda_num++; |
| // remove the old jar |
| if (jar_path != null) |
| remove(new File(jar_path)); |
| jar_path = tmp_dir+"/mrql_args_"+(new Random().nextInt(1000000))+".jar"; |
| out = new StringBuffer(1000); |
| out.append("package org.apache.mrql;\n"); |
| Tree nq = compile(query,out); |
| StringBuffer sb = new StringBuffer(1000); |
| for ( String f: global_functions ) |
| match global_functions.lookup(f) { |
| case function(tuple(...params),`otp,`body): |
| if (is_persistent_type(otp)) { |
| compile(body,out); // incorporate compiled arguments but ignore body |
| fail |
| }; |
| sb.append("final public static "+get_MR_type(otp)+" "+f); |
| if (params.is_empty()) |
| sb.append(" ()"); |
| else { |
| match params.head() { |
| case bind(`v,`tp): |
| sb.append(" ( final "+get_MR_type(tp)+" "+v); |
| }; |
| for ( Tree var: params.tail() ) |
| match var { |
| case bind(`v,`tp): |
| sb.append(", final "+get_MR_type(tp)+" "+v); |
| } |
| sb.append(" ) { return ("+get_MR_type(otp)+")"); |
| sb.append(compileE(body)); |
| sb.append("; }\n"); |
| } |
| }; |
| out.append("final class UserFunctions_"+user_functions_num+" {\n"); |
| out.append(sb); |
| out.append("}\n"); |
| String code = out.toString(); |
| //System.out.println(code); |
| JavaFileObject file = new JavaSourceFromString("UserFunctions_"+user_functions_num,code); |
| Iterable<? extends JavaFileObject> compilationUnits = Arrays.asList(file); |
| List<String> optionList = new ArrayList<String>(); |
| (new File(tmp_dir)).mkdir(); |
| String dir = tmp_dir+"/classes_"+(new Random().nextInt(1000000)); |
| File fdir = new File(dir); |
| fdir.mkdir(); |
| ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); |
| String classpath = jar_path; |
| String separator = System.getProperty("path.separator"); |
| for ( URL url: ((URLClassLoader) classLoader).getURLs() ) |
| classpath += separator+url.getFile(); |
| // use hadoop core jar |
| classpath += separator + WritableComparable.class.getProtectionDomain().getCodeSource().getLocation().toString(); |
| optionList.addAll(Arrays.asList("-classpath",classpath)); |
| optionList.addAll(Arrays.asList("-d",dir)); |
| CompilationTask task = compiler.getTask(null,null,diagnostics,optionList,null,compilationUnits); |
| boolean success = task.call(); |
| if (!success) |
| for ( Diagnostic d: diagnostics.getDiagnostics() ) |
| System.err.println("*** Compilation error at line "+d.getLineNumber()+" position " |
| +d.getColumnNumber()+": "+d.getMessage(Locale.US)); |
| Manifest manifest = new Manifest(); |
| manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION,"1.0"); |
| JarOutputStream target = new JarOutputStream(new FileOutputStream(jar_path),manifest); |
| add2jar(new File(dir+"/"),dir.length()+1,target); |
| target.close(); |
| remove(fdir); |
| return nq; |
| } catch (Exception e) { |
| System.err.println("*** Warning: Unable to compile the query:\n"+query); |
| if (Config.trace) |
| e.printStackTrace(System.err); |
| return query; |
| } |
| } |
| |
| /** load the Java class of the anonymous function with name lambda_name */ |
| final public static Function compiled ( ClassLoader cl, String lambda_name ) throws Exception { |
| URL[] urls = ((URLClassLoader) cl).getURLs(); |
| URL[] new_urls = new URL[urls.length+1]; |
| for ( int i = 0; i < urls.length; i++ ) |
| new_urls[i+1] = urls[i]; |
| new_urls[0] = new URL("file://"+jar_path); |
| URLClassLoader loader = new URLClassLoader(new_urls,cl); |
| Class c = loader.loadClass("org.apache.mrql."+lambda_name); |
| Constructor cc = c.getDeclaredConstructors()[0]; |
| cc.setAccessible(true); |
| return (Function)cc.newInstance(); |
| } |
| |
| /** The Java type of an MRQL type */ |
| private static String get_MR_type ( Tree type ) { |
| match type { |
| case boolean: return "MR_bool"; |
| case byte: return "MR_byte"; |
| case short: return "MR_short"; |
| case int: return "MR_int"; |
| case long: return "MR_long"; |
| case float: return "MR_float"; |
| case double: return "MR_double"; |
| case char: return "MR_char"; |
| case string: return "MR_string"; |
| case union: return "Union"; |
| case bag(...): return "Bag"; |
| case list(...): return "Bag"; |
| }; |
| return "MRData"; |
| } |
| |
| private static Trees remove_duplicates ( Trees al ) { |
| if (al.is_empty()) |
| return al; |
| Trees el = remove_duplicates(al.tail()); |
| if (el.member(al.head())) |
| return el; |
| else return el.cons(al.head()); |
| } |
| |
| /** lambda lifting: generate Java code from an anonymous function */ |
| private static String compilef ( String v, Tree body ) throws Exception { |
| String fname = new_lambda_name(); |
| Trees free_vars = remove_duplicates(free_variables(body,#[`v,...repeat_variables])); |
| StringBuffer sb = new StringBuffer(1000); |
| sb.append("final class "+fname+" extends Function {\n"); |
| for ( Tree var: free_vars ) |
| sb.append("MRData "+var+"; "); |
| sb.append("\npublic "+fname+" ("); |
| if (free_vars.is_empty()) |
| sb.append(") {}\n"); |
| else { |
| sb.append(" MRData "+free_vars.head()); |
| for ( Tree var: free_vars.tail() ) |
| sb.append(", MRData "+var); |
| sb.append(" ) { "); |
| for ( Tree var: free_vars ) |
| sb.append("this."+var+" = "+var+"; "); |
| sb.append("}\n"); |
| }; |
| sb.append("final public MRData eval ( final MRData "+v |
| +" ) { return "+compileE(body)+"; }\n}\n"); |
| out.append(sb); |
| String s = "new "+fname+"("; |
| if (!free_vars.is_empty()) { |
| s += free_vars.head(); |
| for ( Tree var: free_vars.tail() ) |
| s += ","+var; |
| }; |
| return s+")"; |
| } |
| |
| private static String compileF ( Tree fnc ) throws Exception { |
| match fnc { |
| case lambda(`v,`b): |
| return compilef(v.toString(),b); |
| case compiled(`f,`lm): |
| // recompile the function |
| String s = compileF(lm); |
| ((Node)fnc).children().head = new VariableLeaf(s.substring(4,s.indexOf("("))); //destructive |
| return s; |
| case function(tuple(...params),`tp,`body): |
| if (is_persistent_type(tp)) { |
| compileE(body); |
| return ""; |
| }; |
| String ret = "new Lambda(new Function () { " |
| +"final public MRData eval ( final MRData _x ) { "; |
| for ( int i = 0; i < params.length(); i++ ) |
| match params.nth(i) { |
| case bind(`v,_): |
| ret += "final MRData "+v+" = ((Tuple)_x).get("+i+"); "; |
| }; |
| return ret+" return "+compileE(body)+"; } })"; |
| }; |
| throw new Exception("Ill-formed lambda: "+fnc); |
| } |
| |
| private static String compileEL ( Trees el ) throws Exception { |
| if (el.is_empty()) |
| return ""; |
| String ret = compileE(el.head()); |
| for ( Tree a: el.tail() ) |
| ret += ","+compileE(a); |
| return ret; |
| } |
| |
| private static String compileE ( Tree e ) throws Exception { |
| if (e == null) |
| return "(new MR_byte(0))"; |
| if (e.equals(#<true>)) |
| return "(new MR_bool(true))"; |
| else if (e.equals(#<false>)) |
| return "(new MR_bool(false))"; |
| else if (e.equals(#<null>)) |
| return "(new MR_byte(0))"; |
| else if (e.is_variable()) |
| if (global_type_env.lookup(e.toString()) != null || repeat_variables.member(e)) |
| return "Interpreter.lookup_global_binding(\""+e.toString()+"\")"; |
| else return e.toString(); |
| else if (e.is_long()) |
| return "(new MR_int("+((LongLeaf)e).value()+"))"; |
| else if (e.is_double()) |
| return "(new MR_double("+((DoubleLeaf)e).value()+"))"; |
| else if (e.is_string()) |
| return "(new MR_string("+e.toString()+"))"; |
| match e { |
| case trace(`msg,`tp,`x): |
| return "Interpreter.trace(Interpreter.pre_trace(((MR_string)(" |
| +compileE(msg)+")).get()),"+reify(tp)+","+compileE(x)+")"; |
| case callM(and,_,`x,`y): |
| return "(new MR_bool(((MR_bool)"+compileE(x) |
| +").get() && ((MR_bool)"+compileE(y)+").get()))"; |
| case callM(or,_,`x,`y): |
| return "(new MR_bool(((MR_bool)"+compileE(x) |
| +").get() || ((MR_bool)"+compileE(y)+").get()))"; |
| case callM(not,_,`x): |
| return "(new MR_bool(!((MR_bool)"+compileE(x)+").get()))"; |
| case callM(`f,`n,...args): |
| if (!n.is_long()) |
| fail; |
| String ret = "SystemFunctions."+ClassImporter.method_name((int)((LongLeaf)n).value())+"("; |
| Trees sig = ClassImporter.signature((int)((LongLeaf)n).value()); |
| for (int i = 0; i < args.length(); i++) |
| ret += ((i > 0) ? ",(" : "(")+get_MR_type(sig.nth(i+1))+")("+compileE(args.nth(i))+")"; |
| return ret+")"; |
| case lambda(`v,`body): |
| return "new Lambda("+compilef(v.toString(),body)+")"; |
| case nth(`x,`n): |
| return "(((Tuple)("+compileE(x)+")).get("+((LongLeaf)n).value()+"))"; |
| case setNth(`x,`n,`v,`ret): |
| return "(((Tuple)("+compileE(x)+")).set("+((LongLeaf)n).value()+","+compileE(v)+","+compileE(ret)+"))"; |
| case materialize(`u): |
| return "MapReduceAlgebra.materialize("+compileE(u)+")"; |
| case let(`v,`u,`body): |
| return "(new Function () { public MRData eval ( final MRData "+v |
| +" ) { if ("+v+" instanceof Bag) ((Bag)"+v+").materialize(); return " |
| +compileE(body)+"; }; }).eval("+compileE(u)+")"; |
| case cmap(`m,`s): |
| return "MapReduceAlgebra.cmap("+compileF(m)+",(Bag)("+compileE(s)+"))"; |
| case filter(`p,`m,`s): |
| return "MapReduceAlgebra.filter("+compileF(p)+","+compileF(m) |
| +",(Bag)"+compileE(s)+")"; |
| case map(`m,`s): |
| return "MapReduceAlgebra.map("+compileF(m)+",(Bag)"+compileE(s)+")"; |
| case fold(`acc,`zero,`s): |
| return "MapReduceAlgebra.fold("+compileF(acc)+","+compileE(zero)+",(Bag)"+compileE(s)+")"; |
| case range(`min,`max): |
| return "MapReduceAlgebra.generator(((MR_long)"+compileE(min)+").get()," |
| +"((MR_long)"+compileE(max)+").get())"; |
| case call(`f,...args): |
| return "("+compileF(f)+".eval("+compileE(#<tuple(...args)>)+"))"; |
| case tuple(): |
| return "(new Tuple())"; |
| case tuple(`x): |
| return "(new Tuple("+compileE(x)+"))"; |
| case tuple(`a,...el): |
| String ret = "(new Tuple("+compileE(a); |
| for ( Tree x: el ) |
| ret += ","+compileE(x); |
| return ret+"))"; |
| case Lineage(...el): |
| return compileE(#<tuple(...el)>); |
| case tagged_union(`n,`u): |
| return "(new Union((byte)"+((LongLeaf)n).value()+","+compileE(u)+"))"; |
| case union_value(`x): |
| return "(((Union)"+compileE(x)+").value())"; |
| case union_tag(`x): |
| return "(new MR_int(((Union)"+compileE(x)+").tag()))"; |
| // used for shortcutting sync in bsp supersteps |
| case BAG(): |
| return "SystemFunctions.bsp_empty_bag"; |
| case TRUE(): |
| return "SystemFunctions.bsp_true_value"; |
| case FALSE(): |
| return "SystemFunctions.bsp_false_value"; |
| case `T(): |
| if (is_collection(T)) |
| return "(new Bag())"; |
| else fail |
| case `T(e): |
| if (is_collection(T)) |
| return "(new Bag("+compileE(e)+"))"; |
| else fail |
| case `T(`a,...el): |
| if (!is_collection(T)) |
| fail; |
| String ret = "(new Bag("+compileE(a); |
| for ( Tree x: el ) |
| ret += ",(MRData)"+compileE(x); |
| return ret+"))"; |
| case if(`c,`x,`y): |
| return "((((MR_bool)"+compileE(c)+").get())?"+compileE(x)+":"+compileE(y)+")"; |
| case synchronize(`peer,`b): |
| return "SystemFunctions.synchronize(((MR_string)"+compileE(peer)+"),(MR_bool)"+compileE(b)+")"; |
| case distribute(`peer,`s): |
| return "SystemFunctions.distribute(((MR_string)"+compileE(peer)+"),(Bag)"+compileE(s)+")"; |
| case mapReduce(`mx,`my,`s,_): |
| return "MapReduceAlgebra.mapReduce("+compileF(mx)+","+compileF(my)+",(Bag)("+compileE(s)+"))"; |
| case mapReduce2(`mx,`my,`r,`x,`y,_): |
| return "MapReduceAlgebra.mapReduce2("+compileF(mx)+","+compileF(my)+","+compileF(r) |
| +",(Bag)("+compileE(x)+"),(Bag)("+compileE(y)+"))"; |
| case mapJoin(`kx,`ky,`r,`x,`y): |
| return "MapReduceAlgebra.mapJoin("+compileF(kx)+","+compileF(ky)+","+compileF(r) |
| +",(Bag)("+compileE(x)+"),(Bag)("+compileE(y)+"))"; |
| case join(`kx,`ky,`r,`x,`y): |
| return "MapReduceAlgebra.join("+compileF(kx)+","+compileF(ky)+","+compileF(r) |
| +",(Bag)("+compileE(x)+"),(Bag)("+compileE(y)+"))"; |
| case groupBy(`s): |
| return "MapReduceAlgebra.groupBy((Bag)("+compileE(s)+"))"; |
| case index(`x,`n): |
| return "SystemFunctions.index((Bag)("+compileE(x)+"),"+compileE(n)+")"; |
| case range(`x,`i,`j): |
| return "SystemFunctions.range((Bag)("+compileE(x)+"),"+compileE(i)+","+compileE(j)+")"; |
| case map_index(`x,`key): |
| return "((Bag)("+compileE(x)+")).map_find("+compileE(key)+")"; |
| case aggregate(`acc,`zero,`s): |
| return "MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)+",(Bag)("+compileE(s)+"))"; |
| case Aggregate(`acc,`zero,`s): |
| return "MapReducePlan.aggregate("+compileF(acc)+","+compileE(zero)+","+compileM(s)+")"; |
| case mergeGroupByJoin(`kx,`ky,`gx,`gy,`acc,`zero,`r,`x,`y,`o): |
| return "MapReduceAlgebra.mergeGroupByJoin("+compileF(kx)+","+compileF(ky) |
| +","+compileF(gx)+","+compileF(gy)+","+compileF(acc)+","+compileE(zero) |
| +","+compileF(r)+",(Bag)"+compileE(x)+",(Bag)"+compileE(y)+")"; |
| case loop(lambda(tuple(...vs),`b),`s,`n): |
| String fs = "new Function () { final public MRData eval ( final MRData _x ) { "; |
| for ( int i = 0; i < vs.length(); i++ ) |
| fs += "final MRData "+vs.nth(i)+" = ((Tuple)_x).get("+i+"); "; |
| return "MapReduceAlgebra.loop("+fs+"return "+compileE(b)+";}},(Tuple)("+compileE(s) |
| +"),"+((LongLeaf)n).value()+")"; |
| case function(tuple(...params),`tp,`body): |
| return compileF(e); |
| case typed(`x,_): |
| return compileE(x); |
| case apply(`f,tuple(...args)): |
| if (!f.is_variable()) |
| fail; |
| match global_functions.lookup(f.toString()) { |
| case function(tuple(...params),`otp,`body): |
| if (is_persistent_type(otp)) |
| fail; |
| String ret = "UserFunctions_"+user_functions_num+"."+f+"("; |
| if (args.is_empty()) |
| return ret+")"; |
| for ( int i = 0; i < params.length(); i++ ) |
| match params.nth(i) { |
| case bind(_,`tp): |
| ret += ((i==0)?"":",")+"("+get_MR_type(tp)+")"+compileE(args.nth(i)); |
| }; |
| return ret+")"; |
| }; |
| case apply(`f,`arg): |
| if (!f.is_variable()) |
| fail; |
| match global_functions.lookup(f.toString()) { |
| case function(tuple(...params),`otp,`body): |
| if (is_persistent_type(otp)) |
| fail; |
| String ac = compileE(arg); |
| String ret = "UserFunctions_"+user_functions_num+"."+f+"("; |
| for ( int i = 0; i < params.length(); i++ ) |
| match params.nth(i) { |
| case bind(_,`tp): |
| ret += ((i==0)?"":",")+"("+get_MR_type(tp)+")((Tuple)"+ac+").get("+i+")"; |
| }; |
| return ret+")"; |
| }; |
| case apply(`f,`arg): |
| if (!f.is_variable()) |
| return "("+compileF(f)+").eval("+compileE(arg)+")"; |
| else return "(((Lambda)"+compileE(f)+").lambda().eval("+compileE(arg)+"))"; |
| case Collect(`s): |
| return "Plan.collect("+compileM(s)+")"; |
| case trace(`x): |
| return compileE(x); |
| case _: |
| return compileM(e); |
| }; |
| throw new Exception("Cannot compile: "+e); |
| } |
| |
| final private static String compileM ( Tree e ) throws Exception { |
| match e { |
| case cMap(`f,`s): |
| return "MapReduceAlgebra.cmap("+compileF(f)+",(Bag)"+compileM(s)+")"; |
| case AggregateMap(`f,`acc,`zero,`s): |
| return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero) |
| +","+compileM(#<cMap(`f,`s)>)+"))"; |
| case MapReduce(`m,`r,`s,_): |
| return "MapReduceAlgebra.mapReduce("+compileF(m)+"," |
| +compileF(r)+",(Bag)"+compileM(s)+")"; |
| case MapAggregateReduce(`m,`r,`acc,`zero,`s,_): |
| return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc) |
| +","+compileE(zero)+","+compileM(#<MapReduce(`m,`r,`s)>)+"))"; |
| case MapCombineReduce(`m,`c,`r,`s,_): |
| return "MapReduceAlgebra.mapReduce("+compileF(m) |
| +","+compileF(r)+",(Bag)"+compileM(s)+")"; |
| case MapReduce2(`mx,`my,`c,`r,`x,`y,_): |
| return "MapReduceAlgebra.mapReduce2("+compileF(mx)+","+compileF(my) |
| +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")"; |
| case MapReduce2(`mx,`my,`r,`x,`y,_): |
| return "MapReduceAlgebra.mapReduce2("+compileF(mx)+","+compileF(my) |
| +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")"; |
| case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,_): |
| return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero) |
| +","+compileM(#< MapReduce2(`mx,`my,`r,`x,`y)>)+"))"; |
| case MapJoin(`kx,`ky,`r,`x,`y): |
| return "MapReduceAlgebra.mapJoin("+compileF(kx)+","+compileF(ky) |
| +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")"; |
| case MapAggregateJoin(`kx,`ky,`r,`acc,`zero,`x,`y): |
| return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc) |
| +","+compileE(zero)+","+compileM(#<MapJoin(`kx,`ky,`r,`x,`y)>)+"))"; |
| case GroupByJoin(`kx,`ky,`gx,`gy,`acc,`zero,`r,`x,`y,`o): |
| return "MapReduceAlgebra.groupByJoin("+compileF(kx)+","+compileF(ky) |
| +","+compileF(gx)+","+compileF(gy)+","+compileF(acc)+","+compileE(zero) |
| +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")"; |
| case CrossProduct(`mx,`my,`r,`x,`y): |
| return "MapReduceAlgebra.crossProduct("+compileF(mx)+","+compileF(my) |
| +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")"; |
| case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y): |
| return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero) |
| +","+compileM(#<CrossProduct(`mx,`my,`r,`x,`y)>)+"))"; |
| case BSPSource(`n,BinarySource(`file,_)): |
| if (Config.hadoop_mode) |
| return "Plan.binarySource("+compileE(n)+",((MR_string)" |
| +compileE(file)+").get())"; |
| else return "(Bag)MapReduceAlgebra.read_binary("+compileE(n)+",((MR_string)" |
| +compileE(file)+").get())"; |
| case BinarySource(`file,_): |
| if (Config.hadoop_mode) |
| return "Plan.binarySource(((MR_string)"+compileE(file)+").get())"; |
| else return "(Bag)MapReduceAlgebra.read_binary(((MR_string)"+compileE(file)+").get())"; |
| case BSPSource(`n,ParsedSource(`parser,`file,...args)): |
| if (!(n instanceof LongLeaf)) |
| fail; |
| if (!Config.hadoop_mode) |
| return "MapReduceAlgebra.parsedSource(((MR_int)"+compileE(n)+").get(),\"" |
| +parser+"\",((MR_string)"+compileE(file)+").get()," |
| +reify(args)+")"; |
| Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString()); |
| if (p == null) |
| throw new Error("Unknown parser: "+parser); |
| return "Plan.parsedSource("+p.getName()+".class,((MR_string)"+compileE(file)+").get()," |
| +reify(args)+")"; |
| case ParsedSource(`parser,`file,...args): |
| if (!Config.hadoop_mode) |
| return "MapReduceAlgebra.parsedSource(\""+parser+"\",((MR_string)" |
| +compileE(file)+").get(),"+reify(args)+")"; |
| Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString()); |
| if (p == null) |
| throw new Error("Unknown parser: "+parser); |
| return "Plan.parsedSource("+p.getName()+".class,((MR_string)"+compileE(file)+").get()," |
| +reify(args)+")"; |
| case Merge(`x,`y): |
| return "((Bag)"+compileM(x)+").union((Bag)"+compileM(y)+")"; |
| case Generator(`min,`max,`size): |
| return "MapReduceAlgebra.generator(((MR_long)"+compileE(min)+").get()," |
| +"((MR_long)"+compileE(max)+").get())"; |
| case BSP(`n,`superstep,`state,`o,...as): |
| String ds = ""; |
| for ( Tree a: as ) |
| ds += ",(Bag)("+compileM(a)+")"; |
| return "MapReduceAlgebra.BSP("+((LongLeaf)n).value()+"," |
| +compileF(superstep)+","+compileE(state)+","+o+"," |
| +"new Bag[]{"+ds.substring(1)+"})"; |
| case DataSetCollect(`s): |
| return "Interpreter.dataSetCollect(\""+s+"\")"; |
| case `v: |
| if (v.is_variable()) |
| return v.toString(); |
| }; |
| throw new Exception("Cannot compile: "+e); |
| } |
| } |