| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.mrql; |
| |
| import org.apache.mrql.gen.*; |
| import java.util.List; |
| import java.io.*; |
| import org.apache.hadoop.io.*; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hama.HamaConfiguration; |
| |
| |
| /** Evaluate a physical plan in BSP mode using Apache Hama */ |
| final public class BSPEvaluator extends Evaluator { |
| |
| /** initialize the BSP evaluator */ |
| final public void init ( Configuration conf ) { |
| Config.bsp_mode = true; |
| if (Config.hadoop_mode) |
| if (Config.local_mode) { |
| conf.set("bsp.master.address","local"); |
| conf.set("hama.zookeeper.quorum","localhost"); |
| conf.setInt("bsp.local.tasks.maximum",Config.nodes); |
| conf.set("fs.default.name","file:///"); |
| } else { |
| conf.set("bsp.master.address",System.getenv("BSP_MASTER_ADDRESS")); |
| conf.set("hama.zookeeper.quorum",System.getenv("HAMA_ZOOKEEPER_QUORUM")); |
| conf.setInt("bsp.local.tasks.maximum",Config.nodes); |
| conf.set("fs.default.name",System.getenv("FS_DEFAULT_NAME")); |
| } |
| } |
| |
| final public void initialize_query () { |
| } |
| |
| /** shutdown the BSP evaluator */ |
| final public void shutdown ( Configuration conf ) { |
| } |
| |
| final public Configuration new_configuration () { |
| return new HamaConfiguration(); |
| } |
| |
| final public MR_bool synchronize ( MR_string peerName, MR_bool mr_exit ) { |
| if (Config.hadoop_mode) |
| return BSPPlan.BSPop.synchronize(peerName,mr_exit); |
| else return mr_exit; |
| } |
| |
| final public Bag distribute ( MR_string peerName, Bag s ) { |
| if (Config.hadoop_mode) |
| return BSPPlan.BSPop.distribute(peerName,s); |
| else return s; |
| } |
| |
| /** Evaluate MRQL physical operators using BSP |
| * @param plan the physical plan |
| * @param env contains bindings fro variables to values (MRData) |
| * @return a DataSet (stored in HDFS) |
| */ |
| final public MRData bsp ( Tree plan, Environment env ) throws Exception { |
| match plan { |
| case BSP(tuple(...ns),`superstep,`init_state,`o,...S): |
| int[] os = new int[ns.length()]; |
| for ( int i = 0; i < os.length; i++ ) |
| os[i] = (int)((LongLeaf)ns.nth(i)).value(); |
| DataSet ds = eval(S.head(),env); |
| for ( Tree s: S.tail() ) |
| ds.merge(eval(s,env)); |
| return BSPPlan.BSP(os, |
| closure(superstep,env), |
| init_state, |
| o.equals(#<true>), |
| ds); |
| case BSP(`n,`superstep,`init_state,`o,...S): |
| DataSet ds = eval(S.head(),env); |
| for ( Tree s: S.tail() ) |
| ds.merge(eval(s,env)); |
| return BSPPlan.BSP(new int[]{(int)((LongLeaf)n).value()}, |
| closure(superstep,env), |
| init_state, |
| o.equals(#<true>), |
| ds); |
| } |
| throw new Error("Cannot perform the BSP plan: "+plan); |
| } |
| |
| /** return the FileInputFormat for parsed files (CSV, XML, JSON, etc) */ |
| final public Class<? extends MRQLFileInputFormat> parsedInputFormat () { |
| return BSPParsedInputFormat.class; |
| } |
| |
| /** return the FileInputFormat for binary files */ |
| final public Class<? extends MRQLFileInputFormat> binaryInputFormat () { |
| return BSPBinaryInputFormat.class; |
| } |
| |
| /** return the FileInputFormat for data generator files */ |
| final public Class<? extends MRQLFileInputFormat> generatorInputFormat () { |
| return BSPGeneratorInputFormat.class; |
| } |
| |
| /** The Aggregate physical operator |
| * @param acc_fnc the accumulator function from (T,T) to T |
| * @param zero the zero element of type T |
| * @param plan the plan that constructs the dataset that contains the bag of values {T} |
| * @param env contains bindings fro variables to values (MRData) |
| * @return the aggregation result of type T |
| */ |
| final public MRData aggregate ( Tree acc_fnc, |
| Tree zero, |
| Tree plan, |
| Environment env ) throws Exception { |
| match plan { |
| case BSP(`n,`superstep,`init_state,`o,...S): |
| DataSet ds = eval(S.head(),env,"-"); |
| for ( Tree s: S.tail() ) |
| ds.merge(eval(s,env,"-")); |
| return BSPPlan.BSPaggregate((int)((LongLeaf)n).value(), |
| closure(superstep,env), |
| init_state, |
| closure(acc_fnc,env), |
| zero, |
| ds); |
| } |
| throw new Error("Cannot perform the aggregation: "+plan); |
| } |
| |
| /** Evaluate a loop a fixed # of times */ |
| final public Tuple loop ( Tree e, Environment env ) throws Exception { |
| match e { |
| case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num): |
| int limit = ((MR_int)evalE(num,env)).get(); |
| MR_dataset[] s = new MR_dataset[vs.length()]; |
| for ( int i = 0; i < vs.length(); i++ ) |
| s[i] = new MR_dataset(eval(ss.nth(i),env,"-")); |
| for ( int n = 0; n < limit; n++ ) { |
| Environment nenv = env; |
| for ( int i = 0; i < vs.length(); i ++ ) |
| nenv = new Environment(vs.nth(i).toString(),s[i],nenv); |
| for ( int i = 0; i < vs.length(); i ++ ) |
| s[i] = new MR_dataset(eval(bs.nth(i),nenv,"-")); |
| }; |
| return new Tuple(s); |
| }; |
| throw new Error("Wrong Loop format"); |
| } |
| |
| final public DataSet eval ( final Tree e, |
| final Environment env, |
| final String counter ) { |
| return eval(e,env); |
| } |
| |
| /** Evaluate MRQL BSP physical operators using Hama (returns a DataSet) |
| * and print tracing info |
| * @param e the physical plan |
| * @param env contains bindings fro variables to values (MRData) |
| * @return a DataSet (stored in HDFS) |
| */ |
| final public DataSet eval ( final Tree e, final Environment env ) { |
| if (Config.trace_execution) { |
| tab_count += 3; |
| System.out.println(tabs(tab_count)+print_query(e)); |
| }; |
| DataSet res = evalD(e,env); |
| if (Config.trace_execution) |
| try { |
| System.out.println(tabs(tab_count) |
| +"-> "+res.take(Config.max_bag_size_print)); |
| tab_count -= 3; |
| } catch (Exception ex) { |
| throw new Error("Cannot collect the operator output: "+e); |
| }; |
| return res; |
| } |
| |
| /** Evaluate MRQL BSP physical operators using Hama (returns a DataSet) |
| * @param e the physical plan |
| * @param env contains bindings fro variables to values (MRData) |
| * @return a DataSet (stored in HDFS) |
| */ |
| final public DataSet evalD ( final Tree e, final Environment env ) { |
| try { |
| match e { |
| case BSPSource(`n,BinarySource(`file,_)): |
| return Plan.binarySource((int)((LongLeaf)n).value(),file.stringValue()); |
| case BSPSource(`n,ParsedSource(`parser,`file,...args)): |
| Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString()); |
| if (p == null) |
| throw new Error("Unknown parser: "+parser); |
| return Plan.parsedSource((int)((LongLeaf)n).value(), |
| p,((MR_string)evalE(file,env)).get(),args); |
| case BSPSource(`n,Generator(`min,`max,`size)): |
| return Plan.generator((int)((LongLeaf)n).value(), |
| ((MR_long)evalE(min,env)).get(), |
| ((MR_long)evalE(max,env)).get(), |
| ((MR_long)evalE(size,env)).get()); |
| case BSPSource(`n,`v): |
| if (!v.is_variable()) |
| fail; |
| DataSet ds = evalD(v,env); |
| for ( DataSource s: ds.source ) |
| s.source_num = (int)((LongLeaf)n).value(); |
| return ds; |
| case Merge(`x,`y): |
| return Plan.merge(eval(x,env),eval(y,env)); |
| case Dump(`s): |
| return Plan.fileCache((Bag)evalE(s,env)); |
| case trace(`msg,`tp,`x): |
| long n = pre_trace(((MR_string)evalE(msg,env)).get()); |
| DataSet ds = evalD(x,env); |
| trace(n,tp,new MR_dataset(ds)); |
| return ds; |
| case apply(`f,`arg): |
| if (!f.is_variable()) |
| return ((MR_dataset)evalF(f,env).eval(evalE(arg))).dataset(); |
| MRData fnc = variable_lookup(f.toString(),global_env); |
| if (fnc == null) |
| throw new Error("Unknown function: "+f); |
| MRData t = evalE(arg,env); |
| if (!(t instanceof Tuple)) |
| throw new Error("Expected a tuple in function application: "+t); |
| return ((MR_dataset)((Lambda)fnc).lambda().eval(t)).dataset(); |
| case `v: |
| if (!v.is_variable()) |
| fail; |
| MRData x = variable_lookup(v.toString(),env); |
| if (x != null) |
| if (x instanceof MR_dataset) |
| return ((MR_dataset)x).dataset(); |
| x = variable_lookup(v.toString(),global_env); |
| if (x != null) |
| if (x instanceof MR_dataset) |
| return ((MR_dataset)x).dataset(); |
| throw new Error("Variable "+v+" is not bound"); |
| }; |
| MRData d = bsp(e,env); |
| if (d instanceof MR_dataset) |
| return ((MR_dataset)d).dataset(); |
| throw new Error("Cannot evaluate the BSP plan: "+e); |
| } catch (Error msg) { |
| if (!Config.trace) |
| throw new Error(msg.getMessage()); |
| System.err.println(msg.getMessage()); |
| throw new Error("Evaluation error in: "+print_query(e)); |
| } catch (Exception ex) { |
| System.err.println(ex.getMessage()); |
| ex.printStackTrace(); |
| throw new Error("Evaluation error in: "+print_query(e)); |
| } |
| } |
| } |