/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import java_cup.runtime.*;
import org.apache.mrql.gen.*;
import java.util.*;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.conf.Configuration;
/** Evaluates physical plans in Apache Hadoop MapReduce mode */
final public class MapReduceEvaluator extends Evaluator {
    /** initialize the MapReduce evaluator */
    final public void init ( Configuration conf ) {
        Config.map_reduce_mode = true;
        if (Config.hadoop_mode)
            if (Config.local_mode) {
                conf.set("mapred.job.tracker","local");
                conf.set("mapreduce.framework.name","local");
                conf.set("fs.defaultFS","file:///");  // doesn't work for hadoop 1.*
            } else {
                // guard against unset environment variables: System.getenv may return null
                String tracker = System.getenv("MAPRED_JOB_TRACKER");
                if (tracker != null && !tracker.equals(""))
                    conf.set("mapred.job.tracker",tracker);
                String fs = System.getenv("FS_DEFAULT_NAME");
                if (fs != null && !fs.equals(""))
                    conf.set("fs.defaultFS",fs);  // doesn't work for hadoop 1.*
            }
    }
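
    // Usage sketch (assumption, not part of this file): in cluster mode the two
    // environment variables read above would be exported before starting MRQL, e.g.
    //   export MAPRED_JOB_TRACKER=jobtracker-host:9001
    //   export FS_DEFAULT_NAME=hdfs://namenode-host:9000
    // where the host names and ports are illustrative placeholders.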
    /** shutdown the MapReduce evaluator */
    final public void shutdown ( Configuration conf ) {
    }

    /** initialize the query evaluation */
    final public void initialize_query () {
    }

    /** create a new evaluation configuration */
    final public Configuration new_configuration () {
        return new Configuration();
    }
    /** create a fresh configuration, keeping only the MRQL-specific properties */
    final public static Configuration clear_configuration ( Configuration old_conf ) {
        Configuration conf = new Configuration();
        evaluator.init(conf);
        // keep the current MRQL configuration properties
        for ( Map.Entry<String,String> entry: old_conf )
            if (entry.getKey().startsWith("mrql.") && entry.getValue() != null)
                conf.set(entry.getKey(),entry.getValue());
        // the old Configuration API doesn't have unset
        // conf.unset("mapred.input.dir.formats");  // need to reset MultipleInputs for local mode
        return conf;
    }
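
    // Usage sketch: only "mrql."-prefixed properties survive the reset, e.g.
    //   old_conf.set("mrql.example.flag","true");            // hypothetical property name
    //   Configuration conf = clear_configuration(old_conf);  // "mrql.example.flag" is kept
    // while Hadoop job properties set by a previous query are dropped.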
    /** return the FileInputFormat for parsed files (CSV, XML, JSON, etc) */
    final public Class<? extends MRQLFileInputFormat> parsedInputFormat () {
        return MapReduceParsedInputFormat.class;
    }

    /** return the FileInputFormat for binary files */
    final public Class<? extends MRQLFileInputFormat> binaryInputFormat () {
        return MapReduceBinaryInputFormat.class;
    }

    /** return the FileInputFormat for data generator files */
    final public Class<? extends MRQLFileInputFormat> generatorInputFormat () {
        return MapReduceGeneratorInputFormat.class;
    }
    /** Coerce a persistent collection to a Bag */
    public Bag toBag ( MRData data ) {
        try {
            if (data instanceof MR_dataset)
                return Plan.collect(((MR_dataset)data).dataset());
            return (Bag)data;
        } catch (Exception ex) {
            throw new Error("Cannot coerce "+data+" to a Bag: "+ex);
        }
    }
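
    // Usage sketch: toBag materializes a persistent result for in-memory iteration,
    //   Bag b = toBag(result);    // 'result' is a placeholder for any MRData value
    //   for ( MRData x: b ) ...   // iterate over the collected elements
    // assuming Bag is iterable over MRData, as elsewhere in MRQL.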
    /** The Aggregate physical operator
     * @param acc_fnc the accumulator function from (T,T) to T
     * @param zero the zero element of type T
     * @param plan the plan that constructs the dataset that contains the bag of values {T}
     * @param env contains bindings from variables to values (MRData)
     * @return the aggregation result of type T
     */
    final public MRData aggregate ( Tree acc_fnc,
                                    Tree zero,
                                    Tree plan,
                                    Environment env ) throws Exception {
        return MapReducePlan.aggregate(acc_fnc,zero,eval(plan,env,"-"));
    }
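
    // Example sketch: summing a bag of ints corresponds to an accumulator
    //   acc_fnc = \(x,y) -> x+y   with   zero = 0
    // so that aggregate folds acc_fnc over the DataSet built by the plan
    // (the lambda notation here is illustrative, not concrete MRQL syntax).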
    /** Evaluate a Loop plan a fixed number of times */
    final public Tuple loop ( Tree e, Environment env ) throws Exception {
        match e {
        case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
            int limit = ((MR_int)evalE(num,env)).get();
            MR_dataset[] s = new MR_dataset[vs.length()];
            // evaluate the initial bindings
            for ( int i = 0; i < vs.length(); i++ )
                s[i] = new MR_dataset(eval(ss.nth(i),env,"-"));
            for ( int n = 0; n < limit; n++ ) {
                // bind each loop variable to its current dataset
                Environment nenv = env;
                for ( int i = 0; i < vs.length(); i++ )
                    nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
                // re-evaluate each loop body under the new bindings
                for ( int i = 0; i < vs.length(); i++ )
                    s[i] = new MR_dataset(eval(bs.nth(i),nenv,"-"));
            };
            return new Tuple(s);
        };
        throw new Error("Wrong Loop format");
    }
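
    // Plan-shape sketch (names are placeholders): a single-variable loop
    //   Loop(lambda(tuple(x),tuple(step)),tuple(init),3)
    // first binds x to the dataset of init, then rebinds x to the dataset of
    // step three times, returning the final binding wrapped in a Tuple.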
    /** Evaluate an MRQL physical plan using Hadoop and print tracing info
     * @param e the physical plan
     * @param env contains bindings from variables to values (MRData)
     * @param counter the counter name used inside repeat loops ("-" if none)
     * @return a DataSet (stored in HDFS)
     */
    final public DataSet eval ( final Tree e,
                                final Environment env,
                                final String counter ) {
        if (Config.trace_execution) {
            tab_count += 3;
            System.out.println(tabs(tab_count)+print_query(e));
        };
        DataSet res = evalD(e,env,counter);
        if (Config.trace_execution)
            try {
                System.out.println(tabs(tab_count)
                                   +"-> "+res.take(Config.max_bag_size_print));
                tab_count -= 3;
            } catch (Exception ex) {
                throw new Error("Cannot collect the operator output: "+e);
            };
        return res;
    }
    /** Evaluate MRQL physical operators using Hadoop
     * @param e the physical plan
     * @param env contains bindings from variables to values (MRData)
     * @param counter the counter name used inside repeat loops ("-" if none)
     * @return a DataSet (stored in HDFS)
     */
    final DataSet evalD ( final Tree e,
                          final Environment env,
                          final String counter ) {
        try {
            match e {
            case cMap(`f,`s):
                return MapOperation.cMap(closure(f,env),null,null,eval(s,env,"-"),counter);
            case AggregateMap(`f,`acc,`zero,`s):
                return MapOperation.cMap(closure(f,env),closure(acc,env),
                                         (zero.equals(#<null>))?null:zero,
                                         eval(s,env,"-"),counter);
            case MapReduce(`m,`r,`s,`o):
                return MapReduceOperation.mapReduce(closure(m,env),#<null>,closure(r,env),
                                                    null,null,
                                                    eval(s,env,"-"),
                                                    Config.nodes,counter,
                                                    o.equals(#<true>));
            case MapAggregateReduce(`m,`r,`acc,`zero,`s,`o):
                return MapReduceOperation.mapReduce(closure(m,env),null,closure(r,env),
                                                    closure(acc,env),
                                                    (zero.equals(#<null>))?null:zero,
                                                    eval(s,env,"-"),
                                                    Config.nodes,counter,
                                                    o.equals(#<true>));
            case MapCombineReduce(`m,`c,`r,`s,`o):
                return MapReduceOperation.mapReduce(closure(m,env),closure(c,env),closure(r,env),
                                                    null,null,
                                                    eval(s,env,"-"),
                                                    Config.nodes,counter,
                                                    o.equals(#<true>));
            case CrossProduct(`mx,`my,`r,`x,`y):
                return CrossProductOperation.crossProduct(closure(mx,env),closure(my,env),closure(r,env),
                                                          null,null,
                                                          eval(x,env,"-"),
                                                          eval(y,env,"-"),
                                                          counter);
            case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y):
                return CrossProductOperation.crossProduct(closure(mx,env),closure(my,env),closure(r,env),
                                                          closure(acc,env),
                                                          (zero.equals(#<null>))?null:zero,
                                                          eval(x,env,"-"),
                                                          eval(y,env,"-"),
                                                          counter);
            case MapReduce2(`mx,`my,`r,`x,`y,`o):
                return eval(#<MapAggregateReduce2(`mx,`my,`r,null,null,`x,`y,`o)>,env,counter);
            case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o):
                return JoinOperation.mapReduce2(closure(mx,env),closure(my,env),
                                                closure(c,env),
                                                closure(r,env),
                                                null,null,
                                                eval(x,env,"-"),
                                                eval(y,env,"-"),
                                                Config.nodes,counter,
                                                o.equals(#<true>));
            case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,`o):
                return JoinOperation.mapReduce2(closure(mx,env),closure(my,env),
                                                null,
                                                closure(r,env),closure(acc,env),
                                                (zero.equals(#<null>))?null:zero,
                                                eval(x,env,"-"),
                                                eval(y,env,"-"),
                                                Config.nodes,counter,
                                                o.equals(#<true>));
            case GroupByJoin(`kx,`ky,`gx,`gy,`mp,`c,`r,`x,`y,`o):
                DataSet X = eval(x,env,"-");
                DataSet Y = eval(y,env,"-");
                // partition the join into an n*m grid, with n = m = floor(sqrt(#nodes))
                int n = (int)Math.floor(Math.sqrt(Config.nodes));
                int m = n;
                // System.err.println("Using a groupBy join on a "+n+"*"+m+" grid of partitions");
                return GroupByJoinPlan.groupByJoin(closure(kx,env),closure(ky,env),
                                                   closure(gx,env),closure(gy,env),
                                                   closure(mp,env),closure(c,env),closure(r,env),
                                                   X,Y,
                                                   Config.nodes,
                                                   n,m,counter);
            case MapJoin(`mx,`my,`r,`x,`y):
                return MapJoinOperation.mapJoin(closure(mx,env),closure(my,env),closure(r,env),
                                                null,null,
                                                eval(x,env,"-"),
                                                eval(y,env,"-"),
                                                counter);
            case MapAggregateJoin(`mx,`my,`r,`acc,`zero,`x,`y):
                return MapJoinOperation.mapJoin(closure(mx,env),closure(my,env),closure(r,env),
                                                closure(acc,env),
                                                (zero.equals(#<null>))?null:zero,
                                                eval(x,env,"-"),
                                                eval(y,env,"-"),
                                                counter);
            case BinarySource(`file,_):
                return Plan.binarySource(file.stringValue());
            case ParsedSource(`parser,`file,...args):
                Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
                if (p == null)
                    throw new Error("Unknown parser: "+parser);
                return Plan.parsedSource(p,((MR_string)evalE(file,env)).get(),args);
            case Merge(`x,`y):
                return Plan.merge(eval(x,env,"-"),eval(y,env,"-"));
            case Repeat(lambda(`v,`b),`s,`n):
                final String nm = v.toString();
                final Tree body = b;
                Function loop_fnc = new Function () {
                        public MRData eval ( MRData s ) {
                            return new MR_dataset(evaluator.eval(body,new Environment(nm,s,env),nm));
                        };
                    };
                return MapReducePlan.repeat(loop_fnc,eval(s,env,"-"),((MR_int)evalE(n,env)).get());
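            // Note: Repeat (above) and Closure (below) build the same loop function;
            // they differ in the stopping rule applied by MapReducePlan: repeat uses
            // the named counter to track per-step changes, while closure presumably
            // iterates until the dataset stabilizes, each bounded by n steps.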
            case Closure(lambda(`v,`b),`s,`n):
                final String nm = v.toString();
                final Tree body = b;
                Function loop_fnc = new Function () {
                        public MRData eval ( MRData s ) {
                            return new MR_dataset(evaluator.eval(body,new Environment(nm,s,env),"-"));
                        };
                    };
                return MapReducePlan.closure(loop_fnc,eval(s,env,"-"),((MR_int)evalE(n,env)).get());
            case Generator(`min,`max,`size):
                return Plan.generator(((MR_long)evalE(min,env)).get(),
                                      ((MR_long)evalE(max,env)).get(),
                                      ((MR_long)evalE(size,env)).get());
            case let(`v,`u,`body):
                return eval(body,new Environment(v.toString(),evalE(u,env),env),"-");
            case Let(`v,`u,`body):
                return eval(body,new Environment(v.toString(),new MR_dataset(eval(u,env,"-")),env),"-");
            case If(`c,`x,`y):
                if (((MR_bool)evalE(c,env)).get())
                    return eval(x,env,"-");
                else return eval(y,env,"-");
            case trace(`msg,`tp,`x):
                long n = pre_trace(((MR_string)evalE(msg,env)).get());
                DataSet ds = evalD(x,env,counter);
                if (!counter.equals("-"))   // if inside a repeat loop
                    match tp {
                    case `T(tuple(`etp,bool)):
                        tp = #<`T(`etp)>;
                    };
                trace(n,tp,new MR_dataset(ds));
                return ds;
            case apply(`f,`arg):
                if (!f.is_variable())
                    return ((MR_dataset)evalF(f,env).eval(evalE(arg,env))).dataset();
                MRData fnc = variable_lookup(f.toString(),global_env);
                if (fnc == null)
                    throw new Error("Unknown function: "+f);
                MRData t = evalE(arg,env);
                if (!(t instanceof Tuple))
                    throw new Error("Expected a tuple in function application: "+t);
                return ((MR_dataset)((Lambda)fnc).lambda().eval(t)).dataset();
            case nth(`u,`n):
                MRData x = ((Tuple)evalE(u,env)).get((int)n.longValue());
                if (x instanceof MR_dataset)
                    return ((MR_dataset)x).dataset();
                throw new Error("Evaluation error in: "+print_query(e));
            case `v:
                if (!v.is_variable())
                    fail;
                MRData x = variable_lookup(v.toString(),env);
                if (x != null && x instanceof MR_dataset)
                    return ((MR_dataset)x).dataset();
                x = variable_lookup(v.toString(),global_env);
                if (x != null && x instanceof MR_dataset)
                    return ((MR_dataset)x).dataset();
                throw new Error("Variable "+v+" is not bound");
            };
            throw new Error("Cannot evaluate the map-reduce plan: "+e);
        } catch (Error msg) {
            if (!Config.trace)
                throw new Error(msg.getMessage());
            System.err.println(msg.getMessage());
            throw new Error("Evaluation error in: "+print_query(e));
        } catch (Exception ex) {
            System.err.println(ex.getMessage());
            ex.printStackTrace();
            throw new Error("Evaluation error in: "+print_query(e));
        }
    }
}