blob: 00f2e52cd6757d1734b06309e6210cdd437c4c1d [file] [log] [blame]
// Some portions copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "kudu/util/cache.h"
#include <atomic>
#include <cstdint>
#include <cstring>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/gutil/bits.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/hash/city.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/sysinfo.h"
#include "kudu/util/alignment.h"
#include "kudu/util/cache_metrics.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/locks.h"
#include "kudu/util/malloc.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
#include "kudu/util/slice.h"
#include "kudu/util/test_util_prod.h"
#if !defined(__APPLE__)
#include "kudu/util/nvm_cache.h"
#endif
// Useful in tests that require accurate cache capacity accounting.
DEFINE_bool(cache_force_single_shard, false,
"Override all cache implementations to use just one shard");
TAG_FLAG(cache_force_single_shard, hidden);
DEFINE_double(cache_memtracker_approximation_ratio, 0.01,
"The MemTracker associated with a cache can accumulate error up to "
"this ratio to improve performance. For tests.");
TAG_FLAG(cache_memtracker_approximation_ratio, hidden);
using std::atomic;
using std::shared_ptr;
using std::string;
using std::vector;
namespace kudu {
Cache::~Cache() {
}
namespace {
typedef simple_spinlock MutexType;
// LRU cache implementation
// An entry is a variable length heap-allocated structure. Entries
// are kept in a circular doubly linked list ordered by access time.
struct LRUHandle {
Cache::EvictionCallback* eviction_callback;
LRUHandle* next_hash;
LRUHandle* next;
LRUHandle* prev;
size_t charge; // TODO(opt): Only allow uint32_t?
uint32_t key_length;
uint32_t val_length;
std::atomic<int32_t> refs;
uint32_t hash; // Hash of key(); used for fast sharding and comparisons
// The storage for the key/value pair itself. The data is stored as:
// [key bytes ...] [padding up to 8-byte boundary] [value bytes ...]
uint8_t kv_data[1]; // Beginning of key/value pair
Slice key() const {
return Slice(kv_data, key_length);
}
uint8_t* mutable_val_ptr() {
int val_offset = KUDU_ALIGN_UP(key_length, sizeof(void*));
return &kv_data[val_offset];
}
const uint8_t* val_ptr() const {
return const_cast<LRUHandle*>(this)->mutable_val_ptr();
}
Slice value() const {
return Slice(val_ptr(), val_length);
}
};
// We provide our own simple hash table since it removes a whole bunch
// of porting hacks and is also faster than some of the built-in hash
// table implementations in some of the compiler/runtime combinations
// we have tested. E.g., readrandom speeds up by ~5% over the g++
// 4.4.3's builtin hashtable.
class HandleTable {
public:
HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); }
~HandleTable() { delete[] list_; }
LRUHandle* Lookup(const Slice& key, uint32_t hash) {
return *FindPointer(key, hash);
}
LRUHandle* Insert(LRUHandle* h) {
LRUHandle** ptr = FindPointer(h->key(), h->hash);
LRUHandle* old = *ptr;
h->next_hash = (old == nullptr ? nullptr : old->next_hash);
*ptr = h;
if (old == nullptr) {
++elems_;
if (elems_ > length_) {
// Since each cache entry is fairly large, we aim for a small
// average linked list length (<= 1).
Resize();
}
}
return old;
}
LRUHandle* Remove(const Slice& key, uint32_t hash) {
LRUHandle** ptr = FindPointer(key, hash);
LRUHandle* result = *ptr;
if (result != nullptr) {
*ptr = result->next_hash;
--elems_;
}
return result;
}
private:
// The table consists of an array of buckets where each bucket is
// a linked list of cache entries that hash into the bucket.
uint32_t length_;
uint32_t elems_;
LRUHandle** list_;
// Return a pointer to slot that points to a cache entry that
// matches key/hash. If there is no such cache entry, return a
// pointer to the trailing slot in the corresponding linked list.
LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
LRUHandle** ptr = &list_[hash & (length_ - 1)];
while (*ptr != nullptr &&
((*ptr)->hash != hash || key != (*ptr)->key())) {
ptr = &(*ptr)->next_hash;
}
return ptr;
}
void Resize() {
uint32_t new_length = 16;
while (new_length < elems_ * 1.5) {
new_length *= 2;
}
auto new_list = new LRUHandle*[new_length];
memset(new_list, 0, sizeof(new_list[0]) * new_length);
uint32_t count = 0;
for (uint32_t i = 0; i < length_; i++) {
LRUHandle* h = list_[i];
while (h != nullptr) {
LRUHandle* next = h->next_hash;
uint32_t hash = h->hash;
LRUHandle** ptr = &new_list[hash & (new_length - 1)];
h->next_hash = *ptr;
*ptr = h;
h = next;
count++;
}
}
DCHECK_EQ(elems_, count);
delete[] list_;
list_ = new_list;
length_ = new_length;
}
};
// A single shard of sharded cache.
class LRUCache {
public:
explicit LRUCache(MemTracker* tracker);
~LRUCache();
// Separate from constructor so caller can easily make an array of LRUCache
void SetCapacity(size_t capacity) {
capacity_ = capacity;
max_deferred_consumption_ = capacity * FLAGS_cache_memtracker_approximation_ratio;
}
void SetMetrics(CacheMetrics* metrics) { metrics_ = metrics; }
Cache::Handle* Insert(LRUHandle* handle, Cache::EvictionCallback* eviction_callback);
// Like Cache::Lookup, but with an extra "hash" parameter.
Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
void Release(Cache::Handle* handle);
void Erase(const Slice& key, uint32_t hash);
private:
void LRU_Remove(LRUHandle* e);
void LRU_Append(LRUHandle* e);
// Just reduce the reference count by 1.
// Return true if last reference
bool Unref(LRUHandle* e);
// Call the user's eviction callback, if it exists, and free the entry.
void FreeEntry(LRUHandle* e);
// Update the memtracker's consumption by the given amount.
//
// This "buffers" the updates locally in 'deferred_consumption_' until the amount
// of accumulated delta is more than ~1% of the cache capacity. This improves
// performance under workloads with high eviction rates for a few reasons:
//
// 1) once the cache reaches its full capacity, we expect it to remain there
// in steady state. Each insertion is usually matched by an eviction, and unless
// the total size of the evicted item(s) is much different than the size of the
// inserted item, each eviction event is unlikely to change the total cache usage
// much. So, we expect that the accumulated error will mostly remain around 0
// and we can avoid propagating changes to the MemTracker at all.
//
// 2) because the cache implementation is sharded, we do this tracking in a bunch
// of different locations, avoiding bouncing cache-lines between cores. By contrast
// the MemTracker is a simple integer, so it doesn't scale as well under concurrency.
//
// Positive delta indicates an increased memory consumption.
void UpdateMemTracker(int64_t delta);
// Initialized before use.
size_t capacity_;
// mutex_ protects the following state.
MutexType mutex_;
size_t usage_;
// Dummy head of LRU list.
// lru.prev is newest entry, lru.next is oldest entry.
LRUHandle lru_;
HandleTable table_;
MemTracker* mem_tracker_;
atomic<int64_t> deferred_consumption_ { 0 };
// Initialized based on capacity_ to ensure an upper bound on the error on the
// MemTracker consumption.
int64_t max_deferred_consumption_;
CacheMetrics* metrics_;
};
LRUCache::LRUCache(MemTracker* tracker)
: usage_(0),
mem_tracker_(tracker),
metrics_(nullptr) {
// Make empty circular linked list
lru_.next = &lru_;
lru_.prev = &lru_;
}
LRUCache::~LRUCache() {
for (LRUHandle* e = lru_.next; e != &lru_; ) {
LRUHandle* next = e->next;
DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 1)
<< "caller has an unreleased handle";
if (Unref(e)) {
FreeEntry(e);
}
e = next;
}
mem_tracker_->Consume(deferred_consumption_);
}
bool LRUCache::Unref(LRUHandle* e) {
DCHECK_GT(e->refs.load(std::memory_order_relaxed), 0);
return e->refs.fetch_sub(1) == 1;
}
void LRUCache::FreeEntry(LRUHandle* e) {
DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 0);
if (e->eviction_callback) {
e->eviction_callback->EvictedEntry(e->key(), e->value());
}
UpdateMemTracker(-static_cast<int64_t>(e->charge));
if (PREDICT_TRUE(metrics_)) {
metrics_->cache_usage->DecrementBy(e->charge);
metrics_->evictions->Increment();
}
delete [] e;
}
void LRUCache::UpdateMemTracker(int64_t delta) {
int64_t old_deferred = deferred_consumption_.fetch_add(delta);
int64_t new_deferred = old_deferred + delta;
if (new_deferred > max_deferred_consumption_ ||
new_deferred < -max_deferred_consumption_) {
int64_t to_propagate = deferred_consumption_.exchange(0, std::memory_order_relaxed);
mem_tracker_->Consume(to_propagate);
}
}
void LRUCache::LRU_Remove(LRUHandle* e) {
e->next->prev = e->prev;
e->prev->next = e->next;
usage_ -= e->charge;
}
void LRUCache::LRU_Append(LRUHandle* e) {
// Make "e" newest entry by inserting just before lru_
e->next = &lru_;
e->prev = lru_.prev;
e->prev->next = e;
e->next->prev = e;
usage_ += e->charge;
}
Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash, bool caching) {
LRUHandle* e;
{
std::lock_guard<MutexType> l(mutex_);
e = table_.Lookup(key, hash);
if (e != nullptr) {
e->refs.fetch_add(1, std::memory_order_relaxed);
LRU_Remove(e);
LRU_Append(e);
}
}
// Do the metrics outside of the lock.
if (metrics_) {
metrics_->lookups->Increment();
bool was_hit = (e != nullptr);
if (was_hit) {
if (caching) {
metrics_->cache_hits_caching->Increment();
} else {
metrics_->cache_hits->Increment();
}
} else {
if (caching) {
metrics_->cache_misses_caching->Increment();
} else {
metrics_->cache_misses->Increment();
}
}
}
return reinterpret_cast<Cache::Handle*>(e);
}
void LRUCache::Release(Cache::Handle* handle) {
LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
bool last_reference = Unref(e);
if (last_reference) {
FreeEntry(e);
}
}
Cache::Handle* LRUCache::Insert(LRUHandle* e, Cache::EvictionCallback *eviction_callback) {
// Set the remaining LRUHandle members which were not already allocated during
// Allocate().
e->eviction_callback = eviction_callback;
e->refs.store(2, std::memory_order_relaxed); // One from LRUCache, one for the returned handle
UpdateMemTracker(e->charge);
if (PREDICT_TRUE(metrics_)) {
metrics_->cache_usage->IncrementBy(e->charge);
metrics_->inserts->Increment();
}
LRUHandle* to_remove_head = nullptr;
{
std::lock_guard<MutexType> l(mutex_);
LRU_Append(e);
LRUHandle* old = table_.Insert(e);
if (old != nullptr) {
LRU_Remove(old);
if (Unref(old)) {
old->next = to_remove_head;
to_remove_head = old;
}
}
while (usage_ > capacity_ && lru_.next != &lru_) {
LRUHandle* old = lru_.next;
LRU_Remove(old);
table_.Remove(old->key(), old->hash);
if (Unref(old)) {
old->next = to_remove_head;
to_remove_head = old;
}
}
}
// we free the entries here outside of mutex for
// performance reasons
while (to_remove_head != nullptr) {
LRUHandle* next = to_remove_head->next;
FreeEntry(to_remove_head);
to_remove_head = next;
}
return reinterpret_cast<Cache::Handle*>(e);
}
void LRUCache::Erase(const Slice& key, uint32_t hash) {
LRUHandle* e;
bool last_reference = false;
{
std::lock_guard<MutexType> l(mutex_);
e = table_.Remove(key, hash);
if (e != nullptr) {
LRU_Remove(e);
last_reference = Unref(e);
}
}
// mutex not held here
// last_reference will only be true if e != NULL
if (last_reference) {
FreeEntry(e);
}
}
// Determine the number of bits of the hash that should be used to determine
// the cache shard. This, in turn, determines the number of shards.
int DetermineShardBits() {
int bits = PREDICT_FALSE(FLAGS_cache_force_single_shard) ?
0 : Bits::Log2Ceiling(base::NumCPUs());
VLOG(1) << "Will use " << (1 << bits) << " shards for LRU cache.";
return bits;
}
class ShardedLRUCache : public Cache {
private:
shared_ptr<MemTracker> mem_tracker_;
gscoped_ptr<CacheMetrics> metrics_;
vector<LRUCache*> shards_;
// Number of bits of hash used to determine the shard.
const int shard_bits_;
// Protects 'metrics_'. Used only when metrics are set, to ensure
// that they are set only once in test environments.
MutexType metrics_lock_;
static inline uint32_t HashSlice(const Slice& s) {
return util_hash::CityHash64(
reinterpret_cast<const char *>(s.data()), s.size());
}
uint32_t Shard(uint32_t hash) {
// Widen to uint64 before shifting, or else on a single CPU,
// we would try to shift a uint32_t by 32 bits, which is undefined.
return static_cast<uint64_t>(hash) >> (32 - shard_bits_);
}
public:
explicit ShardedLRUCache(size_t capacity, const string& id)
: shard_bits_(DetermineShardBits()) {
// A cache is often a singleton, so:
// 1. We reuse its MemTracker if one already exists, and
// 2. It is directly parented to the root MemTracker.
mem_tracker_ = MemTracker::FindOrCreateGlobalTracker(
-1, strings::Substitute("$0-sharded_lru_cache", id));
int num_shards = 1 << shard_bits_;
const size_t per_shard = (capacity + (num_shards - 1)) / num_shards;
for (int s = 0; s < num_shards; s++) {
gscoped_ptr<LRUCache> shard(new LRUCache(mem_tracker_.get()));
shard->SetCapacity(per_shard);
shards_.push_back(shard.release());
}
}
virtual ~ShardedLRUCache() {
STLDeleteElements(&shards_);
}
virtual Handle* Insert(PendingHandle* handle,
Cache::EvictionCallback* eviction_callback) OVERRIDE {
LRUHandle* h = reinterpret_cast<LRUHandle*>(DCHECK_NOTNULL(handle));
return shards_[Shard(h->hash)]->Insert(h, eviction_callback);
}
virtual Handle* Lookup(const Slice& key, CacheBehavior caching) OVERRIDE {
const uint32_t hash = HashSlice(key);
return shards_[Shard(hash)]->Lookup(key, hash, caching == EXPECT_IN_CACHE);
}
virtual void Release(Handle* handle) OVERRIDE {
LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
shards_[Shard(h->hash)]->Release(handle);
}
virtual void Erase(const Slice& key) OVERRIDE {
const uint32_t hash = HashSlice(key);
shards_[Shard(hash)]->Erase(key, hash);
}
virtual Slice Value(Handle* handle) OVERRIDE {
return reinterpret_cast<LRUHandle*>(handle)->value();
}
virtual void SetMetrics(const scoped_refptr<MetricEntity>& entity) OVERRIDE {
// TODO(KUDU-2165): reuse of the Cache singleton across multiple MiniCluster servers
// causes TSAN errors. So, we'll ensure that metrics only get attached once, from
// whichever server starts first. This has the downside that, in test builds, we won't
// get accurate cache metrics, but that's probably better than spurious failures.
std::lock_guard<simple_spinlock> l(metrics_lock_);
if (metrics_) {
CHECK(IsGTest()) << "Metrics should only be set once per Cache singleton";
return;
}
metrics_.reset(new CacheMetrics(entity));
for (LRUCache* cache : shards_) {
cache->SetMetrics(metrics_.get());
}
}
virtual PendingHandle* Allocate(Slice key, int val_len, int charge) OVERRIDE {
int key_len = key.size();
DCHECK_GE(key_len, 0);
DCHECK_GE(val_len, 0);
int key_len_padded = KUDU_ALIGN_UP(key_len, sizeof(void*));
uint8_t* buf = new uint8_t[sizeof(LRUHandle)
+ key_len_padded + val_len // the kv_data VLA data
- 1 // (the VLA has a 1-byte placeholder)
];
LRUHandle* handle = reinterpret_cast<LRUHandle*>(buf);
handle->key_length = key_len;
handle->val_length = val_len;
handle->charge = (charge == kAutomaticCharge) ? kudu_malloc_usable_size(buf) : charge;
handle->hash = HashSlice(key);
memcpy(handle->kv_data, key.data(), key_len);
return reinterpret_cast<PendingHandle*>(handle);
}
virtual void Free(PendingHandle* h) OVERRIDE {
uint8_t* data = reinterpret_cast<uint8_t*>(h);
delete [] data;
}
virtual uint8_t* MutableValue(PendingHandle* h) OVERRIDE {
return reinterpret_cast<LRUHandle*>(h)->mutable_val_ptr();
}
};
} // end anonymous namespace
Cache* NewLRUCache(CacheType type, size_t capacity, const string& id) {
switch (type) {
case DRAM_CACHE:
return new ShardedLRUCache(capacity, id);
#if defined(HAVE_LIB_VMEM)
case NVM_CACHE:
return NewLRUNvmCache(capacity, id);
#endif
default:
LOG(FATAL) << "Unsupported LRU cache type: " << type;
}
}
} // namespace kudu