Revert "Populate top-level display data in yaml main. (#28512)"

This reverts commit 2bbb3485c78ede3c4acddd462158814157f2b46f.
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 14177cd..042b483 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -88,7 +88,6 @@
 from apache_beam.transforms import ParDo
 from apache_beam.transforms import ptransform
 from apache_beam.transforms.display import DisplayData
-from apache_beam.transforms.display import HasDisplayData
 from apache_beam.transforms.resources import merge_resource_hints
 from apache_beam.transforms.resources import resource_hints_from_options
 from apache_beam.transforms.sideinputs import get_sideinput_index
@@ -109,7 +108,7 @@
 __all__ = ['Pipeline', 'PTransformOverride']
 
 
-class Pipeline(HasDisplayData):
+class Pipeline(object):
   """A pipeline object that manages a DAG of
   :class:`~apache_beam.pvalue.PValue` s and their
   :class:`~apache_beam.transforms.ptransform.PTransform` s.
@@ -134,12 +133,9 @@
         common_urns.primitives.IMPULSE.urn,
     ])
 
-  def __init__(
-      self,
-      runner: Optional[Union[str, PipelineRunner]] = None,
-      options: Optional[PipelineOptions] = None,
-      argv: Optional[List[str]] = None,
-      display_data: Optional[Dict[str, Any]] = None):
+  def __init__(self, runner=None, options=None, argv=None):
+    # type: (Optional[Union[str, PipelineRunner]], Optional[PipelineOptions], Optional[List[str]]) -> None
+
     """Initialize a pipeline object.
 
     Args:
@@ -155,8 +151,6 @@
         to be used for building a
         :class:`~apache_beam.options.pipeline_options.PipelineOptions` object.
         This will only be used if argument **options** is :data:`None`.
-      display_data (Dict[str: Any]): a dictionary of static data associated
-        with this pipeline that can be displayed when it runs.
 
     Raises:
       ValueError: if either the runner or options argument is not
@@ -239,11 +233,6 @@
     # Records whether this pipeline contains any external transforms.
     self.contains_external_transforms = False
 
-    self._display_data = display_data or {}
-
-  def display_data(self):
-    # type: () -> Dict[str, Any]
-    return self._display_data
 
   @property  # type: ignore[misc]  # decorated property not supported
   def options(self):
@@ -925,8 +914,7 @@
     proto = beam_runner_api_pb2.Pipeline(
         root_transform_ids=[root_transform_id],
         components=context.to_runner_api(),
-        requirements=context.requirements(),
-        display_data=DisplayData('', self._display_data).to_proto())
+        requirements=context.requirements())
     proto.components.transforms[root_transform_id].unique_name = (
         root_transform_id)
     self.merge_compatible_environments(proto)
@@ -982,11 +970,7 @@
     # type: (...) -> Pipeline
 
     """For internal use only; no backwards-compatibility guarantees."""
-    p = Pipeline(
-        runner=runner,
-        options=options,
-        display_data={str(ix): d
-                      for ix, d in enumerate(proto.display_data)})
+    p = Pipeline(runner=runner, options=options)
     from apache_beam.runners import pipeline_context
     context = pipeline_context.PipelineContext(
         proto.components, requirements=proto.requirements)
diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py
index 0d1dd55..b52a8fd 100644
--- a/sdks/python/apache_beam/transforms/display.py
+++ b/sdks/python/apache_beam/transforms/display.py
@@ -45,7 +45,6 @@
 from datetime import timedelta
 from typing import TYPE_CHECKING
 from typing import List
-from typing import Union
 
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_runner_api_pb2
@@ -102,8 +101,7 @@
   ):
     # type: (...) -> None
     self.namespace = namespace
-    self.items = [
-    ]  # type: List[Union[DisplayDataItem, beam_runner_api_pb2.DisplayData]]
+    self.items = []  # type: List[DisplayDataItem]
     self._populate_items(display_data_dict)
 
   def _populate_items(self, display_data_dict):
@@ -114,31 +112,26 @@
         subcomponent_display_data = DisplayData(
             element._get_display_data_namespace(), element.display_data())
         self.items += subcomponent_display_data.items
+        continue
 
-      elif isinstance(element, DisplayDataItem):
+      if isinstance(element, DisplayDataItem):
         if element.should_drop():
           continue
         element.key = key
         element.namespace = self.namespace
         self.items.append(element)
+        continue
 
-      elif isinstance(element, beam_runner_api_pb2.DisplayData):
-        self.items.append(element)
-
-      else:
-        # If it's not a HasDisplayData element,
-        # nor a dictionary, then it's a simple value
-        self.items.append(
-            DisplayDataItem(element, namespace=self.namespace, key=key))
+      # If it's not a HasDisplayData element,
+      # nor a dictionary, then it's a simple value
+      self.items.append(
+          DisplayDataItem(element, namespace=self.namespace, key=key))
 
   def to_proto(self):
     # type: (...) -> List[beam_runner_api_pb2.DisplayData]
 
     """Returns a List of Beam proto representation of Display data."""
     def create_payload(dd):
-      if isinstance(dd, beam_runner_api_pb2.DisplayData):
-        return dd
-
       display_data_dict = None
       try:
         display_data_dict = dd.get_dict()
diff --git a/sdks/python/apache_beam/yaml/main.py b/sdks/python/apache_beam/yaml/main.py
index e2ec8df..eb0695f 100644
--- a/sdks/python/apache_beam/yaml/main.py
+++ b/sdks/python/apache_beam/yaml/main.py
@@ -51,22 +51,19 @@
     raise ValueError(
         "Exactly one of pipeline_spec or pipeline_spec_file must be set.")
 
-  return pipeline_yaml
+  return yaml.load(pipeline_yaml, Loader=yaml_transform.SafeLineLoader)
 
 
 def run(argv=None):
   yaml_transform._LOGGER.setLevel('INFO')
   known_args, pipeline_args = _configure_parser(argv)
-  pipeline_yaml = _pipeline_spec_from_args(known_args)
-  pipeline_spec = yaml.load(pipeline_yaml, Loader=yaml_transform.SafeLineLoader)
+  pipeline_spec = _pipeline_spec_from_args(known_args)
 
-  with beam.Pipeline(  # linebreak for better yapf formatting
-      options=beam.options.pipeline_options.PipelineOptions(
-          pipeline_args,
-          pickle_library='cloudpickle',
-          **yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get(
-              'options', {}))),
-      display_data={'yaml': pipeline_yaml}) as p:
+  with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+      pipeline_args,
+      pickle_library='cloudpickle',
+      **yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get(
+          'options', {})))) as p:
     print("Building pipeline...")
     yaml_transform.expand_pipeline(p, pipeline_spec)
     print("Running pipeline...")