blob: 6f288642d81b21847663cdf437dc4aa7f77d7466 [file]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/broker/Message.h"
#include "qpid/amqp/CharSequence.h"
#include "qpid/amqp/MapHandler.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/OwnershipToken.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/management/Manageable.h"
#include "qpid/StringUtils.h"
#include "qpid/log/Statement.h"
#include "qpid/assert.h"
#include <algorithm>
#include <string.h>
#include <time.h>
using boost::intrusive_ptr;
using qpid::sys::AbsTime;
using qpid::sys::Duration;
using qpid::sys::TIME_MSEC;
using qpid::sys::FAR_FUTURE;
using qpid::amqp::CharSequence;
using qpid::amqp::MapHandler;
using std::string;
namespace qpid {
namespace broker {
Message::Message() : deliveryCount(-1), alreadyAcquired(false), replicationId(0), isReplicationIdSet(false)
{}
Message::Message(boost::intrusive_ptr<SharedState> e, boost::intrusive_ptr<PersistableMessage> p)
: sharedState(e), persistentContext(p), deliveryCount(-1), alreadyAcquired(false), replicationId(0), isReplicationIdSet(false)
{
if (persistentContext) persistentContext->setIngressCompletion(e);
}
Message::~Message() {}
std::string Message::getRoutingKey() const
{
return getEncoding().getRoutingKey();
}
std::string Message::getTo() const
{
return getEncoding().getTo();
}
std::string Message::getSubject() const
{
return getEncoding().getSubject();
}
std::string Message::getReplyTo() const
{
return getEncoding().getReplyTo();
}
bool Message::isPersistent() const
{
return getEncoding().isPersistent();
}
uint64_t Message::getMessageSize() const
{
return getEncoding().getMessageSize();
}
boost::intrusive_ptr<AsyncCompletion> Message::getIngressCompletion() const
{
return sharedState;
}
namespace
{
const std::string X_QPID_TRACE("x-qpid.trace");
}
bool Message::isExcluded(const std::vector<std::string>& excludes) const
{
std::string traceStr = getEncoding().getAnnotationAsString(X_QPID_TRACE);
if (traceStr.size()) {
std::vector<std::string> trace = split(traceStr, ", ");
for (std::vector<std::string>::const_iterator i = excludes.begin(); i != excludes.end(); i++) {
for (std::vector<std::string>::const_iterator j = trace.begin(); j != trace.end(); j++) {
if (*i == *j) {
return true;
}
}
}
}
return false;
}
void Message::addTraceId(const std::string& id)
{
std::string trace = getEncoding().getAnnotationAsString(X_QPID_TRACE);
if (trace.empty()) {
addAnnotation(X_QPID_TRACE, id);
} else if (trace.find(id) == std::string::npos) {
trace += ",";
trace += id;
addAnnotation(X_QPID_TRACE, trace);
}
}
void Message::clearTrace()
{
if (!getPropertyAsString(X_QPID_TRACE).empty()) {
addAnnotation(X_QPID_TRACE, std::string());
}
}
uint64_t Message::getTimestamp() const
{
return sharedState ? sharedState->getTimestamp() : 0;
}
uint64_t Message::getTtl() const
{
uint64_t ttl;
if (getTtl(ttl, 1)/*set to 1 if expired*/) {
return ttl;
} else {
return 0;
}
}
bool Message::getTtl(uint64_t& ttl) const
{
return getTtl(ttl, 0); //set to 0 if expired
}
bool Message::getTtl(uint64_t& ttl, uint64_t expiredValue) const
{
if (sharedState->getTtl(ttl) && sharedState->getExpiration() < FAR_FUTURE) {
sys::Duration remaining = sharedState->getTimeToExpiration();
// convert from ns to ms
ttl = (int64_t(remaining) >= 1000000 ? int64_t(remaining)/1000000 : expiredValue);
return true;
} else {
return false;
}
}
void Message::addAnnotation(const std::string& key, const qpid::types::Variant& value)
{
annotations.get()[key] = value;
annotationsChanged();
}
void Message::annotationsChanged()
{
if (persistentContext && persistentContext->isMergeRequired()) {
uint64_t id = persistentContext->getPersistenceId();
persistentContext = persistentContext->merge(getAnnotations());
persistentContext->setIngressCompletion(sharedState);
persistentContext->setPersistenceId(id);
}
}
uint8_t Message::getPriority() const
{
return getEncoding().getPriority();
}
bool Message::getIsManagementMessage() const { return sharedState->getIsManagementMessage(); }
const ConnectionIdentity* Message::getPublisherIdentity() const { return sharedState->getPublisherIdentity(); }
bool Message::isLocalTo(const OwnershipToken* token) const {
return token && sharedState->getPublisherToken() && token->isLocal(sharedState->getPublisherToken());
}
management::ObjectId Message::__getPublisherMgmtObject() const
{
//token is a potentially dangling pointer to the publihser connection that can only be safely
//used as the value to an OwnershipToken::isLocal() call. The following is only used for a
// QMF v1 attach request
const OwnershipToken* token = sharedState->getPublisherToken();
return token ? ((const Connection*) token)->getObjectId() : management::ObjectId();
}
qpid::framing::SequenceNumber Message::getSequence() const
{
return sequence;
}
void Message::setSequence(const qpid::framing::SequenceNumber& s)
{
sequence = s;
}
MessageState Message::getState() const
{
return state;
}
void Message::setState(MessageState s)
{
state = s;
}
namespace {
const qpid::types::Variant::Map EMPTY_MAP;
}
const qpid::types::Variant::Map& Message::getAnnotations() const
{
return annotations ? *annotations : EMPTY_MAP;
}
qpid::types::Variant Message::getAnnotation(const std::string& key) const
{
const qpid::types::Variant::Map& a = getAnnotations();
qpid::types::Variant::Map::const_iterator i = a.find(key);
if (i != a.end()) return i->second;
//FIXME: modify Encoding interface to allow retrieval of
//annotations of different types from the message data as received
//off the wire
return qpid::types::Variant(getEncoding().getAnnotationAsString(key));
}
std::string Message::getUserId() const
{
return sharedState->getUserId();
}
Message::SharedState& Message::getSharedState()
{
return *sharedState;
}
const Message::Encoding& Message::getEncoding() const
{
return *sharedState;
}
Message::operator bool() const
{
return !!sharedState;
}
std::string Message::getContent() const
{
return sharedState->getContent();
}
std::string Message::getPropertyAsString(const std::string& key) const
{
return sharedState->getPropertyAsString(key);
}
namespace {
class PropertyRetriever : public MapHandler
{
public:
PropertyRetriever(const std::string& key) : name(key) {}
void handleVoid(const CharSequence&) {}
void handleBool(const CharSequence& key, bool value) { handle(key, value); }
void handleUint8(const CharSequence& key, uint8_t value) { handle(key, value); }
void handleUint16(const CharSequence& key, uint16_t value) { handle(key, value); }
void handleUint32(const CharSequence& key, uint32_t value) { handle(key, value); }
void handleUint64(const CharSequence& key, uint64_t value) { handle(key, value); }
void handleInt8(const CharSequence& key, int8_t value) { handle(key, value); }
void handleInt16(const CharSequence& key, int16_t value) { handle(key, value); }
void handleInt32(const CharSequence& key, int32_t value) { handle(key, value); }
void handleInt64(const CharSequence& key, int64_t value) { handle(key, value); }
void handleFloat(const CharSequence& key, float value) { handle(key, value); }
void handleDouble(const CharSequence& key, double value) { handle(key, value); }
void handleString(const CharSequence& key, const CharSequence& value, const CharSequence& /*encoding*/)
{
if (matches(key)) result = std::string(value.data, value.size);
}
qpid::types::Variant getResult() { return result; }
private:
std::string name;
qpid::types::Variant result;
bool matches(const CharSequence& key)
{
return name.size()==key.size &&
::strncmp(key.data, name.data(), key.size) == 0;
}
template <typename T> void handle(const CharSequence& key, T value)
{
if (matches(key)) result = value;
}
};
}
qpid::types::Variant Message::getProperty(const std::string& key) const
{
PropertyRetriever r(key);
sharedState->processProperties(r);
return r.getResult();
}
std::string Message::printProperties() const
{
return sharedState->printProperties();
}
boost::intrusive_ptr<PersistableMessage> Message::getPersistentContext() const
{
return persistentContext;
}
void Message::processProperties(MapHandler& handler) const
{
sharedState->processProperties(handler);
}
bool Message::hasReplicationId() const {
return isReplicationIdSet;
}
uint64_t Message::getReplicationId() const {
return replicationId;
}
void Message::setReplicationId(framing::SequenceNumber id) {
replicationId = id;
isReplicationIdSet = true;
}
sys::AbsTime Message::getExpiration() const
{
return sharedState->getExpiration();
}
Message::ConnectionIdentityState::ConnectionIdentityState()
{
}
const std::string& Message::ConnectionIdentityState::getUserId() const
{
return userId;
}
const std::string& Message::ConnectionIdentityState::getMgmtId() const
{
return mgmtId;
}
Message::SharedStateImpl::SharedStateImpl() : publisherToken(0), expiration(qpid::sys::FAR_FUTURE), isManagementMessage(false) {}
const ConnectionIdentity* Message::SharedStateImpl::getPublisherIdentity() const
{
return &publisherIdentity;
}
const OwnershipToken* Message::SharedStateImpl::getPublisherToken() const
{
return publisherToken;
}
void Message::SharedStateImpl::setPublisher(const Connection* p)
{
publisherToken = p;
publisherIdentity.userId = p->getUserId();
publisherIdentity.mgmtId = p->getMgmtId();
}
sys::AbsTime Message::SharedStateImpl::getExpiration() const
{
return expiration;
}
void Message::SharedStateImpl::setExpiration(sys::AbsTime e)
{
expiration = e;
}
sys::Duration Message::SharedStateImpl::getTimeToExpiration() const
{
return sys::Duration(sys::AbsTime::now(), expiration);
}
void Message::SharedStateImpl::computeExpiration()
{
//TODO: this is still quite 0-10 specific...
uint64_t ttl;
if (getTtl(ttl)) {
// Use higher resolution time for the internal expiry calculation.
// Prevent overflow as a signed int64_t
Duration duration(std::min(ttl * TIME_MSEC,
(uint64_t) std::numeric_limits<int64_t>::max()));
expiration = AbsTime(sys::AbsTime::now(), duration);
}
}
bool Message::SharedStateImpl::getIsManagementMessage() const
{
return isManagementMessage;
}
void Message::SharedStateImpl::setIsManagementMessage(bool b)
{
isManagementMessage = b;
}
}} // namespace qpid::broker