Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp (#422)

Fixes https://github.com/apache/pulsar-client-cpp/issues/420

It's a catch-up for https://github.com/apache/pulsar/pull/22363
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 7d5250e..e5df421 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1050,7 +1050,9 @@
  */
 void ConsumerImpl::clearReceiveQueue() {
     if (duringSeek()) {
-        startMessageId_ = seekMessageId_.get();
+        if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
+            startMessageId_ = seekMessageId_.get();
+        }
         SeekStatus expected = SeekStatus::COMPLETED;
         if (seekStatus_.compare_exchange_strong(expected, SeekStatus::NOT_STARTED)) {
             auto seekCallback = seekCallback_.release();
@@ -1476,7 +1478,7 @@
         return;
     }
     const auto requestId = client->newRequestId();
-    seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), msgId, 0L, callback);
+    seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId}, callback);
 }
 
 void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
@@ -1495,8 +1497,8 @@
         return;
     }
     const auto requestId = client->newRequestId();
-    seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), MessageId::earliest(),
-                      timestamp, callback);
+    seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), SeekArg{timestamp},
+                      callback);
 }
 
 bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
@@ -1509,7 +1511,7 @@
             (lastDequedMessageId_ == MessageId::earliest()) &&
             (startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest());
     }
-    if (compareMarkDeletePosition) {
+    if (compareMarkDeletePosition || hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
         auto self = get_shared_this_ptr();
         getLastMessageIdAsync([self, callback](Result result, const GetLastMessageIdResponse& response) {
             if (result != ResultOk) {
@@ -1518,8 +1520,8 @@
             }
             auto handleResponse = [self, response, callback] {
                 if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) {
-                    // We only care about comparing ledger ids and entry ids as mark delete position doesn't
-                    // have other ids such as batch index
+                    // We only care about comparing ledger ids and entry ids as mark delete position
+                    // doesn't have other ids such as batch index
                     auto compareResult = compareLedgerAndEntryId(response.getMarkDeletePosition(),
                                                                  response.getLastMessageId());
                     callback(ResultOk, self->config_.isStartMessageIdInclusive() ? compareResult <= 0
@@ -1528,7 +1530,8 @@
                     callback(ResultOk, false);
                 }
             };
-            if (self->config_.isStartMessageIdInclusive()) {
+            if (self->config_.isStartMessageIdInclusive() &&
+                !self->hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
                 self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) {
                     if (result != ResultOk) {
                         callback(result, {});
@@ -1644,8 +1647,8 @@
 
 uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; }
 
-void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId,
-                                     long timestamp, ResultCallback callback) {
+void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const SeekArg& seekArg,
+                                     ResultCallback callback) {
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
         LOG_ERROR(getName() << " Client Connection not ready for Consumer");
@@ -1655,21 +1658,21 @@
 
     auto expected = SeekStatus::NOT_STARTED;
     if (!seekStatus_.compare_exchange_strong(expected, SeekStatus::IN_PROGRESS)) {
-        LOG_ERROR(getName() << " attempted to seek (" << seekId << ", " << timestamp << " when the status is "
+        LOG_ERROR(getName() << " attempted to seek " << seekArg << " when the status is "
                             << static_cast<int>(expected));
         callback(ResultNotAllowedError);
         return;
     }
 
     const auto originalSeekMessageId = seekMessageId_.get();
-    seekMessageId_ = seekId;
+    if (boost::get<uint64_t>(&seekArg)) {
+        hasSoughtByTimestamp_.store(true, std::memory_order_release);
+    } else {
+        seekMessageId_ = *boost::get<MessageId>(&seekArg);
+    }
     seekStatus_ = SeekStatus::IN_PROGRESS;
     seekCallback_ = std::move(callback);
-    if (timestamp > 0) {
-        LOG_INFO(getName() << " Seeking subscription to " << timestamp);
-    } else {
-        LOG_INFO(getName() << " Seeking subscription to " << seekId);
-    }
+    LOG_INFO(getName() << " Seeking subscription to " << seekArg);
 
     std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};
 
@@ -1692,7 +1695,9 @@
                     // It's during reconnection, complete the seek future after connection is established
                     seekStatus_ = SeekStatus::COMPLETED;
                 } else {
-                    startMessageId_ = seekMessageId_.get();
+                    if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
+                        startMessageId_ = seekMessageId_.get();
+                    }
                     seekCallback_.release()(result);
                 }
             } else {
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 82e323b..35636ad 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -22,6 +22,7 @@
 #include <pulsar/Reader.h>
 
 #include <boost/optional.hpp>
+#include <boost/variant.hpp>
 #include <functional>
 #include <list>
 #include <memory>
@@ -201,7 +202,18 @@
                                        BrokerGetLastMessageIdCallback callback);
 
     void clearReceiveQueue();
-    void seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId, long timestamp,
+    using SeekArg = boost::variant<uint64_t, MessageId>;
+    friend std::ostream& operator<<(std::ostream& os, const SeekArg& seekArg) {
+        auto ptr = boost::get<uint64_t>(&seekArg);
+        if (ptr) {
+            os << *ptr;
+        } else {
+            os << *boost::get<MessageId>(&seekArg);
+        }
+        return os;
+    }
+
+    void seekAsyncInternal(long requestId, SharedBuffer seek, const SeekArg& seekArg,
                            ResultCallback callback);
     void processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb);
 
@@ -250,6 +262,7 @@
     Synchronized<ResultCallback> seekCallback_{[](Result) {}};
     Synchronized<boost::optional<MessageId>> startMessageId_;
     Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
+    std::atomic<bool> hasSoughtByTimestamp_{false};
 
     bool duringSeek() const { return seekStatus_ != SeekStatus::NOT_STARTED; }
 
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index 9d2fe5f..92fdf62 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -700,9 +700,16 @@
     client.close();
 }
 
-TEST(ReaderSeekTest, testSeekForMessageId) {
-    Client client(serviceUrl);
+class ReaderSeekTest : public ::testing::TestWithParam<bool> {
+   public:
+    void SetUp() override { client = Client{serviceUrl}; }
 
+    void TearDown() override { client.close(); }
+
+    Client client{serviceUrl};
+};
+
+TEST_F(ReaderSeekTest, testSeekForMessageId) {
     const std::string topic = "test-seek-for-message-id-" + std::to_string(time(nullptr));
 
     Producer producer;
@@ -752,18 +759,24 @@
     producer.close();
 }
 
-class ReaderSeekTest : public ::testing::TestWithParam<bool> {};
+#define EXPECT_HAS_MESSAGE_AVAILABLE(reader, expected)           \
+    {                                                            \
+        bool actual;                                             \
+        ASSERT_EQ(ResultOk, reader.hasMessageAvailable(actual)); \
+        EXPECT_EQ(actual, (expected));                           \
+    }
 
-TEST(ReaderSeekTest, testStartAtLatestMessageId) {
-    Client client(serviceUrl);
-
+TEST_F(ReaderSeekTest, testStartAtLatestMessageId) {
     const std::string topic = "test-seek-latest-message-id-" + std::to_string(time(nullptr));
 
     Producer producer;
     ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
 
     MessageId id;
-    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build(), id));
+    for (int i = 0; i < 10; i++) {
+        ASSERT_EQ(ResultOk,
+                  producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), id));
+    }
 
     Reader readerExclusive;
     ASSERT_EQ(ResultOk,
@@ -774,20 +787,24 @@
               client.createReader(topic, MessageId::latest(),
                                   ReaderConfiguration().setStartMessageIdInclusive(true), readerInclusive));
 
+    EXPECT_HAS_MESSAGE_AVAILABLE(readerExclusive, false);
+    EXPECT_HAS_MESSAGE_AVAILABLE(readerInclusive, true);
+
     Message msg;
-    bool hasMsgAvaliable = false;
-    readerInclusive.hasMessageAvailable(hasMsgAvaliable);
-    ASSERT_TRUE(hasMsgAvaliable);
     ASSERT_EQ(ResultOk, readerInclusive.readNext(msg, 3000));
-    ASSERT_EQ(ResultTimeout, readerExclusive.readNext(msg, 3000));
+    ASSERT_EQ(msg.getDataAsString(), "msg-9");
+
+    readerInclusive.seek(0L);
+    EXPECT_HAS_MESSAGE_AVAILABLE(readerInclusive, true);
+    ASSERT_EQ(ResultOk, readerInclusive.readNext(msg, 3000));
+    ASSERT_EQ(msg.getDataAsString(), "msg-0");
 
     readerExclusive.close();
     readerInclusive.close();
     producer.close();
 }
 
-TEST(ReaderTest, testSeekInProgress) {
-    Client client(serviceUrl);
+TEST_F(ReaderSeekTest, testSeekInProgress) {
     const auto topic = "test-seek-in-progress-" + std::to_string(time(nullptr));
     Reader reader;
     ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader));
@@ -798,11 +815,9 @@
     Result result;
     promise.getFuture().get(result);
     ASSERT_EQ(result, ResultNotAllowedError);
-    client.close();
 }
 
 TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
-    Client client(serviceUrl);
     const auto topic = "test-has-message-available-after-seek-to-end-" + std::to_string(time(nullptr));
     Producer producer;
     ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
@@ -814,7 +829,6 @@
 
     bool hasMessageAvailable;
     if (GetParam()) {
-        // Test the case when `ConsumerImpl.lastMessageIdInBroker_` has been initialized
         ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
     }
 
@@ -834,8 +848,44 @@
     ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
     ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
     ASSERT_FALSE(hasMessageAvailable);
+}
 
-    client.close();
+TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
+    using namespace std::chrono;
+    const auto topic = "test-has-message-available-after-seek-timestamp-" + std::to_string(time(nullptr));
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    MessageId sentMsgId;
+    const auto timestampBeforeSend =
+        duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build(), sentMsgId));
+
+    auto createReader = [this, &topic](Reader& reader, const MessageId& msgId) {
+        ASSERT_EQ(ResultOk, client.createReader(topic, msgId, {}, reader));
+        if (GetParam()) {
+            if (msgId == MessageId::earliest()) {
+                EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
+            } else {
+                EXPECT_HAS_MESSAGE_AVAILABLE(reader, false);
+            }
+        }
+    };
+
+    std::vector<MessageId> msgIds{MessageId::earliest(), sentMsgId, MessageId::latest()};
+
+    for (auto&& msgId : msgIds) {
+        Reader reader;
+        createReader(reader, msgId);
+        ASSERT_EQ(ResultOk,
+                  reader.seek(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count()));
+        EXPECT_HAS_MESSAGE_AVAILABLE(reader, false);
+    }
+    for (auto&& msgId : msgIds) {
+        Reader reader;
+        createReader(reader, msgId);
+        ASSERT_EQ(ResultOk, reader.seek(timestampBeforeSend));
+        EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
+    }
 }
 
 INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));