blob: 6bad8b658925654b342de64560d34d91187eddac [file] [log] [blame]
package com.gemstone.gemfire.internal.offheap;
import com.gemstone.gemfire.internal.DSCODE;
import com.gemstone.gemfire.internal.cache.CachedDeserializableFactory;
import com.gemstone.gemfire.internal.cache.DiskEntry;
import com.gemstone.gemfire.internal.cache.DiskId;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.OffHeapRegionEntry;
import com.gemstone.gemfire.internal.cache.RegionEntryContext;
import com.gemstone.gemfire.internal.cache.Token;
import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.Chunk;
import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.DataAsAddress;
import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
/**
* The class just has static methods
* that operate on instances of {@link OffHeapRegionEntry}.
* It allows common code to be shared for all the
* classes we have that implement {@link OffHeapRegionEntry}.
*
* @author darrel
* @since 9.0
*/
public class OffHeapRegionEntryHelper {
private static final long NULL_ADDRESS = 0L<<1;
private static final long INVALID_ADDRESS = 1L<<1;
private static final long LOCAL_INVALID_ADDRESS = 2L<<1;
private static final long DESTROYED_ADDRESS = 3L<<1;
public static final long REMOVED_PHASE1_ADDRESS = 4L<<1;
private static final long REMOVED_PHASE2_ADDRESS = 5L<<1;
private static final long END_OF_STREAM_ADDRESS = 6L<<1;
private static final long NOT_AVAILABLE_ADDRESS = 7L<<1;
private static final long TOMBSTONE_ADDRESS = 8L<<1;
public static final int MAX_LENGTH_FOR_DATA_AS_ADDRESS = 8;
/* private static final ChunkFactory chunkFactory ;
static {
ChunkFactory factory;
try {
factory= SimpleMemoryAllocatorImpl.getAllocator().getChunkFactory();
}catch(CacheClosedException ce) {
factory = null;
}
chunkFactory = factory;
}*/
private static final Token[] addrToObj = new Token[]{
null,
Token.INVALID,
Token.LOCAL_INVALID,
Token.DESTROYED,
Token.REMOVED_PHASE1,
Token.REMOVED_PHASE2,
Token.END_OF_STREAM,
Token.NOT_AVAILABLE,
Token.TOMBSTONE,
};
private static long objectToAddress(@Unretained Object v) {
if (v instanceof Chunk) return ((Chunk) v).getMemoryAddress();
if (v instanceof DataAsAddress) return ((DataAsAddress) v).getEncodedAddress();
if (v == null) return NULL_ADDRESS;
if (v == Token.TOMBSTONE) return TOMBSTONE_ADDRESS;
if (v == Token.INVALID) return INVALID_ADDRESS;
if (v == Token.LOCAL_INVALID) return LOCAL_INVALID_ADDRESS;
if (v == Token.DESTROYED) return DESTROYED_ADDRESS;
if (v == Token.REMOVED_PHASE1) return REMOVED_PHASE1_ADDRESS;
if (v == Token.REMOVED_PHASE2) return REMOVED_PHASE2_ADDRESS;
if (v == Token.END_OF_STREAM) return END_OF_STREAM_ADDRESS;
if (v == Token.NOT_AVAILABLE) return NOT_AVAILABLE_ADDRESS;
throw new IllegalStateException("Can not convert " + v + " to an off heap address.");
}
static Object encodedAddressToObject(long ohAddress) {
return encodedAddressToObject(ohAddress, true, true);
}
//TODO:Asif:Check if this is a valid equality conditions
public static boolean isAddressInvalidOrRemoved(long address) {
return address == INVALID_ADDRESS || address == LOCAL_INVALID_ADDRESS
|| address == REMOVED_PHASE2_ADDRESS || address == NULL_ADDRESS;
}
/**
* This method may release the object stored at ohAddress if the result
* needs to be decompressed and the decompress parameter is true.
* This decompressed result will be on the heap.
*
* @param ohAddress OFF_HEAP_ADDRESS
* @param decompress true if off-heap value should be decompressed before returning
* @param context used for decompression
* @return OFF_HEAP_OBJECT (sometimes)
*/
@Unretained @Retained
public static Object addressToObject(@Released @Retained long ohAddress, boolean decompress, RegionEntryContext context) {
if (isOffHeap(ohAddress)) {
//Chunk chunk = chunkFactory.newChunk(ohAddress);
@Unretained Chunk chunk = SimpleMemoryAllocatorImpl.getAllocator().getChunkFactory().newChunk(ohAddress);
@Unretained Object result = chunk;
if (decompress && chunk.isCompressed()) {
try {
// to fix bug 47982 need to:
byte[] decompressedBytes = chunk.getDecompressedBytes(context);
if (chunk.isSerialized()) {
// return a VMCachedDeserializable with the decompressed serialized bytes since chunk is serialized
result = CachedDeserializableFactory.create(decompressedBytes);
} else {
// return a byte[] since chunk is not serialized
result = decompressedBytes;
}
} finally {
// decompress is only true when this method is called by _getValueRetain.
// In that case the caller has already retained ohAddress because it thought
// we would return it. But we have unwrapped it and are returning the decompressed results.
// So we need to release the chunk here.
chunk.release();
}
}
return result;
} else if ((ohAddress & ENCODED_BIT) != 0) {
DataAsAddress daa = new DataAsAddress(ohAddress);
Object result = daa;
if (decompress && daa.isCompressed()) {
byte[] decompressedBytes = daa.getDecompressedBytes(context);
if (daa.isSerialized()) {
// return a VMCachedDeserializable with the decompressed serialized bytes since daa is serialized
result = CachedDeserializableFactory.create(decompressedBytes);
} else {
// return a byte[] since daa is not serialized
result = decompressedBytes;
}
}
return result;
} else {
return addrToObj[(int) ohAddress>>1];
}
}
public static int getSerializedLengthFromDataAsAddress(DataAsAddress dataAsAddress) {
final long ohAddress = dataAsAddress.getEncodedAddress();
if ((ohAddress & ENCODED_BIT) != 0) {
boolean isLong = (ohAddress & LONG_BIT) != 0;
if (isLong) {
return 9;
} else {
return (int) ((ohAddress & SIZE_MASK) >> SIZE_SHIFT);
}
} else {
return 0;
}
}
private static Token addressToToken(long ohAddress) {
if (isOffHeap(ohAddress) || (ohAddress & ENCODED_BIT) != 0) {
return Token.NOT_A_TOKEN;
} else {
return addrToObj[(int) ohAddress>>1];
}
}
private static void releaseAddress(@Released long ohAddress) {
if (isOffHeap(ohAddress)) {
Chunk.release(ohAddress, true);
}
}
/**
* The address in 're' will be @Released.
*/
public static void releaseEntry(@Released OffHeapRegionEntry re) {
if (re instanceof DiskEntry) {
DiskId did = ((DiskEntry) re).getDiskId();
if (did != null && did.isPendingAsync()) {
synchronized (did) {
// This may not be needed so remove this call if it causes problems.
// We no longer need this entry to be written to disk so unschedule it
// before we change its value to REMOVED_PHASE2.
did.setPendingAsync(false);
setValue(re, Token.REMOVED_PHASE2);
return;
}
}
}
setValue(re, Token.REMOVED_PHASE2);
}
public static void releaseEntry(@Unretained OffHeapRegionEntry re, @Released MemoryChunkWithRefCount expectedValue) {
long oldAddress = objectToAddress(expectedValue);
final long newAddress = objectToAddress(Token.REMOVED_PHASE2);
if (re.setAddress(oldAddress, newAddress) || re.getAddress() != newAddress) {
releaseAddress(oldAddress);
} /*else {
if (!calledSetValue || re.getAddress() != newAddress) {
expectedValue.release();
}
}*/
}
/**
* This bit is set to indicate that this address has data encoded in it.
*/
private static long ENCODED_BIT = 1L;
/**
* This bit is set to indicate that the encoded data is serialized.
*/
private static long SERIALIZED_BIT = 2L;
/**
* This bit is set to indicate that the encoded data is compressed.
*/
private static long COMPRESSED_BIT = 4L;
/**
* This bit is set to indicate that the encoded data is a long whose value fits in 7 bytes.
*/
private static long LONG_BIT = 8L;
/**
* size is in the range 0..7 so we only need 3 bits.
*/
private static long SIZE_MASK = 0x70L;
/**
* number of bits to shift the size by.
*/
private static int SIZE_SHIFT = 4;
// the msb of this byte is currently unused
/**
* Returns 0 if the data could not be encoded as an address.
*/
public static long encodeDataAsAddress(byte[] v, boolean isSerialized, boolean isCompressed) {
if (v.length < MAX_LENGTH_FOR_DATA_AS_ADDRESS) {
long result = 0L;
for (int i=0; i < v.length; i++) {
result |= v[i] & 0x00ff;
result <<= 8;
}
result |= (v.length << SIZE_SHIFT) | ENCODED_BIT;
if (isSerialized) {
result |= SERIALIZED_BIT;
}
if (isCompressed) {
result |= COMPRESSED_BIT;
}
return result;
} else if (isSerialized && !isCompressed) {
// Check for some special types that take more than 7 bytes to serialize
// but that might be able to be inlined with less than 8 bytes.
if (v[0] == DSCODE.LONG) {
// A long is currently always serialized as 8 bytes (9 if you include the dscode).
// But many long values will actually be small enough for is to encode in 7 bytes.
if ((v[1] == 0 && (v[2] & 0x80) == 0) || (v[1] == -1 && (v[2] & 0x80) != 0)) {
// The long can be encoded as 7 bytes since the most signification byte
// is simply an extension of the sign byte on the second most signification byte.
long result = 0L;
for (int i=2; i < v.length; i++) {
result |= v[i] & 0x00ff;
result <<= 8;
}
result |= (7 << SIZE_SHIFT) | LONG_BIT | SERIALIZED_BIT | ENCODED_BIT;
return result;
}
}
}
return 0L;
}
public static Object encodedAddressToObject(long addr, boolean decompress, boolean deserialize) {
boolean isSerialized = (addr & SERIALIZED_BIT) != 0;
byte[] bytes = encodedAddressToBytes(addr, decompress, false);
if (isSerialized) {
if (deserialize) {
return EntryEventImpl.deserialize(bytes);
} else {
return CachedDeserializableFactory.create(bytes);
}
} else {
return bytes;
}
}
static byte[] encodedAddressToBytes(long addr) {
byte[] result = encodedAddressToBytes(addr, true, false);
boolean isSerialized = (addr & SERIALIZED_BIT) != 0;
if (!isSerialized) {
result = EntryEventImpl.serialize(result);
}
return result;
}
/**
* If the address contains a byte[] return it.
* Otherwise return the serialize bytes in the address in a byte array.
*/
static byte[] encodedAddressToRawBytes(long addr) {
return encodedAddressToBytes(addr, true, false);
}
private static byte[] encodedAddressToBytes(long addr, boolean decompress, boolean compressedOk) {
assert (addr & ENCODED_BIT) != 0;
boolean isCompressed = (addr & COMPRESSED_BIT) != 0;
int size = (int) ((addr & SIZE_MASK) >> SIZE_SHIFT);
boolean isLong = (addr & LONG_BIT) != 0;
byte[] bytes;
if (isLong) {
bytes = new byte[9];
bytes[0] = DSCODE.LONG;
for (int i=8; i >=2; i--) {
addr >>= 8;
bytes[i] = (byte) (addr & 0x00ff);
}
if ((bytes[2] & 0x80) != 0) {
bytes[1] = -1;
} else {
bytes[1] = 0;
}
} else {
bytes = new byte[size];
for (int i=size-1; i >=0; i--) {
addr >>= 8;
bytes[i] = (byte) (addr & 0x00ff);
}
}
if (decompress && isCompressed) {
if (!compressedOk) {
throw new UnsupportedOperationException("Did not expect DataAsAddress to be compressed");
}
}
return bytes;
}
public static byte[] encodedAddressToBytes(long addr, boolean decompress, RegionEntryContext context) {
byte[] bytes = encodedAddressToBytes(addr, decompress, true);
if (decompress) {
boolean isCompressed = (addr & COMPRESSED_BIT) != 0;
if (isCompressed) {
long time = context.getCachePerfStats().startDecompression();
bytes = context.getCompressor().decompress(bytes);
context.getCachePerfStats().endDecompression(time);
}
}
return bytes;
}
/**
* The previous value at the address in 're' will be @Released and then the
* address in 're' will be set to the @Unretained address of 'v'.
*/
public static void setValue(@Released OffHeapRegionEntry re, @Unretained Object v) {
// setValue is called when synced so I don't need to worry
// about oldAddress being released by someone else.
final long newAddress = objectToAddress(v);
long oldAddress;
do {
oldAddress = re.getAddress();
} while (!re.setAddress(oldAddress, newAddress));
SimpleMemoryAllocatorImpl.setReferenceCountOwner(re);
releaseAddress(oldAddress);
SimpleMemoryAllocatorImpl.setReferenceCountOwner(null);
}
public static Token getValueAsToken(@Unretained OffHeapRegionEntry re) {
return addressToToken(re.getAddress());
}
@Unretained
public static Object _getValue(@Unretained OffHeapRegionEntry re) {
return addressToObject(re.getAddress(), false, null); // no context needed so decompress is false
}
public static boolean isOffHeap(long addr) {
if ((addr & ENCODED_BIT) != 0) return false;
if (addr < 0) return true;
addr >>= 1; // shift left 1 to convert to array index;
return addr >= addrToObj.length;
}
/**
* If the value stored at the location held in 're' is returned, then it will
* be Retained. If the value returned is 're' decompressed into another
* off-heap location, then 're' will be Unretained but the new,
* decompressed value will be Retained. Therefore, whichever is returned
* (the value at the address in 're' or the decompressed value) it will have
* been Retained.
*
* @return possible OFF_HEAP_OBJECT (caller must release)
*/
@Retained
public static Object _getValueRetain(@Retained @Unretained OffHeapRegionEntry re, boolean decompress, RegionEntryContext context) {
int retryCount = 0;
@Retained long addr = re.getAddress();
while (isOffHeap(addr)) {
if (Chunk.retain(addr)) {
@Unretained long addr2 = re.getAddress();
if (addr != addr2) {
retryCount = 0;
Chunk.release(addr, true);
// spin around and try again.
addr = addr2;
} else {
return addressToObject(addr, decompress, context);
}
} else {
// spin around and try again
long addr2 = re.getAddress();
retryCount++;
if (retryCount > 100) {
throw new IllegalStateException("retain failed addr=" + addr + " addr2=" + addr + " 100 times" + " history=" + SimpleMemoryAllocatorImpl.getFreeRefCountInfo(addr));
}
addr = addr2;
// Since retain returned false our region entry should have a different
// value in it. However the actual address could be the exact same one
// because addr was released, then reallocated from the free list and set
// back into this region entry. See bug 47782
}
}
return addressToObject(addr, decompress, context);
}
public static boolean isSerialized(long address) {
return (address & SERIALIZED_BIT) != 0;
}
public static boolean isCompressed(long address) {
return (address & COMPRESSED_BIT) != 0;
}
private static final ThreadLocal<Object> clearNeedsToCheckForOffHeap = new ThreadLocal<Object>();
public static boolean doesClearNeedToCheckForOffHeap() {
return clearNeedsToCheckForOffHeap.get() != null;
}
public static void doWithOffHeapClear(Runnable r) {
clearNeedsToCheckForOffHeap.set(Boolean.TRUE);
try {
r.run();
} finally {
clearNeedsToCheckForOffHeap.remove();
}
}
}