blob: 10b4a1c84f6c0022abfcc464c6ba5ad92361dc73 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "EvictionController.hpp"
#include <chrono>
#include <boost/thread/lock_types.hpp>
#include "CacheImpl.hpp"
#include "CacheRegionHelper.hpp"
#include "DistributedSystem.hpp"
#include "ReadWriteLock.hpp"
#include "RegionInternal.hpp"
#include "util/Log.hpp"
namespace apache {
namespace geode {
namespace client {
const char* EvictionController::NC_EC_Thread = "NC EC Thread";
EvictionController::EvictionController(size_t maxHeapSize,
int32_t heapSizeDelta, CacheImpl* cache)
: m_run(false),
m_maxHeapSize(maxHeapSize * 1024 * 1024),
m_heapSizeDelta(heapSizeDelta),
m_cacheImpl(cache),
m_currentHeapSize(0),
m_evictionThread(this) {
LOGINFO("Maximum heap size for Heap LRU set to %ld bytes", m_maxHeapSize);
}
void EvictionController::start() {
m_evictionThread.start();
m_run = true;
m_thread = std::thread(&EvictionController::svc, this);
LOGFINE("Eviction Controller started");
}
void EvictionController::stop() {
m_run = false;
m_queueCondition.notify_one();
m_thread.join();
m_evictionThread.stop();
m_regions.clear();
m_queue.clear();
LOGFINE("Eviction controller stopped");
}
void EvictionController::svc() {
DistributedSystemImpl::setThreadName(NC_EC_Thread);
int64_t pendingEvictions = 0;
while (m_run) {
std::unique_lock<std::mutex> lock(m_queueMutex);
m_queueCondition.wait(lock, [this] { return !m_run || !m_queue.empty(); });
while (!m_queue.empty()) {
auto readInfo = m_queue.front();
m_queue.pop_front();
if (0 != readInfo) {
processHeapInfo(readInfo, pendingEvictions);
}
}
}
}
void EvictionController::updateRegionHeapInfo(int64_t info) {
std::unique_lock<std::mutex> lock(m_queueMutex);
m_queue.push_back(info);
m_queueCondition.notify_one();
// We could block here if we wanted to prevent any further memory use
// until the evictions had been completed.
}
void EvictionController::processHeapInfo(int64_t& readInfo,
int64_t& pendingEvictions) {
m_currentHeapSize += readInfo;
// Waiting for evictions to catch up.Negative numbers
// are attributed to evictions that were triggered by the
// EvictionController
int64_t sizeToCompare = 0;
if (readInfo < 0 && pendingEvictions > 0) {
pendingEvictions += readInfo;
if (pendingEvictions < 0) pendingEvictions = 0;
return; // as long as you are still evicting, don't do the rest of the work
} else {
sizeToCompare = m_currentHeapSize - pendingEvictions;
}
if (sizeToCompare > m_maxHeapSize) {
// Check if overflow is above the delta
int64_t sizeOverflow = sizeToCompare - m_maxHeapSize;
// Calculate the percentage that we are over the limit.
int32_t fractionalOverflow =
static_cast<int32_t>(((sizeOverflow * 100) % m_maxHeapSize) > 0) ? 1
: 0;
int32_t percentage =
static_cast<int32_t>((sizeOverflow * 100) / m_maxHeapSize) +
fractionalOverflow;
// need to evict
int32_t evictionPercentage =
static_cast<int32_t>(percentage + m_heapSizeDelta);
int32_t bytesToEvict =
static_cast<int32_t>((sizeToCompare * evictionPercentage) / 100);
pendingEvictions += bytesToEvict;
if (evictionPercentage > 100) evictionPercentage = 100;
orderEvictions(evictionPercentage);
}
}
void EvictionController::registerRegion(const std::string& name) {
boost::unique_lock<decltype(m_regionLock)> lock(m_regionLock);
m_regions.push_back(name);
LOGFINE("Registered region with Heap LRU eviction controller: name is " +
name);
}
void EvictionController::deregisterRegion(const std::string& name) {
// Iterate over regions vector and remove the one that we need to remove
boost::unique_lock<decltype(m_regionLock)> lock(m_regionLock);
const auto& removed =
std::remove_if(m_regions.begin(), m_regions.end(),
[&](const std::string& region) { return region == name; });
if (removed != m_regions.cend()) {
LOGFINE("Deregistered region with Heap LRU eviction controller: name is " +
name);
}
m_regions.erase(removed, m_regions.cend());
}
void EvictionController::orderEvictions(int32_t percentage) {
m_evictionThread.putEvictionInfo(percentage);
}
void EvictionController::evict(int32_t percentage) {
// TODO: Shouldn't we take the CacheImpl::m_regions
// lock here? Otherwise we might invoke eviction on a region
// that has been destroyed or is being destroyed.
// Its important to not hold this lock for too long
// because it prevents new regions from getting created or destroyed
// On the flip side, this requires a copy of the registered region list
// every time eviction is ordered and that might not be cheap
//@TODO: Discuss with team
decltype(m_regions) regionTempVector;
{
boost::shared_lock<decltype(m_regionLock)> lock(m_regionLock);
regionTempVector.reserve(m_regions.size());
regionTempVector.insert(regionTempVector.end(), m_regions.begin(),
m_regions.end());
}
for (const auto& regionName : regionTempVector) {
if (auto region = std::dynamic_pointer_cast<RegionInternal>(
m_cacheImpl->getRegion(regionName))) {
region->evict(percentage);
}
}
}
} // namespace client
} // namespace geode
} // namespace apache