/** @file

  A cache (with map-esque interface) for RefCountObjs

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */
#pragma once

#include <I_EventSystem.h>
#include <P_EventSystem.h> // TODO: less? just need ET_TASK

#include "tscore/IntrusiveHashMap.h"
#include "tscore/PriorityQueue.h"

#include "tscore/List.h"
#include "tscore/ink_hrtime.h"

#include "tscore/I_Version.h"
#include <unistd.h>

#include <vector>

// The sync event code is an alias for REFCOUNT_CACHE_EVENT_EVENTS_START (defined outside this file).
#define REFCOUNT_CACHE_EVENT_SYNC REFCOUNT_CACHE_EVENT_EVENTS_START

// Default value of RefCountCacheHeader::magic.
#define REFCOUNTCACHE_MAGIC_NUMBER 0x0BAD2D9

// Version of the cache itself; RefCountCacheHeader::version is initialised from REFCOUNTCACHE_VERSION.
static constexpr unsigned char REFCOUNTCACHE_MAJOR_VERSION = 1;
static constexpr unsigned char REFCOUNTCACHE_MINOR_VERSION = 0;
static constexpr ts::VersionNumber REFCOUNTCACHE_VERSION(REFCOUNTCACHE_MAJOR_VERSION, REFCOUNTCACHE_MINOR_VERSION);

// Stats
// Each enumerator is the index of one stat inside the cache's raw stat block; the RefCountCache
// constructor registers one raw stat per enumerator and sizes the block with RefCountCache_Stat_Count.
enum RefCountCache_Stats {
  refcountcache_current_items_stat,        // current number of items
  refcountcache_current_size_stat,         // current size of cache
  refcountcache_total_inserts_stat,        // total items inserted (counted on every put(), even if it fails)
  refcountcache_total_failed_inserts_stat, // total items unable to insert
  refcountcache_total_lookups_stat,        // total get() calls
  refcountcache_total_hits_stat,           // total hits

  // Persistence metrics (registered in this file, but not updated by any code in this file)
  refcountcache_last_sync_time,   // seconds since epoch of last successful sync
  refcountcache_last_total_items, // number of items synced last time
  refcountcache_last_total_size,  // total size at last sync

  RefCountCache_Stat_Count // number of stats above; must stay last
};

// Metadata kept for every cached item.
// LoadRefCountCacheFromPath() reads this struct as raw bytes (sizeof(RefCountCacheItemMeta)) from the
// cache file, so the member order and types are part of that on-disk layout.
struct RefCountCacheItemMeta {
  uint64_t key;           // key the item is stored under
  unsigned int size;      // size accounted for the item
  ink_time_t expiry_time; // expire time as seconds since epoch

  // `expiry_time` defaults to -1.
  RefCountCacheItemMeta(uint64_t key, unsigned int size, ink_time_t expiry_time = -1)
    : key{key}, size{size}, expiry_time{expiry_time}
  {
  }
};

// Layer of indirection for the hashmap-- since it needs lots of things inside of it
// We'll also use this as the item header, for persisting objects to disk
class RefCountCacheHashEntry
{
public:
  Ptr<RefCountObj> item;
  RefCountCacheHashEntry *_next{nullptr};
  RefCountCacheHashEntry *_prev{nullptr};
  PriorityQueueEntry<RefCountCacheHashEntry *> *expiry_entry = nullptr;
  RefCountCacheItemMeta meta;

  // Need a no-argument constructor to use the classAllocator
  RefCountCacheHashEntry() : item(Ptr<RefCountObj>()), meta(0, 0) {}
  void
  set(RefCountObj *i, uint64_t key, unsigned int size, int expire_time)
  {
    this->item = make_ptr(i);
    this->meta = RefCountCacheItemMeta(key, size, expire_time);
  }

  // make these values comparable -- so we can sort them
  bool
  operator<(const RefCountCacheHashEntry &v2) const
  {
    return this->meta.expiry_time < v2.meta.expiry_time;
  }

  static RefCountCacheHashEntry *alloc();
  static void dealloc(RefCountCacheHashEntry *e);

  template <typename C>
  static void
  free(RefCountCacheHashEntry *e)
  {
    // Since the Value is actually RefCountObj-- when this gets deleted normally it calls the wrong
    // `free` method, this forces the delete/decr to happen with the right type
    Ptr<C> *tmp = (Ptr<C> *)&e->item;
    tmp->clear();

    e->~RefCountCacheHashEntry();
    dealloc(e);
  }
};

// Since the hashing values are all fixed size, we can simply use a classAllocator to avoid mallocs
// Allocator for the expiry queue slots; RefCountCachePartition::put() allocates from it and
// RefCountCachePartition::dealloc_entry() frees back to it. Defined outside this header.
extern ClassAllocator<PriorityQueueEntry<RefCountCacheHashEntry *>> expiryQueueEntry;

// Descriptor handed to IntrusiveHashMap: it tells the map where the links and the key of a
// RefCountCacheHashEntry live, and how keys are hashed and compared.
struct RefCountCacheLinkage {
  using key_type   = const uint64_t;
  using value_type = RefCountCacheHashEntry;

  // Forward link stored inside the entry.
  static value_type *&
  next_ptr(value_type *value)
  {
    return value->_next;
  }

  // Backward link stored inside the entry.
  static value_type *&
  prev_ptr(value_type *value)
  {
    return value->_prev;
  }

  // The key is used directly as its own hash value.
  static uint64_t
  hash_of(key_type key)
  {
    return static_cast<uint64_t>(key);
  }

  // The key of an entry is kept in its metadata.
  static key_type
  key_of(value_type *v)
  {
    return v->meta.key;
  }

  // Keys are plain integers, compared by value.
  static bool
  equal(key_type lhs, key_type rhs)
  {
    return lhs == rhs;
  }
};

// The RefCountCachePartition is simply a map of key -> Ptr<YourClass>
// We partition the cache to reduce lock contention
//
// NOTE(review): none of the member functions defined in this file acquire `lock`; presumably callers
// are expected to hold it around every call -- confirm against the call sites.
template <class C> class RefCountCachePartition
{
public:
  using hash_type = IntrusiveHashMap<RefCountCacheLinkage>;

  // `max_size` / `max_items` of 0 are treated as "no limit" by is_full(). `rsb` may be nullptr, in which
  // case no metrics are recorded.
  RefCountCachePartition(unsigned int part_num, uint64_t max_size, unsigned int max_items, RecRawStatBlock *rsb = nullptr);
  // Look up `key`; returns an empty Ptr when the key is not present.
  Ptr<C> get(uint64_t key);
  // Store `item` under `key`, replacing any entry already stored under that key. sizeof(C) is added to
  // `size` for accounting. An `expire_time` >= 0 puts the entry on the expiry queue; a negative value
  // keeps it off the queue. The item is not stored when the partition is full and no space can be made.
  void put(uint64_t key, C *item, int size = 0, int expire_time = 0);
  // Remove the entry for `key`. When `expiry_time` >= 0 the entry is only removed if its stored expiry
  // time equals `expiry_time`.
  void erase(uint64_t key, ink_time_t expiry_time = -1);

  // Remove every entry.
  void clear();
  // True when a configured (non-zero) item or size limit has been reached.
  bool is_full() const;
  // Try to evict expired entries until an item of the given size fits; returns whether it does.
  bool make_space_for(unsigned int);
  // Update counters/metrics for an entry already unlinked from the map, drop its expiry queue slot
  // and free the entry.
  void dealloc_entry(hash_type::iterator ptr);

  // Number of entries currently stored.
  size_t count() const;
  // Append a freshly allocated copy of every entry to `items`. The copies come from
  // RefCountCacheHashEntry::alloc(); nothing in this file releases them.
  void copy(std::vector<RefCountCacheHashEntry *> &items);

  // Direct access to the underlying map.
  hash_type &get_map();

  Ptr<ProxyMutex> lock; // Lock

private:
  // Add `data` to the given stat; does nothing when no stat block was supplied.
  void metric_inc(RefCountCache_Stats metric_enum, int64_t data);

  unsigned int part_num;  // index of this partition (only used in debug output in this file)
  uint64_t max_size;      // size limit, 0 == no limit
  unsigned int max_items; // item limit, 0 == no limit
  uint64_t size;          // sum of meta.size over the stored entries
  unsigned int items;     // number of stored entries

  hash_type item_map; // key -> entry

  PriorityQueue<RefCountCacheHashEntry *> expiry_queue; // entries that were stored with expire_time >= 0
  RecRawStatBlock *rsb;                                 // stat block for metrics, may be nullptr
};

// Create an empty partition with its own freshly created mutex.
template <class C>
RefCountCachePartition<C>::RefCountCachePartition(unsigned int part_num, uint64_t max_size, unsigned int max_items,
                                                  RecRawStatBlock *rsb)
  : lock(new_ProxyMutex()),
    part_num(part_num),
    max_size(max_size),
    max_items(max_items),
    size(0),  // nothing stored yet
    items(0), // nothing stored yet
    rsb(rsb)
{
}

// Look up `key` in this partition. Every call counts as a lookup; a found entry also counts as a hit.
// Returns an empty Ptr when the key is not present.
template <class C>
Ptr<C>
RefCountCachePartition<C>::get(uint64_t key)
{
  this->metric_inc(refcountcache_total_lookups_stat, 1);

  auto spot = this->item_map.find(key);
  if (spot == this->item_map.end()) {
    // not found
    return Ptr<C>();
  }

  this->metric_inc(refcountcache_total_hits_stat, 1);
  C *hit = static_cast<C *>(spot->item.get());
  return make_ptr(hit);
}

// Store `item` under `key`.
// Any entry already stored under `key` is removed first -- even if the new item then turns out not to
// fit. sizeof(C) is added to `size` for accounting. Entries with `expire_time` >= 0 are also placed on
// the expiry queue, which is what make_space_for() evicts from.
template <class C>
void
RefCountCachePartition<C>::put(uint64_t key, C *item, int size, int expire_time)
{
  using ExpiryEntry = PriorityQueueEntry<RefCountCacheHashEntry *>;

  this->metric_inc(refcountcache_total_inserts_stat, 1);
  size += sizeof(C);

  // Drop whatever is currently stored under this key
  this->erase(key);

  // Room is only made when the partition is already full; if that fails the item is not stored
  const bool have_room = !this->is_full() || this->make_space_for(size);
  if (!have_room) {
    Debug("refcountcache", "partition %d is full-- not storing item key=%" PRIu64, this->part_num, key);
    this->metric_inc(refcountcache_total_failed_inserts_stat, 1);
    return;
  }

  // The new entry holds the cache's reference to `item`
  RefCountCacheHashEntry *entry = RefCountCacheHashEntry::alloc();
  entry->set(item, key, size, expire_time);

  // A negative expire time means "don't expire": such entries never enter the expiry queue
  if (expire_time >= 0) {
    Debug("refcountcache", "partition %d adding entry with expire_time=%d\n", this->part_num, expire_time);
    ExpiryEntry *slot = expiryQueueEntry.alloc();
    new ((void *)slot) ExpiryEntry(entry);
    expiry_queue.push(slot);
    entry->expiry_entry = slot;
  }

  // Link the entry into the map and account for it
  this->item_map.insert(entry);
  this->size += entry->meta.size;
  this->items++;
  this->metric_inc(refcountcache_current_size_stat, (int64_t)entry->meta.size);
  this->metric_inc(refcountcache_current_items_stat, 1);
}

// Remove the entry stored under `key`, if there is one.
// A non-negative `expiry_time` restricts the removal to an entry whose stored expiry time is exactly
// that value; a negative `expiry_time` removes the entry unconditionally.
template <class C>
void
RefCountCachePartition<C>::erase(uint64_t key, ink_time_t expiry_time)
{
  auto spot = this->item_map.find(key);
  if (spot == this->item_map.end()) {
    return;
  }

  const bool expiry_must_match = expiry_time >= 0;
  if (expiry_must_match && spot->meta.expiry_time != expiry_time) {
    return;
  }

  // Unlink from the map first, then release the entry itself
  this->item_map.erase(spot);
  this->dealloc_entry(spot);
}

// Release an entry that the caller has already unlinked from the map: adjust the usage counters and
// metrics, remove the entry from the expiry queue if it is on it, then free the entry.
template <class C>
void
RefCountCachePartition<C>::dealloc_entry(hash_type::iterator ptr)
{
  // decrement usage counters
  const int64_t released = (int64_t)ptr->meta.size;
  this->size -= ptr->meta.size;
  this->items--;

  this->metric_inc(refcountcache_current_size_stat, -released);
  this->metric_inc(refcountcache_current_items_stat, -1);

  // remove from expiry queue
  if (auto *queued = ptr->expiry_entry; queued != nullptr) {
    Debug("refcountcache", "partition %d deleting item from expiry_queue idx=%d", this->part_num, queued->index);

    this->expiry_queue.erase(queued);
    expiryQueueEntry.free(queued);
    ptr->expiry_entry = nullptr; // the slot is gone; don't leave a pointer to it in the entry
  }

  RefCountCacheHashEntry::free<C>(ptr);
}

// Remove and free every entry in the partition.
template <class C>
void
RefCountCachePartition<C>::clear()
{
  // The list pointers live inside the hash nodes, so the iterator must be moved past a node
  // before that node is unlinked and deallocated.
  for (auto it = this->item_map.begin(); it != this->item_map.end();) {
    auto doomed = it;
    ++it;

    this->item_map.erase(doomed);
    this->dealloc_entry(doomed);
  }
}

// Are we full?
// A limit of 0 (items or size) disables that limit; the partition is full as soon as either enabled
// limit has been reached.
template <class C>
bool
RefCountCachePartition<C>::is_full() const
{
  Debug("refcountcache", "partition %d is full? items %d/%d size %" PRIu64 "/%" PRIu64 "\n\n", this->part_num, this->items,
        this->max_items, this->size, this->max_size);

  const bool item_limit_hit = this->max_items > 0 && this->items >= this->max_items;
  const bool size_limit_hit = this->max_size > 0 && this->size >= this->max_size;
  return item_limit_hit || size_limit_hit;
}

// Attempt to make space for item of `size`
//
// Evicts entries from the front of the expiry queue for as long as they have already expired and the
// partition is still full or the new item would not fit under the size limit.
// Returns true once the item fits; false when the expiry queue is empty or its first entry has not
// expired yet. Entries that are not on the expiry queue are never evicted here.
template <class C>
bool
RefCountCachePartition<C>::make_space_for(unsigned int size)
{
  ink_time_t now = ink_time();
  // A max_size of 0 means "no size limit" (is_full() treats it the same way), so the size check must
  // only apply when a limit is actually configured. Without that guard a partition limited only by
  // item count could never make room for an item with size > 0.
  while (this->is_full() || (this->max_size > 0 && size > 0 && this->size + size > this->max_size)) {
    PriorityQueueEntry<RefCountCacheHashEntry *> *top_item = expiry_queue.top();
    // if there is nothing in the expiry queue, then we can't make space
    if (top_item == nullptr) {
      return false;
    }

    // If the first item has expired, lets evict it, and then go around again
    if (top_item->node->meta.expiry_time < now) {
      this->erase(top_item->node->meta.key);
    } else { // if the first item isn't expired-- the rest won't be either (queue is sorted)
      return false;
    }
  }
  return true;
}

// Number of entries currently stored in this partition.
template <class C>
size_t
RefCountCachePartition<C>::count() const
{
  return static_cast<size_t>(this->items);
}

// Append a newly allocated duplicate of every entry in this partition to `items`.
// Each duplicate refers to the same cached object and carries the same key, size and expiry time.
// The duplicates are obtained from RefCountCacheHashEntry::alloc() and are not released here.
template <class C>
void
RefCountCachePartition<C>::copy(std::vector<RefCountCacheHashEntry *> &items)
{
  for (auto &&entry : this->item_map) {
    const RefCountCacheItemMeta &src_meta = entry.meta;

    RefCountCacheHashEntry *duplicate = RefCountCacheHashEntry::alloc();
    duplicate->set(entry.item.get(), src_meta.key, src_meta.size, src_meta.expiry_time);
    items.push_back(duplicate);
  }
}

// Add `data` to the stat `metric_enum`. Does nothing when the partition has no stat block.
template <class C>
void
RefCountCachePartition<C>::metric_inc(RefCountCache_Stats metric_enum, int64_t data)
{
  if (this->rsb == nullptr) {
    return;
  }
  RecIncrGlobalRawStatCount(this->rsb, metric_enum, data);
}

// Give direct (mutable) access to the map that holds this partition's entries.
template <class C>
IntrusiveHashMap<RefCountCacheLinkage> &
RefCountCachePartition<C>::get_map()
{
  hash_type &entries = this->item_map;
  return entries;
}

// The header for the cache, this is used to check if the serialized cache is compatible
// LoadRefCountCacheFromPath() reads sizeof(RefCountCacheHeader) raw bytes from the start of the cache
// file straight into an instance of this class, so the member layout is part of the file format.
class RefCountCacheHeader
{
public:
  unsigned int magic = REFCOUNTCACHE_MAGIC_NUMBER;  // defaults to the magic number macro
  ts::VersionNumber version{REFCOUNTCACHE_VERSION}; // version of the cache itself
  ts::VersionNumber object_version; // version passed in of whatever it is we are caching

  // The member functions below are defined outside this header.
  RefCountCacheHeader(ts::VersionNumber object_version = ts::VersionNumber());
  bool operator==(const RefCountCacheHeader other) const;
  // Used by LoadRefCountCacheFromPath() to decide whether a file with header `other` may be loaded.
  bool compatible(RefCountCacheHeader *other) const;
};

// RefCountCache is a ref-counted key->value map to store classes that inherit from RefCountObj.
// Once an item is `put` into the cache, the cache will maintain a Ptr<> to that object until erase
// or clear is called-- which will remove the cache's Ptr<> to the object.
//
// This cache may be Persisted (RefCountCacheSync) as well as loaded from disk (LoadRefCountCacheFromPath).
// This class will optionally emit metrics at the given `metrics_prefix`.
//
// Note: although this cache does allow you to set expiry times, it does not actively GC itself-- meaning
// it will only remove expired items once the space is required. So to ensure that the cache is bounded either a
// size or an item limit must be set-- otherwise the cache will not GC.
//
// Also note that storing an item under a key that is already in use removes the previous entry for that
// key, so the "leak" concern above only applies when there isn't sufficient space to store an item for
// each possible key.
//
// The cache is split into `num_partitions` partitions; a key belongs to partition (key % num_partitions).
template <class C> class RefCountCache
{
public:
  // Constructor
  // `size` and `items` are divided evenly between the partitions. Stats are only registered when
  // `metrics_prefix` is non-empty.
  RefCountCache(unsigned int num_partitions, int size = -1, int items = -1, ts::VersionNumber object_version = ts::VersionNumber(),
                const std::string &metrics_prefix = "");
  // Destructor
  // Deletes the partitions.
  ~RefCountCache();

  // User interface to the cache
  // Each call is forwarded to the partition that owns `key` (clear() goes to every partition).
  Ptr<C> get(uint64_t key);
  void put(uint64_t key, C *item, int size = 0, ink_time_t expiry_time = -1);
  void erase(uint64_t key);
  void clear();

  // Some methods to get some internal state
  int partition_for_key(uint64_t key);        // index of the partition that owns `key`
  Ptr<ProxyMutex> lock_for_key(uint64_t key); // lock of the partition that owns `key`
  size_t partition_count() const;             // number of partitions
  RefCountCachePartition<C> &get_partition(int pnum); // partition at index `pnum` (not range checked)
  size_t count() const;                       // total number of items over all partitions
  RefCountCacheHeader &get_header();
  RecRawStatBlock *get_rsb(); // nullptr when no metrics prefix was given

private:
  int max_size;  // Total size
  int max_items; // Total number of items allowed
  unsigned int num_partitions;
  std::vector<RefCountCachePartition<C> *> partitions; // owned; deleted in the destructor
  // Header
  RefCountCacheHeader header; // Our header
  RecRawStatBlock *rsb;       // stat block shared with the partitions, nullptr without metrics
};

// Build the cache: remember the limits, optionally register the stats under `metrics_prefix`, then
// create `num_partitions` partitions that each get an equal share of `size` and `items`.
template <class C>
RefCountCache<C>::RefCountCache(unsigned int num_partitions, int size, int items, ts::VersionNumber object_version,
                                const std::string &metrics_prefix)
  : max_size(size), max_items(items), num_partitions(num_partitions), header(RefCountCacheHeader(object_version)), rsb(nullptr)
{
  // Stats are only set up when a prefix was supplied; otherwise `rsb` stays nullptr
  if (!metrics_prefix.empty()) {
    // Name suffix and stat slot of every stat, in registration order
    struct StatRegistration {
      const char *suffix;
      RefCountCache_Stats id;
    };
    const StatRegistration stat_table[] = {
      {"current_items", refcountcache_current_items_stat},
      {"current_size", refcountcache_current_size_stat},
      {"total_inserts", refcountcache_total_inserts_stat},
      {"total_failed_inserts", refcountcache_total_failed_inserts_stat},
      {"total_lookups", refcountcache_total_lookups_stat},
      {"total_hits", refcountcache_total_hits_stat},
      {"last_sync.time", refcountcache_last_sync_time},
      {"last_sync.total_items", refcountcache_last_total_items},
      {"last_sync.total_size", refcountcache_last_total_size},
    };

    this->rsb = RecAllocateRawStatBlock((int)RefCountCache_Stat_Count);

    for (const StatRegistration &stat : stat_table) {
      const std::string stat_name = metrics_prefix + stat.suffix;
      RecRegisterRawStat(this->rsb, RECT_PROCESS, stat_name.c_str(), RECD_INT, RECP_NON_PERSISTENT, (int)stat.id,
                         RecRawStatSyncCount);
    }
  }

  // Now lets create all the partitions
  // NOTE(review): `size` and `items` are signed while `num_partitions` is unsigned, so the divisions
  // below are carried out in unsigned arithmetic -- relevant for the -1 defaults; confirm this is intended.
  this->partitions.reserve(num_partitions);
  for (unsigned int part_idx = 0; part_idx < num_partitions; ++part_idx) {
    auto *part = new RefCountCachePartition<C>(part_idx, size / num_partitions, items / num_partitions, this->rsb);
    this->partitions.push_back(part);
  }
}

// Destroy the cache: delete every partition created by the constructor.
template <class C> RefCountCache<C>::~RefCountCache()
{
  for (RefCountCachePartition<C> *part : this->partitions) {
    delete part;
  }
  this->num_partitions = 0;
}

// Look up `key` in the partition that owns it.
template <class C>
Ptr<C>
RefCountCache<C>::get(uint64_t key)
{
  RefCountCachePartition<C> *owner = this->partitions[this->partition_for_key(key)];
  return owner->get(key);
}

// Store `item` under `key` in the partition that owns the key.
// NOTE(review): the partition's put() takes its expire time as `int`, so `expiry_time` is narrowed on
// this call.
template <class C>
void
RefCountCache<C>::put(uint64_t key, C *item, int size, ink_time_t expiry_time)
{
  RefCountCachePartition<C> *owner = this->partitions[this->partition_for_key(key)];
  owner->put(key, item, size, expiry_time);
}

// Pick a partition for a given item
// The partition index is the key modulo the number of partitions.
template <class C>
int
RefCountCache<C>::partition_for_key(uint64_t key)
{
  const uint64_t slot = key % this->num_partitions;
  return slot;
}

// Access the header built from the object version given to the constructor.
template <class C>
RefCountCacheHeader &
RefCountCache<C>::get_header()
{
  RefCountCacheHeader &own_header = this->header;
  return own_header;
}

// Return the lock of the partition that owns `key`.
template <class C>
Ptr<ProxyMutex>
RefCountCache<C>::lock_for_key(uint64_t key)
{
  const int slot = this->partition_for_key(key);
  return this->partitions[slot]->lock;
}

// Return the partition at index `pnum`. The index is not range checked.
template <class C>
RefCountCachePartition<C> &
RefCountCache<C>::get_partition(int pnum)
{
  RefCountCachePartition<C> *part = this->partitions[pnum];
  return *part;
}

// Total number of items in the cache: the sum of the item counts of all partitions.
template <class C>
size_t
RefCountCache<C>::count() const
{
  size_t total = 0;
  for (const RefCountCachePartition<C> *part : this->partitions) {
    total += part->count();
  }
  return total;
}

// Number of partitions the cache is split into.
template <class C>
size_t
RefCountCache<C>::partition_count() const
{
  return static_cast<size_t>(this->num_partitions);
}

// Return the raw stat block of this cache; nullptr when the cache was built without a metrics prefix.
template <class C>
RecRawStatBlock *
RefCountCache<C>::get_rsb()
{
  RecRawStatBlock *stat_block = this->rsb;
  return stat_block;
}

// Remove the entry stored under `key` from the partition that owns the key.
template <class C>
void
RefCountCache<C>::erase(uint64_t key)
{
  RefCountCachePartition<C> *owner = this->partitions[this->partition_for_key(key)];
  owner->erase(key);
}

// Empty the cache by clearing every partition in turn.
template <class C>
void
RefCountCache<C>::clear()
{
  for (RefCountCachePartition<C> *part : this->partitions) {
    part->clear();
  }
}

// Fill `cache` with items in file `filepath` using `load_func` to unmarshall the record.
// Errors are -1
//
// The file is expected to hold a raw RefCountCacheHeader followed by records, each made of a raw
// RefCountCacheItemMeta and `meta.size` bytes of payload. Loading stops at the first record that
// cannot be read completely; records loaded before that point stay in the cache and 0 is returned.
//
// @param cache     cache to fill; its header decides whether the file is compatible.
// @param dirname   not used by this function.
// @param filepath  file to read.
// @param load_func turns a payload buffer and its length into a new item, or nullptr to skip the record.
//                  The buffer is reused for the next record, so the returned item must not point into it.
// @return 0 once the header was accepted, -1 if `load_func` is null, the file cannot be opened, or the
//         header is unreadable or incompatible.
template <typename CacheEntryType>
int
LoadRefCountCacheFromPath(RefCountCache<CacheEntryType> &cache, const std::string &dirname, const std::string &filepath,
                          CacheEntryType *(*load_func)(char *, unsigned int))
{
  // If we have no load method, then we can't load anything so lets just stop right here
  if (load_func == nullptr) {
    return -1; // TODO: some specific error code
  }

  int fd = open(filepath.c_str(), O_RDONLY);
  if (fd < 0) {
    Warning("Unable to open file %s; [Error]: %s", filepath.c_str(), strerror(errno));
    return -1;
  }

  // read in the header
  RefCountCacheHeader tmpHeader = RefCountCacheHeader();
  int read_ret                  = read(fd, (char *)&tmpHeader, sizeof(RefCountCacheHeader));
  if (read_ret != sizeof(RefCountCacheHeader)) {
    socketManager.close(fd);
    // sizeof yields a size_t, which is printed with %zu
    Warning("Error reading cache header from disk (expected %zu): %d", sizeof(RefCountCacheHeader), read_ret);
    return -1;
  }
  if (!cache.get_header().compatible(&tmpHeader)) {
    socketManager.close(fd);
    Warning("Incompatible cache at %s, not loading.", filepath.c_str());
    return -1; // TODO: specific code for incompatible
  }

  RefCountCacheItemMeta tmpValue = RefCountCacheItemMeta(0, 0);
  // The payload size comes from the file, so the buffer lives on the heap: a variable-length stack
  // array is not standard C++ and a corrupt size could overflow the stack. The buffer is reused
  // for every record.
  std::vector<char> buf;
  while (true) { // TODO: better loop
    read_ret = read(fd, (char *)&tmpValue, sizeof(tmpValue));
    if (read_ret != sizeof(tmpValue)) {
      break; // end of file, or a truncated record header
    }
    buf.resize(tmpValue.size);
    read_ret = read(fd, buf.data(), tmpValue.size);
    if (read_ret != (int)tmpValue.size) {
      Warning("Encountered error reading item from cache: %d", read_ret);
      break;
    }

    CacheEntryType *newItem = load_func(buf.data(), tmpValue.size);
    if (newItem != nullptr) {
      // put() adds sizeof(CacheEntryType) back on top of the size passed here.
      // NOTE(review): the stored expiry time (tmpValue.expiry_time) is not passed on, so loaded items
      // get the default expiry of -1 -- confirm that this is intended.
      cache.put(tmpValue.key, newItem, tmpValue.size - sizeof(CacheEntryType));
    }
  }

  socketManager.close(fd);
  return 0;
}
