[FLINK-24137][python] Fix the issue that tests planned to run in process mode were actually executed in loopback mode
This closes #17214.
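Previously, the loopback server was started whenever the job ran against a
local deployment, and tests could only opt out by setting the internal flag
_remote_mode, which the test base classes did not apply consistently. As a
result, tests that were planned to run in process mode silently fell back to
loopback mode. This commit replaces the flag with an explicit internal
attribute _python_worker_execution_mode: when unset, loopback mode is used
for local deployments as before; 'loopback' and 'process' enforce the
requested mode, and any other value raises an error.

For example, a test case can force its Python user-defined functions to run
in a separate Python worker process like this (illustrative usage of the
internal attribute; it is not part of the public API):

    env = StreamExecutionEnvironment.get_execution_environment()
    env._python_worker_execution_mode = "process"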
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index 1983945..4f6a4ee 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -58,7 +58,6 @@
def __init__(self, j_stream_execution_environment, serializer=PickleSerializer()):
self._j_stream_execution_environment = j_stream_execution_environment
- self._remote_mode = False
self.serializer = serializer
def get_config(self) -> ExecutionConfig:
@@ -891,14 +890,15 @@
JPythonConfigUtil = gateway.jvm.org.apache.flink.python.util.PythonConfigUtil
# start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster
j_configuration = get_j_env_configuration(self._j_stream_execution_environment)
- if not self._remote_mode and is_local_deployment(j_configuration):
+
+ def startup_loopback_server():
jvm = gateway.jvm
env_config = JPythonConfigUtil.getEnvironmentConfig(
self._j_stream_execution_environment)
parallelism = self.get_parallelism()
if parallelism > 1 and env_config.containsKey(jvm.PythonOptions.PYTHON_ARCHIVES.key()):
import logging
- logging.warning("Lookback mode is disabled as python archives are used and the "
+ logging.warning("Loopback mode is disabled as python archives are used and the "
"parallelism of the job is greater than 1. The Python user-defined "
"functions will be executed in an independent Python process.")
else:
@@ -908,6 +908,24 @@
get_field_value(j_env, "m").put(
'PYFLINK_LOOPBACK_SERVER_ADDRESS', BeamFnLoopbackWorkerPoolServicer().start())
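+ # Choose the Python worker execution mode: unset means auto-detect (loopback for local deployments).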
+ python_worker_execution_mode = getattr(
+     self, "_python_worker_execution_mode", None)
+
+ if python_worker_execution_mode is None:
+ if is_local_deployment(j_configuration):
+ startup_loopback_server()
+ elif python_worker_execution_mode == 'loopback':
+ if is_local_deployment(j_configuration):
+ startup_loopback_server()
+ else:
+ raise ValueError("Loopback mode is enabled, however the job wasn't configured to "
+ "run in local deployment mode")
+ elif python_worker_execution_mode != 'process':
+ raise ValueError(
+ "Unsupported Python worker execution mode '%s'; only 'loopback' and "
+ "'process' are supported" % python_worker_execution_mode)
+
JPythonConfigUtil.configPythonOperator(self._j_stream_execution_environment)
gateway.jvm.org.apache.flink.python.chain.PythonOperatorChainingOptimizer.apply(
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index cb7189e..f3d95a3 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -50,13 +50,14 @@
def setUp(self):
self.env = self.create_new_env()
- self.env._remote_mode = True
self.test_sink = DataStreamTestSinkFunction()
@staticmethod
- def create_new_env():
+ def create_new_env(execution_mode='process'):
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(2)
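+ # Run the Python workers in the requested mode; tests default to 'process'.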
+ env._python_worker_execution_mode = execution_mode
return env
def test_get_config(self):
@@ -358,7 +358,7 @@
def test_add_python_file(self):
import uuid
- env = self.create_new_env()
+ env = self.create_new_env("loopback")
python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
os.mkdir(python_file_dir)
python_file_path = os.path.join(python_file_dir, "test_dep1.py")
@@ -409,7 +409,7 @@
def test_add_python_file_2(self):
import uuid
- env = self.create_new_env()
+ env = self.create_new_env("loopback")
python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
os.mkdir(python_file_dir)
python_file_path = os.path.join(python_file_dir, "test_dep1.py")
@@ -477,7 +477,7 @@
def test_set_requirements_with_cached_directory(self):
import uuid
tmp_dir = self.tempdir
- env = self.create_new_env()
+ env = self.create_new_env("loopback")
requirements_txt_path = os.path.join(tmp_dir, "requirements_txt_" + str(uuid.uuid4()))
with open(requirements_txt_path, 'w') as f:
f.write("python-package1==0.0.0")
@@ -523,7 +523,7 @@
import uuid
import shutil
tmp_dir = self.tempdir
- env = self.create_new_env()
+ env = self.create_new_env("loopback")
archive_dir_path = os.path.join(tmp_dir, "archive_" + str(uuid.uuid4()))
os.mkdir(archive_dir_path)
with open(os.path.join(archive_dir_path, "data.txt"), 'w') as f:
@@ -550,7 +550,7 @@
import sys
python_exec = sys.executable
tmp_dir = self.tempdir
- env = self.create_new_env()
+ env = self.create_new_env("loopback")
python_exec_link_path = os.path.join(tmp_dir, "py_exec")
os.symlink(python_exec, python_exec_link_path)
env.set_python_executable(python_exec_link_path)
@@ -614,12 +614,12 @@
def test_generate_stream_graph_with_dependencies(self):
python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
- env = self.create_new_env()
- env._remote_mode = True
os.mkdir(python_file_dir)
python_file_path = os.path.join(python_file_dir, "test_stream_dependency_manage_lib.py")
with open(python_file_path, 'w') as f:
f.write("def add_two(a):\n return a + 2")
+ env = self.env
env.add_python_file(python_file_path)
def plus_two_map(value):
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 98a1f7b..9db27be 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -91,7 +91,6 @@
def __init__(self, j_tenv, serializer=PickleSerializer()):
self._j_tenv = j_tenv
self._serializer = serializer
- self._remote_mode = False
# When running in MiniCluster, launch the Python UDF worker using the Python executable
# specified by sys.executable if users have not specified it explicitly via configuration
# python.executable.
@@ -1747,9 +1746,9 @@
classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
self._add_jars_to_j_env_config(jars_key)
self._add_jars_to_j_env_config(classpaths_key)
+
# start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster
- if not self._remote_mode and \
- is_local_deployment(get_j_env_configuration(self._get_j_env())):
+ def startup_loopback_server():
from pyflink.common import Configuration
_j_config = jvm.org.apache.flink.python.util.PythonConfigUtil.getMergedConfig(
self._get_j_env(), self.get_config()._j_table_config)
@@ -1769,6 +1768,24 @@
get_field_value(j_env, "m").put(
'PYFLINK_LOOPBACK_SERVER_ADDRESS', BeamFnLoopbackWorkerPoolServicer().start())
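+ # Same worker execution mode dispatch as in StreamExecutionEnvironment.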
+ python_worker_execution_mode = getattr(
+     self, "_python_worker_execution_mode", None)
+
+ if python_worker_execution_mode is None:
+ if is_local_deployment(get_j_env_configuration(self._get_j_env())):
+ startup_loopback_server()
+ elif python_worker_execution_mode == 'loopback':
+ if is_local_deployment(get_j_env_configuration(self._get_j_env())):
+ startup_loopback_server()
+ else:
+ raise ValueError("Loopback mode is enabled, however the job wasn't configured to "
+ "run in local deployment mode")
+ elif python_worker_execution_mode != 'process':
+ raise ValueError(
+ "Unsupported Python worker execution mode '%s'; only 'loopback' and "
+ "'process' are supported" % python_worker_execution_mode)
+
def _wrap_aggregate_function_if_needed(self, function) -> UserDefinedFunctionWrapper:
if isinstance(function, AggregateFunction):
function = udaf(function,
diff --git a/flink-python/pyflink/table/tests/test_dependency.py b/flink-python/pyflink/table/tests/test_dependency.py
index b61740e..27ebf6b 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -22,7 +22,7 @@
import uuid
from pyflink.pyflink_gateway_server import on_windows
-from pyflink.table import DataTypes
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table import expressions as expr
from pyflink.table.udf import udf
from pyflink.testing import source_sink_utils
@@ -74,26 +74,28 @@
def setUp(self):
super(StreamDependencyTests, self).setUp()
- self.t_env._remote_mode = False
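+ # Unlike self.t_env from the base class (process mode), st_env runs its Python workers in loopback mode.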
+ self.st_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
+ self.st_env._python_worker_execution_mode = "loopback"
def test_set_requirements_without_cached_directory(self):
requirements_txt_path = os.path.join(self.tempdir, str(uuid.uuid4()))
with open(requirements_txt_path, 'w') as f:
f.write("cloudpickle==1.2.2")
- self.t_env.set_python_requirements(requirements_txt_path)
+ self.st_env.set_python_requirements(requirements_txt_path)
def check_requirements(i):
import cloudpickle # noqa # pylint: disable=unused-import
assert '_PYTHON_REQUIREMENTS_INSTALL_DIR' in os.environ
return i
- self.t_env.create_temporary_system_function("check_requirements",
- udf(check_requirements, DataTypes.BIGINT(),
- DataTypes.BIGINT()))
+ self.st_env.create_temporary_system_function(
+ "check_requirements",
+ udf(check_requirements, DataTypes.BIGINT(), DataTypes.BIGINT()))
table_sink = source_sink_utils.TestAppendSink(
['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
- self.t_env.register_table_sink("Results", table_sink)
- t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
+ self.st_env.register_table_sink("Results", table_sink)
+ t = self.st_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
t.select(expr.call('check_requirements', t.a), t.a).execute_insert("Results").wait()
actual = source_sink_utils.results()
@@ -127,19 +128,19 @@
"7or3/88i0H/tfBFW7s/s/avRInQH06ieEy7tDrQeYHUdRN7wP+n/vf62LOH/pld7f9xz7a5Pfufedy0oP"
"86iJI8KxStAq6yLC4JWdbbVbWRikR2z1ZGytk5vauW3QdnBFE6XqwmykazCesAAAAAAAAAAAAAAAAAAAA"
"AAAAAAAAAAAAAAOBw/AJw5CHBAFAAAA=="))
- self.t_env.set_python_requirements(requirements_txt_path, requirements_dir_path)
+ self.st_env.set_python_requirements(requirements_txt_path, requirements_dir_path)
def add_one(i):
from python_package1 import plus
return plus(i, 1)
- self.t_env.create_temporary_system_function("add_one",
- udf(add_one, DataTypes.BIGINT(),
- DataTypes.BIGINT()))
+ self.st_env.create_temporary_system_function(
+ "add_one",
+ udf(add_one, DataTypes.BIGINT(), DataTypes.BIGINT()))
table_sink = source_sink_utils.TestAppendSink(
['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
- self.t_env.register_table_sink("Results", table_sink)
- t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
+ self.st_env.register_table_sink("Results", table_sink)
+ t = self.st_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
t.select(expr.call('add_one', t.a), t.a).execute_insert("Results").wait()
actual = source_sink_utils.results()
@@ -163,7 +164,6 @@
self.t_env.create_temporary_system_function("add_from_file",
udf(add_from_file, DataTypes.BIGINT(),
DataTypes.BIGINT()))
- self.t_env._remote_mode = True
table_sink = source_sink_utils.TestAppendSink(
['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
self.t_env.register_table_sink("Results", table_sink)
@@ -179,31 +179,31 @@
tmp_dir = self.tempdir
python_exec_link_path = os.path.join(tmp_dir, "py_exec")
os.symlink(python_exec, python_exec_link_path)
- self.t_env.get_config().set_python_executable(python_exec_link_path)
+ self.st_env.get_config().set_python_executable(python_exec_link_path)
def check_python_exec(i):
import os
assert os.environ["python"] == python_exec_link_path
return i
- self.t_env.create_temporary_system_function("check_python_exec",
- udf(check_python_exec, DataTypes.BIGINT(),
- DataTypes.BIGINT()))
+ self.st_env.create_temporary_system_function(
+ "check_python_exec",
+ udf(check_python_exec, DataTypes.BIGINT(), DataTypes.BIGINT()))
def check_pyflink_gateway_disabled(i):
from pyflink.java_gateway import get_gateway
get_gateway()
return i
- self.t_env.create_temporary_system_function(
+ self.st_env.create_temporary_system_function(
"check_pyflink_gateway_disabled",
udf(check_pyflink_gateway_disabled, DataTypes.BIGINT(),
DataTypes.BIGINT()))
table_sink = source_sink_utils.TestAppendSink(
['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
- self.t_env.register_table_sink("Results", table_sink)
- t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
+ self.st_env.register_table_sink("Results", table_sink)
+ t = self.st_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
t.select(
expr.call('check_python_exec', t.a),
expr.call('check_pyflink_gateway_disabled', t.a)) \
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index 9f8ff80..0b36750 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -136,7 +136,7 @@
self.t_env.get_config().get_configuration().set_string("parallelism.default", "2")
self.t_env.get_config().get_configuration().set_string(
"python.fn-execution.bundle.size", "1")
- self.t_env._remote_mode = True
+ self.t_env._python_worker_execution_mode = "process"
class PyFlinkBatchTableTestCase(PyFlinkTestCase):
@@ -150,7 +150,7 @@
self.t_env.get_config().get_configuration().set_string("parallelism.default", "2")
self.t_env.get_config().get_configuration().set_string(
"python.fn-execution.bundle.size", "1")
- self.t_env._remote_mode = True
+ self.t_env._python_worker_execution_mode = "process"
class PyFlinkStreamingTestCase(PyFlinkTestCase):
@@ -163,7 +163,7 @@
self.env = StreamExecutionEnvironment.get_execution_environment()
self.env.set_parallelism(2)
self.env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
- self.env._remote_mode = True
+ self.env._python_worker_execution_mode = "process"
class PyFlinkBatchTestCase(PyFlinkTestCase):
@@ -176,7 +176,7 @@
self.env = StreamExecutionEnvironment.get_execution_environment()
self.env.set_parallelism(2)
self.env.set_runtime_mode(RuntimeExecutionMode.BATCH)
- self.env._remote_mode = True
+ self.env._python_worker_execution_mode = "process"
class PythonAPICompletenessTestCase(object):