blob: 3d6d9208adc464b6b0f24e9e5877ee1c233889c4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "BatchAcknowledgementTracker.h"
namespace pulsar {
DECLARE_LOG_OBJECT()
BatchAcknowledgementTracker::BatchAcknowledgementTracker(const std::string topic,
const std::string subscription,
const long consumerId)
: greatestCumulativeAckSent_() {
std::stringstream consumerStrStream;
consumerStrStream << "BatchAcknowledgementTracker for [" << topic << ", " << subscription << ", "
<< consumerId << "] ";
name_ = consumerStrStream.str();
LOG_DEBUG(name_ << "Constructed BatchAcknowledgementTracker");
}
void BatchAcknowledgementTracker::clear() {
Lock lock(mutex_);
trackerMap_.clear();
sendList_.clear();
}
void BatchAcknowledgementTracker::receivedMessage(const Message& message) {
// ignore message if it is not a batch message
if (!message.impl_->metadata.has_num_messages_in_batch()) {
return;
}
Lock lock(mutex_);
MessageId msgID = message.impl_->messageId;
// ignore message if it is less than the last cumulative ack sent or messageID is already being tracked
TrackerMap::iterator pos = trackerMap_.find(msgID);
if (msgID < greatestCumulativeAckSent_ || pos != trackerMap_.end() ||
std::find(sendList_.begin(), sendList_.end(), msgID) != sendList_.end()) {
return;
}
LOG_DEBUG("Initializing the trackerMap_ with Message ID = "
<< msgID << " -- Map size: " << trackerMap_.size() << " -- List size: " << sendList_.size());
// Since dynamic_set (this version) doesn't have all() function, initializing all bits with 1 and then
// reseting them to 0 and using any()
trackerMap_.insert(
pos,
TrackerPair(msgID, boost::dynamic_bitset<>(message.impl_->metadata.num_messages_in_batch()).set()));
}
void BatchAcknowledgementTracker::deleteAckedMessage(const MessageId& messageId,
proto::CommandAck_AckType ackType) {
// Not a batch message and a individual ack
if (messageId.batchIndex() == -1 && ackType == proto::CommandAck_AckType_Individual) {
return;
}
MessageId batchMessageId =
MessageId(messageId.partition(), messageId.ledgerId(), messageId.entryId(), -1 /* Batch index */);
Lock lock(mutex_);
if (ackType == proto::CommandAck_AckType_Cumulative) {
// delete from trackerMap and sendList all messageIDs less than or equal to this one
// equal to - since getGreatestCumulativeAckReady already gives us the exact message id to be acked
TrackerMap::iterator it = trackerMap_.begin();
TrackerMapRemoveCriteria criteria(messageId);
while (it != trackerMap_.end()) {
if (criteria(*it)) {
trackerMap_.erase(it++);
} else {
++it;
}
}
// std::remove shifts all to be deleted items to the end of the vector and returns an iterator to the
// first
// instance of item, then we erase all elements from this iterator to the end of the list
sendList_.erase(
std::remove_if(sendList_.begin(), sendList_.end(), SendRemoveCriteria(batchMessageId)),
sendList_.end());
if (greatestCumulativeAckSent_ < messageId) {
greatestCumulativeAckSent_ = messageId;
LOG_DEBUG(*this << " The greatestCumulativeAckSent_ is now " << greatestCumulativeAckSent_);
}
} else {
// Error - if it is a batch message and found in trackerMap_
if (trackerMap_.find(messageId) != trackerMap_.end()) {
LOG_ERROR(*this << " - This should not happened - Message should have been removed from "
"trakerMap_ and moved to sendList_ "
<< messageId);
}
sendList_.erase(std::remove(sendList_.begin(), sendList_.end(), batchMessageId), sendList_.end());
}
}
bool BatchAcknowledgementTracker::isBatchReady(const MessageId& msgID,
const proto::CommandAck_AckType ackType) {
Lock lock(mutex_);
// Remove batch index
MessageId batchMessageId =
MessageId(msgID.partition(), msgID.ledgerId(), msgID.entryId(), -1 /* Batch index */);
TrackerMap::iterator pos = trackerMap_.find(batchMessageId);
if (pos == trackerMap_.end() ||
std::find(sendList_.begin(), sendList_.end(), batchMessageId) != sendList_.end()) {
LOG_DEBUG(
"Batch is ready since message present in sendList_ or not present in trackerMap_ [message ID = "
<< batchMessageId << "]");
return true;
}
int batchIndex = msgID.batchIndex();
assert(batchIndex < pos->second.size());
pos->second.set(batchIndex, false);
if (ackType == proto::CommandAck_AckType_Cumulative) {
for (int i = 0; i < batchIndex; i++) {
pos->second.set(i, false);
}
}
if (pos->second.any()) {
return false;
}
sendList_.push_back(batchMessageId);
trackerMap_.erase(pos);
LOG_DEBUG("Batch is ready since message all bits are reset in trackerMap_ [message ID = " << msgID
<< "]");
return true;
}
// returns
// - a batch message id < messageId
// - same messageId if it is the last message in the batch
const MessageId BatchAcknowledgementTracker::getGreatestCumulativeAckReady(const MessageId& messageId) {
Lock lock(mutex_);
// Remove batch index
MessageId batchMessageId =
MessageId(messageId.partition(), messageId.ledgerId(), messageId.entryId(), -1 /* Batch index */);
TrackerMap::iterator pos = trackerMap_.find(batchMessageId);
// element not found
if (pos == trackerMap_.end()) {
return MessageId();
}
if (pos->second.size() - 1 != messageId.batchIndex()) {
// Can't cumulatively ack this batch message
if (pos == trackerMap_.begin()) {
// This was the first message hence we can't decrement the iterator
return MessageId();
}
pos--;
}
return pos->first;
}
} // namespace pulsar