Fix possible zombie consumer when closing after reconnection (#518)
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 92d25cb..3d5d294 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -293,13 +293,38 @@
Result handleResult = ResultOk;
if (result == ResultOk) {
- LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
{
Lock mutexLock(mutex_);
+ if (!changeToReadyState()) {
+ auto client = client_.lock();
+ if (client) {
+ LOG_INFO(getName() << "Closing subscribed consumer since it was already closed");
+ int requestId = client->newRequestId();
+ auto name = getName();
+ cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId)
+ .addListener([name](Result result, const ResponseData&) {
+ if (result == ResultOk) {
+ LOG_INFO(name << "Closed consumer successfully after subscribe completed");
+ } else {
+ LOG_WARN(name << "Failed to close consumer: " << strResult(result));
+ }
+ });
+ } else {
+ // This should not happen normally because if client is destroyed, the connection pool
+ // should also be closed, which means all connections should be closed. Close the
+ // connection to let broker know this registered consumer is inactive.
+ LOG_WARN(getName()
+ << "Client already closed when subscribe completed, close the connection "
+ << cnx->cnxString());
+ cnx->close(ResultNotConnected);
+ }
+ return ResultAlreadyClosed;
+ }
+
+ LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
setCnx(cnx);
incomingMessages_.clear();
possibleSendToDeadLetterTopicMessages_.clear();
- state_ = Ready;
backoff_.reset();
if (!messageListener_ && config_.getReceiverQueueSize() == 0) {
// Complicated logic since we don't have a isLocked() function for mutex
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index df7dc24..967322f 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -148,6 +148,11 @@
firstRequestIdAfterConnect_.store(requestId, std::memory_order_release);
}
+ bool changeToReadyState() noexcept {
+ State expected = Pending;
+ return state_ == Ready || state_.compare_exchange_strong(expected, Ready);
+ }
+
private:
DeadlineTimerPtr timer_;
DeadlineTimerPtr creationTimer_;
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index 3aa1dd3..f1bca77 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1573,4 +1573,26 @@
ASSERT_EQ(ResultOk, client.close());
}
+TEST(ConsumerTest, testCloseAfterSeek) {
+ const auto topic = "test-close-after-seek-" + std::to_string(time(nullptr));
+ const auto subscription = "sub";
+ Client client(lookupUrl);
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, consumer));
+ ASSERT_EQ(ResultOk, consumer.seek(TimeUtils::currentTimeMillis()));
+ consumer.closeAsync(nullptr);
+
+ // Test the previous consumer will be closed even after seek is done, at the moment the connection might
+ // not be established.
+ ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, consumer));
+
+ // Test creating a consumer from a different client should also work for this case
+ Client anotherClient(lookupUrl);
+ consumer.closeAsync(nullptr);
+ ASSERT_EQ(ResultOk, anotherClient.subscribe(topic, subscription, consumer));
+
+ client.close();
+ anotherClient.close();
+}
+
} // namespace pulsar