[FLINK-14272][python][table-planner-blink] Support Blink planner for Python UDF (Scalar Function)
Ports the Python user-defined scalar function support from the Flink planner to the Blink planner.
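
With the Blink planner, a Python scalar function is registered and used in the same way as with
the legacy Flink planner. A minimal sketch based on the APIs exercised by the tests in this
change (the "Results" sink and the element data are illustrative only):

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
    from pyflink.table.udf import udf

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance()
            .in_streaming_mode().use_blink_planner().build())

    # register a Python scalar function; with the Blink planner a placeholder
    # ScalarFunction is generated and the serialized Python function is executed
    # by a Python operator at runtime
    add_one = udf(lambda i: i + 1, input_types=[DataTypes.BIGINT()],
                  result_type=DataTypes.BIGINT())
    t_env.register_function("add_one", add_one)

    # "Results" must be a previously registered table sink (see test_udf.py)
    t = t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
    t.select("add_one(a), b").insert_into("Results")
    t_env.execute("test")
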
This closes #9890.
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 940aafc..e4af611 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -47,8 +47,9 @@
__metaclass__ = ABCMeta
- def __init__(self, j_tenv, serializer=PickleSerializer()):
+ def __init__(self, j_tenv, is_blink_planner, serializer=PickleSerializer()):
self._j_tenv = j_tenv
+ self._is_blink_planner = is_blink_planner
self._serializer = serializer
def from_table_source(self, table_source):
@@ -570,7 +571,8 @@
:param function: The python user-defined function to register.
:type function: UserDefinedFunctionWrapper
"""
- self._j_tenv.registerFunction(name, function._judf)
+ self._j_tenv.registerFunction(name, function._judf(self._is_blink_planner,
+ self.get_config()._j_table_config))
def execute(self, job_name):
"""
@@ -712,9 +714,17 @@
execution_config = self._get_execution_config(temp_file.name, schema)
gateway = get_gateway()
j_objs = gateway.jvm.PythonBridgeUtils.readPythonObjects(temp_file.name, True)
- j_input_format = gateway.jvm.PythonTableUtils.getInputFormat(
+ if self._is_blink_planner:
+ PythonTableUtils = gateway.jvm \
+ .org.apache.flink.table.planner.utils.python.PythonTableUtils
+ PythonInputFormatTableSource = gateway.jvm \
+ .org.apache.flink.table.planner.utils.python.PythonInputFormatTableSource
+ else:
+ PythonTableUtils = gateway.jvm.PythonTableUtils
+ PythonInputFormatTableSource = gateway.jvm.PythonInputFormatTableSource
+ j_input_format = PythonTableUtils.getInputFormat(
j_objs, row_type_info, execution_config)
- j_table_source = gateway.jvm.PythonInputFormatTableSource(
+ j_table_source = PythonInputFormatTableSource(
j_input_format, row_type_info)
return Table(self._j_tenv.fromTableSource(j_table_source))
@@ -728,9 +738,9 @@
class StreamTableEnvironment(TableEnvironment):
- def __init__(self, j_tenv):
+ def __init__(self, j_tenv, is_blink_planner):
self._j_tenv = j_tenv
- super(StreamTableEnvironment, self).__init__(j_tenv)
+ super(StreamTableEnvironment, self).__init__(j_tenv, is_blink_planner)
def _get_execution_config(self, filename, schema):
return self._j_tenv.execEnv().getConfig()
@@ -832,14 +842,18 @@
else:
j_tenv = gateway.jvm.StreamTableEnvironment.create(
stream_execution_environment._j_stream_execution_environment)
- return StreamTableEnvironment(j_tenv)
+ j_planner_class = j_tenv.getPlanner().getClass()
+ j_blink_planner_class = get_java_class(
+ get_gateway().jvm.org.apache.flink.table.planner.delegation.PlannerBase)
+ is_blink_planner = j_blink_planner_class.isAssignableFrom(j_planner_class)
+ return StreamTableEnvironment(j_tenv, is_blink_planner)
class BatchTableEnvironment(TableEnvironment):
- def __init__(self, j_tenv):
+ def __init__(self, j_tenv, is_blink_planner):
self._j_tenv = j_tenv
- super(BatchTableEnvironment, self).__init__(j_tenv)
+ super(BatchTableEnvironment, self).__init__(j_tenv, is_blink_planner)
def _get_execution_config(self, filename, schema):
gateway = get_gateway()
@@ -966,7 +980,7 @@
else:
j_tenv = gateway.jvm.BatchTableEnvironment.create(
execution_environment._j_execution_environment)
- return BatchTableEnvironment(j_tenv)
+ return BatchTableEnvironment(j_tenv, False)
elif environment_settings is not None and \
execution_environment is None and \
table_config is None:
@@ -975,4 +989,8 @@
"set to batch mode.")
j_tenv = gateway.jvm.TableEnvironment.create(
environment_settings._j_environment_settings)
- return BatchTableEnvironment(j_tenv)
+ j_planner_class = j_tenv.getPlanner().getClass()
+ j_blink_planner_class = get_java_class(
+ get_gateway().jvm.org.apache.flink.table.planner.delegation.PlannerBase)
+ is_blink_planner = j_blink_planner_class.isAssignableFrom(j_planner_class)
+ return BatchTableEnvironment(j_tenv, is_blink_planner)
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 321cd78..d529681 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -18,10 +18,11 @@
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf
from pyflink.testing import source_sink_utils
-from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, \
+ PyFlinkBlinkStreamTableTestCase, PyFlinkBlinkBatchTableTestCase
-class UserDefinedFunctionTests(PyFlinkStreamTableTestCase):
+class UserDefinedFunctionTests(object):
def test_scalar_function(self):
# test lambda function
@@ -49,19 +50,19 @@
udf(functools.partial(partial_func, param=1), DataTypes.BIGINT(), DataTypes.BIGINT()))
table_sink = source_sink_utils.TestAppendSink(
- ['a', 'b', 'c', 'd', 'e'],
+ ['a', 'b', 'c', 'd', 'e', 'f'],
[DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(),
- DataTypes.BIGINT()])
+ DataTypes.BIGINT(), DataTypes.BIGINT()])
self.t_env.register_table_sink("Results", table_sink)
t = self.t_env.from_elements([(1, 2, 3), (2, 5, 6), (3, 1, 9)], ['a', 'b', 'c'])
t.where("add_one(b) <= 3") \
.select("add_one(a), subtract_one(b), add(a, c), add_one_callable(a), "
- "add_one_partial(a)") \
+ "add_one_partial(a), a") \
.insert_into("Results")
self.t_env.execute("test")
actual = source_sink_utils.results()
- self.assert_equals(actual, ["2,1,4,2,2", "4,0,12,4,4"])
+ self.assert_equals(actual, ["2,1,4,2,2,1", "4,0,12,4,4,3"])
def test_chaining_scalar_function(self):
self.t_env.register_function(
@@ -327,6 +328,21 @@
self.assert_equals(actual, ["1,2", "1,2", "1,2"])
+class PyFlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
+ PyFlinkStreamTableTestCase):
+ pass
+
+
+class PyFlinkBlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
+ PyFlinkBlinkStreamTableTestCase):
+ pass
+
+
+class PyFlinkBlinkBatchUserDefinedFunctionTests(UserDefinedFunctionTests,
+ PyFlinkBlinkBatchTableTestCase):
+ pass
+
+
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
return i + j
diff --git a/flink-python/pyflink/table/udf.py b/flink-python/pyflink/table/udf.py
index 97ac391..ccc7b46 100644
--- a/flink-python/pyflink/table/udf.py
+++ b/flink-python/pyflink/table/udf.py
@@ -149,13 +149,12 @@
self._deterministic = deterministic if deterministic is not None else (
func.is_deterministic() if isinstance(func, UserDefinedFunction) else True)
- @property
- def _judf(self):
+ def _judf(self, is_blink_planner, table_config):
if self._judf_placeholder is None:
- self._judf_placeholder = self._create_judf()
+ self._judf_placeholder = self._create_judf(is_blink_planner, table_config)
return self._judf_placeholder
- def _create_judf(self):
+ def _create_judf(self, is_blink_planner, table_config):
func = self._func
if not isinstance(self._func, UserDefinedFunction):
func = DelegatingScalarFunction(self._func)
@@ -167,13 +166,28 @@
j_input_types = utils.to_jarray(gateway.jvm.TypeInformation,
[_to_java_type(i) for i in self._input_types])
j_result_type = _to_java_type(self._result_type)
- return gateway.jvm.org.apache.flink.table.util.python.PythonTableUtils \
- .createPythonScalarFunction(self._name,
- bytearray(serialized_func),
- j_input_types,
- j_result_type,
- self._deterministic,
- _get_python_env())
+ if is_blink_planner:
+ PythonTableUtils = gateway.jvm\
+ .org.apache.flink.table.planner.utils.python.PythonTableUtils
+ j_scalar_function = PythonTableUtils \
+ .createPythonScalarFunction(table_config,
+ self._name,
+ bytearray(serialized_func),
+ j_input_types,
+ j_result_type,
+ self._deterministic,
+ _get_python_env())
+ else:
+ PythonTableUtils = gateway.jvm.PythonTableUtils
+ j_scalar_function = PythonTableUtils \
+ .createPythonScalarFunction(self._name,
+ bytearray(serialized_func),
+ j_input_types,
+ j_result_type,
+ self._deterministic,
+ _get_python_env())
+
+ return j_scalar_function
# TODO: support to configure the python execution environment
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index 888824a..4d86f18 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -31,7 +31,7 @@
from pyflink.dataset import ExecutionEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.find_flink_home import _find_flink_home
-from pyflink.table import BatchTableEnvironment, StreamTableEnvironment
+from pyflink.table import BatchTableEnvironment, StreamTableEnvironment, EnvironmentSettings
from pyflink.java_gateway import get_gateway
if sys.version_info[0] >= 3:
@@ -119,7 +119,7 @@
class PyFlinkStreamTableTestCase(PyFlinkTestCase):
"""
- Base class for stream unit tests.
+ Base class for stream tests.
"""
def setUp(self):
@@ -131,7 +131,7 @@
class PyFlinkBatchTableTestCase(PyFlinkTestCase):
"""
- Base class for batch unit tests.
+ Base class for batch tests.
"""
def setUp(self):
@@ -149,6 +149,32 @@
return string_result
+class PyFlinkBlinkStreamTableTestCase(PyFlinkTestCase):
+ """
+ Base class for stream tests of blink planner.
+ """
+
+ def setUp(self):
+ super(PyFlinkBlinkStreamTableTestCase, self).setUp()
+ self.env = StreamExecutionEnvironment.get_execution_environment()
+
+ self.t_env = StreamTableEnvironment.create(
+ self.env, environment_settings=EnvironmentSettings.new_instance()
+ .in_streaming_mode().use_blink_planner().build())
+
+
+class PyFlinkBlinkBatchTableTestCase(PyFlinkTestCase):
+ """
+ Base class for batch tests of blink planner.
+ """
+
+ def setUp(self):
+ super(PyFlinkBlinkBatchTableTestCase, self).setUp()
+ self.t_env = BatchTableEnvironment.create(
+ environment_settings=EnvironmentSettings.new_instance()
+ .in_batch_mode().use_blink_planner().build())
+
+
class PythonAPICompletenessTestCase(object):
"""
Base class for Python API completeness tests, i.e.,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/PythonFunctionCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/PythonFunctionCodeGenerator.scala
new file mode 100644
index 0000000..68069bd
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/PythonFunctionCodeGenerator.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.codegen
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.functions.python.{PythonEnv, PythonFunction}
+import org.apache.flink.table.functions.{FunctionLanguage, ScalarFunction, UserDefinedFunction}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{newName, primitiveDefaultValue, primitiveTypeTermForType}
+import org.apache.flink.table.planner.codegen.Indenter.toISC
+import org.apache.flink.table.runtime.generated.GeneratedFunction
+import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter
+
+/**
+ * A code generator for generating Python [[UserDefinedFunction]]s.
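+ *
+ * The generated function only carries the metadata of the Python function (the serialized
+ * function, the input/result types and the Python execution environment). Its eval() method
+ * returns a default value; the actual evaluation is performed by the Python operator created
+ * in CommonPythonCalc.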
+ */
+object PythonFunctionCodeGenerator {
+
+ private val PYTHON_SCALAR_FUNCTION_NAME = "PythonScalarFunction"
+
+ /**
+ * Generates a [[ScalarFunction]] for the specified Python user-defined function.
+ *
+ * @param ctx The context of the code generator
+ * @param name name of the user-defined function
+ * @param serializedScalarFunction serialized Python scalar function
+ * @param inputTypes input data types
+ * @param resultType expected result type
+ * @param deterministic the determinism of the function's results
+ * @param pythonEnv the Python execution environment
+ * @return instance of generated ScalarFunction
+ */
+ def generateScalarFunction(
+ ctx: CodeGeneratorContext,
+ name: String,
+ serializedScalarFunction: Array[Byte],
+ inputTypes: Array[TypeInformation[_]],
+ resultType: TypeInformation[_],
+ deterministic: Boolean,
+ pythonEnv: PythonEnv): ScalarFunction = {
+ val funcName = newName(PYTHON_SCALAR_FUNCTION_NAME)
+ val resultLogicType = TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType(resultType)
+ val resultTypeTerm = primitiveTypeTermForType(resultLogicType)
+ val defaultResultValue = primitiveDefaultValue(resultLogicType)
+ val inputParamCode = inputTypes.zipWithIndex.map { case (inputType, index) =>
+ s"${primitiveTypeTermForType(
+ TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType(inputType))} in$index"
+ }.mkString(", ")
+
+ val typeInfoTypeTerm = classOf[TypeInformation[_]].getCanonicalName
+ val pythonEnvTypeTerm = classOf[PythonEnv].getCanonicalName
+
+ val resultTypeNameTerm =
+ ctx.addReusableObject(resultType, "resultType", typeInfoTypeTerm)
+ val serializedScalarFunctionNameTerm =
+ ctx.addReusableObject(serializedScalarFunction, "serializedScalarFunction", "byte[]")
+ val pythonEnvNameTerm = ctx.addReusableObject(pythonEnv, "pythonEnv", pythonEnvTypeTerm)
+ val inputTypesCode = inputTypes
+ .map(ctx.addReusableObject(_, "inputType", typeInfoTypeTerm))
+ .mkString(", ")
+
+ val funcCode = j"""
+ |public class $funcName extends ${classOf[ScalarFunction].getCanonicalName}
+ | implements ${classOf[PythonFunction].getCanonicalName} {
+ |
+ | private static final long serialVersionUID = 1L;
+ |
+ | ${ctx.reuseMemberCode()}
+ |
+ | public $funcName(Object[] references) throws Exception {
+ | ${ctx.reuseInitCode()}
+ | }
+ |
+ | public $resultTypeTerm eval($inputParamCode) {
+ | return $defaultResultValue;
+ | }
+ |
+ | @Override
+ | public $typeInfoTypeTerm[] getParameterTypes(Class<?>[] signature) {
+ | return new $typeInfoTypeTerm[]{$inputTypesCode};
+ | }
+ |
+ | @Override
+ | public $typeInfoTypeTerm getResultType(Class<?>[] signature) {
+ | return $resultTypeNameTerm;
+ | }
+ |
+ | @Override
+ | public ${classOf[FunctionLanguage].getCanonicalName} getLanguage() {
+ | return ${classOf[FunctionLanguage].getCanonicalName}.PYTHON;
+ | }
+ |
+ | @Override
+ | public byte[] getSerializedPythonFunction() {
+ | return $serializedScalarFunctionNameTerm;
+ | }
+ |
+ | @Override
+ | public $pythonEnvTypeTerm getPythonEnv() {
+ | return $pythonEnvNameTerm;
+ | }
+ |
+ | @Override
+ | public boolean isDeterministic() {
+ | return $deterministic;
+ | }
+ |
+ | @Override
+ | public String toString() {
+ | return "$name";
+ | }
+ |}
+ |""".stripMargin
+ new GeneratedFunction(funcName, funcCode, ctx.references.toArray)
+ .newInstance(Thread.currentThread().getContextClassLoader)
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCalc.scala
new file mode 100644
index 0000000..4ca9d8f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCalc.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.common
+
+import org.apache.calcite.plan.RelOptCluster
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.functions.FunctionLanguage
+import org.apache.flink.table.functions.python.{PythonFunction, PythonFunctionInfo, SimplePythonFunction}
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.codegen.{CalcCodeGenerator, CodeGeneratorContext}
+import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCalc.PYTHON_SCALAR_FUNCTION_OPERATOR_NAME
+import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.types.logical.RowType
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
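+/**
+ * Base trait for the physical RelNodes that execute Python scalar functions. It extracts the
+ * Python function information from the RexCalls and creates the transformation that wraps the
+ * Python scalar function operator.
+ */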
+trait CommonPythonCalc {
+
+ private lazy val convertLiteralToPython = {
+ val clazz = Class.forName("org.apache.flink.api.common.python.PythonBridgeUtils")
+ clazz.getMethod("convertLiteralToPython", classOf[RexLiteral], classOf[SqlTypeName])
+ }
+
+ private def extractPythonScalarFunctionInfos(
+ rexCalls: Array[RexCall]): (Array[Int], Array[PythonFunctionInfo]) = {
+ // using LinkedHashMap to keep the insertion order
+ val inputNodes = new mutable.LinkedHashMap[RexNode, Integer]()
+ val pythonFunctionInfos = rexCalls.map(createPythonScalarFunctionInfo(_, inputNodes))
+
+ val udfInputOffsets = inputNodes.toArray
+ .map(_._1)
+ .filter(_.isInstanceOf[RexInputRef])
+ .map(_.asInstanceOf[RexInputRef].getIndex)
+ (udfInputOffsets, pythonFunctionInfos)
+ }
+
+ private def createPythonScalarFunctionInfo(
+ rexCall: RexCall,
+ inputNodes: mutable.Map[RexNode, Integer]): PythonFunctionInfo = rexCall.getOperator match {
+ case sfc: ScalarSqlFunction if sfc.scalarFunction.getLanguage == FunctionLanguage.PYTHON =>
+ val inputs = new mutable.ArrayBuffer[AnyRef]()
+ rexCall.getOperands.foreach {
+ case pythonRexCall: RexCall if pythonRexCall.getOperator.asInstanceOf[ScalarSqlFunction]
+ .scalarFunction.getLanguage == FunctionLanguage.PYTHON =>
+ // Consecutive Python UDFs can be chained together
+ val argPythonInfo = createPythonScalarFunctionInfo(pythonRexCall, inputNodes)
+ inputs.append(argPythonInfo)
+
+ case literal: RexLiteral =>
+ inputs.append(
+ convertLiteralToPython.invoke(null, literal, literal.getType.getSqlTypeName))
+
+ case argNode: RexNode =>
+ // RexInputRef arguments are replaced with an offset into the input row
+ inputNodes.get(argNode) match {
+ case Some(existing) => inputs.append(existing)
+ case None =>
+ val inputOffset = Integer.valueOf(inputNodes.size)
+ inputs.append(inputOffset)
+ inputNodes.put(argNode, inputOffset)
+ }
+ }
+
+ // Extracts the necessary information for Python function execution, such as
+ // the serialized Python function, the Python env, etc
+ val pythonFunction = new SimplePythonFunction(
+ sfc.scalarFunction.asInstanceOf[PythonFunction].getSerializedPythonFunction,
+ sfc.scalarFunction.asInstanceOf[PythonFunction].getPythonEnv)
+ new PythonFunctionInfo(pythonFunction, inputs.toArray)
+ }
+
+ private def getPythonScalarFunctionOperator(
+ inputRowTypeInfo: BaseRowTypeInfo,
+ outputRowTypeInfo: BaseRowTypeInfo,
+ udfInputOffsets: Array[Int],
+ pythonFunctionInfos: Array[PythonFunctionInfo],
+ forwardedFields: Array[Int]) = {
+ val clazz = Class.forName(PYTHON_SCALAR_FUNCTION_OPERATOR_NAME)
+ val ctor = clazz.getConstructor(
+ classOf[Array[PythonFunctionInfo]],
+ classOf[RowType],
+ classOf[RowType],
+ classOf[Array[Int]],
+ classOf[Array[Int]])
+ ctor.newInstance(
+ pythonFunctionInfos,
+ inputRowTypeInfo.toRowType,
+ outputRowTypeInfo.toRowType,
+ udfInputOffsets,
+ forwardedFields)
+ .asInstanceOf[OneInputStreamOperator[BaseRow, BaseRow]]
+ }
+
+ private def createPythonOneInputTransformation(
+ inputTransform: Transformation[BaseRow],
+ calcProgram: RexProgram,
+ name: String) = {
+ val pythonRexCalls = calcProgram.getProjectList
+ .map(calcProgram.expandLocalRef)
+ .filter(_.isInstanceOf[RexCall])
+ .map(_.asInstanceOf[RexCall])
+ .toArray
+
+ val forwardedFields: Array[Int] = calcProgram.getProjectList
+ .map(calcProgram.expandLocalRef)
+ .filter(_.isInstanceOf[RexInputRef])
+ .map(_.asInstanceOf[RexInputRef].getIndex)
+ .toArray
+
+ val resultProjectList = {
+ var idx = 0
+ calcProgram.getProjectList
+ .map(calcProgram.expandLocalRef)
+ .map {
+ case pythonCall: RexCall =>
+ val inputRef = new RexInputRef(forwardedFields.length + idx, pythonCall.getType)
+ idx += 1
+ inputRef
+ case node => node
+ }
+ }
+
+ val (pythonUdfInputOffsets, pythonFunctionInfos) =
+ extractPythonScalarFunctionInfos(pythonRexCalls)
+
+ val inputLogicalTypes =
+ inputTransform.getOutputType.asInstanceOf[BaseRowTypeInfo].getLogicalTypes
+ val pythonOperatorInputTypeInfo = inputTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
+ val pythonOperatorResultTypeInfo = new BaseRowTypeInfo(
+ forwardedFields.map(inputLogicalTypes(_)) ++
+ pythonRexCalls.map(node => FlinkTypeFactory.toLogicalType(node.getType)): _*)
+
+ val pythonOperator = getPythonScalarFunctionOperator(
+ pythonOperatorInputTypeInfo,
+ pythonOperatorResultTypeInfo,
+ pythonUdfInputOffsets,
+ pythonFunctionInfos,
+ forwardedFields)
+
+ val pythonInputTransform = new OneInputTransformation(
+ inputTransform,
+ name,
+ pythonOperator,
+ pythonOperatorResultTypeInfo,
+ inputTransform.getParallelism
+ )
+ (pythonInputTransform, pythonOperatorResultTypeInfo, resultProjectList)
+ }
+
+ private def createProjectionRexProgram(
+ inputRowType: RowType,
+ outputRelData: RelDataType,
+ projectList: mutable.Buffer[RexNode],
+ cluster: RelOptCluster) = {
+ val factory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+ val inputRelData = factory.createFieldTypeFromLogicalType(inputRowType)
+ RexProgram.create(inputRelData, projectList, null, outputRelData, cluster.getRexBuilder)
+ }
+
+ protected def createOneInputTransformation(
+ inputTransform: Transformation[BaseRow],
+ inputsContainSingleton: Boolean,
+ calcProgram: RexProgram,
+ name: String,
+ config: TableConfig,
+ ctx: CodeGeneratorContext,
+ cluster: RelOptCluster,
+ rowType: RelDataType,
+ opName: String): OneInputTransformation[BaseRow, BaseRow] = {
+ val (pythonInputTransform, pythonOperatorResultTypeInfo, resultProjectList) =
+ createPythonOneInputTransformation(inputTransform, calcProgram, name)
+
+ if (inputsContainSingleton) {
+ pythonInputTransform.setParallelism(1)
+ pythonInputTransform.setMaxParallelism(1)
+ }
+
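+ // checks whether the result projection is a pure identity mapping; in that case the output
+ // of the Python transformation already has the expected field order and is returned as is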
+ val onlyFilter = resultProjectList.zipWithIndex.forall { case (rexNode, index) =>
+ rexNode.isInstanceOf[RexInputRef] && rexNode.asInstanceOf[RexInputRef].getIndex == index
+ }
+
+ if (onlyFilter) {
+ pythonInputTransform
+ } else {
+ // The Python OneInputTransformation emits the forwarded fields first, followed by the
+ // results of the Python calls. If this differs from the expected output field order,
+ // an additional calc is needed to reorder the fields.
+ val outputType = FlinkTypeFactory.toLogicalRowType(rowType)
+ val rexProgram = createProjectionRexProgram(
+ pythonOperatorResultTypeInfo.toRowType, rowType, resultProjectList, cluster)
+ val substituteOperator = CalcCodeGenerator.generateCalcOperator(
+ ctx,
+ cluster,
+ pythonInputTransform,
+ outputType,
+ config,
+ rexProgram,
+ None,
+ retainHeader = true,
+ opName
+ )
+
+ new OneInputTransformation(
+ pythonInputTransform,
+ name,
+ substituteOperator,
+ BaseRowTypeInfo.of(outputType),
+ pythonInputTransform.getParallelism)
+ }
+ }
+}
+
+object CommonPythonCalc {
+ val PYTHON_SCALAR_FUNCTION_OPERATOR_NAME =
+ "org.apache.flink.table.runtime.operators.python.BaseRowPythonScalarFunctionOperator"
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala
index aefcd2f..7684955 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala
@@ -19,28 +19,18 @@
package org.apache.flink.table.planner.plan.nodes.physical.batch
import org.apache.flink.api.dag.Transformation
-import org.apache.flink.runtime.operators.DamBehavior
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.{CalcCodeGenerator, CodeGeneratorContext}
import org.apache.flink.table.planner.delegation.BatchPlanner
-import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef, TraitUtil}
-import org.apache.flink.table.planner.plan.nodes.common.CommonCalc
-import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
import org.apache.calcite.plan._
import org.apache.calcite.rel._
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.Calc
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexProgram}
-import org.apache.calcite.sql.SqlKind
-import org.apache.calcite.util.mapping.{Mapping, MappingType, Mappings}
-
-import java.util
-
-import scala.collection.JavaConversions._
+import org.apache.calcite.rex.RexProgram
/**
* Batch physical RelNode for [[Calc]].
@@ -51,87 +41,12 @@
inputRel: RelNode,
calcProgram: RexProgram,
outputRowType: RelDataType)
- extends CommonCalc(cluster, traitSet, inputRel, calcProgram)
- with BatchPhysicalRel
- with BatchExecNode[BaseRow] {
-
- override def deriveRowType(): RelDataType = outputRowType
+ extends BatchExecCalcBase(cluster, traitSet, inputRel, calcProgram, outputRowType) {
override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
new BatchExecCalc(cluster, traitSet, child, program, outputRowType)
}
- override def satisfyTraits(requiredTraitSet: RelTraitSet): Option[RelNode] = {
- val requiredDistribution = requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
- // Does not push broadcast distribution trait down into Calc.
- if (requiredDistribution.getType == RelDistribution.Type.BROADCAST_DISTRIBUTED) {
- return None
- }
- val projects = calcProgram.getProjectList.map(calcProgram.expandLocalRef)
-
- def getProjectMapping: Mapping = {
- val mapping = Mappings.create(MappingType.INVERSE_FUNCTION,
- getInput.getRowType.getFieldCount, projects.size)
- projects.zipWithIndex.foreach {
- case (project, index) =>
- project match {
- case inputRef: RexInputRef => mapping.set(inputRef.getIndex, index)
- case call: RexCall if call.getKind == SqlKind.AS =>
- call.getOperands.head match {
- case inputRef: RexInputRef => mapping.set(inputRef.getIndex, index)
- case _ => // ignore
- }
- case _ => // ignore
- }
- }
- mapping.inverse()
- }
-
- val mapping = getProjectMapping
- val appliedDistribution = requiredDistribution.apply(mapping)
- // If both distribution and collation can be satisfied, satisfy both. If only distribution
- // can be satisfied, only satisfy distribution. There is no possibility to only satisfy
- // collation here except for there is no distribution requirement.
- if ((!requiredDistribution.isTop) && (appliedDistribution eq FlinkRelDistribution.ANY)) {
- return None
- }
-
- val requiredCollation = requiredTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
- val appliedCollation = TraitUtil.apply(requiredCollation, mapping)
- val canCollationPushedDown = !appliedCollation.getFieldCollations.isEmpty
- // If required traits only contains collation requirements, but collation keys are not columns
- // from input, then no need to satisfy required traits.
- if ((appliedDistribution eq FlinkRelDistribution.ANY) && !canCollationPushedDown) {
- return None
- }
-
- var inputRequiredTraits = getInput.getTraitSet
- var providedTraits = getTraitSet
- if (!appliedDistribution.isTop) {
- inputRequiredTraits = inputRequiredTraits.replace(appliedDistribution)
- providedTraits = providedTraits.replace(requiredDistribution)
- }
- if (canCollationPushedDown) {
- inputRequiredTraits = inputRequiredTraits.replace(appliedCollation)
- providedTraits = providedTraits.replace(requiredCollation)
- }
- val newInput = RelOptRule.convert(getInput, inputRequiredTraits)
- Some(copy(providedTraits, Seq(newInput)))
- }
-
- //~ ExecNode methods -----------------------------------------------------------
-
- override def getDamBehavior = DamBehavior.PIPELINED
-
- override def getInputNodes: util.List[ExecNode[BatchPlanner, _]] =
- List(getInput.asInstanceOf[ExecNode[BatchPlanner, _]])
-
- override def replaceInputNode(
- ordinalInParent: Int,
- newInputNode: ExecNode[BatchPlanner, _]): Unit = {
- replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
- }
-
override protected def translateToPlanInternal(
planner: BatchPlanner): Transformation[BaseRow] = {
val config = planner.getTableConfig
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalcBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalcBase.scala
new file mode 100644
index 0000000..47f3fa7
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalcBase.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.batch
+
+import java.util
+
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.planner.plan.nodes.common.CommonCalc
+import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
+import org.apache.calcite.plan._
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexProgram}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.util.mapping.{Mapping, MappingType, Mappings}
+import org.apache.flink.runtime.operators.DamBehavior
+import org.apache.flink.table.planner.delegation.BatchPlanner
+import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef, TraitUtil}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Base batch physical RelNode for [[Calc]].
+ */
+abstract class BatchExecCalcBase(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inputRel: RelNode,
+ calcProgram: RexProgram,
+ outputRowType: RelDataType)
+ extends CommonCalc(cluster, traitSet, inputRel, calcProgram)
+ with BatchPhysicalRel
+ with BatchExecNode[BaseRow] {
+
+ override def deriveRowType(): RelDataType = outputRowType
+
+ override def satisfyTraits(requiredTraitSet: RelTraitSet): Option[RelNode] = {
+ val requiredDistribution = requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
+ // Does not push broadcast distribution trait down into Calc.
+ if (requiredDistribution.getType == RelDistribution.Type.BROADCAST_DISTRIBUTED) {
+ return None
+ }
+ val projects = calcProgram.getProjectList.map(calcProgram.expandLocalRef)
+
+ def getProjectMapping: Mapping = {
+ val mapping = Mappings.create(MappingType.INVERSE_FUNCTION,
+ getInput.getRowType.getFieldCount, projects.size)
+ projects.zipWithIndex.foreach {
+ case (project, index) =>
+ project match {
+ case inputRef: RexInputRef => mapping.set(inputRef.getIndex, index)
+ case call: RexCall if call.getKind == SqlKind.AS =>
+ call.getOperands.head match {
+ case inputRef: RexInputRef => mapping.set(inputRef.getIndex, index)
+ case _ => // ignore
+ }
+ case _ => // ignore
+ }
+ }
+ mapping.inverse()
+ }
+
+ val mapping = getProjectMapping
+ val appliedDistribution = requiredDistribution.apply(mapping)
+ // If both distribution and collation can be satisfied, satisfy both. If only distribution
+ // can be satisfied, only satisfy distribution. There is no possibility to only satisfy
+ // collation here except for there is no distribution requirement.
+ if ((!requiredDistribution.isTop) && (appliedDistribution eq FlinkRelDistribution.ANY)) {
+ return None
+ }
+
+ val requiredCollation = requiredTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
+ val appliedCollation = TraitUtil.apply(requiredCollation, mapping)
+ val canCollationPushedDown = !appliedCollation.getFieldCollations.isEmpty
+ // If required traits only contains collation requirements, but collation keys are not columns
+ // from input, then no need to satisfy required traits.
+ if ((appliedDistribution eq FlinkRelDistribution.ANY) && !canCollationPushedDown) {
+ return None
+ }
+
+ var inputRequiredTraits = getInput.getTraitSet
+ var providedTraits = getTraitSet
+ if (!appliedDistribution.isTop) {
+ inputRequiredTraits = inputRequiredTraits.replace(appliedDistribution)
+ providedTraits = providedTraits.replace(requiredDistribution)
+ }
+ if (canCollationPushedDown) {
+ inputRequiredTraits = inputRequiredTraits.replace(appliedCollation)
+ providedTraits = providedTraits.replace(requiredCollation)
+ }
+ val newInput = RelOptRule.convert(getInput, inputRequiredTraits)
+ Some(copy(providedTraits, Seq(newInput)))
+ }
+
+ //~ ExecNode methods -----------------------------------------------------------
+
+ override def getDamBehavior = DamBehavior.PIPELINED
+
+ override def getInputNodes: util.List[ExecNode[BatchPlanner, _]] =
+ List(getInput.asInstanceOf[ExecNode[BatchPlanner, _]])
+
+ override def replaceInputNode(
+ ordinalInParent: Int,
+ newInputNode: ExecNode[BatchPlanner, _]): Unit = {
+ replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCalc.scala
new file mode 100644
index 0000000..7adf0ee
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCalc.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.batch
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext
+import org.apache.flink.table.planner.delegation.BatchPlanner
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCalc
+
+/**
+ * Batch physical RelNode for Python ScalarFunctions.
+ */
+class BatchExecPythonCalc(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inputRel: RelNode,
+ calcProgram: RexProgram,
+ outputRowType: RelDataType)
+ extends BatchExecCalcBase(
+ cluster,
+ traitSet,
+ inputRel,
+ calcProgram,
+ outputRowType)
+ with CommonPythonCalc {
+
+ override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
+ new BatchExecPythonCalc(cluster, traitSet, child, program, outputRowType)
+ }
+
+ override protected def translateToPlanInternal(planner: BatchPlanner): Transformation[BaseRow] = {
+ val inputTransform = getInputNodes.get(0).translateToPlan(planner)
+ .asInstanceOf[Transformation[BaseRow]]
+ val config = planner.getTableConfig
+ val ctx = CodeGeneratorContext(config)
+ createOneInputTransformation(
+ inputTransform,
+ inputsContainSingleton = false,
+ calcProgram,
+ getRelDetailedDescription,
+ config,
+ ctx,
+ cluster,
+ getRowType,
+ "BatchExecCalc")
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala
index dc98107..11e534d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala
@@ -24,9 +24,6 @@
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.{CalcCodeGenerator, CodeGeneratorContext}
import org.apache.flink.table.planner.delegation.StreamPlanner
-import org.apache.flink.table.planner.plan.nodes.common.CommonCalc
-import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
-import org.apache.flink.table.planner.plan.utils.RelExplainUtil
import org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
@@ -36,10 +33,6 @@
import org.apache.calcite.rel.core.Calc
import org.apache.calcite.rex.RexProgram
-import java.util
-
-import scala.collection.JavaConversions._
-
/**
* Stream physical RelNode for [[Calc]].
*/
@@ -49,37 +42,12 @@
inputRel: RelNode,
calcProgram: RexProgram,
outputRowType: RelDataType)
- extends CommonCalc(cluster, traitSet, inputRel, calcProgram)
- with StreamPhysicalRel
- with StreamExecNode[BaseRow] {
-
- override def producesUpdates: Boolean = false
-
- override def needsUpdatesAsRetraction(input: RelNode): Boolean = false
-
- override def consumesRetractions: Boolean = false
-
- override def producesRetractions: Boolean = false
-
- override def requireWatermark: Boolean = false
-
- override def deriveRowType(): RelDataType = outputRowType
+ extends StreamExecCalcBase(cluster, traitSet, inputRel, calcProgram, outputRowType) {
override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
new StreamExecCalc(cluster, traitSet, child, program, outputRowType)
}
- //~ ExecNode methods -----------------------------------------------------------
-
- override def getInputNodes: util.List[ExecNode[StreamPlanner, _]] =
- List(getInput.asInstanceOf[ExecNode[StreamPlanner, _]])
-
- override def replaceInputNode(
- ordinalInParent: Int,
- newInputNode: ExecNode[StreamPlanner, _]): Unit = {
- replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
- }
-
override protected def translateToPlanInternal(
planner: StreamPlanner): Transformation[BaseRow] = {
val config = planner.getTableConfig
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalcBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalcBase.scala
new file mode 100644
index 0000000..662d32d
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalcBase.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.planner.delegation.StreamPlanner
+import org.apache.flink.table.planner.plan.nodes.common.CommonCalc
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Base stream physical RelNode for [[Calc]].
+ */
+abstract class StreamExecCalcBase(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inputRel: RelNode,
+ calcProgram: RexProgram,
+ outputRowType: RelDataType)
+ extends CommonCalc(cluster, traitSet, inputRel, calcProgram)
+ with StreamPhysicalRel
+ with StreamExecNode[BaseRow] {
+
+ override def producesUpdates: Boolean = false
+
+ override def needsUpdatesAsRetraction(input: RelNode): Boolean = false
+
+ override def consumesRetractions: Boolean = false
+
+ override def producesRetractions: Boolean = false
+
+ override def requireWatermark: Boolean = false
+
+ override def deriveRowType(): RelDataType = outputRowType
+
+ //~ ExecNode methods -----------------------------------------------------------
+
+ override def getInputNodes: util.List[ExecNode[StreamPlanner, _]] =
+ List(getInput.asInstanceOf[ExecNode[StreamPlanner, _]])
+
+ override def replaceInputNode(
+ ordinalInParent: Int,
+ newInputNode: ExecNode[StreamPlanner, _]): Unit = {
+ replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
+ }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCalc.scala
new file mode 100644
index 0000000..0bebc5e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCalc.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext
+import org.apache.flink.table.planner.delegation.StreamPlanner
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCalc
+import org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
+
+/**
+ * Stream physical RelNode for Python ScalarFunctions.
+ */
+class StreamExecPythonCalc(
+ cluster: RelOptCluster,
+ traitSet: RelTraitSet,
+ inputRel: RelNode,
+ calcProgram: RexProgram,
+ outputRowType: RelDataType)
+ extends StreamExecCalcBase(
+ cluster,
+ traitSet,
+ inputRel,
+ calcProgram,
+ outputRowType)
+ with CommonPythonCalc {
+
+ override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
+ new StreamExecPythonCalc(cluster, traitSet, child, program, outputRowType)
+ }
+
+ override protected def translateToPlanInternal(
+ planner: StreamPlanner): Transformation[BaseRow] = {
+ val inputTransform = getInputNodes.get(0).translateToPlan(planner)
+ .asInstanceOf[Transformation[BaseRow]]
+ val config = planner.getTableConfig
+ val ctx = CodeGeneratorContext(config).setOperatorBaseClass(
+ classOf[AbstractProcessStreamOperator[BaseRow]])
+ createOneInputTransformation(
+ inputTransform,
+ inputsContainSingleton(),
+ calcProgram,
+ getRelDetailedDescription,
+ config,
+ ctx,
+ cluster,
+ getRowType,
+ "StreamExecCalc")
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 10aaef4..9dd7ef4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -352,7 +352,9 @@
// transpose calc past snapshot
CalcSnapshotTransposeRule.INSTANCE,
// merge calc after calc transpose
- FlinkCalcMergeRule.INSTANCE
+ FlinkCalcMergeRule.INSTANCE,
+ // Rule that splits python ScalarFunctions from java/scala ScalarFunctions
+ PythonScalarFunctionSplitRule.INSTANCE
)
/**
@@ -367,6 +369,7 @@
BatchExecValuesRule.INSTANCE,
// calc
BatchExecCalcRule.INSTANCE,
+ BatchExecPythonCalcRule.INSTANCE,
// union
BatchExecUnionRule.INSTANCE,
// sort
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 6268003..e443183 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -337,7 +337,9 @@
// transpose calc past snapshot
CalcSnapshotTransposeRule.INSTANCE,
// merge calc after calc transpose
- FlinkCalcMergeRule.INSTANCE
+ FlinkCalcMergeRule.INSTANCE,
+ // Rule that splits python ScalarFunctions from java/scala ScalarFunctions.
+ PythonScalarFunctionSplitRule.INSTANCE
)
/**
@@ -353,6 +355,7 @@
StreamExecValuesRule.INSTANCE,
// calc
StreamExecCalcRule.INSTANCE,
+ StreamExecPythonCalcRule.INSTANCE,
// union
StreamExecUnionRule.INSTANCE,
// sort
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PythonScalarFunctionSplitRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PythonScalarFunctionSplitRule.scala
new file mode 100644
index 0000000..fa219ca
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PythonScalarFunctionSplitRule.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexProgram}
+import org.apache.calcite.sql.validate.SqlValidatorUtil
+import org.apache.flink.table.functions.{FunctionLanguage, ScalarFunction}
+import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc
+import org.apache.flink.table.planner.plan.utils.PythonUtil.containsFunctionOf
+import org.apache.flink.table.planner.plan.utils.{InputRefVisitor, RexDefaultVisitor}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * Rule that splits [[FlinkLogicalCalc]] into multiple [[FlinkLogicalCalc]]s. After this rule
+ * is applied, each [[FlinkLogicalCalc]] will contain only Python [[ScalarFunction]]s or only
+ * Java [[ScalarFunction]]s. This ensures that Python [[ScalarFunction]]s which can be executed
+ * together in a batch are grouped into the same [[FlinkLogicalCalc]] node.
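+ *
+ * For example (names are illustrative only), a Calc projecting "a, py_udf(a), java_udf(b)"
+ * is split into a bottom Calc projecting "a, b, py_udf(a) AS f0" and a top Calc projecting
+ * "a, f0, java_udf(b)".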
+ */
+class PythonScalarFunctionSplitRule extends RelOptRule(
+ operand(classOf[FlinkLogicalCalc], any),
+ "PythonScalarFunctionSplitRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+ val program = calc.getProgram
+
+ // This rule matches if one of the following cases is met:
+ // 1. There are Python functions and Java functions mixed in the Calc
+ // 2. There are Python functions in the condition of the Calc
+ (program.getExprList.exists(containsFunctionOf(_, FunctionLanguage.PYTHON)) &&
+ program.getExprList.exists(containsFunctionOf(_, FunctionLanguage.JVM))) ||
+ Option(program.getCondition)
+ .map(program.expandLocalRef)
+ .exists(containsFunctionOf(_, FunctionLanguage.PYTHON))
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+ val input = calc.getInput
+ val rexBuilder = call.builder().getRexBuilder
+ val program = calc.getProgram
+ val extractedRexCalls = new mutable.ArrayBuffer[RexCall]()
+
+ val convertPythonFunction =
+ program.getProjectList
+ .map(program.expandLocalRef)
+ .exists(containsFunctionOf(_, FunctionLanguage.JVM, recursive = false)) ||
+ Option(program.getCondition)
+ .map(program.expandLocalRef)
+ .exists(expr =>
+ containsFunctionOf(expr, FunctionLanguage.JVM, recursive = false) ||
+ containsFunctionOf(expr, FunctionLanguage.PYTHON))
+
+ val extractedFunctionOffset = input.getRowType.getFieldCount
+ val splitter = new ScalarFunctionSplitter(
+ extractedFunctionOffset,
+ extractedRexCalls,
+ convertPythonFunction)
+
+ val newProjects = program.getProjectList.map(program.expandLocalRef(_).accept(splitter))
+ val newCondition = Option(program.getCondition).map(program.expandLocalRef(_).accept(splitter))
+ val accessedFields = extractRefInputFields(newProjects, newCondition, extractedFunctionOffset)
+
+ val bottomCalcProjects =
+ accessedFields.map(RexInputRef.of(_, input.getRowType)) ++ extractedRexCalls
+ val bottomCalcFieldNames = SqlValidatorUtil.uniquify(
+ accessedFields.map(i => input.getRowType.getFieldNames.get(i)).toSeq ++
+ extractedRexCalls.indices.map("f" + _),
+ rexBuilder.getTypeFactory.getTypeSystem.isSchemaCaseSensitive)
+
+ val bottomCalc = new FlinkLogicalCalc(
+ calc.getCluster,
+ calc.getTraitSet,
+ input,
+ RexProgram.create(
+ input.getRowType,
+ bottomCalcProjects.toList,
+ null,
+ bottomCalcFieldNames,
+ rexBuilder))
+
+ val inputRewriter = new ExtractedFunctionInputRewriter(extractedFunctionOffset, accessedFields)
+ val topCalc = new FlinkLogicalCalc(
+ calc.getCluster,
+ calc.getTraitSet,
+ bottomCalc,
+ RexProgram.create(
+ bottomCalc.getRowType,
+ newProjects.map(_.accept(inputRewriter)),
+ newCondition.map(_.accept(inputRewriter)).orNull,
+ calc.getRowType,
+ rexBuilder))
+
+ call.transformTo(topCalc)
+ }
+
+ /**
+ * Extracts the indices of the input fields referred by the specified projects and condition.
+ */
+ private def extractRefInputFields(
+ projects: Seq[RexNode],
+ condition: Option[RexNode],
+ inputFieldsCount: Int): Array[Int] = {
+ val visitor = new InputRefVisitor
+
+ // extract referenced input fields from projections
+ projects.foreach(exp => exp.accept(visitor))
+
+ // extract referenced input fields from condition
+ condition.foreach(_.accept(visitor))
+
+ // fields with index greater than or equal to inputFieldsCount refer to the extracted
+ // functions and should be filtered out as they are not part of the original input
+ visitor.getFields.filter(_ < inputFieldsCount)
+ }
+}
+
+private class ScalarFunctionSplitter(
+ extractedFunctionOffset: Int,
+ extractedRexCalls: mutable.ArrayBuffer[RexCall],
+ convertPythonFunction: Boolean)
+ extends RexDefaultVisitor[RexNode] {
+
+ override def visitCall(call: RexCall): RexNode = {
+ call.getOperator match {
+ case sfc: ScalarSqlFunction if sfc.scalarFunction.getLanguage ==
+ FunctionLanguage.PYTHON =>
+ visit(convertPythonFunction, call)
+
+ case _ =>
+ visit(!convertPythonFunction, call)
+ }
+ }
+
+ override def visitNode(rexNode: RexNode): RexNode = rexNode
+
+ private def visit(needConvert: Boolean, call: RexCall): RexNode = {
+ if (needConvert) {
+ val newNode = new RexInputRef(
+ extractedFunctionOffset + extractedRexCalls.length, call.getType)
+ extractedRexCalls.append(call)
+ newNode
+ } else {
+ call.clone(call.getType, call.getOperands.asScala.map(_.accept(this)))
+ }
+ }
+}
+
+/**
+ * Rewrites the field accesses of a RexNode, as not all fields of the original input are forwarded:
+ * 1) Fields of index greater than or equal to extractedFunctionOffset refer to the
+ * extracted function.
+ * 2) Fields of index less than extractedFunctionOffset refer to the original input field.
+ *
+ * @param extractedFunctionOffset the original start offset of the extracted functions
+ * @param accessedFields the accessed fields which will be forwarded
+ */
+private class ExtractedFunctionInputRewriter(
+ extractedFunctionOffset: Int,
+ accessedFields: Array[Int])
+ extends RexDefaultVisitor[RexNode] {
+
+  /** Mapping from old input field ref index to new input field ref index. */
+ private val fieldMap: Map[Int, Int] = accessedFields.zipWithIndex.toMap
+
+ override def visitInputRef(inputRef: RexInputRef): RexNode = {
+ if (inputRef.getIndex >= extractedFunctionOffset) {
+ new RexInputRef(
+ inputRef.getIndex - extractedFunctionOffset + accessedFields.length,
+ inputRef.getType)
+ } else {
+ new RexInputRef(
+ fieldMap.getOrElse(inputRef.getIndex,
+ throw new IllegalArgumentException("input field contains invalid index")),
+ inputRef.getType)
+ }
+ }
+
+ override def visitCall(call: RexCall): RexNode = {
+ call.clone(call.getType, call.getOperands.asScala.map(_.accept(this)))
+ }
+
+ override def visitNode(rexNode: RexNode): RexNode = rexNode
+}
+
+object PythonScalarFunctionSplitRule {
+ val INSTANCE: RelOptRule = new PythonScalarFunctionSplitRule
+}
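
The rule above rewrites a single Calc containing Python calls into two stacked Calcs: a bottom Calc that forwards only the accessed input fields and appends the extracted calls, and a top Calc whose field references are remapped by ExtractedFunctionInputRewriter. The following standalone Scala sketch, which is not part of the patch, illustrates only that index remapping; the concrete layout (three input fields a, b, c with extractedFunctionOffset = 3, of which fields 0 and 2 are forwarded, plus one extracted call) is an assumed example.

object InputRewriteSketch {
  // Assumed example layout: input row (a, b, c), extractedFunctionOffset = 3,
  // accessedFields = Array(0, 2), one extracted Python call appended by the bottom Calc.
  def rewriteIndex(index: Int, extractedFunctionOffset: Int, accessedFields: Array[Int]): Int = {
    val fieldMap = accessedFields.zipWithIndex.toMap
    if (index >= extractedFunctionOffset) {
      // references to extracted calls are shifted to follow the forwarded input fields
      index - extractedFunctionOffset + accessedFields.length
    } else {
      // references to original input fields are compacted onto their forwarded positions
      fieldMap.getOrElse(index,
        throw new IllegalArgumentException(s"field $index is not forwarded"))
    }
  }

  def main(args: Array[String]): Unit = {
    val accessedFields = Array(0, 2)
    println(rewriteIndex(0, 3, accessedFields)) // a -> 0
    println(rewriteIndex(2, 3, accessedFields)) // c -> 1
    println(rewriteIndex(3, 3, accessedFields)) // extracted call f0 -> 2
  }
}
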
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCalcRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCalcRule.scala
index 7ff972b..c6c2e93 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCalcRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCalcRule.scala
@@ -21,10 +21,13 @@
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecCalc
-
-import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.functions.FunctionLanguage
+import org.apache.flink.table.planner.plan.utils.PythonUtil.containsFunctionOf
+
+import scala.collection.JavaConverters._
/**
* Rule that converts [[FlinkLogicalCalc]] to [[BatchExecCalc]].
@@ -36,6 +39,12 @@
FlinkConventions.BATCH_PHYSICAL,
"BatchExecCalcRule") {
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+ val program = calc.getProgram
+ !program.getExprList.asScala.exists(containsFunctionOf(_, FunctionLanguage.PYTHON))
+ }
+
def convert(rel: RelNode): RelNode = {
val calc = rel.asInstanceOf[FlinkLogicalCalc]
val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCalcRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCalcRule.scala
new file mode 100644
index 0000000..1a16626
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCalcRule.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.functions.FunctionLanguage
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonCalc
+import org.apache.flink.table.planner.plan.utils.PythonUtil.containsFunctionOf
+
+import scala.collection.JavaConverters._
+
+/**
+ * Rule that converts [[FlinkLogicalCalc]] to [[BatchExecPythonCalc]].
+ */
+class BatchExecPythonCalcRule
+ extends ConverterRule(
+ classOf[FlinkLogicalCalc],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.BATCH_PHYSICAL,
+ "BatchExecPythonCalcRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+ val program = calc.getProgram
+ program.getExprList.asScala.exists(containsFunctionOf(_, FunctionLanguage.PYTHON))
+ }
+
+ def convert(rel: RelNode): RelNode = {
+ val calc = rel.asInstanceOf[FlinkLogicalCalc]
+ val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
+ val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.BATCH_PHYSICAL)
+
+ new BatchExecPythonCalc(
+ rel.getCluster,
+ newTrait,
+ newInput,
+ calc.getProgram,
+ rel.getRowType)
+ }
+}
+
+object BatchExecPythonCalcRule {
+ val INSTANCE: RelOptRule = new BatchExecPythonCalcRule
+}
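
BatchExecCalcRule and BatchExecPythonCalcRule (and their stream counterparts below) have complementary matches() conditions, so once PythonScalarFunctionSplitRule has separated mixed programs, each FlinkLogicalCalc is converted by exactly one of the two rules. A minimal Scala sketch of that dispatch, not part of the patch; exprsContainPython stands in for evaluating containsFunctionOf(expr, FunctionLanguage.PYTHON) over the program's expressions.

object CalcRuleDispatchSketch {
  // exprsContainPython(i) stands in for
  // containsFunctionOf(program.getExprList.get(i), FunctionLanguage.PYTHON)
  def pickRule(exprsContainPython: Seq[Boolean]): String =
    if (exprsContainPython.exists(identity)) "BatchExecPythonCalcRule" else "BatchExecCalcRule"

  def main(args: Array[String]): Unit = {
    println(pickRule(Seq(false, false))) // pure Java/SQL Calc -> BatchExecCalcRule
    println(pickRule(Seq(false, true)))  // Calc with a Python call -> BatchExecPythonCalcRule
  }
}
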
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecCalcRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecCalcRule.scala
index fafc799..1626e2c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecCalcRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecCalcRule.scala
@@ -21,10 +21,13 @@
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc
-
-import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import org.apache.flink.table.planner.plan.utils.PythonUtil.containsFunctionOf
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.functions.FunctionLanguage
+
+import scala.collection.JavaConverters._
/**
* Rule that converts [[FlinkLogicalCalc]] to [[StreamExecCalc]].
@@ -36,6 +39,12 @@
FlinkConventions.STREAM_PHYSICAL,
"StreamExecCalcRule") {
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+ val program = calc.getProgram
+ !program.getExprList.asScala.exists(containsFunctionOf(_, FunctionLanguage.PYTHON))
+ }
+
def convert(rel: RelNode): RelNode = {
val calc: FlinkLogicalCalc = rel.asInstanceOf[FlinkLogicalCalc]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonCalcRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonCalcRule.scala
new file mode 100644
index 0000000..858f4d2
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonCalcRule.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.functions.FunctionLanguage
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc
+import org.apache.flink.table.planner.plan.utils.PythonUtil.containsFunctionOf
+
+import scala.collection.JavaConverters._
+
+/**
+ * Rule that converts [[FlinkLogicalCalc]] to [[StreamExecPythonCalc]].
+ */
+class StreamExecPythonCalcRule
+ extends ConverterRule(
+ classOf[FlinkLogicalCalc],
+ FlinkConventions.LOGICAL,
+ FlinkConventions.STREAM_PHYSICAL,
+ "StreamExecPythonCalcRule") {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+ val program = calc.getProgram
+ program.getExprList.asScala.exists(containsFunctionOf(_, FunctionLanguage.PYTHON))
+ }
+
+ def convert(rel: RelNode): RelNode = {
+ val calc: FlinkLogicalCalc = rel.asInstanceOf[FlinkLogicalCalc]
+ val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
+ val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.STREAM_PHYSICAL)
+
+ new StreamExecPythonCalc(
+ rel.getCluster,
+ traitSet,
+ newInput,
+ calc.getProgram,
+ rel.getRowType)
+ }
+}
+
+object StreamExecPythonCalcRule {
+ val INSTANCE: RelOptRule = new StreamExecPythonCalcRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala
new file mode 100644
index 0000000..3601854
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils.python
+
+import java.nio.charset.StandardCharsets
+import java.sql.{Date, Time, Timestamp}
+import java.time.{LocalDate, LocalDateTime, LocalTime}
+import java.util.TimeZone
+import java.util.function.BiConsumer
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.io.InputFormat
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.java.io.CollectionInputFormat
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
+import org.apache.flink.core.io.InputSplit
+import org.apache.flink.table.api.{TableConfig, TableSchema, Types}
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.functions.python.PythonEnv
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, PythonFunctionCodeGenerator}
+import org.apache.flink.table.sources.InputFormatTableSource
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+object PythonTableUtils {
+
+ /**
+ * Creates a [[ScalarFunction]] for the specified Python ScalarFunction.
+   *
+   * @param config the table config, used to create the [[CodeGeneratorContext]]
+   * @param funcName class name of the user-defined function. Must be a valid Java class identifier
+ * @param serializedScalarFunction serialized Python scalar function
+ * @param inputTypes input data types
+ * @param resultType expected result type
+ * @param deterministic the determinism of the function's results
+ * @param pythonEnv the Python execution environment
+ * @return A generated Java ScalarFunction representation for the specified Python ScalarFunction
+ */
+ def createPythonScalarFunction(
+ config: TableConfig,
+ funcName: String,
+ serializedScalarFunction: Array[Byte],
+ inputTypes: Array[TypeInformation[_]],
+ resultType: TypeInformation[_],
+ deterministic: Boolean,
+ pythonEnv: PythonEnv): ScalarFunction =
+ PythonFunctionCodeGenerator.generateScalarFunction(
+ CodeGeneratorContext(config),
+ funcName,
+ serializedScalarFunction,
+ inputTypes,
+ resultType,
+ deterministic,
+ pythonEnv)
+
+ /**
+   * Wraps the unpickled Python data in an InputFormat. It will later be passed to a
+   * PythonInputFormatTableSource.
+   *
+   * @param data The unpickled Python data.
+   * @param dataType The row type info of the Python data.
+   * @param config The execution config used to create the serializer.
+   * @return An InputFormat containing the Python data.
+ */
+ def getInputFormat(
+ data: java.util.List[Array[Object]],
+ dataType: TypeInformation[Row],
+ config: ExecutionConfig): InputFormat[Row, _] = {
+ val converter = convertTo(dataType)
+ new CollectionInputFormat(data.map(converter(_).asInstanceOf[Row]),
+ dataType.createSerializer(config))
+ }
+
+ /**
+ * Creates a converter that converts `obj` to the type specified by the data type, or returns
+ * null if the type of obj is unexpected because Python doesn't enforce the type.
+ */
+ private def convertTo(dataType: TypeInformation[_]): Any => Any = dataType match {
+ case _ if dataType == Types.BOOLEAN => (obj: Any) => nullSafeConvert(obj) {
+ case b: Boolean => b
+ }
+
+ case _ if dataType == Types.BYTE => (obj: Any) => nullSafeConvert(obj) {
+ case c: Byte => c
+ case c: Short => c.toByte
+ case c: Int => c.toByte
+ case c: Long => c.toByte
+ }
+
+ case _ if dataType == Types.SHORT => (obj: Any) => nullSafeConvert(obj) {
+ case c: Byte => c.toShort
+ case c: Short => c
+ case c: Int => c.toShort
+ case c: Long => c.toShort
+ }
+
+ case _ if dataType == Types.INT => (obj: Any) => nullSafeConvert(obj) {
+ case c: Byte => c.toInt
+ case c: Short => c.toInt
+ case c: Int => c
+ case c: Long => c.toInt
+ }
+
+ case _ if dataType == Types.LONG => (obj: Any) => nullSafeConvert(obj) {
+ case c: Byte => c.toLong
+ case c: Short => c.toLong
+ case c: Int => c.toLong
+ case c: Long => c
+ }
+
+ case _ if dataType == Types.FLOAT => (obj: Any) => nullSafeConvert(obj) {
+ case c: Float => c
+ case c: Double => c.toFloat
+ }
+
+ case _ if dataType == Types.DOUBLE => (obj: Any) => nullSafeConvert(obj) {
+ case c: Float => c.toDouble
+ case c: Double => c
+ }
+
+ case _ if dataType == Types.DECIMAL => (obj: Any) => nullSafeConvert(obj) {
+ case c: java.math.BigDecimal => c
+ }
+
+ case _ if dataType == Types.SQL_DATE => (obj: Any) => nullSafeConvert(obj) {
+ case c: Int =>
+ val millisLocal = c.toLong * 86400000
+ val millisUtc = millisLocal - getOffsetFromLocalMillis(millisLocal)
+ new Date(millisUtc)
+ }
+
+ case _ if dataType == Types.SQL_TIME => (obj: Any) => nullSafeConvert(obj) {
+ case c: Long => new Time(c / 1000)
+ case c: Int => new Time(c.toLong / 1000)
+ }
+
+ case _ if dataType == Types.SQL_TIMESTAMP => (obj: Any) => nullSafeConvert(obj) {
+ case c: Long => new Timestamp(c / 1000)
+ case c: Int => new Timestamp(c.toLong / 1000)
+ }
+
+ case _ if dataType == Types.INTERVAL_MILLIS() => (obj: Any) => nullSafeConvert(obj) {
+ case c: Long => c / 1000
+ case c: Int => c.toLong / 1000
+ }
+
+ case _ if dataType == Types.STRING => (obj: Any) => nullSafeConvert(obj) {
+ case _ => obj.toString
+ }
+
+ case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO =>
+ (obj: Any) =>
+ nullSafeConvert(obj) {
+ case c: String => c.getBytes(StandardCharsets.UTF_8)
+ case c if c.getClass.isArray && c.getClass.getComponentType.getName == "byte" => c
+ }
+
+ case _: PrimitiveArrayTypeInfo[_] |
+ _: BasicArrayTypeInfo[_, _] |
+ _: ObjectArrayTypeInfo[_, _] =>
+ var boxed = false
+ val elementType = dataType match {
+ case p: PrimitiveArrayTypeInfo[_] =>
+ p.getComponentType
+ case b: BasicArrayTypeInfo[_, _] =>
+ boxed = true
+ b.getComponentInfo
+ case o: ObjectArrayTypeInfo[_, _] =>
+ boxed = true
+ o.getComponentInfo
+ }
+ val elementFromJava = convertTo(elementType)
+
+ (obj: Any) => nullSafeConvert(obj) {
+ case c: java.util.List[_] =>
+ createArray(elementType,
+ c.size(),
+ i => elementFromJava(c.get(i)),
+ boxed)
+ case c if c.getClass.isArray =>
+ createArray(elementType,
+ c.asInstanceOf[Array[_]].length,
+ i => elementFromJava(c.asInstanceOf[Array[_]](i)),
+ boxed)
+ }
+
+ case m: MapTypeInfo[_, _] =>
+ val keyFromJava = convertTo(m.getKeyTypeInfo)
+ val valueFromJava = convertTo(m.getValueTypeInfo)
+
+ (obj: Any) => nullSafeConvert(obj) {
+ case javaMap: java.util.Map[_, _] =>
+ val map = new java.util.HashMap[Any, Any]
+ javaMap.forEach(new BiConsumer[Any, Any] {
+ override def accept(k: Any, v: Any): Unit =
+ map.put(keyFromJava(k), valueFromJava(v))
+ })
+ map
+ }
+
+ case rowType: RowTypeInfo =>
+ val fieldsFromJava = rowType.getFieldTypes.map(f => convertTo(f))
+
+ (obj: Any) => nullSafeConvert(obj) {
+ case c if c.getClass.isArray =>
+ val r = c.asInstanceOf[Array[_]]
+ if (r.length != rowType.getFieldTypes.length) {
+ throw new IllegalStateException(
+ s"Input row doesn't have expected number of values required by the schema. " +
+ s"${rowType.getFieldTypes.length} fields are required while ${r.length} " +
+ s"values are provided."
+ )
+ }
+
+ val row = new Row(r.length)
+ var i = 0
+ while (i < r.length) {
+ row.setField(i, fieldsFromJava(i)(r(i)))
+ i += 1
+ }
+ row
+ }
+
+ // UserDefinedType
+ case _ => (obj: Any) => obj
+ }
+
+ private def nullSafeConvert(input: Any)(f: PartialFunction[Any, Any]): Any = {
+ if (input == null) {
+ null
+ } else {
+ f.applyOrElse(input, {
+ _: Any => null
+ })
+ }
+ }
+
+ private def createArray(
+ elementType: TypeInformation[_],
+ length: Int,
+ getElement: Int => Any,
+ boxed: Boolean = false): Array[_] = {
+ elementType match {
+ case BasicTypeInfo.BOOLEAN_TYPE_INFO =>
+ if (!boxed) {
+ val array = new Array[Boolean](length)
+ for (i <- 0 until length) {
+ array(i) = getElement(i).asInstanceOf[Boolean]
+ }
+ array
+ } else {
+ val array = new Array[java.lang.Boolean](length)
+ for (i <- 0 until length) {
+ if (getElement(i) != null) {
+ array(i) = java.lang.Boolean.valueOf(getElement(i).asInstanceOf[Boolean])
+ } else {
+ array(i) = null
+ }
+ }
+ array
+ }
+
+ case BasicTypeInfo.BYTE_TYPE_INFO =>
+ if (!boxed) {
+ val array = new Array[Byte](length)
+ for (i <- 0 until length) {
+ array(i) = getElement(i).asInstanceOf[Byte]
+ }
+ array
+ } else {
+ val array = new Array[java.lang.Byte](length)
+ for (i <- 0 until length) {
+ if (getElement(i) != null) {
+ array(i) = java.lang.Byte.valueOf(getElement(i).asInstanceOf[Byte])
+ } else {
+ array(i) = null
+ }
+ }
+ array
+ }
+
+ case BasicTypeInfo.SHORT_TYPE_INFO =>
+ if (!boxed) {
+ val array = new Array[Short](length)
+ for (i <- 0 until length) {
+ array(i) = getElement(i).asInstanceOf[Short]
+ }
+ array
+ } else {
+ val array = new Array[java.lang.Short](length)
+ for (i <- 0 until length) {
+ if (getElement(i) != null) {
+ array(i) = java.lang.Short.valueOf(getElement(i).asInstanceOf[Short])
+ } else {
+ array(i) = null
+ }
+ }
+ array
+ }
+
+ case BasicTypeInfo.INT_TYPE_INFO =>
+ if (!boxed) {
+ val array = new Array[Int](length)
+ for (i <- 0 until length) {
+ array(i) = getElement(i).asInstanceOf[Int]
+ }
+ array
+ } else {
+ val array = new Array[java.lang.Integer](length)
+ for (i <- 0 until length) {
+ if (getElement(i) != null) {
+ array(i) = java.lang.Integer.valueOf(getElement(i).asInstanceOf[Int])
+ } else {
+ array(i) = null
+ }
+ }
+ array
+ }
+
+ case BasicTypeInfo.LONG_TYPE_INFO =>
+ if (!boxed) {
+ val array = new Array[Long](length)
+ for (i <- 0 until length) {
+ array(i) = getElement(i).asInstanceOf[Long]
+ }
+ array
+ } else {
+ val array = new Array[java.lang.Long](length)
+ for (i <- 0 until length) {
+ if (getElement(i) != null) {
+ array(i) = java.lang.Long.valueOf(getElement(i).asInstanceOf[Long])
+ } else {
+ array(i) = null
+ }
+ }
+ array
+ }
+
+ case BasicTypeInfo.FLOAT_TYPE_INFO =>
+ if (!boxed) {
+ val array = new Array[Float](length)
+ for (i <- 0 until length) {
+ array(i) = getElement(i).asInstanceOf[Float]
+ }
+ array
+ } else {
+ val array = new Array[java.lang.Float](length)
+ for (i <- 0 until length) {
+ if (getElement(i) != null) {
+ array(i) = java.lang.Float.valueOf(getElement(i).asInstanceOf[Float])
+ } else {
+ array(i) = null
+ }
+ }
+ array
+ }
+
+ case BasicTypeInfo.DOUBLE_TYPE_INFO =>
+ if (!boxed) {
+ val array = new Array[Double](length)
+ for (i <- 0 until length) {
+ array(i) = getElement(i).asInstanceOf[Double]
+ }
+ array
+ } else {
+ val array = new Array[java.lang.Double](length)
+ for (i <- 0 until length) {
+ if (getElement(i) != null) {
+ array(i) = java.lang.Double.valueOf(getElement(i).asInstanceOf[Double])
+ } else {
+ array(i) = null
+ }
+ }
+ array
+ }
+
+ case BasicTypeInfo.STRING_TYPE_INFO =>
+ val array = new Array[java.lang.String](length)
+ for (i <- 0 until length) {
+ array(i) = getElement(i).asInstanceOf[java.lang.String]
+ }
+ array
+
+ case _ =>
+ val array = new Array[Object](length)
+ for (i <- 0 until length) {
+ array(i) = getElement(i).asInstanceOf[Object]
+ }
+ array
+ }
+ }
+
+ def getOffsetFromLocalMillis(millisLocal: Long): Int = {
+ val localZone = TimeZone.getDefault
+ var result = localZone.getRawOffset
+ // the actual offset should be calculated based on milliseconds in UTC
+ val offset = localZone.getOffset(millisLocal - result)
+ if (offset != result) {
+      // Daylight Saving Time
+ result = localZone.getOffset(millisLocal - offset)
+ if (result != offset) {
+        // fall back to doing the reverse lookup using java.time.LocalDateTime
+ // this should only happen near the start or end of DST
+ val localDate = LocalDate.ofEpochDay(millisLocal / 86400000)
+ val localTime = LocalTime.ofNanoOfDay(
+ Math.floorMod(millisLocal, 86400000) * 1000 * 1000)
+ val localDateTime = LocalDateTime.of(localDate, localTime)
+ val millisEpoch = localDateTime.atZone(localZone.toZoneId).toInstant.toEpochMilli
+ result = (millisLocal - millisEpoch).toInt
+ }
+ }
+ result
+ }
+}
+
+/**
+ * An InputFormatTableSource created by the Python 'from_elements' method.
+ *
+ * @param inputFormat The input format which contains the Python data collection,
+ *                    usually created by the PythonTableUtils#getInputFormat method
+ * @param rowTypeInfo The row type info of the Python data.
+ *                    It is generated by the Python 'from_elements' method.
+ */
+class PythonInputFormatTableSource[Row](
+ inputFormat: InputFormat[Row, _ <: InputSplit],
+ rowTypeInfo: RowTypeInfo
+) extends InputFormatTableSource[Row] {
+
+ override def getInputFormat: InputFormat[Row, _ <: InputSplit] = inputFormat
+
+ override def getTableSchema: TableSchema = TableSchema.fromTypeInfo(rowTypeInfo)
+
+ override def getReturnType: TypeInformation[Row] = rowTypeInfo.asInstanceOf[TypeInformation[Row]]
+}
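
convertTo above builds a per-field converter for each declared type, and nullSafeConvert makes those converters tolerant: since Python does not enforce types, a value of an unexpected type becomes null instead of failing the conversion. A minimal standalone Scala sketch of that pattern, not part of the patch; the toInt converter mirrors the Types.INT case.

object NullSafeConvertSketch {
  // same pattern as PythonTableUtils.nullSafeConvert: null stays null,
  // unexpected types fall through to null instead of throwing
  def nullSafeConvert(input: Any)(f: PartialFunction[Any, Any]): Any =
    if (input == null) null else f.applyOrElse(input, (_: Any) => null)

  // converter mirroring the Types.INT case of convertTo
  val toInt: Any => Any = obj => nullSafeConvert(obj) {
    case c: Byte => c.toInt
    case c: Short => c.toInt
    case c: Int => c
    case c: Long => c.toInt
  }

  def main(args: Array[String]): Unit = {
    println(toInt(42L))    // 42
    println(toInt("oops")) // null: unexpected type is dropped instead of failing
    println(toInt(null))   // null
  }
}
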
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
index f3284d0..afb992b 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
@@ -18,9 +18,12 @@
package org.apache.flink.table.planner.runtime.utils;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.FunctionLanguage;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.Arrays;
@@ -154,4 +157,63 @@
}
}
+ /**
+ * Test for Python Scalar Function.
+ */
+ public static class PythonScalarFunction extends ScalarFunction {
+ private final String name;
+
+ public PythonScalarFunction(String name) {
+ this.name = name;
+ }
+
+ public int eval(int i, int j) {
+ return i + j;
+ }
+
+ @Override
+ public TypeInformation<?> getResultType(Class<?>[] signature) {
+ return BasicTypeInfo.INT_TYPE_INFO;
+ }
+
+ @Override
+ public FunctionLanguage getLanguage() {
+ return FunctionLanguage.PYTHON;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
+
+ /**
+ * Test for Python Scalar Function.
+ */
+ public static class BooleanPythonScalarFunction extends ScalarFunction {
+ private final String name;
+
+ public BooleanPythonScalarFunction(String name) {
+ this.name = name;
+ }
+
+ public boolean eval(int i, int j) {
+ return i + j > 1;
+ }
+
+ @Override
+ public TypeInformation<?> getResultType(Class<?>[] signature) {
+ return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+ }
+
+ @Override
+ public FunctionLanguage getLanguage() {
+ return FunctionLanguage.PYTHON;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/PythonCalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/PythonCalcTest.xml
new file mode 100644
index 0000000..e488730
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/PythonCalcTest.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testPythonFunctionMixedWithJavaFunction">
+ <Resource name="sql">
+ <![CDATA[SELECT pyFunc1(a, b), c + 1 FROM MyTable]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[pyFunc1($0, $1)], EXPR$1=[+($2, 1)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[f0 AS EXPR$0, +(c, 1) AS EXPR$1])
++- PythonCalc(select=[c, pyFunc1(a, b) AS f0])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.xml
index 0433fbe..ae612e2 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.xml
@@ -62,8 +62,9 @@
</Resource>
<Resource name="planAfter">
<![CDATA[
-Calc(select=[PyUdf() AS EXPR$0, CAST(2) AS EXPR$1])
-+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Calc(select=[f0 AS EXPR$0, CAST(2) AS EXPR$1])
++- PythonCalc(select=[PyUdf() AS f0])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonScalarFunctionSplitRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonScalarFunctionSplitRuleTest.xml
new file mode 100644
index 0000000..66a06cd
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonScalarFunctionSplitRuleTest.xml
@@ -0,0 +1,164 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testPythonFunctionAsInputOfJavaFunction">
+ <Resource name="sql">
+ <![CDATA[SELECT pyFunc1(a, b) + 1 FROM MyTable]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[+(pyFunc1($0, $1), 1)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[+(f0, 1) AS EXPR$0])
++- FlinkLogicalCalc(select=[pyFunc1(a, b) AS f0])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPythonFunctionMixedWithJavaFunction">
+ <Resource name="sql">
+ <![CDATA[SELECT pyFunc1(a, b), c + 1 FROM MyTable]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[pyFunc1($0, $1)], EXPR$1=[+($2, 1)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[f0 AS EXPR$0, +(c, 1) AS EXPR$1])
++- FlinkLogicalCalc(select=[c, pyFunc1(a, b) AS f0])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPythonFunctionMixedWithJavaFunctionInWhereClause">
+ <Resource name="sql">
+ <![CDATA[SELECT pyFunc1(a, b), c + 1 FROM MyTable WHERE pyFunc2(a, c) > 0]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[pyFunc1($0, $1)], EXPR$1=[+($2, 1)])
++- LogicalFilter(condition=[>(pyFunc2($0, $2), 0)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[f0 AS EXPR$0, +(c, 1) AS EXPR$1], where=[>(f1, 0)])
++- FlinkLogicalCalc(select=[c, pyFunc1(a, b) AS f0, pyFunc2(a, c) AS f1])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testPythonFunctionInWhereClause">
+ <Resource name="sql">
+ <![CDATA[SELECT pyFunc1(a, b) FROM MyTable WHERE pyFunc4(a, c)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[pyFunc1($0, $1)])
++- LogicalFilter(condition=[pyFunc4($0, $2)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[f0 AS EXPR$0], where=[f1])
++- FlinkLogicalCalc(select=[pyFunc1(a, b) AS f0, pyFunc4(a, c) AS f1])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testChainingPythonFunction">
+ <Resource name="sql">
+ <![CDATA[SELECT pyFunc3(pyFunc2(a + pyFunc1(a, c), b), c) FROM MyTable]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[pyFunc3(pyFunc2(+($0, pyFunc1($0, $2)), $1), $2)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[pyFunc3(pyFunc2(f0, b), c) AS EXPR$0])
++- FlinkLogicalCalc(select=[b, c, +(a, f0) AS f0])
+ +- FlinkLogicalCalc(select=[b, c, a, pyFunc1(a, c) AS f0])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testOnlyOnePythonFunction">
+ <Resource name="sql">
+ <![CDATA[SELECT pyFunc1(a, b) FROM MyTable]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[pyFunc1($0, $1)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[pyFunc1(a, b) AS EXPR$0])
++- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testOnlyOnePythonFunctionInWhereClause">
+ <Resource name="sql">
+ <![CDATA[SELECT a, b FROM MyTable WHERE pyFunc4(a, c)]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[pyFunc4($0, $2)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[a, b], where=[f0])
++- FlinkLogicalCalc(select=[a, b, pyFunc4(a, c) AS f0])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testFieldNameUniquify">
+ <Resource name="sql">
+ <![CDATA[SELECT pyFunc1(f1, f2), f0 + 1 FROM MyTable2]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[pyFunc1($1, $2)], EXPR$1=[+($0, 1)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(f0, f1, f2)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+FlinkLogicalCalc(select=[f00 AS EXPR$0, +(f0, 1) AS EXPR$1])
++- FlinkLogicalCalc(select=[f0, pyFunc1(f1, f2) AS f00])
+ +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(f0, f1, f2)]]], fields=[f0, f1, f2])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/PythonCalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/PythonCalcTest.xml
new file mode 100644
index 0000000..e488730
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/PythonCalcTest.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testPythonFunctionMixedWithJavaFunction">
+ <Resource name="sql">
+ <![CDATA[SELECT pyFunc1(a, b), c + 1 FROM MyTable]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(EXPR$0=[pyFunc1($0, $1)], EXPR$1=[+($2, 1)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[f0 AS EXPR$0, +(c, 1) AS EXPR$1])
++- PythonCalc(select=[c, pyFunc1(a, b) AS f0])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/PythonCalcTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/PythonCalcTest.scala
new file mode 100644
index 0000000..4c01bfd
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/PythonCalcTest.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction
+import org.apache.flink.table.planner.utils.TableTestBase
+import org.junit.{Before, Test}
+
+class PythonCalcTest extends TableTestBase {
+ private val util = batchTestUtil()
+
+ @Before
+ def setup(): Unit = {
+ util.addTableSource[(Int, Int, Int)]("MyTable", 'a, 'b, 'c)
+ util.addFunction("pyFunc1", new PythonScalarFunction("pyFunc1"))
+ }
+
+ @Test
+ def testPythonFunctionMixedWithJavaFunction(): Unit = {
+ val sqlQuery = "SELECT pyFunc1(a, b), c + 1 FROM MyTable"
+ util.verifyPlan(sqlQuery)
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PythonScalarFunctionSplitRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PythonScalarFunctionSplitRuleTest.scala
new file mode 100644
index 0000000..d99d39f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PythonScalarFunctionSplitRuleTest.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions
+import org.apache.flink.table.planner.plan.optimize.program._
+import org.apache.flink.table.planner.plan.rules.{FlinkBatchRuleSets, FlinkStreamRuleSets}
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.{BooleanPythonScalarFunction, PythonScalarFunction}
+import org.apache.flink.table.planner.utils.TableTestBase
+import org.junit.{Before, Test}
+
+/**
+ * Test for [[PythonScalarFunctionSplitRule]].
+ */
+class PythonScalarFunctionSplitRuleTest extends TableTestBase {
+
+ private val util = batchTestUtil()
+
+ @Before
+ def setup(): Unit = {
+ val programs = new FlinkChainedProgram[BatchOptimizeContext]()
+ programs.addLast(
+ "logical",
+ FlinkVolcanoProgramBuilder.newBuilder
+ .add(FlinkBatchRuleSets.LOGICAL_OPT_RULES)
+ .setRequiredOutputTraits(Array(FlinkConventions.LOGICAL))
+ .build())
+ programs.addLast(
+ "logical_rewrite",
+ FlinkHepRuleSetProgramBuilder.newBuilder
+ .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+ .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(FlinkStreamRuleSets.LOGICAL_REWRITE)
+ .build())
+ util.replaceBatchProgram(programs)
+
+ util.addTableSource[(Int, Int, Int)]("MyTable", 'a, 'b, 'c)
+ util.addFunction("pyFunc1", new PythonScalarFunction("pyFunc1"))
+ util.addFunction("pyFunc2", new PythonScalarFunction("pyFunc2"))
+ util.addFunction("pyFunc3", new PythonScalarFunction("pyFunc3"))
+ util.addFunction("pyFunc4", new BooleanPythonScalarFunction("pyFunc4"))
+ }
+
+ @Test
+ def testPythonFunctionAsInputOfJavaFunction(): Unit = {
+ val sqlQuery = "SELECT pyFunc1(a, b) + 1 FROM MyTable"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testPythonFunctionMixedWithJavaFunction(): Unit = {
+ val sqlQuery = "SELECT pyFunc1(a, b), c + 1 FROM MyTable"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testPythonFunctionMixedWithJavaFunctionInWhereClause(): Unit = {
+ val sqlQuery = "SELECT pyFunc1(a, b), c + 1 FROM MyTable WHERE pyFunc2(a, c) > 0"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testPythonFunctionInWhereClause(): Unit = {
+ val sqlQuery = "SELECT pyFunc1(a, b) FROM MyTable WHERE pyFunc4(a, c)"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testChainingPythonFunction(): Unit = {
+ val sqlQuery = "SELECT pyFunc3(pyFunc2(a + pyFunc1(a, c), b), c) FROM MyTable"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testOnlyOnePythonFunction(): Unit = {
+ val sqlQuery = "SELECT pyFunc1(a, b) FROM MyTable"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testOnlyOnePythonFunctionInWhereClause(): Unit = {
+ val sqlQuery = "SELECT a, b FROM MyTable WHERE pyFunc4(a, c)"
+ util.verifyPlan(sqlQuery)
+ }
+
+ @Test
+ def testFieldNameUniquify(): Unit = {
+ util.addTableSource[(Int, Int, Int)]("MyTable2", 'f0, 'f1, 'f2)
+ val sqlQuery = "SELECT pyFunc1(f1, f2), f0 + 1 FROM MyTable2"
+ util.verifyPlan(sqlQuery)
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/PythonCalcTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/PythonCalcTest.scala
new file mode 100644
index 0000000..d25b20f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/PythonCalcTest.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction
+import org.apache.flink.table.planner.utils.TableTestBase
+import org.junit.{Before, Test}
+
+class PythonCalcTest extends TableTestBase {
+ private val util = streamTestUtil()
+
+ @Before
+ def setup(): Unit = {
+ util.addTableSource[(Int, Int, Int)]("MyTable", 'a, 'b, 'c)
+ util.addFunction("pyFunc1", new PythonScalarFunction("pyFunc1"))
+ }
+
+ @Test
+ def testPythonFunctionMixedWithJavaFunction(): Unit = {
+ val sqlQuery = "SELECT pyFunc1(a, b), c + 1 FROM MyTable"
+ util.verifyPlan(sqlQuery)
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/PythonFunctionCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/PythonFunctionCodeGenerator.scala
index e97e7b7..332d573 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/PythonFunctionCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/PythonFunctionCodeGenerator.scala
@@ -34,7 +34,7 @@
/**
* Generates a [[ScalarFunction]] for the specified Python user-defined function.
*
- * @param name class name of the user-defined function. Must be a valid Java class identifier
+ * @param name name of the user-defined function
* @param serializedScalarFunction serialized Python scalar function
* @param inputTypes input data types
* @param resultType expected result type