// blob: a59274b32a5d4e864f2023141d5883fe78def844 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/util/file_cache.h"
#include <atomic>
#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/gutil/macros.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/array_view.h"
#include "kudu/util/cache.h"
#include "kudu/util/cache_metrics.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/env.h"
#include "kudu/util/file_cache_metrics.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/locks.h"
#include "kudu/util/monotime.h"
#include "kudu/util/once.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/thread.h"
// How long the descriptor expiry thread waits between passes over the
// descriptor maps (see FileCache::RunDescriptorExpiry(), which re-reads this
// flag before every wait). The default is 60 * 1000 ms.
DEFINE_int32(file_cache_expiry_period_ms, 60 * 1000,
"Period of time (in ms) between removing expired file cache descriptors");
TAG_FLAG(file_cache_expiry_period_ms, advanced);
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace {
// Extracts the file object pointer stored in a cache entry's value.
//
// The value holds the raw bytes of exactly one pointer, written with memcpy()
// by BaseDescriptor::InsertIntoCache(). It is read back the same way instead
// of dereferencing a casted 'void**': nothing visible here guarantees that the
// value bytes are suitably aligned for a pointer, and a byte-wise copy is
// well-defined regardless of alignment or aliasing.
//
// The returned pointer is not owned by the caller; ownership stays with the
// cache entry (see EvictionCallback).
template <class FileType>
FileType* CacheValueToFileType(Slice s) {
  void* file_ptr = nullptr;
  memcpy(&file_ptr, s.mutable_data(), sizeof(file_ptr));
  return static_cast<FileType*>(file_ptr);
}
// Cache eviction hook for the file cache: destroys the file object whose
// pointer is stored in the evicted entry's value.
class EvictionCallback : public Cache::EvictionCallback {
 public:
  EvictionCallback() = default;

  // 'key' is the cache key of the evicted entry (logged only); 'value' holds
  // the pointer to the file object, which is deleted here.
  void EvictedEntry(Slice key, Slice value) override {
    VLOG(2) << "Evicted fd belonging to " << key.ToString();
    File* evicted_file = CacheValueToFileType<File>(value);
    delete evicted_file;
  }

 private:
  DISALLOW_COPY_AND_ASSIGN(EvictionCallback);
};
} // anonymous namespace
namespace internal {
template <class FileType>
class ScopedOpenedDescriptor;
// Encapsulates common descriptor fields and methods.
//
// A BaseDescriptor remembers the file name, a pointer back to the owning
// FileCache, and a small set of atomic state flags (deleted / invalidated).
// It also provides the helpers used to insert and look up the opened file
// object in the cache, keyed by file name.
template <class FileType>
class BaseDescriptor {
 public:
  // 'file_cache' is stored as a raw pointer and dereferenced by the accessors
  // and the destructor. 'filename' is taken by value and moved into place.
  BaseDescriptor(FileCache* file_cache,
                 string filename)
      : file_cache_(file_cache),
        file_name_(std::move(filename)) {}

  ~BaseDescriptor() {
    VLOG(2) << "Out of scope descriptor with file name: " << filename();

    // The destruction of the descriptor indicates that there's no active user
    // of the file at the moment. However, if the fd is still open in the LRU
    // cache, should we leave it there, or should we evict it?
    //
    // We opt for eviction: your typical filesystem user is more likely to want
    // the underlying resource released when they're done working with a file
    // than they are to want the resource to be quickly accessible should the
    // file be reopened.
    cache()->Erase(filename());

    // A file marked deleted while a descriptor was outstanding is removed from
    // disk only now. Failure is logged rather than propagated because a
    // destructor has no way to return a status.
    if (deleted()) {
      VLOG(1) << "Deleting file: " << filename();
      WARN_NOT_OK(env()->DeleteFile(filename()), "");
    }

    // The (now expired) weak_ptr remains in the descriptor map, to be removed
    // by the next call to RunDescriptorExpiry(). Removing it here would risk a
    // deadlock on recursive acquisition of 'lock_'.
  }

  // Insert a pointer to an open file object into the file cache with the
  // filename as the cache key.
  //
  // Returns a handle to the inserted entry. The handle always contains an open
  // file.
  ScopedOpenedDescriptor<FileType> InsertIntoCache(void* file_ptr) const {
    // The allocated charge is always one byte. This is incorrect with respect
    // to memory tracking, but it's necessary if the cache capacity is to be
    // equivalent to the max number of fds.
    auto pending(cache()->Allocate(filename(), sizeof(file_ptr), 1));
    CHECK(pending);

    // The entry's value is the pointer itself, copied in byte-wise.
    memcpy(cache()->MutableValue(&pending), &file_ptr, sizeof(file_ptr));
    return ScopedOpenedDescriptor<FileType>(
        this, cache()->Insert(std::move(pending),
                              file_cache_->eviction_cb_.get()));
  }

  // Retrieves a pointer to an open file object from the file cache with the
  // filename as the cache key.
  //
  // Returns a handle to the looked up entry. The handle may or may not contain
  // an open file, depending on whether the cache hit or missed.
  ScopedOpenedDescriptor<FileType> LookupFromCache() const {
    return ScopedOpenedDescriptor<FileType>(
        this, cache()->Lookup(filename(), Cache::EXPECT_IN_CACHE));
  }

  // Mark this descriptor as to-be-deleted later.
  //
  // A single atomic OR sets the bit without disturbing the other flag.
  void MarkDeleted() {
    DCHECK(!deleted());
    flags_.fetch_or(static_cast<uint8_t>(FILE_DELETED));
  }

  // Mark this descriptor as invalidated. No further access is allowed
  // to this file.
  //
  // A single atomic OR sets the bit without disturbing the other flag.
  void MarkInvalidated() {
    DCHECK(!invalidated());
    flags_.fetch_or(static_cast<uint8_t>(INVALIDATED));
  }

  // Accessors. cache() and env() are read through the owning FileCache.
  Cache* cache() const { return file_cache_->cache_.get(); }
  Env* env() const { return file_cache_->env_; }
  const string& filename() const { return file_name_; }
  bool deleted() const { return flags_.load() & FILE_DELETED; }
  bool invalidated() const { return flags_.load() & INVALIDATED; }

 private:
  FileCache* file_cache_;
  const string file_name_;

  // Bit positions within 'flags_'.
  enum Flags {
    FILE_DELETED = 1 << 0,
    INVALIDATED = 1 << 1
  };
  std::atomic<uint8_t> flags_ {0};

  DISALLOW_COPY_AND_ASSIGN(BaseDescriptor);
};
// A "smart" retrieved LRU cache handle.
//
// The cache handle is released when this object goes out of scope, possibly
// closing the opened file if it is no longer in the cache.
template <class FileType>
class ScopedOpenedDescriptor {
public:
// A not-yet-but-soon-to-be opened descriptor.
explicit ScopedOpenedDescriptor(const BaseDescriptor<FileType>* desc)
: desc_(desc),
handle_(nullptr, Cache::HandleDeleter(desc_->cache())) {
}
// An opened descriptor. Its handle may or may not contain an open file.
ScopedOpenedDescriptor(const BaseDescriptor<FileType>* desc,
Cache::UniqueHandle handle)
: desc_(desc),
handle_(std::move(handle)) {
}
bool opened() const { return handle_.get(); }
FileType* file() const {
DCHECK(opened());
return CacheValueToFileType<FileType>(desc_->cache()->Value(handle_));
}
private:
const BaseDescriptor<FileType>* desc_;
Cache::UniqueHandle handle_;
};
// Handle to an on-disk file that may or may not currently be opened (and
// therefore cached) in the file cache.
//
// The primary template is only a placeholder deriving from FileType; every
// usable descriptor class must be provided as a full specialization.
template <class FileType>
class Descriptor : public FileType {};
// Descriptor specialization implementing the RWFile interface.
//
// Every I/O method first obtains an opened file through
// ReopenFileIfNecessary() (reopening it with MUST_EXIST semantics on a cache
// miss) and then forwards the call to that file.
template <>
class Descriptor<RWFile> : public RWFile {
 public:
  Descriptor(FileCache* file_cache, const string& filename)
      : base_(file_cache, filename) {}

  ~Descriptor() = default;

  Status Read(uint64_t offset, Slice result) const override {
    ScopedOpenedDescriptor<RWFile> fd(&base_);
    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&fd));
    return fd.file()->Read(offset, result);
  }

  Status ReadV(uint64_t offset, ArrayView<Slice> results) const override {
    ScopedOpenedDescriptor<RWFile> fd(&base_);
    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&fd));
    return fd.file()->ReadV(offset, results);
  }

  Status Write(uint64_t offset, const Slice& data) override {
    ScopedOpenedDescriptor<RWFile> fd(&base_);
    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&fd));
    return fd.file()->Write(offset, data);
  }

  Status WriteV(uint64_t offset, ArrayView<const Slice> data) override {
    ScopedOpenedDescriptor<RWFile> fd(&base_);
    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&fd));
    return fd.file()->WriteV(offset, data);
  }

  Status PreAllocate(uint64_t offset, size_t length, PreAllocateMode mode) override {
    ScopedOpenedDescriptor<RWFile> fd(&base_);
    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&fd));
    return fd.file()->PreAllocate(offset, length, mode);
  }

  Status Truncate(uint64_t length) override {
    ScopedOpenedDescriptor<RWFile> fd(&base_);
    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&fd));
    return fd.file()->Truncate(length);
  }

  Status PunchHole(uint64_t offset, size_t length) override {
    ScopedOpenedDescriptor<RWFile> fd(&base_);
    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&fd));
    return fd.file()->PunchHole(offset, length);
  }

  Status Flush(FlushMode mode, uint64_t offset, size_t length) override {
    ScopedOpenedDescriptor<RWFile> fd(&base_);
    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&fd));
    return fd.file()->Flush(mode, offset, length);
  }

  Status Sync() override {
    ScopedOpenedDescriptor<RWFile> fd(&base_);
    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&fd));
    return fd.file()->Sync();
  }

  Status Close() override {
    // Deliberately does nothing: the underlying file is closed when its entry
    // is evicted from the LRU cache.
    return Status::OK();
  }

  Status Size(uint64_t* size) const override {
    ScopedOpenedDescriptor<RWFile> fd(&base_);
    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&fd));
    return fd.file()->Size(size);
  }

  Status GetExtentMap(ExtentMap* out) const override {
    ScopedOpenedDescriptor<RWFile> fd(&base_);
    RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&fd));
    return fd.file()->GetExtentMap(out);
  }

  // NOTE(review): reports true unconditionally, without consulting the Env or
  // the opened file. Files are always opened with 'is_sensitive' set (see
  // ReopenFileIfNecessary()); confirm that this is the intended meaning.
  bool IsEncrypted() const override {
    return true;
  }

  const string& filename() const override {
    return base_.filename();
  }

  size_t GetEncryptionHeaderSize() const override {
    return base_.env()->GetEncryptionHeaderSize();
  }

 private:
  friend class ::kudu::FileCache;

  // Runs InitOnce<Mode>() through 'once_' and returns the resulting status.
  template <Env::OpenMode Mode>
  Status Init() {
    return once_.Init(&Descriptor<RWFile>::InitOnce<Mode>, this);
  }

  // Verifies that the file can be opened with 'Mode'; the opened file is left
  // in the cache and no handle is retained.
  template <Env::OpenMode Mode>
  Status InitOnce() {
    return ReopenFileIfNecessary<Mode>(nullptr);
  }

  // Ensures the file is open in the cache, opening it with 'Mode' on a miss.
  //
  // If 'out' is non-null it receives the handle to the opened file. Returns
  // the error from Env::NewRWFile() if the file cannot be opened. CHECK-fails
  // if the descriptor has been invalidated.
  template <Env::OpenMode Mode>
  Status ReopenFileIfNecessary(ScopedOpenedDescriptor<RWFile>* out) const {
    ScopedOpenedDescriptor<RWFile> cached(base_.LookupFromCache());
    CHECK(!base_.invalidated());
    if (cached.opened()) {
      // Cache hit: hand the existing handle to the caller, if wanted.
      if (out != nullptr) {
        *out = std::move(cached);
      }
      return Status::OK();
    }

    // Cache miss: the file has to be (re)opened.
    RWFileOptions open_opts;
    open_opts.mode = Mode;
    open_opts.is_sensitive = true;
    unique_ptr<RWFile> new_file;
    RETURN_NOT_OK(base_.env()->NewRWFile(open_opts, base_.filename(), &new_file));

    // Ownership of the newly opened file passes to the cache.
    ScopedOpenedDescriptor<RWFile> inserted(
        base_.InsertIntoCache(new_file.release()));
    if (out != nullptr) {
      *out = std::move(inserted);
    }
    return Status::OK();
  }

  BaseDescriptor<RWFile> base_;
  KuduOnceDynamic once_;

  DISALLOW_COPY_AND_ASSIGN(Descriptor);
};
// Descriptor specialization implementing the RandomAccessFile interface.
//
// Every I/O method first obtains an opened file through
// ReopenFileIfNecessary() (reopening it on a cache miss) and then forwards
// the call to that file.
template <>
class Descriptor<RandomAccessFile> : public RandomAccessFile {
 public:
  Descriptor(FileCache* file_cache, const string& filename)
      : base_(file_cache, filename) {}

  ~Descriptor() = default;

  Status Read(uint64_t offset, Slice result) const override {
    ScopedOpenedDescriptor<RandomAccessFile> fd(&base_);
    RETURN_NOT_OK(ReopenFileIfNecessary(&fd));
    return fd.file()->Read(offset, result);
  }

  Status ReadV(uint64_t offset, ArrayView<Slice> results) const override {
    ScopedOpenedDescriptor<RandomAccessFile> fd(&base_);
    RETURN_NOT_OK(ReopenFileIfNecessary(&fd));
    return fd.file()->ReadV(offset, results);
  }

  Status Size(uint64_t *size) const override {
    ScopedOpenedDescriptor<RandomAccessFile> fd(&base_);
    RETURN_NOT_OK(ReopenFileIfNecessary(&fd));
    return fd.file()->Size(size);
  }

  const string& filename() const override {
    return base_.filename();
  }

  size_t GetEncryptionHeaderSize() const override {
    return base_.env()->GetEncryptionHeaderSize();
  }

  size_t memory_footprint() const override {
    // kudu_malloc_usable_size(this) would be the usual choice, but it is not
    // safe here: 'this' was allocated via std::make_shared(), so it need not
    // be the base of the memory allocation; the shared_ptr control block may
    // precede it.
    //
    // There appears to be no shared_ptr API that yields the base of the
    // allocation, so the estimate is sizeof(*this) plus 16 bytes standing in
    // for the control block. The object is still undercounted because
    // internal heap fragmentation is ignored, but the computation is safe.
    //
    // Some anecdotal memory measurements taken inside gdb:
    // - glibc 2.23 malloc_usable_size() on make_shared<FileType>: 88 bytes.
    // - tcmalloc malloc_usable_size() on make_shared<FileType>: 96 bytes.
    // - sizeof(std::_Sp_counted_base<>) with libstdc++ 5.4: 16 bytes.
    // - sizeof(std::__1::__shared_ptr_emplace<>) with libc++ 3.9: 16 bytes.
    // - sizeof(*this): 72 bytes.
    const size_t control_block_bytes = 16;
    return sizeof(*this) +
        control_block_bytes +
        once_.memory_footprint_excluding_this() +
        base_.filename().capacity();
  }

 private:
  friend class ::kudu::FileCache;

  // Runs InitOnce() through 'once_' and returns the resulting status.
  Status Init() {
    return once_.Init(&Descriptor<RandomAccessFile>::InitOnce, this);
  }

  // Verifies that the file can be opened; the opened file is left in the
  // cache and no handle is retained.
  Status InitOnce() {
    return ReopenFileIfNecessary(nullptr);
  }

  // Ensures the file is open in the cache, opening it on a miss.
  //
  // If 'out' is non-null it receives the handle to the opened file. Returns
  // the error from Env::NewRandomAccessFile() if the file cannot be opened.
  // CHECK-fails if the descriptor has been invalidated.
  Status ReopenFileIfNecessary(
      ScopedOpenedDescriptor<RandomAccessFile>* out) const {
    ScopedOpenedDescriptor<RandomAccessFile> cached(base_.LookupFromCache());
    CHECK(!base_.invalidated());
    if (cached.opened()) {
      // Cache hit: hand the existing handle to the caller, if wanted.
      if (out != nullptr) {
        *out = std::move(cached);
      }
      return Status::OK();
    }

    // Cache miss: the file has to be (re)opened.
    unique_ptr<RandomAccessFile> new_file;
    RandomAccessFileOptions open_opts;
    open_opts.is_sensitive = true;
    RETURN_NOT_OK(base_.env()->NewRandomAccessFile(
        open_opts, base_.filename(), &new_file));

    // Ownership of the newly opened file passes to the cache.
    ScopedOpenedDescriptor<RandomAccessFile> inserted(
        base_.InsertIntoCache(new_file.release()));
    if (out != nullptr) {
      *out = std::move(inserted);
    }
    return Status::OK();
  }

  BaseDescriptor<RandomAccessFile> base_;
  KuduOnceDynamic once_;

  DISALLOW_COPY_AND_ASSIGN(Descriptor);
};
} // namespace internal
// Message carried by the NotFound status that DoOpenFile() and DeleteFile()
// return when the descriptor for the named file is already marked deleted.
const char* const FileCache::kAlreadyDeleted = "File already marked as deleted";
// Builds a file cache named 'cache_name' on top of 'env', backed by a cache
// created with a capacity of 'max_open_files'. If 'entity' is set, file cache
// metrics are registered against it. The expiry thread is not started here;
// see Init().
FileCache::FileCache(const string& cache_name,
                     Env* env,
                     int max_open_files,
                     const scoped_refptr<MetricEntity>& entity)
    : env_(env),
      cache_name_(cache_name),
      eviction_cb_(new EvictionCallback()),
      cache_(NewCache(max_open_files, cache_name)),
      running_(1) {
  // Metrics are optional: attach them only when a metric entity was supplied.
  if (entity) {
    cache_->SetMetrics(
        unique_ptr<FileCacheMetrics>(new FileCacheMetrics(entity)),
        Cache::ExistingMetricsPolicy::kKeep);
  }

  LOG(INFO) << Substitute("Constructed file cache $0 with capacity $1",
                          cache_name, max_open_files);
}
// Signals the expiry thread to stop and, if it was ever started, waits for it
// to finish.
FileCache::~FileCache() {
  // Counting the latch down makes the wait in RunDescriptorExpiry() return
  // true, which ends its loop.
  running_.CountDown();

  if (!descriptor_expiry_thread_) {
    // Init() never created the thread; nothing to join.
    return;
  }
  descriptor_expiry_thread_->Join();
}
// Starts the background thread that runs RunDescriptorExpiry(). Returns the
// status of the thread creation.
Status FileCache::Init() {
  const string thread_name = Substitute("$0-evict", cache_name_);
  return Thread::Create("cache", thread_name,
                        [this] { RunDescriptorExpiry(); },
                        &descriptor_expiry_thread_);
}
// Finds or creates the read-write descriptor for 'file_name'.
//
// On success stores the descriptor in '*file' and sets '*created_desc' to
// whether it was newly created. Returns NotFound (leaving both out-params
// untouched) if the descriptor is marked deleted. Does not open the file.
template <>
Status FileCache::DoOpenFile(const string& file_name,
                             shared_ptr<internal::Descriptor<RWFile>>* file,
                             bool* created_desc) {
  shared_ptr<internal::Descriptor<RWFile>> desc;
  bool newly_created;
  {
    std::lock_guard<simple_spinlock> guard(lock_);
    desc = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST,
                                  &rwf_descs_, &newly_created);
    DCHECK(desc);

#ifndef NDEBUG
    // Debug-only (it costs an extra lookup): a file name may be in use by at
    // most one descriptor at a time, so no read-only descriptor may exist.
    bool unused;
    CHECK(!FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE,
                                  &raf_descs_, &unused));
#endif
  }

  // The deleted check happens after the lock has been released.
  if (desc->base_.deleted()) {
    return Status::NotFound(kAlreadyDeleted, file_name);
  }

  *file = std::move(desc);
  *created_desc = newly_created;
  return Status::OK();
}
// Finds or creates the read-only descriptor for 'file_name'.
//
// On success stores the descriptor in '*file' and sets '*created_desc' to
// whether it was newly created. Returns NotFound (leaving both out-params
// untouched) if the descriptor is marked deleted. Does not open the file.
template <>
Status FileCache::DoOpenFile(const string& file_name,
                             shared_ptr<internal::Descriptor<RandomAccessFile>>* file,
                             bool* created_desc) {
  shared_ptr<internal::Descriptor<RandomAccessFile>> desc;
  bool newly_created;
  {
    std::lock_guard<simple_spinlock> guard(lock_);
    desc = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST,
                                  &raf_descs_, &newly_created);
    DCHECK(desc);

#ifndef NDEBUG
    // Debug-only (it costs an extra lookup): a file name may be in use by at
    // most one descriptor at a time, so no read-write descriptor may exist.
    bool unused;
    CHECK(!FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE,
                                  &rwf_descs_, &unused));
#endif
  }

  // The deleted check happens after the lock has been released.
  if (desc->base_.deleted()) {
    return Status::NotFound(kAlreadyDeleted, file_name);
  }

  *file = std::move(desc);
  *created_desc = newly_created;
  return Status::OK();
}
// Opens 'file_name' for read-write access with CREATE_OR_OPEN semantics and
// stores the resulting descriptor in '*file'.
template <>
Status FileCache::OpenFile<Env::CREATE_OR_OPEN>(const string& file_name,
                                                shared_ptr<RWFile>* file) {
  shared_ptr<internal::Descriptor<RWFile>> desc;
  bool created_unused;
  RETURN_NOT_OK(DoOpenFile(file_name, &desc, &created_unused));

  // Make sure the underlying file can actually be opened. Init() goes through
  // the descriptor's once-initializer.
  RETURN_NOT_OK(desc->Init<Env::CREATE_OR_OPEN>());

  *file = std::move(desc);
  return Status::OK();
}
// Opens 'file_name' for read-write access with MUST_CREATE semantics and
// stores the resulting descriptor in '*file'. Returns AlreadyPresent if a
// descriptor for the file already existed.
template <>
Status FileCache::OpenFile<Env::MUST_CREATE>(const string& file_name,
                                             shared_ptr<RWFile>* file) {
  shared_ptr<internal::Descriptor<RWFile>> desc;
  bool was_created;
  RETURN_NOT_OK(DoOpenFile(file_name, &desc, &was_created));

  // An existing descriptor means the file is already known to the cache.
  if (!was_created) {
    return Status::AlreadyPresent("file already exists", file_name);
  }

  // Make sure the underlying file can actually be created.
  RETURN_NOT_OK(desc->Init<Env::MUST_CREATE>());

  *file = std::move(desc);
  return Status::OK();
}
// Opens the existing file 'file_name' for read-write access and stores the
// resulting descriptor in '*file'.
template <>
Status FileCache::OpenFile<Env::MUST_EXIST>(const string& file_name,
                                            shared_ptr<RWFile>* file) {
  shared_ptr<internal::Descriptor<RWFile>> desc;
  bool created_unused;
  RETURN_NOT_OK(DoOpenFile(file_name, &desc, &created_unused));

  // Make sure the underlying file can actually be opened. Init() goes through
  // the descriptor's once-initializer.
  RETURN_NOT_OK(desc->Init<Env::MUST_EXIST>());

  *file = std::move(desc);
  return Status::OK();
}
// Opens the existing file 'file_name' for read-only access and stores the
// resulting descriptor in '*file'.
template <>
Status FileCache::OpenFile<Env::MUST_EXIST>(const string& file_name,
                                            shared_ptr<RandomAccessFile>* file) {
  shared_ptr<internal::Descriptor<RandomAccessFile>> desc;
  bool created_unused;
  RETURN_NOT_OK(DoOpenFile(file_name, &desc, &created_unused));

  // Make sure the underlying file can actually be opened. Init() goes through
  // the descriptor's once-initializer.
  RETURN_NOT_OK(desc->Init());

  *file = std::move(desc);
  return Status::OK();
}
// Deletes 'file_name'.
//
// If a live descriptor exists for the file, it is only marked deleted here
// and the file is removed from disk when the descriptor is destroyed. Returns
// NotFound if that descriptor was already marked deleted. With no live
// descriptor, the file is erased from the cache and deleted right away.
Status FileCache::DeleteFile(const string& file_name) {
  {
    std::lock_guard<simple_spinlock> guard(lock_);
    bool created_unused;

    // A file name belongs to at most one descriptor, so a hit in the first
    // map makes the search of the second one unnecessary.
    {
      auto rwf = FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE,
                                        &rwf_descs_, &created_unused);
      if (rwf) {
        if (!rwf->base_.deleted()) {
          rwf->base_.MarkDeleted();
          return Status::OK();
        }
        return Status::NotFound(kAlreadyDeleted, file_name);
      }
    }
    {
      auto raf = FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE,
                                        &raf_descs_, &created_unused);
      if (raf) {
        if (!raf->base_.deleted()) {
          raf->base_.MarkDeleted();
          return Status::OK();
        }
        return Status::NotFound(kAlreadyDeleted, file_name);
      }
    }
  }

  // No outstanding descriptor: delete the file immediately.
  //
  // Erase any leftover cache entry first (the file may have been opened
  // earlier) so that the filesystem can reclaim the file data at once.
  cache_->Erase(file_name);
  return env_->DeleteFile(file_name);
}
void FileCache::Invalidate(const string& file_name) {
// Ensure that there are invalidated descriptors in both maps for this
// filename. This ensures that any concurrent opens during this method will
// see the invalidation and result in a CHECK failure.
//
// Note: this temporarily violates the invariant that no two descriptors may
// share the same file name. That's OK because the invalidation CHECK failure
// occurs before the client trips on the broken invariant.
shared_ptr<internal::Descriptor<RWFile>> rwf_desc;
shared_ptr<internal::Descriptor<RandomAccessFile>> raf_desc;
{
std::lock_guard<simple_spinlock> l(lock_);
bool ignored;
rwf_desc = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST,
&rwf_descs_, &ignored);
DCHECK(rwf_desc);
rwf_desc->base_.MarkInvalidated();
raf_desc = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST,
&raf_descs_, &ignored);
DCHECK(raf_desc);
raf_desc->base_.MarkInvalidated();
}
// Remove it from the cache so that if the same path is opened again, we
// will re-open a new FD rather than retrieving one that might have been
// cached prior to invalidation.
cache_->Erase(file_name);
// Remove the invalidated descriptors from the maps. We are guaranteed they
// are still there because we've held a strong references to them for the
// duration of this method, and no other methods erase strong references from
// the maps.
{
std::lock_guard<simple_spinlock> l(lock_);
CHECK_EQ(1, rwf_descs_.erase(file_name));
CHECK_EQ(1, raf_descs_.erase(file_name));
}
}
// Returns the total number of entries in both descriptor maps, read under
// 'lock_'.
size_t FileCache::NumDescriptorsForTests() const {
  std::lock_guard<simple_spinlock> guard(lock_);
  const auto rwf_count = rwf_descs_.size();
  const auto raf_count = raf_descs_.size();
  return rwf_count + raf_count;
}
// Returns a textual dump of both descriptor maps: the read-write entries
// (prefixed "rwf") followed by the read-only ones (prefixed "raf").
string FileCache::ToDebugString() const {
  // Snapshot the maps under the lock so they can be walked without holding
  // it.
  DescriptorMap<RWFile> rwf_snapshot;
  DescriptorMap<RandomAccessFile> raf_snapshot;
  {
    std::lock_guard<simple_spinlock> guard(lock_);
    rwf_snapshot = rwf_descs_;
    raf_snapshot = raf_descs_;
  }

  // Format the snapshots, read-write descriptors first.
  string dump = MapToDebugString(rwf_snapshot, "rwf");
  dump += MapToDebugString(raf_snapshot, "raf");
  return dump;
}
// Formats every entry of 'descs' as one line "<prefix>: <file name>".
//
// Entries whose descriptor is still alive get a suffix " (S...)" where 'S'
// marks the live reference, followed by 'D' if the descriptor is marked
// deleted and 'O' if the file is currently open in the cache.
template <class FileType>
string FileCache::MapToDebugString(const DescriptorMap<FileType>& descs,
                                   const string& prefix) {
  string out;
  for (const auto& entry : descs) {
    shared_ptr<internal::Descriptor<FileType>> live = entry.second.lock();
    if (!live) {
      // Expired entry: only the name can be reported.
      out += Substitute("$0: $1\n", prefix, entry.first);
      continue;
    }

    const bool is_deleted = live->base_.deleted();
    bool is_open = false;
    {
      // The lookup handle is only needed to probe for an open file; release
      // it before formatting.
      internal::ScopedOpenedDescriptor<FileType> probe(
          live->base_.LookupFromCache());
      is_open = probe.opened();
    }
    out += Substitute("$0: $1 (S$2$3)\n", prefix, entry.first,
                      is_deleted ? "D" : "", is_open ? "O" : "");
  }
  return out;
}
// Looks up the descriptor for 'file_name' in 'descs'. 'lock_' must be held.
//
// Returns the live descriptor if there is one. Otherwise any expired entry is
// erased and, when 'mode' is CREATE_IF_NOT_EXIST, a new descriptor is created,
// registered in 'descs' and returned; in any other mode the result is null.
// '*created_desc' is always set and is true only when a descriptor was
// created. CHECK-fails if a live descriptor is found that has been
// invalidated.
template <class FileType>
shared_ptr<internal::Descriptor<FileType>> FileCache::FindDescriptorUnlocked(
    const string& file_name,
    FindMode mode,
    DescriptorMap<FileType>* descs,
    bool* created_desc) {
  DCHECK(lock_.is_locked());

  const auto entry = descs->find(file_name);
  if (entry != descs->end()) {
    // There is an entry; check whether its descriptor is still alive.
    shared_ptr<internal::Descriptor<FileType>> live = entry->second.lock();
    if (live) {
      CHECK(!live->base_.invalidated());

      VLOG(2) << "Found existing descriptor: " << file_name;
      *created_desc = false;
      return live;
    }

    // The descriptor is gone: drop the stale entry and carry on as if nothing
    // had been found.
    descs->erase(entry);
  }

  if (mode != FindMode::CREATE_IF_NOT_EXIST) {
    *created_desc = false;
    return nullptr;
  }

  auto fresh = std::make_shared<internal::Descriptor<FileType>>(this, file_name);
  EmplaceOrDie(descs, file_name, fresh);
  VLOG(2) << "Created new descriptor: " << file_name;
  *created_desc = true;
  return fresh;
}
// Erases every entry of 'descs' whose weak reference has expired, leaving the
// entries with live descriptors in place.
template <class FileType>
void FileCache::ExpireDescriptorsFromMap(DescriptorMap<FileType>* descs) {
  auto it = descs->begin();
  while (it != descs->end()) {
    if (!it->second.expired()) {
      it++;
      continue;
    }
    // erase() returns the iterator following the removed entry.
    it = descs->erase(it);
  }
}
void FileCache::RunDescriptorExpiry() {
while (!running_.WaitFor(MonoDelta::FromMilliseconds(
FLAGS_file_cache_expiry_period_ms))) {
std::lock_guard<simple_spinlock> l(lock_);
ExpireDescriptorsFromMap(&rwf_descs_);
ExpireDescriptorsFromMap(&raf_descs_);
}
}
} // namespace kudu