// blob: 1af8c090a3b400bb6aaebbb2b7f0efa4b54b2ff3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <pulsar/Message.h>
#include <pulsar/MessageBuilder.h>
#include <boost/make_shared.hpp>
#include <boost/smart_ptr.hpp>
#include "PulsarApi.pb.h"
#include "MessageImpl.h"
#include "SharedBuffer.h"
#include <cstddef>
#include <iostream>
using namespace pulsar;
namespace pulsar {
// Empty string handed out by reference from accessors that have no value to return.
static const std::string emptyString;
// Default-constructed id returned by getMessageId() when the message has no implementation.
static const MessageId invalidMessageId;
/**
 * Return the properties attached to this message as a key/value map.
 *
 * A default-constructed Message has no implementation object. In that case an
 * empty map is returned rather than dereferencing a null pointer, matching the
 * null checks already done by getMessageId() and getPartitionKey().
 */
const Message::StringMap& Message::getProperties() const {
    if (!impl_) {
        // Function-local static so the reference handed back stays valid.
        static const StringMap emptyProperties;
        return emptyProperties;
    }
    return impl_->properties();
}
bool Message::hasProperty(const std::string& name) const {
const StringMap& m = impl_->properties();
return m.find(name) != m.end();
}
const std::string& Message::getProperty(const std::string& name) const {
if (hasProperty(name)) {
const StringMap& m = impl_->properties();
return m.at(name);
} else {
return emptyString;
}
}
/**
 * Return a pointer to the start of the message payload.
 *
 * @return the payload data pointer, or NULL when the message has no
 *         implementation object (default-constructed Message)
 */
const void* Message::getData() const {
    if (!impl_) {
        // No payload exists; do not dereference the null implementation pointer.
        return NULL;
    }
    return impl_->payload.data();
}
/**
 * Return the number of readable bytes in the message payload.
 *
 * @return payload length in bytes, or 0 when the message has no implementation
 *         object (default-constructed Message)
 */
std::size_t Message::getLength() const {
    if (!impl_) {
        // No payload exists; report an empty message instead of crashing.
        return 0;
    }
    return impl_->payload.readableBytes();
}
std::string Message::getDataAsString() const {
return std::string((const char*) getData(), getLength());
}
/** Create a message without an implementation object (impl_ is value-initialized). */
Message::Message() : impl_() {}
/**
 * Create a message that uses the given implementation pointer.
 *
 * @param impl implementation pointer copied into this message
 */
Message::Message(MessageImplPtr& impl) : impl_(impl) {}
/**
 * Create a message from a CommandMessage plus its metadata and payload.
 *
 * A fresh MessageImpl is allocated. Its message id is built from the ledger id
 * and entry id found in msg.message_id().
 *
 * @param msg      command whose message_id() supplies the ledger and entry ids
 * @param metadata metadata assigned into the new implementation object
 * @param payload  payload buffer assigned into the new implementation object
 */
Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& metadata,
                 SharedBuffer& payload)
    : impl_(boost::make_shared<MessageImpl>()) {
    MessageImpl& state = *impl_;
    state.messageId = BatchMessageId(msg.message_id().ledgerid(), msg.message_id().entryid());
    state.metadata = metadata;
    state.payload = payload;
}
/**
 * Create a message from an explicit BatchMessageId, metadata and payload, with
 * the properties taken from a SingleMessageMetadata.
 *
 * NOTE(review): presumably used for entries unpacked from a batch -- confirm
 * against callers.
 *
 * @param messageID      id stored on the new implementation object
 * @param metadata       metadata assigned into the new implementation object
 * @param payload        payload buffer assigned into the new implementation object
 * @param singleMetadata source of the properties copied over the metadata's own
 */
Message::Message(const BatchMessageId& messageID, proto::MessageMetadata& metadata,
                 SharedBuffer& payload, proto::SingleMessageMetadata& singleMetadata)
    : impl_(boost::make_shared<MessageImpl>()) {
    MessageImpl& state = *impl_;
    state.messageId = messageID;
    state.metadata = metadata;
    state.payload = payload;
    // Must follow the metadata assignment above: it replaces the properties just copied in.
    state.metadata.mutable_properties()->CopyFrom(singleMetadata.properties());
}
/**
 * Return the id of this message.
 *
 * @return the id stored on the implementation object, or a shared
 *         default-constructed MessageId when there is no implementation object
 */
const MessageId& Message::getMessageId() const {
    if (impl_) {
        return impl_->messageId;
    }
    return invalidMessageId;
}
/**
 * Tell whether this message carries a partition key.
 *
 * @return the implementation's answer, or false when there is no
 *         implementation object
 */
bool Message::hasPartitionKey() const {
    return impl_ ? impl_->hasPartitionKey() : false;
}
/**
 * Return the partition key of this message.
 *
 * @return the key reported by the implementation object, or a shared empty
 *         string when there is no implementation object
 */
const std::string& Message::getPartitionKey() const {
    if (impl_) {
        return impl_->getPartitionKey();
    }
    return emptyString;
}
/**
 * Return the publish timestamp of this message.
 *
 * @return the value reported by the implementation object, or 0 when there is
 *         no implementation object
 */
uint64_t Message::getPublishTimestamp() const {
    if (!impl_) {
        return 0ull;
    }
    return impl_->getPublishTimestamp();
}
#pragma GCC visibility push(default)
/**
 * Write a property map to a stream as {'key':'value', ...}.
 *
 * Only the first few entries are written, which keeps the output bounded when
 * used for logging. If entries were left out, " ..." is appended before the
 * closing brace.
 *
 * @param s   destination stream
 * @param map properties to print
 * @return the destination stream
 */
std::ostream& operator<<(std::ostream& s, const Message::StringMap& map) {
    // Upper bound on the number of entries written.
    const int maxEntries = 10;

    s << '{';
    Message::StringMap::const_iterator it = map.begin();
    const Message::StringMap::const_iterator last = map.end();
    int printed = 0;
    while (it != last && printed < maxEntries) {
        if (printed != 0) {
            s << ", ";
        }
        s << "'" << it->first << "':'" << it->second << "'";
        ++printed;
        ++it;
    }
    // Stopped before the end of the map: mark the truncation.
    if (it != last) {
        s << " ...";
    }
    s << '}';
    return s;
}
/**
 * Write a one-line summary of a message to a stream: producer name, sequence
 * id, publish time, payload size, message id and properties.
 *
 * The message must have an implementation object whose metadata has both a
 * sequence id and a publish time; these preconditions are checked with assert.
 *
 * @param s   destination stream
 * @param msg message to describe
 * @return the destination stream
 */
std::ostream& operator<<(std::ostream& s, const Message& msg) {
    assert(msg.impl_.get());
    assert(msg.impl_->metadata.has_sequence_id());
    assert(msg.impl_->metadata.has_publish_time());

    // One statement per field; the emitted text is a single uninterrupted line.
    s << "Message(prod=" << msg.impl_->metadata.producer_name();
    s << ", seq=" << msg.impl_->metadata.sequence_id();
    s << ", publish_time=" << msg.impl_->metadata.publish_time();
    s << ", payload_size=" << msg.getLength();
    s << ", msg_id=" << msg.getMessageId();
    s << ", props=" << msg.getProperties();
    s << ')';
    return s;
}
#pragma GCC visibility pop
}