Merge pull request #14767 from [BEAM-7819] Add missing fields to python apache_beam.io.gcp.pubsub.PubsubMessage 

[BEAM-7819] Add missing fields to python apache_beam.io.gcp.pubsub.PubsubMessage
diff --git a/CHANGES.md b/CHANGES.md
index af72444..90d0123 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -97,6 +97,7 @@
 * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * Added capability to declare resource hints in Java and Python SDKs ([BEAM-2085](https://issues.apache.org/jira/browse/BEAM-2085)).
 * Added Spanner IO Performance tests for read and write. (Python) ([BEAM-10029](https://issues.apache.org/jira/browse/BEAM-10029)).
+* Added support for accessing GCP PubSub message ordering keys, message IDs, and message publish timestamps (Python) ([BEAM-7819](https://issues.apache.org/jira/browse/BEAM-7819)).
 
 ## Breaking Changes
 
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 4e2811e..39312bf 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -69,13 +69,29 @@
     attributes: (dict) Key-value map of str to str, containing both user-defined
       and service generated attributes (such as id_label and
       timestamp_attribute). May be None.
+    message_id: (str) ID of the message, assigned by the pubsub service when the
+      message is published. Guaranteed to be unique within the topic. Will be
+      reset to None if the message is being written to pubsub.
+    publish_time: (datetime) Time at which the message was published. Will be
+      reset to None if the message is being written to pubsub.
+    ordering_key: (str) If non-empty, identifies related messages for which
+      publish order is respected by the PubSub subscription.
   """
-  def __init__(self, data, attributes):
+  def __init__(
+      self,
+      data,
+      attributes,
+      message_id=None,
+      publish_time=None,
+      ordering_key=""):
     if data is None and not attributes:
-      raise ValueError(
-          'Either data (%r) or attributes (%r) must be set.', data, attributes)
+      raise ValueError('Either data (%r) or attributes (%r) must be set.' %
+                       (data, attributes))
     self.data = data
     self.attributes = attributes
+    self.message_id = message_id
+    self.publish_time = publish_time
+    self.ordering_key = ordering_key
 
   def __hash__(self):
     return hash((self.data, frozenset(self.attributes.items())))
@@ -104,13 +120,21 @@
     msg.ParseFromString(proto_msg)
     # Convert ScalarMapContainer to dict.
     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
-    return PubsubMessage(msg.data, attributes)
+    return PubsubMessage(
+        msg.data,
+        attributes,
+        msg.message_id,
+        msg.publish_time.ToDatetime(),
+        msg.ordering_key)
 
-  def _to_proto_str(self):
+  def _to_proto_str(self, for_publish=False):
     """Get serialized form of ``PubsubMessage``.
 
     Args:
-      proto_msg: str containing a serialized protobuf.
+      for_publish: bool, defaults to False. If True, strip out message
+        fields which cannot be published (currently message_id and
+        publish_time) per
+        https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#pubsubmessage
 
     Returns:
       A str containing a serialized protobuf of type
@@ -121,6 +145,11 @@
     msg.data = self.data
     for key, value in self.attributes.items():
       msg.attributes[key] = value
+    if self.message_id and not for_publish:
+      msg.message_id = self.message_id
+    if self.publish_time and not for_publish:
+      msg.publish_time.FromDatetime(self.publish_time)  # in-place; returns None
+    msg.ordering_key = self.ordering_key
     return msg.SerializeToString()
 
   @staticmethod
@@ -133,7 +162,14 @@
     """
     # Convert ScalarMapContainer to dict.
     attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
-    return PubsubMessage(msg.data, attributes)
+    pubsubmessage = PubsubMessage(msg.data, attributes)
+    if msg.message_id:
+      pubsubmessage.message_id = msg.message_id
+    if msg.publish_time:
+      pubsubmessage.publish_time = msg.publish_time
+    if msg.ordering_key:
+      pubsubmessage.ordering_key = msg.ordering_key
+    return pubsubmessage
 
 
 class ReadFromPubSub(PTransform):
@@ -294,7 +330,7 @@
       raise TypeError(
           'Unexpected element. Type: %s (expected: PubsubMessage), '
           'value: %r' % (type(element), element))
-    return element._to_proto_str()
+    return element._to_proto_str(for_publish=True)
 
   @staticmethod
   def bytes_to_proto_str(element):