blob: 27d9651ef5b53cb2b2c5864540a3a5f06bd10011 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import org.apache.mrql.gen.*;
import java.util.List;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hama.HamaConfiguration;
/** Evaluate a physical plan in BSP mode using Apache Hama */
final public class BSPEvaluator extends Evaluator {
/** Initialize the BSP evaluator.
 *  Always switches MRQL into BSP mode. In Hadoop mode it also fills in the
 *  "bsp.master.address", "hama.zookeeper.quorum", "bsp.local.tasks.maximum"
 *  and "fs.defaultFS" properties: with fixed local values in local mode,
 *  otherwise from the BSP_MASTER_ADDRESS, HAMA_ZOOKEEPER_QUORUM and
 *  FS_DEFAULT_NAME environment variables.
 * @param conf the configuration to populate (modified in place)
 */
final public void init ( Configuration conf ) {
    Config.bsp_mode = true;
    if (!Config.hadoop_mode)
        return;
    if (Config.local_mode) {
        conf.set("bsp.master.address","local");
        conf.set("hama.zookeeper.quorum","localhost");
        conf.setInt("bsp.local.tasks.maximum",Config.nodes);
        conf.set("fs.defaultFS","file:///");
    } else {
        conf.set("bsp.master.address",System.getenv("BSP_MASTER_ADDRESS"));
        conf.set("hama.zookeeper.quorum",System.getenv("HAMA_ZOOKEEPER_QUORUM"));
        conf.setInt("bsp.local.tasks.maximum",Config.nodes);
        // System.getenv returns null for an undefined variable; treat an unset
        // FS_DEFAULT_NAME like an empty one and keep the configured default
        final String fs_default_name = System.getenv("FS_DEFAULT_NAME");
        if (fs_default_name != null && !fs_default_name.equals(""))
            conf.set("fs.defaultFS",fs_default_name);
    }
}
/** Per-query initialization hook; this evaluator performs no work here. */
public final void initialize_query ()
{
    // intentionally empty
}
/** Shut down the BSP evaluator; this evaluator performs no work here.
 * @param conf the configuration in use (not read)
 */
public final void shutdown ( Configuration conf )
{
    // intentionally empty
}
/** Create the configuration object used by this evaluator.
 * @return a newly constructed HamaConfiguration
 */
public final Configuration new_configuration () {
    final Configuration fresh = new HamaConfiguration();
    return fresh;
}
/** Synchronize the given peer.
 *  Outside Hadoop mode this is a no-op that hands back mr_exit unchanged;
 *  in Hadoop mode the call is delegated to BSPPlan.BSPop.synchronize.
 * @param peerName the name of the peer
 * @param mr_exit the exit flag of the peer
 * @return the result of the delegated call, or mr_exit itself outside Hadoop mode
 */
public final MR_bool synchronize ( MR_string peerName, MR_bool mr_exit ) {
    if (!Config.hadoop_mode)
        return mr_exit;
    return BSPPlan.BSPop.synchronize(peerName,mr_exit);
}
/** Distribute a bag for the given peer.
 *  Outside Hadoop mode this is a no-op that hands back s unchanged;
 *  in Hadoop mode the call is delegated to BSPPlan.BSPop.distribute.
 * @param peerName the name of the peer
 * @param s the bag to distribute
 * @return the result of the delegated call, or s itself outside Hadoop mode
 */
public final Bag distribute ( MR_string peerName, Bag s ) {
    if (!Config.hadoop_mode)
        return s;
    return BSPPlan.BSPop.distribute(peerName,s);
}
/** Evaluate an MRQL BSP physical operator.
 *  Two plan shapes are accepted: one whose first component is a tuple of
 *  source numbers and one whose first component is a single source number.
 *  In both, the remaining inputs S are evaluated and merged into one DataSet
 *  that is handed to BSPPlan.BSP together with the superstep closure.
 * @param plan the physical plan
 * @param env contains bindings from variables to values (MRData)
 * @return the result of BSPPlan.BSP
 * @throws Error if the plan matches neither BSP shape
 */
public final MRData bsp ( Tree plan, Environment env ) throws Exception {
    match plan {
    case BSP(tuple(...ns),`superstep,`init_state,`o,...S):
        // one source number per element of the tuple, in order
        final int[] source_nums = new int[ns.length()];
        int pos = 0;
        for ( Tree num: ns )
            source_nums[pos++] = (int)((LongLeaf)num).value();
        // the first input seeds the dataset; the others are merged into it
        DataSet input = eval(S.head(),env);
        for ( Tree rest: S.tail() )
            input.merge(eval(rest,env));
        return BSPPlan.BSP(source_nums,
                           closure(superstep,env),
                           init_state,
                           o.equals(#<true>),
                           input);
    case BSP(`n,`superstep,`init_state,`o,...S):
        // the first input seeds the dataset; the others are merged into it
        DataSet input = eval(S.head(),env);
        for ( Tree rest: S.tail() )
            input.merge(eval(rest,env));
        return BSPPlan.BSP(new int[]{ (int)((LongLeaf)n).value() },
                           closure(superstep,env),
                           init_state,
                           o.equals(#<true>),
                           input);
    }
    throw new Error("Cannot perform the BSP plan: "+plan);
}
/** The input format this evaluator uses for parsed files (CSV, XML, JSON, etc).
 * @return BSPParsedInputFormat.class
 */
public final Class<? extends MRQLFileInputFormat> parsedInputFormat ()
{
    return BSPParsedInputFormat.class;
}
/** The input format this evaluator uses for binary files.
 * @return BSPBinaryInputFormat.class
 */
public final Class<? extends MRQLFileInputFormat> binaryInputFormat ()
{
    return BSPBinaryInputFormat.class;
}
/** The input format this evaluator uses for data generator files.
 * @return BSPGeneratorInputFormat.class
 */
public final Class<? extends MRQLFileInputFormat> generatorInputFormat ()
{
    return BSPGeneratorInputFormat.class;
}
/** Coerce a persistent collection to a Bag.
 *  An MR_dataset is collected through Plan.collect; any other value is
 *  cast to Bag as is.
 * @param data the value to coerce
 * @return the value as a Bag
 * @throws Error if collecting the dataset fails; the underlying exception
 *         is attached as the cause
 */
public Bag toBag ( MRData data ) {
    if (!(data instanceof MR_dataset))
        return (Bag)data;
    try {
        return Plan.collect(((MR_dataset)data).dataset());
    } catch (Exception ex) {
        // keep the original failure so the real reason is not lost
        throw new Error("Cannot coerce "+data+" to a Bag",ex);
    }
}
/** The Aggregate physical operator.
 *  Only plans of the form BSP(n,superstep,init_state,o,...S) are handled:
 *  the inputs S are evaluated and merged, then passed to BSPPlan.BSPaggregate.
 * @param acc_fnc the accumulator function from (T,T) to T
 * @param zero the zero element of type T
 * @param plan the plan that constructs the dataset that contains the bag of values {T}
 * @param env contains bindings from variables to values (MRData)
 * @return the aggregation result of type T
 * @throws Error if the plan is not a BSP plan of the expected form
 */
public final MRData aggregate ( Tree acc_fnc,
                                Tree zero,
                                Tree plan,
                                Environment env ) throws Exception {
    match plan {
    case BSP(`n,`superstep,`init_state,`o,...S):
        // the first input seeds the dataset; the others are merged into it
        DataSet input = eval(S.head(),env,"-");
        for ( Tree rest: S.tail() )
            input.merge(eval(rest,env,"-"));
        return BSPPlan.BSPaggregate((int)((LongLeaf)n).value(),
                                    closure(superstep,env),
                                    init_state,
                                    closure(acc_fnc,env),
                                    zero,
                                    input);
    }
    throw new Error("Cannot perform the aggregation: "+plan);
}
/** Evaluate a loop a fixed number of times.
 *  The loop variables vs are first bound to the datasets produced by the
 *  initial plans ss; each iteration then re-evaluates every body in bs
 *  under those bindings and replaces the corresponding dataset.
 * @param e the Loop expression
 * @param env contains bindings from variables to values (MRData)
 * @return a tuple holding the final dataset of every loop variable
 * @throws Error if e is not a Loop of the expected form
 */
public final Tuple loop ( Tree e, Environment env ) throws Exception {
    match e {
    case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
        final int iterations = ((MR_int)evalE(num,env)).get();
        final int arity = vs.length();
        final MR_dataset[] state = new MR_dataset[arity];
        for ( int i = 0; i < arity; i++ )
            state[i] = new MR_dataset(eval(ss.nth(i),env,"-"));
        for ( int step = 0; step < iterations; step++ ) {
            // bind every loop variable to its current dataset before any
            // body is evaluated, so all bodies see the previous iteration
            Environment bound = env;
            for ( int i = 0; i < arity; i++ )
                bound = new Environment(vs.nth(i).toString(),state[i],bound);
            for ( int i = 0; i < arity; i++ )
                state[i] = new MR_dataset(eval(bs.nth(i),bound,"-"));
        }
        return new Tuple(state);
    };
    throw new Error("Wrong Loop format");
}
/** Evaluate a physical plan; the counter argument is ignored and the call
 *  is forwarded to eval(e,env).
 * @param e the physical plan
 * @param env contains bindings from variables to values (MRData)
 * @param counter unused by this evaluator
 * @return the DataSet produced by eval(e,env)
 */
public final DataSet eval ( final Tree e,
                            final Environment env,
                            final String counter )
{
    final DataSet result = eval(e,env);
    return result;
}
/** Evaluate MRQL BSP physical operators (returns a DataSet)
 *  and print tracing info.
 *  When Config.trace_execution is set, the plan is printed before the
 *  evaluation and a sample of the result afterwards, both indented by the
 *  current trace depth. The depth is restored even when the evaluation
 *  fails, so a failed query does not shift the indentation of later traces.
 * @param e the physical plan
 * @param env contains bindings from variables to values (MRData)
 * @return the DataSet produced by evalD
 * @throws Error if the result sample cannot be collected for tracing
 */
final public DataSet eval ( final Tree e, final Environment env ) {
    // read the flag once so that the increment and decrement stay paired
    final boolean tracing = Config.trace_execution;
    if (tracing) {
        tab_count += 3;
        System.out.println(tabs(tab_count)+print_query(e));
    }
    try {
        DataSet res = evalD(e,env);
        if (tracing)
            try {
                System.out.println(tabs(tab_count)
                                   +"-> "+res.take(Config.max_bag_size_print));
            } catch (Exception ex) {
                throw new Error("Cannot collect the operator output: "+e,ex);
            }
        return res;
    } finally {
        if (tracing)
            tab_count -= 3;
    }
}
/** Evaluate MRQL BSP physical operators using Hama (returns a DataSet)
* @param e the physical plan
* @param env contains bindings fro variables to values (MRData)
* @return a DataSet (stored in HDFS)
*/
final public DataSet evalD ( final Tree e, final Environment env ) {
try {
match e {
case BSPSource(`n,BinarySource(`file,_)):
return Plan.binarySource((int)((LongLeaf)n).value(),file.stringValue());
case BSPSource(`n,ParsedSource(`parser,`file,...args)):
Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
if (p == null)
throw new Error("Unknown parser: "+parser);
return Plan.parsedSource((int)((LongLeaf)n).value(),
p,((MR_string)evalE(file,env)).get(),args);
case BSPSource(`n,Generator(`min,`max,`size)):
return Plan.generator((int)((LongLeaf)n).value(),
((MR_long)evalE(min,env)).get(),
((MR_long)evalE(max,env)).get(),
((MR_long)evalE(size,env)).get());
case BSPSource(`n,`v):
if (!v.is_variable())
fail;
DataSet ds = evalD(v,env);
for ( DataSource s: ds.source )
s.source_num = (int)((LongLeaf)n).value();
return ds;
case Merge(`x,`y):
return Plan.merge(eval(x,env),eval(y,env));
case Dump(`s):
return Plan.fileCache((Bag)evalE(s,env));
case trace(`msg,`tp,`x):
long n = pre_trace(((MR_string)evalE(msg,env)).get());
DataSet ds = evalD(x,env);
trace(n,tp,new MR_dataset(ds));
return ds;
case apply(`f,`arg):
if (!f.is_variable())
return ((MR_dataset)evalF(f,env).eval(evalE(arg))).dataset();
MRData fnc = variable_lookup(f.toString(),global_env);
if (fnc == null)
throw new Error("Unknown function: "+f);
MRData t = evalE(arg,env);
if (!(t instanceof Tuple))
throw new Error("Expected a tuple in function application: "+t);
return ((MR_dataset)((Lambda)fnc).lambda().eval(t)).dataset();
case nth(`u,`n):
MRData x = ((Tuple)evalE(u,env)).get((int)n.longValue());
if (x instanceof MR_dataset)
return ((MR_dataset)x).dataset();
throw new Error("Evaluation error in: "+print_query(e));
case `v:
if (!v.is_variable())
fail;
MRData x = variable_lookup(v.toString(),env);
if (x != null)
if (x instanceof MR_dataset)
return ((MR_dataset)x).dataset();
x = variable_lookup(v.toString(),global_env);
if (x != null)
if (x instanceof MR_dataset)
return ((MR_dataset)x).dataset();
throw new Error("Variable "+v+" is not bound");
};
MRData d = bsp(e,env);
if (d instanceof MR_dataset)
return ((MR_dataset)d).dataset();
throw new Error("Cannot evaluate the BSP plan: "+e);
} catch (Error msg) {
if (!Config.trace)
throw new Error(msg.getMessage());
System.err.println(msg.getMessage());
throw new Error("Evaluation error in: "+print_query(e));
} catch (Exception ex) {
System.err.println(ex.getMessage());
ex.printStackTrace();
throw new Error("Evaluation error in: "+print_query(e));
}
}
}