fix: Incorrect acknowledgment behavior in the listener of the multi-topic consumer. (#423)
### Motivation
https://github.com/apache/pulsar-client-node/issues/371
### Modifications
- Add the message to the unacknowledged tracker before calling the listener.
### Verifying this change
- Add `testMultiConsumerListenerAndAck` to cover it.
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 1484785..80566c8 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -568,8 +568,8 @@
incomingMessages_.pop(m);
try {
Consumer self{get_shared_this_ptr()};
- messageListener_(self, m);
messageProcessed(m);
+ messageListener_(self, m);
} catch (const std::exception& e) {
LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << e.what());
}
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index 9d71a04..b5c51ec 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -183,6 +183,7 @@
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
FRIEND_TEST(ConsumerTest, testPatternSubscribeTopic);
+ FRIEND_TEST(ConsumerTest, testMultiConsumerListenerAndAck);
};
typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;
diff --git a/lib/UnAckedMessageTrackerEnabled.h b/lib/UnAckedMessageTrackerEnabled.h
index 83edc4c..c5479a7 100644
--- a/lib/UnAckedMessageTrackerEnabled.h
+++ b/lib/UnAckedMessageTrackerEnabled.h
@@ -67,6 +67,7 @@
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker);
FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
+ FRIEND_TEST(ConsumerTest, testMultiConsumerListenerAndAck);
};
} // namespace pulsar
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index f97457f..2aab722 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1490,4 +1490,46 @@
ASSERT_EQ(ResultOk, client.subscribe(topic, "test-sub", consumer));
client.close();
}
+
+TEST(ConsumerTest, testMultiConsumerListenerAndAck) {
+    // Regression test: messages acked inside a multi-topics listener must leave the unacked tracker empty.
+    Client client{lookupUrl};
+    const std::string topicName = "testConsumerEventWithPartition-topic-" + std::to_string(time(nullptr));
+    int res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions",
+                             std::to_string(5));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    // Create a producer
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+    // Number of messages to publish; the latch and the send loop must agree on it.
+    const int num = 10;
+    // Use listener to consume
+    Latch latch{num};
+    Consumer consumer;
+    ConsumerConfiguration consumerConfiguration;
+    PulsarFriend::setConsumerUnAckMessagesTimeoutMs(consumerConfiguration, 2000);
+    consumerConfiguration.setMessageListener([&latch](Consumer& consumer, const Message& msg) {
+        LOG_INFO("Received message '" << msg.getDataAsString() << "' and ack it");
+        consumer.acknowledge(msg);
+        latch.countdown();
+    });
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, "consumer-1", consumerConfiguration, consumer));
+
+    // Send synchronously; a failed send would otherwise only surface as a latch timeout.
+    for (int i = 0; i < num; ++i) {
+        Message msg = MessageBuilder().setContent("content" + std::to_string(i)).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(5)));
+    // Every message was acknowledged in the listener, so nothing may remain tracked.
+    auto multiConsumerImplPtr = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
+    auto tracker =
+        static_cast<UnAckedMessageTrackerEnabled*>(multiConsumerImplPtr->unAckedMessageTrackerPtr_.get());
+    ASSERT_EQ(0, tracker->size());
+    client.close();
+}
+
} // namespace pulsar