blob: 769099ad585f3efe065d45a7ce6a2651e55718a4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import java.util.List;
import org.apache.mrql.gen.*;
/** Provides the API for compilation/code-generation */
/** Provides the API for compilation/code-generation.
 *  Note: this is MRQL "Gen" source (Java + match statements and #&lt;...&gt; AST
 *  quasi-quotation); it is preprocessed into plain Java before compilation. */
final public class TopLevel extends Interpreter {
    /** the declaration of the built-in XML datatype (cached in the constructor) */
    static Tree xml_type;

    public TopLevel () {
        // XML and JSON are user-defined types:
        datadef("XML",#<union(Node(tuple(string,bag(tuple(string,string)),list(XML))),
                              CData(string))>);
        datadef("JSON",#<union(JObject(bag(tuple(string,JSON))),
                               JArray(list(JSON)),
                               Jstring(string),
                               Jlong(long),
                               Jdouble(double),
                               Jbool(bool),
                               Jnull(tuple()))>);
        constant(#<PI>,#<double>,new MR_double(Math.PI));
        xml_type = global_datatype_env.lookup("XML");
        DataSource.loadParsers();
    }

    /** true if the plan is an in-memory parsed source: a ParsedSource leaf,
     *  or a Merge whose branches are both in-memory parsed sources */
    private static boolean memory_parsed_source ( Tree e ) {
        match e {
        case ParsedSource(...): return true;
        case Merge(`x,`y): return memory_parsed_source(x) && memory_parsed_source(y);
        };
        return false;
    }

    /** translate and evaluate an MRQL expression into MRData
     * @param e MRQL query to be evaluated
     * @param print do we want to print the result?
     * @return the result of evaluation (MRData), or null on translation/evaluation error
     */
    public static MRData expression ( Tree e, boolean print ) {
        try {
            Tree plan = translate_expression(e);
            // capture the inferred type now; query_type is global state that
            // nested evaluations may overwrite
            Tree type = query_type;
            query_plan = plan;
            tab_count = -3;
            trace_count = 0;
            if (plan == null)
                return null;
            if (Config.hadoop_mode)
                Evaluator.evaluator.initialize_query();
            MRData res = evalE(plan,null);
            if (print) {
                if (!Config.quiet_execution)
                    System.out.println("Result:");
                // in memory-based BSP mode a parsed source yields a Bag of
                // tuples; print the second (payload) component of the first tuple
                if (!Config.hadoop_mode && Config.bsp_mode && memory_parsed_source(plan))
                    System.out.println(print(((Tuple)((Bag)res).get(0)).second(),type));
                else System.out.println(print(res,type));
            };
            return res;
        } catch (Exception x) {
            if (x.getMessage() != null)
                System.err.println("*** MRQL System Error at line "+Main.parser.line_pos()+": "+x);
            if (Config.trace)
                x.printStackTrace(System.err);
            if (Config.testing)
                throw new Error(x);
            return null;
        } catch (Error x) {
            // same handling as Exception; note the Error is re-wrapped in a new
            // Error when testing (original stack trace is preserved as the cause)
            if (x.getMessage() != null)
                System.err.println("*** MRQL System Error at line "+Main.parser.line_pos()+": "+x);
            if (Config.trace)
                x.printStackTrace(System.err);
            if (Config.testing)
                throw new Error(x);
            return null;
        }
    }

    /** translate, evaluate, and print the results of an MRQL expression e
     * @param e MRQL query to be evaluated
     * @return the result of evaluation (MRData)
     */
    public final static MRData expression ( Tree e ) {
        reset();
        return expression(e,true);
    }

    /** handle the assignment v=e (delayed/by-name binding: e is stored
     *  untranslated and any previous eager binding of v is removed) */
    public final static void assign ( String v, Tree e ) {
        if (variable_lookup(v,global_env) != null) {
            global_type_env.remove(v);
            remove_global_binding(v);
        };
        global_vars.insert(v,e);
    }

    /** true if e is a function(...) AST node */
    private final static boolean is_function ( Tree e ) {
        match e {
        case function(...): return true;
        };
        return false;
    }

    /** handle the assignment v:=e (eager evaluation; the result is bound
     *  as a distributed value)
     * @return the query plan of e, or null on error */
    public final static Tree distributed_assign ( String v, Tree e ) {
        reset();
        // an eager binding supersedes any delayed (v=e) binding
        if (global_vars.lookup(v) != null)
            global_vars.remove(v);
        MRData res = expression(e,false);
        global_type_env.insert(v,query_type);
        if (res instanceof Bag)
            ((Bag)res).materialize();
        new_distributed_binding(v,res);
        return query_plan;
    }

    /** bind v to the result of e (same as distributed_assign but the
     *  result is stored as a local global binding)
     * @return the query plan of e, or null on error */
    public final static Tree store ( String v, Tree e ) {
        reset();
        if (global_vars.lookup(v) != null)
            global_vars.remove(v);
        MRData res = expression(e,false);
        global_type_env.insert(v,query_type);
        if (res instanceof Bag)
            ((Bag)res).materialize();
        new_global_binding(v,res);
        return query_plan;
    }

    /** define an MRQL constant, such as PI */
    private final static void constant ( Tree v, Tree type, MRData value ) {
        String var = v.toString();
        if (global_vars.lookup(var) != null)
            global_vars.remove(var);
        global_type_env.insert(var,type);
        new_global_binding(var,value);
    }

    /** define a new function
     * @param fnc function name
     * @param params parameter list (bind(var,type) nodes)
     * @param out_type output type
     * @param body function body
     */
    public final static void functiondef ( String fnc, Trees params, Tree out_type, Tree body ) {
        reset();
        Trees as = #[];   // normalized parameter types
        Trees ps = #[];   // normalized bind(var,type) parameters
        for ( Tree param: params )
            match param {
            case bind(`v,`tp):
                Tree ntp = normalize_type(tp);
                as = as.append(ntp);
                ps = ps.append(#<bind(`v,`ntp)>);
            case _: type_error(param,"Ill-formed function parameter: "+param);
            };
        out_type = normalize_type(out_type);
        // register the function type before translating the body,
        // needed for recursive functions
        global_type_env.insert(fnc,#<arrow(tuple(...as),`out_type)>);
        Tree fname = #<`fnc>;
        if (!is_pure(body))
            impure_functions = impure_functions.append(fname);
        Tree plan = store(fnc,#<function(tuple(...ps),`out_type,`body)>);
        if (plan != null)
            Translator.global_functions.insert(fnc,plan);
        // in Hadoop mode, ship the function closure to the workers via the job conf
        if (Config.hadoop_mode && plan != null)
            Plan.conf.set("mrql.global."+fnc,
                          closure(plan,global_env).toString());
    }

    /** dump the result of evaluating the MRQL query e to a binary file */
    private final static void dump ( String file, Tree e ) {
        MRData res = expression(e,false);
        try {
            query_type = make_persistent_type(query_type);
            if (res != null)
                if (Config.hadoop_mode)
                    Evaluator.evaluator.dump(file,query_type,res);
                else MapReduceAlgebra.dump(file,query_type,res);
        } catch (Exception x) {
            throw new Error(x);
        }
    }

    /** dump the result of evaluating the MRQL query e to a text CSV file */
    private final static void dump_text ( String file, Tree e ) {
        MRData res = expression(e,false);
        if (res != null)
            try {
                Evaluator.evaluator.dump_text(file,query_type,res);
            } catch (Exception x) {
                throw new Error(x);
            }
    }

    /** true, if e is a stream-processing expression (contains a call to stream) */
    public final static boolean stream_expression ( Tree e ) {
        match e {
        case call(stream,...): return true;
        case `f(...as):
            for ( Tree a: as )
                if (stream_expression(a))
                    return true;
        };
        return false;
    }

    /** define a new named type (typedef) */
    private final static void typedef ( String name, Tree type ) {
        type_names.insert(name,normalize_type(type));
    }

    /** define a new data type, such as XML and JSON */
    private final static void datadef ( String name, Tree type ) {
        int i = 0;               // index of each data constructor within the union
        Trees as = #[];          // normalized constructor declarations
        match type {
        case union(...nl):
            // register the raw type first, needed for recursive datatypes
            global_datatype_env.insert(name,#<union(...nl)>);
            for ( Tree n: nl )
                match n {
                case `c(`t):
                    // data constructor names must be globally unique
                    if (data_constructors.lookup(c.toString()) == null)
                        data_constructors.insert(c.toString(),#<`name(`i,`t)>);
                    else type_error(type,"Data constructor "+c+" has already been defined");
                    as = as.append(#<`c(`(normalize_type(t)))>);
                    i++;
                }
        };
        // replace the raw registration with the normalized one
        global_datatype_env.remove(name);
        global_datatype_env.insert(name,#<union(...as)>);
    }

    /** define a user aggregation (a monoid: a plus function, a zero, and a
     *  unit injection); type-checks plus and unit against the zero's type */
    private static void aggregation ( String name, Tree type, Tree plus, Tree zero, Tree unit ) {
        reset();
        // alpha-rename to avoid variable capture across the three expressions
        zero = Simplification.rename(zero);
        plus = Simplification.rename(plus);
        unit = Simplification.rename(unit);
        type = normalize_type(type);
        Tree ztp = TypeInference.type_inference2(zero);
        Tree v1 = new_var();
        type_env.insert(v1.toString(),ztp);
        // plus must accept a pair of accumulator values
        TypeInference.type_inference2(Normalization.normalize_all(#<apply(`plus,tuple(`v1,`v1))>));
        Tree v2 = new_var();
        type_env.insert(v2.toString(),type);
        // unit must map an element of the input type to the accumulator type
        Tree utp = TypeInference.type_inference2(Normalization.normalize_all(#<apply(`unit,`v2)>));
        if (unify(utp,ztp) == null)
            // fixed: the message previously lacked the closing parenthesis
            type_error(unit,"Wrong type in unit result (expected "+ztp+" found "+utp+")");
        monoids = monoids.append(#<`name(`type,`plus,`zero,`unit)>);
    }

    /** bind the pattern variables to values (tuple patterns are
     *  deconstructed component-wise against a Tuple value) */
    private static Environment bind_list ( Tree pattern, MRData value, Environment env ) {
        match pattern {
        case tuple(...ps):
            int i = 0;
            Tuple t = (Tuple)value;
            for ( Tree p: ps )
                env = bind_list(p,t.get(i++),env);
            return env;
        };
        return new Environment(pattern.toString(),value,env);
    }

    /** bind the pattern variables to values (the source is an expression:
     *  non-tuple components are evaluated and replace existing bindings) */
    private static Environment bind_list ( Tree pattern, Tree src, Environment env ) {
        match pattern {
        case tuple(...ps):
            int i = 0;
            match src {
            case tuple(...es):
                for ( Tree p: ps )
                    env = bind_list(p,es.nth(i++),env);
                return env;
            }
        };
        MRData value = Evaluator.evaluator.evalE(src,env);
        env.replace(pattern.toString(),value);
        return env;
    }

    /** the MRQL top-level interface to evaluate a single MRQL expression or command */
    public final static void evaluate_top_level ( Tree expr ) {
        if (expr == null)
            return;
        match expr {
        case expression(`e):
            long t = System.currentTimeMillis();
            if (expression(e) != null && !Config.quiet_execution)
                System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs");
        case assign(`v,`e): assign(v.toString(),e);
        case store(`v,`e):
            // v := e is evaluated eagerly as a distributed binding
            long t = System.currentTimeMillis();
            if (distributed_assign(v.toString(),e) != null && !Config.quiet_execution)
                System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs");
        case dump(`s,`e): // dump stream output
            // non-stream dumps fall through to the regular dump case below
            if (!stream_expression(e))
                fail;
            reset();
            Tree plan = translate_expression(e);
            if (plan == null)
                return;
            Evaluator.evaluator.initialize_query();
            final Tree qt = query_type;
            final String file = s.stringValue();
            // each stream batch is dumped to a fresh file under the target directory
            Evaluator.evaluator.streaming(plan,null,null,new Function(){
                    long i = 0;
                    public MRData eval ( final MRData data ) {
                        try {
                            Evaluator.evaluator.dump(file+"/f"+(i++),qt,data);
                            return data;
                        } catch (Exception ex) {
                            throw new Error("Cannot dump the streaming result to the directory "+file);
                        }
                    }
                });
        case dump_text(`s,`e): // dump stream output
            if (!stream_expression(e))
                fail;
            reset();
            Tree plan = translate_expression(e);
            if (plan == null)
                return;
            Evaluator.evaluator.initialize_query();
            final Tree qt = query_type;
            final String file = s.stringValue();
            Evaluator.evaluator.streaming(plan,null,null,new Function(){
                    long i = 0;
                    public MRData eval ( final MRData data ) {
                        try {
                            Evaluator.evaluator.dump_text(file+"/f"+(i++),qt,data);
                            return data;
                        } catch (Exception ex) {
                            throw new Error("Cannot dump the streaming result to the directory "+file);
                        }
                    }
                });
        case incr(`e): // incremental stream processing
            Config.incremental = true;
            reset();
            Tree plan = translate_expression(e);
            Config.incremental = false;
            if (plan == null)
                return;
            Evaluator.evaluator.initialize_query();
            match plan {
            case incr(`zero,lambda(`state,`merge),lambda(_,`answer)):
                final Tree qt = make_persistent_type(query_type);
                MRData z = evalE(zero,null);
                final Environment env = bind_list(state,z,null);
                final Tree ans = answer;
                merge = Streaming.streamify(merge);
                // after each merge step, re-evaluate the answer over the
                // updated state and print it
                Evaluator.evaluator.streaming(#<lambda(`state,`merge)>,env,null,
                        new Function() {
                            public MRData eval ( final MRData state ) {
                                MRData a = Evaluator.evaluator.evalE(ans,env);
                                System.out.println(print(a,qt));
                                return state;
                            }
                        });
            };
        case lineage(`e):
            Config.lineage = true;
            boolean quiet = Config.quiet_execution;
            Config.quiet_execution = true;
            long t = System.currentTimeMillis();
            // NOTE(review): quiet_execution was just forced true, so this
            // println can never fire; possibly the test should use `quiet`
            if (expression(e,false) != null && !Config.quiet_execution)
                System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs");
            Config.lineage = false;
            Config.quiet_execution = quiet;
        case debug(`path,`e):
            Config.debug = true;
            boolean quiet = Config.quiet_execution;
            Config.quiet_execution = true;
            long t = System.currentTimeMillis();
            dump(path.stringValue(),e);
            if (!Config.quiet_execution)
                System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs");
            Config.debug = false;
            Config.quiet_execution = quiet;
        case dump(`s,`e):
            long t = System.currentTimeMillis();
            dump(s.stringValue(),e);
            if (!Config.quiet_execution)
                System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs");
        case dump_text(`s,`e):
            long t = System.currentTimeMillis();
            dump_text(s.stringValue(),e);
            if (!Config.quiet_execution)
                System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs");
        case typedef(`v,`t): typedef(v.toString(),t);
        case datadef(`v,`t): datadef(v.toString(),t);
        case functiondef(`f,params(...p),`tp,`e):
            functiondef(f.toString(),p,tp,e);
        case macrodef(`name,params(...p),`e):
            Translator.global_macros.insert(name.toString(),#<macro(params(...p),`e)>);
        case aggregation(`aggr,`type,`plus,`zero,`unit):
            aggregation(aggr.toString(),type,plus,zero,unit);
        case import(`c):
            ClassImporter.importClass(c.variableValue());
        case import(`c,...l):
            for (Tree m: l)
                ClassImporter.importMethod(c.variableValue(),m.variableValue());
        case include(`file):
            Main.include_file(file.toString());
        case parser(`n,`p):
            // register a user-provided Parser implementation under name n
            try {
                Class<? extends Parser> c = Class.forName(p.toString()).asSubclass(Parser.class);
                DataSource.parserDirectory.put(n.toString(),c);
            } catch (ClassNotFoundException e) {
                throw new Error("Class "+p.toString()+" not found");
            }
        case impure(`fn): // not used
            impure_functions = impure_functions.append(fn);
        case _: throw new Error("Unknown statement: "+expr);
        }
    }
}