/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "MapSegment.hpp"

#include <chrono>

#include "MapEntry.hpp"
#include "RegionInternal.hpp"
#include "TableOfPrimes.hpp"
#include "ThinClientPoolDM.hpp"
#include "ThinClientRegion.hpp"
#include "TombstoneExpiryHandler.hpp"
#include "TrackedMapEntry.hpp"
#include "Utils.hpp"
#include "util/concurrent/spinlock_mutex.hpp"

namespace apache {
namespace geode {
namespace client {

bool MapSegment::boolVal = false;
MapSegment::~MapSegment() {
  delete m_map;
  // m_entryFactory will be disposed by the containing EntriesMap impl.
}

void MapSegment::open(RegionInternal* region, const EntryFactory* entryFactory,
                      ExpiryTaskManager* expiryTaskManager, uint32_t size,
                      std::atomic<int32_t>* destroyTrackers,
                      bool concurrencyChecksEnabled) {
  m_map = new CacheableKeyHashMap();
  uint32_t mapSize = TableOfPrimes::nextLargerPrime(size, m_primeIndex);
  LOGFINER("Initializing MapSegment with size %d (given size %d).", mapSize,
           size);
  m_map->reserve(mapSize);
  m_entryFactory = entryFactory;
  m_region = region;
  m_tombstoneList =
      std::make_shared<TombstoneList>(this, m_region->getCacheImpl());
  m_expiryTaskManager = expiryTaskManager;
  m_numDestroyTrackers = destroyTrackers;
  m_concurrencyChecksEnabled = concurrencyChecksEnabled;
}

void MapSegment::close() {}

void MapSegment::clear() {
  std::lock_guard<spinlock_mutex> lk(m_spinlock);
  m_map->clear();
}

void MapSegment::lock() { m_segmentMutex.lock(); }

void MapSegment::unlock() { m_segmentMutex.unlock(); }

GfErrType MapSegment::create(const std::shared_ptr<CacheableKey>& key,
                             const std::shared_ptr<Cacheable>& newValue,
                             std::shared_ptr<MapEntryImpl>& me,
                             std::shared_ptr<Cacheable>& oldValue,
                             int updateCount, int destroyTracker,
                             std::shared_ptr<VersionTag> versionTag) {
  ExpiryTaskManager::id_type taskid = -1;
  TombstoneExpiryHandler* handler = nullptr;
  GfErrType err = GF_NOERR;
  {
    std::lock_guard<spinlock_mutex> lk(m_spinlock);
    // if size is greater than 75 percent of prime, rehash
    auto mapSize = TableOfPrimes::getPrime(m_primeIndex);
    if (((m_map->size() * 75) / 100) > mapSize) {
      rehash();
    }

    const auto& find = m_map->find(key);
    if (find == m_map->end()) {
      if ((err = putNoEntry(key, newValue, me, updateCount, destroyTracker,
                            versionTag)) != GF_NOERR) {
        return err;
      }
    } else {
      auto& entry = find->second;
      auto entryImpl = entry->getImplPtr();
      entryImpl->getValueI(oldValue);
      if (oldValue == nullptr || CacheableToken::isTombstone(oldValue)) {
        // pass the version stamp
        VersionStamp versionStamp;
        if (m_concurrencyChecksEnabled) {
          versionStamp = entry->getVersionStamp();
          if (versionTag) {
            err = versionStamp.processVersionTag(m_region, key, versionTag,
                                                 false);
            if (err != GF_NOERR) return err;
            versionStamp.setVersions(versionTag);
          }
        }
        // good case; go ahead with the create
        if (oldValue == nullptr) {
          err = putForTrackedEntry(key, newValue, entry, entryImpl, updateCount,
                                   versionStamp);
        } else {
          unguardedRemoveActualEntryWithoutCancelTask(key, handler, taskid);
          err = putNoEntry(key, newValue, me, updateCount, destroyTracker,
                           versionTag, &versionStamp);
        }

        oldValue = nullptr;

      } else {
        err = GF_CACHE_ENTRY_EXISTS;
      }
      if (err == GF_NOERR) {
        me = entryImpl;
      }
    }
  }
  if (taskid != -1) {
    m_expiryTaskManager->cancelTask(taskid);
    if (handler != nullptr) delete handler;
  }
  return err;
}

/**
 * @brief put a value in the map, replacing if key already exists.
 */
GfErrType MapSegment::put(const std::shared_ptr<CacheableKey>& key,
                          const std::shared_ptr<Cacheable>& newValue,
                          std::shared_ptr<MapEntryImpl>& me,
                          std::shared_ptr<Cacheable>& oldValue, int updateCount,
                          int destroyTracker, bool& isUpdate,
                          std::shared_ptr<VersionTag> versionTag,
                          DataInput* delta) {
  ExpiryTaskManager::id_type taskid = -1;
  TombstoneExpiryHandler* handler = nullptr;
  GfErrType err = GF_NOERR;
  {
    std::lock_guard<spinlock_mutex> lk(m_spinlock);
    // if size is greater than 75 percent of prime, rehash
    uint32_t mapSize = TableOfPrimes::getPrime(m_primeIndex);
    if (((m_map->size() * 75) / 100) > mapSize) {
      rehash();
    }

    const auto& find = m_map->find(key);
    if (find == m_map->end()) {
      if (delta != nullptr) {
        return GF_INVALID_DELTA;  // You can not apply delta when there is no
      }
      // entry hence ask for full object
      isUpdate = false;
      err = putNoEntry(key, newValue, me, updateCount, destroyTracker,
                       versionTag);
    } else {
      auto& entry = find->second;
      auto entryImpl = entry->getImplPtr();
      std::shared_ptr<Cacheable> meOldValue;
      entryImpl->getValueI(meOldValue);
      // pass the version stamp
      VersionStamp versionStamp;
      if (m_concurrencyChecksEnabled) {
        versionStamp = entry->getVersionStamp();
        if (versionTag) {
          if (delta == nullptr) {
            err = versionStamp.processVersionTag(m_region, key, versionTag,
                                                 false);
          } else {
            err =
                versionStamp.processVersionTag(m_region, key, versionTag, true);
          }

          if (err != GF_NOERR) return err;
          versionStamp.setVersions(versionTag);
        }
      }
      if (CacheableToken::isTombstone(meOldValue)) {
        unguardedRemoveActualEntryWithoutCancelTask(key, handler, taskid);
        err = putNoEntry(key, newValue, me, updateCount, destroyTracker,
                         versionTag, &versionStamp);
        meOldValue = nullptr;
        isUpdate = false;
      } else if ((err = putForTrackedEntry(key, newValue, entry, entryImpl,
                                           updateCount, versionStamp, delta)) ==
                 GF_NOERR) {
        me = entryImpl;
        oldValue = meOldValue;
        isUpdate = (meOldValue != nullptr);
      }
    }
  }
  if (taskid != -1) {
    m_expiryTaskManager->cancelTask(taskid);
    if (handler != nullptr) delete handler;
  }
  return err;
}

GfErrType MapSegment::invalidate(const std::shared_ptr<CacheableKey>& key,
                                 std::shared_ptr<MapEntryImpl>& me,
                                 std::shared_ptr<Cacheable>& oldValue,
                                 std::shared_ptr<VersionTag> versionTag,
                                 bool& isTokenAdded) {
  std::lock_guard<spinlock_mutex> lk(m_spinlock);
  isTokenAdded = false;
  GfErrType err = GF_NOERR;

  const auto& find = m_map->find(key);
  if (find != m_map->end()) {
    auto entry = find->second;
    VersionStamp versionStamp;
    if (m_concurrencyChecksEnabled) {
      versionStamp = entry->getVersionStamp();
      if (versionTag) {
        err = versionStamp.processVersionTag(m_region, key, versionTag, false);
        if (err != GF_NOERR) return err;
        versionStamp.setVersions(versionTag);
      }
    }
    auto entryImpl = entry->getImplPtr();
    entryImpl->getValueI(oldValue);
    if (CacheableToken::isTombstone(oldValue)) {
      oldValue = nullptr;
      return GF_CACHE_ENTRY_NOT_FOUND;
    }
    entryImpl->setValueI(CacheableToken::invalid());
    if (m_concurrencyChecksEnabled) {
      entryImpl->getVersionStamp().setVersions(versionStamp);
    }
    (void)incrementUpdateCount(key, entry);
    if (oldValue != nullptr) {
      me = entryImpl;
    }
  } else {
    // create new entry for the key if concurrencychecksEnabled is true
    if (m_concurrencyChecksEnabled) {
      if ((err = putNoEntry(key, CacheableToken::invalid(), me, -1, -1,
                            versionTag)) != GF_NOERR) {
        return err;
      }
      isTokenAdded = true;
    }
    err = GF_CACHE_ENTRY_NOT_FOUND;
  }
  return err;
}

GfErrType MapSegment::removeWhenConcurrencyEnabled(
    const std::shared_ptr<CacheableKey>& key,
    std::shared_ptr<Cacheable>& oldValue, std::shared_ptr<MapEntryImpl>& me,
    int updateCount, std::shared_ptr<VersionTag> versionTag, bool afterRemote,
    bool& isEntryFound, ExpiryTaskManager::id_type expiryTaskID,
    TombstoneExpiryHandler* handler, bool& expTaskSet) {
  GfErrType err = GF_NOERR;
  VersionStamp versionStamp;
  // If entry found, else return no entry
  const auto& find = m_map->find(key);
  if (find != m_map->end()) {
    auto entry = find->second;
    isEntryFound = true;
    // If the version tag is null, use the version tag of
    // the existing entry
    versionStamp = entry->getVersionStamp();
    if (versionTag) {
      std::shared_ptr<CacheableKey> keyPtr;
      entry->getImplPtr()->getKeyI(keyPtr);
      if ((err = entry->getVersionStamp().processVersionTag(
               m_region, keyPtr, versionTag, false)) != GF_NOERR) {
        return err;
      }
      versionStamp.setVersions(versionTag);
    }
    // Get the old value for returning
    auto entryImpl = entry->getImplPtr();
    entryImpl->getValueI(oldValue);

    if (oldValue) me = entryImpl;

    if ((err = putForTrackedEntry(key, CacheableToken::tombstone(), entry,
                                  entryImpl, updateCount, versionStamp)) ==
        GF_NOERR) {
      m_tombstoneList->add(entryImpl, handler, expiryTaskID);
      expTaskSet = true;
    }
    if (CacheableToken::isTombstone(oldValue)) {
      oldValue = nullptr;
      if (afterRemote) {
        return GF_NOERR;  // We are here because a remote op succeeded, no need
                          // to throw an error
      } else {
        return GF_CACHE_ENTRY_NOT_FOUND;
      }
    }
  } else {
    // If entry not found than add a tombstone for this entry
    // so that any future updates for this entry are checked for version
    // no entry
    if (versionTag) {
      std::shared_ptr<MapEntryImpl> mapEntry;
      putNoEntry(key, CacheableToken::tombstone(), mapEntry, -1, 0, versionTag);
      m_tombstoneList->add(mapEntry->getImplPtr(), handler, expiryTaskID);
      expTaskSet = true;
    }
    oldValue = nullptr;
    isEntryFound = false;
    if (afterRemote) {
      err = GF_NOERR;  // We are here because a remote op succeeded, no need to
                       // throw an error
    } else {
      err = GF_CACHE_ENTRY_NOT_FOUND;
    }
  }
  return err;
}
/**
 * @brief remove entry, setting oldValue.
 */
GfErrType MapSegment::remove(const std::shared_ptr<CacheableKey>& key,
                             std::shared_ptr<Cacheable>& oldValue,
                             std::shared_ptr<MapEntryImpl>& me, int updateCount,
                             std::shared_ptr<VersionTag> versionTag,
                             bool afterRemote, bool& isEntryFound) {
  std::shared_ptr<MapEntry> entry;
  if (m_concurrencyChecksEnabled) {
    TombstoneExpiryHandler* handler;
    auto id = m_tombstoneList->getExpiryTask(&handler);
    bool expTaskSet = false;
    GfErrType err;
    {
      std::lock_guard<spinlock_mutex> lk(m_spinlock);
      err = removeWhenConcurrencyEnabled(key, oldValue, me, updateCount,
                                         versionTag, afterRemote, isEntryFound,
                                         id, handler, expTaskSet);
    }

    if (!expTaskSet) {
      m_expiryTaskManager->cancelTask(id);
      delete handler;
    }
    return err;
  }

  std::lock_guard<spinlock_mutex> lk(m_spinlock);
  if (m_map->erase(key) == 0) {
    // didn't unbind, probably no entry...
    oldValue = nullptr;
    volatile int destroyTrackers = *m_numDestroyTrackers;
    if (destroyTrackers > 0) {
      m_destroyedKeys[key] = destroyTrackers + 1;
    }
    return GF_CACHE_ENTRY_NOT_FOUND;
  }

  if (updateCount >= 0 && updateCount != entry->getUpdateCount()) {
    // this is the case when entry has been updated while being tracked
    return GF_CACHE_ENTRY_UPDATED;
  }
  auto entryImpl = entry->getImplPtr();
  entryImpl->getValueI(oldValue);
  if (CacheableToken::isTombstone(oldValue)) oldValue = nullptr;
  if (oldValue) {
    me = entryImpl;
  }
  return GF_NOERR;
}

bool MapSegment::unguardedRemoveActualEntry(
    const std::shared_ptr<CacheableKey>& key, bool cancelTask) {
  m_tombstoneList->eraseEntryFromTombstoneList(key, cancelTask);
  if (m_map->erase(key) == 0) {
    return false;
  }
  return true;
}

bool MapSegment::unguardedRemoveActualEntryWithoutCancelTask(
    const std::shared_ptr<CacheableKey>& key, TombstoneExpiryHandler*& handler,
    ExpiryTaskManager::id_type& taskid) {
  std::shared_ptr<MapEntry> entry;
  taskid = m_tombstoneList->eraseEntryFromTombstoneListWithoutCancelTask(
      key, handler);
  if (m_map->erase(key) == 0) {
    return false;
  }
  return true;
}

bool MapSegment::removeActualEntry(const std::shared_ptr<CacheableKey>& key,
                                   bool cancelTask) {
  std::lock_guard<spinlock_mutex> lk(m_spinlock);
  return unguardedRemoveActualEntry(key, cancelTask);
}
/**
 * @brief get MapEntry for key. throws NoEntryException if absent.
 */
bool MapSegment::getEntry(const std::shared_ptr<CacheableKey>& key,
                          std::shared_ptr<MapEntryImpl>& result,
                          std::shared_ptr<Cacheable>& value) {
  std::lock_guard<spinlock_mutex> lk(m_spinlock);

  const auto& find = m_map->find(key);
  if (find == m_map->end()) {
    result = nullptr;
    value = nullptr;
    return false;
  }
  auto entry = find->second;

  // If the value is a tombstone return not found
  auto mePtr = entry->getImplPtr();
  mePtr->getValueI(value);
  if (value == nullptr || CacheableToken::isTombstone(value)) {
    result = nullptr;
    value = nullptr;
    return false;
  }
  result = mePtr;
  return true;
}

/**
 * @brief return true if there exists an entry for the key.
 */
bool MapSegment::containsKey(const std::shared_ptr<CacheableKey>& key) {
  std::lock_guard<spinlock_mutex> lk(m_spinlock);

  const auto& find = m_map->find(key);
  if (find == m_map->end()) {
    return false;
  }
  auto mePtr = find->second;

  // If the value is a tombstone return not found
  std::shared_ptr<Cacheable> value;
  auto mePtr1 = mePtr->getImplPtr();
  mePtr1->getValueI(value);
  if (value != nullptr && CacheableToken::isTombstone(value)) return false;

  return true;
}

/**
 * @brief return the all the keys in the provided list.
 */
void MapSegment::getKeys(std::vector<std::shared_ptr<CacheableKey>>& result) {
  std::lock_guard<spinlock_mutex> lk(m_spinlock);

  for (const auto& kv : *m_map) {
    std::shared_ptr<Cacheable> valuePtr;
    kv.second->getImplPtr()->getValueI(valuePtr);
    if (!CacheableToken::isTombstone(valuePtr)) {
      result.push_back(kv.first);
    }
  }
}

/**
 * @brief return all the entries in the provided list.
 */
void MapSegment::getEntries(std::vector<std::shared_ptr<RegionEntry>>& result) {
  std::lock_guard<spinlock_mutex> lk(m_spinlock);

  for (const auto& kv : *m_map) {
    std::shared_ptr<CacheableKey> keyPtr;
    std::shared_ptr<Cacheable> valuePtr;
    auto me = kv.second->getImplPtr();
    me->getValueI(valuePtr);
    if (valuePtr && !CacheableToken::isTombstone(valuePtr)) {
      if (CacheableToken::isInvalid(valuePtr)) {
        valuePtr = nullptr;
      }
      me->getKeyI(keyPtr);
      auto rePtr = m_region->createRegionEntry(keyPtr, valuePtr);
      result.push_back(rePtr);
    }
  }
}

/**
 * @brief return all values in the provided list.
 */
void MapSegment::getValues(std::vector<std::shared_ptr<Cacheable>>& result) {
  std::lock_guard<spinlock_mutex> lk(m_spinlock);
  for (const auto& kv : *m_map) {
    auto& entry = kv.second;
    std::shared_ptr<Cacheable> value;
    entry->getValue(value);
    auto entryImpl = entry->getImplPtr();

    if (value && !CacheableToken::isInvalid(value) &&
        !CacheableToken::isDestroyed(value) &&
        !CacheableToken::isTombstone(value)) {
      if (CacheableToken::isOverflowed(value)) {  // get Value from disc.
        auto& key = kv.first;
        value = getFromDisc(key, entryImpl);
        entryImpl->setValueI(value);
      }
      result.push_back(value);
    }
  }
}

// This function will not get called if concurrency checks are enabled. The
// versioning
// changes takes care of the version and no need for tracking the entry
int MapSegment::addTrackerForEntry(const std::shared_ptr<CacheableKey>& key,
                                   std::shared_ptr<Cacheable>& oldValue,
                                   bool addIfAbsent, bool failIfPresent,
                                   bool incUpdateCount) {
  if (m_concurrencyChecksEnabled) return -1;
  std::lock_guard<spinlock_mutex> lk(m_spinlock);
  std::shared_ptr<MapEntry> entry;
  std::shared_ptr<MapEntry> newEntry;
  const auto& find = m_map->find(key);
  if (find == m_map->end()) {
    oldValue = nullptr;
    if (addIfAbsent) {
      std::shared_ptr<MapEntryImpl> entryImpl;
      // add a new entry with value as destroyed
      m_entryFactory->newMapEntry(m_expiryTaskManager, key, entryImpl);
      entryImpl->setValueI(CacheableToken::destroyed());
      entry = entryImpl;
      newEntry = entryImpl;
    } else {
      // return -1 without adding an entry
      return -1;
    }
  } else {
    entry = find->second;
    entry->getValue(oldValue);
    if (failIfPresent) {
      // return -1 without adding an entry; the callee should check on
      // oldValue to distinguish this case from "addIfAbsent==false" case
      return -1;
    }
  }
  int updateCount;
  if (incUpdateCount) {
    (void)entry->addTracker(newEntry);
    updateCount = entry->incrementUpdateCount(newEntry);
  } else {
    updateCount = entry->addTracker(newEntry);
  }
  if (newEntry) {
    if (find == m_map->end()) {
      m_map->emplace(key, newEntry);
    } else {
      find->second = newEntry;
    }
  }
  return updateCount;
}

// This function will not get called if concurrency checks are enabled. The
// versioning
// changes takes care of the version and no need for tracking the entry
void MapSegment::removeTrackerForEntry(
    const std::shared_ptr<CacheableKey>& key) {
  if (m_concurrencyChecksEnabled) return;
  std::lock_guard<spinlock_mutex> lk(m_spinlock);

  const auto& find = m_map->find(key);
  if (find != m_map->end()) {
    auto& entry = find->second;
    auto impl = entry->getImplPtr();
    removeTrackerForEntry(key, entry, impl);
  }
}

// This function will not get called if concurrency checks are enabled. The
// versioning
// changes takes care of the version and no need for tracking the entry
void MapSegment::addTrackerForAllEntries(
    MapOfUpdateCounters& updateCounterMap) {
  if (m_concurrencyChecksEnabled) return;
  std::lock_guard<spinlock_mutex> lk(m_spinlock);

  std::shared_ptr<MapEntry> newEntry;
  std::shared_ptr<CacheableKey> key;
  for (auto& kv : *m_map) {
    kv.second->getKey(key);
    int updateCount = kv.second->addTracker(newEntry);
    if (newEntry != nullptr) {
      kv.second = newEntry;
    }
    updateCounterMap.emplace(key, updateCount);
  }
}

// This function will not get called if concurrency checks are enabled. The
// versioning
// changes takes care of the version and no need for tracking the entry
void MapSegment::removeDestroyTracking() {
  if (m_concurrencyChecksEnabled) return;
  std::lock_guard<spinlock_mutex> lk(m_spinlock);
  m_destroyedKeys.clear();
}

/**
 * @brief replace the existing hash map with one that is wider
 *   to reduce collision chains.
 */
void MapSegment::rehash() {
  // Only called from put, segment must already be locked...
  auto newMapSize = TableOfPrimes::getPrime(++m_primeIndex);
  LOGFINER("Rehashing MapSegment to size %d.", newMapSize);
  m_map->reserve(newMapSize);
  m_rehashCount++;
}
std::shared_ptr<Cacheable> MapSegment::getFromDisc(
    std::shared_ptr<CacheableKey> key,
    std::shared_ptr<MapEntryImpl>& entryImpl) {
  auto* lregion = static_cast<LocalRegion*>(m_region);
  EntriesMap* em = lregion->getEntryMap();
  return em->getFromDisk(key, entryImpl);
}

GfErrType MapSegment::putForTrackedEntry(
    const std::shared_ptr<CacheableKey>& key,
    const std::shared_ptr<Cacheable>& newValue,
    std::shared_ptr<MapEntry>& entry, std::shared_ptr<MapEntryImpl>& entryImpl,
    int updateCount, VersionStamp& versionStamp, DataInput* delta) {
  if (updateCount < 0 || m_concurrencyChecksEnabled) {
    // for a non-tracked put (e.g. from notification) go ahead with the
    // create/update and increment the update counter
    auto* thinClientRegion = dynamic_cast<ThinClientRegion*>(m_region);
    ThinClientPoolDM* m_poolDM = nullptr;
    if (thinClientRegion) {
      m_poolDM =
          dynamic_cast<ThinClientPoolDM*>(thinClientRegion->getDistMgr());
    }

    if (delta != nullptr) {
      std::shared_ptr<Cacheable> oldValue;
      entryImpl->getValueI(oldValue);
      if (oldValue == nullptr || CacheableToken::isDestroyed(oldValue) ||
          CacheableToken::isInvalid(oldValue) ||
          CacheableToken::isTombstone(oldValue)) {
        if (m_poolDM) {
          m_poolDM->updateNotificationStats(false, std::chrono::nanoseconds(0));
        }
        return GF_INVALID_DELTA;
      } else if (CacheableToken::isOverflowed(
                     oldValue)) {  // get Value from disc.
        oldValue = getFromDisc(key, entryImpl);
        if (oldValue == nullptr) {
          if (m_poolDM) {
            m_poolDM->updateNotificationStats(false,
                                              std::chrono::nanoseconds(0));
          }
          return GF_INVALID_DELTA;
        }
      }

      using clock = std::chrono::steady_clock;

      auto valueWithDelta = std::dynamic_pointer_cast<Delta>(oldValue);
      auto& newValue1 = const_cast<std::shared_ptr<Cacheable>&>(newValue);
      try {
        if (m_region->getAttributes().getCloningEnabled()) {
          auto tempVal = valueWithDelta->clone();
          auto currTimeBefore = clock::now();
          tempVal->fromDelta(*delta);

          if (m_poolDM) {
            m_poolDM->updateNotificationStats(true,
                                              clock::now() - currTimeBefore);
          }
          newValue1 = std::dynamic_pointer_cast<Serializable>(tempVal);
          entryImpl->setValueI(newValue1);
        } else {
          auto currTimeBefore = clock::now();
          valueWithDelta->fromDelta(*delta);
          newValue1 = std::dynamic_pointer_cast<Serializable>(valueWithDelta);

          if (m_poolDM) {
            m_poolDM->updateNotificationStats(true,
                                              clock::now() - currTimeBefore);
          }
          entryImpl->setValueI(
              std::dynamic_pointer_cast<Serializable>(valueWithDelta));
        }
      } catch (InvalidDeltaException&) {
        return GF_INVALID_DELTA;
      }
    } else {
      entryImpl->setValueI(newValue);
    }
    if (m_concurrencyChecksEnabled) {
      // erase if the entry is in tombstone
      m_tombstoneList->eraseEntryFromTombstoneList(key, true);
      entryImpl->getVersionStamp().setVersions(versionStamp);
    }
    (void)incrementUpdateCount(key, entry);
    return GF_NOERR;
  } else if (updateCount == entry->getUpdateCount()) {
    // good case; go ahead with the create/update
    entryImpl->setValueI(newValue);
    removeTrackerForEntry(key, entry, entryImpl);
    return GF_NOERR;
  } else {
    // entry updated while tracking was being done
    // abort the create/update and do not change the oldValue or MapEntry
    removeTrackerForEntry(key, entry, entryImpl);
    return GF_CACHE_ENTRY_UPDATED;
  }
}
void MapSegment::reapTombstones(std::map<uint16_t, int64_t>& gcVersions) {
  std::lock_guard<spinlock_mutex> lk(m_spinlock);
  m_tombstoneList->reapTombstones(gcVersions);
}
void MapSegment::reapTombstones(std::shared_ptr<CacheableHashSet> removedKeys) {
  std::lock_guard<spinlock_mutex> lk(m_spinlock);
  m_tombstoneList->reapTombstones(removedKeys);
}

GfErrType MapSegment::isTombstone(std::shared_ptr<CacheableKey> key,
                                  std::shared_ptr<MapEntryImpl>& me,
                                  bool& result) {
  std::shared_ptr<Cacheable> value;
  std::shared_ptr<MapEntryImpl> mePtr;
  const auto& find = m_map->find(key);
  if (find == m_map->end()) {
    result = false;
    return GF_NOERR;
  }
  auto& entry = find->second;
  mePtr = entry->getImplPtr();

  if (!mePtr) {
    result = false;
    return GF_NOERR;
  }

  mePtr->getValueI(value);
  if (!value) {
    result = false;
    return GF_NOERR;
  }

  if (CacheableToken::isTombstone(value)) {
    if (m_tombstoneList->exists(key)) {
      std::shared_ptr<MapEntry> entry;
      const auto find = m_map->find(key);
      if (find != m_map->end()) {
        auto mePtr = find->second->getImplPtr();
        me = mePtr;
      }
      result = true;
      return GF_NOERR;
    } else {
      LOGFINER("1 result= false return GF_CACHE_ILLEGAL_STATE_EXCEPTION");
      result = false;
      return GF_CACHE_ILLEGAL_STATE_EXCEPTION;
    }

  } else {
    if (m_tombstoneList->exists(key)) {
      LOGFINER(" 2 result= false return GF_CACHE_ILLEGAL_STATE_EXCEPTION");
      result = false;
      return GF_CACHE_ILLEGAL_STATE_EXCEPTION;
    } else {
      result = false;
      return GF_NOERR;
    }
  }
}
}  // namespace client
}  // namespace geode
}  // namespace apache
