/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import org.apache.mrql.gen.*;
import java_cup.runtime.Scanner;
import java.util.*;
import java.io.*;
import java.util.Enumeration;
import org.apache.log4j.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.Partition;
import org.apache.spark.Accumulator;
import org.apache.spark.Partitioner;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.Duration;
/** Evaluates physical plans in Apache Spark mode */
public class SparkEvaluator extends Evaluator implements Serializable {
public static JavaSparkContext spark_context;
public static JavaStreamingContext stream_context;
final static Bag empty_bag = new Bag();
// an HDFS tmp file used to hold the data source directory information in distributed mode
final static String data_source_dir_name = "tmp/"+System.getenv("USER")+"_data_source_dir.txt";
// holds the cached RDDs that need to be unpersisted
private static ArrayList<JavaRDD<MRData>> cached_rdds = new ArrayList<JavaRDD<MRData>>();
static Environment global_rdds = null;
/** initialize the Spark evaluator */
final public void init ( Configuration conf ) {
Config.spark_mode = true;
global_rdds = null;
SparkConf spark_conf = new SparkConf();
spark_conf.setAppName("MRQL");
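// in local mode, run Spark with Config.nodes local threads; in distributed mode,
// connect to the cluster given by the SPARK_MASTER and SPARK_HOME environment variables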
if (Config.hadoop_mode && Config.local_mode) {
spark_conf.setMaster("local["+Config.nodes+"]");
spark_context = new JavaSparkContext(spark_conf);
Plan.conf = spark_context.hadoopConfiguration();
FileSystem.setDefaultUri(Plan.conf,"file:///");
} else if (Config.hadoop_mode) {
spark_conf.setMaster(System.getenv("SPARK_MASTER"));
spark_conf.setSparkHome(System.getenv("SPARK_HOME"));
spark_conf.set("spark.logConf","false");
spark_conf.set("spark.eventLog.enabled","false");
spark_conf.set("spark.executor.instances",""+Config.nodes);
spark_context = new JavaSparkContext(spark_conf);
Plan.conf = spark_context.hadoopConfiguration();
if (!System.getenv("FS_DEFAULT_NAME").equals(""))
FileSystem.setDefaultUri(Plan.conf,System.getenv("FS_DEFAULT_NAME"));
};
if (Config.stream_window > 0 && (Config.local_mode || Config.hadoop_mode))
stream_context = new JavaStreamingContext(spark_context,new Duration(Config.stream_window));
if (!Config.info) {
for ( Enumeration en = LogManager.getCurrentLoggers(); en.hasMoreElements(); )
((Logger)en.nextElement()).setLevel(Level.WARN);
LogManager.getRootLogger().setLevel(Level.WARN);
}
}
/** shutdown the Spark evaluator */
final public void shutdown ( Configuration conf ) {
if (Config.local_mode) {
spark_context.stop();
spark_context = null;
System.clearProperty("spark.driver.port");
}
}
final public void initialize_query () {
Plan.distribute_compiled_arguments(Plan.conf);
if (spark_context != null && Config.compile_functional_arguments)
spark_context.addJar(Plan.conf.get("mrql.jar.path"));
}
final public Configuration new_configuration () {
return new Configuration();
}
/** return the FileInputFormat for parsed files (CSV, XML, JSON, etc.) */
final public Class<? extends MRQLFileInputFormat> parsedInputFormat () {
return SparkParsedInputFormat.class;
}
/** return the FileInputFormat for binary files */
final public Class<? extends MRQLFileInputFormat> binaryInputFormat () {
return SparkBinaryInputFormat.class;
}
/** return the FileInputFormat for data generator files */
final public Class<? extends MRQLFileInputFormat> generatorInputFormat () {
return SparkGeneratorInputFormat.class;
}
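/** unpersist all RDDs cached during query evaluation and clear the cache list */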
final public static void GC_cached_rdds () {
for ( JavaRDD<MRData> rdd: cached_rdds )
rdd.unpersist();
cached_rdds = new ArrayList<JavaRDD<MRData>>();
}
/** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
final public void streaming ( Tree plan, Environment env, Environment dataset_env, org.apache.mrql.Function f ) {
SparkStreaming.evaluate(plan,env,dataset_env,f);
}
/** used by the master to send parsing details (e.g., record types) to workers */
public static void dump_source_dir () throws IOException {
if (Config.local_mode)
return;
DataSource.dataSourceDirectory.distribute(Plan.conf);
Path path = new Path(data_source_dir_name);
FileSystem fs = path.getFileSystem(Plan.conf);
PrintStream ps = new PrintStream(fs.create(path,true));
ps.println(Plan.conf.get("mrql.data.source.directory"));
ps.close();
}
/** executed by a worker when reading parsed input (see SparkParsedInputFormat) */
public static void load_source_dir () throws IOException {
if (Plan.conf == null) {
if (evaluator == null)
evaluator = new SparkEvaluator();
Plan.conf = evaluator.new_configuration();
Config.read(Plan.conf);
};
if (Config.local_mode)
return;
// workers read the data source directory details from an HDFS file written by the master
Path path = new Path(data_source_dir_name);
FileSystem fs = path.getFileSystem(Plan.conf);
BufferedReader ftp = new BufferedReader(new InputStreamReader(fs.open(path)));
Plan.conf.set("mrql.data.source.directory",ftp.readLine());
DataSource.dataSourceDirectory.read(Plan.conf);
ftp.close();
}
/** dump MRQL data into a sequence file */
@Override
final public void dump ( String file, Tree type, MRData data ) throws Exception {
if (data instanceof MR_dataset && ((MR_dataset)data).dataset().source.get(0) instanceof RDDDataSource) {
Path path = new Path(file);
FileSystem fs = path.getFileSystem(Plan.conf);
PrintStream ftp = new PrintStream(fs.create(path.suffix(".type")));
fs.delete(path,true);
ftp.print("2@"+type.toString()+"\n");
ftp.close();
// in Spark mode, a dataset always has exactly one RDD data source
JavaRDD<MRData> rdd = ((RDDDataSource)((MR_dataset)data).dataset().source.get(0)).rdd.cache();
rdd.mapToPair(new PairFunction<MRData,MRContainer,MRContainer>() {
final MRContainer zero = new MRContainer(new MR_int(0));
public Tuple2<MRContainer,MRContainer> call ( MRData value ) {
return new Tuple2<MRContainer,MRContainer>(zero,new MRContainer(value));
}
}).saveAsHadoopFile(file,MRContainer.class,MRContainer.class,SequenceFileOutputFormat.class);
rdd.unpersist();
} else super.dump(file,type,data);
}
/** dump MRQL data into a text CSV file */
@Override
final public void dump_text ( String file, Tree type, MRData data ) throws Exception {
if (data instanceof MR_dataset && ((MR_dataset)data).dataset().source.get(0) instanceof RDDDataSource) {
// in Spark mode, a dataset always has exactly one RDD data source
JavaRDD<MRData> rdd = ((RDDDataSource)((MR_dataset)data).dataset().source.get(0)).rdd.cache();
int ps = Config.max_bag_size_print;
Config.max_bag_size_print = -1;
match type {
case `T(tuple(...tps)):
final Trees ts = tps;
rdd.map(new Function<MRData,String>() {
public String call ( MRData value ) {
Tuple t = (Tuple)value;
String s = Printer.print(t.get((short)0),ts.nth(0));
for ( short i = 1; i < t.size(); i++ )
s += ","+Printer.print(t.get(i),ts.nth(i));
return s;
}
}).saveAsTextFile(file);
case _:
rdd.saveAsTextFile(file);
};
Config.max_bag_size_print = ps;
rdd.unpersist();
} else super.dump_text(file,type,data);
}
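/* wrap an MRQL accumulator function from (T,T) to T into a Spark Function2,
so that it can be passed to JavaRDD.aggregate */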
private static Function2<MRData,MRData,MRData> accumulator ( final Tree acc_fnc, final Environment env ) {
final org.apache.mrql.Function f = evalF(acc_fnc,env);
return new Function2<MRData,MRData,MRData>() {
public MRData call ( MRData x, MRData y ) {
return f.eval(new Tuple(x,y));
}
};
}
/** The Aggregate physical operator
* @param acc_fnc the accumulator function from (T,T) to T
* @param zero the zero element of type T
* @param plan the plan that constructs the dataset that contains the bag of values {T}
* @param env contains bindings from variables to values (MRData)
* @return the aggregation result of type T
*/
final public MRData aggregate ( Tree acc_fnc,
Tree zero,
Tree plan,
Environment env ) throws Exception {
Function2<MRData,MRData,MRData> f2 = accumulator(acc_fnc,env);
MRData z = evalE(zero,env);
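// each Aggregate* operator is evaluated by rewriting it to its non-aggregating form
// (cMap, MapReduce, CrossProduct, MapReduce2, or MapJoin) and folding the resulting RDD
// with JavaRDD.aggregate, using acc to merge values within a partition and acc_fnc to
// combine the partial results across partitions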
match plan {
case AggregateMap(`m,`acc,_,`s):
return evalD(#<cMap(`m,`s)>,env,null)
.aggregate(z,accumulator(acc,env),f2);
case MapAggregateReduce(`m,`r,`acc,_,`s,`o):
if (acc.equals(#<null>))
fail;
return evalD(#<MapReduce(`m,`r,`s,`o)>,env,null)
.aggregate(z,accumulator(acc,env),f2);
case CrossAggregateProduct(`mx,`my,`r,`acc,_,`x,`y):
if (acc.equals(#<null>))
fail;
return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env,null)
.aggregate(z,accumulator(acc,env),f2);
case MapAggregateReduce2(`mx,`my,`r,`acc,_,`x,`y,`o):
if (acc.equals(#<null>))
fail;
return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env,null)
.aggregate(z,accumulator(acc,env),f2);
case MapAggregateJoin(`mx,`my,`r,`acc,_,`x,`y):
if (acc.equals(#<null>))
fail;
return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env,null)
.aggregate(z,accumulator(acc,env),f2);
};
throw new Error("Unrecognized aggregation: "+plan);
}
/** Evaluate a loop a fixed number of times */
final public Tuple loop ( Tree e, Environment env ) throws Exception {
match e {
case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
int limit = ((MR_int)evalE(num,env)).get();
MR_rdd[] s = new MR_rdd[vs.length()];
for ( int i = 0; i < vs.length(); i++ )
s[i] = new MR_rdd(eval(ss.nth(i),env,(Environment)null).cache());
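// at each of the limit steps, bind each loop variable to the RDD computed in the
// previous step, then compute and cache the new RDDs, unpersisting the old ones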
for ( int n = 0; n < limit; n++ ) {
Environment nenv = null;
for ( int i = 0; i < vs.length(); i ++ )
nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
for ( int i = 0; i < vs.length(); i ++ ) {
s[i].rdd().unpersist();
s[i] = new MR_rdd(eval(bs.nth(i),env,nenv).cache());
}
};
Tuple t = new Tuple(vs.length());
for ( int i = 0; i < vs.length(); i++ )
t.set(i,bag(s[i].rdd()));
return t;
};
throw new Error("Wrong Loop format");
}
private static Bag bag ( final Iterable<MRData> s ) {
final Iterator<MRData> i = s.iterator();
return new Bag(new BagIterator() {
public MRData next () {
return i.next();
}
public boolean hasNext () {
return i.hasNext();
}
});
}
/** Convert a Spark RDD into a lazy bag
* @param rdd the Spark RDD
* @return a lazy bag that contains all RDD elements
*/
public static Bag bag ( final JavaRDD<MRData> rdd ) throws IOException {
final Iterator<MRData> i = rdd.toLocalIterator();
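// toLocalIterator fetches one RDD partition at a time to the driver,
// so the returned Bag is consumed lazily instead of collecting the whole RDD at once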
return new Bag(new BagIterator() {
public MRData next () {
return i.next();
}
public boolean hasNext () {
return i.hasNext();
}
});
}
/** Coerce a persistent collection to a Bag */
public Bag toBag ( MRData data ) {
try {
if (data instanceof MR_rdd)
return bag(((MR_rdd)data).rdd());
return (Bag)data;
} catch (Exception ex) {
throw new Error("Cannot coerce "+data+" to a Bag: "+ex);
}
}
private final static Function<MRData,MRData> get_first
= new Function<MRData,MRData>() {
public MRData call ( MRData value ) {
return ((Tuple)value).first();
};
};
/** Evaluate an MRQL physical plan using Spark
* @param e the physical plan
* @param env contains bindings from variables to values (MRData)
* @param counter the name of the counter used in loops
* @return a DataSet
*/
final public DataSet eval ( final Tree e,
final Environment env,
final String counter ) {
JavaRDD<MRData> rd = eval(e,env,(Environment)null);
long count = 0;
rd = rd.cache();
cached_rdds.add(rd);
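// a counter name other than "-" indicates a loop step: each record is a (value,flag)
// pair, so count the true flags with a Spark accumulator and strip the flags from the
// returned dataset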
if (!counter.equals("-")) {
final Accumulator<Integer> c = spark_context.intAccumulator(0);
rd.foreach(new VoidFunction<MRData>() {
public void call ( MRData value ) {
if (((MR_bool)((Tuple)value).second()).get())
c.add(1); // count the true results (the results that need another loop step)
}
});
count = c.value();
rd = rd.map(get_first);
};
return new DataSet(new RDDDataSource(rd),count,rd.count());
}
/** Evaluate an MRQL physical plan using Spark and print tracing info
* @param e the physical plan
* @param env contains bindings from variables to values (MRData)
* @param rdd_env contains bindings from variables to RDDs
* @return a Spark RDD
*/
final public JavaRDD<MRData> eval ( final Tree e, final Environment env, final Environment rdd_env ) {
if (Config.trace_execution) {
tab_count += 3;
System.out.println(tabs(tab_count)+print_query(e));
};
final JavaRDD<MRData> res = evalD(e,env,rdd_env);
if (Config.trace_execution)
try {
System.out.println(tabs(tab_count)+"-> "+res.take(Config.max_bag_size_print));
tab_count -= 3;
} catch (Exception ex) {
throw new Error("Cannot collect the operator output: "+e+" ("+ex+")");
};
return res;
}
/* convert an MRQL lambda to a Spark flatMap function (FmFunction) */
private static FmFunction<MRData,MRData> cmap_fnc ( Tree fnc, Environment env ) {
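// broadcast the global environment so that each worker task can restore the global
// variable bindings before evaluating the MRQL functional argument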
final Broadcast<Environment> broadcast_env = spark_context.broadcast(global_env);
final org.apache.mrql.Function f = evalF(fnc,env);
return new FmFunction<MRData,MRData>() {
public Iterator<MRData> eval ( MRData value ) {
global_env = broadcast_env.value();
return ((Bag)f.eval(value)).iterator();
}
};
}
/* group s by key and apply the reducer fnc to each group; if o is true, the result is ordered by key */
private static JavaRDD<MRData> groupBy ( JavaRDD<MRData> s, Tree fnc, Environment env, Tree o ) {
match o {
case true: // the result must be sorted
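// sort the (key,value) pairs by key; each value is then wrapped in a singleton Bag,
// so the reducer fnc sees the records one at a time in key order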
return s.mapToPair(new PairFunction<MRData,MRData,MRData>() {
public Tuple2<MRData,MRData> call ( MRData value ) {
Tuple t = (Tuple)value;
return new Tuple2(t.first(),t.second());
}
})
.sortByKey(true)
.map(new Function<Tuple2<MRData,MRData>,MRData>() {
public MRData call ( Tuple2<MRData,MRData> value ) {
return new Tuple(value._1,new Bag(value._2));
}})
.flatMap(cmap_fnc(fnc,env));
};
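// otherwise, group by the first tuple component and pass each group to the reducer as a lazy Bag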
return s.groupBy(get_first)
.map(new Function<Tuple2<MRData,Iterable<MRData>>,MRData> () {
public MRData call ( Tuple2<MRData,Iterable<MRData>> value ) {
final Iterator<MRData> i = value._2.iterator();
return new Tuple(value._1,new Bag(new BagIterator() {
public MRData next () {
return ((Tuple)i.next()).second();
}
public boolean hasNext () {
return i.hasNext();
}
}));
}
})
.flatMap(cmap_fnc(fnc,env));
}
static JavaRDD<MRData> containerData ( JavaPairRDD<MRContainer,MRContainer> rd ) {
final Broadcast<Environment> broadcast_env = spark_context.broadcast(global_env);
return rd.map(new Function<Tuple2<MRContainer,MRContainer>,MRData>() {
public MRData call ( Tuple2<MRContainer,MRContainer> value ) {
global_env = broadcast_env.value();
return value._2.data();
}
});
}
private static Iterator<Tuple2<MRData,MRData>> joinIterator ( final Iterator<MRData> i ) {
return new Iterator<Tuple2<MRData,MRData>> () {
public Tuple2<MRData,MRData> next () {
Tuple data = (Tuple)i.next();
return new Tuple2<MRData,MRData>(data.first(),data.second());
}
public boolean hasNext () {
return i.hasNext();
}
public void remove () {}
};
}
private static FmFunction<Iterator<MRData>,MRData> combiner_fnc ( final org.apache.mrql.Function f ) {
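// apply the combiner f within each partition: group the partition's (key,value) pairs
// in memory and emit the combined (key,partial-result) pairs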
return new FmFunction<Iterator<MRData>,MRData>() {
public Iterator<MRData> eval ( final Iterator<MRData> i ) {
return MapReduceAlgebra.cmap(new org.apache.mrql.Function() {
public MRData eval ( MRData x ) {
final MRData key = ((Tuple)x).first();
final Iterator<MRData> it = ((Bag)f.eval(x)).iterator();
return new Bag(new BagIterator() {
public MRData next () {
return new Tuple(key,it.next());
}
public boolean hasNext () {
return it.hasNext();
}
});
}},
MapReduceAlgebra.groupBy(new Bag(new BagIterator() {
public MRData next () {
return i.next();
}
public boolean hasNext () {
return i.hasNext();
}
}))).iterator();
}
};
}
/** Evaluate MRQL physical operators using Spark
* @param e the physical plan
* @param env contains bindings from variables to values (MRData)
* @param rdd_env contains bindings from variables to RDDs
* @return a Spark RDD
*/
final public JavaRDD<MRData> evalD ( final Tree e, final Environment env, final Environment rdd_env ) {
try {
match e {
case MapAggregateReduce(`m,`r,null,_,`s,`o):
return evalD(#<MapReduce(`m,`r,`s,`o)>,env,rdd_env);
case CrossAggregateProduct(`mx,`my,`r,null,_,`x,`y):
return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env,rdd_env);
case MapAggregateReduce2(`mx,`my,`r,null,_,`x,`y,`o):
return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env,rdd_env);
case MapAggregateJoin(`mx,`my,`r,null,_,`x,`y):
return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env,rdd_env);
case cMap(`f,`s):
return eval(s,env,rdd_env).flatMap(cmap_fnc(f,env));
case MapReduce(`m,`r,`s,`o):
return groupBy(eval(s,env,rdd_env).flatMap(cmap_fnc(m,env)),r,env,o);
case MapCombineReduce(`m,`c,`r,`s,`o):
return groupBy(eval(s,env,rdd_env).flatMap(cmap_fnc(m,env))
.mapPartitions(combiner_fnc(evalF(c,env))),r,env,o);
case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o):
return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env,rdd_env).mapPartitions(combiner_fnc(evalF(c,env)));
case CrossProduct(`mx,`my,`r,`x,`y):
final org.apache.mrql.Function fr = evalF(r,env);
return eval(x,env,rdd_env)
.flatMap(cmap_fnc(mx,env))
.cartesian(eval(y,env,rdd_env).flatMap(cmap_fnc(my,env)))
.flatMap(new FmFunction<Tuple2<MRData,MRData>,MRData>() {
public Iterator<MRData> eval ( Tuple2<MRData,MRData> value ) {
return ((Bag)fr.eval(new Tuple(value._1,value._2))).iterator();
}
});
case MapReduce2(`mx,`my,`r,`x,`y,`o):
final org.apache.mrql.Function fr = evalF(r,env);
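// if a map function has the form lambda(v,bag(e)) (it always returns a singleton bag),
// evaluate it with mapToPair instead of flatMapToPair to avoid building intermediate Bags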
boolean sfx = false;
match mx {
case compiled(`fmx,lambda(`vx,bag(`mex))):
sfx = true;
mx = #<lambda(`vx,`mex)>;
case lambda(`vx,bag(`mex)):
sfx = true;
mx = #<lambda(`vx,`mex)>;
};
final org.apache.mrql.Function fx = evalF(mx,env);
JavaPairRDD<MRData,MRData> xs
= (sfx)
? eval(x,env,rdd_env).mapToPair(new PairFunction<MRData,MRData,MRData>() {
public Tuple2<MRData,MRData> call ( MRData value ) {
Tuple t = (Tuple)fx.eval(value);
return new Tuple2(t.first(),t.second());
}
})
: eval(x,env,rdd_env).flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() {
public Iterator<Tuple2<MRData,MRData>> eval ( MRData value ) {
return joinIterator(((Bag)fx.eval(value)).iterator());
}
});
boolean sfy = false;
match my {
case compiled(`fmy,lambda(`vy,bag(`mey))):
sfy = true;
my = #<lambda(`vy,`mey)>;
case lambda(`vy,bag(`mey)):
sfy = true;
my = #<lambda(`vy,`mey)>;
};
final org.apache.mrql.Function fy = evalF(my,env);
JavaPairRDD<MRData,MRData> ys
= (sfy)
? eval(y,env,rdd_env).mapToPair(new PairFunction<MRData,MRData,MRData>() {
public Tuple2<MRData,MRData> call ( MRData value ) {
Tuple t = (Tuple)fy.eval(value);
return new Tuple2(t.first(),t.second());
}
})
: eval(y,env,rdd_env).flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() {
public Iterator<Tuple2<MRData,MRData>> eval ( MRData value ) {
return joinIterator(((Bag)fy.eval(value)).iterator());
}
});
return xs.cogroup(ys)
.flatMap(new FmFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
public Iterator<MRData> eval ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
return ((Bag)fr.eval(new Tuple(bag(value._2._1),bag(value._2._2)))).iterator();
}
});
case GroupByJoin(`kx,`ky,`gx,`gy,`acc,`zero,`r,`x,`y,`o):
final int n = Math.max(2,(int)Math.floor(Math.sqrt(Config.nodes)));
final int m = n;
// System.err.println("Using a groupBy join on a "+n+"*"+m+" grid of partitions");
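// partitioned group-by join over an n*m grid of partitions: each x record is replicated
// to the n partitions of the grid column selected by hashing its group-by key, and each
// y record is replicated to the m partitions of its grid row, so every matching (x,y)
// pair meets in exactly one grid cell, where mergeGroupByJoin combines them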
final org.apache.mrql.Function fkx = evalF(kx,env);
final org.apache.mrql.Function fky = evalF(ky,env);
final org.apache.mrql.Function fgx = evalF(gx,env);
final org.apache.mrql.Function fgy = evalF(gy,env);
final org.apache.mrql.Function fc = evalF(acc,env);
final MRData z = evalE(zero,env);
final org.apache.mrql.Function fr = evalF(r,env);
final MRData one = new MR_byte(1);
final MRData two = new MR_byte(2);
final JavaPairRDD<MRData,MRData> xs
= eval(x,env,rdd_env).flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() {
public Iterator<Tuple2<MRData,MRData>> eval ( final MRData value ) {
return new Iterator<Tuple2<MRData,MRData>>() {
int i = 0;
public Tuple2<MRData,MRData> next () {
MRData key = new MR_int((fgx.eval(value).hashCode() % m)+m*i);
i++;
return new Tuple2<MRData,MRData>(key,new Tuple(one,value));
}
public boolean hasNext () {
return i < n;
}
public void remove () {}
};
}
});
final JavaPairRDD<MRData,MRData> ys
= eval(y,env,rdd_env).flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() {
public Iterator<Tuple2<MRData,MRData>> eval ( final MRData value ) {
return new Iterator<Tuple2<MRData,MRData>>() {
int j = 0;
public Tuple2<MRData,MRData> next () {
MRData key = new MR_int((fgy.eval(value).hashCode() % n)*m+j);
j++;
return new Tuple2<MRData,MRData>(key,new Tuple(two,value));
}
public boolean hasNext () {
return j < m;
}
public void remove () {}
};
}
});
return xs.union(ys).groupByKey()
.mapPartitions(new FmFunction<Iterator<Tuple2<MRData,Iterable<MRData>>>,MRData>() {
public Iterator<MRData> eval ( final Iterator<Tuple2<MRData,Iterable<MRData>>> value ) {
Bag xb = new Bag();
Bag yb = new Bag();
while (value.hasNext()) {
Tuple2<MRData,Iterable<MRData>> t = value.next();
for ( MRData e: t._2 ) {
Tuple p = (Tuple)e;
if (((MR_byte)p.first()).get() == 1)
xb.add(new Tuple(t._1,p.second()));
else yb.add(new Tuple(t._1,p.second()));
}
};
final Bag b = MapReduceAlgebra.mergeGroupByJoin(fkx,fky,fgx,fgy,fc,z,fr,xb,yb);
xb = null; yb = null;
return b.iterator();
}
});
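// full outer merge of the current state with new data: cogroup the two inputs by key,
// combine matching records with the merge function, and pass unmatched state and
// unmatched data records through unchanged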
case OuterMerge(`merge,`state,`data):
final org.apache.mrql.Function fm = evalF(merge,env);
JavaPairRDD<MRData,MRData> S = eval(state,env,rdd_env)
.mapPartitionsToPair(new PairFmFunction<Iterator<MRData>,MRData,MRData>() {
public Iterator<Tuple2<MRData,MRData>> eval ( final Iterator<MRData> i ) {
return joinIterator(i);
}
},true); // do not repartition the state S
JavaPairRDD<MRData,MRData> ds = eval(data,env,rdd_env)
.mapToPair(new PairFunction<MRData,MRData,MRData>() {
public Tuple2<MRData,MRData> call ( MRData value ) {
Tuple t = (Tuple)value;
return new Tuple2(t.first(),t.second());
}
});
JavaRDD<MRData> res = S.cogroup(ds)
.flatMap(new FmFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
public Iterator<MRData> eval ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
final Iterator<MRData> ix = value._2._1.iterator();
final Iterator<MRData> iy = value._2._2.iterator();
ArrayList<MRData> a = new ArrayList<MRData>();
if (ix.hasNext())
if (iy.hasNext())
a.add(new Tuple(value._1,fm.eval(new Tuple(ix.next(),iy.next()))));
else a.add(new Tuple(value._1,ix.next()));
else if (iy.hasNext())
a.add(new Tuple(value._1,iy.next()));
return a.iterator();
}
}).cache();
cached_rdds.add(res);
return res;
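// like OuterMerge, but unmatched state records are dropped: the result contains only
// the merged records and the unmatched data records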
case RightOuterMerge(`merge,`state,`data):
final org.apache.mrql.Function fm = evalF(merge,env);
JavaPairRDD<MRData,MRData> S = eval(state,env,rdd_env)
.mapPartitionsToPair(new PairFmFunction<Iterator<MRData>,MRData,MRData>() {
public Iterator<Tuple2<MRData,MRData>> eval ( final Iterator<MRData> i ) {
return joinIterator(i);
}
},true); // do not repartition the state S
JavaPairRDD<MRData,MRData> ds = eval(data,env,rdd_env)
.mapToPair(new PairFunction<MRData,MRData,MRData>() {
public Tuple2<MRData,MRData> call ( MRData value ) {
Tuple t = (Tuple)value;
return new Tuple2(t.first(),t.second());
}
});
JavaRDD<MRData> res = S.cogroup(ds)
.flatMap(new FmFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
public Iterator<MRData> eval ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
final Iterator<MRData> ix = value._2._1.iterator();
final Iterator<MRData> iy = value._2._2.iterator();
return ((ix.hasNext())
? ((iy.hasNext())
? new Bag(new Tuple(value._1,fm.eval(new Tuple(ix.next(),iy.next()))))
: new Bag())
: ((iy.hasNext())
? new Bag(new Tuple(value._1,iy.next()))
: new Bag())).iterator();
}
}).cache();
cached_rdds.add(res);
return res;
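// broadcast (map-side) join: the right input y is collected at the driver as a
// key-to-value map and broadcast to the workers; the left input x probes the broadcast
// map partition-by-partition. This case handles map functions of the form
// lambda(v,bag(e)), which return a singleton bag per record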
case MapJoin(lambda(`vx,bag(`mx)),lambda(`vy,bag(`my)),`r,`x,`y,`o):
final org.apache.mrql.Function fx = evalF(#<lambda(`vx,`mx)>,env);
final org.apache.mrql.Function fy = evalF(#<lambda(`vy,`my)>,env);
final org.apache.mrql.Function fr = evalF(r,env);
final Broadcast<Map<MRData,MRData>> ys
= spark_context.broadcast(eval(y,env,rdd_env)
.mapToPair(new PairFunction<MRData,MRData,MRData>() {
public Tuple2<MRData,MRData> call ( MRData value ) {
Tuple t = (Tuple)fy.eval(value);
return new Tuple2(t.first(),t.second());
}
}).collectAsMap());
return eval(x,env,rdd_env).mapPartitions(new FmFunction<Iterator<MRData>,MRData>() {
public Iterator<MRData> eval ( final Iterator<MRData> i ) {
final Map<MRData,MRData> m = ys.value();
return new Iterator<MRData> () {
public MRData next () {
final Tuple p = (Tuple)fx.eval(i.next());
final MRData pd = m.get(p.first());
return ((Bag)fr.eval(new Tuple(p.second(),(pd == null) ? empty_bag : pd)));
}
public boolean hasNext () {
return i.hasNext();
}
public void remove () {}
};
}
},true);
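// general broadcast join for map functions that may return arbitrary bags: the right
// input is flattened into (key,value) pairs before it is collected and broadcast, and
// the left input is probed lazily through a custom iterator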
case MapJoin(`mx,`my,`r,`x,`y):
final org.apache.mrql.Function fx = evalF(mx,env);
final org.apache.mrql.Function fy = evalF(my,env);
final org.apache.mrql.Function fr = evalF(r,env);
final Broadcast<Map<MRData,MRData>> ys
= spark_context.broadcast(eval(y,env,rdd_env)
.flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() {
public Iterator<Tuple2<MRData,MRData>> eval ( MRData value ) {
return joinIterator(((Bag)fy.eval(value)).iterator());
}
}).collectAsMap());
return eval(x,env,rdd_env).flatMap(new FmFunction<MRData,MRData>() {
public Iterator<MRData> eval ( MRData value ) {
final Map<MRData,MRData> m = ys.value();
final Iterator<MRData> i = ((Bag)fx.eval(value)).iterator();
return new Iterator<MRData>() {
Tuple p;
Iterator<MRData> ix = null;
public MRData next () {
return ix.next();
}
public boolean hasNext () {
if (ix != null && ix.hasNext())
return true;
while (i.hasNext()) {
p = (Tuple)i.next();
MRData pd = m.get(p.first());
Bag bb = ((Bag)fr.eval(new Tuple(p.second(),(pd == null) ? empty_bag : pd)));
ix = bb.iterator();
if (ix.hasNext())
return true;
};
return false;
}
public void remove () {}
};
}
});
case BinarySource(`file,_):
String path = ((MR_string)evalE(file,env)).get();
new BinaryDataSource(path,Plan.conf);
return containerData(spark_context.sequenceFile(file.stringValue(),
MRContainer.class,MRContainer.class,
Config.nodes));
case ParsedSource(`parser,`file,...args):
String path = ((MR_string)evalE(file,env)).get();
Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
if (p == null)
throw new Error("Unknown parser: "+parser);
new ParsedDataSource(path,p,args,Plan.conf);
dump_source_dir();
return containerData(spark_context.hadoopFile(path,SparkParsedInputFormat.class,
MRContainer.class,MRContainer.class,
Config.nodes));
case Merge(`x,`y):
return eval(x,env,rdd_env).union(eval(y,env,rdd_env));
case Repeat(lambda(`v,`b),`s,`n):
int max_num = ((MR_int)evalE(n,env)).get();
JavaRDD<MRData> rd;
JavaRDD<MRData> res = eval(s,env,rdd_env).cache();
int i = 0;
boolean cont = true;
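// each step evaluates the loop body with v bound to the previous result; the body
// produces (value,changed-flag) pairs whose first components form the new result;
// the loop stops after max_num steps, or earlier when a non-empty step result
// contains no true flags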
do {
rd = eval(b,env,new Environment(v.toString(),new MR_rdd(res),rdd_env)).cache();
res.unpersist();
res = rd.map(get_first).cache();
long size = rd.count();
Integer true_results
= rd.aggregate(new Integer(0),
new Function2<Integer,MRData,Integer>() {
public Integer call ( Integer x, MRData y ) {
return (((MR_bool)((Tuple)y).second()).get()) ? x+1 : x;
}
},
new Function2<Integer,Integer,Integer>() {
public Integer call ( Integer x, Integer y ) { return x+y; }
});
i++;
cont = (true_results > 0 || size == 0) && i < max_num;
if (!Config.testing)
if (size == 0)
System.err.println("Repeat #"+i);
else System.err.println("Repeat #"+i+": "+true_results+" true results out of "+size);
rd.unpersist();
} while (cont);
cached_rdds.add(res);
return res;
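// fixpoint (closure) loop: repeat the body until the result stops growing
// (its record count no longer increases) or max_num steps are reached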
case Closure(lambda(`v,`b),`s,`m):
int max_num = ((MR_int)evalE(m,env)).get();
JavaRDD<MRData> res = eval(s,env,rdd_env).cache();
long n = 0;
long old = 0;
int i = 0;
boolean cont = true;
do {
JavaRDD<MRData> rd = eval(b,env,new Environment(v.toString(),new MR_rdd(res),rdd_env)).cache();
res.unpersist();
res = rd;
old = n;
n = res.count();
i++;
if (!Config.testing)
System.err.println("Repeat #"+i+": "+(old-n)+" new records");
} while (old < n && i < max_num);
cached_rdds.add(res);
return res;
case Generator(`min,`max,`size):
DataSet ds = Plan.generator(((MR_long)evalE(min,env)).get(),
((MR_long)evalE(max,env)).get(),
((MR_long)evalE(size,env)).get());
JavaRDD<MRData> rd = null;
for ( DataSource d: ds.source )
if (rd == null)
rd = containerData(spark_context.hadoopFile(d.path,SparkGeneratorInputFormat.class,
MRContainer.class,MRContainer.class,1));
else rd = rd.union(containerData(spark_context.hadoopFile(d.path,SparkGeneratorInputFormat.class,
MRContainer.class,MRContainer.class,1)));
return rd;
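// let with DataSetCollect materializes the dataset in memory and distributes it to the
// workers (new_distributed_binding), let binds an in-memory value globally, and Let
// binds a variable to a cached RDD in the RDD environment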
case let(`v,DataSetCollect(`s),`body):
MRData x = evalE(s,env);
if (x instanceof MR_dataset)
x = new Bag(((MR_dataset)x).dataset().take(Integer.MAX_VALUE));
else if (x instanceof Bag)
((Bag)x).materialize();
new_distributed_binding(v.toString(),x);
return eval(body,new Environment(v.toString(),x,env),rdd_env);
case let(`v,`u,`body):
MRData val = evalE(u,env);
Interpreter.new_global_binding(v.toString(),val);
return eval(body,new Environment(v.toString(),val,env),rdd_env);
case Let(`v,`u,`body):
JavaRDD<MRData> val = eval(u,env,rdd_env).cache();
JavaRDD<MRData> rd = eval(body,env,new Environment(v.toString(),new MR_rdd(val),rdd_env));
cached_rdds.add(val);
return rd;
case If(`c,`x,`y):
if (((MR_bool)evalE(c,env)).get())
return eval(x,env,rdd_env);
else return eval(y,env,rdd_env);
case trace(`msg,`tp,`x):
long n = pre_trace(((MR_string)evalE(msg,env)).get());
JavaRDD<MRData> ds = evalD(x,env,rdd_env);
trace(n,tp,new Bag(ds.take(Config.max_bag_size_print)));
return ds;
case apply(`f,`arg):
MRData x = evalE(e,env);
if (x instanceof MR_dataset)
return ((RDDDataSource)((MR_dataset)x).dataset().source.get(0)).rdd;
throw new Error("Expected an RDD dataset: "+x);
case `v:
if (!v.is_variable())
fail;
MRData x = variable_lookup(v.toString(),rdd_env);
if (x != null && x instanceof MR_rdd)
return ((MR_rdd)x).rdd();
x = variable_lookup(v.toString(),global_rdds);
if (x != null && x instanceof MR_rdd)
return ((MR_rdd)x).rdd();
x = variable_lookup(v.toString(),global_env);
if (x != null)
if (x instanceof MR_dataset)
return ((RDDDataSource)((MR_dataset)x).dataset().source.get(0)).rdd;
else if (x instanceof MR_rdd)
return ((MR_rdd)x).rdd();
else if (x instanceof Bag) {
ArrayList<MRData> l = new ArrayList<MRData>();
for ( MRData a: (Bag)x )
l.add(a);
return spark_context.parallelize(l);
} else {
ArrayList<MRData> l = new ArrayList<MRData>();
l.add(x);
return spark_context.parallelize(l);
};
x = variable_lookup(v.toString(),env);
if (x != null)
if (x instanceof MR_dataset)
return ((RDDDataSource)((MR_dataset)x).dataset().source.get(0)).rdd;
else if (x instanceof Bag) {
ArrayList<MRData> l = new ArrayList<MRData>();
for ( MRData a: (Bag)x )
l.add(a);
return spark_context.parallelize(l);
} else {
ArrayList<MRData> l = new ArrayList<MRData>();
l.add(x);
return spark_context.parallelize(l);
};
throw new Error("Variable "+v+" is not bound");
};
throw new Error("Unrecognized Spark plan: "+e);
} catch (Error msg) {
if (!Config.trace)
throw new Error(msg.getMessage());
System.err.println(msg.getMessage());
throw new Error("Evaluation error in: "+print_query(e));
} catch (Exception ex) {
System.err.println(ex.getMessage());
ex.printStackTrace();
throw new Error("Evaluation error in: "+print_query(e));
}
}
}