[MRQL-89] Support Flink 1.0 and fix environment serialization for Flink
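
Flink 1.0 suffixes its Maven artifacts with the Scala version and no longer
lets a group-reduce function be marked combinable, so the optional combiner
in groupBy/sortBy now runs as a separate combineGroup stage. Environments
are now serialized explicitly in writeObject/readObject, replacing
non-serializable bindings (such as Lambda values) with a placeholder, and
an MR_dataset is serialized by materializing it into a Bag.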
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index 135d96c..921f30d 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -93,8 +93,8 @@
 SPARK_EXECUTOR_MEMORY=1G
 
 
-# Optional: Flink configuration. Supports versions 0.10.1 and 0.10.2
-FLINK_VERSION=0.10.2
+# Optional: Flink configuration. Supports version 1.0.2
+FLINK_VERSION=1.0.2
 # Flink installation directory
 FLINK_HOME=${HOME}/flink-${FLINK_VERSION}
 # number of slots per TaskManager (typically, the number of CPUs per machine)
diff --git a/core/src/main/java/org/apache/mrql/Environment.java b/core/src/main/java/org/apache/mrql/Environment.java
index d848e7c..d1f9ddc 100644
--- a/core/src/main/java/org/apache/mrql/Environment.java
+++ b/core/src/main/java/org/apache/mrql/Environment.java
@@ -27,6 +27,8 @@
     public MRData value;
     public Environment next;
 
+    Environment () {}
+
     Environment ( String n, MRData v, Environment next ) {
         name = n;
         value = v;
@@ -49,12 +51,31 @@
         return s+" ]";
     }
 
-    private void writeObject(ObjectOutputStream out) throws IOException {
-        out.defaultWriteObject();
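+    // placeholder written in place of values that cannot be serialized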
+    final static MRData one = new MR_byte(1);
+
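+    /** custom serialization of the environment chain: bindings whose values
+     *  cannot be serialized (such as Lambda) are replaced by the pair ("",1) */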
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        if (value == null || value instanceof Lambda) {
+            out.writeUTF("");
+            one.write(out);
+        } else {
+            out.writeUTF(name);
+            value.write(out);
+        };
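+        // write a continuation byte, then the rest of the chain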
+        if (next == null)
+            out.writeByte(0);
+        else {
+            out.writeByte(1);
+            next.writeObject(out);
+        }
     }
 
-    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-        in.defaultReadObject();
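+    /** read back an environment chain written by writeObject */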
+    private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+        name = in.readUTF();
         name = Tree.add(name);
+        value = MRContainer.read(in);
+        if ( in.readByte() > 0 ) {
+            next = new Environment();
+            next.readObject(in);
+        } else next = null;
     }
 }
diff --git a/core/src/main/java/org/apache/mrql/Interpreter.gen b/core/src/main/java/org/apache/mrql/Interpreter.gen
index 9d99e8b..054963c 100644
--- a/core/src/main/java/org/apache/mrql/Interpreter.gen
+++ b/core/src/main/java/org/apache/mrql/Interpreter.gen
@@ -593,7 +593,7 @@
                 if (s != null)
                     try {
                         Tree ft = Tree.parse(s);
-                        TopLevel.store(f.toString(),ft);
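+                        // the function definition came from the configuration,
+                        // so it does not need to be stored again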
+                        //TopLevel.store(f.toString(),ft);
                         fnc = evalE(ft,env);
                         new_global_binding(f.toString(),fnc);
                     } catch (Exception ex) {
diff --git a/core/src/main/java/org/apache/mrql/MR_dataset.java b/core/src/main/java/org/apache/mrql/MR_dataset.java
index abc81cd..6515b1d 100644
--- a/core/src/main/java/org/apache/mrql/MR_dataset.java
+++ b/core/src/main/java/org/apache/mrql/MR_dataset.java
@@ -35,7 +35,7 @@
     public DataSet dataset () { return dataset; }
 
     public void write ( DataOutput out ) throws IOException {
-        throw new Error("DataSets are not serializable");
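+        // materialize the DataSet as a Bag and serialize that
+        // (take(Integer.MAX_VALUE) brings every element to the client)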
+        new Bag(dataset.take(Integer.MAX_VALUE)).write(out);
     }
 
     public void readFields ( DataInput in ) throws IOException {
diff --git a/core/src/main/java/org/apache/mrql/TopLevel.gen b/core/src/main/java/org/apache/mrql/TopLevel.gen
index 5838ed5..13f4d68 100644
--- a/core/src/main/java/org/apache/mrql/TopLevel.gen
+++ b/core/src/main/java/org/apache/mrql/TopLevel.gen
@@ -57,6 +57,7 @@
     public static MRData expression ( Tree e, boolean print ) {
         try {
             Tree plan = translate_expression(e);
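+            // capture the query type now: evaluating the plan may update query_type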
+            Tree type = query_type;
             query_plan = plan;
             tab_count = -3;
             trace_count = 0;
@@ -69,8 +70,8 @@
                 if (!Config.quiet_execution)
                     System.out.println("Result:");
                 if (!Config.hadoop_mode && Config.bsp_mode && memory_parsed_source(plan))
-                    System.out.println(print(((Tuple)((Bag)res).get(0)).second(),query_type));
-                else System.out.println(print(res,query_type));
+                    System.out.println(print(((Tuple)((Bag)res).get(0)).second(),type));
+                else System.out.println(print(res,type));
             } return res;
         } catch (Exception x) {
             if (x.getMessage() != null)
diff --git a/flink/pom.xml b/flink/pom.xml
index 4be03d6..3ef5b00 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -50,12 +50,12 @@
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-clients</artifactId>
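+      <!-- Flink 1.x artifacts are suffixed with the Scala version -->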
+      <artifactId>flink-clients_2.10</artifactId>
       <version>${flink.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-java</artifactId>
+      <artifactId>flink-streaming-java_2.10</artifactId>
       <version>${flink.version}</version>
     </dependency>
     <dependency>
diff --git a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
index eb5098a..ba5cf1b 100644
--- a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
+++ b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
@@ -99,7 +99,8 @@
 			ContextEnvironment ce = (ContextEnvironment)ExecutionEnvironment.getExecutionEnvironment();
 			List<URL> jars = Arrays.asList(flink_jar,new URL("file://"+Plan.conf.get("mrql.jar.path")));
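+			// Flink 1.0 changed the ContextEnvironment constructor: its last argument is now a path string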
 			flink_env = new ContextEnvironment(ce.getClient(),jars,new ArrayList<URL>(),
-                                                           FlinkEvaluator.class.getClassLoader(),true);
+                                                           FlinkEvaluator.class.getClassLoader(),
+                                                           Plan.new_path(Plan.conf));
 			flink_env.setParallelism(Config.nodes);
 		    } else flink_env = ExecutionEnvironment.createRemoteEnvironment(master_host,master_port,
 					     Config.nodes,flink_jar.toURI().getPath(),Plan.conf.get("mrql.jar.path"));
@@ -442,24 +443,14 @@
     private static DataSet<FData> groupBy ( DataSet<FData> s, Tree combine_fnc, Tree reduce_fnc ) {
         final Function combiner = (combine_fnc == null) ? null : evalF(combine_fnc,null);
         final Function reducer = evalF(reduce_fnc,null);
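+        // Flink 1.0: a reducer can no longer be declared combinable with setCombinable,
+        // so the optional combiner runs as an explicit combineGroup stage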
-        GroupReduceOperator<FData,FData> ro = s.groupBy(new KeySelector<FData,FData>() {
-                @Override
-                public FData getKey ( FData value ) {
-                    return new FData(((Tuple)value.data()).first());
-                }
-            }).reduceGroup(new RichGroupReduceFunction<FData,FData>() {
-                @Override
-                public void reduce ( final Iterable<FData> values, Collector<FData> out ) {
-                    Bag s = new Bag();
-                    MRData key = null;
-                    for ( FData value: values ) {
-                        Tuple t = (Tuple)value.data();
-                        key = t.first();
-                        s.add(t.second());
-                    };
-                    for ( MRData v: (Bag)reducer.eval(new Tuple(key,s)) )
-                        out.collect(new FData(v));
-                }
+        DataSet<FData> ds = s;
+        if (combiner != null) {
+            ds = ds.groupBy(new KeySelector<FData,FData>() {
+                    @Override
+                    public FData getKey ( FData value ) {
+                        return new FData(((Tuple)value.data()).first());
+                    }
+            }).combineGroup(new RichGroupCombineFunction<FData,FData>() {
                 @Override
                 public void combine ( Iterable<FData> values, Collector<FData> out ) throws Exception {
                     Bag c = null;
@@ -473,35 +464,42 @@
                     for ( MRData x: c )
                         out.collect(new FData(new Tuple(key,x)));
                 }
-            });
-        ro.setCombinable(combiner != null);
-        return ro;
+            });
+        };
+        return ds.groupBy(new KeySelector<FData,FData>() {
+                @Override
+                public FData getKey ( FData value ) {
+                    return new FData(((Tuple)value.data()).first());
+                }
+            }).reduceGroup(new GroupReduceFunction<FData,FData>() {
+                @Override
+                public void reduce ( final Iterable<FData> values, Collector<FData> out ) {
+                    Bag s = new Bag();
+                    MRData key = null;
+                    for ( FData value: values ) {
+                        Tuple t = (Tuple)value.data();
+                        key = t.first();
+                        s.add(t.second());
+                    };
+                    for ( MRData v: (Bag)reducer.eval(new Tuple(key,s)) )
+                        out.collect(new FData(v));
+                }
+            });
     }
 
     /* group-by and sort s and then reduce by reduce_fnc (optional: use combine_fnc) */
     private static DataSet<FData> sortBy ( DataSet<FData> s, Tree combine_fnc, Tree reduce_fnc ) {
         final Function combiner = (combine_fnc == null) ? null : evalF(combine_fnc,null);
         final Function reducer = evalF(reduce_fnc,null);
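+        // same scheme as in groupBy: optional combineGroup stage, then sort and reduce each group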
-        GroupReduceOperator<Tuple2<FData,FData>,FData> ro
-            = s.map(new RichMapFunction<FData,Tuple2<FData,FData>>() {
-                    @Override
-                    public Tuple2<FData,FData> map ( FData value ) {
-                        Tuple t = (Tuple)value.data();
-                        return new Tuple2<FData,FData>(new FData(t.first()),new FData(t.second()));
-                    }
-                }).groupBy(0).sortGroup(0,Order.ASCENDING)
-              .reduceGroup(new RichGroupReduceFunction<Tuple2<FData,FData>,FData>() {
-                    @Override
-                    public void reduce ( final Iterable<Tuple2<FData,FData>> values, Collector<FData> out ) {
-                        Bag s = new Bag();
-                        MRData key = null;
-                        for ( Tuple2<FData,FData> value: values ) {
-                            key = value.f0.data();
-                            s.add(value.f1.data());
-                        };
-                        for ( MRData v: (Bag)reducer.eval(new Tuple(key,s)) )
-                            out.collect(new FData(new Tuple(key,v)));
-                    }
+        DataSet<Tuple2<FData,FData>> ds = s.map(new RichMapFunction<FData,Tuple2<FData,FData>>() {
+                @Override
+                public Tuple2<FData,FData> map ( FData value ) {
+                    Tuple t = (Tuple)value.data();
+                    return new Tuple2<FData,FData>(new FData(t.first()),new FData(t.second()));
+                }
+            });
+        if (combiner != null) {
+            ds = ds.groupBy(0).combineGroup(new GroupCombineFunction<Tuple2<FData,FData>,Tuple2<FData,FData>>() {
                     @Override
                     public void combine ( Iterable<Tuple2<FData,FData>> values,
                                           Collector<Tuple2<FData,FData>> out ) throws Exception {
@@ -516,9 +514,22 @@
                         for ( MRData x: c )
                             out.collect(new Tuple2<FData,FData>(key,new FData(x)));
                     }
-                  });
-        ro.setCombinable(combiner != null);
-        return ro;
+            });
+        };
+        return ds.groupBy(0).sortGroup(0,Order.ASCENDING)
+            .reduceGroup(new RichGroupReduceFunction<Tuple2<FData,FData>,FData>() {
+                    @Override
+                    public void reduce ( final Iterable<Tuple2<FData,FData>> values, Collector<FData> out ) {
+                        Bag s = new Bag();
+                        MRData key = null;
+                        for ( Tuple2<FData,FData> value: values ) {
+                            key = value.f0.data();
+                            s.add(value.f1.data());
+                        };
+                        for ( MRData v: (Bag)reducer.eval(new Tuple(key,s)) )
+                            out.collect(new FData(new Tuple(key,v)));
+                    }
+            });
     }
 
     public static final class join_key implements KeySelector<FData,FData> {
@@ -887,6 +898,9 @@
                 if (x != null)
                     if (x instanceof MR_flink)
                         return ((MR_flink)x).flink();
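+                    // a memoized MRQL DataSet: materialize it into a Bag and convert it to a Flink DataSet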
+                    else if (x instanceof MR_dataset)
+                        return dataset(new Bag(((MR_dataset)x).dataset.take(Integer.MAX_VALUE)))
+                            .map(new restore_global_functions());
-                    else new Error("Variable "+v+" is of type: "+x);
+                    else throw new Error("Variable "+v+" is of type: "+x);
                 throw new Error("Variable "+v+" is not bound");
             };
diff --git a/pom.xml b/pom.xml
index f8235a8..546e3b5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,7 +49,7 @@
     <hama.version>0.7.0</hama.version>
     <spark.version>1.6.0</spark.version>
     <scala.version>2.10</scala.version>
-    <flink.version>0.10.2</flink.version>
+    <flink.version>1.0.2</flink.version>
     <skipTests>true</skipTests>
   </properties>