| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| #include "Broker.h" |
| #include "Queue.h" |
| #include "Exchange.h" |
| #include "DeliverableMessage.h" |
| #include "MessageStore.h" |
| #include "NullMessageStore.h" |
| #include "QueueRegistry.h" |
| |
| #include "qpid/StringUtils.h" |
| #include "qpid/log/Statement.h" |
| #include "qpid/framing/reply_exceptions.h" |
| #include "qpid/sys/Monitor.h" |
| #include "qpid/sys/Time.h" |
| #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h" |
| |
| #include <iostream> |
| #include <algorithm> |
| #include <functional> |
| |
| #include <boost/bind.hpp> |
| #include <boost/intrusive_ptr.hpp> |
| |
| using namespace qpid::broker; |
| using namespace qpid::sys; |
| using namespace qpid::framing; |
| using qpid::management::ManagementAgent; |
| using qpid::management::ManagementObject; |
| using qpid::management::Manageable; |
| using qpid::management::Args; |
| using std::for_each; |
| using std::mem_fun; |
| namespace _qmf = qmf::org::apache::qpid::broker; |
| |
| |
| namespace |
| { |
| const std::string qpidMaxSize("qpid.max_size"); |
| const std::string qpidMaxCount("qpid.max_count"); |
| const std::string qpidNoLocal("no-local"); |
| const std::string qpidTraceIdentity("qpid.trace.id"); |
| const std::string qpidTraceExclude("qpid.trace.exclude"); |
| const std::string qpidLastValueQueue("qpid.last_value_queue"); |
| const std::string qpidLastValueQueueNoBrowse("qpid.last_value_queue_no_browse"); |
| const std::string qpidPersistLastNode("qpid.persist_last_node"); |
| const std::string qpidVQMatchProperty("qpid.LVQ_key"); |
| } |
| |
| |
| Queue::Queue(const string& _name, bool _autodelete, |
| MessageStore* const _store, |
| const OwnershipToken* const _owner, |
| Manageable* parent) : |
| |
| name(_name), |
| autodelete(_autodelete), |
| store(_store), |
| owner(_owner), |
| consumerCount(0), |
| exclusive(0), |
| noLocal(false), |
| lastValueQueue(false), |
| lastValueQueueNoBrowse(false), |
| persistLastNode(false), |
| inLastNodeFailure(false), |
| persistenceId(0), |
| policyExceeded(false), |
| mgmtObject(0) |
| { |
| if (parent != 0) |
| { |
| ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); |
| |
| if (agent != 0) |
| { |
| mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete, _owner != 0); |
| |
| // Add the object to the management agent only if this queue is not durable. |
| // If it's durable, we will add it later when the queue is assigned a persistenceId. |
| if (store == 0) |
| agent->addObject (mgmtObject); |
| } |
| } |
| } |
| |
| Queue::~Queue() |
| { |
| if (mgmtObject != 0) |
| mgmtObject->resourceDestroy (); |
| } |
| |
| void Queue::notifyDurableIOComplete() |
| { |
| QueueListeners::NotificationSet copy; |
| { |
| Mutex::ScopedLock locker(messageLock); |
| listeners.populate(copy); |
| } |
| copy.notify(); |
| } |
| |
| bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg) |
| { |
| return token && token->isLocal(msg->getPublisher()); |
| } |
| |
| bool Queue::isLocal(boost::intrusive_ptr<Message>& msg) |
| { |
| //message is considered local if it was published on the same |
| //connection as that of the session which declared this queue |
| //exclusive (owner) or which has an exclusive subscription |
| //(exclusive) |
| return noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg)); |
| } |
| |
| bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg) |
| { |
| return traceExclude.size() && msg->isExcluded(traceExclude); |
| } |
| |
| void Queue::deliver(boost::intrusive_ptr<Message>& msg){ |
| |
| if (msg->isImmediate() && getConsumerCount() == 0) { |
| if (alternateExchange) { |
| DeliverableMessage deliverable(msg); |
| alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders()); |
| } |
| } else if (isLocal(msg)) { |
| //drop message |
| QPID_LOG(info, "Dropping 'local' message from " << getName()); |
| } else if (isExcluded(msg)) { |
| //drop message |
| QPID_LOG(info, "Dropping excluded message from " << getName()); |
| } else { |
| // if no store then mark as enqueued |
| if (!enqueue(0, msg)){ |
| push(msg); |
| msg->enqueueComplete(); |
| }else { |
| push(msg); |
| } |
| mgntEnqStats(msg); |
| QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); |
| } |
| } |
| |
| |
| void Queue::recover(boost::intrusive_ptr<Message>& msg){ |
| push(msg); |
| msg->enqueueComplete(); // mark the message as enqueued |
| mgntEnqStats(msg); |
| |
| if (store && !msg->isContentLoaded()) { |
| //content has not been loaded, need to ensure that lazy loading mode is set: |
| //TODO: find a nicer way to do this |
| msg->releaseContent(store); |
| } |
| } |
| |
| void Queue::process(boost::intrusive_ptr<Message>& msg){ |
| push(msg); |
| mgntEnqStats(msg); |
| if (mgmtObject != 0){ |
| mgmtObject->inc_msgTxnEnqueues (); |
| mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); |
| } |
| } |
| |
| void Queue::requeue(const QueuedMessage& msg){ |
| if (policy.get() && !policy->isEnqueued(msg)) return; |
| |
| QueueListeners::NotificationSet copy; |
| { |
| Mutex::ScopedLock locker(messageLock); |
| msg.payload->enqueueComplete(); // mark the message as enqueued |
| messages.push_front(msg); |
| listeners.populate(copy); |
| } |
| copy.notify(); |
| } |
| |
| void Queue::clearLVQIndex(const QueuedMessage& msg){ |
| if (lastValueQueue){ |
| const framing::FieldTable* ft = msg.payload->getApplicationHeaders(); |
| string key = ft->getAsString(qpidVQMatchProperty); |
| lvq.erase(key); |
| } |
| } |
| |
| bool Queue::acquire(const QueuedMessage& msg) { |
| Mutex::ScopedLock locker(messageLock); |
| QPID_LOG(debug, "attempting to acquire " << msg.position); |
| for (Messages::iterator i = messages.begin(); i != messages.end(); i++) { |
| if ((i->position == msg.position && !lastValueQueue) // note that in some cases payload not be set |
| || (lastValueQueue && (i->position == msg.position) && |
| msg.payload.get() == checkLvqReplace(*i).payload.get()) ) { |
| |
| clearLVQIndex(msg); |
| messages.erase(i); |
| QPID_LOG(debug, "Match found, acquire succeeded: " << i->position << " == " << msg.position); |
| return true; |
| } else { |
| QPID_LOG(debug, "No match: " << i->position << " != " << msg.position); |
| } |
| } |
| QPID_LOG(debug, "Acquire failed for " << msg.position); |
| return false; |
| } |
| |
| bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c) |
| { |
| if (c->preAcquires()) { |
| return consumeNextMessage(m, c); |
| } else { |
| return browseNextMessage(m, c); |
| } |
| } |
| |
| bool Queue::checkForMessages(Consumer::shared_ptr c) |
| { |
| Mutex::ScopedLock locker(messageLock); |
| if (messages.empty()) { |
| //no message available, register consumer for notification |
| //when this changes |
| listeners.addListener(c); |
| return false; |
| } else { |
| QueuedMessage msg = getFront(); |
| if (store && !msg.payload->isEnqueueComplete()) { |
| //though a message is on the queue, it has not yet been |
| //enqueued and so is not available for consumption yet, |
| //register consumer for notification when this changes |
| listeners.addListener(c); |
| return false; |
| } else { |
| //check that consumer has sufficient credit for the |
| //message (if it does not, no need to register it for |
| //notification as the consumer itself will handle the |
| //credit allocation required to change this condition). |
| return c->accept(msg.payload); |
| } |
| } |
| } |
| |
| bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) |
| { |
| while (true) { |
| Mutex::ScopedLock locker(messageLock); |
| if (messages.empty()) { |
| QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); |
| listeners.addListener(c); |
| return false; |
| } else { |
| QueuedMessage msg = getFront(); |
| if (msg.payload->hasExpired()) { |
| QPID_LOG(debug, "Message expired from queue '" << name << "'"); |
| popAndDequeue(); |
| continue; |
| } |
| |
| if (c->filter(msg.payload)) { |
| if (c->accept(msg.payload)) { |
| m = msg; |
| popMsg(msg); |
| return true; |
| } else { |
| //message(s) are available but consumer hasn't got enough credit |
| QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); |
| return false; |
| } |
| } else { |
| //consumer will never want this message |
| QPID_LOG(debug, "Consumer doesn't want message from '" << name << "'"); |
| return false; |
| } |
| } |
| } |
| } |
| |
| |
| bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) |
| { |
| QueuedMessage msg(this); |
| while (seek(msg, c)) { |
| if (c->filter(msg.payload) && !msg.payload->hasExpired()) { |
| if (c->accept(msg.payload)) { |
| //consumer wants the message |
| c->position = msg.position; |
| m = msg; |
| if (!lastValueQueueNoBrowse) clearLVQIndex(msg); |
| if (lastValueQueue) { |
| boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); |
| if (replacement.get()) m.payload = replacement; |
| } |
| return true; |
| } else { |
| //browser hasn't got enough credit for the message |
| QPID_LOG(debug, "Browser can't currently accept message from '" << name << "'"); |
| return false; |
| } |
| } else { |
| //consumer will never want this message, continue seeking |
| c->position = msg.position; |
| QPID_LOG(debug, "Browser skipping message from '" << name << "'"); |
| } |
| } |
| return false; |
| } |
| |
| void Queue::removeListener(Consumer::shared_ptr c) |
| { |
| QueueListeners::NotificationSet set; |
| { |
| Mutex::ScopedLock locker(messageLock); |
| listeners.removeListener(c); |
| if (messages.size()) { |
| listeners.populate(set); |
| } |
| } |
| set.notify(); |
| } |
| |
| bool Queue::dispatch(Consumer::shared_ptr c) |
| { |
| QueuedMessage msg(this); |
| if (getNextMessage(msg, c)) { |
| c->deliver(msg); |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { |
| Mutex::ScopedLock locker(messageLock); |
| if (!messages.empty() && messages.back().position > c->position) { |
| if (c->position < getFront().position) { |
| msg = getFront(); |
| return true; |
| } else { |
| //TODO: can improve performance of this search, for now just searching linearly from end |
| Messages::reverse_iterator pos; |
| for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c->position; i++) { |
| pos = i; |
| } |
| msg = *pos; |
| return true; |
| } |
| } |
| listeners.addListener(c); |
| return false; |
| } |
| |
| namespace { |
| struct PositionEquals { |
| SequenceNumber pos; |
| PositionEquals(SequenceNumber p) : pos(p) {} |
| bool operator()(const QueuedMessage& msg) const { return msg.position == pos; } |
| }; |
| }// namespace |
| |
| QueuedMessage Queue::find(SequenceNumber pos) const { |
| Mutex::ScopedLock locker(messageLock); |
| Messages::const_iterator i = std::find_if(messages.begin(), messages.end(), PositionEquals(pos)); |
| if (i != messages.end()) |
| return *i; |
| return QueuedMessage(); |
| } |
| |
| void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){ |
| Mutex::ScopedLock locker(consumerLock); |
| if(exclusive) { |
| throw ResourceLockedException( |
| QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed.")); |
| } else if(requestExclusive) { |
| if(consumerCount) { |
| throw ResourceLockedException( |
| QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied.")); |
| } else { |
| exclusive = c->getSession(); |
| } |
| } |
| consumerCount++; |
| if (mgmtObject != 0) |
| mgmtObject->inc_consumerCount (); |
| } |
| |
| void Queue::cancel(Consumer::shared_ptr c){ |
| removeListener(c); |
| Mutex::ScopedLock locker(consumerLock); |
| consumerCount--; |
| if(exclusive) exclusive = 0; |
| if (mgmtObject != 0) |
| mgmtObject->dec_consumerCount (); |
| } |
| |
| QueuedMessage Queue::get(){ |
| Mutex::ScopedLock locker(messageLock); |
| QueuedMessage msg(this); |
| |
| if(!messages.empty()){ |
| msg = getFront(); |
| popMsg(msg); |
| } |
| return msg; |
| } |
| |
| void Queue::purgeExpired() |
| { |
| //As expired messages are discarded during dequeue also, only |
| //bother explicitly expiring if the rate of dequeues since last |
| //attempt is less than one per second. |
| if (dequeueTracker.sampleRatePerSecond() < 1) { |
| Messages expired; |
| { |
| Mutex::ScopedLock locker(messageLock); |
| for (Messages::iterator i = messages.begin(); i != messages.end();) { |
| if (lastValueQueue) checkLvqReplace(*i); |
| if (i->payload->hasExpired()) { |
| expired.push_back(*i); |
| i = messages.erase(i); |
| } else { |
| ++i; |
| } |
| } |
| } |
| for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); |
| } |
| } |
| |
| /** |
| * purge - for purging all or some messages on a queue |
| * depending on the purge_request |
| * |
| * purge_request == 0 then purge all messages |
| * == N then purge N messages from queue |
| * Sometimes purge_request == 1 to unblock the top of queue |
| */ |
| uint32_t Queue::purge(const uint32_t purge_request){ |
| Mutex::ScopedLock locker(messageLock); |
| uint32_t purge_count = purge_request; // only comes into play if >0 |
| |
| uint32_t count = 0; |
| // Either purge them all or just the some (purge_count) while the queue isn't empty. |
| while((!purge_request || purge_count--) && !messages.empty()) { |
| popAndDequeue(); |
| count++; |
| } |
| return count; |
| } |
| |
| uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { |
| Mutex::ScopedLock locker(messageLock); |
| uint32_t move_count = qty; // only comes into play if qty >0 |
| uint32_t count = 0; // count how many were moved for returning |
| |
| while((!qty || move_count--) && !messages.empty()) { |
| QueuedMessage qmsg = getFront(); |
| boost::intrusive_ptr<Message> msg = qmsg.payload; |
| destq->deliver(msg); // deliver message to the destination queue |
| popMsg(qmsg); |
| dequeue(0, qmsg); |
| count++; |
| } |
| return count; |
| } |
| |
| void Queue::popMsg(QueuedMessage& qmsg) |
| { |
| if (lastValueQueue){ |
| const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders(); |
| string key = ft->getAsString(qpidVQMatchProperty); |
| lvq.erase(key); |
| } |
| messages.pop_front(); |
| ++dequeueTracker; |
| } |
| |
| void Queue::push(boost::intrusive_ptr<Message>& msg){ |
| QueueListeners::NotificationSet copy; |
| { |
| Mutex::ScopedLock locker(messageLock); |
| QueuedMessage qm(this, msg, ++sequence); |
| if (policy.get()) policy->tryEnqueue(qm); |
| |
| LVQ::iterator i; |
| if (lastValueQueue){ |
| const framing::FieldTable* ft = msg->getApplicationHeaders(); |
| string key = ft->getAsString(qpidVQMatchProperty); |
| |
| i = lvq.find(key); |
| if (i == lvq.end()){ |
| messages.push_back(qm); |
| listeners.populate(copy); |
| lvq[key] = msg; |
| }else { |
| i->second->setReplacementMessage(msg,this); |
| qm.payload = i->second; |
| dequeued(qm); |
| } |
| }else { |
| messages.push_back(qm); |
| listeners.populate(copy); |
| } |
| } |
| copy.notify(); |
| } |
| |
| QueuedMessage Queue::getFront() |
| { |
| QueuedMessage msg = messages.front(); |
| if (lastValueQueue) { |
| boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); |
| if (replacement.get()) msg.payload = replacement; |
| } |
| return msg; |
| } |
| |
| QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) const |
| { |
| boost::intrusive_ptr<Message> replacement = msg.payload->getReplacementMessage(this); |
| if (replacement.get()) msg.payload = replacement; |
| return msg; |
| } |
| |
| /** function only provided for unit tests, or code not in critical message path */ |
| uint32_t Queue::getMessageCount() const |
| { |
| Mutex::ScopedLock locker(messageLock); |
| |
| uint32_t count = 0; |
| for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { |
| //NOTE: don't need to use checkLvqReplace() here as it |
| //is only relevant for LVQ which does not support persistence |
| //so the enqueueComplete check has no effect |
| if ( i->payload->isEnqueueComplete() ) count ++; |
| } |
| |
| return count; |
| } |
| |
| uint32_t Queue::getConsumerCount() const |
| { |
| Mutex::ScopedLock locker(consumerLock); |
| return consumerCount; |
| } |
| |
| bool Queue::canAutoDelete() const |
| { |
| Mutex::ScopedLock locker(consumerLock); |
| return autodelete && !consumerCount; |
| } |
| |
| void Queue::clearLastNodeFailure() |
| { |
| inLastNodeFailure = false; |
| } |
| |
| void Queue::setLastNodeFailure() |
| { |
| if (persistLastNode){ |
| Mutex::ScopedLock locker(messageLock); |
| for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) { |
| if (lastValueQueue) checkLvqReplace(*i); |
| i->payload->forcePersistent(); |
| if (i->payload->getPersistenceId() == 0){ |
| enqueue(0, i->payload); |
| } |
| } |
| inLastNodeFailure = true; |
| } |
| } |
| |
| // return true if store exists, |
| bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) |
| { |
| if (inLastNodeFailure && persistLastNode){ |
| msg->forcePersistent(); |
| } |
| |
| if (traceId.size()) { |
| msg->addTraceId(traceId); |
| } |
| |
| if (msg->isPersistent() && store && !lastValueQueue) { |
| msg->enqueueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue |
| boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg); |
| store->enqueue(ctxt, pmsg, *this); |
| return true; |
| } |
| return false; |
| } |
| |
| // return true if store exists, |
| bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) |
| { |
| if (policy.get() && !policy->isEnqueued(msg)) return false; |
| { |
| Mutex::ScopedLock locker(messageLock); |
| if (!ctxt) { |
| dequeued(msg); |
| } |
| } |
| if (msg.payload->isPersistent() && store && !lastValueQueue) { |
| msg.payload->dequeueAsync(shared_from_this(), store); //increment to async counter -- for message sent to more than one queue |
| boost::intrusive_ptr<PersistableMessage> pmsg = boost::static_pointer_cast<PersistableMessage>(msg.payload); |
| store->dequeue(ctxt, pmsg, *this); |
| return true; |
| } |
| return false; |
| } |
| |
| void Queue::dequeueCommitted(const QueuedMessage& msg) |
| { |
| Mutex::ScopedLock locker(messageLock); |
| dequeued(msg); |
| if (mgmtObject != 0) { |
| mgmtObject->inc_msgTxnDequeues(); |
| mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); |
| } |
| } |
| |
| /** |
| * Removes a message from the in-memory delivery queue as well |
| * dequeing it from the logical (and persistent if applicable) queue |
| */ |
| void Queue::popAndDequeue() |
| { |
| QueuedMessage msg = getFront(); |
| popMsg(msg); |
| dequeue(0, msg); |
| } |
| |
| /** |
| * Updates policy and management when a message has been dequeued, |
| * expects messageLock to be held |
| */ |
| void Queue::dequeued(const QueuedMessage& msg) |
| { |
| if (policy.get()) policy->dequeued(msg); |
| mgntDeqStats(msg.payload); |
| } |
| |
| |
| void Queue::create(const FieldTable& _settings) |
| { |
| settings = _settings; |
| if (store) { |
| store->create(*this, _settings); |
| } |
| configure(_settings); |
| } |
| |
| void Queue::configure(const FieldTable& _settings) |
| { |
| setPolicy(QueuePolicy::createQueuePolicy(_settings)); |
| //set this regardless of owner to allow use of no-local with exclusive consumers also |
| noLocal = _settings.get(qpidNoLocal); |
| QPID_LOG(debug, "Configured queue with no-local=" << noLocal); |
| |
| lastValueQueue= _settings.get(qpidLastValueQueue); |
| if (lastValueQueue) QPID_LOG(debug, "Configured queue as Last Value Queue"); |
| |
| lastValueQueueNoBrowse = _settings.get(qpidLastValueQueueNoBrowse); |
| if (lastValueQueueNoBrowse){ |
| QPID_LOG(debug, "Configured queue as Last Value Queue No Browse"); |
| lastValueQueue = lastValueQueueNoBrowse; |
| } |
| |
| persistLastNode= _settings.get(qpidPersistLastNode); |
| if (persistLastNode) QPID_LOG(debug, "Configured queue to Persist data if cluster fails to one node"); |
| |
| traceId = _settings.getAsString(qpidTraceIdentity); |
| std::string excludeList = _settings.getAsString(qpidTraceExclude); |
| if (excludeList.size()) { |
| split(traceExclude, excludeList, ", "); |
| } |
| QPID_LOG(debug, "Configured queue " << getName() << " with qpid.trace.id='" << traceId |
| << "' and qpid.trace.exclude='"<< excludeList << "' i.e. " << traceExclude.size() << " elements"); |
| |
| if (mgmtObject != 0) |
| mgmtObject->set_arguments (_settings); |
| } |
| |
| void Queue::destroy() |
| { |
| if (alternateExchange.get()) { |
| Mutex::ScopedLock locker(messageLock); |
| while(!messages.empty()){ |
| DeliverableMessage msg(getFront().payload); |
| alternateExchange->route(msg, msg.getMessage().getRoutingKey(), |
| msg.getMessage().getApplicationHeaders()); |
| popAndDequeue(); |
| } |
| alternateExchange->decAlternateUsers(); |
| } |
| |
| if (store) { |
| store->flush(*this); |
| store->destroy(*this); |
| store = 0;//ensure we make no more calls to the store for this queue |
| } |
| } |
| |
| void Queue::bound(const string& exchange, const string& key, |
| const FieldTable& args) |
| { |
| bindings.add(exchange, key, args); |
| } |
| |
| void Queue::unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref) |
| { |
| bindings.unbind(exchanges, shared_ref); |
| } |
| |
| void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy) |
| { |
| policy = _policy; |
| } |
| |
| const QueuePolicy* Queue::getPolicy() |
| { |
| return policy.get(); |
| } |
| |
| uint64_t Queue::getPersistenceId() const |
| { |
| return persistenceId; |
| } |
| |
| void Queue::setPersistenceId(uint64_t _persistenceId) const |
| { |
| if (mgmtObject != 0 && persistenceId == 0) |
| { |
| ManagementAgent* agent = ManagementAgent::Singleton::getInstance(); |
| agent->addObject (mgmtObject, 0x3000000000000000LL + _persistenceId); |
| |
| if (externalQueueStore) { |
| ManagementObject* childObj = externalQueueStore->GetManagementObject(); |
| if (childObj != 0) |
| childObj->setReference(mgmtObject->getObjectId()); |
| } |
| } |
| persistenceId = _persistenceId; |
| } |
| |
| void Queue::encode(Buffer& buffer) const |
| { |
| buffer.putShortString(name); |
| buffer.put(settings); |
| if (policy.get()) { |
| buffer.put(*policy); |
| } |
| } |
| |
| uint32_t Queue::encodedSize() const |
| { |
| return name.size() + 1/*short string size octet*/ + settings.encodedSize() |
| + (policy.get() ? (*policy).encodedSize() : 0); |
| } |
| |
| Queue::shared_ptr Queue::decode(QueueRegistry& queues, Buffer& buffer) |
| { |
| string name; |
| buffer.getShortString(name); |
| std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true); |
| buffer.get(result.first->settings); |
| result.first->configure(result.first->settings); |
| if (result.first->policy.get() && buffer.available() >= result.first->policy->encodedSize()) { |
| buffer.get ( *(result.first->policy) ); |
| } |
| return result.first; |
| } |
| |
| |
| void Queue::setAlternateExchange(boost::shared_ptr<Exchange> exchange) |
| { |
| alternateExchange = exchange; |
| } |
| |
| boost::shared_ptr<Exchange> Queue::getAlternateExchange() |
| { |
| return alternateExchange; |
| } |
| |
| void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue) |
| { |
| if (broker.getQueues().destroyIf(queue->getName(), |
| boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) { |
| queue->unbind(broker.getExchanges(), queue); |
| queue->destroy(); |
| } |
| } |
| |
| bool Queue::isExclusiveOwner(const OwnershipToken* const o) const |
| { |
| Mutex::ScopedLock locker(ownershipLock); |
| return o == owner; |
| } |
| |
| void Queue::releaseExclusiveOwnership() |
| { |
| Mutex::ScopedLock locker(ownershipLock); |
| owner = 0; |
| } |
| |
| bool Queue::setExclusiveOwner(const OwnershipToken* const o) |
| { |
| Mutex::ScopedLock locker(ownershipLock); |
| if (owner) { |
| return false; |
| } else { |
| owner = o; |
| return true; |
| } |
| } |
| |
| bool Queue::hasExclusiveOwner() const |
| { |
| Mutex::ScopedLock locker(ownershipLock); |
| return owner != 0; |
| } |
| |
| bool Queue::hasExclusiveConsumer() const |
| { |
| return exclusive; |
| } |
| |
| void Queue::setExternalQueueStore(ExternalQueueStore* inst) { |
| if (externalQueueStore!=inst && externalQueueStore) |
| delete externalQueueStore; |
| externalQueueStore = inst; |
| |
| if (inst) { |
| ManagementObject* childObj = inst->GetManagementObject(); |
| if (childObj != 0 && mgmtObject != 0) |
| childObj->setReference(mgmtObject->getObjectId()); |
| } |
| } |
| |
| bool Queue::releaseMessageContent(const QueuedMessage& m) |
| { |
| if (store && !NullMessageStore::isNullStore(store)) { |
| QPID_LOG(debug, "Message " << m.position << " on " << name << " released from memory"); |
| m.payload->releaseContent(store); |
| return true; |
| } else { |
| QPID_LOG(warning, "Message " << m.position << " on " << name |
| << " cannot be released from memory as the queue is not durable"); |
| return false; |
| } |
| } |
| |
| ManagementObject* Queue::GetManagementObject (void) const |
| { |
| return (ManagementObject*) mgmtObject; |
| } |
| |
| Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string&) |
| { |
| Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; |
| |
| QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]"); |
| |
| switch (methodId) |
| { |
| case _qmf::Queue::METHOD_PURGE : |
| _qmf::ArgsQueuePurge& iargs = (_qmf::ArgsQueuePurge&) args; |
| purge (iargs.i_request); |
| status = Manageable::STATUS_OK; |
| break; |
| } |
| |
| return status; |
| } |
| |
| void Queue::setPosition(SequenceNumber n) { |
| Mutex::ScopedLock locker(messageLock); |
| sequence = n; |
| } |