blob: b3b7f6a2d441be18f015975958bae17efb594da2 [file]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/broker/PagedQueue.h"
#include "qpid/broker/Protocol.h"
#include "qpid/broker/QueueCursor.h"
#include "qpid/broker/Message.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/sys/Time.h"
#include <string.h>
namespace qpid {
namespace broker {
namespace {
using qpid::sys::AbsTime;
using qpid::sys::Duration;
using qpid::sys::ZERO;
using qpid::sys::FAR_FUTURE;
using qpid::sys::MemoryMappedFile;
const uint32_t OVERHEAD(4/*content-size*/ + 4/*sequence-number*/ + 8/*persistence-id*/ + 8/*expiration*/);
size_t encodedSize(const Message& msg)
{
return msg.getPersistentContext()->encodedSize() + OVERHEAD;
}
size_t encode(const Message& msg, char* data, size_t size)
{
uint32_t encoded = msg.getPersistentContext()->encodedSize();
uint32_t required = encoded + OVERHEAD;
if (required > size) return 0;
qpid::framing::Buffer buffer(data, required);
buffer.putLong(encoded);
buffer.putLong(msg.getSequence());
buffer.putLongLong(msg.getPersistentContext()->getPersistenceId());
sys::AbsTime expiration = msg.getExpiration();
int64_t t(0);
if (expiration < FAR_FUTURE) {
// Just need an integer that will round trip
t = Duration(ZERO, expiration);
}
buffer.putLongLong(t);
msg.getPersistentContext()->encode(buffer);
assert(buffer.getPosition() == required);
return required;
}
size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_t size)
{
qpid::framing::Buffer metadata(const_cast<char*>(data), size);
uint32_t encoded = metadata.getLong();
uint32_t sequence = metadata.getLong();
uint64_t persistenceId = metadata.getLongLong();
int64_t t = metadata.getLongLong();
assert(metadata.available() >= encoded);
qpid::framing::Buffer buffer(const_cast<char*>(data) + metadata.getPosition(), encoded);
msg = protocols.decode(buffer);
assert(buffer.getPosition() == encoded);
msg.setSequence(qpid::framing::SequenceNumber(sequence));
msg.getPersistentContext()->setPersistenceId(persistenceId);
if (t) {
sys::AbsTime expiration(ZERO, t);
msg.getSharedState().setExpiration(expiration);
}
return encoded + metadata.getPosition();
}
}
PagedQueue::PagedQueue(const std::string& name_, const std::string& directory, uint m, uint factor, ProtocolRegistry& p)
: name(name_), pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0), loaded(0), version(0)
{
if (directory.empty()) {
throw qpid::Exception(QPID_MSG("Cannot create paged queue: No paged queue directory specified"));
}
file.open(name, directory);
QPID_LOG(debug, "PagedQueue[" << name << "]");
}
PagedQueue::~PagedQueue()
{
file.close();
}
size_t PagedQueue::size()
{
size_t total(0);
for (Used::const_iterator i = used.begin(); i != used.end(); ++i) {
total += i->second.available();
}
return total;
}
bool PagedQueue::deleted(const QueueCursor& cursor)
{
if (cursor.valid) {
Used::iterator page = findPage(cursor.position, false);
if (page == used.end()) {
return false;
}
page->second.deleted(cursor.position);
if (page->second.empty()) {
//move page to free list
--loaded;
page->second.clear(file);
free.push_back(page->second);
used.erase(page);
}
return true;
} else {
return false;
}
}
void PagedQueue::check(const Message& added)
{
if (encodedSize(added) > pageSize) {
QPID_LOG(error, "Message is larger than page size for queue " << name);
throw qpid::framing::PreconditionFailedException(QPID_MSG("Message is larger than page size for queue " << name));
}
}
void PagedQueue::publish(const Message& added)
{
check(added);
Used::reverse_iterator i = used.rbegin();
if (i != used.rend()) {
if (!i->second.isLoaded()) load(i->second);
if (i->second.add(added)) return;
}
//used is empty or last page is full, need to add a new page
if (!newPage(added.getSequence()).add(added)) {
QPID_LOG(error, "Could not add message to paged queue " << name);
throw qpid::Exception(QPID_MSG("Could not add message to paged queue " << name));
}
}
Message* PagedQueue::next(QueueCursor& cursor)
{
Used::iterator i = used.begin();
if (cursor.valid) {
qpid::framing::SequenceNumber position(cursor.position);
++position;
i = findPage(position, true);
if (i == used.end() && !used.empty() && used.begin()->first > position) i = used.begin();
}
while (i != used.end()) {
if (!i->second.isLoaded()) load(i->second);
Message* m = i->second.next(version, cursor);
QPID_LOG(debug, "PagedQueue::next(" << cursor.valid << ":" << cursor.position << "): " << m);
if (m) return m;
++i;
}
QPID_LOG(debug, "PagedQueue::next(" << cursor.valid << ":" << cursor.position << ") returning 0 ");
return 0;
}
Message* PagedQueue::release(const QueueCursor& cursor)
{
if (cursor.valid) {
Used::iterator i = findPage(cursor.position, true);
if (i == used.end()) return 0;
return i->second.release(cursor.position);
} else {
return 0;
}
}
Message* PagedQueue::find(const framing::SequenceNumber& position, QueueCursor* cursor)
{
Used::iterator i = findPage(position, true);
if (i != used.end()) {
Message* m = i->second.find(position);
if (cursor) {
cursor->setPosition(version, m ? m->getSequence() : position);
}
return m;
} else {
return 0;
}
}
void PagedQueue::foreach(Functor)
{
//TODO:
}
Message* PagedQueue::find(const QueueCursor& cursor)
{
if (cursor.valid) return find(cursor.position, 0);
else return 0;
}
PagedQueue::Page::Page(size_t s, size_t o) : size(s), offset(o), region(0), used(0)
{
QPID_LOG(debug, "Created Page[" << offset << "], size=" << size);
}
void PagedQueue::Page::deleted(qpid::framing::SequenceNumber s)
{
if (isLoaded()) {
Message* message = find(s);
assert(message);//could this ever legitimately be 0?
message->setState(DELETED);
}
contents.remove(s);
acquired.remove(s);
}
Message* PagedQueue::Page::release(qpid::framing::SequenceNumber s)
{
Message* m = find(s);
if (m) {
m->setState(AVAILABLE);
}
acquired.remove(s);
return m;
}
bool PagedQueue::Page::add(const Message& message)
{
assert(region);
assert (size >= used);
size_t encoded = encode(message, region + used, size - used);
QPID_LOG(debug, "Calling Page[" << offset << "]::add() used=" << used << ", size=" << size << ", encoded=" << encoded << ")");
if (encoded) {
used += encoded;
messages.push_back(message);
messages.back().setState(AVAILABLE);
contents.add(message.getSequence());
return true;
} else {
return false;
}
}
bool PagedQueue::Page::empty() const
{
return contents.empty();
}
bool PagedQueue::Page::isLoaded() const
{
return region;
}
Message* PagedQueue::Page::next(uint32_t version, QueueCursor& cursor)
{
if (messages.empty()) return 0;
qpid::framing::SequenceNumber position;
if (cursor.valid) {
position = cursor.position + 1;
if (position < messages.front().getSequence()) {
position = messages.front().getSequence();
cursor.setPosition(position, version);
}
} else {
position = messages.front().getSequence();
cursor.setPosition(position, version);
}
Message* m;
do {
m = find(position);
if (m) cursor.setPosition(position, version);
++position;
} while (m != 0 && !cursor.check(*m));
return m;
//if it is the first in the page, increment the hint count of the page
//if it is the last in the page, decrement the hint count of the page
}
/**
* Called before adding to the free list
*/
void PagedQueue::Page::clear(MemoryMappedFile& file)
{
if (region) file.unmap(region, size);
region = 0;
used = 0;
contents.clear();
messages.clear();
}
size_t PagedQueue::Page::available() const
{
return contents.size() - acquired.size();
}
Message* PagedQueue::Page::find(qpid::framing::SequenceNumber position)
{
if (messages.size()) {
assert(position >= messages.front().getSequence());
size_t index = position - messages.front().getSequence();
if (index < messages.size()) return &(messages[index]);
else return 0;
} else {
//page is empty, is this an error?
QPID_LOG(warning, "Could not find message at " << position << "; empty page.");
return 0;
}
//if it is the first in the page, increment the hint count of the page
//if it is the last in the page, decrement the hint count of the page
}
void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols)
{
QPID_LOG(debug, "Page[" << offset << "]::load" << " used=" << used << ", size=" << size);
assert(region == 0);
region = file.map(offset, size);
assert(region != 0);
bool haveData = used > 0;
used = 4;//first 4 bytes are the count
if (haveData) {
qpid::framing::Buffer buffer(region, sizeof(uint32_t));
uint32_t count = buffer.getLong();
//decode messages into Page::messages
for (size_t i = 0; i < count; ++i) {
Message message;
used += decode(protocols, message, region + used, size - used);
if (!contents.contains(message.getSequence())) {
message.setState(DELETED);
QPID_LOG(debug, "Setting state to deleted for message loaded at " << message.getSequence());
} else if (acquired.contains(message.getSequence())) {
message.setState(ACQUIRED);
} else {
message.setState(AVAILABLE);
}
messages.push_back(message);
}
if (messages.size()) {
QPID_LOG(debug, "Page[" << offset << "]::load " << messages.size() << " messages loaded from "
<< messages.front().getSequence() << " to " << messages.back().getSequence());
} else {
QPID_LOG(debug, "Page[" << offset << "]::load no messages loaded");
}
}//else there is nothing we need to explicitly load, just needed to map region
}
void PagedQueue::Page::unload(MemoryMappedFile& file)
{
if (messages.size()) {
QPID_LOG(debug, "Page[" << offset << "]::unload " << messages.size() << " messages to unload from "
<< messages.front().getSequence() << " to " << messages.back().getSequence());
} else {
QPID_LOG(debug, "Page[" << offset << "]::unload no messages to unload");
}
for (std::deque<Message>::iterator i = messages.begin(); i != messages.end(); ++i) {
if (i->getState() == ACQUIRED) acquired.add(i->getSequence());
}
uint32_t count = messages.size();
qpid::framing::Buffer buffer(region, sizeof(uint32_t));
buffer.putLong(count);
file.flush(region, size);
file.unmap(region, size);
//remove messages from memory
messages.clear();
region = 0;
}
void PagedQueue::load(Page& page)
{
//if needed, release another page
if (loaded == maxLoaded) {
//which page to select?
Used::reverse_iterator i = used.rbegin();
while (i != used.rend() && !i->second.isLoaded()) {
++i;
}
assert(i != used.rend());
unload(i->second);
}
page.load(file, protocols);
++loaded;
QPID_LOG(debug, "PagedQueue[" << name << "] loaded page, " << loaded << " pages now loaded");
}
void PagedQueue::unload(Page& page)
{
page.unload(file);
--loaded;
QPID_LOG(debug, "PagedQueue[" << name << "] unloaded page, " << loaded << " pages now loaded");
}
PagedQueue::Page& PagedQueue::newPage(qpid::framing::SequenceNumber id)
{
if (loaded == maxLoaded) {
//need to release a page from memory to make way for a new one
//choose last one?
Used::reverse_iterator i = used.rbegin();
while (!i->second.isLoaded() && i != used.rend()) {
++i;
}
assert(i != used.rend());
unload(i->second);
}
if (free.empty()) {
//need to extend file and add some pages to the free list
addPages(4/*arbitrary number, should this be config item?*/);
}
std::pair<Used::iterator, bool> result = used.insert(Used::value_type(id, free.front()));
QPID_LOG(debug, "Added page for sequence starting from " << id);
assert(result.second);
free.pop_front();
load(result.first->second);
return result.first->second;
}
void PagedQueue::addPages(size_t count)
{
for (size_t i = 0; i < count; ++i) {
free.push_back(Page(pageSize, offset));
offset += pageSize;
file.expand(offset);
}
QPID_LOG(debug, "Added " << count << " pages to free list; now have " << used.size() << " used, and " << free.size() << " free");
}
PagedQueue::Used::iterator PagedQueue::findPage(const QueueCursor& cursor)
{
Used::iterator i = used.begin();
if (cursor.valid) {
i = findPage(cursor.position, true);
} else if (i != used.end() && !i->second.isLoaded()) {
load(i->second);
}
return i;
}
PagedQueue::Used::iterator PagedQueue::findPage(qpid::framing::SequenceNumber n, bool loadIfRequired)
{
Used::iterator i = used.end();
for (Used::iterator j = used.begin(); j != used.end() && j->first <= n; ++j) {
i = j;
}
if (loadIfRequired && i != used.end() && !i->second.isLoaded()) {
load(i->second);
}
return i;
}
}} // namespace qpid::broker