Fix wrong results of hasMessageAvailable and readNext after seeking by timestamp (#422)
Fixes https://github.com/apache/pulsar-client-cpp/issues/420
It's a catch-up for https://github.com/apache/pulsar/pull/22363
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 7d5250e..e5df421 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1050,7 +1050,9 @@
*/
void ConsumerImpl::clearReceiveQueue() {
if (duringSeek()) {
- startMessageId_ = seekMessageId_.get();
+ if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
+ startMessageId_ = seekMessageId_.get();
+ }
SeekStatus expected = SeekStatus::COMPLETED;
if (seekStatus_.compare_exchange_strong(expected, SeekStatus::NOT_STARTED)) {
auto seekCallback = seekCallback_.release();
@@ -1476,7 +1478,7 @@
return;
}
const auto requestId = client->newRequestId();
- seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), msgId, 0L, callback);
+ seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, msgId), SeekArg{msgId}, callback);
}
void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
@@ -1495,8 +1497,8 @@
return;
}
const auto requestId = client->newRequestId();
- seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), MessageId::earliest(),
- timestamp, callback);
+ seekAsyncInternal(requestId, Commands::newSeek(consumerId_, requestId, timestamp), SeekArg{timestamp},
+ callback);
}
bool ConsumerImpl::isReadCompacted() { return readCompacted_; }
@@ -1509,7 +1511,7 @@
(lastDequedMessageId_ == MessageId::earliest()) &&
(startMessageId_.get().value_or(MessageId::earliest()) == MessageId::latest());
}
- if (compareMarkDeletePosition) {
+ if (compareMarkDeletePosition || hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
auto self = get_shared_this_ptr();
getLastMessageIdAsync([self, callback](Result result, const GetLastMessageIdResponse& response) {
if (result != ResultOk) {
@@ -1518,8 +1520,8 @@
}
auto handleResponse = [self, response, callback] {
if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) {
- // We only care about comparing ledger ids and entry ids as mark delete position doesn't
- // have other ids such as batch index
+ // We only care about comparing ledger ids and entry ids as mark delete position
+ // doesn't have other ids such as batch index
auto compareResult = compareLedgerAndEntryId(response.getMarkDeletePosition(),
response.getLastMessageId());
callback(ResultOk, self->config_.isStartMessageIdInclusive() ? compareResult <= 0
@@ -1528,7 +1530,8 @@
callback(ResultOk, false);
}
};
- if (self->config_.isStartMessageIdInclusive()) {
+ if (self->config_.isStartMessageIdInclusive() &&
+ !self->hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) {
if (result != ResultOk) {
callback(result, {});
@@ -1644,8 +1647,8 @@
uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; }
-void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId,
- long timestamp, ResultCallback callback) {
+void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const SeekArg& seekArg,
+ ResultCallback callback) {
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
LOG_ERROR(getName() << " Client Connection not ready for Consumer");
@@ -1655,21 +1658,21 @@
auto expected = SeekStatus::NOT_STARTED;
if (!seekStatus_.compare_exchange_strong(expected, SeekStatus::IN_PROGRESS)) {
- LOG_ERROR(getName() << " attempted to seek (" << seekId << ", " << timestamp << " when the status is "
+ LOG_ERROR(getName() << " attempted to seek " << seekArg << " when the status is "
<< static_cast<int>(expected));
callback(ResultNotAllowedError);
return;
}
const auto originalSeekMessageId = seekMessageId_.get();
- seekMessageId_ = seekId;
+ if (boost::get<uint64_t>(&seekArg)) {
+ hasSoughtByTimestamp_.store(true, std::memory_order_release);
+ } else {
+ seekMessageId_ = *boost::get<MessageId>(&seekArg);
+ }
seekStatus_ = SeekStatus::IN_PROGRESS;
seekCallback_ = std::move(callback);
- if (timestamp > 0) {
- LOG_INFO(getName() << " Seeking subscription to " << timestamp);
- } else {
- LOG_INFO(getName() << " Seeking subscription to " << seekId);
- }
+ LOG_INFO(getName() << " Seeking subscription to " << seekArg);
std::weak_ptr<ConsumerImpl> weakSelf{get_shared_this_ptr()};
@@ -1692,7 +1695,9 @@
// It's during reconnection, complete the seek future after connection is established
seekStatus_ = SeekStatus::COMPLETED;
} else {
- startMessageId_ = seekMessageId_.get();
+ if (!hasSoughtByTimestamp_.load(std::memory_order_acquire)) {
+ startMessageId_ = seekMessageId_.get();
+ }
seekCallback_.release()(result);
}
} else {
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 82e323b..35636ad 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -22,6 +22,7 @@
#include <pulsar/Reader.h>
#include <boost/optional.hpp>
+#include <boost/variant.hpp>
#include <functional>
#include <list>
#include <memory>
@@ -201,7 +202,18 @@
BrokerGetLastMessageIdCallback callback);
void clearReceiveQueue();
- void seekAsyncInternal(long requestId, SharedBuffer seek, const MessageId& seekId, long timestamp,
+ using SeekArg = boost::variant<uint64_t, MessageId>;
+ friend std::ostream& operator<<(std::ostream& os, const SeekArg& seekArg) {
+ auto ptr = boost::get<uint64_t>(&seekArg);
+ if (ptr) {
+ os << *ptr;
+ } else {
+ os << *boost::get<MessageId>(&seekArg);
+ }
+ return os;
+ }
+
+ void seekAsyncInternal(long requestId, SharedBuffer seek, const SeekArg& seekArg,
ResultCallback callback);
void processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb);
@@ -250,6 +262,7 @@
Synchronized<ResultCallback> seekCallback_{[](Result) {}};
Synchronized<boost::optional<MessageId>> startMessageId_;
Synchronized<MessageId> seekMessageId_{MessageId::earliest()};
+ std::atomic<bool> hasSoughtByTimestamp_{false};
bool duringSeek() const { return seekStatus_ != SeekStatus::NOT_STARTED; }
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index 9d2fe5f..92fdf62 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -700,9 +700,16 @@
client.close();
}
-TEST(ReaderSeekTest, testSeekForMessageId) {
- Client client(serviceUrl);
+class ReaderSeekTest : public ::testing::TestWithParam<bool> {
+ public:
+ void SetUp() override { client = Client{serviceUrl}; }
+ void TearDown() override { client.close(); }
+
+ Client client{serviceUrl};
+};
+
+TEST_F(ReaderSeekTest, testSeekForMessageId) {
const std::string topic = "test-seek-for-message-id-" + std::to_string(time(nullptr));
Producer producer;
@@ -752,18 +759,24 @@
producer.close();
}
-class ReaderSeekTest : public ::testing::TestWithParam<bool> {};
+#define EXPECT_HAS_MESSAGE_AVAILABLE(reader, expected) \
+ { \
+ bool actual; \
+ ASSERT_EQ(ResultOk, reader.hasMessageAvailable(actual)); \
+ EXPECT_EQ(actual, (expected)); \
+ }
-TEST(ReaderSeekTest, testStartAtLatestMessageId) {
- Client client(serviceUrl);
-
+TEST_F(ReaderSeekTest, testStartAtLatestMessageId) {
const std::string topic = "test-seek-latest-message-id-" + std::to_string(time(nullptr));
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
MessageId id;
- ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build(), id));
+ for (int i = 0; i < 10; i++) {
+ ASSERT_EQ(ResultOk,
+ producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), id));
+ }
Reader readerExclusive;
ASSERT_EQ(ResultOk,
@@ -774,20 +787,24 @@
client.createReader(topic, MessageId::latest(),
ReaderConfiguration().setStartMessageIdInclusive(true), readerInclusive));
+ EXPECT_HAS_MESSAGE_AVAILABLE(readerExclusive, false);
+ EXPECT_HAS_MESSAGE_AVAILABLE(readerInclusive, true);
+
Message msg;
- bool hasMsgAvaliable = false;
- readerInclusive.hasMessageAvailable(hasMsgAvaliable);
- ASSERT_TRUE(hasMsgAvaliable);
ASSERT_EQ(ResultOk, readerInclusive.readNext(msg, 3000));
- ASSERT_EQ(ResultTimeout, readerExclusive.readNext(msg, 3000));
+ ASSERT_EQ(msg.getDataAsString(), "msg-9");
+
+ readerInclusive.seek(0L);
+ EXPECT_HAS_MESSAGE_AVAILABLE(readerInclusive, true);
+ ASSERT_EQ(ResultOk, readerInclusive.readNext(msg, 3000));
+ ASSERT_EQ(msg.getDataAsString(), "msg-0");
readerExclusive.close();
readerInclusive.close();
producer.close();
}
-TEST(ReaderTest, testSeekInProgress) {
- Client client(serviceUrl);
+TEST_F(ReaderSeekTest, testSeekInProgress) {
const auto topic = "test-seek-in-progress-" + std::to_string(time(nullptr));
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader));
@@ -798,11 +815,9 @@
Result result;
promise.getFuture().get(result);
ASSERT_EQ(result, ResultNotAllowedError);
- client.close();
}
TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
- Client client(serviceUrl);
const auto topic = "test-has-message-available-after-seek-to-end-" + std::to_string(time(nullptr));
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
@@ -814,7 +829,6 @@
bool hasMessageAvailable;
if (GetParam()) {
- // Test the case when `ConsumerImpl.lastMessageIdInBroker_` has been initialized
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
}
@@ -834,8 +848,44 @@
ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
ASSERT_FALSE(hasMessageAvailable);
+}
- client.close();
+TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
+ using namespace std::chrono;
+ const auto topic = "test-has-message-available-after-seek-timestamp-" + std::to_string(time(nullptr));
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+ MessageId sentMsgId;
+ const auto timestampBeforeSend =
+ duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+ ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build(), sentMsgId));
+
+ auto createReader = [this, &topic](Reader& reader, const MessageId& msgId) {
+ ASSERT_EQ(ResultOk, client.createReader(topic, msgId, {}, reader));
+ if (GetParam()) {
+ if (msgId == MessageId::earliest()) {
+ EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
+ } else {
+ EXPECT_HAS_MESSAGE_AVAILABLE(reader, false);
+ }
+ }
+ };
+
+ std::vector<MessageId> msgIds{MessageId::earliest(), sentMsgId, MessageId::latest()};
+
+ for (auto&& msgId : msgIds) {
+ Reader reader;
+ createReader(reader, msgId);
+ ASSERT_EQ(ResultOk,
+ reader.seek(duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count()));
+ EXPECT_HAS_MESSAGE_AVAILABLE(reader, false);
+ }
+ for (auto&& msgId : msgIds) {
+ Reader reader;
+ createReader(reader, msgId);
+ ASSERT_EQ(ResultOk, reader.seek(timestampBeforeSend));
+ EXPECT_HAS_MESSAGE_AVAILABLE(reader, true);
+ }
}
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));