blob: 54de3eae4506ada54bb2da600f96d9f880e3209b [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/messaging/amqp/EncodedMessage.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/MessageImpl.h"
#include "qpid/amqp/Decoder.h"
#include <boost/lexical_cast.hpp>
#include <string.h>
namespace qpid {
namespace messaging {
namespace amqp {
using namespace qpid::amqp;
/**
 * Constructs a message with a raw buffer of s bytes. The buffer contents
 * are not initialised here; when s is zero no buffer is allocated and the
 * data pointer is left null. init() is then called on every CharSequence
 * member.
 */
EncodedMessage::EncodedMessage(size_t s)
    : size(s),
      data(s ? new char[s] : 0)
{
    init();
}
/**
 * Default constructor: zero size and a null data pointer (no buffer is
 * allocated), followed by init() on every CharSequence member.
 */
EncodedMessage::EncodedMessage() : size(0), data(0)
{
init();
}
/**
 * Copy constructor: allocates a buffer of the same size as other's and
 * copies the raw encoded bytes into it.
 *
 * Previously the buffer was allocated but never filled, so a copied
 * message held uninitialised memory instead of the source's encoded data.
 *
 * Only the raw bytes are copied. The CharSequence members are initialised
 * through init() rather than copied from other (presumably they would
 * otherwise refer to other's buffer - confirm against CharSequence), so
 * init(MessageImpl&) has to be run on the copy before its parsed fields
 * can be used.
 */
EncodedMessage::EncodedMessage(const EncodedMessage& other)
    : size(other.size),
      data(other.size ? new char[other.size] : 0)
{
    init();
    // Guard both pointers: memcpy must not be handed a null pointer.
    if (data && other.data) {
        ::memcpy(data, other.data, other.size);
    }
}
/**
 * Calls init() on each CharSequence member that can be filled in by the
 * initial scan of the encoded data. Invoked from every constructor.
 *
 * NOTE(review): bareMessage is not touched here; it is only assigned in
 * init(MessageImpl&).
 */
void EncodedMessage::init()
{
    // annotation sections
    deliveryAnnotations.init();
    messageAnnotations.init();

    // fields of the properties section
    userId.init();
    to.init();
    subject.init();
    replyTo.init();
    contentType.init();
    contentEncoding.init();
    groupId.init();
    replyToGroupId.init();

    // remaining sections
    applicationProperties.init();
    body.init();
    footer.init();
}
/**
 * Releases the raw buffer. data may be null (see the constructors), which
 * delete[] accepts.
 */
EncodedMessage::~EncodedMessage()
{
delete[] data;
}
/**
 * @return the current value of the size member, as set by the
 * constructors, resize() or trim().
 */
size_t EncodedMessage::getSize() const
{
return size;
}
/**
 * Overwrites the recorded size with t without touching the buffer.
 *
 * NOTE(review): nothing here checks t against the number of bytes actually
 * allocated; presumably callers only ever pass a value no larger than the
 * size last given to the constructor or resize() - confirm at call sites.
 */
void EncodedMessage::trim(size_t t)
{
size = t;
}
/**
 * Replaces the raw buffer with a freshly allocated one of s bytes. The old
 * contents are discarded, not copied, and the new buffer is uninitialised.
 *
 * The new buffer is allocated before the old one is released. Previously
 * the old buffer was deleted first, so if the allocation threw, data was
 * left pointing at freed memory and the destructor would delete it a
 * second time. Now a failed allocation leaves the object unchanged.
 *
 * NOTE(review): any CharSequence members filled in by an earlier
 * init(MessageImpl&) presumably refer to the old buffer and are stale
 * after this call; the scan must be re-run once new data has been written.
 */
void EncodedMessage::resize(size_t s)
{
    char* replacement = new char[s];
    delete[] data;
    data = replacement;
    size = s;
}
/**
 * @return pointer to the start of the raw buffer (writable overload); may
 * be null when the object was constructed with zero size.
 */
char* EncodedMessage::getData()
{
return data;
}
/**
 * @return pointer to the start of the raw buffer (read-only overload); may
 * be null when the object was constructed with zero size.
 */
const char* EncodedMessage::getData() const
{
return data;
}
/**
 * Performs the initial scan over the raw encoded data.
 *
 * A decoder is run across [data, data + size) with an InitialScan reader
 * whose callbacks record the decoded fields on this object and on impl.
 * Afterwards the bare-message extent reported by the reader is stored. If
 * the reader supplied a start pointer but a zero length, the bare message
 * is taken to run to the end of the buffer.
 *
 * @param impl message implementation updated by the scan callbacks
 */
void EncodedMessage::init(qpid::messaging::MessageImpl& impl)
{
    qpid::amqp::Decoder rawDecoder(data, size);
    InitialScan scanner(*this, impl);
    rawDecoder.read(scanner);

    bareMessage = scanner.getBareMessage();
    const bool endNotRecorded = bareMessage.data && !bareMessage.size;
    if (endNotRecorded) {
        // start is known but length is not: extend to the end of the buffer
        const char* const bufferEnd = data + size;
        bareMessage.size = bufferEnd - bareMessage.data;
    }
}
/**
 * Fills map with the message's properties.
 *
 * In order:
 *  - the application-properties section, if present, is decoded directly
 *    into map;
 *  - header/properties fields recorded by the initial scan are added under
 *    'x-amqp-' prefixed keys, each only if it was present;
 *  - the delivery-annotations and message-annotations sections, if present,
 *    are decoded into nested maps under 'x-amqp-delivery-annotations' and
 *    'x-amqp-message-annotations'.
 *
 * Two key names are corrected relative to the previous version:
 *  - 'to' is now stored under "x-amqp-to". It used to be written to
 *    "x-amqp-delivery-count", overwriting the actual delivery count.
 *  - the group sequence is now stored under "x-amqp-group-sequence". The
 *    old key was misspelled "x-amqp-qroup-sequence", so any consumer that
 *    read the misspelled key must be updated.
 *
 * @param map destination map; existing entries with the same keys are
 *            overwritten
 */
void EncodedMessage::populate(qpid::types::Variant::Map& map) const
{
    //decode application properties
    if (applicationProperties) {
        qpid::amqp::Decoder decoder(applicationProperties.data, applicationProperties.size);
        decoder.readMap(map);
    }
    //add in 'x-amqp-' prefixed values
    if (!!firstAcquirer) {
        map["x-amqp-first-acquirer"] = firstAcquirer.get();
    }
    if (!!deliveryCount) {
        map["x-amqp-delivery-count"] = deliveryCount.get();
    }
    if (to) {
        map["x-amqp-to"] = to.str();
    }
    if (!!absoluteExpiryTime) {
        map["x-amqp-absolute-expiry-time"] = absoluteExpiryTime.get();
    }
    if (!!creationTime) {
        map["x-amqp-creation-time"] = creationTime.get();
    }
    if (groupId) {
        map["x-amqp-group-id"] = groupId.str();
    }
    if (!!groupSequence) {
        map["x-amqp-group-sequence"] = groupSequence.get();
    }
    if (replyToGroupId) {
        map["x-amqp-reply-to-group-id"] = replyToGroupId.str();
    }
    //add in any annotations
    if (deliveryAnnotations) {
        qpid::types::Variant::Map& annotations = map["x-amqp-delivery-annotations"].asMap();
        qpid::amqp::Decoder decoder(deliveryAnnotations.data, deliveryAnnotations.size);
        decoder.readMap(annotations);
    }
    if (messageAnnotations) {
        qpid::types::Variant::Map& annotations = map["x-amqp-message-annotations"].asMap();
        qpid::amqp::Decoder decoder(messageAnnotations.data, messageAnnotations.size);
        decoder.readMap(annotations);
    }
}
/**
 * @return the bare-message extent recorded by init(MessageImpl&). The
 * value is returned by copy; it is only assigned in init(MessageImpl&).
 */
qpid::amqp::CharSequence EncodedMessage::getBareMessage() const
{
return bareMessage;
}
/**
 * Builds an Address from the string form of the recorded reply-to field
 * and assigns it to a.
 *
 * @param a receives the reply-to address
 */
void EncodedMessage::getReplyTo(qpid::messaging::Address& a) const
{
    const qpid::messaging::Address parsed(replyTo.str());
    a = parsed;
}
/**
 * Copies the recorded subject bytes (subject.data, subject.size) into s,
 * replacing its previous contents.
 */
void EncodedMessage::getSubject(std::string& s) const
{
s.assign(subject.data, subject.size);
}
/**
 * Copies the recorded content-type bytes (contentType.data,
 * contentType.size) into s, replacing its previous contents.
 */
void EncodedMessage::getContentType(std::string& s) const
{
s.assign(contentType.data, contentType.size);
}
/**
 * Copies the recorded user-id bytes (userId.data, userId.size) into s,
 * replacing its previous contents.
 */
void EncodedMessage::getUserId(std::string& s) const
{
s.assign(userId.data, userId.size);
}
/**
 * Delegates to messageId.assign(s).
 * NOTE(review): presumably this writes a string form of the recorded
 * message-id into s - confirm against the type of messageId.
 */
void EncodedMessage::getMessageId(std::string& s) const
{
messageId.assign(s);
}
/**
 * Delegates to correlationId.assign(s).
 * NOTE(review): presumably this writes a string form of the recorded
 * correlation-id into s - confirm against the type of correlationId.
 */
void EncodedMessage::getCorrelationId(std::string& s) const
{
correlationId.assign(s);
}
/**
 * Copies the recorded body bytes (body.data, body.size) into s, replacing
 * its previous contents.
 */
void EncodedMessage::getBody(std::string& s) const
{
s.assign(body.data, body.size);
}
/**
 * @return the recorded body as a CharSequence (pointer and length),
 * returned by copy; the bytes themselves are not copied.
 */
qpid::amqp::CharSequence EncodedMessage::getBody() const
{
return body;
}
/**
 * Reports whether the header-related settings on msg differ from what was
 * recorded from the encoded data.
 *
 * A field that was absent from the encoded header is compared against a
 * fixed fallback: false for durable, 4 for priority. The ttl is only
 * compared when msg has a non-zero ttl. Redelivery counts as a change
 * when msg is flagged redelivered but the recorded delivery count is
 * absent or zero.
 *
 * @param msg message whose current settings are compared
 * @return true if any of the compared fields differ, false otherwise
 */
bool EncodedMessage::hasHeaderChanged(const qpid::messaging::MessageImpl& msg) const
{
    // durable: absent in the encoded header is treated as "not durable"
    const bool durableDiffers =
        !!durable ? (durable.get() != msg.isDurable()) : msg.isDurable();
    if (durableDiffers) return true;

    // priority: absent in the encoded header is treated as 4
    const bool priorityDiffers =
        !!priority ? (priority.get() != msg.getPriority()) : (msg.getPriority() != 4);
    if (priorityDiffers) return true;

    // ttl: only relevant when the message actually has one set
    if (msg.getTtl()) {
        if (!ttl) return true;
        if (msg.getTtl() != ttl.get()) return true;
    }

    //first-acquirer can't be changed via Message interface as yet

    // redelivered flag set while the recorded count is absent or zero
    if (msg.isRedelivered()) {
        if (!deliveryCount || deliveryCount.get() == 0) return true;
    }
    return false;
}
/**
 * Binds the scan to the EncodedMessage (e) and MessageImpl (m) that its
 * callbacks update, and sets the priority on m to 4 up front so that value
 * applies if onPriority() is never called during the scan.
 */
EncodedMessage::InitialScan::InitialScan(EncodedMessage& e, qpid::messaging::MessageImpl& m) : em(e), mi(m)
{
//set up defaults as needed:
mi.setPriority(4);
}
//header:
// Callbacks for the header fields. durable, priority and ttl are applied to
// the MessageImpl (mi) and also recorded on the EncodedMessage (em);
// first-acquirer is only recorded on em.
void EncodedMessage::InitialScan::onDurable(bool b) { mi.setDurable(b); em.durable = b; }
void EncodedMessage::InitialScan::onPriority(uint8_t i) { mi.setPriority(i); em.priority = i; }
void EncodedMessage::InitialScan::onTtl(uint32_t i) { mi.setTtl(i); em.ttl = i; }
void EncodedMessage::InitialScan::onFirstAcquirer(bool b) { em.firstAcquirer = b; }
// Passes the delivery count to mi.setRedelivered() and records it on em.
// NOTE(review): presumably setRedelivered() takes a bool, so any non-zero
// count marks the message as redelivered - confirm against MessageImpl.
void EncodedMessage::InitialScan::onDeliveryCount(uint32_t i)
{
mi.setRedelivered(i);
em.deliveryCount = i;
}
//properties:
// Callbacks for the properties fields. Each one only records the supplied
// value on the EncodedMessage (em); the MessageImpl is not touched here.
// message-id and correlation-id each have two overloads: one for a numeric
// value and one for a CharSequence value with its variant type.
void EncodedMessage::InitialScan::onMessageId(uint64_t v) { em.messageId.set(v); }
void EncodedMessage::InitialScan::onMessageId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { em.messageId.set(v, t); }
void EncodedMessage::InitialScan::onUserId(const qpid::amqp::CharSequence& v) { em.userId = v; }
void EncodedMessage::InitialScan::onTo(const qpid::amqp::CharSequence& v) { em.to = v; }
void EncodedMessage::InitialScan::onSubject(const qpid::amqp::CharSequence& v) { em.subject = v; }
void EncodedMessage::InitialScan::onReplyTo(const qpid::amqp::CharSequence& v) { em.replyTo = v;}
void EncodedMessage::InitialScan::onCorrelationId(uint64_t v) { em.correlationId.set(v); }
void EncodedMessage::InitialScan::onCorrelationId(const qpid::amqp::CharSequence& v, qpid::types::VariantType t) { em.correlationId.set(v, t); }
void EncodedMessage::InitialScan::onContentType(const qpid::amqp::CharSequence& v) { em.contentType = v; }
void EncodedMessage::InitialScan::onContentEncoding(const qpid::amqp::CharSequence& v) { em.contentEncoding = v; }
void EncodedMessage::InitialScan::onAbsoluteExpiryTime(int64_t i) { em.absoluteExpiryTime = i; }
void EncodedMessage::InitialScan::onCreationTime(int64_t i) { em.creationTime = i; }
void EncodedMessage::InitialScan::onGroupId(const qpid::amqp::CharSequence& v) { em.groupId = v; }
void EncodedMessage::InitialScan::onGroupSequence(uint32_t i) { em.groupSequence = i; }
void EncodedMessage::InitialScan::onReplyToGroupId(const qpid::amqp::CharSequence& v) { em.replyToGroupId = v; }
// Callbacks for whole sections. Each records the section's CharSequence on
// the EncodedMessage (em) without decoding it here; populate() decodes the
// application-properties and annotation sections later.
void EncodedMessage::InitialScan::onApplicationProperties(const qpid::amqp::CharSequence& v) { em.applicationProperties = v; }
void EncodedMessage::InitialScan::onDeliveryAnnotations(const qpid::amqp::CharSequence& v) { em.deliveryAnnotations = v; }
void EncodedMessage::InitialScan::onMessageAnnotations(const qpid::amqp::CharSequence& v) { em.messageAnnotations = v; }
// Body delivered as raw bytes: recorded on em. The descriptor argument is
// currently ignored (see TODO).
void EncodedMessage::InitialScan::onBody(const qpid::amqp::CharSequence& v, const qpid::amqp::Descriptor&)
{
//TODO: how to communicate the type, i.e. descriptor?
em.body = v;
}
// Body delivered as an already-decoded Variant: deliberately a no-op here,
// nothing is recorded on em for this form.
void EncodedMessage::InitialScan::onBody(const qpid::types::Variant&, const qpid::amqp::Descriptor&) {}
void EncodedMessage::InitialScan::onFooter(const qpid::amqp::CharSequence& v) { em.footer = v; }
}}} // namespace qpid::messaging::amqp