/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.DiskAccessException;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.ByteArrayDataInput;
import com.gemstone.gemfire.internal.HeapDataOutputStream;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.ValueWrapper;
import com.gemstone.gemfire.internal.cache.DiskStoreImpl.AsyncDiskEntry;
import com.gemstone.gemfire.internal.cache.lru.EnableLRU;
import com.gemstone.gemfire.internal.cache.lru.LRUClockNode;
import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
import com.gemstone.gemfire.internal.cache.persistence.BytesAndBits;
import com.gemstone.gemfire.internal.cache.persistence.DiskRecoveryStore;
import com.gemstone.gemfire.internal.cache.persistence.DiskRegionView;
import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
import com.gemstone.gemfire.internal.offheap.Releasable;
import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
import com.gemstone.gemfire.internal.offheap.UnsafeMemoryChunk;
import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.Chunk;
import com.gemstone.gemfire.internal.offheap.StoredObject;
import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
import com.gemstone.gemfire.internal.util.BlobHelper;
/**
* Represents an entry in a {@link RegionMap} whose value may be
* stored on disk. This interface provides accessor and mutator
* methods for a disk entry's state. This allows us to abstract all
* of the interesting behavior into a {@linkplain DiskEntry.Helper
* helper class} that we only need to implement once.
*
* <P>
*
* Each <code>DiskEntry</code> has a unique <code>id</code> that is
* used by the {@link DiskRegion} to identify the key/value pair.
* Before the disk entry is written to disk, the value of the
* <code>id</code> is {@link DiskRegion#INVALID_ID invalid}. Once the
* object has been written to disk, the <code>id</code> is a positive
* number. If the value is {@linkplain Helper#update updated}, then the
* <code>id</code> is negated to signify that the value on disk is
* dirty.
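*
* <p>
* An illustrative (non-normative) sketch of that lifecycle:
* <pre>
* id == DiskRegion.INVALID_ID   // entry not yet written to disk
* id &gt; 0                        // value has been written to disk
* id &lt; 0                        // value updated; copy on disk is dirty
* </pre>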
*
* @see DiskRegion
*
* @author David Whitlock
*
* @since 3.2
*/
public interface DiskEntry extends RegionEntry {
/**
* Sets the value with a {@link RegionEntryContext}.
* @param context the value's context.
* @param value an entry value.
*/
public void setValueWithContext(RegionEntryContext context, Object value);
/**
* In some cases we need to do something just before we drop the value
* from a DiskEntry that is being moved (i.e. overflowed) to disk.
* @param context the entry's region context
*/
public void handleValueOverflow(RegionEntryContext context);
/**
* In some cases we need to do something just after we unset the value
* from a DiskEntry that has been moved (i.e. overflowed) to disk.
* @param context the entry's region context
*/
public void afterValueOverflow(RegionEntryContext context);
/**
* Returns true if the DiskEntry value is equal to {@link Token#DESTROYED}, {@link Token#REMOVED_PHASE1}, or {@link Token#REMOVED_PHASE2}.
*/
public boolean isRemovedFromDisk();
/**
* Returns the id of this <code>DiskEntry</code>
*/
public DiskId getDiskId();
public void _removePhase1();
public int updateAsyncEntrySize(EnableLRU capacityController);
public DiskEntry getPrev();
public DiskEntry getNext();
public void setPrev(DiskEntry v);
public void setNext(DiskEntry v);
/**
* Used as the entry value if it was invalidated.
*/
public static final byte[] INVALID_BYTES = new byte[0];
/**
* Used as the entry value if it was locally invalidated.
*/
public static final byte[] LOCAL_INVALID_BYTES = new byte[0];
/**
* Used as the entry value if it was a tombstone.
*/
public static final byte[] TOMBSTONE_BYTES = new byte[0];
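// Note: all three token values above are empty byte arrays; what
// distinguishes them on disk is the user bits (see EntryBits and
// Helper.ByteArrayValueWrapper.getUserBits), not the bytes themselves.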
/////////////////////// Inner Classes //////////////////////
/**
* A Helper class for performing functions common to all
* <code>DiskEntry</code>s.
*/
public static class Helper {
private static final Logger logger = LogService.getLogger();
/**
* For testing purposes only.
* Gets the value of an entry that is on disk without faulting
* it in and without looking in the I/O buffer.
* @since 3.2.1
*/
static Object getValueOnDisk(DiskEntry entry, DiskRegion dr) {
DiskId id = entry.getDiskId();
if (id == null) {
return null;
}
dr.acquireReadLock();
try {
synchronized (id) {
if (id == null
|| (dr.isBackup() && id.getKeyId() == DiskRegion.INVALID_ID)
|| (!entry.isValueNull() && id.needsToBeWritten() && !EntryBits.isRecoveredFromDisk(id.getUserBits()))/*fix for bug 41942*/) {
return null;
}
return dr.getNoBuffer(id);
}
} finally {
dr.releaseReadLock();
}
}
/**
* Get the serialized value directly from disk. Returned object may be
* a {@link CachedDeserializable}. Goes straight to disk without faulting
* into memory. Only looks at the disk storage, not at heap storage.
* @param entry the entry used to identify the value to fetch
* @param dr the persistent storage from which to fetch the value
* @return either null, a byte array, or a CachedDeserializable
* @since gemfire57_hotfix
*/
public static Object getSerializedValueOnDisk(
DiskEntry entry, DiskRegion dr) {
DiskId did = entry.getDiskId();
if (did == null) {
return null;
}
dr.acquireReadLock();
try {
synchronized (did) {
if (did == null
|| (dr.isBackup() && did.getKeyId() == DiskRegion.INVALID_ID)) {
return null;
} else if (!entry.isValueNull() && did.needsToBeWritten() && !EntryBits.isRecoveredFromDisk(did.getUserBits())/*fix for bug 41942*/) {
return null;
}
return dr.getSerializedData(did);
}
} finally {
dr.releaseReadLock();
}
}
/**
* Gets the value of an entry that is on disk without faulting it in.
* It also checks for the value's presence in the async buffer.
* This method is used for concurrent map operations, SQLFabric, and CQ processing.
*
* @throws DiskAccessException
* @since 5.1
*/
static Object getValueOnDiskOrBuffer(DiskEntry entry, DiskRegion dr, RegionEntryContext context) {
@Released Object v = getOffHeapValueOnDiskOrBuffer(entry, dr, context);
if (v instanceof CachedDeserializable) {
if (v instanceof Chunk) {
@Released Chunk ohv = (Chunk) v;
try {
v = ohv.getDeserializedValue(null, null);
if (v == ohv) {
throw new IllegalStateException("sqlf tried to use getValueOnDiskOrBuffer");
}
} finally {
ohv.release(); // OFFHEAP the offheap ref is decremented here
}
} else {
v = ((CachedDeserializable)v).getDeserializedValue(null, null);
}
}
return v;
}
@Retained
static Object getOffHeapValueOnDiskOrBuffer(DiskEntry entry, DiskRegion dr, RegionEntryContext context) {
DiskId did = entry.getDiskId();
Object syncObj = did;
if (syncObj == null) {
syncObj = entry;
}
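// Locking pattern used throughout this class: when the entry has a
// DiskId we synchronize on it and hold the DiskRegion read lock while
// doing so; when there is no DiskId there is no disk state to protect,
// so synchronizing on the entry itself suffices.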
if (syncObj == did) {
dr.acquireReadLock();
}
try {
synchronized (syncObj) {
if (did != null && did.isPendingAsync()) {
@Retained Object v = entry._getValueRetain(context, true); // TODO:KIRK:OK Rusty had Object v = entry.getValueWithContext(context);
if (Token.isRemovedFromDisk(v)) {
v = null;
}
return v;
}
if (did == null
|| ( dr.isBackup() && did.getKeyId() == DiskRegion.INVALID_ID)
|| (!entry.isValueNull() && did.needsToBeWritten() && !EntryBits.isRecoveredFromDisk(did.getUserBits()))/*fix for bug 41942*/) {
return null;
}
return dr.getSerializedData(did);
}
} finally {
if (syncObj == did) {
dr.releaseReadLock();
}
}
}
/**
* Returns false if the entry is INVALID (or LOCAL_INVALID). Determines this
* without faulting in the value from disk.
*
* @since 3.2.1
*/
/* TODO prpersist - Do we need this method? It was added by the sqlf merge
static boolean isValid(DiskEntry entry, DiskRegion dr) {
synchronized (entry) {
if (entry.isRecovered()) {
// We have a recovered entry whose value is still on disk.
// So take a peek at it without faulting it in.
//long id = entry.getDiskId().getKeyId();
//entry.getDiskId().setKeyId(-id);
byte bits = dr.getBits(entry.getDiskId());
//TODO Asif:Check if resetting is needed
return !EntryBits.isInvalid(bits) && !EntryBits.isLocalInvalid(bits);
}
}
}*/
static boolean isOverflowedToDisk(DiskEntry de, DiskRegion dr, DistributedRegion.DiskPosition dp, RegionEntryContext context) {
DiskId did;
synchronized (de) {
did = de.getDiskId();
}
Object syncObj = did;
if (syncObj == null) {
syncObj = de;
}
if (syncObj == did) {
dr.acquireReadLock();
}
try {
synchronized (syncObj) {
if (de.isValueNull()) {
if (did == null) {
synchronized (de) {
did = de.getDiskId();
}
assert did != null;
return isOverflowedToDisk(de, dr, dp, context);
} else {
dp.setPosition(did.getOplogId(), did.getOffsetInOplog());
return true;
}
} else {
return false;
}
}
} finally {
if (syncObj == did) {
dr.releaseReadLock();
}
}
}
/**
* Fills in the value of the given initial-image entry from this disk
* entry, reading it from disk if necessary without faulting it in.
* @since 3.2.1
*/
static boolean fillInValue(DiskEntry de, InitialImageOperation.Entry entry,
DiskRegion dr, DM mgr, ByteArrayDataInput in, RegionEntryContext context) {
@Retained @Released Object v = null;
DiskId did;
synchronized (de) {
did = de.getDiskId();
}
Object syncObj = did;
if (syncObj == null) {
syncObj = de;
}
if (syncObj == did) {
dr.acquireReadLock();
}
try {
synchronized (syncObj) {
entry.setLastModified(mgr, de.getLastModified());
SimpleMemoryAllocatorImpl.setReferenceCountOwner(entry);
v = de._getValueRetain(context, true); // OFFHEAP copied to heap entry; todo allow entry to refer to offheap since it will be copied to network.
SimpleMemoryAllocatorImpl.setReferenceCountOwner(null);
if (v == null) {
if (did == null) {
// fix for bug 41449
synchronized (de) {
did = de.getDiskId();
}
assert did != null;
// do recursive call to get readLock on did
return fillInValue(de, entry, dr, mgr, in, context);
}
if (logger.isDebugEnabled()) {
logger.debug("DiskEntry.Helper.fillInValue, key={}; getting value from disk, disk id={}", entry.key, did);
}
BytesAndBits bb = null;
try {
bb = dr.getBytesAndBits(did, false);
} catch (DiskAccessException dae) {
return false;
}
if (EntryBits.isInvalid(bb.getBits())) {
entry.setInvalid();
}
else if (EntryBits.isLocalInvalid(bb.getBits())) {
entry.setLocalInvalid();
}
else if (EntryBits.isTombstone(bb.getBits())) {
entry.setTombstone();
}
else {
entry.value = bb.getBytes();
entry.setSerialized(EntryBits.isSerialized(bb.getBits()));
}
return true;
}
}
} finally {
if (syncObj == did) {
dr.releaseReadLock();
}
}
final boolean isEagerDeserialize = entry.isEagerDeserialize();
if (isEagerDeserialize) {
entry.clearEagerDeserialize();
}
if (Token.isRemovedFromDisk(v)) {
// fix for bug 31757
return false;
} else if (v instanceof CachedDeserializable) {
try {
if (v instanceof StoredObject && !((StoredObject) v).isSerialized()) {
entry.setSerialized(false);
entry.value = ((StoredObject) v).getDeserializedForReading();
//For SQLFire we prefer eager deserialized
// if(v instanceof ByteSource) {
// entry.setEagerDeserialize();
// }
} else {
// don't serialize here if it is not already serialized
Object tmp = ((CachedDeserializable)v).getValue();
//For SQLFire we prefer eager deserialized
// if(v instanceof ByteSource) {
// entry.setEagerDeserialize();
// }
if (tmp instanceof byte[]) {
byte[] bb = (byte[])tmp;
entry.value = bb;
entry.setSerialized(true);
}
else if (isEagerDeserialize && tmp instanceof byte[][]) {
// optimize for byte[][] since it will need to be eagerly deserialized
// for SQLFabric
entry.value = tmp;
entry.setEagerDeserialize();
entry.setSerialized(true);
}
else {
try {
HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
BlobHelper.serializeTo(tmp, hdos);
hdos.trim();
entry.value = hdos;
entry.setSerialized(true);
} catch (IOException e) {
RuntimeException e2 = new IllegalArgumentException(LocalizedStrings.DiskEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING.toLocalizedString());
e2.initCause(e);
throw e2;
}
}
}
} finally {
// If v == entry.value then v is assumed to be an OffHeapByteSource
// and release() will be called on v after the bytes have been read from
// off-heap.
if (v != entry.value) {
OffHeapHelper.releaseWithNoTracking(v);
}
}
}
else if (v instanceof byte[]) {
entry.value = v;
entry.setSerialized(false);
}
else if (isEagerDeserialize && v instanceof byte[][]) {
// optimize for byte[][] since it will need to be eagerly deserialized
// for SQLFabric
entry.value = v;
entry.setEagerDeserialize();
}
else if (v == Token.INVALID) {
entry.setInvalid();
}
else if (v == Token.LOCAL_INVALID) {
// fix for bug 31107
entry.setLocalInvalid();
} else if (v == Token.TOMBSTONE) {
entry.setTombstone();
}
else {
Object preparedValue = v;
if (preparedValue != null) {
preparedValue = AbstractRegionEntry.prepareValueForGII(preparedValue);
if (preparedValue == null) {
return false;
}
}
if (CachedDeserializableFactory.preferObject()) {
entry.value = preparedValue;
entry.setEagerDeserialize();
}
else {
try {
HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
BlobHelper.serializeTo(preparedValue, hdos);
hdos.trim();
entry.value = hdos;
entry.setSerialized(true);
} catch (IOException e) {
RuntimeException e2 = new IllegalArgumentException(LocalizedStrings.DiskEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING.toLocalizedString());
e2.initCause(e);
throw e2;
}
}
}
return true;
}
/**
* Used to initialize a new disk entry
*/
static void initialize(DiskEntry entry, DiskRecoveryStore r, Object newValue) {
DiskRegionView drv = null;
if (r instanceof LocalRegion) {
drv = ((LocalRegion)r).getDiskRegion();
} else if (r instanceof DiskRegionView) {
drv = (DiskRegionView)r;
}
if (drv == null) {
throw new IllegalArgumentException(LocalizedStrings.DiskEntry_DISK_REGION_IS_NULL.toLocalizedString());
}
if (newValue == null || Token.isRemovedFromDisk(newValue)) {
// it is not in vm and it is not on disk
DiskId did = entry.getDiskId();
if (did != null) {
did.setKeyId(DiskRegion.INVALID_ID);
}
}
else if (newValue instanceof RecoveredEntry) {
// Set the id directly, the value will also be set if RECOVER_VALUES
RecoveredEntry re = (RecoveredEntry)newValue;
DiskId did = entry.getDiskId();
did.setOplogId(re.getOplogId());
did.setOffsetInOplog(re.getOffsetInOplog());
did.setKeyId(re.getRecoveredKeyId());
did.setUserBits(re.getUserBits());
did.setValueLength(re.getValueLength());
if (re.getRecoveredKeyId() < 0) {
drv.incNumOverflowOnDisk(1L);
incrementBucketStats(r, 0/*InVM*/, 1/*OnDisk*/, did.getValueLength());
}
else {
entry.setValueWithContext(drv, entry.prepareValueForCache((RegionEntryContext) r,
re.getValue(), false));
drv.incNumEntriesInVM(1L);
incrementBucketStats(r, 1/*InVM*/, 0/*OnDisk*/, 0);
}
}
else {
DiskId did = entry.getDiskId();
if (did != null) {
did.setKeyId(DiskRegion.INVALID_ID);
}
drv.incNumEntriesInVM(1L);
incrementBucketStats(r, 1/*InVM*/, 0/*OnDisk*/, 0);
}
}
private static final ValueWrapper INVALID_VW = new ByteArrayValueWrapper(true, INVALID_BYTES);
private static final ValueWrapper LOCAL_INVALID_VW = new ByteArrayValueWrapper(true, LOCAL_INVALID_BYTES);
private static final ValueWrapper TOMBSTONE_VW = new ByteArrayValueWrapper(true, TOMBSTONE_BYTES);
public static interface ValueWrapper {
public boolean isSerialized();
public int getLength();
public byte getUserBits();
public void sendTo(ByteBuffer bb, Flushable flushable) throws IOException;
public String getBytesAsString();
}
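// Illustrative (hypothetical) consumer of a ValueWrapper, roughly what an
// oplog writer does: ask for the user bits, then stream the bytes into a
// ByteBuffer, letting sendTo() call flushable.flush() whenever the buffer
// fills:
//
// ValueWrapper vw = Helper.createValueWrapper(value, event);
// byte userBits = vw.getUserBits();
// vw.sendTo(writeBuf, flushable);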
public static interface Flushable {
public void flush() throws IOException;
public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException;
}
public static class ByteArrayValueWrapper implements ValueWrapper {
public final boolean isSerializedObject;
public final byte[] bytes;
public ByteArrayValueWrapper(boolean isSerializedObject, byte[] bytes) {
this.isSerializedObject = isSerializedObject;
this.bytes = bytes;
}
@Override
public boolean isSerialized() {
return this.isSerializedObject;
}
@Override
public int getLength() {
return (this.bytes != null) ? this.bytes.length : 0;
}
private boolean isInvalidToken() {
return this == INVALID_VW;
}
private boolean isLocalInvalidToken() {
return this == LOCAL_INVALID_VW;
}
private boolean isTombstoneToken() {
return this == TOMBSTONE_VW;
}
@Override
public byte getUserBits() {
byte userBits = 0x0;
if (isSerialized()) {
if (isTombstoneToken()) {
userBits = EntryBits.setTombstone(userBits, true);
} else if (isInvalidToken()) {
userBits = EntryBits.setInvalid(userBits, true);
} else if (isLocalInvalidToken()) {
userBits = EntryBits.setLocalInvalid(userBits, true);
} else {
if (this.bytes == null) {
throw new IllegalStateException("userBits==1 and value is null");
} else if (this.bytes.length == 0) {
throw new IllegalStateException("userBits==1 and value is zero length");
}
userBits = EntryBits.setSerialized(userBits, true);
}
}
return userBits;
}
@Override
public void sendTo(ByteBuffer bb, Flushable flushable) throws IOException {
int offset = 0;
final int maxOffset = getLength();
while (offset < maxOffset) {
int bytesThisTime = maxOffset - offset;
boolean needsFlush = false;
if (bytesThisTime > bb.remaining()) {
needsFlush = true;
bytesThisTime = bb.remaining();
}
bb.put(this.bytes, offset, bytesThisTime);
offset += bytesThisTime;
if (needsFlush) {
flushable.flush();
}
}
}
@Override
public String getBytesAsString() {
if (this.bytes == null) {
return "null";
}
StringBuffer sb = new StringBuffer();
int len = getLength();
for (int i = 0; i < len; i++) {
sb.append(this.bytes[i]).append(", ");
}
return sb.toString();
}
}
/**
* This class is a bit of a hack used by the compactor.
* The compactor always copies to a byte[], so
* this class is just a simple wrapper.
* It is possible that the length of the byte array is greater
* than the actual length of the wrapped data.
* By the time we create this we are done with isSerialized
* and userBits, so those methods are not supported.
*/
public static class CompactorValueWrapper extends ByteArrayValueWrapper {
private final int length;
public CompactorValueWrapper(byte[] bytes, int length) {
super(false, bytes);
this.length = length;
}
@Override
public boolean isSerialized() {
throw new UnsupportedOperationException();
}
@Override
public int getLength() {
return this.length;
}
@Override
public byte getUserBits() {
throw new UnsupportedOperationException();
}
}
/**
* Note that the Chunk this ValueWrapper is created with
* is unretained, so it must be used before the owner of
* the chunk releases it.
* Since the RegionEntry whose value we are writing to
* disk keeps the chunk retained, we are OK as long as this ValueWrapper's
* life ends before the sync on the RegionEntry is released.
* Note that this class is only used with uncompressed chunks.
*/
public static class ChunkValueWrapper implements ValueWrapper {
private final @Unretained Chunk chunk;
public ChunkValueWrapper(Chunk c) {
assert !c.isCompressed();
this.chunk = c;
}
@Override
public boolean isSerialized() {
return this.chunk.isSerialized();
}
@Override
public int getLength() {
return this.chunk.getDataSize();
}
@Override
public byte getUserBits() {
byte userBits = 0x0;
if (isSerialized()) {
userBits = EntryBits.setSerialized(userBits, true);
}
return userBits;
}
@Override
public void sendTo(ByteBuffer bb, Flushable flushable) throws IOException {
final int maxOffset = getLength();
if (maxOffset == 0) {
return;
}
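// If the value is larger than the destination buffer, try to hand the
// chunk's own direct ByteBuffer to the flusher so the bytes can be
// written without an extra copy.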
if (maxOffset > bb.capacity()) {
ByteBuffer chunkbb = this.chunk.createDirectByteBuffer();
if (chunkbb != null) {
flushable.flush(bb, chunkbb);
return;
}
}
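// The destination is a direct buffer: copy between native addresses
// with UnsafeMemoryChunk, flushing whenever the buffer fills.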
final long bbAddress = Chunk.getDirectByteBufferAddress(bb);
if (bbAddress != 0L) {
int bytesRemaining = maxOffset;
int availableSpace = bb.remaining();
long addrToWrite = bbAddress + bb.position();
long addrToRead = this.chunk.getAddressForReading(0, maxOffset);
if (bytesRemaining > availableSpace) {
do {
UnsafeMemoryChunk.copyMemory(addrToRead, addrToWrite, availableSpace);
bb.position(bb.position()+availableSpace);
addrToRead += availableSpace;
bytesRemaining -= availableSpace;
flushable.flush();
addrToWrite = bbAddress + bb.position();
availableSpace = bb.remaining();
} while (bytesRemaining > availableSpace);
}
UnsafeMemoryChunk.copyMemory(addrToRead, addrToWrite, bytesRemaining);
bb.position(bb.position()+bytesRemaining);
} else {
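// The destination is heap-backed (no native address): fall back to a
// byte-at-a-time copy from off-heap memory.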
long addr = this.chunk.getAddressForReading(0, maxOffset);
final long endAddr = addr + maxOffset;
while (addr != endAddr) {
bb.put(UnsafeMemoryChunk.readAbsoluteByte(addr));
addr++;
if (!bb.hasRemaining()) {
flushable.flush();
}
}
}
}
@Override
public String getBytesAsString() {
return this.chunk.getStringForm();
}
}
public static ValueWrapper createValueWrapper(Object value, EntryEventImpl event) {
if (value == Token.INVALID) {
// Even though it is not serialized we say it is, because a serialized
// value's bytes are never an empty array; an empty array plus the
// serialized flag therefore gives us a way to encode the invalid value
// with just a byte array and a boolean flag.
return INVALID_VW;
}
else if (value == Token.LOCAL_INVALID) {
// Even though it is not serialized we say it is, because a serialized
// value's bytes are never an empty array; an empty array plus the
// serialized flag therefore gives us a way to encode the local-invalid
// value with just a byte array and a boolean flag.
return LOCAL_INVALID_VW;
}
else if (value == Token.TOMBSTONE) {
return TOMBSTONE_VW;
}
else {
boolean isSerializedObject = true;
byte[] bytes;
if (value instanceof CachedDeserializable) {
CachedDeserializable proxy = (CachedDeserializable)value;
if (proxy instanceof Chunk) {
return new ChunkValueWrapper((Chunk) proxy);
}
if (proxy instanceof StoredObject) {
StoredObject ohproxy = (StoredObject) proxy;
isSerializedObject = ohproxy.isSerialized();
if (isSerializedObject) {
bytes = ohproxy.getSerializedValue();
} else {
bytes = (byte[]) ohproxy.getDeserializedForReading();
}
} else {
bytes = proxy.getSerializedValue();
}
if (event != null && isSerializedObject) {
event.setCachedSerializedNewValue(bytes);
}
}
else if (value instanceof byte[]) {
isSerializedObject = false;
bytes = (byte[])value;
}
else {
Assert.assertTrue(!Token.isRemovedFromDisk(value));
if (event != null && event.getCachedSerializedNewValue() != null) {
bytes = event.getCachedSerializedNewValue();
} else {
bytes = EntryEventImpl.serialize(value);
if (bytes.length == 0) {
throw new IllegalStateException("serializing <" + value + "> produced empty byte array");
}
if (event != null) {
event.setCachedSerializedNewValue(bytes);
}
}
}
return new ByteArrayValueWrapper(isSerializedObject, bytes);
}
}
public static ValueWrapper createValueWrapperFromEntry(DiskEntry entry, LocalRegion region, EntryEventImpl event) {
if (event != null) {
// For off-heap it should be faster to pass a reference to the
// StoredObject instead of using the cached byte[] (unless it is also compressed).
// Since NIO is used, if the chunk of memory is large we can write it
// to the file using the off-heap memory with no extra copying.
// So we give preference to getRawNewValue over getCachedSerializedNewValue.
Object rawValue = null;
if (!event.hasDelta()) {
// We don't do this for the delta case because getRawNewValue returns delta
// and we want to write the entire new value to disk.
rawValue = event.getRawNewValue();
if (rawValue instanceof Chunk) {
return new ChunkValueWrapper((Chunk) rawValue);
}
}
if (event.getCachedSerializedNewValue() != null) {
return new ByteArrayValueWrapper(true, event.getCachedSerializedNewValue());
}
if (rawValue != null) {
return createValueWrapper(rawValue, event);
}
}
// TODO OFFHEAP: No need to retain since we hold the sync on entry but we need a flavor of _getValue that will decompress
@Retained Object value = entry._getValueRetain(region, true);
try {
return createValueWrapper(value, event);
} finally {
OffHeapHelper.release(value);
}
}
private static void writeToDisk(DiskEntry entry, LocalRegion region, boolean async) throws RegionClearedException {
writeToDisk(entry, region, async, null);
}
/**
* Writes the key/value object stored in the given entry to disk
* @throws RegionClearedException
*
* @see DiskRegion#put
*/
private static void writeToDisk(DiskEntry entry, LocalRegion region, boolean async, EntryEventImpl event) throws RegionClearedException {
writeBytesToDisk(entry, region, async, createValueWrapperFromEntry(entry, region, event));
}
private static void writeBytesToDisk(DiskEntry entry, LocalRegion region, boolean async, ValueWrapper vw) throws RegionClearedException {
// @todo does the following unmark need to be called when an async
// write is scheduled or is it ok for doAsyncFlush to do it?
entry.getDiskId().unmarkForWriting();
region.getDiskRegion().put(entry, region, vw, async);
}
public static void update(DiskEntry entry, LocalRegion region, Object newValue) throws RegionClearedException {
update(entry, region, newValue, null);
}
/**
* Updates the value of the disk entry with a new value. This allows us to
* free up disk space in the non-backup case.
*
* @throws RegionClearedException
*/
public static void update(DiskEntry entry, LocalRegion region, Object newValue, EntryEventImpl event) throws RegionClearedException {
DiskRegion dr = region.getDiskRegion();
if (newValue == null) {
throw new NullPointerException(LocalizedStrings.DiskEntry_ENTRYS_VALUE_SHOULD_NOT_BE_NULL.toLocalizedString());
}
//If we have concurrency checks enabled for a persistent region, we need
//to add an entry to the async queue for every update to maintain the RVV
boolean maintainRVV = region.concurrencyChecksEnabled && dr.isBackup();
Token oldValue = null;
int oldValueLength = 0;
boolean scheduleAsync = false;
boolean callRemoveFromDisk = false;
DiskId did = entry.getDiskId();
VersionTag tag = null;
Object syncObj = did;
if (syncObj == null) {
syncObj = entry;
}
if (syncObj == did) {
dr.acquireReadLock();
}
try {
synchronized (syncObj) {
oldValue = entry.getValueAsToken();
if (Token.isRemovedFromDisk(newValue)) {
if (dr.isBackup()) {
dr.testIsRecoveredAndClear(did); // fixes bug 41409
}
RuntimeException rte = null;
try {
if (!Token.isRemovedFromDisk(oldValue)) {
// removeFromDisk takes care of oldValueLength
if (dr.isSync()) {
removeFromDisk(entry, region, false);
} else {
callRemoveFromDisk = true; // do it outside the sync
}
}
} catch (RuntimeException e) {
rte = e;
throw e;
}
finally {
if (rte != null && (rte instanceof CacheClosedException)) {
// 47616: do not set the value to removed-from-disk since it failed to persist
} else {
// Asif: Ensure that the value is correctly set despite a clear so
// that it can be distributed correctly
entry.setValueWithContext(region, newValue); // OFFHEAP newValue was already preparedForCache
}
}
}
else if (newValue instanceof RecoveredEntry) {
// Now that oplog creates are immediately put in cache
// a later oplog modify will get us here
RecoveredEntry re = (RecoveredEntry)newValue;
long oldKeyId = did.getKeyId();
long oldOplogId = did.getOplogId();
long newOplogId = re.getOplogId();
if (newOplogId != oldOplogId) {
did.setOplogId(newOplogId);
re.setOplogId(oldOplogId); // so caller knows oldoplog id
}
did.setOffsetInOplog(re.getOffsetInOplog());
// id already set
did.setUserBits(re.getUserBits());
oldValueLength = did.getValueLength();
did.setValueLength(re.getValueLength());
// The following undo and then do fixes bug 41849
// First, undo the stats done for the previous recovered value
if (oldKeyId < 0) {
dr.incNumOverflowOnDisk(-1L);
incrementBucketStats(region, 0/*InVM*/, -1/*OnDisk*/, -oldValueLength);
} else {
dr.incNumEntriesInVM(-1L);
incrementBucketStats(region, -1/*InVM*/, 0/*OnDisk*/, 0);
}
// Second, do the stats done for the current recovered value
if (re.getRecoveredKeyId() < 0) {
if (!entry.isValueNull()) {
try {
entry.handleValueOverflow(region);
entry.setValueWithContext(region, null); // fixes bug 41119
} finally {
entry.afterValueOverflow(region);
}
}
dr.incNumOverflowOnDisk(1L);
incrementBucketStats(region, 0/*InVM*/, 1/*OnDisk*/,
did.getValueLength());
} else {
entry.setValueWithContext(region, entry.prepareValueForCache(region, re.getValue(), false));
dr.incNumEntriesInVM(1L);
incrementBucketStats(region, 1/*InVM*/, 0/*OnDisk*/, 0);
}
}
else {
// The new value in the entry needs to be set only after the disk write
// has succeeded. Otherwise, for GemFireXD, it is possible that another
// thread may pick up this transient value from the region entry (which
// for off-heap will eventually be released) as an index key, given that
// this operation is bound to fail in case of a disk access exception.
//entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
if (dr.isBackup()) {
dr.testIsRecoveredAndClear(did); // fixes bug 41409
oldValueLength = getValueLength(did);
if (dr.isSync()) {
// In case of compression the value is set first because, at least
// for now, GemFireXD does not support compression. If and when it
// does, this needs to be taken care of or else we risk Bug 48965.
if (AbstractRegionEntry.isCompressible(dr, newValue)) {
entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
// newValue is prepared and compressed. We can't write compressed values to disk.
writeToDisk(entry, region, false, event);
} else {
writeBytesToDisk(entry, region, false, createValueWrapper(newValue, event));
entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
}
} else if (did.isPendingAsync() && !maintainRVV) {
entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
// nothing needs to be done except
// fixing up LRU stats
// @todo fixup LRU stats if needed
// I'm not sure anything needs to be done here.
// If we have overflow and it decided to evict this entry
// how do we handle that case when we are async?
// Seems like the eviction code needs to leave the value
// in memory until the pendingAsync is done.
} else {
//if the entry is not async, we need to schedule it
//for regions with concurrency checks enabled, we add an entry
//to the queue for every entry.
scheduleAsync = true;
did.setPendingAsync(true);
VersionStamp stamp = entry.getVersionStamp();
if(stamp != null) {
tag = stamp.asVersionTag();
}
entry.setValueWithContext(region, newValue);
}
} else if (did != null) {
entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
// Mark the id as needing to be written
// The disk remove that this section used to do caused bug 30961
// @todo this seems wrong. How does leaving it on disk fix the bug?
did.markForWriting();
//did.setValueSerializedSize(0);
}else {
entry.setValueWithContext(region, newValue);
}
if (Token.isRemovedFromDisk(oldValue)) {
// Note we now initialize entries removed and then set their
// value once we find no existing entry.
// So this is the normal path for a brand new entry.
dr.incNumEntriesInVM(1L);
incrementBucketStats(region, 1/*InVM*/, 0/*OnDisk*/, 0);
}
}
if (entry instanceof LRUEntry) {
LRUEntry le = (LRUEntry)entry;
boolean wasEvicted = le.testEvicted();
le.unsetEvicted();
if (!Token.isRemovedFromDisk(newValue)) {
if (oldValue == null
// added null check for bug 41759
|| wasEvicted && did != null && did.isPendingAsync()) {
// Note we do not append this entry because that will be
// done by lruEntryUpdate
dr.incNumEntriesInVM(1L);
dr.incNumOverflowOnDisk(-1L);
incrementBucketStats(region, 1/*InVM*/, -1/*OnDisk*/, -oldValueLength);
}
}
}
}
} finally {
if (syncObj == did) {
dr.releaseReadLock();
}
}
if (callRemoveFromDisk) {
removeFromDisk(entry, region, false, oldValue == null, false);
} else if (scheduleAsync && did.isPendingAsync()) {
// this needs to be done outside the above sync
scheduleAsyncWrite(new AsyncDiskEntry(region, entry, tag));
}
}
private static int getValueLength(DiskId did) {
int result = 0;
if (did != null) {
synchronized (did) {
result = did.getValueLength();
}
}
return result;
}
public static void updateRecoveredEntry(PlaceHolderDiskRegion drv,
DiskEntry entry,
RecoveredEntry newValue, RegionEntryContext context)
{
if (newValue == null) {
throw new NullPointerException(LocalizedStrings.DiskEntry_ENTRYS_VALUE_SHOULD_NOT_BE_NULL.toLocalizedString());
}
DiskId did = entry.getDiskId();
synchronized (did) {
boolean oldValueWasNull = entry.isValueNull();
// Now that oplog creates are immediately put in cache
// a later oplog modify will get us here
long oldOplogId = did.getOplogId();
long newOplogId = newValue.getOplogId();
if (newOplogId != oldOplogId) {
did.setOplogId(newOplogId);
newValue.setOplogId(oldOplogId); // so caller knows oldoplog id
}
did.setOffsetInOplog(newValue.getOffsetInOplog());
// id already set
did.setUserBits(newValue.getUserBits());
did.setValueLength(newValue.getValueLength());
if (newValue.getRecoveredKeyId() >= 0) {
entry.setValueWithContext(context, entry.prepareValueForCache(drv, newValue.getValue(),
false));
} else {
if (!oldValueWasNull) {
try {
entry.handleValueOverflow(context);
entry.setValueWithContext(context,null); // fixes bug 41119
} finally {
entry.afterValueOverflow(context);
}
}
}
if (entry instanceof LRUEntry) {
LRUEntry le = (LRUEntry)entry;
assert !le.testEvicted();
// we don't allow eviction during recovery
if (oldValueWasNull) {
// Note we do not append this entry because that will be
// done by lruEntryUpdate
drv.incNumEntriesInVM(1L);
drv.incNumOverflowOnDisk(-1L);
}
}
}
}
public static Object getValueInVMOrDiskWithoutFaultIn(DiskEntry entry, LocalRegion region) {
Object result = OffHeapHelper.copyAndReleaseIfNeeded(getValueOffHeapOrDiskWithoutFaultIn(entry, region));
if (result instanceof CachedDeserializable) {
result = ((CachedDeserializable)result).getDeserializedValue(null, null);
}
if (result instanceof StoredObject) {
((StoredObject) result).release();
throw new IllegalStateException("sqlf tried to use getValueInVMOrDiskWithoutFaultIn");
}
return result;
}
@Retained
public static Object getValueOffHeapOrDiskWithoutFaultIn(DiskEntry entry, LocalRegion region) {
@Retained Object v = entry._getValueRetain(region, true); // TODO:KIRK:OK Object v = entry.getValueWithContext(region);
if (v == null || Token.isRemovedFromDisk(v)
&& !region.isIndexCreationThread()) {
synchronized (entry) {
v = entry._getValueRetain(region, true); // TODO:KIRK:OK v = entry.getValueWithContext(region);
if (v == null) {
v = Helper.getOffHeapValueOnDiskOrBuffer(entry, region.getDiskRegion(),region);
}
}
}
if (Token.isRemovedFromDisk(v)) {
// fix for bug 31800
v = null;
// } else if (v instanceof ByteSource) {
// // If the ByteSource contains a Delta or ListOfDelta then we want to deserialize it
// Object deserVal = ((CachedDeserializable)v).getDeserializedForReading();
// if (deserVal != v) {
// OffHeapHelper.release(v);
// v = deserVal;
// }
}
return v;
}
/**
* Faults in the value of the given entry from disk, if necessary,
* and returns it.
* @param entry the disk entry whose value is needed
* @param region the region that owns the entry
* @return the entry's value
* @throws DiskAccessException if the value cannot be read from disk
*/
public static Object faultInValue(DiskEntry entry, LocalRegion region) {
return faultInValue(entry, region, false);
}
@Retained
public static Object faultInValueRetain(DiskEntry entry, LocalRegion region) {
return faultInValue(entry, region, true);
}
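// Illustrative (hypothetical) use of faultInValueRetain: the result may
// be a retained off-heap reference, so the caller must release it when
// done, for example:
//
// @Retained Object v = Helper.faultInValueRetain(entry, region);
// try {
// // ... use v ...
// } finally {
// OffHeapHelper.release(v);
// }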
/**
* @param retainResult if true then the result may be a retained off-heap reference
*/
@Retained
private static Object faultInValue(DiskEntry entry, LocalRegion region, boolean retainResult)
{
DiskRegion dr = region.getDiskRegion();
@Retained Object v = entry._getValueRetain(region, true); // TODO:KIRK:OK Object v = entry.getValueWithContext(region);
boolean lruFaultedIn = false;
boolean done = false;
try {
// Asif: If the entry is an instance of LRUEntry then DiskRegion cannot be null.
// Since SqlFabric accesses this method directly and passes the owning region,
// if the region happens to be a persistent PR type, the owning region passed is the PR,
// but it will have a null DiskRegion. SqlFabric takes care of passing the owning region
// as a BucketRegion in the case of an overflow-type entry. This is the fix for Bug #41804.
if ( entry instanceof LRUEntry && !dr.isSync() ) {
synchronized (entry) {
DiskId did = entry.getDiskId();
if (did != null && did.isPendingAsync()) {
done = true;
// See if it is pending async because of a faultOut.
// If so then if we are not a backup then we can unschedule the pending async.
// In either case we need to do the lruFaultIn logic.
boolean evicted = ((LRUEntry)entry).testEvicted();
if (evicted) {
if (!dr.isBackup()) {
// @todo do we also need a bit that tells us if it is in the async queue?
// Seems like we could end up adding it to the queue multiple times.
did.setPendingAsync(false);
}
// since it was evicted fix the stats here
dr.incNumEntriesInVM(1L);
dr.incNumOverflowOnDisk(-1L);
// no need to dec overflowBytesOnDisk because it was not inced in this case.
incrementBucketStats(region, 1/*InVM*/, -1/*OnDisk*/, 0);
}
lruEntryFaultIn((LRUEntry) entry, region);
lruFaultedIn = true;
}
}
}
if (!done
&& (v == null || Token.isRemovedFromDisk(v) && !region.isIndexCreationThread())) {
synchronized (entry) {
v = entry._getValueRetain(region, true); // TODO:KIRK:OK v = entry.getValueWithContext(region);
if (v == null) {
v = readValueFromDisk(entry, region);
if (entry instanceof LRUEntry) {
if (v != null && !Token.isInvalid(v)) {
lruEntryFaultIn((LRUEntry) entry, region);
lruFaultedIn = true;
}
}
}
}
}
} finally {
if (!retainResult) {
v = OffHeapHelper.copyAndReleaseIfNeeded(v);
// At this point v should be a heap object (or null)
}
}
if (Token.isRemoved(v)) {
// fix for bug 31800
v = null;
} else {
((RegionEntry)entry).setRecentlyUsed();
}
if (lruFaultedIn) {
lruUpdateCallback(region);
}
return v; // OFFHEAP: the value ends up being returned by RegionEntry.getValue
}
public static void recoverValue(DiskEntry entry, long oplogId, DiskRecoveryStore recoveryStore, ByteArrayDataInput in) {
boolean lruFaultedIn = false;
synchronized (entry) {
if (entry.isValueNull()) {
DiskId did = entry.getDiskId();
if (did != null) {
Object value = null;
DiskRecoveryStore region = recoveryStore;
DiskRegionView dr = region.getDiskRegionView();
dr.acquireReadLock();
try {
synchronized (did) {
// don't read if the oplog has changed.
if (oplogId == did.getOplogId()) {
value = getValueFromDisk(dr, did, in);
if (value != null) {
setValueOnFaultIn(value, did, entry, dr, region);
}
}
}
} finally {
dr.releaseReadLock();
}
if (entry instanceof LRUEntry) {
if (value != null && !Token.isInvalid(value)) {
lruEntryFaultIn((LRUEntry) entry, recoveryStore);
lruFaultedIn = true;
}
}
}
}
}
if (lruFaultedIn) {
lruUpdateCallback(recoveryStore);
}
}
/**
* Caller must have "did" synced.
*/
private static Object getValueFromDisk(DiskRegionView dr, DiskId did, ByteArrayDataInput in) {
Object value;
if (dr.isBackup() && did.getKeyId() == DiskRegion.INVALID_ID) {
// must have been destroyed
value = null;
} else {
if (did.isKeyIdNegative()) {
did.setKeyId(- did.getKeyId());
}
// if a bucket region then create a CachedDeserializable here instead of object
value = dr.getRaw(did); // fix bug 40192
if (value instanceof BytesAndBits) {
BytesAndBits bb = (BytesAndBits)value;
if (EntryBits.isInvalid(bb.getBits())) {
value = Token.INVALID;
} else if (EntryBits.isLocalInvalid(bb.getBits())) {
value = Token.LOCAL_INVALID;
} else if (EntryBits.isTombstone(bb.getBits())) {
value = Token.TOMBSTONE;
} else if (EntryBits.isSerialized(bb.getBits())) {
value = readSerializedValue(bb.getBytes(), bb.getVersion(), in, false);
} else {
value = readRawValue(bb.getBytes(), bb.getVersion(), in);
}
}
}
return value;
}
private static void lruUpdateCallback(DiskRecoveryStore recoveryStore) {
/*
* If LIFO eviction is enabled, skip lruUpdateCallback() and call
* updateStats() instead; it keeps track of the actual entries present
* in memory, which is useful when checking the capacity constraint.
*/
try {
if (recoveryStore.getEvictionAttributes() != null
&& recoveryStore.getEvictionAttributes().getAlgorithm().isLIFO()) {
((VMLRURegionMap) recoveryStore.getRegionMap()).updateStats();
return;
}
// this must be done after releasing synchronization
recoveryStore.getRegionMap().lruUpdateCallback();
} catch (DiskAccessException dae) {
recoveryStore.handleDiskAccessException(dae);
throw dae;
}
}
private static void lruEntryFaultIn(LRUEntry entry, DiskRecoveryStore recoveryStore) {
RegionMap rm = (RegionMap)recoveryStore.getRegionMap();
try {
rm.lruEntryFaultIn((LRUEntry) entry);
} catch (DiskAccessException dae) {
recoveryStore.handleDiskAccessException(dae);
throw dae;
}
}
/**
* Returns the value of this map entry, reading it from disk if necessary,
* and sets the value in the entry.
* This is only called by the faultIn code once it has determined that
* the value is no longer in memory.
* Caller must have "entry" synced.
* @return the result will only be off-heap if the value is a sqlf ByteSource; otherwise the result will be on-heap.
*/
@Retained
private static Object readValueFromDisk(DiskEntry entry, DiskRecoveryStore region) {
DiskRegionView dr = region.getDiskRegionView();
DiskId did = entry.getDiskId();
if (did == null) {
return null;
}
dr.acquireReadLock();
try {
synchronized (did) {
Object value = getValueFromDisk(dr, did, null);
if (value == null) return null;
@Unretained Object preparedValue = setValueOnFaultIn(value, did, entry, dr, region);
// For Sqlfire we want to return the offheap representation.
// So we need to retain it for the caller to release.
/*if (preparedValue instanceof ByteSource) {
// This is the only case in which we return a retained off-heap ref.
((ByteSource)preparedValue).retain();
return preparedValue;
} else */{
return value;
}
}
} finally {
dr.releaseReadLock();
}
}
/**
* Caller must have "entry" and "did" synced and "dr" readLocked.
* @return the unretained result must be used by the caller before it releases the sync on "entry".
*/
@Unretained
private static Object setValueOnFaultIn(Object value, DiskId did, DiskEntry entry, DiskRegionView dr, DiskRecoveryStore region) {
// dr.getOwner().getCache().getLogger().info("DEBUG: faulting in entry with key " + entry.getKey());
int bytesOnDisk = getValueLength(did);
// Retained by the prepareValueForCache call for the region entry.
// NOTE that we return this value unretained because the retain is owned by the region entry not the caller.
@Retained Object preparedValue = entry.prepareValueForCache((RegionEntryContext) region, value,
false);
region.updateSizeOnFaultIn(entry.getKey(), region.calculateValueSize(preparedValue), bytesOnDisk);
//did.setValueSerializedSize(0);
// I think the following assertion is true but need to run
// a regression with it. Reenable this post 6.5
//Assert.assertTrue(entry._getValue() == null);
entry.setValueWithContext((RegionEntryContext) region, preparedValue);
dr.incNumEntriesInVM(1L);
dr.incNumOverflowOnDisk(-1L);
incrementBucketStats(region, 1/*InVM*/, -1/*OnDisk*/, -bytesOnDisk);
return preparedValue;
}
static Object readSerializedValue(byte[] valueBytes, Version version,
ByteArrayDataInput in, boolean forceDeserialize) {
if (forceDeserialize) {
// deserialize checking for product version change
return EntryEventImpl.deserialize(valueBytes, version, in);
}
else {
// TODO: upgrades: is there a case where GemFire values are internal
// ones that need to be upgraded transparently; probably messages
// being persisted (gateway events?)
return CachedDeserializableFactory.create(valueBytes);
}
}
static Object readRawValue(byte[] valueBytes, Version version,
ByteArrayDataInput in) {
/*
final StaticSystemCallbacks sysCb;
if (version != null && (sysCb = GemFireCacheImpl.FactoryStatics
.systemCallbacks) != null) {
// may need to change serialized shape for SQLFire
return sysCb.fromVersion(valueBytes, false, version, in);
}
else */ {
return valueBytes;
}
}
public static void incrementBucketStats(Object owner,
int entriesInVmDelta,
int overflowOnDiskDelta,
int overflowBytesOnDiskDelta) {
if (owner instanceof BucketRegion) {
((BucketRegion)owner).incNumEntriesInVM(entriesInVmDelta);
((BucketRegion)owner).incNumOverflowOnDisk(overflowOnDiskDelta);
((BucketRegion)owner).incNumOverflowBytesOnDisk(overflowBytesOnDiskDelta);
} else if (owner instanceof DiskRegionView) {
((DiskRegionView)owner).incNumOverflowBytesOnDisk(overflowBytesOnDiskDelta);
}
}
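// Note: for a non-bucket owner only the overflow-bytes delta is applied
// here; callers update entriesInVM and overflowOnDisk directly on the
// DiskRegionView (see the dr.incNumEntriesInVM/incNumOverflowOnDisk
// calls paired with calls to this method).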
/**
* Writes the value of this <code>DiskEntry</code> to disk and
* <code>null</code>s out the reference to the value to free up VM space.
* <p>
* Note that if the value had already been written to disk, it is not
* written again.
* <p>
* Caller must synchronize on entry and it is assumed the entry is evicted
*
* see #writeToDisk
* @throws RegionClearedException
*/
public static int overflowToDisk(DiskEntry entry, LocalRegion region, EnableLRU ccHelper) throws RegionClearedException {
{
Token entryVal = entry.getValueAsToken();
if (entryVal == null || Token.isRemovedFromDisk(entryVal)) {
// Note it could be removed token now because
// freeAllEntriesOnDisk is not able to sync on entry
return 0;
}
}
DiskRegion dr = region.getDiskRegion();
final int oldSize = region.calculateRegionEntryValueSize(entry);
// Asif: Get the DiskId. If it is null, it implies overflow-only mode.
//long id = entry.getDiskId().getKeyId();
DiskId did = entry.getDiskId();
if (did == null) {
((LRUEntry)entry).setDelayedDiskId(region);
did = entry.getDiskId();
}
// Notify the SQLFire IndexManager if present
/* final IndexUpdater indexUpdater = region.getIndexUpdater();
if(indexUpdater != null && dr.isSync()) {
indexUpdater.onOverflowToDisk(entry);
}*/
int change = 0;
boolean scheduledAsyncHere = false;
dr.acquireReadLock();
try {
synchronized (did) {
// check for a concurrent freeAllEntriesOnDisk
if (entry.isRemovedFromDisk()) {
return 0;
}
//TODO:Asif: Check if we need to overflow even when id is = 0
boolean wasAlreadyPendingAsync = did.isPendingAsync();
if (did.needsToBeWritten()) {
if (dr.isSync()) {
writeToDisk(entry, region, false);
} else if (!wasAlreadyPendingAsync) {
scheduledAsyncHere = true;
did.setPendingAsync(true);
} else {
// it may have been scheduled to be written (isBackup==true)
// and now we are faulting it out
}
}
boolean movedValueToDisk = false; // added for bug 41849
// If async then if it does not need to be written (because it already was)
// then treat it like the sync case. This fixes bug 41310
if (scheduledAsyncHere || wasAlreadyPendingAsync) {
// we call _setValue(null) after it is actually written to disk
change = entry.updateAsyncEntrySize(ccHelper);
// do the stats when it is actually written to disk
} else {
region.updateSizeOnEvict(entry.getKey(), oldSize);
//did.setValueSerializedSize(byteSizeOnDisk);
try {
entry.handleValueOverflow(region);
entry.setValueWithContext(region,null);
} finally {
entry.afterValueOverflow(region);
}
movedValueToDisk = true;
change = ((LRUClockNode)entry).updateEntrySize(ccHelper);
}
dr.incNumEntriesInVM(-1L);
dr.incNumOverflowOnDisk(1L);
int valueLength = 0;
if (movedValueToDisk) {
valueLength = getValueLength(did);
}
incrementBucketStats(region, -1/*InVM*/, 1/*OnDisk*/, valueLength);
}
} finally {
dr.releaseReadLock();
}
if (scheduledAsyncHere && did.isPendingAsync()) {
// this needs to be done outside the above sync
// the version tag is null here because this method only needs
// to write to disk for overflow only regions, which do not need
// to maintain an RVV on disk.
scheduleAsyncWrite(new AsyncDiskEntry(region, entry, null));
}
return change;
}
private static void scheduleAsyncWrite(AsyncDiskEntry ade) {
DiskRegion dr = ade.region.getDiskRegion();
dr.scheduleAsyncWrite(ade);
}
public static void handleFullAsyncQueue(DiskEntry entry, LocalRegion region, VersionTag tag) {
DiskRegion dr = region.getDiskRegion();
DiskId did = entry.getDiskId();
synchronized (entry) {
dr.acquireReadLock();
try {
synchronized (did) {
if (did.isPendingAsync()) {
did.setPendingAsync(false);
final Token entryVal = entry.getValueAsToken();
final int entryValSize = region.calculateRegionEntryValueSize(entry);
boolean remove = false;
try {
if (Token.isRemovedFromDisk(entryVal)) {
// onDisk was already decremented so just do the valueLength here
incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
-did.getValueLength());
dr.remove(region, entry, true, false);
if (dr.isBackup()) {
did.setKeyId(DiskRegion.INVALID_ID); // fix for bug 41340
}
remove = true;
} else if (Token.isInvalid(entryVal) && !dr.isBackup()) {
// no need to write invalid to disk if overflow only
} else if (entryVal != null) {
writeToDisk(entry, region, true);
} else {
//if we have a version tag we need to record the operation
//to update the RVV
if(tag != null) {
DiskEntry.Helper.doAsyncFlush(tag, region);
}
return;
}
assert !dr.isSync();
// Only setValue to null if this was an evict.
// We could just be a backup that is writing async.
if (!remove
&& !Token.isInvalid(entryVal)
&& entry instanceof LRUEntry
&& ((LRUEntry)entry).testEvicted()) {
// Moved this here to fix bug 40116.
region.updateSizeOnEvict(entry.getKey(), entryValSize);
// note the old size was already accounted for
// onDisk was already incremented so just do the valueLength here
incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
did.getValueLength());
try {
entry.handleValueOverflow(region);
entry.setValueWithContext(region,null);
} finally {
entry.afterValueOverflow(region);
}
}
// See if the entry we wrote to disk has the same tag as this entry.
// If not, write the tag as a conflicting operation to update the RVV.
VersionStamp stamp = entry.getVersionStamp();
if(tag != null && stamp != null
&& (stamp.getMemberID() != tag.getMemberID()
|| stamp.getRegionVersion() != tag.getRegionVersion())) {
DiskEntry.Helper.doAsyncFlush(tag, region);
}
} catch (RegionClearedException ignore) {
// no need to do the op since it was clobbered by a region clear
}
} else {
//if we have a version tag we need to record the operation
//to update the RVV, even if we don't write the entry
if(tag != null) {
DiskEntry.Helper.doAsyncFlush(tag, region);
}
}
}
} finally {
dr.releaseReadLock();
}
} // sync entry
}
public static void doAsyncFlush(VersionTag tag, LocalRegion region) {
if (region.isThisRegionBeingClosedOrDestroyed()) return;
DiskRegion dr = region.getDiskRegion();
if (!dr.isBackup()) {
return;
}
assert !dr.isSync();
dr.acquireReadLock();
try {
dr.getDiskStore().putVersionTagOnly(region, tag, true);
} finally {
dr.releaseReadLock();
}
}
/**
* Flush an entry that was previously scheduled to be written to disk.
* @param tag the version tag of the operation, if any
* @since prPersistSprint1
*/
public static void doAsyncFlush(DiskEntry entry, LocalRegion region, VersionTag tag) {
if (region.isThisRegionBeingClosedOrDestroyed()) return;
DiskRegion dr = region.getDiskRegion();
dr.setClearCountReference();
synchronized (entry) { // fixes 40116
// If we don't sync the entry, and this method ends up doing an eviction
// (thus setting the value to null), some other thread is free to fetch
// the value while the entry is synced and think it has removed it or
// replaced it. This results in updateSizeOn* being called twice for the
// same value (once when it is evicted and once when it is
// removed/updated).
try {
dr.acquireReadLock();
try {
DiskId did = entry.getDiskId();
synchronized (did) {
if (did.isPendingAsync()) {
did.setPendingAsync(false);
final Token entryVal = entry.getValueAsToken();
final int entryValSize = region.calculateRegionEntryValueSize(entry);
boolean remove = false;
try {
if (Token.isRemovedFromDisk(entryVal)) {
if (region.isThisRegionBeingClosedOrDestroyed()) return;
// onDisk was already decremented so just do the valueLength here
incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
-did.getValueLength());
dr.remove(region, entry, true, false);
if (dr.isBackup()) {
did.setKeyId(DiskRegion.INVALID_ID); // fix for bug 41340
}
remove = true;
} else if ((Token.isInvalid(entryVal) || entryVal == Token.TOMBSTONE) && !dr.isBackup()) {
// no need to write invalid or tombstones to disk if overflow only
} else if (entryVal != null) {
writeToDisk(entry, region, true);
} else {
// @todo why would we have a null value here?
// I'm seeing it show up in tests:
// java.lang.IllegalArgumentException: Must not serialize null in this context.
// at com.gemstone.gemfire.internal.cache.EntryEventImpl.serialize(EntryEventImpl.java:1024)
// at com.gemstone.gemfire.internal.cache.DiskEntry$Helper.writeToDisk(DiskEntry.java:351)
// at com.gemstone.gemfire.internal.cache.DiskEntry$Helper.doAsyncFlush(DiskEntry.java:683)
// at com.gemstone.gemfire.internal.cache.DiskRegion$FlusherThread.run(DiskRegion.java:1055)
//if we have a version tag we need to record the operation
//to update the RVV
if(tag != null) {
DiskEntry.Helper.doAsyncFlush(tag, region);
}
return;
}
assert !dr.isSync();
// Only setValue to null if this was an evict.
// We could just be a backup that is writing async.
if (!remove
&& !Token.isInvalid(entryVal)
&& (entryVal != Token.TOMBSTONE)
&& entry instanceof LRUEntry
&& ((LRUEntry)entry).testEvicted()) {
// Moved this here to fix bug 40116.
region.updateSizeOnEvict(entry.getKey(), entryValSize);
// note the old size was already accounted for
// onDisk was already incremented so just do the valueLength here
incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
did.getValueLength());
try {
entry.handleValueOverflow(region);
entry.setValueWithContext(region,null);
} finally {
entry.afterValueOverflow(region);
}
}
} catch (RegionClearedException ignore) {
// no need to do the op since it was clobbered by a region clear
}
// See if the entry we wrote to disk has the same tag as this entry.
// If not, write the tag as a conflicting operation to update the RVV.
VersionStamp stamp = entry.getVersionStamp();
if(tag != null && stamp != null
&& (stamp.getMemberID() != tag.getMemberID()
|| stamp.getRegionVersion() != tag.getRegionVersion())) {
DiskEntry.Helper.doAsyncFlush(tag, region);
}
} else {
//if we have a version tag we need to record the operation
//to update the RVV
if(tag != null) {
DiskEntry.Helper.doAsyncFlush(tag, region);
}
}
}
} finally {
dr.releaseReadLock();
}
} finally {
dr.removeClearCountReference();
}
} // sync entry
}
/**
* Removes the key/value pair in the given entry from disk
*
* @throws RegionClearedException If the operation is aborted due to a clear
* @see DiskRegion#remove
*/
public static void removeFromDisk(DiskEntry entry, LocalRegion region, boolean isClear) throws RegionClearedException {
removeFromDisk(entry, region, true, false, isClear);
}
private static void removeFromDisk(DiskEntry entry, LocalRegion region,
boolean checkValue, boolean valueWasNull, boolean isClear) throws RegionClearedException {
DiskRegion dr = region.getDiskRegion();
//If we have concurrency checks enabled for a persistent region, we need
//to add an entry to the async queue for every update to maintain the RVV
boolean maintainRVV = region.concurrencyChecksEnabled && dr.isBackup();
DiskId did = entry.getDiskId();
VersionTag tag = null;
Object syncObj = did;
if (did == null) {
syncObj = entry;
}
boolean scheduledAsyncHere = false;
if (syncObj == did) {
dr.acquireReadLock();
}
try {
synchronized (syncObj) {
if (did == null || (dr.isBackup() && did.getKeyId()== DiskRegion.INVALID_ID)) {
// Not on disk yet
dr.incNumEntriesInVM(-1L);
incrementBucketStats(region, -1/*InVM*/, 0/*OnDisk*/, 0);
dr.unscheduleAsyncWrite(did);
return;
}
// Asif: This will convert the negative OplogKeyId to positive as part of
// fixing Bug #39989
did.unmarkForWriting();
//System.out.println("DEBUG: removeFromDisk doing remove(" + id + ")");
int oldValueLength = 0;
if (dr.isSync() || isClear) {
oldValueLength = did.getValueLength();
dr.remove(region, entry, false, isClear);
if (dr.isBackup()) {
did.setKeyId(DiskRegion.INVALID_ID); // fix for bug 41340
}
//If this is a clear, we should unschedule the async write for this
//entry
did.setPendingAsync(false);
} else {
if (!did.isPendingAsync() || maintainRVV) {
scheduledAsyncHere = true;
did.setPendingAsync(true);
VersionStamp stamp = entry.getVersionStamp();
if(stamp != null) {
tag = stamp.asVersionTag();
}
}
}
if (checkValue) {
valueWasNull = entry.isValueNull();
entry._removePhase1();
}
if (valueWasNull) {
dr.incNumOverflowOnDisk(-1L);
incrementBucketStats(region, 0/*InVM*/, -1/*OnDisk*/, -oldValueLength);
}
else {
dr.incNumEntriesInVM(-1L);
incrementBucketStats(region, -1/*InVM*/, 0/*OnDisk*/, 0);
if (!dr.isSync()) {
// we are going to do an async remove of an entry that is not currently
// overflowed to disk so we don't want to count its value length as being
// on disk when we finally do the async op. So we clear it here.
did.setValueLength(0);
}
}
}
} finally {
if (syncObj == did) {
dr.releaseReadLock();
}
}
if (scheduledAsyncHere && did.isPendingAsync()) {
// do this outside the sync
scheduleAsyncWrite(new AsyncDiskEntry(region, entry, tag));
}
}
/**
* Writes only the version tag for the given entry to disk.
* @param entry the disk entry whose version is being updated
* @param region the region that owns the entry
* @param tag the version tag to persist
*/
public static void updateVersionOnly(DiskEntry entry, LocalRegion region,
VersionTag tag) {
DiskRegion dr = region.getDiskRegion();
if (!dr.isBackup()) {
return;
}
assert tag != null && tag.getMemberID()!=null;
boolean scheduleAsync = false;
DiskId did = entry.getDiskId();
Object syncObj = did;
if (syncObj == null) {
syncObj = entry;
}
if (syncObj == did) {
dr.acquireReadLock();
}
try {
synchronized (syncObj) {
if (dr.isSync()) {
dr.getDiskStore().putVersionTagOnly(region, tag, false);
} else {
scheduleAsync = true;
}
}
} finally {
if (syncObj == did) {
dr.releaseReadLock();
}
}
if (scheduleAsync) {
// this needs to be done outside the above sync
scheduleAsyncWrite(new AsyncDiskEntry(region, tag));
}
}
}
/**
* A marker object for an entry that has been recovered from disk.
* It is handled specially when it is placed in a region.
*/
public static class RecoveredEntry {
/** The disk id of the entry being recovered */
private final long recoveredKeyId;
/** The value of the recovered entry */
private final Object value;
private final long offsetInOplog;
private final byte userBits;
private final int valueLength;
private long oplogId;
private VersionTag tag;
/**
* Only for this constructor: the value is not loaded into the region and
* is still lying in the oplogs. Since oplogs rely on the DiskId to furnish
* the user bits needed to correctly interpret the bytes, the user bits
* need to be set correctly here.
*/
public RecoveredEntry(long keyId, long oplogId, long offsetInOplog,
byte userBits, int valueLength) {
this(-keyId, oplogId, offsetInOplog, userBits, valueLength, null);
}
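// Note: the keyId is negated above; a negative recoveredKeyId signals
// that the value was not recovered into memory (see getValue()).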
public RecoveredEntry(long keyId, long oplogId, long offsetInOplog,
byte userBits, int valueLength, Object value) {
this.recoveredKeyId = keyId;
this.value = value;
this.oplogId = oplogId;
this.offsetInOplog = offsetInOplog;
this.userBits = EntryBits.setRecoveredFromDisk(userBits, true);
this.valueLength = valueLength;
}
/**
* Returns the disk id of the entry being recovered
*/
public long getRecoveredKeyId() {
return this.recoveredKeyId;
}
/**
* Returns the value of the recovered entry. Note that if the
* disk id is < 0 then the value has not been faulted in and
* this method will return null.
*/
public Object getValue() {
return this.value;
}
/**
*
* @return a byte indicating the user bits. The correct value is returned
* only in the specific case of an entry recovered from an oplog (and not
* rolled to the Htree) with the RECOVER_VALUES flag false. In other cases
* the exact value is not needed.
*/
public byte getUserBits() {
return this.userBits;
}
public int getValueLength() {
return this.valueLength;
}
public long getOffsetInOplog() {
return offsetInOplog;
}
public long getOplogId() {
return this.oplogId;
}
public void setOplogId(long v) {
this.oplogId = v;
}
public VersionTag getVersionTag() {
return this.tag;
}
public void setVersionTag(VersionTag tag) {
this.tag = tag;
}
}
}