/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "mutation_log.h"
#include <fmt/core.h>
#include <algorithm>
#include <cstdint>
#include <ctime>
#include <iterator>
#include <list>
#include <unordered_map>
#include "aio/aio_task.h"
#include "aio/file_io.h"
#include "common/replication.codes.h"
#include "consensus_types.h"
#include "mutation_log_utils.h"
#include "perf_counter/perf_counter.h"
#include "perf_counter/perf_counter_wrapper.h"
#include "replica.h"
#include "replica/log_block.h"
#include "replica/log_file.h"
#include "replica/mutation.h"
#include "runtime/api_layer1.h"
#include "utils/binary_writer.h"
#include "utils/blob.h"
#include "utils/defer.h"
#include "utils/filesystem.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/latency_tracer.h"
#include "utils/ports.h"
namespace dsn {
namespace replication {
DSN_DEFINE_bool(replication,
plog_force_flush,
false,
"when write private log, whether to flush file after write done");
::dsn::task_ptr mutation_log_shared::append(mutation_ptr &mu,
dsn::task_code callback_code,
dsn::task_tracker *tracker,
aio_handler &&callback,
int hash,
int64_t *pending_size)
{
auto d = mu->data.header.decree;
::dsn::aio_task_ptr cb =
callback ? file::create_aio_task(
callback_code, tracker, std::forward<aio_handler>(callback), hash)
: nullptr;
_slock.lock();
ADD_POINT(mu->_tracer);
// init pending buffer
if (nullptr == _pending_write) {
_pending_write = std::make_shared<log_appender>(mark_new_offset(0, true).second);
}
_pending_write->append_mutation(mu, cb);
// update meta
update_max_decree(mu->data.header.pid, d);
// start to write if possible
if (!_is_writing.load(std::memory_order_acquire)) {
write_pending_mutations(true);
if (pending_size) {
*pending_size = 0;
}
} else {
if (pending_size) {
*pending_size = _pending_write->size();
}
_slock.unlock();
}
return cb;
}
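// A minimal usage sketch for append() above (hypothetical caller code; the
// callback body and the choice of hash below are illustrative assumptions, not
// the replica's actual call site):
//
//   int64_t pending_size = 0;
//   shared_log->append(mu,
//                      LPC_WRITE_REPLICATION_LOG_SHARED,
//                      &tracker,
//                      [](error_code err, size_t size) {
//                          // handle write completion, e.g. ack the mutation
//                      },
//                      pid.thread_hash(),
//                      &pending_size);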
void mutation_log_shared::flush() { flush_internal(-1); }
void mutation_log_shared::flush_once() { flush_internal(1); }
void mutation_log_shared::flush_internal(int max_count)
{
int count = 0;
while (max_count <= 0 || count < max_count) {
if (_is_writing.load(std::memory_order_acquire)) {
_tracker.wait_outstanding_tasks();
} else {
_slock.lock();
if (_is_writing.load(std::memory_order_acquire)) {
_slock.unlock();
continue;
}
if (!_pending_write) {
// !_is_writing && !_pending_write means the flush is done
_slock.unlock();
break;
}
// !_is_writing && _pending_write, start next write
write_pending_mutations(true);
count++;
}
}
}
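// flush_internal(-1) above loops until no write is in flight and no pending
// buffer remains; flush_internal(1) performs at most one more write round.
//
// Preconditions of write_pending_mutations() below (enforced by its CHECKs):
// the caller holds _slock, no write is in flight, and _pending_write is
// non-empty. The function marks _is_writing, releases _slock, and hands the
// buffered block off to commit_pending_mutations(), so the lock is never held
// across disk I/O.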
void mutation_log_shared::write_pending_mutations(bool release_lock_required)
{
CHECK(release_lock_required, "lock must be held at this point");
CHECK(!_is_writing.load(std::memory_order_relaxed), "");
CHECK_NOTNULL(_pending_write, "");
CHECK_GT(_pending_write->size(), 0);
auto pr = mark_new_offset(_pending_write->size(), false);
CHECK_EQ(pr.second, _pending_write->start_offset());
_is_writing.store(true, std::memory_order_release);
// move or reset pending variables
auto pending = std::move(_pending_write);
// run commit_log_blocks outside of the lock
_slock.unlock();
commit_pending_mutations(pr.first, pending);
}
void mutation_log_shared::commit_pending_mutations(log_file_ptr &lf,
std::shared_ptr<log_appender> &pending)
{
if (utils::FLAGS_enable_latency_tracer) {
for (auto &mu : pending->mutations()) {
ADD_POINT(mu->_tracer);
}
}
lf->commit_log_blocks( // forces a new line for params
*pending,
LPC_WRITE_REPLICATION_LOG_SHARED,
&_tracker,
[this, lf, pending](error_code err, size_t sz) mutable {
CHECK(_is_writing.load(std::memory_order_relaxed), "");
if (utils::FLAGS_enable_latency_tracer) {
for (auto &mu : pending->mutations()) {
ADD_CUSTOM_POINT(mu->_tracer, "commit_pending_completed");
}
}
for (auto &block : pending->all_blocks()) {
auto hdr = (log_block_header *)block.front().data();
CHECK_EQ(hdr->magic, 0xdeadbeef);
}
if (err == ERR_OK) {
CHECK_EQ(sz, pending->size());
if (_force_flush) {
// flush to ensure that shared log data is synced to disk
//
// FIXME: the file could have been closed
lf->flush();
}
if (_write_size_counter) {
(*_write_size_counter)->add(sz);
}
} else {
LOG_ERROR("write shared log failed, err = {}", err);
}
// here we use _is_writing instead of _issued_write.expired() to check whether
// the write is done, because the following callbacks may run before "block" is
// released, which may cause the next init_prepare() not to start the write.
_is_writing.store(false, std::memory_order_relaxed);
// notify the callbacks
// ATTENTION: the callbacks may run before this code block finishes executing.
for (auto &c : pending->callbacks()) {
c->enqueue(err, sz);
}
// start to write next if possible
if (err == ERR_OK) {
_slock.lock();
if (!_is_writing.load(std::memory_order_acquire) && _pending_write) {
write_pending_mutations(true);
} else {
_slock.unlock();
}
}
},
0);
}
////////////////////////////////////////////////////
mutation_log_private::mutation_log_private(const std::string &dir,
int32_t max_log_file_mb,
gpid gpid,
replica *r)
: mutation_log(dir, max_log_file_mb, gpid, r), replica_base(r)
{
mutation_log_private::init_states();
}
::dsn::task_ptr mutation_log_private::append(mutation_ptr &mu,
dsn::task_code callback_code,
dsn::task_tracker *tracker,
aio_handler &&callback,
int hash,
int64_t *pending_size)
{
dsn::aio_task_ptr cb =
callback ? file::create_aio_task(
callback_code, tracker, std::forward<aio_handler>(callback), hash)
: nullptr;
_plock.lock();
ADD_POINT(mu->_tracer);
// init pending buffer
if (nullptr == _pending_write) {
_pending_write = std::make_unique<log_appender>(mark_new_offset(0, true).second);
}
_pending_write->append_mutation(mu, cb);
// update meta
_pending_write_max_commit =
std::max(_pending_write_max_commit, mu->data.header.last_committed_decree);
_pending_write_max_decree = std::max(_pending_write_max_decree, mu->data.header.decree);
// start to write if possible
if (!_is_writing.load(std::memory_order_acquire)) {
write_pending_mutations(true);
if (pending_size) {
*pending_size = 0;
}
} else {
if (pending_size) {
*pending_size = _pending_write->size();
}
_plock.unlock();
}
return cb;
}
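// get_learn_state_in_memory() below serializes all in-memory mutations with
// decree >= start_decree into `writer`, covering both the block currently being
// written (_issued_write) and the block still being buffered (_pending_write).
// Note the pattern: both containers are snapshotted under _plock, then
// serialized outside of it.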
bool mutation_log_private::get_learn_state_in_memory(decree start_decree,
binary_writer &writer) const
{
std::shared_ptr<log_appender> issued_write;
mutations pending_mutations;
{
zauto_lock l(_plock);
issued_write = _issued_write.lock();
if (_pending_write) {
pending_mutations = _pending_write->mutations();
}
}
int learned_count = 0;
if (issued_write) {
for (auto &mu : issued_write->mutations()) {
if (mu->get_decree() >= start_decree) {
mu->write_to(writer, nullptr);
learned_count++;
}
}
}
for (auto &mu : pending_mutations) {
if (mu->get_decree() >= start_decree) {
mu->write_to(writer, nullptr);
learned_count++;
}
}
return learned_count > 0;
}
void mutation_log_private::get_in_memory_mutations(decree start_decree,
ballot start_ballot,
std::vector<mutation_ptr> &mutation_list) const
{
std::shared_ptr<log_appender> issued_write;
mutations pending_mutations;
{
zauto_lock l(_plock);
issued_write = _issued_write.lock();
if (_pending_write) {
pending_mutations = _pending_write->mutations();
}
}
if (issued_write) {
for (auto &mu : issued_write->mutations()) {
// if start_ballot is invalid or equal to mu.ballot, check decree
// otherwise check ballot
ballot current_ballot =
(start_ballot == invalid_ballot) ? invalid_ballot : mu->get_ballot();
if ((mu->get_decree() >= start_decree && start_ballot == current_ballot) ||
current_ballot > start_ballot) {
mutation_list.push_back(mutation::copy_no_reply(mu));
}
}
}
for (auto &mu : pending_mutations) {
// if start_ballot is invalid or equal to mu.ballot, check decree
// otherwise check ballot
ballot current_ballot =
(start_ballot == invalid_ballot) ? invalid_ballot : mu->get_ballot();
if ((mu->get_decree() >= start_decree && start_ballot == current_ballot) ||
current_ballot > start_ballot) {
mutation_list.push_back(mutation::copy_no_reply(mu));
}
}
}
void mutation_log_private::flush() { flush_internal(-1); }
void mutation_log_private::flush_once() { flush_internal(1); }
void mutation_log_private::flush_internal(int max_count)
{
int count = 0;
while (max_count <= 0 || count < max_count) {
if (_is_writing.load(std::memory_order_acquire)) {
_tracker.wait_outstanding_tasks();
} else {
_plock.lock();
if (_is_writing.load(std::memory_order_acquire)) {
_plock.unlock();
continue;
}
if (!_pending_write) {
// !_is_writing && !_pending_write means the flush is done
_plock.unlock();
break;
}
// !_is_writing && _pending_write, start next write
write_pending_mutations(true);
count++;
}
}
}
void mutation_log_private::init_states()
{
mutation_log::init_states();
_is_writing.store(false, std::memory_order_release);
_issued_write.reset();
_pending_write = nullptr;
_pending_write_max_commit = 0;
_pending_write_max_decree = 0;
}
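// Unlike the shared-log variant, the private-log write_pending_mutations()
// below also publishes the block through _issued_write (so learners can read
// in-flight mutations), records the per-write max decree and max commit, and
// resets those counters before releasing _plock.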
void mutation_log_private::write_pending_mutations(bool release_lock_required)
{
CHECK(release_lock_required, "lock must be held at this point");
CHECK(!_is_writing.load(std::memory_order_relaxed), "");
CHECK_NOTNULL(_pending_write, "");
CHECK_GT(_pending_write->size(), 0);
auto pr = mark_new_offset(_pending_write->size(), false);
CHECK_EQ_PREFIX(pr.second, _pending_write->start_offset());
_is_writing.store(true, std::memory_order_release);
update_max_decree(_private_gpid, _pending_write_max_decree);
// move or reset pending variables
std::shared_ptr<log_appender> pending = std::move(_pending_write);
_issued_write = pending;
decree max_commit = _pending_write_max_commit;
_pending_write_max_commit = 0;
_pending_write_max_decree = 0;
// Release the lock while the log block is being committed, so that new
// mutations can still be appended in the meantime.
_plock.unlock();
commit_pending_mutations(pr.first, pending, max_commit);
}
void mutation_log_private::commit_pending_mutations(log_file_ptr &lf,
std::shared_ptr<log_appender> &pending,
decree max_commit)
{
if (dsn_unlikely(utils::FLAGS_enable_latency_tracer)) {
for (const auto &mu : pending->mutations()) {
ADD_POINT(mu->_tracer);
}
}
lf->commit_log_blocks(*pending,
LPC_WRITE_REPLICATION_LOG_PRIVATE,
&_tracker,
[this, lf, pending, max_commit](error_code err, size_t sz) mutable {
CHECK(_is_writing.load(std::memory_order_relaxed), "");
for (auto &block : pending->all_blocks()) {
auto hdr = (log_block_header *)block.front().data();
CHECK_EQ(hdr->magic, 0xdeadbeef);
}
if (dsn_unlikely(utils::FLAGS_enable_latency_tracer)) {
for (const auto &mu : pending->mutations()) {
ADD_CUSTOM_POINT(mu->_tracer, "commit_pending_completed");
}
}
// notify the callbacks
// ATTENTION: the callbacks may run before this code block finishes
// executing.
for (auto &c : pending->callbacks()) {
c->enqueue(err, sz);
}
if (err != ERR_OK) {
LOG_ERROR("write private log failed, err = {}", err);
_is_writing.store(false, std::memory_order_relaxed);
if (_io_error_callback) {
_io_error_callback(err);
}
return;
}
CHECK_EQ(sz, pending->size());
// flush to ensure that there is no gap between the private log and the
// in-memory buffer, so that all mutations can be fetched during the
// learning process.
//
// FIXME: the file could have been closed
if (FLAGS_plog_force_flush) {
lf->flush();
}
// update _private_max_commit_on_disk after the write to the log file is done
update_max_commit_on_disk(max_commit);
_is_writing.store(false, std::memory_order_relaxed);
// start to write if possible
_plock.lock();
if (!_is_writing.load(std::memory_order_acquire) && _pending_write) {
write_pending_mutations(true);
} else {
_plock.unlock();
}
},
get_gpid().thread_hash());
}
///////////////////////////////////////////////////////////////
mutation_log::mutation_log(const std::string &dir, int32_t max_log_file_mb, gpid gpid, replica *r)
{
_dir = dir;
_is_private = (gpid.value() != 0);
_max_log_file_size_in_bytes = static_cast<int64_t>(max_log_file_mb) * 1024L * 1024L;
_min_log_file_size_in_bytes = _max_log_file_size_in_bytes / 10;
_owner_replica = r;
_private_gpid = gpid;
if (r) {
CHECK_EQ(_private_gpid, r->get_gpid());
}
mutation_log::init_states();
}
void mutation_log::init_states()
{
_is_opened = false;
_switch_file_hint = false;
_switch_file_demand = false;
// logs
_last_file_index = 0;
_log_files.clear();
_current_log_file = nullptr;
_global_start_offset = 0;
_global_end_offset = 0;
// replica states
_shared_log_info_map.clear();
_private_log_info = {0, 0};
_private_max_commit_on_disk = 0;
}
error_code mutation_log::open(replay_callback read_callback,
io_failure_callback write_error_callback)
{
std::map<gpid, decree> replay_condition;
return open(read_callback, write_error_callback, replay_condition);
}
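// `replay_condition` maps each replica to a decree up to which mutations need
// not be replayed; log files whose previous_log_max_decree(s) do not exceed
// that decree can be skipped entirely (see the "filter out useless logs" step
// below).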
error_code mutation_log::open(replay_callback read_callback,
io_failure_callback write_error_callback,
const std::map<gpid, decree> &replay_condition)
{
CHECK(!_is_opened, "cannot open an opened mutation_log");
CHECK_NULL(_current_log_file, "");
// create dir if necessary
if (!dsn::utils::filesystem::path_exists(_dir)) {
if (!dsn::utils::filesystem::create_directory(_dir)) {
LOG_ERROR("open mutation_log: create log path failed");
return ERR_FILE_OPERATION_FAILED;
}
}
// load the existing logs
_log_files.clear();
_io_error_callback = write_error_callback;
std::vector<std::string> file_list;
if (!dsn::utils::filesystem::get_subfiles(_dir, file_list, false)) {
LOG_ERROR("open mutation_log: get subfiles failed.");
return ERR_FILE_OPERATION_FAILED;
}
if (nullptr == read_callback) {
CHECK(file_list.empty(), "");
}
std::sort(file_list.begin(), file_list.end());
error_code err = ERR_OK;
for (auto &fpath : file_list) {
log_file_ptr log = log_file::open_read(fpath.c_str(), err);
if (log == nullptr) {
if (err == ERR_HANDLE_EOF || err == ERR_INCOMPLETE_DATA ||
err == ERR_INVALID_PARAMETERS) {
LOG_WARNING("skip file {} during log init, err = {}", fpath, err);
continue;
}
return err;
}
if (_is_private) {
LOG_INFO("open private log {} succeed, start_offset = {}, end_offset = {}, size = "
"{}, previous_max_decree = {}",
fpath,
log->start_offset(),
log->end_offset(),
log->end_offset() - log->start_offset(),
log->previous_log_max_decree(_private_gpid));
} else {
LOG_INFO("open shared log {} succeed, start_offset = {}, end_offset = {}, size = {}",
fpath,
log->start_offset(),
log->end_offset(),
log->end_offset() - log->start_offset());
}
CHECK(_log_files.find(log->index()) == _log_files.end(),
"invalid log_index, index = {}",
log->index());
_log_files[log->index()] = log;
}
file_list.clear();
// filter out useless logs
log_file_map_by_index::iterator replay_begin = _log_files.begin();
log_file_map_by_index::iterator replay_end = _log_files.end();
if (!replay_condition.empty()) {
if (_is_private) {
auto find = replay_condition.find(_private_gpid);
CHECK(find != replay_condition.end(), "invalid gpid({})", _private_gpid);
for (auto it = _log_files.begin(); it != _log_files.end(); ++it) {
if (it->second->previous_log_max_decree(_private_gpid) <= find->second) {
// previous logs can be ignored
replay_begin = it;
} else {
break;
}
}
} else {
// find the file with the largest index that can be ignored.
// after iterating, 'mark_it' points to that file.
log_file_map_by_index::reverse_iterator mark_it;
std::set<gpid> kickout_replicas;
replica_log_info_map max_decrees; // max_decrees for log file at mark_it.
for (mark_it = _log_files.rbegin(); mark_it != _log_files.rend(); ++mark_it) {
bool ignore_this = true;
if (mark_it == _log_files.rbegin()) {
// the last file should not be ignored
ignore_this = false;
}
if (ignore_this) {
for (auto &kv : replay_condition) {
if (kickout_replicas.find(kv.first) != kickout_replicas.end()) {
// no need to consider this replica
continue;
}
auto find = max_decrees.find(kv.first);
if (find == max_decrees.end() || find->second.max_decree <= kv.second) {
// can ignore for this replica
kickout_replicas.insert(kv.first);
} else {
ignore_this = false;
break;
}
}
}
if (ignore_this) {
// found the largest file which can be ignored
break;
}
// update max_decrees for the next log file
max_decrees = mark_it->second->previous_log_max_decrees();
}
if (mark_it != _log_files.rend()) {
// set replay_begin to the position after mark_it.
replay_begin = _log_files.find(mark_it->first);
CHECK(replay_begin != _log_files.end(),
"invalid log_index, index = {}",
mark_it->first);
replay_begin++;
CHECK(replay_begin != _log_files.end(),
"invalid log_index, index = {}",
mark_it->first);
}
}
for (auto it = _log_files.begin(); it != replay_begin; it++) {
LOG_INFO("ignore log {}", it->second->path());
}
}
// replay with the found files
log_file_map_by_index replay_logs(replay_begin, replay_end);
int64_t end_offset = 0;
err = replay(
replay_logs,
[this, read_callback](int log_length, mutation_ptr &mu) {
bool ret = true;
if (read_callback) {
ret = read_callback(log_length,
mu); // actually replica::replay_mutation(mu, true|false);
}
if (ret) {
this->update_max_decree_no_lock(mu->data.header.pid, mu->data.header.decree);
if (this->_is_private) {
this->update_max_commit_on_disk_no_lock(mu->data.header.last_committed_decree);
}
}
return ret;
},
end_offset);
if (ERR_OK == err) {
_global_start_offset =
_log_files.size() > 0 ? _log_files.begin()->second->start_offset() : 0;
_global_end_offset = end_offset;
_last_file_index = _log_files.size() > 0 ? _log_files.rbegin()->first : 0;
_is_opened = true;
} else {
// clear
for (auto &kv : _log_files) {
kv.second->close();
}
_log_files.clear();
init_states();
}
return err;
}
void mutation_log::close()
{
{
zauto_lock l(_lock);
if (!_is_opened) {
return;
}
_is_opened = false;
}
LOG_DEBUG("close mutation log {}", dir());
// make sure all data is on disk
flush();
{
zauto_lock l(_lock);
// close current log file
if (nullptr != _current_log_file) {
_current_log_file->close();
_current_log_file = nullptr;
}
}
// reset all states
init_states();
}
error_code mutation_log::create_new_log_file()
{
// create file
uint64_t start = dsn_now_ns();
log_file_ptr logf =
log_file::create_write(_dir.c_str(), _last_file_index + 1, _global_end_offset);
if (logf == nullptr) {
LOG_ERROR("cannot create log file with index {}", _last_file_index + 1);
return ERR_FILE_OPERATION_FAILED;
}
CHECK_EQ(logf->end_offset(), logf->start_offset());
CHECK_EQ(_global_end_offset, logf->end_offset());
LOG_INFO(
"create new log file {} succeeded, time_used = {} ns", logf->path(), dsn_now_ns() - start);
// update states
_last_file_index++;
CHECK(_log_files.find(_last_file_index) == _log_files.end(),
"invalid log_index, index = {}",
_last_file_index);
_log_files[_last_file_index] = logf;
// switch the current log file
// the old log file may be held by _log_files or an aio_task
_current_log_file = logf;
// create a new pending buffer and write the file header into it
size_t header_len = 0;
binary_writer temp_writer;
if (_is_private) {
replica_log_info_map ds;
ds[_private_gpid] =
replica_log_info(_private_log_info.max_decree, _private_log_info.valid_start_offset);
header_len = logf->write_file_header(temp_writer, ds);
} else {
header_len = logf->write_file_header(temp_writer, _shared_log_info_map);
}
log_block *blk = new log_block();
blk->add(temp_writer.get_buffer());
_global_end_offset += blk->size();
logf->commit_log_block(*blk,
_current_log_file->start_offset(),
LPC_WRITE_REPLICATION_LOG_COMMON,
&_tracker,
[this, blk, logf](::dsn::error_code err, size_t sz) {
delete blk;
if (ERR_OK != err) {
LOG_ERROR(
"write mutation log file header failed, file = {}, err = {}",
logf->path(),
err);
CHECK(_io_error_callback, "");
_io_error_callback(err);
}
},
0);
CHECK_EQ_MSG(_global_end_offset,
_current_log_file->start_offset() + sizeof(log_block_header) + header_len,
"current log file start offset: {}, log_block_header size: {}, header_len: {}",
_current_log_file->start_offset(),
sizeof(log_block_header),
header_len);
return ERR_OK;
}
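// mark_new_offset() below reserves `size` bytes at the current global end
// offset and returns the target log file together with the write start offset.
// When `create_new_log_if_needed` is set, a new file is created on demand
// (_switch_file_demand), on reaching _max_log_file_size_in_bytes, or on a
// switch hint once _min_log_file_size_in_bytes is exceeded.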
std::pair<log_file_ptr, int64_t> mutation_log::mark_new_offset(size_t size,
bool create_new_log_if_needed)
{
zauto_lock l(_lock);
if (create_new_log_if_needed) {
bool create_file = false;
if (_current_log_file == nullptr) {
create_file = true;
} else {
int64_t file_size = _global_end_offset - _current_log_file->start_offset();
const char *reason = nullptr;
if (_switch_file_demand) {
create_file = true;
reason = "demand";
} else if (file_size >= _max_log_file_size_in_bytes) {
create_file = true;
reason = "limit";
} else if (_switch_file_hint && file_size >= _min_log_file_size_in_bytes) {
create_file = true;
reason = "hint";
}
if (create_file) {
LOG_INFO("switch log file by {}, old_file = {}, size = {}",
reason,
_current_log_file->path(),
file_size);
}
}
if (create_file) {
auto ec = create_new_log_file();
CHECK_EQ_MSG(ec,
ERR_OK,
"{} create new log file failed",
_is_private ? _private_gpid.to_string() : "");
_switch_file_hint = false;
_switch_file_demand = false;
}
} else {
CHECK_NOTNULL(_current_log_file, "");
}
int64_t write_start_offset = _global_end_offset;
_global_end_offset += size;
return std::make_pair(_current_log_file, write_start_offset);
}
decree mutation_log::max_decree(gpid gpid) const
{
zauto_lock l(_lock);
if (_is_private) {
CHECK_EQ(gpid, _private_gpid);
return _private_log_info.max_decree;
} else {
const auto &it = _shared_log_info_map.find(gpid);
return it != _shared_log_info_map.end() ? it->second.max_decree : 0;
}
}
decree mutation_log::max_commit_on_disk() const
{
zauto_lock l(_lock);
CHECK(_is_private, "this method is only valid for private logs");
return _private_max_commit_on_disk;
}
decree mutation_log::max_gced_decree(gpid gpid) const
{
zauto_lock l(_lock);
return max_gced_decree_no_lock(gpid);
}
decree mutation_log::max_gced_decree_no_lock(gpid gpid) const
{
CHECK(_is_private, "");
decree result = invalid_decree;
for (auto &log : _log_files) {
auto it = log.second->previous_log_max_decrees().find(gpid);
if (it != log.second->previous_log_max_decrees().end()) {
if (result == invalid_decree) {
result = it->second.max_decree;
} else {
result = std::min(result, it->second.max_decree);
}
}
}
return result;
}
void mutation_log::check_valid_start_offset(gpid gpid, int64_t valid_start_offset) const
{
zauto_lock l(_lock);
if (_is_private) {
CHECK_EQ(valid_start_offset, _private_log_info.valid_start_offset);
} else {
auto it = _shared_log_info_map.find(gpid);
if (it != _shared_log_info_map.end()) {
CHECK_EQ(valid_start_offset, it->second.valid_start_offset);
}
}
}
int64_t mutation_log::total_size() const
{
zauto_lock l(_lock);
return total_size_no_lock();
}
int64_t mutation_log::total_size_no_lock() const
{
return _log_files.empty() ? 0 : _global_end_offset - _global_start_offset;
}
error_code mutation_log::reset_from(const std::string &dir,
replay_callback replay_error_callback,
io_failure_callback write_error_callback)
{
// Close for flushing current log and get ready to open new log files after reset.
close();
// Ensure that log files in `dir` (such as "/learn") are valid.
error_s es = log_utils::check_log_files_continuity(dir);
if (!es.is_ok()) {
LOG_ERROR("the log files of source dir {} are invalid: {}, will remove it", dir, es);
if (!utils::filesystem::remove_path(dir)) {
LOG_ERROR("remove source dir {} failed", dir);
return ERR_FILE_OPERATION_FAILED;
}
return es.code();
}
std::string temp_dir = fmt::format("{}.{}", _dir, dsn_now_ns());
if (!utils::filesystem::rename_path(_dir, temp_dir)) {
LOG_ERROR("rename current log dir {} to temp dir {} failed", _dir, temp_dir);
return ERR_FILE_OPERATION_FAILED;
}
LOG_INFO("rename current log dir {} to temp dir {}", _dir, temp_dir);
error_code err = ERR_OK;
// If successful, just remove temp dir; otherwise, rename temp dir back to current dir.
auto temp_dir_resolve = dsn::defer([this, temp_dir, &err]() {
if (err == ERR_OK) {
if (!dsn::utils::filesystem::remove_path(temp_dir)) {
// Removing temp dir failed is allowed, it's just garbage.
LOG_ERROR("remove temp dir {} failed", temp_dir);
}
} else {
// If the rollback fails, the dir should be recovered manually to avoid data loss.
CHECK(utils::filesystem::rename_path(temp_dir, _dir),
"rename temp dir {} back to current dir {} failed",
temp_dir,
_dir);
}
});
// Rename source dir to current dir.
if (!utils::filesystem::rename_path(dir, _dir)) {
LOG_ERROR("rename source dir {} to current dir {} failed", dir, _dir);
return err;
}
LOG_INFO("rename source dir {} to current dir {} successfully", dir, _dir);
auto dir_resolve = dsn::defer([this, dir, &err]() {
if (err != ERR_OK) {
CHECK(utils::filesystem::rename_path(_dir, dir),
"rename current dir {} back to source dir {} failed",
_dir,
dir);
}
});
// 1. Ensure that logs in current dir(such as "/plog") are valid and can be opened
// successfully;
// 2. Reopen, load and register new log files into replica;
// 3. Be sure that the old log files should have been closed.
err = open(replay_error_callback, write_error_callback);
if (err != ERR_OK) {
LOG_ERROR("the log files of current dir {} are invalid, thus open failed: {}", _dir, err);
}
return err;
}
void mutation_log::set_valid_start_offset_on_open(gpid gpid, int64_t valid_start_offset)
{
zauto_lock l(_lock);
if (_is_private) {
CHECK_EQ(gpid, _private_gpid);
_private_log_info.valid_start_offset = valid_start_offset;
} else {
_shared_log_info_map[gpid] = replica_log_info(0, valid_start_offset);
}
}
int64_t mutation_log::on_partition_reset(gpid gpid, decree max_decree)
{
zauto_lock l(_lock);
if (_is_private) {
CHECK_EQ(_private_gpid, gpid);
replica_log_info old_info = _private_log_info;
_private_log_info.max_decree = max_decree;
_private_log_info.valid_start_offset = _global_end_offset;
LOG_WARNING("replica {} has changed private log max_decree from {} to {}, "
"valid_start_offset from {} to {}",
gpid,
old_info.max_decree,
_private_log_info.max_decree,
old_info.valid_start_offset,
_private_log_info.valid_start_offset);
} else {
replica_log_info info(max_decree, _global_end_offset);
auto it = _shared_log_info_map.insert(replica_log_info_map::value_type(gpid, info));
if (!it.second) {
LOG_WARNING("replica {} has changed shared log max_decree from {} to {}, "
"valid_start_offset from {} to {} ",
gpid,
it.first->second.max_decree,
info.max_decree,
it.first->second.valid_start_offset,
info.valid_start_offset);
_shared_log_info_map[gpid] = info;
}
}
return _global_end_offset;
}
void mutation_log::on_partition_removed(gpid gpid)
{
CHECK(!_is_private, "this method is only valid for shared logs");
zauto_lock l(_lock);
_shared_log_info_map.erase(gpid);
}
void mutation_log::update_max_decree(gpid gpid, decree d)
{
zauto_lock l(_lock);
update_max_decree_no_lock(gpid, d);
}
void mutation_log::update_max_decree_no_lock(gpid gpid, decree d)
{
if (!_is_private) {
auto it = _shared_log_info_map.find(gpid);
if (it != _shared_log_info_map.end()) {
if (it->second.max_decree < d) {
it->second.max_decree = d;
}
} else {
CHECK(false, "replica has not been registered in the log before");
}
} else {
CHECK_EQ(gpid, _private_gpid);
if (d > _private_log_info.max_decree) {
_private_log_info.max_decree = d;
}
}
}
void mutation_log::update_max_commit_on_disk(decree d)
{
zauto_lock l(_lock);
update_max_commit_on_disk_no_lock(d);
}
void mutation_log::update_max_commit_on_disk_no_lock(decree d)
{
CHECK(_is_private, "this method is only valid for private logs");
if (d > _private_max_commit_on_disk) {
_private_max_commit_on_disk = d;
}
}
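// get_learn_state() below returns true iff the collected state is guaranteed
// to cover every mutation with decree >= start: the oldest learned file must
// still be valid (its start offset is no less than valid_start_offset), and
// the newest skipped file's max decree must fall strictly below `start`.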
bool mutation_log::get_learn_state(gpid gpid, decree start, /*out*/ learn_state &state) const
{
CHECK(_is_private, "this method is only valid for private logs");
CHECK_EQ(_private_gpid, gpid);
binary_writer temp_writer;
if (get_learn_state_in_memory(start, temp_writer)) {
state.meta = temp_writer.get_buffer();
}
log_file_map_by_index files;
{
zauto_lock l(_lock);
if (state.meta.length() == 0 && start > _private_log_info.max_decree) {
// no memory data and no disk data
LOG_INFO("gpid({}) get_learn_state returns false"
"learn_start_decree={}, max_decree_in_private_log={}",
gpid,
start,
_private_log_info.max_decree);
return false;
}
files = _log_files;
}
// find all applicable files
bool skip_next = false;
std::list<std::string> learn_files;
log_file_ptr log;
decree last_max_decree = 0;
int learned_file_head_index = 0;
int learned_file_tail_index = 0;
int64_t learned_file_start_offset = 0;
for (auto itr = files.rbegin(); itr != files.rend(); ++itr) {
log = itr->second;
if (log->end_offset() <= _private_log_info.valid_start_offset)
break;
if (skip_next) {
skip_next = (log->previous_log_max_decrees().size() == 0);
continue;
}
if (log->end_offset() > log->start_offset()) {
// not empty file
learn_files.push_back(log->path());
if (learned_file_tail_index == 0)
learned_file_tail_index = log->index();
learned_file_head_index = log->index();
learned_file_start_offset = log->start_offset();
}
skip_next = (log->previous_log_max_decrees().size() == 0);
// continue checking, as this file may be a faulty one
if (skip_next)
continue;
last_max_decree = log->previous_log_max_decrees().begin()->second.max_decree;
// stop when none of the decrees in older files are needed
if (last_max_decree < start) {
// skip all older logs
break;
}
}
// reverse the order so that files are ordered by increasing index
state.files.reserve(learn_files.size());
for (auto it = learn_files.rbegin(); it != learn_files.rend(); ++it) {
state.files.push_back(*it);
}
bool ret = (learned_file_start_offset >= _private_log_info.valid_start_offset &&
last_max_decree > 0 && last_max_decree < start);
LOG_INFO("gpid({}) get_learn_state returns {}, private logs count {} ({} => {}), learned "
"files count {} ({} => {}): learned_file_start_offset({}) >= valid_start_offset({}) "
"&& last_max_decree({}) > 0 && last_max_decree({}) < learn_start_decree({})",
gpid,
ret ? "true" : "false",
files.size(),
files.empty() ? 0 : files.begin()->first,
files.empty() ? 0 : files.rbegin()->first,
learn_files.size(),
learned_file_head_index,
learned_file_tail_index,
learned_file_start_offset,
_private_log_info.valid_start_offset,
last_max_decree,
last_max_decree,
start);
return ret;
}
void mutation_log::get_parent_mutations_and_logs(gpid pid,
decree start_decree,
ballot start_ballot,
std::vector<mutation_ptr> &mutation_list,
std::vector<std::string> &files,
uint64_t &total_file_size) const
{
CHECK(_is_private, "this method is only valid for private logs");
CHECK_EQ(_private_gpid, pid);
mutation_list.clear();
files.clear();
total_file_size = 0;
get_in_memory_mutations(start_decree, start_ballot, mutation_list);
if (mutation_list.size() == 0 && start_decree > _private_log_info.max_decree) {
// no memory data and no disk data
return;
}
const auto &file_map = get_log_file_map();
bool skip_next = false;
std::list<std::string> learn_files;
decree last_max_decree = 0;
for (auto itr = file_map.rbegin(); itr != file_map.rend(); ++itr) {
const log_file_ptr &log = itr->second;
if (log->end_offset() <= _private_log_info.valid_start_offset)
break;
if (skip_next) {
skip_next = (log->previous_log_max_decrees().size() == 0);
continue;
}
if (log->end_offset() > log->start_offset()) {
// not empty file
learn_files.push_back(log->path());
total_file_size += (log->end_offset() - log->start_offset());
}
skip_next = (log->previous_log_max_decrees().size() == 0);
// continue checking, as this file may be a faulty one
if (skip_next)
continue;
last_max_decree = log->previous_log_max_decrees().begin()->second.max_decree;
// stop when none of the decrees in older files are needed
if (last_max_decree < start_decree) {
// skip all older logs
break;
}
}
// reverse the order so that files are ordered by increasing index
files.reserve(learn_files.size());
for (auto it = learn_files.rbegin(); it != learn_files.rend(); ++it) {
files.push_back(*it);
}
}
// return true if the file is within both the reserve_max_size and reserve_max_time limits
static bool should_reserve_file(log_file_ptr log,
int64_t already_reserved_size,
int64_t reserve_max_size,
int64_t reserve_max_time)
{
if (reserve_max_size == 0 || reserve_max_time == 0)
return false;
int64_t file_size = log->end_offset() - log->start_offset();
if (already_reserved_size + file_size > reserve_max_size) {
// already exceeds the size limit, should not reserve
return false;
}
uint64_t file_last_write_time = log->last_write_time();
if (file_last_write_time == 0) {
time_t tm;
if (!dsn::utils::filesystem::last_write_time(log->path(), tm)) {
// getting the file's last write time failed, reserve it for safety
LOG_WARNING("get last write time of file {} failed", log->path());
return true;
}
file_last_write_time = (uint64_t)tm;
log->set_last_write_time(file_last_write_time);
}
uint64_t current_time = dsn_now_ms() / 1000;
if (file_last_write_time + reserve_max_time < current_time) {
// already exceeds the time limit, should not reserve
return false;
}
// not exceed size and time limit, reserve it
return true;
}
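// A worked example for should_reserve_file() above, with hypothetical numbers:
// given reserve_max_size = 4 GB and reserve_max_time = 86400 (one day, in
// seconds), a 1 GB file last written two hours ago is reserved as long as
// already_reserved_size + 1 GB <= 4 GB; the same file becomes deletable once
// its age exceeds one day or the accumulated reserved size would overflow the
// 4 GB budget.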
int mutation_log::garbage_collection(gpid gpid,
decree cleanable_decree,
int64_t valid_start_offset,
int64_t reserve_max_size,
int64_t reserve_max_time)
{
CHECK(_is_private, "this method is only valid for private log");
log_file_map_by_index files;
decree max_decree = invalid_decree;
int current_file_index = -1;
{
zauto_lock l(_lock);
files = _log_files;
max_decree = _private_log_info.max_decree;
if (_current_log_file != nullptr) {
current_file_index = _current_log_file->index();
}
}
if (files.size() <= 1) {
// nothing to do
return 0;
}
// the last one should be the current log file
CHECK(current_file_index == -1 || files.rbegin()->first == current_file_index,
"invalid current_file_index, index = {}",
current_file_index);
// find the file with the largest index that can be deleted.
// after iterating, 'mark_it' points to that file.
log_file_map_by_index::reverse_iterator mark_it;
int64_t already_reserved_size = 0;
for (mark_it = files.rbegin(); mark_it != files.rend(); ++mark_it) {
log_file_ptr log = mark_it->second;
CHECK_EQ(mark_it->first, log->index());
// currently, "max_decree" is the max decree covered by this log.
// reserve current file
if (current_file_index == log->index()) {
// do not break; fall through to update the max decree
}
// reserve if the file is covered by both reserve_max_size and reserve_max_time
else if (should_reserve_file(
log, already_reserved_size, reserve_max_size, reserve_max_time)) {
// do not break; fall through to update the max decree
}
// log is invalid, ok to delete
else if (valid_start_offset >= log->end_offset()) {
LOG_INFO("gc_private @ {}: will remove files {} ~ log.{} because "
"valid_start_offset={} outdates log_end_offset={}",
_private_gpid,
files.begin()->second->path(),
log->index(),
valid_start_offset,
log->end_offset());
break;
}
// all mutations are cleanable, ok to delete
else if (cleanable_decree >= max_decree) {
LOG_INFO("gc_private @ {}: will remove files {} ~ log.{} because "
"cleanable_decree={} outdates max_decree={}",
_private_gpid,
files.begin()->second->path(),
log->index(),
cleanable_decree,
max_decree);
break;
}
// update max decree for the next log file
auto &max_decrees = log->previous_log_max_decrees();
auto it3 = max_decrees.find(gpid);
CHECK(it3 != max_decrees.end(), "impossible for private logs");
max_decree = it3->second.max_decree;
already_reserved_size += log->end_offset() - log->start_offset();
}
if (mark_it == files.rend()) {
// no file to delete
return 0;
}
// ok, let's delete files in increasing order of file index
// to avoid making a hole in the file list
int largest_to_delete = mark_it->second->index();
int deleted = 0;
for (auto it = files.begin(); it != files.end() && it->second->index() <= largest_to_delete;
++it) {
log_file_ptr log = it->second;
CHECK_EQ(it->first, log->index());
// close first
log->close();
// delete file
auto &fpath = log->path();
if (!dsn::utils::filesystem::remove_path(fpath)) {
LOG_ERROR("gc_private @ {}: fail to remove {}, stop current gc cycle ...",
_private_gpid,
fpath);
break;
}
// delete succeed
LOG_INFO("gc_private @ {}: log file {} is removed", _private_gpid, fpath);
deleted++;
// erase from _log_files
{
zauto_lock l(_lock);
_log_files.erase(it->first);
_global_start_offset =
_log_files.size() > 0 ? _log_files.begin()->second->start_offset() : 0;
}
}
return deleted;
}
struct gc_summary_info
{
dsn::gpid pid;
int min_file_index = 0;
dsn::replication::decree max_decree_gap = 0;
dsn::replication::decree garbage_max_decree = 0;
dsn::replication::decree slog_max_decree = 0;
std::string to_string() const
{
return fmt::format("gc_summary_info = [pid = {}, min_file_index = {}, max_decree_gap = {}, "
"garbage_max_decree = {}, slog_max_decree = {}]",
pid,
min_file_index,
max_decree_gap,
garbage_max_decree,
slog_max_decree);
}
friend std::ostream &operator<<(std::ostream &os, const gc_summary_info &gc_summary)
{
return os << gc_summary.to_string();
}
};
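// Decision summary for can_gc_replica_slog() below, derived from its early
// returns: a file is GC-able for a replica when (a) the file records no decree
// for that replica at all, (b) the file ends before the replica's valid start
// offset, or (c) the file's max decree for the replica is no greater than the
// garbage max decree; otherwise the replica prevents GC and the decree gap is
// recorded in `gc_summary`.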
namespace {
bool can_gc_replica_slog(const dsn::replication::replica_log_info_map &slog_max_decrees,
const dsn::replication::log_file_ptr &file,
const dsn::gpid &pid,
const dsn::replication::replica_log_info &replica_durable_info,
dsn::replication::gc_summary_info &gc_summary)
{
const auto &garbage_max_decree = replica_durable_info.max_decree;
const auto &valid_start_offset = replica_durable_info.valid_start_offset;
const auto &it = slog_max_decrees.find(pid);
if (it == slog_max_decrees.end()) {
// There's no log found in this file for this replica, thus all decrees of
// this replica in this file could deleted.
//
// `valid_start_offset` might be reset to 0 if initialize_on_load() returned
// `ERR_INCOMPLETE_DATA`, thus it's possible that `valid_start_offset == 0`.
CHECK(valid_start_offset == 0 || file->end_offset() <= valid_start_offset,
"valid start offset must be 0 or no less than the end offset of this log file");
LOG_INFO("gc @ {}: max_decree for {} is missing vs {} as garbage max decree, it's "
"safe to delete this and all older logs for this replica",
pid,
file->path(),
garbage_max_decree);
return true;
}
if (file->end_offset() <= valid_start_offset) {
// This file has been invalid for this replica, since `valid_start_offset` was reset
// to a file with larger index than this file. Thus all decrees of this replica in
// this file could be deleted.
LOG_INFO("gc @ {}: log is invalid for {}, as valid start offset vs log end offset = "
"{} vs {}, it is therefore safe to delete this and all older logs for this "
"replica",
pid,
file->path(),
valid_start_offset,
file->end_offset());
return true;
}
if (it->second.max_decree <= garbage_max_decree) {
// All decrees are no greater than the garbage max decree. Since all decrees
// no greater than the garbage max decree would be deleted, all decrees of
// this replica in this file could be deleted.
LOG_INFO("gc @ {}: max_decree for {} is {} vs {} as garbage max decree, it is "
"therefore safe to delete this and all older logs for this replica",
pid,
file->path(),
it->second.max_decree,
garbage_max_decree);
return true;
}
// it->second.max_decree > garbage_max_decree
//
// Some decrees are greater than the garbage max decree, thus this file should
// not be deleted for now.
LOG_INFO("gc @ {}: max_decree for {} is {} vs {} as garbage max decree, it "
"is therefore not allowed to delete this and all older logs",
pid,
file->path(),
it->second.max_decree,
garbage_max_decree);
auto gap = it->second.max_decree - garbage_max_decree;
if (file->index() < gc_summary.min_file_index || gap > gc_summary.max_decree_gap) {
// Find the oldest file of this round of iteration for gc of slog files, with the
// max decree gap between the garbage max decree and the oldest slog file.
gc_summary.pid = pid;
gc_summary.min_file_index = file->index();
gc_summary.max_decree_gap = gap;
gc_summary.garbage_max_decree = garbage_max_decree;
gc_summary.slog_max_decree = it->second.max_decree;
}
return false;
}
} // anonymous namespace
void mutation_log::garbage_collection(const replica_log_info_map &replica_durable_decrees,
std::set<gpid> &prevent_gc_replicas)
{
CHECK(!_is_private, "this method is only valid for shared log");
// Fetch the snapshot of the latest states of the slog, such as the max decree it maintains
// for each partition.
log_file_map_by_index files;
replica_log_info_map slog_max_decrees;
int64_t total_log_size = 0;
{
zauto_lock l(_lock);
total_log_size = total_size_no_lock();
if (_log_files.empty()) {
CHECK_EQ(total_log_size, 0);
LOG_INFO("gc_shared: slog file not found");
return;
}
CHECK_NULL(_current_log_file,
"shared logs have been deprecated, thus could not be created");
files = _log_files;
slog_max_decrees = _shared_log_info_map;
}
reserved_slog_info reserved_slog = {
files.size(), total_log_size, files.begin()->first, files.rbegin()->first};
// Iterate over the slog files from the newest to the oldest in descending order
// (i.e. file index in descending order), to find the newest file that could be
// deleted (after iterating, `mark_it` would point to the newest file that could
// be deleted).
log_file_map_by_index::reverse_iterator mark_it;
std::set<gpid> kickout_replicas;
gc_summary_info gc_summary;
for (mark_it = files.rbegin(); mark_it != files.rend(); ++mark_it) {
const auto &file = mark_it->second;
CHECK_EQ(mark_it->first, file->index());
bool can_gc_all_replicas_slog = true;
for (const auto &replica_durable_info : replica_durable_decrees) {
if (kickout_replicas.find(replica_durable_info.first) != kickout_replicas.end()) {
// There's no need to consider this replica.
continue;
}
if (can_gc_replica_slog(slog_max_decrees,
file,
replica_durable_info.first,
replica_durable_info.second,
gc_summary)) {
// Log files before this file are useless for this replica,
// so from now on, this replica would not be considered any more.
kickout_replicas.insert(replica_durable_info.first);
continue;
}
// For now, this file could not be deleted.
can_gc_all_replicas_slog = false;
prevent_gc_replicas.insert(replica_durable_info.first);
}
if (can_gc_all_replicas_slog) {
// The newest file that could be deleted has been found.
break;
}
// Fetch max decrees of the next slog file.
slog_max_decrees = file->previous_log_max_decrees();
}
if (mark_it == files.rend()) {
// There's no file that could be deleted.
LOG_INFO("gc_shared: no file can be deleted: {}, {}, prevent_gc_replicas = {}",
reserved_slog,
gc_summary,
prevent_gc_replicas.size());
return;
}
slog_deletion_info slog_deletion;
// Delete files in ascending order of file index. Otherwise, deleting files in
// descending order would leave a hole in the file list once a file fails to be
// deleted.
remove_obsolete_slog_files(mark_it->second->index(), files, reserved_slog, slog_deletion);
LOG_INFO("gc_shared: deleted some files: {}, {}, {}, prevent_gc_replicas = {}",
reserved_slog,
slog_deletion,
gc_summary,
prevent_gc_replicas.size());
}
void mutation_log::remove_obsolete_slog_files(const int max_file_index_to_delete,
log_file_map_by_index &files,
reserved_slog_info &reserved_slog,
slog_deletion_info &slog_deletion)
{
for (auto it = files.begin();
it != files.end() && it->second->index() <= max_file_index_to_delete;
++it) {
auto &file = it->second;
CHECK_EQ(it->first, file->index());
slog_deletion.to_delete_file_count++;
slog_deletion.to_delete_log_size += file->end_offset() - file->start_offset();
// Firstly close the log file.
file->close();
// Delete the log file.
const auto &fpath = file->path();
if (!dsn::utils::filesystem::remove_path(fpath)) {
LOG_ERROR("gc_shared: fail to remove {}, stop current gc cycle ...", fpath);
break;
}
// The log file is deleted successfully.
LOG_INFO("gc_shared: log file {} is removed", fpath);
slog_deletion.deleted_file_count++;
slog_deletion.deleted_log_size += file->end_offset() - file->start_offset();
if (slog_deletion.deleted_min_file_index == 0) {
slog_deletion.deleted_min_file_index = file->index();
}
slog_deletion.deleted_max_file_index = file->index();
// Remove the log file from _log_files.
{
zauto_lock l(_lock);
_log_files.erase(it->first);
_global_start_offset =
_log_files.size() > 0 ? _log_files.begin()->second->start_offset() : 0;
reserved_slog.file_count = _log_files.size();
reserved_slog.log_size = total_size_no_lock();
if (reserved_slog.file_count > 0) {
reserved_slog.min_file_index = _log_files.begin()->first;
reserved_slog.max_file_index = _log_files.rbegin()->first;
} else {
reserved_slog.min_file_index = -1;
reserved_slog.max_file_index = -1;
}
}
}
}
mutation_log::log_file_map_by_index mutation_log::get_log_file_map() const
{
zauto_lock l(_lock);
return _log_files;
}
} // namespace replication
} // namespace dsn
USER_DEFINED_STRUCTURE_FORMATTER(dsn::replication::gc_summary_info);