/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "LRUEntriesMap.hpp"

#include <mutex>

#include "CacheImpl.hpp"
#include "EvictionController.hpp"
#include "ExpiryTaskManager.hpp"
#include "LRUList.cpp"
#include "MapSegment.hpp"
#include "util/concurrent/spinlock_mutex.hpp"

namespace apache {
namespace geode {
namespace client {

/**
 * @brief LRUAction for testing map outside of a region....
 */
class APACHE_GEODE_EXPORT TestMapAction : public virtual LRUAction {
 private:
  EntriesMap* m_eMap;

 public:
  explicit TestMapAction(EntriesMap* eMap) : m_eMap(eMap) { m_destroys = true; }

  virtual ~TestMapAction() {}

  virtual bool evict(const std::shared_ptr<MapEntryImpl>& mePtr) {
    std::shared_ptr<CacheableKey> keyPtr;
    mePtr->getKeyI(keyPtr);
    /** @TODO try catch.... return true or false. */
    std::shared_ptr<Cacheable> cPtr;  // old value.
    std::shared_ptr<MapEntryImpl> me;
    std::shared_ptr<VersionTag> versionTag;
    return (m_eMap->remove(keyPtr, cPtr, me, 0, versionTag, false) == GF_NOERR);
  }

  virtual LRUAction::Action getType() { return LRUAction::LOCAL_DESTROY; }
  friend class LRUAction;
};

LRUEntriesMap::LRUEntriesMap(ExpiryTaskManager* expiryTaskManager,
                             std::unique_ptr<EntryFactory> entryFactory,
                             RegionInternal* region,
                             const LRUAction::Action& lruAction,
                             const uint32_t limit,
                             bool concurrencyChecksEnabled,
                             const uint8_t concurrency, bool heapLRUEnabled)
    : ConcurrentEntriesMap(expiryTaskManager, std::move(entryFactory),
                           concurrencyChecksEnabled, region, concurrency),
      m_lruList(),
      m_limit(limit),
      m_pmPtr(nullptr),
      m_validEntries(0),
      m_heapLRUEnabled(heapLRUEnabled) {
  m_currentMapSize = 0;
  m_action = nullptr;
  m_evictionControllerPtr = nullptr;
  // translate action type to an instance.
  if (region) {
    m_action = LRUAction::newLRUAction(lruAction, region, this);
    m_name = region->getName();
    CacheImpl* cImpl = region->getCacheImpl();
    if (cImpl != nullptr) {
      m_evictionControllerPtr = cImpl->getEvictionController();
      if (m_evictionControllerPtr != nullptr) {
        m_evictionControllerPtr->registerRegion(m_name);
        LOGINFO("Heap LRU eviction controller registered region %s",
                m_name.c_str());
      }
    }
  } else {
    m_action = new TestMapAction(this);
  }
}

void LRUEntriesMap::close() {
  if (m_evictionControllerPtr != nullptr) {
    m_evictionControllerPtr->updateRegionHeapInfo((-1 * (m_currentMapSize)));
    m_evictionControllerPtr->deregisterRegion(m_name);
  }
  ConcurrentEntriesMap::close();
}

void LRUEntriesMap::clear() {
  updateMapSize((-1 * (m_currentMapSize)));
  ConcurrentEntriesMap::clear();
}

LRUEntriesMap::~LRUEntriesMap() { delete m_action; }

/**
 * @brief put an item in the map... if it is a new entry, then the LRU may
 * need to be consulted.
 * If LRUAction is LRUInvalidateAction, then increment if old value was absent.
 * If LRUAction is one of Destroys, then increment if old Entry was absent.
 */
GfErrType LRUEntriesMap::create(const std::shared_ptr<CacheableKey>& key,
                                const std::shared_ptr<Cacheable>& newValue,
                                std::shared_ptr<MapEntryImpl>& me,
                                std::shared_ptr<Cacheable>& oldValue,
                                int updateCount, int destroyTracker,
                                std::shared_ptr<VersionTag> versionTag) {
  MapSegment* segmentRPtr = segmentFor(key);
  GfErrType err = GF_NOERR;
  {  // SYNCHRONIZE_SEGMENT(segmentRPtr);
    std::shared_ptr<MapEntryImpl> mePtr;
    if ((err = segmentRPtr->create(key, newValue, me, oldValue, updateCount,
                                   destroyTracker, versionTag)) != GF_NOERR) {
      return err;
    }
    // TODO:  can newValue ever be a token ??
    if (!CacheableToken::isToken(newValue)) {
      m_validEntries++;
    }
    //  oldValue can be an invalid token when "createIfInvalid" is true
    if (!CacheableToken::isInvalid(oldValue)) {
      m_size++;
    }
    std::shared_ptr<Cacheable> tmpValue;
    segmentRPtr->getEntry(key, mePtr, tmpValue);
    if (mePtr == nullptr) {
      return err;
    }
    m_lruList.appendEntry(mePtr);
    me = mePtr;
  }
  if (m_evictionControllerPtr != nullptr) {
    int64_t newSize =
        static_cast<int64_t>(Utils::checkAndGetObjectSize(newValue));
    newSize += static_cast<int64_t>(Utils::checkAndGetObjectSize(key));
    if (oldValue != nullptr) {
      newSize -= static_cast<int64_t>(oldValue->objectSize());
    } else {
      newSize -= static_cast<int64_t>(sizeof(void*));
    }
    updateMapSize(newSize);
  }
  err = processLRU();
  return err;
}

GfErrType LRUEntriesMap::processLRU() {
  GfErrType canEvict = GF_NOERR;
  while (canEvict == GF_NOERR && mustEvict()) {
    canEvict = evictionHelper();
  }
  return canEvict;
}

GfErrType LRUEntriesMap::evictionHelper() {
  GfErrType err = GF_NOERR;
  std::shared_ptr<MapEntryImpl> lruEntryPtr;
  m_lruList.getLRUEntry(lruEntryPtr);
  if (lruEntryPtr == nullptr) {
    err = GF_ENOENT;
    return err;
  }
  bool IsEvictDone = m_action->evict(lruEntryPtr);
  if (m_action->overflows() && IsEvictDone) {
    --m_validEntries;
    lruEntryPtr->getLRUProperties().setEvicted();
  }
  if (!IsEvictDone) {
    err = GF_DISKFULL;
    return err;
  }
  return err;
}

void LRUEntriesMap::processLRU(int32_t numEntriesToEvict) {
  int32_t evicted = 0;
  for (int32_t i = 0; i < numEntriesToEvict; i++) {
    if (m_validEntries > 0 && size() > 0) {
      if (evictionHelper() == GF_NOERR) {
        evicted++;
      }
    } else {
      break;
    }
  }
}

GfErrType LRUEntriesMap::invalidate(const std::shared_ptr<CacheableKey>& key,
                                    std::shared_ptr<MapEntryImpl>& me,
                                    std::shared_ptr<Cacheable>& oldValue,
                                    std::shared_ptr<VersionTag> versionTag) {
  int64_t newSize = 0;
  MapSegment* segmentRPtr = segmentFor(key);
  bool isTokenAdded = false;
  GfErrType err =
      segmentRPtr->invalidate(key, me, oldValue, versionTag, isTokenAdded);
  if (isTokenAdded) {
    ++m_size;
  }
  if (err != GF_NOERR) {
    return err;
  }
  bool isOldValueToken = CacheableToken::isToken(oldValue);
  //  get the old value first which is required for heapLRU
  // calculation and for listeners; note even though there is a race
  // here between get and destroy, it will not harm if we get a slightly
  // later value
  // TODO: assess any other effects of this race
  if (CacheableToken::isOverflowed(oldValue)) {
    std::lock_guard<MapSegment> _guard(*segmentRPtr);
    auto&& persistenceInfo = me->getLRUProperties().getPersistenceInfo();
    //  get the old value first which is required for heapLRU
    // calculation and for listeners; note even though there is a race
    // here between get and destroy, it will not harm if we get a slightly
    // older value
    // TODO: there is also a race between segment remove and destroy here
    // need to assess the effect of this; also assess the affect of above
    // mentioned race
    oldValue = m_pmPtr->read(key, persistenceInfo);
    if (oldValue != nullptr) {
      m_pmPtr->destroy(key, persistenceInfo);
    }
  }
  if (!isOldValueToken) {
    --m_validEntries;
    me->getLRUProperties().setEvicted();
    newSize = CacheableToken::invalid()->objectSize();
    if (oldValue != nullptr) {
      newSize -= oldValue->objectSize();
    } else {
      newSize -= sizeof(void*);
    }
    if (m_evictionControllerPtr != nullptr) {
      if (newSize != 0) {
        updateMapSize(newSize);
      }
    }
  }
  return err;
}

GfErrType LRUEntriesMap::put(const std::shared_ptr<CacheableKey>& key,
                             const std::shared_ptr<Cacheable>& newValue,
                             std::shared_ptr<MapEntryImpl>& me,
                             std::shared_ptr<Cacheable>& oldValue,
                             int updateCount, int destroyTracker,
                             std::shared_ptr<VersionTag> versionTag,
                             bool& isUpdate, DataInput* delta) {
  MapSegment* segmentRPtr = segmentFor(key);
  GfErrType err = GF_NOERR;
  bool segmentLocked = false;
  {
    if (m_action != nullptr &&
        m_action->getType() == LRUAction::OVERFLOW_TO_DISK) {
      segmentRPtr->lock();
      segmentLocked = true;
    }
    std::shared_ptr<MapEntryImpl> mePtr;
    if ((err = segmentRPtr->put(key, newValue, me, oldValue, updateCount,
                                destroyTracker, isUpdate, versionTag, delta)) !=
        GF_NOERR) {
      if (segmentLocked == true) segmentRPtr->unlock();
      return err;
    }

    bool isOldValueToken = CacheableToken::isToken(oldValue);
    // TODO:  need tests for checking that oldValue is returned
    // correctly to listeners for overflow -- this is for all operations
    // put, invalidate, destroy
    // TODO:  need tests for checking if the overflowed entry is
    // destroyed *from disk* in put
    //  existing overflowed entry should be returned correctly in
    // put and removed from disk
    // TODO: need to verify if segment mutex lock is enough here
    // TODO: This whole class needs to be rethought and reworked for locking
    // and concurrency issues. The basic issue is two concurrent operations
    // on the same key regardless of whether we need to go to the persistence
    // manager or not. So all operations on the same key should be protected
    // unless very careful thought has gone into it.
    if (CacheableToken::isOverflowed(oldValue)) {
      if (!segmentLocked) {
        segmentRPtr->unlock();
        segmentLocked = true;
      }
      auto&& persistenceInfo = me->getLRUProperties().getPersistenceInfo();
      oldValue = m_pmPtr->read(key, persistenceInfo);
      if (oldValue != nullptr) {
        m_pmPtr->destroy(key, persistenceInfo);
      }
    }
    // SpinLock& lock = segmentRPtr->getSpinLock();
    // std::lock_guard<spinlock_mutex> mapGuard( lock );

    // TODO:  when can newValue be a token ??
    if (CacheableToken::isToken(newValue) && !isOldValueToken) {
      --m_validEntries;
    }
    if (!CacheableToken::isToken(newValue) && isOldValueToken) {
      ++m_validEntries;
    }

    // Add new entry to LRU list
    if (isUpdate == false) {
      ++m_size;
      ++m_validEntries;
      std::shared_ptr<Cacheable> tmpValue;
      segmentRPtr->getEntry(key, mePtr, tmpValue);
      // mePtr cannot be null, we just put it...
      // must convert to an std::shared_ptr<LRUMapEntryImpl>...

      m_lruList.appendEntry(mePtr);
      me = mePtr;
    } else {
      if (!CacheableToken::isToken(newValue) && isOldValueToken) {
        std::shared_ptr<Cacheable> tmpValue;
        segmentRPtr->getEntry(key, mePtr, tmpValue);
        mePtr->getLRUProperties().clearEvicted();
        m_lruList.appendEntry(
            std::shared_ptr<MapEntryImpl>(mePtr->getImplPtr()));
        me = mePtr;
      }
    }
  }
  if (m_evictionControllerPtr != nullptr) {
    int64_t newSize =
        static_cast<int64_t>(Utils::checkAndGetObjectSize(newValue));
    /*
    if (newSize == 0) {
      LOGWARN("Object size for class ID %d should not be zero when HeapLRU is
    enabled", newValue->classId());
      LOGDEBUG("Type ID is %d for the object returning zero HeapLRU size",
    newValue->typeId());
    }
    */
    if (isUpdate == false) {
      newSize += static_cast<int64_t>(Utils::checkAndGetObjectSize(key));
    } else {
      if (oldValue != nullptr) {
        newSize -= static_cast<int64_t>(oldValue->objectSize());
      } else {
        newSize -= static_cast<int64_t>(sizeof(void*));
      }
    }
    updateMapSize(newSize);
  }

  err = processLRU();

  if (segmentLocked) {
    segmentRPtr->unlock();
  }
  return err;
}

/**
 * @brief Get the value from an entry, if the entry exists it will be marked
 * as recently used. Note, getEntry, entries, and values do not mark entries
 * as recently used.
 */
bool LRUEntriesMap::get(const std::shared_ptr<CacheableKey>& key,
                        std::shared_ptr<Cacheable>& returnPtr,
                        std::shared_ptr<MapEntryImpl>& me) {
  bool doProcessLRU = false;
  MapSegment* segmentRPtr = segmentFor(key);
  bool segmentLocked = false;
  if (m_action != nullptr &&
      m_action->getType() == LRUAction::OVERFLOW_TO_DISK) {
    segmentRPtr->lock();
    segmentLocked = true;
  }
  {
    returnPtr = nullptr;
    std::shared_ptr<MapEntryImpl> mePtr;
    if (false == segmentRPtr->getEntry(key, mePtr, returnPtr)) {
      if (segmentLocked == true) segmentRPtr->unlock();
      return false;
    }
    // segmentRPtr->get(key, returnPtr, mePtr);
    auto nodeToMark = mePtr;
    LRUEntryProperties& lruProps = nodeToMark->getLRUProperties();
    if (returnPtr != nullptr && CacheableToken::isOverflowed(returnPtr)) {
      auto&& persistenceInfo = lruProps.getPersistenceInfo();
      std::shared_ptr<Cacheable> tmpObj;
      try {
        tmpObj = m_pmPtr->read(key, persistenceInfo);
      } catch (Exception& ex) {
        LOGERROR("read on the persistence layer failed - %s", ex.what());
        if (segmentLocked == true) segmentRPtr->unlock();
        return false;
      }
      m_region->getRegionStats()->incRetrieves();
      m_region->getCacheImpl()->getCachePerfStats().incRetrieves();

      returnPtr = tmpObj;

      std::shared_ptr<Cacheable> oldValue;
      bool isUpdate;
      std::shared_ptr<VersionTag> versionTag;
      if (GF_NOERR == segmentRPtr->put(key, tmpObj, mePtr, oldValue, 0, 0,
                                       isUpdate, versionTag, nullptr)) {
        // m_entriesRetrieved++;
        ++m_validEntries;
        lruProps.clearEvicted();
        m_lruList.appendEntry(nodeToMark);
      }
      doProcessLRU = true;
      if (m_evictionControllerPtr != nullptr) {
        int64_t newSize = 0;
        if (tmpObj != nullptr) {
          newSize += static_cast<int64_t>(
              tmpObj->objectSize() - CacheableToken::invalid()->objectSize());
        } else {
          newSize += sizeof(void*);
        }
        updateMapSize(newSize);
      }
    }
    me = mePtr;
    // lruProps.clearEvicted();
    lruProps.setRecentlyUsed();
    if (doProcessLRU) {
      GfErrType IsProcessLru = processLRU();
      if ((IsProcessLru != GF_NOERR)) {
        if (segmentLocked) {
          segmentRPtr->unlock();
        }
        return false;
      }
    }
    if (segmentLocked) {
      segmentRPtr->unlock();
    }
    return true;
  }
}

GfErrType LRUEntriesMap::remove(const std::shared_ptr<CacheableKey>& key,
                                std::shared_ptr<Cacheable>& result,
                                std::shared_ptr<MapEntryImpl>& me,
                                int updateCount,
                                std::shared_ptr<VersionTag> versionTag,
                                bool afterRemote) {
  MapSegment* segmentRPtr = segmentFor(key);
  bool isEntryFound = true;
  GfErrType err;
  if ((err = segmentRPtr->remove(key, result, me, updateCount, versionTag,
                                 afterRemote, isEntryFound)) == GF_NOERR) {
    // ACE_Guard<MapSegment> _guard(*segmentRPtr);
    if (result != nullptr && me != nullptr) {
      LRUEntryProperties& lruProps = me->getLRUProperties();
      lruProps.setEvicted();
      if (isEntryFound) --m_size;
      if (!CacheableToken::isToken(result)) {
        --m_validEntries;
      }
      if (CacheableToken::isOverflowed(result)) {
        std::lock_guard<MapSegment> _guard(*segmentRPtr);
        auto&& persistenceInfo = lruProps.getPersistenceInfo();
        //  get the old value first which is required for heapLRU
        // calculation and for listeners; note even though there is a race
        // here between get and destroy, it will not harm if we get a slightly
        // older value
        // TODO: there is also a race between segment remove and destroy here
        // need to assess the effect of this; also assess the affect of above
        // mentioned race
        result = m_pmPtr->read(key, persistenceInfo);
        if (result != nullptr) {
          m_pmPtr->destroy(key, persistenceInfo);
        }
      }
      if (m_evictionControllerPtr != nullptr) {
        int64_t sizeToRemove = static_cast<int64_t>(key->objectSize());
        sizeToRemove += static_cast<int64_t>(result->objectSize());
        updateMapSize((-1 * sizeToRemove));
      }
    }
  }
  return err;
}

void LRUEntriesMap::updateMapSize(int64_t size) {
  // TODO: check and remove null check since this has already been done
  // by all the callers
  if (m_evictionControllerPtr != nullptr) {
    {
      std::lock_guard<spinlock_mutex> __guard(m_mapInfoLock);
      m_currentMapSize += size;
    }
    m_evictionControllerPtr->updateRegionHeapInfo(size);
  }
}
std::shared_ptr<Cacheable> LRUEntriesMap::getFromDisk(
    const std::shared_ptr<CacheableKey>& key,
    std::shared_ptr<MapEntryImpl>& me) const {
  auto&& persistenceInfo = me->getLRUProperties().getPersistenceInfo();
  std::shared_ptr<Cacheable> tmpObj;
  try {
    LOGDEBUG("Reading value from persistence layer for key: %s",
             key->toString().c_str());
    tmpObj = m_pmPtr->read(key, persistenceInfo);
  } catch (Exception& ex) {
    LOGERROR("read on the persistence layer failed - %s", ex.what());
    return nullptr;
  }
  return tmpObj;
}
}  // namespace client
}  // namespace geode
}  // namespace apache
