STORM-1200. Support collations of primary keys.
diff --git a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
index d026027..db3a675 100644
--- a/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
+++ b/external/sql/storm-sql-core/src/codegen/data/Parser.tdd
@@ -21,6 +21,7 @@
# List of import statements.
imports: [
+ "org.apache.calcite.sql.validate.*",
"org.apache.calcite.util.*",
"org.apache.storm.sql.parser.*",
"java.util.*"
diff --git a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
index 2d87d7fa..72a8546 100644
--- a/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
+++ b/external/sql/storm-sql-core/src/codegen/includes/parserImpls.ftl
@@ -16,12 +16,18 @@
SqlIdentifier name;
SqlDataTypeSpec type;
ColumnConstraint constraint = null;
+ SqlMonotonicity monotonicity = SqlMonotonicity.NOT_MONOTONIC;
}
{
name = SimpleIdentifier() { pos = getPos(); }
type = DataType()
- [ <PRIMARY> <KEY> { constraint = new ColumnConstraint.PrimaryKey
- (getPos()); } ]
+ [
+ <PRIMARY> <KEY>
+ [ <ASC> { monotonicity = SqlMonotonicity.INCREASING; }
+ | <DESC> { monotonicity = SqlMonotonicity.DECREASING; }
+ ]
+ { constraint = new ColumnConstraint.PrimaryKey(monotonicity, getPos()); }
+ ]
{
list.add(new ColumnDefinition(name, type, constraint, pos));
}
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
index d951243..2350422 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/StormSqlImpl.java
@@ -28,8 +28,8 @@
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
-import org.apache.storm.sql.compiler.PlanCompiler;
import org.apache.storm.sql.parser.ColumnConstraint;
+import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
import org.apache.storm.sql.parser.ColumnDefinition;
import org.apache.storm.sql.parser.SqlCreateTable;
import org.apache.storm.sql.parser.StormParser;
@@ -77,7 +77,7 @@
TableBuilderInfo builder = new TableBuilderInfo(typeFactory);
List<FieldInfo> fields = new ArrayList<>();
for (ColumnDefinition col : n.fieldList()) {
- builder.field(col.name(), col.type());
+ builder.field(col.name(), col.type(), col.constraint());
RelDataType dataType = col.type().deriveType(typeFactory);
Class<?> javaType = (Class<?>)typeFactory.getJavaClass(dataType);
ColumnConstraint constraint = col.constraint();
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
index 1ef1cb7..30ea0e3 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/CompilerUtil.java
@@ -18,20 +18,28 @@
package org.apache.storm.sql.compiler;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.schema.Schema;
-import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.schema.Statistics;
-import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.*;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Util;
+import org.apache.storm.sql.parser.ColumnConstraint;
import java.util.ArrayList;
+import static org.apache.calcite.rel.RelFieldCollation.Direction;
+import static org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING;
+import static org.apache.calcite.rel.RelFieldCollation.Direction.DESCENDING;
+import static org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import static org.apache.calcite.sql.validate.SqlMonotonicity.INCREASING;
+
public class CompilerUtil {
public static String escapeJavaString(String s, boolean nullMeansNull) {
if(s == null) {
@@ -66,6 +74,8 @@
private final ArrayList<FieldType> fields = new ArrayList<>();
private final ArrayList<Object[]> rows = new ArrayList<>();
+ private int primaryKey = -1;
+ private SqlMonotonicity primaryKeyMonotonicity;
private Statistic stats;
public TableBuilderInfo field(String name, SqlTypeName type) {
@@ -74,16 +84,14 @@
return this;
}
- public TableBuilderInfo field(String name, SqlTypeName type, int
- precision) {
- RelDataType dataType = typeFactory.createSqlType(type, precision);
- fields.add(new FieldType(name, dataType));
- return this;
- }
-
- public TableBuilderInfo field(
- String name, SqlDataTypeSpec type) {
+ public TableBuilderInfo field(String name, SqlDataTypeSpec type, ColumnConstraint constraint) {
RelDataType dataType = type.deriveType(typeFactory);
+ if (constraint instanceof ColumnConstraint.PrimaryKey) {
+ ColumnConstraint.PrimaryKey pk = (ColumnConstraint.PrimaryKey) constraint;
+ Preconditions.checkState(primaryKey == -1, "There are more than one primary key in the table");
+ primaryKey = fields.size();
+ primaryKeyMonotonicity = pk.monotonicity();
+ }
fields.add(new FieldType(name, dataType));
return this;
}
@@ -99,9 +107,9 @@
return this;
}
- public Table build() {
- final Statistic stat = stats;
- return new Table() {
+ public StreamableTable build() {
+ final Statistic stat = buildStatistic();
+ final Table tbl = new Table() {
@Override
public RelDataType getRowType(
RelDataTypeFactory relDataTypeFactory) {
@@ -123,6 +131,38 @@
return Schema.TableType.TABLE;
}
};
+
+ return new StreamableTable() {
+ @Override
+ public Table stream() {
+ return tbl;
+ }
+
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
+ return tbl.getRowType(relDataTypeFactory);
+ }
+
+ @Override
+ public Statistic getStatistic() {
+ return tbl.getStatistic();
+ }
+
+ @Override
+ public Schema.TableType getJdbcTableType() {
+ return Schema.TableType.TABLE;
+ }
+ };
+ }
+
+ private Statistic buildStatistic() {
+ if (stats != null || primaryKey == -1) {
+ return stats;
+ }
+ Direction dir = primaryKeyMonotonicity == INCREASING ? ASCENDING : DESCENDING;
+ RelFieldCollation collation = new RelFieldCollation(primaryKey, dir, NullDirection.UNSPECIFIED);
+ return Statistics.of(fields.size(), ImmutableList.of(ImmutableBitSet.of(primaryKey)),
+ ImmutableList.of(RelCollations.of(collation)));
}
}
}
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
similarity index 97%
rename from external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
rename to external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
index d2d3710..46009e9 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/PlanCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/PlanCompiler.java
@@ -15,12 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.sql.compiler;
+package org.apache.storm.sql.compiler.backends.standalone;
import com.google.common.base.Joiner;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.storm.sql.compiler.CompilerUtil;
import org.apache.storm.sql.javac.CompilingClassLoader;
import org.apache.storm.sql.runtime.AbstractValuesProcessor;
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
similarity index 83%
rename from external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
rename to external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
index eea451f..6d51a11 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/RelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/compiler/backends/standalone/RelNodeCompiler.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.sql.compiler;
+package org.apache.storm.sql.compiler.backends.standalone;
import com.google.common.base.Joiner;
import org.apache.calcite.adapter.java.JavaTypeFactory;
@@ -23,6 +23,9 @@
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.stream.Delta;
+import org.apache.storm.sql.compiler.ExprCompiler;
+import org.apache.storm.sql.compiler.PostOrderRelNodeVisitor;
import java.io.PrintWriter;
@@ -41,6 +44,9 @@
" public void dataReceived(ChannelContext ctx, Values _data) {",
""
);
+ private static final String STAGE_PASSTHROUGH = NEW_LINE_JOINER.join(
+ " private static final ChannelHandler %1$s = AbstractChannelHandler.PASS_THROUGH;",
+ "");
RelNodeCompiler(PrintWriter pw, JavaTypeFactory typeFactory) {
this.pw = pw;
@@ -48,6 +54,12 @@
}
@Override
+ public Void visitDelta(Delta delta) throws Exception {
+ pw.print(String.format(STAGE_PASSTHROUGH, getStageName(delta)));
+ return null;
+ }
+
+ @Override
public Void visitFilter(Filter filter) throws Exception {
beginStage(filter);
ExprCompiler compiler = new ExprCompiler(pw, typeFactory);
@@ -81,9 +93,7 @@
@Override
public Void visitTableScan(TableScan scan) throws Exception {
- beginStage(scan);
- pw.print(" ctx.emit(_data);\n");
- endStage();
+ pw.print(String.format(STAGE_PASSTHROUGH, getStageName(scan)));
return null;
}
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
index 6daf6d3..c67d8e7 100644
--- a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/ColumnConstraint.java
@@ -21,6 +21,7 @@
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlMonotonicity;
public class ColumnConstraint extends SqlLiteral {
private ColumnConstraint(
@@ -29,8 +30,13 @@
}
public static class PrimaryKey extends ColumnConstraint {
- public PrimaryKey(SqlParserPos pos) {
- super("PRIMARY", SqlTypeName.SYMBOL, pos);
+ private final SqlMonotonicity monotonicity;
+ public PrimaryKey(SqlMonotonicity monotonicity, SqlParserPos pos) {
+ super(SqlDDLKeywords.PRIMARY, SqlTypeName.SYMBOL, pos);
+ this.monotonicity = monotonicity;
+ }
+ public SqlMonotonicity monotonicity() {
+ return monotonicity;
}
}
}
diff --git a/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
new file mode 100644
index 0000000..3112e53
--- /dev/null
+++ b/external/sql/storm-sql-core/src/jvm/org/apache/storm/sql/parser/SqlDDLKeywords.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.parser;
+
+import org.apache.calcite.sql.SqlLiteral;
+
+/**
+ * Define the keywords that can occur in a CREATE TABLE statement
+ */
+public enum SqlDDLKeywords implements SqlLiteral.SqlSymbol {
+ PRIMARY
+}
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
index 9facd8a..b238e18 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/TestStormSql.java
@@ -63,7 +63,7 @@
public void testExternalDataSource() throws Exception {
List<String> stmt = new ArrayList<>();
stmt.add("CREATE EXTERNAL TABLE FOO (ID INT) LOCATION 'mock:///foo'");
- stmt.add("SELECT ID + 1 FROM FOO WHERE ID > 2");
+ stmt.add("SELECT STREAM ID + 1 FROM FOO WHERE ID > 2");
StormSql sql = StormSql.construct();
List<Values> values = new ArrayList<>();
ChannelHandler h = new TestUtils.CollectDataChannelHandler(values);
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
index c582fdc..0e5fa0b 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestCompilerUtils.java
@@ -5,6 +5,7 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.StreamableTable;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
@@ -17,8 +18,9 @@
SchemaPlus schema = Frameworks.createRootSchema(true);
JavaTypeFactory typeFactory = new JavaTypeFactoryImpl
(RelDataTypeSystem.DEFAULT);
- Table table = new CompilerUtil.TableBuilderInfo(typeFactory)
+ StreamableTable streamableTable = new CompilerUtil.TableBuilderInfo(typeFactory)
.field("ID", SqlTypeName.INTEGER).build();
+ Table table = streamableTable.stream();
schema.add("FOO", table);
schema.add("BAR", table);
FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
index 40bb884..febfdb5 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestExprSemantic.java
@@ -24,6 +24,7 @@
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.storm.sql.TestUtils;
+import org.apache.storm.sql.compiler.backends.standalone.PlanCompiler;
import org.apache.storm.sql.runtime.ChannelHandler;
import org.apache.storm.sql.runtime.DataSource;
import org.apache.storm.sql.runtime.AbstractValuesProcessor;
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
similarity index 91%
rename from external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
rename to external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
index 074a23f..e46ae9c 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestPlanCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestPlanCompiler.java
@@ -15,16 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.sql.compiler;
+package org.apache.storm.sql.compiler.backends.standalone;
import backtype.storm.tuple.Values;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.storm.sql.TestUtils;
+import org.apache.storm.sql.compiler.TestCompilerUtils;
+import org.apache.storm.sql.runtime.AbstractValuesProcessor;
import org.apache.storm.sql.runtime.ChannelHandler;
import org.apache.storm.sql.runtime.DataSource;
-import org.apache.storm.sql.runtime.AbstractValuesProcessor;
import org.junit.Assert;
import org.junit.Test;
@@ -42,7 +43,7 @@
String sql = "SELECT ID + 1 FROM FOO WHERE ID > 2";
TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
PlanCompiler compiler = new PlanCompiler(typeFactory);
- AbstractValuesProcessor proc = compiler.compile(state.tree);
+ AbstractValuesProcessor proc = compiler.compile(state.tree());
Map<String, DataSource> data = new HashMap<>();
data.put("FOO", new TestUtils.MockDataSource());
List<Values> values = new ArrayList<>();
@@ -57,7 +58,7 @@
String sql = "SELECT ID > 0 OR ID < 1, ID > 0 AND ID < 1, NOT (ID > 0 AND ID < 1) FROM FOO WHERE ID > 0 AND ID < 2";
TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
PlanCompiler compiler = new PlanCompiler(typeFactory);
- AbstractValuesProcessor proc = compiler.compile(state.tree);
+ AbstractValuesProcessor proc = compiler.compile(state.tree());
Map<String, DataSource> data = new HashMap<>();
data.put("FOO", new TestUtils.MockDataSource());
List<Values> values = new ArrayList<>();
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
similarity index 90%
rename from external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
rename to external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
index 99083cb..76eba1d 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/TestRelNodeCompiler.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/compiler/backends/standalone/TestRelNodeCompiler.java
@@ -15,20 +15,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.storm.sql.compiler;
+package org.apache.storm.sql.compiler.backends.standalone;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.storm.sql.compiler.TestCompilerUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.PrintWriter;
import java.io.StringWriter;
-import static org.hamcrest.CoreMatchers.*;
+import static org.hamcrest.CoreMatchers.containsString;
public class TestRelNodeCompiler {
@Test
@@ -37,7 +38,7 @@
TestCompilerUtils.CalciteState state = TestCompilerUtils.sqlOverDummyTable(sql);
JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(
RelDataTypeSystem.DEFAULT);
- LogicalProject project = (LogicalProject) state.tree;
+ LogicalProject project = (LogicalProject) state.tree();
LogicalFilter filter = (LogicalFilter) project.getInput();
try (StringWriter sw = new StringWriter();
diff --git a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
index 41e031d..b957565 100644
--- a/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
+++ b/external/sql/storm-sql-core/src/test/org/apache/storm/sql/parser/TestSqlParser.java
@@ -29,6 +29,12 @@
parse(sql);
}
+ @Test
+ public void testCreateTableWithPrimaryKey() throws Exception {
+ String sql = "CREATE EXTERNAL TABLE foo (bar INT PRIMARY KEY ASC) LOCATION 'kafka:///foo'";
+ parse(sql);
+ }
+
@Test(expected = ParseException.class)
public void testCreateTableWithoutLocation() throws Exception {
String sql = "CREATE EXTERNAL TABLE foo (bar INT)";
diff --git a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
index 73a078c..892d2e4 100644
--- a/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
+++ b/external/sql/storm-sql-runtime/src/jvm/org/apache/storm/sql/runtime/AbstractChannelHandler.java
@@ -34,4 +34,11 @@
public void exceptionCaught(Throwable cause) {
}
+
+ public static final AbstractChannelHandler PASS_THROUGH = new AbstractChannelHandler() {
+ @Override
+ public void dataReceived(ChannelContext ctx, Values data) {
+ ctx.emit(data);
+ }
+ };
}