blob: f66d422779d46630243b77d2580d9ec3a307f76c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import java.util.List;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Arrays;
import java.util.Iterator;
import java.io.*;
import java.net.URI;
import java.net.URL;
import org.apache.mrql.gen.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.functions.*;
import org.apache.flink.api.common.operators.*;
import org.apache.flink.api.java.*;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.functions.*;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.client.program.ContextEnvironment;
/** Evaluates physical plans in Apache Flink mode */
public class FlinkEvaluator extends Evaluator implements Serializable {
final static URL flink_jar = FlinkEvaluator.class.getProtectionDomain().getCodeSource().getLocation();
public static ExecutionEnvironment flink_env;
public static StreamExecutionEnvironment stream_env;
// an HDFS tmp file used to hold the data source directory information in distributed mode
static String data_source_dir_name;
static String master_host = "";
static int master_port = 6123;
static String fs_default_name;
/** initialize the Flink evaluator */
final public void init ( Configuration conf ) {
DataSource.parserDirectory = new DataSource.ParserDirectory();
DataSource.parserDirectory.put("xml",FlinkXMLParser.class);
DataSource.parserDirectory.put("json",FlinkJsonParser.class);
DataSource.parserDirectory.put("line",FlinkLineParser.class);
// Disable bag materialization (it uses the Hadoop HDFS library instead of Flink's)
Config.max_materialized_bag = Long.MAX_VALUE;
if (Config.local_mode) {
flink_env = ExecutionEnvironment.createLocalEnvironment(Config.nodes);
// curently, the compiler doesn't work in local mode
Config.compile_functional_arguments = false;
fs_default_name = "file:///";
Plan.conf.set("fs.defaultFS",fs_default_name);
} else if (Config.distributed_mode) {
String master_node = System.getenv("FLINK_MASTER");
if (master_node == null)
throw new Error("Need to run the Flink application master first: $FLINK_HOME/bin/yarn-session.sh");
if (!master_node.equals("")) {
String[] m = master_node.split(":");
if (m.length != 2)
throw new Error("Need both host name and port number for the Flink application master: "+master_node);
master_host = m[0];
master_port = Integer.parseInt(m[1]);
};
if (!System.getenv("FS_DEFAULT_NAME").equals("")) {
fs_default_name = System.getenv("FS_DEFAULT_NAME");
Plan.conf.set("fs.defaultFS",fs_default_name);
} else fs_default_name = Plan.conf.get("fs.defaultFS");
data_source_dir_name = absolute_path("tmp/data_source_dir.txt");
}
}
/** shutdown the Flink evaluator */
final public void shutdown ( Configuration conf ) {
}
final public void initialize_query () {
try {
if (!Config.local_mode) {
if (Config.compile_functional_arguments) {
Plan.distribute_compiled_arguments(Plan.conf);
if (master_host.equals("")) {
ContextEnvironment ce = (ContextEnvironment)ExecutionEnvironment.getExecutionEnvironment();
List<URL> jars = Arrays.asList(flink_jar,new URL("file://"+Plan.conf.get("mrql.jar.path")));
flink_env = new ContextEnvironment(ce.getClient(),jars,new ArrayList<URL>(),
FlinkEvaluator.class.getClassLoader(),
Plan.new_path(Plan.conf));
flink_env.setParallelism(Config.nodes);
} else flink_env = ExecutionEnvironment.createRemoteEnvironment(master_host,master_port,
Config.nodes,flink_jar.toURI().getPath(),Plan.conf.get("mrql.jar.path"));
} else if (master_host.equals(""))
flink_env = ExecutionEnvironment.getExecutionEnvironment();
else flink_env = ExecutionEnvironment.createRemoteEnvironment(master_host,master_port,
Config.nodes,flink_jar.toURI().getPath());
} else {
flink_env.setDefaultLocalParallelism(Config.nodes);
if (Config.stream_window > 0)
stream_env = StreamExecutionEnvironment.createLocalEnvironment(Config.nodes)
.setBufferTimeout(Config.stream_window);
};
if (!Config.info)
flink_env.getConfig().disableSysoutLogging();
} catch (Exception ex) {
throw new Error("Cannot initialize the Flink evaluator: "+ex);
}
}
final public Configuration new_configuration () {
return new Configuration();
}
/** return the FileInputFormat for parsed files (CSV, XML, JSON, etc) */
final public Class<? extends MRQLFileInputFormat> parsedInputFormat () {
return FlinkParsedInputFormat.class;
}
/** return the FileInputFormat for binary files */
final public Class<? extends MRQLFileInputFormat> binaryInputFormat () {
return FlinkSequenceInputFormat.class;
}
/** return the FileInputFormat for data generator files */
final public Class<? extends MRQLFileInputFormat> generatorInputFormat () {
return FlinkGeneratorInputFormat.class;
}
/** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
final public void streaming ( Tree plan, Environment env, Function f ) {
FlinkStreaming.evaluate(plan,env,f);
}
/** returns the absolute path relative to the directory that contains the MRQL executable */
public static String absolute_path ( String path) {
try {
URI uri = new URI(path);
if (uri.getScheme() == null)
if (Config.hadoop_mode && !Config.local_mode)
if (uri.toString().startsWith("/"))
uri = new URI(fs_default_name+uri);
else uri = new URI(fs_default_name+"/user/"+System.getProperty("user.name")+"/"+uri);
else if (uri.toString().startsWith("/"))
uri = new URI("file:"+uri);
else uri = new URI("file:"+System.getProperty("user.dir")+"/"+uri);
uri = uri.normalize();
return uri.toString();
} catch (Exception ex) {
throw new Error("Wrong pathname: "+path);
}
}
/** for dumped data to a file, return the MRQL type of the data */
@Override
public Tree get_type ( String file ) {
return super.get_type(absolute_path(file));
}
/** dump MRQL data into a Hadoop Sequence file */
@Override
public void dump ( String file, Tree type, MRData data ) throws Exception {
super.dump(absolute_path(file),type,data);
}
/** dump MRQL data into a text CVS file */
@Override
public void dump_text ( String file, Tree type, MRData data ) throws Exception {
int ps = Config.max_bag_size_print;
Config.max_bag_size_print = -1;
Path path = new Path(absolute_path(file));
FileSystem fs = path.getFileSystem();
final PrintStream out = new PrintStream(fs.create(path,true));
if (data instanceof MR_dataset)
data = Plan.collect(((MR_dataset)data).dataset());
if (Translator.collection_type(type)) {
Tree tp = ((Node)type).children().head();
if (tp instanceof Node && ((Node)tp).name().equals("tuple")) {
Trees ts = ((Node)tp).children();
for ( MRData x: (Bag)data ) {
Tuple t = (Tuple)x;
out.print(print(t.get((short)0),ts.nth(0)));
for ( short i = 1; i < t.size(); i++ )
out.print(","+print(t.get(i),ts.nth(i)));
out.println();
}
} else for ( MRData x: (Bag)data )
out.println(print(x,tp));
} else out.println(print(data,query_type));
Config.max_bag_size_print = ps;
out.close();
}
private static MRData aggregate ( DataSet<FData> data_set, MRData zero, Tree acc_fnc, Tree merge_fnc, Tree type ) {
DataSet<FData> d = data_set.flatMap(new FlinkDataSource.reduce_mapper(zero,acc_fnc.toString(),
merge_fnc.toString()));
//d.print(); // needs a datasink (prints the aggregation value)
d.output(new DiscardingOutputFormat());
try {
// due to a Flink bug, we cannot return a custom object (FData) from an accumulator
String value = FlinkEvaluator.flink_env.execute("MRQL aggregator").getAccumulatorResult("reducer");
Tree tp = TopLevel.query_type;
MRData res = MRQL.query(value+" as "+type); // parse the value using the MRQL query parser (ugly)
TopLevel.query_type = tp;
return res;
} catch (Exception ex) {
throw new Error("Evaluation error during aggregation: "+ex);
}
}
/* broadcast the loop value to all nodes */
public static final class set_environment extends RichMapFunction<FData,FData> {
String var;
set_environment ( String v ) { var = v; }
@Override
public void open ( org.apache.flink.configuration.Configuration parameters ) throws Exception {
final Bag value = bag(getRuntimeContext().getBroadcastVariable(var));
Interpreter.new_global_binding(var.toString(),value);
}
@Override
public FData map ( FData value ) throws Exception {
return new FData(((Tuple)value.data()).first());
}
}
private static Bag bag ( Collection<Object> values ) {
Bag s = new Bag();
for ( Object value: values )
s.add(((FData)value).data());
return s;
}
/* convert a Bag into a DataSet */
private static DataSet<FData> dataset ( Bag s ) {
ArrayList<FData> a = new ArrayList(s.size());
for ( MRData x: s )
a.add(new FData(x));
return flink_env.fromCollection(a);
}
/** Coerce a persistent collection to a Bag */
public Bag toBag ( MRData data ) {
try {
if (data instanceof MR_flink) {
final Iterator<FData> i = ((MR_flink)data).flink().collect().iterator();
return new Bag(new BagIterator() {
public MRData next () {
return i.next().data();
}
public boolean hasNext () {
return i.hasNext();
}
});
};
return (Bag)data;
} catch (Exception ex) {
throw new Error("Cannot coerce "+data+" to a Bag: "+ex);
}
}
/** The Aggregate physical operator
* @param merge the accumulator function from (T,T) to T
* @param zero the zero element of type T
* @param plan the plan that constructs the dataset that contains the bag of values {T}
* @param env contains bindings fro variables to values (MRData)
* @return the aggregation result of type T
*/
final public MRData aggregate ( Tree merge,
Tree zero,
Tree plan,
Environment env ) throws Exception {
final MRData z = evalE(zero,env);
Tree type = TypeInference.type_inference(zero);
match plan {
case repeat(lambda(`v,`b),`s,`n):
// Special case for Flink: repetition where the loop result must be shared among nodes
final int max_num = ((MR_int)evalE(n,null)).get();
final Bag init = (Bag)evalE(s,env);
Interpreter.new_global_binding(v.toString(),init);
final DataSet<FData> ds = dataset(init);
final IterativeDataSet<FData> startOfIteration = ds.iterate(max_num);
final DataSet<FData> step = eval(b,env);
final DataSet<FData> toBeFedBack = step.map(new set_environment(v.toString()))
.withBroadcastSet(startOfIteration,v.toString());
// Flink doesn't allow termination criteria for kmeans-like iterations:
// (java.lang.IllegalStateException: Error: Iterative task without a single iterative input)
//final DataSet<FData> terminationCriterion = step.filter(new repeat_filter());
final DataSet<FData> res = startOfIteration.closeWith(toBeFedBack);//,terminationCriterion);
String path = absolute_path(Plan.new_path(Plan.conf));
res.write(new FlinkOutputFormat(),path);
flink_env.execute("MRQL repeat");
Bag bs = new Bag();
for ( MRData x: new FlinkDataSource(res,path,false).take(-1) )
bs.add(x);
return bs;
case AggregateMap(`m,`acc,_,`s):
final DataSet<FData> ds = eval(s,env).mapPartition(new aggregate_partition(m,acc,z));
return aggregate(ds,z,merge,merge,type);
case MapAggregateReduce(`m,`r,`acc,_,`s,`o):
if (acc.equals(#<null>))
fail;
final DataSet<FData> ds = eval(s,env).flatMap(cmap_fnc(m));
final DataSet<FData> gs = ds.groupBy(new KeySelector<FData,FData>() {
@Override
public FData getKey ( FData value ) {
return new FData(((Tuple)value.data()).first());
}
}).reduceGroup(new groupBy_combiner_reducer(r,acc,z));
return aggregate(gs,z,merge,merge,type);
case CrossAggregateProduct(`mx,`my,`r,`acc,_,`x,`y):
if (acc.equals(#<null>))
fail;
final DataSet<FData> xs = eval(x,env).flatMap(cmap_fnc(mx));
final DataSet<FData> ys = eval(y,env).flatMap(cmap_fnc(my));
final DataSet<FData> ds = xs.cross(ys).with(new cross_combiner_reducer(r,acc,z));
return aggregate(ds,z,merge,merge,type);
case MapAggregateReduce2(`mx,`my,`r,`acc,_,`x,`y,`o):
if (acc.equals(#<null>))
fail;
final DataSet<FData> xs = eval(x,env).flatMap(cmap_fnc(mx));
final DataSet<FData> ys = eval(y,env).flatMap(cmap_fnc(my));
final DataSet<FData> ds = xs.coGroup(ys)
.where(new join_key())
.equalTo(new join_key())
.with(new join_combiner_reducer(r,acc,z));
return aggregate(ds,z,merge,merge,type);
case MapAggregateJoin(`mx,`my,`r,`acc,_,`x,`y):
if (acc.equals(#<null>))
fail;
return aggregate(evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env),z,acc,merge,type);
};
throw new Error("Unrecognized aggregation: "+plan);
}
/** Evaluate a loop a fixed number of times. Doesn't use Flink iterations */
final public Tuple loop ( Tree e, Environment env ) throws Exception {
match e {
case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
final int limit = ((MR_int)evalE(num,env)).get();
MR_flink[] s = new MR_flink[vs.length()];
for ( int i = 0; i < vs.length(); i++ )
s[i] = new MR_flink(eval(ss.nth(i),env));
for ( int n = 0; n < limit; n++ ) {
Environment nenv = env;
for ( int i = 0; i < vs.length(); i ++ )
nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
for ( int i = 0; i < vs.length(); i ++ )
s[i] = new MR_flink(eval(bs.nth(i),nenv));
};
return new Tuple(s);
};
throw new Error("Wrong Loop format: "+e);
}
private static boolean needs_sorting ( Tree e ) {
match e {
case MapReduce(`m,`r,`s,true): return true;
case MapCombineReduce(`m,`c,`r,`s,true): return true;
};
return false;
}
/** Evaluate an MRQL physical plan and print tracing info
* @param e the physical plan
* @param env contains bindings fro variables to values (MRData)
* @return an MRQL DataSet (stored in HDFS)
*/
final public org.apache.mrql.DataSet eval ( final Tree e,
final Environment env,
final String counter ) {
DataSet<FData> d = eval(e,env);
try {
String path = absolute_path(Plan.new_path(Plan.conf));
d.write(new FlinkOutputFormat(),path);
flink_env.execute("MRQL query");
return new org.apache.mrql.DataSet(new FlinkDataSource(d,path,needs_sorting(e)),0L,0L);
} catch (Exception ex) {
System.err.println(ex.getMessage());
throw new Error("Cannot evaluate the query: "+e);
}
}
/** Evaluate an MRQL physical plan using Flink and print tracing info
* @param e the physical plan
* @param env contains bindings from variables to values (MRData)
* @return a Flink DataSet
*/
final public DataSet<FData> eval ( final Tree e, final Environment env ) {
if (Config.trace_execution) {
tab_count += 3;
System.out.println(tabs(tab_count)+print_query(e));
};
final DataSet<FData> res = evalD(e,env);
if (Config.trace_execution)
try {
System.out.println(tabs(tab_count)+"->");
res.first(Config.max_bag_size_print).print();
tab_count -= 3;
} catch (Exception ex) {
System.err.println(ex.getMessage());
throw new Error("Cannot collect the operator output: "+e);
};
return res;
}
/* convert an MRQL lambda to a Flink Function */
private static RichFlatMapFunction<FData,FData> cmap_fnc ( final Tree fnc ) {
final Function f = evalF(fnc,null);
return new RichFlatMapFunction<FData,FData>() {
@Override
public void flatMap ( FData value, Collector<FData> out ) throws Exception {
for ( MRData e: (Bag)f.eval(value.data()) )
out.collect(new FData(e));
}
};
}
/* convert the stream of pairs to a bag of their second element */
private static Bag bag ( Iterable<FData> values ) {
Bag s = new Bag();
for ( FData value: values )
s.add(((Tuple)value.data()).second());
return s;
}
/* group-by s and then reduce by reduce_fnc (optional: use combine_fnc) */
private static DataSet<FData> groupBy ( DataSet<FData> s, Tree combine_fnc, Tree reduce_fnc ) {
final Function combiner = (combine_fnc == null) ? null : evalF(combine_fnc,null);
final Function reducer = evalF(reduce_fnc,null);
DataSet<FData> ds = s;
if (combiner != null) {
ds = ds.groupBy(new KeySelector<FData,FData>() {
@Override
public FData getKey ( FData value ) {
return new FData(((Tuple)value.data()).first());
}
}).combineGroup(new RichGroupCombineFunction<FData,FData>() {
@Override
public void combine ( Iterable<FData> values, Collector<FData> out ) throws Exception {
Bag c = null;
MRData key = null;
for ( FData v: values )
if (c == null) {
c = new Bag(((Tuple)v.data()).second());
key = ((Tuple)v.data()).first();
} else
c = (Bag)combiner.eval(new Tuple(key,c.add_element(((Tuple)v.data()).second())));
for ( MRData x: c )
out.collect(new FData(new Tuple(key,x)));
}
});
};
return ds.groupBy(new KeySelector<FData,FData>() {
@Override
public FData getKey ( FData value ) {
return new FData(((Tuple)value.data()).first());
}
}).reduceGroup(new GroupReduceFunction<FData,FData>() {
@Override
public void reduce ( final Iterable<FData> values, Collector<FData> out ) {
Bag s = new Bag();
MRData key = null;
for ( FData value: values ) {
Tuple t = (Tuple)value.data();
key = t.first();
s.add(t.second());
};
for ( MRData v: (Bag)reducer.eval(new Tuple(key,s)) )
out.collect(new FData(v));
}
});
}
/* group-by and sort s and then reduce by reduce_fnc (optional: use combine_fnc) */
private static DataSet<FData> sortBy ( DataSet<FData> s, Tree combine_fnc, Tree reduce_fnc ) {
final Function combiner = (combine_fnc == null) ? null : evalF(combine_fnc,null);
final Function reducer = evalF(reduce_fnc,null);
DataSet<Tuple2<FData,FData>> ds = s.map(new RichMapFunction<FData,Tuple2<FData,FData>>() {
@Override
public Tuple2<FData,FData> map ( FData value ) {
Tuple t = (Tuple)value.data();
return new Tuple2<FData,FData>(new FData(t.first()),new FData(t.second()));
}
});
if (combiner != null) {
ds = ds.groupBy(0).combineGroup(new GroupCombineFunction<Tuple2<FData,FData>,Tuple2<FData,FData>>() {
@Override
public void combine ( Iterable<Tuple2<FData,FData>> values,
Collector<Tuple2<FData,FData>> out ) throws Exception {
Bag c = null;
FData key = null;
for ( Tuple2<FData,FData> v: values )
if (c == null) {
c = new Bag(v.f1.data());
key = v.f0;
} else
c = (Bag)combiner.eval(new Tuple(key.data(),c.add_element(v.f0.data())));
for ( MRData x: c )
out.collect(new Tuple2<FData,FData>(key,new FData(x)));
}
});
};
return ds.groupBy(0).sortGroup(0,Order.ASCENDING)
.reduceGroup(new RichGroupReduceFunction<Tuple2<FData,FData>,FData>() {
@Override
public void reduce ( final Iterable<Tuple2<FData,FData>> values, Collector<FData> out ) {
Bag s = new Bag();
MRData key = null;
for ( Tuple2<FData,FData> value: values ) {
key = value.f0.data();
s.add(value.f1.data());
};
for ( MRData v: (Bag)reducer.eval(new Tuple(key,s)) )
out.collect(new FData(new Tuple(key,v)));
}
});
}
public static final class join_key implements KeySelector<FData,FData> {
@Override
public FData getKey ( FData value ) {
return new FData(((Tuple)value.data()).first());
}
}
public static final class aggregate_partition extends RichMapPartitionFunction<FData,FData> {
final Function m;
final Function acc;
final MRData zero;
aggregate_partition ( Tree m, Tree acc, MRData zero ) {
this.m = evalF(m,null);
this.acc = evalF(acc,null);
this.zero = zero;
}
@Override
public void mapPartition ( Iterable<FData> values, Collector<FData> out ) throws Exception {
MRData aggregated_value = zero;
for ( FData value: values )
for ( MRData v: (Bag)m.eval(value.data()) )
aggregated_value = acc.eval(new Tuple(aggregated_value,v));
out.collect(new FData(aggregated_value));
}
}
public static final class groupBy_combiner_reducer extends RichGroupReduceFunction<FData,FData> {
final Function r;
final Function acc;
final MRData zero;
groupBy_combiner_reducer ( Tree r, Tree acc, MRData zero ) {
this.r = evalF(r,null);
this.acc = evalF(acc,null);
this.zero = zero;
}
@Override
public void reduce ( final Iterable<FData> values, Collector<FData> out ) {
MRData value = zero;
for ( MRData v: (Bag)r.eval(new Tuple(bag(values))) )
value = acc.eval(new Tuple(value,v));
out.collect(new FData(value));
}
}
public static final class join_reducer extends RichCoGroupFunction<FData,FData,FData> {
final Function f;
join_reducer ( Tree fnc ) {
f = evalF(fnc,null);
}
@Override
public void coGroup ( Iterable<FData> xs, Iterable<FData> ys, Collector<FData> out ) {
for ( MRData v: (Bag)f.eval(new Tuple(bag(xs),bag(ys))) )
out.collect(new FData(v));
}
}
public static final class join_combiner_reducer extends RichCoGroupFunction<FData,FData,FData> {
final Function r;
final Function acc;
final MRData zero;
join_combiner_reducer ( Tree r, Tree acc, MRData zero ) {
this.r = evalF(r,null);
this.acc = evalF(acc,null);
this.zero = zero;
}
@Override
public void coGroup ( Iterable<FData> xs, Iterable<FData> ys, Collector<FData> out ) {
MRData value = zero;
for ( MRData v: (Bag)r.eval(new Tuple(bag(xs),bag(ys))) )
value = acc.eval(new Tuple(value,v));
out.collect(new FData(value));
}
}
public static final class cross_reducer extends RichCrossFunction<FData,FData,FData> {
final Function f;
cross_reducer ( Tree fnc ) {
f = evalF(fnc,null);
}
@Override
public FData cross ( FData x, FData y ) throws Exception {
return new FData(f.eval(new Tuple(x.data(),y.data())));
}
}
public static final class cross_combiner_reducer extends RichCrossFunction<FData,FData,FData> {
final Function r;
final Function acc;
final MRData zero;
cross_combiner_reducer ( Tree r, Tree acc, MRData zero ) {
this.r = evalF(r,null);
this.acc = evalF(acc,null);
this.zero = zero;
}
@Override
public FData cross ( FData x, FData y ) throws Exception {
return new FData(acc.eval(new Tuple(zero,r.eval(new Tuple(x.data(),y.data())))));
}
}
public static final class mapjoin_reducer extends RichJoinFunction<FData,FData,FData> {
final Function f;
mapjoin_reducer ( Tree fnc ) {
f = evalF(fnc,null);
}
@Override
public FData join ( FData x, FData y ) throws Exception {
return new FData(f.eval(new Tuple(x.data(),y.data())));
}
}
/** restore the global bindings at the client node */
public static final class restore_global_functions extends RichMapFunction<FData,FData> {
Environment client_env;
restore_global_functions () {
client_env = Interpreter.global_env;
}
@Override
public void open ( org.apache.flink.configuration.Configuration parameters ) throws Exception {
Interpreter.set_global_bindings(client_env);
}
@Override
public FData map ( FData value ) throws Exception { return value; }
}
public static final class generator_mapper extends RichMapFunction<Long,FData> {
@Override
public FData map ( Long value ) throws Exception {
return new FData(new MR_long(value));
}
}
public static final class parsed_mapper extends RichFlatMapFunction<String,FData> {
final Parser parser;
parsed_mapper ( Class<? extends Parser> parser_class ) throws Exception {
parser = parser_class.newInstance();
}
@Override
public void flatMap ( String value, Collector<FData> out ) throws Exception {
for ( MRData e: parser.parse(value) )
out.collect(new FData(e));
}
}
public static final class group_join_left extends RichFlatMapFunction<FData,FData> {
final MRData one = new MR_byte(1);
final Function fgx;
final int m, n;
group_join_left ( Tree gx, int m, int n ) {
this.m = m;
this.n = n;
fgx = evalF(gx,null);
}
@Override
public void flatMap ( FData value, Collector<FData> out ) throws Exception {
for ( int i = 0; i < n; i++ ) {
final MRData key = new MR_int((fgx.eval(value.data()).hashCode() % m)+m*i);
out.collect(new FData(new Tuple(key,new Tuple(one,value.data()))));
}
}
}
public static final class group_join_right extends RichFlatMapFunction<FData,FData> {
final MRData two = new MR_byte(2);
final Function fgy;
final int m, n;
group_join_right ( Tree gy, int m, int n ) {
this.m = m;
this.n = n;
fgy = evalF(gy,null);
}
@Override
public void flatMap ( FData value, Collector<FData> out ) throws Exception {
for ( int j = 0; j < m; j++ ) {
final MRData key = new MR_int((fgy.eval(value.data()).hashCode() % n)*m+j);
out.collect(new FData(new Tuple(key,new Tuple(two,value.data()))));
}
}
}
public static final class group_join_reducer extends RichGroupReduceFunction<FData,FData> {
final Function fkx, fky, fgx, fgy, fc, fr;
final MRData z;
group_join_reducer ( Tree kx, Tree ky, Tree gx, Tree gy, Tree c, Tree r, Tree zero ) {
fkx = evalF(kx,null); fky = evalF(ky,null); fgx = evalF(gx,null); fgy = evalF(gy,null);
fr = evalF(r,null); fc = evalF(c,null); z = evalE(zero,null);
}
@Override
public void reduce ( final Iterable<FData> values, Collector<FData> out ) {
Bag xb = new Bag();
Bag yb = new Bag();
for ( FData value: values ) {
final Tuple t = (Tuple)value.data();
final Tuple p = (Tuple)t.second();
if (((MR_byte)p.first()).get() == 1)
xb.add(new Tuple(t.first(),p.second()));
else yb.add(new Tuple(t.first(),p.second()));
}
final Bag b = MapReduceAlgebra.mergeGroupByJoin(fkx,fky,fgx,fgy,fc,z,fr,xb,yb);
for ( MRData v: b )
out.collect(new FData(v));
};
}
public static final class repeat_values extends RichMapFunction<FData,FData> {
@Override
public FData map ( FData value ) throws Exception {
return new FData(((Tuple)value.data()).first());
}
}
public static final class repeat_filter extends RichFilterFunction<FData> {
@Override
public boolean filter ( FData value ) throws Exception {
return ((MR_bool)(((Tuple)value.data()).second())).get();
}
}
/** Evaluate a MRQL physical plan
* @param e the physical plan
* @param env contains bindings from variables to values (MRData)
* @return a Flink DataSet
*/
final private DataSet<FData> evalD ( final Tree e, final Environment env ) {
try {
match e {
case MapAggregateReduce(`m,`r,null,_,`s,`o):
return evalD(#<MapReduce(`m,`r,`s,`o)>,env);
case CrossAggregateProduct(`mx,`my,`r,null,_,`x,`y):
return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env);
case MapAggregateReduce2(`mx,`my,`r,null,_,`x,`y,`o):
return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env);
case MapAggregateJoin(`mx,`my,`r,null,_,`x,`y):
return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env);
case cMap(`f,`s):
return eval(s,env).flatMap(cmap_fnc(f));
case MapReduce(`m,`r,`s,true):
return sortBy(eval(s,env).flatMap(cmap_fnc(m)),null,r);
case MapReduce(`m,`r,`s,`o):
return groupBy(eval(s,env).flatMap(cmap_fnc(m)),null,r);
case MapCombineReduce(`m,`c,`r,`s,true):
return sortBy(eval(s,env).flatMap(cmap_fnc(m)),c,r);
case MapCombineReduce(`m,`c,`r,`s,`o):
return groupBy(eval(s,env).flatMap(cmap_fnc(m)),c,r);
case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o):
return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env);
case MapReduce2(`mx,`my,`r,`x,`y,`o):
final DataSet<FData> xs = eval(x,env).flatMap(cmap_fnc(mx));
final DataSet<FData> ys = eval(y,env).flatMap(cmap_fnc(my));
final DataSet<FData> ds = xs.coGroup(ys)
.where(new join_key())
.equalTo(new join_key())
.with(new join_reducer(r));
if (o.equals(#<true>))
return sortBy(ds,null,#<lambda(x,bag(x))>);
else return ds;
case MapJoin(`mx,`my,`r,`x,`y):
final DataSet<FData> xs = eval(x,env).flatMap(cmap_fnc(mx));
final DataSet<FData> ys = eval(y,env).flatMap(cmap_fnc(my));
return xs.joinWithTiny(ys)
.where(new join_key())
.equalTo(new join_key())
.with(new mapjoin_reducer(r));
case CrossProduct(`mx,`my,`r,`x,`y):
final DataSet<FData> xs = eval(x,env).flatMap(cmap_fnc(mx));
final DataSet<FData> ys = eval(y,env).flatMap(cmap_fnc(my));
return xs.cross(ys).with(new cross_reducer(r))
.flatMap(cmap_fnc(#<lambda(x,x)>));
case GroupByJoin(`kx,`ky,`gx,`gy,`acc,`zero,`r,`x,`y,`o):
final int n = Math.max(2,(int)Math.floor(Math.sqrt(Config.nodes)));
final int m = n;
// System.err.println("Using a groupBy join on a "+n+"*"+m+" grid of partitions");
final DataSet<FData> xs = eval(x,env).flatMap(new group_join_left(gx,m,n));
final DataSet<FData> ys = eval(y,env).flatMap(new group_join_right(gy,m,n));
return xs.union(ys).groupBy(new join_key())
.reduceGroup(new group_join_reducer(kx,ky,gx,gy,acc,r,zero));
case Merge(`x,`y):
return eval(x,env).union(eval(y,env));
case Generator(`min,`max,`size):
final long from = ((MR_long)evalE(min,null)).get();
final long to = ((MR_long)evalE(max,null)).get();
return flink_env.generateSequence(from,to).map(new generator_mapper())
.map(new restore_global_functions());
case BinarySource(`file,_):
final String path = absolute_path(((MR_string)evalE(file,null)).get());
new BinaryDataSource(path,Plan.conf);
final FileInputFormat<FData> sf = new FlinkSequenceInputFormat().inputFormat(path);
return flink_env.createInput(sf).map(new restore_global_functions());
case ParsedSource(`parser,`file,...args):
final String path = absolute_path(((MR_string)evalE(file,null)).get());
final Class<? extends Parser> parser_class = DataSource.parserDirectory.get(parser.toString());
if (parser_class == null)
throw new Error("Unknown parser: "+parser);
final DataSource ds = new FlinkParsedDataSource(path,parser_class,args);
return flink_env.readFile(new FlinkParsedInputFormat.ParsedInputFormat(path),path)
.map(new restore_global_functions());
case Repeat(lambda(`v,`b),`s,`n):
final int max_num = ((MR_int)evalE(n,null)).get();
final IterativeDataSet<FData> startOfIteration = eval(s,env).iterate(max_num);
final DataSet<FData> step = eval(b,new Environment(v.toString(),new MR_flink(startOfIteration),env));
final DataSet<FData> toBeFedBack = step.map(new repeat_values());
final DataSet<FData> terminationCriterion = step.filter(new repeat_filter());
return startOfIteration.closeWith(toBeFedBack,terminationCriterion);
case Closure(lambda(`v,`b),`s,`n):
final int max_num = ((MR_int)evalE(n,null)).get();
IterativeDataSet<FData> startOfIteration = eval(s,env).iterate(max_num);
DataSet<FData> toBeFedBack = eval(b,new Environment(v.toString(),new MR_flink(startOfIteration),env));
return startOfIteration.closeWith(toBeFedBack);
case let(`v,DataSetCollect(`s),`body):
MRData x = evalE(s,env);
if (x instanceof MR_dataset)
x = new Bag(((MR_dataset)x).dataset().take(Integer.MAX_VALUE));
else if (x instanceof Bag)
((Bag)x).materialize();
new_distributed_binding(v.toString(),x);
return eval(body,new Environment(v.toString(),x,env));
case let(`v,`u,`body):
return eval(body,new Environment(v.toString(),evalE(u,null),env));
case Let(`v,`u,`body):
return eval(body,new Environment(v.toString(),new MR_flink(eval(u,env)),env));
case If(`c,`x,`y):
if (((MR_bool)evalE(c,env)).get())
return eval(x,env);
else return eval(y,env);
case nth(`u,`n):
MRData x = ((Tuple)evalE(u,env)).get((int)n.longValue());
if (x instanceof MR_flink)
return ((MR_flink)x).flink();
throw new Error("Evaluation error in: "+print_query(e));
case `v:
if (!v.is_variable())
fail;
MRData x = variable_lookup(v.toString(),env);
if (x != null)
if (x instanceof MR_flink)
return ((MR_flink)x).flink();
else new Error("Variable "+v+" is of type: "+x);
x = variable_lookup(v.toString(),global_env);
if (x != null)
if (x instanceof MR_flink)
return ((MR_flink)x).flink();
else if (x instanceof MR_dataset)
return dataset(new Bag(((MR_dataset)x).dataset.take(Integer.MAX_VALUE)))
.map(new restore_global_functions());
else new Error("Variable "+v+" is of type: "+x);
throw new Error("Variable "+v+" is not bound");
};
throw new Error("Cannot evaluate the Flink plan: "+e);
} catch (Error msg) {
if (!Config.trace)
throw new Error(msg.getMessage());
System.err.println(msg.getMessage());
msg.printStackTrace();
throw new Error("Evaluation error in: "+print_query(e));
} catch (Exception ex) {
System.err.println(ex.getMessage());
ex.printStackTrace();
throw new Error("Evaluation error in: "+print_query(e));
}
}
}