blob: db5d2ebf4cf4e14842d61cd2ce656f050027718c [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <qpid/log/Statement.h>
#include "Messages.h"
#include "Lsn.h"
#include "qpid/store/StoreException.h"
#include <boost/foreach.hpp>
namespace qpid {
namespace store {
namespace ms_clfs {
void
Messages::openLog(const std::string& path, const Log::TuningParameters& params)
{
log.open (path, params);
}
void
Messages::add(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg)
{
uint64_t id = log.add(msg);
msg->setPersistenceId(id);
std::auto_ptr<MessageInfo> autom(new MessageInfo);
MessageInfo::shared_ptr m(autom);
std::pair<uint64_t, MessageInfo::shared_ptr> p(id, m);
{
qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock);
messages.insert(p);
// If there's only this one message there, move the tail to it.
// This prevents the log from continually growing when messages
// are added and removed one at a time.
if (messages.size() == 1) {
CLFS_LSN newTail = idToLsn(id);
log.moveTail(newTail);
}
}
}
void
Messages::enqueue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t)
{
MessageInfo::shared_ptr p;
{
qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock);
MessageMap::const_iterator i = messages.find(msgId);
if (i == messages.end())
THROW_STORE_EXCEPTION("Message does not exist");
p = i->second;
}
MessageInfo::Location loc(queueId, t, MessageInfo::TRANSACTION_ENQUEUE);
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
p->where.push_back(loc);
uint64_t transactionId = 0;
if (t.get() != 0) {
transactionId = t->getId();
t->enroll(msgId);
}
try {
log.recordEnqueue(msgId, queueId, transactionId);
}
catch (...) {
// Undo the record-keeping if the log wasn't written correctly.
if (transactionId != 0)
t->unenroll(msgId);
p->where.pop_back();
throw;
}
}
}
void
Messages::dequeue(uint64_t msgId, uint64_t queueId, Transaction::shared_ptr& t)
{
MessageInfo::shared_ptr p;
{
qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock);
MessageMap::const_iterator i = messages.find(msgId);
if (i == messages.end())
THROW_STORE_EXCEPTION("Message does not exist");
p = i->second;
}
{
// Locate the 'where' entry for the specified queue. Once this operation
// is recorded in the log, update the 'where' entry to reflect it.
// Note that an existing entry in 'where' that refers to a transaction
// is not eligible for this operation.
qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
std::list<MessageInfo::Location>::iterator i;
for (i = p->where.begin(); i != p->where.end(); ++i) {
if (i->queueId == queueId && i->transaction.get() == 0)
break;
}
if (i == p->where.end())
THROW_STORE_EXCEPTION("Message not on queue");
uint64_t transactionId = 0;
if (t.get() != 0) {
transactionId = t->getId();
t->enroll(msgId);
}
try {
log.recordDequeue(msgId, queueId, transactionId);
}
catch (...) {
// Undo the record-keeping if the log wasn't written correctly.
if (transactionId != 0)
t->unenroll(msgId);
throw;
}
// Ok, logged successfully. If this is a transactional op, note
// the transaction. If non-transactional, remove the 'where' entry.
if (transactionId != 0) {
i->transaction = t;
i->disposition = MessageInfo::TRANSACTION_DEQUEUE;
}
else {
p->where.erase(i);
// If the message doesn't exist on any other queues, remove it.
if (p->where.empty())
remove(msgId);
}
}
}
// Commit a previous provisional enqueue or dequeue of a particular message
// actions under a specified transaction. If this results in the message's
// being removed from all queues, it is deleted.
void
Messages::commit(uint64_t msgId, Transaction::shared_ptr& t)
{
MessageInfo::shared_ptr p;
{
qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock);
MessageMap::const_iterator i = messages.find(msgId);
if (i == messages.end())
THROW_STORE_EXCEPTION("Message does not exist");
p = i->second;
}
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
std::list<MessageInfo::Location>::iterator i;
for (i = p->where.begin(); i != p->where.end(); ++i) {
if (i->transaction != t)
continue;
// Transactional dequeues can now remove the item from the
// where list; enqueues just clear the transaction reference.
if (i->disposition == MessageInfo::TRANSACTION_DEQUEUE)
i = p->where.erase(i);
else
i->transaction.reset();
}
}
// If committing results in this message having no further enqueue
// references, delete it. If the delete fails, swallow the exception
// and let recovery take care of removing it later.
if (p->where.empty()) {
try {
remove(msgId);
}
catch(...) {}
}
}
// Abort a previous provisional enqueue or dequeue of a particular message
// actions under a specified transaction. If this results in the message's
// being removed from all queues, it is deleted.
void
Messages::abort(uint64_t msgId, Transaction::shared_ptr& t)
{
MessageInfo::shared_ptr p;
{
qpid::sys::ScopedRlock<qpid::sys::RWlock> l(lock);
MessageMap::const_iterator i = messages.find(msgId);
if (i == messages.end())
THROW_STORE_EXCEPTION("Message does not exist");
p = i->second;
}
{
qpid::sys::ScopedLock<qpid::sys::Mutex> l(p->whereLock);
std::list<MessageInfo::Location>::iterator i = p->where.begin();
while (i != p->where.end()) {
if (i->transaction != t) {
++i;
continue;
}
// Aborted transactional dequeues result in the message remaining
// enqueued like before the operation; enqueues clear the
// message from the where list - like the enqueue never happened.
if (i->disposition == MessageInfo::TRANSACTION_ENQUEUE)
i = p->where.erase(i);
else {
i->transaction.reset();
++i;
}
}
}
// If aborting results in this message having no further enqueue
// references, delete it. If the delete fails, swallow the exception
// and let recovery take care of removing it later.
if (p->where.empty()) {
try {
remove(msgId);
}
catch(...) {}
}
}
// Load part or all of a message's content from previously stored
// log record(s).
void
Messages::loadContent(uint64_t msgId,
std::string& data,
uint64_t offset,
uint32_t length)
{
log.loadContent(msgId, data, offset, length);
}
// Recover the current set of messages and where they're queued from
// the log.
void
Messages::recover(qpid::broker::RecoveryManager& recoverer,
const std::set<uint64_t> &validQueues,
const std::map<uint64_t, Transaction::shared_ptr>& transMap,
qpid::store::MessageMap& messageMap,
qpid::store::MessageQueueMap& messageQueueMap)
{
std::map<uint64_t, std::vector<MessageLog::RecoveredMsgOp> > messageOps;
log.recover(recoverer, messageMap, messageOps);
// Now read through the messageOps replaying the operations with the
// knowledge of which transactions committed, aborted, etc. A transaction
// should not be deleted until there are no messages referencing it so
// a message operation with a transaction id not found in transMap is
// a serious problem.
QPID_LOG(debug, "Beginning CLFS-recovered message operation replay");
// Keep track of any messages that are recovered from the log but don't
// have any place to be. This can happen, for example, if the broker
// crashes while logging a message deletion. After all the recovery is
// done, delete all the homeless messages.
std::vector<uint64_t> homeless;
std::map<uint64_t, std::vector<MessageLog::RecoveredMsgOp> >::const_iterator msg;
for (msg = messageOps.begin(); msg != messageOps.end(); ++msg) {
uint64_t msgId = msg->first;
const std::vector<MessageLog::RecoveredMsgOp>& ops = msg->second;
QPID_LOG(debug, "Message " << msgId << "; " << ops.size() << " op(s)");
MessageInfo::shared_ptr m(new MessageInfo);
std::vector<QueueEntry>& entries = messageQueueMap[msgId];
std::vector<MessageLog::RecoveredMsgOp>::const_iterator op;
for (op = ops.begin(); op != ops.end(); ++op) {
QueueEntry entry(op->queueId);
MessageInfo::Location loc(op->queueId);
std::string dir =
op->op == MessageLog::RECOVERED_ENQUEUE ? "enqueue"
: "dequeue";
if (validQueues.find(op->queueId) == validQueues.end()) {
QPID_LOG(info,
"Message " << msgId << dir << " on non-existant queue "
<< op->queueId << "; dropped");
continue;
}
if (op->txnId != 0) {
// Be sure to enroll this message in the transaction even if
// it has committed or aborted. This ensures that the
// transaction isn't removed from the log while finalizing the
// recovery. If it were to be removed and the broker failed
// again before removing this message during normal operation,
// it couldn't be recovered again.
//
// Recall what is being reconstructed; 2 things:
// 1. This class's 'messages' list which keeps track
// of the queues each message is on and the transactions
// each message is enrolled in. For this, aborted
// transactions cause the result of the operation to be
// ignored, but the message does need to be enrolled in
// the transaction to properly maintain the transaction
// references until the message is deleted.
// 2. The StorageProvider's MessageQueueMap, which also
// has an entry for each queue each message is on and
// its TPL status and associated xid.
const Transaction::shared_ptr &t =
transMap.find(op->txnId)->second;
// Prepared transactions cause the operation to be
// provisionally acted on, and the message to be enrolled in
// the transaction for when it commits/aborts. This is
// noted in the QueueEntry for the StorageProvider's map.
if (t->getState() == Transaction::TRANS_PREPARED) {
QPID_LOG(debug, dir << " for queue " << op->queueId <<
", prepared txn " << op->txnId);
TPCTransaction::shared_ptr tpct(boost::dynamic_pointer_cast<TPCTransaction>(t));
if (tpct.get() == 0)
THROW_STORE_EXCEPTION("Invalid transaction state");
t->enroll(msgId);
entry.xid = tpct->getXid();
loc.transaction = t;
if (op->op == MessageLog::RECOVERED_ENQUEUE) {
entry.tplStatus = QueueEntry::ADDING;
loc.disposition = MessageInfo::TRANSACTION_ENQUEUE;
}
else {
entry.tplStatus = QueueEntry::REMOVING;
loc.disposition = MessageInfo::TRANSACTION_DEQUEUE;
}
}
else if (t->getState() != Transaction::TRANS_COMMITTED) {
QPID_LOG(debug, dir << " for queue " << op->queueId <<
", txn " << op->txnId << ", rolling back");
continue;
}
}
// Here for non-transactional and prepared transactional operations
// to set up the messageQueueMap entries. Note that at this point
// a committed transactional operation looks like a
// non-transactional one as far as the QueueEntry is
// concerned - just do it. If this is an entry enqueuing a
// message, just add it to the entries list. If it's a dequeue
// operation, locate the matching entry for the queue and delete
// it if the current op is non-transactional; if it's a prepared
// transaction then replace the existing entry with the current
// one that notes the message is enqueued but being removed under
// a prepared transaction.
QPID_LOG(debug, dir + " at queue " << entry.queueId);
if (op->op == MessageLog::RECOVERED_ENQUEUE) {
entries.push_back(entry);
m->where.push_back(loc);
}
else {
std::vector<QueueEntry>::iterator i = entries.begin();
while (i != entries.end()) {
if (i->queueId == entry.queueId) {
if (entry.tplStatus != QueueEntry::NONE)
*i = entry;
else
entries.erase(i);
break;
}
++i;
}
std::list<MessageInfo::Location>::iterator w = m->where.begin();
while (w != m->where.end()) {
if (w->queueId == loc.queueId) {
if (loc.transaction.get() != 0) {
*w = loc;
++w;
}
else {
w = m->where.erase(w);
}
}
}
}
}
// Now that all the queue entries have been set correctly, see if
// there are any entries; they may have all been removed during
// recovery. If there are none, add this message to the homeless
// list to be deleted from the log after the recovery is done.
if (m->where.size() == 0) {
homeless.push_back(msgId);
messageMap.erase(msgId);
messageQueueMap.erase(msgId);
}
else {
std::pair<uint64_t, MessageInfo::shared_ptr> p(msgId, m);
messages.insert(p);
}
}
QPID_LOG(debug, "Message log recovery done.");
// Done! Ok, go back and delete all the homeless messages.
BOOST_FOREACH(uint64_t msg, homeless) {
QPID_LOG(debug, "Deleting homeless message " << msg);
remove(msg);
}
}
// Expunge is called when a queue is deleted. All references to that
// queue must be expunged from all messages. 'Dequeue' log records are
// written for each queue entry removed, but any errors are swallowed.
// On recovery there's a list of valid queues passed in. The deleted
// queue will not be on that list so if any references to it are
// recovered they'll get weeded out then.
void
Messages::expunge(uint64_t queueId)
{
std::vector<uint64_t> toBeDeleted; // Messages to be deleted later.
{
// Lock everybody out since all messages are possibly in play.
// There also may be other threads already working on particular
// messages so individual message mutex still must be acquired.
qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock);
MessageMap::iterator m;
for (m = messages.begin(); m != messages.end(); ++m) {
MessageInfo::shared_ptr p = m->second;
{
qpid::sys::ScopedLock<qpid::sys::Mutex> ml(p->whereLock);
std::list<MessageInfo::Location>::iterator i = p->where.begin();
while (i != p->where.end()) {
if (i->queueId != queueId) {
++i;
continue;
}
// If this entry is involved in a transaction, unenroll it.
// Then remove the entry.
if (i->transaction.get() != 0)
i->transaction->unenroll(m->first);
i = p->where.erase(i);
try {
log.recordDequeue(m->first, queueId, 0);
}
catch(...) {
}
}
if (p->where.size() == 0)
toBeDeleted.push_back(m->first);
}
}
}
// Swallow any exceptions during this; don't care. Recover it later
// if needed.
try {
BOOST_FOREACH(uint64_t msg, toBeDeleted)
remove(msg);
}
catch(...) {
}
}
// Remove a specified message from those controlled by this object.
void
Messages::remove(uint64_t messageId)
{
uint64_t newFirstId = 0;
{
qpid::sys::ScopedWlock<qpid::sys::RWlock> l(lock);
messages.erase(messageId);
// May have deleted the first entry; if so the log can release that.
// If this message being deleted results in an empty list of
// messages, move the tail up to this message's LSN. This may
// result in one or more messages being stranded in the log
// until there's more activity. If a restart happens while these
// unneeded log records are there, the presence of the MessageDelete
// entry will cause the message(s) to be ignored anyway.
if (messages.empty())
newFirstId = messageId;
else if (messages.begin()->first > messageId)
newFirstId = messages.begin()->first;
}
log.deleteMessage(messageId, newFirstId);
}
}}} // namespace qpid::store::ms_clfs