Fix duplicated subscribed topics not deduplicated (#501)

diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index 8298ebc..aa75128 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -21,7 +21,9 @@
 #include <pulsar/ClientConfiguration.h>
 #include <pulsar/Version.h>
 
+#include <algorithm>
 #include <chrono>
+#include <iterator>
 #include <random>
 #include <sstream>
 
@@ -404,10 +406,17 @@
     }
 }
 
-void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
-                                const ConsumerConfiguration& conf, const SubscribeCallback& callback) {
+void ClientImpl::subscribeAsync(const std::vector<std::string>& originalTopics,
+                                const std::string& subscriptionName, const ConsumerConfiguration& conf,
+                                const SubscribeCallback& callback) {
     TopicNamePtr topicNamePtr;
 
+    // Remove duplicates from the list of topics
+    auto topics = originalTopics;
+    std::sort(topics.begin(), topics.end());
+    auto it = std::unique(topics.begin(), topics.end());
+    auto newSize = std::distance(topics.begin(), it);
+    topics.resize(newSize);
     Lock lock(mutex_);
     if (state_ != Open) {
         lock.unlock();
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index ad4ee81..b4c7261 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1499,4 +1499,24 @@
     client.close();
 }
 
+TEST(ConsumerTest, testDuplicatedTopics) {
+    Client client{lookupUrl};
+    auto topicPrefix = "consumer-test-duplicated-topics" + std::to_string(time(nullptr));
+    std::array<std::string, 3> uniqueTopics{topicPrefix + "0", topicPrefix + "1", topicPrefix + "2"};
+    std::vector<std::string> topics{uniqueTopics[1], uniqueTopics[0], uniqueTopics[2], uniqueTopics[1],
+                                    uniqueTopics[2]};
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topics, "sub", consumer));
+
+    for (size_t i = 0; i < uniqueTopics.size(); i++) {
+        Producer producer;
+        ASSERT_EQ(ResultOk, client.createProducer(topics[i], producer));
+        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build()));
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+        ASSERT_EQ("msg-" + std::to_string(i), msg.getDataAsString());
+    }
+}
+
 }  // namespace pulsar