/*
 * Copyright 2016 The Apache Software Foundation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.mrql;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.mrql.gen.Node;
import org.apache.mrql.gen.Tree;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.fluent.GroupedStream;
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.FlatMapFunction;
import org.apache.storm.trident.operation.MapFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

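/** Evaluates MRQL physical plans using Apache Storm's Trident streaming API */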
public class StormEvaluator extends Evaluator implements Serializable {

    // the Trident topology that the current query is compiled into
    public static TridentTopology topology;
    // bindings from query variables to Trident streams
    static Environment global_streams = null;
    // the HDFS file used to ship the data-source directory details to the workers
    final static String data_source_dir_name = "tmp/"+System.getenv("USER")+"_data_source_dir.txt";
    // the handler that is applied to each result of a streaming query
    private static Function f;

    @Override
    public void init(Configuration conf) {
        global_streams = null;
        if (Config.stream_window > 0 && (Config.local_mode || Config.hadoop_mode)) {
            if (Config.trace_execution) {
                System.out.println("Creating a new Storm topology");
            };
        }
        topology = new TridentTopology();
        Plan.conf = new Configuration();
        if (Config.hadoop_mode && Config.local_mode) {
            FileSystem.setDefaultUri(Plan.conf,"file:///");
        } else if (Config.hadoop_mode) {
            // FS_DEFAULT_NAME may be unset, in which case getenv returns null
            String fs_default_name = System.getenv("FS_DEFAULT_NAME");
            if (fs_default_name != null && !fs_default_name.equals(""))
                FileSystem.setDefaultUri(Plan.conf,fs_default_name);
        };
    }

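    /** distribute the compiled query arguments and add their jar to the class path */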
    @Override
    public void initialize_query() {
        Plan.distribute_compiled_arguments(Plan.conf);
        String jarPath = Plan.conf.get("mrql.jar.path");
        try {
            addURL(new URL("file://"+jarPath));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

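    /** add a jar to the system class loader at run time; uses reflection because
     *  URLClassLoader.addURL is protected (this works up to JDK 8, where the
     *  system class loader is still a URLClassLoader) */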
    public void addURL(URL url) throws Exception {
        URLClassLoader classLoader = (URLClassLoader)ClassLoader.getSystemClassLoader();
        // use reflection to call the protected method URLClassLoader.addURL
        Method method = URLClassLoader.class.getDeclaredMethod("addURL",new Class[]{ URL.class });
        method.setAccessible(true);
        method.invoke(classLoader,new Object[]{ url });
    }

    @Override
    public void shutdown(Configuration conf) {
        // no resources to release here
    }

    @Override
    public Configuration new_configuration() {
        return new Configuration();
    }

    @Override
    public Class<? extends MRQLFileInputFormat> parsedInputFormat() {
        return StormParsedInputFormat.class;
    }

    @Override
    public Class<? extends MRQLFileInputFormat> binaryInputFormat() {
        return StormBinaryInputFormat.class;
    }

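    /** the generator input format is not supported by the Storm evaluator */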
    @Override
    public Class<? extends MRQLFileInputFormat> generatorInputFormat() {
        return null;
    }

    /** used by the master to send parsing details (e.g., record types) to workers */
    public static void dump_source_dir () throws IOException {
        if (Config.local_mode)
            return;
        DataSource.dataSourceDirectory.distribute(Plan.conf);
        Path path = new Path(data_source_dir_name);
        FileSystem fs = path.getFileSystem(Plan.conf);
        PrintStream ps = new PrintStream(fs.create(path,true));
        ps.println(Plan.conf.get("mrql.data.source.directory"));
        ps.close();
    }

    /** executed by a worker when reading parsed input (see StormParsedInputFormat) */
    public static void load_source_dir () throws IOException {
        if (Plan.conf == null) {
            if (evaluator == null)
                evaluator = new StormEvaluator();
            Plan.conf = evaluator.new_configuration();
            Config.read(Plan.conf);
        };
        if (Config.local_mode)
            return;
        // the data-source directory details are read from an HDFS file by the workers
        Path path = new Path(data_source_dir_name);
        FileSystem fs = path.getFileSystem(Plan.conf);
        BufferedReader ftp = new BufferedReader(new InputStreamReader(fs.open(path)));
        Plan.conf.set("mrql.data.source.directory",ftp.readLine());
        DataSource.dataSourceDirectory.read(Plan.conf);
        ftp.close();
    }

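    /** wrap an Iterable of MRData into a lazy MRQL Bag */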
    private static Bag bag ( final Iterable<MRData> s ) {
        final Iterator<MRData> i = s.iterator();
        return new Bag(new BagIterator() {
                public MRData next () {
                    return i.next();
                }
                public boolean hasNext () {
                    return i.hasNext();
                }
            });
    }

    /** Coerce a persistent collection to a Bag */
    @Override
    public Bag toBag ( MRData data ) {
        try {
            return (Bag)data;
        } catch (Exception ex) {
            throw new Error("Cannot coerce "+data+" to a Bag: "+ex);
        }
    }

    /** total aggregation is not implemented by the Storm evaluator */
    @Override
    public MRData aggregate(Tree acc_fnc, Tree zero, Tree plan, Environment env) throws Exception {
        return null;
    }

    /** the loop operator is not implemented by the Storm evaluator */
    @Override
    public Tuple loop(Tree e, Environment env) throws Exception {
        return null;
    }

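    /** evaluate a streaming MRQL query; the handler f is applied to each query result */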
    @Override
    public void streaming(Tree plan, Environment env, Environment dataset_env, Function f) {
        StormEvaluator.f = f;
        StormStreaming.evaluate(plan, env, dataset_env, f);
    }

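    /** evaluate the plan, attach the result handler to the stream, and wrap it in a DataSet */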
    @Override
    public DataSet eval(Tree e, Environment env, String counter) {
        Stream res = eval(e, env, (Environment)null);
        // apply the result handler f to every tuple, passing the data through unchanged
        res = res.map(new MapFunction() {
                @Override
                public Values execute(TridentTuple input) {
                    MRData dataset = (MRData)input.get(0);
                    f.eval(dataset);
                    return new Values(dataset);
                }
            });
        return new DataSet(new StreamDataSource(res), -1, -1);
    }

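    /** evaluate an MRQL physical plan and return a Trident stream */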
    final public Stream eval(final Tree e, final Environment env, final Environment stream_env) {
        if (Config.trace_execution) {
            tab_count += 3;
            System.out.println(tabs(tab_count) + print_query(e));
        };
        final Stream res = evalD(e, env, stream_env);
        if (Config.trace_execution)
            tab_count -= 3;
        return res;
    }

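    /** translate an MRQL physical plan to a Trident pipeline; for example, a plan
     *  MapReduce(m,r,s,o) becomes (a sketch)
     *      eval(s,env,stream_env).flatMap(cmap_fnc(m,env))   // the map phase
     *  followed by the group-by/aggregate stages built in groupBy() below */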
    final public Stream evalD ( final Tree e, final Environment env, final Environment stream_env ) {
        try {
            match e {
            case cMap(`f,`s):
                return eval(s,env,stream_env).flatMap(cmap_fnc(f,env));
            case MapReduce(`m,`r,`s,`o):
                Stream mappedStream = eval(s,env,stream_env).flatMap(cmap_fnc(m,env));
                return groupBy(mappedStream,r,env,o);
            case MapCombineReduce(`m,`c,`r,`s,`o):
                // the combiner c is not used in streaming mode; all reduction is done in groupBy
                Stream mappedStream = eval(s,env,stream_env).flatMap(cmap_fnc(m,env));
                return groupBy(mappedStream,r,env,o);
            case `v:
                if (!v.is_variable())
                    fail;
                MRData x = variable_lookup(v.toString(),global_streams);
                if (x != null && x instanceof MR_stream)
                    return ((MR_stream)x).stream();
                // fall back to the query environment
                x = variable_lookup(v.toString(),env);
                if (x != null && x instanceof MR_stream)
                    return ((MR_stream)x).stream();
                throw new Error("Variable "+v+" is not bound");
            };
            throw new Error("Unrecognized Storm plan: "+e);
        } catch (Error msg) {
            if (!Config.trace)
                throw new Error(msg.getMessage());
            System.err.println(msg.getMessage());
            throw new Error("Evaluation error in: "+print_query(e));
        } catch (Exception ex) {
            System.err.println(ex.getMessage());
            ex.printStackTrace();
            throw new Error("Evaluation error in: "+print_query(e));
        }
    }

    /* placeholder for reduce-side output post-processing (currently unused) */
    private static Stream reduce_output(Stream s){
        return null;
    }

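    /** turn an MRQL functional (that maps a value to a Bag) into a Trident FlatMapFunction */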
    private static FlatMapFunction cmap_fnc ( final Tree fnc, Environment env ) {
        final Function f = evalF(fnc,null);
        return new FlatMapFunction() {
                @Override
                public Iterable<Values> execute(TridentTuple input) {
                    List<Values> out = new ArrayList<Values>();
                    MRData value = (MRData)input.get(0);
                    // emit one output tuple for each element of the resulting Bag
                    for ( MRData e: (Bag)f.eval(value) )
                        out.add(new Values(e));
                    return out;
                }
            };
    }

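    /** group the stream by the key of each (key,value) pair and reduce each group
     *  with the reducer fnc; the ordering flag o is ignored in streaming mode */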
    private static Stream groupBy ( Stream s, Tree fnc, Environment env, Tree o ) {
        final Function reducer = evalF(fnc,null);

        // split each (key,value) pair into separate key and value fields
        Stream keyValueStream = s.each(s.getOutputFields(), new BaseFunction() {
                @Override
                public void execute(TridentTuple input, TridentCollector output) {
                    Tuple value = (Tuple)input.get(0);
                    MRData key = value.first();
                    MRData values = (MRData)value.second();
                    output.emit(new Values(key,values));
                }
            }, new Fields("keyfield","valuefield"));
        GroupedStream groupedStream = keyValueStream.groupBy(new Fields("keyfield"));
        // collect the values of each group per batch, then reduce each group on batch completion
        Stream finalStream = groupedStream.aggregate(new Fields("keyfield","valuefield"), new BaseAggregator<Map<MRData,Bag>>() {
                @Override
                public Map<MRData,Bag> init(Object batchId, TridentCollector collector) {
                    return new HashMap<MRData,Bag>();
                }
                @Override
                public void aggregate(Map<MRData,Bag> val, TridentTuple tuple, TridentCollector collector) {
                    MRData key = (MRData)tuple.get(0);
                    Bag values = val.containsKey(key) ? val.get(key) : new Bag();
                    values.add((MRData)tuple.get(1));
                    val.put(key,values);
                }
                @Override
                public void complete(Map<MRData,Bag> val, TridentCollector collector) {
                    for ( Map.Entry<MRData,Bag> entry: val.entrySet() ) {
                        MRData key = entry.getKey();
                        Bag value = entry.getValue();
                        // each emitted tuple carries the whole reduced bag for one key
                        Bag reducedValues = (Bag)reducer.eval(new Tuple(key,value));
                        collector.emit(new Values(reducedValues));
                    }
                }
            }, new Fields("outputdata"));
        return finalStream.project(new Fields("outputdata"));
    }
}