[MRQL-95] Add support for Spark 2.0
diff --git a/bin/mrql.spark b/bin/mrql.spark
index a422e18..239182a 100755
--- a/bin/mrql.spark
+++ b/bin/mrql.spark
@@ -28,7 +28,7 @@
 
 . "$MRQL_HOME/conf/mrql-env.sh"
 
-if [[ !(-f ${SPARK_JARS}) ]]; then
+if [[ -z ${SPARK_JARS} ]]; then
    echo "*** Cannot find the Spark jar file. Need to edit mrql-env.sh"; exit -1
 fi
 
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index ed891e1..aefd589 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -78,10 +78,10 @@
 BSP_SPLIT_INPUT=
 
 
-# Optional: Spark configuration. Supports versions 1.0.0, 1.0.2, 1.1.0, 1.1.1, 1.2.0, 1.3.0, 1.3.1, 1.6.0, and 1.6.2
-# (Spark versions 0.8.1, 0.9.0, and 0.9.1 are supported by MRQL 0.9.0)
+# Optional: Spark configuration. Supports versions 1.*, and 2.0.0
+# For Spark 2.*, use: mvn -Dspark2
+# Spark versions 0.8.1, 0.9.0, and 0.9.1 are supported by MRQL 0.9.0 only.
 # You may use the Spark prebuilts bin-hadoop1 or bin-hadoop2 (Yarn)
-# For distributed mode, give write permission to /tmp: hadoop fs -chmod -R 777 /tmp
 # Tested in local, standalone deploy, and Yarn modes
 SPARK_HOME=${HOME}/spark-1.6.2-bin-hadoop2.6
 # URI of the Spark master node:
@@ -123,10 +123,13 @@
    SPARK_JARS=`ls ${SPARK_HOME}/assembly/target/scala-*/*.jar`
 else if [[ -d ${SPARK_HOME}/lib ]]; then
    SPARK_JARS=`ls ${SPARK_HOME}/lib/spark-assembly-*.jar`
+else if [[ -d ${SPARK_HOME}/jars ]]; then
+   SPARK_JARS=`ls ${SPARK_HOME}/jars/spark-core*.jar`
+fi
 fi
 fi
 
-HADOOP_JARS=`$HADOOP_HOME/bin/hadoop classpath`
+HADOOP_JARS=`${HADOOP_HOME}/bin/hadoop classpath`
 
 if [[ !(-f ${CUP_JAR}) ]]; then
    echo "*** Cannot find the parser generator CUP jar file. Need to edit mrql-env.sh"; exit -1
diff --git a/core/src/main/java/org/apache/mrql/Compiler.gen b/core/src/main/java/org/apache/mrql/Compiler.gen
index 5dbaf24..51f4534 100644
--- a/core/src/main/java/org/apache/mrql/Compiler.gen
+++ b/core/src/main/java/org/apache/mrql/Compiler.gen
@@ -164,7 +164,7 @@
             compiled_lambdas = #[ ];
             user_functions_num = lambda_num++;
             // remove the old jar
-            if (jar_path != null)
+            if (false && jar_path != null) // Spark 2.* needs the old jar
                 remove(new File(jar_path));
             jar_path = tmp_dir+"/mrql_args_"+(new Random().nextInt(1000000))+".jar";
             out = new StringBuffer(1000);
diff --git a/spark/pom.xml b/spark/pom.xml
index 614e707..e53aef9 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -152,4 +152,69 @@
       </plugin>
     </plugins>
   </build>
+
+  <profiles>
+    <profile>
+      <id>spark1</id>
+      <activation>
+        <property>
+          <name>!spark2</name>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>1.5</version>
+            <executions>
+              <execution>
+                <id>add-sources</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>spark1</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>spark2</id>
+      <activation>
+        <property>
+          <name>spark2</name>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>1.5</version>
+            <executions>
+              <execution>
+                <id>add-sources</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>spark2</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 </project>
diff --git a/spark/spark1/Spark1.java b/spark/spark1/Spark1.java
new file mode 100644
index 0000000..efa719b
--- /dev/null
+++ b/spark/spark1/Spark1.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.util.Iterator;
+import scala.Tuple2;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+
+
+abstract class FmFunction<T,R> implements FlatMapFunction<T,R> {
+    abstract Iterator<R> eval ( T t ) throws Exception;
+
+    public Iterable<R> call ( final T t ) throws Exception {
+        return new Iterable<R>() {
+            public Iterator<R> iterator() {
+                try {
+                    return eval(t);
+                } catch (Exception ex) {
+                    throw new Error(ex);
+                }
+            }
+        };
+    }
+}
+
+abstract class PairFmFunction<T,K,V> implements PairFlatMapFunction<T,K,V> {
+    abstract Iterator<Tuple2<K,V>> eval ( T t ) throws Exception;
+
+    public Iterable<Tuple2<K,V>> call ( final T t ) throws Exception {
+        return new Iterable<Tuple2<K,V>>() {
+            public Iterator<Tuple2<K,V>> iterator() {
+                try {
+                    return eval(t);
+                } catch (Exception ex) {
+                    throw new Error(ex);
+                }
+            }
+        };
+    }
+}
diff --git a/spark/spark2/Spark2.java b/spark/spark2/Spark2.java
new file mode 100644
index 0000000..82793bd
--- /dev/null
+++ b/spark/spark2/Spark2.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.util.Iterator;
+import scala.Tuple2;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+
+
+abstract class FmFunction<T,R> implements FlatMapFunction<T,R> {
+    abstract Iterator<R> eval ( T t ) throws Exception;
+
+    public Iterator<R> call ( final T t ) throws Exception {
+        return eval(t);
+    }
+}
+
+abstract class PairFmFunction<T,K,V> implements PairFlatMapFunction<T,K,V> {
+    abstract Iterator<Tuple2<K,V>> eval ( T t ) throws Exception;
+
+    public Iterator<Tuple2<K,V>> call ( final T t ) throws Exception {
+        return eval(t);
+    }
+}
diff --git a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
index 3ded60e..83de421 100644
--- a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
+++ b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
@@ -41,8 +41,6 @@
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.Function3;
 import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -388,13 +386,13 @@
     }
 
     /* convert an MRQL lambda to a Spark Function */
-    private static FlatMapFunction<MRData,MRData> cmap_fnc ( Tree fnc, Environment env ) {
+    private static FmFunction<MRData,MRData> cmap_fnc ( Tree fnc, Environment env ) {
         final Broadcast<Environment> broadcast_env = spark_context.broadcast(global_env);
         final org.apache.mrql.Function f = evalF(fnc,env);
-        return new FlatMapFunction<MRData,MRData>() {
-            public Iterable<MRData> call ( MRData value ) {
+        return new FmFunction<MRData,MRData>() {
+            public Iterator<MRData> eval ( MRData value ) {
                 global_env = broadcast_env.value();
-                return (Bag)f.eval(value);
+                return ((Bag)f.eval(value)).iterator();
             }
         };
     }
@@ -443,26 +441,22 @@
             });
     }
 
-    private static Iterable<Tuple2<MRData,MRData>> joinIterator ( final Iterator<MRData> i ) {
-        return new Iterable<Tuple2<MRData,MRData>>() {
-            public Iterator<Tuple2<MRData,MRData>> iterator() {
-                return new Iterator<Tuple2<MRData,MRData>> () {
-                    public Tuple2<MRData,MRData> next () {
-                        Tuple data = (Tuple)i.next();
-                        return new Tuple2<MRData,MRData>(data.first(),data.second());
-                    }
-                    public boolean hasNext () {
-                        return i.hasNext();
-                    }
-                    public void remove () {}
-                };
+    private static Iterator<Tuple2<MRData,MRData>> joinIterator ( final Iterator<MRData> i ) {
+        return new Iterator<Tuple2<MRData,MRData>> () {
+            public Tuple2<MRData,MRData> next () {
+                Tuple data = (Tuple)i.next();
+                return new Tuple2<MRData,MRData>(data.first(),data.second());
             }
+            public boolean hasNext () {
+                return i.hasNext();
+            }
+            public void remove () {}
         };
     }
 
-    private static FlatMapFunction<Iterator<MRData>,MRData> combiner_fnc ( final org.apache.mrql.Function f ) {
-        return new FlatMapFunction<Iterator<MRData>,MRData>() {
-                  public Iterable<MRData> call ( final Iterator<MRData> i ) {
+    private static FmFunction<Iterator<MRData>,MRData> combiner_fnc ( final org.apache.mrql.Function f ) {
+        return new FmFunction<Iterator<MRData>,MRData>() {
+                  public Iterator<MRData> eval ( final Iterator<MRData> i ) {
                       return MapReduceAlgebra.cmap(new org.apache.mrql.Function() {
                               public MRData eval ( MRData x ) {
                                   final MRData key = ((Tuple)x).first();
@@ -483,7 +477,7 @@
                                   public boolean hasNext () {
                                       return i.hasNext();
                                   }
-                              })));
+                              }))).iterator();
                   }
         };
     }
@@ -519,9 +513,9 @@
                 return eval(x,env,rdd_env)
                     .flatMap(cmap_fnc(mx,env))
                     .cartesian(eval(y,env,rdd_env).flatMap(cmap_fnc(my,env)))
-                    .flatMap(new FlatMapFunction<Tuple2<MRData,MRData>,MRData>() {
-                            public Iterable<MRData> call ( Tuple2<MRData,MRData> value ) {
-                                return (Bag)fr.eval(new Tuple(value._1,value._2));
+                    .flatMap(new FmFunction<Tuple2<MRData,MRData>,MRData>() {
+                            public Iterator<MRData> eval ( Tuple2<MRData,MRData> value ) {
+                                return ((Bag)fr.eval(new Tuple(value._1,value._2))).iterator();
                             }
                         });
             case MapReduce2(`mx,`my,`r,`x,`y,`o):
@@ -544,8 +538,8 @@
                                 return new Tuple2(t.first(),t.second());
                             }
                         })
-                    : eval(x,env,rdd_env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
-                            public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+                    : eval(x,env,rdd_env).flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() {
+                            public Iterator<Tuple2<MRData,MRData>> eval ( MRData value ) {
                                 return joinIterator(((Bag)fx.eval(value)).iterator());
                             }
                         });
@@ -567,15 +561,15 @@
                                 return new Tuple2(t.first(),t.second());
                             }
                         })
-                    : eval(y,env,rdd_env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
-                            public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+                    : eval(y,env,rdd_env).flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() {
+                            public Iterator<Tuple2<MRData,MRData>> eval ( MRData value ) {
                                 return joinIterator(((Bag)fy.eval(value)).iterator());
                             }
                         });
 		return xs.cogroup(ys)
-                    .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
-                            public Iterable<MRData> call ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
-                                return (Bag)fr.eval(new Tuple(bag(value._2._1),bag(value._2._2)));
+                    .flatMap(new FmFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
+                            public Iterator<MRData> eval ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
+                                return ((Bag)fr.eval(new Tuple(bag(value._2._1),bag(value._2._2)))).iterator();
                             }
                         });
             case GroupByJoin(`kx,`ky,`gx,`gy,`acc,`zero,`r,`x,`y,`o):
@@ -592,50 +586,42 @@
                 final MRData one = new MR_byte(1);
                 final MRData two = new MR_byte(2);
                 final JavaPairRDD<MRData,MRData> xs
-                    = eval(x,env,rdd_env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
-                            public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
-                                return new Iterable<Tuple2<MRData,MRData>>() {
-                                    public Iterator<Tuple2<MRData,MRData>> iterator() {
-                                        return new Iterator<Tuple2<MRData,MRData>>() {
-                                            int i = 0;
-                                            public Tuple2<MRData,MRData> next () {
-                                                MRData key = new MR_int((fgx.eval(value).hashCode() % m)+m*i);
-                                                i++;
-                                                return new Tuple2<MRData,MRData>(key,new Tuple(one,value));
-                                            }
-                                            public boolean hasNext () {
-                                                return i < n;
-                                            }
-                                            public void remove () {}
-                                        };
+                    = eval(x,env,rdd_env).flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() {
+                            public Iterator<Tuple2<MRData,MRData>> eval ( final MRData value ) {
+                                return new Iterator<Tuple2<MRData,MRData>>() {
+                                    int i = 0;
+                                    public Tuple2<MRData,MRData> next () {
+                                        MRData key = new MR_int((fgx.eval(value).hashCode() % m)+m*i);
+                                        i++;
+                                        return new Tuple2<MRData,MRData>(key,new Tuple(one,value));
                                     }
+                                    public boolean hasNext () {
+                                        return i < n;
+                                    }
+                                    public void remove () {}
                                 };
                             }
                         });
                 final JavaPairRDD<MRData,MRData> ys
-                    = eval(y,env,rdd_env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
-                            public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
-                                return new Iterable<Tuple2<MRData,MRData>>() {
-                                    public Iterator<Tuple2<MRData,MRData>> iterator() {
-                                        return new Iterator<Tuple2<MRData,MRData>>() {
-                                            int j = 0;
-                                            public Tuple2<MRData,MRData> next () {
-                                                MRData key = new MR_int((fgy.eval(value).hashCode() % n)*m+j);
-                                                j++;
-                                                return new Tuple2<MRData,MRData>(key,new Tuple(two,value));
-                                            }
-                                            public boolean hasNext () {
-                                                return j < m;
-                                            }
-                                            public void remove () {}
-                                        };
+                    = eval(y,env,rdd_env).flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() {
+                            public Iterator<Tuple2<MRData,MRData>> eval ( final MRData value ) {
+                                return new Iterator<Tuple2<MRData,MRData>>() {
+                                    int j = 0;
+                                    public Tuple2<MRData,MRData> next () {
+                                        MRData key = new MR_int((fgy.eval(value).hashCode() % n)*m+j);
+                                        j++;
+                                        return new Tuple2<MRData,MRData>(key,new Tuple(two,value));
                                     }
+                                    public boolean hasNext () {
+                                        return j < m;
+                                    }
+                                    public void remove () {}
                                 };
                             }
                         });
                 return xs.union(ys).groupByKey()
-                    .mapPartitions(new FlatMapFunction<Iterator<Tuple2<MRData,Iterable<MRData>>>,MRData>() {
-                        public Iterable<MRData> call ( final Iterator<Tuple2<MRData,Iterable<MRData>>> value ) {
+                    .mapPartitions(new FmFunction<Iterator<Tuple2<MRData,Iterable<MRData>>>,MRData>() {
+                        public Iterator<MRData> eval ( final Iterator<Tuple2<MRData,Iterable<MRData>>> value ) {
                             Bag xb = new Bag();
                             Bag yb = new Bag();
                             while (value.hasNext()) {
@@ -649,18 +635,14 @@
                             };
                             final Bag b = MapReduceAlgebra.mergeGroupByJoin(fkx,fky,fgx,fgy,fc,z,fr,xb,yb);
                             xb = null; yb = null;
-                            return new Iterable<MRData>() {
-                                public Iterator<MRData> iterator() {
-                                    return b.iterator();
-                                }
-                            };
+                            return b.iterator();
                         }
                         });                
             case OuterMerge(`merge,`state,`data):
                 final org.apache.mrql.Function fm = evalF(merge,env);
                 JavaPairRDD<MRData,MRData> S = eval(state,env,rdd_env)
-                    .mapPartitionsToPair(new PairFlatMapFunction<Iterator<MRData>,MRData,MRData>() {
-                            public Iterable<Tuple2<MRData,MRData>> call ( final Iterator<MRData> i ) {
+                    .mapPartitionsToPair(new PairFmFunction<Iterator<MRData>,MRData,MRData>() {
+                            public Iterator<Tuple2<MRData,MRData>> eval ( final Iterator<MRData> i ) {
                                 return joinIterator(i);
                             }
                         },true);   // do not repartition the state S
@@ -672,8 +654,8 @@
                             }
                         });
                 JavaRDD<MRData> res = S.cogroup(ds)
-                    .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
-                            public Iterable<MRData> call ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
+                    .flatMap(new FmFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
+                            public Iterator<MRData> eval ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
                                 final Iterator<MRData> ix = value._2._1.iterator();
                                 final Iterator<MRData> iy = value._2._2.iterator();
                                 ArrayList<MRData> a = new ArrayList<MRData>();
@@ -683,7 +665,7 @@
                                     else a.add(new Tuple(value._1,ix.next()));
                                 else if (iy.hasNext())
                                     a.add(new Tuple(value._1,iy.next()));
-                                return a;
+                                return a.iterator();
                             }
                         }).cache();
                 cached_rdds.add(res);
@@ -691,8 +673,8 @@
             case RightOuterMerge(`merge,`state,`data):
                 final org.apache.mrql.Function fm = evalF(merge,env);
                 JavaPairRDD<MRData,MRData> S = eval(state,env,rdd_env)
-                    .mapPartitionsToPair(new PairFlatMapFunction<Iterator<MRData>,MRData,MRData>() {
-                            public Iterable<Tuple2<MRData,MRData>> call ( final Iterator<MRData> i ) {
+                    .mapPartitionsToPair(new PairFmFunction<Iterator<MRData>,MRData,MRData>() {
+                            public Iterator<Tuple2<MRData,MRData>> eval ( final Iterator<MRData> i ) {
                                 return joinIterator(i);
                             }
                         },true);   // do not repartition the state S
@@ -704,17 +686,17 @@
                             }
                         });
                 JavaRDD<MRData> res = S.cogroup(ds)
-                    .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
-                            public Iterable<MRData> call ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
+                    .flatMap(new FmFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
+                            public Iterator<MRData> eval ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
                                 final Iterator<MRData> ix = value._2._1.iterator();
                                 final Iterator<MRData> iy = value._2._2.iterator();
-                                return (ix.hasNext())
+                                return ((ix.hasNext())
                                     ? ((iy.hasNext())
                                        ? new Bag(new Tuple(value._1,fm.eval(new Tuple(ix.next(),iy.next()))))
                                        : new Bag())
                                     : ((iy.hasNext())
                                        ? new Bag(new Tuple(value._1,iy.next()))
-                                       : new Bag());
+                                       : new Bag())).iterator();
                             }
                         }).cache();
                 cached_rdds.add(res);
@@ -731,23 +713,19 @@
                                 return new Tuple2(t.first(),t.second());
                             }
                         }).collectAsMap());
-                return eval(x,env,rdd_env).mapPartitions(new FlatMapFunction<Iterator<MRData>,MRData>() {
-                        public Iterable<MRData> call ( final Iterator<MRData> i ) {
+                return eval(x,env,rdd_env).mapPartitions(new FmFunction<Iterator<MRData>,MRData>() {
+                        public Iterator<MRData> eval ( final Iterator<MRData> i ) {
                             final Map<MRData,MRData> m = ys.value();
-                            return new Iterable<MRData>() {
-                                public Iterator<MRData> iterator() {
-                                    return new Iterator<MRData> () {
-                                        public MRData next () {
-                                            final Tuple p = (Tuple)fx.eval(i.next());
-                                            final MRData pd = m.get(p.first());
-                                            return ((Bag)fr.eval(new Tuple(p.second(),(pd == null) ? empty_bag : pd)));
-                                        }
-                                        public boolean hasNext () {
-                                            return i.hasNext();
-                                        }
-                                        public void remove () {}
-                                    };
+                            return new Iterator<MRData> () {
+                                public MRData next () {
+                                    final Tuple p = (Tuple)fx.eval(i.next());
+                                    final MRData pd = m.get(p.first());
+                                    return ((Bag)fr.eval(new Tuple(p.second(),(pd == null) ? empty_bag : pd)));
                                 }
+                                public boolean hasNext () {
+                                    return i.hasNext();
+                                }
+                                public void remove () {}
                             };
                         }
                     },true);
@@ -757,39 +735,35 @@
                 final org.apache.mrql.Function fr = evalF(r,env);
                 final Broadcast<Map<MRData,MRData>> ys
                     = spark_context.broadcast(eval(y,env,rdd_env)
-                         .flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
-                            public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+                         .flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() {
+                            public Iterator<Tuple2<MRData,MRData>> eval ( MRData value ) {
                                 return joinIterator(((Bag)fy.eval(value)).iterator());
                             }
                         }).collectAsMap());
-                return eval(x,env,rdd_env).flatMap(new FlatMapFunction<MRData,MRData>() {
-                        public Iterable<MRData> call ( MRData value ) {
+                return eval(x,env,rdd_env).flatMap(new FmFunction<MRData,MRData>() {
+                        public Iterator<MRData> eval ( MRData value ) {
                             final Map<MRData,MRData> m = ys.value();
                             final Iterator<MRData> i = ((Bag)fx.eval(value)).iterator();
-                            return new Iterable<MRData>() {
-                                public Iterator<MRData> iterator() {
-                                    return new Iterator<MRData>() {
-                                        Tuple p;
-                                        Iterator<MRData> ix = null;
-                                        public MRData next () {
-                                            return ix.next();
-                                        }
-                                        public boolean hasNext () {
-                                            if (ix != null && ix.hasNext())
-                                                return true;
-                                            while (i.hasNext()) {
-                                                p = (Tuple)i.next();
-                                                MRData pd = m.get(p.first());
-                                                Bag bb = ((Bag)fr.eval(new Tuple(p.second(),(pd == null) ? empty_bag : pd)));
-                                                ix = bb.iterator();
-                                                if (ix.hasNext())
-                                                    return true;
-                                            };
-                                            return false;
-                                        }
-                                        public void remove () {}
-                                    };
+                            return new Iterator<MRData>() {
+                                Tuple p;
+                                Iterator<MRData> ix = null;
+                                public MRData next () {
+                                    return ix.next();
                                 }
+                                public boolean hasNext () {
+                                    if (ix != null && ix.hasNext())
+                                        return true;
+                                    while (i.hasNext()) {
+                                        p = (Tuple)i.next();
+                                        MRData pd = m.get(p.first());
+                                        Bag bb = ((Bag)fr.eval(new Tuple(p.second(),(pd == null) ? empty_bag : pd)));
+                                        ix = bb.iterator();
+                                        if (ix.hasNext())
+                                            return true;
+                                    };
+                                    return false;
+                                }
+                                public void remove () {}
                             };
                         }
                     });
diff --git a/spark/src/main/java/org/apache/mrql/SparkStreaming.gen b/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
index 1ae28b1..2b74b3e 100644
--- a/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
+++ b/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
@@ -159,7 +159,11 @@
             plan = get_streams(plan,env);
             stream_processing(plan,env,dataset_env,f);
         };
-        stream_context.start();
-        stream_context.awaitTermination();
+        try {
+            stream_context.start();
+            stream_context.awaitTermination();
+        } catch (Exception ex) {
+            throw new Error(ex);
+        }
     }
 }