// blob: 6fb8b8a25e6261fac6a55d8b556e888e3db2345b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/util/env_util.h"
#include <fnmatch.h>
#include <algorithm>
#include <cerrno>
#include <cstdint>
#include <ctime>
#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/gutil/bind.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/util/env.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/path_util.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
// Test-only override read by VerifySufficientDiskSpace(): any value greater
// than -1 replaces the free-byte count reported by the Env, regardless of
// path. Tag meanings ('runtime', 'unsafe', 'hidden') are defined in
// kudu/util/flag_tags.h.
DEFINE_int64(disk_reserved_bytes_free_for_testing, -1,
"For testing only! Set to number of bytes free on each filesystem. "
"Set to -1 to disable this test-specific override");
TAG_FLAG(disk_reserved_bytes_free_for_testing, runtime);
TAG_FLAG(disk_reserved_bytes_free_for_testing, unsafe);
// We define some flags for testing purposes: Two prefixes and their associated
// "bytes free" overrides.
// These four flags are read by OverrideBytesFreeWithTestingFlags(), which
// pairs prefix N with bytes-free override N.
DEFINE_string(disk_reserved_override_prefix_1_path_for_testing, "",
"For testing only! Specifies a prefix to override the visible 'bytes free' on. "
"Use --disk_reserved_override_prefix_1_bytes_free_for_testing to set the number of "
"bytes free for this path prefix. Set to empty string to disable.");
DEFINE_int64(disk_reserved_override_prefix_1_bytes_free_for_testing, -1,
"For testing only! Set number of bytes free on the path prefix specified by "
"--disk_reserved_override_prefix_1_path_for_testing. Set to -1 to disable.");
DEFINE_string(disk_reserved_override_prefix_2_path_for_testing, "",
"For testing only! Specifies a prefix to override the visible 'bytes free' on. "
"Use --disk_reserved_override_prefix_2_bytes_free_for_testing to set the number of "
"bytes free for this path prefix. Set to empty string to disable.");
DEFINE_int64(disk_reserved_override_prefix_2_bytes_free_for_testing, -1,
"For testing only! Set number of bytes free on the path prefix specified by "
"--disk_reserved_override_prefix_2_path_for_testing. Set to -1 to disable.");
// All four prefix-override flags are tagged 'unsafe'; only the two bytes-free
// values are additionally tagged 'runtime'.
TAG_FLAG(disk_reserved_override_prefix_1_path_for_testing, unsafe);
TAG_FLAG(disk_reserved_override_prefix_2_path_for_testing, unsafe);
TAG_FLAG(disk_reserved_override_prefix_1_bytes_free_for_testing, unsafe);
TAG_FLAG(disk_reserved_override_prefix_2_bytes_free_for_testing, unsafe);
TAG_FLAG(disk_reserved_override_prefix_1_bytes_free_for_testing, runtime);
TAG_FLAG(disk_reserved_override_prefix_2_bytes_free_for_testing, runtime);
// Fault-injection knobs read by ShouldInjectSpaceError(): the fraction is
// passed to MaybeTrue() and the globs are matched against the checked path
// with fnmatch().
DEFINE_double(env_inject_full, 0.0,
"Fraction of the time that space checks on certain paths will "
"yield the posix code ENOSPC.");
TAG_FLAG(env_inject_full, hidden);
DEFINE_string(env_inject_full_globs, "*",
"Comma-separated list of glob patterns specifying which paths "
"return with space errors.");
TAG_FLAG(env_inject_full_globs, hidden);
using kudu::fault_injection::MaybeTrue;
using std::pair;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::unordered_set;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace {
// Returns whether the path specified by 'data_dir' should be considered full.
bool ShouldInjectSpaceError(const string& data_dir) {
if (PREDICT_FALSE(MaybeTrue(FLAGS_env_inject_full))) {
vector<string> globs =
strings::Split(FLAGS_env_inject_full_globs, ",", strings::SkipEmpty());
for (const auto& glob : globs) {
if (fnmatch(glob.c_str(), data_dir.c_str(), 0) == 0) {
return true;
return false;
} // anonymous namespace
namespace env_util {
// Convenience overload: opens 'path' for writing with default-constructed
// WritableFileOptions by delegating to the overload that takes options.
Status OpenFileForWrite(Env* env, const string& path,
                        shared_ptr<WritableFile>* file) {
  const WritableFileOptions default_opts = WritableFileOptions();
  return OpenFileForWrite(default_opts, env, path, file);
}
// Opens 'path' for writing using 'opts' and, on success, hands ownership of
// the new file to '*file'.
//
// If Env::NewWritableFile() fails, its status is returned and '*file' is not
// modified.
Status OpenFileForWrite(const WritableFileOptions& opts,
                        Env *env, const string &path,
                        shared_ptr<WritableFile> *file) {
  unique_ptr<WritableFile> w;
  RETURN_NOT_OK(env->NewWritableFile(opts, path, &w));
  // Transfer ownership to the caller; otherwise the freshly opened file would
  // be destroyed on return and '*file' would never be populated.
  *file = std::move(w);
  return Status::OK();
}
// Opens 'path' for random-access reads and, on success, hands ownership of
// the new file to '*file'.
//
// If Env::NewRandomAccessFile() fails, its status is returned and '*file' is
// not modified.
Status OpenFileForRandom(Env *env, const string &path,
                         shared_ptr<RandomAccessFile> *file) {
  unique_ptr<RandomAccessFile> r;
  RETURN_NOT_OK(env->NewRandomAccessFile(path, &r));
  // Transfer ownership to the caller; otherwise the freshly opened file would
  // be destroyed on return and '*file' would never be populated.
  *file = std::move(r);
  return Status::OK();
}
// Opens 'path' for sequential reads and, on success, hands ownership of the
// new file to '*file'.
//
// If Env::NewSequentialFile() fails, its status is returned and '*file' is
// not modified.
Status OpenFileForSequential(Env *env, const string &path,
                             shared_ptr<SequentialFile> *file) {
  unique_ptr<SequentialFile> r;
  RETURN_NOT_OK(env->NewSequentialFile(path, &r));
  // Transfer ownership to the caller; otherwise the freshly opened file would
  // be destroyed on return and '*file' would never be populated.
  *file = std::move(r);
  return Status::OK();
}
// If any of the override gflags specifies an override for the given path, then
// override the free bytes to match what is specified in the flag. See the
// definitions of these test-only flags for more information.
static void OverrideBytesFreeWithTestingFlags(const string& path, int64_t* bytes_free) {
const string* prefixes[] = { &FLAGS_disk_reserved_override_prefix_1_path_for_testing,
&FLAGS_disk_reserved_override_prefix_2_path_for_testing };
const int64_t* overrides[] = { &FLAGS_disk_reserved_override_prefix_1_bytes_free_for_testing,
&FLAGS_disk_reserved_override_prefix_2_bytes_free_for_testing };
for (int i = 0; i < arraysize(prefixes); i++) {
if (*overrides[i] != -1 && !prefixes[i]->empty() && HasPrefixString(path, *prefixes[i])) {
*bytes_free = *overrides[i];
// Checks whether 'requested_bytes' can be allocated under 'path' while still
// leaving 'reserved_bytes' free on the filesystem.
//
// A 'reserved_bytes' value of -1 requests a reservation of one percent of the
// filesystem capacity. If 'available_bytes' is non-null it receives the
// free-byte count used for the decision (0 when a failure is injected). The
// test-only --disk_reserved_*_for_testing flags may replace the free-byte
// count reported by the Env.
//
// Returns an IOError carrying ENOSPC when space is insufficient or a failure
// is injected, the error from Env::GetSpaceInfo() if that call fails, and OK
// otherwise.
Status VerifySufficientDiskSpace(Env *env, const std::string& path, int64_t requested_bytes,
                                 int64_t reserved_bytes, int64_t* available_bytes) {
  // Sentinel 'reserved_bytes' value meaning "reserve 1% of capacity".
  const int64_t kOnePercentReservation = -1;
  DCHECK_GE(requested_bytes, 0);

  // Fault injection takes precedence over any real measurement.
  if (ShouldInjectSpaceError(path)) {
    if (available_bytes != nullptr) {
      *available_bytes = 0;
    }
    return Status::IOError(Env::kInjectedFailureStatusMsg, "", ENOSPC);
  }

  SpaceInfo fs_info;
  RETURN_NOT_OK(env->GetSpaceInfo(path, &fs_info));
  int64_t bytes_free = fs_info.free_bytes;

  // Allow overriding these values by tests. The global override is applied
  // first, then the per-prefix overrides.
  if (PREDICT_FALSE(FLAGS_disk_reserved_bytes_free_for_testing > -1)) {
    bytes_free = FLAGS_disk_reserved_bytes_free_for_testing;
  }
  if (PREDICT_FALSE(FLAGS_disk_reserved_override_prefix_1_bytes_free_for_testing != -1 ||
                    FLAGS_disk_reserved_override_prefix_2_bytes_free_for_testing != -1)) {
    OverrideBytesFreeWithTestingFlags(path, &bytes_free);
  }

  // If they requested a one percent reservation, calculate what that is in bytes.
  if (reserved_bytes == kOnePercentReservation) {
    reserved_bytes = fs_info.capacity_bytes / 100;
  }

  if (available_bytes != nullptr) {
    *available_bytes = bytes_free;
  }

  const int64_t bytes_left_after_request = bytes_free - requested_bytes;
  if (bytes_left_after_request < reserved_bytes) {
    return Status::IOError(
        Substitute("Insufficient disk space to allocate $0 bytes under path $1 "
                   "($2 bytes available vs $3 bytes reserved)",
                   requested_bytes, path, bytes_free, reserved_bytes),
        "", ENOSPC);
  }
  return Status::OK();
}
// Creates the directory 'path' unless it is already present.
//
// If 'created' is non-null it is set to true only when Env::CreateDir()
// succeeded. An "already present" result is reported as OK; any other status
// from Env::CreateDir() is returned as-is.
Status CreateDirIfMissing(Env* env, const string& path, bool* created) {
  const Status create_status = env->CreateDir(path);
  if (created) {
    *created = create_status.ok();
  }
  if (create_status.IsAlreadyPresent()) {
    return Status::OK();
  }
  return create_status;
}
// Creates every missing directory along 'path', one path segment at a time.
//
// A prefix that already is a directory -- either directly or after
// canonicalizing it (e.g. a symlink to a directory) -- is left alone.
// Otherwise Env::CreateDir() is attempted on it. Returns the first error from
// Env::Canonicalize() or Env::CreateDir(); the latter is prefixed with
// "Unable to create directory".
Status CreateDirsRecursively(Env* env, const string& path) {
  const vector<string> components = SplitPath(path);
  string prefix;
  for (const string& component : components) {
    prefix = prefix.empty() ? component : JoinPathSegments(prefix, component);

    bool is_dir;
    Status s = env->IsDirectory(prefix, &is_dir);
    bool already_a_directory = false;
    if (s.ok()) {
      // We didn't get a NotFound error, so something is there.
      if (is_dir) {
        // It's a normal directory.
        already_a_directory = true;
      } else {
        // Maybe a file or a symlink. Let's try to follow the symlink.
        string resolved_prefix;
        RETURN_NOT_OK(env->Canonicalize(prefix, &resolved_prefix));
        s = env->IsDirectory(resolved_prefix, &is_dir);
        // True when it's a symlink to a directory.
        already_a_directory = s.ok() && is_dir;
      }
    }
    if (already_a_directory) {
      continue;
    }
    RETURN_NOT_OK_PREPEND(env->CreateDir(prefix), "Unable to create directory");
  }
  return Status::OK();
}
// Copies the contents of 'source_path' into a new writable file at
// 'dest_path', opened with 'opts'.
//
// The source size is sampled once up front via Env::GetFileSize(), and data is
// transferred in chunks of at most 1 MiB. Returns the first error from
// opening either file, querying the size, reading, or appending.
Status CopyFile(Env* env, const string& source_path, const string& dest_path,
                WritableFileOptions opts) {
  unique_ptr<SequentialFile> source;
  RETURN_NOT_OK(env->NewSequentialFile(source_path, &source));
  uint64_t size;
  RETURN_NOT_OK(env->GetFileSize(source_path, &size));

  unique_ptr<WritableFile> dest;
  RETURN_NOT_OK(env->NewWritableFile(opts, dest_path, &dest));

  const int32_t kBufferSize = 1024 * 1024;
  unique_ptr<uint8_t[]> scratch(new uint8_t[kBufferSize]);

  uint64_t bytes_read = 0;
  while (bytes_read < size) {
    uint64_t max_bytes_to_read = std::min<uint64_t>(size - bytes_read, kBufferSize);
    Slice data(scratch.get(), max_bytes_to_read);
    // Actually move the bytes: fill the chunk from the source, then write it
    // to the destination.
    // NOTE(review): assumes SequentialFile::Read() shrinks 'data' to the number
    // of bytes actually read -- confirm against kudu/util/env.h.
    RETURN_NOT_OK(source->Read(&data));
    if (data.size() == 0) {
      // The source ended before the sampled size was reached (e.g. it was
      // truncated concurrently); bail out instead of looping forever.
      return Status::IOError(
          Substitute("Unexpected end of file while copying $0 to $1: read $2 of $3 bytes",
                     source_path, dest_path, bytes_read, size));
    }
    RETURN_NOT_OK(dest->Append(data));
    bytes_read += data.size();
  }
  return Status::OK();
}
// Deletes the oldest files matching the glob 'pattern' so that at most
// 'max_matches' matching files remain.
//
// Age is determined by Env::GetFileModifiedTime(). Returns the first error
// from globbing, reading a modification time, or deleting a file; files
// deleted before the error stay deleted.
Status DeleteExcessFilesByPattern(Env* env, const string& pattern, int max_matches) {
  // Negative numbers don't make sense for our interface.
  DCHECK_GE(max_matches, 0);

  vector<string> matching_files;
  RETURN_NOT_OK(env->Glob(pattern, &matching_files));

  // Nothing to do when we are already within the limit.
  if (matching_files.size() <= static_cast<size_t>(max_matches)) {
    return Status::OK();
  }

  vector<pair<time_t, string>> matching_file_mtimes;
  matching_file_mtimes.reserve(matching_files.size());
  for (string& matching_file_path : matching_files) {
    int64_t mtime;
    RETURN_NOT_OK(env->GetFileModifiedTime(matching_file_path, &mtime));
    matching_file_mtimes.emplace_back(mtime, std::move(matching_file_path));
  }

  // Use mtime to determine which matching files to delete. This could
  // potentially be ambiguous, depending on the resolution of last-modified
  // timestamp in the filesystem, but that is part of the contract.
  std::sort(matching_file_mtimes.begin(), matching_file_mtimes.end());
  // After the ascending sort the oldest files come first; keep only the
  // entries that exceed the limit, i.e. the ones to delete.
  matching_file_mtimes.resize(matching_file_mtimes.size() - max_matches);

  for (const auto& matching_file : matching_file_mtimes) {
    RETURN_NOT_OK(env->DeleteFile(matching_file.second));
  }
  return Status::OK();
}
// Callback for DeleteTmpFilesRecursively().
//
// Tests 'basename' for the Kudu-specific tmp file infix, and if found,
// deletes the file 'dirname'/'basename'. Entries that are not regular files
// are skipped. Always returns OK: a failed deletion is only logged with the
// message "Failed to remove temporary file <path>".
static Status DeleteTmpFilesRecursivelyCb(Env* env,
                                          Env::FileType file_type,
                                          const string& dirname,
                                          const string& basename) {
  if (file_type != Env::FILE_TYPE) {
    // Skip directories.
    return Status::OK();
  }

  if (basename.find(kTmpInfix) != string::npos) {
    string filename = JoinPathSegments(dirname, basename);
    // NOTE(review): the deleting call was missing from this block;
    // reconstructed as a warn-only delete -- confirm WARN_NOT_OK is the
    // intended macro.
    WARN_NOT_OK(env->DeleteFile(filename),
                Substitute("Failed to remove temporary file $0", filename));
  }
  return Status::OK();
}
// Walks the tree rooted at 'path' in pre-order, invoking
// DeleteTmpFilesRecursivelyCb() (bound to 'env') on each visited entry, and
// returns the status of the walk.
Status DeleteTmpFilesRecursively(Env* env, const string& path) {
  const auto delete_tmp_cb = Bind(&DeleteTmpFilesRecursivelyCb, env);
  return env->Walk(path, Env::PRE_ORDER, delete_tmp_cb);
}
// Sets '*is_empty' to true when the directory 'path' has no children other
// than the "." and ".." entries, and to false otherwise.
//
// Returns the error from Env::GetChildren() if listing fails, in which case
// '*is_empty' is not modified.
Status IsDirectoryEmpty(Env* env, const string& path, bool* is_empty) {
  vector<string> children;
  RETURN_NOT_OK(env->GetChildren(path, &children));
  // "." and ".." do not count as content; any other entry makes the
  // directory non-empty.
  *is_empty = std::all_of(children.begin(), children.end(),
                          [](const string& child) {
                            return child == "." || child == "..";
                          });
  return Status::OK();
}
// Synchronizes the parent directory of every entry in 'dirs' and 'files'.
//
// Each distinct parent directory is synced once. Returns the first sync
// error, prefixed with "unable to synchronize directory <dir>".
Status SyncAllParentDirs(Env* env,
                         const vector<string>& dirs,
                         const vector<string>& files) {
  // An unordered_set is used to deduplicate the set of directories.
  // NOTE(review): the statements that populate and consume 'to_sync' were
  // missing from this block; reconstructed with DirName() and Env::SyncDir()
  // -- confirm against kudu/util/path_util.h and kudu/util/env.h.
  unordered_set<string> to_sync;
  for (const auto& d : dirs) {
    to_sync.insert(DirName(d));
  }
  for (const auto& f : files) {
    to_sync.insert(DirName(f));
  }
  for (const auto& d : to_sync) {
    RETURN_NOT_OK_PREPEND(env->SyncDir(d),
                          Substitute("unable to synchronize directory $0", d));
  }
  return Status::OK();
}
// Lists the children of 'path' into '*entries', dropping the "." and ".."
// entries as well as any name containing the tmp-file infix (kTmpInfix).
//
// Returns the error from Env::GetChildren() if listing fails.
Status ListFilesInDir(Env* env,
                      const string& path,
                      vector<string>* entries) {
  RETURN_NOT_OK(env->GetChildren(path, entries));
  // Erase/remove idiom: filters in a single pass and always terminates,
  // unlike a hand-rolled loop that only moves forward when it erases.
  entries->erase(
      std::remove_if(entries->begin(), entries->end(),
                     [](const string& name) {
                       return name == "." || name == ".." ||
                              name.find(kTmpInfix) != string::npos;
                     }),
      entries->end());
  return Status::OK();
}
} // namespace env_util
} // namespace kudu