blob: 79e469da8c864f05213f79e8237f258ae76435dc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "LocalRegion.hpp"
#include <sstream>
#include <vector>
#include <geode/PoolManager.hpp>
#include <geode/SystemProperties.hpp>
#include "CacheImpl.hpp"
#include "CacheRegionHelper.hpp"
#include "CacheableToken.hpp"
#include "EntryExpiryHandler.hpp"
#include "ExpiryTaskManager.hpp"
#include "LRUEntriesMap.hpp"
#include "RegionExpiryHandler.hpp"
#include "RegionGlobalLocks.hpp"
#include "SerializableHelper.hpp"
#include "TXState.hpp"
#include "TcrConnectionManager.hpp"
#include "Utils.hpp"
#include "VersionTag.hpp"
#include "util/Log.hpp"
#include "util/bounds.hpp"
#include "util/exception.hpp"
namespace apache {
namespace geode {
namespace client {
LocalRegion::LocalRegion(const std::string& name, CacheImpl* cacheImpl,
const std::shared_ptr<RegionInternal>& rPtr,
RegionAttributes attributes,
const std::shared_ptr<CacheStatistics>& stats,
bool enableTimeStatistics)
: RegionInternal(cacheImpl, attributes),
m_name(name),
m_parentRegion(rPtr),
m_destroyPending(false),
m_listener(nullptr),
m_writer(nullptr),
m_loader(nullptr),
m_released(false),
m_entries(nullptr),
m_cacheStatistics(stats),
m_transactionEnabled(false),
m_isPRSingleHopEnabled(false),
m_attachedPool(nullptr),
m_enableTimeStatistics(enableTimeStatistics),
m_persistenceManager(nullptr) {
if (m_parentRegion != nullptr) {
((m_fullPath = m_parentRegion->getFullPath()) += "/") += m_name;
} else {
(m_fullPath = "/") += m_name;
}
// create entries map based on RegionAttributes...
if (attributes.getCachingEnabled()) {
m_entries = EntriesMapFactory::createMap(this, m_regionAttributes);
}
// Initialize callbacks
std::shared_ptr<CacheListener> clptr;
std::shared_ptr<CacheWriter> cwptr;
clptr = m_regionAttributes.getCacheListener();
m_listener = clptr;
cwptr = m_regionAttributes.getCacheWriter();
m_writer = cwptr;
std::shared_ptr<CacheLoader> cldptr;
cldptr = m_regionAttributes.getCacheLoader();
m_loader = cldptr;
if (m_parentRegion != nullptr) {
((m_fullPath = m_parentRegion->getFullPath()) += "/") += m_name;
} else {
(m_fullPath = "/") += m_name;
}
m_regionStats = new RegionStats(
cacheImpl->getStatisticsManager().getStatisticsFactory(), m_fullPath);
auto p = cacheImpl->getPoolManager().find(getAttributes().getPoolName());
setPool(p);
}
const std::string& LocalRegion::getName() const { return m_name; }
const std::string& LocalRegion::getFullPath() const { return m_fullPath; }
std::shared_ptr<Region> LocalRegion::getParentRegion() const {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::getParentRegion);
return m_parentRegion;
}
void LocalRegion::updateAccessAndModifiedTime(bool modified) {
// locking not required since setters use atomic operations
if (regionExpiryEnabled()) {
auto now = std::chrono::system_clock::now();
LOGDEBUG("Setting last accessed time for region %s to %d",
getFullPath().c_str(), now.time_since_epoch().count());
m_cacheStatistics->setLastAccessedTime(now);
if (modified) {
LOGDEBUG("Setting last modified time for region %s to %d",
getFullPath().c_str(), now.time_since_epoch().count());
m_cacheStatistics->setLastModifiedTime(now);
}
// TODO: should we really touch the parent region??
RegionInternal* ri = dynamic_cast<RegionInternal*>(m_parentRegion.get());
if (ri != nullptr) {
ri->updateAccessAndModifiedTime(modified);
}
}
}
std::shared_ptr<CacheStatistics> LocalRegion::getStatistics() const {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::getStatistics);
bool m_statisticsEnabled = true;
auto& props = m_cacheImpl->getDistributedSystem().getSystemProperties();
m_statisticsEnabled = props.statisticsEnabled();
if (!m_statisticsEnabled) {
throw StatisticsDisabledException(
"LocalRegion::getStatistics statistics disabled for this region");
}
return m_cacheStatistics;
}
void LocalRegion::invalidateRegion(
const std::shared_ptr<Serializable>& aCallbackArgument) {
GfErrType err =
invalidateRegionNoThrow(aCallbackArgument, CacheEventFlags::NORMAL);
GfErrTypeToException("Region::invalidateRegion", err);
}
void LocalRegion::localInvalidateRegion(
const std::shared_ptr<Serializable>& aCallbackArgument) {
GfErrType err =
invalidateRegionNoThrow(aCallbackArgument, CacheEventFlags::LOCAL);
GfErrTypeToException("Region::localInvalidateRegion", err);
}
void LocalRegion::destroyRegion(
const std::shared_ptr<Serializable>& aCallbackArgument) {
GfErrType err =
destroyRegionNoThrow(aCallbackArgument, true, CacheEventFlags::NORMAL);
GfErrTypeToException("Region::destroyRegion", err);
}
void LocalRegion::localDestroyRegion(
const std::shared_ptr<Serializable>& aCallbackArgument) {
GfErrType err =
destroyRegionNoThrow(aCallbackArgument, true, CacheEventFlags::LOCAL);
GfErrTypeToException("Region::localDestroyRegion", err);
}
void LocalRegion::tombstoneOperationNoThrow(
const std::shared_ptr<CacheableHashMap>& tombstoneVersions,
const std::shared_ptr<CacheableHashSet>& tombstoneKeys) {
bool cachingEnabled = m_regionAttributes.getCachingEnabled();
if (!cachingEnabled) return;
if (tombstoneVersions) {
std::map<uint16_t, int64_t> gcVersions;
for (const auto& itr : *tombstoneVersions) {
if (auto member =
std::dynamic_pointer_cast<DSMemberForVersionStamp>(itr.first)) {
uint16_t memberId =
getCacheImpl()->getMemberListForVersionStamp()->add(member);
int64_t version =
(std::dynamic_pointer_cast<CacheableInt64>(itr.second))->value();
gcVersions[memberId] = version;
} else {
LOGERROR(
"tombstone_operation contains incorrect gc versions in the "
"message. Region " +
getFullPath());
continue;
}
}
m_entries->reapTombstones(gcVersions);
} else {
m_entries->reapTombstones(tombstoneKeys);
}
}
std::shared_ptr<Region> LocalRegion::findSubRegion(const std::string& name) {
auto&& lock = m_subRegions.make_lock<std::lock_guard>();
const auto& find = m_subRegions.find(name);
if (find != m_subRegions.end()) {
return find->second;
}
return nullptr;
}
std::shared_ptr<Region> LocalRegion::getSubregion(const std::string& path) {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::getSubregion);
static const std::string slash("/");
if (path == slash || path.empty()) {
LOGERROR("Get subregion path [" + path + "] is not valid.");
throw IllegalArgumentException("Get subegion path is empty or a /");
}
auto fullname = path;
if (fullname.substr(0, 1) == slash) {
fullname = path.substr(1);
}
// find second separator
auto idx = fullname.find('/');
auto stepname = fullname.substr(0, idx);
auto region = findSubRegion(stepname);
if (region) {
if (stepname == fullname) {
// done...
return region;
} else {
std::string remainder = fullname.substr(stepname.length() + 1);
return region->getSubregion(remainder);
}
}
return nullptr;
}
std::shared_ptr<Region> LocalRegion::createSubregion(
const std::string& subregionName, RegionAttributes regionAttributes) {
CHECK_DESTROY_PENDING(TryWriteGuard, LocalRegion::createSubregion);
{
std::string namestr = subregionName;
if (namestr.find('/') != std::string::npos) {
throw IllegalArgumentException(
"Malformed name string, contains region path seperator '/'");
}
}
auto&& lock = m_subRegions.make_lock();
std::shared_ptr<Region> region_ptr;
if (m_subRegions.find(subregionName) != m_subRegions.end()) {
throw RegionExistsException(
"LocalRegion::createSubregion: named region exists in the region");
}
auto csptr = std::make_shared<CacheStatistics>();
auto rPtr = m_cacheImpl->createRegion_internal(
subregionName,
std::static_pointer_cast<RegionInternal>(shared_from_this()),
regionAttributes, csptr, false);
region_ptr = rPtr;
if (!rPtr) {
throw OutOfMemoryException("createSubregion: failed to create region");
}
// Instantiate a PersistenceManager object if DiskPolicy is overflow
if (regionAttributes.getDiskPolicy() == DiskPolicyType::OVERFLOWS) {
auto pmPtr = regionAttributes.getPersistenceManager();
if (pmPtr == nullptr) {
throw NullPointerException(
"PersistenceManager could not be instantiated");
}
auto props = regionAttributes.getPersistenceProperties();
pmPtr->init(std::shared_ptr<Region>(rPtr), props);
rPtr->setPersistenceManager(pmPtr);
}
rPtr->acquireReadLock();
m_subRegions.emplace(rPtr->getName(), rPtr);
// schedule the sub region expiry if regionExpiry enabled.
rPtr->setRegionExpiryTask();
rPtr->releaseReadLock();
return region_ptr;
}
std::vector<std::shared_ptr<Region>> LocalRegion::subregions(
const bool recursive) {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::subregions);
if (m_subRegions.empty()) {
return std::vector<std::shared_ptr<Region>>();
}
return subregions_internal(recursive);
}
std::shared_ptr<RegionEntry> LocalRegion::getEntry(
const std::shared_ptr<CacheableKey>& key) {
if (getTXState() != nullptr) {
GfErrTypeThrowException("GetEntry is not supported in transaction",
GF_NOTSUP);
}
std::shared_ptr<RegionEntry> rptr;
std::shared_ptr<Cacheable> valuePtr;
getEntry(key, valuePtr);
if (valuePtr != nullptr) {
rptr = createRegionEntry(key, valuePtr);
}
return rptr;
}
void LocalRegion::getEntry(const std::shared_ptr<CacheableKey>& key,
std::shared_ptr<Cacheable>& valuePtr) {
if (key == nullptr) {
throw IllegalArgumentException("LocalRegion::getEntry: null key");
}
std::shared_ptr<MapEntryImpl> mePtr;
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::getEntry);
if (m_regionAttributes.getCachingEnabled()) {
m_entries->getEntry(key, mePtr, valuePtr);
}
}
std::shared_ptr<Cacheable> LocalRegion::get(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Serializable>& aCallbackArgument) {
std::shared_ptr<Cacheable> rptr;
int64_t sampleStartNanos = startStatOpTime();
GfErrType err = getNoThrow(key, rptr, aCallbackArgument);
updateStatOpTime(m_regionStats->getStat(), m_regionStats->getGetTimeId(),
sampleStartNanos);
// rptr = handleReplay(err, rptr);
GfErrTypeToException("Region::get", err);
return rptr;
}
void LocalRegion::put(const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
const std::shared_ptr<Serializable>& aCallbackArgument) {
std::shared_ptr<Cacheable> oldValue;
int64_t sampleStartNanos = startStatOpTime();
std::shared_ptr<VersionTag> versionTag;
GfErrType err = putNoThrow(key, value, aCallbackArgument, oldValue, -1,
CacheEventFlags::NORMAL, versionTag);
updateStatOpTime(m_regionStats->getStat(), m_regionStats->getPutTimeId(),
sampleStartNanos);
// handleReplay(err, nullptr);
GfErrTypeToException("Region::put", err);
}
void LocalRegion::localPut(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
const std::shared_ptr<Serializable>& aCallbackArgument) {
std::shared_ptr<Cacheable> oldValue;
std::shared_ptr<VersionTag> versionTag;
GfErrType err = putNoThrow(key, value, aCallbackArgument, oldValue, -1,
CacheEventFlags::LOCAL, versionTag);
GfErrTypeToException("Region::localPut", err);
}
void LocalRegion::putAll(
const HashMapOfCacheable& map, std::chrono::milliseconds timeout,
const std::shared_ptr<Serializable>& aCallbackArgument) {
util::PROTOCOL_OPERATION_TIMEOUT_BOUNDS(timeout);
auto sampleStartNanos = startStatOpTime();
auto err = putAllNoThrow(map, timeout, aCallbackArgument);
updateStatOpTime(m_regionStats->getStat(), m_regionStats->getPutAllTimeId(),
sampleStartNanos);
// handleReplay(err, nullptr);
GfErrTypeToException("Region::putAll", err);
}
void LocalRegion::removeAll(
const std::vector<std::shared_ptr<CacheableKey>>& keys,
const std::shared_ptr<Serializable>& aCallbackArgument) {
if (keys.size() == 0) {
throw IllegalArgumentException("Region::removeAll: zero keys provided");
}
int64_t sampleStartNanos = startStatOpTime();
GfErrType err = removeAllNoThrow(keys, aCallbackArgument);
updateStatOpTime(m_regionStats->getStat(),
m_regionStats->getRemoveAllTimeId(), sampleStartNanos);
GfErrTypeToException("Region::removeAll", err);
}
void LocalRegion::create(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
const std::shared_ptr<Serializable>& aCallbackArgument) {
std::shared_ptr<VersionTag> versionTag;
GfErrType err = createNoThrow(key, value, aCallbackArgument, -1,
CacheEventFlags::NORMAL, versionTag);
// handleReplay(err, nullptr);
GfErrTypeToException("Region::create", err);
}
void LocalRegion::localCreate(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
const std::shared_ptr<Serializable>& aCallbackArgument) {
std::shared_ptr<VersionTag> versionTag;
GfErrType err = createNoThrow(key, value, aCallbackArgument, -1,
CacheEventFlags::LOCAL, versionTag);
GfErrTypeToException("Region::localCreate", err);
}
void LocalRegion::invalidate(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Serializable>& aCallbackArgument) {
std::shared_ptr<VersionTag> versionTag;
GfErrType err = invalidateNoThrow(key, aCallbackArgument, -1,
CacheEventFlags::NORMAL, versionTag);
// handleReplay(err, nullptr);
GfErrTypeToException("Region::invalidate", err);
}
void LocalRegion::localInvalidate(
const std::shared_ptr<CacheableKey>& keyPtr,
const std::shared_ptr<Serializable>& aCallbackArgument) {
std::shared_ptr<VersionTag> versionTag;
GfErrType err = invalidateNoThrow(keyPtr, aCallbackArgument, -1,
CacheEventFlags::LOCAL, versionTag);
GfErrTypeToException("Region::localInvalidate", err);
}
void LocalRegion::destroy(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Serializable>& aCallbackArgument) {
std::shared_ptr<VersionTag> versionTag;
GfErrType err = destroyNoThrow(key, aCallbackArgument, -1,
CacheEventFlags::NORMAL, versionTag);
// handleReplay(err, nullptr);
GfErrTypeToException("Region::destroy", err);
}
void LocalRegion::localDestroy(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Serializable>& aCallbackArgument) {
std::shared_ptr<VersionTag> versionTag;
GfErrType err = destroyNoThrow(key, aCallbackArgument, -1,
CacheEventFlags::LOCAL, versionTag);
GfErrTypeToException("Region::localDestroy", err);
}
bool LocalRegion::remove(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
const std::shared_ptr<Serializable>& aCallbackArgument) {
std::shared_ptr<VersionTag> versionTag;
GfErrType err = removeNoThrow(key, value, aCallbackArgument, -1,
CacheEventFlags::NORMAL, versionTag);
bool result = false;
if (err == GF_NOERR) {
result = true;
} else if (err != GF_ENOENT && err != GF_CACHE_ENTRY_NOT_FOUND) {
GfErrTypeToException("Region::remove", err);
}
return result;
}
bool LocalRegion::removeEx(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Serializable>& aCallbackArgument) {
std::shared_ptr<VersionTag> versionTag;
GfErrType err = removeNoThrowEx(key, aCallbackArgument, -1,
CacheEventFlags::NORMAL, versionTag);
bool result = false;
if (err == GF_NOERR) {
result = true;
} else if (err != GF_ENOENT && err != GF_CACHE_ENTRY_NOT_FOUND) {
GfErrTypeToException("Region::removeEx", err);
}
return result;
}
bool LocalRegion::localRemove(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
const std::shared_ptr<Serializable>& aCallbackArgument) {
std::shared_ptr<VersionTag> versionTag;
GfErrType err = removeNoThrow(key, value, aCallbackArgument, -1,
CacheEventFlags::LOCAL, versionTag);
bool result = false;
if (err == GF_NOERR) {
result = true;
} else if (err != GF_ENOENT && err != GF_CACHE_ENTRY_NOT_FOUND) {
GfErrTypeToException("Region::localRemove", err);
}
return result;
}
bool LocalRegion::localRemoveEx(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Serializable>& aCallbackArgument) {
std::shared_ptr<VersionTag> versionTag;
GfErrType err = removeNoThrowEx(key, aCallbackArgument, -1,
CacheEventFlags::LOCAL, versionTag);
bool result = false;
if (err == GF_NOERR) {
result = true;
} else if (err != GF_ENOENT && err != GF_CACHE_ENTRY_NOT_FOUND) {
GfErrTypeToException("Region::localRemoveEx", err);
}
return result;
}
std::vector<std::shared_ptr<CacheableKey>> LocalRegion::keys() {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::keys);
return keys_internal();
}
std::vector<std::shared_ptr<CacheableKey>> LocalRegion::serverKeys() {
throw UnsupportedOperationException(
"serverKeys is not supported for local regions.");
}
std::vector<std::shared_ptr<Cacheable>> LocalRegion::values() {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::values);
std::vector<std::shared_ptr<Cacheable>> values;
if (m_regionAttributes.getCachingEnabled()) {
// invalidToken should not be added by the MapSegments.
m_entries->getValues(values);
}
return values;
}
std::vector<std::shared_ptr<RegionEntry>> LocalRegion::entries(bool recursive) {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::entries);
std::vector<std::shared_ptr<RegionEntry>> entries;
if (m_regionAttributes.getCachingEnabled()) {
entries_internal(entries, recursive);
}
return entries;
}
HashMapOfCacheable LocalRegion::getAll(
const std::vector<std::shared_ptr<CacheableKey>>& keys,
const std::shared_ptr<Serializable>& aCallbackArgument) {
return getAll_internal(keys, aCallbackArgument, true);
}
HashMapOfCacheable LocalRegion::getAll_internal(
const std::vector<std::shared_ptr<CacheableKey>>& keys,
const std::shared_ptr<Serializable>& aCallbackArgument,
bool addToLocalCache) {
if (keys.empty()) {
throw IllegalArgumentException("Region::getAll: zero keys provided");
}
int64_t sampleStartNanos = startStatOpTime();
auto values = std::make_shared<HashMapOfCacheable>();
auto exceptions = std::make_shared<HashMapOfException>();
GfErrType err = getAllNoThrow(keys, values, exceptions, addToLocalCache,
aCallbackArgument);
updateStatOpTime(m_regionStats->getStat(), m_regionStats->getGetAllTimeId(),
sampleStartNanos);
GfErrTypeToException("Region::getAll", err);
return *values;
}
uint32_t LocalRegion::size_remote() {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::size);
if (m_regionAttributes.getCachingEnabled()) {
return m_entries->size();
}
return 0;
}
uint32_t LocalRegion::size() {
TXState* txState = getTXState();
if (txState != nullptr) {
if (isLocalOp()) {
return GF_NOTSUP;
}
return size_remote();
}
return LocalRegion::size_remote();
}
RegionService& LocalRegion::getRegionService() const {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::getRegionService);
return *m_cacheImpl->getCache();
}
CacheImpl* LocalRegion::getCacheImpl() const {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::getCache);
return m_cacheImpl;
}
bool LocalRegion::containsValueForKey_remote(
const std::shared_ptr<CacheableKey>& keyPtr) const {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::containsValueForKey);
if (!m_regionAttributes.getCachingEnabled()) {
return false;
}
std::shared_ptr<Cacheable> valuePtr;
std::shared_ptr<MapEntryImpl> mePtr;
m_entries->getEntry(keyPtr, mePtr, valuePtr);
if (mePtr == nullptr) {
return false;
}
return (valuePtr != nullptr && !CacheableToken::isInvalid(valuePtr));
}
bool LocalRegion::containsValueForKey(
const std::shared_ptr<CacheableKey>& keyPtr) const {
if (keyPtr == nullptr) {
throw IllegalArgumentException(
"LocalRegion::containsValueForKey: "
"key is null");
}
TXState* txState = getTXState();
if (txState == nullptr) {
return LocalRegion::containsValueForKey_remote(keyPtr);
}
return containsValueForKey_remote(keyPtr);
}
bool LocalRegion::containsKeyOnServer(
const std::shared_ptr<CacheableKey>&) const {
throw UnsupportedOperationException(
"LocalRegion::containsKeyOnServer: is not supported.");
}
std::vector<std::shared_ptr<CacheableKey>> LocalRegion::getInterestList()
const {
throw UnsupportedOperationException(
"LocalRegion::getInterestList: is not supported.");
}
std::vector<std::shared_ptr<CacheableString>>
LocalRegion::getInterestListRegex() const {
throw UnsupportedOperationException(
"LocalRegion::getInterestListRegex: is not supported.");
}
bool LocalRegion::containsKey(
const std::shared_ptr<CacheableKey>& keyPtr) const {
if (keyPtr == nullptr) {
throw IllegalArgumentException(
"LocalRegion::containsKey: "
"key is null");
}
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::containsKey);
return containsKey_internal(keyPtr);
}
void LocalRegion::setPersistenceManager(
std::shared_ptr<PersistenceManager>& pmPtr) {
m_persistenceManager = pmPtr;
// set the memberVariable of LRUEntriesMap too.
LRUEntriesMap* lruMap = dynamic_cast<LRUEntriesMap*>(m_entries);
if (lruMap != nullptr) {
lruMap->setPersistenceManager(pmPtr);
}
}
void LocalRegion::setRegionExpiryTask() {
if (regionExpiryEnabled()) {
auto rptr = std::static_pointer_cast<RegionInternal>(shared_from_this());
const auto& duration = getRegionExpiryDuration();
auto handler =
new RegionExpiryHandler(rptr, getRegionExpiryAction(), duration);
auto expiryTaskId =
rptr->getCacheImpl()->getExpiryTaskManager().scheduleExpiryTask(
handler, duration, std::chrono::seconds::zero());
handler->setExpiryTaskId(expiryTaskId);
LOGFINE(
"expiry for region [%s], expiry task id = %d, duration = %d, "
"action = %d",
m_fullPath.c_str(), expiryTaskId, duration.count(),
getRegionExpiryAction());
}
}
void LocalRegion::registerEntryExpiryTask(
std::shared_ptr<MapEntryImpl>& entry) {
// locking is not required here since only the thread that creates
// the entry will register the expiry task for that entry
ExpEntryProperties& expProps = entry->getExpProperties();
expProps.initStartTime();
auto rptr = std::static_pointer_cast<RegionInternal>(shared_from_this());
const auto& duration = getEntryExpiryDuration();
auto handler =
new EntryExpiryHandler(rptr, entry, getEntryExpirationAction(), duration);
auto id = rptr->getCacheImpl()->getExpiryTaskManager().scheduleExpiryTask(
handler, duration, std::chrono::seconds::zero());
if (Log::finestEnabled()) {
std::shared_ptr<CacheableKey> key;
entry->getKeyI(key);
LOGFINEST(
"entry expiry in region [%s], key [%s], task id = %d, "
"duration = %d, action = %d",
m_fullPath.c_str(), Utils::nullSafeToString(key).c_str(), id,
duration.count(), getEntryExpirationAction());
}
expProps.setExpiryTaskId(id);
}
LocalRegion::~LocalRegion() noexcept {
TryWriteGuard guard(m_rwLock, m_destroyPending);
if (!m_destroyPending) {
release(false);
}
m_listener = nullptr;
m_writer = nullptr;
m_loader = nullptr;
_GEODE_SAFE_DELETE(m_entries);
_GEODE_SAFE_DELETE(m_regionStats);
}
/**
* Release the region resources if not released already.
*/
void LocalRegion::release(bool invokeCallbacks) {
if (m_released) {
return;
}
LOGFINE("LocalRegion::release entered for region %s", m_fullPath.c_str());
m_released = true;
if (m_regionStats != nullptr) {
m_regionStats->close();
}
if (invokeCallbacks) {
try {
if (m_loader != nullptr) {
m_loader->close(*this);
}
if (m_writer != nullptr) {
m_writer->close(*this);
}
// TODO: shouldn't listener also be here instead of
// during CacheImpl.close()
} catch (...) {
LOGWARN(
"Region close caught unknown exception in loader/writer "
"close; continuing");
}
}
if (m_persistenceManager != nullptr) {
m_persistenceManager->close();
m_persistenceManager = nullptr;
}
if (m_entries != nullptr && m_regionAttributes.getCachingEnabled()) {
m_entries->close();
}
LOGFINE("LocalRegion::release done for region %s", m_fullPath.c_str());
}
/** Returns whether the specified key currently exists in this region.
* This method is equivalent to <code>getEntry(key) != null</code>.
*
* @param keyPtr the key to check for an existing entry, type is
*CacheableString
*&
* @return true if there is an entry in this region for the specified key
*@throw RegionDestroyedException, if region is destroyed.
*@throw IllegalArgumentException, if the key is 'null'.
*@throw NotConnectedException, if not connected to geode system.
*/
bool LocalRegion::containsKey_internal(
const std::shared_ptr<CacheableKey>& keyPtr) const {
if (keyPtr == nullptr) {
throw IllegalArgumentException("Region::containsKey: key is null");
}
if (!m_regionAttributes.getCachingEnabled()) {
return false;
}
return m_entries->containsKey(keyPtr);
}
std::vector<std::shared_ptr<Region>> LocalRegion::subregions_internal(
const bool recursive) {
auto&& lock = m_subRegions.make_lock();
std::vector<std::shared_ptr<Region>> regions;
regions.reserve(m_subRegions.size());
for (const auto& kv : m_subRegions) {
const auto& subRegion = kv.second;
regions.push_back(subRegion);
if (recursive) {
if (auto localRegion =
std::dynamic_pointer_cast<LocalRegion>(subRegion)) {
auto subRegions = localRegion->subregions_internal(true);
regions.insert(regions.end(), subRegions.begin(), subRegions.end());
}
}
}
return regions;
}
GfErrType LocalRegion::getNoThrow(
const std::shared_ptr<CacheableKey>& keyPtr,
std::shared_ptr<Cacheable>& value,
const std::shared_ptr<Serializable>& aCallbackArgument) {
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
if (keyPtr == nullptr) {
return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION;
}
TXState* txState = getTXState();
if (txState != nullptr) {
if (isLocalOp()) {
return GF_NOTSUP;
}
std::shared_ptr<VersionTag> versionTag;
err = getNoThrow_remote(keyPtr, value, aCallbackArgument, versionTag);
if (err == GF_NOERR) {
txState->setDirty();
}
if (CacheableToken::isInvalid(value) ||
CacheableToken::isTombstone(value)) {
value = nullptr;
}
return err;
}
m_regionStats->incGets();
auto& cachePerfStats = m_cacheImpl->getCachePerfStats();
cachePerfStats.incGets();
// TODO: CacheableToken::isInvalid should be completely hidden
// inside MapSegment; this should be done both for the value obtained
// from local cache as well as oldValue in every instance
std::shared_ptr<MapEntryImpl> me;
int updateCount = -1;
bool isLoaderInvoked = false;
bool isLocal = false;
bool cachingEnabled = m_regionAttributes.getCachingEnabled();
std::shared_ptr<Cacheable> localValue = nullptr;
if (cachingEnabled) {
isLocal = m_entries->get(keyPtr, value, me);
if (isLocal && (value != nullptr && !CacheableToken::isInvalid(value))) {
m_regionStats->incHits();
cachePerfStats.incHits();
updateAccessAndModifiedTimeForEntry(me, false);
updateAccessAndModifiedTime(false);
return err; // found it in local cache...
}
localValue = value;
value = nullptr;
// start tracking the entry
if (!m_regionAttributes.getConcurrencyChecksEnabled()) {
updateCount =
m_entries->addTrackerForEntry(keyPtr, value, true, false, false);
LOGDEBUG(
"Region::get: added tracking with update counter [%d] for key "
"[%s] with value [%s]",
updateCount, Utils::nullSafeToString(keyPtr).c_str(),
Utils::nullSafeToString(value).c_str());
}
}
// remove tracking for the entry before exiting the function
struct RemoveTracking {
private:
const std::shared_ptr<CacheableKey>& m_key;
const int& m_updateCount;
LocalRegion& m_region;
public:
RemoveTracking(const std::shared_ptr<CacheableKey>& key,
const int& updateCount, LocalRegion& region)
: m_key(key), m_updateCount(updateCount), m_region(region) {}
~RemoveTracking() {
if (m_updateCount >= 0 &&
!m_region.getAttributes().getConcurrencyChecksEnabled()) {
m_region.m_entries->removeTrackerForEntry(m_key);
}
}
} _removeTracking(keyPtr, updateCount, *this);
// The control will come here only when caching is disabled or/and
// the entry was not found. In this case atleast update the region
// access times.
updateAccessAndModifiedTime(false);
m_regionStats->incMisses();
cachePerfStats.incMisses();
std::shared_ptr<VersionTag> versionTag;
// Get from some remote source (e.g. external java server) if required.
err = getNoThrow_remote(keyPtr, value, aCallbackArgument, versionTag);
// Its a cache missor it is invalid token then Check if we have a local
// loader.
if ((value == nullptr || CacheableToken::isInvalid(value) ||
CacheableToken::isTombstone(value)) &&
m_loader != nullptr) {
try {
isLoaderInvoked = true;
/*Update the statistics*/
int64_t sampleStartNanos = startStatOpTime();
value = m_loader->load(*this, keyPtr, aCallbackArgument);
updateStatOpTime(m_regionStats->getStat(),
m_regionStats->getLoaderCallTimeId(), sampleStartNanos);
m_regionStats->incLoaderCallsCompleted();
} catch (const Exception& ex) {
LOGERROR("Error in CacheLoader::load: %s: %s", ex.getName().c_str(),
ex.what());
err = GF_CACHE_LOADER_EXCEPTION;
} catch (...) {
LOGERROR("Error in CacheLoader::load, unknown");
err = GF_CACHE_LOADER_EXCEPTION;
}
if (err != GF_NOERR) {
return err;
}
}
std::shared_ptr<Cacheable> oldValue;
// Found it somehow, so store it.
if (value != nullptr /*&& value != CacheableToken::invalid( )*/ &&
cachingEnabled &&
!(CacheableToken::isTombstone(value) &&
(localValue == nullptr || CacheableToken::isInvalid(localValue)))) {
// try to create the entry and if that returns an existing value
// (e.g. from another thread or notification) then return that
LOGDEBUG(
"Region::get: creating entry with tracking update counter [%d] for "
"key "
"[%s]",
updateCount, Utils::nullSafeToString(keyPtr).c_str());
if ((err = putLocal("Region::get", false, keyPtr, value, oldValue,
cachingEnabled, updateCount, 0, versionTag)) !=
GF_NOERR) {
if (err == GF_CACHE_CONCURRENT_MODIFICATION_EXCEPTION) {
LOGDEBUG(
"Region::get: putLocal for key [%s] failed because the cache already contains \
an entry with higher version.",
Utils::nullSafeToString(keyPtr).c_str());
if (CacheableToken::isInvalid(value) ||
CacheableToken::isTombstone(value)) {
value = nullptr;
}
// don't do anything and exit
return GF_NOERR;
}
LOGDEBUG("Region::get: putLocal for key [%s] failed with error %d",
Utils::nullSafeToString(keyPtr).c_str(), err);
err = GF_NOERR;
if (oldValue != nullptr && !CacheableToken::isInvalid(oldValue)) {
LOGDEBUG("Region::get: returning updated value [%s] for key [%s]",
Utils::nullSafeToString(oldValue).c_str(),
Utils::nullSafeToString(keyPtr).c_str());
value = oldValue;
}
}
}
if (CacheableToken::isInvalid(value) || CacheableToken::isTombstone(value)) {
value = nullptr;
}
// invokeCacheListenerForEntryEvent method has the check that if oldValue
// is a CacheableToken then it sets it to nullptr; also determines if it
// should be AFTER_UPDATE or AFTER_CREATE depending on oldValue, so don't
// check here.
if (isLoaderInvoked == false && err == GF_NOERR && value != nullptr) {
err = invokeCacheListenerForEntryEvent(
keyPtr, oldValue, value, aCallbackArgument, CacheEventFlags::NORMAL,
AFTER_UPDATE, isLocal);
}
return err;
}
GfErrType LocalRegion::getAllNoThrow(
const std::vector<std::shared_ptr<CacheableKey>>& keys,
const std::shared_ptr<HashMapOfCacheable>& values,
const std::shared_ptr<HashMapOfException>& exceptions,
const bool addToLocalCache,
const std::shared_ptr<Serializable>& aCallbackArgument) {
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
std::shared_ptr<Cacheable> value;
TXState* txState = getTXState();
if (txState != nullptr) {
if (isLocalOp()) {
return GF_NOTSUP;
}
err = getAllNoThrow_remote(&keys, values, exceptions, nullptr, false,
aCallbackArgument);
if (err == GF_NOERR) {
txState->setDirty();
}
return err;
}
// keys not in cache with their tracking numbers to be gotten using
// a remote call
std::vector<std::shared_ptr<CacheableKey>> serverKeys;
bool cachingEnabled = m_regionAttributes.getCachingEnabled();
bool regionAccessed = false;
auto& cachePerfStats = m_cacheImpl->getCachePerfStats();
for (const auto& key : keys) {
std::shared_ptr<MapEntryImpl> me;
value = nullptr;
m_regionStats->incGets();
cachePerfStats.incGets();
if (values && cachingEnabled) {
if (m_entries->get(key, value, me) && value &&
!CacheableToken::isInvalid(value)) {
m_regionStats->incHits();
cachePerfStats.incHits();
updateAccessAndModifiedTimeForEntry(me, false);
regionAccessed = true;
values->emplace(key, value);
} else {
value = nullptr;
}
}
if (value == nullptr) {
// Add to missed keys list.
serverKeys.push_back(key);
m_regionStats->incMisses();
cachePerfStats.incMisses();
}
// TODO: No support for loaders in getAll for now.
}
if (regionAccessed) {
updateAccessAndModifiedTime(false);
}
if (serverKeys.size() > 0) {
err = getAllNoThrow_remote(&serverKeys, values, exceptions, nullptr,
addToLocalCache, aCallbackArgument);
}
m_regionStats->incGetAll();
return err;
}
// encapsulates actions that need to be taken for a put() operation
class PutActions {
public:
static const EntryEventType s_beforeEventType = BEFORE_UPDATE;
static const EntryEventType s_afterEventType = AFTER_UPDATE;
static const bool s_addIfAbsent = true;
static const bool s_failIfPresent = false;
TXState* m_txState;
inline explicit PutActions(LocalRegion& region) : m_region(region) {
m_txState = TSSTXStateWrapper::get().getTXState();
}
inline static const char* name() { return "Region::put"; }
inline static GfErrType checkArgs(const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
DataInput* delta = nullptr) {
if (key == nullptr || (value == nullptr && delta == nullptr)) {
return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION;
}
return GF_NOERR;
}
inline void getCallbackOldValue(bool cachingEnabled,
const std::shared_ptr<CacheableKey>& key,
std::shared_ptr<MapEntryImpl>& entry,
std::shared_ptr<Cacheable>& oldValue) const {
if (cachingEnabled) {
m_region.m_entries->getEntry(key, entry, oldValue);
}
}
inline static void logCacheWriterFailure(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& oldValue) {
bool isUpdate = (oldValue != nullptr);
LOGFINER("Cache writer vetoed %s for key %s",
(isUpdate ? "update" : "create"),
Utils::nullSafeToString(key).c_str());
}
inline GfErrType remoteUpdate(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
const std::shared_ptr<Serializable>& aCallbackArgument,
std::shared_ptr<VersionTag>& versionTag) {
// propagate the put to remote server, if any
return m_region.putNoThrow_remote(key, value, aCallbackArgument,
versionTag);
}
inline GfErrType localUpdate(const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
std::shared_ptr<Cacheable>& oldValue,
bool cachingEnabled,
const CacheEventFlags /*eventFlags*/,
int updateCount,
std::shared_ptr<VersionTag> versionTag,
DataInput* delta = nullptr,
std::shared_ptr<EventId> eventId = nullptr,
bool /*afterRemote*/ = false) {
return m_region.putLocal(name(), false, key, value, oldValue,
cachingEnabled, updateCount, 0, versionTag, delta,
eventId);
}
private:
LocalRegion& m_region;
};
// encapsulates actions that need to be taken for a put() operation. This
// implementation allows
// null values in Put during transaction. See defect #743
class PutActionsTx : public PutActions {
public:
inline explicit PutActionsTx(LocalRegion& region) : PutActions(region) {}
inline static GfErrType checkArgs(const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& /*value*/,
DataInput* /*delta*/ = nullptr) {
if (key == nullptr) {
return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION;
}
return GF_NOERR;
}
};
// encapsulates actions that need to be taken for a create() operation
class CreateActions {
public:
static const EntryEventType s_beforeEventType = BEFORE_CREATE;
static const EntryEventType s_afterEventType = AFTER_CREATE;
static const bool s_addIfAbsent = true;
static const bool s_failIfPresent = true;
TXState* m_txState;
inline explicit CreateActions(LocalRegion& region) : m_region(region) {
m_txState = TSSTXStateWrapper::get().getTXState();
}
inline static const char* name() { return "Region::create"; }
inline static GfErrType checkArgs(const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& /*value*/,
DataInput* /*delta*/) {
if (key == nullptr) {
return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION;
}
return GF_NOERR;
}
inline void getCallbackOldValue(
bool /*cachingEnabled*/, const std::shared_ptr<CacheableKey>& /*key*/,
std::shared_ptr<MapEntryImpl>& /*entry*/,
std::shared_ptr<Cacheable>& /*oldValue*/) const {}
inline static void logCacheWriterFailure(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& /*oldValue*/) {
LOGFINER("Cache writer vetoed create for key %s",
Utils::nullSafeToString(key).c_str());
}
inline GfErrType remoteUpdate(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
const std::shared_ptr<Serializable>& aCallbackArgument,
std::shared_ptr<VersionTag>& versionTag) {
return m_region.createNoThrow_remote(key, value, aCallbackArgument,
versionTag);
}
inline GfErrType localUpdate(const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
std::shared_ptr<Cacheable>& oldValue,
bool cachingEnabled,
const CacheEventFlags /*eventFlags*/,
int updateCount,
std::shared_ptr<VersionTag> versionTag,
DataInput* /*delta*/ = nullptr,
std::shared_ptr<EventId> /*eventId*/ = nullptr,
bool /*afterRemote*/ = false) {
return m_region.putLocal(name(), true, key, value, oldValue, cachingEnabled,
updateCount, 0, versionTag);
}
private:
LocalRegion& m_region;
};
// encapsulates actions that need to be taken for a destroy() operation
class DestroyActions {
public:
static const EntryEventType s_beforeEventType = BEFORE_DESTROY;
static const EntryEventType s_afterEventType = AFTER_DESTROY;
static const bool s_addIfAbsent = true;
static const bool s_failIfPresent = false;
TXState* m_txState;
inline explicit DestroyActions(LocalRegion& region) : m_region(region) {
m_txState = TSSTXStateWrapper::get().getTXState();
}
inline static const char* name() { return "Region::destroy"; }
inline static GfErrType checkArgs(const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& /*value*/,
DataInput* /*delta*/) {
if (key == nullptr) {
return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION;
}
return GF_NOERR;
}
inline void getCallbackOldValue(bool cachingEnabled,
const std::shared_ptr<CacheableKey>& key,
std::shared_ptr<MapEntryImpl>& entry,
std::shared_ptr<Cacheable>& oldValue) const {
if (cachingEnabled) {
m_region.m_entries->getEntry(key, entry, oldValue);
}
}
inline static void logCacheWriterFailure(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& /*oldValue*/) {
LOGFINER("Cache writer vetoed destroy for key %s",
Utils::nullSafeToString(key).c_str());
}
inline GfErrType remoteUpdate(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& /*value*/,
const std::shared_ptr<Serializable>& aCallbackArgument,
std::shared_ptr<VersionTag>& versionTag) {
return m_region.destroyNoThrow_remote(key, aCallbackArgument, versionTag);
}
inline GfErrType localUpdate(const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& /*value*/,
std::shared_ptr<Cacheable>& oldValue,
bool cachingEnabled,
const CacheEventFlags eventFlags,
int updateCount,
std::shared_ptr<VersionTag> versionTag,
DataInput* /*delta*/ = nullptr,
std::shared_ptr<EventId> /*eventId*/ = nullptr,
bool afterRemote = false) {
auto& cachePerfStats = m_region.m_cacheImpl->getCachePerfStats();
if (cachingEnabled) {
std::shared_ptr<MapEntryImpl> entry;
// for notification invoke the listener even if the key does
// not exist locally
GfErrType err;
LOGDEBUG("Region::destroy: region [%s] destroying key [%s]",
m_region.getFullPath().c_str(),
Utils::nullSafeToString(key).c_str());
if ((err = m_region.m_entries->remove(key, oldValue, entry, updateCount,
versionTag, afterRemote)) !=
GF_NOERR) {
if (eventFlags.isNotification()) {
LOGDEBUG(
"Region::destroy: region [%s] destroy key [%s] for "
"notification having value [%s] failed with %d",
m_region.getFullPath().c_str(),
Utils::nullSafeToString(key).c_str(),
Utils::nullSafeToString(oldValue).c_str(), err);
err = GF_NOERR;
}
return err;
}
if (oldValue != nullptr) {
LOGDEBUG(
"Region::destroy: region [%s] destroyed key [%s] having "
"value [%s]",
m_region.getFullPath().c_str(),
Utils::nullSafeToString(key).c_str(),
Utils::nullSafeToString(oldValue).c_str());
// any cleanup required for the entry (e.g. removing from LRU list)
if (entry != nullptr) {
entry->cleanup(eventFlags);
}
// entry/region expiration
if (!eventFlags.isEvictOrExpire()) {
m_region.updateAccessAndModifiedTime(true);
}
// update the stats
m_region.m_regionStats->setEntries(m_region.m_entries->size());
cachePerfStats.incEntries(-1);
}
}
// update the stats
m_region.m_regionStats->incDestroys();
cachePerfStats.incDestroys();
return GF_NOERR;
}
private:
LocalRegion& m_region;
};
// encapsulates actions that need to be taken for a remove() operation
class RemoveActions {
public:
static const EntryEventType s_beforeEventType = BEFORE_DESTROY;
static const EntryEventType s_afterEventType = AFTER_DESTROY;
static const bool s_addIfAbsent = true;
static const bool s_failIfPresent = false;
TXState* m_txState;
bool allowNULLValue;
inline explicit RemoveActions(LocalRegion& region)
: m_region(region), m_ServerResponse(GF_ENOENT) {
m_txState = TSSTXStateWrapper::get().getTXState();
allowNULLValue = false;
}
inline static const char* name() { return "Region::remove"; }
inline static GfErrType checkArgs(const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& /*value*/,
DataInput* /*delta*/) {
if (key == nullptr) {
return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION;
}
return GF_NOERR;
}
inline void getCallbackOldValue(bool cachingEnabled,
const std::shared_ptr<CacheableKey>& key,
std::shared_ptr<MapEntryImpl>& entry,
std::shared_ptr<Cacheable>& oldValue) const {
if (cachingEnabled) {
m_region.m_entries->getEntry(key, entry, oldValue);
}
}
inline static void logCacheWriterFailure(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& /*oldValue*/) {
LOGFINER("Cache writer vetoed remove for key %s",
Utils::nullSafeToString(key).c_str());
}
bool serializedEqualTo(const std::shared_ptr<Cacheable>& lhs,
const std::shared_ptr<Cacheable>& rhs) {
auto&& cache = *(m_region.getCacheImpl());
if (const auto dataSerializablePrimitive =
std::dynamic_pointer_cast<DataSerializablePrimitive>(lhs)) {
return SerializableHelper<DataSerializablePrimitive>{}.equalTo(
cache, dataSerializablePrimitive,
std::dynamic_pointer_cast<DataSerializablePrimitive>(rhs));
} else if (const auto dataSerializable =
std::dynamic_pointer_cast<DataSerializable>(lhs)) {
return SerializableHelper<DataSerializable>{}.equalTo(
cache, dataSerializable,
std::dynamic_pointer_cast<DataSerializable>(rhs));
} else if (const auto pdxSerializable =
std::dynamic_pointer_cast<PdxSerializable>(lhs)) {
return SerializableHelper<PdxSerializable>{}.equalTo(
cache, pdxSerializable,
std::dynamic_pointer_cast<PdxSerializable>(rhs));
} else if (const auto dataSerializableInternal =
std::dynamic_pointer_cast<DataSerializableInternal>(lhs)) {
return SerializableHelper<DataSerializableInternal>{}.equalTo(
cache, dataSerializableInternal,
std::dynamic_pointer_cast<DataSerializableInternal>(rhs));
} else {
throw UnsupportedOperationException(
"Serialization type not implemented.");
}
}
inline GfErrType remoteUpdate(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& newValue,
const std::shared_ptr<Serializable>& aCallbackArgument,
std::shared_ptr<VersionTag>& versionTag) {
// propagate the remove to remote server, if any
std::shared_ptr<Cacheable> oldValue;
GfErrType err = GF_NOERR;
if (!allowNULLValue && m_region.getAttributes().getCachingEnabled()) {
m_region.getEntry(key, oldValue);
if (oldValue != nullptr && newValue != nullptr) {
if (!serializedEqualTo(oldValue, newValue)) {
err = GF_ENOENT;
return err;
}
} else if ((oldValue == nullptr || CacheableToken::isInvalid(oldValue))) {
m_ServerResponse = m_region.removeNoThrow_remote(
key, newValue, aCallbackArgument, versionTag);
return m_ServerResponse;
} else if (oldValue != nullptr && newValue == nullptr) {
err = GF_ENOENT;
return err;
}
}
if (allowNULLValue) {
m_ServerResponse =
m_region.removeNoThrowEX_remote(key, aCallbackArgument, versionTag);
} else {
m_ServerResponse = m_region.removeNoThrow_remote(
key, newValue, aCallbackArgument, versionTag);
}
LOGDEBUG("serverResponse::%d", m_ServerResponse);
return m_ServerResponse;
}
inline GfErrType localUpdate(const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
std::shared_ptr<Cacheable>& oldValue,
bool cachingEnabled,
const CacheEventFlags eventFlags,
int updateCount,
std::shared_ptr<VersionTag> versionTag,
DataInput* /*delta*/ = nullptr,
std::shared_ptr<EventId> /*eventId*/ = nullptr,
bool afterRemote = false) {
std::shared_ptr<Cacheable> valuePtr;
GfErrType err = GF_NOERR;
if (!allowNULLValue && cachingEnabled) {
m_region.getEntry(key, valuePtr);
if (valuePtr != nullptr && value != nullptr) {
if (!serializedEqualTo(valuePtr, value)) {
err = GF_ENOENT;
return err;
}
} else if (value == nullptr && (!CacheableToken::isInvalid(valuePtr) ||
valuePtr == nullptr)) {
err = (m_ServerResponse == 0 && valuePtr == nullptr) ? GF_NOERR
: GF_ENOENT;
if (updateCount >= 0 &&
!m_region.getAttributes().getConcurrencyChecksEnabled()) {
// This means server has deleted an entry & same entry has been
// destroyed locally
// So call removeTrackerForEntry to remove key that
// was added in the map during addTrackerForEntry call.
m_region.m_entries->removeTrackerForEntry(key);
}
return err;
} else if (valuePtr == nullptr && value != nullptr &&
m_ServerResponse != 0) {
err = GF_ENOENT;
return err;
}
}
auto& cachePerfStats = m_region.m_cacheImpl->getCachePerfStats();
if (cachingEnabled) {
std::shared_ptr<MapEntryImpl> entry;
// for notification invoke the listener even if the key does
// not exist locally
GfErrType err;
LOGDEBUG("Region::remove: region [%s] removing key [%s]",
m_region.getFullPath().c_str(),
Utils::nullSafeToString(key).c_str());
if ((err = m_region.m_entries->remove(key, oldValue, entry, updateCount,
versionTag, afterRemote)) !=
GF_NOERR) {
if (eventFlags.isNotification()) {
LOGDEBUG(
"Region::remove: region [%s] remove key [%s] for "
"notification having value [%s] failed with %d",
m_region.getFullPath().c_str(),
Utils::nullSafeToString(key).c_str(),
Utils::nullSafeToString(oldValue).c_str(), err);
err = GF_NOERR;
}
return err;
}
if (oldValue != nullptr) {
LOGDEBUG(
"Region::remove: region [%s] removed key [%s] having "
"value [%s]",
m_region.getFullPath().c_str(),
Utils::nullSafeToString(key).c_str(),
Utils::nullSafeToString(oldValue).c_str());
// any cleanup required for the entry (e.g. removing from LRU list)
if (entry != nullptr) {
entry->cleanup(eventFlags);
}
// entry/region expiration
if (!eventFlags.isEvictOrExpire()) {
m_region.updateAccessAndModifiedTime(true);
}
// update the stats
m_region.m_regionStats->setEntries(m_region.m_entries->size());
cachePerfStats.incEntries(-1);
}
}
// update the stats
m_region.m_regionStats->incDestroys();
cachePerfStats.incDestroys();
return GF_NOERR;
}
private:
LocalRegion& m_region;
GfErrType m_ServerResponse;
};
class RemoveActionsEx : public RemoveActions {
public:
inline explicit RemoveActionsEx(LocalRegion& region) : RemoveActions(region) {
allowNULLValue = true;
}
};
// encapsulates actions that need to be taken for a invalidate() operation
class InvalidateActions {
public:
static const EntryEventType s_beforeEventType = BEFORE_INVALIDATE;
static const EntryEventType s_afterEventType = AFTER_INVALIDATE;
static const bool s_addIfAbsent = true;
static const bool s_failIfPresent = false;
TXState* m_txState;
inline explicit InvalidateActions(LocalRegion& region) : m_region(region) {
m_txState = TSSTXStateWrapper::get().getTXState();
}
inline static const char* name() { return "Region::invalidate"; }
inline static GfErrType checkArgs(const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& /*value*/,
DataInput* /*delta*/ = nullptr) {
if (key == nullptr) {
return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION;
}
return GF_NOERR;
}
inline void getCallbackOldValue(bool cachingEnabled,
const std::shared_ptr<CacheableKey>& key,
std::shared_ptr<MapEntryImpl>& entry,
std::shared_ptr<Cacheable>& oldValue) const {
if (cachingEnabled) {
m_region.m_entries->getEntry(key, entry, oldValue);
}
}
inline static void logCacheWriterFailure(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& oldValue) {
bool isUpdate = (oldValue != nullptr);
LOGFINER("Cache writer vetoed %s for key %s",
(isUpdate ? "update" : "invalidate"),
Utils::nullSafeToString(key).c_str());
}
inline GfErrType remoteUpdate(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& /*value*/,
const std::shared_ptr<Serializable>& aCallbackArgument,
std::shared_ptr<VersionTag>& versionTag) {
// propagate the invalidate to remote server, if any
return m_region.invalidateNoThrow_remote(key, aCallbackArgument,
versionTag);
}
inline GfErrType localUpdate(const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
std::shared_ptr<Cacheable>& /*oldValue*/,
bool /*cachingEnabled*/,
const CacheEventFlags eventFlags,
int /*updateCount*/,
std::shared_ptr<VersionTag> versionTag,
DataInput* /*delta*/ = nullptr,
std::shared_ptr<EventId> /*eventId*/ = nullptr,
bool /*afterRemote*/ = false) {
return m_region.invalidateLocal(name(), key, value, eventFlags, versionTag);
}
private:
LocalRegion& m_region;
};
template <typename TAction>
GfErrType LocalRegion::updateNoThrow(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
const std::shared_ptr<Serializable>& aCallbackArgument,
std::shared_ptr<Cacheable>& oldValue, int updateCount,
const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag,
DataInput* delta, std::shared_ptr<EventId> eventId) {
GfErrType err = GF_NOERR;
if ((err = TAction::checkArgs(key, value, delta)) != GF_NOERR) {
return err;
}
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
TAction action(*this);
TXState* txState = action.m_txState;
if (txState != nullptr) {
if (isLocalOp(&eventFlags)) {
return GF_NOTSUP;
}
/* adongre - Coverity II
* CID 29194 (6): Parse warning (PW.PARAMETER_HIDDEN)
*/
// std::shared_ptr<VersionTag> versionTag;
err = action.remoteUpdate(key, value, aCallbackArgument, versionTag);
if (err == GF_NOERR) {
txState->setDirty();
}
return err;
}
bool cachingEnabled = m_regionAttributes.getCachingEnabled();
std::shared_ptr<MapEntryImpl> entry;
// do not invoke the writer in case of notification/eviction
// or expiration
if (m_writer != nullptr && eventFlags.invokeCacheWriter()) {
action.getCallbackOldValue(cachingEnabled, key, entry, oldValue);
// invokeCacheWriterForEntryEvent method has the check that if oldValue
// is a CacheableToken then it sets it to nullptr; also determines if it
// should be BEFORE_UPDATE or BEFORE_CREATE depending on oldValue
if (!invokeCacheWriterForEntryEvent(key, oldValue, value, aCallbackArgument,
eventFlags,
TAction::s_beforeEventType)) {
TAction::logCacheWriterFailure(key, oldValue);
return GF_CACHEWRITER_ERROR;
}
}
bool remoteOpDone = false;
// try the remote update; but if this fails (e.g. due to security
// exception) do not do the local update
// uses the technique of adding a tracking to the entry before proceeding
// for put; if the update counter changes when the remote update completes
// then it means that the local entry was overwritten in the meantime
// by a notification or another thread, so we do not do the local update
if (!eventFlags.isLocal() && !eventFlags.isNotification()) {
if (cachingEnabled && updateCount < 0 &&
!m_regionAttributes.getConcurrencyChecksEnabled()) {
// add a tracking for the entry
if ((updateCount = m_entries->addTrackerForEntry(
key, oldValue, TAction::s_addIfAbsent, TAction::s_failIfPresent,
true)) < 0) {
if (oldValue != nullptr) {
// fail for "create" when entry exists
return GF_CACHE_ENTRY_EXISTS;
}
}
}
// propagate the update to remote server, if any
err = action.remoteUpdate(key, value, aCallbackArgument, versionTag);
if (err != GF_NOERR) {
if (updateCount >= 0 &&
!m_regionAttributes.getConcurrencyChecksEnabled()) {
m_entries->removeTrackerForEntry(key);
}
return err;
}
remoteOpDone = true;
}
if (!eventFlags.isNotification() || getProcessedMarker()) {
if ((err = action.localUpdate(key, value, oldValue, cachingEnabled,
eventFlags, updateCount, versionTag, delta,
eventId, remoteOpDone)) ==
GF_CACHE_ENTRY_UPDATED) {
LOGFINEST(
"%s: did not change local value for key [%s] since it has "
"been updated by another thread while operation was in progress",
TAction::name(), Utils::nullSafeToString(key).c_str());
err = GF_NOERR;
} else if (err == GF_CACHE_CONCURRENT_MODIFICATION_EXCEPTION) {
LOGDEBUG(
"Region::localUpdate: updateNoThrow<%s> for key [%s] failed because the cache already contains \
an entry with higher version. The cache listener will not be invoked.",
TAction::name(), Utils::nullSafeToString(key).c_str());
// Cache listener won't be called in this case
return GF_NOERR;
} else if (err == GF_INVALID_DELTA) {
LOGDEBUG(
"Region::localUpdate: updateNoThrow<%s> for key [%s] failed "
"because "
"of invalid delta.",
TAction::name(), Utils::nullSafeToString(key).c_str());
m_cacheImpl->getCachePerfStats().incFailureOnDeltaReceived();
// Get full object from server.
std::shared_ptr<Cacheable>& newValue1 =
const_cast<std::shared_ptr<Cacheable>&>(value);
std::shared_ptr<VersionTag> versionTag1;
err = getNoThrow_FullObject(eventId, newValue1, versionTag1);
if (err == GF_NOERR && newValue1 != nullptr) {
err = m_entries->put(key, newValue1, entry, oldValue, updateCount, 0,
versionTag1 != nullptr ? versionTag1 : versionTag);
if (err == GF_CACHE_CONCURRENT_MODIFICATION_EXCEPTION) {
LOGDEBUG(
"Region::localUpdate: updateNoThrow<%s> for key [%s] failed because the cache already contains \
an entry with higher version. The cache listener will not be invoked.",
TAction::name(), Utils::nullSafeToString(key).c_str());
// Cache listener won't be called in this case
return GF_NOERR;
} else if (err != GF_NOERR) {
return err;
}
}
} else if (err != GF_NOERR) {
return err;
}
} else { // if (getProcessedMarker())
action.getCallbackOldValue(cachingEnabled, key, entry, oldValue);
if (updateCount >= 0 && !m_regionAttributes.getConcurrencyChecksEnabled()) {
m_entries->removeTrackerForEntry(key);
}
}
// invokeCacheListenerForEntryEvent method has the check that if oldValue
// is a CacheableToken then it sets it to nullptr; also determines if it
// should be AFTER_UPDATE or AFTER_CREATE depending on oldValue
err =
invokeCacheListenerForEntryEvent(key, oldValue, value, aCallbackArgument,
eventFlags, TAction::s_afterEventType);
return err;
}
template <typename TAction>
GfErrType LocalRegion::updateNoThrowTX(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
const std::shared_ptr<Serializable>& aCallbackArgument,
std::shared_ptr<Cacheable>& oldValue, int updateCount,
const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag,
DataInput* delta, std::shared_ptr<EventId> eventId) {
GfErrType err = GF_NOERR;
if ((err = TAction::checkArgs(key, value, delta)) != GF_NOERR) {
return err;
}
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
TAction action(*this);
bool cachingEnabled = m_regionAttributes.getCachingEnabled();
std::shared_ptr<MapEntryImpl> entry;
if (!eventFlags.isNotification() || getProcessedMarker()) {
if ((err = action.localUpdate(key, value, oldValue, cachingEnabled,
eventFlags, updateCount, versionTag, delta,
eventId)) == GF_CACHE_ENTRY_UPDATED) {
LOGFINEST(
"%s: did not change local value for key [%s] since it has "
"been updated by another thread while operation was in progress",
TAction::name(), Utils::nullSafeToString(key).c_str());
err = GF_NOERR;
} else if (err == GF_CACHE_ENTRY_NOT_FOUND) {
// Entry not found. Possibly because the entry was added and removed in
// the
// same transaction. Ignoring this error #739
LOGFINE(
"%s: No entry found. Possibly because the entry was added and "
"removed in the same transaction. "
"Ignoring this error. ",
TAction::name(), Utils::nullSafeToString(key).c_str());
err = GF_NOERR;
} else if (err != GF_NOERR) {
return err;
}
} else { // if (getProcessedMarker())
action.getCallbackOldValue(cachingEnabled, key, entry, oldValue);
if (updateCount >= 0 && !m_regionAttributes.getConcurrencyChecksEnabled()) {
m_entries->removeTrackerForEntry(key);
}
}
// invokeCacheListenerForEntryEvent method has the check that if oldValue
// is a CacheableToken then it sets it to nullptr; also determines if it
// should be AFTER_UPDATE or AFTER_CREATE depending on oldValue
err =
invokeCacheListenerForEntryEvent(key, oldValue, value, aCallbackArgument,
eventFlags, TAction::s_afterEventType);
return err;
}
GfErrType LocalRegion::putNoThrow(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
const std::shared_ptr<Serializable>& aCallbackArgument,
std::shared_ptr<Cacheable>& oldValue, int updateCount,
const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag,
DataInput* delta, std::shared_ptr<EventId> eventId) {
return updateNoThrow<PutActions>(key, value, aCallbackArgument, oldValue,
updateCount, eventFlags, versionTag, delta,
eventId);
}
GfErrType LocalRegion::putNoThrowTX(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
const std::shared_ptr<Serializable>& aCallbackArgument,
std::shared_ptr<Cacheable>& oldValue, int updateCount,
const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag,
DataInput* delta, std::shared_ptr<EventId> eventId) {
return updateNoThrowTX<PutActionsTx>(key, value, aCallbackArgument, oldValue,
updateCount, eventFlags, versionTag,
delta, eventId);
}
GfErrType LocalRegion::createNoThrow(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
const std::shared_ptr<Serializable>& aCallbackArgument, int updateCount,
const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag) {
std::shared_ptr<Cacheable> oldValue;
return updateNoThrow<CreateActions>(key, value, aCallbackArgument, oldValue,
updateCount, eventFlags, versionTag);
}
GfErrType LocalRegion::destroyNoThrow(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Serializable>& aCallbackArgument, int updateCount,
const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag) {
std::shared_ptr<Cacheable> oldValue;
return updateNoThrow<DestroyActions>(key, nullptr, aCallbackArgument,
oldValue, updateCount, eventFlags,
versionTag);
}
GfErrType LocalRegion::destroyNoThrowTX(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Serializable>& aCallbackArgument, int updateCount,
const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag) {
std::shared_ptr<Cacheable> oldValue;
return updateNoThrowTX<DestroyActions>(key, nullptr, aCallbackArgument,
oldValue, updateCount, eventFlags,
versionTag);
}
GfErrType LocalRegion::removeNoThrow(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
const std::shared_ptr<Serializable>& aCallbackArgument, int updateCount,
const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag) {
std::shared_ptr<Cacheable> oldValue;
return updateNoThrow<RemoveActions>(key, value, aCallbackArgument, oldValue,
updateCount, eventFlags, versionTag);
}
GfErrType LocalRegion::removeNoThrowEx(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Serializable>& aCallbackArgument, int updateCount,
const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag) {
std::shared_ptr<Cacheable> oldValue;
return updateNoThrow<RemoveActionsEx>(key, nullptr, aCallbackArgument,
oldValue, updateCount, eventFlags,
versionTag);
}
GfErrType LocalRegion::invalidateNoThrow(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Serializable>& aCallbackArgument, int updateCount,
const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag) {
std::shared_ptr<Cacheable> oldValue;
return updateNoThrow<InvalidateActions>(key, nullptr, aCallbackArgument,
oldValue, updateCount, eventFlags,
versionTag);
}
GfErrType LocalRegion::invalidateNoThrowTX(
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Serializable>& aCallbackArgument, int updateCount,
const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag) {
std::shared_ptr<Cacheable> oldValue;
return updateNoThrowTX<InvalidateActions>(key, nullptr, aCallbackArgument,
oldValue, updateCount, eventFlags,
versionTag);
}
GfErrType LocalRegion::putAllNoThrow(
const HashMapOfCacheable& map, std::chrono::milliseconds timeout,
const std::shared_ptr<Serializable>& aCallbackArgument) {
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
// std::shared_ptr<VersionTag> versionTag;
std::shared_ptr<VersionedCacheableObjectPartList>
versionedObjPartListPtr; //= new VersionedCacheableObjectPartList();
TXState* txState = getTXState();
if (txState != nullptr) {
if (isLocalOp()) {
return GF_NOTSUP;
}
err = putAllNoThrow_remote(map, /*versionTag*/ versionedObjPartListPtr,
timeout, aCallbackArgument);
if (err == GF_NOERR) {
txState->setDirty();
}
return err;
}
bool cachingEnabled = m_regionAttributes.getCachingEnabled();
MapOfOldValue oldValueMap;
// remove tracking for the entries befor exiting the function
struct RemoveTracking {
private:
const MapOfOldValue& m_oldValueMap;
LocalRegion& m_region;
public:
RemoveTracking(const MapOfOldValue& oldValueMap, LocalRegion& region)
: m_oldValueMap(oldValueMap), m_region(region) {}
~RemoveTracking() {
if (!m_region.getAttributes().getConcurrencyChecksEnabled()) {
// need to remove the tracking added to the entries at the end
for (MapOfOldValue::const_iterator iter = m_oldValueMap.begin();
iter != m_oldValueMap.end(); ++iter) {
if (iter->second.second >= 0) {
m_region.m_entries->removeTrackerForEntry(iter->first);
}
}
}
}
} _removeTracking(oldValueMap, *this);
if (cachingEnabled || m_writer != nullptr) {
std::shared_ptr<Cacheable> oldValue;
for (const auto& iter : map) {
const auto& key = iter.first;
if (cachingEnabled && !m_regionAttributes.getConcurrencyChecksEnabled()) {
int updateCount =
m_entries->addTrackerForEntry(key, oldValue, true, false, true);
oldValueMap.insert(
std::make_pair(key, std::make_pair(oldValue, updateCount)));
}
if (m_writer != nullptr) {
// invokeCacheWriterForEntryEvent method has the check that if
// oldValue is a CacheableToken then it sets it to nullptr; also
// determines if it should be BEFORE_UPDATE or BEFORE_CREATE depending
// on oldValue
if (!invokeCacheWriterForEntryEvent(
key, oldValue, iter.second, aCallbackArgument,
CacheEventFlags::LOCAL, BEFORE_UPDATE)) {
PutActions::logCacheWriterFailure(key, oldValue);
return GF_CACHEWRITER_ERROR;
}
}
}
}
// try remote putAll, if any
if ((err = putAllNoThrow_remote(map, versionedObjPartListPtr, timeout,
aCallbackArgument)) != GF_NOERR) {
return err;
}
// next the local puts
GfErrType localErr;
std::shared_ptr<VersionTag> versionTag;
if (cachingEnabled) {
if (m_isPRSingleHopEnabled) { /*New PRSingleHop Case:: PR Singlehop
condition*/
for (size_t keyIndex = 0;
keyIndex < versionedObjPartListPtr->getSucceededKeys()->size();
keyIndex++) {
const auto valPtr =
versionedObjPartListPtr->getSucceededKeys()->at(keyIndex);
const auto& mapIter = map.find(valPtr);
std::shared_ptr<CacheableKey> key = nullptr;
std::shared_ptr<Cacheable> value = nullptr;
if (mapIter != map.end()) {
key = mapIter->first;
value = mapIter->second;
} else {
// ThrowERROR
LOGERROR(
"ERROR :: LocalRegion::putAllNoThrow() Key must be found in "
"the "
"usermap");
}
if (versionedObjPartListPtr) {
LOGDEBUG("versionedObjPartListPtr->getVersionedTagptr().size() = %d ",
versionedObjPartListPtr->getVersionedTagptr().size());
if (versionedObjPartListPtr->getVersionedTagptr().size() > 0) {
versionTag =
versionedObjPartListPtr->getVersionedTagptr()[keyIndex];
}
}
std::pair<std::shared_ptr<Cacheable>, int>& p = oldValueMap[key];
if ((localErr = LocalRegion::putNoThrow(
key, value, aCallbackArgument, p.first, p.second,
CacheEventFlags::LOCAL | CacheEventFlags::NOCACHEWRITER,
versionTag)) == GF_CACHE_ENTRY_UPDATED) {
LOGFINEST(
"Region::putAll: did not change local value for key [%s] "
"since it has been updated by another thread while operation "
"was "
"in progress",
Utils::nullSafeToString(key).c_str());
} else if (localErr == GF_CACHE_LISTENER_EXCEPTION) {
LOGFINER("Region::putAll: invoke listener error [%d] for key [%s]",
localErr, Utils::nullSafeToString(key).c_str());
err = localErr;
} else if (localErr != GF_NOERR) {
return localErr;
}
} // End of for loop
} else { /*Non SingleHop case :: PUTALL has taken multiple hops*/
LOGDEBUG(
"NILKANTH LocalRegion::putAllNoThrow m_isPRSingleHopEnabled = %d "
"expected false",
m_isPRSingleHopEnabled);
int index = 0;
for (const auto& iter : map) {
const auto& key = iter.first;
const auto& value = iter.second;
auto& p = oldValueMap[key];
if (versionedObjPartListPtr) {
LOGDEBUG("versionedObjPartListPtr->getVersionedTagptr().size() = %d ",
versionedObjPartListPtr->getVersionedTagptr().size());
if (versionedObjPartListPtr->getVersionedTagptr().size() > 0) {
versionTag = versionedObjPartListPtr->getVersionedTagptr()[index++];
}
}
if ((localErr = LocalRegion::putNoThrow(
key, value, aCallbackArgument, p.first, p.second,
CacheEventFlags::LOCAL | CacheEventFlags::NOCACHEWRITER,
versionTag)) == GF_CACHE_ENTRY_UPDATED) {
LOGFINEST(
"Region::putAll: did not change local value for key [%s] "
"since it has been updated by another thread while operation "
"was "
"in progress",
Utils::nullSafeToString(key).c_str());
} else if (localErr == GF_CACHE_LISTENER_EXCEPTION) {
LOGFINER("Region::putAll: invoke listener error [%d] for key [%s]",
localErr, Utils::nullSafeToString(key).c_str());
err = localErr;
} else if (localErr != GF_NOERR) {
return localErr;
}
}
}
}
m_regionStats->incPutAll();
return err;
}
GfErrType LocalRegion::removeAllNoThrow(
const std::vector<std::shared_ptr<CacheableKey>>& keys,
const std::shared_ptr<Serializable>& aCallbackArgument) {
// 1. check destroy pending
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
std::shared_ptr<VersionedCacheableObjectPartList> versionedObjPartListPtr;
// 2.check transaction state and do remote op
TXState* txState = getTXState();
if (txState != nullptr) {
if (isLocalOp()) return GF_NOTSUP;
err = removeAllNoThrow_remote(keys, versionedObjPartListPtr,
aCallbackArgument);
if (err == GF_NOERR) txState->setDirty();
return err;
}
// 3.add tracking
bool cachingEnabled = m_regionAttributes.getCachingEnabled();
// 4. do remote removeAll
err =
removeAllNoThrow_remote(keys, versionedObjPartListPtr, aCallbackArgument);
if (err != GF_NOERR) {
return err;
}
// 5. update local cache
GfErrType localErr;
std::shared_ptr<VersionTag> versionTag;
if (cachingEnabled) {
std::vector<std::shared_ptr<CacheableKey>>* keysPtr;
if (m_isPRSingleHopEnabled) {
keysPtr = versionedObjPartListPtr->getSucceededKeys().get();
} else {
keysPtr = const_cast<std::vector<std::shared_ptr<CacheableKey>>*>(&keys);
}
for (size_t keyIndex = 0; keyIndex < keysPtr->size(); keyIndex++) {
auto key = keysPtr->at(keyIndex);
if (versionedObjPartListPtr) {
LOGDEBUG("versionedObjPartListPtr->getVersionedTagptr().size() = %d ",
versionedObjPartListPtr->getVersionedTagptr().size());
if (versionedObjPartListPtr->getVersionedTagptr().size() > 0) {
versionTag = versionedObjPartListPtr->getVersionedTagptr()[keyIndex];
}
if (versionTag == nullptr) {
LOGDEBUG(
"RemoveAll hits EntryNotFoundException at server side for key "
"[%s], not to destroy it from local cache.",
Utils::nullSafeToString(key).c_str());
continue;
}
}
if ((localErr = LocalRegion::destroyNoThrow(
key, aCallbackArgument, -1,
CacheEventFlags::LOCAL | CacheEventFlags::NOCACHEWRITER,
versionTag)) == GF_CACHE_ENTRY_UPDATED) {
LOGFINEST(
"Region::removeAll: did not remove local value for key [%s] "
"since it has been updated by another thread while operation was "
"in progress",
Utils::nullSafeToString(key).c_str());
} else if (localErr == GF_CACHE_LISTENER_EXCEPTION) {
LOGFINER("Region::removeAll: invoke listener error [%d] for key [%s]",
localErr, Utils::nullSafeToString(key).c_str());
err = localErr;
} else if (localErr == GF_CACHE_ENTRY_NOT_FOUND) {
LOGFINER("Region::removeAll: error [%d] for key [%s]", localErr,
Utils::nullSafeToString(key).c_str());
} else if (localErr != GF_NOERR) {
return localErr;
}
} // End of for loop
}
// 6.update stats
m_regionStats->incRemoveAll();
return err;
}
void LocalRegion::clear(
const std::shared_ptr<Serializable>& aCallbackArgument) {
/*update the stats */
int64_t sampleStartNanos = startStatOpTime();
localClear(aCallbackArgument);
updateStatOpTime(m_regionStats->getStat(), m_regionStats->getClearsId(),
sampleStartNanos);
}
void LocalRegion::localClear(
const std::shared_ptr<Serializable>& aCallbackArgument) {
GfErrType err = localClearNoThrow(aCallbackArgument, CacheEventFlags::LOCAL);
if (err != GF_NOERR) GfErrTypeToException("LocalRegion::localClear", err);
}
GfErrType LocalRegion::localClearNoThrow(
const std::shared_ptr<Serializable>& aCallbackArgument,
const CacheEventFlags eventFlags) {
bool cachingEnabled = m_regionAttributes.getCachingEnabled();
/*Update the stats for clear*/
m_regionStats->incClears();
GfErrType err = GF_NOERR;
TryReadGuard guard(m_rwLock, m_destroyPending);
if (m_released || m_destroyPending) return err;
if (!invokeCacheWriterForRegionEvent(aCallbackArgument, eventFlags,
BEFORE_REGION_CLEAR)) {
LOGFINE("Cache writer prevented region clear");
return GF_CACHEWRITER_ERROR;
}
if (cachingEnabled == true) m_entries->clear();
if (!eventFlags.isNormal()) {
err = invokeCacheListenerForRegionEvent(aCallbackArgument, eventFlags,
AFTER_REGION_CLEAR);
}
return err;
}
GfErrType LocalRegion::invalidateLocal(
const std::string& name, const std::shared_ptr<CacheableKey>& keyPtr,
const std::shared_ptr<Cacheable>& value, const CacheEventFlags eventFlags,
std::shared_ptr<VersionTag> versionTag) {
if (keyPtr == nullptr) {
return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION;
}
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
bool cachingEnabled = m_regionAttributes.getCachingEnabled();
std::shared_ptr<Cacheable> oldValue;
std::shared_ptr<MapEntryImpl> me;
if (!eventFlags.isNotification() || getProcessedMarker()) {
if (cachingEnabled) {
LOGDEBUG("%s: region [%s] invalidating key [%s], value [%s]",
name.c_str(), getFullPath().c_str(),
Utils::nullSafeToString(keyPtr).c_str(),
Utils::nullSafeToString(value).c_str());
/* adongre - Coverity II
* CID 29193: Parse warning (PW.PARAMETER_HIDDEN)
*/
// std::shared_ptr<VersionTag> versionTag;
if ((err = m_entries->invalidate(keyPtr, me, oldValue, versionTag)) !=
GF_NOERR) {
if (eventFlags.isNotification()) {
LOGDEBUG(
"Region::invalidate: region [%s] invalidate key [%s] "
"failed with error %d",
getFullPath().c_str(), Utils::nullSafeToString(keyPtr).c_str(),
err);
}
if (err == GF_CACHE_CONCURRENT_MODIFICATION_EXCEPTION) {
LOGDEBUG(
"Region::invalidateLocal: invalidate for key [%s] failed because the cache already contains \
an entry with higher version. The cache listener will not be invoked.",
Utils::nullSafeToString(keyPtr).c_str());
// Cache listener won't be called in this case
return GF_NOERR;
}
// for notification invoke the listener even if the key does
// not exist locally
if (!eventFlags.isNotification() || err != GF_CACHE_ENTRY_NOT_FOUND) {
return err;
} else {
err = GF_NOERR;
}
} else {
LOGDEBUG("Region::invalidate: region [%s] invalidated key [%s]",
getFullPath().c_str(),
Utils::nullSafeToString(keyPtr).c_str());
}
// entry/region expiration
if (!eventFlags.isEvictOrExpire()) {
updateAccessAndModifiedTime(true);
}
}
} else { // if (getProcessedMarker())
if (cachingEnabled) {
m_entries->getEntry(keyPtr, me, oldValue);
}
}
return err;
}
GfErrType LocalRegion::invalidateRegionNoThrowOnSubRegions(
const std::shared_ptr<Serializable>& aCallbackArgument,
const CacheEventFlags eventFlags) {
auto&& lock = m_subRegions.make_lock();
for (const auto& kv : m_subRegions) {
if (auto subRegion = std::dynamic_pointer_cast<RegionInternal>(kv.second)) {
auto err =
subRegion->invalidateRegionNoThrow(aCallbackArgument, eventFlags);
if (err != GF_NOERR) {
return err;
}
}
}
return GF_NOERR;
}
GfErrType LocalRegion::invalidateRegionNoThrow(
const std::shared_ptr<Serializable>& aCallbackArgument,
const CacheEventFlags eventFlags) {
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
if (m_regionAttributes.getCachingEnabled()) {
std::vector<std::shared_ptr<CacheableKey>> v = keys_internal();
auto size = v.size();
std::shared_ptr<MapEntryImpl> me;
for (decltype(size) i = 0; i < size; i++) {
{
std::shared_ptr<Cacheable> oldValue;
// invalidate all the entries with a nullptr versionTag
std::shared_ptr<VersionTag> versionTag;
m_entries->invalidate(v.at(i), me, oldValue, versionTag);
if (!eventFlags.isEvictOrExpire()) {
updateAccessAndModifiedTimeForEntry(me, true);
}
}
}
if (!eventFlags.isEvictOrExpire()) {
updateAccessAndModifiedTime(true);
}
}
// try remote region invalidate, if any
if (!eventFlags.isLocal()) {
err = invalidateRegionNoThrow_remote(aCallbackArgument);
if (err != GF_NOERR) return err;
}
err = invalidateRegionNoThrowOnSubRegions(aCallbackArgument, eventFlags);
if (err != GF_NOERR) {
return err;
}
err = invokeCacheListenerForRegionEvent(aCallbackArgument, eventFlags,
AFTER_REGION_INVALIDATE);
return err;
}
GfErrType LocalRegion::destroyRegionNoThrow(
const std::shared_ptr<Serializable>& aCallbackArgument,
bool removeFromParent, const CacheEventFlags eventFlags) {
// Get global locks to synchronize with failover thread.
// TODO: This should go into RegionGlobalLocks
// The distMngrsLock is required before RegionGlobalLocks since failover
// thread acquires distMngrsLock and then tries to acquire endpoints lock
// which is already taken by RegionGlobalLocks here.
DistManagersLockGuard _guard(m_cacheImpl->tcrConnectionManager());
RegionGlobalLocks acquireLocks(this);
// Fix for BUG:849, i.e Remove subscription on region before destroying the
// region
if (eventFlags == CacheEventFlags::LOCAL) {
if (unregisterKeysBeforeDestroyRegion() != GF_NOERR) {
LOGDEBUG(
"DEBUG :: LocalRegion::destroyRegionNoThrow UnregisteredKeys "
"Failed");
}
}
TryWriteGuard guard(m_rwLock, m_destroyPending);
if (m_destroyPending) {
if (eventFlags.isCacheClose()) {
return GF_NOERR;
} else {
return GF_CACHE_REGION_DESTROYED_EXCEPTION;
}
}
m_destroyPending = true;
LOGDEBUG("LocalRegion::destroyRegionNoThrow( ): set flag destroy-pending.");
GfErrType err = GF_NOERR;
// do not invoke the writer for expiry or notification
if (!eventFlags.isNotification() && !eventFlags.isEvictOrExpire()) {
if (!invokeCacheWriterForRegionEvent(aCallbackArgument, eventFlags,
BEFORE_REGION_DESTROY)) {
// do not let CacheWriter veto when this is Cache::close()
if (!eventFlags.isCacheClose()) {
LOGFINE("Cache writer prevented region destroy");
m_destroyPending = false;
return GF_CACHEWRITER_ERROR;
}
}
// for the expiry case try the local destroy first and remote
// destroy only if local destroy succeeds
if (!eventFlags.isLocal()) {
err = destroyRegionNoThrow_remote(aCallbackArgument);
if (err != GF_NOERR) {
m_destroyPending = false;
return err;
}
}
}
LOGFINE("Region %s is being destroyed", m_fullPath.c_str());
{
auto&& lock = m_subRegions.make_lock();
for (const auto& kv : m_subRegions) {
// TODO: remove unnecessary dynamic_cast by having m_subRegions hold
// RegionInternal and invoke the destroy method in that
if (auto subRegion =
std::dynamic_pointer_cast<RegionInternal>(kv.second)) {
// for subregions never remove from parent since that will cause
// the region to be destroyed and SEGV; unbind_all takes care of that
// Also don't send remote destroy message for sub-regions
err = subRegion->destroyRegionNoThrow(
aCallbackArgument, false, eventFlags | CacheEventFlags::LOCAL);
// for Cache::close() keep going as far as possible
if (err != GF_NOERR && !eventFlags.isCacheClose()) {
m_destroyPending = false;
return err;
}
}
}
}
m_subRegions.clear();
// for the expiry case try the local destroy first and remote
// destroy only if local destroy succeeds
if (eventFlags.isEvictOrExpire() && !eventFlags.isLocal()) {
err = destroyRegionNoThrow_remote(aCallbackArgument);
if (err != GF_NOERR) {
m_destroyPending = false;
return err;
}
}
// if we are not removing from parent then this is a proper
// region close so invoke listener->close() also
err = invokeCacheListenerForRegionEvent(aCallbackArgument, eventFlags,
AFTER_REGION_DESTROY);
release(true);
if (m_regionAttributes.getCachingEnabled()) {
_GEODE_SAFE_DELETE(m_entries);
}
GF_D_ASSERT(m_destroyPending);
if (removeFromParent) {
if (m_parentRegion == nullptr) {
m_cacheImpl->removeRegion(m_name.c_str());
} else {
LocalRegion* parent = dynamic_cast<LocalRegion*>(m_parentRegion.get());
if (parent != nullptr) {
parent->removeRegion(m_name);
if (!eventFlags.isEvictOrExpire()) {
parent->updateAccessAndModifiedTime(true);
}
}
}
}
return err;
}
GfErrType LocalRegion::putLocal(const std::string& name, bool isCreate,
const std::shared_ptr<CacheableKey>& key,
const std::shared_ptr<Cacheable>& value,
std::shared_ptr<Cacheable>& oldValue,
bool cachingEnabled, int updateCount,
int destroyTracker,
std::shared_ptr<VersionTag> versionTag,
DataInput* delta,
std::shared_ptr<EventId> eventId) {
GfErrType err = GF_NOERR;
bool isUpdate = !isCreate;
auto& cachePerfStats = m_cacheImpl->getCachePerfStats();
if (cachingEnabled) {
std::shared_ptr<MapEntryImpl> entry;
LOGDEBUG("%s: region [%s] putting key [%s], value [%s]", name.c_str(),
getFullPath().c_str(), Utils::nullSafeToString(key).c_str(),
Utils::nullSafeToString(value).c_str());
if (isCreate) {
err = m_entries->create(key, value, entry, oldValue, updateCount,
destroyTracker, versionTag);
} else {
err = m_entries->put(key, value, entry, oldValue, updateCount,
destroyTracker, versionTag, isUpdate, delta);
if (err == GF_INVALID_DELTA) {
cachePerfStats.incFailureOnDeltaReceived();
// PXR: Get full object from server.
std::shared_ptr<Cacheable>& newValue1 =
const_cast<std::shared_ptr<Cacheable>&>(value);
std::shared_ptr<VersionTag> versionTag1;
err = getNoThrow_FullObject(eventId, newValue1, versionTag1);
if (err == GF_NOERR && newValue1 != nullptr) {
err = m_entries->put(
key, newValue1, entry, oldValue, updateCount, destroyTracker,
versionTag1 != nullptr ? versionTag1 : versionTag, isUpdate);
}
}
if (delta != nullptr &&
err == GF_NOERR) { // Means that delta is on and there is no failure.
cachePerfStats.incDeltaReceived();
}
}
if (err != GF_NOERR) {
return err;
}
LOGDEBUG("%s: region [%s] %s key [%s], value [%s]", name.c_str(),
getFullPath().c_str(), isUpdate ? "updated" : "created",
Utils::nullSafeToString(key).c_str(),
Utils::nullSafeToString(value).c_str());
// entry/region expiration
if (entryExpiryEnabled()) {
if (isUpdate && entry->getExpProperties().getExpiryTaskId() != -1) {
updateAccessAndModifiedTimeForEntry(entry, true);
} else {
registerEntryExpiryTask(entry);
}
}
updateAccessAndModifiedTime(true);
}
// update the stats
if (isUpdate) {
m_regionStats->incPuts();
cachePerfStats.incPuts();
} else {
if (cachingEnabled) {
m_regionStats->setEntries(m_entries->size());
cachePerfStats.incEntries(1);
}
m_regionStats->incCreates();
cachePerfStats.incCreates();
}
return err;
}
std::vector<std::shared_ptr<CacheableKey>> LocalRegion::keys_internal() {
std::vector<std::shared_ptr<CacheableKey>> keys;
if (m_regionAttributes.getCachingEnabled()) {
m_entries->getKeys(keys);
}
return keys;
}
void LocalRegion::entries_internal(
std::vector<std::shared_ptr<RegionEntry>>& me, const bool recursive) {
m_entries->getEntries(me);
if (recursive == true) {
auto&& lock = m_subRegions.make_lock();
for (const auto& kv : m_subRegions) {
if (auto subRegion = std::dynamic_pointer_cast<LocalRegion>(kv.second)) {
subRegion->entries_internal(me, true);
}
}
}
}
void LocalRegion::removeRegion(const std::string& name) {
m_subRegions.erase(name);
}
bool LocalRegion::invokeCacheWriterForEntryEvent(
const std::shared_ptr<CacheableKey>& key,
std::shared_ptr<Cacheable>& oldValue,
const std::shared_ptr<Cacheable>& newValue,
const std::shared_ptr<Serializable>& aCallbackArgument,
CacheEventFlags eventFlags, EntryEventType type) {
// Check if we have a local cache writer. If so, invoke and return.
bool bCacheWriterReturn = true;
if (m_writer != nullptr) {
if (oldValue != nullptr && CacheableToken::isInvalid(oldValue)) {
oldValue = nullptr;
}
EntryEvent event(shared_from_this(), key, oldValue, newValue,
aCallbackArgument, eventFlags.isNotification());
const char* eventStr = "unknown";
try {
bool updateStats = true;
/*Update the CacheWriter Stats*/
int64_t sampleStartNanos = startStatOpTime();
switch (type) {
case BEFORE_UPDATE: {
if (oldValue != nullptr) {
eventStr = "beforeUpdate";
bCacheWriterReturn = m_writer->beforeUpdate(event);
break;
}
// if oldValue is nullptr then fall to BEFORE_CREATE case
}
case BEFORE_CREATE: {
eventStr = "beforeCreate";
bCacheWriterReturn = m_writer->beforeCreate(event);
break;
}
case BEFORE_DESTROY: {
eventStr = "beforeDestroy";
bCacheWriterReturn = m_writer->beforeDestroy(event);
break;
}
default: {
updateStats = false;
break;
}
}
if (updateStats) {
updateStatOpTime(m_regionStats->getStat(),
m_regionStats->getWriterCallTimeId(),
sampleStartNanos);
m_regionStats->incWriterCallsCompleted();
}
} catch (const Exception& ex) {
LOGERROR(std::string("Exception in CacheWriter::") + eventStr + ": " +
ex.getName() + ": " + ex.getMessage());
bCacheWriterReturn = false;
} catch (...) {
LOGERROR("Unknown exception in CacheWriter::%s", eventStr);
bCacheWriterReturn = false;
}
}
return bCacheWriterReturn;
}
bool LocalRegion::invokeCacheWriterForRegionEvent(
const std::shared_ptr<Serializable>& aCallbackArgument,
CacheEventFlags eventFlags, RegionEventType type) {
// Check if we have a local cache writer. If so, invoke and return.
bool bCacheWriterReturn = true;
if (m_writer != nullptr) {
RegionEvent event(shared_from_this(), aCallbackArgument,
eventFlags.isNotification());
const char* eventStr = "unknown";
try {
bool updateStats = true;
/*Update the CacheWriter Stats*/
int64_t sampleStartNanos = startStatOpTime();
switch (type) {
case BEFORE_REGION_DESTROY: {
eventStr = "beforeRegionDestroy";
bCacheWriterReturn = m_writer->beforeRegionDestroy(event);
break;
}
case BEFORE_REGION_CLEAR: {
eventStr = "beforeRegionClear";
bCacheWriterReturn = m_writer->beforeRegionClear(event);
break;
}
default: {
updateStats = false;
break;
}
}
if (updateStats) {
updateStatOpTime(m_regionStats->getStat(),
m_regionStats->getWriterCallTimeId(),
sampleStartNanos);
m_regionStats->incWriterCallsCompleted();
}
} catch (const Exception& ex) {
LOGERROR(std::string("Exception in CacheWriter::") + eventStr + ": " +
ex.getName() + ": " + ex.getMessage());
bCacheWriterReturn = false;
} catch (...) {
LOGERROR("Unknown exception in CacheWriter::%s", eventStr);
bCacheWriterReturn = false;
}
}
return bCacheWriterReturn;
}
GfErrType LocalRegion::invokeCacheListenerForEntryEvent(
const std::shared_ptr<CacheableKey>& key,
std::shared_ptr<Cacheable>& oldValue,
const std::shared_ptr<Cacheable>& newValue,
const std::shared_ptr<Serializable>& aCallbackArgument,
CacheEventFlags eventFlags, EntryEventType type, bool isLocal) {
GfErrType err = GF_NOERR;
// Check if we have a local cache listener. If so, invoke and return.
if (m_listener != nullptr) {
if (oldValue != nullptr && CacheableToken::isInvalid(oldValue)) {
oldValue = nullptr;
}
EntryEvent event(shared_from_this(), key, oldValue, newValue,
aCallbackArgument, eventFlags.isNotification());
const char* eventStr = "unknown";
try {
bool updateStats = true;
/*Update the CacheWriter Stats*/
int64_t sampleStartNanos = startStatOpTime();
switch (type) {
case AFTER_UPDATE: {
// when CREATE is received from server for notification
// then force an afterUpdate even if key is not present in cache.
if (oldValue != nullptr || eventFlags.isNotificationUpdate() ||
isLocal) {
eventStr = "afterUpdate";
m_listener->afterUpdate(event);
break;
}
// if oldValue is nullptr then fall to AFTER_CREATE case
}
case AFTER_CREATE: {
eventStr = "afterCreate";
m_listener->afterCreate(event);
break;
}
case AFTER_DESTROY: {
eventStr = "afterDestroy";
m_listener->afterDestroy(event);
break;
}
case AFTER_INVALIDATE: {
eventStr = "afterInvalidate";
m_listener->afterInvalidate(event);
break;
}
default: {
updateStats = false;
break;
}
}
if (updateStats) {
m_cacheImpl->getCachePerfStats().incListenerCalls();
updateStatOpTime(m_regionStats->getStat(),
m_regionStats->getListenerCallTimeId(),
sampleStartNanos);
m_regionStats->incListenerCallsCompleted();
}
} catch (const Exception& ex) {
LOGERROR("Exception in CacheListener for key[%s]::%s: %s: %s",
Utils::nullSafeToString(key).c_str(), eventStr,
ex.getName().c_str(), ex.what());
err = GF_CACHE_LISTENER_EXCEPTION;
} catch (...) {
LOGERROR("Unknown exception in CacheListener for key[%s]::%s",
Utils::nullSafeToString(key).c_str(), eventStr);
err = GF_CACHE_LISTENER_EXCEPTION;
}
}
return err;
}
GfErrType LocalRegion::invokeCacheListenerForRegionEvent(
const std::shared_ptr<Serializable>& aCallbackArgument,
CacheEventFlags eventFlags, RegionEventType type) {
GfErrType err = GF_NOERR;
// Check if we have a local cache listener. If so, invoke and return.
if (m_listener != nullptr) {
RegionEvent event(shared_from_this(), aCallbackArgument,
eventFlags.isNotification());
const char* eventStr = "unknown";
try {
bool updateStats = true;
/*Update the CacheWriter Stats*/
int64_t sampleStartNanos = Utils::startStatOpTime();
switch (type) {
case AFTER_REGION_DESTROY: {
eventStr = "afterRegionDestroy";
m_listener->afterRegionDestroy(event);
m_cacheImpl->getCachePerfStats().incListenerCalls();
if (eventFlags.isCacheClose()) {
eventStr = "close";
m_listener->close(*this);
m_cacheImpl->getCachePerfStats().incListenerCalls();
}
break;
}
case AFTER_REGION_INVALIDATE: {
eventStr = "afterRegionInvalidate";
m_listener->afterRegionInvalidate(event);
m_cacheImpl->getCachePerfStats().incListenerCalls();
break;
}
case AFTER_REGION_CLEAR: {
eventStr = "afterRegionClear";
m_listener->afterRegionClear(event);
break;
}
default: {
updateStats = false;
break;
}
}
if (updateStats) {
updateStatOpTime(m_regionStats->getStat(),
m_regionStats->getListenerCallTimeId(),
sampleStartNanos);
m_regionStats->incListenerCallsCompleted();
}
} catch (const Exception& ex) {
LOGERROR("Exception in CacheListener::%s: %s: %s", eventStr,
ex.getName().c_str(), ex.what());
err = GF_CACHE_LISTENER_EXCEPTION;
} catch (...) {
LOGERROR("Unknown exception in CacheListener::%s", eventStr);
err = GF_CACHE_LISTENER_EXCEPTION;
}
}
return err;
}
// TODO: pass current time instead of evaluating it twice, here
// and in region
void LocalRegion::updateAccessAndModifiedTimeForEntry(
std::shared_ptr<MapEntryImpl>& ptr, bool modified) {
// locking is not required since setters use atomic operations
if (ptr != nullptr && entryExpiryEnabled()) {
ExpEntryProperties& expProps = ptr->getExpProperties();
auto currTime = std::chrono::system_clock::now();
std::string keyStr;
if (Log::debugEnabled()) {
std::shared_ptr<CacheableKey> key;
ptr->getKeyI(key);
keyStr = Utils::nullSafeToString(key);
}
LOGDEBUG("Setting last accessed time for key [%s] in region %s to %d",
keyStr.c_str(), getFullPath().c_str(),
currTime.time_since_epoch().count());
expProps.updateLastAccessTime(currTime);
if (modified) {
LOGDEBUG("Setting last modified time for key [%s] in region %s to %d",
keyStr.c_str(), getFullPath().c_str(),
currTime.time_since_epoch().count());
expProps.updateLastModifiedTime(currTime);
}
}
}
uint32_t LocalRegion::adjustLruEntriesLimit(uint32_t limit) {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::adjustLruEntriesLimit);
auto attrs = m_regionAttributes;
if (!attrs.getCachingEnabled()) return 0;
bool hadlru = (attrs.getLruEntriesLimit() != 0);
bool needslru = (limit != 0);
if (hadlru != needslru) {
throw IllegalStateException(
"Cannot disable or enable LRU, can only adjust limit.");
}
uint32_t oldValue = attrs.getLruEntriesLimit();
setLruEntriesLimit(limit);
if (needslru) {
// checked in AttributesMutator already to assert that LRU was enabled..
LRUEntriesMap* lrumap = static_cast<LRUEntriesMap*>(m_entries);
lrumap->adjustLimit(limit);
}
return oldValue;
}
ExpirationAction LocalRegion::adjustRegionExpiryAction(
ExpirationAction action) {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::adjustRegionExpiryAction);
auto attrs = m_regionAttributes;
bool hadExpiry = (getRegionExpiryDuration() > std::chrono::seconds::zero());
if (!hadExpiry) {
throw IllegalStateException(
"Cannot change region ExpirationAction for region created without "
"region expiry.");
}
ExpirationAction oldValue = getRegionExpiryAction();
setRegionTimeToLiveExpirationAction(action);
setRegionIdleTimeoutExpirationAction(action);
// m_regionExpirationAction = action;
return oldValue;
}
ExpirationAction LocalRegion::adjustEntryExpiryAction(ExpirationAction action) {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::adjustEntryExpiryAction);
auto attrs = m_regionAttributes;
bool hadExpiry = (getEntryExpiryDuration() > std::chrono::seconds::zero());
if (!hadExpiry) {
throw IllegalStateException(
"Cannot change entry ExpirationAction for region created without "
"entry "
"expiry.");
}
ExpirationAction oldValue = getEntryExpirationAction();
setEntryTimeToLiveExpirationAction(action);
setEntryIdleTimeoutExpirationAction(action);
return oldValue;
}
std::chrono::seconds LocalRegion::adjustRegionExpiryDuration(
const std::chrono::seconds& duration) {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::adjustRegionExpiryDuration);
bool hadExpiry = (getEntryExpiryDuration() > std::chrono::seconds::zero());
if (!hadExpiry) {
throw IllegalStateException(
"Cannot change region expiration duration for region created "
"without "
"region expiry.");
}
const auto& oldValue = getRegionExpiryDuration();
setRegionTimeToLive(duration);
setRegionIdleTimeout(duration);
return oldValue;
}
std::chrono::seconds LocalRegion::adjustEntryExpiryDuration(
const std::chrono::seconds& duration) {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::adjustEntryExpiryDuration);
bool hadExpiry = (getEntryExpiryDuration() > std::chrono::seconds::zero());
if (!hadExpiry) {
throw IllegalStateException(
"Cannot change entry expiration duration for region created without "
"entry expiry.");
}
auto oldValue = getEntryExpiryDuration();
setEntryTimeToLive(duration);
setEntryIdleTimeout(duration);
return oldValue;
}
/** they used to public methods in hpp file */
bool LocalRegion::isStatisticsEnabled() {
if (m_cacheImpl == nullptr) {
return false;
}
return m_cacheImpl->getDistributedSystem()
.getSystemProperties()
.statisticsEnabled();
}
bool LocalRegion::useModifiedTimeForRegionExpiry() {
const auto& region_ttl = m_regionAttributes.getRegionTimeToLive();
if (region_ttl > std::chrono::seconds::zero()) {
return true;
} else {
return false;
}
}
bool LocalRegion::useModifiedTimeForEntryExpiry() {
if (m_regionAttributes.getEntryTimeToLive() > std::chrono::seconds::zero()) {
return true;
} else {
return false;
}
}
bool LocalRegion::isEntryIdletimeEnabled() {
if (m_regionAttributes.getCachingEnabled() &&
m_regionAttributes.getEntryIdleTimeout() > std::chrono::seconds::zero()) {
return true;
} else {
return false;
}
}
ExpirationAction LocalRegion::getEntryExpirationAction() const {
if (m_regionAttributes.getEntryTimeToLive() > std::chrono::seconds::zero()) {
return m_regionAttributes.getEntryTimeToLiveAction();
} else {
return m_regionAttributes.getEntryIdleTimeoutAction();
}
}
ExpirationAction LocalRegion::getRegionExpiryAction() const {
const auto& region_ttl = m_regionAttributes.getRegionTimeToLive();
if (region_ttl > std::chrono::seconds::zero()) {
return m_regionAttributes.getRegionTimeToLiveAction();
} else {
return m_regionAttributes.getRegionIdleTimeoutAction();
}
}
std::chrono::seconds LocalRegion::getRegionExpiryDuration() const {
const auto& region_ttl = m_regionAttributes.getRegionTimeToLive();
const auto& region_idle = m_regionAttributes.getRegionIdleTimeout();
if (region_ttl > std::chrono::seconds::zero()) {
return region_ttl;
} else {
return region_idle;
}
}
std::chrono::seconds LocalRegion::getEntryExpiryDuration() const {
const auto& entry_ttl = m_regionAttributes.getEntryTimeToLive();
const auto& entry_idle = m_regionAttributes.getEntryIdleTimeout();
if (entry_ttl > std::chrono::seconds::zero()) {
return entry_ttl;
} else {
return entry_idle;
}
}
/** methods to be overridden by derived classes*/
GfErrType LocalRegion::unregisterKeysBeforeDestroyRegion() { return GF_NOERR; }
GfErrType LocalRegion::getNoThrow_remote(const std::shared_ptr<CacheableKey>&,
std::shared_ptr<Cacheable>&,
const std::shared_ptr<Serializable>&,
std::shared_ptr<VersionTag>&) {
return GF_NOERR;
}
GfErrType LocalRegion::putNoThrow_remote(const std::shared_ptr<CacheableKey>&,
const std::shared_ptr<Cacheable>&,
const std::shared_ptr<Serializable>&,
std::shared_ptr<VersionTag>&, bool) {
return GF_NOERR;
}
GfErrType LocalRegion::putAllNoThrow_remote(
const HashMapOfCacheable&,
std::shared_ptr<VersionedCacheableObjectPartList>&,
std::chrono::milliseconds, const std::shared_ptr<Serializable>&) {
return GF_NOERR;
}
GfErrType LocalRegion::removeAllNoThrow_remote(
const std::vector<std::shared_ptr<CacheableKey>>&,
std::shared_ptr<VersionedCacheableObjectPartList>&,
const std::shared_ptr<Serializable>&) {
return GF_NOERR;
}
GfErrType LocalRegion::createNoThrow_remote(
const std::shared_ptr<CacheableKey>&, const std::shared_ptr<Cacheable>&,
const std::shared_ptr<Serializable>&, std::shared_ptr<VersionTag>&) {
return GF_NOERR;
}
GfErrType LocalRegion::destroyNoThrow_remote(
const std::shared_ptr<CacheableKey>&, const std::shared_ptr<Serializable>&,
std::shared_ptr<VersionTag>&) {
return GF_NOERR;
}
GfErrType LocalRegion::removeNoThrow_remote(
const std::shared_ptr<CacheableKey>&, const std::shared_ptr<Cacheable>&,
const std::shared_ptr<Serializable>&, std::shared_ptr<VersionTag>&) {
return GF_NOERR;
}
GfErrType LocalRegion::removeNoThrowEX_remote(
const std::shared_ptr<CacheableKey>&, const std::shared_ptr<Serializable>&,
std::shared_ptr<VersionTag>&) {
return GF_NOERR;
}
GfErrType LocalRegion::invalidateNoThrow_remote(
const std::shared_ptr<CacheableKey>&, const std::shared_ptr<Serializable>&,
std::shared_ptr<VersionTag>&) {
return GF_NOERR;
}
GfErrType LocalRegion::getAllNoThrow_remote(
const std::vector<std::shared_ptr<CacheableKey>>*,
const std::shared_ptr<HashMapOfCacheable>&,
const std::shared_ptr<HashMapOfException>&,
const std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>&, bool,
const std::shared_ptr<Serializable>&) {
return GF_NOERR;
}
GfErrType LocalRegion::invalidateRegionNoThrow_remote(
const std::shared_ptr<Serializable>&) {
return GF_NOERR;
}
GfErrType LocalRegion::destroyRegionNoThrow_remote(
const std::shared_ptr<Serializable>&) {
return GF_NOERR;
}
void LocalRegion::adjustCacheListener(
const std::shared_ptr<CacheListener>& aListener) {
WriteGuard guard(m_rwLock);
setCacheListener(aListener);
m_listener = aListener;
}
void LocalRegion::adjustCacheListener(const std::string& lib,
const std::string& func) {
WriteGuard guard(m_rwLock);
setCacheListener(lib, func);
m_listener = m_regionAttributes.getCacheListener();
}
void LocalRegion::adjustCacheLoader(
const std::shared_ptr<CacheLoader>& aLoader) {
WriteGuard guard(m_rwLock);
setCacheLoader(aLoader);
m_loader = aLoader;
}
void LocalRegion::adjustCacheLoader(const std::string& lib,
const std::string& func) {
WriteGuard guard(m_rwLock);
setCacheLoader(lib, func);
m_loader = m_regionAttributes.getCacheLoader();
}
void LocalRegion::adjustCacheWriter(
const std::shared_ptr<CacheWriter>& aWriter) {
WriteGuard guard(m_rwLock);
setCacheWriter(aWriter);
m_writer = aWriter;
}
void LocalRegion::adjustCacheWriter(const std::string& lib,
const std::string& func) {
WriteGuard guard(m_rwLock);
setCacheWriter(lib, func);
m_writer = m_regionAttributes.getCacheWriter();
}
void LocalRegion::evict(int32_t percentage) {
TryReadGuard guard(m_rwLock, m_destroyPending);
if (m_released || m_destroyPending) return;
if (m_entries != nullptr) {
int32_t size = m_entries->size();
int32_t entriesToEvict = (percentage * size) / 100;
// only invoked from EvictionController so static_cast is always safe
LRUEntriesMap* lruMap = static_cast<LRUEntriesMap*>(m_entries);
LOGINFO("Evicting %d entries. Current entry count is %d", entriesToEvict,
size);
lruMap->processLRU(entriesToEvict);
}
}
void LocalRegion::invokeAfterAllEndPointDisconnected() {
if (m_listener != nullptr) {
int64_t sampleStartNanos = startStatOpTime();
try {
m_listener->afterRegionDisconnected(*this);
} catch (const Exception& ex) {
LOGERROR("Exception in CacheListener::afterRegionDisconnected: %s: %s",
ex.getName().c_str(), ex.what());
} catch (...) {
LOGERROR("Unknown exception in CacheListener::afterRegionDisconnected");
}
updateStatOpTime(m_regionStats->getStat(),
m_regionStats->getListenerCallTimeId(), sampleStartNanos);
m_regionStats->incListenerCallsCompleted();
}
}
GfErrType LocalRegion::getNoThrow_FullObject(std::shared_ptr<EventId>,
std::shared_ptr<Cacheable>&,
std::shared_ptr<VersionTag>&) {
return GF_NOERR;
}
std::shared_ptr<Cacheable> LocalRegion::handleReplay(
GfErrType& err, std::shared_ptr<Cacheable> value) const {
if (err == GF_TRANSACTION_DATA_REBALANCED_EXCEPTION ||
err == GF_TRANSACTION_DATA_NODE_HAS_DEPARTED_EXCEPTION) {
bool isRollBack = (err == GF_TRANSACTION_DATA_REBALANCED_EXCEPTION);
auto txState = getTXState();
if (!txState) {
GfErrTypeThrowException("TXState is nullptr",
GF_CACHE_ILLEGAL_STATE_EXCEPTION);
throw ""; // never reached
}
auto ret = txState->replay(isRollBack);
err = GF_NOERR;
return ret;
}
return value;
}
std::shared_ptr<TombstoneList> LocalRegion::getTombstoneList() {
return m_tombstoneList;
}
int64_t LocalRegion::startStatOpTime() {
return m_enableTimeStatistics ? Utils::startStatOpTime() : 0;
}
void LocalRegion::updateStatOpTime(Statistics* statistics, int32_t statId,
int64_t start) {
if (m_enableTimeStatistics) {
Utils::updateStatOpTime(statistics, statId, start);
}
}
void LocalRegion::acquireGlobals(bool) {}
void LocalRegion::releaseGlobals(bool) {}
} // namespace client
} // namespace geode
} // namespace apache