| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.eviction; |
| |
| import static org.apache.geode.distributed.internal.DistributionConfig.GEMFIRE_PREFIX; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.cache.RegionDestroyedException; |
| import org.apache.geode.distributed.internal.QueueStatHelper; |
| import org.apache.geode.internal.cache.BucketRegion; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.LocalRegion; |
| import org.apache.geode.internal.cache.PartitionedRegion; |
| import org.apache.geode.internal.cache.control.HeapMemoryMonitor; |
| import org.apache.geode.internal.cache.control.InternalResourceManager; |
| import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceType; |
| import org.apache.geode.internal.cache.control.MemoryEvent; |
| import org.apache.geode.internal.cache.control.ResourceListener; |
| import org.apache.geode.internal.logging.CoreLoggingExecutors; |
| import org.apache.geode.internal.statistics.StatisticsClock; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
| * Triggers centralized eviction(asynchronously) when the ResourceManager sends an eviction event |
| * for on-heap regions. This is registered with the ResourceManager. |
| * |
| * @since GemFire 6.0 |
| * |
| */ |
| public class HeapEvictor implements ResourceListener<MemoryEvent> { |
| private static final Logger logger = LogService.getLogger(); |
| |
| // Add 1 for the management task that's putting more eviction tasks on the queue |
| public static final int MAX_EVICTOR_THREADS = |
| Integer.getInteger(GEMFIRE_PREFIX + "HeapLRUCapacityController.MAX_EVICTOR_THREADS", |
| Runtime.getRuntime().availableProcessors() * 4) + 1; |
| |
| public static final boolean EVICT_HIGH_ENTRY_COUNT_BUCKETS_FIRST = |
| Boolean.valueOf(System.getProperty( |
| GEMFIRE_PREFIX + "HeapLRUCapacityController.evictHighEntryCountBucketsFirst", "true")); |
| |
| public static final int MINIMUM_ENTRIES_PER_BUCKET = |
| Integer.getInteger(GEMFIRE_PREFIX + "HeapLRUCapacityController.inlineEvictionThreshold", 0); |
| |
| public static final int BUCKET_SORTING_INTERVAL = Integer.getInteger( |
| GEMFIRE_PREFIX + "HeapLRUCapacityController.higherEntryCountBucketCalculationInterval", 100); |
| |
| private static final boolean DISABLE_HEAP_EVICTOR_THREAD_POOL = Boolean |
| .getBoolean(GEMFIRE_PREFIX + "HeapLRUCapacityController.DISABLE_HEAP_EVICTOR_THREAD_POOL"); |
| |
| private static final long TOTAL_BYTES_TO_EVICT_FROM_HEAP = setTotalBytesToEvictFromHeap(); |
| |
| private static final String EVICTOR_THREAD_NAME = "EvictorThread"; |
| |
| private final Object evictionLock = new Object(); |
| |
| private final AtomicBoolean mustEvict = new AtomicBoolean(false); |
| |
| private final List<Integer> testTaskSetSizes = new ArrayList<>(); |
| |
| private final ExecutorService evictorThreadPool; |
| |
| private final InternalCache cache; |
| |
| private final AtomicBoolean isRunning = new AtomicBoolean(true); |
| |
| private final StatisticsClock statisticsClock; |
| |
| private volatile int testAbortAfterLoopCount = Integer.MAX_VALUE; |
| |
| /* |
| * Since the amount of memory used is to a large degree dependent upon when garbage collection is |
| * run, it's difficult to determine when to stop evicting. So, an initial calculation is done to |
| * determine the number of evictions that are likely needed in order to bring memory usage below |
| * the eviction threshold. This number is stored in 'numFastLoops' and we quickly loop through |
| * this number performing evictions. We then continue to evict, but at a progressively slower rate |
| * waiting either for an event which indicates we've dropped below the eviction threshold or |
| * another eviction event with an updated "number of bytes used". If we get another eviction event |
| * with an updated "number of bytes used" then 'numFastLoops' is recalculated and we start over. |
| */ |
| private volatile int numEvictionLoopsCompleted = 0; |
| private volatile int numFastLoops; |
| |
| public HeapEvictor(final InternalCache cache, StatisticsClock statisticsClock) { |
| this(cache, EVICTOR_THREAD_NAME, statisticsClock); |
| } |
| |
| public HeapEvictor(final InternalCache cache, final String threadName, |
| StatisticsClock statisticsClock) { |
| this.cache = cache; |
| |
| if (!DISABLE_HEAP_EVICTOR_THREAD_POOL) { |
| QueueStatHelper poolStats = this.cache.getCachePerfStats().getEvictionQueueStatHelper(); |
| this.evictorThreadPool = CoreLoggingExecutors.newFixedThreadPoolWithTimeout(threadName, |
| MAX_EVICTOR_THREADS, 15, poolStats); |
| } else { |
| // disabled |
| this.evictorThreadPool = null; |
| } |
| |
| this.statisticsClock = statisticsClock; |
| } |
| |
| protected InternalCache cache() { |
| return this.cache; |
| } |
| |
| protected boolean includePartitionedRegion(PartitionedRegion region) { |
| return region.getEvictionAttributes().getAlgorithm().isLRUHeap() |
| && region.getDataStore() != null && !region.getAttributes().getOffHeap(); |
| } |
| |
| protected boolean includeLocalRegion(LocalRegion region) { |
| return region.getEvictionAttributes().getAlgorithm().isLRUHeap() |
| && !region.getAttributes().getOffHeap(); |
| } |
| |
| private List<LocalRegion> getAllRegionList() { |
| List<LocalRegion> allRegionsList = new ArrayList<>(); |
| InternalResourceManager resourceManager = (InternalResourceManager) cache.getResourceManager(); |
| |
| for (ResourceListener<MemoryEvent> listener : resourceManager |
| .getResourceListeners(getResourceType())) { |
| if (listener instanceof PartitionedRegion) { |
| PartitionedRegion partitionedRegion = (PartitionedRegion) listener; |
| if (includePartitionedRegion(partitionedRegion)) { |
| allRegionsList.addAll(partitionedRegion.getDataStore().getAllLocalBucketRegions()); |
| } |
| } else if (listener instanceof LocalRegion) { |
| LocalRegion lr = (LocalRegion) listener; |
| if (includeLocalRegion(lr)) { |
| allRegionsList.add(lr); |
| } |
| } |
| } |
| |
| if (HeapEvictor.MINIMUM_ENTRIES_PER_BUCKET > 0) { |
| Iterator<LocalRegion> iterator = allRegionsList.iterator(); |
| while (iterator.hasNext()) { |
| LocalRegion region = iterator.next(); |
| if (region instanceof BucketRegion) { |
| if (((BucketRegion) region) |
| .getNumEntriesInVM() <= HeapEvictor.MINIMUM_ENTRIES_PER_BUCKET) { |
| iterator.remove(); |
| } |
| } |
| } |
| } |
| return allRegionsList; |
| } |
| |
| private List<LocalRegion> getAllSortedRegionList() { |
| List<LocalRegion> allRegionList = getAllRegionList(); |
| |
| // Capture the sizes so that they do not change while sorting |
| final Object2LongOpenHashMap<LocalRegion> sizes = |
| new Object2LongOpenHashMap<>(allRegionList.size()); |
| for (LocalRegion region : allRegionList) { |
| long size = region instanceof BucketRegion ? ((BucketRegion) region).getSizeForEviction() |
| : region.size(); |
| sizes.put(region, size); |
| } |
| |
| // Sort with respect to other PR buckets also in case of multiple PRs |
| allRegionList.sort((region1, region2) -> { |
| long numEntries1 = sizes.get(region1); |
| long numEntries2 = sizes.get(region2); |
| if (numEntries1 > numEntries2) { |
| return -1; |
| } else if (numEntries1 < numEntries2) { |
| return 1; |
| } |
| return 0; |
| }); |
| return allRegionList; |
| } |
| |
| private void executeInThreadPool(Runnable task) { |
| try { |
| evictorThreadPool.execute(task); |
| } catch (RejectedExecutionException e) { |
| // ignore rejection if evictor no longer running |
| if (isRunning()) { |
| throw e; |
| } |
| } |
| } |
| |
| public ExecutorService getEvictorThreadPool() { |
| if (isRunning()) { |
| return evictorThreadPool; |
| } |
| return null; |
| } |
| |
| private void createAndSubmitWeightedRegionEvictionTasks() { |
| List<LocalRegion> allRegionList = getAllSortedRegionList(); |
| float numEntriesInVM = 0; |
| for (LocalRegion region : allRegionList) { |
| if (region instanceof BucketRegion) { |
| numEntriesInVM += ((BucketRegion) region).getSizeForEviction(); |
| } else { |
| numEntriesInVM += region.getRegionMap().sizeInVM(); |
| } |
| } |
| |
| for (LocalRegion region : allRegionList) { |
| float regionEntryCount; |
| if (region instanceof BucketRegion) { |
| regionEntryCount = ((BucketRegion) region).getSizeForEviction(); |
| } else { |
| regionEntryCount = region.getRegionMap().sizeInVM(); |
| } |
| |
| float percentage = regionEntryCount / numEntriesInVM; |
| long bytesToEvictPerTask = (long) (getTotalBytesToEvict() * percentage); |
| List<LocalRegion> regionsForSingleTask = new ArrayList<>(1); |
| regionsForSingleTask.add(region); |
| if (mustEvict()) { |
| executeInThreadPool(new RegionEvictorTask(cache.getCachePerfStats(), regionsForSingleTask, |
| this, bytesToEvictPerTask, statisticsClock)); |
| } else { |
| break; |
| } |
| } |
| } |
| |
| private Set<RegionEvictorTask> createRegionEvictionTasks() { |
| if (getEvictorThreadPool() == null) { |
| return Collections.emptySet(); |
| } |
| |
| int threadsAvailable = MAX_EVICTOR_THREADS; |
| long bytesToEvictPerTask = getTotalBytesToEvict() / threadsAvailable; |
| List<LocalRegion> allRegionList = getAllRegionList(); |
| if (allRegionList.isEmpty()) { |
| return Collections.emptySet(); |
| } |
| |
| // This shuffling is not required when eviction triggered for the first time |
| Collections.shuffle(allRegionList); |
| int allRegionSetSize = allRegionList.size(); |
| Set<RegionEvictorTask> evictorTaskSet = new HashSet<>(); |
| if (allRegionSetSize <= threadsAvailable) { |
| for (LocalRegion region : allRegionList) { |
| List<LocalRegion> regionList = new ArrayList<>(1); |
| regionList.add(region); |
| RegionEvictorTask task = |
| new RegionEvictorTask(cache.getCachePerfStats(), regionList, this, bytesToEvictPerTask, |
| statisticsClock); |
| evictorTaskSet.add(task); |
| } |
| for (RegionEvictorTask regionEvictorTask : evictorTaskSet) { |
| testTaskSetSizes.add(regionEvictorTask.getRegionList().size()); |
| } |
| return evictorTaskSet; |
| } |
| |
| int numRegionsInTask = allRegionSetSize / threadsAvailable; |
| List<LocalRegion> regionsForSingleTask = null; |
| Iterator<LocalRegion> regionIterator = allRegionList.iterator(); |
| for (int i = 0; i < threadsAvailable; i++) { |
| regionsForSingleTask = new ArrayList<>(numRegionsInTask); |
| int count = 1; |
| while (count <= numRegionsInTask) { |
| if (regionIterator.hasNext()) { |
| regionsForSingleTask.add(regionIterator.next()); |
| } |
| count++; |
| } |
| evictorTaskSet.add(new RegionEvictorTask(cache.getCachePerfStats(), regionsForSingleTask, |
| this, bytesToEvictPerTask, statisticsClock)); |
| } |
| |
| // Add leftover regions to last task |
| while (regionIterator.hasNext()) { |
| regionsForSingleTask.add(regionIterator.next()); |
| } |
| |
| for (RegionEvictorTask regionEvictorTask : evictorTaskSet) { |
| testTaskSetSizes.add(regionEvictorTask.getRegionList().size()); |
| } |
| return evictorTaskSet; |
| } |
| |
| @Override |
| public void onEvent(final MemoryEvent event) { |
| if (DISABLE_HEAP_EVICTOR_THREAD_POOL) { |
| return; |
| } |
| |
| // Do we care about eviction events and did the eviction event originate |
| // in this VM ... |
| if (isRunning() && event.isLocal()) { |
| if (event.getState().isEviction()) { |
| // Have we previously received an eviction event and already started eviction ... |
| if (this.mustEvict.get()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Updating eviction in response to memory event: {}", event); |
| } |
| |
| // We lock here to make sure that the thread that was previously |
| // started and running eviction loops is in a state where it's okay |
| // to update the number of fast loops to perform. |
| synchronized (evictionLock) { |
| numEvictionLoopsCompleted = 0; |
| numFastLoops = (int) ((event.getBytesUsed() |
| - event.getThresholds().getEvictionThresholdClearBytes() + getTotalBytesToEvict()) |
| / getTotalBytesToEvict()); |
| evictionLock.notifyAll(); |
| } |
| |
| // We've updated the number of fast loops to perform, and there's |
| // already a thread running the evictions, so we're done. |
| return; |
| } |
| |
| if (!this.mustEvict.compareAndSet(false, true)) { |
| // Another thread just started evicting. |
| return; |
| } |
| |
| numEvictionLoopsCompleted = 0; |
| numFastLoops = |
| (int) ((event.getBytesUsed() - event.getThresholds().getEvictionThresholdClearBytes() |
| + getTotalBytesToEvict()) / getTotalBytesToEvict()); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Starting eviction in response to memory event: {}", event); |
| } |
| |
| // The new thread which will run in a loop performing evictions |
| final Runnable evictionManagerTask = new Runnable() { |
| @Override |
| public void run() { |
| // Has the test hook been set which will cause eviction to abort early |
| if (numEvictionLoopsCompleted < getTestAbortAfterLoopCount()) { |
| try { |
| // Submit tasks into the queue to do the evictions |
| if (EVICT_HIGH_ENTRY_COUNT_BUCKETS_FIRST) { |
| createAndSubmitWeightedRegionEvictionTasks(); |
| } else { |
| for (RegionEvictorTask task : createRegionEvictionTasks()) { |
| executeInThreadPool(task); |
| } |
| } |
| |
| // Make sure that another thread isn't processing a new eviction event |
| // and changing the number of fast loops to perform. |
| synchronized (evictionLock) { |
| int delayTime = getEvictionLoopDelayTime(); |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Eviction loop delay time calculated to be {} milliseconds. Fast Loops={}, Loop #={}", |
| delayTime, numFastLoops, numEvictionLoopsCompleted + 1); |
| } |
| numEvictionLoopsCompleted++; |
| try { |
| // Wait and release the lock so that the number of fast loops |
| // needed can be updated by another thread processing a new |
| // eviction event. |
| evictionLock.wait(delayTime); |
| } catch (InterruptedException ignored) { |
| // Loop and try again |
| } |
| } |
| |
| // Do we think we're still above the eviction threshold ... |
| if (HeapEvictor.this.mustEvict.get()) { |
| // Submit this runnable back into the thread pool and execute |
| // another pass at eviction. |
| executeInThreadPool(this); |
| } |
| } catch (RegionDestroyedException ignored) { |
| // A region destroyed exception might be thrown for Region.size() when a bucket |
| // moves due to rebalancing. retry submitting the eviction task without |
| // logging an error message. fixes bug 48162 |
| if (HeapEvictor.this.mustEvict.get()) { |
| executeInThreadPool(this); |
| } |
| } |
| } |
| } |
| }; |
| |
| // Submit the first pass at eviction into the pool |
| executeInThreadPool(evictionManagerTask); |
| |
| } else { |
| this.mustEvict.set(false); |
| } |
| } |
| } |
| |
| protected int getEvictionLoopDelayTime() { |
| int delayTime = 850; // The waiting period when running fast loops |
| if (numEvictionLoopsCompleted - numFastLoops > 2) { |
| delayTime = 3000; // Way below the threshold |
| } else if (numEvictionLoopsCompleted >= numFastLoops) { |
| delayTime = (numEvictionLoopsCompleted - numFastLoops + 3) * 500; // Just below the threshold |
| } |
| |
| return delayTime; |
| } |
| |
| boolean mustEvict() { |
| return this.mustEvict.get(); |
| } |
| |
| public void close() { |
| if (isRunning.compareAndSet(true, false)) { |
| evictorThreadPool.shutdownNow(); |
| } |
| } |
| |
| private boolean isRunning() { |
| return isRunning.get(); |
| } |
| |
| public List<Integer> testOnlyGetSizeOfTasks() { |
| if (isRunning()) { |
| return testTaskSetSizes; |
| } |
| return null; |
| } |
| |
| public long getTotalBytesToEvict() { |
| return TOTAL_BYTES_TO_EVICT_FROM_HEAP; |
| } |
| |
| protected ResourceType getResourceType() { |
| return ResourceType.HEAP_MEMORY; |
| } |
| |
| private int getTestAbortAfterLoopCount() { |
| return testAbortAfterLoopCount; |
| } |
| |
| public void setTestAbortAfterLoopCount(int testAbortAfterLoopCount) { |
| this.testAbortAfterLoopCount = testAbortAfterLoopCount; |
| } |
| |
| int numEvictionLoopsCompleted() { |
| return this.numEvictionLoopsCompleted; |
| } |
| |
| int numFastLoops() { |
| return this.numFastLoops; |
| } |
| |
| private static long setTotalBytesToEvictFromHeap() { |
| float evictionBurstPercentage = Float.parseFloat(System |
| .getProperty(GEMFIRE_PREFIX + "HeapLRUCapacityController.evictionBurstPercentage", "0.4")); |
| long maxTenuredBytes = HeapMemoryMonitor.getTenuredPoolMaxMemory(); |
| return (long) (maxTenuredBytes * 0.01 * evictionBurstPercentage); |
| } |
| } |