[BEAM-9122] Add uses_keyed_state step property in python dataflow run… (#10596)
* [BEAM-9122] Add uses_keyed_state step property in python dataflow runner run_ParDo
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 69b1fb8..ada700c 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -52,6 +52,7 @@
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.portability import common_urns
from apache_beam.pvalue import AsSideInput
+from apache_beam.runners.common import DoFnSignature
from apache_beam.runners.dataflow.internal import names
from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
from apache_beam.runners.dataflow.internal.names import PropertyNames
@@ -952,6 +953,10 @@
step.add_property(PropertyNames.RESTRICTION_ENCODING,
self._get_cloud_encoding(restriction_coder))
+ if options.view_as(StandardOptions).streaming and DoFnSignature(
+ transform.dofn).is_stateful_dofn():
+ step.add_property(PropertyNames.USES_KEYED_STATE, "true")
+
@staticmethod
def _pardo_fn_data(transform_node, get_label):
transform = transform_node.transform
@@ -1129,7 +1134,6 @@
coders.registry.get_coder(transform_node.outputs[None].element_type),
coders.coders.GlobalWindowCoder())
- from apache_beam.runners.dataflow.internal import apiclient
step.encoding = self._get_cloud_encoding(coder)
step.add_property(
PropertyNames.OUTPUT_INFO,
@@ -1215,7 +1219,6 @@
# correct coder.
coder = coders.WindowedValueCoder(transform.sink.coder,
coders.coders.GlobalWindowCoder())
- from apache_beam.runners.dataflow.internal import apiclient
step.encoding = self._get_cloud_encoding(coder)
step.add_property(PropertyNames.ENCODING, step.encoding)
step.add_property(
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 7bc0295..e9b34d4 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -123,6 +123,7 @@
USE_INDEXED_FORMAT = 'use_indexed_format'
USER_FN = 'user_fn'
USER_NAME = 'user_name'
+ USES_KEYED_STATE = 'uses_keyed_state'
VALIDATE_SINK = 'validate_sink'
VALIDATE_SOURCE = 'validate_source'
VALUE = 'value'