blob: 809996bb62862895c4de64713ef3347885d3d5b9 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.io.IOException;
import java.util.Arrays;
import org.apache.logging.log4j.Logger;
import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_FILL_IN_VALUE;
import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.InvalidDeltaException;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.CacheWriterException;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.TimeoutException;
import com.gemstone.gemfire.cache.query.IndexMaintenanceException;
import com.gemstone.gemfire.cache.query.QueryException;
import com.gemstone.gemfire.cache.query.internal.index.IndexManager;
import com.gemstone.gemfire.cache.query.internal.index.IndexProtocol;
import com.gemstone.gemfire.cache.util.GatewayConflictHelper;
import com.gemstone.gemfire.cache.util.GatewayConflictResolver;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.ByteArrayDataInput;
import com.gemstone.gemfire.internal.HeapDataOutputStream;
import com.gemstone.gemfire.internal.InternalDataSerializer;
import com.gemstone.gemfire.internal.InternalStatisticsDisabledException;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.lru.LRUClockNode;
import com.gemstone.gemfire.internal.cache.lru.NewLRUClockHand;
import com.gemstone.gemfire.internal.cache.persistence.DiskStoreID;
import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException;
import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.lang.StringUtils;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
import com.gemstone.gemfire.internal.offheap.MemoryAllocator;
import com.gemstone.gemfire.internal.offheap.OffHeapCachedDeserializable;
import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.Chunk;
import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.ChunkType;
import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.GemFireChunk;
import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.GemFireChunkType;
import com.gemstone.gemfire.internal.offheap.StoredObject;
import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
import com.gemstone.gemfire.internal.util.BlobHelper;
import com.gemstone.gemfire.internal.util.Versionable;
import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap;
import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap.HashEntry;
import com.gemstone.gemfire.pdx.PdxInstance;
import com.gemstone.gemfire.pdx.PdxSerializable;
import com.gemstone.gemfire.pdx.PdxSerializationException;
import com.gemstone.gemfire.pdx.PdxSerializer;
import com.gemstone.gemfire.pdx.internal.ConvertableToBytes;
import com.gemstone.gemfire.pdx.internal.PdxInstanceImpl;
/**
* Abstract implementation class of RegionEntry interface.
* This is the topmost implementation class so common behavior
* lives here.
*
* @since 3.5.1
*
* @author Darrel Schneider
* @author bruce
*
*/
public abstract class AbstractRegionEntry implements RegionEntry,
HashEntry<Object, Object> {
private static final Logger logger = LogService.getLogger();
/**
* Whether to disable last access time update when a put occurs. The default
* is false (enable last access time update on put). To disable it, set the
* 'gemfire.disableAccessTimeUpdateOnPut' system property.
*/
protected static final boolean DISABLE_ACCESS_TIME_UPDATE_ON_PUT = Boolean
.getBoolean("gemfire.disableAccessTimeUpdateOnPut");
/*
* Flags for a Region Entry.
* These flags are stored in the msb of the long used to also store the lastModicationTime.
*/
private static final long VALUE_RESULT_OF_SEARCH = 0x01L<<56;
private static final long UPDATE_IN_PROGRESS = 0x02L<<56;
private static final long TOMBSTONE_SCHEDULED = 0x04L<<56;
private static final long LISTENER_INVOCATION_IN_PROGRESS = 0x08L<<56;
/** used for LRUEntry instances. */
protected static final long RECENTLY_USED = 0x10L<<56;
/** used for LRUEntry instances. */
protected static final long EVICTED = 0x20L<<56;
/**
* Set if the entry is being used by a transactions.
* Some features (eviction and expiration) will not modify an entry when a tx is using it
* to prevent the tx to fail do to conflict.
*/
protected static final long IN_USE_BY_TX = 0x40L<<56;
protected static final long MARKED_FOR_EVICTION = 0x80L<<56;
// public Exception removeTrace; // debugging hot loop in AbstractRegionMap.basicPut()
protected AbstractRegionEntry(RegionEntryContext context,
@Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) Object value) {
setValue(context,this.prepareValueForCache(context, value, false),false);
// setLastModified(System.currentTimeMillis()); [bruce] this must be set later so we can use ==0 to know this is a new entry in checkForConflicts
}
/////////////////////////////////////////////////////////////////////
////////////////////////// instance methods /////////////////////////
/////////////////////////////////////////////////////////////////////
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IMSE_DONT_CATCH_IMSE")
public boolean dispatchListenerEvents(final EntryEventImpl event) throws InterruptedException {
final LocalRegion rgn = event.getRegion();
if (event.callbacksInvoked()) {
return true;
}
// don't wait for certain events to reach the head of the queue before
// dispatching listeners. However, we must not notify the gateways for
// remote-origin ops out of order. Otherwise the other systems will have
// inconsistent content.
event.setCallbacksInvokedByCurrentThread();
if (logger.isDebugEnabled()) {
logger.debug("{} dispatching event {}", this, event);
}
// All the following code that sets "thr" is to workaround
// spurious IllegalMonitorStateExceptions caused by JVM bugs.
try {
// call invokeCallbacks while synced on RegionEntry
event.invokeCallbacks(rgn, event.inhibitCacheListenerNotification(), false);
return true;
} finally {
if (isRemoved() && !isTombstone() && !event.isEvicted()) {
// Phase 2 of region entry removal is done here. The first phase is done
// by the RegionMap. It is unclear why this code is needed. ARM destroy
// does this also and we are now doing it as phase3 of the ARM destroy.
removePhase2();
rgn.getRegionMap().removeEntry(event.getKey(), this, true, event, rgn, rgn.getIndexUpdater());
}
}
}
public long getLastAccessed() throws InternalStatisticsDisabledException {
throw new InternalStatisticsDisabledException();
}
public long getHitCount() throws InternalStatisticsDisabledException {
throw new InternalStatisticsDisabledException();
}
public long getMissCount() throws InternalStatisticsDisabledException {
throw new InternalStatisticsDisabledException();
}
protected void setLastModified(long lastModified) {
_setLastModified(lastModified);
}
public void txDidDestroy(long currTime) {
setLastModified(currTime);
}
public final void updateStatsForPut(long lastModifiedTime) {
setLastModified(lastModifiedTime);
}
public void setRecentlyUsed() {
// do nothing by default; only needed for LRU
}
public void updateStatsForGet(boolean hit, long time) {
// nothing needed
}
public void resetCounts() throws InternalStatisticsDisabledException {
throw new InternalStatisticsDisabledException();
}
public void _removePhase1() {
_setValue(Token.REMOVED_PHASE1);
// debugging for 38467 (hot thread in ARM.basicUpdate)
// this.removeTrace = new Exception("stack trace for thread " + Thread.currentThread());
}
public void removePhase1(LocalRegion r, boolean isClear) throws RegionClearedException {
_removePhase1();
}
public void removePhase2() {
_setValue(Token.REMOVED_PHASE2);
// this.removeTrace = new Exception("stack trace for thread " + Thread.currentThread());
}
public void makeTombstone(LocalRegion r, VersionTag version) throws RegionClearedException {
assert r.getVersionVector() != null;
assert version != null;
if (r.getServerProxy() == null &&
r.getVersionVector().isTombstoneTooOld(version.getMemberID(), version.getRegionVersion())) {
// distributed gc with higher vector version preempts this operation
if (!isTombstone()) {
setValue(r, Token.TOMBSTONE);
r.incTombstoneCount(1);
}
r.getRegionMap().removeTombstone(this, version, false, true);
} else {
if (isTombstone()) {
// unschedule the old tombstone
r.unscheduleTombstone(this);
}
setRecentlyUsed();
boolean newEntry = (getValueAsToken() == Token.REMOVED_PHASE1);
setValue(r, Token.TOMBSTONE);
r.scheduleTombstone(this, version);
if (newEntry) {
// bug #46631 - entry count is decremented by scheduleTombstone but this is a new entry
r.getCachePerfStats().incEntryCount(1);
}
}
}
@Override
public void setValueWithTombstoneCheck(@Unretained Object v, EntryEvent e) throws RegionClearedException {
if (v == Token.TOMBSTONE) {
makeTombstone((LocalRegion)e.getRegion(), ((EntryEventImpl)e).getVersionTag());
} else {
setValue((LocalRegion)e.getRegion(), v, (EntryEventImpl)e);
}
}
/**
* Return true if the object is removed.
*
* TODO this method does NOT return true if the object
* is Token.DESTROYED. dispatchListenerEvents relies on that
* fact to avoid removing destroyed tokens from the map.
* We should refactor so that this method calls Token.isRemoved,
* and places that don't want a destroyed Token can explicitly check
* for a DESTROY token.
*/
public final boolean isRemoved() {
Token o = getValueAsToken();
return (o == Token.REMOVED_PHASE1) || (o == Token.REMOVED_PHASE2) || (o == Token.TOMBSTONE);
}
public final boolean isDestroyedOrRemoved() {
return Token.isRemoved(getValueAsToken());
}
public final boolean isDestroyedOrRemovedButNotTombstone() {
Token o = getValueAsToken();
return o == Token.DESTROYED || o == Token.REMOVED_PHASE1 || o == Token.REMOVED_PHASE2;
}
public final boolean isTombstone() {
return getValueAsToken() == Token.TOMBSTONE;
}
public final boolean isRemovedPhase2() {
return getValueAsToken() == Token.REMOVED_PHASE2;
}
public boolean fillInValue(LocalRegion region,
@Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) InitialImageOperation.Entry dst,
ByteArrayDataInput in,
DM mgr)
{
dst.setSerialized(false); // starting default value
@Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) final Object v;
if (isTombstone()) {
v = Token.TOMBSTONE;
} else {
v = getValue(region); // OFFHEAP: need to incrc, copy bytes, decrc
if (v == null) {
return false;
}
}
final boolean isEagerDeserialize = dst.isEagerDeserialize();
if (isEagerDeserialize) {
dst.clearEagerDeserialize();
}
dst.setLastModified(mgr, getLastModified()); // fix for bug 31059
if (v == Token.INVALID) {
dst.setInvalid();
}
else if (v == Token.LOCAL_INVALID) {
dst.setLocalInvalid();
}
else if (v == Token.TOMBSTONE) {
dst.setTombstone();
}
else if (v instanceof CachedDeserializable) {
// don't serialize here if it is not already serialized
// if(v instanceof ByteSource && CachedDeserializableFactory.preferObject()) {
// // For SQLFire we prefer eager deserialized
// dst.setEagerDeserialize();
// }
if (v instanceof StoredObject && !((StoredObject) v).isSerialized()) {
dst.value = ((StoredObject) v).getDeserializedForReading();
} else {
/*if (v instanceof ByteSource && CachedDeserializableFactory.preferObject()) {
dst.value = v;
} else */ {
Object tmp = ((CachedDeserializable) v).getValue();
if (tmp instanceof byte[]) {
byte[] bb = (byte[]) tmp;
dst.value = bb;
} else {
try {
HeapDataOutputStream hdos = new HeapDataOutputStream(
Version.CURRENT);
BlobHelper.serializeTo(tmp, hdos);
hdos.trim();
dst.value = hdos;
} catch (IOException e) {
RuntimeException e2 = new IllegalArgumentException(
LocalizedStrings.AbstractRegionEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING
.toLocalizedString());
e2.initCause(e);
throw e2;
}
}
dst.setSerialized(true);
}
}
}
else if (v instanceof byte[]) {
dst.value = v;
}
else {
Object preparedValue = v;
if (preparedValue != null) {
preparedValue = prepareValueForGII(preparedValue);
if (preparedValue == null) {
return false;
}
}
if (CachedDeserializableFactory.preferObject()) {
dst.value = preparedValue;
dst.setEagerDeserialize();
}
else {
try {
HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
BlobHelper.serializeTo(preparedValue, hdos);
hdos.trim();
dst.value = hdos;
dst.setSerialized(true);
} catch (IOException e) {
RuntimeException e2 = new IllegalArgumentException(LocalizedStrings.AbstractRegionEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING.toLocalizedString());
e2.initCause(e);
throw e2;
}
}
}
return true;
}
/**
* To fix bug 49901 if v is a GatewaySenderEventImpl then make
* a heap copy of it if it is offheap.
* @return the value to provide to the gii request; null if no value should be provided.
*/
public static Object prepareValueForGII(Object v) {
assert v != null;
if (v instanceof GatewaySenderEventImpl) {
return ((GatewaySenderEventImpl) v).makeHeapCopyIfOffHeap();
} else {
return v;
}
}
public boolean isOverflowedToDisk(LocalRegion r, DistributedRegion.DiskPosition dp) {
return false;
}
@Override
public Object getValue(RegionEntryContext context) {
SimpleMemoryAllocatorImpl.createReferenceCountOwner();
@Retained Object result = _getValueRetain(context, true);
//Asif: If the thread is an Index Creation Thread & the value obtained is
//Token.REMOVED , we can skip synchronization block. This is required to prevent
// the dead lock caused if an Index Update Thread has gone into a wait holding the
// lock of the Entry object. There should not be an issue if the Index creation thread
// gets the temporary value of token.REMOVED as the correct value will get indexed
// by the Index Update Thread , once the index creation thread has exited.
// Part of Bugfix # 33336
// if ((result == Token.REMOVED_PHASE1 || result == Token.REMOVED_PHASE2) && !r.isIndexCreationThread()) {
// synchronized (this) {
// result = _getValue();
// }
// }
if (Token.isRemoved(result)) {
SimpleMemoryAllocatorImpl.setReferenceCountOwner(null);
return null;
} else {
result = OffHeapHelper.copyAndReleaseIfNeeded(result); // sqlf does not dec ref count in this call
SimpleMemoryAllocatorImpl.setReferenceCountOwner(null);
setRecentlyUsed();
return result;
}
}
@Override
@Retained
public Object getValueRetain(RegionEntryContext context) {
@Retained Object result = _getValueRetain(context, true);
if (Token.isRemoved(result)) {
return null;
} else {
setRecentlyUsed();
return result;
}
}
@Override
@Released
public void setValue(RegionEntryContext context, @Unretained Object value) throws RegionClearedException {
// @todo darrel: This will mark new entries as being recently used
// It might be better to only mark them when they are modified.
// Or should we only mark them on reads?
setValue(context,value,true);
}
@Override
public void setValue(RegionEntryContext context, Object value, EntryEventImpl event) throws RegionClearedException {
setValue(context,value);
}
@Released
protected void setValue(RegionEntryContext context, @Unretained Object value, boolean recentlyUsed) {
_setValue(value);
if (value != null && context != null && (this instanceof OffHeapRegionEntry)
&& context instanceof LocalRegion && ((LocalRegion)context).isThisRegionBeingClosedOrDestroyed()) {
((OffHeapRegionEntry)this).release();
((LocalRegion)context).checkReadiness();
}
if (recentlyUsed) {
setRecentlyUsed();
}
}
/**
* This method determines if the value is in a compressed representation and decompresses it if it is.
*
* @param context the values context.
* @param value a region entry value.
*
* @return the decompressed form of the value parameter.
*/
static Object decompress(RegionEntryContext context,Object value) {
if(isCompressible(context, value)) {
long time = context.getCachePerfStats().startDecompression();
value = EntryEventImpl.deserialize(context.getCompressor().decompress((byte[]) value));
context.getCachePerfStats().endDecompression(time);
}
return value;
}
static protected Object compress(RegionEntryContext context,Object value) {
return compress(context, value, null);
}
/**
* This method determines if the value is compressible and compresses it if it is.
*
* @param context the values context.
* @param value a region entry value.
*
* @return the compressed form of the value parameter.
*/
static protected Object compress(RegionEntryContext context,Object value, EntryEventImpl event) {
if(isCompressible(context, value)) {
long time = context.getCachePerfStats().startCompression();
byte[] serializedValue;
if (event != null && event.getCachedSerializedNewValue() != null) {
serializedValue = event.getCachedSerializedNewValue();
if (value instanceof CachedDeserializable) {
CachedDeserializable cd = (CachedDeserializable) value;
if (!(cd.getValue() instanceof byte[])) {
// The cd now has the object form so use the cached serialized form in a new cd.
// This serialization is much cheaper than reserializing the object form.
serializedValue = EntryEventImpl.serialize(CachedDeserializableFactory.create(serializedValue));
} else {
serializedValue = EntryEventImpl.serialize(cd);
}
}
} else {
serializedValue = EntryEventImpl.serialize(value);
if (event != null && !(value instanceof byte[])) {
// See if we can cache the serialized new value in the event.
// If value is a byte[] then we don't cache it since it is not serialized.
if (value instanceof CachedDeserializable) {
// For a CacheDeserializable we want to only cache the wrapped value;
// not the serialized CacheDeserializable.
CachedDeserializable cd = (CachedDeserializable) value;
Object cdVal = cd.getValue();
if (cdVal instanceof byte[]) {
event.setCachedSerializedNewValue((byte[])cdVal);
}
} else {
event.setCachedSerializedNewValue(serializedValue);
}
}
}
value = context.getCompressor().compress(serializedValue);
context.getCachePerfStats().endCompression(time, serializedValue.length, ((byte []) value).length);
}
return value;
}
private static byte[] compressBytes(RegionEntryContext context, byte[] uncompressedBytes) {
byte[] result = uncompressedBytes;
if (isCompressible(context, uncompressedBytes)) {
long time = context.getCachePerfStats().startCompression();
result = context.getCompressor().compress(uncompressedBytes);
context.getCachePerfStats().endCompression(time, uncompressedBytes.length, result.length);
}
return result;
}
@Retained
public final Object getValueInVM(RegionEntryContext context) {
SimpleMemoryAllocatorImpl.createReferenceCountOwner();
@Retained Object v = _getValueRetain(context, true);
if (v == null) { // should only be possible if disk entry
v = Token.NOT_AVAILABLE;
}
@Retained Object result = OffHeapHelper.copyAndReleaseIfNeeded(v); // TODO OFFHEAP keep it offheap?
SimpleMemoryAllocatorImpl.setReferenceCountOwner(null);
return result;
}
@Retained
public Object getValueInVMOrDiskWithoutFaultIn(LocalRegion owner) {
return getValueInVM(owner);
}
@Override
@Retained
public Object getValueOffHeapOrDiskWithoutFaultIn(LocalRegion owner) {
@Retained Object result = _getValueRetain(owner, true);
// if (result instanceof ByteSource) {
// // If the ByteSource contains a Delta or ListOfDelta then we want to deserialize it
// Object deserVal = ((CachedDeserializable)result).getDeserializedForReading();
// if (deserVal != result) {
// OffHeapHelper.release(result);
// result = deserVal;
// }
// }
return result;
}
public Object getValueOnDisk(LocalRegion r)
throws EntryNotFoundException
{
throw new IllegalStateException(LocalizedStrings.AbstractRegionEntry_CANNOT_GET_VALUE_ON_DISK_FOR_A_REGION_THAT_DOES_NOT_ACCESS_THE_DISK.toLocalizedString());
}
public Object getSerializedValueOnDisk(final LocalRegion r)
throws EntryNotFoundException
{
throw new IllegalStateException(LocalizedStrings.AbstractRegionEntry_CANNOT_GET_VALUE_ON_DISK_FOR_A_REGION_THAT_DOES_NOT_ACCESS_THE_DISK.toLocalizedString());
}
public Object getValueOnDiskOrBuffer(LocalRegion r)
throws EntryNotFoundException
{
throw new IllegalStateException(LocalizedStrings.AbstractRegionEntry_CANNOT_GET_VALUE_ON_DISK_FOR_A_REGION_THAT_DOES_NOT_ACCESS_THE_DISK.toLocalizedString());
// @todo darrel if value is Token.REMOVED || Token.DESTROYED throw EntryNotFoundException
}
public final boolean initialImagePut(final LocalRegion region,
final long lastModifiedTime,
Object newValue,
boolean wasRecovered,
boolean versionTagAccepted) throws RegionClearedException
{
// note that the caller has already write synced this RegionEntry
return initialImageInit(region, lastModifiedTime, newValue, this.isTombstone(), wasRecovered, versionTagAccepted);
}
public boolean initialImageInit(final LocalRegion region,
final long lastModifiedTime,
final Object newValue,
final boolean create,
final boolean wasRecovered,
final boolean versionTagAccepted) throws RegionClearedException
{
// note that the caller has already write synced this RegionEntry
boolean result = false;
// if it has been destroyed then don't do anything
Token vTok = getValueAsToken();
if (versionTagAccepted || create || (vTok != Token.DESTROYED || vTok != Token.TOMBSTONE)) { // OFFHEAP noop
Object newValueToWrite = newValue;
boolean putValue = versionTagAccepted || create
|| (newValueToWrite != Token.LOCAL_INVALID
&& (wasRecovered || (vTok == Token.LOCAL_INVALID))); // OFFHEAP noop
if (region.isUsedForPartitionedRegionAdmin() && newValueToWrite instanceof CachedDeserializable) {
// Special case for partitioned region meta data
// We do not need the RegionEntry on this case.
// Because the pr meta data region will not have an LRU.
newValueToWrite = ((CachedDeserializable) newValueToWrite).getDeserializedValue(region, null);
if (!create && newValueToWrite instanceof Versionable) {
@Retained @Released final Object oldValue = getValueInVM(region); // Heap value should always be deserialized at this point // OFFHEAP will not be deserialized
try {
// BUGFIX for 35029. If oldValue is null the newValue should be put.
if(oldValue == null) {
putValue = true;
}
else if (oldValue instanceof Versionable) {
Versionable nv = (Versionable) newValueToWrite;
Versionable ov = (Versionable) oldValue;
putValue = nv.isNewerThan(ov);
}
} finally {
OffHeapHelper.release(oldValue);
}
}
}
if (putValue) {
// change to INVALID if region itself has been invalidated,
// and current value is recovered
if (create || versionTagAccepted) {
// At this point, since we now always recover from disk first,
// we only care about "isCreate" since "isRecovered" is impossible
// if we had a regionInvalidate or regionClear
ImageState imageState = region.getImageState();
// this method is called during loadSnapshot as well as getInitialImage
if (imageState.getRegionInvalidated()) {
if (newValueToWrite != Token.TOMBSTONE) {
newValueToWrite = Token.INVALID;
}
}
else if (imageState.getClearRegionFlag()) {
boolean entryOK = false;
RegionVersionVector rvv = imageState.getClearRegionVersionVector();
if (rvv != null) { // a filtered clear
VersionSource id = getVersionStamp().getMemberID();
if (id == null) {
id = region.getVersionMember();
}
if (!rvv.contains(id, getVersionStamp().getRegionVersion())) {
entryOK = true;
}
}
if (!entryOK) {
//Asif: If the region has been issued cleared during
// the GII , then those entries loaded before this one would have
// been cleared from the Map due to clear operation & for the
// currententry whose key may have escaped the clearance , will be
// cleansed by the destroy token.
newValueToWrite = Token.DESTROYED;
imageState.addDestroyedEntry(this.getKey());
throw new RegionClearedException(LocalizedStrings.AbstractRegionEntry_DURING_THE_GII_PUT_OF_ENTRY_THE_REGION_GOT_CLEARED_SO_ABORTING_THE_OPERATION.toLocalizedString());
}
}
}
setValue(region, this.prepareValueForCache(region, newValueToWrite, false));
result = true;
if (newValueToWrite != Token.TOMBSTONE){
if (create) {
region.getCachePerfStats().incCreates();
}
region.updateStatsForPut(this, lastModifiedTime, false);
}
if (logger.isTraceEnabled()) {
if (newValueToWrite instanceof CachedDeserializable) {
logger.trace("ProcessChunk: region={}; put a CachedDeserializable ({},{})",
region.getFullPath(), getKey(),((CachedDeserializable)newValueToWrite).getStringForm());
}
else {
logger.trace("ProcessChunk: region={}; put({},{})", region.getFullPath(), getKey(), StringUtils.forceToString(newValueToWrite));
}
}
}
}
return result;
}
/**
* @throws EntryNotFoundException if expectedOldValue is
* not null and is not equal to current value
*/
@Released
public final boolean destroy(LocalRegion region,
EntryEventImpl event,
boolean inTokenMode,
boolean cacheWrite,
@Unretained Object expectedOldValue,
boolean forceDestroy,
boolean removeRecoveredEntry)
throws CacheWriterException,
EntryNotFoundException,
TimeoutException,
RegionClearedException {
boolean proceed = false;
{
// A design decision was made to not retrieve the old value from the disk
// if the entry has been evicted to only have the CacheListener afterDestroy
// method ignore it. We don't want to pay the performance penalty. The
// getValueInVM method does not retrieve the value from disk if it has been
// evicted. Instead, it uses the NotAvailable token.
//
// If the region is a WAN queue region, the old value is actually used by the
// afterDestroy callback on a secondary. It is not needed on a primary.
// Since the destroy that sets WAN_QUEUE_TOKEN always originates on the primary
// we only pay attention to WAN_QUEUE_TOKEN if the event is originRemote.
//
// :ezoerner:20080814 We also read old value from disk or buffer
// in the case where there is a non-null expectedOldValue
// see PartitionedRegion#remove(Object key, Object value)
SimpleMemoryAllocatorImpl.skipRefCountTracking();
@Retained @Released Object curValue = _getValueRetain(region, true);
SimpleMemoryAllocatorImpl.unskipRefCountTracking();
try {
if (curValue == null) curValue = Token.NOT_AVAILABLE;
if (curValue == Token.NOT_AVAILABLE) {
// In some cases we need to get the current value off of disk.
// if the event is transmitted during GII and has an old value, it was
// the state of the transmitting cache's entry & should be used here
if (event.getCallbackArgument() != null
&& event.getCallbackArgument().equals(RegionQueue.WAN_QUEUE_TOKEN)
&& event.isOriginRemote()) { // check originRemote for bug 40508
//curValue = getValue(region); can cause deadlock if GII is occurring
curValue = getValueOnDiskOrBuffer(region);
}
else {
FilterProfile fp = region.getFilterProfile();
// rdubey: Old value also required for SqlfIndexManager.
if (fp != null && ((fp.getCqCount() > 0) || expectedOldValue != null
|| event.getRegion().getIndexUpdater() != null)) {
//curValue = getValue(region); can cause deadlock will fault in the value
// and will confuse LRU. rdubey.
curValue = getValueOnDiskOrBuffer(region);
}
}
}
if (expectedOldValue != null) {
if (!checkExpectedOldValue(expectedOldValue, curValue, region)) {
throw new EntryNotFoundException(
LocalizedStrings.AbstractRegionEntry_THE_CURRENT_VALUE_WAS_NOT_EQUAL_TO_EXPECTED_VALUE.toLocalizedString());
}
}
if (inTokenMode && event.hasOldValue()) {
proceed = true;
}
else {
proceed = event.setOldValue(curValue, curValue instanceof GatewaySenderEventImpl) || removeRecoveredEntry
|| forceDestroy || region.getConcurrencyChecksEnabled() // fix for bug #47868 - create a tombstone
|| (event.getOperation() == Operation.REMOVE // fix for bug #42242
&& (curValue == null || curValue == Token.LOCAL_INVALID
|| curValue == Token.INVALID));
}
} finally {
OffHeapHelper.releaseWithNoTracking(curValue);
}
} // end curValue block
if (proceed) {
//Generate the version tag if needed. This method should only be
//called if we are in fact going to destroy the entry, so it must be
//after the entry not found exception above.
if(!removeRecoveredEntry) {
region.generateAndSetVersionTag(event, this);
}
if (cacheWrite) {
region.cacheWriteBeforeDestroy(event, expectedOldValue);
if (event.getRegion().getServerProxy() != null) { // server will return a version tag
// update version information (may throw ConcurrentCacheModificationException)
VersionStamp stamp = getVersionStamp();
if (stamp != null) {
stamp.processVersionTag(event);
}
}
}
region.recordEvent(event);
// don't do index maintenance on a destroy if the value in the
// RegionEntry (the old value) is invalid
if (!region.isProxy() && !isInvalid()) {
IndexManager indexManager = region.getIndexManager();
if (indexManager != null) {
try {
if(isValueNull()) {
@Released Object value = getValueOffHeapOrDiskWithoutFaultIn(region);
try {
_setValue(prepareValueForCache(region, value, false));
if (value != null && region != null && (this instanceof OffHeapRegionEntry) && region.isThisRegionBeingClosedOrDestroyed()) {
((OffHeapRegionEntry)this).release();
region.checkReadiness();
}
} finally {
OffHeapHelper.release(value);
}
}
indexManager.updateIndexes(this,
IndexManager.REMOVE_ENTRY,
IndexProtocol.OTHER_OP);
}
catch (QueryException e) {
throw new IndexMaintenanceException(e);
}
}
}
boolean removeEntry = false;
VersionTag v = event.getVersionTag();
if (region.concurrencyChecksEnabled && !removeRecoveredEntry
&& !event.isFromRILocalDestroy()) { // bug #46780, don't retain tombstones for entries destroyed for register-interest
// Destroy will write a tombstone instead
if (v == null || !v.hasValidVersion()) {
// localDestroy and eviction and ops received with no version tag
// should create a tombstone using the existing version stamp, as should
// (bug #45245) responses from servers that do not have valid version information
VersionStamp stamp = this.getVersionStamp();
if (stamp != null) { // proxy has no stamps
v = stamp.asVersionTag();
event.setVersionTag(v);
}
}
removeEntry = (v == null) || !v.hasValidVersion();
} else {
removeEntry = true;
}
// See #47887, we do not insert a tombstone for evicted HDFS
// entries since the value is still present in HDFS
// Check if we have to evict or just do destroy.
boolean forceRemoveEntry =
(event.isEviction() || event.isExpiration())
&& event.getRegion().isUsedForPartitionedRegionBucket()
&& event.getRegion().getPartitionedRegion().isHDFSRegion();
if (removeEntry || forceRemoveEntry) {
boolean isThisTombstone = isTombstone();
if(inTokenMode && !event.getOperation().isEviction()) {
setValue(region, Token.DESTROYED);
} else {
removePhase1(region, false);
}
if (isThisTombstone) {
region.unscheduleTombstone(this);
}
} else {
makeTombstone(region, v);
}
return true;
}
else {
return false;
}
}
static boolean checkExpectedOldValue(@Unretained Object expectedOldValue, @Unretained Object actualValue, LocalRegion lr) {
if (Token.isInvalid(expectedOldValue)) {
return (actualValue == null) || Token.isInvalid(actualValue);
} else {
boolean isCompressedOffHeap = lr.getAttributes().getOffHeap() && lr.getAttributes().getCompressor() != null;
return checkEquals(expectedOldValue, actualValue, isCompressedOffHeap);
}
}
private static boolean basicEquals(Object v1, Object v2) {
if (v2 != null) {
if (v2.getClass().isArray()) {
// fix for 52093
if (v2 instanceof byte[]) {
if (v1 instanceof byte[]) {
return Arrays.equals((byte[])v2, (byte[])v1);
} else {
return false;
}
} else if (v2 instanceof Object[]) {
if (v1 instanceof Object[]) {
return Arrays.deepEquals((Object[])v2, (Object[])v1);
} else {
return false;
}
} else if (v2 instanceof int[]) {
if (v1 instanceof int[]) {
return Arrays.equals((int[])v2, (int[])v1);
} else {
return false;
}
} else if (v2 instanceof long[]) {
if (v1 instanceof long[]) {
return Arrays.equals((long[])v2, (long[])v1);
} else {
return false;
}
} else if (v2 instanceof boolean[]) {
if (v1 instanceof boolean[]) {
return Arrays.equals((boolean[])v2, (boolean[])v1);
} else {
return false;
}
} else if (v2 instanceof short[]) {
if (v1 instanceof short[]) {
return Arrays.equals((short[])v2, (short[])v1);
} else {
return false;
}
} else if (v2 instanceof char[]) {
if (v1 instanceof char[]) {
return Arrays.equals((char[])v2, (char[])v1);
} else {
return false;
}
} else if (v2 instanceof float[]) {
if (v1 instanceof float[]) {
return Arrays.equals((float[])v2, (float[])v1);
} else {
return false;
}
} else if (v2 instanceof double[]) {
if (v1 instanceof double[]) {
return Arrays.equals((double[])v2, (double[])v1);
} else {
return false;
}
}
// fall through and call equals method
}
return v2.equals(v1);
} else {
return v1 == null;
}
}
static boolean checkEquals(@Unretained Object v1, @Unretained Object v2, boolean isCompressedOffHeap) {
// need to give PdxInstance#equals priority
if (v1 instanceof PdxInstance) {
return checkPdxEquals((PdxInstance)v1, v2);
} else if (v2 instanceof PdxInstance) {
return checkPdxEquals((PdxInstance)v2, v1);
} else if (v1 instanceof OffHeapCachedDeserializable) {
return checkOffHeapEquals((OffHeapCachedDeserializable)v1, v2);
} else if (v2 instanceof OffHeapCachedDeserializable) {
return checkOffHeapEquals((OffHeapCachedDeserializable)v2, v1);
} else if (v1 instanceof CachedDeserializable) {
return checkCDEquals((CachedDeserializable)v1, v2, isCompressedOffHeap);
} else if (v2 instanceof CachedDeserializable) {
return checkCDEquals((CachedDeserializable)v2, v1, isCompressedOffHeap);
} else {
return basicEquals(v1, v2);
}
}
private static boolean checkOffHeapEquals(@Unretained OffHeapCachedDeserializable cd, @Unretained Object obj) {
if (cd.isSerializedPdxInstance()) {
PdxInstance pi = InternalDataSerializer.readPdxInstance(cd.getSerializedValue(), GemFireCacheImpl.getForPdx("Could not check value equality"));
return checkPdxEquals(pi, obj);
}
if (obj instanceof OffHeapCachedDeserializable) {
return cd.checkDataEquals((OffHeapCachedDeserializable)obj);
} else {
byte[] serializedObj;
if (obj instanceof CachedDeserializable) {
if (!cd.isSerialized()) {
if (obj instanceof StoredObject && !((StoredObject) obj).isSerialized()) {
// both are byte[]
// obj must be DataAsAddress since it was not OffHeapCachedDeserializable
// so its byte[] will be small.
byte[] objBytes = (byte[]) ((StoredObject) obj).getDeserializedForReading();
return cd.checkDataEquals(objBytes);
} else {
return false;
}
}
serializedObj = ((CachedDeserializable) obj).getSerializedValue();
} else if (obj instanceof byte[]) {
if (cd.isSerialized()) {
return false;
}
serializedObj = (byte[]) obj;
} else {
if (!cd.isSerialized()) {
return false;
}
if (obj == null || obj == Token.NOT_AVAILABLE
|| Token.isInvalidOrRemoved(obj)) {
return false;
}
serializedObj = EntryEventImpl.serialize(obj);
}
return cd.checkDataEquals(serializedObj);
}
}
private static boolean checkCDEquals(CachedDeserializable cd, Object obj, boolean isCompressedOffHeap) {
if (cd instanceof StoredObject && !((StoredObject) cd).isSerialized()) {
// cd is an actual byte[].
byte[] ba2;
if (obj instanceof StoredObject) {
if (!((StoredObject) obj).isSerialized()) {
return false;
}
ba2 = (byte[]) ((StoredObject) obj).getDeserializedForReading();
} else if (obj instanceof byte[]) {
ba2 = (byte[]) obj;
} else {
return false;
}
byte[] ba1 = (byte[]) cd.getDeserializedForReading();
return Arrays.equals(ba1, ba2);
}
Object cdVal = cd.getValue();
if (cdVal instanceof byte[]) {
byte[] cdValBytes = (byte[])cdVal;
PdxInstance pi = InternalDataSerializer.readPdxInstance(cdValBytes, GemFireCacheImpl.getForPdx("Could not check value equality"));
if (pi != null) {
return checkPdxEquals(pi, obj);
}
if (isCompressedOffHeap) { // fix for bug 52248
byte[] serializedObj;
if (obj instanceof CachedDeserializable) {
serializedObj = ((CachedDeserializable) obj).getSerializedValue();
} else {
serializedObj = EntryEventImpl.serialize(obj);
}
return Arrays.equals(cdValBytes, serializedObj);
} else {
/**
* To be more compatible with previous releases do not compare the serialized forms here.
* Instead deserialize and call the equals method.
*/
Object deserializedObj;
if (obj instanceof CachedDeserializable) {
deserializedObj =((CachedDeserializable) obj).getDeserializedForReading();
} else {
if (obj == null || obj == Token.NOT_AVAILABLE
|| Token.isInvalidOrRemoved(obj)) {
return false;
}
// TODO OPTIMIZE: Before serializing all of obj we could get the top
// level class name of cdVal and compare it to the top level class name of obj.
deserializedObj = obj;
}
return basicEquals(deserializedObj, cd.getDeserializedForReading());
}
// boolean result = Arrays.equals((byte[])cdVal, serializedObj);
// if (!result) {
// try {
// Object o1 = BlobHelper.deserializeBlob((byte[])cdVal);
// Object o2 = BlobHelper.deserializeBlob(serializedObj);
// SimpleMemoryAllocatorImpl.debugLog("checkCDEquals o1=<" + o1 + "> o2=<" + o2 + ">", false);
// if (o1.equals(o2)) {
// SimpleMemoryAllocatorImpl.debugLog("they are equal! a1=<" + Arrays.toString((byte[])cdVal) + "> a2=<" + Arrays.toString(serializedObj) + ">", false);
// }
// } catch (IOException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// } catch (ClassNotFoundException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
// }
// return result;
} else {
// prefer object form
if (obj instanceof CachedDeserializable) {
// TODO OPTIMIZE: Before deserializing all of obj we could get the top
// class name of cdVal and the top level class name of obj and compare.
obj = ((CachedDeserializable) obj).getDeserializedForReading();
}
return basicEquals(cdVal, obj);
}
}
/**
* This method fixes bug 43643
*/
private static boolean checkPdxEquals(PdxInstance pdx, Object obj) {
if (!(obj instanceof PdxInstance)) {
// obj may be a CachedDeserializable in which case we want to convert it to a PdxInstance even if we are not readSerialized.
if (obj instanceof CachedDeserializable) {
if (obj instanceof StoredObject && !((StoredObject) obj).isSerialized()) {
// obj is actually a byte[] which will never be equal to a PdxInstance
return false;
}
Object cdVal = ((CachedDeserializable) obj).getValue();
if (cdVal instanceof byte[]) {
byte[] cdValBytes = (byte[]) cdVal;
PdxInstance pi = InternalDataSerializer.readPdxInstance(cdValBytes, GemFireCacheImpl.getForPdx("Could not check value equality"));
if (pi != null) {
return pi.equals(pdx);
} else {
// since obj is serialized as something other than pdx it must not equal our pdx
return false;
}
} else {
// remove the cd wrapper so that obj is the actual value we want to compare.
obj = cdVal;
}
}
if (obj.getClass().getName().equals(pdx.getClassName())) {
GemFireCacheImpl gfc = GemFireCacheImpl.getForPdx("Could not access Pdx registry");
if (gfc != null) {
PdxSerializer pdxSerializer;
if (obj instanceof PdxSerializable) {
pdxSerializer = null;
} else {
pdxSerializer = gfc.getPdxSerializer();
}
if (pdxSerializer != null || obj instanceof PdxSerializable) {
// try to convert obj to a PdxInstance
HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
try {
if (InternalDataSerializer.autoSerialized(obj, hdos) ||
InternalDataSerializer.writePdx(hdos, gfc, obj, pdxSerializer)) {
PdxInstance pi = InternalDataSerializer.readPdxInstance(hdos.toByteArray(), gfc);
if (pi != null) {
obj = pi;
}
}
} catch (IOException ignore) {
// we are not able to convert it so just fall through
} catch (PdxSerializationException ignore) {
// we are not able to convert it so just fall through
}
}
}
}
}
return basicEquals(obj, pdx);
}
/////////////////////////////////////////////////////////////
/////////////////////////// fields //////////////////////////
/////////////////////////////////////////////////////////////
// Do not add any instance fields to this class.
// Instead add them to LeafRegionEntry.cpp
public static class HashRegionEntryCreator implements
CustomEntryConcurrentHashMap.HashEntryCreator<Object, Object> {
public HashEntry<Object, Object> newEntry(final Object key, final int hash,
final HashEntry<Object, Object> next, final Object value) {
final AbstractRegionEntry entry = (AbstractRegionEntry)value;
// if hash is already set then assert that the two should be same
final int entryHash = entry.getEntryHash();
if (hash == 0 || entryHash != 0) {
if (entryHash != hash) {
Assert.fail("unexpected mismatch of hash, expected=" + hash
+ ", actual=" + entryHash + " for " + entry);
}
}
entry.setEntryHash(hash);
entry.setNextEntry(next);
return entry;
}
public int keyHashCode(final Object key, final boolean compareValues) {
return CustomEntryConcurrentHashMap.keyHash(key, compareValues);
}
};
public abstract Object getKey();
protected static boolean okToStoreOffHeap(Object v, AbstractRegionEntry e) {
if (v == null) return false;
if (Token.isInvalidOrRemoved(v)) return false;
if (v == Token.NOT_AVAILABLE) return false;
if (v instanceof DiskEntry.RecoveredEntry) return false; // The disk layer has special logic that ends up storing the nested value in the RecoveredEntry off heap
if (!(e instanceof OffHeapRegionEntry)) return false;
// TODO should we check for deltas here or is that a user error?
return true;
}
/**
* Default implementation. Override in subclasses with primitive keys
* to prevent creating an Object form of the key for each equality check.
*/
@Override
public boolean isKeyEqual(Object k) {
return k.equals(getKey());
}
private static final long LAST_MODIFIED_MASK = 0x00FFFFFFFFFFFFFFL;
protected final void _setLastModified(long lastModifiedTime) {
if (lastModifiedTime < 0 || lastModifiedTime > LAST_MODIFIED_MASK) {
throw new IllegalStateException("Expected lastModifiedTime " + lastModifiedTime + " to be >= 0 and <= " + LAST_MODIFIED_MASK);
}
long storedValue;
long newValue;
do {
storedValue = getlastModifiedField();
newValue = storedValue & ~LAST_MODIFIED_MASK;
newValue |= lastModifiedTime;
} while (!compareAndSetLastModifiedField(storedValue, newValue));
}
protected abstract long getlastModifiedField();
protected abstract boolean compareAndSetLastModifiedField(long expectedValue, long newValue);
public final long getLastModified() {
return getlastModifiedField() & LAST_MODIFIED_MASK;
}
protected final boolean areAnyBitsSet(long bitMask) {
return ( getlastModifiedField() & bitMask ) != 0L;
}
/**
* Any bits in "bitMask" that are 1 will be set.
*/
protected final void setBits(long bitMask) {
boolean done = false;
do {
long bits = getlastModifiedField();
long newBits = bits | bitMask;
if (bits == newBits) return;
done = compareAndSetLastModifiedField(bits, newBits);
} while(!done);
}
/**
* Any bits in "bitMask" that are 0 will be cleared.
*/
protected final void clearBits(long bitMask) {
boolean done = false;
do {
long bits = getlastModifiedField();
long newBits = bits & bitMask;
if (bits == newBits) return;
done = compareAndSetLastModifiedField(bits, newBits);
} while(!done);
}
@Override
@Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE)
public Object prepareValueForCache(RegionEntryContext r,
@Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) Object val,
boolean isEntryUpdate) {
return prepareValueForCache(r, val, null, isEntryUpdate);
}
@Override
@Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE)
public Object prepareValueForCache(RegionEntryContext r,
@Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) Object val,
EntryEventImpl event, boolean isEntryUpdate) {
if (r != null && r.getOffHeap() && okToStoreOffHeap(val, this)) {
if (val instanceof StoredObject) {
// Check to see if val has the same compression settings as this region.
// The recursive calls in this section are safe because
// we only do it after copy the off-heap value to the heap.
// This is needed to fix bug 52057.
StoredObject soVal = (StoredObject) val;
assert !soVal.isCompressed();
if (r.getCompressor() != null) {
// val is uncompressed and we need a compressed value.
// So copy the off-heap value to the heap in a form that can be compressed.
byte[] valAsBytes = soVal.getValueAsHeapByteArray();
Object heapValue;
if (soVal.isSerialized()) {
heapValue = CachedDeserializableFactory.create(valAsBytes);
} else {
heapValue = valAsBytes;
}
return prepareValueForCache(r, heapValue, event, isEntryUpdate);
}
if (val instanceof Chunk) {
// if the reused guy has a refcount then need to inc it
if (!((Chunk)val).retain()) {
throw new IllegalStateException("Could not use an off heap value because it was freed");
}
}
// else it is DataAsAddress. This code just returns it as prepared.
// TODO OFFHEAP: Review the callers to see if they will handle DataAsAddress correctly.
} else {
byte[] data;
boolean isSerialized = !(val instanceof byte[]);
if (isSerialized) {
if (event != null && event.getCachedSerializedNewValue() != null) {
data = event.getCachedSerializedNewValue();
} else if (val instanceof CachedDeserializable) {
data = ((CachedDeserializable)val).getSerializedValue();
// TODO OFFHEAP: cache data in event?
} else if (val instanceof PdxInstance) {
try {
data = ((ConvertableToBytes)val).toBytes();
// TODO OFFHEAP: cache data in event?
} catch (IOException e) {
throw new PdxSerializationException("Could not convert " + val + " to bytes", e);
}
} else {
data = EntryEventImpl.serialize(val);
// TODO OFFHEAP: cache data in event?
}
} else {
data = (byte[]) val;
}
byte[] compressedData = compressBytes(r, data);
boolean isCompressed = compressedData != data;
SimpleMemoryAllocatorImpl.setReferenceCountOwner(this);
MemoryAllocator ma = SimpleMemoryAllocatorImpl.getAllocator(); // fix for bug 47875
val = ma.allocateAndInitialize(compressedData, isSerialized, isCompressed, GemFireChunk.TYPE); // TODO:KIRK:48068 race happens right after this line
SimpleMemoryAllocatorImpl.setReferenceCountOwner(null);
if (val instanceof GemFireChunk) {
val = new com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.ChunkWithHeapForm((GemFireChunk)val, data);
}
// if (val instanceof Chunk && r instanceof LocalRegion) {
// Chunk c = (Chunk) val;
// LocalRegion lr = (LocalRegion) r;
// SimpleMemoryAllocatorImpl.debugLog("allocated @" + Long.toHexString(c.getMemoryAddress()) + " reg=" + lr.getFullPath(), false);
// }
}
return val;
}
@Unretained Object nv = val;
if (nv instanceof StoredObject) {
// This off heap value is being put into a on heap region.
byte[] data = ((StoredObject) nv).getSerializedValue();
nv = CachedDeserializableFactory.create(data);
}
// don't bother checking for SQLFire
if (!GemFireCacheImpl.sqlfSystem() && nv instanceof PdxInstanceImpl) {
// We do not want to put PDXs in the cache as values.
// So get the serialized bytes and use a CachedDeserializable.
try {
byte[] data = ((ConvertableToBytes)nv).toBytes();
byte[] compressedData = compressBytes(r, data);
if (data == compressedData) {
nv = CachedDeserializableFactory.create(data);
} else {
nv = compressedData;
}
} catch (IOException e) {
throw new PdxSerializationException("Could not convert " + nv + " to bytes", e);
}
} else {
nv = compress(r, nv, event);
}
return nv;
}
@Override
@Unretained
public final Object _getValue() {
return getValueField();
}
public final boolean isUpdateInProgress() {
return areAnyBitsSet(UPDATE_IN_PROGRESS);
}
public final void setUpdateInProgress(final boolean underUpdate) {
if (underUpdate) {
setBits(UPDATE_IN_PROGRESS);
} else {
clearBits(~UPDATE_IN_PROGRESS);
}
}
public final boolean isCacheListenerInvocationInProgress() {
return areAnyBitsSet(LISTENER_INVOCATION_IN_PROGRESS);
}
public final void setCacheListenerInvocationInProgress(final boolean listenerInvoked) {
if (listenerInvoked) {
setBits(LISTENER_INVOCATION_IN_PROGRESS);
} else {
clearBits(~LISTENER_INVOCATION_IN_PROGRESS);
}
}
@Override
public final boolean isInUseByTransaction() {
return areAnyBitsSet(IN_USE_BY_TX);
}
@Override
public final void setInUseByTransaction(final boolean v) {
if (v) {
setBits(IN_USE_BY_TX);
} else {
clearBits(~IN_USE_BY_TX);
}
}
@Override
public final synchronized void incRefCount() {
TXManagerImpl.incRefCount(this);
setInUseByTransaction(true);
}
/**
* {@inheritDoc}
*/
@Override
public final boolean isMarkedForEviction() {
return areAnyBitsSet(MARKED_FOR_EVICTION);
}
/**
* {@inheritDoc}
*/
@Override
public final void setMarkedForEviction() {
setBits(MARKED_FOR_EVICTION);
}
/**
* {@inheritDoc}
*/
@Override
public final void clearMarkedForEviction() {
clearBits(~MARKED_FOR_EVICTION);
}
@Override
public final synchronized void decRefCount(NewLRUClockHand lruList, LocalRegion lr) {
if (TXManagerImpl.decRefCount(this)) {
if (isInUseByTransaction()) {
setInUseByTransaction(false);
if (lruList != null) {
// No more transactions, place in lru list
lruList.appendEntry((LRUClockNode)this);
}
if (lr != null && lr.isEntryExpiryPossible()) {
lr.addExpiryTaskIfAbsent(this);
}
}
}
}
@Override
public final synchronized void resetRefCount(NewLRUClockHand lruList) {
if (isInUseByTransaction()) {
setInUseByTransaction(false);
if (lruList != null) {
lruList.appendEntry((LRUClockNode)this);
}
}
}
/**
* soubhik: this method is overridden in sqlf flavor of entries.
* Instead of overriding this method; override areSetValue.
*/
protected final void _setValue(Object val) {
setValueField(val);
}
@Override
public Token getValueAsToken() {
Object v = getValueField();
if (v == null || v instanceof Token) {
return (Token)v;
} else {
return Token.NOT_A_TOKEN;
}
}
/**
* Reads the value of this region entry.
* Provides low level access to the value field.
* @return possible OFF_HEAP_OBJECT (caller uses region entry reference)
*/
@Unretained
protected abstract Object getValueField();
/**
* Set the value of this region entry.
* Provides low level access to the value field.
* @param v the new value to set
*/
protected abstract void setValueField(@Unretained Object v);
@Retained
public Object getTransformedValue() {
return _getValueRetain(null, false);
}
public final boolean getValueWasResultOfSearch() {
return areAnyBitsSet(VALUE_RESULT_OF_SEARCH);
}
public final void setValueResultOfSearch(boolean v) {
if (v) {
setBits(VALUE_RESULT_OF_SEARCH);
} else {
clearBits(~VALUE_RESULT_OF_SEARCH);
}
}
public boolean hasValidVersion() {
VersionStamp stamp = (VersionStamp)this;
boolean has = stamp.getRegionVersion() != 0 || stamp.getEntryVersion() != 0;
return has;
}
public boolean hasStats() {
// override this in implementations that have stats
return false;
}
/**
* @see HashEntry#getMapValue()
*/
public final Object getMapValue() {
return this;
}
/**
* @see HashEntry#setMapValue(Object)
*/
public final void setMapValue(final Object newValue) {
if (this != newValue) {
Assert.fail("AbstractRegionEntry#setMapValue: unexpected setMapValue "
+ "with newValue=" + newValue + ", this=" + this);
}
}
protected abstract void setEntryHash(int v);
@Override
public final String toString() {
final StringBuilder sb = new StringBuilder(this.getClass().getSimpleName())
.append('@').append(Integer.toHexString(System.identityHashCode(this)))
.append(" (");
return appendFieldsToString(sb).append(')').toString();
}
protected StringBuilder appendFieldsToString(final StringBuilder sb) {
sb.append("key=").append(getKey()).append("; rawValue=")
.append(_getValue()); // OFFHEAP _getValue ok: the current toString on OffHeapCachedDeserializable is safe to use without incing refcount.
VersionStamp stamp = getVersionStamp();
if (stamp != null) {
sb.append("; version=").append(stamp.asVersionTag()+";member="+stamp.getMemberID());
}
return sb;
}
/*
* (non-Javadoc)
* This generates version tags for outgoing messages for all subclasses
* supporting concurrency versioning. It also sets the entry's version
* stamp to the tag's values.
*
* @see com.gemstone.gemfire.internal.cache.RegionEntry#generateVersionTag(com.gemstone.gemfire.distributed.DistributedMember, boolean)
*/
public VersionTag generateVersionTag(VersionSource mbr, boolean withDelta, LocalRegion region, EntryEventImpl event) {
VersionStamp stamp = this.getVersionStamp();
if (stamp != null && region.getServerProxy() == null) { // clients do not generate versions
int v = stamp.getEntryVersion()+1;
if (v > 0xFFFFFF) {
v -= 0x1000000; // roll-over
}
VersionSource previous = stamp.getMemberID();
//For non persistent regions, we allow the member to be null and
//when we send a message and the remote side can determine the member
//from the sender. For persistent regions, we need to send
//the persistent id to the remote side.
//
//TODO - RVV - optimize the way we send the persistent id to save
//space.
if(mbr == null) {
VersionSource regionMember = region.getVersionMember();
if(regionMember instanceof DiskStoreID) {
mbr = regionMember;
}
}
VersionTag tag = VersionTag.create(mbr);
tag.setEntryVersion(v);
if (region.getVersionVector() != null) {
// Use region version if already provided, else generate
long nextRegionVersion = event.getNextRegionVersion();
if (nextRegionVersion != -1) {
// Set on the tag and record it locally
tag.setRegionVersion(nextRegionVersion);
RegionVersionVector rvv = region.getVersionVector();
rvv.recordVersion(rvv.getOwnerId(),nextRegionVersion);
if (logger.isDebugEnabled()) {
logger.debug("recorded region version {}; region={}", nextRegionVersion, region.getFullPath());
}
} else {
tag.setRegionVersion(region.getVersionVector().getNextVersion());
}
}
if (withDelta) {
tag.setPreviousMemberID(previous);
}
VersionTag remoteTag = event.getVersionTag();
if (remoteTag != null && remoteTag.isGatewayTag()) {
// if this event was received from a gateway we use the remote system's
// timestamp and dsid.
tag.setVersionTimeStamp(remoteTag.getVersionTimeStamp());
tag.setDistributedSystemId(remoteTag.getDistributedSystemId());
tag.setAllowedByResolver(remoteTag.isAllowedByResolver());
} else {
long time = region.cacheTimeMillis();
int dsid = region.getDistributionManager().getDistributedSystemId();
// a locally generated change should always have a later timestamp than
// one received from a wan gateway, so fake a timestamp if necessary
if (time <= stamp.getVersionTimeStamp() && dsid != tag.getDistributedSystemId()) {
time = stamp.getVersionTimeStamp() + 1;
}
tag.setVersionTimeStamp(time);
tag.setDistributedSystemId(dsid);
}
stamp.setVersions(tag);
stamp.setMemberID(mbr);
event.setVersionTag(tag);
if (logger.isDebugEnabled()) {
logger.debug("generated tag {}; key={}; oldvalue={} newvalue={} client={} region={}; rvv={}", tag,
event.getKey(), event.getOldValue(), event.getNewValue(),
(event.getContext() == null? "none" : event.getContext().getDistributedMember().getName()),
region.getFullPath(), region.getVersionVector());
}
return tag;
}
return null;
}
/** set/unset the flag noting that a tombstone has been scheduled for this entry */
public void setTombstoneScheduled(boolean scheduled) {
if (scheduled) {
setBits(TOMBSTONE_SCHEDULED);
} else {
clearBits(~TOMBSTONE_SCHEDULED);
}
}
/**
* return the flag noting whether a tombstone has been scheduled for this entry. This should
* be called under synchronization on the region entry if you want an accurate result.
*/
public boolean isTombstoneScheduled() {
return areAnyBitsSet(TOMBSTONE_SCHEDULED);
}
/*
* (non-Javadoc)
* This performs a concurrency check.
*
* This check compares the version number first, followed by the member ID.
*
* Wraparound of the version number is detected and handled by extending the
* range of versions by one bit.
*
* The normal membership ID comparison method is used.<p>
*
* Note that a tag from a remote (WAN) system may be in the event. If this
* is the case this method will either invoke a user plugin that allows/disallows
* the event (and may modify the value) or it determines whether to allow
* or disallow the event based on timestamps and distributedSystemIDs.
*
* @throws ConcurrentCacheModificationException if the event conflicts with
* an event that has already been applied to the entry.
*
* @see com.gemstone.gemfire.internal.cache.RegionEntry#concurrencyCheck(com.gemstone.gemfire.cache.EntryEvent)
*/
public void processVersionTag(EntryEvent cacheEvent) {
processVersionTag(cacheEvent, true);
}
protected void processVersionTag(EntryEvent cacheEvent, boolean conflictCheck) {
EntryEventImpl event = (EntryEventImpl)cacheEvent;
VersionTag tag = event.getVersionTag();
if (tag == null) {
return;
}
try {
if (tag.isGatewayTag()) {
// this may throw ConcurrentCacheModificationException or modify the event
if (processGatewayTag(cacheEvent)) {
return;
}
assert false : "processGatewayTag failure - returned false";
}
if (!tag.isFromOtherMember()) {
if (!event.getOperation().isNetSearch()) {
// except for netsearch, all locally-generated tags can be ignored
return;
}
}
final InternalDistributedMember originator = (InternalDistributedMember)event.getDistributedMember();
final VersionSource dmId = event.getRegion().getVersionMember();
LocalRegion r = event.getLocalRegion();
boolean eventHasDelta = event.getDeltaBytes() != null && event.getRawNewValue() == null;
VersionStamp stamp = getVersionStamp();
// bug #46223, an event received from a peer or a server may be from a different
// distributed system than the last modification made to this entry so we must
// perform a gateway conflict check
if (stamp != null && !tag.isAllowedByResolver()) {
int stampDsId = stamp.getDistributedSystemId();
int tagDsId = tag.getDistributedSystemId();
if (stampDsId != 0 && stampDsId != tagDsId && stampDsId != -1) {
StringBuilder verbose = null;
if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
verbose = new StringBuilder();
verbose.append("processing tag for key " + getKey() + ", stamp=" + stamp.asVersionTag() + ", tag=").append(tag);
}
long stampTime = stamp.getVersionTimeStamp();
long tagTime = tag.getVersionTimeStamp();
if (stampTime > 0 && (tagTime > stampTime
|| (tagTime == stampTime && tag.getDistributedSystemId() >= stamp.getDistributedSystemId()))) {
if (verbose != null) {
verbose.append(" - allowing event");
logger.trace(LogMarker.TOMBSTONE, verbose);
}
// Update the stamp with event's version information.
applyVersionTag(r, stamp, tag, originator);
return;
}
if (stampTime > 0) {
if (verbose != null) {
verbose.append(" - disallowing event");
logger.trace(LogMarker.TOMBSTONE, verbose);
}
r.getCachePerfStats().incConflatedEventsCount();
persistConflictingTag(r, tag);
throw new ConcurrentCacheModificationException("conflicting event detected");
}
}
}
if (r.getVersionVector() != null &&
r.getServerProxy() == null &&
(r.getDataPolicy().withPersistence() ||
!r.getScope().isLocal())) { // bug #45258 - perf degradation for local regions and RVV
VersionSource who = tag.getMemberID();
if (who == null) {
who = originator;
}
r.getVersionVector().recordVersion(who, tag);
}
assert !tag.isFromOtherMember() || tag.getMemberID() != null : "remote tag is missing memberID";
// [bruce] for a long time I had conflict checks turned off in clients when
// receiving a response from a server and applying it to the cache. This lowered
// the CPU cost of versioning but eventually had to be pulled for bug #45453
// if (r.getServerProxy() != null && conflictCheck) {
// // events coming from servers while a local sync is held on the entry
// // do not require a conflict check. Conflict checks were already
// // performed on the server and here we just consume whatever was sent back.
// // Event.isFromServer() returns true for client-update messages and
// // for putAll/getAll, which do not hold syncs during the server operation.
// conflictCheck = event.isFromServer();
// }
// else
// [bruce] for a very long time we had conflict checks turned off for PR buckets.
// Bug 45669 showed a primary dying in the middle of distribution. This caused
// one backup bucket to have a v2. The other bucket was promoted to primary and
// generated a conflicting v2. We need to do the check so that if this second
// v2 loses to the original one in the delta-GII operation that the original v2
// will be the winner in both buckets.
// if (r.isUsedForPartitionedRegionBucket()) {
// conflictCheck = false; // primary/secondary model
// }
// The new value in event is not from GII, even it could be tombstone
basicProcessVersionTag(r, tag, false, eventHasDelta, dmId, originator, conflictCheck);
} catch (ConcurrentCacheModificationException ex) {
event.isConcurrencyConflict(true);
throw ex;
}
}
protected final void basicProcessVersionTag(LocalRegion region, VersionTag tag, boolean isTombstoneFromGII,
boolean deltaCheck, VersionSource dmId, InternalDistributedMember sender, boolean checkForConflict) {
StringBuilder verbose = null;
if (tag != null) {
VersionStamp stamp = getVersionStamp();
if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
VersionTag stampTag = stamp.asVersionTag();
if (stampTag.hasValidVersion() && checkForConflict) { // only be verbose here if there's a possibility we might reject the operation
verbose = new StringBuilder();
verbose.append("processing tag for key " + getKey() + ", stamp=" + stamp.asVersionTag() + ", tag=").append(tag)
.append(", checkForConflict=").append(checkForConflict); //.append(", current value=").append(_getValue());
}
}
if (stamp == null) {
throw new IllegalStateException("message contained a version tag but this region has no version storage");
}
boolean apply = true;
try {
if (checkForConflict) {
apply = checkForConflict(region, stamp, tag, isTombstoneFromGII, deltaCheck, dmId, sender, verbose);
}
} catch (ConcurrentCacheModificationException e) {
// Even if we don't apply the operation we should always retain the
// highest timestamp in order for WAN conflict checks to work correctly
// because the operation may have been sent to other systems and been
// applied there
if (!tag.isGatewayTag()
&& stamp.getDistributedSystemId() == tag.getDistributedSystemId()
&& tag.getVersionTimeStamp() > stamp.getVersionTimeStamp()) {
stamp.setVersionTimeStamp(tag.getVersionTimeStamp());
tag.setTimeStampApplied(true);
if (verbose != null) {
verbose.append("\nThough in conflict the tag timestamp was more recent and was recorded.");
}
}
throw e;
} finally {
if (verbose != null) {
logger.trace(LogMarker.TOMBSTONE, verbose);
}
}
if (apply) {
applyVersionTag(region, stamp, tag, sender);
}
}
}
private void applyVersionTag(LocalRegion region, VersionStamp stamp, VersionTag tag, InternalDistributedMember sender) {
// stamp.setPreviousMemberID(stamp.getMemberID());
VersionSource mbr = tag.getMemberID();
if (mbr == null) {
mbr = sender;
}
mbr = region.getVersionVector().getCanonicalId(mbr);
tag.setMemberID(mbr);
stamp.setVersions(tag);
if (tag.hasPreviousMemberID()) {
if (tag.getPreviousMemberID() == null) {
tag.setPreviousMemberID(stamp.getMemberID());
} else {
tag.setPreviousMemberID(region.getVersionVector().getCanonicalId(
tag.getPreviousMemberID()));
}
}
}
/** perform conflict checking for a stamp/tag */
protected boolean checkForConflict(LocalRegion region,
VersionStamp stamp, VersionTag tag,
boolean isTombstoneFromGII,
boolean deltaCheck, VersionSource dmId,
InternalDistributedMember sender, StringBuilder verbose) {
int stampVersion = stamp.getEntryVersion();
int tagVersion = tag.getEntryVersion();
boolean throwex = false;
boolean apply = false;
if (stamp.getVersionTimeStamp() != 0) { // new entries have no timestamp
// check for wrap-around on the version number
long difference = tagVersion - stampVersion;
if (0x10000 < difference || difference < -0x10000) {
if (verbose != null) {
verbose.append("\nversion rollover detected: tag="+tagVersion + " stamp=" + stampVersion);
}
if (difference < 0) {
tagVersion += 0x1000000L;
} else {
stampVersion += 0x1000000L;
}
}
}
if (verbose != null) {
verbose.append("\nstamp=v").append(stampVersion)
.append(" tag=v").append(tagVersion);
}
if (deltaCheck) {
checkForDeltaConflict(region, stampVersion, tagVersion, stamp, tag, dmId, sender, verbose);
}
if (stampVersion == 0 || stampVersion < tagVersion) {
if (verbose != null) { verbose.append(" - applying change"); }
apply = true;
} else if (stampVersion > tagVersion) {
if (overwritingOldTombstone(region, stamp, tag, verbose) && tag.getVersionTimeStamp() > stamp.getVersionTimeStamp()) {
apply = true;
} else {
// check for an incoming expired tombstone from an initial image chunk.
if (tagVersion > 0
&& isExpiredTombstone(region, tag.getVersionTimeStamp(), isTombstoneFromGII)
&& tag.getVersionTimeStamp() > stamp.getVersionTimeStamp()) {
// A special case to apply: when remote entry is expired tombstone, then let local vs remote with newer timestamp to win
if (verbose != null) { verbose.append(" - applying change in Delta GII"); }
apply = true;
} else {
if (verbose != null) { verbose.append(" - disallowing"); }
throwex= true;
}
}
} else {
if (overwritingOldTombstone(region, stamp, tag, verbose)) {
apply = true;
} else {
// compare member IDs
VersionSource stampID = stamp.getMemberID();
if (stampID == null) {
stampID = dmId;
}
VersionSource tagID = tag.getMemberID();
if (tagID == null) {
tagID = sender;
}
if (verbose != null) { verbose.append("\ncomparing IDs"); }
int compare = stampID.compareTo(tagID);
if (compare < 0) {
if (verbose != null) { verbose.append(" - applying change"); }
apply = true;
} else if (compare > 0) {
if (verbose != null) { verbose.append(" - disallowing"); }
throwex = true;
} else if (tag.isPosDup()) {
if (verbose != null) { verbose.append(" - disallowing duplicate marked with posdup"); }
throwex = true;
} else /* if (isTombstoneFromGII && isTombstone()) {
if (verbose != null) { verbose.append(" - disallowing duplicate tombstone from GII"); }
return false; // bug #49601 don't schedule tombstones from GII if there's already one here
} else */ {
if (verbose != null) { verbose.append(" - allowing duplicate"); }
}
}
}
if (!apply && throwex) {
region.getCachePerfStats().incConflatedEventsCount();
persistConflictingTag(region, tag);
throw new ConcurrentCacheModificationException();
}
return apply;
}
private boolean isExpiredTombstone(LocalRegion region, long timestamp, boolean isTombstone) {
return isTombstone && (timestamp + TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT) <= region.cacheTimeMillis();
}
private boolean overwritingOldTombstone(LocalRegion region, VersionStamp stamp, VersionTag tag, StringBuilder verbose) {
// Tombstone GC does not use locking to stop operations when old tombstones
// are being removed. Because of this we might get an operation that was applied
// in another VM that has just reaped a tombstone and is now using a reset
// entry version number. Because of this we check the timestamp on the current
// local entry and see if it is old enough to have expired. If this is the case
// we accept the change and allow the tag to be recorded
long stampTime = stamp.getVersionTimeStamp();
if (isExpiredTombstone(region, stampTime, this.isTombstone())) {
// no local change since the tombstone would have timed out - accept the change
if (verbose != null) { verbose.append(" - accepting because local timestamp is old"); }
return true;
} else {
return false;
}
}
protected void persistConflictingTag(LocalRegion region, VersionTag tag) {
// only persist region needs to persist conflict tag
}
/**
* for an event containing a delta we must check to see if the tag's
* previous member id is the stamp's member id and ensure that the
* version is only incremented by 1. Otherwise the delta is being
* applied to a value that does not match the source of the delta.
*
* @throws InvalidDeltaException
*/
private void checkForDeltaConflict(LocalRegion region,
long stampVersion, long tagVersion,
VersionStamp stamp, VersionTag tag,
VersionSource dmId, InternalDistributedMember sender,
StringBuilder verbose) {
if (tagVersion != stampVersion+1) {
if (verbose != null) {
verbose.append("\ndelta requires full value due to version mismatch");
}
region.getCachePerfStats().incDeltaFailedUpdates();
throw new InvalidDeltaException("delta cannot be applied due to version mismatch");
} else {
// make sure the tag was based on the value in this entry by checking the
// tag's previous-changer ID against this stamp's current ID
VersionSource stampID = stamp.getMemberID();
if (stampID == null) {
stampID = dmId;
}
VersionSource tagID = tag.getPreviousMemberID();
if (tagID == null) {
tagID = sender;
}
if (!tagID.equals(stampID)) {
if (verbose != null) {
verbose.append("\ndelta requires full value. tag.previous=")
.append(tagID).append(" but stamp.current=").append(stampID);
}
region.getCachePerfStats().incDeltaFailedUpdates();
throw new InvalidDeltaException("delta cannot be applied due to version ID mismatch");
}
}
}
private boolean processGatewayTag(EntryEvent cacheEvent) {
// Gateway tags are installed in the server-side LocalRegion cache
// modification methods. They do not have version numbers or distributed
// member IDs. Instead they only have timestamps and distributed system IDs.
// If there is a resolver plug-in, invoke it. Otherwise we use the timestamps and
// distributed system IDs to determine whether to allow the event to proceed.
final boolean isDebugEnabled = logger.isDebugEnabled();
if (this.isRemoved() && !this.isTombstone()) {
return true; // no conflict on a new entry
}
EntryEventImpl event = (EntryEventImpl)cacheEvent;
VersionTag tag = event.getVersionTag();
long stampTime = getVersionStamp().getVersionTimeStamp();
long tagTime = tag.getVersionTimeStamp();
int stampDsid = getVersionStamp().getDistributedSystemId();
int tagDsid = tag.getDistributedSystemId();
if (isDebugEnabled) {
logger.debug("processing gateway version information for {}. Stamp dsid={} time={} Tag dsid={} time={}",
event.getKey(), stampDsid, stampTime, tagDsid, tagTime);
}
if (tagTime == VersionTag.ILLEGAL_VERSION_TIMESTAMP) {
return true; // no timestamp received from other system - just apply it
}
if (tagDsid == stampDsid || stampDsid == -1) {
return true;
}
GatewayConflictResolver resolver = event.getRegion().getCache().getGatewayConflictResolver();
if (resolver != null) {
if (isDebugEnabled) {
logger.debug("invoking gateway conflict resolver");
}
final boolean[] disallow = new boolean[1];
final Object[] newValue = new Object[] { this };
GatewayConflictHelper helper = new GatewayConflictHelper() {
@Override
public void disallowEvent() {
disallow[0] = true;
}
@Override
public void changeEventValue(Object v) {
newValue[0] = v;
}
};
TimestampedEntryEventImpl timestampedEvent =
(TimestampedEntryEventImpl)event.getTimestampedEvent(tagDsid, stampDsid, tagTime, stampTime);
// gateway conflict resolvers will usually want to see the old value
if (!timestampedEvent.hasOldValue() && isRemoved()) {
timestampedEvent.setOldValue(getValue(timestampedEvent.getRegion())); // OFFHEAP: since isRemoved I think getValue will never be stored off heap in this case
}
Throwable thr = null;
try {
resolver.onEvent(timestampedEvent, helper);
}
catch (CancelException cancelled) {
throw cancelled;
}
catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
}
catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
logger.error(LocalizedMessage.create(LocalizedStrings.LocalRegion_EXCEPTION_OCCURRED_IN_CONFLICTRESOLVER), t);
thr = t;
} finally {
timestampedEvent.release();
}
if (isDebugEnabled) {
logger.debug("done invoking resolver {}", thr);
}
if (thr == null) {
if (disallow[0]) {
if (isDebugEnabled) {
logger.debug("conflict resolver rejected the event for {}", event.getKey());
}
throw new ConcurrentCacheModificationException("WAN conflict resolver rejected the operation");
}
tag.setAllowedByResolver(true);
if (newValue[0] != this) {
if (isDebugEnabled) {
logger.debug("conflict resolver changed the value of the event for {}", event.getKey());
}
// the resolver changed the event value!
event.setNewValue(newValue[0]);
}
// if nothing was done then we allow the event
if (isDebugEnabled) {
logger.debug("change was allowed by conflict resolver: {}", tag);
}
return true;
}
}
if (isDebugEnabled) {
logger.debug("performing normal WAN conflict check");
}
if (tagTime > stampTime
|| (tagTime == stampTime && tagDsid >= stampDsid)) {
if (isDebugEnabled) {
logger.debug("allowing event");
}
return true;
}
if (isDebugEnabled) {
logger.debug("disallowing event for " + event.getKey());
}
throw new ConcurrentCacheModificationException("conflicting WAN event detected");
}
static boolean isCompressible(RegionEntryContext context,Object value) {
return ((value != null) && (context != null) && (context.getCompressor() != null) && !Token.isInvalidOrRemoved(value));
}
/* subclasses supporting versions must override this */
public VersionStamp getVersionStamp() {
return null;
}
public boolean isValueNull() {
return (null == getValueAsToken());
}
public boolean isInvalid() {
return Token.isInvalid(getValueAsToken());
}
public boolean isDestroyed() {
return Token.isDestroyed(getValueAsToken());
}
public void setValueToNull() {
_setValue(null);
}
public boolean isInvalidOrRemoved() {
return Token.isInvalidOrRemoved(getValueAsToken());
}
/**
* Maximum size of a string that can be encoded as char.
*/
public static final int MAX_INLINE_STRING_KEY_CHAR_ENCODING = 7;
/**
* Maximum size of a string that can be encoded as byte.
*/
public static final int MAX_INLINE_STRING_KEY_BYTE_ENCODING = 15;
/**
* This is only retained in off-heap subclasses. However, it's marked as
* Retained here so that callers are aware that the value may be retained.
*/
@Override
@Retained
public Object _getValueRetain(RegionEntryContext context, boolean decompress) {
if (decompress) {
return decompress(context, _getValue());
} else {
return _getValue();
}
}
@Override
public void returnToPool() {
// noop by default
}
}