| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.mrql; |
| |
| import org.apache.mrql.gen.*; |
| import java_cup.runtime.Scanner; |
| import java.util.*; |
| import java.io.*; |
| import org.apache.log4j.*; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.mapred.FileInputFormat; |
| import org.apache.hadoop.mapred.SequenceFileOutputFormat; |
| import scala.Tuple2; |
| import org.apache.spark.SparkConf; |
| import org.apache.spark.TaskContext; |
| import org.apache.spark.Partition; |
| import org.apache.spark.Accumulator; |
| import org.apache.spark.Partitioner; |
| import org.apache.spark.broadcast.Broadcast; |
| import org.apache.spark.api.java.JavaRDD; |
| import org.apache.spark.api.java.JavaPairRDD; |
| import org.apache.spark.api.java.JavaSparkContext; |
| import org.apache.spark.api.java.function.Function2; |
| import org.apache.spark.api.java.function.Function3; |
| import org.apache.spark.api.java.function.PairFunction; |
| import org.apache.spark.api.java.function.Function; |
| import org.apache.spark.api.java.function.VoidFunction; |
| import org.apache.spark.streaming.api.java.JavaStreamingContext; |
| import org.apache.spark.streaming.Duration; |
| |
| |
| /** Evaluates physical plans in Apache Spark mode */ |
| public class SparkEvaluator extends Evaluator implements Serializable { |
| public static JavaSparkContext spark_context; |
| public static JavaStreamingContext stream_context; |
| final static Bag empty_bag = new Bag(); |
| // an HDFS tmp file used to hold the data source directory information in distributed mode |
| final static String data_source_dir_name = "tmp/"+System.getenv("USER")+"_data_source_dir.txt"; |
    // holds the cached RDDs that need to be unpersisted
| private static ArrayList<JavaRDD<MRData>> cached_rdds = new ArrayList<JavaRDD<MRData>>(); |
| static Environment global_rdds = null; |
| |
| /** initialize the Spark evaluator */ |
| final public void init ( Configuration conf ) { |
| Config.spark_mode = true; |
| global_rdds = null; |
| SparkConf spark_conf = new SparkConf(); |
| spark_conf.setAppName("MRQL"); |
| if (Config.hadoop_mode && Config.local_mode) { |
| spark_conf.setMaster("local["+Config.nodes+"]"); |
| spark_context = new JavaSparkContext(spark_conf); |
| Plan.conf = spark_context.hadoopConfiguration(); |
| FileSystem.setDefaultUri(Plan.conf,"file:///"); |
| } else if (Config.hadoop_mode) { |
| spark_conf.setMaster(System.getenv("SPARK_MASTER")); |
| spark_conf.setSparkHome(System.getenv("SPARK_HOME")); |
| spark_conf.set("spark.logConf","false"); |
| spark_conf.set("spark.eventLog.enabled","false"); |
| spark_conf.set("spark.executor.instances",""+Config.nodes); |
| spark_context = new JavaSparkContext(spark_conf); |
| Plan.conf = spark_context.hadoopConfiguration(); |
            String fs_default_name = System.getenv("FS_DEFAULT_NAME");
            if (fs_default_name != null && !fs_default_name.equals(""))
                FileSystem.setDefaultUri(Plan.conf,fs_default_name);
| }; |
| if (Config.stream_window > 0 && (Config.local_mode || Config.hadoop_mode)) |
| stream_context = new JavaStreamingContext(spark_context,new Duration(Config.stream_window)); |
| if (!Config.info) { |
| for ( Enumeration en = LogManager.getCurrentLoggers(); en.hasMoreElements(); ) |
| ((Logger)en.nextElement()).setLevel(Level.WARN); |
| LogManager.getRootLogger().setLevel(Level.WARN); |
| } |
| } |
| |
| /** shutdown the Spark evaluator */ |
| final public void shutdown ( Configuration conf ) { |
| if (Config.local_mode) { |
| spark_context.stop(); |
| spark_context = null; |
| System.clearProperty("spark.driver.port"); |
| } |
| } |
| |
| final public void initialize_query () { |
| Plan.distribute_compiled_arguments(Plan.conf); |
| if (spark_context != null && Config.compile_functional_arguments) |
| spark_context.addJar(Plan.conf.get("mrql.jar.path")); |
| } |
| |
| final public Configuration new_configuration () { |
| return new Configuration(); |
| } |
| |
    /** return the FileInputFormat for parsed files (CSV, XML, JSON, etc.) */
| final public Class<? extends MRQLFileInputFormat> parsedInputFormat () { |
| return SparkParsedInputFormat.class; |
| } |
| |
| /** return the FileInputFormat for binary files */ |
| final public Class<? extends MRQLFileInputFormat> binaryInputFormat () { |
| return SparkBinaryInputFormat.class; |
| } |
| |
| /** return the FileInputFormat for data generator files */ |
| final public Class<? extends MRQLFileInputFormat> generatorInputFormat () { |
| return SparkGeneratorInputFormat.class; |
| } |
| |
| final public static void GC_cached_rdds () { |
| for ( JavaRDD<MRData> rdd: cached_rdds ) |
| rdd.unpersist(); |
| cached_rdds = new ArrayList<JavaRDD<MRData>>(); |
| } |
| |
| /** evaluate plan in stream mode: evaluate each batch of data and apply the function f */ |
| final public void streaming ( Tree plan, Environment env, Environment dataset_env, org.apache.mrql.Function f ) { |
| SparkStreaming.evaluate(plan,env,dataset_env,f); |
| } |
| |
    /** used by the master to send parsing details (e.g., record types) to workers */
| public static void dump_source_dir () throws IOException { |
| if (Config.local_mode) |
| return; |
| DataSource.dataSourceDirectory.distribute(Plan.conf); |
| Path path = new Path(data_source_dir_name); |
| FileSystem fs = path.getFileSystem(Plan.conf); |
| PrintStream ps = new PrintStream(fs.create(path,true)); |
| ps.println(Plan.conf.get("mrql.data.source.directory")); |
| ps.close(); |
| } |
| |
| /** executed by a worker when reading parsed input (see SparkParsedInputFormat) */ |
| public static void load_source_dir () throws IOException { |
| if (Plan.conf == null) { |
| if (evaluator == null) |
| evaluator = new SparkEvaluator(); |
| Plan.conf = evaluator.new_configuration(); |
| Config.read(Plan.conf); |
| }; |
| if (Config.local_mode) |
| return; |
        // workers read the data source directory details back from the HDFS file written by dump_source_dir()
| Path path = new Path(data_source_dir_name); |
| FileSystem fs = path.getFileSystem(Plan.conf); |
| BufferedReader ftp = new BufferedReader(new InputStreamReader(fs.open(path))); |
| Plan.conf.set("mrql.data.source.directory",ftp.readLine()); |
| DataSource.dataSourceDirectory.read(Plan.conf); |
| ftp.close(); |
| } |
| |
| /** dump MRQL data into a sequence file */ |
| @Override |
| final public void dump ( String file, Tree type, MRData data ) throws Exception { |
| if (data instanceof MR_dataset && ((MR_dataset)data).dataset().source.get(0) instanceof RDDDataSource) { |
| Path path = new Path(file); |
| FileSystem fs = path.getFileSystem(Plan.conf); |
| PrintStream ftp = new PrintStream(fs.create(path.suffix(".type"))); |
| fs.delete(path,true); |
| ftp.print("2@"+type.toString()+"\n"); |
| ftp.close(); |
            // in Spark mode, a dataset always has exactly one RDD data source
| JavaRDD<MRData> rdd = ((RDDDataSource)((MR_dataset)data).dataset().source.get(0)).rdd.cache(); |
| rdd.mapToPair(new PairFunction<MRData,MRContainer,MRContainer>() { |
| final MRContainer zero = new MRContainer(new MR_int(0)); |
| public Tuple2<MRContainer,MRContainer> call ( MRData value ) { |
| return new Tuple2<MRContainer,MRContainer>(zero,new MRContainer(value)); |
| } |
| }).saveAsHadoopFile(file,MRContainer.class,MRContainer.class,SequenceFileOutputFormat.class); |
| rdd.unpersist(); |
| } else super.dump(file,type,data); |
| } |
| |
    /** dump MRQL data into a text CSV file */
| @Override |
| final public void dump_text ( String file, Tree type, MRData data ) throws Exception { |
| if (data instanceof MR_dataset && ((MR_dataset)data).dataset().source.get(0) instanceof RDDDataSource) { |
            // in Spark mode, a dataset always has exactly one RDD data source
| JavaRDD<MRData> rdd = ((RDDDataSource)((MR_dataset)data).dataset().source.get(0)).rdd.cache(); |
| int ps = Config.max_bag_size_print; |
| Config.max_bag_size_print = -1; |
| match type { |
| case `T(tuple(...tps)): |
| final Trees ts = tps; |
| rdd.map(new Function<MRData,String>() { |
| public String call ( MRData value ) { |
| Tuple t = (Tuple)value; |
| String s = Printer.print(t.get((short)0),ts.nth(0)); |
| for ( short i = 1; i < t.size(); i++ ) |
| s += ","+Printer.print(t.get(i),ts.nth(i)); |
| return s; |
| } |
| }).saveAsTextFile(file); |
| case _: |
| rdd.saveAsTextFile(file); |
| }; |
| Config.max_bag_size_print = ps; |
| rdd.unpersist(); |
| } else super.dump_text(file,type,data); |
| } |
| |
| private static Function2<MRData,MRData,MRData> accumulator ( final Tree acc_fnc, final Environment env ) { |
| final org.apache.mrql.Function f = evalF(acc_fnc,env); |
| return new Function2<MRData,MRData,MRData>() { |
| public MRData call ( MRData x, MRData y ) { |
| return f.eval(new Tuple(x,y)); |
| } |
| }; |
| } |
| |
| /** The Aggregate physical operator |
| * @param acc_fnc the accumulator function from (T,T) to T |
| * @param zero the zero element of type T |
| * @param plan the plan that constructs the dataset that contains the bag of values {T} |
     * @param env contains bindings from variables to values (MRData)
| * @return the aggregation result of type T |
| */ |
| final public MRData aggregate ( Tree acc_fnc, |
| Tree zero, |
| Tree plan, |
| Environment env ) throws Exception { |
| Function2<MRData,MRData,MRData> f2 = accumulator(acc_fnc,env); |
| MRData z = evalE(zero,env); |
| match plan { |
| case AggregateMap(`m,`acc,_,`s): |
| return evalD(#<cMap(`m,`s)>,env,null) |
| .aggregate(z,accumulator(acc,env),f2); |
| case MapAggregateReduce(`m,`r,`acc,_,`s,`o): |
| if (acc.equals(#<null>)) |
| fail; |
| return evalD(#<MapReduce(`m,`r,`s,`o)>,env,null) |
| .aggregate(z,accumulator(acc,env),f2); |
| case CrossAggregateProduct(`mx,`my,`r,`acc,_,`x,`y): |
| if (acc.equals(#<null>)) |
| fail; |
| return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env,null) |
| .aggregate(z,accumulator(acc,env),f2); |
| case MapAggregateReduce2(`mx,`my,`r,`acc,_,`x,`y,`o): |
| if (acc.equals(#<null>)) |
| fail; |
| return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env,null) |
| .aggregate(z,accumulator(acc,env),f2); |
| case MapAggregateJoin(`mx,`my,`r,`acc,_,`x,`y): |
| if (acc.equals(#<null>)) |
| fail; |
| return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env,null) |
| .aggregate(z,accumulator(acc,env),f2); |
| }; |
| throw new Error("Unrecognized aggregation: "+plan); |
| } |
| |
    /** Evaluate a loop a fixed number of times */
| final public Tuple loop ( Tree e, Environment env ) throws Exception { |
| match e { |
| case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num): |
| int limit = ((MR_int)evalE(num,env)).get(); |
| MR_rdd[] s = new MR_rdd[vs.length()]; |
| for ( int i = 0; i < vs.length(); i++ ) |
| s[i] = new MR_rdd(eval(ss.nth(i),env,(Environment)null).cache()); |
| for ( int n = 0; n < limit; n++ ) { |
| Environment nenv = null; |
| for ( int i = 0; i < vs.length(); i ++ ) |
| nenv = new Environment(vs.nth(i).toString(),s[i],nenv); |
| for ( int i = 0; i < vs.length(); i ++ ) { |
| s[i].rdd().unpersist(); |
| s[i] = new MR_rdd(eval(bs.nth(i),env,nenv).cache()); |
| } |
| }; |
| Tuple t = new Tuple(vs.length()); |
| for ( int i = 0; i < vs.length(); i++ ) |
| t.set(i,bag(s[i].rdd())); |
| return t; |
| }; |
| throw new Error("Wrong Loop format"); |
| } |
| |
| private static Bag bag ( final Iterable<MRData> s ) { |
| final Iterator<MRData> i = s.iterator(); |
| return new Bag(new BagIterator() { |
| public MRData next () { |
| return i.next(); |
| } |
| public boolean hasNext () { |
| return i.hasNext(); |
| } |
| }); |
| } |
| |
| /** Convert a Spark RDD into a lazy bag |
| * @param rdd the Spark RDD |
| * @return a lazy bag that contains all RDD elements |
| */ |
| public static Bag bag ( final JavaRDD<MRData> rdd ) throws IOException { |
| final Iterator<MRData> i = rdd.toLocalIterator(); |
| return new Bag(new BagIterator() { |
| public MRData next () { |
| return i.next(); |
| } |
| public boolean hasNext () { |
| return i.hasNext(); |
| } |
| }); |
| } |
| |
| /** Coerce a persistent collection to a Bag */ |
| public Bag toBag ( MRData data ) { |
| try { |
| if (data instanceof MR_rdd) |
| return bag(((MR_rdd)data).rdd()); |
| return (Bag)data; |
| } catch (Exception ex) { |
| throw new Error("Cannot coerce "+data+" to a Bag: "+ex); |
| } |
| } |
| |
| private final static Function<MRData,MRData> get_first |
| = new Function<MRData,MRData>() { |
| public MRData call ( MRData value ) { |
| return ((Tuple)value).first(); |
| }; |
| }; |
| |
| /** Evaluate an MRQL physical plan using Spark |
| * @param e the physical plan |
     * @param env contains bindings from variables to values (MRData)
| * @param counter the name of the counter used in loops |
| * @return a DataSet |
| */ |
| final public DataSet eval ( final Tree e, |
| final Environment env, |
| final String counter ) { |
| JavaRDD<MRData> rd = eval(e,env,(Environment)null); |
| long count = 0; |
| rd = rd.cache(); |
| cached_rdds.add(rd); |
| if (!counter.equals("-")) { |
| final Accumulator<Integer> c = spark_context.intAccumulator(0); |
| rd.foreach(new VoidFunction<MRData>() { |
| public void call ( MRData value ) { |
| if (((MR_bool)((Tuple)value).second()).get()) |
| c.add(1); // count the true results (the results that need another loop step) |
| } |
| }); |
| count = c.value(); |
| rd = rd.map(get_first); |
| }; |
| return new DataSet(new RDDDataSource(rd),count,rd.count()); |
| } |
| |
| /** Evaluate an MRQL physical plan using Spark and print tracing info |
| * @param e the physical plan |
| * @param env contains bindings from variables to values (MRData) |
| * @param rdd_env contains bindings from variables to RDDs |
| * @return a Spark RDD |
| */ |
| final public JavaRDD<MRData> eval ( final Tree e, final Environment env, final Environment rdd_env ) { |
| if (Config.trace_execution) { |
| tab_count += 3; |
| System.out.println(tabs(tab_count)+print_query(e)); |
| }; |
| final JavaRDD<MRData> res = evalD(e,env,rdd_env); |
| if (Config.trace_execution) |
| try { |
| System.out.println(tabs(tab_count)+"-> "+res.take(Config.max_bag_size_print)); |
| tab_count -= 3; |
| } catch (Exception ex) { |
| throw new Error("Cannot collect the operator output: "+e+" ("+ex+")"); |
| }; |
| return res; |
| } |
| |
    /* convert an MRQL lambda to a Spark flat-map function */
| private static FmFunction<MRData,MRData> cmap_fnc ( Tree fnc, Environment env ) { |
| final Broadcast<Environment> broadcast_env = spark_context.broadcast(global_env); |
| final org.apache.mrql.Function f = evalF(fnc,env); |
| return new FmFunction<MRData,MRData>() { |
| public Iterator<MRData> eval ( MRData value ) { |
| global_env = broadcast_env.value(); |
| return ((Bag)f.eval(value)).iterator(); |
| } |
| }; |
| } |
| |
    /* group s by key and reduce each group with fnc; if o is true, sort by key and apply fnc to each (key, singleton bag) in key order */
| private static JavaRDD<MRData> groupBy ( JavaRDD<MRData> s, Tree fnc, Environment env, Tree o ) { |
| match o { |
| case true: // the result must be sorted |
| return s.mapToPair(new PairFunction<MRData,MRData,MRData>() { |
| public Tuple2<MRData,MRData> call ( MRData value ) { |
| Tuple t = (Tuple)value; |
| return new Tuple2(t.first(),t.second()); |
| } |
| }) |
| .sortByKey(true) |
| .map(new Function<Tuple2<MRData,MRData>,MRData>() { |
| public MRData call ( Tuple2<MRData,MRData> value ) { |
| return new Tuple(value._1,new Bag(value._2)); |
| }}) |
| .flatMap(cmap_fnc(fnc,env)); |
| }; |
| return s.groupBy(get_first) |
| .map(new Function<Tuple2<MRData,Iterable<MRData>>,MRData> () { |
| public MRData call ( Tuple2<MRData,Iterable<MRData>> value ) { |
| final Iterator<MRData> i = value._2.iterator(); |
| return new Tuple(value._1,new Bag(new BagIterator() { |
| public MRData next () { |
| return ((Tuple)i.next()).second(); |
| } |
| public boolean hasNext () { |
| return i.hasNext(); |
| } |
| })); |
| } |
| }) |
| .flatMap(cmap_fnc(fnc,env)); |
| } |
| |
| static JavaRDD<MRData> containerData ( JavaPairRDD<MRContainer,MRContainer> rd ) { |
| final Broadcast<Environment> broadcast_env = spark_context.broadcast(global_env); |
| return rd.map(new Function<Tuple2<MRContainer,MRContainer>,MRData>() { |
| public MRData call ( Tuple2<MRContainer,MRContainer> value ) { |
| global_env = broadcast_env.value(); |
| return value._2.data(); |
| } |
| }); |
| } |
| |
| private static Iterator<Tuple2<MRData,MRData>> joinIterator ( final Iterator<MRData> i ) { |
| return new Iterator<Tuple2<MRData,MRData>> () { |
| public Tuple2<MRData,MRData> next () { |
| Tuple data = (Tuple)i.next(); |
| return new Tuple2<MRData,MRData>(data.first(),data.second()); |
| } |
| public boolean hasNext () { |
| return i.hasNext(); |
| } |
| public void remove () {} |
| }; |
| } |
| |
| private static FmFunction<Iterator<MRData>,MRData> combiner_fnc ( final org.apache.mrql.Function f ) { |
| return new FmFunction<Iterator<MRData>,MRData>() { |
| public Iterator<MRData> eval ( final Iterator<MRData> i ) { |
| return MapReduceAlgebra.cmap(new org.apache.mrql.Function() { |
| public MRData eval ( MRData x ) { |
| final MRData key = ((Tuple)x).first(); |
| final Iterator<MRData> it = ((Bag)f.eval(x)).iterator(); |
| return new Bag(new BagIterator() { |
| public MRData next () { |
| return new Tuple(key,it.next()); |
| } |
| public boolean hasNext () { |
| return it.hasNext(); |
| } |
| }); |
| }}, |
| MapReduceAlgebra.groupBy(new Bag(new BagIterator() { |
| public MRData next () { |
| return i.next(); |
| } |
| public boolean hasNext () { |
| return i.hasNext(); |
| } |
| }))).iterator(); |
| } |
| }; |
| } |
| |
| /** Evaluate MRQL physical operators using Spark |
| * @param e the physical plan |
| * @param env contains bindings from variables to values (MRData) |
| * @param rdd_env contains bindings from variables to RDDs |
| * @return a Spark RDD |
| */ |
| final public JavaRDD<MRData> evalD ( final Tree e, final Environment env, final Environment rdd_env ) { |
| try { |
| match e { |
| case MapAggregateReduce(`m,`r,null,_,`s,`o): |
| return evalD(#<MapReduce(`m,`r,`s,`o)>,env,rdd_env); |
| case CrossAggregateProduct(`mx,`my,`r,null,_,`x,`y): |
| return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env,rdd_env); |
| case MapAggregateReduce2(`mx,`my,`r,null,_,`x,`y,`o): |
| return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env,rdd_env); |
| case MapAggregateJoin(`mx,`my,`r,null,_,`x,`y): |
| return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env,rdd_env); |
| case cMap(`f,`s): |
| return eval(s,env,rdd_env).flatMap(cmap_fnc(f,env)); |
| case MapReduce(`m,`r,`s,`o): |
| return groupBy(eval(s,env,rdd_env).flatMap(cmap_fnc(m,env)),r,env,o); |
| case MapCombineReduce(`m,`c,`r,`s,`o): |
| return groupBy(eval(s,env,rdd_env).flatMap(cmap_fnc(m,env)) |
| .mapPartitions(combiner_fnc(evalF(c,env))),r,env,o); |
| case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o): |
| return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env,rdd_env).mapPartitions(combiner_fnc(evalF(c,env))); |
| case CrossProduct(`mx,`my,`r,`x,`y): |
| final org.apache.mrql.Function fr = evalF(r,env); |
| return eval(x,env,rdd_env) |
| .flatMap(cmap_fnc(mx,env)) |
| .cartesian(eval(y,env,rdd_env).flatMap(cmap_fnc(my,env))) |
| .flatMap(new FmFunction<Tuple2<MRData,MRData>,MRData>() { |
| public Iterator<MRData> eval ( Tuple2<MRData,MRData> value ) { |
| return ((Bag)fr.eval(new Tuple(value._1,value._2))).iterator(); |
| } |
| }); |
| case MapReduce2(`mx,`my,`r,`x,`y,`o): |
| final org.apache.mrql.Function fr = evalF(r,env); |
| boolean sfx = false; |
| match mx { |
| case compiled(`fmx,lambda(`vx,bag(`mex))): |
| sfx = true; |
| mx = #<lambda(`vx,`mex)>; |
| case lambda(`vx,bag(`mex)): |
| sfx = true; |
| mx = #<lambda(`vx,`mex)>; |
| }; |
| final org.apache.mrql.Function fx = evalF(mx,env); |
| JavaPairRDD<MRData,MRData> xs |
| = (sfx) |
| ? eval(x,env,rdd_env).mapToPair(new PairFunction<MRData,MRData,MRData>() { |
| public Tuple2<MRData,MRData> call ( MRData value ) { |
| Tuple t = (Tuple)fx.eval(value); |
| return new Tuple2(t.first(),t.second()); |
| } |
| }) |
| : eval(x,env,rdd_env).flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() { |
| public Iterator<Tuple2<MRData,MRData>> eval ( MRData value ) { |
| return joinIterator(((Bag)fx.eval(value)).iterator()); |
| } |
| }); |
| boolean sfy = false; |
| match my { |
| case compiled(`fmy,lambda(`vy,bag(`mey))): |
| sfy = true; |
| my = #<lambda(`vy,`mey)>; |
| case lambda(`vy,bag(`mey)): |
| sfy = true; |
| my = #<lambda(`vy,`mey)>; |
| }; |
| final org.apache.mrql.Function fy = evalF(my,env); |
| JavaPairRDD<MRData,MRData> ys |
| = (sfy) |
| ? eval(y,env,rdd_env).mapToPair(new PairFunction<MRData,MRData,MRData>() { |
| public Tuple2<MRData,MRData> call ( MRData value ) { |
| Tuple t = (Tuple)fy.eval(value); |
| return new Tuple2(t.first(),t.second()); |
| } |
| }) |
| : eval(y,env,rdd_env).flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() { |
| public Iterator<Tuple2<MRData,MRData>> eval ( MRData value ) { |
| return joinIterator(((Bag)fy.eval(value)).iterator()); |
| } |
| }); |
| return xs.cogroup(ys) |
| .flatMap(new FmFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() { |
| public Iterator<MRData> eval ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) { |
| return ((Bag)fr.eval(new Tuple(bag(value._2._1),bag(value._2._2)))).iterator(); |
| } |
| }); |
| case GroupByJoin(`kx,`ky,`gx,`gy,`acc,`zero,`r,`x,`y,`o): |
| final int n = Math.max(2,(int)Math.floor(Math.sqrt(Config.nodes))); |
| final int m = n; |
| // System.err.println("Using a groupBy join on a "+n+"*"+m+" grid of partitions"); |
| final org.apache.mrql.Function fkx = evalF(kx,env); |
| final org.apache.mrql.Function fky = evalF(ky,env); |
| final org.apache.mrql.Function fgx = evalF(gx,env); |
| final org.apache.mrql.Function fgy = evalF(gy,env); |
| final org.apache.mrql.Function fc = evalF(acc,env); |
| final MRData z = evalE(zero,env); |
| final org.apache.mrql.Function fr = evalF(r,env); |
| final MRData one = new MR_byte(1); |
| final MRData two = new MR_byte(2); |
| final JavaPairRDD<MRData,MRData> xs |
| = eval(x,env,rdd_env).flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() { |
| public Iterator<Tuple2<MRData,MRData>> eval ( final MRData value ) { |
| return new Iterator<Tuple2<MRData,MRData>>() { |
| int i = 0; |
| public Tuple2<MRData,MRData> next () { |
| MRData key = new MR_int((fgx.eval(value).hashCode() % m)+m*i); |
| i++; |
| return new Tuple2<MRData,MRData>(key,new Tuple(one,value)); |
| } |
| public boolean hasNext () { |
| return i < n; |
| } |
| public void remove () {} |
| }; |
| } |
| }); |
| final JavaPairRDD<MRData,MRData> ys |
| = eval(y,env,rdd_env).flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() { |
| public Iterator<Tuple2<MRData,MRData>> eval ( final MRData value ) { |
| return new Iterator<Tuple2<MRData,MRData>>() { |
| int j = 0; |
| public Tuple2<MRData,MRData> next () { |
| MRData key = new MR_int((fgy.eval(value).hashCode() % n)*m+j); |
| j++; |
| return new Tuple2<MRData,MRData>(key,new Tuple(two,value)); |
| } |
| public boolean hasNext () { |
| return j < m; |
| } |
| public void remove () {} |
| }; |
| } |
| }); |
| return xs.union(ys).groupByKey() |
| .mapPartitions(new FmFunction<Iterator<Tuple2<MRData,Iterable<MRData>>>,MRData>() { |
| public Iterator<MRData> eval ( final Iterator<Tuple2<MRData,Iterable<MRData>>> value ) { |
| Bag xb = new Bag(); |
| Bag yb = new Bag(); |
| while (value.hasNext()) { |
| Tuple2<MRData,Iterable<MRData>> t = value.next(); |
| for ( MRData e: t._2 ) { |
| Tuple p = (Tuple)e; |
| if (((MR_byte)p.first()).get() == 1) |
| xb.add(new Tuple(t._1,p.second())); |
| else yb.add(new Tuple(t._1,p.second())); |
| } |
| }; |
| final Bag b = MapReduceAlgebra.mergeGroupByJoin(fkx,fky,fgx,fgy,fc,z,fr,xb,yb); |
| xb = null; yb = null; |
| return b.iterator(); |
| } |
| }); |
| case OuterMerge(`merge,`state,`data): |
| final org.apache.mrql.Function fm = evalF(merge,env); |
| JavaPairRDD<MRData,MRData> S = eval(state,env,rdd_env) |
| .mapPartitionsToPair(new PairFmFunction<Iterator<MRData>,MRData,MRData>() { |
| public Iterator<Tuple2<MRData,MRData>> eval ( final Iterator<MRData> i ) { |
| return joinIterator(i); |
| } |
| },true); // do not repartition the state S |
| JavaPairRDD<MRData,MRData> ds = eval(data,env,rdd_env) |
| .mapToPair(new PairFunction<MRData,MRData,MRData>() { |
| public Tuple2<MRData,MRData> call ( MRData value ) { |
| Tuple t = (Tuple)value; |
| return new Tuple2(t.first(),t.second()); |
| } |
| }); |
| JavaRDD<MRData> res = S.cogroup(ds) |
| .flatMap(new FmFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() { |
| public Iterator<MRData> eval ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) { |
| final Iterator<MRData> ix = value._2._1.iterator(); |
| final Iterator<MRData> iy = value._2._2.iterator(); |
| ArrayList<MRData> a = new ArrayList<MRData>(); |
| if (ix.hasNext()) |
| if (iy.hasNext()) |
| a.add(new Tuple(value._1,fm.eval(new Tuple(ix.next(),iy.next())))); |
| else a.add(new Tuple(value._1,ix.next())); |
| else if (iy.hasNext()) |
| a.add(new Tuple(value._1,iy.next())); |
| return a.iterator(); |
| } |
| }).cache(); |
| cached_rdds.add(res); |
| return res; |
| case RightOuterMerge(`merge,`state,`data): |
| final org.apache.mrql.Function fm = evalF(merge,env); |
| JavaPairRDD<MRData,MRData> S = eval(state,env,rdd_env) |
| .mapPartitionsToPair(new PairFmFunction<Iterator<MRData>,MRData,MRData>() { |
| public Iterator<Tuple2<MRData,MRData>> eval ( final Iterator<MRData> i ) { |
| return joinIterator(i); |
| } |
| },true); // do not repartition the state S |
| JavaPairRDD<MRData,MRData> ds = eval(data,env,rdd_env) |
| .mapToPair(new PairFunction<MRData,MRData,MRData>() { |
| public Tuple2<MRData,MRData> call ( MRData value ) { |
| Tuple t = (Tuple)value; |
| return new Tuple2(t.first(),t.second()); |
| } |
| }); |
| JavaRDD<MRData> res = S.cogroup(ds) |
| .flatMap(new FmFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() { |
| public Iterator<MRData> eval ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) { |
| final Iterator<MRData> ix = value._2._1.iterator(); |
| final Iterator<MRData> iy = value._2._2.iterator(); |
| return ((ix.hasNext()) |
| ? ((iy.hasNext()) |
| ? new Bag(new Tuple(value._1,fm.eval(new Tuple(ix.next(),iy.next())))) |
| : new Bag()) |
| : ((iy.hasNext()) |
| ? new Bag(new Tuple(value._1,iy.next())) |
| : new Bag())).iterator(); |
| } |
| }).cache(); |
| cached_rdds.add(res); |
| return res; |
| case MapJoin(lambda(`vx,bag(`mx)),lambda(`vy,bag(`my)),`r,`x,`y,`o): |
| final org.apache.mrql.Function fx = evalF(#<lambda(`vx,`mx)>,env); |
| final org.apache.mrql.Function fy = evalF(#<lambda(`vy,`my)>,env); |
| final org.apache.mrql.Function fr = evalF(r,env); |
| final Broadcast<Map<MRData,MRData>> ys |
| = spark_context.broadcast(eval(y,env,rdd_env) |
| .mapToPair(new PairFunction<MRData,MRData,MRData>() { |
| public Tuple2<MRData,MRData> call ( MRData value ) { |
| Tuple t = (Tuple)fy.eval(value); |
| return new Tuple2(t.first(),t.second()); |
| } |
| }).collectAsMap()); |
| return eval(x,env,rdd_env).mapPartitions(new FmFunction<Iterator<MRData>,MRData>() { |
| public Iterator<MRData> eval ( final Iterator<MRData> i ) { |
| final Map<MRData,MRData> m = ys.value(); |
| return new Iterator<MRData> () { |
| public MRData next () { |
| final Tuple p = (Tuple)fx.eval(i.next()); |
| final MRData pd = m.get(p.first()); |
| return ((Bag)fr.eval(new Tuple(p.second(),(pd == null) ? empty_bag : pd))); |
| } |
| public boolean hasNext () { |
| return i.hasNext(); |
| } |
| public void remove () {} |
| }; |
| } |
| },true); |
| case MapJoin(`mx,`my,`r,`x,`y): |
| final org.apache.mrql.Function fx = evalF(mx,env); |
| final org.apache.mrql.Function fy = evalF(my,env); |
| final org.apache.mrql.Function fr = evalF(r,env); |
| final Broadcast<Map<MRData,MRData>> ys |
| = spark_context.broadcast(eval(y,env,rdd_env) |
| .flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() { |
| public Iterator<Tuple2<MRData,MRData>> eval ( MRData value ) { |
| return joinIterator(((Bag)fy.eval(value)).iterator()); |
| } |
| }).collectAsMap()); |
| return eval(x,env,rdd_env).flatMap(new FmFunction<MRData,MRData>() { |
| public Iterator<MRData> eval ( MRData value ) { |
| final Map<MRData,MRData> m = ys.value(); |
| final Iterator<MRData> i = ((Bag)fx.eval(value)).iterator(); |
| return new Iterator<MRData>() { |
| Tuple p; |
| Iterator<MRData> ix = null; |
| public MRData next () { |
| return ix.next(); |
| } |
| public boolean hasNext () { |
| if (ix != null && ix.hasNext()) |
| return true; |
| while (i.hasNext()) { |
| p = (Tuple)i.next(); |
| MRData pd = m.get(p.first()); |
| Bag bb = ((Bag)fr.eval(new Tuple(p.second(),(pd == null) ? empty_bag : pd))); |
| ix = bb.iterator(); |
| if (ix.hasNext()) |
| return true; |
| }; |
| return false; |
| } |
| public void remove () {} |
| }; |
| } |
| }); |
| case BinarySource(`file,_): |
| String path = ((MR_string)evalE(file,env)).get(); |
| new BinaryDataSource(path,Plan.conf); |
                return containerData(spark_context.sequenceFile(path,
                                                                MRContainer.class,MRContainer.class,
                                                                Config.nodes));
| case ParsedSource(`parser,`file,...args): |
| String path = ((MR_string)evalE(file,env)).get(); |
| Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString()); |
| if (p == null) |
| throw new Error("Unknown parser: "+parser); |
| new ParsedDataSource(path,p,args,Plan.conf); |
| dump_source_dir(); |
| return containerData(spark_context.hadoopFile(path,SparkParsedInputFormat.class, |
| MRContainer.class,MRContainer.class, |
| Config.nodes)); |
| case Merge(`x,`y): |
| return eval(x,env,rdd_env).union(eval(y,env,rdd_env)); |
| case Repeat(lambda(`v,`b),`s,`n): |
| int max_num = ((MR_int)evalE(n,env)).get(); |
| JavaRDD<MRData> rd; |
| JavaRDD<MRData> res = eval(s,env,rdd_env).cache(); |
| int i = 0; |
| boolean cont = true; |
| do { |
| rd = eval(b,env,new Environment(v.toString(),new MR_rdd(res),rdd_env)).cache(); |
| res.unpersist(); |
| res = rd.map(get_first).cache(); |
| long size = rd.count(); |
| Integer true_results |
| = rd.aggregate(new Integer(0), |
| new Function2<Integer,MRData,Integer>() { |
| public Integer call ( Integer x, MRData y ) { |
| return (((MR_bool)((Tuple)y).second()).get()) ? x+1 : x; |
| } |
| }, |
| new Function2<Integer,Integer,Integer>() { |
| public Integer call ( Integer x, Integer y ) { return x+y; } |
| }); |
| i++; |
| cont = (true_results > 0 || size == 0) && i < max_num; |
| if (!Config.testing) |
| if (size == 0) |
| System.err.println("Repeat #"+i); |
| else System.err.println("Repeat #"+i+": "+true_results+" true results out of "+size); |
| rd.unpersist(); |
| } while (cont); |
| cached_rdds.add(res); |
| return res; |
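            // The loop runs while some tuple carries a true flag (i.e. changed in
            // the last step) or the result is empty, up to max_num iterations;
            // the flags are counted with RDD.aggregate and stripped by get_first.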
| case Closure(lambda(`v,`b),`s,`m): |
| int max_num = ((MR_int)evalE(m,env)).get(); |
| JavaRDD<MRData> res = eval(s,env,rdd_env).cache(); |
| long n = 0; |
| long old = 0; |
| int i = 0; |
| boolean cont = true; |
| do { |
| JavaRDD<MRData> rd = eval(b,env,new Environment(v.toString(),new MR_rdd(res),rdd_env)).cache(); |
| res.unpersist(); |
| res = rd; |
| old = n; |
| n = res.count(); |
| i++; |
| if (!Config.testing) |
| System.err.println("Repeat #"+i+": "+(old-n)+" new records"); |
| } while (old < n && i < max_num); |
| cached_rdds.add(res); |
| return res; |
| case Generator(`min,`max,`size): |
| DataSet ds = Plan.generator(((MR_long)evalE(min,env)).get(), |
| ((MR_long)evalE(max,env)).get(), |
| ((MR_long)evalE(size,env)).get()); |
| JavaRDD<MRData> rd = null; |
| for ( DataSource d: ds.source ) |
| if (rd == null) |
| rd = containerData(spark_context.hadoopFile(d.path,SparkGeneratorInputFormat.class, |
| MRContainer.class,MRContainer.class,1)); |
| else rd = rd.union(containerData(spark_context.hadoopFile(d.path,SparkGeneratorInputFormat.class, |
| MRContainer.class,MRContainer.class,1))); |
| return rd; |
| case let(`v,DataSetCollect(`s),`body): |
| MRData x = evalE(s,env); |
| if (x instanceof MR_dataset) |
| x = new Bag(((MR_dataset)x).dataset().take(Integer.MAX_VALUE)); |
| else if (x instanceof Bag) |
| ((Bag)x).materialize(); |
| new_distributed_binding(v.toString(),x); |
| return eval(body,new Environment(v.toString(),x,env),rdd_env); |
| case let(`v,`u,`body): |
| MRData val = evalE(u,env); |
| Interpreter.new_global_binding(v.toString(),val); |
| return eval(body,new Environment(v.toString(),val,env),rdd_env); |
| case Let(`v,`u,`body): |
| JavaRDD<MRData> val = eval(u,env,rdd_env).cache(); |
| JavaRDD<MRData> rd = eval(body,env,new Environment(v.toString(),new MR_rdd(val),rdd_env)); |
| cached_rdds.add(val); |
| return rd; |
| case If(`c,`x,`y): |
| if (((MR_bool)evalE(c,env)).get()) |
| return eval(x,env,rdd_env); |
| else return eval(y,env,rdd_env); |
| case trace(`msg,`tp,`x): |
| long n = pre_trace(((MR_string)evalE(msg,env)).get()); |
| JavaRDD<MRData> ds = evalD(x,env,rdd_env); |
| trace(n,tp,new Bag(ds.take(Config.max_bag_size_print))); |
| return ds; |
| case apply(`f,`arg): |
| MRData x = evalE(e,env); |
| if (x instanceof MR_dataset) |
| return ((RDDDataSource)((MR_dataset)x).dataset().source.get(0)).rdd; |
| throw new Error("Expected an RDD dataset: "+x); |
| case `v: |
| if (!v.is_variable()) |
| fail; |
| MRData x = variable_lookup(v.toString(),rdd_env); |
| if (x != null && x instanceof MR_rdd) |
| return ((MR_rdd)x).rdd(); |
| x = variable_lookup(v.toString(),global_rdds); |
| if (x != null && x instanceof MR_rdd) |
| return ((MR_rdd)x).rdd(); |
| x = variable_lookup(v.toString(),global_env); |
| if (x != null) |
| if (x instanceof MR_dataset) |
| return ((RDDDataSource)((MR_dataset)x).dataset().source.get(0)).rdd; |
| else if (x instanceof MR_rdd) |
| return ((MR_rdd)x).rdd(); |
| else if (x instanceof Bag) { |
| ArrayList<MRData> l = new ArrayList<MRData>(); |
| for ( MRData a: (Bag)x ) |
| l.add(a); |
| return spark_context.parallelize(l); |
| } else { |
| ArrayList<MRData> l = new ArrayList<MRData>(); |
| l.add(x); |
| return spark_context.parallelize(l); |
| }; |
| x = variable_lookup(v.toString(),env); |
| if (x != null) |
| if (x instanceof MR_dataset) |
| return ((RDDDataSource)((MR_dataset)x).dataset().source.get(0)).rdd; |
| else if (x instanceof Bag) { |
| ArrayList<MRData> l = new ArrayList<MRData>(); |
| for ( MRData a: (Bag)x ) |
| l.add(a); |
| return spark_context.parallelize(l); |
| } else { |
| ArrayList<MRData> l = new ArrayList<MRData>(); |
| l.add(x); |
| return spark_context.parallelize(l); |
| }; |
| throw new Error("Variable "+v+" is not bound"); |
| }; |
| throw new Error("Unrecognized Spark plan: "+e); |
| } catch (Error msg) { |
| if (!Config.trace) |
| throw new Error(msg.getMessage()); |
| System.err.println(msg.getMessage()); |
| throw new Error("Evaluation error in: "+print_query(e)); |
| } catch (Exception ex) { |
| System.err.println(ex.getMessage()); |
| ex.printStackTrace(); |
| throw new Error("Evaluation error in: "+print_query(e)); |
| } |
| } |
| } |