MRQL-63: Add support for MRQL streaming in spark streaming mode
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index f288b62..8c67824 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -70,11 +70,12 @@
 BSP_SPLIT_INPUT=
 
 
-# Optional: Spark configuration. Supports versions 1.0.0, 1.0.2, and 1.1.0
+# Optional: Spark configuration. Supports versions 1.0.0, 1.0.2, 1.1.0, 1.1.1, and 1.2.0
 # (Spark versions 0.8.1, 0.9.0, and 0.9.1 are supported by MRQL 0.9.0)
 # You may use the Spark prebuilts bin-hadoop1 or bin-hadoop2 (Yarn)
+# For distributed mode, give write permission to /tmp: hadoop fs -chmod -R 777 /tmp
 # Tested in local, standalone deploy, and Yarn modes
-SPARK_HOME=${HOME}/spark-1.1.0-bin-hadoop2.3
+SPARK_HOME=${HOME}/spark-1.2.0-bin-hadoop2.3
 # URI of the Spark master node:
 #   to run Spark on Standalone Mode, set it to spark://`hostname`:7077
 #   to run Spark on a YARN cluster, set it to "yarn-client"
@@ -90,9 +91,9 @@
 SPARK_MASTER_MEMORY=512M
 
 
-# Optional: Flink configuration. Supports versions 0.6-incubating, 0.6.1-incubating, and 0.7.0-incubating
+# Optional: Flink configuration. Supports versions 0.6-incubating, 0.6.1-incubating, 0.7.0-incubating, and 0.8.0
 # Note: for yarn, set yarn.nodemanager.vmem-check-enabled to false in yarn-site.xml
-FLINK_VERSION=yarn-0.7.0-incubating
+FLINK_VERSION=yarn-0.8.0
 # Flink installation directory
 FLINK_HOME=${HOME}/flink-${FLINK_VERSION}
 #   (use this for a Flink snapshot):
diff --git a/core/src/main/java/org/apache/mrql/Config.java b/core/src/main/java/org/apache/mrql/Config.java
index ce631a4..7e1d291 100644
--- a/core/src/main/java/org/apache/mrql/Config.java
+++ b/core/src/main/java/org/apache/mrql/Config.java
@@ -84,6 +84,8 @@
     public static boolean testing = false;
     // true to display INFO log messages
     public static boolean info = false;
+    // for streaming, stream_window > 0 is the stream window duration in milliseconds
+    public static int stream_window = 0;
 
     /** store the configuration parameters */
     public static void write ( Configuration conf ) {
@@ -114,6 +116,7 @@
         conf.setBoolean("mrql.quiet.execution",quiet_execution);
         conf.setBoolean("mrql.testing",testing);
         conf.setBoolean("mrql.info",info);
+        conf.setInt("mrql.stream.window",stream_window);
     }
 
     /** load the configuration parameters */
@@ -145,6 +148,7 @@
         quiet_execution = conf.getBoolean("mrql.quiet.execution",quiet_execution);
         testing = conf.getBoolean("mrql.testing",testing);
         info = conf.getBoolean("mrql.info",info);
+        stream_window = conf.getInt("mrql.stream.window",stream_window);
     }
 
     public static ArrayList<String> extra_args = new ArrayList<String>();
@@ -258,6 +262,11 @@
                 Translator.print_aggregates();
                 System.out.println();
                 i++;
+            } else if (args[i].equals("-stream")) {
+                if (++i >= args.length)
+                    throw new Error("Expected a stream window duration");
+                stream_window = Integer.parseInt(args[i]);
+                i++;
             } else if (args[i].charAt(0) == '-')
                 throw new Error("Unknown MRQL parameter: "+args[i]);
             else {
diff --git a/core/src/main/java/org/apache/mrql/DataSetFunction.java b/core/src/main/java/org/apache/mrql/DataSetFunction.java
new file mode 100644
index 0000000..d9db585
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/DataSetFunction.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.*;
+
+
+/**
+ * An anonymous function on DataSets
+ * Must provide a concrete implementation for eval (the lambda body)
+ */
+abstract public class DataSetFunction implements Serializable {
+    /**
+     * evaluate the anonymous function on a DataSet
+     * @param arg the operand to be evaluated
+     */
+    abstract public void eval ( final DataSet arg );
+}
diff --git a/core/src/main/java/org/apache/mrql/Evaluator.java b/core/src/main/java/org/apache/mrql/Evaluator.java
index 0b19599..73aad03 100644
--- a/core/src/main/java/org/apache/mrql/Evaluator.java
+++ b/core/src/main/java/org/apache/mrql/Evaluator.java
@@ -150,6 +150,11 @@
 	out.close();
     }
 
+    /** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
+    public void streaming ( Tree plan, Environment env, DataSetFunction f ) {
+        throw new Error("MRQL Streaming is not supported in this evaluation mode yet");
+    }
+
     /** for dumped data to a file, return the MRQL type of the data */
     public Tree get_type ( String file ) {
         try {
diff --git a/core/src/main/java/org/apache/mrql/Interpreter.gen b/core/src/main/java/org/apache/mrql/Interpreter.gen
index e22afbe..5714a54 100644
--- a/core/src/main/java/org/apache/mrql/Interpreter.gen
+++ b/core/src/main/java/org/apache/mrql/Interpreter.gen
@@ -575,6 +575,14 @@
             if (!(t instanceof Tuple))
                 throw new Error("Expected a tuple in function application: "+t);
             return ((Lambda)fnc).lambda().eval(t);
+        case Stream(...):  // streaming
+            final Tree qt = query_type;
+            Evaluator.evaluator.streaming(e,env,new DataSetFunction(){
+                    public void eval ( final DataSet ds ) {
+                        System.out.println(print(new MR_dataset(ds),qt));
+                    }
+                });
+            return new Bag();
         case _:
             try {
                 if (Config.hadoop_mode)
@@ -858,6 +866,8 @@
                 System.out.println("Physical plan type: "+print_type(et));
             repeat_variables = #[];
             ne = Simplification.simplify_all(ne);
+            if (Streaming.is_streaming(ne))
+                ne = Streaming.streamify(ne);
             Tree plan = PlanGeneration.makePlan(ne);
             if (Config.bsp_mode) {
                 BSPTranslator.reset();
diff --git a/core/src/main/java/org/apache/mrql/PlanGeneration.gen b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
index 72200fc..85e4528 100644
--- a/core/src/main/java/org/apache/mrql/PlanGeneration.gen
+++ b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
@@ -384,6 +384,18 @@
            for ( Tree a: args )
                el = el.append(makePlan(a));
            return #<ParsedSource(`parser,`(makePlan(file)),...el)>;
+       case call(stream,binary,`file,`tp):
+               return #<BinaryStream(`file,`tp)>;
+       case call(stream,gen,`f,`len,`ulen):
+           return #<SequenceStream(`(makePlan(f)),`(makePlan(len)),
+                                   `(makePlan(ulen)))>;
+       case call(stream,`parser,`file,...args):
+           Trees el = #[];
+           for ( Tree a: args )
+               el = el.append(makePlan(a));
+           return #<ParsedStream(`parser,`(makePlan(file)),...el)>;
+       case stream(lambda(`v,`b),`u):
+           return #<Stream(lambda(`v,`(makePlan(b))),`(makePlan(u)))>;
        case type(`x): return e;
        case gen(`min,`max,`size):
            return #<Generator(`(makePlan(min)),`(makePlan(max)),`(makePlan(size)))>;
diff --git a/core/src/main/java/org/apache/mrql/Streaming.gen b/core/src/main/java/org/apache/mrql/Streaming.gen
new file mode 100644
index 0000000..973e323
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Streaming.gen
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+
+
+/** Generate a physical plan from an algebraic expression */
+final public class Streaming extends AlgebraicOptimization {
+
+    static boolean is_streaming ( Tree e ) {
+        match e {
+        case call(stream,...): return true;
+        case `f(...al):
+            for ( Tree a: al )
+                if (is_streaming(a))
+                    return true;
+        };
+        return false;
+    }
+
+    static Trees stream_bindings ( Tree e ) {
+        match e {
+        case call(stream,...):
+            return #[bind(`(new_var()),`e)];
+        case `f(...al):
+            Trees rs = #[];
+            for ( Tree a: al )
+                rs = rs.append(stream_bindings(a));
+            return rs;
+        };
+        return #[];
+    }
+
+    static Tree streamify ( Tree e ) {
+        Trees sbs = stream_bindings(e);
+        Tree ne = e;
+        for ( Tree sb: sbs )
+            match sb {
+            case bind(`v,`u):
+                ne = #<stream(lambda(`v,`(subst(u,v,ne))),`u)>;
+                TypeInference.type_inference(ne);
+            };
+        return ne;
+    }
+}
diff --git a/core/src/main/java/org/apache/mrql/TopLevel.gen b/core/src/main/java/org/apache/mrql/TopLevel.gen
index a190648..645cc86 100644
--- a/core/src/main/java/org/apache/mrql/TopLevel.gen
+++ b/core/src/main/java/org/apache/mrql/TopLevel.gen
@@ -207,6 +207,18 @@
             }
     }
 
+    /** true, if e is a strem-processing expression */
+    public final static boolean stream_expression ( Tree e ) {
+        match e {
+        case call(stream,...): return true;
+        case `f(...as):
+            for ( Tree a: as )
+                if (stream_expression(a))
+                    return true;
+        };
+        return false;
+    }
+
     /** define a new named type (typedef) */
     private final static void typedef ( String name, Tree type ) {
         type_names.insert(name,normalize_type(type));
@@ -267,6 +279,46 @@
             long t = System.currentTimeMillis();
             if (distributed_assign(v.toString(),e) != null && !Config.quiet_execution)
                 System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs");
+        case dump(`s,`e):  // dump stream output
+            if (!stream_expression(e))
+                fail;
+            reset();
+            Tree plan = translate_expression(e);
+            if (plan == null)
+                return;
+            Evaluator.evaluator.initialize_query();
+            final Tree qt = query_type;
+            final String file = s.stringValue();
+            Evaluator.evaluator.streaming(plan,null,new DataSetFunction(){
+                    long i = 0;
+                    public void eval ( final DataSet ds ) {
+                        try {
+                            Evaluator.evaluator.dump(file+"/f"+(i++),qt,new MR_dataset(ds));
+                        } catch (Exception ex) {
+                            throw new Error("Cannot dump the streaming result to the directory "+file);
+                        };
+                    }
+                });
+        case dump_text(`s,`e):  // dump stream output
+            if (!stream_expression(e))
+                fail;
+            reset();
+            Tree plan = translate_expression(e);
+            if (plan == null)
+                return;
+            Evaluator.evaluator.initialize_query();
+            final Tree qt = query_type;
+            final String file = s.stringValue();
+            Evaluator.evaluator.streaming(plan,null,new DataSetFunction(){
+                    long i = 0;
+                    public void eval ( final DataSet ds ) {
+                        try {
+                            Evaluator.evaluator.dump_text(file+"/f"+(i++),qt,new MR_dataset(ds));
+                        } catch (Exception ex) {
+                            throw new Error("Cannot dump the streaming result to the directory "+file);
+                        }
+                    }
+                });
         case dump(`s,`e):
             long t = System.currentTimeMillis();
             dump(s.stringValue(),e);
diff --git a/core/src/main/java/org/apache/mrql/TypeInference.gen b/core/src/main/java/org/apache/mrql/TypeInference.gen
index 11dea76..4f78c06 100644
--- a/core/src/main/java/org/apache/mrql/TypeInference.gen
+++ b/core/src/main/java/org/apache/mrql/TypeInference.gen
@@ -615,6 +615,19 @@
             } catch (Exception x) {
                 type_error(e,"Unrecognized parser type: "+parser);
             }
+        case call(stream,binary,`f):
+            if (Config.stream_window == 0)
+                type_error(e,"Not in stream processing mode");
+            Tree tp = type_inference(#<call(source,binary,`f)>);
+            ((Node)e).children = ((Node)e).children.append(tp);       // destructive
+            return tp;
+        case call(stream,...r):
+            if (Config.stream_window == 0)
+                type_error(e,"Not in stream processing mode");
+            return type_inference(#<call(source,...r)>);
+        case stream(lambda(`v,`b),`u):
+            bind_pattern_type(v,type_inference2(u));
+            return type_inference2(b);
         case typed(null,`tp):
             return tp;
         case typed(`f(...),`tp):
diff --git a/flink/pom.xml b/flink/pom.xml
index 1161c77..bd3b8d7 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -54,6 +54,11 @@
       <version>${flink.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-core</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.mrql</groupId>
       <artifactId>mrql-core</artifactId>
       <version>${project.version}</version>
diff --git a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
index 428fb76..495d1ab 100644
--- a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
+++ b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
@@ -37,12 +37,14 @@
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.api.java.functions.*;
 import org.apache.flink.api.java.operators.*;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 
 /** Evaluates physical plans in Apache Flink mode */
-final public class FlinkEvaluator extends Evaluator implements Serializable {
+public class FlinkEvaluator extends Evaluator implements Serializable {
     final static URL flink_jar = FlinkEvaluator.class.getProtectionDomain().getCodeSource().getLocation();
     public static ExecutionEnvironment flink_env;
+    public static StreamExecutionEnvironment stream_env;
     // an HDFS tmp file used to hold the data source directory information in distributed mode
     static String data_source_dir_name;
     static String master_host = "localhost";
@@ -91,7 +93,12 @@
                                         Config.nodes,flink_jar.toURI().getPath(),Plan.conf.get("mrql.jar.path"));
                 else flink_env = ExecutionEnvironment.createRemoteEnvironment(master_host,master_port,
                                         Config.nodes,flink_jar.toURI().getPath());
-            } else flink_env.setDefaultLocalParallelism(Config.nodes);
+            } else {
+                flink_env.setDefaultLocalParallelism(Config.nodes);
+                if (Config.stream_window > 0)
+                    stream_env = StreamExecutionEnvironment.createLocalEnvironment(Config.nodes)
+                                          .setBufferTimeout(Config.stream_window);
+            }
         } catch (Exception ex) {
             throw new Error("Cannot initialize the Flink evaluator: "+ex);
         }
@@ -116,6 +123,11 @@
         return FlinkGeneratorInputFormat.class;
     }
 
+    /** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
+    final public void streaming ( Tree plan, Environment env, DataSetFunction f ) {
+        FlinkStreaming.evaluate(plan,env,f);
+    }
+
     /** returns the absolute path relative to the directory that contains the MRQL executable */
     public static String absolute_path ( String path) {
         try {
diff --git a/flink/src/main/java/org/apache/mrql/FlinkStreaming.gen b/flink/src/main/java/org/apache/mrql/FlinkStreaming.gen
new file mode 100644
index 0000000..f4303e5
--- /dev/null
+++ b/flink/src/main/java/org/apache/mrql/FlinkStreaming.gen
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.io.IOException;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.util.Collector;
+import org.apache.flink.streaming.api.datastream.*;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+
+
+/** Evaluates physical plans in Apache Flink stream mode */
+public class FlinkStreaming extends FlinkEvaluator {
+    private static ArrayList<DataStream<FData>> streams;
+    private static ArrayList<String> stream_names;
+
+    public static final class MRDataFileSourceFunction implements SourceFunction<FData> {
+	private static final long serialVersionUID = 1L;
+        final private FData container = new FData();
+        private HashMap<String,Long> file_modification_times;
+
+	final private String directory;
+        final private long time_window;
+        final private FileInputFormat<FData> input_format;
+
+	public MRDataFileSourceFunction ( String directory, long time_window,
+                                          FileInputFormat<FData> input_format ) {
+            this.directory = directory;
+            this.time_window = time_window;
+            this.input_format = input_format;
+	}
+
+        /** return the files within the directory that have been created within the last time window */
+        private ArrayList<String> new_files () {
+            try {
+                long ct = System.currentTimeMillis();
+                Path dpath = new Path(directory);
+                final FileSystem fs = dpath.getFileSystem(Plan.conf);
+                final FileStatus[] ds
+                    = fs.listStatus(dpath,
+                                    new PathFilter () {
+                                        public boolean accept ( Path path ) {
+                                            return !path.getName().startsWith("_")
+                                                && !path.getName().endsWith(".type");
+                                        }
+                                    });
+                ArrayList<String> s = new ArrayList<String>();
+                for ( FileStatus d: ds ) {
+                    String name = d.getPath().toString();
+                    if (file_modification_times.get(name) == null
+                           || d.getModificationTime() >  file_modification_times.get(name)) {
+                        file_modification_times.put(name,new Long(ct));
+                        s.add(name);
+                    }
+                };
+                return s;
+            } catch (Exception ex) {
+                throw new Error("Cannot open a new file from the directory "+directory+": "+ex);
+            }
+        }
+
+	@Override
+	public void invoke ( Collector<FData> collector ) throws IOException {
+            long tick = System.currentTimeMillis();
+            file_modification_times = new HashMap<String,Long>();
+            while (true) {
+                for ( String path: new_files() ) {
+                    input_format.setFilePath(path);
+                    FileInputSplit[] splits = input_format.createInputSplits(1);
+                    for ( FileInputSplit split: splits ) {
+                        input_format.open(split);
+                        while (!input_format.reachedEnd()) {
+                            FData fd = input_format.nextRecord(container);
+                            if (fd != null)
+                                collector.collect(fd);
+                        }
+                    };
+                    input_format.close();
+                };
+                try {
+                    long ct = System.currentTimeMillis();
+                    tick += time_window;
+                    if (tick > ct)
+                        Thread.sleep(tick-ct);
+                    else tick = ct;
+                } catch (Exception ex) {
+                    return;
+                }
+            }
+	}
+    }
+
+    private static DataStream<FData> stream_source ( Tree source, Environment env ) {
+        match source {
+        case BinaryStream(`file,_):
+            String path = absolute_path(((MR_string)evalE(file,null)).get());
+            new BinaryDataSource(path,Plan.conf);
+            return stream_env.addSource(new MRDataFileSourceFunction(path,Config.stream_window,
+                                                                     new FlinkBinaryInputFormat.FDataInputFormat()));
+        case ParsedStream(`parser,`file,...args):
+            String path = absolute_path(((MR_string)evalE(file,null)).get());
+            Class<? extends Parser> parser_class = DataSource.parserDirectory.get(parser.toString());
+            if (parser_class == null)
+                throw new Error("Unknown parser: "+parser);
+            new FlinkParsedDataSource(path,parser_class,args);
+            return stream_env.addSource(new MRDataFileSourceFunction(path,Config.stream_window,
+                                                                     new FlinkParsedInputFormat.ParsedInputFormat(path)));
+        };
+        throw new Error("Unknown stream source: "+print_query(source));
+    }
+
+    private static void stream_processing ( final Tree plan, final Environment env,
+                                            final DataSetFunction f, final DataStream<FData> stream ) {
+        final Evaluator ef = Evaluator.evaluator;
+        match plan {
+        case Stream(lambda(`v,`b),`source):
+            DataStream<FData> streamSource = stream_source(source,env);
+            streams.add(streamSource);
+            stream_names.add(v.toString());
+            stream_processing(b,env,f,streamSource);
+        case _:
+            /** Incomplete: need a function from List<DataSet> to DataSet (or void) on each Flink window
+            stream.window().reduceGroup(new GroupReduceFunction<FData,FData>() {
+                    public void reduce ( Iterable<FData> values, Collector<FData> out ) throws Exception {
+                        f.eval(ef.eval(plan,env,"-"));
+                    }
+                });
+            */
+        }
+    }
+
+    /** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
+    final public static void evaluate ( Tree plan, final Environment env, final DataSetFunction f ) {
+        try {
+            if (true)  // needs more work
+                throw new Error("MRQL Streaming is not supported in this evaluation mode yet");
+            streams = new ArrayList<DataStream<FData>>();
+            stream_names = new ArrayList<String>();
+            stream_processing(plan,env,f,null);
+            stream_env.execute();
+        } catch (Exception ex) {
+            throw new Error("Error during Flink stream processing: "+ex);
+        }
+    }
+}
diff --git a/pom.xml b/pom.xml
index 7414bf4..f2bb3e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,9 +58,9 @@
     <hadoop.version>1.2.1</hadoop.version>
     <yarn.version>2.2.0</yarn.version>
     <hama.version>0.6.4</hama.version>
-    <spark.version>1.1.0</spark.version>
+    <spark.version>1.2.0</spark.version>
     <scala.version>2.10</scala.version>
-    <flink.version>0.7.0-incubating</flink.version>
+    <flink.version>0.8.0</flink.version>
     <skipTests>true</skipTests>
   </properties>
 
diff --git a/queries/streaming.mrql b/queries/streaming.mrql
new file mode 100644
index 0000000..a453ab6
--- /dev/null
+++ b/queries/streaming.mrql
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+select (k,avg(p.Y))
+from p in stream(binary,"tmp/points.bin")
+group by k: p.X;
diff --git a/spark/pom.xml b/spark/pom.xml
index 24bec74..a12af11 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -49,6 +49,11 @@
       <version>${spark.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.mrql</groupId>
       <artifactId>mrql-core</artifactId>
       <version>${project.version}</version>
diff --git a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
index df67385..0e824b6 100644
--- a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
+++ b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
@@ -21,7 +21,6 @@
 import java_cup.runtime.Scanner;
 import java.util.List;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.io.*;
@@ -42,16 +41,20 @@
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.Function3;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.Duration;
 
 
 /** Evaluates physical plans in Apache Spark mode */
-final public class SparkEvaluator extends Evaluator implements Serializable {
+public class SparkEvaluator extends Evaluator implements Serializable {
     public static JavaSparkContext spark_context;
+    public static JavaStreamingContext stream_context;
     final static Bag empty_bag = new Bag();
     // an HDFS tmp file used to hold the data source directory information in distributed mode
     final static String data_source_dir_name = "tmp/"+System.getenv("USER")+"_data_source_dir.txt";
@@ -76,6 +79,8 @@
             Plan.conf = spark_context.hadoopConfiguration();
             FileSystem.setDefaultUri(Plan.conf,System.getenv("FS_DEFAULT_NAME"));
         };
+        if (Config.stream_window > 0 && (Config.local_mode || Config.hadoop_mode))
+            stream_context = new JavaStreamingContext(spark_context,new Duration(Config.stream_window));
         if (!Config.info) {
             for ( Enumeration en = LogManager.getCurrentLoggers(); en.hasMoreElements(); )
                 ((Logger)en.nextElement()).setLevel(Level.WARN);
@@ -118,6 +123,11 @@
         return SparkGeneratorInputFormat.class;
     }
 
+    /** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
+    final public void streaming ( Tree plan, Environment env, DataSetFunction f ) {
+        SparkStreaming.evaluate(plan,env,f);
+    }
+
     /** used by the master to send parsing details (eg, record types) to workers */
     public static void dump_source_dir () throws IOException {
         if (Config.local_mode)
@@ -446,7 +456,7 @@
             .flatMap(cmap_fnc(fnc,env));
     }
 
-    private static JavaRDD<MRData> containerData ( JavaPairRDD<MRContainer,MRContainer> rd ) {
+    static JavaRDD<MRData> containerData ( JavaPairRDD<MRContainer,MRContainer> rd ) {
         final Environment master_env = global_env;
         return rd.map(new Function<Tuple2<MRContainer,MRContainer>,MRData>() {
                 public MRData call ( Tuple2<MRContainer,MRContainer> value ) {
diff --git a/spark/src/main/java/org/apache/mrql/SparkFileInputStream.java b/spark/src/main/java/org/apache/mrql/SparkFileInputStream.java
new file mode 100644
index 0000000..f1e1535
--- /dev/null
+++ b/spark/src/main/java/org/apache/mrql/SparkFileInputStream.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.util.HashMap;
+import java.util.ArrayList;
+import scala.Option;
+import scala.Some;
+import scala.None;
+import scala.collection.immutable.List;
+import scala.collection.immutable.Nil$;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.streaming.*;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.streaming.dstream.*;
+
+
+/** A Spark InputDStream for MRQL InputFormats */
+public class SparkFileInputStream extends JavaInputDStream<MRData> {
+    private final static scala.reflect.ClassTag<MRData> classtag = scala.reflect.ClassTag$.MODULE$.apply(MRData.class);
+
+    public static final class MRDataInputStream extends InputDStream<MRData> {
+        private final String directory;
+        private final boolean is_binary;
+        private final JavaStreamingContext stream_context;
+        private final HashMap<String,Long> file_modification_times;
+
+        MRDataInputStream ( JavaStreamingContext stream_context, String directory, boolean is_binary ) {
+            super(stream_context.ssc(),classtag);
+            this.directory = directory;
+            this.is_binary = is_binary;
+            this.stream_context = stream_context;
+            file_modification_times = new HashMap<String,Long>();
+        }
+
+        @Override
+        public void start () {}
+
+        @Override
+        public void stop () {}
+
+        @Override
+        public Duration slideDuration () {
+            return new Duration(Config.stream_window);
+        }
+
+        @Override
+        public List dependencies () {
+            return Nil$.MODULE$;
+        }
+
+        /** return the files within the directory that have been created within the last time window */
+        private ArrayList<String> new_files () {
+            try {
+                long ct = System.currentTimeMillis();
+                Path dpath = new Path(directory);
+                final FileSystem fs = dpath.getFileSystem(Plan.conf);
+                final FileStatus[] ds
+                    = fs.listStatus(dpath,
+                                    new PathFilter () {
+                                        public boolean accept ( Path path ) {
+                                            return !path.getName().startsWith("_")
+                                                && !path.getName().endsWith(".type");
+                                        }
+                                    });
+                ArrayList<String> s = new ArrayList<String>();
+                for ( FileStatus d: ds ) {
+                    String name = d.getPath().toString();
+                    if (file_modification_times.get(name) == null
+                           || d.getModificationTime() >  file_modification_times.get(name)) {
+                        file_modification_times.put(name,new Long(ct));
+                        s.add(name);
+                    }
+                };
+                return s;
+            } catch (Exception ex) {
+                throw new Error("Cannot open a new file from the directory "+directory+": "+ex);
+            }
+        }
+
+        private JavaRDD<MRData> hadoopFile ( String file ) {
+            return SparkEvaluator.containerData(
+                          (is_binary)
+                          ? SparkEvaluator.spark_context.sequenceFile(file,
+                                                                      MRContainer.class,MRContainer.class,
+                                                                      Config.nodes)
+                          : SparkEvaluator.spark_context.hadoopFile(file,SparkParsedInputFormat.class,
+                                                                    MRContainer.class,MRContainer.class,
+                                                                    Config.nodes));
+        }
+
+        @Override
+        public Option<RDD<MRData>> compute ( Time validTime ) {
+            JavaRDD<MRData> rdd = null;
+            for ( String file: new_files() )
+                if (rdd == null)
+                    rdd = hadoopFile(file);
+                else rdd = rdd.union(hadoopFile(file));
+            if (rdd == null)
+                return scala.None$.MODULE$.apply(null);
+            return new Some<RDD<MRData>>(rdd.rdd());
+        }
+    }
+
+    SparkFileInputStream ( JavaStreamingContext stream_context, String directory, boolean is_binary ) {
+        super(new MRDataInputStream(stream_context,directory,is_binary),classtag);
+    }
+}
diff --git a/spark/src/main/java/org/apache/mrql/SparkStreaming.gen b/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
new file mode 100644
index 0000000..f514a48
--- /dev/null
+++ b/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.io.*;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.streaming.*;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.streaming.dstream.*;
+import scala.collection.Seq;
+import scala.collection.JavaConversions;
+import scala.runtime.AbstractFunction2;
+
+
+/** Evaluates physical plans in Apache Spark stream mode */
+public class SparkStreaming extends SparkEvaluator {
+    private final static scala.reflect.ClassTag<MRData> classtag = scala.reflect.ClassTag$.MODULE$.apply(MRData.class);
+    private static ArrayList<JavaInputDStream<MRData>> streams;
+    private static ArrayList<String> stream_names;
+
+    private static JavaInputDStream<MRData> stream_source ( Tree source, Environment env ) {
+        match source {
+        case BinaryStream(`file,_):
+            String path = ((MR_string)evalE(file,env)).get();
+            new BinaryDataSource(path,Plan.conf);
+            return new SparkFileInputStream(stream_context,path,true);
+        case ParsedStream(`parser,`file,...args):
+            String path = ((MR_string)evalE(file,env)).get();
+            Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
+            if (p == null)
+                throw new Error("Unknown parser: "+parser);
+            new ParsedDataSource(path,p,args,Plan.conf);
+	    try {
+		dump_source_dir();
+	    } catch (IOException ex) {
+		throw new Error("Cannot dump source directory");
+	    };
+            return new SparkFileInputStream(stream_context,path,false);
+        };
+        throw new Error("Unknown stream source: "+print_query(source));
+    }
+
+    private static void stream_processing ( final Tree plan, final Environment env, final DataSetFunction f ) {
+        final Evaluator ef = Evaluator.evaluator;
+        match plan {
+        case Stream(lambda(`v,`b),`source):
+            streams.add(stream_source(source,env));
+            stream_names.add(v.toString());
+            stream_processing(b,env,f);
+        case _:
+            ArrayList<DStream<?>> rdds = new ArrayList<DStream<?>>();
+            for ( JavaDStream<MRData> jd: streams )
+                rdds.add(jd.dstream());
+            final ArrayList<String> vars = stream_names;
+            final AbstractFunction2<Seq<RDD<?>>,Time,RDD<MRData>> fnc =
+                new AbstractFunction2<Seq<RDD<?>>,Time,RDD<MRData>>() {
+                public RDD<MRData> apply ( Seq<RDD<?>> rdds, Time tm ) {
+                    Environment new_env = env;
+                    int i = 0;
+                    for ( RDD<?> rdd: JavaConversions.seqAsJavaList(rdds) ) {
+                        try {
+                            if (rdd.count() == 0)
+                                rdd = SparkEvaluator.spark_context.emptyRDD().rdd();
+                        } catch (Exception ex) {
+                            return SparkEvaluator.spark_context.sc().emptyRDD(classtag);
+                        };
+                        new_env = new Environment(vars.get(i++),new MR_rdd(new JavaRDD(rdd,classtag)),new_env);
+                    };
+                    f.eval(ef.eval(plan,new_env,"-"));
+                    return SparkEvaluator.spark_context.sc().emptyRDD(classtag);
+                }
+            };
+            new TransformedDStream<MRData>(JavaConversions.asScalaBuffer(rdds).toSeq(),fnc,classtag).register();
+        }
+    }
+
+    /** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
+    final public static void evaluate ( Tree plan, Environment env, DataSetFunction f ) {
+        streams = new ArrayList<JavaInputDStream<MRData>>();
+        stream_names = new ArrayList<String>();
+        stream_processing(plan,env,f);
+        stream_context.start();
+        stream_context.awaitTermination();
+    }
+}