blob: 89487b0c637fbf9ce22c2e2feccf9d8906000b3b [file] [log] [blame]
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "fs_manager.h"
#include <fmt/std.h> // IWYU pragma: keep
#include <algorithm>
#include <cstdint>
#include <utility>
#include "absl/strings/string_view.h"
#include "common/gpid.h"
#include "common/replication_enums.h"
#include "fmt/core.h"
#include "replica_admin_types.h"
#include "runtime/api_layer1.h"
#include "utils/fail_point.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
#include "utils/math.h"
#include "utils/ports.h"
// Metric entity type "disk": one entity per data directory, instantiated by
// instantiate_disk_metric_entity() below with the directory's tag and path as attributes.
METRIC_DEFINE_entity(disk);
// Gauge (MB) holding the total capacity of a data directory's disk; written by
// dir_node::update_disk_stat() via METRIC_SET.
METRIC_DEFINE_gauge_int64(disk,
disk_capacity_total_mb,
dsn::metric_unit::kMegaBytes,
"The total disk capacity");
// Gauge (MB) holding the available capacity of a data directory's disk; written by
// dir_node::update_disk_stat() via METRIC_SET.
METRIC_DEFINE_gauge_int64(disk,
disk_capacity_avail_mb,
dsn::metric_unit::kMegaBytes,
"The available disk capacity");
namespace dsn {
namespace replication {
// Percentage threshold: dir_node::update_disk_stat() marks a dir_node SPACE_INSUFFICIENT when
// its available/capacity percentage is strictly below this value, and NORMAL otherwise.
DSN_DEFINE_int32(replication,
disk_min_available_space_ratio,
10,
"if disk available space ratio "
"is below this value, this "
"disk will be considered as "
"space insufficient");
// Tagged FT_MUTABLE — presumably allows changing the threshold at runtime; confirm against the
// flags framework.
DSN_TAG_VARIABLE(disk_min_available_space_ratio, FT_MUTABLE);
// Read by fs_manager::initialize(): when true, a data dir that cannot be created or fails the
// read/write check is logged and registered with IO_ERROR status; when false, startup aborts.
DSN_DEFINE_bool(replication,
ignore_broken_disk,
true,
"true means ignore broken data disk when initialize");
// Map a disk status onto the error code reported to callers.
//   SPACE_INSUFFICIENT -> ERR_DISK_INSUFFICIENT
//   IO_ERROR           -> ERR_DISK_IO_ERROR
//   NORMAL             -> ERR_OK
// Any other value is a programming error and aborts via CHECK_EQ.
error_code disk_status_to_error_code(disk_status::type ds)
{
    if (ds == disk_status::SPACE_INSUFFICIENT) {
        return dsn::ERR_DISK_INSUFFICIENT;
    }
    if (ds == disk_status::IO_ERROR) {
        return dsn::ERR_DISK_IO_ERROR;
    }

    // Only NORMAL is left as a legal value here.
    CHECK_EQ(disk_status::NORMAL, ds);
    return dsn::ERR_OK;
}
namespace {
// Create (or fetch) the "disk" metric entity for one data directory. The entity id is
// "disk@<tag>" and the entity carries both the tag and the directory path as attributes.
metric_entity_ptr instantiate_disk_metric_entity(const std::string &tag,
                                                 const std::string &data_dir)
{
    const std::string id = fmt::format("disk@{}", tag);
    return METRIC_ENTITY_disk.instantiate(id, {{"tag", tag}, {"data_dir", data_dir}});
}
} // anonymous namespace
// Build the capacity metrics for one data directory: instantiate the per-disk entity
// (id "disk@<tag>", attributes tag/data_dir), then initialize the total/available capacity
// gauges. The METRIC_VAR_INIT_disk macros presumably attach the gauges to the entity
// initialized just before them — NOTE(review): this relies on member declaration order in the
// header placing _disk_metric_entity before the gauges; confirm.
disk_capacity_metrics::disk_capacity_metrics(const std::string &tag, const std::string &data_dir)
: _tag(tag),
_data_dir(data_dir),
_disk_metric_entity(instantiate_disk_metric_entity(tag, data_dir)),
METRIC_VAR_INIT_disk(disk_capacity_total_mb),
METRIC_VAR_INIT_disk(disk_capacity_avail_mb)
{
}
// Return the per-disk metric entity. Aborts (CHECK_NOTNULL) if the entity was never
// instantiated, since a null entity cannot be used to create metrics.
const metric_entity_ptr &disk_capacity_metrics::disk_metric_entity() const
{
CHECK_NOTNULL(_disk_metric_entity,
"disk metric entity (tag={}, data_dir={}) should has been instantiated: "
"uninitialized entity cannot be used to instantiate metric",
_tag,
_data_dir);
return _disk_metric_entity;
}
// Total number of replicas held by this dir_node, summed across every app.
uint64_t dir_node::replicas_count() const
{
    uint64_t total = 0;
    for (const auto &app_and_pids : holding_replicas) {
        total += app_and_pids.second.size();
    }
    return total;
}
// Number of replicas of app `id` held by this dir_node; 0 if the app is unknown here.
uint64_t dir_node::replicas_count(app_id id) const
{
    const auto found = holding_replicas.find(id);
    return found == holding_replicas.end() ? 0 : found->second.size();
}
// Path of the replica directory for `pid` under this data dir: "<full_dir>/<gpid>.<app_type>".
std::string dir_node::replica_dir(absl::string_view app_type, const dsn::gpid &pid) const
{
    const std::string leaf = fmt::format("{}.{}", pid, app_type);
    return utils::filesystem::path_combine(full_dir, leaf);
}
// True iff this dir_node records `pid` among the replicas of its app.
bool dir_node::has(const gpid &pid) const
{
    const auto app_it = holding_replicas.find(pid.get_app_id());
    return app_it != holding_replicas.end() && app_it->second.count(pid) > 0;
}
// Forget `pid` on this dir_node. Returns the number of entries erased (0 or 1). An app entry
// that becomes empty is kept in `holding_replicas`.
uint64_t dir_node::remove(const gpid &pid)
{
    const auto app_it = holding_replicas.find(pid.get_app_id());
    return app_it == holding_replicas.end() ? 0 : app_it->second.erase(pid);
}
void dir_node::update_disk_stat()
{
FAIL_POINT_INJECT_F("update_disk_stat", [](absl::string_view) { return; });
dsn::utils::filesystem::disk_space_info dsi;
if (!dsn::utils::filesystem::get_disk_space_info(full_dir, dsi)) {
// TODO(yingchun): it may encounter some IO errors when get_disk_space_info() failed, deal
// with it.
LOG_ERROR("get disk space info failed, dir = {}", full_dir);
return;
}
disk_capacity_mb = dsi.capacity >> 20;
disk_available_mb = dsi.available >> 20;
disk_available_ratio = dsn::utils::calc_percentage<int>(disk_available_mb, disk_capacity_mb);
METRIC_SET(disk_capacity, disk_capacity_total_mb, disk_capacity_mb);
METRIC_SET(disk_capacity, disk_capacity_avail_mb, disk_available_mb);
// It's able to change status from NORMAL to SPACE_INSUFFICIENT, and vice versa.
const disk_status::type old_status = status;
const auto new_status = disk_available_ratio < FLAGS_disk_min_available_space_ratio
? disk_status::SPACE_INSUFFICIENT
: disk_status::NORMAL;
if (old_status != new_status) {
status = new_status;
}
LOG_INFO("update disk space succeed: dir = {}, capacity_mb = {}, available_mb = {}, "
"available_ratio = {}%, disk_status = {}",
full_dir,
disk_capacity_mb,
disk_available_mb,
disk_available_ratio,
enum_to_string(status));
}
// Return the dir_node whose data directory is `subdir` itself or an ancestor of it, or nullptr
// if no registered data directory contains `subdir`. `subdir` is normalized first. Matching is
// component-wise, so "/data1" does not own "/data10/x".
//
// dir_nodes with an empty `full_dir` are skipped. Under the prefix test below, an empty path
// would otherwise "contain" every absolute path (norm_subdir[0] == '/'). It would then claim
// any replica directory depending only on its position in _dir_nodes.
dir_node *fs_manager::get_dir_node(const std::string &subdir) const
{
    std::string norm_subdir;
    utils::filesystem::get_normalized_path(subdir, norm_subdir);

    zauto_read_lock l(_lock);
    for (const auto &dn : _dir_nodes) {
        const std::string &full_dir = dn->full_dir;
        if (full_dir.empty() || full_dir.size() > norm_subdir.size()) {
            continue;
        }
        // 'subdir' belongs to 'dn' if it equals 'full_dir' or continues it at a '/' boundary.
        if ((norm_subdir.size() == full_dir.size() || norm_subdir[full_dir.size()] == '/') &&
            norm_subdir.compare(0, full_dir.size(), full_dir) == 0) {
            return dn.get();
        }
    }
    return nullptr;
}
void fs_manager::initialize(const std::vector<std::string> &data_dirs,
const std::vector<std::string> &data_dir_tags)
{
CHECK_EQ(data_dirs.size(), data_dir_tags.size());
// Skip the data directories which are broken.
std::vector<std::shared_ptr<dir_node>> dir_nodes;
for (auto i = 0; i < data_dir_tags.size(); ++i) {
const auto &dir_tag = data_dir_tags[i];
const auto &dir = data_dirs[i];
// Check the status of this directory.
std::string cdir;
std::string err_msg;
disk_status::type status = disk_status::NORMAL;
if (dsn_unlikely(!utils::filesystem::create_directory(dir, cdir, err_msg) ||
!utils::filesystem::check_dir_rw(dir, err_msg))) {
if (FLAGS_ignore_broken_disk) {
LOG_ERROR("data dir({}) is broken, ignore it, error: {}", dir, err_msg);
} else {
CHECK(false, err_msg);
}
status = disk_status::IO_ERROR;
}
// Normalize the data directories.
std::string norm_path;
utils::filesystem::get_normalized_path(cdir, norm_path);
// Create and add this dir_node.
auto dn = std::make_shared<dir_node>(dir_tag, norm_path, 0, 0, 0, status);
dir_nodes.emplace_back(dn);
LOG_INFO("mark data dir({}) as tag({}) with status({})",
norm_path,
dir_tag,
enum_to_string(status));
}
CHECK_FALSE(dir_nodes.empty());
// Update the memory state.
{
zauto_read_lock l(_lock);
_dir_nodes.swap(dir_nodes);
}
// Update the disk statistics.
update_disk_stat();
}
void fs_manager::add_replica(const gpid &pid, const std::string &pid_dir)
{
const auto &dn = get_dir_node(pid_dir);
if (dsn_unlikely(nullptr == dn)) {
LOG_ERROR(
"{}: dir({}) of gpid({}) haven't registered", dsn_primary_address(), pid_dir, pid);
return;
}
bool emplace_success = false;
{
zauto_write_lock l(_lock);
auto &replicas_for_app = dn->holding_replicas[pid.get_app_id()];
emplace_success = replicas_for_app.emplace(pid).second;
}
if (!emplace_success) {
LOG_WARNING(
"{}: gpid({}) already in the dir_node({})", dsn_primary_address(), pid, dn->tag);
return;
}
LOG_INFO("{}: add gpid({}) to dir_node({})", dsn_primary_address(), pid, dn->tag);
}
// Pick the NORMAL dir_node that should host the new replica `pid`. Selection order:
//   1. fewest replicas of pid's app,
//   2. then fewest replicas overall,
//   3. then the first such dir_node in _dir_nodes order.
// Returns nullptr if no dir_node is NORMAL. Aborts if `pid` is already held by any candidate.
// The selection is not recorded here; callers register the replica afterwards.
dir_node *fs_manager::find_best_dir_for_new_replica(const gpid &pid) const
{
    dir_node *selected = nullptr;
    uint64_t least_app_replicas_count = 0;
    uint64_t least_total_replicas_count = 0;
    {
        // The scan only reads dir_node state (has()/replicas_count() are const), so a shared
        // lock suffices; an exclusive lock would needlessly block concurrent readers.
        zauto_read_lock l(_lock);
        for (const auto &dn : _dir_nodes) {
            // Do not allocate new replica on dir_node which is not NORMAL.
            if (dn->status != disk_status::NORMAL) {
                continue;
            }
            CHECK(!dn->has(pid), "gpid({}) already exists in dir_node({})", pid, dn->tag);
            const uint64_t app_replicas_count = dn->replicas_count(pid.get_app_id());
            const uint64_t total_replicas_count = dn->replicas_count();
            if (selected == nullptr || least_app_replicas_count > app_replicas_count) {
                least_app_replicas_count = app_replicas_count;
                least_total_replicas_count = total_replicas_count;
                selected = dn.get();
            } else if (least_app_replicas_count == app_replicas_count &&
                       least_total_replicas_count > total_replicas_count) {
                least_total_replicas_count = total_replicas_count;
                selected = dn.get();
            }
        }
    }

    if (selected != nullptr) {
        LOG_INFO(
            "{}: put pid({}) to dir({}), which has {} replicas of current app, {} replicas totally",
            dsn_primary_address(),
            pid,
            selected->tag,
            least_app_replicas_count,
            least_total_replicas_count);
    }
    return selected;
}
// Test helper: place new replica `pid` on `specified_dn`. The following are checked and abort
// on failure:
//   - `pid` is not yet held by any dir_node,
//   - `specified_dn` is registered,
//   - `specified_dn` is NORMAL,
//   - the replica directory can be created.
// The replica is then recorded on `specified_dn`.
void fs_manager::specify_dir_for_new_replica_for_test(dir_node *specified_dn,
                                                      absl::string_view app_type,
                                                      const dsn::gpid &pid) const
{
    zauto_write_lock l(_lock);

    bool registered = false;
    for (const auto &node : _dir_nodes) {
        CHECK(!node->has(pid), "gpid({}) already exists in dir_node({})", pid, node->tag);
        registered = registered || node.get() == specified_dn;
    }
    CHECK(registered, "dir_node({}) is not exist", specified_dn->tag);
    CHECK_EQ(disk_status::NORMAL, specified_dn->status);

    const std::string replica_path = specified_dn->replica_dir(app_type, pid);
    CHECK_TRUE(dsn::utils::filesystem::create_directory(replica_path));
    specified_dn->holding_replicas[pid.get_app_id()].emplace(pid);
}
// Forget `pid` on every dir_node. Aborts if `pid` turns out to be recorded on more than one
// dir_node.
void fs_manager::remove_replica(const gpid &pid)
{
    zauto_write_lock l(_lock);
    uint64_t removed_so_far = 0;
    for (const auto &node : _dir_nodes) {
        const uint64_t removed_here = node->remove(pid);
        CHECK_LE_MSG(removed_so_far + removed_here,
                     1,
                     "gpid({}) found in dir({}), which was removed before",
                     pid,
                     node->tag);
        if (removed_here == 0) {
            continue;
        }
        LOG_INFO("{}: remove gpid({}) from dir({})", dsn_primary_address(), pid, node->tag);
        removed_so_far += removed_here;
    }
}
void fs_manager::update_disk_stat()
{
int64_t total_capacity_mb = 0;
int64_t total_available_mb = 0;
int total_available_ratio = 0;
int min_available_ratio = 100;
int max_available_ratio = 0;
zauto_write_lock l(_lock);
for (auto &dn : _dir_nodes) {
// If the disk is already in IO_ERROR status, it will not change to other status, just skip
// it.
if (dn->status == disk_status::IO_ERROR) {
LOG_WARNING("skip to update disk stat for dir({}), because it is in IO_ERROR status",
dn->tag);
continue;
}
dn->update_disk_stat();
total_capacity_mb += dn->disk_capacity_mb;
total_available_mb += dn->disk_available_mb;
min_available_ratio = std::min(dn->disk_available_ratio, min_available_ratio);
max_available_ratio = std::max(dn->disk_available_ratio, max_available_ratio);
}
total_available_ratio = dsn::utils::calc_percentage<int>(total_available_mb, total_capacity_mb);
LOG_INFO("update disk space succeed: disk_count = {}, total_capacity_mb = {}, "
"total_available_mb = {}, total_available_ratio = {}%, min_available_ratio = {}%, "
"max_available_ratio = {}%",
_dir_nodes.size(),
total_capacity_mb,
total_available_mb,
total_available_ratio,
min_available_ratio,
max_available_ratio);
_total_capacity_mb.store(total_capacity_mb, std::memory_order_relaxed);
_total_available_mb.store(total_available_mb, std::memory_order_relaxed);
}
// Register a new data directory at runtime. `data_dir` is normalized and appended as a
// dir_node constructed from just (tag, path). No duplicate check is done here;
// is_dir_node_exist() offers one.
void fs_manager::add_new_dir_node(const std::string &data_dir, const std::string &tag)
{
    std::string norm_path;
    utils::filesystem::get_normalized_path(data_dir, norm_path);

    auto new_node = std::make_shared<dir_node>(tag, norm_path);
    {
        zauto_write_lock l(_lock);
        _dir_nodes.emplace_back(std::move(new_node));
    }
    LOG_INFO("add new data dir({}) and mark as tag({})", norm_path, tag);
}
// True if any registered dir_node already uses the normalized `data_dir` path OR the `tag`.
bool fs_manager::is_dir_node_exist(const std::string &data_dir, const std::string &tag) const
{
    std::string normalized;
    utils::filesystem::get_normalized_path(data_dir, normalized);

    zauto_read_lock l(_lock);
    return std::any_of(_dir_nodes.begin(), _dir_nodes.end(), [&](const auto &node) {
        return node->full_dir == normalized || node->tag == tag;
    });
}
// Locate the dir_node on which the replica directory of `pid` already exists on disk.
// IO_ERROR dir_nodes are not probed. Returns nullptr if none has it. Aborts if the directory
// exists under more than one data dir.
dir_node *fs_manager::find_replica_dir(absl::string_view app_type, gpid pid)
{
    std::string found_dir;
    dir_node *found_dn = nullptr;

    zauto_read_lock l(_lock);
    for (const auto &node : _dir_nodes) {
        if (node->status == disk_status::IO_ERROR) {
            continue;
        }
        const auto candidate = node->replica_dir(app_type, pid);
        if (!utils::filesystem::directory_exists(candidate)) {
            continue;
        }
        // A replica must live under exactly one data dir.
        CHECK(found_dir.empty(), "replica dir conflict: {} <--> {}", candidate, found_dir);
        found_dir = candidate;
        found_dn = node.get();
    }
    return found_dn;
}
// Return the dir_node hosting replica `pid`, creating its directory if it does not exist yet.
//  - If the replica directory already exists on some usable data dir, return that dir_node.
//  - Otherwise choose a dir_node via find_best_dir_for_new_replica() and create the directory
//    there, then record the replica on that dir_node.
// Returns nullptr if no dir_node is available or the directory cannot be created.
dir_node *fs_manager::create_replica_dir_if_necessary(absl::string_view app_type, gpid pid)
{
    // Try to find the replica directory.
    auto replica_dn = find_replica_dir(app_type, pid);
    if (replica_dn != nullptr) {
        // TODO(yingchun): enable this check after unit tests are refactored and fixed.
        // CHECK(replica_dn->has(pid),
        // "replica({})'s directory({}) exists but not in management",
        // pid,
        // replica_dn->tag);
        return replica_dn;
    }

    // TODO(yingchun): enable this check after unit tests are refactored and fixed.
    // CHECK(0 == replica_dn->holding_replicas.count(pid.get_app_id()) ||
    // 0 == replica_dn->holding_replicas[pid.get_app_id()].count(pid), "");

    // Find a dir_node for the new replica.
    replica_dn = find_best_dir_for_new_replica(pid);
    if (replica_dn == nullptr) {
        return nullptr;
    }

    const auto dir = replica_dn->replica_dir(app_type, pid);
    if (!dsn::utils::filesystem::create_directory(dir)) {
        LOG_ERROR("create replica directory({}) failed", dir);
        return nullptr;
    }

    // holding_replicas is shared state guarded by _lock (add_replica/remove_replica mutate it
    // under the write lock), so the insertion must hold the write lock too.
    {
        zauto_write_lock l(_lock);
        replica_dn->holding_replicas[pid.get_app_id()].emplace(pid);
    }
    return replica_dn;
}
// Create the directory for split child `child_pid` on the same NORMAL data dir that contains
// `parent_dir`, and register the child there.
// <parent_dir> is expected to look like <data_dir>/<gpid>.<app_type>.
// Aborts if no NORMAL data dir contains `parent_dir`. Returns nullptr if the child directory
// cannot be created.
dir_node *fs_manager::create_child_replica_dir(absl::string_view app_type,
                                               gpid child_pid,
                                               const std::string &parent_dir)
{
    dir_node *child_dn = nullptr;
    std::string child_dir;
    {
        zauto_read_lock l(_lock);
        for (const auto &node : _dir_nodes) {
            if (node->status != disk_status::NORMAL) {
                continue;
            }
            // Is <parent_dir> of the form "<full_dir>/..."?
            // TODO(yingchun): use a function instead.
            const std::string &prefix = node->full_dir;
            const bool parent_under_node = parent_dir.size() > prefix.size() &&
                                           parent_dir.compare(0, prefix.size(), prefix) == 0 &&
                                           parent_dir[prefix.size()] == '/';
            if (parent_under_node) {
                child_dn = node.get();
                child_dir = node->replica_dir(app_type, child_pid);
                break;
            }
        }
    }
    CHECK_NOTNULL(child_dn, "can not find parent_dir {} in data_dirs", parent_dir);

    if (!dsn::utils::filesystem::create_directory(child_dir)) {
        LOG_ERROR("create child replica directory({}) failed", child_dir);
        return nullptr;
    }
    add_replica(child_pid, child_dir);
    return child_dn;
}
std::vector<disk_info> fs_manager::get_disk_infos(int app_id) const
{
std::vector<disk_info> disk_infos;
zauto_read_lock l(_lock);
for (const auto &dn : _dir_nodes) {
disk_info di;
// Query all app info if 'app_id' is 0, which is not a valid app id.
if (app_id == 0) {
di.holding_primary_replicas = dn->holding_primary_replicas;
di.holding_secondary_replicas = dn->holding_secondary_replicas;
} else {
const auto &primary_replicas = dn->holding_primary_replicas.find(app_id);
if (primary_replicas != dn->holding_primary_replicas.end()) {
di.holding_primary_replicas[app_id] = primary_replicas->second;
}
const auto &secondary_replicas = dn->holding_secondary_replicas.find(app_id);
if (secondary_replicas != dn->holding_secondary_replicas.end()) {
di.holding_secondary_replicas[app_id] = secondary_replicas->second;
}
}
di.tag = dn->tag;
di.full_dir = dn->full_dir;
di.disk_capacity_mb = dn->disk_capacity_mb;
di.disk_available_mb = dn->disk_available_mb;
disk_infos.emplace_back(std::move(di));
}
return disk_infos;
}
// Check whether replica `pid` may be migrated from disk `origin_disk` to disk `target_disk`
// (both identified by dir_node tag). On failure, returns an error code and fills `err_msg`.
//
// Checks on the origin disk:
//   - the disk must exist (ERR_OBJECT_NOT_FOUND),
//   - it must hold `pid` (ERR_OBJECT_NOT_FOUND),
//   - it must not be IO_ERROR (ERR_DISK_IO_ERROR).
// Checks on the target disk:
//   - the disk must exist (ERR_OBJECT_NOT_FOUND),
//   - it must not already hold `pid` (ERR_PATH_ALREADY_EXIST),
//   - it must be NORMAL; otherwise the disk_status_to_error_code() of its status is returned.
// Aborts if two dir_nodes share the same tag.
error_code fs_manager::validate_migrate_op(gpid pid,
                                           const std::string &origin_disk,
                                           const std::string &target_disk,
                                           std::string &err_msg) const
{
    bool origin_disk_exist = false;
    bool target_disk_exist = false;
    zauto_read_lock l(_lock);
    for (const auto &dn : _dir_nodes) {
        // Check if the origin directory is valid.
        if (dn->tag == origin_disk) {
            CHECK_FALSE(origin_disk_exist);
            if (!dn->has(pid)) {
                err_msg = fmt::format(
                    "replica({}) doesn't exist on the origin disk({})", pid, origin_disk);
                return ERR_OBJECT_NOT_FOUND;
            }
            // It's OK to migrate a replica from a dir_node which is NORMAL or even
            // SPACE_INSUFFICIENT, but not allowed when it's IO_ERROR.
            if (dn->status == disk_status::IO_ERROR) {
                err_msg = fmt::format(
                    "replica({}) exists on an IO-Error origin disk({})", pid, origin_disk);
                return ERR_DISK_IO_ERROR;
            }
            origin_disk_exist = true;
        }

        // Check if the target directory is valid.
        if (dn->tag == target_disk) {
            CHECK_FALSE(target_disk_exist);
            if (dn->has(pid)) {
                err_msg =
                    fmt::format("replica({}) already exists on target disk({})", pid, target_disk);
                return ERR_PATH_ALREADY_EXIST;
            }
            // It's not allowed to migrate a replica to a dir_node which is either
            // SPACE_INSUFFICIENT or IO_ERROR.
            if (dn->status == disk_status::SPACE_INSUFFICIENT ||
                dn->status == disk_status::IO_ERROR) {
                // Report the target disk: that is the one whose status blocks the migration.
                err_msg = fmt::format("replica({}) target disk({}) is {}",
                                      pid,
                                      target_disk,
                                      enum_to_string(dn->status));
                return disk_status_to_error_code(dn->status);
            }
            target_disk_exist = true;
        }
    }

    if (!origin_disk_exist) {
        err_msg = fmt::format("origin disk({}) doesn't exist", origin_disk);
        return ERR_OBJECT_NOT_FOUND;
    }
    if (!target_disk_exist) {
        err_msg = fmt::format("target disk({}) doesn't exist", target_disk);
        return ERR_OBJECT_NOT_FOUND;
    }
    return ERR_OK;
}
} // namespace replication
} // namespace dsn