| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "AsyncReceiveMessageCallback.h" |
| |
| #include <system_error> |
| |
| #include "ClientManagerImpl.h" |
| #include "ConsumeMessageType.h" |
| #include "rocketmq/Logger.h" |
| #include "spdlog/spdlog.h" |
| #include "ProcessQueue.h" |
| #include "PushConsumerImpl.h" |
| #include "rocketmq/ErrorCode.h" |
| |
| ROCKETMQ_NAMESPACE_BEGIN |
| |
| AsyncReceiveMessageCallback::AsyncReceiveMessageCallback(std::weak_ptr<ProcessQueue> process_queue) |
| : process_queue_(std::move(process_queue)) { |
| |
| receive_message_later_ = std::bind( |
| &AsyncReceiveMessageCallback::checkThrottleThenReceive, this, std::placeholders::_1); |
| } |
| |
| void AsyncReceiveMessageCallback::onCompletion( |
| const std::error_code& ec, const std::string& attempt_id, const ReceiveMessageResult& result) { |
| |
| std::shared_ptr<ProcessQueue> process_queue = process_queue_.lock(); |
| if (!process_queue) { |
| SPDLOG_INFO("Process queue has been destructed."); |
| return; |
| } |
| |
| auto consumer = process_queue->getConsumer().lock(); |
| if (!consumer) { |
| return; |
| } |
| |
| if (ec == ErrorCode::TooManyRequests) { |
| SPDLOG_WARN("Action of receiving message is throttled. Retry after 20ms. Queue={}", process_queue->simpleName()); |
| receiveMessageLater(std::chrono::milliseconds(20), attempt_id); |
| return; |
| } |
| |
| if (ec == ErrorCode::NoContent) { |
| checkThrottleThenReceive(""); |
| return; |
| } |
| |
| if (ec) { |
| SPDLOG_WARN("Receive message from {} failed. Cause: {}. Retry after 1 second.", process_queue->simpleName(), |
| ec.message()); |
| receiveMessageLater(std::chrono::seconds(1), attempt_id); |
| return; |
| } |
| |
| SPDLOG_DEBUG("Receive messages from broker[host={}] returns with status=FOUND, msgListSize={}, queue={}", |
| result.source_host, result.messages.size(), process_queue->simpleName()); |
| process_queue->accountCache(result.messages); |
| consumer->getConsumeMessageService()->dispatch(process_queue, result.messages); |
| checkThrottleThenReceive(""); |
| } |
| |
| const char* AsyncReceiveMessageCallback::RECEIVE_LATER_TASK_NAME = "receive-later-task"; |
| |
| void AsyncReceiveMessageCallback::checkThrottleThenReceive(const std::string& attempt_id) { |
| auto process_queue = process_queue_.lock(); |
| if (!process_queue) { |
| SPDLOG_WARN("Process queue should have been destructed"); |
| return; |
| } |
| |
| if (process_queue->shouldThrottle()) { |
| SPDLOG_INFO("Number of messages in {} exceeds throttle threshold. Receive messages later.", |
| process_queue->simpleName()); |
| process_queue->syncIdleState(); |
| receiveMessageLater(std::chrono::seconds(1), attempt_id); |
| } else { |
| // Receive message immediately |
| receiveMessageImmediately(attempt_id); |
| } |
| } |
| |
| void AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds delay, const std::string& attempt_id) { |
| auto process_queue = process_queue_.lock(); |
| if (!process_queue) { |
| return; |
| } |
| |
| auto client_instance = process_queue->getClientManager(); |
| std::weak_ptr<AsyncReceiveMessageCallback> receive_callback_weak_ptr(shared_from_this()); |
| |
| auto task = [receive_callback_weak_ptr, attempt_id]() { |
| auto async_receive_ptr = receive_callback_weak_ptr.lock(); |
| if (async_receive_ptr) { |
| async_receive_ptr->checkThrottleThenReceive(attempt_id); |
| } |
| }; |
| |
| client_instance->getScheduler()->schedule( |
| task, RECEIVE_LATER_TASK_NAME, delay, std::chrono::seconds(0)); |
| } |
| |
| void AsyncReceiveMessageCallback::receiveMessageImmediately(const std::string& attempt_id) { |
| auto process_queue_shared_ptr = process_queue_.lock(); |
| if (!process_queue_shared_ptr) { |
| SPDLOG_INFO("ProcessQueue has been released. Ignore further receive message request-response cycles"); |
| return; |
| } |
| |
| std::shared_ptr<PushConsumerImpl> impl = process_queue_shared_ptr->getConsumer().lock(); |
| if (!impl) { |
| SPDLOG_INFO("Owner of ProcessQueue[{}] has been released. Ignore further receive message request-response cycles", |
| process_queue_shared_ptr->simpleName()); |
| return; |
| } |
| |
| std::string attempt_id_copy = attempt_id; |
| impl->receiveMessage(process_queue_shared_ptr->messageQueue(), |
| process_queue_shared_ptr->getFilterExpression(), attempt_id_copy); |
| } |
| |
| ROCKETMQ_NAMESPACE_END |