blob: 58764637b1ef8dac3e5d90f4fd39c13a0f0e2de5 [file]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownersip. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/broker/PriorityQueue.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueCursor.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
#include <algorithm>
#include <cmath>
#include <boost/bind.hpp>
namespace qpid {
namespace broker {
namespace {
class PriorityContext : public CursorContext {
public:
std::vector<QueueCursor> position;
PriorityContext(size_t levels, SubscriptionType type) : position(levels, QueueCursor(type)) {}
};
}
PriorityQueue::PriorityQueue(int l) :
levels(l),
messages(levels, Deque(boost::bind(&PriorityQueue::priorityPadding, this, _1))),
counters(levels, framing::SequenceNumber()),
fifo(boost::bind(&PriorityQueue::fifoPadding, this, _1)),
frontLevel(0), haveFront(false), cached(false)
{
}
bool PriorityQueue::deleted(const QueueCursor& c)
{
MessagePointer* ptr = fifo.find(c);
if (ptr && ptr->holder) {
//mark the message as deleted
ptr->holder->message.setState(DELETED);
//clean the deque for the relevant priority level
boost::shared_ptr<PriorityContext> ctxt = boost::dynamic_pointer_cast<PriorityContext>(c.context);
messages[ptr->holder->priority].clean();
//stop referencing that message holder (it may now have been
//deleted)
ptr->holder = 0;
//clean fifo index
fifo.clean();
return true;
} else {
return false;
}
}
size_t PriorityQueue::size()
{
return fifo.size();
}
Message* PriorityQueue::next(QueueCursor& cursor)
{
boost::shared_ptr<PriorityContext> ctxt = boost::dynamic_pointer_cast<PriorityContext>(cursor.context);
if (!ctxt) {
ctxt = boost::shared_ptr<PriorityContext>(new PriorityContext(levels, cursor.type));
cursor.context = ctxt;
}
if (cursor.type == REPLICATOR) {
//browse in fifo order
MessagePointer* ptr = fifo.next(cursor);
return ptr ? &(ptr->holder->message) : 0;
} else if (cursor.type == PURGE) {
//iterate over message in reverse priority order (i.e. purge lowest priority message first)
//ignore any fairshare configuration here as well
for (int p = 0; p < levels; ++p) {
MessageHolder* holder = messages[p].next(ctxt->position[p]);
if (holder) {
cursor.setPosition(holder->message.getSequence(), 0);
return &(holder->message);
}
}
return 0;
} else {
//check each level in turn, in priority order, for any more messages
Priority p = firstLevel();
do {
MessageHolder* holder = messages[p.current].next(ctxt->position[p.current]);
if (holder) {
cursor.setPosition(holder->message.getSequence(), 0);
return &(holder->message);
}
} while (nextLevel(p));
return 0;
}
}
Message* PriorityQueue::find(const QueueCursor& cursor)
{
return find(cursor.position, 0);
}
Message* PriorityQueue::find(const framing::SequenceNumber& position, QueueCursor* cursor)
{
MessagePointer* ptr = fifo.find(position, cursor);
return ptr ? &(ptr->holder->message) : 0;
}
void PriorityQueue::publish(const Message& published)
{
MessageHolder holder;
holder.message = published;
holder.priority = getPriorityLevel(published);
holder.id = ++(counters[holder.priority]);
MessagePointer pointer;
pointer.holder = &(messages[holder.priority].publish(holder));
pointer.id = published.getSequence();
fifo.publish(pointer);
}
Message* PriorityQueue::release(const QueueCursor& cursor)
{
MessagePointer* ptr = fifo.release(cursor);
if (ptr) {
messages[ptr->holder->priority].resetCursors();
return &(ptr->holder->message);
} else {
return 0;
}
}
void PriorityQueue::foreach(Functor f)
{
fifo.foreach(f);
}
uint PriorityQueue::getPriorityLevel(const Message& m) const
{
uint priority = m.getPriority();
//Use AMQP 0-10 approach to mapping priorities to a fixed level
//(see rule priority-level-implementation)
const uint firstLevel = 5 - uint(std::min(5.0, std::ceil((double) levels/2.0)));
if (priority <= firstLevel) return 0;
return std::min(priority - firstLevel, (uint)levels-1);
}
PriorityQueue::MessagePointer PriorityQueue::fifoPadding(qpid::framing::SequenceNumber id)
{
PriorityQueue::MessagePointer pointer;
pointer.holder = 0;
pointer.id = id;
return pointer;
}
PriorityQueue::MessageHolder PriorityQueue::priorityPadding(qpid::framing::SequenceNumber id)
{
PriorityQueue::MessageHolder holder;
holder.id = id;
holder.message.setState(DELETED);
return holder;
}
PriorityQueue::Priority PriorityQueue::firstLevel()
{
return Priority(levels - 1);
}
bool PriorityQueue::nextLevel(Priority& p)
{
if (p.current > 0) {
--(p.current);
return true;
} else {
return false;
}
}
framing::SequenceNumber PriorityQueue::MessageHolder::getSequence() const
{
return id;
}
void PriorityQueue::MessageHolder::setState(MessageState s)
{
message.setState(s);
}
MessageState PriorityQueue::MessageHolder::getState() const
{
return message.getState();
}
PriorityQueue::MessageHolder::operator Message&()
{
return message;
}
framing::SequenceNumber PriorityQueue::MessagePointer::getSequence() const
{
if (holder) {
return holder->message.getSequence();
} else {
//this is used when the instance is merely acting as padding
return id;
}
}
void PriorityQueue::MessagePointer::setState(MessageState s)
{
if (holder) {
holder->message.setState(s);
}
}
MessageState PriorityQueue::MessagePointer::getState() const
{
if (holder) {
return holder->message.getState();
} else {
return DELETED;
}
}
PriorityQueue::MessagePointer::operator Message&()
{
assert(holder);
return holder->message;
}
}} // namespace qpid::broker