blob: 4353fcfbcabcdb378e6ca876af4ca746051ef095 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
/**
* \file wmgr.cpp
*
* Qpid asynchronous store plugin library
*
* File containing code for class mrg::journal::wmgr (write manager). See
* comments in file wmgr.h for details.
*
* \author Kim van der Riet
*/
#include "qpid/legacystore/jrnl/wmgr.h"
#include <cassert>
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include "qpid/legacystore/jrnl/file_hdr.h"
#include "qpid/legacystore/jrnl/jcntl.h"
#include "qpid/legacystore/jrnl/jerrno.h"
#include <sstream>
namespace mrg
{
namespace journal
{
/**
* \brief Construct a write manager with _max_dtokpp and _max_io_wait_us set to zero.
*
* jc, emap and tmap are forwarded unchanged to the pmgr base class; wrfc is bound to the
* _wrfc reference member. No memory is allocated here: the file header buffer pointers are
* initialized to null, the size/count members to zero and the four operation-busy flags to
* false. The pending transaction set starts empty.
*
* \param jc Journal controller, passed to pmgr.
* \param emap Enqueue map, passed to pmgr.
* \param tmap Transaction map, passed to pmgr.
* \param wrfc Object bound to the _wrfc reference member.
*/
wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc):
pmgr(jc, emap, tmap),
_wrfc(wrfc),
_max_dtokpp(0),
_max_io_wait_us(0),
// File header buffers are allocated later by initialize() and released by clean()
_fhdr_base_ptr(0),
_fhdr_ptr_arr(0),
_fhdr_aio_cb_arr(0),
_cached_offset_dblks(0),
_jfsize_dblks(0),
_jfsize_pgs(0),
_num_jfiles(0),
// Only one of enqueue/dequeue/abort/commit may be part-way through a record at a time
_enq_busy(false),
_deq_busy(false),
_abort_busy(false),
_commit_busy(false),
_txn_pending_set()
{}
/**
* \brief Construct a write manager with caller-supplied _max_dtokpp and _max_io_wait_us.
*
* Identical to the four-argument constructor except that max_dtokpp and max_iowait_us are
* stored in _max_dtokpp and _max_io_wait_us instead of zero. No memory is allocated here.
*
* \param jc Journal controller, passed to pmgr.
* \param emap Enqueue map, passed to pmgr.
* \param tmap Transaction map, passed to pmgr.
* \param wrfc Object bound to the _wrfc reference member.
* \param max_dtokpp Value stored in _max_dtokpp.
* \param max_iowait_us Value stored in _max_io_wait_us.
*/
wmgr::wmgr(jcntl* jc, enq_map& emap, txn_map& tmap, wrfc& wrfc,
const u_int32_t max_dtokpp, const u_int32_t max_iowait_us):
pmgr(jc, emap, tmap /* , dtoklp */),
_wrfc(wrfc),
_max_dtokpp(max_dtokpp),
_max_io_wait_us(max_iowait_us),
// File header buffers are allocated later by initialize() and released by clean()
_fhdr_base_ptr(0),
_fhdr_ptr_arr(0),
_fhdr_aio_cb_arr(0),
_cached_offset_dblks(0),
_jfsize_dblks(0),
_jfsize_pgs(0),
_num_jfiles(0),
// Only one of enqueue/dequeue/abort/commit may be part-way through a record at a time
_enq_busy(false),
_deq_busy(false),
_abort_busy(false),
_commit_busy(false),
_txn_pending_set()
{}
/**
* \brief Destructor; releases the file header buffers and AIO control blocks via clean().
*/
wmgr::~wmgr()
{
// Explicitly qualified so that this class's own clean() is the one that runs
wmgr::clean();
}
/**
* \brief Initialize the write manager, optionally positioning the write pointers part-way
*     through the current file.
*
* Clears all four operation-busy flags, stores the limits, runs the three-argument
* initialize() (which allocates the page cache and file header buffers), then derives the
* journal file size in dblks and in cache pages from the journal controller.
*
* When eo is non-zero it is treated as a byte offset within the file that includes the file
* header. The header occupies one sblk (JRNL_SBLK_SIZE dblks, the same size write_fhdr()
* submits), and the remaining dblks are split into whole cache pages (_pg_cntr) plus an
* offset into the next page (_pg_offset_dblks).
*
* \param cbp AIO callback object, forwarded to the three-argument initialize().
* \param wcache_pgsize_sblks Cache page size in sblks, forwarded.
* \param wcache_num_pages Number of cache pages, forwarded.
* \param max_dtokpp Value stored in _max_dtokpp.
* \param max_iowait_us Value stored in _max_io_wait_us.
* \param eo Byte offset at which writing is to continue; 0 leaves the page counters as set
*     by the three-argument initialize().
*/
void
wmgr::initialize(aio_callback* const cbp, const u_int32_t wcache_pgsize_sblks,
        const u_int16_t wcache_num_pages, const u_int32_t max_dtokpp, const u_int32_t max_iowait_us,
        std::size_t eo)
{
    _enq_busy = false;
    _deq_busy = false;
    _abort_busy = false;
    _commit_busy = false;
    _max_dtokpp = max_dtokpp;
    _max_io_wait_us = max_iowait_us;

    initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);

    _jfsize_dblks = _jc->jfsize_sblks() * JRNL_SBLK_SIZE;
    _jfsize_pgs = _jc->jfsize_sblks() / _cache_pgsize_sblks;
    assert(_jc->jfsize_sblks() % JRNL_RMGR_PAGE_SIZE == 0);

    if (eo)
    {
        const u_int32_t wr_pg_size_dblks = _cache_pgsize_sblks * JRNL_SBLK_SIZE;
        const std::size_t eo_dblks = eo / JRNL_DBLK_SIZE;
        // An offset inside the file header would make the subtraction below wrap around
        assert(eo_dblks >= (std::size_t)JRNL_SBLK_SIZE);
        // The file header takes up the first sblk of the file
        const u_int32_t data_dblks = (u_int32_t)(eo_dblks - JRNL_SBLK_SIZE);
        _pg_cntr = data_dblks / wr_pg_size_dblks;
        _pg_offset_dblks = data_dblks - (_pg_cntr * wr_pg_size_dblks);
    }
}
/**
* \brief Encode an enqueue record into the write page cache, flushing pages as they fill.
*
* The record may span several cache pages. If the call has to stop before the record is
* fully encoded, the data token is left in state ENQ_PART and _enq_busy stays set; the
* caller must then call enqueue() again with the same token to continue.
*
* \param data_buff Pointer to the message data, passed to the enqueue record.
* \param tot_data_len Total length of the message data.
* \param this_data_len Length supplied in this call; must equal tot_data_len unless external.
* \param dtokp Data token tracking this record; its rid, xid, fid, write state and written
*     dblk count are updated here.
* \param xid_ptr Transaction id bytes; must be non-null when xid_len is non-zero.
* \param xid_len Length of the transaction id; 0 for a non-transactional enqueue.
* \param transient Passed through to the enqueue record.
* \param external Passed through to the enqueue record and to pre_write_check().
* \return RHM_IORES_BUSY if a dequeue, abort or commit is part-way through;
*     RHM_IORES_NOTIMPL for a partial-data call that is not external; any non-success
*     result of pre_write_check(); otherwise the result left by flush_check().
* \throws jexception JERR_WMGR_ENQDISCONT if an enqueue is in progress and dtokp is not the
*     token in state ENQ_PART; JERR_MAP_DUPLICATE if the rid is already in the enqueue map.
*/
iores
wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len,
const std::size_t this_data_len, data_tok* dtokp, const void* const xid_ptr,
const std::size_t xid_len, const bool transient, const bool external)
{
if (xid_len)
assert(xid_ptr != 0);
// Only one kind of operation may be part-way through a record at any time
if (_deq_busy || _abort_busy || _commit_busy)
return RHM_IORES_BUSY;
// Supplying the data in pieces is not supported
if (this_data_len != tot_data_len && !external)
return RHM_IORES_NOTIMPL;
iores res = pre_write_check(WMGR_ENQUEUE, dtokp, xid_len, tot_data_len, external);
if (res != RHM_IORES_SUCCESS)
return res;
bool cont = false;
if (_enq_busy) // If enqueue() exited last time with RHM_IORES_FULL or RHM_IORES_PAGE_AIOWAIT
{
if (dtokp->wstate() == data_tok::ENQ_PART)
cont = true;
else
{
std::ostringstream oss;
oss << "This data_tok: id=" << dtokp->id() << " state=" << dtokp->wstate_str();
throw jexception(jerrno::JERR_WMGR_ENQDISCONT, oss.str(), "wmgr", "enqueue");
}
}
// A continued or externally-numbered record keeps the rid already in the token;
// otherwise a new rid is drawn from _wrfc.
u_int64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _wrfc.get_incr_rid();
_enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, _wrfc.owi(), transient,
external);
if (!cont)
{
// First call for this record: stamp the token and mark an enqueue as in progress
dtokp->set_rid(rid);
dtokp->set_dequeue_rid(0);
if (xid_len)
dtokp->set_xid(xid_ptr, xid_len);
else
dtokp->clear_xid();
_enq_busy = true;
}
bool done = false;
while (!done)
{
assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE);
// Write position: current page plus the dblk offset already used within it
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
u_int32_t data_offs_dblks = dtokp->dblocks_written();
// Encode as much of the record as fits in the space remaining on this page;
// ret is added to the dblk counters below
u_int32_t ret = _enq_rec.encode(wptr, data_offs_dblks,
(_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks);
// Remember fid which contains the record header in case record is split over several files
if (data_offs_dblks == 0)
dtokp->set_fid(_wrfc.index());
_pg_offset_dblks += ret;
_cached_offset_dblks += ret;
dtokp->incr_dblocks_written(ret);
// The token is listed on every page it touches; its page count is decremented in
// get_events() and the token is only processed there once the count reaches zero
dtokp->incr_pg_cnt();
_page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);
// Is the encoding of this record complete?
if (dtokp->dblocks_written() >= _enq_rec.rec_size_dblks())
{
// TODO: Incorrect - must set state to ENQ_CACHED; ENQ_SUBM is set when AIO returns.
dtokp->set_wstate(data_tok::ENQ_SUBM);
dtokp->set_dsize(tot_data_len);
// Only add this data token to page token list when submit is complete, this way
// long multi-page messages have their token on the page containing the END of the
// message. AIO callbacks will then only process this token when entire message is
// enqueued.
// NOTE(review): the comment above looks stale - the token is pushed onto the list of
// every page it touches (see push_back above), with the page count gating completion.
_wrfc.incr_enqcnt(dtokp->fid());
if (xid_len) // If part of transaction, add to transaction map
{
std::string xid((const char*)xid_ptr, xid_len);
_tmap.insert_txn_data(xid, txn_data(rid, 0, dtokp->fid(), true));
}
else
{
if (_emap.insert_pfid(rid, dtokp->fid()) < enq_map::EMAP_OK) // fail
{
// The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
std::ostringstream oss;
oss << std::hex << "rid=0x" << rid << " _pfid=0x" << dtokp->fid();
throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "enqueue");
}
}
done = true;
}
else
dtokp->set_wstate(data_tok::ENQ_PART);
// Write the file header first if the current file has not been written to yet
file_header_check(rid, cont, _enq_rec.rec_size_dblks() - data_offs_dblks);
// Flush if the page is full; may set res and done if the write cannot continue now
flush_check(res, cont, done);
}
// Record fully encoded: allow other operations again
if (dtokp->wstate() >= data_tok::ENQ_SUBM)
_enq_busy = false;
return res;
}
/**
* \brief Encode a dequeue record into the write page cache, flushing pages as they fill.
*
* The record may span several cache pages. If the call has to stop before the record is
* fully encoded, the data token is left in state DEQ_PART and _deq_busy stays set; the
* caller must then call dequeue() again with the same token to continue.
*
* \param dtokp Data token of the record being dequeued. Unless it carries an external rid,
*     its current rid becomes the dequeue rid and it is given a new rid for this record.
* \param xid_ptr Transaction id bytes; must be non-null when xid_len is non-zero.
* \param xid_len Length of the transaction id; 0 for a non-transactional dequeue.
* \param txn_coml_commit Passed through to the dequeue record.
* \return RHM_IORES_BUSY if an enqueue, abort or commit is part-way through; any
*     non-success result of pre_write_check(); otherwise the result left by flush_check().
* \throws jexception JERR_WMGR_DEQDISCONT if a dequeue is in progress and dtokp is not the
*     token in state DEQ_PART; JERR_MAP_NOTFOUND or JERR_MAP_LOCKED if a non-transactional
*     dequeue cannot remove the dequeue rid from the enqueue map; whatever dequeue_check()
*     throws.
*/
iores
wmgr::dequeue(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len, const bool txn_coml_commit)
{
    if (xid_len)
        assert(xid_ptr != 0);

    // Only one kind of operation may be part-way through a record at any time
    if (_enq_busy || _abort_busy || _commit_busy)
        return RHM_IORES_BUSY;

    iores res = pre_write_check(WMGR_DEQUEUE, dtokp);
    if (res != RHM_IORES_SUCCESS)
        return res;

    bool cont = false;
    if (_deq_busy) // If dequeue() exited last time with RHM_IORES_FULL or RHM_IORES_PAGE_AIOWAIT
    {
        if (dtokp->wstate() == data_tok::DEQ_PART)
            cont = true;
        else
        {
            std::ostringstream oss;
            oss << "This data_tok: id=" << dtokp->id() << " state=" << dtokp->wstate_str();
            throw jexception(jerrno::JERR_WMGR_DEQDISCONT, oss.str(), "wmgr", "dequeue");
        }
    }

    // A continued or externally-numbered record keeps both rids already in the token.
    // Otherwise the token's current rid is the record to dequeue and a new rid is drawn.
    const bool ext_rid = dtokp->external_rid();
    u_int64_t rid = (ext_rid | cont) ? dtokp->rid() : _wrfc.get_incr_rid();
    u_int64_t dequeue_rid = (ext_rid | cont) ? dtokp->dequeue_rid() : dtokp->rid();
    _deq_rec.reset(rid, dequeue_rid, xid_ptr, xid_len, _wrfc.owi(), txn_coml_commit);
    if (!cont)
    {
        if (!ext_rid)
        {
            dtokp->set_rid(rid);
            dtokp->set_dequeue_rid(dequeue_rid);
        }
        if (xid_len)
            dtokp->set_xid(xid_ptr, xid_len);
        else
            dtokp->clear_xid();
        dequeue_check(dtokp->xid(), dequeue_rid);
        dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
        _deq_busy = true;
    }

    bool done = false;
    while (!done)
    {
        assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE);
        // Write position: current page plus the dblk offset already used within it
        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
        u_int32_t data_offs_dblks = dtokp->dblocks_written();
        // Encode as much of the record as fits in the space remaining on this page
        u_int32_t ret = _deq_rec.encode(wptr, data_offs_dblks,
                (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks);

        // Remember fid which contains the record header in case record is split over several files
        if (data_offs_dblks == 0)
            dtokp->set_fid(_wrfc.index());
        _pg_offset_dblks += ret;
        _cached_offset_dblks += ret;
        dtokp->incr_dblocks_written(ret);
        // The token is listed on every page it touches; get_events() processes it only
        // once its page count has dropped back to zero
        dtokp->incr_pg_cnt();
        _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);

        // Is the encoding of this record complete?
        if (dtokp->dblocks_written() >= _deq_rec.rec_size_dblks())
        {
            // TODO: Incorrect - the record is only cached at this point; DEQ_SUBM should be
            // set when AIO returns.
            dtokp->set_wstate(data_tok::DEQ_SUBM);
            if (xid_len) // If part of transaction, add to transaction map
            {
                // If the enqueue is part of a pending txn, it will not yet be in emap
                _emap.lock(dequeue_rid); // ignore rid not found error
                std::string xid((const char*)xid_ptr, xid_len);
                _tmap.insert_txn_data(xid, txn_data(rid, dequeue_rid, dtokp->fid(), false));
            }
            else
            {
                const u_int64_t drid = dtokp->dequeue_rid();
                int16_t fid = _emap.get_remove_pfid(drid);
                if (fid < enq_map::EMAP_OK) // fail
                {
                    // Report the rid that was looked up (drid) as well as this record's rid
                    if (fid == enq_map::EMAP_RID_NOT_FOUND)
                    {
                        std::ostringstream oss;
                        oss << std::hex << "rid=0x" << rid << " drid=0x" << drid;
                        throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "dequeue");
                    }
                    if (fid == enq_map::EMAP_LOCKED)
                    {
                        std::ostringstream oss;
                        oss << std::hex << "rid=0x" << rid << " drid=0x" << drid;
                        throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue");
                    }
                }
                _wrfc.decr_enqcnt(fid);
            }
            done = true;
        }
        else
            dtokp->set_wstate(data_tok::DEQ_PART);

        // Write the file header first if the current file has not been written to yet
        file_header_check(rid, cont, _deq_rec.rec_size_dblks() - data_offs_dblks);
        // Flush if the page is full; may set res and done if the write cannot continue now
        flush_check(res, cont, done);
    }
    // Record fully encoded: allow other operations again
    if (dtokp->wstate() >= data_tok::DEQ_SUBM)
        _deq_busy = false;
    return res;
}
/**
* \brief Encode a transaction abort record into the write page cache.
*
* Once the record is fully encoded the transaction is removed from the transaction map: each
* of its dequeues has its enqueue-map lock released, each of its enqueues has the file
* enqueue count decremented, and the xid is added to the pending set until the AIO write
* completes. If the call stops early the token is left in state ABORT_PART and abort() must
* be called again with the same token.
*
* \param dtokp Data token tracking this abort record.
* \param xid_ptr Transaction id bytes; must be non-null.
* \param xid_len Length of the transaction id; must be non-zero.
* \return RHM_IORES_BUSY if an enqueue, dequeue or commit is part-way through; any
*     non-success result of pre_write_check(); otherwise the result left by flush_check().
* \throws jexception JERR_WMGR_DEQDISCONT if an abort is in progress and dtokp is not the
*     token in state ABORT_PART; JERR_MAP_DUPLICATE if the xid is already pending.
*/
iores
wmgr::abort(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len)
{
    // commit and abort MUST have a valid xid
    assert(xid_ptr != 0 && xid_len > 0);

    if (_enq_busy || _deq_busy || _commit_busy)
        return RHM_IORES_BUSY;

    iores res = pre_write_check(WMGR_ABORT, dtokp);
    if (res != RHM_IORES_SUCCESS)
        return res;

    // An abort left unfinished by a previous call may only be resumed with its own token
    bool cont = false;
    if (_abort_busy)
    {
        if (dtokp->wstate() != data_tok::ABORT_PART)
        {
            std::ostringstream oss;
            oss << "This data_tok: id=" << dtokp->id() << " state=" << dtokp->wstate_str();
            throw jexception(jerrno::JERR_WMGR_DEQDISCONT, oss.str(), "wmgr", "abort");
        }
        cont = true;
    }

    const bool keep_rid = dtokp->external_rid() || cont;
    u_int64_t rid = keep_rid ? dtokp->rid() : _wrfc.get_incr_rid();
    _txn_rec.reset(RHM_JDAT_TXA_MAGIC, rid, xid_ptr, xid_len, _wrfc.owi());
    if (!cont)
    {
        // First call for this record: stamp the token and mark an abort as in progress
        dtokp->set_rid(rid);
        dtokp->set_dequeue_rid(0);
        dtokp->set_xid(xid_ptr, xid_len);
        dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
        _abort_busy = true;
    }

    for (bool done = false; !done; )
    {
        assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE);
        char* const page_base = (char*)_page_ptr_arr[_pg_index];
        void* wptr = (void*)(page_base + _pg_offset_dblks * JRNL_DBLK_SIZE);
        const u_int32_t data_offs_dblks = dtokp->dblocks_written();
        const u_int32_t encoded_dblks = _txn_rec.encode(wptr, data_offs_dblks,
                (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks);

        // The fid of the file holding the record header is the one remembered in the token
        if (data_offs_dblks == 0)
            dtokp->set_fid(_wrfc.index());
        _pg_offset_dblks += encoded_dblks;
        _cached_offset_dblks += encoded_dblks;
        dtokp->incr_dblocks_written(encoded_dblks);
        dtokp->incr_pg_cnt();
        _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);

        if (dtokp->dblocks_written() < _txn_rec.rec_size_dblks())
        {
            // More of the record remains to be encoded on a later page
            dtokp->set_wstate(data_tok::ABORT_PART);
        }
        else
        {
            dtokp->set_wstate(data_tok::ABORT_SUBM);

            // Drop the txn from tmap and undo its effects on emap and the enqueue counts
            std::string xid((const char*)xid_ptr, xid_len);
            txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // empty if xid not found
            for (tdl_itr itr = tdl.begin(); itr != tdl.end(); ++itr)
            {
                if (itr->_enq_flag)
                    _wrfc.decr_enqcnt(itr->_pfid);
                else
                    _emap.unlock(itr->_drid); // ignore rid not found error
            }

            // Track the xid until the AIO write of this record has completed
            std::pair<std::set<std::string>::iterator, bool> inserted = _txn_pending_set.insert(xid);
            if (!inserted.second)
            {
                std::ostringstream oss;
                oss << std::hex << "_txn_pending_set: xid=\"" << xid << "\"";
                throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "abort");
            }
            done = true;
        }

        file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks);
        flush_check(res, cont, done);
    }

    if (dtokp->wstate() >= data_tok::ABORT_SUBM)
        _abort_busy = false;
    return res;
}
/**
* \brief Encode a transaction commit record into the write page cache.
*
* Once the record is fully encoded the transaction is removed from the transaction map and
* applied: each of its enqueues is inserted into the enqueue map, each of its dequeues is
* removed from the enqueue map and the file enqueue count decremented. The xid is added to
* the pending set until the AIO write completes. If the call stops early the token is left
* in state COMMIT_PART and commit() must be called again with the same token.
*
* \param dtokp Data token tracking this commit record.
* \param xid_ptr Transaction id bytes; must be non-null.
* \param xid_len Length of the transaction id; must be non-zero.
* \return RHM_IORES_BUSY if an enqueue, dequeue or abort is part-way through; any
*     non-success result of pre_write_check(); otherwise the result left by flush_check().
* \throws jexception JERR_WMGR_DEQDISCONT if a commit is in progress and dtokp is not the
*     token in state COMMIT_PART; JERR_MAP_DUPLICATE if a committed enqueue's rid is already
*     in the enqueue map or the xid is already pending; JERR_MAP_NOTFOUND or JERR_MAP_LOCKED
*     if a committed dequeue cannot be removed from the enqueue map.
*/
iores
wmgr::commit(data_tok* dtokp, const void* const xid_ptr, const std::size_t xid_len)
{
    // commit and abort MUST have a valid xid
    assert(xid_ptr != 0 && xid_len > 0);

    // Only one kind of operation may be part-way through a record at any time
    if (_enq_busy || _deq_busy || _abort_busy)
        return RHM_IORES_BUSY;

    iores res = pre_write_check(WMGR_COMMIT, dtokp);
    if (res != RHM_IORES_SUCCESS)
        return res;

    bool cont = false;
    if (_commit_busy) // If commit() exited last time with RHM_IORES_FULL or RHM_IORES_PAGE_AIOWAIT
    {
        if (dtokp->wstate() == data_tok::COMMIT_PART)
            cont = true;
        else
        {
            std::ostringstream oss;
            oss << "This data_tok: id=" << dtokp->id() << " state=" << dtokp->wstate_str();
            throw jexception(jerrno::JERR_WMGR_DEQDISCONT, oss.str(), "wmgr", "commit");
        }
    }

    // A continued or externally-numbered record keeps the rid already in the token
    u_int64_t rid = (dtokp->external_rid() | cont) ? dtokp->rid() : _wrfc.get_incr_rid();
    _txn_rec.reset(RHM_JDAT_TXC_MAGIC, rid, xid_ptr, xid_len, _wrfc.owi());
    if (!cont)
    {
        dtokp->set_rid(rid);
        dtokp->set_dequeue_rid(0);
        dtokp->set_xid(xid_ptr, xid_len);
        dtokp->set_dblocks_written(0); // Reset dblks_written from previous op
        _commit_busy = true;
    }

    bool done = false;
    while (!done)
    {
        assert(_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE);
        // Write position: current page plus the dblk offset already used within it
        void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
        u_int32_t data_offs_dblks = dtokp->dblocks_written();
        // Encode as much of the record as fits in the space remaining on this page
        u_int32_t ret = _txn_rec.encode(wptr, data_offs_dblks,
                (_cache_pgsize_sblks * JRNL_SBLK_SIZE) - _pg_offset_dblks);

        // Remember fid which contains the record header in case record is split over several files
        if (data_offs_dblks == 0)
            dtokp->set_fid(_wrfc.index());
        _pg_offset_dblks += ret;
        _cached_offset_dblks += ret;
        dtokp->incr_dblocks_written(ret);
        dtokp->incr_pg_cnt();
        _page_cb_arr[_pg_index]._pdtokl->push_back(dtokp);

        // Is the encoding of this record complete?
        if (dtokp->dblocks_written() >= _txn_rec.rec_size_dblks())
        {
            dtokp->set_wstate(data_tok::COMMIT_SUBM);

            // Delete this txn from tmap, process records into emap
            std::string xid((const char*)xid_ptr, xid_len);
            txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
            for (tdl_itr itr = tdl.begin(); itr != tdl.end(); ++itr)
            {
                if (itr->_enq_flag) // txn enqueue
                {
                    if (_emap.insert_pfid(itr->_rid, itr->_pfid) < enq_map::EMAP_OK) // fail
                    {
                        // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
                        std::ostringstream oss;
                        oss << std::hex << "rid=0x" << itr->_rid << " _pfid=0x" << itr->_pfid;
                        throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "commit");
                    }
                }
                else // txn dequeue
                {
                    int16_t fid = _emap.get_remove_pfid(itr->_drid, true);
                    if (fid < enq_map::EMAP_OK) // fail
                    {
                        // Report the rid that was looked up (drid) as well as the commit
                        // record's rid, and name this function as the throw site
                        if (fid == enq_map::EMAP_RID_NOT_FOUND)
                        {
                            std::ostringstream oss;
                            oss << std::hex << "rid=0x" << rid << " drid=0x" << itr->_drid;
                            throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr", "commit");
                        }
                        if (fid == enq_map::EMAP_LOCKED)
                        {
                            std::ostringstream oss;
                            oss << std::hex << "rid=0x" << rid << " drid=0x" << itr->_drid;
                            throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "commit");
                        }
                    }
                    _wrfc.decr_enqcnt(fid);
                }
            }

            // Track the xid until the AIO write of this record has completed. Named so as not
            // to shadow the iores res declared above.
            std::pair<std::set<std::string>::iterator, bool> ins = _txn_pending_set.insert(xid);
            if (!ins.second)
            {
                std::ostringstream oss;
                oss << std::hex << "_txn_pending_set: xid=\"" << xid << "\"";
                throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "commit");
            }

            done = true;
        }
        else
            dtokp->set_wstate(data_tok::COMMIT_PART);

        // Write the file header first if the current file has not been written to yet
        file_header_check(rid, cont, _txn_rec.rec_size_dblks() - data_offs_dblks);
        // Flush if the page is full; may set res and done if the write cannot continue now
        flush_check(res, cont, done);
    }
    // Record fully encoded: allow other operations again
    if (dtokp->wstate() >= data_tok::COMMIT_SUBM)
        _commit_busy = false;
    return res;
}
/**
* \brief Write the file header for the current file if nothing has been written to it yet.
*
* The first-record offset stored in the header is chosen as follows:
*  - record starts in this file (cont is false): immediately after the one-sblk header;
*  - record continues from a previous file and ends inside this one: after the header plus
*    the remaining dblks of the record;
*  - record continues from a previous file and fills or overflows this one: 0.
*
* \param rid Record id passed to write_fhdr().
* \param cont True when the record being written is a continuation.
* \param rec_dblks_rem Number of dblks of the current record still to be written.
*/
void
wmgr::file_header_check(const u_int64_t rid, const bool cont, const u_int32_t rec_dblks_rem)
{
    // A non-void file already has its header
    if (!_wrfc.is_void())
        return;

    std::size_t first_rec_offs = 0;
    if (!cont)
    {
        first_rec_offs = JRNL_SBLK_SIZE * JRNL_DBLK_SIZE;
    }
    else if (rec_dblks_rem < _jfsize_dblks)
    {
        first_rec_offs = (rec_dblks_rem + JRNL_SBLK_SIZE) * JRNL_DBLK_SIZE;
    }
    write_fhdr(rid, _wrfc.index(), _wrfc.index(), first_rec_offs);
}
/**
* \brief Flush the current page if it is full, and rotate to the next file if the file is
*     full.
*
* Does nothing while there is still room on the current page.
*
* \param res Set to the result of write_flush(); overwritten with RHM_IORES_PAGE_AIOWAIT if
*     the next page is still awaiting AIO and the record is unfinished, or with the
*     rotate_file() result if rotation fails.
* \param cont Set to true when the file was rotated successfully with the record unfinished.
* \param done Set to true when writing of an unfinished record must stop for now.
*/
void
wmgr::flush_check(iores& res, bool& cont, bool& done)
{
    // If page is not yet full, there is nothing to flush
    if (_pg_offset_dblks < _cache_pgsize_sblks * JRNL_SBLK_SIZE)
        return;

    res = write_flush();
    assert(res == RHM_IORES_SUCCESS);

    // write_flush() moved on to the next page; an unfinished record cannot continue
    // into a page whose previous write has not yet completed
    if (!done && _page_cb_arr[_pg_index]._state == AIO_PENDING)
    {
        res = RHM_IORES_PAGE_AIOWAIT;
        done = true;
    }

    // If file is not yet full, no rotation is needed
    if (_pg_cntr < _jfsize_pgs)
        return;

    const iores rotate_res = rotate_file();
    if (rotate_res == RHM_IORES_SUCCESS)
    {
        if (!done)
            cont = true;
    }
    else
    {
        res = rotate_res;
        done = true;
    }
}
/**
* \brief Flush the current page and rotate to the next file if the current one is full.
*
* \return The rotate_file() result if rotation was attempted and did not succeed; otherwise
*     the write_flush() result.
*/
iores
wmgr::flush()
{
    const iores flush_res = write_flush();
    if (_pg_cntr < _jfsize_pgs)
        return flush_res;

    const iores rotate_res = rotate_file();
    if (rotate_res != RHM_IORES_SUCCESS)
        return rotate_res;
    return flush_res;
}
/**
* \brief Submit the cached part of the current page for AIO writing and move to the next
*     page.
*
* Nothing is submitted if no dblks are cached. Whether or not a write was submitted, any
* AIO events already available are collected afterwards, and the (possibly new) current
* page is marked IN_USE if it is UNUSED.
*
* \return RHM_IORES_PAGE_AIOWAIT if there is cached data but the current page is in state
*     AIO_PENDING; RHM_IORES_SUCCESS otherwise.
* \throws jexception JERR_WMGR_BADPGSTATE if the current page is neither AIO_PENDING nor
*     IN_USE; JERR__AIO if the AIO submit fails.
*/
iores
wmgr::write_flush()
{
    iores result = RHM_IORES_SUCCESS;

    // Don't bother flushing an empty page or one that is still in state AIO_PENDING
    if (_cached_offset_dblks)
    {
        if (_page_cb_arr[_pg_index]._state == AIO_PENDING)
        {
            result = RHM_IORES_PAGE_AIOWAIT;
        }
        else if (_page_cb_arr[_pg_index]._state != IN_USE)
        {
            std::ostringstream oss;
            oss << "pg_index=" << _pg_index << " state=" << _page_cb_arr[_pg_index].state_str();
            throw jexception(jerrno::JERR_WMGR_BADPGSTATE, oss.str(), "wmgr",
                    "write_flush");
        }
        else
        {
            // In manual flushes, dblks may not coincide with sblks; pad with filler
            // records ("RHMx") up to the next sblk boundary.
            dblk_roundup();

            // Only the not-yet-submitted tail of the page is written
            const std::size_t tail_offs = (_pg_offset_dblks - _cached_offset_dblks) * JRNL_DBLK_SIZE;
            char* const tail_ptr = (char*)_page_ptr_arr[_pg_index] + tail_offs;
            aio_cb* aiocbp = &_aio_cb_arr[_pg_index];
            aio::prep_pwrite_2(aiocbp, _wrfc.fh(), tail_ptr, _cached_offset_dblks * JRNL_DBLK_SIZE,
                    _wrfc.subm_offs());

            // Record in the page control block what was submitted and to which file, for
            // use by get_events() when the write completes
            page_cb* pcbp = (page_cb*)(aiocbp->data);
            pcbp->_wdblks = _cached_offset_dblks;
            pcbp->_wfh = _wrfc.file_controller();

            if (aio::submit(_ioctx, 1, &aiocbp) < 0)
                throw jexception(jerrno::JERR__AIO, "wmgr", "write_flush");

            _wrfc.add_subm_cnt_dblks(_cached_offset_dblks);
            _wrfc.incr_aio_cnt();
            _aio_evt_rem++;
            _cached_offset_dblks = 0;
            _jc->instr_incr_outstanding_aio_cnt();

            rotate_page(); // increments _pg_index, resets _pg_offset_dblks if req'd
            if (_page_cb_arr[_pg_index]._state == UNUSED)
                _page_cb_arr[_pg_index]._state = IN_USE;
        }
    }

    // Collect whatever AIO events are available; completed pages are returned to UNUSED
    get_events(UNUSED, 0);
    if (_page_cb_arr[_pg_index]._state == UNUSED)
        _page_cb_arr[_pg_index]._state = IN_USE;
    return result;
}
/**
* \brief Move writing on to the next journal file.
*
* Resets the page counter, rotates _wrfc and then calls chk_wr_frot() on the journal
* controller.
*
* \return The result of _wrfc.rotate().
*/
iores
wmgr::rotate_file()
{
    _pg_cntr = 0;
    const iores rotate_res = _wrfc.rotate();
    _jc->chk_wr_frot();
    return rotate_res;
}
/**
* \brief Collect completed AIO write events and update page, file and data token state.
*
* Each returned event is either a page write (its control block carries a page_cb pointer)
* or a file header write (no page_cb). For a page write, every data token listed on the page
* has its page count decremented; tokens whose count reaches zero move from their *_SUBM
* state to the corresponding final state and are handed to the AIO callback. For a file
* header write, the owning file controller's counters are updated.
*
* \param state State to which each completed page is set.
* \param timeout Timeout passed to aio::getevents(); may be null.
* \param flush If true, wait for all outstanding events; otherwise for at least one.
* \return Number of data tokens that reached a final state; 0 if there was nothing
*     outstanding or the wait was interrupted by a signal; jerrno::AIO_TIMEOUT if a timeout
*     was supplied and no events were returned.
* \throws jexception JERR__AIO if getevents or an individual write failed; JERR__UNDERFLOW if
*     more events arrive than are outstanding; JERR_MAP_NOTFOUND if a completed abort/commit
*     xid is not in the pending set; JERR_WMGR_BADDTOKSTATE for a token in an unexpected state.
*/
int32_t
wmgr::get_events(page_state state, timespec* const timeout, bool flush)
{
    if (_aio_evt_rem == 0) // no events to get
        return 0;

    int ret = 0;
    if ((ret = aio::getevents(_ioctx, flush ? _aio_evt_rem : 1, _aio_evt_rem/*_cache_num_pages + _jc->num_jfiles()*/, _aio_event_arr, timeout)) < 0)
    {
        if (ret == -EINTR) // Interrupted by signal
            return 0;
        std::ostringstream oss;
        oss << "io_getevents() failed: " << std::strerror(-ret) << " (" << ret << ")";
        throw jexception(jerrno::JERR__AIO, oss.str(), "wmgr", "get_events");
    }

    if (ret == 0 && timeout)
        return jerrno::AIO_TIMEOUT;

    int32_t tot_data_toks = 0;
    for (int i=0; i<ret; i++) // Index of returned AIOs
    {
        if (_aio_evt_rem == 0)
        {
            std::ostringstream oss;
            oss << "_aio_evt_rem; evt " << (i + 1) << " of " << ret;
            throw jexception(jerrno::JERR__UNDERFLOW, oss.str(), "wmgr", "get_events");
        }
        _aio_evt_rem--;
        aio_cb* aiocbp = _aio_event_arr[i].obj; // This I/O control block (iocb)
        page_cb* pcbp = (page_cb*)(aiocbp->data); // This page control block (pcb)
        long aioret = (long)_aio_event_arr[i].res;
        if (aioret < 0)
        {
            std::ostringstream oss;
            oss << "AIO write operation failed: " << std::strerror(-aioret) << " (" << aioret << ") [";
            if (pcbp)
                oss << "pg=" << pcbp->_index;
            else
            {
                file_hdr* fhp = (file_hdr*)aiocbp->u.c.buf;
                oss << "fid=" << fhp->_pfid;
            }
            oss << " size=" << aiocbp->u.c.nbytes;
            oss << " offset=" << aiocbp->u.c.offset << " fh=" << aiocbp->aio_fildes << "]";
            throw jexception(jerrno::JERR__AIO, oss.str(), "wmgr", "get_events");
        }
        if (pcbp) // Page writes have pcb
        {
            u_int32_t s = pcbp->_pdtokl->size();
            // Tokens completed by this page; passed to the callback below
            std::vector<data_tok*> dtokl;
            dtokl.reserve(s);
            for (u_int32_t k=0; k<s; k++)
            {
                data_tok* dtokp = pcbp->_pdtokl->at(k);
                // A token spanning several pages is only processed when its last page returns
                if (dtokp->decr_pg_cnt() == 0)
                {
                    std::set<std::string>::iterator it;
                    switch (dtokp->wstate())
                    {
                    case data_tok::ENQ_SUBM:
                        dtokl.push_back(dtokp);
                        tot_data_toks++;
                        dtokp->set_wstate(data_tok::ENQ);
                        if (dtokp->has_xid())
                            // Ignoring return value here. A non-zero return can signify that the transaction
                            // has committed or aborted, and which was completed prior to the aio returning.
                            _tmap.set_aio_compl(dtokp->xid(), dtokp->rid());
                        break;
                    case data_tok::DEQ_SUBM:
                        dtokl.push_back(dtokp);
                        tot_data_toks++;
                        dtokp->set_wstate(data_tok::DEQ);
                        if (dtokp->has_xid())
                            // Ignoring return value - see note above.
                            _tmap.set_aio_compl(dtokp->xid(), dtokp->rid());
                        break;
                    case data_tok::ABORT_SUBM:
                        dtokl.push_back(dtokp);
                        tot_data_toks++;
                        dtokp->set_wstate(data_tok::ABORTED);
                        // The abort is now on disk: the xid is no longer pending
                        it = _txn_pending_set.find(dtokp->xid());
                        if (it == _txn_pending_set.end())
                        {
                            std::ostringstream oss;
                            oss << std::hex << "_txn_pending_set: abort xid=\"";
                            oss << dtokp->xid() << "\"";
                            throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr",
                                    "get_events");
                        }
                        _txn_pending_set.erase(it);
                        break;
                    case data_tok::COMMIT_SUBM:
                        dtokl.push_back(dtokp);
                        tot_data_toks++;
                        dtokp->set_wstate(data_tok::COMMITTED);
                        // The commit is now on disk: the xid is no longer pending
                        it = _txn_pending_set.find(dtokp->xid());
                        if (it == _txn_pending_set.end())
                        {
                            std::ostringstream oss;
                            oss << std::hex << "_txn_pending_set: commit xid=\"";
                            oss << dtokp->xid() << "\"";
                            throw jexception(jerrno::JERR_MAP_NOTFOUND, oss.str(), "wmgr",
                                    "get_events");
                        }
                        _txn_pending_set.erase(it);
                        break;
                    case data_tok::ENQ_PART:
                    case data_tok::DEQ_PART:
                    case data_tok::ABORT_PART:
                    case data_tok::COMMIT_PART:
                        // ignore these
                        break;
                    default:
                        // throw for anything else
                        std::ostringstream oss;
                        oss << "dtok_id=" << dtokp->id() << " dtok_state=" << dtokp->wstate_str();
                        throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr",
                                "get_events");
                    } // switch
                } // if
            } // for

            // Increment the completed write offset
            // NOTE: We cannot use _wrfc here, as it may have rotated since submitting count.
            // Use stored pointer to fcntl in the pcb instead.
            pcbp->_wfh->add_wr_cmpl_cnt_dblks(pcbp->_wdblks);
            pcbp->_wfh->decr_aio_cnt();
            _jc->instr_decr_outstanding_aio_cnt();

            // Clean up this pcb's data_tok list
            pcbp->_pdtokl->clear();
            pcbp->_state = state;

            // Perform AIO return callback, but only when this page completed at least one
            // token. tot_data_toks is cumulative over all events, so testing it here would
            // call back with an empty list for later pages that completed none.
            if (_cbp && !dtokl.empty())
                _cbp->wr_aio_cb(dtokl);
        }
        else // File header writes have no pcb
        {
            // get lfid from original file header record, update info for that lfid
            file_hdr* fhp = (file_hdr*)aiocbp->u.c.buf;
            u_int32_t lfid = fhp->_lfid;
            fcntl* fcntlp = _jc->get_fcntlp(lfid);
            fcntlp->add_wr_cmpl_cnt_dblks(JRNL_SBLK_SIZE);
            fcntlp->decr_aio_cnt();
            fcntlp->set_wr_fhdr_aio_outstanding(false);
        }
    }

    return tot_data_toks;
}
/**
* \brief Report whether a transaction has been fully written.
*
* \param xid Transaction id to check.
* \return false if the transaction map reports the xid as not synced, or if an abort or
*     commit record for the xid is still awaiting AIO completion; true otherwise (including
*     when the transaction map does not know the xid).
*/
bool
wmgr::is_txn_synced(const std::string& xid)
{
    // Ignore xid not found error here
    const bool not_synced = _tmap.is_txn_synced(xid) == txn_map::TMAP_NOT_SYNCED;
    if (not_synced)
        return false;
    // Synced only if no commit/abort for this xid is still outstanding
    return _txn_pending_set.count(xid) == 0;
}
/**
* \brief Initialize the page cache and allocate the per-file header buffers.
*
* Runs pmgr::initialize(), releases any file header buffers from a previous
* initialization, then allocates one block of _sblksize bytes per journal file (aligned to
* _sblksize), an array of pointers into that block and one AIO control block per file.
* Finally marks page 0 IN_USE and resets the cached offset and the enqueue-busy flag.
*
* \param cbp AIO callback object, forwarded to pmgr::initialize().
* \param wcache_pgsize_sblks Cache page size in sblks, forwarded.
* \param wcache_num_pages Number of cache pages, forwarded.
* \throws jexception JERR__MALLOC if the aligned allocation fails; whatever MALLOC_CHK
*     throws if either array allocation fails.
*/
void
wmgr::initialize(aio_callback* const cbp, const u_int32_t wcache_pgsize_sblks, const u_int16_t wcache_num_pages)
{
    pmgr::initialize(cbp, wcache_pgsize_sblks, wcache_num_pages);
    wmgr::clean();
    _num_jfiles = _jc->num_jfiles();

    const std::size_t fhdr_buff_size = _sblksize * _num_jfiles;
    // posix_memalign() reports failure through its return value and does not set errno
    const int memalign_err = ::posix_memalign(&_fhdr_base_ptr, _sblksize, fhdr_buff_size);
    if (memalign_err)
    {
        // The pointer's value after a failed call is not guaranteed on all platforms
        _fhdr_base_ptr = 0;
        wmgr::clean();
        std::ostringstream oss;
        oss << "posix_memalign(): blksize=" << _sblksize << " size=" << fhdr_buff_size;
        oss << FORMAT_SYSERR(memalign_err);
        throw jexception(jerrno::JERR__MALLOC, oss.str(), "wmgr", "initialize");
    }

    _fhdr_ptr_arr = (void**)std::malloc(_num_jfiles * sizeof(void*));
    MALLOC_CHK(_fhdr_ptr_arr, "_fhdr_ptr_arr", "wmgr", "initialize");
    _fhdr_aio_cb_arr = (aio_cb**)std::malloc(sizeof(aio_cb*) * _num_jfiles);
    MALLOC_CHK(_fhdr_aio_cb_arr, "_fhdr_aio_cb_arr", "wmgr", "initialize");
    // Zeroed so that clean() can safely delete every slot even after a partial fill
    std::memset(_fhdr_aio_cb_arr, 0, sizeof(aio_cb*) * _num_jfiles);
    for (u_int16_t i=0; i<_num_jfiles; i++)
    {
        _fhdr_ptr_arr[i] = (void*)((char*)_fhdr_base_ptr + _sblksize * i);
        _fhdr_aio_cb_arr[i] = new aio_cb;
    }

    _page_cb_arr[0]._state = IN_USE;
    _ddtokl.clear();
    _cached_offset_dblks = 0;
    _enq_busy = false;
}
/**
* \brief Check that the current file, the current page and the data token are all in a
*     state that allows the given operation to write.
*
* An UNUSED current page is switched to IN_USE as a side effect.
*
* \param op Operation about to be performed.
* \param dtokp Data token for the operation; examined for enqueue and dequeue only.
* \param xidsize Transaction id length; used for the enqueue size calculation only.
* \param dsize Data length; used for the enqueue size calculation only.
* \param external External flag; used for the enqueue size calculation only.
* \return RHM_IORES_FULL if the current file cannot be reset for writing;
*     RHM_IORES_PAGE_AIOWAIT if the current page is awaiting AIO; RHM_IORES_ENQCAPTHRESH if
*     a new enqueue would reach the enqueue threshold; RHM_IORES_SUCCESS otherwise.
* \throws jexception JERR_WMGR_BADPGSTATE for any other page state; JERR_WMGR_BADDTOKSTATE
*     if the token is not writable (enqueue) or not dequeueable (dequeue).
*/
iores
wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp,
        const std::size_t xidsize, const std::size_t dsize, const bool external
        ) const
{
    // Check status of current file
    if (!_wrfc.is_wr_reset() && !_wrfc.wr_reset())
        return RHM_IORES_FULL;

    // Check status of current page is ok for writing
    switch (_page_cb_arr[_pg_index]._state)
    {
    case IN_USE:
        break;
    case UNUSED:
        _page_cb_arr[_pg_index]._state = IN_USE;
        break;
    case AIO_PENDING:
        return RHM_IORES_PAGE_AIOWAIT;
    default:
        {
            std::ostringstream oss;
            oss << "jrnl=" << _jc->id() << " op=" << _op_str[op];
            oss << " index=" << _pg_index << " pg_state=" << _page_cb_arr[_pg_index].state_str();
            throw jexception(jerrno::JERR_WMGR_BADPGSTATE, oss.str(), "wmgr", "pre_write_check");
        }
    }

    // Operation-specific checks; abort and commit have none
    if (op == WMGR_ENQUEUE)
    {
        // Check for enqueue reaching cutoff threshold; skipped when resuming an enqueue
        const u_int32_t rec_dblks = jrec::size_dblks(enq_rec::rec_size(xidsize, dsize,
                external));
        if (!_enq_busy && _wrfc.enq_threshold(_cached_offset_dblks + rec_dblks))
            return RHM_IORES_ENQCAPTHRESH;
        if (!dtokp->is_writable())
        {
            std::ostringstream oss;
            oss << "jrnl=" << _jc->id() << " op=" << _op_str[op];
            oss << " dtok_id=" << dtokp->id() << " dtok_state=" << dtokp->wstate_str();
            throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr",
                    "pre_write_check");
        }
    }
    else if (op == WMGR_DEQUEUE)
    {
        if (!dtokp->is_dequeueable())
        {
            std::ostringstream oss;
            oss << "jrnl=" << _jc->id() << " op=" << _op_str[op];
            oss << " dtok_id=" << dtokp->id() << " dtok_state=" << dtokp->wstate_str();
            throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, oss.str(), "wmgr",
                    "pre_write_check");
        }
    }
    return RHM_IORES_SUCCESS;
}
/**
* \brief Verify that the record to be dequeued is known as enqueued.
*
* The record is accepted if its rid is present (and unlocked) in the enqueue map, or,
* failing that, if an xid was supplied and the transaction map holds the rid under that xid.
*
* \param xid Transaction id of the dequeue; may be empty.
* \param drid Record id of the record to be dequeued.
* \throws jexception JERR_MAP_LOCKED if the enqueue map reports the rid as locked;
*     JERR_WMGR_DEQRIDNOTENQ if the rid is found in neither map.
*/
void
wmgr::dequeue_check(const std::string& xid, const u_int64_t drid)
{
    // First check emap
    const int16_t pfid = _emap.get_pfid(drid);
    if (pfid >= enq_map::EMAP_OK)
        return;

    if (pfid == enq_map::EMAP_RID_NOT_FOUND)
    {
        // Not in emap: the enqueue may still be part of the same pending transaction
        if (xid.size() && _tmap.data_exists(xid, drid))
            return;
    }
    else if (pfid == enq_map::EMAP_LOCKED)
    {
        std::ostringstream oss;
        oss << std::hex << "drid=0x" << drid;
        throw jexception(jerrno::JERR_MAP_LOCKED, oss.str(), "wmgr", "dequeue_check");
    }

    std::ostringstream oss;
    oss << "jrnl=" << _jc->id() << " drid=0x" << std::hex << drid;
    throw jexception(jerrno::JERR_WMGR_DEQRIDNOTENQ, oss.str(), "wmgr", "dequeue_check");
}
void
wmgr::dblk_roundup()
{
const u_int32_t xmagic = RHM_JDAT_EMPTY_MAGIC;
u_int32_t wdblks = jrec::size_blks(_cached_offset_dblks, JRNL_SBLK_SIZE) * JRNL_SBLK_SIZE;
while (_cached_offset_dblks < wdblks)
{
void* wptr = (void*)((char*)_page_ptr_arr[_pg_index] + _pg_offset_dblks * JRNL_DBLK_SIZE);
std::memcpy(wptr, (const void*)&xmagic, sizeof(xmagic));
#ifdef RHM_CLEAN
std::memset((char*)wptr + sizeof(xmagic), RHM_CLEAN_CHAR, JRNL_DBLK_SIZE - sizeof(xmagic));
#endif
_pg_offset_dblks++;
_cached_offset_dblks++;
}
}
/**
* \brief Build a file header and submit it for AIO writing at offset 0 of the current file.
*
* The header is assembled in the buffer reserved for file fid and one sblk is submitted.
* On success the outstanding event count, the submitted dblk count and the AIO count of
* the current file are incremented and the file is flagged as having a header write
* outstanding.
*
* \param rid Record id stored in the header.
* \param fid File id stored in the header; also selects the header buffer and control block.
* \param lid Second file id stored in the header.
* \param fro First record offset stored in the header.
* \throws jexception JERR__AIO if the AIO submit fails.
*/
void
wmgr::write_fhdr(u_int64_t rid, u_int16_t fid, u_int16_t lid, std::size_t fro)
{
    void* const hdr_buff = _fhdr_ptr_arr[fid];

    // Assemble the header on the stack, then copy it into the aligned write buffer
    file_hdr hdr(RHM_JDAT_FILE_MAGIC, RHM_JDAT_VERSION, rid, fid, lid, fro, _wrfc.owi(), true);
    std::memcpy(hdr_buff, &hdr, sizeof(hdr));
#ifdef RHM_CLEAN
    std::memset((char*)hdr_buff + sizeof(hdr), RHM_CLEAN_CHAR, _sblksize - sizeof(hdr));
#endif

    aio_cb* hdr_aiocbp = _fhdr_aio_cb_arr[fid];
    aio::prep_pwrite(hdr_aiocbp, _wrfc.fh(), hdr_buff, _sblksize, 0);
    if (aio::submit(_ioctx, 1, &hdr_aiocbp) < 0)
        throw jexception(jerrno::JERR__AIO, "wmgr", "write_fhdr");

    // Account for the outstanding header write
    _aio_evt_rem++;
    _wrfc.add_subm_cnt_dblks(JRNL_SBLK_SIZE);
    _wrfc.incr_aio_cnt();
    _wrfc.file_controller()->set_wr_fhdr_aio_outstanding(true);
}
/**
* \brief Mark the current page as awaiting AIO and advance to the next page in the cache.
*
* If the current page was completely filled, the page offset is reset and the per-file
* page counter incremented. The page index wraps to 0 after the last cache page.
*/
void
wmgr::rotate_page()
{
    _page_cb_arr[_pg_index]._state = AIO_PENDING;

    const bool page_full = _pg_offset_dblks >= _cache_pgsize_sblks * JRNL_SBLK_SIZE;
    if (page_full)
    {
        _pg_cntr++;
        _pg_offset_dblks = 0;
    }

    ++_pg_index;
    if (_pg_index >= _cache_num_pages)
        _pg_index = 0;
}
/**
* \brief Release the file header buffers and their AIO control blocks.
*
* Every pointer is reset to null after being freed, so calling clean() again is harmless.
*/
void
wmgr::clean()
{
    // Control blocks were created with new, the array holding them with malloc
    if (_fhdr_aio_cb_arr)
    {
        for (u_int32_t idx = 0; idx < _num_jfiles; ++idx)
            delete _fhdr_aio_cb_arr[idx];
        std::free(_fhdr_aio_cb_arr);
        _fhdr_aio_cb_arr = 0;
    }

    std::free(_fhdr_ptr_arr);
    _fhdr_ptr_arr = 0;

    std::free(_fhdr_base_ptr);
    _fhdr_base_ptr = 0;
}
/**
* \brief Build a one-line summary of the write manager state.
*
* The summary holds the page index (pi), page counter (pc), page offset in dblks (po),
* outstanding AIO events (aer), the enqueue/dequeue/abort/commit busy flags (edac) as T/F,
* one character per cache page for its state (ps), and then the status string of _wrfc.
*
* \return The summary string.
*/
const std::string
wmgr::status_str() const
{
    std::ostringstream oss;
    oss << "wmgr: pi=" << _pg_index << " pc=" << _pg_cntr;
    oss << " po=" << _pg_offset_dblks << " aer=" << _aio_evt_rem;
    oss << " edac:";
    oss << (_enq_busy ? "T" : "F");
    oss << (_deq_busy ? "T" : "F");
    oss << (_abort_busy ? "T" : "F");
    oss << (_commit_busy ? "T" : "F");
    oss << " ps=[";
    for (int pg = 0; pg < _cache_num_pages; ++pg)
    {
        // One symbol per known page state; any other state is streamed as-is
        if (_page_cb_arr[pg]._state == UNUSED)
            oss << "-";
        else if (_page_cb_arr[pg]._state == IN_USE)
            oss << "U";
        else if (_page_cb_arr[pg]._state == AIO_PENDING)
            oss << "A";
        else if (_page_cb_arr[pg]._state == AIO_COMPLETE)
            oss << "*";
        else
            oss << _page_cb_arr[pg]._state;
    }
    oss << "] " << _wrfc.status_str();
    return oss.str();
}
// static
// Operation names used in exception messages; pre_write_check() indexes this array with
// its _op_type argument.
// NOTE(review): the order here presumably mirrors the _op_type enumerator order
// (enqueue, dequeue, abort, commit) - confirm against the declaration in wmgr.h.
const char* wmgr::_op_str[] = {"enqueue", "dequeue", "abort", "commit"};
} // namespace journal
} // namespace mrg