Fix TableView never updating the value of an existing key (#487)

(cherry picked from commit d9dd029c1e0627aed416a354f2806c2acc4b95d8)
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index 3d19c42..a6f8b5f 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -222,12 +222,12 @@
 void ClientImpl::handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
                                        CreateProducerCallback callback, ProducerImplBasePtr producer) {
     if (result == ResultOk) {
-        auto pair = producers_.emplace(producer.get(), producer);
-        if (!pair.second) {
-            auto existingProducer = pair.first->second.lock();
+        auto address = producer.get();
+        auto existingProducer = producers_.putIfAbsent(address, producer);
+        if (existingProducer) {
+            auto producer = existingProducer.value().lock();
             LOG_ERROR("Unexpected existing producer at the same address: "
-                      << pair.first->first << ", producer: "
-                      << (existingProducer ? existingProducer->getProducerName() : "(null)"));
+                      << address << ", producer: " << (producer ? producer->getProducerName() : "(null)"));
             callback(ResultUnknownError, {});
             return;
         }
@@ -311,12 +311,12 @@
     reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
         auto consumer = weakConsumerPtr.lock();
         if (consumer) {
-            auto pair = consumers_.emplace(consumer.get(), consumer);
-            if (!pair.second) {
-                auto existingConsumer = pair.first->second.lock();
+            auto address = consumer.get();
+            auto existingConsumer = consumers_.putIfAbsent(address, consumer);
+            if (existingConsumer) {
+                consumer = existingConsumer.value().lock();
                 LOG_ERROR("Unexpected existing consumer at the same address: "
-                          << pair.first->first
-                          << ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
+                          << address << ", consumer: " << (consumer ? consumer->getName() : "(null)"));
             }
         } else {
             LOG_ERROR("Unexpected case: the consumer is somehow expired");
@@ -512,12 +512,12 @@
 void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                        SubscribeCallback callback, ConsumerImplBasePtr consumer) {
     if (result == ResultOk) {
-        auto pair = consumers_.emplace(consumer.get(), consumer);
-        if (!pair.second) {
-            auto existingConsumer = pair.first->second.lock();
+        auto address = consumer.get();
+        auto existingConsumer = consumers_.putIfAbsent(address, consumer);
+        if (existingConsumer) {
+            auto consumer = existingConsumer.value().lock();
             LOG_ERROR("Unexpected existing consumer at the same address: "
-                      << pair.first->first
-                      << ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
+                      << address << ", consumer: " << (consumer ? consumer->getName() : "(null)"));
             callback(ResultUnknownError, {});
             return;
         }
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 250845b..f429bfb 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -619,7 +619,7 @@
             return;
         }
         if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
-            possibleSendToDeadLetterTopicMessages_.emplace(m.getMessageId(), std::vector<Message>{m});
+            possibleSendToDeadLetterTopicMessages_.put(m.getMessageId(), std::vector<Message>{m});
             if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
                 redeliverUnacknowledgedMessages({m.getMessageId()});
                 increaseAvailablePermits(cnx);
@@ -786,7 +786,7 @@
     }
 
     if (!possibleToDeadLetter.empty()) {
-        possibleSendToDeadLetterTopicMessages_.emplace(batchedMessage.getMessageId(), possibleToDeadLetter);
+        possibleSendToDeadLetterTopicMessages_.put(batchedMessage.getMessageId(), possibleToDeadLetter);
         if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
             redeliverUnacknowledgedMessages({batchedMessage.getMessageId()});
         }
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index dddade5..8a43173 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -260,7 +260,7 @@
         consumer->getConsumerCreatedFuture().addListener(std::bind(
             &MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
             std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
-        consumers_.emplace(topicName->toString(), consumer);
+        consumers_.put(topicName->toString(), consumer);
         LOG_DEBUG("Creating Consumer for - " << topicName << " - " << consumerStr_);
         consumer->start();
 
@@ -287,7 +287,7 @@
                 &MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
                 std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
             consumer->setPartitionIndex(i);
-            consumers_.emplace(topicPartitionName, consumer);
+            consumers_.put(topicPartitionName, consumer);
             LOG_DEBUG("Creating Consumer for - " << topicPartitionName << " - " << consumerStr_);
             consumer->start();
         }
@@ -1063,7 +1063,7 @@
         });
     consumer->setPartitionIndex(partitionIndex);
     consumer->start();
-    consumers_.emplace(topicPartitionName, consumer);
+    consumers_.put(topicPartitionName, consumer);
     LOG_INFO("Add Creating Consumer for - " << topicPartitionName << " - " << consumerStr_
                                             << " consumerSize: " << consumers_.size());
 }
diff --git a/lib/SynchronizedHashMap.h b/lib/SynchronizedHashMap.h
index e224913..dacaf45 100644
--- a/lib/SynchronizedHashMap.h
+++ b/lib/SynchronizedHashMap.h
@@ -59,10 +59,22 @@
         }
     }
 
-    template <typename... Args>
-    std::pair<Iterator, bool> emplace(Args&&... args) {
+    // Put a new key-value pair if the key does not exist.
+    // Return boost::none if the pair was inserted, or the existing value if the key already exists.
+    OptValue putIfAbsent(const K& key, const V& value) {
         Lock lock(mutex_);
-        return data_.emplace(std::forward<Args>(args)...);
+        auto pair = data_.emplace(key, value);
+        if (pair.second) {
+            return boost::none;
+        } else {
+            return pair.first->second;
+        }
+    }
+
+    // Put a key-value pair, overwriting the existing value if the key already exists.
+    void put(const K& key, const V& value) {
+        Lock lock(mutex_);
+        data_[key] = value;
     }
 
     void forEach(std::function<void(const K&, const V&)> f) const {
diff --git a/lib/TableViewImpl.cc b/lib/TableViewImpl.cc
index fd24932..e434e60 100644
--- a/lib/TableViewImpl.cc
+++ b/lib/TableViewImpl.cc
@@ -104,7 +104,7 @@
         if (msg.getLength() == 0) {
             data_.remove(msg.getPartitionKey());
         } else {
-            data_.emplace(msg.getPartitionKey(), value);
+            data_.put(msg.getPartitionKey(), value);
         }
 
         Lock lock(listenersMutex_);
@@ -167,4 +167,4 @@
     });
 }
 
-}  // namespace pulsar
\ No newline at end of file
+}  // namespace pulsar
diff --git a/tests/SynchronizedHashMapTest.cc b/tests/SynchronizedHashMapTest.cc
index cf184d9..3dba6bf 100644
--- a/tests/SynchronizedHashMapTest.cc
+++ b/tests/SynchronizedHashMapTest.cc
@@ -20,6 +20,7 @@
 
 #include <algorithm>
 #include <atomic>
+#include <boost/optional/optional_io.hpp>
 #include <chrono>
 #include <thread>
 #include <vector>
@@ -100,18 +101,27 @@
     ASSERT_TRUE(values.empty());
     ASSERT_EQ(result, 1);
 
-    m.emplace(1, 100);
+    ASSERT_EQ(m.putIfAbsent(1, 100), boost::none);
+    ASSERT_EQ(m.putIfAbsent(1, 101), boost::optional<int>(100));
     m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); },
                    [&result] { result = 2; });
     ASSERT_EQ(values, (std::vector<int>({100})));
     ASSERT_EQ(result, 1);
 
+    m.put(1, 102);
     values.clear();
-    m.emplace(2, 200);
+    m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); },
+                   [&result] { result = 2; });
+    ASSERT_EQ(values, (std::vector<int>({102})));
+    ASSERT_EQ(result, 1);
+
+    values.clear();
+    ASSERT_EQ(m.putIfAbsent(2, 200), boost::none);
+    ASSERT_EQ(m.putIfAbsent(2, 201), boost::optional<int>(200));
     m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); },
                    [&result] { result = 2; });
     std::sort(values.begin(), values.end());
-    ASSERT_EQ(values, (std::vector<int>({100, 200})));
+    ASSERT_EQ(values, (std::vector<int>({102, 200})));
     ASSERT_EQ(result, 1);
 }
 
diff --git a/tests/TableViewTest.cc b/tests/TableViewTest.cc
index 2515818..b51b2c8 100644
--- a/tests/TableViewTest.cc
+++ b/tests/TableViewTest.cc
@@ -96,10 +96,22 @@
 
     // assert interfaces.
     std::string value;
+    ASSERT_TRUE(tableView.containsKey("key1"));
     ASSERT_TRUE(tableView.getValue("key1", value));
     ASSERT_EQ(value, "value1");
+
+    // Test value update
+    ASSERT_EQ(ResultOk,
+              producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1-update").build()));
+    ASSERT_TRUE(waitUntil(std::chrono::seconds(2), [&tableView]() {
+        std::string value;
+        tableView.getValue("key1", value);
+        return value == "value1-update";
+    }));
+
+    // retrieveValue will remove the key/value from the table view.
     ASSERT_TRUE(tableView.retrieveValue("key1", value));
-    ASSERT_EQ(value, "value1");
+    ASSERT_EQ(value, "value1-update");
     ASSERT_FALSE(tableView.containsKey("key1"));
     ASSERT_EQ(tableView.snapshot().size(), count * 2 - 1);
     ASSERT_EQ(tableView.size(), 0);
diff --git a/tests/extensibleLM/ExtensibleLoadManagerTest.cc b/tests/extensibleLM/ExtensibleLoadManagerTest.cc
index c7e5aa0..4af0806 100644
--- a/tests/extensibleLM/ExtensibleLoadManagerTest.cc
+++ b/tests/extensibleLM/ExtensibleLoadManagerTest.cc
@@ -116,7 +116,7 @@
             ASSERT_EQ(sendResult, ResultOk);
             ASSERT_TRUE(elapsed < maxWaitTimeMs);
 
-            producedMsgs.emplace(i, i);
+            producedMsgs.put(i, i);
             i++;
         }
         LOG_INFO("producer finished");
@@ -143,7 +143,7 @@
             LOG_INFO("acked i:" << i << " " << elapsed << " ms");
             ASSERT_TRUE(elapsed < maxWaitTimeMs);
             ASSERT_EQ(ackResult, ResultOk);
-            consumedMsgs.emplace(i, i);
+            consumedMsgs.put(i, i);
         }
         LOG_INFO("consumer finished");
     };