| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.mrql; |
| |
| import java.util.List; |
| import org.apache.mrql.gen.*; |
| |
| |
| /** Provides the API for compilation/code-generation */ |
| final public class TopLevel extends Interpreter { |
| static Tree xml_type; |
| |
| public TopLevel () { |
| // set up the predefined environment; XML and JSON are user-defined (union) types registered through datadef: |
| datadef("XML",#<union(Node(tuple(string,bag(tuple(string,string)),list(XML))), |
| CData(string))>); |
| datadef("JSON",#<union(JObject(bag(tuple(string,JSON))), |
| JArray(list(JSON)), |
| Jstring(string), |
| Jlong(long), |
| Jdouble(double), |
| Jbool(bool), |
| Jnull(tuple()))>); |
| constant(#<PI>,#<double>,new MR_double(Math.PI)); /* predefined constant PI of type double */ |
| xml_type = global_datatype_env.lookup("XML"); /* keep the XML type registered above in the static field xml_type */ |
| DataSource.loadParsers(); /* NOTE(review): presumably registers the available input parsers; confirm in DataSource */ |
| } |
| |
| /** true, if e is a ParsedSource, or a Merge whose two branches both satisfy this test */ |
| private static boolean memory_parsed_source ( Tree e ) { |
|     match e { |
|     case ParsedSource(...): |
|         return true; |
|     case Merge(`lhs,`rhs): |
|         // same left-to-right short-circuit as lhs && rhs |
|         if (!memory_parsed_source(lhs)) |
|             return false; |
|         return memory_parsed_source(rhs); |
|     }; |
|     return false; |
| } |
| |
| /** Translate an MRQL expression into a plan and evaluate the plan into MRData. |
| * A failure that carries a message is reported on System.err and null is returned, |
| * unless Config.testing is set, in which case the failure is rethrown wrapped in an Error. |
| * @param e MRQL query to be evaluated |
| * @param print do we want to print the result? |
| * @return the result of evaluation (MRData), or null if no plan was produced or the evaluation failed |
| */ |
| public static MRData expression ( Tree e, boolean print ) { |
|     try { |
|         final Tree compiled = translate_expression(e); |
|         // remember the query type as it stands right after translation |
|         final Tree result_type = query_type; |
|         query_plan = compiled; |
|         tab_count = -3; |
|         trace_count = 0; |
|         if (compiled == null) |
|             return null; |
|         if (Config.hadoop_mode) |
|             Evaluator.evaluator.initialize_query(); |
|         final MRData result = evalE(compiled,null); |
|         if (!print) |
|             return result; |
|         if (!Config.quiet_execution) |
|             System.out.println("Result:"); |
|         // outside hadoop mode, a BSP plan over parsed sources is shown through the |
|         // second component of the first tuple of the resulting bag |
|         if (Config.hadoop_mode || !Config.bsp_mode || !memory_parsed_source(compiled)) |
|             System.out.println(print(result,result_type)); |
|         else |
|             System.out.println(print(((Tuple)((Bag)result).get(0)).second(),result_type)); |
|         return result; |
|     } catch (Exception failure) { |
|         if (failure.getMessage() != null) |
|             System.err.println("*** MRQL System Error at line "+Main.parser.line_pos()+": "+failure); |
|         if (Config.trace) |
|             failure.printStackTrace(System.err); |
|         if (Config.testing) |
|             throw new Error(failure); |
|         return null; |
|     } catch (Error failure) { |
|         if (failure.getMessage() != null) |
|             System.err.println("*** MRQL System Error at line "+Main.parser.line_pos()+": "+failure); |
|         if (Config.trace) |
|             failure.printStackTrace(System.err); |
|         if (Config.testing) |
|             throw new Error(failure); |
|         return null; |
|     } |
| } |
| |
| /** Reset the interpreter state, then translate, evaluate, and print the results of an MRQL expression e |
| * @param e MRQL query to be evaluated |
| * @return the result of evaluation (MRData), as returned by expression(e,true) |
| */ |
| public final static MRData expression ( Tree e ) { |
|     reset(); |
|     final MRData result = expression(e,true); |
|     return result; |
| } |
| |
| /** Handle the assignment v=e: if v is already bound in the global environment, its global type |
| * and binding are removed first; e is then recorded for v in global_vars. |
| */ |
| public final static void assign ( String v, Tree e ) { |
|     final boolean already_bound = variable_lookup(v,global_env) != null; |
|     if (already_bound) { |
|         global_type_env.remove(v); |
|         remove_global_binding(v); |
|     } |
|     global_vars.insert(v,e); |
| } |
| |
| private final static boolean is_function ( Tree e ) { |
| match e { |
| case function(...): return true; |
| }; |
| return false; |
| } |
| |
| /** Handle the assignment v:=e: evaluate e without printing, record the query type for v, |
| * materialize the result when it is a Bag, and bind v to it as a distributed binding. |
| * @return the plan of the evaluated query (query_plan) |
| */ |
| public final static Tree distributed_assign ( String v, Tree e ) { |
|     reset(); |
|     // forget any expression previously recorded for v in global_vars |
|     final boolean recorded = global_vars.lookup(v) != null; |
|     if (recorded) |
|         global_vars.remove(v); |
|     final MRData value = expression(e,false); |
|     global_type_env.insert(v,query_type); |
|     if (value instanceof Bag) { |
|         final Bag bag = (Bag)value; |
|         bag.materialize(); |
|     } |
|     new_distributed_binding(v,value); |
|     return query_plan; |
| } |
| |
| /** Bind v to the result of e: evaluate e without printing, record the query type for v, |
| * materialize the result when it is a Bag, and bind v to it as a global binding. |
| * @return the plan of the evaluated query (query_plan) |
| */ |
| public final static Tree store ( String v, Tree e ) { |
|     reset(); |
|     // drop the expression stored for v in global_vars, if there is one |
|     if (null != global_vars.lookup(v)) { |
|         global_vars.remove(v); |
|     } |
|     final MRData outcome = expression(e,false); |
|     global_type_env.insert(v,query_type); |
|     if (outcome instanceof Bag) { |
|         ((Bag)outcome).materialize(); |
|     } |
|     new_global_binding(v,outcome); |
|     return query_plan; |
| } |
| |
| /** Define an MRQL constant, such as PI: the printed form of v becomes the constant name, |
| * which gets the given type and a global binding to value. |
| */ |
| private final static void constant ( Tree v, Tree type, MRData value ) { |
|     final String name = v.toString(); |
|     // a constant replaces any expression recorded under the same name in global_vars |
|     final boolean recorded = global_vars.lookup(name) != null; |
|     if (recorded) { |
|         global_vars.remove(name); |
|     } |
|     global_type_env.insert(name,type); |
|     new_global_binding(name,value); |
| } |
| |
| /** Define a new function: normalize the parameter and result types, register the function |
| * type, and store the function value under its name. |
| * @param fnc function name |
| * @param params parameter list; every element must have the form bind(v,type) |
| * @param out_type output type |
| * @param body function body |
| */ |
| public final static void functiondef ( String fnc, Trees params, Tree out_type, Tree body ) { |
|     reset(); |
|     Trees arg_types = #[]; |
|     Trees bindings = #[]; |
|     for ( Tree param: params ) { |
|         match param { |
|         case bind(`v,`tp): |
|             Tree ntp = normalize_type(tp); |
|             arg_types = arg_types.append(ntp); |
|             bindings = bindings.append(#<bind(`v,`ntp)>); |
|         case _: type_error(param,"Ill-formed function parameter: "+param); |
|         }; |
|     } |
|     out_type = normalize_type(out_type); |
|     // the type is registered before the body is compiled: needed for recursive functions |
|     global_type_env.insert(fnc,#<arrow(tuple(...arg_types),`out_type)>); |
|     Tree fname = #<`fnc>; |
|     if (!is_pure(body)) |
|         impure_functions = impure_functions.append(fname); |
|     Tree plan = store(fnc,#<function(tuple(...bindings),`out_type,`body)>); |
|     if (plan != null) { |
|         Translator.global_functions.insert(fnc,plan); |
|         // in hadoop mode the closed plan is also written to the configuration |
|         if (Config.hadoop_mode) |
|             Plan.conf.set("mrql.global."+fnc, |
|                           closure(plan,global_env).toString()); |
|     } |
| } |
| |
| /** dump the result of evaluating the MRQL query e to a binary file */ |
| private final static void dump ( String file, Tree e ) { |
| MRData res = expression(e,false); |
| try { |
| query_type = make_persistent_type(query_type); |
| if (res != null) |
| if (Config.hadoop_mode) |
| Evaluator.evaluator.dump(file,query_type,res); |
| else MapReduceAlgebra.dump(file,query_type,res); |
| } catch (Exception x) { |
| throw new Error(x); |
| } |
| } |
| |
| /** Dump the result of evaluating the MRQL query e to a text CSV file. |
| * Nothing is written when the evaluation returns null; any Exception raised while |
| * dumping is rethrown wrapped in an Error. |
| */ |
| private final static void dump_text ( String file, Tree e ) { |
|     final MRData result = expression(e,false); |
|     if (result == null) |
|         return; |
|     try { |
|         Evaluator.evaluator.dump_text(file,query_type,result); |
|     } catch (Exception x) { |
|         throw new Error(x); |
|     } |
| } |
| |
| /** true, if e is a stream-processing expression, that is, if e or one of its |
| * descendants is a call to stream |
| */ |
| public final static boolean stream_expression ( Tree e ) { |
|     match e { |
|     case call(stream,...): |
|         return true; |
|     case `f(...args): |
|         // search the arguments depth-first and stop at the first stream call |
|         for ( Tree arg: args ) { |
|             if (stream_expression(arg)) |
|                 return true; |
|         } |
|     }; |
|     return false; |
| } |
| |
| /** define a new named type (typedef): name is mapped to the normalized form of type */ |
| private final static void typedef ( String name, Tree type ) { |
|     final Tree normalized = normalize_type(type); |
|     type_names.insert(name,normalized); |
| } |
| |
| /** Define a new data type, such as XML and JSON. |
| * The type must be a union of constructors of the form C(t). Each constructor C is |
| * registered in data_constructors as name(i,t), where i is its position in the union, |
| * and name is finally bound to the union of the constructors with normalized types. |
| * A type that is not a union, a constructor that does not have the form C(t), and a |
| * constructor name that is already defined are all reported through type_error. |
| * @param name the name of the new data type |
| * @param type the type definition, expected to be union(C1(t1),...,Cn(tn)) |
| */ |
| private final static void datadef ( String name, Tree type ) { |
|     int i = 0; |
|     Trees as = #[]; |
|     match type { |
|     case union(...nl): |
|         // provisional binding to the raw union: needed for recursive datatypes |
|         global_datatype_env.insert(name,#<union(...nl)>); |
|         for ( Tree n: nl ) { |
|             match n { |
|             case `c(`t): |
|                 if (data_constructors.lookup(c.toString()) == null) |
|                     data_constructors.insert(c.toString(),#<`name(`i,`t)>); |
|                 else type_error(type,"Data constructor "+c+" has already been defined"); |
|                 as = as.append(#<`c(`(normalize_type(t)))>); |
|                 i++; |
|             case _: |
|                 // previously skipped silently, leaving a union with missing constructors |
|                 type_error(n,"Ill-formed data constructor: "+n); |
|             } |
|         } |
|     case _: |
|         // previously fell through and bound name to an empty union |
|         type_error(type,"Ill-formed data type definition: "+type); |
|     }; |
|     // replace the provisional binding with the normalized union |
|     global_datatype_env.remove(name); |
|     global_datatype_env.insert(name,#<union(...as)>); |
| } |
| |
| /** define a user aggregation */ |
| private static void aggregation ( String name, Tree type, Tree plus, Tree zero, Tree unit ) { |
| reset(); |
| zero = Simplification.rename(zero); |
| plus = Simplification.rename(plus); |
| unit = Simplification.rename(unit); |
| type = normalize_type(type); |
| Tree ztp = TypeInference.type_inference2(zero); |
| Tree v1 = new_var(); |
| type_env.insert(v1.toString(),ztp); |
| TypeInference.type_inference2(Normalization.normalize_all(#<apply(`plus,tuple(`v1,`v1))>)); |
| Tree v2 = new_var(); |
| type_env.insert(v2.toString(),type); |
| Tree utp = TypeInference.type_inference2(Normalization.normalize_all(#<apply(`unit,`v2)>)); |
| if (unify(utp,ztp) == null) |
| type_error(unit,"Wrong type in unit result (expected "+ztp+" found "+utp); |
| monoids = monoids.append(#<`name(`type,`plus,`zero,`unit)>); |
| } |
| |
| /** bind the pattern variables to values */ |
| private static Environment bind_list ( Tree pattern, MRData value, Environment env ) { |
| match pattern { |
| case tuple(...ps): |
| int i = 0; |
| Tuple t = (Tuple)value; |
| for ( Tree p: ps ) |
| env = bind_list(p,t.get(i++),env); |
| return env; |
| }; |
| return new Environment(pattern.toString(),value,env); |
| } |
| |
| /** bind the pattern variables to values */ |
| private static Environment bind_list ( Tree pattern, Tree src, Environment env ) { |
| match pattern { |
| case tuple(...ps): |
| int i = 0; |
| match src { |
| case tuple(...es): |
| for ( Tree p: ps ) |
| env = bind_list(p,es.nth(i++),env); |
| return env; |
| } |
| }; |
| MRData value = Evaluator.evaluator.evalE(src,env); |
| env.replace(pattern.toString(),value); |
| return env; |
| } |
| |
| /** the MRQL top-level interfacse to evaluate a single MRQL expression or command */ |
| public final static void evaluate_top_level ( Tree expr ) { |
| if (expr == null) |
| return; |
| match expr { |
| case expression(`e): |
| long t = System.currentTimeMillis(); |
| if (expression(e) != null && !Config.quiet_execution) |
| System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs"); |
| case assign(`v,`e): assign(v.toString(),e); |
| case store(`v,`e): |
| long t = System.currentTimeMillis(); |
| if (distributed_assign(v.toString(),e) != null && !Config.quiet_execution) |
| System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs"); |
| case dump(`s,`e): // dump stream output |
| if (!stream_expression(e)) |
| fail; |
| reset(); |
| Tree plan = translate_expression(e); |
| if (plan == null) |
| return; |
| Evaluator.evaluator.initialize_query(); |
| final Tree qt = query_type; |
| final String file = s.stringValue(); |
| Evaluator.evaluator.streaming(plan,null,null,new Function(){ |
| long i = 0; |
| public MRData eval ( final MRData data ) { |
| try { |
| Evaluator.evaluator.dump(file+"/f"+(i++),qt,data); |
| return data; |
| } catch (Exception ex) { |
| throw new Error("Cannot dump the streaming result to the directory "+file); |
| } |
| } |
| }); |
| case dump_text(`s,`e): // dump stream output |
| if (!stream_expression(e)) |
| fail; |
| reset(); |
| Tree plan = translate_expression(e); |
| if (plan == null) |
| return; |
| Evaluator.evaluator.initialize_query(); |
| final Tree qt = query_type; |
| final String file = s.stringValue(); |
| Evaluator.evaluator.streaming(plan,null,null,new Function(){ |
| long i = 0; |
| public MRData eval ( final MRData data ) { |
| try { |
| Evaluator.evaluator.dump_text(file+"/f"+(i++),qt,data); |
| return data; |
| } catch (Exception ex) { |
| throw new Error("Cannot dump the streaming result to the directory "+file); |
| } |
| } |
| }); |
| case incr(`e): // incremental stream processing |
| Config.incremental = true; |
| reset(); |
| Tree plan = translate_expression(e); |
| Config.incremental = false; |
| if (plan == null) |
| return; |
| Evaluator.evaluator.initialize_query(); |
| match plan { |
| case incr(`zero,lambda(`state,`merge),lambda(_,`answer)): |
| final Tree qt = make_persistent_type(query_type); |
| MRData z = evalE(zero,null); |
| final Environment env =bind_list(state,z,null); |
| final Tree ans = answer; |
| merge = Streaming.streamify(merge); |
| Evaluator.evaluator.streaming(#<lambda(`state,`merge)>,env,null, |
| new Function() { |
| public MRData eval ( final MRData state ) { |
| MRData a = Evaluator.evaluator.evalE(ans,env); |
| System.out.println(print(a,qt)); |
| return state; |
| } |
| }); |
| }; |
| case lineage(`e): |
| Config.lineage = true; |
| boolean quiet = Config.quiet_execution; |
| Config.quiet_execution = true; |
| long t = System.currentTimeMillis(); |
| if (expression(e,false) != null && !Config.quiet_execution) |
| System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs"); |
| Config.lineage = false; |
| Config.quiet_execution = quiet; |
| case debug(`path,`e): |
| Config.debug = true; |
| boolean quiet = Config.quiet_execution; |
| Config.quiet_execution = true; |
| long t = System.currentTimeMillis(); |
| dump(path.stringValue(),e); |
| if (!Config.quiet_execution) |
| System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs"); |
| Config.debug = false; |
| Config.quiet_execution = quiet; |
| case dump(`s,`e): |
| long t = System.currentTimeMillis(); |
| dump(s.stringValue(),e); |
| if (!Config.quiet_execution) |
| System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs"); |
| case dump_text(`s,`e): |
| long t = System.currentTimeMillis(); |
| dump_text(s.stringValue(),e); |
| if (!Config.quiet_execution) |
| System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs"); |
| case typedef(`v,`t): typedef(v.toString(),t); |
| case datadef(`v,`t): datadef(v.toString(),t); |
| case functiondef(`f,params(...p),`tp,`e): |
| functiondef(f.toString(),p,tp,e); |
| case macrodef(`name,params(...p),`e): |
| Translator.global_macros.insert(name.toString(),#<macro(params(...p),`e)>); |
| case aggregation(`aggr,`type,`plus,`zero,`unit): |
| aggregation(aggr.toString(),type,plus,zero,unit); |
| case import(`c): |
| ClassImporter.importClass(c.variableValue()); |
| case import(`c,...l): |
| for (Tree m: l) |
| ClassImporter.importMethod(c.variableValue(),m.variableValue()); |
| case include(`file): |
| Main.include_file(file.toString()); |
| case parser(`n,`p): |
| try { |
| Class<? extends Parser> c = Class.forName(p.toString()).asSubclass(Parser.class); |
| DataSource.parserDirectory.put(n.toString(),c); |
| } catch (ClassNotFoundException e) { |
| throw new Error("Class "+p.toString()+" not found"); |
| } |
| case impure(`fn): // not used |
| impure_functions = impure_functions.append(fn); |
| case _: throw new Error("Unknown statement: "+expr); |
| } |
| } |
| } |