[FLINK-19049][table] Fix validation of table functions in projections
Validates that table functions cannot be used at locations of scalar
functions and vice versa. An exception is thrown in pre-flight phase.
This closes #13285.
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
index 9406de4..2fa275d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
@@ -19,8 +19,10 @@
import org.apache.flink.api.common.functions.{FlatMapFunction, Function}
import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.{TableConfig, TableException, ValidationException}
import org.apache.flink.table.data.{BoxedWrapperRowData, RowData}
+import org.apache.flink.table.functions.FunctionKind
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
import org.apache.flink.table.runtime.generated.GeneratedFunction
import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
@@ -117,6 +119,12 @@
allowSplit: Boolean = false): String = {
val projection = calcProgram.getProjectList.map(calcProgram.expandLocalRef)
+
+ // according to the SQL standard, every table function should also be a scalar function
+ // but we don't allow that for now
+ projection.foreach(_.accept(ScalarFunctionsValidator))
+ condition.foreach(_.accept(ScalarFunctionsValidator))
+
val exprGenerator = new ExprCodeGenerator(ctx, false)
.bindInput(inputType, inputTerm = inputTerm)
@@ -195,4 +203,17 @@
}
}
}
+
+ private object ScalarFunctionsValidator extends RexVisitorImpl[Unit](true) {
+ override def visitCall(call: RexCall): Unit = {
+ super.visitCall(call)
+ call.getOperator match {
+ case bsf: BridgingSqlFunction if bsf.getDefinition.getKind != FunctionKind.SCALAR =>
+ throw new ValidationException(
+ s"Invalid use of function '$bsf'. " +
+ s"Currently, only scalar functions can be used in a projection or filter operation.")
+ case _ => // ok
+ }
+ }
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
index c09606f..3b1157b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
@@ -70,8 +70,10 @@
rexCall.getOperator match {
case func: BridgingSqlFunction if func.getDefinition.getKind == FunctionKind.TABLE => // ok
case _: TableSqlFunction => // ok
- case _ =>
- throw new ValidationException("Currently, only table functions can emit rows.")
+ case f =>
+ throw new ValidationException(
+ s"Invalid use of function '$f'. " +
+ s"Currently, only table functions can be used in a correlate operation.")
}
val swallowInputOnly = if (projectProgram.isDefined) {
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
index fdd2e4c..9b7281f 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
@@ -858,7 +858,7 @@
e,
hasMessage(
containsString(
- "Currently, only table functions can emit rows.")));
+ "Currently, only table functions can be used in a correlate operation.")));
}
}
@@ -869,12 +869,19 @@
tEnv().executeSql("CREATE TABLE SinkTable(s ROW<s STRING, sa ARRAY<STRING> NOT NULL>) WITH ('connector' = 'COLLECTION')");
tEnv().createTemporarySystemFunction("RowTableFunction", RowTableFunction.class);
- tEnv().executeSql(
- "INSERT INTO SinkTable " +
- "SELECT RowTableFunction('test')");
- // currently, calling a table function like a scalar function produces no result
- assertThat(TestCollectionTableFactory.getResult(), equalTo(Collections.emptyList()));
+ try {
+ tEnv().explainSql(
+ "INSERT INTO SinkTable " +
+ "SELECT RowTableFunction('test')");
+ fail();
+ } catch (ValidationException e) {
+ assertThat(
+ e,
+ hasMessage(
+ containsString(
+ "Currently, only scalar functions can be used in a projection or filter operation.")));
+ }
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java
index 2fb79da..35150b0 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java
@@ -38,8 +38,10 @@
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
+import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
+import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
/**
* Tests for user defined functions in the Table API.
@@ -114,7 +116,10 @@
@Test
public void testLateralJoinWithScalarFunction() throws Exception {
thrown.expect(ValidationException.class);
- thrown.expectMessage("Currently, only table functions can emit rows.");
+ thrown.expect(
+ hasMessage(
+ containsString(
+ "Currently, only table functions can be used in a correlate operation.")));
TestCollectionTableFactory.reset();
tEnv().executeSql("CREATE TABLE SourceTable(s STRING) WITH ('connector' = 'COLLECTION')");