| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/fs/data_dirs.h" |
| |
| #include <algorithm> |
| #include <cerrno> |
| #include <cstddef> |
| #include <cstdint> |
| #include <memory> |
| #include <mutex> |
| #include <ostream> |
| #include <random> |
| #include <set> |
| #include <string> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <google/protobuf/stubs/common.h> |
| |
| #include "kudu/fs/block_manager.h" |
| #include "kudu/fs/dir_util.h" |
| #include "kudu/fs/fs.pb.h" |
| #include "kudu/gutil/integral_types.h" |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/locks.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/random.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/test_util_prod.h" |
| #include "kudu/util/threadpool.h" |
| |
| DEFINE_int32(fs_target_data_dirs_per_tablet, 3, |
| "Indicates the target number of data dirs to spread each " |
| "tablet's data across. If greater than the number of data dirs " |
| "available, data will be striped across those available. A " |
| "value of 0 indicates striping should occur across all healthy " |
| "data dirs. Using fewer data dirs per tablet means a single " |
| "drive failure will be less likely to affect a given tablet."); |
| DEFINE_validator(fs_target_data_dirs_per_tablet, |
| [](const char* /*n*/, int32_t v) { return v >= 0; }); |
| TAG_FLAG(fs_target_data_dirs_per_tablet, advanced); |
| TAG_FLAG(fs_target_data_dirs_per_tablet, evolving); |
| |
| DEFINE_int64(fs_data_dirs_reserved_bytes, -1, |
| "Number of bytes to reserve on each data directory filesystem for " |
| "non-Kudu usage. The default, which is represented by -1, is that " |
| "1% of the disk space on each disk will be reserved. Any other " |
| "value specified represents the number of bytes reserved and must " |
| "be greater than or equal to 0. Explicit percentages to reserve " |
| "are not currently supported"); |
DEFINE_validator(fs_data_dirs_reserved_bytes,
                 [](const char* /*n*/, int64_t v) { return v >= -1; });
| TAG_FLAG(fs_data_dirs_reserved_bytes, runtime); |
| TAG_FLAG(fs_data_dirs_reserved_bytes, evolving); |
| |
| DEFINE_int32(fs_data_dirs_available_space_cache_seconds, 10, |
| "Number of seconds we cache the available disk space in the block manager."); |
| DEFINE_validator(fs_data_dirs_available_space_cache_seconds, |
| [](const char* /*n*/, int32_t v) { return v >= 0; }); |
| TAG_FLAG(fs_data_dirs_available_space_cache_seconds, advanced); |
| TAG_FLAG(fs_data_dirs_available_space_cache_seconds, evolving); |
| |
| DEFINE_int32(fs_wal_dir_available_space_cache_seconds, 10, |
| "Number of seconds we cache the available disk space the WAL directory."); |
| DEFINE_validator(fs_wal_dir_available_space_cache_seconds, |
| [](const char* /*n*/, int32_t v) { return v >= 0; }); |
| TAG_FLAG(fs_wal_dir_available_space_cache_seconds, advanced); |
| TAG_FLAG(fs_wal_dir_available_space_cache_seconds, evolving); |
| |
| DEFINE_bool(fs_lock_data_dirs, true, |
| "Lock the data directories to prevent concurrent usage. " |
| "Note that read-only concurrent usage is still allowed."); |
| TAG_FLAG(fs_lock_data_dirs, unsafe); |
| TAG_FLAG(fs_lock_data_dirs, evolving); |
| |
| DEFINE_bool(fs_data_dirs_consider_available_space, true, |
| "Whether to consider available space when selecting a data " |
| "directory during tablet or data block creation."); |
| TAG_FLAG(fs_data_dirs_consider_available_space, runtime); |
| TAG_FLAG(fs_data_dirs_consider_available_space, evolving); |
| |
| DEFINE_uint64(fs_max_thread_count_per_data_dir, 8, |
| "Maximum work thread per data directory."); |
| TAG_FLAG(fs_max_thread_count_per_data_dir, advanced); |
| |
| METRIC_DEFINE_gauge_uint64(server, data_dirs_failed, |
| "Data Directories Failed", |
| kudu::MetricUnit::kDataDirectories, |
| "Number of data directories whose disks are currently " |
| "in a failed state", |
| kudu::MetricLevel::kWarn); |
| METRIC_DEFINE_gauge_uint64(server, data_dirs_full, |
| "Data Directories Full", |
| kudu::MetricUnit::kDataDirectories, |
| "Number of data directories whose disks are currently full", |
| kudu::MetricLevel::kWarn); |
| |
| DECLARE_bool(enable_data_block_fsync); |
| DECLARE_string(block_manager); |
| |
| namespace kudu { |
| |
| namespace fs { |
| |
| using internal::DataDirGroup; |
| using std::default_random_engine; |
| using std::pair; |
| using std::set; |
| using std::shuffle; |
| using std::string; |
| using std::unique_ptr; |
| using std::unordered_map; |
| using std::unordered_set; |
| using std::vector; |
| using strings::Substitute; |
| using strings::SubstituteAndAppend; |
| |
| |
| //////////////////////////////////////////////////////////// |
| // DataDirMetrics |
| //////////////////////////////////////////////////////////// |
| |
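// Instantiates each gauge on the given metric entity with an initial value of 0.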
| #define GINIT(member, x) member = METRIC_##x.Instantiate(metric_entity, 0) |
| DataDirMetrics::DataDirMetrics(const scoped_refptr<MetricEntity>& metric_entity) { |
| GINIT(dirs_failed, data_dirs_failed); |
| GINIT(dirs_full, data_dirs_full); |
| } |
| #undef GINIT |
| |
| //////////////////////////////////////////////////////////// |
| // DataDirGroup |
| //////////////////////////////////////////////////////////// |
| |
| DataDirGroup::DataDirGroup() {} |
| |
| DataDirGroup::DataDirGroup(vector<int> uuid_indices) |
| : uuid_indices_(std::move(uuid_indices)) {} |
| |
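// Translates the UUIDs listed in 'pb' into in-memory UUID indexes via
// 'uuid_idx_by_uuid', failing with NotFound if any UUID is unknown.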
| Status DataDirGroup::LoadFromPB(const UuidIndexByUuidMap& uuid_idx_by_uuid, |
| const DataDirGroupPB& pb) { |
| vector<int> uuid_indices; |
| for (const auto& uuid : pb.uuids()) { |
| int uuid_idx; |
| if (!FindCopy(uuid_idx_by_uuid, uuid, &uuid_idx)) { |
| return Status::NotFound(Substitute( |
| "could not find data dir with uuid $0", uuid)); |
| } |
| uuid_indices.emplace_back(uuid_idx); |
| } |
| |
| uuid_indices_ = std::move(uuid_indices); |
| return Status::OK(); |
| } |
| |
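// Performs the reverse mapping: writes the group's UUID indexes back out as
// UUID strings via 'uuid_by_uuid_idx'.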
| Status DataDirGroup::CopyToPB(const UuidByUuidIndexMap& uuid_by_uuid_idx, |
| DataDirGroupPB* pb) const { |
| DCHECK(pb); |
| DataDirGroupPB group; |
| for (auto uuid_idx : uuid_indices_) { |
| string uuid; |
| if (!FindCopy(uuid_by_uuid_idx, uuid_idx, &uuid)) { |
| return Status::NotFound(Substitute( |
| "could not find data dir with uuid index $0", uuid_idx)); |
| } |
| group.mutable_uuids()->Add(std::move(uuid)); |
| } |
| |
| *pb = std::move(group); |
| return Status::OK(); |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // DataDir |
| //////////////////////////////////////////////////////////// |
| |
| DataDir::DataDir(Env* env, DirMetrics* metrics, FsType fs_type, std::string dir, |
| std::unique_ptr<DirInstanceMetadataFile> metadata_file, |
| std::unique_ptr<ThreadPool> pool) |
| : Dir(env, metrics, fs_type, std::move(dir), std::move(metadata_file), std::move(pool)) {} |
| |
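// Factory method that constructs a DataDir and returns it as a generic Dir.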
| std::unique_ptr<Dir> DataDirManager::CreateNewDir( |
| Env* env, DirMetrics* metrics, FsType fs_type, |
| std::string dir, std::unique_ptr<DirInstanceMetadataFile> metadata_file, |
| std::unique_ptr<ThreadPool> pool) { |
| return unique_ptr<Dir>(new DataDir(env, metrics, fs_type, std::move(dir), |
| std::move(metadata_file), std::move(pool))); |
| } |
| |
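// Surface the data-dir-specific gflag settings to the generic Dir logic.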
| int DataDir::available_space_cache_secs() const { |
| return FLAGS_fs_data_dirs_available_space_cache_seconds; |
| } |
| |
| int DataDir::reserved_bytes() const { |
| return FLAGS_fs_data_dirs_reserved_bytes; |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // DataDirManager |
| //////////////////////////////////////////////////////////// |
| |
| DataDirManagerOptions::DataDirManagerOptions() |
| : DirManagerOptions(FLAGS_block_manager) {} |
| |
| DataDirManager::DataDirManager(Env* env, |
| const DataDirManagerOptions& opts, |
| CanonicalizedRootsList canonicalized_data_roots) |
| : DirManager(env, opts.metric_entity ? |
| unique_ptr<DirMetrics>(new DataDirMetrics(opts.metric_entity)) : nullptr, |
| FLAGS_fs_max_thread_count_per_data_dir, |
| opts, std::move(canonicalized_data_roots)) {} |
| |
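// Test-only convenience wrapper: wraps each raw path in 'data_fs_roots' in a
// canonicalized root with an OK status before opening.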
| Status DataDirManager::OpenExistingForTests(Env* env, |
| vector<string> data_fs_roots, |
| const DataDirManagerOptions& opts, |
| unique_ptr<DataDirManager>* dd_manager) { |
| CanonicalizedRootsList roots; |
| for (const auto& r : data_fs_roots) { |
| roots.push_back({ r, Status::OK() }); |
| } |
| return DataDirManager::OpenExisting(env, std::move(roots), opts, dd_manager); |
| } |
| |
| Status DataDirManager::OpenExisting(Env* env, CanonicalizedRootsList data_fs_roots, |
| const DataDirManagerOptions& opts, |
| unique_ptr<DataDirManager>* dd_manager) { |
| unique_ptr<DataDirManager> dm; |
| dm.reset(new DataDirManager(env, opts, std::move(data_fs_roots))); |
| RETURN_NOT_OK(dm->Open()); |
| dd_manager->swap(dm); |
| return Status::OK(); |
| } |
| |
| Status DataDirManager::CreateNewForTests(Env* env, vector<string> data_fs_roots, |
| const DataDirManagerOptions& opts, |
| unique_ptr<DataDirManager>* dd_manager) { |
| CanonicalizedRootsList roots; |
| for (const auto& r : data_fs_roots) { |
| roots.push_back({ r, Status::OK() }); |
| } |
| return DataDirManager::CreateNew(env, std::move(roots), opts, dd_manager); |
| } |
| |
| Status DataDirManager::CreateNew(Env* env, CanonicalizedRootsList data_fs_roots, |
| const DataDirManagerOptions& opts, |
| unique_ptr<DataDirManager>* dd_manager) { |
| unique_ptr<DataDirManager> dm; |
| dm.reset(new DataDirManager(env, opts, std::move(data_fs_roots))); |
| RETURN_NOT_OK(dm->Create()); |
| RETURN_NOT_OK(dm->Open()); |
| dd_manager->swap(dm); |
| return Status::OK(); |
| } |
| |
| bool DataDirManager::sync_dirs() const { |
| return FLAGS_enable_data_block_fsync; |
| } |
| |
| bool DataDirManager::lock_dirs() const { |
| return FLAGS_fs_lock_data_dirs; |
| } |
| |
| int DataDirManager::max_dirs() const { |
| return opts_.dir_type == "file" ? (1 << 16) - 1 : kint32max; |
| } |
| |
| Status DataDirManager::PopulateDirectoryMaps(const vector<unique_ptr<Dir>>& dirs) { |
| if (opts_.dir_type == "log") { |
| return DirManager::PopulateDirectoryMaps(dirs); |
| } |
| DCHECK_EQ("file", opts_.dir_type); |
| // When assigning directories for the file block manager, the UUID indexes |
| // must match what exists in the instance files' list of UUIDs. |
| unordered_map<string, int> uuid_to_idx; |
| for (const auto& dd : dirs) { |
| // Find a healthy instance file and use its set of UUIDs. |
| if (dd->instance()->healthy()) { |
| const auto& dir_set = dd->instance()->metadata()->dir_set(); |
| VLOG(1) << Substitute("using dir set $0 as reference: $1", |
| dd->instance()->path(), pb_util::SecureDebugString(dir_set)); |
| for (int idx = 0; idx < dir_set.all_uuids_size(); idx++) { |
| const string& uuid = dir_set.all_uuids(idx); |
| InsertIfNotPresent(&uuid_to_idx, uuid, idx); |
| } |
| break; |
| } |
| } |
| // We should have the same number of UUID assignments as directories. |
| if (dirs.size() != uuid_to_idx.size()) { |
| return Status::Corruption( |
| Substitute("instance files contain duplicate UUIDs: $0 unique UUIDs expected, got $1", |
| dirs.size(), uuid_to_idx.size())); |
| } |
  // Keep track of any dirs that were not referenced in the dir set. These
  // are presumably from instance files we failed to read. We'll assign them
  // the UUID indexes that remain unclaimed.
| vector<Dir*> unassigned_dirs; |
| for (const auto& dd : dirs) { |
| const auto& uuid = dd->instance()->uuid(); |
| int* idx = FindOrNull(uuid_to_idx, uuid); |
| if (idx) { |
| InsertToMaps(uuid, *idx, dd.get()); |
| uuid_to_idx.erase(uuid); |
| } else { |
| LOG(WARNING) << Substitute("instance $0 has unknown UUID $1", |
| dd->instance()->path(), uuid); |
| unassigned_dirs.emplace_back(dd.get()); |
| } |
| } |
| DCHECK_EQ(unassigned_dirs.size(), uuid_to_idx.size()); |
| int unassigned_dir_idx = 0; |
| for (const auto& failed_uuid_and_idx : uuid_to_idx) { |
| InsertToMaps(failed_uuid_and_idx.first, failed_uuid_and_idx.second, |
| unassigned_dirs[unassigned_dir_idx++]); |
| } |
| return Status::OK(); |
| } |
| |
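// Registers a previously serialized directory group (e.g. one read from a
// tablet superblock) for 'tablet_id', failing if a group is already
// registered for that tablet.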
| Status DataDirManager::LoadDataDirGroupFromPB(const std::string& tablet_id, |
| const DataDirGroupPB& pb) { |
| std::lock_guard<percpu_rwlock> lock(dir_group_lock_); |
| DataDirGroup group_from_pb; |
| RETURN_NOT_OK_PREPEND(group_from_pb.LoadFromPB(idx_by_uuid_, pb), Substitute( |
| "could not load data dir group for tablet $0", tablet_id)); |
| DataDirGroup* other = InsertOrReturnExisting(&group_by_tablet_map_, |
| tablet_id, |
| group_from_pb); |
| if (other != nullptr) { |
| return Status::AlreadyPresent(Substitute( |
| "tried to load directory group for tablet $0 but one is already registered", |
| tablet_id)); |
| } |
| for (int uuid_idx : group_from_pb.uuid_indices()) { |
| InsertOrDie(&FindOrDie(tablets_by_uuid_idx_map_, uuid_idx), tablet_id); |
| } |
| return Status::OK(); |
| } |
| |
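// Creates a new directory group for 'tablet_id', selecting up to
// --fs_target_data_dirs_per_tablet healthy, non-full directories (or all
// directories in ACROSS_ALL_DIRS mode).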
| Status DataDirManager::CreateDataDirGroup(const string& tablet_id, |
| DirDistributionMode mode) { |
| std::lock_guard<percpu_rwlock> write_lock(dir_group_lock_); |
| if (ContainsKey(group_by_tablet_map_, tablet_id)) { |
| return Status::AlreadyPresent("Tried to create directory group for tablet but one is already " |
| "registered", tablet_id); |
| } |
| // Adjust the disk group size to fit within the total number of data dirs. |
| int group_target_size; |
| if (FLAGS_fs_target_data_dirs_per_tablet == 0) { |
| group_target_size = dirs_.size(); |
| } else { |
| group_target_size = std::min(FLAGS_fs_target_data_dirs_per_tablet, |
| static_cast<int>(dirs_.size())); |
| } |
| vector<int> group_indices; |
| if (mode == DirDistributionMode::ACROSS_ALL_DIRS) { |
| // If using all dirs, add all regardless of directory state. |
| AppendKeysFromMap(dir_by_uuid_idx_, &group_indices); |
| } else { |
| // Randomly select directories, giving preference to those with fewer tablets. |
| if (PREDICT_FALSE(!failed_dirs_.empty())) { |
| group_target_size = std::min(group_target_size, |
| static_cast<int>(dirs_.size()) - static_cast<int>(failed_dirs_.size())); |
| |
| // A size of 0 would indicate no healthy disks, which should crash the server. |
| DCHECK_GE(group_target_size, 0); |
| if (group_target_size == 0) { |
| return Status::IOError("No healthy data directories available", "", ENODEV); |
| } |
| } |
| GetDirsForGroupUnlocked(group_target_size, &group_indices); |
| if (PREDICT_FALSE(group_indices.empty())) { |
| return Status::IOError("All healthy data directories are full", "", ENOSPC); |
| } |
| if (PREDICT_FALSE(group_indices.size() < FLAGS_fs_target_data_dirs_per_tablet)) { |
| string msg = Substitute("Could only allocate $0 dirs of requested $1 for tablet " |
| "$2. $3 dirs total", group_indices.size(), |
| FLAGS_fs_target_data_dirs_per_tablet, tablet_id, dirs_.size()); |
| if (metrics_) { |
| SubstituteAndAppend(&msg, ", $0 dirs full, $1 dirs failed", |
| metrics_->dirs_full->value(), metrics_->dirs_failed->value()); |
| } |
| LOG(INFO) << msg; |
| } |
| } |
| InsertOrDie(&group_by_tablet_map_, tablet_id, DataDirGroup(group_indices)); |
| for (int uuid_idx : group_indices) { |
| InsertOrDie(&FindOrDie(tablets_by_uuid_idx_map_, uuid_idx), tablet_id); |
| } |
| return Status::OK(); |
| } |
| |
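// Selects a healthy, non-full directory from the tablet's group for a new
// block. If all directories in the group are full, returns ENOSPC and sets
// '*new_target_group_size' to one more than the current group size.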
| Status DataDirManager::GetDirForBlock(const CreateBlockOptions& opts, Dir** dir, |
| int* new_target_group_size) const { |
| shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock()); |
| vector<int> healthy_uuid_indices; |
| const DataDirGroup* group = nullptr; |
| DataDirGroup group_for_tests; |
| if (PREDICT_TRUE(!opts.tablet_id.empty())) { |
| // Get the data dir group for the tablet. |
| group = FindOrNull(group_by_tablet_map_, opts.tablet_id); |
| if (group == nullptr) { |
| return Status::NotFound("Tried to get directory but no directory group " |
| "registered for tablet", opts.tablet_id); |
| } |
| } else { |
| // This should only be reached by some tests; in cases where there is no |
| // natural tablet_id, select a data dir from any of the directories. |
| CHECK(IsGTest()); |
| vector<int> all_uuid_indices; |
| AppendKeysFromMap(dir_by_uuid_idx_, &all_uuid_indices); |
| group_for_tests = DataDirGroup(std::move(all_uuid_indices)); |
| group = &group_for_tests; |
| } |
| // Within a given directory group, filter out the ones that are failed. |
| if (PREDICT_TRUE(failed_dirs_.empty())) { |
| healthy_uuid_indices = group->uuid_indices(); |
| } else { |
| RemoveUnhealthyDataDirsUnlocked(group->uuid_indices(), &healthy_uuid_indices); |
| if (healthy_uuid_indices.empty()) { |
| return Status::IOError("No healthy directories exist in tablet's " |
| "directory group", opts.tablet_id, ENODEV); |
| } |
| } |
| // Within a given directory group, filter out the ones that are full. |
| vector<Dir*> candidate_dirs; |
| for (auto uuid_idx : healthy_uuid_indices) { |
| Dir* candidate = FindOrDie(dir_by_uuid_idx_, uuid_idx); |
| Status s = candidate->RefreshAvailableSpace(Dir::RefreshMode::EXPIRED_ONLY); |
| WARN_NOT_OK(s, Substitute("failed to refresh fullness of $0", candidate->dir())); |
| if (s.ok() && !candidate->is_full()) { |
| candidate_dirs.emplace_back(candidate); |
| } |
| } |
| |
| // If all the directories in the group are full, return an ENOSPC error. |
| if (PREDICT_FALSE(candidate_dirs.empty())) { |
| DCHECK(group); |
| size_t num_total = group->uuid_indices().size(); |
| size_t num_full = 0; |
| for (const auto& idx : group->uuid_indices()) { |
| if (FindOrDie(dir_by_uuid_idx_, idx)->is_full()) { |
| num_full++; |
| } |
| } |
| size_t num_failed = num_total - num_full; |
| *new_target_group_size = num_total + 1; |
| return Status::IOError( |
| Substitute("No directories available in $0's directory group ($1 dirs " |
| "total, $2 failed, $3 full)", |
| opts.tablet_id, num_total, num_failed, num_full), |
| "", ENOSPC); |
| } |
| if (candidate_dirs.size() == 1) { |
| *dir = candidate_dirs[0]; |
| return Status::OK(); |
| } |
  // Pick two directories at random and select the one with more available
  // space; if available space isn't being considered, the second (random)
  // pick is used.
| shuffle(candidate_dirs.begin(), candidate_dirs.end(), |
| default_random_engine(rng_.Next())); |
| *dir = PREDICT_TRUE(FLAGS_fs_data_dirs_consider_available_space) && |
| candidate_dirs[0]->available_bytes() > candidate_dirs[1]->available_bytes() ? |
| candidate_dirs[0] : candidate_dirs[1]; |
| return Status::OK(); |
| } |
| |
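// Unregisters 'tablet_id' from every directory in its group and removes the
// group itself. No-op if no group is registered for the tablet.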
| void DataDirManager::DeleteDataDirGroup(const std::string& tablet_id) { |
| std::lock_guard<percpu_rwlock> lock(dir_group_lock_); |
| DataDirGroup* group = FindOrNull(group_by_tablet_map_, tablet_id); |
| if (group == nullptr) { |
| return; |
| } |
| // Remove the tablet_id from every data dir in its group. |
| for (int uuid_idx : group->uuid_indices()) { |
| FindOrDie(tablets_by_uuid_idx_map_, uuid_idx).erase(tablet_id); |
| } |
| group_by_tablet_map_.erase(tablet_id); |
| } |
| |
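// Serializes the directory group registered for 'tablet_id' into 'pb',
// converting UUID indexes back into UUID strings.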
| Status DataDirManager::GetDataDirGroupPB(const string& tablet_id, |
| DataDirGroupPB* pb) const { |
| shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock()); |
| const DataDirGroup* group = FindOrNull(group_by_tablet_map_, tablet_id); |
| if (group == nullptr) { |
| return Status::NotFound(Substitute( |
| "could not find data dir group for tablet $0", tablet_id)); |
| } |
| RETURN_NOT_OK(group->CopyToPB(uuid_by_idx_, pb)); |
| return Status::OK(); |
| } |
| |
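// Like GetDirForBlock(), but if the tablet's directory group is out of space,
// attempts to grow the group by one directory before giving up.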
| Status DataDirManager::GetDirAddIfNecessary(const CreateBlockOptions& opts, Dir** dir) { |
| int new_target_group_size = 0; |
| Status s = GetDirForBlock(opts, dir, &new_target_group_size); |
| if (PREDICT_TRUE(s.ok())) { |
| return Status::OK(); |
| } |
| const string& tablet_id = opts.tablet_id; |
| if (tablet_id.empty()) { |
    // This should only be reached by some tests in cases where there is no
    // natural tablet_id; just return whatever error we got.
| CHECK(IsGTest()); |
| return s; |
| } |
  // If we failed for a reason other than lack of space in the data dir group,
| // return the error. |
| if (!s.IsIOError() || s.posix_code() != ENOSPC) { |
| return s; |
| } |
| // If we couldn't get a directory because the group is out of space, try |
| // adding a new directory to the group. |
| DCHECK_GT(new_target_group_size, 0); |
| std::lock_guard<percpu_rwlock> l(dir_group_lock_); |
| const DataDirGroup& group = FindOrDie(group_by_tablet_map_, tablet_id); |
| // If we're already at the new target group size (e.g. because another |
| // thread has added a directory), just return the newly added directory. |
| if (new_target_group_size <= group.uuid_indices().size()) { |
| *dir = FindOrDie(dir_by_uuid_idx_, group.uuid_indices().back()); |
| return Status::OK(); |
| } |
| vector<int> group_uuid_indices = group.uuid_indices(); |
| GetDirsForGroupUnlocked(new_target_group_size, &group_uuid_indices); |
| if (PREDICT_FALSE(group_uuid_indices.size() < new_target_group_size)) { |
| // If we couldn't add to the group, return an error. |
| int num_total = dirs_.size(); |
| int num_failed = failed_dirs_.size(); |
| int num_full = 0; |
| for (const auto& dd : dirs_) { |
| if (dd->is_full()) num_full++; |
| } |
| return Status::IOError( |
| Substitute("No directories could be added ($0 dirs total, $1 failed, $2 full): $3", |
| num_total, num_failed, num_full, s.ToString()), "", ENODEV); |
| } |
| // Update the groups. The tablet ID should not be associated with the data |
| // dir already, and we should update the existing data dir group. |
| // NOTE: it is not the responsibility of the DataDirManager to persist groups |
| // to disk. On-disk representations of data dir groups (e.g. those in the |
| // TabletSuperBlockPB) should be re-written upon modifying them. |
| int new_uuid_idx = group_uuid_indices.back(); |
| InsertOrDie(&FindOrDie(tablets_by_uuid_idx_map_, new_uuid_idx), tablet_id); |
| CHECK(!EmplaceOrUpdate(&group_by_tablet_map_, tablet_id, DataDirGroup(group_uuid_indices))); |
| *dir = FindOrDie(dir_by_uuid_idx_, new_uuid_idx); |
| LOG(INFO) << Substitute("Added $0 to $1's directory group: $2", |
| (*dir)->dir(), tablet_id, s.ToString()); |
| return Status::OK(); |
| } |
| |
| void DataDirManager::GetDirsForGroupUnlocked(int target_size, |
| vector<int>* group_indices) { |
| DCHECK(dir_group_lock_.is_locked()); |
| vector<int> candidate_indices; |
| unordered_set<int> existing_group_indices(group_indices->begin(), group_indices->end()); |
| for (auto& e : dir_by_uuid_idx_) { |
| int uuid_idx = e.first; |
| DCHECK_LT(uuid_idx, dirs_.size()); |
| if (ContainsKey(existing_group_indices, uuid_idx) || |
| ContainsKey(failed_dirs_, uuid_idx)) { |
| continue; |
| } |
| Dir* dd = e.second; |
| Status s = dd->RefreshAvailableSpace(Dir::RefreshMode::ALWAYS); |
| WARN_NOT_OK(s, Substitute("failed to refresh fullness of $0", dd->dir())); |
| if (s.ok() && !dd->is_full()) { |
| // TODO(awong): If a disk is unhealthy at the time of group creation, the |
| // resulting group may be below targeted size. Add functionality to |
| // resize groups. See KUDU-2040 for more details. |
| candidate_indices.push_back(uuid_idx); |
| } |
| } |
| while (group_indices->size() < target_size && !candidate_indices.empty()) { |
| shuffle(candidate_indices.begin(), candidate_indices.end(), default_random_engine(rng_.Next())); |
| if (candidate_indices.size() == 1) { |
| group_indices->push_back(candidate_indices[0]); |
| candidate_indices.erase(candidate_indices.begin()); |
| } else { |
| int tablets_in_first = FindOrDie(tablets_by_uuid_idx_map_, candidate_indices[0]).size(); |
| int tablets_in_second = FindOrDie(tablets_by_uuid_idx_map_, candidate_indices[1]).size(); |
| int selected_index = 0; |
| if (tablets_in_first == tablets_in_second && |
| PREDICT_TRUE(FLAGS_fs_data_dirs_consider_available_space)) { |
| int64_t space_in_first = FindOrDie(dir_by_uuid_idx_, |
| candidate_indices[0])->available_bytes(); |
| int64_t space_in_second = FindOrDie(dir_by_uuid_idx_, |
| candidate_indices[1])->available_bytes(); |
| selected_index = space_in_first > space_in_second ? 0 : 1; |
| } else { |
| selected_index = tablets_in_first < tablets_in_second ? 0 : 1; |
| } |
| group_indices->push_back(candidate_indices[selected_index]); |
| candidate_indices.erase(candidate_indices.begin() + selected_index); |
| } |
| } |
| } |
| |
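// Returns, in sorted order, the paths of the directories in the directory
// group registered for 'tablet_id'.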
| Status DataDirManager::FindDataDirsByTabletId(const string& tablet_id, |
| vector<string>* data_dirs) const { |
| CHECK(data_dirs); |
| DataDirGroupPB group; |
| RETURN_NOT_OK(GetDataDirGroupPB(tablet_id, &group)); |
| vector<string> data_dirs_tmp; |
| data_dirs_tmp.reserve(group.uuids_size()); |
| for (const auto& uuid : group.uuids()) { |
| int uuid_idx; |
| if (!FindUuidIndexByUuid(uuid, &uuid_idx)) { |
| return Status::NotFound("unable to find index for UUID", uuid); |
| } |
| const auto* data_dir = FindDirByUuidIndex(uuid_idx); |
| if (!data_dir) { |
| return Status::NotFound( |
| Substitute("unable to find data dir for UUID $0 with index $1", |
| uuid, uuid_idx)); |
| } |
| data_dirs_tmp.emplace_back(data_dir->dir()); |
| } |
| std::sort(data_dirs_tmp.begin(), data_dirs_tmp.end()); |
| *data_dirs = std::move(data_dirs_tmp); |
| return Status::OK(); |
| } |
| |
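// Copies the entries of 'uuid_indices' that do not refer to failed
// directories into 'healthy_indices'; leaves the output untouched if no
// directories have failed.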
| void DataDirManager::RemoveUnhealthyDataDirsUnlocked(const vector<int>& uuid_indices, |
| vector<int>* healthy_indices) const { |
| if (PREDICT_TRUE(failed_dirs_.empty())) { |
| return; |
| } |
| healthy_indices->clear(); |
| for (int uuid_idx : uuid_indices) { |
| if (!ContainsKey(failed_dirs_, uuid_idx)) { |
| healthy_indices->emplace_back(uuid_idx); |
| } |
| } |
| } |
| |
| } // namespace fs |
| } // namespace kudu |