blob: 3cfcf6533a55ef0bae60b1c243ddee73198209e0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/fs/data_dirs.h"
#include <cerrno>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/fs/block_manager.h"
#include "kudu/fs/block_manager_util.h"
#include "kudu/fs/fs.pb.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/atomic.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/locks.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/oid_generator.h"
#include "kudu/util/path_util.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/threadpool.h"
// Reserved space is subtracted from a disk's available bytes before deciding
// whether a data directory is "full" (see DataDir::RefreshIsFull()).
DEFINE_int64(fs_data_dirs_reserved_bytes, 0,
"Number of bytes to reserve on each data directory filesystem for non-Kudu usage.");
TAG_FLAG(fs_data_dirs_reserved_bytes, runtime);
TAG_FLAG(fs_data_dirs_reserved_bytes, evolving);
// How long a "disk full" verdict is trusted before EXPIRED_ONLY refreshes
// re-check the disk (see DataDir::RefreshIsFull()).
DEFINE_int32(fs_data_dirs_full_disk_cache_seconds, 30,
"Number of seconds we cache the full-disk status in the block manager. "
"During this time, writes to the corresponding root path will not be attempted.");
TAG_FLAG(fs_data_dirs_full_disk_cache_seconds, advanced);
TAG_FLAG(fs_data_dirs_full_disk_cache_seconds, evolving);
// Gauge tracking the number of data directories currently considered full.
// Incremented/decremented on state transitions in DataDir::RefreshIsFull().
METRIC_DEFINE_gauge_uint64(server, data_dirs_full,
"Data Directories Full",
kudu::MetricUnit::kDataDirectories,
"Number of data directories whose disks are currently full");
namespace kudu {
namespace fs {
using env_util::ScopedFileDeleter;
using std::deque;
using std::string;
using std::unique_ptr;
using std::unordered_set;
using std::vector;
using strings::Substitute;
namespace {

// Name of the per-directory instance metadata file written by
// DataDirManager::Create() and loaded/locked by DataDirManager::Open().
const char kInstanceMetadataFileName[] = "block_manager_instance";

// Prepended to CheckHolePunch() failures so operators get actionable
// guidance rather than a bare I/O error.
const char kHolePunchErrorMsg[] =
    "Error during hole punch test. The log block manager requires a "
    "filesystem with hole punching support such as ext4 or xfs. On el6, "
    "kernel version 2.6.32-358 or newer is required. To run without hole "
    "punching (at the cost of some efficiency and scalability), reconfigure "
    "Kudu with --block_manager=file. Refer to the Kudu documentation for more "
    "details. Raw error message follows";
// Tests whether the filesystem backing 'path' supports hole punching.
//
// Creates a temporary file under 'path', preallocates it, punches a hole in
// it, and verifies the on-disk size after each step. The temporary file is
// deleted on exit no matter the outcome.
//
// Returns OK if the hole was punched and the observed on-disk sizes match
// expectations; otherwise returns the underlying error or an IOError
// describing the size mismatch.
Status CheckHolePunch(Env* env, const string& path) {
  // Arbitrary constants. Declared 'static const' (they were previously
  // mutable statics): these are fixed test parameters, not state.
  // Multiples of 4096 — presumably chosen to align with a typical
  // filesystem block size so the punched extent is actually deallocated.
  static const uint64_t kFileSize = 4096 * 4;
  static const uint64_t kHoleOffset = 4096;
  static const uint64_t kHoleSize = 8192;
  static const uint64_t kPunchedFileSize = kFileSize - kHoleSize;

  // Open the test file.
  string filename = JoinPathSegments(path, "hole_punch_test_file");
  gscoped_ptr<RWFile> file;
  RWFileOptions opts;
  RETURN_NOT_OK(env->NewRWFile(opts, filename, &file));

  // The file has been created; delete it on exit no matter what happens.
  ScopedFileDeleter file_deleter(env, filename);

  // Preallocate it, making sure the file's size is what we'd expect.
  uint64_t sz;
  RETURN_NOT_OK(file->PreAllocate(0, kFileSize));
  RETURN_NOT_OK(env->GetFileSizeOnDisk(filename, &sz));
  if (sz != kFileSize) {
    return Status::IOError(Substitute(
        "Unexpected pre-punch file size for $0: expected $1 but got $2",
        filename, kFileSize, sz));
  }

  // Punch the hole, testing the file's size again.
  RETURN_NOT_OK(file->PunchHole(kHoleOffset, kHoleSize));
  RETURN_NOT_OK(env->GetFileSizeOnDisk(filename, &sz));
  if (sz != kPunchedFileSize) {
    return Status::IOError(Substitute(
        "Unexpected post-punch file size for $0: expected $1 but got $2",
        filename, kPunchedFileSize, sz));
  }
  return Status::OK();
}
} // anonymous namespace
// GINIT(x) expands to the member-initializer 'x(METRIC_x.Instantiate(entity, 0))',
// wiring each gauge member to its METRIC_ definition with an initial value of 0.
#define GINIT(x) x(METRIC_##x.Instantiate(entity, 0))
DataDirMetrics::DataDirMetrics(const scoped_refptr<MetricEntity>& entity)
    : GINIT(data_dirs_full) {
}
#undef GINIT
// Constructs a DataDir rooted at 'dir'.
//
// Takes ownership of 'metadata_file' and 'pool'. 'metrics' is not owned and
// may be null (metric updates are skipped in that case — see RefreshIsFull()).
// 'env' must outlive this object.
DataDir::DataDir(Env* env,
                 DataDirMetrics* metrics,
                 string dir,
                 unique_ptr<PathInstanceMetadataFile> metadata_file,
                 unique_ptr<ThreadPool> pool)
    : env_(env),
      metrics_(metrics),
      dir_(std::move(dir)),
      metadata_file_(std::move(metadata_file)),
      pool_(std::move(pool)),
      is_shutdown_(false),
      is_full_(false) {
}
// Drains and stops the thread pool before members are destroyed.
DataDir::~DataDir() {
  Shutdown();
}
// Stops this directory's thread pool, first waiting for any queued closures
// to finish. Idempotent: calls after the first are no-ops.
void DataDir::Shutdown() {
  if (!is_shutdown_) {
    // Drain outstanding work before tearing the pool down.
    pool_->Wait();
    pool_->Shutdown();
    is_shutdown_ = true;
  }
}
void DataDir::ExecClosure(const Closure& task) {
Status s = pool_->SubmitClosure(task);
if (!s.ok()) {
WARN_NOT_OK(
s, "Could not submit task to thread pool, running it synchronously");
task.Run();
}
}
// Blocks until all closures previously submitted via ExecClosure() have run.
void DataDir::WaitOnClosures() {
  pool_->Wait();
}
// Re-evaluates whether this directory's disk is full.
//
// In EXPIRED_ONLY mode the cached verdict is kept (no disk access) if the
// directory was not previously full, or if the last "full" verdict is
// younger than --fs_data_dirs_full_disk_cache_seconds; otherwise it falls
// through to a real check. In ALWAYS mode the disk is queried
// unconditionally.
//
// An out-of-space (ENOSPC) result is recorded as is_full_ = true and
// reported as OK; any other error is returned to the caller.
Status DataDir::RefreshIsFull(RefreshMode mode) {
  switch (mode) {
    case RefreshMode::EXPIRED_ONLY: {
      std::lock_guard<simple_spinlock> l(lock_);
      // Open() performs an initial ALWAYS refresh for every DataDir, so the
      // timestamp is expected to be set before EXPIRED_ONLY is ever used.
      DCHECK(last_check_is_full_.Initialized());
      MonoTime expiry = last_check_is_full_ + MonoDelta::FromSeconds(
          FLAGS_fs_data_dirs_full_disk_cache_seconds);
      if (!is_full_ || MonoTime::Now() < expiry) {
        break;
      }
      // The lock is released when this scope closes, before re-checking.
      FALLTHROUGH_INTENDED; // Root was previously full, check again.
    }
    case RefreshMode::ALWAYS: {
      Status s = env_util::VerifySufficientDiskSpace(
          env_, dir_, 0, FLAGS_fs_data_dirs_reserved_bytes);
      bool is_full_new;
      if (PREDICT_FALSE(s.IsIOError() && s.posix_code() == ENOSPC)) {
        LOG(WARNING) << Substitute(
            "Insufficient disk space under path $0: creation of new data "
            "blocks under this path can be retried after $1 seconds: $2",
            dir_, FLAGS_fs_data_dirs_full_disk_cache_seconds, s.ToString());
        // Running out of space is an expected condition, not an error.
        s = Status::OK();
        is_full_new = true;
      } else {
        is_full_new = false;
      }
      RETURN_NOT_OK(s); // Catch other types of IOErrors, etc.
      {
        std::lock_guard<simple_spinlock> l(lock_);
        // Keep the data_dirs_full gauge in sync with full-state transitions.
        if (metrics_ && is_full_ != is_full_new) {
          metrics_->data_dirs_full->IncrementBy(is_full_new ? 1 : -1);
        }
        is_full_ = is_full_new;
        last_check_is_full_ = MonoTime::Now();
      }
      break;
    }
    default:
      LOG(FATAL) << "Unknown check mode";
  }
  return Status::OK();
}
// Constructs a DataDirManager over 'paths' (at least one path is required).
// Metrics are instantiated only when 'metric_entity' is non-null; otherwise
// the DataDirs run without metrics.
DataDirManager::DataDirManager(Env* env,
                               scoped_refptr<MetricEntity> metric_entity,
                               string block_manager_type,
                               vector<string> paths)
    : env_(env),
      block_manager_type_(std::move(block_manager_type)),
      paths_(std::move(paths)),
      data_dirs_next_(0) {
  DCHECK_GT(paths_.size(), 0);
  if (metric_entity) {
    metrics_.reset(new DataDirMetrics(metric_entity));
  }
}
// Shuts down all managed data directories (and their thread pools).
DataDirManager::~DataDirManager() {
  Shutdown();
}
// Shuts down every DataDir. Safe to call more than once, since
// DataDir::Shutdown() is itself idempotent.
void DataDirManager::Shutdown() {
  // We may be waiting here for a while on outstanding closures.
  LOG_SLOW_EXECUTION(INFO, 1000,
                     Substitute("waiting on $0 block manager thread pools",
                                data_dirs_.size())) {
    for (const auto& dd : data_dirs_) {
      dd->Shutdown();
    }
  }
}
// Initializes the on-disk state for all configured data directories:
// creates any missing directories and writes a fresh instance metadata file
// into each one, recording a new UUID per directory plus the full UUID set.
//
// On any failure, everything created by this call is deleted again; on
// success the deleters are cancelled so all files are kept.
Status DataDirManager::Create(int flags) {
  // Cleanup-on-failure machinery: ElementDeleter destroys the queued
  // ScopedFileDeleters (each of which removes its file/dir) unless they are
  // cancelled below. push_front() puts the most recently created entries
  // first, so files are removed before their parent directories.
  deque<ScopedFileDeleter*> delete_on_failure;
  ElementDeleter d(&delete_on_failure);

  // The UUIDs and indices will be included in every instance file.
  ObjectIdGenerator gen;
  vector<string> all_uuids(paths_.size());
  for (string& u : all_uuids) {
    u = gen.Next();
  }
  int idx = 0;

  // Ensure the data paths exist and create the instance files.
  unordered_set<string> to_sync;
  for (const auto& p : paths_) {
    bool created;
    RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(env_, p, &created),
                          Substitute("Could not create directory $0", p));
    if (created) {
      delete_on_failure.push_front(new ScopedFileDeleter(env_, p));
      // The parent directory's contents changed, so fsync it below.
      to_sync.insert(DirName(p));
    }
    // Optionally verify that the filesystem supports hole punching, which
    // the log block manager requires.
    if (flags & FLAG_CREATE_TEST_HOLE_PUNCH) {
      RETURN_NOT_OK_PREPEND(CheckHolePunch(env_, p), kHolePunchErrorMsg);
    }
    string instance_filename = JoinPathSegments(p, kInstanceMetadataFileName);
    PathInstanceMetadataFile metadata(env_, block_manager_type_,
                                      instance_filename);
    // Each instance records its own UUID and the complete UUID set, so the
    // directories can later be cross-checked in Open().
    RETURN_NOT_OK_PREPEND(metadata.Create(all_uuids[idx], all_uuids), instance_filename);
    delete_on_failure.push_front(new ScopedFileDeleter(env_, instance_filename));
    idx++;
  }

  // Ensure newly created directories are synchronized to disk.
  if (flags & FLAG_CREATE_FSYNC) {
    for (const string& dir : to_sync) {
      RETURN_NOT_OK_PREPEND(env_->SyncDir(dir),
                            Substitute("Unable to synchronize directory $0", dir));
    }
  }

  // Success: don't delete any files.
  for (ScopedFileDeleter* deleter : delete_on_failure) {
    deleter->Cancel();
  }
  return Status::OK();
}
// Opens all configured data directories: loads and (per 'mode') locks each
// instance metadata file, verifies the set's integrity, creates a
// single-threaded pool per directory, and populates data_dirs_ plus the
// UUID-index lookup maps.
Status DataDirManager::Open(int max_data_dirs, LockMode mode) {
  // Non-owning view of the instance files for the integrity check below;
  // ownership is transferred to the DataDir objects in 'dds'.
  vector<PathInstanceMetadataFile*> instances;
  vector<unique_ptr<DataDir>> dds;
  int i = 0;  // Used only to give each per-directory thread pool a unique name.
  for (const auto& p : paths_) {
    // Open and lock the data dir's metadata instance file.
    string instance_filename = JoinPathSegments(p, kInstanceMetadataFileName);
    gscoped_ptr<PathInstanceMetadataFile> instance(
        new PathInstanceMetadataFile(env_, block_manager_type_,
                                     instance_filename));
    RETURN_NOT_OK_PREPEND(instance->LoadFromDisk(),
                          Substitute("Could not open $0", instance_filename));
    if (mode != LockMode::NONE) {
      Status s = instance->Lock();
      if (!s.ok()) {
        Status new_status = s.CloneAndPrepend(Substitute(
            "Could not lock $0", instance_filename));
        if (mode == LockMode::OPTIONAL) {
          // Best-effort locking: warn and continue without the lock.
          LOG(WARNING) << new_status.ToString();
          LOG(WARNING) << "Proceeding without lock";
        } else {
          DCHECK(LockMode::MANDATORY == mode);
          RETURN_NOT_OK(new_status);
        }
      }
    }
    instances.push_back(instance.get());

    // Create a per-dir thread pool.
    gscoped_ptr<ThreadPool> pool;
    RETURN_NOT_OK(ThreadPoolBuilder(Substitute("data dir $0", i))
                  .set_max_threads(1)
                  .Build(&pool));

    // Create the data directory in-memory structure itself.
    unique_ptr<DataDir> dd(new DataDir(
        env_, metrics_.get(), p,
        unique_ptr<PathInstanceMetadataFile>(instance.release()),
        unique_ptr<ThreadPool>(pool.release())));

    // Initialize the 'fullness' status of the data directory.
    RETURN_NOT_OK(dd->RefreshIsFull(DataDir::RefreshMode::ALWAYS));
    dds.emplace_back(std::move(dd));
    i++;
  }

  // Cross-check all instance files against one another.
  RETURN_NOT_OK_PREPEND(PathInstanceMetadataFile::CheckIntegrity(instances),
                        Substitute("Could not verify integrity of files: $0",
                                   JoinStrings(paths_, ",")));

  // Build uuid index and data directory maps.
  UuidIndexMap dd_by_uuid_idx;
  ReverseUuidIndexMap uuid_idx_by_dd;
  for (const auto& dd : dds) {
    const PathSetPB& path_set = dd->instance()->metadata()->path_set();
    // Find this directory's position in the shared UUID list; that position
    // is its stable UUID index. The -1 sentinel wraps to UINT32_MAX here.
    uint32_t idx = -1;
    // NOTE(review): this inner 'i' shadows the pool-naming 'i' above —
    // harmless today, but worth renaming.
    for (int i = 0; i < path_set.all_uuids_size(); i++) {
      if (path_set.uuid() == path_set.all_uuids(i)) {
        idx = i;
        break;
      }
    }
    DCHECK_NE(idx, -1); // Guaranteed by CheckIntegrity().
    // NOTE(review): with 0-based indices this permits idx == max_data_dirs,
    // i.e. max_data_dirs + 1 usable paths — confirm whether '>=' was meant.
    if (idx > max_data_dirs) {
      return Status::NotSupported(
          Substitute("Block manager supports a maximum of $0 paths", max_data_dirs));
    }
    InsertOrDie(&dd_by_uuid_idx, idx, dd.get());
    InsertOrDie(&uuid_idx_by_dd, dd.get(), idx);
  }

  // Publish the results only after everything above has succeeded.
  data_dirs_.swap(dds);
  data_dir_by_uuid_idx_.swap(dd_by_uuid_idx);
  uuid_idx_by_data_dir_.swap(uuid_idx_by_dd);
  return Status::OK();
}
// Selects the next data directory for block placement, round-robin across
// all directories and skipping any that report full. Returns IOError with
// ENOSPC once every directory has been seen full in this pass.
Status DataDirManager::GetNextDataDir(DataDir** dir) {
  unordered_set<DataDir*> rejected;
  for (;;) {
    // Atomically claim the current round-robin slot and advance the cursor.
    int32_t claimed_idx;
    int32_t following_idx;
    do {
      claimed_idx = data_dirs_next_.Load();
      following_idx = (claimed_idx + 1) % data_dirs_.size();
    } while (!data_dirs_next_.CompareAndSet(claimed_idx, following_idx));

    DataDir* dd = data_dirs_[claimed_idx].get();
    // Re-check fullness only if the cached "full" verdict has expired.
    RETURN_NOT_OK(dd->RefreshIsFull(
        DataDir::RefreshMode::EXPIRED_ONLY));
    if (!dd->is_full()) {
      *dir = dd;
      return Status::OK();
    }

    // This directory is full; give up once every directory has been rejected.
    rejected.insert(dd);
    if (rejected.size() == data_dirs_.size()) {
      return Status::IOError(
          "All data directories are full. Please free some disk space or "
          "consider changing the fs_data_dirs_reserved_bytes configuration "
          "parameter", "", ENOSPC);
    }
  }
}
// Looks up the data directory registered under 'uuid_idx'. Returns null if
// no directory carries that UUID index.
DataDir* DataDirManager::FindDataDirByUuidIndex(uint16_t uuid_idx) const {
  DataDir* const found = FindPtrOrNull(data_dir_by_uuid_idx_, uuid_idx);
  return found;
}
// Reverse lookup of FindDataDirByUuidIndex(): writes 'dir''s UUID index to
// *uuid_idx and returns true, or returns false if 'dir' is unknown.
bool DataDirManager::FindUuidIndexByDataDir(DataDir* dir, uint16_t* uuid_idx) const {
  const bool found = FindCopy(uuid_idx_by_data_dir_, dir, uuid_idx);
  return found;
}
} // namespace fs
} // namespace kudu