/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.Logger;
import org.apache.geode.GemFireIOException;
import org.apache.geode.InvalidDeltaException;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.cache.TransactionId;
import org.apache.geode.cache.query.IndexMaintenanceException;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.cache.query.internal.index.IndexManager;
import org.apache.geode.cache.query.internal.index.IndexProtocol;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.DiskInitFile.DiskRegionFlag;
import org.apache.geode.internal.cache.entries.AbstractOplogDiskRegionEntry;
import org.apache.geode.internal.cache.entries.AbstractRegionEntry;
import org.apache.geode.internal.cache.entries.DiskEntry;
import org.apache.geode.internal.cache.entries.OffHeapRegionEntry;
import org.apache.geode.internal.cache.map.CacheModificationLock;
import org.apache.geode.internal.cache.map.FocusedRegionMap;
import org.apache.geode.internal.cache.map.RegionMapCommitPut;
import org.apache.geode.internal.cache.map.RegionMapDestroy;
import org.apache.geode.internal.cache.map.RegionMapPut;
import org.apache.geode.internal.cache.persistence.DiskRegionView;
import org.apache.geode.internal.cache.region.entry.RegionEntryFactoryBuilder;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionHolder;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.versions.VersionStamp;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.offheap.OffHeapHelper;
import org.apache.geode.internal.offheap.OffHeapRegionEntryHelper;
import org.apache.geode.internal.offheap.StoredObject;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.offheap.annotations.Retained;
import org.apache.geode.internal.sequencelog.EntryLogger;
import org.apache.geode.internal.size.ReflectionSingleObjectSizer;
import org.apache.geode.internal.util.concurrent.ConcurrentMapWithReusableEntries;
import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* Abstract implementation of {@link RegionMap} that has all the common behavior.
*
* @since GemFire 3.5.1
*/
public abstract class AbstractRegionMap extends BaseRegionMap
implements FocusedRegionMap, CacheModificationLock {
private static final Logger logger = LogService.getLogger();
private final TxCallbackEventFactory txCallbackEventFactory = new TxCallbackEventFactoryImpl();
/** The underlying map for this region. */
protected ConcurrentMapWithReusableEntries<Object, Object> map;
/**
* This test hook is used to force the conditions during entry destroy. This hook is used by
* DestroyEntryWithConcurrentOperationJUnitTest.
*/
@MutableForTesting
static Runnable testHookRunnableForConcurrentOperation = null;
private RegionEntryFactory entryFactory;
private Attributes attr;
// the region that owns this map
private RegionMapOwner owner;
private final EntryEventSerialization entryEventSerialization = new EntryEventSerialization();
protected AbstractRegionMap(InternalRegionArguments internalRegionArgs) {
// do nothing
}
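/**
* Initializes this map: records the attributes and owner, creates the backing concurrent map,
* and builds a {@link RegionEntryFactory} that matches the owner's statistics, LRU, disk,
* versioning and off-heap settings. The owner must be an InternalRegion or a
* PlaceHolderDiskRegion.
*/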
protected void initialize(RegionMapOwner owner, Attributes attr,
InternalRegionArguments internalRegionArgs, boolean isLRU) {
_setAttributes(attr);
setOwner(owner);
setEntryMap(createConcurrentMapWithReusableEntries(attr.initialCapacity, attr.loadFactor,
attr.concurrencyLevel, false, new AbstractRegionEntry.HashRegionEntryCreator()));
boolean isDisk;
boolean withVersioning;
boolean offHeap;
if (owner instanceof InternalRegion) {
InternalRegion region = (InternalRegion) owner;
isDisk = region.getDiskRegion() != null;
withVersioning = region.getConcurrencyChecksEnabled();
offHeap = region.getOffHeap();
} else if (owner instanceof PlaceHolderDiskRegion) {
offHeap = ((RegionEntryContext) owner).getOffHeap();
isDisk = true;
withVersioning =
((DiskRegionView) owner).getFlags().contains(DiskRegionFlag.IS_WITH_VERSIONING);
} else {
throw new IllegalStateException("expected LocalRegion or PlaceHolderDiskRegion");
}
setEntryFactory(new RegionEntryFactoryBuilder().create(attr.statisticsEnabled, isLRU, isDisk,
withVersioning, offHeap));
}
private ConcurrentMapWithReusableEntries<Object, Object> createConcurrentMapWithReusableEntries(
int initialCapacity, float loadFactor, int concurrencyLevel, boolean isIdentityMap,
CustomEntryConcurrentHashMap.HashEntryCreator<Object, Object> entryCreator) {
if (entryCreator != null) {
return new CustomEntryConcurrentHashMap<>(initialCapacity, loadFactor, concurrencyLevel,
isIdentityMap, entryCreator);
} else {
return new CustomEntryConcurrentHashMap<>(initialCapacity, loadFactor, concurrencyLevel,
isIdentityMap);
}
}
@Override
public void changeOwner(LocalRegion r) {
if (r == _getOwnerObject()) {
return;
}
setOwner(r);
}
@Override
public void setEntryFactory(RegionEntryFactory f) {
this.entryFactory = f;
}
@Override
public RegionEntryFactory getEntryFactory() {
return this.entryFactory;
}
private void _setAttributes(Attributes a) {
this.attr = a;
}
@Override
public Attributes getAttributes() {
return this.attr;
}
public LocalRegion _getOwner() {
return (LocalRegion) this.owner;
}
boolean _isOwnerALocalRegion() {
return this.owner instanceof LocalRegion;
}
Object _getOwnerObject() {
return this.owner;
}
/**
* Tells this map what region owns it.
*/
private void setOwner(RegionMapOwner owner) {
this.owner = owner;
}
@Override
public ConcurrentMapWithReusableEntries<Object, Object> getCustomEntryConcurrentHashMap() {
return map;
}
@Override
public Map<Object, Object> getEntryMap() {
return map;
}
@Override
public void setEntryMap(ConcurrentMapWithReusableEntries<Object, Object> map) {
this.map = map;
}
@Override
public int size() {
return getEntryMap().size();
}
// this is currently used by stats and eviction
@Override
public int sizeInVM() {
return getEntryMap().size();
}
@Override
public boolean isEmpty() {
return getEntryMap().isEmpty();
}
@Override
public Set keySet() {
return getEntryMap().keySet();
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Collection<RegionEntry> regionEntries() {
return (Collection) getEntryMap().values();
}
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public Collection<RegionEntry> regionEntriesInVM() {
return (Collection) getEntryMap().values();
}
@Override
public boolean containsKey(Object key) {
RegionEntry re = getEntry(key);
return re != null && !re.isRemoved();
}
@Override
public RegionEntry getEntry(Object key) {
return (RegionEntry) getEntryMap().get(key);
}
@Override
public RegionEntry getEntry(EntryEventImpl event) {
return getEntry(event.getKey());
}
@Override
public RegionEntry getEntryInVM(Object key) {
return (RegionEntry) getEntryMap().get(key);
}
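/**
* Puts the entry into the map if no mapping for the key exists. If an off-heap entry was added
* while the owning region is being closed or destroyed, it is removed again and its off-heap
* reference released so the off-heap value is not orphaned (see #48068).
*
* @return the previous entry mapped to the key, or null if there was no mapping
*/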
@Override
public RegionEntry putEntryIfAbsent(Object key, RegionEntry regionEntry) {
RegionEntry oldRe = (RegionEntry) getEntryMap().putIfAbsent(key, regionEntry);
if (oldRe == null && (regionEntry instanceof OffHeapRegionEntry) && _isOwnerALocalRegion()
&& _getOwner().isThisRegionBeingClosedOrDestroyed()) {
// prevent orphan during concurrent destroy (#48068)
Object v = regionEntry.getValue();
if (v != Token.REMOVED_PHASE1 && v != Token.REMOVED_PHASE2 && v instanceof StoredObject
&& ((StoredObject) v).hasRefCount()) {
if (getEntryMap().remove(key, regionEntry)) {
((OffHeapRegionEntry) regionEntry).release();
}
}
}
return oldRe;
}
@Override
public RegionEntry getOperationalEntryInVM(Object key) {
return (RegionEntry) getEntryMap().get(key);
}
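/**
* Removes the mapping for the key, but only if the key still maps to the given entry. Versioned
* tombstones are never removed here; only the tombstone sweeper may remove them, so such an
* attempt is logged as a fatal internal error and ignored.
*/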
@Override
public void removeEntry(Object key, RegionEntry regionEntry, boolean updateStat) {
if (regionEntry.isTombstone() && getEntryMap().get(key) == regionEntry) {
logger.fatal(
"Internal product error: attempt to directly remove a versioned tombstone from region entry map",
new Exception("stack trace"));
return; // can't remove tombstones except from the tombstone sweeper
}
if (getEntryMap().remove(key, regionEntry)) {
regionEntry.removePhase2();
if (updateStat) {
incEntryCount(-1);
}
}
}
@Override
public void removeEntry(Object key, RegionEntry regionEntry, boolean updateStat,
EntryEventImpl event, final InternalRegion internalRegion) {
if (regionEntry.isTombstone() && getEntryMap().get(key) == regionEntry) {
logger.fatal(
"Internal product error: attempt to directly remove a versioned tombstone from region entry map",
new Exception("stack trace"));
return; // can't remove tombstones except from the tombstone sweeper
}
if (getEntryMap().remove(key, regionEntry)) {
regionEntry.removePhase2();
if (updateStat) {
incEntryCount(-1);
}
}
}
@Override
public void incEntryCount(int delta) {
LocalRegion lr = _getOwner();
if (lr != null) {
CachePerfStats stats = lr.getCachePerfStats();
if (stats != null) {
stats.incEntryCount(delta);
}
}
}
void incClearCount(LocalRegion lr) {
if (lr != null && !(lr instanceof HARegion)) {
CachePerfStats stats = lr.getCachePerfStats();
if (stats != null) {
stats.incClearCount();
}
}
}
private void _mapClear() {
Executor executor = null;
InternalCache cache = this.owner.getCache();
if (cache != null) {
DistributionManager manager = cache.getDistributionManager();
if (manager != null) {
executor = manager.getExecutors().getWaitingThreadPool();
}
}
getCustomEntryConcurrentHashMap().clearWithExecutor(executor);
}
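/**
* Closes this map on behalf of the given bucket region by clearing all of its entries.
*/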
@Override
public void close(BucketRegion bucketRegion) {
clear(null, bucketRegion);
}
/**
* Clear the region and, if an RVV is given, return a collection of the version sources in all
* remaining tags
*/
@Override
public Set<VersionSource> clear(RegionVersionVector rvv, BucketRegion bucketRegion) {
Set<VersionSource> result = new HashSet<VersionSource>();
if (!_isOwnerALocalRegion()) {
// Fix for #41333. Just clear the map
// if we failed during initialization.
_mapClear();
return null;
}
if (logger.isDebugEnabled()) {
logger.debug("Clearing entries for {} rvv={}", _getOwner(), rvv);
}
LocalRegion lr = _getOwner();
RegionVersionVector localRvv = lr.getVersionVector();
incClearCount(lr);
// lock for size calcs if the region might have tombstones
Object lockObj = lr.getConcurrencyChecksEnabled() ? lr.getSizeGuard() : new Object();
synchronized (lockObj) {
if (rvv == null) {
int delta = 0;
try {
delta = sizeInVM(); // TODO soplog need to determine if stats should
// reflect only size in memory or the complete thing
} catch (GemFireIOException e) {
// ignore rather than throwing an exception during cache close
}
int tombstones = lr.getTombstoneCount();
_mapClear();
_getOwner().updateSizeOnClearRegion(delta - tombstones);
_getOwner().incTombstoneCount(-tombstones);
if (delta != 0) {
incEntryCount(-delta);
}
} else {
int delta = 0;
int tombstones = 0;
VersionSource myId = _getOwner().getVersionMember();
if (localRvv != rvv) {
localRvv.recordGCVersions(rvv);
}
final boolean isTraceEnabled = logger.isTraceEnabled();
for (RegionEntry re : regionEntries()) {
synchronized (re) {
Token value = re.getValueAsToken();
// if it's already being removed or the entry is being created we leave it alone
if (value == Token.REMOVED_PHASE1 || value == Token.REMOVED_PHASE2) {
continue;
}
VersionSource id = re.getVersionStamp().getMemberID();
if (id == null) {
id = myId;
}
if (rvv.contains(id, re.getVersionStamp().getRegionVersion())) {
if (isTraceEnabled) {
logger.trace("region clear op is removing {} {}", re.getKey(),
re.getVersionStamp());
}
boolean tombstone = re.isTombstone();
// note: it.remove() did not reliably remove the entry so we use remove(K,V) here
if (getEntryMap().remove(re.getKey(), re)) {
if (OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap()) {
GatewaySenderEventImpl.release(re.getValue()); // OFFHEAP _getValue ok
}
// If this is an overflow only region, we need to free the entry on
// disk at this point.
try {
re.removePhase1(lr, true);
} catch (RegionClearedException e) {
// do nothing, it's already cleared.
}
re.removePhase2();
lruEntryDestroy(re);
if (tombstone) {
_getOwner().incTombstoneCount(-1);
tombstones += 1;
} else {
delta += 1;
}
}
} else { // rvv does not contain this entry so it is retained
result.add(id);
}
}
}
_getOwner().updateSizeOnClearRegion(delta);
incEntryCount(-delta);
incEntryCount(-tombstones);
if (logger.isDebugEnabled()) {
logger.debug("Size after clearing = {}", getEntryMap().size());
}
if (isTraceEnabled && getEntryMap().size() < 20) {
_getOwner().dumpBackingMap();
}
}
}
return result;
}
public void lruUpdateCallback(boolean b) {
// By default do nothing; LRU maps need to override this method
}
/**
* Tell an LRU that a new entry has been created
*/
@Override
public void lruEntryCreate(RegionEntry e) {
// do nothing by default
}
/**
* Tell an LRU that an existing entry has been destroyed
*/
@Override
public void lruEntryDestroy(RegionEntry regionEntry) {
// do nothing by default
}
/**
* Tell an LRU that an existing entry has been modified
*/
@Override
public void lruEntryUpdate(RegionEntry e) {
// do nothing by default
}
@Override
public void decTxRefCount(RegionEntry e) {
LocalRegion lr = null;
if (_isOwnerALocalRegion()) {
lr = _getOwner();
}
e.decRefCount(null, lr);
}
/**
* Process an incoming version tag for concurrent operation detection. This must be done before
* modifying the region entry.
*
* @param regionEntry the entry that is to be modified
* @param event the modification to the entry
* @throws InvalidDeltaException if the event contains a delta that cannot be applied
* @throws ConcurrentCacheModificationException if the event is in conflict with a previously
* applied change
*/
@Override
public void processVersionTag(RegionEntry regionEntry, EntryEventImpl event) {
if (regionEntry.getVersionStamp() != null) {
regionEntry.getVersionStamp().processVersionTag(event);
// during initialization we record version tag info to detect ops the
// image provider hasn't seen
VersionTag<?> tag = event.getVersionTag();
if (tag != null && !event.getRegion().isInitialized()) {
ImageState is = event.getRegion().getImageState();
if (is != null && !event.getRegion().isUsedForPartitionedRegionBucket()) {
if (logger.isTraceEnabled()) {
logger.trace("recording version tag in image state: {}", tag);
}
is.addVersionTag(event.getKey(), tag);
}
}
}
}
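/**
* Applies an incoming version tag to an entry during getInitialImage, optionally checking for
* conflicts with changes that have already been applied to the entry.
*/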
private void processVersionTagForGII(RegionEntry re, LocalRegion owner, VersionTag entryVersion,
boolean isTombstone, InternalDistributedMember sender, boolean checkConflicts) {
re.getVersionStamp().processVersionTag(_getOwner(), entryVersion, isTombstone, false,
owner.getMyId(), sender, checkConflicts);
}
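/**
* Copies entries recovered from disk into this map. When another map is supplied, its entries
* are drained and re-created with this map's entry factory; otherwise the entries already in
* this map are sized and counted. Tombstones are collected in version order and scheduled with
* the tombstone sweeper once all entries have been processed.
*/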
@Override
public void copyRecoveredEntries(RegionMap rm) {
// We need to sort the tombstones before scheduling them,
// so that they will be in the correct order.
OrderedTombstoneMap<RegionEntry> tombstones = new OrderedTombstoneMap<RegionEntry>();
if (rm != null) {
ConcurrentMapWithReusableEntries<Object, Object> other = rm.getCustomEntryConcurrentHashMap();
Iterator<Map.Entry<Object, Object>> it = other.entrySetWithReusableEntries().iterator();
while (it.hasNext()) {
Map.Entry<Object, Object> me = it.next();
it.remove(); // This removes the RegionEntry from "rm" but it does not decrement its
// refcount to an offheap value.
RegionEntry oldRe = (RegionEntry) me.getValue();
Object key = me.getKey();
@Retained
@Released
Object value = oldRe
.getValueRetain((RegionEntryContext) ((AbstractRegionMap) rm)._getOwnerObject(), true);
try {
if (value == Token.NOT_AVAILABLE) {
// fix for bug 43993
value = null;
}
if (value == Token.TOMBSTONE && !_getOwner().getConcurrencyChecksEnabled()) {
continue;
}
RegionEntry newRe =
getEntryFactory().createEntry((RegionEntryContext) _getOwnerObject(), key, value);
// TODO: passing value to createEntry causes a problem with the disk stats.
// The disk stats have already been set to track oldRe.
// So when we call createEntry we probably want to give it REMOVED_PHASE1
// and then set the value in copyRecoveredEntry in a way that does not
// change the disk stats. This also depends on DiskEntry.Helper.initialize not changing
// the stats for REMOVED_PHASE1
copyRecoveredEntry(oldRe, newRe);
// newRe is now in this.getCustomEntryConcurrentHashMap().
if (newRe.isTombstone()) {
VersionTag tag = newRe.getVersionStamp().asVersionTag();
tombstones.put(tag, newRe);
} else {
_getOwner().updateSizeOnCreate(key, _getOwner().calculateRegionEntryValueSize(newRe));
if (_getOwner() instanceof BucketRegionQueue) {
BucketRegionQueue brq = (BucketRegionQueue) _getOwner();
brq.incSecondaryQueueSize(1);
}
}
incEntryCount(1);
lruEntryUpdate(newRe);
} finally {
OffHeapHelper.release(value);
if (oldRe instanceof OffHeapRegionEntry) {
((OffHeapRegionEntry) oldRe).release();
}
}
lruUpdateCallback();
}
} else {
for (Iterator<RegionEntry> iter = regionEntries().iterator(); iter.hasNext();) {
RegionEntry re = iter.next();
if (re.isTombstone()) {
if (re.getVersionStamp() == null) { // bug #50992 - recovery from versioned to
// non-versioned
iter.remove();
continue;
} else {
tombstones.put(re.getVersionStamp().asVersionTag(), re);
}
} else {
_getOwner().updateSizeOnCreate(re.getKey(),
_getOwner().calculateRegionEntryValueSize(re));
if (_getOwner() instanceof BucketRegionQueue) {
BucketRegionQueue brq = (BucketRegionQueue) _getOwner();
brq.incSecondaryQueueSize(1);
}
}
}
incEntryCount(size());
// Since lru was not being done during recovery call it now.
lruUpdateCallback();
}
// Schedule all of the tombstones, now that we have sorted them
Map.Entry<VersionTag, RegionEntry> entry;
while ((entry = tombstones.take()) != null) {
// refresh the tombstone so it doesn't time out too soon
_getOwner().scheduleTombstone(entry.getValue(), entry.getKey());
}
}
private void copyRecoveredEntry(RegionEntry oldRe, RegionEntry newRe) {
if (newRe.getVersionStamp() != null) {
newRe.getVersionStamp().setMemberID(oldRe.getVersionStamp().getMemberID());
newRe.getVersionStamp().setVersions(oldRe.getVersionStamp().asVersionTag());
}
if (newRe instanceof AbstractOplogDiskRegionEntry) {
((AbstractOplogDiskRegionEntry) newRe).setDiskId(oldRe);
_getOwner().getDiskRegion().replaceIncompatibleEntry((DiskEntry) oldRe, (DiskEntry) newRe);
}
getEntryMap().put(newRe.getKey(), newRe);
}
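/**
* Creates a region entry for a key and value recovered from disk and puts it into the map. If a
* concurrent recovery has already created a live entry for the key, an IllegalStateException is
* thrown. Recovered tombstones are scheduled so they do not time out too soon.
*/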
@Override
@Retained // Region entry may contain an off-heap value
public RegionEntry initRecoveredEntry(Object key, DiskEntry.RecoveredEntry value) {
boolean needsCallback = false;
@Retained
RegionEntry newRe =
getEntryFactory().createEntry((RegionEntryContext) _getOwnerObject(), key, value);
synchronized (newRe) {
if (value.getVersionTag() != null && newRe.getVersionStamp() != null) {
newRe.getVersionStamp().setVersions(value.getVersionTag());
}
RegionEntry oldRe = putEntryIfAbsent(key, newRe);
while (oldRe != null) {
synchronized (oldRe) {
if (oldRe.isRemoved() && !oldRe.isTombstone()) {
if (_isOwnerALocalRegion()) {
_getOwner().getCachePerfStats().incRetries();
}
getEntryMap().remove(key, oldRe);
oldRe = putEntryIfAbsent(key, newRe);
}
/*
* Entry already exists which should be impossible. Free the current entry (if off-heap)
* and throw an exception.
*/
else {
if (newRe instanceof OffHeapRegionEntry) {
((OffHeapRegionEntry) newRe).release();
}
throw new IllegalStateException(
"Could not recover entry for key " + key + ". The entry already exists!");
}
} // synchronized
}
if (_isOwnerALocalRegion()) {
if (newRe.isTombstone()) {
// refresh the tombstone so it doesn't time out too soon
_getOwner().scheduleTombstone(newRe, newRe.getVersionStamp().asVersionTag());
} else {
_getOwner().updateSizeOnCreate(key, _getOwner().calculateRegionEntryValueSize(newRe));
}
// incEntryCount is called for a tombstone because scheduleTombstone does entryCount--.
incEntryCount(1); // we are creating an entry that was recovered from disk including
// tombstone
}
lruEntryUpdate(newRe);
needsCallback = true;
}
if (needsCallback) {
lruUpdateCallback();
}
EntryLogger.logRecovery(_getOwnerObject(), key, value);
return newRe;
}
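/**
* Updates an existing entry with a value recovered from disk, adjusting region size and
* tombstone scheduling to match the entry's new state.
*
* @return the updated entry, or null if the key is unknown or the entry has been removed
*/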
@Override
public RegionEntry updateRecoveredEntry(Object key, DiskEntry.RecoveredEntry value) {
boolean needsCallback = false;
RegionEntry re = getEntry(key);
if (re == null) {
return null;
}
synchronized (re) {
if (re.isRemoved() && !re.isTombstone()) {
return null;
}
if (value.getVersionTag() != null && re.getVersionStamp() != null) {
re.getVersionStamp().setVersions(value.getVersionTag());
}
try {
if (_isOwnerALocalRegion()) {
boolean oldValueWasTombstone = re.isTombstone();
boolean oldIsDestroyedOrRemoved = re.isDestroyedOrRemoved();
if (oldValueWasTombstone) {
// when a tombstone is to be overwritten, unschedule it first
_getOwner().unscheduleTombstone(re);
// unscheduleTombstone incs entryCount which is ok
// because we either set the value after this so that
// the entry exists or we call scheduleTombstone which
// will dec entryCount.
}
final int oldSize = _getOwner().calculateRegionEntryValueSize(re);
re.setValue(_getOwner(), value); // OFFHEAP no need to call
// AbstractRegionMap.prepareValueForCache because
// setValue is overridden for disk and that code takes
// apart value (RecoveredEntry) and prepares its nested
// value for the cache
if (re.isTombstone()) {
_getOwner().scheduleTombstone(re, re.getVersionStamp().asVersionTag());
if (!oldIsDestroyedOrRemoved) {
_getOwner().updateSizeOnRemove(key, oldSize);
}
} else if (oldIsDestroyedOrRemoved) {
_getOwner().updateSizeOnCreate(key, _getOwner().calculateRegionEntryValueSize(re));
} else {
_getOwner().updateSizeOnPut(key, oldSize,
_getOwner().calculateRegionEntryValueSize(re));
}
} else {
value.applyToDiskEntry((PlaceHolderDiskRegion) _getOwnerObject(), (DiskEntry) re,
(RegionEntryContext) _getOwnerObject());
}
} catch (RegionClearedException rce) {
throw new IllegalStateException(
"RegionClearedException should never happen in this context", rce);
}
lruEntryUpdate(re);
needsCallback = true;
}
if (needsCallback) {
lruUpdateCallback();
}
EntryLogger.logRecovery(_getOwnerObject(), key, value);
return re;
}
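/**
* Puts an entry received from a getInitialImage (GII) provider into the map. When concurrency
* checks are enabled the supplied version tag is processed for conflict detection, tombstones
* are scheduled or discarded based on the region version vector, and OQL indexes are updated to
* track the entry's transitions.
*
* @return true if the map was modified by this image put
*/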
@Override
public boolean initialImagePut(final Object key, final long lastModified, Object newValue,
final boolean wasRecovered, boolean deferLRUCallback,
VersionTag entryVersion,
InternalDistributedMember sender, boolean isSynchronizing) {
boolean result = false;
boolean done = false;
boolean cleared = false;
final LocalRegion owner = _getOwner();
boolean acceptedVersionTag = entryVersion != null && owner.getConcurrencyChecksEnabled();
if (newValue == Token.TOMBSTONE && !owner.getConcurrencyChecksEnabled()) {
return false;
}
if (owner instanceof HARegion && newValue instanceof CachedDeserializable) {
newValue = ((HARegion) owner).updateHAEventWrapper(sender, (CachedDeserializable) newValue);
if (newValue == null) {
return false;
}
}
try {
RegionEntry newRe = getEntryFactory().createEntry(owner, key, Token.REMOVED_PHASE1);
RegionEntry oldRe = null;
synchronized (newRe) {
try {
oldRe = putEntryIfAbsent(key, newRe);
while (!done && oldRe != null) {
synchronized (oldRe) {
if (oldRe.isRemovedPhase2()) {
owner.getCachePerfStats().incRetries();
getEntryMap().remove(key, oldRe);
oldRe = putEntryIfAbsent(key, newRe);
} else {
if (acceptedVersionTag) {
Assert.assertTrue(entryVersion.getMemberID() != null,
"GII entry versions must have identifiers");
boolean isTombstone = (newValue == Token.TOMBSTONE);
// don't reschedule the tombstone if it hasn't changed
boolean isSameTombstone = oldRe.isTombstone() && isTombstone
&& oldRe.getVersionStamp().asVersionTag().equals(entryVersion);
if (isSameTombstone) {
return true;
}
processVersionTagForGII(oldRe, owner, entryVersion, isTombstone, sender,
!wasRecovered || isSynchronizing);
}
final boolean oldIsTombstone = oldRe.isTombstone();
final boolean oldIsDestroyedOrRemoved = oldRe.isDestroyedOrRemoved();
final int oldSize = owner.calculateRegionEntryValueSize(oldRe);
if (owner.getIndexManager() != null) {
// Due to having no reverse map, we need to be able to generate the oldkey
// before doing an update
// Without the BEFORE_UPDATE_OP, we would see duplicate entries in the index
// as the update could not locate the old key
if (!oldRe.isRemoved()) {
owner.getIndexManager().updateIndexes(oldRe, IndexManager.REMOVE_ENTRY,
IndexProtocol.BEFORE_UPDATE_OP);
}
}
result = oldRe.initialImagePut(owner, lastModified, newValue, wasRecovered,
acceptedVersionTag);
if (result) {
if (oldIsTombstone) {
owner.unscheduleTombstone(oldRe);
if (newValue != Token.TOMBSTONE) {
lruEntryCreate(oldRe);
} else {
lruEntryUpdate(oldRe);
}
}
if (newValue == Token.TOMBSTONE) {
if (!oldIsDestroyedOrRemoved) {
owner.updateSizeOnRemove(key, oldSize);
}
if (owner.getServerProxy() == null
&& owner.getVersionVector().isTombstoneTooOld(
entryVersion.getMemberID(), entryVersion.getRegionVersion())) {
// the received tombstone has already been reaped, so don't retain it
if (owner.getIndexManager() != null) {
owner.getIndexManager().updateIndexes(oldRe, IndexManager.REMOVE_ENTRY,
IndexProtocol.REMOVE_DUE_TO_GII_TOMBSTONE_CLEANUP);
}
removeTombstone(oldRe, entryVersion, false, false);
return false;
} else {
owner.scheduleTombstone(oldRe, entryVersion);
lruEntryDestroy(oldRe);
}
} else {
int newSize = owner.calculateRegionEntryValueSize(oldRe);
if (!oldIsTombstone) {
owner.updateSizeOnPut(key, oldSize, newSize);
} else {
owner.updateSizeOnCreate(key, newSize);
}
EntryLogger.logInitialImagePut(_getOwnerObject(), key, newValue);
}
}
if (owner.getIndexManager() != null) {
// if existing/current re is a tombstone - note oldRe at this point is currentRe
if (oldRe.isRemoved()) {
owner.getIndexManager().updateIndexes(oldRe, IndexManager.REMOVE_ENTRY,
IndexProtocol.REMOVE_DUE_TO_GII_TOMBSTONE_CLEANUP);
} else {
owner.getIndexManager().updateIndexes(oldRe,
oldIsDestroyedOrRemoved ? IndexManager.ADD_ENTRY
: IndexManager.UPDATE_ENTRY,
oldIsDestroyedOrRemoved ? IndexProtocol.OTHER_OP
: IndexProtocol.AFTER_UPDATE_OP);
}
}
done = true;
}
}
}
if (!done) {
if (acceptedVersionTag) {
Assert.assertTrue(entryVersion.getMemberID() != null,
"GII entry versions must have identifiers");
boolean isTombstone = (newValue == Token.TOMBSTONE);
processVersionTagForGII(newRe, owner, entryVersion, isTombstone, sender,
!wasRecovered || isSynchronizing);
}
result = newRe.initialImageInit(owner, lastModified, newValue, true, wasRecovered,
acceptedVersionTag);
if (result) {
if (newValue == Token.TOMBSTONE) {
owner.scheduleTombstone(newRe, entryVersion);
} else {
owner.updateSizeOnCreate(key, owner.calculateRegionEntryValueSize(newRe));
EntryLogger.logInitialImagePut(_getOwnerObject(), key, newValue);
lruEntryCreate(newRe);
}
incEntryCount(1);
}
// Update local indexes
if (owner.getIndexManager() != null && !newRe.isRemoved()) {
owner.getIndexManager().updateIndexes(newRe,
IndexManager.ADD_ENTRY,
IndexProtocol.OTHER_OP);
}
done = true;
}
} catch (ConcurrentCacheModificationException e) {
// We do not want to do any clean up of indexes because it is assumed that
// the cause of the concurrent modification would have updated the indexes appropriately
return false;
} finally {
if (done && result) {
if (owner instanceof BucketRegionQueue) {
BucketRegionQueue brq = (BucketRegionQueue) owner;
brq.addToEventQueue(key, done, null);
}
}
if (!done) {
removeEntry(key, newRe, false);
// Update local indexes
if (owner.getIndexManager() != null && !newRe.isRemoved()) {
// attempt to clean up any thread local state,
// not intended to actually do any removal
try {
owner.getIndexManager().updateIndexes(newRe,
IndexManager.REMOVE_ENTRY,
IndexProtocol.CLEAN_UP_THREAD_LOCALS);
} catch (QueryException qe) {
logger.info("Unable to clean up thread locals for indexes", qe);
}
}
}
}
} // synchronized
} catch (RegionClearedException rce) {
done = false;
cleared = true;
} catch (QueryException qe) {
done = false;
cleared = true;
} finally {
if (done && !deferLRUCallback) {
lruUpdateCallback();
} else if (!cleared) {
resetThreadLocals();
}
}
return result;
}
@Override
public boolean confirmEvictionDestroy(RegionEntry regionEntry) {
/* We aren't in an LRU context, and should never get here */
Assert.assertTrue(false, "Not an LRU region, can not confirm LRU eviction operation");
return true;
}
@Override
public boolean destroy(EntryEventImpl event, boolean inTokenMode, boolean duringRI,
boolean cacheWrite, boolean isEviction, Object expectedOldValue, boolean removeRecoveredEntry)
throws CacheWriterException, EntryNotFoundException, TimeoutException {
RegionMapDestroy regionMapDestroy = new RegionMapDestroy((InternalRegion) owner, this, this);
return regionMapDestroy.destroy(event, inTokenMode, duringRI, cacheWrite, isEviction,
expectedOldValue, removeRecoveredEntry);
}
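/**
* Applies a transactional destroy to this map. Depending on token mode and concurrency checks,
* the entry is marked DESTROYED, converted to a tombstone, or removed outright. A callback
* event is created and appended to pendingCallbacks for later listener invocation.
*/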
@Override
public void txApplyDestroy(Object key, TransactionId txId, TXRmtEvent txEvent,
boolean inTokenMode, boolean inRI, Operation op, EventID eventId, Object aCallbackArgument,
List<EntryEventImpl> pendingCallbacks, FilterRoutingInfo filterRoutingInfo,
ClientProxyMembershipID bridgeContext, boolean isOriginRemote, TXEntryState txEntryState,
VersionTag versionTag, long tailKey) {
assert pendingCallbacks != null;
final boolean isDebugEnabled = logger.isDebugEnabled();
final LocalRegion owner = _getOwner();
final boolean isRegionReady = !inTokenMode;
final boolean hasRemoteOrigin = !txId.getMemberId().equals(owner.getMyId());
boolean callbackEventAddedToPending = false;
IndexManager oqlIndexManager = owner.getIndexManager();
final boolean locked = owner.lockWhenRegionIsInitializing();
try {
RegionEntry re = getEntry(key);
if (re != null) {
// Fix for Bug #44431. We do NOT want to update the region and wait
// later for index INIT as region.clear() can cause inconsistency if
// happened in parallel as it also does index INIT.
if (oqlIndexManager != null) {
oqlIndexManager.waitForIndexInit();
}
try {
synchronized (re) {
if (!re.isRemoved() || re.isTombstone()) {
Object oldValue = re.getValueInVM(owner);
final int oldSize = owner.calculateRegionEntryValueSize(re);
final boolean wasDestroyedOrRemoved = re.isDestroyedOrRemoved();
// Create an entry event only if the calling context is
// a receipt of a TXCommitMessage AND there are callbacks installed
// for this region
@Released
final EntryEventImpl callbackEvent = txCallbackEventFactory
.createCallbackEvent(owner, op, key, null, txId,
txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext,
txEntryState, versionTag, tailKey);
try {
if (owner.isUsedForPartitionedRegionBucket()) {
txHandleWANEvent(owner, callbackEvent, txEntryState);
}
callbackEvent.setRegionEntry(re);
callbackEvent.setOldValue(oldValue);
if (isDebugEnabled) {
logger.debug("txApplyDestroy callbackEvent={}", callbackEvent);
}
txRemoveOldIndexEntry(Operation.DESTROY, re);
if (txEvent != null) {
txEvent.addDestroy(owner, re, re.getKey(), aCallbackArgument);
}
boolean clearOccured = false;
try {
processAndGenerateTXVersionTag(callbackEvent, re, txEntryState);
if (inTokenMode) {
if (oldValue == Token.TOMBSTONE) {
owner.unscheduleTombstone(re);
}
re.setValue(owner, Token.DESTROYED);
} else {
if (!re.isTombstone()) {
if (owner.getConcurrencyChecksEnabled()
&& callbackEvent.getVersionTag() != null) {
re.makeTombstone(owner, callbackEvent.getVersionTag());
} else {
re.removePhase1(owner, false); // fix for bug 43063
re.removePhase2();
removeEntry(key, re, false);
}
} else {
owner.rescheduleTombstone(re, re.getVersionStamp().asVersionTag());
}
}
EntryLogger.logTXDestroy(_getOwnerObject(), key);
if (!wasDestroyedOrRemoved) {
owner.updateSizeOnRemove(key, oldSize);
}
} catch (RegionClearedException rce) {
clearOccured = true;
}
owner.txApplyDestroyPart2(re, re.getKey(), inTokenMode,
clearOccured /* Clear Conflicting with the operation */,
wasDestroyedOrRemoved);
boolean invokeCallbacks = shouldInvokeCallbacks(owner, isRegionReady || inRI);
if (invokeCallbacks) {
switchEventOwnerAndOriginRemote(callbackEvent, hasRemoteOrigin);
pendingCallbacks.add(callbackEvent);
callbackEventAddedToPending = true;
}
if (!clearOccured) {
lruEntryDestroy(re);
}
if (owner.getConcurrencyChecksEnabled() && txEntryState != null) {
txEntryState.setVersionTag(callbackEvent.getVersionTag());
}
} finally {
if (!callbackEventAddedToPending)
releaseEvent(callbackEvent);
}
}
}
} finally {
if (oqlIndexManager != null) {
oqlIndexManager.countDownIndexUpdaters();
}
}
} else if (inTokenMode || owner.getConcurrencyChecksEnabled()) {
// treating tokenMode and re == null as same, since we now want to
// generate versions and Tombstones for destroys
boolean opCompleted = false;
RegionEntry newRe = getEntryFactory().createEntry(owner, key, Token.REMOVED_PHASE1);
if (oqlIndexManager != null) {
oqlIndexManager.waitForIndexInit();
}
EntryEventImpl callbackEvent = null;
try {
synchronized (newRe) {
RegionEntry oldRe = putEntryIfAbsent(key, newRe);
while (!opCompleted && oldRe != null) {
synchronized (oldRe) {
if (oldRe.isRemovedPhase2()) {
owner.getCachePerfStats().incRetries();
getEntryMap().remove(key, oldRe);
oldRe = putEntryIfAbsent(key, newRe);
} else {
try {
boolean invokeCallbacks = shouldInvokeCallbacks(owner, isRegionReady || inRI);
callbackEvent = txCallbackEventFactory
.createCallbackEvent(owner, op, key, null, txId, txEvent,
eventId, aCallbackArgument, filterRoutingInfo, bridgeContext,
txEntryState,
versionTag, tailKey);
try {
callbackEvent.setRegionEntry(oldRe);
callbackEvent.setOldValue(Token.NOT_AVAILABLE);
if (isDebugEnabled) {
logger.debug("txApplyDestroy token mode callbackEvent={}", callbackEvent);
}
if (owner.isUsedForPartitionedRegionBucket()) {
txHandleWANEvent(owner, callbackEvent, txEntryState);
}
processAndGenerateTXVersionTag(callbackEvent, oldRe, txEntryState);
if (invokeCallbacks) {
switchEventOwnerAndOriginRemote(callbackEvent, hasRemoteOrigin);
pendingCallbacks.add(callbackEvent);
callbackEventAddedToPending = true;
}
int oldSize = 0;
boolean wasTombstone = oldRe.isTombstone();
boolean wasDestroyedOrRemoved = oldRe.isDestroyedOrRemoved();
if (!wasTombstone) {
oldSize = owner.calculateRegionEntryValueSize(oldRe);
}
// TODO: Token.DESTROYED should only be used if "inTokenMode".
// Otherwise this should be a TOMBSTONE
oldRe.setValue(owner, Token.DESTROYED);
EntryLogger.logTXDestroy(_getOwnerObject(), key);
if (wasTombstone) {
owner.unscheduleTombstone(oldRe);
}
if (!wasDestroyedOrRemoved) {
owner.updateSizeOnRemove(oldRe.getKey(), oldSize);
}
owner.txApplyDestroyPart2(oldRe, oldRe.getKey(), inTokenMode,
false /* Clear Conflicting with the operation */, wasDestroyedOrRemoved);
lruEntryDestroy(oldRe);
} finally {
if (!callbackEventAddedToPending)
releaseEvent(callbackEvent);
}
} catch (RegionClearedException rce) {
owner.txApplyDestroyPart2(oldRe, oldRe.getKey(), inTokenMode,
true /* Clear Conflicting with the operation */, true);
}
if (owner.getConcurrencyChecksEnabled()
&& callbackEvent.getVersionTag() != null) {
oldRe.makeTombstone(owner, callbackEvent.getVersionTag());
} else if (!inTokenMode) {
// only remove for NORMAL regions if they do not generate versions see 51781
oldRe.removePhase1(owner, false); // fix for bug 43063
oldRe.removePhase2();
removeEntry(key, oldRe, false);
}
opCompleted = true;
}
}
}
if (!opCompleted) {
opCompleted = true;
boolean invokeCallbacks = shouldInvokeCallbacks(owner, isRegionReady || inRI);
callbackEvent = txCallbackEventFactory
.createCallbackEvent(owner, op, key, null, txId, txEvent, eventId,
aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag,
tailKey);
try {
callbackEvent.setRegionEntry(newRe);
callbackEvent.setOldValue(Token.NOT_AVAILABLE);
if (isDebugEnabled) {
logger.debug("txApplyDestroy token mode callbackEvent={}", callbackEvent);
}
if (owner.isUsedForPartitionedRegionBucket()) {
txHandleWANEvent(owner, callbackEvent, txEntryState);
}
processAndGenerateTXVersionTag(callbackEvent, newRe, txEntryState);
if (invokeCallbacks) {
switchEventOwnerAndOriginRemote(callbackEvent, hasRemoteOrigin);
pendingCallbacks.add(callbackEvent);
callbackEventAddedToPending = true;
}
EntryLogger.logTXDestroy(_getOwnerObject(), key);
if (owner.getConcurrencyChecksEnabled() && callbackEvent.getVersionTag() != null) {
newRe.makeTombstone(owner, callbackEvent.getVersionTag());
} else if (!inTokenMode) {
// only remove for NORMAL regions if they do not generate versions see 51781
newRe.removePhase1(owner, false); // fix for bug 43063
newRe.removePhase2();
removeEntry(key, newRe, false);
} else {
newRe.setValue(owner, Token.DESTROYED);
}
owner.txApplyDestroyPart2(newRe, newRe.getKey(), inTokenMode,
false /* clearConflict */, true);
// Note no need for LRU work since the entry is destroyed
// and will be removed when gii completes
} finally {
if (!callbackEventAddedToPending)
releaseEvent(callbackEvent);
}
}
if (owner.getConcurrencyChecksEnabled() && txEntryState != null) {
txEntryState.setVersionTag(callbackEvent.getVersionTag());
}
}
} catch (RegionClearedException e) {
// TODO
} finally {
if (oqlIndexManager != null) {
oqlIndexManager.countDownIndexUpdaters();
}
}
} else { // re == null
// Fix bug#43594
// When a bucket region is re-created, the destroy may already have been
// applied on the initial image provider, so the region entry can be absent.
// Notify clients with client events.
@Released
EntryEventImpl callbackEvent =
txCallbackEventFactory
.createCallbackEvent(owner, op, key, null, txId, txEvent, eventId,
aCallbackArgument,
filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
try {
if (owner.isUsedForPartitionedRegionBucket()) {
txHandleWANEvent(owner, callbackEvent, txEntryState);
}
switchEventOwnerAndOriginRemote(callbackEvent, hasRemoteOrigin);
pendingCallbacks.add(callbackEvent);
callbackEventAddedToPending = true;
} finally {
if (!callbackEventAddedToPending)
releaseEvent(callbackEvent);
}
}
} catch (DiskAccessException dae) {
owner.handleDiskAccessException(dae);
throw dae;
} finally {
if (locked) {
owner.unlockWhenRegionIsInitializing();
}
}
}
void releaseEvent(final EntryEventImpl event) {
event.release();
}
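/**
* Invalidates the entry for the event's key. With forceNewEntry, a placeholder entry is created
* when none exists (for example during GII); otherwise a missing entry causes
* checkEntryNotFound to be invoked. On client regions the server is asked to invalidate first,
* and the operation is abandoned if no version tag is received while concurrency checks are
* enabled.
*
* @return true if an entry was invalidated
*/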
@Override
public boolean invalidate(EntryEventImpl event, boolean invokeCallbacks, boolean forceNewEntry,
boolean forceCallbacks) throws EntryNotFoundException {
final boolean isDebugEnabled = logger.isDebugEnabled();
final LocalRegion owner = _getOwner();
if (owner == null) {
// "fix" for bug 32440
Assert.assertTrue(false, "The owner for RegionMap " + this + " is null for event " + event);
}
if (isDebugEnabled) {
logger.debug("ARM.invalidate invoked for key {}", event.getKey());
}
boolean didInvalidate = false;
RegionEntry invalidatedRe = null;
boolean clearOccured = false;
DiskRegion dr = owner.getDiskRegion();
boolean ownerIsInitialized = owner.isInitialized();
// Fix for Bug #44431. We do NOT want to update the region and wait
// later for index INIT as region.clear() can cause inconsistency if
// happened in parallel as it also does index INIT.
IndexManager oqlIndexManager = owner.getIndexManager();
if (oqlIndexManager != null) {
oqlIndexManager.waitForIndexInit();
}
lockForCacheModification(owner, event);
final boolean locked = owner.lockWhenRegionIsInitializing();
try {
try {
try {
if (forceNewEntry || forceCallbacks) {
boolean opCompleted = false;
RegionEntry newRe =
getEntryFactory().createEntry(owner, event.getKey(), Token.REMOVED_PHASE1);
synchronized (newRe) {
try {
RegionEntry oldRe = putEntryIfAbsent(event.getKey(), newRe);
while (!opCompleted && oldRe != null) {
synchronized (oldRe) {
// if the RE is in phase 2 of removal, it will really be removed
// from the map. Otherwise, we can use it here and the thread
// that is destroying the RE will see the invalidation and not
// proceed to phase 2 of removal.
if (oldRe.isRemovedPhase2()) {
owner.getCachePerfStats().incRetries();
getEntryMap().remove(event.getKey(), oldRe);
oldRe = putEntryIfAbsent(event.getKey(), newRe);
} else {
opCompleted = true;
event.setRegionEntry(oldRe);
if (oldRe.isDestroyed()) {
if (isDebugEnabled) {
logger.debug(
"mapInvalidate: Found DESTROYED token, not invalidated; key={}",
event.getKey());
}
} else if (oldRe.isInvalid()) {
// was already invalid, do not invoke listeners or increment stat
handleAlreadyInvalidEntry(event, owner, oldRe);
try {
oldRe.setValue(owner, oldRe.getValueInVM(owner)); // OFFHEAP noop setting
// an already invalid to
// invalid; No need to
// call
// prepareValueForCache
// since it is an
// invalid token.
} catch (RegionClearedException e) {
// that's okay - when writing an invalid into a disk, the
// region has been cleared (including this token)
}
} else {
owner.serverInvalidate(event);
if (owner.getConcurrencyChecksEnabled()
&& event.noVersionReceivedFromServer()) {
// server did not perform the invalidation, so don't leave an invalid
// entry here
return false;
}
final int oldSize = owner.calculateRegionEntryValueSize(oldRe);
// added for cq which needs old value. rdubey
FilterProfile fp = owner.getFilterProfile();
if (!oldRe.isRemoved() && (fp != null && fp.getCqCount() > 0)) {
Object oldValue = oldRe.getValueInVM(owner); // OFFHEAP EntryEventImpl
// oldValue
// this will not fault in the value.
if (oldValue == Token.NOT_AVAILABLE) {
event.setOldValue(oldRe.getValueOnDiskOrBuffer(owner));
} else {
event.setOldValue(oldValue);
}
}
boolean isCreate = false;
try {
if (oldRe.isRemoved()) {
processVersionTag(oldRe, event);
event.putNewEntry(owner, oldRe);
EntryLogger.logInvalidate(event);
owner.recordEvent(event);
if (!oldRe.isTombstone()) {
owner.updateSizeOnPut(event.getKey(), oldSize,
event.getNewValueBucketSize());
} else {
owner.updateSizeOnCreate(event.getKey(),
event.getNewValueBucketSize());
isCreate = true;
}
} else {
processVersionTag(oldRe, event);
event.putExistingEntry(owner, oldRe);
EntryLogger.logInvalidate(event);
owner.recordEvent(event);
owner.updateSizeOnPut(event.getKey(), oldSize,
event.getNewValueBucketSize());
}
} catch (RegionClearedException e) {
// generate versionTag for the event
EntryLogger.logInvalidate(event);
owner.recordEvent(event);
clearOccured = true;
}
owner.basicInvalidatePart2(oldRe, event,
clearOccured /* conflict with clear */, invokeCallbacks);
if (!clearOccured) {
if (isCreate) {
lruEntryCreate(oldRe);
} else {
lruEntryUpdate(oldRe);
}
}
didInvalidate = true;
invalidatedRe = oldRe;
}
}
} // synchronized oldRe
} // while oldRe exists
if (!opCompleted) {
if (forceNewEntry && event.isFromServer()) {
// don't invoke listeners - we didn't force new entries for
// CCU invalidations before 7.0, and listeners don't care
if (!FORCE_INVALIDATE_EVENT) {
event.inhibitCacheListenerNotification(true);
}
}
event.setRegionEntry(newRe);
owner.serverInvalidate(event);
if (!forceNewEntry && event.noVersionReceivedFromServer()) {
// server did not perform the invalidation, so don't leave an invalid
// entry here
return false;
}
try {
ownerIsInitialized = owner.isInitialized();
if (!ownerIsInitialized && owner.getDataPolicy().withReplication()) {
final int oldSize = owner.calculateRegionEntryValueSize(newRe);
invalidateEntry(event, newRe, oldSize);
} else {
invalidateNewEntry(event, owner, newRe);
}
} catch (RegionClearedException e) {
// TODO: deltaGII: do we even need RegionClearedException?
// generate versionTag for the event
owner.recordEvent(event);
clearOccured = true;
}
owner.basicInvalidatePart2(newRe, event, clearOccured /* conflict with clear */,
invokeCallbacks);
if (!clearOccured) {
lruEntryCreate(newRe);
incEntryCount(1);
}
opCompleted = true;
didInvalidate = true;
invalidatedRe = newRe;
// Don't leave an entry in the cache, if we
// just wanted to force the distribution and events
// for this invalidate
if (!forceNewEntry) {
removeEntry(event.getKey(), newRe, false);
}
} // !opCompleted
} catch (ConcurrentCacheModificationException ccme) {
VersionTag tag = event.getVersionTag();
if (tag != null && tag.isTimeStampUpdated()) {
// Notify gateways of new time-stamp.
owner.notifyTimestampsToGateways(event);
}
throw ccme;
} finally {
if (!opCompleted) {
removeEntry(event.getKey(), newRe, false);
}
}
} // synchronized newRe
} // forceNewEntry
else { // !forceNewEntry
boolean retry = true;
// RegionEntry retryEntry = null;
// int retries = -1;
while (retry) {
retry = false;
boolean entryExisted = false;
RegionEntry re = getEntry(event.getKey());
RegionEntry tombstone = null;
boolean haveTombstone = false;
if (re != null && re.isTombstone()) {
tombstone = re;
haveTombstone = true;
re = null;
}
if (re == null) {
ownerIsInitialized = owner.isInitialized();
if (!ownerIsInitialized) {
// when GII message arrived or processed later than invalidate
// message, the entry should be created as placeholder
RegionEntry newRe = haveTombstone ? tombstone
: getEntryFactory().createEntry(owner, event.getKey(), Token.INVALID);
synchronized (newRe) {
if (haveTombstone && !tombstone.isTombstone()) {
// state of the tombstone has changed so we need to retry
retry = true;
// retryEntry = tombstone; // leave this in place for debugging
continue;
}
re = putEntryIfAbsent(event.getKey(), newRe);
if (re == tombstone) {
re = null; // pretend we don't have an entry
}
}
} else if (owner.getServerProxy() != null) {
Object sync = haveTombstone ? tombstone : new Object();
synchronized (sync) {
if (haveTombstone && !tombstone.isTombstone()) {
// bug 45295: state of the tombstone has changed so we need to retry
retry = true;
// retryEntry = tombstone; // leave this in place for debugging
continue;
}
// bug #43287 - send event to server even if it's not in the client (LRU may
// have evicted it)
owner.serverInvalidate(event);
if (owner.getConcurrencyChecksEnabled()) {
if (event.getVersionTag() == null) {
// server did not perform the invalidation, so don't leave an invalid
// entry here
return false;
} else if (tombstone != null) {
processVersionTag(tombstone, event);
try {
tombstone.setValue(owner, Token.TOMBSTONE);
} catch (RegionClearedException e) {
// that's okay - when writing a tombstone into a disk, the
// region has been cleared (including this tombstone)
} catch (ConcurrentCacheModificationException ccme) {
VersionTag tag = event.getVersionTag();
if (tag != null && tag.isTimeStampUpdated()) {
// Notify gateways of new time-stamp.
owner.notifyTimestampsToGateways(event);
}
throw ccme;
}
// update the tombstone's version to prevent an older CCU/putAll from
// overwriting it
owner.rescheduleTombstone(tombstone, event.getVersionTag());
}
}
}
entryExisted = true;
}
}
if (re != null) {
// Gester: Race condition in GII
// when adding the placeholder for invalidate entry during GII,
// if the GII got processed earlier for this entry, then do
// normal invalidate operation
synchronized (re) {
if (!event.isOriginRemote() && event.getOperation().isExpiration()) {
// If this expiration started locally then only do it if the RE is not being
// used by a tx.
if (re.isInUseByTransaction()) {
return false;
}
}
if (re.isTombstone() || (!re.isRemoved() && !re.isDestroyed())) {
entryExisted = true;
if (re.isInvalid()) {
// was already invalid, do not invoke listeners or increment
// stat
handleAlreadyInvalidEntry(event, owner, re);
} else { // previous value not invalid
event.setRegionEntry(re);
owner.serverInvalidate(event);
if (owner.getConcurrencyChecksEnabled()
&& event.noVersionReceivedFromServer()) {
// server did not perform the invalidation, so don't leave an invalid
// entry here
if (isDebugEnabled) {
logger.debug(
"returning early because server did not generate a version stamp for this event:{}",
event);
}
return false;
}
// in case of overflow to disk we need the old value for cqs.
if (owner.getFilterProfile().getCqCount() > 0) {
// used to be getValue, which could cause a deadlock. rdubey.
if (re.isValueNull()) {
event.setOldValue(re.getValueOnDiskOrBuffer(owner));
} else {
Object v = re.getValueInVM(owner);
event.setOldValue(v); // OFFHEAP escapes to EntryEventImpl oldValue
}
}
final boolean oldWasTombstone = re.isTombstone();
final int oldSize = _getOwner().calculateRegionEntryValueSize(re);
try {
invalidateEntry(event, re, oldSize);
} catch (RegionClearedException rce) {
// generate versionTag for the event
EntryLogger.logInvalidate(event);
_getOwner().recordEvent(event);
clearOccured = true;
} catch (ConcurrentCacheModificationException ccme) {
VersionTag tag = event.getVersionTag();
if (tag != null && tag.isTimeStampUpdated()) {
// Notify gateways of new time-stamp.
owner.notifyTimestampsToGateways(event);
}
throw ccme;
}
owner.basicInvalidatePart2(re, event, clearOccured /* conflict with clear */,
invokeCallbacks);
if (!clearOccured) {
if (oldWasTombstone) {
lruEntryCreate(re);
} else {
lruEntryUpdate(re);
}
}
didInvalidate = true;
invalidatedRe = re;
} // previous value not invalid
}
} // synchronized re
} // re != null
else {
// At this point, either it's not in GII mode, or the placeholder
// is in region, do nothing
}
if (!entryExisted) {
owner.checkEntryNotFound(event.getKey());
}
} // while(retry)
} // !forceNewEntry
} catch (DiskAccessException dae) {
invalidatedRe = null;
didInvalidate = false;
this._getOwner().handleDiskAccessException(dae);
throw dae;
} finally {
if (oqlIndexManager != null) {
oqlIndexManager.countDownIndexUpdaters();
}
if (invalidatedRe != null) {
owner.basicInvalidatePart3(invalidatedRe, event, invokeCallbacks);
}
if (didInvalidate && !clearOccured) {
try {
lruUpdateCallback();
} catch (DiskAccessException dae) {
this._getOwner().handleDiskAccessException(dae);
throw dae;
}
} else if (!didInvalidate) {
resetThreadLocals();
}
}
return didInvalidate;
} finally {
if (ownerIsInitialized) {
forceInvalidateEvent(event, owner);
}
}
} finally {
if (locked) {
owner.unlockWhenRegionIsInitializing();
}
releaseCacheModificationLock(owner, event);
}
}
/**
* If an entry is already invalid we still want to perform a conflict check, update
* the entry's version stamp and invoke listeners.
*/
private void handleAlreadyInvalidEntry(EntryEventImpl event, LocalRegion owner, RegionEntry re) {
if (logger.isDebugEnabled()) {
logger.debug("Invalidate: Entry already invalid: '{}'", event.getKey());
}
processVersionTag(re, event);
if (owner.getConcurrencyChecksEnabled() && event.hasValidVersionTag()) {
// notify clients so they can update their version stamps
event.invokeCallbacks(owner, true, true);
}
}
private void invalidateNewEntry(EntryEventImpl event, final LocalRegion owner, RegionEntry newRe)
throws RegionClearedException {
processVersionTag(newRe, event);
event.putNewEntry(owner, newRe);
owner.recordEvent(event);
owner.updateSizeOnCreate(event.getKey(), event.getNewValueBucketSize());
}
protected void invalidateEntry(EntryEventImpl event, RegionEntry re, int oldSize)
throws RegionClearedException {
processVersionTag(re, event);
event.putExistingEntry(_getOwner(), re);
EntryLogger.logInvalidate(event);
_getOwner().recordEvent(event);
_getOwner().updateSizeOnPut(event.getKey(), oldSize, event.getNewValueBucketSize());
}
/*
* (non-Javadoc)
*
* @see
* org.apache.geode.internal.cache.RegionMap#updateEntryVersion(org.apache.geode.internal.cache.
* EntryEventImpl)
*/
@Override
public void updateEntryVersion(EntryEventImpl event) throws EntryNotFoundException {
final LocalRegion owner = _getOwner();
if (owner == null) {
// "fix" for bug 32440
Assert.assertTrue(false, "The owner for RegionMap " + this + " is null for event " + event);
}
DiskRegion dr = owner.getDiskRegion();
if (dr != null) {
dr.setClearCountReference();
}
lockForCacheModification(owner, event);
final boolean locked = owner.lockWhenRegionIsInitializing();
try {
RegionEntry re = getEntry(event.getKey());
boolean entryExisted = false;
if (re != null) {
// process version tag
synchronized (re) {
try {
if (re.isTombstone() || (!re.isRemoved() && !re.isDestroyed())) {
entryExisted = true;
}
processVersionTag(re, event);
owner.generateAndSetVersionTag(event, re);
EntryLogger.logUpdateEntryVersion(event);
_getOwner().recordEvent(event);
} catch (ConcurrentCacheModificationException ccme) {
// Do nothing.
}
}
}
if (!entryExisted) {
owner.checkEntryNotFound(event.getKey());
}
} catch (DiskAccessException dae) {
this._getOwner().handleDiskAccessException(dae);
throw dae;
} finally {
if (locked) {
owner.unlockWhenRegionIsInitializing();
}
releaseCacheModificationLock(owner, event);
if (dr != null) {
dr.removeClearCountReference();
}
}
}
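/**
* Applies a transactional invalidate to this map. While the owner is still initializing and
* receives all events, a new entry is forced so the invalidation is not lost; otherwise only an
* existing, non-removed entry is invalidated. Callback events are appended to pendingCallbacks
* for later listener invocation.
*/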
@Override
public void txApplyInvalidate(Object key, Object newValue, boolean didDestroy, TransactionId txId,
TXRmtEvent txEvent, boolean localOp, EventID eventId, Object aCallbackArgument,
List<EntryEventImpl> pendingCallbacks, FilterRoutingInfo filterRoutingInfo,
ClientProxyMembershipID bridgeContext, TXEntryState txEntryState, VersionTag versionTag,
long tailKey) {
assert pendingCallbacks != null;
final LocalRegion owner = _getOwner();
@Released
EntryEventImpl callbackEvent = null;
boolean forceNewEntry = !owner.isInitialized() && owner.isAllEvents();
final boolean hasRemoteOrigin = !((TXId) txId).getMemberId().equals(owner.getMyId());
DiskRegion dr = owner.getDiskRegion();
// Fix for Bug #44431. We do NOT want to update the region and wait
// later for index INIT as region.clear() can cause inconsistency if
// happened in parallel as it also does index INIT.
IndexManager oqlIndexManager = owner.getIndexManager();
if (oqlIndexManager != null) {
oqlIndexManager.waitForIndexInit();
}
final boolean locked = owner.lockWhenRegionIsInitializing();
try {
if (forceNewEntry) {
boolean opCompleted = false;
RegionEntry newRe = getEntryFactory().createEntry(owner, key, Token.REMOVED_PHASE1);
synchronized (newRe) {
try {
RegionEntry oldRe = putEntryIfAbsent(key, newRe);
while (!opCompleted && oldRe != null) {
synchronized (oldRe) {
if (oldRe.isRemovedPhase2()) {
owner.getCachePerfStats().incRetries();
getEntryMap().remove(key, oldRe);
oldRe = putEntryIfAbsent(key, newRe);
} else {
opCompleted = true;
final boolean oldWasTombstone = oldRe.isTombstone();
final int oldSize = owner.calculateRegionEntryValueSize(oldRe);
Object oldValue = oldRe.getValueInVM(owner); // OFFHEAP eei
// Create an entry event only if the calling context is
// a receipt of a TXCommitMessage AND there are callbacks
// installed
// for this region
boolean invokeCallbacks = shouldInvokeCallbacks(owner, owner.isInitialized());
boolean callbackEventInPending = false;
callbackEvent = txCallbackEventFactory.createCallbackEvent(owner,
localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE, key, newValue,
txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext,
txEntryState, versionTag, tailKey);
try {
callbackEvent.setRegionEntry(oldRe);
callbackEvent.setOldValue(oldValue);
if (logger.isDebugEnabled()) {
logger.debug("txApplyInvalidate callbackEvent={}", callbackEvent);
}
txRemoveOldIndexEntry(Operation.INVALIDATE, oldRe);
if (didDestroy) {
oldRe.txDidDestroy(owner.cacheTimeMillis());
}
if (txEvent != null) {
txEvent.addInvalidate(owner, oldRe, oldRe.getKey(), newValue,
aCallbackArgument);
}
oldRe.setValueResultOfSearch(false);
processAndGenerateTXVersionTag(callbackEvent, oldRe, txEntryState);
boolean clearOccured = false;
try {
oldRe.setValue(owner, oldRe.prepareValueForCache(owner, newValue, true));
EntryLogger.logTXInvalidate(_getOwnerObject(), key);
owner.updateSizeOnPut(key, oldSize, 0);
if (oldWasTombstone) {
owner.unscheduleTombstone(oldRe);
}
} catch (RegionClearedException rce) {
clearOccured = true;
}
owner.txApplyInvalidatePart2(oldRe, oldRe.getKey(), didDestroy, true);
if (invokeCallbacks) {
switchEventOwnerAndOriginRemote(callbackEvent, hasRemoteOrigin);
pendingCallbacks.add(callbackEvent);
callbackEventInPending = true;
}
if (!clearOccured) {
lruEntryUpdate(oldRe);
}
if (shouldPerformConcurrencyChecks(owner, callbackEvent)
&& txEntryState != null) {
txEntryState.setVersionTag(callbackEvent.getVersionTag());
}
} finally {
if (!callbackEventInPending) {
releaseEvent(callbackEvent);
}
}
}
}
}
if (!opCompleted) {
boolean invokeCallbacks = shouldInvokeCallbacks(owner, owner.isInitialized());
boolean callbackEventInPending = false;
callbackEvent = txCallbackEventFactory.createCallbackEvent(owner,
localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE, key, newValue, txId,
txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext,
txEntryState, versionTag, tailKey);
try {
callbackEvent.setRegionEntry(newRe);
txRemoveOldIndexEntry(Operation.INVALIDATE, newRe);
newRe.setValueResultOfSearch(false);
boolean clearOccured = false;
try {
processAndGenerateTXVersionTag(callbackEvent, newRe, txEntryState);
newRe.setValue(owner, newRe.prepareValueForCache(owner, newValue, true));
EntryLogger.logTXInvalidate(_getOwnerObject(), key);
// we are putting in a new invalidated entry
owner.updateSizeOnCreate(newRe.getKey(), 0);
} catch (RegionClearedException rce) {
clearOccured = true;
}
owner.txApplyInvalidatePart2(newRe, newRe.getKey(), didDestroy, true);
if (invokeCallbacks) {
switchEventOwnerAndOriginRemote(callbackEvent, hasRemoteOrigin);
pendingCallbacks.add(callbackEvent);
callbackEventInPending = true;
}
opCompleted = true;
if (!clearOccured) {
lruEntryCreate(newRe);
incEntryCount(1);
}
if (shouldPerformConcurrencyChecks(owner, callbackEvent) && txEntryState != null) {
txEntryState.setVersionTag(callbackEvent.getVersionTag());
}
} finally {
if (!callbackEventInPending) {
releaseEvent(callbackEvent);
}
}
}
} finally {
if (!opCompleted) {
removeEntry(key, newRe, false);
}
}
}
} else { /* !forceNewEntry */
RegionEntry re = getEntry(key);
if (re != null) {
synchronized (re) {
// Fix GEODE-3204, do not invalidate the region entry if it is a removed token
if (!Token.isRemoved(re.getValueAsToken())) {
final int oldSize = owner.calculateRegionEntryValueSize(re);
Object oldValue = re.getValueInVM(owner); // OFFHEAP eei
// Create an entry event only if the calling context is a receipt of a
// TXCommitMessage AND there are callbacks installed for this region.
boolean invokeCallbacks = shouldInvokeCallbacks(owner, owner.isInitialized());
boolean callbackEventInPending = false;
callbackEvent = txCallbackEventFactory.createCallbackEvent(owner,
localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE, key, newValue, txId,
txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext,
txEntryState, versionTag, tailKey);
try {
callbackEvent.setRegionEntry(re);
callbackEvent.setOldValue(oldValue);
txRemoveOldIndexEntry(Operation.INVALIDATE, re);
if (didDestroy) {
re.txDidDestroy(owner.cacheTimeMillis());
}
if (txEvent != null) {
txEvent.addInvalidate(owner, re, re.getKey(), newValue, aCallbackArgument);
}
re.setValueResultOfSearch(false);
processAndGenerateTXVersionTag(callbackEvent, re, txEntryState);
boolean clearOccured = false;
try {
re.setValue(owner, re.prepareValueForCache(owner, newValue, true));
EntryLogger.logTXInvalidate(_getOwnerObject(), key);
owner.updateSizeOnPut(key, oldSize, 0);
} catch (RegionClearedException rce) {
clearOccured = true;
}
owner.txApplyInvalidatePart2(re, re.getKey(), didDestroy, true);
if (invokeCallbacks) {
switchEventOwnerAndOriginRemote(callbackEvent, hasRemoteOrigin);
pendingCallbacks.add(callbackEvent);
callbackEventInPending = true;
}
if (!clearOccured) {
lruEntryUpdate(re);
}
if (shouldPerformConcurrencyChecks(owner, callbackEvent) && txEntryState != null) {
txEntryState.setVersionTag(callbackEvent.getVersionTag());
}
} finally {
if (!callbackEventInPending) {
releaseEvent(callbackEvent);
}
}
return;
}
}
}
{ // re == null, or the region entry is a removed token.
// Fix bug#43594
// When a bucket region is re-created, the invalidate may already have been applied
// on the initial image provider, leaving the region entry absent here.
// Notify clients with client events.
boolean callbackEventInPending = false;
callbackEvent = txCallbackEventFactory.createCallbackEvent(owner,
localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE, key, newValue, txId,
txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState,
versionTag, tailKey);
try {
switchEventOwnerAndOriginRemote(callbackEvent, hasRemoteOrigin);
pendingCallbacks.add(callbackEvent);
callbackEventInPending = true;
} finally {
if (!callbackEventInPending)
releaseEvent(callbackEvent);
}
}
}
} catch (DiskAccessException dae) {
owner.handleDiskAccessException(dae);
throw dae;
} finally {
if (locked) {
owner.unlockWhenRegionIsInitializing();
}
if (oqlIndexManager != null) {
oqlIndexManager.countDownIndexUpdaters();
}
}
}
/**
* This code needs to be evaluated. It was added quickly to reduce the memory consumed by
* PR persistence.
*/
@Override
public void evictValue(Object key) {
final LocalRegion owner = _getOwner();
RegionEntry re = getEntry(key);
if (re != null) {
synchronized (re) {
if (!re.isValueNull()) {
re.setValueToNull();
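// The value now resides only on disk; adjust the in-VM and overflow-on-disk statistics.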
owner.getDiskRegion().incNumEntriesInVM(-1L);
owner.getDiskRegion().incNumOverflowOnDisk(1L);
if (owner instanceof BucketRegion) {
((BucketRegion) owner).incNumEntriesInVM(-1L);
((BucketRegion) owner).incNumOverflowOnDisk(1L);
}
}
}
}
}
/**
* Returns null if the operation fails.
*/
@Override
public RegionEntry basicPut(EntryEventImpl event, final long unused, final boolean ifNew,
final boolean ifOld, final Object expectedOldValue, // only non-null if ifOld
final boolean requireOldValue, final boolean overwriteDestroyed)
throws CacheWriterException, TimeoutException {
final RegionMapPut regionMapPut =
new RegionMapPut(this, _getOwner(), this, entryEventSerialization, event, ifNew, ifOld,
overwriteDestroyed, requireOldValue, expectedOldValue);
return regionMapPut.put();
}
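/**
* Runs the given action with LRU update callbacks disabled, re-enabling them afterwards if
* this call was the one that disabled them.
*/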
@Override
public void runWhileEvictionDisabled(Runnable r) {
final boolean disabled = disableLruUpdateCallback();
try {
r.run();
} finally {
if (disabled) {
enableLruUpdateCallback();
}
}
}
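/**
* Applies a put from a committed transaction. For a partitioned-region bucket the new value
* is serialized and handed to WAN event processing before the put is delegated to
* RegionMapCommitPut.
*/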
@Override
public void txApplyPut(Operation putOp, Object key, Object nv, boolean didDestroy,
TransactionId txId, TXRmtEvent txEvent, EventID eventId, Object aCallbackArgument,
List<EntryEventImpl> pendingCallbacks, FilterRoutingInfo filterRoutingInfo,
ClientProxyMembershipID bridgeContext, TXEntryState txEntryState, VersionTag versionTag,
long tailKey) {
assert pendingCallbacks != null;
final LocalRegion owner = _getOwner();
@Released
final EntryEventImpl callbackEvent =
createTransactionCallbackEvent(owner, putOp, key, nv, txId, txEvent, eventId,
aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
if (owner.isUsedForPartitionedRegionBucket()) {
callbackEvent.makeSerializedNewValue();
txHandleWANEvent(owner, callbackEvent, txEntryState);
}
RegionMapCommitPut commitPut = new RegionMapCommitPut(this, owner, callbackEvent, putOp,
didDestroy, txId, txEvent, pendingCallbacks, txEntryState);
commitPut.put();
}
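/**
* Passes the callback event to the owner's WAN handling and records the resulting tail key
* on the transaction entry state, if one is present.
*/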
private void txHandleWANEvent(final LocalRegion owner, EntryEventImpl callbackEvent,
TXEntryState txEntryState) {
owner.handleWANEvent(callbackEvent);
if (txEntryState != null) {
txEntryState.setTailKey(callbackEvent.getTailKey());
}
}
/**
* Called from the txApply* methods to process and generate version tags.
*/
@Override
public void processAndGenerateTXVersionTag(EntryEventImpl callbackEvent, RegionEntry re,
TXEntryState txEntryState) {
final LocalRegion owner = _getOwner();
if (shouldPerformConcurrencyChecks(owner, callbackEvent)) {
try {
if (txEntryState != null && txEntryState.getRemoteVersionTag() != null) {
// to generate a version based on a remote VersionTag, we will
// have to put the remote versionTag in the regionEntry
VersionTag remoteTag = txEntryState.getRemoteVersionTag();
if (re instanceof VersionStamp) {
VersionStamp stamp = (VersionStamp) re;
stamp.setVersions(remoteTag);
}
}
processVersionTag(re, callbackEvent);
} catch (ConcurrentCacheModificationException ignore) {
// ignore this exception, however invoke callbacks for this operation
}
// For distributed transactions, stuff the next region version generated
// in phase-1 commit into the callbackEvent so that ARE.generateVersionTag can later
// just apply it and not regenerate it in phase-2 commit
if (txEntryState != null
&& txEntryState.getDistTxEntryStates() != null) {
callbackEvent.setNextRegionVersion(txEntryState.getDistTxEntryStates().getRegionVersion());
}
owner.generateAndSetVersionTag(callbackEvent, re);
}
}
/**
* Returns true if concurrency checks are enabled on the region and callbackEvent is not null.
*/
private boolean shouldPerformConcurrencyChecks(LocalRegion owner, EntryEventImpl callbackEvent) {
return owner.getConcurrencyChecksEnabled() && callbackEvent != null;
}
/**
* Removing the existing indexed value requires the current value in the cache, that is, the
* value prior to applying the operation.
*
* @param entry the RegionEntry that contains the value prior to applying the op
*/
@Override
public void txRemoveOldIndexEntry(Operation op, RegionEntry entry) {
if ((op.isUpdate() && !entry.isInvalid()) || op.isInvalidate() || op.isDestroy()) {
IndexManager idxManager = _getOwner().getIndexManager();
if (idxManager != null) {
try {
idxManager.updateIndexes(entry, IndexManager.REMOVE_ENTRY,
op.isUpdate() ? IndexProtocol.BEFORE_UPDATE_OP : IndexProtocol.OTHER_OP);
} catch (QueryException e) {
throw new IndexMaintenanceException(e);
}
}
}
}
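/**
* Logs the contents of the underlying concurrent entry map; intended for debugging only.
*/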
public void dumpMap() {
logger.info("dump of concurrent map of size {} for region {}", getEntryMap().size(),
this._getOwner());
for (Iterator it = getEntryMap().values().iterator(); it.hasNext();) {
logger.info("dumpMap:" + it.next().toString());
}
}
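/**
* Builds a transaction callback event by delegating to the txCallbackEventFactory.
*/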
EntryEventImpl createTransactionCallbackEvent(final LocalRegion re, Operation op, Object key,
Object newValue, TransactionId txId, TXRmtEvent txEvent, EventID eventId,
Object aCallbackArgument, FilterRoutingInfo filterRoutingInfo,
ClientProxyMembershipID bridgeContext, TXEntryState txEntryState, VersionTag versionTag,
long tailKey) {
return txCallbackEventFactory
.createCallbackEvent(re, op, key, newValue, txId, txEvent, eventId, aCallbackArgument,
filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
}
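/**
* Runs the given action while synchronized on the live entry for the given key, if one
* exists. LRU update callbacks are suppressed for the duration and a single LRU update is
* issued afterwards.
*/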
@Override
public void writeSyncIfPresent(Object key, Runnable runner) {
RegionEntry re = getEntry(key);
if (re != null) {
final boolean disabled = disableLruUpdateCallback();
try {
synchronized (re) {
if (!re.isRemoved()) {
runner.run();
}
}
} finally {
if (disabled) {
enableLruUpdateCallback();
}
try {
lruUpdateCallback();
} catch (DiskAccessException dae) {
this._getOwner().handleDiskAccessException(dae);
throw dae;
}
}
}
}
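/**
* Removes the mapping for the given key if its entry is marked destroyed, double-checking
* the destroyed state under the entry's monitor.
*/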
@Override
public void removeIfDestroyed(Object key) {
LocalRegion owner = _getOwner();
DiskRegion dr = owner.getDiskRegion();
RegionEntry re = getEntry(key);
if (re != null) {
if (re.isDestroyed()) {
synchronized (re) {
if (re.isDestroyed()) {
// [bruce] destroyed entries aren't in the LRU clock, so they can't be retained here
re.removePhase2();
removeEntry(key, re, true);
}
}
}
}
}
/** get version-generation permission from the region's version vector */
@Override
public void lockForCacheModification(InternalRegion owner, EntryEventImpl event) {
boolean lockedByBulkOp = event.isBulkOpInProgress() && owner.getDataPolicy().withReplication();
if (armLockTestHook != null) {
armLockTestHook.beforeLock(owner, event);
}
if (!event.isOriginRemote() && !lockedByBulkOp && !owner.hasServerProxy()) {
RegionVersionVector vector = owner.getVersionVector();
if (vector != null) {
vector.lockForCacheModification();
}
}
if (armLockTestHook != null) {
armLockTestHook.afterLock(owner, event);
}
}
/** release version-generation permission from the region's version vector */
@Override
public void releaseCacheModificationLock(InternalRegion owner, EntryEventImpl event) {
boolean lockedByBulkOp = event.isBulkOpInProgress() && owner.getDataPolicy().withReplication();
if (armLockTestHook != null) {
armLockTestHook.beforeRelease(owner, event);
}
if (!event.isOriginRemote() && !lockedByBulkOp && !owner.hasServerProxy()) {
RegionVersionVector vector = owner.getVersionVector();
if (vector != null) {
vector.releaseCacheModificationLock();
}
}
if (armLockTestHook != null) {
armLockTestHook.afterRelease(owner, event);
}
}
@Override
public void lockRegionForAtomicTX(InternalRegion r) {
if (armLockTestHook != null) {
armLockTestHook.beforeLock(r, null);
}
RegionVersionVector vector = r.getVersionVector();
if (vector != null) {
vector.lockForCacheModification();
}
if (armLockTestHook != null) {
armLockTestHook.afterLock(r, null);
}
}
@Override
public void unlockRegionForAtomicTX(InternalRegion r) {
if (armLockTestHook != null) {
armLockTestHook.beforeRelease(r, null);
}
RegionVersionVector vector = r.getVersionVector();
if (vector != null) {
vector.releaseCacheModificationLock();
}
if (armLockTestHook != null) {
armLockTestHook.afterRelease(r, null);
}
}
/**
* for testing race conditions between threads trying to apply ops to the same entry
*
* @param entry the entry to attempt to add to the system
*/
protected RegionEntry putEntryIfAbsentForTest(RegionEntry entry) {
return (RegionEntry) putEntryIfAbsent(entry.getKey(), entry);
}
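/**
* Returns true if the given tombstone no longer needs to be kept: the map now holds a
* different entry for its key, the entry is no longer a tombstone, or its entry version no
* longer matches destroyedVersion.
*/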
@Override
public boolean isTombstoneNotNeeded(RegionEntry re, int destroyedVersion) {
// no need for synchronization - stale values are okay here
// TODO this looks like a problem for regionEntry pooling
if (getEntry(re.getKey()) != re) {
// region entry was either removed (null)
// or changed to a different region entry.
// In either case the old tombstone is no longer needed.
return true;
}
if (!re.isTombstone()) {
// if the region entry no longer contains a tombstone
// then the old tombstone is no longer needed
return true;
}
VersionStamp<?> vs = re.getVersionStamp();
if (vs == null) {
// if there is no VersionStamp, why was this entry even scheduled as a tombstone?
// We used to see an NPE here. See bug 52092.
logger.error(
"Unexpected RegionEntry scheduled as tombstone: re.getClass {} destroyedVersion {}",
re.getClass(), destroyedVersion);
return true;
}
if (vs.getEntryVersion() != destroyedVersion) {
// the version changed so old tombstone no longer needed
return true;
}
// region entry still has the same tombstone so we need to keep it.
return false;
}
/** removes a tombstone that has expired locally */
@Override
public boolean removeTombstone(RegionEntry re, VersionHolder version, boolean isEviction,
boolean isScheduledTombstone) {
boolean result = false;
int destroyedVersion = version.getEntryVersion();
synchronized (this._getOwner().getSizeGuard()) { // do this sync first; see bug 51985
synchronized (re) {
int entryVersion = re.getVersionStamp().getEntryVersion();
if (!re.isTombstone() || entryVersion > destroyedVersion) {
if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT_VERBOSE)) {
logger.trace(LogMarker.TOMBSTONE_COUNT_VERBOSE,
"tombstone for {} was resurrected with v{}; destroyed version was v{}; count is {}; entryMap size is {}",
re.getKey(), re.getVersionStamp().getEntryVersion(), destroyedVersion,
this._getOwner().getTombstoneCount(), size());
}
} else {
if (logger.isTraceEnabled(LogMarker.TOMBSTONE_COUNT_VERBOSE)) {
if (entryVersion == destroyedVersion) {
// logging this can put tremendous pressure on the log writer in tests
// that "wait for silence"
logger.trace(LogMarker.TOMBSTONE_COUNT_VERBOSE,
"removing tombstone for {} with v{} rv{}; count is {}", re.getKey(),
destroyedVersion, version.getRegionVersion(),
(this._getOwner().getTombstoneCount() - 1));
} else {
logger.trace(LogMarker.TOMBSTONE_COUNT_VERBOSE,
"removing entry (v{}) that is older than an expiring tombstone (v{} rv{}) for {}",
entryVersion, destroyedVersion, version.getRegionVersion(), re.getKey());
}
}
try {
re.setValue(_getOwner(), Token.REMOVED_PHASE2);
if (removeTombstone(re)) {
_getOwner().cancelExpiryTask(re);
result = true;
incEntryCount(-1);
// Bug 51118: When this method is called by the tombstone GC thread, the current 're' is
// an expired tombstone. If a destroyed tombstone with a bigger entry version was detected
// (because overwritingOldTombstone() returned true earlier), it is safe to delete the
// current tombstone 're' and adjust the tombstone count.
// lruEntryDestroy(re); // tombstones are invisible to LRU
if (isScheduledTombstone) {
_getOwner().incTombstoneCount(-1);
}
RegionVersionVector vector = _getOwner().getVersionVector();
if (vector != null) {
vector.recordGCVersion(version.getMemberID(), version.getRegionVersion());
}
}
} catch (RegionClearedException e) {
// if the region has been cleared we don't need to remove the tombstone
} catch (RegionDestroyedException e) {
// if the region has been destroyed, the tombstone is already
// gone. Catch an exception to avoid an error from the GC thread.
}
}
}
}
return result;
}
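/**
* Atomically removes the mapping for the tombstone's key, but only if the map still maps
* that key to this exact entry.
*/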
private boolean removeTombstone(RegionEntry re) {
return getEntryMap().remove(re.getKey(), re);
}
// method used for debugging tombstone count issues
public boolean verifyTombstoneCount(AtomicInteger numTombstones) {
int deadEntries = 0;
try {
for (Iterator it = getEntryMap().values().iterator(); it.hasNext();) {
RegionEntry re = (RegionEntry) it.next();
if (re.isTombstone()) {
deadEntries++;
}
}
if (deadEntries != numTombstones.get()) {
if (logger.isDebugEnabled()) {
logger.debug("tombstone count ({}) does not match actual number of tombstones ({})",
numTombstones, deadEntries, new Exception());
}
return false;
} else {
if (logger.isDebugEnabled()) {
logger.debug("tombstone count verified");
}
}
} catch (Exception e) {
// ignore
}
return true;
}
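/**
* Estimates per-entry memory overhead by reflectively sizing the entry class produced by
* this map's entry factory.
*/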
@Override
public int getEntryOverhead() {
return (int) ReflectionSingleObjectSizer.sizeof(getEntryFactory().getEntryClass());
}
private ARMLockTestHook armLockTestHook;
@Override
public ARMLockTestHook getARMLockTestHook() {
return armLockTestHook;
}
public void setARMLockTestHook(ARMLockTestHook theHook) {
armLockTestHook = theHook;
}
}