[C++] Fix segfault when get topic name from received message id (#10006)
* Fix segfault when get topic name from received message id
* Add tests for non-batched message
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 5bd54ed..47250c8 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -427,6 +427,7 @@
// This is a cheap copy since message contains only one shared pointer (impl_)
Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i);
msg.impl_->setRedeliveryCount(redeliveryCount);
+ msg.impl_->setTopicName(batchedMessage.getTopicName());
if (startMessageId_.is_present()) {
const MessageId& msgId = msg.getMessageId();
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc
index e0d4091..05147f8 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -413,4 +413,57 @@
client.close();
}
+TEST(ConsumerTest, testGetTopicNameFromReceivedMessage) {
+ // topic1 and topic2 are non-partitioned topics, topic3 is a partitioned topic
+ const std::string topic1 = "testGetTopicNameFromReceivedMessage1-" + std::to_string(time(nullptr));
+ const std::string topic2 = "testGetTopicNameFromReceivedMessage2-" + std::to_string(time(nullptr));
+ const std::string topic3 = "testGetTopicNameFromReceivedMessage3-" + std::to_string(time(nullptr));
+ int res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + topic3 + "/partitions", "3");
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+ Client client(lookupUrl);
+
+ auto sendMessage = [&client](const std::string& topic, bool enabledBatching) {
+ const auto producerConf = ProducerConfiguration().setBatchingEnabled(enabledBatching);
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));
+ ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("hello").build()));
+ LOG_INFO("Send 'hello' to " << topic);
+ };
+ auto validateTopicName = [](Consumer& consumer, const std::string& topic) {
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+
+ const auto fullTopic = "persistent://public/default/" + topic;
+ ASSERT_EQ(msg.getTopicName(), fullTopic);
+ ASSERT_EQ(msg.getMessageId().getTopicName(), fullTopic);
+ };
+
+ // 1. ConsumerImpl
+ Consumer consumer1;
+ ASSERT_EQ(ResultOk, client.subscribe(topic1, "sub-1", consumer1));
+
+ // 2. MultiTopicsConsumerImpl
+ Consumer consumer2;
+ ASSERT_EQ(ResultOk, client.subscribe({topic1, topic2}, "sub-2", consumer2));
+
+ sendMessage(topic1, true);
+ validateTopicName(consumer1, topic1);
+ validateTopicName(consumer2, topic1);
+ sendMessage(topic1, false);
+ validateTopicName(consumer1, topic1);
+ validateTopicName(consumer2, topic1);
+
+ // 3. PartitionedConsumerImpl
+ Consumer consumer3;
+ ASSERT_EQ(ResultOk, client.subscribe(topic3, "sub-3", consumer3));
+ const auto partition = topic3 + "-partition-0";
+ sendMessage(partition, true);
+ validateTopicName(consumer3, partition);
+ sendMessage(partition, false);
+ validateTopicName(consumer3, partition);
+
+ client.close();
+}
+
} // namespace pulsar