[MRQL-95] Add support for Spark 2.0
diff --git a/bin/mrql.spark b/bin/mrql.spark
index a422e18..239182a 100755
--- a/bin/mrql.spark
+++ b/bin/mrql.spark
@@ -28,7 +28,7 @@
. "$MRQL_HOME/conf/mrql-env.sh"
-if [[ !(-f ${SPARK_JARS}) ]]; then
+if [[ -z ${SPARK_JARS} ]]; then
echo "*** Cannot find the Spark jar file. Need to edit mrql-env.sh"; exit -1
fi
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index ed891e1..aefd589 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -78,10 +78,10 @@
BSP_SPLIT_INPUT=
-# Optional: Spark configuration. Supports versions 1.0.0, 1.0.2, 1.1.0, 1.1.1, 1.2.0, 1.3.0, 1.3.1, 1.6.0, and 1.6.2
-# (Spark versions 0.8.1, 0.9.0, and 0.9.1 are supported by MRQL 0.9.0)
+# Optional: Spark configuration. Supports Spark versions 1.* and 2.0.0
+# For Spark 2.*, build with: mvn -Dspark2
+# Spark versions 0.8.1, 0.9.0, and 0.9.1 are supported by MRQL 0.9.0 only.
# You may use the Spark prebuilts bin-hadoop1 or bin-hadoop2 (Yarn)
-# For distributed mode, give write permission to /tmp: hadoop fs -chmod -R 777 /tmp
# Tested in local, standalone deploy, and Yarn modes
SPARK_HOME=${HOME}/spark-1.6.2-bin-hadoop2.6
# URI of the Spark master node:
@@ -123,10 +123,13 @@
SPARK_JARS=`ls ${SPARK_HOME}/assembly/target/scala-*/*.jar`
else if [[ -d ${SPARK_HOME}/lib ]]; then
SPARK_JARS=`ls ${SPARK_HOME}/lib/spark-assembly-*.jar`
+else if [[ -d ${SPARK_HOME}/jars ]]; then
+ SPARK_JARS=`ls ${SPARK_HOME}/jars/spark-core*.jar`
+fi
fi
fi
-HADOOP_JARS=`$HADOOP_HOME/bin/hadoop classpath`
+HADOOP_JARS=`${HADOOP_HOME}/bin/hadoop classpath`
if [[ !(-f ${CUP_JAR}) ]]; then
echo "*** Cannot find the parser generator CUP jar file. Need to edit mrql-env.sh"; exit -1
diff --git a/core/src/main/java/org/apache/mrql/Compiler.gen b/core/src/main/java/org/apache/mrql/Compiler.gen
index 5dbaf24..51f4534 100644
--- a/core/src/main/java/org/apache/mrql/Compiler.gen
+++ b/core/src/main/java/org/apache/mrql/Compiler.gen
@@ -164,7 +164,7 @@
compiled_lambdas = #[ ];
user_functions_num = lambda_num++;
// remove the old jar
- if (jar_path != null)
+ if (false && jar_path != null) // deliberately disabled: Spark 2.* still needs the old jar at runtime
remove(new File(jar_path));
jar_path = tmp_dir+"/mrql_args_"+(new Random().nextInt(1000000))+".jar";
out = new StringBuffer(1000);
diff --git a/spark/pom.xml b/spark/pom.xml
index 614e707..e53aef9 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -152,4 +152,69 @@
</plugin>
</plugins>
</build>
+
+ <profiles>
+ <profile>
+ <id>spark1</id>
+ <activation>
+ <property>
+ <name>!spark2</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>add-sources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>spark1</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>spark2</id>
+ <activation>
+ <property>
+ <name>spark2</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>add-sources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>spark2</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
diff --git a/spark/spark1/Spark1.java b/spark/spark1/Spark1.java
new file mode 100644
index 0000000..efa719b
--- /dev/null
+++ b/spark/spark1/Spark1.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.util.Iterator;
+import scala.Tuple2;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+
+
+abstract class FmFunction<T,R> implements FlatMapFunction<T,R> {
+ abstract Iterator<R> eval ( T t ) throws Exception;
+
+ public Iterable<R> call ( final T t ) throws Exception {
+ return new Iterable<R>() {
+ public Iterator<R> iterator() {
+ try {
+ return eval(t);
+ } catch (Exception ex) {
+ throw new Error(ex);
+ }
+ }
+ };
+ }
+}
+
+abstract class PairFmFunction<T,K,V> implements PairFlatMapFunction<T,K,V> {
+ abstract Iterator<Tuple2<K,V>> eval ( T t ) throws Exception;
+
+ public Iterable<Tuple2<K,V>> call ( final T t ) throws Exception {
+ return new Iterable<Tuple2<K,V>>() {
+ public Iterator<Tuple2<K,V>> iterator() {
+ try {
+ return eval(t);
+ } catch (Exception ex) {
+ throw new Error(ex);
+ }
+ }
+ };
+ }
+}
diff --git a/spark/spark2/Spark2.java b/spark/spark2/Spark2.java
new file mode 100644
index 0000000..82793bd
--- /dev/null
+++ b/spark/spark2/Spark2.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.util.Iterator;
+import scala.Tuple2;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+
+
+abstract class FmFunction<T,R> implements FlatMapFunction<T,R> {
+ abstract Iterator<R> eval ( T t ) throws Exception;
+
+ public Iterator<R> call ( final T t ) throws Exception {
+ return eval(t);
+ }
+}
+
+abstract class PairFmFunction<T,K,V> implements PairFlatMapFunction<T,K,V> {
+ abstract Iterator<Tuple2<K,V>> eval ( T t ) throws Exception;
+
+ public Iterator<Tuple2<K,V>> call ( final T t ) throws Exception {
+ return eval(t);
+ }
+}
diff --git a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
index 3ded60e..83de421 100644
--- a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
+++ b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
@@ -41,8 +41,6 @@
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -388,13 +386,13 @@
}
/* convert an MRQL lambda to a Spark Function */
- private static FlatMapFunction<MRData,MRData> cmap_fnc ( Tree fnc, Environment env ) {
+ private static FmFunction<MRData,MRData> cmap_fnc ( Tree fnc, Environment env ) {
final Broadcast<Environment> broadcast_env = spark_context.broadcast(global_env);
final org.apache.mrql.Function f = evalF(fnc,env);
- return new FlatMapFunction<MRData,MRData>() {
- public Iterable<MRData> call ( MRData value ) {
+ return new FmFunction<MRData,MRData>() {
+ public Iterator<MRData> eval ( MRData value ) {
global_env = broadcast_env.value();
- return (Bag)f.eval(value);
+ return ((Bag)f.eval(value)).iterator();
}
};
}
@@ -443,26 +441,22 @@
});
}
- private static Iterable<Tuple2<MRData,MRData>> joinIterator ( final Iterator<MRData> i ) {
- return new Iterable<Tuple2<MRData,MRData>>() {
- public Iterator<Tuple2<MRData,MRData>> iterator() {
- return new Iterator<Tuple2<MRData,MRData>> () {
- public Tuple2<MRData,MRData> next () {
- Tuple data = (Tuple)i.next();
- return new Tuple2<MRData,MRData>(data.first(),data.second());
- }
- public boolean hasNext () {
- return i.hasNext();
- }
- public void remove () {}
- };
+ private static Iterator<Tuple2<MRData,MRData>> joinIterator ( final Iterator<MRData> i ) {
+ return new Iterator<Tuple2<MRData,MRData>> () {
+ public Tuple2<MRData,MRData> next () {
+ Tuple data = (Tuple)i.next();
+ return new Tuple2<MRData,MRData>(data.first(),data.second());
}
+ public boolean hasNext () {
+ return i.hasNext();
+ }
+ public void remove () {}
};
}
- private static FlatMapFunction<Iterator<MRData>,MRData> combiner_fnc ( final org.apache.mrql.Function f ) {
- return new FlatMapFunction<Iterator<MRData>,MRData>() {
- public Iterable<MRData> call ( final Iterator<MRData> i ) {
+ private static FmFunction<Iterator<MRData>,MRData> combiner_fnc ( final org.apache.mrql.Function f ) {
+ return new FmFunction<Iterator<MRData>,MRData>() {
+ public Iterator<MRData> eval ( final Iterator<MRData> i ) {
return MapReduceAlgebra.cmap(new org.apache.mrql.Function() {
public MRData eval ( MRData x ) {
final MRData key = ((Tuple)x).first();
@@ -483,7 +477,7 @@
public boolean hasNext () {
return i.hasNext();
}
- })));
+ }))).iterator();
}
};
}
@@ -519,9 +513,9 @@
return eval(x,env,rdd_env)
.flatMap(cmap_fnc(mx,env))
.cartesian(eval(y,env,rdd_env).flatMap(cmap_fnc(my,env)))
- .flatMap(new FlatMapFunction<Tuple2<MRData,MRData>,MRData>() {
- public Iterable<MRData> call ( Tuple2<MRData,MRData> value ) {
- return (Bag)fr.eval(new Tuple(value._1,value._2));
+ .flatMap(new FmFunction<Tuple2<MRData,MRData>,MRData>() {
+ public Iterator<MRData> eval ( Tuple2<MRData,MRData> value ) {
+ return ((Bag)fr.eval(new Tuple(value._1,value._2))).iterator();
}
});
case MapReduce2(`mx,`my,`r,`x,`y,`o):
@@ -544,8 +538,8 @@
return new Tuple2(t.first(),t.second());
}
})
- : eval(x,env,rdd_env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
- public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+ : eval(x,env,rdd_env).flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() {
+ public Iterator<Tuple2<MRData,MRData>> eval ( MRData value ) {
return joinIterator(((Bag)fx.eval(value)).iterator());
}
});
@@ -567,15 +561,15 @@
return new Tuple2(t.first(),t.second());
}
})
- : eval(y,env,rdd_env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
- public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+ : eval(y,env,rdd_env).flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() {
+ public Iterator<Tuple2<MRData,MRData>> eval ( MRData value ) {
return joinIterator(((Bag)fy.eval(value)).iterator());
}
});
return xs.cogroup(ys)
- .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
- public Iterable<MRData> call ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
- return (Bag)fr.eval(new Tuple(bag(value._2._1),bag(value._2._2)));
+ .flatMap(new FmFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
+ public Iterator<MRData> eval ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
+ return ((Bag)fr.eval(new Tuple(bag(value._2._1),bag(value._2._2)))).iterator();
}
});
case GroupByJoin(`kx,`ky,`gx,`gy,`acc,`zero,`r,`x,`y,`o):
@@ -592,50 +586,42 @@
final MRData one = new MR_byte(1);
final MRData two = new MR_byte(2);
final JavaPairRDD<MRData,MRData> xs
- = eval(x,env,rdd_env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
- public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
- return new Iterable<Tuple2<MRData,MRData>>() {
- public Iterator<Tuple2<MRData,MRData>> iterator() {
- return new Iterator<Tuple2<MRData,MRData>>() {
- int i = 0;
- public Tuple2<MRData,MRData> next () {
- MRData key = new MR_int((fgx.eval(value).hashCode() % m)+m*i);
- i++;
- return new Tuple2<MRData,MRData>(key,new Tuple(one,value));
- }
- public boolean hasNext () {
- return i < n;
- }
- public void remove () {}
- };
+ = eval(x,env,rdd_env).flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() {
+ public Iterator<Tuple2<MRData,MRData>> eval ( final MRData value ) {
+ return new Iterator<Tuple2<MRData,MRData>>() {
+ int i = 0;
+ public Tuple2<MRData,MRData> next () {
+ MRData key = new MR_int((fgx.eval(value).hashCode() % m)+m*i);
+ i++;
+ return new Tuple2<MRData,MRData>(key,new Tuple(one,value));
}
+ public boolean hasNext () {
+ return i < n;
+ }
+ public void remove () {}
};
}
});
final JavaPairRDD<MRData,MRData> ys
- = eval(y,env,rdd_env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
- public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
- return new Iterable<Tuple2<MRData,MRData>>() {
- public Iterator<Tuple2<MRData,MRData>> iterator() {
- return new Iterator<Tuple2<MRData,MRData>>() {
- int j = 0;
- public Tuple2<MRData,MRData> next () {
- MRData key = new MR_int((fgy.eval(value).hashCode() % n)*m+j);
- j++;
- return new Tuple2<MRData,MRData>(key,new Tuple(two,value));
- }
- public boolean hasNext () {
- return j < m;
- }
- public void remove () {}
- };
+ = eval(y,env,rdd_env).flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() {
+ public Iterator<Tuple2<MRData,MRData>> eval ( final MRData value ) {
+ return new Iterator<Tuple2<MRData,MRData>>() {
+ int j = 0;
+ public Tuple2<MRData,MRData> next () {
+ MRData key = new MR_int((fgy.eval(value).hashCode() % n)*m+j);
+ j++;
+ return new Tuple2<MRData,MRData>(key,new Tuple(two,value));
}
+ public boolean hasNext () {
+ return j < m;
+ }
+ public void remove () {}
};
}
});
return xs.union(ys).groupByKey()
- .mapPartitions(new FlatMapFunction<Iterator<Tuple2<MRData,Iterable<MRData>>>,MRData>() {
- public Iterable<MRData> call ( final Iterator<Tuple2<MRData,Iterable<MRData>>> value ) {
+ .mapPartitions(new FmFunction<Iterator<Tuple2<MRData,Iterable<MRData>>>,MRData>() {
+ public Iterator<MRData> eval ( final Iterator<Tuple2<MRData,Iterable<MRData>>> value ) {
Bag xb = new Bag();
Bag yb = new Bag();
while (value.hasNext()) {
@@ -649,18 +635,14 @@
};
final Bag b = MapReduceAlgebra.mergeGroupByJoin(fkx,fky,fgx,fgy,fc,z,fr,xb,yb);
xb = null; yb = null;
- return new Iterable<MRData>() {
- public Iterator<MRData> iterator() {
- return b.iterator();
- }
- };
+ return b.iterator();
}
});
case OuterMerge(`merge,`state,`data):
final org.apache.mrql.Function fm = evalF(merge,env);
JavaPairRDD<MRData,MRData> S = eval(state,env,rdd_env)
- .mapPartitionsToPair(new PairFlatMapFunction<Iterator<MRData>,MRData,MRData>() {
- public Iterable<Tuple2<MRData,MRData>> call ( final Iterator<MRData> i ) {
+ .mapPartitionsToPair(new PairFmFunction<Iterator<MRData>,MRData,MRData>() {
+ public Iterator<Tuple2<MRData,MRData>> eval ( final Iterator<MRData> i ) {
return joinIterator(i);
}
},true); // do not repartition the state S
@@ -672,8 +654,8 @@
}
});
JavaRDD<MRData> res = S.cogroup(ds)
- .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
- public Iterable<MRData> call ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
+ .flatMap(new FmFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
+ public Iterator<MRData> eval ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
final Iterator<MRData> ix = value._2._1.iterator();
final Iterator<MRData> iy = value._2._2.iterator();
ArrayList<MRData> a = new ArrayList<MRData>();
@@ -683,7 +665,7 @@
else a.add(new Tuple(value._1,ix.next()));
else if (iy.hasNext())
a.add(new Tuple(value._1,iy.next()));
- return a;
+ return a.iterator();
}
}).cache();
cached_rdds.add(res);
@@ -691,8 +673,8 @@
case RightOuterMerge(`merge,`state,`data):
final org.apache.mrql.Function fm = evalF(merge,env);
JavaPairRDD<MRData,MRData> S = eval(state,env,rdd_env)
- .mapPartitionsToPair(new PairFlatMapFunction<Iterator<MRData>,MRData,MRData>() {
- public Iterable<Tuple2<MRData,MRData>> call ( final Iterator<MRData> i ) {
+ .mapPartitionsToPair(new PairFmFunction<Iterator<MRData>,MRData,MRData>() {
+ public Iterator<Tuple2<MRData,MRData>> eval ( final Iterator<MRData> i ) {
return joinIterator(i);
}
},true); // do not repartition the state S
@@ -704,17 +686,17 @@
}
});
JavaRDD<MRData> res = S.cogroup(ds)
- .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
- public Iterable<MRData> call ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
+ .flatMap(new FmFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
+ public Iterator<MRData> eval ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
final Iterator<MRData> ix = value._2._1.iterator();
final Iterator<MRData> iy = value._2._2.iterator();
- return (ix.hasNext())
+ return ((ix.hasNext())
? ((iy.hasNext())
? new Bag(new Tuple(value._1,fm.eval(new Tuple(ix.next(),iy.next()))))
: new Bag())
: ((iy.hasNext())
? new Bag(new Tuple(value._1,iy.next()))
- : new Bag());
+ : new Bag())).iterator();
}
}).cache();
cached_rdds.add(res);
@@ -731,23 +713,19 @@
return new Tuple2(t.first(),t.second());
}
}).collectAsMap());
- return eval(x,env,rdd_env).mapPartitions(new FlatMapFunction<Iterator<MRData>,MRData>() {
- public Iterable<MRData> call ( final Iterator<MRData> i ) {
+ return eval(x,env,rdd_env).mapPartitions(new FmFunction<Iterator<MRData>,MRData>() {
+ public Iterator<MRData> eval ( final Iterator<MRData> i ) {
final Map<MRData,MRData> m = ys.value();
- return new Iterable<MRData>() {
- public Iterator<MRData> iterator() {
- return new Iterator<MRData> () {
- public MRData next () {
- final Tuple p = (Tuple)fx.eval(i.next());
- final MRData pd = m.get(p.first());
- return ((Bag)fr.eval(new Tuple(p.second(),(pd == null) ? empty_bag : pd)));
- }
- public boolean hasNext () {
- return i.hasNext();
- }
- public void remove () {}
- };
+ return new Iterator<MRData> () {
+ public MRData next () {
+ final Tuple p = (Tuple)fx.eval(i.next());
+ final MRData pd = m.get(p.first());
+ return ((Bag)fr.eval(new Tuple(p.second(),(pd == null) ? empty_bag : pd)));
}
+ public boolean hasNext () {
+ return i.hasNext();
+ }
+ public void remove () {}
};
}
},true);
@@ -757,39 +735,35 @@
final org.apache.mrql.Function fr = evalF(r,env);
final Broadcast<Map<MRData,MRData>> ys
= spark_context.broadcast(eval(y,env,rdd_env)
- .flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
- public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+ .flatMapToPair(new PairFmFunction<MRData,MRData,MRData>() {
+ public Iterator<Tuple2<MRData,MRData>> eval ( MRData value ) {
return joinIterator(((Bag)fy.eval(value)).iterator());
}
}).collectAsMap());
- return eval(x,env,rdd_env).flatMap(new FlatMapFunction<MRData,MRData>() {
- public Iterable<MRData> call ( MRData value ) {
+ return eval(x,env,rdd_env).flatMap(new FmFunction<MRData,MRData>() {
+ public Iterator<MRData> eval ( MRData value ) {
final Map<MRData,MRData> m = ys.value();
final Iterator<MRData> i = ((Bag)fx.eval(value)).iterator();
- return new Iterable<MRData>() {
- public Iterator<MRData> iterator() {
- return new Iterator<MRData>() {
- Tuple p;
- Iterator<MRData> ix = null;
- public MRData next () {
- return ix.next();
- }
- public boolean hasNext () {
- if (ix != null && ix.hasNext())
- return true;
- while (i.hasNext()) {
- p = (Tuple)i.next();
- MRData pd = m.get(p.first());
- Bag bb = ((Bag)fr.eval(new Tuple(p.second(),(pd == null) ? empty_bag : pd)));
- ix = bb.iterator();
- if (ix.hasNext())
- return true;
- };
- return false;
- }
- public void remove () {}
- };
+ return new Iterator<MRData>() {
+ Tuple p;
+ Iterator<MRData> ix = null;
+ public MRData next () {
+ return ix.next();
}
+ public boolean hasNext () {
+ if (ix != null && ix.hasNext())
+ return true;
+ while (i.hasNext()) {
+ p = (Tuple)i.next();
+ MRData pd = m.get(p.first());
+ Bag bb = ((Bag)fr.eval(new Tuple(p.second(),(pd == null) ? empty_bag : pd)));
+ ix = bb.iterator();
+ if (ix.hasNext())
+ return true;
+ };
+ return false;
+ }
+ public void remove () {}
};
}
});
diff --git a/spark/src/main/java/org/apache/mrql/SparkStreaming.gen b/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
index 1ae28b1..2b74b3e 100644
--- a/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
+++ b/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
@@ -159,7 +159,11 @@
plan = get_streams(plan,env);
stream_processing(plan,env,dataset_env,f);
};
- stream_context.start();
- stream_context.awaitTermination();
+ try {
+ stream_context.start();
+ stream_context.awaitTermination();
+ } catch (Exception ex) {
+ throw new Error(ex);
+ }
}
}