Revert "Populate top-level display data in yaml main. (#28512)"

This reverts commit 2bbb3485c78ede3c4acddd462158814157f2b46f.
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 14177cd..042b483 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -88,7 +88,6 @@
from apache_beam.transforms import ParDo
from apache_beam.transforms import ptransform
from apache_beam.transforms.display import DisplayData
-from apache_beam.transforms.display import HasDisplayData
from apache_beam.transforms.resources import merge_resource_hints
from apache_beam.transforms.resources import resource_hints_from_options
from apache_beam.transforms.sideinputs import get_sideinput_index
@@ -109,7 +108,7 @@
__all__ = ['Pipeline', 'PTransformOverride']
-class Pipeline(HasDisplayData):
+class Pipeline(object):
"""A pipeline object that manages a DAG of
:class:`~apache_beam.pvalue.PValue` s and their
:class:`~apache_beam.transforms.ptransform.PTransform` s.
@@ -134,12 +133,9 @@
common_urns.primitives.IMPULSE.urn,
])
- def __init__(
- self,
- runner: Optional[Union[str, PipelineRunner]] = None,
- options: Optional[PipelineOptions] = None,
- argv: Optional[List[str]] = None,
- display_data: Optional[Dict[str, Any]] = None):
+ def __init__(self, runner=None, options=None, argv=None):
+ # type: (Optional[Union[str, PipelineRunner]], Optional[PipelineOptions], Optional[List[str]]) -> None
+
"""Initialize a pipeline object.
Args:
@@ -155,8 +151,6 @@
to be used for building a
:class:`~apache_beam.options.pipeline_options.PipelineOptions` object.
This will only be used if argument **options** is :data:`None`.
- display_data (Dict[str: Any]): a dictionary of static data associated
- with this pipeline that can be displayed when it runs.
Raises:
ValueError: if either the runner or options argument is not
@@ -239,11 +233,6 @@
# Records whether this pipeline contains any external transforms.
self.contains_external_transforms = False
- self._display_data = display_data or {}
-
- def display_data(self):
- # type: () -> Dict[str, Any]
- return self._display_data
@property # type: ignore[misc] # decorated property not supported
def options(self):
@@ -925,8 +914,7 @@
proto = beam_runner_api_pb2.Pipeline(
root_transform_ids=[root_transform_id],
components=context.to_runner_api(),
- requirements=context.requirements(),
- display_data=DisplayData('', self._display_data).to_proto())
+ requirements=context.requirements())
proto.components.transforms[root_transform_id].unique_name = (
root_transform_id)
self.merge_compatible_environments(proto)
@@ -982,11 +970,7 @@
# type: (...) -> Pipeline
"""For internal use only; no backwards-compatibility guarantees."""
- p = Pipeline(
- runner=runner,
- options=options,
- display_data={str(ix): d
- for ix, d in enumerate(proto.display_data)})
+ p = Pipeline(runner=runner, options=options)
from apache_beam.runners import pipeline_context
context = pipeline_context.PipelineContext(
proto.components, requirements=proto.requirements)
diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py
index 0d1dd55..b52a8fd 100644
--- a/sdks/python/apache_beam/transforms/display.py
+++ b/sdks/python/apache_beam/transforms/display.py
@@ -45,7 +45,6 @@
from datetime import timedelta
from typing import TYPE_CHECKING
from typing import List
-from typing import Union
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2
@@ -102,8 +101,7 @@
):
# type: (...) -> None
self.namespace = namespace
- self.items = [
- ] # type: List[Union[DisplayDataItem, beam_runner_api_pb2.DisplayData]]
+ self.items = [] # type: List[DisplayDataItem]
self._populate_items(display_data_dict)
def _populate_items(self, display_data_dict):
@@ -114,31 +112,26 @@
subcomponent_display_data = DisplayData(
element._get_display_data_namespace(), element.display_data())
self.items += subcomponent_display_data.items
+ continue
- elif isinstance(element, DisplayDataItem):
+ if isinstance(element, DisplayDataItem):
if element.should_drop():
continue
element.key = key
element.namespace = self.namespace
self.items.append(element)
+ continue
- elif isinstance(element, beam_runner_api_pb2.DisplayData):
- self.items.append(element)
-
- else:
- # If it's not a HasDisplayData element,
- # nor a dictionary, then it's a simple value
- self.items.append(
- DisplayDataItem(element, namespace=self.namespace, key=key))
+ # If it's not a HasDisplayData element,
+ # nor a dictionary, then it's a simple value
+ self.items.append(
+ DisplayDataItem(element, namespace=self.namespace, key=key))
def to_proto(self):
# type: (...) -> List[beam_runner_api_pb2.DisplayData]
"""Returns a List of Beam proto representation of Display data."""
def create_payload(dd):
- if isinstance(dd, beam_runner_api_pb2.DisplayData):
- return dd
-
display_data_dict = None
try:
display_data_dict = dd.get_dict()
diff --git a/sdks/python/apache_beam/yaml/main.py b/sdks/python/apache_beam/yaml/main.py
index e2ec8df..eb0695f 100644
--- a/sdks/python/apache_beam/yaml/main.py
+++ b/sdks/python/apache_beam/yaml/main.py
@@ -51,22 +51,19 @@
raise ValueError(
"Exactly one of pipeline_spec or pipeline_spec_file must be set.")
- return pipeline_yaml
+ return yaml.load(pipeline_yaml, Loader=yaml_transform.SafeLineLoader)
def run(argv=None):
yaml_transform._LOGGER.setLevel('INFO')
known_args, pipeline_args = _configure_parser(argv)
- pipeline_yaml = _pipeline_spec_from_args(known_args)
- pipeline_spec = yaml.load(pipeline_yaml, Loader=yaml_transform.SafeLineLoader)
+ pipeline_spec = _pipeline_spec_from_args(known_args)
- with beam.Pipeline( # linebreak for better yapf formatting
- options=beam.options.pipeline_options.PipelineOptions(
- pipeline_args,
- pickle_library='cloudpickle',
- **yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get(
- 'options', {}))),
- display_data={'yaml': pipeline_yaml}) as p:
+ with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+ pipeline_args,
+ pickle_library='cloudpickle',
+ **yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get(
+ 'options', {})))) as p:
print("Building pipeline...")
yaml_transform.expand_pipeline(p, pipeline_spec)
print("Running pipeline...")