blob: b5dc54c00c3ad9f9961a1dec81826fdde6d95a1c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/fs/data_dirs.h"
#include <algorithm>
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <ostream>
#include <random>
#include <set>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/common.h>
#include "kudu/fs/block_manager.h"
#include "kudu/fs/dir_util.h"
#include "kudu/fs/fs.pb.h"
#include "kudu/gutil/integral_types.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/locks.h"
#include "kudu/util/metrics.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/random.h"
#include "kudu/util/status.h"
#include "kudu/util/test_util_prod.h"
#include "kudu/util/threadpool.h"
// Target size of a tablet's data dir group. Read by
// DataDirManager::CreateDataDirGroup(), where 0 selects every data dir.
// The validator below rejects negative values.
DEFINE_int32(fs_target_data_dirs_per_tablet, 3,
"Indicates the target number of data dirs to spread each "
"tablet's data across. If greater than the number of data dirs "
"available, data will be striped across those available. A "
"value of 0 indicates striping should occur across all healthy "
"data dirs. Using fewer data dirs per tablet means a single "
"drive failure will be less likely to affect a given tablet.");
DEFINE_validator(fs_target_data_dirs_per_tablet,
[](const char* /*n*/, int32_t v) { return v >= 0; });
TAG_FLAG(fs_target_data_dirs_per_tablet, advanced);
TAG_FLAG(fs_target_data_dirs_per_tablet, evolving);
// Per-data-directory space reservation, exposed to the rest of this file
// through DataDir::reserved_bytes(). The validator accepts -1 (the default)
// and any non-negative value.
DEFINE_int64(fs_data_dirs_reserved_bytes, -1,
"Number of bytes to reserve on each data directory filesystem for "
"non-Kudu usage. The default, which is represented by -1, is that "
"1% of the disk space on each disk will be reserved. Any other "
"value specified represents the number of bytes reserved and must "
"be greater than or equal to 0. Explicit percentages to reserve "
"are not currently supported");
DEFINE_validator(fs_data_dirs_reserved_bytes, [](const char* /*n*/, int64_t v) { return v >= -1; });
TAG_FLAG(fs_data_dirs_reserved_bytes, runtime);
TAG_FLAG(fs_data_dirs_reserved_bytes, evolving);
// Lifetime, in seconds, of a data directory's cached available-space reading.
// Returned by DataDir::available_space_cache_secs(). Must be non-negative.
DEFINE_int32(fs_data_dirs_available_space_cache_seconds, 10,
"Number of seconds we cache the available disk space in the block manager.");
DEFINE_validator(fs_data_dirs_available_space_cache_seconds,
[](const char* /*n*/, int32_t v) { return v >= 0; });
TAG_FLAG(fs_data_dirs_available_space_cache_seconds, advanced);
TAG_FLAG(fs_data_dirs_available_space_cache_seconds, evolving);
// Lifetime, in seconds, of the cached available-space reading for the WAL
// directory. The flag is only defined here; nothing else in this file reads
// it. Must be non-negative.
DEFINE_int32(fs_wal_dir_available_space_cache_seconds, 10,
             "Number of seconds we cache the available disk space for the WAL directory.");
DEFINE_validator(fs_wal_dir_available_space_cache_seconds,
                 [](const char* /*n*/, int32_t v) { return v >= 0; });
TAG_FLAG(fs_wal_dir_available_space_cache_seconds, advanced);
TAG_FLAG(fs_wal_dir_available_space_cache_seconds, evolving);
// Whether data directories are locked; surfaced to the base class through
// DataDirManager::lock_dirs().
DEFINE_bool(fs_lock_data_dirs, true,
"Lock the data directories to prevent concurrent usage. "
"Note that read-only concurrent usage is still allowed.");
TAG_FLAG(fs_lock_data_dirs, unsafe);
TAG_FLAG(fs_lock_data_dirs, evolving);
// Consulted by DataDirManager::GetDirForBlock() and
// DataDirManager::GetDirsForGroupUnlocked() when choosing between two
// randomly picked candidate directories.
DEFINE_bool(fs_data_dirs_consider_available_space, true,
"Whether to consider available space when selecting a data "
"directory during tablet or data block creation.");
TAG_FLAG(fs_data_dirs_consider_available_space, runtime);
TAG_FLAG(fs_data_dirs_consider_available_space, evolving);
// Passed by the DataDirManager constructor to the DirManager base class.
// No validator is registered for this flag.
DEFINE_uint64(fs_max_thread_count_per_data_dir, 8,
"Maximum work thread per data directory.");
TAG_FLAG(fs_max_thread_count_per_data_dir, advanced);
// Gauge prototypes for the "server" entity. They are instantiated by the
// DataDirMetrics constructor in this file.
METRIC_DEFINE_gauge_uint64(server, data_dirs_failed,
"Data Directories Failed",
kudu::MetricUnit::kDataDirectories,
"Number of data directories whose disks are currently "
"in a failed state",
kudu::MetricLevel::kWarn);
METRIC_DEFINE_gauge_uint64(server, data_dirs_full,
"Data Directories Full",
kudu::MetricUnit::kDataDirectories,
"Number of data directories whose disks are currently full",
kudu::MetricLevel::kWarn);
// Flags that are only declared here (not defined in this file):
// 'enable_data_block_fsync' backs DataDirManager::sync_dirs() and
// 'block_manager' seeds DataDirManagerOptions.
DECLARE_bool(enable_data_block_fsync);
DECLARE_string(block_manager);
namespace kudu {
namespace fs {
using internal::DataDirGroup;
using std::default_random_engine;
using std::pair;
using std::set;
using std::shuffle;
using std::string;
using std::unique_ptr;
using std::unordered_map;
using std::unordered_set;
using std::vector;
using strings::Substitute;
using strings::SubstituteAndAppend;
////////////////////////////////////////////////////////////
// DataDirMetrics
////////////////////////////////////////////////////////////
// Instantiates the "failed" and "full" data-directory gauges on
// 'metric_entity', each with an initial value of 0.
DataDirMetrics::DataDirMetrics(const scoped_refptr<MetricEntity>& metric_entity) {
  dirs_failed = METRIC_data_dirs_failed.Instantiate(metric_entity, 0);
  dirs_full = METRIC_data_dirs_full.Instantiate(metric_entity, 0);
}
////////////////////////////////////////////////////////////
// DataDirGroup
////////////////////////////////////////////////////////////
// Creates a group with no directories.
DataDirGroup::DataDirGroup() = default;

// Creates a group made up of the given UUID indices; the vector is moved
// into the group.
DataDirGroup::DataDirGroup(vector<int> uuid_indices)
    : uuid_indices_(std::move(uuid_indices)) {
}
// Replaces this group's UUID indices with those named in 'pb', translating
// each UUID through 'uuid_idx_by_uuid'.
//
// Returns NotFound as soon as a UUID has no entry in the map; in that case the
// group is left unmodified.
Status DataDirGroup::LoadFromPB(const UuidIndexByUuidMap& uuid_idx_by_uuid,
                                const DataDirGroupPB& pb) {
  vector<int> resolved;
  for (const auto& uuid : pb.uuids()) {
    const auto* idx = FindOrNull(uuid_idx_by_uuid, uuid);
    if (idx == nullptr) {
      return Status::NotFound(Substitute(
          "could not find data dir with uuid $0", uuid));
    }
    resolved.push_back(*idx);
  }
  // Only commit once every UUID has been resolved.
  uuid_indices_ = std::move(resolved);
  return Status::OK();
}
// Writes this group's directories into 'pb' as UUIDs, translating each UUID
// index through 'uuid_by_uuid_idx'.
//
// Returns NotFound as soon as an index has no entry in the map; in that case
// 'pb' is left unmodified.
Status DataDirGroup::CopyToPB(const UuidByUuidIndexMap& uuid_by_uuid_idx,
                              DataDirGroupPB* pb) const {
  DCHECK(pb);
  DataDirGroupPB staged;
  for (const auto idx : uuid_indices_) {
    const auto* uuid = FindOrNull(uuid_by_uuid_idx, idx);
    if (uuid == nullptr) {
      return Status::NotFound(Substitute(
          "could not find data dir with uuid index $0", idx));
    }
    *staged.add_uuids() = *uuid;
  }
  // Only overwrite the output once every index has been resolved.
  *pb = std::move(staged);
  return Status::OK();
}
////////////////////////////////////////////////////////////
// DataDir
////////////////////////////////////////////////////////////
// Constructs a data directory by forwarding every argument to the Dir base
// class; 'dir', 'metadata_file' and 'pool' are moved into the base.
DataDir::DataDir(Env* env, DirMetrics* metrics, FsType fs_type, std::string dir,
std::unique_ptr<DirInstanceMetadataFile> metadata_file,
std::unique_ptr<ThreadPool> pool)
: Dir(env, metrics, fs_type, std::move(dir), std::move(metadata_file), std::move(pool)) {}
// Factory for the manager's directory objects: builds a DataDir from the
// given arguments and returns it as an owning pointer to the Dir base type.
std::unique_ptr<Dir> DataDirManager::CreateNewDir(
    Env* env, DirMetrics* metrics, FsType fs_type,
    std::string dir, std::unique_ptr<DirInstanceMetadataFile> metadata_file,
    std::unique_ptr<ThreadPool> pool) {
  unique_ptr<Dir> created(new DataDir(env,
                                      metrics,
                                      fs_type,
                                      std::move(dir),
                                      std::move(metadata_file),
                                      std::move(pool)));
  return created;
}
// Returns the current value of --fs_data_dirs_available_space_cache_seconds.
int DataDir::available_space_cache_secs() const {
  const int cache_secs = FLAGS_fs_data_dirs_available_space_cache_seconds;
  return cache_secs;
}
int DataDir::reserved_bytes() const {
return FLAGS_fs_data_dirs_reserved_bytes;
}
////////////////////////////////////////////////////////////
// DataDirManager
////////////////////////////////////////////////////////////
// Default options: the DirManagerOptions base is initialized from the current
// value of --block_manager.
DataDirManagerOptions::DataDirManagerOptions()
: DirManagerOptions(FLAGS_block_manager) {}
// Constructs the manager by forwarding to the DirManager base class.
//
// A DataDirMetrics object is created only when 'opts.metric_entity' is set;
// otherwise the base receives a null metrics pointer. The thread count passed
// to the base is the current value of --fs_max_thread_count_per_data_dir.
DataDirManager::DataDirManager(Env* env,
const DataDirManagerOptions& opts,
CanonicalizedRootsList canonicalized_data_roots)
: DirManager(env, opts.metric_entity ?
unique_ptr<DirMetrics>(new DataDirMetrics(opts.metric_entity)) : nullptr,
FLAGS_fs_max_thread_count_per_data_dir,
opts, std::move(canonicalized_data_roots)) {}
// Test convenience wrapper around OpenExisting(): pairs every path in
// 'data_fs_roots' with an OK status to form the root list, then delegates.
Status DataDirManager::OpenExistingForTests(Env* env,
                                            vector<string> data_fs_roots,
                                            const DataDirManagerOptions& opts,
                                            unique_ptr<DataDirManager>* dd_manager) {
  CanonicalizedRootsList canonicalized;
  for (const auto& root_path : data_fs_roots) {
    canonicalized.push_back({ root_path, Status::OK() });
  }
  return DataDirManager::OpenExisting(env, std::move(canonicalized), opts, dd_manager);
}
// Builds a manager over 'data_fs_roots' and opens it. '*dd_manager' is only
// assigned when Open() succeeds; on failure the error is returned and the
// output is left untouched.
Status DataDirManager::OpenExisting(Env* env, CanonicalizedRootsList data_fs_roots,
                                    const DataDirManagerOptions& opts,
                                    unique_ptr<DataDirManager>* dd_manager) {
  unique_ptr<DataDirManager> manager(
      new DataDirManager(env, opts, std::move(data_fs_roots)));
  RETURN_NOT_OK(manager->Open());
  *dd_manager = std::move(manager);
  return Status::OK();
}
// Test convenience wrapper around CreateNew(): pairs every path in
// 'data_fs_roots' with an OK status to form the root list, then delegates.
Status DataDirManager::CreateNewForTests(Env* env, vector<string> data_fs_roots,
                                         const DataDirManagerOptions& opts,
                                         unique_ptr<DataDirManager>* dd_manager) {
  CanonicalizedRootsList canonicalized;
  for (const auto& root_path : data_fs_roots) {
    canonicalized.push_back({ root_path, Status::OK() });
  }
  return DataDirManager::CreateNew(env, std::move(canonicalized), opts, dd_manager);
}
// Builds a manager over 'data_fs_roots', runs Create() and then Open() on it.
// '*dd_manager' is only assigned when both steps succeed; otherwise the first
// error is returned and the output is left untouched.
Status DataDirManager::CreateNew(Env* env, CanonicalizedRootsList data_fs_roots,
                                 const DataDirManagerOptions& opts,
                                 unique_ptr<DataDirManager>* dd_manager) {
  unique_ptr<DataDirManager> manager(
      new DataDirManager(env, opts, std::move(data_fs_roots)));
  RETURN_NOT_OK(manager->Create());
  RETURN_NOT_OK(manager->Open());
  *dd_manager = std::move(manager);
  return Status::OK();
}
// Returns the current value of --enable_data_block_fsync.
bool DataDirManager::sync_dirs() const {
  const bool fsync_enabled = FLAGS_enable_data_block_fsync;
  return fsync_enabled;
}
// Returns the current value of --fs_lock_data_dirs.
bool DataDirManager::lock_dirs() const {
  const bool locking_enabled = FLAGS_fs_lock_data_dirs;
  return locking_enabled;
}
// Returns the maximum number of data directories for the configured dir type:
// 65535 for "file", kint32max for anything else.
int DataDirManager::max_dirs() const {
  if (opts_.dir_type == "file") {
    return (1 << 16) - 1;
  }
  return kint32max;
}
// Registers every directory in 'dirs' in the manager's UUID/index maps.
//
// For the "log" dir type this simply defers to the DirManager implementation.
// For the "file" dir type, UUID indexes are taken from the position of each
// UUID in the dir set recorded by the first healthy instance file.
//
// Returns Corruption if the number of distinct UUIDs collected from that dir
// set differs from the number of directories.
Status DataDirManager::PopulateDirectoryMaps(const vector<unique_ptr<Dir>>& dirs) {
if (opts_.dir_type == "log") {
return DirManager::PopulateDirectoryMaps(dirs);
}
DCHECK_EQ("file", opts_.dir_type);
// When assigning directories for the file block manager, the UUID indexes
// must match what exists in the instance files' list of UUIDs.
unordered_map<string, int> uuid_to_idx;
for (const auto& dd : dirs) {
// Find a healthy instance file and use its set of UUIDs.
if (dd->instance()->healthy()) {
const auto& dir_set = dd->instance()->metadata()->dir_set();
VLOG(1) << Substitute("using dir set $0 as reference: $1",
dd->instance()->path(), pb_util::SecureDebugString(dir_set));
for (int idx = 0; idx < dir_set.all_uuids_size(); idx++) {
const string& uuid = dir_set.all_uuids(idx);
// A UUID that is already present keeps its first index.
InsertIfNotPresent(&uuid_to_idx, uuid, idx);
}
// Only the first healthy instance is consulted.
break;
}
}
// We should have the same number of UUID assignments as directories.
// Note that if no instance was healthy, 'uuid_to_idx' is still empty here, so
// a non-empty 'dirs' also takes this error path.
if (dirs.size() != uuid_to_idx.size()) {
return Status::Corruption(
Substitute("instance files contain duplicate UUIDs: $0 unique UUIDs expected, got $1",
dirs.size(), uuid_to_idx.size()));
}
// Keep track of any dirs that were not referenced in the dir set. These
// are presumably from instance files we failed to read. We'll assign them
// indexes of those that remain.
vector<Dir*> unassigned_dirs;
for (const auto& dd : dirs) {
const auto& uuid = dd->instance()->uuid();
int* idx = FindOrNull(uuid_to_idx, uuid);
if (idx) {
InsertToMaps(uuid, *idx, dd.get());
// Erase the entry so that, after this loop, 'uuid_to_idx' holds only the
// UUIDs that were not matched to any directory.
uuid_to_idx.erase(uuid);
} else {
LOG(WARNING) << Substitute("instance $0 has unknown UUID $1",
dd->instance()->path(), uuid);
unassigned_dirs.emplace_back(dd.get());
}
}
DCHECK_EQ(unassigned_dirs.size(), uuid_to_idx.size());
// Pair each leftover UUID/index with one of the unassigned directories. The
// pairing follows the iteration order of the unordered map.
int unassigned_dir_idx = 0;
for (const auto& failed_uuid_and_idx : uuid_to_idx) {
InsertToMaps(failed_uuid_and_idx.first, failed_uuid_and_idx.second,
unassigned_dirs[unassigned_dir_idx++]);
}
return Status::OK();
}
// Registers the data dir group described by 'pb' for 'tablet_id', holding the
// group lock exclusively for the duration.
//
// Returns the (prefixed) error from DataDirGroup::LoadFromPB() if a UUID in
// 'pb' cannot be resolved, or AlreadyPresent if the tablet already has a group
// registered. On success the tablet is also recorded against every directory
// in the group.
Status DataDirManager::LoadDataDirGroupFromPB(const std::string& tablet_id,
                                              const DataDirGroupPB& pb) {
  std::lock_guard<percpu_rwlock> write_lock(dir_group_lock_);

  DataDirGroup loaded;
  RETURN_NOT_OK_PREPEND(loaded.LoadFromPB(idx_by_uuid_, pb), Substitute(
      "could not load data dir group for tablet $0", tablet_id));

  // A non-null result means a group was already registered and nothing was
  // inserted.
  if (InsertOrReturnExisting(&group_by_tablet_map_, tablet_id, loaded) != nullptr) {
    return Status::AlreadyPresent(Substitute(
        "tried to load directory group for tablet $0 but one is already registered",
        tablet_id));
  }

  for (const int dir_idx : loaded.uuid_indices()) {
    InsertOrDie(&FindOrDie(tablets_by_uuid_idx_map_, dir_idx), tablet_id);
  }
  return Status::OK();
}
// Creates and registers a new data dir group for 'tablet_id', holding the
// group lock exclusively for the duration.
//
// With DirDistributionMode::ACROSS_ALL_DIRS the group contains every known
// directory. Otherwise directories are chosen by GetDirsForGroupUnlocked(), up
// to a target size derived from --fs_target_data_dirs_per_tablet.
//
// Returns:
// - AlreadyPresent if the tablet already has a group.
// - IOError with ENODEV if failed directories leave a target size of 0.
// - IOError with ENOSPC if no directory could be selected.
Status DataDirManager::CreateDataDirGroup(const string& tablet_id,
DirDistributionMode mode) {
std::lock_guard<percpu_rwlock> write_lock(dir_group_lock_);
if (ContainsKey(group_by_tablet_map_, tablet_id)) {
return Status::AlreadyPresent("Tried to create directory group for tablet but one is already "
"registered", tablet_id);
}
// Adjust the disk group size to fit within the total number of data dirs.
int group_target_size;
if (FLAGS_fs_target_data_dirs_per_tablet == 0) {
group_target_size = dirs_.size();
} else {
group_target_size = std::min(FLAGS_fs_target_data_dirs_per_tablet,
static_cast<int>(dirs_.size()));
}
vector<int> group_indices;
if (mode == DirDistributionMode::ACROSS_ALL_DIRS) {
// If using all dirs, add all regardless of directory state.
AppendKeysFromMap(dir_by_uuid_idx_, &group_indices);
} else {
// Randomly select directories, giving preference to those with fewer tablets.
if (PREDICT_FALSE(!failed_dirs_.empty())) {
group_target_size = std::min(group_target_size,
static_cast<int>(dirs_.size()) - static_cast<int>(failed_dirs_.size()));
// A size of 0 would indicate no healthy disks, which should crash the server.
DCHECK_GE(group_target_size, 0);
if (group_target_size == 0) {
return Status::IOError("No healthy data directories available", "", ENODEV);
}
}
GetDirsForGroupUnlocked(group_target_size, &group_indices);
if (PREDICT_FALSE(group_indices.empty())) {
return Status::IOError("All healthy data directories are full", "", ENOSPC);
}
// NOTE(review): the shortfall check compares against the raw flag value, not
// 'group_target_size'. When the flag is 0 ("use all dirs") the comparison can
// never be true, so no message is logged in that configuration.
if (PREDICT_FALSE(group_indices.size() < FLAGS_fs_target_data_dirs_per_tablet)) {
string msg = Substitute("Could only allocate $0 dirs of requested $1 for tablet "
"$2. $3 dirs total", group_indices.size(),
FLAGS_fs_target_data_dirs_per_tablet, tablet_id, dirs_.size());
if (metrics_) {
SubstituteAndAppend(&msg, ", $0 dirs full, $1 dirs failed",
metrics_->dirs_full->value(), metrics_->dirs_failed->value());
}
LOG(INFO) << msg;
}
}
// Register the group, then record the tablet against each of its directories.
InsertOrDie(&group_by_tablet_map_, tablet_id, DataDirGroup(group_indices));
for (int uuid_idx : group_indices) {
InsertOrDie(&FindOrDie(tablets_by_uuid_idx_map_, uuid_idx), tablet_id);
}
return Status::OK();
}
// Picks a directory from the data dir group of the tablet named in 'opts' and
// returns it in 'dir'. The group lock is held in shared mode for the duration.
//
// Directories in 'failed_dirs_', directories whose available-space refresh
// fails, and full directories are excluded. If exactly one candidate remains
// it is returned; otherwise the candidates are shuffled and one of the first
// two is chosen.
//
// Returns:
// - NotFound if the tablet has no registered group.
// - IOError with ENODEV if every directory in the group has failed.
// - IOError with ENOSPC if no candidate remains; in that case (and only in
//   that case) '*new_target_group_size' is set to the group's size plus one.
//
// 'new_target_group_size' is dereferenced unconditionally on the ENOSPC path,
// so it must not be null.
Status DataDirManager::GetDirForBlock(const CreateBlockOptions& opts, Dir** dir,
int* new_target_group_size) const {
shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock());
vector<int> healthy_uuid_indices;
const DataDirGroup* group = nullptr;
DataDirGroup group_for_tests;
if (PREDICT_TRUE(!opts.tablet_id.empty())) {
// Get the data dir group for the tablet.
group = FindOrNull(group_by_tablet_map_, opts.tablet_id);
if (group == nullptr) {
return Status::NotFound("Tried to get directory but no directory group "
"registered for tablet", opts.tablet_id);
}
} else {
// This should only be reached by some tests; in cases where there is no
// natural tablet_id, select a data dir from any of the directories.
CHECK(IsGTest());
vector<int> all_uuid_indices;
AppendKeysFromMap(dir_by_uuid_idx_, &all_uuid_indices);
group_for_tests = DataDirGroup(std::move(all_uuid_indices));
group = &group_for_tests;
}
// Within a given directory group, filter out the ones that are failed.
if (PREDICT_TRUE(failed_dirs_.empty())) {
healthy_uuid_indices = group->uuid_indices();
} else {
RemoveUnhealthyDataDirsUnlocked(group->uuid_indices(), &healthy_uuid_indices);
if (healthy_uuid_indices.empty()) {
return Status::IOError("No healthy directories exist in tablet's "
"directory group", opts.tablet_id, ENODEV);
}
}
// Within a given directory group, filter out the ones that are full.
vector<Dir*> candidate_dirs;
for (auto uuid_idx : healthy_uuid_indices) {
Dir* candidate = FindOrDie(dir_by_uuid_idx_, uuid_idx);
Status s = candidate->RefreshAvailableSpace(Dir::RefreshMode::EXPIRED_ONLY);
WARN_NOT_OK(s, Substitute("failed to refresh fullness of $0", candidate->dir()));
// A directory whose refresh failed is skipped just like a full one.
if (s.ok() && !candidate->is_full()) {
candidate_dirs.emplace_back(candidate);
}
}
// If all the directories in the group are full, return an ENOSPC error.
if (PREDICT_FALSE(candidate_dirs.empty())) {
DCHECK(group);
size_t num_total = group->uuid_indices().size();
size_t num_full = 0;
for (const auto& idx : group->uuid_indices()) {
if (FindOrDie(dir_by_uuid_idx_, idx)->is_full()) {
num_full++;
}
}
// Every directory that does not report itself full is counted as failed in
// the message below.
size_t num_failed = num_total - num_full;
*new_target_group_size = num_total + 1;
return Status::IOError(
Substitute("No directories available in $0's directory group ($1 dirs "
"total, $2 failed, $3 full)",
opts.tablet_id, num_total, num_failed, num_full),
"", ENOSPC);
}
if (candidate_dirs.size() == 1) {
*dir = candidate_dirs[0];
return Status::OK();
}
// Pick two randomly and select the one with more space.
// If --fs_data_dirs_consider_available_space is off, the condition below is
// false and the second shuffled candidate is always the one returned.
shuffle(candidate_dirs.begin(), candidate_dirs.end(),
default_random_engine(rng_.Next()));
*dir = PREDICT_TRUE(FLAGS_fs_data_dirs_consider_available_space) &&
candidate_dirs[0]->available_bytes() > candidate_dirs[1]->available_bytes() ?
candidate_dirs[0] : candidate_dirs[1];
return Status::OK();
}
void DataDirManager::DeleteDataDirGroup(const std::string& tablet_id) {
std::lock_guard<percpu_rwlock> lock(dir_group_lock_);
DataDirGroup* group = FindOrNull(group_by_tablet_map_, tablet_id);
if (group == nullptr) {
return;
}
// Remove the tablet_id from every data dir in its group.
for (int uuid_idx : group->uuid_indices()) {
FindOrDie(tablets_by_uuid_idx_map_, uuid_idx).erase(tablet_id);
}
group_by_tablet_map_.erase(tablet_id);
}
// Serializes the data dir group of 'tablet_id' into 'pb', holding the group
// lock in shared mode for the duration.
//
// Returns NotFound if the tablet has no group, or the error from
// DataDirGroup::CopyToPB() if the group cannot be converted.
Status DataDirManager::GetDataDirGroupPB(const string& tablet_id,
                                         DataDirGroupPB* pb) const {
  shared_lock<rw_spinlock> read_lock(dir_group_lock_.get_lock());
  const DataDirGroup* found = FindOrNull(group_by_tablet_map_, tablet_id);
  if (found != nullptr) {
    return found->CopyToPB(uuid_by_idx_, pb);
  }
  return Status::NotFound(Substitute(
      "could not find data dir group for tablet $0", tablet_id));
}
// Returns in 'dir' a directory in which to place a new block for the tablet
// named in 'opts'. If every directory in the tablet's data dir group is out of
// space, tries to grow the group by one directory and returns that directory.
//
// Returns:
// - OK with '*dir' set on success.
// - Whatever GetDirForBlock() returned, if it failed for a reason other than
//   ENOSPC (or if 'opts' carries no tablet ID, which is only allowed in tests).
// - NotFound if the tablet's group was removed before it could be grown.
// - IOError with ENODEV if no further directory could be added to the group.
Status DataDirManager::GetDirAddIfNecessary(const CreateBlockOptions& opts, Dir** dir) {
  int new_target_group_size = 0;
  Status s = GetDirForBlock(opts, dir, &new_target_group_size);
  if (PREDICT_TRUE(s.ok())) {
    return Status::OK();
  }
  const string& tablet_id = opts.tablet_id;
  if (tablet_id.empty()) {
    // This should only be reached by some tests; in cases where there is no
    // natural tablet_id. Just return whatever error we got.
    CHECK(IsGTest());
    return s;
  }
  // If we failed for a reason other than lack of space in the data dir group,
  // return the error.
  if (!s.IsIOError() || s.posix_code() != ENOSPC) {
    return s;
  }
  // If we couldn't get a directory because the group is out of space, try
  // adding a new directory to the group.
  DCHECK_GT(new_target_group_size, 0);
  std::lock_guard<percpu_rwlock> l(dir_group_lock_);

  // GetDirForBlock() released its shared lock before the exclusive lock above
  // was taken, so the group may have been deleted in between. Report that as
  // an error, the same way GetDirForBlock() does, rather than crashing.
  const DataDirGroup* group = FindOrNull(group_by_tablet_map_, tablet_id);
  if (PREDICT_FALSE(group == nullptr)) {
    return Status::NotFound("Tried to get directory but no directory group "
                            "registered for tablet", tablet_id);
  }

  // If we're already at the new target group size (e.g. because another
  // thread has added a directory), just return the newly added directory.
  if (new_target_group_size <= static_cast<int>(group->uuid_indices().size())) {
    *dir = FindOrDie(dir_by_uuid_idx_, group->uuid_indices().back());
    return Status::OK();
  }

  // Work on a copy of the indices; the registered group is only replaced once
  // a new directory has actually been found.
  vector<int> group_uuid_indices = group->uuid_indices();
  GetDirsForGroupUnlocked(new_target_group_size, &group_uuid_indices);
  if (PREDICT_FALSE(static_cast<int>(group_uuid_indices.size()) < new_target_group_size)) {
    // If we couldn't add to the group, return an error.
    int num_total = dirs_.size();
    int num_failed = failed_dirs_.size();
    int num_full = 0;
    for (const auto& dd : dirs_) {
      if (dd->is_full()) num_full++;
    }
    return Status::IOError(
        Substitute("No directories could be added ($0 dirs total, $1 failed, $2 full): $3",
                   num_total, num_failed, num_full, s.ToString()), "", ENODEV);
  }

  // Update the groups. The tablet ID should not be associated with the data
  // dir already, and we should update the existing data dir group.
  // NOTE: it is not the responsibility of the DataDirManager to persist groups
  // to disk. On-disk representations of data dir groups (e.g. those in the
  // TabletSuperBlockPB) should be re-written upon modifying them.
  int new_uuid_idx = group_uuid_indices.back();
  InsertOrDie(&FindOrDie(tablets_by_uuid_idx_map_, new_uuid_idx), tablet_id);
  CHECK(!EmplaceOrUpdate(&group_by_tablet_map_, tablet_id, DataDirGroup(group_uuid_indices)));
  *dir = FindOrDie(dir_by_uuid_idx_, new_uuid_idx);
  LOG(INFO) << Substitute("Added $0 to $1's directory group: $2",
                          (*dir)->dir(), tablet_id, s.ToString());
  return Status::OK();
}
// Appends directory UUID indices to 'group_indices' until it holds
// 'target_size' entries or no candidate directory is left. The caller must
// hold 'dir_group_lock_' (checked with a DCHECK).
//
// Candidates are the directories that are not already in 'group_indices', not
// in 'failed_dirs_', whose available space could be refreshed, and that are
// not full. Each round shuffles the remaining candidates and keeps one of the
// first two, preferring the one with fewer tablets.
void DataDirManager::GetDirsForGroupUnlocked(int target_size,
vector<int>* group_indices) {
DCHECK(dir_group_lock_.is_locked());
vector<int> candidate_indices;
unordered_set<int> existing_group_indices(group_indices->begin(), group_indices->end());
for (auto& e : dir_by_uuid_idx_) {
int uuid_idx = e.first;
DCHECK_LT(uuid_idx, dirs_.size());
if (ContainsKey(existing_group_indices, uuid_idx) ||
ContainsKey(failed_dirs_, uuid_idx)) {
continue;
}
Dir* dd = e.second;
// Unlike GetDirForBlock(), the cached space reading is always refreshed here.
Status s = dd->RefreshAvailableSpace(Dir::RefreshMode::ALWAYS);
WARN_NOT_OK(s, Substitute("failed to refresh fullness of $0", dd->dir()));
if (s.ok() && !dd->is_full()) {
// TODO(awong): If a disk is unhealthy at the time of group creation, the
// resulting group may be below targeted size. Add functionality to
// resize groups. See KUDU-2040 for more details.
candidate_indices.push_back(uuid_idx);
}
}
while (group_indices->size() < target_size && !candidate_indices.empty()) {
// Reshuffle on every round so that the two candidates compared below are a
// fresh random pair.
shuffle(candidate_indices.begin(), candidate_indices.end(), default_random_engine(rng_.Next()));
if (candidate_indices.size() == 1) {
group_indices->push_back(candidate_indices[0]);
candidate_indices.erase(candidate_indices.begin());
} else {
int tablets_in_first = FindOrDie(tablets_by_uuid_idx_map_, candidate_indices[0]).size();
int tablets_in_second = FindOrDie(tablets_by_uuid_idx_map_, candidate_indices[1]).size();
int selected_index = 0;
// On a tablet-count tie, available space breaks the tie when
// --fs_data_dirs_consider_available_space is on; when it is off, the else
// branch runs and selects the second candidate.
if (tablets_in_first == tablets_in_second &&
PREDICT_TRUE(FLAGS_fs_data_dirs_consider_available_space)) {
int64_t space_in_first = FindOrDie(dir_by_uuid_idx_,
candidate_indices[0])->available_bytes();
int64_t space_in_second = FindOrDie(dir_by_uuid_idx_,
candidate_indices[1])->available_bytes();
selected_index = space_in_first > space_in_second ? 0 : 1;
} else {
selected_index = tablets_in_first < tablets_in_second ? 0 : 1;
}
group_indices->push_back(candidate_indices[selected_index]);
candidate_indices.erase(candidate_indices.begin() + selected_index);
}
}
}
// Fills 'data_dirs' with the sorted paths of the directories in the data dir
// group of 'tablet_id'.
//
// Returns the error from GetDataDirGroupPB() if the group cannot be fetched,
// or NotFound if a UUID in the group has no index or no directory. On any
// error 'data_dirs' is left unmodified.
Status DataDirManager::FindDataDirsByTabletId(const string& tablet_id,
                                              vector<string>* data_dirs) const {
  CHECK(data_dirs);
  DataDirGroupPB group_pb;
  RETURN_NOT_OK(GetDataDirGroupPB(tablet_id, &group_pb));

  vector<string> paths;
  paths.reserve(group_pb.uuids_size());
  for (const auto& uuid : group_pb.uuids()) {
    int dir_idx;
    if (!FindUuidIndexByUuid(uuid, &dir_idx)) {
      return Status::NotFound("unable to find index for UUID", uuid);
    }
    const auto* owner = FindDirByUuidIndex(dir_idx);
    if (owner == nullptr) {
      return Status::NotFound(
          Substitute("unable to find data dir for UUID $0 with index $1",
                     uuid, dir_idx));
    }
    paths.push_back(owner->dir());
  }

  std::sort(paths.begin(), paths.end());
  *data_dirs = std::move(paths);
  return Status::OK();
}
void DataDirManager::RemoveUnhealthyDataDirsUnlocked(const vector<int>& uuid_indices,
vector<int>* healthy_indices) const {
if (PREDICT_TRUE(failed_dirs_.empty())) {
return;
}
healthy_indices->clear();
for (int uuid_idx : uuid_indices) {
if (!ContainsKey(failed_dirs_, uuid_idx)) {
healthy_indices->emplace_back(uuid_idx);
}
}
}
} // namespace fs
} // namespace kudu