| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "olap/file_helper.h" |
| |
| #include <stdio.h> |
| #include <sys/stat.h> |
| |
| #include <string> |
| #include <vector> |
| |
| #include <errno.h> |
| |
| #include "olap/olap_common.h" |
| #include "olap/olap_define.h" |
| #include "olap/utils.h" |
| #include "util/debug_util.h" |
| |
| using std::string; |
| |
| namespace doris { |
| |
| Cache* FileHandler::_s_fd_cache; |
| |
| FileHandler::FileHandler() : |
| _fd(-1), |
| _wr_length(0), |
| _file_name(""), |
| _is_using_cache(false), |
| _cache_handle(NULL) { |
| } |
| |
| FileHandler::~FileHandler() { |
| this->close(); |
| } |
| |
| OLAPStatus FileHandler::open(const string& file_name, int flag) { |
| if (_fd != -1 && _file_name == file_name) { |
| return OLAP_SUCCESS; |
| } |
| |
| if (OLAP_SUCCESS != this->close()) { |
| return OLAP_ERR_IO_ERROR; |
| } |
| |
| _fd = ::open(file_name.c_str(), flag); |
| |
| if (_fd < 0) { |
| char errmsg[64]; |
| LOG(WARNING) << "failed to open file. [err=" << strerror_r(errno, errmsg, 64) |
| << ", file_name='" << file_name << "' flag=" << flag << "]"; |
| if (errno == EEXIST) { |
| return OLAP_ERR_FILE_ALREADY_EXIST; |
| } |
| return OLAP_ERR_IO_ERROR; |
| } |
| |
| VLOG(3) << "success to open file. file_name=" << file_name |
| << ", mode=" << flag << " fd=" << _fd; |
| _is_using_cache = false; |
| _file_name = file_name; |
| return OLAP_SUCCESS; |
| } |
| |
| OLAPStatus FileHandler::open_with_cache(const string& file_name, int flag) { |
| if (_fd != -1 && _file_name == file_name) { |
| return OLAP_SUCCESS; |
| } |
| |
| if (OLAP_SUCCESS != this->close()) { |
| return OLAP_ERR_IO_ERROR; |
| } |
| |
| CacheKey key(file_name.c_str(), file_name.size()); |
| Cache* fd_cache = get_fd_cache(); |
| _cache_handle = fd_cache->lookup(key); |
| if (NULL != _cache_handle) { |
| FileDescriptor* file_desc = |
| reinterpret_cast<FileDescriptor*>(fd_cache->value(_cache_handle)); |
| _fd = file_desc->fd; |
| VLOG(3) << "success to open file with cache. file_name=" << file_name |
| << ", mode=" << flag << " fd=" << _fd; |
| } else { |
| _fd = ::open(file_name.c_str(), flag); |
| if (_fd < 0) { |
| char errmsg[64]; |
| LOG(WARNING) << "failed to open file. [err=" << strerror_r(errno, errmsg, 64) |
| << " file_name='" << file_name << "' flag=" << flag << "]"; |
| if (errno == EEXIST) { |
| return OLAP_ERR_FILE_ALREADY_EXIST; |
| } |
| return OLAP_ERR_IO_ERROR; |
| } |
| FileDescriptor* file_desc = new FileDescriptor(_fd); |
| _cache_handle = fd_cache->insert( |
| key, file_desc, 1, |
| &_delete_cache_file_descriptor); |
| VLOG(3) << "success to open file with cache. " |
| << "file_name=" << file_name |
| << ", mode=" << flag << ", fd=" << _fd; |
| } |
| _is_using_cache = true; |
| _file_name = file_name; |
| return OLAP_SUCCESS; |
| } |
| |
| OLAPStatus FileHandler::open_with_mode(const string& file_name, int flag, int mode) { |
| if (_fd != -1 && _file_name == file_name) { |
| return OLAP_SUCCESS; |
| } |
| |
| if (OLAP_SUCCESS != this->close()) { |
| return OLAP_ERR_IO_ERROR; |
| } |
| |
| _fd = ::open(file_name.c_str(), flag, mode); |
| |
| if (_fd < 0) { |
| char err_buf[64]; |
| LOG(WARNING) << "failed to open file. [err=" << strerror_r(errno, err_buf, 64) |
| << " file_name='" << file_name |
| << "' flag=" << flag |
| << " mode=" << mode << "]"; |
| if (errno == EEXIST) { |
| return OLAP_ERR_FILE_ALREADY_EXIST; |
| } |
| return OLAP_ERR_IO_ERROR; |
| } |
| |
| VLOG(3) << "success to open file. file_name=" << file_name |
| << ", mode=" << mode << ", fd=" << _fd; |
| _file_name = file_name; |
| return OLAP_SUCCESS; |
| } |
| |
| OLAPStatus FileHandler::release() { |
| get_fd_cache()->release(_cache_handle); |
| _cache_handle = NULL; |
| _is_using_cache = false; |
| return OLAP_SUCCESS; |
| } |
| |
| OLAPStatus FileHandler::close() { |
| if (_fd < 0) { |
| return OLAP_SUCCESS; |
| } |
| |
| if (_is_using_cache) { |
| release(); |
| } else { |
| // try to sync page cache if have written some bytes |
| if (_wr_length > 0) { |
| posix_fadvise(_fd, 0, 0, POSIX_FADV_DONTNEED); |
| // Clean dirty pages and wait for io queue empty and return |
| sync_file_range(_fd, 0, 0, SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER); |
| _wr_length = 0; |
| } |
| |
| // 在一些极端情况下(fd可用,但fsync失败)可能造成句柄泄漏 |
| if (::close(_fd) < 0) { |
| char errmsg[64]; |
| LOG(WARNING) << "failed to close file. [err= " << strerror_r(errno, errmsg, 64) |
| << " file_name='" << _file_name << "' fd=" << _fd << "]"; |
| return OLAP_ERR_IO_ERROR; |
| } |
| } |
| |
| VLOG(3) << "finished to close file. " |
| << "file_name=" << _file_name << ", fd=" << _fd; |
| _fd = -1; |
| _file_name = ""; |
| _wr_length = 0; |
| return OLAP_SUCCESS; |
| } |
| |
| OLAPStatus FileHandler::pread(void* buf, size_t size, size_t offset) { |
| char* ptr = reinterpret_cast<char*>(buf); |
| |
| while (size > 0) { |
| ssize_t rd_size = ::pread(_fd, ptr, size, offset); |
| |
| if (rd_size < 0) { |
| char errmsg[64]; |
| LOG(WARNING) << "failed to pread from file. [err= " << strerror_r(errno, errmsg, 64) |
| << " file_name='" << _file_name << "' fd=" << _fd << " size=" << size |
| << " offset=" << offset << "]"; |
| return OLAP_ERR_IO_ERROR; |
| } else if (0 == rd_size) { |
| char errmsg[64]; |
| LOG(WARNING) << "read unenough from file. [err= " << strerror_r(errno, errmsg, 64) |
| << " file_name='" << _file_name << "' fd=" << _fd << " size=" << size |
| << " offset=" << offset << "]"; |
| return OLAP_ERR_READ_UNENOUGH; |
| } |
| |
| size -= rd_size; |
| offset += rd_size; |
| ptr += rd_size; |
| } |
| |
| return OLAP_SUCCESS; |
| } |
| |
| OLAPStatus FileHandler::write(const void* buf, size_t buf_size) { |
| |
| size_t org_buf_size = buf_size; |
| const char* ptr = reinterpret_cast<const char*>(buf); |
| while (buf_size > 0) { |
| ssize_t wr_size = ::write(_fd, ptr, buf_size); |
| |
| if (wr_size < 0) { |
| char errmsg[64]; |
| LOG(WARNING) << "failed to write to file. [err= " << strerror_r(errno, errmsg, 64) |
| << " file_name='" << _file_name << "' fd=" << _fd |
| << " size=" << buf_size << "]"; |
| return OLAP_ERR_IO_ERROR; |
| } else if (0 == wr_size) { |
| char errmsg[64]; |
| LOG(WARNING) << "write unenough to file. [err=" << strerror_r(errno, errmsg, 64) |
| << " file_name='" << _file_name << "' fd=" << _fd |
| << " size=" << buf_size << "]"; |
| return OLAP_ERR_IO_ERROR; |
| } |
| |
| buf_size -= wr_size; |
| ptr += wr_size; |
| } |
| _wr_length += org_buf_size; |
| // try to sync page cache if cache size is bigger than threshold |
| if (_wr_length >= _cache_threshold) { |
| posix_fadvise(_fd, 0, 0, POSIX_FADV_DONTNEED); |
| // Clean dirty pages and wait for io queue empty and return |
| sync_file_range(_fd, 0, 0, SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER); |
| _wr_length = 0; |
| } |
| return OLAP_SUCCESS; |
| } |
| |
| OLAPStatus FileHandler::pwrite(const void* buf, size_t buf_size, size_t offset) { |
| const char* ptr = reinterpret_cast<const char*>(buf); |
| |
| size_t org_buf_size = buf_size; |
| while (buf_size > 0) { |
| ssize_t wr_size = ::pwrite(_fd, ptr, buf_size, offset); |
| |
| if (wr_size < 0) { |
| char errmsg[64]; |
| LOG(WARNING) << "failed to pwrite to file. [err= " << strerror_r(errno, errmsg, 64) |
| << " file_name='" << _file_name << "' fd=" << _fd << " size=" << buf_size |
| << " offset=" << offset << "]"; |
| return OLAP_ERR_IO_ERROR; |
| } else if (0 == wr_size) { |
| char errmsg[64]; |
| LOG(WARNING) << "pwrite unenough to file. [err= " << strerror_r(errno, errmsg, 64) |
| << " file_name='" << _file_name << "' fd=" << _fd << " size=" << buf_size |
| << " offset=" << offset << "]"; |
| return OLAP_ERR_IO_ERROR; |
| } |
| |
| buf_size -= wr_size; |
| ptr += wr_size; |
| offset += wr_size; |
| } |
| _wr_length += org_buf_size; |
| |
| return OLAP_SUCCESS; |
| } |
| |
| off_t FileHandler::length() const { |
| struct stat stat_data; |
| |
| if (fstat(_fd, &stat_data) < 0) { |
| OLAP_LOG_WARNING("fstat error. [fd=%d]", _fd); |
| return -1; |
| } |
| |
| return stat_data.st_size; |
| } |
| |
| FileHandlerWithBuf::FileHandlerWithBuf() : |
| _fp(NULL), |
| _file_name("") { |
| } |
| |
| FileHandlerWithBuf::~FileHandlerWithBuf() { |
| this->close(); |
| } |
| |
| OLAPStatus FileHandlerWithBuf::open(const string& file_name, const char* mode) { |
| if (_fp != NULL && _file_name == file_name) { |
| return OLAP_SUCCESS; |
| } |
| |
| if (OLAP_SUCCESS != this->close()) { |
| return OLAP_ERR_IO_ERROR; |
| } |
| |
| _fp = ::fopen(file_name.c_str(), mode); |
| |
| if (NULL == _fp) { |
| char errmsg[64]; |
| LOG(WARNING) << "failed to open file. [err= " << strerror_r(errno, errmsg, 64) |
| << " file_name='" << file_name << "' flag='" << mode << "']"; |
| if (errno == EEXIST) { |
| return OLAP_ERR_FILE_ALREADY_EXIST; |
| } |
| return OLAP_ERR_IO_ERROR; |
| } |
| |
| VLOG(3) << "success to open file. " |
| << "file_name=" << file_name << ", mode=" << mode; |
| _file_name = file_name; |
| return OLAP_SUCCESS; |
| } |
| |
| OLAPStatus FileHandlerWithBuf::open_with_mode(const string& file_name, const char* mode) { |
| return this->open(file_name, mode); |
| } |
| |
| OLAPStatus FileHandlerWithBuf::close() { |
| if (NULL == _fp) { |
| return OLAP_SUCCESS; |
| } |
| |
| // 在一些极端情况下(fd可用,但fsync失败)可能造成句柄泄漏 |
| if (::fclose(_fp) != 0) { |
| char errmsg[64]; |
| LOG(WARNING) << "failed to close file. [err= " << strerror_r(errno, errmsg, 64) |
| << " file_name='" << _file_name << "']"; |
| return OLAP_ERR_IO_ERROR; |
| } |
| |
| _fp = NULL; |
| _file_name = ""; |
| return OLAP_SUCCESS; |
| } |
| |
| OLAPStatus FileHandlerWithBuf::read(void* buf, size_t size) { |
| if (OLAP_UNLIKELY(NULL == _fp)) { |
| OLAP_LOG_WARNING("Fail to write, fp is NULL!"); |
| return OLAP_ERR_NOT_INITED; |
| } |
| |
| size_t rd_size = ::fread(buf, 1, size, _fp); |
| |
| if (rd_size == size) { |
| return OLAP_SUCCESS; |
| } else if (::feof(_fp)) { |
| char errmsg[64]; |
| LOG(WARNING) << "read unenough from file. [err=" << strerror_r(errno, errmsg, 64) |
| << " file_name='" << _file_name << "' size=" << size |
| << " rd_size=" << rd_size << "]"; |
| return OLAP_ERR_READ_UNENOUGH; |
| } else { |
| char errmsg[64]; |
| LOG(WARNING) << "failed to read from file. [err=" << strerror_r(errno, errmsg, 64) |
| << " file_name='" << _file_name << "' size=" << size << "]"; |
| return OLAP_ERR_IO_ERROR; |
| } |
| } |
| |
| OLAPStatus FileHandlerWithBuf::pread(void* buf, size_t size, size_t offset) { |
| if (OLAP_UNLIKELY(NULL == _fp)) { |
| OLAP_LOG_WARNING("Fail to write, fp is NULL!"); |
| return OLAP_ERR_NOT_INITED; |
| } |
| |
| if (0 != ::fseek(_fp, offset, SEEK_SET)) { |
| char errmsg[64]; |
| LOG(WARNING) << "failed to seek file. [err= " << strerror_r(errno, errmsg, 64) |
| << " file_name='" << _file_name << "' size=" << size |
| << " offset=" << offset << "]"; |
| return OLAP_ERR_IO_ERROR; |
| } |
| |
| return this->read(buf, size); |
| } |
| |
| OLAPStatus FileHandlerWithBuf::write(const void* buf, size_t buf_size) { |
| if (OLAP_UNLIKELY(NULL == _fp)) { |
| OLAP_LOG_WARNING("Fail to write, fp is NULL!"); |
| return OLAP_ERR_NOT_INITED; |
| } |
| |
| size_t wr_size = ::fwrite(buf, 1, buf_size, _fp); |
| |
| if (wr_size != buf_size) { |
| char errmsg[64]; |
| LOG(WARNING) << "failed to write to file. [err= " << strerror_r(errno, errmsg, 64) |
| << " file_name='" << _file_name << "' size=" << buf_size << "]"; |
| return OLAP_ERR_IO_ERROR; |
| } |
| |
| return OLAP_SUCCESS; |
| } |
| |
| OLAPStatus FileHandlerWithBuf::pwrite(const void* buf, size_t buf_size, size_t offset) { |
| if (OLAP_UNLIKELY(NULL == _fp)) { |
| OLAP_LOG_WARNING("Fail to write, fp is NULL!"); |
| return OLAP_ERR_NOT_INITED; |
| } |
| |
| if (0 != ::fseek(_fp, offset, SEEK_SET)) { |
| char errmsg[64]; |
| LOG(WARNING) << "failed to seek file. [err= " << strerror_r(errno, errmsg, 64) |
| << " file_name='" << _file_name << "' size=" << buf_size |
| << " offset=" << offset << "]"; |
| return OLAP_ERR_IO_ERROR; |
| } |
| |
| return this->write(buf, buf_size); |
| } |
| |
| off_t FileHandlerWithBuf::length() const { |
| struct stat stat_data; |
| |
| if (stat(_file_name.c_str(), &stat_data) < 0) { |
| LOG(WARNING) << "fstat error. [file_name='" << _file_name << "']"; |
| return -1; |
| } |
| |
| return stat_data.st_size; |
| } |
| |
| } // namespace doris |