blob: bb69ca27b6bbc8fc0c2e2f5b619efed91aca219b [file]
// Some portions copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "util/cache/cache.h"

#include <cstring>
#include <limits>
#include <memory>
#include <string>
#include <vector>

#include "common/status.h"
#include "kudu/gutil/bits.h"
#include "kudu/gutil/hash/city.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/sysinfo.h"
#include "kudu/util/alignment.h"
#include "kudu/util/malloc.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/slice.h"

using kudu::Slice;
using std::numeric_limits;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
namespace impala {
// Base class for handles for all algorithms. Sharing the base class allows for a shared
// HashTable implementation and a shared sharding implementation. HandleBase is designed
// to allow the entire handle to be stored in a single allocation, even though the key
// and value can be variable-sized. It does this by taking in a pointer to the key/value
// pair in the constructor. For a contiguous layout, the key/value pair would be stored
// immediately following the HandleBase (or subclass) and the caller of the constructor
// would pass in a pointer to this contiguous data.
class HandleBase {
 public:
  // Create a HandleBase. See Cache::Allocate() for information on the 'key', 'val_len',
  // and 'charge' parameters. The remaining parameters are:
  //  - kv_ptr: points to memory for the variable sized key/value pair. It must be
  //    sized to fit the key and value with appropriate padding. See the description
  //    of kv_data_ptr_ for the memory layout and sizing.
  //  - hash: hash of the key.
  //
  // NOTE: 'hash' is uint32_t (previously int32_t) to match hash_, hash(), and every
  // caller in this file, avoiding an implementation-defined signed/unsigned round
  // trip for hash values with the high bit set. The old DCHECK_GE(charge, 0) was
  // removed because 'charge' is unsigned, making the check a tautology.
  HandleBase(uint8_t* kv_ptr, const Slice& key, uint32_t hash, int val_len,
      size_t charge)
    : next_handle_(nullptr),
      hash_(hash),
      key_length_(key.size()),
      val_length_(val_len),
      charge_(charge),
      kv_data_ptr_(kv_ptr) {
    // Copy the key bytes to the front of the key/value buffer (if one was provided).
    if (kv_data_ptr_ != nullptr && key_length_ > 0) {
      memcpy(kv_data_ptr_, key.data(), key_length_);
    }
  }

  ~HandleBase() {
    // A handle must be unlinked from its hash-table bucket before destruction.
    DCHECK(next_handle_ == nullptr);
  }

  // Returns a view of the key stored at the front of the key/value buffer.
  Slice key() const {
    return Slice(kv_data_ptr_, key_length_);
  }

  uint32_t hash() const { return hash_; }

  // Returns a pointer to the value portion of the key/value buffer. The value starts
  // at the first pointer-aligned offset past the key (see kv_data_ptr_).
  uint8_t* mutable_val_ptr() {
    int val_offset = KUDU_ALIGN_UP(key_length_, sizeof(void*));
    return &kv_data_ptr_[val_offset];
  }
  const uint8_t* val_ptr() const {
    return const_cast<HandleBase*>(this)->mutable_val_ptr();
  }
  Slice value() const {
    return Slice(val_ptr(), val_length_);
  }

  size_t charge() const { return charge_; }
  void set_charge(size_t charge) {
    charge_ = charge;
  }

 private:
  friend class HandleTable;
  // Next entry in the hash-table bucket's singly-linked list (see HandleTable).
  HandleBase* next_handle_;
  uint32_t hash_;
  uint32_t key_length_;
  uint32_t val_length_;
  size_t charge_;
  // kv_data_ptr_ is a pointer to the beginning of the key/value pair. The key/value
  // pair is stored as a byte array with the format:
  // [key bytes ...] [padding up to 8-byte boundary] [value bytes ...]
  uint8_t* kv_data_ptr_;
};
// We provide our own simple hash table since it removes a whole bunch
// of porting hacks and is also faster than some of the built-in hash
// table implementations in some of the compiler/runtime combinations
// we have tested. E.g., readrandom speeds up by ~5% over the g++
// 4.4.3's builtin hashtable.
class HandleTable {
public:
HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); }
~HandleTable() { delete[] list_; }
HandleBase* Lookup(const Slice& key, uint32_t hash) {
return *FindPointer(key, hash);
}
HandleBase* Insert(HandleBase* h) {
HandleBase** ptr = FindPointer(h->key(), h->hash());
HandleBase* old = *ptr;
h->next_handle_ = (old == nullptr ? nullptr : old->next_handle_);
*ptr = h;
if (old == nullptr) {
++elems_;
if (elems_ > length_) {
// Since each cache entry is fairly large, we aim for a small
// average linked list length (<= 1).
Resize();
}
} else {
old->next_handle_ = nullptr;
}
return old;
}
HandleBase* Remove(const Slice& key, uint32_t hash) {
HandleBase** ptr = FindPointer(key, hash);
HandleBase* result = *ptr;
if (result != nullptr) {
*ptr = result->next_handle_;
result->next_handle_ = nullptr;
--elems_;
}
return result;
}
private:
// The table consists of an array of buckets where each bucket is
// a linked list of cache entries that hash into the bucket.
uint32_t length_;
uint32_t elems_;
HandleBase** list_;
// Return a pointer to slot that points to a cache entry that
// matches key/hash. If there is no such cache entry, return a
// pointer to the trailing slot in the corresponding linked list.
HandleBase** FindPointer(const Slice& key, uint32_t hash) {
HandleBase** ptr = &list_[hash & (length_ - 1)];
while (*ptr != nullptr &&
((*ptr)->hash() != hash || key != (*ptr)->key())) {
ptr = &(*ptr)->next_handle_;
}
return ptr;
}
void Resize() {
uint32_t new_length = 16;
while (new_length < elems_ * 1.5) {
new_length *= 2;
}
auto new_list = new HandleBase*[new_length];
memset(new_list, 0, sizeof(new_list[0]) * new_length);
uint32_t count = 0;
for (uint32_t i = 0; i < length_; ++i) {
HandleBase* h = list_[i];
while (h != nullptr) {
HandleBase* next = h->next_handle_;
uint32_t hash = h->hash();
HandleBase** ptr = &new_list[hash & (new_length - 1)];
h->next_handle_ = *ptr;
*ptr = h;
h = next;
++count;
}
}
DCHECK_EQ(elems_, count);
delete[] list_;
list_ = new_list;
length_ = new_length;
}
};
// Common interface implemented by every eviction algorithm, which lets the algorithms
// share the sharding implementation above them. It differs from the public Cache
// interface in a few ways:
// 1. Methods take HandleBase arguments rather than the handle types the Cache exposes
//    externally.
// 2. There is no separate Handle vs PendingHandle distinction: Allocate() and Free()
//    traffic in HandleBase like everything else. No current implementation needs the
//    split, and since this is internal code the distinction can be reintroduced later
//    if it becomes useful.
// 3. The sharding layer has already computed the key hash, so the hash is passed
//    through to every method where it makes sense.
class CacheShard {
 public:
  virtual ~CacheShard() = default;
  // Initialization function. Must be called before any other function.
  virtual Status Init() = 0;
  // Allocates a handle for a key/value pair; see Cache::Allocate() for the meaning of
  // the parameters. 'hash' is the precomputed hash of 'key'.
  virtual HandleBase* Allocate(Slice key, uint32_t hash, int val_len, size_t charge) = 0;
  // Frees a handle obtained from Allocate() that was never inserted.
  virtual void Free(HandleBase* handle) = 0;
  // Inserts 'handle' into the shard; the Cache::Insert() analog.
  virtual HandleBase* Insert(HandleBase* handle,
      Cache::EvictionCallback* eviction_callback) = 0;
  // Looks up the entry for 'key'/'hash'. 'no_updates' requests that the lookup not
  // update the algorithm's internal bookkeeping (see Cache::LookupBehavior).
  virtual HandleBase* Lookup(const Slice& key, uint32_t hash, bool no_updates) = 0;
  // Updates the charge associated with 'handle'; the Cache::UpdateCharge() analog.
  virtual void UpdateCharge(HandleBase* handle, size_t new_charge) = 0;
  // Releases a handle returned by Insert()/Lookup().
  virtual void Release(HandleBase* handle) = 0;
  // Erases the entry for 'key'/'hash', if any.
  virtual void Erase(const Slice& key, uint32_t hash) = 0;
  // Invalidates entries per 'ctl'; returns the number invalidated.
  virtual size_t Invalidate(const Cache::InvalidationControl& ctl) = 0;
  // Returns handles for the shard's entries; the Cache::Dump() analog.
  virtual vector<HandleBase*> Dump() = 0;
};
// Function to build a cache shard using the given eviction algorithm.
// This untemplatized function allows ShardedCache to avoid templating and
// keeps the templates internal to the cache. 'mem_tracker' is passed through to the
// shard; 'capacity' is the capacity of this single shard (not the whole cache).
CacheShard* NewCacheShard(Cache::EvictionPolicy eviction_policy,
kudu::MemTracker* mem_tracker, size_t capacity);
// Internal templatized function to build a cache shard for the given eviction
// algorithm. The template argument allows the different eviction policy implementations
// to be defined in different files (e.g. LRU in rl-cache.cc and LIRS in lirs-cache.cc).
template <Cache::EvictionPolicy eviction_policy>
CacheShard* NewCacheShardInt(kudu::MemTracker* mem_tracker, size_t capacity);
// Determine the number of bits of the hash that should be used to determine
// the cache shard. This, in turn, determines the number of shards (2^bits, see
// ShardedCache).
int DetermineShardBits();
// Helper function to provide a string representation of an eviction policy.
string ToString(Cache::EvictionPolicy p);
// This is a minimal sharding cache implementation. It passes almost all functions
// through to the underlying CacheShard unless the function can be answered by the
// HandleBase itself. The number of shards is currently a power of 2 so that it can
// do a right shift to get the shard index.
class ShardedCache : public Cache {
 public:
  // Builds a cache with eviction algorithm 'policy' and total 'capacity' (split
  // evenly across the shards, rounding up). 'id' is used to name the MemTracker.
  explicit ShardedCache(Cache::EvictionPolicy policy, size_t capacity,
      const std::string& id)
    : shard_bits_(DetermineShardBits()) {
    // A cache is often a singleton, so:
    // 1. We reuse its MemTracker if one already exists, and
    // 2. It is directly parented to the root MemTracker.
    // TODO: Change this to an Impala MemTracker, and only track the memory usage for the
    // metadata. This currently does not hook up to Impala's MemTracker hierarchy
    mem_tracker_ = kudu::MemTracker::FindOrCreateGlobalTracker(
        -1, strings::Substitute("$0-sharded_$1_cache", id, ToString(policy)));
    const int num_shards = 1 << shard_bits_;
    // Round up so the per-shard capacities sum to at least 'capacity'.
    const size_t per_shard = (capacity + (num_shards - 1)) / num_shards;
    for (int s = 0; s < num_shards; ++s) {
      shards_.push_back(NewCacheShard(policy, mem_tracker_.get(), per_shard));
    }
    // No single entry may be charged more than a shard can hold.
    // NOTE: this previously narrowed 'per_shard' through static_cast<int>, which
    // would overflow for per-shard capacities larger than INT_MAX; both sides are
    // size_t, so assign directly.
    if (per_shard < shard_charge_limit_) {
      shard_charge_limit_ = per_shard;
    }
  }

  virtual ~ShardedCache() {
    STLDeleteElements(&shards_);
  }

  // Initializes every shard. Must be called before any other method.
  Status Init() override {
    for (CacheShard* shard : shards_) {
      RETURN_IF_ERROR(shard->Init());
    }
    return Status::OK();
  }

  UniqueHandle Lookup(const Slice& key, LookupBehavior behavior) override {
    const uint32_t hash = HashSlice(key);
    HandleBase* h = shards_[Shard(hash)]->Lookup(key, hash, behavior == NO_UPDATE);
    return UniqueHandle(reinterpret_cast<Cache::Handle*>(h), Cache::HandleDeleter(this));
  }

  size_t Charge(const UniqueHandle& handle) override {
    return reinterpret_cast<HandleBase*>(handle.get())->charge();
  }

  void UpdateCharge(const UniqueHandle& handle, size_t charge) override {
    HandleBase* h = reinterpret_cast<HandleBase*>(handle.get());
    shards_[Shard(h->hash())]->UpdateCharge(h, charge);
    // NOTE: the handle could point to an entry that has been evicted, so there is no
    // guarantee that the charge will actually change. That's why we don't assert that
    // h->charge() == charge here.
  }

  void Erase(const Slice& key) override {
    const uint32_t hash = HashSlice(key);
    shards_[Shard(hash)]->Erase(key, hash);
  }

  Slice Key(const UniqueHandle& handle) const override {
    return reinterpret_cast<HandleBase*>(handle.get())->key();
  }

  Slice Value(const UniqueHandle& handle) const override {
    return reinterpret_cast<HandleBase*>(handle.get())->value();
  }

  UniqueHandle Insert(UniquePendingHandle handle,
      Cache::EvictionCallback* eviction_callback) override {
    // Ownership of the pending handle transfers to the shard.
    HandleBase* h_in = reinterpret_cast<HandleBase*>(DCHECK_NOTNULL(handle.release()));
    HandleBase* h_out = shards_[Shard(h_in->hash())]->Insert(h_in, eviction_callback);
    return UniqueHandle(reinterpret_cast<Cache::Handle*>(h_out),
        Cache::HandleDeleter(this));
  }

  // Maximum charge a single entry may carry (bounded by the per-shard capacity).
  size_t MaxCharge() const override {
    return shard_charge_limit_;
  }

  UniquePendingHandle Allocate(Slice key, int val_len, size_t charge) override {
    const uint32_t hash = HashSlice(key);
    HandleBase* handle = shards_[Shard(hash)]->Allocate(key, hash, val_len, charge);
    UniquePendingHandle h(reinterpret_cast<PendingHandle*>(handle),
        PendingHandleDeleter(this));
    return h;
  }

  uint8_t* MutableValue(UniquePendingHandle* handle) const override {
    return reinterpret_cast<HandleBase*>(handle->get())->mutable_val_ptr();
  }

  size_t Invalidate(const InvalidationControl& ctl) override {
    size_t invalidated_count = 0;
    for (auto& shard : shards_) {
      invalidated_count += shard->Invalidate(ctl);
    }
    return invalidated_count;
  }

  vector<UniqueHandle> Dump() override {
    vector<UniqueHandle> handles;
    for (auto& shard : shards_) {
      for (HandleBase* handle : shard->Dump()) {
        handles.emplace_back(
            reinterpret_cast<Cache::Handle*>(handle), Cache::HandleDeleter(this));
      }
    }
    return handles;
  }

 protected:
  void Release(Handle* handle) override {
    HandleBase* h = reinterpret_cast<HandleBase*>(handle);
    shards_[Shard(h->hash())]->Release(h);
  }

  void Free(PendingHandle* pending_handle) override {
    HandleBase* h = reinterpret_cast<HandleBase*>(pending_handle);
    shards_[Shard(h->hash())]->Free(h);
  }

 private:
  shared_ptr<kudu::MemTracker> mem_tracker_;
  vector<CacheShard*> shards_;
  // Number of bits of hash used to determine the shard.
  const int shard_bits_;
  // Max charge that can be stored in a shard.
  size_t shard_charge_limit_ = std::numeric_limits<size_t>::max();

  // Hashes the key. The cast makes the truncation of CityHash64's 64-bit result to
  // 32 bits explicit (only the low 32 bits are kept, as before).
  static inline uint32_t HashSlice(const Slice& s) {
    return static_cast<uint32_t>(
        util_hash::CityHash64(reinterpret_cast<const char*>(s.data()), s.size()));
  }

  // Maps a hash to a shard index using the top 'shard_bits_' bits.
  uint32_t Shard(uint32_t hash) const {
    // Widen to uint64 before shifting, or else on a single CPU,
    // we would try to shift a uint32_t by 32 bits, which is undefined.
    return static_cast<uint64_t>(hash) >> (32 - shard_bits_);
  }
};
}