// blob: 20231bf9105e5ea890950058894e137711653cba [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "MessageStorePlugin.h"
#include "StorageProvider.h"
#include "StoreException.h"
#include "qpid/broker/Broker.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include "qpid/DataDir.h"
#include "qpid/log/Statement.h"
namespace qpid {
namespace store {
/*
 * The broker is handed a boost::shared_ptr<MessageStore> whose pointee lives
 * in static storage, so it must never be deleted -- least of all by the
 * broker. Building that shared_ptr with this do-nothing deleter ensures that,
 * when the last shared_ptr is destroyed, the deleter runs in place of
 * delete() and the object is left alone.
 */
namespace {
struct NoopDeleter {
    // Called by the shared_ptr instead of delete; intentionally empty.
    void operator()(qpid::broker::MessageStore*) {}
};
}
// File-scope MessageStorePlugin instance with static storage duration.
// NOTE(review): as the variable name suggests, constructing it is presumably
// what registers the plugin (via the qpid::Plugin base) -- confirm against
// the Plugin class.
static MessageStorePlugin static_instance_registers_plugin;
/**
 * Build the option group for the message store plugin.
 *
 * Registers a single option, "storage-provider", whose value is written
 * into providerName.
 *
 * @param name Name handed to the qpid::Options base class.
 */
MessageStorePlugin::StoreOptions::StoreOptions(const std::string& name)
    : qpid::Options(name)
{
    addOptions()("storage-provider",
                 qpid::optValue(providerName, "PROVIDER"),
                 "Name of the storage provider to use.");
}
/**
 * Early-initialization hook: pick the storage provider and install this
 * plugin as the broker's message store.
 *
 * Non-Broker targets are ignored. For a Broker target, the storage provider
 * plugins get their own early-init pass (with this object as their target)
 * and one of the providers that became available is selected:
 *  - none available: a warning is logged and no store is installed;
 *  - --storage-provider given: that provider must be among those available;
 *  - otherwise: there must be exactly one provider available.
 * The selected provider is activated, this object is given to the broker as
 * its store, and finalizeMe() is registered as a finalizer on the target.
 *
 * @param target Plugin target being initialized.
 * @throws Exception if the named provider is not available, or if several
 *         providers are available and none was selected.
 */
void MessageStorePlugin::earlyInitialize(qpid::Plugin::Target& target)
{
    // Only listen to Broker targets.
    qpid::broker::Broker* const brokerTarget =
        dynamic_cast<qpid::broker::Broker*>(&target);
    if (!brokerTarget)
        return;
    broker = brokerTarget;

    // Let any storage provider plugins announce themselves. Without at
    // least one of them there can be no message store.
    qpid::Plugin::earlyInitAll(*this);
    if (providers.empty()) {
        QPID_LOG(warning,
                 "Message store plugin: No storage providers available.");
        provider = providers.end();
        return;
    }

    if (options.providerName.empty()) {
        // Nothing was requested explicitly: a lone provider is used as-is,
        // more than one is ambiguous and must be resolved by the user.
        if (providers.size() > 1) {
            provider = providers.end();
            throw Exception("Message store plugin: multiple provider plugins "
                            "loaded; must either load only one or select one "
                            "using --storage-provider");
        }
        provider = providers.begin();
    }
    else {
        // A specific provider was requested; it has to be one of those
        // that registered.
        provider = providers.find(options.providerName);
        if (provider == providers.end())
            throw Exception("Message store plugin: storage provider '" +
                            options.providerName +
                            "' does not exist.");
    }

    provider->second->activate(*this);

    // The broker must never delete this object, hence the no-op deleter.
    boost::shared_ptr<qpid::broker::MessageStore> sp(this, NoopDeleter());
    broker->setStore(sp);
    target.addFinalizer(boost::bind(&MessageStorePlugin::finalizeMe, this));
}
/**
 * Initialization hook: forward the initialize step to the selected storage
 * provider, if there is one.
 *
 * Non-Broker targets are ignored.
 *
 * @param target Plugin target being initialized.
 */
void MessageStorePlugin::initialize(qpid::Plugin::Target& target)
{
    // Only listen to Broker targets.
    if (dynamic_cast<qpid::broker::Broker*>(&target) == 0)
        return;

    // No provider was selected during early initialization: nothing to do.
    if (provider == providers.end())
        return;

    provider->second->initialize(*this);
    // Disabled call, kept for reference:
    // qpid::Plugin::initializeAll(*this);
}
/**
 * Finalizer registered on the broker target by earlyInitialize(); runs the
 * finalizers of any Provider plugins.
 */
void MessageStorePlugin::finalizeMe()
{
    // Call finalizers on any Provider plugins
    finalize();
}
/**
 * Record a storage provider under the given name.
 *
 * If a provider is already recorded under that name, the new one is ignored
 * and a warning is logged.
 *
 * @param name Name under which the provider is recorded.
 * @param be   The storage provider.
 */
void MessageStorePlugin::providerAvailable(const std::string name,
                                           StorageProvider *be)
{
    const bool added =
        providers.insert(ProviderMap::value_type(name, be)).second;
    if (!added)
        QPID_LOG(warning, "Storage provider " << name << " duplicate; ignored.");
}
/**
 * Logs the truncateInit request at info level; the argument is not used and
 * nothing is forwarded to the storage provider.
 */
void MessageStorePlugin::truncateInit(const bool /*saveStoreContent*/)
{
    QPID_LOG(info, "Store: truncateInit");
}
/**
 * Record the existence of a durable queue.
 *
 * A queue with an empty name is logged as an error and otherwise ignored.
 * A queue that already carries a persistence id is rejected with a store
 * exception. Anything else is passed on to the active storage provider.
 *
 * @param queue Queue to record.
 * @param args  Arguments passed through to the provider.
 */
void MessageStorePlugin::create(broker::PersistableQueue& queue,
                                const framing::FieldTable& args)
{
    if (queue.getName().empty()) {
        QPID_LOG(error,
                 "Cannot create store for empty (null) queue name - "
                 "ignoring and attempting to continue.");
        return;
    }
    if (queue.getPersistenceId() != 0)
        THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());

    provider->second->create(queue, args);
}
/**
 * Destroy a durable queue by delegating to the active storage provider.
 *
 * @param queue Queue to destroy.
 */
void MessageStorePlugin::destroy(broker::PersistableQueue& queue)
{
    (*provider).second->destroy(queue);
}
/**
 * Record the existence of a durable exchange.
 *
 * An exchange that already carries a persistence id is rejected with a
 * store exception; otherwise the request goes to the active storage
 * provider.
 *
 * @param exchange Exchange to record.
 * @param args     Arguments passed through to the provider.
 */
void MessageStorePlugin::create(const broker::PersistableExchange& exchange,
                                const framing::FieldTable& args)
{
    if (exchange.getPersistenceId() != 0)
        THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName());

    provider->second->create(exchange, args);
}
/**
 * Destroy a durable exchange by delegating to the active storage provider.
 *
 * @param exchange Exchange to destroy.
 */
void MessageStorePlugin::destroy(const broker::PersistableExchange& exchange)
{
    (*provider).second->destroy(exchange);
}
/**
 * Record a binding; all arguments are passed unchanged to the active
 * storage provider.
 *
 * @param exchange Exchange side of the binding.
 * @param queue    Queue side of the binding.
 * @param key      Binding key.
 * @param args     Binding arguments.
 */
void MessageStorePlugin::bind(const broker::PersistableExchange& exchange,
                              const broker::PersistableQueue& queue,
                              const std::string& key,
                              const framing::FieldTable& args)
{
    (*provider).second->bind(exchange, queue, key, args);
}
/**
 * Forget a binding; all arguments are passed unchanged to the active
 * storage provider.
 *
 * @param exchange Exchange side of the binding.
 * @param queue    Queue side of the binding.
 * @param key      Binding key.
 * @param args     Binding arguments.
 */
void MessageStorePlugin::unbind(const broker::PersistableExchange& exchange,
                                const broker::PersistableQueue& queue,
                                const std::string& key,
                                const framing::FieldTable& args)
{
    (*provider).second->unbind(exchange, queue, key, args);
}
/**
 * Record generic durable configuration.
 *
 * A config item that already carries a persistence id is rejected with a
 * store exception; otherwise the request goes to the active storage
 * provider.
 *
 * @param config Config item to record.
 */
void MessageStorePlugin::create(const broker::PersistableConfig& config)
{
    if (config.getPersistenceId() != 0)
        THROW_STORE_EXCEPTION("Config item already created: " +
                              config.getName());

    provider->second->create(config);
}
/**
 * Destroy generic durable configuration by delegating to the active
 * storage provider.
 *
 * @param config Config item to destroy.
 */
void MessageStorePlugin::destroy(const broker::PersistableConfig& config)
{
    (*provider).second->destroy(config);
}
/**
 * Stores a message before it has been enqueued. Enqueueing stores the
 * message automatically, so this is only needed when storage is required
 * before that point.
 *
 * The call is a no-op for a message that already has a persistence id or
 * whose content has been released.
 *
 * @param msg Message to stage.
 */
void MessageStorePlugin::stage(const boost::intrusive_ptr<broker::PersistableMessage>& msg)
{
    // Nothing to do if the message is already known to the store or its
    // content is no longer held.
    if (msg->getPersistenceId() != 0 || msg->isContentReleased())
        return;

    provider->second->stage(msg);
}
/**
 * Destroys a previously staged message. Only needed when the message is
 * never enqueued: once enqueued, deletion is automatic when the message has
 * been dequeued from every queue it was enqueued onto.
 *
 * A message without a persistence id is ignored.
 *
 * @param msg Message to destroy.
 */
void MessageStorePlugin::destroy(broker::PersistableMessage& msg)
{
    if (!msg.getPersistenceId())
        return;

    provider->second->destroy(msg);
}
/**
 * Appends content to a previously staged message.
 *
 * A message without a persistence id is rejected with a store exception.
 *
 * @param msg  Message to append to.
 * @param data Content to append.
 */
void MessageStorePlugin::appendContent(
    const boost::intrusive_ptr<const broker::PersistableMessage>& msg,
    const std::string& data)
{
    if (!msg->getPersistenceId())
        THROW_STORE_EXCEPTION("Cannot append content. Message not known to store!");

    provider->second->appendContent(msg, data);
}
/**
 * Loads (a section of) the content of the specified message, previously
 * stored through stage or enqueue, into data. The offset is relative to the
 * content only: offset 0 means the start of the content, not the headers or
 * related meta-data.
 *
 * A message without a persistence id is rejected with a store exception.
 *
 * @param queue  Queue the message is associated with.
 * @param msg    Message whose content is wanted.
 * @param data   Receives the loaded content.
 * @param offset Offset into the message content.
 * @param length Amount of content requested.
 */
void MessageStorePlugin::loadContent(
    const broker::PersistableQueue& queue,
    const boost::intrusive_ptr<const broker::PersistableMessage>& msg,
    std::string& data,
    uint64_t offset,
    uint32_t length)
{
    if (!msg->getPersistenceId())
        THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");

    provider->second->loadContent(queue, msg, data, offset, length);
}
/**
 * Enqueues a message: stores it if it has not been stored before and
 * records that it is on the given queue.
 *
 * A queue without a persistence id is rejected with a store exception.
 *
 * Note: The operation is asynchronous, so returning from this function does
 * not mean the operation is complete.
 *
 * @param ctxt  Transaction context passed through to the provider.
 * @param msg   Message to enqueue.
 * @param queue Queue the message goes onto.
 */
void MessageStorePlugin::enqueue(
    broker::TransactionContext* ctxt,
    const boost::intrusive_ptr<broker::PersistableMessage>& msg,
    const broker::PersistableQueue& queue)
{
    if (!queue.getPersistenceId())
        THROW_STORE_EXCEPTION("Queue not created: " + queue.getName());

    provider->second->enqueue(ctxt, msg, queue);
}
/**
 * Dequeues a message: records that it is no longer on the given queue and
 * deletes it if it is not on any other queue either.
 *
 * Note: The operation is asynchronous, so returning from this function does
 * not mean the operation is complete.
 *
 * @param ctxt  Transaction context passed through to the provider.
 * @param msg   Message to dequeue.
 * @param queue Queue the message is removed from.
 */
void MessageStorePlugin::dequeue(
    broker::TransactionContext* ctxt,
    const boost::intrusive_ptr<broker::PersistableMessage>& msg,
    const broker::PersistableQueue& queue)
{
    (*provider).second->dequeue(ctxt, msg, queue);
}
/**
 * Flushes all async messages to disk for the specified queue.
 *
 * Note: The operation is asynchronous, so returning from this function does
 * not mean the operation is complete.
 *
 * @param queue Queue to flush.
 */
void MessageStorePlugin::flush(const broker::PersistableQueue& queue)
{
    (*provider).second->flush(queue);
}
/**
 * Returns the number of outstanding AIOs for a given queue, as reported by
 * the active storage provider.
 *
 * If 0, then all the enqueues / dequeues have been stored to disk.
 *
 * @param queue Queue to query.
 */
uint32_t MessageStorePlugin::outstandingQueueAIO(const broker::PersistableQueue& queue)
{
    return (*provider).second->outstandingQueueAIO(queue);
}
/**
 * Begin a transaction by delegating to the active storage provider.
 *
 * @return The transaction context returned by the provider's begin().
 */
std::auto_ptr<broker::TransactionContext> MessageStorePlugin::begin()
{
    return (*provider).second->begin();
}
/**
 * Begin a transaction identified by an xid, by delegating to the active
 * storage provider.
 *
 * @param xid Transaction identifier passed through to the provider.
 * @return The transaction context returned by the provider's begin(xid).
 */
std::auto_ptr<broker::TPCTransactionContext> MessageStorePlugin::begin(const std::string& xid)
{
    return (*provider).second->begin(xid);
}
/**
 * Forward a prepare request for the given transaction context to the
 * active storage provider.
 *
 * @param ctxt Transaction context to prepare.
 */
void MessageStorePlugin::prepare(broker::TPCTransactionContext& ctxt)
{
    (*provider).second->prepare(ctxt);
}
/**
 * Forward a commit request for the given transaction context to the active
 * storage provider.
 *
 * @param ctxt Transaction context to commit.
 */
void MessageStorePlugin::commit(broker::TransactionContext& ctxt)
{
    (*provider).second->commit(ctxt);
}
/**
 * Forward an abort request for the given transaction context to the active
 * storage provider.
 *
 * @param ctxt Transaction context to abort.
 */
void MessageStorePlugin::abort(broker::TransactionContext& ctxt)
{
    (*provider).second->abort(ctxt);
}
/**
 * Ask the active storage provider to collect prepared xids into the
 * supplied set.
 *
 * @param xids Set handed to the provider to be filled in.
 */
void MessageStorePlugin::collectPreparedXids(std::set<std::string>& xids)
{
    (*provider).second->collectPreparedXids(xids);
}
/**
 * Request recovery of queue and message state; inherited from Recoverable.
 *
 * The active storage provider is asked, in this order, to recover configs,
 * exchanges, queues, bindings, messages and prepared transactions. Then
 * every recovered message is put back on each queue it was recorded on, and
 * queue entries that belong to a prepared transaction are handed to that
 * transaction as a pending enqueue (ADDING) or dequeue (REMOVING).
 *
 * A store exception is raised (THROW_STORE_EXCEPTION) when a queue entry
 * refers to a message, a queue, or a prepared transaction that was not
 * recovered.
 *
 * @param recoverer Recovery manager passed through to the provider.
 */
void
MessageStorePlugin::recover(broker::RecoveryManager& recoverer)
{
    ExchangeMap exchanges;
    QueueMap queues;
    MessageMap messages;
    MessageQueueMap messageQueueMap;
    PreparedTransactionMap dtxMap;

    provider->second->recoverConfigs(recoverer);
    provider->second->recoverExchanges(recoverer, exchanges);
    provider->second->recoverQueues(recoverer, queues);
    provider->second->recoverBindings(recoverer, exchanges, queues);
    // Important to recover messages before transactions in the SQL-CLFS
    // case. If this becomes a problem, it may be possible to resolve it.
    // If in doubt please raise a jira and notify Steve Huston
    // <shuston@riverace.com>.
    provider->second->recoverMessages(recoverer, messages, messageQueueMap);
    provider->second->recoverTransactions(recoverer, dtxMap);

    // Enqueue msgs where needed.
    for (MessageQueueMap::const_iterator i = messageQueueMap.begin();
         i != messageQueueMap.end();
         ++i) {
        // Locate the message corresponding to the current message Id
        MessageMap::const_iterator iMsg = messages.find(i->first);
        if (iMsg == messages.end()) {
            std::ostringstream oss;
            oss << "No matching message trying to re-enqueue message "
                << i->first;
            THROW_STORE_EXCEPTION(oss.str());
        }
        broker::RecoverableMessage::shared_ptr msg = iMsg->second;

        // Now for each queue referenced in the queue map, locate it
        // and re-enqueue the message.
        for (std::vector<QueueEntry>::const_iterator j = i->second.begin();
             j != i->second.end();
             ++j) {
            // Locate the queue corresponding to the current queue Id
            QueueMap::const_iterator iQ = queues.find(j->queueId);
            if (iQ == queues.end()) {
                std::ostringstream oss;
                oss << "No matching queue trying to re-enqueue message "
                    << i->first
                    << " on queue Id " << j->queueId;
                THROW_STORE_EXCEPTION(oss.str());
            }

            const bool adding = (j->tplStatus == QueueEntry::ADDING);
            const bool removing = (j->tplStatus == QueueEntry::REMOVING);

            // Entries that are part of a prepared transaction must refer to
            // a transaction that was actually recovered. Look it up with
            // find(): operator[] would silently insert an empty entry for
            // an unknown xid, which would then be dereferenced.
            PreparedTransactionMap::const_iterator iTx = dtxMap.end();
            if (adding || removing) {
                iTx = dtxMap.find(j->xid);
                if (iTx == dtxMap.end()) {
                    std::ostringstream oss;
                    oss << "No matching prepared transaction " << j->xid
                        << " trying to re-enqueue message " << i->first
                        << " on queue Id " << j->queueId;
                    THROW_STORE_EXCEPTION(oss.str());
                }
            }

            // Messages involved in prepared transactions have their status
            // updated accordingly. First, though, restore a message that
            // is expected to be on a queue, including non-transacted
            // messages and those pending dequeue in a dtx.
            if (!adding)
                iQ->second->recover(msg);

            if (adding)
                iTx->second->enqueue(iQ->second, msg);
            else if (removing)
                iTx->second->dequeue(iQ->second, msg);
        }
    }
}
}} // namespace qpid::store