| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| #include "MultiTopicsConsumerImpl.h" |
| |
| DECLARE_LOG_OBJECT() |
| |
| using namespace pulsar; |
| |
| MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics, |
| const std::string& subscriptionName, TopicNamePtr topicName, |
| const ConsumerConfiguration& conf, |
| const LookupServicePtr lookupServicePtr) |
| : client_(client), |
| subscriptionName_(subscriptionName), |
| topic_(topicName ? topicName->toString() : "EmptyTopics"), |
| conf_(conf), |
| state_(Pending), |
| messages_(conf.getReceiverQueueSize()), |
| listenerExecutor_(client->getListenerExecutorProvider()->get()), |
| messageListener_(conf.getMessageListener()), |
| pendingReceives_(), |
| lookupServicePtr_(lookupServicePtr), |
| numberTopicPartitions_(std::make_shared<std::atomic<int>>(0)), |
| topics_(topics) { |
| std::stringstream consumerStrStream; |
| consumerStrStream << "[Muti Topics Consumer: " |
| << "TopicName - " << topic_ << " - Subscription - " << subscriptionName << "]"; |
| consumerStr_ = consumerStrStream.str(); |
| |
| if (conf.getUnAckedMessagesTimeoutMs() != 0) { |
| if (conf.getTickDurationInMs() > 0) { |
| unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled( |
| conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this)); |
| } else { |
| unAckedMessageTrackerPtr_.reset( |
| new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this)); |
| } |
| } else { |
| unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled()); |
| } |
| } |
| |
| void MultiTopicsConsumerImpl::start() { |
| if (topics_.empty()) { |
| if (compareAndSetState(Pending, Ready)) { |
| LOG_DEBUG("No topics passed in when create MultiTopicsConsumer."); |
| multiTopicsConsumerCreatedPromise_.setValue(shared_from_this()); |
| return; |
| } else { |
| LOG_ERROR("Consumer " << consumerStr_ << " in wrong state: " << state_); |
| multiTopicsConsumerCreatedPromise_.setFailed(ResultUnknownError); |
| return; |
| } |
| } |
| |
| // start call subscribeOneTopicAsync for each single topic |
| int topicsNumber = topics_.size(); |
| std::shared_ptr<std::atomic<int>> topicsNeedCreate = std::make_shared<std::atomic<int>>(topicsNumber); |
| // subscribe for each passed in topic |
| for (std::vector<std::string>::const_iterator itr = topics_.begin(); itr != topics_.end(); itr++) { |
| subscribeOneTopicAsync(*itr).addListener(std::bind(&MultiTopicsConsumerImpl::handleOneTopicSubscribed, |
| shared_from_this(), std::placeholders::_1, |
| std::placeholders::_2, *itr, topicsNeedCreate)); |
| } |
| } |
| |
| void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer consumer, |
| const std::string& topic, |
| std::shared_ptr<std::atomic<int>> topicsNeedCreate) { |
| int previous = topicsNeedCreate->fetch_sub(1); |
| assert(previous > 0); |
| |
| if (result != ResultOk) { |
| setState(Failed); |
| LOG_ERROR("Failed when subscribed to topic " << topic << " in TopicsConsumer. Error - " << result); |
| } |
| |
| LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer "); |
| |
| if (topicsNeedCreate->load() == 0) { |
| if (compareAndSetState(Pending, Ready)) { |
| LOG_INFO("Successfully Subscribed to Topics"); |
| multiTopicsConsumerCreatedPromise_.setValue(shared_from_this()); |
| } else { |
| LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result); |
| // unsubscribed all of the successfully subscribed partitioned consumers |
| closeAsync(nullptr); |
| multiTopicsConsumerCreatedPromise_.setFailed(result); |
| return; |
| } |
| return; |
| } |
| } |
| |
| // subscribe for passed in topic |
| Future<Result, Consumer> MultiTopicsConsumerImpl::subscribeOneTopicAsync(const std::string& topic) { |
| TopicNamePtr topicName; |
| ConsumerSubResultPromisePtr topicPromise = std::make_shared<Promise<Result, Consumer>>(); |
| if (!(topicName = TopicName::get(topic))) { |
| LOG_ERROR("TopicName invalid: " << topic); |
| topicPromise->setFailed(ResultInvalidTopicName); |
| return topicPromise->getFuture(); |
| } |
| |
| if (state_ == Closed || state_ == Closing) { |
| LOG_ERROR("MultiTopicsConsumer already closed when subscribe."); |
| topicPromise->setFailed(ResultAlreadyClosed); |
| return topicPromise->getFuture(); |
| } |
| |
| // subscribe for each partition, when all partitions completed, complete promise |
| lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(std::bind( |
| &MultiTopicsConsumerImpl::subscribeTopicPartitions, shared_from_this(), std::placeholders::_1, |
| std::placeholders::_2, topicName, subscriptionName_, conf_, topicPromise)); |
| return topicPromise->getFuture(); |
| } |
| |
| void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result, |
| const LookupDataResultPtr partitionMetadata, |
| TopicNamePtr topicName, |
| const std::string& consumerName, |
| ConsumerConfiguration conf, |
| ConsumerSubResultPromisePtr topicSubResultPromise) { |
| if (result != ResultOk) { |
| LOG_ERROR("Error Checking/Getting Partition Metadata while MultiTopics Subscribing- " |
| << consumerStr_ << " result: " << result) |
| topicSubResultPromise->setFailed(result); |
| return; |
| } |
| |
| std::shared_ptr<ConsumerImpl> consumer; |
| ConsumerConfiguration config = conf_.clone(); |
| ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get(); |
| |
| config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(), |
| std::placeholders::_1, std::placeholders::_2)); |
| |
| int numPartitions = partitionMetadata->getPartitions(); |
| int partitions = numPartitions == 0 ? 1 : numPartitions; |
| |
| // Apply total limit of receiver queue size across partitions |
| config.setReceiverQueueSize( |
| std::min(conf_.getReceiverQueueSize(), |
| (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / partitions))); |
| |
| Lock lock(mutex_); |
| topicsPartitions_.insert(std::make_pair(topicName->toString(), partitions)); |
| lock.unlock(); |
| numberTopicPartitions_->fetch_add(partitions); |
| |
| std::shared_ptr<std::atomic<int>> partitionsNeedCreate = std::make_shared<std::atomic<int>>(partitions); |
| |
| // non-partitioned topic |
| if (numPartitions == 0) { |
| // We don't have to add partition-n suffix |
| consumer = std::make_shared<ConsumerImpl>(client_, topicName->toString(), subscriptionName_, config, |
| internalListenerExecutor, true, NonPartitioned); |
| consumer->getConsumerCreatedFuture().addListener(std::bind( |
| &MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(), std::placeholders::_1, |
| std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise)); |
| consumers_.insert(std::make_pair(topicName->toString(), consumer)); |
| LOG_DEBUG("Creating Consumer for - " << topicName << " - " << consumerStr_); |
| consumer->start(); |
| |
| } else { |
| for (int i = 0; i < numPartitions; i++) { |
| std::string topicPartitionName = topicName->getTopicPartitionName(i); |
| consumer = std::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config, |
| internalListenerExecutor, true, Partitioned); |
| consumer->getConsumerCreatedFuture().addListener(std::bind( |
| &MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(), |
| std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise)); |
| consumer->setPartitionIndex(i); |
| consumers_.insert(std::make_pair(topicPartitionName, consumer)); |
| LOG_DEBUG("Creating Consumer for - " << topicPartitionName << " - " << consumerStr_); |
| consumer->start(); |
| } |
| } |
| } |
| |
| void MultiTopicsConsumerImpl::handleSingleConsumerCreated( |
| Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, |
| std::shared_ptr<std::atomic<int>> partitionsNeedCreate, |
| ConsumerSubResultPromisePtr topicSubResultPromise) { |
| if (state_ == Failed) { |
| // one of the consumer creation failed, and we are cleaning up |
| topicSubResultPromise->setFailed(ResultAlreadyClosed); |
| LOG_ERROR("Unable to create Consumer " << consumerStr_ << " state == Failed, result: " << result); |
| return; |
| } |
| |
| int previous = partitionsNeedCreate->fetch_sub(1); |
| assert(previous > 0); |
| |
| if (result != ResultOk) { |
| topicSubResultPromise->setFailed(result); |
| LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result); |
| return; |
| } |
| |
| LOG_DEBUG("Successfully Subscribed to a single partition of topic in TopicsConsumer. " |
| << "Partitions need to create - " << previous - 1); |
| |
| if (partitionsNeedCreate->load() == 0) { |
| topicSubResultPromise->setValue(Consumer(shared_from_this())); |
| } |
| } |
| |
| void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) { |
| LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing"); |
| |
| Lock lock(mutex_); |
| if (state_ == Closing || state_ == Closed) { |
| LOG_INFO(consumerStr_ << " already closed"); |
| lock.unlock(); |
| callback(ResultAlreadyClosed); |
| return; |
| } |
| state_ = Closing; |
| lock.unlock(); |
| |
| if (consumers_.empty()) { |
| // No need to unsubscribe, since the list matching the regex was empty |
| callback(ResultOk); |
| return; |
| } |
| |
| std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0); |
| |
| for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end(); |
| consumer++) { |
| (consumer->second) |
| ->unsubscribeAsync(std::bind(&MultiTopicsConsumerImpl::handleUnsubscribedAsync, |
| shared_from_this(), std::placeholders::_1, consumerUnsubed, |
| callback)); |
| } |
| } |
| |
| void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result, |
| std::shared_ptr<std::atomic<int>> consumerUnsubed, |
| ResultCallback callback) { |
| int previous = consumerUnsubed->fetch_add(1); |
| assert(previous < numberTopicPartitions_->load()); |
| |
| if (result != ResultOk) { |
| setState(Failed); |
| LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: " |
| << result << " subscription - " << subscriptionName_); |
| } |
| |
| if (consumerUnsubed->load() == numberTopicPartitions_->load()) { |
| LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer. - " << consumerStr_); |
| consumers_.clear(); |
| topicsPartitions_.clear(); |
| unAckedMessageTrackerPtr_->clear(); |
| |
| Result result1 = (state_ != Failed) ? ResultOk : ResultUnknownError; |
| setState(Closed); |
| callback(result1); |
| return; |
| } |
| } |
| |
| void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, ResultCallback callback) { |
| std::map<std::string, int>::iterator it = topicsPartitions_.find(topic); |
| if (it == topicsPartitions_.end()) { |
| LOG_ERROR("TopicsConsumer does not subscribe topic : " << topic << " subscription - " |
| << subscriptionName_); |
| callback(ResultTopicNotFound); |
| return; |
| } |
| |
| if (state_ == Closing || state_ == Closed) { |
| LOG_ERROR("TopicsConsumer already closed when unsubscribe topic: " << topic << " subscription - " |
| << subscriptionName_); |
| callback(ResultAlreadyClosed); |
| return; |
| } |
| |
| TopicNamePtr topicName; |
| if (!(topicName = TopicName::get(topic))) { |
| LOG_ERROR("TopicName invalid: " << topic); |
| callback(ResultUnknownError); |
| } |
| int numberPartitions = it->second; |
| std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0); |
| |
| for (int i = 0; i < numberPartitions; i++) { |
| std::string topicPartitionName = topicName->getTopicPartitionName(i); |
| std::map<std::string, ConsumerImplPtr>::iterator iterator = consumers_.find(topicPartitionName); |
| |
| if (consumers_.end() == iterator) { |
| LOG_ERROR("TopicsConsumer not subscribed on topicPartitionName: " << topicPartitionName); |
| callback(ResultUnknownError); |
| } |
| |
| (iterator->second) |
| ->unsubscribeAsync(std::bind(&MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync, |
| shared_from_this(), std::placeholders::_1, consumerUnsubed, |
| numberPartitions, topicName, topicPartitionName, callback)); |
| } |
| } |
| |
| void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync( |
| Result result, std::shared_ptr<std::atomic<int>> consumerUnsubed, int numberPartitions, |
| TopicNamePtr topicNamePtr, std::string& topicPartitionName, ResultCallback callback) { |
| int previous = consumerUnsubed->fetch_add(1); |
| assert(previous < numberPartitions); |
| |
| if (result != ResultOk) { |
| setState(Failed); |
| LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: " |
| << result << " topicPartitionName - " << topicPartitionName); |
| } |
| |
| LOG_DEBUG("Successfully Unsubscribed one Consumer. topicPartitionName - " << topicPartitionName); |
| |
| std::map<std::string, ConsumerImplPtr>::iterator iterator = consumers_.find(topicPartitionName); |
| if (consumers_.end() != iterator) { |
| iterator->second->pauseMessageListener(); |
| consumers_.erase(iterator); |
| } |
| |
| if (consumerUnsubed->load() == numberPartitions) { |
| LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer. - " << consumerStr_); |
| std::map<std::string, int>::iterator it = topicsPartitions_.find(topicNamePtr->toString()); |
| if (it != topicsPartitions_.end()) { |
| numberTopicPartitions_->fetch_sub(numberPartitions); |
| Lock lock(mutex_); |
| topicsPartitions_.erase(it); |
| lock.unlock(); |
| } |
| if (state_ != Failed) { |
| callback(ResultOk); |
| } else { |
| callback(ResultUnknownError); |
| } |
| unAckedMessageTrackerPtr_->removeTopicMessage(topicNamePtr->toString()); |
| return; |
| } |
| } |
| |
| void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { |
| if (state_ == Closing || state_ == Closed) { |
| LOG_ERROR("TopicsConsumer already closed " |
| << " topic" << topic_ << " consumer - " << consumerStr_); |
| if (callback) { |
| callback(ResultAlreadyClosed); |
| } |
| return; |
| } |
| |
| setState(Closing); |
| |
| if (consumers_.empty()) { |
| LOG_DEBUG("TopicsConsumer have no consumers to close " |
| << " topic" << topic_ << " subscription - " << subscriptionName_); |
| setState(Closed); |
| if (callback) { |
| callback(ResultAlreadyClosed); |
| } |
| return; |
| } |
| |
| // close successfully subscribed consumers |
| for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end(); |
| consumer++) { |
| std::string topicPartitionName = consumer->first; |
| ConsumerImplPtr consumerPtr = consumer->second; |
| |
| consumerPtr->closeAsync(std::bind(&MultiTopicsConsumerImpl::handleSingleConsumerClose, |
| shared_from_this(), std::placeholders::_1, topicPartitionName, |
| callback)); |
| } |
| |
| // fail pending recieve |
| failPendingReceiveCallback(); |
| } |
| |
| void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::string& topicPartitionName, |
| CloseCallback callback) { |
| std::map<std::string, ConsumerImplPtr>::iterator iterator = consumers_.find(topicPartitionName); |
| if (consumers_.end() != iterator) { |
| consumers_.erase(iterator); |
| } |
| |
| LOG_DEBUG("Closing the consumer for partition - " << topicPartitionName << " numberTopicPartitions_ - " |
| << numberTopicPartitions_->load()); |
| |
| assert(numberTopicPartitions_->load() > 0); |
| numberTopicPartitions_->fetch_sub(1); |
| |
| if (result != ResultOk) { |
| setState(Failed); |
| LOG_ERROR("Closing the consumer failed for partition - " << topicPartitionName << " with error - " |
| << result); |
| } |
| |
| // closed all consumers |
| if (numberTopicPartitions_->load() == 0) { |
| messages_.clear(); |
| consumers_.clear(); |
| topicsPartitions_.clear(); |
| unAckedMessageTrackerPtr_->clear(); |
| |
| if (state_ != Failed) { |
| state_ = Closed; |
| } |
| |
| multiTopicsConsumerCreatedPromise_.setFailed(ResultUnknownError); |
| if (callback) { |
| callback(result); |
| } |
| return; |
| } |
| } |
| |
| void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) { |
| LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic() |
| << " message:" << msg.getDataAsString()); |
| const std::string& topicPartitionName = consumer.getTopic(); |
| msg.impl_->setTopicName(topicPartitionName); |
| |
| Lock lock(pendingReceiveMutex_); |
| if (!pendingReceives_.empty()) { |
| ReceiveCallback callback = pendingReceives_.front(); |
| pendingReceives_.pop(); |
| lock.unlock(); |
| unAckedMessageTrackerPtr_->add(msg.getMessageId()); |
| listenerExecutor_->postWork(std::bind(callback, ResultOk, msg)); |
| } else { |
| if (messages_.full()) { |
| lock.unlock(); |
| } |
| messages_.push(msg); |
| if (messageListener_) { |
| unAckedMessageTrackerPtr_->add(msg.getMessageId()); |
| listenerExecutor_->postWork( |
| std::bind(&MultiTopicsConsumerImpl::internalListener, shared_from_this(), consumer)); |
| } |
| } |
| } |
| |
| void MultiTopicsConsumerImpl::internalListener(Consumer consumer) { |
| Message m; |
| messages_.pop(m); |
| |
| try { |
| messageListener_(Consumer(shared_from_this()), m); |
| } catch (const std::exception& e) { |
| LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << e.what()); |
| } |
| } |
| |
| Result MultiTopicsConsumerImpl::receive(Message& msg) { |
| Lock lock(mutex_); |
| if (state_ != Ready) { |
| lock.unlock(); |
| return ResultAlreadyClosed; |
| } |
| |
| if (messageListener_) { |
| lock.unlock(); |
| LOG_ERROR("Can not receive when a listener has been set"); |
| return ResultInvalidConfiguration; |
| } |
| lock.unlock(); |
| messages_.pop(msg); |
| |
| unAckedMessageTrackerPtr_->add(msg.getMessageId()); |
| return ResultOk; |
| } |
| |
| Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) { |
| Lock lock(mutex_); |
| if (state_ != Ready) { |
| lock.unlock(); |
| return ResultAlreadyClosed; |
| } |
| |
| if (messageListener_) { |
| lock.unlock(); |
| LOG_ERROR("Can not receive when a listener has been set"); |
| return ResultInvalidConfiguration; |
| } |
| |
| lock.unlock(); |
| if (messages_.pop(msg, std::chrono::milliseconds(timeout))) { |
| unAckedMessageTrackerPtr_->add(msg.getMessageId()); |
| return ResultOk; |
| } else { |
| return ResultTimeout; |
| } |
| } |
| |
| void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) { |
| Message msg; |
| |
| // fail the callback if consumer is closing or closed |
| Lock stateLock(mutex_); |
| if (state_ != Ready) { |
| callback(ResultAlreadyClosed, msg); |
| return; |
| } |
| stateLock.unlock(); |
| |
| Lock lock(pendingReceiveMutex_); |
| if (messages_.pop(msg, std::chrono::milliseconds(0))) { |
| lock.unlock(); |
| unAckedMessageTrackerPtr_->add(msg.getMessageId()); |
| callback(ResultOk, msg); |
| } else { |
| pendingReceives_.push(callback); |
| } |
| } |
| |
| void MultiTopicsConsumerImpl::failPendingReceiveCallback() { |
| Message msg; |
| Lock lock(pendingReceiveMutex_); |
| while (!pendingReceives_.empty()) { |
| ReceiveCallback callback = pendingReceives_.front(); |
| pendingReceives_.pop(); |
| listenerExecutor_->postWork(std::bind(callback, ResultAlreadyClosed, msg)); |
| } |
| lock.unlock(); |
| } |
| |
| void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) { |
| if (state_ != Ready) { |
| callback(ResultAlreadyClosed); |
| return; |
| } |
| |
| const std::string& topicPartitionName = msgId.getTopicName(); |
| std::map<std::string, ConsumerImplPtr>::iterator iterator = consumers_.find(topicPartitionName); |
| |
| if (consumers_.end() != iterator) { |
| unAckedMessageTrackerPtr_->remove(msgId); |
| iterator->second->acknowledgeAsync(msgId, callback); |
| } else { |
| LOG_ERROR("Message of topic: " << topicPartitionName << " not in unAckedMessageTracker"); |
| callback(ResultUnknownError); |
| return; |
| } |
| } |
| |
| void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) { |
| callback(ResultOperationNotSupported); |
| } |
| |
| void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) { |
| auto iterator = consumers_.find(msgId.getTopicName()); |
| |
| if (consumers_.end() != iterator) { |
| unAckedMessageTrackerPtr_->remove(msgId); |
| iterator->second->negativeAcknowledge(msgId); |
| } |
| } |
| |
| MultiTopicsConsumerImpl::~MultiTopicsConsumerImpl() {} |
| |
| Future<Result, ConsumerImplBaseWeakPtr> MultiTopicsConsumerImpl::getConsumerCreatedFuture() { |
| return multiTopicsConsumerCreatedPromise_.getFuture(); |
| } |
| const std::string& MultiTopicsConsumerImpl::getSubscriptionName() const { return subscriptionName_; } |
| |
| const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic_; } |
| |
| const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; } |
| |
| void MultiTopicsConsumerImpl::setState(const MultiTopicsConsumerState state) { |
| Lock lock(mutex_); |
| state_ = state; |
| } |
| |
| bool MultiTopicsConsumerImpl::compareAndSetState(MultiTopicsConsumerState expect, |
| MultiTopicsConsumerState update) { |
| Lock lock(mutex_); |
| if (state_ == expect) { |
| state_ = update; |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| void MultiTopicsConsumerImpl::shutdown() {} |
| |
| bool MultiTopicsConsumerImpl::isClosed() { return state_ == Closed; } |
| |
| bool MultiTopicsConsumerImpl::isOpen() { |
| Lock lock(mutex_); |
| return state_ == Ready; |
| } |
| |
| void MultiTopicsConsumerImpl::receiveMessages() { |
| for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end(); |
| consumer++) { |
| ConsumerImplPtr consumerPtr = consumer->second; |
| consumerPtr->sendFlowPermitsToBroker(consumerPtr->getCnx().lock(), conf_.getReceiverQueueSize()); |
| LOG_DEBUG("Sending FLOW command for consumer - " << consumerPtr->getConsumerId()); |
| } |
| } |
| |
| Result MultiTopicsConsumerImpl::pauseMessageListener() { |
| if (!messageListener_) { |
| return ResultInvalidConfiguration; |
| } |
| for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end(); |
| consumer++) { |
| (consumer->second)->pauseMessageListener(); |
| } |
| return ResultOk; |
| } |
| |
| Result MultiTopicsConsumerImpl::resumeMessageListener() { |
| if (!messageListener_) { |
| return ResultInvalidConfiguration; |
| } |
| for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end(); |
| consumer++) { |
| (consumer->second)->resumeMessageListener(); |
| } |
| return ResultOk; |
| } |
| |
| void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages() { |
| LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer."); |
| for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end(); |
| consumer++) { |
| (consumer->second)->redeliverUnacknowledgedMessages(); |
| } |
| unAckedMessageTrackerPtr_->clear(); |
| } |
| |
| void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) { |
| if (messageIds.empty()) { |
| return; |
| } |
| if (conf_.getConsumerType() != ConsumerShared && conf_.getConsumerType() != ConsumerKeyShared) { |
| redeliverUnacknowledgedMessages(); |
| return; |
| } |
| LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer."); |
| for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end(); |
| consumer++) { |
| (consumer->second)->redeliverUnacknowledgedMessages(messageIds); |
| } |
| } |
| |
| int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); } |
| |
| void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) { |
| Lock lock(mutex_); |
| if (state_ != Ready) { |
| lock.unlock(); |
| callback(ResultConsumerNotInitialized, BrokerConsumerStats()); |
| return; |
| } |
| MultiTopicsBrokerConsumerStatsPtr statsPtr = |
| std::make_shared<MultiTopicsBrokerConsumerStatsImpl>(numberTopicPartitions_->load()); |
| LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load()); |
| int size = consumers_.size(); |
| lock.unlock(); |
| |
| ConsumerMap::const_iterator consumer = consumers_.begin(); |
| for (int i = 0; i < size; i++, consumer++) { |
| consumer->second->getBrokerConsumerStatsAsync( |
| std::bind(&MultiTopicsConsumerImpl::handleGetConsumerStats, shared_from_this(), |
| std::placeholders::_1, std::placeholders::_2, latchPtr, statsPtr, i, callback)); |
| } |
| } |
| |
| void MultiTopicsConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerStats brokerConsumerStats, |
| LatchPtr latchPtr, |
| MultiTopicsBrokerConsumerStatsPtr statsPtr, size_t index, |
| BrokerConsumerStatsCallback callback) { |
| Lock lock(mutex_); |
| if (res == ResultOk) { |
| latchPtr->countdown(); |
| statsPtr->add(brokerConsumerStats, index); |
| } else { |
| lock.unlock(); |
| callback(res, BrokerConsumerStats()); |
| return; |
| } |
| if (latchPtr->getCount() == 0) { |
| lock.unlock(); |
| callback(ResultOk, BrokerConsumerStats(statsPtr)); |
| } |
| } |
| |
| std::shared_ptr<TopicName> MultiTopicsConsumerImpl::topicNamesValid(const std::vector<std::string>& topics) { |
| TopicNamePtr topicNamePtr = std::shared_ptr<TopicName>(); |
| |
| // all topics name valid, and all topics have same namespace |
| for (std::vector<std::string>::const_iterator itr = topics.begin(); itr != topics.end(); itr++) { |
| // topic name valid |
| if (!(topicNamePtr = TopicName::get(*itr))) { |
| LOG_ERROR("Topic name invalid when init " << *itr); |
| return std::shared_ptr<TopicName>(); |
| } |
| } |
| |
| return topicNamePtr; |
| } |
| |
| void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) { |
| callback(ResultOperationNotSupported); |
| } |
| |
| void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { |
| callback(ResultOperationNotSupported); |
| } |
| |
| void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) { |
| Lock lock(mutex_); |
| for (auto&& c : consumers_) { |
| c.second->setNegativeAcknowledgeEnabledForTesting(enabled); |
| } |
| } |