| /* |
| * The MIT License (MIT) |
| * |
| * Copyright (c) 2015 Microsoft Corporation |
| * |
| * -=- Robust Distributed System Nucleus (rDSN) -=- |
| * |
| * Permission is hereby granted, free of charge, to any person obtaining a copy |
| * of this software and associated documentation files (the "Software"), to deal |
| * in the Software without restriction, including without limitation the rights |
| * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| * copies of the Software, and to permit persons to whom the Software is |
| * furnished to do so, subject to the following conditions: |
| * |
| * The above copyright notice and this permission notice shall be included in |
| * all copies or substantial portions of the Software. |
| * |
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| * THE SOFTWARE. |
| */ |
| |
| #include "log_file.h" |
| |
#include <errno.h>
#include <inttypes.h>
#include <limits>
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <string.h>
#include <utility>
#include <vector>
| |
| #include "aio/file_io.h" |
| #include "log_file_stream.h" |
| #include "replica/log_block.h" |
| #include "replica/mutation.h" |
| #include "utils/binary_reader.h" |
| #include "utils/binary_writer.h" |
| #include "utils/blob.h" |
| #include "utils/crc.h" |
| #include "utils/env.h" |
| #include "utils/filesystem.h" |
| #include "utils/fmt_logging.h" |
| #include "utils/latency_tracer.h" |
| #include "utils/ports.h" |
| #include "utils/strings.h" |
| |
| namespace dsn { |
| class disk_file; |
| class task_tracker; |
| |
| namespace replication { |
| |
| log_file::~log_file() { close(); } |
| /*static */ log_file_ptr log_file::open_read(const char *path, /*out*/ error_code &err) |
| { |
| char splitters[] = {'\\', '/', 0}; |
| std::string name = utils::get_last_component(std::string(path), splitters); |
| |
| // log.index.start_offset |
| if (name.length() < strlen("log.") || name.substr(0, strlen("log.")) != std::string("log.")) { |
| err = ERR_INVALID_PARAMETERS; |
| LOG_WARNING("invalid log path {}", path); |
| return nullptr; |
| } |
| |
| auto pos = name.find_first_of('.'); |
| CHECK(pos != std::string::npos, "invalid log_file, name = {}", name); |
| auto pos2 = name.find_first_of('.', pos + 1); |
| if (pos2 == std::string::npos) { |
| err = ERR_INVALID_PARAMETERS; |
| LOG_WARNING("invalid log path {}", path); |
| return nullptr; |
| } |
| |
| /* so the log file format is log.index_str.start_offset_str */ |
| std::string index_str = name.substr(pos + 1, pos2 - pos - 1); |
| std::string start_offset_str = name.substr(pos2 + 1); |
| if (index_str.empty() || start_offset_str.empty()) { |
| err = ERR_INVALID_PARAMETERS; |
| LOG_WARNING("invalid log path {}", path); |
| return nullptr; |
| } |
| |
| char *p = nullptr; |
| int index = static_cast<int>(strtol(index_str.c_str(), &p, 10)); |
| if (*p != 0) { |
| err = ERR_INVALID_PARAMETERS; |
| LOG_WARNING("invalid log path {}", path); |
| return nullptr; |
| } |
| int64_t start_offset = static_cast<int64_t>(strtoll(start_offset_str.c_str(), &p, 10)); |
| if (*p != 0) { |
| err = ERR_INVALID_PARAMETERS; |
| LOG_WARNING("invalid log path {}", path); |
| return nullptr; |
| } |
| |
| disk_file *hfile = file::open(path, file::FileOpenType::kReadOnly); |
| if (!hfile) { |
| err = ERR_FILE_OPERATION_FAILED; |
| LOG_WARNING("open log file {} failed", path); |
| return nullptr; |
| } |
| |
| auto lf = new log_file(path, hfile, index, start_offset, true); |
| lf->reset_stream(); |
| blob hdr_blob; |
| err = lf->read_next_log_block(hdr_blob); |
| if (err == ERR_INVALID_DATA || err == ERR_INCOMPLETE_DATA || err == ERR_HANDLE_EOF || |
| err == ERR_FILE_OPERATION_FAILED) { |
| std::string removed = std::string(path) + ".removed"; |
| LOG_ERROR("read first log entry of file {} failed, err = {}. Rename the file to {}", |
| path, |
| err, |
| removed); |
| delete lf; |
| lf = nullptr; |
| |
| // rename file on failure |
| dsn::utils::filesystem::rename_path(path, removed); |
| |
| return nullptr; |
| } |
| |
| binary_reader reader(std::move(hdr_blob)); |
| lf->read_file_header(reader); |
| if (!lf->is_right_header()) { |
| std::string removed = std::string(path) + ".removed"; |
| LOG_ERROR("invalid log file header of file {}. Rename the file to {}", path, removed); |
| delete lf; |
| lf = nullptr; |
| |
| // rename file on failure |
| dsn::utils::filesystem::rename_path(path, removed); |
| |
| err = ERR_INVALID_DATA; |
| return nullptr; |
| } |
| |
| err = ERR_OK; |
| return lf; |
| } |
| |
| /*static*/ log_file_ptr log_file::create_write(const char *dir, int index, int64_t start_offset) |
| { |
| char path[512]; |
| sprintf(path, "%s/log.%d.%" PRId64, dir, index, start_offset); |
| |
| if (dsn::utils::filesystem::path_exists(std::string(path))) { |
| LOG_WARNING("log file {} already exist", path); |
| return nullptr; |
| } |
| |
| disk_file *hfile = file::open(path, file::FileOpenType::kWriteOnly); |
| if (!hfile) { |
| LOG_WARNING("create log {} failed", path); |
| return nullptr; |
| } |
| |
| return new log_file(path, hfile, index, start_offset, false); |
| } |
| |
| log_file::log_file( |
| const char *path, disk_file *handle, int index, int64_t start_offset, bool is_read) |
| : _crc32(0), |
| _start_offset(start_offset), |
| _end_offset(start_offset), |
| _handle(handle), |
| _is_read(is_read), |
| _path(path), |
| _index(index), |
| _last_write_time(0) |
| { |
| memset(&_header, 0, sizeof(_header)); |
| |
| if (is_read) { |
| int64_t sz; |
| CHECK(dsn::utils::filesystem::file_size(_path, dsn::utils::FileDataType::kSensitive, sz), |
| "fail to get file size of {}.", |
| _path); |
| _end_offset += sz; |
| } |
| } |
| |
| void log_file::close() |
| { |
| zauto_lock lock(_write_lock); |
| |
| //_stream implicitly refer to _handle so it needs to be cleaned up first. |
| // TODO: We need better abstraction to avoid those manual stuffs.. |
| _stream.reset(nullptr); |
| if (_handle) { |
| error_code err = file::close(_handle); |
| CHECK_EQ_MSG(err, ERR_OK, "file::close failed"); |
| |
| _handle = nullptr; |
| } |
| } |
| |
| void log_file::flush() const |
| { |
| CHECK(!_is_read, "log file must be of write mode"); |
| zauto_lock lock(_write_lock); |
| |
| if (_handle) { |
| error_code err = file::flush(_handle); |
| CHECK_EQ_MSG(err, ERR_OK, "file::flush failed"); |
| } |
| } |
| |
| error_code log_file::read_next_log_block(/*out*/ ::dsn::blob &bb) |
| { |
| CHECK(_is_read, "log file must be of read mode"); |
| auto err = _stream->read_next(sizeof(log_block_header), bb); |
| if (err != ERR_OK || bb.length() != sizeof(log_block_header)) { |
| if (err == ERR_OK || err == ERR_HANDLE_EOF) { |
| // if read_count is 0, then we meet the end of file |
| err = (bb.length() == 0 ? ERR_HANDLE_EOF : ERR_INCOMPLETE_DATA); |
| } else { |
| LOG_ERROR("read data block header failed, size = {} vs {}, err = {}", |
| bb.length(), |
| sizeof(log_block_header), |
| err); |
| } |
| |
| return err; |
| } |
| log_block_header hdr = *reinterpret_cast<const log_block_header *>(bb.data()); |
| |
| if (hdr.magic != 0xdeadbeef) { |
| LOG_ERROR("invalid data header magic: {:#x}", static_cast<uint32_t>(hdr.magic)); |
| return ERR_INVALID_DATA; |
| } |
| |
| err = _stream->read_next(hdr.length, bb); |
| if (err != ERR_OK || hdr.length != bb.length()) { |
| LOG_ERROR( |
| "read data block body failed, size = {} vs {}, err = {}", bb.length(), hdr.length, err); |
| |
| if (err == ERR_OK || err == ERR_HANDLE_EOF) { |
| // because already read log_block_header above, so here must be imcomplete data |
| err = ERR_INCOMPLETE_DATA; |
| } |
| |
| return err; |
| } |
| |
| auto crc = dsn::utils::crc32_calc( |
| static_cast<const void *>(bb.data()), static_cast<size_t>(hdr.length), _crc32); |
| if (crc != hdr.body_crc) { |
| LOG_ERROR("crc checking failed"); |
| return ERR_INVALID_DATA; |
| } |
| _crc32 = crc; |
| |
| return ERR_OK; |
| } |
| |
| aio_task_ptr log_file::commit_log_block(log_block &block, |
| int64_t offset, |
| dsn::task_code evt, |
| dsn::task_tracker *tracker, |
| aio_handler &&callback, |
| int hash) |
| { |
| log_appender pending(offset, block); |
| return commit_log_blocks(pending, evt, tracker, std::move(callback), hash); |
| } |
| |
| aio_task_ptr log_file::commit_log_blocks(log_appender &pending, |
| dsn::task_code evt, |
| dsn::task_tracker *tracker, |
| aio_handler &&callback, |
| int hash) |
| { |
| CHECK(!_is_read, "log file must be of write mode"); |
| CHECK_GT(pending.size(), 0); |
| |
| zauto_lock lock(_write_lock); |
| if (!_handle) { |
| return nullptr; |
| } |
| |
| auto size = (long long)pending.size(); |
| size_t vec_size = pending.blob_count(); |
| std::vector<dsn_file_buffer_t> buffer_vector(vec_size); |
| int buffer_idx = 0; |
| for (log_block &block : pending.all_blocks()) { |
| int64_t local_offset = block.start_offset() - start_offset(); |
| auto hdr = reinterpret_cast<log_block_header *>(const_cast<char *>(block.front().data())); |
| |
| CHECK_EQ(hdr->magic, 0xdeadbeef); |
| hdr->local_offset = local_offset; |
| hdr->length = static_cast<int32_t>(block.size() - sizeof(log_block_header)); |
| hdr->body_crc = _crc32; |
| |
| for (int i = 0; i < block.data().size(); i++) { |
| auto &blk = block.data()[i]; |
| buffer_vector[buffer_idx].buffer = static_cast<void *>(const_cast<char *>(blk.data())); |
| buffer_vector[buffer_idx].size = blk.length(); |
| |
| // skip block header |
| if (i > 0) { |
| hdr->body_crc = dsn::utils::crc32_calc(static_cast<const void *>(blk.data()), |
| static_cast<size_t>(blk.length()), |
| hdr->body_crc); |
| } |
| buffer_idx++; |
| } |
| _crc32 = hdr->body_crc; |
| } |
| |
| aio_task_ptr tsk; |
| int64_t local_offset = pending.start_offset() - start_offset(); |
| if (callback) { |
| tsk = file::write_vector(_handle, |
| buffer_vector.data(), |
| vec_size, |
| static_cast<uint64_t>(local_offset), |
| evt, |
| tracker, |
| std::forward<aio_handler>(callback), |
| hash); |
| } else { |
| tsk = file::write_vector(_handle, |
| buffer_vector.data(), |
| vec_size, |
| static_cast<uint64_t>(local_offset), |
| evt, |
| tracker, |
| nullptr, |
| hash); |
| } |
| |
| if (dsn_unlikely(utils::FLAGS_enable_latency_tracer)) { |
| tsk->_tracer->set_parent_point_name("commit_pending_mutations"); |
| tsk->_tracer->set_description("log"); |
| for (const auto &mutation : pending.mutations()) { |
| mutation->_tracer->add_sub_tracer(tsk->_tracer); |
| } |
| } |
| |
| _end_offset.fetch_add(size); |
| return tsk; |
| } |
| |
| void log_file::reset_stream(size_t offset /*default = 0*/) |
| { |
| if (_stream == nullptr) { |
| _stream.reset(new file_streamer(_handle, offset)); |
| } else { |
| _stream->reset(offset); |
| } |
| if (offset == 0) { |
| _crc32 = 0; |
| } |
| } |
| |
| decree log_file::previous_log_max_decree(const dsn::gpid &pid) const |
| { |
| auto it = _previous_log_max_decrees.find(pid); |
| return it == _previous_log_max_decrees.end() ? 0 : it->second.max_decree; |
| } |
| |
| int log_file::read_file_header(binary_reader &reader) |
| { |
| /* |
| * the log file header structure: |
| * log_file_header + |
| * count + count * (gpid + replica_log_info) |
| */ |
| reader.read_pod(_header); |
| |
| int count = 0; |
| reader.read(count); |
| for (int i = 0; i < count; i++) { |
| gpid gpid; |
| replica_log_info info; |
| |
| reader.read_pod(gpid); |
| reader.read_pod(info); |
| |
| _previous_log_max_decrees[gpid] = info; |
| } |
| |
| return get_file_header_size(); |
| } |
| |
| int log_file::get_file_header_size() const |
| { |
| int count = static_cast<int>(_previous_log_max_decrees.size()); |
| return static_cast<int>(sizeof(log_file_header) + sizeof(count) + |
| (sizeof(gpid) + sizeof(replica_log_info)) * count); |
| } |
| |
| bool log_file::is_right_header() const |
| { |
| return _header.magic == 0xdeadbeef && _header.start_global_offset == _start_offset; |
| } |
| |
| int log_file::write_file_header(binary_writer &writer, const replica_log_info_map &init_max_decrees) |
| { |
| /* |
| * the log file header structure: |
| * log_file_header + |
| * count + count * (gpid + replica_log_info) |
| */ |
| _previous_log_max_decrees = init_max_decrees; |
| |
| _header.magic = 0xdeadbeef; |
| _header.version = 0x1; |
| _header.start_global_offset = start_offset(); |
| |
| writer.write_pod(_header); |
| |
| int count = static_cast<int>(_previous_log_max_decrees.size()); |
| writer.write(count); |
| for (auto &kv : _previous_log_max_decrees) { |
| writer.write_pod(kv.first); |
| writer.write_pod(kv.second); |
| } |
| |
| return get_file_header_size(); |
| } |
| |
| } // namespace replication |
| } // namespace dsn |