[FLINK-24245][python] Fix the loopback server address being shared between multiple jobs through an environment variable in PyFlink
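
Previously, an environment that executed in a MiniCluster published its loopback
server address by mutating the JVM-wide environment map (via reflection on
java.lang.System), so every job in the same process shared a single
PYFLINK_LOOPBACK_SERVER_ADDRESS entry and the last environment to start
overwrote the others. This change stores the address in each environment's own
Configuration instead, and replaces the private _python_worker_execution_mode
attribute used by tests with an environment variable of the same name. A toy,
runnable sketch of the failure mode, using plain dicts to stand in for the JVM
environment map and the per-job Configuration (the addresses are made up):

    # Old behaviour: one process-wide mapping; starting a second
    # environment silently overwrites the first environment's address.
    process_env = {}
    process_env['PYFLINK_LOOPBACK_SERVER_ADDRESS'] = 'localhost:50001'  # env 1 starts
    process_env['PYFLINK_LOOPBACK_SERVER_ADDRESS'] = 'localhost:50002'  # env 2 clobbers env 1

    # New behaviour: each environment keeps the address in its own
    # Configuration, so concurrent jobs no longer interfere.
    env1_config = {'PYFLINK_LOOPBACK_SERVER_ADDRESS': 'localhost:50001'}
    env2_config = {'PYFLINK_LOOPBACK_SERVER_ADDRESS': 'localhost:50002'}
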
This closes #17239.
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index 3dbd3e8..4c41721 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -59,6 +59,7 @@
def __init__(self, j_stream_execution_environment, serializer=PickleSerializer()):
self._j_stream_execution_environment = j_stream_execution_environment
self.serializer = serializer
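+        # Start the loopback server (if applicable) as soon as the environment
+        # is created, so that each environment owns its own server address.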
+ self._open()
def get_config(self) -> ExecutionConfig:
"""
@@ -888,20 +889,33 @@
-> JavaObject:
gateway = get_gateway()
JPythonConfigUtil = gateway.jvm.org.apache.flink.python.util.PythonConfigUtil
+
+ JPythonConfigUtil.configPythonOperator(self._j_stream_execution_environment)
+
+ gateway.jvm.org.apache.flink.python.chain.PythonOperatorChainingOptimizer.apply(
+            self._j_stream_execution_environment)
+
+ JPythonConfigUtil.setPartitionCustomOperatorNumPartitions(
+            get_field_value(self._j_stream_execution_environment, "transformations"))
+
+ j_stream_graph = self._j_stream_execution_environment.getStreamGraph(clear_transformations)
+ if job_name is not None:
+ j_stream_graph.setJobName(job_name)
+ return j_stream_graph
+
+ def _open(self):
# start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster
j_configuration = get_j_env_configuration(self._j_stream_execution_environment)
def startup_loopback_server():
+ from pyflink.common import Configuration
from pyflink.fn_execution.beam.beam_worker_pool_service import \
+                BeamFnLoopbackWorkerPoolServicer
- jvm = gateway.jvm
- j_env = jvm.System.getenv()
- get_field_value(j_env, "m").put(
-                'PYFLINK_LOOPBACK_SERVER_ADDRESS', BeamFnLoopbackWorkerPoolServicer().start())
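+            # Record the server address in this job's Configuration instead of the
+            # JVM-wide environment map, so concurrent environments cannot overwrite each other.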
+ config = Configuration(j_configuration=j_configuration)
+ config.set_string(
+ "PYFLINK_LOOPBACK_SERVER_ADDRESS", BeamFnLoopbackWorkerPoolServicer().start())
- python_worker_execution_mode = None
- if hasattr(self, "_python_worker_execution_mode"):
- python_worker_execution_mode = getattr(self, "_python_worker_execution_mode")
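+        # The worker execution mode is now chosen via the '_python_worker_execution_mode'
+        # environment variable (set by the test utilities) rather than a private attribute.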
+ python_worker_execution_mode = os.environ.get('_python_worker_execution_mode')
if python_worker_execution_mode is None:
if is_local_deployment(j_configuration):
@@ -917,19 +931,6 @@
"It only supports to execute the Python worker in 'loopback' mode and 'process' "
"mode, unknown mode '%s' is configured" % python_worker_execution_mode)
- JPythonConfigUtil.configPythonOperator(self._j_stream_execution_environment)
-
- gateway.jvm.org.apache.flink.python.chain.PythonOperatorChainingOptimizer.apply(
-            self._j_stream_execution_environment)
-
- JPythonConfigUtil.setPartitionCustomOperatorNumPartitions(
-            get_field_value(self._j_stream_execution_environment, "transformations"))
-
- j_stream_graph = self._j_stream_execution_environment.getStreamGraph(clear_transformations)
- if job_name is not None:
- j_stream_graph.setJobName(job_name)
- return j_stream_graph
-
def is_unaligned_checkpoints_enabled(self):
"""
Returns whether Unaligned Checkpoints are enabled.
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index f3d95a3..283cfa3 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -49,16 +49,12 @@
class StreamExecutionEnvironmentTests(PyFlinkTestCase):
def setUp(self):
- self.env = self.create_new_env()
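+        # Create the environment with loopback mode enabled, then restore the
+        # test default of 'process' mode.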
+ os.environ['_python_worker_execution_mode'] = "loopback"
+ self.env = StreamExecutionEnvironment.get_execution_environment()
+ os.environ['_python_worker_execution_mode'] = "process"
+ self.env.set_parallelism(2)
self.test_sink = DataStreamTestSinkFunction()
- @staticmethod
- def create_new_env(execution_mode='process'):
- env = StreamExecutionEnvironment.get_execution_environment()
- env.set_parallelism(2)
- env._execution_mode = execution_mode
- return env
-
def test_get_config(self):
execution_config = self.env.get_config()
@@ -358,7 +354,7 @@
def test_add_python_file(self):
import uuid
- env = self.create_new_env("loopback")
+ env = self.env
python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
os.mkdir(python_file_dir)
python_file_path = os.path.join(python_file_dir, "test_dep1.py")
@@ -409,7 +405,7 @@
def test_add_python_file_2(self):
import uuid
- env = self.create_new_env("loopback")
+ env = self.env
python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
os.mkdir(python_file_dir)
python_file_path = os.path.join(python_file_dir, "test_dep1.py")
@@ -477,7 +473,7 @@
def test_set_requirements_with_cached_directory(self):
import uuid
tmp_dir = self.tempdir
- env = self.create_new_env("loopback")
+ env = self.env
requirements_txt_path = os.path.join(tmp_dir, "requirements_txt_" + str(uuid.uuid4()))
with open(requirements_txt_path, 'w') as f:
f.write("python-package1==0.0.0")
@@ -523,7 +519,7 @@
import uuid
import shutil
tmp_dir = self.tempdir
- env = self.create_new_env("loopback")
+ env = self.env
archive_dir_path = os.path.join(tmp_dir, "archive_" + str(uuid.uuid4()))
os.mkdir(archive_dir_path)
with open(os.path.join(archive_dir_path, "data.txt"), 'w') as f:
@@ -550,7 +546,7 @@
import sys
python_exec = sys.executable
tmp_dir = self.tempdir
- env = self.create_new_env("loopback")
+ env = self.env
python_exec_link_path = os.path.join(tmp_dir, "py_exec")
os.symlink(python_exec, python_exec_link_path)
env.set_python_executable(python_exec_link_path)
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 1e10a0b..9562fcc 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -48,7 +48,7 @@
from pyflink.table.utils import to_expression_jarray
from pyflink.util import java_utils
from pyflink.util.java_utils import get_j_env_configuration, is_local_deployment, load_java_class, \
- to_j_explain_detail_arr, to_jarray, get_field, get_field_value
+ to_j_explain_detail_arr, to_jarray, get_field
__all__ = [
'StreamTableEnvironment',
@@ -96,6 +96,7 @@
# python.executable.
self._set_python_executable_for_local_executor()
self._config_chaining_optimization()
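+        # As in StreamExecutionEnvironment, start the loopback server (if
+        # applicable) when the environment is created.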
+ self._open()
@staticmethod
def create(environment_settings: EnvironmentSettings) -> 'TableEnvironment':
@@ -1747,33 +1748,6 @@
self._add_jars_to_j_env_config(jars_key)
self._add_jars_to_j_env_config(classpaths_key)
- # start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster
- def startup_loopback_server():
- from pyflink.fn_execution.beam.beam_worker_pool_service import \
-                BeamFnLoopbackWorkerPoolServicer
-
- j_env = jvm.System.getenv()
- get_field_value(j_env, "m").put(
-                'PYFLINK_LOOPBACK_SERVER_ADDRESS', BeamFnLoopbackWorkerPoolServicer().start())
-
- python_worker_execution_mode = None
- if hasattr(self, "_python_worker_execution_mode"):
- python_worker_execution_mode = getattr(self, "_python_worker_execution_mode")
-
- if python_worker_execution_mode is None:
- if is_local_deployment(get_j_env_configuration(self._get_j_env())):
- startup_loopback_server()
- elif python_worker_execution_mode == 'loopback':
- if is_local_deployment(get_j_env_configuration(self._get_j_env())):
- startup_loopback_server()
- else:
- raise ValueError("Loopback mode is enabled, however the job wasn't configured to "
- "run in local deployment mode")
- elif python_worker_execution_mode != 'process':
- raise ValueError(
- "It only supports to execute the Python worker in 'loopback' mode and 'process' "
- "mode, unknown mode '%s' is configured" % python_worker_execution_mode)
-
def _wrap_aggregate_function_if_needed(self, function) -> UserDefinedFunctionWrapper:
if isinstance(function, AggregateFunction):
function = udaf(function,
@@ -1794,6 +1768,34 @@
exec_env_field.set(self._j_tenv,
JChainingOptimizingExecutor(exec_env_field.get(self._j_tenv)))
+ def _open(self):
+ # start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster
+ def startup_loopback_server():
+ from pyflink.common import Configuration
+ from pyflink.fn_execution.beam.beam_worker_pool_service import \
+                BeamFnLoopbackWorkerPoolServicer
+
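+            # Keep the server address in this environment's own Configuration
+            # rather than in the shared process environment.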
+ j_configuration = get_j_env_configuration(self._get_j_env())
+ config = Configuration(j_configuration=j_configuration)
+ config.set_string(
+ "PYFLINK_LOOPBACK_SERVER_ADDRESS", BeamFnLoopbackWorkerPoolServicer().start())
+
+ python_worker_execution_mode = os.environ.get('_python_worker_execution_mode')
+
+ if python_worker_execution_mode is None:
+ if is_local_deployment(get_j_env_configuration(self._get_j_env())):
+ startup_loopback_server()
+ elif python_worker_execution_mode == 'loopback':
+ if is_local_deployment(get_j_env_configuration(self._get_j_env())):
+ startup_loopback_server()
+ else:
+ raise ValueError("Loopback mode is enabled, however the job wasn't configured to "
+ "run in local deployment mode")
+ elif python_worker_execution_mode != 'process':
+ raise ValueError(
+ "It only supports to execute the Python worker in 'loopback' mode and 'process' "
+ "mode, unknown mode '%s' is configured" % python_worker_execution_mode)
+
class StreamTableEnvironment(TableEnvironment):
diff --git a/flink-python/pyflink/table/tests/test_dependency.py b/flink-python/pyflink/table/tests/test_dependency.py
index 5a8e7ab..8eeaee4 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -74,8 +74,13 @@
def setUp(self):
super(StreamDependencyTests, self).setUp()
- self.st_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
- self.st_env._execution_mode = "loopback"
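+        # Temporarily switch to loopback mode while creating the environment,
+        # then restore the previous mode.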
+        origin_execution_mode = os.environ.get('_python_worker_execution_mode')
+        os.environ['_python_worker_execution_mode'] = "loopback"
+        try:
+            self.st_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
+        finally:
+            if origin_execution_mode is not None:
+                os.environ['_python_worker_execution_mode'] = origin_execution_mode
+            else:
+                os.environ.pop('_python_worker_execution_mode', None)
def test_set_requirements_without_cached_directory(self):
requirements_txt_path = os.path.join(self.tempdir, str(uuid.uuid4()))
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index 0b36750..3849d0c 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -83,6 +83,7 @@
cls.tempdir = tempfile.mkdtemp()
os.environ["FLINK_TESTING"] = "1"
+ os.environ['_python_worker_execution_mode'] = "process"
_find_flink_home()
logging.info("Using %s as FLINK_HOME...", os.environ["FLINK_HOME"])
@@ -90,6 +91,7 @@
@classmethod
def tearDownClass(cls):
shutil.rmtree(cls.tempdir, ignore_errors=True)
+ del os.environ['_python_worker_execution_mode']
@classmethod
def assert_equals(cls, actual, expected):
@@ -136,7 +138,6 @@
self.t_env.get_config().get_configuration().set_string("parallelism.default", "2")
self.t_env.get_config().get_configuration().set_string(
"python.fn-execution.bundle.size", "1")
- self.t_env._execution_mode = "process"
class PyFlinkBatchTableTestCase(PyFlinkTestCase):
@@ -150,7 +151,6 @@
self.t_env.get_config().get_configuration().set_string("parallelism.default", "2")
self.t_env.get_config().get_configuration().set_string(
"python.fn-execution.bundle.size", "1")
- self.t_env._execution_mode = "process"
class PyFlinkStreamingTestCase(PyFlinkTestCase):
@@ -163,7 +163,6 @@
self.env = StreamExecutionEnvironment.get_execution_environment()
self.env.set_parallelism(2)
self.env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
- self.env._execution_mode = "process"
class PyFlinkBatchTestCase(PyFlinkTestCase):
@@ -176,7 +175,6 @@
self.env = StreamExecutionEnvironment.get_execution_environment()
self.env.set_parallelism(2)
self.env.set_runtime_mode(RuntimeExecutionMode.BATCH)
- self.env._execution_mode = "process"
class PythonAPICompletenessTestCase(object):