// Some portions copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "kudu/util/cache.h"

#include <atomic>
#include <cstdint>
#include <cstring>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <utility>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>

#include "kudu/gutil/bits.h"
#include "kudu/gutil/hash/city.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/sysinfo.h"
#include "kudu/util/alignment.h"
#include "kudu/util/cache_metrics.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/locks.h"
#include "kudu/util/malloc.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
#include "kudu/util/slice.h"
#include "kudu/util/test_util_prod.h"

// When set, DetermineShardBits() below yields zero shard bits, i.e. each
// sharded cache uses exactly one shard.
// Useful in tests that require accurate cache capacity accounting.
DEFINE_bool(cache_force_single_shard, false,
            "Override all cache implementations to use just one shard");
TAG_FLAG(cache_force_single_shard, hidden);

// Multiplied by a shard's capacity in CacheShard::SetCapacity() to obtain the
// maximum amount of memory consumption a shard may buffer locally before it
// propagates the accumulated delta to the MemTracker.
DEFINE_double(cache_memtracker_approximation_ratio, 0.01,
              "The MemTracker associated with a cache can accumulate error up to "
              "this ratio to improve performance. For tests.");
TAG_FLAG(cache_memtracker_approximation_ratio, hidden);

using std::atomic;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;

namespace kudu {

// Out-of-line definition of the base class destructor; there is nothing to
// clean up at this level.
Cache::~Cache() = default;

// Validity predicate that reports every entry as invalid, regardless of its
// key and value.
const Cache::ValidityFunc Cache::kInvalidateAllEntriesFunc =
    [](Slice /* key */, Slice /* value */) -> bool { return false; };

// Iteration predicate that never requests a stop, regardless of how many
// valid and invalid entries have been counted so far.
const Cache::IterationFunc Cache::kIterateOverAllEntriesFunc =
    [](size_t /* valid_entries_num */,
       size_t /* invalid_entries_num */) -> bool { return true; };

namespace {

// Recency list cache implementations (FIFO, LRU, etc.)

// Handle of an entry in a recency list. Each entry is a variable-length,
// heap-allocated structure. Entries are linked into a circular doubly linked
// list that is ordered by some recency criterion (e.g., access time for the
// LRU policy, insertion time for the FIFO policy).
struct RLHandle {
  Cache::EvictionCallback* eviction_callback;
  RLHandle* next_hash;
  RLHandle* next;
  RLHandle* prev;
  size_t charge;      // TODO(opt): Only allow uint32_t?
  uint32_t key_length;
  uint32_t val_length;
  std::atomic<int32_t> refs;
  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons

  // Trailing variable-length storage holding the key/value pair, laid out as:
  //   [key bytes ...] [padding up to a pointer-size boundary] [value bytes ...]
  // Only the first byte is declared here; the rest is provided by the
  // oversized allocation backing the handle.
  uint8_t kv_data[1];

  // Returns a view over the key bytes at the start of 'kv_data'.
  Slice key() const {
    return Slice(kv_data, key_length);
  }

  // Returns a writable pointer to the first value byte, which is located
  // right after the key rounded up to a multiple of sizeof(void*).
  uint8_t* mutable_val_ptr() {
    const int aligned_key_length = KUDU_ALIGN_UP(key_length, sizeof(void*));
    return kv_data + aligned_key_length;
  }

  // Read-only counterpart of mutable_val_ptr().
  const uint8_t* val_ptr() const {
    const int aligned_key_length = KUDU_ALIGN_UP(key_length, sizeof(void*));
    return kv_data + aligned_key_length;
  }

  // Returns a view over the 'val_length' value bytes.
  Slice value() const {
    const uint8_t* const first_value_byte = val_ptr();
    return Slice(first_value_byte, val_length);
  }
};

// We provide our own simple hash table since it removes a whole bunch
// of porting hacks and is also faster than some of the built-in hash
// table implementations in some of the compiler/runtime combinations
// we have tested.  E.g., readrandom speeds up by ~5% over the g++
// 4.4.3's builtin hashtable.
class HandleTable {
 public:
  HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); }
  ~HandleTable() { delete[] list_; }

  RLHandle* Lookup(const Slice& key, uint32_t hash) {
    return *FindPointer(key, hash);
  }

  RLHandle* Insert(RLHandle* h) {
    RLHandle** ptr = FindPointer(h->key(), h->hash);
    RLHandle* old = *ptr;
    h->next_hash = (old == nullptr ? nullptr : old->next_hash);
    *ptr = h;
    if (old == nullptr) {
      ++elems_;
      if (elems_ > length_) {
        // Since each cache entry is fairly large, we aim for a small
        // average linked list length (<= 1).
        Resize();
      }
    }
    return old;
  }

  RLHandle* Remove(const Slice& key, uint32_t hash) {
    RLHandle** ptr = FindPointer(key, hash);
    RLHandle* result = *ptr;
    if (result != nullptr) {
      *ptr = result->next_hash;
      --elems_;
    }
    return result;
  }

 private:
  // The table consists of an array of buckets where each bucket is
  // a linked list of cache entries that hash into the bucket.
  uint32_t length_;
  uint32_t elems_;
  RLHandle** list_;

  // Return a pointer to slot that points to a cache entry that
  // matches key/hash.  If there is no such cache entry, return a
  // pointer to the trailing slot in the corresponding linked list.
  RLHandle** FindPointer(const Slice& key, uint32_t hash) {
    RLHandle** ptr = &list_[hash & (length_ - 1)];
    while (*ptr != nullptr &&
           ((*ptr)->hash != hash || key != (*ptr)->key())) {
      ptr = &(*ptr)->next_hash;
    }
    return ptr;
  }

  void Resize() {
    uint32_t new_length = 16;
    while (new_length < elems_ * 1.5) {
      new_length *= 2;
    }
    auto new_list = new RLHandle*[new_length];
    memset(new_list, 0, sizeof(new_list[0]) * new_length);
    uint32_t count = 0;
    for (uint32_t i = 0; i < length_; i++) {
      RLHandle* h = list_[i];
      while (h != nullptr) {
        RLHandle* next = h->next_hash;
        uint32_t hash = h->hash;
        RLHandle** ptr = &new_list[hash & (new_length - 1)];
        h->next_hash = *ptr;
        *ptr = h;
        h = next;
        count++;
      }
    }
    DCHECK_EQ(elems_, count);
    delete[] list_;
    list_ = new_list;
    length_ = new_length;
  }
};

// Returns the lower-case name of the given eviction policy. An unexpected
// policy value is reported via LOG(FATAL).
string ToString(Cache::EvictionPolicy p) {
  if (p == Cache::EvictionPolicy::FIFO) {
    return "fifo";
  }
  if (p == Cache::EvictionPolicy::LRU) {
    return "lru";
  }
  LOG(FATAL) << "unexpected cache eviction policy: " << static_cast<int>(p);
  return "unknown";
}

// A single shard of sharded cache.
//
// A shard combines a hash table (for lookups by key) with a circular recency
// list (for choosing eviction victims), both guarded by 'mutex_'. The
// 'policy' template parameter selects how the recency list is updated after
// a lookup (see RL_UpdateAfterLookup()).
template<Cache::EvictionPolicy policy>
class CacheShard {
 public:
  // 'tracker' is stored as a raw pointer and is used to report the shard's
  // memory consumption.
  explicit CacheShard(MemTracker* tracker);
  ~CacheShard();

  // Separate from constructor so caller can easily make an array of CacheShard.
  // Besides the capacity itself, this sets the bound on the amount of
  // consumption buffered locally before it is propagated to the MemTracker,
  // computed as capacity * --cache_memtracker_approximation_ratio.
  void SetCapacity(size_t capacity) {
    capacity_ = capacity;
    max_deferred_consumption_ = capacity * FLAGS_cache_memtracker_approximation_ratio;
  }

  // Sets the (non-owned) metrics updated by this shard; may be nullptr.
  void SetMetrics(CacheMetrics* metrics) { metrics_ = metrics; }

  // Inserts 'handle' into the shard, evicting the oldest entries while the
  // usage exceeds the capacity. Returns 'handle' as a Cache::Handle.
  Cache::Handle* Insert(RLHandle* handle, Cache::EvictionCallback* eviction_callback);
  // Like Cache::Lookup, but with an extra "hash" parameter.
  Cache::Handle* Lookup(const Slice& key, uint32_t hash, bool caching);
  // Drops one reference to 'handle', freeing the entry if it was the last one.
  void Release(Cache::Handle* handle);
  // Removes the entry matching key/hash from the shard, if present.
  void Erase(const Slice& key, uint32_t hash);
  // Removes the entries deemed invalid by 'ctl'; returns how many were removed.
  size_t Invalidate(const Cache::InvalidationControl& ctl);

 private:
  // Unlink 'e' from the recency list and subtract its charge from the usage.
  void RL_Remove(RLHandle* e);
  // Link 'e' into the recency list as the newest entry and add its charge
  // to the usage.
  void RL_Append(RLHandle* e);
  // Update the recency list after a lookup operation.
  void RL_UpdateAfterLookup(RLHandle* e);
  // Just reduce the reference count by 1.
  // Return true if last reference
  bool Unref(RLHandle* e);
  // Call the user's eviction callback, if it exists, and free the entry.
  void FreeEntry(RLHandle* e);


  // Update the memtracker's consumption by the given amount.
  //
  // This "buffers" the updates locally in 'deferred_consumption_' until the amount
  // of accumulated delta is more than ~1% of the cache capacity. This improves
  // performance under workloads with high eviction rates for a few reasons:
  //
  // 1) once the cache reaches its full capacity, we expect it to remain there
  // in steady state. Each insertion is usually matched by an eviction, and unless
  // the total size of the evicted item(s) is much different than the size of the
  // inserted item, each eviction event is unlikely to change the total cache usage
  // much. So, we expect that the accumulated error will mostly remain around 0
  // and we can avoid propagating changes to the MemTracker at all.
  //
  // 2) because the cache implementation is sharded, we do this tracking in a bunch
  // of different locations, avoiding bouncing cache-lines between cores. By contrast
  // the MemTracker is a simple integer, so it doesn't scale as well under concurrency.
  //
  // Positive delta indicates an increased memory consumption.
  void UpdateMemTracker(int64_t delta);

  // Update the metrics for a lookup operation in the cache.
  void UpdateMetricsLookup(bool was_hit, bool caching);

  // Set by SetCapacity(). Zero-initialized so that a shard whose capacity was
  // never set has a well-defined (empty) capacity rather than an
  // indeterminate one.
  size_t capacity_ = 0;

  // mutex_ protects the following state.
  simple_spinlock mutex_;
  size_t usage_;

  // Dummy head of recency list.
  // rl.prev is newest entry, rl.next is oldest entry.
  RLHandle rl_;

  HandleTable table_;

  MemTracker* mem_tracker_;
  atomic<int64_t> deferred_consumption_ { 0 };

  // Initialized based on capacity_ to ensure an upper bound on the error on the
  // MemTracker consumption. Zero until SetCapacity() is called, which means
  // every non-zero delta is propagated to the MemTracker right away.
  int64_t max_deferred_consumption_ = 0;

  CacheMetrics* metrics_;
};

// Creates an empty shard reporting its memory consumption to 'tracker'.
// No metrics are attached until SetMetrics() is called.
template<Cache::EvictionPolicy policy>
CacheShard<policy>::CacheShard(MemTracker* tracker)
    : usage_(0),
      mem_tracker_(tracker),
      metrics_(nullptr) {
  // An empty circular list is a dummy head linked to itself in both
  // directions.
  rl_.prev = &rl_;
  rl_.next = &rl_;
}

// Drops the shard's own reference to every entry still in the recency list,
// freeing each entry for which that was the last reference, and then
// propagates whatever consumption is still buffered to the MemTracker.
template<Cache::EvictionPolicy policy>
CacheShard<policy>::~CacheShard() {
  RLHandle* cur = rl_.next;
  while (cur != &rl_) {
    // Remember the successor first: 'cur' may be deallocated below.
    RLHandle* const following = cur->next;
    DCHECK_EQ(cur->refs.load(std::memory_order_relaxed), 1)
        << "caller has an unreleased handle";
    const bool was_last_reference = Unref(cur);
    if (was_last_reference) {
      FreeEntry(cur);
    }
    cur = following;
  }
  mem_tracker_->Consume(deferred_consumption_);
}

// Atomically decrements the reference count of 'e'. Returns true if the
// count was 1 before the decrement, i.e. the last reference is now gone.
template<Cache::EvictionPolicy policy>
bool CacheShard<policy>::Unref(RLHandle* e) {
  DCHECK_GT(e->refs.load(std::memory_order_relaxed), 0);
  const int32_t refs_before = e->refs.fetch_sub(1);
  return refs_before == 1;
}

// Finalizes and deallocates an entry that has no references left:
//   1. notifies the entry's eviction callback, if one was supplied;
//   2. subtracts the entry's charge from the (buffered) MemTracker
//      consumption;
//   3. if metrics are attached, decrements the cache usage by the charge and
//      counts one eviction;
//   4. releases the entry's memory.
//
// 'e' must not be used after this call.
template<Cache::EvictionPolicy policy>
void CacheShard<policy>::FreeEntry(RLHandle* e) {
  DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 0);
  if (e->eviction_callback) {
    e->eviction_callback->EvictedEntry(e->key(), e->value());
  }
  UpdateMemTracker(-static_cast<int64_t>(e->charge));
  if (PREDICT_TRUE(metrics_)) {
    metrics_->cache_usage->DecrementBy(e->charge);
    metrics_->evictions->Increment();
  }
  // The handle's storage is obtained as 'new uint8_t[]' (see
  // ShardedCache::Allocate()), so it has to be released through a pointer of
  // that very type, the same way ShardedCache::Free() does it: an array
  // delete through a pointer of a different type is undefined behavior.
  delete [] reinterpret_cast<uint8_t*>(e);
}

// Adds 'delta' to the locally buffered consumption. Once the buffered amount
// leaves the [-max_deferred_consumption_, max_deferred_consumption_] range,
// the buffer is reset to zero and its contents are handed to the MemTracker.
template<Cache::EvictionPolicy policy>
void CacheShard<policy>::UpdateMemTracker(int64_t delta) {
  const int64_t accumulated = deferred_consumption_.fetch_add(delta) + delta;

  const bool within_bounds = accumulated <= max_deferred_consumption_ &&
                             accumulated >= -max_deferred_consumption_;
  if (within_bounds) {
    return;
  }

  // Grab whatever is buffered at this moment: other threads may have added
  // to the buffer since the fetch_add() above.
  const int64_t flushed =
      deferred_consumption_.exchange(0, std::memory_order_relaxed);
  mem_tracker_->Consume(flushed);
}

// Records one lookup in the metrics, if any are attached. Besides the total
// lookup counter, exactly one of the four hit/miss counters is incremented,
// chosen by the lookup outcome and by whether the lookup was a "caching" one.
template<Cache::EvictionPolicy policy>
void CacheShard<policy>::UpdateMetricsLookup(bool was_hit, bool caching) {
  if (!PREDICT_TRUE(metrics_)) {
    return;
  }

  metrics_->lookups->Increment();
  if (was_hit && caching) {
    metrics_->cache_hits_caching->Increment();
  } else if (was_hit) {
    metrics_->cache_hits->Increment();
  } else if (caching) {
    metrics_->cache_misses_caching->Increment();
  } else {
    metrics_->cache_misses->Increment();
  }
}

// Unlinks 'e' from the recency list by connecting its two neighbors to each
// other, and subtracts the entry's charge from the shard's usage. The links
// stored in 'e' itself are left as they were.
template<Cache::EvictionPolicy policy>
void CacheShard<policy>::RL_Remove(RLHandle* e) {
  RLHandle* const newer = e->next;
  RLHandle* const older = e->prev;
  newer->prev = older;
  older->next = newer;

  DCHECK_GE(usage_, e->charge);
  usage_ -= e->charge;
}

// Links 'e' into the recency list as the newest entry, i.e. between the
// current newest entry and the dummy head, and adds the entry's charge to
// the shard's usage.
template<Cache::EvictionPolicy policy>
void CacheShard<policy>::RL_Append(RLHandle* e) {
  RLHandle* const newest_so_far = rl_.prev;
  e->prev = newest_so_far;
  e->next = &rl_;
  newest_so_far->next = e;
  rl_.prev = e;

  usage_ += e->charge;
}

// FIFO: a lookup has no effect on the recency list, so entries stay in the
// order they were appended.
template<>
void CacheShard<Cache::EvictionPolicy::FIFO>::RL_UpdateAfterLookup(RLHandle*) {
  // Intentionally empty.
}

// LRU: a lookup turns the entry into the newest one in the recency list.
template<>
void CacheShard<Cache::EvictionPolicy::LRU>::RL_UpdateAfterLookup(RLHandle* e) {
  // Unlink the entry from its current position ...
  RL_Remove(e);
  // ... and link it back in right before the dummy head.
  RL_Append(e);
}

// Looks up the entry matching key/hash. On a hit, the entry gets an extra
// reference on behalf of the caller and the recency list is updated as per
// the eviction policy. Returns the entry as a Cache::Handle, or nullptr on a
// miss. 'caching' only affects which metrics counters are updated.
template<Cache::EvictionPolicy policy>
Cache::Handle* CacheShard<policy>::Lookup(const Slice& key,
                                          uint32_t hash,
                                          bool caching) {
  RLHandle* found = nullptr;
  {
    std::lock_guard<decltype(mutex_)> guard(mutex_);
    found = table_.Lookup(key, hash);
    if (found != nullptr) {
      found->refs.fetch_add(1, std::memory_order_relaxed);
      RL_UpdateAfterLookup(found);
    }
  }

  // The metrics are updated after the lock has been released.
  const bool was_hit = found != nullptr;
  UpdateMetricsLookup(was_hit, caching);

  return reinterpret_cast<Cache::Handle*>(found);
}

// Gives up the caller's reference to 'handle'. If that was the last
// reference to the entry, the entry is freed.
template<Cache::EvictionPolicy policy>
void CacheShard<policy>::Release(Cache::Handle* handle) {
  RLHandle* const entry = reinterpret_cast<RLHandle*>(handle);
  if (Unref(entry)) {
    FreeEntry(entry);
  }
}

// Inserts 'handle' into the shard and returns it as a Cache::Handle carrying
// one reference for the caller.
//
// An existing entry with the same key is displaced from the shard. After
// that, entries are evicted from the oldest end of the recency list for as
// long as the usage exceeds the capacity. Displaced and evicted entries that
// nobody else references are freed after the lock has been released.
template<Cache::EvictionPolicy policy>
Cache::Handle* CacheShard<policy>::Insert(
    RLHandle* handle,
    Cache::EvictionCallback* eviction_callback) {
  // Fill in the RLHandle members that are not set up by Allocate().
  handle->eviction_callback = eviction_callback;
  // The entry starts with two references: one held by the shard itself and
  // one for the handle returned to the caller.
  handle->refs.store(2, std::memory_order_relaxed);
  UpdateMemTracker(handle->charge);
  if (PREDICT_TRUE(metrics_)) {
    metrics_->cache_usage->IncrementBy(handle->charge);
    metrics_->inserts->Increment();
  }

  // Head of a singly linked list (chained through RLHandle::next) of the
  // entries to be freed once the lock is released.
  RLHandle* doomed = nullptr;

  // Drops the shard's reference to 'victim'; if it was the last reference,
  // pushes the entry onto the 'doomed' list.
  const auto drop_shard_ref = [&](RLHandle* victim) {
    if (Unref(victim)) {
      victim->next = doomed;
      doomed = victim;
    }
  };

  {
    std::lock_guard<decltype(mutex_)> guard(mutex_);

    RL_Append(handle);

    if (RLHandle* const displaced = table_.Insert(handle)) {
      RL_Remove(displaced);
      drop_shard_ref(displaced);
    }

    while (usage_ > capacity_ && rl_.next != &rl_) {
      RLHandle* const oldest = rl_.next;
      RL_Remove(oldest);
      table_.Remove(oldest->key(), oldest->hash);
      drop_shard_ref(oldest);
    }
  }

  // For performance reasons, the entries are freed here, with the mutex
  // no longer held.
  for (RLHandle* cur = doomed; cur != nullptr; ) {
    RLHandle* const following = cur->next;
    FreeEntry(cur);
    cur = following;
  }

  return reinterpret_cast<Cache::Handle*>(handle);
}

// Removes the entry matching key/hash from the hash table and the recency
// list, if there is one, and drops the shard's reference to it. The entry is
// freed right away only if nobody else holds a reference to it.
template<Cache::EvictionPolicy policy>
void CacheShard<policy>::Erase(const Slice& key, uint32_t hash) {
  RLHandle* removed = nullptr;
  bool must_free = false;
  {
    std::lock_guard<decltype(mutex_)> guard(mutex_);
    removed = table_.Remove(key, hash);
    if (removed != nullptr) {
      RL_Remove(removed);
      must_free = Unref(removed);
    }
  }

  // The mutex is not held from here on. 'must_free' can only be true if an
  // entry was actually removed.
  if (!must_free) {
    return;
  }
  FreeEntry(removed);
}

// Walks the recency list from the oldest entry towards the newest one and
// removes every entry that ctl.validity_func reports as invalid. Before each
// entry is examined, ctl.iteration_func is consulted with the numbers of
// valid and invalid entries seen so far; the walk stops once it returns
// false. Returns the number of entries removed from the shard.
template<Cache::EvictionPolicy policy>
size_t CacheShard<policy>::Invalidate(const Cache::InvalidationControl& ctl) {
  size_t num_valid = 0;
  size_t num_invalid = 0;

  // Head of a singly linked list (chained through RLHandle::next) of the
  // removed entries that have no references left.
  RLHandle* doomed = nullptr;

  {
    std::lock_guard<decltype(mutex_)> guard(mutex_);

    // rl_.next is the oldest (a.k.a. least relevant) entry in the recency list.
    RLHandle* cur = rl_.next;
    while (cur != nullptr && cur != &rl_ &&
           ctl.iteration_func(num_valid, num_invalid)) {
      const bool still_valid = ctl.validity_func(cur->key(), cur->value());

      // Step to the next entry right away: 'candidate' may get unlinked below.
      RLHandle* const candidate = cur;
      cur = cur->next;

      if (still_valid) {
        ++num_valid;
        continue;
      }

      RL_Remove(candidate);
      table_.Remove(candidate->key(), candidate->hash);
      if (Unref(candidate)) {
        candidate->next = doomed;
        doomed = candidate;
      }
      ++num_invalid;
    }
  }

  // The entries that are gone from both the lookup table and the recency
  // list and have no references left must be deallocated here, because
  // Cache::Release() will not be called for them from elsewhere.
  for (RLHandle* e = doomed; e != nullptr; ) {
    RLHandle* const following = e->next;
    FreeEntry(e);
    e = following;
  }
  return num_invalid;
}

// Returns how many bits of the hash are used to pick the cache shard; the
// number of shards is two to the power of the returned value. The result is
// zero if --cache_force_single_shard is set, otherwise it is derived from
// the number of CPUs.
int DetermineShardBits() {
  int bits;
  if (PREDICT_FALSE(FLAGS_cache_force_single_shard)) {
    bits = 0;
  } else {
    bits = Bits::Log2Ceiling(base::NumCPUs());
  }
  VLOG(1) << "Will use " << (1 << bits) << " shards for recency list cache.";
  return bits;
}

// Cache implementation that spreads its entries over 2^shard_bits_ instances
// of CacheShard. The shard responsible for a key is selected by the
// high-order bits of the key's 32-bit hash.
template<Cache::EvictionPolicy policy>
class ShardedCache : public Cache {
 public:
  // Creates a cache of the given total 'capacity', which is divided evenly
  // (rounding up) among the shards. 'id' becomes part of the name of the
  // cache's MemTracker.
  explicit ShardedCache(size_t capacity, const string& id)
        : shard_bits_(DetermineShardBits()) {
    // A cache is often a singleton, so:
    // 1. We reuse its MemTracker if one already exists, and
    // 2. It is directly parented to the root MemTracker.
    mem_tracker_ = MemTracker::FindOrCreateGlobalTracker(
        -1, strings::Substitute("$0-sharded_$1_cache", id, ToString(policy)));

    int num_shards = 1 << shard_bits_;
    const size_t per_shard = (capacity + (num_shards - 1)) / num_shards;
    shards_.reserve(num_shards);
    for (int s = 0; s < num_shards; s++) {
      unique_ptr<CacheShard<policy>> shard(
          new CacheShard<policy>(mem_tracker_.get()));
      shard->SetCapacity(per_shard);
      // The vector takes over the ownership, so a shard cannot leak even if
      // growing the vector or creating a later shard fails.
      shards_.emplace_back(std::move(shard));
    }
  }

  virtual ~ShardedCache() {
    // Destroy the shards explicitly while 'mem_tracker_' and 'metrics_' are
    // still alive: the shards use both (through raw pointers) when they free
    // their remaining entries.
    shards_.clear();
  }

  // Attaches 'metrics' to the cache and to every shard. If metrics are
  // already attached and 'metrics_policy' is kKeep, the existing metrics stay
  // in place; that is tolerated only when running under gtest.
  void SetMetrics(std::unique_ptr<CacheMetrics> metrics,
                  ExistingMetricsPolicy metrics_policy) override {
    std::lock_guard<decltype(metrics_lock_)> l(metrics_lock_);
    if (metrics_ && metrics_policy == ExistingMetricsPolicy::kKeep) {
      // KUDU-2165: reuse of the Cache singleton across multiple InternalMiniCluster
      // servers causes TSAN errors. So, we'll ensure that metrics only get
      // attached once, from whichever server starts first. This has the downside
      // that, in test builds, we won't get accurate cache metrics, but that's
      // probably better than spurious failures.
      CHECK(IsGTest()) << "Metrics should only be set once per Cache";
      return;
    }
    metrics_ = std::move(metrics);
    for (const auto& shard : shards_) {
      shard->SetMetrics(metrics_.get());
    }
  }

  // Looks up 'key' in the shard it hashes to. The returned handle is null on
  // a miss; otherwise it is released back to this cache by its deleter.
  // 'caching' only selects which of the lookup metrics are updated.
  UniqueHandle Lookup(const Slice& key, CacheBehavior caching) override {
    const uint32_t hash = HashSlice(key);
    return UniqueHandle(
        shards_[Shard(hash)]->Lookup(key, hash, caching == EXPECT_IN_CACHE),
        Cache::HandleDeleter(this));
  }

  // Removes the entry for 'key', if any, from the shard it hashes to.
  void Erase(const Slice& key) override {
    const uint32_t hash = HashSlice(key);
    shards_[Shard(hash)]->Erase(key, hash);
  }

  // Returns a view over the value stored in the entry behind 'handle'.
  Slice Value(const UniqueHandle& handle) const override {
    return reinterpret_cast<const RLHandle*>(handle.get())->value();
  }

  // Hands the entry behind the pending 'handle' over to the shard selected by
  // the hash stored in the entry, and returns a regular handle to it.
  // 'handle' must not be null.
  UniqueHandle Insert(UniquePendingHandle handle,
                      Cache::EvictionCallback* eviction_callback) override {
    RLHandle* h = reinterpret_cast<RLHandle*>(DCHECK_NOTNULL(handle.release()));
    return UniqueHandle(
        shards_[Shard(h->hash)]->Insert(h, eviction_callback),
        Cache::HandleDeleter(this));
  }

  // Allocates the storage for an entry with the given key and 'val_len'
  // bytes of value, and copies the key into it. The value bytes are left for
  // the caller to fill in via MutableValue(). If 'charge' is
  // kAutomaticCharge, the entry is charged with the usable size of its
  // allocation; otherwise with 'charge' as given.
  UniquePendingHandle Allocate(Slice key, int val_len, int charge) override {
    int key_len = key.size();
    DCHECK_GE(key_len, 0);
    DCHECK_GE(val_len, 0);
    // The value starts at a pointer-size boundary right after the key.
    int key_len_padded = KUDU_ALIGN_UP(key_len, sizeof(void*));
    UniquePendingHandle h(reinterpret_cast<PendingHandle*>(
        new uint8_t[sizeof(RLHandle)
                    + key_len_padded + val_len // the kv_data VLA data
                    - 1 // (the VLA has a 1-byte placeholder)
                   ]),
        PendingHandleDeleter(this));
    RLHandle* handle = reinterpret_cast<RLHandle*>(h.get());
    handle->key_length = key_len;
    handle->val_length = val_len;
    // TODO(KUDU-1091): account for the footprint of structures used by Cache's
    //                  internal housekeeping (RL handles, etc.) in case of
    //                  non-automatic charge.
    handle->charge = (charge == kAutomaticCharge) ? kudu_malloc_usable_size(h.get())
                                                  : charge;
    handle->hash = HashSlice(key);
    memcpy(handle->kv_data, key.data(), key_len);

    return h;
  }

  // Returns a writable pointer to the value bytes of a pending entry.
  uint8_t* MutableValue(UniquePendingHandle* handle) override {
    return reinterpret_cast<RLHandle*>(handle->get())->mutable_val_ptr();
  }

  // Runs the invalidation described by 'ctl' on every shard, one after
  // another, and returns the total number of invalidated entries.
  size_t Invalidate(const InvalidationControl& ctl) override {
    size_t invalidated_count = 0;
    for (const auto& shard : shards_) {
      invalidated_count += shard->Invalidate(ctl);
    }
    return invalidated_count;
  }

 protected:
  // Releases 'handle' in the shard selected by the hash stored in the entry.
  void Release(Handle* handle) override {
    RLHandle* h = reinterpret_cast<RLHandle*>(handle);
    shards_[Shard(h->hash)]->Release(handle);
  }

  // Deallocates a pending entry that was never inserted. The storage comes
  // from 'new uint8_t[]' in Allocate(), hence the cast before the delete.
  void Free(PendingHandle* h) override {
    uint8_t* data = reinterpret_cast<uint8_t*>(h);
    delete [] data;
  }

 private:
  // Returns the CityHash64 hash of the slice's bytes, narrowed to the 32-bit
  // return type.
  static inline uint32_t HashSlice(const Slice& s) {
    return util_hash::CityHash64(
      reinterpret_cast<const char *>(s.data()), s.size());
  }

  // Returns the index of the shard responsible for 'hash': its 'shard_bits_'
  // high-order bits.
  uint32_t Shard(uint32_t hash) {
    // Widen to uint64 before shifting, or else on a single CPU,
    // we would try to shift a uint32_t by 32 bits, which is undefined.
    return static_cast<uint64_t>(hash) >> (32 - shard_bits_);
  }

  // NOTE: 'shards_' must stay declared after 'mem_tracker_' and 'metrics_',
  // so that the shards never outlive the objects they point to.
  shared_ptr<MemTracker> mem_tracker_;
  unique_ptr<CacheMetrics> metrics_;
  vector<unique_ptr<CacheShard<policy>>> shards_;

  // Number of bits of hash used to determine the shard.
  const int shard_bits_;

  // Protects 'metrics_'. Used only when metrics are set, to ensure
  // that they are set only once in test environments.
  simple_spinlock metrics_lock_;
};

}  // end anonymous namespace

// Creates a DRAM-based sharded cache with the FIFO eviction policy. The
// returned object is allocated with 'new'.
template<>
Cache* NewCache<Cache::EvictionPolicy::FIFO,
                Cache::MemoryType::DRAM>(size_t capacity, const std::string& id) {
  using FifoCache = ShardedCache<Cache::EvictionPolicy::FIFO>;
  return new FifoCache(capacity, id);
}

// Creates a DRAM-based sharded cache with the LRU eviction policy. The
// returned object is allocated with 'new'.
template<>
Cache* NewCache<Cache::EvictionPolicy::LRU,
                Cache::MemoryType::DRAM>(size_t capacity, const std::string& id) {
  using LruCache = ShardedCache<Cache::EvictionPolicy::LRU>;
  return new LruCache(capacity, id);
}

// Prints the name of the given memory type. A value that is not a known
// enumerator is printed as "unknown (<numeric value>)".
std::ostream& operator<<(std::ostream& os, Cache::MemoryType mem_type) {
  if (mem_type == Cache::MemoryType::DRAM) {
    return os << "DRAM";
  }
  if (mem_type == Cache::MemoryType::NVM) {
    return os << "NVM";
  }
  return os << "unknown (" << static_cast<int>(mem_type) << ")";
}

}  // namespace kudu
