[fix] Fix acknowledging a MessageId list not working when ackGroupingTimeMs is 0 (#128)
diff --git a/lib/AckGroupingTracker.cc b/lib/AckGroupingTracker.cc
index 4abfcb1..0379bc6 100644
--- a/lib/AckGroupingTracker.cc
+++ b/lib/AckGroupingTracker.cc
@@ -46,6 +46,19 @@
return true;
}
+static std::ostream& operator<<(std::ostream& os, const std::set<MessageId>& msgIds) {
+ bool first = true;
+ for (auto&& msgId : msgIds) {
+ if (first) {
+ first = false;
+ } else {
+ os << ", ";
+ }
+ os << "[" << msgId << "]";
+ }
+ return os;
+}
+
bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId,
const std::set<MessageId>& msgIds) {
auto cnx = connWeakPtr.lock();
@@ -54,8 +67,15 @@
return false;
}
- for (const auto& msgId : msgIds) {
- sendAck(cnx, consumerId, msgId, CommandAck_AckType_Individual);
+ if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
+ auto cmd = Commands::newMultiMessageAck(consumerId, msgIds);
+ cnx->sendCommand(cmd);
+ LOG_DEBUG("ACK request is sent for " << msgIds.size() << " messages: " << msgIds);
+ } else {
+ // Broker does not support multi-message ACK, use multiple individual ACKs instead.
+ for (const auto& msgId : msgIds) {
+ sendAck(cnx, consumerId, msgId, CommandAck_AckType_Individual);
+ }
}
return true;
}
diff --git a/lib/AckGroupingTrackerDisabled.cc b/lib/AckGroupingTrackerDisabled.cc
index ca53792..2fe2b2e 100644
--- a/lib/AckGroupingTrackerDisabled.cc
+++ b/lib/AckGroupingTrackerDisabled.cc
@@ -36,6 +36,14 @@
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Individual);
}
+void AckGroupingTrackerDisabled::addAcknowledgeList(const MessageIdList& msgIds) {
+ std::set<MessageId> msgIdSet;
+ for (auto&& msgId : msgIds) {
+ msgIdSet.emplace(msgId);
+ }
+ this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgIdSet);
+}
+
void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& msgId) {
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Cumulative);
}
diff --git a/lib/AckGroupingTrackerDisabled.h b/lib/AckGroupingTrackerDisabled.h
index ef6bfbe..7b41686 100644
--- a/lib/AckGroupingTrackerDisabled.h
+++ b/lib/AckGroupingTrackerDisabled.h
@@ -44,6 +44,7 @@
AckGroupingTrackerDisabled(HandlerBase& handler, uint64_t consumerId);
void addAcknowledge(const MessageId& msgId) override;
+ void addAcknowledgeList(const MessageIdList& msgIds) override;
void addAcknowledgeCumulative(const MessageId& msgId) override;
private:
diff --git a/lib/AckGroupingTrackerEnabled.cc b/lib/AckGroupingTrackerEnabled.cc
index 4c39c6f..6eba2d3 100644
--- a/lib/AckGroupingTrackerEnabled.cc
+++ b/lib/AckGroupingTrackerEnabled.cc
@@ -132,13 +132,7 @@
// Send ACK for individual ACK requests.
std::lock_guard<std::recursive_mutex> lock(this->rmutexPendingIndAcks_);
if (!this->pendingIndividualAcks_.empty()) {
- if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
- auto cmd = Commands::newMultiMessageAck(this->consumerId_, this->pendingIndividualAcks_);
- cnx->sendCommand(cmd);
- } else {
- // Broker does not support multi-message ACK, use multiple individual ACK instead.
- this->doImmediateAck(cnx, this->consumerId_, this->pendingIndividualAcks_);
- }
+ this->doImmediateAck(cnx, consumerId_, this->pendingIndividualAcks_);
this->pendingIndividualAcks_.clear();
}
}
diff --git a/tests/AcknowledgeTest.cc b/tests/AcknowledgeTest.cc
new file mode 100644
index 0000000..e0746be
--- /dev/null
+++ b/tests/AcknowledgeTest.cc
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+#include "lib/LogUtils.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+static std::string lookupUrl = "pulsar://localhost:6650";
+static std::string adminUrl = "http://localhost:8080/";
+
+extern std::string unique_str();
+
+class AcknowledgeTest : public testing::TestWithParam<int> {};
+
+TEST_P(AcknowledgeTest, testAckMsgList) {
+ Client client(lookupUrl);
+ auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
+
+ constexpr auto numMsg = 100;
+ std::string uniqueChunk = unique_str();
+ std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;
+ const std::string subName = "sub-ack-list";
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+ ConsumerConfiguration consumerConfig;
+ consumerConfig.setAckGroupingMaxSize(numMsg);
+ consumerConfig.setAckGroupingTimeMs(GetParam());
+ consumerConfig.setUnAckedMessagesTimeoutMs(10000);
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));
+
+ // Sending and receiving messages.
+ for (auto count = 0; count < numMsg; ++count) {
+ Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+
+ std::vector<MessageId> recvMsgId;
+ for (auto count = 0; count < numMsg; ++count) {
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+ recvMsgId.emplace_back(msg.getMessageId());
+ }
+ ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));
+
+ // Request redelivery of any messages that are still unacknowledged.
+ consumer.redeliverUnacknowledgedMessages();
+
+ auto consumerStats = PulsarFriend::getConsumerStatsPtr(consumer);
+ auto ackMap = consumerStats->getAckedMsgMap();
+ unsigned long totalAck = ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
+ ASSERT_EQ(totalAck, numMsg);
+
+ Message msg;
+ auto ret = consumer.receive(msg, 1000);
+ ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
+
+ producer.close();
+ consumer.close();
+ client.close();
+}
+
+TEST_P(AcknowledgeTest, testAckMsgListWithMultiConsumer) {
+ Client client(lookupUrl);
+ auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
+
+ std::string uniqueChunk = unique_str();
+ std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;
+
+ // Call the admin API to make the topic a partitioned topic.
+ std::string url =
+ adminUrl + "admin/v2/persistent/public/default/test-ack-msgs" + uniqueChunk + "/partitions";
+ int res = makePutRequest(url, "5");
+ LOG_INFO("res = " << res);
+ ASSERT_FALSE(res != 204 && res != 409);
+
+ constexpr auto numMsg = 100;
+ const std::string subName = "sub-ack-list";
+
+ Producer producer;
+ ProducerConfiguration producerConfig;
+ // Turn off batch to ensure even distribution
+ producerConfig.setBatchingEnabled(false);
+ producerConfig.setPartitionsRoutingMode(pulsar::ProducerConfiguration::RoundRobinDistribution);
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));
+
+ ConsumerConfiguration consumerConfig;
+ // Set the ack grouping max size to 10.
+ consumerConfig.setAckGroupingMaxSize(10);
+ consumerConfig.setAckGroupingTimeMs(GetParam());
+ consumerConfig.setUnAckedMessagesTimeoutMs(10000);
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));
+
+ // Sending and receiving messages.
+ for (auto count = 0; count < numMsg; ++count) {
+ Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+
+ std::vector<MessageId> recvMsgId;
+ for (auto count = 0; count < numMsg; ++count) {
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+ recvMsgId.emplace_back(msg.getMessageId());
+ }
+ ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));
+
+ // Request redelivery of any messages that are still unacknowledged.
+ consumer.redeliverUnacknowledgedMessages();
+
+ // Assert stats: sum the successful individual-ACK counts over every entry in the consumer stats list.
+ unsigned long totalAck = 0;
+ auto consumerStatsList = PulsarFriend::getConsumerStatsPtrList(consumer);
+ for (auto consumerStats : consumerStatsList) {
+ auto ackMap = consumerStats->getAckedMsgMap();
+ totalAck += ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
+ }
+ ASSERT_EQ(totalAck, numMsg);
+
+ Message msg;
+ auto ret = consumer.receive(msg, 1000);
+ ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
+
+ producer.close();
+ consumer.close();
+ client.close();
+}
+
+INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest, testing::Values(100, 0));
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index 4b181d0..b801d38 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -20,6 +20,7 @@
#include <pulsar/Client.h>
#include <algorithm>
+#include <atomic>
#include <chrono>
#include <cstring>
#include <functional>
@@ -58,7 +59,7 @@
static long globalResendMessageCount = 0;
std::string lookupUrl = "pulsar://localhost:6650";
static std::string adminUrl = "http://localhost:8080/";
-static int uniqueCounter = 0;
+static std::atomic_int uniqueCounter{0};
std::string unique_str() {
long nanos = std::chrono::duration_cast<std::chrono::milliseconds>(
@@ -4276,118 +4277,3 @@
TEST(BasicEndToEndTest, testBatchReceiveClose) { testBatchReceiveClose(false); }
TEST(BasicEndToEndTest, testBatchReceiveCloseWithMultiConsumer) { testBatchReceiveClose(true); }
-
-TEST(BasicEndToEndTest, testAckMsgList) {
- Client client(lookupUrl);
- auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
-
- constexpr auto numMsg = 100;
- std::string uniqueChunk = unique_str();
- std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;
- const std::string subName = "sub-ack-list";
-
- Producer producer;
- ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
-
- ConsumerConfiguration consumerConfig;
- consumerConfig.setAckGroupingMaxSize(numMsg);
- consumerConfig.setUnAckedMessagesTimeoutMs(10000);
- Consumer consumer;
- ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));
-
- // Sending and receiving messages.
- for (auto count = 0; count < numMsg; ++count) {
- Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
- ASSERT_EQ(ResultOk, producer.send(msg));
- }
-
- std::vector<MessageId> recvMsgId;
- for (auto count = 0; count < numMsg; ++count) {
- Message msg;
- ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
- recvMsgId.emplace_back(msg.getMessageId());
- }
- ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));
-
- // try redeliver unack messages.
- consumer.redeliverUnacknowledgedMessages();
-
- auto consumerStats = PulsarFriend::getConsumerStatsPtr(consumer);
- auto ackMap = consumerStats->getAckedMsgMap();
- unsigned long totalAck = ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
- ASSERT_EQ(totalAck, numMsg);
-
- Message msg;
- auto ret = consumer.receive(msg, 1000);
- ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
-
- producer.close();
- consumer.close();
- client.close();
-}
-
-TEST(BasicEndToEndTest, testAckMsgListWithMultiConsumer) {
- Client client(lookupUrl);
- auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
-
- std::string uniqueChunk = unique_str();
- std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;
-
- // call admin api to make it partitioned
- std::string url =
- adminUrl + "admin/v2/persistent/public/default/test-ack-msgs" + uniqueChunk + "/partitions";
- int res = makePutRequest(url, "5");
- LOG_INFO("res = " << res);
- ASSERT_FALSE(res != 204 && res != 409);
-
- constexpr auto numMsg = 100;
- const std::string subName = "sub-ack-list";
-
- Producer producer;
- ProducerConfiguration producerConfig;
- // Turn off batch to ensure even distribution
- producerConfig.setBatchingEnabled(false);
- producerConfig.setPartitionsRoutingMode(pulsar::ProducerConfiguration::RoundRobinDistribution);
- ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));
-
- ConsumerConfiguration consumerConfig;
- // set ack grouping max size is 10
- consumerConfig.setAckGroupingMaxSize(10);
- consumerConfig.setUnAckedMessagesTimeoutMs(10000);
- Consumer consumer;
- ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));
-
- // Sending and receiving messages.
- for (auto count = 0; count < numMsg; ++count) {
- Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
- ASSERT_EQ(ResultOk, producer.send(msg));
- }
-
- std::vector<MessageId> recvMsgId;
- for (auto count = 0; count < numMsg; ++count) {
- Message msg;
- ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
- recvMsgId.emplace_back(msg.getMessageId());
- }
- ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));
-
- // try redeliver unack messages.
- consumer.redeliverUnacknowledgedMessages();
-
- // assert stats
- unsigned long totalAck = 0;
- auto consumerStatsList = PulsarFriend::getConsumerStatsPtrList(consumer);
- for (auto consumerStats : consumerStatsList) {
- auto ackMap = consumerStats->getAckedMsgMap();
- totalAck += ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
- }
- ASSERT_EQ(totalAck, numMsg);
-
- Message msg;
- auto ret = consumer.receive(msg, 1000);
- ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
-
- producer.close();
- consumer.close();
- client.close();
-}