| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "EvictionController.hpp" |
| |
| #include <string> |
| |
| #include "CacheImpl.hpp" |
| #include "CacheRegionHelper.hpp" |
| #include "DistributedSystem.hpp" |
| #include "ReadWriteLock.hpp" |
| #include "RegionInternal.hpp" |
| |
| namespace apache { |
| namespace geode { |
| namespace client { |
| |
| const char* EvictionController::NC_EC_Thread = "NC EC Thread"; |
| EvictionController::EvictionController(size_t maxHeapSize, |
| int32_t heapSizeDelta, CacheImpl* cache) |
| : m_run(false), |
| m_maxHeapSize(maxHeapSize * 1024 * 1024), |
| m_heapSizeDelta(heapSizeDelta), |
| m_cacheImpl(cache), |
| m_currentHeapSize(0) { |
| evictionThreadPtr = new EvictionThread(this); |
| LOGINFO("Maximum heap size for Heap LRU set to %ld bytes", m_maxHeapSize); |
| // m_currentHeapSize = |
| // DistributedSystem::getSystemProperties()->gfHighWaterMark(), |
| // DistributedSystem::getSystemProperties()->gfMessageSize(); |
| } |
| |
| EvictionController::~EvictionController() { |
| _GEODE_SAFE_DELETE(evictionThreadPtr); |
| } |
| |
| void EvictionController::updateRegionHeapInfo(int64_t info) { |
| // LOGINFO("updateRegionHeapInfo is %d", info); |
| m_queue.put(info); |
| // We could block here if we wanted to prevent any further memory use |
| // until the evictions had been completed. |
| } |
| |
| int EvictionController::svc() { |
| DistributedSystemImpl::setThreadName(NC_EC_Thread); |
| int64_t pendingEvictions = 0; |
| while (m_run) { |
| int64_t readInfo = 0; |
| readInfo = m_queue.get(1500); |
| if (readInfo == 0) continue; |
| |
| processHeapInfo(readInfo, pendingEvictions); |
| } |
| int32_t size = m_queue.size(); |
| for (int i = 0; i < size; i++) { |
| int64_t readInfo = 0; |
| readInfo = m_queue.get(); |
| if (readInfo == 0) continue; |
| processHeapInfo(readInfo, pendingEvictions); |
| } |
| return 1; |
| } |
| |
| void EvictionController::processHeapInfo(int64_t& readInfo, |
| int64_t& pendingEvictions) { |
| m_currentHeapSize += readInfo; |
| |
| // Waiting for evictions to catch up.Negative numbers |
| // are attributed to evictions that were triggered by the |
| // EvictionController |
| int64_t sizeToCompare = 0; |
| if (readInfo < 0 && pendingEvictions > 0) { |
| pendingEvictions += readInfo; |
| if (pendingEvictions < 0) pendingEvictions = 0; |
| return; // as long as you are still evicting, don't do the rest of the work |
| } else { |
| sizeToCompare = m_currentHeapSize - pendingEvictions; |
| } |
| |
| if (sizeToCompare > m_maxHeapSize) { |
| // Check if overflow is above the delta |
| int64_t sizeOverflow = sizeToCompare - m_maxHeapSize; |
| |
| // Calculate the percentage that we are over the limit. |
| int32_t fractionalOverflow = |
| static_cast<int32_t>(((sizeOverflow * 100) % m_maxHeapSize) > 0) ? 1 |
| : 0; |
| int32_t percentage = |
| static_cast<int32_t>((sizeOverflow * 100) / m_maxHeapSize) + |
| fractionalOverflow; |
| // need to evict |
| int32_t evictionPercentage = |
| static_cast<int32_t>(percentage + m_heapSizeDelta); |
| int32_t bytesToEvict = |
| static_cast<int32_t>((sizeToCompare * evictionPercentage) / 100); |
| pendingEvictions += bytesToEvict; |
| if (evictionPercentage > 100) evictionPercentage = 100; |
| orderEvictions(evictionPercentage); |
| } |
| } |
| |
| void EvictionController::registerRegion(std::string& name) { |
| WriteGuard guard(m_regionLock); |
| m_regions.push_back(name); |
| LOGFINE("Registered region with Heap LRU eviction controller: name is %s", |
| name.c_str()); |
| } |
| |
| void EvictionController::deregisterRegion(std::string& name) { |
| // Iterate over regions vector and remove the one that we need to remove |
| WriteGuard guard(m_regionLock); |
| for (size_t i = 0; i < m_regions.size(); i++) { |
| std::string str = m_regions.at(i); |
| if (str == name) { |
| std::vector<std::string>::iterator iter = m_regions.begin(); |
| m_regions.erase(iter + i); |
| LOGFINE( |
| "Deregistered region with Heap LRU eviction controller: name is %s", |
| name.c_str()); |
| break; |
| } |
| } |
| } |
| |
| void EvictionController::orderEvictions(int32_t percentage) { |
| evictionThreadPtr->putEvictionInfo(percentage); |
| } |
| |
| void EvictionController::evict(int32_t percentage) { |
| // TODO: Shouldn't we take the CacheImpl::m_regions |
| // lock here? Otherwise we might invoke eviction on a region |
| // that has been destroyed or is being destroyed. |
| // Its important to not hold this lock for too long |
| // because it prevents new regions from getting created or destroyed |
| // On the flip side, this requires a copy of the registered region list |
| // every time eviction is ordered and that might not be cheap |
| //@TODO: Discuss with team |
| VectorOfString regionTempVector; |
| { |
| ReadGuard guard(m_regionLock); |
| for (size_t i = 0; i < m_regions.size(); i++) { |
| regionTempVector.push_back(m_regions.at(i)); |
| } |
| } |
| |
| for (size_t i = 0; i < regionTempVector.size(); i++) { |
| std::string region_name = regionTempVector.at(i); |
| auto region = m_cacheImpl->getRegion(region_name); |
| if (region != nullptr) { |
| RegionInternal* regionImpl = dynamic_cast<RegionInternal*>(region.get()); |
| if (regionImpl != nullptr) { |
| regionImpl->evict(percentage); |
| } |
| } |
| } |
| } |
| } // namespace client |
| } // namespace geode |
| } // namespace apache |