Fix topic lookup segmentation fault after client is closed (#521)

(cherry picked from commit a03eb9278bf96cbda42eb9e7dc1c73ee0b65ea3e)
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index aa75128..eec3b34 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -78,7 +78,25 @@
 
 typedef std::vector<std::string> StringList;
 
+static LookupServicePtr defaultLookupServiceFactory(const std::string& serviceUrl,
+                                                    const ClientConfiguration& clientConfiguration,
+                                                    ConnectionPool& pool, const AuthenticationPtr& auth) {
+    if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) {
+        LOG_DEBUG("Using HTTP Lookup");
+        return std::make_shared<HTTPLookupService>(serviceUrl, std::cref(clientConfiguration),
+                                                   std::cref(auth));
+    } else {
+        LOG_DEBUG("Using Binary Lookup");
+        return std::make_shared<BinaryProtoLookupService>(serviceUrl, std::ref(pool),
+                                                          std::cref(clientConfiguration));
+    }
+}
+
 ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration)
+    : ClientImpl(serviceUrl, clientConfiguration, &defaultLookupServiceFactory) {}
+
+ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
+                       LookupServiceFactory&& lookupServiceFactory)
     : mutex_(),
       state_(Open),
       clientConfiguration_(ClientConfiguration(clientConfiguration)
@@ -95,7 +113,8 @@
       consumerIdGenerator_(0),
       closingError(ResultOk),
       useProxy_(false),
-      lookupCount_(0L) {
+      lookupCount_(0L),
+      lookupServiceFactory_(std::move(lookupServiceFactory)) {
     std::unique_ptr<LoggerFactory> loggerFactory = clientConfiguration_.impl_->takeLogger();
     if (loggerFactory) {
         LogUtils::setLoggerFactory(std::move(loggerFactory));
@@ -106,19 +125,9 @@
 ClientImpl::~ClientImpl() { shutdown(); }
 
 LookupServicePtr ClientImpl::createLookup(const std::string& serviceUrl) {
-    LookupServicePtr underlyingLookupServicePtr;
-    if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) {
-        LOG_DEBUG("Using HTTP Lookup");
-        underlyingLookupServicePtr = std::make_shared<HTTPLookupService>(
-            serviceUrl, std::cref(clientConfiguration_), std::cref(clientConfiguration_.getAuthPtr()));
-    } else {
-        LOG_DEBUG("Using Binary Lookup");
-        underlyingLookupServicePtr = std::make_shared<BinaryProtoLookupService>(
-            serviceUrl, std::ref(pool_), std::cref(clientConfiguration_));
-    }
-
     auto lookupServicePtr = RetryableLookupService::create(
-        underlyingLookupServicePtr, clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
+        lookupServiceFactory_(serviceUrl, clientConfiguration_, pool_, clientConfiguration_.getAuthPtr()),
+        clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
     return lookupServicePtr;
 }
 
@@ -767,6 +776,7 @@
                                    << " consumers have been shutdown.");
     }
 
+    lookupServicePtr_->close();
     if (!pool_.close()) {
         // pool_ has already been closed. It means shutdown() has been called before.
         return;
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index 000e443..0b4d596 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -54,6 +54,8 @@
 
 class LookupService;
 using LookupServicePtr = std::shared_ptr<LookupService>;
+using LookupServiceFactory = std::function<LookupServicePtr(const std::string&, const ClientConfiguration&,
+                                                            ConnectionPool& pool, const AuthenticationPtr&)>;
 
 class ProducerImplBase;
 using ProducerImplBaseWeakPtr = std::weak_ptr<ProducerImplBase>;
@@ -70,6 +72,11 @@
 class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
    public:
     ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration);
+
+    // only for tests
+    ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
+               LookupServiceFactory&& lookupServiceFactory);
+
     virtual ~ClientImpl();
 
     /**
@@ -205,6 +212,7 @@
     std::atomic<Result> closingError;
     std::atomic<bool> useProxy_;
     std::atomic<uint64_t> lookupCount_;
+    LookupServiceFactory lookupServiceFactory_;
 
     friend class Client;
 };
diff --git a/lib/ResultUtils.h b/lib/ResultUtils.h
index dfba7eb..cf4ff1f 100644
--- a/lib/ResultUtils.h
+++ b/lib/ResultUtils.h
@@ -49,7 +49,8 @@
                                                       ResultLookupError,
                                                       ResultTooManyLookupRequestException,
                                                       ResultProducerBlockedQuotaExceededException,
-                                                      ResultProducerBlockedQuotaExceededError};
+                                                      ResultProducerBlockedQuotaExceededError,
+                                                      ResultAlreadyClosed};
     return fatalResults.find(static_cast<int>(result)) == fatalResults.cend();
 }
 
diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h
index 8bc40bf..bbcf4f0 100644
--- a/lib/RetryableLookupService.h
+++ b/lib/RetryableLookupService.h
@@ -18,8 +18,6 @@
  */
 #pragma once
 
-#include <chrono>
-
 #include "LookupDataResult.h"
 #include "LookupService.h"
 #include "NamespaceName.h"
@@ -41,10 +39,10 @@
         : RetryableLookupService(std::forward<Args>(args)...) {}
 
     void close() override {
-        lookupCache_->clear();
-        partitionLookupCache_->clear();
-        namespaceLookupCache_->clear();
-        getSchemaCache_->clear();
+        lookupCache_->close();
+        partitionLookupCache_->close();
+        namespaceLookupCache_->close();
+        getSchemaCache_->close();
     }
 
     template <typename... Args>
@@ -89,7 +87,7 @@
 
     RetryableLookupService(std::shared_ptr<LookupService> lookupService, TimeDuration timeout,
                            ExecutorServiceProviderPtr executorProvider)
-        : lookupService_(lookupService),
+        : lookupService_(std::move(lookupService)),
           lookupCache_(RetryableOperationCache<LookupResult>::create(executorProvider, timeout)),
           partitionLookupCache_(
               RetryableOperationCache<LookupDataResultPtr>::create(executorProvider, timeout)),
diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h
index e42460d..f2d390d 100644
--- a/lib/RetryableOperationCache.h
+++ b/lib/RetryableOperationCache.h
@@ -58,6 +58,11 @@
 
     Future<Result, T> run(const std::string& key, std::function<Future<Result, T>()>&& func) {
         std::unique_lock<std::mutex> lock{mutex_};
+        if (closed_) {
+            Promise<Result, T> promise;
+            promise.setFailed(ResultAlreadyClosed);
+            return promise.getFuture();
+        }
         auto it = operations_.find(key);
         if (it == operations_.end()) {
             DeadlineTimerPtr timer;
@@ -92,11 +97,15 @@
         }
     }
 
-    void clear() {
+    void close() {
         decltype(operations_) operations;
         {
             std::lock_guard<std::mutex> lock{mutex_};
+            if (closed_) {
+                return;
+            }
             operations.swap(operations_);
+            closed_ = true;
         }
         // cancel() could trigger the listener to erase the key from operations, so we should use a swap way
         // to release the lock here
@@ -110,6 +119,7 @@
     const TimeDuration timeout_;
 
     std::unordered_map<std::string, std::shared_ptr<RetryableOperation<T>>> operations_;
+    bool closed_{false};
     mutable std::mutex mutex_;
 
     DECLARE_LOG_OBJECT()
diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc
index ff3a7e0..92aa820 100644
--- a/tests/LookupServiceTest.cc
+++ b/tests/LookupServiceTest.cc
@@ -500,3 +500,65 @@
         }
     }
 }
+
+class MockLookupService : public BinaryProtoLookupService {
+   public:
+    using BinaryProtoLookupService::BinaryProtoLookupService;
+
+    Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override {
+        bool expected = true;
+        if (firstTime_.compare_exchange_strong(expected, false)) {
+            // Trigger the retry
+            LOG_INFO("Fail the lookup for " << topicName->toString() << " intentionally");
+            Promise<Result, LookupDataResultPtr> promise;
+            promise.setFailed(ResultRetryable);
+            return promise.getFuture();
+        }
+        return BinaryProtoLookupService::getPartitionMetadataAsync(topicName);
+    }
+
+   private:
+    std::atomic_bool firstTime_{true};
+};
+
+TEST(LookupServiceTest, testAfterClientShutdown) {
+    auto client = std::make_shared<ClientImpl>("pulsar://localhost:6650", ClientConfiguration{},
+                                               [](const std::string& serviceUrl, const ClientConfiguration&,
+                                                  ConnectionPool& pool, const AuthenticationPtr&) {
+                                                   return std::make_shared<MockLookupService>(
+                                                       serviceUrl, pool, ClientConfiguration{});
+                                               });
+    std::promise<Result> promise;
+    client->subscribeAsync("lookup-service-test-after-client-shutdown", "sub", ConsumerConfiguration{},
+                           [&promise](Result result, const Consumer&) { promise.set_value(result); });
+    // When shutdown is called, there is a pending lookup request due to the 1st lookup is failed in
+    // MockLookupService. Verify shutdown will cancel it and return ResultDisconnected.
+    client->shutdown();
+    EXPECT_EQ(ResultDisconnected, promise.get_future().get());
+
+    // A new subscribeAsync call will fail immediately in the current thread
+    Result result = ResultOk;
+    client->subscribeAsync("lookup-service-test-retry-after-destroyed", "sub", ConsumerConfiguration{},
+                           [&result](Result innerResult, const Consumer&) { result = innerResult; });
+    EXPECT_EQ(ResultAlreadyClosed, result);
+}
+
+TEST(LookupServiceTest, testRetryAfterDestroyed) {
+    auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
+    ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
+
+    auto internalLookupService =
+        std::make_shared<MockLookupService>("pulsar://localhost:6650", pool, ClientConfiguration{});
+    auto lookupService =
+        RetryableLookupService::create(internalLookupService, std::chrono::seconds(30), executorProvider);
+
+    // Simulate the race condition that `getPartitionMetadataAsync` is called after `close` is called on the
+    // lookup service. It's expected the request fails immediately with ResultAlreadyClosed.
+    lookupService->close();
+    Result result = ResultOk;
+    lookupService->getPartitionMetadataAsync(TopicName::get("lookup-service-test-retry-after-destroyed"))
+        .addListener([&result](Result innerResult, const LookupDataResultPtr&) { result = innerResult; });
+    EXPECT_EQ(ResultAlreadyClosed, result);
+    pool.close();
+    executorProvider->close();
+}
diff --git a/tests/RetryableOperationCacheTest.cc b/tests/RetryableOperationCacheTest.cc
index 2a6948e..c9b8a1d 100644
--- a/tests/RetryableOperationCacheTest.cc
+++ b/tests/RetryableOperationCacheTest.cc
@@ -118,13 +118,13 @@
     }
 }
 
-TEST_F(RetryableOperationCacheTest, testClear) {
+TEST_F(RetryableOperationCacheTest, testClose) {
     auto cache = RetryableOperationCache<int>::create(provider_, std::chrono::seconds(30));
     for (int i = 0; i < 10; i++) {
         futures_.emplace_back(cache->run("key-" + std::to_string(i), CountdownFunc{100}));
     }
     ASSERT_EQ(getSize(*cache), 10);
-    cache->clear();
+    cache->close();
     for (auto&& future : futures_) {
         int value;
         // All cancelled futures complete with ResultDisconnected and the default int value