STORM-1200. Support collations of primary keys.
diff --git a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
index d026027..db3a675 100644
--- a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
+++ b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
@@ -21,6 +21,7 @@
 
   # List of import statements.
   imports: [
+    "org.apache.calcite.sql.validate.*",
     "org.apache.calcite.util.*",
     "org.apache.storm.sql.parser.*",
     "java.util.*"
diff --git a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
index 2d87d7fa..72a8546 100644
--- a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
+++ b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
@@ -16,12 +16,18 @@
     SqlIdentifier name;
     SqlDataTypeSpec type;
     ColumnConstraint constraint = null;
+    SqlMonotonicity monotonicity = SqlMonotonicity.NOT_MONOTONIC;
 }
 {
     name = SimpleIdentifier() { pos = getPos(); }
     type = DataType()
-    [ <PRIMARY> <KEY> { constraint = new ColumnConstraint.PrimaryKey
-    (getPos()); } ]
+    [
+      <PRIMARY> <KEY>
+      [ <ASC>   { monotonicity = SqlMonotonicity.INCREASING; }
+      | <DESC>  { monotonicity = SqlMonotonicity.DECREASING; }
+      ]
+      { constraint = new ColumnConstraint.PrimaryKey(monotonicity, getPos()); }
+    ]
     {
         list.add(new ColumnDefinition(name, type, constraint, pos));
     }
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
index d951243..2350422 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -28,8 +28,8 @@
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.Planner;
-import org.apache.storm.sql.compiler.PlanCompiler;
 import org.apache.storm.sql.parser.ColumnConstraint;
+import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
 import org.apache.storm.sql.parser.ColumnDefinition;
 import org.apache.storm.sql.parser.SqlCreateTable;
 import org.apache.storm.sql.parser.StormParser;
@@ -77,7 +77,7 @@
     TableBuilderInfo builder = new TableBuilderInfo(typeFactory);
     List<FieldInfo> fields = new ArrayList<>();
     for (ColumnDefinition col : n.fieldList()) {
-      builder.field(col.name(), col.type());
+      builder.field(col.name(), col.type(), col.constraint());
       RelDataType dataType = col.type().deriveType(typeFactory);
       Class<?> javaType = (Class<?>)typeFactory.getJavaClass(dataType);
       ColumnConstraint constraint = col.constraint();
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
index 1ef1cb7..30ea0e3 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
@@ -18,20 +18,28 @@
 package org.apache.storm.sql.compiler;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.schema.Schema;
-import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.schema.Statistics;
-import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.*;
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
+import org.apache.storm.sql.parser.ColumnConstraint;
 
 import java.util.ArrayList;
 
+import static org.apache.calcite.rel.RelFieldCollation.Direction;
+import static org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING;
+import static org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING;
+import static org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import static org.apache.calcite.sql.validate.SqlMonotonicity.INCREASING;
+
 public class CompilerUtil {
   public static String escapeJavaString(String s, boolean nullMeansNull) {
       if(s == null) {
@@ -66,6 +74,8 @@
 
     private final ArrayList<FieldType> fields = new ArrayList<>();
     private final ArrayList<Object[]> rows = new ArrayList<>();
+    private int primaryKey = -1;
+    private SqlMonotonicity primaryKeyMonotonicity;
     private Statistic stats;
 
     public TableBuilderInfo field(String name, SqlTypeName type) {
@@ -74,16 +84,14 @@
       return this;
     }
 
-    public TableBuilderInfo field(String name, SqlTypeName type, int
-        precision) {
-      RelDataType dataType = typeFactory.createSqlType(type, precision);
-      fields.add(new FieldType(name, dataType));
-      return this;
-    }
-
-    public TableBuilderInfo field(
-        String name, SqlDataTypeSpec type) {
+    public TableBuilderInfo field(String name, SqlDataTypeSpec type, ColumnConstraint constraint) {
       RelDataType dataType = type.deriveType(typeFactory);
+      if (constraint instanceof ColumnConstraint.PrimaryKey) {
+        ColumnConstraint.PrimaryKey pk = (ColumnConstraint.PrimaryKey) constraint;
+        Preconditions.checkState(primaryKey == -1, "There are more than one primary key in the table");
+        primaryKey = fields.size();
+        primaryKeyMonotonicity = pk.monotonicity();
+      }
       fields.add(new FieldType(name, dataType));
       return this;
     }
@@ -99,9 +107,9 @@
       return this;
     }
 
-    public Table build() {
-      final Statistic stat = stats;
-      return new Table() {
+    public StreamableTable build() {
+      final Statistic stat = buildStatistic();
+      final Table tbl = new Table() {
         @Override
         public RelDataType getRowType(
             RelDataTypeFactory relDataTypeFactory) {
@@ -123,6 +131,38 @@
           return Schema.TableType.TABLE;
         }
       };
+
+      return new StreamableTable() {
+        @Override
+        public Table stream() {
+          return tbl;
+        }
+
+        @Override
+        public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
+          return tbl.getRowType(relDataTypeFactory);
+        }
+
+        @Override
+        public Statistic getStatistic() {
+          return tbl.getStatistic();
+        }
+
+        @Override
+        public Schema.TableType getJdbcTableType() {
+          return Schema.TableType.TABLE;
+        }
+      };
+    }
+
+    private Statistic buildStatistic() {
+      if (stats != null || primaryKey == -1) {
+        return stats;
+      }
+      Direction dir = primaryKeyMonotonicity == INCREASING ? ASCENDING : DESCENDING;
+      RelFieldCollation collation = new RelFieldCollation(primaryKey, dir, NullDirection.UNSPECIFIED);
+      return Statistics.of(fields.size(), ImmutableList.of(ImmutableBitSet.of(primaryKey)),
+          ImmutableList.of(RelCollations.of(collation)));
     }
   }
 }
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
similarity index 97%
rename from external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
rename to external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
index d2d3710..46009e9 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
@@ -15,12 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.sql.compiler;
+package org.apache.storm.sql.compiler.backends.standalone;
 
 import com.google.common.base.Joiner;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.storm.sql.compiler.CompilerUtil;
 import org.apache.storm.sql.javac.CompilingClassLoader;
 import org.apache.storm.sql.runtime.AbstractValuesProcessor;
 
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
similarity index 83%
rename from external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
rename to external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
index eea451f..6d51a11 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.sql.compiler;
+package org.apache.storm.sql.compiler.backends.standalone;
 
 import com.google.common.base.Joiner;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
@@ -23,6 +23,9 @@
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.stream.Delta;
+import org.apache.storm.sql.compiler.ExprCompiler;
+import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor;
 
 import java.io.PrintWriter;
 
@@ -41,6 +44,9 @@
     "    public void dataReceived(ChannelContext ctx, Values _data) {",
     ""
   );
+  private static final String STAGE_PASSTHROUGH = NEW_LINE_JOINER.join(
+      "  private static final ChannelHandler %1$s = AbstractChannelHandler.PASS_THROUGH;",
+      "");
 
   RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
     this.pw = pw;
@@ -48,6 +54,12 @@
   }
 
   @Override
+  public Void visitDelta(Delta delta) throws Exception {
+    pw.print(String.format(STAGE_PASSTHROUGH, getStageName(delta)));
+    return null;
+  }
+
+  @Override
   public Void visitFilter(Filter filter) throws Exception {
     beginStage(filter);
     ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
@@ -81,9 +93,7 @@
 
   @Override
   public Void visitTableScan(TableScan scan) throws Exception {
-    beginStage(scan);
-    pw.print("    ctx.emit(_data);\n");
-    endStage();
+    pw.print(String.format(STAGE_PASSTHROUGH, getStageName(scan)));
     return null;
   }
 
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
index 6daf6d3..c67d8e7 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
@@ -21,6 +21,7 @@
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
 
 public class ColumnConstraint extends SqlLiteral {
   private ColumnConstraint(
@@ -29,8 +30,13 @@
   }
 
   public static class PrimaryKey extends ColumnConstraint {
-    public PrimaryKey(SqlParserPos pos) {
-      super("PRIMARY", SqlTypeName.SYMBOL, pos);
+    private final SqlMonotonicity monotonicity;
+    public PrimaryKey(SqlMonotonicity monotonicity, SqlParserPos pos) {
+      super(SqlDDLKeywords.PRIMARY, SqlTypeName.SYMBOL, pos);
+      this.monotonicity = monotonicity;
+    }
+    public SqlMonotonicity monotonicity() {
+      return monotonicity;
     }
   }
 }
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
new file mode 100644
index 0000000..3112e53
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlLiteral;
+
+/**
+ * Define the keywords that can occur in a CREATE TABLE statement
+ */
+public enum SqlDDLKeywords implements SqlLiteral.SqlSymbol {
+  PRIMARY
+}
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index 9facd8a..b238e18 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -63,7 +63,7 @@
   public void testExternalDataSource() throws Exception {
     List<String> stmt = new ArrayList<>();
     stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
-    stmt.add("SELECT ID + 1 FROM FOO WHERE ID > 2");
+    stmt.add("SELECT STREAM ID + 1 FROM FOO WHERE ID > 2");
     StormSql sql = StormSql.construct();
     List<Values> values = new ArrayList<>();
     ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
index c582fdc..0e5fa0b 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
@@ -5,6 +5,7 @@
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.StreamableTable;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.parser.SqlParseException;
@@ -17,8 +18,9 @@
     SchemaPlus schema = Frameworks.createRootSchema(true);
     JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
         (RelDataTypeSystem.DEFAULT);
-    Table table = new CompilerUtil.TableBuilderInfo(typeFactory)
+    StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
         .field("ID", SqlTypeName.INTEGER).build();
+    Table table = streamableTable.stream();
     schema.add("FOO", table);
     schema.add("BAR", table);
     FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
index 40bb884..febfdb5 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
@@ -24,6 +24,7 @@
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.storm.sql.TestUtils;
+import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
 import org.apache.storm.sql.runtime.ChannelHandler;
 import org.apache.storm.sql.runtime.DataSource;
 import org.apache.storm.sql.runtime.AbstractValuesProcessor;
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
similarity index 91%
rename from external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
rename to external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
index 074a23f..e46ae9c 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
@@ -15,16 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.sql.compiler;
+package org.apache.storm.sql.compiler.backends.standalone;
 
 import backtype.storm.tuple.Values;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.storm.sql.TestUtils;
+import org.apache.storm.sql.compiler.TestCompilerUtils;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
 import org.apache.storm.sql.runtime.ChannelHandler;
 import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.AbstractValuesProcessor;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -42,7 +43,7 @@
     String sql = "SELECT ID + 1 FROM FOO WHERE ID > 2";
     TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
     PlanCompiler compiler = new PlanCompiler(typeFactory);
-    AbstractValuesProcessor proc = compiler.compile(state.tree);
+    AbstractValuesProcessor proc = compiler.compile(state.tree());
     Map<String, DataSource> data = new HashMap<>();
     data.put("FOO", new TestUtils.MockDataSource());
     List<Values> values = new ArrayList<>();
@@ -57,7 +58,7 @@
     String sql = "SELECT ID > 0 OR ID < 1, ID > 0 AND ID < 1, NOT (ID > 0 AND ID < 1) FROM FOO WHERE ID > 0 AND ID < 2";
     TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
     PlanCompiler compiler = new PlanCompiler(typeFactory);
-    AbstractValuesProcessor proc = compiler.compile(state.tree);
+    AbstractValuesProcessor proc = compiler.compile(state.tree());
     Map<String, DataSource> data = new HashMap<>();
     data.put("FOO", new TestUtils.MockDataSource());
     List<Values> values = new ArrayList<>();
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
similarity index 90%
rename from external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
rename to external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
index 99083cb..76eba1d 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
@@ -15,20 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.storm.sql.compiler;
+package org.apache.storm.sql.compiler.backends.standalone;
 
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.storm.sql.compiler.TestCompilerUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
 
-import static org.hamcrest.CoreMatchers.*;
+import static org.hamcrest.CoreMatchers.containsString;
 
 public class TestRelNodeCompiler {
   @Test
@@ -37,7 +38,7 @@
     TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
     JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
         RelDataTypeSystem.DEFAULT);
-    LogicalProject project = (LogicalProject) state.tree;
+    LogicalProject project = (LogicalProject) state.tree();
     LogicalFilter filter = (LogicalFilter) project.getInput();
 
     try (StringWriter sw = new StringWriter();
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
index 41e031d..b957565 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
@@ -29,6 +29,12 @@
     parse(sql);
   }
 
+  @Test
+  public void testCreateTableWithPrimaryKey() throws Exception {
+    String sql = "CREATE EXTERNAL TABLE foo (bar INT PRIMARY KEY ASC) LOCATION 'kafka:///foo'";
+    parse(sql);
+  }
+
   @Test(expected = ParseException.class)
   public void testCreateTableWithoutLocation() throws Exception {
     String sql = "CREATE EXTERNAL TABLE foo (bar INT)";
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
index 73a078c..892d2e4 100644
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
@@ -34,4 +34,11 @@
   public void exceptionCaught(Throwable cause) {
 
   }
+
+  public static final AbstractChannelHandler PASS_THROUGH = new AbstractChannelHandler() {
+    @Override
+    public void dataReceived(ChannelContext ctx, Values data) {
+      ctx.emit(data);
+    }
+  };
 }