blob: 002510067560cca1c724ddf4c6441c4e5819a70f [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#ifndef QPID_LINEARSTORE_MESSAGESTOREIMPL_H
#define QPID_LINEARSTORE_MESSAGESTOREIMPL_H
#include "qpid/broker/MessageStore.h"
#include "qpid/Options.h"
#include "qpid/linearstore/IdSequence.h"
#include "qpid/linearstore/JournalLogImpl.h"
#include "qpid/linearstore/journal/jcfg.h"
#include "qpid/linearstore/journal/EmptyFilePoolTypes.h"
#include "qpid/linearstore/PreparedTransaction.h"
#include "qpid/sys/Time.h"
#include "qmf/org/apache/qpid/linearstore/Store.h"
#include <iomanip>
// Assume DB_VERSION_MAJOR == 4
#if (DB_VERSION_MINOR == 2)
#include <errno.h>
#define DB_BUFFER_SMALL ENOMEM
#endif
class Db;
class DbEnv;
class Dbt;
class DbTxn;
namespace qpid {
namespace broker {
class Broker;
}
namespace sys {
class Timer;
}
namespace linearstore{
namespace journal {
class EmptyFilePool;
class EmptyFilePoolManager;
}
class IdDbt;
class JournalImpl;
class TplJournalImpl;
class TxnCtxt;
/**
* An implementation of the MessageStore interface based on Berkeley DB
*/
class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::management::Manageable
{
public:
typedef boost::shared_ptr<Db> db_ptr;
typedef boost::shared_ptr<DbEnv> dbEnv_ptr;
struct StoreOptions : public qpid::Options {
StoreOptions(const std::string& name="Linear Store Options");
std::string clusterName;
std::string storeDir;
bool truncateFlag;
uint32_t wCachePageSizeKib;
uint16_t wCacheNumPages;
uint32_t tplWCachePageSizeKib;
uint16_t tplWCacheNumPages;
uint16_t efpPartition;
uint64_t efpFileSizeKib;
bool overwriteBeforeReturnFlag;
qpid::sys::Duration journalFlushTimeout;
};
private:
typedef std::map<uint64_t, qpid::broker::RecoverableQueue::shared_ptr> queue_index;
typedef std::map<uint64_t, qpid::broker::RecoverableExchange::shared_ptr> exchange_index;
typedef std::map<uint64_t, qpid::broker::RecoverableMessage::shared_ptr> message_index;
typedef LockedMappings::map txn_lock_map;
typedef boost::ptr_list<PreparedTransaction> txn_list;
typedef std::map<std::string, JournalImpl*> JournalListMap;
typedef JournalListMap::iterator JournalListMapItr;
// Default store settings
static const bool defTruncateFlag = false;
static const uint32_t defWCachePageSizeKib = QLS_WMGR_DEF_PAGE_SIZE_KIB;
static const uint16_t defWCacheNumPages = QLS_WMGR_DEF_NUM_PAGES;
static const uint32_t defTplWCachePageSizeKib = QLS_WMGR_DEF_TPL_PAGE_SIZE_KIB;
static const uint16_t defTplWCacheNumPages = QLS_WMGR_DEF_TPL_NUM_PAGES;
static const uint16_t defEfpPartition = 1;
static const uint64_t defEfpFileSizeKib = 512 * QLS_SBLK_SIZE_KIB;
static const bool defOverwriteBeforeReturnFlag = false;
static const std::string storeTopLevelDir;
// FIXME aconway 2010-03-09: was 10ms
static const uint64_t defJournalGetEventsTimeoutNs = 1 * 1000000; // 1ms
static const uint64_t defJournalFlushTimeoutNs = 500 * 1000000; // 500ms
std::list<db_ptr> dbs;
dbEnv_ptr dbenv;
db_ptr queueDb;
db_ptr configDb;
db_ptr exchangeDb;
db_ptr mappingDb;
db_ptr bindingDb;
db_ptr generalDb;
// Pointer to Transaction Prepared List (TPL) journal instance
boost::shared_ptr<TplJournalImpl> tplStorePtr;
qpid::sys::Mutex tplInitLock;
JournalListMap journalList;
qpid::sys::Mutex journalListLock;
qpid::sys::Mutex bdbLock;
IdSequence queueIdSequence;
IdSequence exchangeIdSequence;
IdSequence generalIdSequence;
IdSequence messageIdSequence;
std::string storeDir;
qpid::linearstore::journal::efpPartitionNumber_t defaultEfpPartitionNumber;
qpid::linearstore::journal::efpDataSize_kib_t defaultEfpFileSize_kib;
bool overwriteBeforeReturnFlag;
uint32_t wCachePgSizeSblks;
uint16_t wCacheNumPages;
uint32_t tplWCachePgSizeSblks;
uint16_t tplWCacheNumPages;
uint64_t highestRid;
qpid::sys::Duration journalFlushTimeout;
bool isInit;
const char* envPath;
qpid::broker::Broker* broker;
JournalLogImpl jrnlLog;
boost::shared_ptr<qpid::linearstore::journal::EmptyFilePoolManager> efpMgr;
qmf::org::apache::qpid::linearstore::Store::shared_ptr mgmtObject;
qpid::management::ManagementAgent* agent;
// Parameter validation and calculation
static uint32_t chkJrnlWrPageCacheSize(const uint32_t param,
const std::string& paramName);
static uint16_t chkJrnlWrCacheNumPages(const uint16_t param,
const std::string& paramName);
static qpid::linearstore::journal::efpPartitionNumber_t chkEfpPartition(const qpid::linearstore::journal::efpPartitionNumber_t partition,
const std::string& paramName);
static qpid::linearstore::journal::efpDataSize_kib_t chkEfpFileSizeKiB(const qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKiB,
const std::string& paramName);
void init(const bool truncateFlag);
void recoverQueues(TxnCtxt& txn,
qpid::broker::RecoveryManager& recovery,
queue_index& index,
txn_list& locked,
message_index& messages);
void recoverMessages(TxnCtxt& txn,
qpid::broker::RecoveryManager& recovery,
queue_index& index,
txn_list& locked,
message_index& prepared);
void recoverMessages(TxnCtxt& txn,
qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr& queue,
txn_list& locked,
message_index& prepared,
long& rcnt,
long& idcnt);
qpid::broker::RecoverableMessage::shared_ptr getExternMessage(qpid::broker::RecoveryManager& recovery,
uint64_t mId,
unsigned& headerSize);
void recoverExchanges(TxnCtxt& txn,
qpid::broker::RecoveryManager& recovery,
exchange_index& index);
void recoverBindings(TxnCtxt& txn,
exchange_index& exchanges,
queue_index& queues);
void recoverGeneral(TxnCtxt& txn,
qpid::broker::RecoveryManager& recovery);
int enqueueMessage(TxnCtxt& txn,
IdDbt& msgId,
qpid::broker::RecoverableMessage::shared_ptr& msg,
queue_index& index,
txn_list& locked,
message_index& prepared);
void recoverTplStore();
void recoverLockedMappings(txn_list& txns);
TxnCtxt* check(qpid::broker::TransactionContext* ctxt);
uint64_t msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message);
void store(const qpid::broker::PersistableQueue* queue,
TxnCtxt* txn,
const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message);
void async_dequeue(qpid::broker::TransactionContext* ctxt,
const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
const qpid::broker::PersistableQueue& queue);
void destroy(db_ptr db,
const qpid::broker::Persistable& p);
bool create(db_ptr db,
IdSequence& seq,
const qpid::broker::Persistable& p);
void completed(TxnCtxt& txn,
bool commit);
void deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue);
void deleteBinding(const qpid::broker::PersistableExchange& exchange,
const qpid::broker::PersistableQueue& queue,
const std::string& key);
void put(db_ptr db,
DbTxn* txn,
Dbt& key,
Dbt& value);
void open(db_ptr db,
DbTxn* txn,
const char* file,
bool dupKey);
void closeDbs();
// journal functions
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
std::string getJrnlDir(const std::string& queueName);
qpid::linearstore::journal::EmptyFilePool* getEmptyFilePool(const qpid::linearstore::journal::efpPartitionNumber_t p, const qpid::linearstore::journal::efpDataSize_kib_t s);
qpid::linearstore::journal::EmptyFilePool* getEmptyFilePool(const qpid::framing::FieldTable& args, std::ostringstream& oss);
std::string getStoreTopLevelDir();
std::string getJrnlBaseDir();
std::string getBdbBaseDir();
std::string getTplBaseDir();
inline void checkInit() {
// TODO: change the default dir to ~/.qpidd
if (!isInit) { init("/tmp"); isInit = true; }
}
void chkTplStoreInit();
public:
typedef boost::shared_ptr<MessageStoreImpl> shared_ptr;
MessageStoreImpl(qpid::broker::Broker* broker, const char* envpath = 0);
virtual ~MessageStoreImpl();
bool init(const qpid::Options* options);
bool init(const std::string& dir,
qpid::linearstore::journal::efpPartitionNumber_t efpPartition = defEfpPartition,
qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKib = defEfpFileSizeKib,
const bool truncateFlag = false,
uint32_t wCachePageSize = defWCachePageSizeKib,
uint16_t wCacheNumPages = defWCacheNumPages,
uint32_t tplWCachePageSize = defTplWCachePageSizeKib,
uint16_t tplWCacheNumPages = defTplWCacheNumPages,
const bool overwriteBeforeReturnFlag_ = false);
void truncateInit();
void initManagement ();
void finalize();
// --- Implementation of qpid::broker::MessageStore ---
void create(qpid::broker::PersistableQueue& queue,
const qpid::framing::FieldTable& args);
void destroy(qpid::broker::PersistableQueue& queue);
void create(const qpid::broker::PersistableExchange& queue,
const qpid::framing::FieldTable& args);
void destroy(const qpid::broker::PersistableExchange& queue);
void bind(const qpid::broker::PersistableExchange& exchange,
const qpid::broker::PersistableQueue& queue,
const std::string& key,
const qpid::framing::FieldTable& args);
void unbind(const qpid::broker::PersistableExchange& exchange,
const qpid::broker::PersistableQueue& queue,
const std::string& key,
const qpid::framing::FieldTable& args);
void create(const qpid::broker::PersistableConfig& config);
void destroy(const qpid::broker::PersistableConfig& config);
void recover(qpid::broker::RecoveryManager& queues);
void stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg);
void destroy(qpid::broker::PersistableMessage& msg);
void appendContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
const std::string& data);
void loadContent(const qpid::broker::PersistableQueue& queue,
const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
std::string& data,
uint64_t offset,
uint32_t length);
void enqueue(qpid::broker::TransactionContext* ctxt,
const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
const qpid::broker::PersistableQueue& queue);
void dequeue(qpid::broker::TransactionContext* ctxt,
const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
const qpid::broker::PersistableQueue& queue);
void flush(const qpid::broker::PersistableQueue& queue);
inline uint32_t outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/) { return 0; }; // TODO: Deprecate this call
void collectPreparedXids(std::set<std::string>& xids);
std::auto_ptr<qpid::broker::TransactionContext> begin();
std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid);
void prepare(qpid::broker::TPCTransactionContext& ctxt);
void localPrepare(TxnCtxt* ctxt);
void commit(qpid::broker::TransactionContext& ctxt);
void abort(qpid::broker::TransactionContext& ctxt);
// --- Implementation of qpid::management::Managable ---
qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
{ return mgmtObject; }
inline qpid::management::Manageable::status_t ManagementMethod (uint32_t, qpid::management::Args&, std::string&)
{ return qpid::management::Manageable::STATUS_OK; }
std::string getStoreDir() const;
private:
void journalDeleted(JournalImpl&);
}; // class MessageStoreImpl
} // namespace msgstore
} // namespace mrg
#endif // ifndef QPID_LINEARSTORE_MESSAGESTOREIMPL_H