Support deserializing a message id from bytes and topic (#259)
diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index de8787f..8802493 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -131,12 +131,30 @@
return self._msg_id > other._msg_id
@staticmethod
- def deserialize(message_id_bytes):
+ def deserialize(message_id_bytes, topic: Optional[str] = None) -> _pulsar.MessageId:
"""
Deserialize a message id object from a previously
serialized bytes sequence.
+
+ Parameters
+ ----------
+ topic: str, optional
+ For multi-topics consumers, the topic name is required to deserialize the message id.
+
+ .. code-block:: python
+
+ msg = consumer.receive()
+ topic = msg.topic_name()
+ msg_id_bytes = msg.message_id().serialize()
+ # Store topic and msg_id_bytes somewhere
+ # Later, deserialize the message id
+ msg_id = MessageId.deserialize(msg_id_bytes, topic=topic)
+
"""
- return _pulsar.MessageId.deserialize(message_id_bytes)
+ msg_id = _pulsar.MessageId.deserialize(message_id_bytes)
+ if topic is not None:
+ msg_id.topic_name(topic)
+ return msg_id
@classmethod
def wrap(cls, msg_id: _pulsar.MessageId):
diff --git a/src/message.cc b/src/message.cc
index dec6f05..dd263b6 100644
--- a/src/message.cc
+++ b/src/message.cc
@@ -72,6 +72,10 @@
.def("entry_id", &MessageId::entryId)
.def("batch_index", &MessageId::batchIndex)
.def("partition", &MessageId::partition)
+ .def(
+ "topic_name",
+ [](MessageId& msgId, const std::string& topicName) { msgId.setTopicName(topicName); },
+ return_value_policy::copy)
.def_property_readonly_static("earliest", [](object) { return MessageId::earliest(); })
.def_property_readonly_static("latest", [](object) { return MessageId::latest(); })
.def("serialize",
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index 6c0c3f2..4e1c5fb 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -1994,6 +1994,30 @@
self.assertEqual(consumer.consumer_name(), name)
client.close()
+ def test_deserialize_msg_id_with_topic(self):
+ client = Client(self.serviceUrl)
+ topic1 = "deserialize-msg-id-with-topic1-" + str(time.time())
+ topic2 = "deserialize-msg-id-with-topic2-" + str(time.time())
+ consumer = client.subscribe([topic1, topic2], 'sub')
+ producer1 = client.create_producer(topic1)
+ producer2 = client.create_producer(topic2)
+ producer1.send(b"msg-1")
+ producer2.send(b"msg-2")
+
+ serialized_msg_ids = dict()
+ for _ in range(2):
+ msg = consumer.receive(TM)
+ serialized_msg_ids[msg.topic_name()] = msg.message_id().serialize()
+ for topic, serialized_msg_id in serialized_msg_ids.items():
+ deserialized_msg_id = MessageId.deserialize(serialized_msg_id, topic=topic)
+ consumer.acknowledge_cumulative(deserialized_msg_id)
+ consumer.close()
+
+ consumer = client.subscribe([topic1, topic2], 'sub')
+ producer1.send(b'msg-3')
+ msg = consumer.receive(TM)
+ self.assertEqual(msg.value(), b'msg-3')
+ client.close()
if __name__ == "__main__":
main()