/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.eviction;
import static org.apache.geode.distributed.internal.DistributionConfig.GEMFIRE_PREFIX;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.logging.log4j.Logger;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.distributed.internal.QueueStatHelper;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.control.HeapMemoryMonitor;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType;
import org.apache.geode.internal.cache.control.MemoryEvent;
import org.apache.geode.internal.cache.control.ResourceListener;
import org.apache.geode.internal.logging.CoreLoggingExecutors;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
 * Triggers centralized eviction (asynchronously) when the ResourceManager sends an eviction event
 * for on-heap regions. This evictor is registered with the ResourceManager.
 *
 * @since GemFire 6.0
 */
public class HeapEvictor implements ResourceListener<MemoryEvent> {
private static final Logger logger = LogService.getLogger();
// Add 1 for the management task that's putting more eviction tasks on the queue
public static final int MAX_EVICTOR_THREADS =
Integer.getInteger(GEMFIRE_PREFIX + "HeapLRUCapacityController.MAX_EVICTOR_THREADS",
Runtime.getRuntime().availableProcessors() * 4) + 1;
  public static final boolean EVICT_HIGH_ENTRY_COUNT_BUCKETS_FIRST =
      Boolean.parseBoolean(System.getProperty(
          GEMFIRE_PREFIX + "HeapLRUCapacityController.evictHighEntryCountBucketsFirst", "true"));
public static final int MINIMUM_ENTRIES_PER_BUCKET =
Integer.getInteger(GEMFIRE_PREFIX + "HeapLRUCapacityController.inlineEvictionThreshold", 0);
public static final int BUCKET_SORTING_INTERVAL = Integer.getInteger(
GEMFIRE_PREFIX + "HeapLRUCapacityController.higherEntryCountBucketCalculationInterval", 100);
private static final boolean DISABLE_HEAP_EVICTOR_THREAD_POOL = Boolean
.getBoolean(GEMFIRE_PREFIX + "HeapLRUCapacityController.DISABLE_HEAP_EVICTOR_THREAD_POOL");
private static final long TOTAL_BYTES_TO_EVICT_FROM_HEAP = setTotalBytesToEvictFromHeap();
private static final String EVICTOR_THREAD_NAME = "EvictorThread";
private final Object evictionLock = new Object();
private final AtomicBoolean mustEvict = new AtomicBoolean(false);
private final List<Integer> testTaskSetSizes = new ArrayList<>();
private final ExecutorService evictorThreadPool;
private final InternalCache cache;
private final AtomicBoolean isRunning = new AtomicBoolean(true);
private final StatisticsClock statisticsClock;
private volatile int testAbortAfterLoopCount = Integer.MAX_VALUE;
/*
* Since the amount of memory used is to a large degree dependent upon when garbage collection is
* run, it's difficult to determine when to stop evicting. So, an initial calculation is done to
* determine the number of evictions that are likely needed in order to bring memory usage below
* the eviction threshold. This number is stored in 'numFastLoops' and we quickly loop through
* this number performing evictions. We then continue to evict, but at a progressively slower rate
* waiting either for an event which indicates we've dropped below the eviction threshold or
* another eviction event with an updated "number of bytes used". If we get another eviction event
* with an updated "number of bytes used" then 'numFastLoops' is recalculated and we start over.
*/
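  // Example with illustrative numbers: if the eviction-clear threshold is 1,000 MB, bytesUsed is
  // 1,250 MB, and the total bytes to evict per pass is 100 MB, then numFastLoops is
  // (1250 - 1000 + 100) / 100 = 3, so three passes run at the short loop delay before the delay
  // starts to grow (see getEvictionLoopDelayTime).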
private volatile int numEvictionLoopsCompleted = 0;
private volatile int numFastLoops;
public HeapEvictor(final InternalCache cache, StatisticsClock statisticsClock) {
this(cache, EVICTOR_THREAD_NAME, statisticsClock);
}
public HeapEvictor(final InternalCache cache, final String threadName,
StatisticsClock statisticsClock) {
this.cache = cache;
if (!DISABLE_HEAP_EVICTOR_THREAD_POOL) {
QueueStatHelper poolStats = this.cache.getCachePerfStats().getEvictionQueueStatHelper();
this.evictorThreadPool = CoreLoggingExecutors.newFixedThreadPoolWithTimeout(threadName,
MAX_EVICTOR_THREADS, 15, poolStats);
} else {
// disabled
this.evictorThreadPool = null;
}
this.statisticsClock = statisticsClock;
}
protected InternalCache cache() {
return this.cache;
}
protected boolean includePartitionedRegion(PartitionedRegion region) {
return region.getEvictionAttributes().getAlgorithm().isLRUHeap()
&& region.getDataStore() != null && !region.getAttributes().getOffHeap();
}
protected boolean includeLocalRegion(LocalRegion region) {
return region.getEvictionAttributes().getAlgorithm().isLRUHeap()
&& !region.getAttributes().getOffHeap();
}
private List<LocalRegion> getAllRegionList() {
List<LocalRegion> allRegionsList = new ArrayList<>();
InternalResourceManager resourceManager = (InternalResourceManager) cache.getResourceManager();
for (ResourceListener<MemoryEvent> listener : resourceManager
.getResourceListeners(getResourceType())) {
if (listener instanceof PartitionedRegion) {
PartitionedRegion partitionedRegion = (PartitionedRegion) listener;
if (includePartitionedRegion(partitionedRegion)) {
allRegionsList.addAll(partitionedRegion.getDataStore().getAllLocalBucketRegions());
}
} else if (listener instanceof LocalRegion) {
LocalRegion lr = (LocalRegion) listener;
if (includeLocalRegion(lr)) {
allRegionsList.add(lr);
}
}
}
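    // When MINIMUM_ENTRIES_PER_BUCKET (the inlineEvictionThreshold system property) is set,
    // buckets whose in-VM entry count is at or below it are skipped by this evictor.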
if (HeapEvictor.MINIMUM_ENTRIES_PER_BUCKET > 0) {
Iterator<LocalRegion> iterator = allRegionsList.iterator();
while (iterator.hasNext()) {
LocalRegion region = iterator.next();
if (region instanceof BucketRegion) {
if (((BucketRegion) region)
.getNumEntriesInVM() <= HeapEvictor.MINIMUM_ENTRIES_PER_BUCKET) {
iterator.remove();
}
}
}
}
return allRegionsList;
}
private List<LocalRegion> getAllSortedRegionList() {
List<LocalRegion> allRegionList = getAllRegionList();
// Capture the sizes so that they do not change while sorting
final Object2LongOpenHashMap<LocalRegion> sizes =
new Object2LongOpenHashMap<>(allRegionList.size());
for (LocalRegion region : allRegionList) {
long size = region instanceof BucketRegion ? ((BucketRegion) region).getSizeForEviction()
: region.size();
sizes.put(region, size);
}
    // Sort in descending size order; buckets from different PRs (when there are multiple) are
    // ordered together
allRegionList.sort((region1, region2) -> {
long numEntries1 = sizes.get(region1);
long numEntries2 = sizes.get(region2);
if (numEntries1 > numEntries2) {
return -1;
} else if (numEntries1 < numEntries2) {
return 1;
}
return 0;
});
return allRegionList;
}
private void executeInThreadPool(Runnable task) {
try {
evictorThreadPool.execute(task);
} catch (RejectedExecutionException e) {
// ignore rejection if evictor no longer running
if (isRunning()) {
throw e;
}
}
}
public ExecutorService getEvictorThreadPool() {
if (isRunning()) {
return evictorThreadPool;
}
return null;
}
private void createAndSubmitWeightedRegionEvictionTasks() {
List<LocalRegion> allRegionList = getAllSortedRegionList();
float numEntriesInVM = 0;
for (LocalRegion region : allRegionList) {
if (region instanceof BucketRegion) {
numEntriesInVM += ((BucketRegion) region).getSizeForEviction();
} else {
numEntriesInVM += region.getRegionMap().sizeInVM();
}
}
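    // Each region is asked to evict a share of the total bytes proportional to its share of the
    // entry count summed above, e.g. a bucket holding 25% of the counted entries is assigned
    // 25% of getTotalBytesToEvict().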
for (LocalRegion region : allRegionList) {
float regionEntryCount;
if (region instanceof BucketRegion) {
regionEntryCount = ((BucketRegion) region).getSizeForEviction();
} else {
regionEntryCount = region.getRegionMap().sizeInVM();
}
float percentage = regionEntryCount / numEntriesInVM;
long bytesToEvictPerTask = (long) (getTotalBytesToEvict() * percentage);
List<LocalRegion> regionsForSingleTask = new ArrayList<>(1);
regionsForSingleTask.add(region);
if (mustEvict()) {
executeInThreadPool(new RegionEvictorTask(cache.getCachePerfStats(), regionsForSingleTask,
this, bytesToEvictPerTask, statisticsClock));
} else {
break;
}
}
}
private Set<RegionEvictorTask> createRegionEvictionTasks() {
if (getEvictorThreadPool() == null) {
return Collections.emptySet();
}
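    // Every task targets an equal share of the total bytes to evict, spread across at most
    // MAX_EVICTOR_THREADS tasks.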
int threadsAvailable = MAX_EVICTOR_THREADS;
long bytesToEvictPerTask = getTotalBytesToEvict() / threadsAvailable;
List<LocalRegion> allRegionList = getAllRegionList();
if (allRegionList.isEmpty()) {
return Collections.emptySet();
}
    // This shuffling is not required when eviction is triggered for the first time
Collections.shuffle(allRegionList);
int allRegionSetSize = allRegionList.size();
Set<RegionEvictorTask> evictorTaskSet = new HashSet<>();
if (allRegionSetSize <= threadsAvailable) {
for (LocalRegion region : allRegionList) {
List<LocalRegion> regionList = new ArrayList<>(1);
regionList.add(region);
RegionEvictorTask task =
new RegionEvictorTask(cache.getCachePerfStats(), regionList, this, bytesToEvictPerTask,
statisticsClock);
evictorTaskSet.add(task);
}
for (RegionEvictorTask regionEvictorTask : evictorTaskSet) {
testTaskSetSizes.add(regionEvictorTask.getRegionList().size());
}
return evictorTaskSet;
}
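    // More regions than available threads: give each task an equal slice of the shuffled region
    // list; any remainder is appended to the last task built below.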
int numRegionsInTask = allRegionSetSize / threadsAvailable;
List<LocalRegion> regionsForSingleTask = null;
Iterator<LocalRegion> regionIterator = allRegionList.iterator();
for (int i = 0; i < threadsAvailable; i++) {
regionsForSingleTask = new ArrayList<>(numRegionsInTask);
int count = 1;
while (count <= numRegionsInTask) {
if (regionIterator.hasNext()) {
regionsForSingleTask.add(regionIterator.next());
}
count++;
}
evictorTaskSet.add(new RegionEvictorTask(cache.getCachePerfStats(), regionsForSingleTask,
this, bytesToEvictPerTask, statisticsClock));
}
// Add leftover regions to last task
while (regionIterator.hasNext()) {
regionsForSingleTask.add(regionIterator.next());
}
for (RegionEvictorTask regionEvictorTask : evictorTaskSet) {
testTaskSetSizes.add(regionEvictorTask.getRegionList().size());
}
return evictorTaskSet;
}
@Override
public void onEvent(final MemoryEvent event) {
if (DISABLE_HEAP_EVICTOR_THREAD_POOL) {
return;
}
// Do we care about eviction events and did the eviction event originate
// in this VM ...
if (isRunning() && event.isLocal()) {
if (event.getState().isEviction()) {
// Have we previously received an eviction event and already started eviction ...
if (this.mustEvict.get()) {
if (logger.isDebugEnabled()) {
logger.debug("Updating eviction in response to memory event: {}", event);
}
// We lock here to make sure that the thread that was previously
// started and running eviction loops is in a state where it's okay
// to update the number of fast loops to perform.
synchronized (evictionLock) {
numEvictionLoopsCompleted = 0;
numFastLoops = (int) ((event.getBytesUsed()
- event.getThresholds().getEvictionThresholdClearBytes() + getTotalBytesToEvict())
/ getTotalBytesToEvict());
evictionLock.notifyAll();
}
// We've updated the number of fast loops to perform, and there's
// already a thread running the evictions, so we're done.
return;
}
if (!this.mustEvict.compareAndSet(false, true)) {
// Another thread just started evicting.
return;
}
numEvictionLoopsCompleted = 0;
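        // Estimate how many fast (back-to-back) eviction passes should bring usage back below
        // the eviction-clear threshold; see the comment above the numFastLoops field.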
numFastLoops =
(int) ((event.getBytesUsed() - event.getThresholds().getEvictionThresholdClearBytes()
+ getTotalBytesToEvict()) / getTotalBytesToEvict());
if (logger.isDebugEnabled()) {
logger.debug("Starting eviction in response to memory event: {}", event);
}
// The new thread which will run in a loop performing evictions
final Runnable evictionManagerTask = new Runnable() {
@Override
public void run() {
// Has the test hook been set which will cause eviction to abort early
if (numEvictionLoopsCompleted < getTestAbortAfterLoopCount()) {
try {
// Submit tasks into the queue to do the evictions
if (EVICT_HIGH_ENTRY_COUNT_BUCKETS_FIRST) {
createAndSubmitWeightedRegionEvictionTasks();
} else {
for (RegionEvictorTask task : createRegionEvictionTasks()) {
executeInThreadPool(task);
}
}
// Make sure that another thread isn't processing a new eviction event
// and changing the number of fast loops to perform.
synchronized (evictionLock) {
int delayTime = getEvictionLoopDelayTime();
if (logger.isDebugEnabled()) {
logger.debug(
"Eviction loop delay time calculated to be {} milliseconds. Fast Loops={}, Loop #={}",
delayTime, numFastLoops, numEvictionLoopsCompleted + 1);
}
numEvictionLoopsCompleted++;
try {
// Wait and release the lock so that the number of fast loops
// needed can be updated by another thread processing a new
// eviction event.
evictionLock.wait(delayTime);
} catch (InterruptedException ignored) {
// Loop and try again
}
}
// Do we think we're still above the eviction threshold ...
if (HeapEvictor.this.mustEvict.get()) {
// Submit this runnable back into the thread pool and execute
// another pass at eviction.
executeInThreadPool(this);
}
} catch (RegionDestroyedException ignored) {
                // A RegionDestroyedException might be thrown by Region.size() when a bucket
                // moves due to rebalancing. Retry submitting the eviction task without
                // logging an error message (fixes bug 48162).
if (HeapEvictor.this.mustEvict.get()) {
executeInThreadPool(this);
}
}
}
}
};
// Submit the first pass at eviction into the pool
executeInThreadPool(evictionManagerTask);
} else {
this.mustEvict.set(false);
}
}
}
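  // Wait between eviction passes: 850 ms while the estimated fast loops are still running, then
  // 1500/2000/2500 ms as the completed-loop count reaches and passes the estimate, and 3000 ms
  // once it is more than two loops past it.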
protected int getEvictionLoopDelayTime() {
int delayTime = 850; // The waiting period when running fast loops
if (numEvictionLoopsCompleted - numFastLoops > 2) {
delayTime = 3000; // Way below the threshold
} else if (numEvictionLoopsCompleted >= numFastLoops) {
delayTime = (numEvictionLoopsCompleted - numFastLoops + 3) * 500; // Just below the threshold
}
return delayTime;
}
boolean mustEvict() {
return this.mustEvict.get();
}
public void close() {
if (isRunning.compareAndSet(true, false)) {
evictorThreadPool.shutdownNow();
}
}
private boolean isRunning() {
return isRunning.get();
}
public List<Integer> testOnlyGetSizeOfTasks() {
if (isRunning()) {
return testTaskSetSizes;
}
return null;
}
public long getTotalBytesToEvict() {
return TOTAL_BYTES_TO_EVICT_FROM_HEAP;
}
protected ResourceType getResourceType() {
return ResourceType.HEAP_MEMORY;
}
private int getTestAbortAfterLoopCount() {
return testAbortAfterLoopCount;
}
public void setTestAbortAfterLoopCount(int testAbortAfterLoopCount) {
this.testAbortAfterLoopCount = testAbortAfterLoopCount;
}
int numEvictionLoopsCompleted() {
return this.numEvictionLoopsCompleted;
}
int numFastLoops() {
return this.numFastLoops;
}
private static long setTotalBytesToEvictFromHeap() {
float evictionBurstPercentage = Float.parseFloat(System
.getProperty(GEMFIRE_PREFIX + "HeapLRUCapacityController.evictionBurstPercentage", "0.4"));
long maxTenuredBytes = HeapMemoryMonitor.getTenuredPoolMaxMemory();
return (long) (maxTenuredBytes * 0.01 * evictionBurstPercentage);
}
}