Fix deadlock for negative acknowledgment (#266)
Fixes https://github.com/apache/pulsar-client-cpp/issues/265
### Modifications
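Make `timer_` const and `enabledForTesting_` atomic in
`NegativeAcksTracker` so that `mutex_` only needs to guard the
`nackedMessages_` field. With that in place, `handleTimer` can release
`mutex_` before calling `consumer_.onNegativeAcksSend`, which avoids the
potential deadlock caused by a user-provided logger or interceptor.
Also add an atomic `closed_` flag so that `scheduleTimer` does nothing
once `close` has been called.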
Add `ConsumerTest.testNegativeAckDeadlock` to verify the fix.
(cherry picked from commit 8a9b2dc7f58b9b4a888be11d8068839abec51310)
diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc
index 451a13f..5c3ef3f 100644
--- a/lib/NegativeAcksTracker.cc
+++ b/lib/NegativeAcksTracker.cc
@@ -35,8 +35,7 @@
const ConsumerConfiguration &conf)
: consumer_(consumer),
timerInterval_(0),
- executor_(client->getIOExecutorProvider()->get()),
- enabledForTesting_(true) {
+ timer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()) {
static const long MIN_NACK_DELAY_MILLIS = 100;
nackDelay_ =
@@ -47,7 +46,9 @@
}
void NegativeAcksTracker::scheduleTimer() {
- timer_ = executor_->createDeadlineTimer();
+ if (closed_) {
+ return;
+ }
timer_->expires_from_now(timerInterval_);
timer_->async_wait(std::bind(&NegativeAcksTracker::handleTimer, this, std::placeholders::_1));
}
@@ -58,8 +59,7 @@
return;
}
- std::lock_guard<std::mutex> lock(mutex_);
- timer_ = nullptr;
+ std::unique_lock<std::mutex> lock(mutex_);
if (nackedMessages_.empty() || !enabledForTesting_) {
return;
@@ -78,6 +78,7 @@
++it;
}
}
+ lock.unlock();
if (!messagesToRedeliver.empty()) {
consumer_.onNegativeAcksSend(messagesToRedeliver);
@@ -87,34 +88,30 @@
}
void NegativeAcksTracker::add(const MessageId &m) {
- std::lock_guard<std::mutex> lock(mutex_);
-
+ auto msgId = discardBatch(m);
auto now = Clock::now();
- // Erase batch id to group all nacks from same batch
- nackedMessages_[discardBatch(m)] = now + nackDelay_;
-
- if (!timer_) {
- scheduleTimer();
+ {
+ std::lock_guard<std::mutex> lock{mutex_};
+ // Erase batch id to group all nacks from same batch
+ nackedMessages_[msgId] = now + nackDelay_;
}
+
+ scheduleTimer();
}
void NegativeAcksTracker::close() {
+ closed_ = true;
+ boost::system::error_code ec;
+ timer_->cancel(ec);
std::lock_guard<std::mutex> lock(mutex_);
-
- if (timer_) {
- boost::system::error_code ec;
- timer_->cancel(ec);
- }
- timer_ = nullptr;
nackedMessages_.clear();
}
void NegativeAcksTracker::setEnabledForTesting(bool enabled) {
- std::lock_guard<std::mutex> lock(mutex_);
enabledForTesting_ = enabled;
- if (enabledForTesting_ && !timer_) {
+ if (enabledForTesting_) {
scheduleTimer();
}
}
diff --git a/lib/NegativeAcksTracker.h b/lib/NegativeAcksTracker.h
index f8b334b..029f7d2 100644
--- a/lib/NegativeAcksTracker.h
+++ b/lib/NegativeAcksTracker.h
@@ -22,6 +22,7 @@
#include <pulsar/ConsumerConfiguration.h>
#include <pulsar/MessageId.h>
+#include <atomic>
#include <boost/asio/deadline_timer.hpp>
#include <chrono>
#include <map>
@@ -65,9 +66,9 @@
typedef typename std::chrono::steady_clock Clock;
std::map<MessageId, Clock::time_point> nackedMessages_;
- ExecutorServicePtr executor_;
- DeadlineTimerPtr timer_;
- bool enabledForTesting_; // to be able to test deterministically
+ const DeadlineTimerPtr timer_;
+ std::atomic_bool closed_{false};
+ std::atomic_bool enabledForTesting_{true}; // to be able to test deterministically
FRIEND_TEST(ConsumerTest, testNegativeAcksTrackerClose);
};
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index 23ea687..7867a57 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -20,9 +20,11 @@
#include <pulsar/Client.h>
#include <array>
+#include <atomic>
#include <chrono>
#include <ctime>
#include <map>
+#include <mutex>
#include <set>
#include <thread>
#include <vector>
@@ -1240,4 +1242,67 @@
INSTANTIATE_TEST_CASE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false));
+class InterceptorForNegAckDeadlock : public ConsumerInterceptor {
+ public:
+ Message beforeConsume(const Consumer& consumer, const Message& message) override { return message; }
+
+ void onAcknowledge(const Consumer& consumer, Result result, const MessageId& messageID) override {}
+
+ void onAcknowledgeCumulative(const Consumer& consumer, Result result,
+ const MessageId& messageID) override {}
+
+ void onNegativeAcksSend(const Consumer& consumer, const std::set<MessageId>& messageIds) override {
+ duringNegativeAck_ = true;
+ // Wait for the next time Consumer::negativeAcknowledge is called
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ std::lock_guard<std::mutex> lock{mutex_};
+ LOG_INFO("onNegativeAcksSend is called for " << consumer.getTopic());
+ duringNegativeAck_ = false;
+ }
+
+ static std::mutex mutex_;
+ static std::atomic_bool duringNegativeAck_;
+};
+
+std::mutex InterceptorForNegAckDeadlock::mutex_;
+std::atomic_bool InterceptorForNegAckDeadlock::duringNegativeAck_{false};
+
+// For https://github.com/apache/pulsar-client-cpp/issues/265
+TEST(ConsumerTest, testNegativeAckDeadlock) {
+ const std::string topic = "test-negative-ack-deadlock";
+ Client client{lookupUrl};
+ ConsumerConfiguration conf;
+ conf.setNegativeAckRedeliveryDelayMs(500);
+ conf.intercept({std::make_shared<InterceptorForNegAckDeadlock>()});
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", conf, consumer));
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+ producer.send(MessageBuilder().setContent("msg").build());
+
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg));
+
+ auto& duringNegativeAck = InterceptorForNegAckDeadlock::duringNegativeAck_;
+ duringNegativeAck = false;
+ consumer.negativeAcknowledge(msg); // schedule the negative ack timer
+ // Wait until the negative ack timer is triggered and onNegativeAcksSend will be called
+ for (int i = 0; !duringNegativeAck && i < 100; i++) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ ASSERT_TRUE(duringNegativeAck);
+
+ {
+ std::lock_guard<std::mutex> lock{InterceptorForNegAckDeadlock::mutex_};
+ consumer.negativeAcknowledge(msg);
+ }
+ for (int i = 0; duringNegativeAck && i < 100; i++) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ ASSERT_FALSE(duringNegativeAck);
+
+ client.close();
+}
+
} // namespace pulsar