MRQL-63: Add support for MRQL streaming in spark streaming mode
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index f288b62..8c67824 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -70,11 +70,12 @@
BSP_SPLIT_INPUT=
-# Optional: Spark configuration. Supports versions 1.0.0, 1.0.2, and 1.1.0
+# Optional: Spark configuration. Supports versions 1.0.0, 1.0.2, 1.1.0, 1.1.1, and 1.2.0
# (Spark versions 0.8.1, 0.9.0, and 0.9.1 are supported by MRQL 0.9.0)
# You may use the Spark prebuilts bin-hadoop1 or bin-hadoop2 (Yarn)
+# For distributed mode, give write permission to /tmp: hadoop fs -chmod -R 777 /tmp
# Tested in local, standalone deploy, and Yarn modes
-SPARK_HOME=${HOME}/spark-1.1.0-bin-hadoop2.3
+SPARK_HOME=${HOME}/spark-1.2.0-bin-hadoop2.3
# URI of the Spark master node:
# to run Spark on Standalone Mode, set it to spark://`hostname`:7077
# to run Spark on a YARN cluster, set it to "yarn-client"
@@ -90,9 +91,9 @@
SPARK_MASTER_MEMORY=512M
-# Optional: Flink configuration. Supports versions 0.6-incubating, 0.6.1-incubating, and 0.7.0-incubating
+# Optional: Flink configuration. Supports versions 0.6-incubating, 0.6.1-incubating, 0.7.0-incubating, and 0.8.0
# Note: for yarn, set yarn.nodemanager.vmem-check-enabled to false in yarn-site.xml
-FLINK_VERSION=yarn-0.7.0-incubating
+FLINK_VERSION=yarn-0.8.0
# Flink installation directory
FLINK_HOME=${HOME}/flink-${FLINK_VERSION}
# (use this for a Flink snapshot):
diff --git a/core/src/main/java/org/apache/mrql/Config.java b/core/src/main/java/org/apache/mrql/Config.java
index ce631a4..7e1d291 100644
--- a/core/src/main/java/org/apache/mrql/Config.java
+++ b/core/src/main/java/org/apache/mrql/Config.java
@@ -84,6 +84,8 @@
public static boolean testing = false;
// true to display INFO log messages
public static boolean info = false;
+ // for streaming, stream_window > 0 is the stream window duration in milliseconds
+ public static int stream_window = 0;
/** store the configuration parameters */
public static void write ( Configuration conf ) {
@@ -114,6 +116,7 @@
conf.setBoolean("mrql.quiet.execution",quiet_execution);
conf.setBoolean("mrql.testing",testing);
conf.setBoolean("mrql.info",info);
+ conf.setInt("mrql.stream.window",stream_window);
}
/** load the configuration parameters */
@@ -145,6 +148,7 @@
quiet_execution = conf.getBoolean("mrql.quiet.execution",quiet_execution);
testing = conf.getBoolean("mrql.testing",testing);
info = conf.getBoolean("mrql.info",info);
+ stream_window = conf.getInt("mrql.stream.window",stream_window);
}
public static ArrayList<String> extra_args = new ArrayList<String>();
@@ -258,6 +262,11 @@
Translator.print_aggregates();
System.out.println();
i++;
+ } else if (args[i].equals("-stream")) {
+ if (++i >= args.length)
+ throw new Error("Expected a stream window duration");
+ stream_window = Integer.parseInt(args[i]);
+ i++;
} else if (args[i].charAt(0) == '-')
throw new Error("Unknown MRQL parameter: "+args[i]);
else {
diff --git a/core/src/main/java/org/apache/mrql/DataSetFunction.java b/core/src/main/java/org/apache/mrql/DataSetFunction.java
new file mode 100644
index 0000000..d9db585
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/DataSetFunction.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.*;
+
+
+/**
+ * An anonymous function on DataSets
+ * Must provide a concrete implementation for eval (the lambda body)
+ */
+abstract public class DataSetFunction implements Serializable {
+ /**
+ * evaluate the anonymous function on a DataSet
+ * @param arg the operand to be evaluated
+ */
+ abstract public void eval ( final DataSet arg );
+}
diff --git a/core/src/main/java/org/apache/mrql/Evaluator.java b/core/src/main/java/org/apache/mrql/Evaluator.java
index 0b19599..73aad03 100644
--- a/core/src/main/java/org/apache/mrql/Evaluator.java
+++ b/core/src/main/java/org/apache/mrql/Evaluator.java
@@ -150,6 +150,11 @@
out.close();
}
+ /** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
+ public void streaming ( Tree plan, Environment env, DataSetFunction f ) {
+ throw new Error("MRQL Streaming is not supported in this evaluation mode yet");
+ }
+
/** for dumped data to a file, return the MRQL type of the data */
public Tree get_type ( String file ) {
try {
diff --git a/core/src/main/java/org/apache/mrql/Interpreter.gen b/core/src/main/java/org/apache/mrql/Interpreter.gen
index e22afbe..5714a54 100644
--- a/core/src/main/java/org/apache/mrql/Interpreter.gen
+++ b/core/src/main/java/org/apache/mrql/Interpreter.gen
@@ -575,6 +575,14 @@
if (!(t instanceof Tuple))
throw new Error("Expected a tuple in function application: "+t);
return ((Lambda)fnc).lambda().eval(t);
+ case Stream(...): // streaming
+ final Tree qt = query_type;
+ Evaluator.evaluator.streaming(e,env,new DataSetFunction(){
+ public void eval ( final DataSet ds ) {
+ System.out.println(print(new MR_dataset(ds),qt));
+ }
+ });
+ return new Bag();
case _:
try {
if (Config.hadoop_mode)
@@ -858,6 +866,8 @@
System.out.println("Physical plan type: "+print_type(et));
repeat_variables = #[];
ne = Simplification.simplify_all(ne);
+ if (Streaming.is_streaming(ne))
+ ne = Streaming.streamify(ne);
Tree plan = PlanGeneration.makePlan(ne);
if (Config.bsp_mode) {
BSPTranslator.reset();
diff --git a/core/src/main/java/org/apache/mrql/PlanGeneration.gen b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
index 72200fc..85e4528 100644
--- a/core/src/main/java/org/apache/mrql/PlanGeneration.gen
+++ b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
@@ -384,6 +384,18 @@
for ( Tree a: args )
el = el.append(makePlan(a));
return #<ParsedSource(`parser,`(makePlan(file)),...el)>;
+ case call(stream,binary,`file,`tp):
+ return #<BinaryStream(`file,`tp)>;
+ case call(stream,gen,`f,`len,`ulen):
+ return #<SequenceStream(`(makePlan(f)),`(makePlan(len)),
+ `(makePlan(ulen)))>;
+ case call(stream,`parser,`file,...args):
+ Trees el = #[];
+ for ( Tree a: args )
+ el = el.append(makePlan(a));
+ return #<ParsedStream(`parser,`(makePlan(file)),...el)>;
+ case stream(lambda(`v,`b),`u):
+ return #<Stream(lambda(`v,`(makePlan(b))),`(makePlan(u)))>;
case type(`x): return e;
case gen(`min,`max,`size):
return #<Generator(`(makePlan(min)),`(makePlan(max)),`(makePlan(size)))>;
diff --git a/core/src/main/java/org/apache/mrql/Streaming.gen b/core/src/main/java/org/apache/mrql/Streaming.gen
new file mode 100644
index 0000000..973e323
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Streaming.gen
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+
+
+/** Generate a physical plan from an algebraic expression */
+final public class Streaming extends AlgebraicOptimization {
+
+ static boolean is_streaming ( Tree e ) {
+ match e {
+ case call(stream,...): return true;
+ case `f(...al):
+ for ( Tree a: al )
+ if (is_streaming(a))
+ return true;
+ };
+ return false;
+ }
+
+ static Trees stream_bindings ( Tree e ) {
+ match e {
+ case call(stream,...):
+ return #[bind(`(new_var()),`e)];
+ case `f(...al):
+ Trees rs = #[];
+ for ( Tree a: al )
+ rs = rs.append(stream_bindings(a));
+ return rs;
+ };
+ return #[];
+ }
+
+ static Tree streamify ( Tree e ) {
+ Trees sbs = stream_bindings(e);
+ Tree ne = e;
+ for ( Tree sb: sbs )
+ match sb {
+ case bind(`v,`u):
+ ne = #<stream(lambda(`v,`(subst(u,v,ne))),`u)>;
+ TypeInference.type_inference(ne);
+ };
+ return ne;
+ }
+}
diff --git a/core/src/main/java/org/apache/mrql/TopLevel.gen b/core/src/main/java/org/apache/mrql/TopLevel.gen
index a190648..645cc86 100644
--- a/core/src/main/java/org/apache/mrql/TopLevel.gen
+++ b/core/src/main/java/org/apache/mrql/TopLevel.gen
@@ -207,6 +207,18 @@
}
}
+ /** true, if e is a strem-processing expression */
+ public final static boolean stream_expression ( Tree e ) {
+ match e {
+ case call(stream,...): return true;
+ case `f(...as):
+ for ( Tree a: as )
+ if (stream_expression(a))
+ return true;
+ };
+ return false;
+ }
+
/** define a new named type (typedef) */
private final static void typedef ( String name, Tree type ) {
type_names.insert(name,normalize_type(type));
@@ -267,6 +279,46 @@
long t = System.currentTimeMillis();
if (distributed_assign(v.toString(),e) != null && !Config.quiet_execution)
System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs");
+ case dump(`s,`e): // dump stream output
+ if (!stream_expression(e))
+ fail;
+ reset();
+ Tree plan = translate_expression(e);
+ if (plan == null)
+ return;
+ Evaluator.evaluator.initialize_query();
+ final Tree qt = query_type;
+ final String file = s.stringValue();
+ Evaluator.evaluator.streaming(plan,null,new DataSetFunction(){
+ long i = 0;
+ public void eval ( final DataSet ds ) {
+ try {
+ Evaluator.evaluator.dump(file+"/f"+(i++),qt,new MR_dataset(ds));
+ } catch (Exception ex) {
+ throw new Error("Cannot dump the streaming result to the directory "+file);
+ };
+ }
+ });
+ case dump_text(`s,`e): // dump stream output
+ if (!stream_expression(e))
+ fail;
+ reset();
+ Tree plan = translate_expression(e);
+ if (plan == null)
+ return;
+ Evaluator.evaluator.initialize_query();
+ final Tree qt = query_type;
+ final String file = s.stringValue();
+ Evaluator.evaluator.streaming(plan,null,new DataSetFunction(){
+ long i = 0;
+ public void eval ( final DataSet ds ) {
+ try {
+ Evaluator.evaluator.dump_text(file+"/f"+(i++),qt,new MR_dataset(ds));
+ } catch (Exception ex) {
+ throw new Error("Cannot dump the streaming result to the directory "+file);
+ }
+ }
+ });
case dump(`s,`e):
long t = System.currentTimeMillis();
dump(s.stringValue(),e);
diff --git a/core/src/main/java/org/apache/mrql/TypeInference.gen b/core/src/main/java/org/apache/mrql/TypeInference.gen
index 11dea76..4f78c06 100644
--- a/core/src/main/java/org/apache/mrql/TypeInference.gen
+++ b/core/src/main/java/org/apache/mrql/TypeInference.gen
@@ -615,6 +615,19 @@
} catch (Exception x) {
type_error(e,"Unrecognized parser type: "+parser);
}
+ case call(stream,binary,`f):
+ if (Config.stream_window == 0)
+ type_error(e,"Not in stream processing mode");
+ Tree tp = type_inference(#<call(source,binary,`f)>);
+ ((Node)e).children = ((Node)e).children.append(tp); // destructive
+ return tp;
+ case call(stream,...r):
+ if (Config.stream_window == 0)
+ type_error(e,"Not in stream processing mode");
+ return type_inference(#<call(source,...r)>);
+ case stream(lambda(`v,`b),`u):
+ bind_pattern_type(v,type_inference2(u));
+ return type_inference2(b);
case typed(null,`tp):
return tp;
case typed(`f(...),`tp):
diff --git a/flink/pom.xml b/flink/pom.xml
index 1161c77..bd3b8d7 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -54,6 +54,11 @@
<version>${flink.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-core</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.mrql</groupId>
<artifactId>mrql-core</artifactId>
<version>${project.version}</version>
diff --git a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
index 428fb76..495d1ab 100644
--- a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
+++ b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
@@ -37,12 +37,14 @@
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.functions.*;
import org.apache.flink.api.java.operators.*;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/** Evaluates physical plans in Apache Flink mode */
-final public class FlinkEvaluator extends Evaluator implements Serializable {
+public class FlinkEvaluator extends Evaluator implements Serializable {
final static URL flink_jar = FlinkEvaluator.class.getProtectionDomain().getCodeSource().getLocation();
public static ExecutionEnvironment flink_env;
+ public static StreamExecutionEnvironment stream_env;
// an HDFS tmp file used to hold the data source directory information in distributed mode
static String data_source_dir_name;
static String master_host = "localhost";
@@ -91,7 +93,12 @@
Config.nodes,flink_jar.toURI().getPath(),Plan.conf.get("mrql.jar.path"));
else flink_env = ExecutionEnvironment.createRemoteEnvironment(master_host,master_port,
Config.nodes,flink_jar.toURI().getPath());
- } else flink_env.setDefaultLocalParallelism(Config.nodes);
+ } else {
+ flink_env.setDefaultLocalParallelism(Config.nodes);
+ if (Config.stream_window > 0)
+ stream_env = StreamExecutionEnvironment.createLocalEnvironment(Config.nodes)
+ .setBufferTimeout(Config.stream_window);
+ }
} catch (Exception ex) {
throw new Error("Cannot initialize the Flink evaluator: "+ex);
}
@@ -116,6 +123,11 @@
return FlinkGeneratorInputFormat.class;
}
+ /** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
+ final public void streaming ( Tree plan, Environment env, DataSetFunction f ) {
+ FlinkStreaming.evaluate(plan,env,f);
+ }
+
/** returns the absolute path relative to the directory that contains the MRQL executable */
public static String absolute_path ( String path) {
try {
diff --git a/flink/src/main/java/org/apache/mrql/FlinkStreaming.gen b/flink/src/main/java/org/apache/mrql/FlinkStreaming.gen
new file mode 100644
index 0000000..f4303e5
--- /dev/null
+++ b/flink/src/main/java/org/apache/mrql/FlinkStreaming.gen
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.io.IOException;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.util.Collector;
+import org.apache.flink.streaming.api.datastream.*;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+
+
+/** Evaluates physical plans in Apache Flink stream mode */
+public class FlinkStreaming extends FlinkEvaluator {
+ private static ArrayList<DataStream<FData>> streams;
+ private static ArrayList<String> stream_names;
+
+ public static final class MRDataFileSourceFunction implements SourceFunction<FData> {
+ private static final long serialVersionUID = 1L;
+ final private FData container = new FData();
+ private HashMap<String,Long> file_modification_times;
+
+ final private String directory;
+ final private long time_window;
+ final private FileInputFormat<FData> input_format;
+
+ public MRDataFileSourceFunction ( String directory, long time_window,
+ FileInputFormat<FData> input_format ) {
+ this.directory = directory;
+ this.time_window = time_window;
+ this.input_format = input_format;
+ }
+
+ /** return the files within the directory that have been created within the last time window */
+ private ArrayList<String> new_files () {
+ try {
+ long ct = System.currentTimeMillis();
+ Path dpath = new Path(directory);
+ final FileSystem fs = dpath.getFileSystem(Plan.conf);
+ final FileStatus[] ds
+ = fs.listStatus(dpath,
+ new PathFilter () {
+ public boolean accept ( Path path ) {
+ return !path.getName().startsWith("_")
+ && !path.getName().endsWith(".type");
+ }
+ });
+ ArrayList<String> s = new ArrayList<String>();
+ for ( FileStatus d: ds ) {
+ String name = d.getPath().toString();
+ if (file_modification_times.get(name) == null
+ || d.getModificationTime() > file_modification_times.get(name)) {
+ file_modification_times.put(name,new Long(ct));
+ s.add(name);
+ }
+ };
+ return s;
+ } catch (Exception ex) {
+ throw new Error("Cannot open a new file from the directory "+directory+": "+ex);
+ }
+ }
+
+ @Override
+ public void invoke ( Collector<FData> collector ) throws IOException {
+ long tick = System.currentTimeMillis();
+ file_modification_times = new HashMap<String,Long>();
+ while (true) {
+ for ( String path: new_files() ) {
+ input_format.setFilePath(path);
+ FileInputSplit[] splits = input_format.createInputSplits(1);
+ for ( FileInputSplit split: splits ) {
+ input_format.open(split);
+ while (!input_format.reachedEnd()) {
+ FData fd = input_format.nextRecord(container);
+ if (fd != null)
+ collector.collect(fd);
+ }
+ };
+ input_format.close();
+ };
+ try {
+ long ct = System.currentTimeMillis();
+ tick += time_window;
+ if (tick > ct)
+ Thread.sleep(tick-ct);
+ else tick = ct;
+ } catch (Exception ex) {
+ return;
+ }
+ }
+ }
+ }
+
+ private static DataStream<FData> stream_source ( Tree source, Environment env ) {
+ match source {
+ case BinaryStream(`file,_):
+ String path = absolute_path(((MR_string)evalE(file,null)).get());
+ new BinaryDataSource(path,Plan.conf);
+ return stream_env.addSource(new MRDataFileSourceFunction(path,Config.stream_window,
+ new FlinkBinaryInputFormat.FDataInputFormat()));
+ case ParsedStream(`parser,`file,...args):
+ String path = absolute_path(((MR_string)evalE(file,null)).get());
+ Class<? extends Parser> parser_class = DataSource.parserDirectory.get(parser.toString());
+ if (parser_class == null)
+ throw new Error("Unknown parser: "+parser);
+ new FlinkParsedDataSource(path,parser_class,args);
+ return stream_env.addSource(new MRDataFileSourceFunction(path,Config.stream_window,
+ new FlinkParsedInputFormat.ParsedInputFormat(path)));
+ };
+ throw new Error("Unknown stream source: "+print_query(source));
+ }
+
+ private static void stream_processing ( final Tree plan, final Environment env,
+ final DataSetFunction f, final DataStream<FData> stream ) {
+ final Evaluator ef = Evaluator.evaluator;
+ match plan {
+ case Stream(lambda(`v,`b),`source):
+ DataStream<FData> streamSource = stream_source(source,env);
+ streams.add(streamSource);
+ stream_names.add(v.toString());
+ stream_processing(b,env,f,streamSource);
+ case _:
+ /** Incomplete: need a function from List<DataSet> to DataSet (or void) on each Flink window
+ stream.window().reduceGroup(new GroupReduceFunction<FData,FData>() {
+ public void reduce ( Iterable<FData> values, Collector<FData> out ) throws Exception {
+ f.eval(ef.eval(plan,env,"-"));
+ }
+ });
+ */
+ }
+ }
+
+ /** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
+ final public static void evaluate ( Tree plan, final Environment env, final DataSetFunction f ) {
+ try {
+ if (true) // needs more work
+ throw new Error("MRQL Streaming is not supported in this evaluation mode yet");
+ streams = new ArrayList<DataStream<FData>>();
+ stream_names = new ArrayList<String>();
+ stream_processing(plan,env,f,null);
+ stream_env.execute();
+ } catch (Exception ex) {
+ throw new Error("Error during Flink stream processing: "+ex);
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 7414bf4..f2bb3e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,9 +58,9 @@
<hadoop.version>1.2.1</hadoop.version>
<yarn.version>2.2.0</yarn.version>
<hama.version>0.6.4</hama.version>
- <spark.version>1.1.0</spark.version>
+ <spark.version>1.2.0</spark.version>
<scala.version>2.10</scala.version>
- <flink.version>0.7.0-incubating</flink.version>
+ <flink.version>0.8.0</flink.version>
<skipTests>true</skipTests>
</properties>
diff --git a/queries/streaming.mrql b/queries/streaming.mrql
new file mode 100644
index 0000000..a453ab6
--- /dev/null
+++ b/queries/streaming.mrql
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+select (k,avg(p.Y))
+from p in stream(binary,"tmp/points.bin")
+group by k: p.X;
diff --git a/spark/pom.xml b/spark/pom.xml
index 24bec74..a12af11 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -49,6 +49,11 @@
<version>${spark.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.mrql</groupId>
<artifactId>mrql-core</artifactId>
<version>${project.version}</version>
diff --git a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
index df67385..0e824b6 100644
--- a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
+++ b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
@@ -21,7 +21,6 @@
import java_cup.runtime.Scanner;
import java.util.List;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.io.*;
@@ -42,16 +41,20 @@
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.Duration;
/** Evaluates physical plans in Apache Spark mode */
-final public class SparkEvaluator extends Evaluator implements Serializable {
+public class SparkEvaluator extends Evaluator implements Serializable {
public static JavaSparkContext spark_context;
+ public static JavaStreamingContext stream_context;
final static Bag empty_bag = new Bag();
// an HDFS tmp file used to hold the data source directory information in distributed mode
final static String data_source_dir_name = "tmp/"+System.getenv("USER")+"_data_source_dir.txt";
@@ -76,6 +79,8 @@
Plan.conf = spark_context.hadoopConfiguration();
FileSystem.setDefaultUri(Plan.conf,System.getenv("FS_DEFAULT_NAME"));
};
+ if (Config.stream_window > 0 && (Config.local_mode || Config.hadoop_mode))
+ stream_context = new JavaStreamingContext(spark_context,new Duration(Config.stream_window));
if (!Config.info) {
for ( Enumeration en = LogManager.getCurrentLoggers(); en.hasMoreElements(); )
((Logger)en.nextElement()).setLevel(Level.WARN);
@@ -118,6 +123,11 @@
return SparkGeneratorInputFormat.class;
}
+ /** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
+ final public void streaming ( Tree plan, Environment env, DataSetFunction f ) {
+ SparkStreaming.evaluate(plan,env,f);
+ }
+
/** used by the master to send parsing details (eg, record types) to workers */
public static void dump_source_dir () throws IOException {
if (Config.local_mode)
@@ -446,7 +456,7 @@
.flatMap(cmap_fnc(fnc,env));
}
- private static JavaRDD<MRData> containerData ( JavaPairRDD<MRContainer,MRContainer> rd ) {
+ static JavaRDD<MRData> containerData ( JavaPairRDD<MRContainer,MRContainer> rd ) {
final Environment master_env = global_env;
return rd.map(new Function<Tuple2<MRContainer,MRContainer>,MRData>() {
public MRData call ( Tuple2<MRContainer,MRContainer> value ) {
diff --git a/spark/src/main/java/org/apache/mrql/SparkFileInputStream.java b/spark/src/main/java/org/apache/mrql/SparkFileInputStream.java
new file mode 100644
index 0000000..f1e1535
--- /dev/null
+++ b/spark/src/main/java/org/apache/mrql/SparkFileInputStream.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.util.HashMap;
+import java.util.ArrayList;
+import scala.Option;
+import scala.Some;
+import scala.None;
+import scala.collection.immutable.List;
+import scala.collection.immutable.Nil$;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.streaming.*;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.streaming.dstream.*;
+
+
+/** A Spark InputDStream for MRQL InputFormats */
+public class SparkFileInputStream extends JavaInputDStream<MRData> {
+ private final static scala.reflect.ClassTag<MRData> classtag = scala.reflect.ClassTag$.MODULE$.apply(MRData.class);
+
+ public static final class MRDataInputStream extends InputDStream<MRData> {
+ private final String directory;
+ private final boolean is_binary;
+ private final JavaStreamingContext stream_context;
+ private final HashMap<String,Long> file_modification_times;
+
+ MRDataInputStream ( JavaStreamingContext stream_context, String directory, boolean is_binary ) {
+ super(stream_context.ssc(),classtag);
+ this.directory = directory;
+ this.is_binary = is_binary;
+ this.stream_context = stream_context;
+ file_modification_times = new HashMap<String,Long>();
+ }
+
+ @Override
+ public void start () {}
+
+ @Override
+ public void stop () {}
+
+ @Override
+ public Duration slideDuration () {
+ return new Duration(Config.stream_window);
+ }
+
+ @Override
+ public List dependencies () {
+ return Nil$.MODULE$;
+ }
+
+ /** return the files within the directory that have been created within the last time window */
+ private ArrayList<String> new_files () {
+ try {
+ long ct = System.currentTimeMillis();
+ Path dpath = new Path(directory);
+ final FileSystem fs = dpath.getFileSystem(Plan.conf);
+ final FileStatus[] ds
+ = fs.listStatus(dpath,
+ new PathFilter () {
+ public boolean accept ( Path path ) {
+ return !path.getName().startsWith("_")
+ && !path.getName().endsWith(".type");
+ }
+ });
+ ArrayList<String> s = new ArrayList<String>();
+ for ( FileStatus d: ds ) {
+ String name = d.getPath().toString();
+ if (file_modification_times.get(name) == null
+ || d.getModificationTime() > file_modification_times.get(name)) {
+ file_modification_times.put(name,new Long(ct));
+ s.add(name);
+ }
+ };
+ return s;
+ } catch (Exception ex) {
+ throw new Error("Cannot open a new file from the directory "+directory+": "+ex);
+ }
+ }
+
+ private JavaRDD<MRData> hadoopFile ( String file ) {
+ return SparkEvaluator.containerData(
+ (is_binary)
+ ? SparkEvaluator.spark_context.sequenceFile(file,
+ MRContainer.class,MRContainer.class,
+ Config.nodes)
+ : SparkEvaluator.spark_context.hadoopFile(file,SparkParsedInputFormat.class,
+ MRContainer.class,MRContainer.class,
+ Config.nodes));
+ }
+
+ @Override
+ public Option<RDD<MRData>> compute ( Time validTime ) {
+ JavaRDD<MRData> rdd = null;
+ for ( String file: new_files() )
+ if (rdd == null)
+ rdd = hadoopFile(file);
+ else rdd = rdd.union(hadoopFile(file));
+ if (rdd == null)
+ return scala.None$.MODULE$.apply(null);
+ return new Some<RDD<MRData>>(rdd.rdd());
+ }
+ }
+
+ SparkFileInputStream ( JavaStreamingContext stream_context, String directory, boolean is_binary ) {
+ super(new MRDataInputStream(stream_context,directory,is_binary),classtag);
+ }
+}
diff --git a/spark/src/main/java/org/apache/mrql/SparkStreaming.gen b/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
new file mode 100644
index 0000000..f514a48
--- /dev/null
+++ b/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.io.*;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.streaming.*;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.streaming.dstream.*;
+import scala.collection.Seq;
+import scala.collection.JavaConversions;
+import scala.runtime.AbstractFunction2;
+
+
+/** Evaluates physical plans in Apache Spark stream mode */
+public class SparkStreaming extends SparkEvaluator {
+ private final static scala.reflect.ClassTag<MRData> classtag = scala.reflect.ClassTag$.MODULE$.apply(MRData.class);
+ private static ArrayList<JavaInputDStream<MRData>> streams;
+ private static ArrayList<String> stream_names;
+
+ private static JavaInputDStream<MRData> stream_source ( Tree source, Environment env ) {
+ match source {
+ case BinaryStream(`file,_):
+ String path = ((MR_string)evalE(file,env)).get();
+ new BinaryDataSource(path,Plan.conf);
+ return new SparkFileInputStream(stream_context,path,true);
+ case ParsedStream(`parser,`file,...args):
+ String path = ((MR_string)evalE(file,env)).get();
+ Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
+ if (p == null)
+ throw new Error("Unknown parser: "+parser);
+ new ParsedDataSource(path,p,args,Plan.conf);
+ try {
+ dump_source_dir();
+ } catch (IOException ex) {
+ throw new Error("Cannot dump source directory");
+ };
+ return new SparkFileInputStream(stream_context,path,false);
+ };
+ throw new Error("Unknown stream source: "+print_query(source));
+ }
+
+ private static void stream_processing ( final Tree plan, final Environment env, final DataSetFunction f ) {
+ final Evaluator ef = Evaluator.evaluator;
+ match plan {
+ case Stream(lambda(`v,`b),`source):
+ streams.add(stream_source(source,env));
+ stream_names.add(v.toString());
+ stream_processing(b,env,f);
+ case _:
+ ArrayList<DStream<?>> rdds = new ArrayList<DStream<?>>();
+ for ( JavaDStream<MRData> jd: streams )
+ rdds.add(jd.dstream());
+ final ArrayList<String> vars = stream_names;
+ final AbstractFunction2<Seq<RDD<?>>,Time,RDD<MRData>> fnc =
+ new AbstractFunction2<Seq<RDD<?>>,Time,RDD<MRData>>() {
+ public RDD<MRData> apply ( Seq<RDD<?>> rdds, Time tm ) {
+ Environment new_env = env;
+ int i = 0;
+ for ( RDD<?> rdd: JavaConversions.seqAsJavaList(rdds) ) {
+ try {
+ if (rdd.count() == 0)
+ rdd = SparkEvaluator.spark_context.emptyRDD().rdd();
+ } catch (Exception ex) {
+ return SparkEvaluator.spark_context.sc().emptyRDD(classtag);
+ };
+ new_env = new Environment(vars.get(i++),new MR_rdd(new JavaRDD(rdd,classtag)),new_env);
+ };
+ f.eval(ef.eval(plan,new_env,"-"));
+ return SparkEvaluator.spark_context.sc().emptyRDD(classtag);
+ }
+ };
+ new TransformedDStream<MRData>(JavaConversions.asScalaBuffer(rdds).toSeq(),fnc,classtag).register();
+ }
+ }
+
+ /** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
+ final public static void evaluate ( Tree plan, Environment env, DataSetFunction f ) {
+ streams = new ArrayList<JavaInputDStream<MRData>>();
+ stream_names = new ArrayList<String>();
+ stream_processing(plan,env,f);
+ stream_context.start();
+ stream_context.awaitTermination();
+ }
+}