| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.entries; |
| |
| import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_FILL_IN_VALUE; |
| import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE; |
| |
| import java.io.IOException; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelException; |
| import org.apache.geode.InvalidDeltaException; |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.cache.CacheWriterException; |
| import org.apache.geode.cache.EntryEvent; |
| import org.apache.geode.cache.EntryNotFoundException; |
| import org.apache.geode.cache.Operation; |
| import org.apache.geode.cache.TimeoutException; |
| import org.apache.geode.cache.query.IndexMaintenanceException; |
| import org.apache.geode.cache.query.QueryException; |
| import org.apache.geode.cache.query.internal.index.IndexManager; |
| import org.apache.geode.cache.query.internal.index.IndexProtocol; |
| import org.apache.geode.cache.util.GatewayConflictHelper; |
| import org.apache.geode.cache.util.GatewayConflictResolver; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.HeapDataOutputStream; |
| import org.apache.geode.internal.InternalStatisticsDisabledException; |
| import org.apache.geode.internal.cache.CachedDeserializable; |
| import org.apache.geode.internal.cache.CachedDeserializableFactory; |
| import org.apache.geode.internal.cache.DistributedRegion; |
| import org.apache.geode.internal.cache.EntryEventImpl; |
| import org.apache.geode.internal.cache.FilterProfile; |
| import org.apache.geode.internal.cache.ImageState; |
| import org.apache.geode.internal.cache.InitialImageOperation.Entry; |
| import org.apache.geode.internal.cache.InternalCacheEvent; |
| import org.apache.geode.internal.cache.InternalRegion; |
| import org.apache.geode.internal.cache.RegionClearedException; |
| import org.apache.geode.internal.cache.RegionEntryContext; |
| import org.apache.geode.internal.cache.RegionQueue; |
| import org.apache.geode.internal.cache.TXManagerImpl; |
| import org.apache.geode.internal.cache.TimestampedEntryEventImpl; |
| import org.apache.geode.internal.cache.Token; |
| import org.apache.geode.internal.cache.TombstoneService; |
| import org.apache.geode.internal.cache.ValueComparisonHelper; |
| import org.apache.geode.internal.cache.eviction.EvictionList; |
| import org.apache.geode.internal.cache.persistence.DiskRecoveryStore; |
| import org.apache.geode.internal.cache.persistence.DiskStoreID; |
| import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException; |
| import org.apache.geode.internal.cache.versions.RegionVersionVector; |
| import org.apache.geode.internal.cache.versions.VersionSource; |
| import org.apache.geode.internal.cache.versions.VersionStamp; |
| import org.apache.geode.internal.cache.versions.VersionTag; |
| import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; |
| import org.apache.geode.internal.lang.StringUtils; |
| import org.apache.geode.internal.logging.log4j.LogMarker; |
| import org.apache.geode.internal.offheap.MemoryAllocator; |
| import org.apache.geode.internal.offheap.MemoryAllocatorImpl; |
| import org.apache.geode.internal.offheap.OffHeapHelper; |
| import org.apache.geode.internal.offheap.ReferenceCountHelper; |
| import org.apache.geode.internal.offheap.Releasable; |
| import org.apache.geode.internal.offheap.StoredObject; |
| import org.apache.geode.internal.offheap.annotations.Released; |
| import org.apache.geode.internal.offheap.annotations.Retained; |
| import org.apache.geode.internal.offheap.annotations.Unretained; |
| import org.apache.geode.internal.serialization.ByteArrayDataInput; |
| import org.apache.geode.internal.serialization.Version; |
| import org.apache.geode.internal.util.BlobHelper; |
| import org.apache.geode.internal.util.Versionable; |
| import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap; |
| import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap.HashEntry; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| import org.apache.geode.pdx.PdxInstance; |
| import org.apache.geode.pdx.PdxSerializationException; |
| import org.apache.geode.pdx.internal.ConvertableToBytes; |
| import org.apache.geode.pdx.internal.PdxInstanceImpl; |
| |
| /** |
| * Abstract implementation class of RegionEntry interface. This is the topmost implementation class |
| * so common behavior lives here. |
| * |
| * @since GemFire 3.5.1 |
| */ |
| public abstract class AbstractRegionEntry implements HashRegionEntry<Object, Object> { |
| private static final Logger logger = LogService.getLogger(); |
| |
| /** |
| * Whether to disable last access time update when a put occurs. The default is false (enable last |
| * access time update on put). To disable it, set the 'gemfire.disableAccessTimeUpdateOnPut' |
| * system property. |
| */ |
| protected static final boolean DISABLE_ACCESS_TIME_UPDATE_ON_PUT = |
| Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "disableAccessTimeUpdateOnPut"); |
| |
| /* |
| * Flags for a Region Entry. These flags are stored in the msb of the long used to also store the |
| * lastModificationTime. |
| */ |
| private static final long VALUE_RESULT_OF_SEARCH = 0x01L << 56; |
| |
| private static final long UPDATE_IN_PROGRESS = 0x02L << 56; |
| |
| private static final long LISTENER_INVOCATION_IN_PROGRESS = 0x08L << 56; |
| |
| /** used for LRUEntry instances. */ |
| protected static final long RECENTLY_USED = 0x10L << 56; |
| |
| /** used for LRUEntry instances. */ |
| protected static final long EVICTED = 0x20L << 56; |
| |
| /** |
| * Set if the entry is being used by a transactions. Some features (eviction and expiration) will |
| * not modify an entry when a tx is using it to prevent the tx to fail do to conflict. |
| */ |
| private static final long IN_USE_BY_TX = 0x40L << 56; |
| |
| protected AbstractRegionEntry(RegionEntryContext context, |
| @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) Object value) { |
| |
| setValue(context, prepareValueForCache(context, value, false), false); |
| |
| // setLastModified(System.currentTimeMillis()); this must be set later so we can use ==0 |
| // to know this is a new entry in checkForConflicts |
| } |
| |
| @Override |
| @SuppressWarnings("IMSE_DONT_CATCH_IMSE") |
| public boolean dispatchListenerEvents(final EntryEventImpl event) throws InterruptedException { |
| final InternalRegion rgn = event.getRegion(); |
| |
| if (event.callbacksInvoked()) { |
| return true; |
| } |
| |
| // don't wait for certain events to reach the head of the queue before |
| // dispatching listeners. However, we must not notify the gateways for |
| // remote-origin ops out of order. Otherwise the other systems will have |
| // inconsistent content. |
| |
| event.setCallbacksInvokedByCurrentThread(); |
| |
| if (logger.isDebugEnabled() && !rgn.isInternalRegion()) { |
| logger.debug("{} dispatching event {}", this, event); |
| } |
| // All the following code that sets "thr" is to workaround |
| // spurious IllegalMonitorStateExceptions caused by JVM bugs. |
| try { |
| // call invokeCallbacks while synced on RegionEntry |
| event.invokeCallbacks(rgn, event.inhibitCacheListenerNotification(), false); |
| return true; |
| |
| } finally { |
| if (isRemoved() && !isTombstone() && !event.isEvicted()) { |
| // Phase 2 of region entry removal is done here. The first phase is done |
| // by the RegionMap. It is unclear why this code is needed. ARM destroy |
| // does this also and we are now doing it as phase3 of the ARM destroy. |
| removePhase2(); |
| ((DiskRecoveryStore) rgn).getRegionMap().removeEntry(event.getKey(), this, true, event, |
| rgn); |
| } |
| } |
| } |
| |
| @Override |
| public long getLastAccessed() throws InternalStatisticsDisabledException { |
| throw new InternalStatisticsDisabledException(); |
| } |
| |
| @Override |
| public long getHitCount() throws InternalStatisticsDisabledException { |
| throw new InternalStatisticsDisabledException(); |
| } |
| |
| @Override |
| public long getMissCount() throws InternalStatisticsDisabledException { |
| throw new InternalStatisticsDisabledException(); |
| } |
| |
| /** |
| * This sets the lastModified time for the entry. In subclasses with statistics it will also set |
| * the lastAccessed time unless the system property gemfire.disableAccessTimeUpdateOnPut is set to |
| * true. |
| * |
| * @param lastModified the time of last modification of the entry |
| */ |
| public void setLastModified(long lastModified) { |
| setLastModifiedAndAccessedTimes(lastModified, lastModified); |
| } |
| |
| /** |
| * This sets the lastModified and lastAccessed time for the entry. Subclasses that do not keep |
| * track of lastAccessed time will ignore the second parameter. |
| * |
| * @param lastModified the time of last modification of the entry |
| * @param lastAccessed the time the entry was last accessed |
| */ |
| protected void setLastModifiedAndAccessedTimes(long lastModified, long lastAccessed) { |
| _setLastModified(lastModified); |
| } |
| |
| @Override |
| public void txDidDestroy(long currentTime) { |
| setLastModifiedAndAccessedTimes(currentTime, currentTime); |
| } |
| |
| @Override |
| public void updateStatsForPut(long lastModifiedTime, long lastAccessedTime) { |
| setLastModifiedAndAccessedTimes(lastModifiedTime, lastAccessedTime); |
| } |
| |
| @Override |
| public void setRecentlyUsed(RegionEntryContext context) { |
| // do nothing by default; only needed for LRU |
| } |
| |
| @Override |
| public void updateStatsForGet(boolean hit, long time) { |
| // nothing needed |
| } |
| |
| @Override |
| public void resetCounts() throws InternalStatisticsDisabledException { |
| throw new InternalStatisticsDisabledException(); |
| } |
| |
| void _removePhase1() { |
| _setValue(Token.REMOVED_PHASE1); |
| } |
| |
| @Override |
| public void removePhase1(InternalRegion region, boolean clear) throws RegionClearedException { |
| _removePhase1(); |
| } |
| |
| @Override |
| public void removePhase2() { |
| _setValue(Token.REMOVED_PHASE2); |
| } |
| |
| @Override |
| public void makeTombstone(InternalRegion region, VersionTag version) |
| throws RegionClearedException { |
| assert region.getVersionVector() != null; |
| assert version != null; |
| |
| boolean wasTombstone = isTombstone(); |
| setRecentlyUsed(region); |
| boolean newEntry = getValueAsToken() == Token.REMOVED_PHASE1; |
| basicMakeTombstone(region); |
| region.scheduleTombstone(this, version, wasTombstone); |
| if (newEntry) { |
| // bug #46631 - entry count is decremented by scheduleTombstone but this is a new entry |
| region.getCachePerfStats().incEntryCount(1); |
| } |
| } |
| |
| private void basicMakeTombstone(InternalRegion region) throws RegionClearedException { |
| boolean setValueCompleted = false; |
| try { |
| setValue(region, Token.TOMBSTONE); |
| setValueCompleted = true; |
| } finally { |
| if (!setValueCompleted && isTombstone()) { |
| removePhase2(); |
| } |
| } |
| } |
| |
| @Override |
| public void setValueWithTombstoneCheck(@Unretained Object v, EntryEvent e) |
| throws RegionClearedException { |
| if (v == Token.TOMBSTONE) { |
| makeTombstone((InternalRegion) e.getRegion(), ((InternalCacheEvent) e).getVersionTag()); |
| } else { |
| setValue((RegionEntryContext) e.getRegion(), v, (EntryEventImpl) e); |
| } |
| } |
| |
| /** |
| * Return true if the object is removed. |
| * |
| * TODO this method does NOT return true if the object is Token.DESTROYED. dispatchListenerEvents |
| * relies on that fact to avoid removing destroyed tokens from the map. We should refactor so that |
| * this method calls Token.isRemoved, and places that don't want a destroyed Token can explicitly |
| * check for a DESTROY token. |
| */ |
| @Override |
| public boolean isRemoved() { |
| Token o = getValueAsToken(); |
| return o == Token.REMOVED_PHASE1 || o == Token.REMOVED_PHASE2 || o == Token.TOMBSTONE; |
| } |
| |
| @Override |
| public boolean isDestroyedOrRemoved() { |
| return Token.isRemoved(getValueAsToken()); |
| } |
| |
| @Override |
| public boolean isDestroyedOrRemovedButNotTombstone() { |
| Token o = getValueAsToken(); |
| return o == Token.DESTROYED || o == Token.REMOVED_PHASE1 || o == Token.REMOVED_PHASE2; |
| } |
| |
| @Override |
| public boolean isTombstone() { |
| return getValueAsToken() == Token.TOMBSTONE; |
| } |
| |
| @Override |
| public boolean isRemovedPhase2() { |
| return getValueAsToken() == Token.REMOVED_PHASE2; |
| } |
| |
| @Override |
| public boolean fillInValue(InternalRegion region, |
| @Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) Entry entry, ByteArrayDataInput in, |
| DistributionManager mgr, final Version version) { |
| |
| // starting default value |
| entry.setSerialized(false); |
| |
| @Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) |
| final Object v; |
| if (isTombstone()) { |
| v = Token.TOMBSTONE; |
| } else { |
| // OFFHEAP: need to incrc, copy bytes, decrc |
| v = getValue(region); |
| if (v == null) { |
| return false; |
| } |
| } |
| |
| entry.setLastModified(mgr, getLastModified()); // fix for bug 31059 |
| if (v == Token.INVALID) { |
| entry.setInvalid(); |
| } else if (v == Token.LOCAL_INVALID) { |
| entry.setLocalInvalid(); |
| } else if (v == Token.TOMBSTONE) { |
| entry.setTombstone(); |
| } else if (v instanceof CachedDeserializable) { |
| // don't serialize here if it is not already serialized |
| CachedDeserializable cd = (CachedDeserializable) v; |
| if (!cd.isSerialized()) { |
| entry.setValue(cd.getDeserializedForReading()); |
| } else { |
| Object tmp = cd.getValue(); |
| if (tmp instanceof byte[]) { |
| entry.setValue(tmp); |
| } else { |
| try { |
| HeapDataOutputStream hdos = new HeapDataOutputStream(version); |
| BlobHelper.serializeTo(tmp, hdos); |
| hdos.trim(); |
| entry.setValue(hdos); |
| } catch (IOException e) { |
| throw new IllegalArgumentException( |
| "An IOException was thrown while serializing.", |
| e); |
| } |
| } |
| entry.setSerialized(true); |
| } |
| } else if (v instanceof byte[]) { |
| entry.setValue(v); |
| } else { |
| Object preparedValue = v; |
| if (preparedValue != null) { |
| preparedValue = prepareValueForGII(preparedValue); |
| if (preparedValue == null) { |
| return false; |
| } |
| } |
| try { |
| HeapDataOutputStream hdos = new HeapDataOutputStream(version); |
| BlobHelper.serializeTo(preparedValue, hdos); |
| hdos.trim(); |
| entry.setValue(hdos); |
| entry.setSerialized(true); |
| } catch (IOException e) { |
| throw new IllegalArgumentException( |
| "An IOException was thrown while serializing.", |
| e); |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * To fix bug 49901 if v is a GatewaySenderEventImpl then make a heap copy of it if it is offheap. |
| * |
| * @return the value to provide to the gii request; null if no value should be provided. |
| */ |
| static Object prepareValueForGII(Object v) { |
| assert v != null; |
| if (v instanceof GatewaySenderEventImpl) { |
| return ((GatewaySenderEventImpl) v).makeHeapCopyIfOffHeap(); |
| } else { |
| return v; |
| } |
| } |
| |
| @Override |
| public boolean isOverflowedToDisk(InternalRegion region, |
| DistributedRegion.DiskPosition diskPosition) { |
| return false; |
| } |
| |
| @Override |
| public Object getValue(RegionEntryContext context) { |
| ReferenceCountHelper.createReferenceCountOwner(); |
| @Retained |
| Object result = getValueRetain(context, true); |
| |
| // If the thread is an Index Creation Thread & the value obtained is |
| // Token.REMOVED , we can skip synchronization block. This is required to prevent |
| // the dead lock caused if an Index Update Thread has gone into a wait holding the |
| // lock of the Entry object. There should not be an issue if the Index creation thread |
| // gets the temporary value of token.REMOVED as the correct value will get indexed |
| // by the Index Update Thread , once the index creation thread has exited. |
| // Part of Bugfix # 33336 |
| |
| if (Token.isRemoved(result)) { |
| ReferenceCountHelper.setReferenceCountOwner(null); |
| return null; |
| } else { |
| result = OffHeapHelper.copyAndReleaseIfNeeded(result, context.getCache()); |
| ReferenceCountHelper.setReferenceCountOwner(null); |
| setRecentlyUsed(context); |
| return result; |
| } |
| } |
| |
| @Override |
| @Retained |
| public Object getValueRetain(RegionEntryContext context) { |
| @Retained |
| Object result = getValueRetain(context, true); |
| if (Token.isRemoved(result)) { |
| return null; |
| } else { |
| setRecentlyUsed(context); |
| return result; |
| } |
| } |
| |
| @Override |
| @Released |
| public void setValue(RegionEntryContext context, @Unretained Object value) |
| throws RegionClearedException { |
| // TODO: This will mark new entries as being recently used |
| // It might be better to only mark them when they are modified. |
| // Or should we only mark them on reads? |
| setValue(context, value, true); |
| } |
| |
| @Override |
| public void setValue(RegionEntryContext context, Object value, EntryEventImpl event) |
| throws RegionClearedException { |
| setValue(context, value); |
| } |
| |
| @Released |
| protected void setValue(RegionEntryContext context, @Unretained Object value, |
| boolean recentlyUsed) { |
| _setValue(value); |
| releaseOffHeapRefIfRegionBeingClosedOrDestroyed(context, value); |
| if (recentlyUsed) { |
| setRecentlyUsed(context); |
| } |
| } |
| |
| void releaseOffHeapRefIfRegionBeingClosedOrDestroyed(RegionEntryContext context, Object ref) { |
| if (isOffHeapReference(ref) && isThisRegionBeingClosedOrDestroyed(context)) { |
| ((Releasable) this).release(); |
| } |
| } |
| |
| private boolean isThisRegionBeingClosedOrDestroyed(RegionEntryContext context) { |
| return context instanceof InternalRegion |
| && ((InternalRegion) context).isThisRegionBeingClosedOrDestroyed(); |
| } |
| |
| private boolean isOffHeapReference(Object ref) { |
| return ref != Token.REMOVED_PHASE1 && this instanceof OffHeapRegionEntry |
| && ref instanceof StoredObject && ((StoredObject) ref).hasRefCount(); |
| } |
| |
| /** |
| * This method determines if the value is in a compressed representation and decompresses it if it |
| * is. |
| * |
| * @param context the values context. |
| * @param value a region entry value. |
| * |
| * @return the decompressed form of the value parameter. |
| */ |
| static Object decompress(RegionEntryContext context, Object value) { |
| if (isCompressible(context, value)) { |
| long time = context.getCachePerfStats().startDecompression(); |
| value = EntryEventImpl.deserialize(context.getCompressor().decompress((byte[]) value)); |
| context.getCachePerfStats().endDecompression(time); |
| } |
| |
| return value; |
| } |
| |
| protected static Object compress(RegionEntryContext context, Object value) { |
| return compress(context, value, null); |
| } |
| |
| /** |
| * This method determines if the value is compressible and compresses it if it is. |
| * |
| * @param context the values context. |
| * @param value a region entry value. |
| * |
| * @return the compressed form of the value parameter. |
| */ |
| protected static Object compress(RegionEntryContext context, Object value, EntryEventImpl event) { |
| if (isCompressible(context, value)) { |
| long time = context.getCachePerfStats().startCompression(); |
| byte[] serializedValue; |
| if (event != null && event.getCachedSerializedNewValue() != null) { |
| serializedValue = event.getCachedSerializedNewValue(); |
| if (value instanceof CachedDeserializable) { |
| CachedDeserializable cd = (CachedDeserializable) value; |
| if (!(cd.getValue() instanceof byte[])) { |
| // The cd now has the object form so use the cached serialized form in a new cd. |
| // This serialization is much cheaper than reserializing the object form. |
| serializedValue = EntryEventImpl |
| .serialize(CachedDeserializableFactory.create(serializedValue, context.getCache())); |
| } else { |
| serializedValue = EntryEventImpl.serialize(cd); |
| } |
| } |
| } else { |
| serializedValue = EntryEventImpl.serialize(value); |
| if (event != null && !(value instanceof byte[])) { |
| // See if we can cache the serialized new value in the event. |
| // If value is a byte[] then we don't cache it since it is not serialized. |
| if (value instanceof CachedDeserializable) { |
| // For a CacheDeserializable we want to only cache the wrapped value; |
| // not the serialized CacheDeserializable. |
| CachedDeserializable cd = (CachedDeserializable) value; |
| Object cdVal = cd.getValue(); |
| if (cdVal instanceof byte[]) { |
| event.setCachedSerializedNewValue((byte[]) cdVal); |
| } |
| } else { |
| event.setCachedSerializedNewValue(serializedValue); |
| } |
| } |
| } |
| value = context.getCompressor().compress(serializedValue); |
| context.getCachePerfStats().endCompression(time, serializedValue.length, |
| ((byte[]) value).length); |
| } |
| |
| return value; |
| } |
| |
| private static byte[] compressBytes(RegionEntryContext context, byte[] uncompressedBytes) { |
| byte[] result = uncompressedBytes; |
| if (isCompressible(context, uncompressedBytes)) { |
| long time = context.getCachePerfStats().startCompression(); |
| result = context.getCompressor().compress(uncompressedBytes); |
| context.getCachePerfStats().endCompression(time, uncompressedBytes.length, result.length); |
| } |
| return result; |
| } |
| |
| |
| @Override |
| public Object getValueInVM(RegionEntryContext context) { |
| ReferenceCountHelper.createReferenceCountOwner(); |
| @Released |
| Object v = getValueRetain(context, true); |
| |
| if (v == null) { |
| // should only be possible if disk entry |
| v = Token.NOT_AVAILABLE; |
| } |
| Object result = OffHeapHelper.copyAndReleaseIfNeeded(v, context.getCache()); |
| ReferenceCountHelper.setReferenceCountOwner(null); |
| return result; |
| } |
| |
| @Override |
| public Object getValueInVMOrDiskWithoutFaultIn(InternalRegion region) { |
| return getValueInVM(region); |
| } |
| |
| @Override |
| @Retained |
| public Object getValueOffHeapOrDiskWithoutFaultIn(InternalRegion region) { |
| @Retained |
| Object result = getValueRetain(region, true); |
| return result; |
| } |
| |
| @Override |
| public Object getValueOnDisk(InternalRegion region) throws EntryNotFoundException { |
| throw new IllegalStateException( |
| "Cannot get value on disk for a region that does not access the disk."); |
| } |
| |
| @Override |
| public Object getSerializedValueOnDisk(final InternalRegion region) |
| throws EntryNotFoundException { |
| throw new IllegalStateException( |
| "Cannot get value on disk for a region that does not access the disk."); |
| } |
| |
| @Override |
| public Object getValueOnDiskOrBuffer(InternalRegion region) throws EntryNotFoundException { |
| throw new IllegalStateException( |
| "Cannot get value on disk for a region that does not access the disk."); |
| // TODO: if value is Token.REMOVED || Token.DESTROYED throw EntryNotFoundException |
| } |
| |
| @Override |
| public boolean initialImagePut(final InternalRegion region, final long lastModified, |
| Object newValue, boolean wasRecovered, boolean acceptedVersionTag) |
| throws RegionClearedException { |
| // note that the caller has already write synced this RegionEntry |
| return initialImageInit(region, lastModified, newValue, this.isTombstone(), wasRecovered, |
| acceptedVersionTag); |
| } |
| |
| @Override |
| public boolean initialImageInit(final InternalRegion region, final long lastModified, |
| final Object newValue, final boolean create, final boolean wasRecovered, |
| final boolean acceptedVersionTag) throws RegionClearedException { |
| |
| // note that the caller has already write synced this RegionEntry |
| boolean result = false; |
| |
| // if it has been destroyed then don't do anything |
| Token vTok = getValueAsToken(); |
| if (acceptedVersionTag || create || (vTok != Token.DESTROYED || vTok != Token.TOMBSTONE)) { |
| // OFFHEAP noop |
| Object newValueToWrite = newValue; |
| // OFFHEAP noop |
| boolean putValue = acceptedVersionTag || create || (newValueToWrite != Token.LOCAL_INVALID |
| && (wasRecovered || (vTok == Token.LOCAL_INVALID))); |
| |
| if (region.isUsedForPartitionedRegionAdmin() |
| && newValueToWrite instanceof CachedDeserializable) { |
| // Special case for partitioned region meta data |
| // We do not need the RegionEntry on this case. |
| // Because the pr meta data region will not have an LRU. |
| newValueToWrite = |
| ((CachedDeserializable) newValueToWrite).getDeserializedValue(region, null); |
| if (!create && newValueToWrite instanceof Versionable) { |
| // Heap value should always be deserialized at this point // OFFHEAP will not be |
| // deserialized |
| final Object oldValue = getValueInVM(region); |
| // BUGFIX for 35029. If oldValue is null the newValue should be put. |
| if (oldValue == null) { |
| putValue = true; |
| } else if (oldValue instanceof Versionable) { |
| Versionable nv = (Versionable) newValueToWrite; |
| Versionable ov = (Versionable) oldValue; |
| putValue = nv.isNewerThan(ov); |
| } |
| } |
| } |
| |
| if (putValue) { |
| // change to INVALID if region itself has been invalidated, |
| // and current value is recovered |
| if (create || acceptedVersionTag) { |
| // At this point, since we now always recover from disk first, |
| // we only care about "isCreate" since "isRecovered" is impossible |
| // if we had a regionInvalidate or regionClear |
| ImageState imageState = region.getImageState(); |
| // this method is called during loadSnapshot as well as getInitialImage |
| if (imageState.getRegionInvalidated()) { |
| if (newValueToWrite != Token.TOMBSTONE) { |
| newValueToWrite = Token.INVALID; |
| } |
| } else if (imageState.getClearRegionFlag()) { |
| boolean entryOK = false; |
| RegionVersionVector rvv = imageState.getClearRegionVersionVector(); |
| if (rvv != null) { // a filtered clear |
| VersionSource id = getVersionStamp().getMemberID(); |
| if (id == null) { |
| id = region.getVersionMember(); |
| } |
| if (!rvv.contains(id, getVersionStamp().getRegionVersion())) { |
| entryOK = true; |
| } |
| } |
| if (!entryOK) { |
| // If the region has been issued cleared during |
| // the GII , then those entries loaded before this one would have |
| // been cleared from the Map due to clear operation & for the |
| // currententry whose key may have escaped the clearance , will be |
| // cleansed by the destroy token. |
| newValueToWrite = Token.DESTROYED; // TODO: never used |
| imageState.addDestroyedEntry(this.getKey()); |
| throw new RegionClearedException( |
| "During the GII put of entry, the region got cleared so aborting the operation"); |
| } |
| } |
| } |
| setValue(region, this.prepareValueForCache(region, newValueToWrite, false)); |
| result = true; |
| |
| if (newValueToWrite != Token.TOMBSTONE) { |
| if (create) { |
| region.getCachePerfStats().incCreates(); |
| } |
| region.updateStatsForPut(this, lastModified, false); |
| } |
| |
| if (logger.isTraceEnabled()) { |
| if (newValueToWrite instanceof CachedDeserializable) { |
| logger.trace("ProcessChunk: region={}; put a CachedDeserializable ({},{})", |
| region.getFullPath(), getKey(), |
| ((CachedDeserializable) newValueToWrite).getStringForm()); |
| } else { |
| logger.trace("ProcessChunk: region={}; put({},{})", region.getFullPath(), getKey(), |
| StringUtils.forceToString(newValueToWrite)); |
| } |
| } |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * @throws EntryNotFoundException if expectedOldValue is not null and is not equal to current |
| * value |
| */ |
| @Override |
| @Released |
| public boolean destroy(InternalRegion region, EntryEventImpl event, boolean inTokenMode, |
| boolean cacheWrite, @Unretained Object expectedOldValue, boolean forceDestroy, |
| boolean removeRecoveredEntry) throws CacheWriterException, EntryNotFoundException, |
| TimeoutException, RegionClearedException { |
| |
| // A design decision was made to not retrieve the old value from the disk |
| // if the entry has been evicted to only have the CacheListener afterDestroy |
| // method ignore it. We don't want to pay the performance penalty. The |
| // getValueInVM method does not retrieve the value from disk if it has been |
| // evicted. Instead, it uses the NotAvailable token. |
| // |
| // If the region is a WAN queue region, the old value is actually used by the |
| // afterDestroy callback on a secondary. It is not needed on a primary. |
| // Since the destroy that sets WAN_QUEUE_TOKEN always originates on the primary |
| // we only pay attention to WAN_QUEUE_TOKEN if the event is originRemote. |
| // |
| // We also read old value from disk or buffer |
| // in the case where there is a non-null expectedOldValue |
| // see PartitionedRegion#remove(Object key, Object value) |
| ReferenceCountHelper.skipRefCountTracking(); |
| @Retained |
| @Released |
| Object curValue = getValueRetain(region, true); |
| ReferenceCountHelper.unskipRefCountTracking(); |
| boolean proceed; |
| try { |
| if (curValue == null) { |
| curValue = Token.NOT_AVAILABLE; |
| } |
| |
| if (curValue == Token.NOT_AVAILABLE) { |
| // In some cases we need to get the current value off of disk. |
| |
| // if the event is transmitted during GII and has an old value, it was |
| // the state of the transmitting cache's entry & should be used here |
| if (event.getCallbackArgument() != null |
| && event.getCallbackArgument().equals(RegionQueue.WAN_QUEUE_TOKEN) |
| && event.isOriginRemote()) { // check originRemote for bug 40508 |
| // curValue = getValue(region); can cause deadlock if GII is occurring |
| curValue = getValueOnDiskOrBuffer(region); |
| } else { |
| FilterProfile fp = region.getFilterProfile(); |
| if (fp != null && (fp.getCqCount() > 0 || expectedOldValue != null)) { |
| // curValue = getValue(region); can cause deadlock will fault in the value |
| // and will confuse LRU. |
| curValue = getValueOnDiskOrBuffer(region); |
| } |
| } |
| } |
| |
| if (expectedOldValue != null) { |
| if (!checkExpectedOldValue(expectedOldValue, curValue, region)) { |
| throw new EntryNotFoundException( |
| "The current value was not equal to expected value."); |
| } |
| } |
| |
| if (inTokenMode && event.hasOldValue()) { |
| proceed = true; |
| } else { |
| event.setOldValue(curValue, curValue instanceof GatewaySenderEventImpl); |
| proceed = region.getConcurrencyChecksEnabled() || removeRecoveredEntry || forceDestroy |
| || destroyShouldProceedBasedOnCurrentValue(curValue) |
| || (event.getOperation() == Operation.REMOVE && (curValue == null |
| || curValue == Token.LOCAL_INVALID || curValue == Token.INVALID)); |
| } |
| } finally { |
| OffHeapHelper.releaseWithNoTracking(curValue); |
| } |
| |
| if (proceed) { |
| // Generate the version tag if needed. This method should only be |
| // called if we are in fact going to destroy the entry, so it must be |
| // after the entry not found exception above. |
| if (!removeRecoveredEntry) { |
| region.generateAndSetVersionTag(event, this); |
| } |
| if (cacheWrite) { |
| region.cacheWriteBeforeDestroy(event, expectedOldValue); |
| if (event.getRegion().getServerProxy() != null) { |
| // server will return a version tag |
| // update version information (may throw ConcurrentCacheModificationException) |
| VersionStamp stamp = getVersionStamp(); |
| if (stamp != null) { |
| stamp.processVersionTag(event); |
| } |
| } |
| } |
| region.recordEvent(event); |
| // don't do index maintenance on a destroy if the value in the |
| // RegionEntry (the old value) is invalid |
| updateIndexOnDestroyOperation(region); |
| |
| boolean removeEntry = false; |
| VersionTag v = event.getVersionTag(); |
| if (region.getConcurrencyChecksEnabled() && !removeRecoveredEntry |
| && !event.isFromRILocalDestroy()) { |
| // bug #46780, don't retain tombstones for entries destroyed for register-interest |
| // Destroy will write a tombstone instead |
| if (v == null || !v.hasValidVersion()) { |
| // localDestroy and eviction and ops received with no version tag |
| // should create a tombstone using the existing version stamp, as should |
| // (bug #45245) responses from servers that do not have valid version information |
| VersionStamp stamp = this.getVersionStamp(); |
| if (stamp != null) { // proxy has no stamps |
| v = stamp.asVersionTag(); |
| event.setVersionTag(v); |
| } |
| } |
| removeEntry = v == null || !v.hasValidVersion(); |
| } else { |
| removeEntry = true; |
| } |
| |
| if (removeEntry) { |
| boolean isThisTombstone = isTombstone(); |
| if (inTokenMode && !event.getOperation().isEviction()) { |
| setValue(region, Token.DESTROYED); |
| } else { |
| removePhase1(region, false); |
| } |
| if (isThisTombstone) { |
| region.unscheduleTombstone(this); |
| } |
| } else { |
| makeTombstone(region, v); |
| } |
| |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| protected void updateIndexOnDestroyOperation(InternalRegion region) { |
| if (!isTombstone() && !region.isProxy() && !isInvalid()) { |
| IndexManager indexManager = region.getIndexManager(); |
| if (indexManager != null) { |
| try { |
| if (isValueNull()) { |
| @Released |
| Object value = getValueOffHeapOrDiskWithoutFaultIn(region); |
| try { |
| Object preparedValue = prepareValueForCache(region, value, false); |
| _setValue(preparedValue); |
| releaseOffHeapRefIfRegionBeingClosedOrDestroyed(region, preparedValue); |
| } finally { |
| OffHeapHelper.release(value); |
| } |
| } |
| indexManager.updateIndexes(this, IndexManager.REMOVE_ENTRY, IndexProtocol.OTHER_OP); |
| } catch (QueryException e) { |
| throw new IndexMaintenanceException(e); |
| } |
| } |
| } |
| } |
| |
| private static boolean destroyShouldProceedBasedOnCurrentValue(Object curValue) { |
| if (curValue == null) { |
| return false; |
| } |
| if (Token.isRemoved(curValue)) { |
| return false; |
| } |
| return true; |
| } |
| |
| public static boolean checkExpectedOldValue(@Unretained Object expectedOldValue, |
| @Unretained Object actualValue, InternalRegion region) { |
| |
| if (Token.isInvalid(expectedOldValue)) { |
| return actualValue == null || Token.isInvalid(actualValue); |
| } else { |
| boolean isCompressedOffHeap = |
| region.getAttributes().getOffHeap() && region.getAttributes().getCompressor() != null; |
| return ValueComparisonHelper |
| .checkEquals(expectedOldValue, actualValue, isCompressedOffHeap, region.getCache()); |
| } |
| } |
| |
| // Do not add any instance fields to this class. |
| // Instead add them to LeafRegionEntry.cpp |
| |
| public static class HashRegionEntryCreator |
| implements CustomEntryConcurrentHashMap.HashEntryCreator<Object, Object> { |
| |
| @Override |
| public HashEntry<Object, Object> newEntry(final Object key, final int hash, |
| final HashEntry<Object, Object> next, final Object value) { |
| final AbstractRegionEntry entry = (AbstractRegionEntry) value; |
| // if hash is already set then assert that the two should be same |
| final int entryHash = entry.getEntryHash(); |
| if (hash == 0 || entryHash != 0) { |
| if (entryHash != hash) { |
| Assert.fail("unexpected mismatch of hash, expected=" + hash + ", actual=" + entryHash |
| + " for " + entry); |
| } |
| } |
| entry.setEntryHash(hash); |
| entry.setNextEntry(next); |
| return entry; |
| } |
| |
| @Override |
| public int keyHashCode(final Object key, final boolean compareValues) { |
| return CustomEntryConcurrentHashMap.keyHash(key, compareValues); |
| } |
| } |
| |
| @Override |
| public abstract Object getKey(); |
| |
| private static boolean okToStoreOffHeap(Object v, AbstractRegionEntry e) { |
| if (v == null) |
| return false; |
| if (Token.isInvalidOrRemoved(v)) |
| return false; |
| if (v == Token.NOT_AVAILABLE) |
| return false; |
| if (v instanceof DiskEntry.RecoveredEntry) |
| return false; // The disk layer has special logic that ends up storing the nested value in the |
| // RecoveredEntry off heap |
| if (!(e instanceof OffHeapRegionEntry)) |
| return false; |
| // TODO should we check for deltas here or is that a user error? |
| return true; |
| } |
| |
| /** |
| * Default implementation. Override in subclasses with primitive keys to prevent creating an |
| * Object form of the key for each equality check. |
| */ |
| @Override |
| public boolean isKeyEqual(Object k) { |
| return k.equals(getKey()); |
| } |
| |
| private static final long LAST_MODIFIED_MASK = 0x00FFFFFFFFFFFFFFL; |
| |
| protected void _setLastModified(long lastModifiedTime) { |
| if (lastModifiedTime < 0 || lastModifiedTime > LAST_MODIFIED_MASK) { |
| throw new IllegalStateException("Expected lastModifiedTime " + lastModifiedTime |
| + " to be >= 0 and <= " + LAST_MODIFIED_MASK); |
| } |
| long storedValue; |
| long newValue; |
| do { |
| storedValue = getLastModifiedField(); |
| newValue = storedValue & ~LAST_MODIFIED_MASK; |
| newValue |= lastModifiedTime; |
| } while (!compareAndSetLastModifiedField(storedValue, newValue)); |
| } |
| |
| protected abstract long getLastModifiedField(); |
| |
| protected abstract boolean compareAndSetLastModifiedField(long expectedValue, long newValue); |
| |
| @Override |
| public long getLastModified() { |
| return getLastModifiedField() & LAST_MODIFIED_MASK; |
| } |
| |
| protected boolean areAnyBitsSet(long bitMask) { |
| return (getLastModifiedField() & bitMask) != 0L; |
| } |
| |
| /** |
| * Any bits in "bitMask" that are 1 will be set. |
| */ |
| protected void setBits(long bitMask) { |
| boolean done; |
| do { |
| long bits = getLastModifiedField(); |
| long newBits = bits | bitMask; |
| if (bits == newBits) |
| return; |
| done = compareAndSetLastModifiedField(bits, newBits); |
| } while (!done); |
| } |
| |
| /** |
| * Any bits in "bitMask" that are 0 will be cleared. |
| */ |
| protected void clearBits(long bitMask) { |
| boolean done; |
| do { |
| long bits = getLastModifiedField(); |
| long newBits = bits & bitMask; |
| if (bits == newBits) |
| return; |
| done = compareAndSetLastModifiedField(bits, newBits); |
| } while (!done); |
| } |
| |
| @Override |
| @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) |
| public Object prepareValueForCache(RegionEntryContext r, |
| @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) Object val, boolean isEntryUpdate) { |
| return prepareValueForCache(r, val, null, isEntryUpdate); |
| } |
| |
| @Override |
| @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) |
| public Object prepareValueForCache(RegionEntryContext r, |
| @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) Object val, EntryEventImpl event, |
| boolean isEntryUpdate) { |
| if (r != null && r.getOffHeap() && okToStoreOffHeap(val, this)) { |
| if (val instanceof StoredObject) { |
| // Check to see if val has the same compression settings as this region. |
| // The recursive calls in this section are safe because |
| // we only do it after copy the off-heap value to the heap. |
| // This is needed to fix bug 52057. |
| StoredObject soVal = (StoredObject) val; |
| assert !soVal.isCompressed(); |
| if (r.getCompressor() != null) { |
| // val is uncompressed and we need a compressed value. |
| // So copy the off-heap value to the heap in a form that can be compressed. |
| byte[] valAsBytes = soVal.getValueAsHeapByteArray(); |
| Object heapValue; |
| if (soVal.isSerialized()) { |
| heapValue = CachedDeserializableFactory.create(valAsBytes, r.getCache()); |
| } else { |
| heapValue = valAsBytes; |
| } |
| return prepareValueForCache(r, heapValue, event, isEntryUpdate); |
| } |
| if (soVal.hasRefCount()) { |
| // if the reused StoredObject has a refcount then need to increment it |
| if (!soVal.retain()) { |
| throw new IllegalStateException("Could not use an off heap value because it was freed"); |
| } |
| } |
| // else it is has no refCount so just return it as prepared. |
| } else { |
| byte[] data; |
| boolean isSerialized = !(val instanceof byte[]); |
| if (isSerialized) { |
| if (event != null && event.getCachedSerializedNewValue() != null) { |
| data = event.getCachedSerializedNewValue(); |
| } else { |
| if (val instanceof CachedDeserializable) { |
| data = ((CachedDeserializable) val).getSerializedValue(); |
| } else if (val instanceof PdxInstance) { |
| try { |
| data = ((ConvertableToBytes) val).toBytes(); |
| } catch (IOException e) { |
| throw new PdxSerializationException("Could not convert " + val + " to bytes", e); |
| } |
| } else { |
| data = EntryEventImpl.serialize(val); |
| } |
| if (event != null) { |
| event.setCachedSerializedNewValue(data); |
| } |
| } |
| } else { |
| data = (byte[]) val; |
| } |
| byte[] compressedData = compressBytes(r, data); |
| // TODO: array comparison is broken |
| boolean isCompressed = compressedData != data; |
| ReferenceCountHelper.setReferenceCountOwner(this); |
| MemoryAllocator ma = MemoryAllocatorImpl.getAllocator(); // fix for bug 47875 |
| val = ma.allocateAndInitialize(compressedData, isSerialized, isCompressed, data); |
| ReferenceCountHelper.setReferenceCountOwner(null); |
| } |
| return val; |
| } |
| @Unretained |
| Object nv = val; |
| if (nv instanceof StoredObject) { |
| // This off heap value is being put into a on heap region. |
| byte[] data = ((StoredObject) nv).getSerializedValue(); |
| nv = CachedDeserializableFactory.create(data, r.getCache()); |
| } |
| if (nv instanceof PdxInstanceImpl) { |
| // We do not want to put PDXs in the cache as values. |
| // So get the serialized bytes and use a CachedDeserializable. |
| try { |
| byte[] data = ((ConvertableToBytes) nv).toBytes(); |
| byte[] compressedData = compressBytes(r, data); |
| // TODO: array comparison is broken |
| if (data == compressedData) { |
| nv = CachedDeserializableFactory.create(data, r.getCache()); |
| } else { |
| nv = compressedData; |
| } |
| } catch (IOException e) { |
| throw new PdxSerializationException("Could not convert " + nv + " to bytes", e); |
| } |
| } else { |
| nv = compress(r, nv, event); |
| } |
| return nv; |
| } |
| |
| @Override |
| @Unretained |
| public Object getValue() { |
| return getValueField(); |
| } |
| |
| @Override |
| public boolean isUpdateInProgress() { |
| return areAnyBitsSet(UPDATE_IN_PROGRESS); |
| } |
| |
| @Override |
| public void setUpdateInProgress(final boolean underUpdate) { |
| if (underUpdate) { |
| setBits(UPDATE_IN_PROGRESS); |
| } else { |
| clearBits(~UPDATE_IN_PROGRESS); |
| } |
| } |
| |
| @Override |
| public boolean isCacheListenerInvocationInProgress() { |
| return areAnyBitsSet(LISTENER_INVOCATION_IN_PROGRESS); |
| } |
| |
| @Override |
| public void setCacheListenerInvocationInProgress(final boolean isListenerInvoked) { |
| if (isListenerInvoked) { |
| setBits(LISTENER_INVOCATION_IN_PROGRESS); |
| } else { |
| clearBits(~LISTENER_INVOCATION_IN_PROGRESS); |
| } |
| } |
| |
| @Override |
| public synchronized boolean isInUseByTransaction() { |
| return areAnyBitsSet(IN_USE_BY_TX); |
| } |
| |
| private void setInUseByTransaction(final boolean v) { |
| if (v) { |
| setBits(IN_USE_BY_TX); |
| } else { |
| clearBits(~IN_USE_BY_TX); |
| } |
| } |
| |
| @Override |
| public synchronized void incRefCount() { |
| TXManagerImpl.incRefCount(this); |
| setInUseByTransaction(true); |
| } |
| |
| @Override |
| public synchronized void decRefCount(EvictionList evictionList, InternalRegion region) { |
| if (TXManagerImpl.decRefCount(this)) { |
| if (isInUseByTransaction()) { |
| setInUseByTransaction(false); |
| if (!isDestroyedOrRemoved()) { |
| appendToEvictionList(evictionList); |
| if (region != null && region.isEntryExpiryPossible()) { |
| region.addExpiryTaskIfAbsent(this); |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public synchronized void resetRefCount(EvictionList evictionList) { |
| if (isInUseByTransaction()) { |
| setInUseByTransaction(false); |
| appendToEvictionList(evictionList); |
| } |
| } |
| |
| protected void appendToEvictionList(EvictionList evictionList) { |
| // nothing |
| } |
| |
| void _setValue(Object val) { |
| setValueField(val); |
| } |
| |
| @Override |
| public Token getValueAsToken() { |
| Object v = getValueField(); |
| if (v == null || v instanceof Token) { |
| return (Token) v; |
| } else { |
| return Token.NOT_A_TOKEN; |
| } |
| } |
| |
| /** |
| * Reads the value of this region entry. Provides low level access to the value field. |
| * |
| * @return possible OFF_HEAP_OBJECT (caller uses region entry reference) |
| */ |
| @Unretained |
| protected abstract Object getValueField(); |
| |
| /** |
| * Set the value of this region entry. Provides low level access to the value field. |
| * |
| * @param v the new value to set |
| */ |
| protected abstract void setValueField(@Unretained Object v); |
| |
| @Override |
| @Retained |
| public Object getTransformedValue() { |
| return getValueRetain(null, false); |
| } |
| |
| @Override |
| public boolean getValueWasResultOfSearch() { |
| return areAnyBitsSet(VALUE_RESULT_OF_SEARCH); |
| } |
| |
| @Override |
| public void setValueResultOfSearch(boolean v) { |
| if (v) { |
| setBits(VALUE_RESULT_OF_SEARCH); |
| } else { |
| clearBits(~VALUE_RESULT_OF_SEARCH); |
| } |
| } |
| |
| public boolean hasValidVersion() { |
| VersionStamp stamp = (VersionStamp) this; |
| return stamp.getRegionVersion() != 0 || stamp.getEntryVersion() != 0; |
| } |
| |
| @Override |
| public boolean hasStats() { |
| // override this in implementations that have stats |
| return false; |
| } |
| |
| @Override |
| public Object getMapValue() { |
| return this; |
| } |
| |
| @Override |
| public void setMapValue(final Object newValue) { |
| if (this != newValue) { |
| Assert.fail("AbstractRegionEntry#setMapValue: unexpected setMapValue " + "with newValue=" |
| + newValue + ", this=" + this); |
| } |
| } |
| |
| protected abstract void setEntryHash(int v); |
| |
| @Override |
| public String toString() { |
| final StringBuilder sb = new StringBuilder(this.getClass().getSimpleName()).append('@') |
| .append(Integer.toHexString(System.identityHashCode(this))).append(" ("); |
| return appendFieldsToString(sb).append(')').toString(); |
| } |
| |
| protected StringBuilder appendFieldsToString(final StringBuilder sb) { |
| // OFFHEAP _getValue ok: the current toString on ObjectChunk is safe to use without incing |
| // refcount. |
| sb.append("key=").append(getKey()).append("; rawValue=").append(getValue()); |
| VersionStamp stamp = getVersionStamp(); |
| if (stamp != null) { |
| sb.append("; version=").append(stamp.asVersionTag()).append(";member=") |
| .append(stamp.getMemberID()); |
| } |
| return sb; |
| } |
| |
| /** |
| * This generates version tags for outgoing messages for all subclasses supporting concurrency |
| * versioning. It also sets the entry's version stamp to the tag's values. |
| */ |
| @Override |
| public VersionTag generateVersionTag(VersionSource member, boolean withDelta, |
| InternalRegion region, EntryEventImpl event) { |
| VersionStamp stamp = this.getVersionStamp(); |
| if (stamp != null && region.getServerProxy() == null) { |
| // clients do not generate versions |
| int v = stamp.getEntryVersion() + 1; |
| if (v > 0xFFFFFF) { |
| v -= 0x1000000; // roll-over |
| } |
| VersionSource previous = stamp.getMemberID(); |
| |
| // For non persistent regions, we allow the member to be null and |
| // when we send a message and the remote side can determine the member |
| // from the sender. For persistent regions, we need to send |
| // the persistent id to the remote side. |
| // |
| // TODO - RVV - optimize the way we send the persistent id to save |
| // space. |
| if (member == null) { |
| VersionSource regionMember = region.getVersionMember(); |
| if (regionMember instanceof DiskStoreID) { |
| member = regionMember; |
| } |
| } |
| |
| VersionTag tag = VersionTag.create(member); |
| tag.setEntryVersion(v); |
| if (region.getVersionVector() != null) { |
| // Use region version if already provided, else generate |
| long nextRegionVersion = event.getNextRegionVersion(); |
| if (nextRegionVersion != -1) { |
| // Set on the tag and record it locally |
| tag.setRegionVersion(nextRegionVersion); |
| RegionVersionVector rvv = region.getVersionVector(); |
| rvv.recordVersion(rvv.getOwnerId(), nextRegionVersion); |
| if (logger.isDebugEnabled()) { |
| logger.debug("recorded region version {}; region={}", nextRegionVersion, |
| region.getFullPath()); |
| } |
| } else { |
| tag.setRegionVersion(region.getVersionVector().getNextVersion()); |
| } |
| } |
| if (withDelta) { |
| tag.setPreviousMemberID(previous); |
| } |
| VersionTag remoteTag = event.getVersionTag(); |
| if (remoteTag != null && remoteTag.isGatewayTag()) { |
| // if this event was received from a gateway we use the remote system's |
| // timestamp and dsid. |
| tag.setVersionTimeStamp(remoteTag.getVersionTimeStamp()); |
| tag.setDistributedSystemId(remoteTag.getDistributedSystemId()); |
| tag.setAllowedByResolver(remoteTag.isAllowedByResolver()); |
| } else { |
| long time = region.cacheTimeMillis(); |
| int dsid = region.getDistributionManager().getDistributedSystemId(); |
| // a locally generated change should always have a later timestamp than |
| // one received from a wan gateway, so fake a timestamp if necessary |
| if (time <= stamp.getVersionTimeStamp() && dsid != tag.getDistributedSystemId()) { |
| time = stamp.getVersionTimeStamp() + 1; |
| } |
| tag.setVersionTimeStamp(time); |
| tag.setDistributedSystemId(dsid); |
| } |
| stamp.setVersions(tag); |
| stamp.setMemberID(member); |
| event.setVersionTag(tag); |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "generated tag {}; key={}; oldvalue={} newvalue={} client={} region={}; rvv={}", tag, |
| event.getKey(), event.getOldValueStringForm(), event.getNewValueStringForm(), |
| event.getContext() == null ? "none" |
| : event.getContext().getDistributedMember().getName(), |
| region.getFullPath(), region.getVersionVector()); |
| } |
| return tag; |
| } |
| return null; |
| } |
| |
| /** |
| * This performs a concurrency check. |
| * |
| * This check compares the version number first, followed by the member ID. |
| * |
| * Wraparound of the version number is detected and handled by extending the range of versions by |
| * one bit. |
| * |
| * The normal membership ID comparison method is used. |
| * <p> |
| * |
| * Note that a tag from a remote (WAN) system may be in the event. If this is the case this method |
| * will either invoke a user plugin that allows/disallows the event (and may modify the value) or |
| * it determines whether to allow or disallow the event based on timestamps and |
| * distributedSystemIDs. |
| * |
| * @throws ConcurrentCacheModificationException if the event conflicts with an event that has |
| * already been applied to the entry. |
| */ |
| public void processVersionTag(EntryEvent cacheEvent) { |
| processVersionTag(cacheEvent, true); |
| } |
| |
| protected void processVersionTag(EntryEvent cacheEvent, boolean conflictCheck) { |
| EntryEventImpl event = (EntryEventImpl) cacheEvent; |
| VersionTag tag = event.getVersionTag(); |
| if (tag == null) { |
| return; |
| } |
| |
| try { |
| if (tag.isGatewayTag()) { |
| // this may throw ConcurrentCacheModificationException or modify the event |
| if (processGatewayTag(cacheEvent)) { |
| return; |
| } |
| assert false : "processGatewayTag failure - returned false"; |
| } |
| |
| if (!tag.isFromOtherMember()) { |
| if (!event.getOperation().isNetSearch()) { |
| // except for netsearch, all locally-generated tags can be ignored |
| return; |
| } |
| } |
| |
| final InternalDistributedMember originator = |
| (InternalDistributedMember) event.getDistributedMember(); |
| final VersionSource dmId = event.getRegion().getVersionMember(); |
| InternalRegion r = event.getRegion(); |
| boolean eventHasDelta = event.getDeltaBytes() != null && event.getRawNewValue() == null; |
| |
| VersionStamp stamp = getVersionStamp(); |
| // bug #46223, an event received from a peer or a server may be from a different |
| // distributed system than the last modification made to this entry so we must |
| // perform a gateway conflict check |
| if (stamp != null && !tag.isAllowedByResolver()) { |
| int stampDsId = stamp.getDistributedSystemId(); |
| int tagDsId = tag.getDistributedSystemId(); |
| |
| if (stampDsId != 0 && stampDsId != tagDsId && stampDsId != -1) { |
| StringBuilder verbose = null; |
| if (logger.isTraceEnabled(LogMarker.TOMBSTONE_VERBOSE)) { |
| verbose = new StringBuilder(); |
| verbose.append("processing tag for key ").append(getKey()).append(", stamp=") |
| .append(stamp.asVersionTag()).append(", tag=").append(tag); |
| } |
| long stampTime = stamp.getVersionTimeStamp(); |
| long tagTime = tag.getVersionTimeStamp(); |
| if (stampTime > 0 && (tagTime > stampTime || (tagTime == stampTime |
| && tag.getDistributedSystemId() >= stamp.getDistributedSystemId()))) { |
| if (verbose != null) { |
| verbose.append(" - allowing event"); |
| logger.trace(LogMarker.TOMBSTONE_VERBOSE, verbose); |
| } |
| // Update the stamp with event's version information. |
| applyVersionTag(r, stamp, tag, originator); |
| return; |
| } |
| |
| if (stampTime > 0) { |
| if (verbose != null) { |
| verbose.append(" - disallowing event"); |
| logger.trace(LogMarker.TOMBSTONE_VERBOSE, verbose); |
| } |
| r.getCachePerfStats().incConflatedEventsCount(); |
| persistConflictingTag(r, tag); |
| throw new ConcurrentCacheModificationException("conflicting event detected"); |
| } |
| } |
| } |
| |
| if (r.getVersionVector() != null && r.getServerProxy() == null |
| && (r.getDataPolicy().withPersistence() || !r.getScope().isLocal())) { |
| // bug #45258 - perf degradation for local regions and RVV |
| VersionSource who = tag.getMemberID(); |
| if (who == null) { |
| who = originator; |
| } |
| r.getVersionVector().recordVersion(who, tag); |
| } |
| |
| assert !tag.isFromOtherMember() |
| || tag.getMemberID() != null : "remote tag is missing memberID"; |
| |
| // for a long time I had conflict checks turned off in clients when |
| // receiving a response from a server and applying it to the cache. This lowered |
| // the CPU cost of versioning but eventually had to be pulled for bug #45453 |
| |
| // events coming from servers while a local sync is held on the entry |
| // do not require a conflict check. Conflict checks were already |
| // performed on the server and here we just consume whatever was sent back. |
| // Event.isFromServer() returns true for client-update messages and |
| // for putAll/getAll, which do not hold syncs during the server operation. |
| |
| // for a very long time we had conflict checks turned off for PR buckets. |
| // Bug 45669 showed a primary dying in the middle of distribution. This caused |
| // one backup bucket to have a v2. The other bucket was promoted to primary and |
| // generated a conflicting v2. We need to do the check so that if this second |
| // v2 loses to the original one in the delta-GII operation that the original v2 |
| // will be the winner in both buckets. |
| |
| // The new value in event is not from GII, even it could be tombstone |
| basicProcessVersionTag(r, tag, false, eventHasDelta, dmId, originator, conflictCheck); |
| } catch (ConcurrentCacheModificationException ex) { |
| event.isConcurrencyConflict(true); |
| throw ex; |
| } |
| } |
| |
| protected void basicProcessVersionTag(InternalRegion region, VersionTag tag, |
| boolean isTombstoneFromGII, boolean deltaCheck, VersionSource dmId, |
| InternalDistributedMember sender, boolean checkForConflict) { |
| |
| if (tag != null) { |
| VersionStamp stamp = getVersionStamp(); |
| |
| StringBuilder verbose = null; |
| if (logger.isTraceEnabled(LogMarker.TOMBSTONE_VERBOSE)) { |
| VersionTag stampTag = stamp.asVersionTag(); |
| if (stampTag.hasValidVersion() && checkForConflict) { |
| // only be verbose here if there's a possibility we might reject the operation |
| verbose = new StringBuilder(); |
| verbose.append("processing tag for key ").append(getKey()).append(", stamp=") |
| .append(stamp.asVersionTag()).append(", tag=").append(tag) |
| .append(", checkForConflict=").append(checkForConflict); |
| } |
| } |
| |
| if (stamp == null) { |
| throw new IllegalStateException( |
| "message contained a version tag but this region has no version storage"); |
| } |
| |
| boolean apply = true; |
| |
| try { |
| if (checkForConflict) { |
| apply = checkForConflict(region, stamp, tag, isTombstoneFromGII, deltaCheck, dmId, sender, |
| verbose); |
| } |
| } catch (ConcurrentCacheModificationException e) { |
| // Even if we don't apply the operation we should always retain the |
| // highest timestamp in order for WAN conflict checks to work correctly |
| // because the operation may have been sent to other systems and been |
| // applied there |
| if (!tag.isGatewayTag() && stamp.getDistributedSystemId() == tag.getDistributedSystemId() |
| && tag.getVersionTimeStamp() > stamp.getVersionTimeStamp()) { |
| stamp.setVersionTimeStamp(tag.getVersionTimeStamp()); |
| tag.setTimeStampApplied(true); |
| if (verbose != null) { |
| verbose |
| .append("\nThough in conflict the tag timestamp was more recent and was recorded."); |
| } |
| } |
| throw e; |
| } finally { |
| if (verbose != null) { |
| logger.trace(LogMarker.TOMBSTONE_VERBOSE, verbose); |
| } |
| } |
| |
| if (apply) { |
| applyVersionTag(region, stamp, tag, sender); |
| } |
| } |
| } |
| |
| private void applyVersionTag(InternalRegion region, VersionStamp stamp, VersionTag tag, |
| InternalDistributedMember sender) { |
| VersionSource mbr = tag.getMemberID(); |
| if (mbr == null) { |
| mbr = sender; |
| } |
| mbr = region.getVersionVector().getCanonicalId(mbr); |
| tag.setMemberID(mbr); |
| stamp.setVersions(tag); |
| if (tag.hasPreviousMemberID()) { |
| if (tag.getPreviousMemberID() == null) { |
| tag.setPreviousMemberID(stamp.getMemberID()); |
| } else { |
| tag.setPreviousMemberID( |
| region.getVersionVector().getCanonicalId(tag.getPreviousMemberID())); |
| } |
| } |
| } |
| |
| /** perform conflict checking for a stamp/tag */ |
| private boolean checkForConflict(InternalRegion region, VersionStamp stamp, VersionTag tag, |
| boolean isTombstoneFromGII, boolean deltaCheck, VersionSource dmId, |
| InternalDistributedMember sender, StringBuilder verbose) { |
| |
| int stampVersion = stamp.getEntryVersion(); |
| int tagVersion = tag.getEntryVersion(); |
| |
| if (stamp.getVersionTimeStamp() != 0) { // new entries have no timestamp |
| // check for wrap-around on the version number |
| long difference = tagVersion - stampVersion; |
| if (0x10000 < difference || difference < -0x10000) { |
| if (verbose != null) { |
| verbose.append("\nversion rollover detected: tag=").append(tagVersion).append(" stamp=") |
| .append(stampVersion); |
| } |
| if (difference < 0) { |
| tagVersion += 0x1000000; |
| } else { |
| stampVersion += 0x1000000; |
| } |
| } |
| } |
| if (verbose != null) { |
| verbose.append("\nstamp=v").append(stampVersion).append(" tag=v").append(tagVersion); |
| } |
| |
| if (deltaCheck) { |
| checkForDeltaConflict(region, stampVersion, tagVersion, stamp, tag, dmId, sender, verbose); |
| } |
| |
| boolean throwex = false; |
| boolean apply = false; |
| if (stampVersion == 0 || stampVersion < tagVersion) { |
| if (verbose != null) { |
| verbose.append(" - applying change"); |
| } |
| apply = true; |
| } else if (stampVersion > tagVersion) { |
| if (overwritingOldTombstone(region, stamp, tag, verbose) |
| && tag.getVersionTimeStamp() > stamp.getVersionTimeStamp()) { |
| apply = true; |
| } else { |
| // check for an incoming expired tombstone from an initial image chunk. |
| if (tagVersion > 0 |
| && isExpiredTombstone(region, tag.getVersionTimeStamp(), isTombstoneFromGII) |
| && tag.getVersionTimeStamp() > stamp.getVersionTimeStamp()) { |
| // A special case to apply: when remote entry is expired tombstone, then let local vs |
| // remote with newer timestamp to win |
| if (verbose != null) { |
| verbose.append(" - applying change in Delta GII"); |
| } |
| apply = true; |
| } else { |
| if (verbose != null) { |
| verbose.append(" - disallowing"); |
| } |
| throwex = true; |
| } |
| } |
| } else { |
| if (overwritingOldTombstone(region, stamp, tag, verbose)) { |
| apply = true; |
| } else { |
| // compare member IDs |
| VersionSource stampID = stamp.getMemberID(); |
| if (stampID == null) { |
| stampID = dmId; |
| } |
| VersionSource tagID = tag.getMemberID(); |
| if (tagID == null) { |
| tagID = sender; |
| } |
| if (verbose != null) { |
| verbose.append("\ncomparing IDs"); |
| } |
| int compare = stampID.compareTo(tagID); |
| if (compare < 0) { |
| if (verbose != null) { |
| verbose.append(" - applying change"); |
| } |
| apply = true; |
| } else if (compare > 0) { |
| if (verbose != null) { |
| verbose.append(" - disallowing"); |
| } |
| throwex = true; |
| } else if (tag.isPosDup()) { |
| if (verbose != null) { |
| verbose.append(" - disallowing duplicate marked with posdup"); |
| } |
| throwex = true; |
| } else { |
| if (verbose != null) { |
| verbose.append(" - allowing duplicate"); |
| } |
| } |
| } |
| } |
| |
| if (!apply && throwex) { |
| region.getCachePerfStats().incConflatedEventsCount(); |
| persistConflictingTag(region, tag); |
| throw new ConcurrentCacheModificationException(); |
| } |
| |
| return apply; |
| } |
| |
| private boolean isExpiredTombstone(InternalRegion region, long timestamp, boolean isTombstone) { |
| return isTombstone |
| && timestamp + TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT <= region.cacheTimeMillis(); |
| } |
| |
| private boolean overwritingOldTombstone(InternalRegion region, VersionStamp stamp, VersionTag tag, |
| StringBuilder verbose) { |
| // Tombstone GC does not use locking to stop operations when old tombstones |
| // are being removed. Because of this we might get an operation that was applied |
| // in another VM that has just reaped a tombstone and is now using a reset |
| // entry version number. Because of this we check the timestamp on the current |
| // local entry and see if it is old enough to have expired. If this is the case |
| // we accept the change and allow the tag to be recorded |
| long stampTime = stamp.getVersionTimeStamp(); |
| if (isExpiredTombstone(region, stampTime, this.isTombstone())) { |
| // no local change since the tombstone would have timed out - accept the change |
| if (verbose != null) { |
| verbose.append(" - accepting because local timestamp is old"); |
| } |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| protected void persistConflictingTag(InternalRegion region, VersionTag tag) { |
| // only persist region needs to persist conflict tag |
| } |
| |
| /** |
| * for an event containing a delta we must check to see if the tag's previous member id is the |
| * stamp's member id and ensure that the version is only incremented by 1. Otherwise the delta is |
| * being applied to a value that does not match the source of the delta. |
| */ |
| private void checkForDeltaConflict(InternalRegion region, long stampVersion, long tagVersion, |
| VersionStamp stamp, VersionTag tag, VersionSource dmId, InternalDistributedMember sender, |
| StringBuilder verbose) { |
| |
| if (tagVersion != stampVersion + 1) { |
| if (verbose != null) { |
| verbose.append("\ndelta requires full value due to version mismatch"); |
| } |
| region.getCachePerfStats().incDeltaFailedUpdates(); |
| throw new InvalidDeltaException("delta cannot be applied due to version mismatch"); |
| |
| } else { |
| // make sure the tag was based on the value in this entry by checking the |
| // tag's previous-changer ID against this stamp's current ID |
| VersionSource stampID = stamp.getMemberID(); |
| if (stampID == null) { |
| stampID = dmId; |
| } |
| VersionSource tagID = tag.getPreviousMemberID(); |
| if (tagID == null) { |
| tagID = sender; |
| } |
| if (!tagID.equals(stampID)) { |
| if (verbose != null) { |
| verbose.append("\ndelta requires full value. tag.previous=").append(tagID) |
| .append(" but stamp.current=").append(stampID); |
| } |
| region.getCachePerfStats().incDeltaFailedUpdates(); |
| throw new InvalidDeltaException("delta cannot be applied due to version ID mismatch"); |
| } |
| } |
| } |
| |
| private boolean processGatewayTag(EntryEvent cacheEvent) { |
| // Gateway tags are installed in the server-side InternalRegion cache |
| // modification methods. They do not have version numbers or distributed |
| // member IDs. Instead they only have timestamps and distributed system IDs. |
| |
| // If there is a resolver plug-in, invoke it. Otherwise we use the timestamps and |
| // distributed system IDs to determine whether to allow the event to proceed. |
| |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| |
| if (this.isRemoved() && !this.isTombstone()) { |
| return true; // no conflict on a new entry |
| } |
| EntryEventImpl event = (EntryEventImpl) cacheEvent; |
| VersionTag tag = event.getVersionTag(); |
| long stampTime = getVersionStamp().getVersionTimeStamp(); |
| long tagTime = tag.getVersionTimeStamp(); |
| int stampDsid = getVersionStamp().getDistributedSystemId(); |
| int tagDsid = tag.getDistributedSystemId(); |
| if (isDebugEnabled) { |
| logger.debug( |
| "processing gateway version information for {}. Stamp dsid={} time={} Tag dsid={} time={}", |
| event.getKey(), stampDsid, stampTime, tagDsid, tagTime); |
| } |
| if (tagTime == VersionTag.ILLEGAL_VERSION_TIMESTAMP) { |
| return true; // no timestamp received from other system - just apply it |
| } |
| // According to GatewayConflictResolver's java doc, it will only be used on tag with different |
| // distributed system id than stamp's |
| if (stampDsid == -1) { |
| return true; |
| } else if (tagDsid == stampDsid) { |
| if (tagTime >= stampTime) { |
| return true; |
| } else { |
| throw new ConcurrentCacheModificationException("conflicting WAN event detected"); |
| } |
| } |
| GatewayConflictResolver resolver = event.getRegion().getCache().getGatewayConflictResolver(); |
| if (resolver != null) { |
| if (isDebugEnabled) { |
| logger.debug("invoking gateway conflict resolver"); |
| } |
| final boolean[] disallow = new boolean[1]; |
| final Object[] newValue = new Object[] {this}; |
| GatewayConflictHelper helper = new GatewayConflictHelper() { |
| @Override |
| public void disallowEvent() { |
| disallow[0] = true; |
| } |
| |
| @Override |
| public void changeEventValue(Object value) { |
| newValue[0] = value; |
| } |
| }; |
| |
| @Released |
| TimestampedEntryEventImpl timestampedEvent = (TimestampedEntryEventImpl) event |
| .getTimestampedEvent(tagDsid, stampDsid, tagTime, stampTime); |
| |
| // gateway conflict resolvers will usually want to see the old value |
| if (!timestampedEvent.hasOldValue() && isRemoved()) { |
| // OFFHEAP: since isRemoved I think getValue will never be stored off heap in this case |
| timestampedEvent.setOldValue(getValue(timestampedEvent.getRegion())); |
| } |
| |
| Throwable thr = null; |
| try { |
| resolver.onEvent(timestampedEvent, helper); |
| } catch (CancelException cancelled) { |
| throw cancelled; |
| } catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| logger.error("Exception occurred in GatewayConflictResolver", t); |
| thr = t; |
| } finally { |
| timestampedEvent.release(); |
| } |
| |
| if (isDebugEnabled) { |
| logger.debug("done invoking resolver", thr); |
| } |
| if (thr == null) { |
| if (disallow[0]) { |
| if (isDebugEnabled) { |
| logger.debug("conflict resolver rejected the event for {}", event.getKey()); |
| } |
| throw new ConcurrentCacheModificationException( |
| "WAN conflict resolver rejected the operation"); |
| } |
| |
| tag.setAllowedByResolver(true); |
| |
| if (newValue[0] != this) { |
| if (isDebugEnabled) { |
| logger.debug("conflict resolver changed the value of the event for {}", event.getKey()); |
| } |
| // the resolver changed the event value! |
| event.setNewValue(newValue[0]); |
| } |
| // if nothing was done then we allow the event |
| if (isDebugEnabled) { |
| logger.debug("change was allowed by conflict resolver: {}", tag); |
| } |
| return true; |
| } |
| } |
| if (isDebugEnabled) { |
| logger.debug("performing normal WAN conflict check"); |
| } |
| if (tagTime > stampTime || tagTime == stampTime && tagDsid >= stampDsid) { |
| if (isDebugEnabled) { |
| logger.debug("allowing event"); |
| } |
| return true; |
| } |
| if (isDebugEnabled) { |
| logger.debug("disallowing event for {}", event.getKey()); |
| } |
| throw new ConcurrentCacheModificationException("conflicting WAN event detected"); |
| } |
| |
| static boolean isCompressible(RegionEntryContext context, Object value) { |
| return value != null && context != null && context.getCompressor() != null |
| && !Token.isInvalidOrRemoved(value); |
| } |
| |
| /* subclasses supporting versions must override this */ |
| @Override |
| public VersionStamp getVersionStamp() { |
| return null; |
| } |
| |
| @Override |
| public boolean isValueNull() { |
| return null == getValueAsToken(); |
| } |
| |
| @Override |
| public boolean isInvalid() { |
| return Token.isInvalid(getValueAsToken()); |
| } |
| |
| @Override |
| public boolean isDestroyed() { |
| return Token.isDestroyed(getValueAsToken()); |
| } |
| |
| @Override |
| public void setValueToNull() { |
| _setValue(null); |
| } |
| |
| @Override |
| public boolean isInvalidOrRemoved() { |
| return Token.isInvalidOrRemoved(getValueAsToken()); |
| } |
| |
| /** |
| * This is only retained in off-heap subclasses. However, it's marked as Retained here so that |
| * callers are aware that the value may be retained. |
| */ |
| @Override |
| @Retained |
| public Object getValueRetain(RegionEntryContext context, boolean decompress) { |
| if (decompress) { |
| return decompress(context, getValue()); |
| } else { |
| return getValue(); |
| } |
| } |
| |
| @Override |
| public void returnToPool() { |
| // noop by default |
| } |
| |
| @Override |
| public boolean isEvicted() { |
| return false; |
| } |
| } |