[FLINK-19049][table] Fix validation of table functions in projections

Validates that table functions cannot be used at locations of scalar
functions and vice versa. An exception is thrown in pre-flight phase.

This closes #13285.
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
index 9406de4..2fa275d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala
@@ -19,8 +19,10 @@
 
 import org.apache.flink.api.common.functions.{FlatMapFunction, Function}
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.{TableConfig, TableException, ValidationException}
 import org.apache.flink.table.data.{BoxedWrapperRowData, RowData}
+import org.apache.flink.table.functions.FunctionKind
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
 import org.apache.flink.table.runtime.generated.GeneratedFunction
 import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
@@ -117,6 +119,12 @@
       allowSplit: Boolean = false): String = {
 
     val projection = calcProgram.getProjectList.map(calcProgram.expandLocalRef)
+
+    // according to the SQL standard, every table function should also be a scalar function
+    // but we don't allow that for now
+    projection.foreach(_.accept(ScalarFunctionsValidator))
+    condition.foreach(_.accept(ScalarFunctionsValidator))
+
     val exprGenerator = new ExprCodeGenerator(ctx, false)
         .bindInput(inputType, inputTerm = inputTerm)
 
@@ -195,4 +203,17 @@
       }
     }
   }
+
+  private object ScalarFunctionsValidator extends RexVisitorImpl[Unit](true) {
+    override def visitCall(call: RexCall): Unit = {
+      super.visitCall(call)
+      call.getOperator match {
+        case bsf: BridgingSqlFunction if bsf.getDefinition.getKind != FunctionKind.SCALAR =>
+          throw new ValidationException(
+            s"Invalid use of function '$bsf'. " +
+              s"Currently, only scalar functions can be used in a projection or filter operation.")
+        case _ => // ok
+      }
+    }
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
index c09606f..3b1157b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
@@ -70,8 +70,10 @@
     rexCall.getOperator match {
       case func: BridgingSqlFunction if func.getDefinition.getKind == FunctionKind.TABLE => // ok
       case _: TableSqlFunction => // ok
-      case _ =>
-        throw new ValidationException("Currently, only table functions can emit rows.")
+      case f@_ =>
+        throw new ValidationException(
+          s"Invalid use of function '$f'. " +
+            s"Currently, only table functions can be used in a correlate operation.")
     }
 
     val swallowInputOnly = if (projectProgram.isDefined) {
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
index fdd2e4c..9b7281f 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
@@ -858,7 +858,7 @@
 				e,
 				hasMessage(
 					containsString(
-						"Currently, only table functions can emit rows.")));
+						"Currently, only table functions can be used in a correlate operation.")));
 		}
 	}
 
@@ -869,12 +869,19 @@
 		tEnv().executeSql("CREATE TABLE SinkTable(s ROW<s STRING, sa ARRAY<STRING> NOT NULL>) WITH ('connector' = 'COLLECTION')");
 
 		tEnv().createTemporarySystemFunction("RowTableFunction", RowTableFunction.class);
-		tEnv().executeSql(
-			"INSERT INTO SinkTable " +
-			"SELECT RowTableFunction('test')");
 
-		// currently, calling a table function like a scalar function produces no result
-		assertThat(TestCollectionTableFactory.getResult(), equalTo(Collections.emptyList()));
+		try {
+			tEnv().explainSql(
+				"INSERT INTO SinkTable " +
+				"SELECT RowTableFunction('test')");
+			fail();
+		} catch (ValidationException e) {
+			assertThat(
+				e,
+				hasMessage(
+					containsString(
+						"Currently, only scalar functions can be used in a projection or filter operation.")));
+		}
 	}
 
 	@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java
index 2fb79da..35150b0 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java
@@ -38,8 +38,10 @@
 
 import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.api.Expressions.call;
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
+import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
 
 /**
  * Tests for user defined functions in the Table API.
@@ -114,7 +116,10 @@
 	@Test
 	public void testLateralJoinWithScalarFunction() throws Exception {
 		thrown.expect(ValidationException.class);
-		thrown.expectMessage("Currently, only table functions can emit rows.");
+		thrown.expect(
+			hasMessage(
+				containsString(
+					"Currently, only table functions can be used in a correlate operation.")));
 
 		TestCollectionTableFactory.reset();
 		tEnv().executeSql("CREATE TABLE SourceTable(s STRING) WITH ('connector' = 'COLLECTION')");