| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.mrql; |
| |
| import java.util.List; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Arrays; |
| import java.util.Iterator; |
| import java.io.*; |
| import java.net.URI; |
| import java.net.URL; |
| import org.apache.mrql.gen.*; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.flink.util.Collector; |
| import org.apache.flink.core.fs.Path; |
| import org.apache.flink.core.fs.FileSystem; |
| import org.apache.flink.api.common.io.FileInputFormat; |
| import org.apache.flink.api.common.functions.*; |
| import org.apache.flink.api.common.operators.*; |
| import org.apache.flink.api.java.*; |
| import org.apache.flink.api.java.DataSet; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.api.java.io.LocalCollectionOutputFormat; |
| import org.apache.flink.api.java.io.DiscardingOutputFormat; |
| import org.apache.flink.api.java.functions.*; |
| import org.apache.flink.api.java.operators.*; |
| import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
| import org.apache.flink.client.program.ContextEnvironment; |
| |
| |
| /** Evaluates physical plans in Apache Flink mode */ |
| public class FlinkEvaluator extends Evaluator implements Serializable { |
| final static URL flink_jar = FlinkEvaluator.class.getProtectionDomain().getCodeSource().getLocation(); |
| public static ExecutionEnvironment flink_env; |
| public static StreamExecutionEnvironment stream_env; |
| // an HDFS tmp file used to hold the data source directory information in distributed mode |
| static String data_source_dir_name; |
| static String master_host = ""; |
| static int master_port = 6123; // default Flink JobManager RPC port |
| static String fs_default_name; |
| |
| /** initialize the Flink evaluator */ |
| final public void init ( Configuration conf ) { |
| DataSource.parserDirectory = new DataSource.ParserDirectory(); |
| DataSource.parserDirectory.put("xml",FlinkXMLParser.class); |
| DataSource.parserDirectory.put("json",FlinkJsonParser.class); |
| DataSource.parserDirectory.put("line",FlinkLineParser.class); |
| // Disable bag materialization (it uses the Hadoop HDFS library instead of Flink's) |
| Config.max_materialized_bag = Long.MAX_VALUE; |
| if (Config.local_mode) { |
| flink_env = ExecutionEnvironment.createLocalEnvironment(Config.nodes); |
| // currently, the compiler doesn't work in local mode |
| Config.compile_functional_arguments = false; |
| fs_default_name = "file:///"; |
| Plan.conf.set("fs.defaultFS",fs_default_name); |
| } else if (Config.distributed_mode) { |
| String master_node = System.getenv("FLINK_MASTER"); |
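| // FLINK_MASTER is expected to be the "host:port" of the Flink JobManager (e.g. "localhost:6123"); |
| // an empty string means: use the context execution environment of the yarn session |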
| if (master_node == null) |
| throw new Error("Need to run the Flink application master first: $FLINK_HOME/bin/yarn-session.sh"); |
| if (!master_node.equals("")) { |
| String[] m = master_node.split(":"); |
| if (m.length != 2) |
| throw new Error("Need both host name and port number for the Flink application master: "+master_node); |
| master_host = m[0]; |
| master_port = Integer.parseInt(m[1]); |
| }; |
| fs_default_name = System.getenv("FS_DEFAULT_NAME"); |
| data_source_dir_name = absolute_path("tmp/data_source_dir.txt"); |
| Plan.conf.set("fs.defaultFS",fs_default_name); |
| } |
| } |
| |
| /** shutdown the Flink evaluator */ |
| final public void shutdown ( Configuration conf ) { |
| } |
| |
| final public void initialize_query () { |
| try { |
| if (!Config.local_mode) { |
| if (Config.compile_functional_arguments) { |
| Plan.distribute_compiled_arguments(Plan.conf); |
| if (master_host.equals("")) { |
| ContextEnvironment ce = (ContextEnvironment)ExecutionEnvironment.getExecutionEnvironment(); |
| List<URL> jars = Arrays.asList(flink_jar,new URL("file://"+Plan.conf.get("mrql.jar.path"))); |
| flink_env = new ContextEnvironment(ce.getClient(),jars,new ArrayList<URL>(), |
| FlinkEvaluator.class.getClassLoader(), |
| Plan.new_path(Plan.conf)); |
| flink_env.setParallelism(Config.nodes); |
| } else flink_env = ExecutionEnvironment.createRemoteEnvironment(master_host,master_port, |
| Config.nodes,flink_jar.toURI().getPath(),Plan.conf.get("mrql.jar.path")); |
| } else if (master_host.equals("")) |
| flink_env = ExecutionEnvironment.getExecutionEnvironment(); |
| else flink_env = ExecutionEnvironment.createRemoteEnvironment(master_host,master_port, |
| Config.nodes,flink_jar.toURI().getPath()); |
| } else { |
| flink_env.setDefaultLocalParallelism(Config.nodes); |
| if (Config.stream_window > 0) |
| stream_env = StreamExecutionEnvironment.createLocalEnvironment(Config.nodes) |
| .setBufferTimeout(Config.stream_window); |
| }; |
| if (!Config.info) |
| flink_env.getConfig().disableSysoutLogging(); |
| } catch (Exception ex) { |
| throw new Error("Cannot initialize the Flink evaluator: "+ex); |
| } |
| } |
| |
| final public Configuration new_configuration () { |
| return new Configuration(); |
| } |
| |
| /** return the FileInputFormat for parsed files (CSV, XML, JSON, etc) */ |
| final public Class<? extends MRQLFileInputFormat> parsedInputFormat () { |
| return FlinkParsedInputFormat.class; |
| } |
| |
| /** return the FileInputFormat for binary files */ |
| final public Class<? extends MRQLFileInputFormat> binaryInputFormat () { |
| return FlinkSequenceInputFormat.class; |
| } |
| |
| /** return the FileInputFormat for data generator files */ |
| final public Class<? extends MRQLFileInputFormat> generatorInputFormat () { |
| return FlinkGeneratorInputFormat.class; |
| } |
| |
| /** evaluate plan in stream mode: evaluate each batch of data and apply the function f */ |
| final public void streaming ( Tree plan, Environment env, Function f ) { |
| FlinkStreaming.evaluate(plan,env,f); |
| } |
| |
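| /* A minimal illustration of absolute_path (the exact results depend on fs_default_name, |
|  * user.name, and user.dir, so the values below are assumptions): |
|  *   absolute_path("data/x.txt")  in local mode       -> "file:"+user.dir+"/data/x.txt" |
|  *   absolute_path("/data/x.txt") in local mode       -> "file:/data/x.txt" |
|  *   absolute_path("data/x.txt")  in distributed mode -> fs_default_name+"/user/"+user.name+"/data/x.txt" |
|  *   a path that already has a URI scheme (e.g. "hdfs://...") is only normalized |
|  */ |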
| /** return the absolute path of a file name: a relative path is resolved against the HDFS user directory in distributed mode, or against the current working directory in local mode */ |
| public static String absolute_path ( String path ) { |
| try { |
| URI uri = new URI(path); |
| if (uri.getScheme() == null) |
| if (Config.hadoop_mode && !Config.local_mode) |
| if (uri.toString().startsWith("/")) |
| uri = new URI(fs_default_name+uri); |
| else uri = new URI(fs_default_name+"/user/"+System.getProperty("user.name")+"/"+uri); |
| else if (uri.toString().startsWith("/")) |
| uri = new URI("file:"+uri); |
| else uri = new URI("file:"+System.getProperty("user.dir")+"/"+uri); |
| uri = uri.normalize(); |
| return uri.toString(); |
| } catch (Exception ex) { |
| throw new Error("Wrong pathname: "+path); |
| } |
| } |
| |
| /** return the MRQL type of the data dumped to a file */ |
| @Override |
| public Tree get_type ( String file ) { |
| return super.get_type(absolute_path(file)); |
| } |
| |
| /** dump MRQL data into a Hadoop Sequence file */ |
| @Override |
| public void dump ( String file, Tree type, MRData data ) throws Exception { |
| super.dump(absolute_path(file),type,data); |
| } |
| |
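| /* For illustration: dumping the bag {(1,"a"),(2,"b")} of type bag((int,string)) |
|  * produces one comma-separated line per tuple (each field formatted by print()); |
|  * a non-tuple collection is written one element per line and a non-collection |
|  * value on a single line. */ |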
| /** dump MRQL data into a text CSV file */ |
| @Override |
| public void dump_text ( String file, Tree type, MRData data ) throws Exception { |
| int ps = Config.max_bag_size_print; |
| Config.max_bag_size_print = -1; |
| Path path = new Path(absolute_path(file)); |
| FileSystem fs = path.getFileSystem(); |
| final PrintStream out = new PrintStream(fs.create(path,true)); |
| if (data instanceof MR_dataset) |
| data = Plan.collect(((MR_dataset)data).dataset()); |
| if (Translator.collection_type(type)) { |
| Tree tp = ((Node)type).children().head(); |
| if (tp instanceof Node && ((Node)tp).name().equals("tuple")) { |
| Trees ts = ((Node)tp).children(); |
| for ( MRData x: (Bag)data ) { |
| Tuple t = (Tuple)x; |
| out.print(print(t.get((short)0),ts.nth(0))); |
| for ( short i = 1; i < t.size(); i++ ) |
| out.print(","+print(t.get(i),ts.nth(i))); |
| out.println(); |
| } |
| } else for ( MRData x: (Bag)data ) |
| out.println(print(x,tp)); |
| } else out.println(print(data,query_type)); |
| Config.max_bag_size_print = ps; |
| out.close(); |
| } |
| |
| private static MRData aggregate ( DataSet<FData> data_set, MRData zero, Tree acc_fnc, Tree merge_fnc, Tree type ) { |
| DataSet<FData> d = data_set.flatMap(new FlinkDataSource.reduce_mapper(zero,acc_fnc.toString(), |
| merge_fnc.toString())); |
| //d.print(); // needs a datasink (prints the aggregation value) |
| d.output(new DiscardingOutputFormat()); |
| try { |
| // due to a Flink bug, we cannot return a custom object (FData) from an accumulator |
| String value = FlinkEvaluator.flink_env.execute("MRQL aggregator").getAccumulatorResult("reducer"); |
| Tree tp = TopLevel.query_type; |
| MRData res = MRQL.query(value+" as "+type); // parse the value using the MRQL query parser (ugly) |
| TopLevel.query_type = tp; |
| return res; |
| } catch (Exception ex) { |
| throw new Error("Evaluation error during aggregation: "+ex); |
| } |
| } |
| |
| /* broadcast the loop value to all nodes */ |
| public static final class set_environment extends RichMapFunction<FData,FData> { |
| String var; |
| |
| set_environment ( String v ) { var = v; } |
| |
| @Override |
| public void open ( org.apache.flink.configuration.Configuration parameters ) throws Exception { |
| final Bag value = bag(getRuntimeContext().getBroadcastVariable(var)); |
| Interpreter.new_global_binding(var,value); |
| } |
| |
| @Override |
| public FData map ( FData value ) throws Exception { |
| return new FData(((Tuple)value.data()).first()); |
| } |
| } |
| |
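| /* convert a collection of FData values (e.g., a Flink broadcast variable) to a Bag */ |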
| private static Bag bag ( Collection<Object> values ) { |
| Bag s = new Bag(); |
| for ( Object value: values ) |
| s.add(((FData)value).data()); |
| return s; |
| } |
| |
| /* convert a Bag into a DataSet */ |
| private static DataSet<FData> dataset ( Bag s ) { |
| ArrayList<FData> a = new ArrayList<FData>(s.size()); |
| for ( MRData x: s ) |
| a.add(new FData(x)); |
| return flink_env.fromCollection(a); |
| } |
| |
| /** Coerce a persistent collection to a Bag */ |
| public Bag toBag ( MRData data ) { |
| try { |
| if (data instanceof MR_flink) { |
| final Iterator<FData> i = ((MR_flink)data).flink().collect().iterator(); |
| return new Bag(new BagIterator() { |
| public MRData next () { |
| return i.next().data(); |
| } |
| public boolean hasNext () { |
| return i.hasNext(); |
| } |
| }); |
| }; |
| return (Bag)data; |
| } catch (Exception ex) { |
| throw new Error("Cannot coerce "+data+" to a Bag: "+ex); |
| } |
| } |
| |
| /** The Aggregate physical operator |
| * @param merge the accumulator function from (T,T) to T |
| * @param zero the zero element of type T |
| * @param plan the plan that constructs the dataset that contains the bag of values {T} |
| * @param env contains bindings from variables to values (MRData) |
| * @return the aggregation result of type T |
| */ |
| final public MRData aggregate ( Tree merge, |
| Tree zero, |
| Tree plan, |
| Environment env ) throws Exception { |
| final MRData z = evalE(zero,env); |
| Tree type = TypeInference.type_inference(zero); |
| match plan { |
| case repeat(lambda(`v,`b),`s,`n): |
| // Special case for Flink: repetition where the loop result must be shared among nodes |
| final int max_num = ((MR_int)evalE(n,null)).get(); |
| final Bag init = (Bag)evalE(s,env); |
| Interpreter.new_global_binding(v.toString(),init); |
| final DataSet<FData> ds = dataset(init); |
| final IterativeDataSet<FData> startOfIteration = ds.iterate(max_num); |
| final DataSet<FData> step = eval(b,env); |
| final DataSet<FData> toBeFedBack = step.map(new set_environment(v.toString())) |
| .withBroadcastSet(startOfIteration,v.toString()); |
| // Flink doesn't allow termination criteria for kmeans-like iterations: |
| // (java.lang.IllegalStateException: Error: Iterative task without a single iterative input) |
| //final DataSet<FData> terminationCriterion = step.filter(new repeat_filter()); |
| final DataSet<FData> res = startOfIteration.closeWith(toBeFedBack);//,terminationCriterion); |
| String path = absolute_path(Plan.new_path(Plan.conf)); |
| res.write(new FlinkOutputFormat(),path); |
| flink_env.execute("MRQL repeat"); |
| Bag bs = new Bag(); |
| for ( MRData x: new FlinkDataSource(res,path,false).take(-1) ) |
| bs.add(x); |
| return bs; |
| case AggregateMap(`m,`acc,_,`s): |
| final DataSet<FData> ds = eval(s,env).mapPartition(new aggregate_partition(m,acc,z)); |
| return aggregate(ds,z,merge,merge,type); |
| case MapAggregateReduce(`m,`r,`acc,_,`s,`o): |
| if (acc.equals(#<null>)) |
| fail; |
| final DataSet<FData> ds = eval(s,env).flatMap(cmap_fnc(m)); |
| final DataSet<FData> gs = ds.groupBy(new KeySelector<FData,FData>() { |
| @Override |
| public FData getKey ( FData value ) { |
| return new FData(((Tuple)value.data()).first()); |
| } |
| }).reduceGroup(new groupBy_combiner_reducer(r,acc,z)); |
| return aggregate(gs,z,merge,merge,type); |
| case CrossAggregateProduct(`mx,`my,`r,`acc,_,`x,`y): |
| if (acc.equals(#<null>)) |
| fail; |
| final DataSet<FData> xs = eval(x,env).flatMap(cmap_fnc(mx)); |
| final DataSet<FData> ys = eval(y,env).flatMap(cmap_fnc(my)); |
| final DataSet<FData> ds = xs.cross(ys).with(new cross_combiner_reducer(r,acc,z)); |
| return aggregate(ds,z,merge,merge,type); |
| case MapAggregateReduce2(`mx,`my,`r,`acc,_,`x,`y,`o): |
| if (acc.equals(#<null>)) |
| fail; |
| final DataSet<FData> xs = eval(x,env).flatMap(cmap_fnc(mx)); |
| final DataSet<FData> ys = eval(y,env).flatMap(cmap_fnc(my)); |
| final DataSet<FData> ds = xs.coGroup(ys) |
| .where(new join_key()) |
| .equalTo(new join_key()) |
| .with(new join_combiner_reducer(r,acc,z)); |
| return aggregate(ds,z,merge,merge,type); |
| case MapAggregateJoin(`mx,`my,`r,`acc,_,`x,`y): |
| if (acc.equals(#<null>)) |
| fail; |
| return aggregate(evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env),z,acc,merge,type); |
| }; |
| throw new Error("Unrecognized aggregation: "+plan); |
| } |
| |
| /** Evaluate a loop a fixed number of times. Doesn't use Flink iterations */ |
| final public Tuple loop ( Tree e, Environment env ) throws Exception { |
| match e { |
| case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num): |
| final int limit = ((MR_int)evalE(num,env)).get(); |
| MR_flink[] s = new MR_flink[vs.length()]; |
| for ( int i = 0; i < vs.length(); i++ ) |
| s[i] = new MR_flink(eval(ss.nth(i),env)); |
| for ( int n = 0; n < limit; n++ ) { |
| Environment nenv = env; |
| for ( int i = 0; i < vs.length(); i ++ ) |
| nenv = new Environment(vs.nth(i).toString(),s[i],nenv); |
| for ( int i = 0; i < vs.length(); i ++ ) |
| s[i] = new MR_flink(eval(bs.nth(i),nenv)); |
| }; |
| return new Tuple(s); |
| }; |
| throw new Error("Wrong Loop format: "+e); |
| } |
| |
| private static boolean needs_sorting ( Tree e ) { |
| match e { |
| case MapReduce(`m,`r,`s,true): return true; |
| case MapCombineReduce(`m,`c,`r,`s,true): return true; |
| }; |
| return false; |
| } |
| |
| /** Evaluate an MRQL physical plan and print tracing info |
| * @param e the physical plan |
| * @param env contains bindings from variables to values (MRData) |
| * @return an MRQL DataSet (stored in HDFS) |
| */ |
| final public org.apache.mrql.DataSet eval ( final Tree e, |
| final Environment env, |
| final String counter ) { |
| DataSet<FData> d = eval(e,env); |
| try { |
| String path = absolute_path(Plan.new_path(Plan.conf)); |
| d.write(new FlinkOutputFormat(),path); |
| flink_env.execute("MRQL query"); |
| return new org.apache.mrql.DataSet(new FlinkDataSource(d,path,needs_sorting(e)),0L,0L); |
| } catch (Exception ex) { |
| System.err.println(ex.getMessage()); |
| throw new Error("Cannot evaluate the query: "+e); |
| } |
| } |
| |
| /** Evaluate an MRQL physical plan using Flink and print tracing info |
| * @param e the physical plan |
| * @param env contains bindings from variables to values (MRData) |
| * @return a Flink DataSet |
| */ |
| final public DataSet<FData> eval ( final Tree e, final Environment env ) { |
| if (Config.trace_execution) { |
| tab_count += 3; |
| System.out.println(tabs(tab_count)+print_query(e)); |
| }; |
| final DataSet<FData> res = evalD(e,env); |
| if (Config.trace_execution) |
| try { |
| System.out.println(tabs(tab_count)+"->"); |
| res.first(Config.max_bag_size_print).print(); |
| tab_count -= 3; |
| } catch (Exception ex) { |
| System.err.println(ex.getMessage()); |
| throw new Error("Cannot collect the operator output: "+e); |
| }; |
| return res; |
| } |
| |
| /* convert an MRQL lambda to a Flink Function */ |
| private static RichFlatMapFunction<FData,FData> cmap_fnc ( final Tree fnc ) { |
| final Function f = evalF(fnc,null); |
| return new RichFlatMapFunction<FData,FData>() { |
| @Override |
| public void flatMap ( FData value, Collector<FData> out ) throws Exception { |
| for ( MRData e: (Bag)f.eval(value.data()) ) |
| out.collect(new FData(e)); |
| } |
| }; |
| } |
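| /* For example, cmap_fnc(#<lambda(x,bag(x))>) is the identity flatMap: each input |
|  * value x evaluates to the singleton bag {x}, whose element is re-emitted. */ |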
| |
| /* convert an iterable of (key,value) pairs to a bag of their second elements */ |
| private static Bag bag ( Iterable<FData> values ) { |
| Bag s = new Bag(); |
| for ( FData value: values ) |
| s.add(((Tuple)value.data()).second()); |
| return s; |
| } |
| |
| /* group-by s and then reduce by reduce_fnc (optional: use combine_fnc) */ |
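| /* (the input must be a dataset of (key,value) pairs; each group (key,{v1,...,vn}) |
|  * is reduced by reduce_fnc, whose result bag is flattened into the output) */ |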
| private static DataSet<FData> groupBy ( DataSet<FData> s, Tree combine_fnc, Tree reduce_fnc ) { |
| final Function combiner = (combine_fnc == null) ? null : evalF(combine_fnc,null); |
| final Function reducer = evalF(reduce_fnc,null); |
| DataSet<FData> ds = s; |
| if (combiner != null) { |
| ds = ds.groupBy(new KeySelector<FData,FData>() { |
| @Override |
| public FData getKey ( FData value ) { |
| return new FData(((Tuple)value.data()).first()); |
| } |
| }).combineGroup(new RichGroupCombineFunction<FData,FData>() { |
| @Override |
| public void combine ( Iterable<FData> values, Collector<FData> out ) throws Exception { |
| Bag c = null; |
| MRData key = null; |
| for ( FData v: values ) |
| if (c == null) { |
| c = new Bag(((Tuple)v.data()).second()); |
| key = ((Tuple)v.data()).first(); |
| } else |
| c = (Bag)combiner.eval(new Tuple(key,c.add_element(((Tuple)v.data()).second()))); |
| for ( MRData x: c ) |
| out.collect(new FData(new Tuple(key,x))); |
| } |
| }); |
| }; |
| return ds.groupBy(new KeySelector<FData,FData>() { |
| @Override |
| public FData getKey ( FData value ) { |
| return new FData(((Tuple)value.data()).first()); |
| } |
| }).reduceGroup(new GroupReduceFunction<FData,FData>() { |
| @Override |
| public void reduce ( final Iterable<FData> values, Collector<FData> out ) { |
| Bag s = new Bag(); |
| MRData key = null; |
| for ( FData value: values ) { |
| Tuple t = (Tuple)value.data(); |
| key = t.first(); |
| s.add(t.second()); |
| }; |
| for ( MRData v: (Bag)reducer.eval(new Tuple(key,s)) ) |
| out.collect(new FData(v)); |
| } |
| }); |
| } |
| |
| /* group-by and sort s and then reduce by reduce_fnc (optional: use combine_fnc) */ |
| private static DataSet<FData> sortBy ( DataSet<FData> s, Tree combine_fnc, Tree reduce_fnc ) { |
| final Function combiner = (combine_fnc == null) ? null : evalF(combine_fnc,null); |
| final Function reducer = evalF(reduce_fnc,null); |
| DataSet<Tuple2<FData,FData>> ds = s.map(new RichMapFunction<FData,Tuple2<FData,FData>>() { |
| @Override |
| public Tuple2<FData,FData> map ( FData value ) { |
| Tuple t = (Tuple)value.data(); |
| return new Tuple2<FData,FData>(new FData(t.first()),new FData(t.second())); |
| } |
| }); |
| if (combiner != null) { |
| ds = ds.groupBy(0).combineGroup(new GroupCombineFunction<Tuple2<FData,FData>,Tuple2<FData,FData>>() { |
| @Override |
| public void combine ( Iterable<Tuple2<FData,FData>> values, |
| Collector<Tuple2<FData,FData>> out ) throws Exception { |
| Bag c = null; |
| FData key = null; |
| for ( Tuple2<FData,FData> v: values ) |
| if (c == null) { |
| c = new Bag(v.f1.data()); |
| key = v.f0; |
| } else |
| c = (Bag)combiner.eval(new Tuple(key.data(),c.add_element(v.f1.data()))); // accumulate the value (f1), not the key (f0) |
| for ( MRData x: c ) |
| out.collect(new Tuple2<FData,FData>(key,new FData(x))); |
| } |
| }); |
| }; |
| return ds.groupBy(0).sortGroup(0,Order.ASCENDING) |
| .reduceGroup(new RichGroupReduceFunction<Tuple2<FData,FData>,FData>() { |
| @Override |
| public void reduce ( final Iterable<Tuple2<FData,FData>> values, Collector<FData> out ) { |
| Bag s = new Bag(); |
| MRData key = null; |
| for ( Tuple2<FData,FData> value: values ) { |
| key = value.f0.data(); |
| s.add(value.f1.data()); |
| }; |
| for ( MRData v: (Bag)reducer.eval(new Tuple(key,s)) ) |
| out.collect(new FData(new Tuple(key,v))); |
| } |
| }); |
| } |
| |
| public static final class join_key implements KeySelector<FData,FData> { |
| @Override |
| public FData getKey ( FData value ) { |
| return new FData(((Tuple)value.data()).first()); |
| } |
| } |
| |
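| /* aggregate_partition folds each partition into a single value: for instance |
|  * (illustrative), with m = identity and acc = addition, every partition emits |
|  * one partial sum, so the merge phase combines just one value per partition. */ |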
| public static final class aggregate_partition extends RichMapPartitionFunction<FData,FData> { |
| final Function m; |
| final Function acc; |
| final MRData zero; |
| |
| aggregate_partition ( Tree m, Tree acc, MRData zero ) { |
| this.m = evalF(m,null); |
| this.acc = evalF(acc,null); |
| this.zero = zero; |
| } |
| |
| @Override |
| public void mapPartition ( Iterable<FData> values, Collector<FData> out ) throws Exception { |
| MRData aggregated_value = zero; |
| for ( FData value: values ) |
| for ( MRData v: (Bag)m.eval(value.data()) ) |
| aggregated_value = acc.eval(new Tuple(aggregated_value,v)); |
| out.collect(new FData(aggregated_value)); |
| } |
| } |
| |
| public static final class groupBy_combiner_reducer extends RichGroupReduceFunction<FData,FData> { |
| final Function r; |
| final Function acc; |
| final MRData zero; |
| |
| groupBy_combiner_reducer ( Tree r, Tree acc, MRData zero ) { |
| this.r = evalF(r,null); |
| this.acc = evalF(acc,null); |
| this.zero = zero; |
| } |
| |
| @Override |
| public void reduce ( final Iterable<FData> values, Collector<FData> out ) { |
| MRData value = zero; |
| for ( MRData v: (Bag)r.eval(new Tuple(bag(values))) ) |
| value = acc.eval(new Tuple(value,v)); |
| out.collect(new FData(value)); |
| } |
| } |
| |
| public static final class join_reducer extends RichCoGroupFunction<FData,FData,FData> { |
| final Function f; |
| |
| join_reducer ( Tree fnc ) { |
| f = evalF(fnc,null); |
| } |
| |
| @Override |
| public void coGroup ( Iterable<FData> xs, Iterable<FData> ys, Collector<FData> out ) { |
| for ( MRData v: (Bag)f.eval(new Tuple(bag(xs),bag(ys))) ) |
| out.collect(new FData(v)); |
| } |
| } |
| |
| public static final class join_combiner_reducer extends RichCoGroupFunction<FData,FData,FData> { |
| final Function r; |
| final Function acc; |
| final MRData zero; |
| |
| join_combiner_reducer ( Tree r, Tree acc, MRData zero ) { |
| this.r = evalF(r,null); |
| this.acc = evalF(acc,null); |
| this.zero = zero; |
| } |
| |
| @Override |
| public void coGroup ( Iterable<FData> xs, Iterable<FData> ys, Collector<FData> out ) { |
| MRData value = zero; |
| for ( MRData v: (Bag)r.eval(new Tuple(bag(xs),bag(ys))) ) |
| value = acc.eval(new Tuple(value,v)); |
| out.collect(new FData(value)); |
| } |
| } |
| |
| public static final class cross_reducer extends RichCrossFunction<FData,FData,FData> { |
| final Function f; |
| |
| cross_reducer ( Tree fnc ) { |
| f = evalF(fnc,null); |
| } |
| |
| @Override |
| public FData cross ( FData x, FData y ) throws Exception { |
| return new FData(f.eval(new Tuple(x.data(),y.data()))); |
| } |
| } |
| |
| public static final class cross_combiner_reducer extends RichCrossFunction<FData,FData,FData> { |
| final Function r; |
| final Function acc; |
| final MRData zero; |
| |
| cross_combiner_reducer ( Tree r, Tree acc, MRData zero ) { |
| this.r = evalF(r,null); |
| this.acc = evalF(acc,null); |
| this.zero = zero; |
| } |
| |
| @Override |
| public FData cross ( FData x, FData y ) throws Exception { |
| return new FData(acc.eval(new Tuple(zero,r.eval(new Tuple(x.data(),y.data()))))); |
| } |
| } |
| |
| public static final class mapjoin_reducer extends RichJoinFunction<FData,FData,FData> { |
| final Function f; |
| |
| mapjoin_reducer ( Tree fnc ) { |
| f = evalF(fnc,null); |
| } |
| |
| @Override |
| public FData join ( FData x, FData y ) throws Exception { |
| return new FData(f.eval(new Tuple(x.data(),y.data()))); |
| } |
| } |
| |
| /** capture the global bindings at the client node and restore them at each worker */ |
| public static final class restore_global_functions extends RichMapFunction<FData,FData> { |
| Environment client_env; |
| |
| restore_global_functions () { |
| client_env = Interpreter.global_env; |
| } |
| |
| @Override |
| public void open ( org.apache.flink.configuration.Configuration parameters ) throws Exception { |
| Interpreter.set_global_bindings(client_env); |
| } |
| |
| @Override |
| public FData map ( FData value ) throws Exception { return value; } |
| } |
| |
| public static final class generator_mapper extends RichMapFunction<Long,FData> { |
| @Override |
| public FData map ( Long value ) throws Exception { |
| return new FData(new MR_long(value)); |
| } |
| } |
| |
| public static final class parsed_mapper extends RichFlatMapFunction<String,FData> { |
| final Parser parser; |
| |
| parsed_mapper ( Class<? extends Parser> parser_class ) throws Exception { |
| parser = parser_class.newInstance(); |
| } |
| |
| @Override |
| public void flatMap ( String value, Collector<FData> out ) throws Exception { |
| for ( MRData e: parser.parse(value) ) |
| out.collect(new FData(e)); |
| } |
| } |
| |
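| /* The group_join_* classes implement GroupByJoin on an m*n grid of partitions, |
|  * where the grid cell at column j in [0,m) and row i in [0,n) is encoded as the |
|  * integer key j+m*i. Assuming non-negative hash codes, each left tuple x is |
|  * replicated to the n cells of column hash(gx(x)) mod m and each right tuple y |
|  * to the m cells of row hash(gy(y)) mod n, so every matching pair meets in |
|  * exactly one cell, where group_join_reducer separates the tagged inputs and |
|  * performs the merge-group-by join. */ |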
| public static final class group_join_left extends RichFlatMapFunction<FData,FData> { |
| final MRData one = new MR_byte(1); |
| final Function fgx; |
| final int m, n; |
| |
| group_join_left ( Tree gx, int m, int n ) { |
| this.m = m; |
| this.n = n; |
| fgx = evalF(gx,null); |
| } |
| |
| @Override |
| public void flatMap ( FData value, Collector<FData> out ) throws Exception { |
| for ( int i = 0; i < n; i++ ) { |
| final MRData key = new MR_int((fgx.eval(value.data()).hashCode() % m)+m*i); |
| out.collect(new FData(new Tuple(key,new Tuple(one,value.data())))); |
| } |
| } |
| } |
| |
| public static final class group_join_right extends RichFlatMapFunction<FData,FData> { |
| final MRData two = new MR_byte(2); |
| final Function fgy; |
| final int m, n; |
| |
| group_join_right ( Tree gy, int m, int n ) { |
| this.m = m; |
| this.n = n; |
| fgy = evalF(gy,null); |
| } |
| |
| @Override |
| public void flatMap ( FData value, Collector<FData> out ) throws Exception { |
| for ( int j = 0; j < m; j++ ) { |
| final MRData key = new MR_int((fgy.eval(value.data()).hashCode() % n)*m+j); |
| out.collect(new FData(new Tuple(key,new Tuple(two,value.data())))); |
| } |
| } |
| } |
| |
| public static final class group_join_reducer extends RichGroupReduceFunction<FData,FData> { |
| final Function fkx, fky, fgx, fgy, fc, fr; |
| final MRData z; |
| |
| group_join_reducer ( Tree kx, Tree ky, Tree gx, Tree gy, Tree c, Tree r, Tree zero ) { |
| fkx = evalF(kx,null); fky = evalF(ky,null); fgx = evalF(gx,null); fgy = evalF(gy,null); |
| fr = evalF(r,null); fc = evalF(c,null); z = evalE(zero,null); |
| } |
| |
| @Override |
| public void reduce ( final Iterable<FData> values, Collector<FData> out ) { |
| Bag xb = new Bag(); |
| Bag yb = new Bag(); |
| for ( FData value: values ) { |
| final Tuple t = (Tuple)value.data(); |
| final Tuple p = (Tuple)t.second(); |
| if (((MR_byte)p.first()).get() == 1) |
| xb.add(new Tuple(t.first(),p.second())); |
| else yb.add(new Tuple(t.first(),p.second())); |
| } |
| final Bag b = MapReduceAlgebra.mergeGroupByJoin(fkx,fky,fgx,fgy,fc,z,fr,xb,yb); |
| for ( MRData v: b ) |
| out.collect(new FData(v)); |
| } |
| } |
| |
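| /* A Repeat step emits (value,flag) pairs: repeat_values projects the new loop value |
|  * and repeat_filter keeps the pairs whose flag is true; the filtered dataset serves |
|  * as the termination criterion of the Flink iteration, which stops once it is empty. */ |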
| public static final class repeat_values extends RichMapFunction<FData,FData> { |
| @Override |
| public FData map ( FData value ) throws Exception { |
| return new FData(((Tuple)value.data()).first()); |
| } |
| } |
| |
| public static final class repeat_filter extends RichFilterFunction<FData> { |
| @Override |
| public boolean filter ( FData value ) throws Exception { |
| return ((MR_bool)(((Tuple)value.data()).second())).get(); |
| } |
| } |
| |
| /** Evaluate a MRQL physical plan |
| * @param e the physical plan |
| * @param env contains bindings from variables to values (MRData) |
| * @return a Flink DataSet |
| */ |
| final private DataSet<FData> evalD ( final Tree e, final Environment env ) { |
| try { |
| match e { |
| case MapAggregateReduce(`m,`r,null,_,`s,`o): |
| return evalD(#<MapReduce(`m,`r,`s,`o)>,env); |
| case CrossAggregateProduct(`mx,`my,`r,null,_,`x,`y): |
| return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env); |
| case MapAggregateReduce2(`mx,`my,`r,null,_,`x,`y,`o): |
| return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env); |
| case MapAggregateJoin(`mx,`my,`r,null,_,`x,`y): |
| return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env); |
| case cMap(`f,`s): |
| return eval(s,env).flatMap(cmap_fnc(f)); |
| case MapReduce(`m,`r,`s,true): |
| return sortBy(eval(s,env).flatMap(cmap_fnc(m)),null,r); |
| case MapReduce(`m,`r,`s,`o): |
| return groupBy(eval(s,env).flatMap(cmap_fnc(m)),null,r); |
| case MapCombineReduce(`m,`c,`r,`s,true): |
| return sortBy(eval(s,env).flatMap(cmap_fnc(m)),c,r); |
| case MapCombineReduce(`m,`c,`r,`s,`o): |
| return groupBy(eval(s,env).flatMap(cmap_fnc(m)),c,r); |
| case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o): |
| return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env); |
| case MapReduce2(`mx,`my,`r,`x,`y,`o): |
| final DataSet<FData> xs = eval(x,env).flatMap(cmap_fnc(mx)); |
| final DataSet<FData> ys = eval(y,env).flatMap(cmap_fnc(my)); |
| final DataSet<FData> ds = xs.coGroup(ys) |
| .where(new join_key()) |
| .equalTo(new join_key()) |
| .with(new join_reducer(r)); |
| if (o.equals(#<true>)) |
| return sortBy(ds,null,#<lambda(x,bag(x))>); |
| else return ds; |
| case MapJoin(`mx,`my,`r,`x,`y): |
| final DataSet<FData> xs = eval(x,env).flatMap(cmap_fnc(mx)); |
| final DataSet<FData> ys = eval(y,env).flatMap(cmap_fnc(my)); |
| return xs.joinWithTiny(ys) |
| .where(new join_key()) |
| .equalTo(new join_key()) |
| .with(new mapjoin_reducer(r)); |
| case CrossProduct(`mx,`my,`r,`x,`y): |
| final DataSet<FData> xs = eval(x,env).flatMap(cmap_fnc(mx)); |
| final DataSet<FData> ys = eval(y,env).flatMap(cmap_fnc(my)); |
| return xs.cross(ys).with(new cross_reducer(r)) |
| .flatMap(cmap_fnc(#<lambda(x,x)>)); |
| case GroupByJoin(`kx,`ky,`gx,`gy,`acc,`zero,`r,`x,`y,`o): |
| final int n = Math.max(2,(int)Math.floor(Math.sqrt(Config.nodes))); |
| final int m = n; |
| // System.err.println("Using a groupBy join on a "+n+"*"+m+" grid of partitions"); |
| final DataSet<FData> xs = eval(x,env).flatMap(new group_join_left(gx,m,n)); |
| final DataSet<FData> ys = eval(y,env).flatMap(new group_join_right(gy,m,n)); |
| return xs.union(ys).groupBy(new join_key()) |
| .reduceGroup(new group_join_reducer(kx,ky,gx,gy,acc,r,zero)); |
| case Merge(`x,`y): |
| return eval(x,env).union(eval(y,env)); |
| case Generator(`min,`max,`size): |
| final long from = ((MR_long)evalE(min,null)).get(); |
| final long to = ((MR_long)evalE(max,null)).get(); |
| return flink_env.generateSequence(from,to).map(new generator_mapper()) |
| .map(new restore_global_functions()); |
| case BinarySource(`file,_): |
| final String path = absolute_path(((MR_string)evalE(file,null)).get()); |
| new BinaryDataSource(path,Plan.conf); // constructed for its side effects only (registers the data source) |
| final FileInputFormat<FData> sf = new FlinkSequenceInputFormat().inputFormat(path); |
| return flink_env.createInput(sf).map(new restore_global_functions()); |
| case ParsedSource(`parser,`file,...args): |
| final String path = absolute_path(((MR_string)evalE(file,null)).get()); |
| final Class<? extends Parser> parser_class = DataSource.parserDirectory.get(parser.toString()); |
| if (parser_class == null) |
| throw new Error("Unknown parser: "+parser); |
| final DataSource ds = new FlinkParsedDataSource(path,parser_class,args); // registers the parsed data source; ds itself is not used |
| return flink_env.readFile(new FlinkParsedInputFormat.ParsedInputFormat(path),path) |
| .map(new restore_global_functions()); |
| case Repeat(lambda(`v,`b),`s,`n): |
| final int max_num = ((MR_int)evalE(n,null)).get(); |
| final IterativeDataSet<FData> startOfIteration = eval(s,env).iterate(max_num); |
| final DataSet<FData> step = eval(b,new Environment(v.toString(),new MR_flink(startOfIteration),env)); |
| final DataSet<FData> toBeFedBack = step.map(new repeat_values()); |
| final DataSet<FData> terminationCriterion = step.filter(new repeat_filter()); |
| return startOfIteration.closeWith(toBeFedBack,terminationCriterion); |
| case Closure(lambda(`v,`b),`s,`n): |
| final int max_num = ((MR_int)evalE(n,null)).get(); |
| IterativeDataSet<FData> startOfIteration = eval(s,env).iterate(max_num); |
| DataSet<FData> toBeFedBack = eval(b,new Environment(v.toString(),new MR_flink(startOfIteration),env)); |
| return startOfIteration.closeWith(toBeFedBack); |
| case let(`v,DataSetCollect(`s),`body): |
| MRData x = evalE(s,env); |
| if (x instanceof MR_dataset) |
| x = new Bag(((MR_dataset)x).dataset().take(Integer.MAX_VALUE)); |
| else if (x instanceof Bag) |
| ((Bag)x).materialize(); |
| new_distributed_binding(v.toString(),x); |
| return eval(body,new Environment(v.toString(),x,env)); |
| case let(`v,`u,`body): |
| return eval(body,new Environment(v.toString(),evalE(u,null),env)); |
| case Let(`v,`u,`body): |
| return eval(body,new Environment(v.toString(),new MR_flink(eval(u,env)),env)); |
| case If(`c,`x,`y): |
| if (((MR_bool)evalE(c,env)).get()) |
| return eval(x,env); |
| else return eval(y,env); |
| case nth(`u,`n): |
| MRData x = ((Tuple)evalE(u,env)).get((int)n.longValue()); |
| if (x instanceof MR_flink) |
| return ((MR_flink)x).flink(); |
| throw new Error("Evaluation error in: "+print_query(e)); |
| case `v: |
| if (!v.is_variable()) |
| fail; |
| MRData x = variable_lookup(v.toString(),env); |
| if (x != null) |
| if (x instanceof MR_flink) |
| return ((MR_flink)x).flink(); |
| else throw new Error("Variable "+v+" is of type: "+x); |
| x = variable_lookup(v.toString(),global_env); |
| if (x != null) |
| if (x instanceof MR_flink) |
| return ((MR_flink)x).flink(); |
| else if (x instanceof MR_dataset) |
| return dataset(new Bag(((MR_dataset)x).dataset().take(Integer.MAX_VALUE))) |
| .map(new restore_global_functions()); |
| else throw new Error("Variable "+v+" is of type: "+x); |
| throw new Error("Variable "+v+" is not bound"); |
| }; |
| throw new Error("Cannot evaluate the Flink plan: "+e); |
| } catch (Error msg) { |
| if (!Config.trace) |
| throw new Error(msg.getMessage()); |
| System.err.println(msg.getMessage()); |
| msg.printStackTrace(); |
| throw new Error("Evaluation error in: "+print_query(e)); |
| } catch (Exception ex) { |
| System.err.println(ex.getMessage()); |
| ex.printStackTrace(); |
| throw new Error("Evaluation error in: "+print_query(e)); |
| } |
| } |
| } |