/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/legacystore/JournalImpl.h"
#include "qpid/legacystore/jrnl/jerrno.h"
#include "qpid/legacystore/jrnl/jexception.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/legacystore/ArgsJournalExpand.h"
#include "qmf/org/apache/qpid/legacystore/EventCreated.h"
#include "qmf/org/apache/qpid/legacystore/EventEnqThresholdExceeded.h"
#include "qmf/org/apache/qpid/legacystore/EventFull.h"
#include "qmf/org/apache/qpid/legacystore/EventRecovered.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Timer.h"
#include "qpid/legacystore/StoreException.h"
using namespace mrg::msgstore;
using namespace mrg::journal;
using qpid::management::ManagementAgent;
namespace _qmf = qmf::org::apache::qpid::legacystore;
InactivityFireEvent::InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
qpid::sys::TimerTask(timeout, "JournalInactive:"+p->id()), _parent(p) {}
void InactivityFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); if (_parent) _parent->flushFire(); }
GetEventsFireEvent::GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
qpid::sys::TimerTask(timeout, "JournalGetEvents:"+p->id()), _parent(p) {}
void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); if (_parent) _parent->getEventsFire(); }
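// The two TimerTask subclasses above drive the journal's background work:
// InactivityFireEvent flushes write buffers after a period with no write
// activity, and GetEventsFireEvent polls for outstanding AIO completions.
// Each fire() checks _parent under its lock, so a fire racing with journal
// shutdown (which cancels the task) is a safe no-op.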
JournalImpl::JournalImpl(qpid::sys::Timer& timer_,
const std::string& journalId,
const std::string& journalDirectory,
const std::string& journalBaseFilename,
const qpid::sys::Duration getEventsTimeout,
const qpid::sys::Duration flushTimeout,
qpid::management::ManagementAgent* a,
DeleteCallback onDelete):
jcntl(journalId, journalDirectory, journalBaseFilename),
timer(timer_),
getEventsTimerSetFlag(false),
lastReadRid(0),
writeActivityFlag(false),
flushTriggeredFlag(true),
_xidp(0),
_datap(0),
_dlen(0),
_dtok(),
_external(false),
deleteCallback(onDelete)
{
getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
{
timer.start();
timer.add(inactivityFireEventPtr);
}
initManagement(a);
log(LOG_NOTICE, "Created");
std::ostringstream oss;
oss << "Journal directory = \"" << journalDirectory << "\"; Base file name = \"" << journalBaseFilename << "\"";
log(LOG_DEBUG, oss.str());
}
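// Illustrative construction sketch (paths, names and timeouts here are
// hypothetical, not taken from this file). A store typically owns one
// JournalImpl per durable queue:
//
//   qpid::sys::Timer timer;
//   JournalImpl jrnl(timer, "queue0", "/var/lib/qpidd/jrnl/queue0", "queue0",
//                    qpid::sys::Duration(10 * qpid::sys::TIME_MSEC),  // getEventsTimeout
//                    qpid::sys::Duration(500 * qpid::sys::TIME_MSEC), // flushTimeout
//                    0);                                              // no management agent
//   // jrnl.initialize(...) for a new journal, or jrnl.recover(...) followed
//   // by jrnl.recover_complete() for an existing one, must precede any I/O.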
JournalImpl::~JournalImpl()
{
if (deleteCallback) deleteCallback(*this);
if (_init_flag && !_stop_flag){
try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls are complete!
catch (const jexception& e) { log(LOG_ERROR, e.what()); }
}
getEventsFireEventsPtr->cancel();
inactivityFireEventPtr->cancel();
free_read_buffers();
if (_mgmtObject.get() != 0) {
_mgmtObject->resourceDestroy();
_mgmtObject.reset();
}
log(LOG_NOTICE, "Destroyed");
}
void
JournalImpl::initManagement(qpid::management::ManagementAgent* a)
{
_agent = a;
if (_agent != 0)
{
_mgmtObject = _qmf::Journal::shared_ptr (
new _qmf::Journal(_agent, this));
_mgmtObject->set_name(_jid);
_mgmtObject->set_directory(_jdir.dirname());
_mgmtObject->set_baseFileName(_base_filename);
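        // Sizes are kept in journal units: a page is JRNL_RMGR_PAGE_SIZE
        // softblocks (sblks), each sblk is JRNL_SBLK_SIZE datablocks (dblks)
        // of JRNL_DBLK_SIZE bytes, so the product below is the size in bytes.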
_mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
_mgmtObject->set_readPages(JRNL_RMGR_PAGES);
// The following will be set on initialize(), but being properties, these must be set to 0 in the meantime
_mgmtObject->set_initialFileCount(0);
_mgmtObject->set_dataFileSize(0);
_mgmtObject->set_currentFileCount(0);
_mgmtObject->set_writePageSize(0);
_mgmtObject->set_writePages(0);
_agent->addObject(_mgmtObject, 0, true);
}
}
void
JournalImpl::initialize(const u_int16_t num_jfiles,
const bool auto_expand,
const u_int16_t ae_max_jfiles,
const u_int32_t jfsize_sblks,
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks,
mrg::journal::aio_callback* const cbp)
{
std::ostringstream oss;
oss << "Initialize; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
oss << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
oss << " wcache_num_pages=" << wcache_num_pages;
log(LOG_DEBUG, oss.str());
jcntl::initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, cbp);
log(LOG_DEBUG, "Initialization complete");
if (_mgmtObject.get() != 0)
{
_mgmtObject->set_initialFileCount(_lpmgr.num_jfiles());
_mgmtObject->set_autoExpand(_lpmgr.is_ae());
_mgmtObject->set_currentFileCount(_lpmgr.num_jfiles());
_mgmtObject->set_maxFileCount(_lpmgr.ae_max_jfiles());
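        // Data file size in bytes = sblks * dblks-per-sblk * bytes-per-dblk;
        // e.g. with the stock jcfg.h values (JRNL_SBLK_SIZE=4, JRNL_DBLK_SIZE=128)
        // a jfsize_sblks of 3072 gives 3072 * 4 * 128 = 1.5 MiB per file.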
_mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
_mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
_mgmtObject->set_writePages(wcache_num_pages);
}
if (_agent != 0)
_agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventCreated(_jid, _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE, _lpmgr.num_jfiles()),
qpid::management::ManagementAgent::SEV_NOTE);
}
void
JournalImpl::recover(const u_int16_t num_jfiles,
const bool auto_expand,
const u_int16_t ae_max_jfiles,
const u_int32_t jfsize_sblks,
const u_int16_t wcache_num_pages,
const u_int32_t wcache_pgsize_sblks,
mrg::journal::aio_callback* const cbp,
boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
u_int64_t& highest_rid,
u_int64_t queue_id)
{
std::ostringstream oss1;
oss1 << "Recover; num_jfiles=" << num_jfiles << " jfsize_sblks=" << jfsize_sblks;
oss1 << " queue_id = 0x" << std::hex << queue_id << std::dec;
oss1 << " wcache_pgsize_sblks=" << wcache_pgsize_sblks;
oss1 << " wcache_num_pages=" << wcache_num_pages;
log(LOG_DEBUG, oss1.str());
if (_mgmtObject.get() != 0)
{
_mgmtObject->set_initialFileCount(_lpmgr.num_jfiles());
_mgmtObject->set_autoExpand(_lpmgr.is_ae());
_mgmtObject->set_currentFileCount(_lpmgr.num_jfiles());
_mgmtObject->set_maxFileCount(_lpmgr.ae_max_jfiles());
_mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
_mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE);
_mgmtObject->set_writePages(wcache_num_pages);
}
if (prep_tx_list_ptr) {
// Create list of prepared xids
std::vector<std::string> prep_xid_list;
for (msgstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
prep_xid_list.push_back(i->xid);
}
jcntl::recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
cbp, &prep_xid_list, highest_rid);
} else {
jcntl::recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
cbp, 0, highest_rid);
}
// Populate PreparedTransaction lists from _tmap
if (prep_tx_list_ptr)
{
for (msgstore::PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) {
txn_data_list tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found
            for (tdl_itr tdli = tdl.begin(); tdli != tdl.end(); tdli++) { // renamed to avoid shadowing the tdl_itr type
                if (tdli->_enq_flag) { // enqueue op
                    i->enqueues->add(queue_id, tdli->_rid);
                } else { // dequeue op
                    i->dequeues->add(queue_id, tdli->_drid);
                }
}
}
}
std::ostringstream oss2;
oss2 << "Recover phase 1 complete; highest rid found = 0x" << std::hex << highest_rid;
oss2 << std::dec << "; emap.size=" << _emap.size() << "; tmap.size=" << _tmap.size();
oss2 << "; journal now read-only.";
log(LOG_DEBUG, oss2.str());
if (_mgmtObject.get() != 0)
{
_mgmtObject->inc_recordDepth(_emap.size());
_mgmtObject->inc_enqueues(_emap.size());
_mgmtObject->inc_txn(_tmap.size());
_mgmtObject->inc_txnEnqueues(_tmap.enq_cnt());
_mgmtObject->inc_txnDequeues(_tmap.deq_cnt());
}
}
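// Recovery is two-phase: recover() above replays the journal into _emap/_tmap
// and leaves it read-only so message content can be read back, while
// recover_complete() below makes the journal writable again.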
void
JournalImpl::recover_complete()
{
jcntl::recover_complete();
log(LOG_DEBUG, "Recover phase 2 complete; journal now writable.");
if (_agent != 0)
_agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventRecovered(_jid, _jfsize_sblks * JRNL_SBLK_SIZE * JRNL_DBLK_SIZE, _lpmgr.num_jfiles(),
_emap.size(), _tmap.size(), _tmap.enq_cnt(), _tmap.deq_cnt()), qpid::management::ManagementAgent::SEV_NOTE);
}
//#define MAX_AIO_SLEEPS 1000000 // tot: ~10 sec
//#define AIO_SLEEP_TIME_US 10 // 0.01 ms
// Return true if content is recovered from store; false if content is external and must be recovered from an external store.
// Throw exception for all errors.
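// Illustrative use (hypothetical caller code): fetch up to 'length' bytes of
// a message body starting at 'offset' within the stored content:
//
//   std::string data;
//   if (!jrnl->loadMsgContent(rid, data, length, offset))
//       { /* content is external: fetch it from the external store instead */ }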
bool
JournalImpl::loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset)
{
qpid::sys::Mutex::ScopedLock sl(_read_lock);
if (_dtok.rid() != rid)
{
// Free any previous msg
free_read_buffers();
// Last read encountered out-of-order rids, check if this rid is in that list
bool oooFlag = false;
for (std::vector<u_int64_t>::const_iterator i=oooRidList.begin(); i!=oooRidList.end() && !oooFlag; i++) {
if (*i == rid) {
oooFlag = true;
}
}
        // TODO: This is a brutal approach - very inefficient and slow. Instead, introduce a system of remembering
        // jumpover points and allow the read to jump back to the first known jumpover point - but this needs
        // a mechanism in rrfc to accomplish it. Also helpful would be a struct containing a journal address - a
        // combination of lid/offset.
// NOTE: The second part of the if stmt (rid < lastReadRid) is required to handle browsing.
if (oooFlag || rid < lastReadRid) {
_rmgr.invalidate();
oooRidList.clear();
}
_dlen = 0;
_dtok.reset();
_dtok.set_wstate(DataTokenImpl::ENQ);
_dtok.set_rid(0);
_external = false;
size_t xlen = 0;
bool transient = false;
bool done = false;
bool rid_found = false;
while (!done) {
iores res = read_data_record(&_datap, _dlen, &_xidp, xlen, transient, _external, &_dtok);
switch (res) {
case mrg::journal::RHM_IORES_SUCCESS:
if (_dtok.rid() != rid) {
// Check if this is an out-of-order rid that may impact next read
if (_dtok.rid() > rid)
oooRidList.push_back(_dtok.rid());
free_read_buffers();
// Reset data token for next read
_dlen = 0;
_dtok.reset();
_dtok.set_wstate(DataTokenImpl::ENQ);
_dtok.set_rid(0);
} else {
                    rid_found = true; // in this branch _dtok.rid() == rid
lastReadRid = rid;
done = true;
}
break;
case mrg::journal::RHM_IORES_PAGE_AIOWAIT:
if (get_wr_events(&_aio_cmpl_timeout) == journal::jerrno::AIO_TIMEOUT) {
std::stringstream ss;
ss << "read_data_record() returned " << mrg::journal::iores_str(res);
ss << "; timed out waiting for page to be processed.";
throw jexception(mrg::journal::jerrno::JERR__TIMEOUT, ss.str().c_str(), "JournalImpl",
"loadMsgContent");
}
break;
default:
std::stringstream ss;
ss << "read_data_record() returned " << mrg::journal::iores_str(res);
throw jexception(mrg::journal::jerrno::JERR__UNEXPRESPONSE, ss.str().c_str(), "JournalImpl",
"loadMsgContent");
}
}
if (!rid_found) {
std::stringstream ss;
ss << "read_data_record() was unable to find rid 0x" << std::hex << rid << std::dec;
ss << " (" << rid << "); last rid found was 0x" << std::hex << _dtok.rid() << std::dec;
ss << " (" << _dtok.rid() << ")";
throw jexception(mrg::journal::jerrno::JERR__RECNFOUND, ss.str().c_str(), "JournalImpl", "loadMsgContent");
}
}
if (_external) return false;
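    // The first 4 bytes of the recovered record encode the persisted header
    // length; skip the headers (and the length field itself) to reach the
    // content, then clamp the append to the bytes actually present in _datap.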
u_int32_t hdr_offs = qpid::framing::Buffer(static_cast<char*>(_datap), sizeof(u_int32_t)).getLong() + sizeof(u_int32_t);
if (hdr_offs + offset + length > _dlen) {
data.append((const char*)_datap + hdr_offs + offset, _dlen - hdr_offs - offset);
} else {
data.append((const char*)_datap + hdr_offs + offset, length);
}
return true;
}
void
JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const bool transient)
{
handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient));
if (_mgmtObject.get() != 0)
{
_mgmtObject->inc_enqueues();
_mgmtObject->inc_recordDepth();
}
}
void
JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, data_tok* dtokp,
const bool transient)
{
handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient));
if (_mgmtObject.get() != 0)
{
_mgmtObject->inc_enqueues();
_mgmtObject->inc_recordDepth();
}
}
void
JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const std::string& xid, const bool transient)
{
bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, transient));
if (_mgmtObject.get() != 0)
{
if (!txn_incr) // If this xid was not in _tmap, it will be now...
_mgmtObject->inc_txn();
_mgmtObject->inc_enqueues();
_mgmtObject->inc_txnEnqueues();
_mgmtObject->inc_recordDepth();
}
}
void
JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
const std::string& xid, const bool transient)
{
bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, transient));
if (_mgmtObject.get() != 0)
{
if (!txn_incr) // If this xid was not in _tmap, it will be now...
_mgmtObject->inc_txn();
_mgmtObject->inc_enqueues();
_mgmtObject->inc_txnEnqueues();
_mgmtObject->inc_recordDepth();
}
}
void
JournalImpl::dequeue_data_record(data_tok* const dtokp, const bool txn_coml_commit)
{
handleIoResult(jcntl::dequeue_data_record(dtokp, txn_coml_commit));
if (_mgmtObject.get() != 0)
{
_mgmtObject->inc_dequeues();
_mgmtObject->inc_txnDequeues();
_mgmtObject->dec_recordDepth();
}
}
void
JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
{
bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, txn_coml_commit));
if (_mgmtObject.get() != 0)
{
if (!txn_incr) // If this xid was not in _tmap, it will be now...
_mgmtObject->inc_txn();
_mgmtObject->inc_dequeues();
_mgmtObject->inc_txnDequeues();
_mgmtObject->dec_recordDepth();
}
}
void
JournalImpl::txn_abort(data_tok* const dtokp, const std::string& xid)
{
handleIoResult(jcntl::txn_abort(dtokp, xid));
if (_mgmtObject.get() != 0)
{
_mgmtObject->dec_txn();
_mgmtObject->inc_txnAborts();
}
}
void
JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid)
{
handleIoResult(jcntl::txn_commit(dtokp, xid));
if (_mgmtObject.get() != 0)
{
_mgmtObject->dec_txn();
_mgmtObject->inc_txnCommits();
}
}
void
JournalImpl::stop(bool block_till_aio_cmpl)
{
InactivityFireEvent* ifep = dynamic_cast<InactivityFireEvent*>(inactivityFireEventPtr.get());
assert(ifep); // dynamic_cast can return null if the cast fails
ifep->cancel();
jcntl::stop(block_till_aio_cmpl);
if (_mgmtObject.get() != 0) {
_mgmtObject->resourceDestroy();
_mgmtObject.reset();
}
}
iores
JournalImpl::flush(const bool block_till_aio_cmpl)
{
const iores res = jcntl::flush(block_till_aio_cmpl);
{
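        // If AIO completions are still outstanding after the flush, ensure a
        // get-events poll is scheduled so they are eventually harvested.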
qpid::sys::Mutex::ScopedLock sl(_getf_lock);
if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { setGetEventTimer(); }
}
return res;
}
void
JournalImpl::log(mrg::journal::log_level ll, const std::string& log_stmt) const
{
log(ll, log_stmt.c_str());
}
void
JournalImpl::log(mrg::journal::log_level ll, const char* const log_stmt) const
{
switch (ll)
{
case LOG_TRACE: QPID_LOG(trace, "Journal \"" << _jid << "\": " << log_stmt); break;
case LOG_DEBUG: QPID_LOG(debug, "Journal \"" << _jid << "\": " << log_stmt); break;
case LOG_INFO: QPID_LOG(info, "Journal \"" << _jid << "\": " << log_stmt); break;
case LOG_NOTICE: QPID_LOG(notice, "Journal \"" << _jid << "\": " << log_stmt); break;
case LOG_WARN: QPID_LOG(warning, "Journal \"" << _jid << "\": " << log_stmt); break;
case LOG_ERROR: QPID_LOG(error, "Journal \"" << _jid << "\": " << log_stmt); break;
case LOG_CRITICAL: QPID_LOG(critical, "Journal \"" << _jid << "\": " << log_stmt); break;
}
}
void
JournalImpl::getEventsFire()
{
qpid::sys::Mutex::ScopedLock sl(_getf_lock);
getEventsTimerSetFlag = false;
if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(0); }
if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); }
}
void
JournalImpl::flushFire()
{
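    // Called on each inactivity timeout: if writes occurred since the last
    // fire, just reset the flags; otherwise flush once per idle period.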
if (writeActivityFlag) {
writeActivityFlag = false;
flushTriggeredFlag = false;
} else {
if (!flushTriggeredFlag) {
flush();
flushTriggeredFlag = true;
}
}
inactivityFireEventPtr->setupNextFire();
{
timer.add(inactivityFireEventPtr);
}
}
void
JournalImpl::wr_aio_cb(std::vector<data_tok*>& dtokl)
{
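    // Write-completion callback: for each completed token, signal enqueue
    // completion back to the broker message, then release the token.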
for (std::vector<data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++)
{
DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(*i);
if (/*!is_stopped() &&*/ dtokp->getSourceMessage())
{
switch (dtokp->wstate())
{
case data_tok::ENQ:
dtokp->getSourceMessage()->enqueueComplete();
break;
case data_tok::DEQ:
/* Don't need to signal until we have a way to ack completion of dequeue in AMQP
dtokp->getSourceMessage()->dequeueComplete();
if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue
dtokp->getSourceMessage()->setPersistenceId(0);
*/
break;
default: ;
}
}
dtokp->release();
}
}
void
JournalImpl::rd_aio_cb(std::vector<u_int16_t>& /*pil*/)
{}
void
JournalImpl::free_read_buffers()
{
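    // When an xid is present, read_data_record() returns xid and data in one
    // allocation (_datap points into the _xidp buffer), so free only one.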
if (_xidp) {
::free(_xidp);
_xidp = 0;
_datap = 0;
} else if (_datap) {
::free(_datap);
_datap = 0;
}
}
void
JournalImpl::handleIoResult(const iores r)
{
writeActivityFlag = true;
switch (r)
{
case mrg::journal::RHM_IORES_SUCCESS:
return;
case mrg::journal::RHM_IORES_ENQCAPTHRESH:
{
std::ostringstream oss;
oss << "Enqueue capacity threshold exceeded on queue \"" << _jid << "\".";
log(LOG_WARN, oss.str());
if (_agent != 0)
_agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventEnqThresholdExceeded(_jid, "Journal enqueue capacity threshold exceeded"),
qpid::management::ManagementAgent::SEV_WARN);
THROW_STORE_FULL_EXCEPTION(oss.str());
}
case mrg::journal::RHM_IORES_FULL:
{
std::ostringstream oss;
oss << "Journal full on queue \"" << _jid << "\".";
log(LOG_CRITICAL, oss.str());
if (_agent != 0)
_agent->raiseEvent(qmf::org::apache::qpid::legacystore::EventFull(_jid, "Journal full"), qpid::management::ManagementAgent::SEV_ERROR);
THROW_STORE_FULL_EXCEPTION(oss.str());
}
default:
{
std::ostringstream oss;
oss << "Unexpected I/O response (" << mrg::journal::iores_str(r) << ") on queue " << _jid << "\".";
log(LOG_ERROR, oss.str());
            THROW_STORE_EXCEPTION(oss.str()); // generic store error: this is not a capacity (store-full) condition
}
}
}
qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t methodId,
qpid::management::Args& /*args*/,
std::string& /*text*/)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
switch (methodId)
{
case _qmf::Journal::METHOD_EXPAND :
//_qmf::ArgsJournalExpand& eArgs = (_qmf::ArgsJournalExpand&) args;
// Implement "expand" using eArgs.i_by (expand-by argument)
status = Manageable::STATUS_NOT_IMPLEMENTED;
break;
}
return status;
}