/*
* Copyright 2016 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.mrql.gen.Node;
import org.apache.mrql.gen.Tree;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.FlatMapFunction;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.trident.fluent.GroupedStream;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.MapFunction;
import org.apache.storm.trident.operation.TridentCollector;
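/** Evaluates physical plans in Apache Storm (Trident) stream mode */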
public class StormEvaluator extends Evaluator implements Serializable {
public static TridentTopology topology;
static Environment global_streams = null;
final static String data_source_dir_name = "tmp/"+System.getenv("USER")+"_data_source_dir.txt";
private static Function f;
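/** initialize the Storm evaluator: create a fresh Trident topology and Hadoop configuration */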
@Override
public void init(Configuration conf) {
global_streams = null;
if (Config.stream_window > 0 && (Config.local_mode || Config.hadoop_mode)) {
if (Config.trace_execution) {
System.out.println("Creating a new storm topology");
};
}
topology = new TridentTopology();
Plan.conf = new Configuration();
if (Config.hadoop_mode && Config.local_mode) {
FileSystem.setDefaultUri(Plan.conf,"file:///");
} else if (Config.hadoop_mode) {
String fs_default_name = System.getenv("FS_DEFAULT_NAME");
if (fs_default_name != null && !fs_default_name.equals(""))
FileSystem.setDefaultUri(Plan.conf,fs_default_name);
};
}
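/** per-query initialization: distribute the compiled functional arguments and add the MRQL jar to the class path */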
@Override
public void initialize_query() {
Plan.distribute_compiled_arguments(Plan.conf);
String jarPath = Plan.conf.get("mrql.jar.path");
try {
addURL(new URL("file://"+jarPath));
} catch (Exception e) {
e.printStackTrace();
}
}
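/** add a URL to the system class loader at run time using reflection on URLClassLoader.addURL
 * (requires the system class loader to be a URLClassLoader, as on Java 8 and earlier) */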
public void addURL(URL url) throws Exception {
URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
Class<?> clazz = URLClassLoader.class;
// use reflection to invoke the protected URLClassLoader.addURL method
Method method = clazz.getDeclaredMethod("addURL", new Class[] { URL.class });
method.setAccessible(true);
method.invoke(classLoader, new Object[] { url });
}
@Override
public void shutdown(Configuration conf) {
}
@Override
public Configuration new_configuration() {
return new Configuration();
}
@Override
public Class<? extends MRQLFileInputFormat> parsedInputFormat() {
return StormParsedInputFormat.class;
}
@Override
public Class<? extends MRQLFileInputFormat> binaryInputFormat() {
return StormBinaryInputFormat.class;
}
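/** the generator input format is not supported by the Storm back end */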
@Override
public Class<? extends MRQLFileInputFormat> generatorInputFormat() {
return null;
}
/** used by the master to send parsing details (eg, record types) to workers */
public static void dump_source_dir () throws IOException {
if (Config.local_mode)
return;
DataSource.dataSourceDirectory.distribute(Plan.conf);
Path path = new Path(data_source_dir_name);
FileSystem fs = path.getFileSystem(Plan.conf);
PrintStream ps = new PrintStream(fs.create(path,true));
ps.println(Plan.conf.get("mrql.data.source.directory"));
ps.close();
}
/** executed by a worker when reading parsed input (see StormParsedInputFormat) */
public static void load_source_dir () throws IOException {
if (Plan.conf == null) {
if (evaluator == null)
evaluator = new StormEvaluator();
Plan.conf = evaluator.new_configuration();
Config.read(Plan.conf);
};
if (Config.local_mode)
return;
// the name of the file that contains the source directory details is read from an HDFS file by workers
Path path = new Path(data_source_dir_name);
FileSystem fs = path.getFileSystem(Plan.conf);
BufferedReader ftp = new BufferedReader(new InputStreamReader(fs.open(path)));
Plan.conf.set("mrql.data.source.directory",ftp.readLine());
DataSource.dataSourceDirectory.read(Plan.conf);
ftp.close();
}
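/** wrap an Iterable of MRData into a lazy Bag */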
private static Bag bag ( final Iterable<MRData> s ) {
final Iterator<MRData> i = s.iterator();
return new Bag(new BagIterator() {
public MRData next () {
return i.next();
}
public boolean hasNext () {
return i.hasNext();
}
});
}
/** Coerce a persistent collection to a Bag */
@Override
public Bag toBag ( MRData data ) {
try {
return (Bag)data;
} catch (Exception ex) {
throw new Error("Cannot coerce "+data+" to a Bag: "+ex);
}
}
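/** the Aggregate physical operator is not implemented for the Storm back end */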
@Override
public MRData aggregate(Tree acc_fnc, Tree zero, Tree plan, Environment env) throws Exception {
return null;
}
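/** the Loop physical operator is not implemented for the Storm back end */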
@Override
public Tuple loop(Tree e, Environment env) throws Exception {
return null;
}
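/** evaluate a stream-based MRQL query using StormStreaming; the continuation f consumes the query results */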
@Override
public void streaming(Tree plan, Environment env, Environment dataset_env, Function f) {
StormEvaluator.f = f;
StormStreaming.evaluate(plan, env, dataset_env, f);
}
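/** evaluate the plan e and return a DataSet over the resulting Trident stream */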
@Override
public DataSet eval(Tree e, Environment env, String counter) {
Stream res = eval(e, env,(Environment)null);
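// pass each streamed result to the continuation f as a side effect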
res.map(new MapFunction() {
@Override
public Values execute(TridentTuple input) {
MRData dataset = (MRData)input.get(0);
f.eval(dataset);
return new Values(input);
}
});
DataSet data = new DataSet(new StreamDataSource(res), -1, -1);
return data;
}
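/** evaluate the plan e and return a Trident Stream (with optional execution tracing) */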
final public Stream eval(final Tree e, final Environment env,final Environment stream_env){
if (Config.trace_execution) {
tab_count += 3;
System.out.println(tabs(tab_count) + print_query(e));
};
final Stream res = evalD(e, env,stream_env);
if (Config.trace_execution)
tab_count -= 3;
return res;
}
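/** evaluate an MRQL physical plan operator (cMap, MapReduce, etc) over Trident streams */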
final public Stream evalD ( final Tree e, final Environment env,final Environment stream_env ) {
try {
match e {
case cMap(`f,`s):
return eval(s,env,stream_env).flatMap(cmap_fnc(f,env));
case MapReduce(`m,`r,`s,`o):
Stream mappedStream = eval(s,env,stream_env).flatMap(cmap_fnc(m,env));
return groupBy(mappedStream,r,env,o);
case MapCombineReduce(`m,`c,`r,`s,`o):
Stream mappedStream = eval(s,env,stream_env).flatMap(cmap_fnc(m,env));
return groupBy(mappedStream,r,env,o);
case `v:
if (!v.is_variable())
fail;
MRData x = variable_lookup(v.toString(),global_streams);
if (x != null && x instanceof MR_stream)
return ((MR_stream)x).stream();
x = variable_lookup(v.toString(),env);
if (x != null && x instanceof MR_stream)
return ((MR_stream)x).stream();
throw new Error("Variable "+v+" is not bound");
};
throw new Error("Unrecognized Storm plan: "+e);
} catch (Error msg) {
if (!Config.trace)
throw new Error(msg.getMessage());
System.err.println(msg.getMessage());
throw new Error("Evaluation error in: "+print_query(e));
} catch (Exception ex) {
System.err.println(ex.getMessage());
ex.printStackTrace();
throw new Error("Evaluation error in: "+print_query(e));
}
}
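/** placeholder for reduce-output post-processing (not implemented) */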
private static Stream reduce_output(Stream s){
return null;
}
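/** turn a functional argument into a Trident FlatMapFunction that applies it to each input value
 * and emits every element of the resulting Bag */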
private static FlatMapFunction cmap_fnc ( final Tree fnc,Environment env ) {
final Function f = evalF(fnc,null);
return new FlatMapFunction() {
@Override
public Iterable<Values> execute(TridentTuple input) {
List<Values> out = new ArrayList<Values>();
MRData value = (MRData)input.get(0);
for (MRData e: (Bag)f.eval(value) ){
out.add(new Values(e));
}
return out;
}
};
}
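/** group the stream by key and reduce each group with the function fnc at the end of every batch
 * (the ordering flag o is currently ignored) */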
private static Stream groupBy ( Stream s, Tree fnc, Environment env, Tree o ) {
final Function reducer = evalF(fnc,null);
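// split each (key,value) pair into separate Trident fields so the stream can be grouped by key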
Stream keyValueStream = s.each(s.getOutputFields(),new BaseFunction() {
@Override
public void execute(TridentTuple input, TridentCollector output) {
Tuple value = (Tuple)input.get(0);
MRData key = value.first();
MRData values = (MRData)value.second();
output.emit(new Values(key,values));
}
},new Fields("keyfield","valuefield"));
GroupedStream groupedStream = keyValueStream.groupBy(new Fields("keyfield"));
Stream finalStream = groupedStream.aggregate(new Fields("keyfield","valuefield"), new BaseAggregator<Map<MRData,Bag>>() {
@Override
public Map<MRData,Bag> init(Object batchId, TridentCollector collector) {
return new HashMap<MRData,Bag>();
}
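// add each incoming value to the Bag associated with its key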
@Override
public void aggregate(Map<MRData,Bag> val, TridentTuple tuple, TridentCollector collector) {
MRData key = (MRData)tuple.get(0);
Bag values;
if(!val.containsKey(key)){
values = new Bag();
}
else{
values = val.get(key);
}
MRData value = (MRData)tuple.get(1);
values.add(value);
val.put(key,values);
}
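// at batch completion, apply the reducer to each (key,Bag) group and emit its result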
@Override
public void complete(Map<MRData,Bag> val, TridentCollector collector) {
for (Map.Entry<MRData, Bag> entry : val.entrySet()) {
MRData key = entry.getKey();
Bag value = entry.getValue();
Bag reducedValues = (Bag)reducer.eval(new Tuple(key,value));
collector.emit(new Values(reducedValues));
}
}
}, new Fields("outputdata"));
return finalStream.project(new Fields("outputdata"));
}
}