| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "LocalRegion.hpp" |
| |
| #include <sstream> |
| #include <vector> |
| |
| #include <geode/PoolManager.hpp> |
| #include <geode/SystemProperties.hpp> |
| |
| #include "CacheImpl.hpp" |
| #include "CacheRegionHelper.hpp" |
| #include "CacheableToken.hpp" |
| #include "EntryExpiryHandler.hpp" |
| #include "ExpiryTaskManager.hpp" |
| #include "LRUEntriesMap.hpp" |
| #include "RegionExpiryHandler.hpp" |
| #include "RegionGlobalLocks.hpp" |
| #include "SerializableHelper.hpp" |
| #include "TXState.hpp" |
| #include "Utils.hpp" |
| #include "VersionTag.hpp" |
| #include "util/Log.hpp" |
| #include "util/bounds.hpp" |
| #include "util/exception.hpp" |
| |
| namespace apache { |
| namespace geode { |
| namespace client { |
| |
| LocalRegion::LocalRegion(const std::string& name, CacheImpl* cacheImpl, |
| const std::shared_ptr<RegionInternal>& rPtr, |
| RegionAttributes attributes, |
| const std::shared_ptr<CacheStatistics>& stats, |
| bool enableTimeStatistics) |
| : RegionInternal(cacheImpl, attributes), |
| m_name(name), |
| m_parentRegion(rPtr), |
| m_destroyPending(false), |
| m_listener(nullptr), |
| m_writer(nullptr), |
| m_loader(nullptr), |
| m_released(false), |
| m_entries(nullptr), |
| m_cacheStatistics(stats), |
| m_transactionEnabled(false), |
| m_isPRSingleHopEnabled(false), |
| m_attachedPool(nullptr), |
| m_enableTimeStatistics(enableTimeStatistics), |
| m_persistenceManager(nullptr) { |
| if (m_parentRegion != nullptr) { |
| ((m_fullPath = m_parentRegion->getFullPath()) += "/") += m_name; |
| } else { |
| (m_fullPath = "/") += m_name; |
| } |
| // create entries map based on RegionAttributes... |
| if (attributes.getCachingEnabled()) { |
| m_entries = EntriesMapFactory::createMap(this, m_regionAttributes); |
| } |
| |
| // Initialize callbacks |
| std::shared_ptr<CacheListener> clptr; |
| std::shared_ptr<CacheWriter> cwptr; |
| clptr = m_regionAttributes.getCacheListener(); |
| m_listener = clptr; |
| cwptr = m_regionAttributes.getCacheWriter(); |
| m_writer = cwptr; |
| std::shared_ptr<CacheLoader> cldptr; |
| cldptr = m_regionAttributes.getCacheLoader(); |
| m_loader = cldptr; |
| |
| if (m_parentRegion != nullptr) { |
| ((m_fullPath = m_parentRegion->getFullPath()) += "/") += m_name; |
| } else { |
| (m_fullPath = "/") += m_name; |
| } |
| |
| m_regionStats = new RegionStats( |
| cacheImpl->getStatisticsManager().getStatisticsFactory(), m_fullPath); |
| auto p = cacheImpl->getPoolManager().find(getAttributes().getPoolName()); |
| setPool(p); |
| } |
| |
| const std::string& LocalRegion::getName() const { return m_name; } |
| |
| const std::string& LocalRegion::getFullPath() const { return m_fullPath; } |
| |
| std::shared_ptr<Region> LocalRegion::getParentRegion() const { |
| CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::getParentRegion); |
| return m_parentRegion; |
| } |
| |
| void LocalRegion::updateAccessAndModifiedTime(bool modified) { |
| // locking not required since setters use atomic operations |
| if (regionExpiryEnabled()) { |
| auto now = std::chrono::system_clock::now(); |
| LOGDEBUG("Setting last accessed time for region %s to %d", |
| getFullPath().c_str(), now.time_since_epoch().count()); |
| m_cacheStatistics->setLastAccessedTime(now); |
| if (modified) { |
| LOGDEBUG("Setting last modified time for region %s to %d", |
| getFullPath().c_str(), now.time_since_epoch().count()); |
| m_cacheStatistics->setLastModifiedTime(now); |
| } |
| // TODO: should we really touch the parent region?? |
| RegionInternal* ri = dynamic_cast<RegionInternal*>(m_parentRegion.get()); |
| if (ri != nullptr) { |
| ri->updateAccessAndModifiedTime(modified); |
| } |
| } |
| } |
| std::shared_ptr<CacheStatistics> LocalRegion::getStatistics() const { |
| CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::getStatistics); |
| bool m_statisticsEnabled = true; |
| auto& props = m_cacheImpl->getDistributedSystem().getSystemProperties(); |
| m_statisticsEnabled = props.statisticsEnabled(); |
| if (!m_statisticsEnabled) { |
| throw StatisticsDisabledException( |
| "LocalRegion::getStatistics statistics disabled for this region"); |
| } |
| |
| return m_cacheStatistics; |
| } |
| |
| void LocalRegion::invalidateRegion( |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| GfErrType err = |
| invalidateRegionNoThrow(aCallbackArgument, CacheEventFlags::NORMAL); |
| GfErrTypeToException("Region::invalidateRegion", err); |
| } |
| |
| void LocalRegion::localInvalidateRegion( |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| GfErrType err = |
| invalidateRegionNoThrow(aCallbackArgument, CacheEventFlags::LOCAL); |
| GfErrTypeToException("Region::localInvalidateRegion", err); |
| } |
| |
| void LocalRegion::destroyRegion( |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| GfErrType err = |
| destroyRegionNoThrow(aCallbackArgument, true, CacheEventFlags::NORMAL); |
| GfErrTypeToException("Region::destroyRegion", err); |
| } |
| |
| void LocalRegion::localDestroyRegion( |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| GfErrType err = |
| destroyRegionNoThrow(aCallbackArgument, true, CacheEventFlags::LOCAL); |
| GfErrTypeToException("Region::localDestroyRegion", err); |
| } |
| |
| void LocalRegion::tombstoneOperationNoThrow( |
| const std::shared_ptr<CacheableHashMap>& tombstoneVersions, |
| const std::shared_ptr<CacheableHashSet>& tombstoneKeys) { |
| bool cachingEnabled = m_regionAttributes.getCachingEnabled(); |
| |
| if (!cachingEnabled) return; |
| |
| if (tombstoneVersions) { |
| std::map<uint16_t, int64_t> gcVersions; |
| for (const auto& itr : *tombstoneVersions) { |
| if (auto member = |
| std::dynamic_pointer_cast<DSMemberForVersionStamp>(itr.first)) { |
| uint16_t memberId = |
| getCacheImpl()->getMemberListForVersionStamp()->add(member); |
| int64_t version = |
| (std::dynamic_pointer_cast<CacheableInt64>(itr.second))->value(); |
| gcVersions[memberId] = version; |
| } else { |
| LOGERROR( |
| "tombstone_operation contains incorrect gc versions in the " |
| "message. Region " + |
| getFullPath()); |
| continue; |
| } |
| } |
| m_entries->reapTombstones(gcVersions); |
| } else { |
| m_entries->reapTombstones(tombstoneKeys); |
| } |
| } |
| std::shared_ptr<Region> LocalRegion::getSubregion(const std::string& path) { |
| CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::getSubregion); |
| |
| static const std::string slash("/"); |
| if (path == slash || path.empty()) { |
| LOGERROR("Get subregion path [" + path + "] is not valid."); |
| throw IllegalArgumentException("Get subegion path is empty or a /"); |
| } |
| auto fullname = path; |
| if (fullname.substr(0, 1) == slash) { |
| fullname = path.substr(1); |
| } |
| // find second separator |
| size_t idx = fullname.find('/'); |
| auto stepname = fullname.substr(0, idx); |
| |
| std::shared_ptr<Region> region, rptr; |
| if (0 == m_subRegions.find(stepname, region)) { |
| if (stepname == fullname) { |
| // done... |
| rptr = region; |
| } else { |
| std::string remainder = fullname.substr(stepname.length() + 1); |
| rptr = region->getSubregion(remainder.c_str()); |
| } |
| } |
| return rptr; |
| } |
| |
| std::shared_ptr<Region> LocalRegion::createSubregion( |
| const std::string& subregionName, RegionAttributes regionAttributes) { |
| CHECK_DESTROY_PENDING(TryWriteGuard, LocalRegion::createSubregion); |
| { |
| std::string namestr = subregionName; |
| if (namestr.find('/') != std::string::npos) { |
| throw IllegalArgumentException( |
| "Malformed name string, contains region path seperator '/'"); |
| } |
| } |
| |
| MapOfRegionGuard guard1(m_subRegions.mutex()); |
| std::shared_ptr<Region> region_ptr; |
| if (0 == m_subRegions.find(subregionName, region_ptr)) { |
| throw RegionExistsException( |
| "LocalRegion::createSubregion: named region exists in the region"); |
| } |
| |
| auto csptr = std::make_shared<CacheStatistics>(); |
| auto rPtr = m_cacheImpl->createRegion_internal( |
| subregionName, |
| std::static_pointer_cast<RegionInternal>(shared_from_this()), |
| regionAttributes, csptr, false); |
| region_ptr = rPtr; |
| if (!rPtr) { |
| throw OutOfMemoryException("createSubregion: failed to create region"); |
| } |
| |
| // Instantiate a PersistenceManager object if DiskPolicy is overflow |
| if (regionAttributes.getDiskPolicy() == DiskPolicyType::OVERFLOWS) { |
| auto pmPtr = regionAttributes.getPersistenceManager(); |
| if (pmPtr == nullptr) { |
| throw NullPointerException( |
| "PersistenceManager could not be instantiated"); |
| } |
| auto props = regionAttributes.getPersistenceProperties(); |
| pmPtr->init(std::shared_ptr<Region>(rPtr), props); |
| rPtr->setPersistenceManager(pmPtr); |
| } |
| |
| rPtr->acquireReadLock(); |
| m_subRegions.bind(rPtr->getName(), std::shared_ptr<Region>(rPtr)); |
| |
| // schedule the sub region expiry if regionExpiry enabled. |
| rPtr->setRegionExpiryTask(); |
| rPtr->releaseReadLock(); |
| return region_ptr; |
| } |
| |
| std::vector<std::shared_ptr<Region>> LocalRegion::subregions( |
| const bool recursive) { |
| CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::subregions); |
| if (m_subRegions.current_size() == 0) { |
| return std::vector<std::shared_ptr<Region>>(); |
| } |
| |
| return subregions_internal(recursive); |
| } |
| std::shared_ptr<RegionEntry> LocalRegion::getEntry( |
| const std::shared_ptr<CacheableKey>& key) { |
| if (getTXState() != nullptr) { |
| GfErrTypeThrowException("GetEntry is not supported in transaction", |
| GF_NOTSUP); |
| } |
| std::shared_ptr<RegionEntry> rptr; |
| std::shared_ptr<Cacheable> valuePtr; |
| getEntry(key, valuePtr); |
| if (valuePtr != nullptr) { |
| rptr = createRegionEntry(key, valuePtr); |
| } |
| return rptr; |
| } |
| |
| void LocalRegion::getEntry(const std::shared_ptr<CacheableKey>& key, |
| std::shared_ptr<Cacheable>& valuePtr) { |
| if (key == nullptr) { |
| throw IllegalArgumentException("LocalRegion::getEntry: null key"); |
| } |
| |
| std::shared_ptr<MapEntryImpl> mePtr; |
| CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::getEntry); |
| if (m_regionAttributes.getCachingEnabled()) { |
| m_entries->getEntry(key, mePtr, valuePtr); |
| } |
| } |
| std::shared_ptr<Cacheable> LocalRegion::get( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| std::shared_ptr<Cacheable> rptr; |
| int64_t sampleStartNanos = startStatOpTime(); |
| GfErrType err = getNoThrow(key, rptr, aCallbackArgument); |
| updateStatOpTime(m_regionStats->getStat(), m_regionStats->getGetTimeId(), |
| sampleStartNanos); |
| |
| // rptr = handleReplay(err, rptr); |
| |
| GfErrTypeToException("Region::get", err); |
| |
| return rptr; |
| } |
| |
| void LocalRegion::put(const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| std::shared_ptr<Cacheable> oldValue; |
| int64_t sampleStartNanos = startStatOpTime(); |
| std::shared_ptr<VersionTag> versionTag; |
| GfErrType err = putNoThrow(key, value, aCallbackArgument, oldValue, -1, |
| CacheEventFlags::NORMAL, versionTag); |
| updateStatOpTime(m_regionStats->getStat(), m_regionStats->getPutTimeId(), |
| sampleStartNanos); |
| // handleReplay(err, nullptr); |
| GfErrTypeToException("Region::put", err); |
| } |
| |
| void LocalRegion::localPut( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| std::shared_ptr<Cacheable> oldValue; |
| std::shared_ptr<VersionTag> versionTag; |
| GfErrType err = putNoThrow(key, value, aCallbackArgument, oldValue, -1, |
| CacheEventFlags::LOCAL, versionTag); |
| GfErrTypeToException("Region::localPut", err); |
| } |
| |
| void LocalRegion::putAll( |
| const HashMapOfCacheable& map, std::chrono::milliseconds timeout, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| util::PROTOCOL_OPERATION_TIMEOUT_BOUNDS(timeout); |
| |
| auto sampleStartNanos = startStatOpTime(); |
| auto err = putAllNoThrow(map, timeout, aCallbackArgument); |
| updateStatOpTime(m_regionStats->getStat(), m_regionStats->getPutAllTimeId(), |
| sampleStartNanos); |
| // handleReplay(err, nullptr); |
| GfErrTypeToException("Region::putAll", err); |
| } |
| |
| void LocalRegion::removeAll( |
| const std::vector<std::shared_ptr<CacheableKey>>& keys, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| if (keys.size() == 0) { |
| throw IllegalArgumentException("Region::removeAll: zero keys provided"); |
| } |
| int64_t sampleStartNanos = startStatOpTime(); |
| GfErrType err = removeAllNoThrow(keys, aCallbackArgument); |
| updateStatOpTime(m_regionStats->getStat(), |
| m_regionStats->getRemoveAllTimeId(), sampleStartNanos); |
| GfErrTypeToException("Region::removeAll", err); |
| } |
| |
| void LocalRegion::create( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| std::shared_ptr<VersionTag> versionTag; |
| GfErrType err = createNoThrow(key, value, aCallbackArgument, -1, |
| CacheEventFlags::NORMAL, versionTag); |
| // handleReplay(err, nullptr); |
| GfErrTypeToException("Region::create", err); |
| } |
| |
| void LocalRegion::localCreate( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| std::shared_ptr<VersionTag> versionTag; |
| GfErrType err = createNoThrow(key, value, aCallbackArgument, -1, |
| CacheEventFlags::LOCAL, versionTag); |
| GfErrTypeToException("Region::localCreate", err); |
| } |
| |
| void LocalRegion::invalidate( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| std::shared_ptr<VersionTag> versionTag; |
| GfErrType err = invalidateNoThrow(key, aCallbackArgument, -1, |
| CacheEventFlags::NORMAL, versionTag); |
| // handleReplay(err, nullptr); |
| GfErrTypeToException("Region::invalidate", err); |
| } |
| |
| void LocalRegion::localInvalidate( |
| const std::shared_ptr<CacheableKey>& keyPtr, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| std::shared_ptr<VersionTag> versionTag; |
| GfErrType err = invalidateNoThrow(keyPtr, aCallbackArgument, -1, |
| CacheEventFlags::LOCAL, versionTag); |
| GfErrTypeToException("Region::localInvalidate", err); |
| } |
| |
| void LocalRegion::destroy( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| std::shared_ptr<VersionTag> versionTag; |
| |
| GfErrType err = destroyNoThrow(key, aCallbackArgument, -1, |
| CacheEventFlags::NORMAL, versionTag); |
| // handleReplay(err, nullptr); |
| GfErrTypeToException("Region::destroy", err); |
| } |
| |
| void LocalRegion::localDestroy( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| std::shared_ptr<VersionTag> versionTag; |
| GfErrType err = destroyNoThrow(key, aCallbackArgument, -1, |
| CacheEventFlags::LOCAL, versionTag); |
| GfErrTypeToException("Region::localDestroy", err); |
| } |
| |
| bool LocalRegion::remove( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| std::shared_ptr<VersionTag> versionTag; |
| GfErrType err = removeNoThrow(key, value, aCallbackArgument, -1, |
| CacheEventFlags::NORMAL, versionTag); |
| |
| bool result = false; |
| |
| if (err == GF_NOERR) { |
| result = true; |
| } else if (err != GF_ENOENT && err != GF_CACHE_ENTRY_NOT_FOUND) { |
| GfErrTypeToException("Region::remove", err); |
| } |
| |
| return result; |
| } |
| |
| bool LocalRegion::removeEx( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| std::shared_ptr<VersionTag> versionTag; |
| GfErrType err = removeNoThrowEx(key, aCallbackArgument, -1, |
| CacheEventFlags::NORMAL, versionTag); |
| bool result = false; |
| |
| if (err == GF_NOERR) { |
| result = true; |
| } else if (err != GF_ENOENT && err != GF_CACHE_ENTRY_NOT_FOUND) { |
| GfErrTypeToException("Region::removeEx", err); |
| } |
| |
| return result; |
| } |
| |
| bool LocalRegion::localRemove( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| std::shared_ptr<VersionTag> versionTag; |
| GfErrType err = removeNoThrow(key, value, aCallbackArgument, -1, |
| CacheEventFlags::LOCAL, versionTag); |
| |
| bool result = false; |
| |
| if (err == GF_NOERR) { |
| result = true; |
| } else if (err != GF_ENOENT && err != GF_CACHE_ENTRY_NOT_FOUND) { |
| GfErrTypeToException("Region::localRemove", err); |
| } |
| |
| return result; |
| } |
| |
| bool LocalRegion::localRemoveEx( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| std::shared_ptr<VersionTag> versionTag; |
| GfErrType err = removeNoThrowEx(key, aCallbackArgument, -1, |
| CacheEventFlags::LOCAL, versionTag); |
| |
| bool result = false; |
| |
| if (err == GF_NOERR) { |
| result = true; |
| } else if (err != GF_ENOENT && err != GF_CACHE_ENTRY_NOT_FOUND) { |
| GfErrTypeToException("Region::localRemoveEx", err); |
| } |
| |
| return result; |
| } |
| |
| std::vector<std::shared_ptr<CacheableKey>> LocalRegion::keys() { |
| CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::keys); |
| return keys_internal(); |
| } |
| |
| std::vector<std::shared_ptr<CacheableKey>> LocalRegion::serverKeys() { |
| throw UnsupportedOperationException( |
| "serverKeys is not supported for local regions."); |
| } |
| |
| std::vector<std::shared_ptr<Cacheable>> LocalRegion::values() { |
| CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::values); |
| |
| std::vector<std::shared_ptr<Cacheable>> values; |
| |
| if (m_regionAttributes.getCachingEnabled()) { |
| // invalidToken should not be added by the MapSegments. |
| m_entries->getValues(values); |
| } |
| |
| return values; |
| } |
| |
| std::vector<std::shared_ptr<RegionEntry>> LocalRegion::entries(bool recursive) { |
| CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::entries); |
| |
| std::vector<std::shared_ptr<RegionEntry>> entries; |
| |
| if (m_regionAttributes.getCachingEnabled()) { |
| entries_internal(entries, recursive); |
| } |
| |
| return entries; |
| } |
| |
| HashMapOfCacheable LocalRegion::getAll( |
| const std::vector<std::shared_ptr<CacheableKey>>& keys, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| return getAll_internal(keys, aCallbackArgument, true); |
| } |
| |
| HashMapOfCacheable LocalRegion::getAll_internal( |
| const std::vector<std::shared_ptr<CacheableKey>>& keys, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| bool addToLocalCache) { |
| if (keys.empty()) { |
| throw IllegalArgumentException("Region::getAll: zero keys provided"); |
| } |
| |
| int64_t sampleStartNanos = startStatOpTime(); |
| |
| auto values = std::make_shared<HashMapOfCacheable>(); |
| auto exceptions = std::make_shared<HashMapOfException>(); |
| GfErrType err = getAllNoThrow(keys, values, exceptions, addToLocalCache, |
| aCallbackArgument); |
| |
| updateStatOpTime(m_regionStats->getStat(), m_regionStats->getGetAllTimeId(), |
| sampleStartNanos); |
| |
| GfErrTypeToException("Region::getAll", err); |
| |
| return *values; |
| } |
| |
| uint32_t LocalRegion::size_remote() { |
| CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::size); |
| if (m_regionAttributes.getCachingEnabled()) { |
| return m_entries->size(); |
| } |
| return 0; |
| } |
| |
| uint32_t LocalRegion::size() { |
| TXState* txState = getTXState(); |
| if (txState != nullptr) { |
| if (isLocalOp()) { |
| return GF_NOTSUP; |
| } |
| return size_remote(); |
| } |
| |
| return LocalRegion::size_remote(); |
| } |
| RegionService& LocalRegion::getRegionService() const { |
| CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::getRegionService); |
| return *m_cacheImpl->getCache(); |
| } |
| |
| CacheImpl* LocalRegion::getCacheImpl() const { |
| CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::getCache); |
| return m_cacheImpl; |
| } |
| |
| bool LocalRegion::containsValueForKey_remote( |
| const std::shared_ptr<CacheableKey>& keyPtr) const { |
| CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::containsValueForKey); |
| if (!m_regionAttributes.getCachingEnabled()) { |
| return false; |
| } |
| std::shared_ptr<Cacheable> valuePtr; |
| std::shared_ptr<MapEntryImpl> mePtr; |
| m_entries->getEntry(keyPtr, mePtr, valuePtr); |
| if (mePtr == nullptr) { |
| return false; |
| } |
| return (valuePtr != nullptr && !CacheableToken::isInvalid(valuePtr)); |
| } |
| |
| bool LocalRegion::containsValueForKey( |
| const std::shared_ptr<CacheableKey>& keyPtr) const { |
| if (keyPtr == nullptr) { |
| throw IllegalArgumentException( |
| "LocalRegion::containsValueForKey: " |
| "key is null"); |
| } |
| |
| TXState* txState = getTXState(); |
| if (txState == nullptr) { |
| return LocalRegion::containsValueForKey_remote(keyPtr); |
| } |
| |
| return containsValueForKey_remote(keyPtr); |
| } |
| |
| bool LocalRegion::containsKeyOnServer( |
| const std::shared_ptr<CacheableKey>&) const { |
| throw UnsupportedOperationException( |
| "LocalRegion::containsKeyOnServer: is not supported."); |
| } |
| std::vector<std::shared_ptr<CacheableKey>> LocalRegion::getInterestList() |
| const { |
| throw UnsupportedOperationException( |
| "LocalRegion::getInterestList: is not supported."); |
| } |
| std::vector<std::shared_ptr<CacheableString>> |
| LocalRegion::getInterestListRegex() const { |
| throw UnsupportedOperationException( |
| "LocalRegion::getInterestListRegex: is not supported."); |
| } |
| |
| bool LocalRegion::containsKey( |
| const std::shared_ptr<CacheableKey>& keyPtr) const { |
| if (keyPtr == nullptr) { |
| throw IllegalArgumentException( |
| "LocalRegion::containsKey: " |
| "key is null"); |
| } |
| CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::containsKey); |
| return containsKey_internal(keyPtr); |
| } |
| |
| void LocalRegion::setPersistenceManager( |
| std::shared_ptr<PersistenceManager>& pmPtr) { |
| m_persistenceManager = pmPtr; |
| // set the memberVariable of LRUEntriesMap too. |
| LRUEntriesMap* lruMap = dynamic_cast<LRUEntriesMap*>(m_entries); |
| if (lruMap != nullptr) { |
| lruMap->setPersistenceManager(pmPtr); |
| } |
| } |
| |
| void LocalRegion::setRegionExpiryTask() { |
| if (regionExpiryEnabled()) { |
| auto rptr = std::static_pointer_cast<RegionInternal>(shared_from_this()); |
| const auto& duration = getRegionExpiryDuration(); |
| auto handler = |
| new RegionExpiryHandler(rptr, getRegionExpiryAction(), duration); |
| auto expiryTaskId = |
| rptr->getCacheImpl()->getExpiryTaskManager().scheduleExpiryTask( |
| handler, duration, std::chrono::seconds::zero()); |
| handler->setExpiryTaskId(expiryTaskId); |
| LOGFINE( |
| "expiry for region [%s], expiry task id = %d, duration = %d, " |
| "action = %d", |
| m_fullPath.c_str(), expiryTaskId, duration.count(), |
| getRegionExpiryAction()); |
| } |
| } |
| |
| void LocalRegion::registerEntryExpiryTask( |
| std::shared_ptr<MapEntryImpl>& entry) { |
| // locking is not required here since only the thread that creates |
| // the entry will register the expiry task for that entry |
| ExpEntryProperties& expProps = entry->getExpProperties(); |
| expProps.initStartTime(); |
| auto rptr = std::static_pointer_cast<RegionInternal>(shared_from_this()); |
| const auto& duration = getEntryExpiryDuration(); |
| auto handler = |
| new EntryExpiryHandler(rptr, entry, getEntryExpirationAction(), duration); |
| auto id = rptr->getCacheImpl()->getExpiryTaskManager().scheduleExpiryTask( |
| handler, duration, std::chrono::seconds::zero()); |
| if (Log::finestEnabled()) { |
| std::shared_ptr<CacheableKey> key; |
| entry->getKeyI(key); |
| LOGFINEST( |
| "entry expiry in region [%s], key [%s], task id = %d, " |
| "duration = %d, action = %d", |
| m_fullPath.c_str(), Utils::nullSafeToString(key).c_str(), id, |
| duration.count(), getEntryExpirationAction()); |
| } |
| expProps.setExpiryTaskId(id); |
| } |
| |
| LocalRegion::~LocalRegion() { |
| TryWriteGuard guard(m_rwLock, m_destroyPending); |
| if (!m_destroyPending) { |
| release(false); |
| } |
| m_listener = nullptr; |
| m_writer = nullptr; |
| m_loader = nullptr; |
| |
| _GEODE_SAFE_DELETE(m_entries); |
| _GEODE_SAFE_DELETE(m_regionStats); |
| } |
| |
| /** |
| * Release the region resources if not released already. |
| */ |
| void LocalRegion::release(bool invokeCallbacks) { |
| if (m_released) { |
| return; |
| } |
| LOGFINE("LocalRegion::release entered for region %s", m_fullPath.c_str()); |
| m_released = true; |
| |
| if (m_regionStats != nullptr) { |
| m_regionStats->close(); |
| } |
| if (invokeCallbacks) { |
| try { |
| if (m_loader != nullptr) { |
| m_loader->close(*this); |
| } |
| if (m_writer != nullptr) { |
| m_writer->close(*this); |
| } |
| // TODO: shouldn't listener also be here instead of |
| // during CacheImpl.close() |
| } catch (...) { |
| LOGWARN( |
| "Region close caught unknown exception in loader/writer " |
| "close; continuing"); |
| } |
| } |
| |
| if (m_persistenceManager != nullptr) { |
| m_persistenceManager->close(); |
| m_persistenceManager = nullptr; |
| } |
| if (m_entries != nullptr && m_regionAttributes.getCachingEnabled()) { |
| m_entries->close(); |
| } |
| LOGFINE("LocalRegion::release done for region %s", m_fullPath.c_str()); |
| } |
| |
| /** Returns whether the specified key currently exists in this region. |
| * This method is equivalent to <code>getEntry(key) != null</code>. |
| * |
| * @param keyPtr the key to check for an existing entry, type is |
| *CacheableString |
| *& |
| * @return true if there is an entry in this region for the specified key |
| *@throw RegionDestroyedException, if region is destroyed. |
| *@throw IllegalArgumentException, if the key is 'null'. |
| *@throw NotConnectedException, if not connected to geode system. |
| */ |
| bool LocalRegion::containsKey_internal( |
| const std::shared_ptr<CacheableKey>& keyPtr) const { |
| if (keyPtr == nullptr) { |
| throw IllegalArgumentException("Region::containsKey: key is null"); |
| } |
| if (!m_regionAttributes.getCachingEnabled()) { |
| return false; |
| } |
| return m_entries->containsKey(keyPtr); |
| } |
| |
| std::vector<std::shared_ptr<Region>> LocalRegion::subregions_internal( |
| const bool recursive) { |
| MapOfRegionGuard guard(m_subRegions.mutex()); |
| |
| std::vector<std::shared_ptr<Region>> regions; |
| regions.reserve(m_subRegions.current_size()); |
| |
| for (const auto& entry : m_subRegions) { |
| const auto& subRegion = entry.int_id_; |
| regions.push_back(subRegion); |
| |
| if (recursive == true) { |
| if (auto localRegion = |
| std::dynamic_pointer_cast<LocalRegion>(subRegion)) { |
| auto subRegions = localRegion->subregions_internal(true); |
| regions.insert(regions.end(), subRegions.begin(), subRegions.end()); |
| } |
| } |
| } |
| |
| return regions; |
| } |
| |
| GfErrType LocalRegion::getNoThrow( |
| const std::shared_ptr<CacheableKey>& keyPtr, |
| std::shared_ptr<Cacheable>& value, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard); |
| GfErrType err = GF_NOERR; |
| |
| if (keyPtr == nullptr) { |
| return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION; |
| } |
| TXState* txState = getTXState(); |
| if (txState != nullptr) { |
| if (isLocalOp()) { |
| return GF_NOTSUP; |
| } |
| std::shared_ptr<VersionTag> versionTag; |
| err = getNoThrow_remote(keyPtr, value, aCallbackArgument, versionTag); |
| if (err == GF_NOERR) { |
| txState->setDirty(); |
| } |
| if (CacheableToken::isInvalid(value) || |
| CacheableToken::isTombstone(value)) { |
| value = nullptr; |
| } |
| return err; |
| } |
| |
| m_regionStats->incGets(); |
| auto& cachePerfStats = m_cacheImpl->getCachePerfStats(); |
| cachePerfStats.incGets(); |
| |
| // TODO: CacheableToken::isInvalid should be completely hidden |
| // inside MapSegment; this should be done both for the value obtained |
| // from local cache as well as oldValue in every instance |
| std::shared_ptr<MapEntryImpl> me; |
| int updateCount = -1; |
| bool isLoaderInvoked = false; |
| bool isLocal = false; |
| bool cachingEnabled = m_regionAttributes.getCachingEnabled(); |
| std::shared_ptr<Cacheable> localValue = nullptr; |
| if (cachingEnabled) { |
| isLocal = m_entries->get(keyPtr, value, me); |
| if (isLocal && (value != nullptr && !CacheableToken::isInvalid(value))) { |
| m_regionStats->incHits(); |
| cachePerfStats.incHits(); |
| updateAccessAndModifiedTimeForEntry(me, false); |
| updateAccessAndModifiedTime(false); |
| return err; // found it in local cache... |
| } |
| localValue = value; |
| value = nullptr; |
| // start tracking the entry |
| if (!m_regionAttributes.getConcurrencyChecksEnabled()) { |
| updateCount = |
| m_entries->addTrackerForEntry(keyPtr, value, true, false, false); |
| LOGDEBUG( |
| "Region::get: added tracking with update counter [%d] for key " |
| "[%s] with value [%s]", |
| updateCount, Utils::nullSafeToString(keyPtr).c_str(), |
| Utils::nullSafeToString(value).c_str()); |
| } |
| } |
| |
| // remove tracking for the entry before exiting the function |
| struct RemoveTracking { |
| private: |
| const std::shared_ptr<CacheableKey>& m_key; |
| const int& m_updateCount; |
| LocalRegion& m_region; |
| |
| public: |
| RemoveTracking(const std::shared_ptr<CacheableKey>& key, |
| const int& updateCount, LocalRegion& region) |
| : m_key(key), m_updateCount(updateCount), m_region(region) {} |
| ~RemoveTracking() { |
| if (m_updateCount >= 0 && |
| !m_region.getAttributes().getConcurrencyChecksEnabled()) { |
| m_region.m_entries->removeTrackerForEntry(m_key); |
| } |
| } |
| } _removeTracking(keyPtr, updateCount, *this); |
| |
| // The control will come here only when caching is disabled or/and |
| // the entry was not found. In this case atleast update the region |
| // access times. |
| updateAccessAndModifiedTime(false); |
| m_regionStats->incMisses(); |
| |
| cachePerfStats.incMisses(); |
| std::shared_ptr<VersionTag> versionTag; |
| // Get from some remote source (e.g. external java server) if required. |
| err = getNoThrow_remote(keyPtr, value, aCallbackArgument, versionTag); |
| |
| // Its a cache missor it is invalid token then Check if we have a local |
| // loader. |
| if ((value == nullptr || CacheableToken::isInvalid(value) || |
| CacheableToken::isTombstone(value)) && |
| m_loader != nullptr) { |
| try { |
| isLoaderInvoked = true; |
| /*Update the statistics*/ |
| int64_t sampleStartNanos = startStatOpTime(); |
| value = m_loader->load(*this, keyPtr, aCallbackArgument); |
| updateStatOpTime(m_regionStats->getStat(), |
| m_regionStats->getLoaderCallTimeId(), sampleStartNanos); |
| m_regionStats->incLoaderCallsCompleted(); |
| } catch (const Exception& ex) { |
| LOGERROR("Error in CacheLoader::load: %s: %s", ex.getName().c_str(), |
| ex.what()); |
| err = GF_CACHE_LOADER_EXCEPTION; |
| } catch (...) { |
| LOGERROR("Error in CacheLoader::load, unknown"); |
| err = GF_CACHE_LOADER_EXCEPTION; |
| } |
| if (err != GF_NOERR) { |
| return err; |
| } |
| } |
| |
| std::shared_ptr<Cacheable> oldValue; |
| // Found it somehow, so store it. |
| if (value != nullptr /*&& value != CacheableToken::invalid( )*/ && |
| cachingEnabled && |
| !(CacheableToken::isTombstone(value) && |
| (localValue == nullptr || CacheableToken::isInvalid(localValue)))) { |
| // try to create the entry and if that returns an existing value |
| // (e.g. from another thread or notification) then return that |
| LOGDEBUG( |
| "Region::get: creating entry with tracking update counter [%d] for " |
| "key " |
| "[%s]", |
| updateCount, Utils::nullSafeToString(keyPtr).c_str()); |
| if ((err = putLocal("Region::get", false, keyPtr, value, oldValue, |
| cachingEnabled, updateCount, 0, versionTag)) != |
| GF_NOERR) { |
| if (err == GF_CACHE_CONCURRENT_MODIFICATION_EXCEPTION) { |
| LOGDEBUG( |
| "Region::get: putLocal for key [%s] failed because the cache already contains \ |
| an entry with higher version.", |
| Utils::nullSafeToString(keyPtr).c_str()); |
| if (CacheableToken::isInvalid(value) || |
| CacheableToken::isTombstone(value)) { |
| value = nullptr; |
| } |
| // don't do anything and exit |
| return GF_NOERR; |
| } |
| |
| LOGDEBUG("Region::get: putLocal for key [%s] failed with error %d", |
| Utils::nullSafeToString(keyPtr).c_str(), err); |
| err = GF_NOERR; |
| if (oldValue != nullptr && !CacheableToken::isInvalid(oldValue)) { |
| LOGDEBUG("Region::get: returning updated value [%s] for key [%s]", |
| Utils::nullSafeToString(oldValue).c_str(), |
| Utils::nullSafeToString(keyPtr).c_str()); |
| value = oldValue; |
| } |
| } |
| } |
| |
| if (CacheableToken::isInvalid(value) || CacheableToken::isTombstone(value)) { |
| value = nullptr; |
| } |
| |
| // invokeCacheListenerForEntryEvent method has the check that if oldValue |
| // is a CacheableToken then it sets it to nullptr; also determines if it |
| // should be AFTER_UPDATE or AFTER_CREATE depending on oldValue, so don't |
| // check here. |
| if (isLoaderInvoked == false && err == GF_NOERR && value != nullptr) { |
| err = invokeCacheListenerForEntryEvent( |
| keyPtr, oldValue, value, aCallbackArgument, CacheEventFlags::NORMAL, |
| AFTER_UPDATE, isLocal); |
| } |
| |
| return err; |
| } |
| |
| GfErrType LocalRegion::getAllNoThrow( |
| const std::vector<std::shared_ptr<CacheableKey>>& keys, |
| const std::shared_ptr<HashMapOfCacheable>& values, |
| const std::shared_ptr<HashMapOfException>& exceptions, |
| const bool addToLocalCache, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard); |
| GfErrType err = GF_NOERR; |
| std::shared_ptr<Cacheable> value; |
| |
| TXState* txState = getTXState(); |
| if (txState != nullptr) { |
| if (isLocalOp()) { |
| return GF_NOTSUP; |
| } |
| err = getAllNoThrow_remote(&keys, values, exceptions, nullptr, false, |
| aCallbackArgument); |
| if (err == GF_NOERR) { |
| txState->setDirty(); |
| } |
| |
| return err; |
| } |
| // keys not in cache with their tracking numbers to be gotten using |
| // a remote call |
| std::vector<std::shared_ptr<CacheableKey>> serverKeys; |
| bool cachingEnabled = m_regionAttributes.getCachingEnabled(); |
| bool regionAccessed = false; |
| auto& cachePerfStats = m_cacheImpl->getCachePerfStats(); |
| |
| for (const auto& key : keys) { |
| std::shared_ptr<MapEntryImpl> me; |
| value = nullptr; |
| m_regionStats->incGets(); |
| cachePerfStats.incGets(); |
| if (values && cachingEnabled) { |
| if (m_entries->get(key, value, me) && value && |
| !CacheableToken::isInvalid(value)) { |
| m_regionStats->incHits(); |
| cachePerfStats.incHits(); |
| updateAccessAndModifiedTimeForEntry(me, false); |
| regionAccessed = true; |
| values->emplace(key, value); |
| } else { |
| value = nullptr; |
| } |
| } |
| if (value == nullptr) { |
| // Add to missed keys list. |
| serverKeys.push_back(key); |
| |
| m_regionStats->incMisses(); |
| cachePerfStats.incMisses(); |
| } |
| // TODO: No support for loaders in getAll for now. |
| } |
| if (regionAccessed) { |
| updateAccessAndModifiedTime(false); |
| } |
| if (serverKeys.size() > 0) { |
| err = getAllNoThrow_remote(&serverKeys, values, exceptions, nullptr, |
| addToLocalCache, aCallbackArgument); |
| } |
| m_regionStats->incGetAll(); |
| return err; |
| } |
| |
| // encapsulates actions that need to be taken for a put() operation |
| class PutActions { |
| public: |
| static const EntryEventType s_beforeEventType = BEFORE_UPDATE; |
| static const EntryEventType s_afterEventType = AFTER_UPDATE; |
| static const bool s_addIfAbsent = true; |
| static const bool s_failIfPresent = false; |
| TXState* m_txState; |
| |
| inline explicit PutActions(LocalRegion& region) : m_region(region) { |
| m_txState = TSSTXStateWrapper::s_geodeTSSTXState->getTXState(); |
| } |
| |
| inline static const char* name() { return "Region::put"; } |
| |
| inline static GfErrType checkArgs(const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| DataInput* delta = nullptr) { |
| if (key == nullptr || (value == nullptr && delta == nullptr)) { |
| return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION; |
| } |
| return GF_NOERR; |
| } |
| |
| inline void getCallbackOldValue(bool cachingEnabled, |
| const std::shared_ptr<CacheableKey>& key, |
| std::shared_ptr<MapEntryImpl>& entry, |
| std::shared_ptr<Cacheable>& oldValue) const { |
| if (cachingEnabled) { |
| m_region.m_entries->getEntry(key, entry, oldValue); |
| } |
| } |
| |
| inline static void logCacheWriterFailure( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& oldValue) { |
| bool isUpdate = (oldValue != nullptr); |
| LOGFINER("Cache writer vetoed %s for key %s", |
| (isUpdate ? "update" : "create"), |
| Utils::nullSafeToString(key).c_str()); |
| } |
| |
| inline GfErrType remoteUpdate( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::shared_ptr<VersionTag>& versionTag) { |
| // propagate the put to remote server, if any |
| return m_region.putNoThrow_remote(key, value, aCallbackArgument, |
| versionTag); |
| } |
| |
| inline GfErrType localUpdate(const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| std::shared_ptr<Cacheable>& oldValue, |
| bool cachingEnabled, |
| const CacheEventFlags /*eventFlags*/, |
| int updateCount, |
| std::shared_ptr<VersionTag> versionTag, |
| DataInput* delta = nullptr, |
| std::shared_ptr<EventId> eventId = nullptr, |
| bool /*afterRemote*/ = false) { |
| return m_region.putLocal(name(), false, key, value, oldValue, |
| cachingEnabled, updateCount, 0, versionTag, delta, |
| eventId); |
| } |
| |
| private: |
| LocalRegion& m_region; |
| }; |
| |
| // encapsulates actions that need to be taken for a put() operation. This |
| // implementation allows |
| // null values in Put during transaction. See defect #743 |
| class PutActionsTx : public PutActions { |
| public: |
| inline explicit PutActionsTx(LocalRegion& region) : PutActions(region) {} |
| inline static GfErrType checkArgs(const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& /*value*/, |
| DataInput* /*delta*/ = nullptr) { |
| if (key == nullptr) { |
| return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION; |
| } |
| return GF_NOERR; |
| } |
| }; |
| |
| // encapsulates actions that need to be taken for a create() operation |
| class CreateActions { |
| public: |
| static const EntryEventType s_beforeEventType = BEFORE_CREATE; |
| static const EntryEventType s_afterEventType = AFTER_CREATE; |
| static const bool s_addIfAbsent = true; |
| static const bool s_failIfPresent = true; |
| TXState* m_txState; |
| |
| inline explicit CreateActions(LocalRegion& region) : m_region(region) { |
| m_txState = TSSTXStateWrapper::s_geodeTSSTXState->getTXState(); |
| } |
| |
| inline static const char* name() { return "Region::create"; } |
| |
| inline static GfErrType checkArgs(const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& /*value*/, |
| DataInput* /*delta*/) { |
| if (key == nullptr) { |
| return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION; |
| } |
| return GF_NOERR; |
| } |
| |
| inline void getCallbackOldValue( |
| bool /*cachingEnabled*/, const std::shared_ptr<CacheableKey>& /*key*/, |
| std::shared_ptr<MapEntryImpl>& /*entry*/, |
| std::shared_ptr<Cacheable>& /*oldValue*/) const {} |
| |
| inline static void logCacheWriterFailure( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& /*oldValue*/) { |
| LOGFINER("Cache writer vetoed create for key %s", |
| Utils::nullSafeToString(key).c_str()); |
| } |
| |
| inline GfErrType remoteUpdate( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::shared_ptr<VersionTag>& versionTag) { |
| return m_region.createNoThrow_remote(key, value, aCallbackArgument, |
| versionTag); |
| } |
| |
| inline GfErrType localUpdate(const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| std::shared_ptr<Cacheable>& oldValue, |
| bool cachingEnabled, |
| const CacheEventFlags /*eventFlags*/, |
| int updateCount, |
| std::shared_ptr<VersionTag> versionTag, |
| DataInput* /*delta*/ = nullptr, |
| std::shared_ptr<EventId> /*eventId*/ = nullptr, |
| bool /*afterRemote*/ = false) { |
| return m_region.putLocal(name(), true, key, value, oldValue, cachingEnabled, |
| updateCount, 0, versionTag); |
| } |
| |
| private: |
| LocalRegion& m_region; |
| }; |
| |
| // encapsulates actions that need to be taken for a destroy() operation |
| class DestroyActions { |
| public: |
| static const EntryEventType s_beforeEventType = BEFORE_DESTROY; |
| static const EntryEventType s_afterEventType = AFTER_DESTROY; |
| static const bool s_addIfAbsent = true; |
| static const bool s_failIfPresent = false; |
| TXState* m_txState; |
| |
| inline explicit DestroyActions(LocalRegion& region) : m_region(region) { |
| m_txState = TSSTXStateWrapper::s_geodeTSSTXState->getTXState(); |
| } |
| |
| inline static const char* name() { return "Region::destroy"; } |
| |
| inline static GfErrType checkArgs(const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& /*value*/, |
| DataInput* /*delta*/) { |
| if (key == nullptr) { |
| return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION; |
| } |
| return GF_NOERR; |
| } |
| |
| inline void getCallbackOldValue(bool cachingEnabled, |
| const std::shared_ptr<CacheableKey>& key, |
| std::shared_ptr<MapEntryImpl>& entry, |
| std::shared_ptr<Cacheable>& oldValue) const { |
| if (cachingEnabled) { |
| m_region.m_entries->getEntry(key, entry, oldValue); |
| } |
| } |
| |
| inline static void logCacheWriterFailure( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& /*oldValue*/) { |
| LOGFINER("Cache writer vetoed destroy for key %s", |
| Utils::nullSafeToString(key).c_str()); |
| } |
| |
| inline GfErrType remoteUpdate( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& /*value*/, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::shared_ptr<VersionTag>& versionTag) { |
| return m_region.destroyNoThrow_remote(key, aCallbackArgument, versionTag); |
| } |
| |
| inline GfErrType localUpdate(const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& /*value*/, |
| std::shared_ptr<Cacheable>& oldValue, |
| bool cachingEnabled, |
| const CacheEventFlags eventFlags, |
| int updateCount, |
| std::shared_ptr<VersionTag> versionTag, |
| DataInput* /*delta*/ = nullptr, |
| std::shared_ptr<EventId> /*eventId*/ = nullptr, |
| bool afterRemote = false) { |
| auto& cachePerfStats = m_region.m_cacheImpl->getCachePerfStats(); |
| |
| if (cachingEnabled) { |
| std::shared_ptr<MapEntryImpl> entry; |
| // for notification invoke the listener even if the key does |
| // not exist locally |
| GfErrType err; |
| LOGDEBUG("Region::destroy: region [%s] destroying key [%s]", |
| m_region.getFullPath().c_str(), |
| Utils::nullSafeToString(key).c_str()); |
| if ((err = m_region.m_entries->remove(key, oldValue, entry, updateCount, |
| versionTag, afterRemote)) != |
| GF_NOERR) { |
| if (eventFlags.isNotification()) { |
| LOGDEBUG( |
| "Region::destroy: region [%s] destroy key [%s] for " |
| "notification having value [%s] failed with %d", |
| m_region.getFullPath().c_str(), |
| Utils::nullSafeToString(key).c_str(), |
| Utils::nullSafeToString(oldValue).c_str(), err); |
| err = GF_NOERR; |
| } |
| return err; |
| } |
| |
| if (oldValue != nullptr) { |
| LOGDEBUG( |
| "Region::destroy: region [%s] destroyed key [%s] having " |
| "value [%s]", |
| m_region.getFullPath().c_str(), |
| Utils::nullSafeToString(key).c_str(), |
| Utils::nullSafeToString(oldValue).c_str()); |
| // any cleanup required for the entry (e.g. removing from LRU list) |
| if (entry != nullptr) { |
| entry->cleanup(eventFlags); |
| } |
| // entry/region expiration |
| if (!eventFlags.isEvictOrExpire()) { |
| m_region.updateAccessAndModifiedTime(true); |
| } |
| // update the stats |
| m_region.m_regionStats->setEntries(m_region.m_entries->size()); |
| cachePerfStats.incEntries(-1); |
| } |
| } |
| // update the stats |
| m_region.m_regionStats->incDestroys(); |
| cachePerfStats.incDestroys(); |
| return GF_NOERR; |
| } |
| |
| private: |
| LocalRegion& m_region; |
| }; |
| |
| // encapsulates actions that need to be taken for a remove() operation |
| class RemoveActions { |
| public: |
| static const EntryEventType s_beforeEventType = BEFORE_DESTROY; |
| static const EntryEventType s_afterEventType = AFTER_DESTROY; |
| static const bool s_addIfAbsent = true; |
| static const bool s_failIfPresent = false; |
| TXState* m_txState; |
| bool allowNULLValue; |
| |
| inline explicit RemoveActions(LocalRegion& region) |
| : m_region(region), m_ServerResponse(GF_ENOENT) { |
| m_txState = TSSTXStateWrapper::s_geodeTSSTXState->getTXState(); |
| allowNULLValue = false; |
| } |
| |
| inline static const char* name() { return "Region::remove"; } |
| |
| inline static GfErrType checkArgs(const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& /*value*/, |
| DataInput* /*delta*/) { |
| if (key == nullptr) { |
| return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION; |
| } |
| return GF_NOERR; |
| } |
| |
| inline void getCallbackOldValue(bool cachingEnabled, |
| const std::shared_ptr<CacheableKey>& key, |
| std::shared_ptr<MapEntryImpl>& entry, |
| std::shared_ptr<Cacheable>& oldValue) const { |
| if (cachingEnabled) { |
| m_region.m_entries->getEntry(key, entry, oldValue); |
| } |
| } |
| |
| inline static void logCacheWriterFailure( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& /*oldValue*/) { |
| LOGFINER("Cache writer vetoed remove for key %s", |
| Utils::nullSafeToString(key).c_str()); |
| } |
| |
| bool serializedEqualTo(const std::shared_ptr<Cacheable>& lhs, |
| const std::shared_ptr<Cacheable>& rhs) { |
| auto&& cache = *(m_region.getCacheImpl()); |
| |
| if (const auto dataSerializablePrimitive = |
| std::dynamic_pointer_cast<DataSerializablePrimitive>(lhs)) { |
| return SerializableHelper<DataSerializablePrimitive>{}.equalTo( |
| cache, dataSerializablePrimitive, |
| std::dynamic_pointer_cast<DataSerializablePrimitive>(rhs)); |
| } else if (const auto dataSerializable = |
| std::dynamic_pointer_cast<DataSerializable>(lhs)) { |
| return SerializableHelper<DataSerializable>{}.equalTo( |
| cache, dataSerializable, |
| std::dynamic_pointer_cast<DataSerializable>(rhs)); |
| } else if (const auto pdxSerializable = |
| std::dynamic_pointer_cast<PdxSerializable>(lhs)) { |
| return SerializableHelper<PdxSerializable>{}.equalTo( |
| cache, pdxSerializable, |
| std::dynamic_pointer_cast<PdxSerializable>(rhs)); |
| } else if (const auto dataSerializableInternal = |
| std::dynamic_pointer_cast<DataSerializableInternal>(lhs)) { |
| return SerializableHelper<DataSerializableInternal>{}.equalTo( |
| cache, dataSerializableInternal, |
| std::dynamic_pointer_cast<DataSerializableInternal>(rhs)); |
| } else { |
| throw UnsupportedOperationException( |
| "Serialization type not implemented."); |
| } |
| } |
| |
| inline GfErrType remoteUpdate( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& newValue, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::shared_ptr<VersionTag>& versionTag) { |
| // propagate the remove to remote server, if any |
| std::shared_ptr<Cacheable> oldValue; |
| GfErrType err = GF_NOERR; |
| if (!allowNULLValue && m_region.getAttributes().getCachingEnabled()) { |
| m_region.getEntry(key, oldValue); |
| if (oldValue != nullptr && newValue != nullptr) { |
| if (!serializedEqualTo(oldValue, newValue)) { |
| err = GF_ENOENT; |
| return err; |
| } |
| } else if ((oldValue == nullptr || CacheableToken::isInvalid(oldValue))) { |
| m_ServerResponse = m_region.removeNoThrow_remote( |
| key, newValue, aCallbackArgument, versionTag); |
| |
| return m_ServerResponse; |
| } else if (oldValue != nullptr && newValue == nullptr) { |
| err = GF_ENOENT; |
| return err; |
| } |
| } |
| if (allowNULLValue) { |
| m_ServerResponse = |
| m_region.removeNoThrowEX_remote(key, aCallbackArgument, versionTag); |
| } else { |
| m_ServerResponse = m_region.removeNoThrow_remote( |
| key, newValue, aCallbackArgument, versionTag); |
| } |
| LOGDEBUG("serverResponse::%d", m_ServerResponse); |
| return m_ServerResponse; |
| } |
| |
| inline GfErrType localUpdate(const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| std::shared_ptr<Cacheable>& oldValue, |
| bool cachingEnabled, |
| const CacheEventFlags eventFlags, |
| int updateCount, |
| std::shared_ptr<VersionTag> versionTag, |
| DataInput* /*delta*/ = nullptr, |
| std::shared_ptr<EventId> /*eventId*/ = nullptr, |
| bool afterRemote = false) { |
| std::shared_ptr<Cacheable> valuePtr; |
| GfErrType err = GF_NOERR; |
| if (!allowNULLValue && cachingEnabled) { |
| m_region.getEntry(key, valuePtr); |
| if (valuePtr != nullptr && value != nullptr) { |
| if (!serializedEqualTo(valuePtr, value)) { |
| err = GF_ENOENT; |
| return err; |
| } |
| } else if (value == nullptr && (!CacheableToken::isInvalid(valuePtr) || |
| valuePtr == nullptr)) { |
| err = (m_ServerResponse == 0 && valuePtr == nullptr) ? GF_NOERR |
| : GF_ENOENT; |
| if (updateCount >= 0 && |
| !m_region.getAttributes().getConcurrencyChecksEnabled()) { |
| // This means server has deleted an entry & same entry has been |
| // destroyed locally |
| // So call removeTrackerForEntry to remove key that |
| // was added in the map during addTrackerForEntry call. |
| m_region.m_entries->removeTrackerForEntry(key); |
| } |
| return err; |
| } else if (valuePtr == nullptr && value != nullptr && |
| m_ServerResponse != 0) { |
| err = GF_ENOENT; |
| return err; |
| } |
| } |
| auto& cachePerfStats = m_region.m_cacheImpl->getCachePerfStats(); |
| |
| if (cachingEnabled) { |
| std::shared_ptr<MapEntryImpl> entry; |
| // for notification invoke the listener even if the key does |
| // not exist locally |
| GfErrType err; |
| LOGDEBUG("Region::remove: region [%s] removing key [%s]", |
| m_region.getFullPath().c_str(), |
| Utils::nullSafeToString(key).c_str()); |
| if ((err = m_region.m_entries->remove(key, oldValue, entry, updateCount, |
| versionTag, afterRemote)) != |
| GF_NOERR) { |
| if (eventFlags.isNotification()) { |
| LOGDEBUG( |
| "Region::remove: region [%s] remove key [%s] for " |
| "notification having value [%s] failed with %d", |
| m_region.getFullPath().c_str(), |
| Utils::nullSafeToString(key).c_str(), |
| Utils::nullSafeToString(oldValue).c_str(), err); |
| err = GF_NOERR; |
| } |
| return err; |
| } |
| if (oldValue != nullptr) { |
| LOGDEBUG( |
| "Region::remove: region [%s] removed key [%s] having " |
| "value [%s]", |
| m_region.getFullPath().c_str(), |
| Utils::nullSafeToString(key).c_str(), |
| Utils::nullSafeToString(oldValue).c_str()); |
| // any cleanup required for the entry (e.g. removing from LRU list) |
| if (entry != nullptr) { |
| entry->cleanup(eventFlags); |
| } |
| // entry/region expiration |
| if (!eventFlags.isEvictOrExpire()) { |
| m_region.updateAccessAndModifiedTime(true); |
| } |
| // update the stats |
| m_region.m_regionStats->setEntries(m_region.m_entries->size()); |
| cachePerfStats.incEntries(-1); |
| } |
| } |
| // update the stats |
| m_region.m_regionStats->incDestroys(); |
| cachePerfStats.incDestroys(); |
| return GF_NOERR; |
| } |
| |
| private: |
| LocalRegion& m_region; |
| GfErrType m_ServerResponse; |
| }; |
| |
| class RemoveActionsEx : public RemoveActions { |
| public: |
| inline explicit RemoveActionsEx(LocalRegion& region) : RemoveActions(region) { |
| allowNULLValue = true; |
| } |
| }; |
| |
| // encapsulates actions that need to be taken for a invalidate() operation |
| class InvalidateActions { |
| public: |
| static const EntryEventType s_beforeEventType = BEFORE_INVALIDATE; |
| static const EntryEventType s_afterEventType = AFTER_INVALIDATE; |
| static const bool s_addIfAbsent = true; |
| static const bool s_failIfPresent = false; |
| TXState* m_txState; |
| |
| inline explicit InvalidateActions(LocalRegion& region) : m_region(region) { |
| m_txState = TSSTXStateWrapper::s_geodeTSSTXState->getTXState(); |
| } |
| |
| inline static const char* name() { return "Region::invalidate"; } |
| |
| inline static GfErrType checkArgs(const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& /*value*/, |
| DataInput* /*delta*/ = nullptr) { |
| if (key == nullptr) { |
| return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION; |
| } |
| return GF_NOERR; |
| } |
| |
| inline void getCallbackOldValue(bool cachingEnabled, |
| const std::shared_ptr<CacheableKey>& key, |
| std::shared_ptr<MapEntryImpl>& entry, |
| std::shared_ptr<Cacheable>& oldValue) const { |
| if (cachingEnabled) { |
| m_region.m_entries->getEntry(key, entry, oldValue); |
| } |
| } |
| |
| inline static void logCacheWriterFailure( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& oldValue) { |
| bool isUpdate = (oldValue != nullptr); |
| LOGFINER("Cache writer vetoed %s for key %s", |
| (isUpdate ? "update" : "invalidate"), |
| Utils::nullSafeToString(key).c_str()); |
| } |
| |
| inline GfErrType remoteUpdate( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& /*value*/, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::shared_ptr<VersionTag>& versionTag) { |
| // propagate the invalidate to remote server, if any |
| return m_region.invalidateNoThrow_remote(key, aCallbackArgument, |
| versionTag); |
| } |
| |
| inline GfErrType localUpdate(const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| std::shared_ptr<Cacheable>& /*oldValue*/, |
| bool /*cachingEnabled*/, |
| const CacheEventFlags eventFlags, |
| int /*updateCount*/, |
| std::shared_ptr<VersionTag> versionTag, |
| DataInput* /*delta*/ = nullptr, |
| std::shared_ptr<EventId> /*eventId*/ = nullptr, |
| bool /*afterRemote*/ = false) { |
| return m_region.invalidateLocal(name(), key, value, eventFlags, versionTag); |
| } |
| |
| private: |
| LocalRegion& m_region; |
| }; |
| |
| template <typename TAction> |
| GfErrType LocalRegion::updateNoThrow( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::shared_ptr<Cacheable>& oldValue, int updateCount, |
| const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag, |
| DataInput* delta, std::shared_ptr<EventId> eventId) { |
| GfErrType err = GF_NOERR; |
| if ((err = TAction::checkArgs(key, value, delta)) != GF_NOERR) { |
| return err; |
| } |
| |
| CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard); |
| |
| TAction action(*this); |
| TXState* txState = action.m_txState; |
| if (txState != nullptr) { |
| if (isLocalOp(&eventFlags)) { |
| return GF_NOTSUP; |
| } |
| /* adongre - Coverity II |
| * CID 29194 (6): Parse warning (PW.PARAMETER_HIDDEN) |
| */ |
| // std::shared_ptr<VersionTag> versionTag; |
| err = action.remoteUpdate(key, value, aCallbackArgument, versionTag); |
| if (err == GF_NOERR) { |
| txState->setDirty(); |
| } |
| |
| return err; |
| } |
| |
| bool cachingEnabled = m_regionAttributes.getCachingEnabled(); |
| std::shared_ptr<MapEntryImpl> entry; |
| |
| // do not invoke the writer in case of notification/eviction |
| // or expiration |
| if (m_writer != nullptr && eventFlags.invokeCacheWriter()) { |
| action.getCallbackOldValue(cachingEnabled, key, entry, oldValue); |
| // invokeCacheWriterForEntryEvent method has the check that if oldValue |
| // is a CacheableToken then it sets it to nullptr; also determines if it |
| // should be BEFORE_UPDATE or BEFORE_CREATE depending on oldValue |
| if (!invokeCacheWriterForEntryEvent(key, oldValue, value, aCallbackArgument, |
| eventFlags, |
| TAction::s_beforeEventType)) { |
| TAction::logCacheWriterFailure(key, oldValue); |
| return GF_CACHEWRITER_ERROR; |
| } |
| } |
| bool remoteOpDone = false; |
| // try the remote update; but if this fails (e.g. due to security |
| // exception) do not do the local update |
| // uses the technique of adding a tracking to the entry before proceeding |
| // for put; if the update counter changes when the remote update completes |
| // then it means that the local entry was overwritten in the meantime |
| // by a notification or another thread, so we do not do the local update |
| if (!eventFlags.isLocal() && !eventFlags.isNotification()) { |
| if (cachingEnabled && updateCount < 0 && |
| !m_regionAttributes.getConcurrencyChecksEnabled()) { |
| // add a tracking for the entry |
| if ((updateCount = m_entries->addTrackerForEntry( |
| key, oldValue, TAction::s_addIfAbsent, TAction::s_failIfPresent, |
| true)) < 0) { |
| if (oldValue != nullptr) { |
| // fail for "create" when entry exists |
| return GF_CACHE_ENTRY_EXISTS; |
| } |
| } |
| } |
| // propagate the update to remote server, if any |
| err = action.remoteUpdate(key, value, aCallbackArgument, versionTag); |
| if (err != GF_NOERR) { |
| if (updateCount >= 0 && |
| !m_regionAttributes.getConcurrencyChecksEnabled()) { |
| m_entries->removeTrackerForEntry(key); |
| } |
| return err; |
| } |
| remoteOpDone = true; |
| } |
| if (!eventFlags.isNotification() || getProcessedMarker()) { |
| if ((err = action.localUpdate(key, value, oldValue, cachingEnabled, |
| eventFlags, updateCount, versionTag, delta, |
| eventId, remoteOpDone)) == |
| GF_CACHE_ENTRY_UPDATED) { |
| LOGFINEST( |
| "%s: did not change local value for key [%s] since it has " |
| "been updated by another thread while operation was in progress", |
| TAction::name(), Utils::nullSafeToString(key).c_str()); |
| err = GF_NOERR; |
| } else if (err == GF_CACHE_CONCURRENT_MODIFICATION_EXCEPTION) { |
| LOGDEBUG( |
| "Region::localUpdate: updateNoThrow<%s> for key [%s] failed because the cache already contains \ |
| an entry with higher version. The cache listener will not be invoked.", |
| TAction::name(), Utils::nullSafeToString(key).c_str()); |
| // Cache listener won't be called in this case |
| return GF_NOERR; |
| } else if (err == GF_INVALID_DELTA) { |
| LOGDEBUG( |
| "Region::localUpdate: updateNoThrow<%s> for key [%s] failed " |
| "because " |
| "of invalid delta.", |
| TAction::name(), Utils::nullSafeToString(key).c_str()); |
| m_cacheImpl->getCachePerfStats().incFailureOnDeltaReceived(); |
| // Get full object from server. |
| std::shared_ptr<Cacheable>& newValue1 = |
| const_cast<std::shared_ptr<Cacheable>&>(value); |
| std::shared_ptr<VersionTag> versionTag1; |
| err = getNoThrow_FullObject(eventId, newValue1, versionTag1); |
| if (err == GF_NOERR && newValue1 != nullptr) { |
| err = m_entries->put(key, newValue1, entry, oldValue, updateCount, 0, |
| versionTag1 != nullptr ? versionTag1 : versionTag); |
| if (err == GF_CACHE_CONCURRENT_MODIFICATION_EXCEPTION) { |
| LOGDEBUG( |
| "Region::localUpdate: updateNoThrow<%s> for key [%s] failed because the cache already contains \ |
| an entry with higher version. The cache listener will not be invoked.", |
| TAction::name(), Utils::nullSafeToString(key).c_str()); |
| // Cache listener won't be called in this case |
| return GF_NOERR; |
| } else if (err != GF_NOERR) { |
| return err; |
| } |
| } |
| } else if (err != GF_NOERR) { |
| return err; |
| } |
| } else { // if (getProcessedMarker()) |
| action.getCallbackOldValue(cachingEnabled, key, entry, oldValue); |
| if (updateCount >= 0 && !m_regionAttributes.getConcurrencyChecksEnabled()) { |
| m_entries->removeTrackerForEntry(key); |
| } |
| } |
| // invokeCacheListenerForEntryEvent method has the check that if oldValue |
| // is a CacheableToken then it sets it to nullptr; also determines if it |
| // should be AFTER_UPDATE or AFTER_CREATE depending on oldValue |
| err = |
| invokeCacheListenerForEntryEvent(key, oldValue, value, aCallbackArgument, |
| eventFlags, TAction::s_afterEventType); |
| return err; |
| } |
| |
| template <typename TAction> |
| GfErrType LocalRegion::updateNoThrowTX( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::shared_ptr<Cacheable>& oldValue, int updateCount, |
| const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag, |
| DataInput* delta, std::shared_ptr<EventId> eventId) { |
| GfErrType err = GF_NOERR; |
| if ((err = TAction::checkArgs(key, value, delta)) != GF_NOERR) { |
| return err; |
| } |
| |
| CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard); |
| TAction action(*this); |
| |
| bool cachingEnabled = m_regionAttributes.getCachingEnabled(); |
| std::shared_ptr<MapEntryImpl> entry; |
| |
| if (!eventFlags.isNotification() || getProcessedMarker()) { |
| if ((err = action.localUpdate(key, value, oldValue, cachingEnabled, |
| eventFlags, updateCount, versionTag, delta, |
| eventId)) == GF_CACHE_ENTRY_UPDATED) { |
| LOGFINEST( |
| "%s: did not change local value for key [%s] since it has " |
| "been updated by another thread while operation was in progress", |
| TAction::name(), Utils::nullSafeToString(key).c_str()); |
| err = GF_NOERR; |
| } else if (err == GF_CACHE_ENTRY_NOT_FOUND) { |
| // Entry not found. Possibly because the entry was added and removed in |
| // the |
| // same transaction. Ignoring this error #739 |
| LOGFINE( |
| "%s: No entry found. Possibly because the entry was added and " |
| "removed in the same transaction. " |
| "Ignoring this error. ", |
| TAction::name(), Utils::nullSafeToString(key).c_str()); |
| err = GF_NOERR; |
| } else if (err != GF_NOERR) { |
| return err; |
| } |
| } else { // if (getProcessedMarker()) |
| action.getCallbackOldValue(cachingEnabled, key, entry, oldValue); |
| if (updateCount >= 0 && !m_regionAttributes.getConcurrencyChecksEnabled()) { |
| m_entries->removeTrackerForEntry(key); |
| } |
| } |
| // invokeCacheListenerForEntryEvent method has the check that if oldValue |
| // is a CacheableToken then it sets it to nullptr; also determines if it |
| // should be AFTER_UPDATE or AFTER_CREATE depending on oldValue |
| err = |
| invokeCacheListenerForEntryEvent(key, oldValue, value, aCallbackArgument, |
| eventFlags, TAction::s_afterEventType); |
| return err; |
| } |
| |
| GfErrType LocalRegion::putNoThrow( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::shared_ptr<Cacheable>& oldValue, int updateCount, |
| const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag, |
| DataInput* delta, std::shared_ptr<EventId> eventId) { |
| return updateNoThrow<PutActions>(key, value, aCallbackArgument, oldValue, |
| updateCount, eventFlags, versionTag, delta, |
| eventId); |
| } |
| |
| GfErrType LocalRegion::putNoThrowTX( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| std::shared_ptr<Cacheable>& oldValue, int updateCount, |
| const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag, |
| DataInput* delta, std::shared_ptr<EventId> eventId) { |
| return updateNoThrowTX<PutActionsTx>(key, value, aCallbackArgument, oldValue, |
| updateCount, eventFlags, versionTag, |
| delta, eventId); |
| } |
| |
| GfErrType LocalRegion::createNoThrow( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| const std::shared_ptr<Serializable>& aCallbackArgument, int updateCount, |
| const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag) { |
| std::shared_ptr<Cacheable> oldValue; |
| return updateNoThrow<CreateActions>(key, value, aCallbackArgument, oldValue, |
| updateCount, eventFlags, versionTag); |
| } |
| |
| GfErrType LocalRegion::destroyNoThrow( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Serializable>& aCallbackArgument, int updateCount, |
| const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag) { |
| std::shared_ptr<Cacheable> oldValue; |
| return updateNoThrow<DestroyActions>(key, nullptr, aCallbackArgument, |
| oldValue, updateCount, eventFlags, |
| versionTag); |
| } |
| |
| GfErrType LocalRegion::destroyNoThrowTX( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Serializable>& aCallbackArgument, int updateCount, |
| const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag) { |
| std::shared_ptr<Cacheable> oldValue; |
| return updateNoThrowTX<DestroyActions>(key, nullptr, aCallbackArgument, |
| oldValue, updateCount, eventFlags, |
| versionTag); |
| } |
| |
| GfErrType LocalRegion::removeNoThrow( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| const std::shared_ptr<Serializable>& aCallbackArgument, int updateCount, |
| const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag) { |
| std::shared_ptr<Cacheable> oldValue; |
| return updateNoThrow<RemoveActions>(key, value, aCallbackArgument, oldValue, |
| updateCount, eventFlags, versionTag); |
| } |
| |
| GfErrType LocalRegion::removeNoThrowEx( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Serializable>& aCallbackArgument, int updateCount, |
| const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag) { |
| std::shared_ptr<Cacheable> oldValue; |
| return updateNoThrow<RemoveActionsEx>(key, nullptr, aCallbackArgument, |
| oldValue, updateCount, eventFlags, |
| versionTag); |
| } |
| |
| GfErrType LocalRegion::invalidateNoThrow( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Serializable>& aCallbackArgument, int updateCount, |
| const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag) { |
| std::shared_ptr<Cacheable> oldValue; |
| return updateNoThrow<InvalidateActions>(key, nullptr, aCallbackArgument, |
| oldValue, updateCount, eventFlags, |
| versionTag); |
| } |
| |
| GfErrType LocalRegion::invalidateNoThrowTX( |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Serializable>& aCallbackArgument, int updateCount, |
| const CacheEventFlags eventFlags, std::shared_ptr<VersionTag> versionTag) { |
| std::shared_ptr<Cacheable> oldValue; |
| return updateNoThrowTX<InvalidateActions>(key, nullptr, aCallbackArgument, |
| oldValue, updateCount, eventFlags, |
| versionTag); |
| } |
| |
| GfErrType LocalRegion::putAllNoThrow( |
| const HashMapOfCacheable& map, std::chrono::milliseconds timeout, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard); |
| GfErrType err = GF_NOERR; |
| // std::shared_ptr<VersionTag> versionTag; |
| std::shared_ptr<VersionedCacheableObjectPartList> |
| versionedObjPartListPtr; //= new VersionedCacheableObjectPartList(); |
| TXState* txState = getTXState(); |
| if (txState != nullptr) { |
| if (isLocalOp()) { |
| return GF_NOTSUP; |
| } |
| |
| err = putAllNoThrow_remote(map, /*versionTag*/ versionedObjPartListPtr, |
| timeout, aCallbackArgument); |
| if (err == GF_NOERR) { |
| txState->setDirty(); |
| } |
| |
| return err; |
| } |
| |
| bool cachingEnabled = m_regionAttributes.getCachingEnabled(); |
| MapOfOldValue oldValueMap; |
| |
| // remove tracking for the entries befor exiting the function |
| struct RemoveTracking { |
| private: |
| const MapOfOldValue& m_oldValueMap; |
| LocalRegion& m_region; |
| |
| public: |
| RemoveTracking(const MapOfOldValue& oldValueMap, LocalRegion& region) |
| : m_oldValueMap(oldValueMap), m_region(region) {} |
| ~RemoveTracking() { |
| if (!m_region.getAttributes().getConcurrencyChecksEnabled()) { |
| // need to remove the tracking added to the entries at the end |
| for (MapOfOldValue::const_iterator iter = m_oldValueMap.begin(); |
| iter != m_oldValueMap.end(); ++iter) { |
| if (iter->second.second >= 0) { |
| m_region.m_entries->removeTrackerForEntry(iter->first); |
| } |
| } |
| } |
| } |
| } _removeTracking(oldValueMap, *this); |
| |
| if (cachingEnabled || m_writer != nullptr) { |
| std::shared_ptr<Cacheable> oldValue; |
| for (const auto& iter : map) { |
| const auto& key = iter.first; |
| if (cachingEnabled && !m_regionAttributes.getConcurrencyChecksEnabled()) { |
| int updateCount = |
| m_entries->addTrackerForEntry(key, oldValue, true, false, true); |
| oldValueMap.insert( |
| std::make_pair(key, std::make_pair(oldValue, updateCount))); |
| } |
| if (m_writer != nullptr) { |
| // invokeCacheWriterForEntryEvent method has the check that if |
| // oldValue is a CacheableToken then it sets it to nullptr; also |
| // determines if it should be BEFORE_UPDATE or BEFORE_CREATE depending |
| // on oldValue |
| if (!invokeCacheWriterForEntryEvent( |
| key, oldValue, iter.second, aCallbackArgument, |
| CacheEventFlags::LOCAL, BEFORE_UPDATE)) { |
| PutActions::logCacheWriterFailure(key, oldValue); |
| return GF_CACHEWRITER_ERROR; |
| } |
| } |
| } |
| } |
| // try remote putAll, if any |
| if ((err = putAllNoThrow_remote(map, versionedObjPartListPtr, timeout, |
| aCallbackArgument)) != GF_NOERR) { |
| return err; |
| } |
| // next the local puts |
| GfErrType localErr; |
| std::shared_ptr<VersionTag> versionTag; |
| |
| if (cachingEnabled) { |
| if (m_isPRSingleHopEnabled) { /*New PRSingleHop Case:: PR Singlehop |
| condition*/ |
| for (size_t keyIndex = 0; |
| keyIndex < versionedObjPartListPtr->getSucceededKeys()->size(); |
| keyIndex++) { |
| const auto valPtr = |
| versionedObjPartListPtr->getSucceededKeys()->at(keyIndex); |
| const auto& mapIter = map.find(valPtr); |
| std::shared_ptr<CacheableKey> key = nullptr; |
| std::shared_ptr<Cacheable> value = nullptr; |
| |
| if (mapIter != map.end()) { |
| key = mapIter->first; |
| value = mapIter->second; |
| } else { |
| // ThrowERROR |
| LOGERROR( |
| "ERROR :: LocalRegion::putAllNoThrow() Key must be found in " |
| "the " |
| "usermap"); |
| } |
| |
| if (versionedObjPartListPtr) { |
| LOGDEBUG("versionedObjPartListPtr->getVersionedTagptr().size() = %d ", |
| versionedObjPartListPtr->getVersionedTagptr().size()); |
| if (versionedObjPartListPtr->getVersionedTagptr().size() > 0) { |
| versionTag = |
| versionedObjPartListPtr->getVersionedTagptr()[keyIndex]; |
| } |
| } |
| std::pair<std::shared_ptr<Cacheable>, int>& p = oldValueMap[key]; |
| if ((localErr = LocalRegion::putNoThrow( |
| key, value, aCallbackArgument, p.first, p.second, |
| CacheEventFlags::LOCAL | CacheEventFlags::NOCACHEWRITER, |
| versionTag)) == GF_CACHE_ENTRY_UPDATED) { |
| LOGFINEST( |
| "Region::putAll: did not change local value for key [%s] " |
| "since it has been updated by another thread while operation " |
| "was " |
| "in progress", |
| Utils::nullSafeToString(key).c_str()); |
| } else if (localErr == GF_CACHE_LISTENER_EXCEPTION) { |
| LOGFINER("Region::putAll: invoke listener error [%d] for key [%s]", |
| localErr, Utils::nullSafeToString(key).c_str()); |
| err = localErr; |
| } else if (localErr != GF_NOERR) { |
| return localErr; |
| } |
| } // End of for loop |
| } else { /*Non SingleHop case :: PUTALL has taken multiple hops*/ |
| LOGDEBUG( |
| "NILKANTH LocalRegion::putAllNoThrow m_isPRSingleHopEnabled = %d " |
| "expected false", |
| m_isPRSingleHopEnabled); |
| int index = 0; |
| for (const auto& iter : map) { |
| const auto& key = iter.first; |
| const auto& value = iter.second; |
| auto& p = oldValueMap[key]; |
| |
| if (versionedObjPartListPtr) { |
| LOGDEBUG("versionedObjPartListPtr->getVersionedTagptr().size() = %d ", |
| versionedObjPartListPtr->getVersionedTagptr().size()); |
| if (versionedObjPartListPtr->getVersionedTagptr().size() > 0) { |
| versionTag = versionedObjPartListPtr->getVersionedTagptr()[index++]; |
| } |
| } |
| if ((localErr = LocalRegion::putNoThrow( |
| key, value, aCallbackArgument, p.first, p.second, |
| CacheEventFlags::LOCAL | CacheEventFlags::NOCACHEWRITER, |
| versionTag)) == GF_CACHE_ENTRY_UPDATED) { |
| LOGFINEST( |
| "Region::putAll: did not change local value for key [%s] " |
| "since it has been updated by another thread while operation " |
| "was " |
| "in progress", |
| Utils::nullSafeToString(key).c_str()); |
| } else if (localErr == GF_CACHE_LISTENER_EXCEPTION) { |
| LOGFINER("Region::putAll: invoke listener error [%d] for key [%s]", |
| localErr, Utils::nullSafeToString(key).c_str()); |
| err = localErr; |
| } else if (localErr != GF_NOERR) { |
| return localErr; |
| } |
| } |
| } |
| } |
| |
| m_regionStats->incPutAll(); |
| return err; |
| } |
| |
| GfErrType LocalRegion::removeAllNoThrow( |
| const std::vector<std::shared_ptr<CacheableKey>>& keys, |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| // 1. check destroy pending |
| CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard); |
| GfErrType err = GF_NOERR; |
| std::shared_ptr<VersionedCacheableObjectPartList> versionedObjPartListPtr; |
| |
| // 2.check transaction state and do remote op |
| TXState* txState = getTXState(); |
| if (txState != nullptr) { |
| if (isLocalOp()) return GF_NOTSUP; |
| err = removeAllNoThrow_remote(keys, versionedObjPartListPtr, |
| aCallbackArgument); |
| if (err == GF_NOERR) txState->setDirty(); |
| return err; |
| } |
| |
| // 3.add tracking |
| bool cachingEnabled = m_regionAttributes.getCachingEnabled(); |
| |
| // 4. do remote removeAll |
| err = |
| removeAllNoThrow_remote(keys, versionedObjPartListPtr, aCallbackArgument); |
| if (err != GF_NOERR) { |
| return err; |
| } |
| |
| // 5. update local cache |
| GfErrType localErr; |
| std::shared_ptr<VersionTag> versionTag; |
| if (cachingEnabled) { |
| std::vector<std::shared_ptr<CacheableKey>>* keysPtr; |
| if (m_isPRSingleHopEnabled) { |
| keysPtr = versionedObjPartListPtr->getSucceededKeys().get(); |
| } else { |
| keysPtr = const_cast<std::vector<std::shared_ptr<CacheableKey>>*>(&keys); |
| } |
| |
| for (size_t keyIndex = 0; keyIndex < keysPtr->size(); keyIndex++) { |
| auto key = keysPtr->at(keyIndex); |
| if (versionedObjPartListPtr) { |
| LOGDEBUG("versionedObjPartListPtr->getVersionedTagptr().size() = %d ", |
| versionedObjPartListPtr->getVersionedTagptr().size()); |
| if (versionedObjPartListPtr->getVersionedTagptr().size() > 0) { |
| versionTag = versionedObjPartListPtr->getVersionedTagptr()[keyIndex]; |
| } |
| if (versionTag == nullptr) { |
| LOGDEBUG( |
| "RemoveAll hits EntryNotFoundException at server side for key " |
| "[%s], not to destroy it from local cache.", |
| Utils::nullSafeToString(key).c_str()); |
| continue; |
| } |
| } |
| |
| if ((localErr = LocalRegion::destroyNoThrow( |
| key, aCallbackArgument, -1, |
| CacheEventFlags::LOCAL | CacheEventFlags::NOCACHEWRITER, |
| versionTag)) == GF_CACHE_ENTRY_UPDATED) { |
| LOGFINEST( |
| "Region::removeAll: did not remove local value for key [%s] " |
| "since it has been updated by another thread while operation was " |
| "in progress", |
| Utils::nullSafeToString(key).c_str()); |
| } else if (localErr == GF_CACHE_LISTENER_EXCEPTION) { |
| LOGFINER("Region::removeAll: invoke listener error [%d] for key [%s]", |
| localErr, Utils::nullSafeToString(key).c_str()); |
| err = localErr; |
| } else if (localErr == GF_CACHE_ENTRY_NOT_FOUND) { |
| LOGFINER("Region::removeAll: error [%d] for key [%s]", localErr, |
| Utils::nullSafeToString(key).c_str()); |
| } else if (localErr != GF_NOERR) { |
| return localErr; |
| } |
| } // End of for loop |
| } |
| |
| // 6.update stats |
| m_regionStats->incRemoveAll(); |
| return err; |
| } |
| |
| void LocalRegion::clear( |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| /*update the stats */ |
| int64_t sampleStartNanos = startStatOpTime(); |
| localClear(aCallbackArgument); |
| updateStatOpTime(m_regionStats->getStat(), m_regionStats->getClearsId(), |
| sampleStartNanos); |
| } |
| void LocalRegion::localClear( |
| const std::shared_ptr<Serializable>& aCallbackArgument) { |
| GfErrType err = localClearNoThrow(aCallbackArgument, CacheEventFlags::LOCAL); |
| if (err != GF_NOERR) GfErrTypeToException("LocalRegion::localClear", err); |
| } |
| GfErrType LocalRegion::localClearNoThrow( |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| const CacheEventFlags eventFlags) { |
| bool cachingEnabled = m_regionAttributes.getCachingEnabled(); |
| /*Update the stats for clear*/ |
| m_regionStats->incClears(); |
| GfErrType err = GF_NOERR; |
| TryReadGuard guard(m_rwLock, m_destroyPending); |
| if (m_released || m_destroyPending) return err; |
| if (!invokeCacheWriterForRegionEvent(aCallbackArgument, eventFlags, |
| BEFORE_REGION_CLEAR)) { |
| LOGFINE("Cache writer prevented region clear"); |
| return GF_CACHEWRITER_ERROR; |
| } |
| if (cachingEnabled == true) m_entries->clear(); |
| if (!eventFlags.isNormal()) { |
| err = invokeCacheListenerForRegionEvent(aCallbackArgument, eventFlags, |
| AFTER_REGION_CLEAR); |
| } |
| return err; |
| } |
| |
| GfErrType LocalRegion::invalidateLocal( |
| const std::string& name, const std::shared_ptr<CacheableKey>& keyPtr, |
| const std::shared_ptr<Cacheable>& value, const CacheEventFlags eventFlags, |
| std::shared_ptr<VersionTag> versionTag) { |
| if (keyPtr == nullptr) { |
| return GF_CACHE_ILLEGAL_ARGUMENT_EXCEPTION; |
| } |
| CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard); |
| |
| GfErrType err = GF_NOERR; |
| |
| bool cachingEnabled = m_regionAttributes.getCachingEnabled(); |
| std::shared_ptr<Cacheable> oldValue; |
| std::shared_ptr<MapEntryImpl> me; |
| |
| if (!eventFlags.isNotification() || getProcessedMarker()) { |
| if (cachingEnabled) { |
| LOGDEBUG("%s: region [%s] invalidating key [%s], value [%s]", |
| name.c_str(), getFullPath().c_str(), |
| Utils::nullSafeToString(keyPtr).c_str(), |
| Utils::nullSafeToString(value).c_str()); |
| /* adongre - Coverity II |
| * CID 29193: Parse warning (PW.PARAMETER_HIDDEN) |
| */ |
| // std::shared_ptr<VersionTag> versionTag; |
| if ((err = m_entries->invalidate(keyPtr, me, oldValue, versionTag)) != |
| GF_NOERR) { |
| if (eventFlags.isNotification()) { |
| LOGDEBUG( |
| "Region::invalidate: region [%s] invalidate key [%s] " |
| "failed with error %d", |
| getFullPath().c_str(), Utils::nullSafeToString(keyPtr).c_str(), |
| err); |
| } |
| if (err == GF_CACHE_CONCURRENT_MODIFICATION_EXCEPTION) { |
| LOGDEBUG( |
| "Region::invalidateLocal: invalidate for key [%s] failed because the cache already contains \ |
| an entry with higher version. The cache listener will not be invoked.", |
| Utils::nullSafeToString(keyPtr).c_str()); |
| // Cache listener won't be called in this case |
| return GF_NOERR; |
| } |
| // for notification invoke the listener even if the key does |
| // not exist locally |
| if (!eventFlags.isNotification() || err != GF_CACHE_ENTRY_NOT_FOUND) { |
| return err; |
| } else { |
| err = GF_NOERR; |
| } |
| } else { |
| LOGDEBUG("Region::invalidate: region [%s] invalidated key [%s]", |
| getFullPath().c_str(), |
| Utils::nullSafeToString(keyPtr).c_str()); |
| } |
| // entry/region expiration |
| if (!eventFlags.isEvictOrExpire()) { |
| updateAccessAndModifiedTime(true); |
| } |
| } |
| } else { // if (getProcessedMarker()) |
| if (cachingEnabled) { |
| m_entries->getEntry(keyPtr, me, oldValue); |
| } |
| } |
| return err; |
| } |
| |
| GfErrType LocalRegion::invalidateRegionNoThrow( |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| const CacheEventFlags eventFlags) { |
| CHECK_DESTROY_PENDING_NOTHROW(TryReadGuard); |
| GfErrType err = GF_NOERR; |
| |
| if (m_regionAttributes.getCachingEnabled()) { |
| std::vector<std::shared_ptr<CacheableKey>> v = keys_internal(); |
| const auto size = v.size(); |
| std::shared_ptr<MapEntryImpl> me; |
| for (size_t i = 0; i < size; i++) { |
| { |
| std::shared_ptr<Cacheable> oldValue; |
| // invalidate all the entries with a nullptr versionTag |
| std::shared_ptr<VersionTag> versionTag; |
| m_entries->invalidate(v.at(i), me, oldValue, versionTag); |
| if (!eventFlags.isEvictOrExpire()) { |
| updateAccessAndModifiedTimeForEntry(me, true); |
| } |
| } |
| } |
| if (!eventFlags.isEvictOrExpire()) { |
| updateAccessAndModifiedTime(true); |
| } |
| } |
| |
| // try remote region invalidate, if any |
| if (!eventFlags.isLocal()) { |
| err = invalidateRegionNoThrow_remote(aCallbackArgument); |
| if (err != GF_NOERR) return err; |
| } |
| |
| if (m_subRegions.current_size() > 0) { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> subguard(m_subRegions.mutex()); |
| for (MapOfRegionWithLock::iterator p = m_subRegions.begin(); |
| p != m_subRegions.end(); ++p) { |
| RegionInternal* subRegion = |
| dynamic_cast<RegionInternal*>((*p).int_id_.get()); |
| if (subRegion != nullptr) { |
| err = subRegion->invalidateRegionNoThrow(aCallbackArgument, eventFlags); |
| if (err != GF_NOERR) { |
| return err; |
| } |
| } |
| } |
| } |
| err = invokeCacheListenerForRegionEvent(aCallbackArgument, eventFlags, |
| AFTER_REGION_INVALIDATE); |
| |
| return err; |
| } |
| |
| GfErrType LocalRegion::destroyRegionNoThrow( |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| bool removeFromParent, const CacheEventFlags eventFlags) { |
| // Get global locks to synchronize with failover thread. |
| // TODO: This should go into RegionGlobalLocks |
| // The distMngrsLock is required before RegionGlobalLocks since failover |
| // thread acquires distMngrsLock and then tries to acquire endpoints lock |
| // which is already taken by RegionGlobalLocks here. |
| DistManagersLockGuard _guard(m_cacheImpl->tcrConnectionManager()); |
| RegionGlobalLocks acquireLocks(this); |
| |
| // Fix for BUG:849, i.e Remove subscription on region before destroying the |
| // region |
| if (eventFlags == CacheEventFlags::LOCAL) { |
| if (unregisterKeysBeforeDestroyRegion() != GF_NOERR) { |
| LOGDEBUG( |
| "DEBUG :: LocalRegion::destroyRegionNoThrow UnregisteredKeys " |
| "Failed"); |
| } |
| } |
| |
| TryWriteGuard guard(m_rwLock, m_destroyPending); |
| if (m_destroyPending) { |
| if (eventFlags.isCacheClose()) { |
| return GF_NOERR; |
| } else { |
| return GF_CACHE_REGION_DESTROYED_EXCEPTION; |
| } |
| } |
| |
| m_destroyPending = true; |
| LOGDEBUG("LocalRegion::destroyRegionNoThrow( ): set flag destroy-pending."); |
| |
| GfErrType err = GF_NOERR; |
| |
| // do not invoke the writer for expiry or notification |
| if (!eventFlags.isNotification() && !eventFlags.isEvictOrExpire()) { |
| if (!invokeCacheWriterForRegionEvent(aCallbackArgument, eventFlags, |
| BEFORE_REGION_DESTROY)) { |
| // do not let CacheWriter veto when this is Cache::close() |
| if (!eventFlags.isCacheClose()) { |
| LOGFINE("Cache writer prevented region destroy"); |
| m_destroyPending = false; |
| return GF_CACHEWRITER_ERROR; |
| } |
| } |
| // for the expiry case try the local destroy first and remote |
| // destroy only if local destroy succeeds |
| if (!eventFlags.isLocal()) { |
| err = destroyRegionNoThrow_remote(aCallbackArgument); |
| if (err != GF_NOERR) { |
| m_destroyPending = false; |
| return err; |
| } |
| } |
| } |
| |
| LOGFINE("Region %s is being destroyed", m_fullPath.c_str()); |
| { |
| MapOfRegionGuard guard(m_subRegions.mutex()); |
| for (MapOfRegionWithLock::iterator p = m_subRegions.begin(); |
| p != m_subRegions.end(); ++p) { |
| // TODO: remove unnecessary dynamic_cast by having m_subRegions hold |
| // RegionInternal and invoke the destroy method in that |
| RegionInternal* subRegion = |
| dynamic_cast<RegionInternal*>((*p).int_id_.get()); |
| if (subRegion != nullptr) { |
| // for subregions never remove from parent since that will cause |
| // the region to be destroyed and SEGV; unbind_all takes care of that |
| // Also don't send remote destroy message for sub-regions |
| err = subRegion->destroyRegionNoThrow( |
| aCallbackArgument, false, eventFlags | CacheEventFlags::LOCAL); |
| // for Cache::close() keep going as far as possible |
| if (err != GF_NOERR && !eventFlags.isCacheClose()) { |
| m_destroyPending = false; |
| return err; |
| } |
| } |
| } |
| } |
| m_subRegions.unbind_all(); |
| |
| // for the expiry case try the local destroy first and remote |
| // destroy only if local destroy succeeds |
| if (eventFlags.isEvictOrExpire() && !eventFlags.isLocal()) { |
| err = destroyRegionNoThrow_remote(aCallbackArgument); |
| if (err != GF_NOERR) { |
| m_destroyPending = false; |
| return err; |
| } |
| } |
| // if we are not removing from parent then this is a proper |
| // region close so invoke listener->close() also |
| err = invokeCacheListenerForRegionEvent(aCallbackArgument, eventFlags, |
| AFTER_REGION_DESTROY); |
| |
| release(true); |
| if (m_regionAttributes.getCachingEnabled()) { |
| _GEODE_SAFE_DELETE(m_entries); |
| } |
| GF_D_ASSERT(m_destroyPending); |
| |
| if (removeFromParent) { |
| if (m_parentRegion == nullptr) { |
| m_cacheImpl->removeRegion(m_name.c_str()); |
| } else { |
| LocalRegion* parent = dynamic_cast<LocalRegion*>(m_parentRegion.get()); |
| if (parent != nullptr) { |
| parent->removeRegion(m_name); |
| if (!eventFlags.isEvictOrExpire()) { |
| parent->updateAccessAndModifiedTime(true); |
| } |
| } |
| } |
| } |
| return err; |
| } |
| |
| GfErrType LocalRegion::putLocal(const std::string& name, bool isCreate, |
| const std::shared_ptr<CacheableKey>& key, |
| const std::shared_ptr<Cacheable>& value, |
| std::shared_ptr<Cacheable>& oldValue, |
| bool cachingEnabled, int updateCount, |
| int destroyTracker, |
| std::shared_ptr<VersionTag> versionTag, |
| DataInput* delta, |
| std::shared_ptr<EventId> eventId) { |
| GfErrType err = GF_NOERR; |
| bool isUpdate = !isCreate; |
| auto& cachePerfStats = m_cacheImpl->getCachePerfStats(); |
| |
| if (cachingEnabled) { |
| std::shared_ptr<MapEntryImpl> entry; |
| LOGDEBUG("%s: region [%s] putting key [%s], value [%s]", name.c_str(), |
| getFullPath().c_str(), Utils::nullSafeToString(key).c_str(), |
| Utils::nullSafeToString(value).c_str()); |
| if (isCreate) { |
| err = m_entries->create(key, value, entry, oldValue, updateCount, |
| destroyTracker, versionTag); |
| } else { |
| err = m_entries->put(key, value, entry, oldValue, updateCount, |
| destroyTracker, versionTag, isUpdate, delta); |
| if (err == GF_INVALID_DELTA) { |
| cachePerfStats.incFailureOnDeltaReceived(); |
| // PXR: Get full object from server. |
| std::shared_ptr<Cacheable>& newValue1 = |
| const_cast<std::shared_ptr<Cacheable>&>(value); |
| std::shared_ptr<VersionTag> versionTag1; |
| err = getNoThrow_FullObject(eventId, newValue1, versionTag1); |
| if (err == GF_NOERR && newValue1 != nullptr) { |
| err = m_entries->put( |
| key, newValue1, entry, oldValue, updateCount, destroyTracker, |
| versionTag1 != nullptr ? versionTag1 : versionTag, isUpdate); |
| } |
| } |
| if (delta != nullptr && |
| err == GF_NOERR) { // Means that delta is on and there is no failure. |
| cachePerfStats.incDeltaReceived(); |
| } |
| } |
| if (err != GF_NOERR) { |
| return err; |
| } |
| LOGDEBUG("%s: region [%s] %s key [%s], value [%s]", name.c_str(), |
| getFullPath().c_str(), isUpdate ? "updated" : "created", |
| Utils::nullSafeToString(key).c_str(), |
| Utils::nullSafeToString(value).c_str()); |
| // entry/region expiration |
| if (entryExpiryEnabled()) { |
| if (isUpdate && entry->getExpProperties().getExpiryTaskId() != -1) { |
| updateAccessAndModifiedTimeForEntry(entry, true); |
| } else { |
| registerEntryExpiryTask(entry); |
| } |
| } |
| updateAccessAndModifiedTime(true); |
| } |
| // update the stats |
| if (isUpdate) { |
| m_regionStats->incPuts(); |
| cachePerfStats.incPuts(); |
| } else { |
| if (cachingEnabled) { |
| m_regionStats->setEntries(m_entries->size()); |
| cachePerfStats.incEntries(1); |
| } |
| m_regionStats->incCreates(); |
| cachePerfStats.incCreates(); |
| } |
| return err; |
| } |
| |
| std::vector<std::shared_ptr<CacheableKey>> LocalRegion::keys_internal() { |
| std::vector<std::shared_ptr<CacheableKey>> keys; |
| |
| if (m_regionAttributes.getCachingEnabled()) { |
| m_entries->getKeys(keys); |
| } |
| |
| return keys; |
| } |
| |
| void LocalRegion::entries_internal( |
| std::vector<std::shared_ptr<RegionEntry>>& me, const bool recursive) { |
| m_entries->getEntries(me); |
| |
| if (recursive == true) { |
| MapOfRegionGuard guard(m_subRegions.mutex()); |
| for (MapOfRegionWithLock::iterator p = m_subRegions.begin(); |
| p != m_subRegions.end(); ++p) { |
| dynamic_cast<LocalRegion*>((*p).int_id_.get()) |
| ->entries_internal(me, true); |
| } |
| } |
| } |
| |
| int LocalRegion::removeRegion(const std::string& name) { |
| if (m_subRegions.current_size() == 0) { |
| return 0; |
| } |
| return m_subRegions.unbind(name); |
| } |
| |
| bool LocalRegion::invokeCacheWriterForEntryEvent( |
| const std::shared_ptr<CacheableKey>& key, |
| std::shared_ptr<Cacheable>& oldValue, |
| const std::shared_ptr<Cacheable>& newValue, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| CacheEventFlags eventFlags, EntryEventType type) { |
| // Check if we have a local cache writer. If so, invoke and return. |
| bool bCacheWriterReturn = true; |
| if (m_writer != nullptr) { |
| if (oldValue != nullptr && CacheableToken::isInvalid(oldValue)) { |
| oldValue = nullptr; |
| } |
| EntryEvent event(shared_from_this(), key, oldValue, newValue, |
| aCallbackArgument, eventFlags.isNotification()); |
| const char* eventStr = "unknown"; |
| try { |
| bool updateStats = true; |
| /*Update the CacheWriter Stats*/ |
| int64_t sampleStartNanos = startStatOpTime(); |
| switch (type) { |
| case BEFORE_UPDATE: { |
| if (oldValue != nullptr) { |
| eventStr = "beforeUpdate"; |
| bCacheWriterReturn = m_writer->beforeUpdate(event); |
| break; |
| } |
| // if oldValue is nullptr then fall to BEFORE_CREATE case |
| } |
| case BEFORE_CREATE: { |
| eventStr = "beforeCreate"; |
| bCacheWriterReturn = m_writer->beforeCreate(event); |
| break; |
| } |
| case BEFORE_DESTROY: { |
| eventStr = "beforeDestroy"; |
| bCacheWriterReturn = m_writer->beforeDestroy(event); |
| break; |
| } |
| default: { |
| updateStats = false; |
| break; |
| } |
| } |
| |
| if (updateStats) { |
| updateStatOpTime(m_regionStats->getStat(), |
| m_regionStats->getWriterCallTimeId(), |
| sampleStartNanos); |
| m_regionStats->incWriterCallsCompleted(); |
| } |
| |
| } catch (const Exception& ex) { |
| LOGERROR(std::string("Exception in CacheWriter::") + eventStr + ": " + |
| ex.getName() + ": " + ex.getMessage()); |
| bCacheWriterReturn = false; |
| } catch (...) { |
| LOGERROR("Unknown exception in CacheWriter::%s", eventStr); |
| bCacheWriterReturn = false; |
| } |
| } |
| return bCacheWriterReturn; |
| } |
| |
| bool LocalRegion::invokeCacheWriterForRegionEvent( |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| CacheEventFlags eventFlags, RegionEventType type) { |
| // Check if we have a local cache writer. If so, invoke and return. |
| bool bCacheWriterReturn = true; |
| if (m_writer != nullptr) { |
| RegionEvent event(shared_from_this(), aCallbackArgument, |
| eventFlags.isNotification()); |
| const char* eventStr = "unknown"; |
| try { |
| bool updateStats = true; |
| /*Update the CacheWriter Stats*/ |
| int64_t sampleStartNanos = startStatOpTime(); |
| switch (type) { |
| case BEFORE_REGION_DESTROY: { |
| eventStr = "beforeRegionDestroy"; |
| bCacheWriterReturn = m_writer->beforeRegionDestroy(event); |
| break; |
| } |
| case BEFORE_REGION_CLEAR: { |
| eventStr = "beforeRegionClear"; |
| bCacheWriterReturn = m_writer->beforeRegionClear(event); |
| break; |
| } |
| default: { |
| updateStats = false; |
| break; |
| } |
| } |
| if (updateStats) { |
| updateStatOpTime(m_regionStats->getStat(), |
| m_regionStats->getWriterCallTimeId(), |
| sampleStartNanos); |
| m_regionStats->incWriterCallsCompleted(); |
| } |
| } catch (const Exception& ex) { |
| LOGERROR(std::string("Exception in CacheWriter::") + eventStr + ": " + |
| ex.getName() + ": " + ex.getMessage()); |
| bCacheWriterReturn = false; |
| } catch (...) { |
| LOGERROR("Unknown exception in CacheWriter::%s", eventStr); |
| bCacheWriterReturn = false; |
| } |
| } |
| return bCacheWriterReturn; |
| } |
| |
| GfErrType LocalRegion::invokeCacheListenerForEntryEvent( |
| const std::shared_ptr<CacheableKey>& key, |
| std::shared_ptr<Cacheable>& oldValue, |
| const std::shared_ptr<Cacheable>& newValue, |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| CacheEventFlags eventFlags, EntryEventType type, bool isLocal) { |
| GfErrType err = GF_NOERR; |
| |
| // Check if we have a local cache listener. If so, invoke and return. |
| if (m_listener != nullptr) { |
| if (oldValue != nullptr && CacheableToken::isInvalid(oldValue)) { |
| oldValue = nullptr; |
| } |
| EntryEvent event(shared_from_this(), key, oldValue, newValue, |
| aCallbackArgument, eventFlags.isNotification()); |
| const char* eventStr = "unknown"; |
| try { |
| bool updateStats = true; |
| /*Update the CacheWriter Stats*/ |
| int64_t sampleStartNanos = startStatOpTime(); |
| switch (type) { |
| case AFTER_UPDATE: { |
| // when CREATE is received from server for notification |
| // then force an afterUpdate even if key is not present in cache. |
| if (oldValue != nullptr || eventFlags.isNotificationUpdate() || |
| isLocal) { |
| eventStr = "afterUpdate"; |
| m_listener->afterUpdate(event); |
| break; |
| } |
| // if oldValue is nullptr then fall to AFTER_CREATE case |
| } |
| case AFTER_CREATE: { |
| eventStr = "afterCreate"; |
| m_listener->afterCreate(event); |
| break; |
| } |
| case AFTER_DESTROY: { |
| eventStr = "afterDestroy"; |
| m_listener->afterDestroy(event); |
| break; |
| } |
| case AFTER_INVALIDATE: { |
| eventStr = "afterInvalidate"; |
| m_listener->afterInvalidate(event); |
| break; |
| } |
| default: { |
| updateStats = false; |
| break; |
| } |
| } |
| if (updateStats) { |
| m_cacheImpl->getCachePerfStats().incListenerCalls(); |
| updateStatOpTime(m_regionStats->getStat(), |
| m_regionStats->getListenerCallTimeId(), |
| sampleStartNanos); |
| m_regionStats->incListenerCallsCompleted(); |
| } |
| } catch (const Exception& ex) { |
| LOGERROR("Exception in CacheListener for key[%s]::%s: %s: %s", |
| Utils::nullSafeToString(key).c_str(), eventStr, |
| ex.getName().c_str(), ex.what()); |
| err = GF_CACHE_LISTENER_EXCEPTION; |
| } catch (...) { |
| LOGERROR("Unknown exception in CacheListener for key[%s]::%s", |
| Utils::nullSafeToString(key).c_str(), eventStr); |
| err = GF_CACHE_LISTENER_EXCEPTION; |
| } |
| } |
| return err; |
| } |
| |
| GfErrType LocalRegion::invokeCacheListenerForRegionEvent( |
| const std::shared_ptr<Serializable>& aCallbackArgument, |
| CacheEventFlags eventFlags, RegionEventType type) { |
| GfErrType err = GF_NOERR; |
| |
| // Check if we have a local cache listener. If so, invoke and return. |
| if (m_listener != nullptr) { |
| RegionEvent event(shared_from_this(), aCallbackArgument, |
| eventFlags.isNotification()); |
| const char* eventStr = "unknown"; |
| try { |
| bool updateStats = true; |
| /*Update the CacheWriter Stats*/ |
| int64_t sampleStartNanos = Utils::startStatOpTime(); |
| switch (type) { |
| case AFTER_REGION_DESTROY: { |
| eventStr = "afterRegionDestroy"; |
| m_listener->afterRegionDestroy(event); |
| m_cacheImpl->getCachePerfStats().incListenerCalls(); |
| if (eventFlags.isCacheClose()) { |
| eventStr = "close"; |
| m_listener->close(*this); |
| m_cacheImpl->getCachePerfStats().incListenerCalls(); |
| } |
| break; |
| } |
| case AFTER_REGION_INVALIDATE: { |
| eventStr = "afterRegionInvalidate"; |
| m_listener->afterRegionInvalidate(event); |
| m_cacheImpl->getCachePerfStats().incListenerCalls(); |
| break; |
| } |
| case AFTER_REGION_CLEAR: { |
| eventStr = "afterRegionClear"; |
| m_listener->afterRegionClear(event); |
| break; |
| } |
| default: { |
| updateStats = false; |
| break; |
| } |
| } |
| if (updateStats) { |
| updateStatOpTime(m_regionStats->getStat(), |
| m_regionStats->getListenerCallTimeId(), |
| sampleStartNanos); |
| m_regionStats->incListenerCallsCompleted(); |
| } |
| } catch (const Exception& ex) { |
| LOGERROR("Exception in CacheListener::%s: %s: %s", eventStr, |
| ex.getName().c_str(), ex.what()); |
| err = GF_CACHE_LISTENER_EXCEPTION; |
| } catch (...) { |
| LOGERROR("Unknown exception in CacheListener::%s", eventStr); |
| err = GF_CACHE_LISTENER_EXCEPTION; |
| } |
| } |
| return err; |
| } |
| |
| // TODO: pass current time instead of evaluating it twice, here |
| // and in region |
| void LocalRegion::updateAccessAndModifiedTimeForEntry( |
| std::shared_ptr<MapEntryImpl>& ptr, bool modified) { |
| // locking is not required since setters use atomic operations |
| if (ptr != nullptr && entryExpiryEnabled()) { |
| ExpEntryProperties& expProps = ptr->getExpProperties(); |
| auto currTime = std::chrono::system_clock::now(); |
| std::string keyStr; |
| if (Log::debugEnabled()) { |
| std::shared_ptr<CacheableKey> key; |
| ptr->getKeyI(key); |
| keyStr = Utils::nullSafeToString(key); |
| } |
| LOGDEBUG("Setting last accessed time for key [%s] in region %s to %d", |
| keyStr.c_str(), getFullPath().c_str(), |
| currTime.time_since_epoch().count()); |
| expProps.updateLastAccessTime(currTime); |
| if (modified) { |
| LOGDEBUG("Setting last modified time for key [%s] in region %s to %d", |
| keyStr.c_str(), getFullPath().c_str(), |
| currTime.time_since_epoch().count()); |
| expProps.updateLastModifiedTime(currTime); |
| } |
| } |
| } |
| |
| uint32_t LocalRegion::adjustLruEntriesLimit(uint32_t limit) { |
| CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::adjustLruEntriesLimit); |
| |
| auto attrs = m_regionAttributes; |
| if (!attrs.getCachingEnabled()) return 0; |
| bool hadlru = (attrs.getLruEntriesLimit() != 0); |
| bool needslru = (limit != 0); |
| if (hadlru != needslru) { |
| throw IllegalStateException( |
| "Cannot disable or enable LRU, can only adjust limit."); |
| } |
| uint32_t oldValue = attrs.getLruEntriesLimit(); |
| setLruEntriesLimit(limit); |
| if (needslru) { |
| // checked in AttributesMutator already to assert that LRU was enabled.. |
| LRUEntriesMap* lrumap = static_cast<LRUEntriesMap*>(m_entries); |
| |
| lrumap->adjustLimit(limit); |
| } |
| return oldValue; |
| } |
| |
| ExpirationAction LocalRegion::adjustRegionExpiryAction( |
| ExpirationAction action) { |
| CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::adjustRegionExpiryAction); |
| |
| auto attrs = m_regionAttributes; |
| bool hadExpiry = (getRegionExpiryDuration() > std::chrono::seconds::zero()); |
| if (!hadExpiry) { |
| throw IllegalStateException( |
| "Cannot change region ExpirationAction for region created without " |
| "region expiry."); |
| } |
| ExpirationAction oldValue = getRegionExpiryAction(); |
| |
| setRegionTimeToLiveExpirationAction(action); |
| setRegionIdleTimeoutExpirationAction(action); |
| // m_regionExpirationAction = action; |
| |
| return oldValue; |
| } |
| |
| ExpirationAction LocalRegion::adjustEntryExpiryAction(ExpirationAction action) { |
| CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::adjustEntryExpiryAction); |
| |
| auto attrs = m_regionAttributes; |
| bool hadExpiry = (getEntryExpiryDuration() > std::chrono::seconds::zero()); |
| if (!hadExpiry) { |
| throw IllegalStateException( |
| "Cannot change entry ExpirationAction for region created without " |
| "entry " |
| "expiry."); |
| } |
| ExpirationAction oldValue = getEntryExpirationAction(); |
| |
| setEntryTimeToLiveExpirationAction(action); |
| setEntryIdleTimeoutExpirationAction(action); |
| |
| return oldValue; |
| } |
| |
| std::chrono::seconds LocalRegion::adjustRegionExpiryDuration( |
| const std::chrono::seconds& duration) { |
| CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::adjustRegionExpiryDuration); |
| |
| bool hadExpiry = (getEntryExpiryDuration() > std::chrono::seconds::zero()); |
| if (!hadExpiry) { |
| throw IllegalStateException( |
| "Cannot change region expiration duration for region created " |
| "without " |
| "region expiry."); |
| } |
| const auto& oldValue = getRegionExpiryDuration(); |
| |
| setRegionTimeToLive(duration); |
| setRegionIdleTimeout(duration); |
| |
| return oldValue; |
| } |
| |
| std::chrono::seconds LocalRegion::adjustEntryExpiryDuration( |
| const std::chrono::seconds& duration) { |
| CHECK_DESTROY_PENDING(TryReadGuard, LocalRegion::adjustEntryExpiryDuration); |
| |
| bool hadExpiry = (getEntryExpiryDuration() > std::chrono::seconds::zero()); |
| if (!hadExpiry) { |
| throw IllegalStateException( |
| "Cannot change entry expiration duration for region created without " |
| "entry expiry."); |
| } |
| auto oldValue = getEntryExpiryDuration(); |
| |
| setEntryTimeToLive(duration); |
| setEntryIdleTimeout(duration); |
| |
| return oldValue; |
| } |
| |
| /** they used to public methods in hpp file */ |
| bool LocalRegion::isStatisticsEnabled() { |
| if (m_cacheImpl == nullptr) { |
| return false; |
| } |
| return m_cacheImpl->getDistributedSystem() |
| .getSystemProperties() |
| .statisticsEnabled(); |
| } |
| |
| bool LocalRegion::useModifiedTimeForRegionExpiry() { |
| const auto& region_ttl = m_regionAttributes.getRegionTimeToLive(); |
| if (region_ttl > std::chrono::seconds::zero()) { |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| bool LocalRegion::useModifiedTimeForEntryExpiry() { |
| if (m_regionAttributes.getEntryTimeToLive() > std::chrono::seconds::zero()) { |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| bool LocalRegion::isEntryIdletimeEnabled() { |
| if (m_regionAttributes.getCachingEnabled() && |
| m_regionAttributes.getEntryIdleTimeout() > std::chrono::seconds::zero()) { |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| ExpirationAction LocalRegion::getEntryExpirationAction() const { |
| if (m_regionAttributes.getEntryTimeToLive() > std::chrono::seconds::zero()) { |
| return m_regionAttributes.getEntryTimeToLiveAction(); |
| } else { |
| return m_regionAttributes.getEntryIdleTimeoutAction(); |
| } |
| } |
| |
| ExpirationAction LocalRegion::getRegionExpiryAction() const { |
| const auto& region_ttl = m_regionAttributes.getRegionTimeToLive(); |
| if (region_ttl > std::chrono::seconds::zero()) { |
| return m_regionAttributes.getRegionTimeToLiveAction(); |
| } else { |
| return m_regionAttributes.getRegionIdleTimeoutAction(); |
| } |
| } |
| |
| std::chrono::seconds LocalRegion::getRegionExpiryDuration() const { |
| const auto& region_ttl = m_regionAttributes.getRegionTimeToLive(); |
| const auto& region_idle = m_regionAttributes.getRegionIdleTimeout(); |
| if (region_ttl > std::chrono::seconds::zero()) { |
| return region_ttl; |
| } else { |
| return region_idle; |
| } |
| } |
| |
| std::chrono::seconds LocalRegion::getEntryExpiryDuration() const { |
| const auto& entry_ttl = m_regionAttributes.getEntryTimeToLive(); |
| const auto& entry_idle = m_regionAttributes.getEntryIdleTimeout(); |
| |
| if (entry_ttl > std::chrono::seconds::zero()) { |
| return entry_ttl; |
| } else { |
| return entry_idle; |
| } |
| } |
| |
| /** methods to be overridden by derived classes*/ |
| GfErrType LocalRegion::unregisterKeysBeforeDestroyRegion() { return GF_NOERR; } |
| |
| GfErrType LocalRegion::getNoThrow_remote(const std::shared_ptr<CacheableKey>&, |
| std::shared_ptr<Cacheable>&, |
| const std::shared_ptr<Serializable>&, |
| std::shared_ptr<VersionTag>&) { |
| return GF_NOERR; |
| } |
| |
| GfErrType LocalRegion::putNoThrow_remote(const std::shared_ptr<CacheableKey>&, |
| const std::shared_ptr<Cacheable>&, |
| const std::shared_ptr<Serializable>&, |
| std::shared_ptr<VersionTag>&, bool) { |
| return GF_NOERR; |
| } |
| |
| GfErrType LocalRegion::putAllNoThrow_remote( |
| const HashMapOfCacheable&, |
| std::shared_ptr<VersionedCacheableObjectPartList>&, |
| std::chrono::milliseconds, const std::shared_ptr<Serializable>&) { |
| return GF_NOERR; |
| } |
| |
| GfErrType LocalRegion::removeAllNoThrow_remote( |
| const std::vector<std::shared_ptr<CacheableKey>>&, |
| std::shared_ptr<VersionedCacheableObjectPartList>&, |
| const std::shared_ptr<Serializable>&) { |
| return GF_NOERR; |
| } |
| |
| GfErrType LocalRegion::createNoThrow_remote( |
| const std::shared_ptr<CacheableKey>&, const std::shared_ptr<Cacheable>&, |
| const std::shared_ptr<Serializable>&, std::shared_ptr<VersionTag>&) { |
| return GF_NOERR; |
| } |
| |
| GfErrType LocalRegion::destroyNoThrow_remote( |
| const std::shared_ptr<CacheableKey>&, const std::shared_ptr<Serializable>&, |
| std::shared_ptr<VersionTag>&) { |
| return GF_NOERR; |
| } |
| |
| GfErrType LocalRegion::removeNoThrow_remote( |
| const std::shared_ptr<CacheableKey>&, const std::shared_ptr<Cacheable>&, |
| const std::shared_ptr<Serializable>&, std::shared_ptr<VersionTag>&) { |
| return GF_NOERR; |
| } |
| |
| GfErrType LocalRegion::removeNoThrowEX_remote( |
| const std::shared_ptr<CacheableKey>&, const std::shared_ptr<Serializable>&, |
| std::shared_ptr<VersionTag>&) { |
| return GF_NOERR; |
| } |
| |
| GfErrType LocalRegion::invalidateNoThrow_remote( |
| const std::shared_ptr<CacheableKey>&, const std::shared_ptr<Serializable>&, |
| std::shared_ptr<VersionTag>&) { |
| return GF_NOERR; |
| } |
| |
| GfErrType LocalRegion::getAllNoThrow_remote( |
| const std::vector<std::shared_ptr<CacheableKey>>*, |
| const std::shared_ptr<HashMapOfCacheable>&, |
| const std::shared_ptr<HashMapOfException>&, |
| const std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>&, bool, |
| const std::shared_ptr<Serializable>&) { |
| return GF_NOERR; |
| } |
| |
| GfErrType LocalRegion::invalidateRegionNoThrow_remote( |
| const std::shared_ptr<Serializable>&) { |
| return GF_NOERR; |
| } |
| |
| GfErrType LocalRegion::destroyRegionNoThrow_remote( |
| const std::shared_ptr<Serializable>&) { |
| return GF_NOERR; |
| } |
| |
| void LocalRegion::adjustCacheListener( |
| const std::shared_ptr<CacheListener>& aListener) { |
| WriteGuard guard(m_rwLock); |
| setCacheListener(aListener); |
| m_listener = aListener; |
| } |
| |
| void LocalRegion::adjustCacheListener(const std::string& lib, |
| const std::string& func) { |
| WriteGuard guard(m_rwLock); |
| setCacheListener(lib, func); |
| m_listener = m_regionAttributes.getCacheListener(); |
| } |
| |
| void LocalRegion::adjustCacheLoader( |
| const std::shared_ptr<CacheLoader>& aLoader) { |
| WriteGuard guard(m_rwLock); |
| setCacheLoader(aLoader); |
| m_loader = aLoader; |
| } |
| |
| void LocalRegion::adjustCacheLoader(const std::string& lib, |
| const std::string& func) { |
| WriteGuard guard(m_rwLock); |
| setCacheLoader(lib, func); |
| m_loader = m_regionAttributes.getCacheLoader(); |
| } |
| |
| void LocalRegion::adjustCacheWriter( |
| const std::shared_ptr<CacheWriter>& aWriter) { |
| WriteGuard guard(m_rwLock); |
| setCacheWriter(aWriter); |
| m_writer = aWriter; |
| } |
| |
| void LocalRegion::adjustCacheWriter(const std::string& lib, |
| const std::string& func) { |
| WriteGuard guard(m_rwLock); |
| setCacheWriter(lib, func); |
| m_writer = m_regionAttributes.getCacheWriter(); |
| } |
| |
| void LocalRegion::evict(int32_t percentage) { |
| TryReadGuard guard(m_rwLock, m_destroyPending); |
| if (m_released || m_destroyPending) return; |
| if (m_entries != nullptr) { |
| int32_t size = m_entries->size(); |
| int32_t entriesToEvict = (percentage * size) / 100; |
| // only invoked from EvictionController so static_cast is always safe |
| LRUEntriesMap* lruMap = static_cast<LRUEntriesMap*>(m_entries); |
| LOGINFO("Evicting %d entries. Current entry count is %d", entriesToEvict, |
| size); |
| lruMap->processLRU(entriesToEvict); |
| } |
| } |
| void LocalRegion::invokeAfterAllEndPointDisconnected() { |
| if (m_listener != nullptr) { |
| int64_t sampleStartNanos = startStatOpTime(); |
| try { |
| m_listener->afterRegionDisconnected(*this); |
| } catch (const Exception& ex) { |
| LOGERROR("Exception in CacheListener::afterRegionDisconnected: %s: %s", |
| ex.getName().c_str(), ex.what()); |
| } catch (...) { |
| LOGERROR("Unknown exception in CacheListener::afterRegionDisconnected"); |
| } |
| updateStatOpTime(m_regionStats->getStat(), |
| m_regionStats->getListenerCallTimeId(), sampleStartNanos); |
| m_regionStats->incListenerCallsCompleted(); |
| } |
| } |
| |
| GfErrType LocalRegion::getNoThrow_FullObject(std::shared_ptr<EventId>, |
| std::shared_ptr<Cacheable>&, |
| std::shared_ptr<VersionTag>&) { |
| return GF_NOERR; |
| } |
| |
| std::shared_ptr<Cacheable> LocalRegion::handleReplay( |
| GfErrType& err, std::shared_ptr<Cacheable> value) const { |
| if (err == GF_TRANSACTION_DATA_REBALANCED_EXCEPTION || |
| err == GF_TRANSACTION_DATA_NODE_HAS_DEPARTED_EXCEPTION) { |
| bool isRollBack = (err == GF_TRANSACTION_DATA_REBALANCED_EXCEPTION); |
| auto txState = getTXState(); |
| if (!txState) { |
| GfErrTypeThrowException("TXState is nullptr", |
| GF_CACHE_ILLEGAL_STATE_EXCEPTION); |
| throw ""; // never reached |
| } |
| |
| auto ret = txState->replay(isRollBack); |
| err = GF_NOERR; |
| return ret; |
| } |
| |
| return value; |
| } |
| std::shared_ptr<TombstoneList> LocalRegion::getTombstoneList() { |
| return m_tombstoneList; |
| } |
| |
| int64_t LocalRegion::startStatOpTime() { |
| return m_enableTimeStatistics ? Utils::startStatOpTime() : 0; |
| } |
| void LocalRegion::updateStatOpTime(Statistics* statistics, int32_t statId, |
| int64_t start) { |
| if (m_enableTimeStatistics) { |
| Utils::updateStatOpTime(statistics, statId, start); |
| } |
| } |
| |
| void LocalRegion::acquireGlobals(bool) {} |
| |
| void LocalRegion::releaseGlobals(bool) {} |
| |
| } // namespace client |
| } // namespace geode |
| } // namespace apache |