[FLINK-14272][python][table-planner-blink] Support Blink planner for Python UDF (Scalar Function)

Ports the Python user-defined scalar function support from the Flink planner to the Blink planner.
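
A minimal usage sketch (not part of this patch), assuming the PyFlink API as exercised by the
new tests in test_udf.py; the table and field names below are illustrative:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
    from pyflink.table.udf import udf

    env = StreamExecutionEnvironment.get_execution_environment()
    # Select the Blink planner explicitly via EnvironmentSettings.
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance()
            .in_streaming_mode().use_blink_planner().build())

    # register_function now passes the planner flag and the TableConfig to _judf,
    # so the wrapping ScalarFunction is generated by the Blink planner utilities.
    t_env.register_function(
        "add_one", udf(lambda i: i + 1, DataTypes.BIGINT(), DataTypes.BIGINT()))

    t = t_env.from_elements([(1, 'hi'), (2, 'hello')], ['a', 'b'])
    t.select("add_one(a), b")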

This closes #9890.
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 940aafc..e4af611 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -47,8 +47,9 @@
 
     __metaclass__ = ABCMeta
 
-    def __init__(self, j_tenv, serializer=PickleSerializer()):
+    def __init__(self, j_tenv, is_blink_planner, serializer=PickleSerializer()):
         self._j_tenv = j_tenv
+        self._is_blink_planner = is_blink_planner
         self._serializer = serializer
 
     def from_table_source(self, table_source):
@@ -570,7 +571,8 @@
         :param function: The python user-defined function to register.
         :type function: UserDefinedFunctionWrapper
         """
-        self._j_tenv.registerFunction(name, function._judf)
+        self._j_tenv.registerFunction(name, function._judf(self._is_blink_planner,
+                                                           self.get_config()._j_table_config))
 
     def execute(self, job_name):
         """
@@ -712,9 +714,17 @@
             execution_config = self._get_execution_config(temp_file.name, schema)
             gateway = get_gateway()
             j_objs = gateway.jvm.PythonBridgeUtils.readPythonObjects(temp_file.name, True)
-            j_input_format = gateway.jvm.PythonTableUtils.getInputFormat(
+            if self._is_blink_planner:
+                PythonTableUtils = gateway.jvm \
+                    .org.apache.flink.table.planner.utils.python.PythonTableUtils
+                PythonInputFormatTableSource = gateway.jvm \
+                    .org.apache.flink.table.planner.utils.python.PythonInputFormatTableSource
+            else:
+                PythonTableUtils = gateway.jvm.PythonTableUtils
+                PythonInputFormatTableSource = gateway.jvm.PythonInputFormatTableSource
+            j_input_format = PythonTableUtils.getInputFormat(
                 j_objs, row_type_info, execution_config)
-            j_table_source = gateway.jvm.PythonInputFormatTableSource(
+            j_table_source = PythonInputFormatTableSource(
                 j_input_format, row_type_info)
 
             return Table(self._j_tenv.fromTableSource(j_table_source))
@@ -728,9 +738,9 @@
 
 class StreamTableEnvironment(TableEnvironment):
 
-    def __init__(self, j_tenv):
+    def __init__(self, j_tenv, is_blink_planner):
         self._j_tenv = j_tenv
-        super(StreamTableEnvironment, self).__init__(j_tenv)
+        super(StreamTableEnvironment, self).__init__(j_tenv, is_blink_planner)
 
     def _get_execution_config(self, filename, schema):
         return self._j_tenv.execEnv().getConfig()
@@ -832,14 +842,18 @@
         else:
             j_tenv = gateway.jvm.StreamTableEnvironment.create(
                 stream_execution_environment._j_stream_execution_environment)
-        return StreamTableEnvironment(j_tenv)
+        j_planner_class = j_tenv.getPlanner().getClass()
+        j_blink_planner_class = get_java_class(
+            get_gateway().jvm.org.apache.flink.table.planner.delegation.PlannerBase)
+        is_blink_planner = j_blink_planner_class.isAssignableFrom(j_planner_class)
+        return StreamTableEnvironment(j_tenv, is_blink_planner)
 
 
 class BatchTableEnvironment(TableEnvironment):
 
-    def __init__(self, j_tenv):
+    def __init__(self, j_tenv, is_blink_planner):
         self._j_tenv = j_tenv
-        super(BatchTableEnvironment, self).__init__(j_tenv)
+        super(BatchTableEnvironment, self).__init__(j_tenv, is_blink_planner)
 
     def _get_execution_config(self, filename, schema):
         gateway = get_gateway()
@@ -966,7 +980,7 @@
             else:
                 j_tenv = gateway.jvm.BatchTableEnvironment.create(
                     execution_environment._j_execution_environment)
-            return BatchTableEnvironment(j_tenv)
+            return BatchTableEnvironment(j_tenv, False)
         elif environment_settings is not None and \
                 execution_environment is None and \
                 table_config is None:
@@ -975,4 +989,8 @@
                                  "set to batch mode.")
             j_tenv = gateway.jvm.TableEnvironment.create(
                 environment_settings._j_environment_settings)
-            return BatchTableEnvironment(j_tenv)
+            j_planner_class = j_tenv.getPlanner().getClass()
+            j_blink_planner_class = get_java_class(
+                get_gateway().jvm.org.apache.flink.table.planner.delegation.PlannerBase)
+            is_blink_planner = j_blink_planner_class.isAssignableFrom(j_planner_class)
+            return BatchTableEnvironment(j_tenv, is_blink_planner)
diff --git a/flink-python/pyflink/table/tests/test_udf.py b/flink-python/pyflink/table/tests/test_udf.py
index 321cd78..d529681 100644
--- a/flink-python/pyflink/table/tests/test_udf.py
+++ b/flink-python/pyflink/table/tests/test_udf.py
@@ -18,10 +18,11 @@
 from pyflink.table import DataTypes
 from pyflink.table.udf import ScalarFunction, udf
 from pyflink.testing import source_sink_utils
-from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, \
+    PyFlinkBlinkStreamTableTestCase, PyFlinkBlinkBatchTableTestCase
 
 
-class UserDefinedFunctionTests(PyFlinkStreamTableTestCase):
+class UserDefinedFunctionTests(object):
 
     def test_scalar_function(self):
         # test lambda function
@@ -49,19 +50,19 @@
             udf(functools.partial(partial_func, param=1), DataTypes.BIGINT(), DataTypes.BIGINT()))
 
         table_sink = source_sink_utils.TestAppendSink(
-            ['a', 'b', 'c', 'd', 'e'],
+            ['a', 'b', 'c', 'd', 'e', 'f'],
             [DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(),
-             DataTypes.BIGINT()])
+             DataTypes.BIGINT(), DataTypes.BIGINT()])
         self.t_env.register_table_sink("Results", table_sink)
 
         t = self.t_env.from_elements([(1, 2, 3), (2, 5, 6), (3, 1, 9)], ['a', 'b', 'c'])
         t.where("add_one(b) <= 3") \
             .select("add_one(a), subtract_one(b), add(a, c), add_one_callable(a), "
-                    "add_one_partial(a)") \
+                    "add_one_partial(a), a") \
             .insert_into("Results")
         self.t_env.execute("test")
         actual = source_sink_utils.results()
-        self.assert_equals(actual, ["2,1,4,2,2", "4,0,12,4,4"])
+        self.assert_equals(actual, ["2,1,4,2,2,1", "4,0,12,4,4,3"])
 
     def test_chaining_scalar_function(self):
         self.t_env.register_function(
@@ -327,6 +328,21 @@
         self.assert_equals(actual, ["1,2", "1,2", "1,2"])
 
 
+class PyFlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
+                                            PyFlinkStreamTableTestCase):
+    pass
+
+
+class PyFlinkBlinkStreamUserDefinedFunctionTests(UserDefinedFunctionTests,
+                                                 PyFlinkBlinkStreamTableTestCase):
+    pass
+
+
+class PyFlinkBlinkBatchUserDefinedFunctionTests(UserDefinedFunctionTests,
+                                                PyFlinkBlinkBatchTableTestCase):
+    pass
+
+
 @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
 def add(i, j):
     return i + j
diff --git a/flink-python/pyflink/table/udf.py b/flink-python/pyflink/table/udf.py
index 97ac391..ccc7b46 100644
--- a/flink-python/pyflink/table/udf.py
+++ b/flink-python/pyflink/table/udf.py
@@ -149,13 +149,12 @@
         self._deterministic = deterministic if deterministic is not None else (
             func.is_deterministic() if isinstance(func, UserDefinedFunction) else True)
 
-    @property
-    def _judf(self):
+    def _judf(self, is_blink_planner, table_config):
         if self._judf_placeholder is None:
-            self._judf_placeholder = self._create_judf()
+            self._judf_placeholder = self._create_judf(is_blink_planner, table_config)
         return self._judf_placeholder
 
-    def _create_judf(self):
+    def _create_judf(self, is_blink_planner, table_config):
         func = self._func
         if not isinstance(self._func, UserDefinedFunction):
             func = DelegatingScalarFunction(self._func)
@@ -167,13 +166,28 @@
         j_input_types = utils.to_jarray(gateway.jvm.TypeInformation,
                                         [_to_java_type(i) for i in self._input_types])
         j_result_type = _to_java_type(self._result_type)
-        return gateway.jvm.org.apache.flink.table.util.python.PythonTableUtils \
-            .createPythonScalarFunction(self._name,
-                                        bytearray(serialized_func),
-                                        j_input_types,
-                                        j_result_type,
-                                        self._deterministic,
-                                        _get_python_env())
+        if is_blink_planner:
+            PythonTableUtils = gateway.jvm\
+                .org.apache.flink.table.planner.utils.python.PythonTableUtils
+            j_scalar_function = PythonTableUtils \
+                .createPythonScalarFunction(table_config,
+                                            self._name,
+                                            bytearray(serialized_func),
+                                            j_input_types,
+                                            j_result_type,
+                                            self._deterministic,
+                                            _get_python_env())
+        else:
+            PythonTableUtils = gateway.jvm.PythonTableUtils
+            j_scalar_function = PythonTableUtils \
+                .createPythonScalarFunction(self._name,
+                                            bytearray(serialized_func),
+                                            j_input_types,
+                                            j_result_type,
+                                            self._deterministic,
+                                            _get_python_env())
+
+        return j_scalar_function
 
 
 # TODO: support to configure the python execution environment
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index 888824a..4d86f18 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -31,7 +31,7 @@
 from pyflink.dataset import ExecutionEnvironment
 from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.find_flink_home import _find_flink_home
-from pyflink.table import BatchTableEnvironment, StreamTableEnvironment
+from pyflink.table import BatchTableEnvironment, StreamTableEnvironment, EnvironmentSettings
 from pyflink.java_gateway import get_gateway
 
 if sys.version_info[0] >= 3:
@@ -119,7 +119,7 @@
 
 class PyFlinkStreamTableTestCase(PyFlinkTestCase):
     """
-    Base class for stream unit tests.
+    Base class for stream tests.
     """
 
     def setUp(self):
@@ -131,7 +131,7 @@
 
 class PyFlinkBatchTableTestCase(PyFlinkTestCase):
     """
-    Base class for batch unit tests.
+    Base class for batch tests.
     """
 
     def setUp(self):
@@ -149,6 +149,32 @@
         return string_result
 
 
+class PyFlinkBlinkStreamTableTestCase(PyFlinkTestCase):
+    """
+    Base class for stream tests of blink planner.
+    """
+
+    def setUp(self):
+        super(PyFlinkBlinkStreamTableTestCase, self).setUp()
+        self.env = StreamExecutionEnvironment.get_execution_environment()
+
+        self.t_env = StreamTableEnvironment.create(
+            self.env, environment_settings=EnvironmentSettings.new_instance()
+                .in_streaming_mode().use_blink_planner().build())
+
+
+class PyFlinkBlinkBatchTableTestCase(PyFlinkTestCase):
+    """
+    Base class for batch tests of blink planner.
+    """
+
+    def setUp(self):
+        super(PyFlinkBlinkBatchTableTestCase, self).setUp()
+        self.t_env = BatchTableEnvironment.create(
+            environment_settings=EnvironmentSettings.new_instance()
+            .in_batch_mode().use_blink_planner().build())
+
+
 class PythonAPICompletenessTestCase(object):
     """
     Base class for Python API completeness tests, i.e.,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/PythonFunctionCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/PythonFunctionCodeGenerator.scala
new file mode 100644
index 0000000..68069bd
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/PythonFunctionCodeGenerator.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.codegen
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.functions.python.{PythonEnv, PythonFunction}
+import org.apache.flink.table.functions.{FunctionLanguage, ScalarFunction, UserDefinedFunction}
+import org.apache.flink.table.planner.codegen.CodeGenUtils.{newName, primitiveDefaultValue, primitiveTypeTermForType}
+import org.apache.flink.table.planner.codegen.Indenter.toISC
+import org.apache.flink.table.runtime.generated.GeneratedFunction
+import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter
+
+/**
+  * A code generator for generating Python [[UserDefinedFunction]]s.
+  */
+object PythonFunctionCodeGenerator {
+
+  private val PYTHON_SCALAR_FUNCTION_NAME = "PythonScalarFunction"
+
+  /**
+    * Generates a [[ScalarFunction]] for the specified Python user-defined function.
+    *
+    * @param ctx The context of the code generator
+    * @param name name of the user-defined function
+    * @param serializedScalarFunction serialized Python scalar function
+    * @param inputTypes input data types
+    * @param resultType expected result type
+    * @param deterministic the determinism of the function's results
+    * @param pythonEnv the Python execution environment
+    * @return instance of generated ScalarFunction
+    */
+  def generateScalarFunction(
+      ctx: CodeGeneratorContext,
+      name: String,
+      serializedScalarFunction: Array[Byte],
+      inputTypes: Array[TypeInformation[_]],
+      resultType: TypeInformation[_],
+      deterministic: Boolean,
+      pythonEnv: PythonEnv): ScalarFunction = {
+    val funcName = newName(PYTHON_SCALAR_FUNCTION_NAME)
+    val resultLogicType = TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType(resultType)
+    val resultTypeTerm = primitiveTypeTermForType(resultLogicType)
+    val defaultResultValue = primitiveDefaultValue(resultLogicType)
+    val inputParamCode = inputTypes.zipWithIndex.map { case (inputType, index) =>
+      s"${primitiveTypeTermForType(
+        TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType(inputType))} in$index"
+    }.mkString(", ")
+
+    val typeInfoTypeTerm = classOf[TypeInformation[_]].getCanonicalName
+    val pythonEnvTypeTerm = classOf[PythonEnv].getCanonicalName
+
+    val resultTypeNameTerm =
+      ctx.addReusableObject(resultType, "resultType", typeInfoTypeTerm)
+    val serializedScalarFunctionNameTerm =
+      ctx.addReusableObject(serializedScalarFunction, "serializedScalarFunction", "byte[]")
+    val pythonEnvNameTerm = ctx.addReusableObject(pythonEnv, "pythonEnv", pythonEnvTypeTerm)
+    val inputTypesCode = inputTypes
+      .map(ctx.addReusableObject(_, "inputType", typeInfoTypeTerm))
+      .mkString(", ")
+
+    val funcCode = j"""
+      |public class $funcName extends ${classOf[ScalarFunction].getCanonicalName}
+      |  implements ${classOf[PythonFunction].getCanonicalName} {
+      |
+      |  private static final long serialVersionUID = 1L;
+      |
+      |  ${ctx.reuseMemberCode()}
+      |
+      |  public $funcName(Object[] references) throws Exception {
+      |     ${ctx.reuseInitCode()}
+      |  }
+      |
+      |  public $resultTypeTerm eval($inputParamCode) {
+      |    return $defaultResultValue;
+      |  }
+      |
+      |  @Override
+      |  public $typeInfoTypeTerm[] getParameterTypes(Class<?>[] signature) {
+      |    return new $typeInfoTypeTerm[]{$inputTypesCode};
+      |  }
+      |
+      |  @Override
+      |  public $typeInfoTypeTerm getResultType(Class<?>[] signature) {
+      |    return $resultTypeNameTerm;
+      |  }
+      |
+      |  @Override
+      |  public ${classOf[FunctionLanguage].getCanonicalName} getLanguage() {
+      |    return ${classOf[FunctionLanguage].getCanonicalName}.PYTHON;
+      |  }
+      |
+      |  @Override
+      |  public byte[] getSerializedPythonFunction() {
+      |    return $serializedScalarFunctionNameTerm;
+      |  }
+      |
+      |  @Override
+      |  public $pythonEnvTypeTerm getPythonEnv() {
+      |    return $pythonEnvNameTerm;
+      |  }
+      |
+      |  @Override
+      |  public boolean isDeterministic() {
+      |    return $deterministic;
+      |  }
+      |
+      |  @Override
+      |  public String toString() {
+      |    return "$name";
+      |  }
+      |}
+      |""".stripMargin
+    new GeneratedFunction(funcName, funcCode, ctx.references.toArray)
+      .newInstance(Thread.currentThread().getContextClassLoader)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCalc.scala
new file mode 100644
index 0000000..4ca9d8f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCalc.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.common
+
+import org.apache.calcite.plan.RelOptCluster
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.functions.FunctionLanguage
+import org.apache.flink.table.functions.python.{PythonFunction, PythonFunctionInfo, SimplePythonFunction}
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.codegen.{CalcCodeGenerator, CodeGeneratorContext}
+import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCalc.PYTHON_SCALAR_FUNCTION_OPERATOR_NAME
+import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.types.logical.RowType
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+trait CommonPythonCalc {
+
+  private lazy val convertLiteralToPython = {
+    val clazz = Class.forName("org.apache.flink.api.common.python.PythonBridgeUtils")
+    clazz.getMethod("convertLiteralToPython", classOf[RexLiteral], classOf[SqlTypeName])
+  }
+
+  private def extractPythonScalarFunctionInfos(
+      rexCalls: Array[RexCall]): (Array[Int], Array[PythonFunctionInfo]) = {
+    // using LinkedHashMap to keep the insert order
+    val inputNodes = new mutable.LinkedHashMap[RexNode, Integer]()
+    val pythonFunctionInfos = rexCalls.map(createPythonScalarFunctionInfo(_, inputNodes))
+
+    val udfInputOffsets = inputNodes.toArray
+      .map(_._1)
+      .filter(_.isInstanceOf[RexInputRef])
+      .map(_.asInstanceOf[RexInputRef].getIndex)
+    (udfInputOffsets, pythonFunctionInfos)
+  }
+
+  private def createPythonScalarFunctionInfo(
+      rexCall: RexCall,
+      inputNodes: mutable.Map[RexNode, Integer]): PythonFunctionInfo = rexCall.getOperator match {
+    case sfc: ScalarSqlFunction if sfc.scalarFunction.getLanguage == FunctionLanguage.PYTHON =>
+      val inputs = new mutable.ArrayBuffer[AnyRef]()
+      rexCall.getOperands.foreach {
+        case pythonRexCall: RexCall if pythonRexCall.getOperator.asInstanceOf[ScalarSqlFunction]
+          .scalarFunction.getLanguage == FunctionLanguage.PYTHON =>
+          // Consecutive Python UDFs can be chained together
+          val argPythonInfo = createPythonScalarFunctionInfo(pythonRexCall, inputNodes)
+          inputs.append(argPythonInfo)
+
+        case literal: RexLiteral =>
+          inputs.append(
+            convertLiteralToPython.invoke(null, literal, literal.getType.getSqlTypeName))
+
+        case argNode: RexNode =>
+          // Arguments of type RexInputRef are replaced with an offset into the input row
+          inputNodes.get(argNode) match {
+            case Some(existing) => inputs.append(existing)
+            case None =>
+              val inputOffset = Integer.valueOf(inputNodes.size)
+              inputs.append(inputOffset)
+              inputNodes.put(argNode, inputOffset)
+          }
+      }
+
+      // Extracts the information necessary for Python function execution, such as
+      // the serialized Python function, the Python env, etc.
+      val pythonFunction = new SimplePythonFunction(
+        sfc.scalarFunction.asInstanceOf[PythonFunction].getSerializedPythonFunction,
+        sfc.scalarFunction.asInstanceOf[PythonFunction].getPythonEnv)
+      new PythonFunctionInfo(pythonFunction, inputs.toArray)
+  }
+
+  private def getPythonScalarFunctionOperator(
+      inputRowTypeInfo: BaseRowTypeInfo,
+      outputRowTypeInfo: BaseRowTypeInfo,
+      udfInputOffsets: Array[Int],
+      pythonFunctionInfos: Array[PythonFunctionInfo],
+      forwardedFields: Array[Int]) = {
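+    // The Python scalar function operator is instantiated reflectively from its fully
+    // qualified class name, using the constructor signature expected below.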
+    val clazz = Class.forName(PYTHON_SCALAR_FUNCTION_OPERATOR_NAME)
+    val ctor = clazz.getConstructor(
+      classOf[Array[PythonFunctionInfo]],
+      classOf[RowType],
+      classOf[RowType],
+      classOf[Array[Int]],
+      classOf[Array[Int]])
+    ctor.newInstance(
+      pythonFunctionInfos,
+      inputRowTypeInfo.toRowType,
+      outputRowTypeInfo.toRowType,
+      udfInputOffsets,
+      forwardedFields)
+      .asInstanceOf[OneInputStreamOperator[BaseRow, BaseRow]]
+  }
+
+  private def createPythonOneInputTransformation(
+      inputTransform: Transformation[BaseRow],
+      calcProgram: RexProgram,
+      name: String) = {
+    val pythonRexCalls = calcProgram.getProjectList
+      .map(calcProgram.expandLocalRef)
+      .filter(_.isInstanceOf[RexCall])
+      .map(_.asInstanceOf[RexCall])
+      .toArray
+
+    val forwardedFields: Array[Int] = calcProgram.getProjectList
+      .map(calcProgram.expandLocalRef)
+      .filter(_.isInstanceOf[RexInputRef])
+      .map(_.asInstanceOf[RexInputRef].getIndex)
+      .toArray
+
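+    // Rebuild the project list: each Python RexCall is replaced by an input ref pointing at
+    // its result field, which the Python operator appends after the forwarded input fields.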
+    val resultProjectList = {
+      var idx = 0
+      calcProgram.getProjectList
+        .map(calcProgram.expandLocalRef)
+        .map {
+          case pythonCall: RexCall =>
+            val inputRef = new RexInputRef(forwardedFields.length + idx, pythonCall.getType)
+            idx += 1
+            inputRef
+          case node => node
+        }
+    }
+
+    val (pythonUdfInputOffsets, pythonFunctionInfos) =
+      extractPythonScalarFunctionInfos(pythonRexCalls)
+
+    val inputLogicalTypes =
+      inputTransform.getOutputType.asInstanceOf[BaseRowTypeInfo].getLogicalTypes
+    val pythonOperatorInputTypeInfo = inputTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
+    val pythonOperatorResultTypeInfo = new BaseRowTypeInfo(
+      forwardedFields.map(inputLogicalTypes(_)) ++
+        pythonRexCalls.map(node => FlinkTypeFactory.toLogicalType(node.getType)): _*)
+
+    val pythonOperator = getPythonScalarFunctionOperator(
+      pythonOperatorInputTypeInfo,
+      pythonOperatorResultTypeInfo,
+      pythonUdfInputOffsets,
+      pythonFunctionInfos,
+      forwardedFields)
+
+    val pythonInputTransform = new OneInputTransformation(
+      inputTransform,
+      name,
+      pythonOperator,
+      pythonOperatorResultTypeInfo,
+      inputTransform.getParallelism
+    )
+    (pythonInputTransform, pythonOperatorResultTypeInfo, resultProjectList)
+  }
+
+  private def createProjectionRexProgram(
+      inputRowType: RowType,
+      outputRelData: RelDataType,
+      projectList: mutable.Buffer[RexNode],
+      cluster: RelOptCluster) = {
+    val factory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    val inputRelData = factory.createFieldTypeFromLogicalType(inputRowType)
+    RexProgram.create(inputRelData, projectList, null, outputRelData, cluster.getRexBuilder)
+  }
+
+  protected def createOneInputTransformation(
+      inputTransform: Transformation[BaseRow],
+      inputsContainSingleton: Boolean,
+      calcProgram: RexProgram,
+      name: String,
+      config : TableConfig,
+      ctx : CodeGeneratorContext,
+      cluster: RelOptCluster,
+      rowType: RelDataType,
+      opName: String): OneInputTransformation[BaseRow, BaseRow] = {
+    val (pythonInputTransform, pythonOperatorResultTypeInfo, resultProjectList) =
+      createPythonOneInputTransformation(inputTransform, calcProgram, name)
+
+    if (inputsContainSingleton) {
+      pythonInputTransform.setParallelism(1)
+      pythonInputTransform.setMaxParallelism(1)
+    }
+
+    val onlyFilter = resultProjectList.zipWithIndex.forall { case (rexNode, index) =>
+      rexNode.isInstanceOf[RexInputRef] && rexNode.asInstanceOf[RexInputRef].getIndex == index
+    }
+
+    if (onlyFilter) {
+      pythonInputTransform
+    } else {
+      // After the Python OneInputTransformation has been executed, the output fields are
+      // ordered as the forwarded input fields followed by the Python call results. If the
+      // projection expects a different order, an additional calc is needed to restore it.
+      val outputType = FlinkTypeFactory.toLogicalRowType(rowType)
+      val rexProgram = createProjectionRexProgram(
+        pythonOperatorResultTypeInfo.toRowType, rowType, resultProjectList, cluster)
+      val substituteOperator = CalcCodeGenerator.generateCalcOperator(
+        ctx,
+        cluster,
+        pythonInputTransform,
+        outputType,
+        config,
+        rexProgram,
+        None,
+        retainHeader = true,
+        opName
+      )
+
+      new OneInputTransformation(
+        pythonInputTransform,
+        name,
+        substituteOperator,
+        BaseRowTypeInfo.of(outputType),
+        pythonInputTransform.getParallelism)
+    }
+  }
+}
+
+object CommonPythonCalc {
+  val PYTHON_SCALAR_FUNCTION_OPERATOR_NAME =
+    "org.apache.flink.table.runtime.operators.python.BaseRowPythonScalarFunctionOperator"
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala
index aefcd2f..7684955 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala
@@ -19,28 +19,18 @@
 package org.apache.flink.table.planner.plan.nodes.physical.batch
 
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.runtime.operators.DamBehavior
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.codegen.{CalcCodeGenerator, CodeGeneratorContext}
 import org.apache.flink.table.planner.delegation.BatchPlanner
-import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef, TraitUtil}
-import org.apache.flink.table.planner.plan.nodes.common.CommonCalc
-import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
 
 import org.apache.calcite.plan._
 import org.apache.calcite.rel._
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.Calc
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexProgram}
-import org.apache.calcite.sql.SqlKind
-import org.apache.calcite.util.mapping.{Mapping, MappingType, Mappings}
-
-import java.util
-
-import scala.collection.JavaConversions._
+import org.apache.calcite.rex.RexProgram
 
 /**
   * Batch physical RelNode for [[Calc]].
@@ -51,87 +41,12 @@
     inputRel: RelNode,
     calcProgram: RexProgram,
     outputRowType: RelDataType)
-  extends CommonCalc(cluster, traitSet, inputRel, calcProgram)
-  with BatchPhysicalRel
-  with BatchExecNode[BaseRow] {
-
-  override def deriveRowType(): RelDataType = outputRowType
+  extends BatchExecCalcBase(cluster, traitSet, inputRel, calcProgram, outputRowType) {
 
   override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
     new BatchExecCalc(cluster, traitSet, child, program, outputRowType)
   }
 
-  override def satisfyTraits(requiredTraitSet: RelTraitSet): Option[RelNode] = {
-    val requiredDistribution = requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
-    // Does not push broadcast distribution trait down into Calc.
-    if (requiredDistribution.getType == RelDistribution.Type.BROADCAST_DISTRIBUTED) {
-      return None
-    }
-    val projects = calcProgram.getProjectList.map(calcProgram.expandLocalRef)
-
-    def getProjectMapping: Mapping = {
-      val mapping = Mappings.create(MappingType.INVERSE_FUNCTION,
-        getInput.getRowType.getFieldCount, projects.size)
-      projects.zipWithIndex.foreach {
-        case (project, index) =>
-          project match {
-            case inputRef: RexInputRef => mapping.set(inputRef.getIndex, index)
-            case call: RexCall if call.getKind == SqlKind.AS =>
-              call.getOperands.head match {
-                case inputRef: RexInputRef => mapping.set(inputRef.getIndex, index)
-                case _ => // ignore
-              }
-            case _ => // ignore
-          }
-      }
-      mapping.inverse()
-    }
-
-    val mapping = getProjectMapping
-    val appliedDistribution = requiredDistribution.apply(mapping)
-    // If both distribution and collation can be satisfied, satisfy both. If only distribution
-    // can be satisfied, only satisfy distribution. There is no possibility to only satisfy
-    // collation here except for there is no distribution requirement.
-    if ((!requiredDistribution.isTop) && (appliedDistribution eq FlinkRelDistribution.ANY)) {
-      return None
-    }
-
-    val requiredCollation = requiredTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
-    val appliedCollation = TraitUtil.apply(requiredCollation, mapping)
-    val canCollationPushedDown = !appliedCollation.getFieldCollations.isEmpty
-    // If required traits only contains collation requirements, but collation keys are not columns
-    // from input, then no need to satisfy required traits.
-    if ((appliedDistribution eq FlinkRelDistribution.ANY) && !canCollationPushedDown) {
-      return None
-    }
-
-    var inputRequiredTraits = getInput.getTraitSet
-    var providedTraits = getTraitSet
-    if (!appliedDistribution.isTop) {
-      inputRequiredTraits = inputRequiredTraits.replace(appliedDistribution)
-      providedTraits = providedTraits.replace(requiredDistribution)
-    }
-    if (canCollationPushedDown) {
-      inputRequiredTraits = inputRequiredTraits.replace(appliedCollation)
-      providedTraits = providedTraits.replace(requiredCollation)
-    }
-    val newInput = RelOptRule.convert(getInput, inputRequiredTraits)
-    Some(copy(providedTraits, Seq(newInput)))
-  }
-
-  //~ ExecNode methods -----------------------------------------------------------
-
-  override def getDamBehavior = DamBehavior.PIPELINED
-
-  override def getInputNodes: util.List[ExecNode[BatchPlanner, _]] =
-    List(getInput.asInstanceOf[ExecNode[BatchPlanner, _]])
-
-  override def replaceInputNode(
-      ordinalInParent: Int,
-      newInputNode: ExecNode[BatchPlanner, _]): Unit = {
-    replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
-  }
-
   override protected def translateToPlanInternal(
       planner: BatchPlanner): Transformation[BaseRow] = {
     val config = planner.getTableConfig
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalcBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalcBase.scala
new file mode 100644
index 0000000..47f3fa7
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalcBase.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.batch
+
+import java.util
+
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.planner.plan.nodes.common.CommonCalc
+import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
+import org.apache.calcite.plan._
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexProgram}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.util.mapping.{Mapping, MappingType, Mappings}
+import org.apache.flink.runtime.operators.DamBehavior
+import org.apache.flink.table.planner.delegation.BatchPlanner
+import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef, TraitUtil}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Base batch physical RelNode for [[Calc]].
+  */
+abstract class BatchExecCalcBase(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    calcProgram: RexProgram,
+    outputRowType: RelDataType)
+  extends CommonCalc(cluster, traitSet, inputRel, calcProgram)
+  with BatchPhysicalRel
+  with BatchExecNode[BaseRow] {
+
+  override def deriveRowType(): RelDataType = outputRowType
+
+  override def satisfyTraits(requiredTraitSet: RelTraitSet): Option[RelNode] = {
+    val requiredDistribution = requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
+    // Does not push broadcast distribution trait down into Calc.
+    if (requiredDistribution.getType == RelDistribution.Type.BROADCAST_DISTRIBUTED) {
+      return None
+    }
+    val projects = calcProgram.getProjectList.map(calcProgram.expandLocalRef)
+
+    def getProjectMapping: Mapping = {
+      val mapping = Mappings.create(MappingType.INVERSE_FUNCTION,
+        getInput.getRowType.getFieldCount, projects.size)
+      projects.zipWithIndex.foreach {
+        case (project, index) =>
+          project match {
+            case inputRef: RexInputRef => mapping.set(inputRef.getIndex, index)
+            case call: RexCall if call.getKind == SqlKind.AS =>
+              call.getOperands.head match {
+                case inputRef: RexInputRef => mapping.set(inputRef.getIndex, index)
+                case _ => // ignore
+              }
+            case _ => // ignore
+          }
+      }
+      mapping.inverse()
+    }
+
+    val mapping = getProjectMapping
+    val appliedDistribution = requiredDistribution.apply(mapping)
+    // If both distribution and collation can be satisfied, satisfy both. If only distribution
+    // can be satisfied, only satisfy distribution. There is no possibility to only satisfy
+    // collation here except for there is no distribution requirement.
+    if ((!requiredDistribution.isTop) && (appliedDistribution eq FlinkRelDistribution.ANY)) {
+      return None
+    }
+
+    val requiredCollation = requiredTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
+    val appliedCollation = TraitUtil.apply(requiredCollation, mapping)
+    val canCollationPushedDown = !appliedCollation.getFieldCollations.isEmpty
+    // If required traits only contains collation requirements, but collation keys are not columns
+    // from input, then no need to satisfy required traits.
+    if ((appliedDistribution eq FlinkRelDistribution.ANY) && !canCollationPushedDown) {
+      return None
+    }
+
+    var inputRequiredTraits = getInput.getTraitSet
+    var providedTraits = getTraitSet
+    if (!appliedDistribution.isTop) {
+      inputRequiredTraits = inputRequiredTraits.replace(appliedDistribution)
+      providedTraits = providedTraits.replace(requiredDistribution)
+    }
+    if (canCollationPushedDown) {
+      inputRequiredTraits = inputRequiredTraits.replace(appliedCollation)
+      providedTraits = providedTraits.replace(requiredCollation)
+    }
+    val newInput = RelOptRule.convert(getInput, inputRequiredTraits)
+    Some(copy(providedTraits, Seq(newInput)))
+  }
+
+  //~ ExecNode methods -----------------------------------------------------------
+
+  override def getDamBehavior = DamBehavior.PIPELINED
+
+  override def getInputNodes: util.List[ExecNode[BatchPlanner, _]] =
+    List(getInput.asInstanceOf[ExecNode[BatchPlanner, _]])
+
+  override def replaceInputNode(
+      ordinalInParent: Int,
+      newInputNode: ExecNode[BatchPlanner, _]): Unit = {
+    replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCalc.scala
new file mode 100644
index 0000000..7adf0ee
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCalc.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.batch
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext
+import org.apache.flink.table.planner.delegation.BatchPlanner
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCalc
+
+/**
+  * Batch physical RelNode for Python ScalarFunctions.
+  */
+class BatchExecPythonCalc(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    calcProgram: RexProgram,
+    outputRowType: RelDataType)
+  extends BatchExecCalcBase(
+    cluster,
+    traitSet,
+    inputRel,
+    calcProgram,
+    outputRowType)
+  with CommonPythonCalc {
+
+  override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
+    new BatchExecPythonCalc(cluster, traitSet, child, program, outputRowType)
+  }
+
+  override protected def translateToPlanInternal(planner: BatchPlanner): Transformation[BaseRow] = {
+    val inputTransform = getInputNodes.get(0).translateToPlan(planner)
+      .asInstanceOf[Transformation[BaseRow]]
+    val config = planner.getTableConfig
+    val ctx = CodeGeneratorContext(config)
+    createOneInputTransformation(
+      inputTransform,
+      inputsContainSingleton = false,
+      calcProgram,
+      getRelDetailedDescription,
+      config,
+      ctx,
+      cluster,
+      getRowType,
+    "BatchExecCalc")
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala
index dc98107..11e534d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala
@@ -24,9 +24,6 @@
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.codegen.{CalcCodeGenerator, CodeGeneratorContext}
 import org.apache.flink.table.planner.delegation.StreamPlanner
-import org.apache.flink.table.planner.plan.nodes.common.CommonCalc
-import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
-import org.apache.flink.table.planner.plan.utils.RelExplainUtil
 import org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
 
@@ -36,10 +33,6 @@
 import org.apache.calcite.rel.core.Calc
 import org.apache.calcite.rex.RexProgram
 
-import java.util
-
-import scala.collection.JavaConversions._
-
 /**
   * Stream physical RelNode for [[Calc]].
   */
@@ -49,37 +42,12 @@
     inputRel: RelNode,
     calcProgram: RexProgram,
     outputRowType: RelDataType)
-  extends CommonCalc(cluster, traitSet, inputRel, calcProgram)
-  with StreamPhysicalRel
-  with StreamExecNode[BaseRow] {
-
-  override def producesUpdates: Boolean = false
-
-  override def needsUpdatesAsRetraction(input: RelNode): Boolean = false
-
-  override def consumesRetractions: Boolean = false
-
-  override def producesRetractions: Boolean = false
-
-  override def requireWatermark: Boolean = false
-
-  override def deriveRowType(): RelDataType = outputRowType
+  extends StreamExecCalcBase(cluster, traitSet, inputRel, calcProgram, outputRowType) {
 
   override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
     new StreamExecCalc(cluster, traitSet, child, program, outputRowType)
   }
 
-  //~ ExecNode methods -----------------------------------------------------------
-
-  override def getInputNodes: util.List[ExecNode[StreamPlanner, _]] =
-    List(getInput.asInstanceOf[ExecNode[StreamPlanner, _]])
-
-  override def replaceInputNode(
-      ordinalInParent: Int,
-      newInputNode: ExecNode[StreamPlanner, _]): Unit = {
-    replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
-  }
-
   override protected def translateToPlanInternal(
       planner: StreamPlanner): Transformation[BaseRow] = {
     val config = planner.getTableConfig
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalcBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalcBase.scala
new file mode 100644
index 0000000..662d32d
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalcBase.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.planner.delegation.StreamPlanner
+import org.apache.flink.table.planner.plan.nodes.common.CommonCalc
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Base stream physical RelNode for [[Calc]].
+  */
+abstract class StreamExecCalcBase(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    calcProgram: RexProgram,
+    outputRowType: RelDataType)
+  extends CommonCalc(cluster, traitSet, inputRel, calcProgram)
+  with StreamPhysicalRel
+  with StreamExecNode[BaseRow] {
+
+  override def producesUpdates: Boolean = false
+
+  override def needsUpdatesAsRetraction(input: RelNode): Boolean = false
+
+  override def consumesRetractions: Boolean = false
+
+  override def producesRetractions: Boolean = false
+
+  override def requireWatermark: Boolean = false
+
+  override def deriveRowType(): RelDataType = outputRowType
+
+  //~ ExecNode methods -----------------------------------------------------------
+
+  override def getInputNodes: util.List[ExecNode[StreamPlanner, _]] =
+    List(getInput.asInstanceOf[ExecNode[StreamPlanner, _]])
+
+  override def replaceInputNode(
+      ordinalInParent: Int,
+      newInputNode: ExecNode[StreamPlanner, _]): Unit = {
+    replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
+  }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCalc.scala
new file mode 100644
index 0000000..0bebc5e
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCalc.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Calc
+import org.apache.calcite.rex.RexProgram
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext
+import org.apache.flink.table.planner.delegation.StreamPlanner
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCalc
+import org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
+
+/**
+  * Stream physical RelNode for Python ScalarFunctions.
+  */
+class StreamExecPythonCalc(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    calcProgram: RexProgram,
+    outputRowType: RelDataType)
+  extends StreamExecCalcBase(
+    cluster,
+    traitSet,
+    inputRel,
+    calcProgram,
+    outputRowType)
+  with CommonPythonCalc {
+
+  override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
+    new StreamExecPythonCalc(cluster, traitSet, child, program, outputRowType)
+  }
+
+  override protected def translateToPlanInternal(
+      planner: StreamPlanner): Transformation[BaseRow] = {
+    val inputTransform = getInputNodes.get(0).translateToPlan(planner)
+      .asInstanceOf[Transformation[BaseRow]]
+    val config = planner.getTableConfig
+    val ctx = CodeGeneratorContext(config).setOperatorBaseClass(
+      classOf[AbstractProcessStreamOperator[BaseRow]])
+    createOneInputTransformation(
+      inputTransform,
+      inputsContainSingleton(),
+      calcProgram,
+      getRelDetailedDescription,
+      config,
+      ctx,
+      cluster,
+      getRowType,
+      "StreamExecCalc")
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index 10aaef4..9dd7ef4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -352,7 +352,9 @@
     // transpose calc past snapshot
     CalcSnapshotTransposeRule.INSTANCE,
     // merge calc after calc transpose
-    FlinkCalcMergeRule.INSTANCE
+    FlinkCalcMergeRule.INSTANCE,
+    // Rule that splits Python ScalarFunctions from Java/Scala ScalarFunctions.
+    PythonScalarFunctionSplitRule.INSTANCE
   )
 
   /**
@@ -367,6 +369,7 @@
     BatchExecValuesRule.INSTANCE,
     // calc
     BatchExecCalcRule.INSTANCE,
+    BatchExecPythonCalcRule.INSTANCE,
     // union
     BatchExecUnionRule.INSTANCE,
     // sort
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 6268003..e443183 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -337,7 +337,9 @@
     // transpose calc past snapshot
     CalcSnapshotTransposeRule.INSTANCE,
     // merge calc after calc transpose
-    FlinkCalcMergeRule.INSTANCE
+    FlinkCalcMergeRule.INSTANCE,
+    // Rule that splits Python ScalarFunctions from Java/Scala ScalarFunctions.
+    PythonScalarFunctionSplitRule.INSTANCE
   )
 
   /**
@@ -353,6 +355,7 @@
     StreamExecValuesRule.INSTANCE,
     // calc
     StreamExecCalcRule.INSTANCE,
+    StreamExecPythonCalcRule.INSTANCE,
     // union
     StreamExecUnionRule.INSTANCE,
     // sort
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PythonScalarFunctionSplitRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PythonScalarFunctionSplitRule.scala
new file mode 100644
index 0000000..fa219ca
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PythonScalarFunctionSplitRule.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexProgram}
+import org.apache.calcite.sql.validate.SqlValidatorUtil
+import org.apache.flink.table.functions.{FunctionLanguage, ScalarFunction}
+import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc
+import org.apache.flink.table.planner.plan.utils.PythonUtil.containsFunctionOf
+import org.apache.flink.table.planner.plan.utils.{InputRefVisitor, RexDefaultVisitor}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+  * Rule that splits [[FlinkLogicalCalc]] into multiple [[FlinkLogicalCalc]]s. After this rule
+  * is applied, each [[FlinkLogicalCalc]] will only contain Python [[ScalarFunction]]s or Java
+  * [[ScalarFunction]]s. This ensures that the Python [[ScalarFunction]]s that can be executed
+  * together in a single batch are grouped into the same [[FlinkLogicalCalc]] node.
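+  *
+  * For example, given a Python scalar function `pyFunc`, a calc of the shape
+  * {{{
+  *   Calc(select = [pyFunc(a, b), c + 1])
+  * }}}
+  * is split into
+  * {{{
+  *   Calc(select = [f0, c + 1])
+  *   +- Calc(select = [c, pyFunc(a, b) AS f0])
+  * }}}
+  * so that the Python call is isolated in its own calc node.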
+  */
+class PythonScalarFunctionSplitRule extends RelOptRule(
+  operand(classOf[FlinkLogicalCalc], any),
+  "PythonScalarFunctionSplitRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val program = calc.getProgram
+
+    // This rule matches if one of the following cases is met:
+    // 1. There are Python functions and Java functions mixed in the Calc
+    // 2. There are Python functions in the condition of the Calc
+    (program.getExprList.exists(containsFunctionOf(_, FunctionLanguage.PYTHON)) &&
+      program.getExprList.exists(containsFunctionOf(_, FunctionLanguage.JVM))) ||
+      Option(program.getCondition)
+        .map(program.expandLocalRef)
+        .exists(containsFunctionOf(_, FunctionLanguage.PYTHON))
+  }
+
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val input = calc.getInput
+    val rexBuilder = call.builder().getRexBuilder
+    val program = calc.getProgram
+    val extractedRexCalls = new mutable.ArrayBuffer[RexCall]()
+
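+    // Decides which kind of function is extracted into the bottom calc: the Python functions
+    // are extracted when the projections contain top-level Java function calls, or when the
+    // condition contains a top-level Java function call or any Python function; otherwise the
+    // Java functions are extracted.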
+    val convertPythonFunction =
+      program.getProjectList
+        .map(program.expandLocalRef)
+        .exists(containsFunctionOf(_, FunctionLanguage.JVM, recursive = false)) ||
+        Option(program.getCondition)
+          .map(program.expandLocalRef)
+          .exists(expr =>
+            containsFunctionOf(expr, FunctionLanguage.JVM, recursive = false) ||
+              containsFunctionOf(expr, FunctionLanguage.PYTHON))
+
+    val extractedFunctionOffset = input.getRowType.getFieldCount
+    val splitter = new ScalarFunctionSplitter(
+      extractedFunctionOffset,
+      extractedRexCalls,
+      convertPythonFunction)
+
+    val newProjects = program.getProjectList.map(program.expandLocalRef(_).accept(splitter))
+    val newCondition = Option(program.getCondition).map(program.expandLocalRef(_).accept(splitter))
+    val accessedFields = extractRefInputFields(newProjects, newCondition, extractedFunctionOffset)
+
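+    // The bottom calc forwards the accessed input fields and evaluates the extracted function
+    // calls; the top calc evaluates the remaining expressions on top of the bottom calc output.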
+    val bottomCalcProjects =
+      accessedFields.map(RexInputRef.of(_, input.getRowType)) ++ extractedRexCalls
+    val bottomCalcFieldNames = SqlValidatorUtil.uniquify(
+      accessedFields.map(i => input.getRowType.getFieldNames.get(i)).toSeq ++
+        extractedRexCalls.indices.map("f" + _),
+      rexBuilder.getTypeFactory.getTypeSystem.isSchemaCaseSensitive)
+
+    val bottomCalc = new FlinkLogicalCalc(
+      calc.getCluster,
+      calc.getTraitSet,
+      input,
+      RexProgram.create(
+        input.getRowType,
+        bottomCalcProjects.toList,
+        null,
+        bottomCalcFieldNames,
+        rexBuilder))
+
+    val inputRewriter = new ExtractedFunctionInputRewriter(extractedFunctionOffset, accessedFields)
+    val topCalc = new FlinkLogicalCalc(
+      calc.getCluster,
+      calc.getTraitSet,
+      bottomCalc,
+      RexProgram.create(
+        bottomCalc.getRowType,
+        newProjects.map(_.accept(inputRewriter)),
+        newCondition.map(_.accept(inputRewriter)).orNull,
+        calc.getRowType,
+        rexBuilder))
+
+    call.transformTo(topCalc)
+  }
+
+  /**
+    * Extracts the indices of the input fields referred to by the specified projects and
+    * condition.
+    */
+  private def extractRefInputFields(
+      projects: Seq[RexNode],
+      condition: Option[RexNode],
+      inputFieldsCount: Int): Array[Int] = {
+    val visitor = new InputRefVisitor
+
+    // extract referenced input fields from projections
+    projects.foreach(exp => exp.accept(visitor))
+
+    // extract referenced input fields from condition
+    condition.foreach(_.accept(visitor))
+
+    // fields with indices greater than or equal to inputFieldsCount refer to the extracted
+    // functions and should be filtered out as they are not from the original input
+    visitor.getFields.filter(_ < inputFieldsCount)
+  }
+}
+
+private class ScalarFunctionSplitter(
+    extractedFunctionOffset: Int,
+    extractedRexCalls: mutable.ArrayBuffer[RexCall],
+    convertPythonFunction: Boolean)
+  extends RexDefaultVisitor[RexNode] {
+
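+  // Extracts either the Python function calls (when convertPythonFunction is true) or all
+  // other calls (when it is false), replacing each extracted call with an input ref that
+  // points past the original input fields and collecting it in extractedRexCalls.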
+  override def visitCall(call: RexCall): RexNode = {
+    call.getOperator match {
+      case sfc: ScalarSqlFunction if sfc.scalarFunction.getLanguage ==
+        FunctionLanguage.PYTHON =>
+        visit(convertPythonFunction, call)
+
+      case _ =>
+        visit(!convertPythonFunction, call)
+    }
+  }
+
+  override def visitNode(rexNode: RexNode): RexNode = rexNode
+
+  private def visit(needConvert: Boolean, call: RexCall): RexNode = {
+    if (needConvert) {
+      val newNode = new RexInputRef(
+        extractedFunctionOffset + extractedRexCalls.length, call.getType)
+      extractedRexCalls.append(call)
+      newNode
+    } else {
+      call.clone(call.getType, call.getOperands.asScala.map(_.accept(this)))
+    }
+  }
+}
+
+/**
+  * Rewrites the field accesses of a RexNode since not all fields of the original input are
+  * forwarded:
+  * 1) Fields with an index greater than or equal to extractedFunctionOffset refer to an
+  * extracted function.
+  * 2) Fields with an index less than extractedFunctionOffset refer to an original input field.
+  *
+  * @param extractedFunctionOffset the original start offset of the extracted functions
+  * @param accessedFields          the accessed fields which will be forwarded
+  */
+private class ExtractedFunctionInputRewriter(
+    extractedFunctionOffset: Int,
+    accessedFields: Array[Int])
+  extends RexDefaultVisitor[RexNode] {
+
+  /** Mapping from the old input field ref index to the new input field ref index. */
+  private val fieldMap: Map[Int, Int] = accessedFields.zipWithIndex.toMap
+
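+  // Refs to extracted functions are shifted to follow the forwarded fields; refs to original
+  // input fields are remapped through fieldMap to their new position among the forwarded fields.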
+  override def visitInputRef(inputRef: RexInputRef): RexNode = {
+    if (inputRef.getIndex >= extractedFunctionOffset) {
+      new RexInputRef(
+        inputRef.getIndex - extractedFunctionOffset + accessedFields.length,
+        inputRef.getType)
+    } else {
+      new RexInputRef(
+        fieldMap.getOrElse(inputRef.getIndex,
+          throw new IllegalArgumentException("input field contains invalid index")),
+        inputRef.getType)
+    }
+  }
+
+  override def visitCall(call: RexCall): RexNode = {
+    call.clone(call.getType, call.getOperands.asScala.map(_.accept(this)))
+  }
+
+  override def visitNode(rexNode: RexNode): RexNode = rexNode
+}
+
+object PythonScalarFunctionSplitRule {
+  val INSTANCE: RelOptRule = new PythonScalarFunctionSplitRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCalcRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCalcRule.scala
index 7ff972b..c6c2e93 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCalcRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCalcRule.scala
@@ -21,10 +21,13 @@
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc
 import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecCalc
-
-import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.functions.FunctionLanguage
+import org.apache.flink.table.planner.plan.utils.PythonUtil.containsFunctionOf
+
+import scala.collection.JavaConverters._
 
 /**
   * Rule that converts [[FlinkLogicalCalc]] to [[BatchExecCalc]].
@@ -36,6 +39,12 @@
     FlinkConventions.BATCH_PHYSICAL,
     "BatchExecCalcRule") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val program = calc.getProgram
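+    // calcs that contain Python functions are converted by BatchExecPythonCalcRule instead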
+    !program.getExprList.asScala.exists(containsFunctionOf(_, FunctionLanguage.PYTHON))
+  }
+
   def convert(rel: RelNode): RelNode = {
     val calc = rel.asInstanceOf[FlinkLogicalCalc]
     val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCalcRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCalcRule.scala
new file mode 100644
index 0000000..1a16626
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCalcRule.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.functions.FunctionLanguage
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonCalc
+import org.apache.flink.table.planner.plan.utils.PythonUtil.containsFunctionOf
+
+import scala.collection.JavaConverters._
+
+/**
+  * Rule that converts [[FlinkLogicalCalc]] to [[BatchExecPythonCalc]].
+  */
+class BatchExecPythonCalcRule
+  extends ConverterRule(
+    classOf[FlinkLogicalCalc],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.BATCH_PHYSICAL,
+    "BatchExecPythonCalcRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val program = calc.getProgram
+    program.getExprList.asScala.exists(containsFunctionOf(_, FunctionLanguage.PYTHON))
+  }
+
+  def convert(rel: RelNode): RelNode = {
+    val calc = rel.asInstanceOf[FlinkLogicalCalc]
+    val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
+    val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.BATCH_PHYSICAL)
+
+    new BatchExecPythonCalc(
+      rel.getCluster,
+      newTrait,
+      newInput,
+      calc.getProgram,
+      rel.getRowType)
+  }
+}
+
+object BatchExecPythonCalcRule {
+  val INSTANCE: RelOptRule = new BatchExecPythonCalcRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecCalcRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecCalcRule.scala
index fafc799..1626e2c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecCalcRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecCalcRule.scala
@@ -21,10 +21,13 @@
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc
-
-import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import org.apache.flink.table.planner.plan.utils.PythonUtil.containsFunctionOf
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.functions.FunctionLanguage
+
+import scala.collection.JavaConverters._
 
 /**
   * Rule that converts [[FlinkLogicalCalc]] to [[StreamExecCalc]].
@@ -36,6 +39,12 @@
     FlinkConventions.STREAM_PHYSICAL,
     "StreamExecCalcRule") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val program = calc.getProgram
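+    // calcs that contain Python functions are converted by StreamExecPythonCalcRule instead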
+    !program.getExprList.asScala.exists(containsFunctionOf(_, FunctionLanguage.PYTHON))
+  }
+
   def convert(rel: RelNode): RelNode = {
     val calc: FlinkLogicalCalc = rel.asInstanceOf[FlinkLogicalCalc]
     val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonCalcRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonCalcRule.scala
new file mode 100644
index 0000000..858f4d2
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonCalcRule.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.functions.FunctionLanguage
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc
+import org.apache.flink.table.planner.plan.utils.PythonUtil.containsFunctionOf
+
+import scala.collection.JavaConverters._
+
+/**
+  * Rule that converts [[FlinkLogicalCalc]] to [[StreamExecPythonCalc]].
+  */
+class StreamExecPythonCalcRule
+  extends ConverterRule(
+    classOf[FlinkLogicalCalc],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.STREAM_PHYSICAL,
+    "StreamExecPythonCalcRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
+    val program = calc.getProgram
+    program.getExprList.asScala.exists(containsFunctionOf(_, FunctionLanguage.PYTHON))
+  }
+
+  def convert(rel: RelNode): RelNode = {
+    val calc: FlinkLogicalCalc = rel.asInstanceOf[FlinkLogicalCalc]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
+    val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.STREAM_PHYSICAL)
+
+    new StreamExecPythonCalc(
+      rel.getCluster,
+      traitSet,
+      newInput,
+      calc.getProgram,
+      rel.getRowType)
+  }
+}
+
+object StreamExecPythonCalcRule {
+  val INSTANCE: RelOptRule = new StreamExecPythonCalcRule
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala
new file mode 100644
index 0000000..3601854
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/utils/python/PythonTableUtils.scala
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils.python
+
+import java.nio.charset.StandardCharsets
+import java.sql.{Date, Time, Timestamp}
+import java.time.{LocalDate, LocalDateTime, LocalTime}
+import java.util.TimeZone
+import java.util.function.BiConsumer
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.io.InputFormat
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.java.io.CollectionInputFormat
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
+import org.apache.flink.core.io.InputSplit
+import org.apache.flink.table.api.{TableConfig, TableSchema, Types}
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.functions.python.PythonEnv
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, PythonFunctionCodeGenerator}
+import org.apache.flink.table.sources.InputFormatTableSource
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+object PythonTableUtils {
+
+  /**
+    * Creates a [[ScalarFunction]] for the specified Python ScalarFunction.
+    *
+    * @param config the table config used for code generation
+    * @param funcName class name of the user-defined function. Must be a valid Java class identifier
+    * @param serializedScalarFunction serialized Python scalar function
+    * @param inputTypes input data types
+    * @param resultType expected result type
+    * @param deterministic the determinism of the function's results
+    * @param pythonEnv the Python execution environment
+    * @return A generated Java ScalarFunction representation for the specified Python ScalarFunction
+    */
+  def createPythonScalarFunction(
+      config: TableConfig,
+      funcName: String,
+      serializedScalarFunction: Array[Byte],
+      inputTypes: Array[TypeInformation[_]],
+      resultType: TypeInformation[_],
+      deterministic: Boolean,
+      pythonEnv: PythonEnv): ScalarFunction =
+    PythonFunctionCodeGenerator.generateScalarFunction(
+      CodeGeneratorContext(config),
+      funcName,
+      serializedScalarFunction,
+      inputTypes,
+      resultType,
+      deterministic,
+      pythonEnv)
+
+  /**
+    * Wraps the unpickled Python data in an InputFormat. It will later be passed to
+    * PythonInputFormatTableSource.
+    *
+    * @param data The unpickled Python data.
+    * @param dataType The type information of the Python data.
+    * @param config The execution config used to create the serializer.
+    * @return An InputFormat containing the Python data.
+    */
+  def getInputFormat(
+      data: java.util.List[Array[Object]],
+      dataType: TypeInformation[Row],
+      config: ExecutionConfig): InputFormat[Row, _] = {
+    val converter = convertTo(dataType)
+    new CollectionInputFormat(data.map(converter(_).asInstanceOf[Row]),
+      dataType.createSerializer(config))
+  }
+
+  /**
+    * Creates a converter that converts `obj` to the type specified by the data type. The
+    * converter returns null if the type of `obj` is unexpected, since Python does not enforce
+    * the type.
+    */
+  private def convertTo(dataType: TypeInformation[_]): Any => Any = dataType match {
+    case _ if dataType == Types.BOOLEAN => (obj: Any) => nullSafeConvert(obj) {
+      case b: Boolean => b
+    }
+
+    case _ if dataType == Types.BYTE => (obj: Any) => nullSafeConvert(obj) {
+      case c: Byte => c
+      case c: Short => c.toByte
+      case c: Int => c.toByte
+      case c: Long => c.toByte
+    }
+
+    case _ if dataType == Types.SHORT => (obj: Any) => nullSafeConvert(obj) {
+      case c: Byte => c.toShort
+      case c: Short => c
+      case c: Int => c.toShort
+      case c: Long => c.toShort
+    }
+
+    case _ if dataType == Types.INT => (obj: Any) => nullSafeConvert(obj) {
+      case c: Byte => c.toInt
+      case c: Short => c.toInt
+      case c: Int => c
+      case c: Long => c.toInt
+    }
+
+    case _ if dataType == Types.LONG => (obj: Any) => nullSafeConvert(obj) {
+      case c: Byte => c.toLong
+      case c: Short => c.toLong
+      case c: Int => c.toLong
+      case c: Long => c
+    }
+
+    case _ if dataType == Types.FLOAT => (obj: Any) => nullSafeConvert(obj) {
+      case c: Float => c
+      case c: Double => c.toFloat
+    }
+
+    case _ if dataType == Types.DOUBLE => (obj: Any) => nullSafeConvert(obj) {
+      case c: Float => c.toDouble
+      case c: Double => c
+    }
+
+    case _ if dataType == Types.DECIMAL => (obj: Any) => nullSafeConvert(obj) {
+      case c: java.math.BigDecimal => c
+    }
+
+    case _ if dataType == Types.SQL_DATE => (obj: Any) => nullSafeConvert(obj) {
+      case c: Int =>
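+        // `c` is the number of days since the epoch; convert it to UTC milliseconds so that
+        // java.sql.Date renders the same local date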
+        val millisLocal = c.toLong * 86400000
+        val millisUtc = millisLocal - getOffsetFromLocalMillis(millisLocal)
+        new Date(millisUtc)
+    }
+
+    case _ if dataType == Types.SQL_TIME => (obj: Any) => nullSafeConvert(obj) {
+      case c: Long => new Time(c / 1000)
+      case c: Int => new Time(c.toLong / 1000)
+    }
+
+    case _ if dataType == Types.SQL_TIMESTAMP => (obj: Any) => nullSafeConvert(obj) {
+      case c: Long => new Timestamp(c / 1000)
+      case c: Int => new Timestamp(c.toLong / 1000)
+    }
+
+    case _ if dataType == Types.INTERVAL_MILLIS() => (obj: Any) => nullSafeConvert(obj) {
+      case c: Long => c / 1000
+      case c: Int => c.toLong / 1000
+    }
+
+    case _ if dataType == Types.STRING => (obj: Any) => nullSafeConvert(obj) {
+      case _ => obj.toString
+    }
+
+    case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO =>
+      (obj: Any) =>
+        nullSafeConvert(obj) {
+          case c: String => c.getBytes(StandardCharsets.UTF_8)
+          case c if c.getClass.isArray && c.getClass.getComponentType.getName == "byte" => c
+        }
+
+    case _: PrimitiveArrayTypeInfo[_] |
+         _: BasicArrayTypeInfo[_, _] |
+         _: ObjectArrayTypeInfo[_, _] =>
+      var boxed = false
+      val elementType = dataType match {
+        case p: PrimitiveArrayTypeInfo[_] =>
+          p.getComponentType
+        case b: BasicArrayTypeInfo[_, _] =>
+          boxed = true
+          b.getComponentInfo
+        case o: ObjectArrayTypeInfo[_, _] =>
+          boxed = true
+          o.getComponentInfo
+      }
+      val elementFromJava = convertTo(elementType)
+
+      (obj: Any) => nullSafeConvert(obj) {
+        case c: java.util.List[_] =>
+          createArray(elementType,
+                      c.size(),
+                      i => elementFromJava(c.get(i)),
+                      boxed)
+        case c if c.getClass.isArray =>
+          createArray(elementType,
+                      c.asInstanceOf[Array[_]].length,
+                      i => elementFromJava(c.asInstanceOf[Array[_]](i)),
+                      boxed)
+      }
+
+    case m: MapTypeInfo[_, _] =>
+      val keyFromJava = convertTo(m.getKeyTypeInfo)
+      val valueFromJava = convertTo(m.getValueTypeInfo)
+
+      (obj: Any) => nullSafeConvert(obj) {
+        case javaMap: java.util.Map[_, _] =>
+          val map = new java.util.HashMap[Any, Any]
+          javaMap.forEach(new BiConsumer[Any, Any] {
+            override def accept(k: Any, v: Any): Unit =
+              map.put(keyFromJava(k), valueFromJava(v))
+          })
+          map
+      }
+
+    case rowType: RowTypeInfo =>
+      val fieldsFromJava = rowType.getFieldTypes.map(f => convertTo(f))
+
+      (obj: Any) => nullSafeConvert(obj) {
+        case c if c.getClass.isArray =>
+          val r = c.asInstanceOf[Array[_]]
+          if (r.length != rowType.getFieldTypes.length) {
+            throw new IllegalStateException(
+              s"Input row doesn't have expected number of values required by the schema. " +
+                s"${rowType.getFieldTypes.length} fields are required while ${r.length} " +
+                s"values are provided."
+              )
+          }
+
+          val row = new Row(r.length)
+          var i = 0
+          while (i < r.length) {
+            row.setField(i, fieldsFromJava(i)(r(i)))
+            i += 1
+          }
+          row
+      }
+
+    // UserDefinedType
+    case _ => (obj: Any) => obj
+  }
+
+  private def nullSafeConvert(input: Any)(f: PartialFunction[Any, Any]): Any = {
+    if (input == null) {
+      null
+    } else {
+      f.applyOrElse(input, {
+        _: Any => null
+      })
+    }
+  }
+
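+  /**
+    * Creates an array of the given element type. If `boxed` is true, the boxed wrapper types
+    * are used so that null elements can be represented.
+    */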
+  private def createArray(
+      elementType: TypeInformation[_],
+      length: Int,
+      getElement: Int => Any,
+      boxed: Boolean = false): Array[_] = {
+    elementType match {
+      case BasicTypeInfo.BOOLEAN_TYPE_INFO =>
+        if (!boxed) {
+          val array = new Array[Boolean](length)
+          for (i <- 0 until length) {
+            array(i) = getElement(i).asInstanceOf[Boolean]
+          }
+          array
+        } else {
+          val array = new Array[java.lang.Boolean](length)
+          for (i <- 0 until length) {
+            if (getElement(i) != null) {
+              array(i) = java.lang.Boolean.valueOf(getElement(i).asInstanceOf[Boolean])
+            } else {
+              array(i) = null
+            }
+          }
+          array
+        }
+
+      case BasicTypeInfo.BYTE_TYPE_INFO =>
+        if (!boxed) {
+          val array = new Array[Byte](length)
+          for (i <- 0 until length) {
+            array(i) = getElement(i).asInstanceOf[Byte]
+          }
+          array
+        } else {
+          val array = new Array[java.lang.Byte](length)
+          for (i <- 0 until length) {
+            if (getElement(i) != null) {
+              array(i) = java.lang.Byte.valueOf(getElement(i).asInstanceOf[Byte])
+            } else {
+              array(i) = null
+            }
+          }
+          array
+        }
+
+      case BasicTypeInfo.SHORT_TYPE_INFO =>
+        if (!boxed) {
+          val array = new Array[Short](length)
+          for (i <- 0 until length) {
+            array(i) = getElement(i).asInstanceOf[Short]
+          }
+          array
+        } else {
+          val array = new Array[java.lang.Short](length)
+          for (i <- 0 until length) {
+            if (getElement(i) != null) {
+              array(i) = java.lang.Short.valueOf(getElement(i).asInstanceOf[Short])
+            } else {
+              array(i) = null
+            }
+          }
+          array
+        }
+
+      case BasicTypeInfo.INT_TYPE_INFO =>
+        if (!boxed) {
+          val array = new Array[Int](length)
+          for (i <- 0 until length) {
+            array(i) = getElement(i).asInstanceOf[Int]
+          }
+          array
+        } else {
+          val array = new Array[java.lang.Integer](length)
+          for (i <- 0 until length) {
+            if (getElement(i) != null) {
+              array(i) = java.lang.Integer.valueOf(getElement(i).asInstanceOf[Int])
+            } else {
+              array(i) = null
+            }
+          }
+          array
+        }
+
+      case BasicTypeInfo.LONG_TYPE_INFO =>
+        if (!boxed) {
+          val array = new Array[Long](length)
+          for (i <- 0 until length) {
+            array(i) = getElement(i).asInstanceOf[Long]
+          }
+          array
+        } else {
+          val array = new Array[java.lang.Long](length)
+          for (i <- 0 until length) {
+            if (getElement(i) != null) {
+              array(i) = java.lang.Long.valueOf(getElement(i).asInstanceOf[Long])
+            } else {
+              array(i) = null
+            }
+          }
+          array
+        }
+
+      case BasicTypeInfo.FLOAT_TYPE_INFO =>
+        if (!boxed) {
+          val array = new Array[Float](length)
+          for (i <- 0 until length) {
+            array(i) = getElement(i).asInstanceOf[Float]
+          }
+          array
+        } else {
+          val array = new Array[java.lang.Float](length)
+          for (i <- 0 until length) {
+            if (getElement(i) != null) {
+              array(i) = java.lang.Float.valueOf(getElement(i).asInstanceOf[Float])
+            } else {
+              array(i) = null
+            }
+          }
+          array
+        }
+
+      case BasicTypeInfo.DOUBLE_TYPE_INFO =>
+        if (!boxed) {
+          val array = new Array[Double](length)
+          for (i <- 0 until length) {
+            array(i) = getElement(i).asInstanceOf[Double]
+          }
+          array
+        } else {
+          val array = new Array[java.lang.Double](length)
+          for (i <- 0 until length) {
+            if (getElement(i) != null) {
+              array(i) = java.lang.Double.valueOf(getElement(i).asInstanceOf[Double])
+            } else {
+              array(i) = null
+            }
+          }
+          array
+        }
+
+      case BasicTypeInfo.STRING_TYPE_INFO =>
+        val array = new Array[java.lang.String](length)
+        for (i <- 0 until length) {
+          array(i) = getElement(i).asInstanceOf[java.lang.String]
+        }
+        array
+
+      case _ =>
+        val array = new Array[Object](length)
+        for (i <- 0 until length) {
+          array(i) = getElement(i).asInstanceOf[Object]
+        }
+        array
+    }
+  }
+
+  def getOffsetFromLocalMillis(millisLocal: Long): Int = {
+    val localZone = TimeZone.getDefault
+    var result = localZone.getRawOffset
+    // the actual offset should be calculated based on milliseconds in UTC
+    val offset = localZone.getOffset(millisLocal - result)
+    if (offset != result) {
+      // Daylight Saving Time
+      result = localZone.getOffset(millisLocal - offset)
+      if (result != offset) {
+        // fallback to do the reverse lookup using java.time.LocalDateTime
+        // this should only happen near the start or end of DST
+        val localDate = LocalDate.ofEpochDay(millisLocal / 86400000)
+        val localTime = LocalTime.ofNanoOfDay(
+          Math.floorMod(millisLocal, 86400000) * 1000 * 1000)
+        val localDateTime = LocalDateTime.of(localDate, localTime)
+        val millisEpoch = localDateTime.atZone(localZone.toZoneId).toInstant.toEpochMilli
+        result = (millisLocal - millisEpoch).toInt
+      }
+    }
+    result
+  }
+}
+
+/**
+  * An InputFormatTableSource created by the Python 'from_elements' method.
+  *
+  * @param inputFormat The input format which contains the Python data collection,
+  *                    usually created by the PythonTableUtils#getInputFormat method
+  * @param rowTypeInfo The row type info of the Python data.
+  *                    It is generated by the Python 'from_elements' method.
+  */
+class PythonInputFormatTableSource[Row](
+    inputFormat: InputFormat[Row, _ <: InputSplit],
+    rowTypeInfo: RowTypeInfo
+) extends InputFormatTableSource[Row] {
+
+  override def getInputFormat: InputFormat[Row, _ <: InputSplit] = inputFormat
+
+  override def getTableSchema: TableSchema = TableSchema.fromTypeInfo(rowTypeInfo)
+
+  override def getReturnType: TypeInformation[Row] = rowTypeInfo.asInstanceOf[TypeInformation[Row]]
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
index f3284d0..afb992b 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
@@ -18,9 +18,12 @@
 
 package org.apache.flink.table.planner.runtime.utils;
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.FunctionLanguage;
 import org.apache.flink.table.functions.ScalarFunction;
 
 import java.util.Arrays;
@@ -154,4 +157,63 @@
 		}
 	}
 
+	/**
+	 * Test for Python Scalar Function.
+	 */
+	public static class PythonScalarFunction extends ScalarFunction {
+		private final String name;
+
+		public PythonScalarFunction(String name) {
+			this.name = name;
+		}
+
+		public int eval(int i, int j) {
+			return i + j;
+		}
+
+		@Override
+		public TypeInformation<?> getResultType(Class<?>[] signature) {
+			return BasicTypeInfo.INT_TYPE_INFO;
+		}
+
+		@Override
+		public FunctionLanguage getLanguage() {
+			return FunctionLanguage.PYTHON;
+		}
+
+		@Override
+		public String toString() {
+			return name;
+		}
+	}
+
+	/**
	 * Test for Python Scalar Function with a boolean result type.
+	 */
+	public static class BooleanPythonScalarFunction extends ScalarFunction {
+		private final String name;
+
+		public BooleanPythonScalarFunction(String name) {
+			this.name = name;
+		}
+
+		public boolean eval(int i, int j) {
+			return i + j > 1;
+		}
+
+		@Override
+		public TypeInformation<?> getResultType(Class<?>[] signature) {
+			return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+		}
+
+		@Override
+		public FunctionLanguage getLanguage() {
+			return FunctionLanguage.PYTHON;
+		}
+
+		@Override
+		public String toString() {
+			return name;
+		}
+	}
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/PythonCalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/PythonCalcTest.xml
new file mode 100644
index 0000000..e488730
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/table/PythonCalcTest.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+	<TestCase name="testPythonFunctionMixedWithJavaFunction">
+		<Resource name="sql">
+			<![CDATA[SELECT pyFunc1(a, b), c + 1 FROM MyTable]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalProject(EXPR$0=[pyFunc1($0, $1)], EXPR$1=[+($2, 1)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+Calc(select=[f0 AS EXPR$0, +(c, 1) AS EXPR$1])
++- PythonCalc(select=[c, pyFunc1(a, b) AS f0])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.xml
index 0433fbe..ae612e2 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ExpressionReductionRulesTest.xml
@@ -62,8 +62,9 @@
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[PyUdf() AS EXPR$0, CAST(2) AS EXPR$1])
-+- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+Calc(select=[f0 AS EXPR$0, CAST(2) AS EXPR$1])
++- PythonCalc(select=[PyUdf() AS f0])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonScalarFunctionSplitRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonScalarFunctionSplitRuleTest.xml
new file mode 100644
index 0000000..66a06cd
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PythonScalarFunctionSplitRuleTest.xml
@@ -0,0 +1,164 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+	<TestCase name="testPythonFunctionAsInputOfJavaFunction">
+		<Resource name="sql">
+			<![CDATA[SELECT pyFunc1(a, b) + 1 FROM MyTable]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalProject(EXPR$0=[+(pyFunc1($0, $1), 1)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+FlinkLogicalCalc(select=[+(f0, 1) AS EXPR$0])
++- FlinkLogicalCalc(select=[pyFunc1(a, b) AS f0])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testPythonFunctionMixedWithJavaFunction">
+		<Resource name="sql">
+			<![CDATA[SELECT pyFunc1(a, b), c + 1 FROM MyTable]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalProject(EXPR$0=[pyFunc1($0, $1)], EXPR$1=[+($2, 1)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+FlinkLogicalCalc(select=[f0 AS EXPR$0, +(c, 1) AS EXPR$1])
++- FlinkLogicalCalc(select=[c, pyFunc1(a, b) AS f0])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testPythonFunctionMixedWithJavaFunctionInWhereClause">
+		<Resource name="sql">
+			<![CDATA[SELECT pyFunc1(a, b), c + 1 FROM MyTable WHERE pyFunc2(a, c) > 0]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalProject(EXPR$0=[pyFunc1($0, $1)], EXPR$1=[+($2, 1)])
++- LogicalFilter(condition=[>(pyFunc2($0, $2), 0)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+FlinkLogicalCalc(select=[f0 AS EXPR$0, +(c, 1) AS EXPR$1], where=[>(f1, 0)])
++- FlinkLogicalCalc(select=[c, pyFunc1(a, b) AS f0, pyFunc2(a, c) AS f1])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testPythonFunctionInWhereClause">
+		<Resource name="sql">
+			<![CDATA[SELECT pyFunc1(a, b) FROM MyTable WHERE pyFunc4(a, c)]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalProject(EXPR$0=[pyFunc1($0, $1)])
++- LogicalFilter(condition=[pyFunc4($0, $2)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+FlinkLogicalCalc(select=[f0 AS EXPR$0], where=[f1])
++- FlinkLogicalCalc(select=[pyFunc1(a, b) AS f0, pyFunc4(a, c) AS f1])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testChainingPythonFunction">
+		<Resource name="sql">
+			<![CDATA[SELECT pyFunc3(pyFunc2(a + pyFunc1(a, c), b), c) FROM MyTable]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalProject(EXPR$0=[pyFunc3(pyFunc2(+($0, pyFunc1($0, $2)), $1), $2)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+FlinkLogicalCalc(select=[pyFunc3(pyFunc2(f0, b), c) AS EXPR$0])
++- FlinkLogicalCalc(select=[b, c, +(a, f0) AS f0])
+   +- FlinkLogicalCalc(select=[b, c, a, pyFunc1(a, c) AS f0])
+      +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testOnlyOnePythonFunction">
+		<Resource name="sql">
+			<![CDATA[SELECT pyFunc1(a, b) FROM MyTable]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalProject(EXPR$0=[pyFunc1($0, $1)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+FlinkLogicalCalc(select=[pyFunc1(a, b) AS EXPR$0])
++- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testOnlyOnePythonFunctionInWhereClause">
+		<Resource name="sql">
+			<![CDATA[SELECT a, b FROM MyTable WHERE pyFunc4(a, c)]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[pyFunc4($0, $2)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+FlinkLogicalCalc(select=[a, b], where=[f0])
++- FlinkLogicalCalc(select=[a, b, pyFunc4(a, c) AS f0])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+	<TestCase name="testFieldNameUniquify">
+		<Resource name="sql">
+			<![CDATA[SELECT pyFunc1(f1, f2), f0 + 1 FROM MyTable2]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalProject(EXPR$0=[pyFunc1($1, $2)], EXPR$1=[+($0, 1)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(f0, f1, f2)]]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+FlinkLogicalCalc(select=[f00 AS EXPR$0, +(f0, 1) AS EXPR$1])
++- FlinkLogicalCalc(select=[f0, pyFunc1(f1, f2) AS f00])
+   +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(f0, f1, f2)]]], fields=[f0, f1, f2])
+]]>
+		</Resource>
+	</TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/PythonCalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/PythonCalcTest.xml
new file mode 100644
index 0000000..e488730
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/PythonCalcTest.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+	<TestCase name="testPythonFunctionMixedWithJavaFunction">
+		<Resource name="sql">
+			<![CDATA[SELECT pyFunc1(a, b), c + 1 FROM MyTable]]>
+		</Resource>
+		<Resource name="planBefore">
+			<![CDATA[
+LogicalProject(EXPR$0=[pyFunc1($0, $1)], EXPR$1=[+($2, 1)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
+]]>
+		</Resource>
+		<Resource name="planAfter">
+			<![CDATA[
+Calc(select=[f0 AS EXPR$0, +(c, 1) AS EXPR$1])
++- PythonCalc(select=[c, pyFunc1(a, b) AS f0])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+		</Resource>
+	</TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/PythonCalcTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/PythonCalcTest.scala
new file mode 100644
index 0000000..4c01bfd
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/PythonCalcTest.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction
+import org.apache.flink.table.planner.utils.TableTestBase
+import org.junit.{Before, Test}
+
+class PythonCalcTest extends TableTestBase {
+  private val util = batchTestUtil()
+
+  @Before
+  def setup(): Unit = {
+    util.addTableSource[(Int, Int, Int)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("pyFunc1", new PythonScalarFunction("pyFunc1"))
+  }
+
+  @Test
+  def testPythonFunctionMixedWithJavaFunction(): Unit = {
+    val sqlQuery = "SELECT pyFunc1(a, b), c + 1 FROM MyTable"
+    util.verifyPlan(sqlQuery)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PythonScalarFunctionSplitRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PythonScalarFunctionSplitRuleTest.scala
new file mode 100644
index 0000000..d99d39f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PythonScalarFunctionSplitRuleTest.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical
+
+import org.apache.calcite.plan.hep.HepMatchOrder
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions
+import org.apache.flink.table.planner.plan.optimize.program._
+import org.apache.flink.table.planner.plan.rules.{FlinkBatchRuleSets, FlinkStreamRuleSets}
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.{BooleanPythonScalarFunction, PythonScalarFunction}
+import org.apache.flink.table.planner.utils.TableTestBase
+import org.junit.{Before, Test}
+
+/**
+  * Test for [[PythonScalarFunctionSplitRule]].
+  */
+class PythonScalarFunctionSplitRuleTest extends TableTestBase {
+
+  private val util = batchTestUtil()
+
+  @Before
+  def setup(): Unit = {
+    val programs = new FlinkChainedProgram[BatchOptimizeContext]()
+    programs.addLast(
+      "logical",
+      FlinkVolcanoProgramBuilder.newBuilder
+        .add(FlinkBatchRuleSets.LOGICAL_OPT_RULES)
+        .setRequiredOutputTraits(Array(FlinkConventions.LOGICAL))
+        .build())
+    programs.addLast(
+      "logical_rewrite",
+      FlinkHepRuleSetProgramBuilder.newBuilder
+        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
+        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+        .add(FlinkStreamRuleSets.LOGICAL_REWRITE)
+        .build())
+    util.replaceBatchProgram(programs)
+
+    util.addTableSource[(Int, Int, Int)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("pyFunc1", new PythonScalarFunction("pyFunc1"))
+    util.addFunction("pyFunc2", new PythonScalarFunction("pyFunc2"))
+    util.addFunction("pyFunc3", new PythonScalarFunction("pyFunc3"))
+    util.addFunction("pyFunc4", new BooleanPythonScalarFunction("pyFunc4"))
+  }
+
+  @Test
+  def testPythonFunctionAsInputOfJavaFunction(): Unit = {
+    val sqlQuery = "SELECT pyFunc1(a, b) + 1 FROM MyTable"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testPythonFunctionMixedWithJavaFunction(): Unit = {
+    val sqlQuery = "SELECT pyFunc1(a, b), c + 1 FROM MyTable"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testPythonFunctionMixedWithJavaFunctionInWhereClause(): Unit = {
+    val sqlQuery = "SELECT pyFunc1(a, b), c + 1 FROM MyTable WHERE pyFunc2(a, c) > 0"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testPythonFunctionInWhereClause(): Unit = {
+    val sqlQuery = "SELECT pyFunc1(a, b) FROM MyTable WHERE pyFunc4(a, c)"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testChainingPythonFunction(): Unit = {
+    val sqlQuery = "SELECT pyFunc3(pyFunc2(a + pyFunc1(a, c), b), c) FROM MyTable"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testOnlyOnePythonFunction(): Unit = {
+    val sqlQuery = "SELECT pyFunc1(a, b) FROM MyTable"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testOnlyOnePythonFunctionInWhereClause(): Unit = {
+    val sqlQuery = "SELECT a, b FROM MyTable WHERE pyFunc4(a, c)"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testFieldNameUniquify(): Unit = {
+    util.addTableSource[(Int, Int, Int)]("MyTable2", 'f0, 'f1, 'f2)
+    val sqlQuery = "SELECT pyFunc1(f1, f2), f0 + 1 FROM MyTable2"
+    util.verifyPlan(sqlQuery)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/PythonCalcTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/PythonCalcTest.scala
new file mode 100644
index 0000000..d25b20f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/PythonCalcTest.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction
+import org.apache.flink.table.planner.utils.TableTestBase
+import org.junit.{Before, Test}
+
+class PythonCalcTest extends TableTestBase {
+  private val util = streamTestUtil()
+
+  @Before
+  def setup(): Unit = {
+    util.addTableSource[(Int, Int, Int)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("pyFunc1", new PythonScalarFunction("pyFunc1"))
+  }
+
+  @Test
+  def testPythonFunctionMixedWithJavaFunction(): Unit = {
+    val sqlQuery = "SELECT pyFunc1(a, b), c + 1 FROM MyTable"
+    util.verifyPlan(sqlQuery)
+  }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/PythonFunctionCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/PythonFunctionCodeGenerator.scala
index e97e7b7..332d573 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/PythonFunctionCodeGenerator.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/PythonFunctionCodeGenerator.scala
@@ -34,7 +34,7 @@
   /**
     * Generates a [[ScalarFunction]] for the specified Python user-defined function.
     *
-    * @param name class name of the user-defined function. Must be a valid Java class identifier
+    * @param name name of the user-defined function
     * @param serializedScalarFunction serialized Python scalar function
     * @param inputTypes input data types
     * @param resultType expected result type