blob: 7ae5e2acef9db9ca6bc2d900c5ae35bc006d5df8 [file]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "SenderContext.h"
#include "Transaction.h"
#include "EncodedMessage.h"
#include "PnData.h"
#include "util.h"
#include "qpid/messaging/AddressImpl.h"
#include "qpid/messaging/exceptions.h"
#include "qpid/Exception.h"
#include "qpid/amqp/descriptors.h"
#include "qpid/amqp/MapHandler.h"
#include "qpid/amqp/MessageEncoder.h"
#include "qpid/messaging/exceptions.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/MessageImpl.h"
#include "qpid/log/Statement.h"
#include "config.h"
extern "C" {
#include <proton/engine.h>
}
#include <boost/shared_ptr.hpp>
#include <string.h>
namespace qpid {
namespace messaging {
MessageReleased::MessageReleased(const std::string& msg) : SendError(msg) {}
namespace amqp {
SenderOptions::SenderOptions(bool setToOnSend_, uint32_t maxDeliveryAttempts_, bool raiseRejected_, const qpid::sys::Duration& d)
: setToOnSend(setToOnSend_), maxDeliveryAttempts(maxDeliveryAttempts_), raiseRejected(raiseRejected_), redeliveryTimeout(d) {}
//TODO: proper conversion to wide string for address
SenderContext::SenderContext(pn_session_t* session, const std::string& n,
const qpid::messaging::Address& a,
const SenderOptions& o,
const CoordinatorPtr& coord)
: sender(pn_sender(session, n.c_str())),
name(n),
address(a),
helper(address),
nextId(0), capacity(50), unreliable(helper.isUnreliable()),
options(o),
transaction(coord)
{}
SenderContext::~SenderContext()
{
if (!error && sender) pn_link_free(sender);
}
void SenderContext::close()
{
error.raise();
if (sender) pn_link_close(sender);
}
void SenderContext::setCapacity(uint32_t c)
{
error.raise();
if (c < deliveries.size()) throw qpid::messaging::SenderError("Desired capacity is less than unsettled message count!");
capacity = c;
}
uint32_t SenderContext::getCapacity()
{
return capacity;
}
uint32_t SenderContext::getUnsettled()
{
error.raise();
return processUnsettled(true/*always allow retrieval of unsettled count, even if link has failed*/);
}
const std::string& SenderContext::getName() const
{
return name;
}
const std::string& SenderContext::getTarget() const
{
return address.getName();
}
bool SenderContext::send(const qpid::messaging::Message& message, SenderContext::Delivery** out)
{
error.raise();
resend();//if there are any messages needing to be resent at the front of the queue, send them first
if (processUnsettled(false) < capacity && pn_link_credit(sender)) {
types::Variant state;
if (transaction)
state = transaction->getSendState();
if (unreliable) {
Delivery delivery(nextId++);
delivery.encode(MessageImplAccess::get(message), address, options.setToOnSend);
delivery.send(sender, unreliable, state);
*out = 0;
return true;
} else {
deliveries.push_back(Delivery(nextId++, options.maxDeliveryAttempts, options.redeliveryTimeout));
try {
Delivery& delivery = deliveries.back();
delivery.encode(MessageImplAccess::get(message), address, options.setToOnSend);
delivery.send(sender, unreliable, state);
*out = &delivery;
return true;
} catch (const std::exception& e) {
deliveries.pop_back();
--nextId;
throw SendError(e.what());
}
}
} else {
return false;
}
}
void SenderContext::check()
{
error.raise();
if (pn_link_state(sender) & PN_REMOTE_CLOSED && !(pn_link_state(sender) & PN_LOCAL_CLOSED)) {
std::string text = get_error_string(pn_link_remote_condition(sender), "Link detached by peer");
pn_link_close(sender);
throw qpid::messaging::LinkError(text);
}
}
uint32_t SenderContext::processUnsettled(bool silent)
{
error.raise();
if (!silent) {
check();
}
bool resend_required = false;
//remove messages from front of deque once peer has confirmed receipt
while (!deliveries.empty() && !(pn_link_state(sender) & PN_REMOTE_CLOSED)) {
try {
if (deliveries.front().delivered()) {
deliveries.front().settle();
deliveries.pop_front();
} else {
break;
}
} catch (const MessageReleased& e) {
//mark it eligible for resending,
deliveries.front().settleAndReset();
//and move it to the back
deliveries.push_back(deliveries.front());
deliveries.pop_front();
resend_required = true;
} catch (const MessageRejected& e) {
deliveries.front().settle();
if (options.raiseRejected) {
QPID_LOG(info, e.what());
throw;
} else {
QPID_LOG(warning, e.what());
deliveries.pop_front();
}
}
}
if (resend_required) resend();
return deliveries.size();
}
namespace {
const std::string X_AMQP("x-amqp-");
const std::string X_AMQP_FIRST_ACQUIRER("x-amqp-first-acquirer");
const std::string X_AMQP_DELIVERY_COUNT("x-amqp-delivery-count");
const std::string X_AMQP_0_10_APP_ID("x-amqp-0-10.app-id");
class HeaderAdapter : public qpid::amqp::MessageEncoder::Header
{
public:
HeaderAdapter(const qpid::messaging::MessageImpl& impl) : msg(impl), headers(msg.getHeaders()) {}
virtual bool isDurable() const
{
return msg.isDurable();
}
virtual uint8_t getPriority() const
{
return msg.getPriority();
}
virtual bool hasTtl() const
{
return msg.getTtl();
}
virtual uint32_t getTtl() const
{
return msg.getTtl();
}
virtual bool isFirstAcquirer() const
{
qpid::types::Variant::Map::const_iterator i = headers.find(X_AMQP_FIRST_ACQUIRER);
if (i != headers.end()) {
return i->second;
} else {
return false;
}
}
virtual uint32_t getDeliveryCount() const
{
qpid::types::Variant::Map::const_iterator i = headers.find(X_AMQP_DELIVERY_COUNT);
if (i != headers.end()) {
return i->second;
} else {
return msg.isRedelivered() ? 1 : 0;
}
}
private:
const qpid::messaging::MessageImpl& msg;
const qpid::types::Variant::Map& headers;
};
const std::string EMPTY;
const std::string FORWARD_SLASH("/");
const std::string X_AMQP_TO("x-amqp-to");
const std::string X_AMQP_CONTENT_ENCODING("x-amqp-content-encoding");
const std::string X_AMQP_CREATION_TIME("x-amqp-creation-time");
const std::string X_AMQP_ABSOLUTE_EXPIRY_TIME("x-amqp-absolute-expiry-time");
const std::string X_AMQP_GROUP_ID("x-amqp-group-id");
const std::string X_AMQP_GROUP_SEQUENCE("x-amqp-group-sequence");
const std::string X_AMQP_REPLY_TO_GROUP_ID("x-amqp-reply-to-group-id");
const std::string X_AMQP_MESSAGE_ANNOTATIONS("x-amqp-message-annotations");
const std::string X_AMQP_DELIVERY_ANNOTATIONS("x-amqp-delivery-annotations");
class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties
{
public:
PropertiesAdapter(const qpid::messaging::MessageImpl& impl, const std::string& s, const std::string& t) : msg(impl), headers(msg.getHeaders()), subject(s), to(t) {}
bool hasMessageId() const
{
return getMessageId().size();
}
std::string getMessageId() const
{
return msg.getMessageId();
}
bool hasUserId() const
{
return getUserId().size();
}
std::string getUserId() const
{
return msg.getUserId();
}
bool hasTo() const
{
return hasHeader(X_AMQP_TO) || !to.empty();
}
std::string getTo() const
{
qpid::types::Variant::Map::const_iterator i = headers.find(X_AMQP_TO);
if (i == headers.end()) return to;
else return i->second;
}
bool hasSubject() const
{
return subject.size() || getSubject().size();
}
std::string getSubject() const
{
return subject.size() ? subject : msg.getSubject();
}
bool hasReplyTo() const
{
return msg.getReplyTo();
}
std::string getReplyTo() const
{
Address a = msg.getReplyTo();
if (a.getSubject().size()) {
return a.getName() + FORWARD_SLASH + a.getSubject();
} else {
return a.getName();
}
}
bool hasCorrelationId() const
{
return getCorrelationId().size();
}
std::string getCorrelationId() const
{
return msg.getCorrelationId();
}
bool hasContentType() const
{
return getContentType().size();
}
std::string getContentType() const
{
return msg.getContentType();
}
bool hasContentEncoding() const
{
return hasHeader(X_AMQP_CONTENT_ENCODING);
}
std::string getContentEncoding() const
{
return headers.find(X_AMQP_CONTENT_ENCODING)->second;
}
bool hasAbsoluteExpiryTime() const
{
return hasHeader(X_AMQP_ABSOLUTE_EXPIRY_TIME);
}
int64_t getAbsoluteExpiryTime() const
{
return headers.find(X_AMQP_ABSOLUTE_EXPIRY_TIME)->second;
}
bool hasCreationTime() const
{
return hasHeader(X_AMQP_CREATION_TIME);
}
int64_t getCreationTime() const
{
return headers.find(X_AMQP_CREATION_TIME)->second;
}
bool hasGroupId() const
{
return hasHeader(X_AMQP_GROUP_ID);
}
std::string getGroupId() const
{
return headers.find(X_AMQP_GROUP_ID)->second;
}
bool hasGroupSequence() const
{
return hasHeader(X_AMQP_GROUP_SEQUENCE);
}
uint32_t getGroupSequence() const
{
return headers.find(X_AMQP_GROUP_SEQUENCE)->second;
}
bool hasReplyToGroupId() const
{
return hasHeader(X_AMQP_REPLY_TO_GROUP_ID);
}
std::string getReplyToGroupId() const
{
return headers.find(X_AMQP_REPLY_TO_GROUP_ID)->second;
}
private:
const qpid::messaging::MessageImpl& msg;
const qpid::types::Variant::Map& headers;
const std::string subject;
const std::string to;
bool hasHeader(const std::string& key) const
{
return headers.find(key) != headers.end();
}
};
bool startsWith(const std::string& input, const std::string& pattern)
{
if (input.size() < pattern.size()) return false;
for (std::string::const_iterator b = pattern.begin(), a = input.begin(); b != pattern.end(); ++b, ++a) {
if (*a != *b) return false;
}
return true;
}
class ApplicationPropertiesAdapter : public qpid::amqp::MessageEncoder::ApplicationProperties
{
public:
ApplicationPropertiesAdapter(const qpid::types::Variant::Map& h) : headers(h) {}
void handle(qpid::amqp::MapHandler& h) const
{
for (qpid::types::Variant::Map::const_iterator i = headers.begin(); i != headers.end(); ++i) {
//strip out values with special keys as they are sent in standard fields
if (!startsWith(i->first, X_AMQP) || i->first == X_AMQP_0_10_APP_ID) {
qpid::amqp::CharSequence key(convert(i->first));
switch (i->second.getType()) {
case qpid::types::VAR_VOID:
h.handleVoid(key);
break;
case qpid::types::VAR_BOOL:
h.handleBool(key, i->second);
break;
case qpid::types::VAR_UINT8:
h.handleUint8(key, i->second);
break;
case qpid::types::VAR_UINT16:
h.handleUint16(key, i->second);
break;
case qpid::types::VAR_UINT32:
h.handleUint32(key, i->second);
break;
case qpid::types::VAR_UINT64:
h.handleUint64(key, i->second);
break;
case qpid::types::VAR_INT8:
h.handleInt8(key, i->second);
break;
case qpid::types::VAR_INT16:
h.handleInt16(key, i->second);
break;
case qpid::types::VAR_INT32:
h.handleInt32(key, i->second);
break;
case qpid::types::VAR_INT64:
h.handleInt64(key, i->second);
break;
case qpid::types::VAR_FLOAT:
h.handleFloat(key, i->second);
break;
case qpid::types::VAR_DOUBLE:
h.handleDouble(key, i->second);
break;
case qpid::types::VAR_STRING:
h.handleString(key, convert(i->second), convert(i->second.getEncoding()));
break;
case qpid::types::VAR_UUID:
QPID_LOG(warning, "Skipping UUID in application properties; not yet handled correctly.");
break;
case qpid::types::VAR_MAP:
case qpid::types::VAR_LIST:
QPID_LOG(warning, "Skipping nested list and map; not allowed in application properties.");
break;
}
}
}
}
private:
const qpid::types::Variant::Map& headers;
static qpid::amqp::CharSequence convert(const std::string& in)
{
qpid::amqp::CharSequence out;
out.data = in.data();
out.size = in.size();
return out;
}
};
bool changedSubject(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address)
{
return address.getSubject().size() && address.getSubject() != msg.getSubject();
}
}
namespace{
qpid::sys::AbsTime until(const qpid::sys::Duration& d)
{
return d ? qpid::sys::AbsTime(qpid::sys::now(), d) : qpid::sys::FAR_FUTURE;
}
}
SenderContext::Delivery::Delivery(int32_t i, const uint32_t max_attempts_, const qpid::sys::Duration& max_time) :
id(i), token(0), settled(false), attempts(0), max_attempts(max_attempts_),
retry_until(until(max_time)) {}
void SenderContext::Delivery::reset()
{
token = 0;
}
void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address, bool setToField)
{
try {
boost::shared_ptr<const EncodedMessage> original = msg.getEncoded();
if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered
//do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received?
if (original->hasHeaderChanged(msg)) {
//since as yet have no annotations, just write the revised header then the rest of the message as received
encoded.resize(16/*max header size*/ + original->getBareMessage().size);
qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
HeaderAdapter header(msg);
encoder.writeHeader(header);
::memcpy(encoded.getData() + encoder.getPosition(), original->getBareMessage().data, original->getBareMessage().size);
} else {
//since as yet have no annotations, if the header hasn't
//changed and we still have the original bare message, can
//send the entire content as is
encoded.resize(original->getSize());
::memcpy(encoded.getData(), original->getData(), original->getSize());
}
} else {
HeaderAdapter header(msg);
PropertiesAdapter properties(msg, address.getSubject(), setToField ? address.getName() : EMPTY);
ApplicationPropertiesAdapter applicationProperties(msg.getHeaders());
//compute size:
size_t contentSize = qpid::amqp::MessageEncoder::getEncodedSize(header)
+ qpid::amqp::MessageEncoder::getEncodedSize(properties)
+ qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties);
if (msg.getContent().isVoid()) {
contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForContent(msg.getBytes());
} else {
contentSize += qpid::amqp::MessageEncoder::getEncodedSizeForValue(msg.getContent()) + 3/*descriptor*/;
}
encoded.resize(contentSize);
QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes")
qpid::amqp::MessageEncoder encoder(encoded.getData(), encoded.getSize());
//write header:
encoder.writeHeader(header);
//write delivery-annotations, write message-annotations (none yet supported)
//write properties
encoder.writeProperties(properties);
//write application-properties
encoder.writeApplicationProperties(applicationProperties);
//write body
if (!msg.getContent().isVoid()) {
//write as AmqpValue
encoder.writeValue(msg.getContent(), &qpid::amqp::message::AMQP_VALUE);
} else if (msg.getBytes().size()) {
encoder.writeBinary(msg.getBytes(), &qpid::amqp::message::DATA);//structured content not yet directly supported
}
if (encoder.getPosition() < encoded.getSize()) {
QPID_LOG(debug, "Trimming buffer from " << encoded.getSize() << " to " << encoder.getPosition());
encoded.trim(encoder.getPosition());
}
//write footer (no annotations yet supported)
}
} catch (const qpid::Exception& e) {
throw SendError(e.what());
}
}
void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable, const types::Variant& state)
{
++attempts;
pn_delivery_tag_t tag;
tag.size = sizeof(id);
#ifdef NO_PROTON_DELIVERY_TAG_T
tag.start = reinterpret_cast<const char*>(&id);
#else
tag.bytes = reinterpret_cast<const char*>(&id);
#endif
token = pn_delivery(sender, tag);
if (!state.isVoid()) { // Add transaction state
PnData data(pn_disposition_data(pn_delivery_local(token)));
data.put(state);
pn_delivery_update(token, qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE);
}
pn_link_send(sender, encoded.getData(), encoded.getSize());
if (unreliable) {
settle();
}
pn_link_advance(sender);
}
bool SenderContext::Delivery::sent() const
{
return settled || token;
}
bool SenderContext::Delivery::delivered()
{
if (settled) {
return true;
} else if (token && (pn_delivery_remote_state(token) || pn_delivery_settled(token))) {
if (delivery_refused()) {
throw MessageRejected(Msg() << "delivery " << id << " refused: " << getStatus());
} else if (not_delivered()) {
if (max_attempts && (attempts >= max_attempts)) {
throw MessageRejected(Msg() << "delivery " << id << " cannot be delivered after " << attempts << " attempts");
} else if (qpid::sys::now() > retry_until) {
throw MessageRejected(Msg() << "delivery " << id << " cannot be delivered, timed out after " << attempts << " attempts");
} else {
std::string status = getStatus();
if (max_attempts) {
QPID_LOG(info, "delivery " << id << " failed attempt " << attempts << " of " << max_attempts << ": " << status);
} else {
QPID_LOG(info, "delivery " << id << " was not successful: " << status);
}
throw MessageReleased(Msg() << "delivery " << id << " was not successful: " << status);
}
} else if (!accepted()) {
QPID_LOG(info, "delivery " << id << " was not accepted by peer");
}
return true;
} else {
return false;
}
}
std::string SenderContext::Delivery::getStatus()
{
if (rejected()) {
pn_disposition_t* d = token ? pn_delivery_remote(token) : 0;
if (d) {
pn_condition_t* c = pn_disposition_condition(d);
if (c && pn_condition_is_set(c)) {
return Msg() << pn_condition_get_name(c) << ": " << pn_condition_get_description(c);
}
}
return "rejected";
} else if (released()) {
return "released";
} else if (delivery_refused()) {
return "undeliverable-here";
} else if (not_delivered()) {
return "delivery-failed";
} else if (modified()) {
return "modified";
}
return "";
}
bool SenderContext::Delivery::accepted()
{
return token && pn_delivery_remote_state(token) == PN_ACCEPTED;
}
bool SenderContext::Delivery::rejected()
{
return token && pn_delivery_remote_state(token) == PN_REJECTED;
}
bool SenderContext::Delivery::released()
{
return token && pn_delivery_remote_state(token) == PN_RELEASED;
}
bool SenderContext::Delivery::modified()
{
return token && pn_delivery_remote_state(token) == PN_MODIFIED;
}
bool SenderContext::Delivery::delivery_refused()
{
if (modified()) {
pn_disposition_t* d = token ? pn_delivery_remote(token) : 0;
return d && pn_disposition_is_undeliverable(d);
} else {
return rejected();
}
}
bool SenderContext::Delivery::not_delivered()
{
if (modified()) {
pn_disposition_t* d = token ? pn_delivery_remote(token) : 0;
return d && !pn_disposition_is_undeliverable(d);
} else {
return released();
}
}
std::string SenderContext::Delivery::error()
{
pn_condition_t *condition = token ? pn_disposition_condition(pn_delivery_remote(token)) : 0;
return (condition && pn_condition_is_set(condition)) ?
Msg() << get_error_string(condition, std::string(), std::string()) :
std::string();
}
void SenderContext::Delivery::settle()
{
if (!settled) {
pn_delivery_settle(token);
token = 0; // can no longer use the delivery
settled = true;
}
}
void SenderContext::Delivery::settleAndReset()
{
//settle current delivery:
settle();
//but treat message as unsent:
settled = false;
}
void SenderContext::verify()
{
error.raise();
pn_terminus_t* target = pn_link_remote_target(sender);
if (!helper.isNameNull() && !pn_terminus_get_address(target)) {
std::string msg("No such target : ");
msg += getTarget();
QPID_LOG(debug, msg);
throw qpid::messaging::NotFound(msg);
} else if (AddressImpl::isTemporary(address)) {
address.setName(pn_terminus_get_address(target));
QPID_LOG(debug, "Dynamic target name set to " << address.getName());
}
helper.checkAssertion(target, AddressHelper::FOR_SENDER);
}
void SenderContext::configure()
{
error.raise();
if (sender) configure(pn_link_target(sender));
}
void SenderContext::configure(pn_terminus_t* target)
{
error.raise();
helper.configure(sender, target, AddressHelper::FOR_SENDER);
std::string option;
if (helper.getLinkSource(option)) {
pn_terminus_set_address(pn_link_source(sender), option.c_str());
} else {
pn_terminus_set_address(pn_link_source(sender), pn_terminus_get_address(pn_link_target(sender)));
}
}
bool SenderContext::settled()
{
error.raise();
return processUnsettled(false) == 0;
}
bool SenderContext::closed()
{
if (sender) {
return pn_link_state(sender) & PN_LOCAL_CLOSED;
} else {
return true;
}
}
Address SenderContext::getAddress() const
{
return address;
}
void SenderContext::reset(pn_session_t* session)
{
sender = session ? pn_sender(session, name.c_str()) : 0;
if (sender) configure();
for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i)
i->reset();
}
void SenderContext::resend()
{
for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end() && !i->sent(); ++i) {
i->send(sender, false/*only resend reliable transfers*/);
}
}
void SenderContext::cleanup()
{
if (!error && sender) {
error = new LinkError("sender no longer valid");
pn_link_free(sender);
sender = 0;
}
}
}}} // namespace qpid::messaging::amqp