Fix possible zombie consumer when closing after reconnection (#518)

diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 92d25cb..3d5d294 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -293,13 +293,38 @@
     Result handleResult = ResultOk;
 
     if (result == ResultOk) {
-        LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
         {
             Lock mutexLock(mutex_);
+            if (!changeToReadyState()) {
+                auto client = client_.lock();
+                if (client) {
+                    LOG_INFO(getName() << "Closing subscribed consumer since it was already closed");
+                    int requestId = client->newRequestId();
+                    auto name = getName();
+                    cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId)
+                        .addListener([name](Result result, const ResponseData&) {
+                            if (result == ResultOk) {
+                                LOG_INFO(name << "Closed consumer successfully after subscribe completed");
+                            } else {
+                                LOG_WARN(name << "Failed to close consumer: " << strResult(result));
+                            }
+                        });
+                } else {
+                    // This should not happen normally because if client is destroyed, the connection pool
+                    // should also be closed, which means all connections should be closed. Close the
+                    // connection to let broker know this registered consumer is inactive.
+                    LOG_WARN(getName()
+                             << "Client already closed when subscribe completed, close the connection "
+                             << cnx->cnxString());
+                    cnx->close(ResultNotConnected);
+                }
+                return ResultAlreadyClosed;
+            }
+
+            LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
             setCnx(cnx);
             incomingMessages_.clear();
             possibleSendToDeadLetterTopicMessages_.clear();
-            state_ = Ready;
             backoff_.reset();
             if (!messageListener_ && config_.getReceiverQueueSize() == 0) {
                 // Complicated logic since we don't have a isLocked() function for mutex
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index df7dc24..967322f 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -148,6 +148,11 @@
         firstRequestIdAfterConnect_.store(requestId, std::memory_order_release);
     }
 
+    bool changeToReadyState() noexcept {
+        State expected = Pending;
+        return state_ == Ready || state_.compare_exchange_strong(expected, Ready);
+    }
+
    private:
     DeadlineTimerPtr timer_;
     DeadlineTimerPtr creationTimer_;
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index 3aa1dd3..f1bca77 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1573,4 +1573,26 @@
     ASSERT_EQ(ResultOk, client.close());
 }
 
+TEST(ConsumerTest, testCloseAfterSeek) {
+    const auto topic = "test-close-after-seek-" + std::to_string(time(nullptr));
+    const auto subscription = "sub";
+    Client client(lookupUrl);
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, consumer));
+    ASSERT_EQ(ResultOk, consumer.seek(TimeUtils::currentTimeMillis()));
+    consumer.closeAsync(nullptr);
+
+    // Test the previous consumer will be closed even after seek is done, at the moment the connection might
+    // not be established.
+    ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, consumer));
+
+    // Test creating a consumer from a different client should also work for this case
+    Client anotherClient(lookupUrl);
+    consumer.closeAsync(nullptr);
+    ASSERT_EQ(ResultOk, anotherClient.subscribe(topic, subscription, consumer));
+
+    client.close();
+    anotherClient.close();
+}
+
 }  // namespace pulsar