blob: 5981d07681db89ccab4562bc3ae7cce87e23a494 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ConsumeTask.h"
#include "ConsumeStats.h"
#include "PushConsumerImpl.h"
#include "Tag.h"
#include "rocketmq/ConsumeResult.h"
#include "rocketmq/ErrorCode.h"
#include "rocketmq/Logger.h"
#include "spdlog/spdlog.h"
ROCKETMQ_NAMESPACE_BEGIN
ConsumeTask::ConsumeTask(ConsumeMessageServiceWeakPtr service,
std::weak_ptr<ProcessQueue> process_queue,
MessageConstSharedPtr message)
: service_(service), process_queue_(std::move(process_queue)) {
messages_.emplace_back(message);
}
ConsumeTask::ConsumeTask(ConsumeMessageServiceWeakPtr service,
std::weak_ptr<ProcessQueue> process_queue,
std::vector<MessageConstSharedPtr> messages)
: service_(service), process_queue_(std::move(process_queue)), messages_(std::move(messages)) {
fifo_ = messages_.size() > 1;
}
void ConsumeTask::pop() {
assert(!messages_.empty());
auto process_queue = process_queue_.lock();
if (!process_queue) {
return;
}
process_queue->release(messages_[0]->body().size());
messages_.erase(messages_.begin());
}
void ConsumeTask::submit() {
auto svc = service_.lock();
if (!svc) {
return;
}
svc->submit(shared_from_this());
}
void ConsumeTask::schedule() {
auto svc = service_.lock();
if (!svc) {
return;
}
svc->schedule(shared_from_this(), std::chrono::seconds(1));
}
void ConsumeTask::onAck(std::shared_ptr<ConsumeTask> task, const std::error_code& ec) {
// Treat both success and invalid-receipt-handle as completion
if (!ec || ec == ErrorCode::InvalidReceiptHandle) {
task->pop();
task->next_step_ = NextStep::Consume;
task->submit();
return;
}
// Try to ack again later
SPDLOG_WARN("Failed to ack message[message-id={}]. Cause: {}. Action: retry after 1s.", task->messages_[0]->id(),
ec.message());
task->next_step_ = NextStep::Ack;
task->schedule();
}
void ConsumeTask::onNack(std::shared_ptr<ConsumeTask> task, const std::error_code& ec) {
assert(!task->fifo_);
assert(!task->messages_.empty());
// Treat both success and invalid-receipt-handle as completion
if (!ec || ErrorCode::InvalidReceiptHandle == ec) {
task->pop();
task->next_step_ = NextStep::Consume;
task->submit();
return;
}
SPDLOG_WARN("Failed to nack message[message-id={}]. Cause: {}. Action: retry after 1s.", task->messages_[0]->id(),
ec.message());
task->next_step_ = NextStep::Nack;
task->schedule();
}
void ConsumeTask::onForward(std::shared_ptr<ConsumeTask> task, const std::error_code& ec) {
assert(task->fifo_);
assert(!task->messages_.empty());
// Treat both success and invalid-receipt-handle as completion
if (!ec || ErrorCode::InvalidReceiptHandle == ec) {
SPDLOG_DEBUG("Message[message-id={}] is forwarded to DLQ", task->messages_[0]->id());
task->pop();
task->next_step_ = NextStep::Consume;
task->submit();
return;
}
SPDLOG_DEBUG("Failed to forward Message[message-id={}] to DLQ. Cause: {}. Action: retry after 1s.",
task->messages_[0]->id(), ec.message());
task->next_step_ = NextStep::Forward;
task->schedule();
}
void ConsumeTask::process() {
auto svc = service_.lock();
if (!svc) {
SPDLOG_DEBUG("ConsumeMessageService has destructed");
return;
}
if (messages_.empty()) {
SPDLOG_DEBUG("No more messages to process");
return;
}
std::shared_ptr<PushConsumerImpl> consumer = svc->consumer().lock();
auto self = shared_from_this();
switch (next_step_) {
case NextStep::Consume: {
const auto& listener = svc->listener();
auto it = messages_.begin();
SPDLOG_DEBUG("Start to process message[message-id={}]", (*it)->id());
svc->preHandle(**it);
// Collect metrics of await_time
auto await_time = std::chrono::system_clock::now() - (*it)->extension().decode_time;
opencensus::stats::Record(
{{consumer->stats().awaitTime(), MixAll::millisecondsOf(await_time)}},
{{Tag::topicTag(), (*it)->topic()}, {Tag::clientIdTag(), consumer->config().client_id}, {Tag::consumerGroupTag(), consumer->groupName()}});
std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
// Invoke user-defined-callback
auto result = listener(**it);
// Collect metrics of process_time
auto duration = std::chrono::steady_clock::now() - start;
switch (result) {
case ConsumeResult::SUCCESS: {
opencensus::stats::Record({{consumer->stats().processTime(), MixAll::millisecondsOf(duration)}},
{
{Tag::topicTag(), (*it)->topic()},
{Tag::clientIdTag(), consumer->config().client_id},
{Tag::invocationStatusTag(), "success"},
{Tag::consumerGroupTag(), consumer->groupName()}
});
break;
}
case ConsumeResult::FAILURE: {
opencensus::stats::Record({{consumer->stats().processTime(), MixAll::millisecondsOf(duration)}},
{
{Tag::topicTag(), (*it)->topic()},
{Tag::clientIdTag(), consumer->config().client_id},
{Tag::invocationStatusTag(), "failure"},
{Tag::consumerGroupTag(), consumer->groupName()}
});
break;
}
}
svc->postHandle(**it, result);
switch (result) {
case ConsumeResult::SUCCESS: {
auto callback = std::bind(&ConsumeTask::onAck, self, std::placeholders::_1);
svc->ack(**it, callback);
break;
}
case ConsumeResult::FAILURE: {
if (fifo_) {
next_step_ = NextStep::Consume;
// Increase delivery attempts.
auto raw = const_cast<Message*>((*it).get());
raw->mutableExtension().delivery_attempt++;
schedule();
} else {
// For standard way of processing, Nack to server.
auto callback = std::bind(&ConsumeTask::onNack, self, std::placeholders::_1);
svc->nack(**it, callback);
}
break;
}
}
break;
}
case NextStep::Ack: {
assert(!messages_.empty());
auto callback = std::bind(&ConsumeTask::onAck, self, std::placeholders::_1);
svc->ack(*messages_[0], callback);
break;
}
case NextStep::Nack: {
auto callback = std::bind(&ConsumeTask::onNack, self, std::placeholders::_1);
svc->nack(*messages_[0], callback);
break;
}
case NextStep::Forward: {
assert(!messages_.empty());
auto callback = std::bind(&ConsumeTask::onForward, self, std::placeholders::_1);
svc->forward(*messages_[0], callback);
break;
}
}
}
ROCKETMQ_NAMESPACE_END