[FLINK-24245][python] Fix the problem caused by multiple jobs sharing the loopback mode address stored in the environment variable in PyFlink

This closes #17239.
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index 3dbd3e8..4c41721 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -59,6 +59,7 @@
     def __init__(self, j_stream_execution_environment, serializer=PickleSerializer()):
         self._j_stream_execution_environment = j_stream_execution_environment
         self.serializer = serializer
+        self._open()
 
     def get_config(self) -> ExecutionConfig:
         """
@@ -888,20 +889,33 @@
             -> JavaObject:
         gateway = get_gateway()
         JPythonConfigUtil = gateway.jvm.org.apache.flink.python.util.PythonConfigUtil
+
+        JPythonConfigUtil.configPythonOperator(self._j_stream_execution_environment)
+
+        gateway.jvm.org.apache.flink.python.chain.PythonOperatorChainingOptimizer.apply(
+            self._j_stream_execution_environment)
+
+        JPythonConfigUtil.setPartitionCustomOperatorNumPartitions(
+            get_field_value(self._j_stream_execution_environment, "transformations"))
+
+        j_stream_graph = self._j_stream_execution_environment.getStreamGraph(clear_transformations)
+        if job_name is not None:
+            j_stream_graph.setJobName(job_name)
+        return j_stream_graph
+
+    def _open(self):
         # start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster
         j_configuration = get_j_env_configuration(self._j_stream_execution_environment)
 
         def startup_loopback_server():
+            from pyflink.common import Configuration
             from pyflink.fn_execution.beam.beam_worker_pool_service import \
                 BeamFnLoopbackWorkerPoolServicer
-            jvm = gateway.jvm
-            j_env = jvm.System.getenv()
-            get_field_value(j_env, "m").put(
-                'PYFLINK_LOOPBACK_SERVER_ADDRESS', BeamFnLoopbackWorkerPoolServicer().start())
+            config = Configuration(j_configuration=j_configuration)
+            config.set_string(
+                "PYFLINK_LOOPBACK_SERVER_ADDRESS", BeamFnLoopbackWorkerPoolServicer().start())
 
-        python_worker_execution_mode = None
-        if hasattr(self, "_python_worker_execution_mode"):
-            python_worker_execution_mode = getattr(self, "_python_worker_execution_mode")
+        python_worker_execution_mode = os.environ.get('_python_worker_execution_mode')
 
         if python_worker_execution_mode is None:
             if is_local_deployment(j_configuration):
@@ -917,19 +931,6 @@
                 "It only supports to execute the Python worker in 'loopback' mode and 'process' "
                 "mode, unknown mode '%s' is configured" % python_worker_execution_mode)
 
-        JPythonConfigUtil.configPythonOperator(self._j_stream_execution_environment)
-
-        gateway.jvm.org.apache.flink.python.chain.PythonOperatorChainingOptimizer.apply(
-            self._j_stream_execution_environment)
-
-        JPythonConfigUtil.setPartitionCustomOperatorNumPartitions(
-            get_field_value(self._j_stream_execution_environment, "transformations"))
-
-        j_stream_graph = self._j_stream_execution_environment.getStreamGraph(clear_transformations)
-        if job_name is not None:
-            j_stream_graph.setJobName(job_name)
-        return j_stream_graph
-
     def is_unaligned_checkpoints_enabled(self):
         """
         Returns whether Unaligned Checkpoints are enabled.
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index f3d95a3..283cfa3 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -49,16 +49,12 @@
 class StreamExecutionEnvironmentTests(PyFlinkTestCase):
 
     def setUp(self):
-        self.env = self.create_new_env()
+        os.environ['_python_worker_execution_mode'] = "loopback"
+        self.env = StreamExecutionEnvironment.get_execution_environment()
+        os.environ['_python_worker_execution_mode'] = "process"
+        self.env.set_parallelism(2)
         self.test_sink = DataStreamTestSinkFunction()
 
-    @staticmethod
-    def create_new_env(execution_mode='process'):
-        env = StreamExecutionEnvironment.get_execution_environment()
-        env.set_parallelism(2)
-        env._execution_mode = execution_mode
-        return env
-
     def test_get_config(self):
         execution_config = self.env.get_config()
 
@@ -358,7 +354,7 @@
 
     def test_add_python_file(self):
         import uuid
-        env = self.create_new_env("loopback")
+        env = self.env
         python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
         os.mkdir(python_file_dir)
         python_file_path = os.path.join(python_file_dir, "test_dep1.py")
@@ -409,7 +405,7 @@
 
     def test_add_python_file_2(self):
         import uuid
-        env = self.create_new_env("loopback")
+        env = self.env
         python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
         os.mkdir(python_file_dir)
         python_file_path = os.path.join(python_file_dir, "test_dep1.py")
@@ -477,7 +473,7 @@
     def test_set_requirements_with_cached_directory(self):
         import uuid
         tmp_dir = self.tempdir
-        env = self.create_new_env("loopback")
+        env = self.env
         requirements_txt_path = os.path.join(tmp_dir, "requirements_txt_" + str(uuid.uuid4()))
         with open(requirements_txt_path, 'w') as f:
             f.write("python-package1==0.0.0")
@@ -523,7 +519,7 @@
         import uuid
         import shutil
         tmp_dir = self.tempdir
-        env = self.create_new_env("loopback")
+        env = self.env
         archive_dir_path = os.path.join(tmp_dir, "archive_" + str(uuid.uuid4()))
         os.mkdir(archive_dir_path)
         with open(os.path.join(archive_dir_path, "data.txt"), 'w') as f:
@@ -550,7 +546,7 @@
         import sys
         python_exec = sys.executable
         tmp_dir = self.tempdir
-        env = self.create_new_env("loopback")
+        env = self.env
         python_exec_link_path = os.path.join(tmp_dir, "py_exec")
         os.symlink(python_exec, python_exec_link_path)
         env.set_python_executable(python_exec_link_path)
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 1e10a0b..9562fcc 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -48,7 +48,7 @@
 from pyflink.table.utils import to_expression_jarray
 from pyflink.util import java_utils
 from pyflink.util.java_utils import get_j_env_configuration, is_local_deployment, load_java_class, \
-    to_j_explain_detail_arr, to_jarray, get_field, get_field_value
+    to_j_explain_detail_arr, to_jarray, get_field
 
 __all__ = [
     'StreamTableEnvironment',
@@ -96,6 +96,7 @@
         # python.executable.
         self._set_python_executable_for_local_executor()
         self._config_chaining_optimization()
+        self._open()
 
     @staticmethod
     def create(environment_settings: EnvironmentSettings) -> 'TableEnvironment':
@@ -1747,33 +1748,6 @@
         self._add_jars_to_j_env_config(jars_key)
         self._add_jars_to_j_env_config(classpaths_key)
 
-        # start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster
-        def startup_loopback_server():
-            from pyflink.fn_execution.beam.beam_worker_pool_service import \
-                BeamFnLoopbackWorkerPoolServicer
-
-            j_env = jvm.System.getenv()
-            get_field_value(j_env, "m").put(
-                'PYFLINK_LOOPBACK_SERVER_ADDRESS', BeamFnLoopbackWorkerPoolServicer().start())
-
-        python_worker_execution_mode = None
-        if hasattr(self, "_python_worker_execution_mode"):
-            python_worker_execution_mode = getattr(self, "_python_worker_execution_mode")
-
-        if python_worker_execution_mode is None:
-            if is_local_deployment(get_j_env_configuration(self._get_j_env())):
-                startup_loopback_server()
-        elif python_worker_execution_mode == 'loopback':
-            if is_local_deployment(get_j_env_configuration(self._get_j_env())):
-                startup_loopback_server()
-            else:
-                raise ValueError("Loopback mode is enabled, however the job wasn't configured to "
-                                 "run in local deployment mode")
-        elif python_worker_execution_mode != 'process':
-            raise ValueError(
-                "It only supports to execute the Python worker in 'loopback' mode and 'process' "
-                "mode, unknown mode '%s' is configured" % python_worker_execution_mode)
-
     def _wrap_aggregate_function_if_needed(self, function) -> UserDefinedFunctionWrapper:
         if isinstance(function, AggregateFunction):
             function = udaf(function,
@@ -1794,6 +1768,34 @@
         exec_env_field.set(self._j_tenv,
                            JChainingOptimizingExecutor(exec_env_field.get(self._j_tenv)))
 
+    def _open(self):
+        # start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster
+        def startup_loopback_server():
+            from pyflink.common import Configuration
+            from pyflink.fn_execution.beam.beam_worker_pool_service import \
+                BeamFnLoopbackWorkerPoolServicer
+
+            j_configuration = get_j_env_configuration(self._get_j_env())
+            config = Configuration(j_configuration=j_configuration)
+            config.set_string(
+                "PYFLINK_LOOPBACK_SERVER_ADDRESS", BeamFnLoopbackWorkerPoolServicer().start())
+
+        python_worker_execution_mode = os.environ.get('_python_worker_execution_mode')
+
+        if python_worker_execution_mode is None:
+            if is_local_deployment(get_j_env_configuration(self._get_j_env())):
+                startup_loopback_server()
+        elif python_worker_execution_mode == 'loopback':
+            if is_local_deployment(get_j_env_configuration(self._get_j_env())):
+                startup_loopback_server()
+            else:
+                raise ValueError("Loopback mode is enabled, however the job wasn't configured to "
+                                 "run in local deployment mode")
+        elif python_worker_execution_mode != 'process':
+            raise ValueError(
+                "It only supports to execute the Python worker in 'loopback' mode and 'process' "
+                "mode, unknown mode '%s' is configured" % python_worker_execution_mode)
+
 
 class StreamTableEnvironment(TableEnvironment):
 
diff --git a/flink-python/pyflink/table/tests/test_dependency.py b/flink-python/pyflink/table/tests/test_dependency.py
index 5a8e7ab..8eeaee4 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -74,8 +74,13 @@
 
     def setUp(self):
         super(StreamDependencyTests, self).setUp()
-        self.st_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
-        self.st_env._execution_mode = "loopback"
+        origin_execution_mode = os.environ['_python_worker_execution_mode']
+        os.environ['_python_worker_execution_mode'] = "loopback"
+        try:
+            self.st_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
+        finally:
+            if origin_execution_mode is not None:
+                os.environ['_python_worker_execution_mode'] = origin_execution_mode
 
     def test_set_requirements_without_cached_directory(self):
         requirements_txt_path = os.path.join(self.tempdir, str(uuid.uuid4()))
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index 0b36750..3849d0c 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -83,6 +83,7 @@
         cls.tempdir = tempfile.mkdtemp()
 
         os.environ["FLINK_TESTING"] = "1"
+        os.environ['_python_worker_execution_mode'] = "process"
         _find_flink_home()
 
         logging.info("Using %s as FLINK_HOME...", os.environ["FLINK_HOME"])
@@ -90,6 +91,7 @@
     @classmethod
     def tearDownClass(cls):
         shutil.rmtree(cls.tempdir, ignore_errors=True)
+        del os.environ['_python_worker_execution_mode']
 
     @classmethod
     def assert_equals(cls, actual, expected):
@@ -136,7 +138,6 @@
         self.t_env.get_config().get_configuration().set_string("parallelism.default", "2")
         self.t_env.get_config().get_configuration().set_string(
             "python.fn-execution.bundle.size", "1")
-        self.t_env._execution_mode = "process"
 
 
 class PyFlinkBatchTableTestCase(PyFlinkTestCase):
@@ -150,7 +151,6 @@
         self.t_env.get_config().get_configuration().set_string("parallelism.default", "2")
         self.t_env.get_config().get_configuration().set_string(
             "python.fn-execution.bundle.size", "1")
-        self.t_env._execution_mode = "process"
 
 
 class PyFlinkStreamingTestCase(PyFlinkTestCase):
@@ -163,7 +163,6 @@
         self.env = StreamExecutionEnvironment.get_execution_environment()
         self.env.set_parallelism(2)
         self.env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
-        self.env._execution_mode = "process"
 
 
 class PyFlinkBatchTestCase(PyFlinkTestCase):
@@ -176,7 +175,6 @@
         self.env = StreamExecutionEnvironment.get_execution_environment()
         self.env.set_parallelism(2)
         self.env.set_runtime_mode(RuntimeExecutionMode.BATCH)
-        self.env._execution_mode = "process"
 
 
 class PythonAPICompletenessTestCase(object):