blob: fd7af42300fba3282fc6514ed48c112c0a5211d3 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.EvictionCriteria;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.gemstone.gemfire.internal.offheap.Releasable;
/**
* Schedules each iteration periodically. EvictorService takes absolute time and
* a period as input and schedules Eviction at absolute times by calculating the
* interval. For scheduling the next eviction iteration it also takes into
* account the time taken to complete one iteration. If an iteration takes more
* time than the specified period then another iteration is scheduled
* immediately.
*
* @author skumar
*
*/
public class EvictorService extends AbstractScheduledService {
private final EvictionCriteria<Object, Object> criteria;
// period is always in seconds
private long interval;
private volatile boolean stopScheduling;
private long nextScheduleTime;
private GemFireCacheImpl cache;
private Region region;
private volatile ScheduledExecutorService executorService;
public EvictorService(EvictionCriteria<Object, Object> criteria,
long evictorStartTime, long evictorInterval, TimeUnit unit, Region r) {
this.criteria = criteria;
this.interval = unit.toSeconds(evictorInterval);
this.region = r;
try {
this.cache = GemFireCacheImpl.getExisting();
} catch (CacheClosedException cce) {
}
//TODO: Unless we revisit System.currentTimeMillis or cacheTimeMillis keep the default
// long now = (evictorStartTime != 0 ? evictorStartTime
// + this.cache.getDistributionManager().getCacheTimeOffset() : this.cache
// .getDistributionManager().cacheTimeMillis()) / 1000;
long now = this.cache.getDistributionManager().cacheTimeMillis() / 1000;
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache.getLoggerI18n().fine(
"EvictorService: The startTime(now) is " + now + " evictorStartTime : " + evictorStartTime);
}
this.nextScheduleTime = now + 10;
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache.getLoggerI18n().fine(
"EvictorService: The startTime is " + this.nextScheduleTime);
}
}
@Override
protected void runOneIteration() throws Exception {
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache.getLoggerI18n()
.fine(
"EvictorService: Running the iteration at "
+ cache.cacheTimeMillis());
}
if (stopScheduling || checkCancelled(cache)) {
stopScheduling(); // if check cancelled
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache
.getLoggerI18n()
.fine(
"EvictorService: Abort eviction since stopScheduling OR cancel in progress. Evicted 0 entries ");
}
return;
}
CachePerfStats stats = ((LocalRegion)this.region).getCachePerfStats();
long startEvictionTime = stats.startCustomEviction();
int evicted = 0;
long startEvaluationTime = stats.startEvaluation();
Iterator<Entry<Object, Object>> keysItr = null;
long totalIterationsTime = 0;
keysItr = this.criteria.getKeysToBeEvicted(this.cache
.getDistributionManager().cacheTimeMillis(), this.region);
try {
stats.incEvaluations(this.region.size());
// if we have been asked to stop scheduling
// or the cache is closing stop in between.
while (keysItr.hasNext() && !stopScheduling && !checkCancelled(cache)) {
Map.Entry<Object, Object> entry = keysItr.next();
long startIterationTime = this.cache
.getDistributionManager().cacheTimeMillis();
Object routingObj = entry.getValue();
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache.getLoggerI18n().fine(
"EvictorService: Going to evict the following entry " + entry);
}
if (this.region instanceof PartitionedRegion) {
try {
PartitionedRegion pr = (PartitionedRegion)this.region;
stats.incEvictionsInProgress();
int bucketId = PartitionedRegionHelper.getHashKey(pr, routingObj);
BucketRegion br = pr.getDataStore().getLocalBucketById(bucketId);
// This has to be called on BucketRegion directly and not on the PR as
// PR doesn't allow operation on Secondary buckets.
if (br != null) {
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache.getLoggerI18n().fine(
"EvictorService: Going to evict the following entry " + entry
+ " from bucket " + br);
}
if (br.getBucketAdvisor().isPrimary()) {
boolean succ = false;
try {
succ = br.customEvictDestroy(entry.getKey());
} catch (PrimaryBucketException e) {
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache.getLoggerI18n().warning(
LocalizedStrings.EVICTORSERVICE_CAUGHT_EXCEPTION_0, e);
}
}
if (succ)
evicted++;
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache.getLoggerI18n()
.fine(
"EvictorService: Evicted the following entry " + entry
+ " from bucket " + br + " successfully " + succ
+ " the value in buk is " /*
* +
* br.get(entry.getKey())
*/);
}
}
}
stats.incEvictions();
} catch (Exception e) {
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache.getLoggerI18n().warning(
LocalizedStrings.EVICTORSERVICE_CAUGHT_EXCEPTION_0, e);
}
// TODO:
// Do the exception handling .
// Check if the bucket is present
// If the entry could not be evicted then log the warning
// Log any other exception.
}finally{
stats.decEvictionsInProgress();
long endIterationTime = this.cache
.getDistributionManager().cacheTimeMillis();
totalIterationsTime += (endIterationTime - startIterationTime);
}
}
}
}finally {
if(keysItr instanceof Releasable) {
((Releasable)keysItr).release();
}
}
stats.endEvaluation(startEvaluationTime, totalIterationsTime);
if (this.cache.getLoggerI18n().fineEnabled()) {
this.cache.getLoggerI18n().fine(
"EvictorService: Completed an iteration at time "
+ this.cache.getDistributionManager().cacheTimeMillis() / 1000
+ ". Evicted " + evicted + " entries.");
}
stats.endCustomEviction(startEvictionTime);
}
private boolean checkCancelled(GemFireCacheImpl cache) {
if (cache.getCancelCriterion().cancelInProgress() != null) {
return true;
}
return false;
}
@Override
protected Scheduler scheduler() {
return new CustomScheduler() {
@Override
protected Schedule getNextSchedule() throws Exception {
// get the current time in seconds from DM.
// it takes care of clock skew etc in different VMs
long now = cache.getDistributionManager().cacheTimeMillis() / 1000;
if (cache.getLoggerI18n().fineEnabled()) {
cache.getLoggerI18n().fine("EvictorService: Now is " + now);
}
long delay = 0;
if (now < nextScheduleTime) {
delay = nextScheduleTime - now;
}
nextScheduleTime += interval;
// calculate the next immediate time i.e. schedule time in seconds
// set the schedule.delay to that scheduletime - currenttime
if (cache.getLoggerI18n().fineEnabled()) {
cache.getLoggerI18n().fine(
"EvictorService: Returning the next schedule with delay " + delay
+ " next schedule is at : " + nextScheduleTime);
}
return new Schedule(delay, TimeUnit.SECONDS);
}
};
}
/**
* Region.destroy and Region.close should make sure to call this method. This
* will be called here.
*/
public void stopScheduling() {
this.stopScheduling = true;
}
// this will be called when we stop the service.
// not sure if we have to do any cleanup
// to stop the service call stop()
@Override
protected void shutDown() throws Exception {
this.executorService.shutdownNow();
this.region= null;
this.cache = null;
}
// This will be called when we start the service.
// not sure if we have to any intialization
@Override
protected void startUp() throws Exception {
}
public void changeEvictionInterval(long newInterval) {
this.interval = newInterval / 1000;
if (cache.getLoggerI18n().fineEnabled()) {
cache.getLoggerI18n().fine(
"EvictorService: New interval is " + this.interval);
}
}
public void changeStartTime(long newStart) {
this.nextScheduleTime = newStart/1000;
if (cache.getLoggerI18n().fineEnabled()) {
cache.getLoggerI18n().fine("EvictorService: New start time is " + this.nextScheduleTime);
}
}
protected ScheduledExecutorService executor() {
this.executorService = super.executor();
return this.executorService;
}
}