/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "LRUEntriesMap.hpp"
#include <mutex>
#include "CacheImpl.hpp"
#include "ExpiryTaskManager.hpp"
#include "LRUList.cpp"
#include "MapSegment.hpp"
#include "util/concurrent/spinlock_mutex.hpp"
namespace apache {
namespace geode {
namespace client {
/**
* @brief LRUAction for testing the map outside of a region.
*/
class APACHE_GEODE_EXPORT TestMapAction : public virtual LRUAction {
private:
EntriesMap* m_eMap;
public:
explicit TestMapAction(EntriesMap* eMap) : m_eMap(eMap) { m_destroys = true; }
virtual ~TestMapAction() {}
virtual bool evict(const std::shared_ptr<MapEntryImpl>& mePtr) {
std::shared_ptr<CacheableKey> keyPtr;
mePtr->getKeyI(keyPtr);
/** @TODO try catch.... return true or false. */
std::shared_ptr<Cacheable> cPtr; // old value.
std::shared_ptr<MapEntryImpl> me;
std::shared_ptr<VersionTag> versionTag;
return (m_eMap->remove(keyPtr, cPtr, me, 0, versionTag, false) == GF_NOERR);
}
virtual LRUAction::Action getType() { return LRUAction::LOCAL_DESTROY; }
friend class LRUAction;
};
LRUEntriesMap::LRUEntriesMap(ExpiryTaskManager* expiryTaskManager,
std::unique_ptr<EntryFactory> entryFactory,
RegionInternal* region,
const LRUAction::Action& lruAction,
const uint32_t limit,
bool concurrencyChecksEnabled,
const uint8_t concurrency, bool heapLRUEnabled)
: ConcurrentEntriesMap(expiryTaskManager, std::move(entryFactory),
concurrencyChecksEnabled, region, concurrency),
m_lruList(),
m_limit(limit),
m_pmPtr(nullptr),
m_validEntries(0),
m_heapLRUEnabled(heapLRUEnabled) {
m_currentMapSize = 0;
m_action = nullptr;
m_evictionControllerPtr = nullptr;
// translate action type to an instance.
if (region) {
m_action = LRUAction::newLRUAction(lruAction, region, this);
m_name = region->getName();
CacheImpl* cImpl = region->getCacheImpl();
if (cImpl != nullptr) {
m_evictionControllerPtr = cImpl->getEvictionController();
if (m_evictionControllerPtr != nullptr) {
m_evictionControllerPtr->registerRegion(m_name);
LOGINFO("Heap LRU eviction controller registered region %s",
m_name.c_str());
}
}
} else {
m_action = new TestMapAction(this);
}
}
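/**
* @brief Close the map: if a heap-LRU eviction controller is registered,
* subtract this map's current heap contribution and deregister the region,
* then close the underlying concurrent map.
*/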
void LRUEntriesMap::close() {
if (m_evictionControllerPtr != nullptr) {
m_evictionControllerPtr->updateRegionHeapInfo((-1 * (m_currentMapSize)));
m_evictionControllerPtr->deregisterRegion(m_name);
}
ConcurrentEntriesMap::close();
}
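/**
* @brief Clear all entries and reset this map's tracked heap size to zero.
*/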
void LRUEntriesMap::clear() {
updateMapSize((-1 * (m_currentMapSize)));
ConcurrentEntriesMap::clear();
}
LRUEntriesMap::~LRUEntriesMap() { delete m_action; }
/**
* @brief Create an entry in the map; if it is a new entry, the LRU list may
* need to be consulted.
* If the LRUAction is LRUInvalidateAction, increment if the old value was
* absent; if it is one of the destroy actions, increment if the old entry
* was absent.
*/
GfErrType LRUEntriesMap::create(const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& newValue,
std::shared_ptr<MapEntryImpl>& me,
std::shared_ptr<Cacheable>& oldValue,
int updateCount, int destroyTracker,
std::shared_ptr<VersionTag> versionTag) {
MapSegment* segmentRPtr = segmentFor(key);
GfErrType err = GF_NOERR;
{ // SYNCHRONIZE_SEGMENT(segmentRPtr);
std::shared_ptr<MapEntryImpl> mePtr;
if ((err = segmentRPtr->create(key, newValue, me, oldValue, updateCount,
destroyTracker, versionTag)) != GF_NOERR) {
return err;
}
// TODO: can newValue ever be a token ??
if (!CacheableToken::isToken(newValue)) {
m_validEntries++;
}
// oldValue can be an invalid token when "createIfInvalid" is true
if (!CacheableToken::isInvalid(oldValue)) {
m_size++;
}
std::shared_ptr<Cacheable> tmpValue;
segmentRPtr->getEntry(key, mePtr, tmpValue);
if (mePtr == nullptr) {
return err;
}
m_lruList.appendEntry(mePtr);
me = mePtr;
}
if (m_evictionControllerPtr != nullptr) {
int64_t newSize =
static_cast<int64_t>(Utils::checkAndGetObjectSize(newValue));
newSize += static_cast<int64_t>(Utils::checkAndGetObjectSize(key));
if (oldValue != nullptr) {
newSize -= static_cast<int64_t>(oldValue->objectSize());
} else {
newSize -= static_cast<int64_t>(sizeof(void*));
}
updateMapSize(newSize);
}
err = processLRU();
return err;
}
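/**
* @brief Evict least-recently-used entries one at a time while mustEvict()
* reports that eviction is still required, stopping early if an eviction
* attempt fails.
*/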
GfErrType LRUEntriesMap::processLRU() {
GfErrType canEvict = GF_NOERR;
while (canEvict == GF_NOERR && mustEvict()) {
canEvict = evictionHelper();
}
return canEvict;
}
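/**
* @brief Evict the least-recently-used entry using the configured LRUAction.
* Returns GF_ENOENT when no LRU entry is available and GF_DISKFULL when the
* eviction action fails.
*/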
GfErrType LRUEntriesMap::evictionHelper() {
GfErrType err = GF_NOERR;
// ACE_Guard< ACE_Recursive_Thread_Mutex > guard( m_mutex );
std::shared_ptr<MapEntryImpl> lruEntryPtr;
m_lruList.getLRUEntry(lruEntryPtr);
if (lruEntryPtr == nullptr) {
err = GF_ENOENT;
return err;
}
bool IsEvictDone = m_action->evict(lruEntryPtr);
if (m_action->overflows() && IsEvictDone) {
--m_validEntries;
lruEntryPtr->getLRUProperties().setEvicted();
}
if (!IsEvictDone) {
err = GF_DISKFULL;
return err;
}
return err;
}
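/**
* @brief Attempt to evict up to numEntriesToEvict entries, stopping early
* when the map has no valid entries left to evict.
*/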
void LRUEntriesMap::processLRU(int32_t numEntriesToEvict) {
int32_t evicted = 0;
for (int32_t i = 0; i < numEntriesToEvict; i++) {
if (m_validEntries > 0 && size() > 0) {
if (evictionHelper() == GF_NOERR) {
evicted++;
}
} else {
break;
}
}
}
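/**
* @brief Invalidate the entry for the given key. If the old value had
* overflowed to disk, it is read back from the persistence manager (for
* listeners and heap-LRU accounting) and destroyed there; the tracked heap
* size is then adjusted by the difference.
*/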
GfErrType LRUEntriesMap::invalidate(const std::shared_ptr<CacheableKey>& key,
std::shared_ptr<MapEntryImpl>& me,
std::shared_ptr<Cacheable>& oldValue,
std::shared_ptr<VersionTag> versionTag) {
int64_t newSize = 0;
MapSegment* segmentRPtr = segmentFor(key);
bool isTokenAdded = false;
GfErrType err =
segmentRPtr->invalidate(key, me, oldValue, versionTag, isTokenAdded);
if (isTokenAdded) {
++m_size;
}
if (err != GF_NOERR) {
return err;
}
bool isOldValueToken = CacheableToken::isToken(oldValue);
// Get the old value first, since it is required for heapLRU
// calculation and for listeners; note that even though there is a race
// here between get and destroy, it does no harm if we get a slightly
// later value.
// TODO: assess any other effects of this race
if (CacheableToken::isOverflowed(oldValue)) {
ACE_Guard<MapSegment> _guard(*segmentRPtr);
auto&& persistenceInfo = me->getLRUProperties().getPersistenceInfo();
// Get the old value first, since it is required for heapLRU
// calculation and for listeners; note that even though there is a race
// here between get and destroy, it does no harm if we get a slightly
// older value.
// TODO: there is also a race between segment remove and destroy here;
// need to assess the effect of this, and also the effect of the
// above-mentioned race
oldValue = m_pmPtr->read(key, persistenceInfo);
if (oldValue != nullptr) {
m_pmPtr->destroy(key, persistenceInfo);
}
}
if (!isOldValueToken) {
--m_validEntries;
me->getLRUProperties().setEvicted();
newSize = CacheableToken::invalid()->objectSize();
if (oldValue != nullptr) {
newSize -= oldValue->objectSize();
} else {
newSize -= sizeof(void*);
}
if (m_evictionControllerPtr != nullptr) {
if (newSize != 0) {
updateMapSize(newSize);
}
}
}
return err;
}
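/**
* @brief Put a value for the given key. For overflow-to-disk regions the
* segment stays locked for the whole operation; an old value that had
* overflowed is read back from the persistence manager and destroyed there.
* New entries (and entries brought back from an evicted state) are appended
* to the LRU list, heap-LRU bookkeeping is updated, and processLRU() runs
* before returning.
*/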
GfErrType LRUEntriesMap::put(const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& newValue,
std::shared_ptr<MapEntryImpl>& me,
std::shared_ptr<Cacheable>& oldValue,
int updateCount, int destroyTracker,
std::shared_ptr<VersionTag> versionTag,
bool& isUpdate, DataInput* delta) {
MapSegment* segmentRPtr = segmentFor(key);
GF_D_ASSERT(segmentRPtr != nullptr);
GfErrType err = GF_NOERR;
bool segmentLocked = false;
{
if (m_action != nullptr &&
m_action->getType() == LRUAction::OVERFLOW_TO_DISK) {
segmentRPtr->acquire();
segmentLocked = true;
}
std::shared_ptr<MapEntryImpl> mePtr;
if ((err = segmentRPtr->put(key, newValue, me, oldValue, updateCount,
destroyTracker, isUpdate, versionTag, delta)) !=
GF_NOERR) {
if (segmentLocked == true) segmentRPtr->release();
return err;
}
bool isOldValueToken = CacheableToken::isToken(oldValue);
// TODO: need tests for checking that oldValue is returned
// correctly to listeners for overflow -- this is for all operations
// put, invalidate, destroy
// TODO: need tests for checking if the overflowed entry is
// destroyed *from disk* in put
// existing overflowed entry should be returned correctly in
// put and removed from disk
// TODO: need to verify if segment mutex lock is enough here
// TODO: This whole class needs to be rethought and reworked for locking
// and concurrency issues. The basic issue is two concurrent operations
// on the same key regardless of whether we need to go to the persistence
// manager or not. So all operations on the same key should be protected
// unless very careful thought has gone into it.
if (CacheableToken::isOverflowed(oldValue)) {
if (!segmentLocked) {
// lock the segment before reading the overflowed value from the
// persistence layer and destroying it there
segmentRPtr->acquire();
segmentLocked = true;
}
auto&& persistenceInfo = me->getLRUProperties().getPersistenceInfo();
oldValue = m_pmPtr->read(key, persistenceInfo);
if (oldValue != nullptr) {
m_pmPtr->destroy(key, persistenceInfo);
}
}
// SpinLock& lock = segmentRPtr->getSpinLock();
// std::lock_guard<spinlock_mutex> mapGuard( lock );
// TODO: when can newValue be a token ??
if (CacheableToken::isToken(newValue) && !isOldValueToken) {
--m_validEntries;
}
if (!CacheableToken::isToken(newValue) && isOldValueToken) {
++m_validEntries;
}
// Add new entry to LRU list
if (isUpdate == false) {
++m_size;
++m_validEntries;
std::shared_ptr<Cacheable> tmpValue;
segmentRPtr->getEntry(key, mePtr, tmpValue);
// mePtr cannot be null, we just put it...
// must convert to an std::shared_ptr<LRUMapEntryImpl>...
GF_D_ASSERT(mePtr != nullptr);
m_lruList.appendEntry(mePtr);
me = mePtr;
} else {
if (!CacheableToken::isToken(newValue) && isOldValueToken) {
std::shared_ptr<Cacheable> tmpValue;
segmentRPtr->getEntry(key, mePtr, tmpValue);
mePtr->getLRUProperties().clearEvicted();
m_lruList.appendEntry(
std::shared_ptr<MapEntryImpl>(mePtr->getImplPtr()));
me = mePtr;
}
}
}
if (m_evictionControllerPtr != nullptr) {
int64_t newSize =
static_cast<int64_t>(Utils::checkAndGetObjectSize(newValue));
/*
if (newSize == 0) {
LOGWARN("Object size for class ID %d should not be zero when HeapLRU is
enabled", newValue->classId());
LOGDEBUG("Type ID is %d for the object returning zero HeapLRU size",
newValue->typeId());
}
*/
if (isUpdate == false) {
newSize += static_cast<int64_t>(Utils::checkAndGetObjectSize(key));
} else {
if (oldValue != nullptr) {
newSize -= static_cast<int64_t>(oldValue->objectSize());
} else {
newSize -= static_cast<int64_t>(sizeof(void*));
}
}
updateMapSize(newSize);
}
err = processLRU();
if (segmentLocked) {
segmentRPtr->release();
}
return err;
}
/**
* @brief Get the value for an entry; if the entry exists it will be marked
* as recently used. Note that getEntry, entries, and values do not mark
* entries as recently used.
*/
bool LRUEntriesMap::get(const std::shared_ptr<CacheableKey>& key,
std::shared_ptr<Cacheable>& returnPtr,
std::shared_ptr<MapEntryImpl>& me) {
bool doProcessLRU = false;
MapSegment* segmentRPtr = segmentFor(key);
bool segmentLocked = false;
if (m_action != nullptr &&
m_action->getType() == LRUAction::OVERFLOW_TO_DISK) {
segmentRPtr->acquire();
segmentLocked = true;
}
{
returnPtr = nullptr;
std::shared_ptr<MapEntryImpl> mePtr;
if (false == segmentRPtr->getEntry(key, mePtr, returnPtr)) {
if (segmentLocked == true) segmentRPtr->release();
return false;
}
// segmentRPtr->get(key, returnPtr, mePtr);
auto nodeToMark = mePtr;
LRUEntryProperties& lruProps = nodeToMark->getLRUProperties();
if (returnPtr != nullptr && CacheableToken::isOverflowed(returnPtr)) {
auto&& persistenceInfo = lruProps.getPersistenceInfo();
std::shared_ptr<Cacheable> tmpObj;
try {
tmpObj = m_pmPtr->read(key, persistenceInfo);
} catch (Exception& ex) {
LOGERROR("read on the persistence layer failed - %s", ex.what());
if (segmentLocked == true) segmentRPtr->release();
return false;
}
m_region->getRegionStats()->incRetrieves();
m_region->getCacheImpl()->getCachePerfStats().incRetrieves();
returnPtr = tmpObj;
std::shared_ptr<Cacheable> oldValue;
bool isUpdate;
std::shared_ptr<VersionTag> versionTag;
if (GF_NOERR == segmentRPtr->put(key, tmpObj, mePtr, oldValue, 0, 0,
isUpdate, versionTag, nullptr)) {
// m_entriesRetrieved++;
++m_validEntries;
lruProps.clearEvicted();
m_lruList.appendEntry(nodeToMark);
}
doProcessLRU = true;
if (m_evictionControllerPtr != nullptr) {
int64_t newSize = 0;
if (tmpObj != nullptr) {
newSize += static_cast<int64_t>(
tmpObj->objectSize() - CacheableToken::invalid()->objectSize());
} else {
newSize += sizeof(void*);
}
updateMapSize(newSize);
}
}
me = mePtr;
// lruProps.clearEvicted();
lruProps.setRecentlyUsed();
if (doProcessLRU) {
GfErrType IsProcessLru = processLRU();
if ((IsProcessLru != GF_NOERR)) {
if (segmentLocked) {
segmentRPtr->release();
}
return false;
}
}
if (segmentLocked) {
segmentRPtr->release();
}
return true;
}
}
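/**
* @brief Remove the entry for the given key. A removed value that had
* overflowed to disk is read back from the persistence manager and destroyed
* there, and the tracked heap size is reduced by the key and value sizes.
*/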
GfErrType LRUEntriesMap::remove(const std::shared_ptr<CacheableKey>& key,
std::shared_ptr<Cacheable>& result,
std::shared_ptr<MapEntryImpl>& me,
int updateCount,
std::shared_ptr<VersionTag> versionTag,
bool afterRemote) {
MapSegment* segmentRPtr = segmentFor(key);
bool isEntryFound = true;
GfErrType err;
if ((err = segmentRPtr->remove(key, result, me, updateCount, versionTag,
afterRemote, isEntryFound)) == GF_NOERR) {
// ACE_Guard<MapSegment> _guard(*segmentRPtr);
if (result != nullptr && me != nullptr) {
LRUEntryProperties& lruProps = me->getLRUProperties();
lruProps.setEvicted();
if (isEntryFound) --m_size;
if (!CacheableToken::isToken(result)) {
--m_validEntries;
}
if (CacheableToken::isOverflowed(result)) {
ACE_Guard<MapSegment> _guard(*segmentRPtr);
auto&& persistenceInfo = lruProps.getPersistenceInfo();
// Get the old value first, since it is required for heapLRU
// calculation and for listeners; note that even though there is a race
// here between get and destroy, it does no harm if we get a slightly
// older value.
// TODO: there is also a race between segment remove and destroy here;
// need to assess the effect of this, and also the effect of the
// above-mentioned race
result = m_pmPtr->read(key, persistenceInfo);
if (result != nullptr) {
m_pmPtr->destroy(key, persistenceInfo);
}
}
if (m_evictionControllerPtr != nullptr) {
int64_t sizeToRemove = static_cast<int64_t>(key->objectSize());
sizeToRemove += static_cast<int64_t>(result->objectSize());
updateMapSize((-1 * sizeToRemove));
}
}
}
return err;
}
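/**
* @brief Adjust this map's tracked heap size by the given delta (under the
* map-info spinlock) and report the change to the eviction controller.
*/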
void LRUEntriesMap::updateMapSize(int64_t size) {
// TODO: check and remove null check since this has already been done
// by all the callers
if (m_evictionControllerPtr != nullptr) {
{
std::lock_guard<spinlock_mutex> __guard(m_mapInfoLock);
m_currentMapSize += size;
}
m_evictionControllerPtr->updateRegionHeapInfo(size);
}
}
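/**
* @brief Read an overflowed value back from the persistence manager using
* the entry's persistence info; returns nullptr if the read fails.
*/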
std::shared_ptr<Cacheable> LRUEntriesMap::getFromDisk(
const std::shared_ptr<CacheableKey>& key,
std::shared_ptr<MapEntryImpl>& me) const {
auto&& persistenceInfo = me->getLRUProperties().getPersistenceInfo();
std::shared_ptr<Cacheable> tmpObj;
try {
LOGDEBUG("Reading value from persistence layer for key: %s",
key->toString().c_str());
tmpObj = m_pmPtr->read(key, persistenceInfo);
} catch (Exception& ex) {
LOGERROR("read on the persistence layer failed - %s", ex.what());
return nullptr;
}
return tmpObj;
}
} // namespace client
} // namespace geode
} // namespace apache