/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import org.apache.mrql.gen.*;
import java_cup.runtime.Scanner;
import java.util.List;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Iterator;
import java.io.*;
import java.util.Enumeration;
import org.apache.log4j.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.Partition;
import org.apache.spark.Accumulator;
import org.apache.spark.Partitioner;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.Duration;
/** Evaluates physical plans in Apache Spark mode */
public class SparkEvaluator extends Evaluator implements Serializable {
public static JavaSparkContext spark_context;
public static JavaStreamingContext stream_context;
final static Bag empty_bag = new Bag();
// an HDFS tmp file used to hold the data source directory information in distributed mode
final static String data_source_dir_name = "tmp/"+System.getenv("USER")+"_data_source_dir.txt";
private static boolean first_time = true; // true at the beginning of a query execution
/** initialize the Spark evaluator */
final public void init ( Configuration conf ) {
Config.spark_mode = true;
SparkConf spark_conf = new SparkConf();
spark_conf.setAppName("MRQL");
if (Config.hadoop_mode && Config.local_mode) {
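// local mode: run Spark in-process with Config.nodes worker threads and the local file system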
spark_conf.setMaster("local["+Config.nodes+"]");
spark_context = new JavaSparkContext(spark_conf);
Plan.conf = spark_context.hadoopConfiguration();
FileSystem.setDefaultUri(Plan.conf,"file:///");
} else if (Config.hadoop_mode) {
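// distributed mode: connect to the cluster manager given by $SPARK_MASTER and use the default HDFS file system ($FS_DEFAULT_NAME)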
spark_conf.setMaster(System.getenv("SPARK_MASTER"));
spark_conf.setSparkHome(System.getenv("SPARK_HOME"));
spark_conf.set("spark.logConf","false");
spark_conf.set("spark.eventLog.enabled","false");
spark_conf.set("spark.executor.instances",""+Config.nodes);
spark_context = new JavaSparkContext(spark_conf);
Plan.conf = spark_context.hadoopConfiguration();
FileSystem.setDefaultUri(Plan.conf,System.getenv("FS_DEFAULT_NAME"));
};
if (Config.stream_window > 0 && (Config.local_mode || Config.hadoop_mode))
stream_context = new JavaStreamingContext(spark_context,new Duration(Config.stream_window));
if (!Config.info) {
for ( Enumeration en = LogManager.getCurrentLoggers(); en.hasMoreElements(); )
((Logger)en.nextElement()).setLevel(Level.WARN);
LogManager.getRootLogger().setLevel(Level.WARN);
}
}
/** shutdown the Spark evaluator */
final public void shutdown ( Configuration conf ) {
if (Config.local_mode) {
spark_context.stop();
spark_context = null;
System.clearProperty("spark.driver.port");
}
}
final public void initialize_query () {
first_time = true;
Plan.distribute_compiled_arguments(Plan.conf);
if (spark_context != null && Config.compile_functional_arguments)
spark_context.addJar(Plan.conf.get("mrql.jar.path"));
}
final public Configuration new_configuration () {
return new Configuration();
}
/** return the FileInputFormat for parsed files (CSV, XML, JSON, etc.) */
final public Class<? extends MRQLFileInputFormat> parsedInputFormat () {
return SparkParsedInputFormat.class;
}
/** return the FileInputFormat for binary files */
final public Class<? extends MRQLFileInputFormat> binaryInputFormat () {
return SparkBinaryInputFormat.class;
}
/** return the FileInputFormat for data generator files */
final public Class<? extends MRQLFileInputFormat> generatorInputFormat () {
return SparkGeneratorInputFormat.class;
}
/** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
final public void streaming ( Tree plan, Environment env, org.apache.mrql.Function f ) {
SparkStreaming.evaluate(plan,env,f);
}
/** used by the master to send parsing details (e.g., record types) to workers */
public static void dump_source_dir () throws IOException {
if (Config.local_mode)
return;
DataSource.dataSourceDirectory.distribute(Plan.conf);
Path path = new Path(data_source_dir_name);
FileSystem fs = path.getFileSystem(Plan.conf);
PrintStream ps = new PrintStream(fs.create(path,true));
ps.println(Plan.conf.get("mrql.data.source.directory"));
ps.close();
}
/** executed by a worker when reading parsed input (see SparkParsedInputFormat) */
public static void load_source_dir () throws IOException {
if (Plan.conf == null) {
if (evaluator == null)
evaluator = new SparkEvaluator();
Plan.conf = evaluator.new_configuration();
Config.read(Plan.conf);
};
if (Config.local_mode)
return;
// workers read the data source directory details from an HDFS file
Path path = new Path(data_source_dir_name);
FileSystem fs = path.getFileSystem(Plan.conf);
BufferedReader ftp = new BufferedReader(new InputStreamReader(fs.open(path)));
Plan.conf.set("mrql.data.source.directory",ftp.readLine());
DataSource.dataSourceDirectory.read(Plan.conf);
ftp.close();
}
/** dump MRQL data into a sequence file */
@Override
final public void dump ( String file, Tree type, MRData data ) throws Exception {
if (data instanceof MR_dataset && ((MR_dataset)data).dataset().source.get(0) instanceof RDDDataSource) {
Path path = new Path(file);
FileSystem fs = path.getFileSystem(Plan.conf);
PrintStream ftp = new PrintStream(fs.create(path.suffix(".type")));
fs.delete(path,true);
ftp.print("2@"+type.toString()+"\n");
ftp.close();
// in Spark mode, a dataset always has exactly one RDD data source
JavaRDD<MRData> rdd = ((RDDDataSource)((MR_dataset)data).dataset().source.get(0)).rdd.cache();
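// store each element as a (dummy key, value) pair in a Hadoop SequenceFile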
rdd.mapToPair(new PairFunction<MRData,MRContainer,MRContainer>() {
final MRContainer zero = new MRContainer(new MR_int(0));
public Tuple2<MRContainer,MRContainer> call ( MRData value ) {
return new Tuple2<MRContainer,MRContainer>(zero,new MRContainer(value));
}
}).saveAsHadoopFile(file,MRContainer.class,MRContainer.class,SequenceFileOutputFormat.class);
} else super.dump(file,type,data);
}
/** dump MRQL data into a text CSV file */
@Override
final public void dump_text ( String file, Tree type, MRData data ) throws Exception {
if (data instanceof MR_dataset && ((MR_dataset)data).dataset().source.get(0) instanceof RDDDataSource) {
// in Spark mode, a dataset always has exactly one RDD data source
JavaRDD<MRData> rdd = ((RDDDataSource)((MR_dataset)data).dataset().source.get(0)).rdd.cache();
int ps = Config.max_bag_size_print;
Config.max_bag_size_print = -1;
match type {
case `T(tuple(...tps)):
final Trees ts = tps;
rdd.map(new Function<MRData,String>() {
public String call ( MRData value ) {
Tuple t = (Tuple)value;
String s = Printer.print(t.get((short)0),ts.nth(0));
for ( short i = 1; i < t.size(); i++ )
s += ","+Printer.print(t.get(i),ts.nth(i));
return s;
}
}).saveAsTextFile(file);
case _:
rdd.saveAsTextFile(file);
};
Config.max_bag_size_print = ps;
} else super.dump_text(file,type,data);
}
private static void set_global_env ( Environment env ) {
if (true || first_time) { // Bug: this only needs to run once per worker, but is currently forced to run on every call
// pass the global bindings to workers
set_global_bindings(env);
first_time = false;
}
}
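/** wrap the MRQL accumulator function acc_fnc into a Spark Function2, so it can be passed to RDD.aggregate */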
private static Function2<MRData,MRData,MRData> accumulator ( final Tree acc_fnc, final Environment env ) {
final Environment master_env = global_env;
final org.apache.mrql.Function f = evalF(acc_fnc,env);
return new Function2<MRData,MRData,MRData>() {
public MRData call ( MRData x, MRData y ) {
set_global_env(master_env);
return f.eval(new Tuple(x,y));
}
};
}
/** The Aggregate physical operator
* @param acc_fnc the accumulator function from (T,T) to T
* @param zero the zero element of type T
* @param plan the plan that constructs the dataset that contains the bag of values {T}
* @param env contains bindings from variables to values (MRData)
* @return the aggregation result of type T
*/
final public MRData aggregate ( Tree acc_fnc,
Tree zero,
Tree plan,
Environment env ) throws Exception {
Function2<MRData,MRData,MRData> f2 = accumulator(acc_fnc,env);
MRData z = evalE(zero,env);
match plan {
case AggregateMap(`m,`acc,_,`s):
return evalD(#<cMap(`m,`s)>,env)
.aggregate(z,accumulator(acc,env),f2);
case MapAggregateReduce(`m,`r,`acc,_,`s,`o):
if (acc.equals(#<null>))
fail;
return evalD(#<MapReduce(`m,`r,`s,`o)>,env)
.aggregate(z,accumulator(acc,env),f2);
case CrossAggregateProduct(`mx,`my,`r,`acc,_,`x,`y):
if (acc.equals(#<null>))
fail;
return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)
.aggregate(z,accumulator(acc,env),f2);
case MapAggregateReduce2(`mx,`my,`r,`acc,_,`x,`y,`o):
if (acc.equals(#<null>))
fail;
return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env)
.aggregate(z,accumulator(acc,env),f2);
case MapAggregateJoin(`mx,`my,`r,`acc,_,`x,`y):
if (acc.equals(#<null>))
fail;
return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env)
.aggregate(z,accumulator(acc,env),f2);
};
throw new Error("Unrecognized aggregation: "+plan);
}
/** Evaluate a Loop plan a fixed number of times */
final public Tuple loop ( Tree e, Environment env ) throws Exception {
match e {
case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
int limit = ((MR_int)evalE(num,env)).get();
MR_rdd[] s = new MR_rdd[vs.length()];
for ( int i = 0; i < vs.length(); i++ )
s[i] = new MR_rdd(eval(ss.nth(i),env).cache());
for ( int n = 0; n < limit; n++ ) {
Environment nenv = env;
for ( int i = 0; i < vs.length(); i ++ )
nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
for ( int i = 0; i < vs.length(); i ++ )
s[i] = new MR_rdd(eval(bs.nth(i),nenv).cache());
};
Tuple t = new Tuple(vs.length());
for ( int i = 0; i < vs.length(); i++ )
t.set(i,bag(s[i].rdd()));
return t;
};
throw new Error("Wrong Loop format");
}
private static Bag bag ( final Iterable<MRData> s ) {
final Iterator<MRData> i = s.iterator();
return new Bag(new BagIterator() {
public MRData next () {
return i.next();
}
public boolean hasNext () {
return i.hasNext();
}
});
}
// Return the elements of the RDD partition at the given index
private static Iterator<MRData> partition ( final JavaRDD<MRData> rdd, final int position ) {
/* Doesn't work (needs the right TaskContext)
TaskContext context = new TaskContext(0,0,(long)0,Config.local_mode,null);
List<Partition> ps = rdd.splits();
return rdd.iterator(ps.get(position),context);
*/
return rdd.mapPartitionsWithIndex(new Function2<Integer,Iterator<MRData>,Iterator<MRData>>() {
public Iterator<MRData> call ( Integer partition, Iterator<MRData> values ) {
if (partition == position)
return values;
else return new ArrayList<MRData>().iterator();
}
},true).collect().iterator();
}
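// RDDs with at most MAX_CACHE_SIZE elements are collected eagerly; larger RDDs are streamed lazily, one partition at a time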
final static int MAX_CACHE_SIZE = 1000;
/** Convert a Spark RDD into a lazy bag
* @param rdd the Spark RDD
* @return a lazy bag that contains all RDD elements
*/
public static Bag bag ( final JavaRDD<MRData> rdd ) throws IOException {
final JavaRDD<MRData> rd = rdd.cache();
if (rd.count() <= MAX_CACHE_SIZE) { // small RDD
final Iterator<MRData> i = rd.collect().iterator();
return new Bag(new BagIterator() {
public MRData next () {
return i.next();
}
public boolean hasNext () {
return i.hasNext();
}
});
};
// return the RDD elements lazily, one partition at a time
final int splits = rd.splits().size();
return new Bag(new BagIterator() {
Iterator<MRData> i = null;
int c = 0;
public MRData next () {
return i.next();
}
public boolean hasNext () {
do {
if (i != null && i.hasNext())
return true;
if (c >= splits)
return false;
i = partition(rd,c++);
} while (true);
}
});
}
/** Coerce a persistent collection to a Bag */
public Bag toBag ( MRData data ) {
try {
if (data instanceof MR_rdd)
return bag(((MR_rdd)data).rdd());
return (Bag)data;
} catch (Exception ex) {
throw new Error("Cannot coerce "+data+" to a Bag: "+ex);
}
}
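/** force the materialization of every RDD element and cache the result */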
private static JavaRDD<MRData> materialize ( JavaRDD<MRData> rdd ) {
return rdd.map(new Function<MRData,MRData>() {
public MRData call ( MRData value ) {
value.materializeAll();
return value;
};
}).cache();
}
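// Spark function that returns the first component of a tuple; used as the group-by key extractor and to drop the per-element loop flag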
private final static Function<MRData,MRData> get_first
= new Function<MRData,MRData>() {
public MRData call ( MRData value ) {
return ((Tuple)value).first();
};
};
/** Evaluate an MRQL physical plan using Spark
* @param e the physical plan
* @param env contains bindings from variables to values (MRData)
* @param counter the name of the counter used in loops
* @return a DataSet
*/
final public DataSet eval ( final Tree e,
final Environment env,
final String counter ) {
JavaRDD<MRData> rd = eval(e,env);
long count = 0;
rd = rd.cache();
if (!counter.equals("-")) {
final Accumulator<Integer> c = spark_context.intAccumulator(0);
rd.foreach(new VoidFunction<MRData>() {
public void call ( MRData value ) {
if (((MR_bool)((Tuple)value).second()).get())
c.add(1); // count the true results (the results that need another loop step)
}
});
count = c.value();
rd = rd.map(get_first);
};
return new DataSet(new RDDDataSource(rd),count,rd.count());
}
/** Evaluate an MRQL physical plan using Spark and print tracing info
* @param e the physical plan
* @param env contains bindings from variables to values (MRData)
* @return a Spark RDD
*/
final public JavaRDD<MRData> eval ( final Tree e, final Environment env ) {
if (Config.trace_execution) {
tab_count += 3;
System.out.println(tabs(tab_count)+print_query(e));
};
final JavaRDD<MRData> res = evalD(e,env);
if (Config.trace_execution)
try {
System.out.println(tabs(tab_count)+"-> "+res.take(Config.max_bag_size_print));
tab_count -= 3;
} catch (Exception ex) {
throw new Error("Cannot collect the operator output: "+e+" ("+ex+")");
};
return res;
}
/* convert an MRQL lambda to a Spark FlatMapFunction */
private static FlatMapFunction<MRData,MRData> cmap_fnc ( Tree fnc, Environment env ) {
final Environment master_env = global_env;
final org.apache.mrql.Function f = evalF(fnc,env);
return new FlatMapFunction<MRData,MRData>() {
public Iterable<MRData> call ( MRData value ) {
set_global_env(master_env);
return (Bag)f.eval(value);
}
};
}
/* group-by s and then reduce by fnc; if o is true, sort after group-by */
private static JavaRDD<MRData> groupBy ( JavaRDD<MRData> s, Tree fnc, Environment env, Tree o ) {
match o {
case true: // the result must be sorted
return s.groupBy(get_first)
.sortByKey()
.map(new Function<Tuple2<MRData,Iterable<MRData>>,MRData>() {
public MRData call ( Tuple2<MRData,Iterable<MRData>> value ) {
return new Tuple(value._1,bag(value._2));
}})
.flatMap(cmap_fnc(fnc,env)).map(new Function<MRData,MRData>() {
public MRData call ( MRData value ) {
return ((Tuple)value).second();
}
});
};
return s.groupBy(get_first)
.map(new Function<Tuple2<MRData,Iterable<MRData>>,MRData> () {
public MRData call ( Tuple2<MRData,Iterable<MRData>> value ) {
final Iterator<MRData> i = value._2.iterator();
return new Tuple(value._1,new Bag(new BagIterator() {
public MRData next () {
return ((Tuple)i.next()).second();
}
public boolean hasNext () {
return i.hasNext();
}
}));
}
})
.flatMap(cmap_fnc(fnc,env));
}
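/** convert an RDD of Hadoop (MRContainer,MRContainer) pairs into an RDD of the contained MRData values */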
static JavaRDD<MRData> containerData ( JavaPairRDD<MRContainer,MRContainer> rd ) {
final Environment master_env = global_env;
return rd.map(new Function<Tuple2<MRContainer,MRContainer>,MRData>() {
public MRData call ( Tuple2<MRContainer,MRContainer> value ) {
set_global_env(master_env);
return value._2.data();
}
});
}
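/** view a sequence of MRQL (key,value) tuples as an iterable of Spark Tuple2 pairs */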
private static Iterable<Tuple2<MRData,MRData>> joinIterator ( final Iterator<MRData> i ) {
return new Iterable<Tuple2<MRData,MRData>>() {
public Iterator<Tuple2<MRData,MRData>> iterator() {
return new Iterator<Tuple2<MRData,MRData>> () {
public Tuple2<MRData,MRData> next () {
Tuple data = (Tuple)i.next();
return new Tuple2<MRData,MRData>(data.first(),data.second());
}
public boolean hasNext () {
return i.hasNext();
}
public void remove () {}
};
}
};
}
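/** apply the combiner f to each partition: group the partition elements by key and combine each group locally before the shuffle */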
private static FlatMapFunction<Iterator<MRData>,MRData> combiner_fnc ( final org.apache.mrql.Function f ) {
final Environment master_env = global_env;
return new FlatMapFunction<Iterator<MRData>,MRData>() {
public Iterable<MRData> call ( final Iterator<MRData> i ) {
set_global_env(master_env);
return MapReduceAlgebra.cmap(new org.apache.mrql.Function() {
public MRData eval ( MRData x ) {
final MRData key = ((Tuple)x).first();
final Iterator<MRData> it = ((Bag)f.eval(x)).iterator();
return new Bag(new BagIterator() {
public MRData next () {
return new Tuple(key,it.next());
}
public boolean hasNext () {
return it.hasNext();
}
});
}},
MapReduceAlgebra.groupBy(new Bag(new BagIterator() {
public MRData next () {
return i.next();
}
public boolean hasNext () {
return i.hasNext();
}
})));
}
};
}
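/** build the in-memory hash table (join key -> bag of matching values) used by the broadcast map-side join */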
private static Hashtable<MRData,Bag> make_built_table ( List<Tuple2<MRData,MRData>> values ) {
Hashtable<MRData,Bag> built_table = new Hashtable<MRData,Bag>(Config.map_cache_size);
for ( Tuple2<MRData,MRData> t: values ) {
Bag entries = built_table.get(t._1);
built_table.put(t._1,
(entries == null)
? (new Bag(t._2))
: entries.add_element(t._2));
};
return built_table;
}
/** Evaluate MRQL physical operators using Spark
* @param e the physical plan
* @param env contains bindings from variables to values (MRData)
* @return a Spark RDD
*/
final public JavaRDD<MRData> evalD ( final Tree e, final Environment env ) {
final Environment master_env = global_env;
try {
match e {
case MapAggregateReduce(`m,`r,null,_,`s,`o):
return evalD(#<MapReduce(`m,`r,`s,`o)>,env);
case CrossAggregateProduct(`mx,`my,`r,null,_,`x,`y):
return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env);
case MapAggregateReduce2(`mx,`my,`r,null,_,`x,`y,`o):
return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env);
case MapAggregateJoin(`mx,`my,`r,null,_,`x,`y):
return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env);
case cMap(`f,`s):
return eval(s,env).flatMap(cmap_fnc(f,env));
case MapReduce(`m,`r,`s,`o):
return groupBy(eval(s,env).flatMap(cmap_fnc(m,env)),r,env,o);
case MapCombineReduce(`m,`c,`r,`s,`o):
return groupBy(eval(s,env).flatMap(cmap_fnc(m,env))
.mapPartitions(combiner_fnc(evalF(c,env))),r,env,o);
case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o):
return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env).mapPartitions(combiner_fnc(evalF(c,env)));
case CrossProduct(`mx,`my,`r,`x,`y):
final org.apache.mrql.Function fr = evalF(r,env);
return eval(x,env)
.flatMap(cmap_fnc(mx,env))
.cartesian(eval(y,env).flatMap(cmap_fnc(my,env)))
.flatMap(new FlatMapFunction<Tuple2<MRData,MRData>,MRData>() {
public Iterable<MRData> call ( Tuple2<MRData,MRData> value ) {
set_global_env(master_env);
return (Bag)fr.eval(new Tuple(value._1,value._2));
}
});
case MapReduce2(`mx,`my,`r,`x,`y,`o):
final org.apache.mrql.Function fx = evalF(mx,env);
final org.apache.mrql.Function fy = evalF(my,env);
final org.apache.mrql.Function fr = evalF(r,env);
JavaPairRDD<MRData,MRData> xs
= eval(x,env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
set_global_env(master_env);
return joinIterator(((Bag)fx.eval(value)).iterator());
}
});
JavaPairRDD<MRData,MRData> ys
= eval(y,env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
set_global_env(master_env);
return joinIterator(((Bag)fy.eval(value)).iterator());
}
});
return xs.cogroup(ys)
.flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
public Iterable<MRData> call ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
set_global_env(master_env);
return (Bag)fr.eval(new Tuple(bag(value._2._1),bag(value._2._2)));
}
});
case GroupByJoin(`kx,`ky,`gx,`gy,`acc,`zero,`r,`x,`y,`o):
final int n = Math.max(2,(int)Math.floor(Math.sqrt(Config.nodes)));
final int m = n;
// System.err.println("Using a groupBy join on a "+n+"*"+m+" grid of partitions");
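// each x tuple is replicated to the n cells of grid column hash(gx(x)) mod m, and each y tuple to the m cells of grid row hash(gy(y)) mod n, so every matching (x,y) pair meets in exactly one of the n*m grid cells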
final org.apache.mrql.Function fkx = evalF(kx,env);
final org.apache.mrql.Function fky = evalF(ky,env);
final org.apache.mrql.Function fgx = evalF(gx,env);
final org.apache.mrql.Function fgy = evalF(gy,env);
final org.apache.mrql.Function fc = evalF(acc,env);
final MRData z = evalE(zero,env);
final org.apache.mrql.Function fr = evalF(r,env);
final MRData one = new MR_byte(1);
final MRData two = new MR_byte(2);
final JavaPairRDD<MRData,MRData> xs
= eval(x,env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
set_global_env(master_env);
return new Iterable<Tuple2<MRData,MRData>>() {
public Iterator<Tuple2<MRData,MRData>> iterator() {
return new Iterator<Tuple2<MRData,MRData>>() {
int i = 0;
public Tuple2<MRData,MRData> next () {
MRData key = new MR_int((fgx.eval(value).hashCode() % m)+m*i);
i++;
return new Tuple2<MRData,MRData>(key,new Tuple(one,value));
}
public boolean hasNext () {
return i < n;
}
public void remove () {}
};
}
};
}
});
final JavaPairRDD<MRData,MRData> ys
= eval(y,env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
set_global_env(master_env);
return new Iterable<Tuple2<MRData,MRData>>() {
public Iterator<Tuple2<MRData,MRData>> iterator() {
return new Iterator<Tuple2<MRData,MRData>>() {
int j = 0;
public Tuple2<MRData,MRData> next () {
MRData key = new MR_int((fgy.eval(value).hashCode() % n)*m+j);
j++;
return new Tuple2<MRData,MRData>(key,new Tuple(two,value));
}
public boolean hasNext () {
return j < m;
}
public void remove () {}
};
}
};
}
});
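// bring the x and y fragments of each grid cell together and merge-join them locally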
return xs.union(ys).groupByKey(Config.nodes)
.mapPartitions(new FlatMapFunction<Iterator<Tuple2<MRData,Iterable<MRData>>>,MRData>() {
public Iterable<MRData> call ( final Iterator<Tuple2<MRData,Iterable<MRData>>> value ) {
Bag xb = new Bag();
Bag yb = new Bag();
while (value.hasNext()) {
Tuple2<MRData,Iterable<MRData>> t = value.next();
for ( MRData e: t._2 ) {
Tuple p = (Tuple)e;
if (((MR_byte)p.first()).get() == 1)
xb.add(new Tuple(t._1,p.second()));
else yb.add(new Tuple(t._1,p.second()));
}
};
final Bag b = MapReduceAlgebra.mergeGroupByJoin(fkx,fky,fgx,fgy,fc,z,fr,xb,yb);
xb = null; yb = null;
return new Iterable<MRData>() {
public Iterator<MRData> iterator() {
return b.iterator();
}
};
}
});
case MapJoin(`mx,`my,`r,`x,`y):
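// broadcast (map-side) join: the right input y is collected at the driver, broadcast to every worker, and hashed into built_table; the left input x is then streamed against the hash table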
final org.apache.mrql.Function fx = evalF(mx,env);
final org.apache.mrql.Function fy = evalF(my,env);
final org.apache.mrql.Function fr = evalF(r,env);
final Broadcast<List<Tuple2<MRData,MRData>>> ys
= spark_context.broadcast(eval(y,env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
set_global_env(master_env);
return joinIterator(((Bag)fy.eval(value)).iterator());
}
}).collect());
return eval(x,env).flatMap(new FlatMapFunction<MRData,MRData>() {
final Hashtable<MRData,Bag> built_table = make_built_table(ys.value());
public Iterable<MRData> call ( MRData value ) {
set_global_env(master_env);
final Iterator<MRData> i = ((Bag)fx.eval(value)).iterator();
return new Iterable<MRData>() {
public Iterator<MRData> iterator() {
return new Iterator<MRData>() {
Tuple p;
Iterator<MRData> ix = null;
public MRData next () {
return ix.next();
}
public boolean hasNext () {
if (ix != null && ix.hasNext())
return true;
while (i.hasNext()) {
p = (Tuple)i.next();
MRData pd = built_table.get(p.first());
Bag bb = ((Bag)fr.eval(new Tuple(p.second(),(pd == null) ? empty_bag : pd)));
ix = bb.iterator();
if (ix.hasNext())
return true;
};
return false;
}
public void remove () {}
};
}
};
}
});
case BinarySource(`file,_):
String path = ((MR_string)evalE(file,env)).get();
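// the constructor is called only for its side effect of registering the binary data source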
new BinaryDataSource(path,Plan.conf);
return containerData(spark_context.sequenceFile(file.stringValue(),
MRContainer.class,MRContainer.class,
Config.nodes));
case ParsedSource(`parser,`file,...args):
String path = ((MR_string)evalE(file,env)).get();
Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
if (p == null)
throw new Error("Unknown parser: "+parser);
new ParsedDataSource(path,p,args,Plan.conf);
dump_source_dir();
return containerData(spark_context.hadoopFile(path,SparkParsedInputFormat.class,
MRContainer.class,MRContainer.class,
Config.nodes));
case Merge(`x,`y):
return eval(x,env).union(eval(y,env));
case Repeat(lambda(`v,`b),`s,`n):
int max_num = ((MR_int)evalE(n,env)).get();
JavaRDD<MRData> rd;
JavaRDD<MRData> res = eval(s,env);
//res = materialize(res);
int i = 0;
boolean cont = true;
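// each iteration binds v to the current result, evaluates the loop body (which pairs each value with a boolean flag), and counts the true flags; the loop stops when no flags are true (and the result is non-empty) or after max_num iterations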
do {
rd = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
res = rd.map(get_first);
//res = materialize(res);
long size = res.count();
Integer true_results
= rd.aggregate(new Integer(0),
new Function2<Integer,MRData,Integer>() {
public Integer call ( Integer x, MRData y ) {
return (((MR_bool)((Tuple)y).second()).get()) ? x+1 : x;
}
},
new Function2<Integer,Integer,Integer>() {
public Integer call ( Integer x, Integer y ) { return x+y; }
});
i++;
cont = (true_results > 0 || size == 0) && i < max_num;
if (!Config.testing)
if (size == 0)
System.err.println("Repeat #"+i);
else System.err.println("Repeat #"+i+": "+true_results+" true results out of "+size);
} while (cont);
return res;
case Closure(lambda(`v,`b),`s,`m):
int max_num = ((MR_int)evalE(m,env)).get();
JavaRDD<MRData> res = eval(s,env).cache();
long n = 0;
long old = 0;
int i = 0;
boolean cont = true;
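// fixpoint loop: repeat the body until the result size stops growing or max_num iterations are reached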
do {
res = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
old = n;
n = res.count();
i++;
if (!Config.testing)
System.err.println("Repeat #"+i+": "+(old-n)+" new records");
} while (old < n && i < max_num);
return res;
case Generator(`min,`max,`size):
DataSet ds = Plan.generator(((MR_long)evalE(min,env)).get(),
((MR_long)evalE(max,env)).get(),
((MR_long)evalE(size,env)).get());
JavaRDD<MRData> rd = null;
for ( DataSource d: ds.source )
if (rd == null)
rd = containerData(spark_context.hadoopFile(d.path,SparkGeneratorInputFormat.class,
MRContainer.class,MRContainer.class,1));
else rd = rd.union(containerData(spark_context.hadoopFile(d.path,SparkGeneratorInputFormat.class,
MRContainer.class,MRContainer.class,1)));
return rd;
case let(`v,DataSetCollect(`s),`body):
MRData x = evalE(s,env);
if (x instanceof MR_dataset)
x = new Bag(((MR_dataset)x).dataset().take(Integer.MAX_VALUE));
else if (x instanceof Bag)
((Bag)x).materialize();
new_distributed_binding(v.toString(),x);
return eval(body,new Environment(v.toString(),x,env));
case let(`v,`u,`body):
MRData val = evalE(u,env);
Interpreter.new_global_binding(v.toString(),val);
return eval(body,new Environment(v.toString(),val,env));
case Let(`v,`u,`body):
return eval(body,new Environment(v.toString(),new MR_rdd(eval(u,env).cache()),env));
case If(`c,`x,`y):
if (((MR_bool)evalE(c,env)).get())
return eval(x,env);
else return eval(y,env);
case trace(`msg,`tp,`x):
long n = pre_trace(((MR_string)evalE(msg,env)).get());
JavaRDD<MRData> ds = evalD(x,env);
trace(n,tp,new Bag(ds.take(Config.max_bag_size_print)));
return ds;
case apply(`f,`arg):
MRData x = evalE(e,env);
if (x instanceof MR_dataset)
return ((RDDDataSource)((MR_dataset)x).dataset().source.get(0)).rdd;
throw new Error("Expected an RDD dataset: "+x);
case `v:
if (!v.is_variable())
fail;
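// look up the variable first in the local environment, then in the global one, and coerce the bound value (dataset, RDD, Bag, or single value) to an RDD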
MRData x = variable_lookup(v.toString(),env);
if (x != null)
if (x instanceof MR_dataset)
return ((RDDDataSource)((MR_dataset)x).dataset().source.get(0)).rdd;
else if (x instanceof MR_rdd)
return ((MR_rdd)x).rdd();
else if (x instanceof Bag) {
ArrayList<MRData> l = new ArrayList<MRData>();
for ( MRData a: (Bag)x )
l.add(a);
return spark_context.parallelize(l);
} else {
ArrayList<MRData> l = new ArrayList<MRData>();
l.add(x);
return spark_context.parallelize(l);
};
x = variable_lookup(v.toString(),global_env);
if (x != null)
if (x instanceof MR_dataset)
return ((RDDDataSource)((MR_dataset)x).dataset().source.get(0)).rdd;
else if (x instanceof MR_rdd)
return ((MR_rdd)x).rdd();
else if (x instanceof Bag) {
ArrayList<MRData> l = new ArrayList<MRData>();
for ( MRData a: (Bag)x )
l.add(a);
return spark_context.parallelize(l);
} else {
ArrayList<MRData> l = new ArrayList<MRData>();
l.add(x);
return spark_context.parallelize(l);
};
throw new Error("Variable "+v+" is not bound");
};
throw new Error("Unrecognized Spark plan: "+e);
} catch (Error msg) {
if (!Config.trace)
throw new Error(msg.getMessage());
System.err.println(msg.getMessage());
throw new Error("Evaluation error in: "+print_query(e));
} catch (Exception ex) {
System.err.println(ex.getMessage());
ex.printStackTrace();
throw new Error("Evaluation error in: "+print_query(e));
}
}
}