fix: Incorrect acknowledgment behavior in the listener of the multi-topic consumer. (#423)

### Motivation
https://github.com/apache/pulsar-client-node/issues/371

### Modifications
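- Add the message to the unacknowledged tracker before calling the listener, so that an acknowledgment issued inside the listener removes the message from the tracker instead of the message being added after it was already acknowledged.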

### Verifying this change
- Add `testMultiConsumerListenerAndAck` to cover it.
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 1484785..80566c8 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -568,8 +568,8 @@
     incomingMessages_.pop(m);
     try {
         Consumer self{get_shared_this_ptr()};
-        messageListener_(self, m);
         messageProcessed(m);
+        messageListener_(self, m);
     } catch (const std::exception& e) {
         LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << e.what());
     }
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index 9d71a04..b5c51ec 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -183,6 +183,7 @@
     FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
     FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
     FRIEND_TEST(ConsumerTest, testPatternSubscribeTopic);
+    FRIEND_TEST(ConsumerTest, testMultiConsumerListenerAndAck);
 };
 
 typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;
diff --git a/lib/UnAckedMessageTrackerEnabled.h b/lib/UnAckedMessageTrackerEnabled.h
index 83edc4c..c5479a7 100644
--- a/lib/UnAckedMessageTrackerEnabled.h
+++ b/lib/UnAckedMessageTrackerEnabled.h
@@ -67,6 +67,7 @@
     FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
     FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker);
     FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
+    FRIEND_TEST(ConsumerTest, testMultiConsumerListenerAndAck);
 };
 }  // namespace pulsar
 
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index f97457f..2aab722 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1490,4 +1490,46 @@
     ASSERT_EQ(ResultOk, client.subscribe(topic, "test-sub", consumer));
     client.close();
 }
+
+// Messages acked inside a multi-topics listener must not remain in the unacked-message tracker.
+TEST(ConsumerTest, testMultiConsumerListenerAndAck) {
+    Client client{lookupUrl};
+
+    const std::string topicName = "testConsumerEventWithPartition-topic-" + std::to_string(time(nullptr));
+    int res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions",
+                             std::to_string(5));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    // Create a producer
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+    const int num = 10;
+    // Consume via a listener that acks every message and then counts the latch down
+    Latch latch{num};
+    Consumer consumer;
+    ConsumerConfiguration consumerConfiguration;
+    PulsarFriend::setConsumerUnAckMessagesTimeoutMs(consumerConfiguration, 2000);
+    consumerConfiguration.setMessageListener([&latch](Consumer& consumer, const Message& msg) {
+        LOG_INFO("Received message '" << msg.getDataAsString() << "' and ack it");
+        consumer.acknowledge(msg);
+        latch.countdown();
+    });
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, "consumer-1", consumerConfiguration, consumer));
+
+    // Send exactly `num` messages synchronously; a failed send aborts the test right away
+    for (int i = 0; i < num; ++i) {
+        Message msg = MessageBuilder().setContent("content" + std::to_string(i)).build();
+        ASSERT_EQ(ResultOk, producer.send(msg)) << "failed to send message " << i;
+    }
+
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(5)));
+    auto multiConsumerImplPtr = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
+    auto tracker =
+        static_cast<UnAckedMessageTrackerEnabled*>(multiConsumerImplPtr->unAckedMessageTrackerPtr_.get());
+    ASSERT_EQ(0, tracker->size());
+
+    client.close();
+}
+
 }  // namespace pulsar