blob: 852ad296352b08986dc977e72d0ad8318f3c312a [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/amqp/MessageEncoder.h"
#include "qpid/amqp/descriptors.h"
#include "qpid/log/Statement.h"
namespace qpid {
namespace amqp {
namespace {
size_t optimisable(const MessageEncoder::Header& msg)
{
if (msg.getDeliveryCount()) return 5;
else if (msg.isFirstAcquirer()) return 4;
else if (msg.hasTtl()) return 3;
else if (msg.getPriority() != 4) return 2;
else if (msg.isDurable()) return 1;
else return 0;
}
size_t optimisable(const MessageEncoder::Properties& msg)
{
if (msg.hasReplyToGroupId()) return 13;
else if (msg.hasGroupSequence()) return 12;
else if (msg.hasGroupId()) return 11;
else if (msg.hasCreationTime()) return 10;
else if (msg.hasAbsoluteExpiryTime()) return 9;
else if (msg.hasContentEncoding()) return 8;
else if (msg.hasContentType()) return 7;
else if (msg.hasCorrelationId()) return 6;
else if (msg.hasReplyTo()) return 5;
else if (msg.hasSubject()) return 4;
else if (msg.hasTo()) return 3;
else if (msg.hasUserId()) return 2;
else if (msg.hasMessageId()) return 1;
else return 0;
}
size_t encodedSize(const std::string& s)
{
size_t total = s.size();
if (total > 255) total += 4;
else total += 1;
return total;
}
const std::string BINARY("binary");
}
void MessageEncoder::writeHeader(const Header& msg)
{
size_t fields(optimise ? optimisable(msg) : 5);
if (fields) {
void* token = startList8(&qpid::amqp::message::HEADER);
writeBoolean(msg.isDurable());
if (fields > 1) writeUByte(msg.getPriority());
if (msg.getTtl()) writeUInt(msg.getTtl());
else if (fields > 2) writeNull();
if (msg.isFirstAcquirer()) writeBoolean(true);
else if (fields > 3) writeNull();
if (msg.getDeliveryCount()) writeUInt(msg.getDeliveryCount());
else if (fields > 4) writeNull();
endList8(fields, token);
}
}
void MessageEncoder::writeProperties(const Properties& msg)
{
size_t fields(optimise ? optimisable(msg) : 13);
if (fields) {
void* token = startList32(&qpid::amqp::message::PROPERTIES);
if (msg.hasMessageId()) writeString(msg.getMessageId());
else writeNull();
if (msg.hasUserId()) writeBinary(msg.getUserId());
else if (fields > 1) writeNull();
if (msg.hasTo()) writeString(msg.getTo());
else if (fields > 2) writeNull();
if (msg.hasSubject()) writeString(msg.getSubject());
else if (fields > 3) writeNull();
if (msg.hasReplyTo()) writeString(msg.getReplyTo());
else if (fields > 4) writeNull();
if (msg.hasCorrelationId()) writeString(msg.getCorrelationId());
else if (fields > 5) writeNull();
if (msg.hasContentType()) writeSymbol(msg.getContentType());
else if (fields > 6) writeNull();
if (msg.hasContentEncoding()) writeSymbol(msg.getContentEncoding());
else if (fields > 7) writeNull();
if (msg.hasAbsoluteExpiryTime()) writeLong(msg.getAbsoluteExpiryTime());
else if (fields > 8) writeNull();
if (msg.hasCreationTime()) writeLong(msg.getCreationTime());
else if (fields > 9) writeNull();
if (msg.hasGroupId()) writeString(msg.getGroupId());
else if (fields > 10) writeNull();
if (msg.hasGroupSequence()) writeUInt(msg.getGroupSequence());
else if (fields > 11) writeNull();
if (msg.hasReplyToGroupId()) writeString(msg.getReplyToGroupId());
else if (fields > 12) writeNull();
endList32(fields, token);
}
}
void MessageEncoder::writeApplicationProperties(const qpid::types::Variant::Map& properties)
{
writeApplicationProperties(properties, !optimise || properties.size()*2 > 255 || getEncodedSizeForElements(properties) > 255);
}
void MessageEncoder::writeApplicationProperties(const qpid::types::Variant::Map& properties, bool large)
{
writeMap(properties, &qpid::amqp::message::APPLICATION_PROPERTIES, large);
}
void MessageEncoder::writeMap(const qpid::types::Variant::Map& properties, const Descriptor* d, bool large)
{
void* token = large ? startMap32(d) : startMap8(d);
for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
writeString(i->first);
switch (i->second.getType()) {
case qpid::types::VAR_MAP:
case qpid::types::VAR_LIST:
//not allowed (TODO: revise, only strictly true for application-properties) whereas this is now a more general method)
QPID_LOG(warning, "Ignoring nested map/list; not allowed in application-properties for AMQP 1.0");
case qpid::types::VAR_VOID:
writeNull();
break;
case qpid::types::VAR_BOOL:
writeBoolean(i->second);
break;
case qpid::types::VAR_UINT8:
writeUByte(i->second);
break;
case qpid::types::VAR_UINT16:
writeUShort(i->second);
break;
case qpid::types::VAR_UINT32:
writeUInt(i->second);
break;
case qpid::types::VAR_UINT64:
writeULong(i->second);
break;
case qpid::types::VAR_INT8:
writeByte(i->second);
break;
case qpid::types::VAR_INT16:
writeShort(i->second);
break;
case qpid::types::VAR_INT32:
writeInt(i->second);
break;
case qpid::types::VAR_INT64:
writeULong(i->second);
break;
case qpid::types::VAR_FLOAT:
writeFloat(i->second);
break;
case qpid::types::VAR_DOUBLE:
writeDouble(i->second);
break;
case qpid::types::VAR_STRING:
if (i->second.getEncoding() == BINARY) {
writeBinary(i->second);
} else {
writeString(i->second);
}
break;
case qpid::types::VAR_UUID:
writeUuid(i->second);
break;
}
}
if (large) endMap32(properties.size()*2, token);
else endMap8(properties.size()*2, token);
}
size_t MessageEncoder::getEncodedSize(const Header& h, const Properties& p, const qpid::types::Variant::Map& ap, const std::string& d)
{
//NOTE: this does not take optional optimisation into account,
//i.e. it is a 'worst case' estimate for required buffer space
size_t total(0);
//header:
total += 3/*descriptor*/ + 1/*code*/ + 1/*size*/ + 1/*count*/ + 5/*codes for each field*/;
if (h.getPriority() != 4) total += 1;
if (h.getDeliveryCount()) total += 4;
if (h.hasTtl()) total += 4;
return total + getEncodedSize(p, ap, d);
}
size_t MessageEncoder::getEncodedSize(const Properties& p, const qpid::types::Variant::Map& ap, const std::string& d)
{
//NOTE: this does not take optional optimisation into account,
//i.e. it is a 'worst case' estimate for required buffer space
size_t total(0);
//properties:
total += 3/*descriptor*/ + 1/*code*/ + 4/*size*/ + 4/*count*/ + 13/*codes for each field*/;
if (p.hasMessageId()) total += encodedSize(p.getMessageId());
if (p.hasUserId()) total += encodedSize(p.getUserId());
if (p.hasTo()) total += encodedSize(p.getTo());
if (p.hasSubject()) total += encodedSize(p.getSubject());
if (p.hasReplyTo()) total += encodedSize(p.getReplyTo());
if (p.hasCorrelationId()) total += encodedSize(p.getCorrelationId());
if (p.hasContentType()) total += encodedSize(p.getContentType());
if (p.hasContentEncoding()) total += encodedSize(p.getContentEncoding());
if (p.hasAbsoluteExpiryTime()) total += 8;
if (p.hasCreationTime()) total += 8;
if (p.hasGroupId()) total += encodedSize(p.getGroupId());
if (p.hasGroupSequence()) total += 4;
if (p.hasReplyToGroupId()) total += encodedSize(p.getReplyToGroupId());
//application-properties:
total += 3/*descriptor*/ + getEncodedSize(ap, true);
//body:
if (d.size()) total += 3/*descriptor*/ + 1/*code*/ + encodedSize(d);
return total;
}
size_t MessageEncoder::getEncodedSizeForElements(const qpid::types::Variant::Map& map)
{
size_t total = 0;
for (qpid::types::Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
total += 1/*code*/ + encodedSize(i->first);
switch (i->second.getType()) {
case qpid::types::VAR_MAP:
case qpid::types::VAR_LIST:
case qpid::types::VAR_VOID:
case qpid::types::VAR_BOOL:
total += 1;
break;
case qpid::types::VAR_UINT8:
case qpid::types::VAR_INT8:
total += 2;
break;
case qpid::types::VAR_UINT16:
case qpid::types::VAR_INT16:
total += 3;
break;
case qpid::types::VAR_UINT32:
case qpid::types::VAR_INT32:
case qpid::types::VAR_FLOAT:
total += 5;
break;
case qpid::types::VAR_UINT64:
case qpid::types::VAR_INT64:
case qpid::types::VAR_DOUBLE:
total += 9;
break;
case qpid::types::VAR_UUID:
total += 17;
break;
case qpid::types::VAR_STRING:
total += 1/*code*/ + encodedSize(i->second);
break;
}
}
return total;
}
size_t MessageEncoder::getEncodedSize(const qpid::types::Variant::Map& map, bool alwaysUseLargeMap)
{
size_t total = getEncodedSizeForElements(map);
//its not just the count that determines whether we can use a small map, but the aggregate size:
if (alwaysUseLargeMap || map.size()*2 > 255 || total > 255) total += 4/*size*/ + 4/*count*/;
else total += 1/*size*/ + 1/*count*/;
total += 1 /*code for map itself*/;
return total;
}
}} // namespace qpid::amqp