blob: 57b582c73b805a3e689e4bbc02458658febb3028 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "LocalRegion.hpp"
#include <gfcpp/Log.hpp>
#include <gfcpp/SystemProperties.hpp>
#include "CacheImpl.hpp"
#include "CacheRegionHelper.hpp"
#include "CacheableToken.hpp"
#include "NanoTimer.hpp"
#include "Utils.hpp"
#include "EntryExpiryHandler.hpp"
#include "RegionExpiryHandler.hpp"
#include "ExpiryTaskManager.hpp"
#include "LRUEntriesMap.hpp"
#include "RegionGlobalLocks.hpp"
#include "TXState.hpp"
#include "VersionTag.hpp"
#include <vector>
#include <gfcpp/PoolManager.hpp>
using namespace apache::geode::client;
LocalRegion::LocalRegion(const std::string& name, CacheImpl* cache,
RegionInternal* rPtr,
const RegionAttributesPtr& attributes,
const CacheStatisticsPtr& stats, bool shared)
: RegionInternal(attributes),
m_name(name),
m_parentRegion(rPtr),
m_cacheImpl(cache),
m_destroyPending(false),
m_listener(NULLPTR),
m_writer(NULLPTR),
m_loader(NULLPTR),
m_released(false),
m_entries(NULL),
m_cacheStatistics(stats),
m_transactionEnabled(false),
m_isPRSingleHopEnabled(false),
m_attachedPool(NULLPTR),
m_persistenceManager(NULLPTR) {
if (m_parentRegion != NULLPTR) {
((m_fullPath = m_parentRegion->getFullPath()) += "/") += m_name;
} else {
(m_fullPath = "/") += m_name;
}
// create entries map based on RegionAttributes...
if (attributes->getCachingEnabled()) {
m_entries = EntriesMapFactory::createMap(this, m_regionAttributes);
}
// Initialize callbacks
CacheListenerPtr clptr;
CacheWriterPtr cwptr;
clptr = m_regionAttributes->getCacheListener();
m_listener = clptr;
cwptr = m_regionAttributes->getCacheWriter();
m_writer = cwptr;
CacheLoaderPtr cldptr;
cldptr = m_regionAttributes->getCacheLoader();
m_loader = cldptr;
if (m_parentRegion != NULLPTR) {
((m_fullPath = m_parentRegion->getFullPath()) += "/") += m_name;
} else {
(m_fullPath = "/") += m_name;
}
m_regionStats = new RegionStats(m_fullPath.c_str());
PoolPtr p = PoolManager::find(getAttributes()->getPoolName());
// m_attachedPool = p;
setPool(p);
}
const char* LocalRegion::getName() const { return m_name.c_str(); }
const char* LocalRegion::getFullPath() const { return m_fullPath.c_str(); }
RegionPtr LocalRegion::getParentRegion() const {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::getParentRegion);
return m_parentRegion;
}
void LocalRegion::updateAccessAndModifiedTime(bool modified) {
// locking not required since setters use atomic operations
if (regionExpiryEnabled()) {
time_t currTime = ACE_OS::gettimeofday().sec();
LOGDEBUG("Setting last accessed time for region %s to %d", getFullPath(),
currTime);
m_cacheStatistics->setLastAccessedTime(static_cast<uint32_t>(currTime));
if (modified) {
LOGDEBUG("Setting last modified time for region %s to %d", getFullPath(),
currTime);
m_cacheStatistics->setLastModifiedTime(static_cast<uint32_t>(currTime));
}
// TODO: should we really touch the parent region??
RegionInternal* ri = dynamic_cast<RegionInternal*>(m_parentRegion.ptr());
if (ri != NULL) {
ri->updateAccessAndModifiedTime(modified);
}
}
}
CacheStatisticsPtr LocalRegion::getStatistics() const {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::getStatistics);
bool m_statisticsEnabled = true;
SystemProperties* props =
m_cacheImpl->getCache()->getDistributedSystem()->getSystemProperties();
if (props) {
m_statisticsEnabled = props->statisticsEnabled();
}
if (!m_statisticsEnabled) {
throw StatisticsDisabledException(
"LocalRegion::getStatistics statistics disabled for this region");
}
return m_cacheStatistics;
}
void LocalRegion::invalidateRegion(const UserDataPtr& aCallbackArgument) {
GfErrType err =
invalidateRegionNoThrow(aCallbackArgument, CacheEventFlags::NORMAL);
GfErrTypeToException("Region::invalidateRegion", err);
}
void LocalRegion::localInvalidateRegion(const UserDataPtr& aCallbackArgument) {
GfErrType err =
invalidateRegionNoThrow(aCallbackArgument, CacheEventFlags::LOCAL);
GfErrTypeToException("Region::localInvalidateRegion", err);
}
void LocalRegion::destroyRegion(const UserDataPtr& aCallbackArgument) {
GfErrType err =
destroyRegionNoThrow(aCallbackArgument, true, CacheEventFlags::NORMAL);
GfErrTypeToException("Region::destroyRegion", err);
}
void LocalRegion::localDestroyRegion(const UserDataPtr& aCallbackArgument) {
GfErrType err =
destroyRegionNoThrow(aCallbackArgument, true, CacheEventFlags::LOCAL);
GfErrTypeToException("Region::localDestroyRegion", err);
}
void LocalRegion::tombstoneOperationNoThrow(
const CacheableHashMapPtr& tombstoneVersions,
const CacheableHashSetPtr& tombstoneKeys) {
bool cachingEnabled = m_regionAttributes->getCachingEnabled();
if (!cachingEnabled) return;
if (tombstoneVersions.ptr() != NULL) {
std::map<uint16_t, int64_t> gcVersions;
for (HashMapT<CacheableKeyPtr, CacheablePtr>::Iterator itr =
tombstoneVersions->begin();
itr != tombstoneVersions->end(); ++itr) {
try {
DSMemberForVersionStampPtr member =
dynCast<DSMemberForVersionStampPtr>(itr.first());
uint16_t memberId =
getCacheImpl()->getMemberListForVersionStamp()->add(member);
int64_t version = (dynCast<CacheableInt64Ptr>(itr.second()))->value();
gcVersions[memberId] = version;
} catch (const ClassCastException&) {
LOGERROR(
"tombstone_operation contains incorrect gc versions in the "
"message. Region %s",
getFullPath());
continue;
}
}
m_entries->reapTombstones(gcVersions);
} else {
m_entries->reapTombstones(tombstoneKeys);
}
}
RegionPtr LocalRegion::getSubregion(const char* path) {
if (path == NULL) {
throw IllegalArgumentException("LocalRegion::getSubregion: path is null");
}
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::getSubregion);
std::string pathstr(path);
std::string slash("/");
if ((pathstr == slash) || (pathstr.length() < 1)) {
LOGERROR("Get subregion path [%s] is not valid.", pathstr.c_str());
throw IllegalArgumentException("Get subegion path is null or a /");
}
std::string fullname = pathstr;
if (fullname.substr(0, 1) == slash) {
fullname = pathstr.substr(1);
}
// find second separator
size_t idx = fullname.find('/');
std::string stepname = fullname.substr(0, idx);
RegionPtr region, rptr;
if (0 == m_subRegions.find(stepname, region)) {
if (stepname == fullname) {
// done...
rptr = region;
} else {
std::string remainder = fullname.substr(stepname.length() + 1);
rptr = region->getSubregion(remainder.c_str());
}
}
return rptr;
}
RegionPtr LocalRegion::createSubregion(
const char* subregionName, const RegionAttributesPtr& aRegionAttributes) {
CHECK_DESTROY_PENDING(TryWriteGuard, LocalRegion::createSubregion);
{
std::string namestr = subregionName;
if (namestr.find('/') != std::string::npos) {
throw IllegalArgumentException(
"Malformed name string, contains region path seperator '/'");
}
}
MapOfRegionGuard guard1(m_subRegions.mutex());
RegionPtr region_ptr;
if (0 == m_subRegions.find(subregionName, region_ptr)) {
throw RegionExistsException(
"LocalRegion::createSubregion: named region exists in the region");
}
CacheStatisticsPtr csptr(new CacheStatistics);
RegionInternal* rPtr = m_cacheImpl->createRegion_internal(
subregionName, this, aRegionAttributes, csptr, false);
region_ptr = rPtr;
if (!rPtr) {
throw OutOfMemoryException("createSubregion: failed to create region");
}
// Instantiate a PersistenceManager object if DiskPolicy is overflow
if (aRegionAttributes->getDiskPolicy() == DiskPolicyType::OVERFLOWS) {
PersistenceManagerPtr pmPtr = aRegionAttributes->getPersistenceManager();
if (pmPtr == NULLPTR) {
throw NullPointerException(
"PersistenceManager could not be instantiated");
}
PropertiesPtr props = aRegionAttributes->getPersistenceProperties();
pmPtr->init(RegionPtr(rPtr), props);
rPtr->setPersistenceManager(pmPtr);
}
rPtr->acquireReadLock();
m_subRegions.bind(rPtr->getName(), RegionPtr(rPtr));
// schedule the sub region expiry if regionExpiry enabled.
rPtr->setRegionExpiryTask();
rPtr->releaseReadLock();
return region_ptr;
}
void LocalRegion::subregions(const bool recursive, VectorOfRegion& sr) {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::subregions);
sr.clear();
if (m_subRegions.current_size() == 0) return;
subregions_internal(recursive, sr);
}
RegionEntryPtr LocalRegion::getEntry(const CacheableKeyPtr& key) {
if (getTXState() != NULL) {
GfErrTypeThrowException("GetEntry is not supported in transaction",
GF_NOTSUP);
}
RegionEntryPtr rptr;
CacheablePtr valuePtr;
getEntry(key, valuePtr);
if (valuePtr != NULLPTR) {
rptr = createRegionEntry(key, valuePtr);
}
return rptr;
}
void LocalRegion::getEntry(const CacheableKeyPtr& key, CacheablePtr& valuePtr) {
if (key == NULLPTR) {
throw IllegalArgumentException("LocalRegion::getEntry: null key");
}
MapEntryImplPtr mePtr;
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::getEntry);
if (m_regionAttributes->getCachingEnabled()) {
m_entries->getEntry(key, mePtr, valuePtr);
}
}
CacheablePtr LocalRegion::get(const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument) {
CacheablePtr rptr;
int64 sampleStartNanos = Utils::startStatOpTime();
GfErrType err = getNoThrow(key, rptr, aCallbackArgument);
Utils::updateStatOpTime(m_regionStats->getStat(),
RegionStatType::getInstance()->getGetTimeId(),
sampleStartNanos);
// rptr = handleReplay(err, rptr);
GfErrTypeToException("Region::get", err);
return rptr;
}
void LocalRegion::put(const CacheableKeyPtr& key, const CacheablePtr& value,
const UserDataPtr& aCallbackArgument) {
CacheablePtr oldValue;
int64 sampleStartNanos = Utils::startStatOpTime();
VersionTagPtr versionTag;
GfErrType err = putNoThrow(key, value, aCallbackArgument, oldValue, -1,
CacheEventFlags::NORMAL, versionTag);
Utils::updateStatOpTime(m_regionStats->getStat(),
RegionStatType::getInstance()->getPutTimeId(),
sampleStartNanos);
// handleReplay(err, NULLPTR);
GfErrTypeToException("Region::put", err);
}
void LocalRegion::localPut(const CacheableKeyPtr& key,
const CacheablePtr& value,
const UserDataPtr& aCallbackArgument) {
CacheablePtr oldValue;
VersionTagPtr versionTag;
GfErrType err = putNoThrow(key, value, aCallbackArgument, oldValue, -1,
CacheEventFlags::LOCAL, versionTag);
GfErrTypeToException("Region::localPut", err);
}
void LocalRegion::putAll(const HashMapOfCacheable& map, uint32_t timeout,
const UserDataPtr& aCallbackArgument) {
if ((timeout * 1000) >= 0x7fffffff) {
throw IllegalArgumentException(
"Region::putAll: timeout parameter "
"greater than maximum allowed (2^31/1000 i.e 2147483).");
}
int64 sampleStartNanos = Utils::startStatOpTime();
GfErrType err = putAllNoThrow(map, timeout, aCallbackArgument);
Utils::updateStatOpTime(m_regionStats->getStat(),
RegionStatType::getInstance()->getPutAllTimeId(),
sampleStartNanos);
// handleReplay(err, NULLPTR);
GfErrTypeToException("Region::putAll", err);
}
void LocalRegion::removeAll(const VectorOfCacheableKey& keys,
const UserDataPtr& aCallbackArgument) {
if (keys.size() == 0) {
throw IllegalArgumentException("Region::removeAll: zero keys provided");
}
int64 sampleStartNanos = Utils::startStatOpTime();
GfErrType err = removeAllNoThrow(keys, aCallbackArgument);
Utils::updateStatOpTime(m_regionStats->getStat(),
RegionStatType::getInstance()->getRemoveAllTimeId(),
sampleStartNanos);
GfErrTypeToException("Region::removeAll", err);
}
void LocalRegion::create(const CacheableKeyPtr& key, const CacheablePtr& value,
const UserDataPtr& aCallbackArgument) {
VersionTagPtr versionTag;
GfErrType err = createNoThrow(key, value, aCallbackArgument, -1,
CacheEventFlags::NORMAL, versionTag);
// handleReplay(err, NULLPTR);
GfErrTypeToException("Region::create", err);
}
void LocalRegion::localCreate(const CacheableKeyPtr& key,
const CacheablePtr& value,
const UserDataPtr& aCallbackArgument) {
VersionTagPtr versionTag;
GfErrType err = createNoThrow(key, value, aCallbackArgument, -1,
CacheEventFlags::LOCAL, versionTag);
GfErrTypeToException("Region::localCreate", err);
}
void LocalRegion::invalidate(const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument) {
VersionTagPtr versionTag;
GfErrType err = invalidateNoThrow(key, aCallbackArgument, -1,
CacheEventFlags::NORMAL, versionTag);
// handleReplay(err, NULLPTR);
GfErrTypeToException("Region::invalidate", err);
}
void LocalRegion::localInvalidate(const CacheableKeyPtr& keyPtr,
const UserDataPtr& aCallbackArgument) {
VersionTagPtr versionTag;
GfErrType err = invalidateNoThrow(keyPtr, aCallbackArgument, -1,
CacheEventFlags::LOCAL, versionTag);
GfErrTypeToException("Region::localInvalidate", err);
}
void LocalRegion::destroy(const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument) {
VersionTagPtr versionTag;
GfErrType err = destroyNoThrow(key, aCallbackArgument, -1,
CacheEventFlags::NORMAL, versionTag);
// handleReplay(err, NULLPTR);
GfErrTypeToException("Region::destroy", err);
}
void LocalRegion::localDestroy(const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument) {
VersionTagPtr versionTag;
GfErrType err = destroyNoThrow(key, aCallbackArgument, -1,
CacheEventFlags::LOCAL, versionTag);
GfErrTypeToException("Region::localDestroy", err);
}
bool LocalRegion::remove(const CacheableKeyPtr& key, const CacheablePtr& value,
const UserDataPtr& aCallbackArgument) {
VersionTagPtr versionTag;
GfErrType err = removeNoThrow(key, value, aCallbackArgument, -1,
CacheEventFlags::NORMAL, versionTag);
bool result = false;
if (err == GF_NOERR) {
result = true;
} else if (err != GF_ENOENT && err != GF_CACHE_ENTRY_NOT_FOUND) {
GfErrTypeToException("Region::remove", err);
}
return result;
}
bool LocalRegion::removeEx(const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument) {
VersionTagPtr versionTag;
GfErrType err = removeNoThrowEx(key, aCallbackArgument, -1,
CacheEventFlags::NORMAL, versionTag);
bool result = false;
if (err == GF_NOERR) {
result = true;
} else if (err != GF_ENOENT && err != GF_CACHE_ENTRY_NOT_FOUND) {
GfErrTypeToException("Region::removeEx", err);
}
return result;
}
bool LocalRegion::localRemove(const CacheableKeyPtr& key,
const CacheablePtr& value,
const UserDataPtr& aCallbackArgument) {
VersionTagPtr versionTag;
GfErrType err = removeNoThrow(key, value, aCallbackArgument, -1,
CacheEventFlags::LOCAL, versionTag);
bool result = false;
if (err == GF_NOERR) {
result = true;
} else if (err != GF_ENOENT && err != GF_CACHE_ENTRY_NOT_FOUND) {
GfErrTypeToException("Region::localRemove", err);
}
return result;
}
bool LocalRegion::localRemoveEx(const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument) {
VersionTagPtr versionTag;
GfErrType err = removeNoThrowEx(key, aCallbackArgument, -1,
CacheEventFlags::LOCAL, versionTag);
bool result = false;
if (err == GF_NOERR) {
result = true;
} else if (err != GF_ENOENT && err != GF_CACHE_ENTRY_NOT_FOUND) {
GfErrTypeToException("Region::localRemoveEx", err);
}
return result;
}
void LocalRegion::keys(VectorOfCacheableKey& v) {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::keys);
keys_internal(v);
}
void LocalRegion::serverKeys(VectorOfCacheableKey& v) {
throw UnsupportedOperationException(
"serverKeys is not supported for local regions.");
}
void LocalRegion::values(VectorOfCacheable& vc) {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::values);
if (!m_regionAttributes->getCachingEnabled()) {
return;
}
uint32_t size = m_entries->size();
vc.clear();
if (size == 0) return;
m_entries->values(vc);
// invalidToken should not be added by the MapSegments.
}
void LocalRegion::entries(VectorOfRegionEntry& me, bool recursive) {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::entries);
me.clear();
if (!m_regionAttributes->getCachingEnabled()) {
return;
}
entries_internal(me, recursive);
}
void LocalRegion::getAll(const VectorOfCacheableKey& keys,
HashMapOfCacheablePtr values,
HashMapOfExceptionPtr exceptions, bool addToLocalCache,
const UserDataPtr& aCallbackArgument) {
if (keys.size() == 0) {
throw IllegalArgumentException("Region::getAll: zero keys provided");
}
// check for the combination which will result in no action
if (values == NULLPTR &&
!(addToLocalCache && m_regionAttributes->getCachingEnabled())) {
throw IllegalArgumentException(
"Region::getAll: either output \"values\""
" parameter should be non-null, or \"addToLocalCache\" should be true "
"and caching should be enabled for the region [%s]",
getFullPath());
}
int64 sampleStartNanos = Utils::startStatOpTime();
GfErrType err = getAllNoThrow(keys, values, exceptions, addToLocalCache,
aCallbackArgument);
Utils::updateStatOpTime(m_regionStats->getStat(),
RegionStatType::getInstance()->getGetAllTimeId(),
sampleStartNanos);
// handleReplay(err, NULLPTR);
GfErrTypeToException("Region::getAll", err);
}
uint32 LocalRegion::size_remote() {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::size);
if (m_regionAttributes->getCachingEnabled()) {
return m_entries->size();
}
return 0;
}
uint32_t LocalRegion::size() {
TXState* txState = getTXState();
if (txState != NULL) {
if (isLocalOp()) {
return GF_NOTSUP;
}
return size_remote();
}
return LocalRegion::size_remote();
}
RegionServicePtr LocalRegion::getRegionService() const {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::getRegionService);
return RegionServicePtr(m_cacheImpl->getCache());
}
CacheImpl* LocalRegion::getCacheImpl() {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::getCache);
return m_cacheImpl;
}
bool LocalRegion::containsValueForKey_remote(
const CacheableKeyPtr& keyPtr) const {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::containsValueForKey);
if (!m_regionAttributes->getCachingEnabled()) {
return false;
}
CacheablePtr valuePtr;
MapEntryImplPtr mePtr;
m_entries->getEntry(keyPtr, mePtr, valuePtr);
if (mePtr == NULLPTR) {
return false;
}
return (valuePtr != NULLPTR && !CacheableToken::isInvalid(valuePtr));
}
bool LocalRegion::containsValueForKey(const CacheableKeyPtr& keyPtr) const {
if (keyPtr == NULLPTR) {
throw IllegalArgumentException(
"LocalRegion::containsValueForKey: "
"key is null");
}
TXState* txState = getTXState();
if (txState == NULL) {
return LocalRegion::containsValueForKey_remote(keyPtr);
}
return containsValueForKey_remote(keyPtr);
}
bool LocalRegion::containsKeyOnServer(const CacheableKeyPtr& keyPtr) const {
throw UnsupportedOperationException(
"LocalRegion::containsKeyOnServer: is not supported.");
}
void LocalRegion::getInterestList(VectorOfCacheableKey& vlist) const {
throw UnsupportedOperationException(
"LocalRegion::getInterestList: is not supported.");
}
void LocalRegion::getInterestListRegex(VectorOfCacheableString& vregex) const {
throw UnsupportedOperationException(
"LocalRegion::getInterestListRegex: is not supported.");
}
bool LocalRegion::containsKey(const CacheableKeyPtr& keyPtr) const {
if (keyPtr == NULLPTR) {
throw IllegalArgumentException(
"LocalRegion::containsKey: "
"key is null");
}
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::containsKey);
return containsKey_internal(keyPtr);
}
void LocalRegion::setPersistenceManager(PersistenceManagerPtr& pmPtr) {
m_persistenceManager = pmPtr;
// set the memberVariable of LRUEntriesMap too.
LRUEntriesMap* lruMap = dynamic_cast<LRUEntriesMap*>(m_entries);
if (lruMap != NULL) {
lruMap->setPersistenceManager(pmPtr);
}
}
void LocalRegion::setRegionExpiryTask() {
if (regionExpiryEnabled()) {
RegionInternalPtr rptr(this);
uint32_t duration = getRegionExpiryDuration();
RegionExpiryHandler* handler =
new RegionExpiryHandler(rptr, getRegionExpiryAction(), duration);
long expiryTaskId =
CacheImpl::expiryTaskManager->scheduleExpiryTask(handler, duration, 0);
handler->setExpiryTaskId(expiryTaskId);
LOGFINE(
"expiry for region [%s], expiry task id = %d, duration = %d, "
"action = %d",
m_fullPath.c_str(), expiryTaskId, duration, getRegionExpiryAction());
}
}
void LocalRegion::registerEntryExpiryTask(MapEntryImplPtr& entry) {
// locking is not required here since only the thread that creates
// the entry will register the expiry task for that entry
ExpEntryProperties& expProps = entry->getExpProperties();
expProps.initStartTime();
RegionInternalPtr rptr(this);
uint32_t duration = getEntryExpiryDuration();
EntryExpiryHandler* handler =
new EntryExpiryHandler(rptr, entry, getEntryExpirationAction(), duration);
long id =
CacheImpl::expiryTaskManager->scheduleExpiryTask(handler, duration, 0);
if (Log::finestEnabled()) {
CacheableKeyPtr key;
entry->getKeyI(key);
LOGFINEST(
"entry expiry in region [%s], key [%s], task id = %d, "
"duration = %d, action = %d",
m_fullPath.c_str(), Utils::getCacheableKeyString(key)->asChar(), id,
duration, getEntryExpirationAction());
}
expProps.setExpiryTaskId(id);
}
LocalRegion::~LocalRegion() {
TryWriteGuard guard(m_rwLock, m_destroyPending);
if (!m_destroyPending) {
release(false);
}
m_listener = NULLPTR;
m_writer = NULLPTR;
m_loader = NULLPTR;
GF_SAFE_DELETE(m_entries);
GF_SAFE_DELETE(m_regionStats);
}
/**
* Release the region resources if not released already.
*/
void LocalRegion::release(bool invokeCallbacks) {
if (m_released) {
return;
}
LOGFINE("LocalRegion::release entered for region %s", m_fullPath.c_str());
m_released = true;
if (m_regionStats != NULL) {
m_regionStats->close();
}
if (invokeCallbacks) {
try {
if (m_loader != NULLPTR) {
m_loader->close(RegionPtr(this));
}
if (m_writer != NULLPTR) {
m_writer->close(RegionPtr(this));
}
// TODO: shouldn't listener also be here instead of
// during CacheImpl.close()
} catch (...) {
LOGWARN(
"Region close caught unknown exception in loader/writer "
"close; continuing");
}
}
if (m_persistenceManager != NULLPTR) {
m_persistenceManager->close();
m_persistenceManager = NULLPTR;
}
if (m_entries != NULL && m_regionAttributes->getCachingEnabled()) {
m_entries->close();
}
LOGFINE("LocalRegion::release done for region %s", m_fullPath.c_str());
}
/** Returns whether the specified key currently exists in this region.
* This method is equivalent to <code>getEntry(key) != null</code>.
*
* @param keyPtr the key to check for an existing entry, type is CacheableString
*&
* @return true if there is an entry in this region for the specified key
*@throw RegionDestroyedException, if region is destroyed.
*@throw IllegalArgumentException, if the key is 'null'.
*@throw NotConnectedException, if not connected to geode system.
*/
bool LocalRegion::containsKey_internal(const CacheableKeyPtr& keyPtr) const {
if (keyPtr == NULLPTR) {
throw IllegalArgumentException("Region::containsKey: key is null");
}
if (!m_regionAttributes->getCachingEnabled()) {
return false;
}
return m_entries->containsKey(keyPtr);
}
void LocalRegion::subregions_internal(const bool recursive,
VectorOfRegion& sr) {
MapOfRegionGuard guard(m_subRegions.mutex());
if (m_subRegions.current_size() == 0) return;
VectorOfRegion subRegions;
for (MapOfRegionWithLock::iterator p = m_subRegions.begin();
p != m_subRegions.end(); ++p) {
sr.push_back((*p).int_id_);
// seperate list so children can be descended.
if (recursive) {
subRegions.push_back((*p).int_id_);
}
}
if (recursive == true) {
// decend...
for (int32_t i = 0; i < subRegions.size(); i++) {
dynamic_cast<LocalRegion*>(subRegions.at(i).ptr())
->subregions_internal(true, sr);
}
}
}
GfErrType LocalRegion::getNoThrow(const CacheableKeyPtr& keyPtr,
CacheablePtr& value,
const UserDataPtr& aCallbackArgument) {
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
if (keyPtr == NULLPTR) {
return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION;
}
TXState* txState = getTXState();
if (txState != NULL) {
if (isLocalOp()) {
return GF_NOTSUP;
}
VersionTagPtr versionTag;
err = getNoThrow_remote(keyPtr, value, aCallbackArgument, versionTag);
if (err == GF_NOERR) {
txState->setDirty();
}
if (CacheableToken::isInvalid(value) ||
CacheableToken::isTombstone(value)) {
value = NULLPTR;
}
return err;
}
m_regionStats->incGets();
m_cacheImpl->m_cacheStats->incGets();
// TODO: CacheableToken::isInvalid should be completely hidden
// inside MapSegment; this should be done both for the value obtained
// from local cache as well as oldValue in every instance
MapEntryImplPtr me;
int updateCount = -1;
bool isLoaderInvoked = false;
bool isLocal = false;
bool cachingEnabled = m_regionAttributes->getCachingEnabled();
CacheablePtr localValue = NULLPTR;
if (cachingEnabled) {
isLocal = m_entries->get(keyPtr, value, me);
if (isLocal && (value != NULLPTR && !CacheableToken::isInvalid(value))) {
m_regionStats->incHits();
m_cacheImpl->m_cacheStats->incHits();
updateAccessAndModifiedTimeForEntry(me, false);
updateAccessAndModifiedTime(false);
return err; // found it in local cache...
}
localValue = value;
value = NULLPTR;
// start tracking the entry
if (!m_regionAttributes->getConcurrencyChecksEnabled()) {
updateCount =
m_entries->addTrackerForEntry(keyPtr, value, true, false, false);
LOGDEBUG(
"Region::get: added tracking with update counter [%d] for key "
"[%s] with value [%s]",
updateCount, Utils::getCacheableKeyString(keyPtr)->asChar(),
Utils::getCacheableString(value)->asChar());
}
}
// remove tracking for the entry before exiting the function
struct RemoveTracking {
private:
const CacheableKeyPtr& m_key;
const int& m_updateCount;
LocalRegion& m_region;
public:
RemoveTracking(const CacheableKeyPtr& key, const int& updateCount,
LocalRegion& region)
: m_key(key), m_updateCount(updateCount), m_region(region) {}
~RemoveTracking() {
if (m_updateCount >= 0 &&
!m_region.getAttributes()->getConcurrencyChecksEnabled()) {
m_region.m_entries->removeTrackerForEntry(m_key);
}
}
} _removeTracking(keyPtr, updateCount, *this);
// The control will come here only when caching is disabled or/and
// the entry was not found. In this case atleast update the region
// access times.
updateAccessAndModifiedTime(false);
m_regionStats->incMisses();
m_cacheImpl->m_cacheStats->incMisses();
VersionTagPtr versionTag;
// Get from some remote source (e.g. external java server) if required.
err = getNoThrow_remote(keyPtr, value, aCallbackArgument, versionTag);
// Its a cache missor it is invalid token then Check if we have a local
// loader.
if ((value == NULLPTR || CacheableToken::isInvalid(value) ||
CacheableToken::isTombstone(value)) &&
m_loader != NULLPTR) {
try {
isLoaderInvoked = true;
/*Update the statistics*/
int64 sampleStartNanos = Utils::startStatOpTime();
value = m_loader->load(RegionPtr(this), keyPtr, aCallbackArgument);
Utils::updateStatOpTime(
m_regionStats->getStat(),
RegionStatType::getInstance()->getLoaderCallTimeId(),
sampleStartNanos);
m_regionStats->incLoaderCallsCompleted();
} catch (const Exception& ex) {
LOGERROR("Error in CacheLoader::load: %s: %s", ex.getName(),
ex.getMessage());
err = GF_CACHE_LOADER_EXCEPTION;
} catch (...) {
LOGERROR("Error in CacheLoader::load, unknown");
err = GF_CACHE_LOADER_EXCEPTION;
}
if (err != GF_NOERR) {
return err;
}
}
CacheablePtr oldValue;
// Found it somehow, so store it.
if (value != NULLPTR /*&& value != CacheableToken::invalid( )*/ &&
cachingEnabled &&
!(CacheableToken::isTombstone(value) &&
(localValue == NULLPTR || CacheableToken::isInvalid(localValue)))) {
// try to create the entry and if that returns an existing value
// (e.g. from another thread or notification) then return that
LOGDEBUG(
"Region::get: creating entry with tracking update counter [%d] for key "
"[%s]",
updateCount, Utils::getCacheableKeyString(keyPtr)->asChar());
if ((err = putLocal("Region::get", false, keyPtr, value, oldValue,
cachingEnabled, updateCount, 0, versionTag)) !=
GF_NOERR) {
if (err == GF_CACHE_CONCURRENT_MODIFICATION_EXCEPTION) {
LOGDEBUG(
"Region::get: putLocal for key [%s] failed because the cache already contains \
an entry with higher version.",
Utils::getCacheableKeyString(keyPtr)->asChar());
if (CacheableToken::isInvalid(value) ||
CacheableToken::isTombstone(value)) {
value = NULLPTR;
}
// don't do anything and exit
return GF_NOERR;
}
LOGDEBUG("Region::get: putLocal for key [%s] failed with error %d",
Utils::getCacheableKeyString(keyPtr)->asChar(), err);
err = GF_NOERR;
if (oldValue != NULLPTR && !CacheableToken::isInvalid(oldValue)) {
LOGDEBUG("Region::get: returning updated value [%s] for key [%s]",
Utils::getCacheableString(oldValue)->asChar(),
Utils::getCacheableKeyString(keyPtr)->asChar());
value = oldValue;
}
}
// signal no explicit removal of tracking to the RemoveTracking object
updateCount = -1;
}
if (CacheableToken::isInvalid(value) || CacheableToken::isTombstone(value)) {
value = NULLPTR;
}
// invokeCacheListenerForEntryEvent method has the check that if oldValue
// is a CacheableToken then it sets it to NULL; also determines if it
// should be AFTER_UPDATE or AFTER_CREATE depending on oldValue, so don't
// check here.
if (isLoaderInvoked == false && err == GF_NOERR && value != NULLPTR) {
err = invokeCacheListenerForEntryEvent(
keyPtr, oldValue, value, aCallbackArgument, CacheEventFlags::NORMAL,
AFTER_UPDATE, isLocal);
}
return err;
}
GfErrType LocalRegion::getAllNoThrow(const VectorOfCacheableKey& keys,
const HashMapOfCacheablePtr& values,
const HashMapOfExceptionPtr& exceptions,
bool addToLocalCache,
const UserDataPtr& aCallbackArgument) {
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
CacheablePtr value;
TXState* txState = getTXState();
if (txState != NULL) {
if (isLocalOp()) {
return GF_NOTSUP;
}
// if(!txState->isReplay())
// {
// VectorOfCacheablePtr args(new VectorOfCacheable());
// args->push_back(VectorOfCacheableKeyPtr(new
// VectorOfCacheableKey(keys)));
// args->push_back(values);
// args->push_back(exceptions);
// args->push_back(CacheableBoolean::create(addToLocalCache));
// txState->recordTXOperation(GF_GET_ALL, getFullPath(),
// NULLPTR,
// args);
// }
err = getAllNoThrow_remote(&keys, values, exceptions, NULLPTR, false,
aCallbackArgument);
if (err == GF_NOERR) {
txState->setDirty();
}
return err;
}
// keys not in cache with their tracking numbers to be gotten using
// a remote call
VectorOfCacheableKey serverKeys;
bool cachingEnabled = m_regionAttributes->getCachingEnabled();
bool regionAccessed = false;
for (int32_t index = 0; index < keys.size(); ++index) {
const CacheableKeyPtr& key = keys[index];
MapEntryImplPtr me;
value = NULLPTR;
m_regionStats->incGets();
m_cacheImpl->m_cacheStats->incGets();
if (values != NULLPTR && cachingEnabled) {
if (m_entries->get(key, value, me) && value != NULLPTR &&
!CacheableToken::isInvalid(value)) {
m_regionStats->incHits();
m_cacheImpl->m_cacheStats->incHits();
updateAccessAndModifiedTimeForEntry(me, false);
regionAccessed = true;
values->insert(key, value);
} else {
value = NULLPTR;
}
}
if (value == NULLPTR) {
// Add to missed keys list.
serverKeys.push_back(key);
m_regionStats->incMisses();
m_cacheImpl->m_cacheStats->incMisses();
}
// TODO: No support for loaders in getAll for now.
}
if (regionAccessed) {
updateAccessAndModifiedTime(false);
}
if (serverKeys.size() > 0) {
err = getAllNoThrow_remote(&serverKeys, values, exceptions, NULLPTR,
addToLocalCache, aCallbackArgument);
}
m_regionStats->incGetAll();
return err;
}
namespace apache {
namespace geode {
namespace client {
// encapsulates actions that need to be taken for a put() operation
class PutActions {
public:
static const EntryEventType s_beforeEventType = BEFORE_UPDATE;
static const EntryEventType s_afterEventType = AFTER_UPDATE;
static const bool s_addIfAbsent = true;
static const bool s_failIfPresent = false;
TXState* m_txState;
inline explicit PutActions(LocalRegion& region) : m_region(region) {
m_txState = TSSTXStateWrapper::s_geodeTSSTXState->getTXState();
}
inline static const char* name() { return "Region::put"; }
inline static GfErrType checkArgs(const CacheableKeyPtr& key,
const CacheablePtr& value,
DataInput* delta = NULL) {
if (key == NULLPTR || (value == NULLPTR && delta == NULL)) {
return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION;
}
return GF_NOERR;
}
inline void getCallbackOldValue(bool cachingEnabled,
const CacheableKeyPtr& key,
MapEntryImplPtr& entry,
CacheablePtr& oldValue) const {
if (cachingEnabled) {
m_region.m_entries->getEntry(key, entry, oldValue);
}
}
inline static void logCacheWriterFailure(const CacheableKeyPtr& key,
const CacheablePtr& oldValue) {
bool isUpdate = (oldValue != NULLPTR);
LOGFINER("Cache writer vetoed %s for key %s",
(isUpdate ? "update" : "create"),
Utils::getCacheableKeyString(key)->asChar());
}
inline GfErrType remoteUpdate(const CacheableKeyPtr& key,
const CacheablePtr& value,
const UserDataPtr& aCallbackArgument,
VersionTagPtr& versionTag) {
// if(m_txState != NULL && !m_txState->isReplay())
// {
// VectorOfCacheablePtr args(new VectorOfCacheable());
// args->push_back(value);
// args->push_back(aCallbackArgument);
// m_txState->recordTXOperation(GF_PUT,
// m_region.getFullPath(), key, args);
// }
// propagate the put to remote server, if any
return m_region.putNoThrow_remote(key, value, aCallbackArgument,
versionTag);
}
inline GfErrType localUpdate(const CacheableKeyPtr& key,
const CacheablePtr& value,
CacheablePtr& oldValue, bool cachingEnabled,
const CacheEventFlags eventFlags,
int updateCount, VersionTagPtr versionTag,
DataInput* delta = NULL,
EventIdPtr eventId = NULLPTR,
bool afterRemote = false) {
return m_region.putLocal(name(), false, key, value, oldValue,
cachingEnabled, updateCount, 0, versionTag, delta,
eventId);
}
private:
LocalRegion& m_region;
};
// encapsulates actions that need to be taken for a put() operation. This
// implementation allows
// null values in Put during transaction. See defect #743
class PutActionsTx : public PutActions {
public:
inline explicit PutActionsTx(LocalRegion& region) : PutActions(region) {}
inline static GfErrType checkArgs(const CacheableKeyPtr& key,
const CacheablePtr& value,
DataInput* delta = NULL) {
if (key == NULLPTR) {
return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION;
}
return GF_NOERR;
}
};
// encapsulates actions that need to be taken for a create() operation
class CreateActions {
public:
static const EntryEventType s_beforeEventType = BEFORE_CREATE;
static const EntryEventType s_afterEventType = AFTER_CREATE;
static const bool s_addIfAbsent = true;
static const bool s_failIfPresent = true;
TXState* m_txState;
inline explicit CreateActions(LocalRegion& region) : m_region(region) {
m_txState = TSSTXStateWrapper::s_geodeTSSTXState->getTXState();
}
inline static const char* name() { return "Region::create"; }
inline static GfErrType checkArgs(const CacheableKeyPtr& key,
const CacheablePtr& value,
DataInput* delta) {
if (key == NULLPTR) {
return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION;
}
return GF_NOERR;
}
inline void getCallbackOldValue(bool cachingEnabled,
const CacheableKeyPtr& key,
MapEntryImplPtr& entry,
CacheablePtr& oldValue) const {}
inline static void logCacheWriterFailure(const CacheableKeyPtr& key,
const CacheablePtr& oldValue) {
LOGFINER("Cache writer vetoed create for key %s",
Utils::getCacheableKeyString(key)->asChar());
}
inline GfErrType remoteUpdate(const CacheableKeyPtr& key,
const CacheablePtr& value,
const UserDataPtr& aCallbackArgument,
VersionTagPtr& versionTag) {
// propagate the create to remote server, if any
// if(m_txState != NULL && !m_txState->isReplay())
// {
// VectorOfCacheablePtr args(new VectorOfCacheable());
// args->push_back(value);
// args->push_back(aCallbackArgument);
// m_txState->recordTXOperation(GF_CREATE,
// m_region.getFullPath(), key, args);
// }
return m_region.createNoThrow_remote(key, value, aCallbackArgument,
versionTag);
}
inline GfErrType localUpdate(const CacheableKeyPtr& key,
const CacheablePtr& value,
CacheablePtr& oldValue, bool cachingEnabled,
const CacheEventFlags eventFlags,
int updateCount, VersionTagPtr versionTag,
DataInput* delta = NULL,
EventIdPtr eventId = NULLPTR,
bool afterRemote = false) {
return m_region.putLocal(name(), true, key, value, oldValue, cachingEnabled,
updateCount, 0, versionTag);
}
private:
LocalRegion& m_region;
};
// encapsulates actions that need to be taken for a destroy() operation
class DestroyActions {
public:
static const EntryEventType s_beforeEventType = BEFORE_DESTROY;
static const EntryEventType s_afterEventType = AFTER_DESTROY;
static const bool s_addIfAbsent = true;
static const bool s_failIfPresent = false;
TXState* m_txState;
inline explicit DestroyActions(LocalRegion& region) : m_region(region) {
m_txState = TSSTXStateWrapper::s_geodeTSSTXState->getTXState();
}
inline static const char* name() { return "Region::destroy"; }
inline static GfErrType checkArgs(const CacheableKeyPtr& key,
const CacheablePtr& value,
DataInput* delta) {
if (key == NULLPTR) {
return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION;
}
return GF_NOERR;
}
inline void getCallbackOldValue(bool cachingEnabled,
const CacheableKeyPtr& key,
MapEntryImplPtr& entry,
CacheablePtr& oldValue) const {
if (cachingEnabled) {
m_region.m_entries->getEntry(key, entry, oldValue);
}
}
inline static void logCacheWriterFailure(const CacheableKeyPtr& key,
const CacheablePtr& oldValue) {
LOGFINER("Cache writer vetoed destroy for key %s",
Utils::getCacheableKeyString(key)->asChar());
}
inline GfErrType remoteUpdate(const CacheableKeyPtr& key,
const CacheablePtr& value,
const UserDataPtr& aCallbackArgument,
VersionTagPtr& versionTag) {
// propagate the destroy to remote server, if any
// if(m_txState != NULL && !m_txState->isReplay())
// {
// VectorOfCacheablePtr args(new VectorOfCacheable());
// args->push_back(aCallbackArgument);
// m_txState->recordTXOperation(GF_DESTROY,
// m_region.getFullPath(), key, args);
// }
return m_region.destroyNoThrow_remote(key, aCallbackArgument, versionTag);
}
inline GfErrType localUpdate(const CacheableKeyPtr& key,
const CacheablePtr& value,
CacheablePtr& oldValue, bool cachingEnabled,
const CacheEventFlags eventFlags,
int updateCount, VersionTagPtr versionTag,
DataInput* delta = NULL,
EventIdPtr eventId = NULLPTR,
bool afterRemote = false) {
if (cachingEnabled) {
MapEntryImplPtr entry;
// for notification invoke the listener even if the key does
// not exist locally
GfErrType err;
LOGDEBUG("Region::destroy: region [%s] destroying key [%s]",
m_region.getFullPath(),
Utils::getCacheableKeyString(key)->asChar());
if ((err = m_region.m_entries->remove(key, oldValue, entry, updateCount,
versionTag, afterRemote)) !=
GF_NOERR) {
if (eventFlags.isNotification()) {
LOGDEBUG(
"Region::destroy: region [%s] destroy key [%s] for "
"notification having value [%s] failed with %d",
m_region.getFullPath(),
Utils::getCacheableKeyString(key)->asChar(),
Utils::getCacheableString(oldValue)->asChar(), err);
err = GF_NOERR;
}
return err;
}
if (oldValue != NULLPTR) {
LOGDEBUG(
"Region::destroy: region [%s] destroyed key [%s] having "
"value [%s]",
m_region.getFullPath(), Utils::getCacheableKeyString(key)->asChar(),
Utils::getCacheableString(oldValue)->asChar());
// any cleanup required for the entry (e.g. removing from LRU list)
if (entry != NULLPTR) {
entry->cleanup(eventFlags);
}
// entry/region expiration
if (!eventFlags.isEvictOrExpire()) {
m_region.updateAccessAndModifiedTime(true);
}
// update the stats
m_region.m_regionStats->setEntries(m_region.m_entries->size());
m_region.m_cacheImpl->m_cacheStats->incEntries(-1);
}
}
// update the stats
m_region.m_regionStats->incDestroys();
m_region.m_cacheImpl->m_cacheStats->incDestroys();
return GF_NOERR;
}
private:
LocalRegion& m_region;
};
// encapsulates actions that need to be taken for a remove() operation
class RemoveActions {
public:
static const EntryEventType s_beforeEventType = BEFORE_DESTROY;
static const EntryEventType s_afterEventType = AFTER_DESTROY;
static const bool s_addIfAbsent = true;
static const bool s_failIfPresent = false;
TXState* m_txState;
bool allowNULLValue;
inline explicit RemoveActions(LocalRegion& region)
: m_region(region), m_ServerResponse(GF_ENOENT) {
m_txState = TSSTXStateWrapper::s_geodeTSSTXState->getTXState();
allowNULLValue = false;
}
inline static const char* name() { return "Region::remove"; }
inline static GfErrType checkArgs(const CacheableKeyPtr& key,
const CacheablePtr& value,
DataInput* delta) {
if (key == NULLPTR) {
return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION;
}
return GF_NOERR;
}
inline void getCallbackOldValue(bool cachingEnabled,
const CacheableKeyPtr& key,
MapEntryImplPtr& entry,
CacheablePtr& oldValue) const {
if (cachingEnabled) {
m_region.m_entries->getEntry(key, entry, oldValue);
}
}
inline static void logCacheWriterFailure(const CacheableKeyPtr& key,
const CacheablePtr& oldValue) {
LOGFINER("Cache writer vetoed remove for key %s",
Utils::getCacheableKeyString(key)->asChar());
}
inline GfErrType remoteUpdate(const CacheableKeyPtr& key,
const CacheablePtr& value,
const UserDataPtr& aCallbackArgument,
VersionTagPtr& versionTag) {
// propagate the remove to remote server, if any
CacheablePtr valuePtr;
GfErrType err = GF_NOERR;
if (!allowNULLValue && m_region.getAttributes()->getCachingEnabled()) {
m_region.getEntry(key, valuePtr);
DataOutput out1;
DataOutput out2;
if (valuePtr != NULLPTR && value != NULLPTR) {
if (valuePtr->classId() != value->classId() ||
valuePtr->typeId() != value->typeId()) {
err = GF_ENOENT;
return err;
}
valuePtr->toData(out1);
value->toData(out2);
if (out1.getBufferLength() != out2.getBufferLength()) {
err = GF_ENOENT;
return err;
}
if (memcmp(out1.getBuffer(), out2.getBuffer(),
out1.getBufferLength()) != 0) {
err = GF_ENOENT;
return err;
}
} else if ((valuePtr == NULLPTR || CacheableToken::isInvalid(valuePtr))) {
// if(m_txState != NULL && !m_txState->isReplay())
// {
// VectorOfCacheablePtr args(new
// VectorOfCacheable());
// args->push_back(value);
// args->push_back(aCallbackArgument);
// m_txState->recordTXOperation(GF_REMOVE,
// m_region.getFullPath(), key, args);
// }
m_ServerResponse = m_region.removeNoThrow_remote(
key, value, aCallbackArgument, versionTag);
return m_ServerResponse;
} else if (valuePtr != NULLPTR && value == NULLPTR) {
err = GF_ENOENT;
return err;
}
}
// if(m_txState != NULL && !m_txState->isReplay())
// {
// VectorOfCacheablePtr args(new VectorOfCacheable());
// args->push_back(value);
// args->push_back(aCallbackArgument);
// m_txState->recordTXOperation(GF_REMOVE,
// m_region.getFullPath(), key, args);
// }
if (allowNULLValue) {
m_ServerResponse =
m_region.removeNoThrowEX_remote(key, aCallbackArgument, versionTag);
} else {
m_ServerResponse = m_region.removeNoThrow_remote(
key, value, aCallbackArgument, versionTag);
}
LOGDEBUG("serverResponse::%d", m_ServerResponse);
return m_ServerResponse;
}
inline GfErrType localUpdate(const CacheableKeyPtr& key,
const CacheablePtr& value,
CacheablePtr& oldValue, bool cachingEnabled,
const CacheEventFlags eventFlags,
int updateCount, VersionTagPtr versionTag,
DataInput* delta = NULL,
EventIdPtr eventId = NULLPTR,
bool afterRemote = false) {
CacheablePtr valuePtr;
GfErrType err = GF_NOERR;
if (!allowNULLValue && cachingEnabled) {
m_region.getEntry(key, valuePtr);
DataOutput out1;
DataOutput out2;
if (valuePtr != NULLPTR && value != NULLPTR) {
if (valuePtr->classId() != value->classId() ||
valuePtr->typeId() != value->typeId()) {
err = GF_ENOENT;
return err;
}
valuePtr->toData(out1);
value->toData(out2);
if (out1.getBufferLength() != out2.getBufferLength()) {
err = GF_ENOENT;
return err;
}
if (memcmp(out1.getBuffer(), out2.getBuffer(),
out1.getBufferLength()) != 0) {
err = GF_ENOENT;
return err;
}
} else if (value == NULLPTR && (!CacheableToken::isInvalid(valuePtr) ||
valuePtr == NULLPTR)) {
err = (m_ServerResponse == 0 && valuePtr == NULLPTR) ? GF_NOERR
: GF_ENOENT;
if (updateCount >= 0 &&
!m_region.getAttributes()
->getConcurrencyChecksEnabled()) { // This means server has
// deleted an entry & same
// entry has been destroyed
// locally
// So call removeTrackerForEntry to remove key that was added in the
// map during addTrackerForEntry call.
m_region.m_entries->removeTrackerForEntry(key);
}
return err;
} else if (valuePtr == NULLPTR && value != NULLPTR &&
m_ServerResponse != 0) {
err = GF_ENOENT;
return err;
}
}
if (cachingEnabled) {
MapEntryImplPtr entry;
// for notification invoke the listener even if the key does
// not exist locally
GfErrType err;
LOGDEBUG("Region::remove: region [%s] removing key [%s]",
m_region.getFullPath(),
Utils::getCacheableKeyString(key)->asChar());
if ((err = m_region.m_entries->remove(key, oldValue, entry, updateCount,
versionTag, afterRemote)) !=
GF_NOERR) {
if (eventFlags.isNotification()) {
LOGDEBUG(
"Region::remove: region [%s] remove key [%s] for "
"notification having value [%s] failed with %d",
m_region.getFullPath(),
Utils::getCacheableKeyString(key)->asChar(),
Utils::getCacheableString(oldValue)->asChar(), err);
err = GF_NOERR;
}
return err;
}
if (oldValue != NULLPTR) {
LOGDEBUG(
"Region::remove: region [%s] removed key [%s] having "
"value [%s]",
m_region.getFullPath(), Utils::getCacheableKeyString(key)->asChar(),
Utils::getCacheableString(oldValue)->asChar());
// any cleanup required for the entry (e.g. removing from LRU list)
if (entry != NULLPTR) {
entry->cleanup(eventFlags);
}
// entry/region expiration
if (!eventFlags.isEvictOrExpire()) {
m_region.updateAccessAndModifiedTime(true);
}
// update the stats
m_region.m_regionStats->setEntries(m_region.m_entries->size());
m_region.m_cacheImpl->m_cacheStats->incEntries(-1);
}
}
// update the stats
m_region.m_regionStats->incDestroys();
m_region.m_cacheImpl->m_cacheStats->incDestroys();
return GF_NOERR;
}
private:
LocalRegion& m_region;
GfErrType m_ServerResponse;
};
class RemoveActionsEx : public RemoveActions {
public:
inline explicit RemoveActionsEx(LocalRegion& region) : RemoveActions(region) {
allowNULLValue = true;
}
};
// encapsulates actions that need to be taken for a invalidate() operation
class InvalidateActions {
public:
static const EntryEventType s_beforeEventType = BEFORE_INVALIDATE;
static const EntryEventType s_afterEventType = AFTER_INVALIDATE;
static const bool s_addIfAbsent = true;
static const bool s_failIfPresent = false;
TXState* m_txState;
inline explicit InvalidateActions(LocalRegion& region) : m_region(region) {
m_txState = TSSTXStateWrapper::s_geodeTSSTXState->getTXState();
}
inline static const char* name() { return "Region::invalidate"; }
inline static GfErrType checkArgs(const CacheableKeyPtr& key,
const CacheablePtr& value,
DataInput* delta = NULL) {
if (key == NULLPTR) {
return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION;
}
return GF_NOERR;
}
inline void getCallbackOldValue(bool cachingEnabled,
const CacheableKeyPtr& key,
MapEntryImplPtr& entry,
CacheablePtr& oldValue) const {
if (cachingEnabled) {
m_region.m_entries->getEntry(key, entry, oldValue);
}
}
inline static void logCacheWriterFailure(const CacheableKeyPtr& key,
const CacheablePtr& oldValue) {
bool isUpdate = (oldValue != NULLPTR);
LOGFINER("Cache writer vetoed %s for key %s",
(isUpdate ? "update" : "invalidate"),
Utils::getCacheableKeyString(key)->asChar());
}
inline GfErrType remoteUpdate(const CacheableKeyPtr& key,
const CacheablePtr& value,
const UserDataPtr& aCallbackArgument,
VersionTagPtr& versionTag) {
// if(m_txState != NULL && !m_txState->isReplay())
// {
// VectorOfCacheablePtr args(new VectorOfCacheable());
// args->push_back(aCallbackArgument);
// m_txState->recordTXOperation(GF_INVALIDATE,
// m_region.getFullPath(), key, args);
// }
// propagate the invalidate to remote server, if any
return m_region.invalidateNoThrow_remote(key, aCallbackArgument,
versionTag);
}
inline GfErrType localUpdate(const CacheableKeyPtr& key,
const CacheablePtr& value,
CacheablePtr& oldValue, bool cachingEnabled,
const CacheEventFlags eventFlags,
int updateCount, VersionTagPtr versionTag,
DataInput* delta = NULL,
EventIdPtr eventId = NULLPTR,
bool afterRemote = false) {
return m_region.invalidateLocal(name(), key, value, eventFlags, versionTag);
}
private:
LocalRegion& m_region;
};
} // namespace client
} // namespace geode
} // namespace apache
template <typename TAction>
GfErrType LocalRegion::updateNoThrow(const CacheableKeyPtr& key,
const CacheablePtr& value,
const UserDataPtr& aCallbackArgument,
CacheablePtr& oldValue, int updateCount,
const CacheEventFlags eventFlags,
VersionTagPtr versionTag, DataInput* delta,
EventIdPtr eventId) {
GfErrType err = GF_NOERR;
if ((err = TAction::checkArgs(key, value, delta)) != GF_NOERR) {
return err;
}
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
TAction action(*this);
TXState* txState = action.m_txState;
if (txState != NULL) {
if (isLocalOp(&eventFlags)) {
return GF_NOTSUP;
}
/* adongre - Coverity II
* CID 29194 (6): Parse warning (PW.PARAMETER_HIDDEN)
*/
// VersionTagPtr versionTag;
err = action.remoteUpdate(key, value, aCallbackArgument, versionTag);
if (err == GF_NOERR) {
txState->setDirty();
}
return err;
}
bool cachingEnabled = m_regionAttributes->getCachingEnabled();
MapEntryImplPtr entry;
// do not invoke the writer in case of notification/eviction
// or expiration
if (m_writer != NULLPTR && eventFlags.invokeCacheWriter()) {
action.getCallbackOldValue(cachingEnabled, key, entry, oldValue);
// invokeCacheWriterForEntryEvent method has the check that if oldValue
// is a CacheableToken then it sets it to NULL; also determines if it
// should be BEFORE_UPDATE or BEFORE_CREATE depending on oldValue
if (!invokeCacheWriterForEntryEvent(key, oldValue, value, aCallbackArgument,
eventFlags,
TAction::s_beforeEventType)) {
TAction::logCacheWriterFailure(key, oldValue);
return GF_CACHEWRITER_ERROR;
}
}
bool remoteOpDone = false;
// try the remote update; but if this fails (e.g. due to security
// exception) do not do the local update
// uses the technique of adding a tracking to the entry before proceeding
// for put; if the update counter changes when the remote update completes
// then it means that the local entry was overwritten in the meantime
// by a notification or another thread, so we do not do the local update
if (!eventFlags.isLocal() && !eventFlags.isNotification()) {
if (cachingEnabled && updateCount < 0 &&
!m_regionAttributes->getConcurrencyChecksEnabled()) {
// add a tracking for the entry
if ((updateCount = m_entries->addTrackerForEntry(
key, oldValue, TAction::s_addIfAbsent, TAction::s_failIfPresent,
true)) < 0) {
if (oldValue != NULLPTR) {
// fail for "create" when entry exists
return GF_CACHE_ENTRY_EXISTS;
}
}
}
// propagate the update to remote server, if any
err = action.remoteUpdate(key, value, aCallbackArgument, versionTag);
if (err != GF_NOERR) {
if (updateCount >= 0 &&
!m_regionAttributes->getConcurrencyChecksEnabled()) {
m_entries->removeTrackerForEntry(key);
}
return err;
}
remoteOpDone = true;
}
if (!eventFlags.isNotification() || getProcessedMarker()) {
if ((err = action.localUpdate(key, value, oldValue, cachingEnabled,
eventFlags, updateCount, versionTag, delta,
eventId, remoteOpDone)) ==
GF_CACHE_ENTRY_UPDATED) {
LOGFINEST(
"%s: did not change local value for key [%s] since it has "
"been updated by another thread while operation was in progress",
TAction::name(), Utils::getCacheableKeyString(key)->asChar());
err = GF_NOERR;
} else if (err == GF_CACHE_CONCURRENT_MODIFICATION_EXCEPTION) {
LOGDEBUG(
"Region::localUpdate: updateNoThrow<%s> for key [%s] failed because the cache already contains \
an entry with higher version. The cache listener will not be invoked.",
TAction::name(), Utils::getCacheableKeyString(key)->asChar());
// Cache listener won't be called in this case
return GF_NOERR;
} else if (err == GF_INVALID_DELTA) {
LOGDEBUG(
"Region::localUpdate: updateNoThrow<%s> for key [%s] failed because "
"of invalid delta.",
TAction::name(), Utils::getCacheableKeyString(key)->asChar());
m_cacheImpl->m_cacheStats->incFailureOnDeltaReceived();
// Get full object from server.
CacheablePtr& newValue1 = const_cast<CacheablePtr&>(value);
VersionTagPtr versionTag1;
err = getNoThrow_FullObject(eventId, newValue1, versionTag1);
if (err == GF_NOERR && newValue1 != NULLPTR) {
err = m_entries->put(key, newValue1, entry, oldValue, updateCount, 0,
versionTag1 != NULLPTR ? versionTag1 : versionTag);
if (err == GF_CACHE_CONCURRENT_MODIFICATION_EXCEPTION) {
LOGDEBUG(
"Region::localUpdate: updateNoThrow<%s> for key [%s] failed because the cache already contains \
an entry with higher version. The cache listener will not be invoked.",
TAction::name(), Utils::getCacheableKeyString(key)->asChar());
// Cache listener won't be called in this case
return GF_NOERR;
} else if (err != GF_NOERR) {
return err;
}
}
} else if (err != GF_NOERR) {
return err;
}
} else { // if (getProcessedMarker())
action.getCallbackOldValue(cachingEnabled, key, entry, oldValue);
if (updateCount >= 0 &&
!m_regionAttributes->getConcurrencyChecksEnabled()) {
m_entries->removeTrackerForEntry(key);
}
}
// invokeCacheListenerForEntryEvent method has the check that if oldValue
// is a CacheableToken then it sets it to NULL; also determines if it
// should be AFTER_UPDATE or AFTER_CREATE depending on oldValue
err =
invokeCacheListenerForEntryEvent(key, oldValue, value, aCallbackArgument,
eventFlags, TAction::s_afterEventType);
return err;
}
template <typename TAction>
GfErrType LocalRegion::updateNoThrowTX(const CacheableKeyPtr& key,
const CacheablePtr& value,
const UserDataPtr& aCallbackArgument,
CacheablePtr& oldValue, int updateCount,
const CacheEventFlags eventFlags,
VersionTagPtr versionTag,
DataInput* delta, EventIdPtr eventId) {
GfErrType err = GF_NOERR;
if ((err = TAction::checkArgs(key, value, delta)) != GF_NOERR) {
return err;
}
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
TAction action(*this);
bool cachingEnabled = m_regionAttributes->getCachingEnabled();
MapEntryImplPtr entry;
if (!eventFlags.isNotification() || getProcessedMarker()) {
if ((err = action.localUpdate(key, value, oldValue, cachingEnabled,
eventFlags, updateCount, versionTag, delta,
eventId)) == GF_CACHE_ENTRY_UPDATED) {
LOGFINEST(
"%s: did not change local value for key [%s] since it has "
"been updated by another thread while operation was in progress",
TAction::name(), Utils::getCacheableKeyString(key)->asChar());
err = GF_NOERR;
} else if (err == GF_CACHE_ENTRY_NOT_FOUND) {
// Entry not found. Possibly because the entry was added and removed in
// the
// same transaction. Ignoring this error #739
LOGFINE(
"%s: No entry found. Possibly because the entry was added and "
"removed in the same transaction. "
"Ignoring this error. ",
TAction::name(), Utils::getCacheableKeyString(key)->asChar());
err = GF_NOERR;
} else if (err != GF_NOERR) {
return err;
}
} else { // if (getProcessedMarker())
action.getCallbackOldValue(cachingEnabled, key, entry, oldValue);
if (updateCount >= 0 &&
!m_regionAttributes->getConcurrencyChecksEnabled()) {
m_entries->removeTrackerForEntry(key);
}
}
// invokeCacheListenerForEntryEvent method has the check that if oldValue
// is a CacheableToken then it sets it to NULL; also determines if it
// should be AFTER_UPDATE or AFTER_CREATE depending on oldValue
err =
invokeCacheListenerForEntryEvent(key, oldValue, value, aCallbackArgument,
eventFlags, TAction::s_afterEventType);
return err;
}
GfErrType LocalRegion::putNoThrow(const CacheableKeyPtr& key,
const CacheablePtr& value,
const UserDataPtr& aCallbackArgument,
CacheablePtr& oldValue, int updateCount,
const CacheEventFlags eventFlags,
VersionTagPtr versionTag, DataInput* delta,
EventIdPtr eventId) {
return updateNoThrow<PutActions>(key, value, aCallbackArgument, oldValue,
updateCount, eventFlags, versionTag, delta,
eventId);
}
GfErrType LocalRegion::putNoThrowTX(const CacheableKeyPtr& key,
const CacheablePtr& value,
const UserDataPtr& aCallbackArgument,
CacheablePtr& oldValue, int updateCount,
const CacheEventFlags eventFlags,
VersionTagPtr versionTag, DataInput* delta,
EventIdPtr eventId) {
return updateNoThrowTX<PutActionsTx>(key, value, aCallbackArgument, oldValue,
updateCount, eventFlags, versionTag,
delta, eventId);
}
GfErrType LocalRegion::createNoThrow(const CacheableKeyPtr& key,
const CacheablePtr& value,
const UserDataPtr& aCallbackArgument,
int updateCount,
const CacheEventFlags eventFlags,
VersionTagPtr versionTag) {
CacheablePtr oldValue;
return updateNoThrow<CreateActions>(key, value, aCallbackArgument, oldValue,
updateCount, eventFlags, versionTag);
}
GfErrType LocalRegion::destroyNoThrow(const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument,
int updateCount,
const CacheEventFlags eventFlags,
VersionTagPtr versionTag) {
CacheablePtr oldValue;
return updateNoThrow<DestroyActions>(key, NULLPTR, aCallbackArgument,
oldValue, updateCount, eventFlags,
versionTag);
}
GfErrType LocalRegion::destroyNoThrowTX(const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument,
int updateCount,
const CacheEventFlags eventFlags,
VersionTagPtr versionTag) {
CacheablePtr oldValue;
return updateNoThrowTX<DestroyActions>(key, NULLPTR, aCallbackArgument,
oldValue, updateCount, eventFlags,
versionTag);
}
GfErrType LocalRegion::removeNoThrow(const CacheableKeyPtr& key,
const CacheablePtr& value,
const UserDataPtr& aCallbackArgument,
int updateCount,
const CacheEventFlags eventFlags,
VersionTagPtr versionTag) {
CacheablePtr oldValue;
return updateNoThrow<RemoveActions>(key, value, aCallbackArgument, oldValue,
updateCount, eventFlags, versionTag);
}
GfErrType LocalRegion::removeNoThrowEx(const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument,
int updateCount,
const CacheEventFlags eventFlags,
VersionTagPtr versionTag) {
CacheablePtr oldValue;
return updateNoThrow<RemoveActionsEx>(key, NULLPTR, aCallbackArgument,
oldValue, updateCount, eventFlags,
versionTag);
}
GfErrType LocalRegion::invalidateNoThrow(const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument,
int updateCount,
const CacheEventFlags eventFlags,
VersionTagPtr versionTag) {
CacheablePtr oldValue;
return updateNoThrow<InvalidateActions>(key, NULLPTR, aCallbackArgument,
oldValue, updateCount, eventFlags,
versionTag);
}
GfErrType LocalRegion::invalidateNoThrowTX(const CacheableKeyPtr& key,
const UserDataPtr& aCallbackArgument,
int updateCount,
const CacheEventFlags eventFlags,
VersionTagPtr versionTag) {
CacheablePtr oldValue;
return updateNoThrowTX<InvalidateActions>(key, NULLPTR, aCallbackArgument,
oldValue, updateCount, eventFlags,
versionTag);
}
GfErrType LocalRegion::putAllNoThrow(const HashMapOfCacheable& map,
uint32_t timeout,
const UserDataPtr& aCallbackArgument) {
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
// VersionTagPtr versionTag;
VersionedCacheableObjectPartListPtr
versionedObjPartListPtr; //= new VersionedCacheableObjectPartList();
TXState* txState = getTXState();
if (txState != NULL) {
if (isLocalOp()) {
return GF_NOTSUP;
}
// if(!txState->isReplay())
//{
// VectorOfCacheablePtr args(new VectorOfCacheable());
// args->push_back(HashMapOfCacheablePtr(new HashMapOfCacheable(map)));
// args->push_back(CacheableInt32::create(timeout));
// txState->recordTXOperation(GF_PUT_ALL, getFullPath(), NULLPTR, args);
//}
err = putAllNoThrow_remote(map, /*versionTag*/ versionedObjPartListPtr,
timeout, aCallbackArgument);
if (err == GF_NOERR) {
txState->setDirty();
}
return err;
}
bool cachingEnabled = m_regionAttributes->getCachingEnabled();
MapOfOldValue oldValueMap;
// remove tracking for the entries befor exiting the function
struct RemoveTracking {
private:
const MapOfOldValue& m_oldValueMap;
LocalRegion& m_region;
public:
RemoveTracking(const MapOfOldValue& oldValueMap, LocalRegion& region)
: m_oldValueMap(oldValueMap), m_region(region) {}
~RemoveTracking() {
if (!m_region.getAttributes()->getConcurrencyChecksEnabled()) {
// need to remove the tracking added to the entries at the end
for (MapOfOldValue::const_iterator iter = m_oldValueMap.begin();
iter != m_oldValueMap.end(); ++iter) {
if (iter->second.second >= 0) {
m_region.m_entries->removeTrackerForEntry(iter->first);
}
}
}
}
} _removeTracking(oldValueMap, *this);
if (cachingEnabled || m_writer != NULLPTR) {
CacheablePtr oldValue;
for (HashMapOfCacheable::Iterator iter = map.begin(); iter != map.end();
++iter) {
const CacheableKeyPtr& key = iter.first();
if (cachingEnabled &&
!m_regionAttributes->getConcurrencyChecksEnabled()) {
int updateCount =
m_entries->addTrackerForEntry(key, oldValue, true, false, true);
oldValueMap.insert(
std::make_pair(key, std::make_pair(oldValue, updateCount)));
}
if (m_writer != NULLPTR) {
// invokeCacheWriterForEntryEvent method has the check that if oldValue
// is a CacheableToken then it sets it to NULL; also determines if it
// should be BEFORE_UPDATE or BEFORE_CREATE depending on oldValue
if (!invokeCacheWriterForEntryEvent(
key, oldValue, iter.second(), aCallbackArgument,
CacheEventFlags::LOCAL, BEFORE_UPDATE)) {
PutActions::logCacheWriterFailure(key, oldValue);
return GF_CACHEWRITER_ERROR;
}
}
}
}
// try remote putAll, if any
if ((err = putAllNoThrow_remote(map, versionedObjPartListPtr, timeout,
aCallbackArgument)) != GF_NOERR) {
return err;
}
// next the local puts
GfErrType localErr;
VersionTagPtr versionTag;
if (cachingEnabled) {
if (m_isPRSingleHopEnabled) { /*New PRSingleHop Case:: PR Singlehop
condition*/
for (int keyIndex = 0;
keyIndex < versionedObjPartListPtr->getSucceededKeys()->size();
keyIndex++) {
const CacheablePtr valPtr =
versionedObjPartListPtr->getSucceededKeys()->at(keyIndex);
HashMapOfCacheable::Iterator mapIter = map.find(valPtr);
CacheableKeyPtr key = NULLPTR;
CacheablePtr value = NULLPTR;
if (mapIter != map.end()) {
key = mapIter.first();
value = mapIter.second();
} else {
// ThrowERROR
LOGERROR(
"ERROR :: LocalRegion::putAllNoThrow() Key must be found in the "
"usermap");
}
if (versionedObjPartListPtr != NULLPTR &&
versionedObjPartListPtr.ptr() != NULL) {
LOGDEBUG("versionedObjPartListPtr->getVersionedTagptr().size() = %d ",
versionedObjPartListPtr->getVersionedTagptr().size());
if (versionedObjPartListPtr->getVersionedTagptr().size() > 0) {
versionTag =
versionedObjPartListPtr->getVersionedTagptr()[keyIndex];
}
}
std::pair<CacheablePtr, int>& p = oldValueMap[key];
if ((localErr = LocalRegion::putNoThrow(
key, value, aCallbackArgument, p.first, p.second,
CacheEventFlags::LOCAL | CacheEventFlags::NOCACHEWRITER,
versionTag)) == GF_CACHE_ENTRY_UPDATED) {
LOGFINEST(
"Region::putAll: did not change local value for key [%s] "
"since it has been updated by another thread while operation was "
"in progress",
Utils::getCacheableKeyString(key)->asChar());
} else if (localErr == GF_CACHE_LISTENER_EXCEPTION) {
LOGFINER("Region::putAll: invoke listener error [%d] for key [%s]",
localErr, Utils::getCacheableKeyString(key)->asChar());
err = localErr;
} else if (localErr != GF_NOERR) {
return localErr;
}
} // End of for loop
} else { /*Non SingleHop case :: PUTALL has taken multiple hops*/
LOGDEBUG(
"NILKANTH LocalRegion::putAllNoThrow m_isPRSingleHopEnabled = %d "
"expected false",
m_isPRSingleHopEnabled);
int index = 0;
for (HashMapOfCacheable::Iterator iter = map.begin(); iter != map.end();
++iter) {
const CacheableKeyPtr& key = iter.first();
const CacheablePtr& value = iter.second();
std::pair<CacheablePtr, int>& p = oldValueMap[key];
if (versionedObjPartListPtr != NULLPTR &&
versionedObjPartListPtr.ptr() != NULL) {
LOGDEBUG("versionedObjPartListPtr->getVersionedTagptr().size() = %d ",
versionedObjPartListPtr->getVersionedTagptr().size());
if (versionedObjPartListPtr->getVersionedTagptr().size() > 0) {
versionTag = versionedObjPartListPtr->getVersionedTagptr()[index++];
}
}
if ((localErr = LocalRegion::putNoThrow(
key, value, aCallbackArgument, p.first, p.second,
CacheEventFlags::LOCAL | CacheEventFlags::NOCACHEWRITER,
versionTag)) == GF_CACHE_ENTRY_UPDATED) {
LOGFINEST(
"Region::putAll: did not change local value for key [%s] "
"since it has been updated by another thread while operation was "
"in progress",
Utils::getCacheableKeyString(key)->asChar());
} else if (localErr == GF_CACHE_LISTENER_EXCEPTION) {
LOGFINER("Region::putAll: invoke listener error [%d] for key [%s]",
localErr, Utils::getCacheableKeyString(key)->asChar());
err = localErr;
} else if (localErr != GF_NOERR) {
return localErr;
}
}
}
}
m_regionStats->incPutAll();
return err;
}
GfErrType LocalRegion::removeAllNoThrow(const VectorOfCacheableKey& keys,
const UserDataPtr& aCallbackArgument) {
// 1. check destroy pending
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
VersionedCacheableObjectPartListPtr versionedObjPartListPtr;
// 2.check transaction state and do remote op
TXState* txState = getTXState();
if (txState != NULL) {
if (isLocalOp()) return GF_NOTSUP;
err = removeAllNoThrow_remote(keys, versionedObjPartListPtr,
aCallbackArgument);
if (err == GF_NOERR) txState->setDirty();
return err;
}
// 3.add tracking
bool cachingEnabled = m_regionAttributes->getCachingEnabled();
// 4. do remote removeAll
err =
removeAllNoThrow_remote(keys, versionedObjPartListPtr, aCallbackArgument);
if (err != GF_NOERR) {
return err;
}
// 5. update local cache
GfErrType localErr;
VersionTagPtr versionTag;
if (cachingEnabled) {
VectorOfCacheableKey* keysPtr;
if (m_isPRSingleHopEnabled) {
keysPtr = versionedObjPartListPtr->getSucceededKeys().ptr();
} else {
keysPtr = const_cast<VectorOfCacheableKey*>(&keys);
}
for (int keyIndex = 0; keyIndex < keysPtr->size(); keyIndex++) {
CacheableKeyPtr key = keysPtr->at(keyIndex);
if (versionedObjPartListPtr != NULLPTR &&
versionedObjPartListPtr.ptr() != NULL) {
LOGDEBUG("versionedObjPartListPtr->getVersionedTagptr().size() = %d ",
versionedObjPartListPtr->getVersionedTagptr().size());
if (versionedObjPartListPtr->getVersionedTagptr().size() > 0) {
versionTag = versionedObjPartListPtr->getVersionedTagptr()[keyIndex];
}
if (versionTag == NULLPTR) {
LOGDEBUG(
"RemoveAll hits EntryNotFoundException at server side for key "
"[%s], not to destroy it from local cache.",
Utils::getCacheableKeyString(key)->asChar());
continue;
}
}
if ((localErr = LocalRegion::destroyNoThrow(
key, aCallbackArgument, -1,
CacheEventFlags::LOCAL | CacheEventFlags::NOCACHEWRITER,
versionTag)) == GF_CACHE_ENTRY_UPDATED) {
LOGFINEST(
"Region::removeAll: did not remove local value for key [%s] "
"since it has been updated by another thread while operation was "
"in progress",
Utils::getCacheableKeyString(key)->asChar());
} else if (localErr == GF_CACHE_LISTENER_EXCEPTION) {
LOGFINER("Region::removeAll: invoke listener error [%d] for key [%s]",
localErr, Utils::getCacheableKeyString(key)->asChar());
err = localErr;
} else if (localErr == GF_CACHE_ENTRY_NOT_FOUND) {
LOGFINER("Region::removeAll: error [%d] for key [%s]", localErr,
Utils::getCacheableKeyString(key)->asChar());
} else if (localErr != GF_NOERR) {
return localErr;
}
} // End of for loop
}
// 6.update stats
m_regionStats->incRemoveAll();
return err;
}
void LocalRegion::clear(const UserDataPtr& aCallbackArgument) {
/*update the stats */
int64 sampleStartNanos = Utils::startStatOpTime();
localClear(aCallbackArgument);
Utils::updateStatOpTime(m_regionStats->getStat(),
RegionStatType::getInstance()->getClearsId(),
sampleStartNanos);
}
void LocalRegion::localClear(const UserDataPtr& aCallbackArgument) {
GfErrType err = localClearNoThrow(aCallbackArgument, CacheEventFlags::LOCAL);
if (err != GF_NOERR) GfErrTypeToException("LocalRegion::localClear", err);
}
GfErrType LocalRegion::localClearNoThrow(const UserDataPtr& aCallbackArgument,
const CacheEventFlags eventFlags) {
bool cachingEnabled = m_regionAttributes->getCachingEnabled();
/*Update the stats for clear*/
m_regionStats->incClears();
GfErrType err = GF_NOERR;
TryReadGuard guard(m_rwLock, m_destroyPending);
if (m_released || m_destroyPending) return err;
if (!invokeCacheWriterForRegionEvent(aCallbackArgument, eventFlags,
BEFORE_REGION_CLEAR)) {
LOGFINE("Cache writer prevented region clear");
return GF_CACHEWRITER_ERROR;
}
if (cachingEnabled == true) m_entries->clear();
if (!eventFlags.isNormal()) {
err = invokeCacheListenerForRegionEvent(aCallbackArgument, eventFlags,
AFTER_REGION_CLEAR);
}
return err;
}
GfErrType LocalRegion::invalidateLocal(const char* name,
const CacheableKeyPtr& keyPtr,
const CacheablePtr& value,
const CacheEventFlags eventFlags,
VersionTagPtr versionTag) {
if (keyPtr == NULLPTR) {
return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION;
}
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
bool cachingEnabled = m_regionAttributes->getCachingEnabled();
CacheablePtr oldValue;
MapEntryImplPtr me;
if (!eventFlags.isNotification() || getProcessedMarker()) {
if (cachingEnabled) {
LOGDEBUG("%s: region [%s] invalidating key [%s], value [%s]", name,
getFullPath(), Utils::getCacheableKeyString(keyPtr)->asChar(),
Utils::getCacheableString(value)->asChar());
/* adongre - Coverity II
* CID 29193: Parse warning (PW.PARAMETER_HIDDEN)
*/
// VersionTagPtr versionTag;
if ((err = m_entries->invalidate(keyPtr, me, oldValue, versionTag)) !=
GF_NOERR) {
if (eventFlags.isNotification()) {
LOGDEBUG(
"Region::invalidate: region [%s] invalidate key [%s] "
"failed with error %d",
getFullPath(), Utils::getCacheableKeyString(keyPtr)->asChar(),
err);
}
if (err == GF_CACHE_CONCURRENT_MODIFICATION_EXCEPTION) {
LOGDEBUG(
"Region::invalidateLocal: invalidate for key [%s] failed because the cache already contains \
an entry with higher version. The cache listener will not be invoked.",
Utils::getCacheableKeyString(keyPtr)->asChar());
// Cache listener won't be called in this case
return GF_NOERR;
}
// for notification invoke the listener even if the key does
// not exist locally
if (!eventFlags.isNotification() || err != GF_CACHE_ENTRY_NOT_FOUND) {
return err;
} else {
err = GF_NOERR;
}
} else {
LOGDEBUG("Region::invalidate: region [%s] invalidated key [%s]",
getFullPath(), Utils::getCacheableKeyString(keyPtr)->asChar());
}
// entry/region expiration
if (!eventFlags.isEvictOrExpire()) {
updateAccessAndModifiedTime(true);
}
}
} else { // if (getProcessedMarker())
if (cachingEnabled) {
m_entries->getEntry(keyPtr, me, oldValue);
}
}
return err;
}
GfErrType LocalRegion::invalidateRegionNoThrow(
const UserDataPtr& aCallbackArgument, const CacheEventFlags eventFlags) {
CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard);
GfErrType err = GF_NOERR;
if (m_regionAttributes->getCachingEnabled()) {
VectorOfCacheableKey v;
keys_internal(v);
uint32_t size = v.size();
MapEntryImplPtr me;
for (uint32_t i = 0; i < size; i++) {
{
CacheablePtr oldValue;
// invalidate all the entries with a NULL versionTag
VersionTagPtr versionTag;
m_entries->invalidate(v.at(i), me, oldValue, versionTag);
if (!eventFlags.isEvictOrExpire()) {
updateAccessAndModifiedTimeForEntry(me, true);
}
}
}
if (!eventFlags.isEvictOrExpire()) {
updateAccessAndModifiedTime(true);
}
}
// try remote region invalidate, if any
if (!eventFlags.isLocal()) {
err = invalidateRegionNoThrow_remote(aCallbackArgument);
if (err != GF_NOERR) return err;
}
if (m_subRegions.current_size() > 0) {
ACE_Guard<ACE_Recursive_Thread_Mutex> subguard(m_subRegions.mutex());
for (MapOfRegionWithLock::iterator p = m_subRegions.begin();
p != m_subRegions.end(); ++p) {
RegionInternal* subRegion =
dynamic_cast<RegionInternal*>((*p).int_id_.ptr());
if (subRegion != NULL) {
err = subRegion->invalidateRegionNoThrow(aCallbackArgument, eventFlags);
if (err != GF_NOERR) {
return err;
}
}
}
}
err = invokeCacheListenerForRegionEvent(aCallbackArgument, eventFlags,
AFTER_REGION_INVALIDATE);
return err;
}
GfErrType LocalRegion::destroyRegionNoThrow(
const UserDataPtr& aCallbackArgument, bool removeFromParent,
const CacheEventFlags eventFlags) {
// Get global locks to synchronize with failover thread.
// TODO: This should go into RegionGlobalLocks
// The distMngrsLock is required before RegionGlobalLocks since failover
// thread acquires distMngrsLock and then tries to acquire endpoints lock
// which is already taken by RegionGlobalLocks here.
DistManagersLockGuard _guard(m_cacheImpl->tcrConnectionManager());
RegionGlobalLocks acquireLocks(this);
// Fix for BUG:849, i.e Remove subscription on region before destroying the
// region
if (eventFlags == CacheEventFlags::LOCAL) {
if (unregisterKeysBeforeDestroyRegion() != GF_NOERR) {
LOGDEBUG(
"DEBUG :: LocalRegion::destroyRegionNoThrow UnregisteredKeys Failed");
}
}
TryWriteGuard guard(m_rwLock, m_destroyPending);
if (m_destroyPending) {
if (eventFlags.isCacheClose()) {
return GF_NOERR;
} else {
return GF_CACHE_REGION_DESTROYED_EXCEPTION;
}
}
m_destroyPending = true;
LOGDEBUG("LocalRegion::destroyRegionNoThrow( ): set flag destroy-pending.");
GfErrType err = GF_NOERR;
// do not invoke the writer for expiry or notification
if (!eventFlags.isNotification() && !eventFlags.isEvictOrExpire()) {
if (!invokeCacheWriterForRegionEvent(aCallbackArgument, eventFlags,
BEFORE_REGION_DESTROY)) {
// do not let CacheWriter veto when this is Cache::close()
if (!eventFlags.isCacheClose()) {
LOGFINE("Cache writer prevented region destroy");
m_destroyPending = false;
return GF_CACHEWRITER_ERROR;
}
}
// for the expiry case try the local destroy first and remote
// destroy only if local destroy succeeds
if (!eventFlags.isLocal()) {
err = destroyRegionNoThrow_remote(aCallbackArgument);
if (err != GF_NOERR) {
m_destroyPending = false;
return err;
}
}
}
LOGFINE("Region %s is being destroyed", m_fullPath.c_str());
{
MapOfRegionGuard guard(m_subRegions.mutex());
for (MapOfRegionWithLock::iterator p = m_subRegions.begin();
p != m_subRegions.end(); ++p) {
// TODO: remove unnecessary dynamic_cast by having m_subRegions hold
// RegionInternal and invoke the destroy method in that
RegionInternal* subRegion =
dynamic_cast<RegionInternal*>((*p).int_id_.ptr());
if (subRegion != NULL) {
// for subregions never remove from parent since that will cause
// the region to be destroyed and SEGV; unbind_all takes care of that
// Also don't send remote destroy message for sub-regions
err = subRegion->destroyRegionNoThrow(
aCallbackArgument, false, eventFlags | CacheEventFlags::LOCAL);
// for Cache::close() keep going as far as possible
if (err != GF_NOERR && !eventFlags.isCacheClose()) {
m_destroyPending = false;
return err;
}
}
}
}
m_subRegions.unbind_all();
// for the expiry case try the local destroy first and remote
// destroy only if local destroy succeeds
if (eventFlags.isEvictOrExpire() && !eventFlags.isLocal()) {
err = destroyRegionNoThrow_remote(aCallbackArgument);
if (err != GF_NOERR) {
m_destroyPending = false;
return err;
}
}
// if we are not removing from parent then this is a proper
// region close so invoke listener->close() also
err = invokeCacheListenerForRegionEvent(aCallbackArgument, eventFlags,
AFTER_REGION_DESTROY);
release(true);
if (m_regionAttributes->getCachingEnabled()) {
GF_SAFE_DELETE(m_entries);
}
GF_D_ASSERT(m_destroyPending);
if (removeFromParent) {
if (m_parentRegion == NULLPTR) {
m_cacheImpl->removeRegion(m_name.c_str());
} else {
LocalRegion* parent = dynamic_cast<LocalRegion*>(m_parentRegion.ptr());
if (parent != NULL) {
parent->removeRegion(m_name);
if (!eventFlags.isEvictOrExpire()) {
parent->updateAccessAndModifiedTime(true);
}
}
}
}
return err;
}
GfErrType LocalRegion::putLocal(const char* name, bool isCreate,
const CacheableKeyPtr& key,
const CacheablePtr& value,
CacheablePtr& oldValue, bool cachingEnabled,
int updateCount, int destroyTracker,
VersionTagPtr versionTag, DataInput* delta,
EventIdPtr eventId) {
GfErrType err = GF_NOERR;
bool isUpdate = !isCreate;
if (cachingEnabled) {
MapEntryImplPtr entry;
LOGDEBUG("%s: region [%s] putting key [%s], value [%s]", name,
getFullPath(), Utils::getCacheableKeyString(key)->asChar(),
Utils::getCacheableString(value)->asChar());
if (isCreate) {
err = m_entries->create(key, value, entry, oldValue, updateCount,
destroyTracker, versionTag);
} else {
err = m_entries->put(key, value, entry, oldValue, updateCount,
destroyTracker, versionTag, isUpdate, delta);
if (err == GF_INVALID_DELTA) {
m_cacheImpl->m_cacheStats->incFailureOnDeltaReceived();
// PXR: Get full object from server.
CacheablePtr& newValue1 = const_cast<CacheablePtr&>(value);
VersionTagPtr versionTag1;
err = getNoThrow_FullObject(eventId, newValue1, versionTag1);
if (err == GF_NOERR && newValue1 != NULLPTR) {
err = m_entries->put(
key, newValue1, entry, oldValue, updateCount, destroyTracker,
versionTag1 != NULLPTR ? versionTag1 : versionTag, isUpdate);
}
}
if (delta != NULL &&
err == GF_NOERR) { // Means that delta is on and there is no failure.
m_cacheImpl->m_cacheStats->incDeltaReceived();
}
}
if (err != GF_NOERR) {
return err;
}
LOGDEBUG("%s: region [%s] %s key [%s], value [%s]", name, getFullPath(),
isUpdate ? "updated" : "created",
Utils::getCacheableKeyString(key)->asChar(),
Utils::getCacheableString(value)->asChar());
// entry/region expiration
if (entryExpiryEnabled()) {
if (isUpdate && entry->getExpProperties().getExpiryTaskId() != -1) {
updateAccessAndModifiedTimeForEntry(entry, true);
} else {
registerEntryExpiryTask(entry);
}
}
updateAccessAndModifiedTime(true);
}
// update the stats
if (isUpdate) {
m_regionStats->incPuts();
m_cacheImpl->m_cacheStats->incPuts();
} else {
if (cachingEnabled) {
m_regionStats->setEntries(m_entries->size());
m_cacheImpl->m_cacheStats->incEntries(1);
}
m_regionStats->incCreates();
m_cacheImpl->m_cacheStats->incCreates();
}
return err;
}
void LocalRegion::keys_internal(VectorOfCacheableKey& v) {
if (!m_regionAttributes->getCachingEnabled()) {
return;
}
uint32_t size = m_entries->size();
v.clear();
if (size == 0) {
return;
}
m_entries->keys(v);
}
void LocalRegion::entries_internal(VectorOfRegionEntry& me,
const bool recursive) {
m_entries->entries(me);
if (recursive == true) {
MapOfRegionGuard guard(m_subRegions.mutex());
for (MapOfRegionWithLock::iterator p = m_subRegions.begin();
p != m_subRegions.end(); ++p) {
dynamic_cast<LocalRegion*>((*p).int_id_.ptr())
->entries_internal(me, true);
}
}
}
int LocalRegion::removeRegion(const std::string& name) {
if (m_subRegions.current_size() == 0) {
return 0;
}
return m_subRegions.unbind(name);
}
bool LocalRegion::invokeCacheWriterForEntryEvent(
const CacheableKeyPtr& key, CacheablePtr& oldValue,
const CacheablePtr& newValue, const UserDataPtr& aCallbackArgument,
CacheEventFlags eventFlags, EntryEventType type) {
// Check if we have a local cache writer. If so, invoke and return.
bool bCacheWriterReturn = true;
if (m_writer != NULLPTR) {
if (oldValue != NULLPTR && CacheableToken::isInvalid(oldValue)) {
oldValue = NULLPTR;
}
EntryEvent event(RegionPtr(this), key, oldValue, newValue,
aCallbackArgument, eventFlags.isNotification());
const char* eventStr = "unknown";
try {
bool updateStats = true;
/*Update the CacheWriter Stats*/
int64 sampleStartNanos = Utils::startStatOpTime();
switch (type) {
case BEFORE_UPDATE: {
if (oldValue != NULLPTR) {
eventStr = "beforeUpdate";
bCacheWriterReturn = m_writer->beforeUpdate(event);
break;
}
// if oldValue is NULL then fall to BEFORE_CREATE case
}
case BEFORE_CREATE: {
eventStr = "beforeCreate";
bCacheWriterReturn = m_writer->beforeCreate(event);
break;
}
case BEFORE_DESTROY: {
eventStr = "beforeDestroy";
bCacheWriterReturn = m_writer->beforeDestroy(event);
break;
}
default: {
updateStats = false;
break;
}
}
if (updateStats) {
Utils::updateStatOpTime(
m_regionStats->getStat(),
RegionStatType::getInstance()->getWriterCallTimeId(),
sampleStartNanos);
m_regionStats->incWriterCallsCompleted();
}
} catch (const Exception& ex) {
LOGERROR("Exception in CacheWriter::%s: %s: %s", eventStr, ex.getName(),
ex.getMessage());
bCacheWriterReturn = false;
} catch (...) {
LOGERROR("Unknown exception in CacheWriter::%s", eventStr);
bCacheWriterReturn = false;
}
}
return bCacheWriterReturn;
}
bool LocalRegion::invokeCacheWriterForRegionEvent(
const UserDataPtr& aCallbackArgument, CacheEventFlags eventFlags,
RegionEventType type) {
// Check if we have a local cache writer. If so, invoke and return.
bool bCacheWriterReturn = true;
if (m_writer != NULLPTR) {
RegionEvent event(RegionPtr(this), aCallbackArgument,
eventFlags.isNotification());
const char* eventStr = "unknown";
try {
bool updateStats = true;
/*Update the CacheWriter Stats*/
int64 sampleStartNanos = Utils::startStatOpTime();
switch (type) {
case BEFORE_REGION_DESTROY: {
eventStr = "beforeRegionDestroy";
bCacheWriterReturn = m_writer->beforeRegionDestroy(event);
break;
}
case BEFORE_REGION_CLEAR: {
eventStr = "beforeRegionClear";
bCacheWriterReturn = m_writer->beforeRegionClear(event);
break;
}
default: {
updateStats = false;
break;
}
}
if (updateStats) {
Utils::updateStatOpTime(
m_regionStats->getStat(),
RegionStatType::getInstance()->getWriterCallTimeId(),
sampleStartNanos);
m_regionStats->incWriterCallsCompleted();
}
} catch (const Exception& ex) {
LOGERROR("Exception in CacheWriter::%s: %s", eventStr, ex.getName(),
ex.getMessage());
bCacheWriterReturn = false;
} catch (...) {
LOGERROR("Unknown exception in CacheWriter::%s", eventStr);
bCacheWriterReturn = false;
}
}
return bCacheWriterReturn;
}
GfErrType LocalRegion::invokeCacheListenerForEntryEvent(
const CacheableKeyPtr& key, CacheablePtr& oldValue,
const CacheablePtr& newValue, const UserDataPtr& aCallbackArgument,
CacheEventFlags eventFlags, EntryEventType type, bool isLocal) {
GfErrType err = GF_NOERR;
// Check if we have a local cache listener. If so, invoke and return.
if (m_listener != NULLPTR) {
if (oldValue != NULLPTR && CacheableToken::isInvalid(oldValue)) {
oldValue = NULLPTR;
}
EntryEvent event(RegionPtr(this), key, oldValue, newValue,
aCallbackArgument, eventFlags.isNotification());
const char* eventStr = "unknown";
try {
bool updateStats = true;
/*Update the CacheWriter Stats*/
int64 sampleStartNanos = Utils::startStatOpTime();
switch (type) {
case AFTER_UPDATE: {
// when CREATE is received from server for notification
// then force an afterUpdate even if key is not present in cache.
if (oldValue != NULLPTR || eventFlags.isNotificationUpdate() ||
isLocal) {
eventStr = "afterUpdate";
m_listener->afterUpdate(event);
break;
}
// if oldValue is NULL then fall to AFTER_CREATE case
}
case AFTER_CREATE: {
eventStr = "afterCreate";
m_listener->afterCreate(event);
break;
}
case AFTER_DESTROY: {
eventStr = "afterDestroy";
m_listener->afterDestroy(event);
break;
}
case AFTER_INVALIDATE: {
eventStr = "afterInvalidate";
m_listener->afterInvalidate(event);
break;
}
default: {
updateStats = false;
break;
}
}
if (updateStats) {
m_cacheImpl->m_cacheStats->incListenerCalls();
Utils::updateStatOpTime(
m_regionStats->getStat(),
RegionStatType::getInstance()->getListenerCallTimeId(),
sampleStartNanos);
m_regionStats->incListenerCallsCompleted();
}
} catch (const Exception& ex) {
LOGERROR("Exception in CacheListener for key[%s]::%s: %s: %s",
Utils::getCacheableKeyString(key)->asChar(), eventStr,
ex.getName(), ex.getMessage());
err = GF_CACHE_LISTENER_EXCEPTION;
} catch (...) {
LOGERROR("Unknown exception in CacheListener for key[%s]::%s",
Utils::getCacheableKeyString(key)->asChar(), eventStr);
err = GF_CACHE_LISTENER_EXCEPTION;
}
}
return err;
}
GfErrType LocalRegion::invokeCacheListenerForRegionEvent(
const UserDataPtr& aCallbackArgument, CacheEventFlags eventFlags,
RegionEventType type) {
GfErrType err = GF_NOERR;
// Check if we have a local cache listener. If so, invoke and return.
if (m_listener != NULLPTR) {
RegionEvent event(RegionPtr(this), aCallbackArgument,
eventFlags.isNotification());
const char* eventStr = "unknown";
try {
bool updateStats = true;
/*Update the CacheWriter Stats*/
int64 sampleStartNanos = Utils::startStatOpTime();
switch (type) {
case AFTER_REGION_DESTROY: {
eventStr = "afterRegionDestroy";
m_listener->afterRegionDestroy(event);
m_cacheImpl->m_cacheStats->incListenerCalls();
if (eventFlags.isCacheClose()) {
eventStr = "close";
m_listener->close(RegionPtr(this));
m_cacheImpl->m_cacheStats->incListenerCalls();
}
break;
}
case AFTER_REGION_INVALIDATE: {
eventStr = "afterRegionInvalidate";
m_listener->afterRegionInvalidate(event);
m_cacheImpl->m_cacheStats->incListenerCalls();
break;
}
case AFTER_REGION_CLEAR: {
eventStr = "afterRegionClear";
m_listener->afterRegionClear(event);
break;
}
default: {
updateStats = false;
break;
}
}
if (updateStats) {
Utils::updateStatOpTime(
m_regionStats->getStat(),
RegionStatType::getInstance()->getListenerCallTimeId(),
sampleStartNanos);
m_regionStats->incListenerCallsCompleted();
}
} catch (const Exception& ex) {
LOGERROR("Exception in CacheListener::%s: %s: %s", eventStr, ex.getName(),
ex.getMessage());
err = GF_CACHE_LISTENER_EXCEPTION;
} catch (...) {
LOGERROR("Unknown exception in CacheListener::%s", eventStr);
err = GF_CACHE_LISTENER_EXCEPTION;
}
}
return err;
}
// TODO: pass current time instead of evaluating it twice, here
// and in region
void LocalRegion::updateAccessAndModifiedTimeForEntry(MapEntryImplPtr& ptr,
bool modified) {
// locking is not required since setters use atomic operations
if (ptr != NULLPTR && entryExpiryEnabled()) {
ExpEntryProperties& expProps = ptr->getExpProperties();
uint32_t currTime = static_cast<uint32_t>(ACE_OS::gettimeofday().sec());
CacheableStringPtr keyStr;
if (Log::debugEnabled()) {
CacheableKeyPtr key;
ptr->getKeyI(key);
keyStr = Utils::getCacheableKeyString(key);
}
LOGDEBUG("Setting last accessed time for key [%s] in region %s to %d",
keyStr->asChar(), getFullPath(), currTime);
expProps.updateLastAccessTime(currTime);
if (modified) {
LOGDEBUG("Setting last modified time for key [%s] in region %s to %d",
keyStr->asChar(), getFullPath(), currTime);
expProps.updateLastModifiedTime(currTime);
}
}
}
uint32_t LocalRegion::adjustLruEntriesLimit(uint32_t limit) {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::adjustLruEntriesLimit);
RegionAttributesPtr attrs = m_regionAttributes;
if (!attrs->getCachingEnabled()) return 0;
bool hadlru = (attrs->getLruEntriesLimit() != 0);
bool needslru = (limit != 0);
if (hadlru != needslru) {
throw IllegalStateException(
"Cannot disable or enable LRU, can only adjust limit.");
}
uint32_t oldValue = attrs->getLruEntriesLimit();
setLruEntriesLimit(limit);
if (needslru) {
// checked in AttributesMutator already to assert that LRU was enabled..
LRUEntriesMap* lrumap = static_cast<LRUEntriesMap*>(m_entries);
lrumap->adjustLimit(limit);
}
return oldValue;
}
ExpirationAction::Action LocalRegion::adjustRegionExpiryAction(
ExpirationAction::Action action) {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::adjustRegionExpiryAction);
RegionAttributesPtr attrs = m_regionAttributes;
bool hadExpiry = (getRegionExpiryDuration() != 0);
if (!hadExpiry) {
throw IllegalStateException(
"Cannot change region ExpirationAction for region created without "
"region expiry.");
}
ExpirationAction::Action oldValue = getRegionExpiryAction();
setRegionTimeToLiveExpirationAction(action);
setRegionIdleTimeoutExpirationAction(action);
// m_regionExpirationAction = action;
return oldValue;
}
ExpirationAction::Action LocalRegion::adjustEntryExpiryAction(
ExpirationAction::Action action) {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::adjustEntryExpiryAction);
RegionAttributesPtr attrs = m_regionAttributes;
bool hadExpiry = (getEntryExpiryDuration() != 0);
if (!hadExpiry) {
throw IllegalStateException(
"Cannot change entry ExpirationAction for region created without entry "
"expiry.");
}
ExpirationAction::Action oldValue = getEntryExpirationAction();
setEntryTimeToLiveExpirationAction(action);
setEntryIdleTimeoutExpirationAction(action);
return oldValue;
}
int32_t LocalRegion::adjustRegionExpiryDuration(int32_t duration) {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::adjustRegionExpiryDuration);
RegionAttributesPtr attrs = m_regionAttributes;
bool hadExpiry = (getEntryExpiryDuration() != 0);
if (!hadExpiry) {
throw IllegalStateException(
"Cannot change region expiration duration for region created without "
"region expiry.");
}
int32_t oldValue = getRegionExpiryDuration();
setRegionTimeToLive(duration);
setRegionIdleTimeout(duration);
return oldValue;
}
int32_t LocalRegion::adjustEntryExpiryDuration(int32_t duration) {
CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::adjustEntryExpiryDuration);
RegionAttributesPtr attrs = m_regionAttributes;
bool hadExpiry = (getEntryExpiryDuration() != 0);
if (!hadExpiry) {
throw IllegalStateException(
"Cannot change entry expiration duration for region created without "
"entry expiry.");
}
int32_t oldValue = getEntryExpiryDuration();
setEntryTimeToLive(duration);
setEntryIdleTimeout(duration);
return oldValue;
}
/** they used to public methods in hpp file */
bool LocalRegion::isStatisticsEnabled() {
bool status = true;
if (m_cacheImpl == NULL) {
return false;
}
if (m_cacheImpl->getCache() != NULL) {
SystemProperties* props =
m_cacheImpl->getCache()->getDistributedSystem()->getSystemProperties();
if (props) {
status = props->statisticsEnabled();
}
}
return status;
}
bool LocalRegion::useModifiedTimeForRegionExpiry() {
uint32_t region_ttl = m_regionAttributes->getRegionTimeToLive();
if (region_ttl > 0) {
return true;
} else {
return false;
}
}
bool LocalRegion::useModifiedTimeForEntryExpiry() {
uint32_t entry_ttl = m_regionAttributes->getEntryTimeToLive();
if (entry_ttl > 0) {
return true;
} else {
return false;
}
}
bool LocalRegion::isEntryIdletimeEnabled() {
if (m_regionAttributes->getCachingEnabled() &&
0 != m_regionAttributes->getEntryIdleTimeout()) {
return true;
} else {
return false;
}
}
ExpirationAction::Action LocalRegion::getEntryExpirationAction() const {
uint32_t entry_ttl = m_regionAttributes->getEntryTimeToLive();
if (entry_ttl > 0) {
return m_regionAttributes->getEntryTimeToLiveAction();
} else {
return m_regionAttributes->getEntryIdleTimeoutAction();
}
}
ExpirationAction::Action LocalRegion::getRegionExpiryAction() const {
uint32_t region_ttl = m_regionAttributes->getRegionTimeToLive();
if (region_ttl > 0) {
return m_regionAttributes->getRegionTimeToLiveAction();
} else {
return m_regionAttributes->getRegionIdleTimeoutAction();
}
}
uint32_t LocalRegion::getRegionExpiryDuration() const {
uint32_t region_ttl = m_regionAttributes->getRegionTimeToLive();
uint32_t region_idle = m_regionAttributes->getRegionIdleTimeout();
if (region_ttl > 0) {
return region_ttl;
} else {
return region_idle;
}
}
uint32_t LocalRegion::getEntryExpiryDuration() const {
uint32_t entry_ttl = m_regionAttributes->getEntryTimeToLive();
uint32_t entry_idle = m_regionAttributes->getEntryIdleTimeout();
if (entry_ttl > 0) {
return entry_ttl;
} else {
return entry_idle;
}
}
/** methods to be overridden by derived classes*/
GfErrType LocalRegion::unregisterKeysBeforeDestroyRegion() { return GF_NOERR; }
GfErrType LocalRegion::getNoThrow_remote(const CacheableKeyPtr& keyPtr,
CacheablePtr& valPtr,
const UserDataPtr& aCallbackArgument,
VersionTagPtr& versionTag) {
return GF_NOERR;
}
GfErrType LocalRegion::putNoThrow_remote(const CacheableKeyPtr& keyPtr,
const CacheablePtr& cvalue,
const UserDataPtr& aCallbackArgument,
VersionTagPtr& versionTag,
bool checkDelta) {
return GF_NOERR;
}
GfErrType LocalRegion::putAllNoThrow_remote(
const HashMapOfCacheable& map,
VersionedCacheableObjectPartListPtr& putAllResponse, uint32_t timeout,
const UserDataPtr& aCallbackArgument) {
return GF_NOERR;
}
GfErrType LocalRegion::removeAllNoThrow_remote(
const VectorOfCacheableKey& keys,
VersionedCacheableObjectPartListPtr& versionedObjPartList,
const UserDataPtr& aCallbackArgument) {
return GF_NOERR;
}
GfErrType LocalRegion::createNoThrow_remote(
const CacheableKeyPtr& keyPtr, const CacheablePtr& cvalue,
const UserDataPtr& aCallbackArgument, VersionTagPtr& versionTag) {
return GF_NOERR;
}
GfErrType LocalRegion::destroyNoThrow_remote(
const CacheableKeyPtr& keyPtr, const UserDataPtr& aCallbackArgument,
VersionTagPtr& versionTag) {
return GF_NOERR;
}
GfErrType LocalRegion::removeNoThrow_remote(
const CacheableKeyPtr& keyPtr, const CacheablePtr& cvalue,
const UserDataPtr& aCallbackArgument, VersionTagPtr& versionTag) {
return GF_NOERR;
}
GfErrType LocalRegion::removeNoThrowEX_remote(
const CacheableKeyPtr& keyPtr, const UserDataPtr& aCallbackArgument,
VersionTagPtr& versionTag) {
return GF_NOERR;
}
GfErrType LocalRegion::invalidateNoThrow_remote(
const CacheableKeyPtr& keyPtr, const UserDataPtr& aCallbackArgument,
VersionTagPtr& versionTag) {
return GF_NOERR;
}
GfErrType LocalRegion::getAllNoThrow_remote(
const VectorOfCacheableKey* keys, const HashMapOfCacheablePtr& values,
const HashMapOfExceptionPtr& exceptions,
const VectorOfCacheableKeyPtr& resultKeys, bool addToLocalCache,
const UserDataPtr& aCallbackArgument) {
return GF_NOERR;
}
GfErrType LocalRegion::invalidateRegionNoThrow_remote(
const UserDataPtr& aCallbackArgument) {
return GF_NOERR;
}
GfErrType LocalRegion::destroyRegionNoThrow_remote(
const UserDataPtr& aCallbackArgument) {
return GF_NOERR;
}
void LocalRegion::adjustCacheListener(const CacheListenerPtr& aListener) {
WriteGuard guard(m_rwLock);
setCacheListener(aListener);
m_listener = aListener;
}
void LocalRegion::adjustCacheListener(const char* lib, const char* func) {
WriteGuard guard(m_rwLock);
setCacheListener(lib, func);
m_listener = m_regionAttributes->getCacheListener();
}
void LocalRegion::adjustCacheLoader(const CacheLoaderPtr& aLoader) {
WriteGuard guard(m_rwLock);
setCacheLoader(aLoader);
m_loader = aLoader;
}
void LocalRegion::adjustCacheLoader(const char* lib, const char* func) {
WriteGuard guard(m_rwLock);
setCacheLoader(lib, func);
m_loader = m_regionAttributes->getCacheLoader();
}
void LocalRegion::adjustCacheWriter(const CacheWriterPtr& aWriter) {
WriteGuard guard(m_rwLock);
setCacheWriter(aWriter);
m_writer = aWriter;
}
void LocalRegion::adjustCacheWriter(const char* lib, const char* func) {
WriteGuard guard(m_rwLock);
setCacheWriter(lib, func);
m_writer = m_regionAttributes->getCacheWriter();
}
void LocalRegion::evict(int32_t percentage) {
TryReadGuard guard(m_rwLock, m_destroyPending);
if (m_released || m_destroyPending) return;
if (m_entries != NULL) {
int32_t size = m_entries->size();
int32_t entriesToEvict = (int32_t)(percentage * size) / 100;
// only invoked from EvictionController so static_cast is always safe
LRUEntriesMap* lruMap = static_cast<LRUEntriesMap*>(m_entries);
LOGINFO("Evicting %d entries. Current entry count is %d", entriesToEvict,
size);
lruMap->processLRU(entriesToEvict);
}
}
void LocalRegion::invokeAfterAllEndPointDisconnected() {
if (m_listener != NULLPTR) {
int64 sampleStartNanos = Utils::startStatOpTime();
try {
m_listener->afterRegionDisconnected(RegionPtr(this));
} catch (const Exception& ex) {
LOGERROR("Exception in CacheListener::afterRegionDisconnected: %s: %s",
ex.getName(), ex.getMessage());
} catch (...) {
LOGERROR("Unknown exception in CacheListener::afterRegionDisconnected");
}
Utils::updateStatOpTime(
m_regionStats->getStat(),
RegionStatType::getInstance()->getListenerCallTimeId(),
sampleStartNanos);
m_regionStats->incListenerCallsCompleted();
}
}
GfErrType LocalRegion::getNoThrow_FullObject(EventIdPtr eventId,
CacheablePtr& fullObject,
VersionTagPtr& versionTag) {
return GF_NOERR;
}
CacheablePtr LocalRegion::handleReplay(GfErrType& err,
CacheablePtr value) const {
if (err == GF_TRANSACTION_DATA_REBALANCED_EXCEPTION ||
err == GF_TRANSACTION_DATA_NODE_HAS_DEPARTED_EXCEPTION) {
bool isRollBack = (err == GF_TRANSACTION_DATA_REBALANCED_EXCEPTION);
TXState* txState = getTXState();
if (txState == NULL) {
GfErrTypeThrowException("TXState is NULL",
GF_CACHE_ILLEGAL_STATE_EXCEPTION);
}
CacheablePtr ret = txState->replay(isRollBack);
err = GF_NOERR;
return ret;
}
return value;
}
TombstoneListPtr LocalRegion::getTombstoneList() { return m_tombstoneList; }