/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "MultiTopicsConsumerImpl.h"
#include <cassert>
#include <stdexcept>
#include "ClientImpl.h"
#include "ConsumerImpl.h"
#include "ExecutorService.h"
#include "LogUtils.h"
#include "LookupService.h"
#include "MessageImpl.h"
#include "MessagesImpl.h"
#include "MultiResultCallback.h"
#include "MultiTopicsBrokerConsumerStatsImpl.h"
#include "TopicName.h"
#include "UnAckedMessageTrackerDisabled.h"
#include "UnAckedMessageTrackerEnabled.h"
DECLARE_LOG_OBJECT()
using namespace pulsar;
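// MultiTopicsConsumerImpl aggregates one internal ConsumerImpl per topic partition.
// Messages from all internal consumers are funneled into a single incoming queue, and
// acknowledgements are routed back to the owning internal consumer via the topic name
// carried in each MessageId.
//
// Rough usage sketch via the public API (not this internal class directly; the service
// URL and topic names below are placeholders):
//
//   Client client("pulsar://localhost:6650");
//   Consumer consumer;
//   client.subscribe(std::vector<std::string>{"topic-1", "topic-2"}, "my-sub", consumer);
//   Message msg;
//   consumer.receive(msg);
//   consumer.acknowledge(msg);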
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName,
int numPartitions, const std::string& subscriptionName,
const ConsumerConfiguration& conf,
LookupServicePtr lookupServicePtr,
const ConsumerInterceptorsPtr& interceptors,
const Commands::SubscriptionMode subscriptionMode,
boost::optional<MessageId> startMessageId)
: MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf,
lookupServicePtr, interceptors, subscriptionMode, startMessageId) {
topicsPartitions_[topicName->toString()] = numPartitions;
}
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
const std::string& subscriptionName, TopicNamePtr topicName,
const ConsumerConfiguration& conf,
LookupServicePtr lookupServicePtr,
const ConsumerInterceptorsPtr& interceptors,
const Commands::SubscriptionMode subscriptionMode,
boost::optional<MessageId> startMessageId)
: ConsumerImplBase(client, topicName ? topicName->toString() : "EmptyTopics",
Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf,
client->getListenerExecutorProvider()->get()),
client_(client),
subscriptionName_(subscriptionName),
conf_(conf),
incomingMessages_(conf.getReceiverQueueSize()),
messageListener_(conf.getMessageListener()),
lookupServicePtr_(lookupServicePtr),
numberTopicPartitions_(std::make_shared<std::atomic<int>>(0)),
topics_(topics),
subscriptionMode_(subscriptionMode),
startMessageId_(startMessageId),
interceptors_(interceptors) {
std::stringstream consumerStrStream;
consumerStrStream << "[Muti Topics Consumer: "
<< "TopicName - " << topic_ << " - Subscription - " << subscriptionName << "]";
consumerStr_ = consumerStrStream.str();
if (conf.getUnAckedMessagesTimeoutMs() != 0) {
if (conf.getTickDurationInMs() > 0) {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this));
} else {
unAckedMessageTrackerPtr_.reset(
new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
}
} else {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
}
auto partitionsUpdateInterval = static_cast<unsigned int>(client->conf().getPartitionsUpdateInterval());
if (partitionsUpdateInterval > 0) {
partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval);
lookupServicePtr_ = client->getLookup();
}
state_ = Pending;
}
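// Start subscribing. With an empty topic list (e.g. a pattern consumer that has not
// discovered any topics yet) the consumer becomes Ready immediately; otherwise
// subscribeOneTopicAsync is called for every topic and completion is tracked by
// handleOneTopicSubscribed.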
void MultiTopicsConsumerImpl::start() {
if (topics_.empty()) {
State state = Pending;
if (state_.compare_exchange_strong(state, Ready)) {
LOG_DEBUG("No topics passed in when create MultiTopicsConsumer.");
multiTopicsConsumerCreatedPromise_.setValue(get_shared_this_ptr());
return;
} else {
LOG_ERROR("Consumer " << consumerStr_ << " in wrong state: " << state_);
multiTopicsConsumerCreatedPromise_.setFailed(ResultUnknownError);
return;
}
}
// Track how many topics still need to be subscribed
int topicsNumber = topics_.size();
std::shared_ptr<std::atomic<int>> topicsNeedCreate = std::make_shared<std::atomic<int>>(topicsNumber);
// Subscribe to each passed-in topic
auto weakSelf = weak_from_this();
for (std::vector<std::string>::const_iterator itr = topics_.begin(); itr != topics_.end(); itr++) {
auto topic = *itr;
subscribeOneTopicAsync(topic).addListener(
[this, weakSelf, topic, topicsNeedCreate](Result result, const Consumer& consumer) {
auto self = weakSelf.lock();
if (self) {
handleOneTopicSubscribed(result, consumer, topic, topicsNeedCreate);
}
});
}
}
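// Called once per topic subscription attempt. The shared counter tracks topics still
// pending; when it reaches zero, either the creation promise is fulfilled or, if any
// topic failed, closeAsync() tears down the already-created consumers.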
void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer consumer,
const std::string& topic,
std::shared_ptr<std::atomic<int>> topicsNeedCreate) {
if (result != ResultOk) {
state_ = Failed;
// Use the first failed result
auto expectedResult = ResultOk;
failedResult.compare_exchange_strong(expectedResult, result);
LOG_ERROR("Failed when subscribed to topic " << topic << " in TopicsConsumer. Error - " << result);
} else {
LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
}
if (--(*topicsNeedCreate) == 0) {
State state = Pending;
if (state_.compare_exchange_strong(state, Ready)) {
LOG_INFO("Successfully Subscribed to Topics");
multiTopicsConsumerCreatedPromise_.setValue(get_shared_this_ptr());
} else {
LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
// Unsubscribe all of the successfully subscribed partition consumers.
// `shutdown()`, which set multiTopicsConsumerCreatedPromise_ with `failedResult`, will be called
// when `closeAsync` completes.
closeAsync(nullptr);
}
}
}
// Subscribe to a single passed-in topic
Future<Result, Consumer> MultiTopicsConsumerImpl::subscribeOneTopicAsync(const std::string& topic) {
TopicNamePtr topicName;
ConsumerSubResultPromisePtr topicPromise = std::make_shared<Promise<Result, Consumer>>();
if (!(topicName = TopicName::get(topic))) {
LOG_ERROR("TopicName invalid: " << topic);
topicPromise->setFailed(ResultInvalidTopicName);
return topicPromise->getFuture();
}
const auto state = state_.load();
if (state == Closed || state == Closing) {
LOG_ERROR("MultiTopicsConsumer already closed when subscribe.");
topicPromise->setFailed(ResultAlreadyClosed);
return topicPromise->getFuture();
}
// Subscribe to each partition; once all partitions are subscribed, complete the promise
Lock lock(mutex_);
auto entry = topicsPartitions_.find(topic);
if (entry == topicsPartitions_.end()) {
lock.unlock();
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
[this, topicName, topicPromise](Result result, const LookupDataResultPtr& lookupDataResult) {
if (result != ResultOk) {
LOG_ERROR("Error Checking/Getting Partition Metadata while MultiTopics Subscribing- "
<< consumerStr_ << " result: " << result)
topicPromise->setFailed(result);
return;
}
subscribeTopicPartitions(lookupDataResult->getPartitions(), topicName, subscriptionName_,
topicPromise);
});
} else {
auto numPartitions = entry->second;
lock.unlock();
subscribeTopicPartitions(numPartitions, topicName, subscriptionName_, topicPromise);
}
return topicPromise->getFuture();
}
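// Create the internal consumers for one topic: a single ConsumerImpl for a
// non-partitioned topic (numPartitions == 0), or one ConsumerImpl per partition.
// Each consumer's receiver queue is capped so that the total across partitions stays
// within maxTotalReceiverQueueSizeAcrossPartitions; e.g. with receiverQueueSize=1000,
// maxTotal=50000 and 100 partitions, each partition consumer gets min(1000, 500) = 500.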
void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicNamePtr topicName,
const std::string& consumerName,
ConsumerSubResultPromisePtr topicSubResultPromise) {
std::shared_ptr<ConsumerImpl> consumer;
ConsumerConfiguration config = conf_.clone();
auto client = client_.lock();
if (!client) {
topicSubResultPromise->setFailed(ResultAlreadyClosed);
return;
}
ExecutorServicePtr internalListenerExecutor = client->getPartitionListenerExecutorProvider()->get();
auto weakSelf = weak_from_this();
config.setMessageListener([this, weakSelf](Consumer consumer, const Message& msg) {
auto self = weakSelf.lock();
if (self) {
messageReceived(consumer, msg);
}
});
int partitions = numPartitions == 0 ? 1 : numPartitions;
// Apply total limit of receiver queue size across partitions
config.setReceiverQueueSize(
std::min(conf_.getReceiverQueueSize(),
(int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / partitions)));
Lock lock(mutex_);
topicsPartitions_[topicName->toString()] = partitions;
lock.unlock();
numberTopicPartitions_->fetch_add(partitions);
std::shared_ptr<std::atomic<int>> partitionsNeedCreate = std::make_shared<std::atomic<int>>(partitions);
// non-partitioned topic
if (numPartitions == 0) {
// We don't have to add partition-n suffix
try {
consumer = std::make_shared<ConsumerImpl>(client, topicName->toString(), subscriptionName_,
config, topicName->isPersistent(), interceptors_,
internalListenerExecutor, true, NonPartitioned,
subscriptionMode_, startMessageId_);
} catch (const std::runtime_error& e) {
LOG_ERROR("Failed to create ConsumerImpl for " << topicName->toString() << ": " << e.what());
topicSubResultPromise->setFailed(ResultConnectError);
return;
}
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
consumers_.emplace(topicName->toString(), consumer);
LOG_DEBUG("Creating Consumer for - " << topicName << " - " << consumerStr_);
consumer->start();
} else {
std::vector<ConsumerImplPtr> consumers;
for (int i = 0; i < numPartitions; i++) {
std::string topicPartitionName = topicName->getTopicPartitionName(i);
try {
consumer = std::make_shared<ConsumerImpl>(client, topicPartitionName, subscriptionName_,
config, topicName->isPersistent(), interceptors_,
internalListenerExecutor, true, Partitioned,
subscriptionMode_, startMessageId_);
} catch (const std::runtime_error& e) {
LOG_ERROR("Failed to create ConsumerImpl for " << topicPartitionName << ": " << e.what());
topicSubResultPromise->setFailed(ResultConnectError);
return;
}
consumers.emplace_back(consumer);
}
for (size_t i = 0; i < consumers.size(); i++) {
std::string topicPartitionName = topicName->getTopicPartitionName(i);
auto&& consumer = consumers[i];
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
consumer->setPartitionIndex(i);
consumers_.emplace(topicPartitionName, consumer);
LOG_DEBUG("Creating Consumer for - " << topicPartitionName << " - " << consumerStr_);
consumer->start();
}
}
}
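// Completion handler shared by all partition consumers of one topic: it decrements the
// remaining-partition counter and fulfills the topic-level promise once every partition
// consumer has been created.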
void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
std::shared_ptr<std::atomic<int>> partitionsNeedCreate,
ConsumerSubResultPromisePtr topicSubResultPromise) {
if (state_ == Failed) {
// one of the consumer creation failed, and we are cleaning up
topicSubResultPromise->setFailed(ResultAlreadyClosed);
LOG_ERROR("Unable to create Consumer " << consumerStr_ << " state == Failed, result: " << result);
return;
}
int previous = partitionsNeedCreate->fetch_sub(1);
assert(previous > 0);
if (result != ResultOk) {
topicSubResultPromise->setFailed(result);
LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
return;
}
LOG_INFO("Successfully Subscribed to a single partition of topic in TopicsConsumer. "
<< "Partitions need to create : " << previous - 1);
if (partitionsNeedCreate->load() == 0) {
if (partitionsUpdateTimer_) {
runPartitionUpdateTask();
}
topicSubResultPromise->setValue(Consumer(get_shared_this_ptr()));
}
}
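// Unsubscribe all internal consumers. The wrapped callback shuts this consumer down on
// success, or restores the Ready state on failure so the consumer remains usable.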
void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing");
auto callback = [this, originalCallback](Result result) {
if (result == ResultOk) {
shutdown();
LOG_INFO(getName() << "Unsubscribed successfully");
} else {
state_ = Ready;
LOG_WARN(getName() << "Failed to unsubscribe: " << result);
}
if (originalCallback) {
originalCallback(result);
}
};
const auto state = state_.load();
if (state == Closing || state == Closed) {
callback(ResultAlreadyClosed);
return;
}
state_ = Closing;
std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0);
auto self = get_shared_this_ptr();
int numConsumers = 0;
consumers_.forEachValue(
[&numConsumers, &consumerUnsubed, &self, callback](const ConsumerImplPtr& consumer) {
numConsumers++;
consumer->unsubscribeAsync([self, consumerUnsubed, callback](Result result) {
self->handleUnsubscribedAsync(result, consumerUnsubed, callback);
});
});
if (numConsumers == 0) {
// No need to unsubscribe, since the list matching the regex was empty
callback(ResultOk);
}
}
void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
std::shared_ptr<std::atomic<int>> consumerUnsubed,
ResultCallback callback) {
(*consumerUnsubed)++;
if (result != ResultOk) {
state_ = Failed;
LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
<< result << " subscription - " << subscriptionName_);
}
if (consumerUnsubed->load() == numberTopicPartitions_->load()) {
LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer. - " << consumerStr_);
Result result1 = (state_ != Failed) ? ResultOk : ResultUnknownError;
// The `callback` is a wrapper of user provided callback, it's not null and will call `shutdown()` if
// unsubscribe succeeds.
callback(result1);
return;
}
}
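// Unsubscribe only the consumers that belong to a single topic, leaving the rest of the
// multi-topics consumer running.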
void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, ResultCallback callback) {
Lock lock(mutex_);
std::map<std::string, int>::iterator it = topicsPartitions_.find(topic);
if (it == topicsPartitions_.end()) {
lock.unlock();
LOG_ERROR("TopicsConsumer does not subscribe topic : " << topic << " subscription - "
<< subscriptionName_);
callback(ResultTopicNotFound);
return;
}
int numberPartitions = it->second;
lock.unlock();
const auto state = state_.load();
if (state == Closing || state == Closed) {
LOG_ERROR("TopicsConsumer already closed when unsubscribe topic: " << topic << " subscription - "
<< subscriptionName_);
callback(ResultAlreadyClosed);
return;
}
TopicNamePtr topicName;
if (!(topicName = TopicName::get(topic))) {
LOG_ERROR("TopicName invalid: " << topic);
callback(ResultUnknownError);
return;
}
std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0);
for (int i = 0; i < numberPartitions; i++) {
std::string topicPartitionName = topicName->getTopicPartitionName(i);
auto optConsumer = consumers_.find(topicPartitionName);
if (!optConsumer) {
LOG_ERROR("TopicsConsumer not subscribed on topicPartitionName: " << topicPartitionName);
callback(ResultUnknownError);
continue;
}
optConsumer.value()->unsubscribeAsync(
std::bind(&MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync, get_shared_this_ptr(),
std::placeholders::_1, consumerUnsubed, numberPartitions, topicName, topicPartitionName,
callback));
}
}
void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
Result result, std::shared_ptr<std::atomic<int>> consumerUnsubed, int numberPartitions,
TopicNamePtr topicNamePtr, std::string& topicPartitionName, ResultCallback callback) {
(*consumerUnsubed)++;
if (result != ResultOk) {
state_ = Failed;
LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
<< result << " topicPartitionName - " << topicPartitionName);
}
LOG_DEBUG("Successfully Unsubscribed one Consumer. topicPartitionName - " << topicPartitionName);
auto optConsumer = consumers_.remove(topicPartitionName);
if (optConsumer) {
optConsumer.value()->pauseMessageListener();
}
if (consumerUnsubed->load() == numberPartitions) {
LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer. - " << consumerStr_);
std::map<std::string, int>::iterator it = topicsPartitions_.find(topicNamePtr->toString());
if (it != topicsPartitions_.end()) {
numberTopicPartitions_->fetch_sub(numberPartitions);
Lock lock(mutex_);
topicsPartitions_.erase(it);
lock.unlock();
}
if (state_ != Failed) {
callback(ResultOk);
} else {
callback(ResultUnknownError);
}
unAckedMessageTrackerPtr_->removeTopicMessage(topicNamePtr->toString());
return;
}
}
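// Close all internal consumers; the overall callback fires after the last close
// completes. Pending (batch) receives are failed immediately rather than waiting.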
void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{get_shared_this_ptr()};
auto callback = [weakSelf, originalCallback](Result result) {
auto self = weakSelf.lock();
if (self) {
self->shutdown();
if (result != ResultOk) {
LOG_WARN(self->getName() << "Failed to close consumer: " << result);
if (result != ResultAlreadyClosed) {
self->state_ = Failed;
}
}
}
if (originalCallback) {
originalCallback(result);
}
};
const auto state = state_.load();
if (state == Closing || state == Closed) {
callback(ResultAlreadyClosed);
return;
}
state_ = Closing;
cancelTimers();
auto consumers = consumers_.move();
*numberTopicPartitions_ = 0;
if (consumers.empty()) {
LOG_DEBUG("TopicsConsumer have no consumers to close "
<< " topic" << topic_ << " subscription - " << subscriptionName_);
callback(ResultAlreadyClosed);
return;
}
auto numConsumers = std::make_shared<std::atomic<size_t>>(consumers.size());
for (auto&& kv : consumers) {
auto& name = kv.first;
auto& consumer = kv.second;
consumer->closeAsync([name, numConsumers, callback](Result result) {
const auto numConsumersLeft = --*numConsumers;
LOG_DEBUG("Closing the consumer for partition - " << name << " numConsumersLeft - "
<< numConsumersLeft);
if (result != ResultOk) {
LOG_ERROR("Closing the consumer failed for partition - " << name << " with error - "
<< result);
}
if (numConsumersLeft == 0) {
callback(result);
}
});
}
// Fail all pending receives
failPendingReceiveCallback();
failPendingBatchReceiveCallback();
// Cancel the batch receive timer
batchReceiveTimer_->cancel();
}
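// Invoked on an internal consumer's listener thread for each received message. The
// message either satisfies a pending receiveAsync() callback directly or is pushed
// into the shared blocking queue; a configured message listener is then dispatched on
// the listener executor.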
void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
<< " message:" << msg.getDataAsString());
msg.impl_->setTopicName(consumer.impl_->topic_);
Lock lock(pendingReceiveMutex_);
if (!pendingReceives_.empty()) {
ReceiveCallback callback = pendingReceives_.front();
pendingReceives_.pop();
lock.unlock();
auto weakSelf = weak_from_this();
listenerExecutor_->postWork([this, weakSelf, msg, callback]() {
auto self = weakSelf.lock();
if (self) {
notifyPendingReceivedCallback(ResultOk, msg, callback);
}
});
return;
}
if (incomingMessages_.full()) {
lock.unlock();
}
// Add the message to the blocking queue. When the queue is full, this blocks the internal
// consumer's listener thread, so no more flow permits are sent to the broker and the
// broker stops pushing messages.
incomingMessages_.push(msg);
incomingMessagesSize_.fetch_add(msg.getLength());
// Try to trigger a pending batch receive
Lock batchOptionLock(batchReceiveOptionMutex_);
if (hasEnoughMessagesForBatchReceive()) {
ConsumerImplBase::notifyBatchPendingReceivedCallback();
}
batchOptionLock.unlock();
if (messageListener_) {
listenerExecutor_->postWork(
std::bind(&MultiTopicsConsumerImpl::internalListener, get_shared_this_ptr(), consumer));
}
}
void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
Message m;
incomingMessages_.pop(m);
try {
Consumer self{get_shared_this_ptr()};
messageListener_(self, m);
messageProcessed(m);
} catch (const std::exception& e) {
LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << e.what());
}
}
Result MultiTopicsConsumerImpl::receive(Message& msg) {
if (state_ != Ready) {
return ResultAlreadyClosed;
}
if (messageListener_) {
LOG_ERROR("Can not receive when a listener has been set");
return ResultInvalidConfiguration;
}
incomingMessages_.pop(msg);
messageProcessed(msg);
return ResultOk;
}
Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
if (state_ != Ready) {
return ResultAlreadyClosed;
}
if (messageListener_) {
LOG_ERROR("Can not receive when a listener has been set");
return ResultInvalidConfiguration;
}
if (incomingMessages_.pop(msg, std::chrono::milliseconds(timeout))) {
messageProcessed(msg);
return ResultOk;
} else {
if (state_ != Ready) {
return ResultAlreadyClosed;
}
return ResultTimeout;
}
}
void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback callback) {
Message msg;
// fail the callback if consumer is closing or closed
if (state_ != Ready) {
callback(ResultAlreadyClosed, msg);
return;
}
Lock lock(pendingReceiveMutex_);
if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
lock.unlock();
messageProcessed(msg);
callback(ResultOk, msg);
} else {
pendingReceives_.push(callback);
}
}
void MultiTopicsConsumerImpl::failPendingReceiveCallback() {
Message msg;
incomingMessages_.close();
Lock lock(pendingReceiveMutex_);
while (!pendingReceives_.empty()) {
ReceiveCallback callback = pendingReceives_.front();
pendingReceives_.pop();
auto weakSelf = weak_from_this();
listenerExecutor_->postWork([this, weakSelf, msg, callback]() {
auto self = weakSelf.lock();
if (self) {
notifyPendingReceivedCallback(ResultAlreadyClosed, msg, callback);
}
});
}
lock.unlock();
}
void MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result, const Message& msg,
const ReceiveCallback& callback) {
if (result == ResultOk) {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
}
callback(result, msg);
}
void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
if (state_ != Ready) {
interceptors_->onAcknowledge(Consumer(shared_from_this()), ResultAlreadyClosed, msgId);
callback(ResultAlreadyClosed);
return;
}
const std::string& topicPartitionName = msgId.getTopicName();
if (topicPartitionName.empty()) {
LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer");
callback(ResultOperationNotSupported);
return;
}
auto optConsumer = consumers_.find(topicPartitionName);
if (optConsumer) {
unAckedMessageTrackerPtr_->remove(msgId);
optConsumer.value()->acknowledgeAsync(msgId, callback);
} else {
LOG_ERROR("Message of topic: " << topicPartitionName << " not in unAckedMessageTracker");
callback(ResultUnknownError);
}
}
void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList, ResultCallback callback) {
if (state_ != Ready) {
callback(ResultAlreadyClosed);
return;
}
std::unordered_map<std::string, MessageIdList> topicToMessageId;
for (const MessageId& messageId : messageIdList) {
auto topicName = messageId.getTopicName();
if (topicName.empty()) {
LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer");
callback(ResultOperationNotSupported);
return;
}
topicToMessageId[topicName].emplace_back(messageId);
}
auto needCallBack = std::make_shared<std::atomic<int>>(topicToMessageId.size());
auto cb = [callback, needCallBack](Result result) {
if (result != ResultOk) {
LOG_ERROR("Filed when acknowledge list: " << result);
// set needCallBack is -1 to avoid repeated callback.
needCallBack->store(-1);
callback(result);
return;
}
if (--(*needCallBack) == 0) {
callback(result);
}
};
for (const auto& kv : topicToMessageId) {
auto optConsumer = consumers_.find(kv.first);
if (optConsumer) {
unAckedMessageTrackerPtr_->remove(kv.second);
optConsumer.value()->acknowledgeAsync(kv.second, cb);
} else {
LOG_ERROR("Message of topic: " << kv.first << " not in consumers");
callback(ResultUnknownError);
}
}
}
void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
auto optConsumer = consumers_.find(msgId.getTopicName());
if (optConsumer) {
unAckedMessageTrackerPtr_->removeMessagesTill(msgId);
optConsumer.value()->acknowledgeCumulativeAsync(msgId, callback);
} else {
LOG_ERROR("Message of topic: " << msgId.getTopicName() << " not found in consumers");
callback(ResultUnknownError);
}
}
void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
auto optConsumer = consumers_.find(msgId.getTopicName());
if (optConsumer) {
unAckedMessageTrackerPtr_->remove(msgId);
optConsumer.value()->negativeAcknowledge(msgId);
}
}
MultiTopicsConsumerImpl::~MultiTopicsConsumerImpl() { shutdown(); }
Future<Result, ConsumerImplBaseWeakPtr> MultiTopicsConsumerImpl::getConsumerCreatedFuture() {
return multiTopicsConsumerCreatedPromise_.getFuture();
}
const std::string& MultiTopicsConsumerImpl::getSubscriptionName() const { return subscriptionName_; }
const std::string& MultiTopicsConsumerImpl::getTopic() const { return *topic_; }
const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; }
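// Release all resources and mark the consumer Closed. The creation promise is failed
// here as well, so any waiter that never saw the consumer become Ready is unblocked.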
void MultiTopicsConsumerImpl::shutdown() {
cancelTimers();
incomingMessages_.clear();
topicsPartitions_.clear();
unAckedMessageTrackerPtr_->clear();
interceptors_->close();
auto client = client_.lock();
if (client) {
client->cleanupConsumer(this);
}
consumers_.clear();
topicsPartitions_.clear();
if (failedResult != ResultOk) {
multiTopicsConsumerCreatedPromise_.setFailed(failedResult);
} else {
multiTopicsConsumerCreatedPromise_.setFailed(ResultAlreadyClosed);
}
state_ = Closed;
}
bool MultiTopicsConsumerImpl::isClosed() { return state_ == Closed; }
bool MultiTopicsConsumerImpl::isOpen() { return state_ == Ready; }
void MultiTopicsConsumerImpl::receiveMessages() {
const auto receiverQueueSize = conf_.getReceiverQueueSize();
consumers_.forEachValue([receiverQueueSize](const ConsumerImplPtr& consumer) {
consumer->sendFlowPermitsToBroker(consumer->getCnx().lock(), receiverQueueSize);
LOG_DEBUG("Sending FLOW command for consumer - " << consumer->getConsumerId());
});
}
Result MultiTopicsConsumerImpl::pauseMessageListener() {
if (!messageListener_) {
return ResultInvalidConfiguration;
}
consumers_.forEachValue([](const ConsumerImplPtr& consumer) { consumer->pauseMessageListener(); });
return ResultOk;
}
Result MultiTopicsConsumerImpl::resumeMessageListener() {
if (!messageListener_) {
return ResultInvalidConfiguration;
}
consumers_.forEachValue([](const ConsumerImplPtr& consumer) { consumer->resumeMessageListener(); });
return ResultOk;
}
void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages() {
LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer.");
consumers_.forEachValue(
[](const ConsumerImplPtr& consumer) { consumer->redeliverUnacknowledgedMessages(); });
unAckedMessageTrackerPtr_->clear();
}
void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) {
if (messageIds.empty()) {
return;
}
if (conf_.getConsumerType() != ConsumerShared && conf_.getConsumerType() != ConsumerKeyShared) {
redeliverUnacknowledgedMessages();
return;
}
LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer.");
std::unordered_map<std::string, std::set<MessageId>> topicToMessageId;
for (const MessageId& messageId : messageIds) {
auto topicName = messageId.getTopicName();
topicToMessageId[topicName].emplace(messageId);
}
for (const auto& kv : topicToMessageId) {
auto optConsumer = consumers_.find(kv.first);
if (optConsumer) {
optConsumer.value()->redeliverUnacknowledgedMessages(kv.second);
} else {
LOG_ERROR("Message of topic: " << kv.first << " not in consumers");
}
}
}
int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); }
void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
if (state_ != Ready) {
callback(ResultConsumerNotInitialized, BrokerConsumerStats());
return;
}
Lock lock(mutex_);
MultiTopicsBrokerConsumerStatsPtr statsPtr =
std::make_shared<MultiTopicsBrokerConsumerStatsImpl>(numberTopicPartitions_->load());
LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load());
lock.unlock();
size_t i = 0;
consumers_.forEachValue([this, &latchPtr, &statsPtr, &i, callback](const ConsumerImplPtr& consumer) {
size_t index = i++;
auto weakSelf = weak_from_this();
consumer->getBrokerConsumerStatsAsync(
[this, weakSelf, latchPtr, statsPtr, index, callback](Result result, BrokerConsumerStats stats) {
auto self = weakSelf.lock();
if (self) {
handleGetConsumerStats(result, stats, latchPtr, statsPtr, index, callback);
}
});
});
}
void MultiTopicsConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) {
callback(ResultOperationNotSupported, GetLastMessageIdResponse());
}
void MultiTopicsConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerStats brokerConsumerStats,
LatchPtr latchPtr,
MultiTopicsBrokerConsumerStatsPtr statsPtr, size_t index,
BrokerConsumerStatsCallback callback) {
Lock lock(mutex_);
if (res == ResultOk) {
latchPtr->countdown();
statsPtr->add(brokerConsumerStats, index);
} else {
lock.unlock();
callback(res, BrokerConsumerStats());
return;
}
if (latchPtr->getCount() == 0) {
lock.unlock();
callback(ResultOk, BrokerConsumerStats(statsPtr));
}
}
std::shared_ptr<TopicName> MultiTopicsConsumerImpl::topicNamesValid(const std::vector<std::string>& topics) {
TopicNamePtr topicNamePtr = std::shared_ptr<TopicName>();
// Verify that every topic name is valid; returns the last parsed topic name, or null on failure
for (std::vector<std::string>::const_iterator itr = topics.begin(); itr != topics.end(); itr++) {
if (!(topicNamePtr = TopicName::get(*itr))) {
LOG_ERROR("Invalid topic name: " << *itr);
return std::shared_ptr<TopicName>();
}
}
return topicNamePtr;
}
void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
callback(ResultOperationNotSupported);
}
void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
if (state_ != Ready) {
callback(ResultAlreadyClosed);
return;
}
MultiResultCallback multiResultCallback(callback, consumers_.size());
consumers_.forEachValue([&timestamp, &multiResultCallback](ConsumerImplPtr consumer) {
consumer->seekAsync(timestamp, multiResultCallback);
});
}
void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
consumers_.forEachValue([enabled](const ConsumerImplPtr& consumer) {
consumer->setNegativeAcknowledgeEnabledForTesting(enabled);
});
}
bool MultiTopicsConsumerImpl::isConnected() const {
if (state_ != Ready) {
return false;
}
return !consumers_.findFirstValueIf(
[](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); });
}
uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
uint64_t numberOfConnectedConsumer = 0;
consumers_.forEachValue([&numberOfConnectedConsumer](const ConsumerImplPtr& consumer) {
if (consumer->isConnected()) {
numberOfConnectedConsumer++;
}
});
return numberOfConnectedConsumer;
}
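// Re-arm the periodic partition-discovery timer. Expiring the timer cancels any
// outstanding wait with an error code, which the callback ignores via the `ec` check.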
void MultiTopicsConsumerImpl::runPartitionUpdateTask() {
partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
auto weakSelf = weak_from_this();
partitionsUpdateTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
// Re-arming the timer in runPartitionUpdateTask cancels any outstanding wait with an
// error code; in that case this callback must not run the update, so it checks `ec`.
auto self = weakSelf.lock();
if (self && !ec) {
self->topicPartitionUpdate();
}
});
}
void MultiTopicsConsumerImpl::topicPartitionUpdate() {
using namespace std::placeholders;
Lock lock(mutex_);
auto topicsPartitions = topicsPartitions_;
lock.unlock();
for (const auto& item : topicsPartitions) {
auto topicName = TopicName::get(item.first);
auto currentNumPartitions = item.second;
auto weakSelf = weak_from_this();
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
[this, weakSelf, topicName, currentNumPartitions](Result result,
const LookupDataResultPtr& lookupDataResult) {
auto self = weakSelf.lock();
if (self) {
this->handleGetPartitions(topicName, result, lookupDataResult, currentNumPartitions);
}
});
}
}
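// Compare the latest partition metadata with the cached count and, if the topic has
// grown, create consumers for the newly added partitions. Shrinking is not handled;
// in all other cases the timer is simply re-armed.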
void MultiTopicsConsumerImpl::handleGetPartitions(TopicNamePtr topicName, Result result,
const LookupDataResultPtr& lookupDataResult,
int currentNumPartitions) {
if (state_ != Ready) {
return;
}
if (result == ResultOk) {
const auto newNumPartitions = static_cast<unsigned int>(lookupDataResult->getPartitions());
if (newNumPartitions > currentNumPartitions) {
LOG_INFO("new partition count: " << newNumPartitions
<< " current partition count: " << currentNumPartitions);
auto partitionsNeedCreate =
std::make_shared<std::atomic<int>>(newNumPartitions - currentNumPartitions);
ConsumerSubResultPromisePtr topicPromise = std::make_shared<Promise<Result, Consumer>>();
Lock lock(mutex_);
topicsPartitions_[topicName->toString()] = newNumPartitions;
lock.unlock();
numberTopicPartitions_->fetch_add(newNumPartitions - currentNumPartitions);
for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) {
subscribeSingleNewConsumer(newNumPartitions, topicName, i, topicPromise,
partitionsNeedCreate);
}
// `runPartitionUpdateTask()` will be called in `handleSingleConsumerCreated()`
return;
}
} else {
LOG_WARN("Failed to getPartitionMetadata: " << strResult(result));
}
runPartitionUpdateTask();
}
void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
int numPartitions, TopicNamePtr topicName, int partitionIndex,
ConsumerSubResultPromisePtr topicSubResultPromise,
std::shared_ptr<std::atomic<int>> partitionsNeedCreate) {
ConsumerConfiguration config = conf_.clone();
auto client = client_.lock();
if (!client) {
topicSubResultPromise->setFailed(ResultAlreadyClosed);
return;
}
ExecutorServicePtr internalListenerExecutor = client->getPartitionListenerExecutorProvider()->get();
auto weakSelf = weak_from_this();
config.setMessageListener([this, weakSelf](Consumer consumer, const Message& msg) {
auto self = weakSelf.lock();
if (self) {
messageReceived(consumer, msg);
}
});
// Apply total limit of receiver queue size across partitions
config.setReceiverQueueSize(
std::min(conf_.getReceiverQueueSize(),
(int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions)));
std::string topicPartitionName = topicName->getTopicPartitionName(partitionIndex);
auto consumer = std::make_shared<ConsumerImpl>(
client, topicPartitionName, subscriptionName_, config, topicName->isPersistent(), interceptors_,
internalListenerExecutor, true, Partitioned, subscriptionMode_, startMessageId_);
consumer->getConsumerCreatedFuture().addListener(
[this, weakSelf, partitionsNeedCreate, topicSubResultPromise](
Result result, const ConsumerImplBaseWeakPtr& consumerImplBaseWeakPtr) {
auto self = weakSelf.lock();
if (self) {
handleSingleConsumerCreated(result, consumerImplBaseWeakPtr, partitionsNeedCreate,
topicSubResultPromise);
}
});
consumer->setPartitionIndex(partitionIndex);
consumer->start();
consumers_.emplace(topicPartitionName, consumer);
LOG_INFO("Add Creating Consumer for - " << topicPartitionName << " - " << consumerStr_
<< " consumerSize: " << consumers_.size());
}
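// A pending batch receive can complete early once either limit of the
// BatchReceivePolicy (message count or total bytes) is reached by the prefetched
// messages.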
bool MultiTopicsConsumerImpl::hasEnoughMessagesForBatchReceive() const {
if (batchReceivePolicy_.getMaxNumMessages() <= 0 && batchReceivePolicy_.getMaxNumBytes() <= 0) {
return false;
}
return (batchReceivePolicy_.getMaxNumMessages() > 0 &&
incomingMessages_.size() >= batchReceivePolicy_.getMaxNumMessages()) ||
(batchReceivePolicy_.getMaxNumBytes() > 0 &&
incomingMessagesSize_ >= batchReceivePolicy_.getMaxNumBytes());
}
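// Drain as many prefetched messages as fit into one batch according to the
// BatchReceivePolicy and deliver them asynchronously on the listener executor.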
void MultiTopicsConsumerImpl::notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) {
auto messages = std::make_shared<MessagesImpl>(batchReceivePolicy_.getMaxNumMessages(),
batchReceivePolicy_.getMaxNumBytes());
Message msg;
while (incomingMessages_.popIf(
msg, [&messages](const Message& peekMsg) { return messages->canAdd(peekMsg); })) {
messageProcessed(msg);
messages->add(msg);
}
auto weakSelf = weak_from_this();
listenerExecutor_->postWork([weakSelf, callback, messages]() {
auto self = weakSelf.lock();
if (self) {
callback(ResultOk, messages->getMessageList());
}
});
}
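// Book-keeping for a message leaving the incoming queue: shrink the tracked queue size
// and start the unacked-message timeout for it.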
void MultiTopicsConsumerImpl::messageProcessed(Message& msg) {
incomingMessagesSize_.fetch_sub(msg.getLength());
unAckedMessageTrackerPtr_->add(msg.getMessageId());
}
std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImpl::get_shared_this_ptr() {
return std::dynamic_pointer_cast<MultiTopicsConsumerImpl>(shared_from_this());
}
void MultiTopicsConsumerImpl::beforeConnectionChange(ClientConnection& cnx) {
throw std::runtime_error("The connection_ field should not be modified for a MultiTopicsConsumerImpl");
}
void MultiTopicsConsumerImpl::cancelTimers() noexcept {
if (partitionsUpdateTimer_) {
boost::system::error_code ec;
partitionsUpdateTimer_->cancel(ec);
}
}
void MultiTopicsConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) {
if (incomingMessagesSize_ > 0) {
callback(ResultOk, true);
return;
}
auto hasMessageAvailable = std::make_shared<std::atomic<bool>>(false);
auto needCallBack = std::make_shared<std::atomic<int>>(consumers_.size());
auto self = get_shared_this_ptr();
consumers_.forEachValue([self, needCallBack, callback, hasMessageAvailable](ConsumerImplPtr consumer) {
consumer->hasMessageAvailableAsync(
[self, needCallBack, callback, hasMessageAvailable](Result result, bool hasMsg) {
if (result != ResultOk) {
LOG_ERROR("Filed when acknowledge list: " << result);
// set needCallBack is -1 to avoid repeated callback.
needCallBack->store(-1);
callback(result, false);
return;
}
if (hasMsg) {
hasMessageAvailable->store(hasMsg);
}
if (--(*needCallBack) == 0) {
callback(result, hasMessageAvailable->load() || self->incomingMessagesSize_ > 0);
}
});
});
}