Fix TableView never updating the value of an existing key (#487)
(cherry picked from commit d9dd029c1e0627aed416a354f2806c2acc4b95d8)
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index 3d19c42..a6f8b5f 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -222,12 +222,12 @@
void ClientImpl::handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
CreateProducerCallback callback, ProducerImplBasePtr producer) {
if (result == ResultOk) {
- auto pair = producers_.emplace(producer.get(), producer);
- if (!pair.second) {
- auto existingProducer = pair.first->second.lock();
+ auto address = producer.get();
+ auto existingProducer = producers_.putIfAbsent(address, producer);
+ if (existingProducer) {
+ auto producer = existingProducer.value().lock();
LOG_ERROR("Unexpected existing producer at the same address: "
- << pair.first->first << ", producer: "
- << (existingProducer ? existingProducer->getProducerName() : "(null)"));
+ << address << ", producer: " << (producer ? producer->getProducerName() : "(null)"));
callback(ResultUnknownError, {});
return;
}
@@ -311,12 +311,12 @@
reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
auto consumer = weakConsumerPtr.lock();
if (consumer) {
- auto pair = consumers_.emplace(consumer.get(), consumer);
- if (!pair.second) {
- auto existingConsumer = pair.first->second.lock();
+ auto address = consumer.get();
+ auto existingConsumer = consumers_.putIfAbsent(address, consumer);
+ if (existingConsumer) {
+ consumer = existingConsumer.value().lock();
LOG_ERROR("Unexpected existing consumer at the same address: "
- << pair.first->first
- << ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
+ << address << ", consumer: " << (consumer ? consumer->getName() : "(null)"));
}
} else {
LOG_ERROR("Unexpected case: the consumer is somehow expired");
@@ -512,12 +512,12 @@
void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
SubscribeCallback callback, ConsumerImplBasePtr consumer) {
if (result == ResultOk) {
- auto pair = consumers_.emplace(consumer.get(), consumer);
- if (!pair.second) {
- auto existingConsumer = pair.first->second.lock();
+ auto address = consumer.get();
+ auto existingConsumer = consumers_.putIfAbsent(address, consumer);
+ if (existingConsumer) {
+ auto consumer = existingConsumer.value().lock();
LOG_ERROR("Unexpected existing consumer at the same address: "
- << pair.first->first
- << ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
+ << address << ", consumer: " << (consumer ? consumer->getName() : "(null)"));
callback(ResultUnknownError, {});
return;
}
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 250845b..f429bfb 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -619,7 +619,7 @@
return;
}
if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
- possibleSendToDeadLetterTopicMessages_.emplace(m.getMessageId(), std::vector<Message>{m});
+ possibleSendToDeadLetterTopicMessages_.put(m.getMessageId(), std::vector<Message>{m});
if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
redeliverUnacknowledgedMessages({m.getMessageId()});
increaseAvailablePermits(cnx);
@@ -786,7 +786,7 @@
}
if (!possibleToDeadLetter.empty()) {
- possibleSendToDeadLetterTopicMessages_.emplace(batchedMessage.getMessageId(), possibleToDeadLetter);
+ possibleSendToDeadLetterTopicMessages_.put(batchedMessage.getMessageId(), possibleToDeadLetter);
if (redeliveryCount > deadLetterPolicy_.getMaxRedeliverCount()) {
redeliverUnacknowledgedMessages({batchedMessage.getMessageId()});
}
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index dddade5..8a43173 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -260,7 +260,7 @@
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
- consumers_.emplace(topicName->toString(), consumer);
+ consumers_.put(topicName->toString(), consumer);
LOG_DEBUG("Creating Consumer for - " << topicName << " - " << consumerStr_);
consumer->start();
@@ -287,7 +287,7 @@
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
consumer->setPartitionIndex(i);
- consumers_.emplace(topicPartitionName, consumer);
+ consumers_.put(topicPartitionName, consumer);
LOG_DEBUG("Creating Consumer for - " << topicPartitionName << " - " << consumerStr_);
consumer->start();
}
@@ -1063,7 +1063,7 @@
});
consumer->setPartitionIndex(partitionIndex);
consumer->start();
- consumers_.emplace(topicPartitionName, consumer);
+ consumers_.put(topicPartitionName, consumer);
LOG_INFO("Add Creating Consumer for - " << topicPartitionName << " - " << consumerStr_
<< " consumerSize: " << consumers_.size());
}
diff --git a/lib/SynchronizedHashMap.h b/lib/SynchronizedHashMap.h
index e224913..dacaf45 100644
--- a/lib/SynchronizedHashMap.h
+++ b/lib/SynchronizedHashMap.h
@@ -59,10 +59,22 @@
}
}
- template <typename... Args>
- std::pair<Iterator, bool> emplace(Args&&... args) {
+ // Put a new key-value pair if the key does not exist.
+ // Return boost::none if the key was absent (the pair is inserted), or the existing value otherwise.
+ OptValue putIfAbsent(const K& key, const V& value) {
Lock lock(mutex_);
- return data_.emplace(std::forward<Args>(args)...);
+ auto pair = data_.emplace(key, value);
+ if (pair.second) {
+ return boost::none;
+ } else {
+ return pair.first->second;
+ }
+ }
+
+ // Put a key-value pair, overwriting the existing value if the key already exists.
+ void put(const K& key, const V& value) {
+ Lock lock(mutex_);
+ data_[key] = value;
}
void forEach(std::function<void(const K&, const V&)> f) const {
diff --git a/lib/TableViewImpl.cc b/lib/TableViewImpl.cc
index fd24932..e434e60 100644
--- a/lib/TableViewImpl.cc
+++ b/lib/TableViewImpl.cc
@@ -104,7 +104,7 @@
if (msg.getLength() == 0) {
data_.remove(msg.getPartitionKey());
} else {
- data_.emplace(msg.getPartitionKey(), value);
+ data_.put(msg.getPartitionKey(), value);
}
Lock lock(listenersMutex_);
@@ -167,4 +167,4 @@
});
}
-} // namespace pulsar
\ No newline at end of file
+} // namespace pulsar
diff --git a/tests/SynchronizedHashMapTest.cc b/tests/SynchronizedHashMapTest.cc
index cf184d9..3dba6bf 100644
--- a/tests/SynchronizedHashMapTest.cc
+++ b/tests/SynchronizedHashMapTest.cc
@@ -20,6 +20,7 @@
#include <algorithm>
#include <atomic>
+#include <boost/optional/optional_io.hpp>
#include <chrono>
#include <thread>
#include <vector>
@@ -100,18 +101,27 @@
ASSERT_TRUE(values.empty());
ASSERT_EQ(result, 1);
- m.emplace(1, 100);
+ ASSERT_EQ(m.putIfAbsent(1, 100), boost::none);
+ ASSERT_EQ(m.putIfAbsent(1, 101), boost::optional<int>(100));
m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); },
[&result] { result = 2; });
ASSERT_EQ(values, (std::vector<int>({100})));
ASSERT_EQ(result, 1);
+ m.put(1, 102);
values.clear();
- m.emplace(2, 200);
+ m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); },
+ [&result] { result = 2; });
+ ASSERT_EQ(values, (std::vector<int>({102})));
+ ASSERT_EQ(result, 1);
+
+ values.clear();
+ ASSERT_EQ(m.putIfAbsent(2, 200), boost::none);
+ ASSERT_EQ(m.putIfAbsent(2, 201), boost::optional<int>(200));
m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); },
[&result] { result = 2; });
std::sort(values.begin(), values.end());
- ASSERT_EQ(values, (std::vector<int>({100, 200})));
+ ASSERT_EQ(values, (std::vector<int>({102, 200})));
ASSERT_EQ(result, 1);
}
diff --git a/tests/TableViewTest.cc b/tests/TableViewTest.cc
index 2515818..b51b2c8 100644
--- a/tests/TableViewTest.cc
+++ b/tests/TableViewTest.cc
@@ -96,10 +96,22 @@
// assert interfaces.
std::string value;
+ ASSERT_TRUE(tableView.containsKey("key1"));
ASSERT_TRUE(tableView.getValue("key1", value));
ASSERT_EQ(value, "value1");
+
+ // Test value update
+ ASSERT_EQ(ResultOk,
+ producer.send(MessageBuilder().setPartitionKey("key1").setContent("value1-update").build()));
+ ASSERT_TRUE(waitUntil(std::chrono::seconds(2), [&tableView]() {
+ std::string value;
+ tableView.getValue("key1", value);
+ return value == "value1-update";
+ }));
+
+ // retrieveValue will remove the key/value from the table view.
ASSERT_TRUE(tableView.retrieveValue("key1", value));
- ASSERT_EQ(value, "value1");
+ ASSERT_EQ(value, "value1-update");
ASSERT_FALSE(tableView.containsKey("key1"));
ASSERT_EQ(tableView.snapshot().size(), count * 2 - 1);
ASSERT_EQ(tableView.size(), 0);
diff --git a/tests/extensibleLM/ExtensibleLoadManagerTest.cc b/tests/extensibleLM/ExtensibleLoadManagerTest.cc
index c7e5aa0..4af0806 100644
--- a/tests/extensibleLM/ExtensibleLoadManagerTest.cc
+++ b/tests/extensibleLM/ExtensibleLoadManagerTest.cc
@@ -116,7 +116,7 @@
ASSERT_EQ(sendResult, ResultOk);
ASSERT_TRUE(elapsed < maxWaitTimeMs);
- producedMsgs.emplace(i, i);
+ producedMsgs.put(i, i);
i++;
}
LOG_INFO("producer finished");
@@ -143,7 +143,7 @@
LOG_INFO("acked i:" << i << " " << elapsed << " ms");
ASSERT_TRUE(elapsed < maxWaitTimeMs);
ASSERT_EQ(ackResult, ResultOk);
- consumedMsgs.emplace(i, i);
+ consumedMsgs.put(i, i);
}
LOG_INFO("consumer finished");
};