| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "PushConsumerImpl.h" |
| |
| #include <atomic> |
| #include <cassert> |
| #include <chrono> |
| #include <cstdlib> |
| #include <system_error> |
| |
| #include "AsyncReceiveMessageCallback.h" |
| #include "ConsumeMessageServiceImpl.h" |
| #include "MixAll.h" |
| #include "ProcessQueueImpl.h" |
| #include "Protocol.h" |
| #include "RpcClient.h" |
| #include "Signature.h" |
| #include "google/protobuf/util/time_util.h" |
| #include "rocketmq/MQClientException.h" |
| #include "rocketmq/MessageListener.h" |
| |
| ROCKETMQ_NAMESPACE_BEGIN |
| |
| PushConsumerImpl::PushConsumerImpl(absl::string_view group_name) : ClientImpl(group_name) { |
| } |
| |
| PushConsumerImpl::~PushConsumerImpl() { |
| SPDLOG_DEBUG("DefaultMQPushConsumerImpl is destructed"); |
| shutdown(); |
| } |
| |
| void PushConsumerImpl::topicsOfInterest(std::vector<std::string> topics) { |
| absl::MutexLock lk(&topic_filter_expression_table_mtx_); |
| for (const auto& entry : topic_filter_expression_table_) { |
| topics.push_back(entry.first); |
| } |
| } |
| |
| void PushConsumerImpl::start() { |
| ClientImpl::start(); |
| |
| State expecting = State::STARTING; |
| if (!state_.compare_exchange_strong(expecting, State::STARTED)) { |
| SPDLOG_ERROR("Unexpected consumer state. Expecting: {}, Actual: {}", State::STARTING, |
| state_.load(std::memory_order_relaxed)); |
| return; |
| } |
| |
| if (!message_listener_) { |
| SPDLOG_ERROR("Required message listener is missing"); |
| abort(); |
| return; |
| } |
| |
| client_manager_->addClientObserver(shared_from_this()); |
| |
| fetchRoutes(); |
| |
| SPDLOG_INFO("start concurrently consume service: {}", client_config_.subscriber.group.name()); |
| consume_message_service_ = |
| std::make_shared<ConsumeMessageServiceImpl>(shared_from_this(), consume_thread_pool_size_, message_listener_); |
| consume_message_service_->start(); |
| |
| // Heartbeat depends on initialization of consume-message-service |
| heartbeat(); |
| |
| std::weak_ptr<PushConsumerImpl> consumer_weak_ptr(shared_from_this()); |
| auto scan_assignment_functor = [consumer_weak_ptr]() { |
| std::shared_ptr<PushConsumerImpl> consumer = consumer_weak_ptr.lock(); |
| if (consumer) { |
| consumer->scanAssignments(); |
| } |
| }; |
| |
| scan_assignment_handle_ = client_manager_->getScheduler()->schedule( |
| scan_assignment_functor, SCAN_ASSIGNMENT_TASK_NAME, std::chrono::milliseconds(100), std::chrono::seconds(5)); |
| SPDLOG_INFO("PushConsumer started, groupName={}", client_config_.subscriber.group.name()); |
| } |
| |
| const char* PushConsumerImpl::SCAN_ASSIGNMENT_TASK_NAME = "scan-assignment-task"; |
| |
| void PushConsumerImpl::shutdown() { |
| State expecting = State::STARTED; |
| if (state_.compare_exchange_strong(expecting, State::STOPPING)) { |
| if (scan_assignment_handle_) { |
| client_manager_->getScheduler()->cancel(scan_assignment_handle_); |
| SPDLOG_DEBUG("Scan assignment periodic task cancelled"); |
| } |
| |
| { |
| absl::MutexLock lock(&process_queue_table_mtx_); |
| process_queue_table_.clear(); |
| } |
| |
| if (consume_message_service_) { |
| consume_message_service_->shutdown(); |
| } |
| |
| // Shutdown services started by parent |
| ClientImpl::shutdown(); |
| |
| SPDLOG_INFO("PushConsumerImpl stopped"); |
| } else { |
| SPDLOG_ERROR("Shutdown with unexpected state. Expecting: {}, Actual: {}", State::STARTED, |
| state_.load(std::memory_order_relaxed)); |
| } |
| } |
| |
| void PushConsumerImpl::subscribe(const std::string& topic, const std::string& expression, |
| ExpressionType expression_type) { |
| absl::MutexLock lock(&topic_filter_expression_table_mtx_); |
| FilterExpression filter_expression{expression, expression_type}; |
| topic_filter_expression_table_.emplace(topic, filter_expression); |
| } |
| |
| void PushConsumerImpl::unsubscribe(const std::string& topic) { |
| absl::MutexLock lock(&topic_filter_expression_table_mtx_); |
| topic_filter_expression_table_.erase(topic); |
| } |
| |
| absl::optional<FilterExpression> PushConsumerImpl::getFilterExpression(const std::string& topic) const { |
| { |
| absl::MutexLock lock(&topic_filter_expression_table_mtx_); |
| if (topic_filter_expression_table_.contains(topic)) { |
| return absl::make_optional(topic_filter_expression_table_.at(topic)); |
| } else { |
| return {}; |
| } |
| } |
| } |
| |
| void PushConsumerImpl::scanAssignments() { |
| SPDLOG_DEBUG("Start of assignment scanning"); |
| if (!active()) { |
| SPDLOG_INFO("Client has stopped. Abort scanning immediately."); |
| return; |
| } |
| |
| { |
| absl::MutexLock lk(&topic_filter_expression_table_mtx_); |
| for (auto& entry : topic_filter_expression_table_) { |
| std::string topic = entry.first; |
| const auto& filter_expression = entry.second; |
| SPDLOG_DEBUG("Scan assignments for {}", topic); |
| auto callback = [this, topic, filter_expression](const std::error_code& ec, |
| const TopicAssignmentPtr& assignments) { |
| if (ec) { |
| SPDLOG_WARN("Failed to acquire assignments for topic={} from load balancer. Cause: {}", topic, ec.message()); |
| } else if (assignments && !assignments->assignmentList().empty()) { |
| syncProcessQueue(topic, assignments, filter_expression); |
| } |
| }; |
| queryAssignment(topic, callback); |
| } // end of for-loop |
| } |
| SPDLOG_DEBUG("End of assignment scanning."); |
| } |
| |
| bool PushConsumerImpl::selectBroker(const TopicRouteDataPtr& topic_route_data, std::string& broker_host) { |
| if (topic_route_data && !topic_route_data->messageQueues().empty()) { |
| uint32_t index = TopicAssignment::getAndIncreaseQueryWhichBroker(); |
| for (uint32_t i = index; i < index + topic_route_data->messageQueues().size(); i++) { |
| auto message_queue = topic_route_data->messageQueues().at(i % topic_route_data->messageQueues().size()); |
| if (MixAll::MASTER_BROKER_ID != message_queue.broker().id() || !readable(message_queue.permission())) { |
| continue; |
| } |
| broker_host = urlOf(message_queue); |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| void PushConsumerImpl::wrapQueryAssignmentRequest(const std::string& topic, |
| const std::string& consumer_group, |
| const std::string& strategy_name, |
| QueryAssignmentRequest& request) { |
| request.mutable_endpoints()->CopyFrom(accessPoint()); |
| request.mutable_topic()->set_name(topic); |
| request.mutable_topic()->set_resource_namespace(resourceNamespace()); |
| request.mutable_group()->set_name(consumer_group); |
| request.mutable_group()->set_resource_namespace(resourceNamespace()); |
| } |
| |
| void PushConsumerImpl::queryAssignment( |
| const std::string& topic, const std::function<void(const std::error_code&, const TopicAssignmentPtr&)>& cb) { |
| auto callback = [this, topic, cb](const std::error_code& ec, const TopicRouteDataPtr& topic_route) { |
| TopicAssignmentPtr topic_assignment; |
| std::string broker_host; |
| if (!selectBroker(topic_route, broker_host)) { |
| SPDLOG_WARN("Failed to select a broker to query assignment for group={}, topic={}", |
| client_config_.subscriber.group.name(), topic); |
| } |
| |
| QueryAssignmentRequest request; |
| wrapQueryAssignmentRequest(topic, groupName(), MixAll::DEFAULT_LOAD_BALANCER_STRATEGY_NAME_, request); |
| SPDLOG_DEBUG("QueryAssignmentRequest: {}", request.DebugString()); |
| |
| absl::flat_hash_map<std::string, std::string> metadata; |
| Signature::sign(client_config_, metadata); |
| auto assignment_callback = [this, cb, topic, broker_host](const std::error_code& ec, |
| const QueryAssignmentResponse& response) { |
| if (ec) { |
| SPDLOG_WARN("Failed to acquire queue assignment of topic={} from brokerAddress={}", topic, broker_host); |
| cb(ec, nullptr); |
| } else { |
| SPDLOG_DEBUG("Query topic assignment OK. Topic={}, group={}, assignment-size={}", topic, groupName(), |
| response.assignments().size()); |
| SPDLOG_TRACE("Query assignment response for {} is: {}", topic, response.DebugString()); |
| cb(ec, std::make_shared<TopicAssignment>(response)); |
| } |
| }; |
| |
| client_manager_->queryAssignment(broker_host, metadata, request, |
| absl::ToChronoMilliseconds(client_config_.request_timeout), assignment_callback); |
| }; |
| getRouteFor(topic, callback); |
| } |
| |
| /** |
| * |
| * @param topic Topic to process |
| * @param assignment Latest assignment from load balancer |
| * @param filter_expression Filter expression |
| */ |
| void PushConsumerImpl::syncProcessQueue(const std::string& topic, |
| const std::shared_ptr<TopicAssignment>& topic_assignment, |
| const FilterExpression& filter_expression) { |
| const std::vector<rmq::Assignment>& assignment_list = topic_assignment->assignmentList(); |
| std::vector<rmq::MessageQueue> message_queue_list; |
| message_queue_list.reserve(assignment_list.size()); |
| for (const auto& assignment : assignment_list) { |
| message_queue_list.push_back(assignment.message_queue()); |
| } |
| |
| std::vector<rmq::MessageQueue> current; |
| { |
| absl::MutexLock lock(&process_queue_table_mtx_); |
| for (auto it = process_queue_table_.begin(); it != process_queue_table_.end();) { |
| if (topic != it->second->messageQueue().topic().name()) { |
| it++; |
| continue; |
| } |
| |
| if (std::none_of( |
| message_queue_list.cbegin(), message_queue_list.cend(), |
| [&](const rmq::MessageQueue& message_queue) { return it->second->messageQueue() == message_queue; })) { |
| SPDLOG_INFO("Stop receiving messages from {} as it is not assigned to current client according to latest " |
| "assignment result from load balancer", |
| simpleNameOf(it->second->messageQueue())); |
| process_queue_table_.erase(it++); |
| } else { |
| if (!it->second || it->second->expired()) { |
| SPDLOG_WARN("ProcessQueue={} is expired. Remove it for now.", it->first); |
| process_queue_table_.erase(it++); |
| continue; |
| } |
| current.push_back(it->second->messageQueue()); |
| it++; |
| } |
| } |
| } |
| |
| for (const auto& message_queue : message_queue_list) { |
| if (std::none_of(current.cbegin(), current.cend(), |
| [&](const rmq::MessageQueue& item) { return item == message_queue; })) { |
| SPDLOG_INFO("Start to receive message from {} according to latest assignment info from load balancer", |
| simpleNameOf(message_queue)); |
| if (!receiveMessage(message_queue, filter_expression)) { |
| if (!active()) { |
| SPDLOG_WARN("Failed to initiate receive message request-response-cycle for {}", simpleNameOf(message_queue)); |
| // TODO: remove it from current assignment such that a second attempt will be made again in the next round. |
| } |
| } |
| } |
| } |
| } |
| |
| std::shared_ptr<ProcessQueue> PushConsumerImpl::getOrCreateProcessQueue(const rmq::MessageQueue& message_queue, |
| const FilterExpression& filter_expression) { |
| std::shared_ptr<ProcessQueue> process_queue; |
| { |
| absl::MutexLock lock(&process_queue_table_mtx_); |
| if (!active()) { |
| SPDLOG_INFO("PushConsumer has stopped. Drop creation of ProcessQueue"); |
| return process_queue; |
| } |
| |
| if (process_queue_table_.contains(simpleNameOf(message_queue))) { |
| process_queue = process_queue_table_.at(simpleNameOf(message_queue)); |
| } else { |
| SPDLOG_INFO("Create ProcessQueue for message queue[{}]", simpleNameOf(message_queue)); |
| // create ProcessQueue |
| process_queue = |
| std::make_shared<ProcessQueueImpl>(message_queue, filter_expression, shared_from_this(), client_manager_); |
| std::shared_ptr<AsyncReceiveMessageCallback> receive_callback = |
| std::make_shared<AsyncReceiveMessageCallback>(process_queue); |
| process_queue->callback(receive_callback); |
| process_queue_table_.emplace(std::make_pair(simpleNameOf(message_queue), process_queue)); |
| } |
| } |
| return process_queue; |
| } |
| |
| bool PushConsumerImpl::receiveMessage(const rmq::MessageQueue& message_queue, |
| const FilterExpression& filter_expression) { |
| if (!active()) { |
| SPDLOG_INFO("PushConsumer has stopped. Drop further receive message request"); |
| return false; |
| } |
| |
| auto process_queue_ptr = getOrCreateProcessQueue(message_queue, filter_expression); |
| if (!process_queue_ptr) { |
| SPDLOG_INFO("Consumer has stopped. Stop creating processQueue"); |
| return false; |
| } |
| const std::string& broker_host = urlOf(message_queue); |
| if (broker_host.empty()) { |
| SPDLOG_ERROR("Failed to resolve address for brokerName={}", message_queue.broker().name()); |
| return false; |
| } |
| process_queue_ptr->receiveMessage(); |
| return true; |
| } |
| |
| std::shared_ptr<ConsumeMessageService> PushConsumerImpl::getConsumeMessageService() { |
| return consume_message_service_; |
| } |
| |
| void PushConsumerImpl::ack(const Message& msg, const std::function<void(const std::error_code&)>& callback) { |
| const std::string& target_host = msg.extension().target_endpoint; |
| assert(!target_host.empty()); |
| SPDLOG_DEBUG("Prepare to send ack to broker. BrokerAddress={}, topic={}, queueId={}, msgId={}", target_host, |
| msg.topic(), msg.extension().queue_id, msg.id()); |
| AckMessageRequest request; |
| wrapAckMessageRequest(msg, request); |
| absl::flat_hash_map<std::string, std::string> metadata; |
| Signature::sign(client_config_, metadata); |
| client_manager_->ack(target_host, metadata, request, absl::ToChronoMilliseconds(client_config_.request_timeout), |
| callback); |
| } |
| |
| std::chrono::milliseconds PushConsumerImpl::invisibleDuration(std::size_t attempt) { |
| return std::chrono::milliseconds(client_config_.backoff_policy.backoff(attempt)); |
| } |
| |
| void PushConsumerImpl::nack(const Message& message, const std::function<void(const std::error_code&)>& callback) { |
| const auto& target_host = message.extension().target_endpoint; |
| SPDLOG_DEBUG("Prepare to nack message[topic={}, message-id={}]", message.topic(), message.id()); |
| auto duration = invisibleDuration(message.extension().delivery_attempt + 1); |
| Metadata metadata; |
| Signature::sign(client_config_, metadata); |
| rmq::ChangeInvisibleDurationRequest request; |
| request.mutable_group()->CopyFrom(client_config_.subscriber.group); |
| request.mutable_topic()->set_resource_namespace(resourceNamespace()); |
| request.mutable_topic()->set_name(message.topic()); |
| request.set_receipt_handle(message.extension().receipt_handle); |
| request.set_message_id(message.id()); |
| request.mutable_invisible_duration()->CopyFrom( |
| google::protobuf::util::TimeUtil::MillisecondsToDuration(duration.count())); |
| client_manager_->changeInvisibleDuration(target_host, metadata, request, |
| absl::ToChronoMilliseconds(client_config_.request_timeout), callback); |
| } |
| |
| void PushConsumerImpl::forwardToDeadLetterQueue(const Message& message, |
| const std::function<void(const std::error_code&)>& cb) { |
| std::string target_host = message.extension().target_endpoint; |
| |
| absl::flat_hash_map<std::string, std::string> metadata; |
| Signature::sign(client_config_, metadata); |
| |
| ForwardMessageToDeadLetterQueueRequest request; |
| request.mutable_group()->set_resource_namespace(resourceNamespace()); |
| request.mutable_group()->set_name(groupName()); |
| |
| request.mutable_topic()->set_resource_namespace(resourceNamespace()); |
| request.mutable_topic()->set_name(message.topic()); |
| |
| request.set_message_id(message.id()); |
| |
| request.set_delivery_attempt(message.extension().delivery_attempt); |
| request.set_max_delivery_attempts(max_delivery_attempts_); |
| |
| client_manager_->forwardMessageToDeadLetterQueue(target_host, metadata, request, |
| absl::ToChronoMilliseconds(client_config_.request_timeout), cb); |
| } |
| |
| void PushConsumerImpl::wrapAckMessageRequest(const Message& msg, AckMessageRequest& request) { |
| request.mutable_group()->set_resource_namespace(resourceNamespace()); |
| request.mutable_group()->set_name(groupName()); |
| request.mutable_topic()->set_resource_namespace(resourceNamespace()); |
| request.mutable_topic()->set_name(msg.topic()); |
| auto entry = new rmq::AckMessageEntry(); |
| entry->set_message_id(msg.id()); |
| entry->set_receipt_handle(msg.extension().receipt_handle); |
| request.mutable_entries()->AddAllocated(entry); |
| } |
| |
| uint32_t PushConsumerImpl::consumeThreadPoolSize() const { |
| return consume_thread_pool_size_; |
| } |
| |
| void PushConsumerImpl::consumeThreadPoolSize(int thread_pool_size) { |
| if (thread_pool_size >= 1) { |
| consume_thread_pool_size_ = thread_pool_size; |
| } |
| } |
| |
| void PushConsumerImpl::registerMessageListener(MessageListener message_listener) { |
| message_listener_ = message_listener; |
| } |
| |
| std::size_t PushConsumerImpl::getProcessQueueTableSize() { |
| absl::MutexLock lock(&process_queue_table_mtx_); |
| return process_queue_table_.size(); |
| } |
| |
| void PushConsumerImpl::setThrottle(const std::string& topic, uint32_t threshold) { |
| absl::MutexLock lock(&throttle_table_mtx_); |
| throttle_table_.emplace(topic, threshold); |
| } |
| |
| void PushConsumerImpl::fetchRoutes() { |
| std::vector<std::string> topics; |
| { |
| absl::MutexLock lk(&topic_filter_expression_table_mtx_); |
| for (const auto& item : topic_filter_expression_table_) { |
| topics.emplace_back(item.first); |
| } |
| } |
| |
| if (topics.empty()) { |
| return; |
| } |
| |
| std::vector<std::string>::size_type countdown = topics.size(); |
| auto mtx = std::make_shared<absl::Mutex>(); |
| auto cv = std::make_shared<absl::CondVar>(); |
| int acquired = 0; |
| auto callback = [&, mtx, cv](const std::error_code& ec, const TopicRouteDataPtr& route) { |
| { |
| absl::MutexLock lk(mtx.get()); |
| countdown--; |
| if (!ec) { |
| acquired++; |
| } |
| } |
| |
| if (countdown <= 0) { |
| cv->SignalAll(); |
| } |
| }; |
| |
| for (const auto& topic : topics) { |
| getRouteFor(topic, callback); |
| } |
| |
| while (countdown) { |
| absl::MutexLock lk(mtx.get()); |
| cv->Wait(mtx.get()); |
| } |
| SPDLOG_INFO("Fetched route for {} out of {} topics", acquired, topics.size()); |
| } |
| |
| void PushConsumerImpl::buildClientSettings(rmq::Settings& settings) { |
| settings.set_client_type(rmq::ClientType::PUSH_CONSUMER); |
| auto subscription = settings.mutable_subscription(); |
| subscription->mutable_group()->CopyFrom(client_config_.subscriber.group); |
| auto polling_timeout = google::protobuf::util::TimeUtil::MillisecondsToDuration( |
| absl::ToInt64Milliseconds(client_config_.subscriber.polling_timeout)); |
| subscription->mutable_long_polling_timeout()->set_seconds(polling_timeout.seconds()); |
| subscription->mutable_long_polling_timeout()->set_nanos(polling_timeout.nanos()); |
| subscription->set_receive_batch_size(client_config_.subscriber.receive_batch_size); |
| |
| { |
| absl::MutexLock lk(&topic_filter_expression_table_mtx_); |
| for (const auto& entry : topic_filter_expression_table_) { |
| auto subscription_entry = new rmq::SubscriptionEntry; |
| subscription_entry->mutable_topic()->set_resource_namespace(resourceNamespace()); |
| subscription_entry->mutable_topic()->set_name(entry.first); |
| |
| subscription_entry->mutable_expression()->set_expression(entry.second.content_); |
| switch (entry.second.type_) { |
| case ExpressionType::TAG: { |
| subscription_entry->mutable_expression()->set_type(rmq::FilterType::TAG); |
| break; |
| } |
| case ExpressionType::SQL92: { |
| subscription_entry->mutable_expression()->set_type(rmq::FilterType::SQL); |
| break; |
| } |
| } |
| subscription->mutable_subscriptions()->AddAllocated(subscription_entry); |
| } |
| } |
| } |
| |
| void PushConsumerImpl::prepareHeartbeatData(HeartbeatRequest& request) { |
| request.set_client_type(rmq::ClientType::PUSH_CONSUMER); |
| request.mutable_group()->set_resource_namespace(resourceNamespace()); |
| request.mutable_group()->set_name(groupName()); |
| } |
| |
| void PushConsumerImpl::notifyClientTermination() { |
| NotifyClientTerminationRequest request; |
| request.mutable_group()->set_resource_namespace(resourceNamespace()); |
| request.mutable_group()->set_name(groupName()); |
| ClientImpl::notifyClientTermination(request); |
| } |
| |
| void PushConsumerImpl::onVerifyMessage(MessageConstSharedPtr message, std::function<void(TelemetryCommand)> cb) { |
| auto listener = messageListener(); |
| TelemetryCommand cmd; |
| auto verify_result = cmd.mutable_verify_message_result(); |
| verify_result->set_nonce(message->extension().nonce); |
| if (message) { |
| if (!client_config_.subscriber.fifo) { |
| try { |
| auto result = listener(*message); |
| switch (result) { |
| case ConsumeResult::SUCCESS: { |
| cmd.mutable_status()->set_code(rmq::Code::OK); |
| cmd.mutable_status()->set_message("OK"); |
| } |
| case ConsumeResult::FAILURE: { |
| cmd.mutable_status()->set_code(rmq::Code::FAILED_TO_CONSUME_MESSAGE); |
| cmd.mutable_status()->set_message("Consume message failed"); |
| } |
| } |
| } catch (const std::exception& e) { |
| cmd.mutable_status()->set_code(rmq::Code::FAILED_TO_CONSUME_MESSAGE); |
| cmd.mutable_status()->set_message(e.what()); |
| } catch (...) { |
| cmd.mutable_status()->set_code(rmq::Code::FAILED_TO_CONSUME_MESSAGE); |
| cmd.mutable_status()->set_message("Unexpected exception raised"); |
| } |
| } else { |
| cmd.mutable_status()->set_code(rmq::Code::VERIFY_FIFO_MESSAGE_UNSUPPORTED); |
| cmd.mutable_status()->set_message("Unsupported Operation For FIFO Message"); |
| } |
| } else { |
| cmd.mutable_status()->set_code(rmq::Code::MESSAGE_CORRUPTED); |
| cmd.mutable_status()->set_message("Checksum Mismatch"); |
| } |
| } |
| |
| ROCKETMQ_NAMESPACE_END |