blob: 82b4d93980eae6d052bc248b1e5caacb88b5834a [file] [log] [blame]
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <dirent.h>
#include <fcntl.h>
#include <fnmatch.h>
#include <fts.h>
#include <glob.h>
#include <openssl/rand.h>
#include <openssl/ssl.h>
#include <pthread.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/statvfs.h>
#include <sys/time.h>
#include <sys/uio.h>
#include <sys/utsname.h>
#include <unistd.h>
#include <algorithm>
#include <cerrno>
#include <climits>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <functional>
#include <memory>
#include <numeric>
#include <optional>
#include <ostream>
#include <string>
#include <type_traits>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/integral_types.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/once.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/util/array_view.h"
#include "kudu/util/coding-inl.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/env.h"
#include "kudu/util/errno.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/flags.h"
#include "kudu/util/logging.h"
#include "kudu/util/malloc.h"
#include "kudu/util/monotime.h"
#include "kudu/util/openssl_util.h"
#include "kudu/util/path_util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/thread_restrictions.h"
#include "kudu/util/trace.h"
#if defined(__APPLE__)
#include <mach-o/dyld.h>
#include <sys/sysctl.h>
#include <linux/falloc.h>
#include <linux/fiemap.h>
#include <linux/fs.h>
#include <linux/ioctl.h>
#include <linux/kernel.h>
#include <linux/magic.h>
#include <linux/types.h>
#include <sys/ioctl.h>
#include <sys/sysinfo.h>
#include <sys/vfs.h>
#endif // defined(__APPLE__)
using base::subtle::Atomic64;
using base::subtle::Barrier_AtomicIncrement;
using std::accumulate;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
// Copied from falloc.h. Useful for older kernels that lack support for
// hole punching; fallocate(2) will return EOPNOTSUPP.
// Each value is only a fallback: it is defined solely when no system header
// (e.g. <linux/falloc.h>) has already provided it, so the two never conflict.
#ifndef FALLOC_FL_KEEP_SIZE
#define FALLOC_FL_KEEP_SIZE 0x01 /* default is extend size */
#endif
#ifndef FALLOC_FL_PUNCH_HOLE
#define FALLOC_FL_PUNCH_HOLE 0x02 /* de-allocates range */
#endif
#ifndef __APPLE__
// These struct and ioctl definitions were copied verbatim from xfsprogs.
// NOTE(review): the #endif closing this #ifndef is not visible in this view;
// confirm where the Linux-only section is meant to end.
typedef struct xfs_flock64 {
__s16 l_type;
__s16 l_whence;
__s64 l_start;
__s64 l_len; /* len == 0 means until end of file */
__s32 l_sysid;
__u32 l_pid;
__s32 l_pad[4]; /* reserve area */
} xfs_flock64_t;
// XFS ioctl that, per the --env_use_ioctl_hole_punch_on_xfs flag help, is used
// instead of fallocate(2) to punch holes on XFS filesystems.
#define XFS_IOC_UNRESVSP64 _IOW ('X', 43, struct xfs_flock64)
// OSX does not have fdatasync or fread_unlocked.
// NOTE(review): as shown, this #ifdef sits inside the #ifndef __APPLE__ above
// and has no visible #endif; a preprocessor line appears to be missing.
#ifdef __APPLE__
// Substitute the closest available calls: fsync() also flushes metadata, and
// fread() takes the stream lock that fread_unlocked() would skip.
#define fdatasync fsync
#define fread_unlocked fread
// With some probability (the --env_inject_eio fraction), if 'filename_expr'
// matches the glob pattern specified by the 'env_inject_eio_globs' flag, calls
// RETURN_NOT_OK on 'error_expr'.
//
// 'filename_expr' is evaluated exactly once. The expansion is wrapped in
// do/while(0) so the macro can be used as a single statement.
#define MAYBE_RETURN_EIO(filename_expr, error_expr) do { \
  const string& f_ = (filename_expr); \
  MAYBE_RETURN_FAILURE(FLAGS_env_inject_eio, \
      ShouldInject(f_, FLAGS_env_inject_eio_globs) ? (error_expr) : Status::OK()); \
} while (0)
// Decides whether a fault may be injected for the file 'candidate'.
//
// 'glob_patterns' is a comma-separated list of fnmatch(3) patterns; empty
// entries are ignored. Returns true as soon as one pattern matches 'candidate'.
// Paths under /proc/ never match.
bool ShouldInject(const string& candidate, const string& glob_patterns) {
  // /proc/ can't "go bad", so it is never a target regardless of the flag.
  // This check must come before 'glob_patterns' is touched at all: some
  // background threads read /proc/ after gflags have already been destructed.
  if (candidate.compare(0, 6, "/proc/") == 0) {
    return false;
  }

  string::size_type begin = 0;
  while (begin <= glob_patterns.size()) {
    string::size_type end = glob_patterns.find(',', begin);
    if (end == string::npos) {
      end = glob_patterns.size();
    }
    // Skip empty entries such as those produced by ",," or a trailing ','.
    if (end > begin) {
      const string pattern = glob_patterns.substr(begin, end - begin);
      if (fnmatch(pattern.c_str(), candidate.c_str(), 0) == 0) {
        return true;
      }
    }
    begin = end + 1;
  }
  return false;
}
// Flags controlling how this Env synchronizes data and manages disk space.
// See KUDU-588 for details.
DEFINE_bool(env_use_fsync, false,
"Use fsync(2) instead of fdatasync(2) for synchronizing dirty "
"data to disk.");
TAG_FLAG(env_use_fsync, advanced);
TAG_FLAG(env_use_fsync, evolving);
// See KUDU-2052 for details.
DEFINE_bool(env_use_ioctl_hole_punch_on_xfs, true,
"Use the XFS_IOC_UNRESVSP64 ioctl instead of fallocate(2) to "
"punch holes on XFS filesystems.");
TAG_FLAG(env_use_ioctl_hole_punch_on_xfs, advanced);
TAG_FLAG(env_use_ioctl_hole_punch_on_xfs, experimental);
// Consulted by IOError() when an errno of EIO is converted into a Status.
DEFINE_bool(crash_on_eio, false,
"Kill the process if an I/O operation results in EIO. If false, "
"I/O resulting in EIOs will return the status IOError and leave "
"error-handling up to the caller.");
TAG_FLAG(crash_on_eio, advanced);
TAG_FLAG(crash_on_eio, experimental);
// When set, DoSync() returns OK immediately without touching the disk.
DEFINE_bool(never_fsync, false,
"Never fsync() anything to disk. This is used by certain test cases to "
"speed up runtime. This is very unsafe to use in production.");
TAG_FLAG(never_fsync, advanced);
TAG_FLAG(never_fsync, unsafe);
// Fault-injection knobs for tests; all of them are hidden.
// The short read/write flags shrink the result of the first read/write attempt
// in DoReadV()/DoWriteV() so that the short-I/O retry paths get exercised.
DEFINE_int32(env_inject_short_read_bytes, 0,
"The number of bytes less than the requested bytes to read");
TAG_FLAG(env_inject_short_read_bytes, hidden);
DEFINE_int32(env_inject_short_write_bytes, 0,
"The number of bytes less than the requested bytes to write");
TAG_FLAG(env_inject_short_write_bytes, hidden);
// Probability of an injected EIO failure (see the help text).
DEFINE_double(env_inject_eio, 0.0,
"Fraction of the time that operations on certain files will fail "
"with the posix code EIO.");
TAG_FLAG(env_inject_eio, hidden);
// File filter consulted by MAYBE_RETURN_EIO via ShouldInject().
DEFINE_string(env_inject_eio_globs, "*",
"Comma-separated list of glob patterns specifying files on which "
"I/O will fail. By default, all files may cause a failure.");
TAG_FLAG(env_inject_eio_globs, hidden);
// NOTE(review): no consumer of this flag is visible in this part of the file.
DEFINE_string(env_inject_lock_failure_globs, "",
"Comma-separated list of glob patterns specifying files on which "
"attempts to obtain a file lock will fail. By default, no files "
"will fail.");
TAG_FLAG(env_inject_lock_failure_globs, hidden);
// Whether newly created sensitive files are encrypted.
DEFINE_bool(encrypt_data_at_rest, false,
"Whether sensitive files should be encrypted on the file system.");
// AES key length in bits used for new file keys (see GenerateHeader()).
DEFINE_int32(encryption_key_length, 128, "Encryption key length.");
TAG_FLAG(encryption_key_length, advanced);
// Reject values other than the AES key sizes GenerateHeader() supports, so a
// bad value fails at flag-parsing time rather than when a file is created.
DEFINE_validator(encryption_key_length,
                 [](const char* /*n*/, int32 v) { return v == 128 || v == 192 || v == 256; });
// Per-thread identifier plus a process-wide counter; presumably the counter
// hands out values for thread_local_id (the code using them is not visible).
static __thread uint64_t thread_local_id;
static Atomic64 cur_thread_local_id_;
namespace kudu {
// Message carried by the statuses of injected (fake) I/O failures.
const char* const Env::kInjectedFailureStatusMsg = "INJECTED FAILURE";
// Cipher block size in bytes; the CTR IV is derived from offset / block size.
const uint8_t kEncryptionBlockSize = 16;
// Total size in bytes of the on-disk encryption header (see layout below).
const uint8_t kEncryptionHeaderSize = 64;
// Magic string (7 bytes, without the terminator) at the start of the header.
const char* const kEncryptionHeaderMagic = "kuduenc";
// Owning pointer that releases an OpenSSL cipher context via EVP_CIPHER_CTX_free().
using evp_ctx_unique_ptr = std::unique_ptr<EVP_CIPHER_CTX, decltype(&EVP_CIPHER_CTX_free)>;
namespace {
// Smart-pointer deleter; presumably it free()s 'ptr'.
// NOTE(review): the body of operator() and the closing braces are missing from
// this view.
struct FreeDeleter {
inline void operator()(void* ptr) const {
// Cipher and key-length combinations. The CTR values are the ones written to
// the one-byte algorithm field of the on-disk encryption header.
enum class EncryptionAlgorithm {
  AES128CTR = 0x00,
  AES192CTR = 0x01,
  AES256CTR = 0x02,
  // ECB mode below only used to encrypt keys.
  AES128ECB = 0xFD,
  AES192ECB = 0xFE,
  AES256ECB = 0xFF,
};
// The encryption header is stored on disk as follows:
// *----------------------------------------------*
// | "kuduenc" magic string (7 bytes) |
// *----------------------------------------------*
// | Algorithm and key length (1 byte) |
// *----------------------------------------------*
// | Encrypted File Key (right-padded) (32 bytes) |
// *----------------------------------------------*
// | Reserved for future use (24 bytes) |
// *----------------------------------------------*
// The algorithm and key length mapping is:
// *------*-------------*
// | 0x00 | AES-128-CTR |
// *------*-------------*
// | 0x01 | AES-192-CTR |
// *------*-------------*
// | 0x02 | AES-256-CTR |
// *------*-------------*
// The ECB algorithms are only used to encrypt keys and are never written into
// a header.
// In-memory form of the header: the cipher plus the decrypted key.
// NOTE(review): the closing "};" of this struct is missing from this view.
struct EncryptionHeader {
// Cipher (and therefore key length) to use with 'key'.
EncryptionAlgorithm algorithm;
// Raw key bytes. Only the first 16, 24 or 32 bytes are meaningful, depending
// on 'algorithm'.
uint8_t key[32];
// Returns the OpenSSL cipher implementing 'algorithm', or nullptr when
// 'algorithm' is not one of the known enumerators.
const EVP_CIPHER* GetEVPCipher(EncryptionAlgorithm algorithm) {
  const EVP_CIPHER* cipher = nullptr;
  switch (algorithm) {
    // Counter-mode ciphers.
    case EncryptionAlgorithm::AES128CTR: cipher = EVP_aes_128_ctr(); break;
    case EncryptionAlgorithm::AES192CTR: cipher = EVP_aes_192_ctr(); break;
    case EncryptionAlgorithm::AES256CTR: cipher = EVP_aes_256_ctr(); break;
    // ECB ciphers, only used to encrypt keys.
    case EncryptionAlgorithm::AES128ECB: cipher = EVP_aes_128_ecb(); break;
    case EncryptionAlgorithm::AES192ECB: cipher = EVP_aes_192_ecb(); break;
    case EncryptionAlgorithm::AES256ECB: cipher = EVP_aes_256_ecb(); break;
  }
  return cipher;
}
#if defined(__APPLE__)
// Simulates Linux's fallocate file preallocation API on OS X.
// Only mode 0 is accepted (CHECK-enforced). Space is reserved with
// F_PREALLOCATE, trying a contiguous allocation first and then any allocation.
// If the file is shorter than offset + len, it is extended with ftruncate().
// Returns 0 on success, or the negative value returned by a failing call.
// NOTE(review): several closing braces are missing from this view. Confirm
// against upstream whether the st_size check is nested inside the st_blocks check.
int fallocate(int fd, int mode, off_t offset, off_t len) {
CHECK_EQ(mode, 0);
off_t size = offset + len;
struct stat stat;
int ret = fstat(fd, &stat);
if (ret < 0) {
return ret;
// st_blocks is counted in 512-byte units; only preallocate if the space
// already allocated does not cover 'size'.
if (stat.st_blocks * 512 < size) {
// The offset field seems to have no effect; the file is always allocated
// with space from 0 to the size. This is probably because OS X does not
// support sparse files.
fstore_t store = {F_ALLOCATECONTIG, F_PEOFPOSMODE, 0, size};
if (fcntl(fd, F_PREALLOCATE, &store) < 0) {
LOG(INFO) << "Unable to allocate contiguous disk space, attempting non-contiguous allocation";
store.fst_flags = F_ALLOCATEALL;
ret = fcntl(fd, F_PREALLOCATE, &store);
if (ret < 0) {
return ret;
if (stat.st_size < size) {
// fcntl does not change the file size, so set it if necessary.
int ret;
RETRY_ON_EINTR(ret, ftruncate(fd, size));
return ret;
return 0;
// Implementations for `preadv` and `pwritev` are available in the MacOSX11+ SDK.
// However, we still use the simulated implementations in order to support older versions.
// Simulates Linux's preadv API on OS X.
// Reads each iovec in turn with pread(2) at consecutive file offsets. Returns
// the total number of bytes read, or the negative pread() result on failure.
// NOTE(review): the body of the short-read check below is missing from this
// view. As shown, a short read does not stop the loop, so later iovecs would
// be read from offsets past a gap. Upstream is expected to stop there.
ssize_t preadvsim(int fd, const struct iovec* iovec, int count, off_t offset) {
ssize_t total_read_bytes = 0;
for (int i = 0; i < count; i++) {
ssize_t r;
RETRY_ON_EINTR(r, pread(fd, iovec[i].iov_base, iovec[i].iov_len, offset));
if (r < 0) {
return r;
total_read_bytes += r;
if (static_cast<size_t>(r) < iovec[i].iov_len) {
offset += iovec[i].iov_len;
return total_read_bytes;
// Simulates Linux's pwritev API on OS X.
// Writes each iovec in turn with pwrite(2) at consecutive file offsets.
// Returns the total number of bytes written, or the negative pwrite() result on
// failure.
// NOTE(review): as in preadvsim(), the body of the short-write check appears
// to be missing. As shown, the loop would continue past a partial write.
ssize_t pwritevsim(int fd, const struct iovec* iovec, int count, off_t offset) {
ssize_t total_written_bytes = 0;
for (int i = 0; i < count; i++) {
ssize_t r;
RETRY_ON_EINTR(r, pwrite(fd, iovec[i].iov_base, iovec[i].iov_len, offset));
if (r < 0) {
return r;
total_written_bytes += r;
if (static_cast<size_t>(r) < iovec[i].iov_len) {
offset += iovec[i].iov_len;
return total_written_bytes;
void DoClose(int fd) {
int err;
RETRY_ON_EINTR(err, close(fd));
if (PREDICT_FALSE(err != 0)) {
PLOG(WARNING) << "Failed to close fd " << fd;
// Close file descriptor when object goes out of scope.
// NOTE(review): the access specifiers, the destructor body (presumably a
// DoClose(fd_) call) and the closing "};" are missing from this view.
class ScopedFdCloser {
// Remembers 'fd' so that it can be closed on destruction.
explicit ScopedFdCloser(int fd)
: fd_(fd) {
~ScopedFdCloser() {
// Descriptor closed when this object is destroyed.
const int fd_;
// Converts errno value 'err_number' into a Status whose message is prefixed
// with 'context':
//   ENOENT -> NotFound
//   EEXIST -> AlreadyPresent
//   EIO    -> crashes the process if --crash_on_eio is set; otherwise logs and
//             (as far as visible) returns IOError.
// NOTE(review): the case label(s) guarding the NotSupported return, and the
// 'default' label, are missing from this view. As shown, the NotSupported
// return is unreachable.
Status IOError(const string& context, int err_number) {
switch (err_number) {
case ENOENT:
return Status::NotFound(context, ErrnoToString(err_number), err_number);
case EEXIST:
return Status::AlreadyPresent(context, ErrnoToString(err_number), err_number);
return Status::NotSupported(context, ErrnoToString(err_number), err_number);
case EIO:
if (FLAGS_crash_on_eio) {
// TODO(awong): This is very, very coarse-grained. A more comprehensive
// approach is described in KUDU-616.
LOG(FATAL) << "Fatal I/O error, context: " << context;
} else {
LOG(ERROR) << "I/O error, context: " << context;
return Status::IOError(context, ErrnoToString(err_number), err_number);
// Flushes the dirty data of 'fd' to disk. It uses fsync(2) when
// --env_use_fsync is set and fdatasync(2) otherwise, and does nothing when
// --never_fsync is set. 'filename' is used for EIO injection and for the error
// status.
Status DoSync(int fd, const string& filename) {
  MAYBE_RETURN_EIO(filename, IOError(Env::kInjectedFailureStatusMsg, EIO));
  if (FLAGS_never_fsync) {
    return Status::OK();
  }
  const int rc = FLAGS_env_use_fsync ? fsync(fd) : fdatasync(fd);
  if (rc < 0) {
    return IOError(filename, errno);
  }
  return Status::OK();
}
// Opens 'filename' with open(2) 'flags', retrying on EINTR, and stores the new
// descriptor in '*fd' on success. open() is called without a mode argument, so
// 'flags' must not request file creation. 'reason' is used only in the error
// message.
Status DoOpen(const string& filename, int flags, const string& reason, int* fd) {
  int opened;
  RETRY_ON_EINTR(opened, open(filename.c_str(), flags));
  if (opened != -1) {
    *fd = opened;
    return Status::OK();
  }
  return IOError(Substitute("Error opening for $0: $1", reason, filename), errno);
}
// Encrypts the data in 'cleartext' and writes it to 'ciphertext'. It requires
// 'offset' to be set in the file as it's used to set the initialization vector.
// 'cleartext' and 'ciphertext' must have the same number of slices. Each
// ciphertext slice must be at least as large as its cleartext slice.
Status DoEncryptV(const EncryptionHeader* eh,
uint64_t offset,
ArrayView<const Slice> cleartext,
ArrayView<Slice> ciphertext) {
DCHECK_EQ(cleartext.size(), ciphertext.size());
// Set the initialization vector based on the offset.
// The IV is the 128-bit big-endian index of the block containing 'offset'
// (high 64 bits zero).
uint8_t iv[16];
InlineBigEndianEncodeFixed64(&iv[0], 0);
InlineBigEndianEncodeFixed64(&iv[8], offset / kEncryptionBlockSize);
evp_ctx_unique_ptr ctx(EVP_CIPHER_CTX_new(), EVP_CIPHER_CTX_free);
OPENSSL_RET_NOT_OK(EVP_EncryptInit_ex(ctx.get(), GetEVPCipher(eh->algorithm),
nullptr, eh->key, iv),
"Failed to initialize encryption");
// When 'offset' is not block-aligned, encrypt 'offset_mod' throwaway bytes so
// the keystream is positioned at 'offset' within its block. The scratch input
// is left uninitialized; its output is apparently discarded.
size_t offset_mod = offset % kEncryptionBlockSize;
if (offset_mod) {
unsigned char scratch_clear[kEncryptionBlockSize];
unsigned char scratch_cipher[kEncryptionBlockSize];
int out_length;
OPENSSL_RET_NOT_OK(EVP_EncryptUpdate(ctx.get(), scratch_cipher, &out_length,
scratch_clear, offset_mod),
"Failed to encrypt scratch data");
DCHECK_LE(out_length, kEncryptionBlockSize);
// NOTE(review): 'i' is a signed int compared against an unsigned size().
for (auto i = 0; i < cleartext.size(); ++i) {
int out_length;
// Normally, EVP_EncryptFinal_ex() would be needed after the last chunk of
// data encrypted with EVP_EncryptUpdate(). In Kudu, we only use AES-CTR
// which requires no padding or authentication tags, so
// EVP_EncryptFinal_ex() doesn't actually add anything.
// NOTE(review): the EVP_EncryptUpdate() call that this error message
// belongs to is missing from this view.
"Failed to encrypt data");
DCHECK_EQ(out_length, cleartext[i].size());
DCHECK_LE(out_length, ciphertext[i].size());
return Status::OK();
// Decrypts 'data'. Uses 'offset' in the file to set the initialization vector.
// Decryption happens in place. Slices that are empty or consist entirely of
// zero bytes are skipped (left as they are).
Status DoDecryptV(const EncryptionHeader* eh, uint64_t offset, ArrayView<Slice> data) {
// Set the initialization vector based on the offset.
// Same IV scheme as DoEncryptV(): big-endian block index of 'offset'.
uint8_t iv[16];
InlineBigEndianEncodeFixed64(&iv[0], 0);
InlineBigEndianEncodeFixed64(&iv[8], offset / kEncryptionBlockSize);
evp_ctx_unique_ptr ctx(EVP_CIPHER_CTX_new(), EVP_CIPHER_CTX_free);
OPENSSL_RET_NOT_OK(EVP_DecryptInit_ex(ctx.get(), GetEVPCipher(eh->algorithm),
nullptr, eh->key, iv),
"Failed to initialize decryption");
// Advance the keystream to 'offset' within its block, as in DoEncryptV().
size_t offset_mod = offset % kEncryptionBlockSize;
if (offset_mod) {
unsigned char scratch_clear[kEncryptionBlockSize];
unsigned char scratch_cipher[kEncryptionBlockSize];
int out_length;
// NOTE(review): the EVP_DecryptUpdate() call for the scratch bytes is
// missing from this view; only its error message remains.
"Failed to decrypt scratch data");
for (auto i = 0; i < data.size(); ++i) {
const Slice& ciphertext_slice = data[i];
int in_length = ciphertext_slice.size();
// NOTE(review): skipping a slice does not visibly advance the CTR keystream.
// If a non-final slice is skipped, later slices would decrypt with the wrong
// keystream position; confirm the missing lines handle this.
if (!in_length || IsAllZeros(ciphertext_slice)) continue;
int out_length;
// We don't call EVP_DecryptFinal_ex() after EVP_DecryptUpdate() for the
// same reason we don't call EVP_EncryptFinal_ex().
// NOTE(review): the EVP_DecryptUpdate() call for this slice is missing from
// this view.
"Failed to decrypt data");
return Status::OK();
// Opens 'filename' read-write, with the creation behavior selected by 'mode',
// and stores the descriptor in '*fd'. New files get mode 0666, subject to the
// process umask.
// NOTE(review): several case labels and 'break' statements in the switch are
// missing from this view. As shown, the flag combinations fall through into the
// NotSupported return.
Status DoOpen(const string& filename, Env::OpenMode mode, int* fd) {
MAYBE_RETURN_EIO(filename, IOError(Env::kInjectedFailureStatusMsg, EIO));
int flags = O_RDWR;
switch (mode) {
flags |= O_CREAT | O_TRUNC;
flags |= O_CREAT;
case Env::MUST_CREATE:
flags |= O_CREAT | O_EXCL;
case Env::MUST_EXIST:
return Status::NotSupported(Substitute("Unknown create mode $0", mode));
int f;
RETRY_ON_EINTR(f, open(filename.c_str(), flags, 0666));
if (f < 0) {
return IOError(filename, errno);
*fd = f;
return Status::OK();
// Fills the buffers described by 'results' from 'fd', starting at file offset
// 'offset'. Positional scatter reads are used, so the descriptor's file
// position is not changed. Short reads are retried until every buffer is
// full. If 'eh' is non-null, the bytes read are decrypted in place with it.
//
// Returns EndOfFile if the file ends before all requested bytes could be read,
// or an IOError-derived status if a read fails.
Status DoReadV(
    int fd,
    const string& filename,
    uint64_t offset,
    ArrayView<Slice> results,
    const EncryptionHeader* eh) {
  MAYBE_RETURN_EIO(filename, IOError(Env::kInjectedFailureStatusMsg, EIO));

  // Convert the results into the iovec vector to request
  // and calculate the total bytes requested.
  size_t bytes_req = 0;
  size_t iov_size = results.size();
  struct iovec iov[iov_size];
  for (size_t i = 0; i < iov_size; i++) {
    Slice& result = results[i];
    bytes_req += result.size();
    iov[i] = {result.mutable_data(), result.size()};
  }

  uint64_t cur_offset = offset;
  // Number of leading iovecs that are already completely filled.
  size_t completed_iov = 0;
  size_t rem = bytes_req;
  while (rem > 0) {
    // Never request more than IOV_MAX in one request.
    size_t iov_count = std::min(iov_size - completed_iov, static_cast<size_t>(IOV_MAX));
    ssize_t r;
#if defined(__APPLE__)
    RETRY_ON_EINTR(r, preadvsim(fd, iov + completed_iov, iov_count, cur_offset));
#else
    RETRY_ON_EINTR(r, preadv(fd, iov + completed_iov, iov_count, cur_offset));
#endif

    // Fake a short read for testing.
    if (PREDICT_FALSE(FLAGS_env_inject_short_read_bytes > 0 && rem == bytes_req)) {
      DCHECK_LT(FLAGS_env_inject_short_read_bytes, r);
      r -= FLAGS_env_inject_short_read_bytes;
    }

    if (PREDICT_FALSE(r < 0)) {
      // An error: return a non-ok status.
      return IOError(filename, errno);
    }

    if (PREDICT_FALSE(r == 0)) {
      // EOF.
      if (eh) {
        RETURN_NOT_OK(DoDecryptV(eh, offset, results));
      }
      return Status::EndOfFile(
          Substitute("EOF trying to read $0 bytes at offset $1", bytes_req, offset));
    }

    const size_t bytes_read = static_cast<size_t>(r);
    if (PREDICT_TRUE(bytes_read == rem)) {
      // All requested bytes were read. This is almost always the case.
      if (eh) {
        RETURN_NOT_OK(DoDecryptV(eh, offset, results));
      }
      return Status::OK();
    }
    DCHECK_LE(bytes_read, rem);

    // Adjust iovec vector based on bytes read for the next request.
    size_t bytes_rem = bytes_read;
    for (size_t i = completed_iov; i < iov_size; i++) {
      if (bytes_rem >= iov[i].iov_len) {
        // The full length of this iovec was read. Exclude it from the next
        // request so already-filled buffers are not overwritten with data from
        // later offsets.
        completed_iov++;
        bytes_rem -= iov[i].iov_len;
      } else {
        // Partially read this result.
        // Adjust the iov_len and iov_base to request only the missing data.
        iov[i].iov_base = static_cast<uint8_t*>(iov[i].iov_base) + bytes_rem;
        iov[i].iov_len -= bytes_rem;
        break; // Don't need to adjust remaining iovec's.
      }
    }
    cur_offset += bytes_read;
    rem -= bytes_read;
  }
  if (eh) {
    RETURN_NOT_OK(DoDecryptV(eh, offset, results));
  }
  DCHECK_EQ(0, rem);
  return Status::OK();
}
// Writes the buffers in 'data' to 'fd', starting at file offset 'offset'.
// Positional gather writes are used, so the descriptor's file position is not
// changed. Short writes are retried until everything has been written.
//
// If 'eh' is non-null, the data is first encrypted into a temporary buffer and
// the ciphertext is written instead. 'data' itself is never modified.
// Returns an IOError-derived status if a write fails.
Status DoWriteV(
    int fd,
    const string& filename,
    uint64_t offset,
    ArrayView<const Slice> data,
    const EncryptionHeader* eh) {
  MAYBE_RETURN_EIO(filename, IOError(Env::kInjectedFailureStatusMsg, EIO));

  // Convert the data into the iovec vector to request
  // and calculate the total bytes requested.
  size_t bytes_req = 0;
  size_t iov_size = data.size();
  struct iovec iov[iov_size];
  for (size_t i = 0; i < iov_size; i++) {
    bytes_req += data[i].size();
  }

  // Holds the ciphertext when encrypting. It is owned at function scope so it
  // outlives every write below and is released on every return path.
  unique_ptr<unsigned char[]> encrypted_buf;
  if (eh) {
    encrypted_buf.reset(new unsigned char[bytes_req]);
    std::vector<Slice> encrypted_data(iov_size);
    size_t buffer_offset = 0;
    for (size_t i = 0; i < iov_size; i++) {
      size_t size = data[i].size();
      encrypted_data[i] = Slice(encrypted_buf.get() + buffer_offset, size);
      buffer_offset += size;
      iov[i] = {const_cast<uint8_t*>(encrypted_data[i].data()), size};
    }
    RETURN_NOT_OK(DoEncryptV(eh, offset, data, encrypted_data));
  } else {
    for (size_t i = 0; i < iov_size; i++) {
      const Slice& chunk = data[i];
      iov[i] = {const_cast<uint8_t*>(chunk.data()), chunk.size()};
    }
  }

  uint64_t cur_offset = offset;
  // Number of leading iovecs that have already been written completely.
  size_t completed_iov = 0;
  size_t rem = bytes_req;
  while (rem > 0) {
    // Never request more than IOV_MAX in one request.
    size_t iov_count = std::min(iov_size - completed_iov, static_cast<size_t>(IOV_MAX));
    ssize_t w;
#if defined(__APPLE__)
    RETRY_ON_EINTR(w, pwritevsim(fd, iov + completed_iov, iov_count, cur_offset));
#else
    RETRY_ON_EINTR(w, pwritev(fd, iov + completed_iov, iov_count, cur_offset));
#endif

    // Fake a short write for testing.
    if (PREDICT_FALSE(FLAGS_env_inject_short_write_bytes > 0 && rem == bytes_req)) {
      DCHECK_LT(FLAGS_env_inject_short_write_bytes, w);
      w -= FLAGS_env_inject_short_write_bytes;
    }

    if (PREDICT_FALSE(w < 0)) {
      // An error: return a non-ok status.
      return IOError(filename, errno);
    }

    const size_t bytes_written = static_cast<size_t>(w);
    DCHECK_LE(bytes_written, rem);

    if (PREDICT_TRUE(bytes_written == rem)) {
      // All requested bytes were written. This is almost always the case.
      return Status::OK();
    }

    // Adjust iovec vector based on bytes written for the next request.
    size_t bytes_rem = bytes_written;
    for (size_t i = completed_iov; i < iov_size; i++) {
      if (bytes_rem >= iov[i].iov_len) {
        // The full length of this iovec was written. Exclude it from the next
        // request so it isn't written a second time at a later offset.
        completed_iov++;
        bytes_rem -= iov[i].iov_len;
      } else {
        // Partially wrote this result.
        // Adjust the iov_len and iov_base to write only the missing data.
        iov[i].iov_base = static_cast<uint8_t*>(iov[i].iov_base) + bytes_rem;
        iov[i].iov_len -= bytes_rem;
        break; // Don't need to adjust remaining iovec's.
      }
    }
    cur_offset += bytes_written;
    rem -= bytes_written;
  }
  DCHECK_EQ(0, rem);
  return Status::OK();
}
// Fills 'eh' with a new random file key and the AES-CTR algorithm matching
// --encryption_key_length. Only the first key_length / 8 bytes of 'eh->key'
// are filled.
//
// Returns InvalidArgument if the flag is not 128, 192 or 256, or a non-OK
// status if random key generation fails.
Status GenerateHeader(EncryptionHeader* eh) {
  switch (FLAGS_encryption_key_length) {
    case 128:
      eh->algorithm = EncryptionAlgorithm::AES128CTR;
      break;
    case 192:
      eh->algorithm = EncryptionAlgorithm::AES192CTR;
      break;
    case 256:
      eh->algorithm = EncryptionAlgorithm::AES256CTR;
      break;
    default:
      return Status::InvalidArgument(
          "Supported key lengths for AES encryption are 128, 192, and 256.");
  }
  OPENSSL_RET_NOT_OK(RAND_bytes(eh->key, FLAGS_encryption_key_length / 8),
                     "Failed to generate random key");
  return Status::OK();
}
// Writes the kEncryptionHeaderSize-byte encryption header at offset 0 of 'fd'.
// The header consists of the magic string, one algorithm byte, the file key
// from 'eh' encrypted with 'server_key', and zero padding.
//
// Returns InvalidArgument if 'eh' does not use a CTR algorithm, or the error
// from encrypting the key or writing the header.
Status WriteEncryptionHeader(int fd, const string& filename, const EncryptionHeader& server_key,
                             const EncryptionHeader& eh) {
  vector<Slice> headerv = { kEncryptionHeaderMagic };
  uint32_t key_size;
  uint8_t algorithm[1];
  switch (eh.algorithm) {
    case EncryptionAlgorithm::AES128CTR:
      algorithm[0] = 0;
      key_size = 16;
      break;
    case EncryptionAlgorithm::AES192CTR:
      algorithm[0] = 1;
      // As the keys are encrypted in ECB mode which requires padding, we need
      // 32 bytes instead of 24 to encrypt and write a 192-bit key.
      key_size = 32;
      break;
    case EncryptionAlgorithm::AES256CTR:
      algorithm[0] = 2;
      key_size = 32;
      break;
    default:
      // 'algorithm' is unset here, so report the enum value itself.
      return Status::InvalidArgument(Substitute("Unknown encryption algorithm: $0",
                                                static_cast<int>(eh.algorithm)));
  }
  headerv.emplace_back(Slice(algorithm, 1));

  Slice file_key(eh.key, key_size);
  uint8_t encrypted_file_key[32];
  Slice efk(encrypted_file_key, key_size);
  vector<Slice> clear = {file_key};
  vector<Slice> cipher = {efk};
  RETURN_NOT_OK(DoEncryptV(&server_key, 0, clear, cipher));

  // Add the encrypted file key and trailing zeros to the header.
  static const uint8_t padding[40] = {0};
  headerv.emplace_back(efk);
  // 7 bytes of magic + 1 byte of algorithm and key length.
  constexpr int kMagicAndAlgorithmSize = 8;
  Slice padding_slice(padding, kEncryptionHeaderSize - kMagicAndAlgorithmSize - key_size);
  headerv.emplace_back(padding_slice);
  return DoWriteV(fd, filename, 0, headerv, nullptr);
}
// Sets '*result' to whether 'path' lives on an XFS filesystem, as reported by
// statfs(2). On OS X, '*result' is always false.
// Returns an IOError-derived status if statfs() fails.
Status DoIsOnXfsFilesystem(const string& path, bool* result) {
#ifdef __APPLE__
  *result = false;
#else
  struct statfs buf;
  int ret;
  RETRY_ON_EINTR(ret, statfs(path.c_str(), &buf));
  if (ret == -1) {
    return IOError(Substitute("statfs: $0", path), errno);
  }
  // This magic number isn't defined in any header but is the value of the
  // US-ASCII string 'XFSB' expressed in hexadecimal.
  *result = (buf.f_type == 0x58465342);
#endif
  return Status::OK();
}
// Reads the encryption header at offset 0 of 'fd' and validates its magic
// string and algorithm byte. The stored file key is then decrypted with
// 'server_key' into 'eh'.
//
// Returns Corruption if the magic string or algorithm byte is not recognized,
// or the error from reading or decrypting.
Status ReadEncryptionHeader(int fd, const string& filename, const EncryptionHeader& server_key,
                            EncryptionHeader* eh) {
  char magic[7];
  uint8_t algorithm[1];
  char file_key[32];
  vector<Slice> headerv({ Slice(magic, 7), Slice(algorithm, 1), Slice(file_key, 32) });
  RETURN_NOT_OK(DoReadV(fd, filename, 0, headerv, nullptr));
  if (strncmp(magic, kEncryptionHeaderMagic, 7) != 0) {
    // 'magic' is not NUL-terminated, so it must not be formatted as a C string.
    return Status::Corruption(Substitute("Invalid encryption header: $0",
                                         Slice(magic, sizeof(magic)).ToDebugString()));
  }
  uint16_t key_size;
  eh->algorithm = EncryptionAlgorithm(algorithm[0]);
  switch (eh->algorithm) {
    case EncryptionAlgorithm::AES128CTR:
      key_size = 16;
      break;
    case EncryptionAlgorithm::AES192CTR:
      key_size = 24;
      break;
    case EncryptionAlgorithm::AES256CTR:
      key_size = 32;
      break;
    default:
      // Report the byte value rather than the array's address.
      return Status::Corruption(Substitute("Unknown encryption algorithm: $0",
                                           static_cast<int>(algorithm[0])));
  }
  // Round up to the nearest multiple of 16 bytes when reading and decrypting
  // the file. The actual key size can be used when storing the key in memory.
  // See WriteEncryptionHeader for more info.
  vector<Slice> v = {Slice(file_key, (key_size + 15) & -16)};
  RETURN_NOT_OK(DoDecryptV(&server_key, 0, v));
  memcpy(&eh->key, file_key, key_size);
  return Status::OK();
}
// Returns a human-readable description of resource limit type 't'.
// Crashes (LOG(FATAL)) on an unknown value.
const char* ResourceLimitTypeToString(Env::ResourceLimitType t) {
  using LimitType = Env::ResourceLimitType;
  switch (t) {
    case LimitType::OPEN_FILES_PER_PROCESS: return "open files per process";
    case LimitType::RUNNING_THREADS_PER_EUID: return "running threads per effective uid";
    default: break;
  }
  LOG(FATAL) << "Unknown resource limit type";
}
// Maps resource limit type 't' to the getrlimit(2)/setrlimit(2) resource
// constant. Crashes (LOG(FATAL)) on an unknown value.
int ResourceLimitTypeToUnixRlimit(Env::ResourceLimitType t) {
  using LimitType = Env::ResourceLimitType;
  switch (t) {
    case LimitType::OPEN_FILES_PER_PROCESS:
      return RLIMIT_NOFILE;
    case LimitType::RUNNING_THREADS_PER_EUID:
      return RLIMIT_NPROC;
    default:
      break;
  }
  LOG(FATAL) << "Unknown resource limit type: " << t;
}
#ifdef __APPLE__
// Maps resource limit type 't' to the name of the macOS kernel setting that
// bounds it (presumably looked up with sysctlbyname()). Crashes on an unknown
// value.
// NOTE(review): the closing braces and the #endif for this #ifdef are missing
// from this view.
const char* ResourceLimitTypeToMacosRlimit(Env::ResourceLimitType t) {
switch (t) {
case Env::ResourceLimitType::OPEN_FILES_PER_PROCESS:
return "kern.maxfilesperproc";
case Env::ResourceLimitType::RUNNING_THREADS_PER_EUID:
return "kern.maxprocperuid";
default: LOG(FATAL) << "Unknown resource limit type: " << t;
// Fifo backed by a named pipe at 'filename_'. The read and write ends are
// opened independently and are -1 until opened.
// NOTE(review): access specifiers, the closing braces of each member function,
// the destructor's close calls and the final "};" are missing from this view.
class PosixFifo : public Fifo {
explicit PosixFifo(string fname) : filename_(std::move(fname)) {}
// FIFOs are never encrypted, so there is no header.
size_t GetEncryptionHeaderSize() const override {
return 0;
const string& filename() const override {
return filename_;
// Opens the read end; CHECK-fails if it is already open.
Status OpenForReads() override {
CHECK_EQ(-1, read_fd_);
return DoOpen(filename_, O_RDONLY, "reads", &read_fd_);
// Opens the write end; CHECK-fails if it is already open.
Status OpenForWrites() override {
CHECK_EQ(-1, write_fd_);
return DoOpen(filename_, O_WRONLY, "writes", &write_fd_);
// Accessors; CHECK-fail if the corresponding end has not been opened.
int read_fd() const override {
CHECK_NE(-1, read_fd_);
return read_fd_;
int write_fd() const override {
CHECK_NE(-1, write_fd_);
return write_fd_;
// Presumably closes whichever ends are open (the bodies are not visible).
~PosixFifo() {
if (read_fd_ != -1) {
if (write_fd_ != -1) {
const string filename_;
int read_fd_ = -1;
int write_fd_ = -1;
// SequentialFile backed by a stdio FILE*. For encrypted files, 'offset_' starts
// past the encryption header and tracks the logical file offset used for
// decryption.
// NOTE(review): access specifiers and closing braces are missing from this
// view. The constructor as shown does not initialize 'file_' or 'encrypted_'.
class PosixSequentialFile: public SequentialFile {
const string filename_;
FILE* const file_;
const bool encrypted_;
size_t offset_;
const EncryptionHeader encryption_header_;
PosixSequentialFile(string fname, bool encrypted, FILE* f, EncryptionHeader eh)
: filename_(std::move(fname)),
offset_(encrypted ? kEncryptionHeaderSize : 0),
encryption_header_(eh) {}
// Closes the stream; a failure is only logged.
~PosixSequentialFile() {
int err;
RETRY_ON_EINTR(err, fclose(file_));
if (PREDICT_FALSE(err != 0)) {
PLOG(WARNING) << "Failed to close " << filename_;
// Reads up to result->size() bytes into 'result'. Hitting EOF is not an error.
// Encrypted data is decrypted in place and 'offset_' advances by the bytes read.
virtual Status Read(Slice* result) OVERRIDE {
MAYBE_RETURN_EIO(filename_, IOError(Env::kInjectedFailureStatusMsg, EIO));
size_t r;
STREAM_RETRY_ON_EINTR(r, file_, fread_unlocked(result->mutable_data(), 1,
result->size(), file_));
if (r < result->size()) {
if (feof(file_)) {
// We leave status as ok if we hit the end of the file.
// We need to adjust the slice size.
// NOTE(review): the statement shrinking 'result' to 'r' bytes is missing
// from this view.
} else {
// A partial read with an error: return a non-ok status.
return IOError(filename_, errno);
if (encrypted_) {
RETURN_NOT_OK(DoDecryptV(&encryption_header_, offset_, ArrayView<Slice>(result, 1)));
offset_ += r;
return Status::OK();
// Skips 'n' bytes forward with fseek() and advances 'offset_' to match.
virtual Status Skip(uint64_t n) OVERRIDE {
MAYBE_RETURN_EIO(filename_, IOError(Env::kInjectedFailureStatusMsg, EIO));
TRACE_EVENT1("io", "PosixSequentialFile::Skip", "path", filename_);
if (fseek(file_, n, SEEK_CUR)) {
return IOError(filename_, errno);
offset_ += n;
return Status::OK();
virtual const string& filename() const OVERRIDE { return filename_; }
size_t GetEncryptionHeaderSize() const override {
return encrypted_ ? kEncryptionHeaderSize : 0;
// pread() based random-access
// Reads go through DoReadV(). Encrypted files are decrypted with
// 'encryption_header_', and reads must not start inside the encryption header
// (DCHECK-enforced).
// NOTE(review): access specifiers, closing braces, the destructor body and the
// initialization of 'fd_'/'encrypted_' are missing from this view.
class PosixRandomAccessFile: public RandomAccessFile {
const string filename_;
const int fd_;
const bool encrypted_;
const EncryptionHeader encryption_header_;
PosixRandomAccessFile(string fname, int fd, bool encrypted, EncryptionHeader eh)
: filename_(std::move(fname)),
encryption_header_(eh) {}
~PosixRandomAccessFile() {
virtual Status Read(uint64_t offset, Slice result) const OVERRIDE {
DCHECK_GE(offset, GetEncryptionHeaderSize());
return DoReadV(fd_, filename_, offset, ArrayView<Slice>(&result, 1),
encrypted_ ? &encryption_header_ : nullptr);
virtual Status ReadV(uint64_t offset, ArrayView<Slice> results) const OVERRIDE {
DCHECK_GE(offset, GetEncryptionHeaderSize());
return DoReadV(fd_, filename_, offset, results,
encrypted_ ? &encryption_header_ : nullptr);
// Reports the on-disk size from fstat(). For encrypted files this includes
// the encryption header.
virtual Status Size(uint64_t *size) const OVERRIDE {
MAYBE_RETURN_EIO(filename_, IOError(Env::kInjectedFailureStatusMsg, EIO));
TRACE_EVENT1("io", "PosixRandomAccessFile::Size", "path", filename_);
struct stat st;
if (fstat(fd_, &st) == -1) {
return IOError(filename_, errno);
*size = st.st_size;
return Status::OK();
virtual const string& filename() const OVERRIDE { return filename_; }
size_t GetEncryptionHeaderSize() const override {
return encrypted_ ? kEncryptionHeaderSize : 0;
// Heap usage of this object plus its filename buffer.
virtual size_t memory_footprint() const OVERRIDE {
return kudu_malloc_usable_size(this) + filename_.capacity();
// Use non-memory mapped POSIX files to write data to a file.
// TODO (perf) investigate zeroing a pre-allocated area in
// order to further improve Sync() performance.
// NOTE(review): access specifiers, closing braces and most member initializers
// ('fd_', 'sync_on_close_', 'filesize_', ...) are missing from this view.
class PosixWritableFile : public WritableFile {
PosixWritableFile(string fname, int fd, uint64_t file_size, bool sync_on_close,
bool encrypted, EncryptionHeader eh)
: filename_(std::move(fname)),
encryption_header_(eh) {}
// Closes the file if still open; a failure is only logged.
~PosixWritableFile() {
WARN_NOT_OK(Close(), "Failed to close " + filename_);
// Appends a single slice; equivalent to AppendV() with one element.
virtual Status Append(const Slice& data) OVERRIDE {
  return AppendV(ArrayView<const Slice>(&data, 1));
}

// Writes 'data' at the current logical end of the file ('filesize_'),
// encrypting it first if the file is encrypted. On success, 'filesize_' grows
// by the total slice size and a sync is marked as pending.
virtual Status AppendV(ArrayView<const Slice> data) OVERRIDE {
  RETURN_NOT_OK(DoWriteV(fd_, filename_, filesize_, data,
                         encrypted_ ? &encryption_header_ : nullptr));
  // Calculate the amount of data written. The accumulator is size_t: an 'int'
  // would overflow (and truncate the result) for appends of 2 GiB or more.
  size_t bytes_written = accumulate(data.begin(), data.end(), static_cast<size_t>(0),
                                    [](size_t sum, const Slice& curr) {
                                      return sum + curr.size();
                                    });
  filesize_ += bytes_written;
  pending_sync_ = true;
  return Status::OK();
}
// Reserves 'size' more bytes after the larger of the logical size and the
// previously preallocated size. Filesystems or kernels without fallocate()
// support only produce a one-time warning.
virtual Status PreAllocate(uint64_t size) OVERRIDE {
MAYBE_RETURN_EIO(filename_, IOError(Env::kInjectedFailureStatusMsg, EIO));
TRACE_EVENT1("io", "PosixWritableFile::PreAllocate", "path", filename_);
uint64_t offset = std::max(filesize_, pre_allocated_size_);
int ret;
RETRY_ON_EINTR(ret, fallocate(fd_, 0, offset, size));
if (ret != 0) {
if (errno == EOPNOTSUPP) {
KLOG_FIRST_N(WARNING, 1) << "The filesystem does not support fallocate().";
} else if (errno == ENOSYS) {
KLOG_FIRST_N(WARNING, 1) << "The kernel does not implement fallocate().";
} else {
return IOError(filename_, errno);
pre_allocated_size_ = offset + size;
return Status::OK();
// Closes the file; subsequent calls return OK without doing anything. Before
// closing, unused preallocated space is truncated away and, if
// 'sync_on_close_' is set, the file is synced. All steps are attempted; the
// first error encountered is returned.
virtual Status Close() OVERRIDE {
if (closed_) {
return Status::OK();
TRACE_EVENT1("io", "PosixWritableFile::Close", "path", filename_);
MAYBE_RETURN_EIO(filename_, IOError(Env::kInjectedFailureStatusMsg, EIO));
Status s;
// If we've allocated more space than we used, truncate to the
// actual size of the file and perform Sync().
if (filesize_ < pre_allocated_size_) {
int ret;
RETRY_ON_EINTR(ret, ftruncate(fd_, filesize_));
if (ret != 0) {
s = IOError(filename_, errno);
pending_sync_ = true;
if (sync_on_close_) {
Status sync_status = Sync();
if (!sync_status.ok()) {
LOG(ERROR) << "Unable to Sync " << filename_ << ": " << sync_status.ToString();
if (s.ok()) {
s = sync_status;
int ret;
RETRY_ON_EINTR(ret, close(fd_));
if (ret < 0) {
if (s.ok()) {
s = IOError(filename_, errno);
closed_ = true;
return s;
// Starts writeback of dirty data. With FLUSH_SYNC the caller waits for it:
// via sync_file_range() on Linux, otherwise via fsync().
// NOTE(review): the declaration of 'flags' and the #else/#endif lines are
// missing from this view.
virtual Status Flush(FlushMode mode) OVERRIDE {
TRACE_EVENT1("io", "PosixWritableFile::Flush", "path", filename_);
MAYBE_RETURN_EIO(filename_, IOError(Env::kInjectedFailureStatusMsg, EIO));
#if defined(__linux__)
if (mode == FLUSH_SYNC) {
if (sync_file_range(fd_, 0, 0, flags) < 0) {
return IOError(filename_, errno);
if (mode == FLUSH_SYNC && fsync(fd_) < 0) {
return IOError(filename_, errno);
return Status::OK();
// Syncs via DoSync() only if something was written or truncated since the
// last sync. A warning is logged if the sync is slow (threshold 1000,
// presumably milliseconds).
virtual Status Sync() OVERRIDE {
TRACE_EVENT1("io", "PosixWritableFile::Sync", "path", filename_);
LOG_SLOW_EXECUTION(WARNING, 1000, Substitute("sync call for $0", filename_)) {
if (pending_sync_) {
pending_sync_ = false;
RETURN_NOT_OK(DoSync(fd_, filename_));
return Status::OK();
// Logical size of the file (excludes unused preallocated space).
virtual uint64_t Size() const OVERRIDE {
return filesize_;
virtual const string& filename() const OVERRIDE { return filename_; }
size_t GetEncryptionHeaderSize() const override {
return encrypted_ ? kEncryptionHeaderSize : 0;
const string filename_;
const int fd_;
const bool sync_on_close_;
// Logical end of file; the offset at which the next append is written.
uint64_t filesize_;
// End of the region reserved by PreAllocate().
uint64_t pre_allocated_size_;
// True if data was written/truncated since the last successful Sync().
bool pending_sync_;
bool closed_;
const bool encrypted_;
const EncryptionHeader encryption_header_;
// RWFile backed by a raw POSIX descriptor supporting positional reads and
// writes, pre-allocation, truncation, hole punching and extent queries.
// When encrypted_ is set, the first GetEncryptionHeaderSize() bytes hold the
// encryption header, and reads/writes must start at or after that offset.
class PosixRWFile : public RWFile {
// NOTE(review): only filename_ and encryption_header_ appear in the visible
// initializer list; fd_, sync_on_close_, encrypted_, closed_ and is_on_xfs_
// are presumably initialized on lines not shown. Confirm.
PosixRWFile(string fname, int fd, bool sync_on_close, bool encrypted,
EncryptionHeader eh)
: filename_(std::move(fname)),
encryption_header_(eh) {}
// Closes the file if the owner did not; a failure is only logged.
~PosixRWFile() {
WARN_NOT_OK(Close(), "Failed to close " + filename_);
// Reads exactly result.size() bytes at 'offset' into 'result'.
virtual Status Read(uint64_t offset, Slice result) const OVERRIDE {
DCHECK_GE(offset, GetEncryptionHeaderSize());
return DoReadV(fd_, filename_, offset, ArrayView<Slice>(&result, 1),
encrypted_ ? &encryption_header_ : nullptr);
// Vectored read starting at 'offset'; the header is passed only when encrypted.
virtual Status ReadV(uint64_t offset, ArrayView<Slice> results) const OVERRIDE {
DCHECK_GE(offset, GetEncryptionHeaderSize());
return DoReadV(fd_, filename_, offset, results,
encrypted_ ? &encryption_header_ : nullptr);
// Single-slice write; delegates to WriteV().
virtual Status Write(uint64_t offset, const Slice& data) OVERRIDE {
return WriteV(offset, ArrayView<const Slice>(&data, 1));
// Vectored write starting at 'offset'; the header is passed only when encrypted.
virtual Status WriteV(uint64_t offset, ArrayView<const Slice> data) OVERRIDE {
DCHECK_GE(offset, GetEncryptionHeaderSize());
return DoWriteV(fd_, filename_, offset, data,
encrypted_ ? &encryption_header_ : nullptr);
// Reserves [offset, offset + length) with fallocate(). With
// DONT_CHANGE_FILE_SIZE the logical size is preserved (FALLOC_FL_KEEP_SIZE).
// EOPNOTSUPP and ENOSYS are logged once and not returned as errors; any
// other failure is an IOError.
virtual Status PreAllocate(uint64_t offset,
size_t length,
PreAllocateMode mode) OVERRIDE {
MAYBE_RETURN_EIO(filename_, IOError(Env::kInjectedFailureStatusMsg, EIO));
TRACE_EVENT1("io", "PosixRWFile::PreAllocate", "path", filename_);
int falloc_mode = 0;
if (mode == DONT_CHANGE_FILE_SIZE) {
falloc_mode = FALLOC_FL_KEEP_SIZE;
int ret;
RETRY_ON_EINTR(ret, fallocate(fd_, falloc_mode, offset, length));
if (ret != 0) {
if (errno == EOPNOTSUPP) {
KLOG_FIRST_N(WARNING, 1) << "The filesystem does not support fallocate().";
} else if (errno == ENOSYS) {
KLOG_FIRST_N(WARNING, 1) << "The kernel does not implement fallocate().";
} else {
return IOError(filename_, errno);
return Status::OK();
// Sets the file length to 'length' via ftruncate().
virtual Status Truncate(uint64_t length) OVERRIDE {
TRACE_EVENT2("io", "PosixRWFile::Truncate", "path", filename_, "length", length);
MAYBE_RETURN_EIO(filename_, IOError(Env::kInjectedFailureStatusMsg, EIO));
int ret;
RETRY_ON_EINTR(ret, ftruncate(fd_, length));
if (ret != 0) {
// errno is saved before Substitute()/ErrnoToString() can clobber it.
int err = errno;
// NOTE(review): this call ends with a trailing comma in the visible lines;
// its final argument (presumably 'err') is not shown.
return Status::IOError(Substitute("Unable to truncate file $0", filename_),
Substitute("ftruncate() failed: $0", ErrnoToString(err)),
return Status::OK();
// Deallocates [offset, offset + length) without changing the file size.
// Uses the XFS_IOC_UNRESVSP64 ioctl on xfs (when
// --env_use_ioctl_hole_punch_on_xfs is set) and fallocate() otherwise.
// Returns NotSupported on non-Linux platforms.
virtual Status PunchHole(uint64_t offset, size_t length) OVERRIDE {
#if defined(__linux__)
TRACE_EVENT1("io", "PosixRWFile::PunchHole", "path", filename_);
MAYBE_RETURN_EIO(filename_, IOError(Env::kInjectedFailureStatusMsg, EIO));
// KUDU-2052: xfs in el6 systems induces an fsync in the kernel whenever it
// performs a hole punch through the fallocate() syscall, even if the file
// range was already punched out. The older xfs-specific hole punching
// ioctl doesn't do this, despite eventually executing the same xfs code.
// To keep the code simple, we'll use this ioctl on any xfs system (not
// just on el6) and fallocate() on all other filesystems.
// Note: the cast to void* here (and back to PosixRWFile*, in InitIsOnXFS)
// is needed to avoid an undefined behavior warning from UBSAN.
// The xfs probe runs at most once per file (GoogleOnceDynamic).
once_.Init(&InitIsOnXFS, reinterpret_cast<void*>(this));
if (is_on_xfs_ && FLAGS_env_use_ioctl_hole_punch_on_xfs) {
xfs_flock64_t cmd;
memset(&cmd, 0, sizeof(cmd));
cmd.l_start = offset;
cmd.l_len = length;
if (ioctl(fd_, XFS_IOC_UNRESVSP64, &cmd) < 0) {
return IOError(filename_, errno);
} else {
int ret;
// NOTE(review): the fallocate() arguments between "fallocate(" and
// "offset, length" (descriptor and punch-hole mode) are not visible.
RETRY_ON_EINTR(ret, fallocate(
offset, length));
if (ret != 0) {
return IOError(filename_, errno);
return Status::OK();
// NOTE(review): the #else / #endif around this fallback are not visible.
return Status::NotSupported("Hole punching not supported on this platform");
// Flushes [offset, offset + length). On Linux this uses sync_file_range();
// the fsync() path is presumably the non-Linux (#else) branch.
virtual Status Flush(FlushMode mode, uint64_t offset, size_t length) OVERRIDE {
TRACE_EVENT1("io", "PosixRWFile::Flush", "path", filename_);
MAYBE_RETURN_EIO(filename_, IOError(Env::kInjectedFailureStatusMsg, EIO));
#if defined(__linux__)
// NOTE(review): `flags` is not declared in the visible lines; confirm.
if (mode == FLUSH_SYNC) {
if (sync_file_range(fd_, offset, length, flags) < 0) {
return IOError(filename_, errno);
if (mode == FLUSH_SYNC && fsync(fd_) < 0) {
return IOError(filename_, errno);
return Status::OK();
// Unconditionally syncs the file via DoSync(); syncs slower than 1000 ms
// are logged at WARNING.
virtual Status Sync() OVERRIDE {
TRACE_EVENT1("io", "PosixRWFile::Sync", "path", filename_);
LOG_SLOW_EXECUTION(WARNING, 1000, Substitute("sync call for $0", filename())) {
RETURN_NOT_OK(DoSync(fd_, filename_));
return Status::OK();
// Closes the file once (idempotent), syncing first if sync_on_close_ is set.
// A sync failure is logged, and close() is still attempted; the first error
// is returned.
virtual Status Close() OVERRIDE {
if (closed_) {
return Status::OK();
TRACE_EVENT1("io", "PosixRWFile::Close", "path", filename_);
MAYBE_RETURN_EIO(filename_, IOError(Env::kInjectedFailureStatusMsg, EIO));
Status s;
if (sync_on_close_) {
s = Sync();
if (!s.ok()) {
LOG(ERROR) << "Unable to Sync " << filename_ << ": " << s.ToString();
int ret;
RETRY_ON_EINTR(ret, close(fd_));
if (ret < 0) {
if (s.ok()) {
s = IOError(filename_, errno);
closed_ = true;
return s;
// Physical file size as reported by fstat() (st_size), header included.
virtual Status Size(uint64_t* size) const OVERRIDE {
TRACE_EVENT1("io", "PosixRWFile::Size", "path", filename_);
MAYBE_RETURN_EIO(filename_, IOError(Env::kInjectedFailureStatusMsg, EIO));
struct stat st;
if (fstat(fd_, &st) == -1) {
return IOError(filename_, errno);
*size = st.st_size;
return Status::OK();
// Collects the file's extents (logical offset -> length) with the
// FS_IOC_FIEMAP ioctl, fetching them in 4 KiB buffers until an extent
// flagged FIEMAP_EXTENT_LAST is seen. Linux only.
// NOTE(review): several lines are not visible here: the body of the
// "no extents" branch, loop-exit statements, and any assignment of
// 'extents' to '*out'. Confirm against the full file.
virtual Status GetExtentMap(ExtentMap* out) const OVERRIDE {
#if !defined(__linux__)
return Status::NotSupported("GetExtentMap not supported on this platform");
TRACE_EVENT1("io", "PosixRWFile::GetExtentMap", "path", filename_);
MAYBE_RETURN_EIO(filename_, IOError(Env::kInjectedFailureStatusMsg, EIO));
// This allocation size is arbitrary.
static const int kBufSize = 4096;
uint8_t buf[kBufSize] = { 0 };
struct fiemap* fm = reinterpret_cast<struct fiemap*>(buf);
struct fiemap_extent* fme = &fm->fm_extents[0];
// How many fiemap_extent records fit after the fiemap header in buf.
int avail_extents_in_buffer = (kBufSize - sizeof(*fm)) / sizeof(*fme);
bool saw_last_extent = false;
ExtentMap extents;
do {
// Fetch another block of extents.
fm->fm_length = FIEMAP_MAX_OFFSET;
fm->fm_extent_count = avail_extents_in_buffer;
if (ioctl(fd_, FS_IOC_FIEMAP, fm) == -1) {
return IOError(filename_, errno);
// No extents returned, this file must have no extents.
if (fm->fm_mapped_extents == 0) {
// Parse the extent block.
uint64_t last_extent_end_offset;
for (int i = 0; i < fm->fm_mapped_extents; i++) {
if (fme[i].fe_flags & FIEMAP_EXTENT_LAST) {
// This should really be the last extent.
CHECK_EQ(fm->fm_mapped_extents - 1, i);
saw_last_extent = true;
InsertOrDie(&extents, fme[i].fe_logical, fme[i].fe_length);
VLOG(3) << Substitute("File $0 extent $1: o $2, l $3 $4",
filename_, i,
fme[i].fe_logical, fme[i].fe_length,
saw_last_extent ? "(final)" : "");
last_extent_end_offset = fme[i].fe_logical + fme[i].fe_length;
if (saw_last_extent) {
// Resume the next FIEMAP query where this block ended.
fm->fm_start = last_extent_end_offset;
} while (!saw_last_extent);
return Status::OK();
bool IsEncrypted() const override {
return encrypted_;
virtual const string& filename() const OVERRIDE {
return filename_;
// Bytes reserved for the encryption header; zero when not encrypted.
size_t GetEncryptionHeaderSize() const override {
return encrypted_ ? kEncryptionHeaderSize : 0;
// GoogleOnceDynamic callback: records in is_on_xfs_ whether the file lives on
// xfs. 'arg' is the PosixRWFile passed (as void*) from PunchHole().
// If detection fails, the file is treated as not on xfs.
static void InitIsOnXFS(void* arg) {
PosixRWFile* rwf = reinterpret_cast<PosixRWFile*>(arg);
bool result;
Status s = DoIsOnXfsFilesystem(rwf->filename_, &result);
if (s.ok()) {
rwf->is_on_xfs_ = result;
} else {
// NOTE(review): the statement this Substitute() belongs to (presumably a
// warning log) and its final argument are not visible.
Substitute("Could not determine whether file is on xfs, assuming not: $0",
const string filename_;
const int fd_;
// Whether Close() syncs before closing.
const bool sync_on_close_;
// Guards the one-time xfs probe in PunchHole().
GoogleOnceDynamic once_;
// Result of the xfs probe; meaningful only after once_ has run.
bool is_on_xfs_;
bool closed_;
const bool encrypted_;
const EncryptionHeader encryption_header_;
// Acquires (lock == true) or releases (lock == false) an exclusive
// fcntl(F_SETLK) record lock spanning the whole of 'fd'. errno is reset
// before the call. Returns fcntl()'s result: 0 on success, -1 with errno
// set on failure.
int LockOrUnlock(int fd, bool lock) {
  errno = 0;
  struct flock lock_spec;
  memset(&lock_spec, 0, sizeof(lock_spec));
  lock_spec.l_type = lock ? F_WRLCK : F_UNLCK;
  lock_spec.l_whence = SEEK_SET;
  lock_spec.l_start = 0;
  lock_spec.l_len = 0;  // A zero length covers the entire file.
  int rc;
  RETRY_ON_EINTR(rc, fcntl(fd, F_SETLK, &lock_spec));
  return rc;
}
// FileLock handed out by PosixEnv::LockFile(); it records the descriptor on
// which the fcntl() lock is held.
// NOTE(review): the access specifier and closing brace are not visible;
// LockFile/UnlockFile access fd_ directly, so it is presumably public.
class PosixFileLock : public FileLock {
int fd_;
// The Env returned by Env::Default(); created by InitDefaultEnv() and
// compared against in ~PosixEnv().
static Env* default_env;
// POSIX implementation of Env.
class PosixEnv : public Env {
// Destroying the process-wide default Env is reported on stderr.
// NOTE(review): nothing beyond the message is visible here; confirm whether
// the process is meant to terminate afterwards.
~PosixEnv() {
if (this == default_env) {
fprintf(stderr, "Destroying Env::Default()\n");
// Convenience overload: opens 'fname' for sequential reading using
// value-initialized SequentialFileOptions.
virtual Status NewSequentialFile(const string& fname,
                                 unique_ptr<SequentialFile>* result) override {
  const SequentialFileOptions default_opts{};
  return NewSequentialFile(default_opts, fname, result);
}
// Opens 'fname' for sequential reading through a stdio stream.
// For sensitive files with encryption enabled, the encryption header is read
// through a separate, short-lived descriptor, and the stream is positioned
// just past the header.
// No descriptor or stream is leaked on any error path: the header fd is
// always closed, and the FILE* is closed unless ownership passes to the
// returned PosixSequentialFile.
virtual Status NewSequentialFile(const SequentialFileOptions& opts,
                                 const string& fname,
                                 unique_ptr<SequentialFile>* result) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::NewSequentialFile", "path", fname);
  MAYBE_RETURN_EIO(fname, IOError(Env::kInjectedFailureStatusMsg, EIO));
  FILE* f;
  POINTER_RETRY_ON_EINTR(f, fopen(fname.c_str(), "r"));
  if (f == nullptr) {
    return IOError(fname, errno);
  }
  // Close the stream on every early return; cancelled once the
  // PosixSequentialFile owns it.
  auto stream_closer = MakeScopedCleanup([f]() { fclose(f); });

  bool encrypted = opts.is_sensitive && IsEncryptionEnabled();
  EncryptionHeader header;
  if (encrypted) {
    int fd;
    RETURN_NOT_OK(DoOpen(fname, OpenMode::MUST_EXIST, &fd));
    // This descriptor is only needed to read the header.
    ScopedFdCloser fd_closer(fd);
    RETURN_NOT_OK(ReadEncryptionHeader(fd, fname, *server_key_, &header));
    if (fseek(f, kEncryptionHeaderSize, SEEK_CUR)) {
      return IOError(fname, errno);
    }
  }
  result->reset(new PosixSequentialFile(fname, encrypted, f, header));
  stream_closer.cancel();
  return Status::OK();
}
// Convenience overload: opens 'fname' for random-access reads using
// value-initialized RandomAccessFileOptions.
virtual Status NewRandomAccessFile(const string& fname,
                                   unique_ptr<RandomAccessFile>* result) OVERRIDE {
  const RandomAccessFileOptions default_opts{};
  return NewRandomAccessFile(default_opts, fname, result);
}
// Opens 'fname' read-only for positional reads. For sensitive files with
// encryption enabled, the encryption header is read up front.
// The descriptor is closed if reading the header fails; on success
// ownership passes to the returned PosixRandomAccessFile.
virtual Status NewRandomAccessFile(const RandomAccessFileOptions& opts,
                                   const string& fname,
                                   unique_ptr<RandomAccessFile>* result) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::NewRandomAccessFile", "path", fname);
  MAYBE_RETURN_EIO(fname, IOError(Env::kInjectedFailureStatusMsg, EIO));
  int fd;
  RETRY_ON_EINTR(fd, open(fname.c_str(), O_RDONLY));
  if (fd < 0) {
    return IOError(fname, errno);
  }
  auto fd_closer = MakeScopedCleanup([fd]() { close(fd); });

  EncryptionHeader header;
  bool encrypted = opts.is_sensitive && IsEncryptionEnabled();
  if (encrypted) {
    RETURN_NOT_OK(ReadEncryptionHeader(fd, fname, *server_key_, &header));
  }
  result->reset(new PosixRandomAccessFile(fname, fd,
                                          encrypted, header));
  fd_closer.cancel();
  return Status::OK();
}
// Convenience overload: opens 'fname' for writing using value-initialized
// WritableFileOptions.
virtual Status NewWritableFile(const string& fname,
                               unique_ptr<WritableFile>* result) OVERRIDE {
  const WritableFileOptions default_opts{};
  return NewWritableFile(default_opts, fname, result);
}
// Opens 'fname' according to opts.mode and wraps the resulting descriptor
// in a writable file via InstantiateNewWritableFile().
virtual Status NewWritableFile(const WritableFileOptions& opts,
                               const string& fname,
                               unique_ptr<WritableFile>* result) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::NewWritableFile", "path", fname);
  int opened_fd;
  RETURN_NOT_OK(DoOpen(fname, opts.mode, &opened_fd));
  return InstantiateNewWritableFile(fname, opened_fd, opts, result);
}
// Creates a uniquely-named file from 'name_template' (mkstemp-style) and
// wraps it for writing. On success '*created_filename' receives the
// generated path; on failure it is left untouched.
virtual Status NewTempWritableFile(const WritableFileOptions& opts,
                                   const string& name_template,
                                   string* created_filename,
                                   unique_ptr<WritableFile>* result) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::NewTempWritableFile", "template", name_template);
  int fd = 0;
  string tmp_filename;
  RETURN_NOT_OK(MkTmpFile(name_template, &fd, &tmp_filename));
  RETURN_NOT_OK(InstantiateNewWritableFile(tmp_filename, fd, opts, result));
  // Previously the generated name was never reported to the caller.
  created_filename->swap(tmp_filename);
  return Status::OK();
}
// Convenience overload: opens 'fname' for reading and writing using
// value-initialized RWFileOptions.
virtual Status NewRWFile(const string& fname,
                         unique_ptr<RWFile>* result) OVERRIDE {
  const RWFileOptions default_opts{};
  return NewRWFile(default_opts, fname, result);
}
// Opens 'fname' (per opts.mode) for positional reads and writes.
// For sensitive files with encryption enabled:
//   - a file at least kEncryptionHeaderSize bytes long has its header read;
//   - otherwise a new header is written.
// The descriptor is closed if header handling fails; on success ownership
// passes to the returned PosixRWFile.
virtual Status NewRWFile(const RWFileOptions& opts,
                         const string& fname,
                         unique_ptr<RWFile>* result) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::NewRWFile", "path", fname);
  int fd;
  bool encrypt = opts.is_sensitive && IsEncryptionEnabled();
  uint64_t size = 0;
  if (opts.mode == MUST_EXIST) {
    RETURN_NOT_OK(GetFileSize(fname, &size));
  } else if (encrypt) {
    // The file may not exist yet; in that case 'size' stays 0 and a fresh
    // header is written below, so a failure here is deliberately ignored.
    GetFileSize(fname, &size);
  }
  RETURN_NOT_OK(DoOpen(fname, opts.mode, &fd));
  auto fd_closer = MakeScopedCleanup([fd]() { close(fd); });

  EncryptionHeader eh;
  if (encrypt) {
    if (size >= kEncryptionHeaderSize) {
      RETURN_NOT_OK(ReadEncryptionHeader(fd, fname, *server_key_, &eh));
    } else {
      RETURN_NOT_OK(WriteEncryptionHeader(fd, fname, *server_key_, eh));
    }
  }
  result->reset(new PosixRWFile(fname, fd, opts.sync_on_close,
                                encrypt, eh));
  fd_closer.cancel();
  return Status::OK();
}
// Creates a uniquely-named file from 'name_template' and opens it for
// positional reads and writes; '*created_filename' receives the generated
// path. For sensitive files with encryption enabled, a fresh encryption
// header is written. The descriptor is closed if that write fails.
virtual Status NewTempRWFile(const RWFileOptions& opts, const string& name_template,
                             string* created_filename, unique_ptr<RWFile>* res) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::NewTempRWFile", "template", name_template);
  int fd = 0;
  RETURN_NOT_OK(MkTmpFile(name_template, &fd, created_filename));
  auto fd_closer = MakeScopedCleanup([fd]() { close(fd); });

  bool encrypt = opts.is_sensitive && IsEncryptionEnabled();
  EncryptionHeader eh;
  if (encrypt) {
    RETURN_NOT_OK(WriteEncryptionHeader(fd, *created_filename, *server_key_, eh));
  }
  res->reset(new PosixRWFile(*created_filename, fd, opts.sync_on_close,
                             encrypt, eh));
  fd_closer.cancel();
  return Status::OK();
}
// Creates a named pipe at 'fname' (mode 0666, subject to umask) and hands
// back a PosixFifo for it.
virtual Status NewFifo(const string& fname, unique_ptr<Fifo>* fifo) override {
  TRACE_EVENT1("io", "PosixEnv::NewFifo", "path", fname);
  if (mkfifo(fname.c_str(), 0666) == 0) {
    fifo->reset(new PosixFifo(fname));
    return Status::OK();
  }
  return IOError(Substitute("Error creating fifo $0", fname), errno);
}
// True when access(F_OK) succeeds for 'fname'.
virtual bool FileExists(const string& fname) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::FileExists", "path", fname);
  const int rc = access(fname.c_str(), F_OK);
  return rc == 0;
}
// Replaces '*result' with the names (relative to 'dir', including "." and
// "..") of every entry in directory 'dir'.
// The directory stream is always closed. A readdir() failure is reported as
// an IOError rather than being mistaken for end-of-directory.
virtual Status GetChildren(const string& dir, vector<string>* result) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::GetChildren", "path", dir);
  MAYBE_RETURN_EIO(dir, IOError(Env::kInjectedFailureStatusMsg, EIO));
  result->clear();
  DIR* d = opendir(dir.c_str());
  if (d == nullptr) {
    return IOError(dir, errno);
  }
  auto dir_closer = MakeScopedCleanup([d]() { closedir(d); });
  // readdir() (rather than the deprecated readdir_r()) is fine here: this
  // DIR stream is local to the call and never shared between threads.
  while (true) {
    // readdir() returns nullptr both at the end and on error; only errno
    // tells them apart, so it must be cleared before each call.
    errno = 0;
    struct dirent* entry = readdir(d);
    if (entry == nullptr) {
      if (errno != 0) {
        return IOError(dir, errno);
      }
      break;
    }
    result->push_back(entry->d_name);
  }
  return Status::OK();
}
// Removes the file 'fname' with unlink(); an IOError carries errno on failure.
virtual Status DeleteFile(const string& fname) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::DeleteFile", "path", fname);
  MAYBE_RETURN_EIO(fname, IOError(Env::kInjectedFailureStatusMsg, EIO));
  if (unlink(fname.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(fname, errno);
}
// Creates directory 'name' with mode 0777 (subject to umask). An existing
// directory is reported as an error (mkdir's EEXIST).
virtual Status CreateDir(const string& name) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::CreateDir", "path", name);
  MAYBE_RETURN_EIO(name, IOError(Env::kInjectedFailureStatusMsg, EIO));
  if (mkdir(name.c_str(), 0777) == 0) {
    return Status::OK();
  }
  return IOError(name, errno);
}
// Removes the (empty) directory 'name' with rmdir().
virtual Status DeleteDir(const string& name) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::DeleteDir", "path", name);
  MAYBE_RETURN_EIO(name, IOError(Env::kInjectedFailureStatusMsg, EIO));
  if (rmdir(name.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(name, errno);
}
// Stores the process's current working directory in '*cwd'.
// Previously '*cwd' was never written, and the EIO-injection check read it
// before assignment.
Status GetCurrentWorkingDir(string* cwd) const override {
  TRACE_EVENT0("io", "PosixEnv::GetCurrentWorkingDir");
  // getcwd(nullptr, 0) allocates a buffer of the required size (glibc/macOS).
  unique_ptr<char[], FreeDeleter> wd(getcwd(nullptr, 0));
  if (!wd) {
    return IOError("getcwd()", errno);
  }
  cwd->assign(wd.get());
  MAYBE_RETURN_EIO(*cwd, IOError(Env::kInjectedFailureStatusMsg, EIO));
  return Status::OK();
}
// Changes the process-wide working directory to 'dest' via chdir().
virtual Status ChangeDir(const string& dest) override {
  TRACE_EVENT1("io", "PosixEnv::ChangeDir", "dest", dest);
  MAYBE_RETURN_EIO(dest, IOError(Env::kInjectedFailureStatusMsg, EIO));
  if (chdir(dest.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(dest, errno);
}
// fsync()s the directory 'dirname' so that entry creations, renames and
// removals inside it become durable. This is a no-op under --never_fsync.
virtual Status SyncDir(const string& dirname) OVERRIDE {
  TRACE_EVENT1("io", "SyncDir", "path", dirname);
  MAYBE_RETURN_EIO(dirname, IOError(Env::kInjectedFailureStatusMsg, EIO));
  if (FLAGS_never_fsync) {
    return Status::OK();
  }
  int fd;
  RETRY_ON_EINTR(fd, open(dirname.c_str(), O_DIRECTORY|O_RDONLY));
  if (fd < 0) {
    return IOError(dirname, errno);
  }
  ScopedFdCloser closer(fd);
  if (fsync(fd) == 0) {
    return Status::OK();
  }
  return IOError(dirname, errno);
}
// Deletes 'name' and everything beneath it.
// The walk is POST_ORDER so each directory is emptied before it is removed.
// The visible call was missing Walk()'s root and order arguments.
virtual Status DeleteRecursively(const string &name) OVERRIDE {
  return Walk(
      name, POST_ORDER,
      [this](FileType type, const string& dirname, const string& basename) {
        return this->DeleteRecursivelyCb(type, dirname, basename);
      });
}
// Stores the logical size (st_size) of 'fname' in '*size'. '*size' is left
// untouched when stat() fails.
virtual Status GetFileSize(const string& fname, uint64_t* size) override {
  TRACE_EVENT1("io", "PosixEnv::GetFileSize", "path", fname);
  MAYBE_RETURN_EIO(fname, IOError(Env::kInjectedFailureStatusMsg, EIO));
  struct stat st;
  if (stat(fname.c_str(), &st) != 0) {
    return IOError(fname, errno);
  }
  *size = st.st_size;
  return Status::OK();
}
// Stores the number of bytes actually allocated to 'fname' in '*size'.
// st_blocks counts 512-byte units regardless of the filesystem block size,
// and can be less than st_size / 512 when the file has holes.
virtual Status GetFileSizeOnDisk(const string& fname, uint64_t* size) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::GetFileSizeOnDisk", "path", fname);
  MAYBE_RETURN_EIO(fname, IOError(Env::kInjectedFailureStatusMsg, EIO));
  struct stat st;
  if (stat(fname.c_str(), &st) != 0) {
    return IOError(fname, errno);
  }
  *size = st.st_blocks * 512;
  return Status::OK();
}
// Sums the on-disk size of every regular file beneath 'root' into
// '*bytes_used'. Directories themselves are not counted.
// '*bytes_used' is written only if the whole walk succeeds.
// The visible code had lost the Walk() call head.
virtual Status GetFileSizeOnDiskRecursively(const string& root,
                                            uint64_t* bytes_used) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::GetFileSizeOnDiskRecursively", "path", root);
  uint64_t total = 0;
  RETURN_NOT_OK(Walk(
      root, PRE_ORDER,
      [this, &total](FileType type, const string& dirname, const string& basename) {
        return this->GetFileSizeOnDiskRecursivelyCb(&total, type, dirname, basename);
      }));
  *bytes_used = total;
  return Status::OK();
}
// Stores the preferred I/O block size (st_blksize) of 'fname' in '*block_size'.
virtual Status GetBlockSize(const string& fname, uint64_t* block_size) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::GetBlockSize", "path", fname);
  MAYBE_RETURN_EIO(fname, IOError(Env::kInjectedFailureStatusMsg, EIO));
  struct stat st;
  if (stat(fname.c_str(), &st) != 0) {
    return IOError(fname, errno);
  }
  *block_size = st.st_blksize;
  return Status::OK();
}
// Stores the last-modification time of 'fname' in '*timestamp', in
// microseconds since the Unix epoch.
// The #else/#endif restore the platform split (each platform names the
// field differently). Seconds are widened to 64 bits before scaling so a
// 32-bit time_t cannot overflow.
virtual Status GetFileModifiedTime(const string& fname, int64_t* timestamp) override {
  TRACE_EVENT1("io", "PosixEnv::GetFileModifiedTime", "fname", fname);
  MAYBE_RETURN_EIO(fname, IOError(Env::kInjectedFailureStatusMsg, EIO));
  struct stat s;
  if (stat(fname.c_str(), &s) != 0) {
    return IOError(fname, errno);
  }
#ifdef __APPLE__
  *timestamp = static_cast<int64_t>(s.st_mtimespec.tv_sec) * 1000000 +
               s.st_mtimespec.tv_nsec / 1000;
#else
  *timestamp = static_cast<int64_t>(s.st_mtim.tv_sec) * 1000000 +
               s.st_mtim.tv_nsec / 1000;
#endif
  return Status::OK();
}
// Local convenience wrapper around statvfs(): it retries on EINTR and turns
// a failure into an IOError that names 'path'.
static Status StatVfs(const string& path, struct statvfs* buf) {
  MAYBE_RETURN_EIO(path, IOError(Env::kInjectedFailureStatusMsg, EIO));
  int rc;
  RETRY_ON_EINTR(rc, statvfs(path.c_str(), buf));
  if (rc != -1) {
    return Status::OK();
  }
  return IOError(Substitute("statvfs: $0", path), errno);
}
// Reports the capacity, the bytes available to unprivileged users
// (f_bavail), and the filesystem id of the filesystem holding 'path'.
virtual Status GetSpaceInfo(const string& path, SpaceInfo* space_info) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::GetSpaceInfo", "path", path);
  struct statvfs vfs;
  RETURN_NOT_OK(StatVfs(path, &vfs));
  const auto fragment_size = vfs.f_frsize;
  space_info->capacity_bytes = fragment_size * vfs.f_blocks;
  space_info->free_bytes = fragment_size * vfs.f_bavail;
  space_info->filesystem_id = vfs.f_fsid;
  return Status::OK();
}
// Renames 'src' to 'target' with rename(), replacing 'target' if it exists.
// Failures are reported against 'src'.
virtual Status RenameFile(const string& src, const string& target) OVERRIDE {
  TRACE_EVENT2("io", "PosixEnv::RenameFile", "src", src, "dst", target);
  MAYBE_RETURN_EIO(src, IOError(Env::kInjectedFailureStatusMsg, EIO));
  MAYBE_RETURN_EIO(target, IOError(Env::kInjectedFailureStatusMsg, EIO));
  if (rename(src.c_str(), target.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(src, errno);
}
// Opens (creating if needed) 'fname' and takes an exclusive fcntl() lock on
// it without blocking. On success '*lock' owns the descriptor and must be
// released with UnlockFile(); on failure '*lock' is nullptr.
// Previously the descriptor leaked when the lock could not be taken.
virtual Status LockFile(const string& fname, FileLock** lock) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::LockFile", "path", fname);
  MAYBE_RETURN_EIO(fname, IOError(Env::kInjectedFailureStatusMsg, EIO));
  if (ShouldInject(fname, FLAGS_env_inject_lock_failure_globs)) {
    return IOError("lock " + fname, EAGAIN);
  }
  *lock = nullptr;
  Status result;
  int fd;
  RETRY_ON_EINTR(fd, open(fname.c_str(), O_RDWR | O_CREAT, 0666));
  if (fd < 0) {
    result = IOError(fname, errno);
  } else if (LockOrUnlock(fd, true) == -1) {
    // Save errno before close() can overwrite it; nothing else would ever
    // close this descriptor.
    const int lock_errno = errno;
    close(fd);
    result = IOError("lock " + fname, lock_errno);
  } else {
    auto my_lock = new PosixFileLock;
    my_lock->fd_ = fd;
    *lock = my_lock;
  }
  return result;
}
// Releases a lock obtained from LockFile(), closes its descriptor and frees
// the lock object, whether or not the unlock itself succeeded.
// Previously the descriptor was never closed, leaking one fd per lock.
virtual Status UnlockFile(FileLock* lock) OVERRIDE {
  TRACE_EVENT0("io", "PosixEnv::UnlockFile");
  // FileLock -> PosixFileLock is a checked-at-compile-time downcast.
  unique_ptr<PosixFileLock> my_lock(static_cast<PosixFileLock*>(lock));
  Status result;
  if (LockOrUnlock(my_lock->fd_, false) == -1) {
    result = IOError("unlock", errno);
  }
  close(my_lock->fd_);
  return result;
}
// Returns the canonical path of the directory tests should use:
// $TEST_TMPDIR when set and non-empty, otherwise /tmp/kudutest-<euid>.
// The directory is created if missing. Previously only a comment remained
// where the creation call belongs.
virtual Status GetTestDirectory(string* result) OVERRIDE {
  string dir;
  const char* env = getenv("TEST_TMPDIR");
  if (env && env[0] != '\0') {
    dir = env;
  } else {
    char buf[100];
    snprintf(buf, sizeof(buf), "/tmp/kudutest-%d", static_cast<int>(geteuid()));
    dir = buf;
  }
  // Directory may already exist, so the result is deliberately ignored; a
  // directory that truly could not be created makes Canonicalize() fail.
  ignore_result(CreateDir(dir));
  // /tmp may be a symlink, so canonicalize the path.
  return Canonicalize(dir, result);
}
// Platform-independent thread ID. pthread_self() is unsuitable because it
// returns an opaque ID that can't be compared via normal means.
// Each caller is assigned a number from the shared counter
// cur_thread_local_id_ the first time it asks; the value is cached in
// thread_local_id (presumably a thread-local declared elsewhere).
virtual uint64_t gettid() OVERRIDE {
  if (thread_local_id != 0) {
    return thread_local_id;
  }
  thread_local_id = Barrier_AtomicIncrement(&cur_thread_local_id_, 1);
  return thread_local_id;
}
// Wall-clock time from gettimeofday(), in microseconds since the Unix epoch.
virtual uint64_t NowMicros() OVERRIDE {
  struct timeval now;
  gettimeofday(&now, nullptr);
  const uint64_t whole_seconds_us = static_cast<uint64_t>(now.tv_sec) * 1000000;
  return whole_seconds_us + now.tv_usec;
}
// Blocks the calling thread for at least 'micros' microseconds.
// The visible method had no body and therefore did not sleep.
virtual void SleepForMicroseconds(int micros) OVERRIDE {
  SleepFor(MonoDelta::FromMicroseconds(micros));
}
// Stores the path of the running executable in '*path'. The source is
// /proc/self/exe on Linux and _NSGetExecutablePath() on macOS. The buffer
// starts at 64 bytes and grows until the whole path fits.
// The visible loop had no exit; the retry/exit control flow and the
// #else/#endif are restored here.
virtual Status GetExecutablePath(string* path) OVERRIDE {
  MAYBE_RETURN_EIO("/proc/self/exe", IOError(Env::kInjectedFailureStatusMsg, EIO));
  uint32_t size = 64;
  uint32_t len = 0;
  while (true) {
    unique_ptr<char[]> buf(new char[size]);
#if defined(__linux__)
    int rc = readlink("/proc/self/exe", buf.get(), size);
    if (rc == -1) {
      return IOError("Unable to determine own executable path", errno);
    }
    // readlink() silently truncates, so a result that fills the whole
    // buffer may be incomplete.
    if (static_cast<uint32_t>(rc) >= size) {
      // The buffer wasn't large enough
      size *= 2;
      continue;
    }
    len = rc;
#elif defined(__APPLE__)
    if (_NSGetExecutablePath(buf.get(), &size) != 0) {
      // The buffer wasn't large enough; 'size' has been updated.
      continue;
    }
    len = strlen(buf.get());
#else
#error Unsupported platform
#endif
    path->assign(buf.get(), len);
    return Status::OK();
  }
}
// Sets '*is_dir' according to whether 'path' is a directory. stat() follows
// symlinks.
virtual Status IsDirectory(const string& path, bool* is_dir) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::IsDirectory", "path", path);
  MAYBE_RETURN_EIO(path, IOError(Env::kInjectedFailureStatusMsg, EIO));
  struct stat st;
  if (stat(path.c_str(), &st) != 0) {
    return IOError(path, errno);
  }
  *is_dir = S_ISDIR(st.st_mode);
  return Status::OK();
}
// Walks the tree rooted at 'root' with fts(3) without following symlinks or
// crossing filesystems. 'cb' is invoked for every non-directory entry, and
// for each directory either before (PRE_ORDER) or after (POST_ORDER) its
// contents. Inaccessible entries and callback failures are logged, the walk
// continues, and an IOError is returned at the end.
// Fixes relative to the visible code:
//   - restores the fts_open() call head and the switch's break/default;
//   - logs each inaccessible entry once instead of twice;
//   - uses the thread-safe ErrnoToString() instead of strerror().
virtual Status Walk(const string& root, DirectoryOrder order, const WalkCallback& cb) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::Walk", "path", root);
  MAYBE_RETURN_EIO(root, IOError(Env::kInjectedFailureStatusMsg, EIO));
  // Some sanity checks
  CHECK_NE(root, "/");
  CHECK_NE(root, "./");
  CHECK_NE(root, ".");
  CHECK_NE(root, "");

  // FTS requires a non-const copy of the name. strdup it and free() when
  // we leave scope.
  unique_ptr<char[], FreeDeleter> name_dup(strdup(root.c_str()));
  char* paths[] = { name_dup.get(), nullptr };

  // FTS_NOCHDIR is important here to make this thread-safe.
  FTS* ret;
  POINTER_RETRY_ON_EINTR(ret, fts_open(
      paths, FTS_PHYSICAL | FTS_XDEV | FTS_NOCHDIR, nullptr));
  if (ret == nullptr) {
    return IOError(root, errno);
  }
  unique_ptr<FTS, FtsCloser> tree(ret);

  FTSENT* ent = nullptr;
  bool had_errors = false;
  while ((ent = fts_read(tree.get())) != nullptr) {
    bool do_cb = false;
    FileType type = DIRECTORY_TYPE;
    switch (ent->fts_info) {
      case FTS_D:        // Directory in pre-order
        if (order == PRE_ORDER) {
          do_cb = true;
        }
        break;
      case FTS_DP:       // Directory in post-order
        if (order == POST_ORDER) {
          do_cb = true;
        }
        break;
      case FTS_F:        // A regular file
      case FTS_SL:       // A symbolic link
      case FTS_SLNONE:   // A broken symbolic link
      case FTS_DEFAULT:  // Unknown type of file
        do_cb = true;
        type = FILE_TYPE;
        break;
      case FTS_DNR:
      case FTS_ERR:
      case FTS_NS:
        LOG(WARNING) << Substitute("Unable to access file $0 during walk: $1",
                                   ent->fts_path, ErrnoToString(ent->fts_errno));
        had_errors = true;
        break;
      default:
        LOG(WARNING) << Substitute("Unable to access file $0 during walk (code $1)",
                                   ent->fts_path, ent->fts_info);
        break;
    }
    if (do_cb) {
      Status s = cb(type, DirName(ent->fts_path), ent->fts_name);
      if (PREDICT_FALSE(!s.ok())) {
        LOG(WARNING) << Substitute("Error running callback with file $0 during walk: $1",
                                   ent->fts_path, s.ToString());
        had_errors = true;
      }
    }
  }
  if (had_errors) {
    return Status::IOError(root, "One or more errors occurred");
  }
  return Status::OK();
}
// Expands 'path_pattern' with glob(3) and appends every match to '*paths'.
// Tilde expansion is enabled, and the expansion aborts on read errors.
// Having no matches is not an error.
// The visible code never appended the matches and had lost the arguments of
// the error message; both are restored.
Status Glob(const string& path_pattern, vector<string>* paths) override {
  TRACE_EVENT1("io", "PosixEnv::Glob", "path_pattern", path_pattern);
  // Zero-initialized so globfree() is safe even if glob() bails out early.
  glob_t result{};
  auto cleanup = MakeScopedCleanup([&] { globfree(&result); });
  errno = 0;
  int ret = glob(path_pattern.c_str(), GLOB_TILDE | GLOB_ERR, nullptr, &result);
  switch (ret) {
    case 0: break;
    case GLOB_NOMATCH: return Status::OK();
    case GLOB_NOSPACE: return Status::RuntimeError("glob out of memory");
    default: {
      string err = (errno != 0) ? ErrnoToString(errno) : "unknown error";
      return Status::IOError(Substitute("glob failed for $0: $1",
                                        path_pattern, err));
    }
  }
  paths->reserve(paths->size() + result.gl_pathc);
  for (size_t i = 0; i < result.gl_pathc; ++i) {
    paths->emplace_back(result.gl_pathv[i]);
  }
  return Status::OK();
}
// Resolves 'path' to an absolute path with all symlinks, "." and ".."
// components removed (realpath(3)). The path must exist.
virtual Status Canonicalize(const string& path, string* result) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::Canonicalize", "path", path);
  MAYBE_RETURN_EIO(path, IOError(Env::kInjectedFailureStatusMsg, EIO));
  unique_ptr<char[], FreeDeleter> resolved(realpath(path.c_str(), nullptr));
  if (resolved) {
    result->assign(resolved.get());
    return Status::OK();
  }
  return IOError(Substitute("Unable to canonicalize $0", path), errno);
}
// Stores the total physical RAM of the host, in bytes, in '*ram'.
// On Linux, sysinfo(2) reports memory sizes in units of 'mem_unit' bytes.
// mem_unit is not guaranteed to be 1, so it must be applied; the previous
// code ignored it. The #else/#endif between the platform branches are
// restored.
virtual Status GetTotalRAMBytes(int64_t* ram) OVERRIDE {
#if defined(__APPLE__)
  int mib[2];
  size_t length = sizeof(*ram);
  // Get the Physical memory size
  mib[0] = CTL_HW;
  mib[1] = HW_MEMSIZE;
  CHECK_ERR(sysctl(mib, 2, ram, &length, nullptr, 0)) << "sysctl CTL_HW HW_MEMSIZE failed";
#else
  struct sysinfo info;
  if (sysinfo(&info) < 0) {
    return IOError("sysinfo() failed", errno);
  }
  *ram = static_cast<int64_t>(info.totalram) * info.mem_unit;
#endif
  return Status::OK();
}
// Returns the current (soft) limit for resource 't'. RLIM_INFINITY is
// passed through unchanged.
virtual uint64_t GetResourceLimit(ResourceLimitType t) OVERRIDE {
  static_assert(std::is_unsigned<rlim_t>::value, "rlim_t must be unsigned");
  static_assert(RLIM_INFINITY > 0, "RLIM_INFINITY must be positive");
  // getrlimit() on a valid resource has no reason to fail.
  const int resource = ResourceLimitTypeToUnixRlimit(t);
  struct rlimit limits;
  PCHECK(getrlimit(resource, &limits) == 0);
  return limits.rlim_cur;
}
// Raises the soft limit for resource 't' up to its hard limit, logging the
// change because it is process-wide.
// The #endif closing the macOS-only section is restored. Without it, the
// common raising logic below appeared to be compiled only on macOS.
virtual void IncreaseResourceLimit(ResourceLimitType t) OVERRIDE {
  // There's no reason for this to ever fail; any process should have
  // sufficient privilege to increase its soft limit up to the hard limit.
  // This change is logged because it is process-wide.
  int rlimit_type = ResourceLimitTypeToUnixRlimit(t);
  struct rlimit l;
  PCHECK(getrlimit(rlimit_type, &l) == 0);
#if defined(__APPLE__)
  // OS X 10.11 can return RLIM_INFINITY from getrlimit, but allows rlim_cur and
  // rlim_max to be raised only as high as the value of the maxfilesperproc
  // kernel variable. Empirically, this value is 10240 across all tested macOS
  // versions. Testing on OS X 10.10 and macOS 10.12 revealed that getrlimit
  // returns the true limits (not RLIM_INFINITY), rlim_max can *not* be raised
  // (when running as non-root), and rlim_cur can only be raised as high as
  // rlim_max (this is consistent with Linux).
  // TLDR; OS X 10.11 is whack.
  if (l.rlim_max == RLIM_INFINITY) {
    uint32_t limit;
    size_t len = sizeof(limit);
    PCHECK(sysctlbyname(ResourceLimitTypeToMacosRlimit(t), &limit, &len,
                        nullptr, 0) == 0);
    // Make sure no uninitialized bits are present in the result.
    DCHECK_EQ(sizeof(limit), len);
    l.rlim_max = limit;
  }
#endif
  const char* rlimit_str = ResourceLimitTypeToString(t);
  if (l.rlim_cur < l.rlim_max) {
    LOG(INFO) << Substitute("Raising this process' $0 limit from $1 to $2",
                            rlimit_str, l.rlim_cur, l.rlim_max);
    l.rlim_cur = l.rlim_max;
    PCHECK(setrlimit(rlimit_type, &l) == 0);
  } else {
    LOG(INFO) << Substitute("Not raising this process' $0 limit of $1; it "
                            "is already as high as it can go", rlimit_str, l.rlim_cur);
  }
}
// Sets '*result' to whether 'path' lives on an ext filesystem. The check
// uses statfs()'s f_type against EXT4_SUPER_MAGIC, which ext2/3/4 share.
// On macOS the answer is always false.
// The #else/#endif separating the platform branches are restored.
virtual Status IsOnExtFilesystem(const string& path, bool* result) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::IsOnExtFilesystem", "path", path);
  MAYBE_RETURN_EIO(path, IOError(Env::kInjectedFailureStatusMsg, EIO));
#ifdef __APPLE__
  *result = false;
#else
  struct statfs buf;
  int ret;
  RETRY_ON_EINTR(ret, statfs(path.c_str(), &buf));
  if (ret == -1) {
    return IOError(Substitute("statfs: $0", path), errno);
  }
  *result = (buf.f_type == EXT4_SUPER_MAGIC);
#endif
  return Status::OK();
}
// Sets '*result' to whether 'path' lives on an xfs filesystem; the check is
// delegated to DoIsOnXfsFilesystem().
virtual Status IsOnXfsFilesystem(const string& path, bool* result) OVERRIDE {
  TRACE_EVENT1("io", "PosixEnv::IsOnXfsFilesystem", "path", path);
  MAYBE_RETURN_EIO(path, IOError(Env::kInjectedFailureStatusMsg, EIO));
  Status probe = DoIsOnXfsFilesystem(path, result);
  return probe;
}
// Kernel release string as reported by uname(2), e.g. "5.15.0-...".
virtual string GetKernelRelease() OVERRIDE {
  struct utsname uts;
  PCHECK(uname(&uts) == 0);  // No realistic failure mode.
  return uts.release;
}
// If 'path' grants any permission bit that the parsed umask (g_parsed_umask)
// would strip, logs a warning and chmod()s the path to the narrowed
// permissions. g_parsed_umask must already be initialized (checked).
Status EnsureFileModeAdheresToUmask(const string& path) override {
  MAYBE_RETURN_EIO(path, IOError(Env::kInjectedFailureStatusMsg, EIO));
  struct stat st;
  if (stat(path.c_str(), &st) != 0) {
    return IOError("stat", errno);
  }
  CHECK_NE(g_parsed_umask, -1);
  if (!(st.st_mode & g_parsed_umask)) {
    return Status::OK();
  }
  const uint32_t current_perms = st.st_mode & ACCESSPERMS;
  const uint32_t narrowed_perms = current_perms & ~g_parsed_umask;
  LOG(WARNING) << "Path " << path << " has permissions "
               << StringPrintf("%03o", current_perms)
               << " which are less restrictive than current umask value "
               << StringPrintf("%03o", g_parsed_umask)
               << ": resetting permissions to "
               << StringPrintf("%03o", narrowed_perms);
  if (chmod(path.c_str(), narrowed_perms) != 0) {
    return IOError("chmod", errno);
  }
  return Status::OK();
}
// Sets '*result' to whether 'path' has the "other" read bit (S_IROTH) set.
Status IsFileWorldReadable(const string& path, bool* result) override {
  TRACE_EVENT1("io", "PosixEnv::IsFileWorldReadable", "path", path);
  MAYBE_RETURN_EIO(path, IOError(Env::kInjectedFailureStatusMsg, EIO));
  struct stat st;
  if (stat(path.c_str(), &st) != 0) {
    return IOError("stat", errno);
  }
  const bool other_can_read = (st.st_mode & S_IROTH) != 0;
  *result = other_can_read;
  return Status::OK();
}
// Creates a symbolic link at 'dst' pointing to 'src'. Failures are reported
// against 'dst'.
virtual Status CreateSymLink(const string& src, const string& dst) override {
  TRACE_EVENT2("io", "PosixEnv::CreateSymLink", "src", src, "dst", dst);
  MAYBE_RETURN_EIO(dst, IOError(Env::kInjectedFailureStatusMsg, EIO));
  if (symlink(src.c_str(), dst.c_str()) == 0) {
    return Status::OK();
  }
  return IOError(dst, errno);
}
// Encryption at rest is controlled process-wide by --encrypt_data_at_rest.
bool IsEncryptionEnabled() const override {
  return FLAGS_encrypt_data_at_rest;
}
// Installs the server key used for encrypted files.
// 'key_size' is in bits (128, 192 or 256) and selects the matching AES
// variant; key_size / 8 bytes are copied from 'server_key'. Any other size
// is fatal.
// The visible switch had no break/default, so every size fell through into
// LOG(FATAL); both are restored.
void SetEncryptionKey(const uint8_t* server_key, size_t key_size) override {
  EncryptionHeader eh;
  switch (key_size) {
    case 128:
      eh.algorithm = EncryptionAlgorithm::AES128ECB;
      break;
    case 192:
      eh.algorithm = EncryptionAlgorithm::AES192ECB;
      break;
    case 256:
      eh.algorithm = EncryptionAlgorithm::AES256ECB;
      break;
    default:
      LOG(FATAL) << "Illegal key size: " << key_size;
  }
  memcpy(eh.key, server_key, key_size / 8);
  server_key_ = eh;
}
// unique_ptr deleter that releases an FTS handle with fts_close(). It
// retries on EINTR and logs (rather than propagates) a failure.
struct FtsCloser {
  void operator()(FTS *fts) const {
    if (!fts) {
      return;
    }
    int rc;
    RETRY_ON_EINTR(rc, fts_close(fts));
    if (PREDICT_FALSE(rc != 0)) {
      PLOG(WARNING) << "Failed to close fts";
    }
  }
};
// Creates and opens a unique file from 'name_template', whose trailing
// "XXXXXX" is replaced by mkstemp(3). On success '*fd' is the open
// descriptor and '*created_filename' the generated path.
// mkstemp() creates the file 0600; if the configured umask allows more, the
// file is widened to 0666 & ~umask so that temp files match regular ones.
// The visible IOError() call was missing its errno argument.
Status MkTmpFile(const string& name_template, int* fd, string* created_filename) {
  // mkstemp() rewrites its argument in place, so it needs a mutable,
  // NUL-terminated copy of the template.
  unique_ptr<char[]> fname(new char[name_template.size() + 1]);
  ::snprintf(fname.get(), name_template.size() + 1, "%s", name_template.c_str());
  MAYBE_RETURN_EIO(fname.get(), IOError(Env::kInjectedFailureStatusMsg, EIO));
  int created_fd = mkstemp(fname.get());
  if (created_fd < 0) {
    return IOError(Substitute("Call to mkstemp() failed on name template $0", name_template),
                   errno);
  }
  // mkstemp defaults to making files with permissions 0600. But, if the
  // user configured a more permissive umask, then we ensure that the
  // resulting file gets the desired (wider) permissions.
  uint32_t new_perms = 0666 & ~g_parsed_umask;
  if (new_perms != 0600) {
    CHECK_ERR(fchmod(created_fd, new_perms));
  }
  *fd = created_fd;
  *created_filename = fname.get();
  return Status::OK();
}
// Wraps the already-open descriptor 'fd' for 'fname' in a PosixWritableFile
// whose initial size is the current file size.
// For sensitive files with encryption enabled:
//   - a file shorter than the encryption header gets a fresh header written;
//   - otherwise the existing header is read.
// Takes ownership of 'fd': it is closed if this function fails. Neither
// caller closes it on error, so it previously leaked.
// The visible MUST_EXIST branch had lost its body; the GetFileSize() status
// is checked there again.
Status InstantiateNewWritableFile(const string& fname,
                                  int fd,
                                  const WritableFileOptions& opts,
                                  unique_ptr<WritableFile>* result) {
  auto fd_closer = MakeScopedCleanup([fd]() { close(fd); });
  uint64_t file_size = 0;
  Status s = GetFileSize(fname, &file_size);
  // Only a file required to pre-exist must be stat-able; otherwise a failure
  // leaves 'file_size' at 0.
  if (opts.mode == MUST_EXIST) {
    RETURN_NOT_OK(s);
  }
  bool encrypt = opts.is_sensitive && IsEncryptionEnabled();
  EncryptionHeader eh;
  if (encrypt) {
    if (file_size < kEncryptionHeaderSize) {
      RETURN_NOT_OK(WriteEncryptionHeader(fd, fname, *server_key_, eh));
      file_size = kEncryptionHeaderSize;
    } else {
      RETURN_NOT_OK(ReadEncryptionHeader(fd, fname, *server_key_, &eh));
    }
  }
  result->reset(new PosixWritableFile(fname, fd, file_size, opts.sync_on_close,
                                      encrypt, eh));
  fd_closer.cancel();
  return Status::OK();
}
// Walk() callback for DeleteRecursively(): deletes the file or (already
// emptied) directory at dirname/basename. Failures are logged and returned.
// The visible switch had lost its case labels, making every entry take the
// file branch.
Status DeleteRecursivelyCb(FileType type, const string& dirname, const string& basename) {
  string full_path = JoinPathSegments(dirname, basename);
  Status s;
  switch (type) {
    case FILE_TYPE:
      s = DeleteFile(full_path);
      WARN_NOT_OK(s, "Could not delete file");
      return s;
    case DIRECTORY_TYPE:
      s = DeleteDir(full_path);
      WARN_NOT_OK(s, "Could not delete directory");
      return s;
    default:
      LOG(FATAL) << "Unknown file type: " << type;
  }
  return Status::OK();
}
// Header size new sensitive files would carry: kEncryptionHeaderSize when
// encryption is enabled, zero otherwise.
size_t GetEncryptionHeaderSize() const override {
  if (!IsEncryptionEnabled()) {
    return 0;
  }
  return kEncryptionHeaderSize;
}
// Walk() callback for GetFileSizeOnDiskRecursively(): adds the on-disk size
// of each file to '*bytes_used' and ignores directories.
// Restores the GetFileSizeOnDisk() call head and the case labels lost from
// the visible code.
Status GetFileSizeOnDiskRecursivelyCb(uint64_t* bytes_used,
                                      FileType type,
                                      const string& dirname,
                                      const string& basename) {
  uint64_t file_bytes_used = 0;
  switch (type) {
    case Env::FILE_TYPE:
      RETURN_NOT_OK(GetFileSizeOnDisk(
          JoinPathSegments(dirname, basename), &file_bytes_used));
      *bytes_used += file_bytes_used;
      break;
    case Env::DIRECTORY_TYPE:
      // Ignore directory space consumption as it varies from filesystem to
      // filesystem.
      break;
    default:
      LOG(FATAL) << "Unknown file type: " << type;
  }
  return Status::OK();
}
// Algorithm and key material installed by SetEncryptionKey(). It is
// dereferenced without a check (*server_key_) wherever a sensitive file is
// opened with encryption enabled, so it must be set before then.
std::optional<EncryptionHeader> server_key_;
} // namespace
static pthread_once_t once = PTHREAD_ONCE_INIT;
static void InitDefaultEnv() { default_env = new PosixEnv; }
Env* Env::Default() {
pthread_once(&once, InitDefaultEnv);
return default_env;
unique_ptr<Env> Env::NewEnv() {
return unique_ptr<Env>(new PosixEnv());
// Streams the human-readable name of resource limit type 't'.
std::ostream& operator<<(std::ostream& o, Env::ResourceLimitType t) {
  const char* name = ResourceLimitTypeToString(t);
  o << name;
  return o;
}
} // namespace kudu