| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.mrql; |
| |
| import java_cup.runtime.*; |
| import org.apache.mrql.gen.*; |
| import java.util.*; |
| import java.io.*; |
| import org.apache.hadoop.io.*; |
| import org.apache.hadoop.conf.Configuration; |
| |
| |
| /** Evaluates physical plans in Apache Hadoop MapReduce mode */ |
| final public class MapReduceEvaluator extends Evaluator { |
| |
| /** initialize the MapReduce evaluator */ |
| final public void init ( Configuration conf ) { |
| Config.map_reduce_mode = true; |
| if (Config.hadoop_mode) |
| if (Config.local_mode) { |
| conf.set("mapred.job.tracker","local"); |
| conf.set("mapreduce.framework.name","local"); |
| conf.set("fs.defaultFS","file:///"); // doesn't work for hadoop 1.* |
| } else { |
| if (!System.getenv("MAPRED_JOB_TRACKER").equals("")) |
| conf.set("mapred.job.tracker",System.getenv("MAPRED_JOB_TRACKER")); |
| if (!System.getenv("FS_DEFAULT_NAME").equals("")) |
| conf.set("fs.defaultFS",System.getenv("FS_DEFAULT_NAME")); // doesn't work for hadoop 1.* |
| } |
| } |
| |
| /** shutdown the MapReduce evaluator */ |
| final public void shutdown ( Configuration conf ) { |
| } |
| |
| /** initialize the query evaluation */ |
| final public void initialize_query () { |
| } |
| |
| /** create a new evaluation configuration */ |
| final public Configuration new_configuration () { |
| return new Configuration(); |
| } |
| |
| final public static Configuration clear_configuration ( Configuration old_conf ) { |
| Configuration conf = new Configuration(); |
| evaluator.init(conf); |
| // keep the current mrql configurations |
| for ( Map.Entry<String,String> entry: old_conf ) |
| if (entry.getKey().startsWith("mrql.") && entry.getValue() != null) |
| conf.set(entry.getKey(),entry.getValue()); |
| // old Configuration doesn't have unset |
| // conf.unset("mapred.input.dir.formats"); // need to reset MultipleInputs for local mode |
| return conf; |
| } |
| |
| /** return the FileInputFormat for parsed files (CSV, XML, JSON, etc) */ |
| final public Class<? extends MRQLFileInputFormat> parsedInputFormat () { |
| return MapReduceParsedInputFormat.class; |
| } |
| |
| /** return the FileInputFormat for binary files */ |
| final public Class<? extends MRQLFileInputFormat> binaryInputFormat () { |
| return MapReduceBinaryInputFormat.class; |
| } |
| |
| /** return the FileInputFormat for data generator files */ |
| final public Class<? extends MRQLFileInputFormat> generatorInputFormat () { |
| return MapReduceGeneratorInputFormat.class; |
| } |
| |
| /** Coerce a persistent collection to a Bag */ |
| public Bag toBag ( MRData data ) { |
| try { |
| if (data instanceof MR_dataset) |
| return Plan.collect(((MR_dataset)data).dataset()); |
| return (Bag)data; |
| } catch (Exception ex) { |
| throw new Error("Cannot coerce "+data+" to a Bag: "+ex); |
| } |
| } |
| |
| /** The Aggregate physical operator |
| * @param acc_fnc the accumulator function from (T,T) to T |
| * @param zero the zero element of type T |
| * @param plan the plan that constructs the dataset that contains the bag of values {T} |
| * @param env contains bindings fro variables to values (MRData) |
| * @return the aggregation result of type T |
| */ |
| final public MRData aggregate ( Tree acc_fnc, |
| Tree zero, |
| Tree plan, |
| Environment env ) throws Exception { |
| return MapReducePlan.aggregate(acc_fnc,zero,eval(plan,env,"-")); |
| } |
| |
| /** Evaluate a loop a fixed # of times */ |
| final public Tuple loop ( Tree e, Environment env ) throws Exception { |
| match e { |
| case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num): |
| int limit = ((MR_int)evalE(num,env)).get(); |
| MR_dataset[] s = new MR_dataset[vs.length()]; |
| for ( int i = 0; i < vs.length(); i++ ) |
| s[i] = new MR_dataset(eval(ss.nth(i),env,"-")); |
| for ( int n = 0; n < limit; n++ ) { |
| Environment nenv = env; |
| for ( int i = 0; i < vs.length(); i ++ ) |
| nenv = new Environment(vs.nth(i).toString(),s[i],nenv); |
| for ( int i = 0; i < vs.length(); i ++ ) |
| s[i] = new MR_dataset(eval(bs.nth(i),nenv,"-")); |
| }; |
| return new Tuple(s); |
| }; |
| throw new Error("Wrong Loop format"); |
| } |
| |
| /** Evaluate an MRQL physical plan using Hadoop and print tracing info |
| * @param e the physical plan |
| * @param env contains bindings fro variables to values (MRData) |
| * @return a DataSet (stored in HDFS) |
| */ |
| final public DataSet eval ( final Tree e, |
| final Environment env, |
| final String counter ) { |
| if (Config.trace_execution) { |
| tab_count += 3; |
| System.out.println(tabs(tab_count)+print_query(e)); |
| }; |
| DataSet res = evalD(e,env,counter); |
| if (Config.trace_execution) |
| try { |
| System.out.println(tabs(tab_count) |
| +"-> "+res.take(Config.max_bag_size_print)); |
| tab_count -= 3; |
| } catch (Exception ex) { |
| throw new Error("Cannot collect the operator output: "+e); |
| }; |
| return res; |
| } |
| |
| /** Evaluate MRQL physical operators using Hadoop |
| * @param e the physical plan |
| * @param env contains bindings fro variables to values (MRData) |
| * @return a DataSet (stored in HDFS) |
| */ |
| final DataSet evalD ( final Tree e, |
| final Environment env, |
| final String counter ) { |
| try { |
| match e { |
| case cMap(`f,`s): |
| return MapOperation.cMap(closure(f,env),null,null,eval(s,env,"-"),counter); |
| case AggregateMap(`f,`acc,`zero,`s): |
| return MapOperation.cMap(closure(f,env),closure(acc,env), |
| (zero.equals(#<null>))?null:zero, |
| eval(s,env,"-"),counter); |
| case MapReduce(`m,`r,`s,`o): |
| return MapReduceOperation.mapReduce(closure(m,env),#<null>,closure(r,env), |
| null,null, |
| eval(s,env,"-"), |
| Config.nodes,counter, |
| o.equals(#<true>)); |
| case MapAggregateReduce(`m,`r,`acc,`zero,`s,`o): |
| return MapReduceOperation.mapReduce(closure(m,env),null,closure(r,env), |
| closure(acc,env), |
| (zero.equals(#<null>))?null:zero, |
| eval(s,env,"-"), |
| Config.nodes,counter, |
| o.equals(#<true>)); |
| case MapCombineReduce(`m,`c,`r,`s,`o): |
| return MapReduceOperation.mapReduce(closure(m,env),closure(c,env),closure(r,env), |
| null,null, |
| eval(s,env,"-"), |
| Config.nodes,counter, |
| o.equals(#<true>)); |
| case CrossProduct(`mx,`my,`r,`x,`y): |
| return CrossProductOperation.crossProduct(closure(mx,env),closure(my,env),closure(r,env), |
| null,null, |
| eval(x,env,"-"), |
| eval(y,env,"-"), |
| counter); |
| case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y): |
| return CrossProductOperation.crossProduct(closure(mx,env),closure(my,env),closure(r,env), |
| closure(acc,env), |
| (zero.equals(#<null>))?null:zero, |
| eval(x,env,"-"), |
| eval(y,env,"-"), |
| counter); |
| case MapReduce2(`mx,`my,`r,`x,`y,`o): |
| return eval(#<MapAggregateReduce2(`mx,`my,`r,null,null,`x,`y,`o)>,env,counter); |
| case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o): |
| return JoinOperation.mapReduce2(closure(mx,env),closure(my,env), |
| closure(c,env), |
| closure(r,env), |
| null,null, |
| eval(x,env,"-"), |
| eval(y,env,"-"), |
| Config.nodes,counter, |
| o.equals(#<true>)); |
| case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,`o): |
| return JoinOperation.mapReduce2(closure(mx,env),closure(my,env), |
| null, |
| closure(r,env),closure(acc,env), |
| (zero.equals(#<null>))?null:zero, |
| eval(x,env,"-"), |
| eval(y,env,"-"), |
| Config.nodes,counter, |
| o.equals(#<true>)); |
| case GroupByJoin(`kx,`ky,`gx,`gy,`mp,`c,`r,`x,`y,`o): |
| DataSet X = eval(x,env,"-"); |
| DataSet Y = eval(y,env,"-"); |
| int n = (int)Math.floor(Math.sqrt(Config.nodes)); |
| int m = n; |
| // System.err.println("Using a groupBy join on a "+n+"*"+m+" grid of partitions"); |
| return GroupByJoinPlan.groupByJoin(closure(kx,env),closure(ky,env), |
| closure(gx,env),closure(gy,env), |
| closure(mp,env),closure(c,env),closure(r,env), |
| X,Y, |
| Config.nodes, |
| n,m,counter); |
| case MapJoin(`mx,`my,`r,`x,`y): |
| return MapJoinOperation.mapJoin(closure(mx,env),closure(my,env),closure(r,env), |
| null,null, |
| eval(x,env,"-"), |
| eval(y,env,"-"), |
| counter); |
| case MapAggregateJoin(`mx,`my,`r,`acc,`zero,`x,`y): |
| return MapJoinOperation.mapJoin(closure(mx,env),closure(my,env),closure(r,env), |
| closure(acc,env), |
| (zero.equals(#<null>))?null:zero, |
| eval(x,env,"-"), |
| eval(y,env,"-"), |
| counter); |
| case BinarySource(`file,_): |
| return Plan.binarySource(file.stringValue()); |
| case ParsedSource(`parser,`file,...args): |
| Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString()); |
| if (p == null) |
| throw new Error("Unknown parser: "+parser); |
| return Plan.parsedSource(p,((MR_string)evalE(file,env)).get(),args); |
| case Merge(`x,`y): |
| return Plan.merge(eval(x,env,"-"),eval(y,env,"-")); |
| case Repeat(lambda(`v,`b),`s,`n): |
| final String nm = v.toString(); |
| final Tree body = b; |
| Function loop_fnc = new Function () { |
| public MRData eval ( MRData s ) { |
| return new MR_dataset(evaluator.eval(body,new Environment(nm,s,env),nm)); |
| }; }; |
| return MapReducePlan.repeat(loop_fnc,eval(s,env,"-"),((MR_int)evalE(n,env)).get()); |
| case Closure(lambda(`v,`b),`s,`n): |
| final String nm = v.toString(); |
| final Tree body = b; |
| Function loop_fnc = new Function () { |
| public MRData eval ( MRData s ) { |
| return new MR_dataset(evaluator.eval(body,new Environment(nm,s,env),"-")); |
| }; }; |
| return MapReducePlan.closure(loop_fnc,eval(s,env,"-"),((MR_int)evalE(n,env)).get()); |
| case Generator(`min,`max,`size): |
| return Plan.generator(((MR_long)evalE(min,env)).get(), |
| ((MR_long)evalE(max,env)).get(), |
| ((MR_long)evalE(size,env)).get()); |
| case let(`v,`u,`body): |
| return eval(body,new Environment(v.toString(),evalE(u,env),env),"-"); |
| case Let(`v,`u,`body): |
| return eval(body,new Environment(v.toString(),new MR_dataset(eval(u,env,"-")),env),"-"); |
| case If(`c,`x,`y): |
| if (((MR_bool)evalE(c,env)).get()) |
| return eval(x,env,"-"); |
| else return eval(y,env,"-"); |
| case trace(`msg,`tp,`x): |
| long n = pre_trace(((MR_string)evalE(msg,env)).get()); |
| DataSet ds = evalD(x,env,counter); |
| if (!counter.equals("-")) // if inside a repeat loop |
| match tp { |
| case `T(tuple(`etp,bool)): |
| tp = #<`T(`etp)>; |
| }; |
| trace(n,tp,new MR_dataset(ds)); |
| return ds; |
| case apply(`f,`arg): |
| if (!f.is_variable()) |
| return ((MR_dataset)evalF(f,env).eval(evalE(arg))).dataset(); |
| MRData fnc = variable_lookup(f.toString(),global_env); |
| if (fnc == null) |
| throw new Error("Unknown function: "+f); |
| MRData t = evalE(arg,env); |
| if (!(t instanceof Tuple)) |
| throw new Error("Expected a tuple in function application: "+t); |
| return ((MR_dataset)((Lambda)fnc).lambda().eval(t)).dataset(); |
| case nth(`u,`n): |
| MRData x = ((Tuple)evalE(u,env)).get((int)n.longValue()); |
| if (x instanceof MR_dataset) |
| return ((MR_dataset)x).dataset(); |
| throw new Error("Evaluation error in: "+print_query(e)); |
| case `v: |
| if (!v.is_variable()) |
| fail; |
| MRData x = variable_lookup(v.toString(),env); |
| if (x != null) |
| if (x instanceof MR_dataset) |
| return ((MR_dataset)x).dataset(); |
| x = variable_lookup(v.toString(),global_env); |
| if (x != null) |
| if (x instanceof MR_dataset) |
| return ((MR_dataset)x).dataset(); |
| throw new Error("Variable "+v+" is not bound"); |
| }; |
| throw new Error("Cannot evaluate the map-reduce plan: "+e); |
| } catch (Error msg) { |
| if (!Config.trace) |
| throw new Error(msg.getMessage()); |
| System.err.println(msg.getMessage()); |
| throw new Error("Evaluation error in: "+print_query(e)); |
| } catch (Exception ex) { |
| System.err.println(ex.getMessage()); |
| ex.printStackTrace(); |
| throw new Error("Evaluation error in: "+print_query(e)); |
| } |
| } |
| } |