| // Some portions copyright (c) 2011 The LevelDB Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "util/cache/cache.h" |
| |
| #include <atomic> |
| #include <cstdint> |
| #include <cstring> |
| #include <memory> |
| #include <mutex> |
| #include <ostream> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| |
| #include "common/status.h" |
| #include "gutil/bits.h" |
| #include "gutil/hash/city.h" |
| #include "gutil/macros.h" |
| #include "gutil/mathlimits.h" |
| #include "gutil/port.h" |
| #include "gutil/ref_counted.h" |
| #include "gutil/stl_util.h" |
| #include "gutil/strings/substitute.h" |
| #include "gutil/sysinfo.h" |
| #include "kudu/util/alignment.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/locks.h" |
| #include "kudu/util/malloc.h" |
| #include "kudu/util/mem_tracker.h" |
| #include "kudu/util/slice.h" |
| #include "util/cache/cache-internal.h" |
| |
| DECLARE_double(cache_memtracker_approximation_ratio); |
| |
| using std::atomic; |
| using std::shared_ptr; |
| using std::string; |
| using std::unique_ptr; |
| using std::vector; |
| |
| using strings::Substitute; |
| |
| namespace impala { |
| |
| namespace { |
| |
| // Recency list cache implementations (FIFO, LRU, etc.) |
| |
| // Recency list handle. An entry is a variable length heap-allocated structure. |
| // Entries are kept in a circular doubly linked list ordered by some recency |
| // criterion (e.g., access time for LRU policy, insertion time for FIFO policy). |
| class RLHandle : public HandleBase { |
| public: |
| RLHandle(uint8_t* kv_ptr, const Slice& key, int32_t hash, int val_len, int charge) |
| : HandleBase(kv_ptr, key, hash, val_len, charge) { |
| refs.store(0); |
| next = nullptr; |
| prev = nullptr; |
| eviction_callback = nullptr; |
| } |
| |
| RLHandle() |
| : HandleBase(nullptr, Slice(), 0, 0, 0) {} |
| |
| Cache::EvictionCallback* eviction_callback; |
| RLHandle* next; |
| RLHandle* prev; |
| std::atomic<int32_t> refs; |
| }; |
| |
| // A single shard of a cache that uses a recency list based eviction policy |
| template<Cache::EvictionPolicy policy> |
| class RLCacheShard : public CacheShard { |
| static_assert( |
| policy == Cache::EvictionPolicy::LRU || policy == Cache::EvictionPolicy::FIFO, |
| "RLCacheShard only supports LRU or FIFO"); |
| |
| public: |
| explicit RLCacheShard(kudu::MemTracker* tracker, size_t capacity); |
| ~RLCacheShard(); |
| |
| Status Init() override; |
| HandleBase* Allocate(Slice key, uint32_t hash, int val_len, int charge) override; |
| void Free(HandleBase* handle) override; |
| HandleBase* Insert(HandleBase* handle, |
| Cache::EvictionCallback* eviction_callback) override; |
| HandleBase* Lookup(const Slice& key, uint32_t hash, bool caching) override; |
| void Release(HandleBase* handle) override; |
| void Erase(const Slice& key, uint32_t hash) override; |
| size_t Invalidate(const Cache::InvalidationControl& ctl) override; |
| |
| private: |
| void RL_Remove(RLHandle* e); |
| void RL_Append(RLHandle* e); |
| // Update the recency list after a lookup operation. |
| void RL_UpdateAfterLookup(RLHandle* e); |
| // Just reduce the reference count by 1. |
| // Return true if last reference |
| bool Unref(RLHandle* e); |
| // Call the user's eviction callback, if it exists, and free the entry. |
| void FreeEntry(RLHandle* e); |
| |
| |
| // Update the memtracker's consumption by the given amount. |
| // |
| // This "buffers" the updates locally in 'deferred_consumption_' until the amount |
| // of accumulated delta is more than ~1% of the cache capacity. This improves |
| // performance under workloads with high eviction rates for a few reasons: |
| // |
| // 1) once the cache reaches its full capacity, we expect it to remain there |
| // in steady state. Each insertion is usually matched by an eviction, and unless |
| // the total size of the evicted item(s) is much different than the size of the |
| // inserted item, each eviction event is unlikely to change the total cache usage |
| // much. So, we expect that the accumulated error will mostly remain around 0 |
| // and we can avoid propagating changes to the MemTracker at all. |
| // |
| // 2) because the cache implementation is sharded, we do this tracking in a bunch |
| // of different locations, avoiding bouncing cache-lines between cores. By contrast |
| // the MemTracker is a simple integer, so it doesn't scale as well under concurrency. |
| // |
| // Positive delta indicates an increased memory consumption. |
| void UpdateMemTracker(int64_t delta); |
| |
| bool initialized_ = false; |
| |
| size_t capacity_; |
| |
| // mutex_ protects the following state. |
| kudu::simple_spinlock mutex_; |
| size_t usage_; |
| |
| // Dummy head of recency list. |
| // rl.prev is newest entry, rl.next is oldest entry. |
| RLHandle rl_; |
| |
| HandleTable table_; |
| |
| kudu::MemTracker* mem_tracker_; |
| atomic<int64_t> deferred_consumption_ { 0 }; |
| |
| // Initialized based on capacity_ to ensure an upper bound on the error on the |
| // MemTracker consumption. |
| int64_t max_deferred_consumption_; |
| }; |
| |
| template<Cache::EvictionPolicy policy> |
| RLCacheShard<policy>::RLCacheShard(kudu::MemTracker* tracker, size_t capacity) |
| : capacity_(capacity), |
| usage_(0), |
| mem_tracker_(tracker) { |
| // Make empty circular linked list. |
| rl_.next = &rl_; |
| rl_.prev = &rl_; |
| } |
| |
| template<Cache::EvictionPolicy policy> |
| RLCacheShard<policy>::~RLCacheShard() { |
| for (RLHandle* e = rl_.next; e != &rl_; ) { |
| RLHandle* next = e->next; |
| DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 1) |
| << "caller has an unreleased handle"; |
| table_.Remove(e->key(), e->hash()); |
| if (Unref(e)) { |
| FreeEntry(e); |
| } |
| e = next; |
| } |
| mem_tracker_->Consume(deferred_consumption_); |
| } |
| |
| template<Cache::EvictionPolicy policy> |
| Status RLCacheShard<policy>::Init() { |
| if (!MathLimits<double>::IsFinite(FLAGS_cache_memtracker_approximation_ratio) || |
| FLAGS_cache_memtracker_approximation_ratio < 0.0 || |
| FLAGS_cache_memtracker_approximation_ratio > 1.0) { |
| return Status(Substitute("Misconfigured --cache_memtracker_approximation_ratio: $0. " |
| "Must be between 0 and 1.", FLAGS_cache_memtracker_approximation_ratio)); |
| } |
| max_deferred_consumption_ = capacity_ * FLAGS_cache_memtracker_approximation_ratio; |
| initialized_ = true; |
| return Status::OK(); |
| } |
| |
| template<Cache::EvictionPolicy policy> |
| bool RLCacheShard<policy>::Unref(RLHandle* e) { |
| DCHECK_GT(e->refs.load(std::memory_order_relaxed), 0); |
| return e->refs.fetch_sub(1) == 1; |
| } |
| |
| template<Cache::EvictionPolicy policy> |
| void RLCacheShard<policy>::FreeEntry(RLHandle* e) { |
| DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 0); |
| if (e->eviction_callback) { |
| e->eviction_callback->EvictedEntry(e->key(), e->value()); |
| } |
| UpdateMemTracker(-static_cast<int64_t>(e->charge())); |
| Free(e); |
| } |
| |
| template<Cache::EvictionPolicy policy> |
| void RLCacheShard<policy>::UpdateMemTracker(int64_t delta) { |
| int64_t old_deferred = deferred_consumption_.fetch_add(delta); |
| int64_t new_deferred = old_deferred + delta; |
| |
| if (new_deferred > max_deferred_consumption_ || |
| new_deferred < -max_deferred_consumption_) { |
| int64_t to_propagate = deferred_consumption_.exchange(0, std::memory_order_relaxed); |
| mem_tracker_->Consume(to_propagate); |
| } |
| } |
| |
| template<Cache::EvictionPolicy policy> |
| void RLCacheShard<policy>::RL_Remove(RLHandle* e) { |
| e->next->prev = e->prev; |
| e->prev->next = e->next; |
| DCHECK_GE(usage_, e->charge()); |
| usage_ -= e->charge(); |
| } |
| |
| template<Cache::EvictionPolicy policy> |
| void RLCacheShard<policy>::RL_Append(RLHandle* e) { |
| // Make "e" newest entry by inserting just before rl_. |
| e->next = &rl_; |
| e->prev = rl_.prev; |
| e->prev->next = e; |
| e->next->prev = e; |
| usage_ += e->charge(); |
| } |
| |
| template<> |
| void RLCacheShard<Cache::EvictionPolicy::FIFO>::RL_UpdateAfterLookup(RLHandle* /* e */) { |
| } |
| |
| template<> |
| void RLCacheShard<Cache::EvictionPolicy::LRU>::RL_UpdateAfterLookup(RLHandle* e) { |
| RL_Remove(e); |
| RL_Append(e); |
| } |
| |
| template<Cache::EvictionPolicy policy> |
| HandleBase* RLCacheShard<policy>::Allocate(Slice key, uint32_t hash, int val_len, |
| int charge) { |
| DCHECK(initialized_); |
| int key_len = key.size(); |
| DCHECK_GE(key_len, 0); |
| DCHECK_GE(val_len, 0); |
| int key_len_padded = KUDU_ALIGN_UP(key_len, sizeof(void*)); |
| uint8_t* buf = new uint8_t[sizeof(RLHandle) |
| + key_len_padded + val_len]; // the kv_data VLA data |
| // TODO(KUDU-1091): account for the footprint of structures used by Cache's |
| // internal housekeeping (RL handles, etc.) in case of |
| // non-automatic charge. |
| int calc_charge = |
| (charge == Cache::kAutomaticCharge) ? kudu::kudu_malloc_usable_size(buf) : charge; |
| uint8_t* kv_ptr = buf + sizeof(RLHandle); |
| RLHandle* handle = new (buf) RLHandle(kv_ptr, key, hash, val_len, |
| calc_charge); |
| return handle; |
| } |
| |
| template<Cache::EvictionPolicy policy> |
| void RLCacheShard<policy>::Free(HandleBase* handle) { |
| DCHECK(initialized_); |
| // We allocate the handle as a uint8_t array, then we call a placement new, |
| // which calls the constructor. For symmetry, we call the destructor and then |
| // delete on the uint8_t array. |
| RLHandle* h = static_cast<RLHandle*>(handle); |
| h->~RLHandle(); |
| uint8_t* data = reinterpret_cast<uint8_t*>(handle); |
| delete [] data; |
| } |
| |
| template<Cache::EvictionPolicy policy> |
| HandleBase* RLCacheShard<policy>::Lookup(const Slice& key, |
| uint32_t hash, |
| bool no_updates) { |
| DCHECK(initialized_); |
| RLHandle* e; |
| { |
| std::lock_guard<decltype(mutex_)> l(mutex_); |
| e = static_cast<RLHandle*>(table_.Lookup(key, hash)); |
| if (e != nullptr) { |
| e->refs.fetch_add(1, std::memory_order_relaxed); |
| // If this is a no update lookup, skip the modifications. |
| if (!no_updates) RL_UpdateAfterLookup(e); |
| } |
| } |
| |
| return e; |
| } |
| |
| template<Cache::EvictionPolicy policy> |
| void RLCacheShard<policy>::Release(HandleBase* handle) { |
| DCHECK(initialized_); |
| RLHandle* e = static_cast<RLHandle*>(handle); |
| bool last_reference = Unref(e); |
| if (last_reference) { |
| FreeEntry(e); |
| } |
| } |
| |
| template<Cache::EvictionPolicy policy> |
| HandleBase* RLCacheShard<policy>::Insert( |
| HandleBase* handle_in, |
| Cache::EvictionCallback* eviction_callback) { |
| DCHECK(initialized_); |
| RLHandle* handle = static_cast<RLHandle*>(handle_in); |
| // Set the remaining RLHandle members which were not already allocated during |
| // Allocate(). |
| handle->eviction_callback = eviction_callback; |
| // Two refs for the handle: one from RLCacheShard, one for the returned handle. |
| handle->refs.store(2, std::memory_order_relaxed); |
| UpdateMemTracker(handle->charge()); |
| |
| RLHandle* to_remove_head = nullptr; |
| { |
| std::lock_guard<decltype(mutex_)> l(mutex_); |
| |
| RL_Append(handle); |
| |
| RLHandle* old = static_cast<RLHandle*>(table_.Insert(handle)); |
| if (old != nullptr) { |
| RL_Remove(old); |
| if (Unref(old)) { |
| old->next = to_remove_head; |
| to_remove_head = old; |
| } |
| } |
| |
| while (usage_ > capacity_ && rl_.next != &rl_) { |
| RLHandle* old = rl_.next; |
| RL_Remove(old); |
| table_.Remove(old->key(), old->hash()); |
| if (Unref(old)) { |
| old->next = to_remove_head; |
| to_remove_head = old; |
| } |
| } |
| } |
| |
| // we free the entries here outside of mutex for |
| // performance reasons |
| while (to_remove_head != nullptr) { |
| RLHandle* next = to_remove_head->next; |
| FreeEntry(to_remove_head); |
| to_remove_head = next; |
| } |
| |
| return handle; |
| } |
| |
| template<Cache::EvictionPolicy policy> |
| void RLCacheShard<policy>::Erase(const Slice& key, uint32_t hash) { |
| DCHECK(initialized_); |
| RLHandle* e; |
| bool last_reference = false; |
| { |
| std::lock_guard<decltype(mutex_)> l(mutex_); |
| e = static_cast<RLHandle*>(table_.Remove(key, hash)); |
| if (e != nullptr) { |
| RL_Remove(e); |
| last_reference = Unref(e); |
| } |
| } |
| // mutex not held here |
| // last_reference will only be true if e != NULL |
| if (last_reference) { |
| FreeEntry(e); |
| } |
| } |
| |
| template<Cache::EvictionPolicy policy> |
| size_t RLCacheShard<policy>::Invalidate(const Cache::InvalidationControl& ctl) { |
| DCHECK(initialized_); |
| size_t invalid_entry_count = 0; |
| size_t valid_entry_count = 0; |
| RLHandle* to_remove_head = nullptr; |
| |
| { |
| std::lock_guard<decltype(mutex_)> l(mutex_); |
| |
| // rl_.next is the oldest (a.k.a. least relevant) entry in the recency list. |
| RLHandle* h = rl_.next; |
| while (h != nullptr && h != &rl_ && |
| ctl.iteration_func(valid_entry_count, invalid_entry_count)) { |
| if (ctl.validity_func(h->key(), h->value())) { |
| // Continue iterating over the list. |
| h = h->next; |
| ++valid_entry_count; |
| continue; |
| } |
| // Copy the handle slated for removal. |
| RLHandle* h_to_remove = h; |
| // Prepare for next iteration of the cycle. |
| h = h->next; |
| |
| RL_Remove(h_to_remove); |
| table_.Remove(h_to_remove->key(), h_to_remove->hash()); |
| if (Unref(h_to_remove)) { |
| h_to_remove->next = to_remove_head; |
| to_remove_head = h_to_remove; |
| } |
| ++invalid_entry_count; |
| } |
| } |
| // Once removed from the lookup table and the recency list, the entries |
| // with no references left must be deallocated because Cache::Release() |
| // wont be called for them from elsewhere. |
| while (to_remove_head != nullptr) { |
| RLHandle* next = to_remove_head->next; |
| FreeEntry(to_remove_head); |
| to_remove_head = next; |
| } |
| return invalid_entry_count; |
| } |
| |
| } // end anonymous namespace |
| |
| template<> |
| CacheShard* NewCacheShardInt<Cache::EvictionPolicy::FIFO>(kudu::MemTracker* mem_tracker, |
| size_t capacity) { |
| return new RLCacheShard<Cache::EvictionPolicy::FIFO>(mem_tracker, capacity); |
| } |
| |
| template<> |
| CacheShard* NewCacheShardInt<Cache::EvictionPolicy::LRU>(kudu::MemTracker* mem_tracker, |
| size_t capacity) { |
| return new RLCacheShard<Cache::EvictionPolicy::LRU>(mem_tracker, capacity); |
| } |
| |
| } // namespace impala |