[FLINK-24199][python] Expose StreamExecutionEnvironment#configure in Python API
This closes #17206.
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index e3b85f4..1983945 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -22,7 +22,7 @@
from py4j.java_gateway import JavaObject
-from pyflink.common import WatermarkStrategy
+from pyflink.common import Configuration, WatermarkStrategy
from pyflink.common.execution_config import ExecutionConfig
from pyflink.common.job_client import JobClient
from pyflink.common.job_execution_result import JobExecutionResult
@@ -462,6 +462,25 @@
j_characteristic = self._j_stream_execution_environment.getStreamTimeCharacteristic()
return TimeCharacteristic._from_j_time_characteristic(j_characteristic)
+ def configure(self, configuration: Configuration):
+ """
+ Sets all relevant options contained in the :class:`~pyflink.common.Configuration`. such as
+ e.g. `pipeline.time-characteristic`. It will reconfigure
+ :class:`~pyflink.datastream.StreamExecutionEnvironment`,
+ :class:`~pyflink.common.ExecutionConfig` and :class:`~pyflink.datastream.CheckpointConfig`.
+
+ It will change the value of a setting only if a corresponding option was set in the
+ `configuration`. If a key is not present, the current value of a field will remain
+ untouched.
+
+ :param configuration: a configuration to read the values from.
+
+ .. versionadded:: 1.15.0
+ """
+ self._j_stream_execution_environment.configure(configuration._j_configuration,
+ get_gateway().jvm.Thread.currentThread()
+ .getContextClassLoader())
+
def add_python_file(self, file_path: str):
"""
Adds a python dependency which could be python files, python packages or
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 7bccda8..cb7189e 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -26,7 +26,7 @@
import unittest
import uuid
-from pyflink.common import ExecutionConfig, RestartStrategies
+from pyflink.common import Configuration, ExecutionConfig, RestartStrategies
from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import (StreamExecutionEnvironment, CheckpointConfig,
@@ -195,6 +195,21 @@
self.assertEqual(time_characteristic, TimeCharacteristic.ProcessingTime)
+ def test_configure(self):
+ configuration = Configuration()
+ configuration.set_string('pipeline.operator-chaining', 'false')
+ configuration.set_string('pipeline.time-characteristic', 'IngestionTime')
+ configuration.set_string('execution.buffer-timeout', '1 min')
+ configuration.set_string('execution.checkpointing.timeout', '12000')
+ configuration.set_string('state.backend', 'jobmanager')
+ self.env.configure(configuration)
+ self.assertEqual(self.env.is_chaining_enabled(), False)
+ self.assertEqual(self.env.get_stream_time_characteristic(),
+ TimeCharacteristic.IngestionTime)
+ self.assertEqual(self.env.get_buffer_timeout(), 60000)
+ self.assertEqual(self.env.get_checkpoint_config().get_checkpoint_timeout(), 12000)
+ self.assertTrue(isinstance(self.env.get_state_backend(), MemoryStateBackend))
+
@unittest.skip("Python API does not support DataStream now. refactor this test later")
def test_get_execution_plan(self):
tmp_dir = tempfile.gettempdir()
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
index d080ed9..755a2ea 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
@@ -47,7 +47,7 @@
'readFileStream', 'isForceCheckpointing', 'readFile', 'clean',
'createInput', 'createLocalEnvironmentWithWebUI', 'fromCollection',
'socketTextStream', 'initializeContextEnvironment', 'readTextFile',
- 'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener',
+ 'setNumberOfExecutionRetries', 'executeAsync', 'registerJobListener',
'clearJobListeners', 'getJobListeners', 'fromSequence', 'getConfiguration',
'generateStreamGraph'}