blob: 1e9eb9d38693ed8612b79001896ab19531b2e5b6 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "Message.h"
#include "ExchangeRegistry.h"
#include "ExpiryPolicy.h"
#include "qpid/StringUtils.h"
#include "qpid/framing/frame_functors.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/SendContent.h"
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/TypeFilter.h"
#include "qpid/log/Statement.h"
#include <time.h>
using boost::intrusive_ptr;
using qpid::sys::AbsTime;
using qpid::sys::Duration;
using qpid::sys::TIME_MSEC;
using qpid::sys::FAR_FUTURE;
using std::string;
using namespace qpid::framing;
namespace qpid {
namespace broker {
TransferAdapter Message::TRANSFER;
Message::Message(const framing::SequenceNumber& id) :
frames(id), persistenceId(0), redelivered(false), loaded(false),
staged(false), forcePersistentPolicy(false), publisher(0), adapter(0),
expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0) {}
Message::~Message()
{
if (expiryPolicy)
expiryPolicy->forget(*this);
}
void Message::forcePersistent()
{
forcePersistentPolicy = true;
}
std::string Message::getRoutingKey() const
{
return getAdapter().getRoutingKey(frames);
}
std::string Message::getExchangeName() const
{
return getAdapter().getExchange(frames);
}
const boost::shared_ptr<Exchange> Message::getExchange(ExchangeRegistry& registry) const
{
if (!exchange) {
exchange = registry.get(getExchangeName());
}
return exchange;
}
bool Message::isImmediate() const
{
return getAdapter().isImmediate(frames);
}
const FieldTable* Message::getApplicationHeaders() const
{
return getAdapter().getApplicationHeaders(frames);
}
bool Message::isPersistent()
{
return (getAdapter().isPersistent(frames) || forcePersistentPolicy);
}
bool Message::requiresAccept()
{
return getAdapter().requiresAccept(frames);
}
uint32_t Message::getRequiredCredit() const
{
//add up payload for all header and content frames in the frameset
SumBodySize sum;
frames.map_if(sum, TypeFilter2<HEADER_BODY, CONTENT_BODY>());
return sum.getSize();
}
void Message::encode(framing::Buffer& buffer) const
{
//encode method and header frames
EncodeFrame f1(buffer);
frames.map_if(f1, TypeFilter2<METHOD_BODY, HEADER_BODY>());
//then encode the payload of each content frame
framing::EncodeBody f2(buffer);
frames.map_if(f2, TypeFilter<CONTENT_BODY>());
}
void Message::encodeContent(framing::Buffer& buffer) const
{
//encode the payload of each content frame
EncodeBody f2(buffer);
frames.map_if(f2, TypeFilter<CONTENT_BODY>());
}
uint32_t Message::encodedSize() const
{
return encodedHeaderSize() + encodedContentSize();
}
uint32_t Message::encodedContentSize() const
{
return frames.getContentSize();
}
uint32_t Message::encodedHeaderSize() const
{
//add up the size for all method and header frames in the frameset
SumFrameSize sum;
frames.map_if(sum, TypeFilter2<METHOD_BODY, HEADER_BODY>());
return sum.getSize();
}
void Message::decodeHeader(framing::Buffer& buffer)
{
AMQFrame method;
method.decode(buffer);
frames.append(method);
AMQFrame header;
header.decode(buffer);
frames.append(header);
}
void Message::decodeContent(framing::Buffer& buffer)
{
if (buffer.available()) {
//get the data as a string and set that as the content
//body on a frame then add that frame to the frameset
AMQFrame frame((AMQContentBody()));
frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
frame.setFirstSegment(false);
frames.append(frame);
} else {
//adjust header flags
MarkLastSegment f;
frames.map_if(f, TypeFilter<HEADER_BODY>());
}
//mark content loaded
loaded = true;
}
void Message::releaseContent(MessageStore* _store)
{
if (!store) {
store = _store;
}
if (store) {
if (!getPersistenceId()) {
intrusive_ptr<PersistableMessage> pmsg(this);
store->stage(pmsg);
staged = true;
}
//remove any content frames from the frameset
frames.remove(TypeFilter<CONTENT_BODY>());
setContentReleased();
}
}
void Message::destroy()
{
if (staged) {
if (store) {
store->destroy(*this);
} else {
QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed");
}
}
}
bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const
{
if (isContentReleased()) {
intrusive_ptr<const PersistableMessage> pmsg(this);
bool done = false;
string& data = frame.castBody<AMQContentBody>()->getData();
store->loadContent(queue, pmsg, data, offset, maxContentSize);
done = data.size() < maxContentSize;
frame.setBof(false);
frame.setEof(true);
QPID_LOG(debug, "loaded frame" << frame);
if (offset > 0) {
frame.setBos(false);
}
if (!done) {
frame.setEos(false);
} else return false;
return true;
}
else return false;
}
void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
{
if (isContentReleased()) {
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)
{
AMQFrame frame((AMQContentBody()));
morecontent = getContentFrame(queue, frame, maxContentSize, offset);
out.handle(frame);
}
} else {
Count c;
frames.map_if(c, TypeFilter<CONTENT_BODY>());
SendContent f(out, maxFrameSize, c.getCount());
frames.map_if(f, TypeFilter<CONTENT_BODY>());
}
}
void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/) const
{
sys::Mutex::ScopedLock l(lock);
Relay f(out);
frames.map_if(f, TypeFilter<HEADER_BODY>());
}
// TODO aconway 2007-11-09: Obsolete, remove. Was used to cover over
// 0-8/0-9 message differences.
MessageAdapter& Message::getAdapter() const
{
if (!adapter) {
if(frames.isA<MessageTransferBody>()) {
adapter = &TRANSFER;
} else {
const AMQMethodBody* method = frames.getMethod();
if (!method) throw Exception("Can't adapt message with no method");
else throw Exception(QPID_MSG("Can't adapt message based on " << *method));
}
}
return *adapter;
}
uint64_t Message::contentSize() const
{
return frames.getContentSize();
}
bool Message::isContentLoaded() const
{
return loaded;
}
namespace
{
const std::string X_QPID_TRACE("x-qpid.trace");
}
bool Message::isExcluded(const std::vector<std::string>& excludes) const
{
const FieldTable* headers = getApplicationHeaders();
if (headers) {
std::string traceStr = headers->getAsString(X_QPID_TRACE);
if (traceStr.size()) {
std::vector<std::string> trace = split(traceStr, ", ");
for (std::vector<std::string>::const_iterator i = excludes.begin(); i != excludes.end(); i++) {
for (std::vector<std::string>::const_iterator j = trace.begin(); j != trace.end(); j++) {
if (*i == *j) {
return true;
}
}
}
}
}
return false;
}
void Message::addTraceId(const std::string& id)
{
sys::Mutex::ScopedLock l(lock);
if (isA<MessageTransferBody>()) {
FieldTable& headers = getProperties<MessageProperties>()->getApplicationHeaders();
std::string trace = headers.getAsString(X_QPID_TRACE);
if (trace.empty()) {
headers.setString(X_QPID_TRACE, id);
} else if (trace.find(id) == std::string::npos) {
trace += ",";
trace += id;
headers.setString(X_QPID_TRACE, trace);
}
}
}
void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
{
DeliveryProperties* props = getProperties<DeliveryProperties>();
if (props->getTtl()) {
// AMQP requires setting the expiration property to be posix
// time_t in seconds. TTL is in milliseconds
time_t now = ::time(0);
props->setExpiration(now + (props->getTtl()/1000));
// Use higher resolution time for the internal expiry calculation.
expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * TIME_MSEC));
setExpiryPolicy(e);
}
}
void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
expiryPolicy = e;
if (expiryPolicy)
expiryPolicy->willExpire(*this);
}
bool Message::hasExpired()
{
return expiryPolicy && expiryPolicy->hasExpired(*this);
}
boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const
{
sys::Mutex::ScopedLock l(lock);
Replacement::iterator i = replacement.find(qfor);
if (i != replacement.end()){
return i->second;
}
return empty;
}
void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor)
{
sys::Mutex::ScopedLock l(lock);
replacement[qfor] = msg;
}
void Message::allEnqueuesComplete() {
MessageCallback* cb = 0;
{
sys::Mutex::ScopedLock l(lock);
std::swap(cb, enqueueCallback);
}
if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
void Message::allDequeuesComplete() {
MessageCallback* cb = 0;
{
sys::Mutex::ScopedLock l(lock);
std::swap(cb, dequeueCallback);
}
if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
void Message::setEnqueueCompleteCallback(MessageCallback& cb) { enqueueCallback = &cb; }
void Message::resetEnqueueCompleteCallback() { enqueueCallback = 0; }
void Message::setDequeueCompleteCallback(MessageCallback& cb) { dequeueCallback = &cb; }
void Message::resetDequeueCompleteCallback() { dequeueCallback = 0; }
framing::FieldTable& Message::getOrInsertHeaders()
{
return getProperties<MessageProperties>()->getApplicationHeaders();
}
}} // namespace qpid::broker