blob: bab6f2ea55443f6d2d20b10cff5473e839f7a47f [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/broker/Queue.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/QueueCursor.h"
#include "qpid/broker/QueueDepth.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/DeliverableMessage.h"
//#include "qpid/broker/MessageStore.h"
#include "qpid/broker/MessageDeque.h"
#include "qpid/broker/MessageDistributor.h"
#include "qpid/broker/FifoDistributor.h"
//#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueAsyncContext.h"
#include "qpid/broker/QueueRegistry.h"
//TODO: get rid of this
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/StringUtils.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/Timer.h"
#include "qpid/types/Variant.h"
#include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
#include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h"
#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include <iostream>
#include <algorithm>
#include <functional>
#include <boost/bind.hpp>
#include <boost/intrusive_ptr.hpp>
namespace qpid {
namespace broker {
using namespace qpid::sys;
using namespace qpid::framing;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
using std::string;
using std::for_each;
using std::mem_fun;
namespace _qmf = qmf::org::apache::qpid::broker;
namespace
{
/**
 * Record one enqueue of msg in the management statistics.
 *
 * Nothing is recorded when the queue has no management object. The broker
 * management object is optional as well: the queue constructor only uses it
 * after a null check, so the broker-level counters are updated here only
 * when it is present instead of being dereferenced unconditionally.
 *
 * @param msg              the message that was enqueued
 * @param mgmtObject       the queue's management object (may be null)
 * @param brokerMgmtObject the broker's management object (may be null)
 */
inline void mgntEnqStats(const Message& msg,
                         _qmf::Queue::shared_ptr mgmtObject,
                         _qmf::Broker::shared_ptr brokerMgmtObject)
{
    if (!mgmtObject) return;

    const uint64_t contentSize = msg.getContentSize();
    const bool persistent = msg.isPersistent();

    _qmf::Queue::PerThreadStats* qStats = mgmtObject->getStatistics();
    qStats->msgTotalEnqueues += 1;
    qStats->byteTotalEnqueues += contentSize;
    if (persistent) {
        qStats->msgPersistEnqueues += 1;
        qStats->bytePersistEnqueues += contentSize;
    }
    mgmtObject->statisticsUpdated();

    // The broker object comes from a dynamic cast that may fail, so guard it.
    if (brokerMgmtObject) {
        _qmf::Broker::PerThreadStats* bStats = brokerMgmtObject->getStatistics();
        bStats->msgTotalEnqueues += 1;
        bStats->byteTotalEnqueues += contentSize;
        if (persistent) {
            bStats->msgPersistEnqueues += 1;
            bStats->bytePersistEnqueues += contentSize;
        }
        brokerMgmtObject->statisticsUpdated();
    }
}
/**
 * Record one dequeue of msg in the management statistics.
 *
 * Nothing is recorded when the queue has no management object. The broker
 * management object is optional as well: the queue constructor only uses it
 * after a null check, so the broker-level counters are updated here only
 * when it is present instead of being dereferenced unconditionally.
 *
 * @param msg              the message that was dequeued
 * @param mgmtObject       the queue's management object (may be null)
 * @param brokerMgmtObject the broker's management object (may be null)
 */
inline void mgntDeqStats(const Message& msg,
                         _qmf::Queue::shared_ptr mgmtObject,
                         _qmf::Broker::shared_ptr brokerMgmtObject)
{
    if (!mgmtObject) return;

    const uint64_t contentSize = msg.getContentSize();
    const bool persistent = msg.isPersistent();

    _qmf::Queue::PerThreadStats* qStats = mgmtObject->getStatistics();
    qStats->msgTotalDequeues += 1;
    qStats->byteTotalDequeues += contentSize;
    if (persistent) {
        qStats->msgPersistDequeues += 1;
        qStats->bytePersistDequeues += contentSize;
    }
    mgmtObject->statisticsUpdated();

    // The broker object comes from a dynamic cast that may fail, so guard it.
    if (brokerMgmtObject) {
        _qmf::Broker::PerThreadStats* bStats = brokerMgmtObject->getStatistics();
        bStats->msgTotalDequeues += 1;
        bStats->byteTotalDequeues += contentSize;
        if (persistent) {
            bStats->msgPersistDequeues += 1;
            bStats->bytePersistDequeues += contentSize;
        }
        brokerMgmtObject->statisticsUpdated();
    }
}
// Build the effective settings for a queue: a copy of the requested settings
// in which the broker-wide queue limit is used as the maximum size whenever
// the request did not specify a size of its own and a global limit is set.
QueueSettings merge(const QueueSettings& inputs, const Broker::Options& globalOptions)
{
    QueueSettings effective(inputs);
    if (effective.maxDepth.hasSize() || !globalOptions.queueLimit) {
        return effective;
    }
    effective.maxDepth.setSize(globalOptions.queueLimit);
    return effective;
}
}
// Remembers the message and its target queue; the operation starts out
// unprepared.
Queue::TxPublish::TxPublish(const Message& m, boost::shared_ptr<Queue> q)
    : message(m),
      queue(q),
      prepared(false)
{
}
// Attempts the enqueue for this transactional publish and records its result
// in 'prepared'. Returns true unless the enqueue threw, in which case the
// failure is logged and false is returned.
bool Queue::TxPublish::prepare(TransactionContext* ctxt) throw()
{
    bool succeeded = false;
    try {
        prepared = queue->enqueue(ctxt, message);
        succeeded = true;
    } catch (const std::exception& e) {
        QPID_LOG(error, "Failed to prepare: " << e.what());
    }
    return succeeded;
}
// Hands the message to the queue for processing, but only if prepare()
// recorded a successful enqueue. Exceptions are logged, never propagated.
void Queue::TxPublish::commit() throw()
{
    if (!prepared) {
        return;
    }
    try {
        queue->process(message);
    } catch (const std::exception& e) {
        QPID_LOG(error, "Failed to commit: " << e.what());
    }
}
// Tells the queue the enqueue was aborted, but only if prepare() recorded a
// successful enqueue. Exceptions are logged, never propagated.
void Queue::TxPublish::rollback() throw()
{
    if (!prepared) {
        return;
    }
    try {
        queue->enqueueAborted(message);
    } catch (const std::exception& e) {
        QPID_LOG(error, "Failed to rollback: " << e.what());
    }
}
/**
 * Constructs a queue.
 *
 * @param _name       name of the queue
 * @param _settings   requested settings; merged with the broker options
 *                    when a broker is supplied
 * @param _asyncStore asynchronous store to use, or null for none
 * @param parent      management parent; no management object is created
 *                    when this is null
 * @param b           owning broker, may be null
 */
Queue::Queue(const string& _name, const QueueSettings& _settings,
             AsyncStore* const _asyncStore,
             Manageable* parent,
             Broker* b) :
    name(_name),
    asyncStore(_asyncStore),
    owner(0),
    consumerCount(0),
    browserCount(0),
    exclusive(0),
    persistLastNode(false),
    inLastNodeFailure(false),
    messages(new MessageDeque()),
    persistenceId(0),
    settings(b ? merge(_settings, b->getOptions()) : _settings),
    eventMode(0),
    broker(b),
    deleted(false),
    barrier(*this),
    allocator(new FifoDistributor( *messages ))
{
    // Only the depth dimensions that have a configured limit are initialised.
    if (settings.maxDepth.hasCount()) {
        current.setCount(0);
    }
    if (settings.maxDepth.hasSize()) {
        current.setSize(0);
    }

    if (settings.traceExcludes.size() > 0) {
        split(traceExclude, settings.traceExcludes, ", ");
    }
    qpid::amqp_0_10::translate(settings.asMap(), encodableSettings);

    // Management is set up only with both a parent and a broker whose
    // management agent is available.
    ManagementAgent* agent = (parent != 0 && broker != 0) ? broker->getManagementAgent() : 0;
    if (agent != 0) {
        mgmtObject = _qmf::Queue::shared_ptr(
            new _qmf::Queue(agent, this, parent, _name, _asyncStore != 0, settings.autodelete));
        mgmtObject->set_arguments(settings.asMap());
        agent->addObject(mgmtObject, 0, asyncStore != 0);
        brokerMgmtObject = boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject());
        if (brokerMgmtObject) {
            brokerMgmtObject->inc_queueCount();
        }
    }

    if (settings.isBrowseOnly) {
        QPID_LOG ( info, "Queue " << name << " is browse-only." );
    }
}
// No explicit teardown is performed here.
Queue::~Queue() {}
// True when a token is given and it reports the message's publisher as local.
bool isLocalTo(const OwnershipToken* token, const Message& msg)
{
    if (!token) {
        return false;
    }
    return token->isLocal(msg.getPublisher());
}
// A message counts as local when the no-local setting is enabled and it was
// published on the same connection as the session that declared this queue
// exclusive (owner) or that holds an exclusive subscription (exclusive).
bool Queue::isLocal(const Message& msg)
{
    if (!settings.noLocal) {
        return false;
    }
    return isLocalTo(owner, msg) || isLocalTo(exclusive, msg);
}
// True when trace exclusions are configured and the message reports itself
// as excluded by them.
bool Queue::isExcluded(const Message& msg)
{
    if (traceExclude.size() == 0) {
        return false;
    }
    return msg.isExcluded(traceExclude);
}
// Entry point for a message arriving at this queue. Depending on the message
// and queue state it is routed to the alternate exchange, dropped, enlisted
// in the supplied transaction, or enqueued and pushed immediately.
//
//TODO: move some of this out of the queue and into the publishing
//'link' for whatever protocol is used; that would let protocol
//specific stuff be kept out the queue
void Queue::deliver(Message msg, TxBuffer* txn)
{
    // Immediate delivery requested but nobody is consuming: hand the message
    // to the alternate exchange if there is one, otherwise it goes nowhere.
    if (broker::amqp_0_10::MessageTransfer::isImmediateDeliveryRequired(msg) && getConsumerCount() == 0) {
        if (alternateExchange) {
            DeliverableMessage deliverable(msg, 0);
            alternateExchange->route(deliverable);
        }
        return;
    }

    if (isLocal(msg)) {
        //drop message
        QPID_LOG(info, "Dropping 'local' message from " << getName());
        return;
    }

    if (isExcluded(msg)) {
        //drop message
        QPID_LOG(info, "Dropping excluded message from " << getName());
        return;
    }

    if (txn) {
        // Transactional publish: defer the enqueue to the transaction.
        TxOp::shared_ptr op(new TxPublish(msg, shared_from_this()));
        txn->enlist(op);
        return;
    }

    if (enqueue(0, msg)) {
        push(msg);
        QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
    } else {
        QPID_LOG(debug, "Message " << msg << " dropped from " << name);
    }
}
void Queue::recoverPrepared(const Message& msg)
{
Mutex::ScopedLock locker(messageLock);
current += QueueDepth(1, msg.getContentSize());
}
// Recovers a message: accounts for it in the depth, then pushes it with the
// recovery flag set.
void Queue::recover(Message& msg)
{
    recoverPrepared(msg);
    const bool isRecovery = true;
    push(msg, isRecovery);
}
// Pushes the message onto the queue and records a transactional enqueue in
// the queue statistics and, when a broker management object exists, in the
// broker statistics too.
void Queue::process(Message& msg)
{
    push(msg);
    if (mgmtObject == 0) {
        return;
    }

    const uint64_t contentSize = msg.getContentSize();

    _qmf::Queue::PerThreadStats* queueStats = mgmtObject->getStatistics();
    queueStats->msgTxnEnqueues += 1;
    queueStats->byteTxnEnqueues += contentSize;
    mgmtObject->statisticsUpdated();

    if (brokerMgmtObject) {
        _qmf::Broker::PerThreadStats* brokerStats = brokerMgmtObject->getStatistics();
        brokerStats->msgTxnEnqueues += 1;
        brokerStats->byteTxnEnqueues += contentSize;
        brokerMgmtObject->statisticsUpdated();
    }
}
void Queue::release(const QueueCursor& position, bool markRedelivered)
{
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
if (!deleted) {
Message* message = messages->release(position);
if (message) {
if (!markRedelivered) message->undeliver();
listeners.populate(copy);
observeRequeue(*message, locker);
if (mgmtObject) {
mgmtObject->inc_releases();
if (brokerMgmtObject)
brokerMgmtObject->inc_releases();
}
}
}
}
copy.notify();
}
bool Queue::dequeueMessageAt(const SequenceNumber& position)
{
boost::intrusive_ptr<PersistableMessage> pmsg;
{
Mutex::ScopedLock locker(messageLock);
QPID_LOG(debug, "Attempting to dequeue message at " << position);
QueueCursor cursor;
Message* msg = messages->find(position, &cursor);
if (msg) {
if (msg->isPersistent()) pmsg = msg->getPersistentContext();
observeDequeue(*msg, locker);
messages->deleted(cursor);
} else {
QPID_LOG(debug, "Could not dequeue message at " << position << "; no such message");
return false;
}
}
dequeueFromStore(pmsg);
return true;
}
bool Queue::acquire(const QueueCursor& position, const std::string& consumer)
{
Mutex::ScopedLock locker(messageLock);
Message* msg;
msg = messages->find(position);
if (msg) {
QPID_LOG(debug, consumer << " attempting to acquire message at " << msg->getSequence());
if (!allocator->acquire(consumer, *msg)) {
QPID_LOG(debug, "Not permitted to acquire msg at " << msg->getSequence() << " from '" << name);
return false;
} else {
observeAcquire(*msg, locker);
QPID_LOG(debug, "Acquired message at " << msg->getSequence() << " from " << name);
return true;
}
} else {
QPID_LOG(debug, "Failed to acquire message which no longer exists on " << name);
return false;
}
}
/**
 * Finds the next message that consumer c can take from this queue.
 *
 * Walks the queue from the consumer's cursor. Expired messages encountered on
 * the way are dequeued and discarded. A message is handed out when it passes
 * the consumer's filter and the consumer accepts it; for consumers that
 * pre-acquire, the allocator must also grant the acquisition.
 *
 * @param m receives a copy of the selected message when true is returned
 * @param c the consumer asking for a message; its cursor is advanced
 * @return true if a message was copied into m, false otherwise
 */
bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
{
if (!checkNotDeleted(c)) return false;
// Listeners collected while scanning; notified only on the path that breaks
// out of the loop below.
QueueListeners::NotificationSet set;
while (true) {
//TODO: reduce lock scope
// The lock is taken afresh on every iteration of the loop.
Mutex::ScopedLock locker(messageLock);
QueueCursor cursor = c->getCursor(); // Save current position.
Message* msg = messages->next(*c); // Advances c.
if (msg) {
if (msg->hasExpired()) {
// Expired: account for the dequeue, remove it and move on to the next one.
QPID_LOG(debug, "Message expired from queue '" << name << "'");
observeDequeue(*msg, locker);
//ERROR: don't hold lock across call to store!!
if (msg->isPersistent()) dequeueFromStore(msg->getPersistentContext());
if (mgmtObject) {
mgmtObject->inc_discardsTtl();
if (brokerMgmtObject)
brokerMgmtObject->inc_discardsTtl();
}
messages->deleted(*c);
continue;
}
if (c->filter(*msg)) {
if (c->accept(*msg)) {
if (c->preAcquires()) {
QPID_LOG(debug, "Attempting to acquire message " << msg << " from '" << name << "' with state " << msg->getState());
if (allocator->acquire(c->getName(), *msg)) {
if (mgmtObject) {
mgmtObject->inc_acquires();
if (brokerMgmtObject)
brokerMgmtObject->inc_acquires();
}
observeAcquire(*msg, locker);
msg->deliver();
} else {
QPID_LOG(debug, "Could not acquire message from '" << name << "'");
continue; //try another message
}
}
QPID_LOG(debug, "Message retrieved from '" << name << "'");
m = *msg;
// NOTE(review): this return skips set.notify(), so listeners populated in an
// earlier iteration (filter-rejected message) are not notified here - confirm
// that this is intended.
return true;
} else {
//message(s) are available but consumer hasn't got enough credit
QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
c->setCursor(cursor); // Restore cursor, will try again with credit
if (c->preAcquires()) {
//let someone else try
listeners.populate(set);
}
break;
}
} else {
//consumer will never want this message, try another one
QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'");
if (c->preAcquires()) {
//let someone else try to take this one
listeners.populate(set);
}
}
} else {
// Nothing left to examine: register the consumer as a listener.
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
listeners.addListener(c);
return false;
}
}
// Reached only via the break above; the scoped lock has been released by now.
set.notify();
return false;
}
// Removes c from the listeners. If messages remain on the queue, the
// remaining listeners are collected and notified once the lock is released.
void Queue::removeListener(Consumer::shared_ptr c)
{
    QueueListeners::NotificationSet toNotify;
    {
        Mutex::ScopedLock locker(messageLock);
        listeners.removeListener(c);
        const bool messagesRemain = messages->size() != 0;
        if (messagesRemain) {
            listeners.populate(toNotify);
        }
    }
    toNotify.notify();
}
// Fetches the next message for c and delivers it. Returns false when no
// message could be obtained.
bool Queue::dispatch(Consumer::shared_ptr c)
{
    Message next;
    if (!getNextMessage(next, c)) {
        return false;
    }
    c->deliver(*c, next);
    return true;
}
// Looks up the message at sequence number pos and copies it into msg.
// Returns false, leaving msg untouched, when there is no such message.
bool Queue::find(SequenceNumber pos, Message& msg) const
{
    Mutex::ScopedLock locker(messageLock);
    Message* located = messages->find(pos, 0);
    if (!located) {
        return false;
    }
    msg = *located;
    return true;
}
// Registers consumer c on this queue, enforcing the browse-only and
// exclusivity rules for acquiring consumers.
//
// Throws NotAllowedException when an acquiring consumer subscribes to a
// browse-only queue, and ResourceLockedException when exclusivity would be
// violated.
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive)
{
    {
        Mutex::ScopedLock locker(messageLock);
        // NOTE: consumerCount is actually a count of all
        // subscriptions, both acquiring and non-acquiring (browsers).
        // Check for exclusivity of acquiring consumers.
        size_t acquiringConsumers = consumerCount - browserCount;
        if (c->preAcquires()) {
            if (settings.isBrowseOnly) {
                throw NotAllowedException(
                    QPID_MSG("Queue " << name << " is browse only.  Refusing acquiring consumer."));
            }
            if (exclusive) {
                throw ResourceLockedException(
                    QPID_MSG("Queue " << getName()
                             << " has an exclusive consumer. No more consumers allowed."));
            }
            if (requestExclusive) {
                if (acquiringConsumers) {
                    throw ResourceLockedException(
                        QPID_MSG("Queue " << getName()
                                 << " already has consumers. Exclusive access denied."));
                }
                exclusive = c->getSession();
            }
        } else if (c->isCounted()) {
            ++browserCount;
        }

        if (c->isCounted()) {
            ++consumerCount;
            //reset auto deletion timer if necessary
            if (settings.autoDeleteDelay && autoDeleteTask) {
                autoDeleteTask->cancel();
            }
            observeConsumerAdd(*c, locker);
        }
    }

    // The management counter is updated after the lock has been released.
    if (mgmtObject != 0 && c->isCounted()) {
        mgmtObject->inc_consumerCount();
    }
}
// Unregisters consumer c: removes it from the listeners and, for counted
// consumers, adjusts the counts, clears any exclusive holder and informs the
// observers.
void Queue::cancel(Consumer::shared_ptr c)
{
    removeListener(c);

    if (c->isCounted()) {
        Mutex::ScopedLock locker(messageLock);
        --consumerCount;
        if (!c->preAcquires()) {
            --browserCount;
        }
        if (exclusive) {
            exclusive = 0;
        }
        observeConsumerRemove(*c, locker);
    }

    // The management counter is updated after the lock has been released.
    if (mgmtObject != 0 && c->isCounted()) {
        mgmtObject->dec_consumerCount();
    }
}
/**
 * Removes expired messages from the queue when dequeue activity is low.
 *
 * As expired messages are discarded during dequeue also, an explicit purge
 * is only attempted if the rate of dequeues since the last attempt is less
 * than one per second.
 *
 *@param lapse: time since the last purgeExpired
 */
void Queue::purgeExpired(sys::Duration lapse) {
    const int dequeued = dequeueSincePurge.get();
    dequeueSincePurge -= dequeued;
    const int seconds = int64_t(lapse)/qpid::sys::TIME_SEC;

    // Busy enough that the dequeue path is already discarding expired messages.
    if (seconds != 0 && dequeued / seconds >= 1) {
        return;
    }

    uint32_t purged = remove(0, boost::bind(&Message::hasExpired, _1), 0, CONSUMER);
    QPID_LOG(debug, "Purged " << purged << " expired messages from " << getName());

    //
    // Report the count of discarded-by-ttl messages
    //
    if (!mgmtObject || !purged) {
        return;
    }
    mgmtObject->inc_acquires(purged);
    mgmtObject->inc_discardsTtl(purged);
    if (brokerMgmtObject) {
        brokerMgmtObject->inc_acquires(purged);
        brokerMgmtObject->inc_discardsTtl(purged);
    }
}
namespace {
// Base message filter used by purge/move below to select which messages to
// collect. The base implementation matches every message; instances are
// obtained through create().
class MessageFilter
{
  public:
    // Keys of the filter map: the filter type and its parameter map.
    static const std::string typeKey;
    static const std::string paramsKey;

    // Builds the filter described by the map.
    static MessageFilter *create( const ::qpid::types::Variant::Map *filter );

    virtual ~MessageFilter() {}

    // Default behaviour: every message matches.
    virtual bool match( const Message& ) const { return true; }

  protected:
    MessageFilter() {}
};

const std::string MessageFilter::typeKey("filter_type");
const std::string MessageFilter::paramsKey("filter_params");
// Filter that matches a message when the string value of one named property
// is exactly equal to a configured value.
class HeaderMatchFilter : public MessageFilter
{
  public:
    /* Config:
       { 'filter_type' : 'header_match_str',
         'filter_params' : { 'header_key' : "<header name>",
                             'header_value' : "<value to match>"
                           }
       }
    */
    static const std::string typeKey;
    static const std::string headerKey;
    static const std::string valueKey;

    HeaderMatchFilter( const std::string& _header, const std::string& _value )
        : MessageFilter (),
          header(_header),
          value(_value)
    {
    }

    bool match( const Message& msg ) const
    {
        const bool same = (msg.getPropertyAsString(header) == value);
        return same;
    }

  private:
    const std::string header;   // name of the property to inspect
    const std::string value;    // value the property must equal
};

const std::string HeaderMatchFilter::typeKey("header_match_str");
const std::string HeaderMatchFilter::headerKey("header_key");
const std::string HeaderMatchFilter::valueKey("header_value");
// Factory that builds the filter described by the map. A null or empty map
// yields the match-everything base filter. A non-empty map that does not
// describe a complete header-match filter is logged as unrecognized and also
// yields the base filter. The caller owns the returned object.
MessageFilter* MessageFilter::create( const ::qpid::types::Variant::Map *filter )
{
    using namespace qpid::types;

    if (!filter || filter->empty()) {
        return new MessageFilter();
    }

    Variant::Map::const_iterator type = filter->find(MessageFilter::typeKey);
    if (type != filter->end() && type->second.asString() == HeaderMatchFilter::typeKey) {
        Variant::Map::const_iterator params = filter->find(MessageFilter::paramsKey);
        if (params != filter->end() && params->second.getType() == VAR_MAP) {
            const Variant::Map& args = params->second.asMap();
            Variant::Map::const_iterator key = args.find(HeaderMatchFilter::headerKey);
            Variant::Map::const_iterator wanted = args.find(HeaderMatchFilter::valueKey);
            if (key != args.end() && wanted != args.end()) {
                std::string headerKey(key->second.asString());
                std::string value(wanted->second.asString());
                QPID_LOG(debug, "Message filtering by header value configured.  key: " << headerKey << " value: " << value );
                return new HeaderMatchFilter( headerKey, value );
            }
        }
    }

    QPID_LOG(error, "Ignoring unrecognized message filter: '" << *filter << "'");
    return new MessageFilter();
}
// Routes a copy of m, with its trace cleared, through exchange e (using the
// exchange's alternate-aware routing). Returns false when no exchange is given.
bool reroute(boost::shared_ptr<Exchange> e, const Message& m)
{
    if (!e) {
        return false;
    }
    DeliverableMessage deliverable(m, 0);
    deliverable.getMessage().clearTrace();
    e->routeWithAlternate(deliverable);
    return true;
}
// Delivers m to queue q; does nothing when no queue is given.
void moveTo(boost::shared_ptr<Queue> q, Message& m)
{
    if (!q) {
        return;
    }
    q->deliver(m);
}
} // end namespace
uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f, SubscriptionType type)
{
std::deque<Message> removed;
{
QueueCursor c(type);
uint32_t count(0);
Mutex::ScopedLock locker(messageLock);
Message* m = messages->next(c);
while (m){
if (!p || p(*m)) {
if (!maxCount || count++ < maxCount) {
if (m->getState() == AVAILABLE) {
//don't actually acquire, just act as if we did
observeAcquire(*m, locker);
}
observeDequeue(*m, locker);
removed.push_back(*m);//takes a copy of the message
if (!messages->deleted(c)) {
QPID_LOG(warning, "Failed to correctly remove message from " << name << "; state is not consistent!");
assert(false);
}
} else {
break;
}
}
m = messages->next(c);
}
}
for (std::deque<Message>::iterator i = removed.begin(); i != removed.end(); ++i) {
if (f) f(*i);//ERROR? need to clear old persistent context?
if (i->isPersistent()) dequeueFromStore(i->getPersistentContext());//do this outside of lock and after any re-routing
}
return removed.size();
}
/**
* purge - for purging all or some messages on a queue
* depending on the purge_request
*
* qty == 0 then purge all messages
* == N then purge N messages from queue
* Sometimes qty == 1 to unblock the top of queue
*
* The dest exchange may be supplied to re-route messages through the exchange.
* It is safe to re-route messages such that they arrive back on the same queue,
* even if the queue is ordered by priority.
*
* An optional filter can be supplied that will be applied against each message. The
* message is purged only if the filter matches. See MessageDistributor for more detail.
*/
uint32_t Queue::purge(const uint32_t qty, boost::shared_ptr<Exchange> dest,
const qpid::types::Variant::Map *filter)
{
std::auto_ptr<MessageFilter> mf(MessageFilter::create(filter));
uint32_t count = remove(qty, boost::bind(&MessageFilter::match, mf.get(), _1), boost::bind(&reroute, dest, _1), CONSUMER/*?*/);
if (mgmtObject && count) {
mgmtObject->inc_acquires(count);
if (dest.get()) {
mgmtObject->inc_reroutes(count);
if (brokerMgmtObject) {
brokerMgmtObject->inc_acquires(count);
brokerMgmtObject->inc_reroutes(count);
}
} else {
mgmtObject->inc_discardsPurge(count);
if (brokerMgmtObject) {
brokerMgmtObject->inc_acquires(count);
brokerMgmtObject->inc_discardsPurge(count);
}
}
}
return count;
}
// Removes up to qty messages (0 means no limit) that match the optional
// filter and delivers each of them to destq. Returns the number removed.
uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty,
                     const qpid::types::Variant::Map *filter)
{
    std::auto_ptr<MessageFilter> selector(MessageFilter::create(filter));
    MessagePredicate matches = boost::bind(&MessageFilter::match, selector.get(), _1);
    MessageFunctor transfer = boost::bind(&moveTo, destq, _1);
    return remove(qty, matches, transfer, CONSUMER/*?*/);
}
// Assigns the next sequence number to the message, publishes it to the
// message container and informs the observers, all under the message lock.
// Listeners are notified after the lock has been released.
void Queue::push(Message& message, bool /*isRecovery*/)
{
    QueueListeners::NotificationSet toNotify;
    {
        Mutex::ScopedLock locker(messageLock);
        message.setSequence(++sequence);
        messages->publish(message);
        listeners.populate(toNotify);
        observeEnqueue(message, locker);
    }
    toNotify.notify();
}
// Number of messages currently held, read under the message lock.
uint32_t Queue::getMessageCount() const
{
    Mutex::ScopedLock locker(messageLock);
    const uint32_t held = messages->size();
    return held;
}
// Number of counted subscriptions, read under the message lock.
uint32_t Queue::getConsumerCount() const
{
    Mutex::ScopedLock locker(messageLock);
    const uint32_t subscriptions = consumerCount;
    return subscriptions;
}
// True when the queue is configured for auto-deletion and currently has
// neither consumers nor an owner.
bool Queue::canAutoDelete() const
{
    Mutex::ScopedLock locker(messageLock);
    if (!settings.autodelete) {
        return false;
    }
    return consumerCount == 0 && owner == 0;
}
// Resets the last-node-failure flag.
void Queue::clearLastNodeFailure()
{
    inLastNodeFailure = false;
}
// Placeholder: currently does nothing with the message.
void Queue::forcePersistent(const Message& /*message*/)
{
    //TODO
}
// When persist-last-node is enabled, applies forcePersistent to every message
// on the queue and sets the last-node-failure flag. A failure while visiting
// the messages is logged; the flag is set regardless.
void Queue::setLastNodeFailure()
{
    if (!persistLastNode) {
        return;
    }
    Mutex::ScopedLock locker(messageLock);
    try {
        messages->foreach(boost::bind(&Queue::forcePersistent, this, _1));
    } catch (const std::exception& e) {
        // Could not go into last node standing (for example journal not large enough)
        QPID_LOG(error, "Unable to fail to last node standing for queue: " << name << " : " << e.what());
    }
    inLastNodeFailure = true;
}
/*
* Prepares msg for enqueue on this queue.
*
* return true if enqueue succeeded and message should be made
* available; returning false will result in the message being dropped
*
* false is returned when the usage barrier cannot be acquired or when
* checkDepth() rejects the message. The transaction context parameter is
* currently unused.
*/
bool Queue::enqueue(TransactionContext* /*ctxt*/, Message& msg)
{
// Hold the barrier for the duration of the call; give up if it is unavailable.
ScopedUse u(barrier);
if (!u.acquired) return false;
{
// The depth check runs under the message lock, which is released again
// before any store interaction below.
Mutex::ScopedLock locker(messageLock);
if (!checkDepth(QueueDepth(1, msg.getContentSize()), msg)) {
return false;
}
}
if (inLastNodeFailure && persistLastNode){
forcePersistent(msg);
}
// Stamp the message with this queue's trace id, if one is configured.
if (settings.traceId.size()) {
msg.addTraceId(settings.traceId);
}
// if (msg.isPersistent() && store) {
if (msg.isPersistent() && asyncStore) {
// mark the message as being enqueued - the store MUST CALL msg->enqueueComplete()
// when it considers the message stored.
boost::intrusive_ptr<PersistableMessage> pmsg = msg.getPersistentContext();
assert(pmsg);
// pmsg->enqueueAsync(shared_from_this(), store);
// store->enqueue(ctxt, pmsg, *this);
pmsg->enqueueAsync(shared_from_this(), asyncStore);
pmsg->createMessageHandle(asyncStore);
EnqueueHandle& eh = pmsg->createEnqueueHandle(queueHandle, asyncStore);
TxnHandle th; // TODO: kpvdr: Implement transactions
// NOTE(review): broker is dereferenced here without a null check although the
// constructor accepts a null broker - confirm a broker is always present when
// an async store is configured.
boost::shared_ptr<QueueAsyncContext> qac(
new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()),
pmsg,
&enqueueComplete,
&broker->getAsyncResultQueue()));
asyncStore->submitEnqueue(eh, th, qac);
}
return true;
}
void Queue::enqueueAborted(const Message& msg)
{
//Called when any transactional enqueue is aborted (including but
//not limited to a recovered dtx transaction)
Mutex::ScopedLock locker(messageLock);
current -= QueueDepth(1, msg.getContentSize());
}
// Called when a recovered dtx enqueue operation is committed. The message is
// already on disk and space has been reserved in policy, so all that remains
// is to make it available, which process() does.
void Queue::enqueueCommited(Message& msg)
{
    process(msg);
}
// Called when a recovered dtx dequeue operation is aborted: the message is
// put back on the queue.
void Queue::dequeueAborted(Message& msg)
{
    push(msg);
}
// Called when a recovered dtx dequeue operation is committed. The message
// will at this point have already been removed from the store and will not be
// available for delivery, so the only work left is notifying the observers
// and updating the management statistics.
void Queue::dequeueCommited(const Message& msg)
{
    Mutex::ScopedLock locker(messageLock);
    observeDequeue(msg, locker);
    if (mgmtObject == 0) {
        return;
    }
    mgmtObject->inc_msgTxnDequeues();
    mgmtObject->inc_byteTxnDequeues(msg.getContentSize());
}
// Intended to remove msg from the store. The asynchronous dequeue is not
// implemented yet, so at present this only takes the barrier and checks its
// preconditions.
void Queue::dequeueFromStore(boost::intrusive_ptr<PersistableMessage> msg)
{
    ScopedUse u(barrier);
//    if (u.acquired && msg && store) {
    if (!u.acquired || !msg || !asyncStore) {
        return;
    }
//        store->dequeue(0, msg, *this);
    // TODO: kpvdr: async dequeue here
}
/**
 * Dequeues the message at the given cursor.
 *
 * Does nothing when the usage barrier cannot be acquired or when no message
 * is found at the cursor. Outside a transaction (ctxt is null) the message is
 * removed from the in-memory queue straight away; with a transaction context
 * it is left in place here. For a persistent message with an async store
 * configured, an asynchronous dequeue is submitted to the store.
 *
 * @param ctxt   transaction context, or null for a non-transactional dequeue
 * @param cursor position of the message to dequeue
 */
void Queue::dequeue(TransactionContext* ctxt, const QueueCursor& cursor)
{
ScopedUse u(barrier);
if (!u.acquired) return;
boost::intrusive_ptr<PersistableMessage> pmsg;
{
Mutex::ScopedLock locker(messageLock);
Message* msg = messages->find(cursor);
if (msg) {
// Capture the persistence context before the message may be deleted below.
if (msg->isPersistent()) pmsg = msg->getPersistentContext();
if (!ctxt) {
observeDequeue(*msg, locker);
messages->deleted(cursor);//message pointer not valid after this
}
} else {
return;
}
}
// The store interaction happens after the message lock has been released.
// if (store && pmsg) {
if (asyncStore && pmsg) {
// store->dequeue(ctxt, pmsg, *this);
pmsg->dequeueAsync(shared_from_this(), asyncStore);
TxnHandle th; // TODO: kpvdr: Implement transactions
EnqueueHandle& eh = pmsg->getEnqueueHandle(queueHandle);
// NOTE(review): broker is dereferenced here without a null check although the
// constructor accepts a null broker - confirm a broker is always present when
// an async store is configured.
boost::shared_ptr<QueueAsyncContext> qac(
new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()),
pmsg,
&dequeueComplete,
&broker->getAsyncResultQueue()));
asyncStore->submitDequeue(eh, th, qac);
}
}
void Queue::dequeueCommitted(const QueueCursor& cursor)
{
Mutex::ScopedLock locker(messageLock);
Message* msg = messages->find(cursor);
if (msg) {
const uint64_t contentSize = msg->getContentSize();
observeDequeue(*msg, locker);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(contentSize);
}
if (brokerMgmtObject) {
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
bStats->msgTxnDequeues += 1;
bStats->byteTxnDequeues += contentSize;
brokerMgmtObject->statisticsUpdated();
}
messages->deleted(cursor);
} else {
QPID_LOG(error, "Could not find dequeued message on commit");
}
}
/**
 * Accounts for a dequeued message: reduces the current depth, updates the
 * management statistics and tells every observer. An exception thrown by an
 * observer is logged and does not stop the remaining notifications.
 * Requires messageLock be held by caller.
 */
void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock&)
{
    const QueueDepth departed(1, msg.getContentSize());
    current -= departed;
    mgntDeqStats(msg, mgmtObject, brokerMgmtObject);

    Observers::const_iterator observer = observers.begin();
    while (observer != observers.end()) {
        try {
            (*observer)->dequeued(msg);
        } catch (const std::exception& e) {
            QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what());
        }
        ++observer;
    }
}
/** Tells every observer that a message has become unavailable for transfer.
 * An exception thrown by an observer is logged and does not stop the
 * remaining notifications.
 * Requires messageLock be held by caller.
 */
void Queue::observeAcquire(const Message& msg, const Mutex::ScopedLock&)
{
    Observers::const_iterator observer = observers.begin();
    while (observer != observers.end()) {
        try {
            (*observer)->acquired(msg);
        } catch (const std::exception& e) {
            QPID_LOG(warning, "Exception on notification of message removal for queue " << getName() << ": " << e.what());
        }
        ++observer;
    }
}
/** Tells every observer that a message has become re-available for transfer.
 * An exception thrown by an observer is logged and does not stop the
 * remaining notifications.
 * Requires messageLock be held by caller.
 */
void Queue::observeRequeue(const Message& msg, const Mutex::ScopedLock&)
{
    Observers::const_iterator observer = observers.begin();
    while (observer != observers.end()) {
        try {
            (*observer)->requeued(msg);
        } catch (const std::exception& e) {
            QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what());
        }
        ++observer;
    }
}
/** Tells every observer that a new consumer has subscribed to this queue.
 * An exception thrown by an observer is logged and does not stop the
 * remaining notifications.
 */
void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
{
    Observers::const_iterator observer = observers.begin();
    while (observer != observers.end()) {
        try {
            (*observer)->consumerAdded(c);
        } catch (const std::exception& e) {
            QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what());
        }
        ++observer;
    }
}
/** Tells every observer that a consumer has unsubscribed from this queue.
 * An exception thrown by an observer is logged and does not stop the
 * remaining notifications.
 */
void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock&)
{
    Observers::const_iterator observer = observers.begin();
    while (observer != observers.end()) {
        try {
            (*observer)->consumerRemoved(c);
        } catch (const std::exception& e) {
            QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what());
        }
        ++observer;
    }
}
// Creates the queue in the async store, if one is configured: obtains a queue
// handle and submits an asynchronous create whose result is reported through
// createComplete. Does nothing without a store.
void Queue::create()
{
//    if (store) {
    if (!asyncStore) {
        return;
    }
//        store->create(*this, settings.storeSettings);
    queueHandle = asyncStore->createQueueHandle(name, qpid::types::Variant::Map());
    boost::shared_ptr<QueueAsyncContext> qac(
        new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()),
                              &createComplete,
                              &broker->getAsyncResultQueue()));
    asyncStore->submitCreate(queueHandle, this, qac);
}
// Reads an integer setting from the field table. A missing key yields 0. A
// value convertible to int is returned directly; a string value is parsed.
// Anything else, or an unparsable string, is logged and yields 0.
int getIntegerSetting(const qpid::framing::FieldTable& settings, const std::string& key)
{
    qpid::framing::FieldTable::ValuePtr value = settings.get(key);
    if (!value) {
        return 0;
    }
    if (value->convertsTo<int>()) {
        return value->get<int>();
    }
    if (!value->convertsTo<std::string>()) {
        QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << *value);
        return 0;
    }

    std::string text = value->get<std::string>();
    try {
        return boost::lexical_cast<int>(text);
    } catch(const boost::bad_lexical_cast&) {
        QPID_LOG(warning, "Ignoring invalid integer value for " << key << ": " << text);
        return 0;
    }
}
// Reads a boolean setting from the field table. A missing key yields false.
// A value convertible to int is true when non-zero. A string value accepts
// "True"/"true"/"False"/"false" and otherwise whatever lexical_cast<bool>
// accepts. Anything else is logged and yields false.
bool getBoolSetting(const qpid::framing::FieldTable& settings, const std::string& key)
{
    qpid::framing::FieldTable::ValuePtr value = settings.get(key);
    if (!value) {
        return false;
    }
    if (value->convertsTo<int>()) {
        return value->get<int>() != 0;
    }
    if (!value->convertsTo<std::string>()) {
        QPID_LOG(warning, "Ignoring invalid boolean value for " << key << ": " << *value);
        return false;
    }

    std::string text = value->get<std::string>();
    if (text == "True" || text == "true") return true;
    if (text == "False" || text == "false") return false;
    try {
        return boost::lexical_cast<bool>(text);
    } catch(const boost::bad_lexical_cast&) {
        QPID_LOG(warning, "Ignoring invalid boolean value for " << key << ": " << text);
        return false;
    }
}
void Queue::abandoned(const Message& message)
{
if (reroute(alternateExchange, message) && brokerMgmtObject)
brokerMgmtObject->inc_abandonedViaAlt();
else if (brokerMgmtObject)
brokerMgmtObject->inc_abandoned();
}
/**
 * Tears the queue down: removes its bindings, abandons every remaining
 * message, releases the alternate exchange, submits flush and destroy
 * requests to the async store (if any), notifies listeners and observers of
 * the deletion and finally updates the management objects.
 */
void Queue::destroyed()
{
// NOTE(review): broker is dereferenced without a null check although the
// constructor accepts a null broker - confirm destroyed() is only reached for
// queues that have a broker.
unbind(broker->getExchanges());
// Every remaining message is passed to abandoned().
remove(0, 0, boost::bind(&Queue::abandoned, this, _1), REPLICATOR/*even acquired message are treated as abandoned*/);
if (alternateExchange.get()) {
alternateExchange->decAlternateUsers();
}
// if (store) {
if (asyncStore) {
barrier.destroy();
// store->flush(*this);
boost::shared_ptr<QueueAsyncContext> flush_qac(new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), &flushComplete, &broker->getAsyncResultQueue()));
asyncStore->submitFlush(queueHandle, flush_qac);
// store->destroy(*this);
boost::shared_ptr<QueueAsyncContext> destroy_qac(new QueueAsyncContext(boost::dynamic_pointer_cast<PersistableQueue>(shared_from_this()), &destroyComplete, &broker->getAsyncResultQueue()));
asyncStore->submitDestroy(queueHandle, destroy_qac);
// store = 0;//ensure we make no more calls to the store for this queue
// TODO: kpvdr: cannot set asyncStore to 0 until all async store ops are complete. Rather set flag which
// will prevent new calls from succeeding and cause store to be destroyed when all outstanding async ops are complete.
}
// Drop our reference to any pending auto-delete task.
if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>();
notifyDeleted();
{
// Observers are told about the destruction and then forgotten, under the lock.
Mutex::ScopedLock lock(messageLock);
for_each(observers.begin(), observers.end(),
boost::bind(&QueueObserver::destroy, _1));
observers.clear();
}
if (mgmtObject != 0) {
mgmtObject->resourceDestroy();
if (brokerMgmtObject)
brokerMgmtObject->dec_queueCount();
}
}
void Queue::notifyDeleted()
{
QueueListeners::ListenerSet set;
{
Mutex::ScopedLock locker(messageLock);
deleted = true;
listeners.snapshot(set);
}
set.notifyAll();
}
// Records a binding of this queue to the named exchange with the given key
// and arguments.
void Queue::bound(const string& exchange,
                  const string& key,
                  const FieldTable& args)
{
    bindings.add(exchange, key, args);
}
/**
 * Asks the binding list to unbind this queue from the given exchange
 * registry, passing along the async store handle.
 */
void Queue::unbind(ExchangeRegistry& exchanges)
{
    bindings.unbind(exchanges,
                    shared_from_this(),
                    asyncStore);
}
/** Returns the persistence id currently assigned to this queue. */
uint64_t Queue::getPersistenceId() const
{
    return persistenceId;
}
void Queue::setPersistenceId(uint64_t _persistenceId) const
{
if (mgmtObject != 0 && persistenceId == 0 && externalQueueStore)
{
ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObject();
if (childObj != 0)
childObj->setReference(mgmtObject->getObjectId());
}
persistenceId = _persistenceId;
}
/**
 * Serialises the queue into the buffer: name (short string), encodable
 * settings, then the alternate exchange name (short string, empty when
 * no alternate exchange is set).
 */
void Queue::encode(Buffer& buffer) const
{
    buffer.putShortString(name);
    buffer.put(encodableSettings);
    if (alternateExchange.get()) {
        buffer.putShortString(alternateExchange->getName());
    } else {
        buffer.putShortString(std::string(""));
    }
}
/**
 * Returns the number of bytes encode() writes: two short strings (each
 * carries one size octet) plus the encoded settings.
 */
uint32_t Queue::encodedSize() const
{
    const size_t nameBytes = name.size() + 1; // short string size octet
    const size_t altExchangeBytes =
        (alternateExchange.get() ? alternateExchange->getName().size() : 0) + 1; // short string
    return nameBytes + altExchangeBytes + encodableSettings.encodedSize();
}
/**
 * Rebuilds a queue from its encoded form: reads the name and the
 * settings field table, declares the queue in the registry, and, when
 * the buffer still has data, reads the alternate exchange name and
 * stores it on the queue (see recoveryComplete(), which resolves it).
 */
Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer )
{
    string queueName;
    buffer.getShortString(queueName);

    FieldTable encodedSettings;
    buffer.get(encodedSettings);

    QueueSettings settings(true, false);
    settings.populate(encodedSettings, settings.storeSettings);

    // No alternate exchange is supplied at declaration time; only its
    // name is remembered below.
    boost::shared_ptr<Exchange> noAlternate;
    Queue::shared_ptr restored = queues.declare(queueName, settings, noAlternate, true).first;

    if (buffer.available()) {
        string altExchangeName;
        buffer.getShortString(altExchangeName);
        restored->alternateExchangeName.assign(altExchangeName);
    }
    return restored;
}
/**
 * Sets (or clears) the alternate exchange of this queue.
 *
 * @param exchange the new alternate exchange; may be null, in which case
 *        the management object's altExchange attribute is cleared.
 *
 * A non-null exchange has its alternate-user count incremented. The
 * management object, when present, is updated to match.
 */
void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange)
{
    alternateExchange = exchange;
    // Guard the dereference: the management branch below explicitly
    // supports a null exchange, so a null argument must not crash here.
    if (exchange.get() != 0) {
        exchange->incAlternateUsers();
    }
    if (mgmtObject) {
        if (exchange.get() != 0)
            mgmtObject->set_altExchange(exchange->GetManagementObject()->getObjectId());
        else
            mgmtObject->clr_altExchange();
    }
}
/** Returns the alternate exchange currently set on this queue (may be null). */
boost::shared_ptr<Exchange> Queue::getAlternateExchange()
{
    return alternateExchange;
}
/**
 * Asks the queue registry to destroy the queue if canAutoDelete() holds;
 * when the registry reports that it did, logs the deletion and runs the
 * queue's destroyed() teardown.
 */
void tryAutoDeleteImpl(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId)
{
    const bool removed = broker.getQueues().destroyIf(
        queue->getName(),
        boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
    if (!removed) return;

    QPID_LOG_CAT(debug, model, "Auto-delete queue: " << queue->getName()
                 << " user:" << userId
                 << " rhost:" << connectionId );
    queue->destroyed();
}
/**
 * Timer task that attempts a delayed auto-delete of a queue when it
 * fires. It keeps the queue alive through a shared pointer and copies
 * the connection and user ids used for logging.
 */
struct AutoDeleteTask : qpid::sys::TimerTask
{
    Broker& broker;
    Queue::shared_ptr queue;
    std::string connectionId;
    std::string userId;

    AutoDeleteTask(Broker& b, Queue::shared_ptr q, const std::string& cId, const std::string& uId, AbsTime fireTime)
        : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()),
          broker(b),
          queue(q),
          connectionId(cId),
          userId(uId)
    {}

    void fire()
    {
        // The queue may have been used after this task was created and
        // then become unused again before it fired; in that case a later
        // task will already have been added and this request should be
        // ignored.
        // NOTE(review): no such check is visible in this method; it
        // simply delegates to tryAutoDeleteImpl().
        tryAutoDeleteImpl(broker, queue, connectionId, userId);
    }
};
/**
 * Attempts to auto-delete the queue. When an auto-delete delay is
 * configured and the queue is currently eligible, a timer task is
 * scheduled for now + delay; otherwise the deletion is attempted
 * immediately.
 */
void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue, const std::string& connectionId, const std::string& userId)
{
    const bool delayed = queue->settings.autoDeleteDelay && queue->canAutoDelete();
    if (!delayed) {
        tryAutoDeleteImpl(broker, queue, connectionId, userId);
        return;
    }

    AbsTime fireAt(now(), Duration(queue->settings.autoDeleteDelay * TIME_SEC));
    queue->autoDeleteTask = boost::intrusive_ptr<qpid::sys::TimerTask>(
        new AutoDeleteTask(broker, queue, connectionId, userId, fireAt));
    broker.getTimer().add(queue->autoDeleteTask);
    QPID_LOG(debug, "Timed auto-delete for " << queue->getName() << " initiated");
}
/** Returns true when the given token is the current owner (checked under ownershipLock). */
bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
{
    Mutex::ScopedLock guard(ownershipLock);
    const bool sameToken = (o == owner);
    return sameToken;
}
/**
 * Clears the owner under ownershipLock and, when a management object
 * exists, marks the queue as non-exclusive there.
 */
void Queue::releaseExclusiveOwnership()
{
    Mutex::ScopedLock guard(ownershipLock);
    owner = 0;
    if (!mgmtObject) return;
    mgmtObject->set_exclusive(false);
}
/**
 * Tries to make the given token the exclusive owner of the queue.
 *
 * Any pending delayed auto-delete task is cancelled first, whether or
 * not ownership is then granted.
 *
 * @return false when the queue already has an owner, true when
 *         ownership was taken.
 */
bool Queue::setExclusiveOwner(const OwnershipToken* const o)
{
    // Cancel the auto-deletion timer if one is pending.
    if (settings.autoDeleteDelay && autoDeleteTask) {
        autoDeleteTask->cancel();
    }

    Mutex::ScopedLock guard(ownershipLock);
    if (owner) return false;

    owner = o;
    if (mgmtObject) {
        mgmtObject->set_exclusive(true);
    }
    return true;
}
bool Queue::hasExclusiveOwner() const
{
Mutex::ScopedLock locker(ownershipLock);
return owner != 0;
}
/** Returns the value of the queue's `exclusive` member. */
bool Queue::hasExclusiveConsumer() const
{
    return exclusive;
}
/**
 * Replaces the external queue store. A previously held, different
 * store is deleted. When the new store is non-null and both it and the
 * queue have management objects, the store's management object is
 * pointed at this queue's management object id.
 */
void Queue::setExternalQueueStore(ExternalQueueStore* inst)
{
    if (externalQueueStore && externalQueueStore != inst) {
        delete externalQueueStore;
    }
    externalQueueStore = inst;
    if (!inst) return;

    ManagementObject::shared_ptr storeObj = inst->GetManagementObject();
    if (storeObj != 0 && mgmtObject != 0) {
        storeObj->setReference(mgmtObject->getObjectId());
    }
}
/** Not implemented yet: always returns 0. */
uint64_t Queue::getSize()
{
    // TODO: kpvdr: implement
    return 0;
}
/** Not implemented yet: the target buffer is left untouched. */
void Queue::write(char* /*target*/)
{
    // TODO: kpvdr: implement
}
// static
void Queue::createComplete(const AsyncResultHandle* const arh) {
boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Create complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
//static
void Queue::dequeueComplete(const AsyncResultHandle* const arh) {
boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
boost::shared_ptr<PersistableQueue> pq = qac->getQueue();
boost::intrusive_ptr<PersistableMessage> pmsg = qac->getMessage();
QueueHandle& qh = pq->getQueueHandle();
pmsg->dequeueComplete();
pmsg->removeEnqueueHandle(qh);
// std::cout << "@@@@ Queue \"" << pq->getName() << "\": Dequeue complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
//static
void Queue::destroyComplete(const AsyncResultHandle* const arh) {
boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
// TODO: kpvdr: set Queue::asyncStore = 0 from here.
// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Destroy complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
//static
void Queue::enqueueComplete(const AsyncResultHandle* const arh) {
boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
qac->getMessage()->enqueueComplete();
// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Enqueue complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
//static
void Queue::flushComplete(const AsyncResultHandle* const arh) {
boost::shared_ptr<QueueAsyncContext> qac = boost::dynamic_pointer_cast<QueueAsyncContext>(arh->getBrokerAsyncContext());
// std::cout << "@@@@ Queue \"" << qac->getQueue()->getName() << "\": Flush complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl;
}
/**
 * Increments the discardsSubscriber counter on the queue management
 * object and, when present, on the broker management object. Nothing is
 * counted when the queue has no management object.
 */
void Queue::countRejected() const
{
    if (!mgmtObject) return;

    mgmtObject->inc_discardsSubscriber();
    if (brokerMgmtObject) {
        brokerMgmtObject->inc_discardsSubscriber();
    }
}
void Queue::countFlowedToDisk(uint64_t size) const
{
if (mgmtObject) {
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
qStats->msgFtdEnqueues += 1;
qStats->byteFtdEnqueues += size;
mgmtObject->statisticsUpdated();
if (brokerMgmtObject) {
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
bStats->msgFtdEnqueues += 1;
bStats->byteFtdEnqueues += size;
brokerMgmtObject->statisticsUpdated();
}
}
}
void Queue::countLoadedFromDisk(uint64_t size) const
{
if (mgmtObject) {
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
qStats->msgFtdDequeues += 1;
qStats->byteFtdDequeues += size;
mgmtObject->statisticsUpdated();
if (brokerMgmtObject) {
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
bStats->msgFtdDequeues += 1;
bStats->byteFtdDequeues += size;
brokerMgmtObject->statisticsUpdated();
}
}
}
/** Returns this queue's management object (may be null). */
ManagementObject::shared_ptr Queue::GetManagementObject(void) const
{
    return mgmtObject;
}
/**
 * Dispatches a management method call on this queue.
 *
 * METHOD_PURGE purges messages according to the request count and
 * filter. METHOD_REROUTE does the same but sends the messages to either
 * the queue's alternate exchange or a named exchange.
 *
 * @param methodId management method identifier
 * @param args     method arguments; cast to the argument type matching methodId
 * @param etext    receives an error description when the parameters are invalid
 * @return STATUS_OK on success, STATUS_PARAMETER_INVALID when the
 *         destination exchange cannot be determined, and
 *         STATUS_UNKNOWN_METHOD for any other method id.
 */
Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string& etext)
{
    QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");

    switch (methodId) {
    case _qmf::Queue::METHOD_PURGE :
        {
            _qmf::ArgsQueuePurge& purgeArgs = (_qmf::ArgsQueuePurge&) args;
            purge(purgeArgs.i_request, boost::shared_ptr<Exchange>(), &purgeArgs.i_filter);
            return Manageable::STATUS_OK;
        }

    case _qmf::Queue::METHOD_REROUTE :
        {
            _qmf::ArgsQueueReroute& rerouteArgs = (_qmf::ArgsQueueReroute&) args;
            boost::shared_ptr<Exchange> dest;
            if (!rerouteArgs.i_useAltExchange) {
                // A lookup failure is reported as an invalid parameter.
                try {
                    dest = broker->getExchanges().get(rerouteArgs.i_exchange);
                } catch(const std::exception&) {
                    etext = "Exchange not found";
                    return Manageable::STATUS_PARAMETER_INVALID;
                }
            } else {
                if (!alternateExchange) {
                    etext = "No alternate-exchange defined";
                    return Manageable::STATUS_PARAMETER_INVALID;
                }
                dest = alternateExchange;
            }
            purge(rerouteArgs.i_request, dest, &rerouteArgs.i_filter);
            return Manageable::STATUS_OK;
        }
    }
    return Manageable::STATUS_UNKNOWN_METHOD;
}
/**
 * Fills `results` with queue state. At present only the allocator, when
 * one is set, contributes; it is queried under messageLock.
 */
void Queue::query(qpid::types::Variant::Map& results) const
{
    Mutex::ScopedLock guard(messageLock);
    /** @todo add any interesting queue state into results */
    if (!allocator) return;
    allocator->query(results);
}
namespace {
struct After {
framing::SequenceNumber seq;
After(framing::SequenceNumber s) : seq(s) {}
bool operator()(const Message& m) { return m.getSequence() > seq; }
};
} // namespace
/**
 * Sets the queue's sequence position to `n` under messageLock. When the
 * position moves backwards, remove() is first invoked with a filter
 * that matches messages whose sequence number is greater than `n`.
 */
void Queue::setPosition(SequenceNumber n)
{
    Mutex::ScopedLock guard(messageLock);
    const bool movingBack = n < sequence;
    if (movingBack) {
        remove(0, After(n), MessagePredicate(), BROWSER);
    }
    sequence = n;
    QPID_LOG(debug, "Set position to " << sequence << " on " << getName());
}
/** Returns the queue's current sequence position, read under messageLock. */
SequenceNumber Queue::getPosition()
{
    Mutex::ScopedLock guard(messageLock);
    return sequence;
}
/**
 * Reports the sequence range as seen by a fresh cursor of the given
 * subscription type.
 *
 * @param front receives the sequence number of the first message the
 *        cursor yields, or back + 1 when it yields none
 * @param back  receives the queue's current sequence position
 * @param type  subscription type used to create the cursor
 */
void Queue::getRange(framing::SequenceNumber& front, framing::SequenceNumber& back,
                     SubscriptionType type)
{
    Mutex::ScopedLock guard(messageLock);
    QueueCursor cursor(type);
    back = sequence;
    Message* first = messages->next(cursor);
    if (first) {
        front = first->getSequence();
    } else {
        front = back+1;
    }
}
/** Returns the value of the queue's eventMode member. */
int Queue::getEventMode()
{
    return eventMode;
}
/**
 * Finishes recovery of this queue: resolves the remembered alternate
 * exchange name against the registry (logging a warning when it cannot
 * be found) and then dequeues from the store every message recorded in
 * pendingDequeues.
 */
void Queue::recoveryComplete(ExchangeRegistry& exchanges)
{
    // Resolve the alternate exchange by name, if one was recorded.
    if (!alternateExchangeName.empty()) {
        Exchange::shared_ptr found = exchanges.find(alternateExchangeName);
        if (!found) {
            QPID_LOG(warning, "Could not set alternate exchange \""
                     << alternateExchangeName << "\" on queue \"" << name
                     << "\": exchange does not exist.");
        } else {
            setAlternateExchange(found);
        }
    }

    // Process any pending dequeues, then forget them.
    std::vector<Message>::iterator pending = pendingDequeues.begin();
    while (pending != pendingDequeues.end()) {
        dequeueFromStore(pending->getPersistentContext());
        ++pending;
    }
    pendingDequeues.clear();
}
/**
 * Notifies every observer that a message has been enqueued and updates
 * the enqueue management statistics. An exception thrown by one
 * observer is logged as a warning and does not stop the remaining
 * observers from being notified.
 *
 * Requires messageLock to be held by the caller (the lock is passed in
 * as proof).
 */
void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock&)
{
    Observers::iterator observer = observers.begin();
    while (observer != observers.end()) {
        try {
            (*observer)->enqueued(m);
        } catch (const std::exception& e) {
            QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what());
        }
        ++observer;
    }
    mgntEnqStats(m, mgmtObject, brokerMgmtObject);
}
/**
 * Checks that the queue has not been deleted.
 *
 * @return true when the queue is not deleted, false when it is deleted
 *         and the consumer asked for the error to be hidden.
 * @throws ResourceDeletedException when the queue is deleted and the
 *         consumer does not hide the error.
 */
bool Queue::checkNotDeleted(const Consumer::shared_ptr& c)
{
    const bool reportDeletion = deleted && !c->hideDeletedError();
    if (reportDeletion) {
        throw ResourceDeletedException(QPID_MSG("Queue " << getName() << " has been deleted."));
    }
    return !deleted;
}
/** Registers an observer with the queue, under messageLock. */
void Queue::addObserver(boost::shared_ptr<QueueObserver> observer)
{
    Mutex::ScopedLock guard(messageLock);
    observers.insert(observer);
}
/** Unregisters an observer from the queue, under messageLock. */
void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer)
{
    Mutex::ScopedLock guard(messageLock);
    observers.erase(observer);
}
void Queue::flush()
{
ScopedUse u(barrier);
// if (u.acquired && store) store->flush(*this);
// TODO: kpvdr: Async store flush here
if (u.acquired && asyncStore) {
//store->flush(*this);
std::cout << "&&&& Queue::flush(): Queue=\"" << name << "\"" << std::endl << std::flush;
}
}
/**
 * Binds this queue to an exchange with the given key and arguments.
 *
 * @return true when the exchange accepted the binding (which is then
 *         also recorded on the queue via bound()), false otherwise.
 */
bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key,
                 const qpid::framing::FieldTable& arguments)
{
    const bool accepted = exchange->bind(shared_from_this(), key, &arguments, asyncStore);
    if (!accepted) return false;

    bound(exchange->getName(), key, arguments);
    // Move this to Exchange::bind() which keeps the binding context
//    if (exchange->isDurable() && isDurable()) {
//        store->bind(*exchange, *this, key, arguments);
//        // TODO: kpvdr: Store configuration here
//    }
    return true;
}
/** Returns the broker pointer held by this queue. */
Broker* Queue::getBroker()
{
    return broker;
}
/** Overwrites the dequeueSincePurge counter with the given value. */
void Queue::setDequeueSincePurge(uint32_t value)
{
    dequeueSincePurge = value;
}
/**
 * Rejects the message at the given cursor position.
 *
 * Under messageLock the message is looked up; if it is not found the
 * call returns without doing anything. Otherwise the rejection is
 * counted, observers are told about the dequeue and the message is
 * deleted from the queue. After the lock is released, a copy of the
 * message is routed to the alternate exchange when one is set (the
 * message is simply dropped otherwise), and the message's persistent
 * context, if it had one, is dequeued from the store.
 */
void Queue::reject(const QueueCursor& cursor)
{
    Exchange::shared_ptr alternate = getAlternateExchange();
    Message copy;
    boost::intrusive_ptr<PersistableMessage> pmsg;
    {
        Mutex::ScopedLock locker(messageLock);
        Message* message = messages->find(cursor);
        if (!message) return;

        // Copy only when the message will actually be rerouted.
        if (alternate) copy = *message;
        if (message->isPersistent()) pmsg = message->getPersistentContext();
        countRejected();
        observeDequeue(*message, locker);
        messages->deleted(cursor);
    }

    if (!alternate) {
        // No alternate exchange: just drop it.
        QPID_LOG(info, "Dropping rejected message from " << getName());
    } else {
        copy.resetDeliveryCount();
        DeliverableMessage delivery(copy, 0);
        alternate->routeWithAlternate(delivery);
        QPID_LOG(info, "Routed rejected message from " << getName() << " to "
                 << alternate->getName());
    }
    dequeueFromStore(pmsg);
}
/**
 * Accounts for `increment` in the current queue depth.
 *
 * @return true after adding `increment` to the current depth.
 * @throws ResourceLimitExceededException when the current depth is
 *         non-zero and the remaining room (maxDepth - current) is less
 *         than `increment`; the overflow discard counters are bumped
 *         before throwing.
 */
bool Queue::checkDepth(const QueueDepth& increment, const Message&)
{
    const bool overLimit = current && (settings.maxDepth - current < increment);
    if (!overLimit) {
        current += increment;
        return true;
    }

    if (mgmtObject) {
        mgmtObject->inc_discardsOverflow();
        if (brokerMgmtObject) {
            brokerMgmtObject->inc_discardsOverflow();
        }
    }
    throw ResourceLimitExceededException(QPID_MSG("Maximum depth exceeded on " << name << ": current=[" << current << "], max=[" << settings.maxDepth << "]"));
}
/**
 * Advances the cursor to the next message accepted by `predicate` (any
 * message when the predicate is empty).
 *
 * @return true when such a message was found, false when the cursor
 *         ran out of messages.
 */
bool Queue::seek(QueueCursor& cursor, MessagePredicate predicate)
{
    Mutex::ScopedLock guard(messageLock);
    // The lock is held across calls to the predicate; the alternative
    // would be to take a copy of the message. May be revised depending
    // on any new use cases.
    for (Message* candidate = messages->next(cursor);
         candidate;
         candidate = messages->next(cursor)) {
        if (!predicate || predicate(*candidate)) return true;
    }
    return false;
}
/**
 * Positions the cursor at the message with sequence number `start` and
 * checks it against `predicate`; when there is no such message or the
 * predicate rejects it, falls back to seek(cursor, predicate).
 *
 * @return true when a message accepted by the predicate was found.
 */
bool Queue::seek(QueueCursor& cursor, MessagePredicate predicate, qpid::framing::SequenceNumber start)
{
    Mutex::ScopedLock guard(messageLock);
    // The lock is held across calls to the predicate; the alternative
    // would be to take a copy of the message. May be revised depending
    // on any new use cases.
    Message* const located = messages->find(start, &cursor);
    const bool accepted = located && (!predicate || predicate(*located));
    return accepted || seek(cursor, predicate);
}
/**
 * Positions the cursor at the message with sequence number `start`.
 *
 * @return true when such a message was found.
 */
bool Queue::seek(QueueCursor& cursor, qpid::framing::SequenceNumber start)
{
    Mutex::ScopedLock guard(messageLock);
    Message* const located = messages->find(start, &cursor);
    return located != 0;
}
/** Creates a barrier for the given queue with a use count of zero. */
Queue::UsageBarrier::UsageBarrier(Queue& q)
    : parent(q),
      count(0)
{}
/**
 * Registers a user of the queue.
 *
 * @return false (without counting) when the parent queue is already
 *         marked deleted, true after incrementing the use count.
 */
bool Queue::UsageBarrier::acquire()
{
    Monitor::ScopedLock guard(usageLock);
    if (parent.deleted) return false;

    ++count;
    return true;
}
/**
 * Unregisters a user of the queue; when the use count reaches zero all
 * waiters on the monitor are woken.
 */
void Queue::UsageBarrier::release()
{
    Monitor::ScopedLock guard(usageLock);
    --count;
    if (count == 0) {
        usageLock.notifyAll();
    }
}
/**
 * Marks the parent queue as deleted and waits on the monitor until the
 * use count has dropped to zero.
 */
void Queue::UsageBarrier::destroy()
{
    Monitor::ScopedLock guard(usageLock);
    parent.deleted = true;
    while (count) {
        usageLock.wait();
    }
}
/**
 * Inserts a key/value pair into the queue's original settings map and,
 * when a management object exists, republishes the settings map there.
 */
void Queue::addArgument(const string& key, const types::Variant& value)
{
    const types::Variant::Map::value_type entry(key, value);
    settings.original.insert(entry);
    if (mgmtObject != 0) {
        mgmtObject->set_arguments(settings.asMap());
    }
}
}}