blob: d4ac095cf482fbb87aaf6bc7330dd17fe15d6f6d [file] [log] [blame]
#include "AsyncReceiveMessageCallback.h"
#include <system_error>
#include "ClientManagerImpl.h"
#include "ConsumeMessageType.h"
#include "LoggerImpl.h"
#include "PushConsumer.h"
ROCKETMQ_NAMESPACE_BEGIN
AsyncReceiveMessageCallback::AsyncReceiveMessageCallback(ProcessQueueWeakPtr process_queue)
: process_queue_(std::move(process_queue)) {
receive_message_later_ = std::bind(&AsyncReceiveMessageCallback::checkThrottleThenReceive, this);
}
void AsyncReceiveMessageCallback::onCompletion(const std::error_code& ec, const ReceiveMessageResult& result) {
ProcessQueueSharedPtr process_queue = process_queue_.lock();
if (!process_queue) {
SPDLOG_INFO("Process queue has been destructed.");
return;
}
std::shared_ptr<PushConsumer> impl = process_queue->getConsumer().lock();
if (!impl->active()) {
SPDLOG_INFO("Consumer is not active any more. It should be quitting");
return;
}
auto consumer = process_queue->getConsumer().lock();
if (!consumer) {
return;
}
if (ec) {
SPDLOG_WARN("Receive message from {} failed. Cause: {}. Attempt later.", process_queue->simpleName(), ec.message());
receiveMessageLater();
return;
}
SPDLOG_DEBUG("Receive messages from broker[host={}] returns with status=FOUND, msgListSize={}, queue={}",
result.source_host, result.messages.size(), process_queue->simpleName());
process_queue->cacheMessages(result.messages);
impl->getConsumeMessageService()->signalDispatcher();
checkThrottleThenReceive();
}
const char* AsyncReceiveMessageCallback::RECEIVE_LATER_TASK_NAME = "receive-later-task";
void AsyncReceiveMessageCallback::checkThrottleThenReceive() {
auto process_queue = process_queue_.lock();
if (!process_queue) {
SPDLOG_WARN("Process queue should have been destructed");
return;
}
if (process_queue->shouldThrottle()) {
SPDLOG_INFO("Number of messages in {} exceeds throttle threshold. Receive messages later.",
process_queue->simpleName());
process_queue->syncIdleState();
receiveMessageLater();
} else {
// Receive message immediately
receiveMessageImmediately();
}
}
void AsyncReceiveMessageCallback::receiveMessageLater() {
auto process_queue = process_queue_.lock();
if (!process_queue) {
return;
}
auto client_instance = process_queue->getClientManager();
std::weak_ptr<AsyncReceiveMessageCallback> receive_callback_weak_ptr(shared_from_this());
auto task = [receive_callback_weak_ptr]() {
auto async_receive_ptr = receive_callback_weak_ptr.lock();
if (async_receive_ptr) {
async_receive_ptr->checkThrottleThenReceive();
}
};
client_instance->getScheduler().schedule(task, RECEIVE_LATER_TASK_NAME, std::chrono::seconds(1),
std::chrono::seconds(0));
}
void AsyncReceiveMessageCallback::receiveMessageImmediately() {
ProcessQueueSharedPtr process_queue_shared_ptr = process_queue_.lock();
if (!process_queue_shared_ptr) {
SPDLOG_INFO("ProcessQueue has been released. Ignore further receive message request-response cycles");
return;
}
std::shared_ptr<PushConsumer> impl = process_queue_shared_ptr->getConsumer().lock();
if (!impl) {
SPDLOG_INFO("Owner of ProcessQueue[{}] has been released. Ignore further receive message request-response cycles",
process_queue_shared_ptr->simpleName());
return;
}
impl->receiveMessage(process_queue_shared_ptr->getMQMessageQueue(), process_queue_shared_ptr->getFilterExpression());
}
ROCKETMQ_NAMESPACE_END