| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| #include "MessageStorePlugin.h" |
| #include "StorageProvider.h" |
| #include "StoreException.h" |
| #include "qpid/broker/Broker.h" |
| #include "qpid/Plugin.h" |
| #include "qpid/Options.h" |
| #include "qpid/DataDir.h" |
| #include "qpid/log/Statement.h" |
| |
| namespace qpid { |
| namespace store { |
| |
| /* |
| * The MessageStore pointer given to the Broker points to static storage. |
| * Thus, it cannot be deleted, especially by the broker. To prevent deletion, |
| * this no-op deleter is used with the boost::shared_ptr. When the last |
| * shared_ptr is destroyed, the deleter is called rather than delete(). |
| */ |
| namespace { |
| class NoopDeleter { |
| public: |
| NoopDeleter() {} |
| void operator()(qpid::broker::MessageStore * /*p*/) {} |
| }; |
| } |
| |
| static MessageStorePlugin static_instance_registers_plugin; |
| |
| |
| MessageStorePlugin::StoreOptions::StoreOptions(const std::string& name) : |
| qpid::Options(name) |
| { |
| addOptions() |
| ("storage-provider", qpid::optValue(providerName, "PROVIDER"), |
| "Name of the storage provider to use.") |
| ; |
| } |
| |
| |
| void |
| MessageStorePlugin::earlyInitialize (qpid::Plugin::Target& target) |
| { |
| qpid::broker::Broker* b = |
| dynamic_cast<qpid::broker::Broker*>(&target); |
| if (0 == b) |
| return; // Only listen to Broker targets |
| |
| broker = b; |
| |
| // See if there are any storage provider plugins ready. If not, we can't |
| // do a message store. |
| qpid::Plugin::earlyInitAll(*this); |
| |
| if (providers.empty()) { |
| QPID_LOG(warning, |
| "Message store plugin: No storage providers available."); |
| provider = providers.end(); |
| return; |
| } |
| if (!options.providerName.empty()) { |
| // If specific one was chosen, locate it in loaded set of providers. |
| provider = providers.find(options.providerName); |
| if (provider == providers.end()) |
| throw Exception("Message store plugin: storage provider '" + |
| options.providerName + |
| "' does not exist."); |
| } |
| else { |
| // No specific provider chosen; if there's only one, use it. Else |
| // report the need to pick one. |
| if (providers.size() > 1) { |
| provider = providers.end(); |
| throw Exception("Message store plugin: multiple provider plugins " |
| "loaded; must either load only one or select one " |
| "using --storage-provider"); |
| } |
| provider = providers.begin(); |
| } |
| |
| provider->second->activate(*this); |
| NoopDeleter d; |
| boost::shared_ptr<qpid::broker::MessageStore> sp(this, d); |
| broker->setStore(sp); |
| target.addFinalizer(boost::bind(&MessageStorePlugin::finalizeMe, this)); |
| } |
| |
| void |
| MessageStorePlugin::initialize(qpid::Plugin::Target& target) |
| { |
| qpid::broker::Broker* broker = |
| dynamic_cast<qpid::broker::Broker*>(&target); |
| if (0 == broker) |
| return; // Only listen to Broker targets |
| |
| // Pass along the initialize step to the provider that's activated. |
| if (provider != providers.end()) { |
| provider->second->initialize(*this); |
| } |
| // qpid::Plugin::initializeAll(*this); |
| } |
| |
| void |
| MessageStorePlugin::finalizeMe() |
| { |
| finalize(); // Call finalizers on any Provider plugins |
| } |
| |
| void |
| MessageStorePlugin::providerAvailable(const std::string name, |
| StorageProvider *be) |
| { |
| ProviderMap::value_type newSp(name, be); |
| std::pair<ProviderMap::iterator, bool> inserted = providers.insert(newSp); |
| if (inserted.second == false) |
| QPID_LOG(warning, "Storage provider " << name << " duplicate; ignored."); |
| } |
| |
| void |
| MessageStorePlugin::truncateInit(const bool /*saveStoreContent*/) |
| { |
| QPID_LOG(info, "Store: truncateInit"); |
| } |
| |
| |
| /** |
| * Record the existence of a durable queue |
| */ |
| void |
| MessageStorePlugin::create(broker::PersistableQueue& queue, |
| const framing::FieldTable& args) |
| { |
| if (queue.getName().size() == 0) |
| { |
| QPID_LOG(error, |
| "Cannot create store for empty (null) queue name - " |
| "ignoring and attempting to continue."); |
| return; |
| } |
| if (queue.getPersistenceId()) { |
| THROW_STORE_EXCEPTION("Queue already created: " + queue.getName()); |
| } |
| provider->second->create(queue, args); |
| } |
| |
| /** |
| * Destroy a durable queue |
| */ |
| void |
| MessageStorePlugin::destroy(broker::PersistableQueue& queue) |
| { |
| provider->second->destroy(queue); |
| } |
| |
| /** |
| * Record the existence of a durable exchange |
| */ |
| void |
| MessageStorePlugin::create(const broker::PersistableExchange& exchange, |
| const framing::FieldTable& args) |
| { |
| if (exchange.getPersistenceId()) { |
| THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName()); |
| } |
| provider->second->create(exchange, args); |
| } |
| |
| /** |
| * Destroy a durable exchange |
| */ |
| void |
| MessageStorePlugin::destroy(const broker::PersistableExchange& exchange) |
| { |
| provider->second->destroy(exchange); |
| } |
| |
| /** |
| * Record a binding |
| */ |
| void |
| MessageStorePlugin::bind(const broker::PersistableExchange& exchange, |
| const broker::PersistableQueue& queue, |
| const std::string& key, |
| const framing::FieldTable& args) |
| { |
| provider->second->bind(exchange, queue, key, args); |
| } |
| |
| /** |
| * Forget a binding |
| */ |
| void |
| MessageStorePlugin::unbind(const broker::PersistableExchange& exchange, |
| const broker::PersistableQueue& queue, |
| const std::string& key, |
| const framing::FieldTable& args) |
| { |
| provider->second->unbind(exchange, queue, key, args); |
| } |
| |
| /** |
| * Record generic durable configuration |
| */ |
| void |
| MessageStorePlugin::create(const broker::PersistableConfig& config) |
| { |
| if (config.getPersistenceId()) { |
| THROW_STORE_EXCEPTION("Config item already created: " + |
| config.getName()); |
| } |
| provider->second->create(config); |
| } |
| |
| /** |
| * Destroy generic durable configuration |
| */ |
| void |
| MessageStorePlugin::destroy(const broker::PersistableConfig& config) |
| { |
| provider->second->destroy(config); |
| } |
| |
| /** |
| * Stores a message before it has been enqueued |
| * (enqueueing automatically stores the message so this is |
| * only required if storage is required prior to that |
| * point). |
| */ |
| void |
| MessageStorePlugin::stage(const boost::intrusive_ptr<broker::PersistableMessage>& msg) |
| { |
| if (msg->getPersistenceId() == 0 && !msg->isContentReleased()) { |
| provider->second->stage(msg); |
| } |
| } |
| |
| /** |
| * Destroys a previously staged message. This only needs |
| * to be called if the message is never enqueued. (Once |
| * enqueued, deletion will be automatic when the message |
| * is dequeued from all queues it was enqueued onto). |
| */ |
| void |
| MessageStorePlugin::destroy(broker::PersistableMessage& msg) |
| { |
| if (msg.getPersistenceId()) |
| provider->second->destroy(msg); |
| } |
| |
| /** |
| * Appends content to a previously staged message |
| */ |
| void |
| MessageStorePlugin::appendContent |
| (const boost::intrusive_ptr<const broker::PersistableMessage>& msg, |
| const std::string& data) |
| { |
| if (msg->getPersistenceId()) |
| provider->second->appendContent(msg, data); |
| else |
| THROW_STORE_EXCEPTION("Cannot append content. Message not known to store!"); |
| } |
| |
| /** |
| * Loads (a section) of content data for the specified |
| * message (previously stored through a call to stage or |
| * enqueue) into data. The offset refers to the content |
| * only (i.e. an offset of 0 implies that the start of the |
| * content should be loaded, not the headers or related |
| * meta-data). |
| */ |
| void |
| MessageStorePlugin::loadContent(const broker::PersistableQueue& queue, |
| const boost::intrusive_ptr<const broker::PersistableMessage>& msg, |
| std::string& data, |
| uint64_t offset, |
| uint32_t length) |
| { |
| if (msg->getPersistenceId()) |
| provider->second->loadContent(queue, msg, data, offset, length); |
| else |
| THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!"); |
| } |
| |
| /** |
| * Enqueues a message, storing the message if it has not |
| * been previously stored and recording that the given |
| * message is on the given queue. |
| * |
| * Note: The operation is asynchronous so the return of this function does |
| * not mean the operation is complete. |
| */ |
| void |
| MessageStorePlugin::enqueue(broker::TransactionContext* ctxt, |
| const boost::intrusive_ptr<broker::PersistableMessage>& msg, |
| const broker::PersistableQueue& queue) |
| { |
| if (queue.getPersistenceId() == 0) { |
| THROW_STORE_EXCEPTION("Queue not created: " + queue.getName()); |
| } |
| provider->second->enqueue(ctxt, msg, queue); |
| } |
| |
| /** |
| * Dequeues a message, recording that the given message is |
| * no longer on the given queue and deleting the message |
| * if it is no longer on any other queue. |
| * |
| * Note: The operation is asynchronous so the return of this function does |
| * not mean the operation is complete. |
| */ |
| void |
| MessageStorePlugin::dequeue(broker::TransactionContext* ctxt, |
| const boost::intrusive_ptr<broker::PersistableMessage>& msg, |
| const broker::PersistableQueue& queue) |
| { |
| provider->second->dequeue(ctxt, msg, queue); |
| } |
| |
| /** |
| * Flushes all async messages to disk for the specified queue |
| * |
| * Note: The operation is asynchronous so the return of this function does |
| * not mean the operation is complete. |
| */ |
| void |
| MessageStorePlugin::flush(const broker::PersistableQueue& queue) |
| { |
| provider->second->flush(queue); |
| } |
| |
| /** |
| * Returns the number of outstanding AIO's for a given queue |
| * |
| * If 0, than all the enqueue / dequeues have been stored |
| * to disk. |
| */ |
| uint32_t |
| MessageStorePlugin::outstandingQueueAIO(const broker::PersistableQueue& queue) |
| { |
| return provider->second->outstandingQueueAIO(queue); |
| } |
| |
| std::auto_ptr<broker::TransactionContext> |
| MessageStorePlugin::begin() |
| { |
| return provider->second->begin(); |
| } |
| |
| std::auto_ptr<broker::TPCTransactionContext> |
| MessageStorePlugin::begin(const std::string& xid) |
| { |
| return provider->second->begin(xid); |
| } |
| |
| void |
| MessageStorePlugin::prepare(broker::TPCTransactionContext& ctxt) |
| { |
| provider->second->prepare(ctxt); |
| } |
| |
| void |
| MessageStorePlugin::commit(broker::TransactionContext& ctxt) |
| { |
| provider->second->commit(ctxt); |
| } |
| |
| void |
| MessageStorePlugin::abort(broker::TransactionContext& ctxt) |
| { |
| provider->second->abort(ctxt); |
| } |
| |
| void |
| MessageStorePlugin::collectPreparedXids(std::set<std::string>& xids) |
| { |
| provider->second->collectPreparedXids(xids); |
| } |
| |
| /** |
| * Request recovery of queue and message state; inherited from Recoverable |
| */ |
| void |
| MessageStorePlugin::recover(broker::RecoveryManager& recoverer) |
| { |
| ExchangeMap exchanges; |
| QueueMap queues; |
| MessageMap messages; |
| MessageQueueMap messageQueueMap; |
| std::vector<std::string> xids; |
| PreparedTransactionMap dtxMap; |
| |
| provider->second->recoverConfigs(recoverer); |
| provider->second->recoverExchanges(recoverer, exchanges); |
| provider->second->recoverQueues(recoverer, queues); |
| provider->second->recoverBindings(recoverer, exchanges, queues); |
| // Important to recover messages before transactions in the SQL-CLFS |
| // case. If this becomes a problem, it may be possible to resolve it. |
| // If in doubt please raise a jira and notify Steve Huston |
| // <shuston@riverace.com>. |
| provider->second->recoverMessages(recoverer, messages, messageQueueMap); |
| provider->second->recoverTransactions(recoverer, dtxMap); |
| // Enqueue msgs where needed. |
| for (MessageQueueMap::const_iterator i = messageQueueMap.begin(); |
| i != messageQueueMap.end(); |
| ++i) { |
| // Locate the message corresponding to the current message Id |
| MessageMap::const_iterator iMsg = messages.find(i->first); |
| if (iMsg == messages.end()) { |
| std::ostringstream oss; |
| oss << "No matching message trying to re-enqueue message " |
| << i->first; |
| THROW_STORE_EXCEPTION(oss.str()); |
| } |
| broker::RecoverableMessage::shared_ptr msg = iMsg->second; |
| // Now for each queue referenced in the queue map, locate it |
| // and re-enqueue the message. |
| for (std::vector<QueueEntry>::const_iterator j = i->second.begin(); |
| j != i->second.end(); |
| ++j) { |
| // Locate the queue corresponding to the current queue Id |
| QueueMap::const_iterator iQ = queues.find(j->queueId); |
| if (iQ == queues.end()) { |
| std::ostringstream oss; |
| oss << "No matching queue trying to re-enqueue message " |
| << " on queue Id " << j->queueId; |
| THROW_STORE_EXCEPTION(oss.str()); |
| } |
| // Messages involved in prepared transactions have their status |
| // updated accordingly. First, though, restore a message that |
| // is expected to be on a queue, including non-transacted |
| // messages and those pending dequeue in a dtx. |
| if (j->tplStatus != QueueEntry::ADDING) |
| iQ->second->recover(msg); |
| switch(j->tplStatus) { |
| case QueueEntry::ADDING: |
| dtxMap[j->xid]->enqueue(iQ->second, msg); |
| break; |
| case QueueEntry::REMOVING: |
| dtxMap[j->xid]->dequeue(iQ->second, msg); |
| break; |
| default: |
| break; |
| } |
| } |
| } |
| } |
| |
| }} // namespace qpid::store |