| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/fs/data_dirs.h" |
| |
| #include <algorithm> |
| #include <cerrno> |
| #include <cstdint> |
| #include <iterator> |
| #include <memory> |
| #include <mutex> |
| #include <numeric> |
| #include <ostream> |
| #include <random> |
| #include <string> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <gflags/gflags_declare.h> |
| #include <glog/logging.h> |
| |
| #include "kudu/fs/block_manager.h" |
| #include "kudu/fs/block_manager_util.h" |
| #include "kudu/fs/fs.pb.h" |
| #include "kudu/gutil/bind.h" |
| #include "kudu/gutil/gscoped_ptr.h" |
| #include "kudu/gutil/integral_types.h" |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/strings/join.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/util/env.h" |
| #include "kudu/util/env_util.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/oid_generator.h" |
| #include "kudu/util/path_util.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/random_util.h" |
| #include "kudu/util/scoped_cleanup.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/stopwatch.h" |
| #include "kudu/util/test_util_prod.h" |
| #include "kudu/util/threadpool.h" |
| |
// Gflags controlling how tablet data is striped across data directories, how
// much disk space is held back from Kudu, how long full-disk status is
// cached, and whether data directories are locked against concurrent use.
DEFINE_int32(fs_target_data_dirs_per_tablet, 3,
              "Indicates the target number of data dirs to spread each "
              "tablet's data across. If greater than the number of data dirs "
              "available, data will be striped across those available. A "
              "value of 0 indicates striping should occur across all healthy "
              "data dirs. Using fewer data dirs per tablet means a single "
              "drive failure will be less likely to affect a given tablet.");
DEFINE_validator(fs_target_data_dirs_per_tablet,
    [](const char* /*n*/, int32_t v) { return v >= 0; });
TAG_FLAG(fs_target_data_dirs_per_tablet, advanced);
TAG_FLAG(fs_target_data_dirs_per_tablet, evolving);

DEFINE_int64(fs_data_dirs_reserved_bytes, -1,
             "Number of bytes to reserve on each data directory filesystem for "
             "non-Kudu usage. The default, which is represented by -1, is that "
             "1% of the disk space on each disk will be reserved. Any other "
             "value specified represents the number of bytes reserved and must "
             "be greater than or equal to 0. Explicit percentages to reserve "
             "are not currently supported");
DEFINE_validator(fs_data_dirs_reserved_bytes, [](const char* /*n*/, int64_t v) { return v >= -1; });
TAG_FLAG(fs_data_dirs_reserved_bytes, runtime);
TAG_FLAG(fs_data_dirs_reserved_bytes, evolving);

DEFINE_int32(fs_data_dirs_full_disk_cache_seconds, 30,
             "Number of seconds we cache the full-disk status in the block manager. "
             "During this time, writes to the corresponding root path will not be attempted.");
TAG_FLAG(fs_data_dirs_full_disk_cache_seconds, advanced);
TAG_FLAG(fs_data_dirs_full_disk_cache_seconds, evolving);

DEFINE_bool(fs_lock_data_dirs, true,
            "Lock the data directories to prevent concurrent usage. "
            "Note that read-only concurrent usage is still allowed.");
TAG_FLAG(fs_lock_data_dirs, unsafe);
TAG_FLAG(fs_lock_data_dirs, evolving);

// Server-level gauges counting data directories currently marked failed or
// full; incremented/decremented by DataDirManager and DataDir respectively.
METRIC_DEFINE_gauge_uint64(server, data_dirs_failed,
                           "Data Directories Failed",
                           kudu::MetricUnit::kDataDirectories,
                           "Number of data directories whose disks are currently "
                           "in a failed state");
METRIC_DEFINE_gauge_uint64(server, data_dirs_full,
                           "Data Directories Full",
                           kudu::MetricUnit::kDataDirectories,
                           "Number of data directories whose disks are currently full");

// Flags defined elsewhere that this file reads.
DECLARE_bool(enable_data_block_fsync);
DECLARE_string(block_manager);
| |
| namespace kudu { |
| |
| namespace fs { |
| |
| using internal::DataDirGroup; |
| using std::default_random_engine; |
| using std::iota; |
| using std::pair; |
| using std::set; |
| using std::shuffle; |
| using std::string; |
| using std::unique_ptr; |
| using std::unordered_map; |
| using std::unordered_set; |
| using std::vector; |
| using strings::Substitute; |
| using strings::SubstituteAndAppend; |
| |
| |
| namespace { |
| |
// User-facing message prepended to the raw error when the hole punch probe
// in CheckHolePunch() fails; explains the filesystem requirements of the log
// block manager and the (non-production) file block manager alternative.
const char kHolePunchErrorMsg[] =
    "Error during hole punch test. The log block manager requires a "
    "filesystem with hole punching support such as ext4 or xfs. On el6, "
    "kernel version 2.6.32-358 or newer is required. To run without hole "
    "punching (at the cost of some efficiency and scalability), reconfigure "
    "Kudu to use the file block manager. Refer to the Kudu documentation for "
    "more details. WARNING: the file block manager is not suitable for "
    "production use and should be used only for small-scale evaluation and "
    "development on systems where hole-punching is not available. It's "
    "impossible to switch between block managers after data is written to the "
    "server. Raw error message follows";
| |
| Status CheckHolePunch(Env* env, const string& path) { |
| // Arbitrary constants. |
| static uint64_t kFileSize = 4096 * 4; |
| static uint64_t kHoleOffset = 4096; |
| static uint64_t kHoleSize = 8192; |
| static uint64_t kPunchedFileSize = kFileSize - kHoleSize; |
| |
| // Open the test file. |
| string filename = JoinPathSegments(path, "hole_punch_test_file"); |
| unique_ptr<RWFile> file; |
| RWFileOptions opts; |
| RETURN_NOT_OK(env->NewRWFile(opts, filename, &file)); |
| |
| // The file has been created; delete it on exit no matter what happens. |
| auto file_deleter = MakeScopedCleanup([&]() { |
| WARN_NOT_OK(env->DeleteFile(filename), |
| "Could not delete file " + filename); |
| }); |
| |
| // Preallocate it, making sure the file's size is what we'd expect. |
| uint64_t sz; |
| RETURN_NOT_OK(file->PreAllocate(0, kFileSize, RWFile::CHANGE_FILE_SIZE)); |
| RETURN_NOT_OK(env->GetFileSizeOnDisk(filename, &sz)); |
| if (sz != kFileSize) { |
| return Status::IOError(Substitute( |
| "Unexpected pre-punch file size for $0: expected $1 but got $2", |
| filename, kFileSize, sz)); |
| } |
| |
| // Punch the hole, testing the file's size again. |
| RETURN_NOT_OK(file->PunchHole(kHoleOffset, kHoleSize)); |
| RETURN_NOT_OK(env->GetFileSizeOnDisk(filename, &sz)); |
| if (sz != kPunchedFileSize) { |
| return Status::IOError(Substitute( |
| "Unexpected post-punch file size for $0: expected $1 but got $2", |
| filename, kPunchedFileSize, sz)); |
| } |
| |
| return Status::OK(); |
| } |
| |
// Wrapper for env_util::DeleteTmpFilesRecursively that is suitable for parallel
// execution on a data directory's thread pool (which requires the return value
// be void). Failures are logged rather than propagated, since temp file
// cleanup is best-effort.
void DeleteTmpFilesRecursively(Env* env, const string& path) {
  WARN_NOT_OK(env_util::DeleteTmpFilesRecursively(env, path),
              "Error while deleting temp files");
}
| |
| } // anonymous namespace |
| |
| //////////////////////////////////////////////////////////// |
| // DataDirMetrics |
| //////////////////////////////////////////////////////////// |
| |
// Instantiates each gauge (data_dirs_failed, data_dirs_full) against the
// given metric entity with an initial value of zero.
#define GINIT(x) x(METRIC_##x.Instantiate(entity, 0))
DataDirMetrics::DataDirMetrics(const scoped_refptr<MetricEntity>& entity)
  : GINIT(data_dirs_failed),
    GINIT(data_dirs_full) {
}
#undef GINIT
| |
| //////////////////////////////////////////////////////////// |
| // DataDir |
| //////////////////////////////////////////////////////////// |
| |
// Constructs a DataDir. 'metrics' may be null (metrics disabled); ownership
// of the instance metadata file and the per-directory thread pool is taken.
// No I/O is performed here.
DataDir::DataDir(Env* env,
                 DataDirMetrics* metrics,
                 DataDirFsType fs_type,
                 string dir,
                 unique_ptr<PathInstanceMetadataFile> metadata_file,
                 unique_ptr<ThreadPool> pool)
    : env_(env),
      metrics_(metrics),
      fs_type_(fs_type),
      dir_(std::move(dir)),
      metadata_file_(std::move(metadata_file)),
      pool_(std::move(pool)),
      is_shutdown_(false),
      is_full_(false) {
}
| |
// Ensures the thread pool is drained and shut down before destruction.
DataDir::~DataDir() {
  Shutdown();
}
| |
// Idempotent shutdown: waits for all queued closures to finish, then shuts
// down the thread pool. Safe to call multiple times.
void DataDir::Shutdown() {
  if (is_shutdown_) {
    return;
  }

  // Drain outstanding work before tearing down the pool.
  WaitOnClosures();
  pool_->Shutdown();
  is_shutdown_ = true;
}
| |
// Submits 'task' to this directory's thread pool. If submission fails (e.g.
// the pool is shutting down), logs a warning and runs the task synchronously
// on the calling thread so the work is never dropped.
void DataDir::ExecClosure(const Closure& task) {
  Status s = pool_->SubmitClosure(task);
  if (!s.ok()) {
    WARN_NOT_OK(
        s, "Could not submit task to thread pool, running it synchronously");
    task.Run();
  }
}
| |
// Blocks until all closures currently queued on this directory's pool finish.
void DataDir::WaitOnClosures() {
  pool_->Wait();
}
| |
// Refreshes the cached 'is_full_' state of this directory.
//
// In EXPIRED_ONLY mode, a directory that is not currently marked full is
// never re-checked, and one marked full is only re-checked once the cached
// status is older than --fs_data_dirs_full_disk_cache_seconds; otherwise the
// check falls through to the ALWAYS path. In ALWAYS mode the disk space is
// verified unconditionally. ENOSPC is translated into is_full_=true (not an
// error); other I/O errors are returned to the caller. The data_dirs_full
// metric is adjusted whenever the fullness state flips.
Status DataDir::RefreshIsFull(RefreshMode mode) {
  switch (mode) {
    case RefreshMode::EXPIRED_ONLY: {
      std::lock_guard<simple_spinlock> l(lock_);
      // An ALWAYS refresh must have run at least once (e.g. at startup)
      // before EXPIRED_ONLY is used.
      DCHECK(last_check_is_full_.Initialized());
      MonoTime expiry = last_check_is_full_ + MonoDelta::FromSeconds(
          FLAGS_fs_data_dirs_full_disk_cache_seconds);
      if (!is_full_ || MonoTime::Now() < expiry) {
        break;
      }
      FALLTHROUGH_INTENDED; // Root was previously full, check again.
    }
    case RefreshMode::ALWAYS: {
      Status s = env_util::VerifySufficientDiskSpace(
          env_, dir_, 0, FLAGS_fs_data_dirs_reserved_bytes);
      bool is_full_new;
      if (PREDICT_FALSE(s.IsIOError() && s.posix_code() == ENOSPC)) {
        LOG(WARNING) << Substitute(
            "Insufficient disk space under path $0: creation of new data "
            "blocks under this path can be retried after $1 seconds: $2",
            dir_, FLAGS_fs_data_dirs_full_disk_cache_seconds, s.ToString());
        s = Status::OK();
        is_full_new = true;
      } else {
        is_full_new = false;
      }
      RETURN_NOT_OK_PREPEND(s, "Could not refresh fullness"); // Catch other types of IOErrors, etc.
      {
        std::lock_guard<simple_spinlock> l(lock_);
        if (metrics_ && is_full_ != is_full_new) {
          metrics_->data_dirs_full->IncrementBy(is_full_new ? 1 : -1);
        }
        is_full_ = is_full_new;
        last_check_is_full_ = MonoTime::Now();
      }
      break;
    }
    default:
      LOG(FATAL) << "Unknown check mode";
  }
  return Status::OK();
}
| |
| //////////////////////////////////////////////////////////// |
| // DataDirGroup |
| //////////////////////////////////////////////////////////// |
| |
// Default group: empty set of UUID indices.
DataDirGroup::DataDirGroup() {}

// Group over an explicit set of data directory UUID indices.
DataDirGroup::DataDirGroup(vector<int> uuid_indices)
    : uuid_indices_(std::move(uuid_indices)) {}
| |
| Status DataDirGroup::LoadFromPB(const UuidIndexByUuidMap& uuid_idx_by_uuid, |
| const DataDirGroupPB& pb) { |
| vector<int> uuid_indices; |
| for (const auto& uuid : pb.uuids()) { |
| int uuid_idx; |
| if (!FindCopy(uuid_idx_by_uuid, uuid, &uuid_idx)) { |
| return Status::NotFound(Substitute( |
| "could not find data dir with uuid $0", uuid)); |
| } |
| uuid_indices.emplace_back(uuid_idx); |
| } |
| |
| uuid_indices_ = std::move(uuid_indices); |
| return Status::OK(); |
| } |
| |
| Status DataDirGroup::CopyToPB(const UuidByUuidIndexMap& uuid_by_uuid_idx, |
| DataDirGroupPB* pb) const { |
| DCHECK(pb); |
| DataDirGroupPB group; |
| for (auto uuid_idx : uuid_indices_) { |
| string uuid; |
| if (!FindCopy(uuid_by_uuid_idx, uuid_idx, &uuid)) { |
| return Status::NotFound(Substitute( |
| "could not find data dir with uuid index $0", uuid_idx)); |
| } |
| group.mutable_uuids()->Add(std::move(uuid)); |
| } |
| |
| *pb = std::move(group); |
| return Status::OK(); |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // DataDirManagerOptions |
| //////////////////////////////////////////////////////////// |
| |
// Defaults: block manager type from --block_manager, writable, and full
// consistency enforcement between on-disk instances.
DataDirManagerOptions::DataDirManagerOptions()
    : block_manager_type(FLAGS_block_manager),
      read_only(false),
      consistency_check(ConsistencyCheckBehavior::ENFORCE_CONSISTENCY) {
}
| |
| //////////////////////////////////////////////////////////// |
| // DataDirManager |
| //////////////////////////////////////////////////////////// |
| |
| vector<string> DataDirManager::GetRootNames(const CanonicalizedRootsList& root_list) { |
| vector<string> roots; |
| std::transform(root_list.begin(), root_list.end(), std::back_inserter(roots), |
| [&] (const CanonicalizedRootAndStatus& r) { return r.path; }); |
| return roots; |
| } |
| |
// Constructs the manager. Requires at least one canonicalized root, and
// disallows UPDATE_ON_DISK consistency checks in read-only mode. Metrics are
// created only if a metric entity was supplied in 'opts'.
DataDirManager::DataDirManager(Env* env,
                               DataDirManagerOptions opts,
                               CanonicalizedRootsList canonicalized_data_roots)
    : env_(env),
      opts_(std::move(opts)),
      canonicalized_data_fs_roots_(std::move(canonicalized_data_roots)),
      rng_(GetRandomSeed32()) {
  DCHECK_GT(canonicalized_data_fs_roots_.size(), 0);
  DCHECK(opts_.consistency_check != ConsistencyCheckBehavior::UPDATE_ON_DISK ||
         !opts_.read_only);

  if (opts_.metric_entity) {
    metrics_.reset(new DataDirMetrics(opts_.metric_entity));
  }
}
| |
// Shuts down all data directories (draining their thread pools) on destruction.
DataDirManager::~DataDirManager() {
  Shutdown();
}
| |
// Blocks until every data directory's queued closures have completed.
void DataDirManager::WaitOnClosures() {
  for (const auto& dd : data_dirs_) {
    dd->WaitOnClosures();
  }
}
| |
// Shuts down each data directory, logging if the collective wait on
// outstanding closures takes longer than a second.
void DataDirManager::Shutdown() {
  // We may be waiting here for a while on outstanding closures.
  LOG_SLOW_EXECUTION(INFO, 1000,
                     Substitute("waiting on $0 block manager thread pools",
                                data_dirs_.size())) {
    for (const auto& dd : data_dirs_) {
      dd->Shutdown();
    }
  }
}
| |
| Status DataDirManager::OpenExistingForTests(Env* env, vector<string> data_fs_roots, |
| DataDirManagerOptions opts, |
| unique_ptr<DataDirManager>* dd_manager) { |
| CanonicalizedRootsList roots; |
| for (const auto& r : data_fs_roots) { |
| roots.push_back({ r, Status::OK() }); |
| } |
| return DataDirManager::OpenExisting(env, std::move(roots), std::move(opts), dd_manager); |
| } |
| |
| Status DataDirManager::OpenExisting(Env* env, CanonicalizedRootsList data_fs_roots, |
| DataDirManagerOptions opts, |
| unique_ptr<DataDirManager>* dd_manager) { |
| unique_ptr<DataDirManager> dm; |
| dm.reset(new DataDirManager(env, std::move(opts), std::move(data_fs_roots))); |
| RETURN_NOT_OK(dm->Open()); |
| dd_manager->swap(dm); |
| return Status::OK(); |
| } |
| |
| Status DataDirManager::CreateNewForTests(Env* env, vector<string> data_fs_roots, |
| DataDirManagerOptions opts, |
| unique_ptr<DataDirManager>* dd_manager) { |
| CanonicalizedRootsList roots; |
| for (const auto& r : data_fs_roots) { |
| roots.push_back({ r, Status::OK() }); |
| } |
| return DataDirManager::CreateNew(env, std::move(roots), std::move(opts), dd_manager); |
| } |
| |
| Status DataDirManager::CreateNew(Env* env, CanonicalizedRootsList data_fs_roots, |
| DataDirManagerOptions opts, |
| unique_ptr<DataDirManager>* dd_manager) { |
| unique_ptr<DataDirManager> dm; |
| dm.reset(new DataDirManager(env, std::move(opts), std::move(data_fs_roots))); |
| RETURN_NOT_OK(dm->Create()); |
| RETURN_NOT_OK(dm->Open()); |
| dd_manager->swap(dm); |
| return Status::OK(); |
| } |
| |
// Initializes brand new data directories on disk: assigns a fresh UUID to
// each canonicalized root and creates the directories and instance files.
// Fails if any root was reported failed during canonicalization or if the
// manager is read-only.
Status DataDirManager::Create() {
  CHECK(!opts_.read_only);

  // Generate a new UUID for each data directory.
  ObjectIdGenerator gen;
  vector<string> all_uuids;
  vector<pair<string, string>> root_uuid_pairs_to_create;
  for (const auto& r : canonicalized_data_fs_roots_) {
    RETURN_NOT_OK_PREPEND(r.status, "Could not create directory manager with disks failed");
    string uuid = gen.Next();
    all_uuids.emplace_back(uuid);
    root_uuid_pairs_to_create.emplace_back(r.path, std::move(uuid));
  }
  // No pre-existing instances to update when creating from scratch.
  RETURN_NOT_OK_PREPEND(CreateNewDataDirectoriesAndUpdateInstances(
      std::move(root_uuid_pairs_to_create), {}, std::move(all_uuids)),
                        "could not create new data directories");
  return Status::OK();
}
| |
// Creates the data directories and instance files for each (root, uuid) pair
// in 'root_uuid_pairs_to_create', then rewrites 'instances_to_update' so that
// every instance records 'all_uuids' as the complete set. On any failure, all
// newly created files and directories are rolled back via a scoped cleanup.
Status DataDirManager::CreateNewDataDirectoriesAndUpdateInstances(
    vector<pair<string, string>> root_uuid_pairs_to_create,
    vector<unique_ptr<PathInstanceMetadataFile>> instances_to_update,
    vector<string> all_uuids) {
  CHECK(!opts_.read_only);

  // Track everything we create so the cleanup below can undo it on failure.
  vector<string> created_dirs;
  vector<string> created_files;
  auto deleter = MakeScopedCleanup([&]() {
    // Delete files first so that the directories will be empty when deleted.
    for (const auto& f : created_files) {
      WARN_NOT_OK(env_->DeleteFile(f), "Could not delete file " + f);
    }
    // Delete directories in reverse order since parent directories will have
    // been added before child directories.
    for (auto it = created_dirs.rbegin(); it != created_dirs.rend(); it++) {
      WARN_NOT_OK(env_->DeleteDir(*it), "Could not delete dir " + *it);
    }
  });

  // Ensure the data dirs exist and create the instance files.
  for (const auto& p : root_uuid_pairs_to_create) {
    string data_dir = JoinPathSegments(p.first, kDataDirName);
    bool created;
    RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(env_, data_dir, &created),
                          Substitute("Could not create directory $0", data_dir));
    if (created) {
      created_dirs.emplace_back(data_dir);
    }

    // The log block manager requires hole punch support on the filesystem.
    if (opts_.block_manager_type == "log") {
      RETURN_NOT_OK_PREPEND(CheckHolePunch(env_, data_dir), kHolePunchErrorMsg);
    }

    string instance_filename = JoinPathSegments(data_dir, kInstanceMetadataFileName);
    PathInstanceMetadataFile metadata(env_, opts_.block_manager_type,
                                      instance_filename);
    RETURN_NOT_OK_PREPEND(metadata.Create(p.second, all_uuids), instance_filename);
    created_files.emplace_back(instance_filename);
  }

  // Update existing instances, if any.
  RETURN_NOT_OK_PREPEND(UpdateInstances(
      std::move(instances_to_update), std::move(all_uuids)),
                        "could not update existing data directories");

  // Ensure newly created directories are synchronized to disk.
  if (FLAGS_enable_data_block_fsync) {
    WARN_NOT_OK(env_util::SyncAllParentDirs(env_, created_dirs, created_files),
                "could not sync newly created data directories");
  }

  // Success: don't delete any files.
  deleter.cancel();
  return Status::OK();
}
| |
// Rewrites every healthy instance in 'instances_to_update' so its all_uuids
// field equals 'new_all_uuids'. The update is made crash/failure tolerant by
// a two-phase approach: first every instance file is backed up to a tmp-named
// copy, then each instance is overwritten in place. On failure, backups of
// not-yet-updated instances are deleted and backups of instances whose update
// had begun are restored.
Status DataDirManager::UpdateInstances(
    vector<unique_ptr<PathInstanceMetadataFile>> instances_to_update,
    vector<string> new_all_uuids) {
  // Prepare a scoped cleanup for managing instance metadata copies.
  unordered_map<string, string> copies_to_restore;
  unordered_set<string> copies_to_delete;
  auto copy_cleanup = MakeScopedCleanup([&]() {
    for (const auto& f : copies_to_delete) {
      WARN_NOT_OK(env_->DeleteFile(f), "Could not delete file " + f);
    }
    for (const auto& f : copies_to_restore) {
      WARN_NOT_OK(env_->RenameFile(f.first, f.second),
                  Substitute("Could not restore file $0 from $1", f.second, f.first));
    }
  });

  // Make a copy of every existing instance metadata file.
  //
  // This is done before performing any updates, so that if there's a failure
  // while copying, there's no metadata to restore.
  WritableFileOptions opts;
  opts.sync_on_close = true;
  for (const auto& instance : instances_to_update) {
    if (!instance->healthy()) {
      continue;
    }
    const string& instance_filename = instance->path();
    string copy_filename = instance_filename + kTmpInfix;
    RETURN_NOT_OK_PREPEND(env_util::CopyFile(
        env_, instance_filename, copy_filename, opts),
                          "unable to backup existing data directory instance metadata");
    InsertOrDie(&copies_to_delete, copy_filename);
  }

  // Update existing instance metadata files with the new value of all_uuids.
  for (const auto& instance : instances_to_update) {
    if (!instance->healthy()) {
      continue;
    }
    const string& instance_filename = instance->path();
    string copy_filename = instance_filename + kTmpInfix;

    // We've made enough progress on this instance that we should restore its
    // copy on failure, even if the update below fails. That's because it's a
    // multi-step process and it's possible for it to return failure despite
    // the update taking place (e.g. synchronization failure).
    CHECK_EQ(1, copies_to_delete.erase(copy_filename));
    InsertOrDie(&copies_to_restore, copy_filename, instance_filename);

    // Perform the update.
    PathInstanceMetadataPB new_pb = *instance->metadata();
    new_pb.mutable_path_set()->mutable_all_uuids()->Clear();
    for (const auto& uuid : new_all_uuids) {
      new_pb.mutable_path_set()->add_all_uuids(uuid);
    }
    RETURN_NOT_OK_PREPEND(pb_util::WritePBContainerToPath(
        env_, instance_filename, new_pb, pb_util::OVERWRITE,
        FLAGS_enable_data_block_fsync ? pb_util::SYNC : pb_util::NO_SYNC),
                          "unable to overwrite existing data directory instance metadata");
  }

  // Success; instance metadata copies will be deleted by 'copy_cleanup'.
  // Reclassify all backups as deletions so the cleanup removes (rather than
  // restores) them.
  InsertKeysFromMap(copies_to_restore, &copies_to_delete);
  copies_to_restore.clear();
  return Status::OK();
}
| |
| Status DataDirManager::LoadInstances( |
| vector<unique_ptr<PathInstanceMetadataFile>>* loaded_instances) { |
| LockMode lock_mode; |
| if (!FLAGS_fs_lock_data_dirs) { |
| lock_mode = LockMode::NONE; |
| } else if (opts_.read_only) { |
| lock_mode = LockMode::OPTIONAL; |
| } else { |
| lock_mode = LockMode::MANDATORY; |
| } |
| vector<string> missing_roots_tmp; |
| vector<unique_ptr<PathInstanceMetadataFile>> loaded_instances_tmp; |
| for (int i = 0; i < canonicalized_data_fs_roots_.size(); i++) { |
| const auto& root = canonicalized_data_fs_roots_[i]; |
| string data_dir = JoinPathSegments(root.path, kDataDirName); |
| string instance_filename = JoinPathSegments(data_dir, kInstanceMetadataFileName); |
| |
| unique_ptr<PathInstanceMetadataFile> instance( |
| new PathInstanceMetadataFile(env_, opts_.block_manager_type, instance_filename)); |
| if (PREDICT_FALSE(!root.status.ok())) { |
| instance->SetInstanceFailed(root.status); |
| } else { |
| // This may return OK and mark 'instance' as unhealthy if the file could |
| // not be loaded (e.g. not found, disk errors). |
| RETURN_NOT_OK_PREPEND(instance->LoadFromDisk(), |
| Substitute("could not load $0", instance_filename)); |
| } |
| |
| // Try locking the instance. |
| if (instance->healthy() && lock_mode != LockMode::NONE) { |
| // This may return OK and mark 'instance' as unhealthy if the file could |
| // not be locked due to non-locking issues (e.g. disk errors). |
| Status s = instance->Lock(); |
| if (!s.ok()) { |
| if (lock_mode == LockMode::OPTIONAL) { |
| LOG(WARNING) << s.ToString(); |
| LOG(WARNING) << "Proceeding without lock"; |
| } else { |
| DCHECK(LockMode::MANDATORY == lock_mode); |
| return s; |
| } |
| } |
| } |
| |
| loaded_instances_tmp.emplace_back(std::move(instance)); |
| } |
| |
| int num_healthy_instances = 0; |
| for (const auto& instance : loaded_instances_tmp) { |
| if (instance->healthy()) { |
| num_healthy_instances++; |
| } |
| } |
| if (num_healthy_instances == 0) { |
| return Status::NotFound("could not open directory manager; no healthy " |
| "data directories found"); |
| } |
| loaded_instances->swap(loaded_instances_tmp); |
| return Status::OK(); |
| } |
| |
// Opens the directory manager: loads instance files, optionally adds missing
// directories (UPDATE_ON_DISK), verifies cross-instance integrity, builds the
// in-memory DataDir objects and UUID-index maps, deletes stray temp files,
// and initializes each directory's fullness state.
Status DataDirManager::Open() {
  // The directory count cap depends on the block manager type.
  const int kMaxDataDirs = opts_.block_manager_type == "file" ? (1 << 16) - 1 : kint32max;

  // Find and load existing data directory instances.
  vector<unique_ptr<PathInstanceMetadataFile>> loaded_instances;
  RETURN_NOT_OK(LoadInstances(&loaded_instances));

  // Add new or remove existing data directories, if desired.
  if (opts_.consistency_check == ConsistencyCheckBehavior::UPDATE_ON_DISK) {
    if (opts_.block_manager_type == "file") {
      return Status::InvalidArgument(
          "file block manager may not add or remove data directories");
    }

    // Prepare to create new directories and update existing instances. We
    // must generate a new UUID for each missing root, and update all_uuids in
    // all existing instances to include those new UUIDs.
    //
    // Note: all data directories must be healthy to perform this operation.
    ObjectIdGenerator gen;
    vector<string> new_all_uuids;
    vector<pair<string, string>> root_uuid_pairs_to_create;
    for (const auto& i : loaded_instances) {
      if (i->health_status().IsNotFound()) {
        // A missing instance corresponds to a newly added root.
        string uuid = gen.Next();
        new_all_uuids.emplace_back(uuid);
        root_uuid_pairs_to_create.emplace_back(DirName(i->dir()), std::move(uuid));
        continue;
      }
      RETURN_NOT_OK_PREPEND(
          i->health_status(),
          "found failed data directory while adding new data directories");
      new_all_uuids.emplace_back(i->metadata()->path_set().uuid());
    }
    RETURN_NOT_OK_PREPEND(
        CreateNewDataDirectoriesAndUpdateInstances(
            std::move(root_uuid_pairs_to_create),
            std::move(loaded_instances),
            std::move(new_all_uuids)),
        "could not add new data directories");

    // Now that we've created the missing directories, try loading the
    // directories again.
    //
    // Note: 'loaded_instances' must be cleared to unlock the instance files.
    loaded_instances.clear();
    RETURN_NOT_OK(LoadInstances(&loaded_instances));
    for (const auto& i : loaded_instances) {
      RETURN_NOT_OK_PREPEND(i->health_status(),
          "found failed data directory after updating data directories");
    }
  }

  // Check the integrity of all loaded instances.
  if (opts_.consistency_check != ConsistencyCheckBehavior::IGNORE_INCONSISTENCY) {
    RETURN_NOT_OK_PREPEND(
        PathInstanceMetadataFile::CheckIntegrity(loaded_instances),
        Substitute("could not verify integrity of files: $0",
                   JoinStrings(GetDataDirs(), ",")));
  }

  // All instances are present and accounted for. Time to create the in-memory
  // data directory structures.
  int i = 0;
  vector<unique_ptr<DataDir>> dds;
  for (auto& instance : loaded_instances) {
    const string data_dir = instance->dir();

    // Create a per-dir thread pool.
    gscoped_ptr<ThreadPool> pool;
    RETURN_NOT_OK(ThreadPoolBuilder(Substitute("data dir $0", i))
                  .set_max_threads(1)
                  .set_trace_metric_prefix("data dirs")
                  .Build(&pool));

    // Figure out what filesystem the data directory is on.
    DataDirFsType fs_type = DataDirFsType::OTHER;
    if (instance->healthy()) {
      bool result;
      RETURN_NOT_OK(env_->IsOnExtFilesystem(data_dir, &result));
      if (result) {
        fs_type = DataDirFsType::EXT;
      } else {
        RETURN_NOT_OK(env_->IsOnXfsFilesystem(data_dir, &result));
        if (result) {
          fs_type = DataDirFsType::XFS;
        }
      }
    }

    unique_ptr<DataDir> dd(new DataDir(
        env_, metrics_.get(), fs_type, data_dir, std::move(instance),
        unique_ptr<ThreadPool>(pool.release())));
    dds.emplace_back(std::move(dd));
    i++;
  }

  // Use the per-dir thread pools to delete temporary files in parallel.
  for (const auto& dd : dds) {
    if (dd->instance()->healthy()) {
      dd->ExecClosure(Bind(&DeleteTmpFilesRecursively, env_, dd->dir()));
    }
  }
  for (const auto& dd : dds) {
    dd->WaitOnClosures();
  }

  // Build in-memory maps of on-disk state.
  UuidByRootMap uuid_by_root;
  UuidByUuidIndexMap uuid_by_idx;
  UuidIndexByUuidMap idx_by_uuid;
  UuidIndexMap dd_by_uuid_idx;
  ReverseUuidIndexMap uuid_idx_by_dd;
  TabletsByUuidIndexMap tablets_by_uuid_idx_map;
  FailedDataDirSet failed_data_dirs;

  // Registers one (uuid index, uuid, directory) association in all maps.
  const auto insert_to_maps = [&] (int idx, string uuid, DataDir* dd) {
    InsertOrDie(&uuid_by_root, DirName(dd->dir()), uuid);
    InsertOrDie(&uuid_by_idx, idx, uuid);
    InsertOrDie(&idx_by_uuid, uuid, idx);
    InsertOrDie(&dd_by_uuid_idx, idx, dd);
    InsertOrDie(&uuid_idx_by_dd, dd, idx);
    InsertOrDie(&tablets_by_uuid_idx_map, idx, {});
  };

  if (opts_.consistency_check != ConsistencyCheckBehavior::IGNORE_INCONSISTENCY) {
    // If we're not in IGNORE_INCONSISTENCY mode, we're guaranteed that the
    // healthy instances match from the above integrity check, so we can assign
    // each healthy directory a UUID in accordance with its instance file.
    //
    // A directory may not have been assigned a UUID because its instance file
    // could not be read, in which case, we track it and assign a UUID to it
    // later if we can.
    vector<DataDir*> unassigned_dirs;
    int first_healthy = -1;
    for (int dir = 0; dir < dds.size(); dir++) {
      const auto& dd = dds[dir];
      if (PREDICT_FALSE(!dd->instance()->healthy())) {
        // Keep track of failed directories so we can assign them UUIDs later.
        unassigned_dirs.push_back(dd.get());
        continue;
      }
      if (first_healthy == -1) {
        first_healthy = dir;
      }
      // Locate this directory's UUID within its instance's all_uuids list;
      // the position is the directory's UUID index.
      const PathSetPB& path_set = dd->instance()->metadata()->path_set();
      int idx = -1;
      for (int i = 0; i < path_set.all_uuids_size(); i++) {
        if (path_set.uuid() == path_set.all_uuids(i)) {
          idx = i;
          break;
        }
      }
      if (idx == -1) {
        return Status::IOError(Substitute(
            "corrupt path set for data directory $0: uuid $1 not found in path set",
            dd->dir(), path_set.uuid()));
      }
      if (idx > kMaxDataDirs) {
        return Status::NotSupported(
            Substitute("block manager supports a maximum of $0 paths", kMaxDataDirs));
      }
      insert_to_maps(idx, path_set.uuid(), dd.get());
    }
    CHECK_NE(first_healthy, -1); // Guaranteed by LoadInstances().

    // If the uuid index was not assigned, assign it to a failed directory. Use
    // the path set from the first healthy instance.
    PathSetPB path_set = dds[first_healthy]->instance()->metadata()->path_set();
    int failed_dir_idx = 0;
    for (int uuid_idx = 0; uuid_idx < path_set.all_uuids_size(); uuid_idx++) {
      if (!ContainsKey(uuid_by_idx, uuid_idx)) {
        const string& unassigned_uuid = path_set.all_uuids(uuid_idx);
        insert_to_maps(uuid_idx, unassigned_uuid, unassigned_dirs[failed_dir_idx]);

        // Record the directory as failed.
        if (metrics_) {
          metrics_->data_dirs_failed->IncrementBy(1);
        }
        InsertOrDie(&failed_data_dirs, uuid_idx);
        failed_dir_idx++;
      }
    }
    CHECK_EQ(unassigned_dirs.size(), failed_dir_idx);
  } else {
    // If we are in IGNORE_INCONSISTENCY mode, all bets are off. The most we
    // can do is make a best effort assignment of data dirs to UUIDs based on
    // the ones that are healthy, and for the sake of completeness, assign
    // artificial UUIDs to the unhealthy ones.
    for (int dir = 0; dir < dds.size(); dir++) {
      DataDir* dd = dds[dir].get();
      if (dd->instance()->healthy()) {
        insert_to_maps(dir, dd->instance()->metadata()->path_set().uuid(), dd);
      } else {
        insert_to_maps(dir, Substitute("<unknown uuid $0>", dir), dd);
        InsertOrDie(&failed_data_dirs, dir);
      }
    }
  }

  // Commit the fully built maps into member state.
  data_dirs_.swap(dds);
  uuid_by_idx_.swap(uuid_by_idx);
  idx_by_uuid_.swap(idx_by_uuid);
  data_dir_by_uuid_idx_.swap(dd_by_uuid_idx);
  uuid_idx_by_data_dir_.swap(uuid_idx_by_dd);
  tablets_by_uuid_idx_map_.swap(tablets_by_uuid_idx_map);
  failed_data_dirs_.swap(failed_data_dirs);
  uuid_by_root_.swap(uuid_by_root);

  // From this point onwards, the above in-memory maps must be consistent with
  // the main path set.

  // Initialize the 'fullness' status of the data directories.
  for (const auto& dd : data_dirs_) {
    int uuid_idx;
    CHECK(FindUuidIndexByDataDir(dd.get(), &uuid_idx));
    if (ContainsKey(failed_data_dirs_, uuid_idx)) {
      continue;
    }
    Status refresh_status = dd->RefreshIsFull(DataDir::RefreshMode::ALWAYS);
    if (PREDICT_FALSE(!refresh_status.ok())) {
      if (refresh_status.IsDiskFailure()) {
        // A disk failure during the check marks the directory failed rather
        // than failing the open.
        RETURN_NOT_OK(MarkDataDirFailed(uuid_idx, refresh_status.ToString()));
        continue;
      }
      return refresh_status;
    }
  }

  return Status::OK();
}
| |
// Deserializes the data dir group in 'pb' (e.g. from a tablet superblock) and
// registers it for 'tablet_id'. Returns AlreadyPresent if a group is already
// registered for the tablet, or an error if the PB references unknown UUIDs.
Status DataDirManager::LoadDataDirGroupFromPB(const std::string& tablet_id,
                                              const DataDirGroupPB& pb) {
  // Write lock: both 'group_by_tablet_map_' and 'tablets_by_uuid_idx_map_'
  // are mutated below and must stay mutually consistent.
  std::lock_guard<percpu_rwlock> lock(dir_group_lock_);
  DataDirGroup group_from_pb;
  // Translate the PB's UUIDs into in-memory UUID indices.
  RETURN_NOT_OK_PREPEND(group_from_pb.LoadFromPB(idx_by_uuid_, pb), Substitute(
      "could not load data dir group for tablet $0", tablet_id));
  DataDirGroup* other = InsertOrReturnExisting(&group_by_tablet_map_,
                                               tablet_id,
                                               group_from_pb);
  if (other != nullptr) {
    return Status::AlreadyPresent(Substitute(
        "tried to load directory group for tablet $0 but one is already registered",
        tablet_id));
  }
  // Record the reverse mapping: each directory in the group now hosts this tablet.
  for (int uuid_idx : group_from_pb.uuid_indices()) {
    InsertOrDie(&FindOrDie(tablets_by_uuid_idx_map_, uuid_idx), tablet_id);
  }
  return Status::OK();
}
| |
// Creates and registers a new data dir group for 'tablet_id'. Depending on
// 'mode', the group either spans all directories or is a randomly-selected
// subset (of target size --fs_target_data_dirs_per_tablet) that prefers
// less-loaded, healthy, non-full directories.
Status DataDirManager::CreateDataDirGroup(const string& tablet_id,
                                          DirDistributionMode mode) {
  std::lock_guard<percpu_rwlock> write_lock(dir_group_lock_);
  if (ContainsKey(group_by_tablet_map_, tablet_id)) {
    return Status::AlreadyPresent("Tried to create directory group for tablet but one is already "
                                  "registered", tablet_id);
  }
  // Adjust the disk group size to fit within the total number of data dirs.
  int group_target_size;
  if (FLAGS_fs_target_data_dirs_per_tablet == 0) {
    // A flag value of 0 means "use all data directories".
    group_target_size = data_dirs_.size();
  } else {
    group_target_size = std::min(FLAGS_fs_target_data_dirs_per_tablet,
                                 static_cast<int>(data_dirs_.size()));
  }
  vector<int> group_indices;
  if (mode == DirDistributionMode::ACROSS_ALL_DIRS) {
    // If using all dirs, add all regardless of directory state.
    AppendKeysFromMap(data_dir_by_uuid_idx_, &group_indices);
  } else {
    // Randomly select directories, giving preference to those with fewer tablets.
    if (PREDICT_FALSE(!failed_data_dirs_.empty())) {
      // Cap the target size at the number of directories that are still healthy.
      group_target_size = std::min(group_target_size,
          static_cast<int>(data_dirs_.size()) - static_cast<int>(failed_data_dirs_.size()));

      // A size of 0 would indicate no healthy disks, which should crash the server.
      DCHECK_GE(group_target_size, 0);
      if (group_target_size == 0) {
        return Status::IOError("No healthy data directories available", "", ENODEV);
      }
    }
    GetDirsForGroupUnlocked(group_target_size, &group_indices);
    if (PREDICT_FALSE(group_indices.empty())) {
      return Status::IOError("All healthy data directories are full", "", ENOSPC);
    }
    // Falling short of the target (e.g. some dirs full or failed) is logged
    // but not treated as an error; the tablet gets a smaller group.
    if (PREDICT_FALSE(group_indices.size() < FLAGS_fs_target_data_dirs_per_tablet)) {
      string msg = Substitute("Could only allocate $0 dirs of requested $1 for tablet "
                              "$2. $3 dirs total", group_indices.size(),
                              FLAGS_fs_target_data_dirs_per_tablet, tablet_id, data_dirs_.size());
      if (metrics_) {
        SubstituteAndAppend(&msg, ", $0 dirs full, $1 dirs failed",
                            metrics_->data_dirs_full->value(), metrics_->data_dirs_failed->value());
      }
      LOG(INFO) << msg;
    }
  }
  // Register the group and the per-directory reverse mappings.
  InsertOrDie(&group_by_tablet_map_, tablet_id, DataDirGroup(group_indices));
  for (int uuid_idx : group_indices) {
    InsertOrDie(&FindOrDie(tablets_by_uuid_idx_map_, uuid_idx), tablet_id);
  }
  return Status::OK();
}
| |
// Selects a directory for a new block: a random, non-full member of the
// tablet's data dir group (or of all directories in tests with no tablet id).
// Returns IOError with ENOSPC if no suitable directory exists.
Status DataDirManager::GetNextDataDir(const CreateBlockOptions& opts, DataDir** dir) {
  shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock());
  const vector<int>* group_uuid_indices;
  vector<int> valid_uuid_indices;
  if (PREDICT_TRUE(!opts.tablet_id.empty())) {
    // Get the data dir group for the tablet.
    DataDirGroup* group = FindOrNull(group_by_tablet_map_, opts.tablet_id);
    if (group == nullptr) {
      return Status::NotFound("Tried to get directory but no directory group "
                              "registered for tablet", opts.tablet_id);
    }
    if (PREDICT_TRUE(failed_data_dirs_.empty())) {
      // Fast path: no failures, use the group as-is.
      group_uuid_indices = &group->uuid_indices();
    } else {
      // Filter out failed directories before choosing.
      RemoveUnhealthyDataDirsUnlocked(group->uuid_indices(), &valid_uuid_indices);
      group_uuid_indices = &valid_uuid_indices;
      if (valid_uuid_indices.empty()) {
        return Status::IOError("No healthy directories exist in tablet's "
                               "directory group", opts.tablet_id, ENODEV);
      }
    }
  } else {
    // This should only be reached by some tests; in cases where there is no
    // natural tablet_id, select a data dir from any of the directories.
    CHECK(IsGTest());
    AppendKeysFromMap(data_dir_by_uuid_idx_, &valid_uuid_indices);
    group_uuid_indices = &valid_uuid_indices;
  }
  // Visit the group's members in random order so block placement is spread
  // across the group.
  vector<int> random_indices(group_uuid_indices->size());
  iota(random_indices.begin(), random_indices.end(), 0);
  shuffle(random_indices.begin(), random_indices.end(), default_random_engine(rng_.Next()));

  // Randomly select a member of the group that is not full.
  for (int i : random_indices) {
    int uuid_idx = (*group_uuid_indices)[i];
    DataDir* candidate = FindOrDie(data_dir_by_uuid_idx_, uuid_idx);
    // Best-effort refresh; a refresh failure just skips this candidate.
    Status s = candidate->RefreshIsFull(DataDir::RefreshMode::EXPIRED_ONLY);
    WARN_NOT_OK(s, Substitute("failed to refresh fullness of $0", candidate->dir()));
    if (s.ok() && !candidate->is_full()) {
      *dir = candidate;
      return Status::OK();
    }
  }
  // No usable directory: build a descriptive ENOSPC error.
  string tablet_id_str = "";
  if (PREDICT_TRUE(!opts.tablet_id.empty())) {
    tablet_id_str = Substitute("$0's ", opts.tablet_id);
  }
  string dirs_state_str = Substitute("$0 failed", failed_data_dirs_.size());
  if (metrics_) {
    dirs_state_str = Substitute("$0 full, $1",
        metrics_->data_dirs_full->value(), dirs_state_str);
  }
  return Status::IOError(Substitute("No directories available to add to $0directory group ($1 "
                         "dirs total, $2).", tablet_id_str, data_dirs_.size(), dirs_state_str),
                         "", ENOSPC);
}
| |
| void DataDirManager::DeleteDataDirGroup(const std::string& tablet_id) { |
| std::lock_guard<percpu_rwlock> lock(dir_group_lock_); |
| DataDirGroup* group = FindOrNull(group_by_tablet_map_, tablet_id); |
| if (group == nullptr) { |
| return; |
| } |
| // Remove the tablet_id from every data dir in its group. |
| for (int uuid_idx : group->uuid_indices()) { |
| FindOrDie(tablets_by_uuid_idx_map_, uuid_idx).erase(tablet_id); |
| } |
| group_by_tablet_map_.erase(tablet_id); |
| } |
| |
| Status DataDirManager::GetDataDirGroupPB(const string& tablet_id, |
| DataDirGroupPB* pb) const { |
| shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock()); |
| const DataDirGroup* group = FindOrNull(group_by_tablet_map_, tablet_id); |
| if (group == nullptr) { |
| return Status::NotFound(Substitute( |
| "could not find data dir group for tablet $0", tablet_id)); |
| } |
| RETURN_NOT_OK(group->CopyToPB(uuid_by_idx_, pb)); |
| return Status::OK(); |
| } |
| |
// Appends up to 'target_size' UUID indices of healthy, non-full directories
// to 'group_indices'. Selection uses the "power of two choices" heuristic:
// each pick shuffles the candidates and takes whichever of the first two
// hosts fewer tablets, which keeps tablet counts balanced across dirs.
// Caller must hold 'dir_group_lock_'.
void DataDirManager::GetDirsForGroupUnlocked(int target_size,
                                             vector<int>* group_indices) {
  DCHECK(dir_group_lock_.is_locked());
  // Build the candidate pool: healthy directories that are not full.
  vector<int> candidate_indices;
  for (auto& e : data_dir_by_uuid_idx_) {
    if (ContainsKey(failed_data_dirs_, e.first)) {
      continue;
    }
    Status s = e.second->RefreshIsFull(DataDir::RefreshMode::ALWAYS);
    WARN_NOT_OK(s, Substitute("failed to refresh fullness of $0", e.second->dir()));
    if (s.ok() && !e.second->is_full()) {
      // TODO(awong): If a disk is unhealthy at the time of group creation, the
      // resulting group may be below targeted size. Add functionality to
      // resize groups. See KUDU-2040 for more details.
      candidate_indices.push_back(e.first);
    }
  }
  while (group_indices->size() < target_size && !candidate_indices.empty()) {
    // Re-shuffle each iteration so the pair compared below is random; the
    // chosen candidate is removed from the pool so it can't be picked twice.
    shuffle(candidate_indices.begin(), candidate_indices.end(), default_random_engine(rng_.Next()));
    if (candidate_indices.size() == 1 ||
        FindOrDie(tablets_by_uuid_idx_map_, candidate_indices[0]).size() <
            FindOrDie(tablets_by_uuid_idx_map_, candidate_indices[1]).size()) {
      group_indices->push_back(candidate_indices[0]);
      candidate_indices.erase(candidate_indices.begin());
    } else {
      group_indices->push_back(candidate_indices[1]);
      candidate_indices.erase(candidate_indices.begin() + 1);
    }
  }
}
| |
| DataDir* DataDirManager::FindDataDirByUuidIndex(int uuid_idx) const { |
| DCHECK_LT(uuid_idx, data_dirs_.size()); |
| return FindPtrOrNull(data_dir_by_uuid_idx_, uuid_idx); |
| } |
| |
| bool DataDirManager::FindUuidIndexByDataDir(DataDir* dir, int* uuid_idx) const { |
| return FindCopy(uuid_idx_by_data_dir_, dir, uuid_idx); |
| } |
| |
| bool DataDirManager::FindUuidIndexByRoot(const string& root, int* uuid_idx) const { |
| string uuid; |
| if (FindUuidByRoot(root, &uuid)) { |
| return FindUuidIndexByUuid(uuid, uuid_idx); |
| } |
| return false; |
| } |
| |
| bool DataDirManager::FindUuidIndexByUuid(const string& uuid, int* uuid_idx) const { |
| return FindCopy(idx_by_uuid_, uuid, uuid_idx); |
| } |
| |
| bool DataDirManager::FindUuidByRoot(const string& root, string* uuid) const { |
| return FindCopy(uuid_by_root_, root, uuid); |
| } |
| |
| set<string> DataDirManager::FindTabletsByDataDirUuidIdx(int uuid_idx) const { |
| DCHECK_LT(uuid_idx, data_dirs_.size()); |
| shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock()); |
| const set<string>* tablet_set_ptr = FindOrNull(tablets_by_uuid_idx_map_, uuid_idx); |
| if (tablet_set_ptr) { |
| return *tablet_set_ptr; |
| } |
| return {}; |
| } |
| |
| void DataDirManager::MarkDataDirFailedByUuid(const string& uuid) { |
| int uuid_idx; |
| CHECK(FindUuidIndexByUuid(uuid, &uuid_idx)); |
| WARN_NOT_OK(MarkDataDirFailed(uuid_idx), "Failed to handle disk failure"); |
| } |
| |
| Status DataDirManager::MarkDataDirFailed(int uuid_idx, const string& error_message) { |
| DCHECK_LT(uuid_idx, data_dirs_.size()); |
| std::lock_guard<percpu_rwlock> lock(dir_group_lock_); |
| DataDir* dd = FindDataDirByUuidIndex(uuid_idx); |
| DCHECK(dd); |
| if (InsertIfNotPresent(&failed_data_dirs_, uuid_idx)) { |
| if (failed_data_dirs_.size() == data_dirs_.size()) { |
| // TODO(awong): pass 'error_message' as a Status instead of an string so |
| // we can avoid returning this artificial status. |
| return Status::IOError(Substitute("All data dirs have failed: ", error_message)); |
| } |
| if (metrics_) { |
| metrics_->data_dirs_failed->IncrementBy(1); |
| } |
| string error_prefix = ""; |
| if (!error_message.empty()) { |
| error_prefix = Substitute("$0: ", error_message); |
| } |
| LOG(ERROR) << error_prefix << Substitute("Directory $0 marked as failed", dd->dir()); |
| } |
| return Status::OK(); |
| } |
| |
| bool DataDirManager::IsDataDirFailed(int uuid_idx) const { |
| DCHECK_LT(uuid_idx, data_dirs_.size()); |
| shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock()); |
| return ContainsKey(failed_data_dirs_, uuid_idx); |
| } |
| |
| bool DataDirManager::IsTabletInFailedDir(const string& tablet_id) const { |
| const set<int> failed_dirs = GetFailedDataDirs(); |
| for (int failed_dir : failed_dirs) { |
| if (ContainsKey(FindTabletsByDataDirUuidIdx(failed_dir), tablet_id)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| void DataDirManager::RemoveUnhealthyDataDirsUnlocked(const vector<int>& uuid_indices, |
| vector<int>* healthy_indices) const { |
| if (PREDICT_TRUE(failed_data_dirs_.empty())) { |
| return; |
| } |
| healthy_indices->clear(); |
| for (int uuid_idx : uuid_indices) { |
| DCHECK_LT(uuid_idx, data_dirs_.size()); |
| if (!ContainsKey(failed_data_dirs_, uuid_idx)) { |
| healthy_indices->emplace_back(uuid_idx); |
| } |
| } |
| } |
| |
// Returns the canonicalized filesystem roots that hold data directories.
vector<string> DataDirManager::GetDataRoots() const {
  return GetRootNames(canonicalized_data_fs_roots_);
}
| |
// Returns the full data directory paths: each root joined with the
// well-known data dir name.
vector<string> DataDirManager::GetDataDirs() const {
  return JoinPathSegmentsV(GetDataRoots(), kDataDirName);
}
| |
| } // namespace fs |
| } // namespace kudu |