/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/legacystore/MessageStoreImpl.h"
#include "qpid/legacystore/BindingDbt.h"
#include "qpid/legacystore/BufferValue.h"
#include "qpid/legacystore/IdDbt.h"
#include "qpid/legacystore/jrnl/txn_map.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
#include "qmf/org/apache/qpid/legacystore/Package.h"
#include "qpid/legacystore/StoreException.h"
#include <dirent.h>
#include <db.h>
#define MAX_AIO_SLEEPS 100000 // tot: ~1 sec
#define AIO_SLEEP_TIME_US 10 // 0.01 ms
namespace _qmf = qmf::org::apache::qpid::legacystore;
namespace mrg {
namespace msgstore {
const std::string MessageStoreImpl::storeTopLevelDir("rhm"); // Sets the top-level store dir name
// FIXME aconway 2010-03-09: was 10
qpid::sys::Duration MessageStoreImpl::defJournalGetEventsTimeout(1 * qpid::sys::TIME_MSEC); // 1ms
qpid::sys::Duration MessageStoreImpl::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s
qpid::sys::Mutex TxnCtxt::globalSerialiser;
MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const u_int64_t _rid,
const bool _deq_flag,
const bool _commit_flag,
const bool _tpc_flag) :
rid(_rid),
deq_flag(_deq_flag),
commit_flag(_commit_flag),
tpc_flag(_tpc_flag)
{}
MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath) :
numJrnlFiles(0),
autoJrnlExpand(false),
autoJrnlExpandMaxFiles(0),
jrnlFsizeSblks(0),
truncateFlag(false),
wCachePgSizeSblks(0),
wCacheNumPages(0),
tplNumJrnlFiles(0),
tplJrnlFsizeSblks(0),
tplWCachePgSizeSblks(0),
tplWCacheNumPages(0),
highestRid(0),
isInit(false),
envPath(envpath),
broker(broker_),
mgmtObject(),
agent(0)
{}
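// Clamp the journal file-count parameter to the allowable range
// [JRNL_MIN_NUM_FILES, JRNL_MAX_NUM_FILES], logging a warning if it was adjusted.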
u_int16_t MessageStoreImpl::chkJrnlNumFilesParam(const u_int16_t param, const std::string paramName)
{
u_int16_t p = param;
if (p < JRNL_MIN_NUM_FILES) {
p = JRNL_MIN_NUM_FILES;
QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is below allowable minimum (" << JRNL_MIN_NUM_FILES << "); changing this parameter to minimum value.");
} else if (p > JRNL_MAX_NUM_FILES) {
p = JRNL_MAX_NUM_FILES;
QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is above allowable maximum (" << JRNL_MAX_NUM_FILES << "); changing this parameter to maximum value.");
}
return p;
}
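// Clamp the journal file-size parameter (in read pages) to the allowable range and
// check that the resulting file size is not smaller than the write page cache size.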
u_int32_t MessageStoreImpl::chkJrnlFileSizeParam(const u_int32_t param, const std::string paramName, const u_int32_t wCachePgSizeSblks)
{
u_int32_t p = param;
u_int32_t min = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
u_int32_t max = JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE;
if (p < min) {
p = min;
QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is below allowable minimum (" << min << "); changing this parameter to minimum value.");
} else if (p > max) {
p = max;
QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is above allowable maximum (" << max << "); changing this parameter to maximum value.");
}
if (wCachePgSizeSblks > p * JRNL_RMGR_PAGE_SIZE) {
std::ostringstream oss;
oss << "Cannot create store with file size less than write page cache size. [file size = " << p << " (" << (p * JRNL_RMGR_PAGE_SIZE / 2) << " kB); write page cache = " << (wCachePgSizeSblks / 2) << " kB]";
THROW_STORE_EXCEPTION(oss.str());
}
return p;
}
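// Validate the write page cache size (KiB): it must be a power of 2 between 1 and 128.
// Zero selects the default; other values are adjusted to the closest allowable value.
// When the journal file is only one read page long, the page size is forced to the file size.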
u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, const std::string paramName, const u_int16_t jrnlFsizePgs)
{
u_int32_t p = param;
switch (p)
{
case 1:
case 2:
case 4:
case 8:
case 16:
case 32:
case 64:
case 128:
if (jrnlFsizePgs == 1) {
p = 64;
QPID_LOG(warning, "parameter " << paramName << " (" << param << ") cannot set a page size greater than the journal file size; changing this parameter to the journal file size (" << p << ")");
}
break;
default:
if (p == 0) {
// For zero value, use default
p = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_DBLK_SIZE * JRNL_SBLK_SIZE / 1024;
QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to default value (" << p << ")");
} else {
// For any positive value, use closest value
if (p < 6) p = 4;
else if (p < 12) p = 8;
else if (p < 24) p = 16;
else if (p < 48) p = 32;
else if (p < 96) p = 64;
else p = 128;
QPID_LOG(warning, "parameter " << paramName << " (" << param << ") must be a power of 2 between 1 and 128; changing this parameter to closest allowable value (" << p << ")");
}
}
return p;
}
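// Select the number of write cache pages for a given page size so that the total cache
// remains at 256 KiB, 512 KiB or 1 MiB, depending on the page size.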
u_int16_t MessageStoreImpl::getJrnlWrNumPages(const u_int32_t wrPageSizeKib)
{
u_int32_t wrPageSizeSblks = wrPageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
u_int32_t defTotWCacheSize = JRNL_WMGR_DEF_PAGE_SIZE * JRNL_WMGR_DEF_PAGES; // in sblks. Currently 2048 sblks (1 MiB).
switch (wrPageSizeKib)
{
case 1:
case 2:
case 4:
// 256 KiB total cache
return defTotWCacheSize / wrPageSizeSblks / 4;
case 8:
case 16:
// 512 KiB total cache
return defTotWCacheSize / wrPageSizeSblks / 2;
default: // 32, 64, 128
// 1 MiB total cache
return defTotWCacheSize / wrPageSizeSblks;
}
}
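// Validate the auto-expand options: auto-expand is disabled if not requested or if
// num-jfiles is already at its maximum; otherwise auto-expand-max-jfiles is clamped to
// the allowable maximum, and if left at its default it is adjusted to twice num-jfiles
// (capped at the maximum).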
void MessageStoreImpl::chkJrnlAutoExpandOptions(const StoreOptions* opts,
bool& autoJrnlExpand,
u_int16_t& autoJrnlExpandMaxFiles,
const std::string& autoJrnlExpandMaxFilesParamName,
const u_int16_t numJrnlFiles,
const std::string& numJrnlFilesParamName)
{
if (!opts->autoJrnlExpand) {
// auto-expand disabled
autoJrnlExpand = false;
autoJrnlExpandMaxFiles = 0;
return;
}
u_int16_t p = opts->autoJrnlExpandMaxFiles;
if (numJrnlFiles == JRNL_MAX_NUM_FILES) {
// num-jfiles at max; disable auto-expand
autoJrnlExpand = false;
autoJrnlExpandMaxFiles = 0;
QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " (" << p << ") must be higher than parameter "
<< numJrnlFilesParamName << " (" << numJrnlFiles << ") which is at the maximum allowable value; disabling auto-expand.");
return;
}
if (p > JRNL_MAX_NUM_FILES) {
// auto-expand-max-jfiles higher than max allowable, adjust
autoJrnlExpand = true;
autoJrnlExpandMaxFiles = JRNL_MAX_NUM_FILES;
QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " (" << p << ") is above allowable maximum ("
<< JRNL_MAX_NUM_FILES << "); changing this parameter to maximum value.");
return;
}
if (p && p == defAutoJrnlExpandMaxFiles && numJrnlFiles != defTplNumJrnlFiles) {
// num-jfiles is different from the default AND max-auto-expand-jfiles is still at default
// change value of max-auto-expand-jfiles
autoJrnlExpand = true;
if (2 * numJrnlFiles <= JRNL_MAX_NUM_FILES) {
autoJrnlExpandMaxFiles = 2 * numJrnlFiles;
QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " adjusted from its default value ("
<< defAutoJrnlExpandMaxFiles << ") to twice that of parameter " << numJrnlFilesParamName << " (" << autoJrnlExpandMaxFiles << ").");
} else {
autoJrnlExpandMaxFiles = JRNL_MAX_NUM_FILES;
QPID_LOG(warning, "parameter " << autoJrnlExpandMaxFilesParamName << " adjusted from its default to maximum allowable value ("
<< JRNL_MAX_NUM_FILES << ") because of the value of " << numJrnlFilesParamName << " (" << numJrnlFiles << ").");
}
return;
}
// No adjustments req'd, set values
autoJrnlExpand = true;
autoJrnlExpandMaxFiles = p;
}
void MessageStoreImpl::initManagement ()
{
if (broker != 0) {
agent = broker->getManagementAgent();
if (agent != 0) {
_qmf::Package packageInitializer(agent);
mgmtObject = _qmf::Store::shared_ptr (
new _qmf::Store(agent, this, broker));
mgmtObject->set_location(storeDir);
mgmtObject->set_defaultInitialFileCount(numJrnlFiles);
mgmtObject->set_defaultDataFileSize(jrnlFsizeSblks / JRNL_RMGR_PAGE_SIZE);
mgmtObject->set_tplIsInitialized(false);
mgmtObject->set_tplDirectory(getTplBaseDir());
mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
mgmtObject->set_tplWritePages(tplWCacheNumPages);
mgmtObject->set_tplInitialFileCount(tplNumJrnlFiles);
mgmtObject->set_tplDataFileSize(tplJrnlFsizeSblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
mgmtObject->set_tplCurrentFileCount(tplNumJrnlFiles);
agent->addObject(mgmtObject, 0, true);
// Initialize all existing queues (ie those recovered before management was initialized)
for (JournalListMapItr i=journalList.begin(); i!=journalList.end(); i++) {
i->second->initManagement(agent);
}
}
}
}
bool MessageStoreImpl::init(const qpid::Options* options)
{
// Extract and check options
const StoreOptions* opts = static_cast<const StoreOptions*>(options);
u_int16_t numJrnlFiles = chkJrnlNumFilesParam(opts->numJrnlFiles, "num-jfiles");
u_int32_t jrnlFsizePgs = chkJrnlFileSizeParam(opts->jrnlFsizePgs, "jfile-size-pgs");
u_int32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size", jrnlFsizePgs);
u_int16_t tplNumJrnlFiles = chkJrnlNumFilesParam(opts->tplNumJrnlFiles, "tpl-num-jfiles");
u_int32_t tplJrnlFSizePgs = chkJrnlFileSizeParam(opts->tplJrnlFsizePgs, "tpl-jfile-size-pgs");
u_int32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size", tplJrnlFSizePgs);
bool autoJrnlExpand;
u_int16_t autoJrnlExpandMaxFiles;
chkJrnlAutoExpandOptions(opts, autoJrnlExpand, autoJrnlExpandMaxFiles, "auto-expand-max-jfiles", numJrnlFiles, "num-jfiles");
// Pass option values to init(...)
return init(opts->storeDir, numJrnlFiles, jrnlFsizePgs, opts->truncateFlag, jrnlWrCachePageSizeKib, tplNumJrnlFiles, tplJrnlFSizePgs, tplJrnlWrCachePageSizeKib, autoJrnlExpand, autoJrnlExpandMaxFiles);
}
// These params, taken from options, are assumed to be correct and verified
bool MessageStoreImpl::init(const std::string& dir,
u_int16_t jfiles,
u_int32_t jfileSizePgs,
const bool truncateFlag,
u_int32_t wCachePageSizeKib,
u_int16_t tplJfiles,
u_int32_t tplJfileSizePgs,
u_int32_t tplWCachePageSizeKib,
bool autoJExpand,
u_int16_t autoJExpandMaxFiles)
{
if (isInit) return true;
// Set geometry members (converting to correct units where req'd)
numJrnlFiles = jfiles;
jrnlFsizeSblks = jfileSizePgs * JRNL_RMGR_PAGE_SIZE;
wCachePgSizeSblks = wCachePageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
wCacheNumPages = getJrnlWrNumPages(wCachePageSizeKib);
tplNumJrnlFiles = tplJfiles;
tplJrnlFsizeSblks = tplJfileSizePgs * JRNL_RMGR_PAGE_SIZE;
tplWCachePgSizeSblks = tplWCachePageSizeKib * 1024 / JRNL_DBLK_SIZE / JRNL_SBLK_SIZE; // convert from KiB to number sblks
tplWCacheNumPages = getJrnlWrNumPages(tplWCachePageSizeKib);
autoJrnlExpand = autoJExpand;
autoJrnlExpandMaxFiles = autoJExpandMaxFiles;
if (dir.size()>0) storeDir = dir;
if (truncateFlag)
truncateInit(false);
else
init();
QPID_LOG(notice, "Store module initialized; store-dir=" << dir);
QPID_LOG(info, "> Default files per journal: " << jfiles);
// TODO: Uncomment these lines when auto-expand is enabled.
// QPID_LOG(info, "> Auto-expand " << (autoJrnlExpand ? "enabled" : "disabled"));
// if (autoJrnlExpand) QPID_LOG(info, "> Max auto-expand journal files: " << autoJrnlExpandMaxFiles);
QPID_LOG(info, "> Default journal file size: " << jfileSizePgs << " (wpgs)");
QPID_LOG(info, "> Default write cache page size: " << wCachePageSizeKib << " (KiB)");
QPID_LOG(info, "> Default number of write cache pages: " << wCacheNumPages);
QPID_LOG(info, "> TPL files per journal: " << tplNumJrnlFiles);
QPID_LOG(info, "> TPL journal file size: " << tplJfileSizePgs << " (wpgs)");
QPID_LOG(info, "> TPL write cache page size: " << tplWCachePageSizeKib << " (KiB)");
QPID_LOG(info, "> TPL number of write cache pages: " << tplWCacheNumPages);
return isInit;
}
void MessageStoreImpl::init()
{
const int retryMax = 3;
int bdbRetryCnt = 0;
do {
if (bdbRetryCnt++ > 0)
{
closeDbs();
::usleep(1000000); // 1 sec delay
QPID_LOG(error, "Previoius BDB store initialization failed, retrying (" << bdbRetryCnt << " of " << retryMax << ")...");
}
try {
journal::jdir::create_dir(getBdbBaseDir());
dbenv.reset(new DbEnv(0));
dbenv->set_errpfx("msgstore");
dbenv->set_lg_regionmax(256000); // default = 65000
dbenv->open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON | DB_RECOVER, 0);
// Databases are constructed here instead of the constructor so that the DB_RECOVER flag can be used
// against the database environment. Recover can only be performed if no databases have been created
// against the environment at the time of recovery, as recovery invalidates the environment.
queueDb.reset(new Db(dbenv.get(), 0));
dbs.push_back(queueDb);
configDb.reset(new Db(dbenv.get(), 0));
dbs.push_back(configDb);
exchangeDb.reset(new Db(dbenv.get(), 0));
dbs.push_back(exchangeDb);
mappingDb.reset(new Db(dbenv.get(), 0));
dbs.push_back(mappingDb);
bindingDb.reset(new Db(dbenv.get(), 0));
dbs.push_back(bindingDb);
generalDb.reset(new Db(dbenv.get(), 0));
dbs.push_back(generalDb);
TxnCtxt txn;
txn.begin(dbenv.get(), false);
try {
open(queueDb, txn.get(), "queues.db", false);
open(configDb, txn.get(), "config.db", false);
open(exchangeDb, txn.get(), "exchanges.db", false);
open(mappingDb, txn.get(), "mappings.db", true);
open(bindingDb, txn.get(), "bindings.db", true);
open(generalDb, txn.get(), "general.db", false);
txn.commit();
} catch (...) { txn.abort(); throw; }
// NOTE: during normal initialization, agent == 0 because the store is initialized before the management infrastructure.
// However during a truncated initialization in a cluster, agent != 0. We always pass 0 as the agent for the
// TplStore to keep things consistent in a cluster. See https://bugzilla.redhat.com/show_bug.cgi?id=681026
tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, 0));
isInit = true;
} catch (const DbException& e) {
if (e.get_errno() == DB_VERSION_MISMATCH)
{
QPID_LOG(error, "Database environment mismatch: This version of db4 does not match that which created the store database.: " << e.what());
THROW_STORE_EXCEPTION_2("Database environment mismatch: This version of db4 does not match that which created the store database. "
"(If recovery is not important, delete the contents of the store directory. Otherwise, try upgrading the database using "
"db_upgrade or using db_recover - but the db4-utils package must also be installed to use these utilities.)", e);
}
QPID_LOG(error, "BDB exception occurred while initializing store: " << e.what());
if (bdbRetryCnt >= retryMax)
THROW_STORE_EXCEPTION_2("BDB exception occurred while initializing store", e);
} catch (const StoreException&) {
throw;
} catch (const journal::jexception& e) {
QPID_LOG(error, "Journal Exception occurred while initializing store: " << e);
THROW_STORE_EXCEPTION_2("Journal Exception occurred while initializing store", e.what());
} catch (...) {
QPID_LOG(error, "Unknown exception occurred while initializing store.");
throw;
}
} while (!isInit);
}
void MessageStoreImpl::finalize()
{
if (tplStorePtr.get() && tplStorePtr->is_ready()) tplStorePtr->stop(true);
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++)
{
JournalImpl* jQueue = i->second;
jQueue->resetDeleteCallback();
if (jQueue->is_ready()) jQueue->stop(true);
}
}
if (mgmtObject.get() != 0) {
mgmtObject->resourceDestroy();
mgmtObject.reset();
}
}
void MessageStoreImpl::truncateInit(const bool saveStoreContent)
{
if (isInit) {
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
if (journalList.size()) { // check no queues exist
std::ostringstream oss;
oss << "truncateInit() called with " << journalList.size() << " queues still in existence";
THROW_STORE_EXCEPTION(oss.str());
}
}
closeDbs();
dbs.clear();
if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
dbenv->close(0);
isInit = false;
}
std::ostringstream oss;
oss << storeDir << "/" << storeTopLevelDir;
if (saveStoreContent) {
std::string dir = mrg::journal::jdir::push_down(storeDir, storeTopLevelDir, "cluster");
QPID_LOG(notice, "Store directory " << oss.str() << " was pushed down (saved) into directory " << dir << ".");
} else {
mrg::journal::jdir::delete_dir(oss.str().c_str());
QPID_LOG(notice, "Store directory " << oss.str() << " was truncated.");
}
init();
}
void MessageStoreImpl::chkTplStoreInit()
{
// Prevent multiple threads from late-initializing the TPL
qpid::sys::Mutex::ScopedLock sl(tplInitLock);
if (!tplStorePtr->is_ready()) {
journal::jdir::create_dir(getTplBaseDir());
tplStorePtr->initialize(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true);
}
}
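// Open (creating if necessary) a BTREE database within the given transaction;
// DB_DUPSORT is set when duplicate keys are required (mappings, bindings).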
void MessageStoreImpl::open(db_ptr db,
DbTxn* txn,
const char* file,
bool dupKey)
{
if(dupKey) db->set_flags(DB_DUPSORT);
db->open(txn, file, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0);
}
void MessageStoreImpl::closeDbs()
{
for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->close(0);
}
dbs.clear();
}
MessageStoreImpl::~MessageStoreImpl()
{
finalize();
try {
closeDbs();
} catch (const DbException& e) {
QPID_LOG(error, "Error closing BDB databases: " << e.what());
} catch (const journal::jexception& e) {
QPID_LOG(error, "Error: " << e.what());
} catch (const std::exception& e) {
QPID_LOG(error, "Error: " << e.what());
} catch (...) {
QPID_LOG(error, "Unknown error in MessageStoreImpl::~MessageStoreImpl()");
}
if (mgmtObject.get() != 0) {
mgmtObject->resourceDestroy();
mgmtObject.reset();
}
}
void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue,
const qpid::framing::FieldTable& args)
{
checkInit();
if (queue.getPersistenceId()) {
THROW_STORE_EXCEPTION("Queue already created: " + queue.getName());
}
JournalImpl* jQueue = 0;
qpid::framing::FieldTable::ValuePtr value;
u_int16_t localFileCount = numJrnlFiles;
bool localAutoExpandFlag = autoJrnlExpand;
u_int16_t localAutoExpandMaxFileCount = autoJrnlExpandMaxFiles;
u_int32_t localFileSizeSblks = jrnlFsizeSblks;
value = args.get("qpid.file_count");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
localFileCount = chkJrnlNumFilesParam((u_int16_t) value->get<int>(), "qpid.file_count");
value = args.get("qpid.file_size");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
localFileSizeSblks = chkJrnlFileSizeParam((u_int32_t) value->get<int>(), "qpid.file_size", wCachePgSizeSblks) * JRNL_RMGR_PAGE_SIZE;
if (queue.getName().size() == 0)
{
QPID_LOG(error, "Cannot create store for empty (null) queue name - ignoring and attempting to continue.");
return;
}
jQueue = new JournalImpl(broker->getTimer(), queue.getName(), getJrnlDir(queue), std::string("JournalData"),
defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
journalList[queue.getName()]=jQueue;
}
value = args.get("qpid.auto_expand");
if (value.get() != 0 && !value->empty() && value->convertsTo<bool>())
localAutoExpandFlag = (bool) value->get<bool>();
value = args.get("qpid.auto_expand_max_jfiles");
if (value.get() != 0 && !value->empty() && value->convertsTo<int>())
localAutoExpandMaxFileCount = (u_int16_t) value->get<int>();
queue.setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue));
try {
// initialize() will create the deques needed for journal initialization
jQueue->initialize(localFileCount, localAutoExpandFlag, localAutoExpandMaxFileCount, localFileSizeSblks, wCacheNumPages, wCachePgSizeSblks);
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": create() failed: " + e.what());
}
try {
if (!create(queueDb, queueIdSequence, queue)) {
THROW_STORE_EXCEPTION("Queue already exists: " + queue.getName());
}
} catch (const DbException& e) {
THROW_STORE_EXCEPTION_2("Error creating queue named " + queue.getName(), e);
}
}
void MessageStoreImpl::destroy(qpid::broker::PersistableQueue& queue)
{
checkInit();
destroy(queueDb, queue);
deleteBindingsForQueue(queue);
qpid::broker::ExternalQueueStore* eqs = queue.getExternalQueueStore();
if (eqs) {
JournalImpl* jQueue = static_cast<JournalImpl*>(eqs);
jQueue->delete_jrnl_files();
queue.setExternalQueueStore(0); // will delete the journal if exists
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
journalList.erase(queue.getName());
}
}
}
void MessageStoreImpl::create(const qpid::broker::PersistableExchange& exchange,
const qpid::framing::FieldTable& /*args*/)
{
checkInit();
if (exchange.getPersistenceId()) {
THROW_STORE_EXCEPTION("Exchange already created: " + exchange.getName());
}
try {
if (!create(exchangeDb, exchangeIdSequence, exchange)) {
THROW_STORE_EXCEPTION("Exchange already exists: " + exchange.getName());
}
} catch (const DbException& e) {
THROW_STORE_EXCEPTION_2("Error creating exchange named " + exchange.getName(), e);
}
}
void MessageStoreImpl::destroy(const qpid::broker::PersistableExchange& exchange)
{
checkInit();
destroy(exchangeDb, exchange);
//need to also delete bindings
IdDbt key(exchange.getPersistenceId());
bindingDb->del(0, &key, DB_AUTO_COMMIT);
}
void MessageStoreImpl::create(const qpid::broker::PersistableConfig& general)
{
checkInit();
if (general.getPersistenceId()) {
THROW_STORE_EXCEPTION("General configuration item already created");
}
try {
if (!create(generalDb, generalIdSequence, general)) {
THROW_STORE_EXCEPTION("General configuration already exists");
}
} catch (const DbException& e) {
THROW_STORE_EXCEPTION_2("Error creating general configuration", e);
}
}
void MessageStoreImpl::destroy(const qpid::broker::PersistableConfig& general)
{
checkInit();
destroy(generalDb, general);
}
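// Write a new persistable record keyed by the next id from seq, using DB_NOOVERWRITE.
// Returns false if the key already existed; otherwise sets the persistence id and returns true.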
bool MessageStoreImpl::create(db_ptr db,
IdSequence& seq,
const qpid::broker::Persistable& p)
{
u_int64_t id (seq.next());
Dbt key(&id, sizeof(id));
BufferValue value (p);
int status;
TxnCtxt txn;
txn.begin(dbenv.get(), true);
try {
status = db->put(txn.get(), &key, &value, DB_NOOVERWRITE);
txn.commit();
} catch (...) {
txn.abort();
throw;
}
if (status == DB_KEYEXIST) {
return false;
} else {
p.setPersistenceId(id);
return true;
}
}
void MessageStoreImpl::destroy(db_ptr db, const qpid::broker::Persistable& p)
{
qpid::sys::Mutex::ScopedLock sl(bdbLock);
IdDbt key(p.getPersistenceId());
db->del(0, &key, DB_AUTO_COMMIT);
}
void MessageStoreImpl::bind(const qpid::broker::PersistableExchange& e,
const qpid::broker::PersistableQueue& q,
const std::string& k,
const qpid::framing::FieldTable& a)
{
checkInit();
IdDbt key(e.getPersistenceId());
BindingDbt value(e, q, k, a);
TxnCtxt txn;
txn.begin(dbenv.get(), true);
try {
put(bindingDb, txn.get(), key, value);
txn.commit();
} catch (...) {
txn.abort();
throw;
}
}
void MessageStoreImpl::unbind(const qpid::broker::PersistableExchange& e,
const qpid::broker::PersistableQueue& q,
const std::string& k,
const qpid::framing::FieldTable&)
{
checkInit();
deleteBinding(e, q, k);
}
void MessageStoreImpl::recover(qpid::broker::RecoveryManager& registry)
{
checkInit();
txn_list prepared;
recoverLockedMappings(prepared);
queue_index queues;//id->queue
exchange_index exchanges;//id->exchange
message_index messages;//id->message
TxnCtxt txn;
txn.begin(dbenv.get(), false);
try {
//read all queues, calls recoversMessages
recoverQueues(txn, registry, queues, prepared, messages);
//recover exchange & bindings:
recoverExchanges(txn, registry, exchanges);
recoverBindings(txn, exchanges, queues);
//recover general-purpose configuration
recoverGeneral(txn, registry);
txn.commit();
} catch (const DbException& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error on recovery", e);
} catch (...) {
txn.abort();
throw;
}
//recover transactions:
for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
const PreparedTransaction pt = *i;
if (mgmtObject.get() != 0) {
mgmtObject->inc_tplTransactionDepth();
mgmtObject->inc_tplTxnPrepares();
}
std::string xid = pt.xid;
// Restore data token state in TxnCtxt
TplRecoverMapCitr citr = tplRecoverMap.find(xid);
if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
// If a record is found that is dequeued but not committed/aborted from tplStore, then a complete() call
// was interrupted part way through committing/aborting the impacted queues. Complete this process.
bool incomplTplTxnFlag = citr->second.deq_flag;
if (citr->second.tpc_flag) {
// Dtx (2PC) transaction
TPCTxnCtxt* tpcc = new TPCTxnCtxt(xid, &messageIdSequence);
std::auto_ptr<qpid::broker::TPCTransactionContext> txn(tpcc);
tpcc->recoverDtok(citr->second.rid, xid);
tpcc->prepare(tplStorePtr.get());
qpid::broker::RecoverableTransaction::shared_ptr dtx;
if (!incomplTplTxnFlag) dtx = registry.recoverTransaction(xid, txn);
if (pt.enqueues.get()) {
for (LockedMappings::iterator j = pt.enqueues->begin(); j != pt.enqueues->end(); j++) {
tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
if (!incomplTplTxnFlag) dtx->enqueue(queues[j->first], messages[j->second]);
}
}
if (pt.dequeues.get()) {
for (LockedMappings::iterator j = pt.dequeues->begin(); j != pt.dequeues->end(); j++) {
tpcc->addXidRecord(queues[j->first]->getExternalQueueStore());
if (!incomplTplTxnFlag) dtx->dequeue(queues[j->first], messages[j->second]);
}
}
if (incomplTplTxnFlag) {
tpcc->complete(citr->second.commit_flag);
}
} else {
// Local (1PC) transaction
boost::shared_ptr<TxnCtxt> opcc(new TxnCtxt(xid, &messageIdSequence));
opcc->recoverDtok(citr->second.rid, xid);
opcc->prepare(tplStorePtr.get());
if (pt.enqueues.get()) {
for (LockedMappings::iterator j = pt.enqueues->begin(); j != pt.enqueues->end(); j++) {
opcc->addXidRecord(queues[j->first]->getExternalQueueStore());
}
}
if (pt.dequeues.get()) {
for (LockedMappings::iterator j = pt.dequeues->begin(); j != pt.dequeues->end(); j++) {
opcc->addXidRecord(queues[j->first]->getExternalQueueStore());
}
}
if (incomplTplTxnFlag) {
opcc->complete(citr->second.commit_flag);
} else {
completed(*opcc.get(), citr->second.commit_flag);
}
}
}
registry.recoveryComplete();
}
void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
qpid::broker::RecoveryManager& registry,
queue_index& queue_index,
txn_list& prepared,
message_index& messages)
{
Cursor queues;
queues.open(queueDb, txn.get());
u_int64_t maxQueueId(1);
IdDbt key;
Dbt value;
//read all queues
while (queues.next(key, value)) {
qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
//create a Queue instance
qpid::broker::RecoverableQueue::shared_ptr queue = registry.recoverQueue(buffer);
//set the persistenceId and update max as required
queue->setPersistenceId(key.id);
const std::string queueName = queue->getName().c_str();
JournalImpl* jQueue = 0;
if (queueName.size() == 0)
{
QPID_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue.");
break;
}
jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlHashDir(queueName), std::string("JournalData"),
defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
journalList[queueName] = jQueue;
}
queue->setExternalQueueStore(dynamic_cast<qpid::broker::ExternalQueueStore*>(jQueue));
try
{
long rcnt = 0L; // recovered msg count
long idcnt = 0L; // in-doubt msg count
u_int64_t thisHighestRid = 0ULL;
jQueue->recover(numJrnlFiles, autoJrnlExpand, autoJrnlExpandMaxFiles, jrnlFsizeSblks, wCacheNumPages, wCachePgSizeSblks, &prepared, thisHighestRid, key.id); // start recovery
if (highestRid == 0ULL)
highestRid = thisHighestRid;
else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
highestRid = thisHighestRid;
recoverMessages(txn, registry, queue, prepared, messages, rcnt, idcnt);
QPID_LOG(info, "Recovered queue \"" << queueName << "\": " << rcnt << " messages recovered; " << idcnt << " messages in-doubt.");
jQueue->recover_complete(); // start journal.
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queueName + ": recoverQueues() failed: " + e.what());
}
//read all messages: done on a per queue basis if using Journal
queue_index[key.id] = queue;
maxQueueId = std::max(key.id, maxQueueId);
}
// NOTE: highestRid is set by both recoverQueues() and recoverTplStore() as
// the messageIdSequence is used for both queue journals and the tpl journal.
messageIdSequence.reset(highestRid + 1);
QPID_LOG(info, "Most recent persistence id found: 0x" << std::hex << highestRid << std::dec);
queueIdSequence.reset(maxQueueId + 1);
}
void MessageStoreImpl::recoverExchanges(TxnCtxt& txn,
qpid::broker::RecoveryManager& registry,
exchange_index& index)
{
//TODO: this is a copy&paste from recoverQueues - refactor!
Cursor exchanges;
exchanges.open(exchangeDb, txn.get());
u_int64_t maxExchangeId(1);
IdDbt key;
Dbt value;
//read all exchanges
while (exchanges.next(key, value)) {
qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
//create a Exchange instance
qpid::broker::RecoverableExchange::shared_ptr exchange = registry.recoverExchange(buffer);
if (exchange) {
//set the persistenceId and update max as required
exchange->setPersistenceId(key.id);
index[key.id] = exchange;
QPID_LOG(info, "Recovered exchange \"" << exchange->getName() << '"');
}
maxExchangeId = std::max(key.id, maxExchangeId);
}
exchangeIdSequence.reset(maxExchangeId + 1);
}
void MessageStoreImpl::recoverBindings(TxnCtxt& txn,
exchange_index& exchanges,
queue_index& queues)
{
Cursor bindings;
bindings.open(bindingDb, txn.get());
IdDbt key;
Dbt value;
while (bindings.next(key, value)) {
qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
if (buffer.available() < 8) {
QPID_LOG(error, "Not enough data for binding: " << buffer.available());
THROW_STORE_EXCEPTION("Not enough data for binding");
}
uint64_t queueId = buffer.getLongLong();
std::string queueName;
std::string routingkey;
qpid::framing::FieldTable args;
buffer.getShortString(queueName);
buffer.getShortString(routingkey);
buffer.get(args);
exchange_index::iterator exchange = exchanges.find(key.id);
queue_index::iterator queue = queues.find(queueId);
if (exchange != exchanges.end() && queue != queues.end()) {
//could use the recoverable queue here rather than the name...
exchange->second->bind(queueName, routingkey, args);
QPID_LOG(info, "Recovered binding exchange=" << exchange->second->getName()
<< " key=" << routingkey
<< " queue=" << queueName);
} else {
//stale binding, delete it
QPID_LOG(warning, "Deleting stale binding");
bindings->del(0);
}
}
}
void MessageStoreImpl::recoverGeneral(TxnCtxt& txn,
qpid::broker::RecoveryManager& registry)
{
Cursor items;
items.open(generalDb, txn.get());
u_int64_t maxGeneralId(1);
IdDbt key;
Dbt value;
//read all items
while (items.next(key, value)) {
qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
//create instance
qpid::broker::RecoverableConfig::shared_ptr config = registry.recoverConfig(buffer);
//set the persistenceId and update max as required
config->setPersistenceId(key.id);
maxGeneralId = std::max(key.id, maxGeneralId);
}
generalIdSequence.reset(maxGeneralId + 1);
}
void MessageStoreImpl::recoverMessages(TxnCtxt& /*txn*/,
qpid::broker::RecoveryManager& recovery,
qpid::broker::RecoverableQueue::shared_ptr& queue,
txn_list& prepared,
message_index& messages,
long& rcnt,
long& idcnt)
{
size_t preambleLength = sizeof(u_int32_t)/*header size*/;
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
DataTokenImpl dtok;
size_t readSize = 0;
unsigned msg_count = 0;
// TODO: This optimization to skip reading if there are no enqueued messages to read
// breaks the python system test in phase 6 with "Exception: Cannot write lock file"
// Figure out what is breaking.
//bool read = jc->get_enq_cnt() > 0;
bool read = true;
void* dbuff = NULL; size_t dbuffSize = 0;
void* xidbuff = NULL; size_t xidbuffSize = 0;
bool transientFlag = false;
bool externalFlag = false;
dtok.set_wstate(DataTokenImpl::ENQ);
// Read the message from the Journal.
try {
unsigned aio_sleep_cnt = 0;
while (read) {
mrg::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
readSize = dtok.dsize();
switch (res)
{
case mrg::journal::RHM_IORES_SUCCESS: {
msg_count++;
qpid::broker::RecoverableMessage::shared_ptr msg;
char* data = (char*)dbuff;
unsigned headerSize;
if (externalFlag) {
msg = getExternMessage(recovery, dtok.rid(), headerSize); // large message external to jrnl
} else {
headerSize = qpid::framing::Buffer(data, preambleLength).getLong();
qpid::framing::Buffer headerBuff(data+ preambleLength, headerSize); /// do we want read size or header size ????
msg = recovery.recoverMessage(headerBuff);
}
msg->setPersistenceId(dtok.rid());
// At some future point if delivery attempts are stored, then this call would
// become optional depending on that information.
msg->setRedelivered();
// Reset the TTL for the recovered message
msg->computeExpiration(broker->getExpiryPolicy());
u_int32_t contentOffset = headerSize + preambleLength;
u_int64_t contentSize = readSize - contentOffset;
if (msg->loadContent(contentSize) && !externalFlag) {
//now read the content
qpid::framing::Buffer contentBuff(data + contentOffset, contentSize);
msg->decodeContent(contentBuff);
}
PreparedTransaction::list::iterator i = PreparedTransaction::getLockedPreparedTransaction(prepared, queue->getPersistenceId(), dtok.rid());
if (i == prepared.end()) { // not in prepared list
rcnt++;
queue->recover(msg);
} else {
u_int64_t rid = dtok.rid();
std::string xid(i->xid);
TplRecoverMapCitr citr = tplRecoverMap.find(xid);
if (citr == tplRecoverMap.end()) THROW_STORE_EXCEPTION("XID not found in tplRecoverMap");
// deq present in prepared list: this xid is part of incomplete txn commit/abort
// or this is a 1PC txn that must be rolled forward
if (citr->second.deq_flag || !citr->second.tpc_flag) {
if (jc->is_enqueued(rid, true)) {
// Enqueue is non-tx, dequeue tx
assert(jc->is_locked(rid)); // This record MUST be locked by a txn dequeue
if (!citr->second.commit_flag) {
rcnt++;
queue->recover(msg); // recover message in abort case only
}
} else {
// Enqueue and/or dequeue tx
journal::txn_map& tmap = jc->get_txn_map();
journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
bool enq = false;
bool deq = false;
for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
if (j->_enq_flag && j->_rid == rid) enq = true;
else if (!j->_enq_flag && j->_drid == rid) deq = true;
}
if (enq && !deq && citr->second.commit_flag) {
rcnt++;
queue->recover(msg); // recover txn message in commit case only
}
}
} else {
idcnt++;
messages[rid] = msg;
}
}
dtok.reset();
dtok.set_wstate(DataTokenImpl::ENQ);
if (xidbuff)
::free(xidbuff);
else if (dbuff)
::free(dbuff);
aio_sleep_cnt = 0;
break;
}
case mrg::journal::RHM_IORES_PAGE_AIOWAIT:
if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverMessages()");
::usleep(AIO_SLEEP_TIME_US);
break;
case mrg::journal::RHM_IORES_EMPTY:
read = false;
break; // done with all messages. (add call in jrnl to test that _emap is empty.)
default:
std::ostringstream oss;
oss << "recoverMessages(): Queue: " << queue->getName() << ": Unexpected return from journal read: " << mrg::journal::iores_str(res);
THROW_STORE_EXCEPTION(oss.str());
} // switch
} // while
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": recoverMessages() failed: " + e.what());
}
}
qpid::broker::RecoverableMessage::shared_ptr MessageStoreImpl::getExternMessage(qpid::broker::RecoveryManager& /*recovery*/,
uint64_t /*messageId*/,
unsigned& /*headerSize*/)
{
throw mrg::journal::jexception(mrg::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "getExternMessage");
}
int MessageStoreImpl::enqueueMessage(TxnCtxt& txn,
IdDbt& msgId,
qpid::broker::RecoverableMessage::shared_ptr& msg,
queue_index& index,
txn_list& prepared,
message_index& messages)
{
Cursor mappings;
mappings.open(mappingDb, txn.get());
IdDbt value;
int count(0);
for (int status = mappings->get(&msgId, &value, DB_SET); status == 0; status = mappings->get(&msgId, &value, DB_NEXT_DUP)) {
if (index.find(value.id) == index.end()) {
QPID_LOG(warning, "Recovered message for queue that no longer exists");
mappings->del(0);
} else {
qpid::broker::RecoverableQueue::shared_ptr queue = index[value.id];
if (PreparedTransaction::isLocked(prepared, value.id, msgId.id)) {
messages[msgId.id] = msg;
} else {
queue->recover(msg);
}
count++;
}
}
mappings.close();
return count;
}
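// Read all records from the transaction prepared list (TPL) journal and populate
// tplRecoverMap with the rid, dequeue flag, commit flag and 2PC flag for each xid
// found in the journal's transaction map.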
void MessageStoreImpl::readTplStore()
{
tplRecoverMap.clear();
journal::txn_map& tmap = tplStorePtr->get_txn_map();
DataTokenImpl dtok;
void* dbuff = NULL; size_t dbuffSize = 0;
void* xidbuff = NULL; size_t xidbuffSize = 0;
bool transientFlag = false;
bool externalFlag = false;
bool done = false;
try {
unsigned aio_sleep_cnt = 0;
while (!done) {
dtok.reset();
dtok.set_wstate(DataTokenImpl::ENQ);
mrg::journal::iores res = tplStorePtr->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, externalFlag, &dtok);
switch (res) {
case mrg::journal::RHM_IORES_SUCCESS: {
// Every TPL record contains both data and an XID
assert(dbuffSize>0);
assert(xidbuffSize>0);
std::string xid(static_cast<const char*>(xidbuff), xidbuffSize);
bool is2PC = *(static_cast<char*>(dbuff)) != 0;
// Check transaction details; add to recover map
journal::txn_data_list txnList = tmap.get_tdata_list(xid); // txnList will be empty if xid not found
if (!txnList.empty()) { // xid found in tmap
unsigned enqCnt = 0;
unsigned deqCnt = 0;
u_int64_t rid = 0;
// Assume commit (roll forward) in cases where only prepare has been called - ie only enqueue record exists.
// Note: will apply to both 1PC and 2PC transactions.
bool commitFlag = true;
for (journal::tdl_itr j = txnList.begin(); j<txnList.end(); j++) {
if (j->_enq_flag) {
rid = j->_rid;
enqCnt++;
} else {
commitFlag = j->_commit_flag;
deqCnt++;
}
}
assert(enqCnt == 1);
assert(deqCnt <= 1);
tplRecoverMap.insert(TplRecoverMapPair(xid, TplRecoverStruct(rid, deqCnt == 1, commitFlag, is2PC)));
}
::free(xidbuff);
aio_sleep_cnt = 0;
break;
}
case mrg::journal::RHM_IORES_PAGE_AIOWAIT:
if (++aio_sleep_cnt > MAX_AIO_SLEEPS)
THROW_STORE_EXCEPTION("Timeout waiting for AIO in MessageStoreImpl::recoverTplStore()");
::usleep(AIO_SLEEP_TIME_US);
break;
case mrg::journal::RHM_IORES_EMPTY:
done = true;
break; // done with all messages. (add call in jrnl to test that _emap is empty.)
default:
std::ostringstream oss;
oss << "readTplStore(): Unexpected result from journal read: " << mrg::journal::iores_str(res);
THROW_STORE_EXCEPTION(oss.str());
} // switch
}
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("TPL recoverTplStore() failed: ") + e.what());
}
}
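// Recover the transaction prepared list (TPL) journal if its .jinf file exists, update
// highestRid, then load tplRecoverMap by reading the recovered TPL.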
void MessageStoreImpl::recoverTplStore()
{
if (journal::jdir::exists(tplStorePtr->jrnl_dir() + tplStorePtr->base_filename() + ".jinf")) {
u_int64_t thisHighestRid = 0ULL;
tplStorePtr->recover(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCachePgSizeSblks, tplWCacheNumPages, 0, thisHighestRid, 0);
if (highestRid == 0ULL)
highestRid = thisHighestRid;
else if (thisHighestRid - highestRid < 0x8000000000000000ULL) // RFC 1982 comparison for unsigned 64-bit
highestRid = thisHighestRid;
// Load tplRecoverMap by reading the TPL store
readTplStore();
tplStorePtr->recover_complete(); // start journal.
}
}
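// Ensure the TPL store has been recovered, then build a PreparedTransaction entry
// (with initially empty enqueue/dequeue mappings) for each recovered xid.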
void MessageStoreImpl::recoverLockedMappings(txn_list& txns)
{
if (!tplStorePtr->is_ready())
recoverTplStore();
// Abort unprepared xids and populate the locked maps
for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
LockedMappings::shared_ptr enq_ptr;
enq_ptr.reset(new LockedMappings);
LockedMappings::shared_ptr deq_ptr;
deq_ptr.reset(new LockedMappings);
txns.push_back(new PreparedTransaction(i->first, enq_ptr, deq_ptr));
}
}
void MessageStoreImpl::collectPreparedXids(std::set<std::string>& xids)
{
if (tplStorePtr->is_ready()) {
tplStorePtr->read_reset();
readTplStore();
} else {
recoverTplStore();
}
for (TplRecoverMapCitr i = tplRecoverMap.begin(); i != tplRecoverMap.end(); i++) {
// Skip all txns that are to be rolled forward/back, as well as all 1PC transactions; collect only in-doubt 2PC xids
if (!i->second.deq_flag && i->second.tpc_flag)
xids.insert(i->first);
}
}
void MessageStoreImpl::stage(const boost::intrusive_ptr<qpid::broker::PersistableMessage>& /*msg*/)
{
throw mrg::journal::jexception(mrg::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "stage");
}
void MessageStoreImpl::destroy(qpid::broker::PersistableMessage& /*msg*/)
{
throw mrg::journal::jexception(mrg::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "destroy");
}
void MessageStoreImpl::appendContent(const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& /*msg*/,
const std::string& /*data*/)
{
throw mrg::journal::jexception(mrg::journal::jerrno::JERR__NOTIMPL, "MessageStoreImpl", "appendContent");
}
void MessageStoreImpl::loadContent(const qpid::broker::PersistableQueue& queue,
const boost::intrusive_ptr<const qpid::broker::PersistableMessage>& msg,
std::string& data,
u_int64_t offset,
u_int32_t length)
{
checkInit();
u_int64_t messageId (msg->getPersistenceId());
if (messageId != 0) {
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
if (jc && jc->is_enqueued(messageId) ) {
if (!jc->loadMsgContent(messageId, data, length, offset)) {
std::ostringstream oss;
oss << "Queue " << queue.getName() << ": loadContent() failed: Message " << messageId << " is extern";
THROW_STORE_EXCEPTION(oss.str());
}
} else {
std::ostringstream oss;
oss << "Queue " << queue.getName() << ": loadContent() failed: Message " << messageId << " not enqueued";
THROW_STORE_EXCEPTION(oss.str());
}
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": loadContent() failed: " + e.what());
}
} else {
THROW_STORE_EXCEPTION("Cannot load content. Message not known to store!");
}
}
void MessageStoreImpl::flush(const qpid::broker::PersistableQueue& queue)
{
if (queue.getExternalQueueStore() == 0) return;
checkInit();
std::string qn = queue.getName();
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
if (jc) {
// TODO: check if this result should be used...
/*mrg::journal::iores res =*/ jc->flush();
}
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + qn + ": flush() failed: " + e.what() );
}
}
void MessageStoreImpl::enqueue(qpid::broker::TransactionContext* ctxt,
const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
const qpid::broker::PersistableQueue& queue)
{
checkInit();
u_int64_t queueId (queue.getPersistenceId());
u_int64_t messageId (msg->getPersistenceId());
if (queueId == 0) {
THROW_STORE_EXCEPTION("Queue not created: " + queue.getName());
}
TxnCtxt implicit;
TxnCtxt* txn = 0;
if (ctxt) {
txn = check(ctxt);
} else {
txn = &implicit;
}
bool newId = false;
if (messageId == 0) {
messageId = messageIdSequence.next();
msg->setPersistenceId(messageId);
newId = true;
}
store(&queue, txn, msg, newId);
// add queue* to the txn map..
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
}
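// Encode a message into buff as a u_int32_t header size followed by the encoded headers
// and content; returns the total encoded size in bytes.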
u_int64_t MessageStoreImpl::msgEncode(std::vector<char>& buff, const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message)
{
u_int32_t headerSize = message->encodedHeaderSize();
u_int64_t size = message->encodedSize() + sizeof(u_int32_t);
try { buff = std::vector<char>(size); } // long + headers + content
catch (const std::exception& e) {
std::ostringstream oss;
oss << "Unable to allocate memory for encoding message; requested size: " << size << "; error: " << e.what();
THROW_STORE_EXCEPTION(oss.str());
}
qpid::framing::Buffer buffer(&buff[0],size);
buffer.putLong(headerSize);
message->encode(buffer);
return size;
}
void MessageStoreImpl::store(const qpid::broker::PersistableQueue* queue,
TxnCtxt* txn,
const boost::intrusive_ptr<qpid::broker::PersistableMessage>& message,
bool /*newId*/)
{
std::vector<char> buff;
u_int64_t size = msgEncode(buff, message);
try {
if (queue) {
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
dtokp->setSourceMessage(message);
dtokp->set_external_rid(true);
dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
if (txn->getXid().empty()) {
jc->enqueue_data_record(&buff[0], size, size, dtokp.get(), !message->isPersistent());
} else {
jc->enqueue_txn_data_record(&buff[0], size, size, dtokp.get(), txn->getXid(), !message->isPersistent());
}
} else {
THROW_STORE_EXCEPTION(std::string("MessageStoreImpl::store() failed: queue NULL."));
}
} catch (const journal::jexception& e) {
THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": MessageStoreImpl::store() failed: " +
e.what());
}
}
void MessageStoreImpl::dequeue(qpid::broker::TransactionContext* ctxt,
const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
const qpid::broker::PersistableQueue& queue)
{
checkInit();
u_int64_t queueId (queue.getPersistenceId());
u_int64_t messageId (msg->getPersistenceId());
if (queueId == 0) {
THROW_STORE_EXCEPTION("Queue \"" + queue.getName() + "\" has null queue Id (has not been created)");
}
if (messageId == 0) {
THROW_STORE_EXCEPTION("Queue \"" + queue.getName() + "\": Dequeuing message with null persistence Id.");
}
TxnCtxt implicit;
TxnCtxt* txn = 0;
if (ctxt) {
txn = check(ctxt);
} else {
txn = &implicit;
}
// add queue* to the txn map..
if (ctxt) txn->addXidRecord(queue.getExternalQueueStore());
async_dequeue(ctxt, msg, queue);
msg->dequeueComplete();
}
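// Issue an asynchronous dequeue against the queue's journal, using a new record id and
// the message's persistence id as the dequeue rid; transactional if ctxt is supplied.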
void MessageStoreImpl::async_dequeue(qpid::broker::TransactionContext* ctxt,
const boost::intrusive_ptr<qpid::broker::PersistableMessage>& msg,
const qpid::broker::PersistableQueue& queue)
{
boost::intrusive_ptr<DataTokenImpl> ddtokp(new DataTokenImpl);
ddtokp->setSourceMessage(msg);
ddtokp->set_external_rid(true);
ddtokp->set_rid(messageIdSequence.next());
ddtokp->set_dequeue_rid(msg->getPersistenceId());
ddtokp->set_wstate(DataTokenImpl::ENQ);
std::string tid;
if (ctxt) {
TxnCtxt* txn = check(ctxt);
tid = txn->getXid();
}
// Manually increase the ref count, as raw pointers are used beyond this point
ddtokp->addRef();
try {
JournalImpl* jc = static_cast<JournalImpl*>(queue.getExternalQueueStore());
if (tid.empty()) {
jc->dequeue_data_record(ddtokp.get());
} else {
jc->dequeue_txn_data_record(ddtokp.get(), tid);
}
} catch (const journal::jexception& e) {
ddtokp->release();
THROW_STORE_EXCEPTION(std::string("Queue ") + queue.getName() + ": async_dequeue() failed: " + e.what());
}
}
u_int32_t MessageStoreImpl::outstandingQueueAIO(const qpid::broker::PersistableQueue& /*queue*/)
{
checkInit();
return 0;
}
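// Complete (commit or abort) a prepared transaction: dequeue its record from the TPL
// (if one was enqueued), complete the transaction on the impacted queues, and update
// the management counters.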
void MessageStoreImpl::completed(TxnCtxt& txn,
bool commit)
{
try {
chkTplStoreInit(); // Late initialize (if needed)
// Nothing to do if not prepared
if (txn.getDtok()->is_enqueued()) {
txn.incrDtokRef();
DataTokenImpl* dtokp = txn.getDtok();
dtokp->set_dequeue_rid(dtokp->rid());
dtokp->set_rid(messageIdSequence.next());
tplStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
}
txn.complete(commit);
if (mgmtObject.get() != 0) {
mgmtObject->dec_tplTransactionDepth();
if (commit)
mgmtObject->inc_tplTxnCommits();
else
mgmtObject->inc_tplTxnAborts();
}
} catch (const std::exception& e) {
QPID_LOG(error, "Error completing xid " << txn.getXid() << ": " << e.what());
throw;
}
}
std::auto_ptr<qpid::broker::TransactionContext> MessageStoreImpl::begin()
{
checkInit();
// pass sequence number for commit/abort (c/a)
return std::auto_ptr<qpid::broker::TransactionContext>(new TxnCtxt(&messageIdSequence));
}
std::auto_ptr<qpid::broker::TPCTransactionContext> MessageStoreImpl::begin(const std::string& xid)
{
checkInit();
IdSequence* jtx = &messageIdSequence;
// pass sequence number for commit/abort (c/a)
return std::auto_ptr<qpid::broker::TPCTransactionContext>(new TPCTxnCtxt(xid, jtx));
}
void MessageStoreImpl::prepare(qpid::broker::TPCTransactionContext& ctxt)
{
checkInit();
TxnCtxt* txn = dynamic_cast<TxnCtxt*>(&ctxt);
if(!txn) throw qpid::broker::InvalidTransactionContextException();
localPrepare(txn);
}
void MessageStoreImpl::localPrepare(TxnCtxt* ctxt)
{
try {
chkTplStoreInit(); // Late initialize (if needed)
// This sync is required to ensure multi-queue atomicity - ie all txn data
// must hit the disk on *all* queues before the TPL prepare (enq) is written.
ctxt->sync();
ctxt->incrDtokRef();
DataTokenImpl* dtokp = ctxt->getDtok();
dtokp->set_external_rid(true);
dtokp->set_rid(messageIdSequence.next());
char tpcFlag = static_cast<char>(ctxt->isTPC());
tplStorePtr->enqueue_txn_data_record(&tpcFlag, sizeof(char), sizeof(char), dtokp, ctxt->getXid(), false);
ctxt->prepare(tplStorePtr.get());
// make sure all the data is written to disk before returning
ctxt->sync();
if (mgmtObject.get() != 0) {
mgmtObject->inc_tplTransactionDepth();
mgmtObject->inc_tplTxnPrepares();
}
} catch (const std::exception& e) {
QPID_LOG(error, "Error preparing xid " << ctxt->getXid() << ": " << e.what());
throw;
}
}
void MessageStoreImpl::commit(qpid::broker::TransactionContext& ctxt)
{
checkInit();
TxnCtxt* txn(check(&ctxt));
if (!txn->isTPC()) {
if (txn->impactedQueuesEmpty()) return;
localPrepare(dynamic_cast<TxnCtxt*>(txn));
}
completed(*dynamic_cast<TxnCtxt*>(txn), true);
}
void MessageStoreImpl::abort(qpid::broker::TransactionContext& ctxt)
{
checkInit();
TxnCtxt* txn(check(&ctxt));
if (!txn->isTPC()) {
if (txn->impactedQueuesEmpty()) return;
localPrepare(dynamic_cast<TxnCtxt*>(txn));
}
completed(*dynamic_cast<TxnCtxt*>(txn), false);
}
TxnCtxt* MessageStoreImpl::check(qpid::broker::TransactionContext* ctxt)
{
TxnCtxt* txn = dynamic_cast<TxnCtxt*>(ctxt);
if(!txn) throw qpid::broker::InvalidTransactionContextException();
return txn;
}
void MessageStoreImpl::put(db_ptr db,
DbTxn* txn,
Dbt& key,
Dbt& value)
{
try {
int status = db->put(txn, &key, &value, DB_NODUPDATA);
if (status == DB_KEYEXIST) {
THROW_STORE_EXCEPTION("duplicate data");
} else if (status) {
THROW_STORE_EXCEPTION(DbEnv::strerror(status));
}
} catch (const DbException& e) {
THROW_STORE_EXCEPTION(e.what());
}
}
void MessageStoreImpl::deleteBindingsForQueue(const qpid::broker::PersistableQueue& queue)
{
TxnCtxt txn;
txn.begin(dbenv.get(), true);
try {
{
Cursor bindings;
bindings.open(bindingDb, txn.get());
IdDbt key;
Dbt value;
while (bindings.next(key, value)) {
qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
if (buffer.available() < 8) {
THROW_STORE_EXCEPTION("Not enough data for binding");
}
uint64_t queueId = buffer.getLongLong();
if (queue.getPersistenceId() == queueId) {
bindings->del(0);
QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
}
}
}
txn.commit();
} catch (const std::exception& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error deleting bindings", e.what());
} catch (...) {
txn.abort();
throw;
}
QPID_LOG(debug, "Deleted all bindings for " << queue.getName() << ":" << queue.getPersistenceId());
}
void MessageStoreImpl::deleteBinding(const qpid::broker::PersistableExchange& exchange,
const qpid::broker::PersistableQueue& queue,
const std::string& bkey)
{
TxnCtxt txn;
txn.begin(dbenv.get(), true);
try {
{
Cursor bindings;
bindings.open(bindingDb, txn.get());
IdDbt key(exchange.getPersistenceId());
Dbt value;
for (int status = bindings->get(&key, &value, DB_SET); status == 0; status = bindings->get(&key, &value, DB_NEXT_DUP)) {
qpid::framing::Buffer buffer(reinterpret_cast<char*>(value.get_data()), value.get_size());
if (buffer.available() < 8) {
THROW_STORE_EXCEPTION("Not enough data for binding");
}
uint64_t queueId = buffer.getLongLong();
if (queue.getPersistenceId() == queueId) {
std::string q;
std::string k;
buffer.getShortString(q);
buffer.getShortString(k);
if (bkey == k) {
bindings->del(0);
QPID_LOG(debug, "Deleting binding for " << queue.getName() << " " << key.id << "->" << queueId);
}
}
}
}
txn.commit();
} catch (const std::exception& e) {
txn.abort();
THROW_STORE_EXCEPTION_2("Error deleting bindings", e.what());
} catch (...) {
txn.abort();
throw;
}
}
std::string MessageStoreImpl::getJrnlBaseDir()
{
std::ostringstream dir;
dir << storeDir << "/" << storeTopLevelDir << "/jrnl/" ;
return dir.str();
}
std::string MessageStoreImpl::getBdbBaseDir()
{
std::ostringstream dir;
dir << storeDir << "/" << storeTopLevelDir << "/dat/" ;
return dir.str();
}
std::string MessageStoreImpl::getTplBaseDir()
{
std::ostringstream dir;
dir << storeDir << "/" << storeTopLevelDir << "/tpl/" ;
return dir.str();
}
std::string MessageStoreImpl::getJrnlDir(const qpid::broker::PersistableQueue& queue) // for example: /var/rhm/ + queueDir/
{
return getJrnlHashDir(queue.getName().c_str());
}
u_int32_t MessageStoreImpl::bHash(const std::string str)
{
// Daniel Bernstein hash fn
u_int32_t h = 0;
for (std::string::const_iterator i = str.begin(); i < str.end(); i++)
h = 33*h + *i;
return h;
}
std::string MessageStoreImpl::getJrnlHashDir(const std::string& queueName) // for example: /var/rhm/ + queueDir/
{
std::stringstream dir;
dir << getJrnlBaseDir() << std::hex << std::setfill('0') << std::setw(4);
dir << (bHash(queueName.c_str()) % 29); // Use a prime number for better distribution across dirs
dir << "/" << queueName << "/";
return dir.str();
}
std::string MessageStoreImpl::getStoreDir() const { return storeDir; }
void MessageStoreImpl::journalDeleted(JournalImpl& j) {
qpid::sys::Mutex::ScopedLock sl(journalListLock);
journalList.erase(j.id());
}
MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) :
qpid::Options(name),
numJrnlFiles(defNumJrnlFiles),
autoJrnlExpand(defAutoJrnlExpand),
autoJrnlExpandMaxFiles(defAutoJrnlExpandMaxFiles),
jrnlFsizePgs(defJrnlFileSizePgs),
truncateFlag(defTruncateFlag),
wCachePageSizeKib(defWCachePageSize),
tplNumJrnlFiles(defTplNumJrnlFiles),
tplJrnlFsizePgs(defTplJrnlFileSizePgs),
tplWCachePageSizeKib(defTplWCachePageSize)
{
std::ostringstream oss1;
oss1 << "Default number of files for each journal instance (queue). [Allowable values: " <<
JRNL_MIN_NUM_FILES << " - " << JRNL_MAX_NUM_FILES << "]";
std::ostringstream oss2;
oss2 << "Default size for each journal file in multiples of read pages (1 read page = 64KiB). [Allowable values: " <<
JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " - " << JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << "]";
std::ostringstream oss3;
oss3 << "Number of files for transaction prepared list journal instance. [Allowable values: " <<
JRNL_MIN_NUM_FILES << " - " << JRNL_MAX_NUM_FILES << "]";
std::ostringstream oss4;
oss4 << "Size of each transaction prepared list journal file in multiples of read pages (1 read page = 64KiB) [Allowable values: " <<
JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << " - " << JRNL_MAX_FILE_SIZE / JRNL_RMGR_PAGE_SIZE << "]";
addOptions()
("store-dir", qpid::optValue(storeDir, "DIR"),
"Store directory location for persistence (instead of using --data-dir value). "
"Required if --no-data-dir is also used.")
("num-jfiles", qpid::optValue(numJrnlFiles, "N"), oss1.str().c_str())
("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"), oss2.str().c_str())
// TODO: Uncomment these lines when auto-expand is enabled.
// ("auto-expand", qpid::optValue(autoJrnlExpand, "yes|no"),
// "If yes|true|1, allows journal to auto-expand by adding additional journal files as needed. "
// "If no|false|0, the number of journal files will remain fixed (num-jfiles).")
// ("max-auto-expand-jfiles", qpid::optValue(autoJrnlExpandMaxFiles, "N"),
// "Maximum number of journal files allowed from auto-expanding; must be greater than --num-jfiles parameter.")
("truncate", qpid::optValue(truncateFlag, "yes|no"),
"If yes|true|1, will truncate the store (discard any existing records). If no|false|0, will preserve "
"the existing store files for recovery.")
("wcache-page-size", qpid::optValue(wCachePageSizeKib, "N"),
"Size of the pages in the write page cache in KiB. "
"Allowable values - powers of 2: 1, 2, 4, ... , 128. "
"Lower values decrease latency at the expense of throughput.")
("tpl-num-jfiles", qpid::optValue(tplNumJrnlFiles, "N"), oss3.str().c_str())
("tpl-jfile-size-pgs", qpid::optValue(tplJrnlFsizePgs, "N"), oss4.str().c_str())
("tpl-wcache-page-size", qpid::optValue(tplWCachePageSizeKib, "N"),
"Size of the pages in the transaction prepared list write page cache in KiB. "
"Allowable values - powers of 2: 1, 2, 4, ... , 128. "
"Lower values decrease latency at the expense of throughput.")
;
}
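// Illustrative usage (not from this file): the options above are normally passed on the
// broker command line once the store module has been loaded. The module file name, paths
// and values below are assumptions for the sake of the example:
//   qpidd --load-module msgstore.so --store-dir /var/lib/qpidd \
//         --num-jfiles 16 --jfile-size-pgs 48 --wcache-page-size 64 --truncate no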
}}