blob: 625c6cceba07e2de7d6b49564fa740834f0ce21a [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/broker/Queue.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/AclModule.h"
#include "qpid/broker/QueueCursor.h"
#include "qpid/broker/QueueDepth.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/MessageDeque.h"
#include "qpid/broker/MessageDistributor.h"
#include "qpid/broker/FifoDistributor.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/Selector.h"
#include "qpid/broker/TransactionObserver.h"
#include "qpid/broker/TxDequeue.h"
//TODO: get rid of this
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/StringUtils.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/Timer.h"
#include "qpid/types/Variant.h"
#include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
#include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
#include "qmf/org/apache/qpid/broker/EventUnsubscribe.h"
#include <iostream>
#include <algorithm>
#include <functional>
#include <boost/bind.hpp>
#include <boost/intrusive_ptr.hpp>
namespace qpid {
namespace broker {
using namespace qpid::sys;
using namespace qpid::framing;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
using qpid::management::getCurrentPublisher;
using std::string;
using std::for_each;
using std::mem_fun;
namespace _qmf = qmf::org::apache::qpid::broker;
namespace
{
inline void mgntEnqStats(const Message& msg,
_qmf::Queue::shared_ptr mgmtObject,
_qmf::Broker::shared_ptr brokerMgmtObject)
{
if (mgmtObject != 0) {
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
uint64_t contentSize = msg.getMessageSize();
qStats->msgTotalEnqueues +=1;
bStats->msgTotalEnqueues += 1;
qStats->byteTotalEnqueues += contentSize;
bStats->byteTotalEnqueues += contentSize;
if (msg.isPersistent ()) {
qStats->msgPersistEnqueues += 1;
bStats->msgPersistEnqueues += 1;
qStats->bytePersistEnqueues += contentSize;
bStats->bytePersistEnqueues += contentSize;
}
mgmtObject->statisticsUpdated();
brokerMgmtObject->statisticsUpdated();
}
}
inline void mgntDeqStats(const Message& msg,
_qmf::Queue::shared_ptr mgmtObject,
_qmf::Broker::shared_ptr brokerMgmtObject)
{
if (mgmtObject != 0){
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
uint64_t contentSize = msg.getMessageSize();
qStats->msgTotalDequeues += 1;
bStats->msgTotalDequeues += 1;
qStats->byteTotalDequeues += contentSize;
bStats->byteTotalDequeues += contentSize;
if (msg.isPersistent ()){
qStats->msgPersistDequeues += 1;
bStats->msgPersistDequeues += 1;
qStats->bytePersistDequeues += contentSize;
bStats->bytePersistDequeues += contentSize;
}
mgmtObject->statisticsUpdated();
brokerMgmtObject->statisticsUpdated();
}
}
QueueSettings merge(const QueueSettings& inputs, const Broker& broker)
{
QueueSettings settings(inputs);
settings.maxDepth = QueueDepth();
if (inputs.maxDepth.hasCount() && inputs.maxDepth.getCount()) {
settings.maxDepth.setCount(inputs.maxDepth.getCount());
}
if (inputs.maxDepth.hasSize()) {
if (inputs.maxDepth.getSize()) {
settings.maxDepth.setSize(inputs.maxDepth.getSize());
}
} else if (broker.getQueueLimit()) {
settings.maxDepth.setSize(broker.getQueueLimit());
}
return settings;
}
}
Queue::TxPublish::TxPublish(const Message& m, boost::shared_ptr<Queue> q) : message(m), queue(q), prepared(false) {}
bool Queue::TxPublish::prepare(TransactionContext* ctxt) throw()
{
try {
prepared = queue->enqueue(ctxt, message);
return true;
} catch (const std::exception& e) {
QPID_LOG(error, "Failed to prepare: " << e.what());
return false;
}
}
void Queue::TxPublish::commit() throw()
{
try {
if (prepared) queue->process(message);
} catch (const std::exception& e) {
QPID_LOG(error, "Failed to commit: " << e.what());
}
}
void Queue::TxPublish::rollback() throw()
{
try {
if (prepared) queue->enqueueAborted(message);
} catch (const std::exception& e) {
QPID_LOG(error, "Failed to rollback: " << e.what());
}
}
void Queue::TxPublish::callObserver(
const boost::shared_ptr<TransactionObserver>& observer)
{
observer->enqueue(queue, message);
}
Queue::Queue(const string& _name, const QueueSettings& _settings,
MessageStore* const _store,
Manageable* parent,
Broker* b) :
name(_name),
store(_store),
owner(0),
exclusive(0),
messages(new MessageDeque()),
persistenceId(0),
settings(b ? merge(_settings, *b) : _settings),
eventMode(0),
observers(name, messageLock),
broker(b),
deleted(false),
barrier(*this),
allocator(new FifoDistributor( *messages )),
redirectSource(false)
{
current.setCount(0);//always track depth in messages
if (settings.maxDepth.getSize()) current.setSize(0);//track depth in bytes only if policy requires it
if (settings.traceExcludes.size()) {
split(traceExclude, settings.traceExcludes, ", ");
}
qpid::amqp_0_10::translate(settings.asMap(), encodableSettings);
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0) {
mgmtObject = _qmf::Queue::shared_ptr(
new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete));
mgmtObject->set_arguments(settings.asMap());
agent->addObject(mgmtObject, 0, store != 0);
brokerMgmtObject = boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject());
if (brokerMgmtObject)
brokerMgmtObject->inc_queueCount();
}
}
if ( settings.isBrowseOnly ) {
QPID_LOG ( info, "Queue " << name << " is browse-only." );
}
if (settings.filter.size()) {
selector.reset(new Selector(settings.filter));
QPID_LOG (info, "Queue " << name << " using filter: " << settings.filter);
}
}
Queue::~Queue()
{
if (mgmtObject != 0) {
mgmtObject->debugStats("destroying");
mgmtObject->resourceDestroy();
}
}
bool Queue::isLocal(const Message& msg)
{
//message is considered local if it was published on the same
//connection as that of the session which declared this queue
//exclusive (owner) or which has an exclusive subscription
//(exclusive)
return settings.noLocal && (msg.isLocalTo(owner) || msg.isLocalTo(exclusive));
}
bool Queue::isExcluded(const Message& msg)
{
return traceExclude.size() && msg.isExcluded(traceExclude);
}
bool Queue::accept(const Message& msg)
{
//TODO: move some of this out of the queue and into the publishing
//'link' for whatever protocol is used; that would let protocol
//specific stuff be kept out the queue
if (broker::amqp_0_10::MessageTransfer::isImmediateDeliveryRequired(msg) && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg, 0);
alternateExchange->route(deliverable);
}
return false;
} else if (isLocal(msg)) {
//drop message
QPID_LOG(info, "Dropping 'local' message from " << getName());
return false;
} else if (isExcluded(msg)) {
//drop message
QPID_LOG(info, "Dropping excluded message from " << getName());
return false;
} else {
messages->check(msg);
if (selector) {
return selector->filter(msg);
} else {
return true;
}
}
}
void Queue::deliver(Message msg, TxBuffer* txn)
{
if (redirectPeer) {
redirectPeer->deliverTo(msg, txn);
} else {
deliverTo(msg, txn);
}
}
void Queue::deliverTo(Message msg, TxBuffer* txn)
{
if (accept(msg)) {
interceptors.record(msg);
if (txn) {
TxOp::shared_ptr op(new TxPublish(msg, shared_from_this()));
txn->enlist(op);
QPID_LOG(debug, "Message " << msg.getSequence() << " enqueue on " << name
<< " enlisted in " << txn);
} else {
if (enqueue(0, msg)) {
push(msg);
QPID_LOG(debug, "Message " << msg.getSequence() << " enqueued on " << name);
} else {
QPID_LOG(debug, "Message " << msg.getSequence() << " dropped from " << name);
}
}
}
}
void Queue::recoverPrepared(const Message& msg)
{
Mutex::ScopedLock locker(messageLock);
current += QueueDepth(1, msg.getMessageSize());
}
void Queue::recover(Message& msg)
{
recoverPrepared(msg);
push(msg, true);
}
void Queue::process(Message& msg)
{
push(msg);
if (mgmtObject != 0){
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
const uint64_t contentSize = msg.getMessageSize();
qStats->msgTxnEnqueues += 1;
qStats->byteTxnEnqueues += contentSize;
mgmtObject->statisticsUpdated();
if (brokerMgmtObject) {
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
bStats->msgTxnEnqueues += 1;
bStats->byteTxnEnqueues += contentSize;
brokerMgmtObject->statisticsUpdated();
}
}
}
void Queue::release(const QueueCursor& position, bool markRedelivered)
{
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
if (!deleted) {
Message* message = messages->release(position);
if (message) {
if (!markRedelivered) message->undeliver();
listeners.populate(copy);
observeRequeue(*message, locker);
if (mgmtObject) {
mgmtObject->inc_releases();
if (brokerMgmtObject)
brokerMgmtObject->inc_releases();
}
}
}
}
copy.notify();
}
bool Queue::dequeueMessageAt(const SequenceNumber& position)
{
ScopedAutoDelete autodelete(*this);
boost::intrusive_ptr<PersistableMessage> pmsg;
{
Mutex::ScopedLock locker(messageLock);
QPID_LOG(debug, "Attempting to dequeue message at " << position);
QueueCursor cursor;
Message* msg = messages->find(position, &cursor);
if (msg) {
if (msg->isPersistent()) pmsg = msg->getPersistentContext();
observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
messages->deleted(cursor);
} else {
QPID_LOG(debug, "Could not dequeue message at " << position << "; no such message");
return false;
}
}
dequeueFromStore(pmsg);
return true;
}
bool Queue::acquire(const QueueCursor& position, const std::string& consumer)
{
Mutex::ScopedLock locker(messageLock);
Message* msg;
msg = messages->find(position);
if (msg) {
QPID_LOG(debug, consumer << " attempting to acquire message at " << msg->getSequence());
if (!allocator->acquire(consumer, *msg)) {
QPID_LOG(debug, "Not permitted to acquire msg at " << msg->getSequence() << " from '" << name);
return false;
} else {
observeAcquire(*msg, locker);
QPID_LOG(debug, "Acquired message at " << msg->getSequence() << " from " << name);
return true;
}
} else {
QPID_LOG(debug, "Failed to acquire message which no longer exists on " << name);
return false;
}
}
bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
{
if (!checkNotDeleted(c)) return false;
QueueListeners::NotificationSet set;
ScopedAutoDelete autodelete(*this);
bool messageFound(false);
while (true) {
//TODO: reduce lock scope
Mutex::ScopedLock locker(messageLock);
QueueCursor cursor = c->getCursor(); // Save current position.
Message* msg = messages->next(*c); // Advances c.
if (msg) {
if (isExpired(name, *msg, sys::AbsTime::now())) {
QPID_LOG(debug, "Message expired from queue '" << name << "'");
observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
//ERROR: don't hold lock across call to store!!
if (msg->isPersistent()) dequeueFromStore(msg->getPersistentContext());
if (mgmtObject) {
mgmtObject->inc_discardsTtl();
if (brokerMgmtObject)
brokerMgmtObject->inc_discardsTtl();
}
messages->deleted(*c);
continue;
}
if (c->filter(*msg)) {
if (c->accept(*msg)) {
if (c->preAcquires()) {
QPID_LOG(debug, "Attempting to acquire message " << msg->getSequence()
<< " from '" << name << "' with state " << msg->getState());
if (allocator->acquire(c->getName(), *msg)) {
if (mgmtObject) {
mgmtObject->inc_acquires();
if (brokerMgmtObject)
brokerMgmtObject->inc_acquires();
}
observeAcquire(*msg, locker);
msg->deliver();
} else {
QPID_LOG(debug, "Could not acquire message from '" << name << "'");
continue; //try another message
}
}
QPID_LOG(debug, "Message " << msg->getSequence() << " retrieved from '"
<< name << "'");
m = *msg;
messageFound = true;
break;
} else {
//message(s) are available but consumer hasn't got enough credit
QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
c->setCursor(cursor); // Restore cursor, will try again with credit
if (c->preAcquires()) {
//let someone else try
listeners.populate(set);
}
break;
}
} else {
//consumer will never want this message, try another one
QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
if (c->preAcquires()) {
//let someone else try to take this one
listeners.populate(set);
}
}
} else {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
c->stopped();
listeners.addListener(c);
break;
}
}
set.notify();
return messageFound;
}
void Queue::removeListener(Consumer::shared_ptr c)
{
QueueListeners::NotificationSet set;
{
Mutex::ScopedLock locker(messageLock);
listeners.removeListener(c);
if (messages->size()) {
listeners.populate(set);
}
}
set.notify();
}
bool Queue::dispatch(Consumer::shared_ptr c)
{
Message msg;
if (getNextMessage(msg, c)) {
c->deliver(*c, msg);
return true;
} else {
return false;
}
}
bool Queue::find(SequenceNumber pos, Message& msg) const
{
Mutex::ScopedLock locker(messageLock);
Message* ptr = messages->find(pos, 0);
if (ptr) {
msg = *ptr;
return true;
}
return false;
}
void Queue::markInUse(bool controlling)
{
Mutex::ScopedLock locker(messageLock);
if (controlling) users.addLifecycleController();
else users.addOther();
}
void Queue::releaseFromUse(bool controlling, bool doDelete)
{
bool trydelete;
if (controlling) {
Mutex::ScopedLock locker(messageLock);
users.removeLifecycleController();
trydelete = true;
} else {
Mutex::ScopedLock locker(messageLock);
users.removeOther();
trydelete = isUnused(locker);
}
if (trydelete && doDelete) scheduleAutoDelete();
}
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive,
const framing::FieldTable& arguments,
const std::string& connectionId, const std::string& userId)
{
boost::intrusive_ptr<qpid::sys::TimerTask> t;
{
Mutex::ScopedLock locker(messageLock);
if (c->preAcquires()) {
if(settings.isBrowseOnly) {
throw NotAllowedException(
QPID_MSG("Queue " << name << " is browse only. Refusing acquiring consumer."));
}
if(exclusive) {
throw ResourceLockedException(
QPID_MSG("Queue " << getName()
<< " has an exclusive consumer. No more consumers allowed."));
} else if(requestExclusive) {
if(users.hasConsumers()) {
throw ResourceLockedException(
QPID_MSG("Queue " << getName()
<< " already has consumers. Exclusive access denied."));
} else {
exclusive = c->getSession();
}
}
users.addConsumer();
} else if(c->isCounted()) {
users.addBrowser();
}
if(c->isCounted()) {
//reset auto deletion timer if necessary
if (settings.autoDeleteDelay && autoDeleteTask) {
t = autoDeleteTask;
}
observeConsumerAdd(*c, locker);
}
}
if (t) t->cancel();
if (mgmtObject != 0 && c->isCounted()) {
mgmtObject->inc_consumerCount();
}
if (broker) {
ManagementAgent* agent = broker->getManagementAgent();
if (agent) {
agent->raiseEvent(
_qmf::EventSubscribe(connectionId, userId, name,
c->getTag(), requestExclusive, ManagementAgent::toMap(arguments)));
}
}
}
void Queue::cancel(Consumer::shared_ptr c, const std::string& connectionId, const std::string& userId)
{
removeListener(c);
if(c->isCounted())
{
bool unused;
{
Mutex::ScopedLock locker(messageLock);
if (c->preAcquires()) {
users.removeConsumer();
if (exclusive) exclusive = 0;
} else {
users.removeBrowser();
}
observeConsumerRemove(*c, locker);
unused = !users.isUsed();
}
if (mgmtObject != 0) {
mgmtObject->dec_consumerCount();
}
if (unused && settings.autodelete) scheduleAutoDelete();
}
if (broker) {
ManagementAgent* agent = broker->getManagementAgent();
if (agent) agent->raiseEvent(_qmf::EventUnsubscribe(connectionId, userId, c->getTag()));
}
}
bool Queue::isExpired(const std::string& name, const Message& m, AbsTime now)
{
if (m.getExpiration() < now) {
QPID_LOG(debug, "Message expired from queue '" << name << "': " << m.printProperties());
return true;
} else {
return false;
}
}
/**
*@param lapse: time since the last purgeExpired
*/
void Queue::purgeExpired(sys::Duration lapse) {
//As expired messages are discarded during dequeue also, only
//bother explicitly expiring if the rate of dequeues since last
//attempt is less than one per second.
int count = dequeueSincePurge.get();
dequeueSincePurge -= count;
int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;
if (seconds == 0 || count / seconds < 1) {
sys::AbsTime time = sys::AbsTime::now();
uint32_t count = remove(0, boost::bind(&isExpired, name, _1, time), 0, CONSUMER, settings.autodelete);
QPID_LOG(debug, "Purged " << count << " expired messages from " << getName());
//
// Report the count of discarded-by-ttl messages
//
if (mgmtObject && count) {
mgmtObject->inc_discardsTtl(count);
if (brokerMgmtObject) {
brokerMgmtObject->inc_discardsTtl(count);
}
}
}
}
namespace {
// for use with purge/move below - collect messages that match a given filter
//
class MessageFilter
{
public:
static const std::string typeKey;
static const std::string paramsKey;
static MessageFilter *create( const ::qpid::types::Variant::Map *filter );
virtual bool match( const Message& ) const { return true; }
virtual ~MessageFilter() {}
protected:
MessageFilter() {};
};
const std::string MessageFilter::typeKey("filter_type");
const std::string MessageFilter::paramsKey("filter_params");
// filter by message header string value exact match
class HeaderMatchFilter : public MessageFilter
{
public:
/* Config:
{ 'filter_type' : 'header_match_str',
'filter_params' : { 'header_key' : "<header name>",
'header_value' : "<value to match>"
}
}
*/
static const std::string typeKey;
static const std::string headerKey;
static const std::string valueKey;
HeaderMatchFilter( const std::string& _header, const std::string& _value )
: MessageFilter (), header(_header), value(_value) {}
bool match( const Message& msg ) const
{
return msg.getPropertyAsString(header) == value;
}
private:
const std::string header;
const std::string value;
};
const std::string HeaderMatchFilter::typeKey("header_match_str");
const std::string HeaderMatchFilter::headerKey("header_key");
const std::string HeaderMatchFilter::valueKey("header_value");
// factory to create correct filter based on map
MessageFilter* MessageFilter::create( const ::qpid::types::Variant::Map *filter )
{
using namespace qpid::types;
if (filter && !filter->empty()) {
Variant::Map::const_iterator i = filter->find(MessageFilter::typeKey);
if (i != filter->end()) {
if (i->second.asString() == HeaderMatchFilter::typeKey) {
Variant::Map::const_iterator p = filter->find(MessageFilter::paramsKey);
if (p != filter->end() && p->second.getType() == VAR_MAP) {
Variant::Map::const_iterator k = p->second.asMap().find(HeaderMatchFilter::headerKey);
Variant::Map::const_iterator v = p->second.asMap().find(HeaderMatchFilter::valueKey);
if (k != p->second.asMap().end() && v != p->second.asMap().end()) {
std::string headerKey(k->second.asString());
std::string value(v->second.asString());
QPID_LOG(debug, "Message filtering by header value configured. key: " << headerKey << " value: " << value );
return new HeaderMatchFilter( headerKey, value );
}
}
}
}
QPID_LOG(error, "Unrecognized message filter: '" << *filter << "'");
throw qpid::Exception(QPID_MSG("Unrecognized message filter: '" << *filter << "'"));
}
return new MessageFilter();
}
void moveTo(boost::shared_ptr<Queue> q, Message& m)
{
if (q) {
q->deliver(m);
}
}
} // end namespace
uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f,
SubscriptionType type, bool triggerAutoDelete, uint32_t maxTests)
{
ScopedAutoDelete autodelete(*this);
std::deque<Message> removed;
{
QueueCursor c(type);
uint32_t count(0), tests(0);
Mutex::ScopedLock locker(messageLock);
Message* m = messages->next(c);
while (m){
if (maxTests && tests++ >= maxTests) break;
if (!p || p(*m)) {
if (maxCount && count++ >= maxCount) break;
if (m->getState() == AVAILABLE) {
//don't actually acquire, just act as if we did
observeAcquire(*m, locker);
}
observeDequeue(*m, locker, triggerAutoDelete ? &autodelete : 0);
removed.push_back(*m);//takes a copy of the message
if (!messages->deleted(c)) {
QPID_LOG(warning, "Failed to correctly remove message from " << name << "; state is not consistent!");
assert(false);
}
}
m = messages->next(c);
}
}
for (std::deque<Message>::iterator i = removed.begin(); i != removed.end(); ++i) {
if (f) f(*i);//ERROR? need to clear old persistent context?
if (i->isPersistent()) dequeueFromStore(i->getPersistentContext());//do this outside of lock and after any re-routing
}
return removed.size();
}
/**
* purge - for purging all or some messages on a queue
* depending on the purge_request
*
* qty == 0 then purge all messages
* == N then purge N messages from queue
* Sometimes qty == 1 to unblock the top of queue
*
* The dest exchange may be supplied to re-route messages through the exchange.
* It is safe to re-route messages such that they arrive back on the same queue,
* even if the queue is ordered by priority.
*
* An optional filter can be supplied that will be applied against each message. The
* message is purged only if the filter matches. See MessageDistributor for more detail.
*/
uint32_t Queue::purge(const uint32_t qty, boost::shared_ptr<Exchange> dest,
const qpid::types::Variant::Map *filter)
{
std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
uint32_t count = remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&reroute, dest, _1), CONSUMER/*?*/, settings.autodelete);
if (mgmtObject && count) {
mgmtObject->inc_acquires(count);
if (dest.get()) {
mgmtObject->inc_reroutes(count);
if (brokerMgmtObject) {
brokerMgmtObject->inc_acquires(count);
brokerMgmtObject->inc_reroutes(count);
}
} else {
mgmtObject->inc_discardsPurge(count);
if (brokerMgmtObject) {
brokerMgmtObject->inc_acquires(count);
brokerMgmtObject->inc_discardsPurge(count);
}
}
}
return count;
}
uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
const qpid::types::Variant::Map *filter)
{
std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
return remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&moveTo, destq, _1), CONSUMER/*?*/, settings.autodelete);
}
void Queue::push(Message& message, bool /*isRecovery*/)
{
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
message.setSequence(++sequence);
if (settings.sequencing) message.addAnnotation(settings.sequenceKey, (uint32_t)sequence);
interceptors.publish(message);
messages->publish(message);
listeners.populate(copy);
observeEnqueue(message, locker);
}
copy.notify();
}
uint32_t Queue::getMessageCount() const
{
Mutex::ScopedLock locker(messageLock);
return messages->size();
}
uint32_t Queue::getConsumerCount() const
{
Mutex::ScopedLock locker(messageLock);
return users.getSubscriberCount();
}
bool Queue::canAutoDelete() const
{
Mutex::ScopedLock locker(messageLock);
return !deleted && checkAutoDelete(locker);
}
bool Queue::checkAutoDelete(const Mutex::ScopedLock& lock) const
{
if (settings.autodelete) {
switch (settings.lifetime) {
case QueueSettings::DELETE_IF_UNUSED:
return isUnused(lock);
case QueueSettings::DELETE_IF_EMPTY:
return !users.isInUseByController() && isEmpty(lock);
case QueueSettings::DELETE_IF_UNUSED_AND_EMPTY:
return isUnused(lock) && isEmpty(lock);
case QueueSettings::DELETE_ON_CLOSE:
return !users.isInUseByController();
}
}
return false;
}
bool Queue::isUnused(const Mutex::ScopedLock&) const
{
return !owner && !users.isUsed();;
}
bool Queue::isEmpty(const Mutex::ScopedLock&) const
{
return current.getCount() == 0;
}
/*
* return true if enqueue succeeded and message should be made
* available; returning false will result in the message being dropped
*/
bool Queue::enqueue(TransactionContext* ctxt, Message& msg)
{
ScopedUse u(barrier);
if (!u.acquired) return false;
{
Mutex::ScopedLock locker(messageLock);
if (!checkDepth(QueueDepth(1, msg.getMessageSize()), msg)) {
return false;
}
}
if (settings.traceId.size()) {
msg.addTraceId(settings.traceId);
}
if (msg.isPersistent() && store) {
// mark the message as being enqueued - the store MUST CALL msg->enqueueComplete()
// when it considers the message stored.
boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext();
assert(pmsg);
pmsg->enqueueAsync(shared_from_this());
try {
store->enqueue(ctxt, pmsg, *this);
} catch (...) {
enqueueAborted(msg);
throw;
}
}
return true;
}
void Queue::enqueueAborted(const Message& msg)
{
//Called when any transactional enqueue is aborted (including but
//not limited to a recovered dtx transaction)
Mutex::ScopedLock locker(messageLock);
current -= QueueDepth(1, msg.getMessageSize());
}
void Queue::enqueueCommited(Message& msg)
{
//called when a recovered dtx enqueue operation is committed; the
//message is already on disk and space has been reserved in policy
//but it should now be made available
process(msg);
}
void Queue::dequeueAborted(Message& msg)
{
//called when a recovered dtx dequeue operation is aborted; the
//message should be added back to the queue
push(msg);
}
void Queue::dequeueCommited(const Message& msg)
{
//called when a recovered dtx dequeue operation is committed; the
//message will at this point have already been removed from the
//store and will not be available for delivery. The only action
//required is to ensure the observers are notified and the
//management stats are correctly decremented
ScopedAutoDelete autodelete(*this);
Mutex::ScopedLock locker(messageLock);
observeDequeue(msg, locker, settings.autodelete ? &autodelete : 0);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(msg.getMessageSize());
}
}
void Queue::dequeueFromStore(boost::intrusive_ptr<PersistableMessage> msg)
{
ScopedUse u(barrier);
if (u.acquired && msg && store) {
store->dequeue(0, msg, *this);
}
}
void Queue::dequeue(const QueueCursor& cursor, TxBuffer* txn)
{
if (txn) {
TxOp::shared_ptr op;
{
Mutex::ScopedLock locker(messageLock);
Message* msg = messages->find(cursor);
if (msg) {
op = TxOp::shared_ptr(new TxDequeue(cursor, shared_from_this(), msg->getSequence(), msg->getReplicationId()));
}
}
if (op) txn->enlist(op);
} else {
dequeue(0, cursor);
}
}
void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor)
{
ScopedUse u(barrier);
if (!u.acquired) return;
ScopedAutoDelete autodelete(*this);
boost::intrusive_ptr<PersistableMessage> pmsg;
{
Mutex::ScopedLock locker(messageLock);
Message* msg = messages->find(cursor);
if (msg) {
if (msg->isPersistent()) pmsg = msg->getPersistentContext();
if (!ctxt) {
observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
messages->deleted(cursor);//message pointer not valid after this
}
} else {
return;
}
}
if (store && pmsg) {
store->dequeue(ctxt, pmsg, *this);
}
}
void Queue::dequeueCommitted(const QueueCursor& cursor)
{
ScopedAutoDelete autodelete(*this);
Mutex::ScopedLock locker(messageLock);
Message* msg = messages->find(cursor);
if (msg) {
const uint64_t contentSize = msg->getMessageSize();
observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(contentSize);
}
if (brokerMgmtObject) {
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
bStats->msgTxnDequeues += 1;
bStats->byteTxnDequeues += contentSize;
brokerMgmtObject->statisticsUpdated();
}
messages->deleted(cursor);
} else {
QPID_LOG(error, "Could not find dequeued message on commit");
}
}
/**
* Updates policy and management when a message has been dequeued,
* Requires messageLock be held by caller.
*/
void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock& lock, ScopedAutoDelete* autodelete)
{
current -= QueueDepth(1, msg.getMessageSize());
mgntDeqStats(msg, mgmtObject, brokerMgmtObject);
observers.dequeued(msg, lock);
if (autodelete && isEmpty(lock)) autodelete->check(lock);
}
/** updates queue observers when a message has become unavailable for transfer.
* Requires messageLock be held by caller.
*/
void Queue::observeAcquire(const Message& msg, const Mutex::ScopedLock& l)
{
observers.acquired(msg, l);
}
/** updates queue observers when a message has become re-available for transfer
* Requires messageLock be held by caller.
*/
void Queue::observeRequeue(const Message& msg, const Mutex::ScopedLock& l)
{
observers.requeued(msg, l);
}
/** updates queue observers when a new consumer has subscribed to this queue.
*/
void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock& l)
{
observers.consumerAdded(c, l);
}
/** updates queue observers when a consumer has unsubscribed from this queue.
*/
void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock& l)
{
observers.consumerRemoved(c, l);
}
void Queue::create()
{
if (store) {
store->create(*this, settings.storeSettings);
}
}
int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key)
{
qpid::framing::FieldTable::ValuePtr v = settings.get(key);
if (!v) {
return 0;
} else if (v->convertsTo<int>()) {
return v->get<int>();
} else if (v->convertsTo<std::string>()){
std::string s = v->get<std::string>();
try {
return boost::lexical_cast<int>(s);
} catch(const boost::bad_lexical_cast&) {
QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << s);
return 0;
}
} else {
QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *v);
return 0;
}
}
bool getBoolSetting(const qpid::framing::FieldTable& settings, const std::string& key)
{
qpid::framing::FieldTable::ValuePtr v = settings.get(key);
if (!v) {
return false;
} else if (v->convertsTo<int>()) {
return v->get<int>() != 0;
} else if (v->convertsTo<std::string>()){
std::string s = v->get<std::string>();
if (s == "True") return true;
if (s == "true") return true;
if (s == "False") return false;
if (s == "false") return false;
try {
return boost::lexical_cast<bool>(s);
} catch(const boost::bad_lexical_cast&) {
QPID_LOG(warning, "Ignoring invalid boolean value for " << key << ": " << s);
return false;
}
} else {
QPID_LOG(warning, "Ignoring invalid boolean value for " << key << ": " << *v);
return false;
}
}
void Queue::abandoned(const Message& message)
{
if (reroute(alternateExchange, message) && brokerMgmtObject)
brokerMgmtObject->inc_abandonedViaAlt();
else if (brokerMgmtObject)
brokerMgmtObject->inc_abandoned();
}
void Queue::destroyed()
{
if (mgmtObject != 0)
mgmtObject->debugStats("destroying");
unbind(broker->getExchanges());
remove(0, 0, boost::bind(&Queue::abandoned, this, _1), REPLICATOR/*even acquired message are treated as abandoned*/, false);
if (alternateExchange.get()) {
alternateExchange->decAlternateUsers();
alternateExchange.reset();
}
if (store) {
barrier.destroy();
store->flush(*this);
store->destroy(*this);
store = 0;//ensure we make no more calls to the store for this queue
}
notifyDeleted();
{
Mutex::ScopedLock l(messageLock);
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
observers.destroy(l);
}
if (mgmtObject != 0) {
mgmtObject->resourceDestroy();
if (brokerMgmtObject)
brokerMgmtObject->dec_queueCount();
mgmtObject = _qmf::Queue::shared_ptr(); // dont print debugStats in Queue::~Queue
}
}
void Queue::notifyDeleted()
{
QueueListeners::ListenerSet set;
{
Mutex::ScopedLock locker(messageLock);
deleted = true;
listeners.snapshot(set);
}
set.notifyAll();
}
void Queue::bound(const string& exchange, const string& key,
const FieldTable& args)
{
bindings.add(exchange, key, args);
}
void Queue::unbind(ExchangeRegistry& exchanges)
{
bindings.unbind(exchanges, shared_from_this());
}
uint64_t Queue::getPersistenceId() const
{
return persistenceId;
}
void Queue::setPersistenceId(uint64_t _persistenceId) const
{
if (mgmtObject != 0 && persistenceId == 0 && externalQueueStore)
{
ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObject();
if (childObj != 0)
childObj->setReference(mgmtObject->getObjectId());
}
persistenceId = _persistenceId;
}
void Queue::encode(Buffer& buffer) const
{
buffer.putShortString(name);
buffer.put(encodableSettings);
buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string(""));
buffer.putShortString(userId);
buffer.putInt8(isAutoDelete());
}
uint32_t Queue::encodedSize() const
{
return name.size() + 1/*short string size octet*/
+ (alternateExchange.get() ? alternateExchange->getName().size() : 0) + 1 /* short string */
+ userId.size() + 1 /* short string */
+ 1 /* autodelete flag */
+ encodableSettings.encodedSize();
}
void Queue::updateAclUserQueueCount()
{
if (broker->getAcl())
broker->getAcl()->approveCreateQueue(userId, name);
}
Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer )
{
string name;
string _userId;
FieldTable ft;
boost::shared_ptr<Exchange> alternate;
QueueSettings settings(true, false); // settings.autodelete might be overwritten
string altExch;
bool has_userId = false;
bool has_altExch = false;
buffer.getShortString(name);
buffer.get(ft);
settings.populate(ft, settings.storeSettings);
//get alternate exchange
if (buffer.available()) {
buffer.getShortString(altExch);
has_altExch = true;
}
//get userId of queue's creator; ACL counters for userId are done after ACL plugin is initialized
if (buffer.available()) {
buffer.getShortString(_userId);
has_userId = true;
}
//get autodelete flag
if (buffer.available()) {
settings.autodelete = buffer.getInt8();
}
std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate, true);
if (has_altExch)
result.first->alternateExchangeName.assign(altExch);
if (has_userId)
result.first->setOwningUser(_userId);
if (result.first->getSettings().autoDeleteDelay) {
result.first->scheduleAutoDelete();
}
return result.first;
}
void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange)
{
alternateExchange = exchange;
alternateExchange->incAlternateUsers();
if (mgmtObject) {
if (exchange.get() != 0)
mgmtObject->set_altExchange(exchange->GetManagementObject()->getObjectId());
else
mgmtObject->clr_altExchange();
}
}
boost::shared_ptr<Exchange> Queue::getAlternateExchange()
{
return alternateExchange;
}
struct AutoDeleteTask : qpid::sys::TimerTask
{
Queue::shared_ptr queue;
long expectedVersion;
AutoDeleteTask(Queue::shared_ptr q, AbsTime fireTime)
: qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), queue(q), expectedVersion(q->version) {}
void fire()
{
//need to detect case where queue was used after the task was
//created, but then became unused again before the task fired;
//in this case ignore this request as there will have already
//been a later task added
queue->tryAutoDelete(expectedVersion);
}
};
void Queue::scheduleAutoDelete(bool immediate)
{
if (canAutoDelete()) {
if (!immediate && settings.autoDeleteDelay) {
AbsTime time(now(), Duration(settings.autoDeleteDelay * TIME_SEC));
autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(new AutoDeleteTask(shared_from_this(), time));
broker->getTimer().add(autoDeleteTask);
QPID_LOG(debug, "Timed auto-delete for " << getName() << " initiated");
} else {
tryAutoDelete(version);
}
}
}
void Queue::tryAutoDelete(long expectedVersion)
{
bool proceed(false);
{
Mutex::ScopedLock locker(messageLock);
if (!deleted && checkAutoDelete(locker)) {
proceed = true;
}
}
if (proceed) {
if (broker->getQueues().destroyIfUntouched(name, expectedVersion)) {
{
Mutex::ScopedLock locker(messageLock);
deleted = true;
}
if (broker->getAcl())
broker->getAcl()->recordDestroyQueue(name);
QPID_LOG_CAT(debug, model, "Auto-delete queue deleted: " << name << " (" << deleted << ")");
} else {
//queue was accessed since the delayed auto-delete was scheduled, so try again
QPID_LOG_CAT(debug, model, "Auto-delete interrupted for queue: " << name);
scheduleAutoDelete();
}
} else {
QPID_LOG_CAT(debug, model, "Auto-delete queue could not be deleted: " << name);
}
}
bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
{
Mutex::ScopedLock locker(messageLock);
return o == owner;
}
void Queue::releaseExclusiveOwnership(bool immediateExpiry)
{
bool unused;
{
Mutex::ScopedLock locker(messageLock);
owner = 0;
if (mgmtObject) {
mgmtObject->set_exclusive(false);
}
unused = !users.isUsed();
}
if (unused && settings.autodelete) {
scheduleAutoDelete(immediateExpiry);
}
}
bool Queue::setExclusiveOwner(const OwnershipToken* const o)
{
//reset auto deletion timer if necessary
if (settings.autoDeleteDelay && autoDeleteTask) {
autoDeleteTask->cancel();
}
Mutex::ScopedLock locker(messageLock);
if (owner || users.hasConsumers()) {
return false;
} else {
owner = o;
if (mgmtObject) {
mgmtObject->set_exclusive(true);
}
return true;
}
}
bool Queue::hasExclusiveOwner() const
{
Mutex::ScopedLock locker(messageLock);
return owner != 0;
}
bool Queue::hasExclusiveConsumer() const
{
return exclusive;
}
void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
if (externalQueueStore!=inst && externalQueueStore)
delete externalQueueStore;
externalQueueStore = inst;
if (inst) {
ManagementObject::shared_ptr childObj = inst->GetManagementObject();
if (childObj != 0 && mgmtObject != 0)
childObj->setReference(mgmtObject->getObjectId());
}
}
void Queue::countRejected() const
{
if (mgmtObject) {
mgmtObject->inc_discardsSubscriber();
if (brokerMgmtObject)
brokerMgmtObject->inc_discardsSubscriber();
}
}
ManagementObject::shared_ptr Queue::GetManagementObject(void) const
{
return mgmtObject;
}
Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string& etext)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
AclModule* acl = broker->getAcl();
std::string _userId = (getCurrentPublisher()?getCurrentPublisher()->getUserId():"");
QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
switch (methodId) {
case _qmf::Queue::METHOD_PURGE :
{
if ((acl)&&(!(acl->authorise(_userId, acl::ACT_PURGE, acl::OBJ_QUEUE, name, NULL)))) {
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied purge request from " << _userId));
}
_qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args;
purge(purgeArgs.i_request, boost::shared_ptr<Exchange>(), &purgeArgs.i_filter);
status = Manageable::STATUS_OK;
}
break;
case _qmf::Queue::METHOD_REROUTE :
{
_qmf::ArgsQueueReroute& rerouteArgs = (_qmf::ArgsQueueReroute&) args;
boost::shared_ptr<Exchange> dest;
if (rerouteArgs.i_useAltExchange) {
if (!alternateExchange) {
status = Manageable::STATUS_PARAMETER_INVALID;
etext = "No alternate-exchange defined";
break;
}
dest = alternateExchange;
} else {
try {
dest = broker->getExchanges().get(rerouteArgs.i_exchange);
} catch(const std::exception&) {
status = Manageable::STATUS_PARAMETER_INVALID;
etext = "Exchange not found";
break;
}
}
if (acl) {
std::map<acl::Property, std::string> params;
params.insert(make_pair(acl::PROP_EXCHANGENAME, dest->getName()));
if (!acl->authorise(_userId, acl::ACT_REROUTE, acl::OBJ_QUEUE, name, &params)) {
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied reroute request from " << _userId));
}
}
purge(rerouteArgs.i_request, dest, &rerouteArgs.i_filter);
status = Manageable::STATUS_OK;
}
break;
}
return status;
}
void Queue::query(qpid::types::Variant::Map& results) const
{
Mutex::ScopedLock locker(messageLock);
/** @todo add any interesting queue state into results */
if (allocator) allocator->query(results);
}
namespace {
struct After {
framing::SequenceNumber seq;
After(framing::SequenceNumber s) : seq(s) {}
bool operator()(const Message& m) { return m.getSequence() > seq; }
};
} // namespace
void Queue::setPosition(SequenceNumber n) {
Mutex::ScopedLock locker(messageLock);
if (n < sequence) {
remove(0, After(n), MessagePredicate(), BROWSER, false);
}
sequence = n;
QPID_LOG(debug, "Set position to " << sequence << " on " << getName());
}
SequenceNumber Queue::getPosition() {
Mutex::ScopedLock locker(messageLock);
return sequence;
}
void Queue::getRange(framing::SequenceNumber& front, framing::SequenceNumber& back,
SubscriptionType type)
{
Mutex::ScopedLock locker(messageLock);
QueueCursor cursor(type);
back = sequence;
Message* message = messages->next(cursor);
front = message ? message->getSequence() : back+1;
}
int Queue::getEventMode() { return eventMode; }
void Queue::recoveryComplete(ExchangeRegistry& exchanges)
{
// set the alternate exchange
if (!alternateExchangeName.empty()) {
Exchange::shared_ptr ae = exchanges.find(alternateExchangeName);
if (ae) setAlternateExchange(ae);
else QPID_LOG(warning, "Could not set alternate exchange \""
<< alternateExchangeName << "\" on queue \"" << name
<< "\": exchange does not exist.");
}
//process any pending dequeues
for (std::vector<Message>::iterator i = pendingDequeues.begin(); i != pendingDequeues.end(); ++i) {
dequeueFromStore(i->getPersistentContext());
}
pendingDequeues.clear();
}
/** updates queue observers and state when a message has become available for transfer
* Requires messageLock be held by caller.
*/
void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock& l)
{
observers.enqueued(m, l);
mgntEnqStats(m, mgmtObject, brokerMgmtObject);
}
bool Queue::checkNotDeleted(const Consumer::shared_ptr& c)
{
if (deleted && !c->hideDeletedError())
throw ResourceDeletedException(QPID_MSG("Queue " << getName() << " has been deleted."));
return !deleted;
}
bool Queue::isDeleted() const
{
Mutex::ScopedLock lock(messageLock);
return deleted;
}
void Queue::flush()
{
ScopedUse u(barrier);
if (u.acquired && store) store->flush(*this);
}
bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
const qpid::framing::FieldTable& arguments)
{
if (!isDeleted() && exchange->bind(shared_from_this(), key, &arguments)) {
bound(exchange->getName(), key, arguments);
if (exchange->isDurable() && isDurable()) {
store->bind(*exchange, *this, key, arguments);
}
return true;
} else {
return false;
}
}
Broker* Queue::getBroker()
{
return broker;
}
void Queue::setDequeueSincePurge(uint32_t value) {
dequeueSincePurge = value;
}
void Queue::reject(const QueueCursor& cursor)
{
ScopedAutoDelete autodelete(*this);
Exchange::shared_ptr alternate = getAlternateExchange();
Message copy;
boost::intrusive_ptr<PersistableMessage> pmsg;
{
Mutex::ScopedLock locker(messageLock);
Message* message = messages->find(cursor);
if (message) {
if (alternate) copy = *message;
if (message->isPersistent()) pmsg = message->getPersistentContext();
countRejected();
observeDequeue(*message, locker, settings.autodelete ? &autodelete : 0);
messages->deleted(cursor);
} else {
return;
}
}
if (alternate) {
copy.resetDeliveryCount();
DeliverableMessage delivery(copy, 0);
alternate->routeWithAlternate(delivery);
QPID_LOG(info, "Routed rejected message from " << getName() << " to "
<< alternate->getName());
} else {
//just drop it
QPID_LOG(info, "Dropping rejected message from " << getName());
}
dequeueFromStore(pmsg);
}
bool Queue::checkDepth(const QueueDepth& increment, const Message&)
{
if (settings.maxDepth && (settings.maxDepth - current < increment)) {
if (mgmtObject) {
mgmtObject->inc_discardsOverflow();
if (brokerMgmtObject)
brokerMgmtObject->inc_discardsOverflow();
}
throw ResourceLimitExceededException(QPID_MSG("Maximum depth exceeded on " << name << ": current=[" << current << "], max=[" << settings.maxDepth << "]"));
} else {
current += increment;
return true;
}
}
bool Queue::seek(QueueCursor& cursor, MessagePredicate predicate)
{
Mutex::ScopedLock locker(messageLock);
//hold lock across calls to predicate, or take copy of message?
//currently hold lock, may want to revise depending on any new use
//cases
Message* message = messages->next(cursor);
while (message && (predicate && !predicate(*message))) {
message = messages->next(cursor);
}
return message != 0;
}
bool Queue::seek(QueueCursor& cursor, MessagePredicate predicate, qpid::framing::SequenceNumber start)
{
Mutex::ScopedLock locker(messageLock);
//hold lock across calls to predicate, or take copy of message?
//currently hold lock, may want to revise depending on any new use
//cases
Message* message;
message = messages->find(start, &cursor);
if (message && (!predicate || predicate(*message))) return true;
return seek(cursor, predicate);
}
bool Queue::seek(QueueCursor& cursor, qpid::framing::SequenceNumber start)
{
Mutex::ScopedLock locker(messageLock);
return messages->find(start, &cursor);
}
Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
bool Queue::UsageBarrier::acquire()
{
Monitor::ScopedLock l(usageLock);
if (parent.deleted) {
return false;
} else {
++count;
return true;
}
}
void Queue::UsageBarrier::release()
{
Monitor::ScopedLock l(usageLock);
if (--count == 0) usageLock.notifyAll();
}
void Queue::UsageBarrier::destroy()
{
Monitor::ScopedLock l(usageLock);
parent.deleted = true;
while (count) usageLock.wait();
}
void Queue::addArgument(const string& key, const types::Variant& value) {
settings.original[key] = value;
qpid::amqp_0_10::translate(settings.asMap(), encodableSettings);
boost::shared_ptr<qpid::framing::FieldValue> v;
qpid::amqp_0_10::translate(value, v);
settings.storeSettings.set(key, v);
if (mgmtObject != 0) mgmtObject->set_arguments(settings.asMap());
}
void Queue::setRedirectPeer ( Queue::shared_ptr peer, bool isSrc) {
Mutex::ScopedLock locker(messageLock);
redirectPeer = peer;
redirectSource = isSrc;
}
void Queue::setMgmtRedirectState( std::string peer, bool enabled, bool isSrc ) {
if (mgmtObject != 0) {
mgmtObject->set_redirectPeer(enabled ? peer : "");
mgmtObject->set_redirectSource(isSrc);
}
}
void Queue::setOwningUser(std::string& _userId) {
userId = _userId;
if (mgmtObject != 0)
mgmtObject->set_creator(userId);
}
bool Queue::reroute(boost::shared_ptr<Exchange> e, const Message& m)
{
if (e) {
DeliverableMessage d(m, 0);
d.getMessage().clearTrace();
e->routeWithAlternate(d);
return true;
} else {
return false;
}
}
Queue::QueueUsers::QueueUsers() : consumers(0), browsers(0), others(0), controller(false) {}
void Queue::QueueUsers::addConsumer() { ++consumers; }
void Queue::QueueUsers::addBrowser() { ++browsers; }
void Queue::QueueUsers::addLifecycleController() { assert(!controller); controller = true; }
void Queue::QueueUsers::addOther(){ ++others; }
void Queue::QueueUsers::removeConsumer() { assert(consumers > 0); --consumers; }
void Queue::QueueUsers::removeBrowser() { assert(browsers > 0); --browsers; }
void Queue::QueueUsers::removeLifecycleController() { assert(controller); controller = false; }
void Queue::QueueUsers::removeOther() { assert(others > 0); --others; }
bool Queue::QueueUsers::isInUseByController() const { return controller; }
bool Queue::QueueUsers::isUsed() const { return controller || consumers || browsers || others; }
uint32_t Queue::QueueUsers::getSubscriberCount() const { return consumers + browsers; }
bool Queue::QueueUsers::hasConsumers() const { return consumers; }
Queue::ScopedAutoDelete::ScopedAutoDelete(Queue& q) : queue(q), eligible(false) {}
void Queue::ScopedAutoDelete::check(const sys::Mutex::ScopedLock& lock)
{
eligible = queue.checkAutoDelete(lock);
}
Queue::ScopedAutoDelete::~ScopedAutoDelete()
{
if (eligible) queue.scheduleAutoDelete();
}
}}