[MRQL-90] Add support for Storm streaming
diff --git a/bin/mrql.storm b/bin/mrql.storm
new file mode 100755
index 0000000..ce2b526
--- /dev/null
+++ b/bin/mrql.storm
@@ -0,0 +1,77 @@
+#!/bin/bash
+#--------------------------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#--------------------------------------------------------------------------------
+#
+# run Apache MRQL in Storm mode using Apache Storm
+#
+#--------------------------------------------------------------------------------
+
+MRQL_HOME="$(cd `dirname $0`/..; pwd -P)"
+
+. "$MRQL_HOME/conf/mrql-env.sh"
+
+GEN_JAR=`ls "$MRQL_HOME"/lib/mrql-gen-*.jar`
+CORE_JAR=`ls "$MRQL_HOME"/lib/mrql-core-*.jar`
+MRQL_JAR=`ls "$MRQL_HOME"/lib/mrql-storm-*.jar`
+FULL_JAR="/tmp/${USER}_mrql_storm.jar"
+CLASS_DIR="/tmp/${USER}_mrql_classes"
+
+if [[ -z ${STORM_JARS} ]]; then
+ echo "*** Cannot find the Storm jar file. Need to edit mrql-env.sh"; exit -1
+fi
+HADOOP_JARS=${HADOOP_HOME}/share/hadoop/mapreduce/hadoop-mapreduce-client-core-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/hadoop-common-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/lib/*
+LAMDA_JARS="/tmp/mrql_jar_${USER}/*"
+
+export JAVA_HOME FS_DEFAULT_NAME BSP_MASTER_ADDRESS STORM_ZOOKEEPER_QUORUM BSP_SPLIT_INPUT
+
+if [[ ($MRQL_JAR -nt $FULL_JAR) ]]; then
+ rm -rf $CLASS_DIR
+ mkdir -p $CLASS_DIR
+ pushd $CLASS_DIR > /dev/null
+ $JAVA_HOME/bin/jar xf $CUP_JAR
+ $JAVA_HOME/bin/jar xf $JLINE_JAR
+ $JAVA_HOME/bin/jar xf $GEN_JAR
+ $JAVA_HOME/bin/jar xf $CORE_JAR
+ $JAVA_HOME/bin/jar xf $MRQL_JAR
+ #$JAVA_HOME/bin/jar xf $HADOOP_JARS
+ #$JAVA_HOME/bin/jar xf $LAMDA_JARS
+ cd ..
+ $JAVA_HOME/bin/jar cf $FULL_JAR -C $CLASS_DIR .
+ popd > /dev/null
+fi
+
+if [ "$1" == "-local" ]; then
+ export STORM_CLASSPATH=$FULL_JAR # FOR LOCAL CLUSTER IN STORM
+ export storm.jar=$FULL_JAR
+ $STORM_HOME/bin/storm jar --config $STORM_CONFIG jar $FULL_JAR org.apache.mrql.Main -storm $*
+else if [ "$1" == "-dist" ]; then
+ #export STORM_CLASSPATH=$FULL_JAR # FOR LOCAL CLUSTER IN STORM
+ #export storm.jar=$FULL_JAR
+ #echo $storm.jar
+ $STORM_HOME/bin/storm jar --config $STORM_CONFIG jar $FULL_JAR org.apache.mrql.Main -storm $*
+else
+ #storm.jar=$FULL_JAR
+ #echo $storm.jar
+ #$STORM_HOME/bin/storm jar $FULL_JAR org.apache.mrql.Main -storm $*
+ #$JAVA_HOME/bin/java -classpath "$FULL_JAR:$STORM_JARS:$HADOOP_JARS" -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=1044 org.apache.mrql.Main -storm $*
+ $JAVA_HOME/bin/java -classpath "$FULL_JAR:$STORM_JARS:$HADOOP_JARS:$LAMDA_JARS" org.apache.mrql.Main -storm $*
+fi
+fi
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index aefd589..3da0168 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -64,7 +64,6 @@
# The HDFS namenode URI (eg, hdfs://localhost:9000/). If empty, it is the one defined in core-site.xml
FS_DEFAULT_NAME=
-
# Optional: Hama configuration. Supports versions 0.6.2, 0.6.3, 0.6.4, and 0.7.0, and 0.7.1
HAMA_VERSION=0.7.1
# The Hama installation directory
@@ -83,7 +82,7 @@
# Spark versions 0.8.1, 0.9.0, and 0.9.1 are supported by MRQL 0.9.0 only.
# You may use the Spark prebuilts bin-hadoop1 or bin-hadoop2 (Yarn)
# Tested in local, standalone deploy, and Yarn modes
-SPARK_HOME=${HOME}/spark-1.6.2-bin-hadoop2.6
+SPARK_HOME=${HOME}/spark
# URI of the Spark master node:
# to run Spark on Standalone Mode, set it to spark://`hostname`:7077
# to run Spark on a YARN cluster, set it to "yarn-client"
@@ -100,15 +99,23 @@
# Optional: Flink configuration. Supports version 1.0.2 and 1.0.3
-FLINK_VERSION=1.0.2
+FLINK_VERSION=1.1.2
# Flink installation directory
-FLINK_HOME=${HOME}/flink-${FLINK_VERSION}
+FLINK_HOME=${HOME}/ap/flink-${FLINK_VERSION}
# number of slots per TaskManager (typically, the number of cores per node)
FLINK_SLOTS=4
# memory per TaskManager
FLINK_TASK_MANAGER_MEMORY=2048
+
+# STORM CONFIGURATIONS
+STORM_VERSION=1.0.2
+
+# Storm installation directory
+STORM_HOME=${HOME}/apache-storm-${STORM_VERSION}
+
+
# Claspaths
HAMA_JAR=${HAMA_HOME}/hama-core-${HAMA_VERSION}.jar
@@ -117,6 +124,14 @@
FLINK_JARS=${FLINK_JARS}:$I
done
+STORM_JARS=.
+for I in ${STORM_HOME}/lib/*.jar; do
+
+ if [[ $I != *"log4j-over-slf4j-"* ]]
+ then
+ STORM_JARS=${STORM_JARS}:$I
+ fi
+done
# YARN-enabled assembly jar
if [[ -d ${SPARK_HOME}/assembly/target ]]; then
diff --git a/core/src/main/java/org/apache/mrql/Config.java b/core/src/main/java/org/apache/mrql/Config.java
index 19069f3..ac9b2d8 100644
--- a/core/src/main/java/org/apache/mrql/Config.java
+++ b/core/src/main/java/org/apache/mrql/Config.java
@@ -41,6 +41,8 @@
public static boolean spark_mode = false;
// true, for Flink mode
public static boolean flink_mode = false;
+ // true, for Storm mode
+ public static boolean storm_mode = false;
// if true, it process the input interactively
public static boolean interactive = true;
// compile the MR functional arguments to Java bytecode at run-time
@@ -193,7 +195,11 @@
} else if (args[i].equals("-flink")) {
flink_mode = true;
i++;
- } else if (args[i].equals("-bsp_tasks")) {
+ }else if (args[i].equals("-storm")) {
+ storm_mode = true;
+ i++;
+ }
+ else if (args[i].equals("-bsp_tasks")) {
if (++i >= args.length && Integer.parseInt(args[i]) < 1)
throw new Error("Expected max number of bsp tasks > 1");
nodes = Integer.parseInt(args[i]);
diff --git a/core/src/main/java/org/apache/mrql/Main.java b/core/src/main/java/org/apache/mrql/Main.java
index cafc62f..db8a66f 100644
--- a/core/src/main/java/org/apache/mrql/Main.java
+++ b/core/src/main/java/org/apache/mrql/Main.java
@@ -63,11 +63,14 @@
Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.SparkEvaluator").newInstance();
else if (Config.flink_mode)
Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.FlinkEvaluator").newInstance();
+ else if (Config.storm_mode)
+ Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.StormEvaluator").newInstance();
else // when Config.map_reduce_mode but also the default
Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.MapReduceEvaluator").newInstance();
}
-
+
public static void initialize () throws Exception {
+
if (Evaluator.evaluator == null) {
if (Plan.conf == null)
Plan.conf = new Configuration();
@@ -86,8 +89,9 @@
Config.bsp_mode |= arg.equals("-bsp");
Config.spark_mode |= arg.equals("-spark");
Config.flink_mode |= arg.equals("-flink");
+ Config.storm_mode |= arg.equals("-storm");
};
- Config.map_reduce_mode = !Config.bsp_mode && !Config.spark_mode && !Config.flink_mode;
+ Config.map_reduce_mode = !Config.bsp_mode && !Config.spark_mode && !Config.flink_mode && !Config.storm_mode;
initialize_evaluator();
if (Config.hadoop_mode) {
conf = Evaluator.evaluator.new_configuration();
@@ -117,6 +121,8 @@
System.out.println("Spark mode using "+Config.nodes+" tasks)");
else if (Config.flink_mode)
System.out.println("Flink mode using "+Config.nodes+" tasks)");
+ else if (Config.storm_mode)
+ System.out.println("Storm mode using "+Config.nodes+" tasks)");
else if (Config.bsp_mode)
System.out.println("Hama BSP mode over "+Config.nodes+" BSP tasks)");
else if (Config.nodes > 0)
diff --git a/core/src/main/java/org/apache/mrql/Plan.java b/core/src/main/java/org/apache/mrql/Plan.java
index 94ee9b9..3547fe7 100644
--- a/core/src/main/java/org/apache/mrql/Plan.java
+++ b/core/src/main/java/org/apache/mrql/Plan.java
@@ -93,6 +93,8 @@
conf.set("mrql.jar.path",Compiler.jar_path);
else if (Config.spark_mode)
conf.set("mrql.jar.path",local_path.toString());
+ else if (Config.storm_mode)
+ conf.set("mrql.jar.path",Compiler.jar_path);
else {
// distribute the jar file with the compiled arguments to all clients
Path hdfs_path = new Path("mrql-tmp/class"+random_generator.nextInt(1000000)+".jar");
diff --git a/core/src/test/java/org/apache/mrql/QueryTest.java b/core/src/test/java/org/apache/mrql/QueryTest.java
index 0602a7b..b34a047 100644
--- a/core/src/test/java/org/apache/mrql/QueryTest.java
+++ b/core/src/test/java/org/apache/mrql/QueryTest.java
@@ -34,8 +34,8 @@
public abstract class QueryTest extends TestCase {
private static String TEST_QUERY_DIR = "../tests/queries";
private static String TEST_RESULT_DIR = "../tests/results";
- private File queryDir;
- private File resultDir;
+ protected File queryDir;
+ protected File resultDir;
private static Evaluator evaluator;
@BeforeClass
@@ -154,7 +154,7 @@
assertEquals(0, queryAndCompare(new File(queryDir, "xml_2.mrql"), resultDir));
}
- private int queryAndCompare ( File query, File resultDir ) throws Exception {
+ protected int queryAndCompare ( File query, File resultDir ) throws Exception {
System.err.println("Testing "+query);
Translator.global_reset();
String qname = query.getName();
diff --git a/pom.xml b/pom.xml
index 403f57f..9ac5989 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,7 @@
<spark.version>1.6.2</spark.version>
<scala.version>2.10</scala.version>
<flink.version>1.0.3</flink.version>
+ <storm.version>1.0.2</storm.version>
<skipTests>true</skipTests>
</properties>
@@ -60,6 +61,7 @@
<module>bsp</module>
<module>spark</module>
<module>flink</module>
+ <module>storm</module>
<module>dist</module>
</modules>
@@ -154,6 +156,7 @@
<exclude>.classpath/**</exclude>
<exclude>.project/**</exclude>
<exclude>tests/data/**</exclude>
+ <exclude>queries/data/**</exclude>
<exclude>tests/results/**</exclude>
<exclude>tests/error_log.txt</exclude>
<exclude>**/dependency-reduced-pom.xml</exclude>
diff --git a/queries/data/customers.tbl b/queries/data/customers.tbl
new file mode 100644
index 0000000..89e6a3e
--- /dev/null
+++ b/queries/data/customers.tbl
@@ -0,0 +1,7 @@
+1|Ramesh|32|Ahmedabad|2000.00|
+2|Ramesh|25|Delhi|1500.00|
+3|kaushik|23|Kota|2000.00|
+4|kaushik|25|Mumbai|6500.00|
+5|Hardik|27|Bhopal|8500.00|
+6|Komal|22|MP|4500.00|
+7|Muffy|24|Indore|10000.00|
diff --git a/queries/groupby_stream_storm.mrql b/queries/groupby_stream_storm.mrql
new file mode 100644
index 0000000..d96cebf
--- /dev/null
+++ b/queries/groupby_stream_storm.mrql
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+C = stream(line,"queries/data/","|",type(<CUSTKEY:int,NAME:string,AGE:int,ADDRESS:string,SALARY:float>));
+
+SELECT (k,sum(c.SALARY))
+ FROM c in C
+ GROUP BY k: c.NAME;
diff --git a/storm/pom.xml b/storm/pom.xml
new file mode 100644
index 0000000..c33064a
--- /dev/null
+++ b/storm/pom.xml
@@ -0,0 +1,161 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.mrql</groupId>
+ <artifactId>mrql-storm</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache MRQL Storm mode</name>
+  <description>Apache MRQL evaluation in Storm mode on Apache Storm</description>
+ <url>http://mrql.incubator.apache.org/</url>
+ <inceptionYear>2016</inceptionYear>
+
+ <parent>
+ <groupId>org.apache.mrql</groupId>
+ <artifactId>mrql-parent</artifactId>
+ <version>0.9.8-incubating-SNAPSHOT</version>
+ </parent>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.mrql</groupId>
+ <artifactId>mrql-gen</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.mrql</groupId>
+ <artifactId>mrql-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>${storm.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-hdfs</artifactId>
+ <version>${storm.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.mrql</groupId>
+ <artifactId>mrql-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals><goal>add-source</goal></goals>
+ <configuration>
+ <sources>
+ <source>${project.build.directory}/generated-sources</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.14.1</version>
+ <configuration>
+ <argLine>-Xmx1024m</argLine>
+ <useSystemClassLoader>true</useSystemClassLoader>
+ <skipTests>true</skipTests>
+<!-- Storm mode not ready for tests yet
+ <skipTests>${skipTests}</skipTests>
+-->
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.1</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <includes>
+ <include>org.apache.mrql:*</include>
+ </includes>
+ <excludes>
+ <exclude>org.apache.mrql:mrql-gen</exclude>
+ <exclude>org.apache.mrql:mrql-core</exclude>
+ </excludes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <finalName>mrql-storm-${project.version}</finalName>
+ <outputDirectory>${project.parent.basedir}/lib</outputDirectory>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target>
+ <mkdir dir="${project.build.directory}/generated-sources/org/apache/mrql" />
+ <property name="compile_classpath" refid="maven.compile.classpath" />
+ <fileset id="mr.gen.path" dir="src/main/java/org/apache/mrql" includes="*.gen" />
+ <pathconvert pathsep=" " property="mr.gen.files" refid="mr.gen.path" />
+ <java classname="org.apache.mrql.gen.Main" classpath="../lib/mrql-gen-${project.version}.jar:${compile_classpath}">
+ <arg line="${mr.gen.files}" />
+ <arg line="-o" />
+ <arg file="${project.build.directory}/generated-sources/org/apache/mrql" />
+ </java>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+</plugin>
+</plugins>
+</build>
+</project>
diff --git a/storm/src/main/java/org/apache/mrql/HDFSFileInputStream.java b/storm/src/main/java/org/apache/mrql/HDFSFileInputStream.java
new file mode 100644
index 0000000..aa94b14
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/HDFSFileInputStream.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class HDFSFileInputStream extends BaseRichSpout{
+ private final String directory;
+ private final boolean is_binary;
+ private HashMap<String,Long> file_modification_times;
+ private StormMRQLFileInputFormat input_format;
+ final private SData container = new SData();
+ private SpoutOutputCollector _collector;
+
+ public HDFSFileInputStream(String directory, boolean is_binary,StormMRQLFileInputFormat input_format) {
+ this.directory = directory;
+ this.is_binary = is_binary;
+ this.input_format = input_format;
+ }
+
+ private ArrayList<String> new_files () {
+ try {
+ long ct = System.currentTimeMillis();
+ Path dpath = new Path(directory);
+ final FileSystem fs = dpath.getFileSystem(Plan.conf);
+ final FileStatus[] ds
+ = fs.listStatus(dpath,
+ new PathFilter () {
+ public boolean accept ( Path path ) {
+ return !path.getName().startsWith("_")
+ && !path.getName().endsWith(".type");
+ }
+ });
+ ArrayList<String> s = new ArrayList<String>();
+ for ( FileStatus d: ds ) {
+ String name = d.getPath().toString();
+ if (file_modification_times.get(name) == null
+ || d.getModificationTime() > file_modification_times.get(name)) {
+ file_modification_times.put(name,new Long(ct));
+ s.add(name);
+ }
+ };
+ return s;
+ } catch (Exception ex) {
+ throw new Error("Cannot open a new file from the directory "+directory+": "+ex);
+ }
+}
+
+@Override
+public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("inputdata"));
+}
+
+@Override
+public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ this.file_modification_times = new HashMap<String,Long>();
+ _collector = collector;
+}
+
+@Override
+public void nextTuple() {
+ try{
+ for ( String path: new_files() ) {
+ Path filePath = new Path(path);
+ Bag value = input_format.materialize(filePath);
+ for(MRData val : value){
+ _collector.emit(new Values(val));
+ }
+ }
+ }
+ catch(Exception e){
+ e.printStackTrace();
+ }
+}
+}
diff --git a/storm/src/main/java/org/apache/mrql/MR_stream.java b/storm/src/main/java/org/apache/mrql/MR_stream.java
new file mode 100644
index 0000000..ef222ac
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/MR_stream.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import org.apache.storm.trident.Stream;
+
+public class MR_stream extends MRData{
+ Stream stream;
+
+ public MR_stream() {
+ }
+
+ public MR_stream(Stream stream) {
+ this.stream = stream;
+ }
+
+ public Stream stream(){
+ return this.stream;
+ }
+
+ @Override
+ public void materializeAll() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public void write(DataOutput d) throws IOException {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public void readFields(DataInput di) throws IOException {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public int compareTo(MRData o) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public void writeData(ObjectOutputStream arg0) throws IOException {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+
+}
diff --git a/storm/src/main/java/org/apache/mrql/SData.java b/storm/src/main/java/org/apache/mrql/SData.java
new file mode 100644
index 0000000..7d03d06
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/SData.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.Serializable;
+
+
+public class SData implements Comparable<SData>,Serializable{
+ MRData data;
+
+ public SData(MRData data) {
+ this.data = data;
+}
+
+public SData() {
+}
+
+public MRData data () { return data; }
+
+
+@Override
+public int compareTo ( SData x ) { return data.compareTo(x.data); }
+
+@Override
+public boolean equals ( Object x ) {
+ return x instanceof SData && data.equals(((SData)x).data);
+}
+
+@Override
+public int hashCode () { return data.hashCode(); }
+
+@Override
+public String toString () { return data.toString(); }
+}
diff --git a/storm/src/main/java/org/apache/mrql/StormBinaryInputFormat.java b/storm/src/main/java/org/apache/mrql/StormBinaryInputFormat.java
new file mode 100644
index 0000000..5a24132
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/StormBinaryInputFormat.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+
+public class StormBinaryInputFormat extends StormMRQLFileInputFormat{
+ public static class BinaryInputRecordReader extends SequenceFileRecordReader<MRContainer,MRContainer> {
+ final MRContainer result = new MRContainer();
+
+ public BinaryInputRecordReader ( FileSplit split,
+ JobConf job ) throws IOException {
+ super(job,split);
+ }
+
+ @Override
+ public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
+ boolean b = super.next(key,result);
+ value.set(result.data());
+ return b;
+ }
+ }
+
+ @Override
+ public RecordReader<MRContainer,MRContainer>
+ getRecordReader ( InputSplit split,
+ JobConf job,
+ Reporter reporter ) throws IOException {
+ String path = ((FileSplit)split).getPath().toString();
+ BinaryDataSource ds = (BinaryDataSource)DataSource.get(path,job);
+ return new BinaryInputRecordReader((FileSplit)split,job);
+ }
+
+}
diff --git a/storm/src/main/java/org/apache/mrql/StormEvaluator.gen b/storm/src/main/java/org/apache/mrql/StormEvaluator.gen
new file mode 100644
index 0000000..edcbb44
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/StormEvaluator.gen
@@ -0,0 +1,312 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.mrql;
+ import java.io.BufferedReader;
+ import java.io.IOException;
+ import java.io.InputStreamReader;
+ import java.io.PrintStream;
+ import java.io.Serializable;
+import java.lang.reflect.Method;
+ import java.net.URL;
+ import java.net.URLClassLoader;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.mrql.gen.Node;
+ import org.apache.mrql.gen.Tree;
+ import org.apache.storm.trident.Stream;
+ import org.apache.storm.trident.TridentTopology;
+ import org.apache.storm.trident.operation.FlatMapFunction;
+ import org.apache.storm.trident.tuple.TridentTuple;
+ import org.apache.storm.tuple.Fields;
+ import org.apache.storm.tuple.Values;
+ import org.apache.storm.trident.fluent.GroupedStream;
+ import org.apache.storm.trident.operation.BaseAggregator;
+ import org.apache.storm.trident.operation.BaseFunction;
+ import org.apache.storm.trident.operation.MapFunction;
+ import org.apache.storm.trident.operation.TridentCollector;
+
+ public class StormEvaluator extends Evaluator implements Serializable {
+
+ public static TridentTopology topology;
+ static Environment global_streams = null;
+ final static String data_source_dir_name = "tmp/"+System.getenv("USER")+"_data_source_dir.txt";
+ private static Function f;
+
+ @Override
+ public void init(Configuration conf) {
+ global_streams = null;
+ if (Config.stream_window > 0 && (Config.local_mode || Config.hadoop_mode)) {
+ if (Config.trace_execution) {
+ System.out.println("Creating a new storm topology");
+ };
+ }
+ topology = new TridentTopology();
+ Plan.conf = new Configuration();
+ if (Config.hadoop_mode && Config.local_mode) {
+ FileSystem.setDefaultUri(Plan.conf,"file:///");
+ } else if (Config.hadoop_mode) {
+ if (!System.getenv("FS_DEFAULT_NAME").equals(""))
+ FileSystem.setDefaultUri(Plan.conf,System.getenv("FS_DEFAULT_NAME"));
+ };
+ }
+
+ @Override
+ public void initialize_query() {
+ Plan.distribute_compiled_arguments(Plan.conf);
+ String jarPath = Plan.conf.get("mrql.jar.path");
+ try{
+ addURL(new URL("file://"+jarPath));
+ }
+ catch(Exception e){
+ e.printStackTrace();
+ }
+ }
+
+ public void addURL(URL url) throws Exception {
+ URLClassLoader classLoader
+ = (URLClassLoader) ClassLoader.getSystemClassLoader();
+ Class clazz= URLClassLoader.class;
+
+ // Use reflection
+ Method method= clazz.getDeclaredMethod("addURL", new Class[] { URL.class });
+ method.setAccessible(true);
+ method.invoke(classLoader, new Object[] { url });
+ }
+
+ @Override
+ public void shutdown(Configuration conf) {
+
+ }
+
+ @Override
+ public Configuration new_configuration() {
+ return new Configuration();
+ }
+
+ @Override
+ public Class<? extends MRQLFileInputFormat> parsedInputFormat() {
+ return StormParsedInputFormat.class;
+ }
+
+ @Override
+ public Class<? extends MRQLFileInputFormat> binaryInputFormat() {
+ return StormBinaryInputFormat.class;
+ }
+
+ @Override
+ public Class<? extends MRQLFileInputFormat> generatorInputFormat() {
+ return null;
+ }
+
+ /** used by the master to send parsing details (eg, record types) to workers */
+ public static void dump_source_dir () throws IOException {
+ if (Config.local_mode)
+ return;
+ DataSource.dataSourceDirectory.distribute(Plan.conf);
+ Path path = new Path(data_source_dir_name);
+ FileSystem fs = path.getFileSystem(Plan.conf);
+ PrintStream ps = new PrintStream(fs.create(path,true));
+ ps.println(Plan.conf.get("mrql.data.source.directory"));
+ ps.close();
+ }
+
+ /** executed by a worker when reading parsed input (see StormParsedInputFormat) */
+ public static void load_source_dir () throws IOException {
+ if (Plan.conf == null) {
+ if (evaluator == null)
+ evaluator = new StormEvaluator();
+ Plan.conf = evaluator.new_configuration();
+ Config.read(Plan.conf);
+ };
+ if (Config.local_mode)
+ return;
+ // the name of the file that contains the source directory details is read from an HDFS file by workers
+ Path path = new Path(data_source_dir_name);
+ FileSystem fs = path.getFileSystem(Plan.conf);
+ BufferedReader ftp = new BufferedReader(new InputStreamReader(fs.open(path)));
+ Plan.conf.set("mrql.data.source.directory",ftp.readLine());
+ DataSource.dataSourceDirectory.read(Plan.conf);
+ ftp.close();
+ }
+
+ private static Bag bag ( final Iterable<MRData> s ) {
+ final Iterator<MRData> i = s.iterator();
+ return new Bag(new BagIterator() {
+ public MRData next () {
+ return i.next();
+ }
+ public boolean hasNext () {
+ return i.hasNext();
+ }
+ });
+ }
+
+ /** Coerce a persistent collection to a Bag */
+ @Override
+ public Bag toBag ( MRData data ) {
+ try {
+ return (Bag)data;
+ } catch (Exception ex) {
+ throw new Error("Cannot coerce "+data+" to a Bag: "+ex);
+ }
+ }
+
+ @Override
+ public MRData aggregate(Tree acc_fnc, Tree zero, Tree plan, Environment env) throws Exception {
+ return null;
+ }
+
+ @Override
+ public Tuple loop(Tree e, Environment env) throws Exception {
+ return null;
+ }
+
+ /** Evaluate a streaming query plan: remember the result handler f and delegate to StormStreaming. */
+ @Override
+ public void streaming(Tree plan, Environment env, Environment dataset_env, Function f) {
+ // f is stored so that eval() below can feed batch results to it
+ this.f = f;
+ StormStreaming.evaluate(plan, env, dataset_env, f);
+ }
+
+ /** Evaluate a plan to a DataSet by wrapping the resulting Trident stream. */
+ @Override
+ public DataSet eval(Tree e, Environment env, String counter) {
+ Stream res = eval(e, env,(Environment)null);
+ // NOTE(review): the Stream returned by map() is discarded and the DataSet below wraps the
+ // pre-map stream, so the f.eval side effect may not be on the returned dataset's pipeline
+ // -- confirm this is intended
+ res.map(new MapFunction() {
+ @Override
+ public Values execute(TridentTuple input) {
+ MRData dataset = (MRData)input.get(0);
+ // push each batch element to the streaming result handler
+ f.eval(dataset);
+ return new Values(input);
+ }
+ });
+
+ DataSet data = new DataSet(new StreamDataSource(res), -1, -1);
+ return data;
+ }
+
+ /** Evaluate a physical plan to a Trident Stream, printing a trace when enabled. */
+ final public Stream eval(final Tree e, final Environment env,final Environment stream_env){
+ if (Config.trace_execution) {
+ tab_count += 3;
+ System.out.println(tabs(tab_count) + print_query(e));
+ };
+ final Stream res = evalD(e, env,stream_env);
+ // restore the trace indentation level; the original leaked the += 3 above
+ if (Config.trace_execution)
+ tab_count -= 3;
+ return res;
+ }
+
+ /** Dispatch on the physical plan's constructor and build the corresponding Trident stream. */
+ final public Stream evalD ( final Tree e, final Environment env,final Environment stream_env ) {
+ try {
+ match e {
+ case cMap(`f,`s):
+ // concat-map: evaluate the source stream and flat-map f over it
+ return eval(s,env,stream_env).flatMap(cmap_fnc(f,env));
+ case MapReduce(`m,`r,`s,`o):
+ Stream mappedStream = eval(s,env,stream_env).flatMap(cmap_fnc(m,env));
+ return groupBy(mappedStream,r,env,o);
+ case MapCombineReduce(`m,`c,`r,`s,`o):
+ // NOTE(review): the combiner `c is ignored; evaluated exactly like MapReduce -- confirm
+ Stream mappedStream = eval(s,env,stream_env).flatMap(cmap_fnc(m,env));
+ return groupBy(mappedStream,r,env,o);
+ case `v:
+ if (!v.is_variable())
+ fail;
+ // stream variables are resolved against the global stream bindings first
+ MRData x = variable_lookup(v.toString(),global_streams);
+ if (x != null && x instanceof MR_stream)
+ return ((MR_stream)x).stream();
+
+ // NOTE(review): this env lookup result is never used before throwing -- confirm intent
+ x = variable_lookup(v.toString(),env);
+ throw new Error("Variable "+v+" is not bound");
+ };
+ throw new Error("Unrecognized Storm plan: "+e);
+ } catch (Error msg) {
+ if (!Config.trace)
+ throw new Error(msg.getMessage());
+ System.err.println(msg.getMessage());
+ throw new Error("Evaluation error in: "+print_query(e));
+ } catch (Exception ex) {
+ System.err.println(ex.getMessage());
+ ex.printStackTrace();
+ throw new Error("Evaluation error in: "+print_query(e));
+ }
+ }
+
+ // TODO: reduce_output is an unimplemented stub; it always returns null and is not called yet
+ private static Stream reduce_output(Stream s){
+ return null;
+ }
+
+ /** Build a Trident FlatMapFunction that applies the MRQL function fnc to each tuple's value. */
+ // NOTE(review): the env parameter is unused -- evalF is called with a null environment; confirm
+ private static FlatMapFunction cmap_fnc ( final Tree fnc,Environment env ) {
+ final Function f = evalF(fnc,null);
+ return new FlatMapFunction() {
+ @Override
+ public Iterable<Values> execute(TridentTuple input) {
+ List<Values> out = new ArrayList<Values>();
+ MRData value = (MRData)input.get(0);
+ // f returns a Bag; emit each element as its own Trident tuple
+ for (MRData e: (Bag)f.eval(value) ){
+ out.add(new Values(e));
+ }
+ return out;
+ }
+ };
+ }
+
+ /** Group a stream of (key,value) pairs by key and apply the MRQL reducer fnc to each group.
+ * The ordering argument o is currently unused. */
+ private static Stream groupBy ( Stream s, Tree fnc, Environment env, Tree o ) {
+ final Function reducer = evalF(fnc,null);
+
+ // step 1: split each MRQL pair Tuple into separate key and value fields
+ Stream keyValueStream = s.each(s.getOutputFields(),new BaseFunction() {
+ @Override
+ public void execute(TridentTuple input, TridentCollector output) {
+ Tuple value = (Tuple)input.get(0);
+ MRData key = value.first();
+ MRData values = (MRData)value.second();
+ output.emit(new Values(key,values));
+ }
+ },new Fields("keyfield","valuefield"));
+ // step 2: partition the stream by key
+ GroupedStream groupedStream = keyValueStream.groupBy(new Fields("keyfield"));
+ // step 3: per batch, collect each key's values into a Bag, then run the reducer per key
+ Stream finalStream = groupedStream.aggregate(new Fields("keyfield","valuefield"), new BaseAggregator<Map<MRData,Bag>>() {
+ @Override
+ public Map<MRData,Bag> init(Object batchId, TridentCollector collector) {
+ return new HashMap<MRData,Bag>();
+ }
+ @Override
+ public void aggregate(Map<MRData,Bag> val, TridentTuple tuple, TridentCollector collector) {
+ MRData key = (MRData)tuple.get(0);
+ Bag values;
+ if(!val.containsKey(key)){
+ values = new Bag();
+ }
+ else{
+ values = val.get(key);
+ }
+ MRData value = (MRData)tuple.get(1);
+ values.add(value);
+ val.put(key,values);
+ }
+
+ @Override
+ public void complete(Map<MRData,Bag> val, TridentCollector collector) {
+ // reducer is applied to (key, Bag-of-values) and yields a Bag of results per key
+ for (Map.Entry<MRData, Bag> entry : val.entrySet()) {
+ MRData key = entry.getKey();
+ Bag value = entry.getValue();
+ Bag reducedValues = (Bag)reducer.eval(new Tuple(key,value));
+ collector.emit(new Values(reducedValues));
+ }
+ }
+ }, new Fields("outputdata"));
+ return finalStream.project(new Fields("outputdata"));
+ }
+ }
diff --git a/storm/src/main/java/org/apache/mrql/StormFileInputStream.java b/storm/src/main/java/org/apache/mrql/StormFileInputStream.java
new file mode 100644
index 0000000..dd63d8f
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/StormFileInputStream.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IBatchSpout;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+/** Storm spout that reads an MRQL binary input directory and emits its contents as a Bag. */
+public class StormFileInputStream extends BaseRichSpout{
+ // HDFS/local directory holding the input data
+ private String directory;
+ // NOTE(review): is_binary is stored but never read in this class -- confirm it is needed
+ private boolean is_binary;
+ SpoutOutputCollector _collector;
+
+ public StormFileInputStream(String directory, boolean is_binary) {
+ this.directory = directory;
+ this.is_binary = is_binary;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ // a single field carrying the whole Bag of input data
+ declarer.declare(new Fields("data"));
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ _collector = collector;
+ }
+
+ @Override
+ public void nextTuple() {
+ // NOTE(review): Storm calls nextTuple repeatedly, so the whole directory is re-read and
+ // re-emitted on every call -- confirm a once-only guard or sleep is not needed here
+ Bag val = (Bag)MapReduceAlgebra.read_binary(directory);
+ _collector.emit(new Values(val));
+}
+}
diff --git a/storm/src/main/java/org/apache/mrql/StormMRQLFileInputFormat.java b/storm/src/main/java/org/apache/mrql/StormMRQLFileInputFormat.java
new file mode 100644
index 0000000..62a1e08
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/StormMRQLFileInputFormat.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/** Base Hadoop input format used by the MRQL Storm back end to read MRContainer records. */
+public abstract class StormMRQLFileInputFormat extends FileInputFormat<MRContainer,MRContainer> implements MRQLFileInputFormat,Serializable {
+
+ /** Record reader for the concrete file format (binary or parsed). */
+ abstract public RecordReader<MRContainer,MRContainer>
+ getRecordReader ( InputSplit split, JobConf job, Reporter reporter ) throws IOException;
+
+ /** Materialize the contents of a file (all of its splits) into a lazy Bag. */
+ @Override
+ public Bag materialize(final Path file) throws IOException {
+ final JobConf job = new JobConf(Plan.conf,MRQLFileInputFormat.class);
+ setInputPaths(job,file);
+ final InputSplit[] splits = getSplits(job,1);
+ final Reporter reporter = null;
+ final RecordReader<MRContainer,MRContainer> rd = getRecordReader(splits[0],job,reporter);
+ return new Bag(new BagIterator () {
+ RecordReader<MRContainer,MRContainer> reader = rd;
+ MRContainer key = reader.createKey();
+ // fixed: the value container must come from createValue(), not createKey()
+ MRContainer value = reader.createValue();
+ int i = 0;
+ public boolean hasNext () {
+ try {
+ if (reader.next(key,value))
+ return true;
+ // current split exhausted: advance to the next split that has a record
+ do {
+ if (++i >= splits.length)
+ return false;
+ reader.close();
+ reader = getRecordReader(splits[i],job,reporter);
+ } while (!reader.next(key,value));
+ return true;
+ } catch (IOException e) {
+ throw new Error("Cannot collect values from an intermediate result");
+ }
+ }
+ public MRData next () {
+ // hasNext() has already advanced the reader; return the current value
+ return value.data();
+ }
+ });
+ }
+
+ /** Collect a DataSet into a Bag; not implemented for Storm mode yet. */
+ @Override
+ public Bag collect(DataSet x, boolean strip) throws Exception {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+}
diff --git a/storm/src/main/java/org/apache/mrql/StormParsedInputFormat.java b/storm/src/main/java/org/apache/mrql/StormParsedInputFormat.java
new file mode 100644
index 0000000..bf66867
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/StormParsedInputFormat.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.mrql.gen.Trees;
+
+/** Input format that parses text input (CSV, XML, JSON, ...) with an MRQL Parser in Storm mode. */
+public class StormParsedInputFormat extends StormMRQLFileInputFormat{
+ /** Reads a file split and parses it into MRData records using the configured parser. */
+ public static class ParsedRecordReader implements RecordReader<MRContainer,MRContainer> {
+ final FSDataInputStream fsin;
+ final long start;
+ final long end;
+ // records produced by the most recent parser.parse() call
+ Iterator<MRData> result;
+ Parser parser;
+
+ public ParsedRecordReader ( FileSplit split,
+ Configuration conf,
+ Class<? extends Parser> parser_class,
+ Trees args ) throws IOException {
+ start = split.getStart();
+ end = start + split.getLength();
+ Path file = split.getPath();
+ FileSystem fs = file.getFileSystem(conf);
+ fsin = fs.open(split.getPath());
+ try {
+ parser = parser_class.newInstance();
+ } catch (Exception ex) {
+ throw new Error("Unrecognized parser:"+parser_class);
+ };
+ parser.initialize(args);
+ parser.open(fsin,start,end);
+ result = null;
+ }
+
+ public MRContainer createKey () {
+ return new MRContainer();
+ }
+
+ public MRContainer createValue () {
+ return new MRContainer();
+ }
+
+ public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
+ // keep slicing the input until the parser yields at least one record, or the input ends
+ while (result == null || !result.hasNext()) {
+ String s = parser.slice();
+ if (s == null)
+ return false;
+ result = parser.parse(s).iterator();
+ };
+ value.set((MRData)result.next());
+ // the key is the current byte offset in the file
+ key.set(new MR_long(fsin.getPos()));
+ return true;
+ }
+
+ public synchronized long getPos () throws IOException { return fsin.getPos(); }
+
+ public synchronized void close () throws IOException { fsin.close(); }
+
+ public float getProgress () throws IOException {
+ if (end == start)
+ return 0.0f;
+ else return Math.min(1.0f, (getPos() - start) / (float)(end - start));
+ }
+ }
+
+
+ @Override
+ public RecordReader<MRContainer, MRContainer> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ StormEvaluator.load_source_dir(); // load the parsed source parameters from a file
+ String path = ((FileSplit)split).getPath().toString();
+ ParsedDataSource ds = (ParsedDataSource)DataSource.get(path,Plan.conf);
+ return new ParsedRecordReader((FileSplit)split,job,ds.parser,(Trees)ds.args);
+ }
+
+}
diff --git a/storm/src/main/java/org/apache/mrql/StormStreaming.gen b/storm/src/main/java/org/apache/mrql/StormStreaming.gen
new file mode 100644
index 0000000..4f2d8c6
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/StormStreaming.gen
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.mrql;
+
+ import java.util.ArrayList;
+ import java.io.*;
+ import org.apache.mrql.gen.*;
+ import org.apache.storm.LocalCluster;
+ import org.apache.storm.trident.Stream;
+
+ /** Evaluates physical plans in Apache Storm mode */
+ public class StormStreaming extends StormEvaluator implements Serializable {
+
+ // the Trident streams and the plan variables they are bound to, in parallel order
+ private static ArrayList<Stream> streams;
+ private static ArrayList<String> stream_names;
+ // NOTE(review): tries is never read -- confirm it can be removed
+ private static int tries = 0;
+ private final static StormEvaluator ef = (StormEvaluator)Evaluator.evaluator;
+
+
+ /** Build a Trident stream (spout) for one stream source in the plan.
+ * NOTE(review): topology and is_dataset are assumed to be inherited from StormEvaluator -- confirm */
+ private static Stream stream_source ( Tree source, Environment env,String streamName) {
+ match source {
+ case BinaryStream(`file,_):
+ String path = ((MR_string)evalE(file,env)).get();
+ // registers the data source in the source directory as a side effect
+ new BinaryDataSource(path,Plan.conf);
+ return topology.newStream(streamName, new HDFSFileInputStream(path, is_dataset, new StormBinaryInputFormat()));
+
+ case ParsedStream(`parser,`file,...args):
+ String path = ((MR_string)evalE(file,env)).get();
+ Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
+ if (p == null)
+ throw new Error("Unknown parser: "+parser);
+ // registers the data source in the source directory as a side effect
+ new ParsedDataSource(path,p,args,Plan.conf);
+ try {
+ dump_source_dir();
+ } catch (IOException ex) {
+ throw new Error("Cannot dump source directory");
+ };
+ return topology.newStream(streamName, new HDFSFileInputStream(path, is_dataset, new StormParsedInputFormat()));
+
+ };
+ throw new Error("Unknown stream source: "+print_query(source));
+ }
+
+ /** Peel off the leading Stream(lambda(v,b),source) bindings of the plan,
+ * collecting each source stream and its variable name; return the remaining plan. */
+ private static Tree get_streams ( Tree plan, Environment env ) {
+ match plan {
+ case Stream(lambda(`v,`b),`source):
+ streams.add(stream_source(source,env,v.toString()));
+ stream_names.add(v.toString());
+ return get_streams(b,env);
+ };
+ return plan;
+ }
+
+ /** bind the pattern variables to values */
+ private static void bind_list ( Tree pattern, Tree src, Environment env, Environment rdd_env ) {
+ //System.out.println("Inside bind_list ");
+ throw new Error("Operation not implemented.");
+ }
+
+ /** Bind each collected stream to its variable and evaluate the remaining plan. */
+ private static void stream_processing ( final Tree plan, final Environment env,
+ final Environment dataset_env, final Function f ) {
+
+ if (streams.size() == 0)
+ throw new Error("No input streams in query");
+ // NOTE(review): rdds is populated but never used afterwards -- confirm it can be removed
+ ArrayList<Stream> rdds = new ArrayList<Stream>();
+ for ( Stream jd: streams )
+ rdds.add(jd);
+
+
+ final ArrayList<String> vars = stream_names;
+
+
+ long t = System.currentTimeMillis();
+ Environment rdd_env = dataset_env;
+ int i = 0;
+
+ // extend both the global stream bindings and the local environment with each stream
+ for(String name:stream_names){
+ MRData d = new MR_stream(streams.get(i));
+ global_streams = new Environment(vars.get(i),d,global_streams);
+ rdd_env = new Environment(vars.get(i++),d,rdd_env);
+ }
+
+ match plan {
+ case lambda(`pat,`e):
+ // incremental streaming path (bind_list is not implemented yet)
+ bind_list(pat,e,env,rdd_env);
+ f.eval(null);
+ case _: // non-incremental streaming
+ final MRData result = ef.evalE(plan,env);
+ };
+
+}
+
+/** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
+final public static void evaluate ( Tree plan, Environment env, Environment dataset_env, Function f ) {
+ streams = new ArrayList<Stream>();
+ stream_names = new ArrayList<String>();
+ match plan {
+ case lambda(`p,`b):
+ b = get_streams(b,env);
+ stream_processing(#<lambda(`p,`b)>,env,null,f);
+ case _: // non-incremental streaming
+ plan = get_streams(plan,env);
+ stream_processing(plan,env,dataset_env,f);
+ };
+
+ org.apache.storm.Config conf = new org.apache.storm.Config();
+ conf.setFallBackOnJavaSerialization(false);
+ try{
+
+ conf.setMaxSpoutPending(20);
+ //StormSubmitter.submitTopology("mrqlstormevaluator",conf,topology.build());
+ // run the assembled topology on a local cluster for a fixed 15-second window
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("mrqlstormevaluator", conf, topology.build());
+ Thread.sleep(15000);
+ cluster.shutdown();
+ }
+ catch(Exception e2){
+ e2.printStackTrace();
+ }
+}
+}
diff --git a/storm/src/main/java/org/apache/mrql/StreamDataSource.java b/storm/src/main/java/org/apache/mrql/StreamDataSource.java
new file mode 100644
index 0000000..8f49fc8
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/StreamDataSource.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+/** A DataSource backed by a Trident Stream rather than a file. */
+public class StreamDataSource extends DataSource implements Serializable {
+ Stream stream;
+ // NOTE(review): numOfRecords is never assigned or read -- confirm it can be removed
+ long numOfRecords;
+
+ public StreamDataSource(Stream stream) {
+ super();
+ this.stream = stream;
+ }
+
+ @Override
+ public long size(Configuration conf) {
+ return super.size(conf);
+ }
+
+ /** Collect elements of the stream into a list.
+ * NOTE(review): num is ignored, and each() only registers an operation on the topology,
+ * so the returned list is likely still empty when this method returns -- confirm */
+ @Override
+ public List<MRData> take(int num) {
+ final List<MRData> data = new ArrayList<MRData>();
+ stream.each(stream.getOutputFields(),new BaseFunction() {
+ @Override
+ public void execute(TridentTuple tuple, TridentCollector collector) {
+ System.out.println("output: "+tuple);
+ MRData value = (MRData)tuple.get(0);
+ data.add(value);
+ }
+ },new Fields("finaloutput"));
+ return data;
+ }
+
+ // TODO: reduce is an unimplemented stub; it always returns null
+ @Override
+ public MRData reduce(MRData zero,final Function acc) {
+ return null;
+ }
+}
diff --git a/storm/src/main/java/org/apache/mrql/utils/PrintUtil.java b/storm/src/main/java/org/apache/mrql/utils/PrintUtil.java
new file mode 100644
index 0000000..79b2ee7
--- /dev/null
+++ b/storm/src/main/java/org/apache/mrql/utils/PrintUtil.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql.utils;
+
+import java.util.Map;
+import org.apache.storm.trident.operation.Filter;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+
+/** Debugging filter: logs every tuple that flows through and keeps all of them. */
+public class PrintUtil implements Filter{
+
+ @Override
+ public boolean isKeep(TridentTuple tuple) {
+ System.out.println("output: "+tuple);
+ // never drop a tuple; this filter exists only for its logging side effect
+ return true;
+ }
+
+ @Override
+ public void prepare(Map conf, TridentOperationContext context) {
+ // stateless: nothing to set up
+ }
+
+ @Override
+ public void cleanup() {
+ // stateless: nothing to release
+ }
+}
diff --git a/storm/src/test/java/org/apache/mrql/StormEvaluatorLocalModeTest.java b/storm/src/test/java/org/apache/mrql/StormEvaluatorLocalModeTest.java
new file mode 100644
index 0000000..ce1ad9f
--- /dev/null
+++ b/storm/src/test/java/org/apache/mrql/StormEvaluatorLocalModeTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+/** Runs the generic EvaluatorTest suite against the Storm evaluator in local mode. */
+public class StormEvaluatorLocalModeTest extends EvaluatorTest {
+
+ /** Clean up intermediate plan files after each test. */
+ public void tearDown() throws IOException {
+ super.tearDown();
+ Plan.clean();
+ }
+
+ /** Configure the global MRQL mode flags and build a Storm evaluator for local execution. */
+ @Override
+ protected Evaluator createEvaluator() throws Exception {
+ // enable exactly the Storm back end
+ Config.bsp_mode = false;
+ Config.spark_mode = false;
+ Config.flink_mode = false;
+ Config.storm_mode = true;
+ Config.map_reduce_mode = false;
+
+ Evaluator.evaluator = new StormEvaluator();
+
+ Config.quiet_execution = true;
+
+ String[] args = new String[] { "-local", "-storm" };
+
+ // removed the dead "Configuration conf = null;" initialization: conf is always produced here
+ Configuration conf = Evaluator.evaluator.new_configuration();
+ GenericOptionsParser gop = new GenericOptionsParser(conf, args);
+ conf = gop.getConfiguration();
+
+ args = gop.getRemainingArgs();
+
+ Config.hadoop_mode = true;
+ Config.testing = true;
+ Config.parse_args(args, conf);
+
+ Evaluator.evaluator.init(conf);
+
+ return Evaluator.evaluator;
+ }
+}
diff --git a/storm/src/test/java/org/apache/mrql/StormQueryLocalModeTest.java b/storm/src/test/java/org/apache/mrql/StormQueryLocalModeTest.java
new file mode 100644
index 0000000..e3a71c5
--- /dev/null
+++ b/storm/src/test/java/org/apache/mrql/StormQueryLocalModeTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+/** Runs the generic QueryTest suite against the Storm evaluator.
+ * Most query tests are disabled (commented out) until the Storm back end supports them. */
+public class StormQueryLocalModeTest extends QueryTest {
+
+ /** Clean up intermediate plan files after each test. */
+ public void tearDown() throws IOException {
+ super.tearDown();
+ Plan.clean();
+ }
+
+ /** Configure the global MRQL mode flags and build a Storm evaluator for this test run. */
+ @Override
+ protected Evaluator createEvaluator() throws Exception {
+ // NOTE(review): this initial Configuration is discarded below by new_configuration()
+ Configuration conf = new Configuration();
+
+ Config.bsp_mode = false;
+ Config.spark_mode = false;
+ Config.flink_mode = false;
+ Config.storm_mode = true;
+ Config.map_reduce_mode = false;
+
+ Evaluator.evaluator = new StormEvaluator();
+
+ Config.quiet_execution = true;
+
+ // NOTE(review): "-dist" in a LocalMode test looks inconsistent with the "-local" flag used
+ // by StormEvaluatorLocalModeTest -- confirm distributed mode is intended here
+ String[] args = new String[] { "-dist", "-storm","-stream","5" };
+
+ conf = Evaluator.evaluator.new_configuration();
+ GenericOptionsParser gop = new GenericOptionsParser(conf, args);
+ conf = gop.getConfiguration();
+
+ args = gop.getRemainingArgs();
+
+ Config.hadoop_mode = true;
+ Config.testing = true;
+ Config.parse_args(args, conf);
+
+ Evaluator.evaluator.init(conf);
+
+ return Evaluator.evaluator;
+ }
+
+ // the only query test currently enabled for the Storm back end
+ @Override
+ public void testCore() throws Exception {
+ assertEquals(0, queryAndCompare(new File(queryDir, "core_1.mrql"), resultDir));
+ }
+
+ @Override
+ public void testDistinct() throws Exception {
+ //assertEquals(0, queryAndCompare(new File(queryDir, "distinct_1.mrql"), resultDir));
+ }
+
+ @Override
+ public void testFactorization() throws Exception {
+ // if (!Config.bsp_mode) // matrix factorization needs at least 3 nodes in BSP Hama mode
+ //assertEquals(0, queryAndCompare(new File(queryDir, "factorization_1.mrql"), resultDir));
+ }
+
+ @Override
+ public void testGroupBy() throws Exception {
+ //assertEquals(0, queryAndCompare(new File(queryDir, "group_by_1.mrql"), resultDir));
+ }
+
+ @Override
+ public void testGroupByHaving() throws Exception {
+ //assertEquals(0, queryAndCompare(new File(queryDir, "group_by_having_1.mrql"), resultDir));
+ }
+
+ @Override
+ public void testGroupByOrderBy() throws Exception {
+ //assertEquals(0, queryAndCompare(new File(queryDir, "group_by_order_by_1.mrql"), resultDir));
+ //assertEquals(0, queryAndCompare(new File(queryDir, "group_by_order_by_2.mrql"), resultDir));
+ }
+
+ @Override
+ public void testJoinGroupBy() throws Exception {
+ //assertEquals(0, queryAndCompare(new File(queryDir, "join_group_by_1.mrql"), resultDir));
+ //assertEquals(0, queryAndCompare(new File(queryDir, "join_group_by_2.mrql"), resultDir));
+ //assertEquals(0, queryAndCompare(new File(queryDir, "join_group_by_3.mrql"), resultDir));
+ }
+
+ @Override
+ public void testJoinOrderBy() throws Exception {
+ //assertEquals(0, queryAndCompare(new File(queryDir, "join_order_by_1.mrql"), resultDir));
+ }
+
+ @Override
+ public void testJoins() throws Exception {
+ //assertEquals(0, queryAndCompare(new File(queryDir, "joins_1.mrql"), resultDir));
+ }
+
+ @Override
+ public void testKmeans() throws Exception {
+ //assertEquals(0, queryAndCompare(new File(queryDir, "kmeans_1.mrql"), resultDir));
+ }
+
+ @Override
+ public void testLoop() throws Exception {
+ //assertEquals(0, queryAndCompare(new File(queryDir, "loop_1.mrql"), resultDir));
+ }
+
+ @Override
+ public void testMatrix() throws Exception {
+ //assertEquals(0, queryAndCompare(new File(queryDir, "matrix_1.mrql"), resultDir));
+ }
+
+ @Override
+ public void testNestedSelect() throws Exception {
+ //assertEquals(0, queryAndCompare(new File(queryDir, "nested_select_1.mrql"), resultDir));
+ }
+
+ @Override
+ public void testOrderBy() throws Exception {
+ //assertEquals(0, queryAndCompare(new File(queryDir, "order_by_1.mrql"), resultDir));
+ }
+
+ @Override
+ public void testPagerank() throws Exception {
+ //assertEquals(0, queryAndCompare(new File(queryDir, "pagerank_1.mrql"), resultDir));
+ }
+
+ @Override
+ public void testRelationalJoin() throws Exception {
+ //assertEquals(0, queryAndCompare(new File(queryDir, "relational_join_1.mrql"), resultDir));
+ }
+
+ @Override
+ public void testTotalAggregation() throws Exception {
+ //assertEquals(0, queryAndCompare(new File(queryDir, "total_aggregation_1.mrql"), resultDir));
+ //assertEquals(0, queryAndCompare(new File(queryDir, "total_aggregation_2.mrql"), resultDir));
+ }
+
+ @Override
+ public void testUdf() throws Exception {
+ //assertEquals(0, queryAndCompare(new File(queryDir, "udf_1.mrql"), resultDir));
+ }
+
+ @Override
+ public void testUserAggregation() throws Exception {
+ //assertEquals(0, queryAndCompare(new File(queryDir, "user_aggregation_1.mrql"), resultDir));
+ }
+
+ @Override
+ public void testXml() throws Exception {
+ //assertEquals(0, queryAndCompare(new File(queryDir, "xml_1.mrql"), resultDir));
+ // assertEquals(0, queryAndCompare(new File(queryDir, "xml_2.mrql"), resultDir));
+ }
+}