/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
#include "BatchAcknowledgementTracker.h"

namespace pulsar {
DECLARE_LOG_OBJECT()

BatchAcknowledgementTracker::BatchAcknowledgementTracker(const std::string topic,
                                                         const std::string subscription,
                                                         const long consumerId)
    : greatestCumulativeAckSent_() {
    std::stringstream consumerStrStream;
    consumerStrStream << "BatchAcknowledgementTracker for [" << topic << ", " << subscription << ", "
                      << consumerId << "] ";
    name_ = consumerStrStream.str();
    LOG_DEBUG(name_ << "Constructed BatchAcknowledgementTracker");
}

void BatchAcknowledgementTracker::clear() {
    Lock lock(mutex_);
    trackerMap_.clear();
    sendList_.clear();
}

void BatchAcknowledgementTracker::receivedMessage(const Message& message) {
    // ignore message if it is not a batch message
    if (!message.impl_->metadata.has_num_messages_in_batch()) {
        return;
    }
    Lock lock(mutex_);
    MessageId msgID = message.impl_->messageId;

    // ignore message if it is less than the last cumulative ack sent or messageID is already being tracked
    TrackerMap::iterator pos = trackerMap_.find(msgID);
    if (msgID < greatestCumulativeAckSent_ || pos != trackerMap_.end() ||
        std::find(sendList_.begin(), sendList_.end(), msgID) != sendList_.end()) {
        return;
    }
    LOG_DEBUG("Initializing the trackerMap_ with Message ID = "
              << msgID << " -- Map size: " << trackerMap_.size() << " -- List size: " << sendList_.size());

    // Since dynamic_set (this version) doesn't have all() function, initializing all bits with 1 and then
    // reseting them to 0 and using any()
    trackerMap_.insert(
        pos,
        TrackerPair(msgID, boost::dynamic_bitset<>(message.impl_->metadata.num_messages_in_batch()).set()));
}

void BatchAcknowledgementTracker::deleteAckedMessage(const MessageId& messageId,
                                                     proto::CommandAck_AckType ackType) {
    // Not a batch message and a individual ack
    if (messageId.batchIndex() == -1 && ackType == proto::CommandAck_AckType_Individual) {
        return;
    }

    MessageId batchMessageId =
        MessageId(messageId.partition(), messageId.ledgerId(), messageId.entryId(), -1 /* Batch index */);

    Lock lock(mutex_);
    if (ackType == proto::CommandAck_AckType_Cumulative) {
        // delete from trackerMap and sendList all messageIDs less than or equal to this one
        // equal to - since getGreatestCumulativeAckReady already gives us the exact message id to be acked

        TrackerMap::iterator it = trackerMap_.begin();
        TrackerMapRemoveCriteria criteria(messageId);
        while (it != trackerMap_.end()) {
            if (criteria(*it)) {
                trackerMap_.erase(it++);
            } else {
                ++it;
            }
        }

        // std::remove shifts all to be deleted items to the end of the vector and returns an iterator to the
        // first
        // instance of item, then we erase all elements from this iterator to the end of the list
        sendList_.erase(
            std::remove_if(sendList_.begin(), sendList_.end(), SendRemoveCriteria(batchMessageId)),
            sendList_.end());

        if (greatestCumulativeAckSent_ < messageId) {
            greatestCumulativeAckSent_ = messageId;
            LOG_DEBUG(*this << " The greatestCumulativeAckSent_ is now " << greatestCumulativeAckSent_);
        }
    } else {
        // Error - if it is a batch message and found in trackerMap_
        if (trackerMap_.find(messageId) != trackerMap_.end()) {
            LOG_ERROR(*this << " - This should not happened - Message should have been removed from "
                               "trakerMap_ and moved to sendList_ "
                            << messageId);
        }

        sendList_.erase(std::remove(sendList_.begin(), sendList_.end(), batchMessageId), sendList_.end());
    }
}

bool BatchAcknowledgementTracker::isBatchReady(const MessageId& msgID,
                                               const proto::CommandAck_AckType ackType) {
    Lock lock(mutex_);
    // Remove batch index
    MessageId batchMessageId =
        MessageId(msgID.partition(), msgID.ledgerId(), msgID.entryId(), -1 /* Batch index */);

    TrackerMap::iterator pos = trackerMap_.find(batchMessageId);
    if (pos == trackerMap_.end() ||
        std::find(sendList_.begin(), sendList_.end(), batchMessageId) != sendList_.end()) {
        LOG_DEBUG(
            "Batch is ready since message present in sendList_ or not present in trackerMap_ [message ID = "
            << batchMessageId << "]");
        return true;
    }

    int batchIndex = msgID.batchIndex();
    assert(batchIndex < pos->second.size());
    pos->second.set(batchIndex, false);

    if (ackType == proto::CommandAck_AckType_Cumulative) {
        for (int i = 0; i < batchIndex; i++) {
            pos->second.set(i, false);
        }
    }

    if (pos->second.any()) {
        return false;
    }
    sendList_.push_back(batchMessageId);
    trackerMap_.erase(pos);
    LOG_DEBUG("Batch is ready since message all bits are reset in trackerMap_ [message ID = " << msgID
                                                                                              << "]");
    return true;
}

// returns
// - a batch message id < messageId
// - same messageId if it is the last message in the batch
const MessageId BatchAcknowledgementTracker::getGreatestCumulativeAckReady(const MessageId& messageId) {
    Lock lock(mutex_);

    // Remove batch index
    MessageId batchMessageId =
        MessageId(messageId.partition(), messageId.ledgerId(), messageId.entryId(), -1 /* Batch index */);
    TrackerMap::iterator pos = trackerMap_.find(batchMessageId);

    // element not found
    if (pos == trackerMap_.end()) {
        return MessageId();
    }

    if (pos->second.size() - 1 != messageId.batchIndex()) {
        // Can't cumulatively ack this batch message
        if (pos == trackerMap_.begin()) {
            // This was the first message hence we can't decrement the iterator
            return MessageId();
        }
        pos--;
    }

    return pos->first;
}
}  // namespace pulsar
