// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/fs/data_dirs.h"
#include <algorithm>
#include <cerrno>
#include <cstdint>
#include <deque>
#include <iterator>
#include <memory>
#include <mutex>
#include <numeric>
#include <ostream>
#include <random>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include "kudu/fs/block_manager.h"
#include "kudu/fs/block_manager_util.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/integral_types.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/oid_generator.h"
#include "kudu/util/path_util.h"
#include "kudu/util/random_util.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_util_prod.h"
#include "kudu/util/threadpool.h"
DEFINE_int32(fs_target_data_dirs_per_tablet, 0,
"Indicates the target number of data dirs to spread each "
"tablet's data across. If greater than the number of data dirs "
"available, data will be striped across those available. The "
"default value 0 indicates striping should occur across all "
"healthy data directories.");
DEFINE_validator(fs_target_data_dirs_per_tablet,
[](const char* /*n*/, int32_t v) { return v >= 0; });
TAG_FLAG(fs_target_data_dirs_per_tablet, advanced);
TAG_FLAG(fs_target_data_dirs_per_tablet, experimental);

DEFINE_int64(fs_data_dirs_reserved_bytes, -1,
"Number of bytes to reserve on each data directory filesystem for "
"non-Kudu usage. The default, which is represented by -1, is that "
"1% of the disk space on each disk will be reserved. Any other "
"value specified represents the number of bytes reserved and must "
"be greater than or equal to 0. Explicit percentages to reserve "
"are not currently supported.");
DEFINE_validator(fs_data_dirs_reserved_bytes, [](const char* /*n*/, int64_t v) { return v >= -1; });
TAG_FLAG(fs_data_dirs_reserved_bytes, runtime);
TAG_FLAG(fs_data_dirs_reserved_bytes, evolving);

DEFINE_int32(fs_data_dirs_full_disk_cache_seconds, 30,
"Number of seconds we cache the full-disk status in the block manager. "
"During this time, writes to the corresponding root path will not be attempted.");
TAG_FLAG(fs_data_dirs_full_disk_cache_seconds, advanced);
TAG_FLAG(fs_data_dirs_full_disk_cache_seconds, evolving);

DEFINE_bool(fs_lock_data_dirs, true,
"Lock the data directories to prevent concurrent usage. "
"Note that read-only concurrent usage is still allowed.");
TAG_FLAG(fs_lock_data_dirs, unsafe);
TAG_FLAG(fs_lock_data_dirs, evolving);

METRIC_DEFINE_gauge_uint64(server, data_dirs_failed,
"Data Directories Failed",
kudu::MetricUnit::kDataDirectories,
"Number of data directories whose disks are currently "
"in a failed state");
METRIC_DEFINE_gauge_uint64(server, data_dirs_full,
"Data Directories Full",
kudu::MetricUnit::kDataDirectories,
"Number of data directories whose disks are currently full");

DECLARE_string(block_manager);
namespace kudu {
namespace fs {

using env_util::ScopedFileDeleter;
using internal::DataDirGroup;
using std::default_random_engine;
using std::deque;
using std::iota;
using std::set;
using std::shuffle;
using std::string;
using std::unique_ptr;
using std::unordered_set;
using std::vector;
using strings::Substitute;

namespace {
const char kHolePunchErrorMsg[] =
"Error during hole punch test. The log block manager requires a "
"filesystem with hole punching support such as ext4 or xfs. On el6, "
"kernel version 2.6.32-358 or newer is required. To run without hole "
"punching (at the cost of some efficiency and scalability), reconfigure "
"Kudu with --block_manager=file. Refer to the Kudu documentation for more "
"details. Raw error message follows";
Status CheckHolePunch(Env* env, const string& path) {
// Arbitrary constants.
static const uint64_t kFileSize = 4096 * 4;
static const uint64_t kHoleOffset = 4096;
static const uint64_t kHoleSize = 8192;
static const uint64_t kPunchedFileSize = kFileSize - kHoleSize;
// Open the test file.
string filename = JoinPathSegments(path, "hole_punch_test_file");
unique_ptr<RWFile> file;
RWFileOptions opts;
RETURN_NOT_OK(env->NewRWFile(opts, filename, &file));
// The file has been created; delete it on exit no matter what happens.
ScopedFileDeleter file_deleter(env, filename);
// Preallocate it, making sure the file's size is what we'd expect.
uint64_t sz;
RETURN_NOT_OK(file->PreAllocate(0, kFileSize, RWFile::CHANGE_FILE_SIZE));
RETURN_NOT_OK(env->GetFileSizeOnDisk(filename, &sz));
if (sz != kFileSize) {
return Status::IOError(Substitute(
"Unexpected pre-punch file size for $0: expected $1 but got $2",
filename, kFileSize, sz));
}
// Punch the hole, testing the file's size again.
RETURN_NOT_OK(file->PunchHole(kHoleOffset, kHoleSize));
RETURN_NOT_OK(env->GetFileSizeOnDisk(filename, &sz));
if (sz != kPunchedFileSize) {
return Status::IOError(Substitute(
"Unexpected post-punch file size for $0: expected $1 but got $2",
filename, kPunchedFileSize, sz));
}
return Status::OK();
}
// Wrapper for env_util::DeleteTmpFilesRecursively that is suitable for parallel
// execution on a data directory's thread pool (which requires the return value
// be void).
void DeleteTmpFilesRecursively(Env* env, const string& path) {
WARN_NOT_OK(env_util::DeleteTmpFilesRecursively(env, path),
"Error while deleting temp files");
}
} // anonymous namespace
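
// Initializes the gauge member 'x' by instantiating its METRIC_ definition on
// 'entity' with an initial value of zero.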
#define GINIT(x) x(METRIC_##x.Instantiate(entity, 0))
DataDirMetrics::DataDirMetrics(const scoped_refptr<MetricEntity>& entity)
: GINIT(data_dirs_failed),
GINIT(data_dirs_full) {
}
#undef GINIT
DataDir::DataDir(Env* env,
DataDirMetrics* metrics,
DataDirFsType fs_type,
string dir,
unique_ptr<PathInstanceMetadataFile> metadata_file,
unique_ptr<ThreadPool> pool)
: env_(env),
metrics_(metrics),
fs_type_(fs_type),
dir_(std::move(dir)),
metadata_file_(std::move(metadata_file)),
pool_(std::move(pool)),
is_shutdown_(false),
is_full_(false) {
}
DataDir::~DataDir() {
Shutdown();
}
void DataDir::Shutdown() {
if (is_shutdown_) {
return;
}
WaitOnClosures();
pool_->Shutdown();
is_shutdown_ = true;
}
void DataDir::ExecClosure(const Closure& task) {
Status s = pool_->SubmitClosure(task);
if (!s.ok()) {
WARN_NOT_OK(
s, "Could not submit task to thread pool, running it synchronously");
task.Run();
}
}
void DataDir::WaitOnClosures() {
pool_->Wait();
}
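
// Refreshes this directory's cached "full" status. In EXPIRED_ONLY mode, a
// directory previously marked full is rechecked only once
// --fs_data_dirs_full_disk_cache_seconds have elapsed since the last check;
// in ALWAYS mode the disk space check is performed unconditionally. Running
// out of space is recorded in the cached status rather than returned as an
// error.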
Status DataDir::RefreshIsFull(RefreshMode mode) {
switch (mode) {
case RefreshMode::EXPIRED_ONLY: {
std::lock_guard<simple_spinlock> l(lock_);
DCHECK(last_check_is_full_.Initialized());
MonoTime expiry = last_check_is_full_ + MonoDelta::FromSeconds(
FLAGS_fs_data_dirs_full_disk_cache_seconds);
if (!is_full_ || MonoTime::Now() < expiry) {
break;
}
FALLTHROUGH_INTENDED; // Root was previously full, check again.
}
case RefreshMode::ALWAYS: {
Status s = env_util::VerifySufficientDiskSpace(
env_, dir_, 0, FLAGS_fs_data_dirs_reserved_bytes);
bool is_full_new;
if (PREDICT_FALSE(s.IsIOError() && s.posix_code() == ENOSPC)) {
LOG(WARNING) << Substitute(
"Insufficient disk space under path $0: creation of new data "
"blocks under this path can be retried after $1 seconds: $2",
dir_, FLAGS_fs_data_dirs_full_disk_cache_seconds, s.ToString());
s = Status::OK();
is_full_new = true;
} else {
is_full_new = false;
}
RETURN_NOT_OK_PREPEND(s, "Could not refresh fullness"); // Catch other types of IOErrors, etc.
{
std::lock_guard<simple_spinlock> l(lock_);
if (metrics_ && is_full_ != is_full_new) {
metrics_->data_dirs_full->IncrementBy(is_full_new ? 1 : -1);
}
is_full_ = is_full_new;
last_check_is_full_ = MonoTime::Now();
}
break;
}
default:
LOG(FATAL) << "Unknown check mode";
}
return Status::OK();
}
const char* DataDirManager::kDataDirName = "data";
DataDirManagerOptions::DataDirManagerOptions()
: read_only(false) {
}
vector<string> DataDirManager::GetRootNames(const CanonicalizedRootsList& root_list) {
vector<string> roots;
std::transform(root_list.begin(), root_list.end(), std::back_inserter(roots),
[&] (const CanonicalizedRootAndStatus& r) { return r.path; });
return roots;
}
DataDirManager::DataDirManager(Env* env,
DataDirManagerOptions opts,
CanonicalizedRootsList canonicalized_data_roots)
: env_(env),
block_manager_type_(FLAGS_block_manager),
read_only_(opts.read_only),
canonicalized_data_fs_roots_(std::move(canonicalized_data_roots)),
rng_(GetRandomSeed32()) {
DCHECK_GT(canonicalized_data_fs_roots_.size(), 0);
if (opts.metric_entity) {
metrics_.reset(new DataDirMetrics(opts.metric_entity));
}
}
DataDirManager::~DataDirManager() {
Shutdown();
}
void DataDirManager::WaitOnClosures() {
for (const auto& dd : data_dirs_) {
dd->WaitOnClosures();
}
}
void DataDirManager::Shutdown() {
// We may be waiting here for a while on outstanding closures.
LOG_SLOW_EXECUTION(INFO, 1000,
Substitute("waiting on $0 block manager thread pools",
data_dirs_.size())) {
for (const auto& dd : data_dirs_) {
dd->Shutdown();
}
}
}
Status DataDirManager::OpenExistingForTests(Env* env, vector<string> data_fs_roots,
DataDirManagerOptions opts,
unique_ptr<DataDirManager>* dd_manager) {
CanonicalizedRootsList roots;
for (string& root : data_fs_roots) {
roots.push_back({ root, Status::OK() });
}
return DataDirManager::OpenExisting(env, std::move(roots), std::move(opts), dd_manager);
}
Status DataDirManager::OpenExisting(Env* env, CanonicalizedRootsList data_fs_roots,
DataDirManagerOptions opts,
unique_ptr<DataDirManager>* dd_manager) {
unique_ptr<DataDirManager> dm;
dm.reset(new DataDirManager(env, std::move(opts), std::move(data_fs_roots)));
RETURN_NOT_OK(dm->Open());
dd_manager->swap(dm);
return Status::OK();
}
Status DataDirManager::CreateNewForTests(Env* env, vector<string> data_fs_roots,
DataDirManagerOptions opts,
unique_ptr<DataDirManager>* dd_manager) {
CanonicalizedRootsList roots;
for (string& root : data_fs_roots) {
roots.push_back({ root, Status::OK() });
}
return DataDirManager::CreateNew(env, std::move(roots), std::move(opts), dd_manager);
}
Status DataDirManager::CreateNew(Env* env, CanonicalizedRootsList data_fs_roots,
DataDirManagerOptions opts,
unique_ptr<DataDirManager>* dd_manager) {
unique_ptr<DataDirManager> dm;
dm.reset(new DataDirManager(env, std::move(opts), std::move(data_fs_roots)));
RETURN_NOT_OK(dm->Create());
RETURN_NOT_OK(dm->Open());
dd_manager->swap(dm);
return Status::OK();
}
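
// Initializes the on-disk structures for a new deployment: generates a fresh
// UUID for every data root, creates each root's "data" subdirectory if it is
// missing, verifies hole punch support when the log block manager is in use,
// and writes a path instance file recording both the directory's own UUID and
// the complete UUID set. Everything created here is deleted on failure.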
Status DataDirManager::Create() {
CHECK(!read_only_);
deque<ScopedFileDeleter*> delete_on_failure;
ElementDeleter d(&delete_on_failure);
// The UUIDs and indices will be included in every instance file.
ObjectIdGenerator gen;
vector<string> all_uuids(canonicalized_data_fs_roots_.size());
for (string& u : all_uuids) {
u = gen.Next();
}
int idx = 0;
// Ensure the data dirs exist and create the instance files.
unordered_set<string> to_sync;
for (const auto& root : canonicalized_data_fs_roots_) {
RETURN_NOT_OK_PREPEND(root.status, "Could not create directory manager; data root is on a failed disk");
string data_dir = JoinPathSegments(root.path, kDataDirName);
bool created;
RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(env_, data_dir, &created),
Substitute("Could not create directory $0", data_dir));
if (created) {
delete_on_failure.push_front(new ScopedFileDeleter(env_, data_dir));
to_sync.insert(root.path);
}
if (block_manager_type_ == "log") {
RETURN_NOT_OK_PREPEND(CheckHolePunch(env_, data_dir), kHolePunchErrorMsg);
}
string instance_filename = JoinPathSegments(data_dir, kInstanceMetadataFileName);
PathInstanceMetadataFile metadata(env_, block_manager_type_,
instance_filename);
RETURN_NOT_OK_PREPEND(metadata.Create(all_uuids[idx], all_uuids), instance_filename);
delete_on_failure.push_front(new ScopedFileDeleter(env_, instance_filename));
idx++;
}
// Ensure newly created directories are synchronized to disk.
for (const string& dir : to_sync) {
RETURN_NOT_OK_PREPEND(env_->SyncDir(dir),
Substitute("Unable to synchronize directory $0", dir));
}
// Success: don't delete any files.
for (ScopedFileDeleter* deleter : delete_on_failure) {
deleter->Cancel();
}
return Status::OK();
}
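
// Opens an existing set of data directories: loads (and, depending on the
// lock mode, locks) each path instance file, verifies the integrity of the
// set as a whole, deletes leftover temporary files in parallel, and builds
// the in-memory maps between directories, UUIDs, and UUID indices, assigning
// leftover UUIDs to directories whose disks have failed.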
Status DataDirManager::Open() {
vector<PathInstanceMetadataFile*> instances;
vector<unique_ptr<DataDir>> dds;
LockMode lock_mode;
if (!FLAGS_fs_lock_data_dirs) {
lock_mode = LockMode::NONE;
} else if (read_only_) {
lock_mode = LockMode::OPTIONAL;
} else {
lock_mode = LockMode::MANDATORY;
}
int64_t max_data_dirs = block_manager_type_ == "file" ? (1 << 16) - 1 : kuint32max;
int i = 0;
// Create a directory for all data dirs specified by the user.
for (const auto& root : canonicalized_data_fs_roots_) {
string data_dir = JoinPathSegments(root.path, kDataDirName);
string instance_filename = JoinPathSegments(data_dir, kInstanceMetadataFileName);
// Open and lock the data dir's metadata instance file.
gscoped_ptr<PathInstanceMetadataFile> instance(
new PathInstanceMetadataFile(env_, block_manager_type_,
instance_filename));
if (PREDICT_FALSE(!root.status.ok())) {
instance->SetInstanceFailed(root.status);
} else {
RETURN_NOT_OK_PREPEND(instance->LoadFromDisk(),
Substitute("Could not open $0", instance_filename));
}
// Try locking the directory.
if (lock_mode != LockMode::NONE) {
Status s = instance->Lock();
if (!s.ok()) {
Status new_status = s.CloneAndPrepend(Substitute(
"Could not lock $0", instance_filename));
if (lock_mode == LockMode::OPTIONAL) {
LOG(WARNING) << new_status.ToString();
LOG(WARNING) << "Proceeding without lock";
} else {
DCHECK(LockMode::MANDATORY == lock_mode);
return new_status;
}
}
}
// Return an error if the metadata directory, i.e. the first directory specified by the user, failed.
if (!instance->healthy() && i == 0) {
return Status::IOError(Substitute("Could not open directory manager; "
"metadata directory failed: $0", instance->health_status().ToString()));
}
instances.push_back(instance.get());
// Create a per-dir thread pool.
gscoped_ptr<ThreadPool> pool;
RETURN_NOT_OK(ThreadPoolBuilder(Substitute("data dir $0", i))
.set_max_threads(1)
.Build(&pool));
// Figure out what filesystem the data directory is on.
DataDirFsType fs_type = DataDirFsType::OTHER;
if (instance->healthy()) {
bool result;
RETURN_NOT_OK(env_->IsOnExtFilesystem(data_dir, &result));
if (result) {
fs_type = DataDirFsType::EXT;
} else {
RETURN_NOT_OK(env_->IsOnXfsFilesystem(data_dir, &result));
if (result) {
fs_type = DataDirFsType::XFS;
}
}
}
// Create the data directory in-memory structure itself.
unique_ptr<DataDir> dd(new DataDir(
env_, metrics_.get(), fs_type, data_dir,
unique_ptr<PathInstanceMetadataFile>(instance.release()),
unique_ptr<ThreadPool>(pool.release())));
dds.emplace_back(std::move(dd));
i++;
}
// Check integrity and determine which instances need updating.
RETURN_NOT_OK_PREPEND(
PathInstanceMetadataFile::CheckIntegrity(instances),
Substitute("Could not verify integrity of files: $0",
JoinStrings(GetDataDirs(), ",")));
// Use the per-dir thread pools to delete temporary files in parallel.
for (const auto& dd : dds) {
if (dd->instance()->healthy()) {
dd->ExecClosure(Bind(&DeleteTmpFilesRecursively, env_, dd->dir()));
}
}
for (const auto& dd : dds) {
dd->WaitOnClosures();
}
// Build in-memory maps of on-disk state.
UuidByRootMap uuid_by_root;
UuidByUuidIndexMap uuid_by_idx;
UuidIndexByUuidMap idx_by_uuid;
UuidIndexMap dd_by_uuid_idx;
ReverseUuidIndexMap uuid_idx_by_dd;
TabletsByUuidIndexMap tablets_by_uuid_idx_map;
FailedDataDirSet failed_data_dirs;
const auto insert_to_maps = [&] (uint16_t idx, string uuid, DataDir* dd) {
InsertOrDie(&uuid_by_root, DirName(dd->dir()), uuid);
InsertOrDie(&uuid_by_idx, idx, uuid);
InsertOrDie(&idx_by_uuid, uuid, idx);
InsertOrDie(&dd_by_uuid_idx, idx, dd);
InsertOrDie(&uuid_idx_by_dd, dd, idx);
InsertOrDie(&tablets_by_uuid_idx_map, idx, {});
};
vector<DataDir*> unassigned_dirs;
// Assign a uuid index to each healthy instance.
for (const auto& dd : dds) {
if (PREDICT_FALSE(!dd->instance()->healthy())) {
// Keep track of failed directories so we can assign them UUIDs later.
unassigned_dirs.push_back(dd.get());
continue;
}
const PathSetPB& path_set = dd->instance()->metadata()->path_set();
uint32_t idx = -1;
for (int i = 0; i < path_set.all_uuids_size(); i++) {
if (path_set.uuid() == path_set.all_uuids(i)) {
idx = i;
break;
}
}
DCHECK_NE(idx, -1); // Guaranteed by CheckIntegrity().
if (idx > max_data_dirs) {
return Status::NotSupported(
Substitute("Block manager supports a maximum of $0 paths", max_data_dirs));
}
insert_to_maps(idx, path_set.uuid(), dd.get());
}
// If the uuid index was not assigned, assign it to a failed directory. Use
// the 'all_uuids' of the first data directory, as it is guaranteed to be
// healthy.
PathSetPB path_set = dds[0]->instance()->metadata()->path_set();
int failed_dir_idx = 0;
for (int uuid_idx = 0; uuid_idx < path_set.all_uuids_size(); uuid_idx++) {
if (!ContainsKey(uuid_by_idx, uuid_idx)) {
const string& unassigned_uuid = path_set.all_uuids(uuid_idx);
DCHECK_LT(failed_dir_idx, unassigned_dirs.size());
insert_to_maps(uuid_idx, unassigned_uuid, unassigned_dirs[failed_dir_idx]);
// Record the directory as failed.
if (metrics_) {
metrics_->data_dirs_failed->IncrementBy(1);
}
InsertOrDie(&failed_data_dirs, uuid_idx);
failed_dir_idx++;
}
}
CHECK_EQ(unassigned_dirs.size(), failed_dir_idx);
data_dirs_.swap(dds);
uuid_by_idx_.swap(uuid_by_idx);
idx_by_uuid_.swap(idx_by_uuid);
data_dir_by_uuid_idx_.swap(dd_by_uuid_idx);
uuid_idx_by_data_dir_.swap(uuid_idx_by_dd);
tablets_by_uuid_idx_map_.swap(tablets_by_uuid_idx_map);
failed_data_dirs_.swap(failed_data_dirs);
uuid_by_root_.swap(uuid_by_root);
// From this point onwards, the above in-memory maps must be consistent with
// the main path set.
// Initialize the 'fullness' status of the data directories.
for (const auto& dd : data_dirs_) {
uint16_t uuid_idx;
CHECK(FindUuidIndexByDataDir(dd.get(), &uuid_idx));
if (ContainsKey(failed_data_dirs_, uuid_idx)) {
continue;
}
Status refresh_status = dd->RefreshIsFull(DataDir::RefreshMode::ALWAYS);
if (PREDICT_FALSE(!refresh_status.ok())) {
if (refresh_status.IsDiskFailure()) {
RETURN_NOT_OK(MarkDataDirFailed(uuid_idx, refresh_status.ToString()));
continue;
}
return refresh_status;
}
}
return Status::OK();
}
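
// Registers the data directory group deserialized from 'pb' under
// 'tablet_id', returning AlreadyPresent if a group is already registered for
// that tablet.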
Status DataDirManager::LoadDataDirGroupFromPB(const std::string& tablet_id,
const DataDirGroupPB& pb) {
std::lock_guard<percpu_rwlock> lock(dir_group_lock_);
DataDirGroup group_from_pb = DataDirGroup::FromPB(pb, idx_by_uuid_);
DataDirGroup* other = InsertOrReturnExisting(&group_by_tablet_map_,
tablet_id,
group_from_pb);
if (other != nullptr) {
return Status::AlreadyPresent("Tried to load directory group for tablet but one is already "
"registered", tablet_id);
}
for (uint16_t uuid_idx : group_from_pb.uuid_indices()) {
InsertOrDie(&FindOrDie(tablets_by_uuid_idx_map_, uuid_idx), tablet_id);
}
return Status::OK();
}
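
// Creates a new data directory group for 'tablet_id'. In ACROSS_ALL_DIRS mode
// every directory is used regardless of its state; otherwise directories are
// selected at random, preferring those that host fewer tablets, up to
// --fs_target_data_dirs_per_tablet directories (or all healthy directories
// when the flag is 0).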
Status DataDirManager::CreateDataDirGroup(const string& tablet_id,
DirDistributionMode mode) {
std::lock_guard<percpu_rwlock> write_lock(dir_group_lock_);
if (ContainsKey(group_by_tablet_map_, tablet_id)) {
return Status::AlreadyPresent("Tried to create directory group for tablet but one is already "
"registered", tablet_id);
}
// Adjust the disk group size to fit within the total number of data dirs.
int group_target_size;
if (FLAGS_fs_target_data_dirs_per_tablet == 0) {
group_target_size = data_dirs_.size();
} else {
group_target_size = std::min(FLAGS_fs_target_data_dirs_per_tablet,
static_cast<int>(data_dirs_.size()));
}
vector<uint16_t> group_indices;
if (mode == DirDistributionMode::ACROSS_ALL_DIRS) {
// If using all dirs, add all regardless of directory state.
AppendKeysFromMap(data_dir_by_uuid_idx_, &group_indices);
} else {
// Randomly select directories, giving preference to those with fewer tablets.
if (PREDICT_FALSE(!failed_data_dirs_.empty())) {
group_target_size = std::min(group_target_size,
static_cast<int>(data_dirs_.size()) - static_cast<int>(failed_data_dirs_.size()));
// A negative size would indicate a bug, while a size of 0 means no healthy
// disks remain; the latter is surfaced as an error below.
DCHECK_GE(group_target_size, 0);
if (group_target_size == 0) {
return Status::IOError("No healthy data directories available", "", ENODEV);
}
}
RETURN_NOT_OK(GetDirsForGroupUnlocked(group_target_size, &group_indices));
if (PREDICT_FALSE(group_indices.empty())) {
return Status::IOError("All healthy data directories are full", "", ENOSPC);
}
if (PREDICT_FALSE(static_cast<int>(group_indices.size()) < FLAGS_fs_target_data_dirs_per_tablet)) {
LOG(WARNING) << Substitute("Could only allocate $0 dirs of requested $1 for tablet $2 ($3 "
"dirs total, $4 full, $5 failed).", group_indices.size(),
FLAGS_fs_target_data_dirs_per_tablet, tablet_id, data_dirs_.size(),
metrics_->data_dirs_full.get(), metrics_->data_dirs_failed.get());
}
}
InsertOrDie(&group_by_tablet_map_, tablet_id, DataDirGroup(group_indices));
for (uint16_t uuid_idx : group_indices) {
InsertOrDie(&FindOrDie(tablets_by_uuid_idx_map_, uuid_idx), tablet_id);
}
return Status::OK();
}
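
// Selects a directory to place a new block in: visits the tablet's directory
// group (or all directories when no tablet is given, as in some tests) in
// random order and returns the first member that is not full.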
Status DataDirManager::GetNextDataDir(const CreateBlockOptions& opts, DataDir** dir) {
shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock());
const vector<uint16_t>* group_uuid_indices;
vector<uint16_t> valid_uuid_indices;
if (PREDICT_TRUE(!opts.tablet_id.empty())) {
// Get the data dir group for the tablet.
DataDirGroup* group = FindOrNull(group_by_tablet_map_, opts.tablet_id);
if (group == nullptr) {
return Status::NotFound("Tried to get directory but no directory group "
"registered for tablet", opts.tablet_id);
}
if (PREDICT_TRUE(failed_data_dirs_.empty())) {
group_uuid_indices = &group->uuid_indices();
} else {
RemoveUnhealthyDataDirsUnlocked(group->uuid_indices(), &valid_uuid_indices);
group_uuid_indices = &valid_uuid_indices;
if (valid_uuid_indices.empty()) {
return Status::IOError("No healthy directories exist in tablet's "
"directory group", opts.tablet_id, ENODEV);
}
}
} else {
// This should only be reached by some tests; in cases where there is no
// natural tablet_id, select a data dir from any of the directories.
CHECK(IsGTest());
AppendKeysFromMap(data_dir_by_uuid_idx_, &valid_uuid_indices);
group_uuid_indices = &valid_uuid_indices;
}
vector<int> random_indices(group_uuid_indices->size());
iota(random_indices.begin(), random_indices.end(), 0);
shuffle(random_indices.begin(), random_indices.end(), default_random_engine(rng_.Next()));
// Randomly select a member of the group that is not full.
for (int i : random_indices) {
uint16_t uuid_idx = (*group_uuid_indices)[i];
DataDir* candidate = FindOrDie(data_dir_by_uuid_idx_, uuid_idx);
RETURN_NOT_OK(candidate->RefreshIsFull(DataDir::RefreshMode::EXPIRED_ONLY));
if (!candidate->is_full()) {
*dir = candidate;
return Status::OK();
}
}
string tablet_id_str;
if (PREDICT_TRUE(!opts.tablet_id.empty())) {
tablet_id_str = Substitute("$0's ", opts.tablet_id);
}
string dirs_state_str = Substitute("$0 failed", failed_data_dirs_.size());
if (metrics_) {
dirs_state_str = Substitute("$0 full, $1", metrics_->data_dirs_full.get(), dirs_state_str);
}
return Status::IOError(Substitute("No directories available to add to $0directory group ($1 "
"dirs total, $2).", tablet_id_str, data_dirs_.size(), dirs_state_str),
"", ENOSPC);
}
void DataDirManager::DeleteDataDirGroup(const std::string& tablet_id) {
std::lock_guard<percpu_rwlock> lock(dir_group_lock_);
DataDirGroup* group = FindOrNull(group_by_tablet_map_, tablet_id);
if (group == nullptr) {
return;
}
// Remove the tablet_id from every data dir in its group.
for (uint16_t uuid_idx : group->uuid_indices()) {
FindOrDie(tablets_by_uuid_idx_map_, uuid_idx).erase(tablet_id);
}
group_by_tablet_map_.erase(tablet_id);
}
bool DataDirManager::GetDataDirGroupPB(const std::string& tablet_id,
DataDirGroupPB* pb) const {
shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock());
const DataDirGroup* group = FindOrNull(group_by_tablet_map_, tablet_id);
if (group != nullptr) {
group->CopyToPB(uuid_by_idx_, pb);
return true;
}
return false;
}
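
// Chooses up to 'target_size' healthy, non-full directories using the "power
// of two choices" strategy: on each iteration the remaining candidates are
// shuffled and whichever of the first two hosts fewer tablets joins the
// group, biasing placement towards less loaded directories.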
Status DataDirManager::GetDirsForGroupUnlocked(int target_size,
vector<uint16_t>* group_indices) {
DCHECK(dir_group_lock_.is_locked());
vector<uint16_t> candidate_indices;
for (auto& e : data_dir_by_uuid_idx_) {
if (ContainsKey(failed_data_dirs_, e.first)) {
continue;
}
RETURN_NOT_OK(e.second->RefreshIsFull(DataDir::RefreshMode::ALWAYS));
// TODO(awong): If a disk is unhealthy at the time of group creation, the
// resulting group may be below targeted size. Add functionality to resize
// groups. See KUDU-2040 for more details.
if (!e.second->is_full()) {
candidate_indices.push_back(e.first);
}
}
while (static_cast<int>(group_indices->size()) < target_size && !candidate_indices.empty()) {
shuffle(candidate_indices.begin(), candidate_indices.end(), default_random_engine(rng_.Next()));
if (candidate_indices.size() == 1 ||
FindOrDie(tablets_by_uuid_idx_map_, candidate_indices[0]).size() <
FindOrDie(tablets_by_uuid_idx_map_, candidate_indices[1]).size()) {
group_indices->push_back(candidate_indices[0]);
candidate_indices.erase(candidate_indices.begin());
} else {
group_indices->push_back(candidate_indices[1]);
candidate_indices.erase(candidate_indices.begin() + 1);
}
}
return Status::OK();
}
DataDir* DataDirManager::FindDataDirByUuidIndex(uint16_t uuid_idx) const {
DCHECK_LT(uuid_idx, data_dirs_.size());
return FindPtrOrNull(data_dir_by_uuid_idx_, uuid_idx);
}
bool DataDirManager::FindUuidIndexByDataDir(DataDir* dir, uint16_t* uuid_idx) const {
return FindCopy(uuid_idx_by_data_dir_, dir, uuid_idx);
}
bool DataDirManager::FindUuidIndexByRoot(const string& root, uint16_t* uuid_idx) const {
string uuid;
if (FindUuidByRoot(root, &uuid)) {
return FindUuidIndexByUuid(uuid, uuid_idx);
}
return false;
}
bool DataDirManager::FindUuidIndexByUuid(const string& uuid, uint16_t* uuid_idx) const {
return FindCopy(idx_by_uuid_, uuid, uuid_idx);
}
bool DataDirManager::FindUuidByRoot(const string& root, string* uuid) const {
return FindCopy(uuid_by_root_, root, uuid);
}
set<string> DataDirManager::FindTabletsByDataDirUuidIdx(uint16_t uuid_idx) const {
DCHECK_LT(uuid_idx, data_dirs_.size());
shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock());
const set<string>* tablet_set_ptr = FindOrNull(tablets_by_uuid_idx_map_, uuid_idx);
if (tablet_set_ptr) {
return *tablet_set_ptr;
}
return {};
}
void DataDirManager::MarkDataDirFailedByUuid(const string& uuid) {
uint16_t uuid_idx;
CHECK(FindUuidIndexByUuid(uuid, &uuid_idx));
WARN_NOT_OK(MarkDataDirFailed(uuid_idx), "Failed to handle disk failure");
}
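
// Marks the directory with 'uuid_idx' as failed, updating the failure metric
// on the first such report. Returns an error only if every data directory has
// now failed.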
Status DataDirManager::MarkDataDirFailed(uint16_t uuid_idx, const string& error_message) {
DCHECK_LT(uuid_idx, data_dirs_.size());
std::lock_guard<percpu_rwlock> lock(dir_group_lock_);
DataDir* dd = FindDataDirByUuidIndex(uuid_idx);
DCHECK(dd);
if (InsertIfNotPresent(&failed_data_dirs_, uuid_idx)) {
if (failed_data_dirs_.size() == data_dirs_.size()) {
// TODO(awong): pass 'error_message' as a Status instead of a string so
// we can avoid returning this artificial status.
return Status::IOError(Substitute("All data dirs have failed: $0", error_message));
}
if (metrics_) {
metrics_->data_dirs_failed->IncrementBy(1);
}
string error_prefix;
if (!error_message.empty()) {
error_prefix = Substitute("$0: ", error_message);
}
LOG(ERROR) << error_prefix << Substitute("Directory $0 marked as failed", dd->dir());
}
return Status::OK();
}
bool DataDirManager::IsDataDirFailed(uint16_t uuid_idx) const {
DCHECK_LT(uuid_idx, data_dirs_.size());
shared_lock<rw_spinlock> lock(dir_group_lock_.get_lock());
return ContainsKey(failed_data_dirs_, uuid_idx);
}
bool DataDirManager::IsTabletInFailedDir(const string& tablet_id) const {
const set<uint16_t> failed_dirs = GetFailedDataDirs();
for (uint16_t failed_dir : failed_dirs) {
if (ContainsKey(FindTabletsByDataDirUuidIdx(failed_dir), tablet_id)) {
return true;
}
}
return false;
}
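
// Copies the members of 'uuid_indices' that correspond to healthy directories
// into 'healthy_indices'. If no directory has failed, returns early and
// leaves 'healthy_indices' untouched, so callers must check
// failed_data_dirs_ before relying on the output.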
void DataDirManager::RemoveUnhealthyDataDirsUnlocked(const vector<uint16_t>& uuid_indices,
vector<uint16_t>* healthy_indices) const {
if (PREDICT_TRUE(failed_data_dirs_.empty())) {
return;
}
healthy_indices->clear();
for (uint16_t uuid_idx : uuid_indices) {
DCHECK_LT(uuid_idx, data_dirs_.size());
if (!ContainsKey(failed_data_dirs_, uuid_idx)) {
healthy_indices->emplace_back(uuid_idx);
}
}
}
vector<string> DataDirManager::GetDataRoots() const {
return GetRootNames(canonicalized_data_fs_roots_);
}
vector<string> DataDirManager::GetDataDirs() const {
return JoinPathSegmentsV(GetDataRoots(), kDataDirName);
}
} // namespace fs
} // namespace kudu