| #ifndef QPID_STORE_MESSAGESTOREPLUGIN_H |
| #define QPID_STORE_MESSAGESTOREPLUGIN_H |
| |
| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| #include "qpid/Plugin.h" |
| #include "qpid/Options.h" |
| #include "qpid/broker/MessageStore.h" |
| //#include "qpid/management/Manageable.h" |
| |
| #include <string> |
| |
| using namespace qpid; |
| |
| namespace qpid { |
| |
// Forward declarations of the broker-side types that the store front end
// below refers to only by pointer or reference.
namespace broker {
    class Broker;               // handed out by MessageStorePlugin::getBroker()
    class PersistableExchange;  // durable exchange records
    class PersistableMessage;   // staged / enqueued message records
    class PersistableQueue;     // durable queue records
}
| |
| namespace store { |
| |
| class StorageProvider; |
| |
| /** |
| * @class MessageStorePlugin |
| * |
| * MessageStorePlugin is the front end of the persistent message store |
| * plugin. It is responsible for coordinating recovery, initialization, |
| * transactions (both local and distributed), flow-to-disk loading and |
| * unloading and persisting broker state (queues, bindings etc.). |
| * Actual storage operations are carried out by a message store storage |
| * provider that implements the qpid::store::StorageProvider interface. |
| */ |
| class MessageStorePlugin : |
| public qpid::Plugin, |
| public qpid::broker::MessageStore, // Frontend classes |
| public qpid::Plugin::Target // Provider target |
| // @TODO Need a mgmt story for this. Maybe allow r/o access to provider store info? public qpid::management::Manageable |
| { |
| public: |
| MessageStorePlugin() : broker(0) {} |
| |
| /** |
| * @name Methods inherited from qpid::Plugin |
| */ |
| //@{ |
| virtual Options* getOptions() { return &options; } |
| virtual void earlyInitialize (Plugin::Target& target); |
| virtual void initialize(Plugin::Target& target); |
| //@} |
| |
| /// Finalizer; calls Target::finalize() to run finalizers on |
| /// StorageProviders. |
| void finalizeMe(); |
| |
| /** |
| * Called by StorageProvider instances during the earlyInitialize sequence. |
| * Each StorageProvider must supply a unique name by which it is known and a |
| * pointer to itself. |
| */ |
| virtual void providerAvailable(const std::string name, StorageProvider *be); |
| |
| /** |
| * @name Methods inherited from qpid::broker::MessageStore |
| */ |
| |
| /** |
| * Record the existence of a durable queue |
| */ |
| virtual void create(broker::PersistableQueue& queue, |
| const framing::FieldTable& args); |
| /** |
| * Destroy a durable queue |
| */ |
| virtual void destroy(broker::PersistableQueue& queue); |
| |
| /** |
| * Record the existence of a durable exchange |
| */ |
| virtual void create(const broker::PersistableExchange& exchange, |
| const framing::FieldTable& args); |
| /** |
| * Destroy a durable exchange |
| */ |
| virtual void destroy(const broker::PersistableExchange& exchange); |
| |
| /** |
| * Record a binding |
| */ |
| virtual void bind(const broker::PersistableExchange& exchange, |
| const broker::PersistableQueue& queue, |
| const std::string& key, |
| const framing::FieldTable& args); |
| |
| /** |
| * Forget a binding |
| */ |
| virtual void unbind(const broker::PersistableExchange& exchange, |
| const broker::PersistableQueue& queue, |
| const std::string& key, |
| const framing::FieldTable& args); |
| |
| /** |
| * Record generic durable configuration |
| */ |
| virtual void create(const broker::PersistableConfig& config); |
| |
| /** |
| * Destroy generic durable configuration |
| */ |
| virtual void destroy(const broker::PersistableConfig& config); |
| |
| /** |
| * Stores a message before it has been enqueued |
| * (enqueueing automatically stores the message so this is |
| * only required if storage is required prior to that |
| * point). If the message has not yet been stored it will |
| * store the headers as well as any content passed in. A |
| * persistence id will be set on the message which can be |
| * used to load the content or to append to it. |
| */ |
| virtual void stage(const boost::intrusive_ptr<broker::PersistableMessage>& msg); |
| |
| /** |
| * Destroys a previously staged message. This only needs |
| * to be called if the message is never enqueued. (Once |
| * enqueued, deletion will be automatic when the message |
| * is dequeued from all queues it was enqueued onto). |
| */ |
| virtual void destroy(broker::PersistableMessage& msg); |
| |
| /** |
| * Appends content to a previously staged message |
| */ |
| virtual void appendContent(const boost::intrusive_ptr<const broker::PersistableMessage>& msg, |
| const std::string& data); |
| |
| /** |
| * Loads (a section) of content data for the specified |
| * message (previously stored through a call to stage or |
| * enqueue) into data. The offset refers to the content |
| * only (i.e. an offset of 0 implies that the start of the |
| * content should be loaded, not the headers or related |
| * meta-data). |
| */ |
| virtual void loadContent(const broker::PersistableQueue& queue, |
| const boost::intrusive_ptr<const broker::PersistableMessage>& msg, |
| std::string& data, |
| uint64_t offset, |
| uint32_t length); |
| |
| /** |
| * Enqueues a message, storing the message if it has not |
| * been previously stored and recording that the given |
| * message is on the given queue. |
| * |
| * Note: The operation is asynchronous so the return of this function does |
| * not mean the operation is complete. |
| * |
| * @param msg the message to enqueue |
| * @param queue the name of the queue onto which it is to be enqueued |
| * @param xid (a pointer to) an identifier of the |
| * distributed transaction in which the operation takes |
| * place or null for 'local' transactions |
| */ |
| virtual void enqueue(broker::TransactionContext* ctxt, |
| const boost::intrusive_ptr<broker::PersistableMessage>& msg, |
| const broker::PersistableQueue& queue); |
| |
| /** |
| * Dequeues a message, recording that the given message is |
| * no longer on the given queue and deleting the message |
| * if it is no longer on any other queue. |
| * |
| * |
| * Note: The operation is asynchronous so the return of this function does |
| * not mean the operation is complete. |
| * |
| * @param msg the message to dequeue |
| * @param queue the name of the queue from which it is to be dequeued |
| * @param xid (a pointer to) an identifier of the |
| * distributed transaction in which the operation takes |
| * place or null for 'local' transactions |
| */ |
| virtual void dequeue(broker::TransactionContext* ctxt, |
| const boost::intrusive_ptr<broker::PersistableMessage>& msg, |
| const broker::PersistableQueue& queue); |
| |
| /** |
| * Flushes all async messages to disk for the specified queue |
| * |
| * |
| * Note: The operation is asynchronous so the return of this function does |
| * not mean the operation is complete. |
| * |
| * @param queue the name of the queue from which it is to be dequeued |
| */ |
| virtual void flush(const broker::PersistableQueue& queue); |
| |
| /** |
| * Returns the number of outstanding AIO's for a given queue |
| * |
| * If 0, than all the enqueue / dequeues have been stored |
| * to disk |
| * |
| * @param queue the name of the queue to check for outstanding AIO |
| */ |
| virtual uint32_t outstandingQueueAIO(const broker::PersistableQueue& queue); |
| //@} |
| |
| /** |
| * @name Methods inherited from qpid::broker::TransactionalStore |
| */ |
| //@{ |
| std::auto_ptr<broker::TransactionContext> begin(); |
| |
| std::auto_ptr<broker::TPCTransactionContext> begin(const std::string& xid); |
| |
| void prepare(broker::TPCTransactionContext& ctxt); |
| |
| void commit(broker::TransactionContext& ctxt); |
| |
| void abort(broker::TransactionContext& ctxt); |
| |
| void collectPreparedXids(std::set<std::string>& xids); |
| //@} |
| |
| /** |
| * Request recovery of queue and message state; inherited from Recoverable |
| */ |
| virtual void recover(broker::RecoveryManager& recoverer); |
| |
| // inline management::Manageable::status_t ManagementMethod (uint32_t, management::Args&, std::string&) |
| // { return management::Manageable::STATUS_OK; } |
| |
| // So storage provider can get the broker info. |
| broker::Broker *getBroker() { return broker; } |
| |
| protected: |
| |
| struct StoreOptions : public qpid::Options { |
| StoreOptions(const std::string& name="Store Options"); |
| std::string providerName; |
| }; |
| StoreOptions options; |
| |
| typedef std::map<std::string, StorageProvider*> ProviderMap; |
| ProviderMap providers; |
| ProviderMap::const_iterator provider; |
| |
| broker::Broker *broker; |
| |
| }; // class MessageStoreImpl |
| |
| }} // namespace qpid::store |
| |
| #endif /* QPID_STORE_MESSAGESTOREPLUGIN_H */ |