/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "ClientImpl.h"
#include <pulsar/ClientConfiguration.h>
#include <pulsar/Version.h>
#include <random>
#include <sstream>
#include "BinaryProtoLookupService.h"
#include "ClientConfigurationImpl.h"
#include "Commands.h"
#include "ConsumerImpl.h"
#include "ConsumerInterceptors.h"
#include "ExecutorService.h"
#include "HTTPLookupService.h"
#include "LogUtils.h"
#include "MultiTopicsConsumerImpl.h"
#include "PartitionedProducerImpl.h"
#include "PatternMultiTopicsConsumerImpl.h"
#include "ProducerImpl.h"
#include "ProducerInterceptors.h"
#include "ReaderImpl.h"
#include "RetryableLookupService.h"
#include "TableViewImpl.h"
#include "TimeUtils.h"
#include "TopicName.h"
#ifdef USE_LOG4CXX
#include "Log4CxxLogger.h"
#endif
#ifdef PULSAR_USE_BOOST_REGEX
#include <boost/regex.hpp>
#define PULSAR_REGEX_NAMESPACE boost
#else
#include <regex>
#define PULSAR_REGEX_NAMESPACE std
#endif
DECLARE_LOG_OBJECT()
namespace pulsar {
static const char hexDigits[] = {'0', '1', '2', '3', '4', '5', '6', '7',
'8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
static std::uniform_int_distribution<> hexDigitsDist(0, sizeof(hexDigits) - 1);
static std::mt19937 randomEngine =
std::mt19937(std::chrono::high_resolution_clock::now().time_since_epoch().count());
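// Generates a 10-character random hex string. It is used as a default consumer name and as a
// suffix for the synthetic topic name of a multi-topics consumer. Note that the static engine and
// distribution above are shared by all callers.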
std::string generateRandomName() {
const int randomNameLength = 10;
std::string randomName;
for (int i = 0; i < randomNameLength; ++i) {
randomName += hexDigits[hexDigitsDist(randomEngine)];
}
return randomName;
}
typedef std::unique_lock<std::mutex> Lock;
typedef std::vector<std::string> StringList;
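// The constructor resolves the service URL, installs the logger factory (falling back to a
// console logger), and selects the lookup implementation: HTTP lookup for http(s):// service
// URLs, binary protocol lookup otherwise. Either one is wrapped in a RetryableLookupService that
// retries failed lookups until the configured operation timeout elapses.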
ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration,
bool poolConnections)
: mutex_(),
state_(Open),
serviceNameResolver_(serviceUrl),
clientConfiguration_(ClientConfiguration(clientConfiguration).setUseTls(serviceNameResolver_.useTls())),
memoryLimitController_(clientConfiguration.getMemoryLimit()),
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getIOThreads())),
listenerExecutorProvider_(
std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getMessageListenerThreads())),
partitionListenerExecutorProvider_(
std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getMessageListenerThreads())),
pool_(clientConfiguration_, ioExecutorProvider_, clientConfiguration_.getAuthPtr(), poolConnections,
ClientImpl::getClientVersion(clientConfiguration)),
producerIdGenerator_(0),
consumerIdGenerator_(0),
closingError(ResultOk) {
std::unique_ptr<LoggerFactory> loggerFactory = clientConfiguration_.impl_->takeLogger();
if (!loggerFactory) {
#ifdef USE_LOG4CXX
if (!clientConfiguration_.getLogConfFilePath().empty()) {
            // A log4cxx configuration file was passed through the deprecated parameter. Use it to
            // configure Log4CXX
loggerFactory = Log4CxxLoggerFactory::create(clientConfiguration_.getLogConfFilePath());
} else {
// Use default simple console logger
loggerFactory.reset(new ConsoleLoggerFactory);
}
#else
// Use default simple console logger
loggerFactory.reset(new ConsoleLoggerFactory);
#endif
}
LogUtils::setLoggerFactory(std::move(loggerFactory));
LookupServicePtr underlyingLookupServicePtr;
if (serviceNameResolver_.useHttp()) {
LOG_DEBUG("Using HTTP Lookup");
underlyingLookupServicePtr = std::make_shared<HTTPLookupService>(
std::ref(serviceNameResolver_), std::cref(clientConfiguration_),
std::cref(clientConfiguration_.getAuthPtr()));
} else {
LOG_DEBUG("Using Binary Lookup");
underlyingLookupServicePtr = std::make_shared<BinaryProtoLookupService>(
std::ref(serviceNameResolver_), std::ref(pool_), std::cref(clientConfiguration_));
}
lookupServicePtr_ = RetryableLookupService::create(
underlyingLookupServicePtr, clientConfiguration_.getOperationTimeoutSeconds(), ioExecutorProvider_);
}
ClientImpl::~ClientImpl() { shutdown(); }
const ClientConfiguration& ClientImpl::conf() const { return clientConfiguration_; }
MemoryLimitController& ClientImpl::getMemoryLimitController() { return memoryLimitController_; }
ExecutorServiceProviderPtr ClientImpl::getIOExecutorProvider() { return ioExecutorProvider_; }
ExecutorServiceProviderPtr ClientImpl::getListenerExecutorProvider() { return listenerExecutorProvider_; }
ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() {
return partitionListenerExecutorProvider_;
}
LookupServicePtr ClientImpl::getLookup() { return lookupServicePtr_; }
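// Entry point for Client::createProducer(): validates the client state and topic name, then
// resolves the partition metadata before creating the producer in handleCreateProducer().
// A minimal usage sketch through the public API (assuming a broker at pulsar://localhost:6650):
//   Client client("pulsar://localhost:6650");
//   Producer producer;
//   client.createProducer("my-topic", producer);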
void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfiguration conf,
CreateProducerCallback callback, bool autoDownloadSchema) {
if (conf.isChunkingEnabled() && conf.getBatchingEnabled()) {
throw std::invalid_argument("Batching and chunking of messages can't be enabled together");
}
TopicNamePtr topicName;
{
Lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, Producer());
return;
} else if (!(topicName = TopicName::get(topic))) {
lock.unlock();
callback(ResultInvalidTopicName, Producer());
return;
}
}
    if (autoDownloadSchema) {
        auto self = shared_from_this();
        lookupServicePtr_->getSchema(topicName).addListener(
            [self, topicName, conf, callback](Result res, SchemaInfo topicSchema) mutable {
                if (res != ResultOk) {
                    callback(res, Producer());
                    return;
                }
                // Apply the downloaded schema on top of the caller's configuration instead of
                // replacing it with a default-constructed one, so the other producer settings
                // are preserved.
                conf.setSchema(topicSchema);
                self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
                    std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1,
                              std::placeholders::_2, topicName, conf, callback));
            });
} else {
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, topicName, conf, callback));
}
}
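// Lookup callback for createProducerAsync(): on success, creates a PartitionedProducerImpl for a
// partitioned topic or a plain ProducerImpl otherwise, then starts it; the user callback fires
// from handleProducerCreated() once the producer is ready.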
void ClientImpl::handleCreateProducer(const Result result, const LookupDataResultPtr partitionMetadata,
TopicNamePtr topicName, ProducerConfiguration conf,
CreateProducerCallback callback) {
    if (result == ResultOk) {
ProducerImplBasePtr producer;
auto interceptors = std::make_shared<ProducerInterceptors>(conf.getInterceptors());
try {
if (partitionMetadata->getPartitions() > 0) {
producer = std::make_shared<PartitionedProducerImpl>(
shared_from_this(), topicName, partitionMetadata->getPartitions(), conf, interceptors);
} else {
producer = std::make_shared<ProducerImpl>(shared_from_this(), *topicName, conf, interceptors);
}
} catch (const std::runtime_error& e) {
LOG_ERROR("Failed to create producer: " << e.what());
callback(ResultConnectError, {});
return;
}
producer->getProducerCreatedFuture().addListener(
std::bind(&ClientImpl::handleProducerCreated, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, callback, producer));
producer->start();
} else {
LOG_ERROR("Error Checking/Getting Partition Metadata while creating producer on "
<< topicName->toString() << " -- " << result);
callback(result, Producer());
}
}
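// Registers a successfully created producer in producers_ (keyed by its raw pointer) and hands a
// Producer wrapper to the user callback.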
void ClientImpl::handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
CreateProducerCallback callback, ProducerImplBasePtr producer) {
if (result == ResultOk) {
auto pair = producers_.emplace(producer.get(), producer);
if (!pair.second) {
auto existingProducer = pair.first->second.lock();
LOG_ERROR("Unexpected existing producer at the same address: "
<< pair.first->first << ", producer: "
<< (existingProducer ? existingProducer->getProducerName() : "(null)"));
callback(ResultUnknownError, {});
return;
}
callback(result, Producer(producer));
} else {
callback(result, {});
}
}
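// Validates the client state and topic name, then resolves the partition metadata; the reader
// itself is built in handleReaderMetadataLookup().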
void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf, ReaderCallback callback) {
TopicNamePtr topicName;
{
Lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, Reader());
return;
} else if (!(topicName = TopicName::get(topic))) {
lock.unlock();
callback(ResultInvalidTopicName, Reader());
return;
}
}
MessageId msgId(startMessageId);
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, topicName, msgId, conf, callback));
}
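// Creates a TableViewImpl for the topic and starts it asynchronously; the user callback receives
// the TableView only after start() completes successfully.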
void ClientImpl::createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf,
TableViewCallback callback) {
TopicNamePtr topicName;
{
Lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, TableView());
return;
} else if (!(topicName = TopicName::get(topic))) {
lock.unlock();
callback(ResultInvalidTopicName, TableView());
return;
}
}
TableViewImplPtr tableViewPtr =
std::make_shared<TableViewImpl>(shared_from_this(), topicName->toString(), conf);
tableViewPtr->start().addListener([callback](Result result, TableViewImplPtr tableViewImplPtr) {
if (result == ResultOk) {
callback(result, TableView{tableViewImplPtr});
} else {
callback(result, {});
}
});
}
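// Lookup callback for createReaderAsync(): creates the ReaderImpl, starts reading from
// startMessageId, and registers the reader's underlying consumer in consumers_ so that
// closeAsync()/shutdown() can reach it.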
void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDataResultPtr partitionMetadata,
TopicNamePtr topicName, MessageId startMessageId,
ReaderConfiguration conf, ReaderCallback callback) {
if (result != ResultOk) {
LOG_ERROR("Error Checking/Getting Partition Metadata while creating readeron "
<< topicName->toString() << " -- " << result);
callback(result, Reader());
return;
}
ReaderImplPtr reader;
try {
reader.reset(new ReaderImpl(shared_from_this(), topicName->toString(),
partitionMetadata->getPartitions(), conf,
getListenerExecutorProvider()->get(), callback));
} catch (const std::runtime_error& e) {
LOG_ERROR("Failed to create reader: " << e.what());
callback(ResultConnectError, {});
return;
}
    auto self = shared_from_this();
reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
auto consumer = weakConsumerPtr.lock();
if (consumer) {
auto pair = consumers_.emplace(consumer.get(), consumer);
if (!pair.second) {
auto existingConsumer = pair.first->second.lock();
LOG_ERROR("Unexpected existing consumer at the same address: "
<< pair.first->first
<< ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
}
} else {
LOG_ERROR("Unexpected case: the consumer is somehow expired");
}
});
}
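// Subscribes to all topics of a namespace whose names match regexPattern. Any domain in the
// pattern is ignored; which topic types are matched is controlled by
// ConsumerConfiguration::setRegexSubscriptionMode(). A minimal sketch through the public API
// (assuming the public/default namespace):
//   Consumer consumer;
//   client.subscribeWithRegex("persistent://public/default/foo-.*", "my-sub", consumer);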
void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
const ConsumerConfiguration& conf, SubscribeCallback callback) {
TopicNamePtr topicNamePtr = TopicName::get(regexPattern);
    {
        Lock lock(mutex_);
        if (state_ != Open) {
            lock.unlock();
            callback(ResultAlreadyClosed, Consumer());
            return;
        }
    }
    if (!topicNamePtr) {
        LOG_ERROR("Topic pattern not valid: " << regexPattern);
        callback(ResultInvalidTopicName, Consumer());
        return;
    }
if (TopicName::containsDomain(regexPattern)) {
LOG_WARN("Ignore invalid domain: "
<< topicNamePtr->getDomain()
<< ", use the RegexSubscriptionMode parameter to set the topic type");
}
CommandGetTopicsOfNamespace_Mode mode;
auto regexSubscriptionMode = conf.getRegexSubscriptionMode();
switch (regexSubscriptionMode) {
case PersistentOnly:
mode = CommandGetTopicsOfNamespace_Mode_PERSISTENT;
break;
case NonPersistentOnly:
mode = CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT;
break;
case AllTopics:
mode = CommandGetTopicsOfNamespace_Mode_ALL;
break;
default:
LOG_ERROR("RegexSubscriptionMode not valid: " << regexSubscriptionMode);
callback(ResultInvalidConfiguration, Consumer());
return;
}
lookupServicePtr_->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), mode)
.addListener(std::bind(&ClientImpl::createPatternMultiTopicsConsumer, shared_from_this(),
std::placeholders::_1, std::placeholders::_2, regexPattern, mode,
subscriptionName, conf, callback));
}
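// Callback for the get-topics-of-namespace lookup: filters the returned topic list against the
// regex pattern and starts a PatternMultiTopicsConsumerImpl over the matching topics, which also
// discovers topics created later that match the pattern.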
void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const NamespaceTopicsPtr topics,
const std::string& regexPattern,
CommandGetTopicsOfNamespace_Mode mode,
const std::string& subscriptionName,
const ConsumerConfiguration& conf,
SubscribeCallback callback) {
if (result == ResultOk) {
ConsumerImplBasePtr consumer;
PULSAR_REGEX_NAMESPACE::regex pattern(TopicName::removeDomain(regexPattern));
NamespaceTopicsPtr matchTopics =
PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics, pattern);
auto interceptors = std::make_shared<ConsumerInterceptors>(conf.getInterceptors());
consumer = std::make_shared<PatternMultiTopicsConsumerImpl>(shared_from_this(), regexPattern, mode,
*matchTopics, subscriptionName, conf,
lookupServicePtr_, interceptors);
consumer->getConsumerCreatedFuture().addListener(
std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, callback, consumer));
consumer->start();
} else {
LOG_ERROR("Error Getting topicsOfNameSpace while createPatternMultiTopicsConsumer: " << result);
callback(result, Consumer());
}
}
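// Subscribes to an explicit list of topics with a single MultiTopicsConsumerImpl. A synthetic
// "-TopicsConsumerFakeName-<random>" topic name is generated so that the combined consumer gets a
// unique identity.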
void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
const ConsumerConfiguration& conf, SubscribeCallback callback) {
TopicNamePtr topicNamePtr;
    {
        Lock lock(mutex_);
        if (state_ != Open) {
            lock.unlock();
            callback(ResultAlreadyClosed, Consumer());
            return;
        } else if (!topics.empty() && !(topicNamePtr = MultiTopicsConsumerImpl::topicNamesValid(topics))) {
            lock.unlock();
            callback(ResultInvalidTopicName, Consumer());
            return;
        }
    }
if (topicNamePtr) {
std::string randomName = generateRandomName();
std::stringstream consumerTopicNameStream;
consumerTopicNameStream << topicNamePtr->toString() << "-TopicsConsumerFakeName-" << randomName;
topicNamePtr = TopicName::get(consumerTopicNameStream.str());
}
auto interceptors = std::make_shared<ConsumerInterceptors>(conf.getInterceptors());
ConsumerImplBasePtr consumer = std::make_shared<MultiTopicsConsumerImpl>(
shared_from_this(), topics, subscriptionName, topicNamePtr, conf, lookupServicePtr_, interceptors);
consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated,
shared_from_this(), std::placeholders::_1,
std::placeholders::_2, callback, consumer));
consumer->start();
}
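// Subscribes to a single topic. Note that readCompacted is only allowed on a persistent topic
// with an Exclusive or Failover consumer type; any other combination fails with
// ResultInvalidConfiguration.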
void ClientImpl::subscribeAsync(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf, SubscribeCallback callback) {
TopicNamePtr topicName;
{
Lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, Consumer());
return;
} else if (!(topicName = TopicName::get(topic))) {
lock.unlock();
callback(ResultInvalidTopicName, Consumer());
return;
} else if (conf.isReadCompacted() && (topicName->getDomain().compare("persistent") != 0 ||
(conf.getConsumerType() != ConsumerExclusive &&
conf.getConsumerType() != ConsumerFailover))) {
lock.unlock();
callback(ResultInvalidConfiguration, Consumer());
return;
}
}
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleSubscribe, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, topicName, subscriptionName, conf, callback));
}
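// Lookup callback for subscribeAsync(): creates a MultiTopicsConsumerImpl for a partitioned topic
// (which requires a non-zero receiver queue size) or a single ConsumerImpl otherwise, then
// starts it.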
void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr partitionMetadata,
TopicNamePtr topicName, const std::string& subscriptionName,
ConsumerConfiguration conf, SubscribeCallback callback) {
if (result == ResultOk) {
        // Generate a random consumer name if one was not supplied by the caller.
if (conf.getConsumerName().empty()) {
conf.setConsumerName(generateRandomName());
}
ConsumerImplBasePtr consumer;
auto interceptors = std::make_shared<ConsumerInterceptors>(conf.getInterceptors());
try {
if (partitionMetadata->getPartitions() > 0) {
if (conf.getReceiverQueueSize() == 0) {
LOG_ERROR("Can't use partitioned topic if the queue size is 0.");
callback(ResultInvalidConfiguration, Consumer());
return;
}
consumer = std::make_shared<MultiTopicsConsumerImpl>(
shared_from_this(), topicName, partitionMetadata->getPartitions(), subscriptionName, conf,
lookupServicePtr_, interceptors);
} else {
auto consumerImpl = std::make_shared<ConsumerImpl>(shared_from_this(), topicName->toString(),
subscriptionName, conf,
topicName->isPersistent(), interceptors);
consumerImpl->setPartitionIndex(topicName->getPartitionIndex());
consumer = consumerImpl;
}
} catch (const std::runtime_error& e) {
LOG_ERROR("Failed to create consumer: " << e.what());
callback(ResultConnectError, {});
return;
}
consumer->getConsumerCreatedFuture().addListener(
std::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, callback, consumer));
consumer->start();
} else {
LOG_ERROR("Error Checking/Getting Partition Metadata while Subscribing on " << topicName->toString()
<< " -- " << result);
callback(result, Consumer());
}
}
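// Registers a successfully created consumer in consumers_ (keyed by its raw pointer) and hands a
// Consumer wrapper to the user callback.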
void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
SubscribeCallback callback, ConsumerImplBasePtr consumer) {
if (result == ResultOk) {
auto pair = consumers_.emplace(consumer.get(), consumer);
if (!pair.second) {
auto existingConsumer = pair.first->second.lock();
LOG_ERROR("Unexpected existing consumer at the same address: "
<< pair.first->first
<< ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
callback(ResultUnknownError, {});
return;
}
callback(result, Consumer(consumer));
} else {
callback(result, {});
}
}
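// Resolves the broker that owns the topic through the lookup service, then obtains a (possibly
// pooled) connection to that broker's logical/physical addresses from the connection pool.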
Future<Result, ClientConnectionWeakPtr> ClientImpl::getConnection(const std::string& topic) {
Promise<Result, ClientConnectionWeakPtr> promise;
const auto topicNamePtr = TopicName::get(topic);
if (!topicNamePtr) {
LOG_ERROR("Unable to parse topic - " << topic);
promise.setFailed(ResultInvalidTopicName);
return promise.getFuture();
}
auto self = shared_from_this();
lookupServicePtr_->getBroker(*topicNamePtr)
.addListener([this, self, promise](Result result, const LookupService::LookupResult& data) {
if (result != ResultOk) {
promise.setFailed(result);
return;
}
pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress)
.addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) {
if (result == ResultOk) {
promise.setValue(weakCnx);
} else {
promise.setFailed(result);
}
});
});
return promise.getFuture();
}
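// Lookup callback for getPartitionsForTopicAsync(): expands a partitioned topic into its
// per-partition names (one per partition index), or returns the topic name itself when the topic
// is not partitioned.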
void ClientImpl::handleGetPartitions(const Result result, const LookupDataResultPtr partitionMetadata,
TopicNamePtr topicName, GetPartitionsCallback callback) {
if (result != ResultOk) {
LOG_ERROR("Error getting topic partitions metadata: " << result);
callback(result, StringList());
return;
}
StringList partitions;
if (partitionMetadata->getPartitions() > 0) {
for (unsigned int i = 0; i < partitionMetadata->getPartitions(); i++) {
partitions.push_back(topicName->getTopicPartitionName(i));
}
} else {
partitions.push_back(topicName->toString());
}
callback(ResultOk, partitions);
}
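// Asynchronously returns the list of partition names for a topic; see handleGetPartitions() above
// for the expansion rules.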
void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback) {
TopicNamePtr topicName;
{
Lock lock(mutex_);
if (state_ != Open) {
lock.unlock();
callback(ResultAlreadyClosed, StringList());
return;
} else if (!(topicName = TopicName::get(topic))) {
lock.unlock();
callback(ResultInvalidTopicName, StringList());
return;
}
}
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
std::bind(&ClientImpl::handleGetPartitions, shared_from_this(), std::placeholders::_1,
std::placeholders::_2, topicName, callback));
}
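// Gracefully closes the client: transitions to Closing, closes every live producer and consumer,
// and completes through handleClose() once the last close handler has run (or immediately when
// there is nothing to close).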
void ClientImpl::closeAsync(CloseCallback callback) {
if (state_ != Open) {
if (callback) {
callback(ResultAlreadyClosed);
}
return;
}
    // Set the state to Closing so that no new producers or consumers can be added
state_ = Closing;
memoryLimitController_.close();
auto producers = producers_.move();
auto consumers = consumers_.move();
SharedInt numberOfOpenHandlers = std::make_shared<int>(producers.size() + consumers.size());
LOG_INFO("Closing Pulsar client with " << producers.size() << " producers and " << consumers.size()
<< " consumers");
for (auto&& kv : producers) {
ProducerImplBasePtr producer = kv.second.lock();
if (producer && !producer->isClosed()) {
producer->closeAsync(std::bind(&ClientImpl::handleClose, shared_from_this(),
std::placeholders::_1, numberOfOpenHandlers, callback));
        } else {
            // The producer is already closed, so just decrement the pending-close counter
            (*numberOfOpenHandlers)--;
}
}
for (auto&& kv : consumers) {
ConsumerImplBasePtr consumer = kv.second.lock();
if (consumer && !consumer->isClosed()) {
consumer->closeAsync(std::bind(&ClientImpl::handleClose, shared_from_this(),
std::placeholders::_1, numberOfOpenHandlers, callback));
        } else {
            // The consumer is already closed, so just decrement the pending-close counter
            (*numberOfOpenHandlers)--;
}
}
if (*numberOfOpenHandlers == 0 && callback) {
handleClose(ResultOk, numberOfOpenHandlers, callback);
}
}
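// Invoked once per closed producer/consumer. The first failure is recorded in closingError; when
// the last handler finishes, the state moves to Closed and shutdown() runs on a detached thread,
// since it must not block the event loop thread it is going to stop.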
void ClientImpl::handleClose(Result result, SharedInt numberOfOpenHandlers, ResultCallback callback) {
Result expected = ResultOk;
if (!closingError.compare_exchange_strong(expected, result)) {
LOG_DEBUG("Tried to updated closingError, but already set to "
<< expected << ". This means multiple errors have occurred while closing the client");
}
if (*numberOfOpenHandlers > 0) {
--(*numberOfOpenHandlers);
}
if (*numberOfOpenHandlers == 0) {
Lock lock(mutex_);
if (state_ == Closed) {
LOG_DEBUG("Client is already shutting down, possible race condition in handleClose");
return;
} else {
state_ = Closed;
lock.unlock();
}
LOG_DEBUG("Shutting down producers and consumers for client");
        // handleClose() is called in the ExecutorService's event loop, while shutdown() waits for the
        // event loop to exit. So we call shutdown() from another thread to avoid a deadlock.
auto self = shared_from_this();
std::thread shutdownTask{[this, self, callback] {
shutdown();
if (callback) {
if (closingError != ResultOk) {
LOG_DEBUG(
"Problem in closing client, could not close one or more consumers or producers");
}
callback(closingError);
}
}};
shutdownTask.detach();
}
}
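// Releases all client resources: shuts down any remaining producers and consumers, closes the
// connection pool, and closes the three executor providers under a shared 500ms time budget.
// Calling it again is a no-op once the pool has been closed.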
void ClientImpl::shutdown() {
auto producers = producers_.move();
auto consumers = consumers_.move();
for (auto&& kv : producers) {
ProducerImplBasePtr producer = kv.second.lock();
if (producer) {
producer->shutdown();
}
}
for (auto&& kv : consumers) {
ConsumerImplBasePtr consumer = kv.second.lock();
if (consumer) {
consumer->shutdown();
}
}
if (producers.size() + consumers.size() > 0) {
LOG_DEBUG(producers.size() << " producers and " << consumers.size()
<< " consumers have been shutdown.");
}
if (!pool_.close()) {
// pool_ has already been closed. It means shutdown() has been called before.
return;
}
LOG_DEBUG("ConnectionPool is closed");
    // A 500ms timeout is long enough because ExecutorService::close() calls io_service::stop()
    // internally and waits until io_service::run() in another thread returns, which should happen
    // soon after stop() is called.
TimeoutProcessor<std::chrono::milliseconds> timeoutProcessor{500};
timeoutProcessor.tik();
ioExecutorProvider_->close(timeoutProcessor.getLeftTimeout());
timeoutProcessor.tok();
LOG_DEBUG("ioExecutorProvider_ is closed");
timeoutProcessor.tik();
listenerExecutorProvider_->close(timeoutProcessor.getLeftTimeout());
timeoutProcessor.tok();
LOG_DEBUG("listenerExecutorProvider_ is closed");
timeoutProcessor.tik();
partitionListenerExecutorProvider_->close(timeoutProcessor.getLeftTimeout());
timeoutProcessor.tok();
LOG_DEBUG("partitionListenerExecutorProvider_ is closed");
}
uint64_t ClientImpl::newProducerId() {
Lock lock(mutex_);
return producerIdGenerator_++;
}
uint64_t ClientImpl::newConsumerId() {
Lock lock(mutex_);
return consumerIdGenerator_++;
}
uint64_t ClientImpl::newRequestId() { return (*requestIdGenerator_)++; }
uint64_t ClientImpl::getNumberOfProducers() {
uint64_t numberOfAliveProducers = 0;
producers_.forEachValue([&numberOfAliveProducers](const ProducerImplBaseWeakPtr& producer) {
const auto& producerImpl = producer.lock();
if (producerImpl) {
numberOfAliveProducers += producerImpl->getNumberOfConnectedProducer();
}
});
return numberOfAliveProducers;
}
uint64_t ClientImpl::getNumberOfConsumers() {
uint64_t numberOfAliveConsumers = 0;
consumers_.forEachValue([&numberOfAliveConsumers](const ConsumerImplBaseWeakPtr& consumer) {
const auto consumerImpl = consumer.lock();
if (consumerImpl) {
numberOfAliveConsumers += consumerImpl->getNumberOfConnectedConsumer();
}
});
return numberOfAliveConsumers;
}
const ClientConfiguration& ClientImpl::getClientConfig() const { return clientConfiguration_; }
std::string ClientImpl::getClientVersion(const ClientConfiguration& clientConfiguration) {
std::ostringstream oss;
oss << "Pulsar-CPP-v" << PULSAR_VERSION_STR;
if (!clientConfiguration.getDescription().empty()) {
oss << "-" << clientConfiguration.getDescription();
}
return oss.str();
}
} /* namespace pulsar */