blob: 2a79bf8735d2a27c856331d1cbe905b6a7b44d79 [file]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "EmptyFilePool.h"
#include <fstream>
#include "qpid/linearstore/journal/EmptyFilePoolPartition.h"
#include "qpid/linearstore/journal/jdir.h"
#include "qpid/linearstore/journal/JournalLog.h"
#include "qpid/linearstore/journal/slock.h"
#include "qpid/linearstore/journal/utils/file_hdr.h"
#include "qpid/types/Uuid.h"
#include <sys/stat.h>
#include <unistd.h>
#include <vector>
namespace qpid {
namespace linearstore {
namespace journal {
// Buffer sizes in bytes. The expansions are fully parenthesized so that the macros keep their value
// when they appear inside a larger expression (for example as the operand of '/' or '%').
#define FHDR_BUFF_SIZE ((QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB) * 1024)
#define ZERO_BUFF_SIZE (QLS_SBLK_SIZE_KIB * 1024)

// Static declarations

// Names of the sub-directories (relative to the EFP directory) used for files that are in use and
// files that have been returned but not yet placed back into the pool.
std::string EmptyFilePool::s_inuseFileDirectory_ = "in_use";
std::string EmptyFilePool::s_returnedFileDirectory_ = "returned";

// Buffer used to build and write a file header; access is guarded by s_fhdr_buff_mutex_.
size_t EmptyFilePool::s_fhdr_buff_size_ = FHDR_BUFF_SIZE;
unsigned char EmptyFilePool::s_fhdr_buff_[FHDR_BUFF_SIZE];
smutex EmptyFilePool::s_fhdr_buff_mutex_;

// Buffer of zeros (one sblk in size) used to fill the data area of a new file.
size_t EmptyFilePool::s_zero_buff_size_ = ZERO_BUFF_SIZE;
unsigned char EmptyFilePool::s_zero_buff_[ZERO_BUFF_SIZE];

// Initialized by calling initializeStaticBuffers(), which zero-fills both buffers above.
bool EmptyFilePool::s_static_initializer_flag_ = initializeStaticBuffers();
// Construct an empty file pool (EFP) for the directory efpDirectory.
//
// The pool's data size in KiB is parsed from the last path component of efpDirectory by
// dataSizeFromDirName_kib(), which throws jexception if that name is not of the form 'NNNk'.
// partitionPtr is dereferenced while building the initializer list, so it must not be null.
// The constructor body is empty; directory scanning is done by initialize().
EmptyFilePool::EmptyFilePool(const std::string& efpDirectory,
                             const EmptyFilePoolPartition* partitionPtr,
                             const bool overwriteBeforeReturnFlag,
                             const bool truncateFlag,
                             JournalLog& journalLogRef)
    : efpDirectory_(efpDirectory),
      efpDataSize_kib_(dataSizeFromDirName_kib(efpDirectory, partitionPtr->getPartitionNumber())),
      partitionPtr_(partitionPtr),
      overwriteBeforeReturnFlag_(overwriteBeforeReturnFlag),
      truncateFlag_(truncateFlag),
      journalLogRef_(journalLogRef)
{
}
// Destructor: nothing to release explicitly here; members are destroyed automatically.
EmptyFilePool::~EmptyFilePool()
{
}
// Prepare the pool for use.
//
// 1. Creates the EFP directory (an already existing directory is accepted).
// 2. Adds every entry of the directory whose name ends in ".jrnl" and is exactly 41 characters long
//    (presumably a 36-character UUID plus the 5-character extension) to the pool, provided
//    validateEmptyFile() accepts it.
// 3. Initializes the 'in_use' and 'returned' sub-directories via initializeSubDirectory().
//
// Throws jexception(JERR_EFP_MKDIR) if the directory cannot be created for a reason other than EEXIST.
void EmptyFilePool::initialize() {
    const int mkdirResult = ::mkdir(efpDirectory_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
    if (mkdirResult != 0 && errno != EEXIST) {
        std::ostringstream oss;
        oss << "directory=" << efpDirectory_ << " " << FORMAT_SYSERR(errno);
        throw jexception(jerrno::JERR_EFP_MKDIR, oss.str(), "EmptyFilePool", "initialize");
    }

    // Process empty files in main dir
    std::vector<std::string> entries;
    jdir::read_dir(efpDirectory_, entries, false, true, false, false);
    for (std::vector<std::string>::size_type idx = 0; idx < entries.size(); ++idx) {
        const std::string& entry = entries[idx];
        const size_t extPos = entry.rfind(".");
        if (extPos == std::string::npos) {
            continue; // no extension at all
        }
        if (entry.length() != 41 || entry.substr(extPos).compare(".jrnl") != 0) {
            continue; // not named like a journal file
        }
        const std::string candidate(efpDirectory_ + "/" + entry);
        if (validateEmptyFile(candidate)) {
            pushEmptyFile(candidate);
        }
    }

    // Create 'in_use' and 'returned' subdirs if they don't already exist
    // Return files to EFP in 'in_use' and 'returned' subdirs if they do exist
    initializeSubDirectory(efpDirectory_ + "/" + s_inuseFileDirectory_);
    initializeSubDirectory(efpDirectory_ + "/" + s_returnedFileDirectory_);
}
// Data size of the files in this pool, in KiB, as parsed from the EFP directory name.
// This does not include the reserved file header area (compare fileSize_kib()).
efpDataSize_kib_t EmptyFilePool::dataSize_kib() const {
    return this->efpDataSize_kib_;
}
// Total size of a file in this pool, in KiB: the reserved file header area
// (QLS_JRNL_FHDR_RES_SIZE_SBLKS sblks) plus the data size.
efpFileSize_kib_t EmptyFilePool::fileSize_kib() const {
    return (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB) + efpDataSize_kib_;
}
// Data size of the files in this pool expressed in sblks (data size in KiB divided by the sblk size in KiB).
efpDataSize_sblks_t EmptyFilePool::dataSize_sblks() const {
    return this->efpDataSize_kib_ / QLS_SBLK_SIZE_KIB;
}
// Total size of a file in this pool expressed in sblks: the reserved header sblks plus the data sblks.
efpFileSize_sblks_t EmptyFilePool::fileSize_sblks() const {
    return QLS_JRNL_FHDR_RES_SIZE_SBLKS + (efpDataSize_kib_ / QLS_SBLK_SIZE_KIB);
}
// Number of files currently held in the empty file list. The list is read under emptyFileListMutex_.
efpFileCount_t EmptyFilePool::numEmptyFiles() const {
    slock listLock(emptyFileListMutex_);
    const efpFileCount_t fileCount = efpFileCount_t(emptyFileList_.size());
    return fileCount;
}
// Cumulative data size, in KiB, of the files currently in the empty file list: the number of listed
// files multiplied by the per-file data size (the header area is not counted).
// The list is read under emptyFileListMutex_.
efpDataSize_kib_t EmptyFilePool::cumFileSize_kib() const {
    slock listLock(emptyFileListMutex_);
    const efpDataSize_kib_t fileCount = efpDataSize_kib_t(emptyFileList_.size());
    return fileCount * efpDataSize_kib_;
}
// Partition number of the partition this pool belongs to, as reported by the partition object.
efpPartitionNumber_t EmptyFilePool::getPartitionNumber() const {
    const efpPartitionNumber_t partitionNumber = partitionPtr_->getPartitionNumber();
    return partitionNumber;
}
// Pointer to the partition object that was supplied to the constructor.
const EmptyFilePoolPartition* EmptyFilePool::getPartition() const {
    return this->partitionPtr_;
}
// Identity of this pool: the pair (partition number, data size in KiB).
const efpIdentity_t EmptyFilePool::getIdentity() const {
    const efpIdentity_t identity(partitionPtr_->getPartitionNumber(), efpDataSize_kib_);
    return identity;
}
std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) {
std::string emptyFileName = popEmptyFile();
std::string newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
std::string symlinkName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
if (!moveFile(emptyFileName, newFileName)) {
// Try again with new UUID for file name
newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + "/" + getEfpFileName();
if (!moveFile(emptyFileName, newFileName)) {
//std::cerr << "*** DEBUG: pushEmptyFile " << emptyFileName << "from EmptyFilePool::takeEmptyFile()" << std::endl; // DEBUG
pushEmptyFile(emptyFileName); // Return empty file to pool
std::ostringstream oss;
oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "takeEmptyFile");
}
}
if (createSymLink(newFileName, symlinkName)) {
std::ostringstream oss;
oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\" symlink=\"" << symlinkName << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "takeEmptyFile");
}
return symlinkName;
}
// Return a file to the pool, given either the file itself or a symlink to it.
//
// A regular file is returned directly. For a symlink, the link is deleted and the file it pointed
// to is returned. Anything else results in jexception(JERR_EFP_BADFILETYPE).
// isFile()/isSymlink() throw jexception(JERR_EFP_LSTAT) if the name cannot be examined.
void EmptyFilePool::returnEmptyFileSymlink(const std::string& emptyFileSymlink) {
    if (isFile(emptyFileSymlink)) {
        returnEmptyFile(emptyFileSymlink);
        return;
    }
    if (isSymlink(emptyFileSymlink)) {
        const std::string linkTarget = deleteSymlink(emptyFileSymlink);
        returnEmptyFile(linkTarget);
        return;
    }
    std::ostringstream oss;
    oss << "File \"" << emptyFileSymlink << "\" is neither a file nor a symlink";
    throw jexception(jerrno::JERR_EFP_BADFILETYPE, oss.str(), "EmptyFilePool", "returnEmptyFileSymlink");
}
// static
// Build the EFP directory name for a data size: the size in KiB followed by 'k' (e.g. 2048 -> "2048k").
std::string EmptyFilePool::dirNameFromDataSize(const efpDataSize_kib_t efpDataSize_kib) {
    std::ostringstream sizeText;
    sizeText << efpDataSize_kib;
    return sizeText.str() + "k";
}
// static
// Parse the data size in KiB from an EFP directory name.
//
// Only the last path component of dirName is examined. It must have the form 'NNNk', where NNN is
// a decimal number that is non-zero and a multiple of QLS_SBLK_SIZE_KIB.
//
// dirName:         directory name or path whose final component encodes the size
// partitionNumber: used only in the error message
// Returns the parsed size in KiB.
// Throws jexception(JERR_EFP_BADEFPDIRNAME) if the name does not satisfy the rules above.
efpDataSize_kib_t EmptyFilePool::dataSizeFromDirName_kib(const std::string& dirName,
                                                         const efpPartitionNumber_t partitionNumber) {
    // If there is no '/', rfind() returns npos and npos + 1 wraps to 0, i.e. the whole string is used
    const std::string n(dirName.substr(dirName.rfind('/') + 1));

    // The last character must be 'k' and every character before it a decimal digit
    bool valid = !n.empty() && n[n.length() - 1] == 'k';
    for (std::string::size_type charNum = 0; valid && charNum + 1 < n.length(); ++charNum) {
        // Cast through unsigned char: passing a negative char value to isdigit() is undefined behaviour
        valid = ::isdigit(static_cast<unsigned char>(n[charNum])) != 0;
    }

    efpDataSize_kib_t s = ::atol(n.c_str());
    if (!valid || s == 0 || s % QLS_SBLK_SIZE_KIB != 0) {
        std::ostringstream oss;
        oss << "Partition: " << partitionNumber << "; EFP directory: \'" << dirName << "\'";
        throw jexception(jerrno::JERR_EFP_BADEFPDIRNAME, oss.str(), "EmptyFilePool", "dataSizeFromDirName_kib");
    }
    return s;
}
// --- protected functions ---
// Check the state of an output file stream and throw if it is not good.
//
// io_errno:     errno value to include in the error message
// ofs:          stream to check; it is closed (if open) before the exception is thrown
// jerrno:       journal error code for the exception
// fqFileName:   file name to include in the error message
// operation:    description of the operation that was attempted
// errorMessage: additional text for the error message
// className:    class name reported in the exception
// fnName:       function name reported in the exception
//
// Does nothing if ofs.good(). Otherwise throws jexception with the given code and a message that
// reports the stream's eof/fail/bad flags.
void EmptyFilePool::checkIosState(const int io_errno,
                                  std::ofstream& ofs,
                                  const uint32_t jerrno,
                                  const std::string& fqFileName,
                                  const std::string& operation,
                                  const std::string& errorMessage,
                                  const std::string& className,
                                  const std::string& fnName) {
    if (ofs.good()) {
        return;
    }
    // Capture the flags before closing: a failing close() sets failbit, which would otherwise be
    // reported as if it were part of the original failure.
    const bool eofFlag = ofs.eof();
    const bool failFlag = ofs.fail();
    const bool badFlag = ofs.bad();
    if (ofs.is_open()) {
        ofs.close();
    }
    std::ostringstream oss;
    oss << "IO failure: eofbit=" << (eofFlag?"T":"F") << " failbit=" << (failFlag?"T":"F") << " badbit="
        << (badFlag?"T":"F") << " file=" << fqFileName << FORMAT_SYSERR(io_errno) << ") operation="
        << operation << ": " << errorMessage;
    throw jexception(jerrno, oss.str(), className, fnName);
}
std::string EmptyFilePool::createEmptyFile() {
std::string efpfn = getEfpFileName();
overwriteFileContents(efpfn);
return efpfn;
}
std::string EmptyFilePool::getEfpFileName() {
qpid::types::Uuid uuid(true);
std::ostringstream oss;
oss << efpDirectory_ << "/" << uuid << QLS_JRNL_FILE_EXTENSION;
return oss.str();
}
// Initialize one of the EFP sub-directories ('in_use' or 'returned').
//
// If the directory does not exist it is created. If it exists and truncateFlag_ is set, every entry
// named like a journal file (ends in ".jrnl" and is 41 characters long) is returned to the pool via
// returnEmptyFile(); any other entry is left in place and a warning is logged. If truncateFlag_ is
// not set, an existing directory is left untouched.
void EmptyFilePool::initializeSubDirectory(const std::string& fqDirName) {
    if (!jdir::exists(fqDirName)) {
        jdir::create_dir(fqDirName);
        return;
    }
    if (!truncateFlag_) {
        return;
    }

    std::vector<std::string> dirList;
    jdir::read_dir(fqDirName, dirList, false, true, false, false);
    for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
        const size_t dotPos = i->rfind(".");
        // A name without any '.' must be rejected before substr() is called: substr(npos) throws
        // std::out_of_range, which would abort initialization instead of producing the warning.
        const bool isJournalFile = dotPos != std::string::npos &&
                                   i->length() == 41 &&
                                   i->substr(dotPos).compare(".jrnl") == 0;
        if (isJournalFile) {
            returnEmptyFile(fqDirName + "/" + (*i));
        } else {
            std::ostringstream oss;
            oss << "File \'" << *i << "\' was not a journal file and was not returned to EFP.";
            journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
        }
    }
}
// Write a complete empty journal file to fqFileName: a file header area of s_fhdr_buff_size_ bytes
// followed by dataSize_sblks() blocks of zeros, each s_zero_buff_size_ bytes long.
// The file is opened in "wb" mode, so an existing file is truncated and rewritten.
// NOTE(review): none of the fopen()/fwrite()/fclose() results are checked. If fopen() fails, pFile
// is NULL and is passed to fwrite()/fclose(); short writes go unnoticed. An error check that throws
// jexception is needed here - confirm which jerrno code is appropriate.
void EmptyFilePool::overwriteFileContents(const std::string& fqFileName) {
FILE* pFile;
pFile = ::fopen(fqFileName.c_str(), "wb");
{
// s_fhdr_buff_ is a static buffer, so it is filled and written while holding s_fhdr_buff_mutex_
slock l(s_fhdr_buff_mutex_);
// Initialize file header
::file_hdr_create((::file_hdr_t*)s_fhdr_buff_,
QLS_FILE_MAGIC,
QLS_JRNL_VERSION,
QLS_JRNL_FHDR_RES_SIZE_SBLKS,
partitionPtr_->getPartitionNumber(),
efpDataSize_kib_);
// Write file header
::fwrite((void*)s_fhdr_buff_, 1, s_fhdr_buff_size_, pFile);
}
// Fill rest of file with zeros (buffer is 1 sblk in size)
for (efpDataSize_sblks_t i = 0; i < dataSize_sblks(); ++i) {
::fwrite((void*)s_zero_buff_, 1, s_zero_buff_size_, pFile);
}
::fclose(pFile);
}
// Remove and return the name of the file at the front of the empty file list.
// If the list is empty, a new empty file is created instead and its name returned.
// The list is accessed under emptyFileListMutex_; file creation happens after the lock is released.
std::string EmptyFilePool::popEmptyFile() {
    {
        slock listLock(emptyFileListMutex_);
        if (!emptyFileList_.empty()) {
            const std::string pooledFileName = emptyFileList_.front();
            emptyFileList_.pop_front();
            return pooledFileName;
        }
    }
    // The list was empty: create a new file and return its name
    return createEmptyFile();
}
// Append a file name to the back of the empty file list, under emptyFileListMutex_.
// No check is made on the file itself.
void EmptyFilePool::pushEmptyFile(const std::string fqFileName) {
    slock listLock(emptyFileListMutex_);
    emptyFileList_.push_back(fqFileName);
}
// Return a used file to the pool.
//
// The file is first moved into the 'returned' sub-directory, its header is reset and, if
// overwriteBeforeReturnFlag_ is set, its whole contents are rewritten. It is then moved into the
// EFP directory and added to the empty file list. Whenever a move reports that the destination
// name already exists, the source file of that move is deleted instead.
void EmptyFilePool::returnEmptyFile(const std::string& emptyFileName) {
    const std::string baseName = emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
    const std::string returnedFileName = efpDirectory_ + "/" + s_returnedFileDirectory_ + baseName;
    const bool movedToReturned = moveFile(emptyFileName, returnedFileName);
    if (!movedToReturned) {
        ::unlink(emptyFileName.c_str());
    }

    // TODO: On a separate thread, process returned files by overwriting headers and, optionally, their contents and
    // returning them to the EFP directory
    resetEmptyFileHeader(returnedFileName);
    if (overwriteBeforeReturnFlag_) {
        overwriteFileContents(returnedFileName);
    }

    // The base name (with its leading '/') is the same in the 'returned' directory and the EFP directory
    const std::string sanitizedEmptyFileName = efpDirectory_ + baseName;
    if (moveFile(returnedFileName, sanitizedEmptyFileName)) {
        pushEmptyFile(sanitizedEmptyFileName);
    } else {
        ::unlink(returnedFileName.c_str());
    }
}
void EmptyFilePool::resetEmptyFileHeader(const std::string& fqFileName) {
std::fstream fs(fqFileName.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary);
if (fs.good()) {
const std::streamsize buffsize = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES;
char buff[buffsize];
fs.read((char*)buff, buffsize);
std::streampos bytesRead = fs.tellg();
if (std::streamoff(bytesRead) == buffsize) {
::file_hdr_reset((::file_hdr_t*)buff);
// set rest of buffer to 0
::memset(buff + sizeof(::file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(::file_hdr_t));
fs.seekp(0, std::fstream::beg);
fs.write(buff, buffsize);
std::streampos bytesWritten = fs.tellp();
if (std::streamoff(bytesWritten) != buffsize) {
//std::cerr << "*** ERROR: Unable to write file header of file \"" << fqFileName << "\": tried to write " << buffsize << " bytes; wrote " << bytesWritten << " bytes." << std::endl; // DEBUG
}
} else {
//std::cerr << "*** ERROR: Unable to read file header of file \"" << fqFileName << "\": tried to read " << sizeof(::file_hdr_t) << " bytes; read " << bytesRead << " bytes." << std::endl; // DEBUG
}
fs.close();
} else {
//std::cerr << "*** ERROR: Unable to open file \"" << fqFileName << "\" for reading" << std::endl; // DEBUG
}
}
// Check that emptyFileName is a usable empty file for this pool.
//
// The checks are, in order:
//  - the file size equals the reserved header area plus the pool's data size;
//  - the file can be opened for reading and writing;
//  - the reserved header area can be read;
//  - the header's magic, version, partition number and data size match this pool;
//  - the header is in the reset state; if it is not, it is reset and rewritten in place and a
//    warning is logged.
//
// Returns true if the file is usable (including after a successful header rewrite). Returns false,
// after logging an error, if any check fails or the header rewrite fails.
// Throws jexception(JERR_JDIR_STAT) if the file cannot be stat'ed.
bool EmptyFilePool::validateEmptyFile(const std::string& emptyFileName) const {
    std::ostringstream oss;
    struct stat s;
    if (::stat(emptyFileName.c_str(), &s))
    {
        oss << "stat: file=\"" << emptyFileName << "\"" << FORMAT_SYSERR(errno);
        throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "EmptyFilePool", "validateEmptyFile");
    }

    // Size matches pool. The header area is QLS_JRNL_FHDR_RES_SIZE_SBLKS sblks, which is what
    // overwriteFileContents() writes and what is read back below, not a fixed single sblk.
    efpDataSize_kib_t expectedSize = ((QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB) + efpDataSize_kib_) * 1024;
    if ((efpDataSize_kib_t)s.st_size != expectedSize) {
        oss << "ERROR: File " << emptyFileName << ": Incorrect size: Expected=" << expectedSize
            << "; actual=" << s.st_size;
        journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
        return false;
    }

    // Open file and read header
    std::fstream fs(emptyFileName.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary);
    if (!fs) {
        oss << "ERROR: File " << emptyFileName << ": Unable to open for reading";
        journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
        return false;
    }
    const std::streamsize buffsize = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES;
    char buff[buffsize];
    fs.read((char*)buff, buffsize);
    std::streampos bytesRead = fs.tellg();
    if (std::streamoff(bytesRead) != buffsize) {
        oss << "ERROR: Unable to read file header of file \"" << emptyFileName << "\": tried to read "
            << buffsize << " bytes; read " << bytesRead << " bytes";
        journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
        fs.close();
        return false;
    }

    // Check file header
    ::file_hdr_t* fhp = (::file_hdr_t*)buff;
    const bool jrnlMagicError = fhp->_rhdr._magic != QLS_FILE_MAGIC;
    const bool jrnlVersionError = fhp->_rhdr._version != QLS_JRNL_VERSION;
    const bool jrnlPartitionError = fhp->_efp_partition != partitionPtr_->getPartitionNumber();
    const bool jrnlFileSizeError = fhp->_data_size_kib != efpDataSize_kib_;
    if (jrnlMagicError || jrnlVersionError || jrnlPartitionError || jrnlFileSizeError)
    {
        // Each field name carries a trailing space so that several mismatches stay readable
        oss << "ERROR: File " << emptyFileName << ": Invalid file header - mismatched header fields: " <<
            (jrnlMagicError ? "magic " : "") <<
            (jrnlVersionError ? "version " : "") <<
            (jrnlPartitionError ? "partition " : "") <<
            (jrnlFileSizeError ? "file-size " : "");
        journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
        fs.close();
        return false;
    }

    // Check file header is reset
    if (!::is_file_hdr_reset(fhp)) {
        ::file_hdr_reset(fhp);
        ::memset(buff + sizeof(::file_hdr_t), 0, MAX_FILE_HDR_LEN - sizeof(::file_hdr_t)); // set rest of buffer to 0
        fs.seekp(0, std::fstream::beg);
        fs.write(buff, buffsize);
        std::streampos bytesWritten = fs.tellp();
        if (std::streamoff(bytesWritten) != buffsize) {
            oss << "ERROR: Unable to write file header of file \"" << emptyFileName << "\": tried to write "
                << buffsize << " bytes; wrote " << bytesWritten << " bytes";
            journalLogRef_.log(JournalLog::LOG_ERROR, oss.str());
            fs.close();
            return false;
        }
        oss << "WARNING: File " << emptyFileName << ": File header not reset";
        journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
    }

    // Close file
    fs.close();
    return true;
}
// static
// Create the symlink fqLinkName pointing at fqFileName.
// Returns 0 on success, or EEXIST if the link name already exists.
// Throws jexception(JERR__SYMLINK) for any other failure.
int EmptyFilePool::createSymLink(const std::string& fqFileName,
                                 const std::string& fqLinkName) {
    const int rc = ::symlink(fqFileName.c_str(), fqLinkName.c_str());
    if (rc == 0) {
        return 0;
    }
    if (errno == EEXIST) {
        return errno; // File name exists
    }
    std::ostringstream oss;
    oss << "file=\"" << fqFileName << "\" symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno);
    throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "createSymLink");
}
// static
// Delete the symlink fqLinkName and return the target it pointed to.
//
// The target is the text stored in the link as returned by readlink(); it is not resolved further.
// readlink() gives no indication of truncation other than filling the whole buffer, so the buffer
// is grown until the result is strictly shorter than it. This avoids silently returning a
// truncated path for targets of 1024 bytes or more.
//
// Throws jexception(JERR__SYMLINK) if the link cannot be read. The result of unlink() is not checked.
std::string EmptyFilePool::deleteSymlink(const std::string& fqLinkName) {
    std::vector<char> buff(1024);
    ssize_t len = 0;
    for (;;) {
        len = ::readlink(fqLinkName.c_str(), &buff[0], buff.size());
        if (len < 0) {
            std::ostringstream oss;
            oss << "symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno);
            throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "deleteSymlink");
        }
        if (static_cast<std::vector<char>::size_type>(len) < buff.size()) {
            break; // the whole target fitted
        }
        buff.resize(buff.size() * 2); // possibly truncated: retry with a larger buffer
    }
    ::unlink(fqLinkName.c_str());
    return std::string(&buff[0], len);
}
// static
// Returns true if fqName is a regular file. lstat() is used, so a symlink is examined itself and
// is not followed. Throws jexception(JERR_EFP_LSTAT) if lstat() fails.
bool EmptyFilePool::isFile(const std::string& fqName) {
    struct stat info;
    const int rc = ::lstat(fqName.c_str(), &info);
    if (rc != 0) {
        std::ostringstream oss;
        oss << "lstat file=\"" << fqName << "\"" << FORMAT_SYSERR(errno);
        throw jexception(jerrno::JERR_EFP_LSTAT, oss.str(), "EmptyFilePool", "isFile");
    }
    return S_ISREG(info.st_mode);
}
// static
// Returns true if fqName is a symbolic link. lstat() is used, so the link itself is examined
// rather than its target. Throws jexception(JERR_EFP_LSTAT) if lstat() fails.
bool EmptyFilePool::isSymlink(const std::string& fqName) {
    struct stat info;
    const int rc = ::lstat(fqName.c_str(), &info);
    if (rc != 0) {
        std::ostringstream oss;
        oss << "lstat file=\"" << fqName << "\"" << FORMAT_SYSERR(errno);
        throw jexception(jerrno::JERR_EFP_LSTAT, oss.str(), "EmptyFilePool", "isSymlink");
    }
    return S_ISLNK(info.st_mode);
}
// static
// Rename the file 'from' to 'to'.
// Returns true on success and false if rename() fails with EEXIST.
// Throws jexception(JERR_JDIR_FMOVE) for any other failure.
bool EmptyFilePool::moveFile(const std::string& from,
                             const std::string& to) {
    if (::rename(from.c_str(), to.c_str()) == 0) {
        return true;
    }
    if (errno == EEXIST) {
        return false; // File name exists
    }
    std::ostringstream oss;
    oss << "file=\"" << from << "\" dest=\"" << to << "\"" << FORMAT_SYSERR(errno);
    throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile");
}
// static
// Fill the static file header buffer and the static zero buffer with zeros.
// Always returns true; the return value is used to initialize s_static_initializer_flag_.
bool EmptyFilePool::initializeStaticBuffers() {
    ::memset(s_zero_buff_, 0, s_zero_buff_size_);
    ::memset(s_fhdr_buff_, 0, s_fhdr_buff_size_);
    return true;
}
}}}