[FLINK-24137][python] Fix the issue that tests planned to run in process mode was actually executed in loopback mode

This closes #17214.
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index 1983945..4f6a4ee 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -58,7 +58,6 @@
 
     def __init__(self, j_stream_execution_environment, serializer=PickleSerializer()):
         self._j_stream_execution_environment = j_stream_execution_environment
-        self._remote_mode = False
         self.serializer = serializer
 
     def get_config(self) -> ExecutionConfig:
@@ -891,14 +890,15 @@
         JPythonConfigUtil = gateway.jvm.org.apache.flink.python.util.PythonConfigUtil
         # start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster
         j_configuration = get_j_env_configuration(self._j_stream_execution_environment)
-        if not self._remote_mode and is_local_deployment(j_configuration):
+
+        def startup_loopback_server():
             jvm = gateway.jvm
             env_config = JPythonConfigUtil.getEnvironmentConfig(
                 self._j_stream_execution_environment)
             parallelism = self.get_parallelism()
             if parallelism > 1 and env_config.containsKey(jvm.PythonOptions.PYTHON_ARCHIVES.key()):
                 import logging
-                logging.warning("Lookback mode is disabled as python archives are used and the "
+                logging.warning("Loopback mode is disabled as python archives are used and the "
                                 "parallelism of the job is greater than 1. The Python user-defined "
                                 "functions will be executed in an independent Python process.")
             else:
@@ -908,6 +908,24 @@
                 get_field_value(j_env, "m").put(
                     'PYFLINK_LOOPBACK_SERVER_ADDRESS', BeamFnLoopbackWorkerPoolServicer().start())
 
+        python_worker_execution_mode = None
+        if hasattr(self, "_python_worker_execution_mode"):
+            python_worker_execution_mode = getattr(self, "_python_worker_execution_mode")
+
+        if python_worker_execution_mode is None:
+            if is_local_deployment(j_configuration):
+                startup_loopback_server()
+        elif python_worker_execution_mode == 'loopback':
+            if is_local_deployment(j_configuration):
+                startup_loopback_server()
+            else:
+                raise ValueError("Loopback mode is enabled, however the job wasn't configured to "
+                                 "run in local deployment mode")
+        elif python_worker_execution_mode != 'process':
+            raise ValueError(
+                "It only supports to execute the Python worker in 'loopback' mode and 'process' "
+                "mode, unknown mode '%s' is configured" % python_worker_execution_mode)
+
         JPythonConfigUtil.configPythonOperator(self._j_stream_execution_environment)
 
         gateway.jvm.org.apache.flink.python.chain.PythonOperatorChainingOptimizer.apply(
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index cb7189e..f3d95a3 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -50,13 +50,13 @@
 
     def setUp(self):
         self.env = self.create_new_env()
-        self.env._remote_mode = True
         self.test_sink = DataStreamTestSinkFunction()
 
     @staticmethod
-    def create_new_env():
+    def create_new_env(execution_mode='process'):
         env = StreamExecutionEnvironment.get_execution_environment()
         env.set_parallelism(2)
+        env._execution_mode = execution_mode
         return env
 
     def test_get_config(self):
@@ -358,7 +358,7 @@
 
     def test_add_python_file(self):
         import uuid
-        env = self.create_new_env()
+        env = self.create_new_env("loopback")
         python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
         os.mkdir(python_file_dir)
         python_file_path = os.path.join(python_file_dir, "test_dep1.py")
@@ -409,7 +409,7 @@
 
     def test_add_python_file_2(self):
         import uuid
-        env = self.create_new_env()
+        env = self.create_new_env("loopback")
         python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
         os.mkdir(python_file_dir)
         python_file_path = os.path.join(python_file_dir, "test_dep1.py")
@@ -477,7 +477,7 @@
     def test_set_requirements_with_cached_directory(self):
         import uuid
         tmp_dir = self.tempdir
-        env = self.create_new_env()
+        env = self.create_new_env("loopback")
         requirements_txt_path = os.path.join(tmp_dir, "requirements_txt_" + str(uuid.uuid4()))
         with open(requirements_txt_path, 'w') as f:
             f.write("python-package1==0.0.0")
@@ -523,7 +523,7 @@
         import uuid
         import shutil
         tmp_dir = self.tempdir
-        env = self.create_new_env()
+        env = self.create_new_env("loopback")
         archive_dir_path = os.path.join(tmp_dir, "archive_" + str(uuid.uuid4()))
         os.mkdir(archive_dir_path)
         with open(os.path.join(archive_dir_path, "data.txt"), 'w') as f:
@@ -550,7 +550,7 @@
         import sys
         python_exec = sys.executable
         tmp_dir = self.tempdir
-        env = self.create_new_env()
+        env = self.create_new_env("loopback")
         python_exec_link_path = os.path.join(tmp_dir, "py_exec")
         os.symlink(python_exec, python_exec_link_path)
         env.set_python_executable(python_exec_link_path)
@@ -614,12 +614,11 @@
 
     def test_generate_stream_graph_with_dependencies(self):
         python_file_dir = os.path.join(self.tempdir, "python_file_dir_" + str(uuid.uuid4()))
-        env = self.create_new_env()
-        env._remote_mode = True
         os.mkdir(python_file_dir)
         python_file_path = os.path.join(python_file_dir, "test_stream_dependency_manage_lib.py")
         with open(python_file_path, 'w') as f:
             f.write("def add_two(a):\n    return a + 2")
+        env = self.env
         env.add_python_file(python_file_path)
 
         def plus_two_map(value):
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 98a1f7b..9db27be 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -91,7 +91,6 @@
     def __init__(self, j_tenv, serializer=PickleSerializer()):
         self._j_tenv = j_tenv
         self._serializer = serializer
-        self._remote_mode = False
         # When running in MiniCluster, launch the Python UDF worker using the Python executable
         # specified by sys.executable if users have not specified it explicitly via configuration
         # python.executable.
@@ -1747,9 +1746,9 @@
         classpaths_key = jvm.org.apache.flink.configuration.PipelineOptions.CLASSPATHS.key()
         self._add_jars_to_j_env_config(jars_key)
         self._add_jars_to_j_env_config(classpaths_key)
+
         # start BeamFnLoopbackWorkerPoolServicer when executed in MiniCluster
-        if not self._remote_mode and \
-                is_local_deployment(get_j_env_configuration(self._get_j_env())):
+        def startup_loopback_server():
             from pyflink.common import Configuration
             _j_config = jvm.org.apache.flink.python.util.PythonConfigUtil.getMergedConfig(
                 self._get_j_env(), self.get_config()._j_table_config)
@@ -1769,6 +1768,24 @@
                 get_field_value(j_env, "m").put(
                     'PYFLINK_LOOPBACK_SERVER_ADDRESS', BeamFnLoopbackWorkerPoolServicer().start())
 
+        python_worker_execution_mode = None
+        if hasattr(self, "_python_worker_execution_mode"):
+            python_worker_execution_mode = getattr(self, "_python_worker_execution_mode")
+
+        if python_worker_execution_mode is None:
+            if is_local_deployment(get_j_env_configuration(self._get_j_env())):
+                startup_loopback_server()
+        elif python_worker_execution_mode == 'loopback':
+            if is_local_deployment(get_j_env_configuration(self._get_j_env())):
+                startup_loopback_server()
+            else:
+                raise ValueError("Loopback mode is enabled, however the job wasn't configured to "
+                                 "run in local deployment mode")
+        elif python_worker_execution_mode != 'process':
+            raise ValueError(
+                "It only supports to execute the Python worker in 'loopback' mode and 'process' "
+                "mode, unknown mode '%s' is configured" % python_worker_execution_mode)
+
     def _wrap_aggregate_function_if_needed(self, function) -> UserDefinedFunctionWrapper:
         if isinstance(function, AggregateFunction):
             function = udaf(function,
diff --git a/flink-python/pyflink/table/tests/test_dependency.py b/flink-python/pyflink/table/tests/test_dependency.py
index b61740e..27ebf6b 100644
--- a/flink-python/pyflink/table/tests/test_dependency.py
+++ b/flink-python/pyflink/table/tests/test_dependency.py
@@ -22,7 +22,7 @@
 import uuid
 
 from pyflink.pyflink_gateway_server import on_windows
-from pyflink.table import DataTypes
+from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
 from pyflink.table import expressions as expr
 from pyflink.table.udf import udf
 from pyflink.testing import source_sink_utils
@@ -74,26 +74,27 @@
 
     def setUp(self):
         super(StreamDependencyTests, self).setUp()
-        self.t_env._remote_mode = False
+        self.st_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
+        self.st_env._execution_mode = "loopback"
 
     def test_set_requirements_without_cached_directory(self):
         requirements_txt_path = os.path.join(self.tempdir, str(uuid.uuid4()))
         with open(requirements_txt_path, 'w') as f:
             f.write("cloudpickle==1.2.2")
-        self.t_env.set_python_requirements(requirements_txt_path)
+        self.st_env.set_python_requirements(requirements_txt_path)
 
         def check_requirements(i):
             import cloudpickle  # noqa # pylint: disable=unused-import
             assert '_PYTHON_REQUIREMENTS_INSTALL_DIR' in os.environ
             return i
 
-        self.t_env.create_temporary_system_function("check_requirements",
-                                                    udf(check_requirements, DataTypes.BIGINT(),
-                                                        DataTypes.BIGINT()))
+        self.st_env.create_temporary_system_function(
+            "check_requirements",
+            udf(check_requirements, DataTypes.BIGINT(), DataTypes.BIGINT()))
         table_sink = source_sink_utils.TestAppendSink(
             ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
-        t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
+        self.st_env.register_table_sink("Results", table_sink)
+        t = self.st_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
         t.select(expr.call('check_requirements', t.a), t.a).execute_insert("Results").wait()
 
         actual = source_sink_utils.results()
@@ -127,19 +128,19 @@
                 "7or3/88i0H/tfBFW7s/s/avRInQH06ieEy7tDrQeYHUdRN7wP+n/vf62LOH/pld7f9xz7a5Pfufedy0oP"
                 "86iJI8KxStAq6yLC4JWdbbVbWRikR2z1ZGytk5vauW3QdnBFE6XqwmykazCesAAAAAAAAAAAAAAAAAAAA"
                 "AAAAAAAAAAAAAAOBw/AJw5CHBAFAAAA=="))
-        self.t_env.set_python_requirements(requirements_txt_path, requirements_dir_path)
+        self.st_env.set_python_requirements(requirements_txt_path, requirements_dir_path)
 
         def add_one(i):
             from python_package1 import plus
             return plus(i, 1)
 
-        self.t_env.create_temporary_system_function("add_one",
-                                                    udf(add_one, DataTypes.BIGINT(),
-                                                        DataTypes.BIGINT()))
+        self.st_env.create_temporary_system_function(
+            "add_one",
+            udf(add_one, DataTypes.BIGINT(), DataTypes.BIGINT()))
         table_sink = source_sink_utils.TestAppendSink(
             ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
-        t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
+        self.st_env.register_table_sink("Results", table_sink)
+        t = self.st_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
         t.select(expr.call('add_one', t.a), t.a).execute_insert("Results").wait()
 
         actual = source_sink_utils.results()
@@ -163,7 +164,6 @@
         self.t_env.create_temporary_system_function("add_from_file",
                                                     udf(add_from_file, DataTypes.BIGINT(),
                                                         DataTypes.BIGINT()))
-        self.t_env._remote_mode = True
         table_sink = source_sink_utils.TestAppendSink(
             ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
         self.t_env.register_table_sink("Results", table_sink)
@@ -179,31 +179,31 @@
         tmp_dir = self.tempdir
         python_exec_link_path = os.path.join(tmp_dir, "py_exec")
         os.symlink(python_exec, python_exec_link_path)
-        self.t_env.get_config().set_python_executable(python_exec_link_path)
+        self.st_env.get_config().set_python_executable(python_exec_link_path)
 
         def check_python_exec(i):
             import os
             assert os.environ["python"] == python_exec_link_path
             return i
 
-        self.t_env.create_temporary_system_function("check_python_exec",
-                                                    udf(check_python_exec, DataTypes.BIGINT(),
-                                                        DataTypes.BIGINT()))
+        self.st_env.create_temporary_system_function(
+            "check_python_exec",
+            udf(check_python_exec, DataTypes.BIGINT(), DataTypes.BIGINT()))
 
         def check_pyflink_gateway_disabled(i):
             from pyflink.java_gateway import get_gateway
             get_gateway()
             return i
 
-        self.t_env.create_temporary_system_function(
+        self.st_env.create_temporary_system_function(
             "check_pyflink_gateway_disabled",
             udf(check_pyflink_gateway_disabled, DataTypes.BIGINT(),
                 DataTypes.BIGINT()))
 
         table_sink = source_sink_utils.TestAppendSink(
             ['a', 'b'], [DataTypes.BIGINT(), DataTypes.BIGINT()])
-        self.t_env.register_table_sink("Results", table_sink)
-        t = self.t_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
+        self.st_env.register_table_sink("Results", table_sink)
+        t = self.st_env.from_elements([(1, 2), (2, 5), (3, 1)], ['a', 'b'])
         t.select(
             expr.call('check_python_exec', t.a),
             expr.call('check_pyflink_gateway_disabled', t.a)) \
diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py
index 9f8ff80..0b36750 100644
--- a/flink-python/pyflink/testing/test_case_utils.py
+++ b/flink-python/pyflink/testing/test_case_utils.py
@@ -136,7 +136,7 @@
         self.t_env.get_config().get_configuration().set_string("parallelism.default", "2")
         self.t_env.get_config().get_configuration().set_string(
             "python.fn-execution.bundle.size", "1")
-        self.t_env._remote_mode = True
+        self.t_env._execution_mode = "process"
 
 
 class PyFlinkBatchTableTestCase(PyFlinkTestCase):
@@ -150,7 +150,7 @@
         self.t_env.get_config().get_configuration().set_string("parallelism.default", "2")
         self.t_env.get_config().get_configuration().set_string(
             "python.fn-execution.bundle.size", "1")
-        self.t_env._remote_mode = True
+        self.t_env._execution_mode = "process"
 
 
 class PyFlinkStreamingTestCase(PyFlinkTestCase):
@@ -163,7 +163,7 @@
         self.env = StreamExecutionEnvironment.get_execution_environment()
         self.env.set_parallelism(2)
         self.env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
-        self.env._remote_mode = True
+        self.env._execution_mode = "process"
 
 
 class PyFlinkBatchTestCase(PyFlinkTestCase):
@@ -176,7 +176,7 @@
         self.env = StreamExecutionEnvironment.get_execution_environment()
         self.env.set_parallelism(2)
         self.env.set_runtime_mode(RuntimeExecutionMode.BATCH)
-        self.env._remote_mode = True
+        self.env._execution_mode = "process"
 
 
 class PythonAPICompletenessTestCase(object):