[MRQL-89] Support Flink 1.0 and fix environment serialization for Flink
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index 135d96c..921f30d 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -93,8 +93,8 @@
SPARK_EXECUTOR_MEMORY=1G
-# Optional: Flink configuration. Supports versions 0.10.1 and 0.10.2
-FLINK_VERSION=0.10.2
+# Optional: Flink configuration. Supports version 1.0.2
+FLINK_VERSION=1.0.2
# Flink installation directory
FLINK_HOME=${HOME}/flink-${FLINK_VERSION}
# number of slots per TaskManager (typically, the number of CPUs per machine)
diff --git a/core/src/main/java/org/apache/mrql/Environment.java b/core/src/main/java/org/apache/mrql/Environment.java
index d848e7c..d1f9ddc 100644
--- a/core/src/main/java/org/apache/mrql/Environment.java
+++ b/core/src/main/java/org/apache/mrql/Environment.java
@@ -27,6 +27,8 @@
public MRData value;
public Environment next;
+ Environment () {}
+
Environment ( String n, MRData v, Environment next ) {
name = n;
value = v;
@@ -49,12 +51,31 @@
return s+" ]";
}
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.defaultWriteObject();
+ final static MRData one = new MR_byte(1);
+
+ private void writeObject ( ObjectOutputStream out ) throws IOException {
+ if (value == null || value instanceof Lambda) {
+ out.writeUTF("");
+ one.write(out);
+ } else {
+ out.writeUTF(name);
+ value.write(out);
+ };
+ if (next == null)
+ out.writeByte(0);
+ else {
+ out.writeByte(1);
+ next.writeObject(out);
+ }
}
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
+ private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+ name = in.readUTF();
name = Tree.add(name);
+ value = MRContainer.read(in);
+ if ( in.readByte() > 0 ) {
+ next = new Environment();
+ next.readObject(in);
+ } else next = null;
}
}
diff --git a/core/src/main/java/org/apache/mrql/Interpreter.gen b/core/src/main/java/org/apache/mrql/Interpreter.gen
index 9d99e8b..054963c 100644
--- a/core/src/main/java/org/apache/mrql/Interpreter.gen
+++ b/core/src/main/java/org/apache/mrql/Interpreter.gen
@@ -593,7 +593,7 @@
if (s != null)
try {
Tree ft = Tree.parse(s);
- TopLevel.store(f.toString(),ft);
+ //TopLevel.store(f.toString(),ft);
fnc = evalE(ft,env);
new_global_binding(f.toString(),fnc);
} catch (Exception ex) {
diff --git a/core/src/main/java/org/apache/mrql/MR_dataset.java b/core/src/main/java/org/apache/mrql/MR_dataset.java
index abc81cd..6515b1d 100644
--- a/core/src/main/java/org/apache/mrql/MR_dataset.java
+++ b/core/src/main/java/org/apache/mrql/MR_dataset.java
@@ -35,7 +35,7 @@
public DataSet dataset () { return dataset; }
public void write ( DataOutput out ) throws IOException {
- throw new Error("DataSets are not serializable");
+ new Bag(dataset.take(Integer.MAX_VALUE)).write(out);
}
public void readFields ( DataInput in ) throws IOException {
diff --git a/core/src/main/java/org/apache/mrql/TopLevel.gen b/core/src/main/java/org/apache/mrql/TopLevel.gen
index 5838ed5..13f4d68 100644
--- a/core/src/main/java/org/apache/mrql/TopLevel.gen
+++ b/core/src/main/java/org/apache/mrql/TopLevel.gen
@@ -57,6 +57,7 @@
public static MRData expression ( Tree e, boolean print ) {
try {
Tree plan = translate_expression(e);
+ Tree type = query_type;
query_plan = plan;
tab_count = -3;
trace_count = 0;
@@ -69,8 +70,8 @@
if (!Config.quiet_execution)
System.out.println("Result:");
if (!Config.hadoop_mode && Config.bsp_mode && memory_parsed_source(plan))
- System.out.println(print(((Tuple)((Bag)res).get(0)).second(),query_type));
- else System.out.println(print(res,query_type));
+ System.out.println(print(((Tuple)((Bag)res).get(0)).second(),type));
+ else System.out.println(print(res,type));
} return res;
} catch (Exception x) {
if (x.getMessage() != null)
diff --git a/flink/pom.xml b/flink/pom.xml
index 4be03d6..3ef5b00 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -50,12 +50,12 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-clients</artifactId>
+ <artifactId>flink-clients_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java</artifactId>
+ <artifactId>flink-streaming-java_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
diff --git a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
index eb5098a..ba5cf1b 100644
--- a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
+++ b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
@@ -99,7 +99,8 @@
ContextEnvironment ce = (ContextEnvironment)ExecutionEnvironment.getExecutionEnvironment();
List<URL> jars = Arrays.asList(flink_jar,new URL("file://"+Plan.conf.get("mrql.jar.path")));
flink_env = new ContextEnvironment(ce.getClient(),jars,new ArrayList<URL>(),
- FlinkEvaluator.class.getClassLoader(),true);
+ FlinkEvaluator.class.getClassLoader(),
+ Plan.new_path(Plan.conf));
flink_env.setParallelism(Config.nodes);
} else flink_env = ExecutionEnvironment.createRemoteEnvironment(master_host,master_port,
Config.nodes,flink_jar.toURI().getPath(),Plan.conf.get("mrql.jar.path"));
@@ -442,24 +443,14 @@
private static DataSet<FData> groupBy ( DataSet<FData> s, Tree combine_fnc, Tree reduce_fnc ) {
final Function combiner = (combine_fnc == null) ? null : evalF(combine_fnc,null);
final Function reducer = evalF(reduce_fnc,null);
- GroupReduceOperator<FData,FData> ro = s.groupBy(new KeySelector<FData,FData>() {
- @Override
- public FData getKey ( FData value ) {
- return new FData(((Tuple)value.data()).first());
- }
- }).reduceGroup(new RichGroupReduceFunction<FData,FData>() {
- @Override
- public void reduce ( final Iterable<FData> values, Collector<FData> out ) {
- Bag s = new Bag();
- MRData key = null;
- for ( FData value: values ) {
- Tuple t = (Tuple)value.data();
- key = t.first();
- s.add(t.second());
- };
- for ( MRData v: (Bag)reducer.eval(new Tuple(key,s)) )
- out.collect(new FData(v));
- }
+ DataSet<FData> ds = s;
+ if (combiner != null) {
+ ds = ds.groupBy(new KeySelector<FData,FData>() {
+ @Override
+ public FData getKey ( FData value ) {
+ return new FData(((Tuple)value.data()).first());
+ }
+ }).combineGroup(new RichGroupCombineFunction<FData,FData>() {
@Override
public void combine ( Iterable<FData> values, Collector<FData> out ) throws Exception {
Bag c = null;
@@ -473,35 +464,42 @@
for ( MRData x: c )
out.collect(new FData(new Tuple(key,x)));
}
- });
- ro.setCombinable(combiner != null);
- return ro;
+ });
+ };
+ return ds.groupBy(new KeySelector<FData,FData>() {
+ @Override
+ public FData getKey ( FData value ) {
+ return new FData(((Tuple)value.data()).first());
+ }
+ }).reduceGroup(new GroupReduceFunction<FData,FData>() {
+ @Override
+ public void reduce ( final Iterable<FData> values, Collector<FData> out ) {
+ Bag s = new Bag();
+ MRData key = null;
+ for ( FData value: values ) {
+ Tuple t = (Tuple)value.data();
+ key = t.first();
+ s.add(t.second());
+ };
+ for ( MRData v: (Bag)reducer.eval(new Tuple(key,s)) )
+ out.collect(new FData(v));
+ }
+ });
}
/* group-by and sort s and then reduce by reduce_fnc (optional: use combine_fnc) */
private static DataSet<FData> sortBy ( DataSet<FData> s, Tree combine_fnc, Tree reduce_fnc ) {
final Function combiner = (combine_fnc == null) ? null : evalF(combine_fnc,null);
final Function reducer = evalF(reduce_fnc,null);
- GroupReduceOperator<Tuple2<FData,FData>,FData> ro
- = s.map(new RichMapFunction<FData,Tuple2<FData,FData>>() {
- @Override
- public Tuple2<FData,FData> map ( FData value ) {
- Tuple t = (Tuple)value.data();
- return new Tuple2<FData,FData>(new FData(t.first()),new FData(t.second()));
- }
- }).groupBy(0).sortGroup(0,Order.ASCENDING)
- .reduceGroup(new RichGroupReduceFunction<Tuple2<FData,FData>,FData>() {
- @Override
- public void reduce ( final Iterable<Tuple2<FData,FData>> values, Collector<FData> out ) {
- Bag s = new Bag();
- MRData key = null;
- for ( Tuple2<FData,FData> value: values ) {
- key = value.f0.data();
- s.add(value.f1.data());
- };
- for ( MRData v: (Bag)reducer.eval(new Tuple(key,s)) )
- out.collect(new FData(new Tuple(key,v)));
- }
+ DataSet<Tuple2<FData,FData>> ds = s.map(new RichMapFunction<FData,Tuple2<FData,FData>>() {
+ @Override
+ public Tuple2<FData,FData> map ( FData value ) {
+ Tuple t = (Tuple)value.data();
+ return new Tuple2<FData,FData>(new FData(t.first()),new FData(t.second()));
+ }
+ });
+ if (combiner != null) {
+ ds = ds.groupBy(0).combineGroup(new GroupCombineFunction<Tuple2<FData,FData>,Tuple2<FData,FData>>() {
@Override
public void combine ( Iterable<Tuple2<FData,FData>> values,
Collector<Tuple2<FData,FData>> out ) throws Exception {
@@ -516,9 +514,22 @@
for ( MRData x: c )
out.collect(new Tuple2<FData,FData>(key,new FData(x)));
}
- });
- ro.setCombinable(combiner != null);
- return ro;
+ });
+ };
+ return ds.groupBy(0).sortGroup(0,Order.ASCENDING)
+ .reduceGroup(new RichGroupReduceFunction<Tuple2<FData,FData>,FData>() {
+ @Override
+ public void reduce ( final Iterable<Tuple2<FData,FData>> values, Collector<FData> out ) {
+ Bag s = new Bag();
+ MRData key = null;
+ for ( Tuple2<FData,FData> value: values ) {
+ key = value.f0.data();
+ s.add(value.f1.data());
+ };
+ for ( MRData v: (Bag)reducer.eval(new Tuple(key,s)) )
+ out.collect(new FData(new Tuple(key,v)));
+ }
+ });
}
public static final class join_key implements KeySelector<FData,FData> {
@@ -887,6 +898,9 @@
if (x != null)
if (x instanceof MR_flink)
return ((MR_flink)x).flink();
+ else if (x instanceof MR_dataset)
+ return dataset(new Bag(((MR_dataset)x).dataset.take(Integer.MAX_VALUE)))
+ .map(new restore_global_functions());
+            else throw new Error("Variable "+v+" is of type: "+x);
throw new Error("Variable "+v+" is not bound");
};
diff --git a/pom.xml b/pom.xml
index f8235a8..546e3b5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,7 +49,7 @@
<hama.version>0.7.0</hama.version>
<spark.version>1.6.0</spark.version>
<scala.version>2.10</scala.version>
- <flink.version>0.10.2</flink.version>
+ <flink.version>1.0.2</flink.version>
<skipTests>true</skipTests>
</properties>