// Some portions copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "kudu/util/cache.h"
#include <atomic>
#include <cstdint>
#include <cstring>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/gutil/bits.h"
#include "kudu/gutil/hash/city.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/sysinfo.h"
#include "kudu/util/alignment.h"
#include "kudu/util/cache_metrics.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/locks.h"
#include "kudu/util/malloc.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
#include "kudu/util/slice.h"
#include "kudu/util/test_util_prod.h"
// Useful in tests that require accurate cache capacity accounting.
DEFINE_bool(cache_force_single_shard, false,
"Override all cache implementations to use just one shard");
TAG_FLAG(cache_force_single_shard, hidden);
DEFINE_double(cache_memtracker_approximation_ratio, 0.01,
"The MemTracker associated with a cache can accumulate error up to "
"this ratio to improve performance. For tests.");
TAG_FLAG(cache_memtracker_approximation_ratio, hidden);
using std::atomic;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
namespace kudu {
Cache::~Cache() {
}
const Cache::ValidityFunc Cache::kInvalidateAllEntriesFunc = [](
Slice /* key */, Slice /* value */) {
return false;
};
const Cache::IterationFunc Cache::kIterateOverAllEntriesFunc = [](
size_t /* valid_entries_num */, size_t /* invalid_entries_num */) {
return true;
};
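// Illustrative usage (a sketch; it assumes InvalidationControl simply bundles
// the validity functor followed by the iteration functor, matching how
// CacheShard::Invalidate() reads 'validity_func' and 'iteration_func' below):
//
//   const Cache::InvalidationControl ctl = { Cache::kInvalidateAllEntriesFunc,
//                                            Cache::kIterateOverAllEntriesFunc };
//   size_t invalidated = cache->Invalidate(ctl);
//
// Since no entry is considered valid and iteration never stops early, this
// invalidates every entry in the cache.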
namespace {
// Recency list cache implementations (FIFO, LRU, etc.)
// Recency list handle. An entry is a variable length heap-allocated structure.
// Entries are kept in a circular doubly linked list ordered by some recency
// criterion (e.g., access time for LRU policy, insertion time for FIFO policy).
struct RLHandle {
Cache::EvictionCallback* eviction_callback;
RLHandle* next_hash;
RLHandle* next;
RLHandle* prev;
size_t charge; // TODO(opt): Only allow uint32_t?
uint32_t key_length;
uint32_t val_length;
std::atomic<int32_t> refs;
uint32_t hash; // Hash of key(); used for fast sharding and comparisons
// The storage for the key/value pair itself. The data is stored as:
// [key bytes ...] [padding up to 8-byte boundary] [value bytes ...]
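// For example, with 8-byte pointers and an 11-byte key, the key occupies
// kv_data[0..10], kv_data[11..15] is padding, and the value bytes start at
// kv_data[16], since KUDU_ALIGN_UP(11, sizeof(void*)) == 16.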
uint8_t kv_data[1]; // Beginning of key/value pair
Slice key() const {
return Slice(kv_data, key_length);
}
uint8_t* mutable_val_ptr() {
int val_offset = KUDU_ALIGN_UP(key_length, sizeof(void*));
return &kv_data[val_offset];
}
const uint8_t* val_ptr() const {
return const_cast<RLHandle*>(this)->mutable_val_ptr();
}
Slice value() const {
return Slice(val_ptr(), val_length);
}
};
// We provide our own simple hash table since it removes a whole bunch
// of porting hacks and is also faster than some of the built-in hash
// table implementations in some of the compiler/runtime combinations
// we have tested. E.g., readrandom speeds up by ~5% over g++ 4.4.3's
// builtin hashtable.
class HandleTable {
public:
HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); }
~HandleTable() { delete[] list_; }
RLHandle* Lookup(const Slice& key, uint32_t hash) {
return *FindPointer(key, hash);
}
RLHandle* Insert(RLHandle* h) {
RLHandle** ptr = FindPointer(h->key(), h->hash);
RLHandle* old = *ptr;
h->next_hash = (old == nullptr ? nullptr : old->next_hash);
*ptr = h;
if (old == nullptr) {
++elems_;
if (elems_ > length_) {
// Since each cache entry is fairly large, we aim for a small
// average linked list length (<= 1).
Resize();
}
}
return old;
}
RLHandle* Remove(const Slice& key, uint32_t hash) {
RLHandle** ptr = FindPointer(key, hash);
RLHandle* result = *ptr;
if (result != nullptr) {
*ptr = result->next_hash;
--elems_;
}
return result;
}
private:
// The table consists of an array of buckets where each bucket is
// a linked list of cache entries that hash into the bucket.
uint32_t length_;
uint32_t elems_;
RLHandle** list_;
// Return a pointer to the slot that points to a cache entry matching
// key/hash. If there is no such cache entry, return a pointer to the
// trailing slot in the corresponding linked list.
RLHandle** FindPointer(const Slice& key, uint32_t hash) {
RLHandle** ptr = &list_[hash & (length_ - 1)];
while (*ptr != nullptr &&
((*ptr)->hash != hash || key != (*ptr)->key())) {
ptr = &(*ptr)->next_hash;
}
return ptr;
}
void Resize() {
uint32_t new_length = 16;
while (new_length < elems_ * 1.5) {
new_length *= 2;
}
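// Illustrative sizing: with elems_ == 1000 the loop above stops at
// new_length == 2048, the first power of two >= 1500, keeping the expected
// chain length well below one.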
auto new_list = new RLHandle*[new_length];
memset(new_list, 0, sizeof(new_list[0]) * new_length);
uint32_t count = 0;
for (uint32_t i = 0; i < length_; i++) {
RLHandle* h = list_[i];
while (h != nullptr) {
RLHandle* next = h->next_hash;
uint32_t hash = h->hash;
RLHandle** ptr = &new_list[hash & (new_length - 1)];
h->next_hash = *ptr;
*ptr = h;
h = next;
count++;
}
}
DCHECK_EQ(elems_, count);
delete[] list_;
list_ = new_list;
length_ = new_length;
}
};
string ToString(Cache::EvictionPolicy p) {
switch (p) {
case Cache::EvictionPolicy::FIFO:
return "fifo";
case Cache::EvictionPolicy::LRU:
return "lru";
default:
LOG(FATAL) << "unexpected cache eviction policy: " << static_cast<int>(p);
break;
}
return "unknown";
}
// A single shard of sharded cache.
template<Cache::EvictionPolicy policy>
class CacheShard {
public:
explicit CacheShard(MemTracker* tracker);
~CacheShard();
// Separate from constructor so caller can easily make an array of CacheShard
void SetCapacity(size_t capacity) {
capacity_ = capacity;
max_deferred_consumption_ = capacity * FLAGS_cache_memtracker_approximation_ratio;
}
void SetMetrics(CacheMetrics* metrics) { metrics_ = metrics; }
Cache::Handle* Insert(RLHandle* handle, Cache::EvictionCallback* eviction_callback);
// Like Cache::Lookup, but with an extra "hash" parameter.
Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
void Release(Cache::Handle* handle);
void Erase(const Slice& key, uint32_t hash);
size_t Invalidate(const Cache::InvalidationControl& ctl);
private:
void RL_Remove(RLHandle* e);
void RL_Append(RLHandle* e);
// Update the recency list after a lookup operation.
void RL_UpdateAfterLookup(RLHandle* e);
// Decrement the reference count by one.
// Returns true if this was the last reference.
bool Unref(RLHandle* e);
// Call the user's eviction callback, if it exists, and free the entry.
void FreeEntry(RLHandle* e);
// Update the memtracker's consumption by the given amount.
//
// This "buffers" the updates locally in 'deferred_consumption_' until the amount
// of accumulated delta is more than ~1% of the cache capacity. This improves
// performance under workloads with high eviction rates for a few reasons:
//
// 1) once the cache reaches its full capacity, we expect it to remain there
// in steady state. Each insertion is usually matched by an eviction, and unless
// the total size of the evicted item(s) is much different than the size of the
// inserted item, each eviction event is unlikely to change the total cache usage
// much. So, we expect that the accumulated error will mostly remain around 0
// and we can avoid propagating changes to the MemTracker at all.
//
// 2) because the cache implementation is sharded, we do this tracking in a bunch
// of different locations, avoiding bouncing cache lines between cores. By contrast,
// the MemTracker is a simple integer, so it doesn't scale as well under concurrency.
//
// Positive delta indicates an increased memory consumption.
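//
// As a worked example: with a 1 GiB shard capacity and the default
// --cache_memtracker_approximation_ratio of 0.01, roughly 10 MiB of delta can
// accumulate locally before a single Consume() call propagates it to the
// MemTracker.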
void UpdateMemTracker(int64_t delta);
// Update the metrics for a lookup operation in the cache.
void UpdateMetricsLookup(bool was_hit, bool caching);
// Initialized before use.
size_t capacity_;
// mutex_ protects the following state.
simple_spinlock mutex_;
size_t usage_;
// Dummy head of recency list.
// rl_.prev is the newest entry, rl_.next is the oldest entry.
RLHandle rl_;
HandleTable table_;
MemTracker* mem_tracker_;
atomic<int64_t> deferred_consumption_ { 0 };
// Initialized based on capacity_ to ensure an upper bound on the error on the
// MemTracker consumption.
int64_t max_deferred_consumption_;
CacheMetrics* metrics_;
};
template<Cache::EvictionPolicy policy>
CacheShard<policy>::CacheShard(MemTracker* tracker)
: usage_(0),
mem_tracker_(tracker),
metrics_(nullptr) {
// Make empty circular linked list.
rl_.next = &rl_;
rl_.prev = &rl_;
}
template<Cache::EvictionPolicy policy>
CacheShard<policy>::~CacheShard() {
for (RLHandle* e = rl_.next; e != &rl_; ) {
RLHandle* next = e->next;
DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 1)
<< "caller has an unreleased handle";
if (Unref(e)) {
FreeEntry(e);
}
e = next;
}
mem_tracker_->Consume(deferred_consumption_);
}
template<Cache::EvictionPolicy policy>
bool CacheShard<policy>::Unref(RLHandle* e) {
DCHECK_GT(e->refs.load(std::memory_order_relaxed), 0);
return e->refs.fetch_sub(1) == 1;
}
template<Cache::EvictionPolicy policy>
void CacheShard<policy>::FreeEntry(RLHandle* e) {
DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 0);
if (e->eviction_callback) {
e->eviction_callback->EvictedEntry(e->key(), e->value());
}
UpdateMemTracker(-static_cast<int64_t>(e->charge));
if (PREDICT_TRUE(metrics_)) {
metrics_->cache_usage->DecrementBy(e->charge);
metrics_->evictions->Increment();
}
delete [] e;
}
template<Cache::EvictionPolicy policy>
void CacheShard<policy>::UpdateMemTracker(int64_t delta) {
int64_t old_deferred = deferred_consumption_.fetch_add(delta);
int64_t new_deferred = old_deferred + delta;
if (new_deferred > max_deferred_consumption_ ||
new_deferred < -max_deferred_consumption_) {
int64_t to_propagate = deferred_consumption_.exchange(0, std::memory_order_relaxed);
mem_tracker_->Consume(to_propagate);
}
}
template<Cache::EvictionPolicy policy>
void CacheShard<policy>::UpdateMetricsLookup(bool was_hit, bool caching) {
if (PREDICT_TRUE(metrics_)) {
metrics_->lookups->Increment();
if (was_hit) {
if (caching) {
metrics_->cache_hits_caching->Increment();
} else {
metrics_->cache_hits->Increment();
}
} else {
if (caching) {
metrics_->cache_misses_caching->Increment();
} else {
metrics_->cache_misses->Increment();
}
}
}
}
template<Cache::EvictionPolicy policy>
void CacheShard<policy>::RL_Remove(RLHandle* e) {
e->next->prev = e->prev;
e->prev->next = e->next;
DCHECK_GE(usage_, e->charge);
usage_ -= e->charge;
}
template<Cache::EvictionPolicy policy>
void CacheShard<policy>::RL_Append(RLHandle* e) {
// Make "e" newest entry by inserting just before rl_.
e->next = &rl_;
e->prev = rl_.prev;
e->prev->next = e;
e->next->prev = e;
usage_ += e->charge;
}
template<>
void CacheShard<Cache::EvictionPolicy::FIFO>::RL_UpdateAfterLookup(RLHandle* /* e */) {
}
template<>
void CacheShard<Cache::EvictionPolicy::LRU>::RL_UpdateAfterLookup(RLHandle* e) {
RL_Remove(e);
RL_Append(e);
}
template<Cache::EvictionPolicy policy>
Cache::Handle* CacheShard<policy>::Lookup(const Slice& key,
uint32_t hash,
bool caching) {
RLHandle* e;
{
std::lock_guard<decltype(mutex_)> l(mutex_);
e = table_.Lookup(key, hash);
if (e != nullptr) {
e->refs.fetch_add(1, std::memory_order_relaxed);
RL_UpdateAfterLookup(e);
}
}
// Do the metrics outside of the lock.
UpdateMetricsLookup(e != nullptr, caching);
return reinterpret_cast<Cache::Handle*>(e);
}
template<Cache::EvictionPolicy policy>
void CacheShard<policy>::Release(Cache::Handle* handle) {
RLHandle* e = reinterpret_cast<RLHandle*>(handle);
bool last_reference = Unref(e);
if (last_reference) {
FreeEntry(e);
}
}
template<Cache::EvictionPolicy policy>
Cache::Handle* CacheShard<policy>::Insert(
RLHandle* handle,
Cache::EvictionCallback* eviction_callback) {
// Set the remaining RLHandle members which were not already populated during
// Allocate().
handle->eviction_callback = eviction_callback;
// Two refs for the handle: one from CacheShard, one for the returned handle.
handle->refs.store(2, std::memory_order_relaxed);
UpdateMemTracker(handle->charge);
if (PREDICT_TRUE(metrics_)) {
metrics_->cache_usage->IncrementBy(handle->charge);
metrics_->inserts->Increment();
}
RLHandle* to_remove_head = nullptr;
{
std::lock_guard<decltype(mutex_)> l(mutex_);
RL_Append(handle);
RLHandle* old = table_.Insert(handle);
if (old != nullptr) {
RL_Remove(old);
if (Unref(old)) {
old->next = to_remove_head;
to_remove_head = old;
}
}
while (usage_ > capacity_ && rl_.next != &rl_) {
RLHandle* old = rl_.next;
RL_Remove(old);
table_.Remove(old->key(), old->hash);
if (Unref(old)) {
old->next = to_remove_head;
to_remove_head = old;
}
}
}
// Free the evicted entries outside of the mutex for performance reasons.
while (to_remove_head != nullptr) {
RLHandle* next = to_remove_head->next;
FreeEntry(to_remove_head);
to_remove_head = next;
}
return reinterpret_cast<Cache::Handle*>(handle);
}
template<Cache::EvictionPolicy policy>
void CacheShard<policy>::Erase(const Slice& key, uint32_t hash) {
RLHandle* e;
bool last_reference = false;
{
std::lock_guard<decltype(mutex_)> l(mutex_);
e = table_.Remove(key, hash);
if (e != nullptr) {
RL_Remove(e);
last_reference = Unref(e);
}
}
// The mutex is not held here.
// Note: 'last_reference' can only be true if e != nullptr.
if (last_reference) {
FreeEntry(e);
}
}
template<Cache::EvictionPolicy policy>
size_t CacheShard<policy>::Invalidate(const Cache::InvalidationControl& ctl) {
size_t invalid_entry_count = 0;
size_t valid_entry_count = 0;
RLHandle* to_remove_head = nullptr;
{
std::lock_guard<decltype(mutex_)> l(mutex_);
// rl_.next is the oldest (a.k.a. least relevant) entry in the recency list.
RLHandle* h = rl_.next;
while (h != nullptr && h != &rl_ &&
ctl.iteration_func(valid_entry_count, invalid_entry_count)) {
if (ctl.validity_func(h->key(), h->value())) {
// Continue iterating over the list.
h = h->next;
++valid_entry_count;
continue;
}
// Copy the handle slated for removal.
RLHandle* h_to_remove = h;
// Prepare for next iteration of the cycle.
h = h->next;
RL_Remove(h_to_remove);
table_.Remove(h_to_remove->key(), h_to_remove->hash);
if (Unref(h_to_remove)) {
h_to_remove->next = to_remove_head;
to_remove_head = h_to_remove;
}
++invalid_entry_count;
}
}
// Once removed from the lookup table and the recency list, the entries
// with no references left must be deallocated because Cache::Release()
// won't be called for them from elsewhere.
while (to_remove_head != nullptr) {
RLHandle* next = to_remove_head->next;
FreeEntry(to_remove_head);
to_remove_head = next;
}
return invalid_entry_count;
}
// Determine the number of bits of the hash that should be used to determine
// the cache shard. This, in turn, determines the number of shards.
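// For example, on a 16-core machine this yields Bits::Log2Ceiling(16) == 4
// shard bits, i.e. 16 shards; with --cache_force_single_shard set, the cache
// collapses to a single shard (0 bits).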
int DetermineShardBits() {
int bits = PREDICT_FALSE(FLAGS_cache_force_single_shard) ?
0 : Bits::Log2Ceiling(base::NumCPUs());
VLOG(1) << "Will use " << (1 << bits) << " shards for recency list cache.";
return bits;
}
template<Cache::EvictionPolicy policy>
class ShardedCache : public Cache {
public:
explicit ShardedCache(size_t capacity, const string& id)
: shard_bits_(DetermineShardBits()) {
// A cache is often a singleton, so:
// 1. We reuse its MemTracker if one already exists, and
// 2. It is directly parented to the root MemTracker.
mem_tracker_ = MemTracker::FindOrCreateGlobalTracker(
-1, strings::Substitute("$0-sharded_$1_cache", id, ToString(policy)));
int num_shards = 1 << shard_bits_;
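// The per-shard capacity rounds up: e.g., a capacity of 100 split across
// 16 shards gives each shard a capacity of 7, so the total capacity across
// all shards may slightly exceed the requested value.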
const size_t per_shard = (capacity + (num_shards - 1)) / num_shards;
for (int s = 0; s < num_shards; s++) {
unique_ptr<CacheShard<policy>> shard(
new CacheShard<policy>(mem_tracker_.get()));
shard->SetCapacity(per_shard);
shards_.push_back(shard.release());
}
}
virtual ~ShardedCache() {
STLDeleteElements(&shards_);
}
void SetMetrics(std::unique_ptr<CacheMetrics> metrics,
ExistingMetricsPolicy metrics_policy) override {
std::lock_guard<decltype(metrics_lock_)> l(metrics_lock_);
if (metrics_ && metrics_policy == ExistingMetricsPolicy::kKeep) {
// KUDU-2165: reuse of the Cache singleton across multiple InternalMiniCluster
// servers causes TSAN errors. So, we'll ensure that metrics only get
// attached once, from whichever server starts first. This has the downside
// that, in test builds, we won't get accurate cache metrics, but that's
// probably better than spurious failures.
CHECK(IsGTest()) << "Metrics should only be set once per Cache";
return;
}
metrics_ = std::move(metrics);
for (auto* cache : shards_) {
cache->SetMetrics(metrics_.get());
}
}
UniqueHandle Lookup(const Slice& key, CacheBehavior caching) override {
const uint32_t hash = HashSlice(key);
return UniqueHandle(
shards_[Shard(hash)]->Lookup(key, hash, caching == EXPECT_IN_CACHE),
Cache::HandleDeleter(this));
}
void Erase(const Slice& key) override {
const uint32_t hash = HashSlice(key);
shards_[Shard(hash)]->Erase(key, hash);
}
Slice Value(const UniqueHandle& handle) const override {
return reinterpret_cast<const RLHandle*>(handle.get())->value();
}
UniqueHandle Insert(UniquePendingHandle handle,
Cache::EvictionCallback* eviction_callback) override {
RLHandle* h = reinterpret_cast<RLHandle*>(DCHECK_NOTNULL(handle.release()));
return UniqueHandle(
shards_[Shard(h->hash)]->Insert(h, eviction_callback),
Cache::HandleDeleter(this));
}
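// Note on the allocation in Allocate() below: the handle header and its
// key/value bytes live in a single contiguous buffer. For instance, an
// 11-byte key with a 100-byte value on a 64-bit build requires
// sizeof(RLHandle) + 16 + 100 - 1 bytes: the key is padded up to the next
// pointer-size boundary and the kv_data[1] placeholder already contributes
// one byte.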
UniquePendingHandle Allocate(Slice key, int val_len, int charge) override {
int key_len = key.size();
DCHECK_GE(key_len, 0);
DCHECK_GE(val_len, 0);
int key_len_padded = KUDU_ALIGN_UP(key_len, sizeof(void*));
UniquePendingHandle h(reinterpret_cast<PendingHandle*>(
new uint8_t[sizeof(RLHandle)
+ key_len_padded + val_len // the kv_data VLA data
- 1 // (the VLA has a 1-byte placeholder)
]),
PendingHandleDeleter(this));
RLHandle* handle = reinterpret_cast<RLHandle*>(h.get());
handle->key_length = key_len;
handle->val_length = val_len;
// TODO(KUDU-1091): account for the footprint of structures used by Cache's
// internal housekeeping (RL handles, etc.) in case of
// non-automatic charge.
handle->charge = (charge == kAutomaticCharge) ? kudu_malloc_usable_size(h.get())
: charge;
handle->hash = HashSlice(key);
memcpy(handle->kv_data, key.data(), key_len);
return h;
}
uint8_t* MutableValue(UniquePendingHandle* handle) override {
return reinterpret_cast<RLHandle*>(handle->get())->mutable_val_ptr();
}
size_t Invalidate(const InvalidationControl& ctl) override {
size_t invalidated_count = 0;
for (auto& shard: shards_) {
invalidated_count += shard->Invalidate(ctl);
}
return invalidated_count;
}
protected:
void Release(Handle* handle) override {
RLHandle* h = reinterpret_cast<RLHandle*>(handle);
shards_[Shard(h->hash)]->Release(handle);
}
void Free(PendingHandle* h) override {
uint8_t* data = reinterpret_cast<uint8_t*>(h);
delete [] data;
}
private:
static inline uint32_t HashSlice(const Slice& s) {
return util_hash::CityHash64(
reinterpret_cast<const char *>(s.data()), s.size());
}
uint32_t Shard(uint32_t hash) {
// Widen to uint64 before shifting, or else on a single CPU,
// we would try to shift a uint32_t by 32 bits, which is undefined.
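// For instance, with shard_bits_ == 4 the four most significant bits of the
// hash select one of 16 shards (equivalent to hash >> 28); with
// shard_bits_ == 0 the 64-bit shift yields 0, so every key maps to shard 0.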
return static_cast<uint64_t>(hash) >> (32 - shard_bits_);
}
shared_ptr<MemTracker> mem_tracker_;
unique_ptr<CacheMetrics> metrics_;
vector<CacheShard<policy>*> shards_;
// Number of bits of hash used to determine the shard.
const int shard_bits_;
// Protects 'metrics_'. Used only when metrics are set, to ensure
// that they are set only once in test environments.
simple_spinlock metrics_lock_;
};
} // end anonymous namespace
template<>
Cache* NewCache<Cache::EvictionPolicy::FIFO,
Cache::MemoryType::DRAM>(size_t capacity, const std::string& id) {
return new ShardedCache<Cache::EvictionPolicy::FIFO>(capacity, id);
}
template<>
Cache* NewCache<Cache::EvictionPolicy::LRU,
Cache::MemoryType::DRAM>(size_t capacity, const std::string& id) {
return new ShardedCache<Cache::EvictionPolicy::LRU>(capacity, id);
}
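// Example instantiation (a sketch; the capacity and id are arbitrary):
//
//   std::unique_ptr<Cache> cache(
//       NewCache<Cache::EvictionPolicy::LRU, Cache::MemoryType::DRAM>(
//           64 * 1024 * 1024, "example_cache"));
//
// The resulting cache is sharded roughly per CPU core and accounts for its
// memory under a MemTracker named "example_cache-sharded_lru_cache".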
std::ostream& operator<<(std::ostream& os, Cache::MemoryType mem_type) {
switch (mem_type) {
case Cache::MemoryType::DRAM:
os << "DRAM";
break;
case Cache::MemoryType::NVM:
os << "NVM";
break;
default:
os << "unknown (" << static_cast<int>(mem_type) << ")";
break;
}
return os;
}
} // namespace kudu