Enable dataflow streaming engine when running runner_v2 and streaming.
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index dca3a39..ca908a6 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -529,14 +529,20 @@
if google_cloud_options.enable_streaming_engine:
debug_options.add_experiment("enable_windmill_service")
debug_options.add_experiment("enable_streaming_engine")
+ elif (apiclient._use_fnapi(options) and
+ apiclient._use_unified_worker(options) and
+ options.view_as(StandardOptions).streaming):
+ debug_options.add_experiment("enable_windmill_service")
+ debug_options.add_experiment("enable_streaming_engine")
else:
if (debug_options.lookup_experiment("enable_windmill_service") or
debug_options.lookup_experiment("enable_streaming_engine")):
raise ValueError(
"""Streaming engine both disabled and enabled:
- enable_streaming_engine flag is not set, but enable_windmill_service
+ --enable_streaming_engine flag is not set, but
+ enable_windmill_service
and/or enable_streaming_engine experiments are present.
- It is recommended you only set the enable_streaming_engine flag.""")
+ It is recommended you only set the --enable_streaming_engine flag.""")
dataflow_worker_jar = getattr(worker_options, 'dataflow_worker_jar', None)
if dataflow_worker_jar is not None: