blob: 12b6f4035bc635dd14a68ede5a35399bc1f672c1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <pulsar/MessageId.h>
#include <pulsar/MessageIdBuilder.h>
#include <iostream>
#include <limits>
#include <memory>
#include <stdexcept>
#include "ChunkMessageIdImpl.h"
#include "MessageIdImpl.h"
#include "PulsarApi.pb.h"
namespace pulsar {
MessageId::MessageId() {
static const MessageIdImplPtr emptyMessageId = std::make_shared<MessageIdImpl>();
impl_ = emptyMessageId;
}
MessageId& MessageId::operator=(const MessageId& m) {
impl_ = m.impl_;
return *this;
}
MessageId::MessageId(int32_t partition, int64_t ledgerId, int64_t entryId, int32_t batchIndex)
: impl_(std::make_shared<MessageIdImpl>(partition, ledgerId, entryId, batchIndex)) {}
MessageId::MessageId(const MessageIdImplPtr& impl) : impl_(impl) {}
const MessageId& MessageId::earliest() {
static const auto _earliest = MessageIdBuilder().build();
return _earliest;
}
const MessageId& MessageId::latest() {
static const int64_t long_max = std::numeric_limits<int64_t>::max();
static const auto _latest = MessageIdBuilder().ledgerId(long_max).entryId(long_max).build();
return _latest;
}
void MessageId::serialize(std::string& result) const {
proto::MessageIdData idData;
idData.set_ledgerid(impl_->ledgerId_);
idData.set_entryid(impl_->entryId_);
if (impl_->partition_ != -1) {
idData.set_partition(impl_->partition_);
}
if (impl_->batchIndex_ != -1) {
idData.set_batch_index(impl_->batchIndex_);
}
if (impl_->batchSize_ != 0) {
idData.set_batch_size(impl_->batchSize_);
}
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_);
if (chunkMsgId) {
proto::MessageIdData& firstChunkIdData = *idData.mutable_first_chunk_message_id();
auto firstChunkId = chunkMsgId->getFirstChunkMessageId();
firstChunkIdData.set_ledgerid(firstChunkId->ledgerId_);
firstChunkIdData.set_entryid(firstChunkId->entryId_);
if (chunkMsgId->partition_ != -1) {
firstChunkIdData.set_partition(firstChunkId->partition_);
}
}
idData.SerializeToString(&result);
}
/**
* Deserialize a message id from a binary string
*/
MessageId MessageId::deserialize(const std::string& serializedMessageId) {
proto::MessageIdData idData;
if (!idData.ParseFromString(serializedMessageId)) {
throw std::invalid_argument("Failed to parse serialized message id");
}
MessageId msgId = MessageIdBuilder::from(idData).build();
if (idData.has_first_chunk_message_id()) {
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
chunkMsgId->setFirstChunkMessageId(MessageIdBuilder::from(idData.first_chunk_message_id()).build());
chunkMsgId->setLastChunkMessageId(msgId);
return chunkMsgId->build();
}
return msgId;
}
int64_t MessageId::ledgerId() const { return impl_->ledgerId_; }
int64_t MessageId::entryId() const { return impl_->entryId_; }
int32_t MessageId::batchIndex() const { return impl_->batchIndex_; }
int32_t MessageId::partition() const { return impl_->partition_; }
int32_t MessageId::batchSize() const { return impl_->batchSize_; }
PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) {
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
if (chunkMsgId) {
auto firstId = chunkMsgId->getFirstChunkMessageId();
s << '(' << firstId->ledgerId_ << ',' << firstId->entryId_ << ',' << firstId->partition_ << ','
<< firstId->batchIndex_ << ");";
}
s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ','
<< messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << ')';
return s;
}
PULSAR_PUBLIC bool MessageId::operator<(const MessageId& other) const {
if (impl_->ledgerId_ < other.impl_->ledgerId_) {
return true;
} else if (impl_->ledgerId_ > other.impl_->ledgerId_) {
return false;
}
if (impl_->entryId_ < other.impl_->entryId_) {
return true;
} else if (impl_->entryId_ > other.impl_->entryId_) {
return false;
}
if (impl_->batchIndex_ < other.impl_->batchIndex_) {
return true;
} else {
return false;
}
}
PULSAR_PUBLIC bool MessageId::operator<=(const MessageId& other) const {
return *this < other || *this == other;
}
PULSAR_PUBLIC bool MessageId::operator>(const MessageId& other) const { return !(*this <= other); }
PULSAR_PUBLIC bool MessageId::operator>=(const MessageId& other) const { return !(*this < other); }
PULSAR_PUBLIC bool MessageId::operator==(const MessageId& other) const {
return impl_->ledgerId_ == other.impl_->ledgerId_ && impl_->entryId_ == other.impl_->entryId_ &&
impl_->batchIndex_ == other.impl_->batchIndex_ && impl_->partition_ == other.impl_->partition_;
}
PULSAR_PUBLIC bool MessageId::operator!=(const MessageId& other) const { return !(*this == other); }
PULSAR_PUBLIC const std::string& MessageId::getTopicName() const { return impl_->getTopicName(); }
PULSAR_PUBLIC void MessageId::setTopicName(const std::string& topicName) {
return setTopicName(std::make_shared<std::string>(topicName));
}
void MessageId::setTopicName(const std::shared_ptr<std::string>& topic) { return impl_->setTopicName(topic); }
} // namespace pulsar