blob: fbb176667ef4add77a8f25be07286ca45a6d82f9 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
/**
* \file fcntl.cpp
*
* Qpid asynchronous store plugin library
*
* File containing code for class mrg::journal::fcntl (non-logging file
* handle), used for controlling journal log files. See comments in file
* fcntl.h for details.
*/
#include "qpid/legacystore/jrnl/fcntl.h"
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <fcntl.h>
#include <iomanip>
#include "qpid/legacystore/jrnl/jerrno.h"
#include "qpid/legacystore/jrnl/jexception.h"
#include <sstream>
#include <unistd.h>
namespace mrg
{
namespace journal
{
fcntl::fcntl(const std::string& fbasename, const u_int16_t pfid, const u_int16_t lfid, const u_int32_t jfsize_sblks,
const rcvdat* const ro):
_fname(),
_pfid(pfid),
_lfid(lfid),
_ffull_dblks(JRNL_SBLK_SIZE * (jfsize_sblks + 1)),
_wr_fh(-1),
_rec_enqcnt(0),
_rd_subm_cnt_dblks(0),
_rd_cmpl_cnt_dblks(0),
_wr_subm_cnt_dblks(0),
_wr_cmpl_cnt_dblks(0),
_aio_cnt(0),
_fhdr_wr_aio_outstanding(false)
{
initialize(fbasename, pfid, lfid, jfsize_sblks, ro);
open_wr_fh();
}
fcntl::~fcntl()
{
close_wr_fh();
}
bool
fcntl::reset(const rcvdat* const ro)
{
rd_reset();
return wr_reset(ro);
}
void
fcntl::rd_reset()
{
_rd_subm_cnt_dblks = 0;
_rd_cmpl_cnt_dblks = 0;
}
bool
fcntl::wr_reset(const rcvdat* const ro)
{
if (ro)
{
if (!ro->_jempty)
{
if (ro->_lfid == _pfid)
{
_wr_subm_cnt_dblks = ro->_eo/JRNL_DBLK_SIZE;
_wr_cmpl_cnt_dblks = ro->_eo/JRNL_DBLK_SIZE;
}
else
{
_wr_subm_cnt_dblks = _ffull_dblks;
_wr_cmpl_cnt_dblks = _ffull_dblks;
}
_rec_enqcnt = ro->_enq_cnt_list[_pfid];
return true;
}
}
// Journal overflow test - checks if the file to be reset still contains enqueued records
// or outstanding aios
if (_rec_enqcnt || _aio_cnt)
return false;
_wr_subm_cnt_dblks = 0;
_wr_cmpl_cnt_dblks = 0;
return true;
}
int
fcntl::open_wr_fh()
{
if (_wr_fh < 0)
{
_wr_fh = ::open(_fname.c_str(), O_WRONLY | O_DIRECT, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r--
if (_wr_fh < 0)
{
std::ostringstream oss;
oss << "pfid=" << _pfid << " lfid=" << _lfid << " file=\"" << _fname << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_FCNTL_OPENWR, oss.str(), "fcntl", "open_fh");
}
}
return _wr_fh;
}
void
fcntl::close_wr_fh()
{
if (_wr_fh >= 0)
{
::close(_wr_fh);
_wr_fh = -1;
}
}
u_int32_t
fcntl::add_enqcnt(u_int32_t a)
{
_rec_enqcnt += a;
return _rec_enqcnt;
}
u_int32_t
fcntl::decr_enqcnt()
{
if (_rec_enqcnt == 0)
{
std::ostringstream oss;
oss << "pfid=" << _pfid << " lfid=" << _lfid;
throw jexception(jerrno::JERR__UNDERFLOW, oss.str(), "fcntl", "decr_enqcnt");
}
return --_rec_enqcnt;
}
u_int32_t
fcntl::subtr_enqcnt(u_int32_t s)
{
if (_rec_enqcnt < s)
{
std::ostringstream oss;
oss << "pfid=" << _pfid << " lfid=" << _lfid << " rec_enqcnt=" << _rec_enqcnt << " decr=" << s;
throw jexception(jerrno::JERR__UNDERFLOW, oss.str(), "fcntl", "subtr_enqcnt");
}
_rec_enqcnt -= s;
return _rec_enqcnt;
}
u_int32_t
fcntl::add_rd_subm_cnt_dblks(u_int32_t a)
{
if (_rd_subm_cnt_dblks + a > _wr_subm_cnt_dblks)
{
std::ostringstream oss;
oss << "pfid=" << _pfid << " lfid=" << _lfid << " rd_subm_cnt_dblks=" << _rd_subm_cnt_dblks << " incr=" << a;
oss << " wr_subm_cnt_dblks=" << _wr_subm_cnt_dblks;
throw jexception(jerrno::JERR_FCNTL_RDOFFSOVFL, oss.str(), "fcntl", "add_rd_subm_cnt_dblks");
}
_rd_subm_cnt_dblks += a;
return _rd_subm_cnt_dblks;
}
u_int32_t
fcntl::add_rd_cmpl_cnt_dblks(u_int32_t a)
{
if (_rd_cmpl_cnt_dblks + a > _rd_subm_cnt_dblks)
{
std::ostringstream oss;
oss << "pfid=" << _pfid << " lfid=" << _lfid << " rd_cmpl_cnt_dblks=" << _rd_cmpl_cnt_dblks << " incr=" << a;
oss << " rd_subm_cnt_dblks=" << _rd_subm_cnt_dblks;
throw jexception(jerrno::JERR_FCNTL_CMPLOFFSOVFL, oss.str(), "fcntl", "add_rd_cmpl_cnt_dblks");
}
_rd_cmpl_cnt_dblks += a;
return _rd_cmpl_cnt_dblks;
}
u_int32_t
fcntl::add_wr_subm_cnt_dblks(u_int32_t a)
{
if (_wr_subm_cnt_dblks + a > _ffull_dblks) // Allow for file header
{
std::ostringstream oss;
oss << "pfid=" << _pfid << " lfid=" << _lfid << " wr_subm_cnt_dblks=" << _wr_subm_cnt_dblks << " incr=" << a;
oss << " fsize=" << _ffull_dblks << " dblks";
throw jexception(jerrno::JERR_FCNTL_FILEOFFSOVFL, oss.str(), "fcntl", "add_wr_subm_cnt_dblks");
}
_wr_subm_cnt_dblks += a;
return _wr_subm_cnt_dblks;
}
u_int32_t
fcntl::add_wr_cmpl_cnt_dblks(u_int32_t a)
{
if (_wr_cmpl_cnt_dblks + a > _wr_subm_cnt_dblks)
{
std::ostringstream oss;
oss << "pfid=" << _pfid << " lfid=" << _lfid << " wr_cmpl_cnt_dblks=" << _wr_cmpl_cnt_dblks << " incr=" << a;
oss << " wr_subm_cnt_dblks=" << _wr_subm_cnt_dblks;
throw jexception(jerrno::JERR_FCNTL_CMPLOFFSOVFL, oss.str(), "fcntl", "add_wr_cmpl_cnt_dblks");
}
_wr_cmpl_cnt_dblks += a;
return _wr_cmpl_cnt_dblks;
}
u_int16_t
fcntl::decr_aio_cnt()
{
if(_aio_cnt == 0)
{
std::ostringstream oss;
oss << "pfid=" << _pfid << " lfid=" << _lfid << " Decremented aio_cnt to below zero";
throw jexception(jerrno::JERR__UNDERFLOW, oss.str(), "fcntl", "decr_aio_cnt");
}
return --_aio_cnt;
}
// Debug function
const std::string
fcntl::status_str() const
{
std::ostringstream oss;
oss << "pfid=" << _pfid << " ws=" << _wr_subm_cnt_dblks << " wc=" << _wr_cmpl_cnt_dblks;
oss << " rs=" << _rd_subm_cnt_dblks << " rc=" << _rd_cmpl_cnt_dblks;
oss << " ec=" << _rec_enqcnt << " ac=" << _aio_cnt;
return oss.str();
}
// Protected functions
void
fcntl::initialize(const std::string& fbasename, const u_int16_t pfid, const u_int16_t lfid, const u_int32_t jfsize_sblks,
const rcvdat* const ro)
{
_pfid = pfid;
_lfid = lfid;
_fname = filename(fbasename, pfid);
#ifdef RHM_JOWRITE
// In test mode, only create file if it does not exist
struct stat s;
if (::stat(_fname.c_str(), &s))
{
#endif
if (ro) // Recovery initialization: set counters only
{
if (!ro->_jempty)
{
// For last file only, set write counters to end of last record (the
// continuation point); for all others, set to eof.
if (ro->_lfid == _pfid)
{
_wr_subm_cnt_dblks = ro->_eo/JRNL_DBLK_SIZE;
_wr_cmpl_cnt_dblks = ro->_eo/JRNL_DBLK_SIZE;
}
else
{
_wr_subm_cnt_dblks = _ffull_dblks;
_wr_cmpl_cnt_dblks = _ffull_dblks;
}
// Set the number of enqueued records for this file.
_rec_enqcnt = ro->_enq_cnt_list[_pfid];
}
}
else // Normal initialization: create empty journal files
create_jfile(jfsize_sblks);
#ifdef RHM_JOWRITE
}
#endif
}
std::string
fcntl::filename(const std::string& fbasename, const u_int16_t pfid)
{
std::ostringstream oss;
oss << fbasename << ".";
oss << std::setw(4) << std::setfill('0') << std::hex << pfid;
oss << "." << JRNL_DATA_EXTENSION;
return oss.str();
}
void
fcntl::clean_file(const u_int32_t jfsize_sblks)
{
// NOTE: The journal file size is always one sblock bigger than the specified journal
// file size, which is the data content size. The extra block is for the journal file
// header which precedes all data on each file and is exactly one sblock in size.
u_int32_t nsblks = jfsize_sblks + 1;
// TODO - look at more efficient alternatives to allocating a null block:
// 1. mmap() against /dev/zero, but can alignment for O_DIRECT be assured?
// 2. ftruncate(), but does this result in a sparse file? If so, then this is no good.
// Create temp null block for writing
const std::size_t sblksize = JRNL_DBLK_SIZE * JRNL_SBLK_SIZE;
void* nullbuf = 0;
// Allocate no more than 2MB (4096 sblks) as a null buffer
const u_int32_t nullbuffsize_sblks = nsblks > 4096 ? 4096 : nsblks;
const std::size_t nullbuffsize = nullbuffsize_sblks * sblksize;
if (::posix_memalign(&nullbuf, sblksize, nullbuffsize))
{
std::ostringstream oss;
oss << "posix_memalign() failed: size=" << nullbuffsize << " blk_size=" << sblksize;
oss << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR__MALLOC, oss.str(), "fcntl", "clean_file");
}
std::memset(nullbuf, 0, nullbuffsize);
int fh = ::open(_fname.c_str(), O_WRONLY | O_CREAT | O_DIRECT,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); // 0644 -rw-r--r--
if (fh < 0)
{
std::free(nullbuf);
std::ostringstream oss;
oss << "open() failed:" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_FCNTL_OPENWR, oss.str(), "fcntl", "clean_file");
}
while (nsblks > 0)
{
u_int32_t this_write_sblks = nsblks >= nullbuffsize_sblks ? nullbuffsize_sblks : nsblks;
if (::write(fh, nullbuf, this_write_sblks * sblksize) == -1)
{
::close(fh);
std::free(nullbuf);
std::ostringstream oss;
oss << "wr_size=" << (this_write_sblks * sblksize) << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_FCNTL_WRITE, oss.str(), "fcntl", "clean_file");
}
nsblks -= this_write_sblks;
}
// Clean up
std::free(nullbuf);
if (::close(fh))
{
std::ostringstream oss;
oss << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_FCNTL_CLOSE, oss.str(), "fcntl", "clean_file");
}
}
void
fcntl::create_jfile(const u_int32_t jfsize_sblks)
{
clean_file(jfsize_sblks);
}
} // namespace journal
} // namespace mrg