Fix acknowledgeCumulative never returns when accepting an invalid message id for a multi-topics consumer (#492)
(cherry picked from commit 9e119cee53199da5694cd59491d097399ba67a1f)
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 8a43173..ff793fb 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -650,6 +650,14 @@
callback(result, msg);
}
+static void logErrorTopicNameForAcknowledge(const std::string& topic) {
+ if (topic.empty()) {
+ LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer");
+ } else {
+ LOG_ERROR("Message of topic: " << topic << " not in consumers");
+ }
+}
+
void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
if (state_ != Ready) {
interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultAlreadyClosed, msgId);
@@ -658,19 +666,14 @@
}
const std::string& topicPartitionName = msgId.getTopicName();
- if (topicPartitionName.empty()) {
- LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer");
- callback(ResultOperationNotSupported);
- return;
- }
auto optConsumer = consumers_.find(topicPartitionName);
if (optConsumer) {
unAckedMessageTrackerPtr_->remove(msgId);
optConsumer.value()->acknowledgeAsync(msgId, callback);
} else {
- LOG_ERROR("Message of topic: " << topicPartitionName << " not in unAckedMessageTracker");
- callback(ResultUnknownError);
+ logErrorTopicNameForAcknowledge(topicPartitionName);
+ callback(ResultOperationNotSupported);
}
}
@@ -684,7 +687,7 @@
for (const MessageId& messageId : messageIdList) {
auto topicName = messageId.getTopicName();
if (topicName.empty()) {
- LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer");
+ logErrorTopicNameForAcknowledge(topicName);
callback(ResultOperationNotSupported);
return;
}
@@ -710,18 +713,21 @@
unAckedMessageTrackerPtr_->remove(kv.second);
optConsumer.value()->acknowledgeAsync(kv.second, cb);
} else {
- LOG_ERROR("Message of topic: " << kv.first << " not in consumers");
- callback(ResultUnknownError);
+ logErrorTopicNameForAcknowledge(kv.first);
+ callback(ResultOperationNotSupported);
}
}
}
void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
- msgId.getTopicName();
+ const auto& topic = msgId.getTopicName();
auto optConsumer = consumers_.find(msgId.getTopicName());
if (optConsumer) {
unAckedMessageTrackerPtr_->removeMessagesTill(msgId);
optConsumer.value()->acknowledgeCumulativeAsync(msgId, callback);
+ } else {
+ logErrorTopicNameForAcknowledge(topic);
+ callback(ResultOperationNotSupported);
}
}
diff --git a/tests/MultiTopicsConsumerTest.cc b/tests/MultiTopicsConsumerTest.cc
index 5aae1eb..6b9f3b3 100644
--- a/tests/MultiTopicsConsumerTest.cc
+++ b/tests/MultiTopicsConsumerTest.cc
@@ -103,3 +103,42 @@
client.close();
}
+
+TEST(MultiTopicsConsumerTest, testAcknowledgeInvalidMessageId) {
+ const std::string topicPrefix = "multi-topics-consumer-ack-invalid-msg-id";
+ Client client{lookupUrl};
+ std::vector<std::string> topics(2);
+ for (size_t i = 0; i < topics.size(); i++) {
+ Producer producer;
+ auto topic = topicPrefix + unique_str();
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+ ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build()));
+ topics[i] = std::move(topic);
+ }
+
+ Consumer consumer;
+ ConsumerConfiguration conf;
+ conf.setSubscriptionInitialPosition(InitialPositionEarliest);
+ ASSERT_EQ(ResultOk, client.subscribe(topics, "sub", conf, consumer));
+
+ std::vector<MessageId> msgIds(topics.size());
+ for (size_t i = 0; i < topics.size(); i++) {
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+ std::string serialized;
+ msg.getMessageId().serialize(serialized);
+ msgIds[i] = MessageId::deserialize(serialized);
+ }
+
+ ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds[0]));
+ ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds));
+ ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledgeCumulative(msgIds[1]));
+
+ msgIds[0].setTopicName("invalid-topic");
+ msgIds[1].setTopicName("invalid-topic");
+ ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds[0]));
+ ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msgIds));
+ ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledgeCumulative(msgIds[1]));
+
+ client.close();
+}