[fix] Fix acknowledging a MessageId list not working when ackGroupingTimeMs is 0 (#128)

diff --git a/lib/AckGroupingTracker.cc b/lib/AckGroupingTracker.cc
index 4abfcb1..0379bc6 100644
--- a/lib/AckGroupingTracker.cc
+++ b/lib/AckGroupingTracker.cc
@@ -46,6 +46,19 @@
     return true;
 }
 
+// Streams `msgIds` as "[id]" items separated by ", "; an empty set writes nothing to `os`.
+static std::ostream& operator<<(std::ostream& os, const std::set<MessageId>& msgIds) {
+    auto it = msgIds.begin();
+    if (it == msgIds.end()) {
+        return os;
+    }
+    os << "[" << *it << "]";
+    for (++it; it != msgIds.end(); ++it) {
+        os << ", " << "[" << *it << "]";
+    }
+    return os;
+}
+
 bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId,
                                         const std::set<MessageId>& msgIds) {
     auto cnx = connWeakPtr.lock();
@@ -54,8 +67,15 @@
         return false;
     }
 
-    for (const auto& msgId : msgIds) {
-        sendAck(cnx, consumerId, msgId, CommandAck_AckType_Individual);
+    if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
+        auto cmd = Commands::newMultiMessageAck(consumerId, msgIds);
+        cnx->sendCommand(cmd);
+        LOG_DEBUG("ACK request is sent for " << msgIds.size() << " messages: " << msgIds);
+    } else {
+        // Broker does not support multi-message ACK, use multiple individual ACKs instead.
+        for (const auto& msgId : msgIds) {
+            sendAck(cnx, consumerId, msgId, CommandAck_AckType_Individual);
+        }
     }
     return true;
 }
diff --git a/lib/AckGroupingTrackerDisabled.cc b/lib/AckGroupingTrackerDisabled.cc
index ca53792..2fe2b2e 100644
--- a/lib/AckGroupingTrackerDisabled.cc
+++ b/lib/AckGroupingTrackerDisabled.cc
@@ -36,6 +36,14 @@
     this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Individual);
 }
 
+// Acks all of `msgIds` with a single call to doImmediateAck(); nothing is queued here.
+void AckGroupingTrackerDisabled::addAcknowledgeList(const MessageIdList& msgIds) {
+    // doImmediateAck() takes a std::set, so copy the list into one (duplicate IDs collapse).
+    std::set<MessageId> uniqueIds;
+    uniqueIds.insert(msgIds.begin(), msgIds.end());
+    this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, uniqueIds);
+}
+
 void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& msgId) {
     this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Cumulative);
 }
diff --git a/lib/AckGroupingTrackerDisabled.h b/lib/AckGroupingTrackerDisabled.h
index ef6bfbe..7b41686 100644
--- a/lib/AckGroupingTrackerDisabled.h
+++ b/lib/AckGroupingTrackerDisabled.h
@@ -44,6 +44,7 @@
     AckGroupingTrackerDisabled(HandlerBase& handler, uint64_t consumerId);
 
     void addAcknowledge(const MessageId& msgId) override;
+    void addAcknowledgeList(const MessageIdList& msgIds) override;
     void addAcknowledgeCumulative(const MessageId& msgId) override;
 
    private:
diff --git a/lib/AckGroupingTrackerEnabled.cc b/lib/AckGroupingTrackerEnabled.cc
index 4c39c6f..6eba2d3 100644
--- a/lib/AckGroupingTrackerEnabled.cc
+++ b/lib/AckGroupingTrackerEnabled.cc
@@ -132,13 +132,7 @@
     // Send ACK for individual ACK requests.
     std::lock_guard<std::recursive_mutex> lock(this->rmutexPendingIndAcks_);
     if (!this->pendingIndividualAcks_.empty()) {
-        if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
-            auto cmd = Commands::newMultiMessageAck(this->consumerId_, this->pendingIndividualAcks_);
-            cnx->sendCommand(cmd);
-        } else {
-            // Broker does not support multi-message ACK, use multiple individual ACK instead.
-            this->doImmediateAck(cnx, this->consumerId_, this->pendingIndividualAcks_);
-        }
+        this->doImmediateAck(cnx, consumerId_, this->pendingIndividualAcks_);
         this->pendingIndividualAcks_.clear();
     }
 }
diff --git a/tests/AcknowledgeTest.cc b/tests/AcknowledgeTest.cc
new file mode 100644
index 0000000..e0746be
--- /dev/null
+++ b/tests/AcknowledgeTest.cc
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+#include "lib/LogUtils.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+static std::string lookupUrl = "pulsar://localhost:6650";
+static std::string adminUrl = "http://localhost:8080/";
+
+extern std::string unique_str();
+
+class AcknowledgeTest : public testing::TestWithParam<int> {};  // param: ackGroupingTimeMs
+
+TEST_P(AcknowledgeTest, testAckMsgList) {  // acknowledge(list) must ack every received message
+    Client client(lookupUrl);
+    auto clientImplPtr = PulsarFriend::getClientImplPtr(client);  // NOTE(review): not used below
+
+    constexpr auto numMsg = 100;
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;
+    const std::string subName = "sub-ack-list";
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setAckGroupingMaxSize(numMsg);
+    consumerConfig.setAckGroupingTimeMs(GetParam());  // test parameter: 100 or 0
+    consumerConfig.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));
+
+    // Publish numMsg messages, then receive them all and collect their IDs.
+    for (auto count = 0; count < numMsg; ++count) {
+        Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    std::vector<MessageId> recvMsgId;
+    for (auto count = 0; count < numMsg; ++count) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+        recvMsgId.emplace_back(msg.getMessageId());
+    }
+    ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));
+
+    // Request redelivery of unacked messages; every message was acked, so none should arrive.
+    consumer.redeliverUnacknowledgedMessages();
+
+    auto consumerStats = PulsarFriend::getConsumerStatsPtr(consumer);
+    auto ackMap = consumerStats->getAckedMsgMap();
+    unsigned long totalAck = ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
+    ASSERT_EQ(totalAck, numMsg);
+
+    Message msg;
+    auto ret = consumer.receive(msg, 1000);
+    ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+
+TEST_P(AcknowledgeTest, testAckMsgListWithMultiConsumer) {  // list ack on a 5-partition topic
+    Client client(lookupUrl);
+    auto clientImplPtr = PulsarFriend::getClientImplPtr(client);  // NOTE(review): not used below
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;
+
+    // Call the admin API to turn the topic into a partitioned topic with 5 partitions.
+    std::string url =
+        adminUrl + "admin/v2/persistent/public/default/test-ack-msgs" + uniqueChunk + "/partitions";
+    int res = makePutRequest(url, "5");
+    LOG_INFO("res = " << res);
+    ASSERT_FALSE(res != 204 && res != 409);  // only 204 or 409 is accepted
+
+    constexpr auto numMsg = 100;
+    const std::string subName = "sub-ack-list";
+
+    Producer producer;
+    ProducerConfiguration producerConfig;
+    // Turn off batching to ensure even distribution
+    producerConfig.setBatchingEnabled(false);
+    producerConfig.setPartitionsRoutingMode(pulsar::ProducerConfiguration::RoundRobinDistribution);
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));
+
+    ConsumerConfiguration consumerConfig;
+    // Set the ack grouping max size to 10.
+    consumerConfig.setAckGroupingMaxSize(10);
+    consumerConfig.setAckGroupingTimeMs(GetParam());  // test parameter: 100 or 0
+    consumerConfig.setUnAckedMessagesTimeoutMs(10000);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));
+
+    // Publish numMsg messages, then receive them all and collect their IDs.
+    for (auto count = 0; count < numMsg; ++count) {
+        Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    std::vector<MessageId> recvMsgId;
+    for (auto count = 0; count < numMsg; ++count) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+        recvMsgId.emplace_back(msg.getMessageId());
+    }
+    ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));
+
+    // Request redelivery of unacked messages; every message was acked, so none should arrive.
+    consumer.redeliverUnacknowledgedMessages();
+
+    // Sum the successful individual-ack counts over all stats objects; the total must be numMsg.
+    unsigned long totalAck = 0;
+    auto consumerStatsList = PulsarFriend::getConsumerStatsPtrList(consumer);
+    for (auto consumerStats : consumerStatsList) {
+        auto ackMap = consumerStats->getAckedMsgMap();
+        totalAck += ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
+    }
+    ASSERT_EQ(totalAck, numMsg);
+
+    Message msg;
+    auto ret = consumer.receive(msg, 1000);
+    ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+
+INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest, testing::Values(100, 0));  // ackGroupingTimeMs
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index 4b181d0..b801d38 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -20,6 +20,7 @@
 #include <pulsar/Client.h>
 
 #include <algorithm>
+#include <atomic>
 #include <chrono>
 #include <cstring>
 #include <functional>
@@ -58,7 +59,7 @@
 static long globalResendMessageCount = 0;
 std::string lookupUrl = "pulsar://localhost:6650";
 static std::string adminUrl = "http://localhost:8080/";
-static int uniqueCounter = 0;
+static std::atomic_int uniqueCounter{0};
 
 std::string unique_str() {
     long nanos = std::chrono::duration_cast<std::chrono::milliseconds>(
@@ -4276,118 +4277,3 @@
 TEST(BasicEndToEndTest, testBatchReceiveClose) { testBatchReceiveClose(false); }
 
 TEST(BasicEndToEndTest, testBatchReceiveCloseWithMultiConsumer) { testBatchReceiveClose(true); }
-
-TEST(BasicEndToEndTest, testAckMsgList) {
-    Client client(lookupUrl);
-    auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
-
-    constexpr auto numMsg = 100;
-    std::string uniqueChunk = unique_str();
-    std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;
-    const std::string subName = "sub-ack-list";
-
-    Producer producer;
-    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
-
-    ConsumerConfiguration consumerConfig;
-    consumerConfig.setAckGroupingMaxSize(numMsg);
-    consumerConfig.setUnAckedMessagesTimeoutMs(10000);
-    Consumer consumer;
-    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));
-
-    // Sending and receiving messages.
-    for (auto count = 0; count < numMsg; ++count) {
-        Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
-        ASSERT_EQ(ResultOk, producer.send(msg));
-    }
-
-    std::vector<MessageId> recvMsgId;
-    for (auto count = 0; count < numMsg; ++count) {
-        Message msg;
-        ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
-        recvMsgId.emplace_back(msg.getMessageId());
-    }
-    ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));
-
-    // try redeliver unack messages.
-    consumer.redeliverUnacknowledgedMessages();
-
-    auto consumerStats = PulsarFriend::getConsumerStatsPtr(consumer);
-    auto ackMap = consumerStats->getAckedMsgMap();
-    unsigned long totalAck = ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
-    ASSERT_EQ(totalAck, numMsg);
-
-    Message msg;
-    auto ret = consumer.receive(msg, 1000);
-    ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
-
-    producer.close();
-    consumer.close();
-    client.close();
-}
-
-TEST(BasicEndToEndTest, testAckMsgListWithMultiConsumer) {
-    Client client(lookupUrl);
-    auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
-
-    std::string uniqueChunk = unique_str();
-    std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;
-
-    // call admin api to make it partitioned
-    std::string url =
-        adminUrl + "admin/v2/persistent/public/default/test-ack-msgs" + uniqueChunk + "/partitions";
-    int res = makePutRequest(url, "5");
-    LOG_INFO("res = " << res);
-    ASSERT_FALSE(res != 204 && res != 409);
-
-    constexpr auto numMsg = 100;
-    const std::string subName = "sub-ack-list";
-
-    Producer producer;
-    ProducerConfiguration producerConfig;
-    // Turn off batch to ensure even distribution
-    producerConfig.setBatchingEnabled(false);
-    producerConfig.setPartitionsRoutingMode(pulsar::ProducerConfiguration::RoundRobinDistribution);
-    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));
-
-    ConsumerConfiguration consumerConfig;
-    // set ack grouping max size is 10
-    consumerConfig.setAckGroupingMaxSize(10);
-    consumerConfig.setUnAckedMessagesTimeoutMs(10000);
-    Consumer consumer;
-    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));
-
-    // Sending and receiving messages.
-    for (auto count = 0; count < numMsg; ++count) {
-        Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
-        ASSERT_EQ(ResultOk, producer.send(msg));
-    }
-
-    std::vector<MessageId> recvMsgId;
-    for (auto count = 0; count < numMsg; ++count) {
-        Message msg;
-        ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
-        recvMsgId.emplace_back(msg.getMessageId());
-    }
-    ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));
-
-    // try redeliver unack messages.
-    consumer.redeliverUnacknowledgedMessages();
-
-    // assert stats
-    unsigned long totalAck = 0;
-    auto consumerStatsList = PulsarFriend::getConsumerStatsPtrList(consumer);
-    for (auto consumerStats : consumerStatsList) {
-        auto ackMap = consumerStats->getAckedMsgMap();
-        totalAck += ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
-    }
-    ASSERT_EQ(totalAck, numMsg);
-
-    Message msg;
-    auto ret = consumer.receive(msg, 1000);
-    ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
-
-    producer.close();
-    consumer.close();
-    client.close();
-}