blob: ca2094b965c0a59891999ebbb66ed6fe14609ec8 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/broker/amqp/Translation.h"
#include "qpid/broker/amqp/Outgoing.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/amqp/Decoder.h"
#include "qpid/amqp/descriptors.h"
#include "qpid/amqp/MessageEncoder.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/types/Variant.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
#include <boost/lexical_cast.hpp>
namespace qpid {
namespace broker {
namespace amqp {
namespace {
const std::string EMPTY;
const std::string FORWARD_SLASH("/");
std::string translate(const qpid::framing::ReplyTo r)
{
if (r.hasExchange()) {
if (r.hasRoutingKey()) return r.getExchange() + FORWARD_SLASH + r.getRoutingKey();
else return r.getExchange();
} else return r.getRoutingKey();
}
std::string translate(const qpid::amqp::CharSequence& chars)
{
if (chars.data && chars.size) return std::string(chars.data, chars.size);
else return EMPTY;
}
bool setMessageId(qpid::framing::MessageProperties& m, const qpid::amqp::CharSequence& chars)
{
if (chars.data && chars.size) {
if (chars.size == 16) {
m.setMessageId(qpid::framing::Uuid(chars.data));
return true;
} else {
std::istringstream in(translate(chars));
qpid::framing::Uuid uuid;
in >> uuid;
if (!in.fail()) {
m.setMessageId(uuid);
return true;
}
}
}
return false;
}
class Properties_0_10 : public qpid::amqp::MessageEncoder::Properties
{
public:
bool hasMessageId() const { return messageProperties && messageProperties->hasMessageId(); }
std::string getMessageId() const { return messageProperties ? messageProperties->getMessageId().str() : EMPTY; }
bool hasUserId() const { return messageProperties && messageProperties->hasUserId(); }
std::string getUserId() const { return messageProperties ? messageProperties->getUserId() : EMPTY; }
bool hasTo() const { return getDestination().size() || hasSubject(); }
std::string getTo() const { return getDestination().size() ? getDestination() : getSubject(); }
bool hasSubject() const { return deliveryProperties && getDestination().size() && deliveryProperties->hasRoutingKey(); }
std::string getSubject() const { return deliveryProperties && getDestination().size() ? deliveryProperties->getRoutingKey() : EMPTY; }
bool hasReplyTo() const { return messageProperties && messageProperties->hasReplyTo(); }
std::string getReplyTo() const { return messageProperties ? translate(messageProperties->getReplyTo()) : EMPTY; }
bool hasCorrelationId() const { return messageProperties && messageProperties->hasCorrelationId(); }
std::string getCorrelationId() const { return messageProperties ? messageProperties->getCorrelationId() : EMPTY; }
bool hasContentType() const { return messageProperties && messageProperties->hasContentType(); }
std::string getContentType() const { return messageProperties ? messageProperties->getContentType() : EMPTY; }
bool hasContentEncoding() const { return messageProperties && messageProperties->hasContentEncoding(); }
std::string getContentEncoding() const { return messageProperties ? messageProperties->getContentEncoding() : EMPTY; }
bool hasAbsoluteExpiryTime() const { return deliveryProperties && deliveryProperties->hasExpiration(); }
int64_t getAbsoluteExpiryTime() const { return deliveryProperties ? deliveryProperties->getExpiration() : 0; }
bool hasCreationTime() const { return false; }
int64_t getCreationTime() const { return 0; }
bool hasGroupId() const {return false; }
std::string getGroupId() const { return EMPTY; }
bool hasGroupSequence() const { return false; }
uint32_t getGroupSequence() const { return 0; }
bool hasReplyToGroupId() const { return false; }
std::string getReplyToGroupId() const { return EMPTY; }
const qpid::framing::FieldTable& getApplicationProperties() { return messageProperties->getApplicationHeaders(); }
Properties_0_10(const qpid::broker::amqp_0_10::MessageTransfer& t) : transfer(t),
messageProperties(transfer.getProperties<qpid::framing::MessageProperties>()),
deliveryProperties(transfer.getProperties<qpid::framing::DeliveryProperties>())
{}
private:
const qpid::broker::amqp_0_10::MessageTransfer& transfer;
const qpid::framing::MessageProperties* messageProperties;
const qpid::framing::DeliveryProperties* deliveryProperties;
std::string getDestination() const
{
return transfer.getMethod<qpid::framing::MessageTransferBody>()->getDestination();
}
};
}
Translation::Translation(const qpid::broker::Message& m) : original(m) {}
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation::getTransfer()
{
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> t =
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer>(dynamic_cast<const qpid::broker::amqp_0_10::MessageTransfer*>(&original.getEncoding()));
if (t) {
return t;//no translation required
} else {
const Message* message = dynamic_cast<const Message*>(&original.getEncoding());
if (message) {
//translate 1.0 message into 0-10
boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
qpid::framing::AMQFrame method((qpid::framing::MessageTransferBody(qpid::framing::ProtocolVersion(), EMPTY, 0, 0)));
qpid::framing::AMQFrame header((qpid::framing::AMQHeaderBody()));
qpid::framing::AMQFrame content((qpid::framing::AMQContentBody()));
method.setEof(false);
header.setBof(false);
header.setEof(false);
content.setBof(false);
transfer->getFrames().append(method);
transfer->getFrames().append(header);
qpid::amqp::CharSequence body = message->getBody();
content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size);
transfer->getFrames().append(content);
qpid::framing::MessageProperties* props =
transfer->getFrames().getHeaders()->get<qpid::framing::MessageProperties>(true);
props->setContentLength(body.size);
qpid::amqp::MessageId mid = message->getMessageId();
qpid::framing::Uuid uuid;
switch (mid.type) {
case qpid::amqp::MessageId::UUID:
case qpid::amqp::MessageId::BYTES:
if (mid.value.bytes.size == 0) break;
if (setMessageId(*props, mid.value.bytes)) break;
case qpid::amqp::MessageId::ULONG:
QPID_LOG(info, "Skipping message id in translation from 1.0 to 0-10 as it is not a UUID");
break;
}
qpid::amqp::MessageId cid = message->getCorrelationId();
switch (cid.type) {
case qpid::amqp::MessageId::UUID:
assert(cid.value.bytes.size = 16);
props->setCorrelationId(qpid::framing::Uuid(cid.value.bytes.data).str());
break;
case qpid::amqp::MessageId::BYTES:
if (cid.value.bytes.size) {
props->setCorrelationId(translate(cid.value.bytes));
}
break;
case qpid::amqp::MessageId::ULONG:
props->setCorrelationId(boost::lexical_cast<std::string>(cid.value.ulong));
break;
}
// TODO: ReplyTo - there is no way to reliably determine
// the type of the node from just its name, unless we
// query the brokers registries
if (message->getContentType()) props->setContentType(translate(message->getContentType()));
if (message->getContentEncoding()) props->setContentEncoding(translate(message->getContentEncoding()));
props->setUserId(message->getUserId());
// TODO: FieldTable applicationHeaders;
qpid::amqp::CharSequence ap = message->getApplicationProperties();
if (ap) {
qpid::amqp::Decoder d(ap.data, ap.size);
qpid::amqp_0_10::translate(d.readMap(), props->getApplicationHeaders());
}
qpid::framing::DeliveryProperties* dp =
transfer->getFrames().getHeaders()->get<qpid::framing::DeliveryProperties>(true);
dp->setPriority(message->getPriority());
if (message->isPersistent()) dp->setDeliveryMode(2);
if (message->getRoutingKey().size()) dp->setRoutingKey(message->getRoutingKey());
return transfer.get();
} else {
throw qpid::Exception("Could not write message data in AMQP 0-10 format");
}
}
}
void Translation::write(Outgoing& out)
{
const Message* message = dynamic_cast<const Message*>(&original.getEncoding());
if (message) {
//write annotations
//TODO: merge in any newly added annotations
qpid::amqp::CharSequence deliveryAnnotations = message->getDeliveryAnnotations();
qpid::amqp::CharSequence messageAnnotations = message->getMessageAnnotations();
if (deliveryAnnotations.size) out.write(deliveryAnnotations.data, deliveryAnnotations.size);
if (messageAnnotations.size) out.write(messageAnnotations.data, messageAnnotations.size);
//write bare message
qpid::amqp::CharSequence bareMessage = message->getBareMessage();
if (bareMessage.size) out.write(bareMessage.data, bareMessage.size);
//write footer:
qpid::amqp::CharSequence footer = message->getFooter();
if (footer.size) out.write(footer.data, footer.size);
} else {
const qpid::broker::amqp_0_10::MessageTransfer* transfer = dynamic_cast<const qpid::broker::amqp_0_10::MessageTransfer*>(&original.getEncoding());
if (transfer) {
Properties_0_10 properties(*transfer);
qpid::types::Variant::Map applicationProperties;
qpid::amqp_0_10::translate(properties.getApplicationProperties(), applicationProperties);
std::string content = transfer->getContent();
size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content);
std::vector<char> buffer(size);
qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
encoder.writeProperties(properties);
encoder.writeApplicationProperties(applicationProperties);
encoder.writeBinary(content, &qpid::amqp::message::DATA);
out.write(&buffer[0], encoder.getPosition());
} else {
QPID_LOG(error, "Could not write message data in AMQP 1.0 format");
}
}
}
}}} // namespace qpid::broker::amqp