blob: 9a4713547704fe43da9854dcdee8cfdd625399ba [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "AckGroupingTracker.h"
#include <atomic>
#include <limits>
#include "BitSet.h"
#include "ClientConnection.h"
#include "Commands.h"
#include "LogUtils.h"
#include "MessageIdImpl.h"
namespace pulsar {
DECLARE_LOG_OBJECT();
void AckGroupingTracker::doImmediateAck(const MessageId& msgId, ResultCallback callback,
CommandAck_AckType ackType) const {
const auto cnx = connectionSupplier_();
if (!cnx) {
LOG_DEBUG("Connection is not ready, ACK failed for " << msgId);
if (callback) {
callback(ResultAlreadyClosed);
}
return;
}
const auto& ackSet = Commands::getMessageIdImpl(msgId)->getBitSet();
if (waitResponse_) {
const auto requestId = requestIdSupplier_();
cnx->sendRequestWithId(
Commands::newAck(consumerId_, msgId.ledgerId(), msgId.entryId(), ackSet, ackType, requestId),
requestId)
.addListener([callback](Result result, const ResponseData&) {
if (callback) {
callback(result);
}
});
} else {
cnx->sendCommand(Commands::newAck(consumerId_, msgId.ledgerId(), msgId.entryId(), ackSet, ackType));
if (callback) {
callback(ResultOk);
}
}
}
static std::ostream& operator<<(std::ostream& os, const std::set<MessageId>& msgIds) {
bool first = true;
for (auto&& msgId : msgIds) {
if (first) {
first = false;
} else {
os << ", ";
}
os << "[" << msgId << "]";
}
return os;
}
void AckGroupingTracker::doImmediateAck(const std::set<MessageId>& msgIds, ResultCallback callback) const {
const auto cnx = connectionSupplier_();
if (!cnx) {
LOG_DEBUG("Connection is not ready, ACK failed for " << msgIds);
if (callback) {
callback(ResultAlreadyClosed);
}
return;
}
if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
if (waitResponse_) {
const auto requestId = requestIdSupplier_();
cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, msgIds, requestId), requestId)
.addListener([callback](Result result, const ResponseData&) {
if (callback) {
callback(result);
}
});
} else {
cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, msgIds));
if (callback) {
callback(ResultOk);
}
}
} else {
auto count = std::make_shared<std::atomic<size_t>>(msgIds.size());
auto wrappedCallback = [callback, count](Result result) {
if (--*count == 0 && callback) {
callback(result);
}
};
for (auto&& msgId : msgIds) {
doImmediateAck(msgId, wrappedCallback, CommandAck_AckType_Individual);
}
}
}
} // namespace pulsar