blob: 654b4e665a360b619670e51d918ea57680c93b98 [file] [log] [blame]
#include "AsyncReceiveMessageCallback.h"
#include "ClientManagerImpl.h"
#include "ConsumeMessageType.h"
#include "LoggerImpl.h"
#include "PushConsumer.h"
ROCKETMQ_NAMESPACE_BEGIN
AsyncReceiveMessageCallback::AsyncReceiveMessageCallback(ProcessQueueWeakPtr process_queue)
: process_queue_(std::move(process_queue)) {
receive_message_later_ = std::bind(&AsyncReceiveMessageCallback::checkThrottleThenReceive, this);
}
void AsyncReceiveMessageCallback::onSuccess(ReceiveMessageResult& result) {
ProcessQueueSharedPtr process_queue_shared_ptr = process_queue_.lock();
if (!process_queue_shared_ptr) {
SPDLOG_WARN("Process queue has been released. Drop PopResult: {}", result.toString());
return;
}
std::shared_ptr<PushConsumer> impl = process_queue_shared_ptr->getConsumer().lock();
if (!impl->active()) {
return;
}
auto receive_message_action = process_queue_shared_ptr->getConsumer().lock()->receiveMessageAction();
switch (result.status()) {
case ReceiveMessageStatus::OK:
SPDLOG_DEBUG("Receive messages from broker[host={}] returns with status=FOUND, msgListSize={}, queue={}",
result.sourceHost(), result.getMsgFoundList().size(), process_queue_shared_ptr->simpleName());
process_queue_shared_ptr->cacheMessages(result.getMsgFoundList());
impl->getConsumeMessageService()->signalDispatcher();
if (ReceiveMessageAction::PULL == receive_message_action) {
process_queue_shared_ptr->nextOffset(result.next_offset_);
}
checkThrottleThenReceive();
break;
case ReceiveMessageStatus::DATA_CORRUPTED:
if (ReceiveMessageAction::POLLING == receive_message_action) {
process_queue_shared_ptr->cacheMessages(result.messages_);
impl->getConsumeMessageService()->signalDispatcher();
}
checkThrottleThenReceive();
break;
case ReceiveMessageStatus::OUT_OF_RANGE:
assert(ReceiveMessageAction::PULL == receive_message_action);
process_queue_shared_ptr->nextOffset(result.next_offset_);
checkThrottleThenReceive();
break;
case ReceiveMessageStatus::DEADLINE_EXCEEDED:
SPDLOG_DEBUG("Receive messages from broker[host={}] returns with status=DEADLINE_EXCEEDED, queue={}",
result.sourceHost(), process_queue_shared_ptr->simpleName());
checkThrottleThenReceive();
break;
case ReceiveMessageStatus::INTERNAL:
SPDLOG_DEBUG("Receive messages from broker[host={}] returns with status=UNKNOWN, queue={}", result.sourceHost(),
process_queue_shared_ptr->simpleName());
receiveMessageLater();
break;
case ReceiveMessageStatus::RESOURCE_EXHAUSTED:
SPDLOG_DEBUG("Receive messages from broker[host={}] returns with status=RESOURCE_EXHAUSTED, queue={}",
result.sourceHost(), process_queue_shared_ptr->simpleName());
receiveMessageLater();
break;
case ReceiveMessageStatus::NOT_FOUND:
SPDLOG_DEBUG("Receive messages from broker[host={}] returns with status=NOT_FOUND, queue={}", result.sourceHost(),
process_queue_shared_ptr->simpleName());
receiveMessageLater();
break;
default:
SPDLOG_WARN("Unknown receive message status: {} from broker[host={}], queue={}", result.status(),
result.sourceHost(), process_queue_shared_ptr->simpleName());
receiveMessageLater();
break;
}
}
const char* AsyncReceiveMessageCallback::RECEIVE_LATER_TASK_NAME = "receive-later-task";
void AsyncReceiveMessageCallback::checkThrottleThenReceive() {
auto process_queue = process_queue_.lock();
if (!process_queue) {
SPDLOG_WARN("Process queue should have been destructed");
return;
}
if (process_queue->shouldThrottle()) {
SPDLOG_INFO("Number of messages in {} exceeds throttle threshold. Receive messages later.",
process_queue->simpleName());
process_queue->syncIdleState();
receiveMessageLater();
} else {
// Receive message immediately
receiveMessageImmediately();
}
}
void AsyncReceiveMessageCallback::onException(MQException& e) {
auto process_queue_ptr = process_queue_.lock();
if (process_queue_ptr) {
SPDLOG_WARN("pop message error:{}, pop message later. Queue={}", e.what(), process_queue_ptr->simpleName());
// pop message later
receiveMessageLater();
}
}
void AsyncReceiveMessageCallback::receiveMessageLater() {
auto process_queue = process_queue_.lock();
if (!process_queue) {
return;
}
auto client_instance = process_queue->getClientManager();
std::weak_ptr<AsyncReceiveMessageCallback> receive_callback_weak_ptr(shared_from_this());
auto task = [receive_callback_weak_ptr]() {
auto async_receive_ptr = receive_callback_weak_ptr.lock();
if (async_receive_ptr) {
async_receive_ptr->checkThrottleThenReceive();
}
};
client_instance->getScheduler().schedule(task, RECEIVE_LATER_TASK_NAME, std::chrono::seconds(1),
std::chrono::seconds(0));
}
void AsyncReceiveMessageCallback::receiveMessageImmediately() {
ProcessQueueSharedPtr process_queue_shared_ptr = process_queue_.lock();
if (!process_queue_shared_ptr) {
SPDLOG_INFO("ProcessQueue has been released. Ignore further receive message request-response cycles");
return;
}
std::shared_ptr<PushConsumer> impl = process_queue_shared_ptr->getConsumer().lock();
if (!impl) {
SPDLOG_INFO("Owner of ProcessQueue[{}] has been released. Ignore further receive message request-response cycles",
process_queue_shared_ptr->simpleName());
return;
}
impl->receiveMessage(process_queue_shared_ptr->getMQMessageQueue(), process_queue_shared_ptr->getFilterExpression());
}
ROCKETMQ_NAMESPACE_END