Merge branch 'master' into STORM-1040
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4eb137b..f513a7f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,6 @@
 ## 0.11.0
+ * STORM-1060: Serialize Calcite plans into JSON format.
+ * STORM-1062: Establish the basic structure of the code generator.
  * STORM-1341: Let topology have own heartbeat timeout for multilang subprocess
  * STORM-1207: Added flux support for IWindowedBolt
  * STORM-1352: Trident should support writing to multiple Kafka clusters.
diff --git a/external/sql/README.md b/external/sql/README.md
new file mode 100644
index 0000000..078acc6
--- /dev/null
+++ b/external/sql/README.md
@@ -0,0 +1,22 @@
+# Storm SQL
+
+Compile SQL queries to Storm topologies.
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
diff --git a/external/sql/pom.xml b/external/sql/pom.xml
new file mode 100644
index 0000000..7884d39
--- /dev/null
+++ b/external/sql/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>sql</artifactId>
+    <packaging>pom</packaging>
+
+    <developers>
+        <developer>
+            <id>haohui</id>
+            <name>Haohui Mai</name>
+            <email>ricetons@gmail.com</email>
+        </developer>
+    </developers>
+
+    <modules>
+        <module>storm-sql-core</module>
+        <module>storm-sql-runtime</module>
+        <module>storm-sql-kafka</module>
+    </modules>
+</project>
diff --git a/external/sql/storm-sql-core/pom.xml b/external/sql/storm-sql-core/pom.xml
new file mode 100644
index 0000000..399d215
--- /dev/null
+++ b/external/sql/storm-sql-core/pom.xml
@@ -0,0 +1,245 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-sql-core</artifactId>
+
+    <developers>
+        <developer>
+            <id>haohui</id>
+            <name>Haohui Mai</name>
+            <email>ricetons@gmail.com</email>
+        </developer>
+    </developers>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.calcite</groupId>
+            <artifactId>calcite-core</artifactId>
+            <version>${calcite.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <sourceDirectory>src/jvm</sourceDirectory>
+        <testSourceDirectory>src/test</testSourceDirectory>
+        <plugins>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-fmpp-resources</id>
+                        <phase>initialize</phase>
+                        <goals>
+                            <goal>copy-resources</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${project.build.directory}/codegen</outputDirectory>
+                            <resources>
+                                <resource>
+                                    <directory>src/codegen</directory>
+                                    <filtering>false</filtering>
+                                </resource>
+                            </resources>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>copy-java-sources</id>
+                        <phase>process-sources</phase>
+                        <goals>
+                            <goal>copy-resources</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${basedir}/target/classes/</outputDirectory>
+                            <resources>
+                                <resource>
+                                    <directory>src/jvm</directory>
+                                    <filtering>true</filtering>
+                                </resource>
+                                <resource>
+                                    <directory>src/test</directory>
+                                    <filtering>true</filtering>
+                                </resource>
+                                <resource>
+                                    <directory>target/generated-sources</directory>
+                                    <!-- <include>*/org</include> -->
+                                    <filtering>true</filtering>
+                                </resource>
+                            </resources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.2</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <!-- Extract parser grammar template from calcite-core.jar and put
+                     it under ${project.build.directory} where all freemarker templates are. -->
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>2.8</version>
+                <executions>
+                    <execution>
+                        <id>unpack-parser-template</id>
+                        <phase>initialize</phase>
+                        <goals>
+                            <goal>unpack</goal>
+                        </goals>
+                        <configuration>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>org.apache.calcite</groupId>
+                                    <artifactId>calcite-core</artifactId>
+                                    <type>jar</type>
+                                    <overWrite>true</overWrite>
+                                    <outputDirectory>${project.build.directory}/</outputDirectory>
+                                    <includes>**/Parser.jj</includes>
+                                </artifactItem>
+                            </artifactItems>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>com.googlecode.fmpp-maven-plugin</groupId>
+                <artifactId>fmpp-maven-plugin</artifactId>
+                <version>1.0</version>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.freemarker</groupId>
+                        <artifactId>freemarker</artifactId>
+                        <version>2.3.19</version>
+                    </dependency>
+                </dependencies>
+                <executions>
+                    <execution>
+                        <id>generate-fmpp-sources</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                        <configuration>
+                            <cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
+                            <outputDirectory>target/generated-sources</outputDirectory>
+                            <templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.5</version>
+                <executions>
+                    <execution>
+                        <id>add-generated-sources</id>
+                        <phase>process-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>${project.build.directory}/generated-sources</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>javacc-maven-plugin</artifactId>
+                <version>2.4</version>
+                <executions>
+                    <execution>
+                        <phase>generate-sources</phase>
+                        <id>javacc</id>
+                        <goals>
+                            <goal>javacc</goal>
+                        </goals>
+                        <configuration>
+                            <sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
+                            <includes>
+                                <include>**/Parser.jj</include>
+                            </includes>
+                            <lookAhead>2</lookAhead>
+                            <isStatic>false</isStatic>
+                            <outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <systemPropertyVariables>
+                        <java.library.path>${project.build.directory}/lib</java.library.path>
+                    </systemPropertyVariables>
+                    <forkMode>once</forkMode>
+                    <argLine>-Djava.library.path=${project.build.directory}/lib</argLine>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/external/sql/storm-sql-core/src/codegen/config.fmpp b/external/sql/storm-sql-core/src/codegen/config.fmpp
new file mode 100644
index 0000000..be5a792
--- /dev/null
+++ b/external/sql/storm-sql-core/src/codegen/config.fmpp
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+data: {
+  parser:                   tdd(../data/Parser.tdd)
+}
+
+freemarkerLinks: {
+  includes: includes/
+}
diff --git a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
new file mode 100644
index 0000000..db3a675
--- /dev/null
+++ b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
@@ -0,0 +1,64 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  # Generated parser implementation class package and name
+  package: "org.apache.storm.sql.parser.impl",
+  class: "StormParserImpl",
+
+  # List of import statements.
+  imports: [
+    "org.apache.calcite.sql.validate.*",
+    "org.apache.calcite.util.*",
+    "org.apache.storm.sql.parser.*",
+    "java.util.*"
+  ]
+
+  # List of keywords.
+  keywords: [
+    "LOCATION",
+    "INPUTFORMAT",
+    "OUTPUTFORMAT",
+    "STORED",
+    "TBLPROPERTIES",
+  ]
+
+  # List of methods for parsing custom SQL statements.
+  statementParserMethods: [
+    "SqlCreateTable()"
+  ]
+
+  # List of methods for parsing custom literals.
+  # Example: ParseJsonLiteral().
+  literalParserMethods: [
+  ]
+
+  # List of methods for parsing custom data types.
+  dataTypeParserMethods: [
+  ]
+
+  # List of files in @includes directory that have parser method
+  # implementations for custom SQL statements, literals or types
+  # given as part of "statementParserMethods", "literalParserMethods" or
+  # "dataTypeParserMethods".
+  implementationFiles: [
+    "parserImpls.ftl"
+  ]
+
+  includeCompoundIdentifier: true,
+  includeBraces: true,
+  includeAdditionalDeclarations: false
+}
diff --git a/external/sql/storm-sql-core/src/codegen/includes/license.ftl b/external/sql/storm-sql-core/src/codegen/includes/license.ftl
new file mode 100644
index 0000000..7e66353
--- /dev/null
+++ b/external/sql/storm-sql-core/src/codegen/includes/license.ftl
@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
diff --git a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
new file mode 100644
index 0000000..72a8546
--- /dev/null
+++ b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
@@ -0,0 +1,86 @@
+<#-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+
+
+private void ColumnDef(List<ColumnDefinition> list) :  // column_def: name type [PRIMARY KEY [ASC|DESC]]
+{
+    SqlParserPos pos;
+    SqlIdentifier name;
+    SqlDataTypeSpec type;
+    ColumnConstraint constraint = null;  // stays null when there is no PRIMARY KEY clause
+    SqlMonotonicity monotonicity = SqlMonotonicity.NOT_MONOTONIC;  // used when ASC/DESC is omitted
+}
+{
+    name = SimpleIdentifier() { pos = getPos(); }
+    type = DataType()
+    [
+      <PRIMARY> <KEY>
+      [ <ASC>   { monotonicity = SqlMonotonicity.INCREASING; }
+      | <DESC>  { monotonicity = SqlMonotonicity.DECREASING; }
+      ]
+      { constraint = new ColumnConstraint.PrimaryKey(monotonicity, getPos()); }
+    ]
+    {
+        list.add(new ColumnDefinition(name, type, constraint, pos));  // appends to the caller's list
+    }
+}
+
+SqlNodeList ColumnDefinitionList() :  // '(' column_def ( ',' column_def )* ')' -- at least one column
+{
+    SqlParserPos pos;
+    List<ColumnDefinition> list = Lists.newArrayList();
+}
+{
+    <LPAREN> { pos = getPos(); }
+    ColumnDef(list)
+    ( <COMMA> ColumnDef(list) )*
+    <RPAREN> {
+        return new SqlNodeList(list, pos.plus(getPos()));  // position combines '(' with ')'
+    }
+}
+
+/**
+ * Grammar accepted by this production (there is no IF NOT EXISTS clause):
+ * CREATE EXTERNAL TABLE ( database_name '.' )? table_name '(' column_def ( ',' column_def )* ')'
+ *   ( STORED AS INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname )?
+ *   LOCATION location_uri
+ *   ( TBLPROPERTIES tbl_properties )?
+ *   ( AS select_stmt )?
+ */
+SqlNode SqlCreateTable() :
+{
+    SqlParserPos pos;
+    SqlIdentifier tblName;
+    SqlNodeList fieldList;
+    SqlNode location;
+    SqlNode input_format_class_name = null, output_format_class_name = null;
+    SqlNode tbl_properties = null;
+    SqlNode select = null;
+}
+{
+    <CREATE> { pos = getPos(); }
+    <EXTERNAL> <TABLE>
+    tblName = CompoundIdentifier()
+    fieldList = ColumnDefinitionList()
+    [
+      <STORED> <AS>
+      <INPUTFORMAT> input_format_class_name = StringLiteral()
+      <OUTPUTFORMAT> output_format_class_name = StringLiteral()
+    ]
+    <LOCATION>
+    location = StringLiteral()
+    [ <TBLPROPERTIES> tbl_properties = StringLiteral() ]
+    [ <AS> select = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) ] {
+        return new SqlCreateTable(pos, tblName, fieldList,
+        input_format_class_name, output_format_class_name, location,
+        tbl_properties, select);
+    }
+}
\ No newline at end of file
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
new file mode 100644
index 0000000..6859f8e
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSql.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql;
+
+import org.apache.storm.sql.runtime.ChannelHandler;
+
+/**
+ * Entry point for executing SQL statements over streaming data through a
+ * standalone, interactive interface.
+ * <p>
+ * StormSql is intended to be stateless, so the data definition language
+ * (DDL) statements and the query statements that rely on them must be
+ * submitted together in the same batch.
+ */
+public abstract class StormSql {
+  public static StormSql construct() {
+    return new StormSqlImpl();
+  }
+
+  public abstract void execute(
+      Iterable<String> statements, ChannelHandler handler) throws Exception;
+}
+
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
new file mode 100644
index 0000000..2350422
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.storm.sql.parser.ColumnConstraint;
+import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
+import org.apache.storm.sql.parser.ColumnDefinition;
+import org.apache.storm.sql.parser.SqlCreateTable;
+import org.apache.storm.sql.parser.StormParser;
+import org.apache.storm.sql.runtime.*;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.storm.sql.compiler.CompilerUtil.TableBuilderInfo;
+
+/** {@link StormSql} implementation built on the Calcite planner and the standalone PlanCompiler. */
+class StormSqlImpl extends StormSql {
+  private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+      RelDataTypeSystem.DEFAULT);
+  private final SchemaPlus schema = Frameworks.createRootSchema(true);
+
+  /** Handles CREATE TABLE statements as DDL; plans, compiles and initializes all others. */
+  @Override
+  public void execute(Iterable<String> statements, ChannelHandler result)
+      throws Exception {
+    Map<String, DataSource> dataSources = new HashMap<>();
+    for (String sql : statements) {
+      StormParser parser = new StormParser(sql);
+      SqlNode node = parser.impl().parseSqlStmtEof();
+      if (node instanceof SqlCreateTable) {
+        handleCreateTable((SqlCreateTable) node, dataSources);
+      } else {
+        FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
+            schema).build();
+        Planner planner = Frameworks.getPlanner(config);
+        SqlNode parse = planner.parse(sql);
+        SqlNode validate = planner.validate(parse);
+        RelNode tree = planner.convert(validate);
+        PlanCompiler compiler = new PlanCompiler(typeFactory);
+        AbstractValuesProcessor proc = compiler.compile(tree);
+        proc.initialize(dataSources, result);
+      }
+    }
+  }
+
+  /** Registers the table and its data source; throws RuntimeException if either check fails. */
+  private void handleCreateTable(
+      SqlCreateTable n, Map<String, DataSource> dataSources) {
+    TableBuilderInfo builder = new TableBuilderInfo(typeFactory);
+    List<FieldInfo> fields = new ArrayList<>();
+    for (ColumnDefinition col : n.fieldList()) {
+      builder.field(col.name(), col.type(), col.constraint());
+      RelDataType dataType = col.type().deriveType(typeFactory);
+      Class<?> javaType = (Class<?>)typeFactory.getJavaClass(dataType);
+      boolean isPrimary = col.constraint() instanceof ColumnConstraint.PrimaryKey;
+      fields.add(new FieldInfo(col.name(), javaType, isPrimary));
+    }
+
+    Table table = builder.build();
+    DataSource ds = DataSourcesRegistry.construct(n.location(), n
+        .inputFormatClass(), n.outputFormatClass(), fields);
+    if (ds == null) {
+      throw new RuntimeException("Cannot construct data source for " + n.tableName());
+    } else if (dataSources.containsKey(n.tableName())) {
+      throw new RuntimeException("Duplicated definition for table " + n.tableName());
+    }
+    // Touch the shared schema only after both checks pass, so a rejected table leaves no entry.
+    schema.add(n.tableName(), table);
+    dataSources.put(n.tableName(), ds);
+  }
+}
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
new file mode 100644
index 0000000..30ea0e3
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.*;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+import org.apache.storm.sql.parser.ColumnConstraint;
+
+import java.util.ArrayList;
+
+import static org.apache.calcite.rel.RelFieldCollation.Direction;
+import static org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING;
+import static org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING;
+import static org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import static org.apache.calcite.sql.validate.SqlMonotonicity.INCREASING;
+
+public class CompilerUtil {
+  /** Renders s as a quoted Java string literal; a null s yields the text null or an empty literal. */
+  public static String escapeJavaString(String s, boolean nullMeansNull) {
+    if (s == null) {
+      return nullMeansNull ? "null" : "\"\"";
+    }
+    // Backslashes are escaped first so that the escapes added afterwards are not doubled.
+    String escaped = Util.replace(s, "\\", "\\\\");
+    escaped = Util.replace(escaped, "\"", "\\\"");
+    escaped = Util.replace(escaped, "\n\r", "\\n"); // an LF+CR pair collapses into a single \n escape
+    escaped = Util.replace(Util.replace(escaped, "\n", "\\n"), "\r", "\\r");
+    return "\"" + escaped + "\"";
+  }
+
+  /** Fluent builder that assembles a Calcite {@link StreamableTable} from column definitions. */
+  public static class TableBuilderInfo {
+    private final RelDataTypeFactory typeFactory;
+    public TableBuilderInfo(RelDataTypeFactory typeFactory) {
+      this.typeFactory = typeFactory;
+    }
+
+    private static class FieldType {
+      private final String name;
+      private final RelDataType relDataType;
+      private FieldType(String name, RelDataType relDataType) {
+        this.name = name;
+        this.relDataType = relDataType;
+      }
+    }
+
+    private final ArrayList<FieldType> fields = new ArrayList<>();
+    private final ArrayList<Object[]> rows = new ArrayList<>();
+    private int primaryKey = -1; // index of the primary-key column; -1 while none is declared
+    private SqlMonotonicity primaryKeyMonotonicity;
+    private Statistic stats;
+
+    /** Appends a column whose type is created from a bare SQL type name. */
+    public TableBuilderInfo field(String name, SqlTypeName type) {
+      fields.add(new FieldType(name, typeFactory.createSqlType(type)));
+      return this;
+    }
+
+    /** Appends a column; a PrimaryKey constraint makes it the primary key (at most one is allowed). */
+    public TableBuilderInfo field(String name, SqlDataTypeSpec type, ColumnConstraint constraint) {
+      RelDataType dataType = type.deriveType(typeFactory);
+      if (constraint instanceof ColumnConstraint.PrimaryKey) {
+        Preconditions.checkState(primaryKey == -1, "There are more than one primary key in the table");
+        primaryKey = fields.size(); // position the new column is about to take
+        primaryKeyMonotonicity = ((ColumnConstraint.PrimaryKey) constraint).monotonicity();
+      }
+      fields.add(new FieldType(name, dataType));
+      return this;
+    }
+
+    /** Supplies explicit statistics; when set, build() uses them instead of derived ones. */
+    public TableBuilderInfo statistics(Statistic stats) {
+      this.stats = stats;
+      return this;
+    }
+
+    @VisibleForTesting
+    public TableBuilderInfo rows(Object[] data) {
+      rows.add(data);
+      return this;
+    }
+
+    /** Builds the table; stream() exposes the inner table that holds the row type and statistics. */
+    public StreamableTable build() {
+      final Statistic stat = buildStatistic();
+      final Table tbl = new Table() {
+        @Override
+        public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
+          RelDataTypeFactory.FieldInfoBuilder b = relDataTypeFactory.builder();
+          for (FieldType f : fields) {
+            b.add(f.name, f.relDataType);
+          }
+          return b.build();
+        }
+
+        @Override
+        public Statistic getStatistic() {
+          return stat != null ? stat : Statistics.of(rows.size(), ImmutableList.<ImmutableBitSet>of());
+        }
+
+        @Override
+        public Schema.TableType getJdbcTableType() {
+          return Schema.TableType.TABLE;
+        }
+      };
+
+      return new StreamableTable() {
+        @Override
+        public Table stream() {
+          return tbl;
+        }
+
+        @Override
+        public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
+          return tbl.getRowType(relDataTypeFactory);
+        }
+
+        @Override
+        public Statistic getStatistic() {
+          return tbl.getStatistic();
+        }
+
+        @Override
+        public Schema.TableType getJdbcTableType() {
+          return Schema.TableType.TABLE;
+        }
+      };
+    }
+
+    /** Returns the explicit statistics if set, null when no primary key exists, else key-derived ones. */
+    private Statistic buildStatistic() {
+      if (stats != null || primaryKey == -1) {
+        return stats;
+      }
+      Direction dir = primaryKeyMonotonicity == INCREASING ? ASCENDING : DESCENDING;
+      RelFieldCollation collation = new RelFieldCollation(primaryKey, dir, NullDirection.UNSPECIFIED);
+      // The first argument is the row count, so pass rows.size() as build() does, not fields.size().
+      return Statistics.of(rows.size(), ImmutableList.of(ImmutableBitSet.of(primaryKey)),
+          ImmutableList.of(RelCollations.of(collation)));
+    }
+  }
+}
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
new file mode 100644
index 0000000..01024f0
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/ExprCompiler.java
@@ -0,0 +1,471 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.calcite.adapter.enumerable.NullPolicy;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.tree.Primitive;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.*;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.util.NlsString;
+import org.apache.calcite.util.Util;
+import org.apache.storm.sql.runtime.StormSqlFunctions;
+
+import java.io.PrintWriter;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+import java.math.BigDecimal;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.calcite.sql.fun.SqlStdOperatorTable.*;
+
+/**
+ * Compile RexNode on top of the Tuple abstraction.
+ */
+public class ExprCompiler implements RexVisitor<String> {
+  private final PrintWriter pw;
+  private final JavaTypeFactory typeFactory;
+  private static final ImpTable IMP_TABLE = new ImpTable();
+  private int nameCount;
+
+  public ExprCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
+    this.pw = pw; // generated statements are printed to this writer
+    this.typeFactory = typeFactory; // resolves the Java class used for each SQL type
+  }
+
+  @Override
+  public String visitInputRef(RexInputRef rexInputRef) {
+    String name = reserveName(); // local that will hold the referenced input field
+    String typeName = javaTypeName(rexInputRef);
+    // Concatenate instead of %d: Formatter localizes digits, which could corrupt the generated source.
+    pw.print(typeName + " " + name + " = (" + typeName + ")(_data.get(" + rexInputRef.getIndex() + "));\n");
+    return name;
+  }
+
+  @Override
+  public String visitLocalRef(RexLocalRef rexLocalRef) {
+    throw new UnsupportedOperationException(); // local references are not compiled by this visitor
+  }
+
+  @Override
+  public String visitLiteral(RexLiteral rexLiteral) {
+    Object v = rexLiteral.getValue();
+    RelDataType ty = rexLiteral.getType();
+    switch(rexLiteral.getTypeName()) { // every branch returns Java source text for the literal
+      case BOOLEAN:
+        return v.toString();
+      case CHAR:
+        return CompilerUtil.escapeJavaString(((NlsString) v).getValue(), true);
+      case NULL:
+        return "((" + ((Class<?>)typeFactory.getJavaClass(ty)).getCanonicalName() + ")null)";
+      case DOUBLE:
+      case BIGINT:
+      case DECIMAL:
+        switch (ty.getSqlTypeName()) {
+          case TINYINT:
+          case SMALLINT:
+          case INTEGER:
+            return Long.toString(((BigDecimal) v).longValueExact());
+          case BIGINT:
+            return Long.toString(((BigDecimal)v).longValueExact()) + 'L';
+          case DECIMAL:
+          case FLOAT:
+          case REAL:
+          case DOUBLE:
+            return Util.toScientificNotation((BigDecimal) v);
+          default: // fail loudly: this used to fall through and return null into the generated code
+            throw new UnsupportedOperationException("Unsupported numeric literal type " + ty);
+        }
+      default:
+        throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  public String visitCall(RexCall rexCall) {
+    return IMP_TABLE.compile(this, rexCall); // the shared table picks a printer by the call's operator
+  }
+
+  @Override
+  public String visitOver(RexOver rexOver) {
+    throw new UnsupportedOperationException(); // RexOver nodes are not compiled by this visitor
+  }
+
+  @Override
+  public String visitCorrelVariable(
+      RexCorrelVariable rexCorrelVariable) {
+    throw new UnsupportedOperationException(); // correlation variables are not compiled by this visitor
+  }
+
+  @Override
+  public String visitDynamicParam(
+      RexDynamicParam rexDynamicParam) {
+    throw new UnsupportedOperationException(); // dynamic parameters are not compiled by this visitor
+  }
+
+  @Override
+  public String visitRangeRef(RexRangeRef rexRangeRef) {
+    throw new UnsupportedOperationException(); // range references are not compiled by this visitor
+  }
+
+  @Override
+  public String visitFieldAccess(
+      RexFieldAccess rexFieldAccess) {
+    throw new UnsupportedOperationException(); // field accesses are not compiled by this visitor
+  }
+
+  private String javaTypeName(RexNode node) {
+    // Canonical name of the Java class that the type factory reports for the node's type.
+    return ((Class<?>) typeFactory.getJavaClass(node.getType())).getCanonicalName();
+  }
+
+  private String reserveName() {
+    return "t" + ++nameCount; // pre-increment: names run t1, t2, ... and never repeat per compiler
+  }
+
+  // Inline comparisons are generated only for <, <=, >, >= when Primitive.ofBoxOr recognizes the type.
+  private boolean primitiveCompareExpr(SqlOperator op, RelDataType type) {
+    boolean ordering = op == LESS_THAN || op == LESS_THAN_OR_EQUAL || op == GREATER_THAN
+        || op == GREATER_THAN_OR_EQUAL;
+    return Primitive.ofBoxOr(typeFactory.getJavaClass(type)) != null && ordering;
+  }
+
+  private interface CallExprPrinter { // strategy for turning one kind of RexCall into Java source
+    String translate(ExprCompiler compiler, RexCall call); // implementations here print via compiler.pw
+  }
+
+  /**
+   * Inspired by Calcite's RexImpTable, the ImpTable class maps the operators
+   * to their corresponding implementation that generates the expressions in
+   * the format of Java source code.
+   */
+  private static class ImpTable {
+    private final Map<SqlOperator, CallExprPrinter> translators;
+
+    private ImpTable() { // registers one printer per supported operator
+      ImmutableMap.Builder<SqlOperator, CallExprPrinter> builder =
+          ImmutableMap.builder();
+      builder
+          .put(builtInMethod(UPPER, BuiltInMethod.UPPER, NullPolicy.STRICT))
+          .put(builtInMethod(LOWER, BuiltInMethod.LOWER, NullPolicy.STRICT))
+          .put(builtInMethod(INITCAP, BuiltInMethod.INITCAP, NullPolicy.STRICT))
+          .put(builtInMethod(SUBSTRING, BuiltInMethod.SUBSTRING, NullPolicy.STRICT))
+          .put(builtInMethod(CHARACTER_LENGTH, BuiltInMethod.CHAR_LENGTH, NullPolicy.STRICT))
+          .put(builtInMethod(CHAR_LENGTH, BuiltInMethod.CHAR_LENGTH, NullPolicy.STRICT))
+          .put(builtInMethod(CONCAT, BuiltInMethod.STRING_CONCAT, NullPolicy.STRICT))
+          .put(infixBinary(LESS_THAN, "<", "lt"))
+          .put(infixBinary(LESS_THAN_OR_EQUAL, "<=", "le"))
+          .put(infixBinary(GREATER_THAN, ">", "gt"))
+          .put(infixBinary(GREATER_THAN_OR_EQUAL, ">=", "ge"))
+          .put(infixBinary(EQUALS, "==", StormSqlFunctions.class, "eq"))
+          .put(infixBinary(NOT_EQUALS, "!=", StormSqlFunctions.class, "ne")) // Java spelling, not SQL's <>
+          .put(infixBinary(PLUS, "+", "plus"))
+          .put(infixBinary(MINUS, "-", "minus"))
+          .put(infixBinary(MULTIPLY, "*", "multiply"))
+          .put(infixBinary(DIVIDE, "/", "divide"))
+          .put(infixBinary(DIVIDE_INTEGER, "/", "divide"))
+          .put(expect(IS_NULL, null)) // a null expectation stands for the SQL NULL test
+          .put(expectNot(IS_NOT_NULL, null))
+          .put(expect(IS_TRUE, true))
+          .put(expectNot(IS_NOT_TRUE, true))
+          .put(expect(IS_FALSE, false))
+          .put(expectNot(IS_NOT_FALSE, false))
+          .put(AND, AND_EXPR)
+          .put(OR, OR_EXPR)
+          .put(NOT, NOT_EXPR);
+      this.translators = builder.build();
+    }
+
+    private String compile(ExprCompiler compiler, RexCall call) {
+      SqlOperator op = call.getOperator();
+      CallExprPrinter printer = translators.get(op);
+      if (printer == null) {
+        throw new UnsupportedOperationException();
+      } else {
+        return printer.translate(compiler, call);
+      }
+    }
+
+    /** Maps op to a printer that calls the given built-in method; a null nullable argument yields null. */
+    private Map.Entry<SqlOperator, CallExprPrinter> builtInMethod(
+        final SqlOperator op, final BuiltInMethod method, NullPolicy nullPolicy) {
+      if (nullPolicy != NullPolicy.STRICT) {
+        throw new UnsupportedOperationException();
+      }
+      return new AbstractMap.SimpleImmutableEntry<SqlOperator, CallExprPrinter>(op, new CallExprPrinter() {
+        @Override
+        public String translate(ExprCompiler compiler, RexCall call) {
+          final PrintWriter out = compiler.pw;
+          final String result = compiler.reserveName();
+          out.print(String.format("final %s %s;\n", compiler.javaTypeName(call), result));
+          final List<RexNode> operands = call.getOperands();
+          final List<String> argNames = new ArrayList<>();
+          for (RexNode operand : operands) {
+            argNames.add(operand.accept(compiler));
+          }
+          // The empty leading branch lets every null check below be emitted as a uniform "else if".
+          out.print("if (false) {}\n");
+          for (int i = 0; i < argNames.size(); ++i) {
+            if (operands.get(i).getType().isNullable()) {
+              out.print(String.format("else if (%2$s == null) { %1$s = null; }\n", result, argNames.get(i)));
+            }
+          }
+          out.print(String.format("else { %1$s = %2$s; }\n", result, printMethodCall(method.method, argNames)));
+          return result;
+        }
+      });
+    }
+
+    /**
+     * Maps op to a printer for a binary operator. When a backup method is given and op is not an
+     * inline primitive comparison, the printer calls the static clazz.backupMethodName(lhs, rhs);
+     * otherwise it emits the operator inline and yields null when a nullable operand is null.
+     */
+    private Map.Entry<SqlOperator, CallExprPrinter> infixBinary
+        (final SqlOperator op, final String javaOperator, final Class<?> clazz, final String backupMethodName) {
+      CallExprPrinter trans = new CallExprPrinter() {
+        @Override
+        public String translate(ExprCompiler compiler, RexCall call) {
+          int size = call.getOperands().size();
+          assert size == 2;
+          String val = compiler.reserveName();
+          RexNode op0 = call.getOperands().get(0);
+          RexNode op1 = call.getOperands().get(1);
+          PrintWriter pw = compiler.pw;
+          if (backupMethodName != null && !compiler.primitiveCompareExpr(op, op0.getType())) {
+            String lhs = op0.accept(compiler);
+            String rhs = op1.accept(compiler);
+            pw.print(String.format("%s %s = %s;\n", compiler.javaTypeName(call), val,
+                printMethodCall(clazz, backupMethodName, true, Lists.newArrayList(lhs, rhs))));
+            return val;
+          }
+
+          pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call), val));
+          String lhs = op0.accept(compiler);
+          String rhs = op1.accept(compiler);
+          pw.print("if (false) {}\n");
+          // A null operand makes the result null. These branches used to apply the operator
+          // to the operand just found to be null, which unboxes it and throws in the generated
+          // code at run time.
+          if (op0.getType().isNullable()) {
+            pw.print(String.format("else if (%2$s == null) { %1$s = null; }\n", val, lhs));
+          }
+          if (op1.getType().isNullable()) {
+            pw.print(String.format("else if (%2$s == null) { %1$s = null; }\n", val, rhs));
+          }
+          pw.print(String.format("else { %1$s = %2$s %3$s %4$s; }\n", val, lhs, javaOperator, rhs));
+          return val;
+        }
+      };
+      return new AbstractMap.SimpleImmutableEntry<>(op, trans);
+    }
+
+    private Map.Entry<SqlOperator, CallExprPrinter> infixBinary
+        (final SqlOperator op, final String javaOperator, final String backupMethodName) {
+      return infixBinary(op, javaOperator, SqlFunctions.class, backupMethodName); // backup method lives on SqlFunctions
+    }
+
+    private Map.Entry<SqlOperator, CallExprPrinter> expect(
+        SqlOperator op, final Boolean expect) {
+      return expect0(op, expect, false); // positive form: the operand IS the expected value
+    }
+
+    private Map.Entry<SqlOperator, CallExprPrinter> expectNot(
+        SqlOperator op, final Boolean expect) {
+      return expect0(op, expect, true); // negated form: the operand IS NOT the expected value
+    }
+
+    /** Printer for IS [NOT] NULL/TRUE/FALSE: a null expect tests for NULL, negate selects IS NOT. */
+    private Map.Entry<SqlOperator, CallExprPrinter> expect0(
+        SqlOperator op, final Boolean expect, final boolean negate) {
+      CallExprPrinter trans = new CallExprPrinter() {
+        @Override
+        public String translate(
+            ExprCompiler compiler, RexCall call) {
+          assert call.getOperands().size() == 1;
+          String val = compiler.reserveName();
+          RexNode operand = call.getOperands().get(0);
+          boolean nullable = operand.getType().isNullable();
+          String op = operand.accept(compiler);
+          PrintWriter pw = compiler.pw;
+          if (!nullable) {
+            if (expect == null) {
+              // A non-nullable operand is never NULL: IS NULL is false and IS NOT NULL is true.
+              pw.print(String.format("boolean %s = %b;\n", val, negate));
+            } else {
+              pw.print(String.format("boolean %s = %s == %b;\n", val, op, expect ^ negate));
+            }
+          } else {
+            String expr;
+            if (expect == null) {
+              expr = String.format("%s == null", op);
+            } else {
+              expr = String.format("%s == Boolean.%s", op, expect ? "TRUE" : "FALSE");
+            }
+            if (negate) {
+              expr = String.format("!(%s)", expr);
+            }
+            pw.print(String.format("boolean %s = %s;\n", val, expr));
+          }
+          return val;
+        }
+      };
+      return new AbstractMap.SimpleImmutableEntry<>(op, trans);
+    }
+
+
+    // If any of the arguments are false, result is false;
+    // else if any arguments are null, result is null;
+    // else true.
+    private static final CallExprPrinter AND_EXPR = new CallExprPrinter() {
+      @Override
+      public String translate(ExprCompiler compiler, RexCall call) {
+        String val = compiler.reserveName();
+        PrintWriter pw = compiler.pw;
+        pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call), val));
+        RexNode op0 = call.getOperands().get(0);
+        RexNode op1 = call.getOperands().get(1);
+        boolean lhsNullable = op0.getType().isNullable();
+        boolean rhsNullable = op1.getType().isNullable();
+        String lhs = op0.accept(compiler);
+        if (!lhsNullable) {
+          pw.print(String.format("if (!(%2$s)) { %1$s = false; }\n", val, lhs));
+          pw.print("else {\n");
+          String rhs = op1.accept(compiler);
+          pw.print(String.format("  %1$s = %2$s;\n}\n", val, rhs));
+        } else {
+          // The generated "if" is entered when lhs is null or true; a false lhs decides the result.
+          String foldedLHS = foldNullExpr(String.format("%1$s == null || %1$s", lhs), "true", op0);
+          pw.print(String.format("if (%s) {\n", foldedLHS));
+          String rhs = op1.accept(compiler);
+          String s;
+          if (rhsNullable) {
+            // false rhs -> false, null rhs -> null, true rhs -> lhs. Boolean.FALSE keeps the
+            // conditional of type Boolean so that a null lhs is not unboxed.
+            s = foldNullExpr(String.format(
+                "(%2$s != null && !(%2$s)) ? Boolean.FALSE : (%2$s == null ? null : %1$s)", lhs, rhs),
+                "null", op1);
+          } else {
+            s = String.format("!(%2$s) ? Boolean.FALSE : %1$s", lhs, rhs);
+          }
+          pw.print(String.format("  %1$s = %2$s;\n", val, s));
+          pw.print(String.format("} else { %1$s = false; }\n", val));
+        }
+        return val;
+      }
+    };
+
+    // If any of the arguments are true, result is true;
+    // else if any arguments are null, result is null;
+    // else false.
+    private static final CallExprPrinter OR_EXPR = new CallExprPrinter() {
+      @Override
+      public String translate(ExprCompiler compiler, RexCall call) {
+        String val = compiler.reserveName();
+        PrintWriter pw = compiler.pw;
+        pw.print(String.format("final %s %s;\n", compiler.javaTypeName(call), val));
+        RexNode op0 = call.getOperands().get(0);
+        RexNode op1 = call.getOperands().get(1);
+        boolean lhsNullable = op0.getType().isNullable();
+        boolean rhsNullable = op1.getType().isNullable();
+        String lhs = op0.accept(compiler);
+        if (!lhsNullable) {
+          pw.print(String.format("if (%2$s) { %1$s = true; }\n", val, lhs));
+          pw.print("else {\n");
+          String rhs = op1.accept(compiler);
+          pw.print(String.format("  %1$s = %2$s;\n}\n", val, rhs));
+        } else {
+          // The generated "if" is entered when lhs is null or false; a true lhs decides the result.
+          String foldedLHS = foldNullExpr(String.format("%1$s == null || !(%1$s)", lhs), "true", op0);
+          pw.print(String.format("if (%s) {\n", foldedLHS));
+          String rhs = op1.accept(compiler);
+          String s;
+          if (rhsNullable) {
+            // true rhs -> true, null rhs -> null, false rhs -> lhs. Boolean.TRUE keeps the
+            // conditional of type Boolean so that a null lhs is not unboxed.
+            s = foldNullExpr(String.format(
+                "(%2$s != null && %2$s) ? Boolean.TRUE : (%2$s == null ? null : %1$s)", lhs, rhs),
+                "null", op1);
+          } else {
+            s = String.format("%2$s ? Boolean.valueOf(%2$s) : %1$s", lhs, rhs);
+          }
+          pw.print(String.format("  %1$s = %2$s;\n", val, s));
+          pw.print(String.format("} else { %1$s = true; }\n", val));
+        }
+        return val;
+      }
+    };
+
+    // Negates the operand; when the result type is nullable, a null operand yields null.
+    private static final CallExprPrinter NOT_EXPR = new CallExprPrinter() {
+      @Override
+      public String translate(ExprCompiler compiler, RexCall call) {
+        final String result = compiler.reserveName();
+        final PrintWriter out = compiler.pw;
+        final RexNode operand = call.getOperands().get(0);
+        final String arg = operand.accept(compiler);
+        out.print(String.format("final %s %s;\n", compiler.javaTypeName(call), result));
+        final String value;
+        if (call.getType().isNullable()) {
+          // A literal NULL operand is folded to null while generating the code.
+          value = foldNullExpr(
+              String.format("%1$s == null ? null : !(%1$s)", arg), "null", operand);
+        } else {
+          value = String.format("!(%s)", arg);
+        }
+        out.print(String.format("%1$s = %2$s;\n", result, value));
+        return result;
+      }
+    };
+  }
+
+  /** Returns nullExpr when op is a literal of SQL type NULL, and notNullExpr otherwise. */
+  private static String foldNullExpr(String notNullExpr, String nullExpr, RexNode op) {
+    // Only a literal can be folded here; any other operand is left to the generated code.
+    if (!(op instanceof RexLiteral)) {
+      return notNullExpr;
+    }
+    return ((RexLiteral) op).getTypeName() == SqlTypeName.NULL ? nullExpr : notNullExpr;
+  }
+
+  private static String printMethodCall(Method method, List<String> args) {
+    boolean isStatic = Modifier.isStatic(method.getModifiers()); // instance calls use args[0] as receiver
+    return printMethodCall(method.getDeclaringClass(), method.getName(), isStatic, args);
+  }
+
+  private static String printMethodCall(Class<?> clazz, String method, boolean isStatic, List<String> args) {
+    if (isStatic) {
+      return String.format("%s.%s(%s)", clazz.getCanonicalName(), method, Joiner.on(',').join(args));
+    } else {
+      return String.format("%s.%s(%s)", args.get(0), method,
+          Joiner.on(',').join(args.subList(1, args.size())));
+    }
+  }
+
+}
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PostOrderRelNodeVisitor.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PostOrderRelNodeVisitor.java
new file mode 100644
index 0000000..bb7c8d1
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PostOrderRelNodeVisitor.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.*;
+import org.apache.calcite.rel.stream.Delta;
+
+/**
+ * Walks a relational plan bottom-up: the inputs of a node are traversed before the node itself
+ * is handed to the visit method matching its type.
+ *
+ * @param <T> type of the value produced by the visit methods
+ */
+public abstract class PostOrderRelNodeVisitor<T> {
+  /**
+   * Traverses the tree rooted at n. Values produced for the inputs are discarded; only the
+   * value of the visit method chosen for n (or defaultValue(n) for other node types) is returned.
+   */
+  public final T traverse(RelNode n) throws Exception {
+    for (RelNode child : n.getInputs()) {
+      traverse(child);
+    }
+
+    // Type tests run in a fixed order and the first match decides which hook is called.
+    if (n instanceof Aggregate) { return visitAggregate((Aggregate) n); }
+    if (n instanceof Calc) { return visitCalc((Calc) n); }
+    if (n instanceof Collect) { return visitCollect((Collect) n); }
+    if (n instanceof Correlate) { return visitCorrelate((Correlate) n); }
+    if (n instanceof Delta) { return visitDelta((Delta) n); }
+    if (n instanceof Exchange) { return visitExchange((Exchange) n); }
+    if (n instanceof Project) { return visitProject((Project) n); }
+    if (n instanceof Filter) { return visitFilter((Filter) n); }
+    if (n instanceof Sample) { return visitSample((Sample) n); }
+    if (n instanceof Sort) { return visitSort((Sort) n); }
+    if (n instanceof TableModify) { return visitTableModify((TableModify) n); }
+    if (n instanceof TableScan) { return visitTableScan((TableScan) n); }
+    if (n instanceof Uncollect) { return visitUncollect((Uncollect) n); }
+    if (n instanceof Window) { return visitWindow((Window) n); }
+    return defaultValue(n);
+  }
+
+  // Per-type hooks: each returns defaultValue(node) unless a subclass overrides it.
+
+  public T visitAggregate(Aggregate aggregate) throws Exception {
+    return defaultValue(aggregate);
+  }
+
+  public T visitCalc(Calc calc) throws Exception {
+    return defaultValue(calc);
+  }
+
+  public T visitCollect(Collect collect) throws Exception {
+    return defaultValue(collect);
+  }
+
+  public T visitCorrelate(Correlate correlate) throws Exception {
+    return defaultValue(correlate);
+  }
+
+  public T visitDelta(Delta delta) throws Exception {
+    return defaultValue(delta);
+  }
+
+  public T visitExchange(Exchange exchange) throws Exception {
+    return defaultValue(exchange);
+  }
+
+  public T visitProject(Project project) throws Exception {
+    return defaultValue(project);
+  }
+
+  public T visitFilter(Filter filter) throws Exception {
+    return defaultValue(filter);
+  }
+
+  public T visitSample(Sample sample) throws Exception {
+    return defaultValue(sample);
+  }
+
+  public T visitSort(Sort sort) throws Exception {
+    return defaultValue(sort);
+  }
+
+  public T visitTableModify(TableModify modify) throws Exception {
+    return defaultValue(modify);
+  }
+
+  public T visitTableScan(TableScan scan) throws Exception {
+    return defaultValue(scan);
+  }
+
+  public T visitUncollect(Uncollect uncollect) throws Exception {
+    return defaultValue(uncollect);
+  }
+
+  public T visitWindow(Window window) throws Exception {
+    return defaultValue(window);
+  }
+
+  /**
+   * Fallback result for hooks that are not overridden and for unlisted node types; null here.
+   */
+  public T defaultValue(RelNode n) {
+    return null;
+  }
+}
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
new file mode 100644
index 0000000..46009e9
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler.backends.standalone;
+
+import com.google.common.base.Joiner;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.storm.sql.compiler.CompilerUtil;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+
+/** Generates, compiles and loads a {@code Processor} class for a {@link RelNode} plan. */
+public class PlanCompiler {
+  private static final Joiner NEW_LINE_JOINER = Joiner.on("\n");
+  private static final String PACKAGE_NAME = "org.apache.storm.sql.generated";
+  // Head of the generated file: package, imports and the opening of the Processor class.
+  private static final String PROLOGUE = NEW_LINE_JOINER.join(
+      "// GENERATED CODE", "package " + PACKAGE_NAME + ";", "",
+      "import java.util.Iterator;", "import java.util.Map;",
+      "import backtype.storm.tuple.Values;",
+      "import org.apache.storm.sql.runtime.AbstractChannelHandler;",
+      "import org.apache.storm.sql.runtime.Channels;",
+      "import org.apache.storm.sql.runtime.ChannelContext;",
+      "import org.apache.storm.sql.runtime.ChannelHandler;",
+      "import org.apache.storm.sql.runtime.DataSource;",
+      "import org.apache.storm.sql.runtime.AbstractValuesProcessor;",
+      "public final class Processor extends AbstractValuesProcessor {", "");
+  // Opening of the generated initialize() method; declares the first context, "r".
+  private static final String INITIALIZER_PROLOGUE = NEW_LINE_JOINER.join(
+      "  @Override",
+      "  public void initialize(Map<String, DataSource> data,",
+      "                         ChannelHandler result) {",
+      "    ChannelContext r = Channels.chain(Channels.voidContext(), result);",
+      "");
+
+  private final JavaTypeFactory typeFactory;
+
+  public PlanCompiler(JavaTypeFactory typeFactory) {
+    this.typeFactory = typeFactory;
+  }
+
+  /** Builds the whole source text: prologue, RelNodeCompiler output, initialize(), epilogue. */
+  private String generateJavaSource(RelNode root) throws Exception {
+    StringWriter buffer = new StringWriter();
+    try (PrintWriter out = new PrintWriter(buffer)) {
+      RelNodeCompiler stageCompiler = new RelNodeCompiler(out, typeFactory);
+      printPrologue(out);
+      stageCompiler.traverse(root);
+      printMain(out, root);
+      printEpilogue(out);
+    }
+    return buffer.toString();
+  }
+
+  /** Writes initialize(): chains every stage, then opens the data source of each scanned table. */
+  private void printMain(PrintWriter pw, RelNode root) {
+    String openTable = NEW_LINE_JOINER.join(
+        "    if (!data.containsKey(%1$s))",
+        "      throw new RuntimeException(\"Cannot find table \" + %1$s);",
+        "  data.get(%1$s).open(CTX_%2$d);",
+        "");
+    Set<TableScan> tables = new HashSet<>();
+    pw.print(INITIALIZER_PROLOGUE);
+    chainOperators(pw, root, tables);
+    for (TableScan scan : tables) {
+      String qualifiedName = Joiner.on('.').join(scan.getTable().getQualifiedName());
+      String escaped = CompilerUtil.escapeJavaString(qualifiedName, true);
+      pw.print(String.format(openTable, escaped, scan.getId()));
+    }
+    pw.print("  }\n");
+  }
+
+  /**
+   * Walks the plan breadth-first from {@code root}, chaining each node's stage onto the context
+   * declared just before it, and adds every TableScan encountered to {@code tables}.
+   */
+  private void chainOperators(PrintWriter pw, RelNode root, Set<TableScan> tables) {
+    String previousCtx = "r";
+    Queue<RelNode> pending = new ArrayDeque<>();
+    pending.add(root);
+    while (!pending.isEmpty()) {
+      RelNode node = pending.poll();
+      pw.print(String.format("    ChannelContext CTX_%1$d = Channels.chain(%2$s, %3$s);\n",
+          node.getId(), previousCtx, RelNodeCompiler.getStageName(node)));
+      previousCtx = String.format("CTX_%d", node.getId());
+      if (node instanceof TableScan) {
+        tables.add((TableScan) node);
+      }
+      pending.addAll(node.getInputs());
+    }
+  }
+
+  /** Compiles the source generated for {@code plan} and returns a new Processor instance. */
+  public AbstractValuesProcessor compile(RelNode plan) throws Exception {
+    String javaCode = generateJavaSource(plan);
+    String name = PACKAGE_NAME + ".Processor";
+    ClassLoader cl = new CompilingClassLoader(getClass().getClassLoader(), name, javaCode, null);
+    return (AbstractValuesProcessor) cl.loadClass(name).newInstance();
+  }
+
+  private static void printEpilogue(PrintWriter pw) throws Exception {
+    pw.print("}\n"); // closes the generated class
+  }
+
+  private static void printPrologue(PrintWriter pw) {
+    pw.append(PROLOGUE);
+  }
+}
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
new file mode 100644
index 0000000..6d51a11
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler.backends.standalone;
+
+import com.google.common.base.Joiner;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.stream.Delta;
+import org.apache.storm.sql.compiler.ExprCompiler;
+import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor;
+
+import java.io.PrintWriter;
+
+/**
+ * Compile RelNodes into individual functions: each supported node becomes a ChannelHandler
+ * field named by getStageName; unsupported ones raise UnsupportedOperationException.
+ */
+class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
+  public static final Joiner NEW_LINE_JOINER = Joiner.on('\n');
+  private static final String STAGE_PROLOGUE = NEW_LINE_JOINER.join(
+    "  private static final ChannelHandler %1$s = ",
+    "    new AbstractChannelHandler() {",
+    "    @Override",
+    "    public void dataReceived(ChannelContext ctx, Values _data) {",
+    "");
+  private static final String STAGE_PASSTHROUGH = NEW_LINE_JOINER.join(
+      "  private static final ChannelHandler %1$s = AbstractChannelHandler.PASS_THROUGH;",
+      "");
+
+  private final PrintWriter pw;
+  private final JavaTypeFactory typeFactory;
+
+  RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
+    this.pw = pw;
+    this.typeFactory = typeFactory;
+  }
+
+  @Override
+  public Void visitDelta(Delta delta) throws Exception {
+    pw.print(String.format(STAGE_PASSTHROUGH, getStageName(delta)));
+    return null;
+  }
+
+  /** Emits a stage that forwards {@code _data} only when the compiled condition holds. */
+  @Override
+  public Void visitFilter(Filter filter) throws Exception {
+    beginStage(filter);
+    ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
+    String condition = filter.getCondition().accept(compiler);
+    pw.print(String.format("    if (%s) { ctx.emit(_data); }\n", condition));
+    endStage();
+    return null;
+  }
+
+  /** Emits a stage that sends a new Values built from the compiled child expressions. */
+  @Override
+  public Void visitProject(Project project) throws Exception {
+    beginStage(project);
+    ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
+    int size = project.getChildExps().size();
+    String[] exprs = new String[size];
+    for (int i = 0; i < size; ++i) {
+      exprs[i] = project.getChildExps().get(i).accept(compiler);
+    }
+    pw.print(String.format("    ctx.emit(new Values(%s));\n", Joiner.on(',').join(exprs)));
+    endStage();
+    return null;
+  }
+
+  @Override
+  public Void defaultValue(RelNode n) {
+    throw new UnsupportedOperationException("Unsupported RelNode: " + n);
+  }
+
+  @Override
+  public Void visitTableScan(TableScan scan) throws Exception {
+    pw.print(String.format(STAGE_PASSTHROUGH, getStageName(scan)));
+    return null;
+  }
+
+  private void beginStage(RelNode n) {
+    pw.print(String.format(STAGE_PROLOGUE, getStageName(n)));
+  }
+
+  private void endStage() {
+    pw.print("  }\n  };\n"); // closes dataReceived() and the anonymous handler
+  }
+
+  /** Stage name is SIMPLECLASSNAME_id; Locale.ROOT keeps it independent of the default locale. */
+  static String getStageName(RelNode n) {
+    return n.getClass().getSimpleName().toUpperCase(java.util.Locale.ROOT) + "_" + n.getId();
+  }
+}
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
new file mode 100644
index 0000000..f8bfd12
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/PlanCompiler.java
@@ -0,0 +1,194 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.compiler.backends.trident;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.storm.sql.compiler.CompilerUtil;
+import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor;
+import org.apache.storm.sql.javac.CompilingClassLoader;
+import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+/** Generates, compiles and loads a {@code TridentProcessor} class for a {@link RelNode} plan. */
+public class PlanCompiler {
+  private static final Joiner NEW_LINE_JOINER = Joiner.on("\n");
+  private static final String PACKAGE_NAME = "org.apache.storm.sql.generated";
+  private static final String PROLOGUE = NEW_LINE_JOINER.join(
+      "// GENERATED CODE", "package " + PACKAGE_NAME + ";", "",
+      "import java.util.List;",
+      "import java.util.Map;",
+      "import backtype.storm.tuple.Fields;",
+      "import backtype.storm.tuple.Values;",
+      "import org.apache.storm.sql.runtime.ISqlTridentDataSource;",
+      "import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor;",
+      "import storm.trident.Stream;",
+      "import storm.trident.TridentTopology;",
+      "import storm.trident.fluent.IAggregatableStream;",
+      "import storm.trident.operation.TridentCollector;",
+      "import storm.trident.operation.BaseFunction;",
+      "import storm.trident.spout.IBatchSpout;",
+      "import storm.trident.tuple.TridentTuple;",
+      "",
+      "public final class TridentProcessor extends AbstractTridentProcessor {",
+      "");
+  // Opening of the generated build() method, which creates the topology "topo".
+  private static final String INITIALIZER_PROLOGUE = NEW_LINE_JOINER.join(
+      "  @Override",
+      "  public TridentTopology build(Map<String, ISqlTridentDataSource> _sources) {",
+      "    TridentTopology topo = new TridentTopology();",
+      "");
+
+  private final JavaTypeFactory typeFactory;
+
+  public PlanCompiler(JavaTypeFactory typeFactory) {
+    this.typeFactory = typeFactory;
+  }
+
+  /** Builds the whole source text: prologue, RelNodeCompiler output, build(), epilogue. */
+  private String generateJavaSource(RelNode root) throws Exception {
+    StringWriter buffer = new StringWriter();
+    try (PrintWriter out = new PrintWriter(buffer)) {
+      RelNodeCompiler functionCompiler = new RelNodeCompiler(out, typeFactory);
+      printPrologue(out);
+      functionCompiler.traverse(root);
+      printMain(out, root);
+      printEpilogue(out);
+    }
+    return buffer.toString();
+  }
+
+  /** Writes the statements of build(): one {@code Stream _NAME} declaration per visited node. */
+  private static class MainFuncCompiler extends PostOrderRelNodeVisitor<Void> {
+    // %1$s: stream name, %2$s: escaped table name.
+    private static final String TABLESCAN_TMPL = NEW_LINE_JOINER.join(
+        "if (!_sources.containsKey(%2$s))",
+        "    throw new RuntimeException(\"Cannot find table \" + %2$s);",
+        "Stream _%1$s = topo.newStream(%2$s, _sources.get(%2$s).getProducer());",
+        "");
+    // Guarded like a scan so that a missing sink table fails with a clear message instead of
+    // a NullPointerException. %3$s: input stream name, %4$s / %5$s: input / output fields.
+    private static final String TABLEMODIFY_TMPL = NEW_LINE_JOINER.join(
+        "if (!_sources.containsKey(%2$s))",
+        "    throw new RuntimeException(\"Cannot find table \" + %2$s);",
+        "Stream _%1$s = _%3$s.each(new Fields(%4$s), _sources.get(%2$s).getConsumer(), new Fields(%5$s));",
+        "");
+    // %1$s is both the new stream's name and the generated function applied to stream %2$s.
+    private static final String TRANSFORMATION_TMPL = NEW_LINE_JOINER.join(
+        "Stream _%1$s = _%2$s.each(new Fields(%3$s), %1$s, new Fields(%4$s)).toStream().project(new Fields(%4$s));",
+        "");
+
+    private final PrintWriter pw;
+
+    private MainFuncCompiler(PrintWriter pw) {
+      this.pw = pw;
+    }
+
+    @Override
+    public Void defaultValue(RelNode n) {
+      throw new UnsupportedOperationException("Unsupported RelNode: " + n);
+    }
+
+    @Override
+    public Void visitFilter(Filter filter) throws Exception {
+      visitTransformation(filter);
+      return null;
+    }
+
+    @Override
+    public Void visitProject(Project project) throws Exception {
+      visitTransformation(project);
+      return null;
+    }
+
+    @Override
+    public Void visitTableScan(TableScan scan) throws Exception {
+      String table = CompilerUtil.escapeJavaString(
+          Joiner.on('.').join(scan.getTable().getQualifiedName()), true);
+      pw.print(String.format(TABLESCAN_TMPL, RelNodeCompiler.getStageName(scan), table));
+      return null;
+    }
+
+    @Override
+    public Void visitTableModify(TableModify modify) throws Exception {
+      Preconditions.checkArgument(modify.isInsert(), "Only INSERT statement is supported.");
+      RelNode input = modify.getInput();
+      String table = CompilerUtil.escapeJavaString(
+          Joiner.on('.').join(modify.getTable().getQualifiedName()), true);
+      pw.print(String.format(TABLEMODIFY_TMPL, RelNodeCompiler.getStageName(modify), table,
+          RelNodeCompiler.getStageName(input), getFieldString(input), getFieldString(modify)));
+      return null;
+    }
+
+    /** Comma-separated, escaped field names of {@code n}; non-scan fields get an "id_" prefix. */
+    private static String getFieldString(RelNode n) {
+      StringBuilder sb = new StringBuilder();
+      String separator = "";
+      for (String f : n.getRowType().getFieldNames()) {
+        // Table columns keep their own names; other nodes qualify them with the node id.
+        String name = (n instanceof TableScan) ? f : String.format("%d_%s", n.getId(), f);
+        sb.append(separator).append(CompilerUtil.escapeJavaString(name, true));
+        separator = ", ";
+      }
+      return sb.toString();
+    }
+
+    /** Emits the stream that applies the node's generated function to its input stream. */
+    private void visitTransformation(SingleRel node) {
+      String name = RelNodeCompiler.getStageName(node);
+      RelNode input = node.getInput();
+      pw.print(String.format(TRANSFORMATION_TMPL, name, RelNodeCompiler.getStageName(input),
+          getFieldString(input), getFieldString(node)));
+    }
+  }
+
+  /** Writes build(): the per-node streams, then the root stream as outputStream. */
+  private void printMain(PrintWriter pw, RelNode root) throws Exception {
+    pw.print(INITIALIZER_PROLOGUE);
+    new MainFuncCompiler(pw).traverse(root);
+    pw.print(String.format("  this.outputStream = _%s;\n", RelNodeCompiler.getStageName(root)));
+    pw.print("  return topo; \n}\n");
+  }
+
+  /** Compiles the source generated for {@code plan} and instantiates the resulting class. */
+  public AbstractTridentProcessor compile(RelNode plan) throws Exception {
+    String javaCode = generateJavaSource(plan);
+    String name = PACKAGE_NAME + ".TridentProcessor";
+    ClassLoader cl = new CompilingClassLoader(getClass().getClassLoader(), name, javaCode, null);
+    return (AbstractTridentProcessor) cl.loadClass(name).newInstance();
+  }
+
+  private static void printEpilogue(PrintWriter pw) throws Exception {
+    pw.print("}\n"); // closes the generated class
+  }
+
+  private static void printPrologue(PrintWriter pw) {
+    pw.append(PROLOGUE);
+  }
+}
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/RelNodeCompiler.java
new file mode 100644
index 0000000..1de39d3
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/trident/RelNodeCompiler.java
@@ -0,0 +1,116 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.compiler.backends.trident;
+
+import backtype.storm.tuple.Fields;
+import com.google.common.base.Joiner;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.storm.sql.compiler.ExprCompiler;
+import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor;
+
+import java.io.PrintWriter;
+import java.util.IdentityHashMap;
+import java.util.Map;
+
+/**
+ * Compile RelNodes into individual functions: Filter and Project nodes become BaseFunction
+ * fields named by getStageName, while TableScan and TableModify produce no function here.
+ */
+class RelNodeCompiler extends PostOrderRelNodeVisitor<Void> {
+  public static final Joiner NEW_LINE_JOINER = Joiner.on('\n');
+  // Opens an anonymous BaseFunction assigned to the field named %1$s.
+  private static final String STAGE_PROLOGUE = NEW_LINE_JOINER.join(
+    "  private static final BaseFunction %1$s = ",
+    "    new BaseFunction() {",
+    "    @Override",
+    "    public void execute(TridentTuple tuple, TridentCollector collector) {",
+    "      List<Object> _data = tuple.getValues();",
+    "");
+
+  private final PrintWriter pw;
+  private final JavaTypeFactory typeFactory;
+
+  RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
+    this.pw = pw;
+    this.typeFactory = typeFactory;
+  }
+
+  /** Emits a function that re-emits {@code _data} only when the compiled condition holds. */
+  @Override
+  public Void visitFilter(Filter filter) throws Exception {
+    beginStage(filter);
+    ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
+    String condition = filter.getCondition().accept(compiler);
+    pw.print(String.format("    if (%s) { collector.emit(_data); }\n", condition));
+    endStage();
+    return null;
+  }
+
+  /** Emits a function that sends a new Values built from the compiled child expressions. */
+  @Override
+  public Void visitProject(Project project) throws Exception {
+    beginStage(project);
+    ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
+    int size = project.getChildExps().size();
+    String[] exprs = new String[size];
+    for (int i = 0; i < size; ++i) {
+      exprs[i] = project.getChildExps().get(i).accept(compiler);
+    }
+    pw.print(String.format("    collector.emit(new Values(%s));\n", Joiner.on(',').join(exprs)));
+    endStage();
+    return null;
+  }
+
+  @Override
+  public Void defaultValue(RelNode n) {
+    throw new UnsupportedOperationException("Unsupported RelNode: " + n);
+  }
+
+  /** Scans need no function; the stream itself is created by PlanCompiler's build() code. */
+  @Override
+  public Void visitTableScan(TableScan scan) throws Exception {
+    return null;
+  }
+
+  @Override
+  public Void visitTableModify(TableModify modify) throws Exception {
+    return null; // the sink is wired up by PlanCompiler's build() code
+  }
+
+  private void beginStage(RelNode n) {
+    pw.print(String.format(STAGE_PROLOGUE, getStageName(n)));
+  }
+
+  private void endStage() {
+    pw.print("  }\n  };\n"); // closes execute() and the anonymous function
+  }
+
+  /** Stage name is SIMPLECLASSNAME_id; Locale.ROOT keeps it independent of the default locale. */
+  static String getStageName(RelNode n) {
+    return n.getClass().getSimpleName().toUpperCase(java.util.Locale.ROOT) + "_" + n.getId();
+  }
+}
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
new file mode 100644
index 0000000..cf76964
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/javac/CompilingClassLoader.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright (C) 2010 Google, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.javac;
+
+
+import javax.tools.DiagnosticListener;
+import javax.tools.FileObject;
+import javax.tools.ForwardingJavaFileManager;
+import javax.tools.JavaCompiler;
+import javax.tools.JavaFileManager;
+import javax.tools.JavaFileObject;
+import javax.tools.SimpleJavaFileObject;
+import javax.tools.ToolProvider;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Collections.singleton;
+
+/**
+ * This is a Java ClassLoader that will attempt to load a class from a string of source code.
+ *
+ * <h3>Example</h3>
+ *
+ * <pre>
+ * String className = "com.foo.MyClass";
+ * String classSource =
+ *   "package com.foo;\n" +
+ *   "public class MyClass implements Runnable {\n" +
+ *   "  @Override public void run() {\n" +
+ *   "   System.out.println(\"Hello world\");\n" +
+ *   "  }\n" +
+ *   "}";
+ *
+ * // Load class from source (null: no custom diagnostic listener).
+ * ClassLoader classLoader = new CompilingClassLoader(
+ *     parentClassLoader, className, classSource, null);
+ * Class myClass = classLoader.loadClass(className);
+ *
+ * // Use it.
+ * Runnable instance = (Runnable)myClass.newInstance();
+ * instance.run();
+ * </pre>
+ *
+ * Only one chunk of source can be compiled per instance of CompilingClassLoader. If you need to
+ * compile more, create multiple CompilingClassLoader instances.
+ *
+ * Uses Java 1.6's built-in compiler API, so a JDK (not just a JRE) is needed at run time.
+ *
+ * If the class cannot be compiled, the constructor throws a {@link CompilerException}. Unless a
+ * {@link javax.tools.DiagnosticListener} is passed to the constructor to handle the messages,
+ * compile errors are logged to System.err.
+ *
+ * @see java.lang.ClassLoader
+ * @see javax.tools.JavaCompiler
+ */
+public class CompilingClassLoader extends ClassLoader {
+
+  /** Thrown when code cannot be compiled. */
+  public static class CompilerException extends Exception {
+    private static final long serialVersionUID = -2936958840023603270L;
+
+    public CompilerException(String message) {
+      super(message);
+    }
+  }
+
+  // Byte code written by the compiler, keyed by the class name it was written for.
+  private final Map<String, ByteArrayOutputStream> byteCodeForClasses = new HashMap<>();
+
+  // Needed to keep SimpleFileObject constructor happy.
+  private static final URI EMPTY_URI = URI.create("");
+
+  /**
+   * @param parent Parent classloader to resolve dependencies from.
+   * @param className Name of class to compile. eg. "com.foo.MyClass".
+   * @param sourceCode Java source for class. e.g. "package com.foo; class MyClass { ... }".
+   * @param diagnosticListener Notified of compiler errors (may be null).
+   * @throws CompilerException if the source does not compile or no Java compiler is available.
+   */
+  public CompilingClassLoader(
+      ClassLoader parent,
+      String className,
+      String sourceCode,
+      DiagnosticListener<JavaFileObject> diagnosticListener)
+      throws CompilerException {
+    super(parent);
+    if (!compileSourceCodeToByteCode(className, sourceCode, diagnosticListener)) {
+      throw new CompilerException("Could not compile " + className);
+    }
+  }
+
+  /**
+   * Override ClassLoader's class resolving method. Don't call this directly, instead use
+   * {@link ClassLoader#loadClass(String)}.
+   */
+  @Override
+  public Class<?> findClass(String name) throws ClassNotFoundException {
+    ByteArrayOutputStream byteCode = byteCodeForClasses.get(name);
+    if (byteCode == null) {
+      throw new ClassNotFoundException(name);
+    }
+    return defineClass(name, byteCode.toByteArray(), 0, byteCode.size());
+  }
+
+  /**
+   * Compiles the source in memory; the class files end up in {@link #byteCodeForClasses}.
+   * @return Whether compilation was successful.
+   * @throws CompilerException if the platform provides no Java compiler.
+   */
+  private boolean compileSourceCodeToByteCode(
+      String className, String sourceCode, DiagnosticListener<JavaFileObject> diagnosticListener)
+      throws CompilerException {
+    JavaCompiler javaCompiler = ToolProvider.getSystemJavaCompiler();
+    if (javaCompiler == null) {
+      // getSystemJavaCompiler() returns null when no compiler is provided, e.g. on a plain JRE.
+      throw new CompilerException("No Java compiler available to compile " + className);
+    }
+
+    // Set up the in-memory filesystem.
+    InMemoryFileManager fileManager =
+        new InMemoryFileManager(javaCompiler.getStandardFileManager(null, null, null));
+    try {
+      JavaFileObject javaFile = new InMemoryJavaFile(className, sourceCode);
+
+      // Javac option: remove these when the javac zip impl is fixed
+      // (http://b/issue?id=1822932)
+      System.setProperty("useJavaUtilZip", "true"); // setting value to any non-null string
+      List<String> options = new LinkedList<>();
+      // this is ignored by javac currently but useJavaUtilZip should be
+      // a valid javac XD option, which is another bug
+      options.add("-XDuseJavaUtilZip");
+
+      // Now compile! The first null logs any unhandled errors to stderr.
+      JavaCompiler.CompilationTask compilationTask = javaCompiler.getTask(
+          null, fileManager, diagnosticListener, options, null, singleton(javaFile));
+      return compilationTask.call();
+    } finally {
+      try {
+        fileManager.close();
+      } catch (IOException ignored) {
+        // The byte code is already held in memory; a failed close leaves nothing to recover.
+      }
+    }
+  }
+
+  /**
+   * Provides an in-memory representation of JavaFileManager abstraction, so we do not need to write
+   * any files to disk.
+   *
+   * When files are written to, rather than putting the bytes on disk, they are appended to buffers
+   * in byteCodeForClasses.
+   *
+   * @see javax.tools.JavaFileManager
+   */
+  private class InMemoryFileManager extends ForwardingJavaFileManager<JavaFileManager> {
+    public InMemoryFileManager(JavaFileManager fileManager) {
+      super(fileManager);
+    }
+
+    @Override
+    public JavaFileObject getJavaFileForOutput(
+        Location location, final String className, JavaFileObject.Kind kind, FileObject sibling)
+        throws IOException {
+      return new SimpleJavaFileObject(EMPTY_URI, kind) {
+        @Override
+        public OutputStream openOutputStream() throws IOException {
+          ByteArrayOutputStream outputStream = byteCodeForClasses.get(className);
+          if (outputStream != null) {
+            throw new IllegalStateException("Cannot write more than once");
+          }
+          // Reasonable size for a simple .class.
+          outputStream = new ByteArrayOutputStream(256);
+          byteCodeForClasses.put(className, outputStream);
+          return outputStream;
+        }
+      };
+    }
+  }
+
+  private static class InMemoryJavaFile extends SimpleJavaFileObject {
+    private final String sourceCode;
+
+    public InMemoryJavaFile(String className, String sourceCode) {
+      super(makeUri(className), Kind.SOURCE);
+      this.sourceCode = sourceCode;
+    }
+
+    private static URI makeUri(String className) {
+      try {
+        return new URI(className.replace('.', '/') + Kind.SOURCE.extension);
+      } catch (URISyntaxException e) {
+        throw new RuntimeException(e); // Not sure what could cause this.
+      }
+    }
+
+    @Override
+    public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException {
+      return sourceCode;
+    }
+  }
+}
\ No newline at end of file
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
new file mode 100644
index 0000000..c67d8e7
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
+
+/** Column constraint expressed as a SQL literal; {@link PrimaryKey} is the kind defined here. */
+public class ColumnConstraint extends SqlLiteral {
+  private ColumnConstraint(Object value, SqlTypeName typeName, SqlParserPos pos) {
+    super(value, typeName, pos);
+  }
+
+  public static class PrimaryKey extends ColumnConstraint {
+    private final SqlMonotonicity monotonicity;
+    public PrimaryKey(SqlMonotonicity monotonicity, SqlParserPos pos) {
+      super(SqlDDLKeywords.PRIMARY, SqlTypeName.SYMBOL, pos);
+      this.monotonicity = monotonicity;
+    }
+    public SqlMonotonicity monotonicity() {
+      return monotonicity;
+    }
+  }
+}
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
new file mode 100644
index 0000000..3520b86
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnDefinition.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.Arrays;
+
+/** Node list holding, in order, a column's name, its data type and its constraint. */
+public class ColumnDefinition extends SqlNodeList {
+  private static final int NAME_POS = 0, TYPE_POS = 1, CONSTRAINT_POS = 2;
+
+  public ColumnDefinition(SqlIdentifier name, SqlDataTypeSpec type,
+      ColumnConstraint constraint, SqlParserPos pos) {
+    super(Arrays.asList(name, type, constraint), pos);
+  }
+
+  /** Returns the string form of the name entry. */
+  public String name() { return get(NAME_POS).toString(); }
+
+  /** Returns the data type entry. */
+  public SqlDataTypeSpec type() { return (SqlDataTypeSpec) get(TYPE_POS); }
+
+  /** Returns the constraint entry. */
+  public ColumnConstraint constraint() { return (ColumnConstraint) get(CONSTRAINT_POS); }
+}
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
new file mode 100644
index 0000000..8fe4160
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlCreateTable.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.net.URI;
+import java.util.List;
+
+/** Parse-tree node for a CREATE EXTERNAL TABLE statement and its optional clauses. */
+public class SqlCreateTable extends SqlCall {
+  public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator(
+      "CREATE_TABLE", SqlKind.OTHER) {
+    @Override
+    public SqlCall createCall(
+        SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... o) {
+      assert functionQualifier == null;
+      // Operands are read in the same order getOperandList() lists them.
+      SqlIdentifier name = (SqlIdentifier) o[0];
+      SqlNodeList columns = (SqlNodeList) o[1];
+      return new SqlCreateTable(pos, name, columns, o[2], o[3], o[4], o[5], o[6]);
+    }
+
+    @Override
+    public void unparse(
+        SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+      SqlCreateTable stmt = (SqlCreateTable) call;
+      UnparseUtil out = new UnparseUtil(writer, leftPrec, rightPrec);
+      out.keyword("CREATE", "EXTERNAL", "TABLE");
+      out.node(stmt.tblName);
+      out.nodeList(stmt.fieldList);
+      // The storage clause is written only when both format classes are present.
+      if (stmt.inputFormatClass != null && stmt.outputFormatClass != null) {
+        out.keyword("STORED", "AS", "INPUTFORMAT").node(stmt.inputFormatClass);
+        out.keyword("OUTPUTFORMAT").node(stmt.outputFormatClass);
+      }
+      out.keyword("LOCATION").node(stmt.location);
+      if (stmt.properties != null) {
+        out.keyword("TBLPROPERTIES").node(stmt.properties);
+      }
+      if (stmt.query != null) {
+        out.keyword("AS").node(stmt.query);
+      }
+    }
+  };
+
+  private final SqlIdentifier tblName;
+  private final SqlNodeList fieldList;
+  private final SqlNode inputFormatClass;
+  private final SqlNode outputFormatClass;
+  private final SqlNode location;
+  private final SqlNode properties;
+  private final SqlNode query;
+
+  public SqlCreateTable(SqlParserPos pos, SqlIdentifier tblName, SqlNodeList fieldList,
+      SqlNode inputFormatClass, SqlNode outputFormatClass, SqlNode location,
+      SqlNode properties, SqlNode query) {
+    super(pos);
+    this.tblName = tblName;
+    this.fieldList = fieldList;
+    this.inputFormatClass = inputFormatClass;
+    this.outputFormatClass = outputFormatClass;
+    this.location = location;
+    this.properties = properties;
+    this.query = query;
+  }
+
+  @Override
+  public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+    getOperator().unparse(writer, this, leftPrec, rightPrec);
+  }
+
+  @Override
+  public List<SqlNode> getOperandList() {
+    return ImmutableNullableList.of(
+        tblName, fieldList, inputFormatClass, outputFormatClass, location, properties, query);
+  }
+
+  public String tableName() {
+    return tblName.toString();
+  }
+
+  public URI location() {
+    return URI.create(SqlLiteral.stringValue(location));
+  }
+
+  public String inputFormatClass() {
+    return inputFormatClass != null ? SqlLiteral.stringValue(inputFormatClass) : null;
+  }
+
+  public String outputFormatClass() {
+    return outputFormatClass != null ? SqlLiteral.stringValue(outputFormatClass) : null;
+  }
+
+  @SuppressWarnings("unchecked")
+  public List<ColumnDefinition> fieldList() {
+    List<? extends SqlNode> columns = fieldList.getList();
+    return (List<ColumnDefinition>) columns;
+  }
+}
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
new file mode 100644
index 0000000..3112e53
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlLiteral;
+
+/**
+ * Keyword symbols that can occur in a CREATE TABLE statement.
+ */
+public enum SqlDDLKeywords implements SqlLiteral.SqlSymbol {
+  PRIMARY
+}
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
new file mode 100644
index 0000000..670901e
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/StormParser.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.calcite.config.Lex;
+import org.apache.storm.sql.parser.impl.StormParserImpl;
+
+import java.io.StringReader;
+
+public class StormParser {
+  public static final int DEFAULT_IDENTIFIER_MAX_LENGTH = 128;
+  private final StormParserImpl impl;
+
+  /** Wraps a parser over {@code s} set up with the quoted/unquoted casing of Lex.ORACLE. */
+  public StormParser(String s) {
+    StormParserImpl parser = new StormParserImpl(new StringReader(s));
+    parser.setTabSize(1);
+    parser.setQuotedCasing(Lex.ORACLE.quotedCasing);
+    parser.setUnquotedCasing(Lex.ORACLE.unquotedCasing);
+    parser.setIdentifierMaxLength(DEFAULT_IDENTIFIER_MAX_LENGTH);
+    impl = parser;
+  }
+
+  @VisibleForTesting
+  public StormParserImpl impl() { return impl; }
+}
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/UnparseUtil.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/UnparseUtil.java
new file mode 100644
index 0000000..834fe7c
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/UnparseUtil.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+
+/** Fluent helper writing keywords, nodes and parenthesised node lists to a SqlWriter. */
+class UnparseUtil {
+  private final SqlWriter writer;
+  private final int leftPrec;
+  private final int rightPrec;
+
+  UnparseUtil(SqlWriter writer, int leftPrec, int rightPrec) {
+    this.writer = writer;
+    this.leftPrec = leftPrec;
+    this.rightPrec = rightPrec;
+  }
+
+  UnparseUtil keyword(String... keywords) {
+    for (int idx = 0; idx < keywords.length; idx++) {
+      writer.keyword(keywords[idx]);
+    }
+    return this;
+  }
+
+  UnparseUtil node(SqlNode n) {
+    n.unparse(writer, leftPrec, rightPrec);
+    return this;
+  }
+
+  UnparseUtil nodeList(SqlNodeList l) {
+    writer.keyword("(");
+    for (int idx = 0; idx < l.size(); idx++) {
+      if (idx > 0) {
+        writer.keyword(",");
+      }
+      l.get(idx).unparse(writer, leftPrec, rightPrec);
+    }
+    writer.keyword(")");
+    return this;
+  }
+}
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
new file mode 100644
index 0000000..b238e18
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql;
+
+import backtype.storm.tuple.Values;
+import org.apache.storm.sql.runtime.*;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Runs SQL statements end to end against a data source provider registered as "mock". */
+public class TestStormSql {
+  private static class MockDataSourceProvider implements DataSourcesProvider {
+    @Override
+    public String scheme() {
+      return "mock";
+    }
+
+    @Override
+    public DataSource construct(URI uri, String inputFormatClass,
+        String outputFormatClass, List<FieldInfo> fields) {
+      return new TestUtils.MockDataSource();
+    }
+
+    @Override
+    public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass,
+        String outputFormatClass, List<FieldInfo> fields) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() {
+    DataSourcesRegistry.providerMap().put("mock", new MockDataSourceProvider());
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    DataSourcesRegistry.providerMap().remove("mock");
+  }
+
+  @Test
+  public void testExternalDataSource() throws Exception {
+    List<String> statements = new ArrayList<>();
+    statements.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
+    statements.add("SELECT STREAM ID + 1 FROM FOO WHERE ID > 2");
+    List<Values> emitted = new ArrayList<>();
+    StormSql sql = StormSql.construct();
+    ChannelHandler handler = new TestUtils.CollectDataChannelHandler(emitted);
+    sql.execute(statements, handler);
+    // Two rows are expected to pass the ID > 2 filter, each projected as ID + 1.
+    Assert.assertEquals(2, emitted.size());
+    Assert.assertEquals(4, emitted.get(0).get(0));
+    Assert.assertEquals(5, emitted.get(1).get(0));
+  }
+}
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
new file mode 100644
index 0000000..092230f
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.StreamableTable;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.*;
+
+/** Test helper that plans a SQL query against a schema holding one dummy stream table. */
+public class TestCompilerUtils {
+  /** Parses, validates and converts {@code sql} over tables FOO and BAR (single INTEGER ID). */
+  public static CalciteState sqlOverDummyTable(String sql)
+      throws RelConversionException, ValidationException, SqlParseException {
+    SchemaPlus root = Frameworks.createRootSchema(true);
+    JavaTypeFactory types = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+    CompilerUtil.TableBuilderInfo builder = new CompilerUtil.TableBuilderInfo(types);
+    StreamableTable source = builder.field("ID", SqlTypeName.INTEGER).build();
+    Table stream = source.stream();
+    // Both names are bound to the same stream table instance.
+    for (String tableName : new String[] {"FOO", "BAR"}) {
+      root.add(tableName, stream);
+    }
+    FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(root).build();
+    Planner planner = Frameworks.getPlanner(config);
+    SqlNode validated = planner.validate(planner.parse(sql));
+    return new CalciteState(root, planner.convert(validated));
+  }
+
+  /** Pairs the root schema with the relational tree planned against it. */
+  public static class CalciteState {
+    final SchemaPlus schema;
+    final RelNode tree;
+
+    private CalciteState(SchemaPlus schema, RelNode tree) {
+      this.schema = schema;
+      this.tree = tree;
+    }
+
+    public SchemaPlus schema() { return schema; }
+    public RelNode tree() { return tree; }
+  }
+}
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
new file mode 100644
index 0000000..017aa25
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprCompiler.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Checks the Java text that ExprCompiler produces for literals, input references
+ * and operator calls projected over the dummy FOO table.
+ */
+public class TestExprCompiler {
+  private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+      RelDataTypeSystem.DEFAULT);
+
+  /** Literal projections compile to the expected Java literal text. */
+  @Test
+  public void testLiteral() throws Exception {
+    LogicalProject project = planProjection("SELECT 1,1.0,TRUE,'FOO' FROM FOO");
+    int count = project.getChildExps().size();
+    String[] res = new String[count];
+    for (int i = 0; i < count; ++i) {
+      // Only the expression text returned by the visitor is checked; the writer is a sink.
+      try (PrintWriter pw = new PrintWriter(new StringWriter())) {
+        res[i] = project.getChildExps().get(i).accept(new ExprCompiler(pw, typeFactory));
+      }
+    }
+    assertArrayEquals(new String[] {"1", "1.0E0", "true", "\"FOO\""}, res);
+  }
+
+  /** A column reference is written out as a cast read from {@code _data}. */
+  @Test
+  public void testInputRef() throws Exception {
+    LogicalProject project = planProjection("SELECT ID FROM FOO");
+    StringWriter sw = new StringWriter();
+    try (PrintWriter pw = new PrintWriter(sw)) {
+      project.getChildExps().get(0).accept(new ExprCompiler(pw, typeFactory));
+    }
+    assertThat(sw.toString(), containsString("(int)(_data.get(0));"));
+  }
+
+  /** Comparison and arithmetic calls appear in the statements written for each expression. */
+  @Test
+  public void testCallExpr() throws Exception {
+    LogicalProject project = planProjection("SELECT 1>2, 3+5, 1-1.0, 3+ID FROM FOO");
+    List<String> generated = new ArrayList<>();
+    for (int i = 0; i < project.getChildExps().size(); ++i) {
+      StringWriter sw = new StringWriter();
+      try (PrintWriter pw = new PrintWriter(sw)) {
+        project.getChildExps().get(i).accept(new ExprCompiler(pw, typeFactory));
+      }
+      // The writer is closed (and therefore flushed) before its contents are captured.
+      generated.add(sw.toString());
+    }
+    assertThat(generated.get(0), containsString("1 > 2"));
+    assertThat(generated.get(1), containsString("plus(3,5)"));
+    assertThat(generated.get(2), containsString("minus(1,1.0E0)"));
+    assertThat(generated.get(3), containsString("plus(3,"));
+  }
+
+  /** Plans {@code sql} over the dummy table and returns the root projection node. */
+  private static LogicalProject planProjection(String sql) throws Exception {
+    return (LogicalProject) TestCompilerUtils.sqlOverDummyTable(sql).tree();
+  }
+}
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
new file mode 100644
index 0000000..febfdb5
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler;
+
+import backtype.storm.tuple.Values;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.storm.sql.TestUtils;
+import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/** Evaluates SQL expressions through the standalone backend and checks the resulting values. */
+public class TestExprSemantic {
+  private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+      RelDataTypeSystem.DEFAULT);
+
+  @Test
+  public void testLogicalExpr() throws Exception {
+    List<String> exprs = Lists.newArrayList(
+        "ID > 0 OR ID < 1", "ID > 0 AND ID < 1", "NOT (ID > 0 AND ID < 1)");
+    Values actual = testExpr(exprs);
+    assertEquals(new Values(true, false, true), actual);
+  }
+
+  @Test
+  public void testExpectOperator() throws Exception {
+    List<String> exprs = Lists.newArrayList(
+        "TRUE IS TRUE", "TRUE IS NOT TRUE", "UNKNOWN IS TRUE", "UNKNOWN IS NOT TRUE",
+        "TRUE IS FALSE", "UNKNOWN IS NULL", "UNKNOWN IS NOT NULL");
+    Values actual = testExpr(exprs);
+    assertEquals(new Values(true, false, false, true, false, true, false), actual);
+  }
+
+  @Test
+  public void testArithmeticWithNull() throws Exception {
+    List<String> exprs = Lists.newArrayList(
+        "1 + CAST(NULL AS INT)", "CAST(NULL AS INT) + 1",
+        "CAST(NULL AS INT) + CAST(NULL AS INT)", "1 + 2");
+    Values actual = testExpr(exprs);
+    assertEquals(new Values(null, null, null, 3), actual);
+  }
+
+  @Test
+  public void testNotWithNull() throws Exception {
+    // NOT UNKNOWN stays unknown, which is surfaced as a null value.
+    List<String> exprs = Lists.newArrayList("NOT TRUE", "NOT FALSE", "NOT UNKNOWN");
+    Values actual = testExpr(exprs);
+    assertEquals(new Values(false, true, null), actual);
+  }
+
+  @Test
+  public void testAndWithNull() throws Exception {
+    // FALSE dominates AND even next to UNKNOWN; otherwise UNKNOWN yields null.
+    List<String> exprs = Lists.newArrayList(
+        "UNKNOWN AND TRUE", "UNKNOWN AND FALSE", "UNKNOWN AND UNKNOWN",
+        "TRUE AND TRUE", "TRUE AND FALSE", "TRUE AND UNKNOWN",
+        "FALSE AND TRUE", "FALSE AND FALSE", "FALSE AND UNKNOWN");
+    Values actual = testExpr(exprs);
+    assertEquals(
+        new Values(null, false, null, true, false, null, false, false, false), actual);
+  }
+
+  @Test
+  public void testOrWithNull() throws Exception {
+    // TRUE dominates OR even next to UNKNOWN; otherwise UNKNOWN yields null.
+    List<String> exprs = Lists.newArrayList(
+        "UNKNOWN OR TRUE", "UNKNOWN OR FALSE", "UNKNOWN OR UNKNOWN",
+        "TRUE OR TRUE", "TRUE OR FALSE", "TRUE OR UNKNOWN",
+        "FALSE OR TRUE", "FALSE OR FALSE", "FALSE OR UNKNOWN");
+    Values actual = testExpr(exprs);
+    assertEquals(
+        new Values(true, null, null, true, true, true, true, false, null), actual);
+  }
+
+  @Test
+  public void testEquals() throws Exception {
+    // Comparing against UNKNOWN yields null for both = and <>.
+    List<String> exprs = Lists.newArrayList(
+        "1 = 2", "UNKNOWN = UNKNOWN", "'a' = 'a'", "'a' = UNKNOWN", "UNKNOWN = 'a'", "'a' = 'b'",
+        "1 <> 2", "UNKNOWN <> UNKNOWN", "'a' <> 'a'", "'a' <> UNKNOWN", "UNKNOWN <> 'a'", "'a' <> 'b'");
+    Values actual = testExpr(exprs);
+    assertEquals(
+        new Values(false, null, true, null, null, false, true, null, false, null, null, true), actual);
+  }
+
+  @Test
+  public void testStringMethods() throws Exception {
+    List<String> exprs = Lists.newArrayList(
+        "UPPER('a')", "LOWER('A')", "INITCAP('foo')", "SUBSTRING('foo', 2)",
+        "CHARACTER_LENGTH('foo')", "CHAR_LENGTH('foo')", "'ab' || 'cd'");
+    Values actual = testExpr(exprs);
+    assertEquals(new Values("A", "a", "Foo", "oo", 3, 3, "abcd"), actual);
+  }
+
+  /**
+   * Compiles a query selecting {@code exprs} from FOO (restricted to ID > 0 AND ID < 2),
+   * initializes it with the mock data source and returns the first collected row.
+   */
+  private Values testExpr(List<String> exprs) throws Exception {
+    String projection = Joiner.on(',').join(exprs);
+    String sql = "SELECT " + projection + " FROM FOO" + " WHERE ID > 0 AND ID < 2";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+    AbstractValuesProcessor proc = new PlanCompiler(typeFactory).compile(state.tree());
+    Map<String, DataSource> sources = new HashMap<>();
+    sources.put("FOO", new TestUtils.MockDataSource());
+    List<Values> rows = new ArrayList<>();
+    ChannelHandler handler = new TestUtils.CollectDataChannelHandler(rows);
+    proc.initialize(sources, handler);
+    return rows.get(0);
+  }
+}
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
new file mode 100644
index 0000000..e46ae9c
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler.backends.standalone;
+
+import backtype.storm.tuple.Values;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.storm.sql.TestUtils;
+import org.apache.storm.sql.compiler.TestCompilerUtils;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Compiles small queries with the standalone PlanCompiler and checks the collected rows. */
+public class TestPlanCompiler {
+  private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+      RelDataTypeSystem.DEFAULT);
+
+  @Test
+  public void testCompile() throws Exception {
+    List<Values> rows = execute("SELECT ID + 1 FROM FOO WHERE ID > 2");
+    Values[] expected = { new Values(4), new Values(5) };
+    Assert.assertArrayEquals(expected, rows.toArray());
+  }
+
+  @Test
+  public void testLogicalExpr() throws Exception {
+    String sql = "SELECT ID > 0 OR ID < 1, ID > 0 AND ID < 1, NOT (ID > 0 AND ID < 1) FROM FOO WHERE ID > 0 AND ID < 2";
+    List<Values> rows = execute(sql);
+    Assert.assertEquals(new Values(true, false, true), rows.get(0));
+  }
+
+  /**
+   * Plans and compiles {@code sql}, initializes the processor with the mock FOO source
+   * and returns the rows gathered by the collecting channel handler.
+   */
+  private List<Values> execute(String sql) throws Exception {
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+    PlanCompiler compiler = new PlanCompiler(typeFactory);
+    AbstractValuesProcessor proc = compiler.compile(state.tree());
+    Map<String, DataSource> sources = new HashMap<>();
+    sources.put("FOO", new TestUtils.MockDataSource());
+    List<Values> rows = new ArrayList<>();
+    ChannelHandler handler = new TestUtils.CollectDataChannelHandler(rows);
+    proc.initialize(sources, handler);
+    return rows;
+  }
+}
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
new file mode 100644
index 0000000..76eba1d
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.compiler.backends.standalone;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.storm.sql.compiler.TestCompilerUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+import static org.hamcrest.CoreMatchers.containsString;
+
+/**
+ * Checks the text that {@link RelNodeCompiler} writes for individual
+ * relational nodes of a parsed query.
+ */
+public class TestRelNodeCompiler {
+  /**
+   * Compiles the filter and the projection of one query separately and looks
+   * for the comparison and the {@code plus(} call in the written text.
+   */
+  @Test
+  public void testFilter() throws Exception {
+    TestCompilerUtils.CalciteState calcite =
+        TestCompilerUtils.sqlOverDummyTable("SELECT ID + 1 FROM FOO WHERE ID > 3");
+    JavaTypeFactory types = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+    // The plan root is cast to a projection and its input to a filter.
+    LogicalProject projection = (LogicalProject) calcite.tree();
+    LogicalFilter predicate = (LogicalFilter) projection.getInput();
+
+    // Each node is visited by a fresh compiler writing into its own buffer.
+    try (StringWriter filterCode = new StringWriter();
+         PrintWriter out = new PrintWriter(filterCode)) {
+      new RelNodeCompiler(out, types).visitFilter(predicate);
+      out.flush();
+      Assert.assertThat(filterCode.toString(), containsString("> 3"));
+    }
+
+    try (StringWriter projectCode = new StringWriter();
+         PrintWriter out = new PrintWriter(projectCode)) {
+      new RelNodeCompiler(out, types).visitProject(projection);
+      out.flush();
+      Assert.assertThat(projectCode.toString(), containsString("plus("));
+    }
+  }
+}
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
new file mode 100644
index 0000000..a68ba0c
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/trident/TestPlanCompiler.java
@@ -0,0 +1,143 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.compiler.backends.trident;
+
+import backtype.storm.Config;
+import backtype.storm.ILocalCluster;
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.storm.sql.TestUtils;
+import org.apache.storm.sql.TestUtils.MockSqlTridentDataSource.CollectDataFunction;
+import org.apache.storm.sql.compiler.TestCompilerUtils;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.apache.storm.sql.runtime.trident.AbstractTridentProcessor;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import storm.trident.TridentTopology;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import static org.apache.storm.sql.TestUtils.MockSqlTridentDataSource.CollectDataFunction.*;
+
+/**
+ * Runs statements compiled by the Trident {@link PlanCompiler} on a local
+ * cluster and asserts on the values returned by {@code getCollectedValues()}.
+ */
+public class TestPlanCompiler {
+  private final JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
+      RelDataTypeSystem.DEFAULT);
+
+  /** Empties the shared collected-values list before each test. */
+  @Before
+  public void setUp() {
+    getCollectedValues().clear();
+  }
+
+  /** Filtered SELECT whose output stream is routed into a CollectDataFunction. */
+  @Test
+  public void testCompile() throws Exception {
+    final int EXPECTED_VALUE_SIZE = 2;
+    String sql = "SELECT ID FROM FOO WHERE ID > 2";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+    PlanCompiler compiler = new PlanCompiler(typeFactory);
+    final AbstractTridentProcessor proc = compiler.compile(state.tree());
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+    final TridentTopology topo = proc.build(data);
+    Fields f = proc.outputStream().getOutputFields();
+    proc.outputStream().each(f, new CollectDataFunction(), new Fields()).toStream();
+    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+    Assert.assertArrayEquals(new Values[] { new Values(3), new Values(4)}, getCollectedValues().toArray());
+  }
+
+  /** INSERT ... SELECT from FOO into BAR, both registered as mock Trident sources. */
+  @Test
+  public void testInsert() throws Exception {
+    final int EXPECTED_VALUE_SIZE = 1;
+    String sql = "INSERT INTO BAR SELECT ID FROM FOO WHERE ID > 3";
+    TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
+    PlanCompiler compiler = new PlanCompiler(typeFactory);
+    final AbstractTridentProcessor proc = compiler.compile(state.tree());
+    final Map<String, ISqlTridentDataSource> data = new HashMap<>();
+    data.put("FOO", new TestUtils.MockSqlTridentDataSource());
+    data.put("BAR", new TestUtils.MockSqlTridentDataSource());
+    final TridentTopology topo = proc.build(data);
+    runTridentTopology(EXPECTED_VALUE_SIZE, proc, topo);
+    Assert.assertArrayEquals(new Values[] { new Values(4)}, getCollectedValues().toArray());
+  }
+
+  /**
+   * Submits the topology to a new local cluster and waits until at least
+   * {@code expectedValueSize} values were collected or the wait timed out.
+   * The cluster is shut down in a finally block.
+   *
+   * @param expectedValueSize number of collected values to wait for
+   * @param proc compiled processor; its class loader is installed for Java
+   *             deserialization while the topology runs
+   * @param topo topology to build and submit
+   */
+  private void runTridentTopology(final int expectedValueSize, AbstractTridentProcessor proc,
+                                  TridentTopology topo) throws Exception {
+    final Config conf = new Config();
+    conf.setMaxSpoutPending(20);
+
+    // Build first: if build() throws, no cluster has been started yet, so
+    // nothing is left running without a matching shutdown() call.
+    StormTopology stormTopo = topo.build();
+    ILocalCluster cluster = new LocalCluster();
+    try {
+      Utils.setClassLoaderForJavaDeSerialize(proc.getClass().getClassLoader());
+      cluster.submitTopology("storm-sql", conf, stormTopo);
+      waitForCompletion(1000 * 1000, new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws Exception {
+          // true means "keep waiting"
+          return getCollectedValues().size() < expectedValueSize;
+        }
+      });
+    } finally {
+      Utils.resetClassLoaderForJavaDeSerialize();
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Polls {@code cond} every 100 ms until it returns false or {@code timeout}
+   * has elapsed on the {@code TestUtils.monotonicNow()} clock.
+   *
+   * @param timeout maximum wait, in the units of {@code TestUtils.monotonicNow()}
+   * @param cond returns true while the caller should keep waiting
+   */
+  private void waitForCompletion(long timeout, Callable<Boolean> cond) throws Exception {
+    long start = TestUtils.monotonicNow();
+    while (TestUtils.monotonicNow() - start < timeout && cond.call()) {
+      Thread.sleep(100);
+    }
+  }
+}
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
new file mode 100644
index 0000000..b957565
--- /dev/null
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlNode;
+import org.apache.storm.sql.parser.impl.ParseException;
+import org.junit.Test;
+
+/**
+ * Parser-level checks for the CREATE EXTERNAL TABLE statement.
+ */
+public class TestSqlParser {
+  /** A column list followed by a LOCATION clause parses without error. */
+  @Test
+  public void testCreateTable() throws Exception {
+    parse("CREATE EXTERNAL TABLE foo (bar INT) LOCATION 'kafka:///foo'");
+  }
+
+  /** A column declared PRIMARY KEY with a sort direction parses without error. */
+  @Test
+  public void testCreateTableWithPrimaryKey() throws Exception {
+    parse("CREATE EXTERNAL TABLE foo (bar INT PRIMARY KEY ASC) LOCATION 'kafka:///foo'");
+  }
+
+  /** Leaving out the LOCATION clause is expected to raise a ParseException. */
+  @Test(expected = ParseException.class)
+  public void testCreateTableWithoutLocation() throws Exception {
+    parse("CREATE EXTERNAL TABLE foo (bar INT)");
+  }
+
+  /** Feeds {@code sql} to a new StormParser and returns the parsed statement. */
+  private static SqlNode parse(String sql) throws Exception {
+    return new StormParser(sql).impl().parseSqlStmtEof();
+  }
+}
\ No newline at end of file
diff --git a/external/sql/storm-sql-kafka/pom.xml b/external/sql/storm-sql-kafka/pom.xml
new file mode 100644
index 0000000..0f6bd19
--- /dev/null
+++ b/external/sql/storm-sql-kafka/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-sql-kafka</artifactId>
+
+    <developers>
+        <developer>
+            <id>haohui</id>
+            <name>Haohui Mai</name>
+            <email>ricetons@gmail.com</email>
+        </developer>
+    </developers>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-sql-runtime</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+            <version>0.8.2.1</version>
+            <!-- use provided scope, so users can pull in whichever scala version they choose -->
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.8.2.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <sourceDirectory>src/jvm</sourceDirectory>
+        <testSourceDirectory>src/test</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>${basedir}/src/resources</directory>
+            </resource>
+        </resources>
+    </build>
+</project>
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
new file mode 100644
index 0000000..1b45b30
--- /dev/null
+++ b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonScheme.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Scheme that decodes a JSON object and emits the values of a fixed list of
+ * fields, in the order the field names were supplied.
+ */
+public class JsonScheme implements Scheme {
+  // Shared by all instances and calls instead of being rebuilt per message:
+  // an ObjectMapper is costly to create and safe to share once configured.
+  // Being static, it is never part of a serialized scheme instance.
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  private final List<String> fields;
+
+  /**
+   * @param fields names of the JSON fields to extract, in output order
+   */
+  JsonScheme(List<String> fields) {
+    this.fields = fields;
+  }
+
+  /**
+   * Parses {@code ser} as a JSON object and picks out the configured fields.
+   *
+   * @param ser buffer holding one JSON document
+   * @return one value per configured field; {@code null} for a field that is
+   *         absent from the document
+   * @throws RuntimeException wrapping the {@link IOException} raised when the
+   *         bytes cannot be read as JSON
+   */
+  @Override
+  public List<Object> deserialize(ByteBuffer ser) {
+    try {
+      @SuppressWarnings("unchecked")
+      HashMap<String, Object> map = MAPPER.readValue(Utils.toByteArray(ser), HashMap.class);
+      ArrayList<Object> list = new ArrayList<>(fields.size());
+      for (String f : fields) {
+        list.add(map.get(f));
+      }
+      return list;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** @return the configured field names wrapped in a {@link Fields} */
+  @Override
+  public Fields getOutputFields() {
+    return new Fields(fields);
+  }
+}
\ No newline at end of file
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
new file mode 100644
index 0000000..7c5aa57
--- /dev/null
+++ b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/JsonSerializer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.lang.CharSet;
+import org.apache.storm.sql.runtime.IOutputSerializer;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.List;
+
+/**
+ * Writes a row as a JSON object whose keys are the configured field names.
+ */
+class JsonSerializer implements IOutputSerializer {
+  // A static factory replaces the former "transient final" instance field,
+  // which would have come back as null had an instance ever been restored by
+  // Java serialization (the constructor is not re-run in that case). One
+  // JsonFactory can be shared for creating generators.
+  private static final JsonFactory JSON_FACTORY = new JsonFactory();
+
+  private final List<String> fieldNames;
+
+  /**
+   * @param fieldNames key to use for each position of a row
+   */
+  JsonSerializer(List<String> fieldNames) {
+    this.fieldNames = fieldNames;
+  }
+
+  /**
+   * Serializes one row.
+   *
+   * @param data row values, positionally matching the field names
+   * @param buffer not used; a newly wrapped buffer is always returned
+   * @return the UTF-8 bytes of the JSON object
+   * @throws IllegalArgumentException if {@code data} is null or its size
+   *         differs from the number of field names
+   * @throws RuntimeException wrapping an {@link IOException} from the generator
+   */
+  @Override
+  public ByteBuffer write(List<Object> data, ByteBuffer buffer) {
+    Preconditions.checkArgument(data != null && data.size() == fieldNames.size(), "Invalid schema");
+    StringWriter sw = new StringWriter();
+    try (JsonGenerator jg = JSON_FACTORY.createGenerator(sw)) {
+      jg.writeStartObject();
+      for (int i = 0; i < fieldNames.size(); ++i) {
+        jg.writeFieldName(fieldNames.get(i));
+        jg.writeObject(data.get(i));
+      }
+      jg.writeEndObject();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return ByteBuffer.wrap(sw.toString().getBytes(Charsets.UTF_8));
+  }
+}
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
new file mode 100644
index 0000000..e57e4d3
--- /dev/null
+++ b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import backtype.storm.spout.SchemeAsMultiScheme;
+import com.google.common.base.Preconditions;
+import org.apache.storm.sql.runtime.*;
+import storm.kafka.ZkHosts;
+import storm.kafka.trident.OpaqueTridentKafkaSpout;
+import storm.kafka.trident.TridentKafkaConfig;
+import storm.kafka.trident.TridentKafkaState;
+import storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import storm.kafka.trident.selector.KafkaTopicSelector;
+import storm.trident.operation.BaseFunction;
+import storm.trident.operation.Function;
+import storm.trident.operation.TridentCollector;
+import storm.trident.operation.TridentOperationContext;
+import storm.trident.spout.ITridentDataSource;
+import storm.trident.tuple.TridentTuple;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * Create a Kafka spout based on the URI. The URI has the format of
+ * kafka://zkhost:port/broker_path?topic=topic.
+ */
+public class KafkaDataSourcesProvider implements DataSourcesProvider {
+  /** ZooKeeper port used when the URI does not specify one. */
+  private static final int DEFAULT_ZK_PORT = 2181;
+
+  /** Topic selector that returns the same topic for every tuple. */
+  private static class StaticTopicSelector implements KafkaTopicSelector {
+    private final String topic;
+
+    private StaticTopicSelector(String topic) {
+      this.topic = topic;
+    }
+
+    @Override
+    public String getTopic(TridentTuple tuple) {
+      return topic;
+    }
+  }
+
+  /**
+   * Maps a tuple to a Kafka message: the key is the value at the primary-key
+   * index and the payload is the whole tuple run through the serializer.
+   */
+  private static class SqlKafkaMapper implements TridentTupleToKafkaMapper<Object, ByteBuffer> {
+    private final int primaryKeyIndex;
+    private final IOutputSerializer serializer;
+
+    private SqlKafkaMapper(int primaryKeyIndex, IOutputSerializer serializer) {
+      this.primaryKeyIndex = primaryKeyIndex;
+      this.serializer = serializer;
+    }
+
+    @Override
+    public Object getKeyFromTuple(TridentTuple tuple) {
+      return tuple.get(primaryKeyIndex);
+    }
+
+    @Override
+    public ByteBuffer getMessageFromTuple(TridentTuple tuple) {
+      // No target buffer is supplied; the serializer's return value is used.
+      return serializer.write(tuple.getValues(), null);
+    }
+  }
+
+  /** Trident function that passes each tuple to a {@link TridentKafkaState}. */
+  static class KafkaTridentSink extends BaseFunction {
+    // Created in prepare(); transient, so it is not serialized with the function.
+    private transient TridentKafkaState state;
+    private final String topic;
+    private final int primaryKeyIndex;
+    private final List<String> fieldNames;
+
+    private KafkaTridentSink(String topic, int primaryKeyIndex, List<String> fieldNames) {
+      this.topic = topic;
+      this.primaryKeyIndex = primaryKeyIndex;
+      this.fieldNames = fieldNames;
+    }
+
+    @Override
+    public void cleanup() {
+      super.cleanup();
+    }
+
+    @Override
+    public void prepare(Map conf, TridentOperationContext context) {
+      // NOTE(review): only the topic selector and the mapper are configured
+      // here; confirm whether the state's producer needs separate
+      // initialisation before execute() is first called.
+      JsonSerializer serializer = new JsonSerializer(fieldNames);
+      SqlKafkaMapper m = new SqlKafkaMapper(primaryKeyIndex, serializer);
+      state = new TridentKafkaState()
+          .withKafkaTopicSelector(new StaticTopicSelector(topic))
+          .withTridentTupleToKafkaMapper(m);
+    }
+
+    @Override
+    public void execute(TridentTuple tuple, TridentCollector collector) {
+      // The state is updated one tuple at a time.
+      state.updateState(Collections.singletonList(tuple), collector);
+    }
+  }
+
+  /**
+   * Data source whose producer is an opaque Trident Kafka spout and whose
+   * consumer is a {@link KafkaTridentSink} for the configured topic.
+   */
+  private static class KafkaTridentDataSource implements ISqlTridentDataSource {
+    private final TridentKafkaConfig conf;
+    private final String topic;
+    private final int primaryKeyIndex;
+    private final List<String> fields;
+    private KafkaTridentDataSource(TridentKafkaConfig conf, String topic, int primaryKeyIndex,
+                                   List<String> fields) {
+      this.conf = conf;
+      this.topic = topic;
+      this.primaryKeyIndex = primaryKeyIndex;
+      this.fields = fields;
+    }
+
+    @Override
+    public ITridentDataSource getProducer() {
+      return new OpaqueTridentKafkaSpout(conf);
+    }
+
+    @Override
+    public Function getConsumer() {
+      return new KafkaTridentSink(topic, primaryKeyIndex, fields);
+    }
+  }
+
+  /** @return the URI scheme served by this provider, {@code "kafka"} */
+  @Override
+  public String scheme() {
+    return "kafka";
+  }
+
+  /**
+   * Non-Trident data sources are not provided for Kafka.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public DataSource construct(URI uri, String inputFormatClass, String outputFormatClass,
+                              List<FieldInfo> fields) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Builds the Trident data source for a table stored in Kafka.
+   *
+   * @param uri location of the form kafka://zkhost:port/broker_path?topic=topic;
+   *            the port defaults to 2181
+   * @param inputFormatClass not used by this provider
+   * @param outputFormatClass not used by this provider
+   * @param fields table columns; the primary one supplies the message key
+   *               (the last one flagged primary wins if there are several)
+   * @return a data source reading from and writing to the named topic as JSON
+   * @throws NullPointerException if the URI has no {@code topic} parameter
+   * @throws IllegalArgumentException if the URI has no host component
+   * @throws IllegalStateException if no field is flagged as primary
+   */
+  @Override
+  public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass, String outputFormatClass,
+                                                List<FieldInfo> fields) {
+    Map<String, String> values = parseURIParams(uri.getQuery());
+    String topic = values.get("topic");
+    Preconditions.checkNotNull(topic, "No topic of the spout is specified");
+    // URI.getHost() is null for host-less URIs such as kafka:///path?topic=t;
+    // fail here instead of passing the literal address "null:2181" on.
+    String host = uri.getHost();
+    Preconditions.checkArgument(host != null, "No ZooKeeper host is specified in %s", uri);
+    int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_ZK_PORT;
+    ZkHosts zk = new ZkHosts(host + ":" + port, uri.getPath());
+    TridentKafkaConfig conf = new TridentKafkaConfig(zk, topic);
+    List<String> fieldNames = new ArrayList<>();
+    int primaryIndex = -1;
+    for (int i = 0; i < fields.size(); ++i) {
+      FieldInfo f = fields.get(i);
+      fieldNames.add(f.name());
+      if (f.isPrimary()) {
+        primaryIndex = i;
+      }
+    }
+    Preconditions.checkState(primaryIndex != -1, "Kafka stream table must have a primary key");
+    conf.scheme = new SchemeAsMultiScheme(new JsonScheme(fieldNames));
+    return new KafkaTridentDataSource(conf, topic, primaryIndex, fieldNames);
+  }
+
+  /**
+   * Splits a URI query string into key/value pairs.
+   *
+   * @param query query part of the URI, may be null
+   * @return the parameters found; a parameter without {@code '='} is dropped
+   *         and a repeated key keeps its last value
+   */
+  private static Map<String, String> parseURIParams(String query) {
+    HashMap<String, String> res = new HashMap<>();
+    if (query == null) {
+      return res;
+    }
+
+    String[] params = query.split("&");
+    for (String p : params) {
+      String[] v = p.split("=", 2);
+      if (v.length > 1) {
+        res.put(v[0], v[1]);
+      }
+    }
+    return res;
+  }
+}
diff --git a/external/sql/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider b/external/sql/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
new file mode 100644
index 0000000..7f687cc
--- /dev/null
+++ b/external/sql/storm-sql-kafka/src/resources/META-INF/services/org.apache.storm.sql.runtime.DataSourcesProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.storm.sql.kafka.KafkaDataSourcesProvider
\ No newline at end of file
diff --git a/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
new file mode 100644
index 0000000..5973672
--- /dev/null
+++ b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestJsonRepresentation.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import backtype.storm.utils.Utils;
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Checks JsonScheme decoding and JsonSerializer encoding against literal JSON
+ * text.
+ */
+public class TestJsonRepresentation {
+  // Explicit charset so that the byte-level checks do not depend on the
+  // platform default encoding of the JVM running the tests.
+  private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+  /** Decoding yields the values of the requested fields in the requested order. */
+  @Test
+  public void testJsonScheme() {
+    final List<String> fields = Lists.newArrayList("ID", "val");
+    final String s = "{\"ID\": 1, \"val\": \"2\"}";
+    JsonScheme scheme = new JsonScheme(fields);
+    List<Object> o = scheme.deserialize(ByteBuffer.wrap(s.getBytes(UTF_8)));
+    assertArrayEquals(new Object[] {1, "2"}, o.toArray());
+  }
+
+  /** Encoding yields a compact JSON object keyed by the field names. */
+  @Test
+  public void testJsonSerializer() {
+    final List<String> fields = Lists.newArrayList("ID", "val");
+    List<Object> o = Lists.<Object> newArrayList(1, "2");
+    JsonSerializer s = new JsonSerializer(fields);
+    ByteBuffer buf = s.write(o, null);
+    byte[] b = Utils.toByteArray(buf);
+    assertEquals("{\"ID\":1,\"val\":\"2\"}", new String(b, UTF_8));
+  }
+}
diff --git a/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
new file mode 100644
index 0000000..418bc68
--- /dev/null
+++ b/external/sql/storm-sql-kafka/src/test/org/apache/storm/sql/kafka/TestKafkaDataSourcesProvider.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.sql.kafka.KafkaDataSourcesProvider.KafkaTridentSink;
+import org.apache.storm.sql.runtime.DataSourcesRegistry;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.internal.util.reflection.Whitebox;
+import storm.kafka.trident.TridentKafkaState;
+import storm.trident.tuple.TridentTuple;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import static org.mockito.Mockito.*;
+
+public class TestKafkaDataSourcesProvider {
+  private static final List<FieldInfo> FIELDS = ImmutableList.of(
+      new FieldInfo("ID", int.class, true),
+      new FieldInfo("val", String.class, false));
+  private static final List<String> FIELD_NAMES = ImmutableList.of("ID", "val");
+  private static final JsonSerializer SERIALIZER = new JsonSerializer(FIELD_NAMES);
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testKafkaSink() {
+    ISqlTridentDataSource ds = DataSourcesRegistry.constructTridentDataSource(
+        URI.create("kafka://mock?topic=foo"), null, null, FIELDS);
+    Assert.assertNotNull(ds);
+    KafkaTridentSink sink = (KafkaTridentSink) ds.getConsumer();
+    sink.prepare(null, null);
+    TridentKafkaState state = (TridentKafkaState) Whitebox.getInternalState(sink, "state");
+    KafkaProducer producer = mock(KafkaProducer.class);
+    doReturn(mock(Future.class)).when(producer).send(any(ProducerRecord.class));
+    Whitebox.setInternalState(state, "producer", producer); // replace the state's producer with the mock
+    List<TridentTuple> tupleList = mockTupleList();
+    for (TridentTuple t : tupleList) {
+      state.updateState(Collections.singletonList(t), null);
+      verify(producer).send(argThat(new KafkaMessageMatcher(t))); // one matching send per tuple
+    }
+    verifyNoMoreInteractions(producer);
+  }
+
+  private static List<TridentTuple> mockTupleList() {
+    List<TridentTuple> tupleList = new ArrayList<>();
+    TridentTuple t0 = mock(TridentTuple.class);
+    TridentTuple t1 = mock(TridentTuple.class);
+    doReturn(1).when(t0).get(0);
+    doReturn(2).when(t1).get(0);
+    doReturn(Lists.<Object>newArrayList(1, "2")).when(t0).getValues();
+    doReturn(Lists.<Object>newArrayList(2, "3")).when(t1).getValues();
+    tupleList.add(t0);
+    tupleList.add(t1);
+    return tupleList;
+  }
+
+  private static class KafkaMessageMatcher extends ArgumentMatcher<ProducerRecord> {
+    private static final int PRIMARY_INDEX = 0;
+    private final TridentTuple tuple;
+
+    private KafkaMessageMatcher(TridentTuple tuple) {
+      this.tuple = tuple;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public boolean matches(Object o) {
+      ProducerRecord<Object, ByteBuffer> m = (ProducerRecord<Object,ByteBuffer>)o;
+      // Compare keys by value: != on boxed objects tests identity, not equality.
+      Object key = tuple.get(PRIMARY_INDEX);
+      if (key == null ? m.key() != null : !key.equals(m.key())) {
+        return false;
+      }
+      return SERIALIZER.write(tuple.getValues(), null).equals(m.value());
+    }
+  }
+
+}
diff --git a/external/sql/storm-sql-runtime/pom.xml b/external/sql/storm-sql-runtime/pom.xml
new file mode 100644
index 0000000..eb6580a
--- /dev/null
+++ b/external/sql/storm-sql-runtime/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-sql-runtime</artifactId>
+
+    <developers>
+        <developer>
+            <id>haohui</id>
+            <name>Haohui Mai</name>
+            <email>ricetons@gmail.com</email>
+        </developer>
+    </developers>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+      <sourceDirectory>src/jvm</sourceDirectory>
+      <testSourceDirectory>src/test</testSourceDirectory>
+      <plugins>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-jar-plugin</artifactId>
+          <executions>
+            <execution>
+              <goals>
+                <goal>test-jar</goal>
+              </goals>
+            </execution>
+          </executions>
+        </plugin>
+      </plugins>
+    </build>
+</project>
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
new file mode 100644
index 0000000..892d2e4
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
@@ -0,0 +1,44 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.runtime;
+
+import backtype.storm.tuple.Values;
+
+public abstract class AbstractChannelHandler implements ChannelHandler {
+  @Override
+  public abstract void dataReceived(ChannelContext ctx, Values data);
+
+  @Override
+  public void channelInactive(ChannelContext ctx) {
+    // No-op by default; subclasses may override.
+  }
+
+  @Override
+  public void exceptionCaught(Throwable cause) {
+    // No-op by default: the cause is dropped unless a subclass overrides this.
+  }
+
+  public static final AbstractChannelHandler PASS_THROUGH = new AbstractChannelHandler() {
+    @Override
+    public void dataReceived(ChannelContext ctx, Values data) {
+      ctx.emit(data); // forward the values unchanged to the given context
+    }
+  };
+}
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
new file mode 100644
index 0000000..11aa065
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractValuesProcessor.java
@@ -0,0 +1,49 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime;
+
+import backtype.storm.tuple.Values;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+
+import java.util.Map;
+
+/**
+ * Subclass of AbstractValuesProcessor provides a series of tuples. It
+ * takes a set of named {@link DataSource}s and produces a stream of
+ * {@link Values}.
+ *
+ * NOTE(review): this class declares no {@code next()} method, although an
+ * earlier version of this comment said subclasses implement one and may
+ * return null from it to signal a no-op (used for filtering and nullable
+ * records). Confirm whether that contract still applies.
+ */
+public abstract class AbstractValuesProcessor {
+
+  /**
+   * Initialize the data sources.
+   *
+   * @param data a map from the table name to its {@link DataSource}.
+   * @param result presumably the handler that receives the output -- confirm.
+   */
+  public abstract void initialize(Map<String, DataSource> data, ChannelHandler
+      result);
+}
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
new file mode 100644
index 0000000..71aba03
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelContext.java
@@ -0,0 +1,30 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.runtime;
+
+import backtype.storm.tuple.Values;
+
+public interface ChannelContext {
+  /**
+   * Emit data to the next stage of the data pipeline.
+   */
+  void emit(Values data);
+  void fireChannelInactive(); // NOTE(review): presumably signals the next stage that the channel is inactive -- confirm
+}
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
new file mode 100644
index 0000000..117f312
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ChannelHandler.java
@@ -0,0 +1,39 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.runtime;
+
+import backtype.storm.tuple.Values;
+
+/**
+ * ChannelHandler provides an event-driven interface for the user to process
+ * a series of events.
+ */
+public interface ChannelHandler {
+  void dataReceived(ChannelContext ctx, Values data);
+
+  /**
+   * The producer of the data has indicated that the channel is no longer
+   * active.
+   * @param ctx the context handed to this handler for the event
+   */
+  void channelInactive(ChannelContext ctx);
+
+  void exceptionCaught(Throwable cause);
+}
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
new file mode 100644
index 0000000..7214f9a
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/Channels.java
@@ -0,0 +1,80 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.runtime;
+
+import backtype.storm.tuple.Values;
+
+public class Channels {
+  private static final ChannelContext VOID_CTX = new ChannelContext() {
+    @Override
+    public void emit(Values data) {} // discards the data
+
+    @Override
+    public void fireChannelInactive() {} // nothing to notify
+  };
+  // Context that hands every event to a handler, passing it the next context.
+  private static class ChannelContextAdapter implements ChannelContext {
+    private final ChannelHandler handler;
+    private final ChannelContext next;
+
+    public ChannelContextAdapter(
+        ChannelContext next, ChannelHandler handler) {
+      this.handler = handler;
+      this.next = next;
+    }
+
+    @Override
+    public void emit(Values data) {
+      handler.dataReceived(next, data);
+    }
+
+    @Override
+    public void fireChannelInactive() {
+      handler.channelInactive(next);
+    }
+  }
+  // Delegates both calls to the wrapped context; not referenced within this class.
+  private static class ForwardingChannelContext implements ChannelContext {
+    private final ChannelContext next;
+
+    public ForwardingChannelContext(ChannelContext next) {
+      this.next = next;
+    }
+
+    @Override
+    public void emit(Values data) {
+      next.emit(data);
+    }
+
+    @Override
+    public void fireChannelInactive() {
+      next.fireChannelInactive();
+    }
+  }
+  // Returns a new context whose events go to handler, which is given next.
+  public static ChannelContext chain(
+      ChannelContext next, ChannelHandler handler) {
+    return new ChannelContextAdapter(next, handler);
+  }
+  // Returns the shared context that ignores everything it receives.
+  public static ChannelContext voidContext() {
+    return VOID_CTX;
+  }
+}
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
new file mode 100644
index 0000000..3e80cb2
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSource.java
@@ -0,0 +1,29 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.runtime;
+
+/**
+ * A DataSource ingests data in StormSQL. It provides a series of tuples to
+ * the downstream {@link ChannelHandler}.
+ *
+ */
+public interface DataSource {
+  void open(ChannelContext ctx);
+}
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
new file mode 100644
index 0000000..eaabc8d
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesProvider.java
@@ -0,0 +1,50 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+public interface DataSourcesProvider {
+  /**
+   * @return the scheme of the data source
+   */
+  String scheme();
+
+  /**
+   * Construct a new data source.
+   * @param uri The URI that specifies the data source. The format of the URI
+   *            is fully customizable.
+   * @param inputFormatClass the name of the class that deserializes data.
+   *                         It is null when unspecified.
+   * @param outputFormatClass the name of the class that serializes data. It
+   *                          is null when unspecified.
+   * @param fields The name of the fields and the schema of the table.
+   */
+  DataSource construct(
+      URI uri, String inputFormatClass, String outputFormatClass,
+      List<FieldInfo> fields);
+  /** Trident counterpart of {@link #construct}; it takes the same parameters. */
+  ISqlTridentDataSource constructTrident(
+      URI uri, String inputFormatClass, String outputFormatClass,
+      List<FieldInfo> fields);
+}
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
new file mode 100644
index 0000000..0285c97
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/DataSourcesRegistry.java
@@ -0,0 +1,80 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+public class DataSourcesRegistry {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      DataSourcesRegistry.class);
+  private static final Map<String, DataSourcesProvider> providers;
+  // Filled from ServiceLoader when this class is initialized; keyed by scheme.
+  static {
+    providers = new HashMap<>();
+    ServiceLoader<DataSourcesProvider> loader = ServiceLoader.load(
+        DataSourcesProvider.class);
+    for (DataSourcesProvider p : loader) {
+      LOG.info("Registering scheme {} with {}", p.scheme(), p);
+      providers.put(p.scheme(), p);
+    }
+  }
+
+  private DataSourcesRegistry() {
+  }
+  // Returns null when no provider is registered for the scheme of the URI.
+  public static DataSource construct(
+      URI uri, String inputFormatClass, String outputFormatClass,
+      List<FieldInfo> fields) {
+    DataSourcesProvider provider = providers.get(uri.getScheme());
+    if (provider == null) {
+      return null;
+    }
+
+    return provider.construct(uri, inputFormatClass, outputFormatClass, fields);
+  }
+  // Returns null when no provider is registered for the scheme of the URI.
+  public static ISqlTridentDataSource constructTridentDataSource(
+      URI uri, String inputFormatClass, String outputFormatClass,
+      List<FieldInfo> fields) {
+    DataSourcesProvider provider = providers.get(uri.getScheme());
+    if (provider == null) {
+      return null;
+    }
+
+    return provider.constructTrident(uri, inputFormatClass, outputFormatClass, fields);
+  }
+
+  /**
+   * Allow unit tests to inject data sources. Returns the live map, not a copy.
+   */
+  @VisibleForTesting
+  public static Map<String, DataSourcesProvider> providerMap() {
+    return providers;
+  }
+}
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
new file mode 100644
index 0000000..cb1176b
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/FieldInfo.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+/**
+ * Describes a single field (column) of a table: its name, type and whether it is primary.
+ */
+public class FieldInfo {
+  private final String name;
+  private final Class<?> type;
+  private final boolean isPrimary;
+
+  public FieldInfo(String name, Class<?> type, boolean isPrimary) {
+    this.name = name;
+    this.type = type;
+    this.isPrimary = isPrimary;
+  }
+
+  public String name() {
+    return name;
+  }
+
+  public Class<?> type() {
+    return type;
+  }
+
+  public boolean isPrimary() {
+    return isPrimary;
+  }
+}
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
new file mode 100644
index 0000000..b6670d9
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/IOutputSerializer.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public interface IOutputSerializer {
+  /**
+   * Serialize the data to a ByteBuffer. The caller can pass in a ByteBuffer so that the serializer can reuse the
+   * memory.
+   *
+   * @return A ByteBuffer that contains the serialized result.
+   */
+  ByteBuffer write(List<Object> data, ByteBuffer buffer);
+}
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
new file mode 100644
index 0000000..d9e1db7
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/ISqlTridentDataSource.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.runtime;
+
+import storm.trident.operation.Function;
+import storm.trident.spout.IBatchSpout;
+import storm.trident.spout.ITridentDataSource;
+
+/**
+ * An ISqlTridentDataSource specifies how an external data source produces and consumes data.
+ */
+public interface ISqlTridentDataSource {
+  ITridentDataSource getProducer();
+  Function getConsumer();
+}
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
new file mode 100644
index 0000000..62b1019
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/StormSqlFunctions.java
@@ -0,0 +1,36 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.storm.sql.runtime;
+
+public class StormSqlFunctions {
+  public static Boolean eq(Object b0, Object b1) {
+    if (b0 != null && b1 != null) {
+      return b0.equals(b1);
+    }
+    return null; // a null operand yields null rather than true/false
+  }
+
+  public static Boolean ne(Object b0, Object b1) {
+    Boolean same = eq(b0, b1);
+    if (same == null) {
+      return null; // a null operand yields null rather than true/false
+    }
+    return !same;
+  }
+}
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/AbstractTridentProcessor.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/AbstractTridentProcessor.java
new file mode 100644
index 0000000..7faa7e4
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/trident/AbstractTridentProcessor.java
@@ -0,0 +1,43 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.storm.sql.runtime.trident;
+
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import storm.trident.Stream;
+import storm.trident.TridentTopology;
+
+import java.util.Map;
+
+/** Base class for processors that assemble a Trident topology from SQL. */
+public abstract class AbstractTridentProcessor {
+  protected Stream outputStream; // never assigned in this class; presumably set by subclasses
+  /**
+   * Builds the Trident topology that corresponds to the SQL.
+   * @param sources the data sources.
+   * @return the Trident topology constructed for the SQL
+   */
+  public abstract TridentTopology build(Map<String, ISqlTridentDataSource> sources);
+
+  /** @return the output stream of the SQL */
+  public Stream outputStream() {
+    return this.outputStream;
+  }
+}
diff --git a/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
new file mode 100644
index 0000000..46aac4a
--- /dev/null
+++ b/external/sql/storm-sql-runtime/src/test/org/apache/storm/sql/TestUtils.java
@@ -0,0 +1,157 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  * <p>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.storm.sql;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.apache.storm.sql.runtime.ChannelContext;
+import org.apache.storm.sql.runtime.ChannelHandler;
+import org.apache.storm.sql.runtime.DataSource;
+import org.apache.storm.sql.runtime.ISqlTridentDataSource;
+import storm.trident.operation.BaseFunction;
+import storm.trident.operation.Function;
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.IBatchSpout;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** Mock data sources and collecting sinks shared by the Storm SQL tests. */
+public class TestUtils {
+  /** Data source that emits the values 0..4, one per tuple, and then signals inactivity. */
+  public static class MockDataSource implements DataSource {
+    private final List<Values> records = new ArrayList<>();
+
+    public MockDataSource() {
+      for (int i = 0; i < 5; ++i) {
+        records.add(new Values(i));
+      }
+    }
+
+    @Override
+    public void open(ChannelContext ctx) {
+      for (Values v : records) {
+        ctx.emit(v);
+      }
+      ctx.fireChannelInactive();
+    }
+  }
+
+  /** Trident data source pairing a one-shot {@code MockSpout} with a {@code CollectDataFunction}. */
+  public static class MockSqlTridentDataSource implements ISqlTridentDataSource {
+    @Override
+    public IBatchSpout getProducer() {
+      return new MockSpout();
+    }
+
+    @Override
+    public Function getConsumer() {
+      return new CollectDataFunction();
+    }
+
+    public static class CollectDataFunction extends BaseFunction {
+      /**
+       * Static because the instance goes through serialization and deserialization; a plain, unsynchronized ArrayList.
+       */
+      private static final List<List<Object>> VALUES = new ArrayList<>();
+      public static List<List<Object>> getCollectedValues() {
+        return VALUES;
+      }
+
+      @Override
+      public void execute(TridentTuple tuple, TridentCollector collector) {
+        VALUES.add(tuple.getValues());
+      }
+    }
+
+    /** Batch spout that emits the values 0..4 under the field "ID" in its first batch only. */
+    private static class MockSpout implements IBatchSpout {
+      private final List<Values> records = new ArrayList<>();
+      private final Fields outputFields = new Fields("ID");
+      private boolean emitted = false;
+
+      public MockSpout() {
+        for (int i = 0; i < 5; ++i) {
+          records.add(new Values(i));
+        }
+      }
+
+      @Override
+      public void open(Map conf, TopologyContext context) {}
+
+      @Override
+      public void emitBatch(long batchId, TridentCollector collector) {
+        if (!emitted) {
+          for (Values r : records) {
+            collector.emit(r);
+          }
+          emitted = true;
+        }
+      }
+
+      @Override
+      public void ack(long batchId) {}
+
+      @Override
+      public void close() {}
+
+      @Override
+      public Map<String, Object> getComponentConfiguration() {
+        return null;
+      }
+
+      @Override
+      public Fields getOutputFields() {
+        return outputFields;
+      }
+    }
+  }
+
+  /** Channel handler that appends every received tuple to a caller-supplied list. */
+  public static class CollectDataChannelHandler implements ChannelHandler {
+    private final List<Values> values;
+
+    public CollectDataChannelHandler(List<Values> values) {
+      this.values = values;
+    }
+
+    @Override
+    public void dataReceived(ChannelContext ctx, Values data) {
+      values.add(data);
+    }
+
+    @Override
+    public void channelInactive(ChannelContext ctx) {}
+
+    @Override
+    public void exceptionCaught(Throwable cause) {
+      throw new RuntimeException(cause);
+    }
+  }
+
+  /** @return the current {@link System#nanoTime()} reading converted to whole milliseconds. */
+  public static long monotonicNow() {
+    final long NANOSECONDS_PER_MILLISECOND = 1000000;
+    return System.nanoTime() / NANOSECONDS_PER_MILLISECOND;
+  }
+}
diff --git a/pom.xml b/pom.xml
index 0da88b0..3ed8e3f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -217,6 +217,7 @@
         <junit.version>4.11</junit.version>
         <metrics-clojure.version>2.5.1</metrics-clojure.version>
         <hdrhistogram.version>2.1.7</hdrhistogram.version>
+        <calcite.version>1.4.0-incubating</calcite.version>
     </properties>
 
     <modules>
@@ -237,6 +238,7 @@
         <module>external/storm-elasticsearch</module>
         <module>external/storm-solr</module>
         <module>external/storm-metrics</module>
+        <module>external/sql</module>
         <module>examples/storm-starter</module>
     </modules>
 
@@ -660,6 +662,16 @@
                 <scope>compile</scope>
             </dependency>
             <dependency>
+                <groupId>org.apache.calcite</groupId>
+                <artifactId>calcite-core</artifactId>
+                <version>${calcite.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-databind</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>junit</groupId>
                 <artifactId>junit</artifactId>
                 <version>${junit.version}</version>
@@ -860,6 +872,9 @@
                         <exclude>**/.git/**</exclude>
                         <exclude>**/derby.log</exclude>
                         <exclude>**/src/dev/**</exclude>
+                        <!-- Storm SQL -->
+                        <exclude>**/src/codegen/config.fmpp</exclude>
+                        <exclude>**/src/codegen/data/Parser.tdd</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index c086be2..47d2332 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -26,6 +26,8 @@
 import backtype.storm.serialization.SerializationDelegate;
 import clojure.lang.IFn;
 import clojure.lang.RT;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.input.ClassLoaderObjectInputStream;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -87,6 +89,7 @@
     private static ThreadLocal<TDeserializer> threadDes = new ThreadLocal<TDeserializer>();
 
     private static SerializationDelegate serializationDelegate;
+    private static ClassLoader cl = ClassLoader.getSystemClassLoader();
 
     static {
         Map conf = readStormConfig();
@@ -170,7 +173,7 @@
     public static <T> T javaDeserialize(byte[] serialized, Class<T> clazz) {
         try {
             ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
-            ObjectInputStream ois = new ObjectInputStream(bis);
+            ObjectInputStream ois = new ClassLoaderObjectInputStream(cl, bis);
             Object ret = ois.readObject();
             ois.close();
             return (T)ret;
@@ -862,5 +865,15 @@
             return defaultValue;
         }
     }
+
+    @VisibleForTesting
+    public static void setClassLoaderForJavaDeSerialize(ClassLoader cl) {
+        Utils.cl = cl; // replaces the loader javaDeserialize passes to ClassLoaderObjectInputStream
+    }
+
+    @VisibleForTesting
+    public static void resetClassLoaderForJavaDeSerialize() {
+        Utils.cl = ClassLoader.getSystemClassLoader(); // same value the field is initialized with
+    }
 }