blob: 494efafed0ce018e9796869452c2b0bc25162926 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.GemFireIOException;
import com.gemstone.gemfire.InvalidDeltaException;
import com.gemstone.gemfire.cache.CacheRuntimeException;
import com.gemstone.gemfire.cache.CacheWriter;
import com.gemstone.gemfire.cache.CacheWriterException;
import com.gemstone.gemfire.cache.CustomEvictionAttributes;
import com.gemstone.gemfire.cache.DiskAccessException;
import com.gemstone.gemfire.cache.EntryExistsException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.TimeoutException;
import com.gemstone.gemfire.cache.TransactionId;
import com.gemstone.gemfire.cache.query.IndexMaintenanceException;
import com.gemstone.gemfire.cache.query.QueryException;
import com.gemstone.gemfire.cache.query.internal.IndexUpdater;
import com.gemstone.gemfire.cache.query.internal.index.IndexManager;
import com.gemstone.gemfire.cache.query.internal.index.IndexProtocol;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.ClassPathLoader;
import com.gemstone.gemfire.internal.cache.DiskInitFile.DiskRegionFlag;
import com.gemstone.gemfire.internal.cache.FilterRoutingInfo.FilterInfo;
import com.gemstone.gemfire.internal.cache.delta.Delta;
import com.gemstone.gemfire.internal.cache.ha.HAContainerWrapper;
import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.tier.sockets.HAEventWrapper;
import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException;
import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
import com.gemstone.gemfire.internal.cache.versions.VersionHolder;
import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
import com.gemstone.gemfire.internal.concurrent.MapCallbackAdapter;
import com.gemstone.gemfire.internal.concurrent.MapResult;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
import com.gemstone.gemfire.internal.offheap.OffHeapRegionEntryHelper;
import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.Chunk;
import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap;
import com.gemstone.gemfire.pdx.PdxInstance;
import com.gemstone.gemfire.pdx.PdxSerializationException;
import com.gemstone.gemfire.pdx.internal.ConvertableToBytes;
/**
* Abstract implementation of {@link RegionMap}that has all the common
* behavior.
*
* @since 3.5.1
*
* @author Darrel Schneider
*
*/
//Asif: In case of sqlFabric System, we are creating a different set of RegionEntry
// which are derived from the concrete GFE RegionEntry classes.
// In future if any new concrete RegionEntry class is defined, the new SqlFabric
// RegionEntry Classes need to be created. There is a junit test in sqlfabric
// which checks for RegionEntry classes of GFE and validates the same with its
// own classes.
abstract class AbstractRegionMap implements RegionMap {
private static final Logger logger = LogService.getLogger();
/** The underlying map for this region. */
protected CustomEntryConcurrentHashMap<Object, Object> map;
/** An internal Listener for index maintenance for SQLFabric. */
private final IndexUpdater indexUpdater;
/**
* This test hook is used to force the conditions for defect 48182.
* This hook is used by Bug48182JUnitTest.
*/
static Runnable testHookRunnableFor48182 = null;
private RegionEntryFactory entryFactory;
private Attributes attr;
private transient Object owner; // the region that owns this map
protected AbstractRegionMap(InternalRegionArguments internalRegionArgs) {
if (internalRegionArgs != null) {
this.indexUpdater = internalRegionArgs.getIndexUpdater();
}
else {
this.indexUpdater = null;
}
}
public final IndexUpdater getIndexUpdater() {
return this.indexUpdater;
}
protected void initialize(Object owner,
Attributes attr,
InternalRegionArguments internalRegionArgs,
boolean isLRU) {
_setAttributes(attr);
setOwner(owner);
_setMap(createConcurrentMap(attr.initialCapacity, attr.loadFactor,
attr.concurrencyLevel, false,
new AbstractRegionEntry.HashRegionEntryCreator()));
final GemFireCacheImpl cache;
boolean isDisk;
boolean withVersioning = false;
boolean offHeap = false;
if (owner instanceof LocalRegion) {
LocalRegion region = (LocalRegion)owner;
isDisk = region.getDiskRegion() != null;
cache = region.getGemFireCache();
withVersioning = region.getConcurrencyChecksEnabled();
offHeap = region.getOffHeap();
}
else if (owner instanceof PlaceHolderDiskRegion) {
offHeap = ((PlaceHolderDiskRegion) owner).getOffHeap();
isDisk = true;
withVersioning = ((PlaceHolderDiskRegion)owner).getFlags().contains(
DiskRegionFlag.IS_WITH_VERSIONING);
cache = GemFireCacheImpl.getInstance();
}
else {
throw new IllegalStateException(
"expected LocalRegion or PlaceHolderDiskRegion");
}
if (cache != null && cache.isSqlfSystem()) {
String provider = GemFireCacheImpl.SQLF_ENTRY_FACTORY_PROVIDER;
try {
Class<?> factoryProvider = ClassPathLoader.getLatest().forName(provider);
Method method = factoryProvider.getDeclaredMethod(
"getRegionEntryFactory", new Class[] { Boolean.TYPE, Boolean.TYPE,
Boolean.TYPE, Object.class, InternalRegionArguments.class });
RegionEntryFactory ref = (RegionEntryFactory)method.invoke(null,
new Object[] { Boolean.valueOf(attr.statisticsEnabled),
Boolean.valueOf(isLRU), Boolean.valueOf(isDisk), owner,
internalRegionArgs });
// TODO need to have the SQLF entry factory support version stamp storage
setEntryFactory(ref);
}
catch (Exception e) {
throw new CacheRuntimeException(
"Exception in obtaining RegionEntry Factory" + " provider class ",
e) {
};
}
}
else {
final RegionEntryFactory factory;
if (attr.statisticsEnabled) {
if (isLRU) {
if (isDisk) {
if (withVersioning) {
if (offHeap) {
factory = VersionedStatsDiskLRURegionEntryOffHeap.getEntryFactory();
} else {
factory = VersionedStatsDiskLRURegionEntryHeap.getEntryFactory();
}
} else {
if (offHeap) {
factory = VMStatsDiskLRURegionEntryOffHeap.getEntryFactory();
} else {
factory = VMStatsDiskLRURegionEntryHeap.getEntryFactory();
}
}
} else {
if (withVersioning) {
if (offHeap) {
factory = VersionedStatsLRURegionEntryOffHeap.getEntryFactory();
} else {
factory = VersionedStatsLRURegionEntryHeap.getEntryFactory();
}
} else {
if (offHeap) {
factory = VMStatsLRURegionEntryOffHeap.getEntryFactory();
} else {
factory = VMStatsLRURegionEntryHeap.getEntryFactory();
}
}
}
} else { // !isLRU
if (isDisk) {
if (withVersioning) {
if (offHeap) {
factory = VersionedStatsDiskRegionEntryOffHeap.getEntryFactory();
} else {
factory = VersionedStatsDiskRegionEntryHeap.getEntryFactory();
}
} else {
if (offHeap) {
factory = VMStatsDiskRegionEntryOffHeap.getEntryFactory();
} else {
factory = VMStatsDiskRegionEntryHeap.getEntryFactory();
}
}
} else {
if (withVersioning) {
if (offHeap) {
factory = VersionedStatsRegionEntryOffHeap.getEntryFactory();
} else {
factory = VersionedStatsRegionEntryHeap.getEntryFactory();
}
} else {
if (offHeap) {
factory = VMStatsRegionEntryOffHeap.getEntryFactory();
} else {
factory = VMStatsRegionEntryHeap.getEntryFactory();
}
}
}
}
}
else { // !statistics enabled
if (isLRU) {
if (isDisk) {
if (withVersioning) {
if (offHeap) {
factory = VersionedThinDiskLRURegionEntryOffHeap.getEntryFactory();
} else {
factory = VersionedThinDiskLRURegionEntryHeap.getEntryFactory();
}
} else {
if (offHeap) {
factory = VMThinDiskLRURegionEntryOffHeap.getEntryFactory();
} else {
factory = VMThinDiskLRURegionEntryHeap.getEntryFactory();
}
}
}
else {
if (withVersioning) {
if (offHeap) {
factory = VersionedThinLRURegionEntryOffHeap.getEntryFactory();
} else {
factory = VersionedThinLRURegionEntryHeap.getEntryFactory();
}
} else {
if (offHeap) {
factory = VMThinLRURegionEntryOffHeap.getEntryFactory();
} else {
factory = VMThinLRURegionEntryHeap.getEntryFactory();
}
}
}
}
else { // !isLRU
if (isDisk) {
if (withVersioning) {
if (offHeap) {
factory = VersionedThinDiskRegionEntryOffHeap.getEntryFactory();
} else {
factory = VersionedThinDiskRegionEntryHeap.getEntryFactory();
}
} else {
if (offHeap) {
factory = VMThinDiskRegionEntryOffHeap.getEntryFactory();
} else {
factory = VMThinDiskRegionEntryHeap.getEntryFactory();
}
}
}
else {
if (withVersioning) {
if (offHeap) {
factory = VersionedThinRegionEntryOffHeap.getEntryFactory();
} else {
factory = VersionedThinRegionEntryHeap.getEntryFactory();
}
} else {
if (offHeap) {
factory = VMThinRegionEntryOffHeap.getEntryFactory();
} else {
factory = VMThinRegionEntryHeap.getEntryFactory();
}
}
}
}
}
setEntryFactory(factory);
}
}
protected CustomEntryConcurrentHashMap<Object, Object> createConcurrentMap(
int initialCapacity, float loadFactor, int concurrencyLevel,
boolean isIdentityMap,
CustomEntryConcurrentHashMap.HashEntryCreator<Object, Object> entryCreator) {
if (entryCreator != null) {
return new CustomEntryConcurrentHashMap<Object, Object>(initialCapacity, loadFactor,
concurrencyLevel, isIdentityMap, entryCreator);
}
else {
return new CustomEntryConcurrentHashMap<Object, Object>(initialCapacity,
loadFactor, concurrencyLevel, isIdentityMap);
}
}
public void changeOwner(LocalRegion r) {
if (r == _getOwnerObject()) {
return;
}
setOwner(r);
}
@Override
public final void setEntryFactory(RegionEntryFactory f) {
this.entryFactory = f;
}
public final RegionEntryFactory getEntryFactory() {
return this.entryFactory;
}
protected final void _setAttributes(Attributes a) {
this.attr = a;
}
public final Attributes getAttributes() {
return this.attr;
}
protected final LocalRegion _getOwner() {
return (LocalRegion)this.owner;
}
protected boolean _isOwnerALocalRegion() {
return this.owner instanceof LocalRegion;
}
protected final Object _getOwnerObject() {
return this.owner;
}
public final void setOwner(Object r) {
this.owner = r;
}
protected final CustomEntryConcurrentHashMap<Object, Object> _getMap() {
return this.map;
}
protected final void _setMap(CustomEntryConcurrentHashMap<Object, Object> m) {
this.map = m;
}
public int size()
{
return _getMap().size();
}
// this is currently used by stats and eviction
@Override
public int sizeInVM() {
return _getMap().size();
}
public boolean isEmpty()
{
return _getMap().isEmpty();
}
public Set keySet()
{
return _getMap().keySet();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public Collection<RegionEntry> regionEntries() {
return (Collection)_getMap().values();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public Collection<RegionEntry> regionEntriesInVM() {
return (Collection)_getMap().values();
}
public final boolean containsKey(Object key) {
RegionEntry re = getEntry(key);
if (re == null) {
return false;
}
if (re.isRemoved()) {
return false;
}
return true;
}
public RegionEntry getEntry(Object key) {
RegionEntry re = (RegionEntry)_getMap().get(key);
if (re != null && re.isMarkedForEviction()) {
// entry has been faulted in from HDFS
return null;
}
return re;
}
protected RegionEntry getEntry(EntryEventImpl event) {
return getEntry(event.getKey());
}
@Override
public final RegionEntry getEntryInVM(Object key) {
return (RegionEntry)_getMap().get(key);
}
public final RegionEntry putEntryIfAbsent(Object key, RegionEntry re) {
RegionEntry value = (RegionEntry)_getMap().putIfAbsent(key, re);
if (value == null && (re instanceof OffHeapRegionEntry)
&& _isOwnerALocalRegion() && _getOwner().isThisRegionBeingClosedOrDestroyed()) {
// prevent orphan during concurrent destroy (#48068)
if (_getMap().remove(key, re)) {
((OffHeapRegionEntry)re).release();
}
_getOwner().checkReadiness(); // throw RegionDestroyedException
}
return value;
}
@Override
public final RegionEntry getOperationalEntryInVM(Object key) {
RegionEntry re = (RegionEntry)_getMap().get(key);
if (re != null && re.isMarkedForEviction()) {
// entry has been faulted in from HDFS
return null;
}
return re;
}
public final void removeEntry(Object key, RegionEntry re, boolean updateStat) {
if (re.isTombstone() && _getMap().get(key) == re && !re.isMarkedForEviction()){
logger.fatal(LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE), new Exception("stack trace"));
return; // can't remove tombstones except from the tombstone sweeper
}
if (_getMap().remove(key, re)) {
re.removePhase2();
if (updateStat) {
incEntryCount(-1);
}
}
}
public final void removeEntry(Object key, RegionEntry re, boolean updateStat,
EntryEventImpl event, final LocalRegion owner,
final IndexUpdater indexUpdater) {
boolean success = false;
if (re.isTombstone()&& _getMap().get(key) == re && !re.isMarkedForEviction()) {
logger.fatal(LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE), new Exception("stack trace"));
return; // can't remove tombstones except from the tombstone sweeper
}
try {
if (indexUpdater != null) {
indexUpdater.onEvent(owner, event, re);
}
//This is messy, but custom eviction calls removeEntry
//rather than re.destroy I think to avoid firing callbacks, etc.
//However, the value still needs to be set to removePhase1
//in order to remove the entry from disk.
if(event.isCustomEviction() && !re.isRemoved()) {
try {
re.removePhase1(owner, false);
} catch (RegionClearedException e) {
//that's ok, we were just trying to do evict incoming eviction
}
}
if (_getMap().remove(key, re)) {
re.removePhase2();
success = true;
if (updateStat) {
incEntryCount(-1);
}
}
} finally {
if (indexUpdater != null) {
indexUpdater.postEvent(owner, event, re, success);
}
}
}
protected final void incEntryCount(int delta) {
LocalRegion lr = _getOwner();
if (lr != null) {
CachePerfStats stats = lr.getCachePerfStats();
if (stats != null) {
stats.incEntryCount(delta);
}
}
}
final void incClearCount(LocalRegion lr) {
if (lr != null && !(lr instanceof HARegion)) {
CachePerfStats stats = lr.getCachePerfStats();
if (stats != null) {
stats.incClearCount();
}
}
}
private void _mapClear() {
_getMap().clear();
}
public void close() {
/*
for (SuspectEntryList l: this.suspectEntries.values()) {
for (EntryEventImpl e: l) {
e.release();
}
}
*/
clear(null);
}
/**
* Clear the region and, if an RVV is given, return a collection of the
* version sources in all remaining tags
*/
public Set<VersionSource> clear(RegionVersionVector rvv)
{
Set<VersionSource> result = new HashSet<VersionSource>();
if(!_isOwnerALocalRegion()) {
//Fix for #41333. Just clear the the map
//if we failed during initialization.
_mapClear();
return null;
}
if (logger.isDebugEnabled()) {
logger.debug("Clearing entries for {} rvv={}", _getOwner(), " rvv=" + rvv);
}
LocalRegion lr = _getOwner();
RegionVersionVector localRvv = lr.getVersionVector();
incClearCount(lr);
// lock for size calcs if the region might have tombstones
Object lockObj = lr.getConcurrencyChecksEnabled()? lr.getSizeGuard() : new Object();
synchronized (lockObj) {
if (rvv == null) {
int delta = 0;
try {
delta = sizeInVM(); // TODO soplog need to determine if stats should
// reflect only size in memory or the complete thing
} catch (GemFireIOException e) {
// ignore rather than throwing an exception during cache close
}
int tombstones = lr.getTombstoneCount();
_mapClear();
_getOwner().updateSizeOnClearRegion(delta - tombstones);
_getOwner().incTombstoneCount(-tombstones);
if (delta != 0) {
incEntryCount(-delta);
}
} else {
int delta = 0;
int tombstones = 0;
VersionSource myId = _getOwner().getVersionMember();
if (localRvv != rvv) {
localRvv.recordGCVersions(rvv);
}
final boolean isTraceEnabled = logger.isTraceEnabled();
for (RegionEntry re : regionEntries()) {
synchronized(re) {
Token value = re.getValueAsToken();
// if it's already being removed or the entry is being created we leave it alone
if (value == Token.REMOVED_PHASE1 || value == Token.REMOVED_PHASE2) {
continue;
}
VersionSource id = re.getVersionStamp().getMemberID();
if (id == null) {
id = myId;
}
if (rvv.contains(id, re.getVersionStamp().getRegionVersion())) {
if (isTraceEnabled) {
logger.trace("region clear op is removing {} {}", re.getKey(), re.getVersionStamp());
}
boolean tombstone = re.isTombstone();
// note: it.remove() did not reliably remove the entry so we use remove(K,V) here
if (_getMap().remove(re.getKey(), re)) {
if (OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap()) {
GatewaySenderEventImpl.release(re._getValue()); // OFFHEAP _getValue ok
}
//If this is an overflow only region, we need to free the entry on
//disk at this point.
try {
re.removePhase1(lr, true);
} catch (RegionClearedException e) {
//do nothing, it's already cleared.
}
re.removePhase2();
lruEntryDestroy(re);
if (tombstone) {
_getOwner().incTombstoneCount(-1);
tombstones += 1;
} else {
delta += 1;
}
}
} else { // rvv does not contain this entry so it is retained
result.add(id);
}
}
}
_getOwner().updateSizeOnClearRegion(delta);
incEntryCount(-delta);
incEntryCount(-tombstones);
if (logger.isDebugEnabled()) {
logger.debug("Size after clearing = {}", _getMap().size());
}
if (isTraceEnabled && _getMap().size() < 20) {
_getOwner().dumpBackingMap();
}
}
}
return result;
}
public void lruUpdateCallback()
{
// By default do nothing; LRU maps needs to override this method
}
public void lruUpdateCallback(boolean b)
{
// By default do nothing; LRU maps needs to override this method
}
public void lruUpdateCallback(int i)
{
// By default do nothing; LRU maps needs to override this method
}
public boolean disableLruUpdateCallback()
{
// By default do nothing; LRU maps needs to override this method
return false;
}
public void enableLruUpdateCallback()
{
// By default do nothing; LRU maps needs to override this method
}
public void resetThreadLocals()
{
// By default do nothing; LRU maps needs to override this method
}
/**
* Tell an LRU that a new entry has been created
*/
protected void lruEntryCreate(RegionEntry e)
{
// do nothing by default
}
/**
* Tell an LRU that an existing entry has been destroyed
*/
protected void lruEntryDestroy(RegionEntry e)
{
// do nothing by default
}
/**
* Tell an LRU that an existing entry has been modified
*/
protected void lruEntryUpdate(RegionEntry e)
{
// do nothing by default
}
@Override
public void decTxRefCount(RegionEntry e)
{
LocalRegion lr = null;
if (_isOwnerALocalRegion()) {
lr = _getOwner();
}
e.decRefCount(null, lr);
}
public boolean lruLimitExceeded() {
return false;
}
public void lruCloseStats() {
// do nothing by default
}
public void lruEntryFaultIn(LRUEntry entry) {
// do nothing by default
}
/**
* Process an incoming version tag for concurrent operation detection.
* This must be done before modifying the region entry.
* @param re the entry that is to be modified
* @param event the modification to the entry
* @throws InvalidDeltaException if the event contains a delta that cannot be applied
* @throws ConcurrentCacheModificationException if the event is in conflict
* with a previously applied change
*/
private void processVersionTag(RegionEntry re, EntryEventImpl event) {
if (re.getVersionStamp() != null) {
re.getVersionStamp().processVersionTag(event);
// during initialization we record version tag info to detect ops the
// image provider hasn't seen
VersionTag<?> tag = event.getVersionTag();
if (tag != null && !event.getRegion().isInitialized()) {
ImageState is = event.getRegion().getImageState();
if (is != null && !event.getRegion().isUsedForPartitionedRegionBucket()) {
if (logger.isTraceEnabled()) {
logger.trace("recording version tag in image state: {}", tag);
}
is.addVersionTag(event.getKey(), tag);
}
}
}
}
private void processVersionTagForGII(RegionEntry re, LocalRegion owner, VersionTag entryVersion, boolean isTombstone, InternalDistributedMember sender, boolean checkConflicts) {
re.getVersionStamp().processVersionTag(_getOwner(), entryVersion, isTombstone, false, owner.getMyId(), sender, checkConflicts);
}
public void copyRecoveredEntries(RegionMap rm) {
//We need to sort the tombstones before scheduling them,
//so that they will be in the correct order.
OrderedTombstoneMap<RegionEntry> tombstones = new OrderedTombstoneMap<RegionEntry>();
if (rm != null) {
CustomEntryConcurrentHashMap<Object, Object> other = ((AbstractRegionMap)rm)._getMap();
Iterator<Map.Entry<Object, Object>> it = other
.entrySetWithReusableEntries().iterator();
while (it.hasNext()) {
Map.Entry<Object, Object> me = it.next();
it.remove(); // This removes the RegionEntry from "rm" but it does not decrement its refcount to an offheap value.
RegionEntry oldRe = (RegionEntry)me.getValue();
Object key = me.getKey();
@Retained @Released Object value = oldRe._getValueRetain((RegionEntryContext) ((AbstractRegionMap) rm)._getOwnerObject(), true);
try {
if (value == Token.NOT_AVAILABLE) {
// fix for bug 43993
value = null;
}
if (value == Token.TOMBSTONE && !_getOwner().getConcurrencyChecksEnabled()) {
continue;
}
RegionEntry newRe = getEntryFactory().createEntry((RegionEntryContext) _getOwnerObject(), key, value);
copyRecoveredEntry(oldRe, newRe);
// newRe is now in this._getMap().
if (newRe.isTombstone()) {
VersionTag tag = newRe.getVersionStamp().asVersionTag();
tombstones.put(tag, newRe);
}
_getOwner().updateSizeOnCreate(key, _getOwner().calculateRegionEntryValueSize(newRe));
incEntryCount(1);
lruEntryUpdate(newRe);
} finally {
if (OffHeapHelper.release(value)) {
((OffHeapRegionEntry)oldRe).release();
}
}
lruUpdateCallback();
}
} else {
incEntryCount(size());
for (Iterator<RegionEntry> iter = regionEntries().iterator(); iter.hasNext(); ) {
RegionEntry re = iter.next();
if (re.isTombstone()) {
if (re.getVersionStamp() == null) { // bug #50992 - recovery from versioned to non-versioned
incEntryCount(-1);
iter.remove();
continue;
} else {
tombstones.put(re.getVersionStamp().asVersionTag(), re);
}
}
_getOwner().updateSizeOnCreate(re.getKey(), _getOwner().calculateRegionEntryValueSize(re));
}
// Since lru was not being done during recovery call it now.
lruUpdateCallback();
}
//Schedule all of the tombstones, now that we have sorted them
Map.Entry<VersionTag, RegionEntry> entry;
while((entry = tombstones.take()) != null) {
// refresh the tombstone so it doesn't time out too soon
_getOwner().scheduleTombstone(entry.getValue(), entry.getKey());
}
}
protected void copyRecoveredEntry(RegionEntry oldRe, RegionEntry newRe) {
if(newRe.getVersionStamp() != null) {
newRe.getVersionStamp().setMemberID(oldRe.getVersionStamp().getMemberID());
newRe.getVersionStamp().setVersions(oldRe.getVersionStamp().asVersionTag());
}
if (newRe instanceof AbstractOplogDiskRegionEntry) {
((AbstractOplogDiskRegionEntry)newRe).setDiskId(oldRe);
_getOwner().getDiskRegion().replaceIncompatibleEntry((DiskEntry) oldRe, (DiskEntry) newRe);
}
_getMap().put(newRe.getKey(), newRe);
}
@Retained // Region entry may contain an off-heap value
public final RegionEntry initRecoveredEntry(Object key, DiskEntry.RecoveredEntry value) {
boolean needsCallback = false;
@Retained RegionEntry newRe = getEntryFactory().createEntry((RegionEntryContext) _getOwnerObject(), key, value);
synchronized (newRe) {
if (value.getVersionTag()!=null && newRe.getVersionStamp()!=null) {
newRe.getVersionStamp().setVersions(value.getVersionTag());
}
RegionEntry oldRe = putEntryIfAbsent(key, newRe);
while (oldRe != null) {
synchronized (oldRe) {
if (oldRe.isRemoved() && !oldRe.isTombstone()) {
oldRe = putEntryIfAbsent(key, newRe);
if (oldRe != null) {
if (_isOwnerALocalRegion()) {
_getOwner().getCachePerfStats().incRetries();
}
}
}
/*
* Entry already exists which should be impossible.
* Free the current entry (if off-heap) and
* throw an exception.
*/
else {
if (newRe instanceof OffHeapRegionEntry) {
((OffHeapRegionEntry) newRe).release();
}
throw new IllegalStateException("Could not recover entry for key " + key + ". The entry already exists!");
}
} // synchronized
}
if (_isOwnerALocalRegion()) {
_getOwner().updateSizeOnCreate(key, _getOwner().calculateRegionEntryValueSize(newRe));
if (newRe.isTombstone()) {
// refresh the tombstone so it doesn't time out too soon
_getOwner().scheduleTombstone(newRe, newRe.getVersionStamp().asVersionTag());
}
incEntryCount(1); // we are creating an entry that was recovered from disk including tombstone
}
lruEntryUpdate(newRe);
needsCallback = true;
}
if (needsCallback) {
lruUpdateCallback();
}
EntryLogger.logRecovery(_getOwnerObject(), key, value);
return newRe;
}
public final RegionEntry updateRecoveredEntry(Object key, DiskEntry.RecoveredEntry value) {
boolean needsCallback = false;
RegionEntry re = getEntry(key);
if (re == null) {
return null;
}
synchronized (re) {
if (re.isRemoved() && !re.isTombstone()) {
return null;
}
if (value.getVersionTag()!=null && re.getVersionStamp()!=null) {
re.getVersionStamp().setVersions(value.getVersionTag());
}
try {
if (_isOwnerALocalRegion()) {
if (re.isTombstone()) {
// when a tombstone is to be overwritten, unschedule it first
_getOwner().unscheduleTombstone(re);
}
final int oldSize = _getOwner().calculateRegionEntryValueSize(re);
re.setValue(_getOwner(), value); // OFFHEAP no need to call AbstractRegionMap.prepareValueForCache because setValue is overridden for disk and that code takes apart value (RecoveredEntry) and prepares its nested value for the cache
if (re.isTombstone()) {
_getOwner().scheduleTombstone(re, re.getVersionStamp().asVersionTag());
}
_getOwner().updateSizeOnPut(key, oldSize, _getOwner().calculateRegionEntryValueSize(re));
} else {
DiskEntry.Helper.updateRecoveredEntry((PlaceHolderDiskRegion)_getOwnerObject(),
(DiskEntry)re, value, (RegionEntryContext) _getOwnerObject());
}
} catch (RegionClearedException rce) {
throw new IllegalStateException("RegionClearedException should never happen in this context", rce);
}
lruEntryUpdate(re);
needsCallback = true;
}
if (needsCallback) {
lruUpdateCallback();
}
EntryLogger.logRecovery(_getOwnerObject(), key, value);
return re;
}
public final boolean initialImagePut(final Object key,
final long lastModified,
Object newValue,
final boolean wasRecovered,
boolean deferLRUCallback,
VersionTag entryVersion, InternalDistributedMember sender, boolean isSynchronizing)
{
boolean result = false;
boolean done = false;
boolean cleared = false;
final LocalRegion owner = _getOwner();
if (newValue == Token.TOMBSTONE && !owner.getConcurrencyChecksEnabled()) {
return false;
}
if (owner instanceof HARegion && newValue instanceof CachedDeserializable) {
Object actualVal = ((CachedDeserializable)newValue)
.getDeserializedValue(null, null);
if (actualVal instanceof HAEventWrapper) {
HAEventWrapper haEventWrapper = (HAEventWrapper)actualVal;
// Key was removed at sender side so not putting it into the HARegion
if (haEventWrapper.getClientUpdateMessage() == null) {
return false;
}
// Getting the instance from singleton CCN..This assumes only one bridge
// server in the VM
HAContainerWrapper haContainer = (HAContainerWrapper)CacheClientNotifier
.getInstance().getHaContainer();
Map.Entry entry = null;
HAEventWrapper original = null;
synchronized (haContainer) {
entry = (Map.Entry)haContainer.getEntry(haEventWrapper);
if (entry != null) {
original = (HAEventWrapper)entry.getKey();
original.incAndGetReferenceCount();
}
else {
haEventWrapper.incAndGetReferenceCount();
haEventWrapper.setHAContainer(haContainer);
haContainer.put(haEventWrapper, haEventWrapper
.getClientUpdateMessage());
haEventWrapper.setClientUpdateMessage(null);
haEventWrapper.setIsRefFromHAContainer(true);
}
}
if (entry != null) {
HARegionQueue.addClientCQsAndInterestList(entry, haEventWrapper,
haContainer, owner.getName());
haEventWrapper.setClientUpdateMessage(null);
newValue = CachedDeserializableFactory.create(original,
((CachedDeserializable)newValue).getSizeInBytes());
}
}
}
try {
RegionEntry newRe = getEntryFactory().createEntry(owner, key,
Token.REMOVED_PHASE1);
EntryEventImpl event = null;
@Retained @Released Object oldValue = null;
try {
RegionEntry oldRe = null;
synchronized (newRe) {
try {
oldRe = putEntryIfAbsent(key, newRe);
while (!done && oldRe != null) {
synchronized (oldRe) {
if (oldRe.isRemovedPhase2()) {
oldRe = putEntryIfAbsent(key, newRe);
if (oldRe != null) {
owner.getCachePerfStats().incRetries();
}
}
else {
boolean acceptedVersionTag = false;
if (entryVersion != null && owner.concurrencyChecksEnabled) {
Assert.assertTrue(entryVersion.getMemberID() != null, "GII entry versions must have identifiers");
try {
boolean isTombstone = (newValue == Token.TOMBSTONE);
// don't reschedule the tombstone if it hasn't changed
boolean isSameTombstone = oldRe.isTombstone() && isTombstone
&& oldRe.getVersionStamp().asVersionTag()
.equals(entryVersion);
if (isSameTombstone) {
return true;
}
processVersionTagForGII(oldRe, owner, entryVersion, isTombstone, sender, !wasRecovered || isSynchronizing);
acceptedVersionTag = true;
} catch (ConcurrentCacheModificationException e) {
return false;
}
}
final boolean oldIsTombstone = oldRe.isTombstone();
final int oldSize = owner.calculateRegionEntryValueSize(oldRe);
// Neeraj: The below if block is to handle the special
// scenario witnessed in SqlFabric for now. (Though its
// a general scenario). The scenario is that during GII
// it is possible that updates start coming before the
// base value reaches through GII. In that scenario the deltas
// for that particular key is kept on being added to a list
// of deltas. When the base value arrives through this path
// of GII the oldValue will be that list of deltas. When the
// base values arrives the deltas are applied one by one on that list.
// The same scenario is applicable for GemFire also but the below
// code will be executed only in case of sqlfabric now. Probably
// the code can be made more generic for both SQL Fabric and GemFire.
if (indexUpdater != null) {
oldValue = oldRe.getValueInVM(owner); // OFFHEAP: ListOfDeltas
if (oldValue instanceof ListOfDeltas) {
// apply the deltas on this new value. update index
// Make a new event object
// make it an insert operation
LocalRegion rgn = owner;
if (owner instanceof BucketRegion) {
rgn = ((BucketRegion)owner).getPartitionedRegion();
}
event = EntryEventImpl.create(rgn, Operation.CREATE, key, null,
Boolean.TRUE /* indicate that GII is in progress */,
false, null);
try {
event.setOldValue(newValue);
if (logger.isDebugEnabled()) {
logger.debug("initialImagePut: received base value for list of deltas; event: {}", event);
}
((ListOfDeltas)oldValue).apply(event);
Object preparedNewValue =oldRe.prepareValueForCache(owner,
event.getNewValueAsOffHeapDeserializedOrRaw(), true);
if(preparedNewValue instanceof Chunk) {
event.setNewValue(preparedNewValue);
}
oldRe.setValue(owner, preparedNewValue, event);
//event.setNewValue(event.getOldValue());
event.setOldValue(null);
try {
indexUpdater.onEvent(owner, event, oldRe);
lruEntryUpdate(oldRe);
owner.updateSizeOnPut(key, oldSize, owner.calculateRegionEntryValueSize(oldRe));
EntryLogger.logInitialImagePut(_getOwnerObject(), key, newValue);
result = true;
done = true;
break;
} finally {
// this must be done within the oldRe sync block
indexUpdater.postEvent(owner, event, oldRe, done);
}
} finally {
if (event != null) {
event.release();
event = null;
}
}
}
}
try {
if (indexUpdater != null) {
event = EntryEventImpl.create(owner, Operation.CREATE, key,
newValue,
Boolean.TRUE /* indicate that GII is in progress */,
false, null);
indexUpdater.onEvent(owner, event, oldRe);
}
result = oldRe.initialImagePut(owner, lastModified, newValue, wasRecovered, acceptedVersionTag);
if (result) {
if (oldIsTombstone) {
owner.unscheduleTombstone(oldRe);
if (newValue != Token.TOMBSTONE){
lruEntryCreate(oldRe);
} else {
lruEntryUpdate(oldRe);
}
}
if (newValue == Token.TOMBSTONE) {
if (owner.getServerProxy() == null &&
owner.getVersionVector().isTombstoneTooOld(entryVersion.getMemberID(), entryVersion.getRegionVersion())) {
// the received tombstone has already been reaped, so don't retain it
removeTombstone(oldRe, entryVersion, false, false);
return false;
} else {
owner.scheduleTombstone(oldRe, entryVersion);
lruEntryDestroy(oldRe);
}
} else {
int newSize = owner.calculateRegionEntryValueSize(oldRe);
if(!oldIsTombstone) {
owner.updateSizeOnPut(key, oldSize, newSize);
} else {
owner.updateSizeOnCreate(key, newSize);
}
EntryLogger.logInitialImagePut(_getOwnerObject(), key, newValue);
}
}
if (owner.getIndexManager() != null) {
owner.getIndexManager().updateIndexes(oldRe, oldRe.isRemoved() ? IndexManager.ADD_ENTRY : IndexManager.UPDATE_ENTRY,
oldRe.isRemoved() ? IndexProtocol.OTHER_OP : IndexProtocol.AFTER_UPDATE_OP);
}
done = true;
} finally {
if (indexUpdater != null) {
indexUpdater.postEvent(owner, event, oldRe, result);
}
if (event != null) {
event.release();
event = null;
}
}
}
}
}
if (!done) {
boolean versionTagAccepted = false;
if (entryVersion != null && owner.concurrencyChecksEnabled) {
Assert.assertTrue(entryVersion.getMemberID() != null, "GII entry versions must have identifiers");
try {
boolean isTombstone = (newValue == Token.TOMBSTONE);
processVersionTagForGII(newRe, owner, entryVersion, isTombstone, sender, !wasRecovered || isSynchronizing);
versionTagAccepted = true;
} catch (ConcurrentCacheModificationException e) {
return false;
}
}
result = newRe.initialImageInit(owner, lastModified, newValue,
true, wasRecovered, versionTagAccepted);
try {
if (result) {
if (indexUpdater != null) {
event = EntryEventImpl.create(owner, Operation.CREATE, key,
newValue,
Boolean.TRUE /* indicate that GII is in progress */,
false, null);
indexUpdater.onEvent(owner, event, newRe);
}
if (newValue == Token.TOMBSTONE) {
owner.scheduleTombstone(newRe, entryVersion);
} else {
owner.updateSizeOnCreate(key, owner.calculateRegionEntryValueSize(newRe));
EntryLogger.logInitialImagePut(_getOwnerObject(), key, newValue);
lruEntryCreate(newRe);
}
incEntryCount(1);
}
//Update local indexes
if (owner.getIndexManager() != null) {
owner.getIndexManager().updateIndexes(newRe, newRe.isRemoved() ? IndexManager.REMOVE_ENTRY : IndexManager.UPDATE_ENTRY,
newRe.isRemoved() ? IndexProtocol.OTHER_OP : IndexProtocol.AFTER_UPDATE_OP);
}
done = true;
} finally {
if (result && indexUpdater != null) {
indexUpdater.postEvent(owner, event, newRe, done);
}
if (event != null) {
event.release();
event = null;
}
}
}
}
finally {
if (done && result) {
initialImagePutEntry(newRe);
}
if (!done) {
removeEntry(key, newRe, false);
if (owner.getIndexManager() != null) {
owner.getIndexManager().updateIndexes(newRe, IndexManager.REMOVE_ENTRY, IndexProtocol.OTHER_OP);
}
}
}
} // synchronized
} finally {
if (event != null) event.release();
OffHeapHelper.release(oldValue);
}
} catch(RegionClearedException rce) {
//Asif: do not issue any sort of callbacks
done = false;
cleared= true;
}catch(QueryException qe) {
done = false;
cleared= true;
}
finally {
if (done && !deferLRUCallback) {
lruUpdateCallback();
}
else if (!cleared) {
resetThreadLocals();
}
}
return result;
}
protected void initialImagePutEntry(RegionEntry newRe) {
}
boolean confirmEvictionDestroy(RegionEntry re)
{
/* We arn't in an LRU context, and should never get here */
Assert.assertTrue(false,
"Not an LRU region, can not confirm LRU eviction operation");
return true;
}
public final boolean destroy(EntryEventImpl event,
boolean inTokenMode,
boolean duringRI,
boolean cacheWrite,
boolean isEviction,
Object expectedOldValue,
boolean removeRecoveredEntry)
throws CacheWriterException, EntryNotFoundException, TimeoutException {
final LocalRegion owner = _getOwner();
if (owner == null) {
Assert.assertTrue(false, "The owner for RegionMap " + this // "fix" for bug 32440
+ " is null for event " + event);
}
//mbid: this has been added to maintain consistency between the disk region
// and
//and the region map after clear() has been called. This will set the
// reference of
//the diskSegmentRegion as a ThreadLocal so that if the diskRegionSegment
// is later changed
//by another thread, we can do the necessary.
boolean retry = true;
// int retries = -1;
RETRY_LOOP:
while (retry) {
retry = false;
/* this is useful for debugging if you get a hot thread
retries++;
if (retries > 0) {
owner.getCachePerfStats().incRetries();
if (retries == 1000000) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.AbstractRegionMap_RETRIED_1_MILLION_TIMES_FOR_ENTRY_TO_GO_AWAY_0, new Object[] { retryEntry, retryEntry.removeTrace }));
}
}
*/
// boolean lruUpdateCallback = false;
boolean sqlfIndexLocked = false;
if (indexUpdater != null) {
// take read lock for SQLF index initializations if required
sqlfIndexLocked = indexUpdater.lockForIndexGII();
}
boolean opCompleted = false;
boolean doPart3 = false;
// We need to acquire the region entry while holding the lock to avoid #45620.
// However, we also want to release the lock before distribution to prevent
// potential deadlocks. The outer try/finally ensures that the lock will be
// released without fail. I'm avoiding indenting just to preserve the ability
// to track diffs since the code is fairly complex.
boolean doUnlock = true;
lockForCacheModification(owner, event);
try {
RegionEntry re = getOrCreateRegionEntry(owner, event, Token.REMOVED_PHASE1, null, true, true);
RegionEntry tombstone = null;
boolean haveTombstone = false;
/*
* Execute the test hook runnable inline (not threaded) if it is not null.
*/
if(null != testHookRunnableFor48182) {
testHookRunnableFor48182.run();
}
try {
if (logger.isTraceEnabled(LogMarker.LRU_TOMBSTONE_COUNT) && !(owner instanceof HARegion)) {
logger.trace(LogMarker.LRU_TOMBSTONE_COUNT,
"ARM.destroy() inTokenMode={}; duringRI={}; riLocalDestroy={}; withRepl={}; fromServer={}; concurrencyEnabled={}; isOriginRemote={}; isEviction={}; operation={}; re={}",
inTokenMode, duringRI, event.isFromRILocalDestroy(), owner.dataPolicy.withReplication(), event.isFromServer(),
owner.concurrencyChecksEnabled, event.isOriginRemote(), isEviction, event.getOperation(), re);
}
if (event.isFromRILocalDestroy()) {
// for RI local-destroy we don't want to keep tombstones.
// In order to simplify things we just set this recovery
// flag to true to force the entry to be removed
removeRecoveredEntry = true;
}
// the logic in this method is already very involved, and adding tombstone
// permutations to (re != null) greatly complicates it. So, we check
// for a tombstone here and, if found, pretend for a bit that the entry is null
if (re != null && re.isTombstone() && !removeRecoveredEntry) {
tombstone = re;
haveTombstone = true;
re = null;
}
IndexManager oqlIndexManager = owner.getIndexManager() ;
if (re == null) {
// we need to create an entry if in token mode or if we've received
// a destroy from a peer or WAN gateway and we need to retain version
// information for concurrency checks
boolean retainForConcurrency = (!haveTombstone
&& (owner.dataPolicy.withReplication() || event.isFromServer())
&& owner.concurrencyChecksEnabled
&& (event.isOriginRemote() /* destroy received from other must create tombstone */
|| event.isFromWANAndVersioned() /* wan event must create a tombstone */
|| event.isBridgeEvent())); /* event from client must create a tombstone so client has a version # */
if (inTokenMode
|| retainForConcurrency) {
// removeRecoveredEntry should be false in this case
RegionEntry newRe = getEntryFactory().createEntry(owner,
event.getKey(),
Token.REMOVED_PHASE1);
// Fix for Bug #44431. We do NOT want to update the region and wait
// later for index INIT as region.clear() can cause inconsistency if
// happened in parallel as it also does index INIT.
if (oqlIndexManager != null) {
oqlIndexManager.waitForIndexInit();
}
try {
synchronized (newRe) {
RegionEntry oldRe = putEntryIfAbsent(event.getKey(), newRe);
while (!opCompleted && oldRe != null) {
synchronized (oldRe) {
if (oldRe.isRemovedPhase2()) {
oldRe = putEntryIfAbsent(event.getKey(), newRe);
if (oldRe != null) {
owner.getCachePerfStats().incRetries();
}
} else {
event.setRegionEntry(oldRe);
// Last transaction related eviction check. This should
// prevent
// transaction conflict (caused by eviction) when the entry
// is being added to transaction state.
if (isEviction) {
if (!confirmEvictionDestroy(oldRe) || (owner.getEvictionCriteria() != null && !owner.getEvictionCriteria().doEvict(event))) {
opCompleted = false;
return opCompleted;
}
}
try {
//if concurrency checks are enabled, destroy will
//set the version tag
boolean destroyed = destroyEntry(oldRe, event, inTokenMode, cacheWrite, expectedOldValue, false, removeRecoveredEntry);
if (destroyed) {
if (retainForConcurrency) {
owner.basicDestroyBeforeRemoval(oldRe, event);
}
owner.basicDestroyPart2(oldRe, event, inTokenMode,
false /* conflict with clear */, duringRI, true);
// if (!oldRe.isTombstone() || isEviction) {
lruEntryDestroy(oldRe);
// } else { // tombstone
// lruEntryUpdate(oldRe);
// lruUpdateCallback = true;
// }
doPart3 = true;
}
}
catch (RegionClearedException rce) {
// region cleared implies entry is no longer there
// so must throw exception if expecting a particular
// old value
// if (expectedOldValue != null) {
// throw new EntryNotFoundException("entry not found with expected value");
// }
// Ignore. The exception will ensure that we do not update
// the LRU List
owner.basicDestroyPart2(oldRe, event, inTokenMode,
true/* conflict with clear */, duringRI, true);
doPart3 = true;
} catch (ConcurrentCacheModificationException ccme) {
VersionTag tag = event.getVersionTag();
if (tag != null && tag.isTimeStampUpdated()) {
// Notify gateways of new time-stamp.
owner.notifyTimestampsToGateways(event);
}
throw ccme;
}
re = oldRe;
opCompleted = true;
}
} // synchronized oldRe
} // while
if (!opCompleted) {
// The following try has a finally that cleans up the newRe.
// This is only needed if newRe was added to the map which only
// happens if we didn't get completed with oldRe in the above while loop.
try { // bug #42228 - leaving "removed" entries in the cache
re = newRe;
event.setRegionEntry(newRe);
try {
//if concurrency checks are enabled, destroy will
//set the version tag
if (isEviction) {
opCompleted = false;
return opCompleted;
}
opCompleted = destroyEntry(newRe, event, inTokenMode, cacheWrite, expectedOldValue, true, removeRecoveredEntry);
if (opCompleted) {
// This is a new entry that was created because we are in
// token mode or are accepting a destroy operation by adding
// a tombstone. There is no oldValue, so we don't need to
// call updateSizeOnRemove
// owner.recordEvent(event);
event.setIsRedestroyedEntry(true); // native clients need to know if the entry didn't exist
if (retainForConcurrency) {
owner.basicDestroyBeforeRemoval(oldRe, event);
}
owner.basicDestroyPart2(newRe, event, inTokenMode,
false /* conflict with clear */, duringRI, true);
doPart3 = true;
}
}
catch (RegionClearedException rce) {
// region cleared implies entry is no longer there
// so must throw exception if expecting a particular
// old value
// if (expectedOldValue != null) {
// throw new EntryNotFoundException("entry not found with expected value");
// }
// Ignore. The exception will ensure that we do not update
// the LRU List
opCompleted = true;
EntryLogger.logDestroy(event);
// owner.recordEvent(event, newRe);
owner.basicDestroyPart2(newRe, event, inTokenMode, true /* conflict with clear*/, duringRI, true);
doPart3 = true;
} catch (ConcurrentCacheModificationException ccme) {
VersionTag tag = event.getVersionTag();
if (tag != null && tag.isTimeStampUpdated()) {
// Notify gateways of new time-stamp.
owner.notifyTimestampsToGateways(event);
}
throw ccme;
}
// Note no need for LRU work since the entry is destroyed
// and will be removed when gii completes
} finally { // bug #42228
if (!opCompleted && !haveTombstone /* to fix bug 51583 do this for all operations */ ) {
// owner.getLogWriterI18n().warning(LocalizedStrings.DEBUG, "BRUCE: removing incomplete entry");
removeEntry(event.getKey(), newRe, false);
}
if (!opCompleted && isEviction) {
removeEntry(event.getKey(), newRe, false);
}
}
} // !opCompleted
} // synchronized newRe
} finally {
if (oqlIndexManager != null) {
oqlIndexManager.countDownIndexUpdaters();
}
}
} // inTokenMode or tombstone creation
else {
if (!isEviction || owner.concurrencyChecksEnabled) {
// The following ensures that there is not a concurrent operation
// on the entry and leaves behind a tombstone if concurrencyChecksEnabled.
// It fixes bug #32467 by propagating the destroy to the server even though
// the entry isn't in the client
RegionEntry newRe = haveTombstone? tombstone : getEntryFactory().createEntry(owner, event.getKey(),
Token.REMOVED_PHASE1);
synchronized(newRe) {
if (haveTombstone && !tombstone.isTombstone()) {
// we have to check this again under synchronization since it may have changed
retry = true;
//retryEntry = tombstone; // leave this in place for debugging
continue RETRY_LOOP;
}
re = (RegionEntry)_getMap().putIfAbsent(event.getKey(), newRe);
if (re != null && re != tombstone) {
// concurrent change - try again
retry = true;
//retryEntry = tombstone; // leave this in place for debugging
continue RETRY_LOOP;
}
else if (!isEviction) {
boolean throwex = false;
EntryNotFoundException ex = null;
try {
if (!cacheWrite) {
throwex = true;
} else {
try {
if (!removeRecoveredEntry) {
throwex = !owner.bridgeWriteBeforeDestroy(event, expectedOldValue);
}
} catch (EntryNotFoundException e) {
throwex = true;
ex = e;
}
}
if (throwex) {
if (!event.isOriginRemote() && !event.getOperation().isLocal() &&
(event.isFromBridgeAndVersioned() || // if this is a replayed client event that already has a version
event.isFromWANAndVersioned())) { // or if this is a WAN event that has been applied in another system
// we must distribute these since they will update the version information in peers
if (logger.isDebugEnabled()) {
logger.debug("ARM.destroy is allowing wan/client destroy of {} to continue", event.getKey());
}
throwex = false;
event.setIsRedestroyedEntry(true);
// Distribution of this op happens on re and re might me null here before
// distributing this destroy op.
if (re == null) {
re = newRe;
}
doPart3 = true;
}
}
if (throwex) {
if (ex == null) {
// Fix for 48182, check cache state and/or region state before sending entry not found.
// this is from the server and any exceptions will propogate to the client
owner.checkEntryNotFound(event.getKey());
} else {
throw ex;
}
}
} finally {
// either remove the entry or leave a tombstone
try {
if (!event.isOriginRemote() && event.getVersionTag() != null && owner.concurrencyChecksEnabled) {
// this shouldn't fail since we just created the entry.
// it will either generate a tag or apply a server's version tag
processVersionTag(newRe, event);
if (doPart3) {
owner.generateAndSetVersionTag(event, newRe);
}
try {
owner.recordEvent(event);
newRe.makeTombstone(owner, event.getVersionTag());
} catch (RegionClearedException e) {
// that's okay - when writing a tombstone into a disk, the
// region has been cleared (including this tombstone)
}
opCompleted = true;
// lruEntryCreate(newRe);
} else if (!haveTombstone) {
try {
assert newRe != tombstone;
newRe.setValue(owner, Token.REMOVED_PHASE2);
removeEntry(event.getKey(), newRe, false);
} catch (RegionClearedException e) {
// that's okay - we just need to remove the new entry
}
} else if (event.getVersionTag() != null ) { // haveTombstone - update the tombstone version info
processVersionTag(tombstone, event);
if (doPart3) {
owner.generateAndSetVersionTag(event, newRe);
}
// This is not conflict, we need to persist the tombstone again with new version tag
try {
tombstone.setValue(owner, Token.TOMBSTONE);
} catch (RegionClearedException e) {
// that's okay - when writing a tombstone into a disk, the
// region has been cleared (including this tombstone)
}
owner.recordEvent(event);
owner.rescheduleTombstone(tombstone, event.getVersionTag());
owner.basicDestroyPart2(tombstone, event, inTokenMode, true /* conflict with clear*/, duringRI, true);
opCompleted = true;
}
} catch (ConcurrentCacheModificationException ccme) {
VersionTag tag = event.getVersionTag();
if (tag != null && tag.isTimeStampUpdated()) {
// Notify gateways of new time-stamp.
owner.notifyTimestampsToGateways(event);
}
throw ccme;
}
}
}
} // synchronized(newRe)
}
}
} // no current entry
else { // current entry exists
if (oqlIndexManager != null) {
oqlIndexManager.waitForIndexInit();
}
try {
synchronized (re) {
// if the entry is a tombstone and the event is from a peer or a client
// then we allow the operation to be performed so that we can update the
// version stamp. Otherwise we would retain an old version stamp and may allow
// an operation that is older than the destroy() to be applied to the cache
// Bug 45170: If removeRecoveredEntry, we treat tombstone as regular entry to be deleted
boolean createTombstoneForConflictChecks = (owner.concurrencyChecksEnabled
&& (event.isOriginRemote() || event.getContext() != null || removeRecoveredEntry));
if (!re.isRemoved() || createTombstoneForConflictChecks) {
if (re.isRemovedPhase2()) {
retry = true;
continue RETRY_LOOP;
}
if (!event.isOriginRemote() && event.getOperation().isExpiration()) {
// If this expiration started locally then only do it if the RE is not being used by a tx.
if (re.isInUseByTransaction()) {
opCompleted = false;
return opCompleted;
}
}
event.setRegionEntry(re);
// See comment above about eviction checks
if (isEviction) {
assert expectedOldValue == null;
if (!confirmEvictionDestroy(re) || (owner.getEvictionCriteria() != null && !owner.getEvictionCriteria().doEvict(event))) {
opCompleted = false;
return opCompleted;
}
}
boolean removed = false;
try {
opCompleted = destroyEntry(re, event, inTokenMode, cacheWrite, expectedOldValue, false, removeRecoveredEntry);
if (opCompleted) {
// It is very, very important for Partitioned Regions to keep
// the entry in the map until after distribution occurs so that other
// threads performing a create on this entry wait until the destroy
// distribution is finished.
// keeping backup copies consistent. Fix for bug 35906.
// -- mthomas 07/02/2007 <-- how about that date, kinda cool eh?
owner.basicDestroyBeforeRemoval(re, event);
// do this before basicDestroyPart2 to fix bug 31786
if (!inTokenMode) {
if ( re.getVersionStamp() == null) {
re.removePhase2();
removeEntry(event.getKey(), re, true, event, owner,
indexUpdater);
removed = true;
}
}
if (inTokenMode && !duringRI) {
event.inhibitCacheListenerNotification(true);
}
doPart3 = true;
owner.basicDestroyPart2(re, event, inTokenMode, false /* conflict with clear*/, duringRI, true);
// if (!re.isTombstone() || isEviction) {
lruEntryDestroy(re);
// } else {
// lruEntryUpdate(re);
// lruUpdateCallback = true;
// }
} else {
if (!inTokenMode) {
EntryLogger.logDestroy(event);
owner.recordEvent(event);
if (re.getVersionStamp() == null) {
re.removePhase2();
removeEntry(event.getKey(), re, true, event, owner,
indexUpdater);
lruEntryDestroy(re);
} else {
if (re.isTombstone()) {
// the entry is already a tombstone, but we're destroying it
// again, so we need to reschedule the tombstone's expiration
if (event.isOriginRemote()) {
owner.rescheduleTombstone(re, re.getVersionStamp().asVersionTag());
}
}
}
lruEntryDestroy(re);
opCompleted = true;
}
}
}
catch (RegionClearedException rce) {
// Ignore. The exception will ensure that we do not update
// the LRU List
opCompleted = true;
owner.recordEvent(event);
if (inTokenMode && !duringRI) {
event.inhibitCacheListenerNotification(true);
}
owner.basicDestroyPart2(re, event, inTokenMode, true /*conflict with clear*/, duringRI, true);
doPart3 = true;
}
finally {
if (re.isRemoved() && !re.isTombstone()) {
if (!removed) {
removeEntry(event.getKey(), re, true, event, owner,
indexUpdater);
}
}
}
} // !isRemoved
else { // already removed
if (owner.isHDFSReadWriteRegion() && re.isRemovedPhase2()) {
// For HDFS region there may be a race with eviction
// so retry the operation. fixes bug 49150
retry = true;
continue RETRY_LOOP;
}
if (re.isTombstone() && event.getVersionTag() != null) {
// if we're dealing with a tombstone and this is a remote event
// (e.g., from cache client update thread) we need to update
// the tombstone's version information
// TODO use destroyEntry() here
processVersionTag(re, event);
try {
re.makeTombstone(owner, event.getVersionTag());
} catch (RegionClearedException e) {
// that's okay - when writing a tombstone into a disk, the
// region has been cleared (including this tombstone)
}
}
if (expectedOldValue != null) {
// if re is removed then there is no old value, so return false
return false;
}
if (!inTokenMode && !isEviction) {
owner.checkEntryNotFound(event.getKey());
}
// if (isEviction && re.isTombstone()) {
// owner.unscheduleTombstone(re);
// removeTombstone(re, re.getVersionStamp().getEntryVersion(), true);
// }
}
} // synchronized re
} catch (ConcurrentCacheModificationException ccme) {
VersionTag tag = event.getVersionTag();
if (tag != null && tag.isTimeStampUpdated()) {
// Notify gateways of new time-stamp.
owner.notifyTimestampsToGateways(event);
}
throw ccme;
} finally {
if (oqlIndexManager != null) {
oqlIndexManager.countDownIndexUpdaters();
}
}
// No need to call lruUpdateCallback since the only lru action
// we may have taken was lruEntryDestroy. This fixes bug 31759.
} // current entry exists
if(opCompleted) {
EntryLogger.logDestroy(event);
}
return opCompleted;
}
finally {
releaseCacheModificationLock(owner, event);
doUnlock = false;
try {
// release the SQLF index lock, if acquired
if (sqlfIndexLocked) {
indexUpdater.unlockForIndexGII();
}
// If concurrency conflict is there and event contains gateway version tag then
// do NOT distribute.
if (event.isConcurrencyConflict() &&
(event.getVersionTag() != null && event.getVersionTag().isGatewayTag())) {
doPart3 = false;
}
// distribution and listener notification
if (doPart3) {
owner.basicDestroyPart3(re, event, inTokenMode, duringRI, true, expectedOldValue);
}
// if (lruUpdateCallback) {
// lruUpdateCallback();
// }
} finally {
if (opCompleted) {
if (re != null) {
owner.cancelExpiryTask(re);
} else if (tombstone != null) {
owner.cancelExpiryTask(tombstone);
}
}
}
}
} finally { // failsafe on the read lock...see comment above
if (doUnlock) {
releaseCacheModificationLock(owner, event);
}
}
} // retry loop
return false;
}
public final void txApplyDestroy(Object key, TransactionId txId,
TXRmtEvent txEvent, boolean inTokenMode, boolean inRI, Operation op,
EventID eventId, Object aCallbackArgument,List<EntryEventImpl> pendingCallbacks,FilterRoutingInfo filterRoutingInfo,ClientProxyMembershipID bridgeContext,
boolean isOriginRemote, TXEntryState txEntryState, VersionTag versionTag, long tailKey)
{
final boolean isDebugEnabled = logger.isDebugEnabled();
final LocalRegion owner = _getOwner();
owner.checkBeforeEntrySync(txEvent);
final boolean isRegionReady = !inTokenMode;
final boolean hasRemoteOrigin = !((TXId)txId).getMemberId().equals(owner.getMyId());
boolean cbEventInPending = false;
lockForTXCacheModification(owner, versionTag);
IndexManager oqlIndexManager = owner.getIndexManager() ;
try {
RegionEntry re = getEntry(key);
if (re != null) {
// Fix for Bug #44431. We do NOT want to update the region and wait
// later for index INIT as region.clear() can cause inconsistency if
// happened in parallel as it also does index INIT.
if (oqlIndexManager != null) {
oqlIndexManager.waitForIndexInit();
}
try {
synchronized (re) {
if (!re.isRemoved() || re.isTombstone()) {
EntryEventImpl sqlfEvent = null;
@Retained @Released Object oldValue = re.getValueInVM(owner);
try {
final int oldSize = owner.calculateRegionEntryValueSize(re);
// Create an entry event only if the calling context is
// a receipt of a TXCommitMessage AND there are callbacks installed
// for this region
boolean invokeCallbacks = shouldCreateCBEvent(owner, false/*isInvalidate*/, isRegionReady || inRI);
EntryEventImpl cbEvent = createCBEvent(owner, op,
key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
try {
if (/* owner.isUsedForPartitionedRegionBucket() && */
indexUpdater != null) {
sqlfEvent = cbEvent;
} else {
if (owner.isUsedForPartitionedRegionBucket()) {
txHandleWANEvent(owner, cbEvent, txEntryState);
}
cbEvent.setRegionEntry(re);
}
cbEvent.setOldValue(oldValue);
if (isDebugEnabled) {
logger.debug("txApplyDestroy cbEvent={}", cbEvent);
}
txRemoveOldIndexEntry(Operation.DESTROY, re);
if (txEvent != null) {
txEvent.addDestroy(owner, re, re.getKey(),aCallbackArgument);
}
boolean clearOccured = false;
try {
processAndGenerateTXVersionTag(owner, cbEvent, re, txEntryState);
if (inTokenMode) {
if (oldValue == Token.TOMBSTONE) {
owner.unscheduleTombstone(re);
}
re.setValue(owner, Token.DESTROYED);
}
else {
if (!re.isTombstone()) {
if (sqlfEvent != null) {
re.removePhase1(owner, false); // fix for bug 43063
re.removePhase2();
removeEntry(key, re, true, sqlfEvent, owner, indexUpdater);
} else {
if (shouldPerformConcurrencyChecks(owner, cbEvent) && cbEvent.getVersionTag() != null) {
re.makeTombstone(owner, cbEvent.getVersionTag());
} else {
re.removePhase1(owner, false); // fix for bug 43063
re.removePhase2();
removeEntry(key, re, false);
}
}
} else {
owner.rescheduleTombstone(re, re.getVersionStamp().asVersionTag());
}
}
EntryLogger.logTXDestroy(_getOwnerObject(), key);
owner.updateSizeOnRemove(key, oldSize);
}
catch (RegionClearedException rce) {
clearOccured = true;
}
owner.txApplyDestroyPart2(re, re.getKey(), inTokenMode,
clearOccured /* Clear Conflciting with the operation */);
if (invokeCallbacks) {
switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
if(pendingCallbacks==null) {
owner.invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY,
cbEvent, true/*callDispatchListenerEvent*/);
} else {
pendingCallbacks.add(cbEvent);
cbEventInPending = true;
}
}
if (!clearOccured) {
lruEntryDestroy(re);
}
if (owner.concurrencyChecksEnabled && txEntryState != null && cbEvent!= null) {
txEntryState.setVersionTag(cbEvent.getVersionTag());
}
} finally {
if (!cbEventInPending) cbEvent.release();
}
} finally {
OffHeapHelper.release(oldValue);
}
}
}
} finally {
if (oqlIndexManager != null) {
oqlIndexManager.countDownIndexUpdaters();
}
}
} else if (inTokenMode || owner.concurrencyChecksEnabled) {
// treating tokenMode and re == null as same, since we now want to
// generate versions and Tombstones for destroys
boolean dispatchListenerEvent = inTokenMode;
boolean opCompleted = false;
RegionEntry newRe = getEntryFactory().createEntry(owner, key,
Token.DESTROYED);
if ( oqlIndexManager != null) {
oqlIndexManager.waitForIndexInit();
}
EntryEventImpl cbEvent = null;
try {
synchronized (newRe) {
RegionEntry oldRe = putEntryIfAbsent(key, newRe);
while (!opCompleted && oldRe != null) {
synchronized (oldRe) {
if (oldRe.isRemovedPhase2()) {
oldRe = putEntryIfAbsent(key, newRe);
if (oldRe != null) {
owner.getCachePerfStats().incRetries();
}
}
else {
try {
boolean invokeCallbacks = shouldCreateCBEvent(owner, false, isRegionReady || inRI);
cbEvent = createCBEvent(owner, op,
key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
try {
cbEvent.setRegionEntry(oldRe);
cbEvent.setOldValue(Token.NOT_AVAILABLE);
if (isDebugEnabled) {
logger.debug("txApplyDestroy token mode cbEvent={}", cbEvent);
}
if (owner.isUsedForPartitionedRegionBucket()) {
txHandleWANEvent(owner, cbEvent, txEntryState);
}
processAndGenerateTXVersionTag(owner, cbEvent, oldRe, txEntryState);
if (invokeCallbacks) {
switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
if(pendingCallbacks==null) {
owner.invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY,
cbEvent, dispatchListenerEvent);
} else {
pendingCallbacks.add(cbEvent);
cbEventInPending = true;
}
}
int oldSize = 0;
boolean wasTombstone = oldRe.isTombstone();
{
if (!wasTombstone) {
oldSize = owner.calculateRegionEntryValueSize(oldRe);
}
}
oldRe.setValue(owner, Token.DESTROYED);
EntryLogger.logTXDestroy(_getOwnerObject(), key);
if (wasTombstone) {
owner.unscheduleTombstone(oldRe);
}
owner.updateSizeOnRemove(oldRe.getKey(), oldSize);
owner.txApplyDestroyPart2(oldRe, oldRe.getKey(), inTokenMode,
false /* Clear Conflicting with the operation */);
lruEntryDestroy(oldRe);
} finally {
if (!cbEventInPending) cbEvent.release();
}
}
catch (RegionClearedException rce) {
owner.txApplyDestroyPart2(oldRe, oldRe.getKey(), inTokenMode,
true /* Clear Conflicting with the operation */);
}
if (shouldPerformConcurrencyChecks(owner, cbEvent) && cbEvent.getVersionTag() != null) {
oldRe.makeTombstone(owner, cbEvent.getVersionTag());
} else if (!inTokenMode) {
// only remove for NORMAL regions if they do not generate versions see 51781
oldRe.removePhase1(owner, false); // fix for bug 43063
oldRe.removePhase2();
removeEntry(key, oldRe, false);
}
opCompleted = true;
}
}
}
if (!opCompleted) {
// already has value set to Token.DESTROYED
opCompleted = true;
boolean invokeCallbacks = shouldCreateCBEvent(owner, false, isRegionReady || inRI);
cbEvent = createCBEvent(owner, op,
key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
try {
cbEvent.setRegionEntry(newRe);
cbEvent.setOldValue(Token.NOT_AVAILABLE);
if (isDebugEnabled) {
logger.debug("txApplyDestroy token mode cbEvent={}", cbEvent);
}
if (owner.isUsedForPartitionedRegionBucket()) {
txHandleWANEvent(owner, cbEvent, txEntryState);
}
processAndGenerateTXVersionTag(owner, cbEvent, newRe, txEntryState);
if (invokeCallbacks) {
switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
if(pendingCallbacks==null) {
owner.invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY,
cbEvent, dispatchListenerEvent);
} else {
pendingCallbacks.add(cbEvent);
cbEventInPending = true;
}
}
EntryLogger.logTXDestroy(_getOwnerObject(), key);
owner.updateSizeOnCreate(newRe.getKey(), 0);
if (shouldPerformConcurrencyChecks(owner, cbEvent) && cbEvent.getVersionTag() != null) {
newRe.makeTombstone(owner, cbEvent.getVersionTag());
} else if (!inTokenMode) {
// only remove for NORMAL regions if they do not generate versions see 51781
newRe.removePhase1(owner, false); // fix for bug 43063
newRe.removePhase2();
removeEntry(key, newRe, false);
}
owner
.txApplyDestroyPart2(newRe, newRe.getKey(), inTokenMode,
false /*clearConflict*/);
// Note no need for LRU work since the entry is destroyed
// and will be removed when gii completes
} finally {
if (!cbEventInPending) cbEvent.release();
}
}
if (owner.concurrencyChecksEnabled && txEntryState != null && cbEvent != null) {
txEntryState.setVersionTag(cbEvent.getVersionTag());
}
}
} catch (RegionClearedException e) {
// TODO
} finally {
if (oqlIndexManager != null) {
oqlIndexManager.countDownIndexUpdaters();
}
}
} else if (re == null) {
// Fix bug#43594
// In cases where bucket region is re-created, it may so happen that
// the destroy is already applied on the Initial image provider, thus
// causing region entry to be absent.
// Notify clients with client events.
EntryEventImpl cbEvent = createCBEvent(owner, op,
key, null, txId, txEvent, eventId, aCallbackArgument,
filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
try {
if (owner.isUsedForPartitionedRegionBucket()) {
txHandleWANEvent(owner, cbEvent, txEntryState);
}
switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
if (pendingCallbacks == null) {
owner.invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY,cbEvent,false);
} else {
pendingCallbacks.add(cbEvent);
cbEventInPending = true;
}
} finally {
if (!cbEventInPending) cbEvent.release();
}
}
} catch( DiskAccessException dae) {
owner.handleDiskAccessException(dae);
throw dae;
}
finally {
releaseTXCacheModificationLock(owner, versionTag);
}
}
public final boolean invalidate(EntryEventImpl event,
boolean invokeCallbacks, boolean forceNewEntry, boolean forceCallbacks)
throws EntryNotFoundException
{
final boolean isDebugEnabled = logger.isDebugEnabled();
final LocalRegion owner = _getOwner();
if (owner == null) {
// "fix" for bug 32440
Assert.assertTrue(false, "The owner for RegionMap " + this
+ " is null for event " + event);
}
boolean didInvalidate = false;
RegionEntry invalidatedRe = null;
boolean clearOccured = false;
DiskRegion dr = owner.getDiskRegion();
// Fix for Bug #44431. We do NOT want to update the region and wait
// later for index INIT as region.clear() can cause inconsistency if
// happened in parallel as it also does index INIT.
IndexManager oqlIndexManager = owner.getIndexManager() ;
if (oqlIndexManager != null) {
oqlIndexManager.waitForIndexInit();
}
lockForCacheModification(owner, event);
try {
if (forceNewEntry || forceCallbacks) {
boolean opCompleted = false;
RegionEntry newRe = getEntryFactory().createEntry(owner, event.getKey(),
Token.REMOVED_PHASE1);
synchronized (newRe) {
try {
RegionEntry oldRe = putEntryIfAbsent(event.getKey(), newRe);
while (!opCompleted && oldRe != null) {
synchronized (oldRe) {
// if the RE is in phase 2 of removal, it will really be removed
// from the map. Otherwise, we can use it here and the thread
// that is destroying the RE will see the invalidation and not
// proceed to phase 2 of removal.
if (oldRe.isRemovedPhase2()) {
oldRe = putEntryIfAbsent(event.getKey(), newRe);
if (oldRe != null) {
owner.getCachePerfStats().incRetries();
}
} else {
opCompleted = true;
event.setRegionEntry(oldRe);
if (oldRe.isDestroyed()) {
if (isDebugEnabled) {
logger.debug("mapInvalidate: Found DESTROYED token, not invalidated; key={}", event.getKey());
}
} else if (oldRe.isInvalid()) {
// was already invalid, do not invoke listeners or increment stat
if (isDebugEnabled) {
logger.debug("mapInvalidate: Entry already invalid: '{}'", event.getKey());
}
processVersionTag(oldRe, event);
try {
oldRe.setValue(owner, oldRe.getValueInVM(owner)); // OFFHEAP noop setting an already invalid to invalid; No need to call prepareValueForCache since it is an invalid token.
} catch (RegionClearedException e) {
// that's okay - when writing an invalid into a disk, the
// region has been cleared (including this token)
}
} else {
owner.cacheWriteBeforeInvalidate(event, invokeCallbacks, forceNewEntry);
if (owner.concurrencyChecksEnabled && event.noVersionReceivedFromServer()) {
// server did not perform the invalidation, so don't leave an invalid
// entry here
return false;
}
final int oldSize = owner.calculateRegionEntryValueSize(oldRe);
//added for cq which needs old value. rdubey
FilterProfile fp = owner.getFilterProfile();
if (!oldRe.isRemoved() &&
(fp != null && fp.getCqCount() > 0)) {
@Retained @Released Object oldValue = oldRe.getValueInVM(owner); // OFFHEAP EntryEventImpl oldValue
// this will not fault in the value.
try {
if (oldValue == Token.NOT_AVAILABLE){
event.setOldValue(oldRe.getValueOnDiskOrBuffer(owner));
} else {
event.setOldValue(oldValue);
}
} finally {
OffHeapHelper.release(oldValue);
}
}
boolean isCreate = false;
try {
if (oldRe.isRemoved()) {
processVersionTag(oldRe, event);
event.putNewEntry(owner, oldRe);
EntryLogger.logInvalidate(event);
owner.recordEvent(event);
if (!oldRe.isTombstone()) {
owner.updateSizeOnPut(event.getKey(), oldSize, event.getNewValueBucketSize());
} else {
owner.updateSizeOnCreate(event.getKey(), event.getNewValueBucketSize());
isCreate = true;
}
} else {
processVersionTag(oldRe, event);
event.putExistingEntry(owner, oldRe);
EntryLogger.logInvalidate(event);
owner.recordEvent(event);
owner.updateSizeOnPut(event.getKey(), oldSize, event.getNewValueBucketSize());
}
}
catch (RegionClearedException e) {
// generate versionTag for the event
EntryLogger.logInvalidate(event);
owner.recordEvent(event);
clearOccured = true;
}
owner.basicInvalidatePart2(oldRe, event,
clearOccured /* conflict with clear */, invokeCallbacks);
if (!clearOccured) {
if (isCreate) {
lruEntryCreate(oldRe);
} else {
lruEntryUpdate(oldRe);
}
}
didInvalidate = true;
invalidatedRe = oldRe;
}
}
} // synchronized oldRe
} // while oldRe exists
if (!opCompleted) {
if (forceNewEntry && event.isFromServer()) {
// don't invoke listeners - we didn't force new entries for
// CCU invalidations before 7.0, and listeners don't care
event.inhibitCacheListenerNotification(true);
}
event.setRegionEntry(newRe);
owner.cacheWriteBeforeInvalidate(event, invokeCallbacks, forceNewEntry);
if (!forceNewEntry && event.noVersionReceivedFromServer()) {
// server did not perform the invalidation, so don't leave an invalid
// entry here
return false;
}
try {
if (!owner.isInitialized() && owner.getDataPolicy().withReplication()) {
final int oldSize = owner.calculateRegionEntryValueSize(newRe);
invalidateEntry(event, newRe, oldSize);
}
else {
invalidateNewEntry(event, owner, newRe);
}
}
catch (RegionClearedException e) {
// TODO: deltaGII: do we even need RegionClearedException?
// generate versionTag for the event
owner.recordEvent(event);
clearOccured = true;
}
owner.basicInvalidatePart2(newRe, event, clearOccured /*conflict with clear*/, invokeCallbacks);
if (!clearOccured) {
lruEntryCreate(newRe);
incEntryCount(1);
}
opCompleted = true;
didInvalidate = true;
invalidatedRe = newRe;
// Don't leave an entry in the cache, if we
// just wanted to force the distribution and events
// for this invalidate
if (!forceNewEntry) {
removeEntry(event.getKey(), newRe, false);
}
} // !opCompleted
} catch (ConcurrentCacheModificationException ccme) {
VersionTag tag = event.getVersionTag();
if (tag != null && tag.isTimeStampUpdated()) {
// Notify gateways of new time-stamp.
owner.notifyTimestampsToGateways(event);
}
throw ccme;
} finally {
if (!opCompleted) {
removeEntry(event.getKey(), newRe, false);
}
}
} // synchronized newRe
} // forceNewEntry
else { // !forceNewEntry
boolean retry = true;
// RegionEntry retryEntry = null;
// int retries = -1;
RETRY_LOOP:
while (retry) {
retry = false;
/* this is useful for debugging if you get a hot thread
retries++;
if (retries > 0) {
owner.getCachePerfStats().incRetries();
if (retries == 1000000) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.AbstractRegionMap_RETRIED_1_MILLION_TIMES_FOR_ENTRY_TO_GO_AWAY_0, new Object[] { retryEntry, retryEntry.removeTrace }));
}
}
*/
boolean entryExisted = false;
RegionEntry re = getEntry(event.getKey());
RegionEntry tombstone = null;
boolean haveTombstone = false;
/* this test fails when an invalidate(k,v) doesn't leave an entry in the cache:
parReg/bridge/serialParRegHABridge.conf
bridgeHosts=5
bridgeThreadsPerVM=1
bridgeVMsPerHost=1
edgeHosts=4
edgeThreadsPerVM=1
edgeVMsPerHost=1
numAccessors=1
numEmptyClients=1
numThinClients=1
numVMsToStop=2
redundantCopies=3
hydra.Prms-randomSeed=1328320674613;
*/
if (re != null && re.isTombstone()) {
tombstone = re;
haveTombstone = true;
re = null;
}
if (re == null) {
if (!owner.isInitialized()) {
// when GII message arrived or processed later than invalidate
// message, the entry should be created as placeholder
RegionEntry newRe = haveTombstone? tombstone : getEntryFactory().createEntry(owner, event.getKey(),
Token.INVALID);
synchronized (newRe) {
if (haveTombstone && !tombstone.isTombstone()) {
// state of the tombstone has changed so we need to retry
retry = true;
//retryEntry = tombstone; // leave this in place for debugging
continue RETRY_LOOP;
}
re = putEntryIfAbsent(event.getKey(), newRe);
if (re == tombstone) {
re = null; // pretend we don't have an entry
}
}
} else if (owner.getServerProxy() != null) {
Object sync = haveTombstone? tombstone : new Object();
synchronized(sync) {
if (haveTombstone && !tombstone.isTombstone()) {
// bug 45295: state of the tombstone has changed so we need to retry
retry = true;
//retryEntry = tombstone; // leave this in place for debugging
continue RETRY_LOOP;
}
// bug #43287 - send event to server even if it's not in the client (LRU may have evicted it)
owner.cacheWriteBeforeInvalidate(event, true, false);
if (owner.concurrencyChecksEnabled) {
if (event.getVersionTag() == null) {
// server did not perform the invalidation, so don't leave an invalid
// entry here
return false;
} else if (tombstone != null) {
processVersionTag(tombstone, event);
try {
if (!tombstone.isTombstone()) {
if (isDebugEnabled) {
logger.debug("tombstone is no longer a tombstone. {}:event={}", tombstone, event);
}
}
tombstone.setValue(owner, Token.TOMBSTONE);
} catch (RegionClearedException e) {
// that's okay - when writing a tombstone into a disk, the
// region has been cleared (including this tombstone)
} catch (ConcurrentCacheModificationException ccme) {
VersionTag tag = event.getVersionTag();
if (tag != null && tag.isTimeStampUpdated()) {
// Notify gateways of new time-stamp.
owner.notifyTimestampsToGateways(event);
}
throw ccme;
}
// update the tombstone's version to prevent an older CCU/putAll from overwriting it
owner.rescheduleTombstone(tombstone, event.getVersionTag());
}
}
}
entryExisted = true;
}
}
if (re != null) {
// Gester: Race condition in GII
// when adding the placeholder for invalidate entry during GII,
// if the GII got processed earlier for this entry, then do
// normal invalidate operation
synchronized (re) {
if (!event.isOriginRemote() && event.getOperation().isExpiration()) {
// If this expiration started locally then only do it if the RE is not being used by a tx.
if (re.isInUseByTransaction()) {
return false;
}
}
if (re.isTombstone() || (!re.isRemoved() && !re.isDestroyed())) {
entryExisted = true;
if (re.isInvalid()) {
// was already invalid, do not invoke listeners or increment
// stat
if (isDebugEnabled) {
logger.debug("Invalidate: Entry already invalid: '{}'", event.getKey());
}
if (event.getVersionTag() != null && owner.getVersionVector() != null) {
owner.getVersionVector().recordVersion((InternalDistributedMember) event.getDistributedMember(), event.getVersionTag());
}
}
else { // previous value not invalid
event.setRegionEntry(re);
owner.cacheWriteBeforeInvalidate(event, invokeCallbacks, forceNewEntry);
if (owner.concurrencyChecksEnabled && event.noVersionReceivedFromServer()) {
// server did not perform the invalidation, so don't leave an invalid
// entry here
if (isDebugEnabled) {
logger.debug("returning early because server did not generate a version stamp for this event:{}", event);
}
return false;
}
// in case of overflow to disk we need the old value for cqs.
if(owner.getFilterProfile().getCqCount() > 0){
//use to be getValue and can cause dead lock rdubey.
if (re.isValueNull()) {
event.setOldValue(re.getValueOnDiskOrBuffer(owner));
} else {
@Retained @Released Object v = re.getValueInVM(owner);
try {
event.setOldValue(v); // OFFHEAP escapes to EntryEventImpl oldValue
} finally {
OffHeapHelper.release(v);
}
}
}
final boolean oldWasTombstone = re.isTombstone();
final int oldSize = _getOwner().calculateRegionEntryValueSize(re);
try {
invalidateEntry(event, re, oldSize);
}
catch (RegionClearedException rce) {
// generate versionTag for the event
EntryLogger.logInvalidate(event);
_getOwner().recordEvent(event);
clearOccured = true;
} catch (ConcurrentCacheModificationException ccme) {
VersionTag tag = event.getVersionTag();
if (tag != null && tag.isTimeStampUpdated()) {
// Notify gateways of new time-stamp.
owner.notifyTimestampsToGateways(event);
}
throw ccme;
}
owner.basicInvalidatePart2(re, event,
clearOccured /* conflict with clear */, invokeCallbacks);
if (!clearOccured) {
if (oldWasTombstone) {
lruEntryCreate(re);
} else {
lruEntryUpdate(re);
}
}
didInvalidate = true;
invalidatedRe = re;
} // previous value not invalid
}
} // synchronized re
} // re != null
else {
// At this point, either it's not in GII mode, or the placeholder
// is in region, do nothing
}
if (!entryExisted) {
owner.checkEntryNotFound(event.getKey());
}
} // while(retry)
} // !forceNewEntry
} catch( DiskAccessException dae) {
invalidatedRe = null;
didInvalidate = false;
this._getOwner().handleDiskAccessException(dae);
throw dae;
} finally {
releaseCacheModificationLock(owner, event);
if (oqlIndexManager != null) {
oqlIndexManager.countDownIndexUpdaters();
}
if (invalidatedRe != null) {
owner.basicInvalidatePart3(invalidatedRe, event, invokeCallbacks);
}
if (didInvalidate && !clearOccured) {
try {
lruUpdateCallback();
} catch( DiskAccessException dae) {
this._getOwner().handleDiskAccessException(dae);
throw dae;
}
}
else if (!didInvalidate){
resetThreadLocals();
}
}
return didInvalidate;
}
protected void invalidateNewEntry(EntryEventImpl event,
final LocalRegion owner, RegionEntry newRe) throws RegionClearedException {
processVersionTag(newRe, event);
event.putNewEntry(owner, newRe);
owner.recordEvent(event);
owner.updateSizeOnCreate(event.getKey(), event.getNewValueBucketSize());
}
protected void invalidateEntry(EntryEventImpl event, RegionEntry re,
int oldSize) throws RegionClearedException {
processVersionTag(re, event);
event.putExistingEntry(_getOwner(), re);
EntryLogger.logInvalidate(event);
_getOwner().recordEvent(event);
_getOwner().updateSizeOnPut(event.getKey(), oldSize, event.getNewValueBucketSize());
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.RegionMap#updateEntryVersion(com.gemstone.gemfire.internal.cache.EntryEventImpl)
*/
@Override
public void updateEntryVersion(EntryEventImpl event) throws EntryNotFoundException {
final LocalRegion owner = _getOwner();
if (owner == null) {
// "fix" for bug 32440
Assert.assertTrue(false, "The owner for RegionMap " + this
+ " is null for event " + event);
}
DiskRegion dr = owner.getDiskRegion();
if (dr != null) {
dr.setClearCountReference();
}
lockForCacheModification(owner, event);
try {
RegionEntry re = getEntry(event.getKey());
boolean entryExisted = false;
if (re != null) {
// process version tag
synchronized (re) {
try {
if (re.isTombstone()
|| (!re.isRemoved() && !re.isDestroyed())) {
entryExisted = true;
}
processVersionTag(re, event);
owner.generateAndSetVersionTag(event, re);
EntryLogger.logUpdateEntryVersion(event);
_getOwner().recordEvent(event);
} catch (ConcurrentCacheModificationException ccme) {
// Do nothing.
}
}
}
if (!entryExisted) {
owner.checkEntryNotFound(event.getKey());
}
} catch( DiskAccessException dae) {
this._getOwner().handleDiskAccessException(dae);
throw dae;
} finally {
releaseCacheModificationLock(owner, event);
if (dr != null) {
dr.removeClearCountReference();
}
}
}
public final void txApplyInvalidate(Object key, Object newValue, boolean didDestroy,
TransactionId txId, TXRmtEvent txEvent, boolean localOp,
EventID eventId, Object aCallbackArgument,List<EntryEventImpl> pendingCallbacks,FilterRoutingInfo filterRoutingInfo,ClientProxyMembershipID bridgeContext, TXEntryState txEntryState, VersionTag versionTag, long tailKey)
{
// boolean didInvalidate = false;
final LocalRegion owner = _getOwner();
owner.checkBeforeEntrySync(txEvent);
EntryEventImpl cbEvent = null;
boolean forceNewEntry = !owner.isInitialized() && owner.isAllEvents();
final boolean hasRemoteOrigin = !((TXId)txId).getMemberId().equals(owner.getMyId());
DiskRegion dr = owner.getDiskRegion();
// Fix for Bug #44431. We do NOT want to update the region and wait
// later for index INIT as region.clear() can cause inconsistency if
// happened in parallel as it also does index INIT.
IndexManager oqlIndexManager = owner.getIndexManager() ;
if (oqlIndexManager != null) {
oqlIndexManager.waitForIndexInit();
}
lockForTXCacheModification(owner, versionTag);
try {
if (forceNewEntry) {
boolean opCompleted = false;
RegionEntry newRe = getEntryFactory().createEntry(owner, key,
Token.REMOVED_PHASE1);
synchronized (newRe) {
try {
RegionEntry oldRe = putEntryIfAbsent(key, newRe);
while (!opCompleted && oldRe != null) {
synchronized (oldRe) {
if (oldRe.isRemovedPhase2()) {
oldRe = putEntryIfAbsent(key, newRe);
if (oldRe != null) {
owner.getCachePerfStats().incRetries();
}
}
else {
opCompleted = true;
final boolean oldWasTombstone = oldRe.isTombstone();
final int oldSize = owner.calculateRegionEntryValueSize(oldRe);
Object oldValue = oldRe.getValueInVM(owner); // OFFHEAP eei
try {
// Create an entry event only if the calling context is
// a receipt of a TXCommitMessage AND there are callbacks
// installed
// for this region
boolean invokeCallbacks = shouldCreateCBEvent(owner, true, owner.isInitialized());
boolean cbEventInPending = false;
cbEvent = createCBEvent(owner,
localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE,
key, newValue, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
try {
cbEvent.setRegionEntry(oldRe);
cbEvent.setOldValue(oldValue);
if (logger.isDebugEnabled()) {
logger.debug("txApplyInvalidate cbEvent={}", cbEvent);
}
txRemoveOldIndexEntry(Operation.INVALIDATE, oldRe);
if (didDestroy) {
oldRe.txDidDestroy(owner.cacheTimeMillis());
}
if (txEvent != null) {
txEvent.addInvalidate(owner, oldRe, oldRe.getKey(),
newValue,aCallbackArgument);
}
oldRe.setValueResultOfSearch(false);
processAndGenerateTXVersionTag(owner, cbEvent, oldRe, txEntryState);
boolean clearOccured = false;
try {
oldRe.setValue(owner, oldRe.prepareValueForCache(owner, newValue, true));
EntryLogger.logTXInvalidate(_getOwnerObject(), key);
owner.updateSizeOnPut(key, oldSize, 0);
if (oldWasTombstone) {
owner.unscheduleTombstone(oldRe);
}
}
catch (RegionClearedException rce) {
clearOccured = true;
}
owner.txApplyInvalidatePart2(oldRe, oldRe.getKey(),
didDestroy, true, clearOccured);
// didInvalidate = true;
if (invokeCallbacks) {
switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
if(pendingCallbacks==null) {
owner.invokeTXCallbacks(
EnumListenerEvent.AFTER_INVALIDATE, cbEvent,
true/*callDispatchListenerEvent*/);
} else {
pendingCallbacks.add(cbEvent);
cbEventInPending = true;
}
}
if (!clearOccured) {
lruEntryUpdate(oldRe);
}
if (shouldPerformConcurrencyChecks(owner, cbEvent) && txEntryState != null) {
txEntryState.setVersionTag(cbEvent.getVersionTag());
}
} finally {
if (!cbEventInPending) cbEvent.release();
}
} finally {
OffHeapHelper.release(oldValue);
}
}
}
}
if (!opCompleted) {
boolean invokeCallbacks = shouldCreateCBEvent( owner, true /* isInvalidate */, owner.isInitialized());
boolean cbEventInPending = false;
cbEvent = createCBEvent(owner,
localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE,
key, newValue, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
try {
cbEvent.setRegionEntry(newRe);
txRemoveOldIndexEntry(Operation.INVALIDATE, newRe);
newRe.setValueResultOfSearch(false);
boolean clearOccured = false;
try {
processAndGenerateTXVersionTag(owner, cbEvent, newRe, txEntryState);
newRe.setValue(owner, newRe.prepareValueForCache(owner, newValue, true));
EntryLogger.logTXInvalidate(_getOwnerObject(), key);
owner.updateSizeOnCreate(newRe.getKey(), 0);//we are putting in a new invalidated entry
}
catch (RegionClearedException rce) {
clearOccured = true;
}
owner.txApplyInvalidatePart2(newRe, newRe.getKey(), didDestroy,
true, clearOccured);
if (invokeCallbacks) {
switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
if(pendingCallbacks==null) {
owner.invokeTXCallbacks(
EnumListenerEvent.AFTER_INVALIDATE, cbEvent,
true/*callDispatchListenerEvent*/);
} else {
pendingCallbacks.add(cbEvent);
cbEventInPending = true;
}
}
opCompleted = true;
if (!clearOccured) {
lruEntryCreate(newRe);
incEntryCount(1);
}
if (shouldPerformConcurrencyChecks(owner, cbEvent) && txEntryState != null) {
txEntryState.setVersionTag(cbEvent.getVersionTag());
}
} finally {
if (!cbEventInPending) cbEvent.release();
}
}
}
finally {
if (!opCompleted) {
removeEntry(key, newRe, false);
}
}
}
}
else { /* !forceNewEntry */
RegionEntry re = getEntry(key);
if (re != null) {
synchronized (re) {
{
final int oldSize = owner.calculateRegionEntryValueSize(re);
boolean wasTombstone = re.isTombstone();
Object oldValue = re.getValueInVM(owner); // OFFHEAP eei
// Create an entry event only if the calling context is
// a receipt of a TXCommitMessage AND there are callbacks
// installed
// for this region
boolean invokeCallbacks = shouldCreateCBEvent(owner, true, owner.isInitialized());
boolean cbEventInPending = false;
cbEvent = createCBEvent(owner,
localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE,
key, newValue, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
try {
cbEvent.setRegionEntry(re);
cbEvent.setOldValue(oldValue);
txRemoveOldIndexEntry(Operation.INVALIDATE, re);
if (didDestroy) {
re.txDidDestroy(owner.cacheTimeMillis());
}
if (txEvent != null) {
txEvent.addInvalidate(owner, re, re.getKey(), newValue,aCallbackArgument);
}
re.setValueResultOfSearch(false);
processAndGenerateTXVersionTag(owner, cbEvent, re, txEntryState);
boolean clearOccured = false;
try {
re.setValue(owner, re.prepareValueForCache(owner, newValue, true));
EntryLogger.logTXInvalidate(_getOwnerObject(), key);
if (wasTombstone) {
owner.unscheduleTombstone(re);
}
owner.updateSizeOnPut(key, oldSize, 0);
}
catch (RegionClearedException rce) {
clearOccured = true;
}
owner.txApplyInvalidatePart2(re, re.getKey(), didDestroy, true,
clearOccured);
// didInvalidate = true;
if (invokeCallbacks) {
switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
if(pendingCallbacks==null) {
owner.invokeTXCallbacks(
EnumListenerEvent.AFTER_INVALIDATE, cbEvent,
true/*callDispatchListenerEvent*/);
} else {
pendingCallbacks.add(cbEvent);
cbEventInPending = true;
}
}
if (!clearOccured) {
lruEntryUpdate(re);
}
if (shouldPerformConcurrencyChecks(owner, cbEvent) && txEntryState != null) {
txEntryState.setVersionTag(cbEvent.getVersionTag());
}
} finally {
if (!cbEventInPending) cbEvent.release();
}
}
}
} else { //re == null
// Fix bug#43594
// In cases where bucket region is re-created, it may so happen
// that the invalidate is already applied on the Initial image
// provider, thus causing region entry to be absent.
// Notify clients with client events.
boolean cbEventInPending = false;
cbEvent = createCBEvent(owner,
localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE,
key, newValue, txId, txEvent, eventId, aCallbackArgument,
filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
try {
switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
if (pendingCallbacks == null) {
owner.invokeTXCallbacks(EnumListenerEvent.AFTER_INVALIDATE,
cbEvent, false);
} else {
pendingCallbacks.add(cbEvent);
cbEventInPending = true;
}
} finally {
if (!cbEventInPending) cbEvent.release();
}
}
}
}catch( DiskAccessException dae) {
owner.handleDiskAccessException(dae);
throw dae;
}
finally {
releaseTXCacheModificationLock(owner, versionTag);
if (oqlIndexManager != null) {
oqlIndexManager.countDownIndexUpdaters();
}
}
}
/**
* This code needs to be evaluated. It was added quickly to help PR persistence
* not to consume as much memory.
*/
public void evictValue(Object key) {
final LocalRegion owner = _getOwner();
RegionEntry re = getEntry(key);
if (re != null) {
synchronized (re) {
if (!re.isValueNull()) {
re.setValueToNull();
owner.getDiskRegion().incNumEntriesInVM(-1L);
owner.getDiskRegion().incNumOverflowOnDisk(1L);
if(owner instanceof BucketRegion)
{
((BucketRegion)owner).incNumEntriesInVM(-1L);
((BucketRegion)owner).incNumOverflowOnDisk(1L);
}
}
}
}
}
private RegionEntry getOrCreateRegionEntry(Object ownerRegion,
EntryEventImpl event, Object value,
MapCallbackAdapter<Object, Object, Object, Object> valueCreator,
boolean onlyExisting, boolean returnTombstone) {
Object key = event.getKey();
RegionEntry retVal = null;
if (event.isFetchFromHDFS()) {
retVal = getEntry(event);
} else {
retVal = getEntryInVM(key);
}
if (onlyExisting) {
if (!returnTombstone && (retVal != null && retVal.isTombstone())) {
return null;
}
return retVal;
}
if (retVal != null) {
return retVal;
}
if (valueCreator != null) {
value = valueCreator.newValue(key, ownerRegion, value, null);
}
retVal = getEntryFactory().createEntry((RegionEntryContext) ownerRegion, key, value);
RegionEntry oldRe = putEntryIfAbsent(key, retVal);
if (oldRe != null) {
if (retVal instanceof OffHeapRegionEntry) {
((OffHeapRegionEntry) retVal).release();
}
return oldRe;
}
return retVal;
}
protected static final MapCallbackAdapter<Object, Object, Object, Object>
listOfDeltasCreator = new MapCallbackAdapter<Object, Object,
Object, Object>() {
@Override
public Object newValue(Object key, Object context, Object createParams,
final MapResult result) {
return new ListOfDeltas(4);
}
};
/**
* Neeraj: The below if block is to handle the special
* scenario witnessed in Sqlfabric for now. (Though its
* a general scenario). The scenario is that the updates start coming
* before the base value reaches through GII. In that scenario the updates
* essentially the deltas are added to a list and kept as oldValue in the
* map and this method returns. When through GII the actual base value arrives
* these updates or deltas are applied on it and the new value thus got is put
* in the map.
* @param event
* @param ifOld
* @return true if delta was enqued
*/
private boolean enqueDelta(EntryEventImpl event, boolean ifOld) {
final IndexUpdater indexManager = getIndexUpdater();
LocalRegion owner = _getOwner();
if (indexManager != null && !owner.isInitialized() && event.hasDelta()) {
boolean isOldValueDelta = true;
try {
if (ifOld) {
final Delta delta = event.getDeltaNewValue();
RegionEntry re = getOrCreateRegionEntry(owner, event, null,
listOfDeltasCreator, false, false);
assert re != null;
synchronized (re) {
@Retained @Released Object oVal = re.getValueOffHeapOrDiskWithoutFaultIn(owner);
if (oVal != null) {
try {
if (oVal instanceof ListOfDeltas) {
if (logger.isDebugEnabled()) {
logger.debug("basicPut: adding delta to list of deltas: {}", delta);
}
((ListOfDeltas)oVal).merge(delta);
@Retained Object newVal = ((AbstractRegionEntry)re).prepareValueForCache(owner, oVal, true);
re.setValue(owner, newVal); // TODO:KIRK:48068 prevent orphan
}
else {
isOldValueDelta = false;
}
}finally {
OffHeapHelper.release(oVal);
}
}
else {
if (logger.isDebugEnabled()) {
logger.debug("basicPut: new list of deltas with delta: {}", delta);
}
@Retained Object newVal = new ListOfDeltas(delta);
// TODO no need to call AbstractRegionMap.prepareValueForCache here?
newVal = ((AbstractRegionEntry)re).prepareValueForCache(owner, newVal, true);
re.setValue(owner, newVal); // TODO:KIRK:48068 prevent orphan
}
}
}
} catch (RegionClearedException ex) {
// Neeraj: We can just ignore this exception because we are returning after this block
}
if (isOldValueDelta) {
return true;
}
}
return false;
}
/*
* returns null if the operation fails
*/
public RegionEntry basicPut(EntryEventImpl event,
final long lastModified,
final boolean ifNew,
final boolean ifOld,
Object expectedOldValue, // only non-null if ifOld
boolean requireOldValue,
final boolean overwriteDestroyed)
throws CacheWriterException,
TimeoutException {
final LocalRegion owner = _getOwner();
boolean clearOccured = false;
if (owner == null) {
// "fix" for bug 32440
Assert.assertTrue(false, "The owner for RegionMap " + this
+ " is null for event " + event);
}
if (logger.isTraceEnabled(LogMarker.LRU_TOMBSTONE_COUNT) && !(owner instanceof HARegion)) {
logger.trace(LogMarker.LRU_TOMBSTONE_COUNT,
"ARM.basicPut called for {} expectedOldValue={} requireOldValue={} ifNew={} ifOld={} initialized={} overwriteDestroyed={}",
event, expectedOldValue, requireOldValue, ifNew, ifOld, owner.isInitialized(), overwriteDestroyed);
}
RegionEntry result = null;
long lastModifiedTime = 0;
// copy into local var to prevent race condition with setter
final CacheWriter cacheWriter = owner.basicGetWriter();
final boolean cacheWrite = !event.isOriginRemote() && !event.isNetSearch() && event.isGenerateCallbacks()
&& (cacheWriter != null
|| owner.hasServerProxy()
|| owner.scope.isDistributed());
/*
* For performance reason, we try to minimize object creation and do as much
* work as we can outside of synchronization, especially getting
* distribution advice.
*/
final Set netWriteRecipients;
if (cacheWrite) {
if (cacheWriter == null && owner.scope.isDistributed()) {
netWriteRecipients = ((DistributedRegion)owner)
.getCacheDistributionAdvisor().adviseNetWrite();
}
else {
netWriteRecipients = null;
}
}
else {
netWriteRecipients = null;
}
// mbid: this has been added to maintain consistency between the disk region
// and the region map after clear() has been called. This will set the
// reference of the diskSegmentRegion as a ThreadLocal so that if the diskRegionSegment
// is later changed by another thread, we can do the necessary.
boolean uninitialized = !owner.isInitialized();
// SqlFabric Changes - BEGIN
if (enqueDelta(event, ifOld)) {
return null;
}
final IndexUpdater indexManager = getIndexUpdater();
boolean sqlfIndexLocked = false;
// SqlFabric Changes - END
boolean retrieveOldValueForDelta = event.getDeltaBytes() != null
&& event.getRawNewValue() == null;
lockForCacheModification(owner, event);
IndexManager oqlIndexManager = null;
try {
// take read lock for SQLF index initializations if required; the index
// GII lock is for any updates that may come in while index is being
// loaded during replay see bug #41377; this will go away once we allow
// for indexes to be loaded completely in parallel (#40899); need to
// take this lock before the RegionEntry lock else a deadlock can happen
// between this thread and index loading thread that will first take the
// corresponding write lock on the IndexUpdater
if (indexManager != null) {
sqlfIndexLocked = indexManager.lockForIndexGII();
}
// Fix for Bug #44431. We do NOT want to update the region and wait
// later for index INIT as region.clear() can cause inconsistency if
// happened in parallel as it also does index INIT.
oqlIndexManager = owner.getIndexManager() ;
if (oqlIndexManager != null) {
oqlIndexManager.waitForIndexInit();
}
// fix for bug #42169, replace must go to server if entry not on client
boolean replaceOnClient = event.getOperation() == Operation.REPLACE
&& owner.getServerProxy() != null;
// Rather than having two different blocks for synchronizing oldRe
// and newRe, have only one block and synchronize re
RegionEntry re = null;
boolean eventRecorded = false;
boolean onlyExisting = ifOld && !replaceOnClient;
re = getOrCreateRegionEntry(owner, event,
Token.REMOVED_PHASE1, null, onlyExisting, false);
if (re == null) {
throwExceptionForSqlFire(event);
return null;
}
while (true) {
synchronized (re) {
// if the re goes into removed2 state, it will be removed
// from the map. otherwise we can append an event to it
// and change its state
if (re.isRemovedPhase2()) {
re = getOrCreateRegionEntry(owner, event,
Token.REMOVED_PHASE1, null, onlyExisting, false);
_getOwner().getCachePerfStats().incRetries();
if (re == null) {
// this will happen when onlyExisting is true
throwExceptionForSqlFire(event);
return null;
}
continue;
} else {
@Released Object oldValueForDelta = null;
if (retrieveOldValueForDelta) {
// defer the lruUpdateCallback to prevent a deadlock (see bug 51121).
final boolean disabled = disableLruUpdateCallback();
try {
// Old value is faulted in from disk if not found in memory.
oldValueForDelta = re.getValue(owner); // OFFHEAP: if we are synced on oldRe no issue since we can use ARE's ref
} finally {
if (disabled) {
enableLruUpdateCallback();
}
}
}
try {
event.setRegionEntry(re);
// set old value in event
setOldValueInEvent(event, re, cacheWrite, requireOldValue);
if (!continueUpdate(re, event, ifOld, replaceOnClient)) {
return null;
}
// overwrite destroyed?
if (!continueOverwriteDestroyed(re, event, overwriteDestroyed, ifNew)) {
return null;
}
// check expectedOldValue
if (!satisfiesExpectedOldValue(event, re, expectedOldValue, replaceOnClient)) {
return null;
}
// invoke cacheWriter
invokeCacheWriter(re, event, cacheWrite, cacheWriter,
netWriteRecipients, requireOldValue, expectedOldValue, replaceOnClient);
// notify index of an update
notifyIndex(re, true);
try {
try {
if ((cacheWrite && event.getOperation().isUpdate()) // if there is a cacheWriter, type of event has already been set
|| !re.isRemoved()
|| replaceOnClient) {
// update
updateEntry(event, requireOldValue, oldValueForDelta, re);
} else {
// create
createEntry(event, owner, re);
}
owner.recordEvent(event);
eventRecorded = true;
} catch (RegionClearedException rce) {
clearOccured = true;
owner.recordEvent(event);
} catch (ConcurrentCacheModificationException ccme) {
VersionTag tag = event.getVersionTag();
if (tag != null && tag.isTimeStampUpdated()) {
// Notify gateways of new time-stamp.
owner.notifyTimestampsToGateways(event);
}
throw ccme;
}
if (uninitialized) {
event.inhibitCacheListenerNotification(true);
}
updateLru(clearOccured, re, event);
lastModifiedTime = owner.basicPutPart2(event, re,
!uninitialized, lastModifiedTime, clearOccured);
} finally {
notifyIndex(re, false);
}
result = re;
break;
} finally {
OffHeapHelper.release(oldValueForDelta);
if (re != null && !onlyExisting && !isOpComplete(re, event)) {
owner.cleanUpOnIncompleteOp(event, re, eventRecorded,
false/* updateStats */, replaceOnClient);
}
else if (re != null && owner.isUsedForPartitionedRegionBucket()) {
BucketRegion br = (BucketRegion)owner;
CachePerfStats stats = br.getPartitionedRegion().getCachePerfStats();
long startTime= stats.startCustomEviction();
CustomEvictionAttributes csAttr = br.getCustomEvictionAttributes();
// No need to update indexes if entry was faulted in but operation did not succeed.
if (csAttr != null && (csAttr.isEvictIncoming() || re.isMarkedForEviction())) {
if (csAttr.getCriteria().doEvict(event)) {
stats.incEvictionsInProgress();
// set the flag on event saying the entry should be evicted
// and not indexed
EntryEventImpl destroyEvent = EntryEventImpl.create (owner, Operation.DESTROY, event.getKey(),
null/* newValue */, null, false, owner.getMyId());
try {
destroyEvent.setOldValueFromRegion();
destroyEvent.setCustomEviction(true);
destroyEvent.setPossibleDuplicate(event.isPossibleDuplicate());
if(logger.isDebugEnabled()) {
logger.debug("Evicting the entry " + destroyEvent);
}
if(result != null) {
removeEntry(event.getKey(),re, true, destroyEvent,owner, indexUpdater);
}
else{
removeEntry(event.getKey(),re, true, destroyEvent,owner, null);
}
//mark the region entry for this event as evicted
event.setEvicted();
stats.incEvictions();
if(logger.isDebugEnabled()) {
logger.debug("Evicted the entry " + destroyEvent);
}
//removeEntry(event.getKey(), re);
} finally {
destroyEvent.release();
stats.decEvictionsInProgress();
}
} else {
re.clearMarkedForEviction();
}
}
stats.endCustomEviction(startTime);
}
} // try
}
} // sync re
}// end while
} catch (DiskAccessException dae) {
//Asif:Feel that it is safe to destroy the region here as there appears
// to be no chance of deadlock during region destruction
result = null;
this._getOwner().handleDiskAccessException(dae);
throw dae;
} finally {
releaseCacheModificationLock(owner, event);
if (sqlfIndexLocked) {
indexManager.unlockForIndexGII();
}
if (oqlIndexManager != null) {
oqlIndexManager.countDownIndexUpdaters();
}
if (result != null) {
try {
// Note we do distribution after releasing all sync to avoid deadlock
final boolean invokeListeners = event.basicGetNewValue() != Token.TOMBSTONE;
owner.basicPutPart3(event, result, !uninitialized,
lastModifiedTime, invokeListeners, ifNew, ifOld, expectedOldValue, requireOldValue);
} catch (EntryExistsException eee) {
// SQLFabric changes BEGIN
// ignore EntryExistsException in distribution from a non-empty
// region since actual check will be done in this put itself
// and it can happen in distribution if put comes in from
// GII as well as distribution channel
if (indexManager != null) {
if (logger.isTraceEnabled()) {
logger.trace("basicPut: ignoring EntryExistsException in distribution {}", eee);
}
}
else {
// can this happen for non-SQLFabric case?
throw eee;
}
// SQLFabric changes END
} finally {
// bug 32589, post update may throw an exception if exception occurs
// for any recipients
if (!clearOccured) {
try {
lruUpdateCallback();
} catch( DiskAccessException dae) {
//Asif:Feel that it is safe to destroy the region here as there appears
// to be no chance of deadlock during region destruction
result = null;
this._getOwner().handleDiskAccessException(dae);
throw dae;
}
}
} // finally
} else {
resetThreadLocals();
}
} // finally
return result;
}
/**
* If the value in the VM is still REMOVED_PHASE1 Token, then the operation
* was not completed (due to cacheWriter exception, concurrentMap operation) etc.
*/
private boolean isOpComplete(RegionEntry re, EntryEventImpl event) {
if (re.getValueAsToken() == Token.REMOVED_PHASE1) {
return false;
}
return true;
}
private boolean satisfiesExpectedOldValue(EntryEventImpl event,
RegionEntry re, Object expectedOldValue, boolean replaceOnClient) {
// replace is propagated to server, so no need to check
// satisfiesOldValue on client
if (expectedOldValue != null && !replaceOnClient) {
SimpleMemoryAllocatorImpl.skipRefCountTracking();
@Retained @Released Object v = re._getValueRetain(event.getLocalRegion(), true);
SimpleMemoryAllocatorImpl.unskipRefCountTracking();
try {
if (!AbstractRegionEntry.checkExpectedOldValue(expectedOldValue, v, event.getLocalRegion())) {
return false;
}
} finally {
OffHeapHelper.releaseWithNoTracking(v);
}
}
return true;
}
// Asif: If the new value is an instance of SerializableDelta, then
// the old value requirement is a must & it needs to be faulted in
// if overflown to disk without affecting LRU? This is needed for
// Sql Fabric.
// [sumedh] store both the value in VM and the value in VM or disk;
// the former is used for updating the VM size calculations, while
// the latter is used in other places like passing to
// SqlfIndexManager or setting the old value in the event; this is
// required since using the latter for updating the size
// calculations will be incorrect in case the value was read from
// disk but not brought into the VM like what getValueInVMOrDisk
// method does when value is not found in VM
// PRECONDITION: caller must be synced on re
private void setOldValueInEvent(EntryEventImpl event, RegionEntry re, boolean cacheWrite, boolean requireOldValue) {
boolean needToSetOldValue = getIndexUpdater() != null || cacheWrite || requireOldValue || event.getOperation().guaranteesOldValue();
if (needToSetOldValue) {
if (event.hasDelta() || event.getOperation().guaranteesOldValue()
|| GemFireCacheImpl.sqlfSystem()) {
// In these cases we want to even get the old value from disk if it is not in memory
SimpleMemoryAllocatorImpl.skipRefCountTracking();
@Released Object oldValueInVMOrDisk = re.getValueOffHeapOrDiskWithoutFaultIn(event.getLocalRegion());
SimpleMemoryAllocatorImpl.unskipRefCountTracking();
try {
event.setOldValue(oldValueInVMOrDisk, requireOldValue
|| GemFireCacheImpl.sqlfSystem());
} finally {
OffHeapHelper.releaseWithNoTracking(oldValueInVMOrDisk);
}
} else {
// In these cases only need the old value if it is in memory
SimpleMemoryAllocatorImpl.skipRefCountTracking();
@Retained @Released Object oldValueInVM = re._getValueRetain(event.getLocalRegion(), true); // OFFHEAP: re synced so can use its ref.
SimpleMemoryAllocatorImpl.unskipRefCountTracking();
try {
event.setOldValue(oldValueInVM,
requireOldValue || GemFireCacheImpl.sqlfSystem());
} finally {
OffHeapHelper.releaseWithNoTracking(oldValueInVM);
}
}
} else {
// if the old value is in memory then if it is a GatewaySenderEventImpl then
// we want to set the old value.
@Unretained Object ov = re._getValue(); // OFFHEAP _getValue is ok since re is synced and we only use it if its a GatewaySenderEventImpl.
// Since GatewaySenderEventImpl is never stored in an off-heap region nor a compressed region we don't need to worry about ov being compressed.
if (ov instanceof GatewaySenderEventImpl) {
event.setOldValue(ov, true);
}
}
}
/**
* Asif: If the system is sqlfabric and the event has delta, then re == null
* implies update on non existent row . Throwing ENFE in that case
* As returning a boolean etc has other complications in terms of PR reattempt etc
*/
private void throwExceptionForSqlFire(EntryEventImpl event) {
if (event.hasDelta() && _getOwner().getGemFireCache().isSqlfSystem()) {
throw new EntryNotFoundException(
"SqlFabric::No row found for update");
}
}
protected void createEntry(EntryEventImpl event, final LocalRegion owner,
RegionEntry re) throws RegionClearedException {
final boolean wasTombstone = re.isTombstone();
processVersionTag(re, event);
event.putNewEntry(owner, re);
updateSize(event, 0, false, wasTombstone);
if (!event.getLocalRegion().isInitialized()) {
owner.getImageState().removeDestroyedEntry(event.getKey());
}
}
protected void updateEntry(EntryEventImpl event, boolean requireOldValue,
Object oldValueForDelta, RegionEntry re) throws RegionClearedException {
final int oldSize = event.getLocalRegion().calculateRegionEntryValueSize(re);
final boolean wasTombstone = re.isTombstone();
processVersionTag(re, event);
event.putExistingEntry(event.getLocalRegion(), re, requireOldValue,
oldValueForDelta);
EntryLogger.logPut(event);
updateSize(event, oldSize, true/* isUpdate */, wasTombstone);
}
private void updateLru(boolean clearOccured, RegionEntry re, EntryEventImpl event) {
if (!clearOccured) {
if (event.getOperation().isCreate()) {
lruEntryCreate(re);
} else {
lruEntryUpdate(re);
}
}
}
private void updateSize(EntryEventImpl event, int oldSize, boolean isUpdate, boolean wasTombstone) {
if (isUpdate && !wasTombstone) {
_getOwner().updateSizeOnPut(event.getKey(), oldSize, event.getNewValueBucketSize());
} else {
_getOwner().updateSizeOnCreate(event.getKey(), event.getNewValueBucketSize());
if (!wasTombstone) {
incEntryCount(1);
}
}
}
private void notifyIndex(RegionEntry re, boolean isUpdating) {
if (_getOwner().indexMaintenanceSynchronous) {
re.setUpdateInProgress(isUpdating);
}
}
private void invokeCacheWriter(RegionEntry re, EntryEventImpl event,
boolean cacheWrite, CacheWriter cacheWriter, Set netWriteRecipients,
boolean requireOldValue, Object expectedOldValue, boolean replaceOnClient) {
// invoke listeners only if region is initialized
if (_getOwner().isInitialized() && cacheWrite) {
// event.setOldValue already called in setOldValueInEvent
// bug #42638 for replaceOnClient, do not make the event create
// or update since replace must propagate to server
if (!replaceOnClient) {
if (re.isDestroyedOrRemoved()) {
event.makeCreate();
} else {
event.makeUpdate();
}
}
_getOwner().cacheWriteBeforePut(event, netWriteRecipients, cacheWriter,
requireOldValue, expectedOldValue);
}
if (!_getOwner().isInitialized() && !cacheWrite) {
// block setting of old value in putNewValueNoSync, don't
// need it
event.oldValueNotAvailable();
}
}
private boolean continueOverwriteDestroyed(RegionEntry re,
EntryEventImpl event, boolean overwriteDestroyed, boolean ifNew) {
Token oldValueInVM = re.getValueAsToken();
// if region is under GII, check if token is destroyed
if (!overwriteDestroyed) {
if (!_getOwner().isInitialized() && (oldValueInVM == Token.DESTROYED || oldValueInVM == Token.TOMBSTONE)) {
event.setOldValueDestroyedToken();
return false;
}
}
if (ifNew && !Token.isRemoved(oldValueInVM)) {
return false;
}
return true;
}
private boolean continueUpdate(RegionEntry re, EntryEventImpl event,
boolean ifOld, boolean replaceOnClient) {
if (ifOld) {
// only update, so just do tombstone maintainence and exit
if (re.isTombstone() && event.getVersionTag() != null) {
// refresh the tombstone so it doesn't time out too soon
processVersionTag(re, event);
try {
re.setValue(_getOwner(), Token.TOMBSTONE);
} catch (RegionClearedException e) {
// that's okay - when writing a tombstone into a disk, the
// region has been cleared (including this tombstone)
}
_getOwner().rescheduleTombstone(re, re.getVersionStamp().asVersionTag());
return false;
}
if (re.isRemoved() && !replaceOnClient) {
return false;
}
}
return true;
}
protected boolean destroyEntry(RegionEntry re, EntryEventImpl event,
boolean inTokenMode, boolean cacheWrite, @Released Object expectedOldValue,
boolean forceDestroy, boolean removeRecoveredEntry)
throws CacheWriterException, TimeoutException, EntryNotFoundException,
RegionClearedException {
processVersionTag(re, event);
final int oldSize = _getOwner().calculateRegionEntryValueSize(re);
boolean retVal = re.destroy(event.getLocalRegion(), event, inTokenMode,
cacheWrite, expectedOldValue, forceDestroy, removeRecoveredEntry);
if (retVal) {
EntryLogger.logDestroy(event);
_getOwner().updateSizeOnRemove(event.getKey(), oldSize);
}
return retVal;
}
public void txApplyPut(Operation p_putOp, Object key, Object nv,
boolean didDestroy, TransactionId txId, TXRmtEvent txEvent,
EventID eventId, Object aCallbackArgument,List<EntryEventImpl> pendingCallbacks,FilterRoutingInfo filterRoutingInfo,ClientProxyMembershipID bridgeContext, TXEntryState txEntryState, VersionTag versionTag, long tailKey)
{
final LocalRegion owner = _getOwner();
if (owner == null) {
// "fix" for bug 32440
Assert.assertTrue(false, "The owner for RegionMap " + this
+ " is null");
}
Operation putOp = p_putOp;
owner.checkBeforeEntrySync(txEvent);
Object newValue = nv;
final boolean hasRemoteOrigin = !((TXId)txId).getMemberId().equals(owner.getMyId());
final boolean isTXHost = txEntryState != null;
final boolean isClientTXOriginator = owner.cache.isClient() && !hasRemoteOrigin;
final boolean isRegionReady = owner.isInitialized();
EntryEventImpl cbEvent = null;
EntryEventImpl sqlfEvent = null;
boolean invokeCallbacks = shouldCreateCBEvent(owner, false /*isInvalidate*/, isRegionReady);
boolean cbEventInPending = false;
cbEvent = createCBEvent(owner, putOp, key, newValue, txId,
txEvent, eventId, aCallbackArgument,filterRoutingInfo,bridgeContext, txEntryState, versionTag, tailKey);
try {
if (logger.isDebugEnabled()) {
logger.debug("txApplyPut cbEvent={}", cbEvent);
}
if (owner.isUsedForPartitionedRegionBucket()) {
newValue = EntryEventImpl.getCachedDeserializable(nv, cbEvent);
txHandleWANEvent(owner, cbEvent, txEntryState);
}
if (/*owner.isUsedForPartitionedRegionBucket() && */
(getIndexUpdater() != null ||
(newValue instanceof com.gemstone.gemfire.internal.cache.delta.Delta))) {
sqlfEvent = createCBEvent(owner, putOp, key, newValue, txId,
txEvent, eventId, aCallbackArgument,filterRoutingInfo,bridgeContext, txEntryState, versionTag, tailKey);
}
boolean opCompleted = false;
// Fix for Bug #44431. We do NOT want to update the region and wait
// later for index INIT as region.clear() can cause inconsistency if
// happened in parallel as it also does index INIT.
IndexManager oqlIndexManager = owner.getIndexManager() ;
if (oqlIndexManager != null) {
oqlIndexManager.waitForIndexInit();
}
lockForTXCacheModification(owner, versionTag);
try {
if (hasRemoteOrigin && !isTXHost && !isClientTXOriginator) {
// If we are not a mirror then only apply the update to existing
// entries
//
// If we are a mirror then then only apply the update to
// existing entries when the operation is an update and we
// are initialized.
// Otherwise use the standard create/update logic
if (!owner.isAllEvents() || (!putOp.isCreate() && isRegionReady)) {
// At this point we should only apply the update if the entry exists
RegionEntry re = getEntry(key); // Fix for bug 32347.
if (re != null) {
synchronized (re) {
if (!re.isRemoved()) {
opCompleted = true;
putOp = putOp.getCorrespondingUpdateOp();
// Net writers are not called for received transaction data
final int oldSize = owner.calculateRegionEntryValueSize(re);
if (cbEvent != null) {
cbEvent.setRegionEntry(re);
cbEvent.setOldValue(re.getValueInVM(owner)); // OFFHEAP eei
}
if (sqlfEvent != null) {
sqlfEvent.setOldValue(re.getValueInVM(owner)); // OFFHEAP eei
}
boolean clearOccured = false;
// Set RegionEntry updateInProgress
if (owner.indexMaintenanceSynchronous) {
re.setUpdateInProgress(true);
}
try {
txRemoveOldIndexEntry(putOp, re);
if (didDestroy) {
re.txDidDestroy(owner.cacheTimeMillis());
}
if (txEvent != null) {
txEvent.addPut(putOp, owner, re, re.getKey(), newValue,aCallbackArgument);
}
re.setValueResultOfSearch(putOp.isNetSearch());
try {
// Rahul: applies the delta and sets the new value in
// region entry (required for sqlfabric delta).
processAndGenerateTXVersionTag(owner, cbEvent, re, txEntryState);
if (newValue instanceof com.gemstone.gemfire.internal.cache.delta.Delta
&& sqlfEvent != null) {
//cbEvent.putExistingEntry(owner, re);
sqlfEvent.putExistingEntry(owner, re);
} else {
re.setValue(owner, re.prepareValueForCache(owner, newValue, cbEvent, !putOp.isCreate()));
}
if (putOp.isCreate()) {
owner.updateSizeOnCreate(key, owner.calculateRegionEntryValueSize(re));
} else if (putOp.isUpdate()) {
// Rahul : fix for 41694. Negative bucket size can also be
// an issue with normal GFE Delta and will have to be fixed
// in a similar manner and may be this fix the the one for
// other delta can be combined.
if (sqlfEvent != null) {
owner.updateSizeOnPut(key, oldSize, sqlfEvent.getNewValueBucketSize());
} else {
owner.updateSizeOnPut(key, oldSize, owner.calculateRegionEntryValueSize(re));
}
}
}
catch (RegionClearedException rce) {
clearOccured = true;
}
{
long lastMod = owner.cacheTimeMillis();
EntryLogger.logTXPut(_getOwnerObject(), key, nv);
re.updateStatsForPut(lastMod);
owner.txApplyPutPart2(re, re.getKey(), newValue, lastMod,
false, didDestroy, clearOccured);
}
} finally {
if (re != null && owner.indexMaintenanceSynchronous) {
re.setUpdateInProgress(false);
}
}
if (invokeCallbacks) {
cbEvent.makeUpdate();
switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
if(pendingCallbacks==null) {
owner.invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE,
cbEvent, hasRemoteOrigin);
} else {
pendingCallbacks.add(cbEvent);
cbEventInPending = true;
}
}
if (!clearOccured) {
lruEntryUpdate(re);
}
}
}
if (didDestroy && !opCompleted) {
owner
.txApplyInvalidatePart2(re, re.getKey(), true, false, false /* clear*/);
}
}
if (owner.concurrencyChecksEnabled && txEntryState != null && cbEvent != null) {
txEntryState.setVersionTag(cbEvent.getVersionTag());
}
return;
}
}
RegionEntry newRe = getEntryFactory().createEntry(owner, key,
Token.REMOVED_PHASE1);
synchronized (newRe) {
try {
RegionEntry oldRe = putEntryIfAbsent(key, newRe);
while (!opCompleted && oldRe != null) {
synchronized (oldRe) {
if (oldRe.isRemovedPhase2()) {
oldRe = putEntryIfAbsent(key, newRe);
if (oldRe != null) {
owner.getCachePerfStats().incRetries();
}
}
else {
opCompleted = true;
if (!oldRe.isRemoved()) {
putOp = putOp.getCorrespondingUpdateOp();
}
// Net writers are not called for received transaction data
final int oldSize = owner.calculateRegionEntryValueSize(oldRe);
final boolean oldIsRemoved = oldRe.isDestroyedOrRemoved();
if (cbEvent != null) {
cbEvent.setRegionEntry(oldRe);
cbEvent.setOldValue(oldRe.getValueInVM(owner)); // OFFHEAP eei
}
if (sqlfEvent != null) {
sqlfEvent.setOldValue(oldRe.getValueInVM(owner)); // OFFHEAP eei
}
boolean clearOccured = false;
// Set RegionEntry updateInProgress
if (owner.indexMaintenanceSynchronous) {
oldRe.setUpdateInProgress(true);
}
try {
txRemoveOldIndexEntry(putOp, oldRe);
if (didDestroy) {
oldRe.txDidDestroy(owner.cacheTimeMillis());
}
if (txEvent != null) {
txEvent.addPut(putOp, owner, oldRe, oldRe.getKey(), newValue,aCallbackArgument);
}
oldRe.setValueResultOfSearch(putOp.isNetSearch());
try {
processAndGenerateTXVersionTag(owner, cbEvent, oldRe, txEntryState);
boolean wasTombstone = oldRe.isTombstone();
if (newValue instanceof com.gemstone.gemfire.internal.cache.delta.Delta
&& sqlfEvent != null ) {
//cbEvent.putExistingEntry(owner, oldRe);
sqlfEvent.putExistingEntry(owner, oldRe);
} else {
oldRe.setValue(owner, oldRe.prepareValueForCache(owner, newValue, cbEvent, !putOp.isCreate()));
if (wasTombstone) {
owner.unscheduleTombstone(oldRe);
}
}
if (putOp.isCreate()) {
owner.updateSizeOnCreate(key, owner.calculateRegionEntryValueSize(oldRe));
} else if (putOp.isUpdate()) {
// Rahul : fix for 41694. Negative bucket size can also be
// an issue with normal GFE Delta and will have to be fixed
// in a similar manner and may be this fix the the one for
// other delta can be combined.
if (sqlfEvent != null) {
owner.updateSizeOnPut(key, oldSize, sqlfEvent.getNewValueBucketSize());
} else {
owner.updateSizeOnPut(key, oldSize, owner.calculateRegionEntryValueSize(oldRe));
}
}
}
catch (RegionClearedException rce) {
clearOccured = true;
}
{
long lastMod = System.currentTimeMillis();
EntryLogger.logTXPut(_getOwnerObject(), key, nv);
oldRe.updateStatsForPut(lastMod);
owner.txApplyPutPart2(oldRe, oldRe.getKey(), newValue,
lastMod, false, didDestroy, clearOccured);
}
} finally {
if (oldRe != null && owner.indexMaintenanceSynchronous) {
oldRe.setUpdateInProgress(false);
}
}
if (invokeCallbacks) {
if (!oldIsRemoved) {
cbEvent.makeUpdate();
}
switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
if(pendingCallbacks==null) {
owner.invokeTXCallbacks(cbEvent.op.isCreate() ? EnumListenerEvent.AFTER_CREATE : EnumListenerEvent.AFTER_UPDATE,
cbEvent, true/*callDispatchListenerEvent*/);
} else {
pendingCallbacks.add(cbEvent);
cbEventInPending = true;
}
}
if (!clearOccured) {
lruEntryUpdate(oldRe);
}
}
}
}
if (!opCompleted) {
putOp = putOp.getCorrespondingCreateOp();
if (cbEvent != null) {
cbEvent.setRegionEntry(newRe);
cbEvent.setOldValue(null);
}
boolean clearOccured = false;
// Set RegionEntry updateInProgress
if (owner.indexMaintenanceSynchronous) {
newRe.setUpdateInProgress(true);
}
try {
txRemoveOldIndexEntry(putOp, newRe);
// creating a new entry
if (didDestroy) {
newRe.txDidDestroy(owner.cacheTimeMillis());
}
if (txEvent != null) {
txEvent.addPut(putOp, owner, newRe, newRe.getKey(), newValue,aCallbackArgument);
}
newRe.setValueResultOfSearch(putOp.isNetSearch());
try {
processAndGenerateTXVersionTag(owner, cbEvent, newRe, txEntryState);
if (sqlfEvent != null ) {
sqlfEvent.putNewEntry(owner,newRe);
} else {
newRe.setValue(owner, newRe.prepareValueForCache(owner, newValue, cbEvent, !putOp.isCreate()));
}
owner.updateSizeOnCreate(newRe.getKey(), owner.calculateRegionEntryValueSize(newRe));
}
catch (RegionClearedException rce) {
clearOccured = true;
}
{
long lastMod = System.currentTimeMillis();
EntryLogger.logTXPut(_getOwnerObject(), key, nv);
newRe.updateStatsForPut(lastMod);
owner.txApplyPutPart2(newRe, newRe.getKey(), newValue, lastMod,
true, didDestroy, clearOccured);
}
} finally {
if (newRe != null && owner.indexMaintenanceSynchronous) {
newRe.setUpdateInProgress(false);
}
}
opCompleted = true;
if (invokeCallbacks) {
cbEvent.makeCreate();
cbEvent.setOldValue(null);
switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin);
if(pendingCallbacks==null) {
owner.invokeTXCallbacks(EnumListenerEvent.AFTER_CREATE, cbEvent,
true/*callDispatchListenerEvent*/);
} else {
pendingCallbacks.add(cbEvent);
cbEventInPending = true;
}
}
if (!clearOccured) {
lruEntryCreate(newRe);
incEntryCount(1);
}
}
}
finally {
if (!opCompleted) {
removeEntry(key, newRe, false);
}
}
}
if (owner.concurrencyChecksEnabled && txEntryState != null && cbEvent != null) {
txEntryState.setVersionTag(cbEvent.getVersionTag());
}
}catch( DiskAccessException dae) {
owner.handleDiskAccessException(dae);
throw dae;
}
finally {
releaseTXCacheModificationLock(owner, versionTag);
if (oqlIndexManager != null) {
oqlIndexManager.countDownIndexUpdaters();
}
}
} finally {
if (!cbEventInPending) cbEvent.release();
if (sqlfEvent != null) sqlfEvent.release();
}
}
private void txHandleWANEvent(final LocalRegion owner, EntryEventImpl cbEvent, TXEntryState txEntryState) {
((BucketRegion)owner).handleWANEvent(cbEvent);
if (txEntryState != null) {
txEntryState.setTailKey(cbEvent.getTailKey());
}
}
/**
* called from txApply* methods to process and generate versionTags.
*/
private void processAndGenerateTXVersionTag(final LocalRegion owner,
EntryEventImpl cbEvent, RegionEntry re, TXEntryState txEntryState) {
if (shouldPerformConcurrencyChecks(owner, cbEvent)) {
try {
if (txEntryState != null && txEntryState.getRemoteVersionTag() != null) {
// to generate a version based on a remote VersionTag, we will
// have to put the remote versionTag in the regionEntry
VersionTag remoteTag = txEntryState.getRemoteVersionTag();
if (re instanceof VersionStamp) {
VersionStamp stamp = (VersionStamp) re;
stamp.setVersions(remoteTag);
}
}
processVersionTag(re, cbEvent);
} catch (ConcurrentCacheModificationException ignore) {
// ignore this execption, however invoke callbacks for this operation
}
// For distributed transactions, stuff the next region version generated
// in phase-1 commit into the cbEvent so that ARE.generateVersionTag can later
// just apply it and not regenerate it in phase-2 commit
if (cbEvent != null && txEntryState != null && txEntryState.getDistTxEntryStates() != null) {
cbEvent.setNextRegionVersion(txEntryState.getDistTxEntryStates().getRegionVersion());
}
//cbEvent.setNextRegionVersion(txEntryState.getNextRegionVersion());
owner.generateAndSetVersionTag(cbEvent, re);
}
}
/**
* Checks for concurrency checks enabled on Region and that cbEvent is not null.
*/
private boolean shouldPerformConcurrencyChecks(LocalRegion owner, EntryEventImpl cbEvent) {
return owner.getConcurrencyChecksEnabled() && cbEvent != null;
}
/**
* Switch the event's region from BucketRegion to owning PR and set originRemote to the given value
*/
static EntryEventImpl switchEventOwnerAndOriginRemote(EntryEventImpl event, boolean originRemote) {
assert event != null;
if (event.getRegion().isUsedForPartitionedRegionBucket()) {
LocalRegion pr = event.getRegion().getPartitionedRegion();
event.setRegion(pr);
}
event.setOriginRemote(originRemote);
return event;
}
/**
* Removing the existing indexed value requires the current value in the cache,
* that is the one prior to applying the operation.
* @param op
* @param entry the RegionEntry that contains the value prior to applying the op
*/
private void txRemoveOldIndexEntry(Operation op, RegionEntry entry) {
if ((op.isUpdate() && !entry.isInvalid()) ||
op.isInvalidate() || op.isDestroy()) {
IndexManager idxManager = _getOwner().getIndexManager();
if (idxManager != null) {
try {
idxManager.updateIndexes(entry,
IndexManager.REMOVE_ENTRY,
op.isUpdate() ?
IndexProtocol.BEFORE_UPDATE_OP :
IndexProtocol.OTHER_OP);
} catch (QueryException e) {
throw new IndexMaintenanceException(e);
}
}
}
}
public void dumpMap() {
logger.debug("dump of concurrent map of size {} for region {}", this._getMap().size(), this._getOwner());
for (Iterator it = this._getMap().values().iterator(); it.hasNext(); ) {
logger.trace("dumpMap:"+it.next().toString());
}
}
static boolean shouldCreateCBEvent( final LocalRegion owner,
final boolean isInvalidate, final boolean isInitialized)
{
LocalRegion lr = owner;
boolean isPartitioned = lr.isUsedForPartitionedRegionBucket();
if(isPartitioned){
/* if(!((BucketRegion)lr).getBucketAdvisor().isPrimary()) {
if(!BucketRegion.FORCE_LOCAL_LISTENERS_INVOCATION) {
return false;
}
}*/
lr = owner.getPartitionedRegion();
}
if (isInvalidate) { // ignore shouldNotifyGatewayHub check for invalidates
return (isPartitioned || isInitialized)
&& (lr.shouldDispatchListenerEvent()
|| lr.shouldNotifyBridgeClients()
|| lr.getConcurrencyChecksEnabled());
} else {
return (isPartitioned || isInitialized)
&& (lr.shouldDispatchListenerEvent()
|| lr.shouldNotifyBridgeClients()
|| lr.getConcurrencyChecksEnabled());
}
}
/** create a callback event for applying a transactional change to the local cache */
public static final EntryEventImpl createCBEvent(final LocalRegion re,
Operation op, Object key, Object newValue, TransactionId txId,
TXRmtEvent txEvent,EventID eventId, Object aCallbackArgument,FilterRoutingInfo filterRoutingInfo,ClientProxyMembershipID bridgeContext, TXEntryState txEntryState, VersionTag versionTag, long tailKey)
{
DistributedMember originator = null ;
//txId should not be null even on localOrigin
Assert.assertTrue(txId != null);
originator = ((TXId)txId).getMemberId();
LocalRegion eventRegion = re;
if (eventRegion.isUsedForPartitionedRegionBucket()) {
eventRegion = re.getPartitionedRegion();
}
EntryEventImpl retVal = EntryEventImpl.create(
re, op, key, newValue,
aCallbackArgument,
txEntryState == null, originator);
boolean returnedRetVal = false;
try {
if(bridgeContext!=null) {
retVal.setContext(bridgeContext);
}
if (eventRegion.generateEventID()) {
retVal.setEventId(eventId);
}
if (versionTag != null) {
retVal.setVersionTag(versionTag);
}
retVal.setTailKey(tailKey);
FilterInfo localRouting = null;
boolean computeFilterInfo = false;
if (filterRoutingInfo == null) {
computeFilterInfo = true;
} else {
localRouting = filterRoutingInfo.getLocalFilterInfo();
if (localRouting != null) {
// routing was computed in this VM but may need to perform local interest processing
computeFilterInfo = !filterRoutingInfo.hasLocalInterestBeenComputed();
} else {
// routing was computed elsewhere and is in the "remote" routing table
localRouting = filterRoutingInfo.getFilterInfo(re.getMyId());
}
if (localRouting != null) {
if (!computeFilterInfo) {
retVal.setLocalFilterInfo(localRouting);
}
} else {
computeFilterInfo = true;
}
}
if (logger.isTraceEnabled()) {
logger.trace("createCBEvent filterRouting={} computeFilterInfo={} local routing={}", filterRoutingInfo, computeFilterInfo, localRouting);
}
if (re.isUsedForPartitionedRegionBucket()) {
BucketRegion bucket = (BucketRegion)re;
if(BucketRegion.FORCE_LOCAL_LISTENERS_INVOCATION || bucket.getBucketAdvisor().isPrimary()) {
retVal.setInvokePRCallbacks(true);
} else {
retVal.setInvokePRCallbacks(false);
}
if (computeFilterInfo) {
if (bucket.getBucketAdvisor().isPrimary()) {
if (logger.isTraceEnabled()) {
logger.trace("createCBEvent computing routing for primary bucket");
}
FilterProfile fp = ((BucketRegion)re).getPartitionedRegion().getFilterProfile();
if (fp != null) {
FilterRoutingInfo fri = fp.getFilterRoutingInfoPart2(filterRoutingInfo, retVal);
if (fri != null) {
retVal.setLocalFilterInfo(fri.getLocalFilterInfo());
}
}
}
}
} else if (computeFilterInfo) { // not a bucket
if (logger.isTraceEnabled()) {
logger.trace("createCBEvent computing routing for non-bucket");
}
FilterProfile fp = re.getFilterProfile();
if (fp != null) {
retVal.setLocalFilterInfo(fp.getLocalFilterRouting(retVal));
}
}
retVal.setTransactionId(txId);
returnedRetVal = true;
return retVal;
} finally {
if (!returnedRetVal) {
retVal.release();
}
}
}
public final void writeSyncIfPresent(Object key, Runnable runner)
{
RegionEntry re = getEntry(key);
if (re != null) {
final boolean disabled = disableLruUpdateCallback();
try {
synchronized (re) {
if (!re.isRemoved()) {
runner.run();
}
}
}
finally {
if (disabled) {
enableLruUpdateCallback();
}
try {
lruUpdateCallback();
}catch(DiskAccessException dae) {
this._getOwner().handleDiskAccessException(dae);
throw dae;
}
}
}
}
public final void removeIfDestroyed(Object key)
{
LocalRegion owner = _getOwner();
// boolean makeTombstones = owner.concurrencyChecksEnabled;
DiskRegion dr = owner.getDiskRegion();
RegionEntry re = getEntry(key);
if (re != null) {
if (re.isDestroyed()) {
synchronized (re) {
if (re.isDestroyed()) {
// [bruce] destroyed entries aren't in the LRU clock, so they can't be retained here
// if (makeTombstones) {
// re.makeTombstone(owner, re.getVersionStamp().asVersionTag());
// } else {
re.removePhase2();
removeEntry(key, re, true);
}
}
}
}
// }
}
/** get version-generation permission from the region's version vector */
private void lockForCacheModification(LocalRegion owner, EntryEventImpl event) {
boolean lockedByBulkOp = event.isBulkOpInProgress() && owner.dataPolicy.withReplication();
if (!event.isOriginRemote() && !lockedByBulkOp) {
RegionVersionVector vector = owner.getVersionVector();
if (vector != null) {
vector.lockForCacheModification(owner);
}
}
}
/** release version-generation permission from the region's version vector */
private void releaseCacheModificationLock(LocalRegion owner, EntryEventImpl event) {
boolean lockedByBulkOp = event.isBulkOpInProgress() && owner.dataPolicy.withReplication();
if (!event.isOriginRemote() && !lockedByBulkOp) {
RegionVersionVector vector = owner.getVersionVector();
if (vector != null) {
vector.releaseCacheModificationLock(owner);
}
}
}
/** get version-generation permission from the region's version vector */
private void lockForTXCacheModification(LocalRegion owner, VersionTag tag) {
if ( !(tag != null && tag.isFromOtherMember()) ) {
RegionVersionVector vector = owner.getVersionVector();
if (vector != null) {
vector.lockForCacheModification(owner);
}
}
}
/** release version-generation permission from the region's version vector */
private void releaseTXCacheModificationLock(LocalRegion owner, VersionTag tag) {
if ( !(tag != null && tag.isFromOtherMember()) ) {
RegionVersionVector vector = owner.getVersionVector();
if (vector != null) {
vector.releaseCacheModificationLock(owner);
}
}
}
public final void unscheduleTombstone(RegionEntry re) {
}
/**
* for testing race conditions between threads trying to apply ops to the
* same entry
* @param entry the entry to attempt to add to the system
*/
protected final RegionEntry putEntryIfAbsentForTest(RegionEntry entry) {
return (RegionEntry)putEntryIfAbsent(entry.getKey(), entry);
}
public boolean isTombstoneNotNeeded(RegionEntry re, int destroyedVersion) {
// no need for synchronization - stale values are okay here
RegionEntry actualRe = getEntry(re.getKey());
// TODO this looks like a problem for regionEntry pooling
if (actualRe != re) { // null actualRe is okay here
return true; // tombstone was evicted at some point
}
VersionStamp vs = re.getVersionStamp();
if (vs == null) {
// if we have no VersionStamp why were we even added as a tombstone?
// We used to see an NPE here. See bug 52092.
logger.error("Unexpected RegionEntry scheduled as tombstone: re.getClass {} destroyedVersion {}", re.getClass(), destroyedVersion);
return true;
}
int entryVersion = vs.getEntryVersion();
boolean isSameTombstone = (entryVersion == destroyedVersion && re.isTombstone());
return !isSameTombstone;
}
/** removes a tombstone that has expired locally */
public final boolean removeTombstone(RegionEntry re, VersionHolder version, boolean isEviction, boolean isScheduledTombstone) {
boolean result = false;
int destroyedVersion = version.getEntryVersion();
DiskRegion dr = this._getOwner().getDiskRegion();
synchronized(this._getOwner().getSizeGuard()) { // do this sync first; see bug 51985
synchronized (re) {
int entryVersion = re.getVersionStamp().getEntryVersion();
boolean isTombstone = re.isTombstone();
boolean isSameTombstone = (entryVersion == destroyedVersion && isTombstone);
if (isSameTombstone || (isTombstone && entryVersion < destroyedVersion)) {
if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
// logs are at info level for TomstoneService.DEBUG_TOMBSTONE_COUNT so customer doesn't have to use fine level
if (isSameTombstone) {
// logging this can put tremendous pressure on the log writer in tests
// that "wait for silence"
logger.trace(LogMarker.TOMBSTONE_COUNT,
"removing tombstone for {} with v{} rv{}; count is {}",
re.getKey(), destroyedVersion, version.getRegionVersion(), (this._getOwner().getTombstoneCount() - 1));
} else {
logger.trace(LogMarker.TOMBSTONE_COUNT, "removing entry (v{}) that is older than an expiring tombstone (v{} rv{}) for {}",
entryVersion, destroyedVersion, version.getRegionVersion(), re.getKey());
}
}
try {
re.setValue(_getOwner(), Token.REMOVED_PHASE2);
if (removeTombstone(re)) {
result = true;
incEntryCount(-1);
// Bug 51118: When the method is called by tombstoneGC thread, current 're' is an
// expired tombstone. Then we detected an destroyed (due to overwritingOldTombstone()
// returns true earlier) tombstone with bigger entry version, it's safe to delete
// current tombstone 're' and adjust the tombstone count.
// lruEntryDestroy(re); // tombstones are invisible to LRU
if (isScheduledTombstone) {
_getOwner().incTombstoneCount(-1);
}
_getOwner().getVersionVector().recordGCVersion(version.getMemberID(), version.getRegionVersion());
}
} catch (RegionClearedException e) {
// if the region has been cleared we don't need to remove the tombstone
} catch (RegionDestroyedException e) {
//if the region has been destroyed, the tombstone is already
//gone. Catch an exception to avoid an error from the GC thread.
}
} else {
if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT)) {
logger.trace(LogMarker.TOMBSTONE_COUNT,
"tombstone for {} was resurrected with v{}; destroyed version was v{}; count is {}; entryMap size is {}",
re.getKey(), re.getVersionStamp().getEntryVersion(), destroyedVersion, this._getOwner().getTombstoneCount(), size());
}
}
}
}
return result;
}
protected boolean removeTombstone(RegionEntry re) {
return _getMap().remove(re.getKey(), re);
}
// method used for debugging tombstone count issues
public boolean verifyTombstoneCount(AtomicInteger numTombstones) {
int deadEntries = 0;
try {
for (Iterator it=_getMap().values().iterator(); it.hasNext(); ) {
RegionEntry re = (RegionEntry)it.next();
if (re.isTombstone()) {
deadEntries++;
}
}
if (deadEntries != numTombstones.get()) {
if (logger.isDebugEnabled()) {
logger.debug("tombstone count ({}) does not match actual number of tombstones ({})",
numTombstones, deadEntries, new Exception());
}
return false;
} else {
if (logger.isDebugEnabled()) {
logger.debug("tombstone count verified");
}
}
} catch (Exception e) {
// ignore
}
return true;
}
}