[C++] Fix segfault when get topic name from received message id (#10006)

* Fix segfault when get topic name from received message id

* Add tests for non-batched message
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 5bd54ed..47250c8 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -427,6 +427,7 @@
         // This is a cheap copy since message contains only one shared pointer (impl_)
         Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i);
         msg.impl_->setRedeliveryCount(redeliveryCount);
+        msg.impl_->setTopicName(batchedMessage.getTopicName());
 
         if (startMessageId_.is_present()) {
             const MessageId& msgId = msg.getMessageId();
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc
index e0d4091..05147f8 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -413,4 +413,57 @@
     client.close();
 }
 
+TEST(ConsumerTest, testGetTopicNameFromReceivedMessage) {
+    // topic1 and topic2 are non-partitioned topics, topic3 is a partitioned topic
+    const std::string topic1 = "testGetTopicNameFromReceivedMessage1-" + std::to_string(time(nullptr));
+    const std::string topic2 = "testGetTopicNameFromReceivedMessage2-" + std::to_string(time(nullptr));
+    const std::string topic3 = "testGetTopicNameFromReceivedMessage3-" + std::to_string(time(nullptr));
+    int res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + topic3 + "/partitions", "3");
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    Client client(lookupUrl);
+
+    auto sendMessage = [&client](const std::string& topic, bool enabledBatching) {
+        const auto producerConf = ProducerConfiguration().setBatchingEnabled(enabledBatching);
+        Producer producer;
+        ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));
+        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("hello").build()));
+        LOG_INFO("Send 'hello' to " << topic);
+    };
+    auto validateTopicName = [](Consumer& consumer, const std::string& topic) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+
+        const auto fullTopic = "persistent://public/default/" + topic;
+        ASSERT_EQ(msg.getTopicName(), fullTopic);
+        ASSERT_EQ(msg.getMessageId().getTopicName(), fullTopic);
+    };
+
+    // 1. ConsumerImpl
+    Consumer consumer1;
+    ASSERT_EQ(ResultOk, client.subscribe(topic1, "sub-1", consumer1));
+
+    // 2. MultiTopicsConsumerImpl
+    Consumer consumer2;
+    ASSERT_EQ(ResultOk, client.subscribe({topic1, topic2}, "sub-2", consumer2));
+
+    sendMessage(topic1, true);
+    validateTopicName(consumer1, topic1);
+    validateTopicName(consumer2, topic1);
+    sendMessage(topic1, false);
+    validateTopicName(consumer1, topic1);
+    validateTopicName(consumer2, topic1);
+
+    // 3. PartitionedConsumerImpl
+    Consumer consumer3;
+    ASSERT_EQ(ResultOk, client.subscribe(topic3, "sub-3", consumer3));
+    const auto partition = topic3 + "-partition-0";
+    sendMessage(partition, true);
+    validateTopicName(consumer3, partition);
+    sendMessage(partition, false);
+    validateTopicName(consumer3, partition);
+
+    client.close();
+}
+
 }  // namespace pulsar