Fix acknowledgeCumulative never returns when accepting an invalid message id for a multi-topics consumer (#492)

(cherry picked from commit 9e119cee53199da5694cd59491d097399ba67a1f)
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 8a43173..ff793fb 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -650,6 +650,14 @@
     callback(result, msg);
 }
 
+static void logErrorTopicNameForAcknowledge(const std::string& topic) {
+    if (topic.empty()) {
+        LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer");
+    } else {
+        LOG_ERROR("Message of topic: " << topic << " not in consumers");
+    }
+}
+
 void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
     if (state_ != Ready) {
         interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultAlreadyClosed, msgId);
@@ -658,19 +666,14 @@
     }
 
     const std::string& topicPartitionName = msgId.getTopicName();
-    if (topicPartitionName.empty()) {
-        LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer");
-        callback(ResultOperationNotSupported);
-        return;
-    }
     auto optConsumer = consumers_.find(topicPartitionName);
 
     if (optConsumer) {
         unAckedMessageTrackerPtr_->remove(msgId);
         optConsumer.value()->acknowledgeAsync(msgId, callback);
     } else {
-        LOG_ERROR("Message of topic: " << topicPartitionName << " not in unAckedMessageTracker");
-        callback(ResultUnknownError);
+        logErrorTopicNameForAcknowledge(topicPartitionName);
+        callback(ResultOperationNotSupported);
     }
 }
 
@@ -684,7 +687,7 @@
     for (const MessageId& messageId : messageIdList) {
         auto topicName = messageId.getTopicName();
         if (topicName.empty()) {
-            LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer");
+            logErrorTopicNameForAcknowledge(topicName);
             callback(ResultOperationNotSupported);
             return;
         }
@@ -710,18 +713,21 @@
             unAckedMessageTrackerPtr_->remove(kv.second);
             optConsumer.value()->acknowledgeAsync(kv.second, cb);
         } else {
-            LOG_ERROR("Message of topic: " << kv.first << " not in consumers");
-            callback(ResultUnknownError);
+            logErrorTopicNameForAcknowledge(kv.first);
+            callback(ResultOperationNotSupported);
         }
     }
 }
 
 void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
-    msgId.getTopicName();
+    const auto& topic = msgId.getTopicName();
     auto optConsumer = consumers_.find(msgId.getTopicName());
     if (optConsumer) {
         unAckedMessageTrackerPtr_->removeMessagesTill(msgId);
         optConsumer.value()->acknowledgeCumulativeAsync(msgId, callback);
+    } else {
+        logErrorTopicNameForAcknowledge(topic);
+        callback(ResultOperationNotSupported);
     }
 }
 
diff --git a/tests/MultiTopicsConsumerTest.cc b/tests/MultiTopicsConsumerTest.cc
index 5aae1eb..6b9f3b3 100644
--- a/tests/MultiTopicsConsumerTest.cc
+++ b/tests/MultiTopicsConsumerTest.cc
@@ -103,3 +103,42 @@
 
     client.close();
 }
+
+TEST(MultiTopicsConsumerTest, testAcknowledgeInvalidMessageId) {
+    const std::string topicPrefix = "multi-topics-consumer-ack-invalid-msg-id";
+    Client client{lookupUrl};
+    std::vector<std::string> topics(2);
+    for (size_t i = 0; i < topics.size(); i++) {
+        Producer producer;
+        auto topic = topicPrefix + unique_str();
+        ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build()));
+        topics[i] = std::move(topic);
+    }
+
+    Consumer consumer;
+    ConsumerConfiguration conf;
+    conf.setSubscriptionInitialPosition(InitialPositionEarliest);
+    ASSERT_EQ(ResultOk, client.subscribe(topics, "sub", conf, consumer));
+
+    std::vector<MessageId> msgIds(topics.size());
+    for (size_t i = 0; i < topics.size(); i++) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+        std::string serialized;
+        msg.getMessageId().serialize(serialized);
+        msgIds[i] = MessageId::deserialize(serialized);
+    }
+
+    ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds[0]));
+    ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds));
+    ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledgeCumulative(msgIds[1]));
+
+    msgIds[0].setTopicName("invalid-topic");
+    msgIds[1].setTopicName("invalid-topic");
+    ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds[0]));
+    ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds));
+    ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledgeCumulative(msgIds[1]));
+
+    client.close();
+}