blob: d807ef22b11ea7e37c461ca85affc9b46a29f146 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/broker/PriorityQueue.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/framing/reply_exceptions.h"
#include <cmath>
namespace qpid {
namespace broker {
PriorityQueue::PriorityQueue(int l) :
levels(l),
messages(levels, Deque()),
frontLevel(0), haveFront(false), cached(false) {}
bool PriorityQueue::deleted(const QueuedMessage&) { return true; }
size_t PriorityQueue::size()
{
size_t total(0);
for (int i = 0; i < levels; ++i) {
total += messages[i].size();
}
return total;
}
void PriorityQueue::release(const QueuedMessage& message)
{
uint p = getPriorityLevel(message);
messages[p].insert(lower_bound(messages[p].begin(), messages[p].end(), message), message);
clearCache();
}
bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message, bool remove)
{
QueuedMessage comp;
comp.position = position;
for (int i = 0; i < levels; ++i) {
if (!messages[i].empty()) {
unsigned long diff = position.getValue() - messages[i].front().position.getValue();
long maxEnd = diff < messages[i].size() ? diff : messages[i].size();
Deque::iterator l = lower_bound(messages[i].begin(),messages[i].begin()+maxEnd,comp);
if (l != messages[i].end() && l->position == position) {
message = *l;
if (remove) {
messages[i].erase(l);
clearCache();
}
return true;
}
}
}
return false;
}
bool PriorityQueue::acquire(const framing::SequenceNumber& position, QueuedMessage& message)
{
return find(position, message, true);
}
bool PriorityQueue::find(const framing::SequenceNumber& position, QueuedMessage& message)
{
return find(position, message, false);
}
bool PriorityQueue::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool)
{
QueuedMessage match;
match.position = position+1;
Deque::iterator lowest;
bool found = false;
for (int i = 0; i < levels; ++i) {
Deque::iterator m = lower_bound(messages[i].begin(), messages[i].end(), match);
if (m != messages[i].end()) {
if (m->position == match.position) {
message = *m;
return true;
} else if (!found || m->position < lowest->position) {
lowest = m;
found = true;
}
}
}
if (found) {
message = *lowest;
}
return found;
}
bool PriorityQueue::consume(QueuedMessage& message)
{
if (checkFront()) {
message = messages[frontLevel].front();
messages[frontLevel].pop_front();
clearCache();
return true;
} else {
return false;
}
}
bool PriorityQueue::push(const QueuedMessage& added, QueuedMessage& /*not needed*/)
{
messages[getPriorityLevel(added)].push_back(added);
clearCache();
return false;//adding a message never causes one to be removed for deque
}
void PriorityQueue::foreach(Functor f)
{
for (int i = 0; i < levels; ++i) {
std::for_each(messages[i].begin(), messages[i].end(), f);
}
}
void PriorityQueue::removeIf(Predicate p)
{
for (int priority = 0; priority < levels; ++priority) {
for (Deque::iterator i = messages[priority].begin(); i != messages[priority].end();) {
if (p(*i)) {
i = messages[priority].erase(i);
clearCache();
} else {
++i;
}
}
}
}
uint PriorityQueue::getPriorityLevel(const QueuedMessage& m) const
{
uint priority = m.payload->getPriority();
//Use AMQP 0-10 approach to mapping priorities to a fixed level
//(see rule priority-level-implementation)
const uint firstLevel = 5 - uint(std::min(5.0, std::ceil((double) levels/2.0)));
if (priority <= firstLevel) return 0;
return std::min(priority - firstLevel, (uint)levels-1);
}
void PriorityQueue::clearCache()
{
cached = false;
}
bool PriorityQueue::findFrontLevel(uint& l, PriorityLevels& m)
{
for (int p = levels-1; p >= 0; --p) {
if (!m[p].empty()) {
l = p;
return true;
}
}
return false;
}
bool PriorityQueue::checkFront()
{
if (!cached) {
haveFront = findFrontLevel(frontLevel, messages);
cached = true;
}
return haveFront;
}
uint PriorityQueue::getPriority(const QueuedMessage& message)
{
const PriorityQueue* queue = dynamic_cast<const PriorityQueue*>(&(message.queue->getMessages()));
if (queue) return queue->getPriorityLevel(message);
else return 0;
}
}} // namespace qpid::broker