blob: 591bd568c35763a998990492b4e658f6b1256b2c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ProcessQueueImpl.h"
#include <atomic>
#include <chrono>
#include <memory>
#include <system_error>
#include <utility>
#include "ClientManagerImpl.h"
#include "MetadataConstants.h"
#include "Protocol.h"
#include "PushConsumer.h"
#include "Signature.h"
#include "rocketmq/MessageListener.h"
#include "rocketmq/MessageModel.h"
using namespace std::chrono;
ROCKETMQ_NAMESPACE_BEGIN
ProcessQueueImpl::ProcessQueueImpl(MQMessageQueue message_queue, FilterExpression filter_expression,
std::weak_ptr<PushConsumer> consumer, std::shared_ptr<ClientManager> client_instance)
: message_queue_(std::move(message_queue)), filter_expression_(std::move(filter_expression)),
invisible_time_(MixAll::millisecondsOf(MixAll::DEFAULT_INVISIBLE_TIME_)),
simple_name_(message_queue_.simpleName()), consumer_(std::move(consumer)),
client_manager_(std::move(client_instance)), cached_message_quantity_(0), cached_message_memory_(0) {
SPDLOG_DEBUG("Created ProcessQueue={}", simpleName());
}
ProcessQueueImpl::~ProcessQueueImpl() {
SPDLOG_INFO("ProcessQueue={} should have been re-balanced away, thus, is destructed", simpleName());
}
void ProcessQueueImpl::callback(std::shared_ptr<ReceiveMessageCallback> callback) {
receive_callback_ = std::move(callback);
}
bool ProcessQueueImpl::expired() const {
auto duration = std::chrono::steady_clock::now() - idle_since_;
if (duration > MixAll::PROCESS_QUEUE_EXPIRATION_THRESHOLD_) {
SPDLOG_WARN("ProcessQueue={} is expired. It remains idle for {}ms", simpleName(), MixAll::millisecondsOf(duration));
return true;
}
return false;
}
bool ProcessQueueImpl::shouldThrottle() const {
auto consumer = consumer_.lock();
if (!consumer) {
return false;
}
std::size_t quantity = cached_message_quantity_.load(std::memory_order_relaxed);
uint32_t quantity_threshold = consumer->maxCachedMessageQuantity();
uint64_t memory_threshold = consumer->maxCachedMessageMemory();
bool need_throttle = quantity >= quantity_threshold;
if (need_throttle) {
SPDLOG_INFO("{}: Number of locally cached messages is {}, which exceeds threshold={}", simple_name_, quantity,
quantity_threshold);
return true;
}
if (memory_threshold) {
uint64_t bytes = cached_message_memory_.load(std::memory_order_relaxed);
need_throttle = bytes >= memory_threshold;
if (need_throttle) {
SPDLOG_INFO("{}: Locally cached messages take {} bytes, which exceeds threshold={}", simple_name_, bytes,
memory_threshold);
return true;
}
}
return false;
}
void ProcessQueueImpl::receiveMessage() {
auto consumer = consumer_.lock();
if (!consumer) {
return;
}
switch (consumer->messageModel()) {
case MessageModel::CLUSTERING: {
popMessage();
break;
}
case MessageModel::BROADCASTING: {
pullMessage();
break;
}
}
}
void ProcessQueueImpl::popMessage() {
rmq::ReceiveMessageRequest request;
absl::flat_hash_map<std::string, std::string> metadata;
auto consumer_client = consumer_.lock();
if (!consumer_client) {
return;
}
Signature::sign(consumer_client.get(), metadata);
wrapPopMessageRequest(metadata, request);
syncIdleState();
SPDLOG_DEBUG("Try to pop message from {}", message_queue_.simpleName());
client_manager_->receiveMessage(message_queue_.serviceAddress(), metadata, request,
absl::ToChronoMilliseconds(consumer_client->getLongPollingTimeout()),
receive_callback_);
}
void ProcessQueueImpl::pullMessage() {
rmq::PullMessageRequest request;
absl::flat_hash_map<std::string, std::string> metadata;
auto consumer = consumer_.lock();
if (!consumer) {
SPDLOG_INFO("Owner consumer has destructed");
return;
}
Signature::sign(consumer.get(), metadata);
wrapPullMessageRequest(request);
syncIdleState();
SPDLOG_DEBUG("Try to pull message from {}", message_queue_.simpleName());
auto timeout = consumer->getLongPollingTimeout();
auto callback = [this](const std::error_code& ec, const ReceiveMessageResult& result) {
receive_callback_->onCompletion(ec, result);
};
client_manager_->pullMessage(message_queue_.serviceAddress(), metadata, request, absl::ToChronoMilliseconds(timeout),
callback);
}
bool ProcessQueueImpl::hasPendingMessages() const {
absl::MutexLock lk(&messages_mtx_);
return !cached_messages_.empty();
}
void ProcessQueueImpl::cacheMessages(const std::vector<MQMessageExt>& messages) {
auto consumer = consumer_.lock();
if (!consumer) {
return;
}
{
absl::MutexLock messages_lock_guard(&messages_mtx_);
absl::MutexLock offsets_lock_guard(&offsets_mtx_);
for (const auto& message : messages) {
const std::string& msg_id = message.getMsgId();
if (!filter_expression_.accept(message)) {
const std::string& topic = message.getTopic();
auto callback = [topic, msg_id](const std::error_code& ec) {
if (ec) {
SPDLOG_WARN(
"Failed to ack message[Topic={}, MsgId={}] directly as it fails to pass filter expression. Cause: {}",
topic, msg_id, ec.message());
} else {
SPDLOG_DEBUG("Ack message[Topic={}, MsgId={}] directly as it fails to pass filter expression", topic,
msg_id);
}
};
consumer->ack(message, callback);
continue;
}
cached_messages_.emplace_back(message);
cached_message_quantity_.fetch_add(1, std::memory_order_relaxed);
cached_message_memory_.fetch_add(message.getBody().size(), std::memory_order_relaxed);
if (MessageModel::BROADCASTING == consumer->messageModel()) {
if (offsets_.size() == 1 && offsets_.begin()->released_) {
int64_t previously_released = offsets_.begin()->offset_;
offsets_.erase(OffsetRecord(previously_released));
}
offsets_.emplace(message.getQueueOffset());
}
}
}
}
bool ProcessQueueImpl::take(uint32_t batch_size, std::vector<MQMessageExt>& messages) {
absl::MutexLock lock(&messages_mtx_);
if (cached_messages_.empty()) {
return false;
}
for (auto it = cached_messages_.begin(); it != cached_messages_.end();) {
if (0 == batch_size--) {
break;
}
messages.push_back(*it);
it = cached_messages_.erase(it);
}
return !cached_messages_.empty();
}
bool ProcessQueueImpl::committedOffset(int64_t& offset) {
absl::MutexLock lk(&offsets_mtx_);
if (offsets_.empty()) {
return false;
}
if (offsets_.begin()->released_) {
offset = offsets_.begin()->offset_ + 1;
} else {
offset = offsets_.begin()->offset_;
}
return true;
}
void ProcessQueueImpl::release(uint64_t body_size, int64_t offset) {
auto consumer = consumer_.lock();
if (!consumer) {
return;
}
cached_message_quantity_.fetch_sub(1);
cached_message_memory_.fetch_sub(body_size);
if (MessageModel::BROADCASTING == consumer->messageModel()) {
absl::MutexLock lk(&offsets_mtx_);
if (offsets_.size() > 1) {
offsets_.erase(OffsetRecord(offset));
} else {
assert(offsets_.begin()->offset_ == offset);
offsets_.erase(OffsetRecord(offset));
offsets_.emplace(OffsetRecord(offset, true));
}
}
}
void ProcessQueueImpl::wrapFilterExpression(rmq::FilterExpression* filter_expression) {
assert(filter_expression);
auto consumer = consumer_.lock();
if (!consumer) {
return;
}
auto&& optional = consumer->getFilterExpression(message_queue_.getTopic());
if (optional.has_value()) {
auto expression = optional.value();
switch (expression.type_) {
case TAG:
filter_expression->set_type(rmq::FilterType::TAG);
filter_expression->set_expression(expression.content_);
break;
case SQL92:
filter_expression->set_type(rmq::FilterType::SQL);
filter_expression->set_expression(expression.content_);
break;
}
} else {
filter_expression->set_type(rmq::FilterType::TAG);
filter_expression->set_expression("*");
}
}
void ProcessQueueImpl::wrapPopMessageRequest(absl::flat_hash_map<std::string, std::string>& metadata,
rmq::ReceiveMessageRequest& request) {
std::shared_ptr<PushConsumer> consumer = consumer_.lock();
assert(consumer);
request.set_client_id(consumer->clientId());
request.mutable_group()->set_name(consumer->getGroupName());
request.mutable_group()->set_resource_namespace(consumer->resourceNamespace());
request.mutable_partition()->set_id(message_queue_.getQueueId());
request.mutable_partition()->mutable_broker()->set_name(message_queue_.getBrokerName());
request.mutable_partition()->mutable_topic()->set_name(message_queue_.getTopic());
request.mutable_partition()->mutable_topic()->set_resource_namespace(consumer->resourceNamespace());
wrapFilterExpression(request.mutable_filter_expression());
switch (consumer->getConsumeMessageService()->messageListenerType()) {
case MessageListenerType::STANDARD: {
request.set_fifo_flag(false);
break;
}
case MessageListenerType::FIFO: {
request.set_fifo_flag(true);
break;
}
}
// Batch size
request.set_batch_size(consumer->receiveBatchSize());
// Consume policy
request.set_consume_policy(rmq::ConsumePolicy::RESUME);
// Set invisible time
request.mutable_invisible_duration()->set_seconds(
std::chrono::duration_cast<std::chrono::seconds>(invisible_time_).count());
auto fraction = invisible_time_ - std::chrono::duration_cast<std::chrono::seconds>(invisible_time_);
int32_t nano_seconds = static_cast<int32_t>(std::chrono::duration_cast<std::chrono::nanoseconds>(fraction).count());
request.mutable_invisible_duration()->set_nanos(nano_seconds);
}
void ProcessQueueImpl::wrapPullMessageRequest(rmq::PullMessageRequest& request) {
std::shared_ptr<PushConsumer> consumer = consumer_.lock();
assert(consumer);
request.set_client_id(consumer->clientId());
request.mutable_group()->set_name(consumer->getGroupName());
request.mutable_group()->set_resource_namespace(consumer->resourceNamespace());
request.mutable_partition()->set_id(message_queue_.getQueueId());
request.mutable_partition()->mutable_broker()->set_name(message_queue_.getBrokerName());
request.mutable_partition()->mutable_topic()->set_name(message_queue_.getTopic());
request.mutable_partition()->mutable_topic()->set_resource_namespace(consumer->resourceNamespace());
request.set_offset(next_offset_);
request.set_batch_size(consumer->receiveBatchSize());
wrapFilterExpression(request.mutable_filter_expression());
}
std::weak_ptr<PushConsumer> ProcessQueueImpl::getConsumer() {
return consumer_;
}
std::shared_ptr<ClientManager> ProcessQueueImpl::getClientManager() {
return client_manager_;
}
MQMessageQueue ProcessQueueImpl::getMQMessageQueue() {
return message_queue_;
}
const FilterExpression& ProcessQueueImpl::getFilterExpression() const {
return filter_expression_;
}
ROCKETMQ_NAMESPACE_END