blob: b691b18cc71c09c2110f702fe3c675b4e04a9715 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "NegativeAcksTracker.h"
#include <cstdint>
#include <functional>
#include <set>
#include <utility>
#include "ClientImpl.h"
#include "ConsumerImpl.h"
#include "ExecutorService.h"
#include "LogUtils.h"
#include "MessageIdUtil.h"
#include "pulsar/MessageBuilder.h"
#include "pulsar/MessageId.h"
#include "pulsar/MessageIdBuilder.h"
DECLARE_LOG_OBJECT()
namespace pulsar {
NegativeAcksTracker::NegativeAcksTracker(const ClientImplPtr &client, ConsumerImpl &consumer,
const ConsumerConfiguration &conf)
: consumer_(consumer),
timerInterval_(0),
timer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()) {
static const long MIN_NACK_DELAY_MILLIS = 100;
nackDelay_ =
std::chrono::milliseconds(std::max(conf.getNegativeAckRedeliveryDelayMs(), MIN_NACK_DELAY_MILLIS));
timerInterval_ = std::chrono::milliseconds((long)(nackDelay_.count() / 3));
nackPrecisionBit_ = conf.getNegativeAckPrecisionBitCnt();
LOG_DEBUG("Created negative ack tracker with delay: " << nackDelay_.count() << " ms - Timer interval: "
<< timerInterval_.count());
}
void NegativeAcksTracker::scheduleTimer() {
if (closed_) {
return;
}
std::weak_ptr<NegativeAcksTracker> weakSelf{shared_from_this()};
timer_->expires_after(timerInterval_);
timer_->async_wait([weakSelf](const ASIO_ERROR &ec) {
if (auto self = weakSelf.lock()) {
self->handleTimer(ec);
}
});
}
void NegativeAcksTracker::handleTimer(const ASIO_ERROR &ec) {
if (ec) {
// Ignore cancelled events
return;
}
std::unique_lock<std::mutex> lock(mutex_);
if (nackedMessages_.empty() || !enabledForTesting_) {
return;
}
// Group all the nacked messages into one single re-delivery request
std::set<MessageId> messagesToRedeliver;
auto now = Clock::now();
// The map is sorted by time, so we can exit immediately when we traverse to a time that does not match
for (auto it = nackedMessages_.begin(); it != nackedMessages_.end();) {
if (it->first > now) {
// We are done with all the messages that need to be redelivered
break;
}
const auto &ledgerMap = it->second;
for (auto ledgerIt = ledgerMap.begin(); ledgerIt != ledgerMap.end(); ++ledgerIt) {
const auto &entrySet = ledgerIt->second;
for (auto setIt = entrySet.begin(); setIt != entrySet.end(); ++setIt) {
messagesToRedeliver.insert(
MessageIdBuilder().ledgerId(ledgerIt->first).entryId(*setIt).build());
}
}
it = nackedMessages_.erase(it);
}
lock.unlock();
if (!messagesToRedeliver.empty()) {
consumer_.onNegativeAcksSend(messagesToRedeliver);
consumer_.redeliverUnacknowledgedMessages(messagesToRedeliver);
}
scheduleTimer();
}
std::chrono::steady_clock::time_point trimLowerBit(const std::chrono::steady_clock::time_point &tp,
int bits) {
// get origin timestamp in nanoseconds
auto timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(tp.time_since_epoch()).count();
// trim lower bits
auto trimmedTimestamp = timestamp & (~((1LL << bits) - 1));
return std::chrono::steady_clock::time_point(std::chrono::nanoseconds(trimmedTimestamp));
}
void NegativeAcksTracker::add(const MessageId &m) {
auto msgId = discardBatch(m);
auto now = Clock::now();
{
std::lock_guard<std::mutex> lock{mutex_};
auto trimmedTimestamp = trimLowerBit(now + nackDelay_, nackPrecisionBit_);
// If the timestamp is already in the map, we can just add the message to the existing entry
// Erase batch id to group all nacks from same batch
nackedMessages_[trimmedTimestamp][msgId.ledgerId()].insert(msgId.entryId());
}
scheduleTimer();
}
void NegativeAcksTracker::close() {
closed_ = true;
cancelTimer(*timer_);
std::lock_guard<std::mutex> lock(mutex_);
nackedMessages_.clear();
}
void NegativeAcksTracker::setEnabledForTesting(bool enabled) {
enabledForTesting_ = enabled;
if (enabledForTesting_) {
scheduleTimer();
}
}
} // namespace pulsar