| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| #include "ClientImpl.h" |
| |
| #include "LogUtils.h" |
| #include "ConsumerImpl.h" |
| #include "ProducerImpl.h" |
| #include "ReaderImpl.h" |
| #include "PartitionedProducerImpl.h" |
| #include "PartitionedConsumerImpl.h" |
| #include "SimpleLoggerImpl.h" |
| #include "Log4CxxLogger.h" |
| #include <boost/bind.hpp> |
| #include <boost/algorithm/string/predicate.hpp> |
| #include <sstream> |
| #include <openssl/sha.h> |
| #include "boost/date_time/posix_time/posix_time.hpp" |
| #include <lib/HTTPLookupService.h> |
| #include <lib/TopicName.h> |
| |
| DECLARE_LOG_OBJECT() |
| |
| namespace pulsar { |
| |
| static const char hexDigits[] = {'0', '1', '2', '3', '4', '5', '6', '7', |
| '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; |
| |
| const std::string generateRandomName() { |
| unsigned char hash[SHA_DIGEST_LENGTH]; // == 20; |
| boost::posix_time::ptime t(boost::posix_time::microsec_clock::universal_time()); |
| long nanoSeconds = t.time_of_day().total_nanoseconds(); |
| std::stringstream ss; |
| ss << nanoSeconds; |
| SHA1(reinterpret_cast<const unsigned char*>(ss.str().c_str()), ss.str().length(), hash); |
| |
| const int nameLength = 6; |
| std::stringstream hexHash; |
| for (int i = 0; i < nameLength / 2; i++) { |
| hexHash << hexDigits[(hash[i] & 0xF0) >> 4]; |
| hexHash << hexDigits[hash[i] & 0x0F]; |
| } |
| |
| return hexHash.str(); |
| } |
| typedef boost::unique_lock<boost::mutex> Lock; |
| |
| ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration& clientConfiguration, |
| bool poolConnections) |
| : mutex_(), |
| state_(Open), |
| serviceUrl_(serviceUrl), |
| clientConfiguration_(clientConfiguration), |
| ioExecutorProvider_(boost::make_shared<ExecutorServiceProvider>(clientConfiguration.getIOThreads())), |
| listenerExecutorProvider_( |
| boost::make_shared<ExecutorServiceProvider>(clientConfiguration.getMessageListenerThreads())), |
| partitionListenerExecutorProvider_( |
| boost::make_shared<ExecutorServiceProvider>(clientConfiguration.getMessageListenerThreads())), |
| pool_(clientConfiguration, ioExecutorProvider_, clientConfiguration.getAuthPtr(), poolConnections), |
| producerIdGenerator_(0), |
| consumerIdGenerator_(0), |
| requestIdGenerator_(0) { |
| if (clientConfiguration.getLogger()) { |
| // A logger factory was explicitely configured. Let's just use that |
| LogUtils::setLoggerFactory(clientConfiguration.getLogger()); |
| } else { |
| #ifdef USE_LOG4CXX |
| if (!clientConfiguration.getLogConfFilePath().empty()) { |
| // A log4cxx log file was passed through deprecated parameter. Use that to configure Log4CXX |
| LogUtils::setLoggerFactory( |
| Log4CxxLoggerFactory::create(clientConfiguration.getLogConfFilePath())); |
| } else { |
| // Use default simple console logger |
| LogUtils::setLoggerFactory(SimpleLoggerFactory::create()); |
| } |
| #else |
| // Use default simple console logger |
| LogUtils::setLoggerFactory(SimpleLoggerFactory::create()); |
| #endif |
| } |
| |
| if (serviceUrl_.compare(0, 4, "http") == 0) { |
| LOG_DEBUG("Using HTTP Lookup"); |
| lookupServicePtr_ = |
| boost::make_shared<HTTPLookupService>(boost::cref(serviceUrl_), boost::cref(clientConfiguration_), |
| boost::cref(clientConfiguration.getAuthPtr())); |
| } else { |
| LOG_DEBUG("Using Binary Lookup"); |
| lookupServicePtr_ = |
| boost::make_shared<BinaryProtoLookupService>(boost::ref(pool_), boost::ref(serviceUrl)); |
| } |
| } |
| |
| ClientImpl::~ClientImpl() { shutdown(); } |
| |
| const ClientConfiguration& ClientImpl::conf() const { return clientConfiguration_; } |
| |
| ExecutorServiceProviderPtr ClientImpl::getIOExecutorProvider() { return ioExecutorProvider_; } |
| |
| ExecutorServiceProviderPtr ClientImpl::getListenerExecutorProvider() { return listenerExecutorProvider_; } |
| |
| ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() { |
| return partitionListenerExecutorProvider_; |
| } |
| void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfiguration conf, |
| CreateProducerCallback callback) { |
| TopicNamePtr topicName; |
| { |
| Lock lock(mutex_); |
| if (state_ != Open) { |
| lock.unlock(); |
| callback(ResultAlreadyClosed, Producer()); |
| return; |
| } else if (!(topicName = TopicName::get(topic))) { |
| lock.unlock(); |
| callback(ResultInvalidTopicName, Producer()); |
| return; |
| } |
| } |
| lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(boost::bind( |
| &ClientImpl::handleCreateProducer, shared_from_this(), _1, _2, topicName, conf, callback)); |
| } |
| |
| void ClientImpl::handleCreateProducer(const Result result, const LookupDataResultPtr partitionMetadata, |
| TopicNamePtr topicName, ProducerConfiguration conf, |
| CreateProducerCallback callback) { |
| if (!result) { |
| ProducerImplBasePtr producer; |
| if (partitionMetadata->getPartitions() > 1) { |
| producer = boost::make_shared<PartitionedProducerImpl>(shared_from_this(), topicName, |
| partitionMetadata->getPartitions(), conf); |
| } else { |
| producer = boost::make_shared<ProducerImpl>(shared_from_this(), topicName->toString(), conf); |
| } |
| producer->getProducerCreatedFuture().addListener( |
| boost::bind(&ClientImpl::handleProducerCreated, shared_from_this(), _1, _2, callback, producer)); |
| Lock lock(mutex_); |
| producers_.push_back(producer); |
| lock.unlock(); |
| producer->start(); |
| } else { |
| LOG_ERROR("Error Checking/Getting Partition Metadata while creating producer on " |
| << topicName->toString() << " -- " << result); |
| callback(result, Producer()); |
| } |
| } |
| |
| void ClientImpl::handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr, |
| CreateProducerCallback callback, ProducerImplBasePtr producer) { |
| callback(result, Producer(producer)); |
| } |
| |
| void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& startMessageId, |
| const ReaderConfiguration& conf, ReaderCallback callback) { |
| TopicNamePtr topicName; |
| { |
| Lock lock(mutex_); |
| if (state_ != Open) { |
| lock.unlock(); |
| callback(ResultAlreadyClosed, Reader()); |
| return; |
| } else if (!(topicName = TopicName::get(topic))) { |
| lock.unlock(); |
| callback(ResultInvalidTopicName, Reader()); |
| return; |
| } |
| } |
| |
| MessageId msgId(startMessageId); |
| lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( |
| boost::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(), _1, _2, topicName, msgId, |
| conf, callback)); |
| } |
| |
/**
 * Completion handler for the partition-metadata lookup issued by
 * createReaderAsync(). Builds and starts the reader; `callback` is handed to
 * the ReaderImpl, which reports creation success/failure through it.
 */
void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDataResultPtr partitionMetadata,
                                            TopicNamePtr topicName, MessageId startMessageId,
                                            ReaderConfiguration conf, ReaderCallback callback) {
    if (result != ResultOk) {
        LOG_ERROR("Error Checking/Getting Partition Metadata while creating reader: " << result);
        callback(result, Reader());
        return;
    }

    // Readers are only supported on non-partitioned topics.
    if (partitionMetadata->getPartitions() > 1) {
        LOG_ERROR("Topic reader cannot be created on a partitioned topic: " << topicName->toString());
        callback(ResultOperationNotSupported, Reader());
        return;
    }

    ReaderImplPtr reader = boost::make_shared<ReaderImpl>(shared_from_this(), topicName->toString(), conf,
                                                          getListenerExecutorProvider()->get(), callback);
    reader->start(startMessageId);

    // Track the reader's underlying consumer so closeAsync()/shutdown() can
    // reach it. NOTE(review): the reader is started before it is registered
    // here, so a concurrent closeAsync() could miss it — confirm this window
    // is acceptable.
    Lock lock(mutex_);
    consumers_.push_back(reader->getConsumer());
}
| |
| void ClientImpl::subscribeAsync(const std::string& topic, const std::string& consumerName, |
| const ConsumerConfiguration& conf, SubscribeCallback callback) { |
| TopicNamePtr topicName; |
| { |
| Lock lock(mutex_); |
| if (state_ != Open) { |
| lock.unlock(); |
| callback(ResultAlreadyClosed, Consumer()); |
| return; |
| } else if (!(topicName = TopicName::get(topic))) { |
| lock.unlock(); |
| callback(ResultInvalidTopicName, Consumer()); |
| return; |
| } else if (conf.isReadCompacted() && (topicName->getDomain().compare("persistent") != 0 || |
| (conf.getConsumerType() != ConsumerExclusive && |
| conf.getConsumerType() != ConsumerFailover))) { |
| lock.unlock(); |
| callback(ResultInvalidConfiguration, Consumer()); |
| return; |
| } |
| } |
| |
| lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(boost::bind( |
| &ClientImpl::handleSubscribe, shared_from_this(), _1, _2, topicName, consumerName, conf, callback)); |
| } |
| |
/**
 * Completion handler for the partition-metadata lookup issued by
 * subscribeAsync(). Creates a partitioned or plain consumer, registers it
 * for close/shutdown tracking and starts it; the user callback fires when
 * the consumer's creation future completes.
 */
void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr partitionMetadata,
                                 TopicNamePtr topicName, const std::string& consumerName,
                                 ConsumerConfiguration conf, SubscribeCallback callback) {
    if (result == ResultOk) {
        // generate random name if not supplied by the customer.
        if (conf.getConsumerName().empty()) {
            conf.setConsumerName(generateRandomName());
        }
        ConsumerImplBasePtr consumer;
        if (partitionMetadata->getPartitions() > 1) {
            // Partitioned consumers pre-fetch per partition; a zero-sized
            // receiver queue cannot work in that mode.
            if (conf.getReceiverQueueSize() == 0) {
                LOG_ERROR("Can't use partitioned topic if the queue size is 0.");
                callback(ResultInvalidConfiguration, Consumer());
                return;
            }
            consumer = boost::make_shared<PartitionedConsumerImpl>(
                shared_from_this(), consumerName, topicName, partitionMetadata->getPartitions(), conf);
        } else {
            consumer = boost::make_shared<ConsumerImpl>(shared_from_this(), topicName->toString(),
                                                        consumerName, conf);
        }
        consumer->getConsumerCreatedFuture().addListener(
            boost::bind(&ClientImpl::handleConsumerCreated, shared_from_this(), _1, _2, callback, consumer));
        // Register before starting so closeAsync()/shutdown() can reach it.
        Lock lock(mutex_);
        consumers_.push_back(consumer);
        lock.unlock();
        consumer->start();
    } else {
        LOG_ERROR("Error Checking/Getting Partition Metadata while Subscribing- " << result);
        callback(result, Consumer());
    }
}
| |
| void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, |
| SubscribeCallback callback, ConsumerImplBasePtr consumer) { |
| callback(result, Consumer(consumer)); |
| } |
| |
Future<Result, ClientConnectionWeakPtr> ClientImpl::getConnection(const std::string& topic) {
    // Resolve the broker owning `topic` via the lookup service, then obtain
    // a (possibly pooled) connection to it in handleLookup(). The returned
    // future completes with a weak pointer to the connection, or with the
    // lookup/connect failure.
    // NOTE(review): the listener binds raw `this` (not shared_from_this()),
    // so it relies on the client outliving the pending lookup — confirm.
    Promise<Result, ClientConnectionWeakPtr> promise;
    lookupServicePtr_->lookupAsync(topic).addListener(
        boost::bind(&ClientImpl::handleLookup, this, _1, _2, promise));
    return promise.getFuture();
}
| |
| void ClientImpl::handleLookup(Result result, LookupDataResultPtr data, |
| Promise<Result, ClientConnectionWeakPtr> promise) { |
| if (data) { |
| LOG_DEBUG("Getting connection to broker: " << data->getBrokerUrl()); |
| const std::string& logicalAddress = data->getBrokerUrl(); |
| const std::string& physicalAddress = |
| data->shouldProxyThroughServiceUrl() ? serviceUrl_ : logicalAddress; |
| Future<Result, ClientConnectionWeakPtr> future = |
| pool_.getConnectionAsync(logicalAddress, physicalAddress); |
| future.addListener(boost::bind(&ClientImpl::handleNewConnection, this, _1, _2, promise)); |
| } else { |
| promise.setFailed(result); |
| } |
| } |
| |
| void ClientImpl::handleNewConnection(Result result, const ClientConnectionWeakPtr& conn, |
| Promise<Result, ClientConnectionWeakPtr> promise) { |
| if (result == ResultOk) { |
| promise.setValue(conn); |
| } else { |
| promise.setFailed(ResultConnectError); |
| } |
| } |
| |
| void ClientImpl::closeAsync(CloseCallback callback) { |
| Lock lock(mutex_); |
| ProducersList producers(producers_); |
| ConsumersList consumers(consumers_); |
| |
| if (state_ != Open && callback) { |
| lock.unlock(); |
| callback(ResultAlreadyClosed); |
| return; |
| } |
| // Set the state to Closing so that no producers could get added |
| state_ = Closing; |
| lock.unlock(); |
| |
| LOG_DEBUG("Closing Pulsar client"); |
| SharedInt numberOfOpenHandlers = boost::make_shared<int>(producers.size() + consumers.size()); |
| |
| for (ProducersList::iterator it = producers.begin(); it != producers.end(); ++it) { |
| ProducerImplBasePtr producer = it->lock(); |
| if (producer && !producer->isClosed()) { |
| producer->closeAsync(boost::bind(&ClientImpl::handleClose, shared_from_this(), _1, |
| numberOfOpenHandlers, callback)); |
| } else { |
| // Since the connection is already closed |
| (*numberOfOpenHandlers)--; |
| } |
| } |
| |
| for (ConsumersList::iterator it = consumers.begin(); it != consumers.end(); ++it) { |
| ConsumerImplBasePtr consumer = it->lock(); |
| if (consumer && !consumer->isClosed()) { |
| consumer->closeAsync(boost::bind(&ClientImpl::handleClose, shared_from_this(), _1, |
| numberOfOpenHandlers, callback)); |
| } else { |
| // Since the connection is already closed |
| (*numberOfOpenHandlers)--; |
| } |
| } |
| |
| if (*numberOfOpenHandlers == 0 && callback) { |
| callback(ResultOk); |
| } |
| } |
| |
| void ClientImpl::handleClose(Result result, SharedInt numberOfOpenHandlers, ResultCallback callback) { |
| static bool errorClosing = false; |
| static Result failResult = ResultOk; |
| if (result != ResultOk) { |
| errorClosing = true; |
| failResult = result; |
| } |
| if (*numberOfOpenHandlers > 0) { |
| --(*numberOfOpenHandlers); |
| } |
| if (*numberOfOpenHandlers == 0) { |
| Lock lock(mutex_); |
| state_ = Closed; |
| lock.unlock(); |
| if (errorClosing) { |
| LOG_DEBUG("Problem in closing client, could not close one or more consumers or producers"); |
| if (callback) { |
| callback(failResult); |
| } |
| } |
| |
| LOG_DEBUG("Shutting down producers and consumers for client"); |
| shutdown(); |
| if (callback) { |
| callback(ResultOk); |
| } |
| } |
| } |
| |
| void ClientImpl::shutdown() { |
| Lock lock(mutex_); |
| ProducersList producers; |
| ConsumersList consumers; |
| |
| producers.swap(producers_); |
| consumers.swap(consumers_); |
| lock.unlock(); |
| |
| for (ProducersList::iterator it = producers.begin(); it != producers.end(); ++it) { |
| ProducerImplBasePtr producer = it->lock(); |
| if (producer) { |
| producer->shutdown(); |
| } |
| } |
| |
| for (ConsumersList::iterator it = consumers.begin(); it != consumers.end(); ++it) { |
| ConsumerImplBasePtr consumer = it->lock(); |
| if (consumer) { |
| consumer->shutdown(); |
| } |
| } |
| |
| ioExecutorProvider_->close(); |
| listenerExecutorProvider_->close(); |
| partitionListenerExecutorProvider_->close(); |
| } |
| |
| uint64_t ClientImpl::newProducerId() { |
| Lock lock(mutex_); |
| return producerIdGenerator_++; |
| } |
| |
| uint64_t ClientImpl::newConsumerId() { |
| Lock lock(mutex_); |
| return consumerIdGenerator_++; |
| } |
| |
| uint64_t ClientImpl::newRequestId() { |
| Lock lock(mutex_); |
| return requestIdGenerator_++; |
| } |
| |
| const ClientConfiguration& ClientImpl::getClientConfig() const { return clientConfiguration_; } |
| |
| } /* namespace pulsar */ |