Merge pull request #14767: [BEAM-7819] Add missing fields to Python apache_beam.io.gcp.pubsub.PubsubMessage
[BEAM-7819] Add missing fields to python apache_beam.io.gcp.pubsub.PubsubMessage
diff --git a/CHANGES.md b/CHANGES.md
index af72444..90d0123 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -97,6 +97,7 @@
* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* Added capability to declare resource hints in Java and Python SDKs ([BEAM-2085](https://issues.apache.org/jira/browse/BEAM-2085)).
* Added Spanner IO Performance tests for read and write. (Python) ([BEAM-10029](https://issues.apache.org/jira/browse/BEAM-10029)).
+* Added support for accessing GCP Pub/Sub message ordering keys, message IDs and publish timestamps (Python) ([BEAM-7819](https://issues.apache.org/jira/browse/BEAM-7819)).
## Breaking Changes
diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py
index 4e2811e..39312bf 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub.py
@@ -69,13 +69,29 @@
attributes: (dict) Key-value map of str to str, containing both user-defined
and service generated attributes (such as id_label and
timestamp_attribute). May be None.
+ message_id: (str) ID of the message, assigned by the pubsub service when the
+ message is published. Guaranteed to be unique within the topic. Ignored
+ (not serialized) when the message is written to pubsub.
+ publish_time: (datetime) Time at which the message was published. Ignored
+ (not serialized) when the message is written to pubsub.
+ ordering_key: (str) If non-empty, identifies related messages for which
+ publish order is respected by the PubSub subscription. Defaults to empty.
"""
- def __init__(self, data, attributes):
+ def __init__(
+ self,
+ data,
+ attributes,
+ message_id=None,
+ publish_time=None,
+ ordering_key=""):
if data is None and not attributes:
raise ValueError(
'Either data (%r) or attributes (%r) must be set.', data, attributes)
self.data = data
self.attributes = attributes
+ self.message_id = message_id
+ self.publish_time = publish_time
+ self.ordering_key = ordering_key
def __hash__(self):
return hash((self.data, frozenset(self.attributes.items())))
@@ -104,13 +120,21 @@
msg.ParseFromString(proto_msg)
# Convert ScalarMapContainer to dict.
attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
- return PubsubMessage(msg.data, attributes)
+ return PubsubMessage(
+ msg.data,
+ attributes,
+ msg.message_id,
+ msg.publish_time.ToDatetime() if msg.HasField('publish_time') else None,
+ msg.ordering_key)
- def _to_proto_str(self):
+ def _to_proto_str(self, for_publish=False):
"""Get serialized form of ``PubsubMessage``.
Args:
- proto_msg: str containing a serialized protobuf.
+ for_publish: bool, if True strip out message fields which cannot be
+ published (currently message_id and publish_time) per
+ https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#pubsubmessage
+ If False (default), message_id and publish_time are kept when set.
Returns:
A str containing a serialized protobuf of type
@@ -121,6 +145,11 @@
msg.data = self.data
for key, value in self.attributes.items():
msg.attributes[key] = value
+ if self.message_id and not for_publish:
+ msg.message_id = self.message_id
+ if self.publish_time and not for_publish:
+ msg.publish_time.FromDatetime(self.publish_time)  # sub-message: fill in place, cannot assign
+ msg.ordering_key = self.ordering_key
return msg.SerializeToString()
@staticmethod
@@ -133,7 +162,14 @@
"""
# Convert ScalarMapContainer to dict.
attributes = dict((key, msg.attributes[key]) for key in msg.attributes)
- return PubsubMessage(msg.data, attributes)
+ pubsubmessage = PubsubMessage(msg.data, attributes)  # optional fields below copied only when truthy; else constructor defaults stay
+ if msg.message_id:
+ pubsubmessage.message_id = msg.message_id
+ if msg.publish_time:  # NOTE(review): assumes this is already a datetime (no ToDatetime() as in the proto path) - confirm
+ pubsubmessage.publish_time = msg.publish_time
+ if msg.ordering_key:
+ pubsubmessage.ordering_key = msg.ordering_key
+ return pubsubmessage
class ReadFromPubSub(PTransform):
@@ -294,7 +330,7 @@
raise TypeError(
'Unexpected element. Type: %s (expected: PubsubMessage), '
'value: %r' % (type(element), element))
- return element._to_proto_str()
+ return element._to_proto_str(for_publish=True)  # publishing: omit message_id and publish_time
@staticmethod
def bytes_to_proto_str(element):