[MRQL-90] Add support for Storm streaming
diff --git a/bin/mrql.storm b/bin/mrql.storm
new file mode 100755
index 0000000..ce2b526
--- /dev/null
+++ b/bin/mrql.storm
@@ -0,0 +1,77 @@
+#!/bin/bash
+#--------------------------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#--------------------------------------------------------------------------------
+#
+# run Apache MRQL in Storm mode using Apache Storm
+#
+#--------------------------------------------------------------------------------
+
+MRQL_HOME="$(cd `dirname $0`/..; pwd -P)"
+
+. "$MRQL_HOME/conf/mrql-env.sh"
+
+GEN_JAR=`ls "$MRQL_HOME"/lib/mrql-gen-*.jar`
+CORE_JAR=`ls "$MRQL_HOME"/lib/mrql-core-*.jar`
+MRQL_JAR=`ls "$MRQL_HOME"/lib/mrql-storm-*.jar`
+FULL_JAR="/tmp/${USER}_mrql_storm.jar"
+CLASS_DIR="/tmp/${USER}_mrql_classes"
+
+if [[ -z ${STORM_JARS} ]]; then
+   echo "*** Cannot find the Storm jar file. Need to edit mrql-env.sh"; exit -1
+fi
+HADOOP_JARS=${HADOOP_HOME}/share/hadoop/mapreduce/hadoop-mapreduce-client-core-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/hadoop-common-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/lib/*
+LAMDA_JARS="/tmp/mrql_jar_${USER}/*"
+
+export JAVA_HOME FS_DEFAULT_NAME BSP_MASTER_ADDRESS STORM_ZOOKEEPER_QUORUM BSP_SPLIT_INPUT
+
+if [[ ($MRQL_JAR -nt $FULL_JAR) ]]; then
+   rm -rf $CLASS_DIR
+   mkdir -p $CLASS_DIR
+   pushd $CLASS_DIR > /dev/null
+   $JAVA_HOME/bin/jar xf $CUP_JAR
+   $JAVA_HOME/bin/jar xf $JLINE_JAR
+   $JAVA_HOME/bin/jar xf $GEN_JAR
+   $JAVA_HOME/bin/jar xf $CORE_JAR
+   $JAVA_HOME/bin/jar xf $MRQL_JAR
+   #$JAVA_HOME/bin/jar xf $HADOOP_JARS
+   #$JAVA_HOME/bin/jar xf $LAMDA_JARS
+   cd ..
+   $JAVA_HOME/bin/jar cf $FULL_JAR -C $CLASS_DIR .
+   popd > /dev/null
+fi
+
+if [ "$1" == "-local" ]; then
+   export STORM_CLASSPATH=$FULL_JAR  # FOR LOCAL CLUSTER IN STORM
+   export storm.jar=$FULL_JAR
+   $STORM_HOME/bin/storm jar --config $STORM_CONFIG jar $FULL_JAR org.apache.mrql.Main -storm $*
+else if [ "$1" == "-dist" ]; then
+   #export STORM_CLASSPATH=$FULL_JAR  # FOR LOCAL CLUSTER IN STORM
+   #export storm.jar=$FULL_JAR
+   #echo $storm.jar
+   $STORM_HOME/bin/storm jar --config $STORM_CONFIG jar $FULL_JAR org.apache.mrql.Main -storm $*
+else
+   #storm.jar=$FULL_JAR
+   #echo $storm.jar
+   #$STORM_HOME/bin/storm jar $FULL_JAR org.apache.mrql.Main -storm $*
+   #$JAVA_HOME/bin/java -classpath "$FULL_JAR:$STORM_JARS:$HADOOP_JARS" -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=1044 org.apache.mrql.Main -storm $*
+   $JAVA_HOME/bin/java -classpath "$FULL_JAR:$STORM_JARS:$HADOOP_JARS:$LAMDA_JARS" org.apache.mrql.Main -storm $*
+fi
+fi
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index aefd589..3da0168 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -64,7 +64,6 @@
 # The HDFS namenode URI (eg, hdfs://localhost:9000/). If empty, it is the one defined in core-site.xml
 FS_DEFAULT_NAME=
 
-
 # Optional: Hama configuration. Supports versions 0.6.2, 0.6.3, 0.6.4, and 0.7.0, and 0.7.1
 HAMA_VERSION=0.7.1
 # The Hama installation directory
@@ -83,7 +82,7 @@
 # Spark versions 0.8.1, 0.9.0, and 0.9.1 are supported by MRQL 0.9.0 only.
 # You may use the Spark prebuilts bin-hadoop1 or bin-hadoop2 (Yarn)
 # Tested in local, standalone deploy, and Yarn modes
-SPARK_HOME=${HOME}/spark-1.6.2-bin-hadoop2.6
+SPARK_HOME=${HOME}/spark
 # URI of the Spark master node:
 #   to run Spark on Standalone Mode, set it to spark://`hostname`:7077
 #   to run Spark on a YARN cluster, set it to "yarn-client"
@@ -100,15 +99,23 @@
 
 
 # Optional: Flink configuration. Supports version 1.0.2 and 1.0.3
-FLINK_VERSION=1.0.2
+FLINK_VERSION=1.1.2
 # Flink installation directory
-FLINK_HOME=${HOME}/flink-${FLINK_VERSION}
+FLINK_HOME=${HOME}/ap/flink-${FLINK_VERSION}
 # number of slots per TaskManager (typically, the number of cores per node)
 FLINK_SLOTS=4
 # memory per TaskManager
 FLINK_TASK_MANAGER_MEMORY=2048
 
 
+
+# STORM CONFIGURATIONS
+STORM_VERSION=1.0.2
+
+#Strom installation directory
+STORM_HOME=${HOME}/apache-storm-${STORM_VERSION}
+
+
 # Claspaths
 
 HAMA_JAR=${HAMA_HOME}/hama-core-${HAMA_VERSION}.jar
@@ -117,6 +124,14 @@
     FLINK_JARS=${FLINK_JARS}:$I
 done
 
+STORM_JARS=.
+for I in ${STORM_HOME}/lib/*.jar; do
+    
+    if [[ $I != *"log4j-over-slf4j-"* ]]
+    then
+      STORM_JARS=${STORM_JARS}:$I
+    fi
+done
 
 # YARN-enabled assembly jar
 if [[ -d ${SPARK_HOME}/assembly/target ]]; then
diff --git a/core/src/main/java/org/apache/mrql/Config.java b/core/src/main/java/org/apache/mrql/Config.java
index 19069f3..ac9b2d8 100644
--- a/core/src/main/java/org/apache/mrql/Config.java
+++ b/core/src/main/java/org/apache/mrql/Config.java
@@ -41,6 +41,8 @@
     public static boolean spark_mode = false;
     // true, for Flink mode
     public static boolean flink_mode = false;
+    // true, for Storm mode
+    public static boolean storm_mode = false;
     // if true, it process the input interactively
     public static boolean interactive = true;
     // compile the MR functional arguments to Java bytecode at run-time
@@ -193,7 +195,11 @@
             } else if (args[i].equals("-flink")) {
                 flink_mode = true;
                 i++;
-            } else if (args[i].equals("-bsp_tasks")) {
+            }else if (args[i].equals("-storm")) {
+                storm_mode = true;
+                i++;
+            }  
+            else if (args[i].equals("-bsp_tasks")) {
                 if (++i >= args.length && Integer.parseInt(args[i]) < 1)
                     throw new Error("Expected max number of bsp tasks > 1");
                 nodes = Integer.parseInt(args[i]);
diff --git a/core/src/main/java/org/apache/mrql/Main.java b/core/src/main/java/org/apache/mrql/Main.java
index cafc62f..db8a66f 100644
--- a/core/src/main/java/org/apache/mrql/Main.java
+++ b/core/src/main/java/org/apache/mrql/Main.java
@@ -63,11 +63,14 @@
             Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.SparkEvaluator").newInstance();
         else if (Config.flink_mode)
             Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.FlinkEvaluator").newInstance();
+        else if (Config.storm_mode)
+            Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.StormEvaluator").newInstance();
         else // when Config.map_reduce_mode but also the default
             Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.MapReduceEvaluator").newInstance();
     }
-
+    
     public static void initialize () throws Exception {
+        
         if (Evaluator.evaluator == null) {
             if (Plan.conf == null)
                 Plan.conf = new Configuration();
@@ -86,8 +89,9 @@
             Config.bsp_mode |= arg.equals("-bsp");
             Config.spark_mode |= arg.equals("-spark");
             Config.flink_mode |= arg.equals("-flink");
+            Config.storm_mode |= arg.equals("-storm");
         };
-        Config.map_reduce_mode = !Config.bsp_mode && !Config.spark_mode && !Config.flink_mode;
+        Config.map_reduce_mode = !Config.bsp_mode && !Config.spark_mode && !Config.flink_mode  && !Config.storm_mode;
         initialize_evaluator();
 	if (Config.hadoop_mode) {
 	    conf = Evaluator.evaluator.new_configuration();
@@ -117,6 +121,8 @@
                 System.out.println("Spark mode using "+Config.nodes+" tasks)");
             else if (Config.flink_mode)
                 System.out.println("Flink mode using "+Config.nodes+" tasks)");
+            else if (Config.storm_mode)
+                System.out.println("Storm mode using "+Config.nodes+" tasks)");
             else if (Config.bsp_mode)
                 System.out.println("Hama BSP mode over "+Config.nodes+" BSP tasks)");
             else if (Config.nodes > 0)
diff --git a/core/src/main/java/org/apache/mrql/Plan.java b/core/src/main/java/org/apache/mrql/Plan.java
index 94ee9b9..3547fe7 100644
--- a/core/src/main/java/org/apache/mrql/Plan.java
+++ b/core/src/main/java/org/apache/mrql/Plan.java
@@ -93,6 +93,8 @@
                 conf.set("mrql.jar.path",Compiler.jar_path);
             else if (Config.spark_mode)
                 conf.set("mrql.jar.path",local_path.toString());
+            else if (Config.storm_mode)
+                conf.set("mrql.jar.path",Compiler.jar_path);
             else {
                 // distribute the jar file with the compiled arguments to all clients
                 Path hdfs_path = new Path("mrql-tmp/class"+random_generator.nextInt(1000000)+".jar");
diff --git a/core/src/test/java/org/apache/mrql/QueryTest.java b/core/src/test/java/org/apache/mrql/QueryTest.java
index 0602a7b..b34a047 100644
--- a/core/src/test/java/org/apache/mrql/QueryTest.java
+++ b/core/src/test/java/org/apache/mrql/QueryTest.java
@@ -34,8 +34,8 @@
 public abstract class QueryTest extends TestCase {
 	private static String TEST_QUERY_DIR = "../tests/queries";
 	private static String TEST_RESULT_DIR = "../tests/results";
-	private File queryDir;
-	private File resultDir;
+	protected File queryDir;
+	protected File resultDir;
 	private static Evaluator evaluator;
 	
 	@BeforeClass
@@ -154,7 +154,7 @@
 		assertEquals(0, queryAndCompare(new File(queryDir, "xml_2.mrql"), resultDir));
 	}
 
-    private int queryAndCompare ( File query, File resultDir ) throws Exception {
+    protected int queryAndCompare ( File query, File resultDir ) throws Exception {
         System.err.println("Testing "+query);
         Translator.global_reset();
         String qname = query.getName();
diff --git a/pom.xml b/pom.xml
index 403f57f..9ac5989 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,7 @@
     <spark.version>1.6.2</spark.version>
     <scala.version>2.10</scala.version>
     <flink.version>1.0.3</flink.version>
+    <storm.version>1.0.2</storm.version>
     <skipTests>true</skipTests>
   </properties>
 
@@ -60,6 +61,7 @@
     <module>bsp</module>
     <module>spark</module>
     <module>flink</module>
+    <module>storm</module>
     <module>dist</module>
   </modules>
 
@@ -154,6 +156,7 @@
             <exclude>.classpath/**</exclude>
             <exclude>.project/**</exclude>
             <exclude>tests/data/**</exclude>
+	    <exclude>queries/data/**</exclude>
             <exclude>tests/results/**</exclude>
             <exclude>tests/error_log.txt</exclude>
             <exclude>**/dependency-reduced-pom.xml</exclude>
diff --git a/queries/data/customers.tbl b/queries/data/customers.tbl
new file mode 100644
index 0000000..89e6a3e
--- /dev/null
+++ b/queries/data/customers.tbl
@@ -0,0 +1,7 @@
+1|Ramesh|32|Ahmedabad|2000.00|
+2|Ramesh|25|Delhi|1500.00|
+3|kaushik|23|Kota|2000.00|
+4|kaushik|25|Mumbai|6500.00|
+5|Hardik|27|Bhopal|8500.00|
+6|Komal|22|MP|4500.00|
+7|Muffy|24|Indore|10000.00|
diff --git a/queries/groupby_stream_storm.mrql b/queries/groupby_stream_storm.mrql
new file mode 100644
index 0000000..d96cebf
--- /dev/null
+++ b/queries/groupby_stream_storm.mrql
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+C = stream(line,"queries/data/","|",type(<CUSTKEY:int,NAME:string,AGE:int,ADDRESS:string,SALARY:float>));
+
+SELECT (k,sum(c.SALARY))
+ FROM c in C
+ GROUP BY k: c.NAME;
diff --git a/storm/pom.xml b/storm/pom.xml
new file mode 100644
index 0000000..c33064a
--- /dev/null
+++ b/storm/pom.xml
@@ -0,0 +1,161 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+ -->
+ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>org.apache.mrql</groupId>
+  <artifactId>mrql-storm</artifactId>
+  <packaging>jar</packaging>
+  <name>Apache MRQL Storm mode</name>
+  <description>Apache MRQL evaluation in storm mode on Apache storm</description>
+  <url>http://mrql.incubator.apache.org/</url>
+  <inceptionYear>2016</inceptionYear>
+
+  <parent>
+    <groupId>org.apache.mrql</groupId>
+    <artifactId>mrql-parent</artifactId>
+    <version>0.9.8-incubating-SNAPSHOT</version>
+  </parent>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.mrql</groupId>
+      <artifactId>mrql-gen</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.mrql</groupId>
+      <artifactId>mrql-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <version>${storm.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-hdfs</artifactId>
+      <version>${storm.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.mrql</groupId>
+      <artifactId>mrql-core</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.11</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.8</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals><goal>add-source</goal></goals>
+            <configuration>
+              <sources>
+                <source>${project.build.directory}/generated-sources</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.14.1</version>
+        <configuration>
+          <argLine>-Xmx1024m</argLine>
+          <useSystemClassLoader>true</useSystemClassLoader>
+          <skipTests>true</skipTests>
+<!--  Storm mode not ready for tests yet
+          <skipTests>${skipTests}</skipTests>
+-->
+        </configuration>
+      </plugin>
+      <plugin>
+       <groupId>org.apache.maven.plugins</groupId>
+       <artifactId>maven-shade-plugin</artifactId>
+       <version>2.1</version>
+       <executions>
+         <execution>
+           <phase>package</phase>
+           <goals>
+             <goal>shade</goal>
+           </goals>
+           <configuration>
+             <artifactSet>
+              <includes>
+               <include>org.apache.mrql:*</include>
+             </includes>
+             <excludes>
+               <exclude>org.apache.mrql:mrql-gen</exclude>
+               <exclude>org.apache.mrql:mrql-core</exclude>
+             </excludes>
+           </artifactSet>
+         </configuration>
+       </execution>
+     </executions>
+   </plugin>
+   <plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+    <artifactId>maven-jar-plugin</artifactId>
+    <version>2.4</version>
+    <configuration>
+     <finalName>mrql-storm-${project.version}</finalName>
+     <outputDirectory>${project.parent.basedir}/lib</outputDirectory>
+   </configuration>
+ </plugin>
+ <plugin>
+   <artifactId>maven-antrun-plugin</artifactId>
+   <version>1.7</version>
+   <executions>
+     <execution>
+       <phase>generate-sources</phase>
+       <goals>
+         <goal>run</goal>
+       </goals>
+       <configuration>
+        <target>
+          <mkdir dir="${project.build.directory}/generated-sources/org/apache/mrql" />
+          <property name="compile_classpath" refid="maven.compile.classpath" />
+          <fileset id="mr.gen.path" dir="src/main/java/org/apache/mrql" includes="*.gen" />
+          <pathconvert pathsep=" " property="mr.gen.files" refid="mr.gen.path" />
+          <java classname="org.apache.mrql.gen.Main" classpath="../lib/mrql-gen-${project.version}.jar:${compile_classpath}">
+            <arg line="${mr.gen.files}" />
+            <arg line="-o" />
+            <arg file="${project.build.directory}/generated-sources/org/apache/mrql" />
+          </java>
+        </target>
+      </configuration>
+    </execution>
+  </executions>
+</plugin>
+</plugins>
+</build>
+</project>
diff --git a/storm/src/main/java/org/apache/mrql/HDFSFileInputStream.java b/storm/src/main/java/org/apache/mrql/HDFSFileInputStream.java
new file mode 100644
index 0000000..aa94b14
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/HDFSFileInputStream.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class HDFSFileInputStream extends BaseRichSpout{
+    private final String directory;
+    private final boolean is_binary;
+    private HashMap<String,Long> file_modification_times;
+    private StormMRQLFileInputFormat input_format;
+    final private SData container = new SData();
+    private SpoutOutputCollector _collector;
+    
+    public HDFSFileInputStream(String directory, boolean is_binary,StormMRQLFileInputFormat input_format) {
+        this.directory = directory;
+        this.is_binary = is_binary;
+        this.input_format = input_format;
+    }
+    
+    private ArrayList<String> new_files () {
+        try {
+            long ct = System.currentTimeMillis();
+            Path dpath = new Path(directory);
+            final FileSystem fs = dpath.getFileSystem(Plan.conf);
+            final FileStatus[] ds
+            = fs.listStatus(dpath,
+                new PathFilter () {
+                    public boolean accept ( Path path ) {
+                        return !path.getName().startsWith("_")
+                        && !path.getName().endsWith(".type");
+                    }
+                });
+            ArrayList<String> s = new ArrayList<String>();
+            for ( FileStatus d: ds ) {
+                String name = d.getPath().toString();
+                if (file_modification_times.get(name) == null
+                 || d.getModificationTime() >  file_modification_times.get(name)) {
+                    file_modification_times.put(name,new Long(ct));
+                s.add(name);
+            }
+        };
+        return s;
+    } catch (Exception ex) {
+        throw new Error("Cannot open a new file from the directory "+directory+": "+ex);
+    }
+}
+
+@Override
+public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("inputdata"));
+}
+
+@Override
+public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+    this.file_modification_times =  new HashMap<String,Long>();
+    _collector = collector;
+}
+
+@Override
+public void nextTuple() {
+    try{
+        for ( String path: new_files() ) {
+            Path filePath = new Path(path);
+            Bag value = input_format.materialize(filePath);
+            for(MRData val : value){
+                _collector.emit(new Values(val));
+            }   
+        }
+    }
+    catch(Exception e){
+        e.printStackTrace();
+    }
+}
+}
diff --git a/storm/src/main/java/org/apache/mrql/MR_stream.java b/storm/src/main/java/org/apache/mrql/MR_stream.java
new file mode 100644
index 0000000..ef222ac
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/MR_stream.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import org.apache.storm.trident.Stream;
+
+public class MR_stream extends MRData{
+    Stream stream;
+
+    public MR_stream() {
+    }
+
+    public MR_stream(Stream stream) {
+        this.stream = stream;
+    }
+    
+    public Stream stream(){
+        return this.stream;
+    }
+    
+    @Override
+    public void materializeAll() {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public void write(DataOutput d) throws IOException {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public void readFields(DataInput di) throws IOException {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public int compareTo(MRData o) {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public void writeData(ObjectOutputStream arg0) throws IOException {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    
+}
diff --git a/storm/src/main/java/org/apache/mrql/SData.java b/storm/src/main/java/org/apache/mrql/SData.java
new file mode 100644
index 0000000..7d03d06
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/SData.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.Serializable;
+
+
+public class SData implements Comparable<SData>,Serializable{
+   MRData data;
+
+   public SData(MRData data) {
+    this.data = data;
+}
+
+public SData() {
+}
+
+public MRData data () { return data; }
+
+
+@Override
+public int compareTo ( SData x ) { return data.compareTo(x.data); }
+
+@Override
+public boolean equals ( Object x ) {
+    return x instanceof SData && data.equals(((SData)x).data);
+}
+
+@Override
+public int hashCode () { return data.hashCode(); }
+
+@Override
+public String toString () { return data.toString(); }
+}
diff --git a/storm/src/main/java/org/apache/mrql/StormBinaryInputFormat.java b/storm/src/main/java/org/apache/mrql/StormBinaryInputFormat.java
new file mode 100644
index 0000000..5a24132
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/StormBinaryInputFormat.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+
+public class StormBinaryInputFormat extends StormMRQLFileInputFormat{
+    public static class BinaryInputRecordReader extends SequenceFileRecordReader<MRContainer,MRContainer> {
+        final MRContainer result = new MRContainer();
+
+        public BinaryInputRecordReader ( FileSplit split,
+           JobConf job ) throws IOException {
+            super(job,split);
+        }
+
+        @Override
+        public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
+            boolean b = super.next(key,result);
+            value.set(result.data());
+            return b;
+        }
+    }
+
+    @Override
+    public RecordReader<MRContainer,MRContainer>
+    getRecordReader ( InputSplit split,
+        JobConf job,
+        Reporter reporter ) throws IOException {
+        String path = ((FileSplit)split).getPath().toString();
+        BinaryDataSource ds = (BinaryDataSource)DataSource.get(path,job);
+        return new BinaryInputRecordReader((FileSplit)split,job);
+    }
+    
+}
diff --git a/storm/src/main/java/org/apache/mrql/StormEvaluator.gen b/storm/src/main/java/org/apache/mrql/StormEvaluator.gen
new file mode 100644
index 0000000..edcbb44
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/StormEvaluator.gen
@@ -0,0 +1,312 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.mrql;
+  import java.io.BufferedReader;
+ import java.io.IOException;
+ import java.io.InputStreamReader;
+ import java.io.PrintStream;
+ import java.io.Serializable;
+import java.lang.reflect.Method;
+ import java.net.URL;
+ import java.net.URLClassLoader;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.mrql.gen.Node;
+ import org.apache.mrql.gen.Tree;
+ import org.apache.storm.trident.Stream;
+ import org.apache.storm.trident.TridentTopology;
+ import org.apache.storm.trident.operation.FlatMapFunction;
+ import org.apache.storm.trident.tuple.TridentTuple;
+ import org.apache.storm.tuple.Fields;
+ import org.apache.storm.tuple.Values;
+ import org.apache.storm.trident.fluent.GroupedStream;
+ import org.apache.storm.trident.operation.BaseAggregator;
+ import org.apache.storm.trident.operation.BaseFunction;
+ import org.apache.storm.trident.operation.MapFunction;
+ import org.apache.storm.trident.operation.TridentCollector;
+
+ public class StormEvaluator extends Evaluator implements Serializable {
+
+    public static TridentTopology topology;
+    static Environment global_streams = null;
+    final static String data_source_dir_name = "tmp/"+System.getenv("USER")+"_data_source_dir.txt";
+    private static Function f;
+
+    @Override
+    public void init(Configuration conf) {
+        global_streams = null;
+        if (Config.stream_window > 0 && (Config.local_mode || Config.hadoop_mode)) {
+            if (Config.trace_execution) {
+                System.out.println("Creating a new storm topology");
+            };
+        }
+        topology = new TridentTopology();
+        Plan.conf = new Configuration();
+        if (Config.hadoop_mode && Config.local_mode) {
+            FileSystem.setDefaultUri(Plan.conf,"file:///");
+            } else if (Config.hadoop_mode) {
+                if (!System.getenv("FS_DEFAULT_NAME").equals(""))
+                FileSystem.setDefaultUri(Plan.conf,System.getenv("FS_DEFAULT_NAME"));
+            };
+        }
+
+    @Override
+    public void initialize_query() {
+        Plan.distribute_compiled_arguments(Plan.conf);
+        String jarPath =  Plan.conf.get("mrql.jar.path");
+        try{
+            addURL(new URL("file://"+jarPath));
+        }
+        catch(Exception e){
+            e.printStackTrace();
+        }
+    }
+
+    public void addURL(URL url) throws Exception {
+        URLClassLoader classLoader
+               = (URLClassLoader) ClassLoader.getSystemClassLoader();
+        Class clazz= URLClassLoader.class;
+
+        // Use reflection
+        Method method= clazz.getDeclaredMethod("addURL", new Class[] { URL.class });
+        method.setAccessible(true);
+        method.invoke(classLoader, new Object[] { url });
+    }
+    
+    @Override
+    public void shutdown(Configuration conf) {
+
+    }
+
+    @Override
+    public Configuration new_configuration() {
+        return new Configuration();
+    }
+
+    @Override
+    public Class<? extends MRQLFileInputFormat> parsedInputFormat() {
+        return StormParsedInputFormat.class;
+    }
+
+    @Override
+    public Class<? extends MRQLFileInputFormat> binaryInputFormat() {
+        return StormBinaryInputFormat.class;
+    }
+
+    @Override
+    public Class<? extends MRQLFileInputFormat> generatorInputFormat() {
+        return null;
+    }
+
+    /** used by the master to send parsing details (eg, record types) to workers */
+    public static void dump_source_dir () throws IOException {
+        if (Config.local_mode)
+        return;
+        DataSource.dataSourceDirectory.distribute(Plan.conf);
+        Path path = new Path(data_source_dir_name);
+        FileSystem fs = path.getFileSystem(Plan.conf);
+        PrintStream ps = new PrintStream(fs.create(path,true));
+        ps.println(Plan.conf.get("mrql.data.source.directory"));
+        ps.close();
+    }
+
+    /** executed by a worker when reading parsed input (see SparkParsedInputFormat) */
+    public static void load_source_dir () throws IOException {
+        if (Plan.conf == null) {
+            if (evaluator == null)
+            evaluator = new StormEvaluator();
+            Plan.conf = evaluator.new_configuration();
+            Config.read(Plan.conf);
+        };
+        if (Config.local_mode)
+        return;
+        // the name of the file that contains the source directory details is read from an HDFS file by workers
+        Path path = new Path(data_source_dir_name);
+        FileSystem fs = path.getFileSystem(Plan.conf);
+        BufferedReader ftp = new BufferedReader(new InputStreamReader(fs.open(path)));
+        Plan.conf.set("mrql.data.source.directory",ftp.readLine());
+        DataSource.dataSourceDirectory.read(Plan.conf);
+        ftp.close();
+    }
+
+    private static Bag bag ( final Iterable<MRData> s ) {
+        final Iterator<MRData> i = s.iterator();
+        return new Bag(new BagIterator() {
+            public MRData next () {
+                return i.next();
+            }
+            public boolean hasNext () {
+                return i.hasNext();
+            }
+            });
+    }
+
+    /** Coerce a persistent collection to a Bag */
+    @Override
+    public Bag toBag ( MRData data ) {
+        try {
+            return (Bag)data;
+            } catch (Exception ex) {
+                throw new Error("Cannot coerce "+data+" to a Bag: "+ex);
+            }
+        }
+
+        @Override
+        public MRData aggregate(Tree acc_fnc, Tree zero, Tree plan, Environment env) throws Exception {
+            return null;
+        }
+
+        @Override
+        public Tuple loop(Tree e, Environment env) throws Exception {
+            return null;
+        }
+        
+        @Override
+        public void streaming(Tree plan, Environment env, Environment dataset_env, Function f) {
+            this.f = f;
+            StormStreaming.evaluate(plan, env, dataset_env, f);
+        }
+
+        @Override
+        public DataSet eval(Tree e, Environment env, String counter) {
+            Stream res = eval(e, env,(Environment)null);
+            res.map(new MapFunction() {
+                @Override
+                public Values execute(TridentTuple input) {
+                    MRData dataset = (MRData)input.get(0);
+                    f.eval(dataset);
+                    return new Values(input);
+                }
+                });
+
+            DataSet data = new DataSet(new StreamDataSource(res), -1, -1);
+            return data;
+        }
+
+        final public Stream eval(final Tree e, final Environment env,final Environment stream_env){
+            if (Config.trace_execution) {
+                tab_count += 3;
+                System.out.println(tabs(tab_count) + print_query(e));
+            };
+            final Stream res = evalD(e, env,stream_env);
+            return res;
+        }
+
+        final public Stream evalD ( final Tree e, final Environment env,final Environment stream_env ) {
+            try {
+                match e {
+                    case cMap(`f,`s):
+                    return eval(s,env,stream_env).flatMap(cmap_fnc(f,env));
+                    case MapReduce(`m,`r,`s,`o):
+                    Stream mappedStream = eval(s,env,stream_env).flatMap(cmap_fnc(m,env));
+                    return groupBy(mappedStream,r,env,o);
+                    case MapCombineReduce(`m,`c,`r,`s,`o):
+                    Stream mappedStream = eval(s,env,stream_env).flatMap(cmap_fnc(m,env));
+                    return groupBy(mappedStream,r,env,o);
+                    case `v:
+                    if (!v.is_variable())
+                    fail;
+                    MRData x = variable_lookup(v.toString(),global_streams);
+                    if (x != null && x instanceof MR_stream)
+                    return ((MR_stream)x).stream();
+
+                    x = variable_lookup(v.toString(),env);
+                    throw new Error("Variable "+v+" is not bound");
+                };
+                throw new Error("Unrecognized Storm plan: "+e);
+                } catch (Error msg) {
+                    if (!Config.trace)
+                    throw new Error(msg.getMessage());
+                    System.err.println(msg.getMessage());
+                    throw new Error("Evaluation error in: "+print_query(e));
+                    } catch (Exception ex) {
+                        System.err.println(ex.getMessage());
+                        ex.printStackTrace();
+                        throw new Error("Evaluation error in: "+print_query(e));
+                    }
+                }
+
+                private static Stream reduce_output(Stream s){
+                    return null;
+                }
+
+                private static FlatMapFunction cmap_fnc ( final Tree fnc,Environment env ) {
+                   final Function f = evalF(fnc,null);
+                   return new FlatMapFunction() {
+                       @Override
+                       public Iterable<Values> execute(TridentTuple input) {
+                           List<Values> out = new ArrayList<Values>();
+                           MRData value = (MRData)input.get(0);
+                           for (MRData e: (Bag)f.eval(value) ){
+                            out.add(new Values(e));
+                        }
+                        return out;
+                    }
+                };  
+            }
+            
+            private static Stream groupBy ( Stream s, Tree fnc, Environment env, Tree o ) {
+               final Function reducer = evalF(fnc,null);
+
+               Stream keyValueStream = s.each(s.getOutputFields(),new BaseFunction() {
+                @Override
+                public void execute(TridentTuple input, TridentCollector output) {
+                    Tuple value = (Tuple)input.get(0);
+                    MRData key = value.first();
+                    MRData values = (MRData)value.second();
+                    output.emit(new Values(key,values));
+                }
+                },new Fields("keyfield","valuefield"));
+               GroupedStream groupedStream = keyValueStream.groupBy(new Fields("keyfield"));
+               Stream finalStream = groupedStream.aggregate(new Fields("keyfield","valuefield"), new BaseAggregator<Map<MRData,Bag>>() {
+                @Override
+                public Map<MRData,Bag> init(Object batchId, TridentCollector collector) {
+                    return new HashMap<MRData,Bag>();
+                }
+                @Override
+                public void aggregate(Map<MRData,Bag> val, TridentTuple tuple, TridentCollector collector) {
+                    MRData key = (MRData)tuple.get(0);
+                    Bag values;
+                    if(!val.containsKey(key)){
+                        values = new Bag();
+                    }
+                    else{
+                        values = val.get(key); 
+                    }
+                    MRData value = (MRData)tuple.get(1);
+                    values.add(value);
+                    val.put(key,values);                
+                }
+
+                @Override
+                public void complete(Map<MRData,Bag> val, TridentCollector collector) {
+                    for (Map.Entry<MRData, Bag> entry : val.entrySet()) {
+                        MRData key = entry.getKey();
+                        Bag value = entry.getValue();
+                        Bag reducedValues = (Bag)reducer.eval(new Tuple(key,value));
+                        collector.emit(new Values(reducedValues));
+                    }
+                }
+                }, new Fields("outputdata"));
+               return finalStream.project(new Fields("outputdata"));
+           }
+       }
diff --git a/storm/src/main/java/org/apache/mrql/StormFileInputStream.java b/storm/src/main/java/org/apache/mrql/StormFileInputStream.java
new file mode 100644
index 0000000..dd63d8f
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/StormFileInputStream.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class StormFileInputStream extends BaseRichSpout{
+    private String directory;
+    private boolean is_binary; 
+    SpoutOutputCollector _collector;
+
+    public StormFileInputStream(String directory, boolean is_binary) {
+        this.directory = directory;
+        this.is_binary = is_binary;
+    }
+    
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("data"));
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+     _collector = collector;
+ }
+
+ @Override
+ public void nextTuple() {
+    Bag val =  (Bag)MapReduceAlgebra.read_binary(directory);
+    _collector.emit(new Values(val));
+}    
+}
diff --git a/storm/src/main/java/org/apache/mrql/StormMRQLFileInputFormat.java b/storm/src/main/java/org/apache/mrql/StormMRQLFileInputFormat.java
new file mode 100644
index 0000000..62a1e08
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/StormMRQLFileInputFormat.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+public abstract class StormMRQLFileInputFormat extends FileInputFormat<MRContainer,MRContainer> implements MRQLFileInputFormat,Serializable {
+
+    abstract public RecordReader<MRContainer,MRContainer>
+    getRecordReader ( InputSplit split, JobConf job, Reporter reporter ) throws IOException;
+
+    @Override
+    public Bag materialize(final Path file) throws IOException {
+        final JobConf job = new JobConf(Plan.conf,MRQLFileInputFormat.class);
+        setInputPaths(job,file);
+        final InputSplit[] splits = getSplits(job,1);
+        final Reporter reporter = null;
+        final RecordReader<MRContainer,MRContainer> rd = getRecordReader(splits[0],job,reporter);
+        return new Bag(new BagIterator () {
+            RecordReader<MRContainer,MRContainer> reader = rd;
+            MRContainer key = reader.createKey();
+            MRContainer value = reader.createKey();
+            int i = 0;
+            public boolean hasNext () {
+                try {
+                    if (reader.next(key,value))
+                        return true;
+                    do {
+                        if (++i >= splits.length)
+                            return false;
+                        reader.close();
+                        reader = getRecordReader(splits[i],job,reporter);
+                    } while (!reader.next(key,value));
+                    return true;
+                } catch (IOException e) {
+                    throw new Error("Cannot collect values from an intermediate result");
+                }
+            }
+            public MRData next () {
+                return value.data();
+            }
+        });
+    }
+
+    @Override
+    public Bag collect(DataSet x, boolean strip) throws Exception {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+    
+}
diff --git a/storm/src/main/java/org/apache/mrql/StormParsedInputFormat.java b/storm/src/main/java/org/apache/mrql/StormParsedInputFormat.java
new file mode 100644
index 0000000..bf66867
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/StormParsedInputFormat.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mrql.gen.Trees;
+
+public class StormParsedInputFormat extends StormMRQLFileInputFormat{
+    public static class ParsedRecordReader implements RecordReader<MRContainer,MRContainer> {
+        final FSDataInputStream fsin;
+        final long start;
+        final long end;
+        Iterator<MRData> result;
+        Parser parser;
+
+        public ParsedRecordReader ( FileSplit split,
+            Configuration conf,
+            Class<? extends Parser> parser_class,
+            Trees args ) throws IOException {
+            start = split.getStart();
+            end = start + split.getLength();
+            Path file = split.getPath();
+            FileSystem fs = file.getFileSystem(conf);
+            fsin = fs.open(split.getPath());
+            try {
+                parser = parser_class.newInstance();
+            } catch (Exception ex) {
+                throw new Error("Unrecognized parser:"+parser_class);
+            };
+            parser.initialize(args);
+            parser.open(fsin,start,end);
+            result = null;
+        }
+
+        public MRContainer createKey () {
+            return new MRContainer();
+        }
+
+        public MRContainer createValue () {
+            return new MRContainer();
+        }
+
+        public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
+            while (result == null || !result.hasNext()) {
+                String s = parser.slice();
+                if (s == null)
+                    return false;
+                result = parser.parse(s).iterator();
+            };
+            value.set((MRData)result.next());
+            key.set(new MR_long(fsin.getPos()));
+            return true;
+        }
+
+        public synchronized long getPos () throws IOException { return fsin.getPos(); }
+
+        public synchronized void close () throws IOException { fsin.close(); }
+
+        public float getProgress () throws IOException {
+            if (end == start)
+                return 0.0f;
+            else return Math.min(1.0f, (getPos() - start) / (float)(end - start));
+        }
+    }
+    
+    
+    @Override
+    public RecordReader<MRContainer, MRContainer> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+        StormEvaluator.load_source_dir();  // load the parsed source parameters from a file
+        String path = ((FileSplit)split).getPath().toString();
+        ParsedDataSource ds = (ParsedDataSource)DataSource.get(path,Plan.conf);
+        return new ParsedRecordReader((FileSplit)split,job,ds.parser,(Trees)ds.args);
+    }
+    
+}
diff --git a/storm/src/main/java/org/apache/mrql/StormStreaming.gen b/storm/src/main/java/org/apache/mrql/StormStreaming.gen
new file mode 100644
index 0000000..4f2d8c6
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/StormStreaming.gen
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.mrql;
+
+ import java.util.ArrayList;
+ import java.io.*;
+ import org.apache.mrql.gen.*;
+ import org.apache.storm.LocalCluster;
+ import org.apache.storm.trident.Stream;
+
+ /** Evaluates physical plans in Apache Storm mode */
+ public class StormStreaming extends StormEvaluator implements Serializable {
+    
+    private static ArrayList<Stream> streams;
+    private static ArrayList<String> stream_names;
+    private static int tries = 0;
+    private final static StormEvaluator ef = (StormEvaluator)Evaluator.evaluator;
+    
+
+    private static Stream stream_source ( Tree source, Environment env,String streamName) {
+        match source {
+            case BinaryStream(`file,_):
+            String path = ((MR_string)evalE(file,env)).get();
+            new BinaryDataSource(path,Plan.conf);
+            return topology.newStream(streamName, new HDFSFileInputStream(path, is_dataset, new StormBinaryInputFormat()));
+
+            case ParsedStream(`parser,`file,...args):
+            String path = ((MR_string)evalE(file,env)).get();
+            Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
+            if (p == null)
+            throw new Error("Unknown parser: "+parser);
+            new ParsedDataSource(path,p,args,Plan.conf);
+            try {
+                dump_source_dir();
+                } catch (IOException ex) {
+                  throw new Error("Cannot dump source directory");
+              };
+              return topology.newStream(streamName, new HDFSFileInputStream(path, is_dataset, new StormParsedInputFormat()));
+              
+          };
+          throw new Error("Unknown stream source: "+print_query(source));
+      }
+
+      private static Tree get_streams ( Tree plan, Environment env ) {
+        match plan {
+            case Stream(lambda(`v,`b),`source):
+            streams.add(stream_source(source,env,v.toString()));
+            stream_names.add(v.toString());
+            return get_streams(b,env);
+        };
+        return plan;
+    }
+
+    /** bind the pattern variables to values */
+    private static void bind_list ( Tree pattern, Tree src, Environment env, Environment rdd_env ) {
+        //System.out.println("Inside bind_list ");
+        throw new Error("Operation not implemented.");
+    }
+    
+    private static void stream_processing ( final Tree plan, final Environment env,
+        final Environment dataset_env, final Function f ) {
+       
+        if (streams.size() == 0)
+        throw new Error("No input streams in query");
+        ArrayList<Stream> rdds = new ArrayList<Stream>();
+        for ( Stream jd: streams )
+        rdds.add(jd);
+
+        
+        final ArrayList<String> vars = stream_names;
+
+
+        long t = System.currentTimeMillis();
+        Environment rdd_env = dataset_env;
+        int i = 0;
+        
+        for(String name:stream_names){
+            MRData d = new MR_stream(streams.get(i));
+            global_streams = new Environment(vars.get(i),d,global_streams);
+            rdd_env = new Environment(vars.get(i++),d,rdd_env);
+        }
+
+        match plan {
+            case lambda(`pat,`e):
+            bind_list(pat,e,env,rdd_env);
+            f.eval(null);
+        case _: // non-incremental streaming
+        final MRData result = ef.evalE(plan,env);
+    };
+    
+}
+
+/** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
+final public static void evaluate ( Tree plan, Environment env, Environment dataset_env, Function f ) {
+    streams = new ArrayList<Stream>();
+    stream_names = new ArrayList<String>();
+    match plan {
+        case lambda(`p,`b):
+        b = get_streams(b,env);
+        stream_processing(#<lambda(`p,`b)>,env,null,f);
+        case _:  // non-incremental streaming
+        plan = get_streams(plan,env);
+        stream_processing(plan,env,dataset_env,f);
+    };
+
+    org.apache.storm.Config conf = new org.apache.storm.Config();
+    conf.setFallBackOnJavaSerialization(false);
+    try{
+
+        conf.setMaxSpoutPending(20);
+        //StormSubmitter.submitTopology("mrqlstormevaluator",conf,topology.build());
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology("mrqlstormevaluator", conf, topology.build());
+        Thread.sleep(15000);
+        cluster.shutdown();
+    }
+    catch(Exception e2){
+        e2.printStackTrace();
+    }
+}
+}
diff --git a/storm/src/main/java/org/apache/mrql/StreamDataSource.java b/storm/src/main/java/org/apache/mrql/StreamDataSource.java
new file mode 100644
index 0000000..8f49fc8
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/StreamDataSource.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class StreamDataSource extends DataSource implements Serializable {
+    Stream stream;
+    long numOfRecords;
+    
+    public StreamDataSource(Stream stream) {
+        super();
+        this.stream = stream;
+    }
+
+    @Override
+    public long size(Configuration conf) {
+        return super.size(conf);
+    }
+
+    @Override
+    public List<MRData> take(int num) {
+        final List<MRData> data = new ArrayList<MRData>();
+        stream.each(stream.getOutputFields(),new BaseFunction() {
+            @Override
+            public void execute(TridentTuple tuple, TridentCollector collector) {
+               System.out.println("output: "+tuple);
+               MRData value = (MRData)tuple.get(0);
+               data.add(value);
+           }
+       },new Fields("finaloutput"));
+        return data;
+    }
+
+    @Override
+    public MRData reduce(MRData zero,final Function acc) {
+     return null;
+ }
+}
diff --git a/storm/src/main/java/org/apache/mrql/utils/PrintUtil.java b/storm/src/main/java/org/apache/mrql/utils/PrintUtil.java
new file mode 100644
index 0000000..79b2ee7
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/utils/PrintUtil.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql.utils;
+
+import java.util.Map;
+import org.apache.storm.trident.operation.Filter;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+
+public class PrintUtil implements Filter{
+
+    @Override
+    public boolean isKeep(TridentTuple tt) {
+        System.out.println("output: "+tt);
+        return true;
+    }
+
+    @Override
+    public void prepare(Map map, TridentOperationContext toc) {
+        
+    }
+
+    @Override
+    public void cleanup() {
+        
+    }
+    
+    
+}
diff --git a/storm/src/test/java/org/apache/mrql/StormEvaluatorLocalModeTest.java b/storm/src/test/java/org/apache/mrql/StormEvaluatorLocalModeTest.java
new file mode 100644
index 0000000..ce1ad9f
--- /dev/null
+++ b/storm/src/test/java/org/apache/mrql/StormEvaluatorLocalModeTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+public class StormEvaluatorLocalModeTest extends EvaluatorTest {
+
+	public void tearDown() throws IOException {
+		super.tearDown();
+		Plan.clean();
+	}
+
+	@Override
+	protected Evaluator createEvaluator() throws Exception {
+		Configuration conf = null;
+
+		Config.bsp_mode = false;
+		Config.spark_mode = false;
+		Config.flink_mode = false;
+		Config.storm_mode = true;
+		Config.map_reduce_mode = false;
+
+		Evaluator.evaluator = new StormEvaluator();
+
+		Config.quiet_execution = true;
+
+		String[] args = new String[] { "-local", "-storm" };
+
+		conf = Evaluator.evaluator.new_configuration();
+		GenericOptionsParser gop = new GenericOptionsParser(conf, args);
+		conf = gop.getConfiguration();
+
+		args = gop.getRemainingArgs();
+
+		Config.hadoop_mode = true;
+		Config.testing = true;
+		Config.parse_args(args, conf);
+		
+		Evaluator.evaluator.init(conf);
+		
+		return Evaluator.evaluator;
+	}
+}
diff --git a/storm/src/test/java/org/apache/mrql/StormQueryLocalModeTest.java b/storm/src/test/java/org/apache/mrql/StormQueryLocalModeTest.java
new file mode 100644
index 0000000..e3a71c5
--- /dev/null
+++ b/storm/src/test/java/org/apache/mrql/StormQueryLocalModeTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+public class StormQueryLocalModeTest extends QueryTest {
+
+	public void tearDown() throws IOException {
+		super.tearDown();
+		Plan.clean();
+	}
+
+	@Override
+	protected Evaluator createEvaluator() throws Exception {
+		Configuration conf = new Configuration();
+
+		Config.bsp_mode = false;
+		Config.spark_mode = false;
+		Config.flink_mode = false;
+		Config.storm_mode = true;
+		Config.map_reduce_mode = false;
+
+		Evaluator.evaluator = new StormEvaluator();
+
+		Config.quiet_execution = true;
+
+		String[] args = new String[] { "-dist", "-storm","-stream","5" };
+
+		conf = Evaluator.evaluator.new_configuration();
+		GenericOptionsParser gop = new GenericOptionsParser(conf, args);
+		conf = gop.getConfiguration();
+
+		args = gop.getRemainingArgs();
+		
+		Config.hadoop_mode = true;
+		Config.testing = true;		
+		Config.parse_args(args, conf);
+
+		Evaluator.evaluator.init(conf);
+		
+		return Evaluator.evaluator;
+	}
+
+	@Override 
+	public void testCore() throws Exception {
+		assertEquals(0, queryAndCompare(new File(queryDir, "core_1.mrql"), resultDir));
+	}
+
+	@Override 
+	public void testDistinct() throws Exception {
+		//assertEquals(0, queryAndCompare(new File(queryDir, "distinct_1.mrql"), resultDir));
+	}
+
+	@Override 
+	public void testFactorization() throws Exception {
+           // if (!Config.bsp_mode) // matrix factorization needs at least 3 nodes in BSP Hama mode
+		//assertEquals(0, queryAndCompare(new File(queryDir, "factorization_1.mrql"), resultDir));
+	}
+
+	@Override
+	public void testGroupBy() throws Exception {
+		//assertEquals(0, queryAndCompare(new File(queryDir, "group_by_1.mrql"), resultDir));
+	}
+	
+	@Override 
+	public void testGroupByHaving() throws Exception {
+		//assertEquals(0, queryAndCompare(new File(queryDir, "group_by_having_1.mrql"), resultDir));
+	}	
+
+	@Override 
+	public void testGroupByOrderBy() throws Exception {
+		//assertEquals(0, queryAndCompare(new File(queryDir, "group_by_order_by_1.mrql"), resultDir));
+		//assertEquals(0, queryAndCompare(new File(queryDir, "group_by_order_by_2.mrql"), resultDir));
+	}
+
+	@Override 
+	public void testJoinGroupBy() throws Exception {
+		//assertEquals(0, queryAndCompare(new File(queryDir, "join_group_by_1.mrql"), resultDir));
+		//assertEquals(0, queryAndCompare(new File(queryDir, "join_group_by_2.mrql"), resultDir));
+		//assertEquals(0, queryAndCompare(new File(queryDir, "join_group_by_3.mrql"), resultDir));
+	}
+	
+	@Override 
+	public void testJoinOrderBy() throws Exception {
+		//assertEquals(0, queryAndCompare(new File(queryDir, "join_order_by_1.mrql"), resultDir));
+	}
+
+	@Override 
+	public void testJoins() throws Exception {
+		//assertEquals(0, queryAndCompare(new File(queryDir, "joins_1.mrql"), resultDir));
+	}
+
+	@Override 
+	public void testKmeans() throws Exception {
+		//assertEquals(0, queryAndCompare(new File(queryDir, "kmeans_1.mrql"), resultDir));
+	}
+
+	@Override 
+	public void testLoop() throws Exception {
+		//assertEquals(0, queryAndCompare(new File(queryDir, "loop_1.mrql"), resultDir));
+	}
+
+	@Override 
+	public void testMatrix() throws Exception {
+		//assertEquals(0, queryAndCompare(new File(queryDir, "matrix_1.mrql"), resultDir));
+	}
+
+	@Override 
+	public void testNestedSelect() throws Exception {
+		//assertEquals(0, queryAndCompare(new File(queryDir, "nested_select_1.mrql"), resultDir));
+	}
+
+	@Override 
+	public void testOrderBy() throws Exception {
+		//assertEquals(0, queryAndCompare(new File(queryDir, "order_by_1.mrql"), resultDir));
+	}
+
+	@Override
+	public void testPagerank() throws Exception {
+		//assertEquals(0, queryAndCompare(new File(queryDir, "pagerank_1.mrql"), resultDir));
+	}
+
+	@Override 
+	public void testRelationalJoin() throws Exception {
+		//assertEquals(0, queryAndCompare(new File(queryDir, "relational_join_1.mrql"), resultDir));
+	}
+
+	@Override 
+	public void testTotalAggregation() throws Exception {
+		//assertEquals(0, queryAndCompare(new File(queryDir, "total_aggregation_1.mrql"), resultDir));
+		//assertEquals(0, queryAndCompare(new File(queryDir, "total_aggregation_2.mrql"), resultDir));
+	}
+
+	@Override 
+	public void testUdf() throws Exception {
+		//assertEquals(0, queryAndCompare(new File(queryDir, "udf_1.mrql"), resultDir));
+	}
+
+	@Override 
+	public void testUserAggregation() throws Exception {
+		//assertEquals(0, queryAndCompare(new File(queryDir, "user_aggregation_1.mrql"), resultDir));
+	}
+
+	@Override 
+	public void testXml() throws Exception {
+		//assertEquals(0, queryAndCompare(new File(queryDir, "xml_1.mrql"), resultDir));
+	//	assertEquals(0, queryAndCompare(new File(queryDir, "xml_2.mrql"), resultDir));
+	}
+}