blob: dede4a4aae791beb4480b6e64e86df3352b3bb28 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ConsumeTask.h"
#include "LoggerImpl.h"
ROCKETMQ_NAMESPACE_BEGIN
ConsumeTask::ConsumeTask(ConsumeMessageServiceWeakPtr service,
std::weak_ptr<ProcessQueue> process_queue,
MessageConstSharedPtr message)
: service_(service), process_queue_(std::move(process_queue)) {
messages_.emplace_back(message);
}
ConsumeTask::ConsumeTask(ConsumeMessageServiceWeakPtr service,
std::weak_ptr<ProcessQueue> process_queue,
std::vector<MessageConstSharedPtr> messages)
: service_(service), process_queue_(std::move(process_queue)), messages_(std::move(messages)) {
fifo_ = messages_.size() > 1;
}
void ConsumeTask::pop() {
assert(!messages_.empty());
auto process_queue = process_queue_.lock();
if (!process_queue) {
return;
}
process_queue->release(messages_[0]->body().size());
messages_.erase(messages_.begin());
}
void ConsumeTask::submit() {
auto svc = service_.lock();
if (!svc) {
return;
}
svc->submit(shared_from_this());
}
void ConsumeTask::schedule() {
auto svc = service_.lock();
if (!svc) {
return;
}
svc->schedule(shared_from_this(), std::chrono::seconds(1));
}
void ConsumeTask::onAck(std::shared_ptr<ConsumeTask> task, const std::error_code& ec) {
if (task->fifo_ && ec) {
auto service = task->service_.lock();
task->next_step_ = NextStep::Ack;
task->schedule();
} else {
// If it is not FIFO or ack operation succeeded
task->pop();
task->next_step_ = NextStep::Consume;
}
task->submit();
}
void ConsumeTask::onNack(std::shared_ptr<ConsumeTask> task, const std::error_code& ec) {
assert(!task->fifo_);
assert(!task->messages_.empty());
if (ec) {
SPDLOG_WARN("Failed to nack message[message-id={}]. Cause: {}", task->messages_[0]->id(), ec.message());
}
task->pop();
task->next_step_ = NextStep::Consume;
task->submit();
}
void ConsumeTask::onForward(std::shared_ptr<ConsumeTask> task, const std::error_code& ec) {
assert(task->fifo_);
assert(!task->messages_.empty());
if (ec) {
SPDLOG_DEBUG("Failed to forward Message[message-id={}] to DLQ", task->messages_[0]->id());
task->next_step_ = NextStep::Forward;
task->schedule();
} else {
SPDLOG_DEBUG("Message[message-id={}] forwarded to DLQ", task->messages_[0]->id());
task->pop();
task->next_step_ = NextStep::Consume;
task->submit();
}
}
void ConsumeTask::process() {
auto svc = service_.lock();
if (!svc) {
SPDLOG_DEBUG("ConsumeMessageService has destructed");
return;
}
if (messages_.empty()) {
SPDLOG_DEBUG("No more messages to process");
return;
}
auto self = shared_from_this();
switch (next_step_) {
case NextStep::Consume: {
const auto& listener = svc->listener();
auto it = messages_.begin();
SPDLOG_DEBUG("Start to process message[message-id={}]", (*it)->id());
svc->preHandle(**it);
auto result = listener(**it);
svc->postHandle(**it, result);
switch (result) {
case ConsumeResult::SUCCESS: {
auto callback = std::bind(&ConsumeTask::onAck, self, std::placeholders::_1);
svc->ack(**it, callback);
break;
}
case ConsumeResult::FAILURE: {
if (fifo_) {
next_step_ = NextStep::Consume;
// Increase delivery attempts.
auto raw = const_cast<Message*>((*it).get());
raw->mutableExtension().delivery_attempt++;
schedule();
} else {
// For standard way of processing, Nack to server.
auto callback = std::bind(&ConsumeTask::onNack, self, std::placeholders::_1);
svc->nack(**it, callback);
}
break;
}
}
break;
}
case NextStep::Ack: {
assert(!messages_.empty());
auto callback = std::bind(&ConsumeTask::onAck, self, std::placeholders::_1);
svc->ack(*messages_[0], callback);
break;
}
case NextStep::Forward: {
assert(!messages_.empty());
auto callback = std::bind(&ConsumeTask::onForward, self, std::placeholders::_1);
svc->forward(*messages_[0], callback);
break;
}
}
}
ROCKETMQ_NAMESPACE_END