/*=========================================================================
 * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
 * This product is protected by U.S. and international copyright
 * and intellectual property laws. Pivotal products are covered by
 * one or more patents listed at http://www.pivotal.io/patents.
 *=========================================================================
 */
package com.gemstone.gemfire.internal.cache.lru;

import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.logging.log4j.Logger;

import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.distributed.internal.OverflowQueueWithDMStats;
import com.gemstone.gemfire.internal.cache.BucketRegion;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.RegionEvictorTask;
import com.gemstone.gemfire.internal.cache.control.HeapMemoryMonitor;
import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType;
import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
import com.gemstone.gemfire.internal.cache.control.ResourceListener;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;

/**
 * Triggers centralized eviction (asynchronously) when the ResourceManager
 * sends an eviction event for on-heap regions. An instance of this class is
 * registered with the ResourceManager as a resource listener.
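 *
 * <p>Minimal registration sketch (illustrative only; it assumes the
 * {@code addResourceListener} method on {@code InternalResourceManager},
 * as used elsewhere in this package):
 * <pre>
 *   InternalResourceManager irm =
 *       (InternalResourceManager) cache.getResourceManager();
 *   irm.addResourceListener(ResourceType.HEAP_MEMORY, new HeapEvictor(cache));
 * </pre>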
 *
 * @author Yogesh, Suranjan, Amardeep, rholmes
 * @since 6.0
 */
public class HeapEvictor implements ResourceListener<MemoryEvent> {

  private static final Logger logger = LogService.getLogger();

  // Add 1 for the management task that's putting more eviction tasks on the queue
  public static final int MAX_EVICTOR_THREADS = Integer.getInteger(
      "gemfire.HeapLRUCapacityController.MAX_EVICTOR_THREADS", Runtime.getRuntime().availableProcessors() * 4) + 1;

  public static final boolean DISABLE_HEAP_EVICTIOR_THREAD_POOL = Boolean
      .getBoolean("gemfire.HeapLRUCapacityController.DISABLE_HEAP_EVICTIOR_THREAD_POOL");

  public static final boolean EVICT_HIGH_ENTRY_COUNT_BUCKETS_FIRST = Boolean.parseBoolean(
      System.getProperty(
          "gemfire.HeapLRUCapacityController.evictHighEntryCountBucketsFirst",
          "true"));

  public static final int MINIMUM_ENTRIES_PER_BUCKET = Integer
      .getInteger("gemfire.HeapLRUCapacityController.inlineEvictionThreshold", 0);

  public static final long TOTAL_BYTES_TO_EVICT_FROM_HEAP;

  public static final int BUCKET_SORTING_INTERVAL = Integer.getInteger(
      "gemfire.HeapLRUCapacityController.higherEntryCountBucketCalculationInterval",
      100);

  private static final String EVICTOR_THREAD_GROUP_NAME = "EvictorThreadGroup";

  private static final String EVICTOR_THREAD_NAME = "EvictorThread";

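  // The static block below computes
  //   TOTAL_BYTES_TO_EVICT_FROM_HEAP = maxTenuredBytes * 0.01 * evictionBurstPercentage,
  // i.e. the burst is a percentage of the tenured pool. Illustrative arithmetic:
  // with a 10GB tenured pool and the default burst of 0.4, each eviction burst
  // targets 10GB * 0.01 * 0.4 = ~41MB.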
  static {
    float evictionBurstPercentage = Float.parseFloat(System.getProperty(
        "gemfire.HeapLRUCapacityController.evictionBurstPercentage", "0.4"));
    long maxTenuredBytes = HeapMemoryMonitor.getTenuredPoolMaxMemory();
    TOTAL_BYTES_TO_EVICT_FROM_HEAP = (long) (maxTenuredBytes * 0.01 * evictionBurstPercentage);
  }

  private ThreadPoolExecutor evictorThreadPool = null;

  private final AtomicBoolean mustEvict = new AtomicBoolean(false);

  protected final Cache cache;

  private final ArrayList<Integer> testTaskSetSizes = new ArrayList<Integer>();
  public volatile int testAbortAfterLoopCount = Integer.MAX_VALUE;

  private BlockingQueue<Runnable> poolQueue;

  private final AtomicBoolean isRunning = new AtomicBoolean(true);

  public HeapEvictor(Cache gemFireCache) {
    this.cache = gemFireCache;
    initializeEvictorThreadPool();
  }

  protected boolean includePartitionedRegion(PartitionedRegion region) {
    return (region.getEvictionAttributes().getAlgorithm().isLRUHeap()
        && (region.getDataStore() != null)
        && !region.getAttributes().getOffHeap());
  }

  protected boolean includeLocalRegion(LocalRegion region) {
    return (region.getEvictionAttributes().getAlgorithm().isLRUHeap()
        && !region.getAttributes().getOffHeap());
  }

  private List<LocalRegion> getAllRegionList() {
    List<LocalRegion> allRegionList = new ArrayList<LocalRegion>();
    InternalResourceManager irm = (InternalResourceManager) cache
        .getResourceManager();

    for (ResourceListener<MemoryEvent> listener : irm.getResourceListeners(getResourceType())) {
      if (listener instanceof PartitionedRegion) {
        PartitionedRegion pr = (PartitionedRegion) listener;
        if (includePartitionedRegion(pr)) {
          allRegionList.addAll(pr.getDataStore().getAllLocalBucketRegions());
        }
      }
      else if (listener instanceof LocalRegion) {
        LocalRegion lr = (LocalRegion) listener;
        if (includeLocalRegion(lr)) {
          allRegionList.add(lr);
        }
      }
    }

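    // Skip buckets whose in-VM entry count is already at or below the
    // configured minimum; evicting nearly empty buckets is not worthwhile.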
    if (HeapEvictor.MINIMUM_ENTRIES_PER_BUCKET > 0) {
      Iterator<LocalRegion> iter = allRegionList.iterator();
      while (iter.hasNext()) {
        LocalRegion lr = iter.next();
        if (lr instanceof BucketRegion) {
          if (((BucketRegion) lr).getNumEntriesInVM() <= HeapEvictor.MINIMUM_ENTRIES_PER_BUCKET) {
            iter.remove();
          }
        }
      }
    }
    return allRegionList;
  }

  private List<LocalRegion> getAllSortedRegionList() {
    List<LocalRegion> allRegionList = getAllRegionList();

    // Capture the sizes so that they do not change while sorting
    final Object2LongOpenHashMap<LocalRegion> sizes =
        new Object2LongOpenHashMap<LocalRegion>(allRegionList.size());
    for (LocalRegion r : allRegionList) {
      long size = r instanceof BucketRegion
          ? ((BucketRegion) r).getSizeForEviction() : r.size();
      sizes.put(r, size);
    }

    // Sort in descending size order; when multiple PRs are present, buckets
    // from different PRs are compared against each other as well
    Collections.sort(allRegionList, new Comparator<LocalRegion>() {
      public int compare(LocalRegion r1, LocalRegion r2) {
        long numEntries1 = sizes.getLong(r1);
        long numEntries2 = sizes.getLong(r2);
        if (numEntries1 > numEntries2) {
          return -1;
        }
        else if (numEntries1 < numEntries2) {
          return 1;
        }
        return 0;
      }
    });
    return allRegionList;
  }

  public GemFireCacheImpl getGemFireCache() {
    return (GemFireCacheImpl) this.cache;
  }

  private void initializeEvictorThreadPool() {

    final ThreadGroup evictorThreadGroup = LoggingThreadGroup.createThreadGroup(
        getEvictorThreadGroupName(), logger);
    ThreadFactory evictorThreadFactory = new ThreadFactory() {
      // AtomicInteger so that concurrently created threads always get unique names
      private final AtomicInteger next = new AtomicInteger();

      public Thread newThread(Runnable command) {
        Thread t = new Thread(evictorThreadGroup, command, getEvictorThreadName()
            + next.getAndIncrement());
        t.setDaemon(true);
        return t;
      }
    };

    if (!DISABLE_HEAP_EVICTIOR_THREAD_POOL) {
      this.poolQueue = new OverflowQueueWithDMStats(getGemFireCache().getCachePerfStats().getEvictionQueueStatHelper());
      this.evictorThreadPool = new ThreadPoolExecutor(MAX_EVICTOR_THREADS, MAX_EVICTOR_THREADS,
          15, TimeUnit.SECONDS, this.poolQueue, evictorThreadFactory);
    }
  }

  /**
   * Submits a task (i.e., eviction of the regions it carries) to the
   * evictor thread pool.
   */
  private void submitRegionEvictionTask(Callable<Object> task) {
    evictorThreadPool.submit(task);
  }

  public ThreadPoolExecutor getEvictorThreadPool() {
    if (isRunning.get()) {
      return evictorThreadPool;
    }
    return null;
  }

  /**
   * Returns the total number of tasks that are currently executing or
   * queued for execution.
   *
   * @return the sum of running and queued tasks, or -1 if this evictor is closed
   */
  public int getRunningAndScheduledTasks() {
    if (isRunning.get()) {
      return this.evictorThreadPool.getActiveCount()
          + this.evictorThreadPool.getQueue().size();
    }
    return -1;
  }

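  // Each region (or bucket) below is assigned a share of the eviction burst
  // proportional to its share of the total in-VM entry count. Illustrative
  // arithmetic: a bucket holding 200 of 1,000 total entries is handed 20% of
  // getTotalBytesToEvict() to evict.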
  private void createAndSubmitWeightedRegionEvictionTasks() {
    List<LocalRegion> allRegionList = getAllSortedRegionList();
    float numEntriesInVm = 0;
    for (LocalRegion lr : allRegionList) {
      if (lr instanceof BucketRegion) {
        numEntriesInVm += ((BucketRegion) lr).getSizeForEviction();
      } else {
        numEntriesInVm += lr.getRegionMap().sizeInVM();
      }
    }
    for (LocalRegion lr : allRegionList) {
      List<LocalRegion> regionsForSingleTask = new ArrayList<LocalRegion>(1);
      float regionEntryCnt;
      if (lr instanceof BucketRegion) {
        regionEntryCnt = ((BucketRegion) lr).getSizeForEviction();
      } else {
        regionEntryCnt = lr.getRegionMap().sizeInVM();
      }
      float percentage = regionEntryCnt / numEntriesInVm;
      long bytesToEvictPerTask = (long) (getTotalBytesToEvict() * percentage);
      regionsForSingleTask.add(lr);
      if (mustEvict()) {
        submitRegionEvictionTask(new RegionEvictorTask(regionsForSingleTask, this, bytesToEvictPerTask));
      } else {
        break;
      }
    }
  }

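  // Regions are divided as evenly as possible among the available threads.
  // Illustrative arithmetic: with 10 regions and 4 threads, numRegionsInTask
  // = 10 / 4 = 2, so four two-region tasks are created and the two leftover
  // regions are appended to the last task.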
  private Set<Callable<Object>> createRegionEvictionTasks() {
    Set<Callable<Object>> evictorTaskSet = new HashSet<Callable<Object>>();
    int threadsAvailable = getEvictorThreadPool().getCorePoolSize();
    long bytesToEvictPerTask = getTotalBytesToEvict() / threadsAvailable;
    List<LocalRegion> allRegionList = getAllRegionList();
    if (allRegionList.isEmpty()) {
      return evictorTaskSet;
    }
    // Shuffle so that successive eviction passes do not always visit regions
    // in the same order. (The shuffle is not strictly required the first time
    // eviction is triggered.)
    Collections.shuffle(allRegionList);
    int allRegionSetSize = allRegionList.size();
    if (allRegionSetSize <= threadsAvailable) {
      for (LocalRegion region : allRegionList) {
        List<LocalRegion> regionList = new ArrayList<LocalRegion>(1);
        regionList.add(region);
        evictorTaskSet.add(new RegionEvictorTask(regionList, this, bytesToEvictPerTask));
      }
      for (Callable<Object> task : evictorTaskSet) {
        testTaskSetSizes.add(((RegionEvictorTask) task).getRegionList().size());
      }
      return evictorTaskSet;
    }
    int numRegionsInTask = allRegionSetSize / threadsAvailable;
    List<LocalRegion> regionsForSingleTask = null;
    Iterator<LocalRegion> itr = allRegionList.iterator();
    for (int i = 0; i < threadsAvailable; i++) {
      regionsForSingleTask = new ArrayList<LocalRegion>(numRegionsInTask);
      for (int count = 1; count <= numRegionsInTask && itr.hasNext(); count++) {
        regionsForSingleTask.add(itr.next());
      }
      evictorTaskSet.add(new RegionEvictorTask(regionsForSingleTask, this, bytesToEvictPerTask));
    }
    // Add leftover regions to the last task
    while (itr.hasNext()) {
      regionsForSingleTask.add(itr.next());
    }

    for (Callable<Object> task : evictorTaskSet) {
      testTaskSetSizes.add(((RegionEvictorTask) task).getRegionList().size());
    }
    return evictorTaskSet;
  }

  // Since the amount of memory used is to a large degree dependent upon when
  // garbage collection is run, it's difficult to determine when to stop
  // evicting. So, an initial calculation is done to determine the number of
  // evictions that are likely needed in order to bring memory usage below the
  // eviction threshold. This number is stored in 'numFastLoops' and we
  // quickly loop through this number performing evictions. We then continue
  // to evict, but at a progressively slower rate, waiting either for an event
  // which indicates we've dropped below the eviction threshold or another
  // eviction event with an updated "number of bytes used". If we get another
  // eviction event with an updated "number of bytes used" then 'numFastLoops'
  // is recalculated and we start over.
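  // Illustrative arithmetic for the calculation below: if bytes-used exceeds
  // the eviction threshold's clear mark by 100MB and each pass evicts
  // getTotalBytesToEvict() = 40MB, then
  // numFastLoops = (100MB + 40MB) / 40MB = 3 fast passes.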
  protected volatile int numEvictionLoopsCompleted = 0;
  protected volatile int numFastLoops;
  private long previousBytesUsed;
  private final Object evictionLock = new Object();

  @Override
  public void onEvent(final MemoryEvent event) {
    if (DISABLE_HEAP_EVICTIOR_THREAD_POOL) {
      return;
    }

    // Do we care about eviction events, and did the eviction event originate
    // in this VM?
    if (this.isRunning.get() && event.isLocal()) {
      if (event.getState().isEviction()) {
        final LogWriter logWriter = cache.getLogger();

        // Have we previously received an eviction event and already started eviction?
        if (this.mustEvict.get()) {
          if (logWriter.fineEnabled()) {
            logWriter.fine("Updating eviction in response to memory event: " + event + ". previousBytesUsed=" + previousBytesUsed);
          }

          // We lock here to make sure that the thread that was previously
          // started and running eviction loops is in a state where it's okay
          // to update the number of fast loops to perform.
          synchronized (evictionLock) {
            numEvictionLoopsCompleted = 0;
            numFastLoops = (int) ((event.getBytesUsed() - event.getThresholds().getEvictionThresholdClearBytes()
                + getTotalBytesToEvict()) / getTotalBytesToEvict());
            evictionLock.notifyAll();
          }

          // We've updated the number of fast loops to perform, and there's
          // already a thread running the evictions, so we're done.
          return;
        }

        if (!this.mustEvict.compareAndSet(false, true)) {
          // Another thread just started evicting.
          return;
        }

        numEvictionLoopsCompleted = 0;
        numFastLoops = (int) ((event.getBytesUsed() - event.getThresholds().getEvictionThresholdClearBytes()
            + getTotalBytesToEvict()) / getTotalBytesToEvict());
        if (logWriter.fineEnabled()) {
          logWriter.fine("Starting eviction in response to memory event: " + event);
        }

        // The task below performs eviction passes in a loop by resubmitting
        // itself to the thread pool until eviction is no longer needed
        final Runnable evictionManagerTask = new Runnable() {
          @Override
          public void run() {
            // Has the test hook been set which will cause eviction to abort early?
            if (numEvictionLoopsCompleted < testAbortAfterLoopCount) {
              try {
                // Submit tasks into the queue to do the evictions
                if (EVICT_HIGH_ENTRY_COUNT_BUCKETS_FIRST) {
                  createAndSubmitWeightedRegionEvictionTasks();
                } else {
                  for (Callable<Object> task : createRegionEvictionTasks()) {
                    submitRegionEvictionTask(task);
                  }
                }
                RegionEvictorTask.setLastTaskCompletionTime(System.currentTimeMillis());

                // Make sure that another thread isn't processing a new eviction event
                // and changing the number of fast loops to perform.
                synchronized (evictionLock) {
                  int delayTime = getEvictionLoopDelayTime();
                  if (logWriter.fineEnabled()) {
| logWriter.fine("Eviction loop delay time calculated to be " + delayTime + " milliseconds. Fast Loops=" |
| + numFastLoops + ", Loop #=" + numEvictionLoopsCompleted+1); |
                  }
                  numEvictionLoopsCompleted++;
                  try {
                    // Wait, releasing the lock, so that the number of fast
                    // loops needed can be updated by another thread processing
                    // a new eviction event.
                    evictionLock.wait(delayTime);
                  } catch (InterruptedException iex) {
                    // Fall through; the mustEvict check below decides whether
                    // this task is resubmitted.
                  }
                }

                // Do we think we're still above the eviction threshold?
                if (HeapEvictor.this.mustEvict.get()) {
                  // Submit this runnable back into the thread pool and execute
                  // another pass at eviction.
                  HeapEvictor.this.evictorThreadPool.submit(this);
                }
              } catch (RegionDestroyedException e) {
                // A RegionDestroyedException may be thrown from Region.size() when a
                // bucket moves due to rebalancing. Resubmit the eviction task without
                // logging an error message. Fixes bug 48162.
                if (HeapEvictor.this.mustEvict.get()) {
                  HeapEvictor.this.evictorThreadPool.submit(this);
                }
              }
            }
          }
        };

        // Submit the first pass at eviction into the pool
        this.evictorThreadPool.submit(evictionManagerTask);

      } else {
        this.mustEvict.set(false);
      }
    }
  }

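  // Delay schedule sketch (illustrative, with numFastLoops = 3):
  //   loops 0-2 -> 850ms  (fast loops)
  //   loop 3    -> 1500ms ((3 - 3 + 3) * 500)
  //   loop 4    -> 2000ms
  //   loop 5    -> 2500ms
  //   loop 6+   -> 3000ms (more than 2 loops past numFastLoops)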
  protected int getEvictionLoopDelayTime() {
    int delayTime = 850; // The waiting period when running fast loops
    if ((numEvictionLoopsCompleted - numFastLoops) > 2) {
      delayTime = 3000; // Way below the threshold
    } else if (numEvictionLoopsCompleted >= numFastLoops) {
      delayTime = (numEvictionLoopsCompleted - numFastLoops + 3) * 500; // Just below the threshold
    }

    return delayTime;
  }

  public boolean mustEvict() {
    return this.mustEvict.get();
  }

  public void close() {
    getEvictorThreadPool().shutdownNow();
    isRunning.set(false);
  }

  public ArrayList<Integer> testOnlyGetSizeOfTasks() {
    if (isRunning.get()) {
      return testTaskSetSizes;
    }
    return null;
  }

  protected String getEvictorThreadGroupName() {
    return HeapEvictor.EVICTOR_THREAD_GROUP_NAME;
  }

  protected String getEvictorThreadName() {
    return HeapEvictor.EVICTOR_THREAD_NAME;
  }

  public long getTotalBytesToEvict() {
    return TOTAL_BYTES_TO_EVICT_FROM_HEAP;
  }

  protected ResourceType getResourceType() {
    return ResourceType.HEAP_MEMORY;
  }
}