blob: ffa379c5c9b70688255ef99c9f54fc28d13662da [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/linearstore/journal/jcntl.h"
#include <iomanip>
#include "qpid/linearstore/journal/data_tok.h"
#include "qpid/linearstore/journal/JournalLog.h"
namespace qpid {
namespace linearstore {
namespace journal {
#define AIO_CMPL_TIMEOUT_SEC 5
#define AIO_CMPL_TIMEOUT_NSEC 0
#define FINAL_AIO_CMPL_TIMEOUT_SEC 15
#define FINAL_AIO_CMPL_TIMEOUT_NSEC 0
// Static
timespec jcntl::_aio_cmpl_timeout; ///< Timeout for blocking libaio returns
timespec jcntl::_final_aio_cmpl_timeout; ///< Timeout for blocking libaio returns when stopping or finalizing
bool jcntl::_init = init_statics();
bool jcntl::init_statics()
{
_aio_cmpl_timeout.tv_sec = AIO_CMPL_TIMEOUT_SEC;
_aio_cmpl_timeout.tv_nsec = AIO_CMPL_TIMEOUT_NSEC;
_final_aio_cmpl_timeout.tv_sec = FINAL_AIO_CMPL_TIMEOUT_SEC;
_final_aio_cmpl_timeout.tv_nsec = FINAL_AIO_CMPL_TIMEOUT_NSEC;
return true;
}
// Functions
jcntl::jcntl(const std::string& jid,
const std::string& jdir,
JournalLog& jrnl_log):
_jid(jid),
_jdir(jdir),
_init_flag(false),
_stop_flag(false),
_readonly_flag(false),
_jrnl_log(jrnl_log),
_linearFileController(*this),
_emptyFilePoolPtr(0),
_emap(),
_tmap(),
_wmgr(this, _emap, _tmap, _linearFileController),
_recoveryManager(_jdir.dirname(), _jid, _emap, _tmap, jrnl_log)
{}
jcntl::~jcntl()
{
if (_init_flag && !_stop_flag)
try { stop(true); }
catch (const jexception& e) { std::cerr << e << std::endl; }
_linearFileController.finalize();
}
void
jcntl::initialize(EmptyFilePool* efpp,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
aio_callback* const cbp)
{
_init_flag = false;
_stop_flag = false;
_readonly_flag = false;
_emap.clear();
_tmap.clear();
_linearFileController.finalize();
_jdir.clear_dir(); // Clear any existing journal files
_linearFileController.initialize(_jdir.dirname(), efpp, 0ULL);
_linearFileController.getNextJournalFile();
_wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS, 0);
_init_flag = true;
}
void
jcntl::recover(EmptyFilePoolManager* efpmp,
const uint16_t wcache_num_pages,
const uint32_t wcache_pgsize_sblks,
aio_callback* const cbp,
const std::vector<std::string>* prep_txn_list_ptr,
uint64_t& highest_rid)
{
_init_flag = false;
_stop_flag = false;
_readonly_flag = false;
_emap.clear();
_tmap.clear();
_linearFileController.finalize();
// Verify journal dir and journal files
_jdir.verify_dir();
_recoveryManager.analyzeJournals(prep_txn_list_ptr, efpmp, &_emptyFilePoolPtr);
assert(_emptyFilePoolPtr != 0);
highest_rid = _recoveryManager.getHighestRecordId();
_jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toString(_jid, 5U));
_linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber());
_recoveryManager.setLinearFileControllerJournals(&qpid::linearstore::journal::LinearFileController::addJournalFile, &_linearFileController);
_recoveryManager.removeUninitFiles(_emptyFilePoolPtr);
if (_recoveryManager.isLastFileFull()) {
_linearFileController.getNextJournalFile();
}
_wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS,
(_recoveryManager.isLastFileFull() ? 0 : _recoveryManager.getEndOffset()));
_readonly_flag = true;
_init_flag = true;
}
void
jcntl::recover_complete()
{
if (!_readonly_flag)
throw jexception(jerrno::JERR_JCNTL_NOTRECOVERED, "jcntl", "recover_complete");
_recoveryManager.recoveryComplete();
_readonly_flag = false;
}
void
jcntl::delete_jrnl_files()
{
stop(true); // wait for AIO to complete
_linearFileController.closeCurrentJournal();
_linearFileController.purgeEmptyFilesToEfp(true);
_jdir.delete_dir();
}
iores
jcntl::enqueue_data_record(const void* const data_buff,
const std::size_t tot_data_len,
const std::size_t this_data_len,
data_tok* dtokp,
const bool transient)
{
iores r;
check_wstatus("enqueue_data_record");
{
slock s(_wr_mutex);
while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, 0, 0, false, transient, false), r,
dtokp)) ;
}
return r;
}
iores
jcntl::enqueue_extern_data_record(const std::size_t tot_data_len,
data_tok* dtokp,
const bool transient)
{
iores r;
check_wstatus("enqueue_extern_data_record");
{
slock s(_wr_mutex);
while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, 0, 0, false, transient, true), r, dtokp)) ;
}
return r;
}
iores
jcntl::enqueue_txn_data_record(const void* const data_buff,
const std::size_t tot_data_len,
const std::size_t this_data_len,
data_tok* dtokp,
const std::string& xid,
const bool tpc_flag,
const bool transient)
{
iores r;
check_wstatus("enqueue_tx_data_record");
{
slock s(_wr_mutex);
while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
tpc_flag, transient, false), r, dtokp)) ;
}
return r;
}
iores
jcntl::enqueue_extern_txn_data_record(const std::size_t tot_data_len,
data_tok* dtokp,
const std::string& xid,
const bool tpc_flag,
const bool transient)
{
iores r;
check_wstatus("enqueue_extern_txn_data_record");
{
slock s(_wr_mutex);
while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, xid.data(), xid.size(), tpc_flag, transient,
true), r, dtokp)) ;
}
return r;
}
iores
jcntl::read_data_record(void** const datapp,
std::size_t& dsize,
void** const xidpp,
std::size_t& xidsize,
bool& transient,
bool& external,
data_tok* const dtokp,
bool ignore_pending_txns)
{
check_rstatus("read_data");
if (_recoveryManager.readNextRemainingRecord(datapp, dsize, xidpp, xidsize, transient, external, dtokp, ignore_pending_txns)) {
return RHM_IORES_SUCCESS;
}
return RHM_IORES_EMPTY;
}
iores
jcntl::dequeue_data_record(data_tok* const dtokp,
const bool txn_coml_commit)
{
iores r;
check_wstatus("dequeue_data");
{
slock s(_wr_mutex);
while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0, false, txn_coml_commit), r, dtokp)) ;
}
return r;
}
iores
jcntl::dequeue_txn_data_record(data_tok* const dtokp,
const std::string& xid,
const bool tpc_flag,
const bool txn_coml_commit)
{
iores r;
check_wstatus("dequeue_data");
{
slock s(_wr_mutex);
while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size(), tpc_flag, txn_coml_commit), r, dtokp)) ;
}
return r;
}
iores
jcntl::txn_abort(data_tok* const dtokp,
const std::string& xid)
{
iores r;
check_wstatus("txn_abort");
{
slock s(_wr_mutex);
while (handle_aio_wait(_wmgr.abort(dtokp, xid.data(), xid.size()), r, dtokp)) ;
}
return r;
}
iores
jcntl::txn_commit(data_tok* const dtokp,
const std::string& xid)
{
iores r;
check_wstatus("txn_commit");
{
slock s(_wr_mutex);
while (handle_aio_wait(_wmgr.commit(dtokp, xid.data(), xid.size()), r, dtokp)) ;
}
return r;
}
bool
jcntl::is_txn_synced(const std::string& xid)
{
slock s(_wr_mutex);
bool res = _wmgr.is_txn_synced(xid);
return res;
}
int32_t
jcntl::get_wr_events(timespec* const timeout)
{
stlock t(_wr_mutex);
if (!t.locked())
return jerrno::LOCK_TAKEN;
return _wmgr.get_events(timeout, false);
}
void
jcntl::stop(const bool block_till_aio_cmpl)
{
if (_readonly_flag)
check_rstatus("stop");
else
check_wstatus("stop");
_stop_flag = true;
if (!_readonly_flag)
flush(block_till_aio_cmpl);
}
LinearFileController&
jcntl::getLinearFileControllerRef() {
return _linearFileController;
}
// static
std::string
jcntl::str2hexnum(const std::string& str) {
if (str.empty()) {
return "<null>";
}
std::ostringstream oss;
oss << "(" << str.size() << ")0x" << std::hex;
for (unsigned i=str.size(); i>0; --i) {
oss << std::setfill('0') << std::setw(2) << (uint16_t)(uint8_t)str[i-1];
}
return oss.str();
}
iores
jcntl::flush(const bool block_till_aio_cmpl)
{
if (!_init_flag)
return RHM_IORES_SUCCESS;
if (_readonly_flag)
throw jexception(jerrno::JERR_JCNTL_READONLY, "jcntl", "flush");
iores res;
{
slock s(_wr_mutex);
res = _wmgr.flush();
}
if (block_till_aio_cmpl)
aio_cmpl_wait();
return res;
}
// Protected/Private functions
void
jcntl::check_wstatus(const char* fn_name) const
{
if (!_init_flag)
throw jexception(jerrno::JERR__NINIT, "jcntl", fn_name);
if (_readonly_flag)
throw jexception(jerrno::JERR_JCNTL_READONLY, "jcntl", fn_name);
if (_stop_flag)
throw jexception(jerrno::JERR_JCNTL_STOPPED, "jcntl", fn_name);
}
void
jcntl::check_rstatus(const char* fn_name) const
{
if (!_init_flag)
throw jexception(jerrno::JERR__NINIT, "jcntl", fn_name);
if (_stop_flag)
throw jexception(jerrno::JERR_JCNTL_STOPPED, "jcntl", fn_name);
}
void
jcntl::aio_cmpl_wait()
{
//while (_wmgr.get_aio_evt_rem())
while (true)
{
uint32_t aer;
{
slock s(_wr_mutex);
aer = _wmgr.get_aio_evt_rem();
}
if (aer == 0) break; // no events left
if (get_wr_events(&_aio_cmpl_timeout) == jerrno::AIO_TIMEOUT)
throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "aio_cmpl_wait");
}
}
bool
jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp)
{
resout = res;
if (res == RHM_IORES_PAGE_AIOWAIT)
{
while (_wmgr.curr_pg_blocked())
{
if (_wmgr.get_aio_evt_rem() == 0) {
//std::cout << "&&&&&& jcntl::handle_aio_wait() " << _wmgr.status_str() << std::endl; // DEBUG
throw jexception("_wmgr.curr_pg_blocked() with no events remaining"); // TODO - complete exception
}
if (_wmgr.get_events(&_aio_cmpl_timeout, false) == jerrno::AIO_TIMEOUT)
{
std::ostringstream oss;
oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str();
_jrnl_log.log(JournalLog::LOG_CRITICAL, _jid, oss.str());
throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
}
}
return true;
}
else if (res == RHM_IORES_FILE_AIOWAIT)
{
// while (_wmgr.curr_file_blocked())
// {
// if (_wmgr.get_events(pmgr::UNUSED, &_aio_cmpl_timeout) == jerrno::AIO_TIMEOUT)
// {
// std::ostringstream oss;
// oss << "get_events() returned JERR_JCNTL_AIOCMPLWAIT; wmgr_status: " << _wmgr.status_str();
// this->log(LOG_CRITICAL, oss.str());
// throw jexception(jerrno::JERR_JCNTL_AIOCMPLWAIT, "jcntl", "handle_aio_wait");
// }
// }
// _wrfc.wr_reset();
resout = RHM_IORES_SUCCESS;
data_tok::write_state ws = dtp->wstate();
return ws == data_tok::ENQ_PART || ws == data_tok::DEQ_PART || ws == data_tok::ABORT_PART ||
ws == data_tok::COMMIT_PART;
}
return false;
}
}}}