| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "ProcessQueueImpl.h" |
| |
| #include <atomic> |
| #include <chrono> |
| #include <memory> |
| #include <system_error> |
| #include <utility> |
| |
| #include "ClientManagerImpl.h" |
| #include "MetadataConstants.h" |
| #include "Protocol.h" |
| #include "PushConsumer.h" |
| #include "Signature.h" |
| #include "rocketmq/MessageListener.h" |
| #include "rocketmq/MessageModel.h" |
| |
| using namespace std::chrono; |
| |
| ROCKETMQ_NAMESPACE_BEGIN |
| |
| ProcessQueueImpl::ProcessQueueImpl(MQMessageQueue message_queue, FilterExpression filter_expression, |
| std::weak_ptr<PushConsumer> consumer, std::shared_ptr<ClientManager> client_instance) |
| : message_queue_(std::move(message_queue)), filter_expression_(std::move(filter_expression)), |
| invisible_time_(MixAll::millisecondsOf(MixAll::DEFAULT_INVISIBLE_TIME_)), |
| simple_name_(message_queue_.simpleName()), consumer_(std::move(consumer)), |
| client_manager_(std::move(client_instance)), cached_message_quantity_(0), cached_message_memory_(0) { |
| SPDLOG_DEBUG("Created ProcessQueue={}", simpleName()); |
| } |
| |
| ProcessQueueImpl::~ProcessQueueImpl() { |
| SPDLOG_INFO("ProcessQueue={} should have been re-balanced away, thus, is destructed", simpleName()); |
| } |
| |
| void ProcessQueueImpl::callback(std::shared_ptr<ReceiveMessageCallback> callback) { |
| receive_callback_ = std::move(callback); |
| } |
| |
| bool ProcessQueueImpl::expired() const { |
| auto duration = std::chrono::steady_clock::now() - idle_since_; |
| if (duration > MixAll::PROCESS_QUEUE_EXPIRATION_THRESHOLD_) { |
| SPDLOG_WARN("ProcessQueue={} is expired. It remains idle for {}ms", simpleName(), MixAll::millisecondsOf(duration)); |
| return true; |
| } |
| return false; |
| } |
| |
| bool ProcessQueueImpl::shouldThrottle() const { |
| auto consumer = consumer_.lock(); |
| if (!consumer) { |
| return false; |
| } |
| |
| std::size_t quantity = cached_message_quantity_.load(std::memory_order_relaxed); |
| uint32_t quantity_threshold = consumer->maxCachedMessageQuantity(); |
| uint64_t memory_threshold = consumer->maxCachedMessageMemory(); |
| bool need_throttle = quantity >= quantity_threshold; |
| if (need_throttle) { |
| SPDLOG_INFO("{}: Number of locally cached messages is {}, which exceeds threshold={}", simple_name_, quantity, |
| quantity_threshold); |
| return true; |
| } |
| |
| if (memory_threshold) { |
| uint64_t bytes = cached_message_memory_.load(std::memory_order_relaxed); |
| need_throttle = bytes >= memory_threshold; |
| if (need_throttle) { |
| SPDLOG_INFO("{}: Locally cached messages take {} bytes, which exceeds threshold={}", simple_name_, bytes, |
| memory_threshold); |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| void ProcessQueueImpl::receiveMessage() { |
| auto consumer = consumer_.lock(); |
| if (!consumer) { |
| return; |
| } |
| |
| switch (consumer->messageModel()) { |
| case MessageModel::CLUSTERING: { |
| popMessage(); |
| break; |
| } |
| case MessageModel::BROADCASTING: { |
| pullMessage(); |
| break; |
| } |
| } |
| } |
| |
| void ProcessQueueImpl::popMessage() { |
| rmq::ReceiveMessageRequest request; |
| absl::flat_hash_map<std::string, std::string> metadata; |
| auto consumer_client = consumer_.lock(); |
| if (!consumer_client) { |
| return; |
| } |
| Signature::sign(consumer_client.get(), metadata); |
| wrapPopMessageRequest(metadata, request); |
| syncIdleState(); |
| SPDLOG_DEBUG("Try to pop message from {}", message_queue_.simpleName()); |
| client_manager_->receiveMessage(message_queue_.serviceAddress(), metadata, request, |
| absl::ToChronoMilliseconds(consumer_client->getLongPollingTimeout()), |
| receive_callback_); |
| } |
| |
| void ProcessQueueImpl::pullMessage() { |
| rmq::PullMessageRequest request; |
| absl::flat_hash_map<std::string, std::string> metadata; |
| auto consumer = consumer_.lock(); |
| if (!consumer) { |
| SPDLOG_INFO("Owner consumer has destructed"); |
| return; |
| } |
| |
| Signature::sign(consumer.get(), metadata); |
| wrapPullMessageRequest(request); |
| syncIdleState(); |
| SPDLOG_DEBUG("Try to pull message from {}", message_queue_.simpleName()); |
| |
| auto timeout = consumer->getLongPollingTimeout(); |
| |
| auto callback = [this](const std::error_code& ec, const ReceiveMessageResult& result) { |
| receive_callback_->onCompletion(ec, result); |
| }; |
| |
| client_manager_->pullMessage(message_queue_.serviceAddress(), metadata, request, absl::ToChronoMilliseconds(timeout), |
| callback); |
| } |
| |
| bool ProcessQueueImpl::hasPendingMessages() const { |
| absl::MutexLock lk(&messages_mtx_); |
| return !cached_messages_.empty(); |
| } |
| |
| void ProcessQueueImpl::cacheMessages(const std::vector<MQMessageExt>& messages) { |
| auto consumer = consumer_.lock(); |
| if (!consumer) { |
| return; |
| } |
| |
| { |
| absl::MutexLock messages_lock_guard(&messages_mtx_); |
| absl::MutexLock offsets_lock_guard(&offsets_mtx_); |
| for (const auto& message : messages) { |
| const std::string& msg_id = message.getMsgId(); |
| if (!filter_expression_.accept(message)) { |
| const std::string& topic = message.getTopic(); |
| auto callback = [topic, msg_id](const std::error_code& ec) { |
| if (ec) { |
| SPDLOG_WARN( |
| "Failed to ack message[Topic={}, MsgId={}] directly as it fails to pass filter expression. Cause: {}", |
| topic, msg_id, ec.message()); |
| } else { |
| SPDLOG_DEBUG("Ack message[Topic={}, MsgId={}] directly as it fails to pass filter expression", topic, |
| msg_id); |
| } |
| }; |
| consumer->ack(message, callback); |
| continue; |
| } |
| cached_messages_.emplace_back(message); |
| cached_message_quantity_.fetch_add(1, std::memory_order_relaxed); |
| cached_message_memory_.fetch_add(message.getBody().size(), std::memory_order_relaxed); |
| if (MessageModel::BROADCASTING == consumer->messageModel()) { |
| if (offsets_.size() == 1 && offsets_.begin()->released_) { |
| int64_t previously_released = offsets_.begin()->offset_; |
| offsets_.erase(OffsetRecord(previously_released)); |
| } |
| offsets_.emplace(message.getQueueOffset()); |
| } |
| } |
| } |
| } |
| |
| bool ProcessQueueImpl::take(uint32_t batch_size, std::vector<MQMessageExt>& messages) { |
| absl::MutexLock lock(&messages_mtx_); |
| if (cached_messages_.empty()) { |
| return false; |
| } |
| |
| for (auto it = cached_messages_.begin(); it != cached_messages_.end();) { |
| if (0 == batch_size--) { |
| break; |
| } |
| |
| messages.push_back(*it); |
| it = cached_messages_.erase(it); |
| } |
| return !cached_messages_.empty(); |
| } |
| |
| bool ProcessQueueImpl::committedOffset(int64_t& offset) { |
| absl::MutexLock lk(&offsets_mtx_); |
| if (offsets_.empty()) { |
| return false; |
| } |
| if (offsets_.begin()->released_) { |
| offset = offsets_.begin()->offset_ + 1; |
| } else { |
| offset = offsets_.begin()->offset_; |
| } |
| return true; |
| } |
| |
| void ProcessQueueImpl::release(uint64_t body_size, int64_t offset) { |
| auto consumer = consumer_.lock(); |
| if (!consumer) { |
| return; |
| } |
| |
| cached_message_quantity_.fetch_sub(1); |
| cached_message_memory_.fetch_sub(body_size); |
| |
| if (MessageModel::BROADCASTING == consumer->messageModel()) { |
| absl::MutexLock lk(&offsets_mtx_); |
| if (offsets_.size() > 1) { |
| offsets_.erase(OffsetRecord(offset)); |
| } else { |
| assert(offsets_.begin()->offset_ == offset); |
| offsets_.erase(OffsetRecord(offset)); |
| offsets_.emplace(OffsetRecord(offset, true)); |
| } |
| } |
| } |
| |
| void ProcessQueueImpl::wrapFilterExpression(rmq::FilterExpression* filter_expression) { |
| assert(filter_expression); |
| auto consumer = consumer_.lock(); |
| if (!consumer) { |
| return; |
| } |
| auto&& optional = consumer->getFilterExpression(message_queue_.getTopic()); |
| if (optional.has_value()) { |
| auto expression = optional.value(); |
| switch (expression.type_) { |
| case TAG: |
| filter_expression->set_type(rmq::FilterType::TAG); |
| filter_expression->set_expression(expression.content_); |
| break; |
| case SQL92: |
| filter_expression->set_type(rmq::FilterType::SQL); |
| filter_expression->set_expression(expression.content_); |
| break; |
| } |
| } else { |
| filter_expression->set_type(rmq::FilterType::TAG); |
| filter_expression->set_expression("*"); |
| } |
| } |
| |
| void ProcessQueueImpl::wrapPopMessageRequest(absl::flat_hash_map<std::string, std::string>& metadata, |
| rmq::ReceiveMessageRequest& request) { |
| std::shared_ptr<PushConsumer> consumer = consumer_.lock(); |
| assert(consumer); |
| request.set_client_id(consumer->clientId()); |
| request.mutable_group()->set_name(consumer->getGroupName()); |
| request.mutable_group()->set_resource_namespace(consumer->resourceNamespace()); |
| request.mutable_partition()->set_id(message_queue_.getQueueId()); |
| request.mutable_partition()->mutable_broker()->set_name(message_queue_.getBrokerName()); |
| request.mutable_partition()->mutable_topic()->set_name(message_queue_.getTopic()); |
| request.mutable_partition()->mutable_topic()->set_resource_namespace(consumer->resourceNamespace()); |
| |
| wrapFilterExpression(request.mutable_filter_expression()); |
| |
| switch (consumer->getConsumeMessageService()->messageListenerType()) { |
| case MessageListenerType::STANDARD: { |
| request.set_fifo_flag(false); |
| break; |
| } |
| case MessageListenerType::FIFO: { |
| request.set_fifo_flag(true); |
| break; |
| } |
| } |
| |
| // Batch size |
| request.set_batch_size(consumer->receiveBatchSize()); |
| |
| // Consume policy |
| request.set_consume_policy(rmq::ConsumePolicy::RESUME); |
| |
| // Set invisible time |
| request.mutable_invisible_duration()->set_seconds( |
| std::chrono::duration_cast<std::chrono::seconds>(invisible_time_).count()); |
| auto fraction = invisible_time_ - std::chrono::duration_cast<std::chrono::seconds>(invisible_time_); |
| int32_t nano_seconds = static_cast<int32_t>(std::chrono::duration_cast<std::chrono::nanoseconds>(fraction).count()); |
| request.mutable_invisible_duration()->set_nanos(nano_seconds); |
| } |
| |
| void ProcessQueueImpl::wrapPullMessageRequest(rmq::PullMessageRequest& request) { |
| std::shared_ptr<PushConsumer> consumer = consumer_.lock(); |
| assert(consumer); |
| request.set_client_id(consumer->clientId()); |
| request.mutable_group()->set_name(consumer->getGroupName()); |
| request.mutable_group()->set_resource_namespace(consumer->resourceNamespace()); |
| request.mutable_partition()->set_id(message_queue_.getQueueId()); |
| request.mutable_partition()->mutable_broker()->set_name(message_queue_.getBrokerName()); |
| request.mutable_partition()->mutable_topic()->set_name(message_queue_.getTopic()); |
| request.mutable_partition()->mutable_topic()->set_resource_namespace(consumer->resourceNamespace()); |
| request.set_offset(next_offset_); |
| request.set_batch_size(consumer->receiveBatchSize()); |
| |
| wrapFilterExpression(request.mutable_filter_expression()); |
| } |
| |
| std::weak_ptr<PushConsumer> ProcessQueueImpl::getConsumer() { |
| return consumer_; |
| } |
| |
| std::shared_ptr<ClientManager> ProcessQueueImpl::getClientManager() { |
| return client_manager_; |
| } |
| |
| MQMessageQueue ProcessQueueImpl::getMQMessageQueue() { |
| return message_queue_; |
| } |
| |
| const FilterExpression& ProcessQueueImpl::getFilterExpression() const { |
| return filter_expression_; |
| } |
| |
| ROCKETMQ_NAMESPACE_END |