blob: 34ffaa4775d911ebaedec18ef72113b256104ebb [file] [log] [blame]
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <pulsar/Message.h>
#include <pulsar/MessageBuilder.h>
#include <boost/make_shared.hpp>
#include <boost/smart_ptr.hpp>
#include "PulsarApi.pb.h"
#include "MessageImpl.h"
#include "SharedBuffer.h"
#include <iostream>
using namespace pulsar;
namespace pulsar {
const static std::string emptyString;
const static MessageId invalidMessageId;
const Message::StringMap& Message::getProperties() const {
return impl_->properties();
}
bool Message::hasProperty(const std::string& name) const {
const StringMap& m = impl_->properties();
return m.find(name) != m.end();
}
const std::string& Message::getProperty(const std::string& name) const {
if (hasProperty(name)) {
const StringMap& m = impl_->properties();
return m.at(name);
} else {
return emptyString;
}
}
const void* Message::getData() const {
return impl_->payload.data();
}
std::size_t Message::getLength() const {
return impl_->payload.readableBytes();
}
std::string Message::getDataAsString() const {
return std::string((const char*) getData(), getLength());
}
Message::Message()
: impl_() {
}
Message::Message(MessageImplPtr& impl)
: impl_(impl) {
}
Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& metadata,
SharedBuffer& payload)
: impl_(boost::make_shared<MessageImpl>()) {
impl_->messageId = BatchMessageId(msg.message_id().ledgerid(), msg.message_id().entryid());
impl_->metadata = metadata;
impl_->payload = payload;
}
Message::Message(const BatchMessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload, proto::SingleMessageMetadata& singleMetadata)
: impl_(boost::make_shared<MessageImpl>()) {
impl_->messageId = messageID;
impl_->metadata = metadata;
impl_->payload = payload;
impl_->metadata.mutable_properties()->CopyFrom(singleMetadata.properties());
}
const MessageId& Message::getMessageId() const {
if (!impl_) {
return invalidMessageId;
} else {
return impl_->messageId;
}
}
bool Message::hasPartitionKey() const{
if(impl_) {
return impl_->hasPartitionKey();
}
return false;
}
const std::string& Message::getPartitionKey() const {
if (!impl_) {
return emptyString;
}
return impl_->getPartitionKey();
}
uint64_t Message::getPublishTimestamp() const {
return impl_ ? impl_->getPublishTimestamp() : 0ull;
}
#pragma GCC visibility push(default)
std::ostream& operator<<(std::ostream& s, const Message::StringMap& map) {
// Output at most 10 elements -- appropriate if used for logging.
s << '{';
Message::StringMap::const_iterator begin = map.begin();
Message::StringMap::const_iterator end = map.end();
for (int i = 0; begin != end && i < 10; ++i, ++begin) {
if (i > 0) {
s << ", ";
}
s << "'" << begin->first << "':'" << begin->second << "'";
}
if (begin != end) {
s << " ...";
}
s << '}';
return s;
}
std::ostream& operator<<(std::ostream& s, const Message& msg) {
assert(msg.impl_.get());
assert(msg.impl_->metadata.has_sequence_id());
assert(msg.impl_->metadata.has_publish_time());
s << "Message(prod=" << msg.impl_->metadata.producer_name() << ", seq="
<< msg.impl_->metadata.sequence_id() << ", publish_time="
<< msg.impl_->metadata.publish_time() << ", payload_size=" << msg.getLength() << ", msg_id="
<< msg.getMessageId() << ", props=" << msg.getProperties() << ')';
if (! msg.impl_->metadata.has_producer_name()) {
s << "WARN: Message has no producer name set, this should only happen if batch messaging is enabled";
}
return s;
}
#pragma GCC visibility pop
}