blob: 78b3215b02192ba98c8a3dba4d780b1947be21cf [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
/**
*
*/
package com.gemstone.gemfire.internal.cache;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.util.ObjectSizer;
import com.gemstone.gemfire.internal.GemFireVersion;
import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
import com.gemstone.gemfire.internal.cache.control.ResourceListener;
import com.gemstone.gemfire.internal.cache.versions.CompactVersionHolder;
import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
import com.gemstone.gemfire.internal.size.ReflectionSingleObjectSizer;
import com.gemstone.gemfire.internal.util.concurrent.StoppableReentrantLock;
/**
* Tombstones are region entries that have been destroyed but are held
* for future concurrency checks. They are timed out after a reasonable
* period of time when there is no longer the possibility of concurrent
* modification conflicts.
* <p>
* The cache holds a tombstone service that is responsible for tracking
* and timing out tombstones.
*
* @author bruce
*/
public class TombstoneService implements ResourceListener<MemoryEvent> {
  private static final Logger logger = LogService.getLogger();

  /**
   * The default tombstone expiration period, in milliseconds, for replicated
   * regions.<p> This is the period over which the destroy operation may
   * conflict with another operation. After this timeout elapses the tombstone
   * is put into a GC set for removal. Removal is typically triggered by
   * the size of the GC set, but could be influenced by resource managers.
   *
   * The default is 600,000 milliseconds (10 minutes).
   */
  public static long REPLICATED_TOMBSTONE_TIMEOUT = Long.getLong(
      "gemfire.tombstone-timeout", 600000L).longValue();

  /**
   * The default tombstone expiration period in millis for non-replicated
   * regions. This tombstone timeout should be shorter than the one for
   * replicated regions and need not be excessively long. Making it longer
   * than the replicated timeout can cause non-replicated regions to issue
   * revisions based on the tombstone that could overwrite modifications made
   * by others that no longer have the tombstone.<p>
   * The default is 480,000 milliseconds (8 minutes).
   */
  public static long CLIENT_TOMBSTONE_TIMEOUT = Long.getLong(
      "gemfire.non-replicated-tombstone-timeout", 480000);

  /**
   * The max number of tombstones in an expired batch. This covers
   * all replicated regions, including PR buckets. The default is
   * 100,000 expired tombstones.
   */
  public static long EXPIRED_TOMBSTONE_LIMIT = Long.getLong("gemfire.tombstone-gc-threshold", 100000);

  /**
   * The interval, in milliseconds, at which the sweepers scan their queues
   * for tombstones that are no longer needed or have already expired.
   */
  public static long DEFUNCT_TOMBSTONE_SCAN_INTERVAL = Long.getLong("gemfire.tombstone-scan-interval", 60000);

  /**
   * The threshold fraction of free memory that will trigger tombstone GCs.
   * The default percentage is somewhat less than the LRU Heap evictor so that
   * we evict tombstones before we start evicting cache data.
   */
  public static double GC_MEMORY_THRESHOLD = Integer.getInteger("gemfire.tombstone-gc-memory-threshold",
      30 /*100-HeapLRUCapacityController.DEFAULT_HEAP_PERCENTAGE*/) * 0.01;

  /** this is a test hook for causing the tombstone service to act as though free memory is low */
  public static boolean FORCE_GC_MEMORY_EVENTS = false;

  public final static Object debugSync = new Object();

  public final static boolean DEBUG_TOMBSTONE_COUNT = Boolean.getBoolean("gemfire.TombstoneService.DEBUG_TOMBSTONE_COUNT"); // TODO:LOG:replace TombstoneService.DEBUG_TOMBSTONE_COUNT

  public static boolean IDLE_EXPIRATION = false; // dunit test hook for forced batch expiration

  /**
   * tasks for cleaning up tombstones
   */
  private final TombstoneSweeper replicatedTombstoneSweeper;
  private final TombstoneSweeper nonReplicatedTombstoneSweeper;

  /** a tombstone service is tied to a cache */
  private final GemFireCacheImpl cache;

  /**
   * two queues, one for replicated regions (including PR buckets) and one for
   * other regions. They have different timeout intervals.
   */
  private final Queue<Tombstone> replicatedTombstones = new ConcurrentLinkedQueue<Tombstone>();
  private final Queue<Tombstone> nonReplicatedTombstones = new ConcurrentLinkedQueue<Tombstone>();

  /** approximate size in bytes of the tombstones held for each queue (shared with its sweeper) */
  private final AtomicLong replicatedTombstoneQueueSize = new AtomicLong();
  private final AtomicLong nonReplicatedTombstoneQueueSize = new AtomicLong();

  /** guards {@link #progressingDeltaGIICount}; tombstone GC runs while holding it */
  public Object blockGCLock = new Object();

  /** number of delta GIIs in progress for which this member is the provider */
  private int progressingDeltaGIICount;

  /**
   * Creates a tombstone service for the given cache and starts its two
   * background sweeper threads.
   */
  public static TombstoneService initialize(GemFireCacheImpl cache) {
    TombstoneService instance = new TombstoneService(cache);
    // cache.getResourceManager().addResourceListener(instance); experimental
    return instance;
  }

  private TombstoneService(GemFireCacheImpl cache) {
    this.cache = cache;
    this.replicatedTombstoneSweeper = new TombstoneSweeper(cache, this.replicatedTombstones,
        REPLICATED_TOMBSTONE_TIMEOUT, true, this.replicatedTombstoneQueueSize);
    this.nonReplicatedTombstoneSweeper = new TombstoneSweeper(cache, this.nonReplicatedTombstones,
        CLIENT_TOMBSTONE_TIMEOUT, false, this.nonReplicatedTombstoneQueueSize);
    startSweeper(this.replicatedTombstoneSweeper);
    startSweeper(this.nonReplicatedTombstoneSweeper);
  }

  /** starts the daemon thread for the given sweeper unless it already has one */
  private void startSweeper(TombstoneSweeper tombstoneSweeper) {
    synchronized (tombstoneSweeper) {
      if (tombstoneSweeper.sweeperThread == null) {
        tombstoneSweeper.sweeperThread = new Thread(LoggingThreadGroup.createThreadGroup("Destroyed Entries Processors",
            logger), tombstoneSweeper);
        tombstoneSweeper.sweeperThread.setDaemon(true);
        String product = "GemFire";
        if (tombstoneSweeper == this.replicatedTombstoneSweeper) {
          tombstoneSweeper.sweeperThread.setName(product + " Garbage Collection Thread 1");
        } else {
          tombstoneSweeper.sweeperThread.setName(product + " Garbage Collection Thread 2");
        }
        tombstoneSweeper.sweeperThread.start();
      }
    }
  }

  /**
   * this ensures that the background sweeper threads are stopped
   */
  public void stop() {
    stopSweeper(this.replicatedTombstoneSweeper);
    stopSweeper(this.nonReplicatedTombstoneSweeper);
  }

  /**
   * Marks the sweeper stopped, wakes it, waits briefly (100ms) for its thread
   * to exit and then discards its queued tombstones.
   */
  private void stopSweeper(TombstoneSweeper t) {
    Thread sweeperThread;
    synchronized (t) {
      sweeperThread = t.sweeperThread;
      t.isStopped = true;
      if (sweeperThread != null) {
        t.notifyAll();
      }
    }
    // the sweeper may never have been started; don't NPE on the join
    if (sweeperThread != null) {
      try {
        sweeperThread.join(100);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    t.tombstones.clear();
  }

  /**
   * Tombstones are markers placed in destroyed entries in order to keep the
   * entry around for a while so that it's available for concurrent modification
   * detection.
   *
   * @param r the region holding the entry
   * @param entry the region entry that holds the tombstone
   * @param destroyedVersion the version that was destroyed
   */
  public void scheduleTombstone(LocalRegion r, RegionEntry entry, VersionTag destroyedVersion) {
    boolean useReplicated = useReplicatedQueue(r);
    Tombstone ts = new Tombstone(entry, r, destroyedVersion);
    if (useReplicated) {
      this.replicatedTombstones.add(ts);
      this.replicatedTombstoneQueueSize.addAndGet(ts.getSize());
    } else {
      this.nonReplicatedTombstones.add(ts);
      this.nonReplicatedTombstoneQueueSize.addAndGet(ts.getSize());
    }
  }

  /**
   * Decides which queue holds a region's tombstones: the replicated queue is
   * used only for distributed, replicated regions that have no server proxy.
   */
  private boolean useReplicatedQueue(LocalRegion r) {
    return (r.getScope().isDistributed() && r.getServerProxy() == null) && r.dataPolicy.withReplication();
  }

  /**
   * remove all tombstones for the given region from its queue. Do this when
   * the region is cleared or destroyed.
   * @param r the region whose tombstones should be dropped
   */
  public void unscheduleTombstones(LocalRegion r) {
    // Use the same queue selection as scheduleTombstone, otherwise tombstones of
    // e.g. a replicated region with a server proxy would never be found here.
    boolean replicated = useReplicatedQueue(r);
    Queue<Tombstone> queue = replicated ? replicatedTombstones : nonReplicatedTombstones;
    long removalSize = 0;
    for (Iterator<Tombstone> it = queue.iterator(); it.hasNext(); ) {
      Tombstone t = it.next();
      if (t.region == r) {
        it.remove();
        removalSize += t.getSize();
      }
    }
    if (replicated) {
      replicatedTombstoneQueueSize.addAndGet(-removalSize);
    } else {
      nonReplicatedTombstoneQueueSize.addAndGet(-removalSize);
    }
  }

  /** @return the number of delta GIIs currently blocking tombstone GC */
  public int getGCBlockCount() {
    synchronized (this.blockGCLock) {
      return this.progressingDeltaGIICount;
    }
  }

  /** registers another delta GII that blocks tombstone GC; returns the new count */
  public int incrementGCBlockCount() {
    synchronized (this.blockGCLock) {
      return ++this.progressingDeltaGIICount;
    }
  }

  /** unregisters a delta GII that blocked tombstone GC; returns the new count */
  public int decrementGCBlockCount() {
    synchronized (this.blockGCLock) {
      return --this.progressingDeltaGIICount;
    }
  }

  /**
   * Tells whether a tombstone is covered by the given GC versions: its region
   * version must be at or below the GC version recorded for the member that
   * destroyed it. A tombstone without a member ID is attributed to {@code myId}.
   */
  private static boolean isCoveredByGCVersions(Tombstone t, VersionSource myId,
      Map<VersionSource, Long> regionGCVersions) {
    VersionSource destroyingMember = t.getMemberID();
    if (destroyingMember == null) {
      destroyingMember = myId;
    }
    Long maxReclaimedRV = regionGCVersions.get(destroyingMember);
    return maxReclaimedRV != null && t.getRegionVersion() <= maxReclaimedRV.longValue();
  }

  /**
   * remove tombstones from the given region that have region-versions <= those in the given removal map.
   * The GC versions are also recorded in the region's version vector (and persisted
   * for persistent regions) before any tombstone is removed from the region map.
   *
   * @return a collection of keys removed (only if the region is a bucket - empty otherwise),
   *         or {@code null} if GC was skipped because a delta GII is in progress
   */
  public Set<Object> gcTombstones(LocalRegion r, Map<VersionSource, Long> regionGCVersions) {
    synchronized (this.blockGCLock) {
      int count = getGCBlockCount();
      if (count > 0) {
        // if any delta GII is on going as provider at this member, not to do tombstone GC
        if (logger.isDebugEnabled()) {
          logger.debug("gcTombstones skipped due to {} Delta GII on going", count);
        }
        return null;
      }
      if (logger.isDebugEnabled()) {
        logger.debug("gcTombstones invoked for region {} and version map {}", r, regionGCVersions);
      }
      Set<Tombstone> removals = new HashSet<Tombstone>();
      VersionSource myId = r.getVersionMember();
      boolean isBucket = r.isUsedForPartitionedRegionBucket();

      // regions with a server proxy are searched in the non-replicated queue,
      // everything else in the replicated queue
      boolean replicated = r.getServerProxy() == null;
      Queue<Tombstone> queue = replicated ? this.replicatedTombstones : this.nonReplicatedTombstones;
      TombstoneSweeper sweeper = replicated ? this.replicatedTombstoneSweeper : this.nonReplicatedTombstoneSweeper;
      StoppableReentrantLock lock = sweeper.currentTombstoneLock;
      // hold the lock so the sweeper cannot swap its current tombstone while we look at it
      lock.lock();
      try {
        long removalSize = 0;
        Tombstone currentTombstone = sweeper.currentTombstone;
        if (currentTombstone != null && currentTombstone.region == r
            && isCoveredByGCVersions(currentTombstone, myId, regionGCVersions)) {
          // The current tombstone was already taken off the queue by the sweeper, and
          // its size is subtracted when the sweeper processes it, so it is not
          // counted in removalSize here.
          removals.add(currentTombstone);
        }
        for (Tombstone t : queue) {
          if (t.region == r && isCoveredByGCVersions(t, myId, regionGCVersions)) {
            removals.add(t);
            removalSize += t.getSize();
          }
        }
        queue.removeAll(removals);
        if (replicated) {
          this.replicatedTombstoneQueueSize.addAndGet(-removalSize);
        } else {
          this.nonReplicatedTombstoneQueueSize.addAndGet(-removalSize);
        }
      } finally {
        lock.unlock();
      }

      // Record the GC versions now, so that we can persist them
      for (Map.Entry<VersionSource, Long> entry : regionGCVersions.entrySet()) {
        r.getVersionVector().recordGCVersion(entry.getKey(), entry.getValue());
      }

      // Remove any exceptions from the RVV that are older than the GC version
      r.getVersionVector().pruneOldExceptions();

      // Persist the GC RVV to disk. This needs to happen BEFORE we remove
      // the entries from map, to prevent us from removing a tombstone
      // from disk that has a version greater than the persisted
      // GC RVV.
      if (r.getDataPolicy().withPersistence()) {
        // Update the version vector which reflects what has been persisted on disk.
        r.getDiskRegion().writeRVVGC(r);
      }

      Set<Object> removedKeys = new HashSet<Object>();
      for (Tombstone t : removals) {
        if (t.region.getRegionMap().removeTombstone(t.entry, t, false, true) && isBucket) {
          removedKeys.add(t.entry.getKey());
        }
      }
      return removedKeys;
    } // sync on deltaGIILock
  }

  /**
   * client tombstone removal is key-based if the server is a PR. This is due to the
   * server having separate version vectors for each bucket. In the client this causes
   * the version vector to make no sense, so we have to send it a collection of the
   * keys removed on the server and then we brute-force remove any of them that
   * are tombstones on the client
   *
   * @param r the region affected
   * @param tombstoneKeys the keys removed on the server
   */
  public void gcTombstoneKeys(LocalRegion r, Set<Object> tombstoneKeys) {
    Queue<Tombstone> queue = this.nonReplicatedTombstones;
    Set<Tombstone> removals = new HashSet<Tombstone>();
    this.nonReplicatedTombstoneSweeper.currentTombstoneLock.lock();
    try {
      Tombstone currentTombstone = this.nonReplicatedTombstoneSweeper.currentTombstone;
      long removalSize = 0;
      if (logger.isDebugEnabled()) {
        logger.debug("gcTombstones invoked for region {} and keys {}", r, tombstoneKeys);
      }
      if (currentTombstone != null && currentTombstone.region == r
          && tombstoneKeys.contains(currentTombstone.entry.getKey())) {
        // not counted in removalSize: the sweeper accounts for its current tombstone
        removals.add(currentTombstone);
      }
      for (Tombstone t : queue) {
        if (t.region == r && tombstoneKeys.contains(t.entry.getKey())) {
          removals.add(t);
          removalSize += t.getSize();
        }
      }
      queue.removeAll(removals);
      // the queue shrank, so the tracked size must go down (it was previously increased)
      nonReplicatedTombstoneQueueSize.addAndGet(-removalSize);
    } finally {
      this.nonReplicatedTombstoneSweeper.currentTombstoneLock.unlock();
    }
    for (Tombstone t : removals) {
      //TODO - RVV - to support persistent client regions
      //we need to actually record this as a destroy on disk, because
      //the GCC RVV doesn't make sense on the client.
      t.region.getRegionMap().removeTombstone(t.entry, t, false, true);
    }
  }

  /**
   * For test purposes only, force the expiration of a number of tombstones for
   * replicated regions.
   * @throws InterruptedException if interrupted while waiting for the batch
   * @return true if the expiration occurred within 30 seconds
   */
  public boolean forceBatchExpirationForTests(int count) throws InterruptedException {
    this.replicatedTombstoneSweeper.testHook_batchExpired = new CountDownLatch(1);
    try {
      synchronized (this.replicatedTombstoneSweeper) {
        this.replicatedTombstoneSweeper.forceExpirationCount += count;
        this.replicatedTombstoneSweeper.notifyAll();
      }

      //Wait for 30 seconds. If we wait longer, we risk hanging the tests if
      //something goes wrong.
      return this.replicatedTombstoneSweeper.testHook_batchExpired.await(30, TimeUnit.SECONDS);
    } finally {
      this.replicatedTombstoneSweeper.testHook_batchExpired = null;
    }
  }

  /**
   * Test Hook - slow operation
   * verify whether a tombstone is scheduled for expiration, either in the
   * region's queue or in the replicated sweeper's expired batch
   */
  public boolean isTombstoneScheduled(LocalRegion r, RegionEntry re) {
    // same queue selection as scheduleTombstone
    Queue<Tombstone> queue = useReplicatedQueue(r) ? this.replicatedTombstones : this.nonReplicatedTombstones;
    VersionTag entryTag = re.getVersionStamp().asVersionTag();
    int entryVersion = entryTag.getEntryVersion();
    for (Tombstone t : queue) {
      if (t.region == r
          && t.entry.getKey().equals(re.getKey())
          && t.getEntryVersion() == entryVersion) {
        return true;
      }
    }
    if (this.replicatedTombstoneSweeper != null) {
      return this.replicatedTombstoneSweeper.hasExpiredTombstone(r, re, entryTag);
    }
    return false;
  }

  @Override
  public String toString() {
    return "Destroyed entries GC service.  Replicate Queue=" + this.replicatedTombstones.toString()
        + " Non-replicate Queue=" + this.nonReplicatedTombstones
        + (this.replicatedTombstoneSweeper.expiredTombstones != null?
            " expired batch size = " + this.replicatedTombstoneSweeper.expiredTombstones.size() : "");
  }

  /** A queued tombstone: the entry and region it belongs to plus the destroy version. */
  private static class Tombstone extends CompactVersionHolder {
    // tombstone overhead size
    public static final int PER_TOMBSTONE_OVERHEAD = ReflectionSingleObjectSizer.REFERENCE_SIZE // queue's reference to the tombstone
        + ReflectionSingleObjectSizer.REFERENCE_SIZE * 3 // entry, region, member ID
        + ReflectionSingleObjectSizer.REFERENCE_SIZE  // region entry value (Token.TOMBSTONE)
        + 18; // version numbers and timestamp

    RegionEntry entry;
    LocalRegion region;

    Tombstone(RegionEntry entry, LocalRegion region, VersionTag destroyedVersion) {
      super(destroyedVersion);
      this.entry = entry;
      this.region = region;
    }

    /** estimated memory held by this tombstone: fixed overhead plus the size of its key */
    public int getSize() {
      return Tombstone.PER_TOMBSTONE_OVERHEAD // includes per-entry overhead
          + ObjectSizer.DEFAULT.sizeof(entry.getKey());
    }

    @Override
    public String toString() {
      String v = super.toString();
      StringBuilder sb = new StringBuilder();
      sb.append("(").append(entry.getKey()).append("; ")
        .append(region.getName()).append("; ").append(v)
        .append(")");
      return sb.toString();
    }
  }

  /**
   * Background task that expires the tombstones of one queue. In batch mode
   * expired tombstones are collected and removed in distributed batches;
   * otherwise each tombstone is removed from its region as soon as it expires.
   */
  private static class TombstoneSweeper implements Runnable {
    /**
     * the expiration time for tombstones in this sweeper
     */
    private final long expiryTime;
    /**
     * the current tombstones.  These are queued for expiration.  When tombstones
     * are resurrected they are left in this queue and the sweeper thread
     * figures out that they are no longer valid tombstones.
     */
    final Queue<Tombstone> tombstones;
    /**
     * The size, in bytes, of the tombstones held by this sweeper: those in the
     * queue, the current tombstone and the expired batch. It is decremented
     * only when a tombstone is finally released.
     */
    final AtomicLong queueSize;
    /**
     * the thread that handles tombstone expiration.  It reads from the
     * tombstone queue.
     */
    Thread sweeperThread;
    /**
     * whether this sweeper accumulates expired tombstones for batch removal
     */
    final boolean batchMode;
    /**
     * this suspends batch expiration.  It is intended for administrative use
     * so an operator can suspend the garbage-collection of tombstones for
     * replicated/partitioned regions if a persistent member goes off line
     */
    volatile boolean batchExpirationSuspended;
    /**
     * The sweeper thread's current tombstone
     */
    Tombstone currentTombstone;
    /**
     * a lock protecting the value of currentTombstone from changing
     */
    final StoppableReentrantLock currentTombstoneLock;
    /**
     * tombstones that have expired and are awaiting batch removal.  This
     * variable is only accessed by the sweeper thread and so is not guarded
     */
    Set<Tombstone> expiredTombstones;

    /**
     * count of entries to forcibly expire due to memory events
     */
    private long forceExpirationCount = 0;

    /**
     * Force batch expiration
     */
    private boolean forceBatchExpiration = false;

    /**
     * Is a batch expiration in progress?
     */
    private volatile boolean batchExpirationInProgress;

    /**
     * A test hook to force expiration of tombstones.
     * See {@link TombstoneService#forceBatchExpirationForTests(int)}
     */
    private CountDownLatch testHook_batchExpired;

    /**
     * the cache that owns all of the tombstones in this sweeper
     */
    private final GemFireCacheImpl cache;

    private volatile boolean isStopped;

    /**
     * @param cache the owning cache
     * @param tombstones the queue this sweeper drains
     * @param expiryTime how long, in millis, a tombstone is kept before it expires
     * @param batchMode whether expired tombstones are collected for batch removal
     * @param queueSize the size counter shared with the service for this queue
     */
    TombstoneSweeper(GemFireCacheImpl cache,
        Queue<Tombstone> tombstones,
        long expiryTime,
        boolean batchMode,
        AtomicLong queueSize) {
      this.cache = cache;
      this.expiryTime = expiryTime;
      this.tombstones = tombstones;
      this.queueSize = queueSize;
      this.batchMode = batchMode;
      if (batchMode) {
        this.expiredTombstones = new HashSet<Tombstone>();
      }
      this.currentTombstoneLock = new StoppableReentrantLock(cache.getCancelCriterion());
    }

    /** stop tombstone removal for sweepers that have batchMode==true */
    @SuppressWarnings("unused")
    void suspendBatchExpiration() {
      this.batchExpirationSuspended = true;
    }

    /** enables tombstone removal for sweepers that have batchMode==true */
    @SuppressWarnings("unused")
    void resumeBatchExpiration () {
      if (this.batchExpirationSuspended) {
        this.batchExpirationSuspended = false; // volatile write
      }
    }

    /** force a batch GC */
    void forceBatchExpiration() {
      this.forceBatchExpiration = true;
    }

    /** if we should GC the batched tombstones, this method will initiate the operation */
    private void processBatch() {
      if ((!batchExpirationSuspended &&
          (this.forceBatchExpiration || (this.expiredTombstones.size() >= EXPIRED_TOMBSTONE_LIMIT)))
          || testHook_batchExpired != null) {
        this.forceBatchExpiration = false;
        expireBatch();
      }
    }

    /** test hook - unsafe since not synchronized; retries on concurrent modification */
    boolean hasExpiredTombstone(LocalRegion r, RegionEntry re, VersionTag tag) {
      int entryVersion = tag.getEntryVersion();
      boolean retry;
      do {
        retry = false;
        try {
          for (Tombstone t : this.expiredTombstones) {
            if (t.region == r
                && t.entry.getKey().equals(re.getKey())
                && t.getEntryVersion() == entryVersion) {
              return true;
            }
          }
        } catch (ConcurrentModificationException e) {
          retry = true;
        }
      } while (retry);
      return false;
    }

    /**
     * expire a batch of tombstones: record and persist the GC versions of the
     * affected regions, remove the tombstones from the region maps and then
     * distribute the tombstone GC from a waiting-pool thread
     */
    private void expireBatch() {
      // fix for bug #46087 - OOME due to too many GC threads
      if (this.batchExpirationInProgress) {
        // incorrect return due to race between this and waiting-pool GC thread is okay
        // because the sweeper thread will just try again after its next sleep (max sleep is 10 seconds)
        return;
      }
      synchronized (cache.getTombstoneService().blockGCLock) {
        int count = cache.getTombstoneService().getGCBlockCount();
        if (count > 0) {
          // if any delta GII is on going as provider at this member, not to do tombstone GC
          if (logger.isDebugEnabled()) {
            logger.debug("expireBatch skipped due to {} Delta GII on going", count);
          }
          return;
        }

        this.batchExpirationInProgress = true;
        boolean batchScheduled = false;
        try {
          final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
          Set<Tombstone> expired = expiredTombstones;
          long removalSize = 0;
          expiredTombstones = new HashSet<Tombstone>();
          if (expired.size() == 0) {
            return;
          }

          //Update the GC RVV for all of the affected regions.
          //We need to do this so that we can persist the GC RVV before
          //we start removing entries from the map.
          for (Tombstone t : expired) {
            t.region.getVersionVector().recordGCVersion(t.getMemberID(), t.getRegionVersion());
            regionsAffected.add((DistributedRegion) t.region);
          }

          for (DistributedRegion r : regionsAffected) {
            //Remove any exceptions from the RVV that are older than the GC version
            r.getVersionVector().pruneOldExceptions();

            //Persist the GC RVV to disk. This needs to happen BEFORE we remove
            //the entries from map, to prevent us from removing a tombstone
            //from disk that has a version greater than the persisted
            //GC RVV.
            if (r.getDataPolicy().withPersistence()) {
              r.getDiskRegion().writeRVVGC(r);
            }
          }

          final Map<LocalRegion, Set<Object>> reapedKeys = new HashMap<LocalRegion, Set<Object>>();

          //Remove the tombstones from the in memory region map.
          for (Tombstone t : expired) {
            // for PR buckets we have to keep track of the keys removed because clients have
            // them all lumped in a single non-PR region
            if (t.region.getRegionMap().removeTombstone(t.entry, t, false, true) && t.region.isUsedForPartitionedRegionBucket()) {
              Set<Object> keys = reapedKeys.get(t.region);
              if (keys == null) {
                keys = new HashSet<Object>();
                reapedKeys.put(t.region, keys);
              }
              keys.add(t.entry.getKey());
            }
            removalSize += t.getSize();
          }

          // this is the single point where batched tombstones leave the size counter
          this.queueSize.addAndGet(-removalSize);

          // do messaging in a pool so this thread is not stuck trying to
          // communicate with other members
          cache.getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
            public void run() {
              try {
                // this thread should not reference other sweeper state, which is not synchronized
                for (DistributedRegion r : regionsAffected) {
                  r.distributeTombstoneGC(reapedKeys.get(r));
                }
              } finally {
                batchExpirationInProgress = false;
              }
            }
          });
          batchScheduled = true;
        } finally {
          // read the test hook only once: the test thread may null it out concurrently
          CountDownLatch batchExpiredLatch = testHook_batchExpired;
          if (batchExpiredLatch != null) {
            batchExpiredLatch.countDown();
          }
          if (!batchScheduled) {
            batchExpirationInProgress = false;
          }
        }
      } // sync on deltaGIILock
    }

    /**
     * The run loop picks a tombstone off of the expiration queue and waits
     * for it to expire.  It also periodically scans for resurrected tombstones
     * and handles batch expiration.  Batch expiration works by tossing the
     * expired tombstones into a set and delaying the removal of those tombstones
     * from the Region until scheduled points in the calendar.
     */
    public void run() {
      long minimumRetentionMs = this.expiryTime / 10; // forceExpiration will not work on something younger than this
      long maximumSleepTime = 10000;
      if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
        logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with default sleep interval={}", this.expiryTime);
      }
      currentTombstone = null;
      // millis we need to run a scan of queue and batch set for resurrected tombstones
      long minimumScanTime = 100;
      // how often to perform the scan
      long scanInterval = Math.min(DEFUNCT_TOMBSTONE_SCAN_INTERVAL, expiryTime);
      long lastScanTime = this.cache.cacheTimeMillis();

      while (!isStopped && cache.getCancelCriterion().cancelInProgress() == null) {
        Throwable problem = null;
        try {
          if (this.batchMode) {
            cache.getCachePerfStats().setReplicatedTombstonesSize(queueSize.get());
          } else {
            cache.getCachePerfStats().setNonReplicatedTombstonesSize(queueSize.get());
          }
          SystemFailure.checkFailure();
          long now = this.cache.cacheTimeMillis();
          if (forceExpirationCount <= 0) {
            if (this.batchMode) {
              processBatch();
            }
            // if we're running out of memory we get a little more aggressive about
            // the size of the batch we'll expire
            if (GC_MEMORY_THRESHOLD > 0 && this.batchMode) {
              // check to see how we're doing on memory
              Runtime rt = Runtime.getRuntime();
              long freeMemory = rt.freeMemory();
              long totalMemory = rt.totalMemory();
              long maxMemory = rt.maxMemory();
              // free heap plus heap the JVM may still grow into
              freeMemory += (maxMemory - totalMemory);
              // NOTE(review): the ratio is taken against totalMemory, not maxMemory,
              // so it can exceed 1 while the heap has not fully grown - confirm intent
              if (FORCE_GC_MEMORY_EVENTS ||
                  freeMemory / (totalMemory * 1.0) < GC_MEMORY_THRESHOLD) {
                forceBatchExpiration = !this.batchExpirationInProgress &&
                    this.expiredTombstones.size() > (EXPIRED_TOMBSTONE_LIMIT / 4);
                if (forceBatchExpiration) {
                  if (logger.isDebugEnabled()) {
                    logger.debug("forcing batch expiration due to low memory conditions");
                  }
                }
                // Only the already-expired batch is forced out here: forcing expiration of
                // tombstones that have not timed out can cause inconsistencies too easily.
              }
            }
          }
          if (currentTombstone == null) {
            try {
              currentTombstoneLock.lock();
              try {
                currentTombstone = tombstones.remove();
              } finally {
                currentTombstoneLock.unlock();
              }
              if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
                logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", currentTombstone);
              }
            } catch (NoSuchElementException e) {
              // expected
              if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
                logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
              }
              forceExpirationCount = 0;
            }
          }
          long sleepTime;
          if (currentTombstone == null) {
            sleepTime = expiryTime;
          } else if (currentTombstone.getVersionTimeStamp() + expiryTime > now
              && (forceExpirationCount <= 0
                  || (currentTombstone.getVersionTimeStamp() + expiryTime - now) <= minimumRetentionMs)) {
            // not expired yet (and not eligible for forced expiration): wait for it
            sleepTime = currentTombstone.getVersionTimeStamp() + expiryTime - now;
          } else {
            if (forceExpirationCount > 0) {
              forceExpirationCount--;
            }
            sleepTime = 0;
            try {
              if (batchMode) {
                if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
                  logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", currentTombstone);
                }
                // size is subtracted later, when expireBatch() releases the batch
                expiredTombstones.add(currentTombstone);
              } else {
                if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
                  logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", currentTombstone);
                }
                queueSize.addAndGet(-currentTombstone.getSize());
                currentTombstone.region.getRegionMap().removeTombstone(currentTombstone.entry, currentTombstone, false, true);
              }
              currentTombstoneLock.lock();
              try {
                currentTombstone = null;
              } finally {
                currentTombstoneLock.unlock();
              }
            } catch (CancelException e) {
              return;
            } catch (Exception e) {
              logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR), e);
            }
          }
          if (sleepTime > 0) {
            // initial sleeps could be very long, so we reduce the interval to allow
            // this thread to periodically sweep up tombstones for resurrected entries
            sleepTime = Math.min(sleepTime, scanInterval);
            if (sleepTime > minimumScanTime && (now - lastScanTime) > scanInterval) {
              lastScanTime = now;
              long start = now;
              // see if any have been superseded
              for (Iterator<Tombstone> it = tombstones.iterator(); it.hasNext(); ) {
                Tombstone test = it.next();
                // NOTE(review): this skips the last element of the queue - presumably
                // intentional, confirm before changing
                if (it.hasNext()) {
                  if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
                    it.remove();
                    this.queueSize.addAndGet(-test.getSize());
                    if (test == currentTombstone) {
                      currentTombstoneLock.lock();
                      try {
                        currentTombstone = null;
                      } finally {
                        currentTombstoneLock.unlock();
                      }
                      sleepTime = 0;
                    }
                  } else if (batchMode && test != currentTombstone && (test.getVersionTimeStamp() + expiryTime) <= now) {
                    it.remove();
                    // The size is NOT subtracted here: the tombstone is still held in the
                    // expired batch and its size is subtracted once, either by expireBatch()
                    // or by the obsolete-tombstone scan below.
                    if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
                      logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", test);
                    }
                    expiredTombstones.add(test);
                    sleepTime = 0;
                  }
                }
              }
              // now check the batch of timed-out tombstones, if there is one
              if (batchMode) {
                for (Iterator<Tombstone> it = expiredTombstones.iterator(); it.hasNext(); ) {
                  Tombstone test = it.next();
                  if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion())) {
                    if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
                      logger.trace(LogMarker.TOMBSTONE, "removing obsolete tombstone: {}", test);
                    }
                    it.remove();
                    this.queueSize.addAndGet(-test.getSize());
                    if (test == currentTombstone) {
                      currentTombstoneLock.lock();
                      try {
                        currentTombstone = null;
                      } finally {
                        currentTombstoneLock.unlock();
                      }
                      sleepTime = 0;
                    }
                  }
                }
              }
              if (sleepTime > 0) {
                // the scan itself used up part of the sleep interval
                long elapsed = this.cache.cacheTimeMillis() - start;
                sleepTime = sleepTime - elapsed;
                if (sleepTime <= 0) {
                  minimumScanTime = elapsed;
                  continue;
                }
              }
            }
            // test hook:  if there are expired tombstones and nothing else is expiring soon,
            // perform distributed tombstone GC
            if (batchMode && IDLE_EXPIRATION && sleepTime >= expiryTime) {
              if (this.expiredTombstones.size() > 0) {
                expireBatch();
              }
            }
            if (sleepTime > 0) {
              try {
                sleepTime = Math.min(sleepTime, maximumSleepTime);
                if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
                  logger.trace(LogMarker.TOMBSTONE, "sleeping for {}", sleepTime);
                }
                synchronized (this) {
                  if (isStopped) {
                    return;
                  }
                  this.wait(sleepTime);
                }
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
              }
            }
          } // sleepTime > 0
        } catch (CancelException e) {
          break;
        } catch (VirtualMachineError err) { // GemStoneAddition
          SystemFailure.initiateFailure(err);
          // If this ever returns, rethrow the error.  We're poisoned
          // now, so don't let this thread continue.
          throw err;
        } catch (Throwable e) {
          SystemFailure.checkFailure();
          problem = e;
        }
        if (problem != null) {
          logger.fatal(LocalizedMessage.create(LocalizedStrings.TombstoneService_UNEXPECTED_EXCEPTION), problem);
        }
      } // while()
    } // run()
  } // class TombstoneSweeper

  /**
   * Reacts to local memory events: entering the eviction state (from a
   * non-eviction state) forces a batch expiration in the replicated sweeper.
   *
   * @see com.gemstone.gemfire.internal.cache.control.ResourceListener#onEvent(java.lang.Object)
   */
  @Override
  public void onEvent(MemoryEvent event) {
    if (event.isLocal()) {
      if (event.getState().isEviction() && !event.getPreviousState().isEviction()) {
        this.replicatedTombstoneSweeper.forceBatchExpiration();
      }
    }
  }
}