blob: a1ab8140f12c6ff641c9d11b73a1e695d7002d87 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/util/file_cache.h"
#include <atomic>
#include <cstdint>
#include <cstring>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/array_view.h"
#include "kudu/util/cache.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/env.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/locks.h"
#include "kudu/util/metrics.h" // IWYU pragma: keep
#include "kudu/util/monotime.h"
#include "kudu/util/once.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/thread.h"
DEFINE_int32(file_cache_expiry_period_ms, 60 * 1000,
"Period of time (in ms) between removing expired file cache descriptors");
TAG_FLAG(file_cache_expiry_period_ms, advanced);
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace {
template <class FileType>
FileType* CacheValueToFileType(Slice s) {
return reinterpret_cast<FileType*>(*reinterpret_cast<void**>(
s.mutable_data()));
}
template <class FileType>
class EvictionCallback : public Cache::EvictionCallback {
public:
EvictionCallback() {}
void EvictedEntry(Slice key, Slice value) override {
VLOG(2) << "Evicted fd belonging to " << key.ToString();
delete CacheValueToFileType<FileType>(value);
}
private:
DISALLOW_COPY_AND_ASSIGN(EvictionCallback);
};
} // anonymous namespace
namespace internal {
template <class FileType>
class ScopedOpenedDescriptor;
// Encapsulates common descriptor fields and methods.
template <class FileType>
class BaseDescriptor {
public:
BaseDescriptor(FileCache<FileType>* file_cache,
string filename)
: file_cache_(file_cache),
file_name_(std::move(filename)) {}
~BaseDescriptor() {
VLOG(2) << "Out of scope descriptor with file name: " << filename();
// The (now expired) weak_ptr remains in 'descriptors_', to be removed by
// the next call to RunDescriptorExpiry(). Removing it here would risk a
// deadlock on recursive acquisition of 'lock_'.
if (deleted()) {
cache()->Erase(filename());
VLOG(1) << "Deleting file: " << filename();
WARN_NOT_OK(env()->DeleteFile(filename()), "");
}
}
// Insert a pointer to an open file object into the file cache with the
// filename as the cache key.
//
// Returns a handle to the inserted entry. The handle always contains an open
// file.
ScopedOpenedDescriptor<FileType> InsertIntoCache(void* file_ptr) const {
// The allocated charge is always one byte. This is incorrect with respect
// to memory tracking, but it's necessary if the cache capacity is to be
// equivalent to the max number of fds.
Cache::PendingHandle* pending = CHECK_NOTNULL(cache()->Allocate(
filename(), sizeof(file_ptr), 1));
memcpy(cache()->MutableValue(pending),
&file_ptr,
sizeof(file_ptr));
return ScopedOpenedDescriptor<FileType>(this, Cache::UniqueHandle(
cache()->Insert(pending, file_cache_->eviction_cb_.get()),
Cache::HandleDeleter(cache())));
}
// Retrieves a pointer to an open file object from the file cache with the
// filename as the cache key.
//
// Returns a handle to the looked up entry. The handle may or may not contain
// an open file, depending on whether the cache hit or missed.
ScopedOpenedDescriptor<FileType> LookupFromCache() const {
return ScopedOpenedDescriptor<FileType>(this, Cache::UniqueHandle(
cache()->Lookup(filename(), Cache::EXPECT_IN_CACHE),
Cache::HandleDeleter(cache())));
}
// Mark this descriptor as to-be-deleted later.
void MarkDeleted() {
DCHECK(!deleted());
while (true) {
auto v = flags_.load();
if (flags_.compare_exchange_weak(v, v | FILE_DELETED)) return;
}
}
// Mark this descriptor as invalidated. No further access is allowed
// to this file.
void MarkInvalidated() {
DCHECK(!invalidated());
while (true) {
auto v = flags_.load();
if (flags_.compare_exchange_weak(v, v | INVALIDATED)) return;
}
}
Cache* cache() const { return file_cache_->cache_.get(); }
Env* env() const { return file_cache_->env_; }
const string& filename() const { return file_name_; }
bool deleted() const { return flags_.load() & FILE_DELETED; }
bool invalidated() const { return flags_.load() & INVALIDATED; }
private:
FileCache<FileType>* file_cache_;
const string file_name_;
enum Flags {
FILE_DELETED = 1 << 0,
INVALIDATED = 1 << 1
};
std::atomic<uint8_t> flags_ {0};
DISALLOW_COPY_AND_ASSIGN(BaseDescriptor);
};
// A "smart" retrieved LRU cache handle.
//
// The cache handle is released when this object goes out of scope, possibly
// closing the opened file if it is no longer in the cache.
template <class FileType>
class ScopedOpenedDescriptor {
public:
// A not-yet-but-soon-to-be opened descriptor.
explicit ScopedOpenedDescriptor(const BaseDescriptor<FileType>* desc)
: desc_(desc),
handle_(nullptr, Cache::HandleDeleter(desc_->cache())) {
}
// An opened descriptor. Its handle may or may not contain an open file.
ScopedOpenedDescriptor(const BaseDescriptor<FileType>* desc,
Cache::UniqueHandle handle)
: desc_(desc),
handle_(std::move(handle)) {
}
bool opened() const { return handle_.get(); }
FileType* file() const {
DCHECK(opened());
return CacheValueToFileType<FileType>(desc_->cache()->Value(handle_.get()));
}
private:
const BaseDescriptor<FileType>* desc_;
Cache::UniqueHandle handle_;
};
// Reference to an on-disk file that may or may not be opened (and thus
// cached) in the file cache.
//
// This empty template is just a specification; actual descriptor classes must
// be fully specialized.
template <class FileType>
class Descriptor : public FileType {
};
// A descriptor adhering to the RWFile interface (i.e. when opened, provides
// a read-write interface to the underlying file).
template <>
class Descriptor<RWFile> : public RWFile {
public:
Descriptor(FileCache<RWFile>* file_cache, const string& filename)
: base_(file_cache, filename) {}
~Descriptor() = default;
Status Read(uint64_t offset, Slice result) const override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
return opened.file()->Read(offset, result);
}
Status ReadV(uint64_t offset, ArrayView<Slice> results) const override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
return opened.file()->ReadV(offset, results);
}
Status Write(uint64_t offset, const Slice& data) override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
return opened.file()->Write(offset, data);
}
Status WriteV(uint64_t offset, ArrayView<const Slice> data) override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
return opened.file()->WriteV(offset, data);
}
Status PreAllocate(uint64_t offset, size_t length, PreAllocateMode mode) override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
return opened.file()->PreAllocate(offset, length, mode);
}
Status Truncate(uint64_t length) override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
return opened.file()->Truncate(length);
}
Status PunchHole(uint64_t offset, size_t length) override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
return opened.file()->PunchHole(offset, length);
}
Status Flush(FlushMode mode, uint64_t offset, size_t length) override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
return opened.file()->Flush(mode, offset, length);
}
Status Sync() override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
return opened.file()->Sync();
}
Status Close() override {
// Intentional no-op; actual closing is deferred to LRU cache eviction.
return Status::OK();
}
Status Size(uint64_t* size) const override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
return opened.file()->Size(size);
}
Status GetExtentMap(ExtentMap* out) const override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
return opened.file()->GetExtentMap(out);
}
const string& filename() const override {
return base_.filename();
}
private:
friend class FileCache<RWFile>;
Status Init() {
return once_.Init(&Descriptor<RWFile>::InitOnce, this);
}
Status InitOnce() {
return ReopenFileIfNecessary(nullptr);
}
Status ReopenFileIfNecessary(ScopedOpenedDescriptor<RWFile>* out) const {
ScopedOpenedDescriptor<RWFile> found(base_.LookupFromCache());
CHECK(!base_.invalidated());
if (found.opened()) {
// The file is already open in the cache, return it.
if (out) {
*out = std::move(found);
}
return Status::OK();
}
// The file was evicted, reopen it.
//
// Because the file may be evicted at any time we must use 'sync_on_close'
// (note: sync is a no-op if the file isn't dirty).
RWFileOptions opts;
opts.sync_on_close = true;
opts.mode = Env::OPEN_EXISTING;
unique_ptr<RWFile> f;
RETURN_NOT_OK(base_.env()->NewRWFile(opts, base_.filename(), &f));
// The cache will take ownership of the newly opened file.
ScopedOpenedDescriptor<RWFile> opened(base_.InsertIntoCache(f.release()));
if (out) {
*out = std::move(opened);
}
return Status::OK();
}
BaseDescriptor<RWFile> base_;
KuduOnceDynamic once_;
DISALLOW_COPY_AND_ASSIGN(Descriptor);
};
// A descriptor adhering to the RandomAccessFile interface (i.e. when opened,
// provides a read-only interface to the underlying file).
template <>
class Descriptor<RandomAccessFile> : public RandomAccessFile {
public:
Descriptor(FileCache<RandomAccessFile>* file_cache, const string& filename)
: base_(file_cache, filename) {}
~Descriptor() = default;
Status Read(uint64_t offset, Slice result) const override {
ScopedOpenedDescriptor<RandomAccessFile> opened(&base_);
RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
return opened.file()->Read(offset, result);
}
Status ReadV(uint64_t offset, ArrayView<Slice> results) const override {
ScopedOpenedDescriptor<RandomAccessFile> opened(&base_);
RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
return opened.file()->ReadV(offset, results);
}
Status Size(uint64_t *size) const override {
ScopedOpenedDescriptor<RandomAccessFile> opened(&base_);
RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
return opened.file()->Size(size);
}
const string& filename() const override {
return base_.filename();
}
size_t memory_footprint() const override {
// Normally we would use kudu_malloc_usable_size(this). However, that's
// not safe because 'this' was allocated via std::make_shared(), which
// means it isn't necessarily the base of the memory allocation; it may be
// preceded by the shared_ptr control block.
//
// It doesn't appear possible to get the base of the allocation via any
// shared_ptr APIs, so we'll use sizeof(*this) + 16 instead. The 16 bytes
// represent the shared_ptr control block. Overall the object size is still
// undercounted as it doesn't account for any internal heap fragmentation,
// but at least it's safe.
//
// Some anecdotal memory measurements taken inside gdb:
// - glibc 2.23 malloc_usable_size() on make_shared<FileType>: 88 bytes.
// - tcmalloc malloc_usable_size() on make_shared<FileType>: 96 bytes.
// - sizeof(std::_Sp_counted_base<>) with libstdc++ 5.4: 16 bytes.
// - sizeof(std::__1::__shared_ptr_emplace<>) with libc++ 3.9: 16 bytes.
// - sizeof(*this): 72 bytes.
return sizeof(*this) +
16 + // shared_ptr control block
once_.memory_footprint_excluding_this() +
base_.filename().capacity();
}
private:
friend class FileCache<RandomAccessFile>;
Status Init() {
return once_.Init(&Descriptor<RandomAccessFile>::InitOnce, this);
}
Status InitOnce() {
return ReopenFileIfNecessary(nullptr);
}
Status ReopenFileIfNecessary(
ScopedOpenedDescriptor<RandomAccessFile>* out) const {
ScopedOpenedDescriptor<RandomAccessFile> found(base_.LookupFromCache());
CHECK(!base_.invalidated());
if (found.opened()) {
// The file is already open in the cache, return it.
if (out) {
*out = std::move(found);
}
return Status::OK();
}
// The file was evicted, reopen it.
unique_ptr<RandomAccessFile> f;
RETURN_NOT_OK(base_.env()->NewRandomAccessFile(base_.filename(), &f));
// The cache will take ownership of the newly opened file.
ScopedOpenedDescriptor<RandomAccessFile> opened(
base_.InsertIntoCache(f.release()));
if (out) {
*out = std::move(opened);
}
return Status::OK();
}
BaseDescriptor<RandomAccessFile> base_;
KuduOnceDynamic once_;
DISALLOW_COPY_AND_ASSIGN(Descriptor);
};
} // namespace internal
template <class FileType>
FileCache<FileType>::FileCache(const string& cache_name,
Env* env,
int max_open_files,
const scoped_refptr<MetricEntity>& entity)
: env_(env),
cache_name_(cache_name),
eviction_cb_(new EvictionCallback<FileType>()),
cache_(NewLRUCache(DRAM_CACHE, max_open_files, cache_name)),
running_(1) {
if (entity) {
cache_->SetMetrics(entity);
}
LOG(INFO) << Substitute("Constructed file cache $0 with capacity $1",
cache_name, max_open_files);
}
template <class FileType>
FileCache<FileType>::~FileCache() {
running_.CountDown();
if (descriptor_expiry_thread_) {
descriptor_expiry_thread_->Join();
}
}
template <class FileType>
Status FileCache<FileType>::Init() {
return Thread::Create("cache", Substitute("$0-evict", cache_name_),
&FileCache::RunDescriptorExpiry, this,
&descriptor_expiry_thread_);
}
template <class FileType>
Status FileCache<FileType>::OpenExistingFile(const string& file_name,
shared_ptr<FileType>* file) {
shared_ptr<internal::Descriptor<FileType>> desc;
{
// Find an existing descriptor, or create one if none exists.
std::lock_guard<simple_spinlock> l(lock_);
RETURN_NOT_OK(FindDescriptorUnlocked(file_name, &desc));
if (desc) {
VLOG(2) << "Found existing descriptor: " << desc->filename();
} else {
desc = std::make_shared<internal::Descriptor<FileType>>(this, file_name);
InsertOrDie(&descriptors_, file_name, desc);
VLOG(2) << "Created new descriptor: " << desc->filename();
}
}
// Check that the underlying file can be opened (no-op for found
// descriptors). Done outside the lock.
RETURN_NOT_OK(desc->Init());
*file = std::move(desc);
return Status::OK();
}
template <class FileType>
Status FileCache<FileType>::DeleteFile(const string& file_name) {
{
std::lock_guard<simple_spinlock> l(lock_);
shared_ptr<internal::Descriptor<FileType>> desc;
RETURN_NOT_OK(FindDescriptorUnlocked(file_name, &desc));
if (desc) {
VLOG(2) << "Marking file for deletion: " << file_name;
desc->base_.MarkDeleted();
return Status::OK();
}
}
// There is no outstanding descriptor. Delete the file now.
//
// Make sure it's been fully evicted from the cache (perhaps it was opened
// previously?) so that the filesystem can reclaim the file data instantly.
cache_->Erase(file_name);
return env_->DeleteFile(file_name);
}
template <class FileType>
void FileCache<FileType>::Invalidate(const string& file_name) {
// Ensure that there is an invalidated descriptor in the map for this filename.
//
// This ensures that any concurrent OpenExistingFile() during this method wil
// see the invalidation and issue a CHECK failure.
shared_ptr<internal::Descriptor<FileType>> desc;
{
// Find an existing descriptor, or create one if none exists.
std::lock_guard<simple_spinlock> l(lock_);
auto it = descriptors_.find(file_name);
if (it != descriptors_.end()) {
desc = it->second.lock();
}
if (!desc) {
desc = std::make_shared<internal::Descriptor<FileType>>(this, file_name);
descriptors_.emplace(file_name, desc);
}
desc->base_.MarkInvalidated();
}
// Remove it from the cache so that if the same path is opened again, we
// will re-open a new FD rather than retrieving one that might have been
// cached prior to invalidation.
cache_->Erase(file_name);
// Remove the invalidated descriptor from the map. We are guaranteed it
// is still there because we've held a strong reference to it for
// the duration of this method, and no other methods erase strong
// references from the map.
{
std::lock_guard<simple_spinlock> l(lock_);
CHECK_EQ(1, descriptors_.erase(file_name));
}
}
template <class FileType>
int FileCache<FileType>::NumDescriptorsForTests() const {
std::lock_guard<simple_spinlock> l(lock_);
return descriptors_.size();
}
template <class FileType>
string FileCache<FileType>::ToDebugString() const {
std::lock_guard<simple_spinlock> l(lock_);
string ret;
for (const auto& e : descriptors_) {
bool strong = false;
bool deleted = false;
bool opened = false;
shared_ptr<internal::Descriptor<FileType>> desc = e.second.lock();
if (desc) {
strong = true;
if (desc->base_.deleted()) {
deleted = true;
}
internal::ScopedOpenedDescriptor<FileType> o(
desc->base_.LookupFromCache());
if (o.opened()) {
opened = true;
}
}
if (strong) {
ret += Substitute("$0 (S$1$2)\n", e.first,
deleted ? "D" : "", opened ? "O" : "");
} else {
ret += Substitute("$0\n", e.first);
}
}
return ret;
}
template <class FileType>
Status FileCache<FileType>::FindDescriptorUnlocked(
const string& file_name,
shared_ptr<internal::Descriptor<FileType>>* file) {
DCHECK(lock_.is_locked());
auto it = descriptors_.find(file_name);
if (it != descriptors_.end()) {
// Found the descriptor. Has it expired?
shared_ptr<internal::Descriptor<FileType>> desc = it->second.lock();
if (desc) {
CHECK(!desc->base_.invalidated());
if (desc->base_.deleted()) {
return Status::NotFound("File already marked for deletion", file_name);
}
// Descriptor is still valid, return it.
if (file) {
*file = desc;
}
return Status::OK();
}
// Descriptor has expired; erase it and pretend we found nothing.
descriptors_.erase(it);
}
return Status::OK();
}
template <class FileType>
void FileCache<FileType>::RunDescriptorExpiry() {
while (!running_.WaitFor(MonoDelta::FromMilliseconds(
FLAGS_file_cache_expiry_period_ms))) {
std::lock_guard<simple_spinlock> l(lock_);
for (auto it = descriptors_.begin(); it != descriptors_.end();) {
if (it->second.expired()) {
it = descriptors_.erase(it);
} else {
it++;
}
}
}
}
// Explicit specialization for callers outside this compilation unit.
template class FileCache<RWFile>;
template class FileCache<RandomAccessFile>;
} // namespace kudu