blob: 677a9af66174891563e571d15f053b082043cbe7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/fs/file_block_manager.h"
#include <deque>
#include <string>
#include <unordered_set>
#include <vector>
#include "kudu/fs/block_manager_metrics.h"
#include "kudu/fs/block_manager_util.h"
#include "kudu/fs/fs.pb.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/atomic.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
#include "kudu/util/malloc.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
#include "kudu/util/oid_generator.h"
#include "kudu/util/path_util.h"
#include "kudu/util/random_util.h"
#include "kudu/util/status.h"
using kudu::env_util::ScopedFileDeleter;
using std::shared_ptr;
using std::string;
using std::unordered_set;
using std::vector;
using strings::Substitute;
DECLARE_bool(enable_data_block_fsync);
DECLARE_bool(block_manager_lock_dirs);
namespace kudu {
namespace fs {
namespace internal {
////////////////////////////////////////////////////////////
// FileBlockLocation
////////////////////////////////////////////////////////////
// Logical location of a block in the file block manager.
//
// A block ID uniquely locates a block. Every ID is a uint64_t, broken down
// into multiple logical components:
// 1. Bytes 0 (MSB) and 1 identify the block's root path by path set index. See
// fs.proto for more details on path sets.
// 2. Bytes 2-7 (LSB) uniquely identify the block within the root path. As more
// and more blocks are created in a root path, the likelihood of a collision
// becomes greater. In the event of a collision, the block manager will
// retry (see CreateBlock()).
//
// A FileBlockLocation abstracts away these details so that clients need not
// worry about them. It is constructed via FromParts() or FromBlockId() and is
// copyable and assignable.
class FileBlockLocation {
 public:
  // Builds an empty location (empty root path, default-constructed block ID).
  FileBlockLocation() {}

  // Builds the location of 'block_id' inside 'root_path'. The top two bytes
  // of the resulting ID come from 'root_path_idx'; only the bottom six bytes
  // of 'block_id' are kept.
  static FileBlockLocation FromParts(const string& root_path,
                                     uint16_t root_path_idx,
                                     const BlockId& block_id);

  // Builds a location from a complete block ID, which is stored unmodified.
  static FileBlockLocation FromBlockId(const string& root_path,
                                       const BlockId& block_id);

  // Extracts the root path index (the top two bytes) from 'block_id'.
  static uint16_t GetRootPathIdx(const BlockId& block_id) {
    return static_cast<uint16_t>(block_id.id() >> 48);
  }

  // Returns the block's full filesystem path:
  //   <root path>/<byte 2>/<byte 3>/<byte 4>/<block ID>
  string GetFullPath() const;

  // Creates whichever of the three subdirectory levels are missing.
  //
  // For every level that had to be created, the *parent* of the new
  // directory is appended to 'created_dirs' (deepest first). Levels that
  // already existed contribute nothing.
  Status CreateBlockDir(Env* env, vector<string>* created_dirs);

  // Appends every directory above the block file to 'parent_dirs'.
  //
  // The directories are appended in "fsync order", i.e. from the deepest
  // subdirectory up to the root path, which is the order in which they
  // should be fsynced to make them durable.
  void GetAllParentDirs(vector<string>* parent_dirs) const;

  // Plain accessors.
  const string& root_path() const { return root_path_; }
  const BlockId& block_id() const { return block_id_; }

 private:
  FileBlockLocation(string root_path, BlockId block_id)
      : root_path_(std::move(root_path)), block_id_(std::move(block_id)) {}

  // Renders, as two lowercase hex digits, the byte of the block ID that
  // starts 'shift' bits above the least significant bit.
  string HexByteAt(int shift) const {
    return StringPrintf("%02llx", (block_id_.id() >> shift) & 0xFFULL);
  }

  // Per-byte accessors; each one names a subdirectory level in which blocks
  // are grouped.
  string byte2() const { return HexByteAt(40); }
  string byte3() const { return HexByteAt(32); }
  string byte4() const { return HexByteAt(24); }

  string root_path_;
  BlockId block_id_;
};
FileBlockLocation FileBlockLocation::FromParts(const string& root_path,
                                               uint16_t root_path_idx,
                                               const BlockId& block_id) {
  // Resulting ID layout: [ root_path_idx (16 bits) | low 48 bits of block_id ].
  // Whatever 'block_id' carried in its top 16 bits is discarded.
  const uint64_t low48_mask = (1ULL << 48) - 1;
  const uint64_t path_bits = static_cast<uint64_t>(root_path_idx) << 48;
  const uint64_t block_bits = block_id.id() & low48_mask;
  return FileBlockLocation(root_path, BlockId(path_bits | block_bits));
}
FileBlockLocation FileBlockLocation::FromBlockId(const string& root_path,
                                                 const BlockId& block_id) {
  // The ID is taken as-is; no bits are rewritten.
  FileBlockLocation location(root_path, block_id);
  return location;
}
string FileBlockLocation::GetFullPath() const {
  // <root>/<byte2>/<byte3>/<byte4>/<block id>
  const string level0 = JoinPathSegments(root_path_, byte2());
  const string level1 = JoinPathSegments(level0, byte3());
  const string level2 = JoinPathSegments(level1, byte4());
  return JoinPathSegments(level2, block_id_.ToString());
}
Status FileBlockLocation::CreateBlockDir(Env* env,
                                         vector<string>* created_dirs) {
  DCHECK(env->FileExists(root_path_));

  // Create the three nested levels top-down, bailing out on the first error.
  bool made_l0 = false;
  const string dir_l0 = JoinPathSegments(root_path_, byte2());
  RETURN_NOT_OK(env_util::CreateDirIfMissing(env, dir_l0, &made_l0));

  bool made_l1 = false;
  const string dir_l1 = JoinPathSegments(dir_l0, byte3());
  RETURN_NOT_OK(env_util::CreateDirIfMissing(env, dir_l1, &made_l1));

  bool made_l2 = false;
  const string dir_l2 = JoinPathSegments(dir_l1, byte4());
  RETURN_NOT_OK(env_util::CreateDirIfMissing(env, dir_l2, &made_l2));

  // For each level that was newly created, report its parent (the directory
  // whose entry list changed), deepest level first.
  if (made_l2) created_dirs->push_back(dir_l1);
  if (made_l1) created_dirs->push_back(dir_l0);
  if (made_l0) created_dirs->push_back(root_path_);

  return Status::OK();
}
void FileBlockLocation::GetAllParentDirs(vector<string>* parent_dirs) const {
  const string top = JoinPathSegments(root_path_, byte2());
  const string mid = JoinPathSegments(top, byte3());
  const string leaf = JoinPathSegments(mid, byte4());

  // Emit deepest-first: this is the order in which the directories should be
  // synchronized to disk.
  for (const string* dir : {&leaf, &mid, &top, &root_path_}) {
    parent_dirs->push_back(*dir);
  }
}
////////////////////////////////////////////////////////////
// FileWritableBlock
////////////////////////////////////////////////////////////
// A file-backed block that has been opened for writing.
//
// Contains a pointer to the block manager as well as a FileBlockLocation
// so that dirty metadata can be synced via FileBlockManager::SyncMetadata()
// at Close() time. Embedding a FileBlockLocation (and not a simpler
// BlockId) consumes more memory, but the number of outstanding
// FileWritableBlock instances is expected to be low.
class FileWritableBlock : public WritableBlock {
public:
// Wraps the already-opened 'writer' for the block at 'location'. The block
// starts in the CLEAN state with zero bytes appended; if the block manager
// has metrics, the writable-block counters are incremented.
FileWritableBlock(FileBlockManager* block_manager, FileBlockLocation location,
shared_ptr<WritableFile> writer);
// Aborts the block if it has not reached the CLOSED state.
virtual ~FileWritableBlock();
// Closes the block, synchronizing data and metadata first (Close(SYNC)).
virtual Status Close() OVERRIDE;
// Closes the block without synchronizing, then asks the block manager to
// delete it.
virtual Status Abort() OVERRIDE;
// Returns the owning block manager.
virtual BlockManager* block_manager() const OVERRIDE;
// Returns the block ID stored in this block's location.
virtual const BlockId& id() const OVERRIDE;
// Appends 'data' to the underlying file; on success the block becomes DIRTY.
virtual Status Append(const Slice& data) OVERRIDE;
// Starts an asynchronous flush if the block is DIRTY, then moves the block
// to the FLUSHING state.
virtual Status FlushDataAsync() OVERRIDE;
// Returns the number of bytes successfully appended so far.
virtual size_t BytesAppended() const OVERRIDE;
// Returns the block's current state.
virtual State state() const OVERRIDE;
private:
// Whether Close(SyncMode) should synchronize before closing the file.
enum SyncMode {
SYNC,
NO_SYNC
};
// Close the block, optionally synchronizing dirty data and metadata.
Status Close(SyncMode mode);
// Back pointer to the block manager.
//
// Should remain alive for the lifetime of this block.
FileBlockManager* block_manager_;
// The block's location.
const FileBlockLocation location_;
// The underlying opened file backing this block.
shared_ptr<WritableFile> writer_;
// Current state of the block; set to CLEAN by the constructor and to CLOSED
// by Close(SyncMode).
State state_;
// The number of bytes successfully appended to the block.
size_t bytes_appended_;
DISALLOW_COPY_AND_ASSIGN(FileWritableBlock);
};
FileWritableBlock::FileWritableBlock(FileBlockManager* block_manager,
                                     FileBlockLocation location,
                                     shared_ptr<WritableFile> writer)
    : block_manager_(block_manager),
      location_(std::move(location)),
      writer_(std::move(writer)),
      state_(CLEAN),
      bytes_appended_(0) {
  // Metrics are optional; nothing else to do when the manager has none.
  if (!block_manager_->metrics_) {
    return;
  }
  block_manager_->metrics_->blocks_open_writing->Increment();
  block_manager_->metrics_->total_writable_blocks->Increment();
}
FileWritableBlock::~FileWritableBlock() {
  // A block that was already closed needs no cleanup.
  if (state_ == CLOSED) {
    return;
  }
  // Anything still open at destruction time is aborted; a failure is only
  // logged.
  WARN_NOT_OK(Abort(), Substitute("Failed to close block $0",
                                  id().ToString()));
}
// Public close: closes the block in SYNC mode, so Close(SyncMode) attempts to
// synchronize data and metadata before closing the file.
Status FileWritableBlock::Close() {
return Close(SYNC);
}
Status FileWritableBlock::Abort() {
  // No point in syncing data that is about to be deleted.
  const Status close_status = Close(NO_SYNC);
  if (!close_status.ok()) {
    // The block is not deleted when the close fails.
    return close_status;
  }
  return block_manager()->DeleteBlock(id());
}
// Returns the block manager passed to the constructor (not owned).
BlockManager* FileWritableBlock::block_manager() const {
return block_manager_;
}
// Returns the block ID held by this block's FileBlockLocation.
const BlockId& FileWritableBlock::id() const {
return location_.block_id();
}
Status FileWritableBlock::Append(const Slice& data) {
  DCHECK(state_ == CLEAN || state_ == DIRTY)
      << "Invalid state: " << state_;

  // Hand the bytes to the underlying file; state and counters are only
  // touched once the write has been accepted.
  const Status append_status = writer_->Append(data);
  if (!append_status.ok()) {
    return append_status;
  }
  bytes_appended_ += data.size();
  state_ = DIRTY;
  return Status::OK();
}
Status FileWritableBlock::FlushDataAsync() {
  DCHECK(state_ == CLEAN || state_ == DIRTY || state_ == FLUSHING)
      << "Invalid state: " << state_;

  // Only a DIRTY block has anything to flush; CLEAN and FLUSHING blocks just
  // fall through to the state change below.
  const bool has_unflushed_data = (state_ == DIRTY);
  if (has_unflushed_data) {
    VLOG(3) << "Flushing block " << id();
    const Status flush_status = writer_->Flush(WritableFile::FLUSH_ASYNC);
    if (!flush_status.ok()) {
      return flush_status;
    }
  }

  state_ = FLUSHING;
  return Status::OK();
}
// Returns the running total of bytes accepted by Append().
size_t FileWritableBlock::BytesAppended() const {
return bytes_appended_;
}
// Returns the block's current state.
WritableBlock::State FileWritableBlock::state() const {
return state_;
}
Status FileWritableBlock::Close(SyncMode mode) {
  // Closing twice is a no-op.
  if (state_ == CLOSED) {
    return Status::OK();
  }

  Status sync_status;
  const bool syncable_state =
      state_ == CLEAN || state_ == DIRTY || state_ == FLUSHING;
  if (mode == SYNC && syncable_state) {
    // Safer to synchronize data first, then metadata.
    VLOG(3) << "Syncing block " << id();
    if (FLAGS_enable_data_block_fsync) {
      sync_status = writer_->Sync();
    }
    if (sync_status.ok()) {
      sync_status = block_manager_->SyncMetadata(location_);
    }
    WARN_NOT_OK(sync_status, Substitute("Failed to sync when closing block $0",
                                        id().ToString()));
  }

  // The file is closed and the block marked CLOSED regardless of how the
  // sync went.
  const Status close_status = writer_->Close();
  state_ = CLOSED;
  writer_.reset();

  if (block_manager_->metrics_) {
    block_manager_->metrics_->blocks_open_writing->Decrement();
    block_manager_->metrics_->total_bytes_written->IncrementBy(BytesAppended());
  }

  // Prefer the result of Close() to that of Sync().
  if (!close_status.ok()) {
    return close_status;
  }
  return sync_status;
}
////////////////////////////////////////////////////////////
// FileReadableBlock
////////////////////////////////////////////////////////////
// A file-backed block that has been opened for reading.
//
// There may be millions of instances of FileReadableBlock outstanding, so
// great care must be taken to reduce its size. To that end, it does _not_
// embed a FileBlockLocation, using the simpler BlockId instead.
class FileReadableBlock : public ReadableBlock {
public:
// Wraps the already-opened 'reader' for the block identified by 'block_id'.
// If the block manager has metrics, the readable-block counters are
// incremented.
FileReadableBlock(const FileBlockManager* block_manager, BlockId block_id,
shared_ptr<RandomAccessFile> reader);
// Closes the block if it is still open.
virtual ~FileReadableBlock();
// Releases the reader. Only the first call has any effect; always returns OK.
virtual Status Close() OVERRIDE;
// Returns the block's identifier.
virtual const BlockId& id() const OVERRIDE;
// Stores the size reported by the underlying file in '*sz'.
virtual Status Size(uint64_t* sz) const OVERRIDE;
// Reads exactly 'length' bytes starting at 'offset' into 'result', using
// 'scratch' as backing storage.
virtual Status Read(uint64_t offset, size_t length,
Slice* result, uint8_t* scratch) const OVERRIDE;
// Returns the usable allocation size of this object plus the reader's own
// memory footprint.
virtual size_t memory_footprint() const OVERRIDE;
private:
// Back pointer to the owning block manager.
const FileBlockManager* block_manager_;
// The block's identifier.
const BlockId block_id_;
// The underlying opened file backing this block.
shared_ptr<RandomAccessFile> reader_;
// Whether or not this block has been closed. Close() is thread-safe, so
// this must be an atomic primitive.
AtomicBool closed_;
DISALLOW_COPY_AND_ASSIGN(FileReadableBlock);
};
FileReadableBlock::FileReadableBlock(const FileBlockManager* block_manager,
                                     BlockId block_id,
                                     shared_ptr<RandomAccessFile> reader)
    : block_manager_(block_manager),
      block_id_(std::move(block_id)),
      reader_(std::move(reader)),
      closed_(false) {
  // Metrics are optional; nothing else to do when the manager has none.
  if (!block_manager_->metrics_) {
    return;
  }
  block_manager_->metrics_->blocks_open_reading->Increment();
  block_manager_->metrics_->total_readable_blocks->Increment();
}
FileReadableBlock::~FileReadableBlock() {
  // Close() is a no-op if the block was closed explicitly beforehand.
  const Status close_status = Close();
  WARN_NOT_OK(close_status, Substitute("Failed to close block $0",
                                       id().ToString()));
}
Status FileReadableBlock::Close() {
  // Only the caller that flips 'closed_' from false to true does the actual
  // teardown; every other caller returns immediately.
  const bool first_close = closed_.CompareAndSet(false, true);
  if (!first_close) {
    return Status::OK();
  }

  reader_.reset();
  if (block_manager_->metrics_) {
    block_manager_->metrics_->blocks_open_reading->Decrement();
  }
  return Status::OK();
}
// Returns the block ID this block was constructed with.
const BlockId& FileReadableBlock::id() const {
return block_id_;
}
// Stores the size reported by the underlying file in '*sz'.
//
// Must not be called after Close(); that precondition is only enforced by a
// DCHECK, and Close() resets 'reader_'.
Status FileReadableBlock::Size(uint64_t* sz) const {
DCHECK(!closed_.Load());
return reader_->Size(sz);
}
Status FileReadableBlock::Read(uint64_t offset, size_t length,
                               Slice* result, uint8_t* scratch) const {
  DCHECK(!closed_.Load());

  const Status read_status =
      env_util::ReadFully(reader_.get(), offset, length, result, scratch);
  if (!read_status.ok()) {
    return read_status;
  }

  // Bytes are only counted for reads that succeeded.
  if (block_manager_->metrics_) {
    block_manager_->metrics_->total_bytes_read->IncrementBy(length);
  }
  return Status::OK();
}
size_t FileReadableBlock::memory_footprint() const {
  DCHECK(reader_);
  // This object's own allocation plus whatever the reader reports for itself.
  const size_t self_bytes = kudu_malloc_usable_size(this);
  const size_t reader_bytes = reader_->memory_footprint();
  return self_bytes + reader_bytes;
}
} // namespace internal
////////////////////////////////////////////////////////////
// FileBlockManager
////////////////////////////////////////////////////////////
// Block manager type name passed to every PathInstanceMetadataFile built in
// this file.
static const char* const kBlockManagerType = "file";
// Upper bound on the number of root paths: 2^16 - 1.
static const int kMaxPaths = 0xFFFF;
// Synchronizes to disk every parent directory of 'location' that is currently
// marked dirty.
//
// The dirty marks are claimed (erased from 'dirty_dirs_') under the lock and
// the actual SyncDir() calls are made without holding it. If a SyncDir() call
// fails, the directory that failed and all directories not yet attempted are
// put back into 'dirty_dirs_' so that a later call tries them again instead of
// treating them as already synced; the failing status is returned.
//
// When FLAGS_enable_data_block_fsync is off, the dirty marks are still
// cleared but nothing is synced.
Status FileBlockManager::SyncMetadata(const internal::FileBlockLocation& location) {
  vector<string> parent_dirs;
  location.GetAllParentDirs(&parent_dirs);

  // Figure out what directories to sync. 'parent_dirs' is already in fsync
  // order, and that order is preserved in 'to_sync'.
  vector<string> to_sync;
  {
    lock_guard<simple_spinlock> l(&lock_);
    for (const string& parent_dir : parent_dirs) {
      if (dirty_dirs_.erase(parent_dir)) {
        to_sync.push_back(parent_dir);
      }
    }
  }

  if (!FLAGS_enable_data_block_fsync) {
    return Status::OK();
  }

  // Sync them.
  for (size_t i = 0; i < to_sync.size(); ++i) {
    const Status sync_status = env_->SyncDir(to_sync[i]);
    if (!sync_status.ok()) {
      // Restore the dirty mark of everything that was not synced, including
      // the directory that just failed.
      lock_guard<simple_spinlock> l(&lock_);
      for (size_t j = i; j < to_sync.size(); ++j) {
        dirty_dirs_.insert(to_sync[j]);
      }
      return sync_status;
    }
  }
  return Status::OK();
}
bool FileBlockManager::FindBlockPath(const BlockId& block_id,
string* path) const {
PathInstanceMetadataFile* metadata_file = FindPtrOrNull(
root_paths_by_idx_, internal::FileBlockLocation::GetRootPathIdx(block_id));
if (metadata_file) {
*path = internal::FileBlockLocation::FromBlockId(
metadata_file->path(), block_id).GetFullPath();
}
return metadata_file != nullptr;
}
FileBlockManager::FileBlockManager(Env* env, const BlockManagerOptions& opts)
    : env_(DCHECK_NOTNULL(env)),
      read_only_(opts.read_only),
      root_paths_(opts.root_paths),
      rand_(GetRandomSeed32()),
      mem_tracker_(MemTracker::CreateTracker(-1,
                                             "file_block_manager",
                                             opts.parent_mem_tracker)) {
  DCHECK_GT(root_paths_.size(), 0);

  // Metrics are only created when the caller supplied a metric entity.
  if (!opts.metric_entity) {
    return;
  }
  metrics_.reset(new internal::BlockManagerMetrics(opts.metric_entity));
}
// Deletes the PathInstanceMetadataFile objects held as values of
// 'root_paths_by_idx_' and unregisters this manager's memory tracker from its
// parent.
FileBlockManager::~FileBlockManager() {
STLDeleteValues(&root_paths_by_idx_);
mem_tracker_->UnregisterFromParent();
}
Status FileBlockManager::Create() {
  CHECK(!read_only_);

  // Cleanup guards for everything created below. New guards go to the front,
  // so the most recently created path is at the head of the queue. They are
  // all cancelled once the whole operation has succeeded.
  std::deque<ScopedFileDeleter*> delete_on_failure;
  ElementDeleter d(&delete_on_failure);

  if (root_paths_.size() > kMaxPaths) {
    return Status::NotSupported(
        Substitute("File block manager supports a maximum of $0 paths", kMaxPaths));
  }

  // One fresh UUID per root path. Every instance file records its own UUID
  // as well as the complete list.
  ObjectIdGenerator oid_generator;
  vector<string> all_uuids(root_paths_.size());
  for (string& uuid : all_uuids) {
    uuid = oid_generator.Next();
  }

  // Ensure the data paths exist and create the instance files.
  unordered_set<string> to_sync;
  int uuid_idx = 0;
  for (const string& root_path : root_paths_) {
    bool created;
    RETURN_NOT_OK_PREPEND(env_util::CreateDirIfMissing(env_, root_path, &created),
                          Substitute("Could not create directory $0", root_path));
    if (created) {
      delete_on_failure.push_front(new ScopedFileDeleter(env_, root_path));
      // A new directory entry lives in the parent, so the parent is what
      // gets synced.
      to_sync.insert(DirName(root_path));
    }

    const string instance_filename = JoinPathSegments(
        root_path, kInstanceMetadataFileName);
    PathInstanceMetadataFile metadata(env_, kBlockManagerType,
                                      instance_filename);
    RETURN_NOT_OK_PREPEND(metadata.Create(all_uuids[uuid_idx], all_uuids),
                          Substitute("Could not create $0", instance_filename));
    delete_on_failure.push_front(new ScopedFileDeleter(env_, instance_filename));
    ++uuid_idx;
  }

  // Ensure newly created directories are synchronized to disk.
  if (FLAGS_enable_data_block_fsync) {
    for (const string& dir : to_sync) {
      RETURN_NOT_OK_PREPEND(env_->SyncDir(dir),
                            Substitute("Unable to synchronize directory $0", dir));
    }
  }

  // Success: don't delete any files.
  for (ScopedFileDeleter* guard : delete_on_failure) {
    guard->Cancel();
  }
  return Status::OK();
}
// Opens an existing file block manager.
//
// For every configured root path this loads the instance metadata file and,
// if FLAGS_block_manager_lock_dirs is set, locks it. The loaded instances are
// then verified as a set and indexed by the position of each instance's UUID
// within its path set. On success 'root_paths_by_idx_' holds the instances and
// 'next_root_path_' points at the first one.
//
// Returns NotFound if a root path does not exist, and otherwise the (prefixed)
// error from loading, locking or verifying the instance files.
Status FileBlockManager::Open() {
// Instances loaded so far. 'deleter' is tied to this vector so that early
// returns do not leak them (presumably it deletes the remaining elements when
// it goes out of scope).
vector<PathInstanceMetadataFile*> instances;
ElementDeleter deleter(&instances);
instances.reserve(root_paths_.size());
for (const string& root_path : root_paths_) {
if (!env_->FileExists(root_path)) {
return Status::NotFound(Substitute(
"FileBlockManager at $0 not found", root_path));
}
string instance_filename = JoinPathSegments(
root_path, kInstanceMetadataFileName);
gscoped_ptr<PathInstanceMetadataFile> metadata(
new PathInstanceMetadataFile(env_, kBlockManagerType,
instance_filename));
RETURN_NOT_OK_PREPEND(metadata->LoadFromDisk(),
Substitute("Could not open $0", instance_filename));
if (FLAGS_block_manager_lock_dirs) {
Status s = metadata->Lock();
if (!s.ok()) {
Status new_status = s.CloneAndPrepend(Substitute(
"Could not lock $0", instance_filename));
if (read_only_) {
// Not fatal in read-only mode.
LOG(WARNING) << new_status.ToString();
LOG(WARNING) << "Proceeding without lock";
} else {
return new_status;
}
}
}
// Ownership moves from the scoped pointer to 'instances'.
instances.push_back(metadata.release());
}
RETURN_NOT_OK_PREPEND(PathInstanceMetadataFile::CheckIntegrity(instances),
Substitute("Could not verify integrity of files: $0",
JoinStrings(root_paths_, ",")));
// Index each instance by the position of its own UUID in 'all_uuids'.
PathMap instances_by_idx;
for (PathInstanceMetadataFile* instance : instances) {
const PathSetPB& path_set = instance->metadata()->path_set();
// NOTE(review): 'idx' is unsigned, so the -1 sentinel is stored as
// UINT32_MAX. If the UUID were ever missing in a build where DCHECK_NE is
// a no-op, the 'idx > kMaxPaths' test below would report it as
// NotSupported ("maximum of N paths") — confirm that is acceptable.
uint32_t idx = -1;
for (int i = 0; i < path_set.all_uuids_size(); i++) {
if (path_set.uuid() == path_set.all_uuids(i)) {
idx = i;
break;
}
}
DCHECK_NE(idx, -1); // Guaranteed by CheckIntegrity().
if (idx > kMaxPaths) {
return Status::NotSupported(
Substitute("File block manager supports a maximum of $0 paths", kMaxPaths));
}
InsertOrDie(&instances_by_idx, idx, instance);
}
// Every instance is now referenced from 'instances_by_idx'; emptying the
// vector keeps 'deleter' from touching them.
instances.clear();
instances_by_idx.swap(root_paths_by_idx_);
// Start CreateBlock()'s round-robin placement at the first root path.
next_root_path_ = root_paths_by_idx_.begin();
return Status::OK();
}
Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
                                     gscoped_ptr<WritableBlock>* block) {
  CHECK(!read_only_);

  // Pick a root path using a simple round-robin block placement strategy:
  // take the current entry, then advance the cursor, wrapping at the end.
  uint16_t root_path_idx;
  string root_path;
  {
    lock_guard<simple_spinlock> l(&lock_);
    root_path_idx = next_root_path_->first;
    root_path = next_root_path_->second->path();
    ++next_root_path_;
    if (next_root_path_ == root_paths_by_idx_.end()) {
      next_root_path_ = root_paths_by_idx_.begin();
    }
  }

  string path;
  vector<string> created_dirs;
  Status s;
  internal::FileBlockLocation location;
  shared_ptr<WritableFile> writer;

  // Keep drawing random IDs until the file can be created; a collision with
  // an existing block (unlikely) shows up as AlreadyPresent.
  for (;;) {
    created_dirs.clear();

    // Make sure we don't accidentally create a location using the magic
    // invalid ID value.
    BlockId id;
    do {
      id.SetId(rand_.Next64());
    } while (id.IsNull());

    location = internal::FileBlockLocation::FromParts(
        root_path, root_path_idx, id);
    path = location.GetFullPath();
    RETURN_NOT_OK_PREPEND(location.CreateBlockDir(env_, &created_dirs), path);

    WritableFileOptions wr_opts;
    wr_opts.mode = Env::CREATE_NON_EXISTING;
    s = env_util::OpenFileForWrite(wr_opts, env_, path, &writer);
    if (PREDICT_FALSE(s.IsAlreadyPresent())) {
      continue;
    }
    break;
  }

  if (!s.ok()) {
    return s;
  }

  VLOG(1) << "Creating new block " << location.block_id().ToString() << " at " << path;
  {
    // Update dirty_dirs_ with those provided as well as the block's
    // directory, which may not have been created but is definitely dirty
    // (because we added a file to it).
    lock_guard<simple_spinlock> l(&lock_);
    for (const string& created : created_dirs) {
      dirty_dirs_.insert(created);
    }
    dirty_dirs_.insert(DirName(path));
  }
  block->reset(new internal::FileWritableBlock(this, location, writer));
  return s;
}
// Convenience overload: creates a block using default-constructed
// CreateBlockOptions.
Status FileBlockManager::CreateBlock(gscoped_ptr<WritableBlock>* block) {
return CreateBlock(CreateBlockOptions(), block);
}
Status FileBlockManager::OpenBlock(const BlockId& block_id,
                                   gscoped_ptr<ReadableBlock>* block) {
  // Map the ID to a path; this fails when the ID's root path index is not
  // one of the open root paths.
  string path;
  const bool found = FindBlockPath(block_id, &path);
  if (!found) {
    return Status::NotFound(
        Substitute("Block $0 not found", block_id.ToString()));
  }

  VLOG(1) << "Opening block with id " << block_id.ToString() << " at " << path;

  shared_ptr<RandomAccessFile> reader;
  const Status open_status = env_util::OpenFileForRandom(env_, path, &reader);
  if (!open_status.ok()) {
    return open_status;
  }

  block->reset(new internal::FileReadableBlock(this, block_id, reader));
  return Status::OK();
}
Status FileBlockManager::DeleteBlock(const BlockId& block_id) {
  CHECK(!read_only_);

  string path;
  const bool found = FindBlockPath(block_id, &path);
  if (!found) {
    return Status::NotFound(
        Substitute("Block $0 not found", block_id.ToString()));
  }

  // We don't bother fsyncing the parent directory as there's nothing to be
  // gained by ensuring that the deletion is made durable. Even if we did
  // fsync it, we'd need to account for garbage at startup time (in the
  // event that we crashed just before the fsync), and with such accounting
  // fsync-as-you-delete is unnecessary.
  //
  // The block's directory hierarchy is left behind. We could prune it if
  // it's empty, but that's racy and leaving it isn't much overhead.
  return env_->DeleteFile(path);
}
Status FileBlockManager::CloseBlocks(const vector<WritableBlock*>& blocks) {
  VLOG(3) << "Closing " << blocks.size() << " blocks";

  // Optional first pass: ask the kernel to begin writing out each block's
  // dirty data. Doing this up-front gives the kernel opportunities to
  // coalesce contiguous dirty pages.
  if (FLAGS_block_coalesce_close) {
    for (size_t i = 0; i < blocks.size(); ++i) {
      const Status flush_status = blocks[i]->FlushDataAsync();
      if (!flush_status.ok()) {
        return flush_status;
      }
    }
  }

  // Second pass: close each block, waiting for each to become durable. The
  // first failure stops the loop; later blocks are left as they are.
  for (size_t i = 0; i < blocks.size(); ++i) {
    const Status close_status = blocks[i]->Close();
    if (!close_status.ok()) {
      return close_status;
    }
  }
  return Status::OK();
}
} // namespace fs
} // namespace kudu