blob: 653b0c13834cff4a3a92bca6a4e578a4135b6f30 [file] [log] [blame]
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
/*
* Description:
* fs_manager's implementation: tracks which data disk each allocated replica is placed on
*
* Revision history:
* 2017-08-08: sunweijie@xiaomi.com, first draft
*/
#include "fs_manager.h"
#include <algorithm>
#include <cmath>
#include <iosfwd>
#include <utility>
#include "common/gpid.h"
#include "common/replication_enums.h"
#include "fmt/core.h"
#include "fmt/ostream.h"
#include "perf_counter/perf_counter.h"
#include "runtime/api_layer1.h"
#include "runtime/rpc/rpc_address.h"
#include "utils/fail_point.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
#include "utils/ports.h"
#include "utils/string_view.h"
namespace dsn {
namespace replication {
// Minimum percentage of available space a data disk should keep. dir_node::update_disk_stat()
// marks a disk as SPACE_INSUFFICIENT when its available ratio falls below this value.
// Tagged FT_MUTABLE (presumably adjustable at runtime — confirm against the flag framework).
DSN_DEFINE_int32(replication,
disk_min_available_space_ratio,
10,
"if disk available space ratio "
"is below this value, this "
"disk will be considered as "
"space insufficient");
DSN_TAG_VARIABLE(disk_min_available_space_ratio, FT_MUTABLE);
// When true, fs_manager::initialize() logs and skips data directories that cannot be created
// or fail the read/write check; when false, such a directory is a fatal error.
DSN_DEFINE_bool(replication,
ignore_broken_disk,
true,
"true means ignore broken data disk when initialize");
// Returns the number of replicas held on this disk, summed over every app.
uint64_t dir_node::replicas_count() const
{
    uint64_t total = 0;
    for (auto it = holding_replicas.begin(); it != holding_replicas.end(); ++it) {
        total += it->second.size();
    }
    return total;
}
// Returns the number of replicas of app `id` held on this disk (0 if the app has none here).
uint64_t dir_node::replicas_count(app_id id) const
{
    const auto app_it = holding_replicas.find(id);
    return app_it == holding_replicas.end() ? 0 : app_it->second.size();
}
// Builds the path of a replica's directory on this disk: "<full_dir>/<gpid>.<app_type>".
std::string dir_node::replica_dir(dsn::string_view app_type, const dsn::gpid &pid) const
{
    const std::string replica_name = fmt::format("{}.{}", pid, app_type);
    return utils::filesystem::path_combine(full_dir, replica_name);
}
// Returns true iff replica `pid` is recorded as held on this disk.
bool dir_node::has(const gpid &pid) const
{
    const auto app_it = holding_replicas.find(pid.get_app_id());
    return app_it != holding_replicas.end() && app_it->second.count(pid) != 0;
}
// Forgets replica `pid` on this disk; returns the number of entries erased (0 or 1).
uint64_t dir_node::remove(const gpid &pid)
{
    auto app_it = holding_replicas.find(pid.get_app_id());
    // No replica of this app lives here, so there is nothing to erase.
    return app_it == holding_replicas.end() ? 0 : app_it->second.erase(pid);
}
void dir_node::update_disk_stat()
{
FAIL_POINT_INJECT_F("update_disk_stat", [](string_view) { return; });
dsn::utils::filesystem::disk_space_info dsi;
if (!dsn::utils::filesystem::get_disk_space_info(full_dir, dsi)) {
// TODO(yingchun): it may encounter some IO errors when get_disk_space_info() failed, deal
// with it.
LOG_ERROR("get disk space info failed, dir = {}", full_dir);
return;
}
disk_capacity_mb = dsi.capacity >> 20;
disk_available_mb = dsi.available >> 20;
disk_available_ratio = static_cast<int>(
disk_capacity_mb == 0 ? 0 : std::round(disk_available_mb * 100.0 / disk_capacity_mb));
auto old_status = status;
auto new_status = disk_available_ratio < FLAGS_disk_min_available_space_ratio
? disk_status::SPACE_INSUFFICIENT
: disk_status::NORMAL;
if (old_status != new_status) {
status = new_status;
}
LOG_INFO("update disk space succeed: dir = {}, capacity_mb = {}, available_mb = {}, "
"available_ratio = {}%, disk_status = {}",
full_dir,
disk_capacity_mb,
disk_available_mb,
disk_available_ratio,
enum_to_string(status));
}
// Registers the node-wide disk perf counters (under "eon.replica_stub"). Their values are
// published by fs_manager::update_disk_stat().
fs_manager::fs_manager()
{
// Sum of disk_capacity_mb over all data disks.
_counter_total_capacity_mb.init_app_counter("eon.replica_stub",
"disk.capacity.total(MB)",
COUNTER_TYPE_NUMBER,
"total disk capacity in MB");
// Sum of disk_available_mb over all data disks.
_counter_total_available_mb.init_app_counter("eon.replica_stub",
"disk.available.total(MB)",
COUNTER_TYPE_NUMBER,
"total disk available in MB");
// Percentage: total available / total capacity.
_counter_total_available_ratio.init_app_counter("eon.replica_stub",
"disk.available.total.ratio",
COUNTER_TYPE_NUMBER,
"total disk available ratio");
// Smallest per-disk available ratio.
_counter_min_available_ratio.init_app_counter("eon.replica_stub",
"disk.available.min.ratio",
COUNTER_TYPE_NUMBER,
"minimal disk available ratio in all disks");
// Largest per-disk available ratio.
_counter_max_available_ratio.init_app_counter("eon.replica_stub",
"disk.available.max.ratio",
COUNTER_TYPE_NUMBER,
"maximal disk available ratio in all disks");
}
// Returns the dir_node whose full_dir equals `subdir` or is a path-component prefix of it
// (i.e. `subdir` lies inside that data directory), or nullptr if none matches. `subdir` is
// normalized before comparison. The returned pointer is non-owning.
dir_node *fs_manager::get_dir_node(const std::string &subdir) const
{
    std::string normalized;
    utils::filesystem::get_normalized_path(subdir, normalized);

    zauto_read_lock l(_lock);
    const auto found =
        std::find_if(_dir_nodes.begin(), _dir_nodes.end(), [&normalized](const auto &dn) {
            const std::string &base = dn->full_dir;
            if (base.size() > normalized.size()) {
                return false;
            }
            if (normalized.compare(0, base.size(), base) != 0) {
                return false;
            }
            // Require a '/' boundary so "/data1" does not match "/data10/...".
            return normalized.size() == base.size() || normalized[base.size()] == '/';
        });
    return found == _dir_nodes.end() ? nullptr : found->get();
}
void fs_manager::initialize(const std::vector<std::string> &data_dirs,
const std::vector<std::string> &data_dir_tags)
{
CHECK_EQ(data_dirs.size(), data_dir_tags.size());
// Skip the data directories which are broken.
std::vector<std::shared_ptr<dir_node>> dir_nodes;
for (auto i = 0; i < data_dir_tags.size(); ++i) {
const auto &dir_tag = data_dir_tags[i];
const auto &dir = data_dirs[i];
// Check the status of this directory.
std::string cdir;
std::string err_msg;
if (dsn_unlikely(!utils::filesystem::create_directory(dir, cdir, err_msg) ||
!utils::filesystem::check_dir_rw(dir, err_msg))) {
if (FLAGS_ignore_broken_disk) {
LOG_ERROR("data dir({}) is broken, ignore it, error: {}", dir, err_msg);
} else {
CHECK(false, err_msg);
}
// TODO(yingchun): Remove the 'continue' and mark its io error status, regardless
// the status of the disks, add all disks.
continue;
}
// Normalize the data directories.
std::string norm_path;
utils::filesystem::get_normalized_path(cdir, norm_path);
// Create and add this dir_node.
auto dn = std::make_shared<dir_node>(dir_tag, norm_path);
dir_nodes.emplace_back(dn);
LOG_INFO("mark data dir({}) as tag({})", norm_path, dir_tag);
}
CHECK_FALSE(dir_nodes.empty());
// Update the memory state.
{
zauto_read_lock l(_lock);
_dir_nodes.swap(dir_nodes);
}
// Update the disk statistics.
update_disk_stat();
}
// Looks up the data disk containing `dir` and writes its tag into `tag`.
// Returns ERR_OBJECT_NOT_FOUND (leaving `tag` untouched) when no registered disk contains it.
dsn::error_code fs_manager::get_disk_tag(const std::string &dir, std::string &tag)
{
    const dir_node *owner = get_dir_node(dir);
    if (owner == nullptr) {
        return dsn::ERR_OBJECT_NOT_FOUND;
    }
    tag = owner->tag;
    return dsn::ERR_OK;
}
void fs_manager::add_replica(const gpid &pid, const std::string &pid_dir)
{
const auto &dn = get_dir_node(pid_dir);
if (dsn_unlikely(nullptr == dn)) {
LOG_ERROR(
"{}: dir({}) of gpid({}) haven't registered", dsn_primary_address(), pid_dir, pid);
return;
}
bool emplace_success = false;
{
zauto_write_lock l(_lock);
auto &replicas_for_app = dn->holding_replicas[pid.get_app_id()];
emplace_success = replicas_for_app.emplace(pid).second;
}
if (!emplace_success) {
LOG_WARNING(
"{}: gpid({}) already in the dir_node({})", dsn_primary_address(), pid, dn->tag);
return;
}
LOG_INFO("{}: add gpid({}) to dir_node({})", dsn_primary_address(), pid, dn->tag);
}
// Chooses the data disk on which a new replica `pid` should be placed: the disk holding the
// fewest replicas of the same app, ties broken by the fewest replicas overall, and remaining
// ties by registration order. Aborts if `pid` is already held by any disk. Returns nullptr
// only when no disk is registered.
dir_node *fs_manager::find_best_dir_for_new_replica(const gpid &pid) const
{
    dir_node *selected = nullptr;
    uint64_t least_app_replicas_count = 0;
    uint64_t least_total_replicas_count = 0;
    {
        // This scan only reads the dir_nodes and their replica sets, so the shared (read) lock
        // is sufficient; mutators such as add_replica()/remove_replica() take the write lock.
        zauto_read_lock l(_lock);
        for (const auto &dn : _dir_nodes) {
            CHECK(!dn->has(pid), "gpid({}) already exists in dir_node({})", pid, dn->tag);
            const uint64_t app_replicas_count = dn->replicas_count(pid.get_app_id());
            const uint64_t total_replicas_count = dn->replicas_count();
            // Lexicographic minimum over (app count, total count); strict '<' keeps the
            // earliest candidate on a full tie.
            if (selected == nullptr ||
                std::make_pair(app_replicas_count, total_replicas_count) <
                    std::make_pair(least_app_replicas_count, least_total_replicas_count)) {
                least_app_replicas_count = app_replicas_count;
                least_total_replicas_count = total_replicas_count;
                selected = dn.get();
            }
        }
    }

    if (selected != nullptr) {
        LOG_INFO(
            "{}: put pid({}) to dir({}), which has {} replicas of current app, {} replicas totally",
            dsn_primary_address(),
            pid,
            selected->tag,
            least_app_replicas_count,
            least_total_replicas_count);
    }
    return selected;
}
// Forgets replica `pid` on every data disk. Aborts if the replica is found on more than one
// disk, since a replica must live on exactly one disk.
void fs_manager::remove_replica(const gpid &pid)
{
    zauto_write_lock l(_lock);
    uint64_t removed_total = 0;
    for (const auto &dn : _dir_nodes) {
        const uint64_t removed_here = dn->remove(pid);
        CHECK_LE_MSG(removed_total + removed_here,
                     1,
                     "gpid({}) found in dir({}), which was removed before",
                     pid,
                     dn->tag);
        if (removed_here == 0) {
            continue;
        }
        LOG_INFO("{}: remove gpid({}) from dir({})", dsn_primary_address(), pid, dn->tag);
        removed_total += removed_here;
    }
}
// Invokes `func` on each data disk in registration order while holding the read lock.
// Stops at, and returns false on, the first call that returns false; returns true otherwise.
bool fs_manager::for_each_dir_node(const std::function<bool(const dir_node &)> &func) const
{
    zauto_read_lock l(_lock);
    return std::all_of(
        _dir_nodes.begin(), _dir_nodes.end(), [&func](const auto &dn) { return func(*dn); });
}
// Refreshes every data disk's statistics, recomputes the node-wide totals and the min/max
// per-disk available ratios, logs them, and publishes them to the perf counters.
void fs_manager::update_disk_stat()
{
    zauto_write_lock l(_lock);
    reset_disk_stat();

    // Refresh each disk first, then fold its figures into the aggregates.
    for (const auto &node : _dir_nodes) {
        node->update_disk_stat();
        _total_capacity_mb += node->disk_capacity_mb;
        _total_available_mb += node->disk_available_mb;
        _min_available_ratio = std::min(node->disk_available_ratio, _min_available_ratio);
        _max_available_ratio = std::max(node->disk_available_ratio, _max_available_ratio);
    }

    if (_total_capacity_mb == 0) {
        _total_available_ratio = 0;
    } else {
        _total_available_ratio =
            static_cast<int>(std::round(_total_available_mb * 100.0 / _total_capacity_mb));
    }

    LOG_INFO("update disk space succeed: disk_count = {}, total_capacity_mb = {}, "
             "total_available_mb = {}, total_available_ratio = {}%, min_available_ratio = {}%, "
             "max_available_ratio = {}%",
             _dir_nodes.size(),
             _total_capacity_mb,
             _total_available_mb,
             _total_available_ratio,
             _min_available_ratio,
             _max_available_ratio);

    // Publish the aggregates.
    _counter_total_capacity_mb->set(_total_capacity_mb);
    _counter_total_available_mb->set(_total_available_mb);
    _counter_total_available_ratio->set(_total_available_ratio);
    _counter_min_available_ratio->set(_min_available_ratio);
    _counter_max_available_ratio->set(_max_available_ratio);
}
void fs_manager::add_new_dir_node(const std::string &data_dir, const std::string &tag)
{
zauto_write_lock l(_lock);
std::string norm_path;
utils::filesystem::get_normalized_path(data_dir, norm_path);
dir_node *n = new dir_node(tag, norm_path);
_dir_nodes.emplace_back(n);
LOG_INFO("{}: mark data dir({}) as tag({})", dsn_primary_address().to_string(), norm_path, tag);
}
bool fs_manager::is_dir_node_available(const std::string &data_dir, const std::string &tag) const
{
zauto_read_lock l(_lock);
for (const auto &dir_node : _dir_nodes) {
std::string norm_path;
utils::filesystem::get_normalized_path(data_dir, norm_path);
if (dir_node->full_dir == norm_path || dir_node->tag == tag) {
return true;
}
}
return false;
}
// Returns the data disk on which the directory of replica `pid` (of `app_type`) already
// exists, or nullptr if it exists on none. Aborts if the directory exists on several disks.
dir_node *fs_manager::find_replica_dir(dsn::string_view app_type, gpid pid)
{
    dir_node *found_dn = nullptr;
    std::string found_dir;

    zauto_read_lock l(_lock);
    for (const auto &dn : _dir_nodes) {
        auto candidate = dn->replica_dir(app_type, pid);
        if (!utils::filesystem::directory_exists(candidate)) {
            continue;
        }
        // A replica directory present on two disks is an inconsistency we cannot resolve.
        CHECK(found_dir.empty(), "replica dir conflict: {} <--> {}", candidate, found_dir);
        found_dir = std::move(candidate);
        found_dn = dn.get();
    }
    return found_dn;
}
// Returns the data disk hosting replica `pid` (of `app_type`). If its directory already exists
// on some disk, that disk is returned as-is. Otherwise the best disk is chosen
// (find_best_dir_for_new_replica()), the directory is created there, and the replica is
// recorded on that disk. Returns nullptr if no disk is available or the directory cannot be
// created.
dir_node *fs_manager::create_replica_dir_if_necessary(dsn::string_view app_type, gpid pid)
{
    // Try to find the replica directory.
    auto replica_dn = find_replica_dir(app_type, pid);
    if (replica_dn != nullptr) {
        return replica_dn;
    }

    // Find a dir_node for the new replica.
    replica_dn = find_best_dir_for_new_replica(pid);
    if (replica_dn == nullptr) {
        return nullptr;
    }

    const auto dir = replica_dn->replica_dir(app_type, pid);
    if (!dsn::utils::filesystem::create_directory(dir)) {
        LOG_ERROR("create replica directory({}) failed", dir);
        return nullptr;
    }

    // 'holding_replicas' is shared state that add_replica()/remove_replica() only modify under
    // the write lock and readers scan under the read lock; mutate it under the write lock too.
    // The helpers above have already released '_lock', so this cannot self-deadlock.
    {
        zauto_write_lock l(_lock);
        replica_dn->holding_replicas[pid.get_app_id()].emplace(pid);
    }
    return replica_dn;
}
// Creates the directory for split child replica `child_pid` on the same data disk as its
// parent, whose directory is `parent_dir`, and records the child on that disk.
// Aborts if `parent_dir` is not under any registered disk; returns nullptr if the directory
// cannot be created.
dir_node *fs_manager::create_child_replica_dir(dsn::string_view app_type,
                                               gpid child_pid,
                                               const std::string &parent_dir)
{
    dir_node *child_dn = nullptr;
    std::string child_dir;
    {
        zauto_read_lock l(_lock);
        for (const auto &dn : _dir_nodes) {
            // <parent_dir> = <prefix>/<gpid>.<app_type>
            // The parent lives on 'dn' when <parent_dir> starts with "<data_dir>/". This is
            // compared in place (no substr/concatenation temporaries per iteration).
            const std::string &data_dir = dn->full_dir;
            if (parent_dir.size() > data_dir.size() &&
                parent_dir.compare(0, data_dir.size(), data_dir) == 0 &&
                parent_dir[data_dir.size()] == '/') {
                child_dn = dn.get();
                // Only the matching disk's path is needed, so build it just once.
                child_dir = dn->replica_dir(app_type, child_pid);
                break;
            }
        }
    }
    CHECK_NOTNULL(child_dn, "can not find parent_dir {} in data_dirs", parent_dir);

    if (!dsn::utils::filesystem::create_directory(child_dir)) {
        LOG_ERROR("create child replica directory({}) failed", child_dir);
        return nullptr;
    }
    add_replica(child_pid, child_dir);
    return child_dn;
}
} // namespace replication
} // namespace dsn