Fix deadlock for negative acknowledgment (#266)

Fixes https://github.com/apache/pulsar-client-cpp/issues/265

### Modifications

Make `timer_` const and `enabledForTesting_` atomic in
`NegativeAcksTracker` so that the `mutex_` can be used only for the
`nackedMessages_` field. After that, we can unlock `mutex_` in
`handleTimer` to avoid the potential deadlock from a user-provided
logger or interceptor.

Add `ConsumerTest.testNegativeAckDeadlock` to verify the fix.

(cherry picked from commit 8a9b2dc7f58b9b4a888be11d8068839abec51310)
diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc
index 451a13f..5c3ef3f 100644
--- a/lib/NegativeAcksTracker.cc
+++ b/lib/NegativeAcksTracker.cc
@@ -35,8 +35,7 @@
                                          const ConsumerConfiguration &conf)
     : consumer_(consumer),
       timerInterval_(0),
-      executor_(client->getIOExecutorProvider()->get()),
-      enabledForTesting_(true) {
+      timer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()) {
     static const long MIN_NACK_DELAY_MILLIS = 100;
 
     nackDelay_ =
@@ -47,7 +46,9 @@
 }
 
 void NegativeAcksTracker::scheduleTimer() {
-    timer_ = executor_->createDeadlineTimer();
+    if (closed_) {
+        return;
+    }
     timer_->expires_from_now(timerInterval_);
     timer_->async_wait(std::bind(&NegativeAcksTracker::handleTimer, this, std::placeholders::_1));
 }
@@ -58,8 +59,7 @@
         return;
     }
 
-    std::lock_guard<std::mutex> lock(mutex_);
-    timer_ = nullptr;
+    std::unique_lock<std::mutex> lock(mutex_);
 
     if (nackedMessages_.empty() || !enabledForTesting_) {
         return;
@@ -78,6 +78,7 @@
             ++it;
         }
     }
+    lock.unlock();
 
     if (!messagesToRedeliver.empty()) {
         consumer_.onNegativeAcksSend(messagesToRedeliver);
@@ -87,34 +88,30 @@
 }
 
 void NegativeAcksTracker::add(const MessageId &m) {
-    std::lock_guard<std::mutex> lock(mutex_);
-
+    auto msgId = discardBatch(m);
     auto now = Clock::now();
 
-    // Erase batch id to group all nacks from same batch
-    nackedMessages_[discardBatch(m)] = now + nackDelay_;
-
-    if (!timer_) {
-        scheduleTimer();
+    {
+        std::lock_guard<std::mutex> lock{mutex_};
+        // Erase batch id to group all nacks from same batch
+        nackedMessages_[msgId] = now + nackDelay_;
     }
+
+    scheduleTimer();
 }
 
 void NegativeAcksTracker::close() {
+    closed_ = true;
+    boost::system::error_code ec;
+    timer_->cancel(ec);
     std::lock_guard<std::mutex> lock(mutex_);
-
-    if (timer_) {
-        boost::system::error_code ec;
-        timer_->cancel(ec);
-    }
-    timer_ = nullptr;
     nackedMessages_.clear();
 }
 
 void NegativeAcksTracker::setEnabledForTesting(bool enabled) {
-    std::lock_guard<std::mutex> lock(mutex_);
     enabledForTesting_ = enabled;
 
-    if (enabledForTesting_ && !timer_) {
+    if (enabledForTesting_) {
         scheduleTimer();
     }
 }
diff --git a/lib/NegativeAcksTracker.h b/lib/NegativeAcksTracker.h
index f8b334b..029f7d2 100644
--- a/lib/NegativeAcksTracker.h
+++ b/lib/NegativeAcksTracker.h
@@ -22,6 +22,7 @@
 #include <pulsar/ConsumerConfiguration.h>
 #include <pulsar/MessageId.h>
 
+#include <atomic>
 #include <boost/asio/deadline_timer.hpp>
 #include <chrono>
 #include <map>
@@ -65,9 +66,9 @@
     typedef typename std::chrono::steady_clock Clock;
     std::map<MessageId, Clock::time_point> nackedMessages_;
 
-    ExecutorServicePtr executor_;
-    DeadlineTimerPtr timer_;
-    bool enabledForTesting_;  // to be able to test deterministically
+    const DeadlineTimerPtr timer_;
+    std::atomic_bool closed_{false};
+    std::atomic_bool enabledForTesting_{true};  // to be able to test deterministically
 
     FRIEND_TEST(ConsumerTest, testNegativeAcksTrackerClose);
 };
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index 23ea687..7867a57 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -20,9 +20,11 @@
 #include <pulsar/Client.h>
 
 #include <array>
+#include <atomic>
 #include <chrono>
 #include <ctime>
 #include <map>
+#include <mutex>
 #include <set>
 #include <thread>
 #include <vector>
@@ -1240,4 +1242,67 @@
 
 INSTANTIATE_TEST_CASE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false));
 
+class InterceptorForNegAckDeadlock : public ConsumerInterceptor {
+   public:
+    Message beforeConsume(const Consumer& consumer, const Message& message) override { return message; }
+
+    void onAcknowledge(const Consumer& consumer, Result result, const MessageId& messageID) override {}
+
+    void onAcknowledgeCumulative(const Consumer& consumer, Result result,
+                                 const MessageId& messageID) override {}
+
+    void onNegativeAcksSend(const Consumer& consumer, const std::set<MessageId>& messageIds) override {
+        duringNegativeAck_ = true;
+        // Wait for the next time Consumer::negativeAcknowledge is called
+        std::this_thread::sleep_for(std::chrono::milliseconds(500));
+        std::lock_guard<std::mutex> lock{mutex_};
+        LOG_INFO("onNegativeAcksSend is called for " << consumer.getTopic());
+        duringNegativeAck_ = false;
+    }
+
+    static std::mutex mutex_;
+    static std::atomic_bool duringNegativeAck_;
+};
+
+std::mutex InterceptorForNegAckDeadlock::mutex_;
+std::atomic_bool InterceptorForNegAckDeadlock::duringNegativeAck_{false};
+
+// For https://github.com/apache/pulsar-client-cpp/issues/265
+TEST(ConsumerTest, testNegativeAckDeadlock) {
+    const std::string topic = "test-negative-ack-deadlock";
+    Client client{lookupUrl};
+    ConsumerConfiguration conf;
+    conf.setNegativeAckRedeliveryDelayMs(500);
+    conf.intercept({std::make_shared<InterceptorForNegAckDeadlock>()});
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", conf, consumer));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    producer.send(MessageBuilder().setContent("msg").build());
+
+    Message msg;
+    ASSERT_EQ(ResultOk, consumer.receive(msg));
+
+    auto& duringNegativeAck = InterceptorForNegAckDeadlock::duringNegativeAck_;
+    duringNegativeAck = false;
+    consumer.negativeAcknowledge(msg);  // schedule the negative ack timer
+    // Wait until the negative ack timer is triggered and onNegativeAcksSend will be called
+    for (int i = 0; !duringNegativeAck && i < 100; i++) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    ASSERT_TRUE(duringNegativeAck);
+
+    {
+        std::lock_guard<std::mutex> lock{InterceptorForNegAckDeadlock::mutex_};
+        consumer.negativeAcknowledge(msg);
+    }
+    for (int i = 0; duringNegativeAck && i < 100; i++) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    ASSERT_FALSE(duringNegativeAck);
+
+    client.close();
+}
+
 }  // namespace pulsar