Fix topic lookup segmentation fault after client is closed (#521)
(cherry picked from commit a03eb9278bf96cbda42eb9e7dc1c73ee0b65ea3e)
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index aa75128..eec3b34 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -78,7 +78,25 @@
typedef std::vector<std::string> StringList;
+static LookupServicePtr defaultLookupServiceFactory(const std::string& serviceUrl,
+ const ClientConfiguration& clientConfiguration,
+ ConnectionPool& pool, const AuthenticationPtr& auth) {
+ if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) {
+ LOG_DEBUG("Using HTTP Lookup");
+ return std::make_shared<HTTPLookupService>(serviceUrl, std::cref(clientConfiguration),
+ std::cref(auth));
+ } else {
+ LOG_DEBUG("Using Binary Lookup");
+ return std::make_shared<BinaryProtoLookupService>(serviceUrl, std::ref(pool),
+ std::cref(clientConfiguration));
+ }
+}
+
ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration)
+ : ClientImpl(serviceUrl, clientConfiguration, &defaultLookupServiceFactory) {}
+
+ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
+ LookupServiceFactory&& lookupServiceFactory)
: mutex_(),
state_(Open),
clientConfiguration_(ClientConfiguration(clientConfiguration)
@@ -95,7 +113,8 @@
consumerIdGenerator_(0),
closingError(ResultOk),
useProxy_(false),
- lookupCount_(0L) {
+ lookupCount_(0L),
+ lookupServiceFactory_(std::move(lookupServiceFactory)) {
std::unique_ptr<LoggerFactory> loggerFactory = clientConfiguration_.impl_->takeLogger();
if (loggerFactory) {
LogUtils::setLoggerFactory(std::move(loggerFactory));
@@ -106,19 +125,9 @@
ClientImpl::~ClientImpl() { shutdown(); }
LookupServicePtr ClientImpl::createLookup(const std::string& serviceUrl) {
- LookupServicePtr underlyingLookupServicePtr;
- if (ServiceNameResolver::useHttp(ServiceURI(serviceUrl))) {
- LOG_DEBUG("Using HTTP Lookup");
- underlyingLookupServicePtr = std::make_shared<HTTPLookupService>(
- serviceUrl, std::cref(clientConfiguration_), std::cref(clientConfiguration_.getAuthPtr()));
- } else {
- LOG_DEBUG("Using Binary Lookup");
- underlyingLookupServicePtr = std::make_shared<BinaryProtoLookupService>(
- serviceUrl, std::ref(pool_), std::cref(clientConfiguration_));
- }
-
auto lookupServicePtr = RetryableLookupService::create(
- underlyingLookupServicePtr, clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
+ lookupServiceFactory_(serviceUrl, clientConfiguration_, pool_, clientConfiguration_.getAuthPtr()),
+ clientConfiguration_.impl_->operationTimeout, ioExecutorProvider_);
return lookupServicePtr;
}
@@ -767,6 +776,7 @@
<< " consumers have been shutdown.");
}
+ lookupServicePtr_->close();
if (!pool_.close()) {
// pool_ has already been closed. It means shutdown() has been called before.
return;
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index 000e443..0b4d596 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -54,6 +54,8 @@
class LookupService;
using LookupServicePtr = std::shared_ptr<LookupService>;
+using LookupServiceFactory = std::function<LookupServicePtr(const std::string&, const ClientConfiguration&,
+ ConnectionPool& pool, const AuthenticationPtr&)>;
class ProducerImplBase;
using ProducerImplBaseWeakPtr = std::weak_ptr<ProducerImplBase>;
@@ -70,6 +72,11 @@
class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
public:
ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration);
+
+ // only for tests
+ ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
+ LookupServiceFactory&& lookupServiceFactory);
+
virtual ~ClientImpl();
/**
@@ -205,6 +212,7 @@
std::atomic<Result> closingError;
std::atomic<bool> useProxy_;
std::atomic<uint64_t> lookupCount_;
+ LookupServiceFactory lookupServiceFactory_;
friend class Client;
};
diff --git a/lib/ResultUtils.h b/lib/ResultUtils.h
index dfba7eb..cf4ff1f 100644
--- a/lib/ResultUtils.h
+++ b/lib/ResultUtils.h
@@ -49,7 +49,8 @@
ResultLookupError,
ResultTooManyLookupRequestException,
ResultProducerBlockedQuotaExceededException,
- ResultProducerBlockedQuotaExceededError};
+ ResultProducerBlockedQuotaExceededError,
+ ResultAlreadyClosed};
return fatalResults.find(static_cast<int>(result)) == fatalResults.cend();
}
diff --git a/lib/RetryableLookupService.h b/lib/RetryableLookupService.h
index 8bc40bf..bbcf4f0 100644
--- a/lib/RetryableLookupService.h
+++ b/lib/RetryableLookupService.h
@@ -18,8 +18,6 @@
*/
#pragma once
-#include <chrono>
-
#include "LookupDataResult.h"
#include "LookupService.h"
#include "NamespaceName.h"
@@ -41,10 +39,10 @@
: RetryableLookupService(std::forward<Args>(args)...) {}
void close() override {
- lookupCache_->clear();
- partitionLookupCache_->clear();
- namespaceLookupCache_->clear();
- getSchemaCache_->clear();
+ lookupCache_->close();
+ partitionLookupCache_->close();
+ namespaceLookupCache_->close();
+ getSchemaCache_->close();
}
template <typename... Args>
@@ -89,7 +87,7 @@
RetryableLookupService(std::shared_ptr<LookupService> lookupService, TimeDuration timeout,
ExecutorServiceProviderPtr executorProvider)
- : lookupService_(lookupService),
+ : lookupService_(std::move(lookupService)),
lookupCache_(RetryableOperationCache<LookupResult>::create(executorProvider, timeout)),
partitionLookupCache_(
RetryableOperationCache<LookupDataResultPtr>::create(executorProvider, timeout)),
diff --git a/lib/RetryableOperationCache.h b/lib/RetryableOperationCache.h
index e42460d..f2d390d 100644
--- a/lib/RetryableOperationCache.h
+++ b/lib/RetryableOperationCache.h
@@ -58,6 +58,11 @@
Future<Result, T> run(const std::string& key, std::function<Future<Result, T>()>&& func) {
std::unique_lock<std::mutex> lock{mutex_};
+ if (closed_) {
+ Promise<Result, T> promise;
+ promise.setFailed(ResultAlreadyClosed);
+ return promise.getFuture();
+ }
auto it = operations_.find(key);
if (it == operations_.end()) {
DeadlineTimerPtr timer;
@@ -92,11 +97,15 @@
}
}
- void clear() {
+ void close() {
decltype(operations_) operations;
{
std::lock_guard<std::mutex> lock{mutex_};
+ if (closed_) {
+ return;
+ }
operations.swap(operations_);
+ closed_ = true;
}
// cancel() could trigger the listener to erase the key from operations, so we should use a swap way
// to release the lock here
@@ -110,6 +119,7 @@
const TimeDuration timeout_;
std::unordered_map<std::string, std::shared_ptr<RetryableOperation<T>>> operations_;
+ bool closed_{false};
mutable std::mutex mutex_;
DECLARE_LOG_OBJECT()
diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc
index ff3a7e0..92aa820 100644
--- a/tests/LookupServiceTest.cc
+++ b/tests/LookupServiceTest.cc
@@ -500,3 +500,65 @@
}
}
}
+
+class MockLookupService : public BinaryProtoLookupService {
+ public:
+ using BinaryProtoLookupService::BinaryProtoLookupService;
+
+ Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override {
+ bool expected = true;
+ if (firstTime_.compare_exchange_strong(expected, false)) {
+ // Trigger the retry
+ LOG_INFO("Fail the lookup for " << topicName->toString() << " intentionally");
+ Promise<Result, LookupDataResultPtr> promise;
+ promise.setFailed(ResultRetryable);
+ return promise.getFuture();
+ }
+ return BinaryProtoLookupService::getPartitionMetadataAsync(topicName);
+ }
+
+ private:
+ std::atomic_bool firstTime_{true};
+};
+
+TEST(LookupServiceTest, testAfterClientShutdown) {
+ auto client = std::make_shared<ClientImpl>("pulsar://localhost:6650", ClientConfiguration{},
+ [](const std::string& serviceUrl, const ClientConfiguration&,
+ ConnectionPool& pool, const AuthenticationPtr&) {
+ return std::make_shared<MockLookupService>(
+ serviceUrl, pool, ClientConfiguration{});
+ });
+ std::promise<Result> promise;
+ client->subscribeAsync("lookup-service-test-after-client-shutdown", "sub", ConsumerConfiguration{},
+ [&promise](Result result, const Consumer&) { promise.set_value(result); });
+ // When shutdown is called, there is a pending lookup request because the 1st lookup fails in
+ // MockLookupService. Verify that shutdown cancels it and completes with ResultDisconnected.
+ client->shutdown();
+ EXPECT_EQ(ResultDisconnected, promise.get_future().get());
+
+ // A new subscribeAsync call will fail immediately in the current thread
+ Result result = ResultOk;
+ client->subscribeAsync("lookup-service-test-retry-after-destroyed", "sub", ConsumerConfiguration{},
+ [&result](Result innerResult, const Consumer&) { result = innerResult; });
+ EXPECT_EQ(ResultAlreadyClosed, result);
+}
+
+TEST(LookupServiceTest, testRetryAfterDestroyed) {
+ auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
+ ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
+
+ auto internalLookupService =
+ std::make_shared<MockLookupService>("pulsar://localhost:6650", pool, ClientConfiguration{});
+ auto lookupService =
+ RetryableLookupService::create(internalLookupService, std::chrono::seconds(30), executorProvider);
+
+ // Simulate the race condition that `getPartitionMetadataAsync` is called after `close` is called on the
+ // lookup service. It's expected the request fails immediately with ResultAlreadyClosed.
+ lookupService->close();
+ Result result = ResultOk;
+ lookupService->getPartitionMetadataAsync(TopicName::get("lookup-service-test-retry-after-destroyed"))
+ .addListener([&result](Result innerResult, const LookupDataResultPtr&) { result = innerResult; });
+ EXPECT_EQ(ResultAlreadyClosed, result);
+ pool.close();
+ executorProvider->close();
+}
diff --git a/tests/RetryableOperationCacheTest.cc b/tests/RetryableOperationCacheTest.cc
index 2a6948e..c9b8a1d 100644
--- a/tests/RetryableOperationCacheTest.cc
+++ b/tests/RetryableOperationCacheTest.cc
@@ -118,13 +118,13 @@
}
}
-TEST_F(RetryableOperationCacheTest, testClear) {
+TEST_F(RetryableOperationCacheTest, testClose) {
auto cache = RetryableOperationCache<int>::create(provider_, std::chrono::seconds(30));
for (int i = 0; i < 10; i++) {
futures_.emplace_back(cache->run("key-" + std::to_string(i), CountdownFunc{100}));
}
ASSERT_EQ(getSize(*cache), 10);
- cache->clear();
+ cache->close();
for (auto&& future : futures_) {
int value;
// All cancelled futures complete with ResultDisconnected and the default int value